In [0]:
spark.conf.set(
    "fs.azure.account.key.datalakealexvidal.blob.core.windows.net",
    "PUT_KEY_AZURE_BLOB_STORAGE"
)


In [0]:
# Define Azure Storage path
storage_account_name = "datalakealexvidal"
container_name = "raw-data"
file_path = "products.json"

# Full path to the file
azure_blob_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{file_path}"

# Read JSON file from Azure Blob Storage
df = spark.read.option("multiline", "true").json(azure_blob_path)

# Show DataFrame
df.show(truncate=False)


+------------------+----------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------+------------------+---+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+--------------------+-------+------+---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql.functions import col, current_timestamp

# Select relevant columns and rename them for better readability
df_clean = df.select(
    col("id").alias("product_id"),    # Renaming 'id' to 'product_id'
    col("title").alias("product_name"),  # Renaming 'title' to 'product_name'
    col("price"),  # Keeping 'price' column
    col("brand")   # Keeping 'brand' column
)

# Remove duplicate rows
df_clean = df_clean.dropDuplicates()

# Drop rows with missing values in critical columns
df_clean = df_clean.dropna(subset=["product_id", "product_name"])

# Add a new column with the processing timestamp
df_clean = df_clean.withColumn("processing_date", current_timestamp())

# Display the transformed DataFrame
df_clean.show(truncate=False)

# Print the schema to verify data types
df_clean.printSchema()

# Count the number of records after cleaning
print(f"Total records after cleaning: {df_clean.count()}")






+----------+-----------------------------------------+-------+----------------+-----------------------+
|product_id|product_name                             |price  |brand           |processing_date        |
+----------+-----------------------------------------+-------+----------------+-----------------------+
|8         |Dior J'adore                             |89.99  |Dior            |2025-02-28 16:10:48.491|
|19        |Chicken Meat                             |9.99   |null            |2025-02-28 16:10:48.491|
|20        |Cooking Oil                              |4.99   |null            |2025-02-28 16:10:48.491|
|2         |Eyeshadow Palette with Mirror            |19.99  |Glamour Beauty  |2025-02-28 16:10:48.491|
|4         |Red Lipstick                             |12.99  |Chic Cosmetics  |2025-02-28 16:10:48.491|
|18        |Cat Food                                 |8.99   |null            |2025-02-28 16:10:48.491|
|21        |Cucumber                                 |1.49   |nu