# Data Analysis on Brazilian E-Commerce Public Dataset by Olist

Dataset link: https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce?select=olist_customers_dataset.csv

In [156]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("project").getOrCreate()

# Loading the dataset

In [157]:
# Defining path to the dataset
customer_data_path = "./Data/olist_customers_dataset.csv"  # Replace with the actual path
order_item_path = "./Data/olist_order_items_dataset.csv"
order_payment_path = "./Data/olist_order_payments_dataset.csv"
product_category_translation_path= "./Data/product_category_name_translation.csv"
product_path = './Data/olist_products_dataset.csv'
seller_path = './Data/olist_sellers_dataset.csv'
geolocation_path = './Data/olist_geolocation_dataset.csv'
orders_path = './Data/olist_orders_dataset.csv'

# Load the Chipotle dataset into a Spark DataFrame
customer_df = spark.read.csv(customer_data_path, header=True, inferSchema=True)
order_item_df = spark.read.csv(order_item_path, header=True, inferSchema=True)
order_payment_df = spark.read.csv(order_payment_path, header=True, inferSchema=True)
product_category_translation_df = spark.read.csv(product_category_translation_path, header=True, inferSchema=True)
seller_df_uncleaned = spark.read.csv(seller_path, header=True, inferSchema=True)
product_df_uncleaned = spark.read.csv(product_path, header=True, inferSchema=True)
geoloacation_df_uncleaned = spark.read.csv(geolocation_path, header=True, inferSchema= True)
orders_df_uncleaned = spark.read.csv(orders_path, header=True, inferSchema= True)

                                                                                

# Data Cleaning and pre-processing

In [158]:
from pyspark.sql.functions import col, trim,regexp_replace, when

### Removing whitespace  

In [159]:
# Remove leading and trailing whitespace from all columns
seller_df_uncleaned.select([trim(col(c)).alias(c) for c in seller_df_uncleaned.columns])

# Remove whitespace characters between words in all columns
seller_df = seller_df_uncleaned.select([regexp_replace(col(c), r'\s+', ' ').alias(c) for c in seller_df_uncleaned.columns])


In [160]:
# Remove leading and trailing whitespace from all columns
geoloacation_df_uncleaned.select([trim(col(c)).alias(c) for c in geoloacation_df_uncleaned.columns])

geoloacation_df_uncleaned.show()

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1037| -23.54562128115268| -46.63929204800168|       sao paulo|               SP|
|                       1046|-23.546081127035535| -46.64482029837157|       sao paulo|               SP|
|                       1046| -23.54612896641469| -46.64295148361138|       sao paulo|               SP|
|                       1041|  -23.5443921648681| -46.63949930627844|       sao paulo|               SP|
|                       1035|-23.541577961711493| -46.64160722329613|       sao paulo|               SP|
|                       1012|-23.547762303364266| -46.63536053788448|       são paulo|               SP|
|                       1047|-23.546273112412678| -46.6

### Working with inconsistent data

In [161]:
# Replace "são paulo" with "sao paulo" in the geolocation dataframe
geolocation_df = geoloacation_df_uncleaned.replace("são paulo", "sao paulo")

# Show the DataFrame with the replaced values
geolocation_df.show()

+---------------------------+-------------------+-------------------+----------------+-----------------+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city|geolocation_state|
+---------------------------+-------------------+-------------------+----------------+-----------------+
|                       1037| -23.54562128115268| -46.63929204800168|       sao paulo|               SP|
|                       1046|-23.546081127035535| -46.64482029837157|       sao paulo|               SP|
|                       1046| -23.54612896641469| -46.64295148361138|       sao paulo|               SP|
|                       1041|  -23.5443921648681| -46.63949930627844|       sao paulo|               SP|
|                       1035|-23.541577961711493| -46.64160722329613|       sao paulo|               SP|
|                       1012|-23.547762303364266| -46.63536053788448|       sao paulo|               SP|
|                       1047|-23.546273112412678| -46.6

### Drop null values

