In [None]:

# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES
# TO THE CORRECT LOCATION IN YOUR NOTEBOOK,
# THEN FEEL FREE TO DELETE THIS CELL.

import os
import sys
from tempfile import NamedTemporaryFile
from urllib.request import urlopen
from urllib.parse import unquote, urlparse
from urllib.error import HTTPError
from zipfile import ZipFile
import tarfile
import shutil

CHUNK_SIZE = 40960
DATA_SOURCE_MAPPING = 'smart-meters-in-london:https%3A%2F%2Fstorage.googleapis.com%2Fkaggle-data-sets%2F4021%2F3684057%2Fbundle%2Farchive.zip%3FX-Goog-Algorithm%3DGOOG4-RSA-SHA256%26X-Goog-Credential%3Dgcp-kaggle-com%2540kaggle-161607.iam.gserviceaccount.com%252F20240314%252Fauto%252Fstorage%252Fgoog4_request%26X-Goog-Date%3D20240314T161125Z%26X-Goog-Expires%3D259200%26X-Goog-SignedHeaders%3Dhost%26X-Goog-Signature%3D2b762e5d40b3eaa570877f5e351063c925a87574ac0401558d103ffb3ade39bcf29961253726e033414ed08dc8928163b4a66a441f2bb4ce913108bc9f705f8302a3b669f2b7a2e55a23580ef497c83988ca6c97e720076ca93ad755ca0017b2b09ee191536412429c9712bc81a135a327524367347ef80634f6fe1b04b7ee36a7d1aa319848612fe55d8181b8031279b72fa8b9aae4d55f7dc2bd32331fd9b42b06c2d982ccf644bc1b75aad5e50cecb11bc8ef6b3a40bc6a99d6a5c4cc672fafbd647eb38e6c5cf40b9dc8ecf9373e82ee9ab09569809e77f8bf04ad86538511d13fd89e2bff24e0ea31f07e3a4fc42211b8a4ed68227117c29f3732005ea4'

KAGGLE_INPUT_PATH='/kaggle/input'
KAGGLE_WORKING_PATH='/kaggle/working'
KAGGLE_SYMLINK='kaggle'

!umount /kaggle/input/ 2> /dev/null
shutil.rmtree('/kaggle/input', ignore_errors=True)
os.makedirs(KAGGLE_INPUT_PATH, 0o777, exist_ok=True)
os.makedirs(KAGGLE_WORKING_PATH, 0o777, exist_ok=True)

try:
  os.symlink(KAGGLE_INPUT_PATH, os.path.join("..", 'input'), target_is_directory=True)
except FileExistsError:
  pass
try:
  os.symlink(KAGGLE_WORKING_PATH, os.path.join("..", 'working'), target_is_directory=True)
except FileExistsError:
  pass

# Download every "<directory>:<url-encoded URL>" entry of DATA_SOURCE_MAPPING
# and unpack it under KAGGLE_INPUT_PATH/<directory>.
for data_source_mapping in DATA_SOURCE_MAPPING.split(','):
    # Split on the first ':' only, so a stray colon in the URL part cannot
    # break the two-value unpacking.
    directory, download_url_encoded = data_source_mapping.split(':', 1)
    download_url = unquote(download_url_encoded)
    filename = urlparse(download_url).path
    destination_path = os.path.join(KAGGLE_INPUT_PATH, directory)
    try:
        with urlopen(download_url) as fileres, NamedTemporaryFile() as tfile:
            # The Content-Length header may be missing; only draw a
            # proportional progress bar when the total size is known.
            total_length = fileres.headers['content-length']
            total_bytes = int(total_length) if total_length else 0
            print(f'Downloading {directory}, {total_length} bytes compressed')
            dl = 0
            data = fileres.read(CHUNK_SIZE)
            while len(data) > 0:
                dl += len(data)
                tfile.write(data)
                done = min(50, int(50 * dl / total_bytes)) if total_bytes else 0
                sys.stdout.write(f"\r[{'=' * done}{' ' * (50-done)}] {dl} bytes downloaded")
                sys.stdout.flush()
                data = fileres.read(CHUNK_SIZE)
            # Push buffered bytes to disk: the tar branch reopens the file by name.
            tfile.flush()
            if filename.endswith('.zip'):
                with ZipFile(tfile) as zfile:
                    zfile.extractall(destination_path)
            else:
                # Bind the archive to its own name; reusing `tarfile` here would
                # overwrite the imported module for the rest of the notebook.
                with tarfile.open(tfile.name) as tar_archive:
                    tar_archive.extractall(destination_path)
            print(f'\nDownloaded and uncompressed: {directory}')
    except HTTPError as e:
        # The signed URLs carry an expiry (X-Goog-Expires), hence "likely expired".
        print(f'Failed to load (likely expired) {download_url} to path {destination_path} ({e})')
        continue
    except OSError as e:
        print(f'Failed to load {download_url} to path {destination_path} ({e})')
        continue

