In [47]:
from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName('ml-diabetes').getOrCreate() 

In [48]:
sellers_df = spark.read.parquet("./sellers_parquet")
sellers_df.show()

+---------+-----------+------------+
|seller_id|seller_name|daily_target|
+---------+-----------+------------+
|        0|   seller_0|     2500000|
|        1|   seller_1|      257237|
|        2|   seller_2|      754188|
|        3|   seller_3|      310462|
|        4|   seller_4|     1532808|
|        5|   seller_5|     1199693|
|        6|   seller_6|     1055915|
|        7|   seller_7|     1946998|
|        8|   seller_8|      547320|
|        9|   seller_9|     1318051|
+---------+-----------+------------+



In [49]:
products_df = spark.read.parquet("./products_parquet")
products_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         0|   product_0|   22|
|         1|   product_1|   30|
|         2|   product_2|   91|
|         3|   product_3|   37|
|         4|   product_4|  145|
|         5|   product_5|  128|
|         6|   product_6|   66|
|         7|   product_7|  145|
|         8|   product_8|   51|
|         9|   product_9|   44|
|        10|  product_10|   53|
|        11|  product_11|   13|
|        12|  product_12|  104|
|        13|  product_13|  102|
|        14|  product_14|   24|
|        15|  product_15|   14|
|        16|  product_16|   38|
|        17|  product_17|   72|
|        18|  product_18|   16|
|        19|  product_19|   46|
+----------+------------+-----+
only showing top 20 rows



In [61]:
## In order to have faster processing, we take 20% of the data
sales_df = spark.read.parquet("/home/jovyan/AKKA/LinkedIn_Projet/sales_parquet").sample(False, 0.2, seed=1)
sales_df.show()

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|
|       8|         0|        0|2020-07-08|             79|sgldfgtcxufasnvsc...|
|      12|         0|        0|2020-07-06|             45|efmymeftivwsfljzt...|
|      19|         0|        0|2020-07-07|             33|kbrvXuzgiuinodtkg...|
|      21|         0|        0|2020-07-06|             47|fpgeiigoieowkjcna...|
|      25|         0|        0|2020-07-06|             57|tlasdbxogrxamdmik...|
|      29|         0|        0|2020-07-07|             57|utuejyXqfdezosmsv...|
|      46|         0|        0|2020-07-08|             93|qsljhbdrxnlgkjtrA...|
|      51|         0|        0|2020-07-1

## Warm-up 1
#### Find out how many orders

In [62]:
nb_orders=sales_df[["order_id"]].count()
nb_orders

3997259

#### Find out how many products

In [63]:
nb_products=products_df[["product_id"]].count()
nb_products

75000000

#### Find out how many sellers

In [64]:
nb_sellers=sellers_df[["seller_id"]].count()
nb_sellers

10

#### How many products have been sold at least once? 

In [65]:
## Let's do this with SQL tables
sales_df.createOrReplaceTempView("sales_sql")
spark.sql("SELECT count(distinct product_id)AS_least_once from sales_sql where num_pieces_sold>=1").show()

+-------------+
|AS_least_once|
+-------------+
|       199563|
+-------------+



#### Which is the product contained in more orders?

In [66]:
spark.sql("SELECT product_id, count(product_id)Number_orders from sales_sql group by product_id order by count(product_id) DESC ").show()

+----------+-------------+
|product_id|Number_orders|
+----------+-------------+
|         0|      3797441|
|   7022971|            2|
|  18739507|            2|
|  73001064|            2|
|  30820865|            2|
|   8810147|            2|
|  35274900|            2|
|  65297189|            2|
|  24401456|            2|
|  52382988|            2|
|  12385995|            2|
|  62452078|            2|
|   2880005|            2|
|  45146366|            2|
|  61550333|            2|
|  58670435|            2|
|  62624008|            2|
|  42317113|            2|
|   1303726|            2|
|  12867755|            2|
+----------+-------------+
only showing top 20 rows



## Warm-up 2
#### How many distinct products have been sold in each day?

In [67]:
spark.sql("SELECT date, count(distinct product_id)Number_distinct_products from sales_sql group by date order by date ").show()

