<a href="https://colab.research.google.com/github/AndromedaOMA/Advanced_Analytics_with_Apache_Spark---E.On_Software_Development/blob/main/Final_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Descriere proiect

Acest proiect are ca scop principal analiza consumului de energie al unui grup de consumatori fictivi, pe parcursul unui an, a unei companii de energie, folosind tehnicile de analiză din motorul Apache Spark.

1. Primul set de date reflectă atât consumul total de energie, cât și detalii specifice, dacă există, despre producția din panouri solare, consumul pentru vehicule electrice (EV), energia furnizată înapoi către rețeaua electrică, consumul și încărcarea bateriilor.
2. Al doilea set de date oferă atât tariful pe an și prețurile per kWh în diferite intervale de timp, unice pentru fiecare client în parte, cât și prețul de vânzare la nivelul companiei de energie pe diferite intervale de timp.

Proiectul implică curățarea și prelucrarea datelor, completarea valorilor lipsă, iar la final calculul facturii de energie, bonus fiind compararea facturii cu alți clienți similari.



---

# Seturile de date

## Pregătire mediu de lucru

In [36]:
from google.colab import drive
# drive.mount('/content/drive')
drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


In [37]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://dlcdn.apache.org/spark/
!wget -q https://dlcdn.apache.org/spark/spark-3.4.4/spark-3.4.4-bin-hadoop3.tgz
!tar xf spark-3.4.4-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j
import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.4.4-bin-hadoop3"
import findspark
findspark.init()
findspark.find()
import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as f
spark= SparkSession.builder.getOrCreate()
spark

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
[33m0% [Connecting to archive.ubuntu.com (185.125.190.83)] [1 InRelease 14.2 kB/129[0m                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 http://security.ubuntu.com/ubuntu jammy-security/restricted amd64 Packages [4,104 kB]
Get:8 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [2,837 kB]
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:11 https://ppa

## Setul de Date – Raw Time Series

Acest set de date conține informații despre consumul de date a unor clienți fictivi ai unei companii de energie. Structura datelor este una similară cu schemele folosite la momentul actual pentru astfel de date.

In [71]:
parquet_path = '/content/drive/MyDrive/E.on/E.on_Data/Data/Project/raw_time_series/parquet'
raw_time_series_df = spark.read.parquet(parquet_path)
raw_time_series_df.show(truncate=False)

+-------------------+-------------------+--------------------+------------+--------------------------+
|contract_id        |timestamp          |value               |value_source|annotations               |
+-------------------+-------------------+--------------------+------------+--------------------------+
|04_02_111 _ CHR12  |2023-01-01 06:00:00|0.02591860654732236 |measurement |{"region":"Europe/Berlin"}|
|04 _02_111 _CHR12  |2023-01-01 17:00:00|0.07385444264936832 |measurement |{"region":"Europe/Berlin"}|
|04_02_111 _ CHR12  |2023-01-01 17:30:22|0.08180149515221906 |measurement |{"region":"Europe/Berlin"}|
|04 _02_111 _CHR12  |2023-01-01 21:30:00|0.08670661371854547 |measurement |{"region":"Europe/Berlin"}|
|04 _ 02 _111_CHR12 |2023-01-02 00:30:00|0.03597601881331959 |measurement |{"region":"Europe/Berlin"}|
|04 _02_111 _ CHR12 |2023-01-02 05:30:00|0.03638379308965683 |measurement |{"region":"Europe/Berlin"}|
|04 _02 _111 _CHR12 |2023-01-03 10:45:00|0.931575            |measurement

## Setul de Date – Customer Tariff

Acest set de date conține informații despre tarifele și prețurile a unor clienți fictivi, într-un interval de timp, ai unei companii de energie. Structura este una similară cu cea folosite la momentul actual pentru astfel de date.

In [72]:
parquet_path = '/content/drive/MyDrive/E.on/E.on_Data/Data/Project/customer_tariff/parquet'
customer_tariff_df = spark.read.parquet(parquet_path)
customer_tariff_df.show()

+---------------+----------------------------+--------------------------+-----------------+-----------+------+
|    contract_id|target_local_start_timestamp|target_local_end_timestamp|      tariff_name|charge_type| price|
+---------------+----------------------------+--------------------------+-----------------+-----------+------+
|04_02_111_CHR28|         2022-12-01 00:00:00|       2023-02-16 00:00:00|     Electric Pro|        buy| 0.302|
|04_02_111_CHR28|         2023-02-16 00:00:00|       2024-05-19 00:00:00| Electric Loyalty|        buy|0.3938|
|04_02_111_CHR28|         2024-05-19 00:00:00|       2024-08-31 00:00:00|     Eco Electric|        buy|0.2095|
|04_02_111_CHR28|         2024-08-31 00:00:00|       2024-10-09 00:00:00| Electric Loyalty|        buy|0.4047|
|04_02_111_CHR28|         2024-10-09 00:00:00|       2024-11-09 00:00:00|Business Electric|        buy|0.1716|
|04_02_111_CHR28|         2024-11-09 00:00:00|       2024-12-11 00:00:00|     Electric Pro|        buy|0.2944|
|

---

# Cerințe

## Curățarea Datelor

Setul de date al consumului prezintă unele probleme, precum spații suplimentare și informații lipsă sau semi-structurate.

Acestea trebuie rectificate înainte de a trece mai departe:

1. Curățarea coloanei „contract_id” de spații suplimentare
2. Setarea coloanei „value_source” cu valoarea „missing” atunci când valoarea lipsește.
3. Coloana „timestamp” are variații mici ce trebuie rectificate. Contoarele măsoară consumul odată la 15 minute exact începând cu ora 00:00, dar din cauza procesării, pot apărea variații la timpul pe care îl trimit. (Indiciu: Se găsește cea mai apropriat multiplu de 15 la minute și se scot secundele)



In [73]:
# 1.
raw_time_series_df = raw_time_series_df.withColumn('contract_id', f.translate(f.col('contract_id'), ' ', ''))
raw_time_series_df.show(truncate=False)

+---------------+-------------------+--------------------+------------+--------------------------+
|contract_id    |timestamp          |value               |value_source|annotations               |
+---------------+-------------------+--------------------+------------+--------------------------+
|04_02_111_CHR12|2023-01-01 06:00:00|0.02591860654732236 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-01 17:00:00|0.07385444264936832 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-01 17:30:22|0.08180149515221906 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-01 21:30:00|0.08670661371854547 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-02 00:30:00|0.03597601881331959 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-02 05:30:00|0.03638379308965683 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-03 10:45:00|0.931575            |measurement |{"region":"Europe/Berlin"}|
|04_02_111

In [74]:
# 2.
# raw_time_series_df = raw_time_series_df.withColumn('value_source', f.coalesce(f.col('value_source'), f.lit('missing')))
raw_time_series_df = raw_time_series_df.withColumn('value_source', f.when(f.col('value').isNull(), f.lit('missing')).otherwise(f.col('value_source')))
raw_time_series_df.show(truncate=False)

+---------------+-------------------+--------------------+------------+--------------------------+
|contract_id    |timestamp          |value               |value_source|annotations               |
+---------------+-------------------+--------------------+------------+--------------------------+
|04_02_111_CHR12|2023-01-01 06:00:00|0.02591860654732236 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-01 17:00:00|0.07385444264936832 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-01 17:30:22|0.08180149515221906 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-01 21:30:00|0.08670661371854547 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-02 00:30:00|0.03597601881331959 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-02 05:30:00|0.03638379308965683 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-03 10:45:00|0.931575            |measurement |{"region":"Europe/Berlin"}|
|04_02_111

In [75]:
# 3.
raw_time_series_df = raw_time_series_df.withColumn('timestamp',
                                                   f.from_unixtime((f.unix_timestamp('timestamp') / 900).cast('int') * 900).cast('timestamp'))
raw_time_series_df.show(truncate=False)

+---------------+-------------------+--------------------+------------+--------------------------+
|contract_id    |timestamp          |value               |value_source|annotations               |
+---------------+-------------------+--------------------+------------+--------------------------+
|04_02_111_CHR12|2023-01-01 06:00:00|0.02591860654732236 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-01 17:00:00|0.07385444264936832 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-01 17:30:00|0.08180149515221906 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-01 21:30:00|0.08670661371854547 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-02 00:30:00|0.03597601881331959 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-02 05:30:00|0.03638379308965683 |measurement |{"region":"Europe/Berlin"}|
|04_02_111_CHR12|2023-01-03 10:45:00|0.931575            |measurement |{"region":"Europe/Berlin"}|
|04_02_111

## Extragerea Informațiilor de localizare și filtrarea datelor invalide

Setul de date al consumului prezintă unele probleme, precum spații suplimentare și informații lipsă sau semi-structurate.

Acestea trebuie rectificate înainte de a trece mai departe:

1. Extragerea unei noi coloane „region” din „annotations” (Se recomandă folosirea funcțiilor de Spark de procesare JSON)
2. Clienții cu regiuni invalide se vor scoate din setul de date și se vor salva pe disk într-o locație separată.
3. Extragerea datei din coloana „timestamp” într-o nouă coloană „utc_date”
4. Calcularea datei locale pentru data și ora din „timestamp”, pe baza regiunii, într-o nouă coloană „local_timestamp”
5. Extragerea datei din coloana „ local_timestamp” într-o nouă coloană „local_date”



In [76]:
# 1.
raw_time_series_df = raw_time_series_df.withColumn('region', f.get_json_object(f.col('annotations'), '$.region'))
print(raw_time_series_df.count())
raw_time_series_df.show(5, truncate=False)

3549244
+---------------+-------------------+-------------------+------------+--------------------------+-------------+
|contract_id    |timestamp          |value              |value_source|annotations               |region       |
+---------------+-------------------+-------------------+------------+--------------------------+-------------+
|04_02_111_CHR12|2023-01-01 06:00:00|0.02591860654732236|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|
|04_02_111_CHR12|2023-01-01 17:00:00|0.07385444264936832|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|
|04_02_111_CHR12|2023-01-01 17:30:00|0.08180149515221906|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|
|04_02_111_CHR12|2023-01-01 21:30:00|0.08670661371854547|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|
|04_02_111_CHR12|2023-01-02 00:30:00|0.03597601881331959|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|
+---------------+-------------------+-------------------+------------+--------------------------

In [77]:
# 2.
# Am verificat câte regiuni se găsesc în setul nostru de date -> REZULTAT: Europe/Berlin și WakaWaka
view_regions = raw_time_series_df.select('region').distinct().collect()
print(view_regions)

test_raw_time_series_df = raw_time_series_df.groupBy('region').avg()
test_raw_time_series_df.show(truncate=False)

# Așadar:
out_path = '/content/drive/MyDrive/E.on/E.on_Data/Data/Project/outputs'
invalid_regions_df = raw_time_series_df.where(f.col('region')!='Europe/Berlin')
invalid_regions_df.write.mode('overwrite').format('parquet').save(out_path)

# print(f"no. of rows of raw_time_series_df: {raw_time_series_df.count()}")
raw_time_series_df.show(5, truncate=False)

# print(f"no. of rows of invalid_regions_df: {invalid_regions_df.count()}")
invalid_regions_df.show(5, truncate=False)

[Row(region='Europe/Berlin'), Row(region='WakaWaka')]
+-------------+-------------------+
|region       |avg(value)         |
+-------------+-------------------+
|Europe/Berlin|0.490498974431521  |
|WakaWaka     |0.13178303122889254|
+-------------+-------------------+

+---------------+-------------------+-------------------+------------+--------------------------+-------------+
|contract_id    |timestamp          |value              |value_source|annotations               |region       |
+---------------+-------------------+-------------------+------------+--------------------------+-------------+
|04_02_111_CHR12|2023-01-01 06:00:00|0.02591860654732236|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|
|04_02_111_CHR12|2023-01-01 17:00:00|0.07385444264936832|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|
|04_02_111_CHR12|2023-01-01 17:30:00|0.08180149515221906|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|
|04_02_111_CHR12|2023-01-01 21:30:00|0.08670661371854547|

In [78]:
# 3.
raw_time_series_df = raw_time_series_df.withColumn('utc_date', f.to_date(f.col('timestamp')))
raw_time_series_df.show(5, truncate=False)

+---------------+-------------------+-------------------+------------+--------------------------+-------------+----------+
|contract_id    |timestamp          |value              |value_source|annotations               |region       |utc_date  |
+---------------+-------------------+-------------------+------------+--------------------------+-------------+----------+
|04_02_111_CHR12|2023-01-01 06:00:00|0.02591860654732236|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|
|04_02_111_CHR12|2023-01-01 17:00:00|0.07385444264936832|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|
|04_02_111_CHR12|2023-01-01 17:30:00|0.08180149515221906|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|
|04_02_111_CHR12|2023-01-01 21:30:00|0.08670661371854547|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|
|04_02_111_CHR12|2023-01-02 00:30:00|0.03597601881331959|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-02|
+---------------

In [79]:
# 4.
raw_time_series_df = raw_time_series_df.withColumn('local_timestamp', f.from_utc_timestamp(f.col('timestamp'), f.col('region')))
raw_time_series_df.show(5, truncate=False)

+---------------+-------------------+-------------------+------------+--------------------------+-------------+----------+-------------------+
|contract_id    |timestamp          |value              |value_source|annotations               |region       |utc_date  |local_timestamp    |
+---------------+-------------------+-------------------+------------+--------------------------+-------------+----------+-------------------+
|04_02_111_CHR12|2023-01-01 06:00:00|0.02591860654732236|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|2023-01-01 07:00:00|
|04_02_111_CHR12|2023-01-01 17:00:00|0.07385444264936832|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|2023-01-01 18:00:00|
|04_02_111_CHR12|2023-01-01 17:30:00|0.08180149515221906|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|2023-01-01 18:30:00|
|04_02_111_CHR12|2023-01-01 21:30:00|0.08670661371854547|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|2023-01-01 22:30:00|

In [80]:
# 5.
raw_time_series_df = raw_time_series_df.withColumn('local_date', f.to_date('local_timestamp'))
raw_time_series_df.show(5, truncate=False)

+---------------+-------------------+-------------------+------------+--------------------------+-------------+----------+-------------------+----------+
|contract_id    |timestamp          |value              |value_source|annotations               |region       |utc_date  |local_timestamp    |local_date|
+---------------+-------------------+-------------------+------------+--------------------------+-------------+----------+-------------------+----------+
|04_02_111_CHR12|2023-01-01 06:00:00|0.02591860654732236|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|2023-01-01 07:00:00|2023-01-01|
|04_02_111_CHR12|2023-01-01 17:00:00|0.07385444264936832|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|2023-01-01 18:00:00|2023-01-01|
|04_02_111_CHR12|2023-01-01 17:30:00|0.08180149515221906|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|2023-01-01 18:30:00|2023-01-01|
|04_02_111_CHR12|2023-01-01 21:30:00|0.08670661371854547|measurement |{"regi

## Extragerea Informațiilor de consum

Setul de date al consumului prezintă unele probleme, precum spații suplimentare și informații lipsă sau semi-structurate.



Acestea trebuie rectificate înainte de a trece mai departe:

1. Extragerea din coloana „annotations” a consumului de vehicul electric (EV), baterie (BATTERY_IN) și consumul trimis spre rețeaua electrică (GRID_SELL) în coloanele „sent_to_ev”, „sent_to_battery” și „sent_to_grid”. În cazul în care valoare lipsește, se consideră consumul 0.
2. Extragerea din coloana „annotations” a energiei primite de la panourile solare (PV) și baterie (BATTERY_OUT) în coloanele „received_from_pv” și „received_from_battery”. În cazul în care valoare lipsește, se consideră energia primită 0.

In [81]:
print("Seturile de date disponibile:")
print("raw_time_series_df")
raw_time_series_df.show(5, truncate=False)
print("customer_tariff_df")
customer_tariff_df.show(5, truncate=False)

Seturile de date disponibile:
raw_time_series_df
+---------------+-------------------+-------------------+------------+--------------------------+-------------+----------+-------------------+----------+
|contract_id    |timestamp          |value              |value_source|annotations               |region       |utc_date  |local_timestamp    |local_date|
+---------------+-------------------+-------------------+------------+--------------------------+-------------+----------+-------------------+----------+
|04_02_111_CHR12|2023-01-01 06:00:00|0.02591860654732236|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|2023-01-01 07:00:00|2023-01-01|
|04_02_111_CHR12|2023-01-01 17:00:00|0.07385444264936832|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|2023-01-01 18:00:00|2023-01-01|
|04_02_111_CHR12|2023-01-01 17:30:00|0.08180149515221906|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|2023-01-01 18:30:00|2023-01-01|
|04_02_111_CHR12|2023-01-01

In [82]:
# 1.

# Verificăm tipul de date a coloanei "annotations":
print("Tipurile valorilot coloanei annotations:")
raw_time_series_df.select('annotations').printSchema()
# Verificăm valorile distincte pe care le primește coloana "annotations":
print("Valorile distincte ale coloanei annotations:")
raw_time_series_df.select(('annotations')).distinct().show(5, truncate=False)
print('===' * 29)

raw_time_series_df = raw_time_series_df.withColumn('sent_to_ev', f.when(f.get_json_object(f.col('annotations'), '$.events.EV').isNotNull(), f.get_json_object(f.col('annotations'), '$.events.EV')).otherwise(0))
raw_time_series_df = raw_time_series_df.withColumn('sent_to_battery', f.when(f.get_json_object(f.col('annotations'), '$.events.BATTERY_IN').isNotNull(), f.get_json_object(f.col('annotations'), '$.events.BATTERY_IN')).otherwise(0))
raw_time_series_df = raw_time_series_df.withColumn('sent_to_grid', f.when(f.get_json_object(f.col('annotations'), '$.events.GRID_SELL').isNotNull(), f.get_json_object(f.col('annotations'), '$.events.GRID_SELL')).otherwise(0))
raw_time_series_df.show(5,truncate=False)

# Pentru validarea faptului că valorile consumurilor nu sunt numai nule
raw_time_series_df.select('sent_to_ev').distinct().show(5,truncate=False)
raw_time_series_df.select('sent_to_battery').distinct().show(5,truncate=False)
raw_time_series_df.select('sent_to_grid').distinct().show(5,truncate=False)


Tipurile valorilot coloanei annotations:
root
 |-- annotations: string (nullable = true)

Valorile distincte ale coloanei annotations:
+-------------------------------------------------------------------------------------+
|annotations                                                                          |
+-------------------------------------------------------------------------------------+
|{"region":"Europe/Berlin","events":{"GRID_SELL":"1.753088873001001","PV":"1.839175"}}|
|{"region":"Europe/Berlin","events":{"EV":"1.035","PV":"1.674475"}}                   |
|{"region":"Europe/Berlin","events":{"PV":"1.076"}}                                   |
|{"region":"Europe/Berlin","events":{"PV":"1.077475"}}                                |
|{"region":"Europe/Berlin","events":{"BATTERY_IN":"1.633989566467353","PV":"1.69435"}}|
+-------------------------------------------------------------------------------------+
only showing top 5 rows

+---------------+-------------------+-----------

In [83]:
# 2.
raw_time_series_df = raw_time_series_df.withColumn('received_from_pv', f.when(f.get_json_object(f.col('annotations'), '$.events.PV').isNotNull(), f.get_json_object(f.col('annotations'), '$.events.PV')).otherwise(0))
raw_time_series_df = raw_time_series_df.withColumn('received_from_battery', f.when(f.get_json_object(f.col('annotations'), '$.events.BATTERY_IN').isNotNull(), f.get_json_object(f.col('annotations'), '$.events.PV')).otherwise(0))

raw_time_series_df.show(5,truncate=False)
raw_time_series_df.select('received_from_pv').distinct().show(5,truncate=False)
raw_time_series_df.select('received_from_battery').distinct().show(5,truncate=False)

+---------------+-------------------+-------------------+------------+--------------------------+-------------+----------+-------------------+----------+----------+---------------+------------+----------------+---------------------+
|contract_id    |timestamp          |value              |value_source|annotations               |region       |utc_date  |local_timestamp    |local_date|sent_to_ev|sent_to_battery|sent_to_grid|received_from_pv|received_from_battery|
+---------------+-------------------+-------------------+------------+--------------------------+-------------+----------+-------------------+----------+----------+---------------+------------+----------------+---------------------+
|04_02_111_CHR12|2023-01-01 06:00:00|0.02591860654732236|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|2023-01-01 07:00:00|2023-01-01|0         |0              |0           |0               |0                    |
|04_02_111_CHR12|2023-01-01 17:00:00|0.07385444264936832|measurement

## Filtrarea consumului neobișnuit

Anumite valori ale consumului sunt neobișnuit de mari și este necesară scoaterea lor.



1.  Setarea coloanei „value_source” în „plausability_check_failed” pentru valorile cu consum neobișnuit din setul de date. (Decizia valorilor mari fie se face cu o analiză vizuală a datelor (sortarea și identificarea lor vizual fie prin agregări simple fie mai complicate) sau bonus, pentru cine dorește, prin tehnici de învățare automată.)
2. Salvarea separată a datelor cu „value_source” având valoare „plausability_check_failed” într-o locație separată. Atenție, datele nu se scot din setul de date.
3. Setarea coloanei „value” în NULL pentru datele cu „value_source” având valoare „plausability_check_failed”.



In [84]:
# 1. + BONUS
raw_time_series_df.show(5, truncate=False)

# IMPLEMENTAREA I
print("IMPLEMENTAREA I")

# Prea multă putere de calcul necesară pentru realizarea plot-urilor
# pandas_df = view_details_rts_df.toPandas()
# pandas_df.value.value_counts().plot(kind='bar')

view_details_rts_df = raw_time_series_df.select('value').distinct().orderBy(f.col('value').desc()).limit(2000)
print("view_details_rts_df: ")
view_details_rts_df.show(truncate=False)
out_path = '/content/drive/MyDrive/E.on/E.on_Data/Data/Project/outputs/view_details_rts_df'
view_details_rts_df.coalesce(1).write.mode('overwrite').csv(out_path)

# Din CSV-ul salvat mai sus deducem aspectul constant a creșterii valorilor, dar în intervalul 39-42 valorile încep să crească mai alarmant.
raw_time_series_df = raw_time_series_df.withColumn('value_source', f.when(f.col('value')>39, 'plausability_check_failed').otherwise(f.col('value_source')))

# IMPLEMENTAREA II (BONUS)
print('=====' * 10)
print("IMPLEMENTAREA II")

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Corectare coloană 'value' prin ștergerea valorilor de 0
raw_time_series_df = raw_time_series_df.where(f.col('value')!=0)

# Vectorizare pe coloana 'value'
vector_assembler = VectorAssembler(inputCols=['value'], outputCol='value_features')
data_vectorized_df = vector_assembler.transform(raw_time_series_df)

# KMeans
kmeans = KMeans(featuresCol='value_features', predictionCol='cluster', k=2)
model = kmeans.fit(data_vectorized_df)

# Calculează distanța față de centrul clusterului
from pyspark.sql.types import DoubleType

center = model.clusterCenters()[0]

def distance(v):
    return float((v - center).norm(2))

distance_udf = f.udf(distance, DoubleType())
data_with_distance = data_vectorized_df.withColumn("distance", distance_udf("value_features"))

# Setează „value” pentru valori anormale
threshold = 3.0
raw_time_series_df = data_with_distance.withColumn(
    "value_source",
    f.when(f.col("distance") > threshold, "plausability_check_failed")
)

view_value_sources = raw_time_series_df.select('value_source').distinct()
view_value_sources.show(truncate=False)


+---------------+-------------------+-------------------+------------+--------------------------+-------------+----------+-------------------+----------+----------+---------------+------------+----------------+---------------------+
|contract_id    |timestamp          |value              |value_source|annotations               |region       |utc_date  |local_timestamp    |local_date|sent_to_ev|sent_to_battery|sent_to_grid|received_from_pv|received_from_battery|
+---------------+-------------------+-------------------+------------+--------------------------+-------------+----------+-------------------+----------+----------+---------------+------------+----------------+---------------------+
|04_02_111_CHR12|2023-01-01 06:00:00|0.02591860654732236|measurement |{"region":"Europe/Berlin"}|Europe/Berlin|2023-01-01|2023-01-01 07:00:00|2023-01-01|0         |0              |0           |0               |0                    |
|04_02_111_CHR12|2023-01-01 17:00:00|0.07385444264936832|measurement

In [86]:
# 2.
out_path = '/content/drive/MyDrive/E.on/E.on_Data/Data/Project/outputs/plausability_check_failed'
raw_time_series_df.where(f.col('value_source')=='plausability_check_failed').write.mode('overwrite').format('parquet').save(out_path)

Py4JJavaError: An error occurred while calling o2110.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 656.0 failed 1 times, most recent failure: Lost task 1.0 in stage 656.0 (TID 956) (4f39a33b8e80 executor driver): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/content/drive/MyDrive/E.on/E.on_Data/Data/Project/outputs/plausability_check_failed.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:775)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.time.zone.ZoneRulesException: Unknown time-zone ID: WakaWaka
	at java.base/java.time.zone.ZoneRulesProvider.getProvider(ZoneRulesProvider.java:279)
	at java.base/java.time.zone.ZoneRulesProvider.getRules(ZoneRulesProvider.java:234)
	at java.base/java.time.ZoneRegion.ofId(ZoneRegion.java:120)
	at java.base/java.time.ZoneId.of(ZoneId.java:408)
	at java.base/java.time.ZoneId.of(ZoneId.java:356)
	at java.base/java.time.ZoneId.of(ZoneId.java:312)
	at org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.getZoneId(SparkDateTimeUtils.scala:46)
	at org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.getZoneId$(SparkDateTimeUtils.scala:39)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.getZoneId(DateTimeUtils.scala:40)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromUTCTime(DateTimeUtils.scala:496)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils.fromUTCTime(DateTimeUtils.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/content/drive/MyDrive/E.on/E.on_Data/Data/Project/outputs/plausability_check_failed.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:775)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.time.zone.ZoneRulesException: Unknown time-zone ID: WakaWaka
	at java.base/java.time.zone.ZoneRulesProvider.getProvider(ZoneRulesProvider.java:279)
	at java.base/java.time.zone.ZoneRulesProvider.getRules(ZoneRulesProvider.java:234)
	at java.base/java.time.ZoneRegion.ofId(ZoneRegion.java:120)
	at java.base/java.time.ZoneId.of(ZoneId.java:408)
	at java.base/java.time.ZoneId.of(ZoneId.java:356)
	at java.base/java.time.ZoneId.of(ZoneId.java:312)
	at org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.getZoneId(SparkDateTimeUtils.scala:46)
	at org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.getZoneId$(SparkDateTimeUtils.scala:39)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.getZoneId(DateTimeUtils.scala:40)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromUTCTime(DateTimeUtils.scala:496)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils.fromUTCTime(DateTimeUtils.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)


In [None]:
# 3.
raw_time_series_df = raw_time_series_df.withColumn('value', f.when(f.col('value_source') == 'plausability_check_failed', f.lit('Null')).otherwise(f.col('value')))
view_value = raw_time_series_df.select('value').distinct()
view_value.show(truncate=False)

## Completarea valorilor lipsă

Anumiți clienți au lipsuri prezintă câteva lipsuri în consum, unele dintre ele adăugate de noi la pasul precedent:




1. Prezicerea valorii „value” atunci când ea este NULL folosind următoarea metodă:

  * Calculăm mai întâi media din ultimele 8 săptămâni a valorilor din aceeași zi a săptămânii la aceeași oră, minut și secundă. Se folosesc din ultimele 8 săptămâni doar valorile care nu lipsesc, au coloana „value_source” setată pe valoarea „measurement”.
  * Facem suma coloanelor „sent_to_ev”, „sent_to_battery” și „sent_to_grid”.
  * Facem suma coloanelor „received_from_pv” și „received_from_battery”.
  * Completăm coloane „value” cu maximum dintre aceste 3 valori.
  * **Bonus**, pentru cine dorește, puteți folosi și algoritmi de învățare automată, precum Linear Regression, în loc de calcularea mediei din ultimele 8 săptămâni și să comparați cele 2 metode.


2. Calcularea valorii „received_from_grid” atunci când avem toate informațiile.
  * Facem suma coloanelor „sent_to_ev”, „sent_to_battery” și „sent_to_grid” și scădem valorile din coloanele „received_from_pv” și „received_from_battery”.



## Asocierea cu tarifere

Curățând datele și completând valorile lipsă, putem acuma să trecem la asocierea cu tarifele consumatorilor:



1. Asocierea intrărilor de consum cu cele de tarifare, pe bază numărului de contract, al timpului și al tipului de preț, cumpărare sau vânzare.
  * Prețul tarifat la cumpărare este prețul de cumpărare pentru intrare de consum înmulțit cu coloana „received_from_grid”
  * Prețul tarifat la vânzare este prețul de vânzare pentru intrare de consum înmulțit cu coloana „send_to_grid”.

## Calcularea consumului și a facturii

După asociere, putem trece la calculul factorii:


1. Per zi / săptămâna / lună / an, pentru fiecare client, să se calculeze:
  * Consumul de energie total (suma coloanei value) – „ kWh_total”
  * Energia extrasă din baterie și cea din PV – „kWh_from_battery”, „kWh_from_PV”
  * Energia folosită pentru EV – „kWh_for_EV”
  * Consumul de energie folosit de la rețea – „kWh_from_grid”
  * Costul consumului de energie folosit de la rețea – „price_billed”
  * Consumul de energie trimis către rețea – „kWh_to_grid”
  * Costul primit înapoi, a consumului de energie trimis către rețea – „price_cashback”
  * Costul total (diferența de cost) – „price_final”
  * Puneți codul de calculare al agregatelor pe orice perioadă de timp într-o funcție, iar pentru fiecare interval, zi / săptămâna / lună / an, rulați această funcție. Afișați pentru fiecare perioada top 10 clienți cu cel mai mare preț.