# Data Cleaning in PySpark

+ Schemas
+ May contain various data types
+ Can filter garbage data during import
+ Improves read performance 

In [None]:
# Import the schema-building names directly: a bare `import pyspark.sql.types`
# would leave StructType, StructField, StringType and IntegerType undefined below.
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Each StructField is (column name, data type, nullable flag)
peopleSchema = StructType([
    StructField('name', StringType(), True),  # Boolean is if data can be null or not
    StructField('age', IntegerType(), True),
    StructField('city', StringType(), True),
])

In [None]:
# The file location must be passed as `path`; a `name=` keyword is not the
# path parameter of load(), so the reader would never be pointed at the file.
people_df = spark.read.format('csv').load(path='rawdata.csv',
                                          schema=peopleSchema)

In [None]:
# Bring every type from pyspark.sql.types into scope
from pyspark.sql.types import *

# Build the schema from (column name, data type) pairs;
# every field is declared with its nullable flag set to False
people_schema = StructType([
    StructField(column, dtype, False)
    for column, dtype in (
        ('name', StringType()),
        ('age', IntegerType()),
        ('city', StringType()),
    )
])

In [None]:
# F is used below, so bring the functions module into scope in this cell
import pyspark.sql.functions as F

# Load the CSV file
aa_dfw_df = spark.read.format('csv').options(Header=True).load('AA_DFW_2018.csv.gz')

# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])

# Show the DataFrame
aa_dfw_df.show()

Difficulties with CSV files
+ No Defined schema
+ Nested data requires special handling
+ Slow to parse
+ If schema is not provided, all data must be read before inferring schema
+ Predicate pushdown - Ordering tasks to do least amount of work. Filtering the data before processing is one of the optimizations for large datasets. In CSV this cannot be done.
+ Any intermediate use requires redefining schemas

Parquet Format
+ A Columnar data format
+ Supported in Spark 
+ Supports predicate pushdown
+ Automatically stores schema information
+ Binary file format

In [None]:
# Reading parquet files
# First the generic reader with an explicit format, then the parquet shorthand
df = spark.read.format('parquet').load('filename.parquet')
df = spark.read.parquet('filename.parquet')
# Register the DataFrame under the view name 'flights'
df.createOrReplaceTempView('flights')

# Writing parquet files
# NOTE(review): both writes target the same path with no mode given - presumably
# these are two alternative spellings rather than steps to run back to back
df.write.format('parquet').save('filename.parquet')
df.write.parquet('filename.parquet')

In [None]:
# View the row count of df1 and df2
print("df1 Count: %d" % df1.count())
print("df2 Count: %d" % df2.count())

# Combine the DataFrames into one
df3 = df1.union(df2)

# Save the df3 DataFrame in Parquet format
# mode='overwrite' is passed so an existing AA_DFW_ALL.parquet does not block the write
df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')

# Read the Parquet file into a new DataFrame and run a count
# (the DataFrame is not kept; only its row count is printed)
print(spark.read.parquet('AA_DFW_ALL.parquet').count())

In [None]:
# Read the Parquet file into flights_df
flights_df = spark.read.parquet('AA_DFW_ALL.parquet')

# Register the temp table
flights_df.createOrReplaceTempView('flights')

# Run a SQL query of the average flight duration
# collect()[0] keeps only the first row of the query result
avg_duration = spark.sql('SELECT avg(flight_duration) from flights').collect()[0]
# NOTE(review): avg_duration is a whole result row rather than a bare number; the %d
# formatting presumably relies on the row acting as a one-element tuple - confirm
print('The average flight time is: %d' % avg_duration)

### Manipulating DataFrames

In [None]:
# F.year() is needed below, so bring the functions module into scope in this cell
import pyspark.sql.functions as F

# Filter / Where
voter_df.filter(voter_df.date > '1/1/2019')
voter_df.where(voter_df.date > '1/1/2019')
voter_df.filter(voter_df['name'].isNotNull())  # Remove Nulls
voter_df.where(~ voter_df['name'].isNull())
# Attribute access on a column (`.date.year`) does not extract the year;
# F.year() is the function that does
voter_df.filter(F.year(voter_df.date) > 1800)
voter_df.where(voter_df['_C0'].contains('VOTE'))
voter_df.where(~voter_df._c1.isNull()) # using Negation

# Select
voter_df.select(voter_df.name)

# withColumn to create new column - (name_of_column, command to create)
voter_df.withColumn('year', F.year(voter_df.date))

# drop
voter_df.drop('unused_column')

In [None]:
# String transformations contained in pyspark.sql.functions
import pyspark.sql.functions as F

# Applied per column
# Each call builds a DataFrame with one extra column; the results are not assigned here
voter_df.withColumn('lower', F.lower('name'))
voter_df.withColumn('splits', F.split('name',' ')) # intermediary columns
# cast() converts the _c4 column to IntegerType for the new 'year' column
voter_df.withColumn('year', voter_df['_c4'].cast(IntegerType()))