+----------+------------------------+
|      date|Number_distinct_products|
+----------+------------------------+
|2020-07-01|                   19708|
|2020-07-02|                   19983|
|2020-07-03|                   20182|
|2020-07-04|                   19964|
|2020-07-05|                   19671|
|2020-07-06|                   20153|
|2020-07-07|                   20065|
|2020-07-08|                   19965|
|2020-07-09|                   20272|
|2020-07-10|                   19848|
+----------+------------------------+



## Exercise #1
#### What is the average revenue of the orders?

In [68]:
## With a join between sales and product
products_df.createOrReplaceTempView("products_sql")
spark.sql("SELECT sum(num_pieces_sold*price)/count(order_id)Average_revenue from sales_sql INNER JOIN products_sql ON sales_sql.product_id = products_sql.product_id").show()

+------------------+
|   Average_revenue|
+------------------+
|1245.9022757844814|
+------------------+



## Exercise #2
For each seller, what is the average % contribution of an order to the seller's daily quota?

In [70]:
## In this case we need an join between sellers and and sales tables, final results are in %
sellers_df.createOrReplaceTempView("sellers_sql")
spark.sql("SELECT sales_sql.seller_id, ((sum(num_pieces_sold/daily_target))/count(order_id)*100)AverageContributionperOrder from sales_sql INNER JOIN sellers_sql ON sales_sql.seller_id = sellers_sql.seller_id group by sales_sql.seller_id").show()

+---------+---------------------------+
|seller_id|AverageContributionperOrder|
+---------+---------------------------+
|        7|       0.002599294631765...|
|        3|        0.01628511163743572|
|        8|        0.00922774474793346|
|        0|       0.002019729085981...|
|        5|       0.004227410572948022|
|        6|       0.004787991738113276|
|        9|       0.003856736694966...|
|        1|       0.019625671373294216|
|        4|       0.003290175148319278|
|        2|       0.006680273436516...|
+---------+---------------------------+



## Exercise #3
Who are the second most selling and the least selling persons (sellers) for each product? 

In [71]:
#Lets use pyspark.sql.functions
from pyspark.sql.functions import *

In [72]:
## First, lets filter only the product that has been ordered at least two times

countt = sales_df.groupby("product_id").count()

##We add this to sales_df and then we filter
sales_df_with_count=sales_df.join(countt, ["product_id"])
sales_df_with_count_filtered=sales_df_with_count.filter("count>'1'")
sales_df_with_count_filtered.show()

+----------+--------+---------+----------+---------------+--------------------+-----+
|product_id|order_id|seller_id|      date|num_pieces_sold|       bill_raw_text|count|
+----------+--------+---------+----------+---------------+--------------------+-----+
|  24401456|18494099|        7|2020-07-08|             57|byqoifpuplzuuxiln...|    2|
|  24401456|18986268|        6|2020-07-10|              4|ghiapazrejwnpbgmd...|    2|
|  42317113| 9993032|        8|2020-07-01|              6|wbhfyzmqtncbevazf...|    2|
|  42317113|14993107|        5|2020-07-03|             22|xrlrbykxqdmbeqjyj...|    2|
|  62624008| 4989149|        7|2020-07-04|             38|pmzvmvtstfjxgxgsc...|    2|
|  62624008|16990276|        5|2020-07-08|             40|ktucgikgplinneehe...|    2|
|  12867755| 7992807|        4|2020-07-05|             80|nBjjovuavxoxpoqwy...|    2|
|  12867755|14497679|        1|2020-07-02|            100|gccxhgicuiajkolyx...|    2|
|   1303726| 5994012|        7|2020-07-04|            

In [73]:
## Then lets groupby product_id and seller_id  and lets sum  the num_pieces_sold
sales_produit_seller = sales_df_with_count_filtered.groupBy("product_id", "seller_id").agg(sum("num_pieces_sold").alias("num_pieces_sold"))
sales_produit_seller.show()