In [162]:
# Print the number of rows in the 'orders_df_uncleaned' DataFrame
print("No of rows in uncleaned dataset = ", orders_df_uncleaned.count())

# Drop rows with null values in the 'orders_df_uncleaned' DataFrame
orders_df = orders_df_uncleaned.dropna()

# Print the number of rows in the 'orders_df' DataFrame after dropping null values
print("No of rows of cleaned datset = ", orders_df.count())

No of rows in uncleaned dataset =  99441
No of rows of cleaned datset =  96461


### Replacing column on product dataset with content from product category translation dataset

In [163]:
# Perform a left join between the 'product_df_uncleaned' DataFrame and 'product_category_translation_df'
# based on the 'Product_category_name' column. This operation combines the two DataFrames .
product_joined_df= product_df_uncleaned.join(product_category_translation_df, "Product_category_name", "left")

# Drop "product_category_name" will be removed from the DataFrame.
product_df = product_joined_df.drop("product_category_name")

# Rename the "product_category_name_english" column to "product_category_name"
product_df = product_df.withColumnRenamed("product_category_name_english", "product_category_name")

# Show the 'product_df' DataFrame with the dropped and renamed columns.
product_df.show()

+--------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|          product_id|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_category_name|
+--------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|1e9e8ef04dbcff454...|                 40|                       287|                 1|             225|               16|               10|              14|            perfumery|
|3aa071139cb16b67c...|                 44|                       276|                 1|            1000|               30|               18|              20|                  art|
|96bd76ec8810374ed...|                 46|                       250|                 1|       

In [164]:
# Set payment_installment to 0 where payment_type is "not_defined"
order_payment_df = order_payment_df.withColumn("Payment_installments",
                                   when(col("Payment_type") == "not_defined", 0)
                                   .otherwise(col("Payment_installments")))


## Applying Transformation on the Dataframes 


### List of Dataframes:
    -customer_df 
    -order_item_df 
    -order_payment_df 
    -product_category_translation_df 
    -seller_df
    -product_df
    -geoloacation_df
    -orders_df

### Create a pivot table to find the number of transactions made by customers using different payment_types for each state.

In [165]:
from pyspark.sql.functions import count

orders_customer_df = orders_df.join(customer_df, "customer_id")

# Joined orders_customer_df with payment_df to get payment type
orders_payment_df = orders_customer_df.join(order_payment_df, "order_id")

# Grouped by 'customer_state' and 'payment_type' and count the orders
pivot_table = orders_payment_df.groupBy('customer_state', 'payment_type').agg(count('order_id').alias('order_count'))

# Pivoted the table to create the desired pivot table
pivot_table = pivot_table.groupBy('customer_state').pivot('payment_type').sum('order_count')

# Filled missing values with 0
pivot_table = pivot_table.na.fill(0)

pivot_table.show()

+--------------+------+-----------+----------+-------+
|customer_state|boleto|credit_card|debit_card|voucher|
+--------------+------+-----------+----------+-------+
|            SC|   817|       2650|        45|    150|
|            RO|    61|        180|         3|      7|
|            PI|    89|        377|        10|     28|
|            AM|    21|        121|         2|      7|
|            RR|    12|         29|         0|      0|
|            GO|   432|       1478|        22|    115|
|            TO|    75|        192|         4|     24|
|            MT|   234|        645|         4|     26|
|            SP|  7952|      31244|       737|   2363|
|            PB|    91|        412|        13|     35|
|            ES|   399|       1540|        26|    103|
|            RS|  1326|       3900|        74|    241|
|            MS|   174|        506|        11|     31|
|            AL|    64|        331|         3|     13|
|            MG|  2243|       8858|       137|    564|
|         

###  Find the total number of active sellers and how they have changed over time. This question helps to find the sellers who are actively selling their product i.e. they have not canceled their orders.

In [166]:
from pyspark.sql.functions import year, countDistinct

