In [0]:
# List the entries under /FileStore/tables; as the last expression of the cell,
# the returned listing is what the notebook displays.
dbutils.fs.ls("/FileStore/tables")

In [0]:
# Create the /FileStore/tables/small/ directory; the readings DataFrame `df`
# is later loaded from this same location.
dbutils.fs.mkdirs("/FileStore/tables/small/")

In [0]:
from pyspark.sql.types import StringType, StructType, StructField, FloatType, TimestampType, IntegerType, DateType

In [0]:
# Explicit schema for the readings CSVs: three nullable columns, declared up
# front so the reader does not have to infer types from the data.
df_schema = (
    StructType()
    .add('LCLid', StringType(), True)
    .add('tstp', TimestampType(), True)
    .add('energy(kWh/hh)', FloatType(), True)
)

In [0]:
# Load the CSV data under the "small" directory, treating the first line of
# each file as a header and applying the explicit schema defined in df_schema.
df = spark.read.csv(
    "dbfs:/FileStore/tables/small",
    schema=df_schema,
    header=True,
)

In [0]:
# Household information table; column types are inferred from the file.
# It is joined to the readings on LCLid further down.
df_demo = spark.read.csv(
    "/FileStore/tables/demographic_info/informations_households.csv",
    inferSchema=True,
    header=True,
)

In [0]:
# Acorn details table; column types are inferred from the file.
# NOTE(review): df_acorn is not referenced by any later cell visible in this
# notebook - confirm whether it is still needed.
df_acorn = spark.read.csv(
    "/FileStore/tables/demographic_info/acorn_details.csv",
    inferSchema=True,
    header=True,
)

In [0]:
# Attach the household information columns to every reading via the shared
# LCLid key. An inner join (the default) drops readings with no matching
# household row.
df_join = df.join(df_demo, on='LCLid', how='inner')

In [0]:
# Render the first 5 rows of the joined readings/household table as a sanity check.
display(df_join.limit(5))

LCLid,tstp,energy(kWh/hh),stdorToU,Acorn,Acorn_grouped,file
MAC000048,2011-12-08T12:30:00.000+0000,0.229,ToU,ACORN-E,Affluent,block_20
MAC000048,2011-12-08T13:00:00.000+0000,0.213,ToU,ACORN-E,Affluent,block_20
MAC000048,2011-12-08T13:30:00.000+0000,0.272,ToU,ACORN-E,Affluent,block_20
MAC000048,2011-12-08T14:00:00.000+0000,0.576,ToU,ACORN-E,Affluent,block_20
MAC000048,2011-12-08T14:30:00.000+0000,0.194,ToU,ACORN-E,Affluent,block_20


In [0]:
# Mean energy reading per Acorn category over all joined rows.
# Only the grouping key and the averaged column influence the result, so the
# aggregation is applied to df_join directly.
df_avg_group = (
    df_join
    .groupBy('Acorn')
    .avg('energy(kWh/hh)')
)

In [0]:
from pyspark.sql.functions import unix_timestamp, from_unixtime, date_format

In [0]:
# Derive calendar fields from each reading's timestamp.
#
# tstp is declared as TimestampType in df_schema, so it can be formatted
# directly. The previous unix_timestamp -> from_unixtime -> date_format round
# trip was redundant, and the pattern it passed ('m/d/yyyy h:m:ss a', where 'm'
# means minute-of-hour rather than month) is only consulted when the input is a
# string, which made it misleading as well as incorrect.
#
# Output columns (all strings):
#   Date  - 'd/M/yyyy'    e.g. 8/12/2011
#   Time  - 'hh:mm:ss a'  12-hour clock with AM/PM marker
#   Month - 'M'           month number without zero padding
#   Hour  - 'HH'          24-hour clock, zero padded
df_time = df_join.select(
    'LCLid',
    'tstp',
    'Acorn',
    'file',
    'energy(kWh/hh)',
    date_format('tstp', 'd/M/yyyy').alias('Date'),
    date_format('tstp', 'hh:mm:ss a').alias('Time'),
    date_format('tstp', 'M').alias('Month'),
    date_format('tstp', 'HH').alias('Hour'),
)

