#### In this project, we'll use synthetic sales data to practice PySpark.

In [47]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import sum, col, avg, first, round, concat, format_string, lit, when, cast, month
import json

In [24]:
# Initialise findspark before the SparkSession is created.
# NOTE(review): presumably this makes a local Spark installation importable — confirm for this environment.
findspark.init()

In [25]:
# Obtain the SparkSession used by every query in this notebook.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [26]:
# Read the dataset locations from the JSON configuration file.
# The encoding is passed explicitly so the file is decoded the same way on every
# platform instead of relying on the locale's default encoding.
with open("config.json", "r", encoding="utf-8") as config_file:
    config_data = json.load(config_file)

# Every CSV location is stored under the "csv_paths" key; look it up once.
csv_paths = config_data["csv_paths"]

path_products = csv_paths["path_products"]
path_sales = csv_paths["path_sales"]
path_sellers = csv_paths["path_sellers"]
path_result = csv_paths["path_result"]  # At the end of the notebook, we'll perform a small ETL that we'll save in this path

In [27]:
# Explicit schema for the products CSV, so column types are not inferred.
products_schema = (
    StructType()
    .add("product_id", IntegerType())
    .add("product_name", StringType())
    .add("price", IntegerType())
)

# Read the products file, treating its first line as the header.
products = (
    spark.read
    .options(header=True, delimiter=",")
    .schema(products_schema)
    .csv(path_products)
)

products.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: integer (nullable = true)



In [28]:
# Explicit schema for the sales CSV, so column types are not inferred.
sales_schema = (
    StructType()
    .add("order_id", IntegerType())
    .add("product_id", IntegerType())
    .add("seller_id", IntegerType())
    .add("date", DateType())
    .add("num_pieces_sold", IntegerType())
    .add("bill_raw_text", StringType())
    .add("product_id_num", IntegerType())
)

# Read the sales file, treating its first line as the header.
sales = (
    spark.read
    .options(header=True, delimiter=",")
    .schema(sales_schema)
    .csv(path_sales)
)

sales.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- seller_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- num_pieces_sold: integer (nullable = true)
 |-- bill_raw_text: string (nullable = true)
 |-- product_id_num: integer (nullable = true)



In [29]:
# Explicit schema for the sellers CSV, so column types are not inferred.
sellers_schema = (
    StructType()
    .add("seller_id", IntegerType())
    .add("seller_name", StringType())
    .add("daily_target", IntegerType())
)

# Read the sellers file, treating its first line as the header.
sellers = (
    spark.read
    .options(header=True, delimiter=",")
    .schema(sellers_schema)
    .csv(path_sellers)
)

sellers.printSchema()