joined_df = order_item_df.join(orders_df, order_item_df.order_id == orders_df.order_id) \
    .join(order_payment_df, order_item_df.order_id == order_payment_df.order_id) \
    .join(seller_df.alias('s'), order_item_df.seller_id == seller_df.seller_id) \
    .join(product_df.alias('p'), order_item_df.product_id == product_df.product_id)

# Extract the year from order_approved_at column
joined_df = joined_df.withColumn("year", year(col("order_approved_at")))

# Calculate the counts
result_df = joined_df.groupBy("year") \
    .agg(countDistinct("s.seller_id").alias("active_seller"), countDistinct("p.product_id").alias("product_count"))

# Order the result
result_df = result_df.orderBy("year", col("active_seller").desc())

result_df.show()

+----+-------------+-------------+
|year|active_seller|product_count|
+----+-------------+-------------+
|2016|          130|          241|
|2017|         1690|        16787|
|2018|         2331|        20184|
+----+-------------+-------------+



### Find the customer shares based on the states. This question helps to provide the insights about the percentage of customers making in each state.

In [167]:
from pyspark.sql.window import Window
from pyspark.sql.functions import count,desc,col,sum,round

# Define a window specification for the running total calculation
window_spec = Window.orderBy(desc("no_customers"))

cteDF = customer_df.groupBy("customer_state") \
    .agg(count("customer_unique_id").alias("no_customers")) \
    .orderBy(desc("no_customers")) \
    .withColumn("percentage_customer_base", round(col("no_customers") / sum("no_customers").over(Window.partitionBy().orderBy())* 100, 2) ) \
    .withColumn("running_total_percentage", round(sum("no_customers").over(window_spec) / sum("no_customers").over(Window.partitionBy().orderBy())* 100, 2) )

resultDF = cteDF.orderBy(desc("no_customers"))
resultDF.show()

23/09/06 14:53:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 14:53:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 14:53:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 14:53:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 14:53:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 14:53:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 1

+--------------+------------+------------------------+------------------------+
|customer_state|no_customers|percentage_customer_base|running_total_percentage|
+--------------+------------+------------------------+------------------------+
|            SP|       41746|                   41.98|                   41.98|
|            RJ|       12852|                   12.92|                    54.9|
|            MG|       11635|                    11.7|                   66.61|
|            RS|        5466|                     5.5|                    72.1|
|            PR|        5045|                    5.07|                   77.18|
|            SC|        3637|                    3.66|                   80.83|
|            BA|        3380|                     3.4|                   84.23|
|            DF|        2140|                    2.15|                   86.38|
|            ES|        2033|                    2.04|                   88.43|
|            GO|        2020|           

23/09/06 14:53:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 14:53:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 14:53:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 14:53:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 14:53:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 14:53:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 1

### Find the states whose sales value is higher than the buy value.

In [168]:

customer_df.createOrReplaceTempView("customer")
order_item_df.createOrReplaceTempView("order_item")
order_payment_df.createOrReplaceTempView("order_payment")
geolocation_df.createOrReplaceTempView("location")
seller_df.createOrReplaceTempView("seller")
product_df.createOrReplaceTempView("product")
orders_df.createOrReplaceTempView("orders")

In [169]:
from pyspark.sql.functions import sum
customer_seller_comparison_SQL  = spark.sql('''SELECT a.order_id, a.price,a.seller_id,b.seller_state, c.customer_id,d.customer_state
                                    FROM order_item a 
                                    INNER JOIN seller b ON a.seller_id = b.seller_id
                                    INNER JOIN orders c ON a.order_id = c.order_id
                                    INNER JOIN customer d ON d.customer_id=c.customer_id
''')
                                            
# customer_seller_comparison_SQL.show()
comparision_df = customer_seller_comparison_SQL

comparision_df = comparision_df.dropDuplicates()

comparision_df.count()

seller_df = comparision_df.groupBy(['seller_state', 'seller_id']).agg(sum('price').alias('sell_value'))

# Group by 'seller_state' and sum 'sell_value'
seller_df = seller_df.groupBy('seller_state').agg(sum('sell_value').alias('sell_value'))

