In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F 

In [None]:
spark = SparkSession.builder.appName("midterm_exam_app").getOrCreate()

In [None]:
spark

In [None]:
df_articles = spark.read.csv("/FileStore/tables/articles.csv", header = True, inferSchema= True)
df_customers = spark.read.csv("/FileStore/tables/customers.csv", header = True, inferSchema= True)
df_transactions = spark.read.csv("/FileStore/tables/transactions.csv", header = True, inferSchema= True)

In [None]:
df_articles.show(1, False, True)

-RECORD 0---------------------------------------------------------------
 article_id                   | 108775015                               
 product_code                 | 108775                                  
 prod_name                    | Strap top                               
 product_type_no              | 253                                     
 product_type_name            | Vest top                                
 product_group_name           | Garment Upper body                      
 graphical_appearance_no      | 1010016                                 
 graphical_appearance_name    | Solid                                   
 colour_group_code            | 9                                       
 colour_group_name            | Black                                   
 perceived_colour_value_id    | 4                                       
 perceived_colour_value_name  | Dark                                    
 perceived_colour_master_id   | 5                  

In [None]:
df_customers.show(1, False, True)

-RECORD 0----------------------------------------------------------------------------------
 customer_id            | 0000423b00ade91418cceaf3b26c6af3dd342b51fd051eec9c12fb36984420fa 
 FN                     | null                                                             
 Active                 | null                                                             
 club_member_status     | ACTIVE                                                           
 fashion_news_frequency | NONE                                                             
 age                    | 25                                                               
 postal_code            | 2973abc54daa8a5f8ccfe9362140c63247c5eee03f1d93f4c830291c32bc3057 
only showing top 1 row



In [None]:
df_transactions.show(1, False, True)

-RECORD 0----------------------------------------------------------------------------
 t_dat            | 2019-02-28 20:00:00                                              
 customer_id      | 0002db27a1651998a3de4463437b580b45dfa7d8107afa778daad67b3d015d94 
 article_id       | 688873010                                                        
 price            | 0.033881355932203386                                             
 sales_channel_id | 1                                                                
only showing top 1 row



In [None]:
df_transactions.count()

Out[19]: 482189

In [None]:
df_transactions.select('customer_id').distinct().count()

Out[26]: 294058

In [None]:
df_customers.count()

Out[27]: 409862

In [None]:
df_articles.count()

Out[24]: 105542

### Analyzing product trends over seasons and years

In [None]:
from pyspark.sql.types import DateType

In [None]:
df_transactions = df_transactions.withColumn('date', F.col('t_dat').cast(DateType()))

In [None]:
df_transactions.show(3,False, True)

-RECORD 0----------------------------------------------------------------------------
 t_dat            | 2019-02-28 20:00:00                                              
 customer_id      | 0002db27a1651998a3de4463437b580b45dfa7d8107afa778daad67b3d015d94 
 article_id       | 688873010                                                        
 price            | 0.033881355932203386                                             
 sales_channel_id | 1                                                                
 date             | 2019-02-28                                                       
-RECORD 1----------------------------------------------------------------------------
 t_dat            | 2019-02-28 20:00:00                                              
 customer_id      | 0031a3ee9e817a6268d6bb4d598fd6922eb35f26dd0054dbfeed3c7fded09112 
 article_id       | 562245046                                                        
 price            | 0.030508474576271188              

In [None]:
from pyspark.sql.functions import year, month, when


In [None]:
df_transactions = df_transactions.withColumn('year', year("date").cast("int"))

In [None]:
df_transactions = df_transactions.withColumn("Season", when((month("date").between(3, 5)), "Spring")
                                                    .when((month("date").between(6, 8)), "Summer")
                                                    .when((month("date").between(9, 11)), "Autumn")
                                                    .otherwise("Winter")
                                            )

In [None]:
df_transactions.show(10, False, True)

-RECORD 0----------------------------------------------------------------------------
 t_dat            | 2019-02-28 20:00:00                                              
 customer_id      | 0002db27a1651998a3de4463437b580b45dfa7d8107afa778daad67b3d015d94 
 article_id       | 688873010                                                        
 price            | 0.033881355932203386                                             
 sales_channel_id | 1                                                                
 date             | 2019-02-28                                                       
 year             | 2019                                                             
 Season           | Winter                                                           
-RECORD 1----------------------------------------------------------------------------
 t_dat            | 2019-02-28 20:00:00                                              
 customer_id      | 0031a3ee9e817a6268d6bb4d598fd6922e

Having separate year and season features, now we can do some grouping and aggregation. We also use "articles" dataset for having comprehensive analysis about products

In [None]:
transactions_with_articles = df_transactions.join(df_articles, on="article_id")

