In [None]:
# Download the gzipped CSV inputs used by this notebook: four yearly AA DFW
# departure files (2014-2017) plus the Dallas council votes and voters files.
# `-O` sets the local filename each download is saved under.
# NOTE(review): later cells read these files from /content, so this presumably
# assumes the kernel's working directory is /content - confirm.
!wget -O AA_DFW_2014.csv.gz https://assets.datacamp.com/production/repositories/4336/datasets/f412f5acbef38630147c2956d8703bafbcd0f74c/AA_DFW_2014_Departures_Short.csv.gz
!wget -O AA_DFW_2015.csv.gz https://assets.datacamp.com/production/repositories/4336/datasets/475d2803541ba8facb2c39024dd0d9497859dc6c/AA_DFW_2015_Departures_Short.csv.gz
!wget -O AA_DFW_2016.csv.gz https://assets.datacamp.com/production/repositories/4336/datasets/c1abacafea802998597d6c68b27c7b8650a18ab8/AA_DFW_2016_Departures_Short.csv.gz
!wget -O AA_DFW_2017.csv.gz https://assets.datacamp.com/production/repositories/4336/datasets/04db01ffbd39f7bf2f88ffd5b7924b2de0419168/AA_DFW_2017_Departures_Short.csv.gz
!wget -O votes.csv.gz https://assets.datacamp.com/production/repositories/4336/datasets/ea700976560a6f1760782c6e7310a662120c63b5/DallasCouncilVotes.csv.gz
!wget -O voters.csv.gz https://assets.datacamp.com/production/repositories/4336/datasets/c0aa672020bce21eef0d875a484a4fd44da042cf/DallasCouncilVoters.csv.gz

In [None]:
%pip install pyspark[sql]
%pip install pyspark[pandas_on_spark]

In [62]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
import time

# Configure Spark Session

In [45]:
# Create (or reuse) the SparkSession for this notebook. The builder chain is
# wrapped in parentheses rather than relying on backslash continuations.
spark = (
    SparkSession.builder
    .appName("DataCamp Locally")
    .master("local")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.ui.port", "4040")
    .config("spark.ui.bindAddress", "127.0.0.1")
    .getOrCreate()
)

# Define Schema

In [46]:
# Schema with three columns (name, age, city); each StructField is created
# with nullable=False.
people_schema = StructType([
    StructField(column_name, column_type, False)
    for column_name, column_type in (
        ('name', StringType()),
        ('age', IntegerType()),
        ('city', StringType()),
    )
])

In [47]:
# Load the 2017 departures file, treating the first line as the header
aa_dfw_df = (
    spark.read.format('csv')
    .options(Header=True)
    .load('/content/AA_DFW_2017.csv.gz')
)

# Add a lower-cased copy of the destination code as 'airport', then drop
# the original 'Destination Airport' column
aa_dfw_df = (
    aa_dfw_df
    .withColumn('airport', F.lower(F.col('Destination Airport')))
    .drop('Destination Airport')
)
aa_dfw_df.show()

+-----------------+-------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-----------------------------+-------+
|       01/01/2017|         0005|                          537|    hnl|
|       01/01/2017|         0007|                          498|    ogg|
|       01/01/2017|         0037|                          241|    sfo|
|       01/01/2017|         0043|                          134|    dtw|
|       01/01/2017|         0051|                           88|    stl|
|       01/01/2017|         0060|                          149|    mia|
|       01/01/2017|         0071|                          203|    lax|
|       01/01/2017|         0074|                           76|    mem|
|       01/01/2017|         0081|                          123|    den|
|       01/01/2017|         0089|                          161|    slc|
|       01/01/2017|         0096|                           84| 

# To Parquet / Read Parquet

In [48]:
# Read every file matched by the glob into a single DataFrame, using the
# header row for column names and letting Spark infer the column types
all_flights = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/content/AA_DFW_*.csv.gz")
)