# seller_df.show()

In [170]:
buyer_df = comparision_df.groupBy(['customer_state', 'customer_id']).agg(sum('price').alias('buy_value'))

# Group by 'customer_state' and sum 'buy_value'
buyer_df = buyer_df.groupBy('customer_state').agg(sum('buy_value').alias('buy_value'))

# buyer_df.show()

In [171]:
from pyspark.sql.functions import col,udf
from pyspark.sql.types import StringType

# Join buyer_df and seller_df on 'customer_state' and 'seller_state'
compare_buy_sell_activity = buyer_df.alias("buyer").join(
    seller_df.alias("seller"),
    col("buyer.customer_state") == col("seller.seller_state"),
    "left"
)

# Fill missing values with 0 for 'sell_value' and 'seller_state'
compare_buy_sell_activity = compare_buy_sell_activity.fillna(0, subset=["sell_value"])
compare_buy_sell_activity = compare_buy_sell_activity.withColumn(
    "seller_state",
    col("buyer.customer_state")
)

# Calculate the margin activity
compare_buy_sell_activity = compare_buy_sell_activity.withColumn(
    "margin_activity",
    compare_buy_sell_activity["sell_value"] - compare_buy_sell_activity["buy_value"]
)

def encode_margin_udf(value):
    if value < 0:
        return 'consumer_dominant'
    elif value == 0:
        return 'balanced'
    elif value > 0:
        return 'seller_dominant'

# Register the UDF
encode_margin_udf_spark = udf(encode_margin_udf, StringType())

compare_buy_sell_activity = compare_buy_sell_activity.withColumn(
    "margin_category",
    encode_margin_udf_spark(compare_buy_sell_activity["margin_activity"])
)

compare_buy_sell_activity.show()



+--------------+------------------+------------+------------------+-------------------+-----------------+
|customer_state|         buy_value|seller_state|        sell_value|    margin_activity|  margin_category|
+--------------+------------------+------------+------------------+-------------------+-----------------+
|            SC|469650.34999999905|          SC| 584587.8300000004| 114937.48000000138|  seller_dominant|
|            RO| 44486.18999999998|          RO|            4762.2| -39723.98999999998|consumer_dominant|
|            PI|          78356.66|          PI|            2383.0|          -75973.66|consumer_dominant|
|            AM|20835.459999999995|          AM|            1177.0|-19658.459999999995|consumer_dominant|
|            RR|6186.5599999999995|          RR|               0.0|-6186.5599999999995|consumer_dominant|
|            GO| 259822.1200000004|          GO|          60896.41| -198925.7100000004|consumer_dominant|
|            TO| 46620.56999999998|          T

                                                                                

### Find the total number of orders placed by customers every hour in each day of week .


In [172]:
query_result_order = orders_df.select("order_purchase_timestamp", "order_id")
query_result_order.show(5)

+------------------------+--------------------+
|order_purchase_timestamp|            order_id|
+------------------------+--------------------+
|     2017-10-02 10:56:33|e481f51cbdc54678b...|
|     2018-07-24 20:41:37|53cdb2fc8bc7dce0b...|
|     2018-08-08 08:38:49|47770eb9100c2d0c4...|
|     2017-11-18 19:28:06|949d5b44dbf5de918...|
|     2018-02-13 21:18:39|ad21c59c0840e6cb8...|
+------------------------+--------------------+
only showing top 5 rows



In [173]:
print("Null Checking")
null_counts = query_result_order.select([col(c).alias(c) for c in query_result_order.columns]).na.drop().count()
print("Number of null values in each column:")
query_result_order.select([col(c).alias(c) for c in query_result_order.columns]).na.drop().show()
print(f"Total null values: {query_result_order.count() - null_counts}")

# Duplicate data checking
print("Duplicate Data Checking")
duplicate_count = query_result_order.count() - query_result_order.dropDuplicates().count()
print(f"Number of duplicate rows: {duplicate_count}")

# Drop duplicate data
query_result_order = query_result_order.dropDuplicates()
print("After Removing Duplicate Data")
query_result_order.show()


