In [1]:
import os
from pyspark.sql import SparkSession
import requests as req
import zipfile as zf
import pandas as pd
from pathlib import Path
import sweetviz as sv # to print df profile
import glob
import gzip
import shutil
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType
from pyspark.sql.functions import input_file_name, regexp_extract, col, avg, min, max, stddev, count,\
                                    countDistinct, to_timestamp, col, when, split, regexp_replace,\
                                    dayofmonth, month, year, hour, minute, second
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer, IndexToString
from pyspark.ml import Pipeline

In [2]:
# Build (or reuse) the Spark session for this notebook.
# spark.jars points at the PostgreSQL JDBC driver jar shipped with the session.
spark = (
    SparkSession.builder
    .appName("IDEAL_dataset_spark_pipeline")
    .config("spark.jars", "/home/abcxyz/userid123/Downloads/postgresql-42.7.3.jar")
    .getOrCreate()
)

24/03/28 19:59:52 WARN Utils: Your hostname, MD-061377 resolves to a loopback address: 127.0.1.1; using 192.168.1.57 instead (on interface wlp0s20f3)
24/03/28 19:59:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/03/28 19:59:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/28 19:59:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/03/28 19:59:54 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/03/28 19:59:54 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [3]:
# Download the zipped metadata/survey archive from metadata_url.
metadata_url = "https://datashare.ed.ac.uk/bitstream/handle/10283/3647/metadata_and_surveys.zip?sequence=20&isAllowed=y"

# (connect, read) timeout in seconds: without it a stalled connection blocks the cell forever.
mtdt_download = req.get(metadata_url, timeout=(10, 300))

# Fail here on an HTTP error rather than later, when the error page would be
# written to disk and opened as if it were a zip archive.
mtdt_download.raise_for_status()

In [4]:
# download household_sensors (hhs) zipped file containing .csv.gz files
# very large file (over 15GB zipped), initially downloaded and
# extracted on a 128GB RAM Ubuntu machine; a sample extract is used here.

#hhs_url="https://datashare.ed.ac.uk/bitstream/handle/10283/3647/household_sensors.zip?sequence=25&isAllowed=y"
#hhsensor_download = req.get(hhs_url)

# Note: the sample data has been provided in https://github.com/DataEng-AML/surveypaper/tree/main

In [5]:
# Save the downloaded archive, extract it, and list the metadata csv files of interest.
dstpath = '/home/abcxyz/userid123/Downloads'
# csv files already present in dstpath
allfiles = [os.path.join(dstpath, file) for file in os.listdir(dstpath) if file.endswith(".csv")]

# Write the download to disk and close the file BEFORE reading it back:
# opening the archive while the write handle is still open can read an
# incompletely flushed file.
with open("metadata_and_surveys.zip", "wb") as zip_out:
    zip_out.write(mtdt_download.content)

with zf.ZipFile("metadata_and_surveys.zip", "r") as myzip:
    myzip.extractall("metadata_and_surveys")  # extracted relative to the current working directory
    for file in myzip.namelist():
        path_name, ext = os.path.splitext(file)  # (name without extension, extension)
        # only csv members whose path mentions "metadata" are printed
        if ext == ".csv" and "metadata" in path_name:
            print(path_name)

metadata/weatherfeed
metadata/meterreading
metadata/location
metadata/room
metadata/tariff
metadata/sensor
metadata/home
metadata/person
metadata/other_appliance
metadata/appliance
metadata/sensorbox


In [6]:
# Locations used for the household sensor sample data

# zipped sample of the household sensor data
hh_file_path = '/home/abcxyz/userid123/Downloads/household_sensors.zip'

# folder that receives the content of the .zip (the .csv.gz files)
hh_unzip_loc = '/home/abcxyz/userid123/household_sensors'

# folder that receives the csv files decompressed from the .csv.gz files
hh_csv_loc = '/home/abcxyz/userid123/household_sensors_csv'

In [7]:
# Unzip the sample archive, then decompress every home308 .csv.gz file into hh_csv_loc.
with zf.ZipFile(hh_file_path, "r") as hhzip:
    hhzip.extractall(hh_unzip_loc)

# Create the csv output folder when it is missing, so the open() below does not
# fail on a machine where the folder was never created by hand.
os.makedirs(hh_csv_loc, exist_ok=True)

# recursive=True lets ** descend into subfolders; the example only uses home308 files
all_files = glob.glob(f"{hh_unzip_loc}/**/home308*.gz", recursive=True)

for gz_path in all_files:
    # strip the trailing ".gz": "name.csv.gz" -> "name.csv"
    extract_hh_csv = os.path.splitext(os.path.basename(gz_path))[0]

    # stream-decompress the .gz file into a plain csv inside hh_csv_loc
    with gzip.open(gz_path, 'rb') as input_gz, \
            open(os.path.join(hh_csv_loc, extract_hh_csv), 'wb') as output_csv:
        shutil.copyfileobj(input_gz, output_csv)

# print the names of the csv files now present in hh_csv_loc
for each in [file for file in os.listdir(hh_csv_loc) if file.endswith('.csv')]:
    print(each)

home308_outside2829_sensor19926_gas-pulse_gas.csv
home308_livingroom2828_sensor19920c19924_electric-mains_electric-combined.csv
home308_kitchen2831_sensor19973_tempprobe_hot-water-hot-pipe.csv
home308_kitchen2831_sensor19972_tempprobe_hot-water-cold-pipe.csv
home308_kitchen2831_sensor19967_tempprobe_central-heating-return.csv
home308_kitchen2831_sensor19968_tempprobe_central-heating-flow.csv


