### What is data cleaning?

 - Preparing raw data for use in data processing pipelines
 - Tasks include reformatting or replacing text, performing calculations, and removing garbage or incomplete data
 - With billions of records, performance becomes an issue, which is why Spark, designed to handle big data, is used; the primary limit to Spark's abilities is the amount of RAM in the Spark cluster
 - Spark schemas define the data type of each field, can filter out garbage data during import, and improve read performance
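
The filtering idea in the last bullet (rejecting rows that do not match a declared schema) can be sketched in plain Python. This is a simplified analogy, not Spark's implementation: the field list is hypothetical and mirrors the `people_schema` example that follows, and dropping bad rows roughly corresponds to reading with `mode='DROPMALFORMED'` (Spark's default `PERMISSIVE` mode keeps such rows and fills the malformed fields with null instead).

```python
import csv
import io

# Hypothetical schema mirroring people_schema: (field name, Python type, nullable)
SCHEMA = [("name", str, False), ("age", int, False), ("city", str, False)]


def parse_row(fields, schema=SCHEMA):
    """Cast one CSV row to the schema; return None if the row does not fit it."""
    if len(fields) != len(schema):
        return None  # wrong number of fields: treat the row as garbage
    row = {}
    for raw, (name, typ, nullable) in zip(fields, schema):
        if raw == "":
            if not nullable:
                return None  # missing value in a non-nullable field
            row[name] = None
            continue
        try:
            row[name] = typ(raw)
        except ValueError:
            return None  # value cannot be cast to the declared type
    return row


def read_with_schema(text, schema=SCHEMA):
    """Parse CSV text, keeping only the rows that match the schema."""
    parsed = (parse_row(fields, schema) for fields in csv.reader(io.StringIO(text)))
    return [row for row in parsed if row is not None]


raw = "Ann,34,Dallas\nBob,not_a_number,Austin\nCara,29\nDan,41,Houston\n"
for row in read_with_schema(raw):
    print(row)
```

Only the rows for Ann and Dan survive: Bob's age cannot be cast to an integer and Cara's row is missing a field.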

In [None]:
# Import the pyspark.sql.types library
from pyspark.sql.types import *

# Define a new schema using the StructType method
people_schema = StructType([
  # Define a StructField for each field
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('city', StringType(), False)
])

# Read the CSV file, enforcing the schema
people_df = spark.read.format('csv').load(path='rawdata.csv', schema=people_schema)

#### Immutability and lazy processing

 - In plain Python, most objects (lists, dicts, etc.) are mutable and can be changed in place
 - This adds flexibility but causes problems when multiple concurrent components try to modify the same data
 - Spark DataFrames are immutable: once defined, they cannot be modified
 - Reassigning a variable to a transformed DataFrame re-creates the object rather than changing it
 - This lets Spark share data efficiently across the cluster without worrying about concurrent modifications
 - When you "change" a Spark DataFrame, a new DataFrame is created and the variable name is rebound to it; the original object is discarded once nothing refers to it
 - The underlying source data (e.g. the file that was read to create the first DataFrame) is never changed
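
The rebinding behaviour described above can be illustrated with a plain-Python analogy. `ToyFrame` is a hypothetical stand-in, not a Spark class: it is a frozen dataclass whose `with_column` method returns a new object instead of modifying the existing one, much like `DataFrame.withColumn`.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class ToyFrame:
    """Hypothetical stand-in for a DataFrame: an immutable tuple of column names."""
    columns: tuple

    def with_column(self, name):
        # Build and return a new ToyFrame; self is left unchanged
        return replace(self, columns=self.columns + (name,))


df = ToyFrame(columns=("name", "age"))
original = df
df = df.with_column("city")  # rebinds the name df to a new object

print(original.columns)  # ('name', 'age')
print(df.columns)        # ('name', 'age', 'city')

try:
    original.columns = ("oops",)
except FrozenInstanceError:
    print("ToyFrame objects cannot be modified in place")
```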


Lazy processing: very little actually happens until an action is performed
 - Functionality is split into transformations and actions
 - Transformations are instructions describing what we want to accomplish
 - Actions are "triggers" that start the processing based on those instructions
 - Transformations usually return in about the same amount of time regardless of the quantity of data, because Spark does not perform them until an action is requested
 - Compare the time taken when the transformations are defined with the time taken when the data is actually queried. On small data the difference may be short, but it will be noticeable; on a full Spark cluster with larger quantities of data it becomes much more apparent.
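
A toy model of the transformation/action split, written in plain Python as an analogy rather than Spark's actual machinery: `LazyFrame` and its methods are hypothetical names. Transformations only append a step to a recorded plan and return immediately; the source is read and the plan executed only when an action such as `count()` runs.

```python
class LazyFrame:
    """Toy model of lazy evaluation: transformations record steps, actions run them."""

    def __init__(self, source, steps=()):
        self._source = source  # callable that produces the raw rows
        self._steps = steps    # the recorded plan of transformations

    # Transformations: return a new LazyFrame immediately, without touching the data
    def filter(self, predicate):
        return LazyFrame(self._source, self._steps + (("filter", predicate),))

    def map(self, func):
        return LazyFrame(self._source, self._steps + (("map", func),))

    def _run(self):
        rows = iter(self._source())
        for kind, fn in self._steps:
            # Inside a method, filter/map refer to the Python builtins
            rows = filter(fn, rows) if kind == "filter" else map(fn, rows)
        return rows

    # Actions: only now is the source read and the plan executed
    def collect(self):
        return list(self._run())

    def count(self):
        return sum(1 for _ in self._run())


source_reads = []


def read_source():
    source_reads.append("read")  # record every time the data is actually read
    return range(1_000_000)


plan = LazyFrame(read_source).filter(lambda x: x % 2 == 0).map(lambda x: x * 10)
print(len(source_reads))  # 0: defining transformations did no work
print(plan.count())       # 500000
print(len(source_reads))  # 1: the action triggered the read
```

Defining `plan` costs the same whether the source holds ten rows or a million, which is the effect the timing comparison above is meant to show.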

In [None]:
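# Import the Spark SQL functions module as F
import pyspark.sql.functions as F

# Load the CSV file, treating the first row as a header
aa_dfw_df = spark.read.format('csv').options(header=True).load('AA_DFW_2018.csv.gz')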

# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])

# Show the DataFrame
aa_dfw_df.show()

### CSVs vs Parquet file formats

CSVs
 - CSVs are slow to import and parse; compressed CSV files (e.g. gzip) cannot be split among Spark workers, and if no schema is defined all the data must be read before a schema can be inferred
 - CSV files cannot be filtered via "predicate pushdown", the idea of ordering tasks so the least amount of work is done. Filtering data before processing it is the primary optimization of predicate pushdown and drastically reduces the amount of information that must be processed in large datasets.
 - Spark processes are often multi-step and may use an intermediate file representation, which allows data to be reused later without regenerating it from the source; using CSV for this requires a significant amount of extra work defining schemas, encoding formats, etc.

Parquet files
 - Parquet is a compressed columnar data format developed for use in any Hadoop-based system (Apache Spark, Hadoop, Impala)
 - The data is stored in chunks, allowing efficient read and write operations without processing the entire file
 - It supports predicate pushdown, providing a significant performance improvement: because the format is columnar (Spark reads only the columns it needs) and stores statistics for each chunk (Spark can skip chunks that cannot match a filter), Spark only processes the data necessary to complete the operations you define instead of reading the entire dataset. This often drastically improves performance on large datasets.
 - Automatically includes schema information and handles data encoding
 - Parquet is a binary file format and can only be used with the proper tools, in contrast to CSV files, which can be edited with any text editor
 - Parquet files are perfect as a backing data store for SQL queries in Spark. While it is possible to run the same queries directly via Spark's Python functions, sometimes it's easier to run SQL queries alongside the Python options.
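
How chunk statistics enable predicate pushdown can be sketched with a simplified model. This is not Parquet's real file layout or reader API: `RowGroup`, `write_row_groups`, and `scan_less_than` are hypothetical, and the durations are made-up values. The "writer" records the min/max of each chunk, and the "reader" skips every chunk whose minimum already rules out the filter `duration < 100`, the same kind of condition as the `flightduration < 100` query below.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RowGroup:
    """Simplified row group: one column's values plus min/max statistics."""
    values: tuple
    min_value: int
    max_value: int


def write_row_groups(values, group_size):
    """Split a column into row groups and record their statistics, as a writer would."""
    groups = []
    for i in range(0, len(values), group_size):
        chunk = tuple(values[i:i + group_size])
        groups.append(RowGroup(chunk, min(chunk), max(chunk)))
    return groups


def scan_less_than(groups, threshold):
    """Apply value < threshold, reading only groups whose statistics allow a match."""
    matches, groups_read = [], 0
    for group in groups:
        if group.min_value >= threshold:
            continue  # pushdown: the statistics prove no row in this group can match
        groups_read += 1
        matches.extend(v for v in group.values if v < threshold)
    return matches, groups_read


durations = list(range(0, 1000, 5))  # hypothetical, already sorted durations
groups = write_row_groups(durations, group_size=50)
matches, groups_read = scan_less_than(groups, threshold=100)
print(len(groups), groups_read, len(matches))  # 4 1 20
```

Only one of the four chunks is read. Skipping works best when the data is sorted or clustered on the filtered column; with randomly ordered values most chunks would span the whole range and could not be skipped.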

In [None]:
# Two equivalent ways to read/write Parquet (use one or the other;
# writing twice to the same path fails unless mode='overwrite' is set)

# Reading parquet files
df = spark.read.format('parquet').load('filename.parquet')
df = spark.read.parquet('filename.parquet')

# Writing parquet files
df.write.format('parquet').save('filename.parquet')
df.write.parquet('filename.parquet')

In [None]:
# To run SQL queries use createOrReplaceTempView
# after reading the parquet file

flight_df = spark.read.parquet('flights.parquet')

flight_df.createOrReplaceTempView('flights')

short_flights_df = spark.sql('SELECT * FROM flights WHERE flightduration < 100')
  


### Manipulating DataFrames

DataFrames
 - Made up of rows and columns and generally analogous to a database table
 - Are immutable as any change to the structure or content creates a new DataFrame
 - Are modified through the use of transformations

#### Examples

In [None]:
# Return rows where name starts with "M"
voter_df.filter(voter_df.name.like('M%'))
# Return name and position only
voters = voter_df.select('name', 'position')

In [None]:
import pyspark.sql.functions as F

# Filter
voter_df.filter(voter_df.date > '2019-01-01') # or voter_df.where(...); assumes date is a DateType column
voter_df.filter(voter_df['name'].isNotNull()) # remove nulls
voter_df.filter(F.year(voter_df.date) > 1800) # remove old entries

# Where
voter_df.where(voter_df['_c0'].contains('VOTE')) # split data from combined sources
voter_df.where(~ voter_df._c1.isNull()) # negate with ~

# Select
voter_df.select(voter_df.name)

# withColumn
voter_df.withColumn('year', F.year(voter_df.date))

# drop
voter_df.drop('unused_column')

In [None]:
# Column string transformations
# Contained in pyspark.sql.functions
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

# Applied per column transformation
voter_df.withColumn('upper', F.upper('name'))

# Create intermediary columns
voter_df.withColumn('splits', F.split('name', ' '))

# Cast to other types
voter_df.withColumn('year', voter_df['_c4'].cast(IntegerType()))
    

#### ArrayType column functions
 - F.size(column) returns the length of an ArrayType column
 - .getItem(index) retrieves the item at a specific index of an array column

#### More examples

In [None]:
# Show the distinct VOTER_NAME entries
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

# Filter voter_df where the VOTER_NAME is 1-19 characters in length
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
# (passing a Column to getItem() is deprecated since Spark 3.0, so index with [] instead)
voter_df = voter_df.withColumn('last_name', voter_df.splits[F.size('splits') - 1])

# Drop the splits column
voter_df = voter_df.drop('splits')

# Show the voter_df DataFrame
voter_df.show()

### Conditional clauses
 
 - The when() clause lets you conditionally modify a DataFrame based on its content
 - The otherwise() clause is like else; rows that match no when() condition get null if no otherwise() is given
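
The evaluation order of a `when()`/`otherwise()` chain can be modelled for a single value in plain Python. `when_chain` is a hypothetical helper, not a Spark function: conditions are tried in order, the first match wins, and `None` stands in for Spark's null when there is no `otherwise()`.

```python
def when_chain(value, branches, otherwise=None):
    """Evaluate (condition, result) pairs in order, like F.when(...).when(...).otherwise(...)."""
    for condition, result in branches:
        if condition(value):
            return result  # the first matching branch wins
    return otherwise  # None models the null produced when no otherwise() is given


branches = [
    (lambda title: title == "Councilmember", 1),
    (lambda title: title == "Mayor", 2),
]
titles = ["Councilmember", "Mayor", "Judge"]

print([when_chain(t, branches) for t in titles])               # [1, 2, None]
print([when_chain(t, branches, otherwise=0) for t in titles])  # [1, 2, 0]
```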

In [None]:
# Add a column to voter_df for any voter with the title Councilmember
voter_df = voter_df.withColumn('random_val', F.when(voter_df.TITLE == 'Councilmember', F.rand()))

# Show some of the DataFrame rows, noting whether the when clause worked
voter_df.show()

In [None]:
# Add a column to voter_df for a voter based on their position
voter_df = voter_df.withColumn('random_val', F.when(
                                voter_df.TITLE == 'Councilmember', F.rand()).when(
                                voter_df.TITLE == 'Mayor', 2).otherwise(0))

# Show some of the DataFrame rows
voter_df.show()

# Use the .filter() clause with random_val
voter_df.filter(voter_df.random_val==0).show()

### User-defined functions
 
 - A UDF is a Python function that the user writes to perform a specific bit of logic
 - Wrapped via the pyspark.sql.functions.udf method
 - Stored as a variable
 - Called like a normal Spark function
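
Because a UDF wraps an ordinary Python function, its logic can be checked in plain Python before it is wrapped with `udf()`. The sketch assumes, as in PySpark, that null column values reach the function as `None`; `reverse_string` and `first_and_middle` are hypothetical snake_case versions of the example functions that follow.

```python
def reverse_string(mystr):
    """Reverse a string; pass None (Spark's null) through unchanged."""
    if mystr is None:
        return None
    return mystr[::-1]


def first_and_middle(names):
    """Join every name except the last one with spaces; None stays None."""
    if names is None:
        return None
    return " ".join(names[:-1])


print(reverse_string("Spark"))                     # krapS
print(first_and_middle(["Mary", "Ann", "Smith"]))  # Mary Ann
```

Without the `None` checks, a single null row would raise a `TypeError` inside the executor and fail the whole Spark job.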

#### Examples

In [None]:
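import random

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Reverse string UDF (Spark passes None for null values, so guard against it)
def reverseString(mystr):
    return mystr[::-1] if mystr is not None else None

udfReverseString = udf(reverseString, StringType())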

user_df = user_df.withColumn('ReverseName', udfReverseString(user_df.Name))

# Argument-less example (the result is random, so it differs between runs)
def sortingCap():
    return random.choice(['G', 'H', 'R', 'S'])

udfSortingCap = udf(sortingCap, StringType())

user_df = user_df.withColumn('Class', udfSortingCap())

In [None]:
def getFirstAndMiddle(names):
  # Return a space-separated string of all names except the last one
  return ' '.join(names[:-1])

# Define the method as a UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))

# Show the DataFrame
voter_df.show(10)

### Partitioning

 - Spark breaks DataFrames into partitions, or chunks of data
 - Partition size can vary based on the type of Spark cluster being used
 - Monotonically increasing IDs (F.monotonically_increasing_id())
     - 64-bit integers that increase in value and are unique
     - not necessarily sequential (gaps exist)
     - generated completely in parallel
     - fewer than 2^33 (about 8.6 billion) IDs per partition
 - When working with data, you sometimes only want to access certain fields and perform various operations. In the example below, we find all the unique voter names in the DataFrame and add a unique ID number. Spark IDs are assigned based on the DataFrame partition, so the ID values may be much greater than the actual number of rows in the DataFrame.
 - With Spark's lazy processing, the IDs are not actually generated until an action is performed, and their values depend on how the data is partitioned, so they can look somewhat random
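
Why the IDs jump can be reproduced with bit arithmetic. The PySpark documentation describes the current implementation of `monotonically_increasing_id()` as putting the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits; that layout is an implementation detail that could change. `monotonic_id` and `split_id` are hypothetical helpers that model it.

```python
PARTITION_SHIFT = 33  # record number occupies the lower 33 bits


def monotonic_id(partition_id, row_in_partition):
    """Model the documented layout of monotonically_increasing_id()."""
    return (partition_id << PARTITION_SHIFT) + row_in_partition


def split_id(value):
    """Recover (partition_id, row_in_partition) from a generated ID."""
    return value >> PARTITION_SHIFT, value & ((1 << PARTITION_SHIFT) - 1)


# Two rows in each of three partitions
ids = [monotonic_id(p, r) for p in range(3) for r in range(2)]
print(ids)  # [0, 1, 8589934592, 8589934593, 17179869184, 17179869185]
```

Each new partition starts at a multiple of 2^33, so a DataFrame with a handful of rows spread over several partitions can still produce IDs in the billions.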

In [None]:
# Select all the unique council voters
voter_df = df.select(df["VOTER NAME"]).distinct()

# Count the rows in voter_df
print("\nThere are %d rows in the voter_df DataFrame.\n" % voter_df.count())

# Add a ROW_ID
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())

# Show the rows with 10 highest IDs in the set
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)

#### IDs with different partitions

You've just completed adding an ID field to a DataFrame. Now, take a look at what happens when you do the same thing on DataFrames containing a different number of partitions.

To check the number of partitions, use the method .rdd.getNumPartitions() on a DataFrame.

In [None]:
# Print the number of partitions in each DataFrame
print("\nThere are %d partitions in the voter_df DataFrame.\n" % voter_df.rdd.getNumPartitions())
print("\nThere are %d partitions in the voter_df_single DataFrame.\n" % voter_df_single.rdd.getNumPartitions())

# Add a ROW_ID field to each DataFrame
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id())
voter_df_single = voter_df_single.withColumn('ROW_ID',F.monotonically_increasing_id())

# Show the top 10 IDs in each DataFrame 
voter_df.orderBy(voter_df.ROW_ID.desc()).show(10)
voter_df_single.orderBy(voter_df_single.ROW_ID.desc()).show(10)

#### More ID tricks

Once you define a Spark process, you'll likely want to use it many times. Depending on your needs, you may want to start your IDs at a certain value so there isn't overlap with previous runs of the Spark task. This behavior is similar to how IDs would behave in a relational database. You have been given the task to make sure that the IDs output from a monthly Spark task start just above the highest value from the previous month.

In [None]:
# Determine the highest ROW_ID and save it in previous_max_ID
previous_max_ID = voter_df_march.select('ROW_ID').rdd.max()[0]

# Add a ROW_ID column to voter_df_april starting one above previous_max_ID
# (the first ID generated is 0, so adding only previous_max_ID would reuse it)
voter_df_april = voter_df_april.withColumn('ROW_ID', F.monotonically_increasing_id() + previous_max_ID + 1)

# Show the ROW_ID from both DataFrames and compare
voter_df_march.select('ROW_ID').show()
voter_df_april.select('ROW_ID').show()

### Caching

 - Refers to storing the results of a DataFrame in memory or on disk of the processing nodes in a cluster
 - Improves the speed for subsequent transformations or actions as the data no longer needs to be retrieved from the original data source
 - Reduces the resource utilization of the cluster as there is less need to access the storage, networking and CPU of the Spark nodes as the data is likely already present

Caching disadvantages
 - Very large datasets may not fit in the memory reserved for cached DataFrames
 - Depending on the later transformations, caching may not improve performance at all
 - Data that does not fit in memory may be persisted to disk instead, but depending on the cluster's configuration this may not be a significant performance boost
 
So caching is very useful, but only if a DataFrame will be used again; otherwise it's not worth caching

Implementing caching
 - call cache() on the DataFrame before an action
 - cache() is lazy, like a transformation: nothing is cached until a Spark action is called
 - check whether a DataFrame is cached with the is_cached attribute
 - remove an object from the cache by calling unpersist()
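
The lazy caching behaviour in the list above can be modelled with a small plain-Python class. `CachedFrame` is hypothetical, not Spark code: `cache()` only marks the frame (mirroring how PySpark sets `is_cached` as soon as `cache()` is called), the first action computes and stores the rows, later actions reuse them, and `unpersist()` drops them.

```python
class CachedFrame:
    """Toy model of DataFrame caching: cache() only marks; the first action stores rows."""

    def __init__(self, compute):
        self._compute = compute  # expensive function that produces the rows
        self._marked = False     # set by cache(), cleared by unpersist()
        self._stored = None      # rows kept after the first action on a marked frame
        self.computations = 0    # how many times the rows were (re)computed

    def cache(self):
        self._marked = True      # lazy: nothing is stored yet
        return self

    @property
    def is_cached(self):
        return self._marked

    def unpersist(self):
        self._marked = False
        self._stored = None
        return self

    def count(self):             # an "action"
        if self._stored is not None:
            return len(self._stored)
        self.computations += 1
        rows = self._compute()
        if self._marked:
            self._stored = rows
        return len(rows)


frame = CachedFrame(lambda: list(range(1000))).cache()
frame.count()
frame.count()
print(frame.computations)  # 1: the second count reused the stored rows
frame.unpersist()
frame.count()
print(frame.computations)  # 2: after unpersist() the rows are recomputed
```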

In [None]:
import time

start_time = time.time()

# Add caching to the unique rows in departures_df
departures_df = departures_df.distinct().cache()

# Count the unique rows in departures_df, noting how long the operation takes
print("Counting %d rows took %f seconds" % (departures_df.count(), time.time() - start_time))

# Count the rows again, noting the variance in time of a cached DataFrame
start_time = time.time()
print("Counting %d rows again took %f seconds" % (departures_df.count(), time.time() - start_time))

In [None]:
# Determine if departures_df is in the cache
print("Is departures_df cached?: %s" % departures_df.is_cached)
print("Removing departures_df from cache")

# Remove departures_df from the cache
departures_df.unpersist()

# Check the cache status again
print("Is departures_df cached?: %s" % departures_df.is_cached)

### Improving performance

 - Spark clusters consist of a driver process and as many worker processes as required
 - The driver handles task assignments and consolidation of the data results from the workers
 - The workers typically handle the actual transformation/action tasks of a Spark job; they operate fairly independently and report results back to the driver
 - When importing files, a larger number of smaller objects performs better than a single large file
 - Generally speaking, the objects should be of similar size
 - A well-defined schema also drastically improves performance, as Spark does not have to read the dataset multiple times to infer the schema, and it saves time on data cleansing
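
One way to turn a single large file into several similar-sized objects before import is sketched below in plain Python. `split_gzip_lines` is a hypothetical helper; the output names are chosen to match the `departures_*.txt.gz` pattern used in the cell below. It assumes the file has no header row (a header would have to be copied into every part) and reads the whole file into memory, so it only suits files that fit on one machine.

```python
import gzip
from pathlib import Path


def split_gzip_lines(src, out_dir, parts):
    """Split a gzipped text file into up to `parts` gzipped files of similar line counts."""
    with gzip.open(src, "rt") as f:
        lines = f.readlines()
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    base, extra = divmod(len(lines), parts)  # part sizes differ by at most one line
    paths, start = [], 0
    for i in range(parts):
        end = start + base + (1 if i < extra else 0)
        if end == start:
            break  # fewer lines than parts: stop instead of writing empty files
        path = out_dir / f"departures_{i:03d}.txt.gz"
        with gzip.open(path, "wt") as out:
            out.writelines(lines[start:end])
        paths.append(path)
        start = end
    return paths
```

Splitting 10 lines into 4 parts, for example, yields files of 3, 3, 2 and 2 lines.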

In [None]:
import time

# Import the full and split files into DataFrames
full_df = spark.read.csv('departures_full.txt.gz')
split_df = spark.read.csv('departures_*.txt.gz')

# Print the count and run time for each DataFrame
start_time_a = time.time()
print("Total rows in full DataFrame:\t%d" % full_df.count())
print("Time to run: %f" % (time.time() - start_time_a))

start_time_b = time.time()
print("Total rows in split DataFrame:\t%d" % split_df.count())
print("Time to run: %f" % (time.time() - start_time_b))


### Configuration options

 - Spark contains many configuration settings
 - To read a config setting from within a Spark session, use
 > spark.conf.get(configuration_name)
 - To write a config setting from within a Spark session, use
 > spark.conf.set(configuration_name, value)

#### Cluster types
 - Single node: deploying all components on a single system (physical / VM / container)
 - Standalone clusters: with dedicated machines as the driver and workers
 - Managed clusters: the cluster is managed by 3rd party cluster managers such as YARN, Mesos, Kubernetes


#### Driver
 - Handles task assignment to the various nodes/processes in the cluster
 - Monitors the state of all processes and tasks and handles any task retries
 - Consolidates results from other processes in the cluster
 - Handles any access to shared data / variables and confirms each worker process has the necessary resources (code, data, etc)

#### Worker
 - Runs actual tasks assigned by the driver and communicates those results back to the driver
 - Should have access to all resources necessary to complete a task otherwise pauses to obtain these resources

In [None]:
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')

# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

In [None]:
# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

# Recreate the DataFrame using the departures data file
departures_df = spark.read.csv('departures.txt.gz').distinct()

# Print the number of partitions for each instance
print("Partition count before change: %d" % before)
print("Partition count after change: %d" % departures_df.rdd.getNumPartitions())

#### Shuffling
 - Refers to the moving of data fragments to various workers as required to complete certain tasks
 - Although shuffling hides complexity from the user, it is generally slow: it lowers the cluster's throughput because workers must wait for data to be transferred
 - Limiting shuffling is often good practice
     - limit use of repartition(num_partitions), which requires a full shuffle
     - use coalesce(num_partitions) instead to reduce the number of partitions without requiring a full data shuffle
     - use care when calling join()
     - use broadcast(), which provides a copy of an object to each worker so there is less need for communication between nodes, limiting data shuffling
     > from pyspark.sql.functions import broadcast<br>
     > combined_df = df_1.join(broadcast(df_2))

Using broadcasting on Spark joins

Remember that table joins in Spark are split between the cluster workers. If the data is not local, various shuffle operations are required and can have a negative impact on performance. Instead, we're going to use Spark's broadcast operations to give each node a copy of the specified data.

A couple of tips:

 - Broadcast the smaller DataFrame. The larger the DataFrame, the more time is required to transfer it to the worker nodes.
 - On small DataFrames, it may be better to skip broadcasting and let Spark figure out any optimization on its own.
 - If you look at the query execution plan, a BroadcastHashJoin indicates you've successfully configured broadcasting.
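
The idea behind a broadcast hash join can be simulated in plain Python. This is a conceptual model, not Spark's implementation: `broadcast_hash_join` is hypothetical, the lists of rows stand in for partitions on separate workers, and the flight and airport records are made-up sample data. The small side is turned into a hash table once and shared with every partition, so rows of the large side never move between partitions.

```python
def broadcast_hash_join(partitions, small_rows, left_key, right_key):
    """Inner-join each large-side partition locally against a broadcast hash table."""
    # Build the hash table from the small DataFrame once ("broadcast" it)
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[right_key], []).append(row)

    joined_partitions = []
    for part in partitions:  # each partition would be processed on its own worker
        out = []
        for row in part:
            for match in lookup.get(row[left_key], []):
                out.append({**row, **match})
        joined_partitions.append(out)
    return joined_partitions


flights = [
    [{"flight": 1, "Destination Airport": "DFW"}, {"flight": 2, "Destination Airport": "ORD"}],
    [{"flight": 3, "Destination Airport": "DFW"}, {"flight": 4, "Destination Airport": "XXX"}],
]
airports = [{"IATA": "DFW", "name": "Dallas/Fort Worth"}, {"IATA": "ORD", "name": "Chicago O'Hare"}]

result = broadcast_hash_join(flights, airports, "Destination Airport", "IATA")
print([len(part) for part in result])  # [2, 1]
```

Flight 4 has no matching airport, so, as in an inner join, it does not appear in the output.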

In [None]:
# Join the flights_df and airports_df DataFrames
normal_df = flights_df.join(airports_df, \
    flights_df["Destination Airport"] == airports_df["IATA"] )

# Show the query plan
normal_df.explain()

In [None]:
# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join the flights_df and airports_df DataFrames using broadcasting
broadcast_df = flights_df.join(broadcast(airports_df), \
    flights_df["Destination Airport"] == airports_df["IATA"] )

# Show the query plan and compare against the original
broadcast_df.explain()

Comparing broadcast vs normal joins

You've created two types of joins, normal and broadcasted. Now your manager would like to know what the performance improvement is by using Spark optimizations. If the results are promising, you'll be given more opportunity to tweak the Spark setup as needed.

In [None]:
import time

start_time = time.time()
# Count the number of rows in the normal DataFrame
normal_count = normal_df.count()
normal_duration = time.time() - start_time

start_time = time.time()
# Count the number of rows in the broadcast DataFrame
broadcast_count = broadcast_df.count()
broadcast_duration = time.time() - start_time

# Print the counts and the duration of the tests
print("Normal count:\t\t%d\tduration: %f" % (normal_count, normal_duration))
print("Broadcast count:\t%d\tduration: %f" % (broadcast_count, broadcast_duration))

<img src="assets/spark/pipeline.png" style="width: 600px;"/>

In [None]:
# Import the data to a DataFrame
departures_df = spark.read.csv('2015-departures.csv.gz', header=True)

# Remove any duration of 0
departures_df = departures_df.filter(F.col('Actual elapsed time (Minutes)') != 0)

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
departures_df.write.json('output.json', mode='overwrite')

### Data pipelines

In [None]:
# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv('annotations.csv.gz', sep='|')
full_count = annotations_df.count()

# Count the number of rows beginning with '#'
comment_count = annotations_df.where(F.col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))
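
The check above relies on the CSV reader's `comment` option, which skips lines that begin with the given character. Its effect can be mirrored in plain Python; `read_pipe_delimited` is a hypothetical helper and the sample text is made up, not the contents of `annotations.csv.gz`.

```python
import csv
import io


def read_pipe_delimited(text, comment=None):
    """Parse '|'-separated text; with `comment`, skip lines starting with that character."""
    lines = io.StringIO(text)
    if comment is not None:
        lines = (line for line in lines if not line.startswith(comment))
    return list(csv.reader(lines, delimiter="|"))


sample = "# source: hypothetical annotations\nimg1|cat\n#img2|dog\nimg3|bird\n"
full = read_pipe_delimited(sample)
no_comments = read_pipe_delimited(sample, comment="#")
comment_rows = sum(1 for row in full if row[0].startswith("#"))
print(len(full), comment_rows, len(no_comments))  # 4 2 2
```

As in the Spark cell, the full count minus the comment count should equal the count read with comments skipped.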