In [15]:
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import asc
import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
from pyspark.sql.types import StringType,DateType,DataType,datetime,TimestampType
from pyspark.sql.types import IntegerType,FloatType,datetime,DatetimeConverter
from pyspark.sql.functions import desc,col
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import asc,unix_timestamp, from_unixtime
from pyspark.streaming import StreamingListener
from pyspark.sql import Window
from datetime import datetime
from dateutil import tz
import pyspark.sql.types as tp
import pyspark.sql.functions as func
from pyspark.sql.functions import unix_timestamp, from_unixtime
import sqlite3
from sqlite3 import Error
from dateutil import parser
import iso8601

To run Spark in local mode, pass "local" to setMaster; to run on a cluster, pass the address of the master node instead (e.g. "spark://<host>:<port>").
In local mode, the number of worker threads (processors) can also be set, e.g. "local[2]" for 2 threads.

In [2]:
# Build the Spark configuration: the application name and the master to run against.
configuration = (
    SparkConf()
    .setAppName('spark streaming')
    .setMaster('local')
)

A SparkContext is used to create RDD objects on the cluster. RDD stands for Resilient Distributed Dataset.

In [3]:
# Start the SparkContext using the `configuration` object (application name and master).
sc= SparkContext(conf=configuration)

A SparkSession is used to create DataFrame objects; many pandas-style DataFrame operations have an equivalent on a Spark DataFrame.

In [4]:
# Create the SparkSession for this application, or fetch it if one already exists.
spark = (
    SparkSession.builder
    .appName("spark streaming")
    .getOrCreate()
)

In [5]:
# Display every (key, value) setting of the SparkContext that backs the session,
# to confirm the configuration was applied.
(
    spark.sparkContext
    .getConf()
    .getAll()
)

[('spark.master', 'local'),
 ('spark.app.id', 'local-1599026175002'),
 ('spark.driver.port', '4148'),
 ('spark.driver.host', 'DESKTOP-JG0SMAD'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'spark streaming'),
 ('spark.ui.showConsoleProgress', 'true')]

In [6]:
# The stream is given an explicit schema. Without one, the read method would first
# have to deduce the schema from the input before it starts reading, which costs
# extra time. A sample file is therefore loaded once here and its schema is reused.

source = "file:///Users/iroch/dev/toprocess"

data = spark.read.load(
    "C:/Users/iroch/dev/process/household_data_15min_singleindex.csv",
    format="csv",
    sep=",",
    header="true",
)

The cell below has two sections: the database connection function and the streaming process. The streaming process uses the foreachBatch method to send each batch of the stream to a sink. foreachBatch accepts a function; the one used here opens a connection to a database and writes the batch to it.

In [16]:

#function to do a connection to a database after recieving the dataframe streams in batches

def database_conn(datafrm, batchId, db_path=None):
    """Insert one streaming batch into the ``forecast_fcast`` SQLite table.

    Written to be passed to ``writeStream.foreachBatch``, which calls it with
    the batch DataFrame and the batch id.

    Parameters
    ----------
    datafrm : object with a ``collect()`` method
        The batch. Each collected row must hold, in this order:
        utc_timestamp, dishwasher, electric_vehicle, freezer, grid_export,
        grid_import, heat_pump, pv, refrigerator, washing_machine,
        total_load_consume. ``utc_timestamp`` is a string such as
        ``2020-09-02T06:15:00Z``.
    batchId : int
        Id of the batch; only used in the error message.
    db_path : str, optional
        Path of the SQLite database file. Defaults to the placeholder path
        hard-coded below, which has to be replaced with the real location.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If a ``utc_timestamp`` does not match ``%Y-%m-%dT%H:%M:%SZ``.

    Notes
    -----
    SQLite errors (cannot open the file, missing table, bad row) are printed
    and not re-raised. The whole batch is written in a single transaction, so
    a failing batch leaves no partial rows behind.
    """
    if db_path is None:
        db_path = "C:/here/the/path_to/the_database/goes/db.sqlite3"

    insert_sql = (
        "insert into forecast_fcast ("
        "utc_timestamp,dishwasher,electric_vehicle,freezer,grid_export,"
        "grid_import,heat_pump,pv,refrigerator,washing_machine,total_load_consume"
        ") values (?,?,?,?,?,?,?,?,?,?,?)"
    )

    # Build the parameter tuples first so the database is only touched when
    # there is something to write.
    records = []
    for row in datafrm.collect():
        fields = tuple(row)
        parsed = datetime.strptime(fields[0], '%Y-%m-%dT%H:%M:%SZ')
        # Store "YYYY-MM-DD HH:MM:SS" text, the same form sqlite3's default
        # datetime adapter writes, without relying on that adapter.
        records.append((parsed.isoformat(" "),) + fields[1:])

    if not records:
        return

    conn = None
    try:
        conn = sqlite3.connect(db_path)
        conn.executemany(insert_sql, records)
        # One commit per batch: either every row is stored or none is.
        conn.commit()
    except sqlite3.Error as exc:
        print(f'failed to connect or write batch {batchId} to {db_path}: {exc}')
    finally:
        # Closing without a commit discards any uncommitted inserts.
        if conn is not None:
            conn.close()

            
            