print('Data source import complete.')


## Importing Libraries

In [None]:
!pip install pyspark

In [None]:
# Install Prophet
!pip install Prophet

In [None]:
!java -version

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import os
import sys
import glob

from matplotlib import pyplot as plt

from prophet.plot import plot_plotly, plot_components_plotly

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import pandas_udf, PandasUDFType

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from prophet import Prophet

## Creating Spark App

In [None]:
spark = SparkSession.builder.appName('Smart-Meter-Analysis').getOrCreate()
spark

In [None]:
sc = spark.sparkContext
sc

In [None]:
# Legacy SQL entry point built on the session's SparkContext.
# NOTE(review): SQLContext is documented as deprecated since Spark 3.0 in favour
# of SparkSession, and `sqlContext` is not referenced again in the visible
# cells - confirm nothing else needs it before removing this cell.
sqlContext = SQLContext(spark.sparkContext)
sqlContext

## Reading the daily dataset and the household information dataset

In [None]:
block_1_df = pd.read_csv('../input/smart-meters-in-london/daily_dataset/daily_dataset/block_1.csv')
block_1_df

In [None]:
acorn_group_df = pd.read_csv('../input/smart-meters-in-london/informations_households.csv')
acorn_group_df.head()

In [None]:
acorn_group_df["Acorn_grouped"].unique()

### Using the date to find the calendar day, month, and year

In [None]:
def daily_to_df(file_path : str, households=None, years=(2012, 2013)) -> pd.DataFrame:
    """Load one daily-consumption CSV and enrich it with calendar and household columns.

    Parameters
    ----------
    file_path : str
        Path to a CSV with at least the columns ``LCLid``, ``day`` and ``energy_sum``.
    households : pandas.DataFrame, optional
        Household table with ``LCLid`` and ``Acorn_grouped`` columns. Defaults to
        the notebook-level ``acorn_group_df``.
    years : iterable of int, optional
        Calendar years to keep. Defaults to 2012 and 2013.

    Returns
    -------
    pandas.DataFrame
        Columns ``LCLid, day, year, month, day_name, dayofweek, Acorn_grouped,
        energy_sum``, restricted to the requested years and to households present
        in ``households`` (inner merge).
    """
    if households is None:
        # Resolved at call time so the default still follows the notebook global.
        households = acorn_group_df

    daily = pd.read_csv(file_path)
    daily["day"] = pd.to_datetime(daily["day"])

    # Vectorised datetime accessors instead of one Python lambda call per row.
    day = daily["day"].dt
    daily["year"] = day.year
    daily["month"] = day.month
    daily["dayofweek"] = day.dayofweek
    daily["day_name"] = day.day_name()

    daily = daily.merge(households, on="LCLid")
    daily = daily[daily["year"].isin(list(years))]
    return daily[["LCLid", "day", "year", "month", "day_name", "dayofweek", "Acorn_grouped", "energy_sum"]]

df = daily_to_df('../input/smart-meters-in-london/daily_dataset/daily_dataset/block_1.csv')
df.head()

In [None]:
df.tail()

### Reading all the blocks in the daily dataset folder and storing them into one dataframe