In [None]:
# ArrayType
# F.size(column)          - length of an ArrayType column
# column.getItem(index)   - retrieves the item at the given index from the array

In [None]:
# Show the distinct VOTER_NAME entries
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

# Filter voter_df where the VOTER_NAME is 1-19 characters in length
# (both bounds are strict: length > 0 and length < 20)
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

In [None]:
# Add a new column called splits separated on whitespace
# (raw string so that \s reaches the regex engine without an invalid-escape warning)
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
# The last index is size - 1; the earlier size - (size - 1) always evaluated to 1
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

# Drop the splits column
voter_df = voter_df.drop('splits')

# Show the voter_df DataFrame
voter_df.show()

In [None]:
# conditionals
# .when(if_condition, then)
# Select Name and Age plus a third column that is "Adult" where Age >= 18;
# no value is specified here for the other rows
df.select(df.Name, df.Age, F.when(df.Age >= 18, "Adult"))

In [None]:
# Chained .when() calls: one explicit condition per label
df.select(df.Name, df.Age,
         F.when(df.Age >= 18, "Adult")
          .when(df.Age < 18, "Minor"))

# otherwise() is like else
# Here "Minor" is the fallback for every row that does not satisfy Age >= 18
df.select(df.Name, df.Age,
         F.when(df.Age >= 18, "Adult")
          .otherwise("Minor"))

In [None]:
# Add a column to voter_df for any voter with the title **Councilmember**
# (`when` is only reachable through the F alias, so it must be called as F.when)
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand()))

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show()

In [None]:
# Add a column to voter_df for a voter based on their position
# Councilmember -> random value, Mayor -> 2, every other title -> 0
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))

# Show some of the DataFrame rows
voter_df.show()

# Use the .filter() clause with random_val
# random_val == 0 selects the rows that fell through to .otherwise(0)
voter_df.filter(voter_df.random_val == 0).show()

#### UDF

In [None]:
+ Python Method
+ wrapped via pyspark.sql.functions.udf method
+ stored as a variable and called like a normal spark function

In [None]:
# Define a python method
def reverseString(mystr):
    """Return ``mystr`` with its characters in reverse order.

    ``None`` is passed through unchanged so that a missing value does not
    raise a TypeError when this function is applied to column data.
    """
    if mystr is None:
        return None
    return mystr[::-1]

# Wrap the function and store it as a variable; the UDF returns strings
udfReverseString = F.udf(reverseString, StringType())

# Use with Spark
# (withColumn is a method of the DataFrame: user_df.withColumn, not user_df_withColumn)
user_df = user_df.withColumn('ReverseName', udfReverseString(user_df.Name))

In [None]:

def getFirstAndMiddle(names):
  """Return every name except the last, joined by single spaces.

  ``names`` is a list of name parts. ``None`` is passed through unchanged so
  that a missing value does not raise a TypeError when this is used on column data.
  """
  if names is None:
    return None
  # Return a space separated string of names
  return ' '.join(names[:-1])

# Define the method as a UDF
# The UDF is declared to return a string
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
# NOTE(review): this reads voter_df.splits, so the splits column must still exist
# on voter_df when this cell runs - confirm it has not been dropped beforehand
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))

# Show the DataFrame
voter_df.show()

#### Assigning IDs

+ Sequential IDs create a bottleneck in Spark
+ Monotonically increasing IDs can be used in Spark instead
+ pyspark.sql.functions.monotonically_increasing_id()
+ These IDs are not sequential, but they are generated completely in parallel
+ IDs are assigned based on the partitions

In [None]:
# Select all the unique council voters
# Only the "VOTER NAME" column is kept, then de-duplicated
voter_df = df.select(df["VOTER NAME"]).distinct()

# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())

# Add a ROW_ID
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the rows with 10 highest IDs in the set
# Sorting descending on ROW_ID puts the largest IDs first
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)

In [None]:
# Print the number of partitions in each DataFrame
print("\nThere are %d partitions in the voter_df DataFrame.\n" % voter_df.rdd.getNumPartitions())
print("\nThere are %d partitions in the voter_df_single DataFrame.\n" % voter_df_single.rdd.getNumPartitions())

# Add a ROW_ID field to each DataFrame
voter_df = voter_df.withColumn('ROW_ID',F.monotonically_increasing_id())
voter_df_single = voter_df_single.withColumn('ROW_ID',F.monotonically_increasing_id())

# Show the top 10 IDs in each DataFrame 
# Descending order, so the ten largest ROW_ID values of each DataFrame are shown
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
voter_df_single.orderBy(voter_df_single.ROW_ID.desc()).show(10)