Null Checking
Number of null values in each column:
+------------------------+--------------------+
|order_purchase_timestamp|            order_id|
+------------------------+--------------------+
|     2017-10-02 10:56:33|e481f51cbdc54678b...|
|     2018-07-24 20:41:37|53cdb2fc8bc7dce0b...|
|     2018-08-08 08:38:49|47770eb9100c2d0c4...|
|     2017-11-18 19:28:06|949d5b44dbf5de918...|
|     2018-02-13 21:18:39|ad21c59c0840e6cb8...|
|     2017-07-09 21:57:05|a4591c265e18cb1dc...|
|     2017-05-16 13:10:30|6514b8ad8028c9f2c...|
|     2017-01-23 18:29:09|76c6e866289321a7c...|
|     2017-07-29 11:55:02|e69bfb5eb88e0ed6a...|
|     2017-05-16 19:41:10|e6ce16cb79ec1d90b...|
|     2017-07-13 19:58:11|34513ce0c4fab462a...|
|     2018-06-07 10:06:19|82566a660a982b15f...|
|     2018-07-25 17:44:10|5ff96c15d0b717ac6...|
|     2018-03-01 14:14:28|432aaf21d85167c2c...|
|     2018-06-07 19:03:12|dcb36b511fcac050b...|
|     2018-01-02 19:00:43|403b97836b0c04a62...|
|     2017-12-26 23:41:31|116f0b0934

In [174]:
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import col,date_format,hour

print("Dtypes:")
query_result_order.printSchema()

# Convert the 'order_purchase_timestamp' column to a timestamp type
query_result_order = query_result_order.withColumn("order_purchase_timestamp", col("order_purchase_timestamp").cast(TimestampType()))

# Parse day name
query_result_order = query_result_order.withColumn("Day", date_format(col("order_purchase_timestamp"), "EEEE"))

# Parse hour
query_result_order = query_result_order.withColumn("Hour", hour(col("order_purchase_timestamp")))

# Check data types after conversion
print("Dtypes After Conversion:")
query_result_order.printSchema()

Dtypes:
root
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_id: string (nullable = true)

Dtypes After Conversion:
root
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_id: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Hour: integer (nullable = true)



In [175]:
from pyspark.sql.functions import col, count
from pyspark.sql import functions as F

custom_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']

# Use the 'when' function to create a custom sorting column
day_hour_group = query_result_order.groupBy('Hour').pivot('Day').agg(count('order_id'))
day_hour_group = day_hour_group.select(
    ['Hour'] + [F.col(day).alias(day) for day in custom_order]
)

# Order the DataFrame by 'Hour'
day_hour_group = day_hour_group.orderBy('Hour', ascending=False)

# Show the result
day_hour_group.show()

+----+------+-------+---------+--------+------+--------+------+
|Hour|Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday|
+----+------+-------+---------+--------+------+--------+------+
|  23|   697|    676|      600|     532|   496|     422|   591|
|  22|   962|    939|      855|     834|   675|     534|   857|
|  21|  1084|    996|      927|     826|   700|     645|   862|
|  20|   998|    946|      881|     816|   714|     709|   944|
|  19|   921|    894|      822|     798|   757|     729|   881|
|  18|   891|    852|      818|     759|   697|     693|   875|
|  17|   954|    942|      943|     881|   790|     673|   776|
|  16|  1060|   1059|     1007|    1042|   943|     675|   688|
|  15|  1047|   1005|      952|     901|   947|     699|   697|
|  14|  1063|   1095|     1027|     951|   938|     649|   660|
|  13|  1000|   1011|      990|     953|   967|     685|   701|
|  12|   944|    874|      897|     939|   827|     666|   653|
|  11|  1052|   1023|     1030|     928|

#### #Payment Analysis: For each payment type (payment_type), calculate the total payment value (sum of payment_value) and the average number of payment installments (payment_installments), rename the columns to 'Total Payment Value' and 'Avg Installments,' and order the result by payment type in ascending order