In [8]:
# Quick look at one raw sensor csv. No header or schema is passed to the reader,
# so the columns come out with Spark's default names.
sample_sensor_csv = hh_csv_loc +"/home308_outside2829_sensor19926_gas-pulse_gas.csv"
preview_19926 = spark.read.csv(sample_sensor_csv)

preview_19926.show(5)

+-------------------+---+
|                _c0|_c1|
+-------------------+---+
|2018-03-07 13:35:17|316|
|2018-03-07 13:41:04|316|
|2018-03-07 13:41:52|316|
|2018-03-07 13:42:36|316|
|2018-03-07 13:43:17|316|
+-------------------+---+
only showing top 5 rows



In [9]:
# Read every household sensor csv into a Spark dataframe with an explicit schema,
# tag each row with the csv file it came from, and stack all files into one
# dataframe (home 308 in this example).

# names of the files in hh_csv_loc ending in .csv
hh_csv_list = [file for file in os.listdir(hh_csv_loc) if file.endswith('.csv')]

# Fail with a clear message instead of an AttributeError on None further down.
if not hh_csv_list:
    raise FileNotFoundError(f"No .csv files found in {hh_csv_loc}")

# Schema shared by all sensor files; built once because it does not depend on the file.
# TimestampType --> timestamp datatype, IntegerType --> integer datatype
column = StructType([
    StructField("datetime", TimestampType(), True),
    StructField("value", IntegerType(), True),
])

spark_hh_dataframe = None

for all_file in hh_csv_list:
    each_file = spark.read.csv(os.path.join(hh_csv_loc, all_file), schema=column)

    # "[^/]*$" keeps the part of the input path after the last "/", i.e. the file name
    each_file = each_file.withColumn("csv_filename", regexp_extract(input_file_name(), "[^/]*$", 0))

    # the first file seeds the result, later files are appended with union
    if spark_hh_dataframe is None:
        spark_hh_dataframe = each_file
    else:
        spark_hh_dataframe = spark_hh_dataframe.union(each_file)

spark_hh_dataframe.show(5, truncate=False)  # truncate=False shows the full cell content

+-------------------+-----+-------------------------------------------------+
|datetime           |value|csv_filename                                     |
+-------------------+-----+-------------------------------------------------+
|2018-03-07 13:35:17|316  |home308_outside2829_sensor19926_gas-pulse_gas.csv|
|2018-03-07 13:41:04|316  |home308_outside2829_sensor19926_gas-pulse_gas.csv|
|2018-03-07 13:41:52|316  |home308_outside2829_sensor19926_gas-pulse_gas.csv|
|2018-03-07 13:42:36|316  |home308_outside2829_sensor19926_gas-pulse_gas.csv|
|2018-03-07 13:43:17|316  |home308_outside2829_sensor19926_gas-pulse_gas.csv|
+-------------------+-----+-------------------------------------------------+
only showing top 5 rows



In [10]:
# Preview one metadata csv with .show() after reading it

# folder holding the unzipped metadata csv files
mtdt_csv_loc = '/home/abcxyz/userid123/metadata_and_surveys/metadata'

# names of the metadata csv files in that folder
mtdt_csv_list = [file for file in os.listdir(mtdt_csv_loc) if file.endswith('.csv')]

# header=True takes the column names from the first row; inferSchema=True makes
# Spark infer the column types from the data instead of reading everything as string
preview_meterreading = spark.read.csv(
    mtdt_csv_loc +"/meterreading.csv",
    header=True,
    inferSchema=True,
)

preview_meterreading.show(5, truncate=False)

+------+----------+----------------+-----------+----------+-------+
|homeid|provenance|provenancedetail|energytype |date      |reading|
+------+----------+----------------+-----------+----------+-------+
|77    |technician|repair_visit    |electricity|2018-01-16|30561.0|
|77    |technician|repair_visit    |gas        |2018-01-16|8081.0 |
|79    |technician|repair_visit    |electricity|2018-01-16|28822.0|
|79    |technician|repair_visit    |gas        |2018-01-16|5152.0 |
|96    |technician|repair_visit    |electricity|2018-01-15|18532.0|
+------+----------+----------------+-----------+----------+-------+
only showing top 5 rows



In [11]:
# Summary statistics of the meter readings: the Spark dataframe is collected
# into pandas, and include='all' makes describe() cover every column
preview_meterreading.toPandas().describe(include='all')

Unnamed: 0,homeid,provenance,provenancedetail,energytype,date,reading
count,746.0,746,746,746,746,746.0
unique,,125,4,2,163,
top,,technician,installation_visit,electricity,2018-05-23,
freq,,425,393,374,40,
mean,211.453083,,,,,148638.2
std,70.347468,,,,,1255496.0
min,62.0,,,,,1.268
25%,157.0,,,,,5936.0
50%,216.0,,,,,17276.0
75%,266.0,,,,,41959.0


In [12]:
# number of null entries per column of the meter readings
preview_meterreading.toPandas().isnull().sum()

homeid              0
provenance          0
provenancedetail    0
energytype          0
date                0
reading             0
dtype: int64