+----------+---------+---------------+
|product_id|seller_id|num_pieces_sold|
+----------+---------+---------------+
|  24401456|        7|           57.0|
|  24401456|        6|            4.0|
|  42317113|        8|            6.0|
|  42317113|        5|           22.0|
|  62624008|        7|           38.0|
|  62624008|        5|           40.0|
|  12867755|        4|           80.0|
|  12867755|        1|          100.0|
|   1303726|        7|           83.0|
|   1303726|        3|           55.0|
|  52382988|        7|           80.0|
|  52382988|        1|           30.0|
|  30820865|        5|          101.0|
|  12385995|        2|           60.0|
|  12385995|        6|            6.0|
|    461769|        2|           14.0|
|    461769|        5|           47.0|
|  62452078|        5|           41.0|
|  62452078|        9|           78.0|
|   8810147|        9|           98.0|
+----------+---------+---------------+
only showing top 20 rows



In [74]:
#Then lets use the function rank and add it as a column, we will have ascending and descending ranks
from pyspark.sql.window import Window

sales_with_rank = sales_produit_seller.withColumn("rankDesc", dense_rank().over( Window.partitionBy("product_id").orderBy(desc("num_pieces_sold"))))
sales_with_rank2 = sales_with_rank.withColumn("rankAsc", dense_rank().over( Window.partitionBy("product_id").orderBy(asc("num_pieces_sold"))))
sales_with_rank2.show()

+----------+---------+---------------+--------+-------+
|product_id|seller_id|num_pieces_sold|rankDesc|rankAsc|
+----------+---------+---------------+--------+-------+
|  24401456|        6|            4.0|       2|      1|
|  24401456|        7|           57.0|       1|      2|
|  42317113|        8|            6.0|       2|      1|
|  42317113|        5|           22.0|       1|      2|
|  62624008|        7|           38.0|       2|      1|
|  62624008|        5|           40.0|       1|      2|
|  12867755|        4|           80.0|       2|      1|
|  12867755|        1|          100.0|       1|      2|
|   1303726|        3|           55.0|       2|      1|
|   1303726|        7|           83.0|       1|      2|
|  52382988|        1|           30.0|       2|      1|
|  52382988|        7|           80.0|       1|      2|
|  30820865|        5|          101.0|       1|      1|
|  12385995|        6|            6.0|       2|      1|
|  12385995|        2|           60.0|       1| 

### The second most selling is: 

In [75]:
sales_with_rank2.where("rankDesc=='2'").select("product_id", "seller_id").show() 

+----------+---------+
|product_id|seller_id|
+----------+---------+
|  24401456|        6|
|  42317113|        8|
|  62624008|        7|
|  12867755|        4|
|   1303726|        3|
|  52382988|        1|
|  12385995|        6|
|    461769|        2|
|  62452078|        5|
|   8810147|        9|
|  58670435|        4|
|  18739507|        9|
|   2880005|        2|
|  45146366|        5|
|  35274900|        6|
|  61550333|        5|
|  65297189|        6|
|  73001064|        5|
|  25132590|        8|
|  33675155|        4|
+----------+---------+
only showing top 20 rows



### The second least selling is: 

In [76]:
sales_with_rank2.where("rankAsc=='2'").select("product_id", "seller_id").show() 

+----------+---------+
|product_id|seller_id|
+----------+---------+
|  24401456|        7|
|  42317113|        5|
|  62624008|        5|
|  12867755|        1|
|   1303726|        7|
|  52382988|        7|
|  12385995|        2|
|    461769|        5|
|  62452078|        9|
|   8810147|        8|
|  58670435|        9|
|  18739507|        3|
|   2880005|        9|
|  45146366|        2|
|  35274900|        1|
|  61550333|        2|
|  65297189|        3|
|  73001064|        1|
|  25132590|        9|
|  33675155|        2|
+----------+---------+
only showing top 20 rows



### Who are those for product id 0?

In [77]:
sales_with_rank2.where("rankAsc=='2'").where("product_id=='0'").select("product_id", "seller_id").show() 

+----------+---------+
|product_id|seller_id|
+----------+---------+
+----------+---------+



In [78]:
sales_with_rank2.where("rankDesc=='2'").where("product_id=='0'").select("product_id", "seller_id").show() 