In [None]:
# Determine the highest ROW_ID and save it in previous_max_ID
# rdd.max() returns the largest row; [0] extracts its single ROW_ID value
previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()[0]

# Add a ROW_ID column to voter_df_april starting just after the March maximum.
# The generated IDs start at 0, so the offset must be max + 1; adding only
# previous_max_ID would give the first April row the same ID as the last March row.
voter_df_april = voter_df_april.withColumn('ROW_ID', F.monotonically_increasing_id() + previous_max_ID + 1)

# Show the ROW_ID from both DataFrames and compare
voter_df_march.select('ROW_ID').show()
voter_df_april.select('ROW_ID').show()

#### Improving Performance

In [None]:
# time.time() is used for the measurements below and is not imported elsewhere
import time

start_time = time.time()

# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))

In [None]:
# cache dataframe
df.cache()

#check if a dataframe is cached
# NOTE(review): this inspects voter_df while the line above caches df -
# presumably generic examples rather than one continuous flow
print(voter_df.is_cached)

# Remove from cache
voter_df.unpersist()

In [None]:
# Improve Import performance
+ Splitting one large file into several smaller files of roughly equal size improves import performance
+ Very large files can be split with command-line tools (split, cut, awk)
+ split -l 10000 -d largefile chunk-
+ -l - number of lines to have per file
+ -d - tells split to use numeric suffixes
+ largefile - name of the file to be split
+ chunk- - prefix to be used for the split files


# time.time() is used for the measurements below and is not imported elsewhere
import time

# Import the full and split files into DataFrames
# NOTE(review): the 'departures_*' pattern would also match 'departures_full.txt.gz'
# if both live in the same directory - confirm the split files are kept separate
full_df = spark.read.csv('departures_full.txt.gz')
split_df = spark.read.csv('departures_*')

# Print the count and run time for each DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))

start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))

In [None]:
# cluster configurations
spark.conf.get(<configuration name>)
spark.conf.set(<configuration name>, <value>)

+ Driver node should have double the memory of the worker (Cross-check)
+ More worker nodes are often better than larger workers. This is especially obvious during import and export operations, as there are more machines available to do the work

In [None]:
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')

# Number of join partitions
# (read from the 'spark.sql.shuffle.partitions' setting)
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
# All three values are formatted with %s
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

In [None]:
# Store the number of partitions in variable
# Captured before the setting changes so the two counts can be compared below
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame using the departures data file
# NOTE(review): presumably the distinct() step is what makes the new
# shuffle-partition setting take effect here - confirm
departures_df = spark.read.csv('departures.txt.gz').distinct()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())

In [None]:
# Explaining spark execution plan
# explain() is called for its printed output; nothing is assigned
voter_df.explain()

In [None]:
# shuffling
+ Repartitioning requires shuffles and is a costly operation
+ df.repartition(num_partitions)
+ If you would like to reduce the number of partitions, use coalesce instead
+ df.coalesce(num_partitions)
+ .join() is costly and may involve a shuffle
+ Use .broadcast() when one df is small

from pyspark.sql.functions import broadcast
# df_2 is the DataFrame wrapped in broadcast()
# NOTE(review): no join condition is given here - presumably omitted for brevity; confirm
combined_df = df_1.join(broadcast(df_2))

In [None]:
# Bring broadcast() into scope from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join flights to airports with airports_df wrapped in broadcast(),
# matching each flight's destination to the airport's IATA code
broadcast_df = flights_df.join(
    broadcast(airports_df),
    flights_df["Destination Airport"] == airports_df["IATA"],
)

# Print the query plan so it can be compared against the original
broadcast_df.explain()

In [None]:
# time.time() is used for the measurements below and is not imported elsewhere
import time

start_time = time.time()
# Count the number of rows in the normal DataFrame
normal_count = normal_df.count()
normal_duration = time.time() - start_time

start_time = time.time()
# Count the number of rows in the broadcast DataFrame
broadcast_count = broadcast_df.count()
broadcast_duration = time.time() - start_time

# Print the counts and the duration of the tests
print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))

#### Data Pipelines

In [None]:
# Import the data to a DataFrame
# header=True: the first line of the file supplies the column names
departures_df = spark.read.csv('2015-departures.csv.gz', header=True)

# Remove any duration of 0
# Keeps only rows whose 'Actual elapsed time (Minutes)' is greater than 0
departures_df = departures_df.filter(departures_df['Actual elapsed time (Minutes)'] > 0)

# Add an ID column
departures_df = departures_df.withColumn('id',F.monotonically_increasing_id())

# Write the file out to JSON format
# mode='overwrite' is passed so an existing output.json does not block the write
departures_df.write.json('output.json', mode='overwrite')

In [None]:
# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')
full_count = annotations_df.count()

