### What is PySpark?
Spark is an engine for distributed (cluster) computing. It can run on a cluster of machines or locally across the cores of a single multi-core computer. PySpark is the Python library for using Spark.

Spark is written in Scala and provides APIs for Scala, Java, Python, and R.

PySpark is the Python API for Spark; under the hood it communicates with the JVM-based Spark engine through the Py4J library.

I will showcase this workbook with 30 GB of unzipped data, using an i7 computer with 12 cores and 16 GB of RAM as the compute platform.

In [1]:
!python --version

## Possible inconsistencies when running Spark with Python 3.7?

Python 3.7.4


Now let us test whether our installation was successful, using the code below.

#### Finding Spark with findspark
PySpark isn't on sys.path by default, but that doesn't mean it can't be used as a regular library. 
You can address this by either symlinking pyspark into your site-packages,
or adding pyspark to sys.path at runtime. findspark does the latter.

Run the following only after the code above runs without errors.

In [2]:

#   !pip install findspark   # already installed in usss
import findspark

findspark.init()    # finds the Spark installation (e.g. via SPARK_HOME) and adds pyspark to sys.path
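findspark.init() does the sys.path work for you. To show what "adding pyspark to sys.path at runtime" amounts to, here is a minimal sketch assuming the standard Spark layout ($SPARK_HOME/python for the pyspark package, $SPARK_HOME/python/lib for the bundled py4j zip). `add_spark_to_path` is a hypothetical helper, not findspark's actual code or API.

```python
import glob
import os
import sys


def add_spark_to_path(spark_home):
    """Hypothetical sketch: make pyspark importable from a Spark installation.

    Assumes the usual layout: spark_home/python holds the pyspark package and
    spark_home/python/lib holds a py4j-*-src.zip archive.
    """
    python_dir = os.path.join(spark_home, "python")
    py4j_zips = glob.glob(os.path.join(python_dir, "lib", "py4j-*-src.zip"))
    if not py4j_zips:
        raise FileNotFoundError("no py4j zip found under " + python_dir)
    added = [python_dir, py4j_zips[0]]
    # Prepend so these copies take priority over anything already on sys.path.
    for entry in added:
        if entry not in sys.path:
            sys.path.insert(0, entry)
    os.environ["SPARK_HOME"] = spark_home
    return added
```

After such a call, `import pyspark` resolves against the Spark installation rather than requiring a pip-installed copy in site-packages.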

PySpark Public classes:

#### SparkContext:
Entry point for Spark functionality.

#### RDD:
Resilient Distributed Dataset (RDD), the basic abstraction of creating and managing data sets in Spark.

Broadcast:
A read-only variable that is cached once on each executor and reused across tasks, rather than being sent along with every task.

A task is a command sent from the driver to an executor by serializing your Function object. The executor deserializes the command (this is possible because it has loaded your jar), and executes it on a partition. (stackoverflow.com)

Accumulator:
An “add-only” shared variable: tasks can only add values to it, and only the driver can read the result. It is used to implement counters or sums that are updated in parallel.
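To make "add-only" concrete, here is a plain-Python model of the idea (not Spark's implementation): each task keeps a private partial value it can only add to, and the driver merges the partials. `accumulate` is a hypothetical name. In PySpark itself you would create one with `sc.accumulator(0)`, call `acc.add(1)` inside a task, and read `acc.value` on the driver.

```python
def accumulate(partitions, per_record, zero=0):
    """Toy model of an add-only accumulator (not Spark's Accumulator class).

    Each task (one per partition) keeps a private partial value and can only
    add to it; the driver merges the partials and is the only reader.
    """
    partials = []
    for partition in partitions:          # in Spark these tasks run in parallel
        local = zero
        for record in partition:
            local += per_record(record)   # tasks may only add
        partials.append(local)
    total = zero
    for partial in partials:              # driver-side merge; order does not matter
        total += partial
    return total


# Example: count negative numbers spread over three "partitions".
parts = [[1, -2, 3], [-4, -5], [6]]
print(accumulate(parts, lambda x: 1 if x < 0 else 0))  # 3
```

Because addition is commutative and associative, the merged total is the same no matter in which order the tasks finish.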

#### SparkConf:
Configuration for a Spark application, given as key-value pairs: for example, the master URL (which decides whether the application runs in local or cluster mode) and the application name.

SparkFiles:
Access files shipped with jobs.

StorageLevel:
Finer-grained cache persistence levels.

TaskContext:
Information about the currently running task; available on the workers (experimental).

RDDBarrier:
Wraps an RDD under a barrier stage for barrier execution.

BarrierTaskContext:
A TaskContext that provides extra info and tooling for barrier execution.

BarrierTaskInfo:
Information about a barrier task.

### Important classes of Spark SQL and DataFrames:

pyspark.sql.SQLContext Main entry point for DataFrame and SQL functionality (superseded by pyspark.sql.SparkSession since Spark 2.0).

pyspark.sql.DataFrame A distributed collection of data grouped into named columns.

pyspark.sql.Column A column expression in a DataFrame.
    Ex: df['zip_code'] is a Column; df.select('zip_code').show() prints the first rows of that column, similar to head() in pandas

pyspark.sql.Row A row of data in a DataFrame.
    Ex: Row(name='John', age=19)

pyspark.sql.HiveContext Main entry point for accessing data stored in Apache Hive.

pyspark.sql.GroupedData Aggregation methods, returned by DataFrame.groupBy().

pyspark.sql.DataFrameNaFunctions Methods for handling missing data (null values).

pyspark.sql.DataFrameStatFunctions Methods for statistics functionality.

pyspark.sql.functions List of built-in functions available for DataFrame.

pyspark.sql.types List of data types available.

pyspark.sql.Window For working with window functions.

Read: https://www.dataneb.com/spark-scala-tutorial (why Spark, and how Scala is foundational even though Java, Scala, Python, and R are used interchangeably in Zeppelin notebooks)

Read: https://www.dataneb.com/post/sparkcontext-scala (what is meant by spark-shell?)

In [3]:

from pyspark.sql import SparkSession 

# get the existing SparkSession, or create a new one if none exists
spark = SparkSession.builder.getOrCreate()

# more SparkSession methods are introduced below as the data science steps need them

spark.stop()

In [5]:
# PySpark is a regular Python library, so ordinary Python code (functions, modules
# such as random) can be used directly inside Spark operations.

# An experienced Python developer can therefore become productive with PySpark quickly;
# this is especially true for a senior data scientist

import random, pyspark
num_samples = 10000

def inside(p):
    # p (the RDD element) is ignored; draw a random point in the unit square
    x, y = random.random(), random.random()
    return x*x + y*y < 1    # True if the point lies inside the quarter circle

#sc = pyspark.SparkContext() # for first time
sc = pyspark.SparkContext.getOrCreate()

# keep only the points inside the quarter circle and count them
count = sc.parallelize(range(0, num_samples)).filter(inside).count()

pi = 4 * count / num_samples    # the fraction inside approximates pi/4
print(pi)

sc.stop()

3.114
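The Spark job above just distributes the loop below. Each random point lands inside the quarter circle with probability p = π/4, so 4 × (fraction inside) estimates π. With n = 10,000 samples the standard error is roughly 4·sqrt(p(1 − p)/n) ≈ 0.016, so a result such as 3.114 is plausible. A plain-Python sketch for comparison (`estimate_pi` is a hypothetical name):

```python
import random


def estimate_pi(num_samples, seed=None):
    """Plain-Python version of the Monte Carlo estimate of pi."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()   # a point in the unit square
        if x * x + y * y < 1:               # it falls inside the quarter circle
            inside += 1
    # area(quarter circle) / area(unit square) = pi / 4
    return 4 * inside / num_samples


print(estimate_pi(10000, seed=42))
```

Increasing num_samples by a factor of 100 shrinks the standard error by a factor of 10, which is where spreading the work over many cores pays off.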


You can apply a transformation to the data with a lambda function. In the example below, you return the square of each element of nums. This is a map transformation.

Python's built-in map also returns an iterator, so we can iterate over its elements or convert it to a sequence such as a list or tuple.

Because Spark evaluates transformations lazily, nothing is computed until an action such as collect(), count() or take() is called; collect() runs the computation and returns the results to the driver as a Python list.
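Python's built-in map is lazy in the same sense, which makes the idea easy to see on a single machine: the function does not run until the iterator is consumed, and list() plays the role of collect(). This is an analogy only; Spark's laziness additionally lets it plan and optimize a whole chain of transformations before running them.

```python
calls = []


def square(x):
    calls.append(x)          # record when the function actually runs
    return x * x


lazy = map(square, [1, 2, -3, 4, 10])   # nothing is computed yet
print(calls)                             # []
result = list(lazy)                      # consuming the iterator triggers the work
print(result)                            # [1, 4, 9, 16, 100]
print(calls)                             # [1, 2, -3, 4, 10]
```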

In [6]:
sc=pyspark.SparkContext.getOrCreate()

nums=sc.parallelize([1,2,-3,4,10])
nums.take(1)

type(nums)

# parallelize() creates an RDD (Resilient Distributed Dataset), not a DataFrame or Dataset

pyspark.rdd.RDD

In [7]:
squared = nums.map(lambda x: x*x).collect()
# map() is lazy; collect() is the action that triggers the computation and returns a list
for num in squared:
    print('%i ' % (num))

1 
4 
9 
16 
100 


# SQLContext
A more convenient way to work with structured data is the DataFrame. Since the SparkContext is already set, you can use it to create the DataFrame; you also need to declare an SQLContext.

SQLContext connects the engine with different data sources and is the entry point to Spark SQL functionality.

In [8]:
from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# Let's create a list of tuples. Each tuple contains a person's name and age. Four steps are required:

Step 1) Create the list of tuples with the information

In [9]:
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]

Step 2) Build an RDD

In [10]:
rdd = sc.parallelize(list_p)

Step 3) Convert the tuples into Row objects

In [11]:
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))).collect()

print(ppl)

[Row(age=19, name='John'), Row(age=29, name='Smith'), Row(age=35, name='Adam'), Row(age=50, name='Henry')]


Step 4) Create a DataFrame

In [12]:
# Build the DataFrame directly from the RDD of Row objects (no collect() needed)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

If you want to see the type of each feature, you can use printSchema():

In [13]:
DF_ppl.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



You need to initialize the SQLContext if it has not been initialized yet.

In [14]:
from pyspark import SparkFiles
from pyspark.sql import SQLContext

path = "C:\\users\\usss\\Downloads\\adult-data\\adult-data.csv"

sc.addFile(path)             # distribute the file to every node; SparkFiles.get() returns its local path
sqlContext = SQLContext(sc)

You can read the CSV file with sqlContext.read.csv. Set inferSchema to True to let Spark guess the type of each column automatically. By default, it is set to False.

In [16]:
df = sqlContext.read.csv(SparkFiles.get("adult-data.csv"), header=True, inferSchema= True)

Have a look at the data types:

In [17]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- wrk_hrs_per_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)



In [18]:
# You can see the data with show()

df.show(5, truncate = False)

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------------+--------------+------+
|age|workclass        |fnlwgt|education |education_num|marital_status     |occupation        |relationship  |race  |sex    |capital-gain|capital-loss|wrk_hrs_per_week|native_country|income|
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------------+--------------+------+
|39 | State-gov       |77516 | Bachelors|13           | Never-married     | Adm-clerical     | Not-in-family| White| Male  |2174        |0           |40              | United-States| <=50K|
|50 | Self-emp-not-inc|83311 | Bachelors|13           | Married-civ-spouse| Exec-managerial  | Husband      | White| Male  |0           |0           |13              | United-States| <=50K|
|38 | Private         |215646| HS-grad  |9        

If you do not set inferSchema to True, here is what happens to the types: they are all strings.

In [24]:
df_string = sqlContext.read.csv(SparkFiles.get("adult-data.csv"), header=True, inferSchema=False)
df_string.printSchema()

root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: string (nullable = true)
 |-- capital-loss: string (nullable = true)
 |-- wrk_hrs_per_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
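A rough way to picture what inferSchema does: scan each column's values and pick the narrowest type that every value parses as, which is why enabling it costs an extra pass over the data. The sketch below is a simplified, hypothetical model (`infer_type` is not a Spark function); it only considers integer, double and string, whereas Spark's real inference also recognises types such as timestamps and booleans, and distinguishes int from long.

```python
def infer_type(values):
    """Simplified guess of a column type from its string values."""
    def parses(cast, v):
        try:
            cast(v)
            return True
        except ValueError:
            return False

    # Ignore empty strings (treated like missing values in this sketch).
    non_empty = [v.strip() for v in values if v.strip() != ""]
    if non_empty and all(parses(int, v) for v in non_empty):
        return "integer"
    if non_empty and all(parses(float, v) for v in non_empty):
        return "double"
    return "string"   # anything else, including an all-empty column here


print(infer_type(["39", "50", "38"]))          # integer
print(infer_type(["1.5", "2"]))                # double
print(infer_type([" State-gov", " Private"]))  # string
```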



To convert the continuous variables to the right format, you can recast the columns. Use withColumn to tell Spark which column to transform.

In [25]:
# Import the data type used below (FloatType) from `sql.types`
from pyspark.sql.types import FloatType

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 


In [26]:
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital-gain', 'education_num', 'capital-loss', 'wrk_hrs_per_week']


In [27]:
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())


In [28]:
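from pyspark.ml.feature import StringIndexer   # used by the commented-out example below
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)

# Note: this prints the schema of df (read with inferSchema=True);
# run df_string.printSchema() to see the FloatType columns created above
df.printSchema()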

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- wrk_hrs_per_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)



# Select columns
You can select and show the rows with select and the names of the features. Below, age and fnlwgt are selected.

In [31]:
df.select('age','fnlwgt').show(5)

+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows



# Count by group
If you want to count the number of occurrences by group, you can chain groupBy() and count() together. In the example below, you count the number of rows by education level.



In [32]:
df.groupBy("education").count().sort("count",ascending=True).show()	

+-------------+-----+
|    education|count|
+-------------+-----+
|    Preschool|   51|
|      1st-4th|  168|
|      5th-6th|  333|
|    Doctorate|  413|
|         12th|  433|
|          9th|  514|
|  Prof-school|  576|
|      7th-8th|  646|
|         10th|  933|
|   Assoc-acdm| 1067|
|         11th| 1175|
|    Assoc-voc| 1382|
|      Masters| 1723|
|    Bachelors| 5355|
| Some-college| 7291|
|      HS-grad|10501|
+-------------+-----+



# Describe the data
To get summary statistics of the data, you can use describe(). It computes:

count
mean
standard deviation (stddev)
min
max
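For a single numeric column, the five statistics can be sketched in plain Python as below. This is an illustration, not Spark's code: `describe` here is a hypothetical function. It ignores missing values (None) the way Spark's aggregates skip nulls, and uses the sample standard deviation (n - 1 in the denominator), which is what Spark's stddev aggregate computes.

```python
import math


def describe(values):
    """Plain-Python sketch of count/mean/stddev/min/max for one numeric column."""
    vals = [v for v in values if v is not None]   # skip missing values
    n = len(vals)
    if n == 0:
        return {"count": 0, "mean": None, "stddev": None, "min": None, "max": None}
    mean = sum(vals) / n
    stddev = None
    if n > 1:  # the sample standard deviation needs at least two values
        stddev = math.sqrt(sum((v - mean) ** 2 for v in vals) / (n - 1))
    return {"count": n, "mean": mean, "stddev": stddev,
            "min": min(vals), "max": max(vals)}


print(describe([2, 4, 4, 4, 5, 5, 7, 9]))
```

Skipping missing values is also why a column's count can be lower than the total number of rows.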

In [33]:
df.describe().show()

+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+--------------+------+
|summary|               age|   workclass|            fnlwgt|    education|    education_num|marital_status|       occupation|relationship|               race|    sex|      capital-gain|    capital-loss|  wrk_hrs_per_week|native_country|income|
+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+--------------+------+
|  count|             32561|       32561|             32561|        32561|            32561|         32561|            32561|       32561|              32561|  32561|             32561|           32561|             32561|         32561| 32561|
|   mean| 38.58164675532

In [None]:
# !pip install pandas   # not needed again; pandas is already installed

In [34]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

#sc = SparkContext()
sc = SparkContext.getOrCreate()   # reuse the running context if there is one
sqlContext = SQLContext(sc)


In [35]:
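# Note: a SparkConf only takes effect when a new context is created with it,
# e.g. SparkContext(conf=conf) after sc.stop(). In local mode the executors run
# inside the driver JVM, so spark.driver.memory (set before the JVM starts) is
# the setting that matters; spark.executor.* settings apply on a cluster.
conf = (SparkConf().set("spark.executor.memory", "12g")
        .set("spark.executor.instances", "5"))   # the property behind --num-executors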

In [36]:
from datetime import datetime
import time

start_time = time.time()
now=datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Start Time =", current_time)


path="F:\\nycflights_sample\\airOT198710.csv"


df=sqlContext.read.format("csv").option("header","true").option("inferSchema","true").load(path)

ti=(time.time() - start_time)
print("--- %s Minutes, " % (ti // 60), " %s Seconds" % (ti % 60))   # whole minutes, then remaining seconds

now=datetime.now()
current_time = now.strftime("%H:%M:%S")
print("End Time =", current_time)


Start Time = 09:32:57
--- 0.0 Minutes,   13.679883241653442 Seconds
End Time = 09:33:11
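The start/end timing code is repeated in many cells of this notebook. A minimal sketch of a reusable helper, assuming you are happy to wrap each timed statement in a `with` block; `timed` is a hypothetical name, not a library function. divmod gives whole minutes plus the remaining seconds (note that round(ti/60, 0) rounds rather than truncates, so 43 seconds would be reported as 1 minute plus 43 seconds).

```python
import time
from contextlib import contextmanager
from datetime import datetime


@contextmanager
def timed(label="cell"):
    """Print start/end wall-clock times and the elapsed time of a block."""
    print("Start Time =", datetime.now().strftime("%H:%M:%S"))
    start = time.perf_counter()
    timing = {}
    try:
        yield timing
    finally:
        elapsed = time.perf_counter() - start
        minutes, seconds = divmod(elapsed, 60)   # whole minutes + remainder
        timing["elapsed"] = elapsed
        print("--- %s: %d Minutes, %.2f Seconds" % (label, minutes, seconds))
        print("End Time =", datetime.now().strftime("%H:%M:%S"))
```

Usage in a notebook cell would look like `with timed("count"): d1Count = df.count()`; the end time is taken after the block finishes, so it never repeats the start time.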


In [None]:
# NO NEED TO RUN THIS WHEN WE WANT TO RUN JUST ONE SAMPLE FILE

# Reading from an external USB hard drive or from the internal hard drive takes almost the same time,
# but subsequent work is faster when the data is read from the D: drive

from datetime import datetime
import time

start_time = time.time()
now=datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Start Time =", current_time)


#path="F:\\nycflights\\AirOnTimeCSV\\airOT201212.csv"

path="D:\\nycflights\\AirOnTimeCSV"

dfSample=sqlContext.read.format("csv").option("header","true").option("inferSchema","true").load(path)

ti=(time.time() - start_time)
print("--- %s Minutes, " % (ti // 60), " %s Seconds" % (ti % 60))   # whole minutes, then remaining seconds

now=datetime.now()
current_time = now.strftime("%H:%M:%S")
print("End Time =", current_time)


In [37]:
df.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- FL_DATE: timestamp (nullable = true)
 |-- UNIQUE_CARRIER: string (nullable = true)
 |-- TAIL_NUM: string (nullable = true)
 |-- FL_NUM: integer (nullable = true)
 |-- ORIGIN_AIRPORT_ID: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_STATE_ABR: string (nullable = true)
 |-- DEST_AIRPORT_ID: integer (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_STATE_ABR: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: integer (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- DEP_DELAY_NEW: double (nullable = true)
 |-- DEP_DEL15: double (nullable = true)
 |-- DEP_DELAY_GROUP: integer (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: 

In [38]:
df.show()

+----+-----+------------+-----------+-------------------+--------------+--------+------+-----------------+------+----------------+---------------+----+--------------+------------+--------+---------+-------------+---------+---------------+--------+----------+---------+-------+------------+--------+---------+-------------+---------+---------------+---------+-----------------+--------+----------------+-------------------+--------+-------+--------+--------------+-------------+-------------+---------+--------------+-------------------+----+
|YEAR|MONTH|DAY_OF_MONTH|DAY_OF_WEEK|            FL_DATE|UNIQUE_CARRIER|TAIL_NUM|FL_NUM|ORIGIN_AIRPORT_ID|ORIGIN|ORIGIN_STATE_ABR|DEST_AIRPORT_ID|DEST|DEST_STATE_ABR|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|DEP_DELAY_NEW|DEP_DEL15|DEP_DELAY_GROUP|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|ARR_DELAY_NEW|ARR_DEL15|ARR_DELAY_GROUP|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|FLIGHTS|DISTANCE|DISTANCE_G

In [39]:

#### --------------Start program

start_time = time.time()
now=datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Start Time =", current_time)

#### -------------Main program

d1Count=df.count()
print('Number of records', d1Count)

#### -------------End program
ti=(time.time() - start_time)
now=datetime.now()
current_time = now.strftime("%H:%M:%S")

print("--- %s Minutes, " % (ti // 60), " %s Seconds" % (ti % 60))   # whole minutes, then remaining seconds
print("End Time =", current_time)

Start Time = 09:34:20
Number of records 448620
--- 0.0 Minutes,   1.1349124908447266 Seconds
End Time = 09:34:21


In [40]:

#### --------------Start program

start_time = time.time()
now=datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Start Time =", current_time)

#### -------------Main program

df.describe(['ARR_DELAY']).show()   # show() prints the table and returns None, so nothing is assigned

#### -------------End program
ti=(time.time() - start_time)
now=datetime.now()
current_time = now.strftime("%H:%M:%S")

print("--- %s Minutes, " % (ti // 60), " %s Seconds" % (ti % 60))   # whole minutes, then remaining seconds
print("End Time =", current_time)

Start Time = 09:35:50
+-------+------------------+
|summary|         ARR_DELAY|
+-------+------------------+
|  count|            444780|
|   mean| 6.004721435316336|
| stddev|18.558215850110173|
|    min|           -1295.0|
|    max|             860.0|
+-------+------------------+

--- 0.0 Minutes,   2.7094717025756836 Seconds
End Time = 09:35:53


In [41]:
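df.limit(10).toPandas()   # toPandas() pulls every row to the driver; limit() keeps the result small

## describe() above returned five summary statistics (count, mean, stddev, min, max) in about 2.7 seconds,
## against about 1.1 seconds for count() alone: little extra time for much more information.
## USE NATIVE (BUILT-IN) FUNCTIONS, WHICH ARE OPTIMIZED FOR THESE COMPUTATIONS, WHENEVER YOU CAN.

## (On a side note, one might expect count() to be nearly instantaneous,
## but it still has to read through all of the data.)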

Unnamed: 0,YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,UNIQUE_CARRIER,TAIL_NUM,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN,...,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,_c44
0,1987,10,1,4,1987-10-01,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,
1,1987,10,2,5,1987-10-02,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,
2,1987,10,3,6,1987-10-03,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,
3,1987,10,4,7,1987-10-04,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,
4,1987,10,5,1,1987-10-05,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,
5,1987,10,6,2,1987-10-06,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,
6,1987,10,7,3,1987-10-07,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,
7,1987,10,8,4,1987-10-08,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,
8,1987,10,9,5,1987-10-09,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,
9,1987,10,10,6,1987-10-10,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,


In [42]:
type(df)

pyspark.sql.dataframe.DataFrame

In [None]:
## Let us see how partitions are used. As far as I can tell, a DataFrame does not expose its number
## of partitions directly, so convert it to an RDD (df.rdd) and ask the RDD instead

#### A quick note on how to increase or decrease the number of partitions

In [43]:
dfrdd=df.rdd     
# quick and easy way to convert a df into rdd; takes a second

In [44]:
dfrdd.getNumPartitions()

4

In [45]:
df.rdd.getNumPartitions()

4

In [46]:
dfrdd=dfrdd.repartition(310)


In [47]:
dfrdd.getNumPartitions()

310

In [None]:
## If filtering reduces a large data set to a small one, the small data set can work with fewer
## partitions. To reduce the number of partitions, use 'coalesce(lower_num)', which merges existing
## partitions without a full shuffle (repartition() with a lower number of partitions still does
## a full shuffle)
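The difference can be pictured with a simplified plain-Python model (not Spark's implementation; `coalesce` and `repartition` below are hypothetical functions operating on lists of lists). coalesce only glues whole partitions together, so no partition is split; repartition moves every record to a new partition, round-robin style, which is the full shuffle. Spark's real coalesce also takes into account where the data lives.

```python
def coalesce(partitions, n):
    """Simplified model of coalesce(n): merge whole partitions, never split them.

    Like coalesce without a shuffle, it cannot increase the partition count.
    """
    n = max(1, min(n, len(partitions)))
    groups = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        # neighbouring partitions end up in the same group, unchanged
        groups[i * n // len(partitions)].extend(part)
    return groups


def repartition(partitions, n):
    """Simplified model of repartition(n): every record is redistributed (full shuffle)."""
    records = [r for part in partitions for r in part]
    groups = [[] for _ in range(n)]
    for i, record in enumerate(records):
        groups[i % n].append(record)   # round-robin over the new partitions
    return groups


parts = [[1, 2], [3, 4], [5, 6], [7, 8]]
print(coalesce(parts, 2))     # [[1, 2, 3, 4], [5, 6, 7, 8]]
print(repartition(parts, 2))  # [[1, 3, 5, 7], [2, 4, 6, 8]]
```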

Spark Dataframes
The key data type used in PySpark is the Spark dataframe. This object can be thought of as a table distributed across a cluster and has functionality that is similar to dataframes in R and Pandas. If you want to do distributed computation using PySpark, then you’ll need to perform operations on Spark dataframes, and not other python data types.
It is also possible to use Pandas dataframes when using Spark, by calling toPandas() on a Spark dataframe, which returns a pandas object. However, this function should generally be avoided except when working with small dataframes, because it pulls the entire object into memory on a single node.

### DataFrames are better when working with structured, very large data

In [48]:
# unpersist() drops the RDD's cached blocks (if any) from memory and disk to release resources
dfrdd.unpersist()

MapPartitionsRDD[93] at coalesce at NativeMethodAccessorImpl.java:0

In [50]:
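# How many executors are there, and how many cores does each use?
# _jsc is a private handle to the Java SparkContext; the executor list includes
# the driver, hence the "- 1". In local mode the driver is the only executor, so this is 0.
executor_count = len(sc._jsc.sc().statusTracker().getExecutorInfos()) - 1
# 'spark.executor.cores' is not set in local mode, so the fallback '1' is returned;
# sc.defaultParallelism typically reports the number of local cores in use.
cores_per_executor = int(sc.getConf().get('spark.executor.cores','1'))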

print(executor_count)
print(cores_per_executor)

0
1


## Most common SQL queries involve these verbs:

#### select 
#### groupby
#### having
#### order 
#### applying aggregation functions

#### The most common aggregation functions are count, mean, min, max, standard deviation, median, percentiles at some cut-offs (like p1, p5, p10, p25, p50, p75, p90, p95, p99), and outliers

#### These and more are available in agg functions. 

## In the describe() function used below, percentiles, outliers, and the median are not part of the output (DataFrame.summary(), available since Spark 2.3, also reports approximate 25%, 50% and 75% percentiles).
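Exact percentiles require sorting all of the values, which is expensive on very large data; that is why Spark offers approximate ones, such as `df.approxQuantile("ARR_DELAY", [0.25, 0.5, 0.75], 0.01)`. For reference, a plain-Python sketch of an exact percentile with linear interpolation between ranks (`percentile` is a hypothetical helper; other interpolation conventions exist and give slightly different answers):

```python
def percentile(values, p):
    """Exact p-th percentile (0-100), interpolating linearly between ranks."""
    data = sorted(v for v in values if v is not None)   # skip missing values
    if not data:
        return None
    k = (len(data) - 1) * p / 100.0   # fractional rank
    lo = int(k)
    hi = min(lo + 1, len(data) - 1)
    return data[lo] + (data[hi] - data[lo]) * (k - lo)


print(percentile([1, 2, 3, 4, 5], 50))  # 3.0
print(percentile([1, 2, 3, 4], 50))     # 2.5
```

The median is simply the 50th percentile, so the same helper covers it.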


In [51]:
### --------------- DataFrames are better when working with structured, very large data


start_time = time.time()
now=datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Start Time =", current_time)

#### -------------Main program

dfDescribe=df.describe()


#### -------------End program
ti=(time.time() - start_time)
now=datetime.now()
current_time = now.strftime("%H:%M:%S")

print("--- %s Minutes, " % (ti // 60), " %s Seconds" % (ti % 60))   # whole minutes, then remaining seconds
print("End Time =", current_time)

Start Time = 09:44:23
--- 0.0 Minutes,   43.33158850669861 Seconds
End Time = 09:45:07


#### Different ways of visually inspecting data summaries: (1) showing the describe() output, (2) converting the describe() output to a pandas DataFrame, (3) converting the first rows of the data directly to pandas, and (4) storing the pandas version of the describe() output and printing it

In [52]:
dfDescribe.show(10)   # show() prints the table itself; wrapping it in print() would also print None

+-------+--------------------+------+-----------------+------------------+--------------+--------+-----------------+------------------+------+----------------+------------------+------+--------------+------------------+------------------+-----------------+------------------+-------------------+-------------------+--------+----------+---------+-------+-----------------+------------------+------------------+------------------+-------------------+--------------------+--------------------+-----------------+--------------------+-----------------+-------------------+--------+-------+-----------------+------------------+-------------+-------------+---------+--------------+-------------------+----+
|summary|                YEAR| MONTH|     DAY_OF_MONTH|       DAY_OF_WEEK|UNIQUE_CARRIER|TAIL_NUM|           FL_NUM| ORIGIN_AIRPORT_ID|ORIGIN|ORIGIN_STATE_ABR|   DEST_AIRPORT_ID|  DEST|DEST_STATE_ABR|      CRS_DEP_TIME|          DEP_TIME|        DEP_DELAY|     DEP_DELAY_NEW|          DEP_DEL15|    D

In [53]:
import pandas as pd

dfDescribe.toPandas()

Unnamed: 0,summary,YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,UNIQUE_CARRIER,TAIL_NUM,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN,...,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,_c44
0,count,448620.0,448620.0,448620.0,448620.0,448620,0.0,448620.0,448620.0,448620,...,0.0,448620.0,448620.0,448620.0,0.0,0.0,0.0,0.0,0.0,0.0
1,mean,1987.0,10.0,15.97350541661094,4.047692924969907,,,671.1280616111632,12767.61632784985,,...,,1.0,587.8776336320271,2.8250813606170038,,,,,,
2,stddev,8.233647849459686e-14,0.0,8.932715623219531,1.932847547925682,,,516.8870011546963,1564.1552172922652,,...,,0.0,496.5675071952869,1.9538747410848083,,,,,,
3,min,1987.0,10.0,1.0,1.0,AA,,1.0,10135.0,ABE,...,,1.0,0.0,1.0,,,,,,
4,max,1987.0,10.0,31.0,7.0,WN,,4938.0,16440.0,YUM,...,,1.0,4983.0,11.0,,,,,,


In [54]:
df.limit(5).toPandas()

Unnamed: 0,YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,UNIQUE_CARRIER,TAIL_NUM,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN,...,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,_c44
0,1987,10,1,4,1987-10-01,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,
1,1987,10,2,5,1987-10-02,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,
2,1987,10,3,6,1987-10-03,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,
3,1987,10,4,7,1987-10-04,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,
4,1987,10,5,1,1987-10-05,AA,,1,12478,JFK,...,,1.0,2475.0,10,,,,,,


In [55]:
df.dtypes

[('YEAR', 'int'),
 ('MONTH', 'int'),
 ('DAY_OF_MONTH', 'int'),
 ('DAY_OF_WEEK', 'int'),
 ('FL_DATE', 'timestamp'),
 ('UNIQUE_CARRIER', 'string'),
 ('TAIL_NUM', 'string'),
 ('FL_NUM', 'int'),
 ('ORIGIN_AIRPORT_ID', 'int'),
 ('ORIGIN', 'string'),
 ('ORIGIN_STATE_ABR', 'string'),
 ('DEST_AIRPORT_ID', 'int'),
 ('DEST', 'string'),
 ('DEST_STATE_ABR', 'string'),
 ('CRS_DEP_TIME', 'int'),
 ('DEP_TIME', 'int'),
 ('DEP_DELAY', 'double'),
 ('DEP_DELAY_NEW', 'double'),
 ('DEP_DEL15', 'double'),
 ('DEP_DELAY_GROUP', 'int'),
 ('TAXI_OUT', 'string'),
 ('WHEELS_OFF', 'string'),
 ('WHEELS_ON', 'string'),
 ('TAXI_IN', 'string'),
 ('CRS_ARR_TIME', 'int'),
 ('ARR_TIME', 'int'),
 ('ARR_DELAY', 'double'),
 ('ARR_DELAY_NEW', 'double'),
 ('ARR_DEL15', 'double'),
 ('ARR_DELAY_GROUP', 'int'),
 ('CANCELLED', 'double'),
 ('CANCELLATION_CODE', 'string'),
 ('DIVERTED', 'double'),
 ('CRS_ELAPSED_TIME', 'double'),
 ('ACTUAL_ELAPSED_TIME', 'double'),
 ('AIR_TIME', 'string'),
 ('FLIGHTS', 'double'),
 ('DISTANCE', 'dou

In [56]:
dfDesPD=dfDescribe.toPandas()
dfDesPD


Unnamed: 0,summary,YEAR,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,UNIQUE_CARRIER,TAIL_NUM,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN,...,AIR_TIME,FLIGHTS,DISTANCE,DISTANCE_GROUP,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,_c44
0,count,448620.0,448620.0,448620.0,448620.0,448620,0.0,448620.0,448620.0,448620,...,0.0,448620.0,448620.0,448620.0,0.0,0.0,0.0,0.0,0.0,0.0
1,mean,1987.0,10.0,15.97350541661094,4.047692924969907,,,671.1280616111632,12767.61632784985,,...,,1.0,587.8776336320271,2.8250813606170038,,,,,,
2,stddev,8.233647849459686e-14,0.0,8.932715623219531,1.932847547925682,,,516.8870011546963,1564.1552172922652,,...,,0.0,496.5675071952869,1.9538747410848083,,,,,,
3,min,1987.0,10.0,1.0,1.0,AA,,1.0,10135.0,ABE,...,,1.0,0.0,1.0,,,,,,
4,max,1987.0,10.0,31.0,7.0,WN,,4938.0,16440.0,YUM,...,,1.0,4983.0,11.0,,,,,,


In [57]:
dfDesPD['summary']

0     count
1      mean
2    stddev
3       min
4       max
Name: summary, dtype: object

In [58]:
print('Maximum of DAY_OF_WEEK =',dfDesPD.loc[4,"DAY_OF_WEEK"])
      
print('Maximum of DAY_OF_MONTH =',dfDesPD.loc[4,"DAY_OF_MONTH"])

print('Maximum Carrier Delay in Minutes =',dfDesPD.loc[4,"CARRIER_DELAY"])

Maximum of DAY_OF_WEEK = 7
Maximum of DAY_OF_MONTH = 31
Maximum Carrier Delay in Minutes = None


In [None]:
## A real problem involves more interesting questions, such as:
## which carriers (airlines) are in the top 10 worst delays by month and year?
## Let us create output for this

In [None]:
from pyspark.ml.stat import Summarizer
from pyspark.sql import Row 
from pyspark.ml.linalg import Vectors
start_time = time.time()
now=datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Start Time =", current_time)

#### -------------Main program



#### -------------End program
ti=(time.time() - start_time)
now=datetime.now()
current_time = now.strftime("%H:%M:%S")

print("--- %s Minutes, " % (ti // 60), " %s Seconds" % (ti % 60))   # whole minutes, then remaining seconds
print("End Time =", current_time)



In [None]:
### Using the built-in groupBy and agg(regate) functions to speed up summarizations

# Aggregation methods available on GroupedData (the object returned by groupBy()):
#
#    agg
#    avg  (alias: mean)
#    count
#    max
#    mean
#    min
#    pivot
#    sum
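Conceptually, a grouped aggregation such as the max-delay-per-carrier query below is a single pass that keeps one running value per key; Spark roughly does this within each partition first and then merges the partial results across partitions. A plain-Python sketch, with `group_max` as a hypothetical helper and a few illustrative rows:

```python
def group_max(rows, keys, value):
    """Sketch of groupBy(*keys).agg({value: 'max'}) followed by a descending sort."""
    best = {}
    for row in rows:
        v = row[value]
        if v is None:   # max ignores nulls (an all-null group is dropped here; Spark keeps it as null)
            continue
        k = tuple(row[c] for c in keys)
        if k not in best or v > best[k]:
            best[k] = v   # keep the running maximum for this key
    return sorted(best.items(), key=lambda kv: kv[1], reverse=True)


rows = [  # illustrative rows only
    {"YEAR": 1987, "UNIQUE_CARRIER": "HP", "ARR_DELAY": 860.0},
    {"YEAR": 1987, "UNIQUE_CARRIER": "HP", "ARR_DELAY": 638.0},
    {"YEAR": 1987, "UNIQUE_CARRIER": "CO", "ARR_DELAY": 800.0},
    {"YEAR": 1987, "UNIQUE_CARRIER": "CO", "ARR_DELAY": None},
]
print(group_max(rows, ["YEAR", "UNIQUE_CARRIER"], "ARR_DELAY"))
# [((1987, 'HP'), 860.0), ((1987, 'CO'), 800.0)]
```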


In [59]:
#### Using pyspark dataframe  

start_time = time.time()
now=datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Start Time =", current_time)


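# Import the module rather than mean/min/max directly: the bare names would
# shadow Python's built-in min() and max()
from pyspark.sql import functions as F
#df.select([F.mean('CRS_ELAPSED_TIME'), F.min('CRS_ELAPSED_TIME'), F.max('CRS_ELAPSED_TIME')])


#df.groupBy("YEAR","Month","UNIQUE_CARRIER")
# show() prints the result and returns None, so there is nothing to assign
df.groupBy("YEAR","Month","UNIQUE_CARRIER").agg({'ARR_DELAY':'max'}).sort(['max(ARR_DELAY)'], ascending=False).show(20, False)


#dfgroupedAgg=grouped.agg({'ARR_DELAY':'max'})
#dfgroupedAggPD=dfgroupedAgg.toPandas()
ti=(time.time() - start_time)

print("--- %s Minutes, " % (ti // 60), " %s Seconds" % (ti % 60))

# current_time was not refreshed, so this repeats the start time; call
# datetime.now() again (as in the other cells) to print the real end time
print("End Time =", current_time)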

Start Time = 09:49:08
+----+-----+--------------+--------------+
|YEAR|Month|UNIQUE_CARRIER|max(ARR_DELAY)|
+----+-----+--------------+--------------+
|1987|10   |HP            |860.0         |
|1987|10   |CO            |800.0         |
|1987|10   |PS            |769.0         |
|1987|10   |AA            |726.0         |
|1987|10   |PA (1)        |687.0         |
|1987|10   |EA            |542.0         |
|1987|10   |DL            |412.0         |
|1987|10   |NW            |359.0         |
|1987|10   |UA            |327.0         |
|1987|10   |PI            |277.0         |
|1987|10   |AS            |270.0         |
|1987|10   |TW            |220.0         |
|1987|10   |US            |208.0         |
|1987|10   |WN            |152.0         |
+----+-----+--------------+--------------+

--- 0.0 Minutes,   4.770625114440918 Seconds
End Time = 09:49:08


In [None]:
## select, filter, mutate, summarize, and arrange(sort) 
## dropna and fillna are two additional DataFrame functions for handling missing values (also available as df.na.drop and df.na.fill)
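A plain-Python sketch of what the two missing-value functions do, on rows represented as dicts; `dropna` and `fillna` here are hypothetical helpers mirroring the spirit of Spark's `df.dropna(how='any', subset=...)` and `df.fillna(value, subset=...)`. One simplification: Spark's fillna only fills columns whose type matches the fill value (for example, a number fills numeric columns), which this sketch ignores.

```python
def dropna(rows, subset=None):
    """Keep only rows with no None in the given columns (all columns by default)."""
    return [r for r in rows
            if all(r[c] is not None for c in (subset or r.keys()))]


def fillna(rows, value, subset=None):
    """Replace None with value in the given columns (all columns by default)."""
    out = []
    for r in rows:
        cols = subset or r.keys()
        out.append({c: (value if c in cols and v is None else v) for c, v in r.items()})
    return out


rows = [{"a": 1, "b": None}, {"a": None, "b": 2}, {"a": 3, "b": 4}]
print(dropna(rows))                  # [{'a': 3, 'b': 4}]
print(fillna(rows, 0, subset=["b"])) # [{'a': 1, 'b': 0}, {'a': None, 'b': 2}, {'a': 3, 'b': 4}]
```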

In [60]:
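# Note: despite its name, this collects the *string* columns (some of them, such as
# TAXI_OUT and AIR_TIME, hold numbers but were read as strings)
df_NumColumns=[item[0] for item in df.dtypes if item[1].startswith('string')]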
df_NumColumns

['UNIQUE_CARRIER',
 'TAIL_NUM',
 'ORIGIN',
 'ORIGIN_STATE_ABR',
 'DEST',
 'DEST_STATE_ABR',
 'TAXI_OUT',
 'WHEELS_OFF',
 'WHEELS_ON',
 'TAXI_IN',
 'CANCELLATION_CODE',
 'AIR_TIME',
 'CARRIER_DELAY',
 'WEATHER_DELAY',
 'NAS_DELAY',
 'SECURITY_DELAY',
 'LATE_AIRCRAFT_DELAY',
 '_c44']

In [61]:
df.dtypes

[('YEAR', 'int'),
 ('MONTH', 'int'),
 ('DAY_OF_MONTH', 'int'),
 ('DAY_OF_WEEK', 'int'),
 ('FL_DATE', 'timestamp'),
 ('UNIQUE_CARRIER', 'string'),
 ('TAIL_NUM', 'string'),
 ('FL_NUM', 'int'),
 ('ORIGIN_AIRPORT_ID', 'int'),
 ('ORIGIN', 'string'),
 ('ORIGIN_STATE_ABR', 'string'),
 ('DEST_AIRPORT_ID', 'int'),
 ('DEST', 'string'),
 ('DEST_STATE_ABR', 'string'),
 ('CRS_DEP_TIME', 'int'),
 ('DEP_TIME', 'int'),
 ('DEP_DELAY', 'double'),
 ('DEP_DELAY_NEW', 'double'),
 ('DEP_DEL15', 'double'),
 ('DEP_DELAY_GROUP', 'int'),
 ('TAXI_OUT', 'string'),
 ('WHEELS_OFF', 'string'),
 ('WHEELS_ON', 'string'),
 ('TAXI_IN', 'string'),
 ('CRS_ARR_TIME', 'int'),
 ('ARR_TIME', 'int'),
 ('ARR_DELAY', 'double'),
 ('ARR_DELAY_NEW', 'double'),
 ('ARR_DEL15', 'double'),
 ('ARR_DELAY_GROUP', 'int'),
 ('CANCELLED', 'double'),
 ('CANCELLATION_CODE', 'string'),
 ('DIVERTED', 'double'),
 ('CRS_ELAPSED_TIME', 'double'),
 ('ACTUAL_ELAPSED_TIME', 'double'),
 ('AIR_TIME', 'string'),
 ('FLIGHTS', 'double'),
 ('DISTANCE', 'dou

In [62]:
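# Despite its name, df_CatColumns holds the non-string (numeric and timestamp) columns;
# the set difference also loses the original column order
df_CatColumns=list(set(df.columns)-set(df_NumColumns))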
df_CatColumns

['FLIGHTS',
 'ARR_DEL15',
 'DAY_OF_MONTH',
 'ACTUAL_ELAPSED_TIME',
 'DEP_TIME',
 'DISTANCE_GROUP',
 'DEP_DEL15',
 'CRS_DEP_TIME',
 'DEP_DELAY',
 'ARR_DELAY_GROUP',
 'CRS_ELAPSED_TIME',
 'FL_NUM',
 'ARR_DELAY',
 'ORIGIN_AIRPORT_ID',
 'DEP_DELAY_NEW',
 'CANCELLED',
 'DEST_AIRPORT_ID',
 'CRS_ARR_TIME',
 'DEP_DELAY_GROUP',
 'DAY_OF_WEEK',
 'MONTH',
 'DISTANCE',
 'YEAR',
 'ARR_TIME',
 'FL_DATE',
 'ARR_DELAY_NEW',
 'DIVERTED']
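`set(df.columns) - set(...)` discards the column order, which is why the list above comes out scrambled. Partitioning `df.dtypes` in a single ordered pass avoids that. The sketch below assumes only that `df.dtypes` is a list of `(name, type_string)` pairs, which is what PySpark returns. `split_by_dtype` is a hypothetical helper. The sample list is a shortened copy of the schema printed earlier.

```python
def split_by_dtype(dtypes, kind="string"):
    """Partition (name, type) pairs into (matching, other) name lists, keeping schema order."""
    matching, other = [], []
    for name, dtype in dtypes:
        (matching if dtype.startswith(kind) else other).append(name)
    return matching, other


sample_dtypes = [
    ("YEAR", "int"),
    ("MONTH", "int"),
    ("FL_DATE", "timestamp"),
    ("UNIQUE_CARRIER", "string"),
    ("TAIL_NUM", "string"),
    ("ARR_DELAY", "double"),
    ("AIR_TIME", "string"),
]

string_cols, non_string_cols = split_by_dtype(sample_dtypes)
print(string_cols)       # ['UNIQUE_CARRIER', 'TAIL_NUM', 'AIR_TIME']
print(non_string_cols)   # ['YEAR', 'MONTH', 'FL_DATE', 'ARR_DELAY']

# On the real DataFrame: string_cols, non_string_cols = split_by_dtype(df.dtypes)
```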

In [63]:
cols1=["YEAR","Month","UNIQUE_CARRIER","TAIL_NUM","FL_NUM"]

df.select(*cols1, 'ARR_DELAY','AIR_TIME').sort('ARR_DELAY','AIR_TIME', ascending=False).show()


+----+-----+--------------+--------+------+---------+--------+
|YEAR|Month|UNIQUE_CARRIER|TAIL_NUM|FL_NUM|ARR_DELAY|AIR_TIME|
+----+-----+--------------+--------+------+---------+--------+
|1987|   10|            HP|    null|   398|    860.0|    null|
|1987|   10|            CO|    null|   812|    800.0|    null|
|1987|   10|            PS|    null|  1960|    769.0|    null|
|1987|   10|            AA|    null|  1045|    726.0|    null|
|1987|   10|        PA (1)|    null|   429|    687.0|    null|
|1987|   10|            HP|    null|   398|    638.0|    null|
|1987|   10|            CO|    null|    20|    592.0|    null|
|1987|   10|            EA|    null|   614|    542.0|    null|
|1987|   10|            CO|    null|   963|    540.0|    null|
|1987|   10|            CO|    null|   634|    521.0|    null|
|1987|   10|            CO|    null|     6|    505.0|    null|
|1987|   10|            CO|    null|   712|    499.0|    null|
|1987|   10|            CO|    null|     3|    478.0|  
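`AIR_TIME` is a `string` column in this schema (see `df.dtypes` above), so sorting on it compares text, not numbers. The plain-Python sketch below shows the effect on a few made-up values. In Spark the usual remedy is to cast the column before sorting, for example `df.withColumn('AIR_TIME', df.AIR_TIME.cast('double'))`.

```python
# Hypothetical AIR_TIME values as they might appear in a string column.
air_time_text = ["9", "100", "45", "1000"]

# Descending sort on the strings: characters are compared left to right.
print(sorted(air_time_text, reverse=True))             # ['9', '45', '1000', '100']

# Descending sort after converting to numbers (what a cast achieves in Spark).
print(sorted(air_time_text, key=float, reverse=True))  # ['1000', '100', '45', '9']
```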

In [64]:
df.filter(df.UNIQUE_CARRIER=='AA').show()

+----+-----+------------+-----------+-------------------+--------------+--------+------+-----------------+------+----------------+---------------+----+--------------+------------+--------+---------+-------------+---------+---------------+--------+----------+---------+-------+------------+--------+---------+-------------+---------+---------------+---------+-----------------+--------+----------------+-------------------+--------+-------+--------+--------------+-------------+-------------+---------+--------------+-------------------+----+
|YEAR|MONTH|DAY_OF_MONTH|DAY_OF_WEEK|            FL_DATE|UNIQUE_CARRIER|TAIL_NUM|FL_NUM|ORIGIN_AIRPORT_ID|ORIGIN|ORIGIN_STATE_ABR|DEST_AIRPORT_ID|DEST|DEST_STATE_ABR|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|DEP_DELAY_NEW|DEP_DEL15|DEP_DELAY_GROUP|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|ARR_DELAY_NEW|ARR_DEL15|ARR_DELAY_GROUP|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|FLIGHTS|DISTANCE|DISTANCE_G

### Which one to use in PySpark? Apache Spark APIs – RDD, DataFrame, and Dataset comparison

Before comparing them, let us see what RDDs, DataFrames, and Datasets are in Spark (source: https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/).

#### RDD: works on structured and unstructured data

Spark RDD APIs – RDD stands for Resilient Distributed Dataset, a read-only, partitioned collection of records. The RDD is the fundamental data structure of Spark. It lets a programmer perform in-memory computations on large clusters in a fault-tolerant manner, which speeds up the work. RDDs support object-oriented programming and, in Scala and Java, type safety at compile time.

#### DataFrame: works on structured and semi-structured data

Spark DataFrame APIs – Unlike an RDD, a DataFrame organizes data into named columns, like a table in a relational database. It is an immutable distributed collection of data. It lets developers impose a structure on distributed data, which gives a higher-level abstraction. There is no type safety at compile time.

#### Dataset: works on structured and unstructured data

Spark Dataset APIs – Datasets are an extension of the DataFrame API that provides a type-safe, object-oriented programming interface. A Dataset takes advantage of Spark's Catalyst optimizer by exposing expressions and data fields to a query planner. It is type-safe at compile time. The typed Dataset API exists only in Scala and Java, so in PySpark the choice is between RDDs and DataFrames.

All three can read from the common data sources, such as text files or databases accessed via JDBC. RDDs are the most flexible, since they can also hold data with no predefined structure.

In [65]:
df.withColumn('ARR_DELAY_In_Hours', df.ARR_DELAY/60).show()  ## "Mutate": withColumn returns a new DataFrame; df itself is unchanged

+----+-----+------------+-----------+-------------------+--------------+--------+------+-----------------+------+----------------+---------------+----+--------------+------------+--------+---------+-------------+---------+---------------+--------+----------+---------+-------+------------+--------+---------+-------------+---------+---------------+---------+-----------------+--------+----------------+-------------------+--------+-------+--------+--------------+-------------+-------------+---------+--------------+-------------------+----+--------------------+
|YEAR|MONTH|DAY_OF_MONTH|DAY_OF_WEEK|            FL_DATE|UNIQUE_CARRIER|TAIL_NUM|FL_NUM|ORIGIN_AIRPORT_ID|ORIGIN|ORIGIN_STATE_ABR|DEST_AIRPORT_ID|DEST|DEST_STATE_ABR|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|DEP_DELAY_NEW|DEP_DEL15|DEP_DELAY_GROUP|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|ARR_DELAY_NEW|ARR_DEL15|ARR_DELAY_GROUP|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|FLIGHT

In [66]:
import pandas as pd
pd.set_option('display.max_rows', None)
from pyspark.sql.functions import countDistinct, avg, stddev

avg_cols=['ARR_TIME','ARR_DELAY']
dfg1=df.groupby('YEAR','Month','UNIQUE_CARRIER')
dfg1.min(*avg_cols).show()

dfg1.avg(*avg_cols).show()

dfg1.max(*avg_cols).show()

## Between the DataFrame and the summary functions (min, avg, max) you can
## chain select/filter/withColumn steps to shape the data first



+----+-----+--------------+-------------+--------------+
|YEAR|Month|UNIQUE_CARRIER|min(ARR_TIME)|min(ARR_DELAY)|
+----+-----+--------------+-------------+--------------+
|1987|   10|            CO|            1|        -965.0|
|1987|   10|        PA (1)|            2|         -42.0|
|1987|   10|            NW|            1|        -595.0|
|1987|   10|            TW|            1|       -1295.0|
|1987|   10|            UA|            1|         -45.0|
|1987|   10|            US|            1|         -58.0|
|1987|   10|            PS|            1|         -56.0|
|1987|   10|            AA|            1|        -258.0|
|1987|   10|            EA|            1|         -49.0|
|1987|   10|            PI|            1|         -32.0|
|1987|   10|            DL|            1|         -71.0|
|1987|   10|            WN|            1|         -39.0|
|1987|   10|            HP|            1|         -48.0|
|1987|   10|            AS|            3|        -720.0|
+----+-----+--------------+----
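Spark's `min`, `avg` and `max` aggregates skip null values rather than treating them as zero. A group whose values are all null yields null. The plain-Python sketch below shows what `df.groupby(...).min/avg/max(...)` computes. It assumes rows are dicts and uses `None` for null. `group_stats` is a hypothetical name, and the rows are made up.

```python
from collections import defaultdict


def group_stats(rows, keys, col):
    """Return {group_key: (min, avg, max)} for one column, ignoring None like Spark does."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in keys)].append(row[col])
    stats = {}
    for group_key, values in groups.items():
        present = [v for v in values if v is not None]
        if present:
            stats[group_key] = (min(present), sum(present) / len(present), max(present))
        else:
            stats[group_key] = (None, None, None)  # all-null group -> null aggregates
    return stats


rows = [
    {"YEAR": 1987, "UNIQUE_CARRIER": "AA", "ARR_DELAY": 10.0},
    {"YEAR": 1987, "UNIQUE_CARRIER": "AA", "ARR_DELAY": None},
    {"YEAR": 1987, "UNIQUE_CARRIER": "AA", "ARR_DELAY": -4.0},
    {"YEAR": 1987, "UNIQUE_CARRIER": "CO", "ARR_DELAY": None},
]

print(group_stats(rows, ["YEAR", "UNIQUE_CARRIER"], "ARR_DELAY"))
# {(1987, 'AA'): (-4.0, 3.0, 10.0), (1987, 'CO'): (None, None, None)}
```

Because the null is skipped, the AA average is 3.0 (two values), not 2.0 (three values with a zero).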

In [67]:
df.columns

['YEAR',
 'MONTH',
 'DAY_OF_MONTH',
 'DAY_OF_WEEK',
 'FL_DATE',
 'UNIQUE_CARRIER',
 'TAIL_NUM',
 'FL_NUM',
 'ORIGIN_AIRPORT_ID',
 'ORIGIN',
 'ORIGIN_STATE_ABR',
 'DEST_AIRPORT_ID',
 'DEST',
 'DEST_STATE_ABR',
 'CRS_DEP_TIME',
 'DEP_TIME',
 'DEP_DELAY',
 'DEP_DELAY_NEW',
 'DEP_DEL15',
 'DEP_DELAY_GROUP',
 'TAXI_OUT',
 'WHEELS_OFF',
 'WHEELS_ON',
 'TAXI_IN',
 'CRS_ARR_TIME',
 'ARR_TIME',
 'ARR_DELAY',
 'ARR_DELAY_NEW',
 'ARR_DEL15',
 'ARR_DELAY_GROUP',
 'CANCELLED',
 'CANCELLATION_CODE',
 'DIVERTED',
 'CRS_ELAPSED_TIME',
 'ACTUAL_ELAPSED_TIME',
 'AIR_TIME',
 'FLIGHTS',
 'DISTANCE',
 'DISTANCE_GROUP',
 'CARRIER_DELAY',
 'WEATHER_DELAY',
 'NAS_DELAY',
 'SECURITY_DELAY',
 'LATE_AIRCRAFT_DELAY',
 '_c44']

In [68]:
start_time = time.time()
now=datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Start Time =", current_time)


# Two step approach to groupby and then use agg functions

group_data = df.groupBy("YEAR","Month","UNIQUE_CARRIER")
group_data.agg({'ARR_DELAY': 'avg'}).show()   # show() prints and returns None, so its result is not assigned

group_data.agg({'ARR_DELAY': 'max'}).show()

ti=(time.time() - start_time)
print("--- %s Minutes, " % (ti//60), " %s Seconds" % (ti%60))

print("End Time =", datetime.now().strftime("%H:%M:%S"))

Start Time = 09:59:58
+----+-----+--------------+------------------+
|YEAR|Month|UNIQUE_CARRIER|    avg(ARR_DELAY)|
+----+-----+--------------+------------------+
|1987|   10|            CO|2.0199623352165723|
|1987|   10|        PA (1)| 6.234344230011816|
|1987|   10|            NW| 7.854825235130837|
|1987|   10|            TW|  5.77659170499135|
|1987|   10|            UA|  5.26454604188044|
|1987|   10|            US| 7.916648510691275|
|1987|   10|            PS|17.218156228008446|
|1987|   10|            AA| 2.437455158559334|
|1987|   10|            EA| 5.243765017814235|
|1987|   10|            PI| 5.627097272259393|
|1987|   10|            DL|  8.16234956563322|
|1987|   10|            WN| 3.590649470290189|
|1987|   10|            HP|  9.45006729475101|
|1987|   10|            AS|10.100887372013652|
+----+-----+--------------+------------------+

+----+-----+--------------+--------------+
|YEAR|Month|UNIQUE_CARRIER|max(ARR_DELAY)|
+----+-----+--------------+--------------+
|1
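The dictionary form of `agg` maps each column name to one aggregate. A Python dict keeps only one value per key, so asking for two aggregates of the same column in one dict silently keeps only the last one. To get both the average and the maximum of `ARR_DELAY`, either run two `agg` calls as above, or pass column expressions such as `group_data.agg(F.avg('ARR_DELAY'), F.max('ARR_DELAY'))` after `from pyspark.sql import functions as F`. The snippet below shows the dict collapse in plain Python.

```python
# A dict literal with a repeated key keeps only the last value for that key.
spec = {"ARR_DELAY": "avg", "ARR_DELAY": "max"}
print(spec)       # {'ARR_DELAY': 'max'}
print(len(spec))  # 1

# Different columns can share one dict without any collision.
spec_multi = {"ARR_DELAY": "max", "DEP_DELAY": "avg"}
print(len(spec_multi))  # 2
```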

In [69]:
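#  Goal: within every year, find the top 10 delays of each carrier.
#  This cell takes a first step: the mean ARR_DELAY per YEAR/Month/carrier, collected into pandas.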


## df = spark.createDataFrame([(1, 2, 3)] )
## cols = ["_1", "_2", "_3"]

## df.orderBy(cols, ascending=False)


#### Using pyspark dataframe  

start_time = time.time()
now=datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Start Time =", current_time)


#from pyspark.sql.functions import mean, min, max
#df.select([mean('CRS_ELAPSED_TIME'), min('CRS_ELAPSED_TIME'), max('CRS_ELAPSED_TIME')]).show()

group_data = df.groupBy("YEAR","Month","UNIQUE_CARRIER")

dfgrouped=group_data.agg({'ARR_DELAY':'mean'})

# toPandas() collects the result to the driver; fine here because the grouped result is small
dfgroupedPandas=dfgrouped.toPandas()

ti=(time.time() - start_time)
print("--- %s Minutes, " % (ti//60), " %s Seconds" % (ti%60))

print("End Time =", datetime.now().strftime("%H:%M:%S"))


Start Time = 10:01:05
--- 0.0 Minutes,   4.097306728363037 Seconds
End Time = 10:01:05
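The stated goal, the top 10 delays of each carrier within every year, is a per-group top-N problem. In PySpark it is commonly written with a window function: `w = Window.partitionBy('YEAR', 'UNIQUE_CARRIER').orderBy(F.desc('ARR_DELAY'))`, then `F.row_number().over(w)`, then a filter on that row number. The sketch below shows the same logic in plain Python with `heapq.nlargest` on small illustrative rows. `top_n_per_group` is a hypothetical name. Null delays are skipped here. In Spark's descending order they would sort last, so they would only appear when a group has fewer than n non-null delays.

```python
import heapq
from collections import defaultdict


def top_n_per_group(rows, keys, col, n=10):
    """Return {group_key: [rows with the n largest values of col]}, largest first."""
    groups = defaultdict(list)
    for row in rows:
        if row[col] is not None:                 # skip null delays
            groups[tuple(row[k] for k in keys)].append(row)
    return {
        group_key: heapq.nlargest(n, members, key=lambda r: r[col])
        for group_key, members in groups.items()
    }


rows = [
    {"YEAR": 1987, "UNIQUE_CARRIER": "HP", "FL_NUM": 398, "ARR_DELAY": 860.0},
    {"YEAR": 1987, "UNIQUE_CARRIER": "HP", "FL_NUM": 398, "ARR_DELAY": 638.0},
    {"YEAR": 1987, "UNIQUE_CARRIER": "HP", "FL_NUM": 12, "ARR_DELAY": 15.0},
    {"YEAR": 1987, "UNIQUE_CARRIER": "CO", "FL_NUM": 812, "ARR_DELAY": 800.0},
    {"YEAR": 1987, "UNIQUE_CARRIER": "CO", "FL_NUM": 20, "ARR_DELAY": None},
]

top2 = top_n_per_group(rows, ["YEAR", "UNIQUE_CARRIER"], "ARR_DELAY", n=2)
for group_key, best in top2.items():
    print(group_key, [r["ARR_DELAY"] for r in best])
# (1987, 'HP') [860.0, 638.0]
# (1987, 'CO') [800.0]
```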


In [None]:
start_time = time.time()
now=datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Start Time =", current_time)


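from pyspark.sql import functions as F   # aliased so Spark's min/max don't shadow Python's built-ins
#df.select([F.mean('CRS_ELAPSED_TIME'), F.min('CRS_ELAPSED_TIME'), F.max('CRS_ELAPSED_TIME')]).show()

d1=df.take(10000)            # collect the first 10,000 rows to the driver as a list of Row objects
d1p=sc.parallelize(d1)       # redistribute that local list as an RDD
d1Count=d1p.count()
print(d1Count)

ti=(time.time() - start_time)
print("--- %s Minutes, " % (ti//60), " %s Seconds" % (ti%60))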

now=datetime.now()
print("End Time =", now.strftime("%H:%M:%S"))
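`sc.parallelize(d1)` splits the local list of rows into partitions. `count()` then counts each partition separately and adds up the per-partition totals. The plain-Python sketch below illustrates that idea. It assumes contiguous, near-equal slices, which simplifies how Spark slices a local collection. It also assumes 12 slices, i.e. one per core if Spark runs with one slice per core on the 12-core machine described at the start of this notebook. `slice_into_partitions` and `distributed_count` are hypothetical names.

```python
def slice_into_partitions(data, num_slices):
    """Cut a list into num_slices contiguous, near-equal chunks."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


def distributed_count(partitions):
    """Count each partition independently (as executors would), then sum on the driver."""
    per_partition = [sum(1 for _ in part) for part in partitions]
    return per_partition, sum(per_partition)


records = list(range(10000))          # stands in for the 10,000 rows from df.take(10000)
parts = slice_into_partitions(records, 12)
sizes, total = distributed_count(parts)
print(sizes)   # twelve sizes of 833 or 834
print(total)   # 10000
```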

In [None]:
start_time = time.time()
now=datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Start Time =", current_time)


from pyspark.sql import functions as F   # aliased so Spark's min/max don't shadow Python's built-ins
df.select([F.mean('CRS_ELAPSED_TIME'), F.min('CRS_ELAPSED_TIME'), F.max('CRS_ELAPSED_TIME')]).show()

ti=(time.time() - start_time)
print("--- %s Minutes, " % (ti//60), " %s Seconds" % (ti%60))

print("End Time =", datetime.now().strftime("%H:%M:%S"))

In [None]:
df.dtypes

In [70]:
# pd.set_option only affects pandas display; Spark's show() takes its own row count and truncate flag
df.cube(df.YEAR, df.MONTH, df.FL_NUM, df.UNIQUE_CARRIER).count().show(1000, truncate=False)

+----+-----+------+--------------+------+
|YEAR|MONTH|FL_NUM|UNIQUE_CARRIER|count |
+----+-----+------+--------------+------+
|1987|10   |195   |AA            |62    |
|1987|10   |226   |AA            |57    |
|1987|10   |265   |AA            |92    |
|1987|10   |303   |AA            |62    |
|1987|10   |530   |AA            |61    |
|1987|10   |595   |AA            |30    |
|1987|10   |973   |AA            |93    |
|1987|10   |1004  |AA            |60    |
|1987|10   |1062  |AA            |59    |
|1987|10   |2183  |AA            |89    |
|1987|10   |2404  |AA            |31    |
|1987|10   |174   |US            |60    |
|1987|10   |279   |US            |86    |
|1987|10   |395   |AS            |59    |
|1987|10   |313   |CO            |31    |
|1987|10   |573   |CO            |54    |
|null|null |61    |null          |407   |
|null|null |67    |AA            |62    |
|null|10   |123   |AA            |58    |
|null|null |134   |AA            |61    |
|1987|null |152   |null          |

In [None]:
# The cube function "takes a list of columns and applies aggregate expressions
# to all possible combinations of the grouping columns", so the order of the
# arguments listed inside cube() does not change which subtotals are computed.
# rollup, on the other hand, computes a subset of cube's groupings: the
# "hierarchical subtotals" from left to right, so its argument order matters.
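The plain-Python sketch below shows which groupings each operator produces. It enumerates the grouping sets and counts rows for each one, writing `None` for a rolled-up column; that is where the `null` cells in the outputs come from. For n columns, cube yields all 2^n subsets, while rollup yields only the n + 1 left-to-right prefixes. The helper names are hypothetical and the rows are made up. Genuine nulls in the data would be indistinguishable from rolled-up columns here; Spark provides `grouping()` and `grouping_id()` to tell them apart.

```python
from collections import Counter
from itertools import combinations


def cube_sets(cols):
    """All subsets of the grouping columns (2**n grouping sets)."""
    return [list(c) for r in range(len(cols), -1, -1) for c in combinations(cols, r)]


def rollup_sets(cols):
    """Left-to-right prefixes of the grouping columns (n + 1 grouping sets)."""
    return [cols[:i] for i in range(len(cols), -1, -1)]


def grouped_counts(rows, cols, grouping_sets):
    """Count rows per grouping set; columns not in the set are reported as None."""
    counts = Counter()
    for gs in grouping_sets:
        for row in rows:
            counts[tuple(row[c] if c in gs else None for c in cols)] += 1
    return counts


cols = ["YEAR", "MONTH", "FL_NUM", "UNIQUE_CARRIER"]
print(len(cube_sets(cols)), len(rollup_sets(cols)))   # 16 5

rows = [
    {"YEAR": 1987, "MONTH": 10, "FL_NUM": 195, "UNIQUE_CARRIER": "AA"},
    {"YEAR": 1987, "MONTH": 10, "FL_NUM": 195, "UNIQUE_CARRIER": "AA"},
    {"YEAR": 1987, "MONTH": 10, "FL_NUM": 174, "UNIQUE_CARRIER": "US"},
]
rollup = grouped_counts(rows, cols, rollup_sets(cols))
print(rollup[(1987, 10, 195, "AA")])     # 2
print(rollup[(1987, 10, None, None)])    # 3
print(rollup[(None, None, None, None)])  # 3, the grand total
```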

In [71]:
df.rollup(df.YEAR, df.MONTH, df.FL_NUM, df.UNIQUE_CARRIER).count().show(1000, truncate=False)

+----+-----+------+--------------+------+
|YEAR|MONTH|FL_NUM|UNIQUE_CARRIER|count |
+----+-----+------+--------------+------+
|1987|10   |195   |AA            |62    |
|1987|10   |226   |AA            |57    |
|1987|10   |265   |AA            |92    |
|1987|10   |303   |AA            |62    |
|1987|10   |530   |AA            |61    |
|1987|10   |595   |AA            |30    |
|1987|10   |973   |AA            |93    |
|1987|10   |1004  |AA            |60    |
|1987|10   |1062  |AA            |59    |
|1987|10   |2183  |AA            |89    |
|1987|10   |2404  |AA            |31    |
|1987|10   |174   |US            |60    |
|1987|10   |279   |US            |86    |
|1987|10   |395   |AS            |59    |
|1987|10   |313   |CO            |31    |
|1987|10   |573   |CO            |54    |
|1987|10   |675   |null          |423   |
|1987|10   |803   |null          |466   |
|1987|10   |1093  |null          |138   |
|1987|10   |2144  |null          |62    |
|1987|10   |2273  |null          |

# References

##### https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf 
##### https://changhsinlee.com/pyspark-dataframe-basics/ 
##### https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/


In [None]:
df.show()

In [72]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()   # returns the already-running session if one exists
df.createOrReplaceTempView("flights")

sqlDF = spark.sql("SELECT * FROM flights limit 5")
sqlDF.show()

+----+-----+------------+-----------+-------------------+--------------+--------+------+-----------------+------+----------------+---------------+----+--------------+------------+--------+---------+-------------+---------+---------------+--------+----------+---------+-------+------------+--------+---------+-------------+---------+---------------+---------+-----------------+--------+----------------+-------------------+--------+-------+--------+--------------+-------------+-------------+---------+--------------+-------------------+----+
|YEAR|MONTH|DAY_OF_MONTH|DAY_OF_WEEK|            FL_DATE|UNIQUE_CARRIER|TAIL_NUM|FL_NUM|ORIGIN_AIRPORT_ID|ORIGIN|ORIGIN_STATE_ABR|DEST_AIRPORT_ID|DEST|DEST_STATE_ABR|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|DEP_DELAY_NEW|DEP_DEL15|DEP_DELAY_GROUP|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|ARR_DELAY_NEW|ARR_DEL15|ARR_DELAY_GROUP|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|FLIGHTS|DISTANCE|DISTANCE_G

In [73]:
sqlDF2 = spark.sql("SELECT UNIQUE_CARRIER, count(UNIQUE_CARRIER) FROM flights GROUP BY UNIQUE_CARRIER ")
sqlDF2.show()

+--------------+---------------------+
|UNIQUE_CARRIER|count(UNIQUE_CARRIER)|
+--------------+---------------------+
|            EA|                37048|
|            UA|                52952|
|            PI|                39228|
|            PS|                14405|
|            AA|                56091|
|            NW|                37590|
|            HP|                15026|
|            TW|                23823|
|            DL|                63104|
|            US|                32293|
|            AS|                 7432|
|            CO|                42756|
|        PA (1)|                 5134|
|            WN|                21738|
+--------------+---------------------+
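`count(UNIQUE_CARRIER)` counts only non-null values, whereas `count(*)` counts every row, so the two differ for a `NULL` carrier group. Spark SQL follows standard SQL here. The sketch below uses Python's built-in `sqlite3` as a lightweight stand-in for Spark SQL (it is not Spark itself) on a made-up four-row table to show the difference.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (UNIQUE_CARRIER TEXT, ARR_DELAY REAL)")
conn.executemany(
    "INSERT INTO flights VALUES (?, ?)",
    [("AA", 10.0), ("AA", -3.0), ("CO", 5.0), (None, 7.0)],
)

query = """
    SELECT UNIQUE_CARRIER, count(UNIQUE_CARRIER) AS non_null, count(*) AS all_rows
    FROM flights
    GROUP BY UNIQUE_CARRIER
    ORDER BY UNIQUE_CARRIER
"""
results = conn.execute(query).fetchall()
conn.close()

for row in results:
    print(row)
# (None, 0, 1)
# ('AA', 2, 2)
# ('CO', 1, 1)
```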

