### Spark Streaming
The following code must be run with the PySpark kernel of a notebook attached to our EMR cluster.

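#### First, configure the Spark application
The Kafka connector is added through `spark.jars.packages`, which takes Maven coordinates in the form `groupId:artifactId:version`. The Scala (`_2.12`) and Spark (`3.1.2`) versions in the coordinates must match those of the cluster.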

In [None]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2",
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
        "spark.sql.streaming.checkpointLocation": "/tmp/"
    }
}

We load the records from our Kafka topic `views` into a streaming DataFrame.

In [None]:
import pyspark.sql.functions as f

In [None]:
# Kafka broker(s) to read from: replace with the public DNS of your Kafka server.
KAFKA_BOOTSTRAP_SERVERS = 'ec2-3-95-18-154.compute-1.amazonaws.com:9092'

# Subscribe to the `views` topic as an unbounded streaming DataFrame; the
# message payload (`value` column) is decoded further below.
df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", "views")
    .load()
)

We now define the schema of the JSON messages and use it to parse our stream.

In [None]:
from pyspark.sql.types import *

# Schema of the JSON document carried in each Kafka message's `value`.
# NOTE(review): `timestamp` is kept as a plain string; confirm the producer's format.
pythonSchema = StructType([
    StructField('comment_id', StringType()),
    StructField('user', StringType()),
    StructField('timestamp', StringType()),
    StructField('device', StringType()),
    StructField('geo', StringType()),
    StructField('minutes', DoubleType()),
])

In [None]:
# Cast each Kafka record's `value` to a string, parse it as JSON with
# `pythonSchema`, then flatten the parsed struct into top-level columns.
df_views = (
    df
    .select(f.from_json(f.col('value').cast('string'), pythonSchema).alias('views'))
    .select('views.*')
)

To inspect the streaming data interactively, we start a streaming query that writes the parsed rows to an in-memory table named `views`.

In [None]:
# Start a streaming query that keeps the parsed rows in an in-memory table
# registered as 'views', so it can be queried with spark.table('views').
stream = (
    df_views.writeStream
    .format('memory')
    .queryName('views')
    .start()
)

In [None]:
# True while the streaming query currently bound to `stream` is running.
stream.isActive

In [None]:
# Print the rows collected so far in the in-memory 'views' table
# (presumably empty until the first micro-batch has completed).
spark.table('views').show()

While the stream is running, we can also update our MongoDB collection for each row of the stream.

In [None]:
# Notebook-scoped libraries for the MongoDB writer: pymongo is the driver, and
# dnspython lets pymongo resolve `mongodb+srv://` connection strings.
for package in ('pymongo', 'dnspython'):
    sc.install_pypi_package(package)

In [None]:
import os

# MongoDB Atlas credentials used by SendToMongoDB_ForeachWriter.
# SECURITY: never hard-code the password in the notebook; any password that was
# committed with this notebook should be rotated in Atlas. It is read from the
# Spark driver's MONGODB_PASSWORD environment variable instead
# (e.g. set in the cluster's spark-env configuration; TODO confirm for your EMR setup).
username = 'aalferea91'
password = os.environ.get('MONGODB_PASSWORD')
if not password:
    raise RuntimeError(
        'Set the MONGODB_PASSWORD environment variable before connecting to MongoDB.'
    )

In [None]:
class SendToMongoDB_ForeachWriter:
    """Structured Streaming foreach writer that counts views per comment in MongoDB.

    For every row it upserts a document in ``social.comments`` whose ``_id`` is
    the row's ``comment_id`` and increments that document's ``views`` counter.

    Spark calls ``open`` per partition and epoch, ``process`` for each row, and
    ``close`` at the end, passing any error that occurred.

    The notebook-level ``username`` and ``password`` variables are used to build
    the MongoDB Atlas connection string.

    NOTE: foreach sinks are at-least-once. If Spark re-executes an epoch, the
    ``$inc`` is applied again, so counts may be over-reported after failures.
    """

    def open(self, partition_id, epoch_id):
        """Connect to MongoDB for this partition/epoch.

        Returns True so that Spark processes the partition's rows.
        """
        # Imported here because this method runs on the executors.
        from urllib.parse import quote_plus

        import pymongo

        # Username and password must be percent-escaped inside a MongoDB URI.
        uri = (
            f'mongodb+srv://{quote_plus(username)}:{quote_plus(password)}'
            '@lambdaprojectcluster.epdco1f.mongodb.net/?retryWrites=true&w=majority'
        )
        # The URI is MongoClient's `host` argument (passed positionally).
        self.client = pymongo.MongoClient(uri)
        self.comments_col = self.client.social.comments
        return True

    def process(self, row):
        """Increment the ``views`` counter of the comment referenced by ``row``."""
        comment_id = row['comment_id']
        # A null id (e.g. a message whose JSON failed to parse) would otherwise
        # be counted under a bogus '_id': 'None' document.
        if comment_id is None:
            return
        self.comments_col.update_one(
            {'_id': str(comment_id)},
            {'$inc': {'views': 1}},
            upsert=True,
        )

    def close(self, err):
        """Release the MongoDB client, then re-raise any error reported by Spark."""
        # `open` may not have completed, so the client is not guaranteed to exist.
        client = getattr(self, 'client', None)
        try:
            if client is not None:
                client.close()
        finally:
            if err:
                raise err

In [None]:
# Send every parsed view to MongoDB through the foreach writer.
# A stable queryName gives the query its own checkpoint sub-directory under
# spark.sql.streaming.checkpointLocation (instead of a random one per start),
# so a restart resumes from the last committed Kafka offsets.
# NOTE: this rebinds `stream`; the memory-sink query keeps running and is still
# reachable through spark.streams.active.
stream = (
    df_views.writeStream
    .foreach(SendToMongoDB_ForeachWriter())
    .outputMode('append')
    .queryName('views_to_mongodb')
    .start()
)

Finally, we also use Spark to write the stream to an S3 bucket as Parquet files, partitioned by year and month, so that it serves as the analytical layer for our analysts.

In [None]:
# Write the parsed views to S3 as Parquet, partitioned by year and month.
# NOTE(review): `timestamp` is parsed as a string by pythonSchema, so year()/month()
# rely on Spark's implicit string-to-date cast; values that cannot be cast
# presumably land in a null partition. Confirm the producer's timestamp format.
# A stable queryName keeps this query's checkpoint sub-directory fixed across restarts.
stream = (
    df_views
    .withColumn('year', f.year('timestamp'))
    .withColumn('month', f.month('timestamp'))
    .writeStream
    .format('parquet')
    .outputMode('append')
    .partitionBy('year', 'month')
    .option('path', 's3a://com.mbit.aalferea/views/')  # here put the location of your bucket
    .queryName('views_to_s3')
    .start()
)