In [None]:
!pip install elephas -q

In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import window, avg, count
from pyspark.sql import types as T
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# from pyspark.mllib.linalg import Matrix, Matrices, Vectors, SparseVector, DenseVector, VectorUDT
from pyspark.ml.linalg import Matrix, Matrices, Vectors, SparseVector, DenseVector, VectorUDT
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline, Transformer, PipelineModel
from pyspark.ml.regression import LinearRegression

from elephas.ml_model import ElephasEstimator, ElephasTransformer
from elephas.spark_model import SparkMLlibModel

import numpy as np

from keras.models import Sequential
from keras.models import model_from_yaml
from keras.layers import Dense
from keras import backend as K
from keras import optimizers

Using TensorFlow backend.




#### Create the Spark session

In [2]:
# Connector artifacts (MongoDB and Kafka) that Spark has to resolve for this session
spark_packages = 'org.mongodb.spark:mongo-spark-connector_2.11:2.4.1,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4'

# Configure the builder step by step and create (or reuse) the session
session_builder = SparkSession.builder.appName("Streaming")
session_builder = session_builder.config('spark.jars.packages', spark_packages)
spark = session_builder.getOrCreate()

#### Read our processed time windows from MongoDB

In [3]:
# Batch reader for the MongoDB collection that holds the processed windows
mongo_reader = (spark.read
                .format("mongo")
                .option("spark.mongodb.input.uri", "mongodb://165.22.199.122/processed.internal"))

# Load the collection, discard the document id and sort on the end of each window
df = mongo_reader.load().drop('_id').orderBy('window.end')

df.show()