In [None]:
season_trend = transactions_with_articles.groupBy("year", "season", "article_id") \
    .count() \
    .orderBy("year", "season", "count", ascending=False)


In [None]:
season_trend.show()

+----+------+----------+-----+
|year|season|article_id|count|
+----+------+----------+-----+
|2020|Winter| 720125001|  116|
|2020|Winter| 706016001|  115|
|2020|Winter| 866383001|  103|
|2020|Winter| 537116001|   99|
|2020|Winter| 799421001|   98|
|2020|Winter| 706016002|   96|
|2020|Winter| 733098018|   92|
|2020|Winter| 799421004|   91|
|2020|Winter| 799417004|   90|
|2020|Winter| 706016003|   87|
|2020|Winter| 791587001|   87|
|2020|Winter| 464297007|   81|
|2020|Winter| 590928001|   77|
|2020|Winter| 562245046|   77|
|2020|Winter| 372860001|   77|
|2020|Winter| 803757001|   73|
|2020|Winter| 599580041|   72|
|2020|Winter| 810838011|   71|
|2020|Winter| 730683041|   70|
|2020|Winter| 832361007|   70|
+----+------+----------+-----+
only showing top 20 rows



In [None]:
year_trend = transactions_with_articles.groupBy("year", "article_id") \
    .count() \
    .orderBy("year", "count", ascending=False)

year_trend.show()

+----+----------+-----+
|year|article_id|count|
+----+----------+-----+
|2020| 720125001|  116|
|2020| 706016001|  115|
|2020| 866383001|  103|
|2020| 537116001|   99|
|2020| 799421001|   98|
|2020| 706016002|   96|
|2020| 733098018|   92|
|2020| 799421004|   91|
|2020| 799417004|   90|
|2020| 706016003|   87|
|2020| 791587001|   87|
|2020| 464297007|   81|
|2020| 590928001|   77|
|2020| 372860001|   77|
|2020| 562245046|   77|
|2020| 803757001|   73|
|2020| 599580041|   72|
|2020| 810838011|   71|
|2020| 832361007|   70|
|2020| 730683041|   70|
+----+----------+-----+
only showing top 20 rows



**Analyzing product trend by years (2019 and 2020)**

In [None]:
year_trend_product = transactions_with_articles.groupBy("year", 'product_type_name', 'colour_group_name').count().orderBy('year', 'count', ascending=False)
year_trend_product.show()

+----+-----------------+-----------------+-----+
|year|product_type_name|colour_group_name|count|
+----+-----------------+-----------------+-----+
|2020|         Trousers|            Black| 3304|
|2020|            Dress|            Black| 2136|
|2020|  Leggings/Tights|            Black| 1739|
|2020|              Top|            Black| 1516|
|2020|          Sweater|            Black| 1499|
|2020|              Bra|            Black| 1491|
|2020| Underwear bottom|            Black| 1215|
|2020|         Trousers|        Dark Blue| 1139|
|2020|         Trousers|             Blue| 1084|
|2020|          T-shirt|            Black| 1043|
|2020|         Vest top|            Black| 1021|
|2020|          T-shirt|            White|  897|
|2020|            Skirt|            Black|  822|
|2020|  Swimwear bottom|            Black|  746|
|2020|       Bikini top|            Black|  720|
|2020|          Sweater|            Beige|  664|
|2020|         Trousers|       Light Blue|  651|
|2020|           Blo

In [None]:
pd_year_trend = year_trend_product.toPandas()

In [None]:
import plotly.express as px

fig = px.histogram(pd_year_trend, x='product_type_name', y='count', color='year', barmode='group',
              title="Yearly Product Trends")
fig.show()

In [None]:
trend_product_2019 = year_trend_product.filter(F.col('year')==2019).limit(10)
trend_product_2020 = year_trend_product.filter(F.col('year')==2020).limit(10)
trend_product_2019.show()
trend_product_2020.show()

+----+-----------------+-----------------+-----+
|year|product_type_name|colour_group_name|count|
+----+-----------------+-----------------+-----+
|2019|         Trousers|            Black|19484|
|2019|            Dress|            Black|16819|
|2019|          T-shirt|            White| 8613|
|2019|              Top|            Black| 8114|
|2019|          T-shirt|            Black| 7704|
|2019|         Trousers|        Dark Blue| 7511|
|2019|         Vest top|            Black| 7490|
|2019|          Sweater|            Black| 6971|
|2019|              Bra|            Black| 6733|
|2019|  Leggings/Tights|            Black| 6320|
+----+-----------------+-----------------+-----+

