### Importing necessary libraries

In [0]:
from pyspark.sql.functions import when, col, expr, round, min, max, regexp_replace
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType, IntegerType, TimestampType, FloatType

##### Setting up ADLS account key config

In [0]:
spark.conf.set("fs.azure.account.key.dhyaneshcapadls.dfs.core.windows.net",
               "CSaenZLRbE43FtNHV8byUEXN7vAXyxxCnJLz/X1yYHieOigV2LbU64CF1NVWg00LI4xC5x7KS/wh+AStnRMOTg==")

In [0]:
# List files in the 'bronze' container of Azure Data Lake Storage
dbutils.fs.ls("abfss://bronze@dhyaneshcapadls.dfs.core.windows.net/")

[FileInfo(path='abfss://bronze@dhyaneshcapadls.dfs.core.windows.net/rankData_bronze.csv', name='rankData_bronze.csv', size=3182, modificationTime=1697045555000)]

In [0]:
# Step 1: Define the schema for your dataset
my_schema = StructType([
    StructField("retailer", StringType(), True),
    StructField("retailer_product_id", StringType(), True),
    StructField("retailer_group_id", StringType(), True),
    StructField("retailer_product_name", StringType(), True),
    StructField("net_quantity", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("price_per_unit", DoubleType(), True),
    StructField("sponsored", StringType(), False),  
    StructField("promotion_text", StringType(), True),
    StructField("parsed_price", IntegerType(), True),
    StructField("promotion_type", StringType(), True),
    StructField("search_term", StringType(), True),
    StructField("rank", IntegerType(), True),
    StructField("country", StringType(), True),
    StructField("gathered_timestamp", TimestampType(), True),
    StructField("insert_timestamp", TimestampType(), True),
])


In [0]:
# Step 2: Read the data
data = spark.read.format('delta').schema(my_schema).csv("abfss://bronze@dhyaneshcapadls.dfs.core.windows.net/rankData_bronze.csv", header=True)

# Check the first few records
data.show(5)

+---------+-------------------+-----------------+---------------------+------------+-----+--------------+---------+--------------+------------+--------------+-----------+----+-------+--------------------+--------------------+
| retailer|retailer_product_id|retailer_group_id|retailer_product_name|net_quantity|price|price_per_unit|sponsored|promotion_text|parsed_price|promotion_type|search_term|rank|country|  gathered_timestamp|    insert_timestamp|
+---------+-------------------+-----------------+---------------------+------------+-----+--------------+---------+--------------+------------+--------------+-----------+----+-------+--------------------+--------------------+
|Amazon-In|             ASIN01|       ASINGROUP1|  Chocolate box of 10|        5 kg|  100|          null|     null|          null|         100|          null|  chocolate|  19|  India|2021-05-12 00:07:...|2021-05-12 11:53:...|
|Amazon-In|             ASIN02|       ASINGROUP2|  Chocolate box of 11|        6 kg|  150|      

In [0]:
display(data)

retailer,retailer_product_id,retailer_group_id,retailer_product_name,net_quantity,price,price_per_unit,sponsored,promotion_text,parsed_price,promotion_type,search_term,rank,country,gathered_timestamp,insert_timestamp
Amazon-In,ASIN01,ASINGROUP1,Chocolate box of 10,5 kg,100,,,,100,,chocolate,19,India,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000
Amazon-In,ASIN02,ASINGROUP2,Chocolate box of 11,6 kg,150,,,,150,,chocolate,26,India,2021-05-12T00:20:29.276+0000,2021-05-12T11:53:33.191+0000
Amazon-In,ASIN03,ASINGROUP3,Chocolate box of 12,7 kg,170,,1.0,was $200,170,PRICE_CUT,chocolate,30,India,2021-05-12T00:20:47.941+0000,2021-05-12T11:53:33.189+0000
Amazon-In,ASIN04,ASINGROUP4,Chocolate box of 13,8 kg,150,,1.0,was $201,150,PRICE_CUT,chocolate,10,India,2021-05-12T00:07:11.911+0000,2021-05-12T11:53:32.321+0000
Amazon-In,ASIN05,ASINGROUP5,Chocolate box of 14,9 kg,110,,1.0,was $202,110,PRICE_CUT,chocolate,5,India,2021-05-12T00:20:34.657+0000,2021-05-12T11:53:33.188+0000
Amazon-In,ASIN06,ASINGROUP6,Chocolate box of 15,10 kg,150,,1.0,was $203,150,PRICE_CUT,chocolate,32,India,2021-05-12T00:07:02.337+0000,2021-05-12T11:53:32.321+0000
Amazon-In,ASIN07,ASINGROUP7,Chocolate box of 16,11 kg,200,,,,200,,chocolate,45,India,2021-05-12T00:06:12.979+0000,2021-05-12T11:53:32.699+0000
Amazon-In,ASIN08,ASINGROUP8,Chocolate box of 17,12 kg,250,,1.0,was $300,250,PRICE_CUT,chocolate,2,India,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:32.692+0000
Amazon-In,ASIN09,ASINGROUP9,Chocolate box of 18,13 kg,300,,,,300,,chocolate,20,India,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000
Amazon-us,ASIN101,ASINGROUP101,gums pack of 100,50 g,10,,,,10,,gum,19,USA,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000


In [0]:
# Step 3: Verify the schema
data.printSchema()

root
 |-- retailer: string (nullable = true)
 |-- retailer_product_id: string (nullable = true)
 |-- retailer_group_id: string (nullable = true)
 |-- retailer_product_name: string (nullable = true)
 |-- net_quantity: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- price_per_unit: string (nullable = true)
 |-- sponsored: string (nullable = true)
 |-- promotion_text: string (nullable = true)
 |-- parsed_price: integer (nullable = true)
 |-- promotion_type: string (nullable = true)
 |-- search_term: string (nullable = true)
 |-- rank: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- gathered_timestamp: timestamp (nullable = true)
 |-- insert_timestamp: timestamp (nullable = true)



In [0]:
# Step 4: Check the datatypes
data.dtypes

[('retailer', 'string'),
 ('retailer_product_id', 'string'),
 ('retailer_group_id', 'string'),
 ('retailer_product_name', 'string'),
 ('net_quantity', 'string'),
 ('price', 'int'),
 ('price_per_unit', 'string'),
 ('sponsored', 'string'),
 ('promotion_text', 'string'),
 ('parsed_price', 'int'),
 ('promotion_type', 'string'),
 ('search_term', 'string'),
 ('rank', 'int'),
 ('country', 'string'),
 ('gathered_timestamp', 'timestamp'),
 ('insert_timestamp', 'timestamp')]

In [0]:
# Step 5: Cache the dataframe
data.cache()

DataFrame[retailer: string, retailer_product_id: string, retailer_group_id: string, retailer_product_name: string, net_quantity: string, price: int, price_per_unit: string, sponsored: string, promotion_text: string, parsed_price: int, promotion_type: string, search_term: string, rank: int, country: string, gathered_timestamp: timestamp, insert_timestamp: timestamp]

In [0]:
# Step 6: Verify the first few records
data.show(3)

+---------+-------------------+-----------------+---------------------+------------+-----+--------------+---------+--------------+------------+--------------+-----------+----+-------+--------------------+--------------------+
| retailer|retailer_product_id|retailer_group_id|retailer_product_name|net_quantity|price|price_per_unit|sponsored|promotion_text|parsed_price|promotion_type|search_term|rank|country|  gathered_timestamp|    insert_timestamp|
+---------+-------------------+-----------------+---------------------+------------+-----+--------------+---------+--------------+------------+--------------+-----------+----+-------+--------------------+--------------------+
|Amazon-In|             ASIN01|       ASINGROUP1|  Chocolate box of 10|        5 kg|  100|          null|     null|          null|         100|          null|  chocolate|  19|  India|2021-05-12 00:07:...|2021-05-12 11:53:...|
|Amazon-In|             ASIN02|       ASINGROUP2|  Chocolate box of 11|        6 kg|  150|      

In [0]:
# Step 7: Clean the data by removing duplicates and null values
data = data.dropDuplicates()
data = data.dropna()

+---------+-------------------+-----------------+---------------------+------------+-----+--------------+---------+--------------+------------+--------------+-----------+----+-------+--------------------+--------------------+
| retailer|retailer_product_id|retailer_group_id|retailer_product_name|net_quantity|price|price_per_unit|sponsored|promotion_text|parsed_price|promotion_type|search_term|rank|country|  gathered_timestamp|    insert_timestamp|
+---------+-------------------+-----------------+---------------------+------------+-----+--------------+---------+--------------+------------+--------------+-----------+----+-------+--------------------+--------------------+
|Amazon-In|             ASIN06|       ASINGROUP6|  Chocolate box of 15|       10 kg|  150|          null|        1|      was $203|         150|     PRICE_CUT|  chocolate|  32|  India|2021-05-12 00:07:...|2021-05-12 11:53:...|
|Amazon-us|            ASIN103|     ASINGROUP103|     gums pack of 200|       130 g|   14|      

In [0]:
# Check for Missing Values in dataframe
missing_values = {}
for col_name in data.columns:
    missing_count = data.filter(col(col_name).isNull()).count()
    if missing_count > 0:
        missing_values[col_name] = missing_count

# Check for and report missing values
if not missing_values:
    print("No Missing values")
else:
    print("Missing Values:")
    for col_name, count in missing_values.items():
        print(f"Column '{col_name}': {count} missing values")

No Missing values


In [0]:
# Convert 'price_per_unit' column to double data type
data = data.withColumn("price_per_unit", col("price_per_unit").cast("double"))
# Fill null values in 'price_per_unit' with 0
data = data.fillna({'price_per_unit': 0})

In [0]:
display(data)

retailer,retailer_product_id,retailer_group_id,retailer_product_name,net_quantity,price,price_per_unit,sponsored,promotion_text,parsed_price,promotion_type,search_term,rank,country,gathered_timestamp,insert_timestamp
Amazon-In,ASIN06,ASINGROUP6,Chocolate box of 15,10 kg,150,0.0,1.0,was $203,150,PRICE_CUT,chocolate,32,India,2021-05-12T00:07:02.337+0000,2021-05-12T11:53:32.321+0000
Amazon-us,ASIN103,ASINGROUP103,gums pack of 200,130 g,14,0.0,1.0,was $20,14,PRICE_CUT,gum,30,USA,2021-05-12T00:20:47.941+0000,2021-05-12T11:53:33.189+0000
Amazon-us,ASIN105,ASINGROUP105,gums pack of 300,210 g,18,0.0,1.0,was $20,18,PRICE_CUT,gum,5,USA,2021-05-12T00:20:34.657+0000,2021-05-12T11:53:33.188+0000
Amazon-us,ASIN108,ASINGROUP108,gums pack of 450,310 g,24,0.0,1.0,was $30,24,PRICE_CUT,gum,2,USA,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:32.692+0000
Amazon-In,ASIN08,ASINGROUP8,Chocolate box of 17,12 kg,250,0.0,1.0,was $300,250,PRICE_CUT,chocolate,2,India,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:32.692+0000
Amazon-us,ASIN104,ASINGROUP104,gums pack of 250,170 g,16,0.0,1.0,was $20,16,PRICE_CUT,gum,10,USA,2021-05-12T00:07:11.911+0000,2021-05-12T11:53:32.321+0000
Amazon-us,ASIN101,ASINGROUP101,gums pack of 100,50 g,10,0.0,,,10,,gum,19,USA,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000
Amazon-In,ASIN01,ASINGROUP1,Chocolate box of 10,5 kg,100,0.0,,,100,,chocolate,19,India,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000
Amazon-In,ASIN09,ASINGROUP9,Chocolate box of 18,13 kg,300,0.0,,,300,,chocolate,20,India,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000
Amazon-In,ASIN03,ASINGROUP3,Chocolate box of 12,7 kg,170,0.0,1.0,was $200,170,PRICE_CUT,chocolate,30,India,2021-05-12T00:20:47.941+0000,2021-05-12T11:53:33.189+0000


In [0]:
# Convert "sponsored" column values to boolean
data = data.withColumn("sponsored", when(col("sponsored") == "1", True).otherwise(False))

In [0]:
display(data)

retailer,retailer_product_id,retailer_group_id,retailer_product_name,net_quantity,price,price_per_unit,sponsored,promotion_text,parsed_price,promotion_type,search_term,rank,country,gathered_timestamp,insert_timestamp
Amazon-In,ASIN06,ASINGROUP6,Chocolate box of 15,10 kg,150,0.0,True,was $203,150,PRICE_CUT,chocolate,32,India,2021-05-12T00:07:02.337+0000,2021-05-12T11:53:32.321+0000
Amazon-us,ASIN103,ASINGROUP103,gums pack of 200,130 g,14,0.0,True,was $20,14,PRICE_CUT,gum,30,USA,2021-05-12T00:20:47.941+0000,2021-05-12T11:53:33.189+0000
Amazon-us,ASIN105,ASINGROUP105,gums pack of 300,210 g,18,0.0,True,was $20,18,PRICE_CUT,gum,5,USA,2021-05-12T00:20:34.657+0000,2021-05-12T11:53:33.188+0000
Amazon-us,ASIN108,ASINGROUP108,gums pack of 450,310 g,24,0.0,True,was $30,24,PRICE_CUT,gum,2,USA,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:32.692+0000
Amazon-In,ASIN08,ASINGROUP8,Chocolate box of 17,12 kg,250,0.0,True,was $300,250,PRICE_CUT,chocolate,2,India,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:32.692+0000
Amazon-us,ASIN104,ASINGROUP104,gums pack of 250,170 g,16,0.0,True,was $20,16,PRICE_CUT,gum,10,USA,2021-05-12T00:07:11.911+0000,2021-05-12T11:53:32.321+0000
Amazon-us,ASIN101,ASINGROUP101,gums pack of 100,50 g,10,0.0,False,,10,,gum,19,USA,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000
Amazon-In,ASIN01,ASINGROUP1,Chocolate box of 10,5 kg,100,0.0,False,,100,,chocolate,19,India,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000
Amazon-In,ASIN09,ASINGROUP9,Chocolate box of 18,13 kg,300,0.0,False,,300,,chocolate,20,India,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000
Amazon-In,ASIN03,ASINGROUP3,Chocolate box of 12,7 kg,170,0.0,True,was $200,170,PRICE_CUT,chocolate,30,India,2021-05-12T00:20:47.941+0000,2021-05-12T11:53:33.189+0000


In [0]:
# Replace null values in "promotion_text","promotion_type" column with "None"
data = data.withColumn("promotion_text", when(col("promotion_text")=="null", "None").otherwise(col("promotion_text")))
data = data.withColumn("promotion_type", when(col("promotion_type")=="null", "None").otherwise(col("promotion_type")))

In [0]:
display(data)

retailer,retailer_product_id,retailer_group_id,retailer_product_name,net_quantity,price,price_per_unit,sponsored,promotion_text,parsed_price,promotion_type,search_term,rank,country,gathered_timestamp,insert_timestamp
Amazon-In,ASIN06,ASINGROUP6,Chocolate box of 15,10 kg,150,0.0,True,was $203,150,PRICE_CUT,chocolate,32,India,2021-05-12T00:07:02.337+0000,2021-05-12T11:53:32.321+0000
Amazon-us,ASIN103,ASINGROUP103,gums pack of 200,130 g,14,0.0,True,was $20,14,PRICE_CUT,gum,30,USA,2021-05-12T00:20:47.941+0000,2021-05-12T11:53:33.189+0000
Amazon-us,ASIN105,ASINGROUP105,gums pack of 300,210 g,18,0.0,True,was $20,18,PRICE_CUT,gum,5,USA,2021-05-12T00:20:34.657+0000,2021-05-12T11:53:33.188+0000
Amazon-us,ASIN108,ASINGROUP108,gums pack of 450,310 g,24,0.0,True,was $30,24,PRICE_CUT,gum,2,USA,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:32.692+0000
Amazon-In,ASIN08,ASINGROUP8,Chocolate box of 17,12 kg,250,0.0,True,was $300,250,PRICE_CUT,chocolate,2,India,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:32.692+0000
Amazon-us,ASIN104,ASINGROUP104,gums pack of 250,170 g,16,0.0,True,was $20,16,PRICE_CUT,gum,10,USA,2021-05-12T00:07:11.911+0000,2021-05-12T11:53:32.321+0000
Amazon-us,ASIN101,ASINGROUP101,gums pack of 100,50 g,10,0.0,False,,10,,gum,19,USA,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000
Amazon-In,ASIN01,ASINGROUP1,Chocolate box of 10,5 kg,100,0.0,False,,100,,chocolate,19,India,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000
Amazon-In,ASIN09,ASINGROUP9,Chocolate box of 18,13 kg,300,0.0,False,,300,,chocolate,20,India,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000
Amazon-In,ASIN03,ASINGROUP3,Chocolate box of 12,7 kg,170,0.0,True,was $200,170,PRICE_CUT,chocolate,30,India,2021-05-12T00:20:47.941+0000,2021-05-12T11:53:33.189+0000


In [0]:
# Remove dollar symbols and convert the 'price' column to double
data = data.withColumn("price", regexp_replace(col("price"), "\\$", "").cast("double"))

In [0]:
# Extract 'net_quantity_kg' from 'net_quantity' column
data = data.withColumn(
    "net_quantity_kg",
    when(col("net_quantity").contains("kg"), expr("substring(net_quantity, 1, length(net_quantity) - 3)").cast("double"))
    .when(col("net_quantity").contains("g"), expr("substring(net_quantity, 1, length(net_quantity) - 2)").cast("double") / 1000)
    .otherwise(0.0)
)

In [0]:
# Calculate "price_per_unit" as "price" divided by "net_quantity_kg"
data = data.withColumn("price_per_unit", col("price") / col("net_quantity_kg"))
data = data.withColumn("price_per_unit", round(col("price_per_unit"), 2))

In [0]:
display(data)

retailer,retailer_product_id,retailer_group_id,retailer_product_name,net_quantity,price,price_per_unit,sponsored,promotion_text,parsed_price,promotion_type,search_term,rank,country,gathered_timestamp,insert_timestamp,net_quantity_kg
Amazon-In,ASIN06,ASINGROUP6,Chocolate box of 15,10 kg,150.0,15.0,True,was $203,150,PRICE_CUT,chocolate,32,India,2021-05-12T00:07:02.337+0000,2021-05-12T11:53:32.321+0000,10.0
Amazon-us,ASIN103,ASINGROUP103,gums pack of 200,130 g,14.0,107.69,True,was $20,14,PRICE_CUT,gum,30,USA,2021-05-12T00:20:47.941+0000,2021-05-12T11:53:33.189+0000,0.13
Amazon-us,ASIN105,ASINGROUP105,gums pack of 300,210 g,18.0,85.71,True,was $20,18,PRICE_CUT,gum,5,USA,2021-05-12T00:20:34.657+0000,2021-05-12T11:53:33.188+0000,0.21
Amazon-us,ASIN108,ASINGROUP108,gums pack of 450,310 g,24.0,77.42,True,was $30,24,PRICE_CUT,gum,2,USA,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:32.692+0000,0.31
Amazon-In,ASIN08,ASINGROUP8,Chocolate box of 17,12 kg,250.0,20.83,True,was $300,250,PRICE_CUT,chocolate,2,India,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:32.692+0000,12.0
Amazon-us,ASIN104,ASINGROUP104,gums pack of 250,170 g,16.0,94.12,True,was $20,16,PRICE_CUT,gum,10,USA,2021-05-12T00:07:11.911+0000,2021-05-12T11:53:32.321+0000,0.17
Amazon-us,ASIN101,ASINGROUP101,gums pack of 100,50 g,10.0,200.0,False,,10,,gum,19,USA,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000,0.05
Amazon-In,ASIN01,ASINGROUP1,Chocolate box of 10,5 kg,100.0,20.0,False,,100,,chocolate,19,India,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000,5.0
Amazon-In,ASIN09,ASINGROUP9,Chocolate box of 18,13 kg,300.0,23.08,False,,300,,chocolate,20,India,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000,13.0
Amazon-In,ASIN03,ASINGROUP3,Chocolate box of 12,7 kg,170.0,24.29,True,was $200,170,PRICE_CUT,chocolate,30,India,2021-05-12T00:20:47.941+0000,2021-05-12T11:53:33.189+0000,7.0


#### Checking Business Rules

In [0]:
# 1. Check if any price are negative
negative_prices = data.filter(col("price") < 0).count() > 0

# 2. Check if any rank values are outside the range 1-100
invalid_rank = data.filter((col("rank") < 1 ) | (col("rank") > 100)).count() > 0

# 3. Check if the retailer's suffix aligns with country
misaligned_retailer_country_in = data.filter(col("retailer").contains("-In") & (col("country") != "India")).count() > 0
misaligned_retailer_country_us = data.filter(col("retailer").contains("-us") & (col("country") != "USA")).count() > 0

# Check and report business rule violations
if negative_prices:
    print("Error: Some records have negative prices")

if invalid_rank:
    print("Error: Some products have ranks outside the 1-100 range!")

if misaligned_retailer_country_in or misaligned_retailer_country_us:
    print("Error: Some retailers don't align with their respecive countries!")

if not (negative_prices or invalid_rank or misaligned_retailer_country_in or misaligned_retailer_country_us):
    print("All business rules are satisfied.")

All business rules are satisfied.


In [0]:
# Check for Data Type Consistency
data_type_inconsistencies = {}
for field in my_schema.fields:
    col_name = field.name
    expected_data_type = field.dataType
    actual_data_type = data.schema[col_name].dataType

    if actual_data_type != expected_data_type:
        data_type_inconsistencies[col_name] = (actual_data_type, expected_data_type)

if not data_type_inconsistencies:
    print("There is no Data Type inconsistencies in Given dataset")
else:
    print("\nData Type Inconsistencies:")
    for col_name, (actual_type, expected_type) in data_type_inconsistencies.items():
        print(f"Column '{col_name}': Expected data type {expected_type}, Actual data type {actual_type}")

There is no Data Type inconsistencies in Given dataset


In [0]:
# Check if any data quality issues were found
if not (missing_values or data_type_inconsistencies):
    print("\nData Validation: All checks passed.")


Data Validation: All checks passed.


In [0]:
# Step 11: Verify the total number of rows and columns
print("Number of rows: ", data.count())
print("Number of columns: ", len(data.columns))

Number of rows:  19
Number of columns:  17


In [0]:
# Step 12: Verify summary statistics
display(data.describe())

summary,retailer,retailer_product_id,retailer_group_id,retailer_product_name,net_quantity,price,price_per_unit,promotion_text,parsed_price,promotion_type,search_term,rank,country,net_quantity_kg
count,19,19,19,19,19,19.0,19.0,19,19.0,19,19,19.0,19,19.0
mean,,,,,,93.15789473684212,61.976842105263145,,93.15789473684212,,,20.42105263157895,,4.381052631578948
stddev,,,,,,91.30246629131764,50.03151240886218,,91.30246629131764,,,13.347141972362824,,4.858777732375868
min,Amazon-In,ASIN01,ASINGROUP1,Chocolate box of 10,10 kg,10.0,12.22,,10.0,,chocolate,2.0,India,0.05
max,Amazon-us,ASIN110,ASINGROUP9,gums pack of 550,90 g,300.0,200.0,was $300,300.0,PRICE_CUT,gum,45.0,USA,13.0


In [0]:
# Step 13: Find the maximum and minimum values in each column
display(data.agg(*[min(col_name).alias(f"min_{col_name}") for col_name in data.columns]))
display(data.agg(*[max(col_name).alias(f"max_{col_name}") for col_name in data.columns]))

min_retailer,min_retailer_product_id,min_retailer_group_id,min_retailer_product_name,min_net_quantity,min_price,min_price_per_unit,min_sponsored,min_promotion_text,min_parsed_price,min_promotion_type,min_search_term,min_rank,min_country,min_gathered_timestamp,min_insert_timestamp,min_net_quantity_kg
Amazon-In,ASIN01,ASINGROUP1,Chocolate box of 10,10 kg,10.0,12.22,False,,10,,chocolate,2,India,2021-05-12T00:06:12.979+0000,2021-05-12T11:53:32.321+0000,0.05


max_retailer,max_retailer_product_id,max_retailer_group_id,max_retailer_product_name,max_net_quantity,max_price,max_price_per_unit,max_sponsored,max_promotion_text,max_parsed_price,max_promotion_type,max_search_term,max_rank,max_country,max_gathered_timestamp,max_insert_timestamp,max_net_quantity_kg
Amazon-us,ASIN110,ASINGROUP9,gums pack of 550,90 g,300.0,200.0,True,was $300,300,PRICE_CUT,gum,45,USA,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:33.191+0000,13.0


In [0]:
# Step 14: Find duplicate values in columns if needed
duplicate_rows = data.groupBy(data.columns).count().where(col("count") > 1)
duplicate_rows.show()

+--------+-------------------+-----------------+---------------------+------------+-----+--------------+---------+--------------+------------+--------------+-----------+----+-------+------------------+----------------+---------------+-----+
|retailer|retailer_product_id|retailer_group_id|retailer_product_name|net_quantity|price|price_per_unit|sponsored|promotion_text|parsed_price|promotion_type|search_term|rank|country|gathered_timestamp|insert_timestamp|net_quantity_kg|count|
+--------+-------------------+-----------------+---------------------+------------+-----+--------------+---------+--------------+------------+--------------+-----------+----+-------+------------------+----------------+---------------+-----+
+--------+-------------------+-----------------+---------------------+------------+-----+--------------+---------+--------------+------------+--------------+-----------+----+-------+------------------+----------------+---------------+-----+



In [0]:
display(data)

retailer,retailer_product_id,retailer_group_id,retailer_product_name,net_quantity,price,price_per_unit,sponsored,promotion_text,parsed_price,promotion_type,search_term,rank,country,gathered_timestamp,insert_timestamp,net_quantity_kg
Amazon-In,ASIN06,ASINGROUP6,Chocolate box of 15,10 kg,150.0,15.0,True,was $203,150,PRICE_CUT,chocolate,32,India,2021-05-12T00:07:02.337+0000,2021-05-12T11:53:32.321+0000,10.0
Amazon-us,ASIN103,ASINGROUP103,gums pack of 200,130 g,14.0,107.69,True,was $20,14,PRICE_CUT,gum,30,USA,2021-05-12T00:20:47.941+0000,2021-05-12T11:53:33.189+0000,0.13
Amazon-us,ASIN105,ASINGROUP105,gums pack of 300,210 g,18.0,85.71,True,was $20,18,PRICE_CUT,gum,5,USA,2021-05-12T00:20:34.657+0000,2021-05-12T11:53:33.188+0000,0.21
Amazon-us,ASIN108,ASINGROUP108,gums pack of 450,310 g,24.0,77.42,True,was $30,24,PRICE_CUT,gum,2,USA,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:32.692+0000,0.31
Amazon-In,ASIN08,ASINGROUP8,Chocolate box of 17,12 kg,250.0,20.83,True,was $300,250,PRICE_CUT,chocolate,2,India,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:32.692+0000,12.0
Amazon-us,ASIN104,ASINGROUP104,gums pack of 250,170 g,16.0,94.12,True,was $20,16,PRICE_CUT,gum,10,USA,2021-05-12T00:07:11.911+0000,2021-05-12T11:53:32.321+0000,0.17
Amazon-us,ASIN101,ASINGROUP101,gums pack of 100,50 g,10.0,200.0,False,,10,,gum,19,USA,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000,0.05
Amazon-In,ASIN01,ASINGROUP1,Chocolate box of 10,5 kg,100.0,20.0,False,,100,,chocolate,19,India,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000,5.0
Amazon-In,ASIN09,ASINGROUP9,Chocolate box of 18,13 kg,300.0,23.08,False,,300,,chocolate,20,India,2021-05-12T00:07:04.072+0000,2021-05-12T11:53:32.697+0000,13.0
Amazon-In,ASIN03,ASINGROUP3,Chocolate box of 12,7 kg,170.0,24.29,True,was $200,170,PRICE_CUT,chocolate,30,India,2021-05-12T00:20:47.941+0000,2021-05-12T11:53:33.189+0000,7.0


In [0]:
# Step 15: Create a table/view on the spark df to run SQL queries
data.createOrReplaceTempView("myTable")

In [0]:
%sql
select retailer, retailer_product_name, parsed_price, price_per_unit
from myTable
where sponsored = true

retailer,retailer_product_name,parsed_price,price_per_unit
Amazon-In,Chocolate box of 15,150,15.0
Amazon-us,gums pack of 200,14,107.69
Amazon-us,gums pack of 300,18,85.71
Amazon-us,gums pack of 450,24,77.42
Amazon-In,Chocolate box of 17,250,20.83
Amazon-us,gums pack of 250,16,94.12
Amazon-In,Chocolate box of 12,170,24.29
Amazon-In,Chocolate box of 14,110,12.22
Amazon-In,Chocolate box of 13,150,18.75
Amazon-us,gums pack of 350,20,80.0


In [0]:
%sql
select *
from myTable
where promotion_type != 'None' and sponsored = True

retailer,retailer_product_id,retailer_group_id,retailer_product_name,net_quantity,price,price_per_unit,sponsored,promotion_text,parsed_price,promotion_type,search_term,rank,country,gathered_timestamp,insert_timestamp,net_quantity_kg
Amazon-In,ASIN06,ASINGROUP6,Chocolate box of 15,10 kg,150.0,15.0,True,was $203,150,PRICE_CUT,chocolate,32,India,2021-05-12T00:07:02.337+0000,2021-05-12T11:53:32.321+0000,10.0
Amazon-us,ASIN103,ASINGROUP103,gums pack of 200,130 g,14.0,107.69,True,was $20,14,PRICE_CUT,gum,30,USA,2021-05-12T00:20:47.941+0000,2021-05-12T11:53:33.189+0000,0.13
Amazon-us,ASIN105,ASINGROUP105,gums pack of 300,210 g,18.0,85.71,True,was $20,18,PRICE_CUT,gum,5,USA,2021-05-12T00:20:34.657+0000,2021-05-12T11:53:33.188+0000,0.21
Amazon-us,ASIN108,ASINGROUP108,gums pack of 450,310 g,24.0,77.42,True,was $30,24,PRICE_CUT,gum,2,USA,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:32.692+0000,0.31
Amazon-In,ASIN08,ASINGROUP8,Chocolate box of 17,12 kg,250.0,20.83,True,was $300,250,PRICE_CUT,chocolate,2,India,2021-05-12T00:20:53.302+0000,2021-05-12T11:53:32.692+0000,12.0
Amazon-us,ASIN104,ASINGROUP104,gums pack of 250,170 g,16.0,94.12,True,was $20,16,PRICE_CUT,gum,10,USA,2021-05-12T00:07:11.911+0000,2021-05-12T11:53:32.321+0000,0.17
Amazon-In,ASIN03,ASINGROUP3,Chocolate box of 12,7 kg,170.0,24.29,True,was $200,170,PRICE_CUT,chocolate,30,India,2021-05-12T00:20:47.941+0000,2021-05-12T11:53:33.189+0000,7.0
Amazon-In,ASIN05,ASINGROUP5,Chocolate box of 14,9 kg,110.0,12.22,True,was $202,110,PRICE_CUT,chocolate,5,India,2021-05-12T00:20:34.657+0000,2021-05-12T11:53:33.188+0000,9.0
Amazon-In,ASIN04,ASINGROUP4,Chocolate box of 13,8 kg,150.0,18.75,True,was $201,150,PRICE_CUT,chocolate,10,India,2021-05-12T00:07:11.911+0000,2021-05-12T11:53:32.321+0000,8.0
Amazon-us,ASIN106,ASINGROUP106,gums pack of 350,250 g,20.0,80.0,True,was $20,20,PRICE_CUT,gum,32,USA,2021-05-12T00:07:02.337+0000,2021-05-12T11:53:32.321+0000,0.25


In [0]:
dbutils.fs.ls("abfss://gold@dhyaneshcapadls.dfs.core.windows.net/")

[FileInfo(path='abfss://gold@dhyaneshcapadls.dfs.core.windows.net/processed_data_csv/', name='processed_data_csv/', size=0, modificationTime=1697044541000),
 FileInfo(path='abfss://gold@dhyaneshcapadls.dfs.core.windows.net/processed_data_pqt/', name='processed_data_pqt/', size=0, modificationTime=1697044540000)]

In [0]:
# Define the path to the Azure Data Lake Storage 'gold' container
adls_gold_container = "abfss://gold@dhyaneshcapadls.dfs.core.windows.net/"

# Specify the output directory within the 'gold' container
output_directory = adls_gold_container + "/processed_data_pqt"

# Write the DataFrame to Parquet format with partitioning
data.write.mode("overwrite").partitionBy("country").option("header", "true").parquet(output_directory)


In [0]:
# Save the DataFrame as a CSV file with a custom name in Azure Data Lake Storage
output_directory2 = adls_gold_container + "processed_data_csv"
data.write.csv(output_directory2, mode="overwrite", header=True)

In [0]:
# Specify the output directory within the 'gold' container
adls_gold_container = "abfss://gold@dhyaneshcapadls.dfs.core.windows.net/"
output_directory3 = adls_gold_container + "/processed_data_delta"
data.write.mode("overwrite").format("delta").partitionBy("country").option("header", "true").save(output_directory3)

In [0]:
dbutils.fs.ls("abfss://gold@dhyaneshcapadls.dfs.core.windows.net/")

[FileInfo(path='abfss://gold@dhyaneshcapadls.dfs.core.windows.net/processed_data_csv/', name='processed_data_csv/', size=0, modificationTime=1697044541000),
 FileInfo(path='abfss://gold@dhyaneshcapadls.dfs.core.windows.net/processed_data_delta/', name='processed_data_delta/', size=0, modificationTime=1697289861000),
 FileInfo(path='abfss://gold@dhyaneshcapadls.dfs.core.windows.net/processed_data_pqt/', name='processed_data_pqt/', size=0, modificationTime=1697044540000)]