In [0]:
# Two-column projection of the raw readings: timestamp and energy value only
# (the LCLid column is dropped).
df_energy = df.select(df['tstp'], df['energy(kWh/hh)'])

In [0]:
# Inner-join the time-enriched table back to the (tstp, energy) projection on tstp.
# NOTE(review): df_energy no longer carries LCLid, so if several households
# report at the same timestamp this pairs every df_time row with every such
# reading (many-to-many). df_time also already contains 'energy(kWh/hh)', so
# the result has that column twice. Confirm this join is intended.
df_datetime = df_time.join(df_energy, 'tstp')

In [0]:
# Print the first rows of df_datetime (show() defaults to 20 rows); this
# triggers execution of the join that defines it.
df_datetime.show()

In [0]:
# Acorn categories kept by the filtered aggregations below.
acron_list = [
    'ACORN-A',
    'ACORN-B',
    'ACORN-C',
    'ACORN-D',
]

In [0]:
# Total energy per (Date, Acorn) pair, restricted to the categories listed in
# acron_list. Date is the 'd/M/yyyy' string produced in df_time.
df_sum_group = (
    df_time
    .where(df_time.Acorn.isin(acron_list))
    .groupBy('Date', 'Acorn')
    .sum('energy(kWh/hh)')
)

In [0]:
# Show the 20 (Date, Acorn) pairs with the largest summed energy, largest first.
display(
    df_sum_group
    .orderBy('sum(energy(kWh/hh))', ascending=False)
    .limit(20)
)

Date,Acorn,sum(energy(kWh/hh))
20/1/2013,ACORN-D,7952821.025118522
19/1/2013,ACORN-D,7829511.937676381
2/12/2012,ACORN-D,7553100.263541604
18/1/2013,ACORN-D,7551676.475071783
24/3/2013,ACORN-D,7399175.84233765
9/12/2012,ACORN-D,7315936.932139107
21/1/2013,ACORN-D,7312919.060399973
6/12/2012,ACORN-D,7295947.348037228
13/12/2012,ACORN-D,7281682.395831217
1/12/2012,ACORN-D,7281209.939939927


In [0]:
# Render the first 20 rows of df_time to inspect the derived Date/Time/Month/Hour columns.
display(df_time.limit(20))

LCLid,tstp,Acorn,file,energy(kWh/hh),Date,Time,Month
MAC000048,2011-12-08T12:30:00.000+0000,ACORN-E,block_20,0.229,8/12/2011,12:30:00 PM,12
MAC000048,2011-12-08T13:00:00.000+0000,ACORN-E,block_20,0.213,8/12/2011,01:00:00 PM,12
MAC000048,2011-12-08T13:30:00.000+0000,ACORN-E,block_20,0.272,8/12/2011,01:30:00 PM,12
MAC000048,2011-12-08T14:00:00.000+0000,ACORN-E,block_20,0.576,8/12/2011,02:00:00 PM,12
MAC000048,2011-12-08T14:30:00.000+0000,ACORN-E,block_20,0.194,8/12/2011,02:30:00 PM,12
MAC000048,2011-12-08T15:00:00.000+0000,ACORN-E,block_20,0.107,8/12/2011,03:00:00 PM,12
MAC000048,2011-12-08T15:30:00.000+0000,ACORN-E,block_20,0.107,8/12/2011,03:30:00 PM,12
MAC000048,2011-12-08T16:00:00.000+0000,ACORN-E,block_20,0.119,8/12/2011,04:00:00 PM,12
MAC000048,2011-12-08T16:30:00.000+0000,ACORN-E,block_20,0.326,8/12/2011,04:30:00 PM,12
MAC000048,2011-12-08T17:00:00.000+0000,ACORN-E,block_20,0.299,8/12/2011,05:00:00 PM,12


