In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import matplotlib.pyplot as plt
import pandas as pd

In [2]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName('data_analysis') \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

In [3]:
df = spark.read.csv('purchase.csv', header=True, inferSchema=True)

In [4]:
df.count()

1000000

In [5]:
df.printSchema()

root
 |-- Product ID: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Discount: integer (nullable = true)
 |-- Tax Rate: integer (nullable = true)
 |-- Stock Level: integer (nullable = true)
 |-- Supplier ID: string (nullable = true)
 |-- Customer Age Group: string (nullable = true)
 |-- Customer Location: string (nullable = true)
 |-- Shipping Cost: double (nullable = true)
 |-- Shipping Method: string (nullable = true)
 |-- Return Rate: double (nullable = true)
 |-- Popularity Index: integer (nullable = true)



In [6]:
new_columns = [col.replace(" ","_").lower() for col in df.columns]
df = df.toDF(*new_columns)
df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: double (nullable = true)
 |-- discount: integer (nullable = true)
 |-- tax_rate: integer (nullable = true)
 |-- stock_level: integer (nullable = true)
 |-- supplier_id: string (nullable = true)
 |-- customer_age_group: string (nullable = true)
 |-- customer_location: string (nullable = true)
 |-- shipping_cost: double (nullable = true)
 |-- shipping_method: string (nullable = true)
 |-- return_rate: double (nullable = true)
 |-- popularity_index: integer (nullable = true)



In [7]:
df_nulls = df.select([
    F.count(F.when(F.col(c).contains('null') | \
                   F.col(c).contains('Null') | \
                   F.col(c).contains('none') | \
                   F.col(c).contains('None') | \
                   F.col(c).contains('null') | \
                   (F.col(c) == "") | \
                   F.col(c).isNull() | \
                   F.isnan(c), c)).alias(c)
    for c in df.columns])

In [8]:
expression = ''
num_cols = len(new_columns)
for idx in range(0, num_cols):
    expression += new_columns[idx]
    if idx != (num_cols - 1):
        expression += "+"
df_nulls = df_nulls.withColumn('sum_of_nulls', F.expr(expression))
df_nulls.select('sum_of_nulls').show()

+------------+
|sum_of_nulls|
+------------+
|           0|
+------------+



In [9]:
df.select(
    "product_name",
    (F.abs(F.col('popularity_index') - F.col('return_rate'))).alias('abs_diff_popularity_return')
).orderBy(F.col('abs_diff_popularity_return').desc()).show(10)


+---------------+--------------------------+
|   product_name|abs_diff_popularity_return|
+---------------+--------------------------+
|      Textbooks|                     98.99|
|        Fiction|                     98.99|
|          Shirt|                     98.99|
|        Blender|                     98.99|
|   Hiking Shoes|                     98.99|
|         Shorts|                     98.99|
|Air Conditioner|                     98.99|
|      Cookbooks|                     98.98|
| Graphic Novels|                     98.98|
|        Sandals|                     98.98|
+---------------+--------------------------+
only showing top 10 rows



In [10]:
df_standard = df.filter(F.col('shipping_method') == 'Standard')
counts = df_standard.groupby('supplier_id').agg(F.count('*').alias('total_num'))
counts.orderBy(F.col('total_num').desc()).limit(1).show()                            

+-----------+---------+
|supplier_id|total_num|
+-----------+---------+
|       S648|      441|
+-----------+---------+



In [11]:
price_per_category = df.groupby('category').agg(
    F.avg(F.col('shipping_cost')).alias('ave_cost'),
    F.min(F.col('shipping_cost')).alias('min_cost'),
    F.max(F.col('shipping_cost')).alias('max_cost'),
    F.median(F.col('shipping_cost')).alias('median_cost')
)
price_per_category.show(10)

+---------------+------------------+--------+--------+-----------+
|       category|          ave_cost|min_cost|max_cost|median_cost|
+---------------+------------------+--------+--------+-----------+
|        Apparel|25.015327350028343|     0.0|    50.0|      25.06|
|    Electronics| 25.02260244417696|     0.0|    50.0|      25.05|
|       Footwear|24.926221381485224|     0.0|    50.0|      24.86|
|          Books|24.959534547773085|     0.0|    50.0|      24.88|
|Home Appliances|25.002656528531656|     0.0|    50.0|       25.0|
+---------------+------------------+--------+--------+-----------+



In [12]:
popular = df.filter((F.col('popularity_index') > 50) & (F.col('popularity_index') < 70))
popular_counts = popular.groupby('category').agg(F.count('*').alias('most_popular'))
popular_counts.orderBy(F.col('most_popular').desc()).limit(5).show()

+---------------+------------+
|       category|most_popular|
+---------------+------------+
|Home Appliances|       37764|
|          Books|       37724|
|       Footwear|       37614|
|    Electronics|       37364|
|        Apparel|       37342|
+---------------+------------+



In [13]:
df = df.withColumn(
    'total_net',
    (((1 - (F.col('discount')/100)) * F.col('price')) * (1 - (F.col('tax_rate')/100)))
)
totals = df.groupby('supplier_id').agg(F.sum('total_net').alias('total_net_sum'))
totals.orderBy(F.col('total_net_sum').desc()).limit(5).show()

+-----------+-----------------+
|supplier_id|    total_net_sum|
+-----------+-----------------+
|       S960|986093.0523300001|
|       S648|    974139.456565|
|       S912|    964389.585645|
|       S852|     961270.20233|
|       S437|953615.2408799999|
+-----------+-----------------+



In [14]:
df.select('customer_age_group').distinct().show()

+------------------+
|customer_age_group|
+------------------+
|             45-54|
|             35-44|
|             25-34|
|             18-24|
|               55+|
+------------------+