root
 |-- seller_id: integer (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: integer (nullable = true)



#### With the three datasets loaded, we'll show the first 5 rows of each one

In [30]:
# Preview the first five rows of each dataset, in load order.
for dataset in (products, sales, sellers):
    dataset.show(5)

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         0|   product_0|   22|
|         1|   product_1|   30|
|         2|   product_2|   91|
|         3|   product_3|   37|
|         4|   product_4|  145|
+----------+------------+-----+
only showing top 5 rows

+--------+----------+---------+----------+---------------+--------------------+--------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|product_id_num|
+--------+----------+---------+----------+---------------+--------------------+--------------+
| 2998575|     39495|        8|2020-07-09|             69|cbgztjphaqxaolwgd...|         39495|
| 3981313|     35340|        9|2020-07-04|             47|nedrhdkrndwhonulx...|         35340|
| 3992645|     11870|        6|2020-07-08|             66|pjjfnblolzqdzzxbj...|         11870|
| 6475605|      7915|        8|2020-07-03|             12|wmsofzpxwlyhorqwf...|          7915|
|12492936|     85210| 

#### How many products, sales and sellers are there in total?

The totals of products and sellers are simple row counts, but a single sale can include more than one piece of a product. To count the total amount of sales, we'll therefore sum the number of pieces sold.

In [31]:
# Add up the pieces sold over every sales row; first()[0] pulls the scalar
# out of the single-row aggregate result.
total_sales = (
    sales
    .agg(sum("num_pieces_sold"))
    .first()[0]
)

In [32]:
# Row counts for the two lookup tables; total_sales was computed in the previous cell.
product_count = products.count()
seller_count = sellers.count()

print("Products:", product_count)
print("Sales:", total_sales)
print("Sellers:", seller_count)

Products: 100000
Sales: 71194
Sellers: 10


#### How many products have been sold at least once?

In [33]:
# Number of distinct product ids that appear in at least one sales row.
(
    sales
    .select("product_id")
    .dropDuplicates()
    .count()
)

1376

#### Which product has been sold in the most orders?

In [34]:
# Count the sales rows for each product, then keep the largest of those counts.
orders_per_product = sales.groupBy("product_id").count()
max_sales = (
    orders_per_product
    .orderBy(col("count").desc())
    .first()["count"]
)
print("The product sold in the most orders has been sold", max_sales, "times.")

The product sold in the most orders has been sold 2 times.


In the following query we can see the most frequent products among all orders

In [35]:
# Show every product whose number of sales rows equals max_sales.
product_order_counts = sales.groupBy("product_id").count()
product_order_counts.filter(col("count") == max_sales).show()

+----------+-----+
|product_id|count|
+----------+-----+
|      2999|    2|
|     20913|    2|
|     72638|    2|
|     30418|    2|
|     91069|    2|
|     62258|    2|
|     42883|    2|
|     22297|    2|
|     71771|    2|
|     81906|    2|
|     75721|    2|
|     96545|    2|
+----------+-----+



#### How many different products have been sold each day?

In [36]:
# Reduce the sales to unique (product, day) pairs, then count the pairs per day.
daily_distinct_products = (
    sales
    .select("product_id", "date")
    .dropDuplicates()
    .groupBy("date")
    .count()
    .orderBy("date")
)
daily_distinct_products.show()

+----------+-----+
|      date|count|
+----------+-----+
|2020-07-01|  142|
|2020-07-02|  132|
|2020-07-03|  159|
|2020-07-04|  147|
|2020-07-05|  142|
|2020-07-06|  145|
|2020-07-07|  123|
|2020-07-08|  114|
|2020-07-09|  146|
|2020-07-10|  138|
+----------+-----+



#### What is the average expense per order?

In [37]:
# Average expense per order.
# Each sales row carries one product, so the cost of every row is first summed
# per order_id. Averaging the raw rows would instead give the average cost per
# row whenever an order_id appears on more than one row.
order_costs = (
    sales.join(products, sales.product_id == products.product_id, "inner")
    .withColumn("cost", col("num_pieces_sold") * col("price"))
    .groupBy("order_id")
    .agg(sum("cost").alias("order_cost"))
)

# Mean of the per-order totals; first()[0] extracts the scalar.
avg_order_cost = order_costs.agg(avg("order_cost")).first()[0]
print ('Average cost of a order is', avg_order_cost)

Average cost of a order is 3916.516570605187


#### We'll calculate the average percentage that a sales order contributes to the quota of each seller

In [38]:
# Express the pieces sold in each sales row as a percentage of the seller's
# daily target, then average that percentage per seller (rounded to 4 decimals).
sales_with_targets = sales.join(sellers, sales.seller_id == sellers.seller_id, "inner")
quota_by_seller = (
    sales_with_targets
    .withColumn("quota_per_order", col("num_pieces_sold") / col("daily_target") * 100)
    .groupBy(sales.seller_id)
    .agg(round(avg("quota_per_order"), 4).alias("quota"))
    .orderBy(sales.seller_id)
)
quota_by_seller.show()

+---------+------+
|seller_id| quota|
+---------+------+
|        1|0.0189|
|        2|0.0069|
|        3|0.0168|
|        4|0.0035|
|        5|0.0041|
|        6|0.0048|
|        7|0.0029|
|        8|0.0094|
|        9|0.0036|
+---------+------+



#### We'll calculate the total sold

In [39]:
# Attach the unit price to every sales row, compute the amount of each row,
# and add all the amounts up.
priced_sales = sales.join(products, sales.product_id == products.product_id)
total_spent = (
    priced_sales
    .withColumn("spend_per_order", col("num_pieces_sold") * col("price"))
    .agg(sum("spend_per_order"))
    .first()[0]
)
print("The total sold is", total_spent)

The total sold is 5436125


Now we'll check the total sold by seller 

In [40]:
# Total amount sold by each seller, largest first.
# Only the sum is aggregated: the former first('spend_per_order') aggregate
# depended on row order and was discarded by the select that followed it.
(
    sales.join(products, sales.product_id == products.product_id)
    .withColumn("spend_per_order", col('price') * col('num_pieces_sold'))
    .groupBy('seller_id')
    .agg(sum('spend_per_order').alias('spend_per_seller'))
    .sort('spend_per_seller', ascending=False)
    .show()
)

+---------+----------------+
|seller_id|spend_per_seller|
+---------+----------------+
|        7|          775363|
|        5|          646894|
|        8|          627624|
|        2|          624802|
|        3|          620734|
|        4|          567151|
|        1|          561092|
|        9|          534637|
|        6|          477828|
+---------+----------------+



In the following cell, we can see that every sale was made in July.

In [51]:
# Compare the number of sales rows whose date falls in month 7 with the total row count.
july_sales_count = sales.where(month(col("date")) == 7).count()
all_sales_count = sales.count()
print(july_sales_count, all_sales_count)

1388 1388


Finally, we'll perform a small ETL process to join the three datasets, adding some additional information that might be useful in a real case. After that, we'll save the new dataset as CSV in the 'result' folder.

In [42]:
# Join the three datasets and add the income of every sales row.
sales_enriched = (
    products
    .join(sales, products.product_id == sales.product_id)
    .join(sellers, sellers.seller_id == sales.seller_id)
    .withColumn('income', col('num_pieces_sold') * col('price'))
)

# Mean income over all joined rows; it is the threshold for the flag column below.
avg_income = sales_enriched.agg(avg('income')).collect()[0][0]

# Derived output columns, named up front to keep the final projection readable.
seller_label = concat(sales.seller_id, lit('_'), sellers.seller_name).alias('seller')
target_pct = (col('num_pieces_sold') / col('daily_target') * lit(100)).alias('sales target')
above_avg_flag = when(col('income') > avg_income, 'Y').otherwise('N').alias('highlited sale')

df_joined = sales_enriched.select(
    sales.product_id,
    products.product_name,
    sales.order_id,
    sales.date,
    seller_label,
    sellers.daily_target,
    target_pct,
    col('income'),
    above_avg_flag,
)
df_joined.show()

+----------+------------+--------+----------+----------+------------+--------------------+------+--------------+
|product_id|product_name|order_id|      date|    seller|daily_target|        sales target|income|highlited sale|
+----------+------------+--------+----------+----------+------------+--------------------+------+--------------+
|       141| product_141|  483979|2020-07-07|7_seller_7|     1946998|0.001027222421389...|  2060|             N|
|       170| product_170| 6978441|2020-07-06|8_seller_8|      547320|0.012972301395892713|  7668|             Y|
|       188| product_188|  481134|2020-07-07|5_seller_5|     1199693|0.003750959620502...|  4680|             Y|
|       189| product_189| 2994023|2020-07-03|2_seller_2|      754188|0.011402992357343262|  9288|             Y|
|       369| product_369|10480295|2020-07-03|6_seller_6|     1055915|0.002083501039382905|   528|             N|
|       442| product_442|12479439|2020-07-05|3_seller_3|      310462|0.015138728733307134|  1222

In [43]:
# Save the joined dataset as CSV with a header row, overwriting any previous output.
# DataFrameWriter.csv() returns None, so its result is not bound to a name.
(
    df_joined.write
    .options(header=True, delimiter=',')
    .mode('overwrite')
    .csv(path_result)
)