# Write the combined data as Parquet, overwriting any existing output
all_flights.write.mode('overwrite').parquet('/content/AA_DFW_ALL.parquet')
all_flights.show()

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2014|            5|                HNL|                          519|
|       01/01/2014|            7|                OGG|                          505|
|       01/01/2014|           35|                SLC|                          174|
|       01/01/2014|           43|                DTW|                          153|
|       01/01/2014|           52|                PIT|                          137|
|       01/01/2014|           58|                SAN|                          174|
|       01/01/2014|           60|                MIA|                          155|
|       01/01/2014|           64|                JFK|                          185|
|       01/01/2014|           90|                ORD|                       

In [49]:
# Read the Parquet dataset from the same absolute path it was written to, so
# the read does not depend on the kernel's current working directory.
flights_df = spark.read.parquet('/content/AA_DFW_ALL.parquet')

# Make sure the duration column is an integer before aggregating
flights_df = flights_df.withColumn(
    'Actual elapsed time (Minutes)',
    flights_df['Actual elapsed time (Minutes)'].cast("integer"),
)

# Register the DataFrame as a temporary view so it can be queried with SQL
flights_df.createOrReplaceTempView('flights')

# The aggregate query yields a single row; first() returns that row directly
avg_duration = spark.sql(
    'SELECT avg(`Actual elapsed time (Minutes)`) as avg_duration from flights'
).first()
print(f'The average flight time is: {avg_duration["avg_duration"]}')

The average flight time is: 147.59399915712726


In [50]:
# Load the council voters file and list the distinct raw VOTER_NAME values
voter_df = spark.read.csv("/content/voters.csv.gz", header=True, inferSchema=True)
voter_df.select('VOTER_NAME').distinct().show(40)

# Keep only names that are 1 to 19 characters long and contain no underscore
voter_df = (
    voter_df
    .filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')
    .filter(~F.col('VOTER_NAME').contains('_'))
)

# List the distinct names that survive the filters
voter_df.select('VOTER_NAME').distinct().show(40)

+--------------------+
|          VOTER_NAME|
+--------------------+
|      Tennell Atkins|
|  the  final   20...|
|        Scott Griggs|
|       Scott  Griggs|
|       Sandy Greyson|
| Michael S. Rawlings|
| the final 2018 A...|
|        Kevin Felder|
|        Adam Medrano|
|       Casey  Thomas|
|   the   final  2...|
|          011018__42|
|       Mark  Clayton|
|        Casey Thomas|
|      Sandy  Greyson|
|        Mark Clayton|
|  Jennifer S.  Gates|
|   Tiffinni A. Young|
|  the  final  201...|
|    B. Adam  McGough|
|        Omar Narvaez|
|  Philip T. Kingston|
|  Rickey D. Callahan|
|   Dwaine R. Caraway|
| Philip T.  Kingston|
|   Jennifer S. Gates|
|     Lee M. Kleinman|
|    Monica R. Alonzo|
|   the   final  2...|
| Rickey D.  Callahan|
| Carolyn King Arnold|
|  the  final   20...|
|         Erik Wilson|
|  the  final  201...|
|        Lee Kleinman|
|                NULL|
+--------------------+

+-------------------+
|         VOTER_NAME|
+-------------------+
|     Tennell

In [51]:
# Split each name on runs of whitespace. The pattern is a raw string so that
# `\s` reaches Spark as a regex token instead of being an invalid Python
# string escape (which newer Python versions warn about).
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# First token becomes first_name; the last token becomes last_name
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

# The intermediate array column is no longer needed
voter_df = voter_df.drop('splits')
voter_df.show()



+----------+-------------+-------------------+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|
+----------+-------------+-------------------+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold|
|02/08/2017|Councilmember|       Scott Griggs|     Scott|   Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|        B.|  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|       Lee| Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|     Sandy|  Greyson|
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|  

# When / Otherwise

In [52]:
# Give council members a random value in random_val; no otherwise() branch is
# supplied in this cell.
voter_df = voter_df.withColumn(
    'random_val',
    F.when(F.col('TITLE') == 'Councilmember', F.rand()),
)

