In [2]:
from pathlib import Path

import tensorflow.keras as keras
import tensorflow as tf
# from petastorm.tf_utils import tf_tensors
# from petastorm import make_reader, make_batch_reader

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from petastorm.spark import SparkDatasetConverter, make_spark_converter

In [4]:
# Reuse the active Spark session if one exists, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

In [5]:
# Parquet files carry their own schema, so the CSV-only "header" and
# "inferSchema" reader options are dropped: they had no effect on this read.
df = spark.read.parquet("./data/dataTraining.parquet")

In [6]:
# Print the column names, types and nullability of the loaded frame.
df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Light: double (nullable = true)
 |-- CO2: double (nullable = true)
 |-- HumidityRatio: double (nullable = true)
 |-- Occupancy: integer (nullable = true)



In [None]:
# Convert the "date" column to a Spark timestamp, then preview the first rows.
df = df.withColumn("date", F.to_timestamp("date"))

df.show(n=5)

In [None]:
# Persist the frame to its own location. Writing back to
# "./data/dataTraining.parquet" fails under the default "errorifexists" save
# mode because that path already exists (it is the file `df` was read from),
# and overwriting a path that is still being lazily read is unsafe.
df.write.mode("overwrite").parquet("./data/dataTraining_ts.parquet")

In [None]:
# Petastorm needs a fully-qualified cache directory URL. 'hdfs://./data/temp'
# is malformed (the '.' lands in the host position of the URL), and the data
# in this notebook lives on the local filesystem, so the cache is pointed at
# the local ./data/temp directory through an absolute file:// URL instead.
spark.conf.set(
    SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF,
    Path("./data/temp").resolve().as_uri(),
)

In [6]:
from pyspark.sql.window import Window

In [7]:
# Window over the whole frame ordered from the latest "date" to the earliest.
# No partitionBy is given, so every row belongs to the same window.
# NOTE(review): an unpartitioned window presumably pulls all rows into a single
# partition -- acceptable for this small dataset, confirm before scaling up.
windowSpec = Window.orderBy(F.col("date").desc())

In [8]:
# Show each reading next to the mean of Temperature accumulated over
# windowSpec (ordered latest-first), listed in ascending date order.
(
    df
    .select("date", "Temperature")
    .withColumn("mean_temp", F.mean("Temperature").over(windowSpec))
    .orderBy("date")
    .show()
)

+-------------------+-----------+------------------+
|               date|Temperature|         mean_temp|
+-------------------+-----------+------------------+
|2015-02-04 17:51:00|      23.18| 20.61908364034639|
|2015-02-04 17:51:59|      23.15|20.618769108737492|
|2015-02-04 17:53:00|      23.15| 20.61845818490857|
|2015-02-04 17:54:00|      23.15|20.618147184685586|
|2015-02-04 17:55:00|       23.1|20.617836108040382|
|2015-02-04 17:55:59|       23.1|20.617531098960516|
|2015-02-04 17:57:00|       23.1|20.617226014912212|
|2015-02-04 17:57:59|       23.1| 20.61692085586783|
|2015-02-04 17:58:59|       23.1|20.616615621799713|
|2015-02-04 18:00:00|     23.075|20.616310312680188|
|2015-02-04 18:01:00|     23.075|20.616008002378045|
|2015-02-04 18:02:00|       23.1|20.615705617725116|
|2015-02-04 18:03:00|       23.1|  20.6154000840414|
|2015-02-04 18:04:00|      23.05|20.615094475195647|
|2015-02-04 18:04:59|       23.0| 20.61479494197818|
|2015-02-04 18:06:00|       23.0|20.6145014866

In [9]:
# windowSpec is ordered latest-first, so a lag of 1 picks the reading that
# comes next in time; the most recent row has none and shows null.
(
    df
    .select("date", "Temperature")
    .withColumn("nextTemp", F.lag("Temperature", 1).over(windowSpec))
    .show()
)

