# Chapter 2: Spark Structured API

[**2.1 RDD**](#2.1-RDD)   
[**2.2 DataFrames API**](#2.2-DataFrames-API)   
[**2.3 Spark Data Types**](#2.3-Spark-Data-Types)  
[**2.3.1 Data Types**](#2.3.1-Data-Types)   
[**2.4 DataFrames**](#2.4-DataFrames)   
[**2.4.1 Schemas**](#2.4.1-Schemas)     
[**2.4.2 Columns and Expressions**](#2.4.2-Columns-and-Expressions)  
[**2.4.3 Rows**](#2.4.3-Rows)  
[**2.5 Spark Data Sources**](#2.5-Spark-Data-Sources)  
[**2.5.1 DataFrameReader**](#2.5.1-DataFrameReader)   
[**2.5.2 DataFrameWriter**](#2.5.2-DataFrameWriter)   
[**2.6 DataFrame Operations**](#2.6-DataFrame-Operations)   
[**2.6.1 Creating DataFrames**](#2.6.1-Creating-DataFrames)  
[**2.6.2 Projections**](#2.6.2-Projections)   
[**2.6.3 Renaming Columns**](#2.6.3-Renaming-Columns)   
[**2.6.4 Dropping Columns**](#2.6.4-Dropping-Columns)   
[**2.6.5 Adding Columns**](#2.6.5-Adding-Columns)   
[**2.6.6 Changing Column Types**](#2.6.6-Changing-Column-Types)   
[**2.6.7 Filtering Rows**](#2.6.7-Filtering-Rows)   
[**2.6.8 Limit Rows**](#2.6.8-Limit-Rows)   
[**2.6.9 Distinct Rows**](#2.6.9-Distinct-Rows)   
[**2.6.10 Sorting Rows**](#2.6.10-Sorting-Rows)  

#### 2.1 RDD
**RDD**: A Resilient Distributed Dataset (RDD) is Spark's lower-level API. RDDs are used only when the high-level APIs (Datasets, DataFrames, and SQL) cannot satisfy the business or logical requirements of a problem. The low-level API is used to manipulate RDDs and shared variables, i.e. broadcast variables and accumulators.  
The main use cases for the low-level API are:  
* When the high-level APIs don't meet the requirements for implementing the business logic.  
* Utilizing and manipulating custom shared variables.  
* Maintaining existing code written with RDDs.  
* Obtaining fine-grained control over how data is partitioned across the cluster, for performance and optimization.

**Brief on RDD**  
All the high-level data structures (DataFrames, Datasets) in Spark are converted into RDDs internally during job execution. An RDD is an immutable, partitioned collection of records. Each record is just an object, and we can perform any operation on these objects. 

**Properties of RDD**  
* A list of partitions.
* A function for computing each split.
* A list of dependencies on other RDDs.
* A Partitioner for key-value RDDs.
* A list of preferred locations to compute each split.
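
The first two properties (a list of partitions and a function for computing each split) can be pictured with a small plain-Python sketch. It does not use Spark: the helper names `split_into_partitions` and `compute_split` are hypothetical, and the contiguous slicing shown here is only one plausible way to divide the data, not necessarily how Spark slices a collection.

```python
def split_into_partitions(records, num_partitions):
    """Split records into num_partitions contiguous, nearly equal slices."""
    records = list(records)
    n = len(records)
    return [
        records[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


def compute_split(partition):
    """The per-split function: here it keeps only the even numbers."""
    return [x for x in partition if x % 2 == 0]


# Property 1: the data is held as a list of partitions
partitions = split_into_partitions(range(1, 11), 2)
print(partitions)

# Property 2: the same function is applied to each split independently
per_split = [compute_split(p) for p in partitions]

# Collecting the result concatenates the per-split outputs
result = [x for part in per_split for x in part]
print(result)
```

Because each split is computed independently, the splits could be processed on different machines, which is the point of partitioning.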

**Compare RDDs with DataFrame and Datasets**  
RDDs follow the same Spark paradigm for executing jobs as DataFrames and Datasets: they have both transformations and actions. However, RDDs have no built-in concept of rows with a schema; each record is treated as a raw Java/Scala/Python object. RDD APIs are available in Scala, Java, and Python. Manipulating RDDs through the Python API loses some performance compared to Scala and Java, because records have to be serialized between the JVM and the Python worker processes.

In [27]:
# Open pySpark console and run the following codes:
"""
Example: 1
dataset = range(1,1000)
spark
sc
dataset
dataset[5]
datasetRDD = sc.parallelize(dataset,2)  # create RDD
datasetRDD
datasetRDD.take(5)  # display first 5 elements from RDD
datasetRDD.count()  # count total elements in RDD
datasetRDD.first()  # display first element in RDD
datasetRDD.top(10)  # display the 10 largest elements in RDD
datasetRDD.filter(lambda x: x%2 == 0).collect() # Filter only even number

Example: 2
userComment = "I like to visit New York. The public transportation are very reliable.".split(" ")
allwords = sc.parallelize(userComment, 2)
allwords.setName("Comments")
allwords.name()
allwords.count()
allwords.distinct().count()
allwords.foreach(print)
def likewords(words):
    return words.startswith("like")
allwords.filter(lambda word: likewords(word)).collect()
"""

# Similarly, we can perform various transformations such as: distinct, filter, map, flatMap, sortBy
# and various actions such as: reduce, count, countApprox, countApproxDistinct, countByValue, first, max, min, take

"""
Saving output to file
allwords.saveAsTextFile("file:/tmp/sparkwordSplit")
## Check the file on /tmp/sparkwordSplit
"""


"""
Example: 3
# create an RDDs of tuples (name, age)
dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)])
# use map and reduceByKey transformations with their
# lambda expressions to aggregate and then compute average
# (each lambda passed to map receives the whole record as ONE argument, so index into it)
agesRDD = dataRDD.map(lambda x: (x[0], (x[1], 1))) \
           .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
           .map(lambda x: (x[0], x[1][0] / x[1][1]))
"""

"""
Example: 3 simplified with avg function

from pyspark.sql.functions import avg

# create a DataFrame using SparkSession
dataDF = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)], ["name", "age"])
# Group the same names together, aggregate their age, and compute an average
avgDF = dataDF.groupBy("name").agg(avg("age"))
# show the results of the final execution
avgDF.show()

"""

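
To see what the `map` / `reduceByKey` / `map` chain in Example 3 computes, here is a minimal plain-Python sketch of the same arithmetic. It does not use Spark: the helper `reduce_by_key` is hypothetical and only mimics the per-key merging that `reduceByKey` performs, without any partitioning or distribution.

```python
def reduce_by_key(pairs, func):
    """Hypothetical stand-in for RDD.reduceByKey: merge the values of each key with func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


data = [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)]

# Step 1 (map): turn (name, age) into (name, (age, 1)) so the sum and the count travel together
with_counts = [(name, (age, 1)) for name, age in data]

# Step 2 (reduceByKey): add up the ages and the counts for each name
totals = reduce_by_key(with_counts, lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Step 3 (map): divide the age sum by the count to get the average per name
averages = {name: total / count for name, (total, count) in totals.items()}

print(averages)
```

The bookkeeping with `(sum, count)` pairs is what the DataFrame version hides behind `avg("age")`.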

In [1]:
# Executing Python Word Count Script in Spark
!echo $SPARK_HOME   # print the Spark home directory
!ls -1 $SPARK_HOME/examples/src/main/python # list all files in the given location
## Step 1: Read through the wordcount.py script and try to understand the Python code
## Step 2: Copy wordcount.py to your script location, i.e. under Chapter_2/script (the command is in Step 4)
## Step 3: Create a text file in the same location on a topic of your choice.
## For example, pick current business, entertainment, political, scientific, or sports news and paste it into the file.
## This example uses the 2019 State of the Union transcript from the link below
## https://www.cnn.com/2019/02/05/politics/donald-trump-state-of-the-union-2019-transcript/index.html

# Step 4: Copy the wordcount script (cp keeps the original in place, unlike mv)
## !cp $SPARK_HOME/examples/src/main/python/wordcount.py destination_directory
## Step 5: Run the script using the command below
## spark-submit wordcount.py state_of_union_2019.txt
## Try to understand the output and look at the Spark UI at http://localhost:4040
## Assignment: Order the words by count in descending order, remove any stopwords, save the output to a file, and create a word cloud.

/usr/local/apache-spark/spark-3.0.0
1.txt
als.py
avro_inputformat.py
kmeans.py
logistic_regression.py
ml
mllib
pagerank.py
parquet_inputformat.py
pi.py
sort.py
sql
status_api_demo.py
streaming
transitive_closure.py
wordcount.py


Copy and paste the code below into a new Python file, then execute it with the command below:   
`spark-submit script_name.py`   
Assignment: Try reading the comment text from a file and writing the output to a file instead of the console.

In [64]:
"""
# Code Snippet 1: Creating RDD in PySpark
# import SparkContext and config
from pyspark import SparkContext
from pyspark import SparkConf

SPARK_MASTER='local'
SPARK_APP_NAME='Word StartingwithS'

conf = SparkConf().setMaster(SPARK_MASTER) \
        .setAppName(SPARK_APP_NAME)
    
sc = SparkContext(conf=conf)

userComment = "I like to visit New York. The public transportation are very reliable. There are many shopping complex and sports actitivies. Shopping is always fun in Manhattan"

listWords = userComment.split(" ")
allwords = sc.parallelize(listWords, 2)

def likewords(words):
    return words.lower().startswith("s")
    
filter_words = allwords.filter(lambda word: likewords(word)).collect()

for w in filter_words:
    print(w)

"""


**SparkContext and SparkSession**

Prior to Spark 2.0, `SparkContext` was the entry point of every Spark application, used to access all Spark features and the Spark configuration. Since Spark 2.0, `SparkSession` is the unified entry point of every Spark application. It gives access to Spark's functionality through fewer constructs: the various contexts, such as the Spark context, Hive context, and SQL context, are all encapsulated in a single Spark session.

Figure 2.1: SparkSession
![Figure 1](spark_session.png)

**Spark Session**  
A Spark session is created using the builder pattern. The builder returns the existing Spark session if there is one, or creates a new session if none exists.

Example below shows creating Spark session in Python.

In [1]:
# Creating Spark session in Python
from pyspark.sql import SparkSession
SparkSession.builder \
     .master("local") \
     .appName("AppsName") \
     .config("config.option", "value") \
     .getOrCreate()

**Note**: You may see a Py4JError like the one below.        
`Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM`  
**Fix**: Set the `SPARK_HOME` and `PYTHONPATH` environment variables in your `bash_profile`:     
`export SPARK_HOME="/usr/local/apache-spark/spark-3.0.0"`  
`export PYTHONPATH="$SPARK_HOME/python"`

#### 2.2 DataFrames API
A Spark DataFrame is a distributed, table-like collection of rows with named columns and a schema. Each column has a specific data type, and all columns have the same number of rows; missing values are represented as `null`. Like RDDs, DataFrames are immutable and keep the lineage of all transformations applied to them. Operations are evaluated lazily: transformations are only recorded, and an action triggers their execution and returns the results.
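
The interplay of immutability, lineage, and lazy evaluation can be illustrated with a toy model in plain Python. This is only a sketch of the idea, not Spark's implementation: the class `LazyPipeline` and its methods are hypothetical, and there is no optimizer or distribution involved.

```python
class LazyPipeline:
    """Toy model of lazy evaluation: transformations are recorded, actions run them."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = tuple(steps)  # the recorded lineage of transformations

    def map(self, func):
        # Transformation: return a NEW pipeline with one more recorded step
        return LazyPipeline(self._data, self._steps + (("map", func),))

    def filter(self, predicate):
        # Transformation: also only recorded, never executed here
        return LazyPipeline(self._data, self._steps + (("filter", predicate),))

    def collect(self):
        # Action: only now is the recorded lineage actually executed
        rows = iter(self._data)
        for kind, func in self._steps:
            rows = map(func, rows) if kind == "map" else filter(func, rows)
        return list(rows)


calls = []


def double(x):
    calls.append(x)  # record that work actually happened
    return x * 2


base = LazyPipeline([1, 2, 3, 4])
large = base.map(double).filter(lambda x: x > 4)  # nothing is computed yet
calls_before_action = len(calls)

result = large.collect()      # the action triggers the computation
unchanged = base.collect()    # the original pipeline is unaffected (immutability)

print(calls_before_action, result, unchanged)
```

No call to `double` happens until `collect()` runs, and `base` still yields the original values afterwards.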

Figure 2.1(a): Spark Catalyst Optimizer  
![Figure 2.1(a)](spark_catalyst_optimizer.png)
 

Figure 2.1(b): Structured API Logical Plan$^1$  
![Figure 2.1(b)](https://databricks.com/wp-content/uploads/2018/05/Catalyst-Optimizer-diagram.png)

#### 2.3 Spark Data Types
Spark has its own internal representation of data types. Data types are defined when creating the schema for a Spark application. The code snippets below show a data type definition in Scala, Java, and Python.  

**`Note: In this course we will concentrate on Python using PySpark APIs`**. 


**Scala types:**   
`import org.apache.spark.sql.types._`  
`val firstname = ByteType`  

**Java types:**  
`import org.apache.spark.sql.types.DataType;`  
`import org.apache.spark.sql.types.DataTypes;`  
`DataType firstname = DataTypes.ByteType;`  

**Python type:**  
`from pyspark.sql.types import *`  
`firstname = ByteType()`

#### 2.3.1 Data Types

Table 2.1: Python Basic Data Types in Spark$^2$    

| Types | Data Type | Value Assigned in Python | APIs to instantiate |
| --------- | --------- | ------------------------ | ------------------- |
| Numeric Types | ByteType | int. The number is converted to a 1-byte signed integer at runtime. Range: -128 to 127. | ByteType() |
| Numeric Types | ShortType | int. The number is converted to a 2-byte signed integer at runtime. Range: -32768 to 32767. | ShortType() |
| Numeric Types | IntegerType | int. The number is converted to a 4-byte signed integer at runtime. Range: -2147483648 to 2147483647. | IntegerType() |
| Numeric Types | LongType | int. The number is converted to an 8-byte signed integer at runtime. Range: -9223372036854775808 to 9223372036854775807. | LongType() |
| Numeric Types | FloatType | float. The number is converted to a 4-byte single-precision floating-point number at runtime. | FloatType() |
| Numeric Types | DoubleType | float | DoubleType() |
| Numeric Types | DecimalType | decimal.Decimal | DecimalType() |
| String Types | StringType | string | StringType() |
| Binary Type | BinaryType | bytearray | BinaryType() |
| Boolean Type | BooleanType | bool | BooleanType() |
| Datetime Type | TimestampType | datetime.datetime | TimestampType() | 
| Datetime Type | DateType | datetime.date | DateType() |
| Complex Type | ArrayType | list, tuple, or array | ArrayType(dataType, [containsNull]). The default value for containsNull is True.  |
| Complex Type | MapType | dict | MapType(keyType, valueType, [valueContainsNull]). The default value for valueContainsNull is True. |
| Complex Type | StructType | list or tuple | StructType(fields). `fields` is a list of StructFields. Fields with the same name are not allowed. |
| Complex Type | StructField | The Python value type for this field's data type, for example int for a StructField with data type IntegerType | StructField(name, dataType, [nullable]). The default value of nullable is True. |
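
The integer ranges in Table 2.1 follow directly from the byte widths: an n-byte signed integer covers -2^(8n-1) to 2^(8n-1) - 1. The short check below recomputes them in plain Python. The helper names `signed_range` and `fits` are hypothetical and exist only for this illustration; they are not part of PySpark.

```python
def signed_range(num_bytes):
    """Return (min, max) of a two's-complement signed integer with the given byte width."""
    bits = 8 * num_bytes
    return -(2 ** (bits - 1)), 2 ** (bits - 1) - 1


# Byte widths of Spark's integer types
sizes = {"ByteType": 1, "ShortType": 2, "IntegerType": 4, "LongType": 8}
ranges = {name: signed_range(width) for name, width in sizes.items()}

for name, (low, high) in ranges.items():
    print(f"{name}: {low} to {high}")


def fits(value, type_name):
    """Check whether a Python int lies inside the range of the named type."""
    low, high = ranges[type_name]
    return low <= value <= high


print(fits(127, "ByteType"), fits(128, "ByteType"))
```

A check like `fits` is a quick way to pick the smallest integer type that can hold a column's values.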

#### 2.4 DataFrames
A DataFrame is composed of:  
* Columns
* Rows

A DataFrame is similar to a spreadsheet, a database table, a pandas DataFrame, or any other object that holds column definitions and contains rows/records.

#### 2.4.1 Schemas
A schema defines the column names and their associated data types for a DataFrame. Schemas are especially important when reading data from structured data sources. The benefits of defining a schema in Spark are:  
* Spark no longer has to infer the data types, which would otherwise require reading the data to identify them.
* Errors are detected early when the data doesn't match the schema.

A schema can be defined in two ways:  
1. Automatically while reading the data, which is also known as schema-on-read.
2. Manually, by defining it explicitly ourselves before reading the data. 

Which one to use depends on the use case. For example, in ETL, defining the schema explicitly is best practice. In ad hoc reporting, schema-on-read is easier.


* A schema is a **`StructType`** made up of a number of fields.
* A **`StructField`** has a column name, a data type, a boolean flag (stating whether the column may contain missing or null values), and optional metadata for the column.
* A schema can also contain complex types, i.e. a **`StructType`** nested inside a **`StructType`**.
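
To make the "early error detection" benefit concrete, here is a plain-Python sketch of checking rows against a declared schema. It is an illustration under simplifying assumptions, not how Spark validates data: `Field` is a hypothetical, reduced stand-in for `StructField(name, dataType, nullable)`, and `validate_row` is a hypothetical helper.

```python
from typing import NamedTuple


class Field(NamedTuple):
    """Hypothetical, simplified stand-in for StructField(name, dataType, nullable)."""
    name: str
    py_type: type
    nullable: bool = True


def validate_row(row, schema):
    """Return a list of problems found when checking one row against the schema."""
    if len(row) != len(schema):
        return [f"expected {len(schema)} values, got {len(row)}"]
    problems = []
    for value, field in zip(row, schema):
        if value is None:
            if not field.nullable:
                problems.append(f"{field.name}: null not allowed")
        elif not isinstance(value, field.py_type):
            problems.append(
                f"{field.name}: expected {field.py_type.__name__}, got {type(value).__name__}"
            )
    return problems


schema = [
    Field("emp_no", int, nullable=False),
    Field("first_name", str, nullable=False),
    Field("hire_date", str),  # nullable by default
]

good_row = [1, "Jimmy", "1975-01-15"]
bad_row = ["one", None, None]

good_problems = validate_row(good_row, schema)
bad_problems = validate_row(bad_row, schema)
print(good_problems)
print(bad_problems)
```

With an explicit schema, mismatches like these surface when the data is loaded rather than much later in the pipeline.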

We'll create a simple schema in Python. 

In [2]:
# Python code to create schema and associate data with given schema

# Load types
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# Create Schema
empSchema = StructType([
    StructField("emp_no", IntegerType(), False),
    StructField("birth_date", StringType(), False),
    StructField("FIRST_NAME", StringType(), False),
    StructField("LAST_NAME", StringType(), False),
    StructField("GENDER", StringType(), False),
    StructField("hire_date", StringType(), False)
])

# Create Data
empData = [
    [1, "01/19/1950", 'Jimmy', 'Doe', 'M', "1975-01-15"],
    [2, "08/15/1952", 'Smith', 'Butler', 'F', "1972-01-25"],
    [3, "12/10/1953", 'David', 'Jackson', 'M', "1980-02-22"],
    [4, "05/25/1960", 'Jina', 'Unknown', 'F', "1990-10-15"],
    [5, "12/16/1945", 'Smith', 'Unknown', 'F', "1965-07-05"],
    [6, "03/12/1980", 'Jim', 'Unknown', 'NA', "2005-12-20"],
    [7, "08/23/1981", 'Jimmy', 'Unknown', 'F', "2015-11-23"],
    [8, "07/1975/24", 'Jimmy', 'Unknown', 'NA', "2019-07-11"],
    [9, "02/21/1990", 'Jimmy', 'Unknown', 'F', "2018-05-07"],
    [10, "01/18/1991", 'Jimmy', 'Unknown', 'NA', "2014-12-01"],  
]

# Create Sparksession
spark = SparkSession.builder.appName("Employee").getOrCreate()

# Create DataFrame
employee_df = spark.createDataFrame(empData,empSchema)
# print schema (printSchema() prints the schema itself and returns None, so it needs no print() wrapper)
employee_df.printSchema()
# show dataframe
employee_df.show()


root
 |-- emp_no: integer (nullable = false)
 |-- birth_date: string (nullable = false)
 |-- FIRST_NAME: string (nullable = false)
 |-- LAST_NAME: string (nullable = false)
 |-- GENDER: string (nullable = false)
 |-- hire_date: string (nullable = false)

+------+----------+----------+---------+------+----------+
|emp_no|birth_date|FIRST_NAME|LAST_NAME|GENDER| hire_date|
+------+----------+----------+---------+------+----------+
|     1|01/19/1950|     Jimmy|      Doe|     M|1975-01-15|
|     2|08/15/1952|     Smith|   Butler|     F|1972-01-25|
|     3|12/10/1953|     David|  Jackson|     M|1980-02-22|
|     4|05/25/1960|      Jina|  Unknown|     F|1990-10-15|
|     5|12/16/1945|     Smith|  Unknown|     F|1965-07-05|
|     6|03/12/1980|       Jim|  Unknown|    NA|2005-12-20|
|     7|08/23/1981|     Jimmy|  Unknown|     F|2015-11-23|
|     8|07/1975/24|     Jimmy|  Unknown|    NA|2019-07-11|
|     9|02/21/1990|     Jimmy|  Unknown|     F|2018-05-07|
|    10|01/18/1991|     Jimmy|  Unknown|    NA|2014-12-01|
+------+----------+----------+---------+------+----------+

In [3]:
employee_df.show(5)

+------+----------+----------+---------+------+----------+
|emp_no|birth_date|FIRST_NAME|LAST_NAME|GENDER| hire_date|
+------+----------+----------+---------+------+----------+
|     1|01/19/1950|     Jimmy|      Doe|     M|1975-01-15|
|     2|08/15/1952|     Smith|   Butler|     F|1972-01-25|
|     3|12/10/1953|     David|  Jackson|     M|1980-02-22|
|     4|05/25/1960|      Jina|  Unknown|     F|1990-10-15|
|     5|12/16/1945|     Smith|  Unknown|     F|1965-07-05|
+------+----------+----------+---------+------+----------+
only showing top 5 rows



#### 2.4.2 Columns and Expressions

**Columns**: A column has a data type and holds values. Columns in a Spark DataFrame are similar to columns in a database table, pandas DataFrame, or spreadsheet. Columns can be referenced and constructed in multiple ways. The simplest is to use one of the functions below:     
`col()`  
`column()`  
Both functions take a column name as their argument. In Python, we can use either function, as shown below, to refer to a column.

In [4]:
from pyspark.sql.functions import col, column
# We'll use the existing employee dataframe 
col("emp_no")
column("FIRST_NAME")

Column<b'FIRST_NAME'>

In [5]:
employee_df.select("emp_no").show()

+------+
|emp_no|
+------+
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
+------+



In [6]:
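# Python evaluates "emp_no" * 10 first, repeating the string 10 times,
# so Spark is asked for a column that does not exist (see the error below)
employee_df.select("emp_no" * 10).show()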

AnalysisException: cannot resolve '`emp_noemp_noemp_noemp_noemp_noemp_noemp_noemp_noemp_noemp_no`' given input columns: [FIRST_NAME, GENDER, LAST_NAME, birth_date, emp_no, hire_date];;
'Project ['emp_noemp_noemp_noemp_noemp_noemp_noemp_noemp_noemp_noemp_no]
+- LogicalRDD [emp_no#0, birth_date#1, FIRST_NAME#2, LAST_NAME#3, GENDER#4, hire_date#5], false


In [7]:
employee_df.select(col("emp_no") * 10).show()

+-------------+
|(emp_no * 10)|
+-------------+
|           10|
|           20|
|           30|
|           40|
|           50|
|           60|
|           70|
|           80|
|           90|
|          100|
+-------------+



**Expressions**: An expression is an operation or transformation performed on one or more columns. With expressions, column values can be manipulated and modified. `expr` is a function in the *pyspark.sql.functions* (Python) and *org.apache.spark.sql.functions* (Scala/Java) packages. It parses a SQL expression string into a column expression that is resolved against the DataFrame's columns, i.e.  
`expr("ColumnName")` is equivalent to `col("ColumnName")`  
For example, `expr("columnName + 10")`

**Representing Columns as Expressions**   
When we want to transform a column, we reference it with `col()` and apply the transformation to that reference, as shown below:  
`col("price") - 5`   

The same can be done with the `expr()` function, which parses both the transformation and the column reference from a string. The comparison is shown below:

`expr("price - 5")` or `expr("price") - 5` is equivalent to `col("price") - 5`   

So columns are a subset of what expressions can do. In Python, we use expressions as shown below.

In [14]:
from pyspark.sql.functions import expr
employee_df.select(expr("emp_no"), col("emp_no"),expr("emp_no * 10")).show()

+------+------+-------------+
|emp_no|emp_no|(emp_no * 10)|
+------+------+-------------+
|     1|     1|           10|
|     2|     2|           20|
|     3|     3|           30|
|     4|     4|           40|
|     5|     5|           50|
|     6|     6|           60|
|     7|     7|           70|
|     8|     8|           80|
|     9|     9|           90|
|    10|    10|          100|
+------+------+-------------+



In [16]:
expr("(((emp_no * 100) - 30) + 200) < 50")

Column<b'((((emp_no * 100) - 30) + 200) < 50)'>

#### 2.4.3 Rows
**Rows**: A row is a single record containing one or more column values. Spark represents each record as a *Row* object, which is stored internally as an array of bytes. *Row* objects can also be created manually. The example below creates a row in Python using the Row object.

In [17]:
from pyspark.sql import Row
record1 = Row(100, "Analytics Tensor", 1)  # Create row

print(record1[0])  # Accessing field 1
print(record1[1])  # Accessing field 2

100
Analytics Tensor


#### 2.5 Spark Data Sources
Spark supports a variety of data sources for both reading and writing. As of Spark 2.4.4, the core data sources are:  
* CSV
* JSON
* Parquet
* ORC
* JDBC/ODBC connections
* Plain-text files

Other supported data sources contributed by the community include Cassandra, HBase, MongoDB, AWS Redshift, and XML.

The high-level data source APIs for reading and writing are DataFrameReader and DataFrameWriter respectively.

#### 2.5.1 DataFrameReader
**DataFrameReader**: DataFrameReader is the core construct for reading data into a DataFrame. A `DataFrameReader` is accessed through the **`read`** attribute of `SparkSession`, i.e. `spark.read`. For streaming data sources, the corresponding attribute is `spark.readStream`.    

The basic structure for reading data is shown below:   
`DataFrameReader.format(....).option("key", "value").schema(....).load()`


[To learn more about DataFrameReader](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader)$^3$


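After `read`, we chain further methods such as `format`, `schema`, and `option`; the read mode is itself set through `option("mode", ...)`. `format` and `schema` can fall back to defaults (the default format, and schema inference where the source supports it), whereas the location of the data must be supplied, either through `option("path", ...)` or as an argument to `load()`. Each data source has its own set of options for reading data. 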

Code snippet below shows reading different files into DataFrame.

In [62]:
# Note: `filelocation` and `defineSchema` are placeholders; define them before running this cell.

# Read csv file with an explicit schema
# (inferSchema is omitted because an explicit schema takes precedence over inference)
df1 = spark.read.format("csv") \
    .option("mode", "FAILFAST") \
    .option("path", filelocation) \
    .schema(defineSchema) \
    .load()

# Read parquet file
df2 = spark.read.format("parquet") \
    .option("path", filelocation) \
    .load()

# Read file using default format
df3 = spark.read.option("path", filelocation).load()

# Read JSON file (a separate variable, so df3 above is not overwritten)
df4 = spark.read.format("json") \
    .option("path", "/tmp/datasets/employees/json/*") \
    .load()

NameError: name 'filelocation' is not defined

Table: 2.4.1 DataFrameReader methods, arguments and options

| Method | Arguments | Description |
|:---------------:|:---------------:|:---------------:|
| format() | parquet, csv, text, json, jdbc, orc, avro | The data source format. The default is parquet, or whatever is specified in spark.sql.sources.default. |
| option() | 1. ( mode, {permissive, dropmalformed, failfast} ) 2. ( inferSchema, {true, false} ) 3. ( path,  path_to_data_source) | A series of key/value pairs that configure the read. |
| schema() | DDL string or StructType | Supplies the schema explicitly. Alternatively, ask Spark to infer the schema through option(). | 
| load() | "path to a file" | Specifies the path to the data source. |

Table 2.4.1(a) Spark read modes

| Read Mode | Description |
| --------- | ----------- |
| permissive | When a corrupted record is found, sets its fields to **null** and stores the raw record in a string field named `_corrupt_record` (the name is configurable through `columnNameOfCorruptRecord`). This is the default mode. | 
| dropmalformed | Drops every record that is corrupted. | 
| failfast | Throws an exception as soon as a corrupted record is found. | 
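
The difference between the three read modes is easiest to see on a tiny input with one bad line. The sketch below imitates the modes in plain Python for a two-column `id,name` format. It is an illustration only: `parse_records` is a hypothetical helper, its definition of "corrupt" is an assumption, and Spark's actual parsers are considerably more involved.

```python
def parse_records(lines, mode="permissive"):
    """Parse 'id,name' lines. A line is corrupt if it lacks 2 fields or id is not an integer."""
    mode = mode.lower()
    if mode not in {"permissive", "dropmalformed", "failfast"}:
        raise ValueError(f"unknown mode: {mode}")
    rows = []
    for line in lines:
        parts = line.split(",")
        try:
            if len(parts) != 2:
                raise ValueError("wrong number of fields")
            row = {"id": int(parts[0]), "name": parts[1], "_corrupt_record": None}
        except ValueError:
            if mode == "failfast":
                # stop at the first corrupted record
                raise ValueError(f"malformed record: {line!r}") from None
            if mode == "dropmalformed":
                continue  # silently skip the corrupted record
            # permissive: keep the row, null out the fields, remember the raw text
            row = {"id": None, "name": None, "_corrupt_record": line}
        rows.append(row)
    return rows


lines = ["1,Jimmy", "oops", "3,David"]

permissive_rows = parse_records(lines, mode="permissive")
dropped_rows = parse_records(lines, mode="dropmalformed")

try:
    parse_records(lines, mode="failfast")
    failfast_raised = False
except ValueError:
    failfast_raised = True

print(len(permissive_rows), len(dropped_rows), failfast_raised)
```

Permissive keeps all three rows, dropmalformed keeps two, and failfast aborts the read.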

#### 2.5.2 DataFrameWriter
**DataFrameWriter**: DataFrameWriter is the core construct for writing data from a DataFrame into external data sources. Unlike `DataFrameReader`, a `DataFrameWriter` is not accessed through `SparkSession` but through the DataFrame being saved, via its **`write`** attribute, i.e. `df.write`.

The basic structure for writing data is shown below:   
`DataFrameWriter.format(....).option(....).partitionBy(....).bucketBy(....).sortBy(....).save()`   


 [To learn more about DataFrameWriter](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter)$^4$ 
 
 
After `write`, we chain further methods such as `format`, `option`, and `save`. `format` is optional; Spark uses parquet as the default file format. `option` passes key/value settings that control how the data is saved; the output path must be supplied, either through `option("path", ...)` or as an argument to `save()`.


Code snippet below shows writing DataFrame into files.

In [63]:
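# Write dataframe into csv file format
# Note: `df` is a placeholder for an existing DataFrame and "path_to_file" for an output path.
# The save mode is set with the mode() method of DataFrameWriter.
df.write.format("csv") \
    .mode("overwrite") \
    .option("path", "path_to_file") \
    .save()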

NameError: name 'df' is not defined

Table 2.4.2(a) Spark save modes

| Save Mode | Description |
| --------- | ----------- |
| append | Appends the output to the existing data if the output location already exists. | 
| overwrite | Overwrites the existing data if the output location already exists. In effect, it is a drop and re-create. | 
| errorIfExists | Throws an error and fails if data already exists at the output location. This is the default mode. |
| ignore | Does nothing, leaving the existing data untouched, if data already exists at the output location. | 
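
The four save modes can be imitated with a small plain-Python sketch that writes a single text file. This is a simplified model under stated assumptions: `save_text` and `read_text` are hypothetical helpers, and Spark itself writes a directory of part files rather than one file.

```python
import os
import tempfile


def save_text(path, text, mode="errorifexists"):
    """Toy single-file model of the four save modes."""
    mode = mode.lower()
    exists = os.path.exists(path)
    if mode == "errorifexists":
        if exists:
            raise FileExistsError(f"{path} already exists")
        file_mode = "w"
    elif mode == "ignore":
        if exists:
            return  # leave the existing data untouched
        file_mode = "w"
    elif mode == "overwrite":
        file_mode = "w"  # replace whatever is there
    elif mode == "append":
        file_mode = "a"  # add to whatever is there
    else:
        raise ValueError(f"unknown save mode: {mode}")
    with open(path, file_mode) as handle:
        handle.write(text)


def read_text(path):
    with open(path) as handle:
        return handle.read()


with tempfile.TemporaryDirectory() as tmp_dir:
    target = os.path.join(tmp_dir, "out.txt")
    save_text(target, "a\n")                      # default mode; target does not exist yet
    save_text(target, "b\n", mode="append")
    after_append = read_text(target)
    save_text(target, "c\n", mode="ignore")       # target exists, so nothing happens
    after_ignore = read_text(target)
    save_text(target, "d\n", mode="overwrite")
    after_overwrite = read_text(target)
    try:
        save_text(target, "e\n")                  # default mode on an existing target
        default_mode_failed = False
    except FileExistsError:
        default_mode_failed = True

print(repr(after_append), repr(after_ignore), repr(after_overwrite), default_mode_failed)
```

The default mode is the safest one: it refuses to touch an output location that already holds data.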

#### 2.6 DataFrame Operations
DataFrame operations, or transformations, manipulate DataFrames to implement business and logical requirements. Common operations include adding new columns based on existing values, dropping columns when creating the final output or report, aggregating, sorting, merging, and converting data types. The specific requirements differ from case to case, but the basic idea is always to apply transformations that put the data into a usable form.

#### 2.6.1 Creating DataFrames
Although we have seen the syntax for reading data from multiple data sources, we haven't loaded any data yet. In this tutorial, we'll use the MySQL `employees` database and some JSON and CSV files from both local and cloud locations.

**Prerequisites**   
1. [Spark running on local mode](https://spark.apache.org/downloads.html)$^5$
2. [Jupyter Notebook](https://www.anaconda.com/distribution/#download-section)$^6$
3. [Local instance of MySQL with the Employee database](https://dev.mysql.com/downloads/mysql/)$^7$
4. [AWS account with access to S3 bucket](https://portal.aws.amazon.com/billing/signup#/start)$^8$
5. [Databricks account for community version](https://community.cloud.databricks.com/login.html)$^9$

**Important Links**:   
* [PySpark SQL Module](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#)$^{10}$
* [Spark Transformation and Actions](https://training.databricks.com/visualapi.pdf)$^{11}$

###### 1.) Loading comma separated value (csv) file from Local Drive

In [8]:
# Creating Spark session in Python
from pyspark.sql import SparkSession
spark = SparkSession.builder \
     .master("local") \
     .appName("Online Retail Apps") \
     .getOrCreate()

file_location = "data/Online_Retail.csv"

online_retail = spark.read.format("csv") \
    .option("mode", "FAILFAST") \
    .option("inferSchema", "true") \
    .option("path", file_location) \
    .option("header", "true") \
    .load() 

In [9]:
online_retail.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



###### 2.) Loading employee table from MySQL database

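**Note**: If there is a JDBC driver error, download the MySQL JDBC driver and store the jar under the spark/lib location (in standard Spark 2.x and 3.x distributions, the jar directory is `$SPARK_HOME/jars`).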

In [22]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("Employee Apps") \
    .getOrCreate()


employeesDF = spark.read \
  .format("jdbc") \
  .option("url", "jdbc:mysql://localhost:3306/employees") \
  .option("driver", "com.mysql.jdbc.Driver") \
  .option("dbtable", "employees") \
  .option("user", "root") \
  .option("password", "Mysql123#") \
  .option("serverTimezone", "EST") \
  .load()

In [21]:
employeesDF.printSchema()

root
 |-- emp_no: integer (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- hire_date: date (nullable = true)



Display the first 10 records from the employee DataFrame.

In [23]:
employeesDF.limit(10).show()

# Equivalent SQL query
# SELECT * from employees LIMIT 10

+------+----------+----------+---------+------+----------+
|emp_no|birth_date|first_name|last_name|gender| hire_date|
+------+----------+----------+---------+------+----------+
| 10001|1953-09-02|    Georgi|  Facello|     M|1986-06-26|
| 10002|1964-06-02|   Bezalel|   Simmel|     F|1985-11-21|
| 10003|1959-12-03|     Parto|  Bamford|     M|1986-08-28|
| 10004|1954-05-01| Chirstian|  Koblick|     M|1986-12-01|
| 10005|1955-01-21|   Kyoichi| Maliniak|     M|1989-09-12|
| 10006|1953-04-20|    Anneke|  Preusig|     F|1989-06-02|
| 10007|1957-05-23|   Tzvetan|Zielinski|     F|1989-02-10|
| 10008|1958-02-19|    Saniya| Kalloufi|     M|1994-09-15|
| 10009|1952-04-19|    Sumant|     Peac|     F|1985-02-18|
| 10010|1963-06-01| Duangkaew| Piveteau|     F|1989-08-24|
+------+----------+----------+---------+------+----------+



###### 3.) Loading Parquet file from S3

In [13]:
# @todo load data from S3

###### 4.) Loading ORC file from Hadoop

In [14]:
# @todo load data from HDFS

###### 5.) Loading JSON file from Databricks File System

In [None]:
# @todo load data from DBFS

###### 6.) Loading data built from scratch

In [None]:
# Script is already there, copy and paste

#### 2.6.2 Projections
Projection is the ability to select all or individual columns. Spark provides the following two methods for selecting columns:   
* `select()`   
* `selectExpr()`   

`select()` takes column names as strings (or `Column` objects), while `selectExpr()` takes SQL expression strings, which makes it the more flexible of the two.

In [12]:
# Select single column
online_retail.select("Description").show(20,False)

# Equivalent SQL query
# SELECT description from online_retail LIMIT 20

+-----------------------------------+
|Description                        |
+-----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER |
|WHITE METAL LANTERN                |
|CREAM CUPID HEARTS COAT HANGER     |
|KNITTED UNION FLAG HOT WATER BOTTLE|
|RED WOOLLY HOTTIE WHITE HEART.     |
|SET 7 BABUSHKA NESTING BOXES       |
|GLASS STAR FROSTED T-LIGHT HOLDER  |
|HAND WARMER UNION JACK             |
|HAND WARMER RED POLKA DOT          |
|ASSORTED COLOUR BIRD ORNAMENT      |
|POPPY'S PLAYHOUSE BEDROOM          |
|POPPY'S PLAYHOUSE KITCHEN          |
|FELTCRAFT PRINCESS CHARLOTTE DOLL  |
|IVORY KNITTED MUG COSY             |
|BOX OF 6 ASSORTED COLOUR TEASPOONS |
|BOX OF VINTAGE JIGSAW BLOCKS       |
|BOX OF VINTAGE ALPHABET BLOCKS     |
|HOME BUILDING BLOCK WORD           |
|LOVE BUILDING BLOCK WORD           |
|RECIPE BOX WITH METAL HEART        |
+-----------------------------------+
only showing top 20 rows



In [13]:
# Select multiple columns in Python
online_retail.select("InvoiceNo", "StockCode", "Quantity").show(10)

# Equivalent SQL query
# SELECT InvoiceNo, StockCode, Quantity from online_retail LIMIT 10

+---------+---------+--------+
|InvoiceNo|StockCode|Quantity|
+---------+---------+--------+
|   536365|   85123A|       6|
|   536365|    71053|       6|
|   536365|   84406B|       8|
|   536365|   84029G|       6|
|   536365|   84029E|       6|
|   536365|    22752|       2|
|   536365|    21730|       6|
|   536366|    22633|       6|
|   536366|    22632|       6|
|   536367|    84879|      32|
+---------+---------+--------+
only showing top 10 rows



In [14]:
online_retail.select("*").show(10,False)

+---------+---------+-----------------------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/10 8:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/10 8:26|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/10 8:26|2.75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/10 8:26|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |12/1/10 8:26|3.39     |17850     |United Kingdom|
|536365   |22752    |SET 7 BABUSHKA NESTING BOXES       |2       |12/1/1

In [15]:
# Select all columns
online_retail.selectExpr("*").show(10)

# Equivalent SQL query
# SELECT * from online_retail LIMIT 10

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United

Columns can also be selected with the `expr()`, `col()`, and `column()` functions, each of which returns a `Column` object.

In [17]:
# Select multiple columns in Python
from pyspark.sql.functions import expr, col, column

online_retail.select( 
    expr("InvoiceNo"), 
    col("CustomerID"),
    column("Country"))\
    .show(10)

# Equivalent SQL query
# SELECT InvoiceNo, CustomerID, Country from online_retail LIMIT 10

+---------+----------+--------------+
|InvoiceNo|CustomerID|       Country|
+---------+----------+--------------+
|   536365|     17850|United Kingdom|
|   536365|     17850|United Kingdom|
|   536365|     17850|United Kingdom|
|   536365|     17850|United Kingdom|
|   536365|     17850|United Kingdom|
|   536365|     17850|United Kingdom|
|   536365|     17850|United Kingdom|
|   536366|     17850|United Kingdom|
|   536366|     17850|United Kingdom|
|   536367|     13047|United Kingdom|
+---------+----------+--------------+
only showing top 10 rows



In [18]:
online_retail.select(col("InvoiceNo"), "InvoiceNo").show(10)

+---------+---------+
|InvoiceNo|InvoiceNo|
+---------+---------+
|   536365|   536365|
|   536365|   536365|
|   536365|   536365|
|   536365|   536365|
|   536365|   536365|
|   536365|   536365|
|   536365|   536365|
|   536366|   536366|
|   536366|   536366|
|   536367|   536367|
+---------+---------+
only showing top 10 rows



In [19]:
online_retail.selectExpr("CustomerID as ID").show(10)

+-----+
|   ID|
+-----+
|17850|
|17850|
|17850|
|17850|
|17850|
|17850|
|17850|
|17850|
|17850|
|13047|
+-----+
only showing top 10 rows



A column can be aliased in several ways.

In [21]:
online_retail.select(expr("CustomerID as ID")).show(10)

+-----+
|   ID|
+-----+
|17850|
|17850|
|17850|
|17850|
|17850|
|17850|
|17850|
|17850|
|17850|
|13047|
+-----+
only showing top 10 rows



In [22]:
online_retail.select(expr("CustomerID as ID").alias("cust_id")).show(10)

+-------+
|cust_id|
+-------+
|  17850|
|  17850|
|  17850|
|  17850|
|  17850|
|  17850|
|  17850|
|  17850|
|  17850|
|  13047|
+-------+
only showing top 10 rows



In [23]:
online_retail.selectExpr("CustomerID as ID", "UnitPrice").show(10) 

+-----+---------+
|   ID|UnitPrice|
+-----+---------+
|17850|     2.55|
|17850|     3.39|
|17850|     2.75|
|17850|     3.39|
|17850|     3.39|
|17850|     7.65|
|17850|     4.25|
|17850|     1.85|
|17850|     1.85|
|13047|     1.69|
+-----+---------+
only showing top 10 rows



`selectExpr()` can build complex expressions, both for selecting columns and for creating new ones. In the example below, we select all columns and add a new boolean column based on a validation rule.

In [27]:
online_retail.selectExpr("*", # Choose all columns
                     "(InvoiceNo = CustomerID) as SameID")\
                     .show(10)

+---------+---------+--------------------+--------+------------+---------+----------+--------------+------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|SameID|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom| false|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom| false|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom| false|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom| false|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom| false|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom| false|
|   536365|    21730|GLASS S

`selectExpr()` is also useful for aggregating data, since aggregate functions can be written directly inside the expression string.

In [28]:
# Aggregate functions can be used directly inside the SQL expression string
online_retail.selectExpr("count(distinct(StockCode)) as `Distinct Stock`").show(10)

# Equivalent SQL query
# SELECT count(distinct(StockCode)) as `Distinct Stock` from online_retail

+--------------+
|Distinct Stock|
+--------------+
|          1049|
+--------------+



A literal is a constant value supplied explicitly rather than read from a column. To use a constant in an expression, for example as a validation flag, wrap it with the `lit()` function, which turns it into a `Column`.

In [29]:
from pyspark.sql.functions import lit
online_retail.select(expr("*"), lit("Valid").alias("valid"), lit(10).alias("Ten")).show(10)
# Equivalent SQL query
# SELECT *, 'Valid' as valid, 10 as Ten from online_retail limit 10

+---------+---------+--------------------+--------+------------+---------+----------+--------------+-----+---+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|valid|Ten|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+-----+---+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|Valid| 10|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|Valid| 10|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|Valid| 10|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|Valid| 10|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|Valid| 10|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|Valid| 10|
|

#### 2.6.3 Renaming Columns
The `withColumnRenamed()` method renames a column. It takes two arguments: the first is the original name of the column to be renamed, and the second is the new column name.

In [30]:
online_retail.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [31]:
online_retail.columns

['InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerID',
 'Country']

In [32]:
online_retail.withColumnRenamed("InvoiceNo", "Invoice_No").columns

['Invoice_No',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerID',
 'Country']

In [34]:
final_output = online_retail.withColumnRenamed("InvoiceNo", "Invoice_No") \
                    .withColumnRenamed("StockCode", "Stock_Code") \
                    .withColumnRenamed("CustomerID", "employee_id")
final_output.printSchema()
final_output.write.format("csv").option("path", "/tmp/online_retail_data12")\
        .option("header", "true")\
        .save()

root
 |-- Invoice_No: string (nullable = true)
 |-- Stock_Code: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- employee_id: integer (nullable = true)
 |-- Country: string (nullable = true)



#### 2.6.4 Dropping Columns
A column can be removed using the `drop()` method. To drop multiple columns, pass each column name as a separate argument. Names that do not exist in the DataFrame are silently ignored.

In [35]:
online_retail.drop("Country").columns # drop single column

['InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerID']

In [36]:
final_output.printSchema()

root
 |-- Invoice_No: string (nullable = true)
 |-- Stock_Code: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- employee_id: integer (nullable = true)
 |-- Country: string (nullable = true)



In [38]:
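final_output = final_output.drop("Country")
final_output.printSchema()
# None of these columns exist in final_output, so drop() is a no-op here;
# the result is also not assigned, so final_output itself is unchanged
final_output.drop("emp_id", "CustomerID", "abc").columns
final_output.printSchema()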

root
 |-- Invoice_No: string (nullable = true)
 |-- Stock_Code: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- employee_id: integer (nullable = true)

root
 |-- Invoice_No: string (nullable = true)
 |-- Stock_Code: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- employee_id: integer (nullable = true)



In [39]:
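# A single comma-separated string is treated as ONE column name, not as a list of columns
drop_col = "Quantity, InvoiceDate, UnitPrice ,employee_id"
# Pass each column name as a separate argument to drop several columns
final_output = final_output.drop("Quantity", "InvoiceDate", "UnitPrice", "employee_id")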

In [40]:
# No column is literally named "Quantity, InvoiceDate, UnitPrice ,employee_id", so this is a no-op
final_output.drop(drop_col)

DataFrame[Invoice_No: string, Stock_Code: string, Description: string]

In [41]:
final_output.printSchema()

root
 |-- Invoice_No: string (nullable = true)
 |-- Stock_Code: string (nullable = true)
 |-- Description: string (nullable = true)



In [42]:
online_retail.drop("Country", "CustomerID").columns # drop multiple columns

['InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice']

In [43]:
online_retail.show(5)

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
only showing top 5 rows



#### 2.6.5 Adding Columns
The `withColumn()` method adds a new column to a DataFrame. It takes two arguments: the name of the column to add, and a `Column` expression that computes the value for each row. If a column with that name already exists, it is replaced.

In [44]:
from pyspark.sql.functions import lit
online_retail.withColumn("SeqNumber", lit(1)).show(10)

# Equivalent SQL query
# SELECT *, 1 as SeqNumber from online_retail limit 10

+---------+---------+--------------------+--------+------------+---------+----------+--------------+---------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|SeqNumber|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+---------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|        1|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|        1|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|        1|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|        1|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|        1|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|        1|
|

In [46]:
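from pyspark.sql.functions import substring
# Take the first 6 characters of StockCode and cast them to integer;
# codes that contain letters (e.g. 85123A) cannot be cast and show up as null below
final_output = online_retail.withColumn("yearofPurchase", substring("StockCode",0,6).cast("integer"))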
final_output.show(10)
final_output.printSchema()

+---------+---------+--------------------+--------+------------+---------+----------+--------------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|yearofPurchase|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|          null|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|         71053|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|          null|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|          null|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|          null|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.6

In [68]:
online_retail.withColumn("isSameID", expr("InvoiceNo == StockCode")).show(10)

+---------+---------+--------------------+--------+------------+---------+----------+--------------+--------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|isSameID|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+--------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|   false|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|   false|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|   false|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|   false|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|   false|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|   false|
|   536365

In [47]:
online_retail.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



#### 2.6.6 Changing Column Types
We can convert a column from one type to another using the `cast()` method. This is also known as type casting. The method takes the target type, given either as a string with the data type name (for example `"string"` or `"integer"`) or as a `DataType` object.

In [56]:
from pyspark.sql.functions import col
online_retail.withColumn("InoviceNo", col("InvoiceNo").cast("string")).show(2)

+---------+---------+--------------------+--------+------------+---------+----------+--------------+---------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|InoviceNo|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+---------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|   536365|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|   536365|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+---------+
only showing top 2 rows



#### 2.6.7 Filtering Rows
Filtering selects the rows (records) that satisfy a condition defined in an expression. The expression must evaluate to a boolean value (true or false), and only rows for which it is true are kept. In a DataFrame, the condition can be written in either of two ways:   
* As a SQL expression string
* As a column expression built from `Column` objects

The `where()` and `filter()` methods both perform the filter operation; `where()` is an alias for `filter()`, so they accept the same arguments. Since `where()` mirrors SQL syntax, we'll mostly use it here.

In [48]:
online_retail.where("InvoiceNo == 536365").show(10)

# Equivalent SQL query
# SELECT * from online_retail where invoiceno = "536365" limit 10

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United

In [49]:
online_retail.filter(col("InvoiceNo") == 536365).show(10)

# Equivalent SQL query
# SELECT * from online_retail where invoiceno = "536365" limit 10

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United

Multiple filters can be applied by chaining them one after another. Chained filters are combined with a logical AND, and Spark decides internally the order in which to evaluate them as part of its optimization. We'll discuss this further in the next chapter.

**Multiple AND Operation**

In [50]:
online_retail.where("UnitPrice > 3").where("InvoiceNo == 536365").show(10)
                    
# Equivalent SQL query
# SELECT * from online_retail where unitprice > 3 and invoiceno = "536365" limit 10

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United Kingdom|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+



#### 2.6.8 Limit Rows
The `limit()` method restricts a DataFrame to a given number of records, which is handy for quick testing and validation. Its parameter is the maximum number of rows to return. It is similar to `LIMIT` in MySQL or `TOP` in MSSQL. `limit()` has no default value, so the number of rows must always be supplied; it is `show()` that prints 20 rows by default when called without arguments.

In [53]:
online_retail.limit(8).show()

# Equivalent SQL query
# SELECT * from online_retail limit 8

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United

In [54]:
online_retail_quick_validation = online_retail.limit(20)
# show(50) prints at most the 20 rows kept by limit(20)
online_retail_quick_validation.show(50)

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United

#### 2.6.9 Distinct Rows
The `distinct()` method removes duplicate records from a DataFrame and returns only the unique rows.

In [58]:
online_retail.select("InvoiceNo").distinct().show(10)

+---------+
|InvoiceNo|
+---------+
|   536365|
|   536366|
|   536367|
|   536368|
|   536369|
|   536370|
|   536371|
|   536372|
|   536373|
|   536374|
+---------+
only showing top 10 rows



In [57]:
online_retail.select("InvoiceNo").distinct().count()

# Equivalent SQL query
# SELECT count(distinct(invoiceno)) from online_retail

97

In [59]:
online_retail.count()
# Equivalent SQL query
# SELECT count(*) from online_retail

1999

In [60]:
online_retail.select("InvoiceNo", "CustomerID").distinct().count()

# Equivalent SQL query
# SELECT count(*) from (SELECT DISTINCT invoiceno, customerid from online_retail)

97

#### 2.6.10 Sorting Rows
Sorting arranges records in ascending or descending order. A DataFrame can be sorted with either of the following methods:  
* `sort()`
* `orderBy()`

Both methods accept column expressions as well as column names given as strings. The default sort order is ascending. To specify the order explicitly for a column, use the `asc()` and `desc()` methods. In addition, `asc_nulls_first()`, `desc_nulls_first()`, `asc_nulls_last()`, and `desc_nulls_last()` control where null values are placed in the sorted DataFrame.

In [61]:
online_retail.sort("UnitPrice").show(10)

+---------+---------+--------------------+--------+-------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|  InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------+---------+----------+--------------+
|   536414|    22139|                null|      56|12/1/10 11:52|      0.0|      null|United Kingdom|
|   536550|    85044|                null|       1|12/1/10 14:34|      0.0|      null|United Kingdom|
|   536546|    22145|                null|       1|12/1/10 14:33|      0.0|      null|United Kingdom|
|   536547|    37509|                null|       1|12/1/10 14:33|      0.0|      null|United Kingdom|
|   536545|    21134|                null|       1|12/1/10 14:32|      0.0|      null|United Kingdom|
|   536549|   85226A|                null|       1|12/1/10 14:34|      0.0|      null|United Kingdom|
|   536390|    20668|DISCO BALL CHRIST...|     288|12/1/10 10:19|      0.1|     17

In [62]:
online_retail.orderBy("InvoiceNo", "UnitPrice").show(10)

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United

In [63]:
online_retail.orderBy(col("InvoiceNo"), "UnitPrice").show(10)

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United

In [43]:
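from pyspark.sql.functions import asc, desc, expr
# Caution: expr("InvoiceNo desc") is parsed as the column InvoiceNo aliased as `desc`, so the rows
# below are still in ascending order; use desc("InvoiceNo") or col("InvoiceNo").desc() to sort descending
online_retail.orderBy(expr("InvoiceNo desc")).show(10)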

+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/10 8:26|     4.25|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/10 8:26|     3.39|     17850|United

In [64]:
online_retail.orderBy(col("InvoiceNo").desc(), col("UnitPrice").asc()).show(10)

# Equivalent SQL query
# SELECT * from online_retail ORDER BY invoiceno desc, unitprice asc limit 10

+---------+---------+--------------------+--------+-------------+---------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|  InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+--------------------+--------+-------------+---------+----------+-------+
|  C536548|    22245|HOOK, 1 HANGER ,M...|      -2|12/1/10 14:33|     0.85|     12472|Germany|
|  C536548|    22892|SET OF SALT AND P...|      -7|12/1/10 14:33|     1.25|     12472|Germany|
|  C536548|    20957|PORCELAIN HANGING...|      -1|12/1/10 14:33|     1.45|     12472|Germany|
|  C536548|    22242|5 HOOK HANGER MAG...|      -5|12/1/10 14:33|     1.65|     12472|Germany|
|  C536548|    22077|6 RIBBONS RUSTIC ...|      -6|12/1/10 14:33|     1.65|     12472|Germany|
|  C536548|    22333|RETROSPOT PARTY B...|      -1|12/1/10 14:33|     1.65|     12472|Germany|
|  C536548|    22631|CIRCUS PARADE LUN...|      -1|12/1/10 14:33|     1.95|     12472|Germany|
|  C536548|    22244|3 HOOK HANGER MAG...|      -4

In [65]:
# define schema for our data using DDL 
schema = "`Id` INT,`First` STRING,`Last` STRING,`Url` STRING,`Published` STRING,`Hits` INT,`Campaigns` ARRAY<STRING>"

# create our static data
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
       [2, "Brooke","Wenig","https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
       [3, "Denny", "Lee", "https://tinyurl.3","6/7/2019",7659, ["web", "twitter", "FB", "LinkedIn"]],
       [4, "Tathagata", "Das","https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
       [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
       [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
      ]

In [66]:
# create a DataFrame using the schema defined above
blogs_df = spark.createDataFrame(data, schema)
# show the DataFrame; it should reflect our table above
blogs_df.show()
print()
# print the schema used by Spark to process the DataFrame
# (printSchema() prints directly and returns None, so it is not wrapped in print())
blogs_df.printSchema()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+


root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer 

**References**  
$^1$ https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html     
$^2$ https://spark.apache.org/docs/latest/sql-reference.html#data-types   
$^3$ https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader   
$^4$ https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter  
$^5$ https://spark.apache.org/downloads.html        
$^6$ https://www.anaconda.com/distribution/#download-section      
$^7$ https://dev.mysql.com/downloads/mysql/            
$^8$ https://portal.aws.amazon.com/billing/signup#/start    
$^9$ https://community.cloud.databricks.com/login.html    
$^{10}$ https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#    
$^{11}$ https://training.databricks.com/visualapi.pdf      