In [None]:
# Load every block file and concatenate once. DataFrame.append was removed in
# pandas 2.0, and growing a frame inside a loop re-copies it on every iteration.
# Sorting the paths makes the row order (and any later .head()) reproducible.
daily_files = sorted(glob.glob('../input/smart-meters-in-london/daily_dataset/daily_dataset/*.csv'))
if not daily_files:
    raise FileNotFoundError(
        "No daily block CSVs found under ../input/smart-meters-in-london/daily_dataset/daily_dataset/"
    )

all_daily_df = pd.concat(
    [daily_to_df(file_path) for file_path in daily_files],
    ignore_index=True,
)

In [None]:
all_daily_df.head()

### Pre processing the dataset

In [None]:
spark_all_daily_df=spark.createDataFrame(all_daily_df)
spark_all_daily_df.printSchema()
spark_all_daily_df.show()

In [None]:
spark_all_daily_df = spark_all_daily_df.drop_duplicates() # Drop any Duplicates values
spark_all_daily_df = spark_all_daily_df.na.drop() # Drop any null values or NAN

In [None]:
spark_all_daily_df = spark_all_daily_df.withColumn('energy_sum', F.round(spark_all_daily_df['energy_sum'], 3))

In [None]:
spark_all_daily_df.show()

### Exploratory Data Analysis

In [None]:
#Total Number of House Records
spark_all_daily_df.select('LCLid').count()

In [None]:
#Finding only the unique houses
spark_all_daily_df.select('LCLid').distinct().count()

In [None]:
spark_all_daily_df.select('LCLid').distinct().show()

### Statistical analysis for all the columns

In [None]:
spark_all_daily_df.describe(['energy_sum']).show()

In [None]:
spark_all_daily_df.select('year').distinct().show()

### Finding patterns and trends for the year 2012 and 2013

In [None]:
spark_y2012_df = spark_all_daily_df.filter("year == 2012")
spark_y2012_df.show()

In [None]:
spark_y2012_df.count()

In [None]:
spark_y2013_df = spark_all_daily_df.filter("year == 2013")
spark_y2013_df.show()

In [None]:
spark_y2013_df.count()

In [None]:
spark_y2013_df.groupBy('Acorn_grouped').count().show()

In [None]:
spark_y2012_df.groupBy('Acorn_grouped').count().show()

In [None]:
y2012_df = spark_y2012_df.toPandas()
y2013_df = spark_y2013_df.toPandas()

In [None]:
y2012_df = y2012_df[y2012_df["Acorn_grouped"].isin(["Adversity", "Affluent", "Comfortable"])]
y2013_df = y2013_df[y2013_df["Acorn_grouped"].isin(["Adversity", "Affluent", "Comfortable"])]

In [None]:
y2012_df.groupby("Acorn_grouped").count()["LCLid"]

In [None]:

y2013_df.groupby("Acorn_grouped").count()["LCLid"]

In [None]:
#Converting months int to month names
import calendar

# Build the 1..12 -> 'Jan'..'Dec' lookup once. Values that are not month numbers
# (already converted on an earlier run) pass through unchanged, so the cell can be
# re-run. .assign returns a new frame instead of writing into the filtered slice
# created above, which avoids SettingWithCopyWarning.
month_abbr_by_number = {number: calendar.month_abbr[number] for number in range(1, 13)}
y2012_df = y2012_df.assign(
    month=y2012_df["month"].map(lambda value: month_abbr_by_number.get(value, value))
)
y2013_df = y2013_df.assign(
    month=y2013_df["month"].map(lambda value: month_abbr_by_number.get(value, value))
)

## Exploring Energy consumption for different acorn groups

#### Finding Average Energy consumption during 2012

In [None]:
# Total energy and number of daily records per Acorn group for 2012.
# Only the two needed columns are aggregated: summing the whole frame would also
# try to add up the datetime and string columns (an error on pandas 2.x).
sum_y2012_df = y2012_df.groupby("Acorn_grouped").agg(
    energy_sum=("energy_sum", "sum"),
    LCLid=("LCLid", "count"),
)

