In [1]:
import pyspark
import pandas as pd
import numpy as np
import importlib

spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [2]:
rootPathSourceData = './Data/smart-meters-in-london/'

In [3]:
weatherHourlyPath = rootPathSourceData + 'weather_hourly_darksky.csv'
weatherHourly = spark.read.csv(weatherHourlyPath, header='true', inferSchema='true', sep=',')

In [4]:
weatherHourly.printSchema()

root
 |-- visibility: double (nullable = true)
 |-- windBearing: integer (nullable = true)
 |-- temperature: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- dewPoint: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- apparentTemperature: double (nullable = true)
 |-- windSpeed: double (nullable = true)
 |-- precipType: string (nullable = true)
 |-- icon: string (nullable = true)
 |-- humidity: double (nullable = true)
 |-- summary: string (nullable = true)



In [5]:
weatherHourly.show(5)

+----------+-----------+-----------+-------------------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+
|visibility|windBearing|temperature|               time|dewPoint|pressure|apparentTemperature|windSpeed|precipType|               icon|humidity|      summary|
+----------+-----------+-----------+-------------------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+
|      5.97|        104|      10.24|2011-11-11 00:00:00|    8.86| 1016.76|              10.24|     2.77|      rain|partly-cloudy-night|    0.91|Partly Cloudy|
|      4.88|         99|       9.76|2011-11-11 01:00:00|    8.83| 1016.63|               8.24|     2.95|      rain|partly-cloudy-night|    0.94|Partly Cloudy|
|       3.7|         98|       9.46|2011-11-11 02:00:00|    8.79| 1016.36|               7.76|     3.17|      rain|partly-cloudy-night|    0.96|Partly Cloudy|
|      3.12|         99|       9.23|2011-11-11

In [6]:
householdInfoPath = rootPathSourceData + 'informations_households.csv'
householdInfo = spark.read.csv(householdInfoPath, header='true', inferSchema='true', sep=',')


In [7]:
householdInfo.printSchema()

root
 |-- LCLid: string (nullable = true)
 |-- stdorToU: string (nullable = true)
 |-- Acorn: string (nullable = true)
 |-- Acorn_grouped: string (nullable = true)
 |-- file: string (nullable = true)



In [8]:
householdInfo.show(5)

+---------+--------+-------+-------------+-------+
|    LCLid|stdorToU|  Acorn|Acorn_grouped|   file|
+---------+--------+-------+-------------+-------+
|MAC005492|     ToU| ACORN-|       ACORN-|block_0|
|MAC001074|     ToU| ACORN-|       ACORN-|block_0|
|MAC000002|     Std|ACORN-A|     Affluent|block_0|
|MAC003613|     Std|ACORN-A|     Affluent|block_0|
|MAC003597|     Std|ACORN-A|     Affluent|block_0|
+---------+--------+-------+-------------+-------+
only showing top 5 rows



In [9]:
weatherDailyPath = rootPathSourceData + 'weather_daily_darksky.csv'
weatherDaily = spark.read.csv(weatherDailyPath, header='true', inferSchema='true', sep=',')
weatherDaily.printSchema()


root
 |-- temperatureMax: double (nullable = true)
 |-- temperatureMaxTime: timestamp (nullable = true)
 |-- windBearing: integer (nullable = true)
 |-- icon: string (nullable = true)
 |-- dewPoint: double (nullable = true)
 |-- temperatureMinTime: timestamp (nullable = true)
 |-- cloudCover: double (nullable = true)
 |-- windSpeed: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- apparentTemperatureMinTime: timestamp (nullable = true)
 |-- apparentTemperatureHigh: double (nullable = true)
 |-- precipType: string (nullable = true)
 |-- visibility: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- apparentTemperatureHighTime: timestamp (nullable = true)
 |-- apparentTemperatureLow: double (nullable = true)
 |-- apparentTemperatureMax: double (nullable = true)
 |-- uvIndex: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- sunsetTime: timestamp (nullable = true)
 |-- temperatureLow: double (nullable = true)
 |-- temperatureMin: 

In [10]:
weatherDaily.show(5)

