## Spark Operations using Spark DataFrames and Spark SQL

#### Agenda
-  What are DataFrames in Spark ?
-  Different ways to create a DataFrames
-  What are Spark Transformations & Actions
-  Verify Summary Statistics
-  Spark SQL
-  Performance Comparison of Spark DataFrame and Spark SQL
-  Column References
-  Converting to Spark Types - Literals
-  Add/Rename/Remove Columns
-  TypeCasting
-  Column differences
-  Pair-wise frequencies
-  Remove duplicates
-  Working with Nulls
-  Filtering the rows
-  Aggregations
-  Joins
-  Random Samples
-  Random Splits
-  Map Transformations
-  Sorting
-  Union
-  String Manipulations
-  Regular Expressions
-  Working with Dates and Time Stamp
-  User Defined Functions

In [1]:
## Set Python - Spark environment.
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")


* The entry point to programming Spark with the Dataset and DataFrame API is SparkSession.
* A SparkSession can be used to  create DataFrame, register DataFrame as tables, execute SQL over tables, cache tables, and read parquet files etc.,
* Spark Session is available in pyspark package as a sub package called pyspark.sql module
  *  Reference :https://spark.apache.org/docs/2.3.0/api/python/pyspark.html

In [2]:
## Create  SparkSession
from pyspark.sql import SparkSession
from pyspark import SparkConf
spark = None

* _builder_ : A class attribute having a Builder method to construct SparkSession instances

* _AppName_: Sets a name for the application, which will be shown in the Spark web UI.
   
* _Config_: Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession‘s own configuration.

* _master_:It  is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local modemaster 

* _getOrCreate()_: Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder.

* _enableHiveSupport()_: Enables Hive support, including connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined functions.



## ** Spark DataFrame **

#### A DataFrame is the most common Structured API and simply represents a table of data with rows and columns. 
<br> The list that defines the columns and the types within those columns is called the schema. 
<br> One can think of a DataFrame as a spreadsheet with named columns.
<br> A spreadsheet sits on one computer in one specific location, whereas a Spark DataFrame can span thousands of computers.
<br> The reason for putting the data on more than one computer should be intuitive: 
<br>     either the data is too large to fit on one machine or 
<br>     it would simply take too long to perform that computation on one machine.

#### NOTE
Spark has several core abstractions: Datasets, DataFrames, SQL Tables, and Resilient Distributed Datasets (RDDs). 
<br> These different abstractions all represent distributed collections of data. 
<br> The easiest and most efficient are DataFrames, which are available in all languages.



#### Create a dataframe with one column containing 100 rows with values from 0 to 99.
This range of numbers represents a distributed collection. 
<br> When run on a cluster, each part of this range of numbers exists on a different executor. 
<br> This is a Spark DataFrame.

In [4]:
myRange = spark.range(100).toDF('number')


In [5]:
#check myRange

DataFrame[number: bigint]

In [6]:
myRange.show(10)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
+------+
only showing top 10 rows



In [7]:
myDF = spark.__________([[1, 'Alice', 30], [2, 'Bob', 28], [3, 'Cathy', 31], [4, 'Denice', 25]], ['Id', 'Name', 'Age'])

In [8]:
myDF.show()

+---+------+---+
| Id|  Name|Age|
+---+------+---+
|  1| Alice| 30|
|  2|   Bob| 28|
|  3| Cathy| 31|
|  4|Denice| 25|
+---+------+---+



### Important classes of pyspark.sql
* pyspark.sql.SparkSession :            Main entry point for DataFrame and SQL functionality.
* pyspark.sql.DataFrame    :    A distributed collection of data grouped into named columns.
* pyspark.sql.Column       :               A column expression in a DataFrame.
* pyspark.sql.Row          :                A row of data in a DataFrame.
* pyspark.sql.GroupedData  :     Aggregation methods, returned by DataFrame.groupBy().
* pyspark.sql.DataFrameNaFunctions :         Methods for handling missing data (null values).
* pyspark.sql.DataFrameStatFunctions:       Methods for statistics functionality.
* pyspark.sql.functions             :   List of built-in functions available for DataFrame.
* pyspark.sql.types                 :       List of data types available.
* pyspark.sql.Window                :      For working with window functions

#### Reference :https://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html 

### DataFrame Transformations & Actions

### Transformations
* In Spark, the core data structures are immutable, meaning they cannot be changed after they’re created.
* To “change” a DataFrame, you need to instruct Spark how you would like to modify it to do what you want.These instructions are called transformations.This will create another new dataframe of type **_pyspark.sql.dataframe.DataFrame_**
* Transformations are the core of how you express your business logic using Spark.
* Transformations are simply ways of specifying different series of data manipulation.



In [9]:
divisBy2 = myRange.where("number % 2 = 0")

In [10]:
divisBy2

DataFrame[number: bigint]

In [11]:
type( )

pyspark.sql.dataframe.DataFrame

Notice that these return no output. <br>This is because we specified only an abstract transformation, and Spark will not act on transformations until we call an action.

### Actions
* Transformations allow us to build up our logical transformation plan. 
* To trigger the computation, we run an action.
    * An action instructs Spark to compute a result from a series of transformations. 
    * The simplest action is count, which gives us the total number of records in the DataFrame

#### There are 3 types of actions
Actions to view data in the console.
* Actions to collect data to native objects in the respective language
* Actions to write to output data sources

In [12]:
divisBy2.count()

50

### Creating a  Dataframe from an RDD

* Creating the RDD from the local or hdfs file system 

In [13]:
!cat /home/mahidharv/SparkSql_Activity/SalesData/test.csv | head -10

User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3
1000004,P00128942,M,46-50,7,B,2,1,1,11,
1000009,P00113442,M,26-35,17,C,0,0,3,5,
1000010,P00288442,F,36-45,1,B,4+,1,5,14,
1000010,P00145342,F,36-45,1,B,4+,1,4,9,
1000011,P00053842,F,26-35,1,C,1,0,4,5,12
1000013,P00350442,M,46-50,1,C,3,1,2,3,15
1000013,P00155442,M,46-50,1,C,3,1,1,11,15
1000013,P0094542,M,46-50,1,C,3,1,2,4,9
1000015,P00161842,M,26-35,7,A,1,0,10,13,16
cat: write error: Broken pipe


In [14]:
testRDD = spark.sparkContext.textFile("file:///home/mahidharv/SparkSql_Activity/SalesData/test.csv")

* Observe the first few records

In [15]:
testRDD.take(3)

[u'User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3',
 u'1000004,P00128942,M,46-50,7,B,2,1,1,11,',
 u'1000009,P00113442,M,26-35,17,C,0,0,3,5,']

* The rows are read as list of unicoded strings 

In [16]:
type(testRDD.take(3))

list

* Count of the Records including the header

In [17]:
print("Total Records with header: ", testRDD.count())
print("\nFirst Two Records Before Removing Header\n")
print(testRDD.take(2))

('Total Records with header: ', 233600)

First Two Records Before Removing Header

[u'User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3', u'1000004,P00128942,M,46-50,7,B,2,1,1,11,']


* Count of the Records excluding the header

In [18]:
header = testRDD._______()
testRDD = testRDD.______(lambda line: line _______)
print("Total Records without header: ", testRDD.count())
print("\nFirst Two Records After Removing Header\n")
print(testRDD.take(2))

('Total Records without header: ', 233599)

First Two Records After Removing Header

[u'1000004,P00128942,M,46-50,7,B,2,1,1,11,', u'1000009,P00113442,M,26-35,17,C,0,0,3,5,']


* Split the data into individual columns

In [19]:
splitRDD = testRDD.map(lambda line: line.split( ))
print("\nFirst Two Records After Split/Parsing\n")
print("\n{}".format(splitRDD.take(1)))


First Two Records After Split/Parsing


[[u'1000004', u'P00128942', u'M', u'46-50', u'7', u'B', u'2', u'1', u'1', u'11', u'']]


### Create a dataframe for the above Data
1. Define Schema
2. Create dataframe using the above schema

##### 1. Defining Schema

* We will define the schema using __`StructType`__  in the **`pyspark.sql.types`** class
* Struct type, consisting of a list of `StructFields`.
  * This is the data type representing a `Row`.
  * Iterating `StructType` will iterate `StructField`\s.
  * A `StructField` can be accessed by name or position.
* Note, the StructField class is broken down in terms of:
	 * name : The name of this field
	 * dataType : The data type of this field
	 * nullable : Indicates whether values of this field can be null

In [20]:
from pyspark.sql.types import *

testSchema = StructType([
    StructField("User_ID", StringType(), True),
    StructField("Product_ID", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", StringType(), True),
    StructField("Occupation", StringType(), True),
    StructField("City_Category", StringType(), True),
    StructField("Stay_In_Current_City_Years", StringType(), True),
    StructField("Marital_Status", StringType(), True),
    StructField("Product_Category_1", StringType(), True),
    StructField("Product_Category_2", StringType(), True),
    StructField("Product_Category_3", StringType(), True)
])

In [21]:
type(__________)

pyspark.sql.types.StructType

In [22]:
testSchema["Gender"]

StructField(Gender,StringType,true)

#####  2. Create DataFrame using the above schema
* To create a data frame we need to use the __*createDataFrame*__ method from _**pyspark.sql.SparkSession**_ class.

* _**createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)**_
* Creates a DataFrame from an RDD, a list or a pandas.DataFrame.
   * When schema is a list of column names, the type of each column will be inferred from data.
   * When schema is None, it will try to infer the schema (column names and types) from data, which should be an RDD of Row, or namedtuple, or dict.
   * If schema inference is needed, samplingRatio is used to determined the ratio of rows used for schema inference. The first row will be used if samplingRatio is None.

