# Pyspark workbook 2!

Welcome to our second workbook on Pyspark.  The main goal of this workbook is to teach you the critical transform operations we learned using pandas but now via pyspark.  We'll also learn how to use SQL queries to wrangle data in pyspark, which is wonderful for those of you who prefer SQL to python for wrangling.

There's little here that's conceptually new; this workbook is mostly about new syntax that I want you to learn.

In [None]:
!pip install pyspark

In [None]:
import pyspark

## Creating a Spark Session
First we create the Spark Session as before

In [None]:
# Create our session
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("pyspark-2") \
    .getOrCreate()


## Importing our data

We'll be using the NYC Taxi data again, but this time from only a single month (June 2020).  Let's bring in the data like we did last time:
 * Assign the URL location to an object called 'url'
 * Add that file to our spark session with `sparkContext.addFile()`
 * Create a filepath to the location of that file by using `.get()` to get the location of the file in our spark session
 * Read that file along with some options to infer the schema, assign a header, and make timestamps

### Create our filepath

Let's start by creating our filepath.  The URL is a link to the data directly on the NYC taxi authority's Amazon S3 storage.  Note that my `.get()` then uses the filename from the end of the URL.

In [None]:
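from pyspark import SparkFiles
# Location of the June 2020 yellow taxi CSV
url = "http://131.193.32.85:9000/mybucket/yellow_tripdata_2020-06.csv"
# Download the file and register it with our Spark session
spark.sparkContext.addFile(url)
# Build the local path to the downloaded copy, looked up by its filename
fp = 'file://' + SparkFiles.get("yellow_tripdata_2020-06.csv")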

### Import our data

Now that we have the filepath to the taxi data stored as 'fp', let's read it in.  We'll use `spark.read.csv()` like before.  We'll also set a few options:
* 'header' we'll set to true so that it reads the first row as column names
* 'inferSchema' as True so that it infers and applies datatypes to the columns
* 'timestampFormat' we'll use to take the timestamps in the two columns and turn them into datetimes vs. strings

The first two you've seen in the last lesson.  Let's add in the 'timestampFormat' option.  You add it in like any other option, but instead of saying `True` to indicate that you want to apply that option, you give it the format of the strings that contain the date and time info.  In this case the format is something like '2020-06-22 08:58:06' for a ride that occurred on June 22nd at 8:58:06am.  Thus, the second argument needs to be the generalized format "yyyy-MM-dd HH:mm:ss".  It's worth pointing out that "M" denotes the month and "m" denotes the minute. <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html" target="_blank">You can see a full list of datetime formatting here</a>.

In [None]:
# Reading in our csv with the options applied
rides_df = (spark.read
            .option('header', True)
            .option('inferSchema', True)
            .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
            .csv(fp))

Let's take a look at the schema of 'rides_df' to see the datatypes

You can see that the 'tpep_pickup_datetime' and 'tpep_dropoff_datetime' columns are listed as timestamps

In [None]:
rides_df.printSchema()

In [None]:
# Take a quick look using show
rides_df.show()

## Dropping data

We learned earlier that it's common to have to drop data.  This can be either whole columns because they're unnecessary, redundant, or because the end user does not need to see them.  We also might have to drop duplicated rows.  Let's take a quick look at the pyspark ways to do these actions.  

* First we're going to cut out columns.  This uses `.drop()`, just like pandas.
* Next we're going to deduplicate our data using `dropDuplicates()`.

### Dropping columns
Starting with `.drop()`, let's remove the columns 'RatecodeID', 'store_and_fwd_flag', 'mta_tax', 'tolls_amount', and 'improvement_surcharge'. To do this you simply give it a series of column names inside `.drop()`. This is almost identical to pandas, except that in pyspark you don't pass a proper list (e.g. ['column1', 'column2']) but just comma-separated names inside the function.

So the pandas code would be ```rides_df = rides_df.drop(columns=['RatecodeID', 'store_and_fwd_flag', 'mta_tax', 'tolls_amount', 'improvement_surcharge'])```

While the pyspark code is ```rides_df = rides_df.drop('RatecodeID', 'store_and_fwd_flag', 'mta_tax', 'tolls_amount', 'improvement_surcharge')```

In [None]:
# Drop those columns
rides_df = rides_df.drop('RatecodeID', 'store_and_fwd_flag', 'mta_tax', 'tolls_amount', 'improvement_surcharge')

In [None]:
# Check
rides_df.show()

### Dropping rows

Let's drop some duplicated rows now.  This dataset doesn't actually have any, but for the sake of demonstrating functionality we're going to drop any row that has a duplicated pickup time.  You would apply this method any time you want to make sure each value in a column appears in only one row.

First we'll check the number of rows in the data.  This uses `.count()` vs. the `.shape` attribute in pandas world.

Then we'll apply the `dropDuplicates()` method from pyspark.  As you can imagine, this works just like `drop_duplicates()` in pandas.  In this case, you do feed it an actual list: the names of the columns to check for duplicates. As we want to drop any row that has the exact same pickup time as another row, we'll insert `['tpep_pickup_datetime']` inside the method.

In [None]:
# First, how many rows do we have?
rides_df.count()

In [None]:
# Now let's dropDuplicates
rides_df = rides_df.dropDuplicates(['tpep_pickup_datetime'])

In [None]:
# Now how many rows?
rides_df.count()

## Selecting and filtering data

Pyspark also makes it really easy to filter data based on conditions.  There are two identical functions, `filter()` and `where()`, which both let you specify a column name and a condition on that column to filter data.  `where()` is simply an alias of `filter()` that exists for people coming from a SQL background.

Selecting columns and values is also easy using `.select()`

Let's start by filtering for all rides that are above average in length.  We'll do this as follows:
* Make an 'avg_dist' variable where we select the 'trip_distance' column and take the average.
* Use `where()` to keep only rows that have an above-average distance.

### Making average distance variable

Using pandas you'd get the average value of a column using code like this `df['col_name'].mean()`. Pyspark uses a bit different syntax in that you apply the mathematical function inside the `.select()` you use to select your column.  So the equivalent pyspark code would be `df.select(avg('col_name'))`.

The other difference is that pyspark returns a dataframe, so you need to select just the first value of it if you want to store it as an object and use later.  Watch.

In [None]:
# Bulk import all the functions from pyspark.sql.functions.
# Note: this replaces Python built-ins such as min and max with the pyspark versions.
from pyspark.sql.functions import *

In [None]:
# Note how getting the average trip distance returns a dataframe
rides_df.select(avg('trip_distance'))

In [None]:
# If you use .show() you can see this more clearly.
rides_df.select(avg('trip_distance')).show()

We can extract just that first value by applying the `.first()` method and then selecting the 0th position

In [None]:
# we can select just that first value
rides_df.select(avg('trip_distance')).first()[0]

In [None]:
# Make your avg_dist object
avg_dist = rides_df.select(avg('trip_distance')).first()[0]

### Using where to filter
Now we can use `where()` to select only rows where the trip distance is greater than that average distance.  We'll make this into a dataframe called 'long_df'

In [None]:
long_df = rides_df.where(col('trip_distance') > avg_dist)
long_df.show()

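This was a simple example, but you can compute other summary values such as the min, max, or count the same way and then use them for SQL-like filtering via `.where()`.

For example, let's grab the minimum and maximum trip distance and keep only the rides that fall strictly between them, which drops the rows at the two extremes.  In practice you'd more often filter on fixed bounds you consider reasonable, such as greater than 0 miles but less than 15, since values outside that range are more likely to be errors.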

In [None]:
min_trip_dist = rides_df.select(min('trip_distance')).first()[0]
max_trip_dist = rides_df.select(max('trip_distance')).first()[0]

# Now filter based on both
rides_df.where( (col('trip_distance') > min_trip_dist) & (col('trip_distance') < max_trip_dist) ).show()

In [None]:
# Go and swap out .where for .filter just to show that  you get the same results!
...

## Pyspark and SQL queries

One great thing about pyspark is that you can actually work on your dataframe using straight SQL syntax! All you need to do is to turn the dataframe into a table and then you can run queries.  

To make the table you use the code `df_name.createGlobalTempView('what_to_call_table')`.  This takes the dataframe and creates a SQL table with the name given in `createGlobalTempView`.

After that, you can run queries using the `.sql()` function.  

Let's create a table called 'rides' from our 'rides_df' dataframe.

In [None]:
# Drop any existing 'rides' view so the next cell can be re-run without an error
spark.catalog.dropGlobalTempView("rides")

In [None]:
# create 'rides' table
rides_df.createGlobalTempView("rides")

### Run a SQL query

Now let's run a simple SQL query that returns just the trip_distance column for rides with only one passenger.  You write this as a normal SQL query with one exception: your FROM clause needs to reference 'global_temp', which is the database where global temporary views are stored.  So in this case you'll write `FROM global_temp.rides`, as rides is the name of the table.

In [None]:
sq = """SELECT trip_distance FROM global_temp.rides
          WHERE passenger_count = 1
          LIMIT 5"""
spark.sql(sq).show()

Here we can do a slightly more complicated query.  We'll extract the hour from our datetime using `date_part()` so that we can count up the number of rides in each hour using GROUP BY.  Note that my COUNT is just calling 'VendorID'.  This is because COUNT on a column counts the non-null values in that column, and since each trip is a row, any column without missing values gives the same answer.  If you want every row counted regardless of missing values, use `COUNT(*)`.

In [None]:
sq = """SELECT date_part('hour', tpep_pickup_datetime) as hour, COUNT(VendorID) as number_rides FROM global_temp.rides
          GROUP BY hour
          ORDER BY hour"""
spark.sql(sq).show()

## Wrapping up

That's it for pyspark in this class!  As with anything, it's by no means exhaustive.  But hopefully you can see the connection between pyspark functionality and the regular pandas and SQL ideas and tools we've learned so far.   

One last thing I want to mention.  Just because you can do it in pyspark doesn't mean you should.  Given the distributed nature of spark, some operations could be slower on pyspark vs. just python.  This could be the case if your data aren't actually large, in which case all the moving of data around across the cluster could actually slow things down!  

Another thing worth mentioning is that you might not want to be using pyspark as a tool for data exploration.  If you're working with super big data, it might be worth sampling a subset of data to bring locally, exploring and developing your transformations there, then going and running it all on the full data and cluster.  Like anything, these are tools and your situation will dictate when to apply what tool!