# Batch and Structured Streaming Using the Python DataFrame API

In this notebook we take a quick look at how to use the DataFrame API to build Structured Streaming applications. We also look at how to reuse the pipeline from the previous demo, which transforms the data and predicts the probability of a user downloading an app.

In [2]:
# Check that a Spark session has been created for you automatically
spark

In [3]:
# Spark version
spark.version

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType
from pyspark.sql import functions as F
from pyspark.ml import PipelineModel

## Sample Data
We have some sample data stored as Parquet files in `/mnt/dbdata/fraud/train_sample_parquet`, which we are going to use to build this application. [Original source](https://www.kaggle.com/c/talkingdata-adtracking-fraud-detection)

In [6]:
source_path = "/mnt/dbdata/fraud/" # Replace this with your own path

### Batch Processing

In [8]:
# Read the static data
sample_data = (
    spark
    .read
    .parquet(source_path + "train_sample_parquet/")
)

In [9]:
display(sample_data)

| ip | app | device | os | channel | click_time | attributed_time | is_attributed |
|---|---|---|---|---|---|---|---|
| 87540 | 12 | 1 | 13 | 497 | 2017-11-07T17:30:38.000+0000 |  | 0 |
| 105560 | 25 | 1 | 17 | 259 | 2017-11-07T21:40:27.000+0000 |  | 0 |
| 101424 | 12 | 1 | 19 | 212 | 2017-11-08T02:05:24.000+0000 |  | 0 |
| 94584 | 13 | 1 | 13 | 477 | 2017-11-07T12:58:08.000+0000 |  | 0 |
| 68413 | 12 | 1 | 1 | 178 | 2017-11-09T17:00:09.000+0000 |  | 0 |
| 93663 | 3 | 1 | 17 | 115 | 2017-11-09T09:22:13.000+0000 |  | 0 |
| 17059 | 1 | 1 | 17 | 135 | 2017-11-09T09:17:58.000+0000 |  | 0 |
| 121505 | 9 | 1 | 25 | 442 | 2017-11-07T18:01:53.000+0000 |  | 0 |
| 192967 | 2 | 2 | 22 | 364 | 2017-11-08T17:35:17.000+0000 |  | 0 |
| 143636 | 3 | 1 | 19 | 135 | 2017-11-08T20:35:26.000+0000 |  | 0 |


##### Demonstration of a simple transformation in *batch* mode
In this scenario, we are only interested in the rows where the user downloaded the app, i.e. `is_attributed = 1`.

In [11]:
# Query to filter the bulk data
has_downloaded = sample_data.filter("is_attributed = 1")

display(has_downloaded)

| ip | app | device | os | channel | click_time | attributed_time | is_attributed |
|---|---|---|---|---|---|---|---|
| 224120 | 19 | 0 | 29 | 213 | 2017-11-08T10:22:13.000+0000 | 2017-11-08T10:22:38.000+0000 | 1 |
| 272894 | 10 | 1 | 7 | 113 | 2017-11-08T14:10:05.000+0000 | 2017-11-08T14:10:37.000+0000 | 1 |
| 79001 | 19 | 0 | 0 | 213 | 2017-11-07T17:54:22.000+0000 | 2017-11-07T19:59:05.000+0000 | 1 |
| 131029 | 19 | 0 | 0 | 343 | 2017-11-09T18:58:46.000+0000 | 2017-11-09T19:52:01.000+0000 | 1 |
| 40352 | 19 | 0 | 0 | 213 | 2017-11-08T06:19:03.000+0000 | 2017-11-08T09:55:02.000+0000 | 1 |
| 48733 | 35 | 1 | 18 | 274 | 2017-11-07T20:25:50.000+0000 | 2017-11-07T21:10:30.000+0000 | 1 |
| 330861 | 35 | 1 | 22 | 21 | 2017-11-09T02:54:44.000+0000 | 2017-11-09T06:39:52.000+0000 | 1 |
| 309576 | 5 | 1 | 32 | 113 | 2017-11-09T16:47:51.000+0000 | 2017-11-09T16:47:55.000+0000 | 1 |
| 220571 | 71 | 1 | 25 | 3 | 2017-11-08T12:35:21.000+0000 | 2017-11-08T12:37:46.000+0000 | 1 |
| 240051 | 35 | 1 | 19 | 21 | 2017-11-08T16:07:13.000+0000 | 2017-11-08T17:46:42.000+0000 | 1 |


##### The same transformation in *streaming* mode
In this scenario, we emulate a stream of raw data flowing in. Again, instead of storing all of the data, we only keep the rows where someone downloaded the app.

The Parquet files are fed to the Structured Streaming file source one file at a time, which emulates the streaming environment we use here.

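##### But before we do that, let us define the schema of the incoming streaming data
File-based streaming sources expect the schema to be specified up front (unless `spark.sql.streaming.schemaInference` is enabled), so we declare it explicitly.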

In [14]:
schema = StructType([StructField("ip",IntegerType(),True),
                    StructField("app",IntegerType(),True),
                    StructField("device",IntegerType(),True),
                    StructField("os",IntegerType(),True),
                    StructField("channel",IntegerType(),True),
                    StructField("click_time",TimestampType(),True),
                    StructField("attributed_time",TimestampType(),True),
                    StructField("is_attributed",IntegerType(),True)])


Let's convert this into a streaming query that updates continuously as data arrives. Since we only have a static set of files, we emulate a stream by reading one Parquet file per trigger, oldest file first (the file source's default order). The query itself is almost identical to the batch query above.
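Conceptually, `maxFilesPerTrigger = 1` turns a folder of static files into a stream: each trigger picks up at most one file that has not been processed yet, applies the same filter to it, and appends the result to the sink, while the checkpoint remembers which files are done. The plain-Python sketch below (no Spark involved) mimics that loop; the file names, rows and the `run_micro_batches` helper are hypothetical, and Spark's real bookkeeping in the checkpoint directory is more involved.

```python
# Plain-Python sketch (no Spark) of a file source with maxFilesPerTrigger.
# File names, rows and helpers are hypothetical illustrations.

def run_micro_batches(files, checkpoint, sink, max_files_per_trigger=1):
    """Process unseen files oldest-first, at most `max_files_per_trigger` per trigger.

    files: dict mapping file name -> (modification_time, list of row dicts)
    checkpoint: set of file names already processed (survives "restarts")
    sink: list receiving the filtered rows (append output mode)
    Returns the number of triggers that processed new data.
    """
    triggers = 0
    while True:
        # Only files not recorded in the checkpoint, oldest modification time first
        pending = sorted(
            (name for name in files if name not in checkpoint),
            key=lambda name: files[name][0],
        )
        batch = pending[:max_files_per_trigger]
        if not batch:
            return triggers
        for name in batch:
            _, rows = files[name]
            # Same filter as the batch query: keep only downloads
            sink.extend(r for r in rows if r["is_attributed"] == 1)
            checkpoint.add(name)
        triggers += 1


files = {
    "part-0.parquet": (1, [{"ip": 1, "is_attributed": 0}, {"ip": 2, "is_attributed": 1}]),
    "part-1.parquet": (2, [{"ip": 3, "is_attributed": 1}]),
}
checkpoint, sink = set(), []
print(run_micro_batches(files, checkpoint, sink))  # 2 (one trigger per file)

# A "restart" that reuses the same checkpoint only picks up the newly arrived file
files["part-2.parquet"] = (3, [{"ip": 4, "is_attributed": 1}])
print(run_micro_batches(files, checkpoint, sink))  # 1
print([r["ip"] for r in sink])                     # [2, 3, 4]
```

Because the set of processed files survives between calls, the second run only handles the new file, which is roughly what the `checkpointLocation` option provides for the real query after a restart.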

In [16]:
# Similar to the definition of `sample_data` above, but using `readStream` instead of `read` (and reading the full `train_parquet` data)
streamingInputDF = (
  spark
    .readStream
    .schema(schema)
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .parquet(source_path + "train_parquet/")
)

In [17]:
# Same filter as the batch query on `sample_data`
streamingDownloadedDF = streamingInputDF.filter("is_attributed = 1")

# Is this DF actually a streaming DF?
streamingDownloadedDF.isStreaming

##### Let the streaming begin!!

In [19]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
  streamingDownloadedDF
    .writeStream
    .format("parquet")        # write the output as Parquet files to the mounted storage (ADLS in this demo)
    .option("checkpointLocation", source_path + "streaming_output_checkpoint/")  # progress tracking, so a restarted query resumes where it left off
    .option("path", source_path + "vs_demo/streaming_sample_output/")
    .start()
)

In [20]:
query.stop()

Let's examine the transformed output folder a little more closely.

In [22]:
streamed_output = spark.read.parquet(source_path+"vs_demo/streaming_sample_output")

In [23]:
display(streamed_output.limit(10))

| ip | app | device | os | channel | click_time | attributed_time | is_attributed |
|---|---|---|---|---|---|---|---|
| 372 | 35 | 1 | 20 | 274 | 2017-11-08T09:31:43.000+0000 | 2017-11-08T10:33:36.000+0000 | 1 |
| 685 | 19 | 0 | 21 | 213 | 2017-11-07T14:16:07.000+0000 | 2017-11-07T14:17:18.000+0000 | 1 |
| 727 | 19 | 0 | 21 | 213 | 2017-11-07T20:16:43.000+0000 | 2017-11-07T20:18:24.000+0000 | 1 |
| 727 | 19 | 6 | 29 | 213 | 2017-11-07T22:58:45.000+0000 | 2017-11-07T23:41:51.000+0000 | 1 |
| 1332 | 35 | 1 | 13 | 274 | 2017-11-07T19:57:44.000+0000 | 2017-11-07T20:08:13.000+0000 | 1 |
| 1332 | 35 | 1 | 13 | 274 | 2017-11-07T21:19:29.000+0000 | 2017-11-07T22:08:00.000+0000 | 1 |
| 1550 | 19 | 0 | 0 | 347 | 2017-11-07T03:43:37.000+0000 | 2017-11-07T04:41:14.000+0000 | 1 |
| 2407 | 19 | 212 | 21 | 213 | 2017-11-07T15:45:03.000+0000 | 2017-11-07T15:46:06.000+0000 | 1 |
| 2407 | 19 | 1362 | 24 | 282 | 2017-11-09T16:32:44.000+0000 | 2017-11-09T16:47:26.000+0000 | 1 |
| 4203 | 9 | 1 | 13 | 232 | 2017-11-07T00:11:55.000+0000 | 2017-11-07T11:05:52.000+0000 | 1 |


In [24]:
# Distinct values of is_attributed in the streamed output (only 1 is expected after the filter)
streamed_output.select("is_attributed").distinct().show()

In [25]:
# Distinct values of is_attributed in the original, unfiltered sample data
sample_data.select("is_attributed").distinct().show()

### Let's stream predictions in near real time

#### But before we jump into streaming predictions, let's revisit regular batch processing

In [28]:
# Let's reuse the test data set for a quick demonstration of batch processing
sample_data = spark.read.parquet(source_path + "kk_test_data/")

In [29]:
display(sample_data.limit(10))

| ip | app | device | os | channel | click_time | attributed_time | is_attributed |
|---|---|---|---|---|---|---|---|
| 73404 | 11 | 1 | 15 | 21 | 2017-11-07T06:51:16.000+0000 |  | 0 |
| 73404 | 18 | 1 | 6 | 107 | 2017-11-07T07:19:52.000+0000 |  | 0 |
| 73404 | 18 | 1 | 13 | 107 | 2017-11-07T07:21:55.000+0000 |  | 0 |
| 73404 | 1 | 1 | 19 | 178 | 2017-11-07T07:39:03.000+0000 |  | 0 |
| 73404 | 9 | 1 | 19 | 442 | 2017-11-07T07:39:05.000+0000 |  | 0 |
| 73404 | 12 | 1 | 19 | 140 | 2017-11-07T07:39:06.000+0000 |  | 0 |
| 73404 | 18 | 1 | 6 | 107 | 2017-11-07T07:50:45.000+0000 |  | 0 |
| 73404 | 18 | 1 | 13 | 107 | 2017-11-07T07:58:29.000+0000 |  | 0 |
| 73404 | 15 | 1 | 8 | 412 | 2017-11-07T08:00:23.000+0000 |  | 0 |
| 73404 | 3 | 1 | 8 | 480 | 2017-11-07T08:00:23.000+0000 |  | 0 |


#### Load the saved ML pipeline that transforms the data and predicts the outcome

In [31]:
pipeline = PipelineModel.load("/mnt/dbdata/fraud/kk_pipeline_model/")

In [32]:
predictions = pipeline.transform(sample_data)

#### Evaluation of results

In [34]:
display(predictions.select("conv_rate","prediction"))
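The cell above only displays the predictions; evaluating them means comparing each prediction with the true label. A minimal plain-Python sketch of the usual confusion-matrix metrics, assuming 0/1 labels (e.g. `is_attributed`) paired with 0/1 predictions; `binary_metrics` is a hypothetical helper, not a Spark API. When downloads are rare, as in click-fraud data, accuracy alone can look excellent for a model that never predicts a download, so precision and recall are worth checking as well.

```python
# Plain-Python sketch: confusion-matrix counts and metrics from (label, prediction)
# pairs with values in {0, 1}. `binary_metrics` is a hypothetical helper.

def binary_metrics(pairs):
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)
    tn = sum(1 for y, p in pairs if y == 0 and p == 0)
    accuracy = (tp + tn) / len(pairs)
    precision = tp / (tp + fp) if tp + fp else 0.0  # of predicted downloads, how many were real
    recall = tp / (tp + fn) if tp + fn else 0.0     # of real downloads, how many were caught
    return {"tp": tp, "fp": fp, "fn": fn, "tn": tn,
            "accuracy": accuracy, "precision": precision, "recall": recall}

# 98 clicks without a download, 2 with one, and a model that always predicts 0
pairs = [(0, 0)] * 98 + [(1, 0)] * 2
print(binary_metrics(pairs))
# {'tp': 0, 'fp': 0, 'fn': 2, 'tn': 98, 'accuracy': 0.98, 'precision': 0.0, 'recall': 0.0}
```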

#### Predictions via Stream Processing

In [36]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler
from pyspark.ml import Pipeline

In [37]:
raw_data = spark.read.parquet(source_path + "train_parquet")

Let's filter out likely bots: IPs that clicked 100 times or more without a single download.

In [39]:
# Per IP: number of downloads and number of clicks
ips_clicks = (
    raw_data
    .select("ip", "is_attributed")
    .groupBy("ip")
    .agg(F.sum("is_attributed").alias("total_downloads"),
         F.count("is_attributed").alias("num_clicks"))
)
bad_ips = ips_clicks.filter("total_downloads = 0")
# Exclude IPs that are potentially bots. For now: IPs that clicked 100 times or more but never downloaded
bot_ips = bad_ips.filter("num_clicks >= 100").select("ip")

# Good IPs: drop every click coming from a bot IP (left anti join keeps only non-matching rows)
good_ips = raw_data.join(bot_ips, on="ip", how="left_anti")
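The bot filter above boils down to a per-IP aggregation followed by an anti-join. Here is a plain-Python sketch of the same logic on a few made-up rows; `find_bot_ips` and `drop_bots` are hypothetical helpers that mirror the `groupBy`/`agg` step and the join that keeps only rows from non-bot IPs.

```python
# Plain-Python sketch of the bot filter (no Spark). Rows are made up; the helpers
# are hypothetical stand-ins for groupBy/agg and the left anti join.
from collections import defaultdict

def find_bot_ips(rows, min_clicks=100):
    downloads = defaultdict(int)
    clicks = defaultdict(int)
    for r in rows:
        downloads[r["ip"]] += r["is_attributed"]  # like F.sum("is_attributed")
        clicks[r["ip"]] += 1                      # like F.count, assuming no null labels
    return {ip for ip in clicks if downloads[ip] == 0 and clicks[ip] >= min_clicks}

def drop_bots(rows, bot_ips):
    # Left anti join: keep only rows whose IP has no match in bot_ips
    return [r for r in rows if r["ip"] not in bot_ips]

rows = ([{"ip": 1, "is_attributed": 0}] * 150                                      # many clicks, no download
        + [{"ip": 2, "is_attributed": 0}] * 150 + [{"ip": 2, "is_attributed": 1}]  # has a download
        + [{"ip": 3, "is_attributed": 0}] * 5)                                     # too few clicks
bots = find_bot_ips(rows)
print(bots)                        # {1}
print(len(drop_bots(rows, bots)))  # 156
```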

**Split the data into train and test**

In [41]:
train_ips, test_ips = good_ips.randomSplit([0.7, 0.3], seed = 123)
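`randomSplit` does not shuffle the data and cut it; roughly, it normalises the weights into cumulative ranges (here [0, 0.7) and [0.7, 1.0)) and routes each row according to a seeded uniform draw, so the split sizes are only approximately 70/30. The plain-Python sketch below illustrates that idea; `random_split` is a hypothetical helper, and since Spark samples each partition separately, the exact rows and sizes it produces will differ.

```python
# Plain-Python sketch of a weighted random split in the spirit of randomSplit.
# `random_split` is a hypothetical helper, not the Spark implementation.
import random

def random_split(rows, weights, seed):
    # Normalise the weights into cumulative upper bounds, e.g. [0.7, 1.0]
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)  # seeded, so the split is reproducible
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding in the last bound
    return splits

train, test = random_split(list(range(10_000)), [0.7, 0.3], seed=123)
print(len(train) + len(test))  # 10000: every row lands in exactly one split
```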

**Basic Feature Engineering**

In [43]:
stringIndexer_label = StringIndexer(inputCol="is_attributed", outputCol="label").fit(good_ips)
stringIndexer_os = StringIndexer(inputCol="os", outputCol="os_ix",handleInvalid="skip")
stringIndexer_app = StringIndexer(inputCol="app", outputCol="app_ix", handleInvalid="skip")
stringIndexer_device = StringIndexer(inputCol="device", outputCol="device_ix", handleInvalid="skip")
stringIndexer_channel = StringIndexer(inputCol="channel", outputCol="channel_ix", handleInvalid="keep")
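`StringIndexer` turns each category into a numeric index. With the default `stringOrderType` (`frequencyDesc`), the most frequent value gets index 0.0, and numeric columns such as `os` are cast to strings first. `handleInvalid` controls what happens to values not seen during fitting: `skip` drops the row, `keep` assigns them an extra index equal to the number of known labels, and the default `error` raises. A plain-Python sketch of these semantics; the helpers are hypothetical, and ties are broken alphabetically here.

```python
# Plain-Python sketch of StringIndexer semantics (hypothetical helpers, no Spark).
from collections import Counter

def fit_string_indexer(values):
    # Most frequent label -> 0.0; values are compared as strings; ties broken alphabetically
    counts = Counter(str(v) for v in values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}

def transform(values, mapping, handle_invalid="error"):
    out = []
    for v in values:
        key = str(v)
        if key in mapping:
            out.append(mapping[key])
        elif handle_invalid == "skip":
            continue                          # the row is dropped
        elif handle_invalid == "keep":
            out.append(float(len(mapping)))   # extra index for unseen labels
        else:
            raise ValueError(f"Unseen label: {key}")
    return out

mapping = fit_string_indexer([19, 13, 13, 13, 19, 22])
print(mapping)                                   # {'13': 0.0, '19': 1.0, '22': 2.0}
print(transform([13, 99, 22], mapping, "skip"))  # [0.0, 2.0]
print(transform([13, 99, 22], mapping, "keep"))  # [0.0, 3.0, 2.0]
```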

In [44]:
vectorAssembler_features = VectorAssembler(inputCols=["os_ix", "app_ix", "device_ix","channel_ix"], outputCol="features")
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=stringIndexer_label.labels)

# Machine Learning Model
svm = LinearSVC(maxIter=10, regParam=0.1)

In [45]:
pipeline_svm = Pipeline(stages=[stringIndexer_label, stringIndexer_os, stringIndexer_app, stringIndexer_device, stringIndexer_channel, 
                               vectorAssembler_features, svm, labelConverter])
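A `Pipeline` fits its stages in order: each estimator (such as the feature `StringIndexer`s or `LinearSVC`) is fitted on the output of the stages before it and replaced by the model it produces, while transformers (such as `VectorAssembler`) are kept as they are. The fitted `PipelineModel` therefore contains only transformers, which is part of what lets us reuse it on both batch and streaming DataFrames. The classes below are a hypothetical, simplified plain-Python model of that behaviour, not the `pyspark.ml` implementation.

```python
# Simplified plain-Python model of Pipeline fit/transform (hypothetical classes).

class Transformer:
    def transform(self, rows):
        raise NotImplementedError

class Estimator:
    def fit(self, rows):
        raise NotImplementedError

class MeanCenterer(Estimator):   # hypothetical estimator: learns a column mean
    def __init__(self, col):
        self.col = col
    def fit(self, rows):
        mean = sum(r[self.col] for r in rows) / len(rows)
        return Shift(self.col, -mean)

class Shift(Transformer):        # hypothetical transformer / fitted model
    def __init__(self, col, delta):
        self.col, self.delta = col, delta
    def transform(self, rows):
        return [{**r, self.col: r[self.col] + self.delta} for r in rows]

class SimplePipeline:
    def __init__(self, stages):
        self.stages = stages
    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if isinstance(stage, Estimator):
                stage = stage.fit(rows)   # replace the estimator by its fitted model
            fitted.append(stage)
            rows = stage.transform(rows)  # output feeds the next stage
        return SimplePipelineModel(fitted)

class SimplePipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages              # transformers only
    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

model = SimplePipeline([MeanCenterer("x"), Shift("x", 10)]).fit([{"x": 1}, {"x": 3}])
print(model.transform([{"x": 5}]))  # [{'x': 13.0}]
```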

#### Let us build a linear SVM classifier to predict whether an app will be downloaded
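`LinearSVC` is a binary classifier: it learns a weight vector `w` and an intercept `b`, and predicts a download when the margin `w·x + b` exceeds a threshold (0.0 by default). Training minimises the hinge loss plus an L2 penalty controlled by `regParam`. The plain-Python sketch below shows that objective and the prediction rule, with labels mapped from {0, 1} to {-1, +1}; it is an illustration only, since Spark also standardises features by default and optimises with OWL-QN, so its internal scaling may differ.

```python
# Plain-Python sketch of a linear SVM objective and prediction rule (illustration only).

def margin(w, b, x):
    return sum(wi * xi for wi, xi in zip(w, x)) + b

def svm_objective(w, b, X, y, reg_param):
    # Mean hinge loss over the rows plus an L2 penalty on the weights
    hinge = 0.0
    for x, label in zip(X, y):
        s = 1.0 if label == 1 else -1.0  # {0, 1} -> {-1, +1}
        hinge += max(0.0, 1.0 - s * margin(w, b, x))
    return hinge / len(X) + 0.5 * reg_param * sum(wi * wi for wi in w)

def predict(w, b, x, threshold=0.0):
    # Predict 1.0 when the raw margin exceeds the threshold
    return 1.0 if margin(w, b, x) > threshold else 0.0

X = [[2.0, 0.0], [0.0, 2.0]]
y = [1, 0]
w, b = [1.0, -1.0], 0.0
print(svm_objective(w, b, X, y, reg_param=0.1))  # 0.1 (both rows beyond the margin, so only the penalty remains)
print([predict(w, b, x) for x in X])            # [1.0, 0.0]
```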

In [47]:
# Note: fitting on the full training set takes a very long time; if a saved model already exists, skip this cell and reload it below
model_svm = pipeline_svm.fit(train_ips)

In [48]:
# Always save the fitted pipeline so it can be reused later, e.g. after a cluster restart
# (overwrite() replaces any model previously saved at this path instead of failing)
model_svm.write().overwrite().save("/mnt/dbdata/fraud/vs_svm_model")

#### Reload the fitted pipeline model

In [50]:
model_svm = PipelineModel.load("/mnt/dbdata/fraud/vs_svm_model")

In [51]:
# Same kind of streaming source as before, now reading the test data one file per trigger
streamingInputDF = (
  spark
    .readStream
    .schema(schema)
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .parquet(source_path + "vs_demo/vs_test_data/")
)

In [52]:
# Apply the fitted pipeline exactly as in batch processing
streamingDownloadedDF = model_svm.transform(streamingInputDF)

# Is this DF actually a streaming DF?
streamingDownloadedDF.isStreaming

In [53]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
  streamingDownloadedDF
    .writeStream
    .format("memory")         # memory sink: results are kept in an in-memory table on the driver (debugging / small data only)
    .queryName("streaming_prediction")  # name of the in-memory table
    .start()
)

In [54]:
query.stop()

#### Save the results to disk
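The memory sink keeps the results only in the driver's memory, so they are lost when the cluster restarts. To store this in-memory table permanently, we have several options:
1. Hive table
2. Azure Data Lake Store (ADLS)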

In [56]:
# Write to Hive Table
spark.table("streaming_prediction").write.saveAsTable("hive_streaming_predictions")

In [57]:
# Write as parquet files to ADLS
spark.table("streaming_prediction").write.parquet(source_path + "vs_demo/adls_streaming_predictions_parquet")