#### Reading Data From Cosmos

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
df = spark.read.format("cosmos.oltp") \
    .option("spark.cosmos.accountEndpoint","XXXXXXXXXXXXXXX") \
    .option("spark.cosmos.accountKey", "XXXXXXXXXXXXXXXXXXX") \
    .option("spark.cosmos.database", "XXXXXXXXXXXXXXXXXXXXX") \
    .option("spark.cosmos.container", "XXXXXXXXXXXXXXXXXXXX") \
    .load()
 
df.show()

 

+------------------+-------------------+--------+------------+----------+-----------------+----+-------------+--------------+-------------------+------+----------------+--------------------+-----+--------------+--------+--------------------+----------+----------+------+---+------+---------+
|ship-service-level|           Order ID|Category|ship-country|Fulfilment|              SKU|Size|Sales Channel|Courier Status|          ship-city|Amount|ship-postal-code|                  id|  B2B|    ship-state|currency|       promotion-ids|      Date|      ASIN| Style|Qty| index|   Status|
+------------------+-------------------+--------+------------+----------+-----------------+----+-------------+--------------+-------------------+------+----------------+--------------------+-----+--------------+--------+--------------------+----------+----------+------+---+------+---------+
|         Expedited|171-7642751-1586737|     Set|          IN|    Amazon|   SET062-KR-SP-S|   S|    Amazon.in|       Shipped

In [0]:
df=df.repartition(8)

In [0]:
df=df.cache()

#### Initial Shape Of The Data

In [0]:
num_rows = df.count()

num_columns = len(df.columns)

print(f"Shape of DataFrame: ({num_rows}, {num_columns})")

Shape of DataFrame: (128975, 23)


#### Correcting The Schema

In [0]:
df.printSchema()

root
 |-- ship-service-level: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- ship-country: string (nullable = true)
 |-- Fulfilment: string (nullable = true)
 |-- SKU: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Courier Status: string (nullable = true)
 |-- ship-city: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- ship-postal-code: string (nullable = true)
 |-- id: string (nullable = false)
 |-- B2B: string (nullable = true)
 |-- ship-state: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- promotion-ids: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- ASIN: string (nullable = true)
 |-- Style: string (nullable = true)
 |-- Qty: string (nullable = true)
 |-- index: string (nullable = true)
 |-- Status: string (nullable = true)