+--------------+-------------------+-----------+-------------------+--------+-------------------+----------+---------+--------+--------------------------+-----------------------+----------+----------+--------+---------------------------+----------------------+----------------------+-------+-------------------+-------------------+--------------+--------------+---------------+-------------------+-------------------+-------------------+--------------------+-------------------+----------------------+--------------------------+--------------------------+---------+
|temperatureMax| temperatureMaxTime|windBearing|               icon|dewPoint| temperatureMinTime|cloudCover|windSpeed|pressure|apparentTemperatureMinTime|apparentTemperatureHigh|precipType|visibility|humidity|apparentTemperatureHighTime|apparentTemperatureLow|apparentTemperatureMax|uvIndex|               time|         sunsetTime|temperatureLow|temperatureMin|temperatureHigh|        sunriseTime|temperatureHighTime|        uvIndexTi

In [11]:

df_tariffs = pd.read_excel('Tariffs.xlsx')

In [12]:
df_tariffs

Unnamed: 0,TariffDateTime,Tariff
0,2013-01-01 00:00:00,Normal
1,2013-01-01 00:30:00,Normal
2,2013-01-01 01:00:00,Normal
3,2013-01-01 01:30:00,Normal
4,2013-01-01 02:00:00,Normal
...,...,...
17515,2013-12-31 21:30:00,Normal
17516,2013-12-31 22:00:00,Normal
17517,2013-12-31 22:30:00,Normal
17518,2013-12-31 23:00:00,Normal


In [13]:

#df_dailyReading = spark.read.options(Map("header"->"true"))
#  .csv(rootPathSourceData + "daily_dataset/daily_dataset/")

df_dailyReading = spark.read.csv(rootPathSourceData + "daily_dataset/daily_dataset/",header = True)

In [14]:
df_dailyReading.show(5)

+---------+----------+-------------------+-------------------+------------------+------------+-------------------+-----------------+-------------------+
|    LCLid|       day|      energy_median|        energy_mean|        energy_max|energy_count|         energy_std|       energy_sum|         energy_min|
+---------+----------+-------------------+-------------------+------------------+------------+-------------------+-----------------+-------------------+
|MAC000048|2011-12-08|              0.107|0.15921739130434787|0.5760000000000001|          23|0.11802116779387856|3.662000000000001|              0.087|
|MAC000048|2011-12-09|              0.092|            0.12575|              0.57|          48|0.08927664391617872|            6.036|              0.087|
|MAC000048|2011-12-10|             0.0925|0.20964583541666668|1.3219999999999998|          48| 0.2882244672915173|       10.0630001|              0.087|
|MAC000048|2011-12-11|0.11599999999999999| 0.2451874979166667|         2.0009999| 

In [15]:
df_dailyDataSet = spark.read.csv(rootPathSourceData + "daily_dataset.csv/",header = True)

In [16]:
df_dailyDataSet.show(5)

+---------+----------+-------------+-------------------+------------------+------------+-------------------+------------------+-------------------+
|    LCLid|       day|energy_median|        energy_mean|        energy_max|energy_count|         energy_std|        energy_sum|         energy_min|
+---------+----------+-------------+-------------------+------------------+------------+-------------------+------------------+-------------------+
|MAC000131|2011-12-15|        0.485|0.43204545454545457|             0.868|          22|0.23914579678767536|             9.505|0.07200000000000001|
|MAC000131|2011-12-16|       0.1415|0.29616666875000003|         1.1160001|          48| 0.2814713178628203|14.216000100000002|              0.031|
|MAC000131|2011-12-17|       0.1015|          0.1898125|             0.685|          48| 0.1884046862418033|             9.111|              0.064|
|MAC000131|2011-12-18|        0.114| 0.2189791666666666|0.6759999999999999|          48|0.20291927853038208|10.5

In [17]:
df_hhDataSet = spark.read.csv(rootPathSourceData + "halfhourly_dataset/halfhourly_dataset/",header = True)

In [18]:
df_hhBlock = spark.read.csv(rootPathSourceData + "hhblock_dataset/hhblock_dataset/",header = True)

