In [127]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum, mean, count, lower, lit, year, month, concat_ws, date_format, cast, rank
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import json

In [35]:
#Some importand properties for config and Master
#Property	                            Description
#"spark.ui.port"	                    Changes Spark UI Web Port (default: 4040)
#"spark.executor.memory"	            Memory per executor (e.g., "4g")
#"spark.executor.cores"	                Number of cores per executor (e.g., "2")
#"spark.driver.memory"	                Memory allocated to the driver (e.g., "2g")
#"spark.sql.shuffle.partitions"	        Number of partitions for shuffle operations (default: 200)
#"spark.default.parallelism"	        Default number of partitions for RDD operations
#"spark.serializer"	                    Serializer to use (e.g., "org.apache.spark.serializer.KryoSerializer")
#"spark.sql.execution.arrow.enabled"	Enable Pandas UDFs for performance (true/false)
#"spark.sql.warehouse.dir"	            Path to warehouse directory for Hive tables
#"spark.yarn.queue"	                    Sets YARN queue for Spark jobs


#Master	                                Description
#"local"	                            Run Spark on one thread (for debugging)
#"local[*]"	                            Run Spark using all available cores on the machine
#"local[4]"	                            Run Spark with 4 threads
#"yarn"	                                Run Spark on YARN cluster (Hadoop)
#"mesos://host:port"	                Run on Apache Mesos
#"spark://host:port"	                Run on Spark Standalone Cluster
#"k8s://host:port"	                    Run on Kubernetes

In [None]:
spark = SparkSession.builder.appName("Orders_Schema").getOrCreate()

25/02/10 00:53:03 WARN Utils: Your hostname, gautam-Aspire-E5-576 resolves to a loopback address: 127.0.1.1; using 192.168.1.37 instead (on interface wlp2s0)
25/02/10 00:53:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/10 00:53:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
orders = spark.read.csv('/home/gautam/airflow/retail_db/orders/part-00000',inferSchema='True',)

