# Trabalho de Grupo Processamento de Big Data

## Grupo 7

### Turma CDB2
#### Afonso Santos, 111431
#### Afonso Lourenço, 111487
#### Catarina Lameira, 111390
#### Leonor Laborinho, 111287

Base de dados utilizada: Smart meters in London

https://www.kaggle.com/datasets/jeanmidev/smart-meters-in-london

Docente: Adriano Lopes

Maio de 2024

## 1 Notebook

#### Importação dos dados

Na primeira fase do projeto, foram criados arquivos Parquet para armazenar os dados transformados, optando-se pelo formato Parquet devido à sua eficiência em termos de armazenamento e processamento de grandes volumes de dados. 

Importaram-se os conjuntos de dados necessários, incluindo registos de consumo energético e dados meteorológicos, e filtraram-se os dados para incluir apenas o ano de 2013, focando a análise num período específico e facilitando a identificação de padrões relevantes.

Neste notebook também será feita a análise de dados de consumo energético residencial e condições meteorológicas em Londres. Após a preparação inicial dos dados, o objetivo é aprofundar a análise e categorizar os consumidores com base nos seus padrões de consumo de eletricidade ao longo de 2013. O principal objetivo é agrupar os consumidores com base na similaridade dos seus comportamentos típicos de consumo de eletricidade, facilitando a análise do conjunto de dados. Para isso, será utilizado o algoritmo de agrupamento K-means para identificar padrões nos consumos residenciais. Esta análise também levará em conta as condições meteorológicas, para fornecer uma visão mais completa sobre os fatores que influenciam o consumo energético.

Iniciou-se com a importação das bibliotecas necessárias e configuração do ambiente de análise com o Apache Spark.


#### Importação das bibliotecas necessárias e configuração da sessão Spark.

In [1]:
#import findspark
#findspark.init()
#findspark.find()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from datetime import datetime

# build our SparkSession

spark = SparkSession\
        .builder\
        .appName("TrabalhoFinal")\
        .config("spark.sql.shuffle.partitions",6)\
        .config("spark.sql.repl.eagereval.enabled",True)\
        .getOrCreate()

Funções para filtrar e criar os Parquet
1. `transform`: para converter colunas de data e filtrar dados para o ano de 2013.
2. `parquet`: para guardar os dados no formato Parquet.

In [2]:
def transform(df, date_col='day', date_format='yyyy-MM-dd', filter_year=2013):
    # Convert the date column to timestamp
    df = df.withColumn(date_col, F.to_timestamp(df[date_col],date_format))
        # Filter rows for the specified year
    df = df.filter(F.year(df[date_col]) == filter_year)
    return df

def parquet(df,parquetname):
    df.write.mode("overwrite").parquet(parquetname)

Importar os diferentes conjuntos de dados necessários para o projeto. Estes incluem:
- Dados de consumo energético medido a cada meia hora (`halfhourly_dataset`);
- Informações sobre os agregados familiares (`informations_households`);
- Detalhes sobre a classificação Acorn (`acorn_details`);
- Dados meteorológicos horários (`weather_hourly`).

#### Halfhourly_dataset:

In [3]:
! head -n 3 ../Trabalho_Datasets/halfhourly_dataset/halfhourly_dataset/block_0.csv

LCLid,tstp,energy(kWh/hh)
MAC000002,2012-10-12 00:30:00.0000000, 0 
MAC000002,2012-10-12 01:00:00.0000000, 0 


In [4]:
filename = '../Trabalho_Datasets/halfhourly_dataset/halfhourly_dataset'

halfhourly_dataset_blocks = spark.read.csv(filename, header= True, sep=",")

In [5]:
dfname='halfhourly_dataset_blocks'
parquetname=dfname+"_parquet"
parquet(halfhourly_dataset_blocks
        ,parquetname)


In [6]:
dfname='halfhourly_dataset_blocks'
parquetname=dfname+"_parquet"
halfhourly_dataset_blocks = spark.read.parquet(parquetname)

halfhourly_dataset_blocks.count()



167817021

In [7]:
halfhourly_dataset_blocks.show(10,truncate=False)

+---------+---------------------------+--------------+
|LCLid    |tstp                       |energy(kWh/hh)|
+---------+---------------------------+--------------+
|MAC000071|2011-12-10 09:30:00.0000000| 0.356        |
|MAC000071|2011-12-10 10:00:00.0000000| 0.245        |
|MAC000071|2011-12-10 10:30:00.0000000| 0.168        |
|MAC000071|2011-12-10 11:00:00.0000000| 0.216        |
|MAC000071|2011-12-10 11:30:00.0000000| 0.16         |
|MAC000071|2011-12-10 12:00:00.0000000| 0.271        |
|MAC000071|2011-12-10 12:30:00.0000000| 0.159        |
|MAC000071|2011-12-10 13:00:00.0000000| 0.2          |
|MAC000071|2011-12-10 13:30:00.0000000| 0.276        |
|MAC000071|2011-12-10 14:00:00.0000000| 0.375        |
+---------+---------------------------+--------------+
only showing top 10 rows



Transformar e filtrar os dados para incluir apenas o ano de 2013:

In [8]:
dfname='halfhourly_dataset_blocks'
parquetname=dfname+"_parquet"
halfhourly_dataset_blocks = spark.read.parquet(parquetname)

