In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Start Session
spark = SparkSession.builder.appName('DiwaliSales').getOrCreate()

In [3]:
# Session summary
spark

In [4]:
# Reading CSV
df_pyspark = spark.read.csv('data.csv')

In [5]:
df_pyspark.show()

+-------+---------+----------+------+---------+---+--------------+----------------+--------+---------------+----------------+------+--------+------+--------+
|    _c0|      _c1|       _c2|   _c3|      _c4|_c5|           _c6|             _c7|     _c8|            _c9|            _c10|  _c11|    _c12|  _c13|    _c14|
+-------+---------+----------+------+---------+---+--------------+----------------+--------+---------------+----------------+------+--------+------+--------+
|User_ID|Cust_name|Product_ID|Gender|Age Group|Age|Marital_Status|           State|    Zone|     Occupation|Product_Category|Orders|  Amount|Status|unnamed1|
|1002903|Sanskriti| P00125942|     F|    26-35| 28|             0|     Maharashtra| Western|     Healthcare|            Auto|     1|   23952|  NULL|    NULL|
|1000732|   Kartik| P00110942|     F|    26-35| 35|             1|  Andhra�Pradesh|Southern|           Govt|            Auto|     3|   23934|  NULL|    NULL|
|1001990|    Bindu| P00118542|     F|    26-35| 35| 

In [6]:
# As the above table shows it is not in proper format, i.e. it create _c0, _c1.. columns instead of using actual columns. Thus,

df_pyspark = spark.read.option('header', 'true').csv('data.csv')

# OR
# df_pyspark = spark.read.csv('data.csv', header=True, inferSchema=True)

In [7]:
df_pyspark.show(3)

+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+
|User_ID|Cust_name|Product_ID|Gender|Age Group|Age|Marital_Status|         State|    Zone|Occupation|Product_Category|Orders|Amount|Status|unnamed1|
+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+
|1002903|Sanskriti| P00125942|     F|    26-35| 28|             0|   Maharashtra| Western|Healthcare|            Auto|     1| 23952|  NULL|    NULL|
|1000732|   Kartik| P00110942|     F|    26-35| 35|             1|Andhra�Pradesh|Southern|      Govt|            Auto|     3| 23934|  NULL|    NULL|
|1001990|    Bindu| P00118542|     F|    26-35| 35|             1| Uttar Pradesh| Central|Automobile|            Auto|     3| 23924|  NULL|    NULL|
+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+-----

In [8]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [9]:
# Head in spark is used to get first 3 rows in list format.
df_pyspark.head(3)

[Row(User_ID='1002903', Cust_name='Sanskriti', Product_ID='P00125942', Gender='F', Age Group='26-35', Age='28', Marital_Status='0', State='Maharashtra', Zone='Western', Occupation='Healthcare', Product_Category='Auto', Orders='1', Amount='23952', Status=None, unnamed1=None),
 Row(User_ID='1000732', Cust_name='Kartik', Product_ID='P00110942', Gender='F', Age Group='26-35', Age='35', Marital_Status='1', State='Andhra�Pradesh', Zone='Southern', Occupation='Govt', Product_Category='Auto', Orders='3', Amount='23934', Status=None, unnamed1=None),
 Row(User_ID='1001990', Cust_name='Bindu', Product_ID='P00118542', Gender='F', Age Group='26-35', Age='35', Marital_Status='1', State='Uttar Pradesh', Zone='Central', Occupation='Automobile', Product_Category='Auto', Orders='3', Amount='23924', Status=None, unnamed1=None)]

In [10]:
# Like pd.DataFrame.info() we use below method to see what col has what data type.
df_pyspark.printSchema()

