<div>
<img src=https://www.institutedata.com/wp-content/uploads/2019/10/iod_h_tp_primary_c.svg width="300">
</div>

# Lab 10.3 - Using AWS for cluster computing

### Introduction

The purpose of this lab is to gain exposure to cluster computing, often necessary when datasets become too large to manage on a local machine. You will learn how to work with a large dataset through AWS - in particular using an EMR cluster and the PySpark Python library.

Note that this notebook will need to be loaded into an AWS SageMaker instance in order for the code to run successfully. It will not work on your laptop.

**Important**: To prevent excessive billing costs, you will need to shut down your instances when you finish working on AWS for the day - see instructions at the bottom of this page. Please give yourself 10 minutes at the end of the day for this. 

After you have logged into the AWS console you will be able to see the billing cost explorer at https://console.aws.amazon.com/billing/home?/costexplorer#/costexplorer


### 1. Connecting to AWS via AWS Educate

Sign into the AWS console as done in Lab 10.1:

a) Sign in to https://www.awseducate.com/signin/SiteLogin and navigate to the "Classrooms & Credits" tab, then "Request or go to an AWS Educate Classroom".

b) Select "My classrooms" and then the "Go to classroom" link that appears under "Courses where I am a Student".

c) You will be redirected to a https://labs.vocareum.com page. Click "AWS Console" and you will be directed to your AWS console.

### 2. Setting up a SageMaker instance connected to an EMR cluster

Follow the instructions in the link below to create a SageMaker notebook instance linked to a Spark cluster on Amazon EMR (Elastic MapReduce). This involves configuring some security groups to allow SageMaker to communicate with the cluster. Finally, the notebook is connected to Spark through Livy, a REST API for interacting with Spark.

https://aws.amazon.com/blogs/machine-learning/build-amazon-sagemaker-notebooks-backed-by-spark-in-amazon-emr/

**Optional**: To use up fewer credits, use a cheaper instance in EMR than the `m5.xlarge` default given. One possibility is using `m4.large` instead (but it will be slower to run). A full list of instances and pricing is given at https://aws.amazon.com/emr/pricing/.

While working through this setup, it is convenient to keep a text editor (e.g. Notepad) open to note the following details about the cluster, which will be needed when setting up SageMaker. Have separate browser tabs open for the EMR cluster console and for SageMaker.

- Private IP Address: e.g. `172.31.46.73` 
- VPC name: `vpc-..... (<ip address>)`
- EC2 Subnet: `subnet-..... default in us-...`
- EMR master's security group: `sg-... (ElasticMapReduce-master)`
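
As a rough illustration of what connecting through Livy involves, the sketch below builds (but does not send) the REST request that starts a PySpark session. It assumes Livy listens on its default port 8998 at the EMR master's private IP address. The helper names are hypothetical, and the IP is the example value from the list above.

```python
import json
from urllib.request import Request

LIVY_PORT = 8998  # Livy's default port (assumed unchanged on the cluster)


def livy_sessions_url(private_ip, port=LIVY_PORT):
    """Return the Livy endpoint used to list or create sessions."""
    return f"http://{private_ip}:{port}/sessions"


def new_session_request(private_ip):
    """Build a POST /sessions request asking Livy for a PySpark session."""
    body = json.dumps({"kind": "pyspark"}).encode("utf-8")
    return Request(
        livy_sessions_url(private_ip),
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Example private IP from the notes above; replace it with your own.
url = livy_sessions_url("172.31.46.73")
request = new_session_request("172.31.46.73")
print(url)
print(request.get_method(), request.full_url)
```

In this lab the `sparkmagic` kernel issues requests of this kind for you, so the sketch is only meant to show why the private IP address is worth noting down.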

### 3. Setting up a Spark session

Upload this notebook into your SageMaker instance, setting the kernel to `sparkmagic` where requested, and then run the cell below. If it runs without error, you can proceed.

In [1]:
%%info

In [2]:
from pyspark.sql import SparkSession

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1613274019988_0002,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


The SparkSession class allows one to create a Spark DataFrame.

In [3]:
spark = SparkSession.builder.appName("Temperature Analysis").getOrCreate()

### 4. Working with a large dataset

We will work with a public dataset from the Registry of Open Data on AWS. GSOD (Global Surface Summary of the Day) contains daily weather measurements from over 9000 weather stations dating back to 1901 (with more complete data since the 1970s). It is stored in Amazon S3 at `s3://aws-gsod`. More details about the data can be found at https://aws.amazon.com/public-datasets/gsod/

Note that some of the cells to be run below start with %%local. This means those cells are run locally (on the SageMaker instance) rather than on the cluster.

In [4]:
%%help

Magic,Example,Explanation
info,%%info,Outputs session information for the current Livy endpoint.
cleanup,%%cleanup -f,"Deletes all sessions for the current Livy endpoint, including this notebook's session. The force flag is mandatory."
delete,%%delete -f -s 0,Deletes a session by number for the current Livy endpoint. Cannot delete this kernel's session.
logs,%%logs,Outputs the current session's Livy logs.
configure,"%%configure -f {""executorMemory"": ""1000M"", ""executorCores"": 4}",Configure the session creation parameters. The force flag is mandatory if a session has already been  created and the session will be dropped and recreated. Look at Livy's POST /sessions Request Body for a list of valid parameters. Parameters must be passed in as a JSON string.
spark,%%spark -o df df = spark.read.parquet('...,"Executes spark commands.  Parameters:  -o VAR_NAME: The Spark dataframe of name VAR_NAME will be available in the %%local Python context as a  Pandas dataframe with the same name.  -m METHOD: Sample method, either take or sample.  -n MAXROWS: The maximum number of rows of a dataframe that will be pulled from Livy to Jupyter.  If this number is negative, then the number of rows will be unlimited.  -r FRACTION: Fraction used for sampling."
sql,%%sql -o tables -q SHOW TABLES,"Executes a SQL query against the variable sqlContext (Spark v1.x) or spark (Spark v2.x).  Parameters:  -o VAR_NAME: The result of the SQL query will be available in the %%local Python context as a  Pandas dataframe.  -q: The magic will return None instead of the dataframe (no visualization).  -m, -n, -r are the same as the %%spark parameters above."
local,%%local a = 1,All the code in subsequent lines will be executed locally. Code must be valid Python code.
send_to_spark,%%send_to_spark -i variable -t str -n var,"Sends a variable from local output to spark cluster.  Parameters:  -i VAR_NAME: Local Pandas DataFrame(or String) of name VAR_NAME will be available in the %%spark context as a Spark dataframe(or String) with the same name.  -t TYPE: Specifies the type of variable passed as -i. Available options are:  `str` for string and `df` for Pandas DataFrame. Optional, defaults to `str`.  -n NAME: Custom name of variable passed as -i. Optional, defaults to -i variable name.  -m MAXROWS: Maximum amount of Pandas rows that will be sent to Spark. Defaults to 2500."


Let us read in the data from all weather stations for the year 2000. From here on, the code primarily uses the PySpark library.

In [5]:
weatherdata2000 = spark.read.option("header", "true").csv('s3://aws-gsod/2000/*.csv')

In [6]:
type(weatherdata2000)

<class 'pyspark.sql.dataframe.DataFrame'>

The printSchema() method is useful to list the types of each column.

In [7]:
weatherdata2000.printSchema()

root
 |-- ID: string (nullable = true)
 |-- USAF: string (nullable = true)
 |-- WBAN: string (nullable = true)
 |-- Elevation: string (nullable = true)
 |-- Country_Code: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Mean_Temp: string (nullable = true)
 |-- Mean_Temp_Count: string (nullable = true)
 |-- Mean_Dewpoint: string (nullable = true)
 |-- Mean_Dewpoint_Count: string (nullable = true)
 |-- Mean_Sea_Level_Pressure: string (nullable = true)
 |-- Mean_Sea_Level_Pressure_Count: string (nullable = true)
 |-- Mean_Station_Pressure: string (nullable = true)
 |-- Mean_Station_Pressure_Count: string (nullable = true)
 |-- Mean_Visibility: string (nullable = true)
 |-- Mean_Visibility_Count: string (nullable = true)
 |-- Mean_Windspeed: string (nullable = true)
 |-- Mean_Windspeed_C

Use the count() method to show that this dataset has over 2.5 million rows.

In [8]:
weatherdata2000.count()

2546978

Show a few rows:

In [9]:
weatherdata2000.show(5)

+------------+------+-----+---------+------------+--------+---------+----------+----+-----+---+---------+---------------+-------------+-------------------+-----------------------+-----------------------------+---------------------+---------------------------+---------------+---------------------+--------------+--------------------+-------------+--------+--------+---------------------+--------+---------------------+-------------+-----------+----------+---+---------------+-----------+----+-------+-------+
|          ID|  USAF| WBAN|Elevation|Country_Code|Latitude|Longitude|      Date|Year|Month|Day|Mean_Temp|Mean_Temp_Count|Mean_Dewpoint|Mean_Dewpoint_Count|Mean_Sea_Level_Pressure|Mean_Sea_Level_Pressure_Count|Mean_Station_Pressure|Mean_Station_Pressure_Count|Mean_Visibility|Mean_Visibility_Count|Mean_Windspeed|Mean_Windspeed_Count|Max_Windspeed|Max_Gust|Max_Temp|Max_Temp_Quality_Flag|Min_Temp|Min_Temp_Quality_Flag|Precipitation|Precip_Flag|Snow_Depth|Fog|Rain_or_Drizzle|Snow_or_Ice|Hail

Here is a neater way to display the same data:

In [10]:
weatherdata2000.show(n=5, truncate=False, vertical=True)

-RECORD 0-------------------------------------
 ID                            | 719250-99999 
 USAF                          | 719250       
 WBAN                          | 99999        
 Elevation                     | 27.4         
 Country_Code                  | CA           
 Latitude                      | 69.108       
 Longitude                     | -105.138     
 Date                          | 2000-01-01   
 Year                          | 2000         
 Month                         | 01           
 Day                           | 01           
 Mean_Temp                     | -14.2        
 Mean_Temp_Count               | 24           
 Mean_Dewpoint                 | -23.0        
 Mean_Dewpoint_Count           | 24           
 Mean_Sea_Level_Pressure       | 990.5        
 Mean_Sea_Level_Pressure_Count | 24           
 Mean_Station_Pressure         | 987.3        
 Mean_Station_Pressure_Count   | 8            
 Mean_Visibility               | 1.7          
 Mean_Visibil

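The following cell filters the data down to one weather station (Sydney). The `-o` option of `%%spark` then makes the result available in the `%%local` context as a Pandas dataframe of the same name.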

In [11]:
%%spark -o syd_pdf
syd_pdf = weatherdata2000.filter(weatherdata2000.USAF == '947680')

Here is a reference list of a few weather station locations by USAF, in case you would like to try another location.

947680 - Sydney

948680 - Melbourne

486980 - Singapore

725033 - New York

583620 - Shanghai

037683 - London

In [12]:
%%local
type(syd_pdf)

pandas.core.frame.DataFrame

We see it is now a familiar Pandas dataframe. Being of manageable size, one may work with it using Pandas.

In [13]:
%%local
syd_pdf.shape

(366, 34)

**Exercise**: Create a line chart of this dataframe using Date on the x axis and Mean_Temp on the y axis. Note that the cell below is enough to get us started.

**Bonus**: Convert the temperature to degrees Celsius before plotting.

In [14]:
%%local
syd_pdf
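
For the bonus step, a minimal sketch of the conversion is given here. It assumes the GSOD temperatures are recorded in degrees Fahrenheit; the function name and the sample values are illustrative and are not taken from the dataset.

```python
def fahrenheit_to_celsius(temp_f):
    """Convert a temperature from degrees Fahrenheit to degrees Celsius."""
    return (temp_f - 32.0) * 5.0 / 9.0


# Illustrative values only: freezing point, a mild day, boiling point.
sample_f = [32.0, 68.0, 212.0]
sample_c = [fahrenheit_to_celsius(t) for t in sample_f]
print(sample_c)  # [0.0, 20.0, 100.0]
```

In the notebook, the `Mean_Temp` column of `syd_pdf` still holds strings, so it would need converting to a numeric type (for example with `astype(float)`) before a function like this is applied.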

PySpark has `groupby`, `filter`, `select` and `describe` as commonly used methods. For example the following cell summarises the Max_Temp column in weatherdata2000.

In [15]:
weatherdata2000.describe(['Max_Temp']).show()

+-------+------------------+
|summary|          Max_Temp|
+-------+------------------+
|  count|           2544696|
|   mean|63.654217281750064|
| stddev|23.569468189725317|
|    min|              -0.2|
|    max|              99.9|
+-------+------------------+

Note that the maximum appears not to exceed 100. This is because the values in this column are still strings, so they are compared character by character rather than numerically. We shall use the following code to convert the types of a few columns.

In [16]:
from pyspark.sql.functions import avg
from pyspark.sql.functions import col

In [17]:
cols = ['Max_Temp', 'Mean_Temp', 'Min_Temp']

for col_name in cols:
    weatherdata2000 = weatherdata2000.withColumn(col_name, col(col_name).cast('float'))

Verify that this did as intended:

In [18]:
weatherdata2000.describe(['Max_Temp']).show()

+-------+-----------------+
|summary|         Max_Temp|
+-------+-----------------+
|  count|          2544696|
|   mean|63.65421727994728|
| stddev|23.56946819213572|
|    min|           -108.9|
|    max|            130.6|
+-------+-----------------+
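
The change in `min` and `max` between the two summaries can be reproduced in plain Python. The sketch below uses the four extreme values reported above and compares them first as strings, then as floats. It only illustrates the ordering rule and is not part of the Spark job.

```python
# The four extreme values reported by the two describe() summaries.
as_strings = ["99.9", "130.6", "-0.2", "-108.9"]

# Strings are compared character by character: '9' sorts after '1',
# and '-' sorts before any digit.
string_max = max(as_strings)
string_min = min(as_strings)

# After casting to float the comparison is numeric.
as_floats = [float(value) for value in as_strings]
numeric_max = max(as_floats)
numeric_min = min(as_floats)

print(string_max, string_min)    # 99.9 -0.2
print(numeric_max, numeric_min)  # 130.6 -108.9
```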

**Exercise**: How many records in weatherdata2000 have Country_Code of 'AS' (Australia)?

In [19]:
weatherdata2000.filter(weatherdata2000.Country_Code=='AS').count()

155257

**Exercise**: Write a groupby query to determine the 10 hottest weather stations by average maximum temperature (i.e. averaged over the year for each station ID). Show four column headings: ID, Latitude, Longitude and the average maximum temperature.

In [20]:
weatherdata2000.groupby('ID', 'Latitude', 'Longitude').agg(avg('Max_Temp').alias('AvMax')).sort(col('AvMax').desc()).show(10)

+------------+--------+---------+------------------+
|          ID|Latitude|Longitude|             AvMax|
+------------+--------+---------+------------------+
|647530-99999|  17.917|   19.111|108.05000305175781|
|626800-99999|    17.7|   33.967|101.52674752093375|
|410300-99999|  21.433|   39.767| 101.3207650940275|
|627330-99999|  15.317|     35.6|100.91104301499443|
|652450-99999|    6.15|    6.783|100.80000305175781|
|651010-99999|    8.44|    4.494|100.45000076293945|
|689260-99999| -34.083|    21.25| 100.4000015258789|
|612720-99999|    13.4|    -6.15|100.34505471030434|
|626400-99999|  19.533|   33.317|100.18717189154889|
|626600-99999|   18.55|    31.85|100.10089523543172|
+------------+--------+---------+------------------+
only showing top 10 rows
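
To make the logic of the query above concrete, here is a small pure-Python sketch of the same steps: group by station, average, sort in descending order, and keep the top n. The station names and readings are made up for illustration, and `hottest_stations` is a hypothetical helper, not a PySpark function.

```python
from collections import defaultdict

# Toy readings as (station ID, maximum temperature). Made-up values.
readings = [
    ("A", 90.0), ("A", 100.0),
    ("B", 70.0), ("B", 80.0),
    ("C", 105.0),
]


def hottest_stations(rows, n):
    """Return the n stations with the highest average maximum temperature."""
    totals = defaultdict(lambda: [0.0, 0])  # station -> [sum, count]
    for station, max_temp in rows:
        totals[station][0] += max_temp
        totals[station][1] += 1
    averages = {s: total / count for s, (total, count) in totals.items()}
    return sorted(averages.items(), key=lambda item: item[1], reverse=True)[:n]


top = hottest_stations(readings, 2)
print(top)  # [('C', 105.0), ('A', 95.0)]
```

In this toy data, station C ranks first on the strength of a single reading. Showing a count alongside the average is one way to spot stations like that.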

**Optional**: If time permits, look up the PySpark documentation to investigate other ways to analyse the data.

### 5. Closing AWS instances

**Firstly, ensure you have saved your notebook locally** before performing this. Also give yourself sufficient time at the end of the day (10 minutes) to complete this.

Stop your SageMaker instance by returning to the "Notebook instances" page, selecting your instance and then selecting "Stop" under Actions. Once the status shows "Stopped", you may delete the instance.

Terminate the EMR cluster by selecting it in the console and then pressing the Terminate button. A window might show that your cluster has termination protection turned on. Select "Turn off protection", select the tick button and then hit the red terminate button.

Once the EMR, EC2 and SageMaker consoles are clear of active instances, you may sign out of AWS.


After a couple of days, log into https://console.aws.amazon.com/billing/home?/costexplorer#/costexplorer via AWS Educate to check whether any charges are still accruing.

### Further reference

https://github.com/aws/amazon-sagemaker-examples



---



> > > > > > > > > © 2021 Institute of Data

