In [1]:
# REFERENCE: https://www.kaggle.com/code/sercanyesiloz/pyspark-tutorial/notebook
import os
import warnings
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.functions import split, count, when, isnan, col, regexp_replace

In [2]:
import kaggle
import zipfile
from os.path import exists

# Authenticate against the Kaggle API. No credentials are passed in code, so
# they must already be configured for the Kaggle client on this machine.
from kaggle.api.kaggle_api_extended import KaggleApi
api = KaggleApi()
api.authenticate()

# Download and extract the dataset only when ./darksky is absent, so that
# re-running this cell does not fetch the archive again.
if not os.path.exists('./darksky'):
    # The archive is written to the working directory ...
    api.dataset_download_files('jeanmidev/smart-meters-in-london', path='./')
    # ... and then unpacked into ./darksky.
    with zipfile.ZipFile('./smart-meters-in-london.zip', 'r') as zipref:
        zipref.extractall('./darksky')

# list files: recursively print the contents of ./darksky with the shell's `ls`
# (the leading `!` is the IPython shell escape, so this line only runs in a notebook)
!ls -R ./darksky

acorn_details.csv                     informations_households.csv
[34mdaily_dataset[m[m                         uk_bank_holidays.csv
darksky_parameters_documentation.html weather_daily_darksky.csv
[34mhalfhourly_dataset[m[m                    weather_hourly_darksky.csv
[34mhhblock_dataset[m[m

./darksky/daily_dataset:
[34mdaily_dataset[m[m

./darksky/daily_dataset/daily_dataset:
block_111.csv

./darksky/halfhourly_dataset:
[34mhalfhourly_dataset[m[m

./darksky/halfhourly_dataset/halfhourly_dataset:
block_102.csv block_22.csv  block_42.csv  block_62.csv  block_82.csv
block_103.csv block_23.csv  block_43.csv  block_63.csv  block_83.csv
block_104.csv block_24.csv  block_44.csv  block_64.csv  block_84.csv
block_105.csv block_25.csv  block_45.csv  block_65.csv  block_85.csv
block_106.csv block_26.csv  block_46.csv  block_66.csv  block_86.csv
block_107.csv block_27.csv  block_47.csv  block_67.csv  block_87.csv
block_108.csv block_28.csv  block_48.csv  block_68.csv  block_88.cs

In [3]:
# importing the necessary modules

import findspark
findspark.init()

# Initialize a SparkSession
from pyspark.sql import SparkSession

# Location of the Microsoft SQL Server JDBC driver jar passed to Spark via `spark.jars`.
# Set the MSSQL_JDBC_JAR environment variable to use a jar stored elsewhere;
# when it is unset, the original machine-specific path below is used.
mssql_jdbc_jar = os.environ.get(
    "MSSQL_JDBC_JAR",
    "/Library/Frameworks/Python.framework/Versions/3.11/bin/sqljdbc_12.2/enu/mssql-jdbc-12.2.0.jre8.jar",
)

# Creating SparkSession object (getOrCreate reuses a session that is already running)
spark = (
    SparkSession.builder
    .appName('TeamDarkSky')
    .config("spark.jars", mssql_jdbc_jar)
    .getOrCreate()
)

# Calling the session variable as the last expression displays it as the cell output

spark

23/05/20 19:15:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Daily energy usage: load the CSV data under the daily_dataset folder into a
# Spark DataFrame, using the header row for column names and inferring types.
dailyDf = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./darksky/daily_dataset/daily_dataset')
)

# Print the inferred column names and types
dailyDf.printSchema()



root
 |-- LCLid: string (nullable = true)
 |-- day: timestamp (nullable = true)
 |-- energy_median: double (nullable = true)
 |-- energy_mean: double (nullable = true)
 |-- energy_max: double (nullable = true)
 |-- energy_count: integer (nullable = true)
 |-- energy_std: double (nullable = true)
 |-- energy_sum: double (nullable = true)
 |-- energy_min: double (nullable = true)



In [5]:
# acorn_details.csv: load into a Spark DataFrame, using the header row for
# column names and inferring column types.
acorn = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./darksky/acorn_details.csv')
)

# Preview the first rows
acorn.show()

+---------------+----------------+--------------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|MAIN CATEGORIES|      CATEGORIES|           REFERENCE|ACORN-A|ACORN-B|ACORN-C|ACORN-D|ACORN-E|ACORN-F|ACORN-G|ACORN-H|ACORN-I|ACORN-J|ACORN-K|ACORN-L|ACORN-M|ACORN-N|ACORN-O|ACORN-P|ACORN-Q|
+---------------+----------------+--------------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|     POPULATION|             Age|             Age 0-4|   77.0|   83.0|   72.0|  100.0|  120.0|   77.0|   97.0|   97.0|   63.0|  119.0|   67.0|  114.0|  113.0|   89.0|  123.0|  138.0|  133.0|
|     POPULATION|             Age|            Age 5-17|  117.0|  109.0|   87.0|   69.0|   94.0|   95.0|  102.0|  106.0|   67.0|   95.0|   64.0|  108.0|  116.0|   86.0|   89.0|  136.0|  106.0|
|     POPULATION|             Age|      

In [6]:
# Hourly weather: load weather_hourly_darksky.csv into a Spark DataFrame,
# using the header row for column names and inferring column types.
whdDf = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./darksky/weather_hourly_darksky.csv')
)

# Print the inferred column names and types
whdDf.printSchema()

root
 |-- visibility: double (nullable = true)
 |-- windBearing: integer (nullable = true)
 |-- temperature: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- dewPoint: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- apparentTemperature: double (nullable = true)
 |-- windSpeed: double (nullable = true)
 |-- precipType: string (nullable = true)
 |-- icon: string (nullable = true)
 |-- humidity: double (nullable = true)
 |-- summary: string (nullable = true)



In [7]:
# hhblock: load the CSV data under the hhblock_dataset folder into a Spark
# DataFrame, using the header row for column names and inferring column types.
hhblockDf = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./darksky/hhblock_dataset/hhblock_dataset')
)

# Print the inferred column names and types
hhblockDf.printSchema()



root
 |-- LCLid: string (nullable = true)
 |-- day: timestamp (nullable = true)
 |-- hh_0: double (nullable = true)
 |-- hh_1: double (nullable = true)
 |-- hh_2: double (nullable = true)
 |-- hh_3: double (nullable = true)
 |-- hh_4: double (nullable = true)
 |-- hh_5: double (nullable = true)
 |-- hh_6: double (nullable = true)
 |-- hh_7: double (nullable = true)
 |-- hh_8: double (nullable = true)
 |-- hh_9: double (nullable = true)
 |-- hh_10: double (nullable = true)
 |-- hh_11: double (nullable = true)
 |-- hh_12: double (nullable = true)
 |-- hh_13: double (nullable = true)
 |-- hh_14: double (nullable = true)
 |-- hh_15: double (nullable = true)
 |-- hh_16: double (nullable = true)
 |-- hh_17: double (nullable = true)
 |-- hh_18: double (nullable = true)
 |-- hh_19: double (nullable = true)
 |-- hh_20: double (nullable = true)
 |-- hh_21: double (nullable = true)
 |-- hh_22: double (nullable = true)
 |-- hh_23: double (nullable = true)
 |-- hh_24: double (nullable = true)
 |-- 

                                                                                

In [8]:
# Household information: load informations_households.csv into a Spark DataFrame,
# using the header row for column names and inferring column types.
informationDf = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./darksky/informations_households.csv')
)

# Preview the first rows
informationDf.show()

+---------+--------+-------+-------------+-------+
|    LCLid|stdorToU|  Acorn|Acorn_grouped|   file|
+---------+--------+-------+-------------+-------+
|MAC005492|     ToU| ACORN-|       ACORN-|block_0|
|MAC001074|     ToU| ACORN-|       ACORN-|block_0|
|MAC000002|     Std|ACORN-A|     Affluent|block_0|
|MAC003613|     Std|ACORN-A|     Affluent|block_0|
|MAC003597|     Std|ACORN-A|     Affluent|block_0|
|MAC003579|     Std|ACORN-A|     Affluent|block_0|
|MAC003566|     Std|ACORN-A|     Affluent|block_0|
|MAC003557|     Std|ACORN-A|     Affluent|block_0|
|MAC003553|     Std|ACORN-A|     Affluent|block_0|
|MAC003482|     Std|ACORN-A|     Affluent|block_0|
|MAC003463|     Std|ACORN-A|     Affluent|block_0|
|MAC003449|     Std|ACORN-A|     Affluent|block_0|
|MAC003428|     Std|ACORN-A|     Affluent|block_0|
|MAC003423|     Std|ACORN-A|     Affluent|block_0|
|MAC003422|     Std|ACORN-A|     Affluent|block_0|
|MAC003400|     Std|ACORN-A|     Affluent|block_0|
|MAC003394|     Std|ACORN-A|   

In [9]:
# UK bank holidays: load uk_bank_holidays.csv into a Spark DataFrame,
# using the header row for column names and inferring column types.
# NOTE(review): the preview shows `?` where an apostrophe is expected in some
# holiday names - check whether that comes from the file itself or its encoding.
holidaysDf = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./darksky/uk_bank_holidays.csv')
)

# Preview the first rows
holidaysDf.show()

+-------------------+--------------------+
|      Bank holidays|                Type|
+-------------------+--------------------+
|2012-12-26 00:00:00|          Boxing Day|
|2012-12-25 00:00:00|       Christmas Day|
|2012-08-27 00:00:00| Summer bank holiday|
|2012-05-06 00:00:00|Queen?s Diamond J...|
|2012-04-06 00:00:00|Spring bank holid...|
|2012-07-05 00:00:00|Early May bank ho...|
|2012-09-04 00:00:00|       Easter Monday|
|2012-06-04 00:00:00|         Good Friday|
|2012-02-01 00:00:00|New Year?s Day (s...|
|2013-12-26 00:00:00|          Boxing Day|
|2013-12-25 00:00:00|       Christmas Day|
|2013-08-26 00:00:00| Summer bank holiday|
|2013-05-27 00:00:00| Spring bank holiday|
|2013-06-05 00:00:00|Early May bank ho...|
|2013-01-04 00:00:00|       Easter Monday|
|2013-03-29 00:00:00|         Good Friday|
|2013-01-01 00:00:00|      New Year?s Day|
|2014-12-26 00:00:00|          Boxing Day|
|2014-12-25 00:00:00|       Christmas Day|
|2014-08-25 00:00:00| Summer bank holiday|
+----------

In [11]:
# Daily weather: load weather_daily_darksky.csv into a Spark DataFrame,
# using the header row for column names and inferring column types.
wddDf = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./darksky/weather_daily_darksky.csv')
)

# Print the inferred column names and types
wddDf.printSchema()

root
 |-- temperatureMax: double (nullable = true)
 |-- temperatureMaxTime: timestamp (nullable = true)
 |-- windBearing: integer (nullable = true)
 |-- icon: string (nullable = true)
 |-- dewPoint: double (nullable = true)
 |-- temperatureMinTime: timestamp (nullable = true)
 |-- cloudCover: double (nullable = true)
 |-- windSpeed: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- apparentTemperatureMinTime: timestamp (nullable = true)
 |-- apparentTemperatureHigh: double (nullable = true)
 |-- precipType: string (nullable = true)
 |-- visibility: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- apparentTemperatureHighTime: timestamp (nullable = true)
 |-- apparentTemperatureLow: double (nullable = true)
 |-- apparentTemperatureMax: double (nullable = true)
 |-- uvIndex: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- sunsetTime: timestamp (nullable = true)
 |-- temperatureLow: double (nullable = true)
 |-- temperatureMin: 

X acorn_details.csv                       X informations_households.csv
X daily_dataset                           X uk_bank_holidays.csv
/ darksky_parameters_documentation.html   X weather_daily_darksky.csv
X halfhourly_dataset                      X weather_hourly_darksky.csv
X hhblock_dataset

In [12]:
# Half-hourly readings: load the CSV data under the halfhourly_dataset folder into
# a Spark DataFrame, using the header row for column names and inferring types.
# NOTE(review): the printed schema reports `energy(kWh/hh)` as string rather than
# a numeric type - presumably some rows hold non-numeric values; confirm and
# cast or clean the column before doing arithmetic on it.
halfhourDf = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('./darksky/halfhourly_dataset/halfhourly_dataset')
)

# Print the inferred column names and types
halfhourDf.printSchema()



root
 |-- LCLid: string (nullable = true)
 |-- tstp: timestamp (nullable = true)
 |-- energy(kWh/hh): string (nullable = true)



                                                                                

In [14]:
# cache datasets: mark each loaded DataFrame for caching, in the original order
for _frame in (dailyDf, acorn, halfhourDf, hhblockDf, informationDf, holidaysDf, whdDf):
    _frame.cache()

# Kept as the final expression so the cell still displays the value returned by
# wddDf.cache(), exactly as before.
wddDf.cache()

23/05/20 19:22:19 WARN CacheManager: Asked to cache already cached data.
23/05/20 19:22:19 WARN CacheManager: Asked to cache already cached data.
23/05/20 19:22:19 WARN CacheManager: Asked to cache already cached data.
23/05/20 19:22:19 WARN CacheManager: Asked to cache already cached data.
23/05/20 19:22:19 WARN CacheManager: Asked to cache already cached data.
23/05/20 19:22:19 WARN CacheManager: Asked to cache already cached data.
23/05/20 19:22:19 WARN CacheManager: Asked to cache already cached data.
23/05/20 19:22:19 WARN CacheManager: Asked to cache already cached data.


DataFrame[temperatureMax: double, temperatureMaxTime: timestamp, windBearing: int, icon: string, dewPoint: double, temperatureMinTime: timestamp, cloudCover: double, windSpeed: double, pressure: double, apparentTemperatureMinTime: timestamp, apparentTemperatureHigh: double, precipType: string, visibility: double, humidity: double, apparentTemperatureHighTime: timestamp, apparentTemperatureLow: double, apparentTemperatureMax: double, uvIndex: double, time: timestamp, sunsetTime: timestamp, temperatureLow: double, temperatureMin: double, temperatureHigh: double, sunriseTime: timestamp, temperatureHighTime: timestamp, uvIndexTime: timestamp, summary: string, temperatureLowTime: timestamp, apparentTemperatureMin: double, apparentTemperatureMaxTime: timestamp, apparentTemperatureLowTime: timestamp, moonPhase: double]