<a href="https://colab.research.google.com/github/jagatabhay/pysparktest/blob/main/FHDAL1Coding.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
[K     |████████████████████████████████| 212.4MB 66kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 18.5MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=fc60992ba6f126580ae9c746e2e8f2618546843b7e15de76604a18d000092fe4
  Stored in directory: /root/.cache/pip/wheels/40/1b/2c/30f43be2627857ab80062bef1527c0128f7b4070b6b2d02139
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, isnan, count, col, \
                                  when, year, month, sum as _sum, create_map, lit, udf, md5
import calendar
from itertools import chain

In [3]:
spark = SparkSession.builder.appName('FHDAL1Code').getOrCreate()

In [4]:
spark

In [5]:
df = spark.read.csv('drive/MyDrive/FoodHubDA/orders_test.csv',header=True,inferSchema=True)
df_customer = spark.read.csv('drive/MyDrive/FoodHubDA/customer_test.csv',header=True,inferSchema=True)
df_store = spark.read.csv('drive/MyDrive/FoodHubDA/store_test.csv',header=True,inferSchema=True, ignoreLeadingWhiteSpace=True,ignoreTrailingWhiteSpace=True)

In [6]:
df_customer = df_customer.withColumn('EncryEmail',md5('email'))
df_customer = df_customer.withColumnRenamed('id','customer_id')
df_store = df_store.withColumnRenamed('id','store_id')

Customer Dataframe Schema

In [7]:
df_customer.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- EncryEmail: string (nullable = true)



Order Dataframe Schema

In [8]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- total: double (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- order_date: string (nullable = true)



Store dataframe schema

In [9]:
df_store.printSchema()

root
 |-- store_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)



In [10]:
df_customer.columns

['customer_id', 'first_name', 'last_name', 'email', 'EncryEmail']

In [11]:
df.columns

['id', 'total', 'customer_id', 'store_id', 'order_date']

In [12]:
df_store.columns

['store_id', 'name', 'address']

First 5 rows of order dataframe

In [13]:
df.show(5)

+---+-----+-----------+--------+----------+
| id|total|customer_id|store_id|order_date|
+---+-----+-----------+--------+----------+
|  1|19.36|         21|       1|2020-03-03|
|  2| 8.85|         88|       8|2020-04-02|
|  3| 5.53|         41|       3|2020-03-03|
|  4| 12.9|         96|       8|2020-03-15|
|  5| 8.19|         25|       7|2020-01-21|
+---+-----+-----------+--------+----------+
only showing top 5 rows



First 5 rows of Customer Dataframe

In [14]:
df_customer.show(5)

+-----------+----------+---------+--------------------+--------------------+
|customer_id|first_name|last_name|               email|          EncryEmail|
+-----------+----------+---------+--------------------+--------------------+
|          1|      Sara|  Ramirez|samantha67@yahoo.com|84d79088d5b6c3377...|
|          2|    Joshua|  Jimenez|richardtimothy@ho...|91a1cef7deeb17686...|
|          3|    Nicole|  Navarro|nicholsonwilliam@...|1a9da49fcebe50789...|
|          4|      John| Anderson|jenniferhowell@ya...|a794eac7ef25e094a...|
|          5|Alexandria| Alvarado|sjohnston@young-b...|6c5e7059be20c0fc9...|
+-----------+----------+---------+--------------------+--------------------+
only showing top 5 rows



first 5 rows of store dataframe

In [15]:
df_store.show(5)

+-------------+------------+------------------+
|     store_id|        name|           address|
+-------------+------------+------------------+
|            1|  Valdez Inc|18321 Joseph Lodge|
|Christineland|   NH 69026"|              null|
|            2|Stevens-Barr|Unit 0902 Box 4445|
|DPO AE 19637"|        null|              null|
|            3|  Taylor Ltd|  3433 Hill Forest|
+-------------+------------+------------------+
only showing top 5 rows



Count Of Both Null and Missing values of Order Dataframe

In [16]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+-----+-----------+--------+----------+
| id|total|customer_id|store_id|order_date|
+---+-----+-----------+--------+----------+
|  0|    0|          0|       0|         0|
+---+-----+-----------+--------+----------+



Count Of Both Null and Missing values of Customer Dataframe

In [17]:
df_customer.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_customer.columns]).show()