# Open a streaming CSV reader on the `source` directory. It reuses the schema held by
# `data` and takes the column names from each file's header line.
dstream = spark.readStream.csv(
    path=source,
    schema=data.schema,
    header=True,
)


# Keep only the two timestamp columns and the readings of household "residential4".

maincol = dstream.select(
    "utc_timestamp",
    "cet_cest_timestamp",
    "DE_KN_residential4_dishwasher",
    "DE_KN_residential4_ev",
    "DE_KN_residential4_freezer",
    "DE_KN_residential4_grid_export",
    "DE_KN_residential4_grid_import",
    "DE_KN_residential4_heat_pump",
    "DE_KN_residential4_pv",
    "DE_KN_residential4_refrigerator",
    "DE_KN_residential4_washing_machine",
)


# Add typed copies of the raw columns under shorter names: the UTC timestamp as a
# date, and every household reading as a float.

maincol = maincol.withColumn("time_stamp", maincol["utc_timestamp"].cast(DateType()))

# The order of the pairs fixes the order in which the new columns are appended.
for raw_name, short_name in [
    ("DE_KN_residential4_dishwasher", "dishwasher"),
    ("DE_KN_residential4_ev", "electric_vehicle"),
    ("DE_KN_residential4_freezer", "freezer"),
    ("DE_KN_residential4_grid_export", "grid_export"),
    ("DE_KN_residential4_grid_import", "grid_import"),
    ("DE_KN_residential4_heat_pump", "heat_pump"),
    ("DE_KN_residential4_pv", "pv"),
    ("DE_KN_residential4_refrigerator", "refrigerator"),
    ("DE_KN_residential4_washing_machine", "washing_machine"),
]:
    maincol = maincol.withColumn(short_name, maincol[raw_name].cast(FloatType()))


# Replace missing readings in the float columns with 0.00.

maincol2 = maincol.na.fill(
    0.00,
    subset=[
        'dishwasher',
        'electric_vehicle',
        'freezer',
        'grid_export',
        'grid_import',
        'heat_pump',
        'pv',
        'refrigerator',
        'washing_machine',
    ],
)

# Add up six appliance loads into `total_load_consume`, in three steps.
# grid_export, grid_import and pv are not part of the total.

ev_dish_freezer = col('electric_vehicle') + col('dishwasher') + col('freezer')
main1 = maincol2.withColumn('total_1', ev_dish_freezer)

plus_fridge_washer = col('total_1') + col('refrigerator') + col('washing_machine')
main2 = main1.withColumn('total_2', plus_fridge_washer)

main3 = main2.withColumn('total_load_consume', col('total_2') + col('heat_pump'))


# Remove the helper totals, the raw source columns, the local-time timestamp and the
# date copy; what remains is utc_timestamp, the float readings and total_load_consume.

Dmain = main3.drop(
    'total_1',
    'total_2',
    "cet_cest_timestamp",
    "DE_KN_residential4_ev",
    "DE_KN_residential4_dishwasher",
    "DE_KN_residential4_freezer",
    "DE_KN_residential4_grid_export",
    "DE_KN_residential4_grid_import",
    "DE_KN_residential4_heat_pump",
    "DE_KN_residential4_pv",
    "DE_KN_residential4_refrigerator",
    "DE_KN_residential4_washing_machine",
    "time_stamp",
)

# Hand every batch of the stream to `database_conn`, which writes it to the database.
# Checkpoint information goes to a dedicated directory so the stream can be restored
# after a failure. The trigger fetches new data from the source every 200 seconds.
# start() launches the query and the returned handle is kept in `query`.

query = (
    Dmain.writeStream
    .foreachBatch(database_conn)
    .outputMode("append")
    .option("checkpointLocation", "C:/Users/iroch/dev/result/checkpoint")
    .trigger(processingTime ='200 seconds')
    .start()
)




In [17]:
# check that Dmain is a streaming DataFrame (built on a streaming source).
# NOTE(review): this does not report whether `query` is currently running;
# `query.isActive` looks like the right check for that — confirm.

Dmain.isStreaming

True

In [18]:
# print the most recent progress report of the streaming query
# (batch id, input row counts, timings, source offsets and sink)

print(query.lastProgress)

{'id': '409fa97a-038e-4a97-8898-1bc6d3f9cdfc', 'runId': 'de08df24-76e0-49c0-8c4a-5a9ca1e596c1', 'name': None, 'timestamp': '2020-09-02T06:09:33.048Z', 'batchId': 6, 'numInputRows': 0, 'processedRowsPerSecond': 0.0, 'durationMs': {'getOffset': 66, 'triggerExecution': 365}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/Users/iroch/dev/toprocess]', 'startOffset': {'logOffset': 5}, 'endOffset': {'logOffset': 5}, 'numInputRows': 0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'ForeachBatchSink'}}


In [19]:
# print the current status of the streaming query
# (status message, whether data is available, whether a trigger is active)

print(query.status)

{'message': 'Waiting for next trigger', 'isDataAvailable': False, 'isTriggerActive': False}