+----------+---------+
|product_id|seller_id|
+----------+---------+
+----------+---------+



## Exercise #4

Create a new column called "hashed_bill" defined as follows:
- if the order_id is even: apply MD5 hashing iteratively to the bill_raw_text field, 
once for each 'A' (capital 'A') present in the text. 
- E.g. if the bill text is 'nbAAnllA' you would apply hashing three times iteratively (only if the order number is even).
- if the order_id is odd: apply SHA256 to the bill textfinally, check if there is any duplicate in the column 

In [80]:
# Here import hashlib for having MD5 and SHA256 hashing for caracter 'A'. Those are resultMD5 and resultSHA256
import hashlib 
str2hash = "A"
  
result = hashlib.md5(str2hash.encode()) 
resultMD5=result.hexdigest() ## This translate the result into Hexadecimal 
    
result = hashlib.sha256(str2hash.encode())
resultSHA256 = result.hexdigest() ## This is the binary output

#Lets create two new colums where 'A' from rows in column bill_raw_test are translated to resultMD5 and resultSHA256 in 
# columns bill_raw_text_MD5 and bill_raw_text_SHA256 respectively. Those columns are added to sales_df

s = sales_df.withColumn('bill_raw_text_MD5', translate('bill_raw_text', 'A', resultMD5))
s = s.withColumn('bill_raw_text_SHA256', translate('bill_raw_text', 'A', resultSHA256))

s.show()

+--------+----------+---------+----------+---------------+--------------------+--------------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|   bill_raw_text_MD5|bill_raw_text_SHA256|
+--------+----------+---------+----------+---------------+--------------------+--------------------+--------------------+
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|uyjihlzhzcswxcccx...|uyjihlzhzcswxcccx...|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|umnxvoqbdzpbwjqmz...|umnxvoqbdzpbwjqmz...|
|       8|         0|        0|2020-07-08|             79|sgldfgtcxufasnvsc...|sgldfgtcxufasnvsc...|sgldfgtcxufasnvsc...|
|      12|         0|        0|2020-07-06|             45|efmymeftivwsfljzt...|efmymeftivwsfljzt...|efmymeftivwsfljzt...|
|      19|         0|        0|2020-07-07|             33|kbrvXuzgiuinodtkg...|kbrvXuzgiuinodtkg...|kbrvXuzgiuinodtkg...|
|      21|         0|   

In [81]:
## Here lets create a new column hashed_bill with and if else, if order_id is even we take results from bill_raw_text_MD5
## otherwise we take them from bill_raw_text_SHA256
ss=s.withColumn("hashed_bill", when(s["order_id"]%2==0 , s["bill_raw_text_MD5"] ).otherwise(s["bill_raw_text_SHA256"]))

## Then we drop columns bill_raw_text_MD5 and bill_raw_text_SHA256
columns_to_drop = ['bill_raw_text_MD5', 'bill_raw_text_SHA256']
ss = ss.drop(*columns_to_drop)

## This is the final result
ss.show()

+--------+----------+---------+----------+---------------+--------------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|         hashed_bill|
+--------+----------+---------+----------+---------------+--------------------+--------------------+
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|uyjihlzhzcswxcccx...|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|umnxvoqbdzpbwjqmz...|
|       8|         0|        0|2020-07-08|             79|sgldfgtcxufasnvsc...|sgldfgtcxufasnvsc...|
|      12|         0|        0|2020-07-06|             45|efmymeftivwsfljzt...|efmymeftivwsfljzt...|
|      19|         0|        0|2020-07-07|             33|kbrvXuzgiuinodtkg...|kbrvXuzgiuinodtkg...|
|      21|         0|        0|2020-07-06|             47|fpgeiigoieowkjcna...|fpgeiigoieowkjcna...|
|      25|         0|        0|2020-07-06|             57|tlasdbxogrxamdmik...|tlasdbxogrxa

In [82]:
## Finally, lets check if there is any duplicate in the column
ss.groupby("hashed_bill").count().filter("count>'1'").show()

+-----------+-----+
|hashed_bill|count|
+-----------+-----+
+-----------+-----+

