In [1]:
import os
# Record where the local Spark installation lives; this is set before
# findspark is initialised in the next cell.
os.environ.update({'SPARK_HOME': '/opt/spark'})

In [2]:
import findspark
# Initialise findspark before the `pyspark` imports in the next cell.
# NOTE(review): presumably this is what makes the Spark installation under
# SPARK_HOME (set in the first cell) importable -- confirm against findspark docs.
findspark.init()

import glob
import pandas as pd
import numpy as np
import datetime


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, DoubleType, StringType, ArrayType, FloatType
from pyspark.sql import functions as F
from pyspark.sql.functions import mean, array

In [4]:
# Session configuration: application name and the master URL.
app_name = "whatever"
master = "local[*]"

# Reuse an existing session if there is one, otherwise create it with the
# settings above.
spark = (
    SparkSession.builder
    .appName(app_name)
    .master(master)
    .getOrCreate()
)

# Handle to the underlying SparkContext of the session.
sc = spark.sparkContext



In [5]:
# Static per-sensor attributes, keyed by sensor id.
sensor_locs = pd.read_csv("sensor_locs_big_box.csv")
sensor_locs = sensor_locs.set_index("sensor_id")

In [6]:
# Preview the first five sensors and the columns available for lookup.
sensor_locs.head(n=5)

Unnamed: 0_level_0,lat,lon,xy_,x,y,ndvi,elevation
sensor_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
14091,37.88362,-122.070087,"(136, 170)",136,170,5099,106.826744
8988,38.0285,-122.0302,"(145, 211)",145,211,2763,8.664356
12811,37.40637,-122.062429,"(138, 35)",138,35,2755,12.5958
4770,37.787307,-122.417252,"(59, 142)",59,142,931,40.919197
10092,37.857566,-121.97286,"(158, 162)",158,162,4230,195.418274


In [7]:
# Nested dict {sensor_id: {column_name: value}}: transposing makes each sensor
# a column, and to_dict() then maps every column to its {row label: value} dict.
id_lookup = sensor_locs.transpose().to_dict()

In [8]:
# Preview of the input files: names are sorted in descending order and the
# slice keeps the LAST 12 entries of that descending list, i.e. the 12
# lexicographically smallest names, listed largest-first.
sorted(glob.glob("*bigger_3.parquet"), reverse=True)[-12:]

['201908_bigger_3.parquet',
 '201907_bigger_3.parquet',
 '201906_bigger_3.parquet',
 '201905_bigger_3.parquet',
 '201904_bigger_3.parquet',
 '201903_bigger_3.parquet',
 '201902_bigger_3.parquet',
 '201901_bigger_3.parquet',
 '201812_bigger_3.parquet',
 '201811_bigger_3.parquet',
 '201810_bigger_3.parquet',
 '201809_bigger_3.parquet']

#### Old code, ported to Spark

In [9]:

#spark.udf.register("getNeighbors", get_neighbors_space_time, ArrayType(DoubleType()))
def _lookup_sensor_attribute(sensor_id, field):
    """Return attribute `field` of sensor `sensor_id` from `id_lookup`.

    Raises KeyError when the sensor id or the field is missing from
    `id_lookup`. The parameter is called `field` (not `col`) so it does not
    shadow the `col` function imported from pyspark.sql.functions.
    """
    return id_lookup[sensor_id][field]


def _created_at_to_epoch(created_at):
    """Parse a "%Y/%m/%dT%H:%M" string and return whole epoch seconds.

    The parsed datetime is naive, so `.timestamp()` interprets it in the
    local timezone of the Python process that evaluates the UDF.
    """
    parsed = datetime.datetime.strptime(created_at, "%Y/%m/%dT%H:%M")
    return int(parsed.timestamp())


def _join_with_underscores(arr):
    """Join the string form of every element of `arr` with underscores."""
    return "_".join(str(n) for n in arr)


def _count_positive(ns):
    """Count the elements of `ns` that are strictly greater than zero."""
    # `.sum()` yields a numpy integer; convert it to a plain Python int because
    # Spark's Python UDF result conversion expects built-in Python values.
    return int((np.array(ns) > 0).sum())


# Same lookup, two return types: integer attributes vs. floating-point ones.
lookup_xy = udf(_lookup_sensor_attribute, IntegerType())
lookup_other = udf(_lookup_sensor_attribute, DoubleType())
# Timestamp string -> integer epoch seconds.
ts = udf(_created_at_to_epoch, IntegerType())
# Array column -> "a_b_c" style string id.
ts_id = udf(_join_with_underscores, StringType())
# Neighbour vector -> number of strictly positive entries.
n_neighbors = udf(_count_positive, IntegerType())


In [12]:
%%time

finished_files = []