In [13]:
# Read every metadata csv into its own Spark dataframe and keep them in a dict
# keyed by "spark_" + the csv file name without its extension.

spark_prefix = "spark_"

# maps dataframe name -> Spark dataframe
spark_df = {}

for all_metadata_file in mtdt_csv_list:
    each_mtdt_filename = spark_prefix + os.path.splitext(all_metadata_file)[0]
    spark_df[each_mtdt_filename] = spark.read.csv(
        os.path.join(mtdt_csv_loc, all_metadata_file), header=True, inferSchema=True
    )

# first pass: show the first 5 rows of every dataframe
# (the stray no-op expression statement from the original loop is removed)
for mtdt_fn, each_metadata_file in spark_df.items():
    print(f"Spark DataFrame: {mtdt_fn}")
    each_metadata_file.show(5)

# second pass: print the schema of every dataframe
for mtdt_fn, each_metadata_file in spark_df.items():
    print(f"Spark Schema: {mtdt_fn}")
    each_metadata_file.printSchema()


24/03/28 20:00:07 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Spark DataFrame: spark_home
+------+------------+---------+---------+----------------+------------------+----------------+------------------+------------------+-----------+-----------------+------------------+---------+-----------------+-----------------+-----------+--------------+--------------+----------------+-------------+---------------+-----------+--------------------+--------------+
|homeid|install_type| location|residents|       starttime|starttime_enhanced|         endtime|          cohortid|       income_band|study_class|         hometype|equivalised_income|occupancy|urban_rural_class| urban_rural_name|  build_era|new_build_year|smart_monitors|smart_automation|occupied_days|occupied_nights|entry_floor|       outdoor_space|outdoor_drying|
+------+------------+---------+---------+----------------+------------------+----------------+------------------+------------------+-----------+-----------------+------------------+---------+-----------------+-----------------+-----------+---

In [14]:
# Convert the string date/time columns of spark_home into timestamps

# columns to convert and the pattern their string values are parsed with
timestampcol_spark_home = ["starttime", "starttime_enhanced", "endtime"]
col_format = "dd/MM/yyyy HH:mm"

# convert one column at a time on a working copy, then store the result back in the dict
home_converted = spark_df['spark_home']
for colname in timestampcol_spark_home:
    home_converted = home_converted.withColumn(colname, to_timestamp(colname, col_format))
spark_df['spark_home'] = home_converted
    

In [15]:
# Print the schema to confirm the three date/time columns are now timestamps
spark_df['spark_home'].printSchema()

root
 |-- homeid: integer (nullable = true)
 |-- install_type: string (nullable = true)
 |-- location: string (nullable = true)
 |-- residents: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- starttime_enhanced: timestamp (nullable = true)
 |-- endtime: timestamp (nullable = true)
 |-- cohortid: string (nullable = true)
 |-- income_band: string (nullable = true)
 |-- study_class: string (nullable = true)
 |-- hometype: string (nullable = true)
 |-- equivalised_income: string (nullable = true)
 |-- occupancy: string (nullable = true)
 |-- urban_rural_class: string (nullable = true)
 |-- urban_rural_name: string (nullable = true)
 |-- build_era: string (nullable = true)
 |-- new_build_year: integer (nullable = true)
 |-- smart_monitors: string (nullable = true)
 |-- smart_automation: string (nullable = true)
 |-- occupied_days: integer (nullable = true)
 |-- occupied_nights: integer (nullable = true)
 |-- entry_floor: string (nullable = true)
 |-- outdoor_space

In [16]:
# List the names (dictionary keys) of the metadata dataframes held in spark_df
list(spark_df.keys())

['spark_home',
 'spark_sensor',
 'spark_person',
 'spark_room',
 'spark_tariff',
 'spark_other_appliance',
 'spark_appliance',
 'spark_sensorbox',
 'spark_weatherfeed',
 'spark_meterreading',
 'spark_location']

In [17]:
# Bind each metadata dataframe in spark_df to a module-level name of its own
spark_home            = spark_df['spark_home']
spark_sensor          = spark_df['spark_sensor']
spark_person          = spark_df['spark_person']
spark_room            = spark_df['spark_room']
spark_tariff          = spark_df['spark_tariff']
spark_other_appliance = spark_df['spark_other_appliance']
spark_appliance       = spark_df['spark_appliance']
spark_sensorbox       = spark_df['spark_sensorbox']
spark_weatherfeed     = spark_df['spark_weatherfeed']
spark_meterreading    = spark_df['spark_meterreading']
spark_location        = spark_df['spark_location']

In [18]:
# Summary statistics of the meter readings computed with Spark's describe(),
# converted to pandas and transposed so each column becomes a row
spark_meterreading.describe().toPandas().T

24/03/28 20:00:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
homeid,746,211.45308310991956,70.34746798731733,62,335
provenance,746,972.4205607476636,208.8933057829631,1007,technician
provenancedetail,746,,,all_inapp_meters_mid,repair_visit
energytype,746,,,electricity,gas
reading,746,148638.2009155496,1255496.1026341487,1.268,2.2936994E7


In [19]:
# Same summary for the sensor metadata: Spark describe() -> pandas -> transposed
spark_sensor.describe().toPandas().T

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
sensorid,20081,12037.284198994073,6266.531893731428,1174,31573
sensorboxid,20081,2.794922612950086E14,1172.6833868025014,279492261292874,279492261297177
type,20081,,,battery,temperature
unit,20081,,,0.01V,1Wh
status,20081,,,active,offline
roomid,20081,1856.3234400677256,682.543379466405,650,3071
subcircuit_type,86,,,attic plug socket - marked as shower,water_heater
scalingfactor,20081,0.950398790767392,0.21467270601832275,0.00911,1.0
rawunit,1288,,,0.01m3,1ft3


