1: Set the Azure Blob Storage credentials (the CSV file is read in step 2)

In [None]:
# Give Spark the access key for the Blob Storage account so that wasbs:// reads work.
# NOTE(review): avoid pasting a real key into the notebook; a secret scope
# (dbutils.secrets.get(scope=..., key=...)) is the safer place for it.
account_key_setting = "fs.azure.account.key.<storage_account_name>.blob.core.windows.net"
account_key_value = "<your_account_key>"
spark.conf.set(account_key_setting, account_key_value)

2: Load CSV files from Azure Blob Storage

In [None]:
# Read the CSV over the wasbs:// driver. The first line is treated as the header
# and column types are inferred from the data.
# NOTE(review): the storage account here is presumably the same one whose key was
# set in step 1; the placeholders are spelled differently in the two cells.
source_path = "wasbs://<your_container_name>@<your_storage_account_name>.blob.core.windows.net/<your_csv_file_path>"
df = spark.read.csv(source_path, header=True, inferSchema=True)

# Quick sanity checks on what was loaded.
row_total = df.count()
print(f"Number of rows in the DataFrame: {row_total}")
partition_total = df.rdd.getNumPartitions()
print(f"Number of partitions: {partition_total}")
df.display()

3: Filter rows by condition for exploratory analysis

In [None]:
def _show_filtered(caption, subset):
    """Print *caption*, render *subset* with Databricks' display(), and return *subset*."""
    print(caption)
    display(subset)
    return subset


# Price filters (prices are in USD).
filtered_df_1 = _show_filtered(
    "Filtered by Current Price > 2000:",
    df.filter(df["`Current Price`"] > 2000),
)
filtered_df_2 = _show_filtered(
    "Filtered by Current Price < 101:",
    df.filter(df["`Current Price`"] < 101),
)

# 24-hour change filters. These require the '24h Change' column: referencing a
# missing column raises an AnalysisException; the filter is not silently skipped.
filtered_df_3 = _show_filtered(
    "Filtered by 24h Change > 0:",
    df.filter(df["`24h Change`"] > 0),
)
filtered_df_4 = _show_filtered(
    "Filtered by 24h Change < 0:",
    df.filter(df["`24h Change`"] < 0),
)

# Name filters: a case-sensitive LIKE substring match, then two exact matches.
filtered_df_5 = _show_filtered(
    "Filtered by Name containing 'Bitcoin':",
    df.filter(df["Name"].like("%Bitcoin%")),
)
filtered_df_6 = _show_filtered(
    "Filtered by Name 'Ethereum':",
    df.filter(df["Name"] == "Ethereum"),
)
filtered_df_7 = _show_filtered(
    "Filtered by Name 'Litecoin':",
    df.filter(df["Name"] == "Litecoin"),
)

4: Aggregate per coin and save the results as a single CSV file

In [None]:
# Spark functions are used through the F namespace. The previous
# `from pyspark.sql.functions import max` shadowed Python's builtin max() in every
# later cell of the notebook.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Build one row per coin ('Name'):
#   * 'Average Price Change' = mean of '24h Change' over ALL rows of that coin
#   * max(Current Price)     = 'Current Price' from the row(s) with the coin's most
#                              recent 'Creation Date' (the highest one if several rows
#                              share that date)
# The average is taken over a window on the full data, BEFORE filtering to the latest
# date. Previously the rows were filtered first, so the "average" only covered the
# latest-date row(s) instead of all daily changes.
per_name = Window.partitionBy("Name")

latest_creation_date_df = (
    df.withColumn("Creation Date", F.col("`Creation Date`").cast("timestamp"))
    .withColumn("max_creation_date", F.max("Creation Date").over(per_name))
    .withColumn("Average Price Change", F.avg("`24h Change`").over(per_name))
)

# Keep only the rows carrying each coin's most recent 'Creation Date'.
filtered_df = latest_creation_date_df.filter(
    F.col("Creation Date") == F.col("max_creation_date")
).drop("max_creation_date")

# Collapse to one row per coin. 'Average Price Change' is identical on every row of a
# coin (it is a window average), so first() just carries it through unchanged.
# The price column keeps Spark's default generated name, as before.
agg_df = filtered_df.groupBy("Name").agg(
    F.first("Average Price Change").alias("Average Price Change"),
    F.max("`Current Price`"),
)
result_df = agg_df

# Show the resulting DataFrame
display(result_df)

# Write a single part file (coalesce(1)) with a header and ';' as the delimiter.
# NOTE(review): Spark and dbutils resolve a scheme-less path against the DBFS root, so
# this most likely lands in dbfs:/dbfs/tmp/result_df rather than dbfs:/tmp/result_df
# (what the /dbfs FUSE mount would map to). Switch to "dbfs:/tmp/result_df" if the
# latter was intended, and update the paths in step 5 accordingly.
output_path = "/dbfs/tmp/result_df"
(
    result_df.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .option("delimiter", ";")
    .csv(output_path)
)

# Spark chooses the part file's name (part-00000-...csv), so look it up in the
# output directory. Use "" when no .csv file is present.
csv_file_path = next(
    (info.path for info in dbutils.fs.ls(output_path) if info.name.endswith(".csv")),
    "",
)

if csv_file_path:
    final_path = "dbfs:/FileStore/result_df.csv"
    # Move the part file to a stable, predictable name under /FileStore.
    # NOTE(review): if a result_df.csv from an earlier run is still there, confirm
    # that mv overwrites it (step 5 has an rm for this file).
    dbutils.fs.mv(csv_file_path, final_path)
    # Read back the beginning of the written file and print it.
    file_content = dbutils.fs.head(final_path)
    result_df.show()
    print("File content:\n")
    print(file_content)
    print(f"\nNumber of rows in result_df: {result_df.count()}")
    print("The prices are in USD.")
    print("The 'Average Price Change' is the result of averaging all the daily price changes.")
else:
    print("No CSV file was generated.")

5: Inspect and clean up the output files (optional)

In [None]:
# '/dbfs/' section: the raw Spark output directory written in step 4.
# NOTE(review): without a "dbfs:" scheme, dbutils presumably resolves these paths
# against the DBFS root, so they point at dbfs:/dbfs/tmp/... (confirm).

#dbutils.fs.ls("dbfs/tmp/") # Lists the 'tmp' directory to check that the 'result_df' directory was created

#dbutils.fs.ls("/dbfs/tmp/result_df/") # Lists the files in 'result_df'. The data file is 'part-00000-tid-<id>.csv', but step 4 moves it to /FileStore when it succeeds, so it may no longer be here

#dbutils.fs.head("/dbfs/tmp/result_df/part-00000-tid-???.csv") # Returns the beginning of the part file; replace '???' with the actual id from the listing above

#dbutils.fs.rm("/dbfs/tmp/result_df/", recurse=True) # Deletes the 'result_df' directory (safe: re-running step 4 writes it again)


# 'dbfs:/' section:   <-- This is the one you need to use

#dbutils.fs.rm("dbfs:/FileStore/result_df.csv", recurse=True) # Deletes the final 'result_df.csv' file

#dbutils.fs.ls("dbfs:/FileStore/") # Lists the 'FileStore' directory to check that 'result_df.csv' was created