In [19]:
df_hhBlock.count()

3469352

In [20]:
df_hhDataSet.count()

167817021


mode = "overwrite"
url = "jdbc:postgresql://localhost:5432/project5"
properties = {"user": "hansremy","password": "Hryr1321","driver": "org.postgresql.Driver"}
weatherHourly.write.jdbc(url=url, table="test_result", mode=mode, properties=properties)

weatherHourly.write \
    .format("jdbc") \
    .option("url", "project5:postgresql:localhost") \
    .option("dbtable", "schema.weatherHourly") \
    .option("user", "hansremy") \
    .option("password", "Hryr1321@") \
    .save()

from pyspark.sql import SparkSession

spark2 = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "/Users/hansremy/Bootcamp/postgresql-42.2.10.jar") \
    .getOrCreate()

df = spark2.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/project5%") \
    .option("dbtable", "Test") \
    .option("user", "hansremy") \
    .option("password", "Hryr1321@") \
    .option("driver", "org.postgresql.Driver") \
    .load()

df.printSchema()

In [21]:
#df_dailyReading_pd = df_dailyReading.toPandas()

In [22]:
df_dailyReading.count()

3510433

In [23]:
df_dailyReading.printSchema()

root
 |-- LCLid: string (nullable = true)
 |-- day: string (nullable = true)
 |-- energy_median: string (nullable = true)
 |-- energy_mean: string (nullable = true)
 |-- energy_max: string (nullable = true)
 |-- energy_count: string (nullable = true)
 |-- energy_std: string (nullable = true)
 |-- energy_sum: string (nullable = true)
 |-- energy_min: string (nullable = true)



In [24]:
df_dailyReading.show(5)

+---------+----------+-------------------+-------------------+------------------+------------+-------------------+-----------------+-------------------+
|    LCLid|       day|      energy_median|        energy_mean|        energy_max|energy_count|         energy_std|       energy_sum|         energy_min|
+---------+----------+-------------------+-------------------+------------------+------------+-------------------+-----------------+-------------------+
|MAC000048|2011-12-08|              0.107|0.15921739130434787|0.5760000000000001|          23|0.11802116779387856|3.662000000000001|              0.087|
|MAC000048|2011-12-09|              0.092|            0.12575|              0.57|          48|0.08927664391617872|            6.036|              0.087|
|MAC000048|2011-12-10|             0.0925|0.20964583541666668|1.3219999999999998|          48| 0.2882244672915173|       10.0630001|              0.087|
|MAC000048|2011-12-11|0.11599999999999999| 0.2451874979166667|         2.0009999| 

In [25]:
householdInfo.count()

5566

In [26]:
pdf_householdInfo = householdInfo.toPandas()

In [27]:
pdf_householdInfo

Unnamed: 0,LCLid,stdorToU,Acorn,Acorn_grouped,file
0,MAC005492,ToU,ACORN-,ACORN-,block_0
1,MAC001074,ToU,ACORN-,ACORN-,block_0
2,MAC000002,Std,ACORN-A,Affluent,block_0
3,MAC003613,Std,ACORN-A,Affluent,block_0
4,MAC003597,Std,ACORN-A,Affluent,block_0
...,...,...,...,...,...
5561,MAC002056,Std,ACORN-U,ACORN-U,block_111
5562,MAC004587,Std,ACORN-U,ACORN-U,block_111
5563,MAC004828,Std,ACORN-U,ACORN-U,block_111
5564,MAC001704,ToU,ACORN-U,ACORN-U,block_111


In [28]:
pdf_block90 = pdf_householdInfo[pdf_householdInfo['file'] =='block_90']

In [29]:
print(pdf_block90)

          LCLid stdorToU    Acorn Acorn_grouped      file
