# Data Science and DataOps workflows with Amazon EMR Studio Workshop

Hello! And welcome to the re:Invent 2021 Data Science and DataOps workflows with Amazon EMR Studio Workshop.

In this workshop, you will learn:
1. **How to use PySpark in EMR Studio to write and schedule some basic ETL**
2. How to use the prepared data for data analysis and visualization
3. How to deploy your work to a publicly-hosted CloudFront website

## Part 1 - Data Exploration

First, what are we going to do? Well, the Registry of Open Data has many datasets to choose from, but today we're going to use the [NOAA Global Surface Summary of Day](https://registry.opendata.aws/noaa-gsod/) (GSOD) dataset to do some basic climate analysis. Go ahead and take a look at the link to get an idea of the elements in the dataset.

Have you ever wondered what temperature trends look like month over month? Or even year over year? Using the NOAA GSOD dataset, we can analyze weather from over 9,000 weather stations across the globe. But the data is stored in CSV format and has about 10,000 files for each year.

## Step 1 - Exploration

First, let's explore the data on the bucket. The bucket name is `noaa-gsod-pds` and each year of weather data, back to 1929, is stored in a prefix by year.

In [1]:
df = spark.read.csv("s3://noaa-gsod-pds/2021/", header = "true")
df.show()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1638389066788_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


+-----------+----------+---------+----------+---------+--------------------+------+---------------+------+---------------+------+--------------+-----+--------------+-----+----------------+-----+---------------+-----+-----+------+--------------+------+--------------+-----+---------------+-----+------+
|    STATION|      DATE| LATITUDE| LONGITUDE|ELEVATION|                NAME|  TEMP|TEMP_ATTRIBUTES|  DEWP|DEWP_ATTRIBUTES|   SLP|SLP_ATTRIBUTES|  STP|STP_ATTRIBUTES|VISIB|VISIB_ATTRIBUTES| WDSP|WDSP_ATTRIBUTES|MXSPD| GUST|   MAX|MAX_ATTRIBUTES|   MIN|MIN_ATTRIBUTES| PRCP|PRCP_ATTRIBUTES| SNDP|FRSHTT|
+-----------+----------+---------+----------+---------+--------------------+------+---------------+------+---------------+------+--------------+-----+--------------+-----+----------------+-----+---------------+-----+-----+------+--------------+------+--------------+-----+---------------+-----+------+
|72401599999|2021-01-01|37.074194|-77.957528|    133.8|ALLEN C PERKINSON...|  43.0|           

We can see there's quite a bit of data here including a Station ID (`STATION`) and Name (`NAME`), and various temperature/weather attributes.

We can also see that some columns contain values of `9999.9` or `999.9`, which indicate that there's no valid data for that value.
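Before computing averages or maxima, these sentinel values should be treated as missing; otherwise a single `9999.9` would dominate any `max` or `mean`. Below is a minimal plain-Python sketch of that rule, assuming the two sentinels mentioned above are the only ones of interest (the `clean_reading` helper is hypothetical). In PySpark, the same per-value rule could be applied to a column with `when(...).otherwise(...)` from `pyspark.sql.functions`.

```python
from typing import Optional

# Sentinel values observed in the GSOD data that mean "no valid data"
MISSING_SENTINELS = {9999.9, 999.9}


def clean_reading(value: Optional[float]) -> Optional[float]:
    """Return None for missing or sentinel readings, otherwise the value unchanged."""
    if value is None or value in MISSING_SENTINELS:
        return None
    return value


readings = [43.0, 9999.9, 59.2, 999.9, None]
print([clean_reading(v) for v in readings])  # [43.0, None, 59.2, None, None]
```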

In addition, Spark does not infer the input schema by default, so every column is read as a string. Schema inference requires an extra pass over the data, so let's read just a single file, using the first Station ID, and see if Spark can detect a schema.

In [2]:
station_id = df.limit(1).collect()[0].STATION
dfSingle = spark.read.csv(f"s3://noaa-gsod-pds/2021/{station_id}.csv",
    header="true",
    inferSchema="true"
)

dfSingle.printSchema()

root
 |-- STATION: long (nullable = true)
 |-- DATE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- TEMP_ATTRIBUTES: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- DEWP_ATTRIBUTES: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- SLP_ATTRIBUTES: double (nullable = true)
 |-- STP: double (nullable = true)
 |-- STP_ATTRIBUTES: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- VISIB_ATTRIBUTES: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- WDSP_ATTRIBUTES: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: double (nullable = true)
 |-- MAX_ATTRIBUTES: string (nullable = true)
 |-- MIN: double (nullable = true)
 |-- MIN_ATTRIBUTES: string (nullable = true)
 |-- PRCP: double (nullable = tru

Let's take a quick look at a few interesting attributes for this single station and see what it looks like.

In [None]:
dfSingle.select("DATE", "TEMP", "VISIB", "WDSP", "PRCP", "MAX", "MIN").summary("count", "mean", "min", "max").show()

This summary shows the count, mean, minimum, and maximum of each attribute, and the temperature values all look plausible. For example, the highest `MAX` temp was 95.5.

To make this data easier to use, let's do some transformations.

## Step 2 - Data Transformation

We'll perform a few steps:
- Rename the columns for better readability
- Convert to a binary, columnar format optimized for analytics - Apache Parquet
- Partition the data by year so we can more easily read slices of the data
- _Also_ repartition the data so we have fewer files
- Split the `FRSHTT` column into multiple boolean columns
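In the GSOD format, `FRSHTT` packs six 0/1 indicators into one string: Fog, Rain or drizzle, Snow or ice pellets, Hail, Thunder, and Tornado or funnel cloud. The plain-Python sketch below (the `decode_frshtt` helper is hypothetical) shows the decoding the Spark code performs with `substr`, and why the column must stay a string: parsed as an integer, any leading zero is lost.

```python
FRSHTT_FLAGS = ["has_fog", "has_rain", "has_snow", "has_hail", "has_thunder", "has_tornado"]


def decode_frshtt(frshtt: str) -> dict:
    """Split a six-character FRSHTT string into named boolean flags."""
    if len(frshtt) != len(FRSHTT_FLAGS) or set(frshtt) - {"0", "1"}:
        raise ValueError(f"Unexpected FRSHTT value: {frshtt!r}")
    # Character i (0-based here) corresponds to substr(i + 1, 1) in Spark, which is 1-based
    return {name: char == "1" for name, char in zip(FRSHTT_FLAGS, frshtt)}


print(decode_frshtt("100000")["has_fog"])  # True

# Read as a number, "010000" becomes 10000 and the flag positions shift
print(str(int("010000")))  # 10000
```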

In addition, we're going to add a parameter to this notebook by tagging a cell, so we can run it programmatically later.

The next cell is already tagged, but you can check how it's done by selecting the cell below and clicking the "Notebook Tools" wrench icon on the left-hand side.
- Expand "Advanced Tools" and you should see the following in "Cell Metadata":

```json
{
    "tags": [
        "parameters"
    ]
}
```

In [3]:
# This cell has a `parameters` tag so we can override it in workflows
# ref: https://papermill.readthedocs.io/en/latest/usage-parameterize.html#id1
# You can specify one year or multiple comma-separated years
years_to_process = '2021'

In [4]:
year_list = years_to_process.split(',')
print(f"Processing the following years: {year_list}")

Processing the following years: ['2021']
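Note that `years_to_process.split(',')` keeps any spaces around the commas, so a value such as `'2020, 2021'` would yield `' 2021'` and an input prefix of `s3://noaa-gsod-pds/ 2021/`. A slightly more defensive parser is sketched below; `parse_years` is a hypothetical helper, and the four-digit check is an assumption based on the bucket's year-named prefixes.

```python
def parse_years(years_to_process: str) -> list:
    """Split a comma-separated string of years, tolerating spaces and empty entries."""
    years = [y.strip() for y in years_to_process.split(",") if y.strip()]
    for year in years:
        # Each prefix in the bucket is a four-digit year, e.g. "2021"
        if not (len(year) == 4 and year.isdigit()):
            raise ValueError(f"Not a four-digit year: {year!r}")
    return years


print(parse_years("2021"))         # ['2021']
print(parse_years("2019, 2020,"))  # ['2019', '2020']
```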

Now we're going to take the schema we inferred above, use it to read the rest of the data, and write it back out to S3.

For the output location, use the S3 bucket created for you as part of this lab, e.g. `emr-dev-exp-<ACCOUNT_ID>`.

You can find your account ID in the top right of the AWS console or by running the next cell.

In [5]:
%%local

# Retrieves AWS Account ID
!aws sts get-caller-identity --query Account --output text

068216289992


**!!! ⚠️**

**<font color="red">MAKE SURE YOU REPLACE `<ACCOUNT_ID>` BELOW WITH THE ACCOUNT ID DISPLAYED ABOVE</font>**
    
**!!! ⚠️**
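AWS account IDs are 12-digit numbers, so a quick check can catch a copy-paste slip before anything is written to S3. The sketch below builds this lab's bucket name from an account ID; `output_bucket_for` is a hypothetical helper, and the `emr-dev-exp-` prefix is simply this workshop's naming convention.

```python
import re

BUCKET_PREFIX = "emr-dev-exp-"  # naming convention used by this workshop


def output_bucket_for(account_id: str) -> str:
    """Return the workshop bucket name for an AWS account ID, rejecting malformed IDs."""
    account_id = account_id.strip()
    if not re.fullmatch(r"\d{12}", account_id):
        raise ValueError(f"Expected a 12-digit AWS account ID, got {account_id!r}")
    return BUCKET_PREFIX + account_id


print(output_bucket_for("068216289992"))  # emr-dev-exp-068216289992
```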

In [6]:
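from pyspark.sql.types import StringType

OUTPUT_BUCKET = "emr-dev-exp-<ACCOUNT_ID>"

# Force STATION and FRSHTT to strings: inferred as numbers, they lose leading zeros
# (e.g. FRSHTT "010000" would become 10000). Only the schema is kept; the CSV data
# is re-read with it below, so the original string values are preserved.
inferred_schema = dfSingle \
    .withColumn("STATION", dfSingle["STATION"].cast(StringType())) \
    .withColumn("FRSHTT", dfSingle["FRSHTT"].cast(StringType())) \
    .schema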

In [7]:
# If a notebook cell fails, EMR programmatic notebook execution will mark the notebook run as "failed"
assert OUTPUT_BUCKET != "emr-dev-exp-<ACCOUNT_ID>", "DEFAULT OUTPUT BUCKET MUST BE CHANGED"


First, let's read in our data and convert everything the way we want it.

In [8]:
# For this demo, we'll just use the first year
# In production, we can change this logic
year = year_list[0]

# We're going to give most of the columns more readable names
desired_columns = ["STATION", "NAME", "DATE", "LONGITUDE", "LATITUDE", "TEMP", "VISIB", "WDSP", "PRCP", "MAX", "MIN", "FRSHTT"]
renamed_columns = ["station_id", "station_name", "date", "longitude", "latitude", "mean_temp", "visibility", "wind_speed", "precipitation", "max_temp", "min_temp", "FRSHTT"]

# Read in the data
input_prefix = f"s3://noaa-gsod-pds/{year}/"
df = spark.read.csv(input_prefix, header="true", schema=inferred_schema)

# Select only the columns we're interested in, while renaming them as well
from pyspark.sql.functions import col
column_selections = [col(old).alias(new) for old, new in zip(desired_columns, renamed_columns)]
dfSlim = df.select(*column_selections)
dfSlim.show()

+-----------+--------------------+----------+----------+---------+---------+----------+----------+-------------+--------+--------+------+
| station_id|        station_name|      date| longitude| latitude|mean_temp|visibility|wind_speed|precipitation|max_temp|min_temp|FRSHTT|
+-----------+--------------------+----------+----------+---------+---------+----------+----------+-------------+--------+--------+------+
|72401599999|ALLEN C PERKINSON...|2021-01-01|-77.957528|37.074194|     43.0|       8.9|       7.2|         0.06|    59.2|    37.4|000000|
|72401599999|ALLEN C PERKINSON...|2021-01-02|-77.957528|37.074194|     44.8|       9.0|       4.5|         0.32|    60.8|    35.4|100000|
|72401599999|ALLEN C PERKINSON...|2021-01-03|-77.957528|37.074194|     44.9|       7.5|       6.0|         0.57|    48.9|    42.8|100000|
|72401599999|ALLEN C PERKINSON...|2021-01-04|-77.957528|37.074194|     41.6|       9.9|       2.5|          0.0|    48.7|    38.7|000000|
|72401599999|ALLEN C PERKINSON...|

In [9]:
# Let's break the FRSHTT column out into multiple columns
dfSlim = dfSlim \
    .withColumn("has_fog", dfSlim.FRSHTT.substr(1,1) == "1") \
    .withColumn("has_rain", dfSlim.FRSHTT.substr(2,1) == "1") \
    .withColumn("has_snow", dfSlim.FRSHTT.substr(3,1) == "1") \
    .withColumn("has_hail", dfSlim.FRSHTT.substr(4,1) == "1") \
    .withColumn("has_thunder", dfSlim.FRSHTT.substr(5,1) == "1") \
    .withColumn("has_tornado", dfSlim.FRSHTT.substr(6,1) == "1") \
    .drop("FRSHTT")

dfSlim.show()

+-----------+--------------------+----------+----------+---------+---------+----------+----------+-------------+--------+--------+-------+--------+--------+--------+-----------+-----------+
| station_id|        station_name|      date| longitude| latitude|mean_temp|visibility|wind_speed|precipitation|max_temp|min_temp|has_fog|has_rain|has_snow|has_hail|has_thunder|has_tornado|
+-----------+--------------------+----------+----------+---------+---------+----------+----------+-------------+--------+--------+-------+--------+--------+--------+-----------+-----------+
|72401599999|ALLEN C PERKINSON...|2021-01-01|-77.957528|37.074194|     43.0|       8.9|       7.2|         0.06|    59.2|    37.4|  false|   false|   false|   false|      false|      false|
|72401599999|ALLEN C PERKINSON...|2021-01-02|-77.957528|37.074194|     44.8|       9.0|       4.5|         0.32|    60.8|    35.4|   true|   false|   false|   false|      false|      false|
|72401599999|ALLEN C PERKINSON...|2021-01-03|-77.9

In [10]:
# And just to verify, let's look at some tornado days
dfSlim.where("has_tornado==true").show()

+-----------+--------------------+----------+----------+---------+---------+----------+----------+-------------+--------+--------+-------+--------+--------+--------+-----------+-----------+
| station_id|        station_name|      date| longitude| latitude|mean_temp|visibility|wind_speed|precipitation|max_temp|min_temp|has_fog|has_rain|has_snow|has_hail|has_thunder|has_tornado|
+-----------+--------------------+----------+----------+---------+---------+----------+----------+-------------+--------+--------+-------+--------+--------+--------+-----------+-----------+
|78016013601|L F WADE INTERNAT...|2021-06-08| -64.68333| 32.36667|     77.2|       6.3|       6.2|         0.02|    82.2|    71.8|  false|   false|   false|   false|      false|       true|
|78016013601|L F WADE INTERNAT...|2021-08-14| -64.68333| 32.36667|     81.8|       6.3|       2.8|         0.25|    86.7|    78.1|  false|    true|   false|   false|      false|       true|
|78016013601|L F WADE INTERNAT...|2021-09-13| -64.

Great, this dataframe looks promising! Let's write it back out to S3.

In [11]:
output_prefix = f"s3://{OUTPUT_BUCKET}/weather_data/year={year}/"

df_with_year = dfSlim \
    .withColumn("year", dfSlim.date.substr(1,4))

# Normally, we wouldn't repartition all the way to 1 because that forces all the data through one executor.
# However, in the case of this data, when converted to Parquet and compressed, the resulting file is only 22 MB.
df_with_year \
    .repartition(1) \
    .write \
    .mode("overwrite") \
    .format("parquet") \
    .save(output_prefix)

print("Finished writing weather data out to: ", output_prefix)

Finished writing weather data out to:  s3://emr-dev-exp-068216289992/weather_data/year=2021/
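Writing each year to a `year=<value>` directory follows the Hive-style `key=value` layout that Spark's partition discovery understands, so reading the parent `weather_data/` prefix exposes `year` as a column, and filters on it can skip other years' directories. A plain-Python sketch of how such prefixes are built and parsed (both helper names are hypothetical):

```python
def partition_prefix(bucket: str, year: str) -> str:
    """Build the Hive-style output prefix used for one year of weather data."""
    return f"s3://{bucket}/weather_data/year={year}/"


def partition_values(path: str) -> dict:
    """Extract key=value partition segments from a path."""
    segments = path.rstrip("/").split("/")
    return dict(seg.split("=", 1) for seg in segments if "=" in seg)


prefix = partition_prefix("emr-dev-exp-<ACCOUNT_ID>", "2021")
print(prefix)                    # s3://emr-dev-exp-<ACCOUNT_ID>/weather_data/year=2021/
print(partition_values(prefix))  # {'year': '2021'}
```

To process every year in `year_list` rather than only the first, the read-transform-write steps could be looped over `[partition_prefix(OUTPUT_BUCKET, y) for y in year_list]`.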

Fantastic! In our [next notebook](02_data-analysis.ipynb), we're going to do some simple data analysis of the resulting data.

In this way, we separate data processing (ETL) from data analysis. And because `years_to_process` is a notebook parameter, EMR can now execute this notebook for different years.

## (Optional) Programmatic Notebook Execution

With EMR Studio, it's possible to [orchestrate your analytics jobs](https://aws.amazon.com/blogs/big-data/orchestrating-analytics-jobs-by-running-amazon-emr-notebooks-programmatically/) by using the `start-notebook-execution` API.

In order to do this, you'll need a few pieces of information:
- Cluster ID: This can be found on the top right of the notebook when you hover over "Cluster attached" and starts with `j-`.
- Workspace ID: This can be found in the URL and starts with `e-`. **Use the ALL CAPS version.**

Alternatively, navigate back to your EMR Studio Workspaces list and change the display settings to include the **Id** and **Cluster** columns.

Once you have those values, start a new [AWS CloudShell](https://console.aws.amazon.com/cloudshell/home) session and run the following commands, replacing the example `WORKSPACE_ID` and `EMR_CLUSTER_ID` values with your own. This runs the provided EMR notebook on the referenced cluster and converts weather data for the year `2020`.

```shell
export WORKSPACE_ID=e-ABCDEFGHIJKL123456
export EMR_CLUSTER_ID=j-ABCDEFGHIJ123

aws emr start-notebook-execution \
    --editor-id ${WORKSPACE_ID} \
    --notebook-params '{"years_to_process":"2020"}' \
    --relative-path /01_data-exploration.ipynb \
    --notebook-execution-name weather_2020 \
    --execution-engine '{"Id": "'${EMR_CLUSTER_ID}'"}' \
    --service-role EMRDevExp-NotebookExecutionRole
```
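The same call can also be made from Python: boto3's EMR client exposes `start_notebook_execution`, with keyword arguments mirroring the CLI flags above. The sketch below separates building the request from sending it, so the parameters can be checked without touching AWS; note that `NotebookParams` is a JSON string, not a dict. The IDs are the same example values as above, and `build_execution_request` / `start_execution` are hypothetical helpers.

```python
import json


def build_execution_request(workspace_id: str, cluster_id: str, years: str) -> dict:
    """Assemble keyword arguments for boto3's EMR start_notebook_execution call."""
    name_suffix = "_".join(y.strip() for y in years.split(","))
    return {
        "EditorId": workspace_id,
        "RelativePath": "/01_data-exploration.ipynb",
        "NotebookExecutionName": f"weather_{name_suffix}",
        "NotebookParams": json.dumps({"years_to_process": years}),  # must be a JSON string
        "ExecutionEngine": {"Id": cluster_id},
        "ServiceRole": "EMRDevExp-NotebookExecutionRole",
    }


def start_execution(request: dict) -> str:
    """Submit the request and return the execution ID; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the builder above works without boto3 installed

    emr = boto3.client("emr")
    return emr.start_notebook_execution(**request)["NotebookExecutionId"]


request = build_execution_request("e-ABCDEFGHIJKL123456", "j-ABCDEFGHIJ123", "2020")
print(request["NotebookParams"])  # {"years_to_process": "2020"}
```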

You'll receive a notebook execution ID, and you can use `aws emr describe-notebook-execution` to monitor the status of the notebook.

```json
{
    "NotebookExecutionId": "ex-ABCDEFGHIJKLMNOPQRS1234567890"
}
```

```shell
aws emr describe-notebook-execution \
    --notebook-execution-id ex-ABCDEFGHIJKLMNOPQRS1234567890
```
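Rather than re-running `describe-notebook-execution` by hand, you could poll until the run ends. The sketch below takes the status lookup as a function argument, so it can be wired to boto3's `describe_notebook_execution` (whose response carries `NotebookExecution.Status`) or to a stub for testing; `wait_for_execution`, `emr_status`, and the 30-second interval are assumptions, not part of the workshop.

```python
import time
from typing import Callable

# Statuses after which a notebook execution no longer changes
TERMINAL_STATUSES = {"FINISHED", "FAILED", "STOPPED"}


def wait_for_execution(
    get_status: Callable[[str], str],
    execution_id: str,
    poll_seconds: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status(execution_id) until a terminal status is reached, then return it."""
    while True:
        status = get_status(execution_id)
        print(f"{execution_id}: {status}")
        if status in TERMINAL_STATUSES:
            return status
        sleep(poll_seconds)


def emr_status(execution_id: str) -> str:
    """Look up an execution's status with boto3; requires AWS credentials."""
    import boto3  # imported lazily so the polling logic can be tested without boto3

    response = boto3.client("emr").describe_notebook_execution(
        NotebookExecutionId=execution_id
    )
    return response["NotebookExecution"]["Status"]


# Demo with a stubbed status sequence (no AWS calls, no waiting)
fake_statuses = iter(["STARTING", "RUNNING", "FINISHED"])
print(wait_for_execution(lambda _id: next(fake_statuses), "ex-demo", sleep=lambda _s: None))
```

Against a real run, the call would be `wait_for_execution(emr_status, "ex-ABCDEFGHIJKLMNOPQRS1234567890")`.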

Once the execution reaches the `FINISHED` status, you should see new weather data in your S3 bucket.

```shell
aws s3 ls s3://emr-dev-exp-<ACCOUNT_ID>/weather_data/
```