# Spark Operations using Spark DataFrames and Spark SQL

In [None]:
# Mount Google Drive at /content/drive so files stored there can be read by the notebook.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Install a headless Java 8 JDK quietly (-qq, stdout discarded).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark 2.3.1 (Hadoop 2.7 build) archive and unpack it into the current directory.
!wget -q https://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz

# findspark is imported and initialised in a later cell.
!pip install -q findspark

In [None]:
import os

# Point the environment at the JDK and at the Spark distribution unpacked above.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.3.1-bin-hadoop2.7",
})

In [None]:
# Initialise findspark before importing pyspark in the next cell.
# NOTE(review): presumably this locates Spark through the SPARK_HOME variable set above — confirm.
import findspark
findspark.init()

In [None]:

from pyspark.sql import SparkSession

# Create (or reuse, if one already exists) a SparkSession running in local mode.
spark = (
    SparkSession.builder
    .appName('PySpark on Google Colab')
    .master('local[*]')
    .getOrCreate()
)

### 0.Set PySpark environment.

In [None]:
# Legacy environment setup, kept for reference only (not executed).
# NOTE(review): the paths look like an on-cluster Spark client install rather than Colab — confirm
# and consider deleting this cell.
# import os
# import sys
# os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
# os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.7-src.zip")
# sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

### 1.Create  SparkSession

In [None]:
# Legacy session creation, kept for reference only (not executed);
# the active `spark` session is created in an earlier cell.
# from pyspark.sql import SparkSession
# from pyspark import SparkConf
# spark = SparkSession.builder\
#         .appName("SparkSQL and SparkData Frames")\
#         .master('local[*]')\
#         .getOrCreate()

### 2. Check the Spark Session Configuration

In [None]:
# Display the active SparkSession object.
spark

In [None]:
# Keep a handle on the session's SparkContext; it is used for the RDD API (sc.textFile) later on.
sc = spark.sparkContext

In [None]:
# Display the SparkContext object.
sc

## **Spark DataFrame**

#### A DataFrame is the most common Structured API and simply represents a table of data with rows and columns. 
<br> The list that defines the columns and the types within those columns is called the schema. 
<br> One can think of a DataFrame as a spreadsheet with named columns.
<br> A spreadsheet sits on one computer in one specific location, whereas a Spark DataFrame can span thousands of computers.
<br> The reason for putting the data on more than one computer should be intuitive: 
<br>     either the data is too large to fit on one machine or 
<br>     it would simply take too long to perform that computation on one machine.

#### NOTE
Spark has several core abstractions: Datasets, DataFrames, SQL Tables, and Resilient Distributed Datasets (RDDs). 
<br> These different abstractions all represent distributed collections of data. 
<br> The easiest and most efficient are DataFrames, which are available in all languages.



### 3. Create Dataframe

In [None]:
# Build a small DataFrame from in-memory rows; the second argument supplies the column names.
people_rows = [
    [1, 'Alice', 30],
    [2, 'Bob', 28],
    [3, 'Cathy', 31],
    [4, 'Dave', 56],
]
people_columns = ['Id', 'Name', 'Age']
myDF = spark.createDataFrame(people_rows, people_columns)

myDF.show()

+---+-----+---+
| Id| Name|Age|
+---+-----+---+
|  1|Alice| 30|
|  2|  Bob| 28|
|  3|Cathy| 31|
|  4| Dave| 56|
+---+-----+---+



#### Create Dataframe from an RDD

In [None]:
# Reading from local file system.
#trainRDD = sc.textFile("file:///home/thomasj/Batch78/SparkSQL/SalesData/train.csv")

# Read the CSV as an RDD of text lines from the mounted Google Drive folder (this is not HDFS).
# NOTE(review): the path is relative, so it presumably relies on the working directory
# being /content — confirm.
trainRDD = sc.textFile("drive/My Drive/SparkSQL/data/SalesData/train.csv")
print("Total Records with header: ", trainRDD.count())

Total Records with header:  550069


In [None]:
# Peek at the first two raw lines; the first one is still the CSV header.
print("\nFirst Two Records Before Removing Header\n")
print(trainRDD.take(2))


First Two Records Before Removing Header

['User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3,Purchase', '1000001,P00069042,F,0-17,10,A,2,0,3,,,8370']


In [None]:
# The first line of the file is the header row; keep it so it can be filtered out below.
header = trainRDD.first()
header

'User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3,Purchase'

In [None]:
# Drop the header by keeping only the lines that differ from it.
# NOTE: trainRDD is rebound here. If the `header = trainRDD.first()` cell is re-run after
# this one, `header` becomes a data row and re-running this filter would remove real records.
def _is_data_line(text_line):
    """Return True for every line that is not the captured header line."""
    return text_line != header

trainRDD = trainRDD.filter(_is_data_line)
print("Total Records without header: ", trainRDD.count())
print("\nFirst Two Records After Removing Header\n")
print(trainRDD.take(2))

Total Records without header:  550068

First Two Records After Removing Header

['1000001,P00069042,F,0-17,10,A,2,0,3,,,8370', '1000001,P00248942,F,0-17,10,A,2,0,1,6,14,15200']


In [None]:
# Split each comma-separated line into a list of string fields.
# Missing values come through as empty strings (see Product_Category_2/3 in the output).
# NOTE(review): a plain split(",") assumes no field contains a quoted comma — confirm for this file.
splitRDD = trainRDD.map(lambda row:row.split(","))
print("\nFirst Two Records After Split/Parsing\n")
print(splitRDD.take(2))


First Two Records After Split/Parsing

[['1000001', 'P00069042', 'F', '0-17', '10', 'A', '2', '0', '3', '', '', '8370'], ['1000001', 'P00248942', 'F', '0-17', '10', 'A', '2', '0', '1', '6', '14', '15200']]


#### Create a dataframe for the above Data
1. Define Schema
2. Create dataframe using the above schema

#### Create Schema

In [None]:
from pyspark.sql.types import *

# The fields produced by split(",") are all strings, so every column is declared
# as a nullable StringType, in the same order as the CSV header.
train_column_names = [
    "User_ID",
    "Product_ID",
    "Gender",
    "Age",
    "Occupation",
    "City_Category",
    "Stay_In_Current_City_Years",
    "Marital_Status",
    "Product_Category_1",
    "Product_Category_2",
    "Product_Category_3",
    "Purchase",
]

trainSchema = StructType(
    [StructField(column_name, StringType(), True) for column_name in train_column_names]
)

#### Create DataFrame using toDF()

In [None]:
# Convert the RDD of field lists into a DataFrame by applying the explicit schema.
trainDF = splitRDD.toDF(schema = trainSchema)
trainDF.show(5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                  |                  |    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|                  |                  |    1422|
|100

#### Create DataFrame using createDataFrame()

In [None]:
# Alternative to toDF(): build the DataFrame through the session from the same RDD and schema.
# This rebinds trainDF.
trainDF = spark.createDataFrame(data = splitRDD, schema=trainSchema)
trainDF.show(5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                  |                  |    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|                  |                  |    1422|
|100

### 4. DataFrame Transformations & Actions

### Transformations
In Spark, the core data structures are immutable, meaning they cannot be changed after they’re created.
<br> To “change” a DataFrame, you need to instruct Spark how you would like to modify it to do what you want.
<br> These instructions are called transformations.
<br> Transformations are the core of how you express your business logic using Spark.
<br> Transformations are simply ways of specifying different series of data manipulation.



#### Create a dataframe with one column containing 100 rows with values from 0 to 99.

In [None]:
# spark.range(100) yields the values 0..99 in a single column, renamed here to 'number'.
myRange = spark.range(100).toDF('number')

In [None]:
# Show only the first 10 rows.
myRange.show(10)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
+------+
only showing top 10 rows



In [None]:
# where() is a transformation: it describes the filter (even numbers only).
# Displaying the variable shows just the DataFrame's schema, not its rows.
divisBy2 = myRange.where("number % 2 = 0")
divisBy2

DataFrame[number: bigint]

Notice that this displays only the DataFrame's schema, not any rows. <br>This is because we specified only an abstract transformation, and Spark will not act on transformations until we call an action.

### Actions
Transformations allow us to build up our logical transformation plan. 
<br> To trigger the computation, we run an action.
<br> An action instructs Spark to compute a result from a series of transformations. 
<br> The simplest action is show, which displays the records in the DataFrame

#### There are 3 types of actions
Actions to view data in the console
<br>Actions to collect data 
<br>Actions to write to output data sources

In [None]:
# show() is an action: it triggers the computation and prints the first 20 rows.
divisBy2.show()

+------+
|number|
+------+
|     0|
|     2|
|     4|
|     6|
|     8|
|    10|
|    12|
|    14|
|    16|
|    18|
|    20|
|    22|
|    24|
|    26|
|    28|
|    30|
|    32|
|    34|
|    36|
|    38|
+------+
only showing top 20 rows



In [None]:
# count() is an action that returns the number of rows (the even numbers in 0..99).
divisBy2.count()

50

In [None]:
# take(2) is an action that returns the first two rows as a list of Row objects.
trainDF.take(2)

[Row(User_ID='1000001', Product_ID='P00069042', Gender='F', Age='0-17', Occupation='10', City_Category='A', Stay_In_Current_City_Years='2', Marital_Status='0', Product_Category_1='3', Product_Category_2='', Product_Category_3='', Purchase='8370'),
 Row(User_ID='1000001', Product_ID='P00248942', Gender='F', Age='0-17', Occupation='10', City_Category='A', Stay_In_Current_City_Years='2', Marital_Status='0', Product_Category_1='1', Product_Category_2='6', Product_Category_3='14', Purchase='15200')]

In [None]:
# Print the first 4 rows with long cell values truncated.
trainDF.show(4,truncate=True)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                  |                  |    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|                  |                  |    1422|
|100

In [None]:
# Row count of the RDD-derived DataFrame (header line already removed).
trainDF.count()

550068

### 5. Reading a CSV file into a DataFrame 

In [None]:
# Location of the training CSV inside the mounted Google Drive folder.
# NOTE(review): the path is relative, so it presumably relies on the working directory
# being /content — confirm.
path = "drive/My Drive/SparkSQL/data/SalesData/train.csv"

In [None]:
# Read the CSV straight into a DataFrame: the first line supplies the column names and
# the column types are inferred from the data. This rebinds trainDF.
csv_reader = spark.read.options(header=True, sep=",", inferSchema=True)
trainDF = csv_reader.csv(path)

In [None]:
# First five rows as Row objects; numeric fields are now ints and missing values are None.
trainDF.take(5)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),
 Row(User_ID=1000001, Product_ID='P00087842', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422),
 Row(User_ID=1000001, Product_ID='P00085442', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057),
 Row(User_ID=1000002, Product_ID='P0

In [None]:
# Print the first 5 rows without truncating cell values.
trainDF.show(5,truncate=False)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age |Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001|P00069042 |F     |0-17|10        |A            |2                         |0             |3                 |null              |null              |8370    |
|1000001|P00248942 |F     |0-17|10        |A            |2                         |0             |1                 |6                 |14                |15200   |
|1000001|P00087842 |F     |0-17|10        |A            |2                         |0             |12                |null              |null              |1422    |
|100

In [None]:
# Inspect the column types produced by schema inference.
trainDF.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



#### Getting the  shape of the spark data frame
* Spark has no direct `shape` attribute, so we derive the shape from the number of columns and
  the count of records.

In [None]:
## Number of rows in the DataFrame (count() is an action, so it runs a job).
row_count = trainDF.count()
print('Total records count in train dataset is {}'.format(row_count))

Total records count in train dataset is 550068


In [None]:
## Number of columns and their names (trainDF.columns is a plain Python list).
column_names = trainDF.columns
print("Total Columns count in train dataset is {}".format(len(column_names)))
print("\n\nColumns in train dataset are: {} \n".format(column_names))

Total Columns count in train dataset is 12


Columns in train dataset are: ['User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status', 'Product_Category_1', 'Product_Category_2', 'Product_Category_3', 'Purchase'] 



### 6. Verify Schema

In [None]:
## Print the schema as a tree: column name, type and nullability.
trainDF.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [None]:
# Same information as the schema, as a list of (column name, type name) tuples.
trainDF.dtypes

[('User_ID', 'int'),
 ('Product_ID', 'string'),
 ('Gender', 'string'),
 ('Age', 'string'),
 ('Occupation', 'int'),
 ('City_Category', 'string'),
 ('Stay_In_Current_City_Years', 'string'),
 ('Marital_Status', 'int'),
 ('Product_Category_1', 'int'),
 ('Product_Category_2', 'int'),
 ('Product_Category_3', 'int'),
 ('Purchase', 'int')]

#### Getting the Columns from the SparkDataframe

In [None]:
# Column names as a list of strings.
trainDF.columns

['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Purchase']

In [None]:
# Only the last expression of a cell is rendered, so the type must be printed
# explicitly; previously its value was evaluated and silently discarded.
print(type(trainDF.columns))
# First two rows as Row objects (rendered as the cell output).
trainDF.take(2)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200)]

### 7.To Show first n observations

In [None]:
## Use head(n) to fetch the first n observations (here, 2).
## Unlike pandas' head(), which returns a DataFrame, this returns a list of Row objects.
trainDF.head(2)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200)]

In [None]:
## The results above are lists of Row objects.
## To see the rows laid out under their column names, use show() instead.
## Here show(2) prints the first 2 rows of the train DataFrame.
trainDF.show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only

### 8.Summary statistics

In [None]:
## Summary statistics (count, mean, standard deviation, min, max) for every column of the DataFrame.
## For string columns the mean is reported as null (see Product_ID, Gender, ... in the output).
trainDF.describe().show(truncate=False)

+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|User_ID           |Product_ID|Gender|Age   |Occupation       |City_Category|Stay_In_Current_City_Years|Marital_Status     |Product_Category_1|Product_Category_2|Product_Category_3|Purchase         |
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|count  |550068            |550068    |550068|550068|550068           |550068       |550068                    |550068             |550068            |376430            |166821            |550068           |
|mean   |1003028.8424013031|null      |null  |null  |8.076706879876669|null         |1.468494139793958         |0.40965298835780306|5.404270017525106 |9.842329251122386

In [None]:
## describe() also accepts a list of column names; here only the numeric 'Purchase' column is summarised.
## NOTE(review): the original comment described string columns (mean/stddev null, min/max compared as
## strings), but this call passes a numeric column — pass a string column such as 'Gender' to demonstrate that.
trainDF.describe(['Purchase']).show()

+-------+-----------------+
|summary|         Purchase|
+-------+-----------------+
|  count|           550068|
|   mean|9263.968712959126|
| stddev|5023.065393820575|
|    min|               12|
|    max|            23961|
+-------+-----------------+



### 9. a. Adding Columns

In [None]:
## Add a constant column with withColumn() and lit().
## "2019" is a string literal, and the result is only displayed: trainDF itself is not reassigned.
from pyspark.sql.functions import lit
trainDF.withColumn("Year", lit("2019")).show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|Year|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|2019|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|2019|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+--------

In [None]:
# Add a boolean column telling whether the first two product categories are equal.
# The comparison yields null (not false) when a side is null, as the first output row shows.
same_category = trainDF["Product_Category_1"] == trainDF["Product_Category_2"]
tempDF = trainDF.withColumn("SameCategoryCode", same_category)
tempDF.show(4)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|SameCategoryCode|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|            null|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|           false|
|1000001| P00087842|     F|0-17|        10|            A|                         2| 

### 9.b.Renaming Columns

In [None]:
# Rename the derived column; tempDF is rebound to the renamed DataFrame.
tempDF = tempDF.withColumnRenamed("SameCategoryCode", "SimilarCategory")

In [None]:
# Confirm the new column name.
tempDF.show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---------------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|SimilarCategory|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---------------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|           null|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          false|
+-------+----------+------+----+----------+-------------+--------------------------+------

### 9.c.Removing Columns

In [None]:
# Show the DataFrame without the column; the result is not assigned, so tempDF keeps the column.
tempDF.drop("SimilarCategory").show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only

### 10. Changing a Column’s Type (cast)

In [None]:
# Check the current column types before casting.
trainDF.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [None]:
# Cast Purchase to an integer column, replacing the column of the same name.
# The schema printed above already reports Purchase as integer, so this only demonstrates cast().
purchase_as_int = trainDF["Purchase"].cast("int")
trainDF = trainDF.withColumn("Purchase", purchase_as_int)

### 11. Splitting the data into Train and Test

In [None]:
# Split the data 40/30/30 with a fixed seed for reproducibility.
# NOTE(review): trainDF is rebound to the 40% split, so re-running this cell splits the
# already-split data again; re-run the CSV read cell first when repeating it.
split_frames = trainDF.randomSplit([0.4, 0.3, 0.3], seed=1234)
trainDF, testDF, testDF2 = split_frames
for split_frame in (trainDF, testDF):
    print(split_frame.count())

220514
164951


### 12. Working with Nulls in Data

In [None]:
from pyspark.sql.functions import isnan, when, count, col

# For every column, count the rows whose value is NaN or null.
# when() without otherwise() yields null for non-matching rows, and count() skips nulls.
missing_value_counts = [
    count(when(isnan(column_name) | col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in trainDF.columns
]
trainDF.select(missing_value_counts).show()

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|      0|         0|     0|  0|         0|            0|                         0|             0|                 0|             69456|            153673|       0|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



#### How to drop all rows that contain a null value
##### Use **dropna()** operation. 
  To drop rows from the DataFrame, it considers three options.
* **how** – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.

* **thresh** – int, default None If specified, drop rows that have less than thresh non-null values.This overwrites the how parameter.

* **subset** – optional list of column names to consider.

#### Drop null rows in train with default parameters and count the rows in output DataFrame. 
#### Default options are any, None, None for how, thresh, subset respectively.

In [None]:
# Three equivalent spellings of "drop every row that contains a null"; each prints the same count.
null_free_variants = (
    trainDF.dropna(),
    trainDF.na.drop(),
    trainDF.na.drop("any"),
)
for null_free in null_free_variants:
    print(null_free.count())

66841
66841
66841


#### To replace the null values in DataFrame with constant number
#### Use **fillna()** operation. 

 The fillna will take two parameters to fill the null values.
* **value**:
    - Either a single value (int, float, string) to use for all columns, or a dictionary that maps each column name to the value that replaces its nulls.
* **subset**: Specify some selected columns.



In [None]:
## Show the train DataFrame with nulls replaced by -1.
## fillna() returns a new DataFrame and the result is not assigned, so trainDF itself is unchanged.
## NOTE(review): a numeric fill value is presumably applied to numeric columns only — confirm in the fillna docs.
trainDF.fillna(-1).show(5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00059442|     F|0-17|        10|            A|                         2|             0|                 6|                 8|                16|   16622|
|1000001| P00085442|     F|0-17|        10|            A|                         2|             0|                12|                14|                -1|    1057|
|1000001| P00111842|     F|0-17|        10|            A|                         2|             0|                 8|                -1|                -1|    8094|
|100

In [None]:
## Fill different columns with different values by passing a {column name: value} dictionary.
## Filling does not add or remove rows, so the count equals the train row count.
fill_cols_vals = {
"Gender": 'M',
"Purchase" : 999999
}
trainDF.na.fill(fill_cols_vals).count()

220514

### 13. Distinct Values

In [None]:
## Count the distinct Product_ID values in the train and test splits:
## select the column, de-duplicate it with distinct(), then count the remaining rows.
train_distinct_products = trainDF.select('Product_ID').distinct().count()
test_distinct_products = testDF.select('Product_ID').distinct().count()
print("Distinct values in Product_ID's in train dataset are {}".format(train_distinct_products))
print("Distinct values in Product_ID's in test dataset are {}".format(test_distinct_products))

Distinct values in Product_ID's in train dataset are 3490
Distinct values in Product_ID's in test dataset are 3430


#### Differences in two columns

In [None]:
## The counts above show that train has more distinct Product_IDs than test.
## subtract() returns the rows of the left DataFrame that are absent from the right one, so it
## reveals which Product_IDs occur in only one of the two splits.
## The same check can be repeated for any other categorical feature.
train_product_ids = trainDF.select('Product_ID')
test_product_ids = testDF.select('Product_ID')

diff_cat_in_test_train = test_product_ids.subtract(train_product_ids)
print("Count of Product_ID's there in test dataset but not train dataset are {}".format(diff_cat_in_test_train.count()))

diff_cat_in_train_test = train_product_ids.subtract(test_product_ids)
print("Count of Product_ID's there in train dataset but not test dataset are {}".format(diff_cat_in_train_test.count()))

Count of Product_ID's there in test dataset but not train dataset are 83
Count of Product_ID's there in train dataset but not test dataset are 143


### 14. Using Spark SQL 
With Spark SQL, you can register any DataFrame as a table or view (a temporary table) and query it using pure SQL. 
<br>There is no performance difference between writing SQL queries or writing DataFrame code, <br>they both “compile” to the same underlying plan that we specify in DataFrame code.

In [None]:
## Register trainDF as a temporary view so it can be queried by name with spark.sql().
## An existing view with the same name is replaced.
trainDF.createOrReplaceTempView("trainDFTable")

In [None]:
## Verify the view by selecting two rows from it with SQL.
spark.sql("SELECT * FROM trainDFTable LIMIT 2").show()

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00059442|     F|0-17|        10|            A|                         2|             0|                 6|                 8|                16|   16622|
|1000001| P00085442|     F|0-17|        10|            A|                         2|             0|                12|                14|              null|    1057|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



#### Column References

#### Select & SelectExpr

In [None]:
## Four ways of referring to the same column inside select():
## a SQL expression string via expr() (here with an alias), col(), column(), and the plain column name.
from pyspark.sql.functions import expr, col, column

trainDF.select(expr("User_ID AS userID") , col("User_ID"), 
               column("User_ID"), "User_ID").show(2)

+-------+-------+-------+-------+
| userID|User_ID|User_ID|User_ID|
+-------+-------+-------+-------+
|1000001|1000001|1000001|1000001|
|1000001|1000001|1000001|1000001|
+-------+-------+-------+-------+
only showing top 2 rows



#### Pandas-style dot notation returns a `Column` reference, not the data

In [None]:
result = trainDF.User_ID

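This assigns a `Column` expression (a reference to the `User_ID` column, not its data) to the new variable. To see the values, pass it to `select`, as in the next cell.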

In [None]:
# select content from the above column
trainDF.select(result).show(2)

+-------+
|User_ID|
+-------+
|1000001|
|1000001|
+-------+
only showing top 2 rows



In [None]:
spark.sql("SELECT User_ID AS userID FROM trainDFTable").show(2)

+-------+
| userID|
+-------+
|1000001|
|1000001|
+-------+
only showing top 2 rows



In [None]:
trainDF.selectExpr("User_ID AS userID", "Product_ID AS productID").show(2)

+-------+---------+
| userID|productID|
+-------+---------+
|1000001|P00059442|
|1000001|P00085442|
+-------+---------+
only showing top 2 rows



#### Converting to Spark Types (Literals)
Sometimes we need to pass explicit values into Spark that aren’t a new column but are just a value in all the rows. This might be a constant value or something we’ll need to compare to later on. The way we do this is through literals. 
This is basically a translation from a given programming language’s literal value to one that Spark understands. 
Literals are expressions and can be used in the same way.

In [None]:
from pyspark.sql.functions import lit
trainDF.select("*", lit(1).alias('One')).show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|One|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|1000001| P00059442|     F|0-17|        10|            A|                         2|             0|                 6|                 8|                16|   16622|  1|
|1000001| P00085442|     F|0-17|        10|            A|                         2|             0|                12|                14|              null|    1057|  1|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+-------------

In [None]:
## In SQL, literals are just the specific value.
trainDF.createOrReplaceTempView('trainDFTable')
spark.sql("SELECT *, 2019 as Year FROM trainDFTable LIMIT 2").show()

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|Year|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----+
|1000001| P00059442|     F|0-17|        10|            A|                         2|             0|                 6|                 8|                16|   16622|2019|
|1000001| P00085442|     F|0-17|        10|            A|                         2|             0|                12|                14|              null|    1057|2019|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+--------

#### Pair wise Frequencies - Crosstab

In [None]:
## Compute the pair-wise frequencies of two categorical columns.
## crosstab(col1, col2) puts the distinct values of col1 in rows and those of col2 in columns;
## the first output column is named "<col1>_<col2>" (here "Gender_Age").
## Apply crosstab to the 'Gender' and 'Age' columns of the train DataFrame.
trainDF.crosstab('Gender', 'Age').show()

+----------+----+-----+-----+-----+-----+-----+----+
|Gender_Age|0-17|18-25|26-35|36-45|46-50|51-55| 55+|
+----------+----+-----+-----+-----+-----+-----+----+
|         M|4014|30307|67844|33112|13017|11410|6599|
|         F|1976| 9926|20151|10849| 5258| 4021|2030|
+----------+----+-----+-----+-----+-----+-----+----+



In [None]:
trainDF.groupBy('Age', 'Gender').count().show()

+-----+------+-----+
|  Age|Gender|count|
+-----+------+-----+
|51-55|     F| 4021|
|18-25|     M|30307|
| 0-17|     F| 1976|
|46-50|     M|13017|
|18-25|     F| 9926|
|  55+|     M| 6599|
|  55+|     F| 2030|
|36-45|     M|33112|
|26-35|     F|20151|
| 0-17|     M| 4014|
|36-45|     F|10849|
|51-55|     M|11410|
|26-35|     M|67844|
|46-50|     F| 5258|
+-----+------+-----+



In [None]:
spark.sql("""select Age,
    sum(case when Gender = 'F' then 1 else 0 end) F,
    sum(case when Gender = 'M' then 1 else 0 end) M
from trainDFTable
group by Age""").show()

# spark.sql("""select Age,
#     count(*) total,
#     sum(case when Gender = 'F' then 1 else 0 end) F,
#     sum(case when Gender = 'M' then 1 else 0 end) M
# from trainDFTable
# group by Age""").show()

+-----+-----+-----+
|  Age|    F|    M|
+-----+-----+-----+
|18-25| 9926|30307|
|26-35|20151|67844|
| 0-17| 1976| 4014|
|46-50| 5258|13017|
|51-55| 4021|11410|
|36-45|10849|33112|
|  55+| 2030| 6599|
+-----+-----+-----+



#### Removing Duplicates

In [None]:
## Goal: list every distinct (Age, Gender) combination in the train dataset.
## dropDuplicates() removes duplicate rows from a DataFrame; here it is applied
## after projecting only the Age and Gender columns.
age_gender_pairs = trainDF.select('Age', 'Gender')
age_gender_pairs.dropDuplicates().show()

+-----+------+
|  Age|Gender|
+-----+------+
|51-55|     F|
|18-25|     M|
| 0-17|     F|
|46-50|     M|
|18-25|     F|
|  55+|     M|
|  55+|     F|
|36-45|     M|
|26-35|     F|
| 0-17|     M|
|36-45|     F|
|51-55|     M|
|26-35|     M|
|46-50|     F|
+-----+------+



#### Filtering the rows

In [None]:
## Count the rows of the train dataset whose Purchase is greater than 15000.
## The same predicate is written with each way of referring to a column
## (attribute access, col, column, expr, item access); every form yields the same count.
purchase_predicates = [
    trainDF.Purchase > 15000,
    col("Purchase") > 15000,
    column("Purchase") > 15000,
    expr("Purchase") > 15000,
    trainDF["Purchase"] > 15000,
]
for purchase_predicate in purchase_predicates:
    matching_rows = trainDF.filter(purchase_predicate).count()
    print("Count of rows where Purchase Amount more than 15000 are {}".format(matching_rows))

Count of rows where Purchase Amount more than 15000 are 44075
Count of rows where Purchase Amount more than 15000 are 44075
Count of rows where Purchase Amount more than 15000 are 44075
Count of rows where Purchase Amount more than 15000 are 44075
Count of rows where Purchase Amount more than 15000 are 44075


In [None]:
spark.sql("""
SELECT 
COUNT(*) AS Count
FROM trainDFTable
WHERE Purchase > 15000""").show()

+-----+
|Count|
+-----+
|44075|
+-----+



In [None]:
trainDF.where("Purchase > 15000").where("Gender = 'F'").count()

8535

In [None]:
trainDF.filter("Purchase > 15000").where("Gender = 'F'").count()

8535

In [None]:
trainDF.where((col("Purchase") > 15000) & (col("Gender") == 'F')).count()

8535

In [None]:
trainDF.filter((col("Purchase") > 15000) & (col("Gender") == 'F')).count()

8535

In [None]:
spark.sql("SELECT * FROM trainDFTable WHERE Purchase > 15000 AND Gender = 'F'").count()

8535

### 15. Aggregations

#### Count Distinct

In [None]:
from pyspark.sql.functions import countDistinct
trainDF.select(countDistinct("Age")).show()

+-------------------+
|count(DISTINCT Age)|
+-------------------+
|                  7|
+-------------------+



#### Approximate Count Distinct
* **Parameters:**
    * col - Name of the column
    * rsd – maximum estimation error allowed (default = 0.05).

In [None]:
from pyspark.sql.functions import approx_count_distinct
trainDF.select(approx_count_distinct(col="Age", rsd=0.1)).show()

+--------------------------+
|approx_count_distinct(Age)|
+--------------------------+
|                         7|
+--------------------------+



#### First and Last

In [None]:
from pyspark.sql.functions import first, last
trainDF.select(first("Product_ID", ignorenulls = True), last("Product_ID")).show()

+-----------------------+-----------------------+
|first(Product_ID, true)|last(Product_ID, false)|
+-----------------------+-----------------------+
|              P00059442|              P00349442|
+-----------------------+-----------------------+



#### Min and Max

In [None]:
from pyspark.sql.functions import min, max
trainDF.select(min("Purchase"), max("Purchase")).show()

+-------------+-------------+
|min(Purchase)|max(Purchase)|
+-------------+-------------+
|           12|        23961|
+-------------+-------------+



#### Sum

In [None]:
from pyspark.sql.functions import sum
trainDF.select(sum("Purchase")).show()

+-------------+
|sum(Purchase)|
+-------------+
|   2039763128|
+-------------+



#### sumDistinct

In [None]:
from pyspark.sql.functions import sumDistinct
trainDF.select(sumDistinct("Purchase")).show()

+----------------------+
|sum(DISTINCT Purchase)|
+----------------------+
|             180521433|
+----------------------+



#### Avg

In [None]:
from pyspark.sql.functions import sum, count, avg, expr

trainDF.select(
    count("Purchase").alias("total_transactions"),
    sum("Purchase").alias("total_purchases"),
    avg("Purchase").alias("avg_purchases"),
    expr("mean(Purchase)").alias("mean_purchases"))\
  .selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases").show()

+--------------------------------------+-----------------+-----------------+
|(total_purchases / total_transactions)|    avg_purchases|   mean_purchases|
+--------------------------------------+-----------------+-----------------+
|                     9250.039126767462|9250.039126767462|9250.039126767462|
+--------------------------------------+-----------------+-----------------+



#### Variance and Standard Deviation

In [None]:
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp

trainDF.select(var_pop("Purchase"), var_samp("Purchase"),
  stddev_pop("Purchase"), stddev_samp("Purchase")).show()

+-------------------+-------------------+--------------------+---------------------+
|  var_pop(Purchase)| var_samp(Purchase)|stddev_pop(Purchase)|stddev_samp(Purchase)|
+-------------------+-------------------+--------------------+---------------------+
|2.515591314684939E7|2.515602722589755E7|   5015.567081282972|    5015.578453767576|
+-------------------+-------------------+--------------------+---------------------+



In [None]:
spark.sql("""SELECT var_pop(Purchase), var_samp(Purchase),
             stddev_pop(Purchase), stddev_samp(Purchase)
             FROM trainDFTable""").show()

+---------------------------------+----------------------------------+------------------------------------+-------------------------------------+
|var_pop(CAST(Purchase AS DOUBLE))|var_samp(CAST(Purchase AS DOUBLE))|stddev_pop(CAST(Purchase AS DOUBLE))|stddev_samp(CAST(Purchase AS DOUBLE))|
+---------------------------------+----------------------------------+------------------------------------+-------------------------------------+
|              2.515591314684939E7|               2.515602722589755E7|                   5015.567081282972|                    5015.578453767576|
+---------------------------------+----------------------------------+------------------------------------+-------------------------------------+



#### skewness and kurtosis

In [None]:
from pyspark.sql.functions import skewness, kurtosis
trainDF.select(skewness("Purchase"), kurtosis("Purchase")).show()

+------------------+-------------------+
|skewness(Purchase)| kurtosis(Purchase)|
+------------------+-------------------+
|0.6025760119887242|-0.3280309873336331|
+------------------+-------------------+



In [None]:
spark.sql("""SELECT skewness(Purchase), kurtosis(Purchase)
             FROM trainDFTable""").show()

+----------------------------------+----------------------------------+
|skewness(CAST(Purchase AS DOUBLE))|kurtosis(CAST(Purchase AS DOUBLE))|
+----------------------------------+----------------------------------+
|                0.6025760119887242|               -0.3280309873336331|
+----------------------------------+----------------------------------+



#### Covariance and Correlation

In [None]:
from pyspark.sql.functions import corr, covar_pop, covar_samp
trainDF.select(corr("Product_Category_1", "Purchase"), covar_samp("Product_Category_1", "Purchase"),
    covar_pop("Product_Category_1", "Purchase")).show()

+----------------------------------+----------------------------------------+---------------------------------------+
|corr(Product_Category_1, Purchase)|covar_samp(Product_Category_1, Purchase)|covar_pop(Product_Category_1, Purchase)|
+----------------------------------+----------------------------------------+---------------------------------------+
|              -0.34450360021154175|                       -6796.78011329084|                     -6796.749290848214|
+----------------------------------+----------------------------------------+---------------------------------------+



In [None]:
spark.sql("""SELECT corr(Product_Category_1, Purchase), covar_samp(Product_Category_1, Purchase),
             covar_pop(Product_Category_1, Purchase)
             FROM trainDFTable""").show()

+------------------------------------------------------------------+------------------------------------------------------------------------+-----------------------------------------------------------------------+
|corr(CAST(Product_Category_1 AS DOUBLE), CAST(Purchase AS DOUBLE))|covar_samp(CAST(Product_Category_1 AS DOUBLE), CAST(Purchase AS DOUBLE))|covar_pop(CAST(Product_Category_1 AS DOUBLE), CAST(Purchase AS DOUBLE))|
+------------------------------------------------------------------+------------------------------------------------------------------------+-----------------------------------------------------------------------+
|                                              -0.34450360021154175|                                                       -6796.78011329084|                                                     -6796.749290848214|
+------------------------------------------------------------------+------------------------------------------------------------------------+---

#### Complex Aggregations

In [None]:
from pyspark.sql.functions import collect_set, collect_list
trainDF.agg(collect_set("Age"), collect_list("Age")).show()

+--------------------+--------------------+
|    collect_set(Age)|   collect_list(Age)|
+--------------------+--------------------+
|[55+, 51-55, 0-17...|[0-17, 0-17, 0-17...|
+--------------------+--------------------+



In [None]:
spark.sql("""SELECT collect_set(Age), collect_list(Age) FROM trainDFTable""").show()

+--------------------+--------------------+
|    collect_set(Age)|   collect_list(Age)|
+--------------------+--------------------+
|[55+, 51-55, 0-17...|[0-17, 0-17, 0-17...|
+--------------------+--------------------+



#### Grouping

In [None]:
trainDF.groupBy("Age", "Gender").count().show()

+-----+------+-----+
|  Age|Gender|count|
+-----+------+-----+
|51-55|     F| 4021|
|18-25|     M|30307|
| 0-17|     F| 1976|
|46-50|     M|13017|
|18-25|     F| 9926|
|  55+|     M| 6599|
|  55+|     F| 2030|
|36-45|     M|33112|
|26-35|     F|20151|
| 0-17|     M| 4014|
|36-45|     F|10849|
|51-55|     M|11410|
|26-35|     M|67844|
|46-50|     F| 5258|
+-----+------+-----+



In [None]:
# Total purchase amount per (Age, Gender) group.
# DataFrame.alias() names the DataFrame itself, not its columns, so the aggregated
# column is renamed explicitly to get the "Age Group Purchase" header.
trainDF.select("Age", "Gender", "Purchase") \
    .groupBy("Age", "Gender") \
    .sum("Purchase") \
    .withColumnRenamed("sum(Purchase)", "Age Group Purchase") \
    .show()

+-----+------+-------------+
|  Age|Gender|sum(Purchase)|
+-----+------+-------------+
|51-55|     F|     36202846|
|18-25|     M|    285067216|
| 0-17|     F|     16602902|
|46-50|     M|    121957420|
|18-25|     F|     82547523|
|  55+|     M|     62468497|
|  55+|     F|     18103298|
|36-45|     M|    313934889|
|26-35|     F|    176288527|
| 0-17|     M|     36869675|
|36-45|     F|     96853551|
|51-55|     M|    110785049|
|26-35|     M|    636249540|
|46-50|     F|     45832195|
+-----+------+-------------+



In [None]:
# Total and mean purchase amount for every (Age, Gender) group.
purchase_by_age_gender = trainDF.select("Age", "Gender", "Purchase").groupBy("Age", "Gender")
purchase_by_age_gender.agg(
    sum("Purchase").alias("Age Group Purchase"),
    avg("Purchase").alias("Mean Age Group Purchase")
).show()

+-----+------+------------------+-----------------------+
|  Age|Gender|Age Group Purchase|Mean Age Group Purchase|
+-----+------+------------------+-----------------------+
|51-55|     F|          36202846|       9003.44342203432|
|18-25|     M|         285067216|      9405.985943841357|
| 0-17|     F|          16602902|      8402.278340080971|
|46-50|     M|         121957420|      9369.088115541215|
|18-25|     F|          82547523|       8316.29286721741|
|  55+|     M|          62468497|      9466.358084558266|
|  55+|     F|          18103298|       8917.88078817734|
|36-45|     M|         313934889|      9481.000513409035|
|26-35|     F|         176288527|      8748.376110366731|
| 0-17|     M|          36869675|      9185.270303936224|
|36-45|     F|          96853551|      8927.417365655821|
|51-55|     M|         110785049|       9709.46967572305|
|26-35|     M|         636249540|      9378.125405341667|
|46-50|     F|          45832195|      8716.659376188665|
+-----+------+

#### Grouping with Expressions

In [None]:
trainDF.groupBy("Age").agg(
  count("Purchase").alias("quan"),
  expr("count(Purchase)")).show()

+-----+-----+---------------+
|  Age| quan|count(Purchase)|
+-----+-----+---------------+
|18-25|40233|          40233|
|26-35|87995|          87995|
| 0-17| 5990|           5990|
|46-50|18275|          18275|
|51-55|15431|          15431|
|36-45|43961|          43961|
|  55+| 8629|           8629|
+-----+-----+---------------+



In [None]:
trainDF.groupBy("Age").agg(expr("avg(Purchase)"),expr("stddev_pop(Purchase)")).show()

+-----+-----------------+--------------------+
|  Age|    avg(Purchase)|stddev_pop(Purchase)|
+-----+-----------------+--------------------+
|18-25|9137.144607660379|   5030.064681284094|
|26-35|9233.911779078357|   5000.033602828318|
| 0-17|8926.974457429049|   5085.392112027391|
|46-50|9181.374281805745|  4927.6155514087095|
|51-55|9525.493811159355|   5080.055180428372|
|36-45|9344.383430768181|   5028.548097475931|
|  55+|9337.327036736586|   5026.353904353003|
+-----+-----------------+--------------------+



In [None]:
## To find the mean of each age group in train dataset - Average purchases in each age group
trainDF.groupby('Age').agg({'Purchase': 'mean'}).show()

+-----+-----------------+
|  Age|    avg(Purchase)|
+-----+-----------------+
|18-25|9137.144607660379|
|26-35|9233.911779078357|
| 0-17|8926.974457429049|
|46-50|9181.374281805745|
|51-55|9525.493811159355|
|36-45|9344.383430768181|
|  55+|9337.327036736586|
+-----+-----------------+



In [None]:
trainDF.groupby('Age').agg({'Purchase': 'sum'}).show()

+-----+-------------+
|  Age|sum(Purchase)|
+-----+-------------+
|18-25|    367614739|
|26-35|    812538067|
| 0-17|     53472577|
|46-50|    167789615|
|51-55|    146987895|
|36-45|    410788440|
|  55+|     80571795|
+-----+-------------+



In [None]:
## Apply sum, min, max, count with groupby to get different summary insight for each group. 
exprs = {x: "sum" for x in trainDF.columns}
trainDF.groupBy("Age").agg(exprs).show(5)

+-----+------------------+-----------------------+-------------------+-------------+------------+---------------+-------------------------------+-----------------------+--------+-----------+-----------------------+---------------+
|  Age|sum(City_Category)|sum(Product_Category_3)|sum(Marital_Status)|sum(Purchase)|sum(User_ID)|sum(Occupation)|sum(Stay_In_Current_City_Years)|sum(Product_Category_1)|sum(Age)|sum(Gender)|sum(Product_Category_2)|sum(Product_ID)|
+-----+------------------+-----------------------+-------------------+-------------+------------+---------------+-------------------------------+-----------------------+--------+-----------+-----------------------+---------------+
|18-25|              null|                 156493|               8469|    367614739| 40345895081|         271247|                        47104.0|                 204957|    null|       null|                 264391|           null|
|26-35|              null|                 338487|              34678|    81

### 16. User-Defined Functions

##### a. simple UDF function for finding the cube of a number

In [None]:
udfExampleDF = spark.range(5).toDF("num")

def power3(double_value):
    """Return the cube (third power) of ``double_value``."""
    cubed = double_value ** 3
    return cubed

power3(2.0)

8.0

Once the function is created, we need to register it with Spark so that we can use
it on all of our worker machines. Spark will serialize the function on the driver and transfer it over the network to all executor processes. This happens regardless of language.

<br>Once we go to use the function, there are essentially two different things that occur. If the function is written in Scala or Java then we can use that function within the JVM. This means there will be little performance penalty aside from the fact that we can’t take advantage of code generation capabilities that Spark has for built-in functions.

<br>If the function is written in Python, something quite different happens. 
Spark will start up a python process on the worker, serialize all of the data to a format that python can understand (remember it was in the JVM before), execute the function row by row on that data in the python process, before finally returning the results of the row operations to the JVM and Spark.

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# udf() defaults to a StringType return type; declare LongType so the cubes of the
# integer `num` column come back as numbers rather than strings.
# NOTE(review): a UDF whose Python result does not match the declared type is expected
# to yield null, so use DoubleType instead when applying this to float columns.
power3udf = udf(power3, LongType())

In [None]:
from pyspark.sql.functions import col
udfExampleDF.select(power3udf(col("num"))).show()

+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
|          8|
|         27|
|         64|
+-----------+



##### b. Binning of Purchase column

In [None]:
def binning_purchase(purchase):
    """
    Map a purchase amount to the label of the bin it falls in.

    args:
        purchase: purchase amount; anything float() accepts (int, float,
                  numeric string), or None for a missing value
    return:
        bin label (Bin01, Bin02, ...) type=String, or None when purchase is None

    Every bin includes its upper bound:
    <= 500              -> Bin01   (zero, negative and NaN amounts are treated as 0)
    501     - 1000      -> Bin02
    1001    - 2000      -> Bin03
    2001    - 4000      -> Bin04
    4001    - 6000      -> Bin05
    6001    - 8000      -> Bin06
    8001    - 10000     -> Bin07
    10001   - 20000     -> Bin08
    > 20000             -> Bin09
    """
    # A null Purchase reaches the function as None and float(None) raises TypeError,
    # so propagate the missing value instead of failing.
    if purchase is None:
        return None

    amount = float(purchase)
    # `not amount > 0` is also true for NaN, so NaN is clamped to 0 like negatives.
    if not amount > 0:
        amount = float(0)

    # Inclusive upper bounds of Bin01..Bin08; anything above the last one is Bin09.
    upper_bounds = (500, 1000, 2000, 4000, 6000, 8000, 10000, 20000)
    for bin_number, upper_bound in enumerate(upper_bounds, start=1):
        if amount <= upper_bound:
            return "Bin{:02d}".format(bin_number)
    return "Bin09"
    
    

In [None]:
bin_purchase_udf = udf(binning_purchase)

In [None]:
trainDF.withColumn('Binned_Purchase',bin_purchase_udf('Purchase')).select("Purchase","Binned_Purchase").show(4)

+--------+---------------+
|Purchase|Binned_Purchase|
+--------+---------------+
|   16622|          Bin08|
|    1057|          Bin03|
|    8094|          Bin07|
|   10003|          Bin08|
+--------+---------------+
only showing top 4 rows



## 17. Joins

#### Dataset
* The data is obtained from Surfeous, a recommender system prototype that uses social annotations (e.g., tags) and contextual models to find restaurants that best suit the user's preferences. It is a publicly available dataset in the UCI repository. It has three tables: restaurants, consumers and user ratings. The tables used here are taken from those and filtered for our scenario.


#### Data dictionary :
* __RestGenInfo.csv__ contains :
    * placeID - Unique ID of the restaurant
    * latitude - Location detail
    * longitude - Location detail
    * name - Name of the restaurant
    * state - Name of the state
    * alcohol - Constraints on having alcoholic beverages
    * smoking_area - Information for smokers
    * price - Pricing type of restaurant
    * franchise - Whether the restaurant has a franchise
    * area - Open or closed type of restaurant

* __Cuisine.csv__ contains :
    * placeID - Unique ID of the restaurant
    * Rcuisine - Different styles of food

    
* __PaymentMode.csv__ contains :
    * placeID - Unique ID of the restaurant
    * Rpayment - Different modes of payment

    
* __parking.csv__ contains :
     * placeID - Unique ID of the restaurant
     * parking_lot - Different types of parking available

#### Read the data as a dataframe

In [None]:
restoGen = spark.read.csv('drive/My Drive/SparkSQL/data/RestaurantsData/RestGenInfo.csv', header=True, inferSchema=True,nullValue='?')
cuisine = spark.read.csv('drive/My Drive/SparkSQL/data/RestaurantsData/Cuisine.csv', header=True, inferSchema=True)
paymentMode = spark.read.csv('drive/My Drive/SparkSQL/data/RestaurantsData/PaymentMode.csv', header=True, inferSchema=True)
parking = spark.read.csv('drive/My Drive/SparkSQL/data/RestaurantsData/parking.csv', header=True, inferSchema=True)

In [None]:
restoGen.show()

+-------+----------+------------+--------------------+----------+-----------------+-------------+------+---------+------+
|placeID|  latitude|   longitude|                name|     state|          alcohol| smoking_area| price|franchise|  area|
+-------+----------+------------+--------------------+----------+-----------------+-------------+------+---------+------+
| 132560|23.7523041| -99.1669133|  puesto de gorditas|Tamaulipas|No_Alcohol_Served|    permitted|   low|        f|  open|
| 132561| 23.726819| -99.1265059|          cafe ambar|      null|No_Alcohol_Served|         none|   low|        f|closed|
| 132564|23.7309245| -99.1451848|             churchs|      null|No_Alcohol_Served|         none|   low|        f|closed|
| 132572|22.1416471|-100.9927118|        Cafe Chaires|       SLP|No_Alcohol_Served|not permitted|   low|        f|closed|
| 132583|18.9222904|  -99.234332|    McDonalds Centro|   Morelos|No_Alcohol_Served|not permitted|   low|        t|closed|
| 132584|23.7523648| -99

In [None]:
restoGen.count()

130

In [None]:
restoGen.select('PlaceID').distinct().count()

130

#### Check for any null values in the data

In [None]:
# For each of the four restaurant tables, show one row holding the number of
# NaN-or-null values found in every column.
for frame in (restoGen, cuisine, paymentMode, parking):
    missing_per_column = [
        count(when(isnan(c) | col(c).isNull(), 1)).alias(c)
        for c in frame.columns
    ]
    frame.select(missing_per_column).show()

+-------+--------+---------+----+-----+-------+------------+-----+---------+----+
|placeID|latitude|longitude|name|state|alcohol|smoking_area|price|franchise|area|
+-------+--------+---------+----+-----+-------+------------+-----+---------+----+
|      0|       0|        0|   0|   18|      0|           0|    0|        0|   0|
+-------+--------+---------+----+-----+-------+------------+-----+---------+----+

+-------+--------+
|placeID|Rcuisine|
+-------+--------+
|      0|       0|
+-------+--------+

+-------+--------+
|placeID|Rpayment|
+-------+--------+
|      0|       0|
+-------+--------+

+-------+-----------+
|placeID|parking_lot|
+-------+-----------+
|      0|          0|
+-------+-----------+



In [None]:
# Drop every row that has a null in any column.
# Per the null check above, only `state` contains nulls (18 rows).
# The cleaned frame deliberately replaces `restoGen`, which the cells below use.
restoGen = restoGen.dropna()

In [None]:
restoGen.select('placeID').distinct().count()

In [None]:
cuisine.select('placeID').distinct().count()

In [None]:
paymentMode.select('placeID').distinct().count()

In [None]:
parking.select('placeID').distinct().count()

In [None]:
cuisine.select('Rcuisine').distinct().count()

In [None]:
restoGen.createOrReplaceTempView('restoGenTable')
cuisine.createOrReplaceTempView('cuisineTable')
paymentMode.createOrReplaceTempView('paymentModeTable')
parking.createOrReplaceTempView('parkingTable')

 ## The  count of restaurants(as numberOfHotels) for each payment modes and area. Also order them based on numberOfHotels in descending order.

In [None]:
# Number of restaurants per (payment mode, area), most common first.
# The join key is given in an explicit ON clause rather than as a WHERE filter on an
# unconditioned JOIN, so the join condition is part of the join itself.
spark.sql('''select count(*) as numberOfHotels, Rpayment, area
from restoGenTable a join paymentModeTable b on a.placeID = b.placeID
group by Rpayment, area
order by numberOfHotels desc''').show()

#### Inner Join

In [None]:
inner_join = restoGen.join(paymentMode, restoGen.placeID == paymentMode.placeID,how='inner') 
inner_join.show(4)

In [None]:
count_of_hotels = inner_join.select('Rpayment','area').groupby('area','Rpayment').count()

count_of_hotels = count_of_hotels.withColumnRenamed('count','NumberofHotels')
count_of_hotels.show()

In [None]:
count_of_hotels.orderBy(count_of_hotels.NumberofHotels.desc()).show()

##  Count the number of Cuisines that are used by the Restaurants

In [None]:
# All distinct cuisines present in the cuisine table, regardless of restaurant.
print("The number of Distinct Cuisines Available from all the restaurants = ",cuisine.select('Rcuisine').distinct().count())
# Keep every restaurant (left join) and drop the right-hand placeID so the key is not duplicated.
left_join = restoGen.join(cuisine, on=restoGen.placeID==cuisine.placeID, how = 'left').drop(cuisine.placeID)
print("The number of cuisines used in the selected Restaurants: ")
left_join.select(countDistinct('Rcuisine').alias("Distinct Cuisines used in Restaurants")).show()
print("The Cuisines available are ")
# DataFrame.alias() does not rename columns, so alias the column itself to set the header.
left_join.select(col('Rcuisine').alias("Cuisines used in Restaurants")).distinct().show(50)

## Count the distinct restaurant names which has valet parking

In [None]:
# Distinct restaurant names that offer valet parking.
# The join key goes in an explicit ON clause; WHERE keeps only the row filter.
spark.sql('''select distinct name, parking_lot
from restoGenTable a join parkingTable b on a.placeID = b.placeID
where b.parking_lot = 'valet parking' ''').show()

#### Right Join

In [None]:
# Right join keeps every parking row, even when no restaurant row matches it.
right_join = restoGen.join(other=parking,on=parking.placeID==restoGen.placeID,how='right')
names_of_restaurants = right_join.select('name','parking_lot').filter(parking.parking_lot=='valet parking')
# Parking rows without a matching restaurant have a null name; test for that with
# isNotNull() rather than comparing against the string 'null'.
names_of_restaurants.distinct().filter(names_of_restaurants.name.isNotNull()).show()

## Identify the placeID where the paymentMode for parking  is not available

#### Full outer Join

In [None]:
spark.sql("""SELECT parkingTable.placeID,parkingTable.parking_lot,paymentModeTable.Rpayment
FROM parkingTable
FULL OUTER JOIN paymentModeTable ON parkingTable.placeID=paymentModeTable.placeID WHERE paymentModeTable.Rpayment is NULL""").show(1000)


## The restaurant names and their corresponding restaurant cuisine styles, price, location details(latitude, longitude) and smoking_area informations only for those which are located in Morelos state and have closed roofing, also order based on price

In [None]:
# Restaurants in Morelos with a closed area, with their cuisine, price and location, ordered by price.
# The join key goes in an explicit ON clause; WHERE keeps only the row filters.
spark.sql('''select distinct name, Rcuisine, price, latitude, longitude, smoking_area
from restoGenTable a join cuisineTable b on a.placeID = b.placeID
where a.state = 'Morelos' and a.area = 'closed'
order by price''').show(truncate = False)

#### Natural Joins
Natural joins make implicit guesses at the columns on which you would like to join. 
It finds matching columns and returns the results. 
Left, right, and outer natural joins are all supported.

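WARNING:
Implicit is always dangerous! 
A natural join gives incorrect results when the two DataFrames/tables share a column name (for example `id`) that means different things in the two datasets. 
In the query below the only shared column is `placeID`, which identifies the restaurant in both tables, so the join is on the intended key. 
You should always use this join with caution.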

In [None]:
# General form: SELECT * FROM TableA NATURAL JOIN TableB
# The join columns are not named here; NATURAL JOIN matches the two tables on
# the column names they have in common.
natural_join_df = spark.sql('''select  * from restoGenTable a NATURAL JOIN paymentModeTable b ''')
natural_join_df.show()

#### Cross (Cartesian) Joins
Cross-joins in simplest terms are inner joins that do not specify a predicate. 
Cross joins will join every single row in the left DataFrame to every single row in the right DataFrame. 
This will cause an absolute explosion in the number of rows contained in the resulting DataFrame. 
If you have 1,000 rows in each DataFrame, the cross-join of these will result in 1,000,000 (1,000 x 1,000) rows. 
For this reason, you must very explicitly state that you want a cross-join by using the cross join keyword:

#### Random Samples

In [None]:
## DataFrame.sample() returns a new DataFrame holding a random sample of the
## base DataFrame. Its three parameters are:
##   withReplacement - True/False: may an observation be picked more than once?
##   fraction        - share of rows to sample, e.g. 0.5 for about 50% of the data
##   seed            - fixes the random draw so the result can be reproduced
## Two samples of the same fraction are drawn with different seeds.
sample_fraction = 0.2
sampleDF1 = trainDF.sample(withReplacement=False, fraction=sample_fraction, seed=1234)
sampleDF2 = trainDF.sample(withReplacement=False, fraction=sample_fraction, seed=4321)
sample_sizes = (sampleDF1.count(), sampleDF2.count())
print(*sample_sizes)

### Miscellaneous

#### Unions

In [None]:
# Two small DataFrames with the same columns (id, name, age).
df1 = spark.createDataFrame([[1, 'Alex', 25],[3, 'Carol', 53],[5, 'Emily', 25],[7, 'Gabriel', 32],[9, 'Ilma', 35],[11, 'Kim', 45]], ['id', 'name', 'age'])
df2 = spark.createDataFrame([[2, 'Ben', 66],[4, 'Daniel', 28],[6, 'Frank', 64],[8, 'Harley', 29],[10, 'Jack', 35],[12, 'Litmya', 45]], ['id', 'name', 'age'])

# show() prints the table itself and returns None, so it is called directly;
# wrapping it in print() emitted a stray "None" line after every table.
print("Before")
print("DataFrame-1")
df1.show()
print("DataFrame-2")
df2.show()

print("After")
# union() appends the rows of df2 to df1; from here on df1 holds all twelve rows.
df1 = df1.union(df2)
print("DataFrame-1")
df1.show()

Before
DataFrame-1
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|   Alex| 25|
|  3|  Carol| 53|
|  5|  Emily| 25|
|  7|Gabriel| 32|
|  9|   Ilma| 35|
| 11|    Kim| 45|
+---+-------+---+

None
DataFrame-2
+---+------+---+
| id|  name|age|
+---+------+---+
|  2|   Ben| 66|
|  4|Daniel| 28|
|  6| Frank| 64|
|  8|Harley| 29|
| 10|  Jack| 35|
| 12|Litmya| 45|
+---+------+---+

None
After
DataFrame-1
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|   Alex| 25|
|  3|  Carol| 53|
|  5|  Emily| 25|
|  7|Gabriel| 32|
|  9|   Ilma| 35|
| 11|    Kim| 45|
|  2|    Ben| 66|
|  4| Daniel| 28|
|  6|  Frank| 64|
|  8| Harley| 29|
| 10|   Jack| 35|
| 12| Litmya| 45|
+---+-------+---+

None


#### Unions and conditional append

In [None]:
# df1 was reassigned to df1.union(df2) in the previous cell, so it already
# contains every row of df2. Unioning df2 a second time would list each df2
# row twice; only the age filter is applied to the combined frame here.
df1.where("age < 60").show()

#### String Manipulations

In [None]:
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim

# Trimming and padding applied to literal strings; the same literal values
# are produced for every row of trainDF, and two rows are displayed.
padded_word = " HELLO "
bare_word = "HELLO"
trim_pad_columns = [
    ltrim(lit(padded_word)).alias("ltrim"),   # strip leading spaces
    rtrim(lit(padded_word)).alias("rtrim"),   # strip trailing spaces
    trim(lit(padded_word)).alias("trim"),     # strip both sides
    lpad(lit(bare_word), 7, " ").alias("lp"), # left-pad to width 7
    rpad(lit(bare_word), 7, " ").alias("rp"), # right-pad to width 7
]
trainDF.select(*trim_pad_columns).show(2, truncate=False)

+------+------+-----+-------+-------+
|ltrim |rtrim |trim |lp     |rp     |
+------+------+-----+-------+-------+
|HELLO | HELLO|HELLO|  HELLO|HELLO  |
|HELLO | HELLO|HELLO|  HELLO|HELLO  |
+------+------+-----+-------+-------+
only showing top 2 rows



In [None]:
# Spark SQL version of the trim/pad functions, run against trainDFTable.
trim_pad_query = """SELECT
ltrim(' HELLLOOOO ') AS ltrim,
rtrim(' HELLLOOOO ') AS rtrim,
trim(' HELLLOOOO ') AS trim,
lpad('HELLOOOO ', 3, ' ') AS lp,
rpad('HELLOOOO ', 10, ' ') AS rp
FROM
trainDFTable"""
trim_pad_df = spark.sql(trim_pad_query)
trim_pad_df.show(2)

#### Working with Date and Time

In [None]:
from pyspark.sql.functions import current_date, current_timestamp
# Ten rows (id 0..9), each tagged with the current date and current timestamp.
dateDF = (
    spark.range(10)
    .withColumn("today", current_date())
    .withColumn("now", current_timestamp())
)
dateDF.show(truncate=False)

+---+----------+-----------------------+
|id |today     |now                    |
+---+----------+-----------------------+
|0  |2020-02-16|2020-02-16 01:41:51.534|
|1  |2020-02-16|2020-02-16 01:41:51.534|
|2  |2020-02-16|2020-02-16 01:41:51.534|
|3  |2020-02-16|2020-02-16 01:41:51.534|
|4  |2020-02-16|2020-02-16 01:41:51.534|
|5  |2020-02-16|2020-02-16 01:41:51.534|
|6  |2020-02-16|2020-02-16 01:41:51.534|
|7  |2020-02-16|2020-02-16 01:41:51.534|
|8  |2020-02-16|2020-02-16 01:41:51.534|
|9  |2020-02-16|2020-02-16 01:41:51.534|
+---+----------+-----------------------+



In [None]:
# Print the column types of dateDF, then register it as the temp view
# "dateDFTable" so it can be queried by name from Spark SQL.
dateDF.printSchema()
dateDF.createOrReplaceTempView("dateDFTable")

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [None]:
from pyspark.sql.functions import date_add, date_sub
# Shift the "today" column ten days back and ten days forward.
ten_days_before = date_sub(col("today"), 10)
ten_days_after = date_add(col("today"), 10)
dateDF.select(ten_days_before, ten_days_after).show(1)

+-------------------+-------------------+
|date_sub(today, 10)|date_add(today, 10)|
+-------------------+-------------------+
|         2020-02-06|         2020-02-26|
+-------------------+-------------------+
only showing top 1 row



In [None]:
# Spark SQL version of the ten-day shift, run against the dateDFTable view.
date_shift_query = """
SELECT
date_sub(today, 10),
date_add(today, 10)
FROM
dateDFTable
"""
date_shift_df = spark.sql(date_shift_query)
date_shift_df.show(1)

+-------------------+-------------------+
|date_sub(today, 10)|date_add(today, 10)|
+-------------------+-------------------+
|         2020-02-06|         2020-02-26|
+-------------------+-------------------+
only showing top 1 row



In [None]:
from pyspark.sql.functions import datediff, months_between, to_date
# Add a column seven days before "today", then take the day difference.
# datediff(end, start) is end minus start, so week_ago - today comes out negative.
with_week_ago = dateDF.withColumn("week_ago", date_sub(col("today"), 7))
day_gap = datediff(col("week_ago"), col("today")).alias('datediff_today_weekago')
with_week_ago.select(day_gap).show(1)

+----------------------+
|datediff_today_weekago|
+----------------------+
|                    -7|
+----------------------+
only showing top 1 row



In [None]:
# Build two literal dates as columns, then compute the months between them.
date_range = dateDF.select(
    to_date(lit("2017-01-01")).alias("start"),
    to_date(lit("2018-02-18")).alias("end"),
)
date_range.select(months_between(col("end"), col("start"))).show(1)

+--------------------------+
|months_between(end, start)|
+--------------------------+
|                13.5483871|
+--------------------------+
only showing top 1 row



In [None]:
# Spark SQL versions of to_date, months_between and datediff on literal dates.
date_functions_query = """
SELECT
to_date('2016-01-01') AS date,
months_between('2017-01-01', '2016-01-01') AS months_between,
datediff('2017-01-01', '2016-01-01') AS datediff_days
FROM
dateDFTable
"""
date_functions_df = spark.sql(date_functions_query)
date_functions_df.show(2)

+----------+--------------+-------------+
|      date|months_between|datediff_days|
+----------+--------------+-------------+
|2016-01-01|          12.0|          366|
|2016-01-01|          12.0|          366|
+----------+--------------+-------------+
only showing top 2 rows



__WARNING__
<br>Spark will not throw an error if it cannot parse the date, it’ll just return null. This can be a bit tricky in larger pipelines because you may be expecting your data in one format and getting it in another. To illustrate, let’s take a look at the date format that has switched from year-month-day to year-day-month. Spark will fail to parse this date and silently return null instead.

In [None]:
### "2016-20-12" is written year-day-month; to_date cannot parse it and yields null.
### "2017-12-11" is written year-month-day and parses normally.
unparseable_date = to_date(lit("2016-20-12"))
parseable_date = to_date(lit("2017-12-11"))
dateDF.select(unparseable_date, parseable_date).show(1)

+---------------------+---------------------+
|to_date('2016-20-12')|to_date('2017-12-11')|
+---------------------+---------------------+
|                 null|           2017-12-11|
+---------------------+---------------------+
only showing top 1 row