# Average energy per daily record in each group
sum_y2012_df["average_eng"] = sum_y2012_df["energy_sum"]/sum_y2012_df["LCLid"]

sum_y2012_df

#### Finding Average Energy consumption during 2013

In [None]:
# Total energy and number of daily records per Acorn group for 2013.
# Only the two needed columns are aggregated: summing the whole frame would also
# try to add up the datetime and string columns (an error on pandas 2.x).
sum_y2013_df = y2013_df.groupby("Acorn_grouped").agg(
    energy_sum=("energy_sum", "sum"),
    LCLid=("LCLid", "count"),
)

# Average energy per daily record in each group
sum_y2013_df["average_eng"] = sum_y2013_df["energy_sum"]/sum_y2013_df["LCLid"]

sum_y2013_df

### Energy consumption between different acorn groups

In [None]:
# Side-by-side bars of the per-record average energy of each Acorn group.
# Note: despite its name, `bar_df` holds the matplotlib Axes returned by .plot.bar.
bar_df = pd.DataFrame({'2012': sum_y2012_df["average_eng"],
                   '2013': sum_y2013_df["average_eng"]}).plot.bar(rot=0, title='Energy Consumption between Acorn groups')
bar_df.set_xlabel("Acorn Groups")
# The plotted quantity is average_eng, not the raw sum.
bar_df.set_ylabel("Average Energy")

### Energy Consumption during the week

In [None]:
# Total energy per weekday for both years. Building a DataFrame from two Series
# re-aligns them on their index, which discards any per-Series sort order, so the
# weekday order is imposed explicitly. Selecting energy_sum before summing avoids
# aggregating the unrelated datetime and string columns.
weekday_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
day_name_bar_df = pd.DataFrame({'2012': y2012_df.groupby("day_name")["energy_sum"].sum(),
                   '2013': y2013_df.groupby("day_name")["energy_sum"].sum()}).reindex(weekday_order).plot.bar(rot=0, title='Energy Consumption during the week', figsize=(12,8))
day_name_bar_df.set_xlabel("Day Name")
day_name_bar_df.set_ylabel("Energy Sum")

### Energy consumption during the year

In [None]:
# Total energy per month for both years, shown in calendar order. Index alignment
# in the DataFrame constructor would otherwise leave the month abbreviations in
# alphabetical order. `calendar` is imported in the month-name conversion cell.
month_order = list(calendar.month_abbr)[1:]
day_name_bar_df = pd.DataFrame({'2012': y2012_df.groupby("month")["energy_sum"].sum(),
                   '2013': y2013_df.groupby("month")["energy_sum"].sum()}).reindex(month_order).plot.bar(rot=0, title='Energy Consumption usages during the year', figsize=(15,7))
day_name_bar_df.set_xlabel("Month")
day_name_bar_df.set_ylabel("Energy Sum")

### Energy consumption during 2012

In [None]:
# Monthly totals for 2012, ordered from lowest to highest consumption.
# energy_sum is selected before summing so that no other column is aggregated.
monthly_2012_ax = y2012_df.groupby("month")["energy_sum"].sum().sort_values().plot.bar(
    title="Energy consumption by month in 2012 (sorted by total)"
)
monthly_2012_ax.set_xlabel("Month")
monthly_2012_ax.set_ylabel("Energy Sum");

### Energy consumption during 2013

In [None]:
# Monthly totals for 2013, ordered from lowest to highest consumption.
# energy_sum is selected before summing so that no other column is aggregated.
monthly_2013_ax = y2013_df.groupby("month")["energy_sum"].sum().sort_values().plot.bar(
    title="Energy consumption by month in 2013 (sorted by total)"
)
monthly_2013_ax.set_xlabel("Month")
monthly_2013_ax.set_ylabel("Energy Sum");

## Correlation with weather data and implementing clustering algorithm

In [None]:
all_daily_df_2 = all_daily_df.copy()

In [None]:
all_daily_df.head()

In [None]:
len(all_daily_df)

In [None]:
total_houses = all_daily_df.groupby('day')[['LCLid']].nunique()
total_houses.head()