+----+-----------------+-----------------+-----+
|year|product_type_name|colour_group_name|count|
+----+-----------------+-----------------+-----+
|2020|         Trousers|            Black| 3304|
|2020|            Dress|            Black| 2136|
|2020|  Leggings/Tights|            Black| 1739|
|2020|             

In [None]:
trend_product_2019pd = trend_product_2019.toPandas()
trend_product_2020pd = trend_product_2020.toPandas()

In [None]:

fig1 = px.bar(trend_product_2019pd, x=trend_product_2019pd['product_type_name'], y=trend_product_2019pd['count'], 
              color=trend_product_2019pd['colour_group_name'], barmode='group',title="2019 Product Trends")

fig2 = px.bar(trend_product_2020pd, x=trend_product_2020pd['product_type_name'], y=trend_product_2020pd['count'], 
              color=trend_product_2020pd['colour_group_name'], barmode='group',title="2020 Product Trends")
fig1.show()
fig2.show()

Above graphs show that in both 2019 and 2020, black trousers are the leader of year trend, the same can be said about trend changes of dress. But changes are seen in 2020 trends of:
- Leggins: In contrast to 2019, in 2020 it has been purchased much more
- T-shirts have less popularity in purchase trends in 2020

These two products' trend changes can be taken account by sales managers

**Analyzing product trend by year seasons**

In [None]:
season_trend = transactions_with_articles.groupBy("year", "season", "product_type_name", 'colour_group_name') \
    .count() \
    .orderBy("year", "season", "count", ascending=False)

In [None]:
season_trend_winter = season_trend.filter(F.col('season')=='Winter').limit(10)
season_trend_spring = season_trend.filter(F.col('season')=='Spring').limit(10)
season_trend_summer = season_trend.filter(F.col('season')=='Summer').limit(10)
season_trend_autumn = season_trend.filter(F.col('season')=='Autumn').limit(10)
season_trend_winter.show()
season_trend_spring.show()
season_trend_summer.show()
season_trend_autumn.show()

+----+------+-----------------+-----------------+-----+
|year|season|product_type_name|colour_group_name|count|
+----+------+-----------------+-----------------+-----+
|2020|Winter|         Trousers|            Black| 3304|
|2020|Winter|            Dress|            Black| 2136|
|2020|Winter|  Leggings/Tights|            Black| 1739|
|2020|Winter|              Top|            Black| 1516|
|2020|Winter|          Sweater|            Black| 1499|
|2020|Winter|              Bra|            Black| 1491|
|2020|Winter| Underwear bottom|            Black| 1215|
|2020|Winter|         Trousers|        Dark Blue| 1139|
|2020|Winter|         Trousers|             Blue| 1084|
|2020|Winter|          T-shirt|            Black| 1043|
+----+------+-----------------+-----------------+-----+

+----+------+-----------------+-----------------+-----+
|year|season|product_type_name|colour_group_name|count|
+----+------+-----------------+-----------------+-----+
|2019|Spring|         Trousers|            Blac

In [None]:
season_trend_winter_pd = season_trend_winter.toPandas()
season_trend_spring_pd = season_trend_spring.toPandas()
season_trend_summer_pd = season_trend_summer.toPandas()
season_trend_autumn_pd = season_trend_autumn.toPandas()

In [None]:
fig3 = px.bar(season_trend_winter_pd, x=season_trend_winter_pd['product_type_name'], y=season_trend_winter_pd['count'], 
              color=season_trend_winter_pd['colour_group_name'], barmode='group',title="Product Trends in Winter")

fig4 = px.bar(season_trend_spring_pd, x=season_trend_spring_pd['product_type_name'], y=season_trend_spring_pd['count'], 
              color=season_trend_spring_pd['colour_group_name'], barmode='group',title="Product Trends in Spring")

fig5 = px.bar(season_trend_summer_pd, x=season_trend_summer_pd['product_type_name'], y=season_trend_summer_pd['count'], 
              color=season_trend_summer_pd['colour_group_name'], barmode='group',title="Product Trends in Summer")

fig6 = px.bar(season_trend_autumn_pd, x=season_trend_autumn_pd['product_type_name'], y=season_trend_autumn_pd['count'], 
              color=season_trend_autumn_pd['colour_group_name'], barmode='group',title="Product Trends in Autumn")

#fig6 = px.bar(season_trend_autumn_pd, x=season_trend_autumn_pd['product_type_name'], y=season_trend_autumn_pd['count'],  color=season_trend_autumn_pd['colour_group_name'], barmode='group',title="Product Trends in Autumn")

for i in [fig3, fig4, fig5, fig6]:
    i.show()

We can see that product trend greatly changes according to year season. Every season's trend is its corresponding type of clothes.
We can also notice that white clothes are trendy in summer and spring.

### Estimations for 2 years' transactions summary