parquetname=dfname+"_2013_parquet"
df=transform(halfhourly_dataset_blocks, date_col='tstp', date_format='yyyy-MM-dd HH:mm:ss.SSSSSSS', filter_year=2013)
parquet(df ,parquetname)

del df
del halfhourly_dataset_blocks

#### Informations_households:

In [9]:
! head -n 3 ../Trabalho_Datasets/informations_households.csv

LCLid,stdorToU,Acorn,Acorn_grouped,file
MAC005492,ToU,ACORN-,ACORN-,block_0
MAC001074,ToU,ACORN-,ACORN-,block_0


In [10]:
filename = '../Trabalho_Datasets/informations_households.csv'

informations_households = spark.read.csv(filename, header= True, sep=",")

dfname='informations_households'
parquetname=dfname+"_parquet"
parquet(informations_households ,parquetname)

del informations_households

#### Acorn_details:

In [11]:
! head -n 3 ../Trabalho_Datasets/acorn_details.csv

MAIN CATEGORIES,CATEGORIES,REFERENCE,ACORN-A,ACORN-B,ACORN-C,ACORN-D,ACORN-E,ACORN-F,ACORN-G,ACORN-H,ACORN-I,ACORN-J,ACORN-K,ACORN-L,ACORN-M,ACORN-N,ACORN-O,ACORN-P,ACORN-Q
POPULATION,Age,Age 0-4,77.0,83.0,72.0,100.0,120.0,77.0,97.0,97.0,63.0,119.0,67.0,114.0,113.0,89.0,123.0,138.0,133.0
POPULATION,Age,Age 5-17,117.0,109.0,87.0,69.0,94.0,95.0,102.0,106.0,67.0,95.0,64.0,108.0,116.0,86.0,89.0,136.0,106.0


In [12]:
filename = '../Trabalho_Datasets/acorn_details.csv'

acorn_details = spark.read.csv(filename, header= True, sep=",")

dfname='acorn_details'
parquetname=dfname+"_parquet"
parquet(acorn_details ,parquetname)

del acorn_details

#### Weather_hourly:


In [13]:
! head -n 3 ../Trabalho_Datasets/weather_hourly_darksky.csv/weather_hourly_darksky.csv

visibility,windBearing,temperature,time,dewPoint,pressure,apparentTemperature,windSpeed,precipType,icon,humidity,summary
5.97,104,10.24,2011-11-11 00:00:00,8.86,1016.76,10.24,2.77,rain,partly-cloudy-night,0.91,Partly Cloudy
4.88,99,9.76,2011-11-11 01:00:00,8.83,1016.63,8.24,2.95,rain,partly-cloudy-night,0.94,Partly Cloudy


In [14]:
filename = '../Trabalho_Datasets/weather_hourly_darksky.csv/weather_hourly_darksky.csv'

weather_hourly = spark.read.csv(filename, header= True, sep=",")

In [15]:
dfname='weather_hourly'
parquetname=dfname+"_parquet"
parquet(weather_hourly
        ,parquetname)

In [16]:
dfname='weather_hourly'
parquetname=dfname+"_parquet"
weather_hourly = spark.read.parquet(parquetname)

weather_hourly.count()

21165

In [17]:
weather_hourly.show(10,truncate=False)

+----------+-----------+-----------+-------------------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+
|visibility|windBearing|temperature|time               |dewPoint|pressure|apparentTemperature|windSpeed|precipType|icon               |humidity|summary      |
+----------+-----------+-----------+-------------------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+
|5.97      |104        |10.24      |2011-11-11 00:00:00|8.86    |1016.76 |10.24              |2.77     |rain      |partly-cloudy-night|0.91    |Partly Cloudy|
|4.88      |99         |9.76       |2011-11-11 01:00:00|8.83    |1016.63 |8.24               |2.95     |rain      |partly-cloudy-night|0.94    |Partly Cloudy|
|3.7       |98         |9.46       |2011-11-11 02:00:00|8.79    |1016.36 |7.76               |3.17     |rain      |partly-cloudy-night|0.96    |Partly Cloudy|
|3.12      |99         |9.23       |2011-11-11

In [18]:
parquetname=dfname+"_2013_parquet"
df=transform(weather_hourly, date_col='time', date_format='yyyy-MM-dd HH:mm:ss', filter_year=2013)
parquet(df ,parquetname)

del df
del weather_hourly

Nesta fase, importámos e transformámos os dados necessários para análise. Estes dados foram armazenados no formato Parquet, facilitando o seu uso nas próximas etapas do projeto, como a análise exploratória, treino de modelos e visualização dos resultados.


#### Análise de Dados

Implementação de funções auxiliares para manipulação dos dados. Os dados de consumo de energia, informações das residências e dados meteorológicos serão carregados e verificados quanto à sua integridade e estrutura. De seguida, faremos a integração desses conjuntos de dados, criando um DataFrame consolidado que permite uma análise detalhada. O conjunto de dados será enriquecido com novas colunas, como períodos do dia e identificação de fins de semana, o que facilitará a análise dos padrões de consumo. Por fim, selecionaremos as variáveis mais relevantes para focar nos aspectos mais críticos da análise e preparar os dados para as próximas etapas de exploração e modelação.

#### Funções Auxiliares

Definem-se algumas funções auxiliares que ajudarão no processamento dos dados.

