# Notebook: Electricity Consumption in Barcelona

> *Andrea Ramirez*
>
> Master in Data Science
>
> Universitat de Girona, 2023

# 1. Notebook Preparation

*Import the basic libraries to run this notebook.*

In [37]:
import json
import os
from datetime import datetime, time

import numpy as np
import pandas as pd
import requests
import seaborn as sns
import matplotlib.pyplot as plt 
from matplotlib import rcParams
from babel.dates import format_date
import datetime
from datetime import timedelta

*Import the libraries needed to perform ML techniques.*

In [2]:
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score

# 2. Data Extraction

*Import the libraries needed to run Spark in this section.*

In [3]:
from itertools import chain
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import regexp_replace, to_date
from pyspark.sql.types import *
from pyspark.sql.window import Window

*Creation of the Spark session*

In [4]:
# Spark settings are published as environment variables so that the session
# builder can read them back from one place.
os.environ.update(
    {
        "SPARK_MASTER": "local[*]",
        "SPARK_JAR_PACKAGES": "org.postgresql:postgresql:42.6.0",
        "SPARK_WAREHOUSE": "/tmp/",
    }
)

In [5]:
# Build (or reuse) the Spark session that every later cell relies on.
# The options are applied in the order listed here.
_spark_options = {
    "spark.sql.session.timeZone": "UTC",
    "spark.ui.enabled": False,
    "spark.sql.legacy.timeParserPolicy": "LEGACY",
    "spark.driver.memory": "2g",
    "spark.ui.showConsoleProgress": False,
    "spark.jars.packages": os.environ["SPARK_JAR_PACKAGES"],
}

_session_builder = SparkSession.builder.master(os.environ["SPARK_MASTER"]).appName(
    "Spark-Electricity-BCN"
)
for _option_name, _option_value in _spark_options.items():
    _session_builder = _session_builder.config(_option_name, _option_value)

spark = _session_builder.getOrCreate()

*Getting the `url` per year*.

In [6]:
# Every yearly file lives under the same Open Data BCN dataset; only the
# resource id and the year prefix of the file name change.
_DATASET_RESOURCE_URL = (
    "https://opendata-ajuntament.barcelona.cat/data/dataset/"
    "d9479057-781f-42b4-85e6-721bd0284130/resource"
)
_RESOURCE_ID_BY_YEAR = {
    2019: "41b9a3e0-f079-4a4a-a8c3-670adc6bfb95",
    2020: "3c45a329-0414-4055-8f17-82456fc78dea",
    2021: "2b6ef2c2-cbd0-437c-a229-889e992e1dd6",
    2022: "e685fa1a-9fbe-41a4-98a0-173464cd0c5f",
    2023: "25caee3a-a9f2-4e1a-8c8b-1b5fcac4f152",
}

# Maps each year (int) to its string label and the download URL of its CSV.
data = {
    observed_year: {
        "year": str(observed_year),
        "url": (
            f"{_DATASET_RESOURCE_URL}/{resource_id}/download/"
            f"{observed_year}_consum_electricitat_bcn.csv"
        ),
    }
    for observed_year, resource_id in _RESOURCE_ID_BY_YEAR.items()
}

In [7]:
def get_data_per_year(diccionary, element):
    """Look up the year label and download URL stored under ``element``.

    Args:
        diccionary: Mapping whose values are dicts with "year" and "url" keys.
        element: Key to look up in ``diccionary``.

    Returns:
        A ``(year, url)`` tuple when ``element`` is a key of ``diccionary``,
        otherwise ``None``.
    """
    if element not in diccionary:
        return None

    entry = diccionary[element]
    return entry["year"], entry["url"]

*Requesting the data from the Open Data website*

In [8]:
def extract_data(year, url):
    """Download one yearly CSV and load it as a Spark DataFrame.

    The response body is written to ``"<year>.csv"`` in the current working
    directory and then read back through the module-level ``spark`` session
    with a header row and schema inference.

    Args:
        year: Label used to build the local file name.
        url: Address of the CSV file to download.

    Returns:
        The Spark DataFrame read from the downloaded file.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server stops responding.
    """
    file_name = f"{year}.csv"
    # The timeout bounds the connection and each read, not the total download
    # time, so large files are still fine while a dead server cannot hang the
    # notebook forever.
    response = requests.get(url, timeout=60)
    # Without this check an HTML error page would be saved and parsed as CSV.
    response.raise_for_status()
    with open(file_name, "wb") as f:
        f.write(response.content)
    df = spark.read.csv(file_name, header=True, inferSchema=True)
    return df

