In [1]:
import findspark
findspark.init()   # add the local Spark installation to sys.path
import pyspark
findspark.find()   # show which Spark home is being used

'C:\\SPARK\\spark-2.4.5-bin-hadoop2.7'

# Chapter 1: DATAFRAME DETAILS

## 1: DEFINING A SCHEMA 

1: Import * from the **pyspark.sql.types** library.

2: Define a new schema using the **StructType** method.

3: Define a **StructField** for **name**, **age**, and **city**. Each field should correspond to the correct datatype and not be **nullable**.

In [2]:
# Import the pyspark.sql.types library
from pyspark.sql.types import * 

# Define a new schema using the StructType method
people_schema = StructType([
  # Define a StructField for each field
  StructField('name', StringType(), False),  # 'name' is the column name; False means not nullable
  StructField('age', IntegerType(), False),
  StructField('city', StringType(), False)
])

## 2: USING LAZY PROCESSING

Lazy processing operations will usually return in about the same amount of time regardless of the actual quantity of data. Remember that this is due to Spark not performing any transformations until an action is requested.

For this exercise, we'll define a DataFrame (**aa_dfw_df**) and add a couple of transformations. Note the amount of time required for the transformations to complete when they are defined versus when the data is actually queried. The differences may be small, but they will be noticeable. When working with a full Spark cluster and larger quantities of data, the difference will be more apparent.
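The deferred behaviour described above can be imitated in plain Python with generators. This is only an analogy, not Spark itself: `read_rows` and `lower_airport` are hypothetical names, and the row contents are made up. Defining the pipeline returns at once; the time is spent only when something consumes it.

```python
import time

def read_rows(n):
    """Hypothetical stand-in for a data source; yields made-up rows lazily."""
    for i in range(n):
        yield {"Destination Airport": "DFW", "id": i}

def lower_airport(rows):
    """A 'transformation': describes the work but performs none until consumed."""
    for row in rows:
        yield {"airport": row["Destination Airport"].lower(), "id": row["id"]}

# Defining the pipeline only builds generator objects, so it returns immediately
start = time.perf_counter()
pipeline = lower_airport(read_rows(200_000))
define_seconds = time.perf_counter() - start

# The 'action' (counting) is what actually pulls every row through the pipeline
start = time.perf_counter()
row_count = sum(1 for _ in pipeline)
action_seconds = time.perf_counter() - start

print("Defining took %f s; counting %d rows took %f s"
      % (define_seconds, row_count, action_seconds))
```

In Spark the same split applies: `withColumn` and `drop` play the role of the generator definitions, and `show()` or `count()` plays the role of the counting loop.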

### INSTRUCTIONS

1: Load the Data Frame.

2: Add the transformation for **F.lower()** to the **Destination Airport** column.

3: Drop the **Destination Airport** column from the Data Frame **aa_dfw_df**. Note the time for these operations to complete.

4: Show the Data Frame, noting the time difference for this action to complete.


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
print(spark)

import pyspark.sql.functions as F
# Load the CSV file
aa_dfw_df = spark.read.format('csv').options(Header=True).load('E:/STUDY/DATASETS/AA_DFW_2017_Departures_Short.csv.gz')
aa_dfw_df.show()

# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])

# Show the DataFrame
aa_dfw_df.show()

<pyspark.sql.session.SparkSession object at 0x0000012C1E1E2588>
+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2017|         0005|                HNL|                          537|
|       01/01/2017|         0007|                OGG|                          498|
|       01/01/2017|         0037|                SFO|                          241|
|       01/01/2017|         0043|                DTW|                          134|
|       01/01/2017|         0051|                STL|                           88|
|       01/01/2017|         0060|                MIA|                          149|
|       01/01/2017|         0071|                LAX|                          203|
|       01/01/2017|         0074|                MEM|                           76|
|       01/0

In [28]:
df1 = spark.read.format('csv').options(Header=True).load('E:/STUDY/DATASETS/AA_DFW_2017_Departures_Short.csv.gz')
#df1.show()
df2 = spark.read.format('csv').options(Header=True).load('E:/STUDY/DATASETS/AA_DFW_2016_Departures_Short.csv.gz')
#df2.show()
print(df1)
df3 = df1.union(df2)
#df3 = df3.withColumn("Date(MM-DD-YYYY)", df3.Date (MM/DD/YYYY))

# Save the df3 DataFrame in Parquet format
df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')

DataFrame[Date (MM/DD/YYYY): string, Flight Number: string, Destination Airport: string, Actual elapsed time (Minutes): string]


AnalysisException: 'Attribute name "Date (MM/DD/YYYY)" contains invalid character(s) among " ,;{}()\\n\\t=". Please use alias to rename it.;'
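The error message lists the characters that the Parquet writer rejects in column names. As a minimal sketch, a small helper can replace them before saving. `sanitize_column_name` is a hypothetical name, and using an underscore as the replacement is an arbitrary choice for illustration.

```python
import re

# Characters rejected in Parquet column names, copied from the error message
INVALID_CHARS = " ,;{}()\n\t="

def sanitize_column_name(name):
    """Hypothetical helper: replace each run of invalid characters with '_'."""
    pattern = "[" + re.escape(INVALID_CHARS) + "]+"
    return re.sub(pattern, "_", name).strip("_")

columns = ["Date (MM/DD/YYYY)", "Flight Number",
           "Destination Airport", "Actual elapsed time (Minutes)"]
clean_columns = [sanitize_column_name(c) for c in columns]
print(clean_columns)
```

One way to apply the result to a DataFrame would be `df3.toDF(*clean_columns)`, which renames all columns in a single call.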

## 3: SAVING A DATAFRAME IN PARQUET FORMAT

When working with Spark, you'll often start with CSV, JSON, or other data sources. This provides a lot of flexibility for the types of data to load, but it is not an optimal format for Spark. The **Parquet** format is a columnar data store, allowing Spark to use **predicate pushdown**. This means Spark will only process the data necessary to complete the operations you define, rather than reading the entire dataset. This gives Spark more flexibility in accessing the data and often drastically improves performance on large datasets.

### INSTRUCTIONS
1: View the row count of **df1** and **df2**.

2: Combine **df1** and **df2** in a new DataFrame named **df3** with the union method.

3: Save **df3** to a **parquet** file named **AA_DFW_ALL.parquet**.

4: Read the **AA_DFW_ALL.parquet** file and show the count.

In [3]:
df1 = spark.read.format('csv').options(Header=True).load('E:/STUDY/DATASETS/AA_DFW_2017_new_Departures_Short.csv')

df2 = spark.read.format('csv').options(Header=True).load('E:/STUDY/DATASETS/AA_DFW_2016_new_Departures_Short.csv')
#print(df1)

print("df1 Count: %d" % df1.count())
print("df2 Count: %d" % df2.count())

# Combine the DataFrames into one
df3 = df1.union(df2)

df3 = df3.withColumnRenamed("Flight Number", "FlightNumber")

df3 = df3.withColumnRenamed("Destination Airport", "Destination-Airport")

df3 = df3.withColumnRenamed("Actual elapsed time (Minutes)", "ActualelapsedtimeMinutes")

# Save the df3 DataFrame in Parquet format
df3.write.parquet('E:/STUDY/DATASETS/AA_DFW_ALL.parquet', mode='overwrite')
print(df3)


# Read the Parquet file into a new DataFrame and run a count
print(spark.read.parquet('E:/STUDY/DATASETS/AA_DFW_ALL.parquet').count())

df1 Count: 139358
df2 Count: 140604
DataFrame[Date-MM/DD/YYYY: string, FlightNumber: string, Destination-Airport: string, ActualelapsedtimeMinutes: string]
279962


## 4: SQL AND PARQUET

1: Import the **AA_DFW_ALL.parquet** file into **flights_df**.

2: Use the **createOrReplaceTempView** method to alias the **flights** table.

3: Run a Spark SQL query against the **flights** table.

In [20]:
# Read the Parquet file into flights_df
flights_df = spark.read.parquet('E:/STUDY/DATASETS/AA_DFW_ALL.parquet')

# Register the temp table
flights_df.createOrReplaceTempView('flights')

# Run a SQL query of the average flight duration
# collect() returns a list of Row objects; [0] takes the first Row and the second [0] its only column
avg_duration = spark.sql('SELECT avg(ActualelapsedtimeMinutes) from flights').collect()[0][0]
print('The average flight time is: %d' % avg_duration)

The average flight time is: 151
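The indexing after `collect()` is easier to follow with plain tuples. `collect()` returns a Python list of `Row` objects, and `Row` is a subclass of `tuple`, so indexing once gives the first row and indexing twice gives the value inside it. The sketch below uses an ordinary tuple as a stand-in, and the value 151.5 is made up for illustration.

```python
# Stand-in for the list collect() would return: one row with one column.
# The value 151.5 is made up for illustration.
collected = [(151.5,)]

first_row = collected[0]   # like .collect()[0]: still a one-element tuple
scalar = first_row[0]      # like .collect()[0][0]: the bare number

# The % operator treats a tuple as its argument list, so both forms
# produce the same text; %d truncates the float to an integer
from_row = 'The average flight time is: %d' % first_row
from_scalar = 'The average flight time is: %d' % scalar
print(from_row)
print(from_scalar)
```

This is why the print statement works whether the row or the bare number is passed to `%d`.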


# Chapter 2: MANIPULATING DATAFRAMES IN THE REAL WORLD

## 1: FILTERING COLUMN CONTENT WITH PYTHON
You've looked at using various operations on DataFrame columns - now you can modify a real dataset. The DataFrame **voter_df** contains information regarding the voters on the Dallas City Council from the past few years. This truncated DataFrame contains the date of the vote being cast and the name and position of the voter. Your manager has asked you to clean this data so it can later be integrated into some desired reports. The primary task is to remove any null entries or odd characters and return a specific set of voters where you can validate their information.

This is often one of the first steps in data cleaning - removing anything that is obviously outside the format. For this dataset, make sure to look at the original data and see what looks out of place for the **VOTER_NAME** column.

### INSTRUCTIONS
1: Show the distinct **VOTER_NAME** entries.

2: Filter **voter_df** where the **VOTER_NAME** is 1-20 characters in length.

3: Filter out **voter_df** where the **VOTER_NAME** contains an **_**.

4: Show the distinct **VOTER_NAME** entries again.

In [4]:
voter_df = spark.read.format('csv').options(Header=True).load('E:/STUDY/DATASETS/DallasCouncilVoters.csv')
import pyspark.sql.functions as F

# Show the distinct VOTER_NAME entries
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

# Filter voter_df where the VOTER_NAME is 1-20 characters in length
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

(First table truncated in export: a single, very wide VOTER_NAME column, printed with truncate=False before the filters were applied.)

+-------------------+
|VOTER_NAME         |
+-------------------+
|Tennell Atkins     |
|Scott Griggs       |
|Scott  Griggs      |
|Sandy Greyson      |
|Michael S. Rawlings|
|Kevin Felder       |
|Adam Medrano       |
|Casey  Thomas      |
|Mark  Clayton      |
|Casey Thomas       |
|Sandy  Greyson     |
|Mark Clayton       |
|Jennifer S.  Gates |
|Tiffinni A. Young  |
|B. Adam  McGough   |
|Omar Narvaez       |
|Philip T. Kingston |
|Rickey D. Callahan |
|Dwaine R. Caraway  |
|Philip T.  Kingston|
|Jennifer S. Gates  |
|Lee M. Kleinman    |
|Monica R. Alonzo   |
|Rickey D.  Callahan|
|Carolyn King Arnold|
|Erik Wilson        |
|Lee Kleinman       |
+-------------------+



In [24]:
voter_df.select('VOTER_NAME').show(100)

+-------------------+
|         VOTER_NAME|
+-------------------+
|  Jennifer S. Gates|
| Philip T. Kingston|
|Michael S. Rawlings|
|       Adam Medrano|
|       Casey Thomas|
|Carolyn King Arnold|
|       Scott Griggs|
|   B. Adam  McGough|
|       Lee Kleinman|
|      Sandy Greyson|
|  Jennifer S. Gates|
| Philip T. Kingston|
|Michael S. Rawlings|
|       Adam Medrano|
|       Casey Thomas|
|Carolyn King Arnold|
| Rickey D. Callahan|
|  Jennifer S. Gates|
|     Sandy  Greyson|
| Jennifer S.  Gates|
|Philip T.  Kingston|
|Michael S. Rawlings|
|       Adam Medrano|
|       Casey Thomas|
|  Dwaine R. Caraway|
|Rickey D.  Callahan|
|       Omar Narvaez|
|       Kevin Felder|
|     Tennell Atkins|
|      Mark  Clayton|
|       Scott Griggs|
|   B. Adam  McGough|
|      Scott  Griggs|
|   B. Adam  McGough|
|    Lee M. Kleinman|
|     Sandy  Greyson|
| Jennifer S.  Gates|
|Philip T.  Kingston|
|Michael S. Rawlings|
|       Adam Medrano|
|       Casey Thomas|
|  Dwaine R. Caraway|
|Rickey D.

## 2: MODIFYING DATAFRAME COLUMNS
Previously, you filtered out any rows that didn't conform to something generally resembling a name. Now based on your earlier work, your manager has asked you to create two new columns - first_name and last_name. She asks you to split the VOTER_NAME column into words on any space character. You'll treat the last word as the last_name, and all other words as the first_name. You'll be using some new functions in this exercise including **.split(), .size(), and .getItem()**.
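The same split, first item, last item logic can be tried on a single string in plain Python before applying it to a column. This is a sketch only: `split_name` is a hypothetical helper, and it uses Python's `re.split` rather than Spark's `F.split`.

```python
import re

def split_name(voter_name):
    """Hypothetical helper mirroring the split / first item / last item steps."""
    splits = re.split(r'\s+', voter_name)     # split on runs of whitespace
    first_name = splits[0]                    # like splits.getItem(0)
    last_name = splits[len(splits) - 1]       # like getItem(F.size('splits') - 1)
    return first_name, last_name

print(split_name('B. Adam  McGough'))
print(split_name('Carolyn King Arnold'))
```

Splitting on `\s+` rather than a single space is what lets names with a double space, such as `B. Adam  McGough`, produce clean parts with no empty entries.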

### INSTRUCTIONS
1: Add a new column called **splits** holding the list of possible names.

2: Use the **getItem()** method and create a new column called **first_name**.

3: Get the last entry of the **splits** list and create a column called **last_name**.

4: Drop the **splits** column and show the new **voter_df**.

In [12]:
# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))
voter_df.show()
# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

# Drop the splits column
voter_df = voter_df.drop('splits')

# Show the voter_df DataFrame
voter_df.show()


+----------+-------------+-------------------+--------------------+
|      DATE|        TITLE|         VOTER_NAME|              splits|
+----------+-------------+-------------------+--------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|
|02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|
|02/08/2017|Councilmember|       Casey Thomas|     [Casey, Thomas]|
|02/08/2017|Councilmember|Carolyn King Arnold|[Carolyn, King, A...|
|02/08/2017|Councilmember|       Scott Griggs|     [Scott, Griggs]|
|02/08/2017|Councilmember|   B. Adam  McGough| [B., Adam, McGough]|
|02/08/2017|Councilmember|       Lee Kleinman|     [Lee, Kleinman]|
|02/08/2017|Councilmember|      Sandy Greyson|    [Sandy, Greyson]|
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|
|02/08/2017|Councilmember| Philip T. Kingston|[P

In [51]:
# Plain-Python check: the last element of a list sits at index size - 1
l = ['malik', 'sharjeel', 'umair']
print(len(l))

import numpy as np
print(np.size(l) - 1)

3
2



## 3: when() EXAMPLE
1: Add a column to **voter_df** named **random_val** with the results of the **F.rand()** method for any voter with the title **Councilmember**.

2: Show some of the DataFrame rows, noting whether the **.when()** clause worked.

In [11]:
import pyspark.sql.functions as F

# Add a column to voter_df for any voter with the title Councilmember
voter_df = voter_df.withColumn('random_val', F.when(voter_df.TITLE == 'Councilmember', F.rand()))

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show()

+----------+-------------+-------------------+----------+---------+--------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|          random_val|
+----------+-------------+-------------------+----------+---------+--------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates| 0.24187069712384035|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston| 0.09506244915212914|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                null|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|  0.4122105474958815|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|  0.7423475735081858|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold|  0.8590369223603771|
|02/08/2017|Councilmember|       Scott Griggs|     Scott|   Griggs|  0.2962483998172717|
|02/08/2017|Councilmember|   B. Adam  McGough|        B.|  McGough|   0.465153012488559|
|02/08/2017|Councilme

## 4: WHEN / OTHERWISE
1: Add a column to **voter_df** named **random_val** with the results of the **F.rand()** method for any voter with the title **Councilmember**. Set **random_val** to 2 for the **Mayor**. Set any other title to the value 0.

2: Show some of the Data Frame rows, noting whether the clauses worked.

3: Use the **.filter** clause to find 0 in **random_val**.


In [13]:
# Add a column to voter_df for a voter based on their position
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))

# Show some of the DataFrame rows
voter_df.show()

# Use the .filter() clause with random_val
voter_df.filter(voter_df.random_val == 0).show()

+----------+-------------+-------------------+----------+---------+--------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|          random_val|
+----------+-------------+-------------------+----------+---------+--------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|0.006641414161357773|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston|  0.4967686417936066|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                 2.0|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|  0.5358170203472812|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|  0.9733400220184432|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold|  0.2293476812801355|
|02/08/2017|Councilmember|       Scott Griggs|     Scott|   Griggs| 0.09690615941022906|
|02/08/2017|Councilmember|   B. Adam  McGough|        B.|  McGough| 0.18239797441544126|
|02/08/2017|Councilme

## 5: USER DEFINED FUNCTIONS IN SPARK

1: Edit the **getFirstAndMiddle()** function to return a space separated string of names, except the last entry in the names list.

2: Define the function as a user-defined function. It should return a string type.

3: Create a new column on **voter_df** called **first_and_middle_name** using your UDF.

4: Drop the "first_name" and "splits" columns (on separate lines), then show the Data Frame.

In [14]:
# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))
voter_df.show()
# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

# Drop the splits column
voter_df = voter_df.drop('splits')

+----------+-------------+-------------------+----------+---------+--------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|              splits|
+----------+-------------+-------------------+----------+---------+--------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|[Jennifer, S., Ga...|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston|[Philip, T., King...|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|[Michael, S., Raw...|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|     [Adam, Medrano]|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|     [Casey, Thomas]|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold|[Carolyn, King, A...|
|02/08/2017|Councilmember|       Scott Griggs|     Scott|   Griggs|     [Scott, Griggs]|
|02/08/2017|Councilmember|   B. Adam  McGough|        B.|  McGough| [B., Adam, McGough]|
|02/08/2017|Councilme

In [19]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

# The previous cell dropped the splits column, and referencing voter_df.splits
# without it raises AttributeError, so re-create the column first
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

def getFirstAndMiddle(names):
  # Return a space separated string of names, excluding the last entry
  return ' '.join(names[:-1])

# Define the method as a UDF that returns a string
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))

# Drop the unnecessary columns then show the DataFrame
voter_df = voter_df.drop('first_name')
voter_df = voter_df.drop('splits')
voter_df.show()

## ADDING AN ID FIELD
When working with data, you sometimes only want to access certain fields and perform various operations. In this case, find all the **unique** voter names from the DataFrame and add a unique ID number. Remember that Spark IDs are assigned based on the DataFrame **partition** - as such the ID values may be much greater than the actual number of rows in the DataFrame.

### INSTRUCTIONS
1: Select the unique entries from the column **VOTER NAME** and create a new DataFrame called **voter_df**.

2: Count the rows in the **voter_df** DataFrame.

3: Add a ROW_ID column using the appropriate Spark function.

4: Show the rows with the 10 highest ROW_IDs.

In [5]:
df = spark.read.format('csv').options(Header=True).load('E:/STUDY/DATASETS/DallasCouncilVotes.csv.gz')

# Select all the unique council voters
voter_df = df.select(df["VOTER NAME"]).distinct() #use distinct for unique entries

# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())

# Add a ROW_ID
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the rows with 10 highest IDs in the set
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)


There are 36 rows in the voter_df DataFrame.

+--------------------+-------------+
|          VOTER NAME|       ROW_ID|
+--------------------+-------------+
|        Lee Kleinman|1709396983808|
|  the  final  201...|1700807049217|
|         Erik Wilson|1700807049216|
|  the  final   20...|1683627180032|
| Carolyn King Arnold|1632087572480|
| Rickey D.  Callahan|1597727834112|
|   the   final  2...|1443109011456|
|    Monica R. Alonzo|1382979469312|
|     Lee M. Kleinman|1228360646656|
|   Jennifer S. Gates|1194000908288|
+--------------------+-------------+
only showing top 10 rows
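The large ID values in the table above follow from how Spark builds them: the documentation for `monotonically_increasing_id` states that the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. A minimal plain-Python sketch of that layout, with `decode_row_id` as a hypothetical helper name, decodes a few IDs copied from the output:

```python
RECORD_BITS = 33  # lower 33 bits hold the record number within a partition

def decode_row_id(row_id):
    """Split an ID into (partition index, record number within that partition)."""
    partition = row_id >> RECORD_BITS
    record = row_id & ((1 << RECORD_BITS) - 1)
    return partition, record

# IDs copied from the table above
for row_id in (1709396983808, 1700807049217, 1700807049216):
    print(row_id, decode_row_id(row_id))
```

Under this layout the highest ID belongs to the first record of partition 199, which is why 36 rows can carry IDs in the trillions.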



## IDs WITH DIFFERENT PARTITIONS
You've just completed adding an ID field to a DataFrame. Now, take a look at what happens when you do the same thing on DataFrames containing a different number of partitions.

To check the number of partitions, use the method **.rdd.getNumPartitions()** on a DataFrame.

### INSTRUCTIONS
1: Print the number of partitions on each DataFrame.

2: Add a **ROW_ID** field to each DataFrame.

3: Show the top 10 IDs in each DataFrame.

In [24]:
# Print the number of partitions in each DataFrame
print("\nThere are %d partitions in the voter_df DataFrame.\n" % voter_df.rdd.getNumPartitions())

# Add a ROW_ID field to each DataFrame
# NOTE: voter_df_single is derived from voter_df here, so both DataFrames have the
# same partitioning and receive identical IDs; a single-partition copy
# (for example voter_df.coalesce(1)) would be needed to see different IDs
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())
voter_df_single = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())
print("\nThere are %d partitions in the voter_df_single DataFrame.\n" % voter_df_single.rdd.getNumPartitions())

# Show the top 10 IDs in each DataFrame 
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
voter_df_single.orderBy(voter_df_single.ROW_ID.desc()).show(10)


There are 200 partitions in the voter_df DataFrame.


There are 200 partitions in the voter_df_single DataFrame.

+--------------------+-------------+
|          VOTER NAME|       ROW_ID|
+--------------------+-------------+
|        Lee Kleinman|1709396983808|
|  the  final  201...|1700807049217|
|         Erik Wilson|1700807049216|
|  the  final   20...|1683627180032|
| Carolyn King Arnold|1632087572480|
| Rickey D.  Callahan|1597727834112|
|   the   final  2...|1443109011456|
|    Monica R. Alonzo|1382979469312|
|     Lee M. Kleinman|1228360646656|
|   Jennifer S. Gates|1194000908288|
+--------------------+-------------+
only showing top 10 rows

+--------------------+-------------+
|          VOTER NAME|       ROW_ID|
+--------------------+-------------+
|        Lee Kleinman|1709396983808|
|  the  final  201...|1700807049217|
|         Erik Wilson|1700807049216|
|  the  final   20...|1683627180032|
| Carolyn King Arnold|1632087572480|
| Rickey D.  Callahan|1597727834112|
|   the 

## MORE ID TRICKS 
1: Determine the highest **ROW_ID** in **voter_df_march** and save it in the variable **previous_max_ID**. The statement **.rdd.max()[0]** will get the maximum ID.

2: Add a **ROW_ID** column to **voter_df_april** starting at the value of **previous_max_ID**.

3: Show the **ROW_ID** values from both DataFrames and compare.

In [26]:
# Determine the highest ROW_ID and save it in previous_max_ID
# (voter_df stands in for voter_df_march in this notebook)
previous_max_ID = voter_df.select('ROW_ID').rdd.max()[0]

# Add a ROW_ID column to voter_df_april starting at the desired value
# (voter_df_april is also built from voter_df here)
voter_df_april = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id() + previous_max_ID)

# Show the ROW_ID from both DataFrames and compare
voter_df.select('ROW_ID').show()
voter_df_april.select('ROW_ID').show()

+-------------+
|       ROW_ID|
+-------------+
|   8589934592|
|  25769803776|
|  34359738368|
|  42949672960|
|  51539607552|
| 103079215104|
| 111669149696|
| 231928233984|
| 240518168576|
| 360777252864|
| 395136991232|
| 601295421440|
| 635655159808|
| 670014898176|
| 807453851648|
| 850403524608|
| 944892805120|
| 962072674304|
|1005022347264|
|1047972020224|
+-------------+
only showing top 20 rows

+-------------+
|       ROW_ID|
+-------------+
|1717986918400|
|1735166787584|
|1743756722176|
|1752346656768|
|1760936591360|
|1812476198912|
|1821066133504|
|1941325217792|
|1949915152384|
|2070174236672|
|2104533975040|
|2310692405248|
|2345052143616|
|2379411881984|
|2516850835456|
|2559800508416|
|2654289788928|
|2671469658112|
|2714419331072|
|2757369004032|
+-------------+
only showing top 20 rows
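The relationship between the two tables can be checked with plain arithmetic. The sketch below copies the first three IDs from each table and the highest ID from the earlier output; the variable names are chosen for illustration.

```python
# Highest ROW_ID seen in the earlier top-10 output
previous_max_id = 1709396983808

# First three IDs from the first table above
march_ids = [8589934592, 25769803776, 34359738368]

# Adding the offset should reproduce the first three IDs of the second table
april_ids = [row_id + previous_max_id for row_id in march_ids]
print(april_ids)
```

One caveat: if any row had received ID 0, its shifted ID would equal `previous_max_id` itself. Adding `previous_max_id + 1` instead would avoid that overlap.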



# Chapter 3: IMPROVING PERFORMANCE

## 1: CACHING A DATAFRAME
You've been assigned a task that requires running several analysis operations on a DataFrame. You've learned that caching can improve performance when reusing DataFrames and would like to implement it.

You'll be working with a new dataset consisting of airline departure information. It may have repetitive data and will need to be de-duplicated.

### INSTRUCTIONS
1: Cache the unique rows in the **departures_df** DataFrame.

2: Perform a count query on **departures_df**, noting how long the operation takes.

3: Count the rows again, noting the variance in time of a cached DataFrame.

In [17]:
departures_df = spark.read.csv('E:/STUDY/DATASETS/AA_DFW_2017_new_Departures_Short.csv', header=True)

import time
start_time = time.time()

# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))


Counting 139358 rows took 6.884262 seconds
Counting 139358 rows again took 2.179404 seconds


Consider why the first run takes longer even though you've told it to **cache()** the DataFrame. Remember that even though you've applied the caching transformation, it doesn't take effect until an action is run. The action instantiates the caching after the **distinct()** function completes. The second time, nothing needs to be recalculated, so the count returns much faster (about 2.2 seconds versus 6.9 seconds in the run above).

## 2: REMOVING A DATAFRAME FROM CACHE
You've finished the analysis tasks with the departures_df DataFrame, but have some other processing to do. You'd like to remove the DataFrame from the cache to prevent any excess memory usage on your cluster.

### INSTRUCTIONS
1: Check the caching status on the **departures_df** DataFrame.

2: Remove the **departures_df** DataFrame from the cache.

3: Validate the caching status again.

In [18]:
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)

Is departures_df cached?: True
Removing departures_df from cache
Is departures_df cached?: False


## 3: FILE IMPORT PERFORMANCE (### Discuss)
You've been given a large set of data to import into a Spark DataFrame. You'd like to test the difference in import speed by splitting up the file.

You have two types of files available: **departures_full.txt.gz** and **departures_xxx.txt.gz** where xxx is 000 - 013. The same number of rows is split between each file.

In [20]:
full_df = spark.read.csv('E:/STUDY/DATASETS/AA_DFW_2017_new_Departures_Short.csv')
full_df.show()

#split_df = spark.read.csv('departures_013.txt.gz')

+---------------+-------------+-------------------+--------------------+
|            _c0|          _c1|                _c2|                 _c3|
+---------------+-------------+-------------------+--------------------+
|Date-MM/DD/YYYY|Flight Number|Destination Airport|Actual elapsed ti...|
|       01-01-17|            5|                HNL|                 537|
|       01-01-17|            7|                OGG|                 498|
|       01-01-17|           37|                SFO|                 241|
|       01-01-17|           43|                DTW|                 134|
|       01-01-17|           51|                STL|                  88|
|       01-01-17|           60|                MIA|                 149|
|       01-01-17|           71|                LAX|                 203|
|       01-01-17|           74|                MEM|                  76|
|       01-01-17|           81|                DEN|                 123|
|       01-01-17|           89|                SLC|

In [29]:
df_csv = spark.read.csv('E:/STUDY/DATASETS/AA_DFW_2017_new_Departures_Short.csv')
df_csv.write.parquet('E:/STUDY/DATASETS/data.parquet')
dfp = spark.read.parquet('E:/STUDY/DATASETS/data.parquet')


In [32]:
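# The departures_full / departures_xxx files are not used here; instead this cell
# compares the CSV DataFrame (df_csv) with its Parquet copy (dfp), so the
# "full" and "split" labels below refer to CSV and Parquet respectively
#full_df = spark.read.csv('departures_full.txt.gz')
#split_df = spark.read.csv('departures_013.txt.gz')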

# Print the count and run time for each DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % df_csv.count() )
print("Time to run: %f" % (time.time() - start_time_a))

start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % dfp.count())
print("Time to run: %f" % (time.time() - start_time_b))

Total rows in full DataFrame:	139359
Time to run: 0.534921
Total rows in split DataFrame:	139359
Time to run: 0.624882


## 4: READING SPARK CONFIGURATIONS
1: Check the name of the Spark application instance **('spark.app.name')**.

2: Determine the TCP port the driver runs on **('spark.driver.port')**.

3: Determine how many partitions are configured for joins.

4: Show the results.

In [33]:
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')

# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

Name: pyspark-shell
Driver TCP port: 50496
Number of partitions: 200


### NOTE: 
Using the **spark.conf** object allows you to validate the settings of a cluster without having configured it initially. This can help you decide which settings to tune for your needs.

In [38]:
# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame using the departures data file
departures_df = spark.read.csv('E:/STUDY/DATASETS/AA_DFW_2017_new_Departures_Short.csv').distinct()

after = departures_df.rdd.getNumPartitions()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % after)

Partition count before change: 500
Partition count after change: 500


### NOTE:
It's important to remember that modifying the settings in Spark may change objects that already exist. Sometimes the changes only take effect after configuring a new DataFrame. Remember to test the changes you make to Spark configurations to verify that they do exactly what you expect.

## 5: NORMAL JOINS
1: Create a new DataFrame **normal_df** by joining **flights_df** with **airports_df**.

2: Determine which type of join is used in the query plan.

In [51]:
flights_df = spark.read.csv('E:/STUDY/flights_small.csv', header=True) 
print(flights_df.count())

airports_df = spark.read.csv('E:/STUDY/airports.csv', header=True) 
print(airports_df.count())

10000
1397


In [50]:
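# Join the flights_df and airports_df DataFrames
# NOTE: 'alt' appears to be the airport altitude column, so this condition matches
# no rows (the count below is 0); the airport code column 'faa' is likely the intended key
normal_df = flights_df.join(airports_df, \
    flights_df["dest"] == airports_df["alt"] )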
print(normal_df.count())
# Show the query plan
normal_df.explain()


0
== Physical Plan ==
*(2) BroadcastHashJoin [dest#1249], [alt#1305], Inner, BuildRight
:- *(2) Project [year#1238, month#1239, day#1240, dep_time#1241, dep_delay#1242, arr_time#1243, arr_delay#1244, carrier#1245, tailnum#1246, flight#1247, origin#1248, dest#1249, air_time#1250, distance#1251, hour#1252, minute#1253]
:  +- *(2) Filter isnotnull(dest#1249)
:     +- *(2) FileScan csv [year#1238,month#1239,day#1240,dep_time#1241,dep_delay#1242,arr_time#1243,arr_delay#1244,carrier#1245,tailnum#1246,flight#1247,origin#1248,dest#1249,air_time#1250,distance#1251,hour#1252,minute#1253] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/STUDY/flights_small.csv], PartitionFilters: [], PushedFilters: [IsNotNull(dest)], ReadSchema: struct<year:string,month:string,day:string,dep_time:string,dep_delay:string,arr_time:string,arr_d...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[4, string, true]))
   +- *(1) Project [faa#1301, name#1302, lat#1303, lon#1304, alt#1305, tz#1

## 6: USING BROADCASTING ON SPARK JOINS
Remember that table joins in Spark are split between the cluster workers. If the data is not local, various shuffle operations are required and can have a negative impact on performance. Instead, we're going to use Spark's **broadcast** operations to give each node a copy of the specified data.

A couple tips:

1: Broadcast the smaller DataFrame. The larger the DataFrame, the more time required to transfer to the worker nodes.

2: On small DataFrames, it may be better to skip broadcasting and let Spark figure out any optimization on its own.

3: If you look at the query execution plan, a **BroadcastHashJoin** indicates you've successfully configured broadcasting.
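
As a rough mental model of what a broadcast hash join does, here is a sketch in plain Python rather than Spark. The small table is turned into a hash map that every worker could hold in full, and each row of the large table is probed against it, so the large table never needs to be shuffled. The `broadcast_hash_join` helper and the sample rows are hypothetical and only illustrate the idea; Spark's real implementation is more involved.

```python
from collections import defaultdict

def broadcast_hash_join(large_rows, small_rows, large_key, small_key):
    """Inner-join two lists of dicts by hashing the small side (conceptual sketch)."""
    # Build phase: index the small table by its join key.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[small_key]].append(row)

    # Probe phase: stream the large table and look up matches.
    joined = []
    for row in large_rows:
        for match in lookup.get(row[large_key], []):
            joined.append({**row, **match})
    return joined

# Hypothetical sample data
flights = [
    {"flight": 1, "dest": "LAX"},
    {"flight": 2, "dest": "SEA"},
    {"flight": 3, "dest": "XXX"},  # no matching airport
]
airports = [
    {"faa": "LAX", "name": "Los Angeles Intl"},
    {"faa": "SEA", "name": "Seattle Tacoma Intl"},
]

result = broadcast_hash_join(flights, airports, "dest", "faa")
print(len(result))
```

The cost of the build phase grows with the size of the broadcast side, which is why the first tip recommends broadcasting the smaller DataFrame.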

### INSTRUCTIONS
1: Import the **broadcast()** method from pyspark.sql.functions.

2: Create a new DataFrame **broadcast_df** by joining **flights_df** with **airports_df**, using broadcasting.

3: Show the query plan and consider differences from the original.

In [47]:
# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join the flights_df and airports_df DataFrames using broadcasting
broadcast_df = flights_df.join(broadcast(airports_df), \
    flights_df["dest"] == airports_df["alt"] )

# Show the query plan and compare against the original
broadcast_df.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [dest#985], [alt#1020], Inner, BuildRight
:- *(2) Project [year#974, month#975, day#976, dep_time#977, dep_delay#978, arr_time#979, arr_delay#980, carrier#981, tailnum#982, flight#983, origin#984, dest#985, air_time#986, distance#987, hour#988, minute#989]
:  +- *(2) Filter isnotnull(dest#985)
:     +- *(2) FileScan csv [year#974,month#975,day#976,dep_time#977,dep_delay#978,arr_time#979,arr_delay#980,carrier#981,tailnum#982,flight#983,origin#984,dest#985,air_time#986,distance#987,hour#988,minute#989] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/STUDY/flights_small.csv], PartitionFilters: [], PushedFilters: [IsNotNull(dest)], ReadSchema: struct<year:string,month:string,day:string,dep_time:string,dep_delay:string,arr_time:string,arr_d...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[4, string, true]))
   +- *(1) Project [faa#1016, name#1017, lat#1018, lon#1019, alt#1020, tz#1021, dst#1022]
      +- *(1) Filter 

## 7: COMPARING BROADCAST vs NORMAL JOINS
1: Execute **.count()** on the normal DataFrame.

2: Execute **.count()** on the broadcasted DataFrame.

3: Print the count and duration of the DataFrames, noting any differences.

In [48]:
start_time = time.time()
# Count the number of rows in the normal DataFrame
normal_count = normal_df.count()
normal_duration = time.time() - start_time

start_time = time.time()
# Count the number of rows in the broadcast DataFrame
broadcast_count = broadcast_df.count() 
broadcast_duration = time.time() - start_time

# Print the counts and the duration of the tests
print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))

Normal count:		0	duration: 1.764707
Broadcast count:	0	duration: 0.641665


# Chapter4: COMPLEX PROCESSING AND DATA PIPELINES 

## 1: QUICK PIPELINE
Before you parse some more complex data, your manager would like to see a simple pipeline example including the basic steps. For this example, you'll want to ingest a data file, filter a few rows, add an ID column to it, then write it out as JSON data.

### INSTRUCTIONS
1: Import the file **2015-departures.csv.gz** to a DataFrame. Note the header is already defined.

2: Filter the DataFrame to contain only flights with a duration over 0 minutes. Use the index of the column, not the column name (remember to use **.printSchema()** to see the column names / order).

3: Add an ID column.

4: Write the file out as a JSON document named **output.json**.

In [5]:
import pyspark.sql.functions as F

# Import the data to a DataFrame
departures_df = spark.read.csv('E:/STUDY/DATASETS/AA_DFW_2015_Departures_Short.csv', header=True)

# Remove any duration of 0
departures_df = departures_df.filter(departures_df[3] > 0)

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
departures_df.write.json('E:/STUDY/DATASETS/output.json')


In [6]:
departures_df.printSchema()

root
 |-- Date (MM/DD/YYYY): string (nullable = true)
 |-- Flight Number: string (nullable = true)
 |-- Destination Airport: string (nullable = true)
 |-- Actual elapsed time (Minutes): string (nullable = true)
 |-- id: long (nullable = false)
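
The **id** values produced by **F.monotonically_increasing_id()** are unique and increasing, but not consecutive. According to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Python sketch below mimics that layout; `sketch_monotonic_id` is a hypothetical helper, not a Spark function.

```python
def sketch_monotonic_id(partition_id, row_in_partition):
    """Mimic the documented bit layout of monotonically_increasing_id()."""
    # Upper bits carry the partition ID, lower 33 bits the row's position in it.
    return (partition_id << 33) + row_in_partition

# Two hypothetical partitions with three rows each
ids = [sketch_monotonic_id(p, r) for p in range(2) for r in range(3)]
print(ids)
```

The jump between the third and fourth value is why the column should be treated as a unique key rather than a row number.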



## 2: REMOVING COMMENTED LINES
Your boss would like you to perform some complex parsing on a new dataset. The data represents annotation data for the ImageNet dataset, but focusing specifically on dog breeds and identifying them in images. Before any actual analysis can occur, you'll need to clear out several components of invalid / incorrect data. The general schema of the document is unknown so you'd like to import the rows into a single column, allowing for quick analysis.

### INSTRUCTIONS
1: Import the **annotations.csv.gz** file to a DataFrame and perform a row count. Specify a separator character of |.

2: Query the data for the number of rows beginning with **#**.

3: Import the file again to a new DataFrame, but specify the comment character in the options to remove any commented rows.

4: Count the new DataFrame and verify the difference is as expected.

In [None]:
# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')
full_count = annotations_df.count()

# Count the number of rows beginning with '#'
comment_count = annotations_df.where(F.col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#') #Use # on the comment argument to remove comment rows.

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))

## 3: REMOVING INVALID ROWS
Now that you've successfully removed the commented rows, you have received some information about the general format of the data. There should be at least 5 tab-separated columns in the DataFrame. Remember that your original DataFrame only has a single column, so you'll need to split the data on the tab (**\t**) characters.

### INSTRUCTIONS
1: Create a new variable **tmp_fields** using the **annotations_df** DataFrame column **'_c0'** splitting it on the tab character.

2: Create a new column in **annotations_df** named **'colcount'** representing the number of fields defined in the previous step.

3: Filter out any rows from **annotations_df** containing fewer than 5 fields.

4: Count the number of rows in the DataFrame and compare to the **initial_count**.

In [None]:
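# Store the row count before filtering, used in the comparison below
initial_count = annotations_df.count()

# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')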

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (annotations_df["colcount"] < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))
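
To make the filter concrete, the same logic can be sketched in plain Python on a few made-up lines. The sample rows below are hypothetical stand-ins for values of the **'_c0'** column; `len(row.split("\t"))` plays the role of **F.size(F.split(...))**.

```python
# Hypothetical raw lines, each standing in for one value of the '_c0' column
raw_rows = [
    "n02085620\timg_1.jpg\t500\t375\tChihuahua,100,50,400,350",
    "n02085620\timg_2.jpg\t500",   # only 3 fields: invalid
    "bad row with no tabs",        # 1 field: invalid
]

# Split on tabs and keep rows with at least 5 fields
valid_rows = [row for row in raw_rows if len(row.split("\t")) >= 5]
print(len(raw_rows), len(valid_rows))
```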

## 4: SPLITTING INTO COLUMNS
1: Split the content of the **'_c0'** column on the tab character and store in a variable called **split_cols**.

2: Add the following columns based on the first four entries in the variable above: folder, filename, width, height on a DataFrame named **split_df**.

3: Add the **split_cols** variable as a column.

In [None]:
# Split the content of _c0 on the tab character (aka, '\t')
split_cols = F.split(annotations_df['_c0'], '\t')

# Add the columns folder, filename, width, and height
split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width', split_cols.getItem(2))
split_df = split_df.withColumn('height', split_cols.getItem(3))

# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)

You may be wondering why we're not using a schema to define the content layout. Spark's CSV parser can't handle complex types (Arrays or Maps), so a schema containing them wouldn't parse correctly. In our example, we sidestep the typed schema by reading each row as a single string column and splitting it ourselves.

## 5: FURTHER PARSING
1: Create a new function called **retriever** that takes two arguments, the split columns (cols) and the total number of columns (colcount). This function should return a list of the entries that have not been defined as columns yet (i.e., everything after item 4 in the list).

2: Define the function as a Spark UDF, returning an Array of strings.

3: Create the new column **dog_list** using the UDF and the available columns in the DataFrame.

4: Remove the columns **_c0**, colcount, and split_cols.

In [None]:
def retriever(cols, colcount):
  # Return a list of dog data
  return cols[4:colcount]

# Define the method as a UDF
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(split_df.split_cols, split_df.colcount))

# Remove the original column, split_cols, and the colcount
split_df = split_df.drop('_c0').drop('split_cols').drop('colcount')
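
Because a UDF is an ordinary Python function underneath, its behavior can be checked on a single row without Spark. The sketch below repeats the **retriever** function and applies it to a hypothetical split row (the values are made up for illustration).

```python
def retriever(cols, colcount):
    # Return everything after the first four entries
    return cols[4:colcount]

# Hypothetical split row: folder, filename, width, height, then two dogs
cols = ["n02085620", "img_1.jpg", "500", "375",
        "Chihuahua,100,50,400,350", "Beagle,0,0,50,75"]

dog_list = retriever(cols, len(cols))
print(dog_list)
```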

## 6: VALIDATE ROWS VIA JOIN
Another example of filtering data is using joins to remove invalid entries. You'll need to verify the folder names are as expected based on a given DataFrame named **valid_folders_df**.

### INSTRUCTIONS
1: Rename the **_c0** column to **folder** on the **valid_folders_df** DataFrame.

2: Count the number of rows in **split_df**.

3: Join the two DataFrames on the folder name, and call the resulting DataFrame **joined_df**. Make sure to broadcast the smaller DataFrame.

4: Check the number of rows remaining in the DataFrame and compare.

In [None]:
# Rename the column in valid_folders_df
valid_folders_df = valid_folders_df.withColumnRenamed('_c0', 'folder')

# Count the number of rows in split_df
split_count = split_df.count()

# Join the DataFrames
joined_df = split_df.join(F.broadcast(valid_folders_df), "folder")

# Compare the number of rows remaining
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))

## 7: EXAMINING INVALID ROWS
1: Determine the row counts for each DataFrame.

2: Create a DataFrame containing only the invalid rows.

3: Validate the count of the new DataFrame is as expected.

4: Determine the number of distinct folder rows removed.

In [None]:
# Determine the row counts for each DataFrame
split_count = split_df.count()
joined_count = joined_df.count()

# Create a DataFrame containing the invalid rows
invalid_df = split_df.join(F.broadcast(joined_df), 'folder', 'left_anti')

# Validate the count of the new DataFrame is as expected
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Determine the number of distinct folder rows removed
invalid_folder_count = invalid_df.select('folder').distinct().count()
print("%d distinct invalid folders found" % invalid_folder_count)
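
A **left_anti** join keeps only the rows from the left side that have no match on the right. As a conceptual sketch in plain Python (the `left_anti_join` helper and the sample rows are hypothetical, not part of the dataset):

```python
def left_anti_join(left_rows, right_rows, key):
    """Return the rows of left_rows whose key has no match in right_rows."""
    right_keys = {row[key] for row in right_rows}
    return [row for row in left_rows if row[key] not in right_keys]

# Hypothetical sample data
split_rows = [
    {"folder": "02110627_n", "filename": "a.jpg"},
    {"folder": "n02085620", "filename": "b.jpg"},
    {"folder": "02110627_n", "filename": "c.jpg"},
]
joined_rows = [{"folder": "n02085620", "filename": "b.jpg"}]

invalid_rows = left_anti_join(split_rows, joined_rows, "folder")
distinct_invalid_folders = {row["folder"] for row in invalid_rows}
print(len(invalid_rows), len(distinct_invalid_folders))
```

In this sample, two rows are invalid but they share one folder, which mirrors the difference between the invalid row count and the distinct invalid folder count in the exercise.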

## 8: DOG PARSING
1: Select the column representing the dog details from the DataFrame and show the first 10 un-truncated rows.

2: Create a new schema as you've done before, using breed, start_x, start_y, end_x, and end_y as the names. Make sure to specify the proper data types for each field in the schema (any number value is an integer).

In [None]:
# Select the dog details and show 10 untruncated rows
# (.show() prints the rows itself and returns None, so it is not wrapped in print())
joined_df.select('dog_list').show(10, truncate=False)

# Define a schema type for the details in the dog list
DogType = StructType([
    StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(), False)
])

## 9: PER IMAGE COUNT
Your next task in building a data pipeline for this dataset is to create a few analysis-oriented columns. You've been asked to calculate the number of dogs found in each image based on your **dog_list** column created earlier. You have also created the **DogType** which will allow better parsing of the data within some of the data columns.

The **joined_df** DataFrame is available as you last defined it, and the **DogType** StructType is defined. pyspark.sql.functions is available under the F alias.

### INSTRUCTIONS
1: Create a Python function to split each entry in dog_list to its appropriate parts. Make sure to convert any strings into the appropriate types or the DogType will not parse correctly.

2: Create a UDF using the above function.

3: Use the UDF to create a new column called dogs. Drop the previous column in the same command.

4: Show the number of dogs in the new column for the first 10 rows.


In [None]:
# Create a function to return the dogs in an image as a list of tuples
def dogParse(doglist):
  dogs = []
  for dog in doglist:
    (breed, start_x, start_y, end_x, end_y) = dog.split(',')
    dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
  return dogs

# Create a UDF
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Use the UDF to create the list of dogs and drop the old column
joined_df = joined_df.withColumn('dogs', udfDogParse('dog_list')).drop('dog_list')

# Show the number of dogs in the first 10 rows
joined_df.select(F.size('dogs')).show(10)
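
To see what the parsing function returns for one image, it can be run directly on a hypothetical **dog_list** value, without Spark. The sample strings below are made up and assume the `breed,start_x,start_y,end_x,end_y` layout used above.

```python
def dogParse(doglist):
    dogs = []
    for dog in doglist:
        # Each entry looks like 'breed,start_x,start_y,end_x,end_y'
        breed, start_x, start_y, end_x, end_y = dog.split(',')
        dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
    return dogs

# Hypothetical dog_list value for one image
sample = ["Chihuahua,100,50,400,350", "Beagle,0,0,50,75"]
parsed = dogParse(sample)
print(parsed)
print(len(parsed))  # the per-row value F.size('dogs') would report
```

An entry with more or fewer than five comma-separated parts would raise a `ValueError` in this function, so rows like that would need handling before the UDF is applied.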

## 10: PERCENTAGE DOG PIXELS
The final task for parsing the dog annotation data is to determine the percentage of pixels in each image that represent a dog (or dogs). You'll need to use the various techniques you've learned in this course to help calculate this information and add it as columns for later analysis.

To calculate the percentage of pixels, first calculate the total number of pixels representing each dog, then sum them for the image. You can calculate the area of each bounding box with the formula:

(Xend - Xstart) * (Yend - Ystart)

NOTE: You can ignore the possibility of overlapping bounding boxes in this instance.

For the percentage, calculate the total number of "dog" pixels divided by the total size of the image, multiplied by 100.
The joined_df DataFrame is as you last used it. pyspark.sql.functions is aliased to F.
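
As a worked example of the formula, here is the arithmetic in plain Python for one hypothetical image. The image size and bounding boxes are made-up values chosen to keep the numbers easy to follow.

```python
# Hypothetical image size and bounding boxes: (breed, start_x, start_y, end_x, end_y)
width, height = 500, 375
dogs = [("Chihuahua", 100, 50, 400, 350), ("Beagle", 0, 0, 50, 75)]

# Sum (Xend - Xstart) * (Yend - Ystart) over every dog in the image
dog_pixels = sum((end_x - start_x) * (end_y - start_y)
                 for _, start_x, start_y, end_x, end_y in dogs)

# Share of the image covered by dogs, as a percentage
dog_percent = dog_pixels / (width * height) * 100
print(dog_pixels, dog_percent)
```

Here the two boxes cover 90000 and 3750 pixels, so 93750 of the 187500 pixels (50%) are "dog" pixels.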

### INSTRUCTIONS
1: Define a Python function to take a list of tuples (the dog objects) and calculate the total number of "dog" pixels per image.

2: Create a UDF of the function and use it to create a new column called **'dog_pixels'** on the DataFrame.

3: Create another column, **'dog_percent'**, representing the percentage of **'dog_pixels'** in the image. Make sure this is between 0-100%.

4: Show the first 10 rows with more than 60% **'dog_pixels'** in the image.

In [None]:
# Define a function to determine the number of dog pixels per image
def dogPixelCount(doglist):
  totalpixels = 0
  for dog in doglist:
    totalpixels += (dog[3] - dog[1]) * (dog[4] - dog[2])
  return totalpixels

# Define a UDF for the pixel count
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount('dogs'))

# Create a column representing the percentage of pixels
joined_df = joined_df.withColumn('dog_percent', (joined_df.dog_pixels / (joined_df.width * joined_df.height)) * 100)

# Show the first 10 annotations with more than 60% dog
joined_df.where('dog_percent > 60').show(10)