- `parquet(df, parquetname)`: Guarda um DataFrame em formato Parquet, permitindo que os dados sejam armazenados de forma eficiente e recuperados rapidamente para análises futuras.
- `compute_nulls_and_uniques(df, cols)`: Calcula a quantidade e a percentagem de valores nulos, NaNs e valores únicos em cada coluna de um DataFrame.
- `get_time_period`, `is_weekend`, `extract_month`, `extract_day`, `extract_hour`, `extract_minute`: Funções para extração de componentes de data e hora:


     - `get_time_period`: Converte uma data e hora num período do dia, como "Morning", "Noon", "Evening" ou "Night". 
     - `is_weekend`: Determina se uma data é um fim de semana. Retorna 1 se a data for um sábado ou domingo, e 0 caso contrário.
     - `extract_month`: Extrai o mês de uma data e hora.
     - `extract_day`: Extrai o dia de uma data e hora.
     - `extract_minute`: Extrai o minuto de uma data e hora.

In [2]:
# Function to figure out the profile of nulls and uniques for ecah column in a DataFrame
def parquet(df,parquetname):
    df.write.mode("overwrite").parquet(parquetname)

def compute_nulls_and_uniques(df, cols):
    total = df.count()
    results = []
    
    for cl in cols:
        # Count nulls and calculate their percentage
        knulls = df.filter(F.col(cl).isNull()).count()
        knullsperc = knulls / total
        
        # Initialize NaNs count and percentage
        knans = 0
        knansperc = 0
        
        # Check if the column is numeric before counting NaNs
        if isinstance(df.schema[cl].dataType, (DoubleType, FloatType)):
            knans = df.filter(F.isnan(cl)).count()
            knansperc = knans / total
        
        # Count unique values and calculate their percentage
        kuniques = df.select(cl).distinct().count()
        kuniquesperc = kuniques / total
        
        # Append results
        results.append(Row(
            feature=cl, 
            count_nulls=knulls, 
            percentage_nulls=knullsperc,
            count_nans=knans, 
            percentage_nans=knansperc,
            count_uniques=kuniques, 
            percentage_uniques=kuniquesperc
        ))

    return spark.createDataFrame(results)

# Define function to categorize datetime into periods
def get_time_period(datetime_str):
    if isinstance(datetime_str, datetime):
        datetime_str = datetime_str.strftime("%Y-%m-%d %H:%M:%S")
    hour = datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S").hour
    if 6 <= hour < 12:
        return "Morning"
    elif 12 <= hour < 18:
        return "Noon"
    elif 18 <= hour < 24:
        return "Evening"
    else:
        return "Night"
    
# Define function to detect weekends
def is_weekend(date):
    if isinstance(date, str):
        date_obj = datetime.strptime(date, "%Y-%m-%d")
    elif isinstance(date, datetime):
        date_obj = date
    else:
        raise TypeError("Input must be a string or datetime object")
        
    return 1 if date_obj.weekday() in [5, 6] else 0

# Define functions to extract each component
def extract_month(datetime_str):
    if isinstance(datetime_str, datetime):
        datetime_str = datetime_str.strftime("%Y-%m-%d %H:%M:%S")
    return datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S").month

def extract_day(datetime_str):
    if isinstance(datetime_str, datetime):
        datetime_str = datetime_str.strftime("%Y-%m-%d %H:%M:%S")
    return datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S").day

def extract_hour(datetime_str):
    if isinstance(datetime_str, datetime):
        datetime_str = datetime_str.strftime("%Y-%m-%d %H:%M:%S")
    return datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S").hour

def extract_minute(datetime_str):
    if isinstance(datetime_str, datetime):
        datetime_str = datetime_str.strftime("%Y-%m-%d %H:%M:%S")
    return datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S").minute




#### Leitura dos Ficheiros Parquet

#### Análise dos Dados de Consumo de Energia

Carregam-se os dados de consumo de energia, provenientes da base de dados `halfhourly_dataset_blocks_2013_parquet`, e verifica-se o número de registos e a estrutura dos dados.

In [3]:
consumos_meiahora = spark.read.parquet("halfhourly_dataset_blocks_2013_parquet")

In [4]:
consumos_meiahora.count()

93024229

In [5]:
consumos_meiahora.show(5, truncate=False)

+---------+-------------------+--------------+
|LCLid    |tstp               |energy(kWh/hh)|
+---------+-------------------+--------------+
|MAC000071|2013-01-01 00:00:00| 0.113        |
|MAC000071|2013-01-01 00:30:00| 0.062        |
|MAC000071|2013-01-01 01:00:00| 0.107        |
|MAC000071|2013-01-01 01:30:00| 0.069        |
|MAC000071|2013-01-01 02:00:00| 0.102        |
+---------+-------------------+--------------+
only showing top 5 rows



Os dados de consumo de energia contêm 93.024.229 registos, com três colunas: `LCLid`, `tstp` e `energy(kWh/hh)`. Estes dados representam o consumo de energia a cada meia hora para diferentes residências ao longo de 2013.


In [6]:
consumos_meiahora_nulls_uniques = compute_nulls_and_uniques(consumos_meiahora,consumos_meiahora.columns)

In [7]:
consumos_meiahora_nulls_uniques.show(100, truncate=False)

