In [1]:
import numpy as np

In [2]:
arr = np.array([1, np.nan, 2, np.nan, 3, 4, 5])
arr

array([ 1., nan,  2., nan,  3.,  4.,  5.])

In [3]:
np.mean(arr)

nan

In [4]:
np.mean(np.nan_to_num(arr))

2.142857142857143

In [5]:
np.nanmean(arr)

3.0

In [6]:
np.outer(arr, arr)

array([[ 1., nan,  2., nan,  3.,  4.,  5.],
       [nan, nan, nan, nan, nan, nan, nan],
       [ 2., nan,  4., nan,  6.,  8., 10.],
       [nan, nan, nan, nan, nan, nan, nan],
       [ 3., nan,  6., nan,  9., 12., 15.],
       [ 4., nan,  8., nan, 12., 16., 20.],
       [ 5., nan, 10., nan, 15., 20., 25.]])

In [7]:
from pyspark import SparkContext
sc = SparkContext(master="local[2]")

In [8]:
from pyspark.sql import *
sqlContext = SQLContext(sc)

In [9]:
from numpy import linalg

In [10]:
ny_df = sqlContext.read.parquet("../Data/NY.parquet/")
print("Total count : ", ny_df.count())
ny_df.show(5)

Total count :  168398
+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|    Station|Measurement|Year|              Values|       dist_coast|      latitude|         longitude|        elevation|state|             name|
+-----------+-----------+----+--------------------+-----------------+--------------+------------------+-----------------+-----+-----------------+
|USW00094704|   PRCP_s20|1945|[00 00 00 00 00 0...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1946|[99 46 52 46 0B 4...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1947|[79 4C 75 4C 8F 4...|361.8320007324219|42.57080078125|-77.71330261230469|208.8000030517578|   NY|DANSVILLE MUNI AP|
|USW00094704|   PRCP_s20|1948|[72 48 7A 48 85 4...|361.8320007324219|42.57080078125|-77.71330261230469

In [11]:
sqlContext.registerDataFrameAsTable(ny_df, "new_york_weather")

In [12]:
query = """
SELECT measurement, COUNT(measurement) AS count
FROM new_york_weather
GROUP BY measurement
ORDER BY count
"""

counts = sqlContext.sql(query)
counts.show()

+-----------+-----+
|measurement|count|
+-----------+-----+
|   TOBS_s20|10956|
|       TOBS|10956|
|       TMAX|13437|
|   TMAX_s20|13437|
|       TMIN|13442|
|   TMIN_s20|13442|
|   SNWD_s20|14617|
|       SNWD|14617|
|       SNOW|15629|
|   SNOW_s20|15629|
|   PRCP_s20|16118|
|       PRCP|16118|
+-----------+-----+



In [13]:
# principle component analysis - pca

def outer_product(x):
    """compute outer product and indicate which locations in the matrix are undefined"""
    outer_res = np.outer(x, x)
    nan_count = 1 - np.isnan(outer_res)
    return (outer_res, nan_count)

def sum_with_nan(m1, m2):
    """add two pairs and return (matrix, count)"""
    (x1, n1) = m1
    (x2, n2) = m2
    n = n1 + n2
    x = np.nansum(np.dstack((x1, x2)), axis=2)
    return (x, n)

def data_func(sum_of_matrix, non_nan_num):
    """function to return sample data"""
    # E -> sum of vectors
    E = np.ones([365])
    # NE -> number of not-nan entries for each coordinate of the vectors
    NE = np.ones([365])
    # MEAN -> mean of the vectors ignoring nans
    MEAN = np.ones([365])
    # O -> sum of the outer products
    O = np.ones([365, 365])
    # NO -> number of non-nans in the outer product
    NO = np.ones([365, 365])
    return E, NE, MEAN, O, NO

In [14]:
def compute_cov(RDD):
    """input -> input RDD of numpy arrays (all same length)
       computes -> covariance matrix of the vectors"""
    # insert 1 at the beginning of each vector so that the calculation also yields the mean vector
    rdd = RDD.map(lambda v: np.array(np.insert(v, 0, 1), dtype=np.float64))
    
    outer_rdd = rdd.map(outer_product)
    (S, N) = outer_rdd.reduce(sum_with_nan)
    
    E, NE, MEAN, O, NO = data_func(S, N)
    
    cov_matrix = O / NO - np.outer(MEAN, MEAN)
    var = np.array([cov_matrix[i, i] for i in range(cov_matrix.shape[0])])
    return {
        "E": E,
        "NE": NE,
        "O": O,
        "NO": NO,
        "COV": cov_matrix,
        "MEAN": MEAN,
        "VAR": var
    }

In [15]:
v = 2 * (np.random.random([2, 10]) - 0.5)
# print("vector : ", v)

data_list = []
for i in range(1000):
    f = 2 * (np.random.random(2) - 0.5)
    data_list.append(np.dot(f, v))

# compute covariance matrix
rdd = sc.parallelize(data_list)
out = compute_cov(rdd)

# find PCA decomposition
eig_val, eig_vec = linalg.eig(out["COV"])
print("eigen value = \n", eig_val)
print("eigen vector = \n", eig_vec)

eigen value = 
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0

In [16]:
from time import time

In [17]:
def pack_array(arr):
    """pack a numpy array into a byte-array"""
    if type(arr) != np.ndarray:
        raise Exception("input for pack_array should be numpy.ndarray, instead got {}.".format(str(type(arr))))
    return bytearray(arr.tobytes())

In [18]:
def unpack_array(arr):
    """unpack a byte-array to numpy.ndarray"""
    return np.frombuffer(arr, dtype=np.float16)

In [19]:
def find_percentiles(sorted_values, percentile):
    L = int(len(sorted_values) / percentile)
    return sorted_values[L], sorted_values[-L]

In [39]:
def compute_overall_distribution(RDD):
    """compute the overall distribution of values and distribution of number of nan per year"""
    undef_temp = RDD.map(lambda row: sum(np.isnan(row))).sample(False, 0.01).collect()
    undefined = np.array(undef_temp)
    flat = RDD.flatMap(lambda v: list(v)).filter(lambda x: not np.isnan(x)).cache()
    count, s1, s2 = flat.map(lambda x: np.float64([1, x, x**2])).reduce(lambda x, y: x + y)
    mean = s1 / count
    std = np.sqrt(s2 / count - mean**2)
    vals = flat.sample(False, 0.0001).collect()
    sorted_values = np.array(sorted(vals))
    low_100, high_100 = find_percentiles(sorted_values, 100)
    low_1000, high_1000 = find_percentiles(sorted_values, 1000)
    return {
        "undefined": undefined,
        "mean": mean,
        "std": std,
        "sorted_values": sorted_values,
        "low_100": low_100,
        "high_100": high_100,
        "low_1000": low_1000,
        "high_1000": high_1000
    }

In [40]:
def compute_statistics(sqlContext, df):
    """compute all statistics for the data_frame
       input -> sqlContext : perform SQL queries
                data_frame : consists of below fields
                    - Station (string)
                    - Measurement (string)
                    - Year (integer)
                    - Values (byteArray with 365 float16 numbers)
       returns -> STAT : dictionary of dictionaries"""
    
    sqlContext.registerDataFrameAsTable(df, "weather")
    STAT = {}
    measurements = ['TMAX', 'SNOW', 'SNWD', 'TMIN', 'PRCP', 'TOBS']
    
    for measure in measurements:
        t = time()
        query = "SELECT * FROM weather WHERE measurement='%s'" % (measure)
        measure_df = sqlContext.sql(query)
        print("{} : shape -> {}".format(measure, measure_df.count()))
        # unpack column -> values to float16
        data = measure_df.rdd.map(lambda row: unpack_array(row["Values"]))
        # compute basic statistics
        STAT[measure] = compute_overall_distribution(data)
        # compute covariance matrix
        out = compute_cov(data)
        # compute pca decomposition
        eig_val, eig_vec = linalg.eig(out["COV"])
        
        # collect all statistics for measure
        STAT[measure]["eig_val"] = eig_val
        STAT[measure]["eig_vec"] = eig_vec
        STAT[measure].update(out)
        
        print("time for {} is {}".format(measure, time() - t))
    
    return STAT

In [41]:
%%time

STAT = compute_statistics(sqlContext, ny_df)

TMAX : shape -> 13437
time for TMAX is 152.0520203113556
SNOW : shape -> 15629
time for SNOW is 169.7507836818695
SNWD : shape -> 14617
time for SNWD is 152.31755566596985
TMIN : shape -> 13442
time for TMIN is 153.168390750885
PRCP : shape -> 16118
time for PRCP is 173.7279713153839
TOBS : shape -> 10956
time for TOBS is 117.41092300415039
Wall time: 15min 18s


In [43]:
import pickle

file_name = "STAT_NY.pickle"
pickle.dump(STAT, open(file_name, "wb"))