In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql import SQLContext
import sys
import re
import os
import calendar

In [2]:
location_path = '/home/jovyan/Locations/WeatherStationLocations.csv'
output_path = '/home/jovyan/result.txt'

In [3]:
spark1 = SparkConf().setMaster("local").setAppName("236")
sc = SparkContext(conf = spark1)

In [4]:
spark=SparkSession \
        .builder \
        .appName('236') \
        .master("local")\
        .getOrCreate()

In [5]:
# Group stations in US by state
location = spark.read.csv(location_path).toPandas()  #Read csv to pandas dataframe
pandasDF = location.loc[1:, ['_c0', '_c3', '_c4']]           #only take these columns
sparkDF = spark.createDataFrame(pandasDF)           #convert the pandas dataframe to spark sql dataframe
# CTRY==US and ST is non-empty and we only take column USAF and ST
location_US = sparkDF.rdd.filter(lambda x: x[1]=='US').filter(lambda x: x[2]!=None).map(lambda x: (x[0],x[2]))
location_US.take(5)

[('423630', 'LA'),
 ('690014', 'NM'),
 ('690020', 'CA'),
 ('690044', 'UT'),
 ('690064', 'UT')]

In [6]:
# Read four recordings files
record1 = sc.textFile('/home/jovyan/Recordings/2006.txt')
record2 = sc.textFile('/home/jovyan/Recordings/2007.txt')
record3 = sc.textFile('/home/jovyan/Recordings/2008.txt')
record4 = sc.textFile('/home/jovyan/Recordings/2009.txt')
# Union 4 files
record = sc.union([record1,record2,record3,record4])
#Remove the header to make it easy to process
header = record.first()
record = record.filter(lambda x: x!=header).map(lambda x: re.findall('[\S]+',x)).map(lambda x: (x[0],(x[2][2:8], x[19])))
# Join the record and the location
temp1 = location_US.join(record)
temp1.take(5)

[('701333', ('AK', ('060101', '0.00H'))),
 ('701333', ('AK', ('060102', '0.00A'))),
 ('701333', ('AK', ('060103', '0.00I'))),
 ('701333', ('AK', ('060104', '0.00H'))),
 ('701333', ('AK', ('060105', '0.00H')))]

In [7]:
def computePRCP_perDay(str):
    if str is 'A':  #6
        return 4
    if str is 'B':  #12
        return 2
    if str is 'C':  #18
        return 1.3
    if str is 'D':  #24
        return 1
    if str is 'E':  #12
        return 2
    if str is 'F':  #24
        return 1
    if str is 'G':  #24
        return 1
    if str is 'H':
        return 0
    if str is 'I':
        return 0
    else:   #99.99
        return 0

# Compute the PRCP of one day based on the character
temp2 = temp1.map(lambda x: ((x[1][0], x[1][1][0]), float(x[1][1][1][0:4])*computePRCP_perDay(x[1][1][1][4])))
# Combine the records of same state and same day,compute their avg
temp3 = temp2.mapValues(lambda x : (x,1)).reduceByKey(lambda x,y : (x[0]+y[0],x[1]+y[1])) \
                          .mapValues(lambda x: round((x[0]/x[1]),2))
#Take the three columns we need
temp3 = temp3.map(lambda x: ((x[0][0], x[0][1][2:4]),x[1]))
temp3.collect()

[(('AK', '02'), 0.04),
 (('AK', '03'), 0.04),
 (('AK', '03'), 0.05),
 (('AK', '05'), 0.03),
 (('AK', '07'), 0.01),
 (('AK', '08'), 0.02),
 (('AK', '09'), 0.14),
 (('AK', '11'), 0.01),
 (('AK', '12'), 0.03),
 (('AK', '01'), 0.02),
 (('AK', '02'), 0.03),
 (('AK', '02'), 0.02),
 (('AK', '02'), 0.09),
 (('AK', '03'), 0.04),
 (('AK', '07'), 0.06),
 (('AK', '07'), 0.03),
 (('AK', '09'), 0.05),
 (('AK', '01'), 0.01),
 (('AK', '02'), 0.03),
 (('AK', '02'), 0.05),
 (('AK', '05'), 0.02),
 (('AK', '05'), 0.03),
 (('AK', '05'), 0.01),
 (('AK', '07'), 0.04),
 (('AK', '08'), 0.01),
 (('AK', '08'), 0.07),
 (('AK', '10'), 0.06),
 (('AK', '10'), 0.02),
 (('AK', '10'), 0.04),
 (('AK', '11'), 0.01),
 (('AK', '02'), 0.05),
 (('AK', '05'), 0.0),
 (('AK', '05'), 0.02),
 (('AK', '06'), 0.0),
 (('AK', '03'), 0.07),
 (('VA', '01'), 0.08),
 (('VA', '02'), 0.0),
 (('VA', '02'), 0.05),
 (('VA', '03'), 0.0),
 (('VA', '04'), 0.0),
 (('VA', '05'), 0.0),
 (('VA', '05'), 0.01),
 (('VA', '06'), 0.0),
 (('VA', '09'), 0.

In [8]:
# Compute the PRCP of a month(sum values that have the same key--key is state and month)
temp4 = temp3.reduceByKey(lambda x,y: x+y).sortBy(lambda x: x[1], False)
# Compute the difference of max PRCP and min PRCP
max_p = temp4.map(lambda x: (x[0][0], (x[0][1], x[1]))).groupByKey().map(lambda x: (x[0],sorted(x[1],key=lambda a:a[1],reverse=True)))\
            .map(lambda x: (x[0], x[1][0][1]))
min_p = temp4.map(lambda x: (x[0][0], (x[0][1], x[1]))).groupByKey().map(lambda x: (x[0],sorted(x[1],key=lambda a:a[1],reverse=False)))\
            .map(lambda x: (x[0], x[1][0][1]))
diff_result = max_p.union(min_p).reduceByKey(lambda x,y:round((x-y),2)).sortBy(lambda x: x[1])

# Compute the max month and the min month
max_mon = temp4.map(lambda x: (x[0][0], (x[0][1], x[1]))).groupByKey().map(lambda x: (x[0],sorted(x[1],key=lambda a:a[1],reverse=True)))\
               .map(lambda x: (x[0], (x[1][0][0],round(x[1][0][1],2))))
min_mon = temp4.map(lambda x: (x[0][0], (x[0][1], x[1]))).groupByKey().map(lambda x: (x[0],sorted(x[1],key=lambda a:a[1],reverse=False)))\
               .map(lambda x: (x[0], (x[1][0][0],round(x[1][0][1],2))))

# Combine the result together, sorting by ascending order
result = max_mon.join(min_mon).join(diff_result).sortBy(lambda x: x[1][1])

final = result.collect()

In [9]:
# Write the header of the result
with open(output_path, 'w') as file:
    file.write('{:<10s}  {:<10s} {:<10s} {:<10s} {:<10s} {:<10s}\r' \
                    .format('STATE', 'max_month', 'max_prcp', 'min_month', 'min_prcp', 'diff'))
    
# Write the result
for x in final:
    with open(output_path, mode='a') as file:
        file.write('{:<10s}  {:<10s} {:<10s} {:<10s} {:<10s} {:<10s}\r' \
                   .format(str(x[0]), str(x[1][0][0][0]), str(x[1][0][0][1]/4), str(x[1][0][1][0]), str(x[1][0][1][1]/4), str(x[1][1]/4)))