+--------------+-----------+----------------+----------+---------------+-------------+--------------------+
|feature       |count_nulls|percentage_nulls|count_nans|percentage_nans|count_uniques|percentage_uniques  |
+--------------+-----------+----------------+----------+---------------+-------------+--------------------+
|LCLid         |0          |0.0             |0         |0              |5528         |5.94253783065485E-5 |
|tstp          |0          |0.0             |0         |0              |17530        |1.884455285299919E-4|
|energy(kWh/hh)|0          |0.0             |0         |0              |6757         |7.263699008996893E-5|
+--------------+-----------+----------------+----------+---------------+-------------+--------------------+



A análise revela que não existem valores nulos ou NaNs nas colunas dos dados de consumo de energia. A coluna `LCLid` possui 5.528 valores únicos, indicando o número de residências distintas. A coluna `tstp` possui 17.530 valores únicos, que representam diferentes carimbos temporais. A coluna `energy(kWh/hh)` possui 6.757 valores únicos, refletindo a variabilidade nos consumos de energia.

#### Análise dos Dados das Casas

Carregam-se os dados de informações das casas (aglomerados familiares) provenientes da base de dados `informations_households_parquet` e verifica-se o número de registos e a estrutura dos dados.

In [4]:
informacao_casas=spark.read.parquet("informations_households_parquet")

In [9]:
informacao_casas.count()

5566

In [10]:
informacao_casas.show(5, truncate=False)

+---------+--------+-------+-------------+-------+
|LCLid    |stdorToU|Acorn  |Acorn_grouped|file   |
+---------+--------+-------+-------------+-------+
|MAC005492|ToU     |ACORN- |ACORN-       |block_0|
|MAC001074|ToU     |ACORN- |ACORN-       |block_0|
|MAC000002|Std     |ACORN-A|Affluent     |block_0|
|MAC003613|Std     |ACORN-A|Affluent     |block_0|
|MAC003597|Std     |ACORN-A|Affluent     |block_0|
+---------+--------+-------+-------------+-------+
only showing top 5 rows



Os dados de informações das casas contêm 5.566 registos, com cinco colunas: `LCLid`, `stdorToU`, `Acorn`, `Acorn_grouped` e `file`. Estes dados fornecem informações adicionais sobre cada residência, como o tipo de tarifa (`stdorToU`), a classificação socioeconómica (`Acorn` e `Acorn_grouped`) e onde está a respetiva residência (dados de consumo) (`file`).

In [11]:
informacao_casas_nulls_uniques = compute_nulls_and_uniques(informacao_casas,informacao_casas.columns)

In [12]:
informacao_casas_nulls_uniques.show(100, truncate=False)

+-------------+-----------+----------------+----------+---------------+-------------+---------------------+
|feature      |count_nulls|percentage_nulls|count_nans|percentage_nans|count_uniques|percentage_uniques   |
+-------------+-----------+----------------+----------+---------------+-------------+---------------------+
|LCLid        |0          |0.0             |0         |0              |5566         |1.0                  |
|stdorToU     |0          |0.0             |0         |0              |2            |3.5932446999640676E-4|
|Acorn        |0          |0.0             |0         |0              |19           |0.003413582464965864 |
|Acorn_grouped|0          |0.0             |0         |0              |5            |8.983111749910168E-4 |
|file         |0          |0.0             |0         |0              |112          |0.02012217031979878  |
+-------------+-----------+----------------+----------+---------------+-------------+---------------------+



A análise revela que não existem valores nulos ou NaNs nas colunas dos dados de informações das casas. A coluna `LCLid` possui 5.566 valores únicos, correspondendo ao número de residências. A coluna `stdorToU` possui 2 valores únicos, indicando os diferentes tipos de tarifas. As colunas `Acorn` e `Acorn_grouped` possuem 19 e 5 valores únicos, respectivamente, representando diferentes classificações socioeconómicas. A coluna `file` possui 112 valores únicos, refletindo a origem dos dados.

#### Análise dos Dados Meteorológicos

Carregam-se os dados metereológicos provenientes da base de dados `weather_hourly_2013_parquete` e verifica-se o número de registos e a estrutura dos dados.

In [5]:
meterologia_hora= spark.read.parquet("weather_hourly_2013_parquet")

In [14]:
meterologia_hora.count()

8758

In [6]:
meterologia_hora=meterologia_hora.orderBy("time")
meterologia_hora.show(10, truncate=False)

+----------+-----------+-----------+-------------------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+
|visibility|windBearing|temperature|time               |dewPoint|pressure|apparentTemperature|windSpeed|precipType|icon               |humidity|summary      |
+----------+-----------+-----------+-------------------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+
|13.28     |269        |7.01       |2013-01-01 00:00:00|2.6     |1008.19 |3.66               |5.46     |rain      |partly-cloudy-night|0.73    |Partly Cloudy|
|13.07     |273        |7.49       |2013-01-01 01:00:00|2.73    |1008.75 |4.24               |5.51     |rain      |partly-cloudy-night|0.72    |Partly Cloudy|
|13.52     |274        |7.16       |2013-01-01 02:00:00|1.73    |1009.47 |3.74               |5.74     |rain      |partly-cloudy-night|0.68    |Partly Cloudy|
|13.07     |272        |7.04       |2013-01-01

A tabela apresenta dados meteorológicos horários, conforme os seguintes campos:

