# Part 1. Process NOAA weather conditions data

In this part, I process yearly weather data from NOAA monitoring stations. The raw data file, "gsoy-latest.tar.gz", was downloaded from NOAA's National Climatic Data Center (https://www.ncdc.noaa.gov/cdo-web/datasets). One challenge is that the archive contains more than 20,000 individual .csv files, which makes it impractical to uncompress locally. Instead, I uncompressed it in the cloud, following the instructions at this link: https://medium.com/@carolynjjankowski/using-aws-to-untar-a-file-and-transfer-files-to-an-s3-bucket-e24384045515. I first created an S3 bucket named "noaadata" and uploaded the raw data file to it. Then I set up an EC2 instance to uncompress the file and transferred all the individual .csv files to a folder named "all" in the same bucket. The contents of that folder can be listed with the following bash command; the individual .csv data files are in its "s3" subfolder.

In [1]:
%%bash
# Take a look at the data location (a cell magic must be the first line of its cell)
aws s3 ls s3://noaadata/all/

                           PRE .cache/
                           PRE .local/
                           PRE .ssh/
                           PRE s3/
2020-05-22 22:06:24          0 
2020-05-22 23:42:18        124 .bash_history
2020-05-22 23:42:18         18 .bash_logout
2020-05-22 23:42:18        193 .bash_profile
2020-05-22 23:42:18        231 .bashrc
2020-05-22 23:43:02    1869136 get-pip.py
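The untar-and-upload step at the start of this part was done on an EC2 instance following the linked tutorial. As a rough illustration of why the archive does not have to be unpacked to disk all at once, here is a minimal sketch using Python's standard `tarfile` module in streaming mode (`"r|gz"`), which reads members one at a time. The helper names `iter_csv_members` and `build_demo_archive`, the `prefix` filter, and the `gsoy/` folder inside the demo archive are hypothetical; uploading each member to S3 is not shown.

```python
import io
import tarfile
from typing import BinaryIO, Iterator, Tuple


def iter_csv_members(fileobj: BinaryIO, prefix: str = "") -> Iterator[Tuple[str, bytes]]:
    """Stream a .tar.gz archive and yield (file name, content) for each .csv member.

    Hypothetical helper: mode "r|gz" reads the archive sequentially, so each
    member is read while it is current instead of extracting everything first.
    """
    with tarfile.open(fileobj=fileobj, mode="r|gz") as tar:
        for member in tar:
            if not member.isfile():
                continue
            base = member.name.rsplit("/", 1)[-1]
            if base.endswith(".csv") and base.startswith(prefix):
                extracted = tar.extractfile(member)
                if extracted is not None:
                    # Read now: in streaming mode the data is gone once we advance.
                    yield base, extracted.read()


def build_demo_archive() -> io.BytesIO:
    """Build a tiny in-memory .tar.gz with hypothetical station files."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name, text in [
            ("gsoy/USW00094728.csv", b"STATION,DATE\nUSW00094728,2003\n"),
            ("gsoy/CA000000001.csv", b"STATION,DATE\nCA000000001,2003\n"),
        ]:
            info = tarfile.TarInfo(name)
            info.size = len(text)
            tar.addfile(info, io.BytesIO(text))
    buf.seek(0)
    return buf


# Usage: list only the US station files in the demo archive.
for name, content in iter_csv_members(build_demo_archive(), prefix="US"):
    print(name, len(content))
```

Because members are consumed sequentially, memory use stays bounded by the largest single file rather than the whole archive.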


Next, I load the data files in the "s3" folder. Each file corresponds to one monitoring station and is named after its station code. Since I only need data for the US, I load only the files whose names start with "US".

In [2]:
# Load data
data = spark.read.format("csv").option("header", "true").load("s3://noaadata/all/s3/US*.csv")

SparkSession available as 'spark'.
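The `US*.csv` wildcard in the load path is what restricts the read to US stations. Spark expands it as a Hadoop-style glob; for a simple pattern like this one, Python's case-sensitive `fnmatchcase` should select the same names, so the sketch below uses it to show which files the pattern keeps. The helper `select_station_files` and all file names except `USW00094728.csv` (which appears in the data) are hypothetical. Reading the two-letter prefix as a country code follows the GHCN station-ID convention and is an assumption, not something checked in this notebook.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List


def select_station_files(filenames: Iterable[str], pattern: str = "US*.csv") -> List[str]:
    """Keep the file names matching a shell-style glob (case-sensitive)."""
    return [name for name in filenames if fnmatchcase(name, pattern)]


# Hypothetical file names; the lowercase one does not match because matching is case-sensitive.
files = ["USW00094728.csv", "USC00000001.csv", "CA000000001.csv", "us_lowercase.csv"]
print(select_station_files(files))  # ['USW00094728.csv', 'USC00000001.csv']
```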



Take a look at the data fields and select the ones I need, including location, elevation, temperature, and precipitation.

In [3]:
data.printSchema()
data.show(10)


root
 |-- STATION: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- ELEVATION: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- AWND: string (nullable = true)
 |-- AWND_ATTRIBUTES: string (nullable = true)
 |-- CDSD: string (nullable = true)
 |-- CDSD_ATTRIBUTES: string (nullable = true)
 |-- CLDD: string (nullable = true)
 |-- CLDD_ATTRIBUTES: string (nullable = true)
 |-- DP01: string (nullable = true)
 |-- DP01_ATTRIBUTES: string (nullable = true)
 |-- DP10: string (nullable = true)
 |-- DP10_ATTRIBUTES: string (nullable = true)
 |-- DP1X: string (nullable = true)
 |-- DP1X_ATTRIBUTES: string (nullable = true)
 |-- DSND: string (nullable = true)
 |-- DSND_ATTRIBUTES: string (nullable = true)
 |-- DSNW: string (nullable = true)
 |-- DSNW_ATTRIBUTES: string (nullable = true)
 |-- DT00: string (nullable = true)
 |-- DT00_ATTRIBUTES: string (nullable = true)
 |-- DT32: stri

In [4]:
data[['STATION','DATE','LATITUDE','LONGITUDE','ELEVATION','PRCP','TAVG','TMIN','TMAX']].show()


+-----------+----+--------+---------+---------+------+-----+----+-----+
|    STATION|DATE|LATITUDE|LONGITUDE|ELEVATION|  PRCP| TAVG|TMIN| TMAX|
+-----------+----+--------+---------+---------+------+-----+----+-----+
|USW00094728|1869|40.77898|-73.96925|     42.7|1149.9| null|null| null|
|USW00094728|1870|40.77898|-73.96925|     42.7| 997.3|12.02|8.33|15.70|
|USW00094728|1871|40.77898|-73.96925|     42.7|1305.6|10.71|6.52|14.90|
|USW00094728|1872|40.77898|-73.96925|     42.7|1117.9|10.74|6.54|14.93|
|USW00094728|1873|40.77898|-73.96925|     42.7|1219.6|10.43|6.39|14.48|
|USW00094728|1874|40.77898|-73.96925|     42.7|1164.6|10.69|6.44|14.94|
|USW00094728|1875|40.77898|-73.96925|     42.7|1033.5| 9.66|5.75|13.56|
|USW00094728|1876|40.77898|-73.96925|     42.7|1061.5|11.09|7.34|14.83|
|USW00094728|1877|40.77898|-73.96925|     42.7|1021.6|11.55|7.78|15.32|
|USW00094728|1878|40.77898|-73.96925|     42.7|1236.7|12.01|8.33|15.68|
|USW00094728|1879|40.77898|-73.96925|     42.7| 991.9|11.19|7.18

In [5]:
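# Select variables and limit data to 2003-2014 (all columns were read as strings, so cast DATE first)
from pyspark.sql.functions import col
data_select = data.select('STATION','DATE','LATITUDE','LONGITUDE','ELEVATION','PRCP','TAVG','TMIN','TMAX') \
                  .filter(col('DATE').cast('int').between(2003, 2014))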
data_select.show(10)


+-----------+----+--------+---------+---------+------+-----+-----+-----+
|    STATION|DATE|LATITUDE|LONGITUDE|ELEVATION|  PRCP| TAVG| TMIN| TMAX|
+-----------+----+--------+---------+---------+------+-----+-----+-----+
|USW00094728|2003|40.77898|-73.96925|     42.7|1484.7|11.88| 8.14|15.63|
|USW00094728|2004|40.77898|-73.96925|     42.7|1319.2|12.49| 8.62|16.37|
|USW00094728|2005|40.77898|-73.96925|     42.7|1422.1|13.18| 9.18|17.17|
|USW00094728|2006|40.77898|-73.96925|     42.7|1521.5|13.77| 9.85|17.70|
|USW00094728|2007|40.77898|-73.96925|     42.7|1567.9|12.78| 8.75|16.82|
|USW00094728|2008|40.77898|-73.96925|     42.7|1362.4|12.97| 8.80|17.13|
|USW00094728|2009|40.77898|-73.96925|     42.7|1362.4|12.23| 8.39|16.07|
|USW00094728|2010|40.77898|-73.96925|     42.7|1254.7|13.72| 9.74|17.69|
|USW00094728|2011|40.77898|-73.96925|     42.7|1850.3|13.56| 9.55|17.58|
|USW00094728|2012|40.77898|-73.96925|     42.7| 979.0|14.06|10.16|17.96|
+-----------+----+--------+---------+---------+----

Because I need to merge the selected weather data with other datasets, I save the processed data to another folder in my S3 bucket. I tried two approaches:

(1) Use `coalesce(1)` to write everything into a single file. This is very time-consuming: `coalesce(1)` collapses the data into one partition, so the write (and, because coalesce avoids a shuffle, possibly the upstream filtering as well) runs as a single task instead of taking advantage of PySpark's parallelism. If the data were very large, funnelling everything through one executor could also cause out-of-memory errors, so in the future I will use the second approach.

(2) Let PySpark write one .csv part file per partition into the output folder, and then combine the part files into one file with a bash command. This needs an extra merging step, but it is much faster than the first approach.

In [None]:
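# (1) Save data as a single .csv file on S3 (slow: all rows go through one partition)
data_select.coalesce(1).write.save("s3://noaadata/us_weather", format='csv', header=True)

# (2) Alternative (faster): let Spark write one part file per partition
# data_select.write.save("s3://noaadata/us", format='csv', header=True)
# Then, in a separate %%bash cell (a cell magic must be the first line of its cell) or a
# local terminal, download and merge the parts. Every part file has its own header row,
# so keep only the first header instead of using a plain `cat`:
# aws s3 sync s3://noaadata/us /Users/artemisyang/Dropbox/project
# awk 'FNR==1 && NR!=1 {next} {print}' /Users/artemisyang/Dropbox/project/part-* > /Users/artemisyang/Dropbox/us_weather.csv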
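With `header=True`, Spark writes a header row into every part file, so concatenating the parts verbatim repeats the header inside the data. A minimal Python sketch of the merge step that keeps only the first header is below. The function name `merge_part_files` and the `part-*` naming pattern are assumptions based on the paths used above; counting lines as rows also assumes no field contains an embedded newline.

```python
import glob
import os
from typing import List


def merge_part_files(part_dir: str, output_path: str, pattern: str = "part-*") -> int:
    """Concatenate Spark CSV part files into one file, keeping a single header.

    Hypothetical helper: assumes every non-empty part file starts with the same
    header line (as Spark writes them with header=True). Returns the number of
    data lines written.
    """
    # Collect the part files first; files such as _SUCCESS do not match "part-*".
    paths: List[str] = sorted(glob.glob(os.path.join(part_dir, pattern)))
    header_written = False
    rows = 0
    with open(output_path, "w", newline="") as out:
        for path in paths:
            with open(path, newline="") as part:
                header = part.readline()
                if not header:
                    continue  # empty part file (a partition with no rows)
                if not header_written:
                    out.write(header)
                    header_written = True
                for line in part:
                    out.write(line)
                    rows += 1
    return rows
```

The `awk` one-liner in the cell above follows the same idea; the Python version is mainly useful where `awk` is not available.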


Next, I explore the data quality further. After dropping every row that has a missing value in any of the selected columns, only 5,744 of the 129,729 records (about 4.4%) are left. These remaining records come from only 717 weather stations, far fewer than the number of facilities in my emissions dataset. With this many missing values, the data quality is not good enough for my research purposes. However, after consulting my advisor, I found a better data source. In the next step, I will use another notebook to explore that weather data source and conduct the main machine learning analysis.

In [1]:
# Investigate data quality: row counts before and after dropping missing values,
# and the number of distinct stations that remain
from pyspark.sql.functions import col, countDistinct

data = spark.read.format("csv").option("header", "true").load("s3://noaadata/us_weather.csv")
print('Total Rows: %d' % data.count())
data = data.dropna()  # drop rows with a null in any column
print('Total Rows: %d' % data.count())
data.printSchema()
data.show(5)
data.agg(countDistinct(col("STATION")).alias("count")).show()


Total Rows: 129729
Total Rows: 5744
root
 |-- STATION: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- ELEVATION: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- TAVG: string (nullable = true)
 |-- TMIN: string (nullable = true)
 |-- TMAX: string (nullable = true)

+-----------+----+--------+---------+---------+------+-----+----+-----+
|    STATION|DATE|LATITUDE|LONGITUDE|ELEVATION|  PRCP| TAVG|TMIN| TMAX|
+-----------+----+--------+---------+---------+------+-----+----+-----+
|USW00094728|2003|40.77898|-73.96925|     42.7|1484.7|11.88|8.14|15.63|
|USW00094728|2004|40.77898|-73.96925|     42.7|1319.2|12.49|8.62|16.37|
|USW00094728|2005|40.77898|-73.96925|     42.7|1422.1|13.18|9.18|17.17|
|USW00094728|2006|40.77898|-73.96925|     42.7|1521.5|13.77|9.85| 17.7|
|USW00094728|2007|40.77898|-73.96925|     42.7|1567.9|12.78|8.75|16.82|
+-----------+----+--------+---------+----
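With its defaults, `dropna()` removes a row if any column is null, so a single missing variable (for example `TAVG`) is enough to discard a station-year, and `countDistinct` ignores nulls. The pure-Python sketch below mirrors that logic on a few hypothetical rows and adds a per-column missing count, which can help when reasoning about which variable drives the loss. It is an illustration of the semantics, not Spark's implementation; the helper names and the rows are hypothetical.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[str]]


def drop_incomplete(rows: List[Row]) -> List[Row]:
    """Keep only rows with no missing value, like Spark's dropna(how="any")."""
    return [row for row in rows if all(value is not None for value in row.values())]


def count_distinct_stations(rows: List[Row]) -> int:
    """Count distinct non-null STATION ids, like countDistinct(col("STATION"))."""
    return len({row["STATION"] for row in rows if row["STATION"] is not None})


def missing_by_column(rows: List[Row]) -> Dict[str, int]:
    """Count missing values per column."""
    counts: Dict[str, int] = {}
    for row in rows:
        for key, value in row.items():
            counts[key] = counts.get(key, 0) + (value is None)
    return counts


# Hypothetical rows: one complete record, two missing TAVG.
rows = [
    {"STATION": "USW00094728", "DATE": "2003", "TAVG": "11.88"},
    {"STATION": "USW00094728", "DATE": "2004", "TAVG": None},
    {"STATION": "USC00000001", "DATE": "2003", "TAVG": None},
]
complete = drop_incomplete(rows)
print(len(rows), len(complete))           # 3 1
print(count_distinct_stations(complete))  # 1
print(missing_by_column(rows))            # {'STATION': 0, 'DATE': 0, 'TAVG': 2}
```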