In [20]:
# Check two of the metadata dataframes for duplicate rows.
# exceptAll() against the de-duplicated frame leaves only the surplus copies,
# so an empty table means the frame has no duplicate rows.
for metadata_frame in (spark_weatherfeed, spark_meterreading):
    metadata_frame.exceptAll(metadata_frame.dropDuplicates()).show()

+------+------------+----------+----+------+---+
|feedid|weather_type|locationid|unit|source|url|
+------+------------+----------+----+------+---+
+------+------------+----------+----+------+---+

+------+----------+----------------+----------+----+-------+
|homeid|provenance|provenancedetail|energytype|date|reading|
+------+----------+----------------+----------+----+-------+
+------+----------+----------------+----------+----+-------+



In [21]:
# Record count of the household sensor data before de-duplication.
# The following cells look for duplicates, remove them and count again.
spark_hh_dataframe.count()

1790955

In [22]:
# Only log at ERROR level from here on to keep the cell outputs readable
spark.sparkContext.setLogLevel("ERROR")

# Rows left after subtracting the de-duplicated frame are the surplus duplicate copies
hh_without_duplicates = spark_hh_dataframe.dropDuplicates()
spark_hh_dataframe.exceptAll(hh_without_duplicates).show(truncate=False)


                                                                                

+-------------------+-----+-----------------------------------------------------------------------------+
|datetime           |value|csv_filename                                                                 |
+-------------------+-----+-----------------------------------------------------------------------------+
|2018-03-25 02:57:55|131  |home308_livingroom2828_sensor19920c19924_electric-mains_electric-combined.csv|
|2018-03-25 02:57:43|131  |home308_livingroom2828_sensor19920c19924_electric-mains_electric-combined.csv|
|2018-03-25 02:57:34|131  |home308_livingroom2828_sensor19920c19924_electric-mains_electric-combined.csv|
|2018-03-25 02:57:26|131  |home308_livingroom2828_sensor19920c19924_electric-mains_electric-combined.csv|
+-------------------+-----+-----------------------------------------------------------------------------+



In [23]:
# remove duplicate rows
dedup_spark_hh_dataframe = spark_hh_dataframe.dropDuplicates()

# from here on spark_hh_dataframe refers to the de-duplicated dataframe
spark_hh_dataframe=dedup_spark_hh_dataframe

# record count after duplicate removal
spark_hh_dataframe.count()

                                                                                

1790951

In [24]:
# Rename the column "datetime" to "timestamp" in the household sensor dataframe
spark_hh_dataframe = spark_hh_dataframe.withColumnRenamed("datetime", "timestamp")

# show the first 5 rows with the renamed column
spark_hh_dataframe.show(5)



+-------------------+-----+--------------------+
|          timestamp|value|        csv_filename|
+-------------------+-----+--------------------+
|2018-03-07 19:37:55| 1264|home308_outside28...|
|2018-03-07 22:56:20| 2527|home308_outside28...|
|2018-03-08 07:25:11|  316|home308_outside28...|
|2018-03-14 20:41:50|  316|home308_outside28...|
|2018-03-15 23:51:01|  316|home308_outside28...|
+-------------------+-----+--------------------+
only showing top 5 rows



                                                                                

In [25]:
# Collect the distinct values of csv_filename to confirm every source csv file is represented
spark_hh_dataframe.select("csv_filename").distinct().collect()

                                                                                

[Row(csv_filename='home308_outside2829_sensor19926_gas-pulse_gas.csv'),
 Row(csv_filename='home308_livingroom2828_sensor19920c19924_electric-mains_electric-combined.csv'),
 Row(csv_filename='home308_kitchen2831_sensor19973_tempprobe_hot-water-hot-pipe.csv'),
 Row(csv_filename='home308_kitchen2831_sensor19972_tempprobe_hot-water-cold-pipe.csv'),
 Row(csv_filename='home308_kitchen2831_sensor19967_tempprobe_central-heating-return.csv'),
 Row(csv_filename='home308_kitchen2831_sensor19968_tempprobe_central-heating-flow.csv')]

In [26]:
# Register spark_sensor as a temporary view so it can be queried with SQL.
# createOrReplaceTempView is the replacement for registerTempTable, which has
# been deprecated since Spark 2.0.
spark_sensor.createOrReplaceTempView("spark_sensor")
spark_sensor_output = spark.sql('SELECT * from spark_sensor where sensorid = 13124')
spark_sensor_output.show(5)



+--------+---------------+-----+----+------+------+--------------------+-------------+-------+-------+
|sensorid|    sensorboxid| type|unit|status|roomid|     subcircuit_type|scalingfactor|rawunit|counter|
+--------+---------------+-----+----+------+------+--------------------+-------------+-------+-------+
|   13124|279492261295239|power|  1W|active|  2093|attic plug socket...|          1.0|   NULL|      1|
+--------+---------------+-----+----+------+------+--------------------+-------------+-------+-------+