- `visibility`: Visibilidade em quilometros.
- `windBearing`: Direção do vento em graus.
- `temperature`: Temperatura em graus Celsius.
- `time`: Data e hora da medição.
- `dewPoint`: Ponto de orvalho em graus Celsius.
- `pressure`: Pressão atmosférica em milibares.
- `apparentTemperature`: Temperatura aparente em graus Celsius.
- `windSpeed`: Velocidade do vento em quilometros por hora.
- `precipType`: Tipo de precipitação (chuva, neve, etc.).
- `icon`: Icon a representar as condições climáticas.
- `humidity`: Humidade relativa do ar.
- `summary`: Resumo das condições climáticas.


#### Join dos DataFrames para Obter um DataFrame Mais Completo (`informacao_casas` com `consumos_meiahora`)

Realiza-se esta etapa com o objetivo de combinar diferentes conjuntos de dados num único DataFrame consolidado que contenha todas as informações relevantes para a análise subsequente. Neste caso específico, estamos a unir dados de consumo de energia  com informações sobre as residências.

Das colunas pertencentes à base de dados `informacao_casas`, seleciona-se as que consideramos mais relevantes ('LCLid', 'stdorToU', 'Acorn', 'Acorn_grouped') e une-se a base de dados com essas colunas a `consumos_meiahora`, utilizando a coluna 'LCLid' como chave de união.

Após o join, renomeia-se a coluna 'tstp' para 'time' para facilitar a identificação de registos temporais e simplificar futuras operações com os dados.

In [7]:
other_df_selected = informacao_casas.select('LCLid', 'stdorToU', 'Acorn', 'Acorn_grouped')

# Perform the join
result_df = consumos_meiahora.join(other_df_selected, on='LCLid', how='inner')

result_df=result_df.withColumnRenamed('tstp', 'time')


In [17]:
result_df.show(10,truncate=False)

+---------+-------------------+--------------+--------+-------+-------------+
|LCLid    |time               |energy(kWh/hh)|stdorToU|Acorn  |Acorn_grouped|
+---------+-------------------+--------------+--------+-------+-------------+
|MAC000071|2013-01-01 00:00:00| 0.113        |ToU     |ACORN-J|Comfortable  |
|MAC000071|2013-01-01 00:30:00| 0.062        |ToU     |ACORN-J|Comfortable  |
|MAC000071|2013-01-01 01:00:00| 0.107        |ToU     |ACORN-J|Comfortable  |
|MAC000071|2013-01-01 01:30:00| 0.069        |ToU     |ACORN-J|Comfortable  |
|MAC000071|2013-01-01 02:00:00| 0.102        |ToU     |ACORN-J|Comfortable  |
|MAC000071|2013-01-01 02:30:00| 0.073        |ToU     |ACORN-J|Comfortable  |
|MAC000071|2013-01-01 03:00:00| 0.099        |ToU     |ACORN-J|Comfortable  |
|MAC000071|2013-01-01 03:30:00| 0.079        |ToU     |ACORN-J|Comfortable  |
|MAC000071|2013-01-01 04:00:00| 0.091        |ToU     |ACORN-J|Comfortable  |
|MAC000071|2013-01-01 04:30:00| 0.085        |ToU     |ACORN-J|C

Assim, obtem-se uma base de dados que apresenta dados de consumo de energia em intervalos de meia hora, com os seguintes campos:

- `LCLid`: Identificador único do consumidor, usado para distinguir cada residência.
- `time`: Data e hora do registro de consumo de energia
- `energy(kWh/hh)`: Consumo de energia em kilowatt-hora por meia hora.
- `stdorToU`: Tipo de tarifa de energia (tarifa padrão: standard ou tarifa baseada em horário de uso: time-of-use).
- `Acorn`: Categoria de classificação socioeconómica do client, baseada no sistema ACORN (A Classification of Residential Neighborhoods).
- `Acorn_grouped`: Grupo de classificação socioeconómica do cliente (Divide as categorias ACORN em 18 grupos, que incluem perfis como Afluente, Confortável e Adversidade).

#### Join dos DataFrames para Obter um DataFrame Mais Completo (juntar `metereologia_hora` ao  criado antes)

De forma a poder fazer a junção desta nova base de dados com `metereologia_hora`, é necessário primeiro corrigir os minutos da base de dados criada. Uma vez que os dados metereológicos estão registados de hora a hora, e os dados de consumo de energia estão registados de meia em meia hora, mostra-se essencial ajustar os minutos da coluna 'time' para zero, de forma a que seja possível unir a base de dados referente aos dados metereológicos, à base de dados criada anteriormente (que contém os dados de consumo energético unidos com as informações das casas).

Após isto ser feito, tem-se a garantia que a data e hora das duas bases de dados estejam uniformemente formatadas em intervalos de uma hora. Cria-se uma nova coluna, 'time weather' que contém a data e hora corrigida. 
Nos dados metereológicos, a coluna 'time' é renomeada para 'time weather' para que possa corresponder à coluna criada nos dados de consumo de energia.

Com as duas colunas 'time weather' preparadas e uniformemente formatadas, realiza-se o join entre os dados de consumo de energia e os dados meteorológicos.

In [8]:
#Corrigir os minutos (para = 0) de modo a fazer left join
result_df = result_df.withColumn("time weather", F.date_format(F.col("time"), "yyyy-MM-dd HH:00:00"))

result_df.show(10,truncate=False)