* _Parameters_ :	
  * data – an RDD of any kind of SQL data representation(e.g. row, tuple, int, boolean, etc.), or list, or pandas.DataFrame.
  * schema – a pyspark.sql.types.DataType or a datatype string or a list of column names, default is None. The data type string format equals to pyspark.sql.types.DataType.simpleString, except that top level struct type can omit the struct<> and atomic types use typeName() as their format, e.g. use byte instead of tinyint for pyspark.sql.types.ByteType. We can also use int as a short name for IntegerType.
  * samplingRatio – the sample ratio of rows used for inferring
  * verifySchema – verify data types of every row against schema.
  * Returns:	DataFrame

In [23]:
testDF = spark.createDataFrame(data = splitRDD, schema=testSchema)

In [24]:
type(testDF)

pyspark.sql.dataframe.DataFrame

In [25]:
testDF.cache()

DataFrame[User_ID: string, Product_ID: string, Gender: string, Age: string, Occupation: string, City_Category: string, Stay_In_Current_City_Years: string, Marital_Status: string, Product_Category_1: string, Product_Category_2: string, Product_Category_3: string]

### Reading a csv file :
* Reading a CSV file into a DataFrame and converting it to a local array or list of rows.
* We will use _**spark.read.csv**_ from _**pyspark.sql.DataFrameReader**_ class
* Parameters :
    Parameters:	
  * path – string, or list of strings, for input path(s).
  * schema – an optional pyspark.sql.types.StructType for the input schema or a DDL-formatted string (For example col0 INT, col1 DOUBLE).
  * sep – sets a single character as a separator for each field and value. If None is set, it uses the default value, ,.
  * encoding – decodes the CSV files by the given encoding type. If None is set, it uses the default value, UTF-8. 
  * quote – sets a single character used for escaping quoted values where the separator can be part of the value. If None is set, it uses the default value, ". If you would like to turn off quotations, you need to set an empty string.
  * escape – sets a single character used for escaping quotes inside an already quoted value. If None is set, it uses the default value, \.
  * comment – sets a single character used for skipping lines beginning with this character. By default (None), it is disabled.
  * header – uses the first line as names of columns. If None is set, it uses the default value, false.
  * inferSchema – infers the input schema automatically from data. It requires one extra pass over the data. If None is set, it uses the default value, false.
  * ignoreLeadingWhiteSpace – a flag indicating whether or not leading whitespaces from values being read should be skipped. If None is set, it uses the default value, false.
  * ignoreTrailingWhiteSpace – a flag indicating whether or not trailing whitespaces from values being read should be skipped. If None is set, it uses the default value, false.
  * nullValue – sets the string representation of a null value. If None is set, it uses the default value, empty string. Since 2.0.1, this nullValue param applies to all supported types including the string type.
  * nanValue – sets the string representation of a non-number value. If None is set, it uses the default value, NaN.
  * positiveInf – sets the string representation of a positive infinity value. If None is set, it uses the default value, Inf.
  * negativeInf – sets the string representation of a negative infinity value. If None is set, it uses the default value, Inf.
  * dateFormat – sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to date type. If None is set, it uses the default value, yyyy-MM-dd.
  * timestampFormat – sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type. If None is set, it uses the default value, yyyy-MM-dd'T'HH:mm:ss.SSSXXX.
  * maxColumns – defines a hard limit of how many columns a record can have. If None is set, it uses the default value, 20480.
  * maxCharsPerColumn – defines the maximum number of characters allowed for any given value being read. If None is set, it uses the default value, -1 meaning unlimited length.
  * maxMalformedLogPerPartition – this parameter is no longer used since Spark 2.2.0. If specified, it is ignored.
  * mode –
    * allows a mode for dealing with corrupt records during parsing. If None isset, it uses the default value, PERMISSIVE.
    * PERMISSIVE : sets other fields to null when it meets a corrupted record, and puts the malformed string into a field configured by columnNameOfCorruptRecord. To keep corrupt records, an user can set a string type field named columnNameOfCorruptRecord in an user-defined schema. If a schema does not have the field, it drops corrupt records during parsing. When a length of parsed CSV tokens is shorter than an expected length of a schema, it sets null for extra fields.
    * DROPMALFORMED : ignores the whole corrupted records.
    * FAILFAST : throws an exception when it meets corrupted records.
    * columnNameOfCorruptRecord – allows renaming the new field having malformed string created by PERMISSIVE mode. This overrides spark.sql.columnNameOfCorruptRecord. If None is set, it uses the value specified in spark.sql.columnNameOfCorruptRecord.
  * multiLine – parse one record, which may span multiple lines. If None is set, it uses the default value, false.
  * charToEscapeQuoteEscaping – sets a single character used for escaping the escape for the quote character. If None is set, the default value is escape character when escape and quote characters are different, \ otherwise..


In [26]:
trainDF = spark.read.csv(header=True,
                         inferSchema=True,
                         path="file:///home/mahidharv/SparkSql_Activity/SalesData/train.csv")        


*  We can also use the generalized approach of using _**spark.read.format**_ and use the options to load the file

In [27]:
testDF = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("file:///home/mahidharv/SparkSql_Activity/SalesData/test.csv")

#### Verify Schema

In [28]:
## Print Schema : prints out the schema in tree format
trainDF.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



* To Show first n observations
*  Use head operation to see first n observations (say, 2 observations). 
*  Head operation in PySpark is similar to head operation in Pandas.

In [29]:
trainDF.head(2)

[Row(User_ID=1000001, Product_ID=u'P00069042', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID=u'P00248942', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200)]

*  Above results are comprised of row like format. 
*  To see the result in more interactive manner (rows under the columns), Use the show operation. 
* Show operation on train and take first 5 rows of it.

In [30]:
trainDF.show(2,truncate=False)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age |Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001|P00069042 |F     |0-17|10        |A            |2                         |0             |3                 |null              |null              |8370    |
|1000001|P00248942 |F     |0-17|10        |A            |2                         |0             |1                 |6                 |14                |15200   |
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only

* To Count the number of rows in DataFrame

In [31]:

print('Total records count in train dataset is {}'.format(trainDF.count()))
print('Total records count in test dataset is {}'.format(testDF.count()))

Total records count in train dataset is 550068
Total records count in test dataset is 233599


* Columns count and column names

In [32]:

print("Total Columns count in train dataset is {}".format(len(trainDF.columns)))
print("\n\nColumns in train dataset are: {} \n".format(trainDF.columns))

print("Total Columns count in test dataset is {}".format(len(testDF.columns)))
print("\n\nColumns in test dataset are: {} \n".format(testDF.columns))

Total Columns count in train dataset is 12


Columns in train dataset are: ['User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status', 'Product_Category_1', 'Product_Category_2', 'Product_Category_3', 'Purchase'] 

Total Columns count in test dataset is 11


Columns in test dataset are: ['User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status', 'Product_Category_1', 'Product_Category_2', 'Product_Category_3'] 



#### Summary statistics

In [33]:
## To get the summary statistics (mean, standard deviance, min ,max , count) of numerical columns in a DataFrame
trainDF.describe().show()

+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|           550068|       550068|                    550068|             550068|            550068|            376430|            166821|           550068|
|   mean|1003028.8424013031|      null|  null|  null|8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 9.84232925112238

In [34]:
## Check what happens when we specify the name of a categorical / String columns in describe operation.
## describe operation is working for String type column but the output for mean, stddev are null and 
## min & max values are calculated based on ASCII value of categories.
trainDF.describe('Product_ID').show()

+-------+----------+
|summary|Product_ID|
+-------+----------+
|  count|    550068|
|   mean|      null|
| stddev|      null|
|    min| P00000142|
|    max|  P0099942|
+-------+----------+



### Spark SQL
* With Spark SQL, you can register any DataFrame as a table or view (a temporary table) and query it using pure SQL. 
* There is no performance difference between writing SQL queries or writing DataFrame code, they both “compile” to the same underlying plan that we specify in DataFrame code.

* Temporary views in Spark SQL are session-scoped and will disappear if the session that creates it terminates. If you want to have a temporary view that is shared among all sessions and keep alive until the Spark application terminates, you can create a global temporary view. Global temporary view is tied to a system preserved database global_temp, and we must use the qualified name to refer it, e.g. SELECT * FROM global_temp.view1.

#### Spark SQL Create table

In [35]:
#spark.sql("""create database mahidhar""")

In [36]:
spark.sql("""use mahidhar""")

DataFrame[]

In [3]:
#spark.sql("""CREATE TABLE user_test (i INT) """)

In [39]:
spark.sql("""use mahidhar""").show()


++
||
++
++



In [40]:
spark.sql("""show tables""").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|mahidhar|    sales|      false|
|mahidhar|       t3|      false|
|mahidhar|       t4|      false|
|mahidhar|user_test|      false|
+--------+---------+-----------+



In [41]:
spark.sql("""select * from _____""").show(5)

+---+
|  i|
+---+
+---+



In [42]:
## Create view/table
trainDF.createOrReplaceTempView("trainDFTable")

In [43]:
## Verify Dataframe
trainDF.show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only