4500  MAC003296      Std  ACORN-O     Adversity  block_90
4501  MAC003301      Std  ACORN-O     Adversity  block_90
4502  MAC003988      ToU  ACORN-O     Adversity  block_90
4503  MAC001639      Std  ACORN-O     Adversity  block_90
4504  MAC000786      Std  ACORN-O     Adversity  block_90
4505  MAC002770      Std  ACORN-O     Adversity  block_90
4506  MAC002734      Std  ACORN-O     Adversity  block_90
4507  MAC000474      Std  ACORN-O     Adversity  block_90
4508  MAC003373      Std  ACORN-O     Adversity  block_90
4509  MAC003929      Std  ACORN-O     Adversity  block_90
4510  MAC000757      Std  ACORN-O     Adversity  block_90
4511  MAC003376      Std  ACORN-O     Adversity  block_90
4512  MAC004479      Std  ACORN-O     Adversity  block_90
4513  MAC001762      Std  ACORN-O     Adversity  block_90
4514  MAC001775      Std  ACORN-O     Adversity  block_90
4515  MAC000491      Std  ACORN-O     Adversity  block_90
4516  MAC00343

In [30]:
pdf_householdInfo['Acorn_grouped'].unique() 

array(['ACORN-', 'Affluent', 'Comfortable', 'Adversity', 'ACORN-U'],
      dtype=object)

In [31]:
pdf_householdInfo['Acorn'].unique() 

array(['ACORN-', 'ACORN-A', 'ACORN-B', 'ACORN-C', 'ACORN-D', 'ACORN-E',
       'ACORN-F', 'ACORN-G', 'ACORN-H', 'ACORN-I', 'ACORN-J', 'ACORN-K',
       'ACORN-L', 'ACORN-M', 'ACORN-N', 'ACORN-O', 'ACORN-P', 'ACORN-Q',
       'ACORN-U'], dtype=object)

In [32]:
pdf_householdInfo[pdf_householdInfo['Acorn']=='ACORN-']

Unnamed: 0,LCLid,stdorToU,Acorn,Acorn_grouped,file
0,MAC005492,ToU,ACORN-,ACORN-,block_0
1,MAC001074,ToU,ACORN-,ACORN-,block_0


In [33]:
pdf_householdInfo[pdf_householdInfo['Acorn_grouped'] == 'Comfortable']

Unnamed: 0,LCLid,stdorToU,Acorn,Acorn_grouped,file
2194,MAC003957,Std,ACORN-F,Comfortable,block_43
2195,MAC004059,Std,ACORN-F,Comfortable,block_43
2196,MAC001028,Std,ACORN-F,Comfortable,block_43
2197,MAC004777,Std,ACORN-F,Comfortable,block_43
2198,MAC004812,Std,ACORN-F,Comfortable,block_43
...,...,...,...,...,...
3696,MAC003461,ToU,ACORN-J,Comfortable,block_73
3697,MAC002273,Std,ACORN-J,Comfortable,block_73
3698,MAC005559,Std,ACORN-J,Comfortable,block_73
3699,MAC004901,Std,ACORN-J,Comfortable,block_73


In [34]:
LClidListMVP = []
for index, row in pdf_block90.iterrows():
    LClidListMVP.append(row['LCLid'])
    

In [35]:
print(LClidListMVP)

['MAC003296', 'MAC003301', 'MAC003988', 'MAC001639', 'MAC000786', 'MAC002770', 'MAC002734', 'MAC000474', 'MAC003373', 'MAC003929', 'MAC000757', 'MAC003376', 'MAC004479', 'MAC001762', 'MAC001775', 'MAC000491', 'MAC003433', 'MAC004021', 'MAC003883', 'MAC004873', 'MAC002893', 'MAC005049', 'MAC001468', 'MAC003136', 'MAC003062', 'MAC004085', 'MAC005089', 'MAC001530', 'MAC002167', 'MAC002463', 'MAC003207', 'MAC005140', 'MAC000427', 'MAC005153', 'MAC004017', 'MAC004011', 'MAC004076', 'MAC000929', 'MAC002870', 'MAC002547', 'MAC004165', 'MAC000514', 'MAC003379', 'MAC000363', 'MAC002299', 'MAC002387', 'MAC005520', 'MAC001887', 'MAC000213', 'MAC003516']


In [36]:
pdf_householdInfo[pdf_householdInfo['file'] =='block_90']

