# Spark Structured Streaming on Africa's Political Conflicts Dataset

### You should run the Kafka publisher .py file to stream data into the topic "conflicts"
#### this notebook should be run by pyspark using this command :
PYSPARK_PYTHON=python3 $SPARK_HOME/bin/pyspark --jars spark-sql-kafka-0-10_2.12-3.0.0.jar,spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar,kafka-clients-0.10.2.2.jar,commons-pool2-2.8.0.jar

In [28]:
#This commented code was used to devide the dataset into parts before switching to Kafka
#import pandas as pd

#in_csv = 'conflicts.csv'
#number_lines = sum(1 for row in (open(in_csv)))
#rowsize = 5000
#for i in range(1,number_lines,rowsize):
#    df = pd.read_csv(in_csv,
#          header=None,
#          nrows = rowsize,#number of rows to read at each loop
#          skiprows = i)
#    out_csv = 'conflicts' + str(i) + '.csv'
#    df.to_csv(out_csv,
#          index=False,
#          header=False,
#          mode='a',#append data to csv file
#          chunksize=rowsize)#size of data to append for each loop

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

#Create the session
spark = SparkSession \
    .builder \
    .appName("ConflictsApp") \
    .getOrCreate()

In [29]:
#Read the stream from kafka topic conflicts 
dfraw = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "conflicts") \
  .option("startingOffsets", "earliest") \
  .load()
dfraw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [30]:
#Create the schema for our data
schema = StructType([StructField("", StringType(),True),
                         StructField("ISO", StringType(),True),
                         StructField("EVENT_DATE", StringType(),True),
                         StructField("EVENT_TYPE", StringType(),True),
                         StructField("SUB_EVENT_TYPE", StringType(),True),
                         StructField("ACTOR1", StringType(),True),
                         StructField("ASSOC_ACTOR_1", StringType(),True),
                         StructField("ACTOR2", StringType(),True),
                         StructField("ASSOC_ACTOR_2", StringType(),True),
                         StructField("INTERACTION", StringType(),True),
                         StructField("REGION", StringType(),True),
                         StructField("COUNTRY", StringType(),True),
                         StructField("ADMIN1", StringType(),True),
                         StructField("ADMIN2", StringType(),True),
                         StructField("LOCATION", StringType(),True),
                         StructField("SOURCE", StringType(),True),
                         StructField("NOTES", StringType(),True),
                         StructField("FATALITIES", StringType(),True)
                    ])

#convert the received data as defined by our schema and keeping the timestamp
df = dfraw.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") \
  .select(from_json("value", schema).alias("data"), "timestamp") \
  .select("data.*", "timestamp")
df.printSchema()

root
 |-- : string (nullable = true)
 |-- ISO: string (nullable = true)
 |-- EVENT_DATE: string (nullable = true)
 |-- EVENT_TYPE: string (nullable = true)
 |-- SUB_EVENT_TYPE: string (nullable = true)
 |-- ACTOR1: string (nullable = true)
 |-- ASSOC_ACTOR_1: string (nullable = true)
 |-- ACTOR2: string (nullable = true)
 |-- ASSOC_ACTOR_2: string (nullable = true)
 |-- INTERACTION: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- ADMIN1: string (nullable = true)
 |-- ADMIN2: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- SOURCE: string (nullable = true)
 |-- NOTES: string (nullable = true)
 |-- FATALITIES: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [31]:
#visualize the data recieved
df.writeStream.format("console").outputMode("append").start()

<pyspark.sql.streaming.StreamingQuery at 0x7f9c46b7b340>

### Queries

In [43]:
#ds = df.toDF(*df.columns)
#Converting FATALITIES to Integer and EVENT_DATE to timestamp type
ds = df.withColumn("FATALITIES", df["FATALITIES"].cast(IntegerType()))
ds = df.withColumn("EVENT_DATE", to_timestamp(df["EVENT_DATE"],"yyyy-MM-dd"))
to_timestamp
ds.printSchema()

root
 |-- : string (nullable = true)
 |-- ISO: string (nullable = true)
 |-- EVENT_DATE: timestamp (nullable = true)
 |-- EVENT_TYPE: string (nullable = true)
 |-- SUB_EVENT_TYPE: string (nullable = true)
 |-- ACTOR1: string (nullable = true)
 |-- ASSOC_ACTOR_1: string (nullable = true)
 |-- ACTOR2: string (nullable = true)
 |-- ASSOC_ACTOR_2: string (nullable = true)
 |-- INTERACTION: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- ADMIN1: string (nullable = true)
 |-- ADMIN2: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- SOURCE: string (nullable = true)
 |-- NOTES: string (nullable = true)
 |-- FATALITIES: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [20]:
query = ds.groupBy("REGION").count()
query \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x7f9c46ba8a90>

In [35]:
query = ds.groupBy("INTERACTION").sum("FATALITIES")
query \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x7f9c46c43940>

In [47]:
query = ds.select(date_format('EVENT_DATE','yyyy-MM').alias('month')).filter(col('INTERACTION').contains("2")).groupBy("month").count()
query \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x7f9c46b8e100>

In [49]:
query = ds.filter(col('INTERACTION').contains("3")).groupBy(year('EVENT_DATE')).count()
query \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x7f9c46cbe700>

### Windowing

In [13]:
windowedCounts = ds \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(ds.timestamp, "10 minutes", "5 minutes"),
        ds.ISO) \
    .count()

In [14]:
windowedCounts.writeStream.format("console").outputMode("update").start()

<pyspark.sql.streaming.StreamingQuery at 0x7fdb3d82d760>