+---------+-------------------+--------------+--------+-------+-------------+-------------------+
|LCLid    |time               |energy(kWh/hh)|stdorToU|Acorn  |Acorn_grouped|time weather       |
+---------+-------------------+--------------+--------+-------+-------------+-------------------+
|MAC000071|2013-01-01 00:00:00| 0.113        |ToU     |ACORN-J|Comfortable  |2013-01-01 00:00:00|
|MAC000071|2013-01-01 00:30:00| 0.062        |ToU     |ACORN-J|Comfortable  |2013-01-01 00:00:00|
|MAC000071|2013-01-01 01:00:00| 0.107        |ToU     |ACORN-J|Comfortable  |2013-01-01 01:00:00|
|MAC000071|2013-01-01 01:30:00| 0.069        |ToU     |ACORN-J|Comfortable  |2013-01-01 01:00:00|
|MAC000071|2013-01-01 02:00:00| 0.102        |ToU     |ACORN-J|Comfortable  |2013-01-01 02:00:00|
|MAC000071|2013-01-01 02:30:00| 0.073        |ToU     |ACORN-J|Comfortable  |2013-01-01 02:00:00|
|MAC000071|2013-01-01 03:00:00| 0.099        |ToU     |ACORN-J|Comfortable  |2013-01-01 03:00:00|
|MAC000071|2013-01-0

In [19]:
# Assuming 'time' column is in string format
#result_df =result_df.withColumn('time unix', F.unix_timestamp(result_df['time'], 'yyyy-MM-dd HH:mm:ss'))

# Show the DataFrame
#result_df.show(10,truncate=False)


In [9]:
# Assuming 'time' column is in string format
#meterologia_hora  = meterologia_hora.withColumn('time unix', F.unix_timestamp(meterologia_hora['time'], 'yyyy-MM-dd HH:mm:ss'))

meterologia_hora  = meterologia_hora.withColumnRenamed('time', 'time weather')

# Show the DataFrame
meterologia_hora.show()

+----------+-----------+-----------+-------------------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+
|visibility|windBearing|temperature|       time weather|dewPoint|pressure|apparentTemperature|windSpeed|precipType|               icon|humidity|      summary|
+----------+-----------+-----------+-------------------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+
|     13.28|        269|       7.01|2013-01-01 00:00:00|     2.6| 1008.19|               3.66|     5.46|      rain|partly-cloudy-night|    0.73|Partly Cloudy|
|     13.07|        273|       7.49|2013-01-01 01:00:00|    2.73| 1008.75|               4.24|     5.51|      rain|partly-cloudy-night|    0.72|Partly Cloudy|
|     13.52|        274|       7.16|2013-01-01 02:00:00|    1.73| 1009.47|               3.74|     5.74|      rain|partly-cloudy-night|    0.68|Partly Cloudy|
|     13.07|        272|       7.04|2013-01-01

Join entre os DataFrames `result_df` e `metereologia_hora` usando a coluna 'time weather' como chave:

In [10]:
join_df = result_df.join(meterologia_hora, on='time weather', how='left')

join_df.show(5,truncate=False)
join_df.show(1,truncate=False,vertical=True)

+-------------------+---------+-------------------+--------------+--------+-------+-------------+----------+-----------+-----------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+
|time weather       |LCLid    |time               |energy(kWh/hh)|stdorToU|Acorn  |Acorn_grouped|visibility|windBearing|temperature|dewPoint|pressure|apparentTemperature|windSpeed|precipType|icon               |humidity|summary      |
+-------------------+---------+-------------------+--------------+--------+-------+-------------+----------+-----------+-----------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+
|2013-01-01 00:00:00|MAC000071|2013-01-01 00:00:00| 0.113        |ToU     |ACORN-J|Comfortable  |13.28     |269        |7.01       |2.6     |1008.19 |3.66               |5.46     |rain      |partly-cloudy-night|0.73    |Partly Cloudy|
|2013-01-01 00:00:00|MAC000071|2013-01-01 00:30:00| 0.062   

#### Adição de novas colunas:

Regista-se a função 'get_time_period' como uma UDF (User Defined Function) e adiciona-se uma nova coluna `period_of_day` que indica o período do dia ("Morning", "Noon", "Evening" ou "Night") baseado na coluna 'time'.

In [11]:
# Register the function as a UDF
time_period_udf = F.udf(get_time_period, StringType())

# Add a new column with the respective time period
join_df = join_df.withColumn("period_of_day", time_period_udf(F.col('time')))

# Show the DataFrame with the new column
join_df.show()

+-------------------+---------+-------------------+--------------+--------+-------+-------------+----------+-----------+-----------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+-------------+
|       time weather|    LCLid|               time|energy(kWh/hh)|stdorToU|  Acorn|Acorn_grouped|visibility|windBearing|temperature|dewPoint|pressure|apparentTemperature|windSpeed|precipType|               icon|humidity|      summary|period_of_day|
+-------------------+---------+-------------------+--------------+--------+-------+-------------+----------+-----------+-----------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+-------------+
|2013-01-01 00:00:00|MAC000071|2013-01-01 00:00:00|        0.113 |     ToU|ACORN-J|  Comfortable|     13.28|        269|       7.01|     2.6| 1008.19|               3.66|     5.46|      rain|partly-cloudy-night|    0.73|Partly Cloudy|        Night|
|201

Regista-se a função 'is_weekend' como uma UDF e adiciona-se uma nova coluna `weekend` que indica se a data é um fim de semana (1) ou não (0).