In [176]:
# Perform the analysis
payment_analysis = order_payment_df.groupBy("Payment_type") \
    .agg(
        sum("Payment_value").alias("Total Payment Value"),
        avg("Payment_installments").alias("Avg Installments")
    ) \
    .orderBy("Payment_type")

# Show the result
payment_analysis.show()

+------------+--------------------+-----------------+
|Payment_type| Total Payment Value| Avg Installments|
+------------+--------------------+-----------------+
|      boleto|  2869361.2700000196|              1.0|
| credit_card|1.2542084189999647E7|3.507155413763917|
|  debit_card|  217989.79000000015|              1.0|
| not_defined|                 0.0|              0.0|
|     voucher|   379436.8700000001|              1.0|
+------------+--------------------+-----------------+



#### #Growth analysis: Determine the month-over-month sales growth percentage, using a window function to compare the current month's total sales with the previous month's total sales for each seller (seller_id).

In [177]:
# Join the orders and order_items datasets
combined_df = orders_df.join(order_item_df, "Order_id")

# Calculate monthly sales for each seller
monthly_sales_window = Window.partitionBy("Seller_id").orderBy("YearMonth")
combined_df = combined_df.withColumn("YearMonth", combined_df["Order_purchase_timestamp"].substr(1, 7))
monthly_sales_df = combined_df.groupBy("Seller_id", "YearMonth").agg(sum("Price").alias("monthly_sales"))
monthly_sales_df = monthly_sales_df.withColumn("monthly_sales", monthly_sales_df["monthly_sales"].cast("float"))
monthly_sales_df = monthly_sales_df.withColumn("prev_month_sales", lag("monthly_sales").over(monthly_sales_window))

# Calculate month-over-month sales growth percentage
monthly_sales_df = monthly_sales_df.withColumn("sales_growth_percentage",
                                               ((monthly_sales_df["monthly_sales"] - monthly_sales_df["prev_month_sales"]) /
                                                monthly_sales_df["prev_month_sales"]) * 100)

# Show the result
monthly_sales_df.select("Seller_id", "YearMonth", "monthly_sales", "prev_month_sales", "sales_growth_percentage").show()

+--------------------+---------+-------------+----------------+-----------------------+
|           Seller_id|YearMonth|monthly_sales|prev_month_sales|sales_growth_percentage|
+--------------------+---------+-------------+----------------+-----------------------+
|0015a82c2db000af6...|  2017-09|        895.0|            null|                   null|
|0015a82c2db000af6...|  2017-10|       1790.0|           895.0|                  100.0|
|001cca7ae9ae17fb1...|  2017-02|       1098.9|            null|                   null|
|001cca7ae9ae17fb1...|  2017-03|       1676.7|          1098.9|     52.579844746649954|
|001cca7ae9ae17fb1...|  2017-04|       1708.2|          1676.7|     1.8786903391977854|
|001cca7ae9ae17fb1...|  2017-05|      2639.99|          1708.2|     54.548066133783976|
|001cca7ae9ae17fb1...|  2017-06|      2213.49|         2639.99|     -16.15536428462503|
|001cca7ae9ae17fb1...|  2017-07|      2483.95|         2213.49|       12.2187117236009|
|001cca7ae9ae17fb1...|  2017-08|

#### #Total number of orders placed by customers in each state (customer_state), rename the column to 'Order Count,' and order the result in ascending order of order count

In [179]:
# Join customers and orders datasets to identify unique customers in the orders dataset
unique_customers_df = customer_df.join(orders_df, "Customer_id")

# Group by customer_state and count the number of orders in each state
order_count_by_state = unique_customers_df.groupBy("customer_state").agg(count("Order_id").alias("Order Count"))

# Order the result in ascending order of order count
order_count_by_state = order_count_by_state.orderBy("Order Count")

# Show the result
order_count_by_state.show()