In [0]:
schema = StructType([
    StructField("ship-service-level", StringType(), True),
    StructField("Order ID", StringType(), True),
    StructField("Category", StringType(), True),
    StructField("ship-country", StringType(), True),
    StructField("Fulfilment", StringType(), True),
    StructField("SKU", StringType(), True),
    StructField("Size", StringType(), True),
    StructField("Sales Channel", StringType(), True),
    StructField("Courier Status", StringType(), True),
    StructField("ship-city", StringType(), True),
    StructField("Amount", DoubleType(), True),
    StructField("ship-postal-code", StringType(), True),
    StructField("id", StringType(), False),
    StructField("B2B", StringType(), True),
    StructField("ship-state", StringType(), True),
    StructField("currency", StringType(), True),
    StructField("promotion-ids", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("ASIN", StringType(), True),
    StructField("Style", StringType(), True),
    StructField("Qty", IntegerType(), True),
    StructField("index", StringType(), True),
    StructField("Status", StringType(), True)
])


In [0]:
from pyspark.sql.functions import *
df_schema_corrected= df.select(
    [col(field.name).cast(field.dataType) for field in schema.fields]
)

In [0]:
df_schema_corrected.select("Date").distinct().show()

+----------+
|      Date|
+----------+
|  06-27-22|
|06-04-2022|
|04-10-2022|
|  04-17-22|
|  06-13-22|
|  06-16-22|
|04-07-2022|
|  03-31-22|
|  05-16-22|
|04-11-2022|
|06-08-2022|
|  06-29-22|
|  04-25-22|
|  04-18-22|
|  04-13-22|
|  05-29-22|
|  05-23-22|
|04-12-2022|
|  05-15-22|
|05-07-2022|
+----------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import col, when, regexp_replace

df_schema_corrected = df_schema_corrected.withColumn(
    "Date",
    when(
        col("Date").endswith("-22"),
        regexp_replace(col("Date"), r"-(\d{2})$", "-20$1")
    ).otherwise(col("Date")) 
)

df_schema_corrected=df_schema_corrected.withColumn("Date",to_date(col("Date"),"MM-dd-yyyy"))
df_schema_corrected.show()
df_schema_corrected=df_schema_corrected.cache()


+------------------+-------------------+--------+------------+----------+-----------------+----+-------------+--------------+------------------+-------+----------------+--------------------+-----+-------------+--------+--------------------+----------+----------+------+---+------+--------------------+
|ship-service-level|           Order ID|Category|ship-country|Fulfilment|              SKU|Size|Sales Channel|Courier Status|         ship-city| Amount|ship-postal-code|                  id|  B2B|   ship-state|currency|       promotion-ids|      Date|      ASIN| Style|Qty| index|              Status|
+------------------+-------------------+--------+------------+----------+-----------------+----+-------------+--------------+------------------+-------+----------------+--------------------+-----+-------------+--------+--------------------+----------+----------+------+---+------+--------------------+
|         Expedited|403-1727324-3527525|     Set|          IN|    Amazon|SET347-KR-NP-XXXL| 3X

In [0]:
df_schema_corrected.select("Date").distinct()

Out[11]: DataFrame[Date: date]

In [0]:
duplicates = (
    df_schema_corrected.groupBy("Order ID")
    .count()
    .filter(col("count") > 1)
)

duplicates.show()

+-------------------+-----+
|           Order ID|count|
+-------------------+-----+
|403-9586636-7831531|    2|
|406-5927721-0109124|    3|
|171-4462103-4906759|    2|
|404-2890351-6625116|    2|
|403-4187845-4493168|    2|
|405-0812081-2267512|    2|
|405-3051208-4542752|    2|
|406-6127261-4444325|    2|
|408-6276355-3968334|    4|
|404-9932919-6662730|   11|
|403-7016534-4621954|    2|
|408-3292144-2149913|    3|
|408-9688958-6934717|    3|
|403-1738971-1389151|    2|
|404-9752786-9818712|    2|
|403-4591468-8381960|    2|
|403-1650373-8655510|    3|
|405-0367818-5396307|    2|
|402-8538611-0499536|    2|
|402-1073069-3489901|    2|
+-------------------+-----+
only showing top 20 rows



In [0]:
df_duplicates_removed = df_schema_corrected.dropDuplicates([
'Order ID'])

In [0]:
duplicates1 = (
    df_duplicates_removed.groupBy("Order ID")
    .count()
    .filter(col("count") > 1)
)

duplicates1.show()

+--------+-----+
|Order ID|count|
+--------+-----+
+--------+-----+



#### DROPPING IRRELEVANT COLUMNS

In [0]:
df_cleaned= df_duplicates_removed.drop("id", "index")
df_cleaned.columns

Out[15]: ['ship-service-level',
 'Order ID',
 'Category',
 'ship-country',
 'Fulfilment',
 'SKU',
 'Size',
 'Sales Channel',
 'Courier Status',
 'ship-city',
 'Amount',
 'ship-postal-code',
 'B2B',
 'ship-state',
 'currency',
 'promotion-ids',
 'Date',
 'ASIN',
 'Style',
 'Qty',
 'Status']

In [0]:
len(df_cleaned.columns)

Out[16]: 21

#### Checking For Null Values And Processing The Same

In [0]:
null_counts = df_cleaned.select([count(when(col(c).isNull(), c)).alias(c) for c in df_cleaned.columns])

null_counts.show()

+------------------+--------+--------+------------+----------+---+----+-------------+--------------+---------+------+----------------+---+----------+--------+-------------+----+----+-----+---+------+
|ship-service-level|Order ID|Category|ship-country|Fulfilment|SKU|Size|Sales Channel|Courier Status|ship-city|Amount|ship-postal-code|B2B|ship-state|currency|promotion-ids|Date|ASIN|Style|Qty|Status|
+------------------+--------+--------+------------+----------+---+----+-------------+--------------+---------+------+----------------+---+----------+--------+-------------+----+----+-----+---+------+
|                 0|       0|       0|          28|         0|  0|   0|            0|          6387|       28|  7395|              28|  0|        28|    7395|        46927|   0|   0|    0|  0|     0|
+------------------+--------+--------+------------+----------+---+----+-------------+--------------+---------+------+----------------+---+----------+--------+-------------+----+----+-----+---+------+


In [0]:
null_counts_list = null_counts.collect()[0].asDict()

filtered_null_counts = {k: v for k, v in null_counts_list.items() if v > 0}

for column, count in filtered_null_counts.items():
    print(f"Column: {column}, Null Count: {count}")

Column: ship-country, Null Count: 28
Column: Courier Status, Null Count: 6387
Column: ship-city, Null Count: 28
Column: Amount, Null Count: 7395
Column: ship-postal-code, Null Count: 28
Column: ship-state, Null Count: 28
Column: currency, Null Count: 7395
Column: promotion-ids, Null Count: 46927


In [0]:
df_cleaned.cache()

Out[19]: DataFrame[ship-service-level: string, Order ID: string, Category: string, ship-country: string, Fulfilment: string, SKU: string, Size: string, Sales Channel: string, Courier Status: string, ship-city: string, Amount: double, ship-postal-code: string, B2B: string, ship-state: string, currency: string, promotion-ids: string, Date: date, ASIN: string, Style: string, Qty: int, Status: string]

#### Filling Null Vales In Currency By INR

In [0]:
df_cleaned.select("currency").distinct()

Out[20]: DataFrame[currency: string]

In [0]:
from pyspark.sql import functions as F

mode_value = df_cleaned.groupBy("currency").count().orderBy(F.desc("count")).first()[0]

df_cleaned = df_cleaned.na.fill({"currency": mode_value})

#### Converting Promotions To Yes And No

In [0]:
df_cleaned = df_cleaned.withColumnRenamed('promotion-ids', 'promotion_applied') \
                       .withColumn('promotion_applied',
                                   when(col('promotion_applied').isNull(), 'No')
                                   .otherwise('Yes'))

In [0]:
df_cleaned.show()

+------------------+-------------------+-------------+------------+----------+--------------------+----+-------------+--------------+--------------+------+----------------+-----+--------------+--------+-----------------+----------+----------+-------+---+--------------------+
|ship-service-level|           Order ID|     Category|ship-country|Fulfilment|                 SKU|Size|Sales Channel|Courier Status|     ship-city|Amount|ship-postal-code|  B2B|    ship-state|currency|promotion_applied|      Date|      ASIN|  Style|Qty|              Status|
+------------------+-------------------+-------------+------------+----------+--------------------+----+-------------+--------------+--------------+------+----------------+-----+--------------+--------+-----------------+----------+----------+-------+---+--------------------+
|          Standard|171-0115096-8779555|        kurta|          IN|  Merchant|        JNE3764-KR-M|   M|    Amazon.in|          null|       LUCKNOW|463.81|          227202|

#### Processing Courier Status & Quantity

In [0]:
print(df.filter(col("courier status")=="Cancelled").count())
print(df.filter((col("courier status")=="Cancelled") & (col("Qty")==0)).count())

5935
5935


In [0]:


df_cleaned=df_cleaned.withColumn("Courier Status",when((col("Qty")==0) & (col("Courier status").isNull()),"Cancelled").otherwise(col("Courier Status")))
df_cleaned.filter((col("Courier status").isNull())).count()


Out[25]: 0

In [0]:
df_cleaned.filter((col("Amount").isNull())).count()

Out[26]: 7395

In [0]:
df_cleaned=df_cleaned.withColumn("Amount",when((col("Qty")==0),0).otherwise(col("Amount")))

#### Processing Amount Null Values

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window


quantity_cases = [1,2, 3, 4,9,13,15]

df_final=df_cleaned


for qty in quantity_cases:
    mean_amounts = df_final.filter(F.col("Qty") == qty) \
    .groupBy("Category") \
    .agg(F.expr("percentile_approx(Amount, 0.5)").alias(f"median_amount_qty_{qty}"))

    
    df_with_avg = df_final.join(mean_amounts, on="Category", how="left")

    df_final = df_with_avg.withColumn(
        'Amount',
        F.when(
            ((F.col('Amount').isNull()) | (F.col('Amount') == 0)) & (F.col('Qty') == qty), 
            F.col(f'median_amount_qty_{qty}')
        ).otherwise(F.col('Amount'))
    )
    
    df_final = df_final.drop(f'median_amount_qty_{qty}')

df_final.show()

df_final.select("Qty", "Amount").filter(F.col("Amount").isNull()).show()


In [0]:
df_final.select("courier status","Qty", "Amount").filter((F.col("courier status")=="Cancelled") & F.col("Amount").isNull()).count()

Out[29]: 0

In [0]:
df_final=df_final.cache()

In [0]:
df_final=df_final.cache()

In [0]:
df_final.show()

+------------+------------------+-------------------+------------+----------+--------------+----+-------------+--------------+--------------------+------+----------------+-----+-----------+--------+-----------------+----------+----------+-----+---+--------------------+
|    Category|ship-service-level|           Order ID|ship-country|Fulfilment|           SKU|Size|Sales Channel|Courier Status|           ship-city|Amount|ship-postal-code|  B2B| ship-state|currency|promotion_applied|      Date|      ASIN|Style|Qty|              Status|
+------------+------------------+-------------------+------------+----------+--------------+----+-------------+--------------+--------------------+------+----------------+-----+-----------+--------+-----------------+----------+----------+-----+---+--------------------+
|Ethnic Dress|         Expedited|407-7496699-8523552|          IN|    Amazon|    J0164-DR-L|   L|    Amazon.in|       Shipped|           NEW DELHI| 373.0|          110075|FALSE|      DELHI| 

In [0]:
df_final.count()

Out[33]: 120378

#### Drop Rows Where either ship-city, ship-state, ship-country or ship-postal-code was null

In [0]:
df_final=df_final.dropna()

In [0]:
from pyspark.sql.functions import col, sum

null_counts = df_final.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_final.columns])
null_counts.show()


