# Get NOAA ghcn/daily/by_year into parquet on S3

**How to process the new NOAA data (from Yoav)**

*Input:* A dataframe with columns: `Station, Year, day-in-year, measurement type, value`
1. Convert the dataframe into an RDD of rows.
2. Map into a key-value RDD of the form `key=(Station, year, measurement)  value=[(day-of-year, value)]`.
3. Reduce by key: take the union of the value lists.
4. This results in an RDD of the form `key=(Station, year, measurement) value=[(....),(....)  ]`.
5. Convert the RDD into a dataframe: map each value list into a 365-element array, pack into bytearray.
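
The steps above can be sketched without Spark on a handful of rows. This is only an illustration of the grouping logic, not the notebook's actual code: the station IDs, the values, and the helper name `to_year_array` are made up, and the final bytearray packing is left out.

```python
from collections import defaultdict

# Toy rows in the input layout: (station, year, day_in_year, measurement, value).
# Station IDs and values are hypothetical, for illustration only.
rows = [
    ("STN_A", 2019, 1, "PRCP", 0.0),
    ("STN_A", 2019, 3, "PRCP", 12.0),
    ("STN_A", 2019, 1, "SNOW", 5.0),
    ("STN_B", 2019, 365, "PRCP", 7.0),
]

# Step 2: map each row to (key, [(day, value)]).
pairs = [((s, y, m), [(d, v)]) for (s, y, d, m, v) in rows]

# Steps 3-4: "reduce by key" by concatenating the lists that share a key.
grouped = defaultdict(list)
for key, day_values in pairs:
    grouped[key] += day_values

# Step 5: place each value at index day-1 of a 365-element list.
def to_year_array(day_values, n_days=365):
    arr = [None] * n_days  # None marks a day without an observation
    for day, value in day_values:
        arr[day - 1] = value
    return arr

year_arrays = {key: to_year_array(dv) for key, dv in grouped.items()}
```

In Spark the concatenation in steps 3-4 is what `reduceByKey(lambda a, b: a + b)` does across partitions; the dictionary here plays that role on a single machine.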

## Setup and imports

In [1]:
from Startup import *
from pyspark.sql.types import *
sc.stop()
sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)

finished standard imports
dict_items([('spark.executor.cores', '1'), ('spark.cores.max', '4'), ('spark.app.name', 'Weather_PCA'), ('spark.executor.memory', '3g'), ('spark.logConf', 'True'), ('spark.default.parallelism', '10')])
started SparkContext and SQLContext in 14.48 seconds
loaded weather.parquet in 9.07 seconds
loaded stations.parquet in 0.21 seconds
registered dataframes as tables in 0.21 seconds


## Get the .csv files from the NOAA S3 bucket and put them into a dataframe

In [2]:
%%time
startYear = 1763  # first year with data
endYear = 2020  # last year with data (at the time of writing)

# set schema for import from csv
schemaString = "id year_date element data_value"
schema = StructType([StructField(field_name, StringType(), True) for field_name in schemaString.split()])

# the s3 bucket noaa-ghcn-pds/csv/ contains all of the observations from 1763 to the present, one .csv file per year
# loop through all years and union the per-year dataframes
allYears = np.arange(startYear,endYear+1)
for yr in allYears:
    fn = "s3://noaa-ghcn-pds/csv/" + str(yr) + ".csv"
    dt = sc.textFile(fn).map(lambda l: l.split(",")).map(lambda p: ([x.strip() for x in p[0:4]]))
    schemaDT = sqlContext.createDataFrame(dt, schema)
    if yr == allYears[0]:
        df = schemaDT
    else:
        df = df.union(schemaDT)
        
# register table for querying
df.createOrReplaceTempView("ghcnd")

CPU times: user 2.15 s, sys: 204 ms, total: 2.35 s
Wall time: 9.23 s
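
As a rough illustration of what the two `map` calls in the loop do to a single line, here is the same split-and-strip logic in plain Python. The sample line is hypothetical (only its first four fields matter here, and the trailing columns are invented), and `parse_line` is a name introduced for this sketch.

```python
def parse_line(line):
    """Return the first four comma-separated fields, stripped of whitespace."""
    return [field.strip() for field in line.split(",")[0:4]]

# Hypothetical sample line: id, date (yyyyMMdd), element, value, then extra columns.
sample = "USC00369050,20150104,WT11,1,,,N,"

# Pair the parsed fields with the column names from schemaString.
field_names = "id year_date element data_value".split()
record = dict(zip(field_names, parse_line(sample)))
```

Every field is still a string at this point, which is why the schema uses `StringType()` for all four columns and the casting is deferred to the SQL query.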


## Query for data, and specify if the year is a leap year

In [3]:
%%time
# cast values to appropriate types and include whether year is leap year or not
qry = """
SELECT id AS Station,
       CAST(SUBSTRING(year_date, 1, 4) AS SMALLINT) AS Year,
       DAYOFYEAR(TO_DATE(year_date,'yyyyMMdd')) AS Day,
       NOT ISNULL(TO_DATE(CONCAT(SUBSTRING(year_date, 1, 4), '0229'),'yyyyMMdd')) AS isleapyear,
       element AS Measurement,
       CAST(data_value AS FLOAT) AS Value
FROM ghcnd"""
raw_data = sqlContext.sql(qry)

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 197 ms
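
The `isleapyear` column relies on Feb 29 failing to parse in a non-leap year, so `TO_DATE` yields null there. A minimal Python sketch of the same idea, checked against the standard library's `calendar.isleap`, is shown below; the function name `is_leap_by_parsing` is an assumption made for this example.

```python
import calendar
from datetime import datetime

def is_leap_by_parsing(year):
    """Return True if Feb 29 of `year` parses as a valid date."""
    try:
        datetime.strptime(f"{year:04d}0229", "%Y%m%d")
        return True
    except ValueError:
        # Feb 29 does not exist in this year, so it is not a leap year.
        return False

# Compare against the calendar rule for every year covered by the dataset.
mismatches = [y for y in range(1763, 2021)
              if is_leap_by_parsing(y) != calendar.isleap(y)]
```

If the SQL expression behaves the same way, century years such as 1800 and 1900 are correctly treated as non-leap years.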


## Get the data into [Station, Year, Measurement, Value by Day of Year] format

In [4]:
%%time
# define function for mapping (day-of-year, value) pairs into a 365-element array
def putIntoArray(x):
    # x = ((Station, Year, Measurement, isleapyear), [(day-of-year, value), ...])
    arr = [None] * 366  # one slot per possible day of the year
    for pair in x[1]:
        arr[pair[0]-1] = pair[1]  # set the (dayOfYear-1)-th entry to the corresponding value
    if x[0][3]:
        del arr[59]  # leap year: delete Feb 29 (day 60)
    else:
        del arr[365]  # non-leap year: delete the unused day-366 slot
    return (x[0][0], x[0][1], x[0][2], arr)

# 1. Convert the dataframe into an RDD of rows
arr_rdd = (raw_data.rdd
           # 2. Map into key-value RDD with the format `key=(Station, year, measurement, isleapyear)  value=[(day-of-year, value)]`
           .map(lambda x: ((x[0],x[1],x[4],x[3]),[(x[2],x[5])]))
           # 3. Reduce by key: concatenate the value lists
           # 4. Results in an RDD of the form: `key=(Station, year, measurement, isleapyear) value=[(....),(....)  ]`
           .reduceByKey(lambda a, b: a + b)
           # 5. Map each value list into a 365-element array (the dataframe is built in the next cell)
           .map(putIntoArray))

CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 5.88 s
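
To see the effect of the leap-year branch, the array logic can be run on a few hand-picked days. This is a standalone sketch with made-up values and a simplified signature (`year_array` is a name chosen for this example); it only mirrors the index handling in `putIntoArray`.

```python
def year_array(day_values, is_leap):
    """Place (day_of_year, value) pairs into a 365-element list."""
    arr = [None] * 366
    for day, value in day_values:
        arr[day - 1] = value
    # Leap year: drop Feb 29 (day 60, index 59).
    # Non-leap year: drop the unused slot for day 366 (index 365).
    del arr[59 if is_leap else 365]
    return arr

# Feb 28, Feb 29, Mar 1 and Dec 31 of a leap year.
leap = year_array([(59, 1.0), (60, 2.0), (61, 3.0), (366, 4.0)], is_leap=True)
# Feb 28, Mar 1 and Dec 31 of a non-leap year.
plain = year_array([(59, 1.0), (60, 3.0), (365, 4.0)], is_leap=False)
```

Both lists have 365 entries and Mar 1 lands at index 59 in each, so the same index refers to the same calendar date across years. The cost is that any Feb 29 observation is discarded.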


## Make a dataframe from the RDD

In [5]:
%%time
schemaString = "Station Year Measurement Values"
typeList = [StringType(), IntegerType(), StringType(), ArrayType(DoubleType())]
schema = StructType([StructField(field_name, typeList[i], True) for i,field_name in enumerate(schemaString.split())])
arr_df = sqlContext.createDataFrame(arr_rdd,schema)

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 23 ms


## Write to a .parquet file on s3

In [6]:
%%time
arr_df.write.parquet("s3://philipp-ghcnd/GHCNDby_year.parquet")

CPU times: user 1.16 s, sys: 104 ms, total: 1.26 s
Wall time: 3h 18min 1s


## Test by loading data from S3

In [8]:
%%time
s3Data = sqlContext.sql("SELECT * FROM parquet.`s3://philipp-ghcnd/GHCNDby_year.parquet`")
s3Data.show()

+-----------+----+-----------+--------------------+
|    Station|Year|Measurement|              Values|
+-----------+----+-----------+--------------------+
|USC00369050|2015|       WT11|[,,, 1.0,,,,,,,,,...|
|USW00023275|1954|       SNOW|[,,,,,,,,,,,,,,,,...|
|USC00204954|1936|       PRCP|[0.0, 147.0, 0.0,...|
|USS0007K02S|1979|       PRCP|[0.0, 25.0, 0.0, ...|
|USC00273177|1929|       PRCP|[117.0, 38.0, 0.0...|
|CA007056200|1973|       PRCP|[48.0, 0.0, 0.0, ...|
|US1MAPL0023|2017|       WESF|[,,,,, 36.0,,,,,,...|
|IN011360600|1922|       PRCP|[97.0, 168.0, 0.0...|
|ASN00092083|1973|       PRCP|[0.0, 0.0, 0.0, 0...|
|ASN00023708|1918|       PRCP|[0.0, 0.0, 0.0, 0...|
|USC00306774|1991|       WT11|[,,,,,,,,,,,,,,,,...|
|USC00292820|2020|       MDPR|[,,,,,,,,,,,,,,,,...|
|USC00238524|1984|       DAPR|[,,,,,,,,,,,,,,,,...|
|USC00349760|2009|       WT05|[,,,,,,,,,,,,,,,,...|
|USC00148563|1963|       SNWD|[0.0, 0.0, 0.0, 0...|
|GMM00010253|1970|       SNWD|[80.0, 70.0, 120....|
|MXN00015076

In [9]:
%%time
s3Data.count()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 19.6 s


12346092