In [53]:
# Set random_val from the title: a random number for council members,
# 2 for the mayor, and 0 for every other title
voter_df = voter_df.withColumn(
    'random_val',
    F.when(F.col('TITLE') == 'Councilmember', F.rand())
     .when(F.col('TITLE') == 'Mayor', 2)
     .otherwise(0),
)

# Display the rows whose random_val equals 0
voter_df.filter(F.col('random_val') == 0).show()

+----------+--------------------+-----------------+----------+---------+----------+
|      DATE|               TITLE|       VOTER_NAME|first_name|last_name|random_val|
+----------+--------------------+-----------------+----------+---------+----------+
|04/25/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|04/25/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|06/20/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|06/20/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|06/20/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|06/20/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|08/15/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|08/15/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|09/18/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|    

# Defined Functions

In [57]:
# Split each name on runs of whitespace. The pattern is a raw string so that
# `\s` is passed through as a regex token rather than being an invalid Python
# string escape.
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

def getFirstAndMiddle(names):
  """Join every name part except the last one with single spaces.

  Args:
    names: list of name parts, or None when the source value is null.

  Returns:
    The space-separated leading parts ('' when there are fewer than two
    parts), or None when `names` is None.
  """
  # A null array arrives in a Python UDF as None; return null instead of
  # raising a TypeError on the slice.
  if names is None:
    return None
  return ' '.join(names[:-1])

# Wrap the Python function as a Spark UDF that returns a string
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using the UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))

# Drop the intermediate array column and show the DataFrame
voter_df = voter_df.drop('splits')
voter_df.show()

+----------+-------------+-------------------+----------+---------+--------------------+---------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|          random_val|first_and_middle_name|
+----------+-------------+-------------------+----------+---------+--------------------+---------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|  0.2987287340381092|          Jennifer S.|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston| 0.39570315564845493|            Philip T.|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                 2.0|           Michael S.|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|  0.5215374774190792|                 Adam|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|0.017804270476997397|                Casey|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold|  0.8564584398287882|         Carolyn King|
|

# Adding an ID Field

In [60]:
# Reduce voter_df to one row per distinct VOTER_NAME
voter_df = voter_df.select('VOTER_NAME').distinct()

# Report how many distinct names remain
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())

# Attach a generated ROW_ID to every row
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

# Display the 10 rows with the largest ROW_ID values
voter_df.orderBy(F.col('ROW_ID').desc()).show(10)


There are 27 rows in the voter_df DataFrame.

+-------------------+------+
|         VOTER_NAME|ROW_ID|
+-------------------+------+
|       Lee Kleinman|    26|
|        Erik Wilson|    25|
|Carolyn King Arnold|    24|
|Rickey D.  Callahan|    23|
|   Monica R. Alonzo|    22|
|    Lee M. Kleinman|    21|
|  Jennifer S. Gates|    20|
|Philip T.  Kingston|    19|
|  Dwaine R. Caraway|    18|
| Rickey D. Callahan|    17|
+-------------------+------+
only showing top 10 rows



# ID with different Partitions

In [61]:
# Report how many partitions back voter_df
partition_total = voter_df.rdd.getNumPartitions()
print(f"\nThere are {partition_total} partitions in the voter_df DataFrame.\n")

# (Re)generate the ROW_ID column for voter_df
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

# Display the 10 rows with the largest ROW_ID values
voter_df.orderBy(F.col('ROW_ID').desc()).show(10)


There are 1 partitions in the voter_df DataFrame.

+-------------------+------+
|         VOTER_NAME|ROW_ID|
+-------------------+------+
|       Lee Kleinman|    26|
|        Erik Wilson|    25|
|Carolyn King Arnold|    24|
|Rickey D.  Callahan|    23|
|   Monica R. Alonzo|    22|
|    Lee M. Kleinman|    21|
|  Jennifer S. Gates|    20|
|Philip T.  Kingston|    19|
|  Dwaine R. Caraway|    18|
| Rickey D. Callahan|    17|
+-------------------+------+
only showing top 10 rows