In [27]:
# Relabel the sensor status with when()/otherwise():
# "offline" becomes "inactive", every other status keeps its value
status_relabelled = when(col("status") == "offline", "inactive").otherwise(col("status"))
spark_sensor = spark_sensor.withColumn("status", status_relabelled)

# NOTE(review): this displays spark_tariff, not the spark_sensor frame changed above - confirm this is intended
spark_tariff.show(5)

+------+-----------------+--------------------+-----------+---------------------------+-------------------------+
|homeid|notification_date|    provenancedetail| energytype|daily_standing_charge_pence|unit_charge_pence_per_kwh|
+------+-----------------+--------------------+-----------+---------------------------+-------------------------+
|   167|       2016-08-25|primary_facetofac...|electricity|                      23.02|                     NULL|
|    62|       2016-08-26|primary_facetofac...|electricity|                      31.31|                     NULL|
|    62|       2016-08-26|primary_facetofac...|        gas|                      24.74|                     NULL|
|    61|       2016-10-06|primary_facetofac...|electricity|                       0.33|                     0.12|
|    61|       2016-10-06|primary_facetofac...|        gas|                        0.3|                     0.33|
+------+-----------------+--------------------+-----------+---------------------------+-

In [28]:
# spark_sensor was reassigned after the status relabelling, so the view is
# registered again to make SQL queries see the relabelled dataframe.
# createOrReplaceTempView is used because registerTempTable has been deprecated since Spark 2.0.
spark_sensor.createOrReplaceTempView("spark_sensor")

# single quotes mark a SQL string literal; double quotes can be read as an
# identifier under some SQL settings
spark_sensor_output = spark.sql("SELECT * from spark_sensor where status = 'inactive'")
spark_sensor_output.show(5)

+--------+---------------+-----------+-----+--------+------+---------------+-------------+-------+-------+
|sensorid|    sensorboxid|       type| unit|  status|roomid|subcircuit_type|scalingfactor|rawunit|counter|
+--------+---------------+-----------+-----+--------+------+---------------+-------------+-------+-------+
|    4225|279492261293492|      light|0.1cd|inactive|  1011|           NULL|          1.0|   NULL|      1|
|    4226|279492261293492|   humidity| 0.1%|inactive|  1011|           NULL|          1.0|   NULL|      1|
|    4227|279492261293492|temperature| 0.1C|inactive|  1011|           NULL|          1.0|   NULL|      1|
|    4228|279492261293492|    battery|0.01V|inactive|  1011|           NULL|          1.0|   NULL|      1|
|    4229|279492261293492|    battery|0.01V|inactive|  1011|           NULL|          1.0|   NULL|      2|
+--------+---------------+-----------+-----+--------+------+---------------+-------------+-------+-------+
only showing top 5 rows



##### Prepare the household sensor dataframe spark_hh_dataframe and join with spark_sensor

In [29]:
# The compound column to split
col_to_split = col("csv_filename")

# Names of the parts encoded in each csv file name. These were obtained from the
# documentation.pdf for IDEAL household energy sources, which describes the file
# naming format and its constituent parts.
col_name = ["home_id","room_type", "room_id", "sensor_id", "sensorbox_type", "sensor_type"]

# "_"-separated parts of the file name; defined once and reused for every new column
filename_parts = split(col_to_split, "_")

# Derive one column per part of the file name:
#   part 0 -> home_id (digits), part 1 -> room_type (letters) and room_id (digits),
#   part 2 -> sensor_id (first run of digits, e.g. "sensor19920c19924" gives 19920),
#   part 3 -> sensorbox_type, part 4 -> sensor_type with the file extension removed.
# r'\.csv$' only strips a literal ".csv" suffix; the previous pattern '.csv' treated
# the dot as "any character" and would also remove e.g. "xcsv" anywhere in the value.
spark_hh_df_split = (
    spark_hh_dataframe
    .withColumn("home_id", regexp_extract(filename_parts.getItem(0), r'\d+', 0).cast('INT'))
    .withColumn("room_type", regexp_extract(filename_parts.getItem(1), r'[a-zA-Z]+', 0))
    .withColumn("room_id", regexp_extract(filename_parts.getItem(1), r'\d+', 0).cast('INT'))
    .withColumn("sensor_id", regexp_extract(filename_parts.getItem(2), r'\d+', 0).cast('INT'))
    .withColumn("sensorbox_type", filename_parts.getItem(3))
    .withColumn("sensor_type", regexp_replace(filename_parts.getItem(4), r'\.csv$', ''))
)

# Inner-join the readings with the spark_sensor metadata to attach the metadata of
# each sensor; the join key is the sensor id parsed from the file name against
# spark_sensor.sensorid.
csld_spark_hh_dataframe = spark_hh_df_split.join(
    spark_sensor,
    spark_hh_df_split.sensor_id == spark_sensor.sensorid,
    how='inner',
)

# Drop columns from the consolidated dataframe.
# NOTE(review): only csv_filename comes from the two joined frames shown here; homeid and
# notification_date look like leftovers from a tariff join - confirm they can be removed.
csld_spark_hh_dataframe = csld_spark_hh_dataframe.drop('csv_filename','homeid','notification_date')