In [15]:
df.select('customer_location').distinct().show()

+--------------------+
|   customer_location|
+--------------------+
|           Singapore|
|     Toronto, Canada|
|       Mumbai, India|
|        Chicago, USA|
|   Sydney, Australia|
|          Dubai, UAE|
|        Phoenix, USA|
|          London, UK|
|     Berlin, Germany|
|    Los Angeles, USA|
|Cape Town, South ...|
|       New York, USA|
|        Tokyo, Japan|
|       Paris, France|
|        Houston, USA|
+--------------------+



In [16]:
df = df.withColumn('city', F.split(F.col('customer_location'), ",").getItem(0))
df = df.withColumn('country', F.split(F.col('customer_location'), ",").getItem(1))

In [17]:
df_less_35 = df.filter((F.col('customer_age_group') == '18-24') | (F.col('customer_age_group') == '25-34'))
df_less_35 = df_less_35.withColumn('total_cost', (((1 - (F.col('discount')/100)) * F.col('price')) + F.col('shipping_cost')))
counts = df_less_35.groupby('city').agg(F.avg(F.col('total_cost')).alias('total_cost'))
counts.orderBy(F.col('total_cost').desc()).limit(5).show()

+-------+-----------------+
|   city|       total_cost|
+-------+-----------------+
|Chicago|912.4320126553755|
|Phoenix| 911.771666146382|
|  Tokyo|909.2769984100574|
|Houston|907.2379199784973|
| Mumbai|906.5524393350369|
+-------+-----------------+



In [18]:
country_stock_level = df.groupby('country').agg(F.sum('stock_level').alias('stock_level_per_country'))
country_stock_level.orderBy(F.col('stock_level_per_country').desc()).show()

+-------------+-----------------------+
|      country|stock_level_per_country|
+-------------+-----------------------+
|          USA|               83337091|
|          UAE|               16901458|
| South Africa|               16868967|
|         NULL|               16737180|
|        India|               16703968|
|           UK|               16656183|
|        Japan|               16649969|
|       Canada|               16567516|
|    Australia|               16564571|
|       France|               16543152|
|      Germany|               16498481|
+-------------+-----------------------+



In [19]:
city_shipping_cost = df.groupby('city').agg(F.avg('shipping_cost').alias('shipping_cost_per_city'))
city_shipping_cost.orderBy(F.col('shipping_cost_per_city').desc()).show()

+-----------+----------------------+
|       city|shipping_cost_per_city|
+-----------+----------------------+
|     Mumbai|     25.05759594691535|
|      Dubai|     25.03926263512409|
|     Sydney|    25.036199987939813|
|   New York|    25.028146637979006|
|  Singapore|    25.012167300380234|
|  Cape Town|    25.011344988310285|
|     London|    24.983443603387546|
|      Paris|    24.974364728670462|
|      Tokyo|    24.969093788465596|
|     Berlin|     24.96588914444815|
|    Chicago|    24.965885080162714|
|    Phoenix|     24.96542775208696|
|    Houston|    24.958903770433125|
|    Toronto|     24.91684280112381|
|Los Angeles|    24.892773692994716|
+-----------+----------------------+



In [20]:
popularity_indexdf_popularity_less_80 = df.filter(F.col('popularity_index') <= 80)
popularity_indexdf_popularity_less_80.count() / df.count() 

0.801781

In [21]:
discount_group = df.groupby('category').agg(
    F.avg(F.col('discount')).alias('avg_discount_category'))
discount_group.orderBy(F.col('avg_discount_category').desc()).show()

+---------------+---------------------+
|       category|avg_discount_category|
+---------------+---------------------+
|Home Appliances|   12.535051174715084|
|          Books|    12.52608953380058|
|       Footwear|   12.517129877204644|
|        Apparel|   12.504781122472028|
|    Electronics|    12.50168555318607|
+---------------+---------------------+



In [22]:
df_older_55 = df.filter((F.col('customer_age_group') == '55+'))
higher_popularity = df_older_55.groupby('product_name').agg(
    F.avg(F.col('popularity_index')).alias('popularity')
)
higher_popularity.orderBy(F.col('popularity').desc()).show(5)

+---------------+------------------+
|   product_name|        popularity|
+---------------+------------------+
|        Sandals| 50.86104624007473|
|     Smartwatch|50.772359550561795|
|        Monitor| 50.74241030358785|
|     Dishwasher|50.729935794542534|
|Washing Machine|50.655539627278316|
+---------------+------------------+
only showing top 5 rows



In [26]:
change_in_price = df.groupby('category').agg(
    F.min(F.col('price')).alias('min_price'),
    F.max(F.col('price')).alias('max_price'),
    F.avg(F.col('price')).alias('avg_price'),
    F.std(F.col('price')).alias('std_price'),
    F.median(F.col('price')).alias('median_price'),    
)
change_in_price.orderBy(F.col('category').desc()).show()

+---------------+---------+---------+------------------+-----------------+------------+
|       category|min_price|max_price|         avg_price|        std_price|median_price|
+---------------+---------+---------+------------------+-----------------+------------+
|Home Appliances|    10.01|  1999.98|1004.8413886273955|   574.9555585721|     1004.52|
|       Footwear|     10.0|  1999.99|1003.2791850626477|574.0032217810402|     1002.08|
|    Electronics|    10.01|  1999.98|1006.5645442513896|  574.27244804008|     1008.73|
|          Books|    10.02|  1999.99|1006.5614861196201|574.2156068994809|     1007.21|
|        Apparel|     10.0|   2000.0|1004.3474277310136|574.8106953691446|     1003.86|
+---------------+---------+---------+------------------+-----------------+------------+