In [0]:
# Month comes out of date_format as a string; convert it to an integer so it
# can be used in the arithmetic that derives the season.
df1 = df_time.withColumn("Month", df_time.Month.cast("int"))

In [0]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import floor, col
from pyspark.sql.functions import *

In [0]:
# Render the first 5 rows of df1 (df_time with Month cast to integer).
display(df1.limit(5))

LCLid,tstp,Acorn,file,energy(kWh/hh),Date,Time,Month
MAC000048,2011-12-08T12:30:00.000+0000,ACORN-E,block_20,0.229,8/12/2011,12:30:0 PM,12
MAC000048,2011-12-08T13:00:00.000+0000,ACORN-E,block_20,0.213,8/12/2011,1:0:0 PM,12
MAC000048,2011-12-08T13:30:00.000+0000,ACORN-E,block_20,0.272,8/12/2011,1:30:0 PM,12
MAC000048,2011-12-08T14:00:00.000+0000,ACORN-E,block_20,0.576,8/12/2011,2:0:0 PM,12
MAC000048,2011-12-08T14:30:00.000+0000,ACORN-E,block_20,0.194,8/12/2011,2:30:0 PM,12


In [0]:
# Bucket months into four numbered seasons:
#   12, 1, 2 -> 1    3, 4, 5 -> 2    6, 7, 8 -> 3    9, 10, 11 -> 4
# Month % 12 maps December to 0 so it falls in the same bucket as January/February.
df_month = df1.withColumn(
    'season',
    floor((df1['Month'] % 12) / 3) + 1,
)

In [0]:
# Print the distinct (Month, season) pairs to check the month-to-season mapping.
df_month.select('Month', 'season').distinct().show()

In [0]:
# Render the first 20 rows of df_month, including the derived season column.
display(df_month.limit(20))

LCLid,tstp,Acorn,file,energy(kWh/hh),Date,Time,Month,season
MAC000048,2011-12-08T12:30:00.000+0000,ACORN-E,block_20,0.229,8/12/2011,12:30:0 PM,12,1
MAC000048,2011-12-08T13:00:00.000+0000,ACORN-E,block_20,0.213,8/12/2011,1:0:0 PM,12,1
MAC000048,2011-12-08T13:30:00.000+0000,ACORN-E,block_20,0.272,8/12/2011,1:30:0 PM,12,1
MAC000048,2011-12-08T14:00:00.000+0000,ACORN-E,block_20,0.576,8/12/2011,2:0:0 PM,12,1
MAC000048,2011-12-08T14:30:00.000+0000,ACORN-E,block_20,0.194,8/12/2011,2:30:0 PM,12,1
MAC000048,2011-12-08T15:00:00.000+0000,ACORN-E,block_20,0.107,8/12/2011,3:0:0 PM,12,1
MAC000048,2011-12-08T15:30:00.000+0000,ACORN-E,block_20,0.107,8/12/2011,3:30:0 PM,12,1
MAC000048,2011-12-08T16:00:00.000+0000,ACORN-E,block_20,0.119,8/12/2011,4:0:0 PM,12,1
MAC000048,2011-12-08T16:30:00.000+0000,ACORN-E,block_20,0.326,8/12/2011,4:30:0 PM,12,1
MAC000048,2011-12-08T17:00:00.000+0000,ACORN-E,block_20,0.299,8/12/2011,5:0:0 PM,12,1


In [0]:
# Mean energy reading for every (season, Acorn) combination.
# Only the grouping keys and the averaged column influence the result, so the
# aggregation is applied to df_month directly.
df_season_group = (
    df_month
    .groupBy('season', 'Acorn')
    .avg('energy(kWh/hh)')
)

