#  Problem Summary:

# This project is for preparing data set for weather observation for the year 2017. We are given a raw data set contains weather station, weather date, weather type and weather data corresponding to the weather type. We assume that the weather data is observed many times within a day and each weather station. In this way, we could group the data based on stations and dates and obtain the weather data with different types.

# Specification of the infrastructure that you have used 

# I used my PC as a Spark Standalone instance with HDFS & Hive to solve this problem. Python in the language to solve the problem. 

In [111]:
import os
import wget
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row

In [2]:
exec(open(os.path.join(os.environ["SPARK_HOME"],'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.6.5 (default, Apr 26 2018 08:42:37)
SparkSession available as 'spark'.


In [3]:
sc

# (1) Download the file to local directory and unzip it.

In [112]:
# Download the file 
url="ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2017.csv.gz"
filename = wget.download(url)

In [None]:
# unzip the file to local directory
import gzip
import shutil
with gzip.open('2017.csv.gz', 'rb') as f_in:
    with open('2017.csv', 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

# (2)  Store the raw data in Hive as a table named ‘WeatherRaw’.

In [70]:
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row

# load data into spark RDD and save into HIVE table
csv_data = sc.textFile("2017.csv")
csv_data  = csv_data.map(lambda p: p.split(","))
df_csv = csv_data.map(lambda p: Row(station_identifier = p[0], observation_date = p[1], observation_type=p[2], observation_value=p[3], observation_measure=p[4],observation_quality=p[5],observation_source=p[6],observation_time=p[7])).toDF()
hc = HiveContext(sc)
df_csv.write.format("orc").saveAsTable("WeatherRaw")


In [71]:
df_csv.show(5)

+----------------+-------------------+-------------------+------------------+----------------+----------------+-----------------+------------------+
|observation_date|observation_measure|observation_quality|observation_source|observation_time|observation_type|observation_value|station_identifier|
+----------------+-------------------+-------------------+------------------+----------------+----------------+-----------------+------------------+
|        20170101|                   |                   |                 N|                |            PRCP|                0|       US1MISW0005|
|        20170101|                   |                   |                 N|                |            SNOW|                0|       US1MISW0005|
|        20170101|                   |                   |                 N|                |            SNWD|                0|       US1MISW0005|
|        20170101|                   |                   |                 N|                |            

# (3) Generate the final dataset and store it in Hive as a table named ‘WeatherCurated’. 

In [106]:
import pyspark.sql.functions as F

# Generate the "WeatherCurated" Table by doing data transformation. 

df_cur = df_csv.groupBy("station_identifier","observation_date").agg(
(F.sum(F.when(F.col('observation_type')=='PRCP',F.col('observation_value')))/10).alias('Precipitation'),
(F.max(F.when(F.col('observation_type')=='TMAX',F.col('observation_value')))/10).alias('MaxTemparature'),
(F.min(F.when(F.col('observation_type')=='TMIN',F.col('observation_value')))/10).alias('MinTemparature'),
(F.sum(F.when(F.col('observation_type')=='SNOW',F.col('observation_value')))).alias('Snowfall'),
(F.sum(F.when(F.col('observation_type')=='SNWD',F.col('observation_value')))).alias('SnowDepth'),
(F.sum(F.when(F.col('observation_type')=='EVAP',F.col('observation_value')))/10).alias('Evaporation'),
(F.sum(F.when(F.col('observation_type')=='WESD',F.col('observation_value')))/10).alias('WaterEquivalentSnowDepth'),
(F.sum(F.when(F.col('observation_type')=='WESF',F.col('observation_value')))/10).alias('WaterEquivalentSnowFall'),
(F.sum(F.when(F.col('observation_type')=='PSUN',F.col('observation_value')))).alias('Sunshine')
).sort('observation_date')

df_cur = df_cur.select(F.col('station_identifier').alias('Station_identifier'),
                       F.col('observation_date').alias('Observation_date'),
                        F.col('Precipitation'),
                        F.col('MaxTemparature'),
                        F.col('MinTemparature'),
                        F.col('Snowfall'),
                        F.col('SnowDepth'),
                        F.col('Evaporation'),
                        F.col('WaterEquivalentSnowDepth'),
                        F.col('WaterEquivalentSnowFall'),
                        F.col('Sunshine')
                        )
df_cur.write.format("orc").saveAsTable("WeatherCurated")

In [109]:
df_cur.write.option('delimiter','\t').csv('aggre.tsv')
df_cur.show(10)

+------------------+----------------+-------------+--------------+--------------+--------+---------+-----------+------------------------+-----------------------+--------+
|Station_identifier|Observation_date|Precipitation|MaxTemparature|MinTemparature|Snowfall|SnowDepth|Evaporation|WaterEquivalentSnowDepth|WaterEquivalentSnowFall|Sunshine|
+------------------+----------------+-------------+--------------+--------------+--------+---------+-----------+------------------------+-----------------------+--------+
|       ASN00011019|        20170101|          0.0|          22.1|          16.4|    null|     null|       null|                    null|                   null|    null|
|       ASN00010519|        20170101|          0.0|          null|          null|    null|     null|       null|                    null|                   null|    null|
|       ASN00003001|        20170101|          0.0|          null|          null|    null|     null|       null|                    null|        