### Visualizing weather data

In [1]:
import pandas as pd
import numpy as np
import sklearn as sk
import urllib
import math
%pylab inline

import findspark
findspark.init()

from pyspark import SparkContext
#sc.stop()
sc = SparkContext(master="local[3]",pyFiles=['lib/numpy_pack.py','lib/spark_PCA.py','lib/computeStatistics.py'])

from pyspark import SparkContext
from pyspark.sql import *
import pyspark.sql
sqlContext = SQLContext(sc)

Populating the interactive namespace from numpy and matplotlib


In [2]:
import numpy as np
from lib.numpy_pack import packArray,unpackArray
from spark_PCA import computeCov

In [3]:

from computeStatistics import computeOverAllDist, STAT_Descriptions

### Read weather data for state

In [6]:
state='WA'
data_dir='../Data/Weather'

tarname=state+'.tgz'
parquet=state+'.parquet'

!rm -rf $data_dir/$tarname

command="curl https://mas-dse-open.s3.amazonaws.com/Weather/by_state/%s > %s/%s"%(tarname,data_dir,tarname)
print(command)
!$command
!ls -lh $data_dir/$tarname

curl https://mas-dse-open.s3.amazonaws.com/Weather/by_state/WA.tgz > ../Data/Weather/WA.tgz
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.4M  100 61.4M    0     0  11.7M      0  0:00:05  0:00:05 --:--:-- 12.5M 0:00:02 10.1M
-rw-r--r-- 1 jovyan users 62M May  6 09:18 ../Data/Weather/WA.tgz


In [7]:
cur_dir,=!pwd
%cd $data_dir
!tar -xzf $tarname
!du ./$parquet
%cd $cur_dir


weather_df=sqlContext.read.parquet(data_dir+'/'+parquet)
print(weather_df.count())
weather_df.show(5)