Unnamed: 0,LCLid,stdorToU,Acorn,Acorn_grouped,file
4500,MAC003296,Std,ACORN-O,Adversity,block_90
4501,MAC003301,Std,ACORN-O,Adversity,block_90
4502,MAC003988,ToU,ACORN-O,Adversity,block_90
4503,MAC001639,Std,ACORN-O,Adversity,block_90
4504,MAC000786,Std,ACORN-O,Adversity,block_90
4505,MAC002770,Std,ACORN-O,Adversity,block_90
4506,MAC002734,Std,ACORN-O,Adversity,block_90
4507,MAC000474,Std,ACORN-O,Adversity,block_90
4508,MAC003373,Std,ACORN-O,Adversity,block_90
4509,MAC003929,Std,ACORN-O,Adversity,block_90


In [37]:
import ProjectHelperFunctions as helper

In [38]:
houseHoldListSample = helper.fillMVPHouseHoldList(pdf_householdInfo)

MAC003296
MAC003301
MAC003988
MAC001639
MAC000786
MAC002770
MAC002734
MAC000474
MAC003373
MAC003929
MAC000757
MAC003376
MAC004479
MAC001762
MAC001775
MAC000491
MAC003433
MAC004021
MAC003883
MAC004873
MAC002893
MAC005049
MAC001468
MAC003136
MAC003062
MAC004085
MAC005089
MAC001530
MAC002167
MAC002463
MAC003207
MAC005140
MAC000427
MAC005153
MAC004017
MAC004011
MAC004076
MAC000929
MAC002870
MAC002547
MAC004165
MAC000514
MAC003379
MAC000363
MAC002299
MAC002387
MAC005520
MAC001887
MAC000213
MAC003516
MAC005492
MAC001074
MAC000002
MAC003613
MAC003597
MAC003579
MAC003566
MAC003557
MAC003553
MAC003482
MAC003463
MAC003449
MAC003428
MAC003423
MAC003422
MAC003400
MAC003394
MAC003388
MAC003348
MAC000246
MAC003305
MAC003281
MAC003252
MAC003239
MAC003646
MAC003656
MAC003668
MAC003680
MAC004431
MAC004387
MAC004319
MAC004247
MAC004179
MAC004034
MAC003874
MAC003863
MAC003856
MAC003851
MAC003223
MAC003844
MAC003826
MAC003817
MAC003805
MAC003775
MAC003740
MAC003737
MAC003719
MAC003718
MAC003686
MAC000450


In [39]:
print(len(houseHoldListSample))

200


In [40]:
importlib.reload(helper)

<module 'ProjectHelperFunctions' from '/Users/hansremy/Bootcamp/Project5/ProjectHelperFunctions.py'>

In [41]:
 householdAffluent ,householdComfortable ,  householdAdversity = helper.fillHouseHoldPerIncome(pdf_householdInfo)


In [42]:
householdAffluent