In [9]:
# Download every yearly file and stack them into a single DataFrame.
# `union` matches columns by position, so all files are expected to share
# the same column layout.
combined_df = None

for data_per_year in data.values():
    year, url = data_per_year["year"], data_per_year["url"]
    initial_df = extract_data(year, url)
    combined_df = initial_df if combined_df is None else combined_df.union(initial_df)

*Translating the columns' names*

In [10]:
# Original (Catalan) column name -> English name used in the rest of the notebook.
_column_translations = {
    "Any": "year_observed",
    "Data": "date_observed",
    "Codi_Postal": "postal_code",
    "Sector_Economic": "sector_category",
    "Tram_Horari": "time_range",
    "Valor": "consumption_value",
}

df = combined_df
for _original_name, _english_name in _column_translations.items():
    df = df.withColumnRenamed(_original_name, _english_name)

# 3. Data Exploration and Pre-Processing

First of all, let's filter the data to keep only the records we want to explore: the services sector (`Serveis`) during the 00:00–05:59 and 18:00–23:59 time ranges.

In [11]:
# Keep the services sector during the two selected time ranges; the two
# filter columns are dropped once they have been used.
_is_services_sector = col("sector_category") == "Serveis"
_is_selected_time_range = col("time_range").isin(
    "De 00:00:00 a 05:59:59 h", "De 18:00:00 a 23:59:59 h"
)

df = df.filter(_is_services_sector & _is_selected_time_range).drop(
    "sector_category", "time_range"
)

Let's check the initial attributes.

In [12]:
def dataframe_description(dataframe):
    """Summarise the columns of a Spark DataFrame.

    Args:
        dataframe: The Spark DataFrame to describe.

    Returns:
        A Spark DataFrame with one row per column of ``dataframe`` and the
        columns ``column`` (name), ``data_types`` (Spark type name) and
        ``unique_values`` (number of distinct values, nulls counted as one).
    """
    column_names = dataframe.columns
    # `DataFrame.dtypes` yields (name, type) pairs; keep only the type so the
    # `data_types` column holds the type name rather than a redundant struct.
    type_names = [type_name for _, type_name in dataframe.dtypes]
    # One Spark job per column. The loop variable is deliberately not called
    # `col`, which would shadow the imported pyspark function.
    unique_values = [
        dataframe.select(column_name).distinct().count()
        for column_name in column_names
    ]
    info_columns = spark.createDataFrame(
        list(zip(column_names, type_names, unique_values)),
        ["column", "data_types", "unique_values"],
    )
    return info_columns

In [13]:
# info_initial_columns = dataframe_description(df)
# info_initial_columns.show(truncate=False)

> From this description, the variables that need fixing are:
>
> The `date_observed` is a `string` type instead of `date`.
>
> The `year_observed` is a `numeric` type instead of `category`.
>
> The `postal_code` is a `numeric` type instead of `category`.

## 3.1. Data Cleaning

*Transformation from `string` to `date` type.*

In [14]:
# Parse the textual date into a proper Spark date column (same column name).
df = df.withColumn("date_observed", to_date(col("date_observed")))

In [15]:
# info_initial_columns = dataframe_description(df)
# info_initial_columns.show(truncate=False)

Helper function that maps the values of a column through a dictionary.

In [16]:
def mapping_strings(dataframe, mapping_dic, mapped_column, original_column):
    """Translate the values of a column through a Python dictionary.

    Args:
        dataframe: Spark DataFrame to transform.
        mapping_dic: Dictionary of ``{original value: replacement}``.
        mapped_column: Name of the column that receives the result.
        original_column: Name of the column whose values are looked up.

    Returns:
        ``dataframe`` with ``mapped_column`` set to the replacement for every
        value found among the dictionary keys, and to the original value
        otherwise.
    """
    source = col(original_column)
    # create_map expects a flat key, value, key, value, ... sequence of literals.
    flat_literals = [lit(item) for pair in mapping_dic.items() for item in pair]
    lookup = create_map(flat_literals)
    has_replacement = source.isin(list(mapping_dic.keys()))
    translated = when(has_replacement, lookup[source]).otherwise(source)
    return dataframe.withColumn(mapped_column, translated)

