In [1]:
import os
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
import pyspark.sql.types as T

os.getcwd()
os.chdir("/mnt/workspace/Public-DSC291/notebooks/Section2-PCA/Full_Dataset_analysis")
os.getcwd()

from Startup import *

finished standard imports
dict_items([('spark.logConf', 'True'), ('spark.executor.memory', '3g'), ('spark.default.parallelism', '10'), ('spark.app.name', 'Weather_PCA'), ('spark.cores.max', '4'), ('spark.executor.cores', '1')])
started SparkContext and SQLContext in 14.57 seconds
loaded weather.parquet in 8.22 seconds
loaded stations.parquet in 0.24 seconds
registered dataframes as tables in 0.24 seconds


In [3]:
%%time
query1="""
SELECT stations.ID, stations.state, stations.latitude, stations.longitude,
        weather.Measurement, weather.Year, weather.Values
FROM stations
LEFT JOIN weather
ON stations.ID = weather.Station
WHERE stations.state <> '' AND weather.Measurement='PRCP'
"""

query2="""
SELECT stations.ID, stations.state, stations.latitude, stations.longitude,
        weather.Measurement, weather.Year, weather.Values
FROM stations
LEFT JOIN weather
ON stations.ID = weather.Station
WHERE stations.state <> '' AND weather.Measurement='SNOW'
"""

PRCP_df = sqlContext.sql(query1)
SNOW_df = sqlContext.sql(query2)
sqlContext.registerDataFrameAsTable(PRCP_df,'PRCP_df')
sqlContext.registerDataFrameAsTable(SNOW_df,'SNOW_df')

CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 88.2 ms


In [53]:
query="""
SELECT stations.ID, stations.state, stations.latitude, stations.longitude,
        weatherA.Year, weatherA.Values as PRCP, weatherB.Values as SNOW
FROM stations, weather as weatherA, weather as weatherB
WHERE stations.state <> '' AND 
        stations.ID = weatherA.Station AND
        stations.ID = weatherB.Station AND
        weatherA.Year = weatherB.Year AND
        weatherA.Measurement = 'PRCP' AND
        weatherB.Measurement = 'SNOW'
"""

Weather_df = sqlContext.sql(query)
sqlContext.registerDataFrameAsTable(Weather_df, 'Weather_df')

In [54]:
#Filter for certain dates
filter_year = 1945
Weather_df = Weather_df.filter(Weather_df.Year > filter_year)

#Filter for only stations with data in the full range
cnt_bystation = Weather_df.groupBy('ID').count()
Weather_df = Weather_df.join(cnt_bystation, "ID")
mx = cnt_bystation.agg({"count": "max"}).collect()[0]["max(count)"]
Weather_df = Weather_df.filter(Weather_df['count'] == mx)
Weather_pdf = Weather_df.toPandas()

In [55]:
def f1(row, col):
    return row[col][:362]

def f2(row, col):
    return row[col][362:]

def Jan2Jun(df, col):
    df_first = df.copy()
    
    # df_first is same as df except for its ['Values'] column only contains the 
    # the first half year data.
    df_first['V_first'] = df_first.apply(f1, axis=1, args=[col])
    
    # subtract df_first 'Year' column by 1, because we want to join originally 1998 Jan-Jun to 1997 July-Dec,
    # as 1997 data, so minus 1998 by 1 to match 1997.
    df_first['Year'] = df_first.apply(lambda x: x['Year']-1, axis=1)
    
    # df_second is same as df except for its ['Values'] column only contains the 
    # the second half year data.
    df_second = df.copy()
    df_second['V_second'] = df_second.apply(f2, axis=1, args=[col])
    
    # drop the inrrelevant columns of df_second so that join can be easier later. I only kept 'ID' and 'Year'
    # that are used to be joined on later.
    df_second = df_second.drop(['state', 'latitude', 'SNOW', 'PRCP',
                                'longitude', 'count', col], axis=1)
    
    # join df_first and df_second on ID, Year.
    result = pd.merge(df_first, df_second, on=['ID', 'Year'], how='inner')
    
    # generate the thansformed data, e.g. 1997 July - 1998 June from 'V_second' (1997 July - 1997 Dec) and 
    # 'V_first' (1998 Jan - 1998 June)
    result[col] = result.apply(lambda x: x['V_second'] + x['V_first'], axis=1)
    
    # drop V_second and V_first as they are useless now.
    return result.drop(['V_second', 'V_first'], axis=1)