+-----------+----------+---------+-----+----------+
|customer_id|first_name|last_name|email|EncryEmail|
+-----------+----------+---------+-----+----------+
|          0|         0|        0|    0|         0|
+-----------+----------+---------+-----+----------+



Count Of Both Null and Missing values of Store Dataframe

In [18]:
df_store.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_store.columns]).show()

+--------+----+-------+
|store_id|name|address|
+--------+----+-------+
|       0|   2|     10|
+--------+----+-------+



Conversion of Order Date Column to get the Month name

In [19]:
df = df.withColumn('order_date_1',to_date("order_date"))
df = df.withColumn('YEAR',year("order_date_1"))
df = df.withColumn('INTMONTH',month("order_date_1"))

In [20]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- total: double (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_date_1: date (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- INTMONTH: integer (nullable = true)



In [21]:
df.show(5)

+---+-----+-----------+--------+----------+------------+----+--------+
| id|total|customer_id|store_id|order_date|order_date_1|YEAR|INTMONTH|
+---+-----+-----------+--------+----------+------------+----+--------+
|  1|19.36|         21|       1|2020-03-03|  2020-03-03|2020|       3|
|  2| 8.85|         88|       8|2020-04-02|  2020-04-02|2020|       4|
|  3| 5.53|         41|       3|2020-03-03|  2020-03-03|2020|       3|
|  4| 12.9|         96|       8|2020-03-15|  2020-03-15|2020|       3|
|  5| 8.19|         25|       7|2020-01-21|  2020-01-21|2020|       1|
+---+-----+-----------+--------+----------+------------+----+--------+
only showing top 5 rows



In [22]:
monthDict = dict((index,month) \
            for index, month in enumerate(calendar.month_name) \
            if month)

for _ in monthDict.items():
    print(_)

(1, 'January')
(2, 'February')
(3, 'March')
(4, 'April')
(5, 'May')
(6, 'June')
(7, 'July')
(8, 'August')
(9, 'September')
(10, 'October')
(11, 'November')
(12, 'December')


In [23]:
mapping = create_map([lit(x) for x in chain(*monthDict.items())])
df = df.withColumn('MONTH',mapping[df['INTMONTH']])

In [24]:
df.show(5)

+---+-----+-----------+--------+----------+------------+----+--------+-------+
| id|total|customer_id|store_id|order_date|order_date_1|YEAR|INTMONTH|  MONTH|
+---+-----+-----------+--------+----------+------------+----+--------+-------+
|  1|19.36|         21|       1|2020-03-03|  2020-03-03|2020|       3|  March|
|  2| 8.85|         88|       8|2020-04-02|  2020-04-02|2020|       4|  April|
|  3| 5.53|         41|       3|2020-03-03|  2020-03-03|2020|       3|  March|
|  4| 12.9|         96|       8|2020-03-15|  2020-03-15|2020|       3|  March|
|  5| 8.19|         25|       7|2020-01-21|  2020-01-21|2020|       1|January|
+---+-----+-----------+--------+----------+------------+----+--------+-------+
only showing top 5 rows



In [25]:
df = df.select(['id','total','customer_id','store_id','YEAR','MONTH'])
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- total: double (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: string (nullable = true)



Final Dataframe, required for solution of Question1

In [26]:
df.show(5)

+---+-----+-----------+--------+----+-------+
| id|total|customer_id|store_id|YEAR|  MONTH|
+---+-----+-----------+--------+----+-------+
|  1|19.36|         21|       1|2020|  March|
|  2| 8.85|         88|       8|2020|  April|
|  3| 5.53|         41|       3|2020|  March|
|  4| 12.9|         96|       8|2020|  March|
|  5| 8.19|         25|       7|2020|January|
+---+-----+-----------+--------+----+-------+
only showing top 5 rows



Aggregate table showingthe total orders and revenueeach store had each month

Answer of Question1

In [27]:
df2 = df.join(df_store, on=['store_id'], how='inner')
df2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df2.columns]).show()

+--------+---+-----+-----------+----+-----+----+-------+
|store_id| id|total|customer_id|YEAR|MONTH|name|address|
+--------+---+-----+-----------+----+-----+----+-------+
|       0|  0|    0|          0|   0|    0|   0|      0|
+--------+---+-----+-----------+----+-----+----+-------+



In [28]:
df2.groupBy(['YEAR','MONTH','name'])\
       .agg(count('total'),_sum('total'))\
       .withColumnRenamed('count(total)','NumberOfOrders')\
       .withColumnRenamed('sum(total)','TotalRevenue')\
       .withColumnRenamed('name','StoreName')\
       .orderBy(['YEAR','MONTH'])\
       .show()

+----+--------+--------------------+--------------+------------------+
|YEAR|   MONTH|           StoreName|NumberOfOrders|      TotalRevenue|
+----+--------+--------------------+--------------+------------------+
|2020|   April|        Stevens-Barr|            24|            343.31|
|2020|   April|         Young Group|            26|366.11999999999995|
|2020|   April|          Valdez Inc|            26|406.60999999999996|
|2020|   April|    Edwards-Mcdaniel|            30| 416.1200000000001|
|2020|   April|            Hall Inc|            25|            391.25|
|2020|   April|Martinez, Marsh a...|            23|361.03999999999996|
|2020|   April|Henderson, Olson ...|            24|319.28999999999996|
|2020|   April|Adams, Barrett an...|            27|            354.82|
|2020|   April|          Taylor Ltd|            24| 393.1499999999999|
|2020|   April|         Scott-Brown|            20|391.17999999999995|
|2020|February|Henderson, Olson ...|            25|             342.6|
|2020|

a list of users who have placed less than 10 orders

Answer of Question2

In [29]:
df.groupBy('customer_id')\
  .agg(count('id'))\
  .where(col('count(id)')<10)\
  .withColumnRenamed('count(id)','OrderPlacedByUser')\
  .show()

+-----------+-----------------+
|customer_id|OrderPlacedByUser|
+-----------+-----------------+
|         85|                6|
|         65|                7|
|         53|                9|
|         28|                7|
|         27|                9|
|         44|                5|
|         12|                8|
|         93|                7|
|         47|                6|
|          1|                9|
|         13|                5|
|         86|                7|
|         20|                9|
|         40|                7|
|         57|                9|
|         48|                9|
|          5|                4|
|         19|                5|
|         64|                6|
|         15|                9|
+-----------+-----------------+
only showing top 20 rows



In [30]:
df3 = df.join(df_customer, on=['customer_id'], how='inner')
df3 = df3.select(['customer_id','id','first_name','last_name','EncryEmail'])

In [31]:
df3.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df3.columns]).show()