If we assume that we have an unbiased sample, we can simply make calculations on our sample, then scale them for the full dataset with a scaling factor 33,33 (100/3)

**a) Total revenue**

In [None]:
# As we don't have quantity of products sold, we'll have product's price as revenue of transaction

from pyspark.sql.functions import col, countDistinct, sum as _sum

total_revenue = df_transactions.select(F.sum(F.col("price"))).alias('TotalRevenue')
total_revenue.show()

+------------------+
|        sum(price)|
+------------------+
|13468.221016949597|
+------------------+



In [None]:
total_revenue_inference = df_transactions.select(F.sum(F.col("price")*33.33))
total_revenue_inference.show()

+--------------------+
|sum((price * 33.33))|
+--------------------+
|  448895.80649493926|
+--------------------+



**b)Number of customers**

In [None]:

number_of_customers = df_customers.distinct().count()
print(number_of_customers)

409862


In [None]:
full_number_of_customers = number_of_customers * 33.33
print(full_number_of_customers)

13660700.459999999


**c) Number of transactions**

In [None]:
number_of_transactions = df_transactions.count()
print(number_of_transactions)

482189


In [None]:
full_n_of_transactions = number_of_transactions * 33.33
print(full_n_of_transactions)

16071359.37


**d) Average expenses per year per customer**

In [None]:
customer_number = df_transactions.distinct().count()

In [None]:
avg_expenses = df_transactions.groupBy("year", "customer_id").agg(F.sum(F.col("price"))
                                                                  .alias("total_expenses")).agg(F.sum(F.col("total_expenses") / customer_number))

In [None]:
avg_expenses.show()

+------------------------------+
|sum((total_expenses / 480319))|
+------------------------------+
|          0.028040158763133184|
+------------------------------+



Another method could be usage of data distributions for more accurate scaling

### Data Quality Report

In [None]:
for df, name in zip([df_transactions, df_customers, df_articles], ["Transactions", "Customers", "Articles"]):
    print(f"Null value analysis for {name}:")
    df.select([((df[col].isNull()).cast("int")).alias(col) for col in df.columns]).agg(*[
        _sum(col).alias(col) for col in df.columns
    ]).show(vertical = True)

Null value analysis for Transactions:
-RECORD 0---------------
 t_dat            | 0   
 customer_id      | 0   
 article_id       | 0   
 price            | 0   
 sales_channel_id | 0   
 date             | 0   
 year             | 0   
 Season           | 0   

Null value analysis for Customers:
-RECORD 0------------------------
 customer_id            | 0      
 FN                     | 245161 
 Active                 | 248064 
 club_member_status     | 932    
 fashion_news_frequency | 3053   
 age                    | 2501   
 postal_code            | 0      

Null value analysis for Articles:
-RECORD 0---------------------------
 article_id                   | 0   
 product_code                 | 0   
 prod_name                    | 0   
 product_type_no              | 0   
 product_type_name            | 0   
 product_group_name           | 0   
 graphical_appearance_no      | 0   
 graphical_appearance_name    | 0   
 colour_group_code            | 0   
 colour_group_name      

Important features don't have null values. What about customers age, or detailed description of a product, these columns' null values don't make huge loss in analysis. We can say that we don't have problem with Null values

In [None]:
mismatched_articles = df_transactions.join(df_articles, "article_id", "left_anti").count()

mismatched_customers = df_transactions.join(df_customers, "customer_id", "left_anti").count()

print(f"Mismatched articles: {mismatched_articles}")
print(f"Mismatched customers: {mismatched_customers}")

Mismatched articles: 0
Mismatched customers: 0


In [None]:
df_transactions.describe('price').show()

+-------+--------------------+
|summary|               price|
+-------+--------------------+
|  count|              482189|
|   mean|0.027931414895299555|
| stddev| 0.01947139179042385|
|    min|3.220338983050848E-4|
|    max|  0.5067796610169492|
+-------+--------------------+



In [None]:
from pyspark.sql.functions import to_date, current_date

df_transactions = df_transactions.withColumn("purchase_date", to_date("date", "yyyy-MM-dd"))
invalid_dates = df_transactions.filter((col("purchase_date") > current_date()) | col("purchase_date").isNull()).count()
print(f"Invalid dates in transactions: {invalid_dates}")

Invalid dates in transactions: 0


We can conclude that we have good quality data

### Some small analysis

In [None]:
df_transactions.groupBy('season').count().show()

+------+------+
|season| count|
+------+------+
|Spring|130643|
|Winter| 94806|
|Summer|148098|
|Autumn|108642|
+------+------+



Most purchases are made in summer, and least in winter. The company can make good profit in summer, and make "Christmas discounts" or other marketing campaign in winter 