## Spark SQL

### Block diagram (double-click this cell to expand)
<!--    
+----------------------------------------------+
|                  User Program                |
+----------------------------------------------+
                        |
                        v
+----------------+    +-----------------------+
|    Data Source  |    |      SparkSession      |
+----------------+    +-----------------------+
                        |
                        v
+----------------+    +-----------------------+
|  JDBC/ODBC API |    | Spark SQL Engine / API |
+----------------+    +-----------------------+
                        |
                        v
+----------------+    +-----------------------+
|     Dataset    |    |    DataFrame / SQL     |
+----------------+    +-----------------------+
                        |
                        v
+----------------+    +-----------------------+
|    Catalyst     |    |  Spark Core / Cluster  |
|    Optimizer    |    +-----------------------+
+----------------+               |
                                 v
+----------------------------------------------+
|                    RDDs                      |
+----------------------------------------------+

 -->

In [1]:
!pip install findspark
import findspark
findspark.init()



In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Create (or reuse) the session for this notebook: local master 'local[2]',
# application name 'SparkSQL_UseCase'.
spark = (
    SparkSession.builder
    .master('local[2]')
    .appName('SparkSQL_UseCase')
    .getOrCreate()
)

In [4]:
spark

In [5]:
rangeDF=spark.range(100).toDF('number')

In [6]:
rangeDF

DataFrame[number: bigint]

In [7]:
rangeDF.show(4)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
+------+
only showing top 4 rows



In [8]:
rangeDF.count()

100

In [9]:
# Keep only the even values of `number` (SQL expression string predicate).
evenDF = rangeDF.filter('number%2==0')

In [10]:
evenDF

DataFrame[number: bigint]

In [11]:
evenDF.show(5)

+------+
|number|
+------+
|     0|
|     2|
|     4|
|     6|
|     8|
+------+
only showing top 5 rows



In [12]:
# Build a small DataFrame with spark.createDataFrame from in-memory rows
# plus a list of column names.
people_columns = ['Id', 'Name', 'Age', 'Gender']
people_rows = [
    [1, 'Alice', 30, 'Female'],
    [2, 'Beneth', 30, 'Male'],
    [3, 'Charlie', 30, 'Male'],
    [4, 'Dharan', 30, 'Male'],
]

nameDF = spark.createDataFrame(people_rows, people_columns)

In [13]:
nameDF.show(3)

+---+-------+---+------+
| Id|   Name|Age|Gender|
+---+-------+---+------+
|  1|  Alice| 30|Female|
|  2| Beneth| 30|  Male|
|  3|Charlie| 30|  Male|
+---+-------+---+------+
only showing top 3 rows



In [14]:
# constructing DataFrame from RDD

sc=spark.sparkContext

In [15]:
sc

In [16]:
# Infer the schema from an RDD. First, check the working directory that the relative data paths below resolve against.

import os
os.getcwd()

'C:\\Users\\pdharantej\\OneDrive - ALLEGIS GROUP\\Desktop\\TEK_Training\\5. Data Analysis'

In [17]:
tempRDD=sc.textFile('./temp_data.txt')

In [18]:
tempRDD.count()

13131

In [19]:
type(tempRDD)

pyspark.rdd.RDD

In [20]:
tempRDD.take(3)

['1901\t-78\t1', '1901\t-72\t1', '1901\t-94\t1']

In [21]:
# Break every tab-delimited line into its list of string fields,
# then preview the first three parsed records.
splitRDD = tempRDD.map(lambda line: line.split('\t'))
splitRDD.take(3)

[['1901', '-78', '1'], ['1901', '-72', '1'], ['1901', '-94', '1']]

In [22]:
# constructing the RDD using the Row object

from pyspark.sql import Row

In [23]:
# Wrap each split record in a Row: `year` stays a string, while `temp`
# and `status` are converted to int.
schemaRDD = splitRDD.map(
    lambda fields: Row(
        year=fields[0],
        temp=int(fields[1]),
        status=int(fields[2]),
    )
)

In [24]:
schemaRDD.take(3)

[Row(year='1901', temp=-78, status=1),
 Row(year='1901', temp=-72, status=1),
 Row(year='1901', temp=-94, status=1)]

In [25]:
# Convert the RDD of Row objects into a DataFrame (schema inferred from the
# Row fields) and preview the first three rows.
tempDF = schemaRDD.toDF()
tempDF.show(3)

+----+----+------+
|year|temp|status|
+----+----+------+
|1901| -78|     1|
|1901| -72|     1|
|1901| -94|     1|
+----+----+------+
only showing top 3 rows