In [73]:
New_Weather_pdf = Jan2Jun(Weather_pdf, 'PRCP')
New_Weather_pdf = Jan2Jun(New_Weather_pdf, 'SNOW')
del New_Weather_pdf['count']
PRCP = [np.nansum(unpackArray(v,np.float16)) for v in list(New_Weather_pdf['PRCP'])]
SNOW = [np.nansum(unpackArray(v,np.float16)) for v in list(New_Weather_pdf['SNOW'])]
New_Weather_pdf['PRCP'] = PRCP
New_Weather_pdf['SNOW'] = SNOW
New_Weather_pdf.sample(5)

Unnamed: 0,ID,state,latitude,longitude,Year,PRCP,SNOW
3929,USC00201675,MI,41.9622,-84.9925,1976,9344.0,2152.0
63406,USC00342912,OK,36.4194,-97.8747,2005,9936.0,392.0
50717,USC00241974,MT,48.1794,-111.9614,1996,2516.0,676.0
2959,USC00267369,NV,35.4661,-114.9217,2002,2168.0,0.0
9224,USW00014923,IL,41.4653,-90.5233,1953,9360.0,1172.0


In [74]:
# Filter stations only in US
url = 'https://raw.githubusercontent.com/python-visualization/folium/master/examples/data'
df_ElNinoYears = pd.read_csv(url + '/US_Unemployment_Oct2012.csv')
US_states = list(df_ElNinoYears['State'])
pick = [New_Weather_pdf['state'][i] in US_states for i in New_Weather_pdf.index]
New_Weather_pdf = New_Weather_pdf[pick]

In [83]:
ElNinoYears = [1951, 1957, 1965, 1968, 1972, 1977, 1982, 1991, 1997]
data = []
for station in set(New_Weather_pdf['ID']):
    temp_pdf = New_Weather_pdf[New_Weather_pdf['ID']==station]
    temp_pdf.index = range(len(temp_pdf))
    state = temp_pdf['state'][0]
    lat = temp_pdf['latitude'][0]
    lon = temp_pdf['longitude'][0]
    pick_EN = np.array([temp_pdf['Year'][i] in ElNinoYears for i in temp_pdf.index])
    PRCP_EN =temp_pdf[pick_EN]['PRCP'].mean()
    PRCP_NEN = temp_pdf[~pick_EN]['PRCP'].mean()
    SNOW_EN = temp_pdf[pick_EN]['SNOW'].mean()
    SNOW_NEN = temp_pdf[~pick_EN]['SNOW'].mean()
    data.append([station, state, lat, lon, PRCP_EN, PRCP_NEN, SNOW_EN, SNOW_NEN])
    
df = pd.DataFrame(data, columns=['station', 'state', 'latitude', 
                                 'longitude', 'PRCP_EN', 'PRCP_NEN', 
                                 'SNOW_EN', 'SNOW_NEN'])
df

Unnamed: 0,station,state,latitude,longitude,PRCP_EN,PRCP_NEN,SNOW_EN,SNOW_NEN
0,USC00416747,TX,31.5206,-99.9264,5159.333333,5823.859649,38.111111,80.666667
1,USC00120877,IN,39.3853,-87.0400,11625.777778,10734.035088,484.444444,375.684211
2,USC00355139,OR,44.6167,-121.0011,2691.444444,2402.052632,355.888889,233.982456
3,USC00391579,SD,43.0431,-96.9033,7213.777778,5984.947368,883.444444,727.122807
4,USW00013996,KS,39.0725,-95.6261,9421.777778,8695.087719,682.222222,467.140351
5,USC00410120,TX,32.7047,-99.3011,6497.111111,6671.789474,38.333333,69.280702
6,USC00478027,WI,45.8236,-91.8761,7973.333333,7286.421053,1274.777778,1239.561404
7,USC00471064,WI,44.1614,-88.0803,7504.888889,6981.684211,1044.111111,937.052632
8,USW00023050,NM,35.0419,-106.6156,2120.444444,2165.771930,273.222222,252.105263
9,USC00293649,NM,34.2594,-106.0931,3708.000000,3828.070175,429.777778,487.982456