[0;31mType:[0m        property
[0;31mString form:[0m <property object at 0x70638cad6f20>
[0;31mDocstring:[0m  
Returns a :class:`DataFrameReader` that can be used to read data
in as a :class:`DataFrame`.

.. versionadded:: 2.0.0

.. versionchanged:: 3.4.0
    Supports Spark Connect.

Returns
-------
:class:`DataFrameReader`

Examples
--------
>>> spark.read
<...DataFrameReader object ...>

Write a DataFrame into a JSON file and read it back.

>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
...     # Write a DataFrame into a JSON file
...     spark.createDataFrame(
...         [{"age": 100, "name": "Hyukjin Kwon"}]
...     ).write.mode("overwrite").format("json").save(d)
...
...     # Read the JSON file as a DataFrame.
...     spark.read.format('json').load(d).show()
+---+------------+
|age|        name|
+---+------------+
|100|Hyukjin Kwon|
+---+------------+

In [None]:
# It works fast and reliable to define the schema 
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("order_date", StringType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("order_status", StringType(), True)
])

orders = spark.read.csv('/home/gautam/airflow/retail_db/orders/part-00000', schema=schema, header=False)
orders.show(5)


In [None]:
#the type is changed to tuple as we can add tuple using * before the column name
def get_column_names(ds_name, file_path):
    with open(file_path, "r") as json_file:
        data = json.load(json_file)
    ku = data[ds_name]
    sorted_ku = sorted(ku, key=lambda col:col['column_position'])
    xyz2 = tuple(k['column_name'] for k in sorted_ku) 
    return xyz2

In [23]:
orders_column = get_column_names('orders', '/home/gautam/airflow/retail_db/schemas.json')
order_items_column = get_column_names('order_items', '/home/gautam/airflow/retail_db/schemas.json')
products_column = get_column_names('products', '/home/gautam/airflow/retail_db/schemas.json')
categories_column = get_column_names('categories', '/home/gautam/airflow/retail_db/schemas.json')
customers_column = get_column_names('customers', '/home/gautam/airflow/retail_db/schemas.json')

In [26]:
orders = spark.read.csv('/home/gautam/airflow/retail_db/orders/part-00000',inferSchema='True',).toDF(*orders_column)
order_items = spark.read.csv('/home/gautam/airflow/retail_db/order_items/part-00000',inferSchema='True',).toDF(*order_items_column)
products = spark.read.csv('/home/gautam/airflow/retail_db/products/part-00000',inferSchema='True',).toDF(*products_column)
categories = spark.read.csv('/home/gautam/airflow/retail_db/categories/part-00000',inferSchema='True',).toDF(*categories_column)
customers = spark.read.csv('/home/gautam/airflow/retail_db/customers/part-00000',inferSchema='True',).toDF(*customers_column)

                                                                                

In [27]:
orders.show(10)

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
+--------+-------------------+-----------------+---------------+
only showing top 10 rows



In [28]:
## When their are multiple files in a folder you just have to mention the folder name instead of folder name 
# And we can use union() as well like we have orders1 and orders2 we will simply use orders1.union(orders2)

In [29]:
orders.count()

68883

In [30]:
order_items.count()

172198

In [31]:
products.count()

1345

In [32]:
categories.count()

58

In [33]:
#Check for missing values in each DataFrame.

In [None]:
# Count missing values for each column
missing_counts = orders.select([sum(col(c).isNull().cast("int")).alias(c) for c in orders.columns])
missing_counts.show()

[Stage 33:>                                                         (0 + 1) / 1]

+--------+----------+-----------------+------------+
|order_id|order_date|order_customer_id|order_status|
+--------+----------+-----------------+------------+
|       0|         0|                0|           0|
+--------+----------+-----------------+------------+



                                                                                

In [None]:
#drop a single column orders = orders.drop("order_status")
#mean_value = orders.select(mean("order_item_quantity")).collect()[0][0]  # Get the mean value
#orders = orders.fillna({"order_item_quantity": mean_value})
#expr stands for median

#median_value = orders.select(expr("percentile_approx(order_item_quantity, 0.5)")).collect()[0][0]
#orders = orders.fillna({"order_item_quantity": median_value})

#mode_value = orders.groupBy("order_item_quantity").count().orderBy("count", ascending=False).first()[0]
#orders = orders.fillna({"order_item_quantity": mode_value})


In [42]:
orders.columns

['order_id', 'order_date', 'order_customer_id', 'order_status']

In [43]:
#Method	                                    Action
#.drop("col_name")	                        Drops a column
#.fillna(value, subset=["col1", "col2"])	Replaces NULL with value
#.fillna({"col1": value1, "col2": value2})	Fill multiple columns
#mean()	                                    Compute mean for numerical columns
#expr("percentile_approx(col, 0.5)")	    Compute median
#.groupBy("col").count().orderBy("count", ascending=False).first()[0]	Compute mode

In [44]:
orders_filtered = orders.filter(orders.order_status.isin(["COMPLETE", "PENDING"]))
orders_filtered.show(10)

+--------+-------------------+-----------------+------------+
|order_id|         order_date|order_customer_id|order_status|
+--------+-------------------+-----------------+------------+
|       3|2013-07-25 00:00:00|            12111|    COMPLETE|
|       5|2013-07-25 00:00:00|            11318|    COMPLETE|
|       6|2013-07-25 00:00:00|             7130|    COMPLETE|
|       7|2013-07-25 00:00:00|             4530|    COMPLETE|
|      15|2013-07-25 00:00:00|             2568|    COMPLETE|
|      17|2013-07-25 00:00:00|             2667|    COMPLETE|
|      21|2013-07-25 00:00:00|             2711|     PENDING|
|      22|2013-07-25 00:00:00|              333|    COMPLETE|
|      26|2013-07-25 00:00:00|             7562|    COMPLETE|
|      28|2013-07-25 00:00:00|              656|    COMPLETE|
+--------+-------------------+-----------------+------------+
only showing top 10 rows



In [48]:
(orders.filter(lower(orders.order_status) == 'complete')).show(10)

+--------+-------------------+-----------------+------------+
|order_id|         order_date|order_customer_id|order_status|
+--------+-------------------+-----------------+------------+
|       3|2013-07-25 00:00:00|            12111|    COMPLETE|
|       5|2013-07-25 00:00:00|            11318|    COMPLETE|
|       6|2013-07-25 00:00:00|             7130|    COMPLETE|
|       7|2013-07-25 00:00:00|             4530|    COMPLETE|
|      15|2013-07-25 00:00:00|             2568|    COMPLETE|
|      17|2013-07-25 00:00:00|             2667|    COMPLETE|
|      22|2013-07-25 00:00:00|              333|    COMPLETE|
|      26|2013-07-25 00:00:00|             7562|    COMPLETE|
|      28|2013-07-25 00:00:00|              656|    COMPLETE|
|      32|2013-07-25 00:00:00|             3960|    COMPLETE|
+--------+-------------------+-----------------+------------+
only showing top 10 rows



In [52]:
#If you want to use the sql query over a df please create temporary table or view 
orders.createOrReplaceTempView("orders")

In [53]:
pending_orders = spark.sql("SELECT * FROM orders WHERE order_status = 'PENDING'")
pending_orders.show(10)

+--------+-------------------+-----------------+------------+
|order_id|         order_date|order_customer_id|order_status|
+--------+-------------------+-----------------+------------+
|      21|2013-07-25 00:00:00|             2711|     PENDING|
|      36|2013-07-25 00:00:00|             5649|     PENDING|
|      39|2013-07-25 00:00:00|             8214|     PENDING|
|      42|2013-07-25 00:00:00|             9776|     PENDING|
|      44|2013-07-25 00:00:00|            10500|     PENDING|
|      49|2013-07-25 00:00:00|             1871|     PENDING|
|      55|2013-07-25 00:00:00|             2052|     PENDING|
|      68|2013-07-25 00:00:00|             4320|     PENDING|
|      85|2013-07-25 00:00:00|             1485|     PENDING|
|      96|2013-07-25 00:00:00|             8683|     PENDING|
+--------+-------------------+-----------------+------------+
only showing top 10 rows



In [54]:
orders = orders.withColumn("order_date", col("order_date").cast('date'))

In [55]:
recent_orders = orders.orderBy(col("order_date").desc()).limit(10)
recent_orders.show(10)

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|   order_status|
+--------+----------+-----------------+---------------+
|   57595|2014-07-24|             9102|       COMPLETE|
|   57604|2014-07-24|            11274|        PENDING|
|   57596|2014-07-24|             2634|PENDING_PAYMENT|
|   57597|2014-07-24|             4574|PENDING_PAYMENT|
|   57598|2014-07-24|              138|        PENDING|
|   57599|2014-07-24|             4500|PENDING_PAYMENT|
|   57600|2014-07-24|            11876|     PROCESSING|
|   57601|2014-07-24|             1046|        ON_HOLD|
|   57602|2014-07-24|             5033| PAYMENT_REVIEW|
|   57603|2014-07-24|            10713|       COMPLETE|
+--------+----------+-----------------+---------------+



                                                                                

In [57]:
recent_orders = spark.sql("""
    SELECT * FROM orders
    ORDER BY order_date
    LIMIT 10
""")
recent_orders.show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
+--------+-------------------+-----------------+---------------+



In [59]:
customers.columns

['customer_id',
 'customer_fname',
 'customer_lname',
 'customer_email',
 'customer_password',
 'customer_street',
 'customer_city',
 'customer_state',
 'customer_zipcode']

In [60]:
orders.columns

['order_id', 'order_date', 'order_customer_id', 'order_status']

In [61]:
#Get the top 5 most expensive products.

In [62]:
products.columns

['product_id',
 'product_cateogry_id',
 'product_name',
 'product_description',
 'product_price',
 'product_image']

In [64]:
(products.orderBy(col("product_price").desc()).limit(5)).show()

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_cateogry_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|       208|                 10| SOLE E35 Elliptical|               NULL|      1999.99|http://images.acm...|
|        66|                  4|  SOLE F85 Treadmill|               NULL|      1799.99|http://images.acm...|
|       199|                 10|  SOLE F85 Treadmill|               NULL|      1799.99|http://images.acm...|
|       496|                 22|  SOLE F85 Treadmill|               NULL|      1799.99|http://images.acm...|
|      1048|                 47|"Spalding Beast 6...|               NULL|      1099.99|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+



In [65]:
#Merge orders_df with customers_df on customer_id to get customer details for each order.

In [67]:
orders.columns

['order_id', 'order_date', 'order_customer_id', 'order_status']

In [68]:
customers.columns

['customer_id',
 'customer_fname',
 'customer_lname',
 'customer_email',
 'customer_password',
 'customer_street',
 'customer_city',
 'customer_state',
 'customer_zipcode']

In [69]:
(orders.join(customers, orders.order_customer_id == customers.customer_id, how='inner')).show(10)

+--------+----------+-----------------+---------------+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|order_id|order_date|order_customer_id|   order_status|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+--------+----------+-----------------+---------------+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|       1|2013-07-25|            11599|         CLOSED|      11599|          Mary|        Malone|     XXXXXXXXX|        XXXXXXXXX|8708 Indian Horse...|      Hickory|            NC|           28601|
|       2|2013-07-25|              256|PENDING_PAYMENT|        256|         David|     Rodriguez|     XXXXXXXXX|        XXXXXXXXX|7605 Tawny Horse ...|      Chicago|            IL|           60625|
|       3|

In [70]:
#Merge order_items_df with products_df to get product details for each order item.

In [72]:
order_items.columns

['order_item_id',
 'order_item_order_id',
 'order_item_product_id',
 'order_item_quantity',
 'order_item_subtotal',
 'order_item_product_price']

In [73]:
products.columns

['product_id',
 'product_cateogry_id',
 'product_name',
 'product_description',
 'product_price',
 'product_image']

In [75]:
(order_items.join(products, order_items.order_item_product_id == products.product_id, how='inner')).show(10)

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|product_id|product_cateogry_id|        product_name|product_description|product_price|       product_image|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|       957|                 43|Diamondback Women...|               NULL|       299.98|http://images.acm...|
|            2|                  2|                 1073|                  1|   

In [76]:
#Perform a left join between orders_df and order_items_df on order_id.

In [77]:
orders.columns

['order_id', 'order_date', 'order_customer_id', 'order_status']

In [78]:
order_items.columns

['order_item_id',
 'order_item_order_id',
 'order_item_product_id',
 'order_item_quantity',
 'order_item_subtotal',
 'order_item_product_price']

In [81]:
(orders.join(order_items, orders.order_id == order_items.order_item_order_id, "left")).show(10)

                                                                                

+--------+----------+-----------------+---------------+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_id|order_date|order_customer_id|   order_status|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+--------+----------+-----------------+---------------+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|       1|2013-07-25|            11599|         CLOSED|            1|                  1|                  957|                  1|             299.98|                  299.98|
|       2|2013-07-25|              256|PENDING_PAYMENT|            4|                  2|                  403|                  1|             129.99|                  129.99|
|       2|2013-07-25|              256|PENDING_PAYMENT|            3|                  2|                  502|    

In [None]:
#Perform an outer join between categories_df and products_df on category_id.

In [83]:
categories.columns

['category_id', 'category_department_id', 'category_name']

In [84]:
products.columns

['product_id',
 'product_cateogry_id',
 'product_name',
 'product_description',
 'product_price',
 'product_image']

In [87]:
xyz = (products.join(categories, products.product_cateogry_id == categories.category_id, "outer")).show(10)

[Stage 50:>                 (0 + 1) / 1][Stage 51:>                 (0 + 1) / 1]

+----------+-------------------+--------------------+-------------------+-------------+--------------------+-----------+----------------------+-------------+
|product_id|product_cateogry_id|        product_name|product_description|product_price|       product_image|category_id|category_department_id|category_name|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+-----------+----------------------+-------------+
|      NULL|               NULL|                NULL|               NULL|         NULL|                NULL|          1|                     2|     Football|
|         1|                  2|Quest Q64 10 FT. ...|               NULL|        59.98|http://images.acm...|          2|                     2|       Soccer|
|         2|                  2|Under Armour Men'...|               NULL|       129.99|http://images.acm...|          2|                     2|       Soccer|
|         3|                  2|Under Armour Men'...

                                                                                

In [88]:
#Calculate total revenue per order.

In [89]:
orders.columns

['order_id', 'order_date', 'order_customer_id', 'order_status']

In [90]:
order_items.columns

['order_item_id',
 'order_item_order_id',
 'order_item_product_id',
 'order_item_quantity',
 'order_item_subtotal',
 'order_item_product_price']

In [92]:
ooi = orders.join(order_items, orders.order_id == order_items.order_item_order_id, "inner")
(ooi.groupBy('order_id').agg(sum("order_item_subtotal").alias("total_revenue"))).show(10)



+--------+------------------+
|order_id|     total_revenue|
+--------+------------------+
|     148|            479.99|
|     463| 829.9200000000001|
|     471|169.98000000000002|
|     496|441.95000000000005|
|    1088|249.97000000000003|
|    1580|            299.95|
|    1591|            439.86|
|    1645|1509.7900000000002|
|    2366|            299.97|
|    2659| 724.9100000000001|
+--------+------------------+
only showing top 10 rows



                                                                                

In [93]:
#Multiply two columns (order_item_quantity * order_item_product_price) and create a new column total_cost.

In [94]:
order_items.columns

['order_item_id',
 'order_item_order_id',
 'order_item_product_id',
 'order_item_quantity',
 'order_item_subtotal',
 'order_item_product_price']

In [95]:
order_items = order_items.withColumn("total_cost", col("order_item_quantity") * col("order_item_product_price"))
order_items.columns

['order_item_id',
 'order_item_order_id',
 'order_item_product_id',
 'order_item_quantity',
 'order_item_subtotal',
 'order_item_product_price',
 'total_cost']

In [96]:
#Apply a 10% discount on all product prices and add a new column discounted_price.

In [97]:
order_items = order_items.withColumn("discounted_price", col("order_item_product_price") * .9)
order_items.show(10)

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+----------+------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|total_cost|  discounted_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+----------+------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|    299.98|           269.982|
|            2|                  2|                 1073|                  1|             199.99|                  199.99|    199.99|           179.991|
|            3|                  2|                  502|                  5|              250.0|                    50.0|     250.0|              45.0|
|            4|                  2|                  403|                  1|     

In [98]:
#Extract the year and month from order_date and create new columns order_year and order_month.

In [99]:
orders.columns

['order_id', 'order_date', 'order_customer_id', 'order_status']

In [102]:
(orders.withColumn("order_year", year(col("order_date"))).withColumn("order_month", month(col("order_date")))).show(10)

+--------+----------+-----------------+---------------+----------+-----------+
|order_id|order_date|order_customer_id|   order_status|order_year|order_month|
+--------+----------+-----------------+---------------+----------+-----------+
|       1|2013-07-25|            11599|         CLOSED|      2013|          7|
|       2|2013-07-25|              256|PENDING_PAYMENT|      2013|          7|
|       3|2013-07-25|            12111|       COMPLETE|      2013|          7|
|       4|2013-07-25|             8827|         CLOSED|      2013|          7|
|       5|2013-07-25|            11318|       COMPLETE|      2013|          7|
|       6|2013-07-25|             7130|       COMPLETE|      2013|          7|
|       7|2013-07-25|             4530|       COMPLETE|      2013|          7|
|       8|2013-07-25|             2911|     PROCESSING|      2013|          7|
|       9|2013-07-25|             5657|PENDING_PAYMENT|      2013|          7|
|      10|2013-07-25|             5648|PENDING_PAYME

In [103]:
#Create a new column customer_fullname by concatenating customer_fname and customer_lname.

In [104]:
customers.columns

['customer_id',
 'customer_fname',
 'customer_lname',
 'customer_email',
 'customer_password',
 'customer_street',
 'customer_city',
 'customer_state',
 'customer_zipcode']

In [108]:
(customers.withColumn("customer_fullname", concat_ws(" ",col('customer_fname'), col('customer_lname')))).show(10)

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+-----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|customer_fullname|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+-----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|Richard Hernandez|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|     Mary Barrett|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|             725|        Ann Smith|
|          4|          Mary|

In [109]:
#Add a column indicating whether an order was placed on a weekend.

In [111]:
orders.columns

['order_id', 'order_date', 'order_customer_id', 'order_status']

In [None]:

(orders.withColumn("is_weekend", date_format(col("order_date"), "E").isin("Sat","Sun").cast("boolean"))).show(10)

+--------+----------+-----------------+---------------+----------+
|order_id|order_date|order_customer_id|   order_status|is_weekend|
+--------+----------+-----------------+---------------+----------+
|       1|2013-07-25|            11599|         CLOSED|     false|
|       2|2013-07-25|              256|PENDING_PAYMENT|     false|
|       3|2013-07-25|            12111|       COMPLETE|     false|
|       4|2013-07-25|             8827|         CLOSED|     false|
|       5|2013-07-25|            11318|       COMPLETE|     false|
|       6|2013-07-25|             7130|       COMPLETE|     false|
|       7|2013-07-25|             4530|       COMPLETE|     false|
|       8|2013-07-25|             2911|     PROCESSING|     false|
|       9|2013-07-25|             5657|PENDING_PAYMENT|     false|
|      10|2013-07-25|             5648|PENDING_PAYMENT|     false|
+--------+----------+-----------------+---------------+----------+
only showing top 10 rows



In [119]:

(orders.withColumn("is_weekend", date_format(col("order_date"), "E").isin("Sat","Sun").cast("string"))).show(10)

+--------+----------+-----------------+---------------+----------+
|order_id|order_date|order_customer_id|   order_status|is_weekend|
+--------+----------+-----------------+---------------+----------+
|       1|2013-07-25|            11599|         CLOSED|     false|
|       2|2013-07-25|              256|PENDING_PAYMENT|     false|
|       3|2013-07-25|            12111|       COMPLETE|     false|
|       4|2013-07-25|             8827|         CLOSED|     false|
|       5|2013-07-25|            11318|       COMPLETE|     false|
|       6|2013-07-25|             7130|       COMPLETE|     false|
|       7|2013-07-25|             4530|       COMPLETE|     false|
|       8|2013-07-25|             2911|     PROCESSING|     false|
|       9|2013-07-25|             5657|PENDING_PAYMENT|     false|
|      10|2013-07-25|             5648|PENDING_PAYMENT|     false|
+--------+----------+-----------------+---------------+----------+
only showing top 10 rows



In [120]:
(orders.withColumn("order_day", date_format(col("order_date"), "EEEE"))).show(10)


+--------+----------+-----------------+---------------+---------+
|order_id|order_date|order_customer_id|   order_status|order_day|
+--------+----------+-----------------+---------------+---------+
|       1|2013-07-25|            11599|         CLOSED| Thursday|
|       2|2013-07-25|              256|PENDING_PAYMENT| Thursday|
|       3|2013-07-25|            12111|       COMPLETE| Thursday|
|       4|2013-07-25|             8827|         CLOSED| Thursday|
|       5|2013-07-25|            11318|       COMPLETE| Thursday|
|       6|2013-07-25|             7130|       COMPLETE| Thursday|
|       7|2013-07-25|             4530|       COMPLETE| Thursday|
|       8|2013-07-25|             2911|     PROCESSING| Thursday|
|       9|2013-07-25|             5657|PENDING_PAYMENT| Thursday|
|      10|2013-07-25|             5648|PENDING_PAYMENT| Thursday|
+--------+----------+-----------------+---------------+---------+
only showing top 10 rows



In [121]:
#Rank products by sales revenue using a window function

In [122]:
order_items.columns

['order_item_id',
 'order_item_order_id',
 'order_item_product_id',
 'order_item_quantity',
 'order_item_subtotal',
 'order_item_product_price',
 'total_cost',
 'discounted_price']

In [129]:
order_items_2 = order_items.groupBy("order_item_product_id").agg(sum("order_item_subtotal").alias("total_revenue")).repartition(4)
window_spec = Window.orderBy(col("total_revenue").desc())
ranked_products = order_items_2.withColumn("rank", rank().over(window_spec))
ranked_products.show(10)

25/02/11 11:30:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/11 11:30:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/11 11:30:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/11 11:30:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/11 11:30:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/11 11:30:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/11 1

+---------------------+------------------+----+
|order_item_product_id|     total_revenue|rank|
+---------------------+------------------+----+
|                 1004| 6929653.500001108|   1|
|                  365| 4421143.020000522|   2|
|                  957| 4118425.419999785|   3|
|                  191|3667633.2000001078|   4|
|                  502|         3147800.0|   5|
|                 1073|3099845.0000002305|   6|
|                  403| 2891757.540000081|   7|
|                 1014|2888993.9399995143|   8|
|                  627|1269082.6499998837|   9|
|                  565|           67830.0|  10|
+---------------------+------------------+----+
only showing top 10 rows



                                                                                

In [131]:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, rank, col

# Step 1: Aggregate total revenue per product
order_items_2 = order_items.groupBy("order_item_product_id") \
    .agg(sum("order_item_subtotal").alias("total_revenue"))

# Step 2: Define a Window Specification without partitioning
window_spec = Window.orderBy(col("total_revenue").desc())

# Step 3: Apply Rank Function
ranked_products = order_items_2.withColumn("rank", rank().over(window_spec))

# Step 4: Show top 10 ranked products
ranked_products.show(10)


25/02/11 12:18:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/11 12:18:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/11 12:18:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/11 12:18:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/11 12:18:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/02/11 12:18:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---------------------+------------------+----+
|order_item_product_id|     total_revenue|rank|
+---------------------+------------------+----+
|                 1004| 6929653.500001108|   1|
|                  365| 4421143.020000522|   2|
|                  957| 4118425.419999785|   3|
|                  191|3667633.2000001078|   4|
|                  502|         3147800.0|   5|
|                 1073|3099845.0000002305|   6|
|                  403| 2891757.540000081|   7|
|                 1014|2888993.9399995143|   8|
|                  627|1269082.6499998837|   9|
|                  565|           67830.0|  10|
+---------------------+------------------+----+
only showing top 10 rows



In [132]:
#Find the cumulative revenue per order using sum() with a window function.

In [133]:
orders.columns

['order_id', 'order_date', 'order_customer_id', 'order_status']

In [134]:
order_items.columns

['order_item_id',
 'order_item_order_id',
 'order_item_product_id',
 'order_item_quantity',
 'order_item_subtotal',
 'order_item_product_price',
 'total_cost',
 'discounted_price']

In [135]:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col

# Step 1: Define a Window Specification partitioned by order_id and ordered by order_item_id
window_spec = Window.partitionBy("order_item_order_id").orderBy("order_item_id").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Step 2: Calculate cumulative revenue using sum() over the window
order_items_cumulative = order_items.withColumn(
    "cumulative_revenue",
    sum(col("order_item_subtotal")).over(window_spec)
)

# Step 3: Show results
order_items_cumulative.select("order_item_order_id", "order_item_id", "order_item_subtotal", "cumulative_revenue").show(10)




+-------------------+-------------+-------------------+------------------+
|order_item_order_id|order_item_id|order_item_subtotal|cumulative_revenue|
+-------------------+-------------+-------------------+------------------+
|                  1|            1|             299.98|            299.98|
|                  2|            2|             199.99|            199.99|
|                  2|            3|              250.0|            449.99|
|                  2|            4|             129.99|            579.98|
|                  4|            5|              49.98|             49.98|
|                  4|            6|             299.95|            349.93|
|                  4|            7|              150.0|            499.93|
|                  4|            8|             199.92|            699.85|
|                  5|            9|             299.98|            299.98|
|                  5|           10|             299.95| 599.9300000000001|
+-------------------+----

                                                                                

In [136]:
#Find the first order date per customer using min().

In [137]:
from pyspark.sql.functions import min

# Step 1: Find the first order date per customer
first_order_per_customer = orders.groupBy("order_customer_id") \
    .agg(min("order_date").alias("first_order_date"))

# Step 2: Show results
first_order_per_customer.show(10)


[Stage 87:>                                                         (0 + 1) / 1]

+-----------------+----------------+
|order_customer_id|first_order_date|
+-----------------+----------------+
|              833|      2013-07-29|
|             1088|      2013-07-30|
|             6397|      2013-07-30|
|            11858|      2013-07-31|
|            11033|      2013-07-31|
|             8389|      2013-07-31|
|            12046|      2013-08-01|
|             4900|      2013-08-01|
|             6357|      2013-08-02|
|            11458|      2013-08-04|
+-----------------+----------------+
only showing top 10 rows



                                                                                

In [138]:
#Find the top-selling product in each category.

In [139]:
order_items.columns

['order_item_id',
 'order_item_order_id',
 'order_item_product_id',
 'order_item_quantity',
 'order_item_subtotal',
 'order_item_product_price',
 'total_cost',
 'discounted_price']

In [143]:
categories.columns

['category_id', 'category_department_id', 'category_name']

In [145]:
products.columns

['product_id',
 'product_cateogry_id',
 'product_name',
 'product_description',
 'product_price',
 'product_image']

In [144]:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, rank, col

# Step 1: Join order_items with categories to get category_id for each product
order_items_with_category = order_items.join(categories, order_items.order_item_product_id == categories.category_id, "inner")

# Step 2: Aggregate total revenue per product within each category
product_sales = order_items_with_category.groupBy("category_id", "order_item_product_id") \
    .agg(sum("order_item_subtotal").alias("total_revenue"))

# Step 3: Define a Window Specification partitioned by category and ordered by revenue
window_spec = Window.partitionBy("category_id").orderBy(col("total_revenue").desc())

# Step 4: Rank products within each category
ranked_products = product_sales.withColumn("rank", rank().over(window_spec))

# Step 5: Filter only the top-ranked product per category
top_selling_products = ranked_products.filter(col("rank") == 1)

# Step 6: Show results
top_selling_products.show(10)




+-----------+---------------------+------------------+----+
|category_id|order_item_product_id|     total_revenue|rank|
+-----------+---------------------+------------------+----+
|         19|                   19|  7999.35999999999|   1|
|         24|                   24|18477.689999999988|   1|
|         35|                   35| 10399.34999999999|   1|
|         37|                   37| 27327.19000000001|   1|
|         44|                   44| 56330.60999999994|   1|
|         58|                   58| 8699.709999999995|   1|
+-----------+---------------------+------------------+----+



                                                                                

In [146]:
#Check if there are any duplicate rows in customers_df.

In [149]:
from pyspark.sql.functions import count

# Count total rows in customers_df
total_rows = customers.count()

# Count distinct rows in customers_df
distinct_rows = customers.distinct().count()

# Check for duplicates
duplicate_rows = total_rows - distinct_rows

print(f"Total Rows: {total_rows}")
print(f"Distinct Rows: {distinct_rows}")
print(f"Duplicate Rows: {duplicate_rows}")

if duplicate_rows > 0:
    print(f"There are {duplicate_rows} duplicate rows in customers_df.")
else:
    print("No duplicate rows found in customers_df.")


Total Rows: 12435
Distinct Rows: 12435
Duplicate Rows: 0
No duplicate rows found in customers_df.


In [150]:
#Fill missing values in product_price with the average price of that category.

In [152]:
products.columns

['product_id',
 'product_cateogry_id',
 'product_name',
 'product_description',
 'product_price',
 'product_image']

In [156]:
from pyspark.sql.functions import col, avg, when

# Step 1: Compute average product price per category
category_avg_price = products.groupBy("product_cateogry_id").agg(avg("product_price").alias("avg_price"))

# Step 2: Join with products table on "product_cateogry_id" (Fix join column)
products_enriched = products.join(category_avg_price, on="product_cateogry_id", how="left")

# Step 3: Fill missing values in product_price with the category's avg_price
products_filled = products_enriched.withColumn(
    "product_price",
    when(col("product_price").isNull(), col("avg_price")).otherwise(col("product_price"))
).drop("avg_price")  # Drop avg_price column after filling

# Step 4: Show results
products_filled.show(10)

+-------------------+----------+--------------------+-------------------+-------------+--------------------+
|product_cateogry_id|product_id|        product_name|product_description|product_price|       product_image|
+-------------------+----------+--------------------+-------------------+-------------+--------------------+
|                  2|         1|Quest Q64 10 FT. ...|               NULL|        59.98|http://images.acm...|
|                  2|         2|Under Armour Men'...|               NULL|       129.99|http://images.acm...|
|                  2|         3|Under Armour Men'...|               NULL|        89.99|http://images.acm...|
|                  2|         4|Under Armour Men'...|               NULL|        89.99|http://images.acm...|
|                  2|         5|Riddell Youth Rev...|               NULL|       199.99|http://images.acm...|
|                  2|         6|Jordan Men's VI R...|               NULL|       134.99|http://images.acm...|
|                  

In [157]:
#Replace all NULL values in order_status with "UNKNOWN".

In [159]:
orders.show(10)

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|   order_status|
+--------+----------+-----------------+---------------+
|       1|2013-07-25|            11599|         CLOSED|
|       2|2013-07-25|              256|PENDING_PAYMENT|
|       3|2013-07-25|            12111|       COMPLETE|
|       4|2013-07-25|             8827|         CLOSED|
|       5|2013-07-25|            11318|       COMPLETE|
|       6|2013-07-25|             7130|       COMPLETE|
|       7|2013-07-25|             4530|       COMPLETE|
|       8|2013-07-25|             2911|     PROCESSING|
|       9|2013-07-25|             5657|PENDING_PAYMENT|
|      10|2013-07-25|             5648|PENDING_PAYMENT|
+--------+----------+-----------------+---------------+
only showing top 10 rows



In [160]:
orders.filter(col("order_status").isNull()).show()

+--------+----------+-----------------+------------+
|order_id|order_date|order_customer_id|order_status|
+--------+----------+-----------------+------------+
+--------+----------+-----------------+------------+



In [161]:
#Find all orders where customer_id is missing.

In [164]:
orders.columns

['order_id', 'order_date', 'order_customer_id', 'order_status']

In [167]:
from pyspark.sql.functions import col

orders.filter(col("order_customer_id").isNull()).show()

+--------+----------+-----------------+------------+
|order_id|order_date|order_customer_id|order_status|
+--------+----------+-----------------+------------+
+--------+----------+-----------------+------------+