In [29]:
tempDF.head(3)

[Row(year='1901', temp=-78, status=1),
 Row(year='1901', temp=-72, status=1),
 Row(year='1901', temp=-94, status=1)]

In [28]:
tempDF.printSchema()

root
 |-- year: string (nullable = true)
 |-- temp: long (nullable = true)
 |-- status: long (nullable = true)



In [38]:
# Load test.csv as a plain-text RDD (one element per line); the following
# cells strip the header, split the fields and build a DataFrame from it.
testRDD = sc.textFile('./test.csv')
testRDD.count()

233600

In [41]:
testRDD.take(3)

['User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3',
 '1000004,P00128942,M,46-50,7,B,2,1,1,11,',
 '1000009,P00113442,M,26-35,17,C,0,0,3,5,']

In [42]:
header=testRDD.first()

In [44]:
# Drop every line that equals the header record.
# `header` is bound as a default argument so its current value is captured
# here: PySpark serializes the lambda only when a job actually runs, and a bare
# reference to the global would use whatever `header` holds at that moment
# (for example after the cell that assigns `header` has been re-run).
testRDD=testRDD.filter(lambda line, header=header: line!=header)

In [45]:
print('After the header record is removed ')
testRDD.first()

After the header record is removed 


'1000004,P00128942,M,46-50,7,B,2,1,1,11,'

In [46]:
# Split every CSV record on the comma separator, then preview the first two
# parsed records.
splitRDD = testRDD.map(lambda record: record.split(','))
print('After splitting the records are : \n')
splitRDD.take(2)

After splitting the records are : 



[['1000004', 'P00128942', 'M', '46-50', '7', 'B', '2', '1', '1', '11', ''],
 ['1000009', 'P00113442', 'M', '26-35', '17', 'C', '0', '0', '3', '5', '']]

In [49]:
from pyspark.sql.types import *

# Explicit schema for the split test records: every column is declared as a
# nullable string, listed in the same order as the fields of each record.
test_column_names = [
    'User_Id',
    'ProductId',
    'Gender',
    'Age',
    'Occupation',
    'City_Category',
    'Stay_In_Current_City_Years',
    'Marital_Status',
    'Product_Category_1',
    'Product_Category_2',
    'Product_Category_3',
]
testRDDSchema = StructType(
    [StructField(column_name, StringType(), True) for column_name in test_column_names]
)

In [50]:
testDF=spark.createDataFrame(data=splitRDD,schema=testRDDSchema)

In [55]:
testDF.show(2)

+-------+---------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|User_Id|ProductId|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|
+-------+---------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|1000004|P00128942|     M|46-50|         7|            B|                         2|             1|                 1|                11|                  |
|1000009|P00113442|     M|26-35|        17|            C|                         0|             0|                 3|                 5|                  |
+-------+---------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
only showing top 2 rows



In [56]:
testDF.count()

233599

In [57]:
# Random sample of testDF: no replacement, fraction 0.1 of the rows, fixed seed 98.
testSample = testDF.sample(withReplacement=False, fraction=0.1, seed=98)

In [58]:
testSample.count()

23403

In [61]:
# Read train.csv with the first line as column names and with schema
# inference switched on.
trainDF = spark.read.options(header='true', inferSchema='true').csv('./train.csv')

In [62]:
trainDF

DataFrame[User_ID: int, Product_ID: string, Gender: string, Age: string, Occupation: int, City_Category: string, Stay_In_Current_City_Years: string, Marital_Status: int, Product_Category_1: int, Product_Category_2: int, Product_Category_3: int, Purchase: int]

In [63]:
trainDF.show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|                12|              null|              null|    1422

In [64]:
# Random sample of trainDF (no replacement, fraction 0.1, seed 192), then
# count the sampled rows.
trainSample = trainDF.sample(withReplacement=False, fraction=0.1, seed=192)
trainSample.count()

54729

In [69]:
trainSamplePD=trainSample.toPandas()
print(type(trainSample))
print(type(trainSamplePD))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>


In [67]:
# trainSamplePD = trainSamplePD.set_index('User_ID')

In [70]:
trainSamplePD.head()

Unnamed: 0,User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3,Purchase
0,1000005,P00031342,M,26-35,20,A,1,1,8,,,6073
1,1000006,P00231342,F,51-55,9,A,1,0,5,8.0,14.0,5378
2,1000006,P00190242,F,51-55,9,A,1,0,4,5.0,,2079
3,1000008,P00220442,M,26-35,12,C,4+,1,5,14.0,,8584
4,1000010,P00297942,F,36-45,1,B,4+,1,8,,,5875


