In [1]:
# print all the outputs in a cell
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity="all"
import pandas as pd
from datetime import datetime
import calendar
import warnings
warnings.filterwarnings("ignore")
import glob

In [2]:
# Bind the Spark entry points explicitly instead of relying on names injected
# by the launcher. getOrCreate() hands back the already-running session when
# one exists, so this is a no-op under a pyspark-started kernel and makes the
# notebook runnable on a plain Python kernel as well.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sc

##### Read all half-hourly datasets and the household information data

In [3]:

# Paths of every half-hourly file matched by the dataset pattern
files = list(glob.iglob('halfhourly_dataset/halfhourly_dataset/*csv'))

In [4]:
# Load the household information table into a pandas DataFrame
household_csv = "informations_households.csv"
df_household = pd.read_csv(household_csv, encoding="utf-8")

In [5]:
# Read every half-hourly CSV file into one Spark DataFrame, using the first
# line of each file as the header. No schema is given, so all columns are strings.
df = spark.read.csv("halfhourly_dataset/halfhourly_dataset/*csv", header=True)

In [6]:
# Number of rows loaded from the half-hourly CSV files
df.count()

167817021

In [7]:
# Column names as read from the CSV header
df.columns

['LCLid', 'tstp', 'energy(kWh/hh)']

In [8]:
# Clean up: keep only rows whose energy value is not the literal string 'Null'
energy_is_present = df['energy(kWh/hh)'] != 'Null'
df = df.filter(energy_is_present)

In [9]:
# Row count after removing the 'Null' energy readings
df.count()

167811461

In [10]:
# Rename the columns and give the energy reading a numeric type.
# The CSV was loaded without a schema, so 'energy(kWh/hh)' is a string column;
# casting it to double here means KWh can be aggregated directly and is stored
# as a number in the saved output. Values that cannot be parsed become null.
from pyspark.sql.functions import col
df = df.select(
    "LCLid",
    col("tstp").alias("DateTime"),
    col("energy(kWh/hh)").cast("double").alias("KWh"),
)


In [11]:
# Preview two rows after renaming the columns
df.limit(2).toPandas()

Unnamed: 0,LCLid,DateTime,KWh
0,MAC000048,2011-12-08 12:30:00.0000000,0.229
1,MAC000048,2011-12-08 13:00:00.0000000,0.213


In [12]:
# Parse the DateTime strings into Spark timestamps
from pyspark.sql import functions as F
df = df.withColumn('DateTime', F.to_timestamp('DateTime'))

In [13]:
# Derive calendar columns from DateTime, starting with the year
from pyspark.sql.functions import year, month, dayofmonth,hour,to_date
df = df.withColumn('year', F.year('DateTime'))

In [14]:
# Add the remaining calendar columns derived from DateTime
df = (
    df.withColumn('month', F.month('DateTime'))
      .withColumn('day', F.dayofmonth('DateTime'))
      .withColumn('hour', F.hour('DateTime'))
      .withColumn('date', F.to_date('DateTime'))
)




In [15]:
# Preview two rows with the derived year/month/day/hour/date columns
df.limit(2).toPandas()

Unnamed: 0,LCLid,DateTime,KWh,year,month,day,hour,date
0,MAC000048,2011-12-08 12:30:00,0.229,2011,12,8,12,2011-12-08
1,MAC000048,2011-12-08 13:00:00,0.213,2011,12,8,13,2011-12-08


In [16]:
# Convert the pandas household table into a Spark DataFrame so it can be
# joined with the half-hourly readings, then preview two of its rows
spark_df_household = spark.createDataFrame(data=df_household)
spark_df_household.limit(2).toPandas()

Unnamed: 0,LCLid,stdorToU,Acorn,Acorn_grouped,file
0,MAC005492,ToU,ACORN-,ACORN-,block_0
1,MAC001074,ToU,ACORN-,ACORN-,block_0


In [17]:
# Attach household attributes to every reading; a left join keeps readings
# even when no household row shares their LCLid
df = df.join(other=spark_df_household, on=['LCLid'], how='left')

In [18]:
# Preview two rows of the readings joined with the household information
df.limit(2).toPandas()

Unnamed: 0,LCLid,DateTime,KWh,year,month,day,hour,date,stdorToU,Acorn,Acorn_grouped,file
0,MAC000032,2011-12-07 12:30:00,0.462,2011,12,7,12,2011-12-07,Std,ACORN-Q,Adversity,block_94
1,MAC000032,2011-12-07 13:00:00,0.645,2011,12,7,13,2011-12-07,Std,ACORN-Q,Adversity,block_94