def full_process(n, filename):
    df = spark.read.parquet(filename)

    df = df.withColumn('x', lookup_xy(df.sensor_id.cast(IntegerType()), F.lit('x'))) \
        .withColumn('y', lookup_xy(df.sensor_id.cast(IntegerType()), F.lit('y'))) \
        .withColumn('ndvi', lookup_xy(df.sensor_id.cast(IntegerType()), F.lit('ndvi'))) \
        .withColumn('elevation', lookup_other(df.sensor_id.cast(IntegerType()), F.lit('elevation'))) \
        .withColumn('ts_', ts(df.created_at))

    averages = df.groupBy("ts_").mean()
    df_with_avgs = df.join(averages, on="ts_")
    imputed_df = df_with_avgs.withColumn("imputed_hum", F.coalesce("humidity", "avg(humidity)")) \
        .withColumn("imputed_temperature", F.coalesce("temperature", "avg(temperature)")) \
        .withColumn("imputed_epa_pm25_value", F.coalesce("epa_pm25_value", "avg(epa_pm25_value)")) \
        .withColumn("time_space_id", ts_id(array('ts_', 'x', 'y')))

    ts_averages = imputed_df.groupBy(['time_space_id']) \
                    .agg({'2_5um':'mean', 'sensor_id':'count'})

    ts_average_dict = ts_averages.toPandas().set_index('time_space_id').T.to_dict()
    
    def get_neighbors_space_time(ts_, x, y, pm):
        """
        Inputs: single observation, a training dataframe, and a time delta
        Outputs: vector of length 25 corresponding to surrounding neighbor observations
        
        In case you are wondering, I have to redfine this every loop because of the way
        the ts_avg_dict is broadcast
        
        """
        ts_ = int(ts_)
        x = int(x)
        y = int(y)


        neighbors = np.zeros((25))

        c = 0
        for i in range(-2,3):
            for j in range(-2,3):
                ts_id_ = f"{ts_}_{x+i}_{y+j}"
                if i == 0 and j == 0:
                    if ts_average_dict[ts_id_]['count(sensor_id)'] > 1:
                        n_s = ts_average_dict[ts_id_]['count(sensor_id)']
                        avg = ts_average_dict[ts_id_]['avg(2_5um)']
                        # remove the sensor itself from consideration
                        neighbors[c] = ((n_s*avg) - pm)/(n_s-1)

                else:

                    if ts_id_ in ts_average_dict:
                        neighbors[c] = ts_average_dict[ts_id_]['avg(2_5um)']       
                c += 1

        return neighbors.tolist()
    get_neighbors = udf(lambda arr: get_neighbors_space_time(*arr), ArrayType(DoubleType()))    
    
    df_w_neighbors = imputed_df \
        .withColumn('neighbors', get_neighbors(array('ts_', 'x', 'y', '2_5um')))


    
    cols_to_save = ['2_5um', 'imputed_epa_pm25_value', 'imputed_hum', 
                    'imputed_temperature', 'wind_x', 'wind_y', 'ndvi',
                    'elevation', 'neighbors'] # + neighbor_cols

    if n == 0:
        df_w_neighbors \
            .select(cols_to_save) \
            .write.parquet('big_processed_4.parquet')
        finished_files.append(filename)
    else:
        df_w_neighbors \
            .select(cols_to_save) \
            .write.mode("append").parquet('big_processed_4.parquet')
        finished_files.append(filename)
    print(filename, "done")
    
    


CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 5.72 µs


In [13]:
%%time

# Same selection as the preview cell: descending name order, last 12 entries.
monthly_files = sorted(glob.glob("*bigger_3.parquet"), reverse=True)[-12:]

# The position in the list tells full_process whether to create the output
# (position 0) or append to it.
for n, filename in enumerate(monthly_files):
    full_process(n, filename)

201908_bigger_3.parquet done
201907_bigger_3.parquet done
201906_bigger_3.parquet done
201905_bigger_3.parquet done
201904_bigger_3.parquet done
201903_bigger_3.parquet done
201902_bigger_3.parquet done
201901_bigger_3.parquet done
201812_bigger_3.parquet done
201811_bigger_3.parquet done
201810_bigger_3.parquet done
201809_bigger_3.parquet done
CPU times: user 11min 34s, sys: 9.72 s, total: 11min 43s
Wall time: 21min 11s


In [7]:
# Load the combined dataset written by the processing loop.
df = spark.read.format("parquet").load('big_processed_4.parquet')

In [8]:
# Show the column names and types of the processed dataset.
df.printSchema()

root
 |-- 2_5um: double (nullable = true)
 |-- imputed_epa_pm25_value: double (nullable = true)
 |-- imputed_hum: double (nullable = true)
 |-- imputed_temperature: double (nullable = true)
 |-- wind_x: double (nullable = true)
 |-- wind_y: double (nullable = true)
 |-- ndvi: integer (nullable = true)
 |-- elevation: double (nullable = true)
 |-- neighbors: array (nullable = true)
 |    |-- element: double (containsNull = true)