In [72]:
trainSamplePD.to_csv('./2023_train_sample.csv')

In [74]:
trainDF.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [75]:
trainDF.head(2)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200)]

In [76]:
trainDF.show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only

In [77]:
# Report the row counts of the training and testing DataFrames.
train_total = trainDF.count()
print('Number of records in Training Dataset {}'.format(train_total))
test_total = testDF.count()
print('Number of records in Testing Dataset {}'.format(test_total))

Number of records in Training Dataset 550068
Number of records in Testing Dataset 233599


In [80]:
trainDF.describe().show()

+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|           550068|       550068|                    550068|             550068|            550068|            376430|            166821|           550068|
|   mean|1003028.8424013031|      null|  null|  null|8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 9.84232925112238

In [81]:
testDF.describe().show()

+-------+------------------+---------+------+------+-----------------+-------------+--------------------------+------------------+------------------+------------------+------------------+
|summary|           User_Id|ProductId|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|    Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|
+-------+------------------+---------+------+------+-----------------+-------------+--------------------------+------------------+------------------+------------------+------------------+
|  count|            233599|   233599|233599|233599|           233599|       233599|                    233599|            233599|            233599|            233599|            233599|
|   mean|1003029.3568594044|     null|  null|  null|8.085407043694536|         null|        1.4682778997642345|0.4100702485883929| 5.276542279718663| 9.849586059346997|12.669453946534905|
| stddev|  1726.50496799554|     null|  null|  null|6.521146

In [82]:
trainDF.describe('Purchase').show()

+-------+-----------------+
|summary|         Purchase|
+-------+-----------------+
|  count|           550068|
|   mean|9263.968712959126|
| stddev|5023.065393820575|
|    min|               12|
|    max|            23961|
+-------+-----------------+



In [91]:
trainDF.createOrReplaceTempView('trainDFTable')

In [92]:
trainDF.show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only

In [93]:
spark.sql("select * from trainDFTable limit 2")

DataFrame[User_ID: int, Product_ID: string, Gender: string, Age: string, Occupation: int, City_Category: string, Stay_In_Current_City_Years: string, Marital_Status: int, Product_Category_1: int, Product_Category_2: int, Product_Category_3: int, Purchase: int]

In [94]:
spark.sql("select * from trainDFTable limit 2").show()

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



In [97]:
dataFrameWay=trainDF.groupBy('Age').count()
dataFrameWay

DataFrame[Age: string, count: bigint]

In [96]:
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[Age#333], functions=[count(1)])
   +- Exchange hashpartitioning(Age#333, 200), ENSURE_REQUIREMENTS, [plan_id=668]
      +- HashAggregate(keys=[Age#333], functions=[partial_count(1)])
         +- FileScan csv [Age#333] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/pdharantej/OneDrive - ALLEGIS GROUP/Desktop/TEK_Trainin..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Age:string>




In [99]:
sqlWay=spark.sql('select Age, count(1) from trainDFTable group by Age')

In [100]:
sqlWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[Age#333], functions=[count(1)])
   +- Exchange hashpartitioning(Age#333, 200), ENSURE_REQUIREMENTS, [plan_id=681]
      +- HashAggregate(keys=[Age#333], functions=[partial_count(1)])
         +- FileScan csv [Age#333] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/pdharantej/OneDrive - ALLEGIS GROUP/Desktop/TEK_Trainin..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Age:string>




In [101]:
# The DataFrame API version and the SQL version above produce the same physical plan

In [102]:
from pyspark.sql.functions import expr, col, column

# Leave out the '0-17' age bracket, then count the remaining rows per Age value.
age_filter = col('Age') != '0-17'
dfWay = (
    trainDF
    .where(age_filter)
    .groupBy('Age')
    .count()
)

In [103]:
dfWay.show()

+-----+------+
|  Age| count|
+-----+------+
|18-25| 99660|
|26-35|219587|
|46-50| 45701|
|51-55| 38501|
|36-45|110013|
|  55+| 21504|
+-----+------+



In [None]:
# SQL counterpart of `dfWay`: exclude the '0-17' age bracket and count the
# remaining rows per Age, querying the `trainDFTable` temp view.
# (The original query text 'select Age, ' was incomplete and could not be parsed.)
sqlWay=spark.sql("select Age, count(1) from trainDFTable where Age != '0-17' group by Age")