root
 |-- User_ID: string (nullable = true)
 |-- Cust_name: string (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age Group: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Orders: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- unnamed1: string (nullable = true)



__Reading Columns and indexing:__

In [11]:
# To see all the columns
df_pyspark.columns

['User_ID',
 'Cust_name',
 'Product_ID',
 'Gender',
 'Age Group',
 'Age',
 'Marital_Status',
 'State',
 'Zone',
 'Occupation',
 'Product_Category',
 'Orders',
 'Amount',
 'Status',
 'unnamed1']

In [12]:
# Viewing column -> Cust_name
df_pyspark.select('Cust_name').show(5)

+---------+
|Cust_name|
+---------+
|Sanskriti|
|   Kartik|
|    Bindu|
|   Sudevi|
|     Joni|
+---------+
only showing top 5 rows



In [13]:
# Viewing Multiple Cols: Cust_name, Age
df_pyspark.select(['Cust_name', 'Age']).show(5)

+---------+---+
|Cust_name|Age|
+---------+---+
|Sanskriti| 28|
|   Kartik| 35|
|    Bindu| 35|
|   Sudevi| 16|
|     Joni| 28|
+---------+---+
only showing top 5 rows



In [14]:
df_pyspark.dtypes

[('User_ID', 'string'),
 ('Cust_name', 'string'),
 ('Product_ID', 'string'),
 ('Gender', 'string'),
 ('Age Group', 'string'),
 ('Age', 'string'),
 ('Marital_Status', 'string'),
 ('State', 'string'),
 ('Zone', 'string'),
 ('Occupation', 'string'),
 ('Product_Category', 'string'),
 ('Orders', 'string'),
 ('Amount', 'string'),
 ('Status', 'string'),
 ('unnamed1', 'string')]

In [15]:
# Same as df.describe() in pandas, spark also hass .describe(). Values are NULL because it takes string into account as well.
df_pyspark.describe().show()

+-------+------------------+---------+----------+------+---------+------------------+------------------+--------------+-------+-----------+----------------+------------------+-----------------+------+--------+
|summary|           User_ID|Cust_name|Product_ID|Gender|Age Group|               Age|    Marital_Status|         State|   Zone| Occupation|Product_Category|            Orders|           Amount|Status|unnamed1|
+-------+------------------+---------+----------+------+---------+------------------+------------------+--------------+-------+-----------+----------------+------------------+-----------------+------+--------+
|  count|             11251|    11251|     11251| 11251|    11251|             11251|             11251|         11251|  11251|      11251|           11251|             11251|            11239|     0|       0|
|   mean| 1003004.488134388|     NULL|      NULL|  NULL|     NULL|35.421207003821884|0.4203181939383166|          NULL|   NULL|       NULL|            NULL|2.48

In [16]:
# Adding a new column
df_pyspark = df_pyspark.withColumn('Is 28 Year Old?', df_pyspark['Age']=='28')

# As Amount is string column we cannot execute below code but if it was int:
# df_pyspark.withColumn('Amount (in k)', df_pyspark['Amount']/1000)

# To create Column with default value:
df_pyspark = df_pyspark.withColumn('AmountinK', df_pyspark['Amount'].cast('int')/1000)

In [17]:
df_pyspark.show(3)

+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------------+---------+
|User_ID|Cust_name|Product_ID|Gender|Age Group|Age|Marital_Status|         State|    Zone|Occupation|Product_Category|Orders|Amount|Status|unnamed1|Is 28 Year Old?|AmountinK|
+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------------+---------+
|1002903|Sanskriti| P00125942|     F|    26-35| 28|             0|   Maharashtra| Western|Healthcare|            Auto|     1| 23952|  NULL|    NULL|           true|   23.952|
|1000732|   Kartik| P00110942|     F|    26-35| 35|             1|Andhra�Pradesh|Southern|      Govt|            Auto|     3| 23934|  NULL|    NULL|          false|   23.934|
|1001990|    Bindu| P00118542|     F|    26-35| 35|             1| Uttar Pradesh| Central|Automobile|            Auto|     3|

In [18]:
# Dropping Columns:
df_pyspark = df_pyspark.drop('Is 28 Year Old?')

In [19]:
# Column Rename
df_pyspark = df_pyspark.withColumnRenamed('State', 'Pradesh')
df_pyspark.show(3)

+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------+
|User_ID|Cust_name|Product_ID|Gender|Age Group|Age|Marital_Status|       Pradesh|    Zone|Occupation|Product_Category|Orders|Amount|Status|unnamed1|AmountinK|
+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------+
|1002903|Sanskriti| P00125942|     F|    26-35| 28|             0|   Maharashtra| Western|Healthcare|            Auto|     1| 23952|  NULL|    NULL|   23.952|
|1000732|   Kartik| P00110942|     F|    26-35| 35|             1|Andhra�Pradesh|Southern|      Govt|            Auto|     3| 23934|  NULL|    NULL|   23.934|
|1001990|    Bindu| P00118542|     F|    26-35| 35|             1| Uttar Pradesh| Central|Automobile|            Auto|     3| 23924|  NULL|    NULL|   23.924|
+-------+---------+----------+------+---------

__Handling Null Values__

In [20]:
# na.drop takes how as argument which has two option:
# 1. how = any -> drop a row if it contains any null value.
# 2. how = all -> Drops rows if all the values are null.

df_pyspark.na.drop(how='all').show(3)

+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------+
|User_ID|Cust_name|Product_ID|Gender|Age Group|Age|Marital_Status|       Pradesh|    Zone|Occupation|Product_Category|Orders|Amount|Status|unnamed1|AmountinK|
+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------+
|1002903|Sanskriti| P00125942|     F|    26-35| 28|             0|   Maharashtra| Western|Healthcare|            Auto|     1| 23952|  NULL|    NULL|   23.952|
|1000732|   Kartik| P00110942|     F|    26-35| 35|             1|Andhra�Pradesh|Southern|      Govt|            Auto|     3| 23934|  NULL|    NULL|   23.934|
|1001990|    Bindu| P00118542|     F|    26-35| 35|             1| Uttar Pradesh| Central|Automobile|            Auto|     3| 23924|  NULL|    NULL|   23.924|
+-------+---------+----------+------+---------

In [21]:
# It also has thresh as argument which will drop rows having more than k null values, given thres=k.
df_pyspark.na.drop(thresh=2 ).show(3)

+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------+
|User_ID|Cust_name|Product_ID|Gender|Age Group|Age|Marital_Status|       Pradesh|    Zone|Occupation|Product_Category|Orders|Amount|Status|unnamed1|AmountinK|
+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------+
|1002903|Sanskriti| P00125942|     F|    26-35| 28|             0|   Maharashtra| Western|Healthcare|            Auto|     1| 23952|  NULL|    NULL|   23.952|
|1000732|   Kartik| P00110942|     F|    26-35| 35|             1|Andhra�Pradesh|Southern|      Govt|            Auto|     3| 23934|  NULL|    NULL|   23.934|
|1001990|    Bindu| P00118542|     F|    26-35| 35|             1| Uttar Pradesh| Central|Automobile|            Auto|     3| 23924|  NULL|    NULL|   23.924|
+-------+---------+----------+------+---------

In [22]:
df_pyspark.na.drop(subset=["Zone","User_ID"]).show(3)

+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------+
|User_ID|Cust_name|Product_ID|Gender|Age Group|Age|Marital_Status|       Pradesh|    Zone|Occupation|Product_Category|Orders|Amount|Status|unnamed1|AmountinK|
+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------+
|1002903|Sanskriti| P00125942|     F|    26-35| 28|             0|   Maharashtra| Western|Healthcare|            Auto|     1| 23952|  NULL|    NULL|   23.952|
|1000732|   Kartik| P00110942|     F|    26-35| 35|             1|Andhra�Pradesh|Southern|      Govt|            Auto|     3| 23934|  NULL|    NULL|   23.934|
|1001990|    Bindu| P00118542|     F|    26-35| 35|             1| Uttar Pradesh| Central|Automobile|            Auto|     3| 23924|  NULL|    NULL|   23.924|
+-------+---------+----------+------+---------

In [23]:
# Fills every None value with "Missing Value"
# df_pyspark.na.fill("Missing Values").show(3)

# Fills every None value with 0
df_pyspark.na.fill("Married", "Status").show(3)

+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+-------+--------+---------+
|User_ID|Cust_name|Product_ID|Gender|Age Group|Age|Marital_Status|       Pradesh|    Zone|Occupation|Product_Category|Orders|Amount| Status|unnamed1|AmountinK|
+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+-------+--------+---------+
|1002903|Sanskriti| P00125942|     F|    26-35| 28|             0|   Maharashtra| Western|Healthcare|            Auto|     1| 23952|Married|    NULL|   23.952|
|1000732|   Kartik| P00110942|     F|    26-35| 35|             1|Andhra�Pradesh|Southern|      Govt|            Auto|     3| 23934|Married|    NULL|   23.934|
|1001990|    Bindu| P00118542|     F|    26-35| 35|             1| Uttar Pradesh| Central|Automobile|            Auto|     3| 23924|Married|    NULL|   23.924|
+-------+---------+----------+------+---

In [26]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['AmountinK'], 
                  outputCols=['Amount_imputed']
                  ).setStrategy("mean")