/home/jovyan/work/Sections/Data/Weather
76112	./WA.parquet
/home/jovyan/work/Sections/Section2-Weather-PCA
177336
+-----------+-----------+----+--------------------+-----------------+-----------------+-------------------+-----------------+-----+----------+
|    Station|Measurement|Year|              Values|       dist_coast|         latitude|          longitude|        elevation|state|      name|
+-----------+-----------+----+--------------------+-----------------+-----------------+-------------------+-----------------+-----+----------+
|USC00459342|       PRCP|1901|[00 7E 00 7E 00 7...|126.7229995727539|45.79999923706055|-121.93329620361328|351.1000061035156|   WA|WIND RIVER|
|USC00459342|       PRCP|1906|[00 7E 00 7E 00 7...|126.7229995727539|45.79999923706055|-121.93329620361328|351.1000061035156|   WA|WIND RIVER|
|USC00459342|       PRCP|1907|[00 7E 00 7E 00 7...|126.7229995727539|45.79999923706055|-121.93329620361328|351.1000061035156|   WA|WIND RIVER|
|USC00459342|       PRCP|193

### read statistics information for state.

In [8]:
!aws s3 ls mas-dse-open/Weather/by_state/ | grep STAT

/bin/sh: 1: aws: not found


In [None]:
#read statistics
filename='STAT_%s.pickle'%state
command="curl https://mas-dse-open.s3.amazonaws.com/Weather/by_state/%s.gz > %s/%s.gz"%(filename,data_dir,filename)
print(command)
!$command

In [None]:
gzpath='%s/%s.gz'%(data_dir,filename)
print(gzpath)
!ls -l $gzpath
!gunzip -f $gzpath
STAT,STAT_Descriptions = load(open(data_dir+'/'+filename,'rb'))
print('keys from STAT=',STAT.keys())

### Read information about US weather stations.

In [None]:
filename='US_stations.tsv.gz'
command="curl https://mas-dse-open.s3.amazonaws.com/Weather/Info/%s > %s/%s"%(filename,data_dir,filename)
print(command)
!$command
filename_no_gz = filename[:-3]
!gunzip -f $data_dir/$filename
!ls -lh $data_dir/US_stations*

In [None]:
#read csv into pandas dataframe
PATH='%s/%s'%(data_dir,filename_no_gz)
print(PATH)
stations=pd.read_csv(PATH,sep='\t')

In [None]:
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType, FloatType,BinaryType

schema = StructType([StructField("Station", StringType(), False),
                     StructField("Dist_coast", FloatType(), False),
                     StructField("Latitude", FloatType(), False),
                     StructField("Longitude", FloatType(), False),
                     StructField("Elevation", FloatType(), False),
                     StructField("State", StringType(), True),                  
                     StructField("Name", StringType(), False)])                


In [None]:
stations_df = sqlContext.createDataFrame(stations,schema).drop('State')
stations_df.show(4)

In [None]:
jdf=weather_df.join(stations_df,on='Station',how='left')
jdf.show(4)

In [None]:
jdf.select(['Station','Measurement','Year','Name']).show(4)

In [None]:
jdf.count()

In [None]:
sqlContext.registerDataFrameAsTable(jdf,'jdf')

In [None]:
# get all measurements for a particular year and a particular station
measurement='PRCP'
Query="""
SELECT *
FROM jdf 
WHERE Measurement='%s'
AND Name='BUFFALO'
ORDER BY YEAR"""%(measurement)
df=sqlContext.sql(Query)
print(df.count())
df.show(3)

In [None]:
df.select(['Station','Year','Values']).schema

### Smoothing by convolving with gaussian window

In [None]:
from astropy.convolution import convolve
from scipy import signal
#using astrophy.convolution.convolve and not scipy.signal.convolve because the first can handle nans.

orig_pdf=df.toPandas()
orig_pdf.head()

def Smoother(orig_pdf,order=101,std=20):
    window = signal.gaussian(order, std=std)
    window/=sum(window)

    L=list(orig_pdf['Values'])

    orig=np.stack([unpackArray(V,np.float16) for V in L])
    orig_shape=orig.shape
    orig=orig.flatten()

    smoothed = convolve(orig, window)
    smoothed=np.reshape(smoothed,orig_shape)

    #create a new pandas dataframe
    smoothed_pdf=orig_pdf.copy()   # make a copy

    L=[packArray(smoothed[i,:]) for i in range(smoothed.shape[0])]
    smoothed_pdf['Values']=L

    smoothed_pdf.loc[0,'Measurement']

    new_name = 'smooth_'+smoothed_pdf.loc[0,'Measurement']
    smoothed_pdf['Measurement']=new_name
    return smoothed_pdf

In [None]:
smoothed_pdf=Smoother(orig_pdf)
smoothed_pdf.head(3)

In [None]:
Query="""
SELECT Station,count(Year) as count
FROM jdf 
WHERE Measurement='%s'
GROUP BY Station
ORDER BY count
"""%(measurement)
stat_pdf=sqlContext.sql(Query).toPandas()
stat_pdf.tail(4)

In [None]:
stations=list(stat_pdf['Station'])
stations=stations[-1:0:-1]
stations[:4]

In [None]:
import pyspark.sql.functions as sqlf
import pyspark
import pyarrow
pyspark.__version__

In [None]:
ndf=Smoother(orig_pdf)
ndf.head(3)

In [None]:
from time import time
# get all measurements for a particular year and a particular station
measurement='PRCP'
Query_template="""
SELECT *
FROM jdf 
WHERE Measurement='%s'
AND Station='%s'
ORDER BY YEAR"""

for station in stations:
    t0=time()
    Query=Query_template%(measurement,station)

    pdf=sqlContext.sql(Query).toPandas()
    t1=time()
    smoothed_pdf=Smoother(pdf)
    t2=time()
    smoothed_df= sqlContext.createDataFrame(smoothed_pdf)
    jdf=jdf.union(smoothed_df)
    t3=time()
    print('Station=%s, rows=%d, prep=%5.2f,compute=%5.2f,cleanup=%f5.2,total=%f5.2'
          %(station,pdf.shape[0],t1-t0,t2-t1,t3-t2,t3-t0))

In [None]:
jdf.count()

In [None]:
!ls ../../Data/Weather/

In [None]:
outfilename='../../Data/Weather/Joined_smoothed_PRCP.parquet'
jdf.write.save(outfilename)

In [None]:
!du -sh $outfilename

In [None]:
print(pdf.columns)
sdf = sqlContext.createDataFrame(pdf)
sdf.schema

In [None]:
# 'smoothed_%s'%(station),
sdf.count()

In [None]:
jdf.count()

In [None]:
jdf=jdf.union(sdf)
jdf.count()

## BinaryType not supported  by pandas_udf
Running the following code: 
```python
import pyspark.sql.functions as sqlf
import pyspark
import pyarrow
pyspark.__version__  (2.3.0)

from pyspark.sql.functions import pandas_udf, PandasUDFType
def Smoother(orig_pdf):
    return orig_pdf

### Offending command
smoother_udf=pandas_udf(Smoother,df.select(['Station','Year','Values']).schema, PandasUDFType.GROUPED_MAP) 

X=df.groupby("Station").apply(smoother_udf)
X.show()
```
Generates the following error message
```
NotImplementedError: Invalid returnType with grouped map Pandas UDFs: StructType(List(StructField(Station,StringType,true),StructField(Year,IntegerType,true),StructField(Values,BinaryType,true))) is not supported
```

Works find if only ('Station','Year') are used

In [None]:
orig_df.schema

In [None]:
from lib.YearPlotter import YearPlotter
fig, ax = plt.subplots(figsize=(10,7));
YP=YearPlotter()
YP.plot(smoothed[110:120,:].transpose(),fig,ax,title='smoothed %s for %s'%(measurement,stat));
plt.savefig('percipitation.png')
#title('A sample of graphs');

In [None]:
fig, ax = plt.subplots(figsize=(10,7));
YP=YearPlotter()
i=85
factor=5
pair=np.stack([orig[i,:],smoothed[i,:]*factor])
pair.shape

YP.plot(pair.transpose(),fig,ax,title='smoothed %s for %s'%(measurement,stat));

In [None]:
from scipy import signal
from astropy.convolution import convolve
window = signal.gaussian(81, std=20)

window/=sum(window)

In [None]:
P=T[3,:]
P[10:30]=np.nan
f=filtered = convolve(P, window)
print(len(f))
plot(f)
plot(P)

### Distribution of missing observations
The distribution of missing observations is not uniform throughout the year. We visualize it below.

In [None]:
from MultiPlot import *                
def plot_valid(m,fig,axis):
    valid_m=STAT[m]['NE']
    YP.plot(valid_m,fig,axis,title='valid-counts '+m)
    

In [None]:
plot_pair(['TMIN','TMAX'],plot_valid)

In [None]:
plot_pair(['TOBS','PRCP'],plot_valid)

In [None]:
plot_pair(['SNOW', 'SNWD'],plot_valid)

### Plots of mean and std of observations

In [None]:
def plot_mean_std(m,fig,axis):
    scale=1.
    temps=['TMIN','TMAX','TOBS']
    percipitation=['PRCP','SNOW','SNWD']
    _labels=['mean+std','mean','mean-std']
    if (m in temps or m=='PRCP'):
        scale=10.
    mean=STAT[m]['Mean']/scale
    std=np.sqrt(STAT[m]['Var'])/scale
    graphs=np.vstack([mean+std,mean,mean-std]).transpose()
    YP.plot(graphs,fig,axis,labels=_labels,title='Mean+-std   '+m)
    if (m in temps):
        axis.set_ylabel('Degrees Celsius')
    if (m in percipitation):
        axis.set_ylabel('millimeter')



In [None]:
plot_pair(['TMIN','TMAX'],plot_mean_std)

In [None]:
plot_pair(['TOBS','PRCP'],plot_mean_std)

In [None]:
plot_single('TOBS',plot_mean_std,'r_figures/TOBS.png')

In [None]:
plot_pair(['SNOW', 'SNWD'],plot_mean_std)

In [None]:
plot_single('SNOW',plot_mean_std,'r_figures/SNOW.png')

In [None]:
plot_single('SNWD',plot_mean_std,'r_figures/SNWD.png')

### plotting top 3 eigenvectors

In [None]:
def plot_eigen(m,fig,axis):
    EV=STAT[m]['eigvec']
    YP.plot(EV[:,:3],fig,axis,title='Top Eigenvectors '+m)

In [None]:
plot_pair(['TMIN','TMAX'],plot_eigen)

In [None]:
plot_pair(['TOBS','PRCP'],plot_eigen)

In [None]:
plot_pair(['SNOW', 'SNWD'],plot_eigen)

### Script for plotting percentage of variance explained

In [None]:
def pltVarExplained(j):
    subplot(1,3,j)
    EV=STAT[m]['eigval']
    k=5
    L=([0,]+list(cumsum(EV[:k])))/sum(EV)
    #print m,L
    plot(L)
    title('Percentage of Variance Explained for '+ m)
    ylabel('Percentage of Variance')
    xlabel('# Eigenvector')
    grid()
    

In [None]:
f=plt.figure(figsize=(15,4))
j=1
for m in ['TMIN', 'TOBS', 'TMAX']: #,
    pltVarExplained(j)
    j+=1
f.savefig('r_figures/VarExplained1.png')

In [None]:
f=plt.figure(figsize=(15,4))
j=1
for m in ['SNOW', 'SNWD', 'PRCP']:
    pltVarExplained(j)
    j+=1 
f.savefig('r_figures/VarExplained2.png')

In [None]:
#sc.stop()