In [None]:
# Collapse the per-household rows into one row per day: the total energy_sum,
# joined with the number of distinct households reporting that day (the LCLid
# column of total_houses). `day` ends up as a regular column again.
all_daily_df = (
    all_daily_df
    .groupby('day')[['energy_sum']]
    .sum()
    .merge(total_houses, on = ['day'])
    .reset_index()
)

In [None]:
all_daily_df.count()

In [None]:
all_daily_df.day = pd.to_datetime(all_daily_df.day,format='%Y-%m-%d').dt.date

In [None]:
all_daily_df['avg_energy'] = all_daily_df['energy_sum']/all_daily_df['LCLid']
print("Start date of the dataset", min(all_daily_df.day))
print("End date of the dataset", max(all_daily_df.day))

In [None]:
all_daily_df.describe()

In [None]:
weather_daily_df = pd.read_csv('../input/smart-meters-in-london/weather_daily_darksky.csv')
weather_daily_df.head()

In [None]:
weather_daily_df.describe()

In [None]:
# Derive a plain calendar date from the `time` column so it can be merged with the
# energy frame, whose `day` column holds datetime.date values. One parse is enough:
# a second to_datetime call on an already-parsed column ignores its format argument.
weather_daily_df['day'] = pd.to_datetime(weather_daily_df['time']).dt.date

# selecting columns with numeric values
weather_daily_df = weather_daily_df[['temperatureMax', 'windBearing', 'dewPoint', 'cloudCover', 'windSpeed',
       'pressure', 'apparentTemperatureHigh', 'visibility', 'humidity',
       'apparentTemperatureLow', 'apparentTemperatureMax', 'uvIndex',
       'temperatureLow', 'temperatureMin', 'temperatureHigh',
       'apparentTemperatureMin', 'moonPhase','day']]
# Drop days with any missing weather measurement
weather_daily_df = weather_daily_df.dropna()
weather_daily_df = weather_daily_df.dropna()

### Finding associations between weather data and energy consumption data

In [None]:
weather_with_energy = all_daily_df.merge(weather_daily_df, on = 'day')
weather_with_energy.head()

### 1. Temperature

In [None]:
# Daily max/min temperature (left axis) against average energy per household
# (right axis). Every line gets a label: legend() on unlabeled artists produces
# an empty legend and a warning.
fig, ax1 = plt.subplots(figsize = (20,5))
ax1.plot(weather_with_energy.day, weather_with_energy.temperatureMax, color = 'tab:red', label = 'Max temperature')
ax1.plot(weather_with_energy.day, weather_with_energy.temperatureMin, color = 'tab:grey', label = 'Min temperature')
ax1.set_ylabel('Temperature')
ax1.legend()
ax2 = ax1.twinx()
ax2.plot(weather_with_energy.day,weather_with_energy.avg_energy,color = 'tab:blue', label = 'Average Energy/Household')
ax2.set_ylabel('Average Energy/Household',color = 'tab:blue')
ax2.legend(bbox_to_anchor=(0.0, 1.02, 1.0, 0.102))
plt.title('Energy Consumption and Temperature')
fig.tight_layout()
plt.show()

### 2. Humidity

In [None]:
# Dual-axis chart: humidity on the left axis, average energy per household on the right.
fig, ax1 = plt.subplots(figsize=(20, 5))
ax2 = ax1.twinx()

ax1.plot(weather_with_energy['day'], weather_with_energy['humidity'], color='tab:pink')
ax1.set_ylabel('Humidity', color='tab:pink')

ax2.plot(weather_with_energy['day'], weather_with_energy['avg_energy'], color='tab:blue')
ax2.set_ylabel('Average Energy/Household', color='tab:blue')

plt.title('Energy Consumption and Humidity')
fig.tight_layout()
plt.show()

### 3. Cloud Cover

In [None]:
# Dual-axis chart: cloud cover on the left axis, average energy per household on the right.
fig, ax1 = plt.subplots(figsize=(20, 5))
ax2 = ax1.twinx()