# Count the number of rows beginning with '#'
# (col is only reachable through the F alias, so it must be called as F.col)
comment_count = annotations_df.filter(F.col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count,comment_count, no_comments_count))

In [None]:
# Record the row count before filtering; it is reported next to the final count
# below and is not defined anywhere else
initial_count = annotations_df.count()

# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (annotations_df.colcount < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))

In [None]:
# Tab-separated pieces of _c0, as a single array expression
split_cols = F.split(annotations_df['_c0'], '\t')

# Fields 0-3 become folder, filename, width and height;
# the full array is kept as split_cols for later steps
split_df = (
    annotations_df
    .withColumn('folder', split_cols.getItem(0))
    .withColumn('filename', split_cols.getItem(1))
    .withColumn('width', split_cols.getItem(2))
    .withColumn('height', split_cols.getItem(3))
    .withColumn('split_cols', split_cols)
)

In [None]:
def retriever(cols, colcount):
  """Return the entries of ``cols`` from index 4 up to, but excluding, ``colcount``."""
  # The first four entries are skipped; the rest is the dog data
  dog_fields = slice(4, colcount)
  return cols[dog_fields]

# Wrap retriever as a UDF whose result is an array of strings
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Build the dog_list column from the split fields and their count
split_df = split_df.withColumn(
    'dog_list',
    udfRetriever(split_df.split_cols, split_df.colcount),
)

# Discard the raw line, the split array and the field count in one call
split_df = split_df.drop('_c0', 'split_cols', 'colcount')

In [None]:
# Rename the column in valid_folders_df
# '_c0' becomes 'folder' so it matches the join column used below
valid_folders_df = valid_folders_df.withColumnRenamed('_c0','folder')

# Count the number of rows in split_df
split_count = split_df.count()

# Join the DataFrames
# valid_folders_df is the side wrapped in broadcast(); the join key is 'folder'
joined_df = split_df.join(F.broadcast(valid_folders_df),'folder')

# Compare the number of rows remaining
joined_count = joined_df.count()

In [None]:
# Determine the row counts for each DataFrame
split_count = split_df.count()
joined_count = joined_df.count()

# Create a DataFrame containing the invalid rows
# 'left_anti' on 'folder': rows of split_df whose folder has no match in joined_df
invalid_df = split_df.join(F.broadcast(joined_df), 'folder', 'left_anti')

# Validate the count of the new DataFrame is as expected
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Determine the number of distinct folder rows removed
invalid_folder_count = invalid_df.select('folder').distinct().count()
print("%d distinct invalid folders found" % invalid_folder_count)

In [None]:
+ UDFs come with a performance penalty when compared with built-in Spark functions
+ Do calculations inline wherever possible

In [None]:
# Select the dog details and show 10 untruncated rows
# show() does its own printing; wrapping it in print() only added a stray "None"
joined_df.select('dog_list').show(10, truncate=False)

# Define a schema type for the details in the dog list
# Every field is declared with its nullable flag set to False
DogType = StructType([
    StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(), False),
])

In [None]:
# Create a function that turns each dog entry into a typed tuple
def dogParse(doglist):
  """Parse comma-separated dog entries into a list of tuples.

  Each entry is split on ',' into exactly five parts; the result holds one
  (breed, start_x, start_y, end_x, end_y) tuple per entry, with the four
  coordinates converted to int.
  """
  return [
      (breed, int(start_x), int(start_y), int(end_x), int(end_y))
      for breed, start_x, start_y, end_x, end_y in (
          entry.split(',') for entry in doglist
      )
  ]

# Create a UDF
# The declared return type is an array of DogType structs
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Use the UDF to list of dogs and drop the old column
# 'dogs' replaces 'dog_list', which is dropped in the same statement
joined_df = joined_df.withColumn('dogs', udfDogParse('dog_list')).drop('dog_list')

# Show the number of dogs in the first 10 rows
joined_df.select(F.size('dogs')).show(10)

In [None]:
# Define a function to determine the number of dog pixels per image
def dogPixelCount(doglist):
  """Return the summed bounding-box area of every dog in ``doglist``.

  Each dog is indexed as (breed, start_x, start_y, end_x, end_y); its area is
  (end_x - start_x) * (end_y - start_y). An empty list gives 0.
  """
  return sum(
      (dog[3] - dog[1]) * (dog[4] - dog[2])
      for dog in doglist
  )

# Define a UDF for the pixel count
# The UDF is declared to return an integer
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount('dogs'))

# Create a column representing the percentage of pixels
# dog_pixels divided by the image area (width * height), scaled to a percentage
joined_df = joined_df.withColumn('dog_percent', (joined_df.dog_pixels / (joined_df.width * joined_df.height)) * 100)

# Show the first 10 annotations with more than 60% dog
joined_df.where('dog_percent > 60').show(10)