In [12]:
# Register the function as a UDF
weekend_udf = F.udf(is_weekend, IntegerType())

# Add a new column with the weekend indicator
join_df = join_df.withColumn("weekend", weekend_udf(join_df["time"]))

# Show the DataFrame with the new column
join_df.show()

+-------------------+---------+-------------------+--------------+--------+-------+-------------+----------+-----------+-----------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+-------------+-------+
|       time weather|    LCLid|               time|energy(kWh/hh)|stdorToU|  Acorn|Acorn_grouped|visibility|windBearing|temperature|dewPoint|pressure|apparentTemperature|windSpeed|precipType|               icon|humidity|      summary|period_of_day|weekend|
+-------------------+---------+-------------------+--------------+--------+-------+-------------+----------+-----------+-----------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+-------------+-------+
|2013-01-01 00:00:00|MAC000071|2013-01-01 00:00:00|        0.113 |     ToU|ACORN-J|  Comfortable|     13.28|        269|       7.01|     2.6| 1008.19|               3.66|     5.46|      rain|partly-cloudy-night|    0.73|Partly Cl

Regista-se as funções 'extract_month', 'extract_day' e 'extract_hour' como UDFs e adiciona-se novas colunas ao DataFrame, representando o mês (`month`), o dia (`day`) e a hora (`hour`) extraídos da coluna de tempo existente no DataFrame.

In [13]:
# Register the functions as UDFs
extract_month_udf = F.udf(extract_month, IntegerType())
extract_day_udf = F.udf(extract_day, IntegerType())
extract_hour_udf = F.udf(extract_hour, IntegerType())

# Add new columns for each component
join_df = join_df.withColumn("month", extract_month_udf(join_df["time"]))
join_df = join_df.withColumn("day", extract_day_udf(join_df["time"]))
join_df = join_df.withColumn("hour", extract_hour_udf(join_df["time"]))

# Show the DataFrame with the new columns
join_df.show()

+-------------------+---------+-------------------+--------------+--------+-------+-------------+----------+-----------+-----------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+-------------+-------+-----+---+----+
|       time weather|    LCLid|               time|energy(kWh/hh)|stdorToU|  Acorn|Acorn_grouped|visibility|windBearing|temperature|dewPoint|pressure|apparentTemperature|windSpeed|precipType|               icon|humidity|      summary|period_of_day|weekend|month|day|hour|
+-------------------+---------+-------------------+--------------+--------+-------+-------------+----------+-----------+-----------+--------+--------+-------------------+---------+----------+-------------------+--------+-------------+-------------+-------+-----+---+----+
|2013-01-01 00:00:00|MAC000071|2013-01-01 00:00:00|        0.113 |     ToU|ACORN-J|  Comfortable|     13.28|        269|       7.01|     2.6| 1008.19|               3.66|     5.46|    

#### Seleção das colunas relevantes para a análise

In [14]:
join_df.columns

['time weather',
 'LCLid',
 'time',
 'energy(kWh/hh)',
 'stdorToU',
 'Acorn',
 'Acorn_grouped',
 'visibility',
 'windBearing',
 'temperature',
 'dewPoint',
 'pressure',
 'apparentTemperature',
 'windSpeed',
 'precipType',
 'icon',
 'humidity',
 'summary',
 'period_of_day',
 'weekend',
 'month',
 'day',
 'hour']

Após o tratamento efetuado aos dados, DataFrame está completo, e contem todas as seguintes colunas:
`time weather`, `LCLid`, `time`, `energy(kWh/hh)`, `stdorToU`, `Acorn`, `Acorn_grouped`,  `visibility`, `windBearing`, `temperature`, `dewPoint`,  `pressure`, `apparentTemperature`, `windSpeed`, `precipType`, `icon`, `humidity`, `summary`, `period_of_day`, `weekend`, `month`, `day` e `hour`.


Para a próxima etapa da análise exploratória, optou-se por focar apenas num conjunto específico de variáveis que se consideram mais relevantes para os objetivos do estudo. Estas variáveis foram cuidadosamente selecionadas com base na sua importância potencial para se entender melhor os padrões e tendências nos dados.

In [15]:
# Define the columns to select
selected_columns = [
    'LCLid', 'time', 'energy(kWh/hh)', 'stdorToU', 'Acorn', 'Acorn_grouped',
    'temperature', 'apparentTemperature', 'precipType', 'humidity',
    'summary', 'period_of_day', 'weekend', 'month', 'day', 'hour'
]

# Select the specified columns
join_df= join_df.select(*selected_columns)


As variáveis escolhidas incluem informações essenciais, como: 