ax1.plot(weather_with_energy['day'], weather_with_energy['cloudCover'], color='tab:grey')
ax1.set_ylabel('Cloud Cover', color='tab:grey')

ax2.plot(weather_with_energy['day'], weather_with_energy['avg_energy'], color='tab:blue')
ax2.set_ylabel('Average Energy/Household', color='tab:blue')

plt.title('Energy Consumption and Cloud Cover')
fig.tight_layout()
plt.show()

### 4. Visibility

In [None]:
# Dual-axis chart: visibility on the left axis, average energy per household on the right.
fig, ax1 = plt.subplots(figsize=(20, 5))
ax2 = ax1.twinx()

ax1.plot(weather_with_energy['day'], weather_with_energy['visibility'], color='tab:purple')
ax1.set_ylabel('Visibility', color='tab:purple')

ax2.plot(weather_with_energy['day'], weather_with_energy['avg_energy'], color='tab:blue')
ax2.set_ylabel('Average Energy/Household', color='tab:blue')

plt.title('Energy Consumption and Visibility')
fig.tight_layout()
plt.show()

### 5. Wind Speed

In [None]:
# Dual-axis chart: wind speed on the left axis, average energy per household on the right.
fig, ax1 = plt.subplots(figsize=(20, 5))
ax2 = ax1.twinx()

ax1.plot(weather_with_energy['day'], weather_with_energy['windSpeed'], color='tab:brown')
ax1.set_ylabel('Wind Speed', color='tab:brown')

ax2.plot(weather_with_energy['day'], weather_with_energy['avg_energy'], color='tab:blue')
ax2.set_ylabel('Average Energy/Household', color='tab:blue')

plt.title('Energy Consumption and Wind Speed')
fig.tight_layout()
plt.show()

### 6. UV index

In [None]:
# Dual-axis chart: UV index on the left axis, average energy per household on the right.
fig, ax1 = plt.subplots(figsize=(20, 5))
ax2 = ax1.twinx()

ax1.plot(weather_with_energy['day'], weather_with_energy['uvIndex'], color='tab:green')
ax1.set_ylabel('UV Index', color='tab:green')

ax2.plot(weather_with_energy['day'], weather_with_energy['avg_energy'], color='tab:blue')
ax2.set_ylabel('Average Energy/Household', color='tab:blue')

plt.title('Energy Consumption and UV Index')
fig.tight_layout()
plt.show()

### 7. Dew Point

In [None]:
# Dual-axis chart: dew point on the left axis, average energy per household on the right.
fig, ax1 = plt.subplots(figsize=(20, 5))
ax2 = ax1.twinx()

ax1.plot(weather_with_energy['day'], weather_with_energy['dewPoint'], color='tab:orange')
ax1.set_ylabel('Dew Point', color='tab:orange')

ax2.plot(weather_with_energy['day'], weather_with_energy['avg_energy'], color='tab:blue')
ax2.set_ylabel('Average Energy/Household', color='tab:blue')

plt.title('Energy Consumption and Dew Point')
fig.tight_layout()
plt.show()

In [None]:
corr_weather_with_energy = weather_with_energy[['avg_energy','temperatureMax','dewPoint', 'cloudCover', 'windSpeed','pressure', 'visibility', 'humidity','uvIndex', 'moonPhase']].corr()
corr_weather_with_energy

### K-Means Clustering on the highly correlated columns

In [None]:
from sklearn.cluster import KMeans
from sklearn.preprocessing import MinMaxScaler

# Scale the three weather features to [0, 1] so none dominates the distance metric.
scaler = MinMaxScaler(feature_range=(0, 1))
X = scaler.fit_transform(weather_daily_df[['temperatureMax','humidity','windSpeed']])

# Fit one model per candidate cluster count. random_state pins the centroid
# initialisation so the curve is reproducible across runs; n_init is set
# explicitly because its default differs between scikit-learn versions.
Nc = range(1, 20)
kmeans = [KMeans(n_clusters=i, n_init=10, random_state=42) for i in Nc]
# KMeans.score returns the negative inertia, so the curve rises towards 0.
score = [km.fit(X).score(X) for km in kmeans]