+-------------------+----------------+----------------+
|               date|     Temperature|        nextTemp|
+-------------------+----------------+----------------+
|2015-02-10 09:33:00|            21.1|            null|
|2015-02-10 09:32:00|            21.1|            21.1|
|2015-02-10 09:30:59|            21.1|            21.1|
|2015-02-10 09:29:59|           21.05|            21.1|
|2015-02-10 09:29:00|           21.05|           21.05|
|2015-02-10 09:28:00|           21.05|           21.05|
|2015-02-10 09:27:00|            21.0|           21.05|
|2015-02-10 09:26:00|          21.025|            21.0|
|2015-02-10 09:24:59|            21.0|          21.025|
|2015-02-10 09:23:59|            21.0|            21.0|
|2015-02-10 09:23:00|            21.0|            21.0|
|2015-02-10 09:22:00|            21.0|            21.0|
|2015-02-10 09:21:00|            21.0|            21.0|
|2015-02-10 09:20:00|        20.95875|            21.0|
|2015-02-10 09:19:00|         20.9175|        20

In [7]:
def prep_data_spark(spark, 
                    window_size, 
                    sampling_factor, 
                    sample_strategy, 
                    dataset_path=None,
                    id_col='id',
                    time_col='time',
                    value_col='value',
                    label_col='label'):
    """
    Build fixed-size lag windows over a time series held in Spark.

    Rows are ordered from the latest to the earliest ``time_col`` (per
    ``id_col`` when that column exists). Each output row keeps its own value
    and gains ``window_size`` lag columns holding the values of the rows that
    precede it in that ordering. The label of each row is replaced by the
    aggregate of the labels over exactly those same rows plus the row itself.

    Parameters
    ----------
    spark: SparkSession or pyspark.sql.DataFrame. When a session is given,
        ``dataset_path`` must also be given and is read as parquet.
    window_size: int >= 1. Number of lag columns to add.
    sampling_factor: currently unused; kept for interface compatibility.
    sample_strategy: label aggregation over the window. One of 'mean' or
        'boolean' ('boolean' takes the maximum label in the window).
    dataset_path: str, Default None. Path to a parquet file; only used when
        ``spark`` is a SparkSession.
    id_col: str. Optional series identifier; windows are partitioned by it
        when the column is present in the frame.
    time_col: str. Column to order by; it is cast to timestamp.
    value_col: str. Column the lag columns are derived from.
    label_col: str. Column holding the labels to aggregate.

    Returns
    -------
    windows: pyspark.sql.DataFrame with the input columns (``label_col``
        replaced by its windowed aggregate) plus ``{value_col}_1`` ...
        ``{value_col}_{window_size}``. Rows whose window is incomplete are
        dropped.

    Raises
    ------
    TypeError: ``spark`` is neither a DataFrame nor a SparkSession with a
        ``dataset_path``.
    ValueError: ``window_size`` is smaller than 1.
    KeyError: ``sample_strategy`` is not a known strategy.
    """
    from pyspark.sql import SparkSession
    from pyspark.sql.dataframe import DataFrame
    from pyspark.sql.window import Window
    from pyspark.sql import functions as F

    agg_strategies = {'mean': F.mean, 'boolean': F.max}

    # Fail fast on arguments that could only produce an obscure Spark error later.
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1, got {window_size!r}")
    if sample_strategy not in agg_strategies:
        raise KeyError(
            f"Unknown sample_strategy {sample_strategy!r}; expected one of {sorted(agg_strategies)}"
        )

    if isinstance(spark, DataFrame):
        df = spark
    elif isinstance(spark, SparkSession) and dataset_path is not None:
        df = spark.read.parquet(dataset_path)
    else:
        raise TypeError("Expected spark context + filepath or spark.sql.DataFrame")

    # cast timestamp column to timestamp type
    df = df.withColumn(time_col, df[time_col].cast('timestamp'))

    # Order latest-first, one window per series when an id column is present.
    latest_first = F.col(time_col).desc()
    if id_col in df.columns:
        windowSpec = Window.partitionBy(id_col).orderBy(latest_first)
    else:
        windowSpec = Window.orderBy(latest_first)
    # Same ordering, restricted to the current row and the window_size rows
    # before it, i.e. exactly the rows the lag columns draw from.
    windowSpecLabels = windowSpec.rowsBetween(-window_size, 0)

    windowed_df = df
    for i in range(1, window_size + 1):
        windowed_df = windowed_df.withColumn(f"{value_col}_{i}", F.lag(value_col, i).over(windowSpec))

    # Aggregate the labels BEFORE dropping incomplete windows. Aggregating
    # after the filter would evaluate the label frame on the already-filtered
    # rows, so the first surviving rows of each series would get a label
    # computed from fewer rows than their feature window covers.
    agg_labels = agg_strategies[sample_strategy](label_col).over(windowSpecLabels)
    windowed_df = windowed_df.withColumn(label_col, agg_labels)

    # remove rows that can't fill a full window
    return windowed_df.where(F.col(f"{value_col}_{window_size}").isNotNull())