['MAC000002',
 'MAC003613',
 'MAC003597',
 'MAC003579',
 'MAC003566',
 'MAC003557',
 'MAC003553',
 'MAC003482',
 'MAC003463',
 'MAC003449',
 'MAC003428',
 'MAC003423',
 'MAC003422',
 'MAC003400',
 'MAC003394',
 'MAC003388',
 'MAC003348',
 'MAC000246',
 'MAC003305',
 'MAC003281',
 'MAC003252',
 'MAC003239',
 'MAC003646',
 'MAC003656',
 'MAC003668',
 'MAC003680',
 'MAC004431',
 'MAC004387',
 'MAC004319',
 'MAC004247',
 'MAC004179',
 'MAC004034',
 'MAC003874',
 'MAC003863',
 'MAC003856',
 'MAC003851',
 'MAC003223',
 'MAC003844',
 'MAC003826',
 'MAC003817',
 'MAC003805',
 'MAC003775',
 'MAC003740',
 'MAC003737',
 'MAC003719',
 'MAC003718',
 'MAC003686',
 'MAC000450',
 'MAC003840',
 'MAC003212',
 'MAC003182',
 'MAC003166',
 'MAC001628',
 'MAC001533',
 'MAC001528',
 'MAC001510',
 'MAC001271',
 'MAC001251',
 'MAC001239',
 'MAC001145',
 'MAC000974',
 'MAC000948',
 'MAC001689',
 'MAC000902',
 'MAC000850',
 'MAC000816',
 'MAC000778',
 'MAC000379',
 'MAC000768',
 'MAC000713',
 'MAC000386',
 'MAC0

In [43]:
df_hhDataSet.printSchema()

root
 |-- LCLid: string (nullable = true)
 |-- tstp: string (nullable = true)
 |-- energy(kWh/hh): string (nullable = true)



In [44]:
df_hhDataSet.createOrReplaceTempView("halfhourlyData")

In [45]:
df_p = df_hhDataSet.where(df_hhDataSet.LCLid == 'MAC003301').toPandas()

#df_1 = spark.sql('SELECT * FROM halfhourlyData WHERE LCLID="MAC003301" ').execute()

In [46]:
print(df_p)

           LCLid                         tstp energy(kWh/hh)
0      MAC003301  2012-09-24 09:30:00.0000000         0.106 
1      MAC003301  2012-09-24 10:00:00.0000000         0.089 
2      MAC003301  2012-09-24 10:30:00.0000000         0.176 
3      MAC003301  2012-09-24 11:00:00.0000000         0.191 
4      MAC003301  2012-09-24 11:30:00.0000000         0.103 
...          ...                          ...            ...
25032  MAC003301  2014-02-27 22:00:00.0000000          0.84 
25033  MAC003301  2014-02-27 22:30:00.0000000         0.335 
25034  MAC003301  2014-02-27 23:00:00.0000000         0.304 
25035  MAC003301  2014-02-27 23:30:00.0000000         0.358 
25036  MAC003301  2014-02-28 00:00:00.0000000         0.256 

[25037 rows x 3 columns]


In [47]:
len(householdAffluent)

2192

In [48]:
from pyspark.sql.functions import col
from functools import reduce


In [49]:
#df_householdAffluent=df_householdAffluent.withColumnRenamed('energyhh)', 'energy')

In [50]:
def renameColumns(df):
    newColumns = ["LCLid", "tstp","energy"]
    oldColumns = df.schema.names
    print(oldColumns)
    df = reduce(lambda data, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), df)
    df.printSchema()
    df.show()
    return df

In [51]:
df_hhDataSet = renameColumns(df_hhDataSet)



['LCLid', 'tstp', 'energy(kWh/hh)']
root
 |-- LCLid: string (nullable = true)
 |-- tstp: string (nullable = true)
 |-- energy: string (nullable = true)

+---------+--------------------+-------+
|    LCLid|                tstp| energy|
+---------+--------------------+-------+
|MAC000048|2011-12-08 12:30:...| 0.229 |
|MAC000048|2011-12-08 13:00:...| 0.213 |
|MAC000048|2011-12-08 13:30:...| 0.272 |
|MAC000048|2011-12-08 14:00:...| 0.576 |
|MAC000048|2011-12-08 14:30:...| 0.194 |
|MAC000048|2011-12-08 15:00:...| 0.107 |
|MAC000048|2011-12-08 15:30:...| 0.107 |
|MAC000048|2011-12-08 16:00:...| 0.119 |
|MAC000048|2011-12-08 16:30:...| 0.326 |
|MAC000048|2011-12-08 17:00:...| 0.299 |
|MAC000048|2011-12-08 17:30:...| 0.107 |
|MAC000048|2011-12-08 18:00:...| 0.088 |
|MAC000048|2011-12-08 18:30:...| 0.088 |
|MAC000048|2011-12-08 19:00:...| 0.087 |
|MAC000048|2011-12-08 19:30:...| 0.087 |
|MAC000048|2011-12-08 20:00:...| 0.088 |
|MAC000048|2011-12-08 20:30:...| 0.087 |
|MAC000048|2011-12-08 21:00

In [52]:
df_householdAffluent

NameError: name 'df_householdAffluent' is not defined

In [53]:
df_householdAffluent = df_hhDataSet.where(col("LCLid").isin(householdAffluent))

