
# Intro to Pyspark on Databricks

Welcome to the lesson on working with pyspark on Databricks!

Databricks has a wonderful user interface that's great at keeping your notebooks and resources organized. You'll notice the sidebar has tabs that take you to the different tools in your workspace.

The 'core' tool you're familiar with is the workspace. The workspace is where you can make new notebooks, folders to store multiple notebooks, and libraries. We'll focus on notebooks, but the other tools are useful for more in-depth operations.

Notebooks in Databricks are pretty much identical to Jupyter Notebooks. They operate a lot like Google Colab notebooks in that you're given a free cluster and storage as part of your Community Edition account. You can upload additional resources, data, etc., and they're stored on Databricks' servers (vs. your Google Drive). The big reason we're using this over Colab is that it's built around doing distributed big data work via spark and pyspark. Pyspark is ready to go, with no need to install a bunch of other software and set up paths. Pretty great, huh?

There are other tabs as well. The Data tab allows you to set up connections to databases. Jobs allows you to schedule your ETL processes. Clusters allows you to set up multiple clusters and connect to what you need when you need it. The Community Edition comes with a free 2-core, 15 GB RAM cluster, but if you were paying you could get more powerful resources.

Let's jump right in and make a cluster. Just go to the Clusters tab, hit 'create cluster', give it a name (any name works), and click create. It should default to your local region and the right version of spark. After that, come back here and attach the cluster using the cluster menu in the upper left. Then we'll be ready to compute (you won't need to create/attach in the future).

## Importing libraries
You import libraries as normal. Let's bring in pyspark.

In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("pyspark_w1") \
    .getOrCreate()

from pyspark.sql.functions import *

## Importing data
There are multiple ways to import data into Databricks.
* One is to connect to an external data source such as Amazon S3 or an RDBMS. We're going to skip this method because it's highly dependent on what resource you're using, involves a lot of security measures, and brings other complications that cause problems in a class setting.
* You can also get data via a URL. This method is slightly more complicated with spark, because the data first has to be downloaded and then made available to your cluster.
* Another way is to upload data directly to the Databricks File System. To do this you can go to the file tab in the upper left, click 'upload data', and then drag and drop your file.  After that finishes it'll give you a line of python code that'll allow you to import your data.

I'll show you how to use the last two methods.

### Uploading from a URL

To get a file from a URL into your environment and make it accessible to spark, you need a path to where the file is located in this environment. To do this, register the file with your spark environment using `spark.sparkContext.addFile()`, passing it the URL.

We'll also need to import the `SparkFiles` class from pyspark, which we'll use to look up where that file ends up.

In [None]:
from pyspark import SparkFiles
url = "https://ista322-fall2021.s3.us-west-1.amazonaws.com/covid_daily_cases.csv"
spark.sparkContext.addFile(url)

Once that's done, you can read the file using its local filepath.

Using `SparkFiles.get()`, we can get the filepath to our data, 'covid_daily_cases.csv'.

In [None]:
SparkFiles.get("covid_daily_cases.csv")

But spark also needs to know that this path is on the local filesystem and not on its default filesystem (on Databricks, that's DBFS). So we add the `file://` scheme in front of the path.

In [None]:
fp = 'file://'+SparkFiles.get("covid_daily_cases.csv")
fp

Great! We have a filepath to our data. Reading data through pyspark is similar to pandas. We start from our spark session, then use its `.read` attribute, followed by the method for the file type, in this case `.csv()`. Inside `.csv()` you pass the filepath.

In [None]:
covid = spark.read.csv(fp)

We can take a quick look at our data by calling the `.display()` method on our covid DataFrame. `.display()` returns a nice, neatly formatted view of your data.

In [None]:
# Use .display() to look at your data
covid.display()

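| _c0 | _c1 | _c2 | _c3 | _c4 | _c5 |
|---|---|---|---|---|---|
| date | state | county_code | county | daily_cases | daily_deaths |
| 1/22/2020 | Washington | 53061 | Snohomish | 0 | 0 |
| 1/23/2020 | Washington | 53061 | Snohomish | 0 | 0 |
| 1/24/2020 | Washington | 53061 | Snohomish | 0 | 0 |
| 1/25/2020 | Illinois | 17031 | Cook | 0 | 0 |
| 1/25/2020 | Washington | 53061 | Snohomish | 0 | 0 |
| 1/26/2020 | California | 6059 | Orange | 0 | 0 |
| 1/26/2020 | Illinois | 17031 | Cook | 0 | 0 |
| 1/26/2020 | Washington | 53061 | Snohomish | 0 | 0 |
| 1/27/2020 | Arizona | 4013 | Maricopa | 0 | 0 |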
1/27/2020,Arizona,4013,Maricopa,0,0


### Uploading data to Databricks

You can also upload a dataset directly to Databricks. If you go to the file tab in the top menu of this UI, you can select 'upload data' from the menu. Then you can drag and drop or select the file you want to upload. You can try it with the covid CSV you loaded from a URL above: paste that URL into a new browser window to download the file. Once it finishes uploading you can hit 'next'. The following screen shows the file and gives you the option to copy the line of code needed to bring it in using pyspark, pandas, R, or scala.

I've already done that. In the cell below, replace the placeholder with the path Databricks gives you. You can see the elements of the import code are really similar. You're still calling your spark session and its reader via `spark.read`. The difference is that instead of a file-type method, you tell it the format with `.format("csv")` and then call the generic `.load()` function. You could just as easily use `spark.read.csv()`.

In [None]:
#replace the placeholder for your link below with your own link
covid_2 = spark.read.format("csv").option("header", "true").load("placeholder_for_your_link")

In [None]:
# Looking at a .display() shows the data are the same.
covid_2.display()

| date | state | county_code | county | daily_cases | daily_deaths |
|---|---|---|---|---|---|
| 1/22/2020 | Washington | 53061 | Snohomish | 0 | 0 |
| 1/23/2020 | Washington | 53061 | Snohomish | 0 | 0 |
| 1/24/2020 | Washington | 53061 | Snohomish | 0 | 0 |
| 1/25/2020 | Illinois | 17031 | Cook | 0 | 0 |
| 1/25/2020 | Washington | 53061 | Snohomish | 0 | 0 |
| 1/26/2020 | California | 6059 | Orange | 0 | 0 |
| 1/26/2020 | Illinois | 17031 | Cook | 0 | 0 |
| 1/26/2020 | Washington | 53061 | Snohomish | 0 | 0 |
| 1/27/2020 | Arizona | 4013 | Maricopa | 0 | 0 |


### Modifying your import

Spark has some clear differences in how it imports data. The big one you can probably see from our first `.display()` is that pyspark, unlike pandas, doesn't use the first row of your file as the column names by default. Instead, it names the columns `_c0`, `_c1`, and so on, and keeps the header line as an ordinary row of data.

You can change that behavior, along with a host of other import behaviors, by adding an `.option()` call. Inside it you pass the name of the option you want to modify and its value. In this case we'll specify `.option('header', True)`.

In [None]:
covid = (spark.read
         .option('header', True)
         .csv(fp)
        )


There are other options, such as specifying the delimiter. We're using a comma here, but you might have a file whose fields are separated by a different character, such as a tab or a pipe.

In [None]:
covid = (spark.read
         .option('header', True)
         .option('delimiter', ',')
         .csv(fp)
        )

## Printing and altering schema

Pyspark readily works with unstructured data. We're going to hold off on that, but one thing worth noting here is that you can apply a schema to your data on import. This is important for working with nested data. **But** it's also important because pyspark doesn't infer datatypes when reading a CSV by default, so every column is read in as a string. Let's take a look at the schema using the `printSchema()` method.

In [None]:
# use printSchema()
covid.printSchema()

Above we can see that all our columns are at the same level (nothing is nested). We can also see that they're all strings and that they all allow null values. But should they all be strings? Of course not! Several of the columns are numeric. We can give our reader another option that tells it to infer the schema and datatypes: just add `.option('inferSchema', True)`. Keep in mind that inference requires an extra pass over the data, which can be slow on very large files.

In [None]:
covid = (spark.read
         .option('header', True)
         .option('delimiter', ',')
         .option('inferSchema', True)
         .csv(fp)
        )

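Looking at our schema below, we can see that the numeric columns are now integers. However, 'county_code' is an identifier, not a quantity, so it should really be a string. 'date' is also still a string and not a date. We'll fix both next.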

In [None]:
covid.printSchema()

## Datatype conversions in pyspark

Datatype conversions in spark are conceptually straightforward, but the syntax is a bit different. In pandas you'd use a format like the following to convert a column to an integer:
```
df['new_column'] = df['column_to_convert'].astype(int)
```

But in pyspark it differs a bit. Specifically, you'll use the `.withColumn()` method together with `.cast()` to apply a datatype to a column. The generalized format is:
```
df = df.withColumn('new_column', col('column_to_convert').cast(dataType()))
```

The first argument to `withColumn()` is the name of the column that will hold the result. This could be a new name, or it could be the name of the column you're modifying, in which case that column is overwritten. For example, if you just want to change a column's datatype, you'd overwrite it.

The second argument is the expression that produces the column's values. You select columns a bit differently in pyspark, using `col()` with the column name inside. `.cast()` then, well, casts that column as a new datatype, which you specify inside `.cast()`.

A third thing to note: you import the datatype classes themselves from `pyspark.sql.types`, so 'IntegerType', 'StringType', etc.

The last thing to note: in pandas you'd create a new column by putting `df['new_name']` on the left side of the equals sign. Spark DataFrames can't be modified in place. Instead, `withColumn()` returns a new DataFrame that contains all the existing columns plus the new (or replaced) one. That's why you only put the DataFrame on the left of the equals sign: you're reassigning the variable to the updated DataFrame.

Let's convert 'county_code' to a string and then make a new column called 'date_dt' that's a date version of our 'date' column.

In [None]:
# Import our needed functions
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, StringType, DateType, TimestampType
# Also need to modify a setting to deal with a later date conversion - don't need to know this!
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")


Alright, so we'll start by converting the 'county_code' column back to a string.  

The code below essentially says "overwrite the column 'county_code' with the existing column 'county_code' cast as a string datatype".

In [None]:
# Convert
covid = covid.withColumn('county_code', col('county_code').cast(StringType()))

In [None]:
# Check schema to see datatypes
covid.printSchema()

Great, now let's make a new column called 'date_dt' that converts our date column from a string to a date.  

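The syntax is slightly different: you call `col()` inside the `to_date()` function, followed by the date format of the strings you're converting. Our dates look like 1/22/2020, so the format is 'MM/dd/yyyy'. Months and days here don't always have two digits, so this pattern relies on the legacy parser setting we turned on above.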

In [None]:
covid = covid.withColumn('date_dt', to_date(col('date'), 'MM/dd/yyyy'))

In [None]:
# check data
covid.display()

| date | state | county_code | county | daily_cases | daily_deaths | date_dt |
|---|---|---|---|---|---|---|
| 1/22/2020 | Washington | 53061 | Snohomish | 0 | 0 | 2020-01-22 |
| 1/23/2020 | Washington | 53061 | Snohomish | 0 | 0 | 2020-01-23 |
| 1/24/2020 | Washington | 53061 | Snohomish | 0 | 0 | 2020-01-24 |
| 1/25/2020 | Illinois | 17031 | Cook | 0 | 0 | 2020-01-25 |
| 1/25/2020 | Washington | 53061 | Snohomish | 0 | 0 | 2020-01-25 |
| 1/26/2020 | California | 6059 | Orange | 0 | 0 | 2020-01-26 |
| 1/26/2020 | Illinois | 17031 | Cook | 0 | 0 | 2020-01-26 |
| 1/26/2020 | Washington | 53061 | Snohomish | 0 | 0 | 2020-01-26 |
| 1/27/2020 | Arizona | 4013 | Maricopa | 0 | 0 | 2020-01-27 |
| 1/27/2020 | California | 6037 | Los Angeles | 0 | 0 | 2020-01-27 |


## Other transforms in pyspark

Let's just look at some quick transforms.  

* We'll make a new column with the ratio of covid deaths to cases. This again just uses `withColumn()`.
* We'll do a groupBy so you can see how similar it is to pandas!

We'll start by making a column of the ratio of deaths to total cases, dividing the number of deaths by the number of cases. Let's assign it to a new column called 'case_death_ratio'.

Again, the first argument in `withColumn` is the column name of the result, second is just the math!

In [None]:
# Make 'case_death_ratio' column
covid = covid.withColumn('case_death_ratio', col('daily_deaths')/col('daily_cases') )

In [None]:
covid.display()

| date | state | county_code | county | daily_cases | daily_deaths | date_dt | case_death_ratio |
|---|---|---|---|---|---|---|---|
| 1/22/2020 | Washington | 53061 | Snohomish | 0 | 0 | 2020-01-22 | |
| 1/23/2020 | Washington | 53061 | Snohomish | 0 | 0 | 2020-01-23 | |
| 1/24/2020 | Washington | 53061 | Snohomish | 0 | 0 | 2020-01-24 | |
| 1/25/2020 | Illinois | 17031 | Cook | 0 | 0 | 2020-01-25 | |
| 1/25/2020 | Washington | 53061 | Snohomish | 0 | 0 | 2020-01-25 | |
| 1/26/2020 | California | 6059 | Orange | 0 | 0 | 2020-01-26 | |
| 1/26/2020 | Illinois | 17031 | Cook | 0 | 0 | 2020-01-26 | |
| 1/26/2020 | Washington | 53061 | Snohomish | 0 | 0 | 2020-01-26 | |
| 1/27/2020 | Arizona | 4013 | Maricopa | 0 | 0 | 2020-01-27 | |
| 1/27/2020 | California | 6037 | Los Angeles | 0 | 0 | 2020-01-27 | |


Groupby operations work *very* similarly to pandas. The format is still `df.groupBy('column_to_group').agg(math)`. A couple of things to note:
* pyspark uses camelCase, so it's `groupBy`, not `groupby`
* `.agg()` is usually written a bit differently. In pandas you'd often pass a dictionary with the column you want to do math on as the key and the math function as the value. pyspark accepts that dictionary form too, but the more common pattern is `.agg(math_function('column_to_do_math_on'))`, using functions such as `sum()` from `pyspark.sql.functions`.

Let's sum up the total number of deaths by state and county.

In [None]:
# Make covid_grouped where we group by state and county and then sum up the number of deaths
covid_grouped = covid.groupBy('state', 'county').agg(sum('daily_deaths'))

In [None]:
# Check
covid_grouped.display()

| state | county | sum(daily_deaths) |
|---|---|---|
| California | Orange | 1391 |
| Oregon | Clackamas | 66 |
| Ohio | Geauga | 50 |
| New Jersey | Atlantic | 263 |
| Georgia | Laurens | 82 |
| Louisiana | Lafayette | 136 |
| Florida | Monroe | 24 |
| North Carolina | Davidson | 42 |
| New Hampshire | Sullivan | 1 |
| Tennessee | Claiborne | 6 |


## Wrapping up

This lesson was meant to be a short primer on pyspark, not an exhaustive overview. Hopefully you can see how pyspark's syntax makes it easy to do all the same things you would in pandas, but in a way that readily handles big data by distributing it across a cluster. Next week we'll learn some more pyspark, and then you'll get to apply it in your homework.