# kafkaReceiveDataPy
This notebook receives data from Kafka on the topic 'test', and stores it in the `sent_received` table of the `test_time` keyspace in Cassandra (created by cassandra_init.script in startup_script.sh).

```
CREATE KEYSPACE test_time WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE test_time.sent_received(
  time_sent TEXT,
  time_received TEXT,
  PRIMARY KEY (time_sent)
);
```

A message that gives the current time is received every second. 

## Add dependencies

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'
import time

## Load modules and start SparkContext
Note that SparkContext must be started to effectively load the package dependencies. Two cores are used, since one is needed for running the Kafka receiver.

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
conf = SparkConf() \
    .setAppName("Streaming test") \
    .setMaster("local[2]") \
    .set("spark.cassandra.connection.host", "127.0.0.1")
sc = SparkContext(conf=conf) 
sqlContext=SQLContext(sc)
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

## SaveToCassandra function
Takes an RDD of rows and saves it to Cassandra. Empty RDDs are skipped.

In [3]:
def saveToCassandra(rows):
    if not rows.isEmpty(): 
        sqlContext.createDataFrame(rows).write\
        .format("org.apache.spark.sql.cassandra")\
        .mode('append')\
        .options(table="sent_received", keyspace="test_time")\
        .save()

## Create streaming task
* Receive data from Kafka 'test' topic every five seconds
* Get stream content, and add receiving time to each message
* Save each RDD in the DStream to Cassandra, and also print it to the screen

In [4]:
ssc = StreamingContext(sc, 5)
kvs = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {'test': 1})
data = kvs.map(lambda x: x[1])
rows= data.map(lambda x:Row(time_sent=x,time_received=time.strftime("%Y-%m-%d %H:%M:%S")))
rows.foreachRDD(saveToCassandra)
rows.pprint()
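
The per-message transformation in the cell above can be illustrated without Spark. The following is a minimal plain-Python sketch, assuming each Kafka message arrives as a `(key, value)` tuple like the elements of `kvs`. The helper name `to_record`, its `received_at` parameter, and the sample message are hypothetical and only mirror the two `map` steps.

```python
import time

def to_record(message, received_at=None):
    """Mirror the two map() steps: keep the message value, then stamp it.

    `message` is assumed to be a (key, value) tuple, like the elements of `kvs`.
    `received_at` is a hypothetical override that makes the example reproducible.
    """
    value = message[1]  # same idea as: kvs.map(lambda x: x[1])
    if received_at is None:
        received_at = time.strftime("%Y-%m-%d %H:%M:%S")
    return {"time_sent": value, "time_received": received_at}

# Hypothetical message with no key, using timestamps from the sample output.
record = to_record((None, "2018-12-08 06:38:45"), received_at="2018-12-08 08:22:37")
print(record)
```

Note that the receive time is taken when the batch is processed, not when the message reaches Kafka, which is why every row in a batch carries the same `time_received`.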

## Start streaming

In [5]:
ssc.start()

-------------------------------------------
Time: 2018-12-08 08:22:30
-------------------------------------------
Row(time_received='2018-12-08 08:22:37', time_sent=u'2018-12-08 06:38:45')
Row(time_received='2018-12-08 08:22:37', time_sent=u'2018-12-08 06:38:46')
Row(time_received='2018-12-08 08:22:37', time_sent=u'2018-12-08 06:38:47')
Row(time_received='2018-12-08 08:22:37', time_sent=u'2018-12-08 06:38:48')
Row(time_received='2018-12-08 08:22:37', time_sent=u'2018-12-08 06:38:49')
Row(time_received='2018-12-08 08:22:37', time_sent=u'2018-12-08 06:38:50')
Row(time_received='2018-12-08 08:22:37', time_sent=u'2018-12-08 06:38:51')
Row(time_received='2018-12-08 08:22:37', time_sent=u'2018-12-08 06:38:52')
Row(time_received='2018-12-08 08:22:37', time_sent=u'2018-12-08 06:38:53')
Row(time_received='2018-12-08 08:22:37', time_sent=u'2018-12-08 06:38:54')
...

-------------------------------------------
Time: 2018-12-08 08:22:35
-------------------------------------------
Row(time_received

## Stop streaming

In [6]:
ssc.stop(stopSparkContext=False,stopGraceFully=True)

-------------------------------------------
Time: 2018-12-08 08:23:00
-------------------------------------------
Row(time_received='2018-12-08 08:23:00', time_sent=u'2018-12-08 08:22:55')
Row(time_received='2018-12-08 08:23:00', time_sent=u'2018-12-08 08:22:56')
Row(time_received='2018-12-08 08:23:00', time_sent=u'2018-12-08 08:22:57')
Row(time_received='2018-12-08 08:23:00', time_sent=u'2018-12-08 08:22:58')

-------------------------------------------
Time: 2018-12-08 08:23:05
-------------------------------------------

-------------------------------------------
Time: 2018-12-08 08:23:10
-------------------------------------------



## Get Cassandra table content

In [7]:
data=sqlContext.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="sent_received", keyspace="test_time")\
    .load()
data.show()

+-------------------+-------------------+
|          time_sent|      time_received|
+-------------------+-------------------+
|2018-12-08 07:30:53|2018-12-08 08:22:33|
|2018-12-08 08:16:44|2018-12-08 08:22:33|
|2018-12-08 08:16:18|2018-12-08 08:22:33|
|2018-12-08 08:15:36|2018-12-08 08:22:33|
|2018-12-08 07:17:39|2018-12-08 08:22:33|
|2018-12-08 07:15:50|2018-12-08 08:22:33|
|2018-12-08 07:07:31|2018-12-08 08:22:33|
|2018-12-08 07:35:54|2018-12-08 08:22:33|
|2018-12-08 07:29:54|2018-12-08 08:22:33|
|2018-12-08 07:57:38|2018-12-08 08:22:33|
|2018-12-08 08:00:06|2018-12-08 08:22:33|
|2018-12-08 06:46:27|2018-12-08 08:22:33|
|2018-12-08 07:29:24|2018-12-08 08:22:33|
|2018-12-08 06:49:03|2018-12-08 08:22:33|
|2018-12-08 07:40:23|2018-12-08 08:22:33|
|2018-12-08 07:45:06|2018-12-08 08:22:33|
|2018-12-08 08:22:21|2018-12-08 08:22:33|
|2018-12-08 07:43:29|2018-12-08 08:22:33|
|2018-12-08 07:26:38|2018-12-08 08:22:33|
|2018-12-08 06:44:04|2018-12-08 08:22:33|
+-------------------+-------------

## Get Cassandra table content using SQL

In [8]:
data.registerTempTable("sent_received")
data.printSchema()
data=sqlContext.sql("select * from sent_received")
data.show()

root
 |-- time_sent: string (nullable = true)
 |-- time_received: string (nullable = true)

+-------------------+-------------------+
|          time_sent|      time_received|
+-------------------+-------------------+
|2018-12-08 07:30:53|2018-12-08 08:22:33|
|2018-12-08 08:16:44|2018-12-08 08:22:33|
|2018-12-08 08:16:18|2018-12-08 08:22:33|
|2018-12-08 08:15:36|2018-12-08 08:22:33|
|2018-12-08 07:17:39|2018-12-08 08:22:33|
|2018-12-08 07:15:50|2018-12-08 08:22:33|
|2018-12-08 07:07:31|2018-12-08 08:22:33|
|2018-12-08 07:35:54|2018-12-08 08:22:33|
|2018-12-08 07:29:54|2018-12-08 08:22:33|
|2018-12-08 07:57:38|2018-12-08 08:22:33|
|2018-12-08 08:00:06|2018-12-08 08:22:33|
|2018-12-08 06:46:27|2018-12-08 08:22:33|
|2018-12-08 07:29:24|2018-12-08 08:22:33|
|2018-12-08 06:49:03|2018-12-08 08:22:33|
|2018-12-08 07:40:23|2018-12-08 08:22:33|
|2018-12-08 07:45:06|2018-12-08 08:22:33|
|2018-12-08 08:22:21|2018-12-08 08:22:33|
|2018-12-08 07:43:29|2018-12-08 08:22:33|
|2018-12-08 07:26:38|2018-

# Lesson - Processing Streamed Data With PySpark

Now that the data has been loaded into a Spark DataFrame, we can begin to process it.

Dates, times, and timestamps are notoriously difficult to work with in Python, even within Pandas! However, working with them in Spark proves to be fairly easy.

First, we'll import `pyspark.sql.functions`. This module gives you access to common SQL functions within PySpark, which are useful for processing data beyond what's shown below.

We'll first define `split_time_sent`, which splits the `time_sent` column at the whitespace between date and time.

In [9]:
import pyspark.sql.functions as f
split_time_sent = f.split(data['time_sent'], ' ')

After defining `split_time_sent`, we can separate the date and time.

In [10]:
df_stream = data.withColumn('date_time_sent', split_time_sent.getItem(0))
df_stream = df_stream.withColumn('time_time_sent', split_time_sent.getItem(1))

We'll now perform the same operation on the `time_received` column in the DataFrame.

In [11]:
split_time = f.split(df_stream['time_received'], ' ')

df_stream = df_stream.withColumn('date_time_received', split_time.getItem(0))
df_stream = df_stream.withColumn('time_time_received', split_time.getItem(1))

In [12]:
df_stream.show()

+-------------------+-------------------+--------------+--------------+------------------+------------------+
|          time_sent|      time_received|date_time_sent|time_time_sent|date_time_received|time_time_received|
+-------------------+-------------------+--------------+--------------+------------------+------------------+
|2018-12-08 07:30:53|2018-12-08 08:22:33|    2018-12-08|      07:30:53|        2018-12-08|          08:22:33|
|2018-12-08 08:16:44|2018-12-08 08:22:33|    2018-12-08|      08:16:44|        2018-12-08|          08:22:33|
|2018-12-08 08:16:18|2018-12-08 08:22:33|    2018-12-08|      08:16:18|        2018-12-08|          08:22:33|
|2018-12-08 08:15:36|2018-12-08 08:22:33|    2018-12-08|      08:15:36|        2018-12-08|          08:22:33|
|2018-12-08 07:17:39|2018-12-08 08:22:33|    2018-12-08|      07:17:39|        2018-12-08|          08:22:33|
|2018-12-08 07:15:50|2018-12-08 08:22:33|    2018-12-08|      07:15:50|        2018-12-08|          08:22:33|
|2018-12-0

Print the schema to look at the data types of `df_stream`.

In [13]:
df_stream.printSchema()

root
 |-- time_sent: string (nullable = true)
 |-- time_received: string (nullable = true)
 |-- date_time_sent: string (nullable = true)
 |-- time_time_sent: string (nullable = true)
 |-- date_time_received: string (nullable = true)
 |-- time_time_received: string (nullable = true)



Because the values received from Kafka are strings, every column so far has the `string` type, and we'll need to convert the columns to a relevant data type.

First, convert the original `time_sent` and `time_received` columns from `string` to `timestamp`. 


In [14]:
# The strings already use the "yyyy-MM-dd HH:mm:ss" layout, so a cast is enough to parse them.
df_stream = df_stream.withColumn("time_sent", df_stream["time_sent"].cast("timestamp"))
df_stream = df_stream.withColumn("time_received", df_stream["time_received"].cast("timestamp"))
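
For comparison, the same strings can be parsed in plain Python. This is only a sketch of the parsing logic, not part of the Spark pipeline. It assumes the year-month-day hour:minute:second layout seen in the table output above. The `delay_seconds` helper is a hypothetical name, introduced here to show what becomes possible once both values are real timestamps.

```python
from datetime import datetime

# Layout of the strings stored in Cassandra, e.g. "2018-12-08 08:22:21".
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

def delay_seconds(time_sent, time_received):
    """Return the number of seconds between sending and receiving a message."""
    sent = datetime.strptime(time_sent, TIME_FORMAT)
    received = datetime.strptime(time_received, TIME_FORMAT)
    return (received - sent).total_seconds()

# Values taken from one row of the table output above.
print(delay_seconds("2018-12-08 08:22:21", "2018-12-08 08:22:33"))  # 12.0
```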

Print the schema, and show the data again. Notice that `time_sent` and `time_received` are now converted to a `timestamp` data type.

In [15]:
df_stream.printSchema()
df_stream.show()

root
 |-- time_sent: timestamp (nullable = true)
 |-- time_received: timestamp (nullable = true)
 |-- date_time_sent: string (nullable = true)
 |-- time_time_sent: string (nullable = true)
 |-- date_time_received: string (nullable = true)
 |-- time_time_received: string (nullable = true)

+--------------------+--------------------+--------------+--------------+------------------+------------------+
|           time_sent|       time_received|date_time_sent|time_time_sent|date_time_received|time_time_received|
+--------------------+--------------------+--------------+--------------+------------------+------------------+
|2018-12-08 07:30:...|2018-12-08 08:22:...|    2018-12-08|      07:30:53|        2018-12-08|          08:22:33|
|2018-12-08 08:16:...|2018-12-08 08:22:...|    2018-12-08|      08:16:44|        2018-12-08|          08:22:33|
|2018-12-08 08:16:...|2018-12-08 08:22:...|    2018-12-08|      08:16:18|        2018-12-08|          08:22:33|
|2018-12-08 08:15:...|2018-12-08 08:22

Now that the timestamps are processed, we'll combine the data with an existing dataset, then convert the Spark DataFrame to Pandas and save it as JSON.

## Combine Spark DataFrame With Existing CSV 

You'll often be joining data to existing sources. This example shows how to join an existing CSV with a Spark DataFrame, process the data, and save to JSON.

Import the `titanic_train.csv` data into a Spark DataFrame.

In [20]:
df_titanic = sqlContext.read.csv("titanic_train.csv", header=True, inferSchema=True).limit(df_stream.count())
type(df_titanic)

pyspark.sql.dataframe.DataFrame

Review the data with the `show()` method.

In [21]:
df_titanic.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

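In order to combine this data, we're going to create a new column of generated IDs called `PassengerId` within the `df_stream` DataFrame. Note that `monotonically_increasing_id()` guarantees unique, increasing values, but not consecutive ones: the IDs start at 0 and are consecutive only within a single partition.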

After creating the column, we'll convert the data type from `long` to `int`.

In [22]:
from pyspark.sql.types import IntegerType

df_stream = df_stream.withColumn("PassengerId", f.monotonically_increasing_id())

df_stream = df_stream.withColumn("PassengerId", df_stream["PassengerId"].cast(IntegerType()))
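
A caveat worth checking before relying on these IDs as join keys: Spark documents that `monotonically_increasing_id()` places the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The plain-Python sketch below reproduces that arithmetic (the function name `sketch_monotonic_id` is hypothetical). It shows that from the second partition on, the IDs no longer fit in a 32-bit integer.

```python
INT32_MAX = 2**31 - 1

def sketch_monotonic_id(partition_index, row_in_partition):
    """Illustrative only: the documented bit layout of the generated IDs."""
    return (partition_index << 33) + row_in_partition

# First partition: IDs are 0, 1, 2, ...
print([sketch_monotonic_id(0, i) for i in range(3)])  # [0, 1, 2]

# Second partition: IDs jump to 2**33 and beyond.
first_id_second_partition = sketch_monotonic_id(1, 0)
print(first_id_second_partition)              # 8589934592
print(first_id_second_partition > INT32_MAX)  # True
```

If the DataFrame has more than one partition, a consecutive 1-based key is safer. One common option is `row_number()` over a window, at the cost of moving all rows to a single partition.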

Combine the data into a new Spark DataFrame, `df_combined`, using the `join` method.

Then, drop the duplicated `PassengerId` column, using the `drop` method.

In [23]:
df_combined = df_stream.join(df_titanic, df_stream.PassengerId == df_titanic.PassengerId).drop(df_stream.PassengerId)
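
By default, `join` performs an inner join, so only rows whose `PassengerId` appears in both DataFrames are kept. Here is a minimal plain-Python sketch of that behavior. The sample rows and the `inner_join` helper are hypothetical and are not taken from the real data.

```python
def inner_join(left_rows, right_rows, key):
    """Join two lists of dicts on `key`, keeping only keys present in both."""
    right_by_key = {}
    for row in right_rows:
        right_by_key.setdefault(row[key], []).append(row)
    joined = []
    for left in left_rows:
        for right in right_by_key.get(left[key], []):
            merged = dict(left)
            merged.update(right)  # the shared key column appears only once
            joined.append(merged)
    return joined

# Made-up rows: ID 0 exists only on the left, ID 2 only on the right.
stream_rows = [
    {"PassengerId": 0, "time_time_sent": "07:30:53"},
    {"PassengerId": 1, "time_time_sent": "08:16:44"},
]
titanic_rows = [
    {"PassengerId": 1, "Survived": 0},
    {"PassengerId": 2, "Survived": 1},
]

combined = inner_join(stream_rows, titanic_rows, "PassengerId")
print(combined)
```

Only the row with `PassengerId` 1 survives, which is why it is worth comparing `df_combined.count()` with the row counts of the two inputs.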

In [24]:
df_combined.printSchema()
print("Total Combined Rows: ", df_combined.count())

root
 |-- time_sent: timestamp (nullable = true)
 |-- time_received: timestamp (nullable = true)
 |-- date_time_sent: string (nullable = true)
 |-- time_time_sent: string (nullable = true)
 |-- date_time_received: string (nullable = true)
 |-- time_time_received: string (nullable = true)
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

('Total Combined Rows: ', 4584)


We can also drop the original `time_sent` and `time_received` columns, since they're redundant with the date and time columns we created above.

In [25]:
drop_list = ['time_sent', 'time_received']

df_combined = df_combined.select([column for column in df_combined.columns if column not in drop_list])

Review the DataFrame schema to make sure the columns have been dropped.

In [26]:
df_combined.printSchema()

root
 |-- date_time_sent: string (nullable = true)
 |-- time_time_sent: string (nullable = true)
 |-- date_time_received: string (nullable = true)
 |-- time_time_received: string (nullable = true)
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



Review the `Age` and `Cabin` variables. Some have a value of `null`, meaning they're missing.

In Spark, we'll _impute_ the missing values.

In [27]:
df_combined.select("PassengerId", "Age", "Cabin").show()

+-----------+----+-----+
|PassengerId| Age|Cabin|
+-----------+----+-----+
|          1|22.0| null|
|          2|38.0|  C85|
|          3|26.0| null|
|          4|35.0| C123|
|          5|35.0| null|
|          6|null| null|
|          7|54.0|  E46|
|          8| 2.0| null|
|          9|27.0| null|
|         10|14.0| null|
|         11| 4.0|   G6|
|         12|58.0| C103|
|         13|20.0| null|
|         14|39.0| null|
|         15|14.0| null|
|         16|55.0| null|
|         17| 2.0| null|
|         18|null| null|
|         19|31.0| null|
|         20|null| null|
+-----------+----+-----+
only showing top 20 rows



First, we'll get the average age of all passengers. But we'll make sure to exclude those who don't have an age, to avoid skewing the average.

We'll also fill the `Cabin` column with the value `__NA__`, showing there was no value given in the dataset.

In [28]:
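from pyspark.sql.functions import mean, col

# Filter out rows with a missing Age before aggregating (mean() also skips nulls on its own).
avg_age = df_combined.where(col("Age").isNotNull()).select(mean("Age").alias("Age"))
# collect() returns a list of Row objects; take the single aggregated value.
avg_age = avg_age.collect()[0]["Age"]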
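
The imputation logic itself is small enough to sketch in plain Python. The example below uses a handful of ages similar to those in the table above, with `None` marking a missing value. The helper names `mean_ignoring_missing` and `fill_missing` are hypothetical and only illustrate the idea.

```python
def mean_ignoring_missing(values):
    """Average of the values that are present (None marks a missing value)."""
    present = [v for v in values if v is not None]
    return sum(present) / float(len(present))

def fill_missing(values, fill_value):
    """Replace every missing value with fill_value."""
    return [fill_value if v is None else v for v in values]

ages = [22.0, 38.0, 26.0, 35.0, 35.0, None]
avg = mean_ignoring_missing(ages)
print(avg)  # 31.2

filled = fill_missing(ages, round(avg))
print(filled)
```

Counting the missing entry as 0 would instead give 156 / 6 = 26.0, which is the skew the text warns about.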

Perform a sanity check to make sure the values are filled in correctly.

In [29]:
df_combined.select("PassengerId", "Age", "Cabin").fillna({'Age': round(avg_age), 'Cabin':"__NA__"}).show()

+-----------+----+------+
|PassengerId| Age| Cabin|
+-----------+----+------+
|          1|22.0|__NA__|
|          2|38.0|   C85|
|          3|26.0|__NA__|
|          4|35.0|  C123|
|          5|35.0|__NA__|
|          6|30.0|__NA__|
|          7|54.0|   E46|
|          8| 2.0|__NA__|
|          9|27.0|__NA__|
|         10|14.0|__NA__|
|         11| 4.0|    G6|
|         12|58.0|  C103|
|         13|20.0|__NA__|
|         14|39.0|__NA__|
|         15|14.0|__NA__|
|         16|55.0|__NA__|
|         17| 2.0|__NA__|
|         18|30.0|__NA__|
|         19|31.0|__NA__|
|         20|30.0|__NA__|
+-----------+----+------+
only showing top 20 rows



Now, fill in the values within the `df_combined` DataFrame.

In [30]:
df_combined = df_combined.fillna({'Age': round(avg_age), 'Cabin':"__NA__"})

## Saving to JSON 

After processing our data, we'll want to save the data to a JSON file. 

However, if you try to save the DataFrame as JSON directly from Spark (for example with `df_combined.write.json(...)`, or by calling `toJSON` and saving the resulting RDD), you'll find that the output is a directory containing a collection of part files.

Because we're using Spark, our data is spread across multiple partitions that are computed in parallel, and each partition is written as its own part file in that directory. Larger datasets mean more files. With Spark, you usually don't want to save locally; at the end of processing, you'll almost always send the data to a database, or to a storage service like Amazon S3.

However, there is a way to get around this for data small enough to fit in the driver's memory: converting a Spark DataFrame to a Pandas DataFrame, then saving as JSON!

In [31]:
pd_combined = df_combined.toPandas()

After converting to Pandas, we'll save the file to JSON on our local computer.

In [32]:
pd_combined.to_json("spark_titanic.json", orient="records", lines=True)
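
With `orient="records"` and `lines=True`, pandas writes one JSON object per line (the JSON Lines layout). The sketch below reproduces that layout with the standard library only, using two made-up records, to show what the file content looks like and how to read it back. It is an illustration, not a replacement for `to_json`.

```python
import json

# Hypothetical records standing in for rows of pd_combined.
records = [
    {"PassengerId": 1, "Age": 22.0, "Cabin": "__NA__"},
    {"PassengerId": 2, "Age": 38.0, "Cabin": "C85"},
]

# One JSON object per line, as to_json(orient="records", lines=True) produces.
text = "\n".join(json.dumps(record) for record in records) + "\n"
print(text)

# Read the lines back into Python dicts.
loaded = [json.loads(line) for line in text.splitlines()]
print(loaded == records)  # True
```

Because each line is a complete JSON document, the file can be processed line by line without loading it all at once.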