# Cleaning Data with PySpark

## DataFrame details

### Defining a schema

In [1]:
from pyspark.sql.types import *

# Explicit schema for the people data: name and city are strings, age is an
# integer, and every column is declared nullable.
people_fields = [
    StructField('name', StringType(), nullable=True),
    StructField('age', IntegerType(), nullable=True),
    StructField('city', StringType(), nullable=True),
]
people_schema = StructType(people_fields)

### Using lazy processing

In [2]:
from pyspark.sql import SparkSession

# Configure the builder first, then create (or reuse) the session that every
# later cell refers to as `spark`.
session_builder = SparkSession.builder.appName('cleaning-data')
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/04 20:40:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the gzipped 2017 departures CSV, using its first line as column names.
aa_dfw_df = spark.read.csv("datasets/AA_DFW_2017_Departures_Short.csv.gz", header=True)

In [4]:
# Display the first five rows to check how the columns were parsed.
aa_dfw_df.show(5)

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2017|         0005|                HNL|                          537|
|       01/01/2017|         0007|                OGG|                          498|
|       01/01/2017|         0037|                SFO|                          241|
|       01/01/2017|         0043|                DTW|                          134|
|       01/01/2017|         0051|                STL|                           88|
+-----------------+-------------+-------------------+-----------------------------+
only showing top 5 rows



In [5]:
import pyspark.sql.functions as F
# Derive a lower-cased copy of 'Destination Airport' and store it as 'airport'
destination = F.col('Destination Airport')
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(destination))
aa_dfw_df.show(5)

+-----------------+-------------+-------------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-------------------+-----------------------------+-------+
|       01/01/2017|         0005|                HNL|                          537|    hnl|
|       01/01/2017|         0007|                OGG|                          498|    ogg|
|       01/01/2017|         0037|                SFO|                          241|    sfo|
|       01/01/2017|         0043|                DTW|                          134|    dtw|
|       01/01/2017|         0051|                STL|                           88|    stl|
+-----------------+-------------+-------------------+-----------------------------+-------+
only showing top 5 rows



In [6]:
# Remove 'Destination Airport' now that the lower-cased 'airport' column exists
destination_col = aa_dfw_df['Destination Airport']
aa_dfw_df = aa_dfw_df.drop(destination_col)
aa_dfw_df.show(5)

+-----------------+-------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-----------------------------+-------+
|       01/01/2017|         0005|                          537|    hnl|
|       01/01/2017|         0007|                          498|    ogg|
|       01/01/2017|         0037|                          241|    sfo|
|       01/01/2017|         0043|                          134|    dtw|
|       01/01/2017|         0051|                           88|    stl|
+-----------------+-------------+-----------------------------+-------+
only showing top 5 rows



### Saving a DataFrame in Parquet format

In [None]:
# Report the size of each input before merging.
# NOTE(review): df1 and df2 are not defined in any visible cell of this
# notebook; they must exist in the session before this cell runs.
df1_rows = df1.count()
print("df1 Count: %d" % df1_rows)
df2_rows = df2.count()
print("df2 Count: %d" % df2_rows)

# Stack the rows of df2 underneath those of df1
df3 = df1.union(df2)

# Persist the combined data as Parquet, replacing any previous output
parquet_path = 'AA_DFW_ALL.parquet'
df3.write.parquet(parquet_path, mode='overwrite')

# Reload the Parquet output and print its row count as a round-trip check
reloaded_df = spark.read.parquet(parquet_path)
print(reloaded_df.count())

### SQL and Parquet

In [None]:
# Load the combined Parquet data back into flights_df
flights_df = spark.read.parquet('AA_DFW_ALL.parquet')

# Expose the DataFrame to Spark SQL under the name 'flights'
flights_df.createOrReplaceTempView('flights')

# Compute the average flight duration with SQL. avg_duration is the first
# result Row; a Row is a tuple, so the %d format below consumes its one field.
avg_query = 'SELECT avg(flight_duration) from flights'
avg_rows = spark.sql(avg_query).collect()
avg_duration = avg_rows[0]
print('The average flight time is: %d' % avg_duration)

## Manipulating DataFrames in the real world

### Filtering column content with Python

In [9]:
import pyspark.sql.functions as F

# Read the council voter records: the header row supplies the column names and
# inferSchema asks Spark to infer the column types from the data.
voters_csv = 'datasets/DallasCouncilVoters.csv.gz'
voter_df = spark.read.csv(voters_csv, header=True, inferSchema=True)
voter_df.show()

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember|       Scott Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember| Rickey D. Callahan|
|01/11/2017|Councilmember|  Jennifer S. Gates|
|04/25/2018|C

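`distinct()`: The `distinct()` method is a PySpark `DataFrame` transformation that removes duplicate rows and returns a new `DataFrame` containing only unique rows. Rows are compared across all columns of the `DataFrame` it is called on, so select the columns of interest first (as done below with `VOTER_NAME`); to de-duplicate on a subset of columns while keeping the others, use `dropDuplicates()` instead.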

In [10]:
# List up to 40 distinct voter names without truncating long values
unique_names = voter_df.select('VOTER_NAME').distinct()
unique_names.show(40, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|VOTER_NAME                                                                                                                                                                                                                                                                                                                                                                                                                 |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [11]:
# Keep only rows whose VOTER_NAME is non-empty and shorter than 20 characters
# (i.e. lengths 1-19; the upper bound is exclusive).
name_length_rule = 'length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20'
voter_df = voter_df.filter(name_length_rule)

In [12]:
# Discard rows whose VOTER_NAME contains an underscore
has_underscore = F.col('VOTER_NAME').contains('_')
voter_df = voter_df.filter(~has_underscore)

In [13]:
# Show the distinct VOTER_NAME entries again, now that the filters have run
remaining_names = voter_df.select('VOTER_NAME').distinct()
remaining_names.show(40, truncate=False)

+-------------------+
|VOTER_NAME         |
+-------------------+
|Tennell Atkins     |
|Scott Griggs       |
|Scott  Griggs      |
|Sandy Greyson      |
|Michael S. Rawlings|
|Kevin Felder       |
|Adam Medrano       |
|Casey  Thomas      |
|Mark  Clayton      |
|Casey Thomas       |
|Sandy  Greyson     |
|Mark Clayton       |
|Jennifer S.  Gates |
|Tiffinni A. Young  |
|B. Adam  McGough   |
|Omar Narvaez       |
|Philip T. Kingston |
|Rickey D. Callahan |
|Dwaine R. Caraway  |
|Philip T.  Kingston|
|Jennifer S. Gates  |
|Lee M. Kleinman    |
|Monica R. Alonzo   |
|Rickey D.  Callahan|
|Carolyn King Arnold|
|Erik Wilson        |
|Lee Kleinman       |
+-------------------+



### Modifying DataFrame columns

In [14]:
# Add a new column called splits holding VOTER_NAME split on runs of whitespace.
# The pattern is a raw string so the backslash reaches Spark's regex engine
# verbatim instead of being read as an (invalid) Python escape sequence.
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))
voter_df.show(5)

+----------+-------------+-------------------+--------------------+
|      DATE|        TITLE|         VOTER_NAME|              splits|
+----------+-------------+-------------------+--------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|
|02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|
|02/08/2017|Councilmember|       Casey Thomas|     [Casey, Thomas]|
+----------+-------------+-------------------+--------------------+
only showing top 5 rows



In [15]:
# Take element 0 of the splits array as the first_name column
first_token = F.col('splits').getItem(0)
voter_df = voter_df.withColumn('first_name', first_token)
voter_df.show(5)

+----------+-------------+-------------------+--------------------+----------+
|      DATE|        TITLE|         VOTER_NAME|              splits|first_name|
+----------+-------------+-------------------+--------------------+----------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael|
|02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|      Adam|
|02/08/2017|Councilmember|       Casey Thomas|     [Casey, Thomas]|     Casey|
+----------+-------------+-------------------+--------------------+----------+
only showing top 5 rows



In [16]:
# The last name is the final element of splits, i.e. index (array size - 1)
last_index = F.size('splits') - 1
voter_df = voter_df.withColumn("last_name", F.col('splits').getItem(last_index))
voter_df.show()

+----------+-------------+-------------------+--------------------+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|              splits|first_name|last_name|
+----------+-------------+-------------------+--------------------+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael| Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|
|02/08/2017|Councilmember|       Casey Thomas|     [Casey, Thomas]|     Casey|   Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|[Carolyn, King, A...|   Carolyn|   Arnold|
|02/08/2017|Councilmember|       Scott Griggs|     [Scott, Griggs]|     Scott|   Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough| [B., Adam, McGough]|        B.|  McGough|
|02/08/2017|Councilme



In [17]:
# Drop the splits column; first_name and last_name were already derived from it
voter_df = voter_df.drop('splits')
# Show the voter_df DataFrame to confirm the column is gone
voter_df.show()

+----------+-------------+-------------------+----------+---------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|
+----------+-------------+-------------------+----------+---------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold|
|02/08/2017|Councilmember|       Scott Griggs|     Scott|   Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|        B.|  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|       Lee| Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|     Sandy|  Greyson|
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|
|02/08/2017|Councilmember| Philip T. Kingston|  

### when() example

In [18]:
# Add random_val: a value from F.rand() for rows whose TITLE is Councilmember.
# There is no otherwise() clause, so every other row gets null (see the Mayor
# row in the output). rand() is called without a seed, so values vary per run.
is_councilmember = voter_df.TITLE == 'Councilmember'
voter_df = voter_df.withColumn('random_val', F.when(is_councilmember, F.rand()))
voter_df.show(5)

+----------+-------------+-------------------+----------+---------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|         random_val|
+----------+-------------+-------------------+----------+---------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates| 0.2207577456456088|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston| 0.3945328780489654|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|               null|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano| 0.3279043771538702|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|0.42274487832381524|
+----------+-------------+-------------------+----------+---------+-------------------+
only showing top 5 rows



### When / Otherwise

In [19]:
# Rebuild random_val with a fallback chain:
# Councilmember -> random value, Mayor -> 2, any other title -> 0.
title_col = voter_df.TITLE
random_val_expr = (
    F.when(title_col == "Councilmember", F.rand())
    .when(title_col == "Mayor", 2)
    .otherwise(0)
)
voter_df = voter_df.withColumn("random_val", random_val_expr)
voter_df.show(5)

+----------+-------------+-------------------+----------+---------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|         random_val|
+----------+-------------+-------------------+----------+---------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|0.26032428301487776|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston| 0.3289315117728715|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                2.0|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano| 0.2930330374667178|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas| 0.6262461239391839|
+----------+-------------+-------------------+----------+---------+-------------------+
only showing top 5 rows



In [20]:
# Show the rows whose random_val is 0 -- in practice the titles that are
# neither Councilmember nor Mayor and so received the otherwise(0) fallback.
zero_val_rows = voter_df.filter(F.col('random_val') == 0)
zero_val_rows.show()

+----------+--------------------+-----------------+----------+---------+----------+
|      DATE|               TITLE|       VOTER_NAME|first_name|last_name|random_val|
+----------+--------------------+-----------------+----------+---------+----------+
|04/25/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|04/25/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|06/20/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|06/20/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|06/20/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|06/20/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|08/15/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|08/15/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|09/18/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|    

### Using user defined functions in Spark

In [21]:
from pyspark.sql.types import StringType
def getFirstAndMiddle(names):
  """Join a sequence of name parts into one space-separated string.

  Despite the function's name, every element of ``names`` is included;
  nothing is dropped from the end.
  """
  separator = ' '
  return separator.join(names)

# Register the Python function as a Spark UDF that returns a string
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Build a per-row array of (first_name, last_name) and pass it through the UDF.
# The new column is called first_and_middle_name, but as the output shows it
# actually holds the first and last name.
name_parts = F.array('first_name', 'last_name')
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(name_parts))

# Show the DataFrame
voter_df.show()

+----------+-------------+-------------------+----------+---------+-------------------+---------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|         random_val|first_and_middle_name|
+----------+-------------+-------------------+----------+---------+-------------------+---------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|0.26032428301487776|       Jennifer Gates|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston| 0.3289315117728715|      Philip Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                2.0|     Michael Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano| 0.2930330374667178|         Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas| 0.6262461239391839|         Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold| 0.6970129546902093|       Carolyn Arnold|
|02/08/201

### Adding an ID Field

In [22]:
# Read the full council votes file, taking column names from the header row
# and letting Spark infer the column types.
votes_csv = 'datasets/DallasCouncilVotes.csv.gz'
df = spark.read.csv(votes_csv, header=True, inferSchema=True)
df.show()

+----------+------------------+-----------+--------+-------------+-------------------+---------+------------------+-----------------------+------------------+--------------------+
|      DATE|AGENDA_ITEM_NUMBER|  ITEM_TYPE|DISTRICT|        TITLE|         VOTER NAME|VOTE CAST|FINAL ACTION TAKEN|AGENDA ITEM DESCRIPTION|         AGENDA_ID|             VOTE_ID|
+----------+------------------+-----------+--------+-------------+-------------------+---------+------------------+-----------------------+------------------+--------------------+
|02/08/2017|                 1|     AGENDA|      13|Councilmember|  Jennifer S. Gates|      N/A|  NO ACTION NEEDED|          Call to Order|020817__Special__1|020817__Special__...|
|02/08/2017|                 1|     AGENDA|      14|Councilmember| Philip T. Kingston|      N/A|  NO ACTION NEEDED|          Call to Order|020817__Special__1|020817__Special__...|
|02/08/2017|                 1|     AGENDA|      15|        Mayor|Michael S. Rawlings|      N/A|  NO

In [23]:
# Select all the unique council voters
voter_df = df.select(df["VOTER NAME"]).distinct()
voter_df.show()

+--------------------+
|          VOTER NAME|
+--------------------+
|      Tennell Atkins|
|  the  final   20...|
|        Scott Griggs|
|       Scott  Griggs|
|       Sandy Greyson|
| Michael S. Rawlings|
| the final 2018 A...|
|        Kevin Felder|
|        Adam Medrano|
|                null|
|   the   final  2...|
|          011018__42|
|    Casey Thomas, II|
|       Mark  Clayton|
|   Casey  Thomas, II|
|      Sandy  Greyson|
|        Mark Clayton|
|  Jennifer S.  Gates|
|   Tiffinni A. Young|
|  the  final  201...|
+--------------------+
only showing top 20 rows



In [24]:
# Report how many distinct voter-name rows there are
voter_row_count = voter_df.count()
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_row_count)


There are 36 rows in the voter_df DataFrame.



In [25]:
# Add a ROW_ID column of unique, increasing 64-bit integers.
# The IDs are only guaranteed to be increasing and unique, not consecutive;
# they come out as 0, 1, 2, ... here because voter_df has a single partition.
row_id_col = F.monotonically_increasing_id()
voter_df = voter_df.withColumn('ROW_ID', row_id_col)
voter_df.show()

+--------------------+------+
|          VOTER NAME|ROW_ID|
+--------------------+------+
|      Tennell Atkins|     0|
|  the  final   20...|     1|
|        Scott Griggs|     2|
|       Scott  Griggs|     3|
|       Sandy Greyson|     4|
| Michael S. Rawlings|     5|
| the final 2018 A...|     6|
|        Kevin Felder|     7|
|        Adam Medrano|     8|
|                null|     9|
|   the   final  2...|    10|
|          011018__42|    11|
|    Casey Thomas, II|    12|
|       Mark  Clayton|    13|
|   Casey  Thomas, II|    14|
|      Sandy  Greyson|    15|
|        Mark Clayton|    16|
|  Jennifer S.  Gates|    17|
|   Tiffinni A. Young|    18|
|  the  final  201...|    19|
+--------------------+------+
only showing top 20 rows



In [26]:
# Show the 10 rows with the highest ROW_ID values
by_id_desc = voter_df.orderBy(F.col('ROW_ID').desc())
by_id_desc.show(10)

+--------------------+------+
|          VOTER NAME|ROW_ID|
+--------------------+------+
|        Lee Kleinman|    35|
|  the  final  201...|    34|
|         Erik Wilson|    33|
|  the  final   20...|    32|
| Carolyn King Arnold|    31|
| Rickey D.  Callahan|    30|
|   the   final  2...|    29|
|    Monica R. Alonzo|    28|
|     Lee M. Kleinman|    27|
|   Jennifer S. Gates|    26|
+--------------------+------+
only showing top 10 rows



### IDs with different partitions

In [27]:
# Print the number of partitions backing voter_df
partition_count = voter_df.rdd.getNumPartitions()
print("\nThere are %d partitions in the voter_df DataFrame.\n" % partition_count)


There are 1 partitions in the voter_df DataFrame.



### More ID tricks

In [None]:
# Determine the highest ROW_ID and save it in previous_max_ID.
# NOTE(review): voter_df_march and voter_df_april are not defined in any
# visible cell; they must exist in the session before this cell runs.
previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()[0]

# Add a ROW_ID column to voter_df_april that continues after March's IDs.
# monotonically_increasing_id() starts at 0, so the offset is previous_max_ID + 1;
# adding previous_max_ID alone would give April's first row the same ID as
# March's highest row.
voter_df_april = voter_df_april.withColumn('ROW_ID', F.monotonically_increasing_id() + previous_max_ID + 1)

# Show the ROW_ID from both DataFrames and compare
voter_df_march.select('ROW_ID').show()
voter_df_april.select('ROW_ID').show()

## Improving Performance

### Caching a DataFrame

In [29]:
# Read the 2017 departures file into its own DataFrame for the caching demo
departures_csv = 'datasets/AA_DFW_2017_Departures_Short.csv.gz'
departures_df = spark.read.csv(departures_csv, header=True)
departures_df.show(5)

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2017|         0005|                HNL|                          537|
|       01/01/2017|         0007|                OGG|                          498|
|       01/01/2017|         0037|                SFO|                          241|
|       01/01/2017|         0043|                DTW|                          134|
|       01/01/2017|         0051|                STL|                           88|
+-----------------+-------------+-------------------+-----------------------------+
only showing top 5 rows



In [30]:
# import time
import time

start_time = time.time()

# Mark the de-duplicated rows for caching; nothing is stored until an action runs
departures_df = departures_df.distinct().cache()

# First count: computes the distinct rows and fills the cache
first_count = departures_df.count()
first_elapsed = time.time() - start_time
print("Counting %d rows took %f seconds" % (first_count, first_elapsed))

# Second count: same action again, timed separately to compare against the first
start_time = time.time()
second_count = departures_df.count()
second_elapsed = time.time() - start_time
print("Counting %d rows again took %f seconds" % (second_count, second_elapsed))


Counting 139358 rows took 1.007771 seconds
Counting 139358 rows again took 0.254597 seconds


### Removing a DataFrame from cache

In [31]:
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache. The return value is not reassigned; the
# recorded output shows is_cached on this same object reading False afterwards.
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)

Is departures_df cached?: True
Removing departures_df from cache
Is departures_df cached?: False


### File import performance

In [None]:
# Import the full and split files into DataFrames.
# NOTE(review): the pattern 'departures_*.txt.gz' also matches
# 'departures_full.txt.gz' if both live in the same directory, which would fold
# the full file into split_df -- confirm how the split files are named.
full_df = spark.read.csv('departures_full.txt.gz')
split_df = spark.read.csv('departures_*.txt.gz')

# Print the count and run time for each DataFrame.
# `time` is imported in the earlier caching cell. Each timer is started
# immediately before the count() it measures and read right after the first print.
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))

start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))

### Reading Spark configurations

In [33]:
# Read three settings from the active session's runtime configuration
runtime_conf = spark.conf
app_name = runtime_conf.get('spark.app.name')
driver_tcp_port = runtime_conf.get('spark.driver.port')
num_partitions = runtime_conf.get('spark.sql.shuffle.partitions')

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

Name: cleaning-data
Driver TCP port: 49835
Number of partitions: 200


In [34]:
# Look up the host name the driver is running on (displayed as the cell's result)
spark.conf.get('spark.driver.host')

'dino-mbp'

### Writing Spark configurations

In [41]:
# Remember the current partition count so it can be compared after the change
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 500)

# Recreate the DataFrame from the departures data file so the new setting
# applies to the distinct() shuffle
departures_csv = 'datasets/AA_DFW_2017_Departures_Short.csv.gz'
departures_df = spark.read.csv(departures_csv, header=True).distinct()

In [42]:
# Print the partition count captured before the config change next to the
# current one. Both read 2 in the recorded output even though shuffle
# partitions was set to 500 -- presumably because adaptive query execution
# coalesces the shuffle (the plan in the next cell shows "AQEShuffleRead coalesced").
after = departures_df.rdd.getNumPartitions()
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % after)


Partition count before change: 2
Partition count after change: 2


In [43]:
# Print the physical plan; the Exchange step shows the 500 shuffle partitions requested above
departures_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[Date (MM/DD/YYYY)#911, Flight Number#912, Destination Airport#913, Actual elapsed time (Minutes)#914], functions=[])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 0
         +- Exchange hashpartitioning(Date (MM/DD/YYYY)#911, Flight Number#912, Destination Airport#913, Actual elapsed time (Minutes)#914, 500), ENSURE_REQUIREMENTS, [plan_id=959]
            +- *(1) HashAggregate(keys=[Date (MM/DD/YYYY)#911, Flight Number#912, Destination Airport#913, Actual elapsed time (Minutes)#914], functions=[])
               +- FileScan csv [Date (MM/DD/YYYY)#911,Flight Number#912,Destination Airport#913,Actual elapsed time (Minutes)#914] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/ngohongthai/Documents/projects/IBM-Data-Engineer-Specializ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Date (MM/DD/YYYY):string,Flight N

### Normal joins

In [None]:
# Join the flights_df and airports_df DataFrames where the flight's destination
# matches the airport's IATA code.
# NOTE(review): airports_df is not defined in any visible cell.
join_condition = flights_df["Destination Airport"] == airports_df["IATA"]
normal_df = flights_df.join(airports_df, join_condition)

# Show the query plan
normal_df.explain()


### Using broadcasting on Spark joins

In [None]:
from pyspark.sql.functions import broadcast

# Same join as before, but with airports_df wrapped in a broadcast hint
airport_match = flights_df["Destination Airport"] == airports_df["IATA"]
broadcast_df = flights_df.join(broadcast(airports_df), airport_match)

# Show the query plan and compare against the original
broadcast_df.explain()

### Comparing broadcast vs normal joins

In [None]:
# Time a full count of the normal join. `time` is imported in the earlier
# caching cell; normal_df and broadcast_df come from the two cells above.
start_time = time.time()
# Count the number of rows in the normal DataFrame
normal_count = normal_df.count()
normal_duration = time.time() - start_time

# Restart the timer so the broadcast join is measured on its own
start_time = time.time()
# Count the number of rows in the broadcast DataFrame
broadcast_count = broadcast_df.count()
broadcast_duration = time.time() - start_time

# Print the counts and the duration of the tests
print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))

## Complex processing and data pipelines

### Quick pipeline

In [44]:
# Import the 2015 departures data to a DataFrame, reusing the departures_df name
departures_2015_csv = 'datasets/AA_DFW_2015_Departures_Short.csv.gz'
departures_df = spark.read.csv(departures_2015_csv, header=True)
departures_df.show(5)

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2015|         0005|                HNL|                          526|
|       01/01/2015|         0007|                OGG|                          517|
|       01/01/2015|         0023|                SFO|                          233|
|       01/01/2015|         0027|                LAS|                          165|
|       01/01/2015|         0029|                ONT|                            0|
+-----------------+-------------+-------------------+-----------------------------+
only showing top 5 rows



In [45]:
# Print the column names and types (all strings, since no schema was supplied on read)
departures_df.printSchema()

root
 |-- Date (MM/DD/YYYY): string (nullable = true)
 |-- Flight Number: string (nullable = true)
 |-- Destination Airport: string (nullable = true)
 |-- Actual elapsed time (Minutes): string (nullable = true)



In [46]:
# Row count before any filtering, for comparison with the filtered count below
departures_df.count()

146558

In [47]:
# Remove any duration of 0. The elapsed-time column (position 3) was read as a
# string, so the comparison with 0 relies on Spark casting it implicitly.
elapsed_minutes = departures_df['Actual elapsed time (Minutes)']
departures_df = departures_df.filter(elapsed_minutes > 0)
departures_df.count()

143203

In [49]:
# Add an ID column of unique, increasing integers
row_id = F.monotonically_increasing_id()
departures_df = departures_df.withColumn("id", row_id)
departures_df.show(5)

+-----------------+-------------+-------------------+-----------------------------+---+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)| id|
+-----------------+-------------+-------------------+-----------------------------+---+
|       01/01/2015|         0005|                HNL|                          526|  0|
|       01/01/2015|         0007|                OGG|                          517|  1|
|       01/01/2015|         0023|                SFO|                          233|  2|
|       01/01/2015|         0027|                LAS|                          165|  3|
|       01/01/2015|         0035|                HDN|                          178|  4|
+-----------------+-------------+-------------------+-----------------------------+---+
only showing top 5 rows



In [50]:
# Write the DataFrame out in JSON format, replacing any existing output
json_writer = departures_df.write.mode('overwrite')
json_writer.json('output.json')

### Removing commented lines

In [None]:
# Import the file to a DataFrame and perform a row count.
# With "|" as the separator each line lands in the single column _c0.
annotations_df = spark.read.csv('annotations.csv.gz', sep="|")
full_count = annotations_df.count()

# Count the number of rows beginning with '#'.
# The functions module is only imported as F in this notebook, so the column
# is referenced as F.col (a bare `col` is not defined).
comment_count = annotations_df.where(F.col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))

### Removing invalid rows

In [None]:
# Record the row count before filtering so it can be reported below
# (initial_count is not defined anywhere else in this notebook).
initial_count = annotations_df.count()

# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (annotations_df['colcount'] < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))

### Splitting into columns

In [None]:
# Split the content of _c0 on the tab character (aka, '\t').
# Work from annotations_df_filtered so the rows with fewer than 5 fields that
# the previous cell removed do not flow back into the pipeline.
split_cols = F.split(annotations_df_filtered['_c0'], '\t')

# Add the columns folder, filename, width, and height (fields 0-3)
split_df = (
    annotations_df_filtered
    .withColumn('folder', split_cols.getItem(0))
    .withColumn('filename', split_cols.getItem(1))
    .withColumn('width', split_cols.getItem(2))
    .withColumn('height', split_cols.getItem(3))
)

# Add split_cols as a column so later cells can read the remaining fields
split_df = split_df.withColumn('split_cols', split_cols)

### Further parsing

In [None]:
def retriever(cols, colcount):
  """Return the elements of ``cols`` from index 4 up to (not including) ``colcount``.

  Indices 0-3 are skipped. Standard slice semantics apply, so the result is
  empty when ``cols`` has four or fewer elements.
  """
  dog_fields = slice(4, colcount)
  return cols[dog_fields]

# Wrap retriever as a UDF that returns an array of strings
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Create the dog_list column from the split fields and their count
dog_list_col = udfRetriever(split_df.split_cols, split_df.colcount)
split_df = split_df.withColumn('dog_list', dog_list_col)

# Remove the original column, split_cols, and the colcount in a single call
split_df = split_df.drop('_c0', 'split_cols', 'colcount')

### Validate rows via join

In [None]:
# Rename the column in valid_folders_df so it shares the join key name.
# NOTE(review): valid_folders_df is not defined in any visible cell.
valid_folders_df = valid_folders_df.withColumnRenamed("_c0", "folder")

# Count the number of rows in split_df before the join
split_count = split_df.count()

# Inner-join on folder, with a broadcast hint on valid_folders_df
valid_folders_hint = F.broadcast(valid_folders_df)
joined_df = split_df.join(valid_folders_hint, on="folder")

# Compare the number of rows remaining
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))

### Examining invalid rows

In [None]:
# Determine the row counts for each DataFrame
split_count = split_df.count()
joined_count = joined_df.count()

# Create a DataFrame containing the invalid rows: a left_anti join keeps the
# split_df rows whose folder has no match in joined_df
joined_hint = F.broadcast(joined_df)
invalid_df = split_df.join(joined_hint, on='folder', how='left_anti')

# Validate the count of the new DataFrame is as expected
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Determine the number of distinct folder rows removed
invalid_folders = invalid_df.select('folder').distinct()
invalid_folder_count = invalid_folders.count()
print("%d distinct invalid folders found" % invalid_folder_count)

### Dog parsing

In [None]:
# Select the dog details and show 10 untruncated rows.
# show() prints the table itself and returns None, so it is not wrapped in
# print() (which would add a stray "None" line to the output).
joined_df.select('dog_list').show(10, truncate=False)

# Define a schema type for the details in the dog list: the breed followed by
# four integer coordinates (presumably the bounding-box corners). Every field
# is declared non-nullable.
DogType = StructType([
    StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(), False)
])

### Per image count

In [None]:
# Create a function to return the number and type of dogs as a tuple
def dogParse(doglist):
  """Convert each 'breed,start_x,start_y,end_x,end_y' string into a tuple.

  Returns a list of (breed, start_x, start_y, end_x, end_y) tuples with the
  four coordinates converted to int. Raises ValueError if an entry does not
  split into exactly five comma-separated fields or a coordinate is not an
  integer literal.
  """
  def parse_entry(entry):
    breed, start_x, start_y, end_x, end_y = entry.split(',')
    return (breed, int(start_x), int(start_y), int(end_x), int(end_y))

  return [parse_entry(entry) for entry in doglist]

# Register dogParse as a UDF that returns an array of DogType structs
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Parse dog_list into the structured dogs column, then drop the raw strings
parsed_df = joined_df.withColumn('dogs', udfDogParse('dog_list'))
joined_df = parsed_df.drop('dog_list')

# Show the number of dogs in the first 10 rows
dog_counts = joined_df.select(F.size('dogs'))
dog_counts.show(10)

### Percentage dog pixels

In [None]:
# Define a UDF to determine the number of pixels per image
def dogPixelCount(doglist):
  """Return the summed box area over all entries in ``doglist``.

  Each entry is indexed positionally: indices 1 and 2 are read as the start
  x/y and indices 3 and 4 as the end x/y (the DogType field order), so one
  entry contributes (end_x - start_x) * (end_y - start_y). Areas are simply
  added per entry, and an empty list yields 0.
  """
  return sum((dog[3] - dog[1]) * (dog[4] - dog[2]) for dog in doglist)

# Wrap dogPixelCount as a UDF returning an integer and store the result as dog_pixels
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount('dogs'))

# Create a column representing the percentage of pixels:
# dog pixels divided by the image area (width * height), times 100
image_area = joined_df.width * joined_df.height
dog_fraction = joined_df.dog_pixels / image_area
joined_df = joined_df.withColumn('dog_percent', dog_fraction * 100)

# Show the first 10 annotations with more than 60% dog
mostly_dog_df = joined_df.where('dog_percent > 60')
mostly_dog_df.show(10)