# View the schema of the new dataframe
csld_spark_hh_dataframe.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- value: integer (nullable = true)
 |-- home_id: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- room_id: integer (nullable = true)
 |-- sensor_id: integer (nullable = true)
 |-- sensorbox_type: string (nullable = true)
 |-- sensor_type: string (nullable = true)
 |-- sensorid: integer (nullable = true)
 |-- sensorboxid: long (nullable = true)
 |-- type: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- status: string (nullable = true)
 |-- roomid: integer (nullable = true)
 |-- subcircuit_type: string (nullable = true)
 |-- scalingfactor: double (nullable = true)
 |-- rawunit: string (nullable = true)
 |-- counter: integer (nullable = true)



In [30]:
# Keep only the columns of interest (selecting them instead of dropping the rest)
hh_columns_of_interest = [
    "timestamp", "value", "home_id", "room_type", "room_id",
    "sensor_id", "sensorbox_type", "sensor_type", "status",
]
csld_spark_hh_dataframe = csld_spark_hh_dataframe.select(*hh_columns_of_interest)

In [31]:
# first 5 rows of the consolidated dataframe after the column selection
csld_spark_hh_dataframe.show(5)



+-------------------+-----+-------+---------+-------+---------+--------------+-----------+------+
|          timestamp|value|home_id|room_type|room_id|sensor_id|sensorbox_type|sensor_type|status|
+-------------------+-----+-------+---------+-------+---------+--------------+-----------+------+
|2018-03-07 19:37:55| 1264|    308|  outside|   2829|    19926|     gas-pulse|        gas|active|
|2018-03-07 22:56:20| 2527|    308|  outside|   2829|    19926|     gas-pulse|        gas|active|
|2018-03-08 07:25:11|  316|    308|  outside|   2829|    19926|     gas-pulse|        gas|active|
|2018-03-14 20:41:50|  316|    308|  outside|   2829|    19926|     gas-pulse|        gas|active|
|2018-03-15 23:51:01|  316|    308|  outside|   2829|    19926|     gas-pulse|        gas|active|
+-------------------+-----+-------+---------+-------+---------+--------------+-----------+------+
only showing top 5 rows



                                                                                

In [32]:
# Count the missing (null) values in every column of the consolidated dataframe.
# One SQL expression per column: "<col> is null" is cast to 0/1 and summed.
# The loop variable is not called `col`, so it cannot be confused with the imported
# pyspark `col` function, and the column names are backtick-quoted so that names
# clashing with SQL keywords or containing special characters still parse.
null_count_exprs = [
    f"sum(int(`{column_name}` is null)) as `{column_name}`"
    for column_name in csld_spark_hh_dataframe.columns
]
missing_values_df = csld_spark_hh_dataframe.selectExpr(*null_count_exprs)

In [33]:
# one row holding the null count of every column
missing_values_df.show()



+---------+-----+-------+---------+-------+---------+--------------+-----------+------+
|timestamp|value|home_id|room_type|room_id|sensor_id|sensorbox_type|sensor_type|status|
+---------+-----+-------+---------+-------+---------+--------------+-----------+------+
|        0|    0|      0|        0|      0|        0|             0|          0|     0|
+---------+-----+-------+---------+-------+---------+--------------+-----------+------+



                                                                                

In [34]:
# Summary statistics of the consolidated dataframe, computed in pandas over all columns
stats_csld_spark_hh_dataframe = csld_spark_hh_dataframe.toPandas().describe(include='all')

# the float format only applies while printing, so numbers appear without scientific notation
with pd.option_context('display.float_format', '{:.2f}'.format):
    print(stats_csld_spark_hh_dataframe)

                                                                                

                         timestamp      value    home_id   room_type  \
count                      1790951 1790951.00 1790951.00     1790951   
unique                         NaN        NaN        NaN           3   
top                            NaN        NaN        NaN  livingroom   
freq                           NaN        NaN        NaN     1382973   
mean    2018-03-16 13:10:10.050400     341.26     308.00         NaN   
min            2018-03-07 13:21:30       0.00     308.00         NaN   
25%            2018-03-12 09:07:09     133.00     308.00         NaN   
50%            2018-03-16 13:45:56     184.00     308.00         NaN   
75%     2018-03-20 20:01:22.500000     301.00     308.00         NaN   
max            2018-03-25 14:44:32   94781.00     308.00         NaN   
std                            NaN     574.13       0.00         NaN   

          room_id  sensor_id  sensorbox_type        sensor_type   status  
count  1790951.00 1790951.00         1790951            1790

In [35]:
# Encode the categorical features as numeric indices

# (source column, name of the indexed output column)
cat_features = [("room_type","roomtype"),("sensorbox_type","sensorboxtype"),\
                ("sensor_type","sensortype"),("status","sensor_status")]

# one StringIndexer stage per categorical feature
index_cat = []
for col_to_input, col_to_output in cat_features:
    index_cat.append(StringIndexer(inputCol=col_to_input, outputCol=col_to_output))

# fit all indexers on the dataframe in a single pipeline
pipeline_index = Pipeline(stages=index_cat).fit(csld_spark_hh_dataframe)

# apply the fitted indexers; this adds the indexed columns to the dataframe
csld_spark_hh_dataframe = pipeline_index.transform(csld_spark_hh_dataframe)

                                                                                

In [36]:
# display the first 5 records, now including the indexed columns
csld_spark_hh_dataframe.show(5)