- Identificação do Consumidor (`LCLid`): Um identificador único para cada residência/consumidor;
- Data e Hora (`time`): Registo de data e hora para cada medição de consumo, permitindo a análise temporal e a identificação de padrões de uso ao longo do dia e das estações do ano.
- Consumo de Energia (`energy(kWh/hh)`): Quantidade de energia consumida em kilowatt-hora, medida em intervalos de meia hora. Fornece uma visão detalhada do uso de energia em períodos curtos e permite a deteção de picos de consumo.
- Tipo de Tarifa (`stdorToU`): Informação sobre o tipo de tarifa elétrica aplicada, que pode ser uma tarifa padrão (Standard) ou uma tarifa diferenciada por horário de uso (Time-of-Use). É crucial para entender como diferentes estruturas tarifárias influenciam o consumo.
- Segmentação Demográfica (`Acorn` e `Acorn_grouped`): Classificação socioeconômica e demográfica das residências com base no sistema ACORN, que categoriza os consumidores em diferentes grupos de acordo com características econômicas e sociais. Acorn fornece uma categorização detalhada, enquanto Acorn_grouped agrupa essas categorias em segmentos mais amplos.
- Temperatura (`temperature`): Temperatura ambiente em graus Celsius, que pode influenciar diretamente o consumo de energia para aquecimento ou arrefecimento.
- Temperatura aparente (`apparentTemperature`): Reflete a sensação térmica, considerando fatores como vento e humidade, que influenciam o consumo de energia.
- Tipo de precipitação (`precipType`): Chuva ou neve, que pode afetar o comportamento de consumo de energia, especialmente para aquecimento e isolamento térmico.
- Humidade (`humidity`): Humidade relativa do ar, que pode influenciar a necessidade de climatização e, consequentemente, o consumo de energia.
- Resumo das condições climáticas (`summary`): Fornece uma descrição geral do tempo em cada registo, como "Ensolarado" ou "Nublado".
- Período do dia (`period_of_day`): "Morning", "Noon", "Evening" ou "Night".
- Fim de semana (`weekend`): Indica se o dia em que foi realizado o consumo era um fim de semana ou não.
- Mês (`month`): Mês em que foi realizada a medição do consumo.
- Dia (`day`): Dia em que foi realizada a medição do consumo.
- Hora (`hour`): Hora em que foi realizada a medição do consumo. 

A exclusão das outras variáveis foi uma decisão estratégica para simplificar a análise e focar apenas nos aspetos mais relevantes do conjunto de dados. Isso permitirá uma análise mais eficiente e uma compreensão mais aprofundada dos padrões de consumo de energia em relação às variáveis selecionadas.

In [16]:
join_df.show()

+---------+-------------------+--------------+--------+-------+-------------+-----------+-------------------+----------+--------+-------------+-------------+-------+-----+---+----+
|    LCLid|               time|energy(kWh/hh)|stdorToU|  Acorn|Acorn_grouped|temperature|apparentTemperature|precipType|humidity|      summary|period_of_day|weekend|month|day|hour|
+---------+-------------------+--------------+--------+-------+-------------+-----------+-------------------+----------+--------+-------------+-------------+-------+-----+---+----+
|MAC000071|2013-01-01 00:00:00|        0.113 |     ToU|ACORN-J|  Comfortable|       7.01|               3.66|      rain|    0.73|Partly Cloudy|        Night|      0|    1|  1|   0|
|MAC000071|2013-01-01 00:30:00|        0.062 |     ToU|ACORN-J|  Comfortable|       7.01|               3.66|      rain|    0.73|Partly Cloudy|        Night|      0|    1|  1|   0|
|MAC000071|2013-01-01 01:00:00|        0.107 |     ToU|ACORN-J|  Comfortable|       7.49|      

In [17]:
join_df.count()

93024229

O DataFrame passou a conter um total de 93024229 linhas

In [18]:
del result_df
del consumos_meiahora
del informacao_casas

Como os DataFrames (result_df, consumos_meiahora, join_df e informacao_casas) já não são necessários, foram eliminados da memória para libertar recursos e espaço que estavam a ser ocupados

Por fim, guarda-se o DataFrame final em formato Parquet, de forma a facilitar o armazenamento e acesso futuro

In [19]:
parquet(join_df,'dados_final_parquet')

In [23]:
del join_df

join_df = spark.read.parquet('dados_final_parquet')

Recarrega-se o DataFrame a partir do arquivo Parquet e exibe as primeiras 5 linhas para verificar a integridade dos dados salvos.

In [24]:
join_df.show(5,truncate=False)

+---------+-------------------+--------------+--------+-------+-------------+-----------+-------------------+----------+--------+-------------+-------------+-------+-----+---+----+
|LCLid    |time               |energy(kWh/hh)|stdorToU|Acorn  |Acorn_grouped|temperature|apparentTemperature|precipType|humidity|summary      |period_of_day|weekend|month|day|hour|
+---------+-------------------+--------------+--------+-------+-------------+-----------+-------------------+----------+--------+-------------+-------------+-------+-----+---+----+
|MAC000071|2013-01-01 00:00:00| 0.113        |ToU     |ACORN-J|Comfortable  |7.01       |3.66               |rain      |0.73    |Partly Cloudy|Night        |0      |1    |1  |0   |
|MAC000071|2013-01-01 00:30:00| 0.062        |ToU     |ACORN-J|Comfortable  |7.01       |3.66               |rain      |0.73    |Partly Cloudy|Night        |0      |1    |1  |0   |
|MAC000071|2013-01-01 01:00:00| 0.107        |ToU     |ACORN-J|Comfortable  |7.49       |4.24  

Este notebook preparou os dados de consumo de energia, integrando-os com informações demográficas e meteorológicas para facilitar a análise subsequente. Nos próximos passos, realizaremos uma análise exploratória para identificar tendências e padrões específicos nos dados de consumo e aplicaremos algoritmos de clustering para segmentar os consumidores com base nos seus comportamentos típicos de uso de energia. Este trabalho permitirá entender melhor as necessidades energéticas dos diferentes grupos de consumidores e como fatores externos, como o clima, impactam o consumo de energia.