In [8]:
# Window the temperature series: each row gets 3 lagged temperatures and the
# mean occupancy over the same window.
prepped = prep_data_spark(
    df.select("date", "Temperature", "Occupancy"),
    window_size=3,
    sampling_factor=1,
    sample_strategy="mean",
    time_col="date",
    value_col="Temperature",
    label_col="Occupancy",
)
prepped

DataFrame[date: timestamp, Temperature: double, Occupancy: double, Temperature_1: double, Temperature_2: double, Temperature_3: double]

### Cerebro Demo

In [9]:
from cerebro.backend import SparkBackend
from cerebro.keras import SparkEstimator

# Data storage for intermediate data and model artifacts.
from cerebro.storage import LocalStore, HDFSStore

# Model selection/AutoML methods.
from cerebro.tune import GridSearch, RandomSearch, TPESearch

# Utility functions for specifying the search space.
from cerebro.tune import hp_choice, hp_uniform, hp_quniform, hp_loguniform, hp_qloguniform

In [10]:
# Cerebro backend running on the existing Spark context with two workers.
backend = SparkBackend(spark_context=spark.sparkContext, num_workers=2)
# Resolve the store location from the notebook's working directory (the same
# ./data directory the parquet file is read from) instead of hardcoding an
# absolute path under one user's home directory. The trailing slash of the
# original prefix is kept.
store = LocalStore(prefix_path=f"{Path('./data').resolve()}/")

CEREBRO => Time: 2021-11-26 21:17:37, Running 2 Workers


In [11]:
# Drop the timestamp (not a model feature) and split 80/20. The seed is fixed
# so the split can be reproduced across runs instead of changing every time.
train_df, test_df = prepped.drop("date").randomSplit([0.8, 0.2], seed=42)
# Two partitions: the same value passed as num_workers to the SparkBackend.
train_df = train_df.repartition(2)

In [12]:
class Model(SparkEstimator):
    """Debugging stub: a SparkEstimator whose ``_fit`` prints its input and aborts."""

    def _fit(self, dataset):
        """
        Print ``dataset`` and abort instead of training.

        Raises
        ------
        AssertionError: always.
        """
        print(dataset)
        # Explicit raise rather than a bare `assert False`: assert statements
        # are stripped under `python -O`, which would let this stub return
        # None silently, and the message says why execution stopped.
        raise AssertionError("Model._fit is a debugging stub and does not train")