+-------------------+-----+-------+---------+-------+---------+--------------+-----------+------+--------+-------------+----------+-------------+
|          timestamp|value|home_id|room_type|room_id|sensor_id|sensorbox_type|sensor_type|status|roomtype|sensorboxtype|sensortype|sensor_status|
+-------------------+-----+-------+---------+-------+---------+--------------+-----------+------+--------+-------------+----------+-------------+
|2018-03-07 19:37:55| 1264|    308|  outside|   2829|    19926|     gas-pulse|        gas|active|     2.0|          2.0|       5.0|          0.0|
|2018-03-07 22:56:20| 2527|    308|  outside|   2829|    19926|     gas-pulse|        gas|active|     2.0|          2.0|       5.0|          0.0|
|2018-03-08 07:25:11|  316|    308|  outside|   2829|    19926|     gas-pulse|        gas|active|     2.0|          2.0|       5.0|          0.0|
|2018-03-14 20:41:50|  316|    308|  outside|   2829|    19926|     gas-pulse|        gas|active|     2.0|          2.0|    

                                                                                

In [37]:
# Keep the indexed versions of the categorical columns together with the other
# columns of interest (selecting instead of dropping the original string columns)
indexed_columns_of_interest = [
    "timestamp", "value", "home_id", "roomtype", "room_id",
    "sensor_id", "sensorboxtype", "sensortype", "sensor_status",
]
csld_spark_hh_dataframe = csld_spark_hh_dataframe.select(*indexed_columns_of_interest)

In [38]:
# display the first 5 records after keeping only the indexed categorical columns
csld_spark_hh_dataframe.show(5)



+-------------------+-----+-------+--------+-------+---------+-------------+----------+-------------+
|          timestamp|value|home_id|roomtype|room_id|sensor_id|sensorboxtype|sensortype|sensor_status|
+-------------------+-----+-------+--------+-------+---------+-------------+----------+-------------+
|2018-03-07 19:37:55| 1264|    308|     2.0|   2829|    19926|          2.0|       5.0|          0.0|
|2018-03-07 22:56:20| 2527|    308|     2.0|   2829|    19926|          2.0|       5.0|          0.0|
|2018-03-08 07:25:11|  316|    308|     2.0|   2829|    19926|          2.0|       5.0|          0.0|
|2018-03-14 20:41:50|  316|    308|     2.0|   2829|    19926|          2.0|       5.0|          0.0|
|2018-03-15 23:51:01|  316|    308|     2.0|   2829|    19926|          2.0|       5.0|          0.0|
+-------------------+-----+-------+--------+-------+---------+-------------+----------+-------------+
only showing top 5 rows



                                                                                

In [39]:
# derive day, month, year, hour, minute and second columns from the timestamp
# column; the mapping order is the order in which the columns are appended
timestamp_parts = {
    "day": dayofmonth,
    "month": month,
    "year": year,
    "hour": hour,
    "minute": minute,
    "second": second,
}
for part_name, extract_part in timestamp_parts.items():
    csld_spark_hh_dataframe = csld_spark_hh_dataframe.withColumn(
        part_name, extract_part("timestamp")
    )

In [40]:
# preview the first 5 rows with the new date/time part columns, without truncating values
csld_spark_hh_dataframe.show(n=5, truncate=False)



+-------------------+-----+-------+--------+-------+---------+-------------+----------+-------------+---+-----+----+----+------+------+
|timestamp          |value|home_id|roomtype|room_id|sensor_id|sensorboxtype|sensortype|sensor_status|day|month|year|hour|minute|second|
+-------------------+-----+-------+--------+-------+---------+-------------+----------+-------------+---+-----+----+----+------+------+
|2018-03-07 19:37:55|1264 |308    |2.0     |2829   |19926    |2.0          |5.0       |0.0          |7  |3    |2018|19  |37    |55    |
|2018-03-07 22:56:20|2527 |308    |2.0     |2829   |19926    |2.0          |5.0       |0.0          |7  |3    |2018|22  |56    |20    |
|2018-03-08 07:25:11|316  |308    |2.0     |2829   |19926    |2.0          |5.0       |0.0          |8  |3    |2018|7   |25    |11    |
|2018-03-14 20:41:50|316  |308    |2.0     |2829   |19926    |2.0          |5.0       |0.0          |14 |3    |2018|20  |41    |50    |
|2018-03-15 23:51:01|316  |308    |2.0     |2829

                                                                                

#### Send the transformed data to the PostgreSQL database

In [41]:
# set up the connection
# Connection settings are read from the environment so that no credentials are
# stored in the notebook. Set POSTGRES_PASSWORD (and optionally
# POSTGRES_JDBC_URL / POSTGRES_USER) before starting the kernel.
pyspark_postgreSQL = os.environ.get(
    "POSTGRES_JDBC_URL", "jdbc:postgresql://localhost:5432/test_spark_db"
)
connx = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    # deliberately no default: a missing password raises KeyError here
    "password": os.environ["POSTGRES_PASSWORD"],
    "driver": "org.postgresql.Driver"
}

# name of the database table the dataframe is written to
table_name = "csld_spark_hh_dataframe"

# write the data over JDBC. Valid save modes are append, overwrite, ignore and
# error / errorifexists (the default); "overwrite" replaces any existing table contents
csld_spark_hh_dataframe.write.format("jdbc").option("url", pyspark_postgreSQL)\
                            .option("dbtable", table_name).options(**connx).mode("overwrite").save()

                                                                                

#### Read from the PostgreSQL database to verify the data