In [44]:
## Verify Dataframe
trainDF.take(2)

[Row(User_ID=1000001, Product_ID=u'P00069042', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID=u'P00248942', Gender=u'F', Age=u'0-17', Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200)]

In [45]:
## Verify Table
spark.sql("SELECT * FROM trainDFTable LIMIT 2").show()

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



#### Performance Comparison Spark DataFrame vs Spark SQL

In [46]:
#dataframeWay = trainDF.where(trainDF.Purchase>15000).count()
dataframeWay = trainDF.groupBy('Age').count()
dataframeWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[Age#123], functions=[count(1)])
+- Exchange hashpartitioning(Age#123, 200)
   +- *(1) HashAggregate(keys=[Age#123], functions=[partial_count(1)])
      +- *(1) FileScan csv [Age#123] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/mahidharv/SparkSql_Activity/SalesData/train.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Age:string>


In [47]:
sqlWay = spark.sql("SELECT Age, count(1) FROM trainDFTable GROUP BY Age")
sqlWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[Age#123], functions=[count(1)])
+- Exchange hashpartitioning(Age#123, 200)
   +- *(1) HashAggregate(keys=[Age#123], functions=[partial_count(1)])
      +- *(1) FileScan csv [Age#123] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/mahidharv/SparkSql_Activity/SalesData/train.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Age:string>


#### Column References

#### Select & SelectExpr

In [48]:
## Multiple ways of referring a column in a dataframe
from pyspark.sql.functions import expr, col, column

trainDF.select(expr("User_ID AS userID") , col("User_ID"), column("User_ID"), "User_ID").show(2)

+-------+-------+-------+-------+
| userID|User_ID|User_ID|User_ID|
+-------+-------+-------+-------+
|1000001|1000001|1000001|1000001|
|1000001|1000001|1000001|1000001|
+-------+-------+-------+-------+
only showing top 2 rows



In [49]:
trainDF.select(expr("User_ID AS userID")).show(2)

+-------+
| userID|
+-------+
|1000001|
|1000001|
+-------+
only showing top 2 rows



In [50]:
spark.sql("SELECT User_ID AS userID FROM trainDFTable").show(2)

+-------+
| userID|
+-------+
|1000001|
|1000001|
+-------+
only showing top 2 rows



In [51]:
trainDF.selectExpr("User_ID AS userID", "Product_ID AS productID").show(2)

+-------+---------+
| userID|productID|
+-------+---------+
|1000001|P00069042|
|1000001|P00248942|
+-------+---------+
only showing top 2 rows



In [52]:
trainDF.select("User_ID", "Product_ID", "Age").show(2)

+-------+----------+----+
|User_ID|Product_ID| Age|
+-------+----------+----+
|1000001| P00069042|0-17|
|1000001| P00248942|0-17|
+-------+----------+----+
only showing top 2 rows



#### Converting to Spark Types (Literals)
Sometimes we need to pass explicit values into Spark that aren’t a new column but are just a value in all the rows. This might be a constant value or something we’ll need to compare to later on. The way we do this is through literals. 
This is basically a translation from a given programming language’s literal value to one that Spark understands. 
Literals are expressions and can be used in the same way.

In [53]:
from pyspark.sql.functions import lit
trainDF.select("*", lit(1).alias('One')).show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|One|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|  1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|  1|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+-------------

In [54]:
## In SQL, literals are just the specific value.
spark.sql("SELECT *, 1 as One FROM trainDFTable LIMIT 2").show()

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|One|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|  1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|  1|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+-------------

#### Adding Columns

In [55]:
## More Formal way
trainDF.withColumn("One", lit(1)).show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|One|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|  1|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|  1|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+-------------

In [56]:
tempDF = trainDF.withColumn("SameCategoryCode", trainDF["Product_Category_1"] == trainDF["Product_Category_2"])
tempDF.show(4)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|SameCategoryCode|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|            null|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|           false|
|1000001| P00087842|     F|0-17|        10|            A|                         2| 

#### Renaming Columns

In [57]:
tempDF.withColumnRenamed("SameCategoryCode", "SimilarCategory").show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---------------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|SimilarCategory|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+---------------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|           null|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|          false|
+-------+----------+------+----+----------+-------------+--------------------------+------

#### Removing Columns

In [58]:
tempDF.drop("SameCategoryCode").show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only

#### Changing a Column’s Type (cast)

In [59]:
tempDF.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)
 |-- SameCategoryCode: boolean (nullable = true)



In [60]:
tempDF.withColumn("Purchase", col("Purchase").cast("string")).printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: string (nullable = true)
 |-- SameCategoryCode: boolean (nullable = true)



#### Distinct Values

In [61]:
## To find the number of distinct product in train and test datasets
## To calculate the number of distinct products in train and test datasets apply distinct operation.
print("Distinct values in Product_ID's in train dataset are {}".format(trainDF.select('Product_ID').distinct().count()))
print("Distinct values in Product_ID's in test dataset are {}".format(testDF.select('Product_ID').distinct().count()))

Distinct values in Product_ID's in train dataset are 3631
Distinct values in Product_ID's in test dataset are 3491


#### Differences in two columns

In [62]:
## From the above we can see the train file has more categories than test file. 
## Let us check what are the categories for Product_ID, which are in test file but not in train file by 
## applying subtract operation.
## We can do the same for all categorical features.
diff_cat_in_test_train=testDF.select('Product_ID').subtract(trainDF.select('Product_ID'))
print("Count of Product_ID's there in test dataset but not train dataset are {}".format(diff_cat_in_test_train.count()))

diff_cat_in_train_test=trainDF.select('Product_ID').subtract(testDF.select('Product_ID'))
print("Count of Product_ID's there in train dataset but not test dataset are {}".format(diff_cat_in_train_test.count()))

Count of Product_ID's there in test dataset but not train dataset are 46
Count of Product_ID's there in train dataset but not test dataset are 186


#### Pair wise Frequencies - Crosstab

In [63]:
## To calculate pair wise frequency of categorical columns
## Use crosstab operation on DataFrame to calculate the pair wise frequency of columns. 
## Apply crosstab operation on ‘Age’ and ‘Gender’ columns of train DataFrame.
trainDF.crosstab('Age', 'Gender').show()

+----------+-----+------+
|Age_Gender|    F|     M|
+----------+-----+------+
|      0-17| 5083| 10019|
|     46-50|13199| 32502|
|     18-25|24628| 75032|
|     36-45|27170| 82843|
|       55+| 5083| 16421|
|     51-55| 9894| 28607|
|     26-35|50752|168835|
+----------+-----+------+



In [64]:
trainDF.crosstab('Age','Marital_Status').show()

+------------------+------+-----+
|Age_Marital_Status|     0|    1|
+------------------+------+-----+
|              0-17| 15102|    0|
|             46-50| 12690|33011|
|             18-25| 78544|21116|
|             36-45| 66377|43636|
|               55+|  7883|13621|
|             51-55| 10839|27662|
|             26-35|133296|86291|
+------------------+------+-----+



In [65]:
trainDF.groupBy('Age', 'Gender').count().show()

+-----+------+------+
|  Age|Gender| count|
+-----+------+------+
|51-55|     F|  9894|
|18-25|     M| 75032|
| 0-17|     F|  5083|
|46-50|     M| 32502|
|18-25|     F| 24628|
|  55+|     M| 16421|
|  55+|     F|  5083|
|36-45|     M| 82843|
|26-35|     F| 50752|
| 0-17|     M| 10019|
|36-45|     F| 27170|
|51-55|     M| 28607|
|26-35|     M|168835|
|46-50|     F| 13199|
+-----+------+------+



In [66]:
spark.sql("""select Age,
    sum(case when Gender = 'F' then 1 else 0 end) F,
    sum(case when Gender = 'M' then 1 else 0 end) M
from trainDFTable
group by Age""").show()

# spark.sql("""select Age,
#     count(*) total,
#     sum(case when Gender = 'F' then 1 else 0 end) F,
#     sum(case when Gender = 'M' then 1 else 0 end) M
# from trainDFTable
# group by Age""").show()

+-----+-----+------+
|  Age|    F|     M|
+-----+-----+------+
|18-25|24628| 75032|
|26-35|50752|168835|
| 0-17| 5083| 10019|
|46-50|13199| 32502|
|51-55| 9894| 28607|
|36-45|27170| 82843|
|  55+| 5083| 16421|
+-----+-----+------+



#### Removing Duplicates

In [67]:
##To get the DataFrame without any duplicate rows of given a DataFrame
##Use dropDuplicates operation to drop the duplicate rows of a DataFrame. 
## In this command, performing this on two columns Age and Gender of train dataset and 
## Get the all unique rows for these two columns.
trainDF.select('Age','Gender').dropDuplicates().show()

+-----+------+
|  Age|Gender|
+-----+------+
|51-55|     F|
|18-25|     M|
| 0-17|     F|
|46-50|     M|
|18-25|     F|
|  55+|     M|
|  55+|     F|
|36-45|     M|
|26-35|     F|
| 0-17|     M|
|36-45|     F|
|51-55|     M|
|26-35|     M|
|46-50|     F|
+-----+------+



#### Working with Nulls in Data

* **To drop the all rows with null value?**
* Use dropna operation. 
* To drop row from the DataFrame it consider three options.
    * how – ‘any’ or ‘all’. 
        * If ‘any’, drop a row if it contains any nulls. 
        * If ‘all’, drop a row only if all its values are null.
    * thresh – int, default None If specified, drop rows that have less than thresh non-null values.This overwrites the how parameter.
    * subset – optional list of column names to consider.

* Drop null rows in train with default parameters and count the rows in output DataFrame. 
* Default options are any, None, None for how, thresh, subset respectively.


In [68]:
print(trainDF.dropna().count())
print(trainDF.na.drop().count())
print(trainDF.na.drop("any").count())

166821
166821
166821


In [69]:
## To replace the null values in DataFrame with constant number
## Use fillna operation. 

##The fillna will take two parameters to fill the null values.
## value:
##     It will take a dictionary to specify which column will replace with which value.
##     A value (int , float, string) for all columns.
##subset: Specify some selected columns.

##Fill ‘-1’ inplace of null values in train DataFrame.
trainDF.fillna(-1).show(5,truncate=False)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age |Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001|P00069042 |F     |0-17|10        |A            |2                         |0             |3                 |-1                |-1                |8370    |
|1000001|P00248942 |F     |0-17|10        |A            |2                         |0             |1                 |6                 |14                |15200   |
|1000001|P00087842 |F     |0-17|10        |A            |2                         |0             |12                |-1                |-1                |1422    |
|100

In [70]:
## Filling with different values for different columns
fill_cols_vals = {
"Gender": 'M',
"Purchase" : 999999
}
trainDF.na.fill(fill_cols_vals).count()

550068

In [71]:
trainDF.na.replace([""], ["UNKNOWN"], "Gender").count()

550068

#### Filtering the rows

In [72]:
## To filter the rows in train dataset which has Purchases more than 15000
## apply the filter operation on Purchase column in train DataFrame 
## to filter out the rows with values more than 15000. 
print("Count of rows where Purchase Amount more than 15000 are {}".format(trainDF.filter(trainDF.Purchase > 15000).count()))
print("Count of rows where Purchase Amount more than 15000 are {}".format(trainDF.filter(col("Purchase") > 15000).count()))
print("Count of rows where Purchase Amount more than 15000 are {}".format(trainDF.filter(column("Purchase") > 15000).count()))
print("Count of rows where Purchase Amount more than 15000 are {}".format(trainDF.filter(expr("Purchase") > 15000).count()))
print("Count of rows where Purchase Amount more than 15000 are {}".format(trainDF.filter(trainDF["Purchase"] > 15000).count()))

Count of rows where Purchase Amount more than 15000 are 110523
Count of rows where Purchase Amount more than 15000 are 110523
Count of rows where Purchase Amount more than 15000 are 110523
Count of rows where Purchase Amount more than 15000 are 110523
Count of rows where Purchase Amount more than 15000 are 110523


In [73]:
spark.sql("""
SELECT 
COUNT(*) AS Count
FROM trainDFTable
WHERE Purchase > 15000""").show()

+------+
| Count|
+------+
|110523|
+------+



In [74]:
trainDF.where("Purchase > 15000").where("Gender = 'F'").count()

21429

In [75]:
trainDF.filter("Purchase > 15000").where("Gender = 'F'").count()

21429

In [76]:
trainDF.where((col("Purchase") > 15000) & (col("Gender") == 'F')).count()

21429

In [77]:
trainDF.filter((col("Purchase") > 15000) & (col("Gender") == 'F')).count()

21429

In [78]:
spark.sql("SELECT * FROM trainDFTable WHERE Purchase > 15000 AND Gender = 'F'").count()

21429

#### Aggregations

#### Count Distinct

In [79]:
from pyspark.sql.functions import countDistinct
trainDF.select(countDistinct("Age")).show()

+-------------------+
|count(DISTINCT Age)|
+-------------------+
|                  7|
+-------------------+



#### Approximate Count Distinct

In [80]:
from pyspark.sql.functions import approx_count_distinct
trainDF.select(approx_count_distinct("Age", 0.1)).show()

+--------------------------+
|approx_count_distinct(Age)|
+--------------------------+
|                         7|
+--------------------------+



#### First and Last

In [81]:
from pyspark.sql.functions import first, last
trainDF.select(first("Product_ID",), last("Product_ID")).show()

+------------------------+-----------------------+
|first(Product_ID, false)|last(Product_ID, false)|
+------------------------+-----------------------+
|               P00069042|              P00371644|
+------------------------+-----------------------+



#### Min and Max

In [82]:
from pyspark.sql.functions import min, max
trainDF.select(min("Purchase"), max("Purchase")).show()

+-------------+-------------+
|min(Purchase)|max(Purchase)|
+-------------+-------------+
|           12|        23961|
+-------------+-------------+



#### Sum

In [83]:
from pyspark.sql.functions import sum
trainDF.select(sum("Purchase")).show()

+-------------+
|sum(Purchase)|
+-------------+
|   5095812742|
+-------------+



#### sumDistinct

In [84]:
from pyspark.sql.functions import sumDistinct
trainDF.select(sumDistinct("Purchase")).show()

+----------------------+
|sum(DISTINCT Purchase)|
+----------------------+
|             208520914|
+----------------------+



#### Avg

In [85]:
from pyspark.sql.functions import sum, count, avg, expr

trainDF.select(
    count("Purchase").alias("total_transactions"),
    sum("Purchase").alias("total_purchases"),
    avg("Purchase").alias("avg_purchases"),
    expr("mean(Purchase)").alias("mean_purchases"))\
  .selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases").show()

+--------------------------------------+-----------------+-----------------+
|(total_purchases / total_transactions)|    avg_purchases|   mean_purchases|
+--------------------------------------+-----------------+-----------------+
|                     9263.968712959126|9263.968712959126|9263.968712959126|
+--------------------------------------+-----------------+-----------------+



#### Variance and Standard Deviation

In [86]:
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp

trainDF.select(var_pop("Purchase"), var_samp("Purchase"),
  stddev_pop("Purchase"), stddev_samp("Purchase")).show()

+-------------------+--------------------+--------------------+---------------------+
|  var_pop(Purchase)|  var_samp(Purchase)|stddev_pop(Purchase)|stddev_samp(Purchase)|
+-------------------+--------------------+--------------------+---------------------+
|2.523114008138541E7|2.5231185950597852E7|   5023.060827959921|    5023.065393820575|
+-------------------+--------------------+--------------------+---------------------+



In [87]:
spark.sql("""SELECT var_pop(Purchase), var_samp(Purchase),
             stddev_pop(Purchase), stddev_samp(Purchase)
             FROM trainDFTable""").show()

+---------------------------------+----------------------------------+------------------------------------+-------------------------------------+
|var_pop(CAST(Purchase AS DOUBLE))|var_samp(CAST(Purchase AS DOUBLE))|stddev_pop(CAST(Purchase AS DOUBLE))|stddev_samp(CAST(Purchase AS DOUBLE))|
+---------------------------------+----------------------------------+------------------------------------+-------------------------------------+
|              2.523114008138541E7|              2.5231185950597852E7|                   5023.060827959921|                    5023.065393820575|
+---------------------------------+----------------------------------+------------------------------------+-------------------------------------+



#### skewness and kurtosis

In [88]:
from pyspark.sql.functions import skewness, kurtosis
trainDF.select(skewness("Purchase"), kurtosis("Purchase")).show()

+------------------+--------------------+
|skewness(Purchase)|  kurtosis(Purchase)|
+------------------+--------------------+
|0.6001383671643461|-0.33838539753607577|
+------------------+--------------------+



In [89]:
spark.sql("""SELECT skewness(Purchase), kurtosis(Purchase)
             FROM trainDFTable""").show()

+----------------------------------+----------------------------------+
|skewness(CAST(Purchase AS DOUBLE))|kurtosis(CAST(Purchase AS DOUBLE))|
+----------------------------------+----------------------------------+
|                0.6001383671643461|              -0.33838539753607577|
+----------------------------------+----------------------------------+



#### Covariance and Correlation

In [90]:
from pyspark.sql.functions import corr, covar_pop, covar_samp
trainDF.select(corr("Product_Category_1", "Purchase"), covar_samp("Product_Category_1", "Purchase"),
    covar_pop("Product_Category_1", "Purchase")).show()

+----------------------------------+----------------------------------------+---------------------------------------+
|corr(Product_Category_1, Purchase)|covar_samp(Product_Category_1, Purchase)|covar_pop(Product_Category_1, Purchase)|
+----------------------------------+----------------------------------------+---------------------------------------+
|              -0.34370334591990875|                     -6795.6500072045765|                     -6795.637653004719|
+----------------------------------+----------------------------------------+---------------------------------------+



In [91]:
spark.sql("""SELECT corr(Product_Category_1, Purchase), covar_samp(Product_Category_1, Purchase),
             covar_pop(Product_Category_1, Purchase)
             FROM trainDFTable""").show()

+------------------------------------------------------------------+------------------------------------------------------------------------+-----------------------------------------------------------------------+
|corr(CAST(Product_Category_1 AS DOUBLE), CAST(Purchase AS DOUBLE))|covar_samp(CAST(Product_Category_1 AS DOUBLE), CAST(Purchase AS DOUBLE))|covar_pop(CAST(Product_Category_1 AS DOUBLE), CAST(Purchase AS DOUBLE))|
+------------------------------------------------------------------+------------------------------------------------------------------------+-----------------------------------------------------------------------+
|                                              -0.34370334591990875|                                                     -6795.6500072045765|                                                     -6795.637653004719|
+------------------------------------------------------------------+------------------------------------------------------------------------+---

#### Complex Aggregations

In [92]:
from pyspark.sql.functions import collect_set, collect_list
trainDF.agg(collect_set("Age"), collect_list("Age")).show()

+--------------------+--------------------+
|    collect_set(Age)|   collect_list(Age)|
+--------------------+--------------------+
|[55+, 51-55, 0-17...|[0-17, 0-17, 0-17...|
+--------------------+--------------------+



In [93]:
spark.sql("""SELECT collect_set(Age), collect_list(Age) FROM trainDFTable""").show()

+--------------------+--------------------+
|    collect_set(Age)|   collect_list(Age)|
+--------------------+--------------------+
|[55+, 51-55, 0-17...|[0-17, 0-17, 0-17...|
+--------------------+--------------------+



#### Grouping

In [94]:
trainDF.groupBy("Age", "Gender").count().show()

+-----+------+------+
|  Age|Gender| count|
+-----+------+------+
|51-55|     F|  9894|
|18-25|     M| 75032|
| 0-17|     F|  5083|
|46-50|     M| 32502|
|18-25|     F| 24628|
|  55+|     M| 16421|
|  55+|     F|  5083|
|36-45|     M| 82843|
|26-35|     F| 50752|
| 0-17|     M| 10019|
|36-45|     F| 27170|
|51-55|     M| 28607|
|26-35|     M|168835|
|46-50|     F| 13199|
+-----+------+------+



#### Grouping with Expressions

In [95]:
trainDF.groupBy("Age").agg(
  count("Purchase").alias("quan"),
  expr("count(Purchase)")).show()

+-----+------+---------------+
|  Age|  quan|count(Purchase)|
+-----+------+---------------+
|18-25| 99660|          99660|
|26-35|219587|         219587|
| 0-17| 15102|          15102|
|46-50| 45701|          45701|
|51-55| 38501|          38501|
|36-45|110013|         110013|
|  55+| 21504|          21504|
+-----+------+---------------+



#### Grouping with Maps

In [96]:
trainDF.groupBy("Age").agg(expr("avg(Purchase)"),expr("stddev_pop(Purchase)")).show()

+-----+-----------------+--------------------+
|  Age|    avg(Purchase)|stddev_pop(Purchase)|
+-----+-----------------+--------------------+
|18-25|9169.663606261289|   5034.296739627788|
|26-35|9252.690632869888|   5010.515894010142|
| 0-17|8933.464640444974|   5110.944823427657|
|46-50|9208.625697468327|   4967.162022122699|
|51-55|9534.808030960236|   5087.302011173861|
|36-45|9331.350694917874|   5022.901050378551|
|  55+|9336.280459449405|   5011.377469555766|
+-----+-----------------+--------------------+



In [97]:
## To find the mean of each age group in train dataset - Average purchases in each age group
trainDF.groupby('Age').agg({'Purchase': 'mean'}).show()

+-----+-----------------+
|  Age|    avg(Purchase)|
+-----+-----------------+
|18-25|9169.663606261289|
|26-35|9252.690632869888|
| 0-17|8933.464640444974|
|46-50|9208.625697468327|
|51-55|9534.808030960236|
|36-45|9331.350694917874|
|  55+|9336.280459449405|
+-----+-----------------+



In [98]:
trainDF.groupby('Age').agg({'Purchase': 'sum'}).show()

+-----+-------------+
|  Age|sum(Purchase)|
+-----+-------------+
|18-25|    913848675|
|26-35|   2031770578|
| 0-17|    134913183|
|46-50|    420843403|
|51-55|    367099644|
|36-45|   1026569884|
|  55+|    200767375|
+-----+-------------+



In [99]:
## Apply sum, min, max, count with groupby to get different summary insight for each group. 
exprs = {x: "sum" for x in trainDF.columns}
trainDF.groupBy("Age").agg(exprs).show()

+-----+------------------+-----------------------+-------------------+-------------+------------+---------------+-------------------------------+-----------------------+--------+-----------+-----------------------+---------------+
|  Age|sum(City_Category)|sum(Product_Category_3)|sum(Marital_Status)|sum(Purchase)|sum(User_ID)|sum(Occupation)|sum(Stay_In_Current_City_Years)|sum(Product_Category_1)|sum(Age)|sum(Gender)|sum(Product_Category_2)|sum(Product_ID)|
+-----+------------------+-----------------------+-------------------+-------------+------------+---------------+-------------------------------+-----------------------+--------+-----------+-----------------------+---------------+
|18-25|              null|                 388041|              21116|    913848675| 99939196632|         671348|                       116997.0|                 509371|    null|       null|                 654936|           null|
|26-35|              null|                 846624|              86291|   203

## Joins

### Data dictionary :
* __RestGenInfo.csv__ contains :
    * placeID - Uniqued Id of restaurants
    * latitude - Location detail 
    * longitude - Location detail
    * name - Name of the restaurant
    * state - Name of the state 
    * alcohol - Constraints on having alcoholic beverages
    * smoking_area - Information for smokers
    * price - Pricing type of restaurant
    * franchise - Does the restaurant have frachise
    * area - open or close type of restaurant

* __Cuisine.csv__ contains :
    * placeID - Uniqued Id of restaurants
    * Rcuisine - Different styles of food

    
* __PaymentMode.csv__ contains :
    * placeID - Uniqued Id of restaurants
    * Rpayment - Different modes of payment

    
* __parking.csv__ contains :
     * placeID - Uniqued Id of restaurants
     * parking_lot - Different types of parking available

dsets_path = "academics/Batch44/CSE7322c/20181007_Batch 44_CSE 7322c_SparkSQL and DataFrames_Lab/"
/home/mahidharv/academics/Batch44/CSE7322c/20181007_Batch 44_CSE 7322c_SparkSQL and DataFrames_Lab

In [107]:
restoGen = spark.read.csv(dsets_path+'Datasets/RestGenInfo.csv', header=True, inferSchema=True,nullValue='?')
cuisine = spark.read.csv(dsets_path+'Datasets/Cuisine.csv', header=True, inferSchema=True)
paymentMode = spark.read.csv(dsets_path+'Datasets/PaymentMode.csv', header=True, inferSchema=True)
parking = spark.read.csv(dsets_path+'Datasets/parking.csv', header=True, inferSchema=True)

AnalysisException: u'Path does not exist: hdfs://bigdata/user/mahidharv/academics/Batch44/CSE7322c/20181007_Batch 44_CSE 7322c_SparkSQL and DataFrames_Lab/Datasets/RestGenInfo.csv;'

In [None]:
restoGen.select([count(when(isnan(c)| col(c).isNull(), 1)).alias(c) for c in restoGen.columns]).show()
cuisine.select([count(when(isnan(c)| col(c).isNull(), 1)).alias(c) for c in cuisine.columns]).show()
paymentMode.select([count(when(isnan(c)| col(c).isNull(), 1)).alias(c) for c in paymentMode.columns]).show()
parking.select([count(when(isnan(c)| col(c).isNull(), 1)).alias(c) for c in parking.columns]).show()

In [None]:
restoGen = restoGen.dropna()

In [None]:
restoGen.select('placeID').distinct().count()

cuisine.select('placeID').distinct().count()

cuisine.select('Rcuisine').distinct().count()

In [None]:
restoGen.createOrReplaceTempView('restoGenTable')
cuisine.createOrReplaceTempView('cuisineTable')
paymentMode.createOrReplaceTempView('paymentModeTable')
parking.createOrReplaceTempView('parkingTable')

In [None]:
 ##The  count of restaurants(as numberOfHotels) for each payment modes and area and order based on numberOfHotels in descending order.
    
spark.sql('''select  count(*) as numberOfHotels, Rpayment, area from
restoGenTable a join paymentModeTable b 
where a.placeID = b.placeID group by Rpayment, area 
order by numberOfHotels desc''').show()

In [None]:
inner_join = restoGen.join(paymentMode, restoGen.placeID == paymentMode.placeID,how='inner') 
inner_join.show(4)

count_of_hotels = inner_join.select('Rpayment','area').groupby('area','Rpayment').count()

count_of_hotels = count_of_hotels.withColumnRenamed('count','NumberofHotels')
count_of_hotels.show()

count_of_hotels.orderBy(count_of_hotels.NumberofHotels.desc()).show()

In [None]:
##  Count the number of Cuisines that are used by the Restaurants
print("The number of Disintct Cuisines =  ",cuisine.select('Rcuisine').distinct().count())
left_join = restoGen.join(cuisine,how = 'left',on=restoGen.placeID==cuisine.placeID)
left_join.select(countDistinct('Rcuisine')).show()

In [None]:
## Count the distinct restaurant names which has valet parking
spark.sql('''select distinct name ,parking_lot from restoGenTable a join parkingTable b where a.placeID = b.placeID and 
          b.parking_lot = 'valet parking' ''').show()

right_join = restoGen.join(other=parking,on=parking.placeID==restoGen.placeID,how='right')
names_of_restaurants = right_join.select('name','parking_lot').filter(parking.parking_lot=='valet parking')
names_of_restaurants.distinct().filter(names_of_restaurants.name!='null').show()

#### Natural Joins
Natural joins make implicit guesses at the columns on which you would like to join. 
It finds matching columns and returns the results. 
Left, right, and outer natural joins are all supported.

WARNING:
Implicit is always dangerous! 
The following query will give us incorrect results because 
the two DataFrames/tables share a column name (id), but it means different things in the datasets. 
You should always use this join with caution.

In [None]:
spark.sql('''select  * from restoGenTable a NATURAL JOIN paymentModeTable b ''').show(10)

#### Cross (Cartesian) Joins
Cross-joins in simplest terms are inner joins that do not specify a predicate. 
Cross joins will join every single row in the left DataFrame to ever single row in the right DataFrame. 
This will cause an absolute explosion in the number of rows contained in the resulting DataFrame. 
If you have 1,000 rows in each DataFrame, the cross-join of these will result in 1,000,000 (1,000 x 1,000) rows. 
For this reason, you must very explicitly state that you want a cross-join by using the cross join keyword:

#### Random Samples

In [None]:
## To create a sample DataFrame from the base DataFrame
## Use sample operation to take sample of a DataFrame. 
## The sample method on DataFrame will return a DataFrame containing the sample of base DataFrame. 
## The sample method takes 3 parameters.
## withReplacement = True or False to select a observation with or without replacement.
## fraction = x, where x = .5 shows that we want to have 50% data in sample DataFrame.
## seed to reproduce the result
sampleDF1 = trainDF.sample(False, 0.2, 1234)
sampleDF2 = trainDF.sample(False, 0.2, 4321)
print(sampleDF1.count(), sampleDF2.count())

#### Random Splits

In [None]:
splitDF = trainDF.randomSplit([0.7, 0.3], seed=1234)
print(splitDF[0].count())
print(splitDF[1].count())

#### Map Transformation

In [None]:
## To apply map operation on DataFrame columns
## Apply a function on each row of DataFrame using map operation. 
## After applying this function, we get the result in the form of RDD. 
## Apply a map operation on User_ID column of train and print the first 5 elements of mapped RDD(x,1) 
## ----- Applying lambda function.

trainDF.select('User_ID').rdd.map(lambda x:(x,1)).take(5)

*__Prior to Spark 2.0, spark_df.map would alias to spark_df.rdd.map(). 
With Spark 2.0, you must explicitly call .rdd first.__*

#### Sorting Rows

In [None]:
## To sort the DataFrame based on column(s)
## Use orderBy operation on DataFrame to get sorted output based on some column. 
## The orderBy operation take two arguments.
## List of columns.
## ascending = True or False for getting the results in ascending or descending order(list in case of more than two columns )
## Sort the train DataFrame based on ‘Purchase’.
trainDF.orderBy(trainDF.Purchase.desc()).show(5)

#### Repartition and Coalesce
Another important optimization opportunity is to partition the data according to some frequently filtered columns
which controls the physical layout of data across the cluster including the partitioning scheme and the number of
partitions.

Repartition will incur a full shuffle of the data, regardless of whether or not one is necessary. This means that you should typically only repartition when the future number of partitions is greater than your current number of
partitions or when you are looking to partition by a set of columns.

In [101]:
## Find existing partitions count
trainDF.rdd.getNumPartitions()
## Do the repartition
## trainDF.repartition(5)

## Repartition based on a column
## If we know we are going to be filtering by a certain column often, 
## it can be worth repartitioning based on that column.
## trainDF.repartition(col(“Purchase”))

## We can optionally specify the number of partitions we would like too.
## trainDF.repartition(5, col(“Purchase”))

## Coalesce on the other hand will not incur a full shuffle and will try to combine partitions. 
## This operation will shuffle our data into 5 partitions based on the Purchase, 
## then coalesce them (without a full shuffle).
## trainDF.repartition(5, col("Purchase")).coalesce(2)

7

### Miscellaneous

#### Unions

In [102]:
df1 = spark.createDataFrame([[1, 'Alex', 25],[3, 'Carol', 53],[5, 'Emily', 25],[7, 'Gabriel', 32],[9, 'Ilma', 35],[11, 'Kim', 45]], ['id', 'name', 'age'])
df2 = spark.createDataFrame([[2, 'Ben', 66],[4, 'Daniel', 28],[6, 'Frank', 64],[8, 'Harley', 29],[10, 'Jack', 35],[12, 'Litmya', 45]], ['id', 'name', 'age'])
print("Before")
print("DataFrame-1")
print(df1.show())
print("DataFrame-2")
print(df2.show())
print("After")
df1 = df1.union(df2)
print("DataFrame-1")
print(df1.show())

Before
DataFrame-1
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|   Alex| 25|
|  3|  Carol| 53|
|  5|  Emily| 25|
|  7|Gabriel| 32|
|  9|   Ilma| 35|
| 11|    Kim| 45|
+---+-------+---+

None
DataFrame-2
+---+------+---+
| id|  name|age|
+---+------+---+
|  2|   Ben| 66|
|  4|Daniel| 28|
|  6| Frank| 64|
|  8|Harley| 29|
| 10|  Jack| 35|
| 12|Litmya| 45|
+---+------+---+

None
After
DataFrame-1
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|   Alex| 25|
|  3|  Carol| 53|
|  5|  Emily| 25|
|  7|Gabriel| 32|
|  9|   Ilma| 35|
| 11|    Kim| 45|
|  2|    Ben| 66|
|  4| Daniel| 28|
|  6|  Frank| 64|
|  8| Harley| 29|
| 10|   Jack| 35|
| 12| Litmya| 45|
+---+-------+---+

None


In [103]:
## To add the new column in DataFrame
## Use withColumn operation to add new column (we can also replace) in base DataFrame and return a new DataFrame. 
## The withColumn operation will take 2 parameters.
## Column name to be added /replaced.
## Expression on column.

## Derive new column, ‘Purchase_new’ in train which is calculated by dviding Purchase column by 2.

trainDF.withColumn('Purchase_new', trainDF.Purchase /2.0).select('Purchase','Purchase_new').show(5)

+--------+------------+
|Purchase|Purchase_new|
+--------+------------+
|    8370|      4185.0|
|   15200|      7600.0|
|    1422|       711.0|
|    1057|       528.5|
|    7969|      3984.5|
+--------+------------+
only showing top 5 rows



In [104]:
## To drop a column in DataFrame
## To drop a column from the DataFrame use drop operation. 
## Drop the column called ‘Comb’ from the test and get the remaining columns in test dataframe
testDF.drop('Comb').columns

['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3']

In [105]:
## To remove some categories of Product_ID column in test that are not present in Product_ID column in train
## Use an user defined function ( udf ) to remove the categories of a column which are in test but not in train.
## Calculate the categories in Product_ID column which are in test but not in train.
diff_cat_in_train_test=testDF.select('Product_ID').subtract(trainDF.select('Product_ID'))
diff_cat_in_train_test.count() # For distict count

46

In [106]:
diff_cat_in_train_test.show(2)

+----------+
|Product_ID|
+----------+
| P00322642|
| P00300142|
+----------+
only showing top 2 rows



In [107]:
## There are 46 different categories in test. 
## To remove these categories from the test ‘Product_ID’ column.

## Create the distinct list of categories called ‘not_found_cat’ from the diff_cat_in_train_test using map operation.
## Register a udf(user define function).
## User defined function will take each element of test column and search this in not_found_cat list and 
## it will put -1 ifit finds in this list otherwise it will do nothing.
not_found_cat = diff_cat_in_train_test.rdd.map(lambda x: x[0]).collect()
print(len(not_found_cat))
print(type(not_found_cat))
print(not_found_cat)

46
<type 'list'>
[u'P00322642', u'P00300142', u'P00077642', u'P00249942', u'P00294942', u'P00106242', u'P00239542', u'P00074942', u'P00092742', u'P00082142', u'P00030342', u'P00062542', u'P00063942', u'P00013042', u'P00279042', u'P00227242', u'P00359842', u'P00061642', u'P00042642', u'P0099542', u'P00306842', u'P00140842', u'P00165542', u'P00322842', u'P00268942', u'P00236842', u'P00038942', u'P00172942', u'P00012642', u'P00270342', u'P00312642', u'P00336842', u'P00105742', u'P00309842', u'P00166542', u'P00082642', u'P00253842', u'P00062242', u'P00100242', u'P00315342', u'P00058842', u'P00168242', u'P00156942', u'P00039042', u'P00056942', u'P00204642']


#### User Defined Functions - UDF

In [108]:
## Resister the udf, we need to import StringType from the pyspark.sql and udf from the pyspark.sql.functions. 
## The udf function takes 2 parameters as arguments:
## Return type (in my case StringType())
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

Function1 = udf(lambda x: '-1' if x in not_found_cat else x, StringType())

In [109]:
## In the above code function name is ‘Function1’ and we are putting ‘-1’  for not found catagories in test ‘Product_ID’. 
## Finally apply above ‘Function1’ function on test ‘Product_ID’ and take result in k for new column calles “NEW_Product_ID”.

k = testDF.withColumn("NEW_Product_ID",Function1(testDF["Product_ID"])).select('NEW_Product_ID')
k.where(k['NEW_Product_ID'] == -1).show(2)

+--------------+
|NEW_Product_ID|
+--------------+
|            -1|
|            -1|
+--------------+
only showing top 2 rows



In [110]:
## See the results by again calculating the different categories in k and train subtract operation.
diff_cat_in_train_test=k.select('NEW_Product_ID').subtract(trainDF.select('Product_ID'))
print(diff_cat_in_train_test.count())# For distinct count
print(diff_cat_in_train_test.distinct().count())# For distinct count

1
1


In [111]:
## The output 1 means we have now only 1 different category k and train.
diff_cat_in_train_test.distinct().collect()

[Row(NEW_Product_ID=u'-1')]

In [112]:
from pyspark.sql.functions import lit, round, bround
trainDF.select(round(lit("2.5")), bround(lit(2.5))).show(2)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
|          3.0|           2.0|
+-------------+--------------+
only showing top 2 rows



In [113]:
spark.sql("SELECT round(2.5), bround(2.5)").show(2)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|            3|             2|
+-------------+--------------+



In [114]:
from pyspark.sql.functions import corr
print(trainDF.stat.corr("Purchase", "Product_Category_1"))
trainDF.select(corr("Purchase", "Product_Category_1")).show()

-0.34370334592
+----------------------------------+
|corr(Purchase, Product_Category_1)|
+----------------------------------+
|              -0.34370334591990875|
+----------------------------------+



In [115]:
spark.sql("SELECT corr(Purchase, Product_Category_1) FROM trainDFTable").show()

+------------------------------------------------------------------+
|corr(CAST(Purchase AS DOUBLE), CAST(Product_Category_1 AS DOUBLE))|
+------------------------------------------------------------------+
|                                              -0.34370334591990875|
+------------------------------------------------------------------+



In [116]:
trainDF.stat.freqItems(["Age"]).show(truncate = False)

+----------------------------------------------+
|Age_freqItems                                 |
+----------------------------------------------+
|[26-35, 18-25, 55+, 46-50, 51-55, 36-45, 0-17]|
+----------------------------------------------+



#### String Manipulations

In [117]:
!ls

CatalystOptimizer.ipynb  Spark_DataFrames_SQL-Copy1.ipynb
SalesData		 Spark_DataFrames_SQL.ipynb


In [118]:
!pwd

/nfsroot/data/home/mahidharv/academics/Batch44/CSE7322c/20181007_Batch 44_CSE 7322c_SparkSQL and DataFrames_Lab


In [119]:
!ls SalesData

cat.jpg  test.csv  train.csv


In [120]:
!hdfs dfs -rm -r /user/mahidharv/SalesData

19/04/13 21:57:18 INFO fs.TrashPolicyDefault: Moved: 'hdfs://bigdata/user/mahidharv/SalesData' to trash at: hdfs://bigdata/user/mahidharv/.Trash/Current/user/mahidharv/SalesData


In [121]:
!hdfs dfs -mkdir /user/mahidharv/SalesData

In [122]:
!hdfs dfs -chmod 777 /user/mahidharv/SalesData

In [123]:
!hdfs dfs -copyFromLocal /home/mahidharv/academics/Batch44/CSE7322c/20181007_Batch 44_CSE 7322c_SparkSQL and DataFrames_Lab/SalesData/* /user/mahidharv/SalesData


copyFromLocal: `/home/mahidharv/academics/Batch44/CSE7322c/20181007_Batch': No such file or directory
copyFromLocal: `44_CSE': No such file or directory
copyFromLocal: `7322c_SparkSQL': No such file or directory
copyFromLocal: `and': No such file or directory
copyFromLocal: `DataFrames_Lab/SalesData/*': No such file or directory


In [124]:
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim

trainDF.select(
ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO"), 3, " ").alias("lp"),
rpad(lit("HELLO"), 10, " ").alias("rp"))\
.show(2)

+------+------+-----+---+----------+
| ltrim| rtrim| trim| lp|        rp|
+------+------+-----+---+----------+
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
+------+------+-----+---+----------+
only showing top 2 rows



In [125]:
spark.sql("""SELECT
ltrim(' HELLLOOOO '),
rtrim(' HELLLOOOO '),
trim(' HELLLOOOO '),
lpad('HELLOOOO ', 3, ' '),
rpad('HELLOOOO ', 10, ' ')
FROM
trainDFTable""").show(2)

+------------------+------------------+-----------------+---------------------+----------------------+
|ltrim( HELLLOOOO )|rtrim( HELLLOOOO )|trim( HELLLOOOO )|lpad(HELLOOOO , 3,  )|rpad(HELLOOOO , 10,  )|
+------------------+------------------+-----------------+---------------------+----------------------+
|        HELLLOOOO |         HELLLOOOO|        HELLLOOOO|                  HEL|            HELLOOOO  |
|        HELLLOOOO |         HELLLOOOO|        HELLLOOOO|                  HEL|            HELLOOOO  |
+------------------+------------------+-----------------+---------------------+----------------------+
only showing top 2 rows



#### Regular Expressions

In [126]:
from pyspark.sql.functions import regexp_replace
regex_string = "F|M"

trainDF.select(
regexp_replace(col("Gender"), regex_string, "MALE_OR_FEMALE")
.alias("Gender_DECODE"),
col("Gender"))\
.show(2)

+--------------+------+
| Gender_DECODE|Gender|
+--------------+------+
|MALE_OR_FEMALE|     F|
|MALE_OR_FEMALE|     F|
+--------------+------+
only showing top 2 rows



In [127]:
spark.sql("""
SELECT
regexp_replace(Gender, 'F|M', 'MALE_OR_FEMALE') as
Gender_DECODE,
Gender
FROM
trainDFTable
""").show(2)

+--------------+------+
| Gender_DECODE|Gender|
+--------------+------+
|MALE_OR_FEMALE|     F|
|MALE_OR_FEMALE|     F|
+--------------+------+
only showing top 2 rows



In [128]:
from pyspark.sql.functions import translate
trainDF.select(
translate(col("Gender"), "FM", "01"),
col("Gender"))\
.show(10)

+-------------------------+------+
|translate(Gender, FM, 01)|Gender|
+-------------------------+------+
|                        0|     F|
|                        0|     F|
|                        0|     F|
|                        0|     F|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
+-------------------------+------+
only showing top 10 rows



In [129]:
spark.sql("""
SELECT
translate(Gender, 'FM', '01'),
Gender
FROM
trainDFTable
""").show(10)

+-------------------------+------+
|translate(Gender, FM, 01)|Gender|
+-------------------------+------+
|                        0|     F|
|                        0|     F|
|                        0|     F|
|                        0|     F|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
|                        1|     M|
+-------------------------+------+
only showing top 10 rows



#### Working with Date and Time

In [130]:
from pyspark.sql.functions import current_date, current_timestamp
dateDF = spark.range(10)\
.withColumn("today", current_date())\
.withColumn("now", current_timestamp())
dateDF.show()

+---+----------+--------------------+
| id|     today|                 now|
+---+----------+--------------------+
|  0|2019-04-13|2019-04-13 21:57:...|
|  1|2019-04-13|2019-04-13 21:57:...|
|  2|2019-04-13|2019-04-13 21:57:...|
|  3|2019-04-13|2019-04-13 21:57:...|
|  4|2019-04-13|2019-04-13 21:57:...|
|  5|2019-04-13|2019-04-13 21:57:...|
|  6|2019-04-13|2019-04-13 21:57:...|
|  7|2019-04-13|2019-04-13 21:57:...|
|  8|2019-04-13|2019-04-13 21:57:...|
|  9|2019-04-13|2019-04-13 21:57:...|
+---+----------+--------------------+



In [131]:
dateDF.createOrReplaceTempView("dateDFTable")
dateDF.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [132]:
from pyspark.sql.functions import date_add, date_sub
dateDF.select(date_sub(col("today"), 5),date_add(col("today"), 5)).show(1)

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2019-04-08|        2019-04-18|
+------------------+------------------+
only showing top 1 row



In [133]:
spark.sql("""
SELECT
date_sub(today, 5),
date_add(today, 5)
FROM
dateDFTable
""").show(1)

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2019-04-08|        2019-04-18|
+------------------+------------------+
only showing top 1 row



In [134]:
from pyspark.sql.functions import datediff, months_between, to_date
dateDF\
.withColumn("week_ago", date_sub(col("today"), 7))\
.select(datediff(col("week_ago"), col("today")))\
.show(1)

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+
only showing top 1 row



In [135]:
dateDF\
.select(
to_date(lit("2017-01-01")).alias("start"),
to_date(lit("2018-02-18")).alias("end"))\
.select(months_between(col("start"), col("end")))\
.show(1)

+--------------------------+
|months_between(start, end)|
+--------------------------+
|               -13.5483871|
+--------------------------+
only showing top 1 row



In [136]:
spark.sql("""
SELECT
to_date('2016-01-01'),
months_between('2016-01-01', '2017-01-01'),
datediff('2016-01-01', '2017-01-01')
FROM
dateDFTable
""").show(2)

+---------------------+----------------------------------------------------------------------------+------------------------------------------------------------+
|to_date('2016-01-01')|months_between(CAST(2016-01-01 AS TIMESTAMP), CAST(2017-01-01 AS TIMESTAMP))|datediff(CAST(2016-01-01 AS DATE), CAST(2017-01-01 AS DATE))|
+---------------------+----------------------------------------------------------------------------+------------------------------------------------------------+
|           2016-01-01|                                                                       -12.0|                                                        -366|
|           2016-01-01|                                                                       -12.0|                                                        -366|
+---------------------+----------------------------------------------------------------------------+------------------------------------------------------------+
only showing top 2 rows



In [137]:
from pyspark.sql.functions import to_date, lit
spark.range(5).withColumn("date", lit("2017-01-01"))\
.select(to_date(col("date")))\
.show()

+---------------+
|to_date(`date`)|
+---------------+
|     2017-01-01|
|     2017-01-01|
|     2017-01-01|
|     2017-01-01|
|     2017-01-01|
+---------------+



__WARNING__
<br>Spark will not throw an error if it cannot parse the date, it’ll just return null. This can be a bit tricky in larger pipelines because you may be expecting your data in one format and getting it in another. To illustrate, let’s take a look at the date format that has switched from year-month-day to year-day-month. Spark will fail to parse this date and silently return null instead.

In [138]:
### 2016-20-12 - year-day-month
### 2017-12-11 - year-month-day
dateDF.select(to_date(lit("2016-20-12")),to_date(lit("2017-12-11"))).show(1)

+---------------------+---------------------+
|to_date('2016-20-12')|to_date('2017-12-11')|
+---------------------+---------------------+
|                 null|           2017-12-11|
+---------------------+---------------------+
only showing top 1 row



In [139]:
from pyspark.sql.functions import unix_timestamp, from_unixtime

dateFormat = "yyyy-dd-MM"

cleanDateDF = spark.range(1)\
.select(to_date(unix_timestamp(lit("2017-12-11"), dateFormat)
.cast("timestamp"))\
.alias("date"),
to_date(unix_timestamp(lit("2017-20-12"), dateFormat)
.cast("timestamp"))\
.alias("date2"))

cleanDateDF.show()

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+



In [140]:
cleanDateDF.createOrReplaceTempView("dateTable2")

spark.sql("""
SELECT
to_date(cast(unix_timestamp(date, 'yyyy-dd-MM') as timestamp)),
to_date(cast(unix_timestamp(date2, 'yyyy-dd-MM') as timestamp)),
to_date(date)
FROM
dateTable2
""").show()

+---------------------------------------------------------------------------+----------------------------------------------------------------------------+--------------------------+
|to_date(CAST(unix_timestamp(datetable2.`date`, 'yyyy-dd-MM') AS TIMESTAMP))|to_date(CAST(unix_timestamp(datetable2.`date2`, 'yyyy-dd-MM') AS TIMESTAMP))|to_date(datetable2.`date`)|
+---------------------------------------------------------------------------+----------------------------------------------------------------------------+--------------------------+
|                                                                 2017-11-12|                                                                  2017-12-20|                2017-11-12|
+---------------------------------------------------------------------------+----------------------------------------------------------------------------+--------------------------+



In [141]:
cleanDateDF\
.select(unix_timestamp(col("date"), dateFormat).cast("timestamp"))\
.show()

+---------------------------------------------------+
|CAST(unix_timestamp(date, yyyy-dd-MM) AS TIMESTAMP)|
+---------------------------------------------------+
|                                2017-11-12 00:00:00|
+---------------------------------------------------+



In [142]:
cleanDateDF.filter(col("date2") > lit("2017-12-12")).show()

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+



In [143]:
cleanDateDF.filter(col("date2") > "'2017-12-12'").show()

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+



In [144]:
textDF = spark.range(10).withColumn("Description", lit("This is long string"))
textDF.show()

+---+-------------------+
| id|        Description|
+---+-------------------+
|  0|This is long string|
|  1|This is long string|
|  2|This is long string|
|  3|This is long string|
|  4|This is long string|
|  5|This is long string|
|  6|This is long string|
|  7|This is long string|
|  8|This is long string|
|  9|This is long string|
+---+-------------------+



In [145]:
from pyspark.sql.functions import split
textDF.select(split(col("Description"), " ")).show(2)

+---------------------+
|split(Description,  )|
+---------------------+
| [This, is, long, ...|
| [This, is, long, ...|
+---------------------+
only showing top 2 rows



In [146]:
textDF.createOrReplaceTempView('textDFTable')

spark.sql("""
SELECT
split(Description, ' ')
FROM
textDFTable
""").show(2)

+---------------------+
|split(Description,  )|
+---------------------+
| [This, is, long, ...|
| [This, is, long, ...|
+---------------------+
only showing top 2 rows



#### User-Defined Functions

In [147]:
udfExampleDF = spark.range(5).toDF("num")

def power3(double_value):
    return double_value ** 3

power3(2.0)

8.0

Once the function is created, we need to register them with Spark so that we can used
them on all of our worker machines. Spark will serialize the function on the driver, and transfer it over the network to all executor processes. This happens regardless of language.

<br>Once we go to use the function, there are essentially two different things that occur. If the function is written in Scala or Java then we can use that function within the JVM. This means there will be little performance penalty aside from the fact that we can’t take advantage of code generation capabilities that Spark has for built-in functions.

<br>If the function is written in Python, something quite different happens. 
Spark will start up a python process on the worker, serialize all of the data to a format that python can understand (remember it was in the JVM before), execute the function row by row on that data in the python process, before finally returning the results of the row operations to the JVM and Spark.

![UDF_Spark_Python](./Images/UDF_Spark_Python.png)

In [148]:
from pyspark.sql.functions import udf
power3udf = udf(power3)

In [149]:
from pyspark.sql.functions import col
udfExampleDF.select(power3udf(col("num"))).show()

+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
|          8|
|         27|
|         64|
+-----------+



#### UDF Written in Scala
#### Try in spark-shell
val udfExampleDF = spark.range(5).toDF("num")

def power3(number:Double):Double = {
<br>          number X number X number
<br>}

power3(2.0)

![UDF_Scala_PySpark](./Images/Scala_UDF.png)

In [151]:
from pyspark.sql import Row
myList = [('Alpha',25),('Beta',22),('Charlie',20),('Delta',22), ('Echo',21),('France',22),('Gamma',23)]
rdd = spark.sparkContext.parallelize(myList)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

In [152]:
people.collect()

[Row(age=25, name='Alpha'),
 Row(age=22, name='Beta'),
 Row(age=20, name='Charlie'),
 Row(age=22, name='Delta'),
 Row(age=21, name='Echo'),
 Row(age=22, name='France'),
 Row(age=23, name='Gamma')]

In [153]:
schemaPeople = spark.createDataFrame(people)

# check the type of schemaPeople.
type(schemaPeople)


pyspark.sql.dataframe.DataFrame

In [154]:
schemaPeople.show()

+---+-------+
|age|   name|
+---+-------+
| 25|  Alpha|
| 22|   Beta|
| 20|Charlie|
| 22|  Delta|
| 21|   Echo|
| 22| France|
| 23|  Gamma|
+---+-------+



In [155]:
udfExampleDF = spark.range(5).toDF("num")

In [156]:
udfExampleDF.show()

+---+
|num|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



### Distributed Shared Variables

#### Broadcast Variables

In [157]:
my_collection = "Postgraduate Program in Big Data Analytics and Optimization"\
  .split(" ")
    
words = spark.sparkContext.parallelize(my_collection)

In [158]:
words.getNumPartitions()

8

In [159]:
words.take(3)

['Postgraduate', 'Program', 'in']

In [160]:
supplementalData = {"Postgraduate":1000, "Analytics":200, "Optimization": 400,
                    "Big":-300, "Data": 100, "Program":100}

In [161]:
suppBroadcast = spark.sparkContext.broadcast(supplementalData)

In [162]:
suppBroadcast.value

{'Analytics': 200,
 'Big': -300,
 'Data': 100,
 'Optimization': 400,
 'Postgraduate': 1000,
 'Program': 100}

In [163]:
words.map(lambda word: (word, suppBroadcast.value.get(word, 0)))\
  .sortBy(lambda wordPair: wordPair[1])\
  .collect()

[('Big', -300),
 ('in', 0),
 ('and', 0),
 ('Program', 100),
 ('Data', 100),
 ('Analytics', 200),
 ('Optimization', 400),
 ('Postgraduate', 1000)]

#### Accumulators

In [164]:
cwgDF = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("file:///home/mahidharv/SparkSql_Activity/XXI_Commonwealth_Games.csv")

In [165]:
cwgDF.show(5)

+---+----------+------------+----+------+------+-----+
|Seq|NationCode|  NationName|Gold|Silver|Bronze|Total|
+---+----------+------------+----+------+------+-----+
|  1|       AUS|   Australia|  60|    45|    46|  151|
|  2|       ENG|     England|  28|    31|    24|   83|
|  3|       IND|       India|  14|     6|     9|   29|
|  4|       CAN|      Canada|  11|    26|    19|   56|
|  5|       RSA|South Africa|  11|     9|    12|   32|
+---+----------+------------+----+------+------+-----+
only showing top 5 rows



In [166]:
cwgDF.schema

StructType(List(StructField(Seq,IntegerType,true),StructField(NationCode,StringType,true),StructField(NationName,StringType,true),StructField(Gold,IntegerType,true),StructField(Silver,IntegerType,true),StructField(Bronze,IntegerType,true),StructField(Total,IntegerType,true)))

#### Define Accumulator

In [167]:
accIND = spark.sparkContext.accumulator(0)

In [168]:
def accINDFunc(each_row):
    countryCD = each_row["NationCode"]
    list_ctrys = ["IND", "SRI", "PAK", "BAN"]
    if countryCD in list_ctrys:
        accIND.add(each_row["Total"])

In [169]:
cwgDF.foreach(lambda each_row: accINDFunc(each_row))

In [170]:
accIND

Accumulator<id=0, value=38>