def estimator_gen_fn(params):
    """
    Build the estimator for one hyperparameter configuration.

    The Keras model is a single dense unit followed by a sigmoid, taking
    4 input features.

    Parameters
    ----------
    params: dict with key 'lr', the SGD learning rate.

    Returns
    -------
    estimator: Model wrapping the Keras model, optimizer, loss and metrics.
    """
    model = tf.keras.models.Sequential()
    # Keras documents the input shape as a tuple (batch dimension excluded).
    model.add(tf.keras.layers.Input(shape=(4,), name='input_layer'))
    # The Input layer already fixes the input width, so `input_dim` is not
    # repeated on the Dense layer.
    model.add(tf.keras.layers.Dense(1))
    model.add(tf.keras.layers.Activation('sigmoid'))

    # `lr` is a deprecated alias; `learning_rate` is the supported keyword.
    optimizer = tf.keras.optimizers.SGD(learning_rate=params['lr'])
    loss = 'mse'

    estimator = Model(
        model=model,
        optimizer=optimizer,
        loss=loss,
        metrics=['acc'],
        batch_size=8)

    return estimator

In [13]:
# Hyperparameter grid: only the learning rate is searched.
search_space = {"lr": hp_choice([1e-2, 1e-3, 1e-4])}

In [14]:
# Print the schema of the windowed frame (lag columns included).
prepped.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Occupancy: double (nullable = true)
 |-- Temperature_1: double (nullable = true)
 |-- Temperature_2: double (nullable = true)
 |-- Temperature_3: double (nullable = true)



In [28]:
# Grid search over `search_space`, training each configuration for 10 epochs
# and ranking the configurations by loss.
model_selection = GridSearch(
    backend,
    store,
    estimator_gen_fn,
    search_space,
    num_epochs=10,
    evaluation_metric="loss",
    label_columns=["Occupancy"],
    feature_columns=[
        "Temperature",
        "Temperature_1",
        "Temperature_2",
        "Temperature_3",
    ],
    verbose=1,
)

In [16]:
# Print the schema of the training split (the "date" column has been dropped).
train_df.printSchema()

root
 |-- Temperature: double (nullable = true)
 |-- Occupancy: double (nullable = true)
 |-- Temperature_1: double (nullable = true)
 |-- Temperature_2: double (nullable = true)
 |-- Temperature_3: double (nullable = true)



In [29]:
# Run the model selection workload on the training split.
# NOTE(review): the saved output of this cell ends in
# "ConnectionRefusedError: [Errno 61] Connection refused" -- the run did not
# complete in the recorded session.
model = model_selection.fit(train_df)

CEREBRO => Time: 2021-11-23 14:30:11, Preparing Data
CEREBRO => Time: 2021-11-23 14:30:11, Num Partitions: 2
CEREBRO => Time: 2021-11-23 14:30:11, Writing DataFrames
CEREBRO => Time: 2021-11-23 14:30:11, Train Data Path: file:///Users/arunavgupta/Documents/FA21/cse234/data/train_data
CEREBRO => Time: 2021-11-23 14:30:11, Val Data Path: file:///Users/arunavgupta/Documents/FA21/cse234/data/val_data
CEREBRO => Time: 2021-11-23 14:30:12, Train Partitions: 2
CEREBRO => Time: 2021-11-23 14:30:12, Val Partitions: 2
CEREBRO => Time: 2021-11-23 14:30:13, Train Rows: 4884
CEREBRO => Time: 2021-11-23 14:30:13, Val Rows: 1618
CEREBRO => Time: 2021-11-23 14:30:13, Initializing Workers
CEREBRO => Time: 2021-11-23 14:30:13, Initializing Data Loaders
CEREBRO => Time: 2021-11-23 14:30:14, Launching Model Selection Workload
CEREBRO => Time: 2021-11-23 14:30:14, Model: model_9_1637706614, lr: 0.01
CEREBRO => Time: 2021-11-23 14:30:14, Model: model_10_1637706614, lr: 0.001
CEREBRO => Time: 2021-11-23 14:3

ConnectionRefusedError: [Errno 61] Connection refused

In [None]:
# List the lines of `conda list` that mention "spark", to check installed versions.
!conda list | grep spark