<a href="https://colab.research.google.com/github/LakhiCharanMahato/Weather-Analysis-across-various-states-of-North-America/blob/master/New%20York/NY_note1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installing Requirements

**Ensure that Edit-->Hardware Accelertor(GPU) is enabled.**

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()

# Library Requirements

**On the left hand side the storage icon is the local storage for Google colab.
Make a folder name lib there and upload all the items from Weather-Analysis-across-various-states-of-North-America/New York/lib/**

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


# Dealing with nans

In [5]:
import numpy as np
a=np.array([1,np.nan,2,np.nan,3,4,5])
print('a=',a)
print('np.mean(a)=',np.mean(a))
print('np.mean(np.nan_to_num(a))=',np.mean(np.nan_to_num(a))) # =(1+0+2+0+3+4+5)/7
print('np.nanmean(a)=',np.nanmean(a)) # =(1+2+3+4+5)/5

a= [ 1. nan  2. nan  3.  4.  5.]
np.mean(a)= nan
np.mean(np.nan_to_num(a))= 2.142857142857143
np.nanmean(a)= 3.0


In [6]:
np.outer(a,a)

array([[ 1., nan,  2., nan,  3.,  4.,  5.],
       [nan, nan, nan, nan, nan, nan, nan],
       [ 2., nan,  4., nan,  6.,  8., 10.],
       [nan, nan, nan, nan, nan, nan, nan],
       [ 3., nan,  6., nan,  9., 12., 15.],
       [ 4., nan,  8., nan, 12., 16., 20.],
       [ 5., nan, 10., nan, 15., 20., 25.]])

# Computing PCA using RDD

Computing PCA involves 2 steps:
1. Computing covariance matrix.
2. Computing the eigen vector decomposition.

In [7]:
import os
os.environ["PYSPARK_PYTHON"]="python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3"

from pyspark import SparkContext,SparkConf

def create_sc(pyFiles):
    sc_conf = SparkConf()
    sc_conf.setAppName("Weather_PCA")
    sc_conf.set('spark.executor.memory', '3g')
    sc_conf.set('spark.executor.cores', '1')
    sc_conf.set('spark.cores.max', '4')
    sc_conf.set('spark.default.parallelism','10')
    sc_conf.set('spark.logConf', True)
    print(sc_conf.getAll())

    sc = SparkContext(conf=sc_conf,pyFiles=pyFiles)

    return sc 

sc = create_sc(pyFiles=['lib/numpy_pack.py','lib/spark_PCA.py','lib/computeStatistics.py'])

dict_items([('spark.app.name', 'Weather_PCA'), ('spark.executor.memory', '3g'), ('spark.executor.cores', '1'), ('spark.cores.max', '4'), ('spark.default.parallelism', '10'), ('spark.logConf', 'True')])


In [0]:
from pyspark.sql import *
sqlContext = SQLContext(sc)

import numpy as np
from lib.computeStatistics import *

In [9]:
state='NY'
EMR=False
if not EMR:
    data_dir='Data/Weather'
    tarname=state+'.tgz'
    parquet=state+'.parquet'
    %mkdir -p $data_dir

    !rm -rf $data_dir/$tarname

    command="curl https://mas-dse-open.s3.amazonaws.com/Weather/by_state/%s > %s/%s"%(tarname,data_dir,tarname)
    print(command)
    !$command
    !ls -lh $data_dir/$tarname

    cur_dir,=!pwd
    %cd $data_dir
    !tar -xzf $tarname
    !du ./$parquet
    %cd $cur_dir

curl https://mas-dse-open.s3.amazonaws.com/Weather/by_state/NY.tgz > Data/Weather/NY.tgz
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 63.2M  100 63.2M    0     0  12.8M      0  0:00:04  0:00:04 --:--:-- 15.2M
-rw-r--r-- 1 root root 64M Mar 26 09:27 Data/Weather/NY.tgz
/content/Data/Weather
77828	./NY.parquet
/content


In [0]:
if EMR:  # not debugged, should use complete parquet and extract just the state of interest.
    data_dir='/mnt/workspace/Data'
    !hdfs dfs -mkdir /weather/
    !hdfs dfs -CopyFromLocal $data_dir/$parquet /weather/$parquet

    # When running on cluster
    #!mv ../../Data/Weather/NY.parquet /mnt/workspace/Data/NY.parquet

    !aws s3 cp --recursive --quiet /mnt/workspace/Data/NY.parquet s3://dse-weather/NY.parquet

    !aws s3 ls s3://dse-weather/

    local_path=data_dir+'/'+parquet
    hdfs_path='/weather/'+parquet
    local_path,hdfs_path

    !hdfs dfs -copyFromLocal $local_path $hdfs_path

    !hdfs dfs -du /weather/
    parquet_path=hdfs_path

# Data Cleaning

In [11]:
parquet_path = data_dir+'/'+parquet
!du -sh $parquet_path

77M	Data/Weather/NY.parquet


In [12]:
%%time
df=sqlContext.read.parquet(parquet_path)
print(df.count())
df.show(5)

168398
+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|    Station|Measurement|Year|              Values|       dist_coast|      latitude|         longitude|        elevation|state|             name|
+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|USW00094704|   PRCP_s20|1945|[00 00 00 00 00 0...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1946|[99 46 52 46 0B 4...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1947|[79 4C 75 4C 8F 4...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1948|[72 48 7A 48 85 4...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517

In [0]:
sqlContext.registerDataFrameAsTable(df,'table')

In [14]:
Query="""
SELECT Measurement,count(Measurement) as count 
FROM table 
GROUP BY Measurement
ORDER BY count
"""
counts=sqlContext.sql(Query)
counts.show()

+-----------+-----+
|Measurement|count|
+-----------+-----+
|   TOBS_s20|10956|
|       TOBS|10956|
|   TMAX_s20|13437|
|       TMAX|13437|
|   TMIN_s20|13442|
|       TMIN|13442|
|   SNWD_s20|14617|
|       SNWD|14617|
|       SNOW|15629|
|   SNOW_s20|15629|
|   PRCP_s20|16118|
|       PRCP|16118|
+-----------+-----+



In [15]:
from time import time
t=time()

N=sc.defaultParallelism
print('Number of executors=',N)
print('took',time()-t,'seconds')

Number of executors= 10
took 0.0050945281982421875 seconds


In [16]:
!ls lib

computeStatistics.py  MultiPlot.py   __pycache__	      spark_PCA.py
decomposer.py	      numpy_pack.py  Reconstruction_plots.py  YearPlotter.py


# Computing Covariance Matrix

In [0]:
# %load lib/spark_PCA.py
import numpy as np
from numpy import linalg as LA

def outerProduct(X):
    """Computer outer product and indicate which locations in matrix are undefined"""
    O=np.outer(X,X)
    N=1-np.isnan(O)
    return (O,N)

def sumWithNan(M1,M2):
    """Add two pairs of (matrix,count)"""
    (X1,N1)=M1
    (X2,N2)=M2
    N=N1+N2
    X=np.nansum(np.dstack((X1,X2)),axis=2)
    return (X,N)

def HW_func(S,N):
    E= S[1:,0]                              # E is the sum of the vectors
    NE= np.array(N[1:,0],dtype=np.float)    # NE is the number of not-nan antries for each coordinate of the vectors
    Mean= E/NE                              # Mean is the Mean vector (ignoring nans)
    O= S[1:,1:]                             # O is the sum of the outer products
    NO= np.array(N[1:,1:],dtype=np.float)   # NO is the number of non-nans in the outer product.
    return  E,NE,Mean,O,NO

In [0]:
def computeCov(RDDin):
    """computeCov recieves as input an RDD of np arrays, all of the same length, 
    and computes the covariance matrix for that set of vectors"""
    RDD=RDDin.map(lambda v:np.array(np.insert(v,0,1),dtype=np.float64)) # insert a 1 at the beginning of each vector so that the same 
                                           #calculation also yields the mean vector
    OuterRDD=RDD.map(outerProduct)   # separating the map and the reduce does not matter because of Spark uses lazy execution.
    (S,N)=OuterRDD.reduce(sumWithNan)

    E,NE,Mean,O,NO=HW_func(S,N)

    Cov=O/NO - np.outer(Mean,Mean)
    # Output also the diagnal which is the variance for each day
    Var=np.array([Cov[i,i] for i in range(Cov.shape[0])])
    return {'E':E,'NE':NE,'O':O,'NO':NO,'Cov':Cov,'Mean':Mean,'Var':Var}

In [19]:
if __name__=="__main__":
    # create synthetic data matrix with j rows and rank k
    
    V=2*(np.random.random([2,10])-0.5)
    data_list=[]
    for i in range(1000):
        f=2*(np.random.random(2)-0.5)
        data_list.append(np.dot(f,V))
    # compute covariance matrix
    RDD=sc.parallelize(data_list)
    OUT=computeCov(RDD)

    #find PCA decomposition
    eigval,eigvec=LA.eig(OUT['Cov'])
    print('eigval=',eigval)
    print('eigvec=',eigvec)

eigval= [ 1.36685487e+00+0.00000000e+00j  2.29367848e-01+0.00000000e+00j
 -8.10105924e-17+7.96382746e-17j -8.10105924e-17-7.96382746e-17j
  7.88851209e-17+0.00000000e+00j  5.39699625e-17+0.00000000e+00j
 -3.67980520e-17+0.00000000e+00j  1.58193311e-17+7.41164318e-18j
  1.58193311e-17-7.41164318e-18j -6.36689236e-18+0.00000000e+00j]
eigvec= [[-0.35780548+0.j         -0.01518794+0.j          0.38379663-0.23286603j
   0.38379663+0.23286603j  0.458165  +0.j         -0.1562244 +0.j
   0.02236314+0.j         -0.0263539 +0.19156201j -0.0263539 -0.19156201j
  -0.06512137+0.j        ]
 [ 0.16635764+0.j         -0.18593124+0.j          0.64604164+0.j
   0.64604164-0.j         -0.52884859+0.j          0.18917869+0.j
  -0.08050918+0.j         -0.2490146 -0.14885252j -0.2490146 +0.14885252j
  -0.24879068+0.j        ]
 [ 0.10596097+0.j          0.41570014+0.j          0.16368015-0.02437838j
   0.16368015+0.02437838j  0.25671615+0.j          0.15271127+0.j
  -0.38804382+0.j          0.06913741-0.0967

# Computing the eigen vector decomposition

In [20]:
%%writefile lib/tmp
# %load lib/computeStatistics.py


from numpy import linalg as LA
import numpy as np

from numpy_pack import packArray,unpackArray
from spark_PCA import computeCov
from time import time

def computeStatistics(sqlContext,df):
    """Compute all of the statistics for a given dataframe
    Input: sqlContext: to perform SQL queries
            df: dataframe with the fields 
            Station(string), Measurement(string), Year(integer), Values (byteArray with 365 float16 numbers)
    returns: STAT, a dictionary of dictionaries. First key is measurement, 
             second keys described in computeStats.STAT_Descriptions
    """

    sqlContext.registerDataFrameAsTable(df,'weather')
    STAT={}  # dictionary storing the statistics for each measurement
    measurements=['TMAX', 'SNOW', 'SNWD', 'TMIN', 'PRCP', 'TOBS']
    
    for meas in measurements:
        t=time()
        Query="SELECT * FROM weather\n\tWHERE measurement = '%s'"%(meas)
        mdf = sqlContext.sql(Query)
        print(meas,': shape of mdf is ',mdf.count())

        data=mdf.rdd.map(lambda row: unpackArray(row['Values'],np.float16))

        #Compute basic statistics
        STAT[meas]=computeOverAllDist(data)   # Compute the statistics 

        # compute covariance matrix
        OUT=computeCov(data)

        #find PCA decomposition
        eigval,eigvec=LA.eig(OUT['Cov'])

        # collect all of the statistics in STAT[meas]
        STAT[meas]['eigval']=eigval
        STAT[meas]['eigvec']=eigvec
        STAT[meas].update(OUT)

        print('time for',meas,'is',time()-t)
    
    return STAT

# Compute the overall distribution of values and the distribution of the number of nan per year
def find_percentiles(SortedVals,percentile):
    L=int(len(SortedVals)/percentile)
    return SortedVals[L],SortedVals[-L]
  
def computeOverAllDist(rdd0):
    UnDef=np.array(rdd0.map(lambda row:sum(np.isnan(row))).sample(False,0.01).collect())
    flat=rdd0.flatMap(lambda v:list(v)).filter(lambda x: not np.isnan(x)).cache()
    count,S1,S2=flat.map(lambda x: np.float64([1,x,x**2]))\
                  .reduce(lambda x,y: x+y)
    mean=S1/count
    std=np.sqrt(S2/count-mean**2)
    Vals=flat.sample(False,0.0001).collect()
    SortedVals=np.array(sorted(Vals))
    low100,high100=find_percentiles(SortedVals,100)
    low1000,high1000=find_percentiles(SortedVals,1000)
    return {'UnDef':UnDef,\
          'mean':mean,\
          'std':std,\
          'SortedVals':SortedVals,\
          'low100':low100,\
          'high100':high100,\
          'low1000':low100,\
          'high1000':high1000
          }

# description of data returned by computeOverAllDist
STAT_Descriptions=[
('SortedVals', 'Sample of values', 'vector whose length varies between measurements'),
 ('UnDef', 'sample of number of undefs per row', 'vector whose length varies between measurements'),
 ('mean', 'mean value', ()),
 ('std', 'std', ()),
 ('low100', 'bottom 1%', ()),
 ('high100', 'top 1%', ()),
 ('low1000', 'bottom 0.1%', ()),
 ('high1000', 'top 0.1%', ()),
 ('E', 'Sum of values per day', (365,)),
 ('NE', 'count of values per day', (365,)),
 ('Mean', 'E/NE', (365,)),
 ('O', 'Sum of outer products', (365, 365)),
 ('NO', 'counts for outer products', (365, 365)),
 ('Cov', 'O/NO', (365, 365)),
 ('Var', 'The variance per day = diagonal of Cov', (365,)),
 ('eigval', 'PCA eigen-values', (365,)),
 ('eigvec', 'PCA eigen-vectors', (365, 365))
]




Writing lib/tmp


In [21]:
%%time 
### This is the main cell, where all of the statistics are computed.
STAT=computeStatistics(sqlContext,df)

TMAX : shape of mdf is  13437
time for TMAX is 119.1714699268341
SNOW : shape of mdf is  15629
time for SNOW is 138.20228505134583
SNWD : shape of mdf is  14617
time for SNWD is 124.44392347335815
TMIN : shape of mdf is  13442
time for TMIN is 117.39819121360779
PRCP : shape of mdf is  16118
time for PRCP is 134.4644582271576
TOBS : shape of mdf is  10956
time for TOBS is 90.82817363739014
CPU times: user 625 ms, sys: 512 ms, total: 1.14 s
Wall time: 12min 4s


In [22]:
print("   Name  \t                 Description             \t  Size")
print("-"*80)
print('\n'.join(["%10s\t%40s\t%s"%(s[0],s[1],str(s[2])) for s in STAT_Descriptions]))

   Name  	                 Description             	  Size
--------------------------------------------------------------------------------
SortedVals	                        Sample of values	vector whose length varies between measurements
     UnDef	      sample of number of undefs per row	vector whose length varies between measurements
      mean	                              mean value	()
       std	                                     std	()
    low100	                               bottom 1%	()
   high100	                                  top 1%	()
   low1000	                             bottom 0.1%	()
  high1000	                                top 0.1%	()
         E	                   Sum of values per day	(365,)
        NE	                 count of values per day	(365,)
      Mean	                                    E/NE	(365,)
         O	                   Sum of outer products	(365, 365)
        NO	               counts for outer products	(365, 365)
       Cov	                

In [23]:
## Dump STAT and STST_Descriptions into a pickle file.
from pickle import dump

filename=data_dir+'/STAT_%s.pickle'%state
dump((STAT,STAT_Descriptions),open(filename,'wb'))
!ls -l $data_dir

total 89824
drwxrwxr-x 2  498  500     4096 Apr 19  2018 NY.parquet
-rw-r--r-- 1 root root 66288146 Mar 26 09:27 NY.tgz
-rw-r--r-- 1 root root 25684418 Mar 26 09:40 STAT_NY.pickle