In [0]:
# Show the 20 (season, Acorn) groups with the highest mean energy, highest first.
display(
    df_season_group
    .orderBy('avg(energy(kWh/hh))', ascending=False)
    .limit(20)
)

season,Acorn,avg(energy(kWh/hh))
1,ACORN-A,0.3045786590237697
1,ACORN-D,0.3045414487437917
1,ACORN-E,0.3045050481398152
1,ACORN-B,0.3043634373545845
1,ACORN-C,0.3043106753618357
1,ACORN-,0.3015295651675955
2,ACORN-C,0.2575531059391618
2,ACORN-B,0.2574648376795319
2,ACORN-A,0.2574120623037337
2,ACORN-D,0.2554588490627925


In [0]:
  # Mean energy reading per (Time, Acorn) pair, restricted to the categories in
  # acron_list. Time is the 'hh:mm:ss a' string produced in df_time.
  df_avg_time = (
      df_time
      .where(df_time.Acorn.isin(acron_list))
      .groupBy('Time', 'Acorn')
      .avg('energy(kWh/hh)')
  )

In [0]:
# Show the 20 (Time, Acorn) groups with the highest mean energy, highest first.
display(
    df_avg_time
    .orderBy('avg(energy(kWh/hh))', ascending=False)
    .limit(20)
)

Time,Acorn,avg(energy(kWh/hh))
7:30:0 PM,ACORN-B,0.3965887249143358
7:30:0 PM,ACORN-C,0.3951688328934269
7:30:0 PM,ACORN-A,0.3943812608262355
7:0:0 PM,ACORN-B,0.3937058410013068
7:0:0 PM,ACORN-C,0.3922678979757604
7:0:0 PM,ACORN-A,0.3914074773307148
8:0:0 PM,ACORN-B,0.3908824338771338
8:0:0 PM,ACORN-C,0.3895329596247594
8:0:0 PM,ACORN-A,0.3888568027667886
7:30:0 PM,ACORN-D,0.388444113457976


In [0]:
# Mean energy per season for ACORN-A households only.
# Note: this rebinds df_season_group, which an earlier cell bound to the
# per-(season, Acorn) averages for all categories.
df_season_group = (
    df_month
    .where(df_month.Acorn == 'ACORN-A')
    .groupBy('season')
    .avg('energy(kWh/hh)')
)

In [0]:
# Render the per-season averages for ACORN-A (at most 5 rows; there are four season values).
display(df_season_group.limit(5))

season,avg(energy(kWh/hh))
1,0.4666186416339891
3,0.3214046511153585
2,0.3957909567904761
4,0.3962565700511204


In [0]:
# Mean energy per season for ACORN-B households only.
df_season_group1 = (
    df_month
    .where(df_month.Acorn == 'ACORN-B')
    .groupBy('season')
    .avg('energy(kWh/hh)')
)

In [0]:
# Render the per-season averages for ACORN-B (at most 5 rows).
display(df_season_group1.limit(5))

season,avg(energy(kWh/hh))
1,0.2914630538945395
3,0.1949787213779411
2,0.2337479221680241
4,0.2576486139470246


In [0]:
# Mean energy per season for ACORN-C households only.
df_season_group2 = (
    df_month
    .where(df_month.Acorn == 'ACORN-C')
    .groupBy('season')
    .avg('energy(kWh/hh)')
)

In [0]:
# Render the per-season averages for ACORN-C (at most 5 rows).
display(df_season_group2.limit(5))

season,avg(energy(kWh/hh))
1,0.2917705490680019
3,0.2065873145935583
2,0.2420574012171795
4,0.2469812551052756


In [0]:
# Mean energy per season for ACORN-D households only.
df_season_group3 = (
    df_month
    .where(df_month.Acorn == 'ACORN-D')
    .groupBy('season')
    .avg('energy(kWh/hh)')
)

In [0]:
# Render the per-season averages for ACORN-D (at most 5 rows).
display(df_season_group3.limit(5))

