In [0]:
# Load the three raw Olist CSV extracts from the landing-zone volume.
# Every file is read with a header row and with schema inference enabled.
orders = (
    spark.read.format('csv')
    .options(header=True, inferschema=True)
    .load('dbfs:/Volumes/dev/demodb/landing_zone/olist_orders_dataset.csv')
)

order_items = (
    spark.read.format('csv')
    .options(header=True, inferschema=True)
    .load('dbfs:/Volumes/dev/demodb/landing_zone/olist_order_items_dataset.csv')
)

products = (
    spark.read.format('csv')
    .options(header=True, inferschema=True)
    .load('dbfs:/Volumes/dev/demodb/landing_zone/olist_products_dataset.csv')
)

# Preview each dataset right after loading.
display(orders)
display(order_items)
display(products)

In [0]:
# Build one wide table: every order, its order_items rows (when present) and
# the attributes of each item's product. Both joins are left joins, so rows
# from the left side are kept even when there is no match on the right.
order_details = (
    orders
    .join(order_items, on="order_id", how="left")
    .join(products, on="product_id", how="left")
)

display(order_details)

In [0]:
# List every distinct value that appears in the order_status column.
(
    order_details
    .select('order_status')
    .distinct()
    .show()
)



In [0]:
# Keep only the rows whose order_status is exactly 'delivered'.
delivered_df = order_details.where("order_status = 'delivered'")

display(delivered_df)

In [0]:
from pyspark.sql.functions import col
# Rows whose freight_value is missing (null). order_details is built with left
# joins, so rows without a match on the right side carry nulls in the
# right-hand columns (freight_value presumably comes from order_items -- verify
# against the dataset schema).
# The predicate must be isNull(): isNotNull() selects the rows that DO have a
# freight value, which is the opposite of what `missing_freight` names.
missing_freight = order_details.filter(col("freight_value").isNull())

display(missing_freight)

In [0]:
# Replace null freight_value entries with 0.0; all other columns are untouched.
clean_df = order_details.na.fill({"freight_value": 0.0})


In [0]:
# Approximate 95th percentile of freight_value (relative error 0.01).
threshold = clean_df.approxQuantile(
    "freight_value",
    probabilities=[0.95],
    relativeError=0.01,
)[0]

# Keep only rows at or below that threshold.
# Note: this rebinds order_details to the null-filled, filtered frame, so every
# later cell that reads order_details sees this version.
order_details = clean_df.where(col("freight_value") <= threshold)

In [0]:
# Show the distinct product category names present in order_details.
(
    order_details
    .select('product_category_name')
    .distinct()
    .show()
)

In [0]:
from pyspark.sql.functions import array

# Add category_tags: an array column built from the single
# product_category_name column, so each array holds one element.
array_df = products.withColumn(
    "category_tags",
    array(products["product_category_name"]),
)

display(array_df)

In [0]:
from pyspark.sql.functions import explode

# Flatten category_tags again: explode() emits one output row per array
# element, exposed here under the column name "category".
(
    array_df
    .select("product_id", explode(col("category_tags")).alias("category"))
    .show()
)

**We want to find the top product categories**

In [0]:
from pyspark.sql.functions import count

# Inspect the column names of order_details (shown as the cell output).
order_details.columns

In [0]:
# Number of order_details rows per product category, largest count first.
top_categories = (
    order_details
    .groupBy("product_category_name")
    .agg(count("*").alias('num_sales'))
    .orderBy('num_sales', ascending=False)
)

display(top_categories)

**We want to find the product volume (number of sales) by month**

In [0]:
from pyspark.sql.functions import month, to_date

# Row counts per product category, with one column per purchase month.
# order_month is the month number only (no year), so the same calendar month
# from different years is counted in the same column.
order_by_month = (
    order_details
    .withColumn('order_month', month(to_date('order_purchase_timestamp')))
    .groupBy('product_category_name')
    .pivot('order_month')
    .agg(count('*'))
)

display(order_by_month)




In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Drop rows without a category so that null is not ranked as a category of its
# own. Note: this rebinds order_details, so later cells see the filtered frame.
order_details = order_details.filter(col("product_category_name").isNotNull())

# Rank products inside each category by number of sales, best seller first.
# product_id is a secondary sort key: row_number() orders tied rows
# arbitrarily, so without a tie-breaker products with equal num_sales could
# swap ranks from one run to the next.
windowSpec = Window.partitionBy("product_category_name").orderBy(
    col("num_sales").desc(),
    col("product_id"),
)

# One row per (product, category) with its sales count and in-category rank.
ranked_products = (
    order_details
    .groupBy("product_id", "product_category_name")
    .agg(count("*").alias("num_sales"))
    .withColumn("rank", row_number().over(windowSpec))
)

display(ranked_products)

USING CACHING FOR OPTIMISED PERFORMANCE

Faster iterative development

Boosts performance for repeated reads

In-memory columnar storage

In [0]:
# Mark order_items for caching, then run count() as an action so the cache is
# actually populated (cache() on its own is lazy). The count is the cell output.
order_items.cache().count()

**Implementing Time-Series Analysis in Databricks**

**Extract Order Date and Count Orders Over Time**

In [0]:
# Preview the current contents of order_details.
display(order_details)

In [0]:
from pyspark.sql.functions import to_date

from pyspark.sql.functions import countDistinct

# Daily order volume, sorted chronologically.
# order_details comes from orders left-joined to order_items, so an order with
# several items occupies several rows. Counting rows would therefore count such
# an order more than once; counting distinct order_id values gives the number
# of orders per day. The result column keeps the name `count`, matching what
# groupBy(...).count() produced.
ts_df = (
    order_details
    .withColumn('order_date', to_date('order_purchase_timestamp'))
    .groupBy('order_date')
    .agg(countDistinct('order_id').alias('count'))
    .orderBy('order_date')
)
display(ts_df)