In [0]:
print("Hello World")

Hello World


# Customer Behaviour Analysis Project

Customer behavior analysis involves examining patterns in how customers engage with a business, including their purchasing habits, preferences, and interactions with products or services. This analysis enables businesses to better understand customer motivations, allowing them to craft personalized marketing strategies, optimize offerings, and enhance overall customer satisfaction. By identifying trends such as frequent buyers or customers who may be at risk of disengaging, companies can target specific segments with tailored promotions or product recommendations.

In [0]:
#Load Dataset

Orders_df = spark.read.csv('dbfs:/FileStore/tables/Orders-1.csv', header=True,inferSchema=True)
Products_df = spark.read.csv('dbfs:/FileStore/tables/Products-1.csv', header=True,inferSchema=True)
Finance_df = spark.read.csv('dbfs:/FileStore/tables/Finance.csv', header=True,inferSchema=True)



In [0]:
#joining datasets of Orders, Products, Finance
joined_df = Orders_df.join(Products_df, "order_id", "inner")
final_df = joined_df.join(Finance_df, "order_id", "inner")
#final_df.display()

In [0]:
display(final_df)


order_id,order_date,ship_date,customer,region,zip,city,state,country,product_name,manufactory,segment,category,subcategory,discount,profit,quantity,sales,profit_margin
US-2020-103800,2019-01-03,2019-01-07,Darren Powers,Central,77095,Houston,Texas,United States,"""Message Book, Wirebound, Four 5 1/2"""" X 4"""" Forms/Pg.","200 Dupl. Sets/Book""",Message Book,Consumer,Office Supplies,0.2,5.5512,2,16.448,0.3375
US-2020-112326,2019-01-04,2019-01-08,Phillina Ober,Central,60540,Naperville,Illinois,United States,SAFCO Boltless Steel Shelving,SAFCO,Home Office,Office Supplies,Storage,0.2,-64.7748,3,272.736,-0.2375
US-2020-112326,2019-01-04,2019-01-08,Phillina Ober,Central,60540,Naperville,Illinois,United States,SAFCO Boltless Steel Shelving,SAFCO,Home Office,Office Supplies,Storage,0.2,4.2717,3,11.784,0.3625
US-2020-112326,2019-01-04,2019-01-08,Phillina Ober,Central,60540,Naperville,Illinois,United States,SAFCO Boltless Steel Shelving,SAFCO,Home Office,Office Supplies,Storage,0.8,-5.487,2,3.54,-1.55
US-2020-112326,2019-01-04,2019-01-08,Phillina Ober,Central,60540,Naperville,Illinois,United States,Avery 508,Avery,Home Office,Office Supplies,Labels,0.2,-64.7748,3,272.736,-0.2375
US-2020-112326,2019-01-04,2019-01-08,Phillina Ober,Central,60540,Naperville,Illinois,United States,Avery 508,Avery,Home Office,Office Supplies,Labels,0.2,4.2717,3,11.784,0.3625
US-2020-112326,2019-01-04,2019-01-08,Phillina Ober,Central,60540,Naperville,Illinois,United States,Avery 508,Avery,Home Office,Office Supplies,Labels,0.8,-5.487,2,3.54,-1.55
US-2020-112326,2019-01-04,2019-01-08,Phillina Ober,Central,60540,Naperville,Illinois,United States,GBC Standard Plastic Binding Systems Combs,GBC,Home Office,Office Supplies,Binders,0.2,-64.7748,3,272.736,-0.2375
US-2020-112326,2019-01-04,2019-01-08,Phillina Ober,Central,60540,Naperville,Illinois,United States,GBC Standard Plastic Binding Systems Combs,GBC,Home Office,Office Supplies,Binders,0.2,4.2717,3,11.784,0.3625
US-2020-112326,2019-01-04,2019-01-08,Phillina Ober,Central,60540,Naperville,Illinois,United States,GBC Standard Plastic Binding Systems Combs,GBC,Home Office,Office Supplies,Binders,0.8,-5.487,2,3.54,-1.55


**Top 10 most products sold**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, desc

final_df = final_df.withColumn("quantity", col("quantity").cast("int"))
top_sold_products_df = final_df.groupBy("product_name").agg(sum("quantity").alias("total_quantity")).orderBy(desc("total_quantity")).limit(10)
top_sold_products_df.display()