plt.plot(Nc,score)
plt.xlabel('Number of Clusters')
plt.ylabel('Score')
plt.title('Elbow Curve')
plt.show()

In [None]:
# Final model with 4 clusters; random_state/n_init make the centroids reproducible.
kmeans = KMeans(n_clusters=4, n_init=10, random_state=42).fit(X)
# Centroids are expressed in the MinMax-scaled [0, 1] feature space of X.
centroids = kmeans.cluster_centers_
print(centroids)

## Forecasting with Prophet using Apache Spark (PySpark)

In [None]:
# Convert the date column to datetime
all_daily_df_2['day'] = pd.to_datetime(all_daily_df_2['day'])

In [None]:
len(all_daily_df_2)

In [None]:
# all_daily_df_2 holds one row per household per day. Taking .head(n) of it would
# pick the first few households' rows, with repeated dates, so the row-based
# train/test split below would not be a split in time and the 365-day forecast
# would not cover the test dates. Build one chronological daily series instead:
# the mean daily energy per household.
df3 = (
    all_daily_df_2.groupby("day", as_index=False)["energy_sum"]
    .mean()
    .sort_values("day")
    .reset_index(drop=True)
)

# Rename columns - Following Prophet requirements
df3.columns = ["ds", "y"]

In [None]:
train = df3.iloc[:-365]
test = df3.iloc[-365:]

In [None]:
test.count()

In [None]:
train.count()

In [None]:
m = Prophet()
m.fit(train)
future = m.make_future_dataframe(periods=365) #MS for monthly, H for hourly
forecast = m.predict(future)

In [None]:
forecast.tail()

In [None]:
forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail()

In [None]:
test.tail()

In [None]:
plot_plotly(m ,forecast)

In [None]:
plot_components_plotly(m, forecast)

### Evaluating prophet model

In [None]:
from statsmodels.tools.eval_measures import rmse
predictions = forecast.iloc[-365:]['yhat']
print("Root Mean Squared Error between actual and  predicted values: ",rmse(predictions,test['y']))
print("Mean Value of Test Dataset:", test['y'].mean())

# PySpark Prophet analysis on the whole dataset

In [None]:

# Select necessary columns.
# all_daily_df_2 has one row per household per day, so .head(n) would feed Prophet
# the first few households' rows with repeated dates. Aggregate to one value per
# day (mean energy per household), which also matches the plot's y-axis label.
df2 = (
    all_daily_df_2.groupby("day", as_index=False)["energy_sum"]
    .mean()
    .sort_values("day")
    .reset_index(drop=True)
)

# Rename columns - Following Prophet requirements
df2.columns = ["ds", "y"]


# instantiate the model and set parameters
model = Prophet(
    interval_width=0.95,
    growth='linear',
    daily_seasonality=False,
    weekly_seasonality=True,
    yearly_seasonality=True,
    seasonality_mode='multiplicative'
)

# fit the model to historical data
model.fit(df2)


# history plus 90 further daily timestamps after the last observed day
future_pd = model.make_future_dataframe(
    periods=90,
    freq='d',
    include_history=True
)

# predict over the dataset
forecast_pd = model.predict(future_pd)

predict_fig = model.plot(forecast_pd, xlabel='date', ylabel='Average energy usage')

In [None]:
# Keep the three columns needed per household and give them the names Prophet
# expects (ds = timestamp, y = value) in a single step.
df = all_daily_df_2[['day', 'LCLid', 'energy_sum']].rename(
    columns={'day': 'ds', 'energy_sum': 'y'}
)

In [None]:
df

In [None]:
# Convert the pandas frame into a Spark DataFrame. No CSV is read here; the name
# `df` is rebound from a pandas object to a Spark object.
# NOTE(review): because `df` changes type, re-running this cell on its own is not
# expected to work - re-run the previous cell first.
df = spark.createDataFrame(df)

# Display the schema
df.printSchema()

In [None]:
# Partition the data
df.createOrReplaceTempView("Homes")
sql = "select * from Homes"