In [27]:
# Use:
imputer.fit(df_pyspark).transform(df_pyspark).show(3)

+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------+--------------+
|User_ID|Cust_name|Product_ID|Gender|Age Group|Age|Marital_Status|       Pradesh|    Zone|Occupation|Product_Category|Orders|Amount|Status|unnamed1|AmountinK|Amount_imputed|
+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------+--------------+
|1002903|Sanskriti| P00125942|     F|    26-35| 28|             0|   Maharashtra| Western|Healthcare|            Auto|     1| 23952|  NULL|    NULL|   23.952|        23.952|
|1000732|   Kartik| P00110942|     F|    26-35| 35|             1|Andhra�Pradesh|Southern|      Govt|            Auto|     3| 23934|  NULL|    NULL|   23.934|        23.934|
|1001990|    Bindu| P00118542|     F|    26-35| 35|             1| Uttar Pradesh| Central|Automobile|            Auto|     3| 2392

__Filter Operations__

In [39]:
df_pyspark.filter("AmountinK<=23").show(3)

+-------+---------+----------+------+---------+---+--------------+-------+--------+----------+----------------+------+------+------+--------+---------+
|User_ID|Cust_name|Product_ID|Gender|Age Group|Age|Marital_Status|Pradesh|    Zone|Occupation|Product_Category|Orders|Amount|Status|unnamed1|AmountinK|
+-------+---------+----------+------+---------+---+--------------+-------+--------+----------+----------------+------+------+------+--------+---------+
|1003745|  Shivani| P00220042|     M|    26-35| 29|             0|Haryana|Northern| IT Sector|      Stationery|     2| 21563|  NULL|    NULL|   21.563|
|1001884|   Eugene| P00339042|     F|    46-50| 47|             0|  Delhi| Central|   Textile|      Stationery|     1| 21547|  NULL|    NULL|   21.547|
|1001804|    Ishit| P00000642|     F|    26-35| 35|             0|  Delhi| Central|    Lawyer|      Stationery|     1| 21533|  NULL|    NULL|   21.533|
+-------+---------+----------+------+---------+---+--------------+-------+--------+-----