In [17]:
# Numeric year -> string label, so the year can be handled as a category.
map_years = {observed_year: str(observed_year) for observed_year in range(2019, 2024)}
df = mapping_strings(
    dataframe=df,
    mapping_dic=map_years,
    mapped_column="year_observed_cat",
    original_column="year_observed",
)

In [18]:
# Bring the distinct postal codes back to the driver as a list of Rows.
unique_postal_code = df.select(col("postal_code")).distinct().collect()

# To inspect them:
# for row in unique_postal_code:
#    print(row[0])

In [19]:
# Numeric postal code -> string label for every code from 8001 to 8042
# (inclusive), so the postal code can be handled as a category.
map_postal_code = {code: str(code) for code in range(8001, 8043)}
df = mapping_strings(
    dataframe=df,
    mapping_dic=map_postal_code,
    mapped_column="postal_code",
    original_column="postal_code",
)

In [20]:
# info_initial_columns = dataframe_description(df)
# info_initial_columns.show(truncate=False)

## 3.2. Data Normalization

# Initialize the Min-Max Scaler
# NOTE(review): `MinMaxScaler` is not imported in any visible cell (only
# sklearn's StandardScaler is) — confirm where it is meant to come from.
# NOTE(review): if this is pyspark.ml.feature.MinMaxScaler, it presumably needs
# a vector input column rather than the raw numeric column — TODO confirm.
scaler = MinMaxScaler(inputCol="consumption_value", outputCol="scaled_consumption_value")

# Fit the scaler to the DataFrame and normalize the "features" column
scaler_model = scaler.fit(df)
normalized_df = scaler_model.transform(df)

# Show the result
normalized_df.show()

## 3.3. Data Integration

From the data, let's create new attributes from `date_observed`: the `month` and the `day of the week` on which each record was observed.

In [21]:
def capitalizacion(dataframe, column):
    """Return ``dataframe`` with ``column`` passed through Spark's ``initcap``.

    Args:
        dataframe: Spark DataFrame to transform.
        column: Name of the string column to rewrite in place.
    """
    return dataframe.withColumn(column, initcap(col(column)))


def get_day_of_week(date):
    """Return the full English weekday name of ``date``.

    Args:
        date: A date/datetime object, or ``None`` for a missing value.

    Returns:
        The weekday name formatted by Babel with the "EEEE" pattern, or
        ``None`` when ``date`` is ``None``.
    """
    # Babel substitutes *today's* date when it receives None, which would
    # silently label missing dates with the current weekday.
    if date is None:
        return None
    return format_date(date, format="EEEE", locale="EN")


def get_month(date):
    """Return the full English month name of ``date``.

    Args:
        date: A date/datetime object, or ``None`` for a missing value.

    Returns:
        The month name formatted by Babel with the "MMMM" pattern, or
        ``None`` when ``date`` is ``None``.
    """
    # Babel substitutes *today's* date when it receives None, which would
    # silently label missing dates with the current month.
    if date is None:
        return None
    return format_date(date, format="MMMM", locale="EN")


def add_date_columns(dataframe, date_column):
    """Derive ``day_of_week`` and ``month`` columns from a date column.

    Both new columns are computed with Python UDFs wrapping
    ``get_day_of_week`` / ``get_month`` and then passed through
    ``capitalizacion``.

    Args:
        dataframe: Spark DataFrame holding the date column.
        date_column: Name of the column the new attributes are derived from.

    Returns:
        ``dataframe`` with the ``day_of_week`` and ``month`` columns added.
    """
    weekday_name = udf(get_day_of_week, StringType())
    month_name = udf(get_month, StringType())
    source_date = dataframe[date_column]

    enriched = dataframe.withColumn("day_of_week", weekday_name(source_date))
    enriched = enriched.withColumn("month", month_name(source_date))
    for derived_column in ("day_of_week", "month"):
        enriched = capitalizacion(enriched, derived_column)
    return enriched

In [22]:
# Add the `day_of_week` and `month` attributes derived from the observation date.
df = add_date_columns(dataframe=df, date_column="date_observed")
# df.show(1)

### 3.3.1. Data Fusion

In [23]:
# Activities file, read from the notebook's working directory with a header
# row and inferred column types.
df_activities = spark.read.options(header=True, inferSchema=True).csv(
    "activities_bcn_2019_2022.csv"
)