# Repartition on LCLid into as many partitions as the context's default
# parallelism, and cache the result for reuse.
partition_count = spark.sparkContext.defaultParallelism
homes_df = spark.sql(sql)
df_part = homes_df.repartition(partition_count, ['LCLid']).cache()
df_part.explain()

In [None]:
# Output schema of the per-household forecast: household id, timestamp, observed
# value, and the prediction with its upper and lower bounds.
schema = (
    StructType()
    .add('LCLid', StringType())
    .add('ds', TimestampType())
    .add('y', DoubleType())
    .add('yhat', DoubleType())
    .add('yhat_upper', DoubleType())
    .add('yhat_lower', DoubleType())
)

In [None]:
# define the Pandas UDF
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(home_pd):
    """Fit a Prophet model to one household's history and forecast 90 days ahead.

    Parameters
    ----------
    home_pd : pandas.DataFrame
        All rows of a single LCLid group, with columns ``ds``, ``LCLid`` and ``y``.

    Returns
    -------
    pandas.DataFrame
        Columns ``LCLid, ds, y, yhat, yhat_upper, yhat_lower`` (matching ``schema``):
        one row per historical or forecast timestamp. ``y`` is missing for the 90
        forecast rows. Empty when the household has too few observations to fit.
    """
    output_columns = ['LCLid', 'ds', 'y', 'yhat', 'yhat_upper', 'yhat_lower']

    # Prophet raises when given fewer than two non-null observations, which would
    # abort the whole Spark job; such households contribute no rows instead.
    if home_pd['y'].notna().sum() < 2:
        return pd.DataFrame({
            'LCLid': pd.Series(dtype='object'),
            'ds': pd.Series(dtype='datetime64[ns]'),
            'y': pd.Series(dtype='float64'),
            'yhat': pd.Series(dtype='float64'),
            'yhat_upper': pd.Series(dtype='float64'),
            'yhat_lower': pd.Series(dtype='float64'),
        })[output_columns]

    # instantiate the model and set parameters
    model = Prophet(
      interval_width=0.95,
      growth='linear',
      daily_seasonality=False,
      weekly_seasonality=True,
      yearly_seasonality=True,
      seasonality_mode='multiplicative'
    )

    # fit the model to historical data
    model.fit(home_pd)

    # Timestamps to predict: the household's history plus 90 further days
    future = model.make_future_dataframe(
      periods=90,
      freq='d',
      include_history=True)

    # In-sample and out-of-sample prediction
    forecast = model.predict(future)

    # Attach the observed value to each predicted timestamp (left join keeps the
    # forecast-only rows, which have no observation)
    observed = home_pd[['ds', 'y']].set_index('ds')
    result_pd = forecast[['ds', 'yhat', 'yhat_upper', 'yhat_lower']].join(
        observed, on='ds', how='left')

    # The group holds a single household, so its id applies to every row,
    # including the forecast-only ones
    result_pd['LCLid'] = home_pd['LCLid'].iloc[0]

    return result_pd[output_columns]




In [None]:
# Apply the function to all home addresses
results = df_part.groupby(['LCLid']).apply(apply_model)

# Print the results - calculate the time to run.
# The timer brackets show(), which is where Spark actually runs the forecasts
# for the displayed rows.
import timeit
start = timeit.default_timer()
results.show()
stop = timeit.default_timer()
# Report the measurement; previously it was taken and then discarded
print(f"results.show() took {stop - start:.1f} seconds")

In [None]:
df.select('LCLid').distinct().show()

In [None]:
# The earlier bare `results.coalesce(1)` had no effect: coalesce returns a new
# DataFrame and the result was never kept. It is intentionally not assigned here,
# because coalescing to one partition can force the upstream model fitting onto a
# single task, and a temp view does not need it.
results.createOrReplaceTempView('forecasted')
# NOTE(review): the LIKE pattern matches every id containing 'MAC00001', not only
# one household - use an equality filter if a single home is intended.
spark.sql("SELECT * FROM forecasted WHERE LCLid LIKE '%MAC00001%'").show()