In [54]:
df_householdComfortable = df_hhDataSet.where(col("LCLid").isin(householdComfortable))

In [55]:
df_householdAdversity = df_hhDataSet.where(col("LCLid").isin(householdAdversity))

In [56]:
df_householdAffluent.show(10)

+---------+--------------------+-------+
|    LCLid|                tstp| energy|
+---------+--------------------+-------+
|MAC000048|2011-12-08 12:30:...| 0.229 |
|MAC000048|2011-12-08 13:00:...| 0.213 |
|MAC000048|2011-12-08 13:30:...| 0.272 |
|MAC000048|2011-12-08 14:00:...| 0.576 |
|MAC000048|2011-12-08 14:30:...| 0.194 |
|MAC000048|2011-12-08 15:00:...| 0.107 |
|MAC000048|2011-12-08 15:30:...| 0.107 |
|MAC000048|2011-12-08 16:00:...| 0.119 |
|MAC000048|2011-12-08 16:30:...| 0.326 |
|MAC000048|2011-12-08 17:00:...| 0.299 |
+---------+--------------------+-------+
only showing top 10 rows



In [71]:
df_hhDataSet.createOrReplaceTempView('allHouseholds')

In [57]:
df_householdAffluent.createOrReplaceTempView("householdAffluent")

In [58]:
df_householdComfortable.createOrReplaceTempView("householdComfortable")

In [59]:
df_householdAdversity.createOrReplaceTempView("householdAdversity")

In [60]:
def createQueryGroupByTime(table):
    query = f'''
        SELECT  ha.tstp, round(avg(ha.energy),5) as  energy
            FROM {table} as ha 
        GROUP BY ha.tstp
       ORDER BY ha.tstp ASC
    '''
    return query

In [72]:
df_AllHouseHoldsGroup = spark.sql(createQueryGroupByTime('allHouseholds'))

In [61]:
df_householdAffluentGroup = spark.sql(createQueryGroupByTime('householdAffluent'))

In [62]:
df_householdComfortableGroup = spark.sql(createQueryGroupByTime('householdComfortable'))
df_householdAdversityGroup = spark.sql(createQueryGroupByTime('householdAdversity'))

In [73]:
df_AllHouseHoldsGroup.count()

40405

In [63]:
df_householdAdversityGroup.count()

40359

In [64]:
df_householdComfortableGroup.count()

40332

In [65]:
df_householdAffluentGroup.count()

40368

In [66]:
dfp_Adversity = df_householdAdversityGroup.toPandas()  

In [67]:
dfp_householdComfortable = df_householdComfortableGroup.toPandas() 
dfp_householdAffluent = df_householdAffluentGroup.toPandas()

In [68]:
dfp_Adversity.to_pickle("./data/AdversityGroup.pkl")
dfp_householdComfortable.to_pickle("./data/ComfortableGroup.pkl")
dfp_householdAffluent.to_pickle("./data/AffluentGroup.pkl")

In [69]:
dfp_Adversity.to_csv ('./data/AdversityGroup.csv', index = False, header=True)
dfp_householdComfortable.to_csv ('./data/ComfortableGroup.csv', index = False, header=True)
dfp_householdAffluent.to_csv ('./data/AffluentGroup.csv', index = False, header=True)

In [76]:
dfp_AllHouseHolds = df_AllHouseHoldsGroup.toPandas()

In [77]:
dfp_AllHouseHolds.to_csv ('./data/all.csv', index = False, header=True)

In [73]:
# Postgres info to connect
import psycopg2 as pg

connection_args = {
    'host': 'localhost',  # We are connecting to our _local_ version of psql
    'dbname': 'names',    # DB that we are connecting to
    'port': 5432          # port for psql
}

# We will talk about this magic Python trick!
connection = pg.connect(**connection_args)
from sqlalchemy import create_engine
engine = create_engine('postgresql://hansremy:Hryr1321@@localhost:5432/Project5')
#dfp_Adversity.to_sql('Adversity', engine)
#dfp_householdComfortable.to_sql('Comfortable', engine)
#dfp_householdAffluent.to_sql('Affluent', engine)