In [24]:
# Left join: every consumption record is kept, with the activity columns
# attached when an activity exists for the same `date_observed`.
df_with_activities = df.join(df_activities, on="date_observed", how="left")

In [25]:
# Replace the nulls of the string columns with the "Unknown" label.
df_with_activities = df_with_activities.na.fill("Unknown")

In [26]:
# Strip one leading "0" from `location`, leaving every other value untouched.
_location_without_leading_zero = expr(
    "CASE WHEN location LIKE '0%' THEN substring(location, 2) ELSE location END"
)
df_with_activities = df_with_activities.withColumn(
    "location", _location_without_leading_zero
)

In [27]:
# Classify each record by how its postal code relates to the activity
# location; branches are evaluated top to bottom and the first match wins.
_activity_location = col("location")
_event_location = (
    when(col("postal_code") == _activity_location, "Delimited zone")
    .when(_activity_location == "Several places", "Several places")
    .when(_activity_location.isin("Unknown", "None"), "Unknown")
    .otherwise("No event")
)

# The two source columns are no longer needed once the label exists.
df_with_activities = df_with_activities.withColumn(
    "event_location", _event_location
).drop("postal_code", "location")

In [28]:
#df_with_activities.show(1)

In [29]:
# info_initial_columns = dataframe_description(df_with_activities)
# info_initial_columns.show(truncate=False)

## 3.4. Data Discretization

*Definition of the categorical columns to encode*.

In [30]:
# Columns that the next cell expands into one 0/1 indicator column per
# distinct value.
categorical_cols = [
    "day_of_week",
    "month",
    "year_observed_cat",
    "event_location",
    "organizers_type",
]

*Iteration over categorical columns and creation of new one-hot-encoded columns*.

In [31]:
# For every categorical column, add one integer indicator column per distinct
# value, named "<column>_<value>" (1 when the row holds that value).
for col_name in categorical_cols:
    distinct_rows = df_with_activities.select(col_name).distinct().collect()
    categories = [row[0] for row in distinct_rows]

    for category in categories:
        new_col_name = col_name + "_" + category
        is_category = col(col_name) == lit(category)
        df_with_activities = df_with_activities.withColumn(
            new_col_name, is_category.cast("int")
        )

In [32]:
# Columns removed after encoding: the categorical originals that now have
# indicator columns, plus `date_observed`, `year_observed` and `venue_type`.
# `year_observed_cat` is not listed here; the later split cells filter on it.
drop_columns = [
    "date_observed",
    "year_observed",
    "venue_type",
    "day_of_week",
    "month",
    "organizers_type",
    "event_location",
]
df_with_activities = df_with_activities.drop(*drop_columns)

In [33]:
# Preview a single row to check the encoded column layout.
df_with_activities.show(n=1)

+-----------------+-----------------+---------------------+-------------------+------------------+--------------------+--------------------+------------------+------------------+----------+--------------+-------------+-----------+---------+------------+-----------+----------+--------------+-------------+--------------+---------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+-----------------------+-----------------------------+-----------------------------+----------------------+-----------------------+----------------------+-----------------------+
|consumption_value|year_observed_cat|day_of_week_Wednesday|day_of_week_Tuesday|day_of_week_Friday|day_of_week_Thursday|day_of_week_Saturday|day_of_week_Monday|day_of_week_Sunday|month_July|month_February|month_January|month_March|month_May|month_August|month_April|month_June|month_November|month_October|month_December|month_September|year_observ

In [33]:
# Training set: records observed in 2019-2021, without the year label column.
years_train = ["2019", "2020", "2021"]
train_data = df_with_activities.filter(
    col("year_observed_cat").isin(years_train)
).drop("year_observed_cat")

In [34]:
# Validation set: records observed in 2022, without the year label column.
years_validate = ["2022"]
validate_data = df_with_activities.filter(
    col("year_observed_cat").isin(years_validate)
).drop("year_observed_cat")

In [35]:
# Test set: records observed in 2023, without the year label column.
years_test = ["2023"]
test_data = df_with_activities.filter(
    col("year_observed_cat").isin(years_test)
).drop("year_observed_cat")

In [None]:
# df_with_activities.coalesce(1).write.csv('energy_2019_2022.csv', header=True)