In [42]:
# Works for String Values too.
df_pyspark.filter("Occupation=='Govt'").show(3)

+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------+
|User_ID|Cust_name|Product_ID|Gender|Age Group|Age|Marital_Status|       Pradesh|    Zone|Occupation|Product_Category|Orders|Amount|Status|unnamed1|AmountinK|
+-------+---------+----------+------+---------+---+--------------+--------------+--------+----------+----------------+------+------+------+--------+---------+
|1000732|   Kartik| P00110942|     F|    26-35| 35|             1|Andhra�Pradesh|Southern|      Govt|            Auto|     3| 23934|  NULL|    NULL|   23.934|
|1003224|   Kushal| P00205642|     M|    26-35| 35|             0| Uttar Pradesh| Central|      Govt|            Auto|     2| 23809|  NULL|    NULL|   23.809|
|1000813|   Lauren| P00289942|     F|    18-25| 24|             0|Andhra�Pradesh|Southern|      Govt|            Auto|     2| 23664|  NULL|    NULL|   23.664|
+-------+---------+----------+------+---------

In [43]:
# View Only Cust_name and Prodcut_ID for married users.
df_pyspark.filter("Marital_Status==1").select(["Cust_name", "Product_ID"]).show(3)

+---------+----------+
|Cust_name|Product_ID|
+---------+----------+
|   Kartik| P00110942|
|    Bindu| P00118542|
|     Joni| P00057942|
+---------+----------+
only showing top 3 rows



