# Introduction to Spark

## Getting to know PySpark

- The Spark DataFrame was designed to behave a lot like a SQL table.
- You can think of the **SparkContext** as the connection to the cluster and the **SparkSession** as the interface with that connection.   

**Creating Session and Checking Tables**

In [None]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create my_spark
my_spark = SparkSession.builder.getOrCreate()

# Later cells refer to the session as spark, so alias it
spark = my_spark

# Print my_spark
print(my_spark)

In [None]:
# Print the tables in the catalog
print(spark.catalog.listTables())

**Basic SQL Query**

In [None]:
# Don't change this query
query = "FROM flights SELECT * LIMIT 10"

# Get the first 10 rows of flights
flights10 = spark.sql(query)

# Show the results
flights10.show()

**Converting to Pandas and From Pandas**

In [None]:
# Don't change this query
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"

# Run the query
flight_counts = spark.sql(query)

# Convert the results to a pandas DataFrame
pd_counts = flight_counts.toPandas()

# Print the head of pd_counts
print(pd_counts.head())

In [None]:
# Import the libraries used to build the pandas DataFrame
import numpy as np
import pandas as pd

# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))

# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp)

# Add spark_temp to the catalog
# Because this table is temporary, it is only accessible 
# from the specific SparkSession used to create it (spark in this case)
spark_temp.createOrReplaceTempView("temp")

**Reading** data from outside sources:

In [None]:
# Don't change this file path
file_path = "/usr/local/share/datasets/airports.csv"

# Read in the airports data
airports = spark.read.csv(file_path, header=True)

## Manipulating Data

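A Spark DataFrame is immutable. To update a column, use the **withColumn** method and assign the result back to the original variable. To overwrite an existing column rather than add a new one, pass its name as the first argument (for instance, "air_time" instead of "duration_hrs" in the example below).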

In [None]:
flights = flights.withColumn("duration_hrs", flights.air_time/60)

The **filter** method is a counterpart to WHERE in SQL:

In [None]:
# Filter flights by passing a string
long_flights1 = flights.filter("distance > 1000")
# Filter flights by passing a column of boolean values
long_flights2 = flights.filter(flights.distance > 1000)

# Count the number of rows beginning with '#'
from pyspark.sql.functions import col
comment_count = annotations_df.filter(col('_c0').startswith('#')).count()

Difference between **select** and **withColumn**: select returns only the columns you list, while withColumn returns all existing columns plus the new (or replaced) one.

In [None]:
# Select the first set of columns
selected1 = flights.select("tailnum", "origin", "dest")

# Select the second set of columns
temp = flights.select(flights.origin,
                      flights.dest,
                      flights.carrier)

Selecting with expressions (**selectExpr**) and renaming (**alias**):

In [None]:
# Define avg_speed
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")

# Select the correct columns
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

# Create the same table using a SQL expression
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")

Using **groupBy** for aggregation:

In [None]:
# Find the shortest flight from PDX in terms of distance
flights.filter(flights.origin == "PDX").groupBy().min("distance").show()

# Find the longest flight from SEA in terms of air time
flights.filter(flights.origin == "SEA").groupBy().max("air_time").show()

Behind the scenes, **groupBy** creates a **pyspark.sql.GroupedData** object, so you can do a grouping in two steps: first create the grouped object, then call an aggregation method on it.

Finally, you can use **.agg** with functions from **pyspark.sql.functions** to create expressions:

In [None]:
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = flights.groupBy("month", "dest")

# Standard deviation of departure delay
by_month_dest.agg(F.stddev("dep_delay")).show()

To use **.join**, specify the second table, the key, and the type of join:

In [None]:
# Join the DataFrames
flights_with_airports = flights.join(airports, on="dest", how="leftouter")

## Machine Learning Pipelines

- **pyspark.ml** module
- **Transformer** classes have a **.transform()** method that takes a DataFrame and returns another DataFrame, usually with a new column appended (e.g. Bucketizer).
- **Estimator** classes implement a **.fit()** method that takes a DataFrame and returns a model object.

We can change variable type using **cast** and **withColumn**:

In [None]:
# Cast the columns to integers
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))

Encoding categorical variables with **StringIndexer** and **OneHotEncoder**:

In [None]:
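# Import the feature transformers
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Create a StringIndexer
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")

# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

# dest_indexer and dest_encoder (used in the pipeline) are built the same way from "dest"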

We need to assemble the columns we will use into a **vector** (ML algorithms in Spark take as input a Vector type):

In [None]:
# Import VectorAssembler
from pyspark.ml.feature import VectorAssembler

# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")

Combining the steps into a **Pipeline**, transforming the data, and **splitting** the data:

In [None]:
# Import Pipeline
from pyspark.ml import Pipeline

# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

# Fit and transform the data
piped_data = flights_pipe.fit(model_data).transform(model_data)

# Split the data into training and test sets
training, test = piped_data.randomSplit([0.6, 0.4])

## Model Tuning and Selection

Training model with cross-validation and evaluating results on the test set:

In [None]:
# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator
lr = LogisticRegression()

# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

# Import the tuning submodule
import pyspark.ml.tuning as tune

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameters (NumPy supplies the range of regParam values)
import numpy as np
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()

# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator
               )

# Fit cross validation models
models = cv.fit(training)

# Extract the best model
best_lr = models.bestModel

# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

# Cleaning Data with PySpark

## DataFrame Details

**Spark Schemas** (data validation)
- Define the format of a DataFrame
- Various data types: strings, dates, integers, arrays
- Can filter garbage data during import (data that does not conform)
- Improves read performance (no need for schema inference)

Defining a schema:

In [None]:
# Import the pyspark.sql.types library
from pyspark.sql.types import *

# Define a new schema using the StructType method
people_schema = StructType([
  # Define a StructField for each field
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('city', StringType(), False)
])

**Immutability and Lazy Processing**  
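- Spark is designed to use immutable objects: a DataFrame is defined once, and a transformation or reassignment creates a new one instead of modifying it in place
- This is not slow because of lazy processing: transformations are only recorded until an action runs, which lets Spark plan the work efficiently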

**Parquet**
- Common issues with CSV files: no defined schema, no defined data types, slow to parse, and files cannot be shared between workers while being read.
- **Parquet**: compressed columnar data format. 
- Supports predicate pushdown and automatically includes schema information.

In [None]:
df = spark.read.format('parquet').load('filename.parquet')
df = spark.read.parquet('filename.parquet')

df.write.parquet('filename.parquet', mode='overwrite')

## Manipulating DataFrames

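**ArrayType()** columns: a data type analogous to Python lists. Utilities for working with ArrayType columns:
- **F.size(column)**: returns the number of items in the array
- **column.getItem(index)**: returns the item at the given index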
Filtering column content:

In [None]:
# Show the distinct VOTER_NAME entries
voter_df.select("VOTER_NAME").distinct().show(40, truncate=False)

# Filter voter_df where the VOTER_NAME is 1-19 characters in length (the filter excludes length 20)
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')

# Filter out voter_df where the VOTER_NAME contains an underscore
voter_df = voter_df.filter(~ F.col('VOTER_NAME').contains('_'))

# Show the distinct VOTER_NAME entries again
voter_df.select('VOTER_NAME').distinct().show(40, truncate=False)

Modifying columns using **split**, **getItem**, and **size**:

In [None]:
# Add a new column called splits separated on whitespace (raw string keeps the regex escape intact)
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, r'\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))

# Drop the splits column
voter_df = voter_df.drop('splits')

**Conditional DF Operations**  

In [None]:
# Using F.when: rows that do not match get null
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df['TITLE'] == 'Councilmember', F.rand()))

# Using chained .when calls and .otherwise
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))

## Improving Performance

**Caching**: storing DataFrames in memory or on disk (on the processing nodes) of a cluster. Improves speed on later transformations/actions.

*Tips*
- Cache only if you plan to use the DF again;
- Test caching DFs at various points and determine if your performance improves;
- Cache in memory and fast SSD / NVMe storage;
- Even local disk could improve performance (test it);
- If caching is not working, you could test creating intermediate Parquet representations;
- Stop caching when finished.

**Import Performance**
- Number and size of files: many smaller objects usually import faster than one large one, more so if the objects have similar sizes.
- Well-defined schemas drastically improve import performance. They also provide validation on import.
- You can use wildcards in the path to import several files at once

*Tips on splitting objects*
- Use OS utilities/scripts (split, cut, awk)
- Custom scripts (e.g. Python)
- Write out to Parquet

**Cluster Configuration**  

spark.conf.get(configuration name)  
spark.conf.set(configuration name, value)

**Driver**
- Handles and monitors tasks
- Consolidates results
- Handles access to shared data
- *Tip*: Driver node should have double the memory of the worker and fast local storage

**Worker**
- Runs the actual tasks
- Ideally has all code, data, and resources for a task
- *Recommendation*: more worker nodes are often better than larger workers. Test to find the balance.
- Fast local storage is extremely useful.

In [None]:
# Name of the Spark application instance
app_name = spark.conf.get('spark.app.name')

# Driver TCP port
driver_tcp_port = spark.conf.get('spark.driver.port')

# Number of join partitions
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')

# Show the results
print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

Changing the number of partitions:

In [None]:
# Store the number of partitions in variable
before = departures_df.rdd.getNumPartitions()

# Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)

**Spark Execution Plan**  

**Shuffling**  
Moving data around to various workers  
Hides complexity from the user, but it can be slow  
It is necessary, but try to minimize it as much as possible:
- limit use of .repartition(num_partitions), which requires a full shuffle
    - use .coalesce(num_partitions) instead when you only need to reduce the number of partitions, since it avoids a full shuffle
- .join() might require a shuffle, so use it with care


**Broadcasting**  
- Provides a copy of an object to each worker  
- Prevents excess communication between nodes
- Can drastically speed up .join() operations
- *Tip*: on a join, broadcast the smaller DF
- *Tip*: broadcasting is not always a win; with very small DFs it might be better to skip it, so test both
- *Tip*: the execution plan will show BroadcastHashJoin if the broadcast is applied

Using broadcast with join:

In [None]:
# Import the broadcast method from pyspark.sql.functions
from pyspark.sql.functions import broadcast

# Join the flights_df and airports_df DataFrames using broadcasting
broadcast_df = flights_df.join(
    broadcast(airports_df),
    flights_df["Destination Airport"] == airports_df["IATA"]
)

# Show the query plan and compare against the original
broadcast_df.explain()

## Complex Processing and Data Pipelines

Reading csv files:

In [None]:
# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv('annotations.csv.gz', sep='|', comment='#')

Dealing with data that is imported into a single column:

In [None]:
# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Count the rows before filtering
initial_count = annotations_df.count()

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (annotations_df['colcount'] < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))

# Add the columns folder, filename, width, and height
split_df = annotations_df.withColumn('folder', tmp_fields.getItem(0))
split_df = split_df.withColumn('filename',tmp_fields.getItem(1))
# etc...

# This last column will be useful in the next cell
split_df = split_df.withColumn('split_cols', tmp_fields)

Using a UDF to retrieve the remaining columns:

In [None]:
def retriever(cols, colcount):
  # Return a list of dog data
  return cols[4:colcount]

# Define the method as a UDF
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(split_df.split_cols, split_df.colcount))

# Remove the original column, split_cols, and the colcount
split_df = split_df.drop('_c0').drop('split_cols').drop('colcount')

**Final Analysis and Delivery**  
- UDFs are more flexible, but might come with a performance penalty
- Using inline calculations is more efficient, since Spark can optimize the DAG before performing them

Defining a schema:

In [None]:
DogType = StructType([
    StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(), False)
])