+--------------+-----------+
|customer_state|Order Count|
+--------------+-----------+
|            RR|         41|
|            AP|         67|
|            AC|         80|
|            AM|        145|
|            RO|        243|
|            TO|        274|
|            SE|        335|
|            AL|        397|
|            RN|        474|
|            PI|        476|
|            PB|        517|
|            MS|        701|
|            MA|        716|
|            MT|        886|
|            PA|        946|
|            CE|       1278|
|            PE|       1593|
|            GO|       1957|
|            ES|       1995|
|            DF|       2080|
+--------------+-----------+
only showing top 20 rows



### #Calculate the order count per customer, computes the average order value per customer, create a customer and order analysis DataFrame, and display the result. Also print the total number of customers.

In [180]:
# Join the Orders DataFrame and Order Payments DataFrame based on "Order_id"
order_payment_joined_df = orders_df.join(order_payment_df , "Order_id")

# Calculate order count per customer
order_count = orders_df.groupBy("Customer_id").agg(count("Order_id").alias("Order_Count"))


# Calculate average order value per customer
average_order_value = order_payment_joined_df.groupBy("Customer_id").agg(avg("Payment_value").alias("Average_Order_Value"))


# Create a customer and order analysis DataFrame
customer_behaviour_df = order_count.join(average_order_value, "Customer_id", "inner").orderBy(desc("Average_Order_Value"))

# Show the resulting DataFrame
customer_behaviour_df.show()
print("Total NUmber of Customer = ", customer_behaviour_df.count())


+--------------------+-----------+-------------------+
|         Customer_id|Order_Count|Average_Order_Value|
+--------------------+-----------+-------------------+
|1617b1357756262bf...|          1|           13664.08|
|ec5b2ba62e5743423...|          1|            7274.88|
|c6e2731c5b391845f...|          1|            6929.31|
|f48d464a0baaea338...|          1|            6922.21|
|3fd6777bbce08a352...|          1|            6726.66|
|05455dfa7cd02f13d...|          1|            6081.54|
|df55c14d1476a9a34...|          1|            4950.34|
|24bbf5fd2f2e1b359...|          1|            4764.34|
|3d979689f636322c6...|          1|            4681.78|
|1afc82cd60e303ef0...|          1|            4513.32|
|cc803a2c412833101...|          1|             4445.5|
|35a413c7ca3c69756...|          1|            4175.26|
|e9b0d0eb3015ef1c9...|          1|            4163.51|
|3be2c536886b2ea46...|          1|            4042.74|
|c6695e3b1e48680db...|          1|            4016.91|
|31e83c01f

### #Create a UDF to categorize products into different size categories based on their dimensions (length, width, and height) and weight? Then, calculate the average order value for each product size category.

In [181]:
# Define a UDF to categorize products into size categories based on dimensions and weight
def categorize_product_size(length, width, height, weight):
    if length is not None and width is not None and height is not None and weight is not None:
        if length <= 20 and width <= 20 and height <= 20 and weight <= 500:
            return "Small"
        elif length <= 40 and width <= 40 and height <= 40 and weight <= 2000:
            return "Medium"
        else:
            return "Large"
    else:
        return "Unknown"

# Register the UDF
categorize_udf = udf(categorize_product_size, StringType())

# Apply the UDF to create a new column "Product_Size_Category"
product_df = product_df.withColumn("Product_Size_Category", categorize_udf(
    product_df["Product_length_cm"],
    product_df["Product_width_cm"],
    product_df["Product_height_cm"],
    product_df["Product_weight_g"]
))

# Join the necessary datasets
joined_df = order_item_df.join(product_df, "Product_id")

# Calculate the average order value for each product size category
average_order_value_by_size = joined_df.groupBy("Product_Size_Category").agg(
    round(avg("Price"),2).alias("Average_Order_Value")
)

# Show the resulting DataFrame
average_order_value_by_size.show()

+---------------------+-------------------+
|Product_Size_Category|Average_Order_Value|
+---------------------+-------------------+
|              Unknown|             138.72|
|               Medium|             101.92|
|                Small|              82.06|
|                Large|             175.04|
+---------------------+-------------------+