+--------+------------------+--------+------------+----------+---+----+-------------+--------------+---------+------+----------------+---+----------+--------+-----------------+----+----+-----+---+------+
|Category|ship-service-level|Order ID|ship-country|Fulfilment|SKU|Size|Sales Channel|Courier Status|ship-city|Amount|ship-postal-code|B2B|ship-state|currency|promotion_applied|Date|ASIN|Style|Qty|Status|
+--------+------------------+--------+------------+----------+---+----+-------------+--------------+---------+------+----------------+---+----------+--------+-----------------+----+----+-----+---+------+
|       0|                 0|       0|           0|         0|  0|   0|            0|             0|        0|     0|               0|  0|         0|       0|                0|   0|   0|    0|  0|     0|
+--------+------------------+--------+------------+----------+---+----+-------------+--------------+---------+------+----------------+---+----------+--------+-----------------+----+---

#### Adding Columns

In [0]:
df_final=df_final.withColumn("DayOfWeek",dayofweek(col("Date")))
df_final=df_final.withColumn("DayOfmonth",dayofmonth(col("Date")))
df_final=df_final.withColumn("Month",month(col("Date")))
df_final.show()


+------------+------------------+-------------------+------------+----------+--------------+----+-------------+--------------+--------------------+------+----------------+-----+-----------+--------+-----------------+----------+----------+-----+---+--------------------+---------+----------+-----+
|    Category|ship-service-level|           Order ID|ship-country|Fulfilment|           SKU|Size|Sales Channel|Courier Status|           ship-city|Amount|ship-postal-code|  B2B| ship-state|currency|promotion_applied|      Date|      ASIN|Style|Qty|              Status|DayOfWeek|DayOfmonth|Month|
+------------+------------------+-------------------+------------+----------+--------------+----+-------------+--------------+--------------------+------+----------------+-----+-----------+--------+-----------------+----------+----------+-----+---+--------------------+---------+----------+-----+
|Ethnic Dress|         Expedited|407-7496699-8523552|          IN|    Amazon|    J0164-DR-L|   L|    Amazon.i

#### Final Shape Of The Data

In [0]:
num_rows = df_final.count()

num_columns = len(df_final.columns)

print(f"Shape of DataFrame: ({num_rows}, {num_columns})")

Shape of DataFrame: (120347, 24)


In [0]:
storage_account_name = "XXXXXXXXXXXXXXXXXX"
storage_account_key = "XXXXXXXXXXXXXXXXXXX"
container_name = "XXXXXXXXXXXXXXXXXX"
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)
output_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/"
df_final.write.mode("overwrite").option("header", "true").parquet(output_path)