product_name,total_quantity
Staples,2793
Staple envelope,2072
Plantronics Cordless Phone Headset with In-line Volume - M214C,1391
"""Pressboard Covers with Storage Hooks, 9 1/2"""" x 11""""",1276
Easy-staple paper,1270
"Global Wood Trimmed Manager's Task Chair, Khaki",1227
Staple-based wall hangings,1166
SAFCO Boltless Steel Shelving,1143
GBC Recycled VeloBinder Covers,1130
KI Adjustable-Height Table,1129


**Monthly Sales through years** 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, sum

extract_TimeData = final_df.withColumn("order_year", year(col("order_date"))).withColumn("order_month", month(col("order_date")))

def get_monthly_sales(selected_year):
    filtered_data = extract_TimeData.filter(col("order_year") == selected_year)
    monthly_sales = filtered_data.groupBy("order_month").agg(sum("sales").alias("total_sales")).orderBy("order_month")
    return monthly_sales
  
#take input from user 
try:
    selected_year = int(input("Enter the year (2019-2022) for which you want to see monthly sales: "))
    monthly_sales = get_monthly_sales(selected_year)
    print(f"\nMonthly sales for the year {selected_year}:")
    monthly_sales.show()
except ValueError:
    print("Invalid year entered. Please enter a valid numeric year.")
 

Enter the year (2019-2022) for which you want to see monthly sales:  2020


Monthly sales for the year 2020:
+-----------+------------------+
|order_month|       total_sales|
+-----------+------------------+
|          1| 112358.1264000004|
|          2| 71851.69200000002|
|          3|354934.82999999984|
|          4| 442257.0829999976|
|          5| 355934.5539999989|
|          6| 385665.9800000015|
|          7|455366.94700000586|
|          8| 765119.6651999973|
|          9|1683500.1630000102|
|         10|  441851.817000004|
|         11|   856144.46239998|
|         12| 801034.1841999975|
+-----------+------------------+



**Customer with Maximum Purchases for each year**


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, sum as _sum, row_number
from pyspark.sql.window import Window
from pyspark.sql.functions import round

year = final_df.withColumn("year", year(col("order_date")))
customer_sales_by_year = year.groupBy("year", "customer").agg(_sum("sales").alias("total_sales"))

window_spec = Window.partitionBy("year").orderBy(col("total_sales").desc())
ranked_customers = customer_sales_by_year.withColumn("rank", row_number().over(window_spec))

top_customers_by_year = ranked_customers.filter(col("rank") == 1)

top_customers_by_year = top_customers_by_year.withColumn("total_sales", round("total_sales", 2))

top_customers_by_year.select("year", "customer", "total_sales").display(truncate=False)


year,customer,total_sales
2019,Sean Miller,1159400.17
2020,Peter Fuller,779161.94
2021,Tamara Chand,458425.81
2022,Seth Vernon,1450590.55


**Total Quantity, Sales, and Profit**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, sum, round
from pyspark.sql.functions import ceil

final_df = final_df.withColumn("year", year("order_date"))
finance = final_df.groupBy("year").agg(round(sum("sales"), 2).alias("Total_Sales"),
    round(sum("profit"), 2).alias("Total_Profit"),
    sum("quantity").cast("int").alias("Total_Quantity") 
)
finance = finance.withColumn("total_sales", ceil("total_sales"))
finance = finance.withColumn("Total_Profit", ceil("Total_Profit"))

finance.display(truncate=False)


year,total_sales,Total_Profit,Total_Quantity
2022,10216307,1285577,159643
2019,7146953,475556,95222
2020,6726020,765640,104783
2021,7246535,1022045,128336


**Largest Manufacturer for each year**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, sum, col, row_number, rank
from pyspark.sql.window import Window
from pyspark.sql.functions import ceil


manufacturer_sales = final_df.groupBy("year", "manufactory").agg(sum("sales").alias("Total_Sales"))
total_sales_by_year = manufacturer_sales.groupBy("year").agg(sum("Total_Sales").alias("Yearly_Total_Sales"))

manufacturer_with_totals = manufacturer_sales.join(total_sales_by_year,on="year")
manufacturer_percentages = manufacturer_with_totals.withColumn("Percentage_Contribution",round((col("Total_Sales") / col("Yearly_Total_Sales")) * 100, 0).cast("integer")).withColumn("Total_Sales",round(col("Total_Sales"), 0).cast("integer"))

window_spec = Window.partitionBy("year").orderBy(col("Total_Sales").desc())     

top_5_contributors = manufacturer_percentages.withColumn("rank", rank().over(window_spec)).filter(col("rank") <= 5)

display(manufacturer_percentages.select("year", "manufactory", "Total_Sales"))


year,manufactory,Total_Sales
2019,Avery,384525
2019,Fellowes,127507
2019,Stanley,3122
2019,Staple holder,596
2019,OtterBox,2193
2020,Cisco,96327
2020,Brother,13965
2022,Staple holder,22540
2019,Ultra,6936
2020,Chromcraft,19787


**Top products sold in each State**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, col, sum, row_number
from pyspark.sql.window import Window
from pyspark.sql.functions import date_format

year_df = final_df.withColumn("year", date_format("order_date", "yyyy"))
data_2022 = year_df.filter(col("year") == 2018)
state_product_sales = year_df.groupBy("state", "product_name").agg(sum("quantity").alias("Total_Quantity"))
window_spec = Window.partitionBy("state").orderBy(col("Total_Quantity").desc())
top_product_per_state = state_product_sales.withColumn("rank", row_number().over(window_spec)).filter(col("rank") ==1)
top_product_per_state.select("state", "product_name", "Total_Quantity").show(truncate=False)

+--------------------+------------------------------------------------------------+--------------+
|state               |product_name                                                |Total_Quantity|
+--------------------+------------------------------------------------------------+--------------+
|Alabama             |Easy-staple paper                                           |92            |
|Arizona             |"Howard Miller 13"" Diameter Pewter Finish Round Wall Clock"|287           |
|Arkansas            |Newell 327                                                  |238           |
|California          |Staples                                                     |692           |
|Colorado            |Memorex Micro Travel Drive 16 GB                            |608           |
|Connecticut         |Xerox 1914                                                  |68            |
|Delaware            |Staple envelope                                             |172           |
|District 

**State-Wise Sales Contribution in the year 2022**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, sum, col

df_2022 = final_df.filter(final_df.year == 2022)
state_sales_profit = final_df.groupBy("year", "state").agg(sum("sales").alias("Total_Sales"),sum("profit").alias("Total_Profit"))

state_sales_profit_2022 = df_2022.groupBy("year", "state").agg(
    sum("sales").alias("Total_Sales"),
    sum("profit").alias("Total_Profit")
)

yearly_sales_profit_2022 = state_sales_profit_2022.groupBy("year").agg(
    sum("Total_Sales").alias("Yearly_Total_Sales"),
    sum("Total_Profit").alias("Yearly_Total_Profit")
)

state_sales_profit_with_total_2022 = state_sales_profit_2022.join(yearly_sales_profit_2022, on="year")

state_sales_profit_with_percentage_2022 = state_sales_profit_with_total_2022.withColumn(
    "Sales_Percentage", round((col("Total_Sales") / col("Yearly_Total_Sales")) * 100, 2)
).withColumn(
    "Profit_Percentage", round((col("Total_Profit") / col("Yearly_Total_Profit")) * 100, 2)
)

state_sales_profit_with_percentage_2022.select("year", "state","Sales_Percentage", "Profit_Percentage"
).display(truncate=False)


year,state,Sales_Percentage,Profit_Percentage
2022,New York,25.18,43.58
2022,Nebraska,0.04,0.07
2022,Wisconsin,0.74,1.49
2022,Alabama,0.15,0.48
2022,Illinois,2.09,-0.02
2022,Arizona,1.76,-1.55
2022,Iowa,0.14,0.38
2022,Arkansas,0.35,0.79
2022,Colorado,2.31,-7.9
2022,New Mexico,0.27,0.8


**Analyze Impact of Ship Date on Profitability**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, datediff, sum, round
from pyspark.sql.functions import ceil

df = final_df.withColumn("ship_delay", datediff(col("ship_date"), col("order_date")))

from pyspark.sql.functions import when

df = df.withColumn("ship_delay_range", when(col("ship_delay") <= 2, "0-2 Days")
                                      .when((col("ship_delay") > 2) & (col("ship_delay") <= 5), "3-5 Days")
                                      .when((col("ship_delay") > 5) & (col("ship_delay") <= 10), "6-10 Days")
                                      .otherwise("10+ Days"))

delay_impact = df.groupBy("ship_delay_range").agg(round(sum("sales"), 2).alias("Total_Sales"),
    round(sum("profit"), 2).alias("Total_Profit"),
    round((sum("profit") / sum("sales")) * 100, 2).alias("Profit_Margin_Percentage")
)
delay_impact = delay_impact.withColumn("total_sales", ceil("total_sales"))
delay_impact = delay_impact.withColumn("Total_Profit", ceil("Total_Profit"))
delay_impact.show(truncate=False)

+----------------+-----------+------------+------------------------+
|ship_delay_range|total_sales|Total_Profit|Profit_Margin_Percentage|
+----------------+-----------+------------+------------------------+
|6-10 Days       |6742296    |1042851     |15.47                   |
|0-2 Days        |6775499    |1040626     |15.36                   |
|3-5 Days        |17818019   |1465341     |8.22                    |
+----------------+-----------+------------+------------------------+

