Creating a SparkSession

In [1]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create my_spark
my_spark = SparkSession.builder.getOrCreate()

# Print my_spark
print(my_spark)


Viewing tables

Once you've created a SparkSession, you can start poking around to see what data is in your cluster!

Your SparkSession has an attribute called catalog which lists all the data inside the cluster. This attribute has a few methods for extracting different pieces of information.

One of the most useful is the .listTables() method, which returns the names of all the tables in your cluster as a list.

In [None]:
# Print the tables in the catalog
# (spark is assumed to be an existing SparkSession, such as my_spark created above)
print(spark.catalog.listTables())

Use the .sql() method to get the first 10 rows of the flights table and save the result to flights10. The variable query contains the appropriate SQL query.
Use the DataFrame method .show() to print flights10.

In [None]:
# Don't change this query
query = "FROM flights SELECT * LIMIT 10"

# Get the first 10 rows of flights
flights10 = spark.sql(query)

# Show the results
flights10.show()

In [None]:
# Using pandas on a Spark DataFrame
# Don't change this query
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"

# Run the query
flight_counts = spark.sql(query)

# Convert the results to a pandas DataFrame
pd_counts = flight_counts.toPandas()

# Print the head of pd_counts
print(pd_counts.head())

Put data into Spark via pandas

In [None]:
# Import the libraries this cell needs
import numpy as np
import pandas as pd

# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))

# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp)

# Examine the tables in the catalog
print(spark.catalog.listTables())

# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView("temp")
# Examine the tables in the catalog again
print(spark.catalog.listTables())

In [None]:
# Don't change this file path
file_path = "/usr/local/share/datasets/airports.csv"

# Read in the airports data
airports = spark.read.csv(file_path, header=True)

# Show the data
airports.show()

Use the spark.table() method with the argument "flights" to create a DataFrame containing the values of the flights table in the .catalog. Save it as flights.
Show the head of flights using flights.show(). Check the output: the column air_time contains the duration of the flight in minutes.
Update flights to include a new column called duration_hrs, that contains the duration of each flight in hours (you'll need to divide air_time by the number of minutes in an hour).

In [None]:
# Create the DataFrame flights
flights = spark.table("flights")

# Show the head
flights.show()

# Add duration_hrs
flights = flights.withColumn("duration_hrs",flights.air_time/60)

Use the .filter() method to find all the flights that flew over 1000 miles two ways:
First, pass a SQL string to .filter() that checks whether the distance is greater than 1000. Save this as long_flights1.
Then pass a column of boolean values to .filter() that checks the same thing. Save this as long_flights2.
Use .show() to print heads of both DataFrames and make sure they're actually equal!

In [None]:
# Filter flights by passing a string
long_flights1 = flights.filter("distance > 1000")

# Filter flights by passing a column of boolean values
long_flights2 = flights.filter(flights.distance > 1000)

# Print the data to check they're equal
long_flights1.show()
long_flights2.show()

Select the columns "tailnum", "origin", and "dest" from flights by passing the column names as strings. Save this as selected1.
Select the columns "origin", "dest", and "carrier" using the df.colName syntax and then filter the result using both of the filters already defined for you (filterA and filterB) to only keep flights from SEA to PDX. Save this as selected2.

In [None]:
# Select the first set of columns
selected1 = flights.select("tailnum", "origin", "dest")

# Select the second set of columns
temp = flights.select(flights.origin, flights.dest, flights.carrier)

# Define first filter
filterA = flights.origin == "SEA"

# Define second filter
filterB = flights.dest == "PDX"

# Filter the data, first by filterA then by filterB
selected2 = temp.filter(filterA).filter(filterB)

Create a table of the average speed of each flight both ways.

Calculate average speed by dividing the distance by the air_time (converted to hours). Use the .alias() method to name this column "avg_speed". Save the output as the variable avg_speed.
Select the columns "origin", "dest", "tailnum", and avg_speed (without quotes!). Save this as speed1.
Create the same table using .selectExpr() and a string containing a SQL expression. Save this as speed2.

In [None]:
# Define avg_speed
avg_speed = (flights.distance/(flights.air_time/60)).alias("avg_speed")

# Select the correct columns
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

# Create the same table using a SQL expression
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")

Find the length of the shortest (in terms of distance) flight that left PDX by first .filter()ing and using the .min() method. Perform the filtering by referencing the column directly, not passing a SQL string.
Find the length of the longest (in terms of time) flight that left SEA by filter()ing and using the .max() method. Perform the filtering by referencing the column directly, not passing a SQL string.



In [None]:
# Find the shortest flight from PDX in terms of distance
flights.filter(flights.origin == "PDX").groupBy().min("distance").show()

# Find the longest flight from SEA in terms of air time
flights.filter(flights.origin == "SEA").groupBy().max("air_time").show()

# **Aggregating I**

To get you familiar with more of the built-in aggregation methods, here are a few more exercises involving the flights table!
Use the .avg() method to get the average air time of Delta Airlines flights (where the carrier column has the value "DL") that left SEA. The place of departure is stored in the column origin. show() the result.
Use the .sum() method to get the total number of hours all planes in this dataset spent in the air by creating a column called duration_hrs from the column air_time. show() the result.

In [None]:
# Average duration of Delta flights
flights.filter(flights.origin == "SEA").filter(flights.carrier == "DL").groupBy().avg("air_time").show()

# Total hours in the air
flights.withColumn("duration_hrs", flights.air_time/60).groupBy().sum("duration_hrs").show()

Create a DataFrame called by_plane that is grouped by the column tailnum.
Use the .count() method with no arguments to count the number of flights each plane made.
Create a DataFrame called by_origin that is grouped by the column origin.
Find the .avg() of the air_time column to find average duration of flights from PDX and SEA.

In [None]:
# Group by tailnum
by_plane = flights.groupBy("tailnum")

# Number of flights each plane made
by_plane.count().show()

# Group by origin
by_origin = flights.groupBy("origin")

# Average duration of flights from PDX and SEA
by_origin.avg("air_time").show()

# **GROUPING AND AGGREGATION**
Import the submodule pyspark.sql.functions as F.
Create a GroupedData table called by_month_dest that's grouped by both the month and dest columns. Refer to the two columns by passing both strings as separate arguments.
Use the .avg() method on the by_month_dest object to get the average dep_delay in each month for each destination.
Find the standard deviation of dep_delay by using the .agg() method with the function F.stddev().

In [None]:
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = flights.groupBy("month","dest")

# Average departure delay by month and destination
by_month_dest.avg("dep_delay").show()

# Standard deviation of departure delay
by_month_dest.agg(F.stddev("dep_delay")).show()

# **Joining II**
Examine the airports DataFrame by calling .show(). Note which key column will let you join airports to the flights table.
Rename the faa column in airports to dest by re-assigning the result of airports.withColumnRenamed("faa", "dest") to airports.
Join the flights with the airports DataFrame on the dest column by calling the .join() method on flights. Save the result as flights_with_airports.
The first argument should be the other DataFrame, airports.
The argument on should be the key column.
The argument how should be "leftouter".
Call .show() on flights_with_airports to examine the data again. Note the new information that has been added.

In [None]:
# Examine the data
airports.show()

# Rename the faa column
airports = airports.withColumnRenamed("faa","dest")

# Join the DataFrames
flights_with_airports = flights.join(airports,on="dest",how="leftouter")

# Examine the new DataFrame
flights_with_airports.show()
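
If it helps to see what "leftouter" means, here is a minimal plain-Python sketch of the idea, not Spark's implementation. The sample rows, the airport names, and the helper left_outer_join are all made up for illustration: every row from the left table is kept, and rows without a match get None in the columns that would have come from the right table.

```python
# Plain-Python sketch of a left outer join on "dest"; all sample rows are made up.
flights_rows = [
    {"dest": "PDX", "carrier": "AS"},
    {"dest": "SFO", "carrier": "DL"},
    {"dest": "XXX", "carrier": "UA"},  # no matching airport on purpose
]
airports_rows = [
    {"dest": "PDX", "name": "Portland Intl"},
    {"dest": "SFO", "name": "San Francisco Intl"},
]


def left_outer_join(left, right, key):
    """Keep every left row; fill right-hand columns with None when nothing matches."""
    right_cols = {col for row in right for col in row if col != key}
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    joined = []
    for lrow in left:
        matches = index.get(lrow[key])
        if matches:
            for rrow in matches:
                joined.append({**lrow, **rrow})
        else:
            joined.append({**lrow, **{col: None for col in right_cols}})
    return joined


joined_rows = left_outer_join(flights_rows, airports_rows, "dest")
for row in joined_rows:
    print(row)
```

The unmatched "XXX" row survives the join with name set to None, which is the same reason missing values can show up in a left outer join on real data.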

# **Getting started with machine learning pipelines**

1. First, rename the year column of planes to plane_year to avoid duplicate column names. 

2. Create a new DataFrame called model_data by joining the flights table with planes using the tailnum column as the key.

In [None]:
# Rename year column
planes = planes.withColumnRenamed("year","plane_year")

# Join the DataFrames
model_data = flights.join(planes, on="tailnum", how="leftouter")

String to integer
Now you'll use the .cast() method you learned in the previous exercise to convert all the appropriate columns from your DataFrame model_data to integers!

To convert the type of a column using the .cast() method, you can write code like this:

dataframe = dataframe.withColumn("col", dataframe.col.cast("new_type"))

In [None]:
# Cast the columns to integers
model_data = model_data.withColumn("arr_delay",model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
model_data = model_data.withColumn("month", model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))

Create the column plane_age using the .withColumn() method and subtracting the year of manufacture (column plane_year) from the year (column year) of the flight.

In [None]:
# Create the column plane_age
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)

# **Making a Boolean**
Consider that you're modeling a yes or no question: is the flight late? However, your data contains the arrival delay in minutes for each flight. Thus, you'll need to create a boolean column which indicates whether the flight was late or not!

Use the .withColumn() method to create the column is_late. This column is equal to model_data.arr_delay > 0.
Convert this column to an integer column so that you can use it in your model and name it label (this is the default name for the response variable in Spark's machine learning routines).
Filter out missing values (this has been done for you).

In [None]:
# Create is_late
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)

# Convert to an integer
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))

# Remove missing values
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")
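
To see why the missing-value filter matters, here is a small plain-Python sketch of the labeling step. It assumes that a missing delay yields a missing label rather than a 0 (which is, as far as I know, how Spark treats a comparison against null). The helper make_label and the sample delays are made up for illustration.

```python
def make_label(arr_delay):
    """Return 1 if the flight was late, 0 if it was not, None if the delay is missing."""
    if arr_delay is None:
        return None
    return int(arr_delay > 0)


arr_delays = [12, -3, 0, None]  # made-up arrival delays in minutes
labels = [make_label(delay) for delay in arr_delays]

# Drop rows with a missing delay, mirroring the "is not NULL" filter
clean = [(delay, label) for delay, label in zip(arr_delays, labels) if delay is not None]

print(labels)
print(clean)
```

Note that a delay of exactly 0 minutes counts as "not late" under the arr_delay > 0 rule.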

In this exercise you'll create a StringIndexer and a OneHotEncoder to code the carrier column. To do this, you'll call the class constructors with the arguments inputCol and outputCol.

The inputCol is the name of the column you want to index or encode, and the outputCol is the name of the new column that the Transformer should create.

**Create a StringIndexer called carr_indexer by calling StringIndexer() with inputCol="carrier" and outputCol="carrier_index".
Create a OneHotEncoder called carr_encoder by calling OneHotEncoder() with inputCol="carrier_index" and outputCol="carrier_fact".**

In [None]:
# Import the feature transformers
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Create a StringIndexer
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")

# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

In [None]:
# Create a StringIndexer
dest_indexer = StringIndexer(inputCol="dest",outputCol="dest_index")

# Create a OneHotEncoder
dest_encoder = OneHotEncoder(inputCol="dest_index",outputCol="dest_fact")
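
To build some intuition for what these two stages do, here is a rough plain-Python sketch. It is not Spark's code: the helpers string_index and one_hot and the sample carriers are made up, and the sketch assumes what I understand to be the defaults (labels indexed from most to least frequent, and the last category dropped so it is encoded as all zeros). Spark also stores the result as a sparse vector rather than a list.

```python
from collections import Counter


def string_index(values):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, n_labels, drop_last=True):
    """Encode an index as a list of 0/1 floats; with drop_last the final label is all zeros."""
    size = n_labels - 1 if drop_last else n_labels
    return [1.0 if i == index else 0.0 for i in range(size)]


carriers = ["AS", "DL", "AS", "UA", "AS", "DL"]  # made-up sample column
mapping = string_index(carriers)
encoded = [one_hot(mapping[c], len(mapping)) for c in carriers]

print(mapping)
print(encoded[0], encoded[3])
```

Here "AS" is the most frequent label and gets index 0, while "UA" is the least frequent and ends up as the all-zeros vector.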

# **Assemble a vector**
The last step in the Pipeline is to combine all of the columns containing our features into a single column. This has to be done before modeling can take place because every Spark modeling routine expects the data to be in this form. You can do this by storing each of the values from a column as an entry in a vector. Then, from the model's point of view, every observation is a vector that contains all of the information about it and a label that tells the modeler what value that observation corresponds to.

Because of this, the pyspark.ml.feature submodule contains a class called VectorAssembler. This Transformer takes all of the columns you specify and combines them into a new vector column.

In [None]:
# Import VectorAssembler
from pyspark.ml.feature import VectorAssembler

# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")

# **Create the pipeline**
You're finally ready to create a Pipeline!

Pipeline is a class in the pyspark.ml module that combines all the Estimators and Transformers that you've already created. This lets you reuse the same modeling process over and over again by wrapping it up in one simple object. Neat, right?

In [None]:
# Import Pipeline
from pyspark.ml import Pipeline

# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

# **Transform the data**
Hooray, now you're finally ready to pass your data through the Pipeline you created!

In [None]:
# Fit and transform the data
piped_data = flights_pipe.fit(model_data).transform(model_data)

# **SPLIT THE DATA**
Use the DataFrame method .randomSplit() to split piped_data into two pieces, training with 60% of the data, and test with 40% of the data by passing the list [.6, .4] to the .randomSplit() method.

In [None]:
# Split the data into training and test sets
training, test = piped_data.randomSplit([.6, .4])

# **Create the modeler**
The Estimator you'll be using is a LogisticRegression from the pyspark.ml.classification submodule.

In [None]:
# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator
lr = LogisticRegression()

To understand logistic regression, note the following key points:

- **Basic Concept:**

Logistic regression is a statistical model used for classification.
Unlike linear regression, which predicts a numeric value, logistic regression predicts the probability of an event occurring, which falls between 0 and 1.

- **Classification in Logistic Regression:**

Logistic regression can be used for classification tasks.
To classify observations, you set a cutoff point (threshold) on the predicted probabilities.
If the predicted probability is above the threshold, the observation is classified as a positive outcome ('yes' or the event occurring).
If the predicted probability is below the threshold, the observation is classified as a negative outcome ('no' or the event not occurring).

- **Hyperparameters:**

In logistic regression, there are hyperparameters, which are values not estimated from the data but supplied by the user.
These hyperparameters can be tuned to optimize the model's performance.
Tuning involves testing different values for hyperparameters to find the best settings for your specific task.
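
The probability-then-threshold idea from the points above can be sketched in a few lines of plain Python. The weights, intercept, and feature values are hypothetical numbers chosen for illustration, and the helper names sigmoid and classify are not part of any library. This sketch treats a probability equal to the threshold as positive.

```python
import math


def sigmoid(z):
    """Squash any real number into the interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


def classify(features, weights, intercept, threshold=0.5):
    """Return (probability, predicted class) for one observation."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    probability = sigmoid(z)
    return probability, int(probability >= threshold)


weights = [0.8, -0.5]  # hypothetical fitted coefficients
intercept = -0.2       # hypothetical fitted intercept

prob, label = classify([1.0, 2.0], weights, intercept)
print(round(prob, 4), label)

# Lowering the threshold flips the same observation to the positive class
_, label_low_cutoff = classify([1.0, 2.0], weights, intercept, threshold=0.4)
print(label_low_cutoff)
```

The model's probability does not change between the two calls; only the cutoff does, which is why the threshold is a choice you make rather than something estimated from the data.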

Here's the essential information about k-fold cross-validation:

- **Purpose of Cross-Validation:**

Cross-validation is a method used to estimate a model's performance on unseen data, similar to the test dataset.
It helps assess how well a model generalizes to new, unseen data.

- **K-Fold Cross-Validation:**

In k-fold cross-validation, the training data is divided into several partitions or "folds."
The value of 'k' represents the number of partitions, and typically, you use a value like 3, 5, or 10.
Each fold is used as a test set while the remaining folds are used for training.
The model is trained and evaluated 'k' times, with each fold serving as the test set once.
This process ensures that every data block is used for testing exactly once.

- **Cross-Validation Error:**

After each iteration (each fold used as a test set), the model's error is measured on the held-out partition.
These errors are averaged to calculate the cross-validation error, which is an estimate of how the model will perform on unseen data.

- **Hyperparameter Tuning:**

Cross-validation is commonly used to select the best hyperparameters for a model.
A grid of possible hyperparameter values is created, and the cross-validation error is used to compare different combinations.
The combination of hyperparameters that results in the lowest cross-validation error is typically chosen as the best model.

In your specific case, you are using k-fold cross-validation to tune hyperparameters, specifically elasticNetParam and regParam, for your logistic regression model. This process helps you select the best hyperparameter values for your model, improving its performance on unseen data.
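
The fold mechanics described above can be sketched in plain Python. This is only an illustration of the bookkeeping, not how Spark partitions data: the helpers k_fold_indices and cross_validation_error are made up, and the scoring function passed in is a stand-in for "fit a model on the training rows and measure its error on the held-out rows".

```python
def k_fold_indices(n_rows, k):
    """Yield (train_idx, held_out_idx) pairs; each row is held out exactly once."""
    folds = [list(range(i, n_rows, k)) for i in range(k)]
    for i, held_out in enumerate(folds):
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield train, held_out


def cross_validation_error(n_rows, k, fit_and_score):
    """Average the error returned by fit_and_score over all k folds."""
    errors = [fit_and_score(train, held_out)
              for train, held_out in k_fold_indices(n_rows, k)]
    return sum(errors) / len(errors)


splits = list(k_fold_indices(10, 3))
for train, held_out in splits:
    print("train:", train, "held out:", held_out)

# Stand-in scorer (hypothetical): pretends the error is the held-out fraction
cv_error = cross_validation_error(10, 3, lambda train, held_out: len(held_out) / 10)
print(cv_error)
```

With a real model, you would repeat this once per hyperparameter combination and keep the combination with the lowest averaged error.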

**The cross-validation error is an estimate of the model's error on the test set.**

# Create the evaluator
The first thing you need when doing cross validation for model selection is a way to compare different models. Luckily, the pyspark.ml.evaluation submodule has classes for evaluating different kinds of models. Your model is a binary classification model, so you'll be using the BinaryClassificationEvaluator from the pyspark.ml.evaluation module.

This evaluator calculates the area under the ROC curve. This is a metric that combines the two kinds of errors a binary classifier can make (false positives and false negatives) into a single number. You'll learn more about this towards the end of the chapter!

In [None]:
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

# **Make a grid**

Next, you need to create a grid of values to search over when looking for the optimal hyperparameters. The submodule pyspark.ml.tuning includes a class called ParamGridBuilder that does just that (maybe you're starting to notice a pattern here; PySpark has a submodule for just about everything!).

You'll need to use the .addGrid() and .build() methods to create a grid that you can use for cross validation. The .addGrid() method takes a model parameter (an attribute of the model Estimator, lr, that you created a few exercises ago) and a list of values that you want to try. The .build() method takes no arguments; it just returns the grid that you'll use later.

- Import the submodule pyspark.ml.tuning under the alias tune.
- Call the class constructor ParamGridBuilder() with no arguments. Save this as grid.
- Call the .addGrid() method on grid with lr.regParam as the first argument and np.arange(0, .1, .01) as the second argument. This second argument is a call to a function from the numpy module (imported as np) that creates an array of numbers from 0 up to, but not including, .1, incrementing by .01. Overwrite grid with the result.
- Update grid again by calling the .addGrid() method a second time to create a grid for lr.elasticNetParam that includes only the values [0, 1].
- Call the .build() method on grid and overwrite it with the output.

In [None]:
# Import the tuning submodule and numpy (needed for np.arange below)
import numpy as np
import pyspark.ml.tuning as tune

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameter
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam,[0, 1])

# Build the grid
grid = grid.build()
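
To get a feel for how big this grid is, here is a plain-Python sketch that enumerates the same combinations without Spark. The regParam list is written out by hand to mirror np.arange(0, .1, .01), which should give ten values because the stop value is excluded. The fold count of 3 is an assumption (I believe it is the CrossValidator default), and the variable names are made up.

```python
from itertools import product

# Ten regParam values, 0.00 through 0.09 (the stop value .1 is excluded)
reg_params = [round(i * 0.01, 2) for i in range(10)]
elastic_net_params = [0, 1]

# Every pairing of one regParam value with one elasticNetParam value
param_combos = list(product(reg_params, elastic_net_params))

num_folds = 3  # assumption: the default number of folds
total_fits = len(param_combos) * num_folds

print(len(param_combos), total_fits)
```

Twenty combinations times three folds means sixty model fits, which gives a sense of why cross validation gets expensive quickly.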

# **Make the validator**
The submodule pyspark.ml.tuning also has a class called CrossValidator for performing cross validation. This Estimator takes the modeler you want to fit, the grid of hyperparameters you created, and the evaluator you want to use to compare your models.

The submodule pyspark.ml.tuning has already been imported as tune. You'll create the CrossValidator by passing it the logistic regression Estimator lr, the parameter grid, and the evaluator you created in the previous exercises.

1. Create a CrossValidator by calling tune.CrossValidator() with the arguments:
   - estimator=lr
   - estimatorParamMaps=grid
   - evaluator=evaluator
2. Name this object cv.

In [None]:
# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator
               )

# **Fit the model(s)**
You're finally ready to fit the models and select the best one!

Unfortunately, cross validation is a very computationally intensive procedure. Fitting all the models would take too long on DataCamp.

To do this locally you would use the code:

    # Fit cross validation models
    models = cv.fit(training)

    # Extract the best model
    best_lr = models.bestModel

Remember, the training data is called training and you're using lr to fit a logistic regression model. Cross validation selected the parameter values regParam=0 and elasticNetParam=0 as being the best. These are the default values, so you don't need to do anything else with lr before fitting the model.

In [None]:
# Call lr.fit()
best_lr = lr.fit(training)

# Print best_lr
print(best_lr)

# **Evaluating binary classifiers**
For this course we'll be using a common metric for binary classification algorithms called the AUC, or area under the curve. In this case, the curve is the ROC, or receiver operating characteristic curve. The details of what these things actually measure aren't important for this course. All you need to know is that for our purposes, the closer the AUC is to one (1), the better the model is!


If you've created a perfect binary classification model, the AUC (Area Under the Curve) would be equal to 1.

In binary classification, the ROC curve (Receiver Operating Characteristic) is a graphical representation of the model's performance. The AUC measures the area under this ROC curve. A perfect model, which can perfectly distinguish between the two classes without any misclassifications, would have an AUC of 1, indicating perfect discriminatory power.
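
One way to make the AUC concrete: it equals the fraction of (positive, negative) pairs in which the positive example gets the higher score, with ties counting as half. Here is a small plain-Python sketch of that pairwise definition. It is not what BinaryClassificationEvaluator runs internally, the helper auc is made up, and the labels and scores are invented for illustration.

```python
def auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0
               for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


labels = [0, 0, 1, 1]  # made-up true classes

perfect = auc(labels, [0.1, 0.2, 0.8, 0.9])   # every positive outranks every negative
mixed = auc(labels, [0.1, 0.6, 0.4, 0.9])     # one pair is ranked the wrong way round
reversed_ = auc(labels, [0.9, 0.8, 0.2, 0.1])  # every pair is ranked the wrong way round

print(perfect, mixed, reversed_)
```

This pairwise version is quadratic in the number of rows, so it is only meant for building intuition on tiny inputs.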

# **Evaluate the model**
Remember the test data that you set aside waaaaaay back in chapter 3? It's finally time to test your model on it! You can use the same evaluator you made to fit the model.

- Use your model to generate predictions by applying best_lr.transform() to the test data. Save this as test_results.
- Call evaluator.evaluate() on test_results to compute the AUC. Print the output.

In [None]:
# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))