# Prerequisites
* A running Kafka cluster. The topic 'mytopic' will be created automatically once we start the read stream, provided the broker allows automatic topic creation (`auto.create.topics.enable=true`, the Kafka default).
* This code assumes that all services (Kafka, the socket server and ZooKeeper) run on the same machine where the Jupyter Notebook process was started. If not, update the hostname in Step 3.
* Make sure you have included the required streaming jars when starting Jupyter.

Example:

```
PYSPARK_DRIVER_PYTHON_OPTS="$ANACONDA_HOME/bin/jupyter-notebook --ip hostname --no-browser --notebook-dir $HOME/jupyter_notebook_spark" \
pyspark --jars $HOME/jars/spark-sql-kafka-0-10_2.11-2.3.1.jar,$KAFKA_HOME/libs/kafka-clients-2.0.0.jar \
  --master yarn --deploy-mode client \
  --executor-memory 8g --num-executors 3 --executor-cores 4 \
  --name Notebook &
```

Jar download links:
1. `wget http://central.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.3.1/spark-sql-kafka-0-10_2.11-2.3.1.jar`
2. `kafka-clients-2.0.0.jar` can be found in `$KAFKA_HOME/libs`.
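The connector jar name encodes the Scala version (2.11) and the Spark version (2.3.1). As an alternative to downloading jars by hand, `pyspark` and `spark-submit` also accept `--packages` with a Maven coordinate (groupId:artifactId:version) and resolve the connector together with its `kafka-clients` dependency. A minimal sketch that builds that coordinate; `kafka_package_coordinate` is a hypothetical helper, and the default versions are only the ones used in this notebook, so they must be adjusted to match your own cluster.

```python
def kafka_package_coordinate(scala_version="2.11", spark_version="2.3.1"):
    """Build the Maven coordinate of the Structured Streaming Kafka connector.

    Hypothetical helper: both versions must match the Scala and Spark
    versions of your cluster, exactly as in the jar file name.
    """
    artifact = "spark-sql-kafka-0-10_{}".format(scala_version)
    return "org.apache.spark:{}:{}".format(artifact, spark_version)


coordinate = kafka_package_coordinate()
# This could replace the --jars list on the pyspark command line
print("--packages " + coordinate)
```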

# Step 1:
* Importing the required Python and Spark SQL modules.
* Stopping any active query.

Actions for managing a running query (`query` is the `StreamingQuery` object returned by `.start()`):
```python
query.id                  # unique id of the query; persists across restarts from checkpoint data
query.runId               # unique id of this run of the query; regenerated at every start/restart
query.name                # user-specified name of the query, or None if none was given
query.explain()           # print detailed explanations of the query plan
query.stop()              # stop the query
query.awaitTermination()  # block until the query terminates, with stop() or with an error
query.exception()         # the StreamingQueryException if the query terminated with an error, else None
query.status              # dict with the current status ('message', 'isDataAvailable', 'isTriggerActive')
query.isActive            # True while the query is running
query.recentProgress      # list of the most recent progress updates for this query
query.lastProgress        # the most recent progress update of this query (None before the first one)
```
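In PySpark, `lastProgress` is a plain dict and `recentProgress` is a list of such dicts, so they are easy to summarize while the query runs. A minimal sketch; `summarize_progress` is a hypothetical helper, and the example dict uses made-up values that only mimic a few of the real progress keys (`batchId`, `name`, `numInputRows`, `processedRowsPerSecond`). Rate fields can be missing from a report, hence the `.get()` calls.

```python
def summarize_progress(progress):
    """Return a one-line summary of a StreamingQuery progress dict.

    Hypothetical helper: 'progress' is what query.lastProgress returns
    (a dict, or None before the first micro-batch has completed).
    """
    if not progress:
        return "no progress reported yet"
    return "batch {} ({}): {} input rows, {} rows/s processed".format(
        progress.get("batchId"),
        progress.get("name"),
        progress.get("numInputRows", 0),
        progress.get("processedRowsPerSecond", "n/a"),
    )


# In the notebook, with a running query:
#   print(summarize_progress(append_mode_query.lastProgress))
#   for p in append_mode_query.recentProgress:
#       print(summarize_progress(p))

# Illustrative dict with made-up values, shaped like part of a real report
example = {"name": "append_mode_query", "batchId": 3,
           "numInputRows": 4, "processedRowsPerSecond": 2.5}
print(summarize_progress(example))
```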

```python
# Import Spark SQL modules
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Other Python modules
import os, sys, subprocess, json
import datetime, time
import socket, signal

# Stop the query left over from a previous run of this notebook, if any
try:
    if append_mode_query.isActive:
        append_mode_query.stop()
except NameError:
    # append_mode_query is not defined yet (first run of the notebook)
    pass
```
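The cell above only stops a query that is still bound to the `append_mode_query` variable. When cells are re-run, an older query can keep running after its variable has been overwritten. A minimal sketch of a broader clean-up, assuming the notebook's `spark` session: `spark.streams.active` lists every active `StreamingQuery` in the session. The helper name `stop_active_queries` is hypothetical.

```python
def stop_active_queries(spark):
    """Stop every streaming query that is active in this SparkSession.

    spark.streams.active returns a list of the active StreamingQuery
    objects, so this also reaches queries whose Python variable was
    overwritten when a cell was re-run. Returns the stopped query names.
    """
    stopped = []
    for query in spark.streams.active:
        stopped.append(query.name)
        query.stop()
    return stopped


# In the notebook:
#   stop_active_queries(spark)
```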

# Step 2:
* Update the 'source' variable as required.
* Available options are 'socket' and 'kafka' (case-insensitive).

```python
# Input source: 'socket' or 'kafka' (case-insensitive)
source = 'KAFKA'
```

# Step 3:
## Setting required variables:

* hostname:
    * The server where netcat or the Kafka broker is running. If your source runs on a separate machine, hardcode the 'hostname' variable to the required value.

* schema:
    * The schema to be applied to the incoming stream. Each record arrives as a JSON string, and F.from_json (Step 5) uses this schema to turn it into typed columns; fields missing from a record become null.

    Defining the schema up front follows the same idea as Structured Streaming's file-based sources, which by default require you to specify the schema rather than rely on Spark to infer it. This ensures a consistent schema is used for the streaming query, even in the case of failures.

```python
# Socket server details
hostname = socket.gethostname()

socketServer = hostname
socketPort = 9999

# Kafka server details
kafkaBrokerServer = hostname
kafkaBrokerPort = 9092
# ZooKeeper details are kept for reference only; the Structured Streaming
# Kafka source connects through the broker list (kafkaBroker) alone.
zookeeperServer = hostname
zookeeperPort = 2185

kafkaBroker = kafkaBrokerServer + ":" + str(kafkaBrokerPort)

# Schema of the JSON records in the stream (every field is read as a string)
schema = T.StructType([
    T.StructField("fname", T.StringType(), True),
    T.StructField("lname", T.StringType(), True),
    T.StructField("email", T.StringType(), True),
    T.StructField("principal", T.StringType(), True),
    T.StructField("passport_make_date", T.StringType(), True),
    T.StructField("passport_expiry_date", T.StringType(), True),
    T.StructField("ipaddress", T.StringType(), True),
    T.StructField("mobile", T.StringType(), True),
])
```
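Because `from_json` leaves a field null when the record does not contain it and silently ignores keys that are not in the schema, a typo in a field name does not raise an error. A plain-Python sketch (no Spark needed) for checking a sample line against the schema on the driver; `SCHEMA_FIELDS` and `check_record` are hypothetical names that mirror the `StructType` above, and the sample is the first record from Step 9.

```python
import json

# Field names in the same order as the StructType schema
SCHEMA_FIELDS = ["fname", "lname", "email", "principal",
                 "passport_make_date", "passport_expiry_date",
                 "ipaddress", "mobile"]


def check_record(line):
    """Compare the keys of one JSON line with the expected schema fields.

    Returns (missing, extra): fields the schema expects but the record
    lacks (they would become null) and keys the schema would ignore.
    """
    record = json.loads(line)
    missing = [field for field in SCHEMA_FIELDS if field not in record]
    extra = sorted(set(record) - set(SCHEMA_FIELDS))
    return missing, extra


sample = '{"fname" : "Billy","lname" : "Clark","email" : "Billy_Clark@@yahoo.com","principal" : "Billy@EXAMPLE.COM","passport_make_date" : "2013-07-30 14:39:59.964057","passport_expiry_date" : "2023-07-30 14:39:59.964057","ipaddress" : "142.195.1.154" , "mobile" : "9819415434"}'
print(check_record(sample))
```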

# Step 4:
* Creating a function to validate an email address received in the stream.
* Registering this function as a Spark UDF.


```python
import re

# Compile the pattern once; it is shipped with the UDF and reused for every row
EMAIL_PATTERN = re.compile(r'^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$')


def validate_email(email_address):
    """Return True for a valid-looking address, False otherwise,
    and None (SQL NULL) when the column value is NULL."""
    if email_address is None:
        return None
    return EMAIL_PATTERN.match(email_address.lower()) is not None


# Register the function as a UDF returning a boolean column
validate_email_address = F.udf(validate_email, T.BooleanType())
```
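Since the UDF wraps an ordinary Python function, its logic can be checked on the driver before it ever runs inside a stream. A minimal standalone sketch using the same regular expression; `is_valid_email` is a hypothetical name for this local copy, and the addresses are taken from the sample data in Step 9.

```python
import re

# Same pattern as the UDF above
EMAIL_RE = re.compile(r'^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$')


def is_valid_email(address):
    """Local copy of the UDF logic: None for a missing value, else True/False."""
    if address is None:
        return None
    return EMAIL_RE.match(address.lower()) is not None


for address in ["Billy_Clark@@yahoo.com", "Wayne_iller@bnz.co.nz",
                "Christian_weign@tcs.com", "Daniel_weign#gmail.com"]:
    print(address, is_valid_email(address))
```

Note that the final `(\.[a-z]{2,4})` group rejects top-level domains longer than four letters, such as `.museum`.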

# Step 5:
### Collecting the incoming stream.

* .readStream: Returns a DataStreamReader used to define the streaming source.
* .format: The data source. Built-in sources include 'socket', 'kafka', 'rate' and file formats such as 'text', 'csv', 'json' and 'parquet' (which pick up new files from a directory).
* .option: Source-specific settings, such as the socket host and port or the Kafka broker list and topic. For example, '.option("sep", ";")' sets the field separator for the CSV file source.

### Printing the default schema of the raw stream.

* The socket source produces a DataFrame with a single string column named "value"; each line of the streamed text becomes one row.
* Other sources add more columns. For example, Kafka provides the columns key, value, topic, partition, offset, timestamp and timestampType, where key and value are binary and must be cast to string before parsing.

### Creating the recordDF DataFrame by applying the required schema to the stream.

* F.from_json: Parses the JSON string in column "value" using the schema from Step 3. Selecting 'value.*' then expands the parsed struct into one column per field.

### Printing the new schema.

```python
if source.lower() == 'socket':
    socketStream = (spark
                    .readStream
                    .format('socket')
                    .option('host', socketServer)
                    .option('port', socketPort)
                    .load())

    print("\nPrinting Raw Socket Stream Schema.\n")
    socketStream.printSchema()

    # Skip empty lines, then parse each JSON line with the schema
    recordDF = socketStream.select(socketStream.value).where(socketStream.value != '')
    recordDF = (recordDF
                .withColumn("value", F.from_json("value", schema))
                .select(F.col('value.*')))

    print("\nPrinting recordDF DataFrame Schema.\n")
    recordDF.printSchema()

elif source.lower() == 'kafka':
    kafkaStream = (spark
                   .readStream
                   .format('kafka')
                   .option('kafka.bootstrap.servers', kafkaBroker)
                   .option('subscribe', 'mytopic')
                   .load())

    print("\nPrinting Raw Kafka Stream Schema.\n")
    kafkaStream.printSchema()

    # Kafka delivers 'value' as binary: cast it to string, then parse the JSON
    parsedStream = kafkaStream.select(
        F.from_json(F.col("value").cast("string"), schema).alias("value"))

    # Drop records that could not be parsed, then expand the struct into columns
    recordDF = parsedStream.where(F.col("value").isNotNull()).select("value.*")

    print("\nPrinting recordDF DataFrame Schema.\n")
    recordDF.printSchema()

else:
    print("Please choose a source: 'socket' or 'kafka'.")
```


```
Printing Raw Kafka Stream Schema.

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)


Printing recordDF DataFrame Schema.

root
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- email: string (nullable = true)
 |-- principal: string (nullable = true)
 |-- passport_make_date: string (nullable = true)
 |-- passport_expiry_date: string (nullable = true)
 |-- ipaddress: string (nullable = true)
 |-- mobile: string (nullable = true)
```
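What the Kafka branch does per row can be mimicked in plain Python: the binary `value` is decoded as UTF-8 (the `cast("string")`), then parsed as JSON, and in Spark 2.x `from_json` yields null for text it cannot parse, which the `isNotNull()` filter then drops. A minimal sketch of that per-row logic for experimenting on the driver; `decode_kafka_value` is a hypothetical name, `None` stands in for SQL null, and this sketch accepts only JSON objects.

```python
import json


def decode_kafka_value(value):
    """Plain-Python analogue of from_json(col("value").cast("string"), schema).

    'value' is the raw Kafka message payload (bytes or None). Returns the
    parsed dict, or None where Spark would produce a null that the
    isNotNull() filter removes.
    """
    if value is None:
        return None
    try:
        # UnicodeDecodeError and JSONDecodeError are both ValueErrors
        parsed = json.loads(value.decode("utf-8"))
    except ValueError:
        return None
    # Only JSON objects are accepted in this sketch
    return parsed if isinstance(parsed, dict) else None


print(decode_kafka_value(b'{"fname" : "Billy","lname" : "Clark"}'))
print(decode_kafka_value(b"not json"))
```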



# Step 6:
## Applying basic transformations:
* Generating a new column 'event_time' holding the time at which each micro-batch is processed.
* Renaming an existing column ('principal' to 'realm').
* Validating the email column for valid email addresses.

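```python
# Add the 'event_time' column. In a streaming query, current_timestamp() is
# evaluated once per micro-batch, so this is the processing time of the batch
# rather than a timestamp carried inside the record.
recordDF = recordDF.withColumn("event_time", F.current_timestamp())

# Keep the required columns and rename 'principal' to 'realm'.
# 'principal' must be part of the select, otherwise withColumnRenamed silently
# does nothing; 'event_time' is kept so the new column is not dropped again.
recordDF = recordDF.select(
    "fname",
    "lname",
    "email",
    "principal",
    "passport_make_date",
    "passport_expiry_date",
    "ipaddress",
    "mobile",
    "event_time",
).withColumnRenamed("principal", "realm")

# Add a new column holding the email validity status
recordDF = recordDF.select("*", validate_email_address(recordDF.email).alias("is_email_valid"))
```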
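`DataFrame.withColumnRenamed` is documented as a no-op when the column does not exist, so a rename placed after a `select()` that dropped the column fails silently. A minimal guard, sketched under the assumption that failing loudly is preferable in this notebook; `rename_column_strict` is a hypothetical helper that relies only on the real `df.columns` and `df.withColumnRenamed`.

```python
def rename_column_strict(df, existing, new):
    """Rename a DataFrame column, raising if the column is not present.

    withColumnRenamed() itself silently ignores a missing column, which
    hides mistakes such as selecting the column away before renaming it.
    """
    if existing not in df.columns:
        raise ValueError("column {!r} not found; available columns: {}".format(
            existing, df.columns))
    return df.withColumnRenamed(existing, new)


# In the notebook:
#   recordDF = rename_column_strict(recordDF, "principal", "realm")
```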

# Step 7:
## Applying some more transformations:
* Change the data type of passport_make_date and passport_expiry_date from string to TimestampType(), and of mobile to LongType().
* Generate new columns showing the year of passport make and expiry.
* Select the required columns.

```python
# Change the data type of passport_make_date and passport_expiry_date from
# string to TimestampType(), and of mobile to LongType()
recordDF = recordDF.withColumn("passport_make_date", F.col("passport_make_date").cast(T.TimestampType()))\
        .withColumn("passport_expiry_date", F.col("passport_expiry_date").cast(T.TimestampType()))\
        .withColumn("mobile", F.col("mobile").cast(T.LongType()))

# Generate new columns showing the year for passport make and expiry
# (F.year already returns an integer column)
recordDF = recordDF.withColumn("passport_make_year", F.year("passport_make_date"))\
        .withColumn("passport_expiry_year", F.year("passport_expiry_date"))

# Choose the required columns
recordDF = recordDF.select(
    "fname",
    "lname",
    "email",
    "mobile",
    "passport_expiry_year",
    "is_email_valid",
)
```
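The cast-then-`year()` step can be sanity-checked against the sample data in plain Python. Spark's cast in 2.x returns null for strings it cannot parse, whereas `datetime.strptime` raises, so this sketch maps failures to `None` to mirror that. The helper name `passport_year` is hypothetical, and the format string is an assumption that only covers the `YYYY-MM-DD HH:MM:SS.ffffff` layout used in the sample records.

```python
from datetime import datetime

# Layout of the dates in the sample records (assumption for this sketch)
SAMPLE_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def passport_year(date_string):
    """Plain-Python analogue of year(col.cast(TimestampType())).

    Returns None (like Spark's null) when the value is missing or unparsable.
    """
    try:
        return datetime.strptime(date_string, SAMPLE_FORMAT).year
    except (TypeError, ValueError):
        return None


print(passport_year("2022-07-30 14:40:06.011988"))
```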

# Step 8:
### Writing the Transformed Streamed Data into Memory.
* .queryName: Optionally, a unique name that identifies the query. Required for format('memory'), where it is also the name of the in-memory table.


* .outputMode:
    * Append mode (default): only the new rows added to the Result Table since the last trigger are written to the sink.
    * Complete mode: the whole Result Table is written to the sink after every trigger. This is supported only for aggregation queries.
    * Update mode: only the rows in the Result Table that were updated since the last trigger are written to the sink.


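* .format:
    The sink in which to store the data. Built-in options are 'memory', 'console', 'kafka' and the file sink, for which the output format is given directly, e.g. .format('parquet').
    The Foreach and ForeachBatch sinks are set with .foreach(...) and .foreachBatch(...) instead of .format(...); both are available in PySpark from Spark 2.4.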


* .option: Important options
    ```
    truncate: Whether to truncate long column values (console sink only).
    numRows: How many rows to display per trigger (console sink only).
    checkpointLocation: Directory for checkpoint data. Required for the file and Kafka sinks.
    path: Output directory for the file sink, e.g. an HDFS location.
    kafka.bootstrap.servers: Comma-separated list of KafkaBroker:Port pairs (Kafka sink).
    topic: The topic the Kafka sink writes to.
    ```

* .trigger: The trigger settings of a streaming query define the timing of streaming data processing: whether the query is executed as a micro-batch query with a fixed batch interval or as a continuous processing query.
    ```
    Unspecified (default): the query is executed in micro-batch mode, and each micro-batch is generated as soon as the previous one has completed processing.

    Fixed-interval micro-batches, e.g. .trigger(processingTime='5 seconds'): micro-batches are kicked off at the user-specified interval. If no new data is available, no micro-batch is kicked off.

    One-time micro-batch, e.g. .trigger(once=True): the query executes *only one* micro-batch to process all the available data and then stops on its own. This is useful when you want to periodically spin up a cluster, process everything that has become available since the last period, and then shut the cluster down.

    Continuous with fixed checkpoint interval, e.g. .trigger(continuous='1 second'): introduced in Spark 2.3 and still experimental as of Spark 2.4.0.
    ```
    
* .start: Starts the streaming query and returns a StreamingQuery object, which can be managed as described in Step 1.
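The difference between the three output modes can be pictured as a toy model in plain Python: given the result table before and after a trigger, which rows reach the sink. This only illustrates the definitions listed above and is not how Spark computes its output; in particular, Spark accepts append mode for aggregations only with a watermark and then emits a row once it is final. The function name `rows_emitted` and the word-count data are hypothetical.

```python
def rows_emitted(mode, previous, current):
    """Toy model of what each output mode writes to the sink after a trigger.

    'previous' and 'current' map a result-table key to its value before and
    after the trigger (for example, a running count per word).
    """
    if mode == "complete":
        # the whole result table, every trigger
        return dict(current)
    if mode == "append":
        # only rows that did not exist before
        return {k: v for k, v in current.items() if k not in previous}
    if mode == "update":
        # new rows plus rows whose value changed
        return {k: v for k, v in current.items() if previous.get(k) != v}
    raise ValueError("unknown output mode: " + mode)


previous_counts = {"kafka": 1}
current_counts = {"kafka": 2, "spark": 1}
for mode in ("append", "update", "complete"):
    print(mode, rows_emitted(mode, previous_counts, current_counts))
```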

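```python
append_mode_query = (recordDF.writeStream
                     .queryName("append_mode_query")  # also the name of the in-memory table
                     .outputMode("append")
                     .format("memory")
                     # truncate/numRows only affect the console sink; the memory sink ignores them
                     .option("truncate", "false")
                     .option("numRows", 5)
                     .trigger(processingTime='5 seconds')
                     .start())
```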

# Step 9:
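## Sample data for input
* If your source is a socket server, use the command below to start it. Start it before running Step 8, because the socket source connects to it when the query starts.
    ```
    nc -lk hostname 9999
    ```

* If your source is Kafka, use the command below to start the console producer, where kafkaBroker is the host:port value from Step 3.

    ```
    $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list kafkaBroker --topic mytopic
    ```

* Enter the sample records below one at a time. Each record must stay on a single line in exactly this JSON format.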


* ROUND 1: Add some records

    ```
    {"fname" : "Billy","lname" : "Clark","email" : "Billy_Clark@@yahoo.com","principal" : "Billy@EXAMPLE.COM","passport_make_date" : "2013-07-30 14:39:59.964057","passport_expiry_date" : "2023-07-30 14:39:59.964057","ipaddress" : "142.195.1.154" , "mobile" : "9819415434"}
    {"fname" : "Wayne","lname" : "iller","email" : "Wayne_iller@bnz.co.nz","principal" : "Wayne@EXAMPLE.COM","passport_make_date" : "2011-03-30 14:40:00.973376","passport_expiry_date" : "2021-03-30 14:40:00.973376","ipaddress" : "81.187.181.223" , "mobile" : "9828826164"}
    {"fname" : "Christian","lname" : "weign","email" : "Christian_weign@tcs.com","principal" : "Christian@EXAMPLE.COM","passport_make_date" : "2013-02-28 14:40:01.982722","passport_expiry_date" : "2023-02-28 14:40:01.982722","ipaddress" : "158.169.175.39" , "mobile" : "9870023860"}
    {"fname" : "Daniel","lname" : "weign","email" : "Daniel_weign#gmail.com","principal" : "Daniel@EXAMPLE.COM","passport_make_date" : "2016-07-29 14:40:02.992117","passport_expiry_date" : "2026-07-29 14:40:02.992117","ipaddress" : "203.81.64.23" , "mobile" : "9804581156"}
    ```

* ROUND 2: Add more records

    ```
    {"fname" : "Thomas","lname" : "dagarin","email" : "Thomas_dagarin@@gmail.com","principal" : "Thomas@EXAMPLE.COM","passport_make_date" : "2013-10-29 14:40:05.003005","passport_expiry_date" : "2023-10-29 14:40:05.003005","ipaddress" : "108.125.242.158" , "mobile" : "9823308381"}
    {"fname" : "Russell","lname" : "Wright","email" : "Russell_Wright@nbc.com","principal" : "Russell@EXAMPLE.COM","passport_make_date" : "2012-07-30 14:40:06.011988","passport_expiry_date" : "2022-07-30 14:40:06.011988","ipaddress" : "64.169.155.253" , "mobile" : "9848316824"}
    {"fname" : "Kyle","lname" : "kumar","email" : "Kyle_kumar@hotmail.com","principal" : "Kyle@EXAMPLE.COM","passport_make_date" : "2012-04-30 14:40:07.021665","passport_expiry_date" : "2022-04-30 14:40:07.021665","ipaddress" : "198.196.75.63" , "mobile" : "9837699819"}
    ```
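Spark's socket source acts as a TCP client: it connects to the given host and port and reads newline-separated text. Where netcat is not available, a small Python server can play the role of `nc -lk` and feed the sample records with a pause between them so they land in different micro-batches. A minimal sketch; `open_listener`, `serve_records` and the file name `sample_records.jsonl` are hypothetical, and unlike `nc -k` this server handles a single connection and then closes it.

```python
import socket
import time


def open_listener(host="0.0.0.0", port=9999):
    """Create a TCP socket listening for one client (the Spark socket source)."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((host, port))
    listener.listen(1)
    return listener


def serve_records(listener, records, delay=1.0):
    """Accept one connection and send each record as one line of text."""
    with listener:
        conn, _ = listener.accept()
        with conn:
            for record in records:
                conn.sendall((record + "\n").encode("utf-8"))
                time.sleep(delay)  # spread records across micro-batches


# Usage (run in a separate process before starting the socket query):
#   with open("sample_records.jsonl") as f:  # hypothetical file, one record per line
#       records = [line.strip() for line in f if line.strip()]
#   serve_records(open_listener(port=9999), records)
```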


# Step 10:

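* Make sure the socket server or Kafka producer from Step 9 is running and that you have entered some input data.
* Query the streamed data like a table: the memory sink keeps it in an in-memory table named after the query ('append_mode_query').

Filtering the stream based on valid email addresses: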

```python
spark.sql("SELECT * FROM append_mode_query WHERE is_email_valid = true").show(10, False)
```

```
+-------+------+----------------------+----------+--------------------+--------------+
|fname  |lname |email                 |mobile    |passport_expiry_year|is_email_valid|
+-------+------+----------------------+----------+--------------------+--------------+
|Russell|Wright|Russell_Wright@nbc.com|9848316824|2022                |true          |
|Kyle   |kumar |Kyle_kumar@hotmail.com|9837699819|2022                |true          |
+-------+------+----------------------+----------+--------------------+--------------+
```