# Improving Performance

## Caching

In [63]:
start_time = time.time()

# Read the Parquet dataset from the same absolute path it was written to, so
# the read does not depend on the kernel's current working directory.
departures_df = spark.read.parquet('/content/AA_DFW_ALL.parquet')

# Mark the de-duplicated rows for caching; nothing is materialised until an
# action such as count() runs
departures_df = departures_df.distinct().cache()

# First count: includes reading, de-duplicating and populating the cache
print(f"Counting {departures_df.count()} rows took {time.time() - start_time} seconds")

# Second count: runs against the cached DataFrame, so compare the timing
start_time = time.time()
print(f"Counting {departures_df.count()} rows took {time.time() - start_time} seconds")

Counting 583718 rows took 18.604215383529663 seconds
Counting 583718 rows took 4.329400539398193 seconds


## Removing a DataFrame from cache

In [64]:
# Report the cache flag before touching it
print(f"Is departures_df cached?: {departures_df.is_cached}")

# Announce and perform the removal from the cache
print("Removing departures_df from cache")
departures_df.unpersist()

# Report the cache flag again after unpersist()
print(f"Is departures_df cached?: {departures_df.is_cached}")

Is departures_df cached?: True
Removing departures_df from cache
Is departures_df cached?: False


## Read Spark Configurations

In [None]:
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')

# Number of partitions used for shuffles (joins/aggregations)
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
print(f"Name: {app_name}")
print(f"Driver TCP port: {driver_tcp_port}")
print(f"Number of partitions: {num_partitions}")

# Configure Spark to use 500 shuffle partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Re-read the setting: num_partitions above still holds the value fetched
# before the change, so printing it unchanged would hide the update
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')
print(f"Number of partitions: {num_partitions}")

## Broadcasting

In [None]:
# Load the council voters and votes files
voters = spark.read.csv("/content/voters.csv.gz", header=True, inferSchema=True)
votes = spark.read.csv("/content/votes.csv.gz", header=True, inferSchema=True)

# Rows match when date, voter name and title all agree; the condition is
# built once and shared by both joins below
join_condition = (
    (voters["DATE"] == votes["DATE"])
    & (voters["VOTER_NAME"] == votes["VOTER NAME"])
    & (voters["TITLE"] == votes["TITLE"])
)

# Join voters to votes without any hint and show the query plan
normal_df = voters.join(votes, join_condition)
normal_df.explain()

# Same join, but with voters wrapped in a broadcast hint; show the plan to
# compare against the one above
broadcast_df = F.broadcast(voters).join(votes, join_condition)
broadcast_df.explain()

### Comparing Broadcast VS Normal

In [74]:
def _timed_count(df):
    """Count the rows of `df` and return (row_count, elapsed_seconds)."""
    began = time.time()
    rows = df.count()
    return rows, time.time() - began

# Time the row count of the plain join, then of the broadcast join
normal_count, normal_duration = _timed_count(normal_df)
broadcast_count, broadcast_duration = _timed_count(broadcast_df)

# Report both counts with their wall-clock durations
print(f"Normal count:\t\t{normal_count}\tduration: {normal_duration}")
print(f"Broadcast count:\t{broadcast_count}\tduration: {broadcast_duration}")

Normal count:		2960192	duration: 3.3175344467163086
Broadcast count:	2960192	duration: 2.2266414165496826


# Intro To Data Pipelines

## Quick Pipeline

In [76]:
# Load the 2015 departures file, using the first line as the header
departures_df = spark.read.csv('/content/AA_DFW_2015.csv.gz', header=True)

# Keep rows whose fourth column (index 3) is greater than 0, then tag each
# remaining row with a generated id
departures_df = (
    departures_df
    .filter(departures_df[3] > 0)
    .withColumn('id', F.monotonically_increasing_id())
)

# Write the result as JSON, replacing any previous output at that path
departures_df.write.mode('overwrite').json('/content/output.json')