<h1><center> Google Colab and PySpark</center></h1>

<a id='big-data-pyspark-and-colaboratory'></a>
## Big data, PySpark and Colaboratory

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


<a id='access-to-the-shell'></a>
### Access to the shell

In [None]:
ls

[0m[01;34msample_data[0m/


In [None]:
pwd

'/content'

<a id='installing-spark'></a>
### Installing Spark

Install Dependencies:


1.   Java 8
2.   Apache Spark (prebuilt for Hadoop 3.2)
3.   findspark (used to locate the Spark installation and make `pyspark` importable)


In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

Set Environment Variables:

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
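Before calling `findspark.init()`, it can help to confirm that both variables point at existing directories, since a wrong path otherwise tends to surface later as a less obvious Java or import error. The helper `check_home` below is a hypothetical name used only for this sketch; it checks the paths and does not validate the Java or Spark versions.

```python
import os


def check_home(var_name):
    """Return True if the environment variable is set and names an existing directory."""
    path = os.environ.get(var_name)
    return path is not None and os.path.isdir(path)


# Report the status of the two variables set above
for var in ("JAVA_HOME", "SPARK_HOME"):
    status = "ok" if check_home(var) else "missing or not a directory"
    print(f"{var}={os.environ.get(var)!r}: {status}")
```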

In [4]:
!ls

drive		   sample_data		      spark-3.1.1-bin-hadoop3.2.tgz
flights_small.csv  spark-3.1.1-bin-hadoop3.2


In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Render DataFrames as formatted tables when they are the last expression in a cell
spark

<a id='exploring-the-dataset'></a>
## Exploring the Dataset

<a id='loading-the-dataset'></a>
### Loading the Dataset

In [None]:
# Download the Cars data, originally from https://perso.telecom-paristech.fr/eagan/class/igr204/datasets
!wget https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv

--2023-05-03 15:26:50--  https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobceles.github.io (jacobceles.github.io)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobceles.github.io (jacobceles.github.io)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv [following]
--2023-05-03 15:26:50--  https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobcelestine.com (jacobcelestine.com)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobcelestine.com (jacobcelestine.com)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 22608 (22K) [text/csv]
Saving to: ‘cars.csv.1’


2023-05-03 15:26:50 (118 MB/s) - ‘cars.csv.1’ saved [22608/22608]



In [None]:
!ls

cars.csv    flights_small.csv  spark-3.1.1-bin-hadoop3.2
cars.csv.1  sample_data        spark-3.1.1-bin-hadoop3.2.tgz


In [8]:
# Load data from csv to a dataframe. 
# header=True means the first row is a header 
# sep=',' means the columns are separated using ','
df = spark.read.csv('/content/flights_small.csv', header=True, sep=",")
df.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
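+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 5 rows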

The above command loads our data from the CSV file into a DataFrame (DF). A DataFrame is a two-dimensional labeled data structure with columns of potentially different types.

<a id='viewing-the-dataframe'></a>
### Viewing the Dataframe

There are a couple of ways to view your DataFrame (DF) in PySpark:

1.   `df.take(5)` will return a list of (at most) five Row objects. 
2.   `df.collect()` will get all of the data from the entire DataFrame. Be really careful when using it, because with a large dataset you can easily crash the driver node. 
3.   `df.show()` is the most commonly used method to view a DataFrame. By default it prints 20 rows and truncates values longer than 20 characters; both can be changed through its parameters. For example, `df.show(5, False)` or `df.show(5, truncate=False)` will show the first five rows without any truncation.
4.   `df.limit(5)` will **return a new DataFrame** containing the first 5 rows. As Spark is distributed in nature, there is no guarantee that `df.limit()` will give you the same rows each time.

Let us see some of them in action below:

In [7]:
df.show(5, truncate=False)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|12   |8  |658     |-7       |935     |-5       |VX     |N846VA |1780  |SEA   |LAX |132     |954     |6   |58    |
|2014|1    |22 |1040    |5        |1505    |5        |AS     |N559AS |851   |SEA   |HNL |360     |2677    |10  |40    |
|2014|3    |9  |1443    |-2       |1652    |2        |VX     |N847VA |755   |SEA   |SFO |111     |679     |14  |43    |
|2014|4    |9  |1705    |45       |1839    |34       |WN     |N360SW |344   |PDX   |SJC |83      |569     |17  |5     |
|2014|3    |9  |754     |-1       |1015    |1        |AS     |N612AS |522   |SEA   |BUR |127     |937     |7   |54    |
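+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 5 rows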

In [9]:
df.limit(5)

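| year | month | day | dep_time | dep_delay | arr_time | arr_delay | carrier | tailnum | flight | origin | dest | air_time | distance | hour | minute |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2014 | 12 | 8 | 658 | -7 | 935 | -5 | VX | N846VA | 1780 | SEA | LAX | 132 | 954 | 6 | 58 |
| 2014 | 1 | 22 | 1040 | 5 | 1505 | 5 | AS | N559AS | 851 | SEA | HNL | 360 | 2677 | 10 | 40 |
| 2014 | 3 | 9 | 1443 | -2 | 1652 | 2 | VX | N847VA | 755 | SEA | SFO | 111 | 679 | 14 | 43 |
| 2014 | 4 | 9 | 1705 | 45 | 1839 | 34 | WN | N360SW | 344 | PDX | SJC | 83 | 569 | 17 | 5 |
| 2014 | 3 | 9 | 754 | -1 | 1015 | 1 | AS | N612AS | 522 | SEA | BUR | 127 | 937 | 7 | 54 |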


<a id='viewing-dataframe-columns'></a>
### Viewing Dataframe Columns

In [10]:
df.columns

['year',
 'month',
 'day',
 'dep_time',
 'dep_delay',
 'arr_time',
 'arr_delay',
 'carrier',
 'tailnum',
 'flight',
 'origin',
 'dest',
 'air_time',
 'distance',
 'hour',
 'minute']

<a id='dataframe-schema'></a>
### Dataframe Schema

There are two methods commonly used to view the data types of a dataframe:

In [11]:
df.dtypes

[('year', 'string'),
 ('month', 'string'),
 ('day', 'string'),
 ('dep_time', 'string'),
 ('dep_delay', 'string'),
 ('arr_time', 'string'),
 ('arr_delay', 'string'),
 ('carrier', 'string'),
 ('tailnum', 'string'),
 ('flight', 'string'),
 ('origin', 'string'),
 ('dest', 'string'),
 ('air_time', 'string'),
 ('distance', 'string'),
 ('hour', 'string'),
 ('minute', 'string')]

In [12]:
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



<a id='implicit-schema-inference'></a>
#### Inferring Schema Implicitly

We can pass the parameter `inferSchema=True` to infer the input schema automatically while loading the data. Spark then makes an extra pass over the input to determine the column types. An example is shown below:

In [13]:
df = spark.read.csv('flights_small.csv', header=True, sep=",", inferSchema=True)
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



As you can see, Spark has automatically inferred `integer` types for `year`, `month`, `day`, `flight` and `distance`. Other numeric-looking columns such as `dep_time`, `air_time` and `hour` were left as strings, which usually means that at least one value in those columns could not be parsed as a number (for example, a missing value written as text). Implicit inference can also cause problems when reading multiple files with different schemas: some columns may end up filled with null values. Therefore, let us also see how to define schemas explicitly.

<a id='explicit-schema-inference'></a>
#### Defining Schema Explicitly

In [14]:
from pyspark.sql.types import *
df.columns

['year',
 'month',
 'day',
 'dep_time',
 'dep_delay',
 'arr_time',
 'arr_delay',
 'carrier',
 'tailnum',
 'flight',
 'origin',
 'dest',
 'air_time',
 'distance',
 'hour',
 'minute']

In [15]:
# Creating a list of the schema in the format column_name, data_type
# labels = [
#      ('Car',StringType()),
#      ('MPG',DoubleType()),
#      ('Cylinders',IntegerType()),
#      ('Displacement',DoubleType()),
#      ('Horsepower',DoubleType()),
#      ('Weight',DoubleType()),
#      ('Acceleration',DoubleType()),
#      ('Model',IntegerType()),
#      ('Origin',StringType())
# ]

labels = [
    ('year', IntegerType()),
    ('month', IntegerType()),
    ('day', IntegerType()),
    ('dep_time', StringType()),
    ('dep_delay', StringType()),
    ('arr_time', StringType()),
    ('arr_delay', StringType()),
    ('carrier', StringType()),
    ('tailnum', StringType()),
    ('flight', IntegerType()),
    ('origin', StringType()),
    ('dest', StringType()),
    ('air_time', StringType()),
    ('distance', IntegerType()),
    ('hour', StringType()),
    ('minute', StringType())
]

In [16]:
# Creating the schema that will be passed when reading the csv
schema = StructType([StructField(name, data_type, True) for name, data_type in labels])
schema

StructType(List(StructField(year,IntegerType,true),StructField(month,IntegerType,true),StructField(day,IntegerType,true),StructField(dep_time,StringType,true),StructField(dep_delay,StringType,true),StructField(arr_time,StringType,true),StructField(arr_delay,StringType,true),StructField(carrier,StringType,true),StructField(tailnum,StringType,true),StructField(flight,IntegerType,true),StructField(origin,StringType,true),StructField(dest,StringType,true),StructField(air_time,StringType,true),StructField(distance,IntegerType,true),StructField(hour,StringType,true),StructField(minute,StringType,true)))

In [17]:
df = spark.read.csv('flights_small.csv', header=True, sep=",", schema=schema)
df.printSchema()
# The schema matches the one we defined!

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [18]:
df.show(truncate=False)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|12   |8  |658     |-7       |935     |-5       |VX     |N846VA |1780  |SEA   |LAX |132     |954     |6   |58    |
|2014|1    |22 |1040    |5        |1505    |5        |AS     |N559AS |851   |SEA   |HNL |360     |2677    |10  |40    |
|2014|3    |9  |1443    |-2       |1652    |2        |VX     |N847VA |755   |SEA   |SFO |111     |679     |14  |43    |
|2014|4    |9  |1705    |45       |1839    |34       |WN     |N360SW |344   |PDX   |SJC |83      |569     |17  |5     |
|2014|3    |9  |754     |-1       |1015    |1        |AS     |N612AS |522   |SEA   |BUR |127     |937     |7   |54    |
|2014|1    |15 |1037    |7        |1352 

As we can see here, the data has been successfully loaded with the specified data types.

<a id='dataframe-operations-on-columns'></a>
## DataFrame Operations on Columns

We will go over the following in this section:

1.   Selecting Columns
2.   Selecting Multiple Columns
3.   Adding New Columns
4.   Renaming Columns
5.   Grouping By Columns
6.   Removing Columns



<a id='selecting-columns'></a>
### Selecting Columns

There are multiple ways to select columns in PySpark. The cells below show each approach and how they differ:

In [19]:
df.columns

['year',
 'month',
 'day',
 'dep_time',
 'dep_delay',
 'arr_time',
 'arr_delay',
 'carrier',
 'tailnum',
 'flight',
 'origin',
 'dest',
 'air_time',
 'distance',
 'hour',
 'minute']

In [20]:
# 1st method
# Column name is case sensitive in this usage
print(df.carrier)
print("*"*20)
df.select(df.carrier).show(truncate=False)

Column<'carrier'>
********************
+-------+
|carrier|
+-------+
|VX     |
|AS     |
|VX     |
|WN     |
|AS     |
|WN     |
|WN     |
|VX     |
|AS     |
|AS     |
|AS     |
|AS     |
|AS     |
|AS     |
|AS     |
|UA     |
|AS     |
|WN     |
|AS     |
|OO     |
+-------+
only showing top 20 rows



**NOTE:**

> **We can't always use the dot notation, because it breaks when a column name clashes with an existing DataFrame attribute or method (for example, a column named `count`) or is not a valid Python identifier (for example, a name containing a space). Additionally, dot notation is case-sensitive, so we need to make sure the column names use a consistent case before relying on it.**



In [21]:
# 2nd method
# Column name is case insensitive here
print(df['carrier'])
print("*"*20)
df.select(df['carrier']).show(truncate=False)

Column<'carrier'>
********************
+-------+
|carrier|
+-------+
|VX     |
|AS     |
|VX     |
|WN     |
|AS     |
|WN     |
|WN     |
|VX     |
|AS     |
|AS     |
|AS     |
|AS     |
|AS     |
|AS     |
|AS     |
|UA     |
|AS     |
|WN     |
|AS     |
|OO     |
+-------+
only showing top 20 rows



In [22]:
# 3rd method
# Column name is case insensitive here
from pyspark.sql.functions import col
df.select(col('carrier')).show(truncate=False)

+-------+
|carrier|
+-------+
|VX     |
|AS     |
|VX     |
|WN     |
|AS     |
|WN     |
|WN     |
|VX     |
|AS     |
|AS     |
|AS     |
|AS     |
|AS     |
|AS     |
|AS     |
|UA     |
|AS     |
|WN     |
|AS     |
|OO     |
+-------+
only showing top 20 rows



<a id='selecting-multiple-columns'></a>
### Selecting Multiple Columns

In [23]:
# 1st method
# Column name is case sensitive in this usage
print(df.carrier, df.flight)
print("*"*40)
df.select(df.carrier, df.flight).show(truncate=False)

Column<'carrier'> Column<'flight'>
****************************************
+-------+------+
|carrier|flight|
+-------+------+
|VX     |1780  |
|AS     |851   |
|VX     |755   |
|WN     |344   |
|AS     |522   |
|WN     |48    |
|WN     |1520  |
|VX     |755   |
|AS     |490   |
|AS     |26    |
|AS     |448   |
|AS     |656   |
|AS     |608   |
|AS     |121   |
|AS     |306   |
|UA     |1458  |
|AS     |368   |
|WN     |827   |
|AS     |24    |
|OO     |3488  |
+-------+------+
only showing top 20 rows



In [24]:
# 2nd method
# Column name is case insensitive in this usage
print(df['carrier'],df['flight'])
print("*"*40)
df.select(df['carrier'],df['flight']).show(truncate=False)

Column<'carrier'> Column<'flight'>
****************************************
+-------+------+
|carrier|flight|
+-------+------+
|VX     |1780  |
|AS     |851   |
|VX     |755   |
|WN     |344   |
|AS     |522   |
|WN     |48    |
|WN     |1520  |
|VX     |755   |
|AS     |490   |
|AS     |26    |
|AS     |448   |
|AS     |656   |
|AS     |608   |
|AS     |121   |
|AS     |306   |
|UA     |1458  |
|AS     |368   |
|WN     |827   |
|AS     |24    |
|OO     |3488  |
+-------+------+
only showing top 20 rows



In [25]:
# 3rd method
# Column name is case insensitive in this usage
from pyspark.sql.functions import col
df.select(col('carrier'),col('flight')).show(truncate=False)

+-------+------+
|carrier|flight|
+-------+------+
|VX     |1780  |
|AS     |851   |
|VX     |755   |
|WN     |344   |
|AS     |522   |
|WN     |48    |
|WN     |1520  |
|VX     |755   |
|AS     |490   |
|AS     |26    |
|AS     |448   |
|AS     |656   |
|AS     |608   |
|AS     |121   |
|AS     |306   |
|UA     |1458  |
|AS     |368   |
|WN     |827   |
|AS     |24    |
|OO     |3488  |
+-------+------+
only showing top 20 rows



<a id='adding-new-columns'></a>
### Adding New Columns

We will take a look at three cases here:

1.   Adding a new column
2.   Adding multiple columns
3.   Deriving a new column from an existing one

In [26]:
# CASE 1: Adding a new column
# We will add a new column called 'first_column' at the end
from pyspark.sql.functions import lit
df = df.withColumn('first_column',lit(1)) 
# lit means literal. It populates the row with the literal value given.
# When adding static data / constant values, it is a good practice to use it.
df.show(5,truncate=False)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|first_column|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|2014|12   |8  |658     |-7       |935     |-5       |VX     |N846VA |1780  |SEA   |LAX |132     |954     |6   |58    |1           |
|2014|1    |22 |1040    |5        |1505    |5        |AS     |N559AS |851   |SEA   |HNL |360     |2677    |10  |40    |1           |
|2014|3    |9  |1443    |-2       |1652    |2        |VX     |N847VA |755   |SEA   |SFO |111     |679     |14  |43    |1           |
|2014|4    |9  |1705    |45       |1839    |34       |WN     |N360SW |344   |PDX   |SJC |83      |569     |17  |5     |1           |
|2014|3    |9  |754     |-1       |1015    |1        |AS     |N612AS 

In [27]:
# CASE 2: Adding multiple columns
# We will add two new columns called 'second_column' and 'third_column' at the end
df = df.withColumn('second_column', lit(2)) \
       .withColumn('third_column', lit('Third Column')) 
# lit means literal. It populates the row with the literal value given.
# When adding static data / constant values, it is a good practice to use it.
df.show(5,truncate=False)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+-------------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|first_column|second_column|third_column|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+-------------+------------+
|2014|12   |8  |658     |-7       |935     |-5       |VX     |N846VA |1780  |SEA   |LAX |132     |954     |6   |58    |1           |2            |Third Column|
|2014|1    |22 |1040    |5        |1505    |5        |AS     |N559AS |851   |SEA   |HNL |360     |2677    |10  |40    |1           |2            |Third Column|
|2014|3    |9  |1443    |-2       |1652    |2        |VX     |N847VA |755   |SEA   |SFO |111     |679     |14  |43    |1           |2            |Third Column|
|2014|4    |9  |1705    |45       |1839 

In [28]:
# CASE 3: Deriving a new column from an existing one
# We will add a new column called 'car_model' containing the carrier and flight number joined with a space in between 
from pyspark.sql.functions import concat
df = df.withColumn('car_model', concat(col("carrier"), lit(" "), col("flight")))
# lit means literal. It populates the row with the literal value given.
# When adding static data / constant values, it is a good practice to use it.
df.show(5,truncate=False)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+-------------+------------+---------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|first_column|second_column|third_column|car_model|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+-------------+------------+---------+
|2014|12   |8  |658     |-7       |935     |-5       |VX     |N846VA |1780  |SEA   |LAX |132     |954     |6   |58    |1           |2            |Third Column|VX 1780  |
|2014|1    |22 |1040    |5        |1505    |5        |AS     |N559AS |851   |SEA   |HNL |360     |2677    |10  |40    |1           |2            |Third Column|AS 851   |
|2014|3    |9  |1443    |-2       |1652    |2        |VX     |N847VA |755   |SEA   |SFO |111     |679     |14  |43    |1           |2            |Thir

As we can see, the new column `car_model` has been created from the existing `carrier` and `flight` columns. Since we wanted the two values joined with a space in between, we used the `concat` function together with `lit(" ")`.

<a id='renaming-columns'></a>
### Renaming Columns

We use the `withColumnRenamed` function to rename a column in PySpark. Let us see it in action below:

In [29]:
#Renaming a column in PySpark
df = df.withColumnRenamed('first_column', 'new_column_one') \
       .withColumnRenamed('second_column', 'new_column_two') \
       .withColumnRenamed('third_column', 'new_column_three')
df.show(truncate=False)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+--------------+--------------+----------------+---------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|new_column_one|new_column_two|new_column_three|car_model|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+--------------+--------------+----------------+---------+
|2014|12   |8  |658     |-7       |935     |-5       |VX     |N846VA |1780  |SEA   |LAX |132     |954     |6   |58    |1             |2             |Third Column    |VX 1780  |
|2014|1    |22 |1040    |5        |1505    |5        |AS     |N559AS |851   |SEA   |HNL |360     |2677    |10  |40    |1             |2             |Third Column    |AS 851   |
|2014|3    |9  |1443    |-2       |1652    |2        |VX     |N847VA |755   |SEA   |SFO |111     |679     |14  |43 

<a id='grouping-by-columns'></a>
### Grouping By Columns

Here, we see the Dataframe API way of grouping values. We will discuss how to:


1.   Group By a single column
2.   Group By multiple columns

In [30]:
df.head()

Row(year=2014, month=12, day=8, dep_time='658', dep_delay='-7', arr_time='935', arr_delay='-5', carrier='VX', tailnum='N846VA', flight=1780, origin='SEA', dest='LAX', air_time='132', distance=954, hour='6', minute='58', new_column_one=1, new_column_two=2, new_column_three='Third Column', car_model='VX 1780')

In [31]:
# Group By a column in PySpark
df.groupBy('dest').count().show(5)

+----+-----+
|dest|count|
+----+-----+
| MSY|    9|
| GEG|  105|
| BUR|  137|
| SNA|  198|
| EUG|   41|
+----+-----+
only showing top 5 rows



In [32]:
# Group By multiple columns in PySpark
df.groupBy('dest', 'carrier').count().show(5)

+----+-------+-----+
|dest|carrier|count|
+----+-------+-----+
| KTN|     AS|   77|
| LAX|     AS|  346|
| ANC|     DL|   48|
| SAN|     DL|    9|
| PDX|     AS|    8|
+----+-------+-----+
only showing top 5 rows



<a id='removing-columns'></a>
### Removing Columns

In [33]:
#Remove columns in PySpark
df = df.drop('new_column_one')
df.show(5,truncate=False)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+--------------+----------------+---------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|new_column_two|new_column_three|car_model|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+--------------+----------------+---------+
|2014|12   |8  |658     |-7       |935     |-5       |VX     |N846VA |1780  |SEA   |LAX |132     |954     |6   |58    |2             |Third Column    |VX 1780  |
|2014|1    |22 |1040    |5        |1505    |5        |AS     |N559AS |851   |SEA   |HNL |360     |2677    |10  |40    |2             |Third Column    |AS 851   |
|2014|3    |9  |1443    |-2       |1652    |2        |VX     |N847VA |755   |SEA   |SFO |111     |679     |14  |43    |2             |Third Column    |VX 755   |
|2014|4    |9  |1705    |45 

In [34]:
#Remove multiple columns in one go
df = df.drop('new_column_two') \
       .drop('new_column_three')
df.show(5,truncate=False)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+---------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|car_model|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+---------+
|2014|12   |8  |658     |-7       |935     |-5       |VX     |N846VA |1780  |SEA   |LAX |132     |954     |6   |58    |VX 1780  |
|2014|1    |22 |1040    |5        |1505    |5        |AS     |N559AS |851   |SEA   |HNL |360     |2677    |10  |40    |AS 851   |
|2014|3    |9  |1443    |-2       |1652    |2        |VX     |N847VA |755   |SEA   |SFO |111     |679     |14  |43    |VX 755   |
|2014|4    |9  |1705    |45       |1839    |34       |WN     |N360SW |344   |PDX   |SJC |83      |569     |17  |5     |WN 344   |
|2014|3    |9  |754     |-1       |1015    |1        |AS     |N612AS |522   |SEA   |BUR |1

<a id='dataframe-operations-on-rows'></a>
## DataFrame Operations on Rows

We will discuss the following in this section:

1.   Filtering Rows
2.   Get Distinct Rows
3.   Sorting Rows
4.   Union Dataframes



<a id='filtering-rows'></a>
### Filtering Rows

In [35]:
# Filtering rows in PySpark
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count)) 
lax_filtered_count = df.filter(col('dest')=='LAX').count()
print("LAX FILTERED RECORD COUNT: " + str(lax_filtered_count))
df.filter(col('dest')=='LAX').show(truncate=False)

TOTAL RECORD COUNT: 10000
LAX FILTERED RECORD COUNT: 615
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+---------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|car_model|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+---------+
|2014|12   |8  |658     |-7       |935     |-5       |VX     |N846VA |1780  |SEA   |LAX |132     |954     |6   |58    |VX 1780  |
|2014|11   |8  |1653    |-2       |1924    |-1       |AS     |N323AS |448   |SEA   |LAX |130     |954     |16  |53    |AS 448   |
|2014|8    |19 |1845    |-5       |2112    |-22      |DL     |N354NW |2642  |SEA   |LAX |119     |954     |18  |45    |DL 2642  |
|2014|6    |11 |750     |-10      |1009    |-11      |AS     |N799AS |568   |PDX   |LAX |118     |834     |7   |50    |AS 568   |
|2014|4    |25 |1049    |-6   

In [36]:
# Filtering rows in PySpark based on Multiple conditions
total_count = df.count()
print("TOTAL RECORD COUNT: " + str(total_count)) 
lax_2014_df = df.filter((col('dest')=='LAX') & 
                        (col('year')==2014)) # Two conditions combined with &
print("LAX 2014 FILTERED RECORD COUNT: " + str(lax_2014_df.count()))
lax_2014_df.show(truncate=False)

TOTAL RECORD COUNT: 10000
LAX 2014 FILTERED RECORD COUNT: 615
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+---------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|car_model|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+---------+
|2014|12   |8  |658     |-7       |935     |-5       |VX     |N846VA |1780  |SEA   |LAX |132     |954     |6   |58    |VX 1780  |
|2014|11   |8  |1653    |-2       |1924    |-1       |AS     |N323AS |448   |SEA   |LAX |130     |954     |16  |53    |AS 448   |
|2014|8    |19 |1845    |-5       |2112    |-22      |DL     |N354NW |2642  |SEA   |LAX |119     |954     |18  |45    |DL 2642  |
|2014|6    |11 |750     |-10      |1009    |-11      |AS     |N799AS |568   |PDX   |LAX |118     |834     |7   |50    |AS 568   |
|2014|4    |25 |1049    |-6   

<a id='get-distinct-rows'></a>
### Get Distinct Rows

In [37]:
#Get Unique Rows in PySpark
df.select('dest').distinct().show()

+----+
|dest|
+----+
| MSY|
| GEG|
| BUR|
| SNA|
| EUG|
| OAK|
| DCA|
| RDM|
| KTN|
| LIH|
| IAH|
| HNL|
| SJC|
| CVG|
| AUS|
| LGB|
| RNO|
| BOS|
| EWR|
| LAS|
+----+
only showing top 20 rows



In [38]:
#Get Unique Rows in PySpark based on multiple columns
df.select('dest','month').distinct().show()

+----+-----+
|dest|month|
+----+-----+
| IAD|   12|
| RDM|    2|
| CLT|    4|
| ONT|   11|
| SBA|   11|
| MCI|   10|
| FLL|    5|
| SLC|   10|
| GEG|   11|
| COS|    3|
| GEG|    7|
| IAD|   10|
| ONT|    2|
| LAS|   12|
| FLL|    3|
| COS|    5|
| SIT|    6|
| BOS|   10|
| SBA|    9|
| LIH|   12|
+----+-----+
only showing top 20 rows



<a id='sorting-rows'></a>
### Sorting Rows

In [39]:
# Sort Rows in PySpark
# By default the data will be sorted in ascending order
df.orderBy('year').show(truncate=False) 

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+---------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|car_model|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+---------+
|2014|12   |8  |658     |-7       |935     |-5       |VX     |N846VA |1780  |SEA   |LAX |132     |954     |6   |58    |VX 1780  |
|2014|1    |22 |1040    |5        |1505    |5        |AS     |N559AS |851   |SEA   |HNL |360     |2677    |10  |40    |AS 851   |
|2014|3    |9  |1443    |-2       |1652    |2        |VX     |N847VA |755   |SEA   |SFO |111     |679     |14  |43    |VX 755   |
|2014|4    |9  |1705    |45       |1839    |34       |WN     |N360SW |344   |PDX   |SJC |83      |569     |17  |5     |WN 344   |
|2014|3    |9  |754     |-1       |1015    |1        |AS     |N612AS |522   |SEA   |BUR |1

In [40]:
# To change the sorting order, you can use the ascending parameter
df.orderBy('dest', ascending=False).show(truncate=False) 

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+---------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|car_model|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+---------+
|2014|8    |19 |1057    |-3       |1356    |1        |AS     |N402AS |648   |SEA   |TUS |156     |1216    |10  |57    |AS 648   |
|2014|3    |18 |1215    |-2       |1501    |-4       |AS     |N317AS |658   |SEA   |TUS |146     |1216    |12  |15    |AS 658   |
|2014|9    |9  |1041    |-4       |1324    |-14      |AS     |N568AS |648   |SEA   |TUS |146     |1216    |10  |41    |AS 648   |
|2014|9    |1  |1041    |-4       |1328    |-10      |AS     |N512AS |648   |SEA   |TUS |147     |1216    |10  |41    |AS 648   |
|2014|8    |18 |1057    |-3       |1351    |-4       |AS     |N516AS |648   |SEA   |TUS |1

In [41]:
# Using groupBy and orderBy together
df.groupBy("carrier").count().orderBy('carrier', ascending=False).show(10)

+-------+-----+
|carrier|count|
+-------+-----+
|     WN| 1394|
|     VX|  186|
|     US|  367|
|     UA| 1051|
|     OO| 1186|
|     HA|   73|
|     F9|  181|
|     DL| 1082|
|     B6|  214|
|     AS| 3784|
+-------+-----+
only showing top 10 rows



In [42]:
df.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+---------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|car_model|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+---------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|  VX 1780|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|   AS 851|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|   VX 755|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|   WN 344|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR| 

<a id='union-dataframes'></a>
### Union Dataframes

You will see three main methods for performing a union of DataFrames. It is important to know how they differ and which one is preferred:

*   `union()` – Merges two DataFrames with the same number of columns, matching columns by position (not by name). If the number of columns differs, or the column types are incompatible, it raises an error.
*   `unionAll()` – Deprecated since Spark 2.0.0 and replaced by `union()`.
*   `unionByName()` – Merges two DataFrames by matching their columns by name.

> Since `unionAll()` is deprecated, **`union()` is the preferred method for merging dataframes.**
<br>
> The difference between `unionByName()` and `union()` is that `unionByName()` resolves columns by name, not by position.

In SQL, `UNION` removes duplicate rows while `UNION ALL` keeps them. In PySpark, however, both `union()` and `unionAll()` behave like `UNION ALL` and keep duplicate records. To get SQL `UNION` semantics, follow the union with `distinct()` or `dropDuplicates()`.
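To make the position-versus-name distinction concrete, here is a minimal plain-Python model (not PySpark) in which a DataFrame is just a list of column names plus a list of row tuples. The helpers `union_by_position`, `union_by_name` and `distinct` are hypothetical; they mimic the behaviour described above, except that, unlike Spark, they preserve row order.

```python
def union_by_position(cols_a, rows_a, cols_b, rows_b):
    """Model of DataFrame.union(): match columns by position and keep duplicates."""
    if len(cols_a) != len(cols_b):
        raise ValueError("union requires the same number of columns")
    # The result keeps the first DataFrame's column names; rows are simply appended.
    return cols_a, rows_a + rows_b


def union_by_name(cols_a, rows_a, cols_b, rows_b):
    """Model of DataFrame.unionByName(): match columns by name and keep duplicates."""
    if sorted(cols_a) != sorted(cols_b):
        raise ValueError("unionByName requires the same column names")
    # For each column of the first DataFrame, find where it lives in the second one.
    order = [cols_b.index(name) for name in cols_a]
    reordered = [tuple(row[i] for i in order) for row in rows_b]
    return cols_a, rows_a + reordered


def distinct(rows):
    """Drop exact duplicate rows (SQL UNION semantics), keeping first-seen order."""
    return list(dict.fromkeys(rows))


cols1, rows1 = ["col0", "col1", "col2"], [(1, 2, 3)]
cols2, rows2 = ["col1", "col2", "col0"], [(4, 5, 6)]
print(union_by_position(cols1, rows1, cols2, rows2)[1])  # [(1, 2, 3), (4, 5, 6)]
print(union_by_name(cols1, rows1, cols2, rows2)[1])      # [(1, 2, 3), (6, 4, 5)]

dupes = union_by_position(["x"], [(1,)], ["x"], [(1,)])[1]
print(dupes, distinct(dupes))                            # [(1,), (1,)] [(1,)]
```

The by-name result matches the `6 | 4 | 5` row that Spark produces for the same input in the `unionByName()` example below, while the by-position result silently puts the values under the wrong column names.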

In [43]:
# CASE 1: Union when columns are in the same order
from pyspark.sql.functions import col
df = spark.read.csv('flights_small.csv', header=True, sep=",", inferSchema=True)
lax_arrivals = df.filter((col('dest') == 'LAX') & (col('year') == 2015))
sea_departures = df.filter((col('origin') == 'SEA') & (col('year') == 2015))
print("Arrived at LAX in 2015: " + str(lax_arrivals.count()))
print("Departed from SEA in 2015: " + str(sea_departures.count()))
print("Union of both (duplicates kept): " + str(lax_arrivals.union(sea_departures).count()))

Arrived at LAX in 2015: 0
Departed from SEA in 2015: 0
Union of both (duplicates kept): 0


**Result:**

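> As you can see here, every count is 0: the dataset appears to contain only flights from 2014, so neither 2015 filter matches any row. In general, `union()` simply appends the rows of the second DataFrame to the first, so the union count equals the sum of the two counts, and a flight from SEA to LAX would be counted twice.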



In [44]:
# CASE 2: Union when columns are not in the same order
# Creating two dataframes with jumbled columns
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()

+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+



**Result:**

> As you can see here, the two dataframes have been successfully merged based on their column names.



<a id='common-data-manipulation-functions'></a>
## Common Data Manipulation Functions

In [45]:
# Functions available in PySpark
from pyspark.sql import functions
# Similar to Python, we can use the dir function to view the available functions
print(dir(functions)) 



<a id='string-functions'></a>
### String Functions

In [46]:
import pandas as pd
pd.read_csv('flights_small.csv')

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
0,2014,12,8,658.0,-7.0,935.0,-5.0,VX,N846VA,1780,SEA,LAX,132.0,954,6.0,58.0
1,2014,1,22,1040.0,5.0,1505.0,5.0,AS,N559AS,851,SEA,HNL,360.0,2677,10.0,40.0
2,2014,3,9,1443.0,-2.0,1652.0,2.0,VX,N847VA,755,SEA,SFO,111.0,679,14.0,43.0
3,2014,4,9,1705.0,45.0,1839.0,34.0,WN,N360SW,344,PDX,SJC,83.0,569,17.0,5.0
4,2014,3,9,754.0,-1.0,1015.0,1.0,AS,N612AS,522,SEA,BUR,127.0,937,7.0,54.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,2014,6,23,1806.0,-4.0,2104.0,-6.0,OO,N225AG,3458,SEA,SLC,89.0,689,18.0,6.0
9996,2014,8,31,2336.0,11.0,452.0,-13.0,AA,N3LEAA,1230,SEA,DFW,178.0,1660,23.0,36.0
9997,2014,8,8,904.0,-1.0,1042.0,-5.0,AS,N523AS,360,SEA,SMF,81.0,605,9.0,4.0
9998,2014,8,29,1441.0,26.0,1820.0,10.0,WN,N8647A,2857,SEA,ABQ,133.0,1180,14.0,41.0


In [47]:
# Loading the data
from pyspark.sql.functions import col
df = spark.read.csv('flights_small.csv', header=True, sep=",", inferSchema=True)

**Display the `dest` column as is, in lowercase and uppercase, and its first 4 characters**

In [48]:
from pyspark.sql.functions import col, lower, upper, substring
# Prints out the details of a function
help(substring)
# alias is used to rename the column in the output
df.select(col('dest'), lower(col('dest')), upper(col('dest')), substring(col('dest'), 1, 4).alias("first_4_chars")).show(5, False)

Help on function substring in module pyspark.sql.functions:

substring(str, pos, len)
    Substring starts at `pos` and is of length `len` when str is String type or
    returns the slice of byte array that starts at `pos` in byte and is of length `len`
    when str is Binary type.
    
    .. versionadded:: 1.5.0
    
    Notes
    -----
    The position is not zero based, but 1 based index.
    
    Examples
    --------
    >>> df = spark.createDataFrame([('abcd',)], ['s',])
    >>> df.select(substring(df.s, 1, 2).alias('s')).collect()
    [Row(s='ab')]

+----+-----------+-----------+-------------+
|dest|lower(dest)|upper(dest)|first_4_chars|
+----+-----------+-----------+-------------+
|LAX |lax        |LAX        |LAX          |
|HNL |hnl        |HNL        |HNL          |
|SFO |sfo        |SFO        |SFO          |
|SJC |sjc        |SJC        |SJC          |
|BUR |bur        |BUR        |BUR          |
+----+-----------+-----------+------
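Because `substring` counts positions from 1, translating it to Python slicing is a common source of off-by-one errors. The sketch below is a hypothetical plain-Python helper, `spark_substring`, that reproduces the 1-based behaviour described in the help text for positive positions only; zero and negative positions are deliberately not modelled.

```python
def spark_substring(s: str, pos: int, length: int) -> str:
    """Plain-Python model of pyspark.sql.functions.substring for pos >= 1 (strings only)."""
    if pos < 1:
        raise ValueError("this sketch only models 1-based positions (pos >= 1)")
    start = pos - 1  # convert Spark's 1-based position to Python's 0-based index
    # Slicing past the end is safe in Python; the result is just shorter.
    return s[start:start + max(length, 0)]


print(spark_substring("abcd", 1, 2))  # ab   (same as the help example)
print(spark_substring("LAX", 1, 4))   # LAX  (asking for 4 chars of a 3-char code)
```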

**Concatenate the `dest` and `year` columns with a space between them.**

In [49]:
from pyspark.sql.functions import col, concat, lit
# lit(" ") wraps the literal space in a Column so that concat can use it
df.select(col("dest"), col("year"), concat(col("dest"), lit(" "), col("year"))).show(5, False)

+----+----+---------------------+
|dest|year|concat(dest,  , year)|
+----+----+---------------------+
|LAX |2014|LAX 2014             |
|HNL |2014|HNL 2014             |
|SFO |2014|SFO 2014             |
|SJC |2014|SJC 2014             |
|BUR |2014|BUR 2014             |
+----+----+---------------------+
only showing top 5 rows



<a id='numeric-functions'></a>
### Numeric functions

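**Show the minimum and maximum `dep_time`**

Because missing values in `dep_time` are stored as the string `NA`, `inferSchema` reads the column as a string, so `min` and `max` compare the values lexicographically. That is why the maximum below is `NA`.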

In [50]:
from pyspark.sql.functions import min, max
df.select(min(col('dep_time')), max(col('dep_time'))).show()

+-------------+-------------+
|min(dep_time)|max(dep_time)|
+-------------+-------------+
|            1|           NA|
+-------------+-------------+



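**Add 10 to the minimum and maximum `dep_time`**

Adding a number makes Spark cast the strings to double (with `NA` becoming null), which is why the results below are numeric. Note that the two expressions differ: `min(...) + lit(10)` adds 10 after aggregating, whereas `max(col(...) + lit(10))` adds 10 to every row before aggregating.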

In [52]:
from pyspark.sql.functions import min, max, lit
df.select(min(col('dep_time'))+lit(10), max(col('dep_time')+lit(10))).show()

+--------------------+--------------------+
|(min(dep_time) + 10)|max((dep_time + 10))|
+--------------------+--------------------+
|                11.0|              2410.0|
+--------------------+--------------------+
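The `NA`, `11.0` and `2410.0` results can be reproduced in plain Python. The sample values below are a hypothetical handful of `dep_time` strings, and `to_double` is a hypothetical helper that mimics Spark's default (non-ANSI) cast, where an unparseable string becomes null (`None` here).

```python
values = ["658", "1040", "1", "2400", "NA"]  # hypothetical dep_time strings

# String comparison goes character by character, and "N" sorts after every digit.
print(min(values), max(values))  # 1 NA


def to_double(s):
    """Cast a string to float, returning None (Spark's null) when it cannot be parsed."""
    try:
        return float(s)
    except ValueError:
        return None


numbers = [to_double(v) for v in values]
non_null = [n for n in numbers if n is not None]  # Spark's min/max skip nulls
print(min(non_null) + 10, max(n + 10 for n in non_null))  # 11.0 2410.0
```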



<a id='operations-on-date'></a>
### Operations on Date

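> Since Spark 3.0, PySpark parses and formats dates with its own datetime patterns (see "Datetime Patterns for Formatting and Parsing" in the Spark SQL documentation) rather than Java's SimpleDateFormat. The common pattern letters used below (`yyyy`, `MM`, `MMM`, `dd`, `HH`, `mm`, `ss`) mean the same as in the [SimpleDateFormat table of Java](https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html).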

In [63]:
from pyspark.sql.functions import to_date, to_timestamp, lit
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['DOB'])
df.show()
df.printSchema()

+-------------------+
|                DOB|
+-------------------+
|2019-12-25 13:30:00|
+-------------------+

root
 |-- DOB: string (nullable = true)



In [64]:
df = spark.createDataFrame([('2019-12-25 13:30:00',)], ['DOB'])
df = df.select(to_date(col('DOB'),'yyyy-MM-dd HH:mm:ss'), to_timestamp(col('DOB'),'yyyy-MM-dd HH:mm:ss'))
df.show()
df.printSchema()

+---------------------------------+--------------------------------------+
|to_date(DOB, yyyy-MM-dd HH:mm:ss)|to_timestamp(DOB, yyyy-MM-dd HH:mm:ss)|
+---------------------------------+--------------------------------------+
|                       2019-12-25|                   2019-12-25 13:30:00|
+---------------------------------+--------------------------------------+

root
 |-- to_date(DOB, yyyy-MM-dd HH:mm:ss): date (nullable = true)
 |-- to_timestamp(DOB, yyyy-MM-dd HH:mm:ss): timestamp (nullable = true)



In [65]:
df = spark.createDataFrame([('25/Dec/2019 13:30:00',)], ['DOB'])
df = df.select(to_date(col('DOB'),'dd/MMM/yyyy HH:mm:ss'), to_timestamp(col('DOB'),'dd/MMM/yyyy HH:mm:ss'))
df.show()
df.printSchema()

+----------------------------------+---------------------------------------+
|to_date(DOB, dd/MMM/yyyy HH:mm:ss)|to_timestamp(DOB, dd/MMM/yyyy HH:mm:ss)|
+----------------------------------+---------------------------------------+
|                        2019-12-25|                    2019-12-25 13:30:00|
+----------------------------------+---------------------------------------+

root
 |-- to_date(DOB, dd/MMM/yyyy HH:mm:ss): date (nullable = true)
 |-- to_timestamp(DOB, dd/MMM/yyyy HH:mm:ss): timestamp (nullable = true)
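To see what each pattern letter means, the sketch below translates the two patterns used above into Python `strptime` directives and parses the same strings. `spark_pattern_to_strptime` and its token table are hypothetical and cover only the letters used in this section; quoted literals and other Spark pattern letters are not supported, and `%b` assumes the default C/English locale for month abbreviations.

```python
import re
from datetime import datetime

# Only the Spark pattern tokens used in this section.
SPARK_TO_STRPTIME = {
    "yyyy": "%Y",  # 4-digit year
    "MM": "%m",    # month number
    "MMM": "%b",   # abbreviated month name, e.g. Dec
    "dd": "%d",    # day of month
    "HH": "%H",    # hour (0-23)
    "mm": "%M",    # minute
    "ss": "%S",    # second
}


def spark_pattern_to_strptime(pattern: str) -> str:
    """Replace each run of identical letters with its strptime directive."""
    def replace(match):
        token = match.group(0)
        if token not in SPARK_TO_STRPTIME:
            raise ValueError(f"unsupported pattern token: {token}")
        return SPARK_TO_STRPTIME[token]
    return re.sub(r"([A-Za-z])\1*", replace, pattern)


for text, pattern in [("2019-12-25 13:30:00", "yyyy-MM-dd HH:mm:ss"),
                      ("25/Dec/2019 13:30:00", "dd/MMM/yyyy HH:mm:ss")]:
    py_format = spark_pattern_to_strptime(pattern)
    print(pattern, "->", py_format, "->", datetime.strptime(text, py_format))
```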



**What is 3 days earlier than the oldest date and 3 days later than the most recent date?**

In [66]:
from pyspark.sql.functions import date_add, date_sub
# create a dummy dataframe
df = spark.createDataFrame([('1990-01-01',),('1995-01-03',),('2021-03-30',)], ['Date'])
# find out the required dates
df.select(date_add(max(col('Date')),3), date_sub(min(col('Date')),3)).show()

+----------------------+----------------------+
|date_add(max(Date), 3)|date_sub(min(Date), 3)|
+----------------------+----------------------+
|            2021-04-02|            1989-12-29|
+----------------------+----------------------+
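Python's `datetime` module offers a quick way to sanity-check this result: adding or subtracting a `timedelta` of 3 days plays the role of `date_add` and `date_sub` here. This is a plain-Python cross-check, not how Spark computes it.

```python
from datetime import date, timedelta

dates = [date.fromisoformat(s) for s in ["1990-01-01", "1995-01-03", "2021-03-30"]]
three_days = timedelta(days=3)

print(max(dates) + three_days)  # 2021-04-02, like date_add(max(Date), 3)
print(min(dates) - three_days)  # 1989-12-29, like date_sub(min(Date), 3)
```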



<a id='joins-in-pyspark'></a>
## Joins in PySpark

In [67]:
# Create two dataframes
cars_df = spark.createDataFrame([[1, 'Economy Class'],[2, 'Business Class'],[3, 'First Class']], ["id", "Class"])
car_price_df = spark.createDataFrame([[1, 1000],[2, 2000],[3, 3000]], ["id", "Ticket Price"])
cars_df.show()
car_price_df.show()

+---+--------------+
| id|         Class|
+---+--------------+
|  1| Economy Class|
|  2|Business Class|
|  3|   First Class|
+---+--------------+

+---+------------+
| id|Ticket Price|
+---+------------+
|  1|        1000|
|  2|        2000|
|  3|        3000|
+---+------------+



In [69]:
# Executing an inner join so we can see the id, class and ticket price in one row
cars_df.join(car_price_df, cars_df.id == car_price_df.id, 'inner').select(cars_df['id'],cars_df['Class'],car_price_df['Ticket Price']).show(truncate=False)

+---+--------------+------------+
|id |Class         |Ticket Price|
+---+--------------+------------+
|1  |Economy Class |1000        |
|3  |First Class   |3000        |
|2  |Business Class|2000        |
+---+--------------+------------+



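As you can see, we have done an inner join between two dataframes. Note that the row order of a join result is not guaranteed, which is why id 3 appears before id 2 above. The following join types are supported by PySpark (names on the same line are aliases for the same join):
1. `inner` (default)
2. `cross`
3. `outer`, `full`, `full_outer`
4. `left`, `left_outer`
5. `right`, `right_outer`
6. `left_semi`
7. `left_anti`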
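The difference between the most common join types is easiest to see when one key has no match. The sketch below is a plain-Python model over lists of dicts, with a hypothetical `join` helper and a hypothetical price table that deliberately omits id 3; it is not how Spark executes joins. Unmatched right-side columns are filled with `None`, which Spark would show as null.

```python
def join(left, right, key, how="inner"):
    """Plain-Python model of inner, left, left_semi and left_anti equi-joins on `key`."""
    right_by_key = {}
    for row in right:
        right_by_key.setdefault(row[key], []).append(row)
    right_cols = {c for row in right for c in row if c != key}

    result = []
    for l_row in left:
        matches = right_by_key.get(l_row[key], [])
        if how == "inner":
            # The key column appears once, as when joining on a column name.
            result += [{**l_row, **r_row} for r_row in matches]
        elif how == "left":
            if matches:
                result += [{**l_row, **r_row} for r_row in matches]
            else:
                result.append({**l_row, **{c: None for c in right_cols}})
        elif how == "left_semi":  # keep left rows that have a match, left columns only
            if matches:
                result.append(dict(l_row))
        elif how == "left_anti":  # keep left rows that have no match
            if not matches:
                result.append(dict(l_row))
        else:
            raise ValueError(f"join type not modelled: {how}")
    return result


classes = [{"id": 1, "Class": "Economy Class"},
           {"id": 2, "Class": "Business Class"},
           {"id": 3, "Class": "First Class"}]
prices = [{"id": 1, "Ticket Price": 1000}, {"id": 2, "Ticket Price": 2000}]

for how in ["inner", "left", "left_semi", "left_anti"]:
    print(how, join(classes, prices, "id", how))
```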

<a id='spark-sql'></a>
## Spark SQL

SQL has been around since the 1970s, so one can imagine how many people have made it their bread and butter. As big data grew in popularity, professionals with the technical knowledge to handle it were in short supply. This led to the creation of Spark SQL. To quote the docs:<br>
>Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.

Basically, what you need to know is that Spark SQL is used to execute SQL queries on big data. Spark SQL can also be used to read data from Hive tables and views. 


In [70]:
# Load data
df = spark.read.csv('/content/flights_small.csv', header=True, sep=",")
# Register a temporary view
df.createOrReplaceTempView("flights")
# Select the first 5 rows from the temporary view
spark.sql("select * from flights limit 5").show()
# Count the rows in the view
spark.sql("select count(*) as total_count from flights").show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
+----+-----+---+--------+---------+-----

<a id='rdd'></a>
## RDD

> With `map`, you define a function and apply it record by record, producing one output element per input element. `flatMap` returns a new RDD by first applying a function to every element of the RDD and then flattening the results. `filter` returns a new RDD containing only the elements that satisfy a condition. With `reduce`, elements are combined pairwise by a function until a single result remains; because Spark combines elements within and across partitions in no fixed order, that function should be commutative and associative.
For example, let's say you have a set of numbers. You can reduce it to its sum by providing a function that takes two values as input and returns their sum.

Some of the reasons you would use a dataframe over RDD are:
1.   Its ability to represent data as rows and columns. However, this also means it can only hold structured and semi-structured data.
2.   It can process data in many formats (Avro, CSV, JSON) and from many storage systems (HDFS, Hive tables, MySQL).
3.   Its superior query optimization, provided by Spark's Catalyst optimizer.
4.   The DataFrame API is very easy to use.
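The four RDD operations described in the quote above have direct plain-Python counterparts, which can help build intuition before running them on a cluster. In the sketch below the records are hypothetical; in PySpark the same lambdas would be passed to `rdd.map`, `rdd.flatMap`, `rdd.filter` and `rdd.reduce`.

```python
from functools import reduce
from itertools import chain

records = ["SEA LAX", "PDX HNL", "SEA SFO"]  # hypothetical "origin dest" records

# map: exactly one output element per input element (here, a list per record)
mapped = list(map(lambda rec: rec.split(" "), records))

# flatMap: apply the function, then flatten the per-record lists into one sequence
flat = list(chain.from_iterable(map(lambda rec: rec.split(" "), records)))

# filter: keep only the elements that satisfy the condition
from_sea = list(filter(lambda rec: rec.startswith("SEA"), records))

# reduce: combine elements pairwise; addition is commutative and associative
total = reduce(lambda a, b: a + b, [1, 2, 3, 4])

print(mapped)    # [['SEA', 'LAX'], ['PDX', 'HNL'], ['SEA', 'SFO']]
print(flat)      # ['SEA', 'LAX', 'PDX', 'HNL', 'SEA', 'SFO']
print(from_sea)  # ['SEA LAX', 'SEA SFO']
print(total)     # 10
```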





In [71]:
cars = spark.sparkContext.textFile('/content/flights_small.csv')
print(cars.first())
cars_header = cars.first()
cars_rest = cars.filter(lambda line: line!=cars_header)
print(cars_rest.first())

year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,132,954,6,58


**How many flights (data rows) are there in our CSV data?**

In [73]:
cars_rest.map(lambda line: line.split(",")).count()

10000

**Display the year, month, day, origin and destination of the flights arriving at LAX**

In [80]:
# Split each line once: year, month and day are columns 0-2, origin is column 10, dest is column 11
(cars_rest.map(lambda line: line.split(","))
    .filter(lambda fields: fields[11] == 'LAX')
    .map(lambda fields: (fields[0], fields[1], fields[2], fields[10], fields[11]))
    .collect())

[('2014', '12', '8', 'SEA', 'LAX'),
 ('2014', '11', '8', 'SEA', 'LAX'),
 ('2014', '8', '19', 'SEA', 'LAX'),
 ('2014', '6', '11', 'PDX', 'LAX'),
 ('2014', '4', '25', 'SEA', 'LAX'),
 ('2014', '9', '12', 'SEA', 'LAX'),
 ('2014', '7', '17', 'SEA', 'LAX'),
 ('2014', '4', '25', 'PDX', 'LAX'),
 ('2014', '8', '14', 'PDX', 'LAX'),
 ('2014', '11', '29', 'SEA', 'LAX'),
 ('2014', '1', '20', 'SEA', 'LAX'),
 ('2014', '9', '23', 'SEA', 'LAX'),
 ('2014', '3', '21', 'PDX', 'LAX'),
 ('2014', '10', '3', 'SEA', 'LAX'),
 ('2014', '12', '21', 'SEA', 'LAX'),
 ('2014', '2', '5', 'SEA', 'LAX'),
 ('2014', '9', '2', 'SEA', 'LAX'),
 ('2014', '3', '31', 'PDX', 'LAX'),
 ('2014', '5', '16', 'SEA', 'LAX'),
 ('2014', '8', '18', 'SEA', 'LAX'),
 ('2014', '4', '30', 'PDX', 'LAX'),
 ('2014', '3', '29', 'PDX', 'LAX'),
 ('2014', '7', '7', 'PDX', 'LAX'),
 ('2014', '11', '17', 'SEA', 'LAX'),
 ('2014', '4', '4', 'SEA', 'LAX'),
 ('2014', '8', '30', 'PDX', 'LAX'),
 ('2014', '8', '6', 'SEA', 'LAX'),
 ('2014', '4', '5', 'SEA', 'LA
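
Splitting on `,` appears to work for this file because none of its fields contain commas, but it would break on a quoted field such as `"Seattle, WA"`. A hedged alternative is to parse each line with Python's standard `csv` module; `parse_line` below is a hypothetical helper that could be passed to `cars_rest.map(...)` in place of `line.split(",")`.

```python
import csv


def parse_line(line):
    """Parse a single CSV line, keeping commas that appear inside quoted fields."""
    return next(csv.reader([line]))


print(parse_line('2014,12,8,SEA,LAX'))       # ['2014', '12', '8', 'SEA', 'LAX']
print(parse_line('2014,"Seattle, WA",LAX'))  # ['2014', 'Seattle, WA', 'LAX']
print('2014,"Seattle, WA",LAX'.split(","))   # ['2014', '"Seattle', ' WA"', 'LAX']
```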

**Display the year, month, day, origin and destination of the flights arriving at either LAX or HNL**

In [81]:
# Split each line once: year, month and day are columns 0-2, origin is column 10, dest is column 11
(cars_rest.map(lambda line: line.split(","))
    .filter(lambda fields: fields[11] in ['LAX', 'HNL'])
    .map(lambda fields: (fields[0], fields[1], fields[2], fields[10], fields[11]))
    .collect())

[('2014', '12', '8', 'SEA', 'LAX'),
 ('2014', '1', '22', 'SEA', 'HNL'),
 ('2014', '11', '8', 'SEA', 'LAX'),
 ('2014', '8', '19', 'SEA', 'LAX'),
 ('2014', '6', '11', 'PDX', 'LAX'),
 ('2014', '4', '1', 'PDX', 'HNL'),
 ('2014', '4', '25', 'SEA', 'LAX'),
 ('2014', '9', '12', 'SEA', 'LAX'),
 ('2014', '7', '17', 'SEA', 'LAX'),
 ('2014', '4', '25', 'PDX', 'LAX'),
 ('2014', '9', '2', 'PDX', 'HNL'),
 ('2014', '8', '14', 'PDX', 'LAX'),
 ('2014', '11', '29', 'SEA', 'LAX'),
 ('2014', '1', '20', 'SEA', 'LAX'),
 ('2014', '9', '23', 'SEA', 'LAX'),
 ('2014', '3', '21', 'PDX', 'LAX'),
 ('2014', '10', '3', 'SEA', 'LAX'),
 ('2014', '12', '21', 'SEA', 'LAX'),
 ('2014', '2', '5', 'SEA', 'LAX'),
 ('2014', '9', '2', 'SEA', 'LAX'),
 ('2014', '3', '22', 'PDX', 'HNL'),
 ('2014', '8', '16', 'PDX', 'HNL'),
 ('2014', '4', '4', 'PDX', 'HNL'),
 ('2014', '3', '31', 'PDX', 'LAX'),
 ('2014', '5', '16', 'SEA', 'LAX'),
 ('2014', '8', '18', 'SEA', 'LAX'),
 ('2014', '4', '30', 'PDX', 'LAX'),
 ('2014', '3', '29', 'PDX', 'LA

<a id='user-defined-functions-udf'></a>
## User-Defined Functions (UDF)

<a id='creating-dataframes'></a>
## Creating Dataframes

When getting started with dataframes, the most common question is: *'How do I create a dataframe?'* <br> 
Below, you can see how to create three kinds of dataframes:

### Create a totally empty dataframe

In [57]:
from pyspark.sql.types import StructType
sc = spark.sparkContext
#Create empty df
schema = StructType([])
empty = spark.createDataFrame(sc.emptyRDD(), schema)
empty.show()

++
||
++
++



### Create an empty dataframe with header

In [58]:
from pyspark.sql.types import StructType, StructField, StringType
# Create an empty df with a header: a single nullable string column called "name"
schema_header = StructType([StructField("name", StringType(), True)])
empty_with_header = spark.createDataFrame(sc.emptyRDD(), schema_header)
empty_with_header.show()

+----+
|name|
+----+
+----+



### Create a dataframe with header and data

In [59]:
from pyspark.sql import Row
mylist = [
  {"name":'Alice',"age":13},
  {"name":'Jacob',"age":24},
  {"name":'Betty',"age":135},
]
spark.createDataFrame(Row(**x) for x in mylist).show()

+-----+---+
| name|age|
+-----+---+
|Alice| 13|
|Jacob| 24|
|Betty|135|
+-----+---+



In [60]:
# You can achieve the same using this - note that we are using spark context here, not a spark session
from pyspark.sql import Row
df = sc.parallelize([
        Row(name='Alice', age=13),
        Row(name='Jacob', age=24),
        Row(name='Betty', age=135)]).toDF()
df.show()

+-----+---+
| name|age|
+-----+---+
|Alice| 13|
|Jacob| 24|
|Betty|135|
+-----+---+



<a id='drop-duplicates'></a>
## Drop Duplicates

As mentioned earlier, there are two easy ways to remove duplicates from a dataframe. We have already seen the usage of `distinct()` under the <a href="#get-distinct-rows">Get Distinct Rows</a> section. 
Below, I will explain how to use the `dropDuplicates()` function to achieve the same.

> `drop_duplicates()` is an alias for `dropDuplicates()`

In [61]:
from pyspark.sql import Row
mylist = [
  {"name":'Alice',"age":5,"height":80},
  {"name":'Jacob',"age":24,"height":80},
  {"name":'Alice',"age":5,"height":80}
]
df = spark.createDataFrame(Row(**x) for x in mylist)
df.dropDuplicates().show()

+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|Jacob| 24|    80|
+-----+---+------+



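`dropDuplicates()` can also take an optional parameter called *subset*, which specifies the columns on which the duplicate check is done. When several rows share the same values in those columns, only one of them is kept, and Spark does not guarantee which one.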

In [62]:
df.dropDuplicates(subset=['height']).show()

+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
+-----+---+------+
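The behaviour of `dropDuplicates()` with and without *subset* can be modelled in a few lines of plain Python. `drop_duplicates` below is a hypothetical helper, not the PySpark implementation: it keeps the first row it sees for each key, whereas Spark makes no guarantee about which of the duplicate rows survives.

```python
def drop_duplicates(rows, subset=None):
    """Keep one row per distinct combination of the `subset` columns (all columns if None)."""
    seen = set()
    kept = []
    for row in rows:
        columns = subset if subset is not None else sorted(row)
        key = tuple(row[c] for c in columns)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


people = [
    {"name": "Alice", "age": 5, "height": 80},
    {"name": "Jacob", "age": 24, "height": 80},
    {"name": "Alice", "age": 5, "height": 80},
]
print(drop_duplicates(people))                     # Alice and Jacob
print(drop_duplicates(people, subset=["height"]))  # only the first row with height 80
```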