+--------+-----------------+-------------------+-------------------+--------------------+
|n_tweets|            price|          sentiment|          timestamp|              window|
+--------+-----------------+-------------------+-------------------+--------------------+
|      24|9186.630000000001|0.17804166666666665|2019-11-03 14:44:00|[2019-11-03 14:34...|
|      73|9183.607500000002|0.24112465753424658|2019-11-03 14:46:00|[2019-11-03 14:36...|
|     126|9182.878333333334|0.25534523809523824|2019-11-03 14:48:00|[2019-11-03 14:38...|
|     184|         9181.015| 0.2538385869565217|2019-11-03 14:50:00|[2019-11-03 14:40...|
|     257|         9184.389|0.21677548638132285|2019-11-03 14:52:00|[2019-11-03 14:42...|
|     311|         9186.677|0.21157234726688087|2019-11-03 14:54:00|[2019-11-03 14:44...|
|     312|         9189.179|0.19037852564102548|2019-11-03 14:56:00|[2019-11-03 14:46...|
|     316|           9192.1|0.17361044303797465|2019-11-03 14:58:00|[2019-11-03 14:48...|
|     313|

#### Create a transformer to calculate the price difference and generate the y labels

In [42]:
class LabelTransformer(Transformer):
    """
    Custom transformer that adds the price difference and the label that needs to be predicted.

    For a batch DataFrame it adds:
      * ``price_diff``: the price minus the price of the previous row (ordered by ``timestamp``)
      * ``label``: the price two rows ahead (ordered by ``timestamp``)
    and it drops the ``window`` column and every row that contains a null value.

    For a streaming DataFrame only a constant ``label`` column with the value 0 is added.
    """

    def __init__(self):
        super(LabelTransformer, self).__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Add the label (and, for batch data, the price difference) to the DataFrame.

        :param df: DataFrame with at least the columns ``timestamp`` and ``price``
        :return: the DataFrame with the extra columns described on the class
        """

        # The label cannot be calculated on a stream, so only add a placeholder column
        if df.isStreaming:
            return df.withColumn('label', F.lit(0))

        # Window over the complete DataFrame (no partitioning), ordered in time
        time_window = Window.partitionBy().orderBy('timestamp')

        # Create a price lag of 1 row
        df = df.withColumn('prev_price', F.lag(df.price).over(time_window))

        # Calculate the price difference with the previous row
        df = df.withColumn('price_diff', df.price - df.prev_price)

        # Y label: a negative lag looks ahead, so this is the price 2 rows later
        df = df.withColumn('label', F.lag(df.price, -2).over(time_window))

        # Drop the helper column and the window column
        df = df.drop('prev_price', 'window')

        # Drop all rows with a null value: the first row has no previous price
        # and the last two rows have no label
        df = df.na.drop()

        return df

In [51]:
# Add the price difference and the label to the batch data and preview the first rows
label_transformer = LabelTransformer()
df_label = label_transformer.transform(df)
df_label.show(5)

+--------+-----------------+-------------------+-------------------+-------------------+--------+
|n_tweets|            price|          sentiment|          timestamp|         price_diff|   label|
+--------+-----------------+-------------------+-------------------+-------------------+--------+
|      73|9183.607500000002|0.24112465753424658|2019-11-03 14:46:00| -3.022499999999127|9181.015|
|     126|9182.878333333334|0.25534523809523824|2019-11-03 14:48:00|-0.7291666666678793|9184.389|
|     184|         9181.015| 0.2538385869565217|2019-11-03 14:50:00|-1.8633333333345945|9186.677|
|     257|         9184.389|0.21677548638132285|2019-11-03 14:52:00| 3.3739999999997963|9189.179|
|     311|         9186.677|0.21157234726688087|2019-11-03 14:54:00| 2.2880000000004657|  9192.1|
+--------+-----------------+-------------------+-------------------+-------------------+--------+
only showing top 5 rows



#### Create a transformer to bring all the features to one array

In [44]:
class TimeTransformer(Transformer):
    """
    A custom Transformer which transforms all values to timeseries. This is needed to input it into
    the neural network.

    Every sliding time window becomes one row with:
      * ``label``: the label of the chronologically last row in the window
      * ``features``: a dense vector with the prices, then the sentiments, then the tweet counts
        of the rows in the window, each group in chronological order

    NOTE(review): a second class with the same name is defined further down in this notebook;
    whichever definition is executed last is the one in effect.

    :param window_size: duration of a window, e.g. '6 minutes'
    :param slide_size: slide duration between two windows, e.g. '2 minutes'
    :param feature_length: required number of values in ``features``; windows that hold a
        different number of values are discarded
    """

    def __init__(self, window_size, slide_size, feature_length):
        super(TimeTransformer, self).__init__()
        self.window_size = window_size
        self.slide_size = slide_size
        self.feature_length = feature_length

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Turn the rows of `df` into one feature vector and label per time window.

        :param df: DataFrame with the columns timestamp, price, sentiment, n_tweets and label
        :return: DataFrame with the columns ``label`` and ``features``
        """

        # Skip rows with a missing feature value so no nulls end up in the feature vector
        df = df.na.drop(subset=['timestamp', 'price', 'sentiment', 'n_tweets'])

        # Collect the rows of a window as structs and sort them. The timestamp is the first field
        # of the struct, so the rows end up in chronological order. A plain collect_list (and
        # F.last) gives no guarantee on the order after the shuffle of the groupBy.
        rows = F.sort_array(
            F.collect_list(F.struct('timestamp', 'price', 'sentiment', 'n_tweets', 'label')))

        df_window = (df
            .groupBy(F.window(df.timestamp, self.window_size, self.slide_size))
            .agg(rows.alias('rows')))

        # Selecting a field of an array of structs gives an array with that field of every row.
        # The label is the one of the last (most recent) row, the features are all arrays combined.
        df_features = df_window.select(
            F.element_at(F.col('rows.label'), -1).alias('label'),
            F.concat(
                F.col('rows.price'),
                F.col('rows.sentiment'),
                F.col('rows.n_tweets')).alias('features'))

        # Make sure all the values are there
        df_features = df_features.where(F.size(F.col('features')) == self.feature_length)

        # Parse the features as vector instead of array (length need to be consistent)
        list_to_vector_udf = F.udf(lambda l: Vectors.dense(l), VectorUDT())

        return df_features.select(
            df_features['label'],
            list_to_vector_udf(df_features['features']).alias('features'))

In [52]:
# Windows of 6 minutes over rows that are 2 minutes apart hold 3 rows; with the
# 3 collected columns (price, sentiment, n_tweets) that gives 9 feature values
time_transformer = TimeTransformer(
    window_size='6 minutes', 
    slide_size='2 minutes', 
    feature_length=9)

# Build the feature vectors from the labelled batch data and preview them
df_features = time_transformer.transform(df_label)
df_features.show()

+-----------------+--------------------+
|            label|            features|
+-----------------+--------------------+
|9190.783000000001|[9193.535,9191.23...|
|         9177.135|[9181.14000000000...|
|9166.708999999999|[9178.72499999999...|
|          9205.12|[9188.31000000000...|
|9219.877000000002|[9225.366,9225.54...|
|9289.094000000001|[9276.361,9281.37...|
|9507.947999999999|[9485.876,9506.06...|
|         9338.444|[9349.09800000000...|
|9334.369999999999|[9323.552,9326.79...|
|         9334.596|[9334.103,9332.85...|
|          9363.09|[9361.298,9362.43...|
|9371.324999999999|[9385.72499999999...|
|         9353.047|[9351.784,9352.34...|
|9354.309000000001|[9352.392,9353.59...|
|         9181.469|[9169.53799999999...|
|9224.529999999999|[9223.59222222222...|
|9012.176000000001|[9009.57000000000...|
|         8773.946|[8788.408,8787.93...|
|          8835.76|[8824.52600000000...|
|         8811.962|[8815.41499999999...|
+-----------------+--------------------+
only showing top

In [56]:
# NOTE(review): in notebook order `lr_pipeline` is only created in the next cell, which also
# saves it. On a fresh kernel this cell raises a NameError — confirm that it can be removed.
lr_pipeline.write().overwrite().save('lr_pipeline')

In [None]:
# Linear regression with elastic net regularisation
lr_estimator = LinearRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8)

# Fit a pipeline that only holds the regression on the feature vectors
lr_stages = [lr_estimator]
lr_pipeline = Pipeline(stages=lr_stages).fit(df_features)

# Store the fitted pipeline, replacing an earlier version if there is one
lr_writer = lr_pipeline.write().overwrite()
lr_writer.save('lr_pipeline')

In [58]:
# Add the predictions of the fitted linear regression pipeline to the feature rows
df_transformed = lr_pipeline.transform(df_features)
df_transformed.show()

+-----------------+--------------------+-----------------+
|            label|            features|       prediction|
+-----------------+--------------------+-----------------+
|9190.783000000001|[9193.535,9191.23...|9190.584564115663|
|         9177.135|[9181.14000000000...| 9179.07620051795|
|9166.708999999999|[9178.72499999999...|9173.560828482081|
|          9205.12|[9188.31000000000...|9191.914361492025|
|9219.877000000002|[9225.366,9225.54...|9224.582859729486|
|9289.094000000001|[9276.361,9281.37...|9281.463804854786|
|9507.947999999999|[9485.876,9506.06...| 9497.13243990458|
|         9338.444|[9349.09800000000...| 9344.04252864767|
|9334.369999999999|[9323.552,9326.79...|9326.498846735196|
|         9334.596|[9334.103,9332.85...|9332.112661362595|
|          9363.09|[9361.298,9362.43...|9361.081552730007|
|9371.324999999999|[9385.72499999999...|9380.393790586293|
|         9353.047|[9351.784,9352.34...| 9352.19848674924|
|9354.309000000001|[9352.392,9353.59...| 9353.4244042184

In [59]:
# One (prediction, label) pair per row, kept in the cache
pred_rdd = df_transformed.rdd.map(lambda p: (p.prediction, p.label)).cache()

In [60]:
# Root mean squared error of the linear regression predictions
metrics = RegressionMetrics(pred_rdd)
metrics.rootMeanSquaredError

10.201432235528591

#### Elephas prediction

In [54]:
def root_mean_squared_error(y_true, y_pred):
    """
    Loss function: the square root of the mean of the squared differences
    between the predictions and the true values.
    """
    squared_error = K.square(y_pred - y_true)
    mean_squared_error = K.mean(squared_error)
    return K.sqrt(mean_squared_error)

def build_model(input_dim=36):
    """
    Build and compile the feed forward network that predicts the price.

    The network has two hidden relu layers (64 and 32 units) and a single linear output.
    It is compiled with the adam optimizer and the root mean squared error as loss.

    :param input_dim: number of values in one feature vector. The default of 36 keeps the
        original architecture; pass the feature length of the TimeTransformer to train on
        feature vectors of another size.
    :return: the compiled Keras model
    """
    model = Sequential()
    model.add(Dense(64, activation='relu', input_shape=(input_dim,)))
    model.add(Dense(32, activation='relu'))

    # Single output without activation: the predicted price
    model.add(Dense(1))
    model.compile(optimizer='adam', loss=root_mean_squared_error)

    return model

In [88]:
class ElephantEstimator(ElephasEstimator):
    """
    ElephasEstimator with a custom batch prediction that appends the prediction of the
    Keras model as a double column, named after the output column.

    NOTE(review): `_transform` is defined here on an Estimator subclass and reads
    `self.weights`, which this class never sets — confirm that the fitted model returned
    by `fit` actually uses this method.
    """

    def _transform(self, df):
        """
        Predict on all rows of `df` with the configured Keras model.

        :param df: DataFrame that holds the features column and the label column
        :return: DataFrame with the columns of `df` plus the prediction column;
            the prediction and the label are cast to double
        """
        features_col = self.getFeaturesCol()
        output_col = self.getOutputCol()
        label_col = self.getLabelCol()

        # Schema of the result: the input schema extended with the prediction column
        new_schema = T.StructType(
            df.schema.fields + [T.StructField(output_col, T.DoubleType(), True)])

        # zip needs the same partitioning on both RDDs, hence the single partition
        rdd = df.rdd.coalesce(1)

        # Note that we collect, since executing this on the rdd would require model serialization once again
        features = np.asarray(
            rdd.map(lambda row: row[features_col].toArray()).collect())

        # Rebuild the Keras model from its configuration and restore the weights
        model = model_from_yaml(self.get_keras_model_config())
        model.set_weights(self.weights.value)

        # One single-element tuple per prediction, so it can be appended to its row
        predictions = (rdd.ctx
            .parallelize(model.predict(features))
            .coalesce(1)
            .map(lambda pred: (float(pred),)))

        # TODO: Zipping like this is very likely wrong
        results_rdd = rdd.zip(predictions).map(lambda pair: pair[0] + pair[1])

        results_df = df.sql_ctx.createDataFrame(results_rdd, new_schema)
        results_df = results_df.withColumn(
            output_col, results_df[output_col].cast(T.DoubleType()))
        results_df = results_df.withColumn(
            label_col, results_df[label_col].cast(T.DoubleType()))

        return results_df

In [71]:
# Rebuild the network, restore the previously saved weights and print the architecture
keras_model = build_model()
keras_model.load_weights('models/keras_weights.hdf5')
keras_model.summary()

Model: "sequential_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_4 (Dense)              (None, 64)                2368      
_________________________________________________________________
dense_5 (Dense)              (None, 32)                2080      
_________________________________________________________________
dense_6 (Dense)              (None, 1)                 33        
Total params: 4,481
Trainable params: 4,481
Non-trainable params: 0
_________________________________________________________________


In [89]:
# Serialize the optimizer so its configuration can be handed to the estimator
adam = optimizers.Adam(lr=0.01)
opt_conf = optimizers.serialize(adam)

# Initialize SparkML Estimator and set all relevant properties
estimator = ElephantEstimator()
estimator.setFeaturesCol('features')
estimator.setLabelCol('label')
estimator.set_keras_model_config(keras_model.to_yaml())
# Regression: the label is a price, not a category
estimator.set_categorical_labels(False)
# estimator.set_nb_classes(nb_classes)
estimator.set_num_workers(1)
estimator.set_epochs(5) 
estimator.set_batch_size(128)
estimator.set_verbosity(1)
estimator.set_validation_split(0.15)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode('synchronous')
# NOTE(review): build_model compiles the network with the custom root mean squared error,
# while 'mean_squared_error' is configured here — confirm which loss is intended
estimator.set_loss('mean_squared_error')
estimator.set_metrics(['mean_squared_error'])

ElephantEstimator_893e51a7f654

In [91]:
# Chain the label creation, the timeseries creation and the Keras estimator and fit on the batch data.
# The transformer that calculates the price difference and the label is `label_transformer`.
model = Pipeline(stages=[label_transformer, time_transformer, estimator]).fit(df)
# df_pred = model.transform(df)
# df_pred.show()

### Streaming test

#### Twitter stream

In [3]:
# Define the timestamp format
timestampFormat = "dd-MM-yyyy HH:mm:ss"

# Create the schema of incoming data
twitter_schema = T.StructType([
    T.StructField('timestamp', T.TimestampType(), False),
    T.StructField('text', T.StringType(), False),
    T.StructField('sentiment', T.DoubleType(), False)
])

In [4]:
# Read kafka stream and subscribe to twitter topic
twitter_stream = (spark.readStream
          .format('kafka')
          .option('kafka.bootstrap.servers', 'kafka:9092')
          .option('startingOffsets', 'latest')
          .option('subscribe', 'twitter')
          .load()
          .select(F.col("key").cast("string"), \
                  F.from_json(F.col("value").cast("string"), twitter_schema, \
                  { "timestampFormat": timestampFormat }).alias("value")))

In [5]:
# Create streaming moving windows
twitter_aggregation = (twitter_stream
                     .select('value.*')
                     .withWatermark('timestamp', '10 seconds')
                     .groupBy(window('timestamp', '10 minutes', '2 minutes'))
                     .agg(avg('sentiment').alias('sentiment'), count('timestamp').alias('n_tweets'))).select(F.col('window.end').alias('timestamp'), F.col('sentiment'), F.col('n_tweets'))

In [6]:
# Debug sink: print the aggregated tweet windows to the console in append mode
twitter_agg_stream = (twitter_aggregation
    .writeStream
    .outputMode("append")
    .format("console")
    .start())

In [7]:
# Stop the console query and show its status
twitter_agg_stream.stop()
twitter_agg_stream.status

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

In [8]:
# Add the timestamp as key
twitter_aggregation = twitter_aggregation.withColumn('key', F.col('timestamp'))

# Send the data to kafka
(twitter_aggregation
    .selectExpr("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "twitter-agg")
    .option("checkpointLocation", "checkpoints/twitter-agg")
    .start())

<pyspark.sql.streaming.StreamingQuery at 0x7f6b3663dd68>

#### Crypto stream

In [9]:
# Define the timestamp format
timestampFormat = "dd-MM-yyyy HH:mm:ss"

# Create the schema of incoming data
crypto_schema = T.StructType([
    T.StructField('timestamp', T.TimestampType(), False),
    T.StructField('price', T.DoubleType(), False)
])

In [10]:
# Read kafka stream and subscribe to crypto topic
crypto_stream = (spark.readStream
          .format('kafka')
          .option('kafka.bootstrap.servers', 'kafka:9092')
          .option('startingOffsets', 'latest')
          .option('subscribe', 'crypto')
          .load()
          .select(F.col("key").cast("string"), \
                  F.from_json(F.col("value").cast("string"), crypto_schema, \
                  { "timestampFormat": timestampFormat }).alias("value")))

In [11]:
# Create streaming moving windows
crypto_aggregation = (crypto_stream
                     .select('value.*')
                     .withWatermark('timestamp', '10 seconds')
                     .groupBy(window('timestamp', '10 minutes', '2 minutes'))
                     .agg(avg('price').alias('price'))).select(F.col('window.end').alias('timestamp'), F.col('price'))

# Successfully ingested this stream

In [12]:
# Add the timestamp as key
crypto_aggregation = crypto_aggregation.withColumn('key', F.col('timestamp'))

# Send the data to kafka
(crypto_aggregation
    .selectExpr("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "crypto-agg")
    .option("checkpointLocation", "checkpoints/crypto-agg")
    .start())

<pyspark.sql.streaming.StreamingQuery at 0x7f6b36644a20>

#### Read the crypto aggregation stream

In [13]:
# Create the schema of incoming aggregated crypto data
crypto_agg_schema = T.StructType([
    T.StructField('timestamp', T.TimestampType(), False),
    T.StructField('price', T.DoubleType(), False)
])

# Read the crypto aggregation stream
crypto_agg_stream = ((spark.readStream
          .format('kafka')
          .option('kafka.bootstrap.servers', 'kafka:9092')
          .option('startingOffsets', 'latest')
          .option('subscribe', 'crypto-agg')
          .load()
          .select(
              F.col("key").cast("string"), 
              F.from_json(F.col("value").cast("string"), crypto_agg_schema).alias("value")))
                     .select('value.*'))

crypto_agg_stream.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- price: double (nullable = true)



#### Read the twitter aggregation stream

In [14]:
# Create the schema of incoming aggregated crypto data
twitter_agg_schema = T.StructType([
    T.StructField('timestamp', T.TimestampType(), False),
    T.StructField('sentiment', T.DoubleType(), False),
    T.StructField('n_tweets', T.IntegerType(), False)
])

# Read the twitter aggregation stream
twitter_agg_stream = ((spark.readStream
          .format('kafka')
          .option('kafka.bootstrap.servers', 'kafka:9092')
          .option('startingOffsets', 'latest')
          .option('subscribe', 'twitter-agg')
          .load()
          .select(
              F.col("key").cast("string"), 
              F.from_json(F.col("value").cast("string"), twitter_agg_schema).alias("value")))
                     .select('value.*'))

twitter_agg_stream.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- sentiment: double (nullable = true)
 |-- n_tweets: integer (nullable = true)



#### Join the two streams

In [15]:
# Join the aggregated price and twitter streams on the window timestamp
# and put a watermark of 10 seconds on the joined stream
merged_stream = (crypto_agg_stream
                    .join(twitter_agg_stream, 'timestamp')
                    .withWatermark('timestamp', '10 seconds'))

In [33]:
# Write merged stream to memory
merged_reader = (df_lr_pred
                       .writeStream
                       .queryName('merged_streams')
                       .format('memory')
                       .start())

In [31]:
# Stop the in-memory query and show its status
merged_reader.stop()
merged_reader.status

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

In [None]:
20:50:00
        7394.958728813559,
        7395.494237288136,
        7395.11153846154

In [37]:
# Show the untruncated content of the in-memory table 'merged_streams'
spark.sql('select * from merged_streams').show(truncate=False)

+-------------------+-----------------------------------------------------------------------------------------------------------------------------------+------------------+
|timestamp          |features                                                                                                                           |prediction        |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------+------------------+
|2019-11-18 20:50:00|[7394.958728813559,7395.494237288136,7395.11153846154,0.11442416851441242,0.12960817307692307,0.1496274752475247,451.0,416.0,404.0]|7397.882370153243 |
|2019-11-18 20:52:00|[7395.494237288136,7395.11153846154,7394.740000000001,0.12960817307692307,0.1496274752475247,0.1415411330049261,416.0,404.0,406.0] |7397.814034418346 |
|2019-11-18 20:54:00|[7395.11153846154,7394.740000000001,7395.55264957265,0.1496274752475247,0.1415411330049261,0.14015862944162438,404

In [16]:
# Check that the joined DataFrame is a streaming DataFrame
merged_stream.isStreaming

True

In [23]:
# Load the fitted linear regression pipeline from the 'lr_pipeline' directory
lr_prediction_pipeline = PipelineModel.load('lr_pipeline')

In [60]:
# NOTE(review): merged_stream has no label column, so this cell needs the TimeTransformer
# definition without a label, which only appears further down in this notebook —
# that definition has to be executed before this cell
time_transformer = TimeTransformer(
    window_size='6 minutes', 
    slide_size='2 minutes', 
    feature_length=9)

# Build the feature vectors from the joined stream
df_features = time_transformer.transform(merged_stream)

In [61]:
# Check that the feature DataFrame is still a streaming DataFrame
df_features.isStreaming

True

In [62]:
# Add the predictions of the loaded linear regression pipeline to the streaming features
df_lr_pred = lr_prediction_pipeline.transform(df_features)

In [63]:
# Check that the prediction DataFrame is still a streaming DataFrame
df_lr_pred.isStreaming

True

In [64]:
# Inspect the columns of the prediction stream
df_lr_pred.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- features: vector (nullable = true)
 |-- pred_timestamp: timestamp (nullable = true)
 |-- prediction: double (nullable = false)



In [66]:
# Write merged stream to memory
stream_readers = (df_lr_pred
                       .writeStream
                       .queryName('merged_stream')
                       .format('console')
                       .start())

In [65]:
# Stop the console query and show its status
stream_readers.stop()
stream_readers.status

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

In [59]:
class TimeTransformer(Transformer):
    """
    A custom Transformer which transforms all values to timeseries. This is needed to input it into
    the neural network.

    Every sliding time window becomes one row with:
      * ``timestamp``: the most recent timestamp in the window
      * ``features``: a dense vector with the prices, then the sentiments, then the tweet counts
        of the rows in the window, each group in chronological order
      * ``pred_timestamp``: ``timestamp`` plus 4 minutes, the time of the bitcoin price prediction

    NOTE(review): this redefines the TimeTransformer class from earlier in this notebook
    (that version produces a label instead of timestamps).

    :param window_size: duration of a window, e.g. '6 minutes'
    :param slide_size: slide duration between two windows, e.g. '2 minutes'
    :param feature_length: required number of values in ``features``; windows that hold a
        different number of values are discarded
    """

    def __init__(self, window_size, slide_size, feature_length):
        super(TimeTransformer, self).__init__()
        self.window_size = window_size
        self.slide_size = slide_size
        self.feature_length = feature_length

    def _transform(self, df: DataFrame) -> DataFrame:
        """
        Turn the rows of `df` into one feature vector per time window.

        :param df: DataFrame with the columns timestamp, price, sentiment and n_tweets
        :return: DataFrame with the columns ``timestamp``, ``features`` and ``pred_timestamp``
        """

        # Skip rows with a missing feature value so no nulls end up in the feature vector
        df = df.na.drop(subset=['timestamp', 'price', 'sentiment', 'n_tweets'])

        # Collect the rows of a window as structs and sort them. The timestamp is the first field
        # of the struct, so the rows end up in chronological order. A plain collect_list gives
        # no guarantee on the order after the shuffle of the groupBy.
        rows = F.sort_array(
            F.collect_list(F.struct('timestamp', 'price', 'sentiment', 'n_tweets')))

        df_window = (df
            .groupBy(F.window(df.timestamp, self.window_size, self.slide_size))
            .agg(
                rows.alias('rows'),
                F.max('timestamp').alias('timestamp')))

        # Selecting a field of an array of structs gives an array with that field of every row.
        # Concatenate all array columns
        df_features = df_window.withColumn('features',
                    F.concat(
                        F.col('rows.price'),
                        F.col('rows.sentiment'),
                        F.col('rows.n_tweets')))

        # Make sure all the values are there
        df_features = df_features.where(F.size(F.col('features')) == self.feature_length)

        # Parse the features as vector instead of array (length need to be consistent)
        list_to_vector_udf = F.udf(lambda l: Vectors.dense(l), VectorUDT())

        df_features = df_features.select(
            df_features["timestamp"],
            list_to_vector_udf(df_features["features"]).alias("features"))

        # Add the time of the bitcoin price prediction
        df_features = df_features.withColumn('pred_timestamp', (df_features.timestamp + F.expr('INTERVAL 4 MINUTES')))

        return df_features