season,avg(energy(kWh/hh))
1,0.3045414487437917
3,0.1944499053131987
2,0.2554588490627925
4,0.2492684336616919


In [0]:
# Hour comes out of date_format ('HH') as a string; convert it to an integer so
# it can be bucketed into parts of the day.
df2 = df_time.withColumn("Hour", df_time.Hour.cast("int"))

In [0]:
# Split the day into three 8-hour sessions by hour of day:
#   hours 0-7 -> 0    hours 8-15 -> 1    hours 16-23 -> 2
df_day = df2.withColumn(
    'session',
    floor((df2['Hour'] % 24) / 8),
)

In [0]:
# Mean energy per 8-hour session for ACORN-A households only.
df_day_group = (
    df_day
    .where(df_day.Acorn == 'ACORN-A')
    .groupBy('session')
    .avg('energy(kWh/hh)')
)

In [0]:
# Render the per-session averages for ACORN-A (at most 5 rows; there are three session values).
display(df_day_group.limit(5))

session,avg(energy(kWh/hh))
0,0.2590322206868792
1,0.404928334928941
2,0.5325448655786358


In [0]:
# Print every distinct (Hour, session) pair, latest hour first, to verify the
# hour-to-session bucketing; 24 rows is enough for one row per hour of the day.
(
    df_day
    .select('Hour', 'session')
    .distinct()
    .orderBy('Hour', ascending=False)
    .show(24)
)

In [0]:
# Mean energy per 8-hour session for ACORN-B households only.
df_day_group1 = (
    df_day
    .where(df_day.Acorn == 'ACORN-B')
    .groupBy('session')
    .avg('energy(kWh/hh)')
)

In [0]:
# Render the per-session averages for ACORN-B (at most 5 rows).
display(df_day_group1.limit(5))

session,avg(energy(kWh/hh))
0,0.1522674665042676
1,0.2550836496404376
2,0.3393421544867317


In [0]:
# Mean energy per 8-hour session for ACORN-C households only.
df_day_group2 = (
    df_day
    .where(df_day.Acorn == 'ACORN-C')
    .groupBy('session')
    .avg('energy(kWh/hh)')
)

In [0]:
# Render the per-session averages for ACORN-C (at most 5 rows).
display(df_day_group2.limit(5))

session,avg(energy(kWh/hh))
0,0.151662367980092
1,0.2586630446428107
2,0.3391704104488147


In [0]:
# Mean energy per 8-hour session for ACORN-D households only.
df_day_group3 = (
    df_day
    .where(df_day.Acorn == 'ACORN-D')
    .groupBy('session')
    .avg('energy(kWh/hh)')
)

In [0]:
# Render the per-session averages for ACORN-D (at most 5 rows).
display(df_day_group3.limit(5))

session,avg(energy(kWh/hh))
0,0.1749141710357742
1,0.2896678062235594
2,0.3872418533552959


In [0]:
# Replace the numeric session index with a readable label.
#
# The previous version could not run: it read from `df3`, which is not defined
# anywhere in the visible notebook, and called a pandas-style
# replace({...}, inplace=True), which is not a Spark column expression. Its
# mapping keys (0, 2, 3) also did not match the values the session column
# actually takes: floor(Hour % 24 / 8) yields 0 (hours 0-7), 1 (hours 8-15)
# and 2 (hours 16-23).
#
# The labelled frame is rebuilt from df2 (df_time with an integer Hour) rather
# than from df_day itself, so re-running this cell gives the same result.
# Rows whose Hour is null keep a null session.
from pyspark.sql.functions import col, floor, when

df_day = (
    df2
    .withColumn('session', floor((df2['Hour'] % 24) / 8))
    .withColumn(
        'session',
        when(col('session') == 0, 'Night')
        .when(col('session') == 1, 'Day')
        .when(col('session') == 2, 'Evening'),
    )
)