In [45]:
# Adding multiple conditions and & | ~ not operation.

df_pyspark.filter((df_pyspark['Age']=='28') 
                & (df_pyspark['Marital_Status']==1) & 
                (~(df_pyspark['Occupation']=='Govt') |
                 ~(df_pyspark['Pradesh']=='Delhi'))
                 ).show(3)

# Above basically looks for 28 year old married person who is either not occupied in the Govt Sector or not from Delhi.

+-------+---------+----------+------+---------+---+--------------+----------------+--------+---------------+----------------+------+------+------+--------+---------+
|User_ID|Cust_name|Product_ID|Gender|Age Group|Age|Marital_Status|         Pradesh|    Zone|     Occupation|Product_Category|Orders|Amount|Status|unnamed1|AmountinK|
+-------+---------+----------+------+---------+---+--------------+----------------+--------+---------------+----------------+------+------+------+--------+---------+
|1000588|     Joni| P00057942|     M|    26-35| 28|             1|         Gujarat| Western|Food Processing|            Auto|     2| 23877|  NULL|    NULL|   23.877|
|1000588|     Joni| P00057942|     M|    26-35| 28|             1|Himachal Pradesh|Northern|Food Processing|            Auto|     1| 23877|  NULL|    NULL|   23.877|
|1006040|     Pond| P00271242|     F|    26-35| 28|             1|           Delhi| Central|      IT Sector|Footwear & Shoes|     1| 20960|  NULL|    NULL|    20.96|
+---

__Groupby & Aggregate__

In [48]:
# Group by gender that in total spend most amount
df_pyspark.groupby("Gender").sum().show()

+------+-----------------+
|Gender|   sum(AmountinK)|
+------+-----------------+
|     F|74335.85300000003|
|     M|31913.27599999998|
+------+-----------------+



In [49]:
# Age wise average spent amount.
df_pyspark.groupby("Age Group").mean().show()

+---------+-----------------+
|Age Group|   avg(AmountinK)|
+---------+-----------------+
|    18-25|9.175482703565718|
|    26-35| 9.38415371063643|
|     0-17|9.120449324324326|
|    46-50|9.367084435401823|
|    51-55|9.953586746987956|
|    36-45|9.699953569864219|
|      55+|9.557346604215455|
+---------+-----------------+



In [51]:
# Count of age group and Gender
df_pyspark.groupby(["Age Group","Gender"]).count().show()

+---------+------+-----+
|Age Group|Gender|count|
+---------+------+-----+
|    51-55|     F|  554|
|    18-25|     M|  574|
|     0-17|     F|  162|
|    46-50|     M|  291|
|    18-25|     F| 1305|
|      55+|     M|  155|
|      55+|     F|  273|
|    36-45|     M|  705|
|    26-35|     F| 3271|
|     0-17|     M|  134|
|    36-45|     F| 1581|
|    51-55|     M|  278|
|    26-35|     M| 1272|
|    46-50|     F|  696|
+---------+------+-----+



In [59]:
# Top 3 Products
df_pyspark.groupby("Product_ID").max().show(3)

+----------+--------------+
|Product_ID|max(AmountinK)|
+----------+--------------+
| P00180642|        23.546|
| P00146342|        20.678|
| P00026042|         19.69|
+----------+--------------+
only showing top 3 rows



In [56]:
# Finding Sum of total revenue:
df_pyspark.agg({'AmountinK':'sum'}).show()

+------------------+
|    sum(AmountinK)|
+------------------+
|106249.12899999987|
+------------------+