In [19]:
# Strip the "ACORN-" prefix from the Acorn column (e.g. "ACORN-Q" -> "Q")
acorn_without_prefix = F.regexp_replace('Acorn', "ACORN-", "")
df = df.withColumn('Acorn', acorn_without_prefix)


In [20]:
# Preview two rows to check the Acorn column after stripping the prefix
df.limit(2).toPandas()

Unnamed: 0,LCLid,DateTime,KWh,year,month,day,hour,date,stdorToU,Acorn,Acorn_grouped,file
0,MAC000032,2011-12-07 12:30:00,0.462,2011,12,7,12,2011-12-07,Std,Q,Adversity,block_94
1,MAC000032,2011-12-07 13:00:00,0.645,2011,12,7,13,2011-12-07,Std,Q,Adversity,block_94


In [21]:
# List the distinct tariff types present in the data
[row[0] for row in df.select('stdorToU').distinct().collect()]


['Std', 'ToU']

In [22]:
# List the distinct Acorn groups present in the data
[row[0] for row in df.select('Acorn_grouped').distinct().collect()]


['Adversity', 'ACORN-', 'Affluent', 'ACORN-U', 'Comfortable']

In [23]:
# Row count before filtering on Acorn_grouped
df.count()

167811461

In [24]:
# Clean up: discard the 'ACORN-' and 'ACORN-U' groups.
# Rows whose Acorn_grouped is null are dropped as well, because the predicate
# evaluates to null for them and filter keeps only rows where it is true.
unwanted_group = col('Acorn_grouped').isin('ACORN-', 'ACORN-U')
df = df.filter(~unwanted_group)

In [25]:
# Row count after filtering on Acorn_grouped
df.count()

166362426

In [26]:
# Add a 12-hour clock label with AM/PM marker (e.g. "12:30:00 PM")
clock_label = F.date_format('DateTime', 'hh:mm:ss a')
df = df.withColumn('time', clock_label)

In [27]:
# Preview two rows including the new 'time' column
df.limit(2).toPandas()

Unnamed: 0,LCLid,DateTime,KWh,year,month,day,hour,date,stdorToU,Acorn,Acorn_grouped,file,time
0,MAC000032,2011-12-07 12:30:00,0.462,2011,12,7,12,2011-12-07,Std,Q,Adversity,block_94,12:30:00 PM
1,MAC000032,2011-12-07 13:00:00,0.645,2011,12,7,13,2011-12-07,Std,Q,Adversity,block_94,01:00:00 PM


In [28]:
# Add the calendar quarter of each reading
quarter_of_year = F.quarter('DateTime')
df = df.withColumn('quarter', quarter_of_year)


In [29]:
# Preview two rows including the new 'quarter' column
df.limit(2).toPandas()

Unnamed: 0,LCLid,DateTime,KWh,year,month,day,hour,date,stdorToU,Acorn,Acorn_grouped,file,time,quarter
0,MAC000032,2011-12-07 12:30:00,0.462,2011,12,7,12,2011-12-07,Std,Q,Adversity,block_94,12:30:00 PM,4
1,MAC000032,2011-12-07 13:00:00,0.645,2011,12,7,13,2011-12-07,Std,Q,Adversity,block_94,01:00:00 PM,4


In [30]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


In [31]:
# Replace 'quarter' with a "<year> <quarter>" label, e.g. "2011 4".
# Both parts are derived from DateTime rather than from the existing 'quarter'
# column, so re-running this cell cannot stack the year prefix a second time
# ("2011 2011 4"); the first-run result is unchanged.
df = df.withColumn(
    'quarter',
    F.concat(F.year('DateTime'), F.lit(' '), F.quarter('DateTime')),
)


In [32]:
# Preview two rows with the combined year-quarter label
df.limit(2).toPandas()

Unnamed: 0,LCLid,DateTime,KWh,year,month,day,hour,date,stdorToU,Acorn,Acorn_grouped,file,time,quarter
0,MAC000032,2011-12-07 12:30:00,0.462,2011,12,7,12,2011-12-07,Std,Q,Adversity,block_94,12:30:00 PM,2011 4
1,MAC000032,2011-12-07 13:00:00,0.645,2011,12,7,13,2011-12-07,Std,Q,Adversity,block_94,01:00:00 PM,2011 4


In [129]:
# Save the prepared dataset to parquet.
# The writer's default mode raises an error when the target path already
# exists, which breaks every run after the first; overwrite makes the cell
# safe to re-run and always leaves the latest result on disk.
df.write.mode("overwrite").parquet("df_all.parquet")