Problem Statement 01 - Data Loading and Basic Operations


In [0]:
# Load the retail data from ADLS Gen2: products (JSON) and transactions (CSV).
import os

azure_storage_account = "stgaccaneta"
azure_container_products = "products"  # JSON files
azure_container_transactions = "transactions"  # CSV files
# Do not hard-code the storage account key in the notebook: read it from the
# environment (or replace this with a secret-scope lookup). "xxx" is only the
# original placeholder, kept as a fallback.
azure_access_keys = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY", "xxx")
# Configuration key under which the ABFS driver looks up this account's key;
# passed as a reader option so it applies to each read below.
azure_account_key_conf = f"fs.azure.account.key.{azure_storage_account}.dfs.core.windows.net"

products_file_path = f"abfss://{azure_container_products}@{azure_storage_account}.dfs.core.windows.net/products.json"
df_products = (
    spark.read
    .option(azure_account_key_conf, azure_access_keys)
    .json(products_file_path)
)

transactions_file_path = f"abfss://{azure_container_transactions}@{azure_storage_account}.dfs.core.windows.net/transactions.csv"
# transactions.csv has a header row (the same file is read from the volume with
# header/inferSchema as well). Without these options Spark would name the
# columns _c0, _c1, ..., keep the header line as a data row and type every
# column as string.
df_transactions = (
    spark.read
    .option(azure_account_key_conf, azure_access_keys)
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(transactions_file_path)
)

In [0]:
# Preview the transactions DataFrame read from ADLS (Databricks rich table view).
df_transactions.display()

In [0]:
# Preview the products DataFrame read from the ADLS JSON file.
df_products.display()

In [0]:
from pyspark.sql import SparkSession

In [0]:
# Obtain the Spark session. `spark` is already used by the cells above, so a
# session presumably exists already (e.g. the one the Databricks runtime
# provides) and getOrCreate() hands back that existing session.
spark = (
    SparkSession.builder
    .appName("Azure project")
    .getOrCreate()
)

In [0]:
# 2. After loading CSV, convert it to JSON.
# Load the transactions from the /Volumes path instead of ADLS: converting the ADLS data to JSON failed with UnknownException: (shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException) Failure to initialize configuration for storage account stgaccaneta.dfs.core.windows.net: Invalid configuration value detected for fs.azure.account.key
# Source CSV and JSON output directory inside the project volume.
transactions_csv_path = "/Volumes/project_data/default/project_data/transactions.csv"
transactions_json_dir = "/Volumes/project_data/default/project_data/transactions_json"

# Read the CSV using its header row for column names and let Spark infer the types.
csv_reader = spark.read.options(header="true", inferSchema="true")
df_transactions_2 = csv_reader.csv(transactions_csv_path)

# Persist the same rows as JSON, replacing any previous output.
df_transactions_2.write.json(transactions_json_dir, mode="overwrite")
df_transactions_2.display()



In [0]:
# 4. Show first 10 rows of transactions.
# Print the first 10 rows of the transactions DataFrame as a text table.
df_transactions_2.show(n=10)

In [0]:
# Count the total number of transactions.
# Number of rows in the transactions DataFrame (presumably one row per transaction).
total_transactions = df_transactions_2.count()
print("Total number of transactions:", total_transactions)

Problem Statement 02 - Filter Operations

In [0]:
# Filter transactions where the total amount is greater than $100
from pyspark.sql.functions import col
# Keep only transactions whose total_amount is strictly greater than 100.
df_filtered_transactions = df_transactions_2.where(df_transactions_2["total_amount"] > 100)
df_filtered_transactions.show()

In [0]:
#  Select product names and prices for products that belong to the "Electronics" category
# Name and price of every product in the "Electronics" category.
df_electronics = (
    df_products
    .where(col("category") == "Electronics")
    .select("name", "price")
)
df_electronics.show()

## Intermediate
Problem Statement 01 - Joins and Aggregations


In [0]:
#  Join transactions and products on product_id
# Inner-join each transaction with its product details on product_id.
# Joining on the column *name* (instead of an equality expression between the
# two DataFrames) keeps a single product_id column in the result, so later
# references to product_id are not ambiguous.
df_joined = df_transactions_2.join(df_products, on="product_id", how="inner")
df_joined.display()


In [0]:
# Calculate the total sales amount for each product category.
# Spark's sum is imported under an alias so Python's builtin sum() is not
# shadowed for every later cell of the notebook.
from pyspark.sql.functions import sum as spark_sum

df_category_sales = df_joined.groupBy("category").agg(
    spark_sum("total_amount").alias("total_sales")
)
df_category_sales.display()


In [0]:
# Find the average transaction amount for each day
from pyspark.sql.functions import avg, to_date

# Derive a calendar `date` from transaction_date, then average total_amount per date.
df_transactions_date = df_transactions_2.withColumn("date", to_date("transaction_date"))
transactions_by_day = df_transactions_date.groupBy("date")
df_avg_per_day = transactions_by_day.agg(
    avg("total_amount").alias("avg_transaction_amount")
)
df_avg_per_day.display()

In [0]:

# 4. Rank product categories by average transaction amount, highest first.
df_avg_category = (
    df_joined
    .groupBy("category")
    .agg(avg("total_amount").alias("avg_transaction_amount"))
)
df_sorted_avg_category = df_avg_category.orderBy(
    df_avg_category["avg_transaction_amount"].desc()
)
df_sorted_avg_category.display()


In [0]:
# 4. Rank days by average transaction amount, highest first.
df_sorted_avg = df_avg_per_day.sort(df_avg_per_day["avg_transaction_amount"].desc())
df_sorted_avg.display()