+-----------+---+----------+---------+----------+
|customer_id| id|first_name|last_name|EncryEmail|
+-----------+---+----------+---------+----------+
|          0|  0|         0|        0|         0|
+-----------+---+----------+---------+----------+



Answer of Question 3

In [32]:
df3.groupBy('first_name','last_name','EncryEmail')\
  .agg(count('id'))\
  .where(col('count(id)')<10)\
  .withColumnRenamed('count(id)','OrderPlacedByUser')\
  .show()

+----------+----------+--------------------+-----------------+
|first_name| last_name|          EncryEmail|OrderPlacedByUser|
+----------+----------+--------------------+-----------------+
|   Richard|  Peterson|c637e9b68f817c6e2...|                8|
|    Jeremy|      Shaw|2f40dfd43fa285d19...|                7|
|    Alicia|  Martinez|73d4e1335c8b73f44...|                6|
|     Jason|     Moore|b5075cf22f983d637...|                7|
|    Joanna|     Terry|f9477c84df3fa1add...|                7|
|    Travis|    Phelps|a750ecf95c82cc5ff...|                7|
|      Sara|   Ramirez|84d79088d5b6c3377...|                9|
|   Nichole|    Morris|585265c94b51e0119...|                7|
|    Angela|    Taylor|10b39830a7ba95742...|                9|
|    Olivia|    Hudson|335444f3cbb088d2e...|                8|
|      Ryan|   Terrell|f7b1672d53d0ae4b0...|                6|
|    Thomas|Montgomery|2a011e8e05b91ce7c...|                8|
|      Lisa|     Mckay|c146b7fba37684f0b...|           