In [42]:
# Connection settings are read from the environment so that no credentials are
# stored in the notebook. Set POSTGRES_PASSWORD (and optionally
# POSTGRES_JDBC_URL / POSTGRES_USER) before starting the kernel.
pyspark_postgreSQL = os.environ.get(
    "POSTGRES_JDBC_URL", "jdbc:postgresql://localhost:5432/test_spark_db"
)
connx = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    # deliberately no default: a missing password raises KeyError here
    "password": os.environ["POSTGRES_PASSWORD"],
    "driver": "org.postgresql.Driver"
}

# name of the database table to read back
table_name = "csld_spark_hh_dataframe"

# load the table over JDBC into a new Spark dataframe
new_data = spark.read.jdbc(url=pyspark_postgreSQL, table=table_name, properties=connx)

# preview the first 5 rows read from the database
new_data.show(5)


[Stage 148:>                                                        (0 + 1) / 1]

+-------------------+-----+-------+--------+-------+---------+-------------+----------+-------------+---+-----+----+----+------+------+
|          timestamp|value|home_id|roomtype|room_id|sensor_id|sensorboxtype|sensortype|sensor_status|day|month|year|hour|minute|second|
+-------------------+-----+-------+--------+-------+---------+-------------+----------+-------------+---+-----+----+----+------+------+
|2018-03-08 19:21:18|  632|    308|     2.0|   2829|    19926|          2.0|       5.0|          0.0|  8|    3|2018|  19|    21|    18|
|2018-03-10 20:54:56|  316|    308|     2.0|   2829|    19926|          2.0|       5.0|          0.0| 10|    3|2018|  20|    54|    56|
|2018-03-12 17:24:22|  316|    308|     2.0|   2829|    19926|          2.0|       5.0|          0.0| 12|    3|2018|  17|    24|    22|
|2018-03-16 00:38:03|  316|    308|     2.0|   2829|    19926|          2.0|       5.0|          0.0| 16|    3|2018|   0|    38|     3|
|2018-03-16 17:27:21|  316|    308|     2.0|   2

                                                                                

#### Transform with VectorAssembler and prepare data for Machine Learning

In [43]:
# VectorAssembler
# Columns to pack into the feature vector, listed in order of preference;
# the assembled vector keeps this order
feature_for_ML = [
    "day", "month", "year", "hour", "minute", "second",
    "value", "home_id", "roomtype", "room_id", "sensor_id",
    "sensorboxtype", "sensortype", "sensor_status",
]

# the assembler combines the columns above into one vector column, features_value
csld_spark_hh_assembler = VectorAssembler(
    inputCols=feature_for_ML,
    outputCol="features_value",
)

# add the features_value column to the consolidated household dataframe
csld_spark_hh_for_ML = csld_spark_hh_assembler.transform(csld_spark_hh_dataframe)

# peek at the first 5 rows without truncating the vector column
csld_spark_hh_for_ML.show(n=5, truncate=False)



+-------------------+-----+-------+--------+-------+---------+-------------+----------+-------------+---+-----+----+----+------+------+---------------------------------------------------------------------------+
|timestamp          |value|home_id|roomtype|room_id|sensor_id|sensorboxtype|sensortype|sensor_status|day|month|year|hour|minute|second|features_value                                                             |
+-------------------+-----+-------+--------+-------+---------+-------------+----------+-------------+---+-----+----+----+------+------+---------------------------------------------------------------------------+
|2018-03-07 19:37:55|1264 |308    |2.0     |2829   |19926    |2.0          |5.0       |0.0          |7  |3    |2018|19  |37    |55    |[7.0,3.0,2018.0,19.0,37.0,55.0,1264.0,308.0,2.0,2829.0,19926.0,2.0,5.0,0.0]|
|2018-03-07 22:56:20|2527 |308    |2.0     |2829   |19926    |2.0          |5.0       |0.0          |7  |3    |2018|22  |56    |20    |[7.0,3.0,2018.0,2

                                                                                

In [44]:
# final summary statistics of the ML-ready dataset
# NOTE: toPandas() collects the whole Spark dataframe onto the driver before
# pandas computes the statistics
stats_csld_spark_hh_for_ML = (
    csld_spark_hh_for_ML
    .toPandas()
    .describe(include='all')
)

# print with two decimal places; the option only applies inside this block
with pd.option_context('display.float_format', '{:.2f}'.format):
    print(stats_csld_spark_hh_for_ML)

                                                                                

                         timestamp      value    home_id   roomtype  \
count                      1790951 1790951.00 1790951.00 1790951.00   
unique                         NaN        NaN        NaN        NaN   
top                            NaN        NaN        NaN        NaN   
freq                           NaN        NaN        NaN        NaN   
mean    2018-03-16 13:10:10.050400     341.26     308.00       0.23   
min            2018-03-07 13:21:30       0.00     308.00       0.00   
25%            2018-03-12 09:07:09     133.00     308.00       0.00   
50%            2018-03-16 13:45:56     184.00     308.00       0.00   
75%     2018-03-20 20:01:22.500000     301.00     308.00       0.00   
max            2018-03-25 14:44:32   94781.00     308.00       2.00   
std                            NaN     574.13       0.00       0.42   

          room_id  sensor_id  sensorboxtype  sensortype  sensor_status  \
count  1790951.00 1790951.00     1790951.00  1790951.00     1790951.00   

In [45]:
# Terminate the SparkSession created at the top of the notebook.
# Run this last: cells that use `spark` will not work after the session is stopped.
spark.stop()