> This notebook contains the whole evaluation and analysis of DataProcessor, the newest InterSCity microservice, which abstracts cluster processing away from smart city application developers.


# DataProcessor Report and Experiment

To schematize and evaluate DataProcessor, we prepared this notebook, which contains the code and the analysis of an experiment we designed for this evaluation. The experiment focuses on solving a bigger problem, which we break down into smaller ones.

Since we compare two approaches, one that (1) uses DataProcessor and another that (2) uses Spark directly, we name them **DataProcessor Solutions** and **Raw-Spark Solutions** for convenience, although DataProcessor also uses Spark to process its data.

The problem that we are going to solve is the following one:

#### *How can we identify anomalous traffic in a city such as São Paulo?*

To solve this problem, we:



**1.** Generate almost-real São Paulo traffic data using a simulator called [SimDiasca](https://github.com/DylanGuedes/sim-diasca-blue), which already has a city traffic model called [InterSCSimulator](https://github.com/DylanGuedes/interscsimulator-blue).

We use a simulator because, at some point, we also need to evaluate stream processing. If we relied on the available real data (e.g., the [olhovivo API](http://olhovivo.sptrans.com.br/)), we could not evaluate scenarios that generate Big Data quickly: the olhovivo API only broadcasts and updates traffic data once per minute. The simulation, on the other hand, can generate data at a millisecond granularity.

**2.** Use the [InterSCity Smart City Platform](https://gitlab.com/interscity/interscity-platform) to store the generated data.

InterSCity has a microservice called `DataCollector` that stores its data in MongoDB. Therefore, to extract InterSCity data, we use a Mongo connector that integrates with Spark, named `MongoConnector`.

**3.** Compare the time required to extract InterSCity data with the DataProcessor and the Raw-Spark solutions, also comparing the LOC (lines of code) and the knowledge complexity of both.

**4.** Since our problem is about classifying anomalous states of a city, we define the first simulation (step 1) as the normal city state, and a second simulation as an anomalous version of the first one. Also by our definition, the anomalous city is the first city without the key edges of its graph.

We define key edges as the 100 edges most used by cars during a simulation. Without the key edges, we expect the city to have worse traffic, which will help in identifying anomalous edges.

Counting how many cars traversed each edge is done with Spark in both approaches, DataProcessor and Raw-Spark.


**5.** Then, we define a model that is capable of classifying city edges as anomalous or not. We use an **ultra-simple model** that only uses the mean and the standard deviation, and classifies as anomalous any edge whose value falls outside the range [mean - stddev, mean + stddev]. Again, we create and evaluate both a DataProcessor and a Raw-Spark solution. At the end, we store this model in a file so that it can be reopened later.


**6.** We start a DataProcessor stream that receives smart city data and classifies edges as anomalous or not. This step uses DataProcessor.


**7.** We run a new simulation that uses the anomalous graph (i.e., without the key edges). In real time, the data points generated by this simulation are classified by the stream started in the previous step. The anomalous data points are stored for further use.


**8.** We repeat steps 6 and 7, this time using a Raw-Spark stream.


**9.** Finally, we compare both stream solutions regarding their performance and usability.
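Steps 4 and 5 rely on counting how many cars traversed each edge and keeping the 100 most used ones as key edges. To make that counting concrete, the sketch below reproduces the idea in plain Python rather than Spark: the positions of each car are sorted by tick, each pair of consecutive positions is one traversed edge (U, V), and the k most frequent edges are kept. The function `top_edges` and the sample data points are hypothetical illustrations, not part of InterSCity or DataProcessor.

```python
from collections import Counter, defaultdict
from typing import Iterable, List, Tuple

DataPoint = Tuple[str, int, int]  # (car uuid, tick, node id), hypothetical layout
Edge = Tuple[int, int]            # (origin node U, destination node V)


def top_edges(points: Iterable[DataPoint], k: int = 100) -> List[Tuple[Edge, int]]:
    """Count how many times each edge (U, V) was traversed and return the k most used."""
    # Group the positions of each car
    trips = defaultdict(list)
    for uuid, tick, node in points:
        trips[uuid].append((tick, node))

    counts = Counter()
    for positions in trips.values():
        positions.sort()  # order each car's positions by tick
        # Two consecutive positions of the same car form one traversed edge
        for (_, u), (_, v) in zip(positions, positions[1:]):
            counts[(u, v)] += 1
    return counts.most_common(k)


sample = [("car-a", 0, 1), ("car-a", 10, 2), ("car-a", 25, 3),
          ("car-b", 5, 1), ("car-b", 12, 2)]
print(top_edges(sample, k=1))  # [((1, 2), 2)]
```

In the notebook itself, the same aggregation is done with Spark (`groupby("U", "V")`, a count and `limit(100)`), which scales to the full dataset.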



Now that we know the steps, we can finally start the whole thing! First things first, we will configure the environment so that we can jump into step 3.

### Environment Setup 

We start by configuring the Spark parameters needed to interact with the Revoada cluster and with DataProcessor data.

The Master URL, as the name suggests, is the master's host. Since we are using Spark Standalone Cluster Manager, the URL starts with `spark://`, instead of `yarn://` or `mesos://`.

Then, we fill the SparkConf (the Spark entity that controls Spark parameters) so that its dependencies include the Mongo Connector. As mentioned above, InterSCity's DataCollector stores its data in MongoDB; since we want to read this data directly, bypassing DataCollector, the Mongo Connector is our best option.

Finally, we instantiate a SparkContext and a SparkSession using this SparkConf (this happens at the beginning of the extraction cell in section 2.1).

In [15]:
import os
import time

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

MASTER_URL = "spark://10.4.0.20:7077"

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'

conf = (SparkConf()
        .set("spark.eventLog.enabled", "true")
        .set("spark.eventLog.dir", "/tmp/spark-events")  # where this application writes its event log
        .set("spark.history.fs.logDirectory", "/tmp/spark-events")  # read by the Spark History Server
        .set("spark.app.name", "step2-datacollector-extraction")
        .set("spark.driver.memory", "6G")  # only effective if set before the first SparkContext starts the driver JVM
        .set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0") # mongo connector
        .setMaster(MASTER_URL))

## 2. DataCollector Data Gathering
### 2.1. Raw-Spark solution

With a correct Spark environment, we can finally interact with DataCollector to gather InterSCity's data. We first show the Raw-Spark solution and explain its key parts. The parameters used are:
- `default_uri`: the URI of DataCollector's MongoDB, which includes the host, the Mongo port (`27017`) and the database name (`data_collector_development`);
- `default_collection`: the Mongo collection, `sensor_values`;
- `capability`: the InterSCity capability to extract, `city_traffic`.

Then, we define a `pipeline` for the MongoDB extraction. This parameter is important for heterogeneous DataCollector environments: if DataCollector stores several capabilities, the pipeline filters out the ones that are not relevant, so that the query runs faster.
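Building the pipeline by string concatenation, as the cells below do, is easy to get wrong (quotes, brackets). A safer alternative, sketched here as a suggestion rather than what the notebook uses, is to build the stages as Python dictionaries and serialize them with `json.dumps`; the Mongo Spark connector's `pipeline` option accepts an extended-JSON document or a list of documents. The helper name `build_pipeline` is hypothetical.

```python
import json
from typing import Optional


def build_pipeline(capability: str, limit: Optional[int] = None) -> str:
    """Return a MongoDB aggregation pipeline, serialized as JSON, that keeps one capability."""
    # $match drops the sensor values of other capabilities on the MongoDB side
    stages = [{"$match": {"capability": capability}}]
    if limit is not None:
        # Optional cap on the number of documents returned by the stage
        stages.append({"$limit": limit})
    return json.dumps(stages)


print(build_pipeline("city_traffic"))
# [{"$match": {"capability": "city_traffic"}}]
print(build_pipeline("city_traffic", limit=334667))
# [{"$match": {"capability": "city_traffic"}}, {"$limit": 334667}]
```

The result would be passed as `.option("pipeline", build_pipeline(capability))` in the reader.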

Next, we define the `schema` (`sch`) of the simulated city traffic data.

With all of this defined, we can finally extract the data, which happens with the `load` command. The simulation only generates data points that specify at which node (`nodeID`, renamed to `U`) a given car (`uuid`) was at a given timestamp (`tick`, renamed to `T0`).

The DataFrame `df` contains the raw simulation data; you can inspect it if you want (e.g., with `df.show()`).

We also store the DataFrame as a parquet file because, without it, we would only apply **transformations** to our DataFrame. Since Spark uses lazy evaluation, nothing runs until an **action** is called; writing to a file is an action, so it triggers the Spark processing and lets us measure its performance.
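Because only the action triggers the job, the timer must wrap the action (here, the parquet `save`). A small context manager, sketched below as a suggestion (the name `timed` is hypothetical), keeps the start and end timestamps together and avoids undefined or stale variables such as `t0`. It uses `time.perf_counter`, a monotonic clock better suited to measuring intervals than `time.time`.

```python
import time
from contextlib import contextmanager
from typing import Dict


@contextmanager
def timed(results: Dict[str, float], label: str):
    """Store in results[label] the wall-clock seconds spent inside the with-block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Recorded even if the block raises, e.g. when the Spark job aborts
        results[label] = time.perf_counter() - start


measurements = {}
with timed(measurements, "sleep"):
    time.sleep(0.1)  # in the notebook, this would be the df.write...save(...) call
print(measurements)
```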



In [1]:
import time

from pyspark.sql.types import LongType, StringType, StructType

sc = SparkContext(conf=conf)
spark = SparkSession(sc)

capability = "city_traffic"
default_uri = "mongodb://10.4.0.20:27017/data_collector_development"
default_collection = "sensor_values"
# MongoDB aggregation pipeline: keep only the sensor values of this capability
pipeline = "{'$match': {'capability': '"+capability+"'}}"

# Each simulated data point: node where the car was, tick, and car id
sch = (StructType()
    .add("nodeID", LongType())
    .add("tick", LongType())
    .add("uuid", StringType()))

t0 = time.time()  # start measuring right before the job is triggered

df = (spark
    .read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("spark.mongodb.input.uri", "{0}.{1}".format(default_uri, default_collection))
    .option("pipeline", pipeline)
    .schema(sch)
    .load()
    .withColumnRenamed("nodeID", "U")
    .withColumnRenamed("tick", "T0"))

# Writing the parquet file is an action: it triggers the whole extraction
(df
    .write
    .format("parquet")
    .mode("overwrite")
    .save("/tmp/dataprocessor-report/step2.parquet"))

t2 = time.time()

spark.stop()
sc.stop()

NameError: name 'spark' is not defined

In [8]:
time_spent = t2-t0

print("Time diff: {0} - {1} = {2}s".format(t0, t2, time_spent))

Time diff: 1556044609.9297001 - 1556044800.4620075 = 190.53230738639832s


Just to compare: according to Python, the action took 190.53 s, which is close to the Spark UI results:

![RawSparkSummaryMetrics](task2-raw-spark/raw-spark-history-index.png)

The Spark UI reports 3.2 min, or 192 s, close to the value measured with Python's `time.time()`. This comparison shows that, for this job, it makes little difference whether we measure the processing time with Python or with Spark.

Next, let's look at the RDD DAG that Spark built for our small problem:
![RawSparkStages](task2-raw-spark/raw-spark-stages.png)

The entire problem was solved in a single stage, which means that no shuffle happened.

It is also useful to look at the summary metrics of the tasks:
![RawSparkTasksSummary](task2-raw-spark/raw-spark-summary-metrics.png)

The most relevant metrics here are `Task Deserialization Time` and `Result Serialization Time`, which are low. However, since we are using Python, these metrics may become relevant in more complex tasks: when Python code (e.g., UDFs) runs on the executors, data has to be serialized between the JVM and the Python worker processes, which can become a real overhead.

You can load all these results from the file `app-20190423183649-0004`, stored in the `task2-raw-spark` folder: just copy it into the folder that your Spark History Server reads event logs from. In our case, this folder is `/tmp/spark-events`, configured with the parameters `spark.eventLog.enabled=true` and `spark.history.fs.logDirectory=/tmp/spark-events`.

#### 2.1.1. All available hardware resources with smaller amounts of data

We now use all available resources, but with smaller fractions of the data: 1%, 10%, 50% and 95% of the 33,466,742 data points.


In [10]:
import time

from pyspark.sql.types import LongType, StringType, StructType

total = 33466742  # total number of data points

# Run sizes: 1%, 10%, 50% and 95% of the data points
samples = [int(total * fraction) for fraction in (0.01, 0.1, 0.5, 0.95)]
processing_times = []

capability = "city_traffic"
default_uri = "mongodb://10.4.0.20:27017/data_collector_development"
default_collection = "sensor_values"

sch = (StructType()
    .add("nodeID", LongType())
    .add("tick", LongType())
    .add("uuid", StringType()))

for u in samples:
    # Make sure the previous context is stopped before creating a new one
    spark.stop()
    sc.stop()

    t0 = time.time()

    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    # Keep only city_traffic values and at most u documents.
    # Caveat (not verified here): the Mongo Spark connector may apply the pipeline
    # to each partition, in which case $limit caps every partition, not the total.
    pipeline = "[{'$match': {'capability': '"+capability+"'}}, {'$limit': "+str(u)+"}]"

    df = (spark
        .read
        .format("com.mongodb.spark.sql.DefaultSource")
        .option("spark.mongodb.input.uri", "{0}.{1}".format(default_uri, default_collection))
        .option("pipeline", pipeline)
        .schema(sch)
        .load()
        .withColumnRenamed("nodeID", "U")
        .withColumnRenamed("tick", "T0"))

    (df
        .write
        .format("parquet")
        .mode("overwrite")
        .save("/tmp/dataprocessor-report/step2.parquet"))

    t2 = time.time()

    spark.stop()
    sc.stop()

    processing_times.append(t2 - t0)

Py4JJavaError: An error occurred while calling o376.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded

	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	...
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded


In [9]:
processing_times

[]

In [36]:
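import time

import requests

DATAPROCESSOR_URL = "http://localhost:4000"
headers = {'Content-type': 'application/vnd.api+json'}

t0 = time.time()

# Run DataProcessor processing job 5; the request only returns when the job has
# finished, so t1 - t0 covers the whole extraction
response = requests.post(DATAPROCESSOR_URL+"/api/processing_jobs/5/run", headers=headers)
print(response.text)

t1 = time.time()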

{"jsonapi":{"version":"1.0"},"included":[{"type":"job-script","id":"3","attributes":{"title":"Extract Collector","path":"collectorsource.py","language":"python","code-sample":"from pyspar\nfrom pyspar\nfrom pyspar\nimport requ\nimport os\nfrom pyspar\n\nimport sys\n\nif __name__\n    # Loadi","code":"from pyspark.sql.types import StructType, StructField, ArrayType, StringType, DoubleType, IntegerType, DateType\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.functions import explode, col\nimport requests\nimport os\nfrom pyspark import SparkContext, SparkConf\n\nimport sys\n\nif __name__ == '__main__':\n    # Loading the dataset\n    my_uuid = str(sys.argv[1])\n    os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'\n    url = \"http://localhost:4000\" + '/api/job_templates/{0}'.format(my_uuid)\n    response = requests.get(url)\n    params = response.json()[\"data\"][\"attributes\"][\"user-params\"]\n\n    functional_params = params[\"functional\"]\n    capability = params[\"inter

In [38]:
print(t1-t0)

197.69988322257996


197.70 s (DataProcessor) vs. 190.53 s (Raw-Spark)

So the overhead of sending the response from Cmd to Elixir and adding it to the response of the request looks like about 7 s (197.70 s - 190.53 s ≈ 7.17 s). It is a lot, but

### 2.2. DataProcessor Solution

For DataProcessor, our approach is different. Since DataProcessor is a web server that handles REST requests, we only show the REST requests and the time spent on them.


#### 1.3 Preparing the data

Now we use some data munging to derive data that will be useful next:
- First, we pair each data point with the next data point of the same trip (`uuid`), so that each row becomes an edge from origin `U` to destination `V`; the last point of each trip has no successor and is dropped, so trips with a single point are cut off.
- Then, we round the tick (`T0`) into 30-minute windows (column `round(T0)`) and bucket its minute into 20-minute windows (column `M`), so that, for example, data points at 08:21 and at 08:23 end up in the same window.
- To make this easier, we cast the tick to a timestamp, so that Spark handles it as a datetime.
- To speed things up, at the end we cache the data.
- To be able to run SQL queries in Spark, we create a Spark view (similar to a SQL table) with the data.
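The window logic above (pair each point with the next point of the same car, then round the entry tick) can be mirrored in plain Python to check it on a few rows. This is an illustrative sketch, not the Spark code; `WINDOW = 1800` matches the 30-minute rounding of the `round(T0)` column in the next cell, while `prepare` and the sample rows are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

WINDOW = 1800  # seconds, same 30-minute rounding as the `round(T0)` column


def prepare(points: List[Tuple[str, int, int]]) -> List[Dict[str, object]]:
    """Turn (uuid, T0, U) data points into one row per traversed edge (U, V)."""
    by_car = defaultdict(list)
    for uuid, t0, u in points:
        by_car[uuid].append((t0, u))

    rows = []
    for uuid, positions in by_car.items():
        positions.sort()  # like orderBy("T0") inside partitionBy("uuid")
        # zip pairs each point with the next one (like lead); the last point has no
        # successor and is skipped, like filter("`T1` is not null")
        for (t0, u), (t1, v) in zip(positions, positions[1:]):
            rows.append({
                "uuid": uuid, "U": u, "V": v, "T0": t0, "T1": t1,
                "(T1-T0)": t1 - t0,
                # for non-negative ticks, same as (T0/1800).cast("int")*1800
                "round(T0)": (t0 // WINDOW) * WINDOW,
            })
    return rows


rows = prepare([("car-a", 100, 1), ("car-a", 1900, 2), ("car-a", 2000, 3),
                ("car-b", 50, 7)])  # car-b has a single point, so it yields no edge
for row in rows:
    print(row)
```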

In [1]:
from pyspark.sql.functions import lead, col, hour, minute
from pyspark.sql import Window

# One partition per car (uuid), ordered by tick, so lead() returns the car's next point
w = Window.partitionBy("uuid").orderBy("T0")

leadedDf = (df
    .withColumn("T1", lead(col("T0"), 1).over(w))                 # tick at the next node
    .withColumn("(T1-T0)", col("T1") - col("T0"))                 # time spent on edge (U, V)
    .withColumn("round(T0)", (col("T0")/1800).cast("int")*1800)  # 30-minute window
    .withColumn("V", lead(col("U"), 1).over(w))                   # next node: the edge is (U, V)
    .filter("`T1` is not null")                                   # drop each car's last point
    .withColumn("TWindow", col("T0").cast("timestamp"))
    .withColumn("H", hour("TWindow"))
    .withColumn("M", ((minute("TWindow")/20).cast("int")*20))    # 20-minute bucket within the hour
    .cache())

leadedDf.createOrReplaceGlobalTempView("traffic")

AttributeError: 'NoneType' object has no attribute '_jvm'

### 2. Data Validation

As we said before, we now just want to make sure that the simulation data looks right. So we group the data into time windows and analyse the results.

We group the data with Spark instead of Pandas, since Spark can use the whole cluster to process the data, achieving better performance.

After processing the data with Spark, we turn it into a Pandas DataFrame, since Pandas has easy-to-use plot functions, while Spark does not.

In [None]:
# Number of edge traversals that started in each 30-minute window.
# Note: in SQL, round(T0, 0) would call the round function on T0 (a no-op for
# integer ticks); the window is the `round(T0)` column of the view.
trafficDf = (spark
             .table("global_temp.traffic")
             .groupby("round(T0)")
             .count())

pandasDf = trafficDf.toPandas()

In [None]:
import pandas as pd
from matplotlib import pyplot as plt

pandasDf['current_time'] = pd.to_datetime(pandasDf['round(T0)'], unit='s').dt.strftime('%H:%M')

pandasDf.sort_values('round(T0)').plot(x='current_time', y='count', kind='bar', figsize=(18, 16))
plt.savefig('foo.png')

It looks like most of the traffic happened around 7 AM and 6 PM, which makes a lot of sense. The only odd thing is that 00:00 and 00:30 appear twice. This happens because we ignored the day: one of the 00:00 bars and one of the 00:30 bars actually belong to the next day, so they are not grouped together. Next, we check the durations (i.e., the difference between ticks T0 and T1).
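One way to avoid the duplicated 00:00 and 00:30 labels is to keep the day in the label. The sketch below, in plain Python and assuming ticks are seconds counted from a day boundary (which is how `pd.to_datetime(..., unit='s')` interprets them), contrasts an hour-of-day label with one that also carries the day index; `window_label` is a hypothetical helper.

```python
WINDOW = 1800     # 30-minute windows, as in the `round(T0)` column
DAY = 24 * 3600   # seconds in a day


def window_label(tick: int, with_day: bool = True) -> str:
    """Label the 30-minute window that contains a tick (in seconds)."""
    start = (tick // WINDOW) * WINDOW
    day, seconds = divmod(start, DAY)
    hours, minutes = seconds // 3600, (seconds % 3600) // 60
    label = f"{hours:02d}:{minutes:02d}"
    return f"day {day} {label}" if with_day else label


ticks = [600, 1900, DAY + 700]
print([window_label(t, with_day=False) for t in ticks])  # ['00:00', '00:30', '00:00']
print([window_label(t) for t in ticks])  # ['day 0 00:00', 'day 0 00:30', 'day 1 00:00']
```

In the pandas cell, a similar effect could be obtained with a `strftime` format that includes the day, such as `'%d %H:%M'`.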

In [None]:
import pyspark.sql.functions as F
import seaborn as sns

In [None]:
# One row per car (uuid): total trip duration (Tf), average time per edge,
# and the 30-minute window in which the trip started (rounded_T0)
ticksDiffDf = (spark
    .table("global_temp.traffic")
    .groupby("uuid")
    .agg(F.sum(F.col("(T1-T0)")).alias("Tf"),
         F.avg(F.col("(T1-T0)")).alias("avg((T1-T0))"),
         F.min(F.col("round(T0)")).alias("rounded_T0")))

In [None]:
# Longest trip duration among the trips that started in each 30-minute window
newDf = (ticksDiffDf
    .groupby("rounded_T0")
    .agg(F.max(F.col("Tf")).alias("max(Tf)")))

newDf

In [None]:
pdTicksDf = ticksDiffDf.toPandas()
pdWindowsDf = newDf.toPandas()

pdWindowsDf['current_time'] = pd.to_datetime(pdWindowsDf['rounded_T0'], unit='s').dt.strftime('%H:%M')

In [None]:
pdWindowsDf.sort_values('rounded_T0').plot(x='current_time', y='max(Tf)', kind='bar', figsize=(18, 16))
plt.savefig('foo.png')

In [None]:
pdTicksDf

In [None]:
# Distribution of the average time per edge of each trip
sns.histplot(pdTicksDf["avg((T1-T0))"], kde=True)

In [None]:
spark.sql("select * from global_temp.traffic").show(3)

# 3. Defining which edges to remove from the city graph

We will just count the most used edges and remove them from our graph. First,

In [None]:
from pyspark.sql.functions import sum, count, mean

df = (spark
      .sql("select U,V,(T1 - T0) from global_temp.traffic")
      .groupby("U", "V")
      .agg(count(col("(T1 - T0)")), mean(col("(T1 - T0)")))
      .orderBy(col("count((T1 - T0))").desc())
      .limit(100))

In [None]:
pdDf = df.toPandas()

In [None]:
# Each edge as a (U, V) tuple
pdDf["coords"] = list(zip(pdDf["U"], pdDf["V"]))

In [None]:
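import matplotlib.pyplot as plt
import seaborn as sns

# Distribution of the mean traversal time of the 100 most used edges
fig, ax = plt.subplots(figsize=(18, 16))
f = sns.histplot(pdDf['avg((T1 - T0))'], kde=True, ax=ax)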

Although the edges that we are going to remove have a high vehicle concentration, they have a decent flow (about 30 s to traverse an edge). We take this into consideration in the final analysis.

# 4. Model Training
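The ultra-simple model of step 5 can be sketched in a few lines before looking at the Spark version. This is a minimal plain-Python illustration, assuming the model is just the pair (mean, stddev) of the time cars take to cross an edge, with a population standard deviation (matching `stddev_pop` in the cell below); the helper names and the JSON file format are hypothetical, not DataProcessor's actual model format.

```python
import json
import os
import statistics
import tempfile
from typing import Dict, List


def train(durations: List[float]) -> Dict[str, float]:
    """Fit the model: the mean and the population standard deviation of the durations."""
    return {"mean": statistics.mean(durations), "stddev": statistics.pstdev(durations)}


def is_anomalous(model: Dict[str, float], duration: float) -> bool:
    """Anomalous means outside the closed range [mean - stddev, mean + stddev]."""
    low = model["mean"] - model["stddev"]
    high = model["mean"] + model["stddev"]
    return not (low <= duration <= high)


def save_model(model: Dict[str, float], path: str) -> None:
    """Persist the model as a small JSON file."""
    with open(path, "w") as f:
        json.dump(model, f)


def load_model(path: str) -> Dict[str, float]:
    """Reopen a model saved with save_model."""
    with open(path) as f:
        return json.load(f)


model = train([20.0, 30.0, 40.0])  # mean 30.0, stddev about 8.165
print(is_anomalous(model, 35.0))   # False
print(is_anomalous(model, 60.0))   # True

path = os.path.join(tempfile.mkdtemp(), "model.json")
save_model(model, path)
print(load_model(path) == model)   # True
```

In the Spark cell below, the same statistics are computed per edge (U, V) and per time window, so each group gets its own range.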

In [None]:
from pyspark.sql.functions import stddev_pop, mean

df = (spark
      .sql("select U,V,(T1 - T0),round(T0, 0) from global_temp.traffic")
      .groupby("U", "V", "round(T0, 0)")
      .agg(mean("(T1 - T0)"), stddev_pop("(T1 - T0)"))
      )

In [None]:
df.orderBy(col("stddev_pop((T1 - T0))").desc()).show(100, False)

In [None]:
df.select("stddev_pop((T1 - T0))").describe().show()