# ***Proiect Cirstean Paul***

### ***Task 0*** Setup

#### Import all tools required for the app to work

In [None]:
from pyspark.sql import SparkSession, Window, functions as f 
from pyspark.sql.types import StructType,StructField, StringType, List
spark = SparkSession.builder.master("local[*]").config('spark.driver.memory', '3g').getOrCreate();

#### Read data from the files

In [2]:
raw_df = spark.read.format('json').load('./data/raw_time_series/json/');
raw_df.printSchema();

[Stage 0:>                                                          (0 + 8) / 9]

root
 |-- annotations: string (nullable = true)
 |-- contract_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- value: double (nullable = true)
 |-- value_source: string (nullable = true)



                                                                                

#### Entries count

In [3]:
print(raw_df.count());



3549246


                                                                                

## **Task 1** Clean contract_id

### Analyse data set
This column as multiple spaces which should not be present.

In [4]:
raw_df.select("contract_id").filter(f.col('contract_id').contains(" ")).show();

+-------------------+
|        contract_id|
+-------------------+
|  04_02 _111_CHR12 |
|  04_02 _111_CHR12 |
|  04 _02 _111_CHR12|
|04 _02_ 111 _CHR12 |
|  04_02 _111_CHR12 |
| 04 _02_ 111 _CHR12|
|  04 _02 _111_CHR12|
| 04_02 _111 _CHR12 |
|04 _02_ 111 _CHR12 |
|04_ 02 _ 111 _CHR12|
| 04 _02_111_ CHR12 |
| 04 _02_111_ CHR12 |
|  04 _02 _111_CHR12|
|04_ 02 _ 111 _CHR12|
|04 _02_ 111 _CHR12 |
| 04_02 _111 _CHR12 |
|  04_02 _111_CHR12 |
|  04_02 _111_CHR12 |
|04_ 02 _ 111 _CHR12|
|  04 _02 _111_CHR12|
+-------------------+
only showing top 20 rows



### Solution

We just replace the column with a regex which eliminates all spaces

In [5]:
data_contract_id_cleaned_df = raw_df.withColumn("contract_id", f.regexp_replace(f.col("contract_id"), "\\s+", ""));

### Result
Let's filter all contract_ids which contains spaces.

In [6]:
print(data_contract_id_cleaned_df.filter(f.col('contract_id').contains(" ")).count());

[Stage 5:>                                                          (0 + 8) / 9]

0


                                                                                

## **Task 2** Clean value_source 
If the **value_source** is missing or invalid it should have the value **missing**

### Analyse data set
If the value_source is null we should put missing in that column. \
We first filter these rows to see which ones should we change. \
No such entries were found. So 2 entries were added. One has value_source null and the other is empty.

In [7]:
(data_contract_id_cleaned_df.select("contract_id", "value_source")
    .filter((f.col("value_source") != "measurement") | (f.col("value_source").isNull())).show());

                                                                                

+---------------+------------+
|    contract_id|value_source|
+---------------+------------+
|04_02_111_CHR12|            |
|04_02_111_CHR12|        NULL|
+---------------+------------+



                                                                                

### Solution
We realize from these rows that when ```value_source != mesurement``` we should put the value missing

In [8]:
data_value_source_cleaned_df = data_contract_id_cleaned_df.withColumn("value_source", 
        f.when(f.col("value_source") == "measurement", "measurement").otherwise("missing"));

### Result
We should have 2 entries with value **missing**

In [9]:
data_value_source_cleaned_df.select("contract_id", "value_source").filter(f.col("value_source") != "measurement").show()



+---------------+------------+
|    contract_id|value_source|
+---------------+------------+
|04_02_111_CHR12|     missing|
|04_02_111_CHR12|     missing|
+---------------+------------+



                                                                                

## **Task 3** Clean timestamp inconsistency
Be sure that the timestamps are done by 15 minutes. (e.g. 00:30, 00:45, 01:00)

### Analyse data set
We see that there are many timestamps which does not respect the format

In [10]:
raw_df.select(f.col("timestamp")).filter(f.minute(f.col("timestamp")) % 15 != 0).show(n = 10, truncate=False);

+------------------------+
|timestamp               |
+------------------------+
|2023-01-04T00:28:14.000Z|
|2023-01-04T16:58:28.000Z|
|2023-01-04T20:43:21.000Z|
|2023-01-06T22:13:19.000Z|
|2023-01-06T23:28:02.000Z|
|2023-01-08T13:13:09.000Z|
|2023-01-08T14:13:25.000Z|
|2023-01-12T20:58:25.000Z|
|2023-01-12T23:28:19.000Z|
|2023-01-13T16:58:10.000Z|
+------------------------+
only showing top 10 rows



24/04/08 13:51:50 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


### Solution
We will convert the timezone to UTC because we will use unix_timestamp to convert the timestamp.

In [11]:
spark.conf.set("spark.sql.session.timeZone", "UTC");

First we will convert the the column to Unix. \
```f.date_trunc("minute")``` is used to remove the seconds from the timestamps.

In [12]:
convertToUnix = f.unix_timestamp(f.date_trunc("minute", f.col("timestamp")));

Next we will remove the minutes from the unit time.

In [13]:
unixWithoutMinutes = convertToUnix - f.minute(f.col('timestamp')) * 60;

Lastly we need to round the minutes to the closest 15 minutes interval.

In [14]:
roundedMinutes = f.round((f.minute(f.col('timestamp')) /15)) * 60 * 15;

Everything wrapped togheter.

In [15]:
data_timestamp_cleaned_df = (data_value_source_cleaned_df
    .withColumn("timestamp", (unixWithoutMinutes + roundedMinutes).cast('timestamp')));

### Result
Now all timestamp are in interval of 15 minutes exactly.

In [16]:
data_timestamp_cleaned_df.select("timestamp").show(n = 10, truncate=False);

+-------------------+
|timestamp          |
+-------------------+
|2023-01-01 05:45:00|
|2023-01-01 07:45:00|
|2023-01-01 08:30:00|
|2023-01-01 08:45:00|
|2023-01-01 13:00:00|
|2023-01-01 14:30:00|
|2023-01-01 15:00:00|
|2023-01-01 15:15:00|
|2023-01-01 15:30:00|
|2023-01-01 15:45:00|
+-------------------+
only showing top 10 rows



## **Task 4** Extract the region field 

The ***annotations*** column has a json which contains the field **region**

### Analyse data set
Let's see some entries from the data set.

First we need to declare the schema of our json column. 
We only need region so we can add only this field discarding others.

In [17]:
data_timestamp_cleaned_df.select("annotations").show(truncate=False)

+------------------------------------------------------------------------------------+
|annotations                                                                         |
+------------------------------------------------------------------------------------+
|{"region":"Europe/Berlin"}                                                          |
|{"region":"Europe/Berlin"}                                                          |
|{"region":"Europe/Berlin"}                                                          |
|{"region":"Europe/Berlin"}                                                          |
|{"region":"Europe/Berlin","events":{"GRID_SELL":"1.45940454750907","PV":"1.473775"}}|
|{"region":"Europe/Berlin","events":{"PV":"1.266225"}}                               |
|{"region":"Europe/Berlin"}                                                          |
|{"region":"Europe/Berlin"}                                                          |
|{"region":"Europe/Berlin"}                

### Solution
First we will define the schema we need. We only need the region field, everything else can be discarded.

In [18]:
schema = StructType([ 
    StructField("region",StringType(),True), 
  ]);

Now we will use json_tuple to get the **region** field to a column named **region**

In [19]:
data_with_region__df = data_timestamp_cleaned_df.withColumn('region', f.json_tuple(f.col("annotations"), "region"));

### Result
Now we will have a column named region

In [20]:
data_with_region__df.select("contract_id", 'region').show(n=10)

+---------------+-------------+
|    contract_id|       region|
+---------------+-------------+
|04_02_111_CHR12|Europe/Berlin|
|04_02_111_CHR12|Europe/Berlin|
|04_02_111_CHR12|Europe/Berlin|
|04_02_111_CHR12|Europe/Berlin|
|04_02_111_CHR12|Europe/Berlin|
|04_02_111_CHR12|Europe/Berlin|
|04_02_111_CHR12|Europe/Berlin|
|04_02_111_CHR12|Europe/Berlin|
|04_02_111_CHR12|Europe/Berlin|
|04_02_111_CHR12|Europe/Berlin|
+---------------+-------------+
only showing top 10 rows



## **Task 5** Filter invalid regions.
There are invalid regions. Write all the entries with invalid regions in a file called InvalidClients.
Let's analyse the data. So let's see all our regions

### Analyse data set
Let's see all different regions found in the data set.

In [23]:
data_with_region__df.select(f.col('region')).distinct().show();



+-------------+
|       region|
+-------------+
|Europe/Berlin|
|     WakaWaka|
+-------------+



                                                                                

### Solution
We can see there are only 2 regions Europe/Berlin which seems much more real than WakaWaka. \
So for this specific dataset we just remove WakaWaka entries.

First let's create a dataset with all the WakaWaka entries and write them to InvalidClients folder.

In [None]:
(data_with_region__df.filter(f.col('region') == "WakaWaka")
    .write.format('csv').option('header', 'true').save('./data/InvalidClients'));

Now filter the data set.

In [25]:
data_region_filtered_df = data_with_region__df.filter(f.col('region') != "WakaWaka");

### Result
Let's check what happened.

First let's see how many different regions are left in the dataset. We can see we have only one valid region.

In [26]:
data_region_filtered_df.select("region").distinct().show();



+-------------+
|       region|
+-------------+
|Europe/Berlin|
+-------------+



                                                                                

Now the entries count was [Link to entries query](#Entries-count) **3549246** \
Now our dataset has: **3444114**

In [27]:
print(data_region_filtered_df.count());



3444114


                                                                                

## **Task 6**
Calculate utc date, local timestamp, local date

### **Solution**

Create **utc_date** column

In [169]:
data_utc_df = data_region_filtered_df.withColumn("utc_date", f.to_date(f.col('timestamp')));


Create **local timestamp** column

In [170]:
data_local_timestamp_df = data_utc_df.withColumn("local_timestamp", f.from_utc_timestamp(f.col("timestamp"), "Europe/Bucharest"));


Create **local date** column

In [171]:
data_local_date_df = data_local_timestamp_df.withColumn("local_date", f.to_date(f.col('local_timestamp')));

## **Task 7**
Extract all data from **annotations.event** json \
It containts details about the energy consumption and production

### Analyse data set
Let's see how the json is created.

In [182]:
data_local_date_df.select("annotations").sort(f.desc(f.length(f.col("annotations")))).show(n = 6,truncate=False)




+--------------------------------------------------------------------------------------------------------------+
|annotations                                                                                                   |
+--------------------------------------------------------------------------------------------------------------+
|{"region":"Europe/Berlin","events":{"EV":"1.150616841147254","BATTERY_IN":"1.047355403597352","PV":"2.26135"}}|
|{"region":"Europe/Berlin","events":{"EV":"2.491949999999996","BATTERY_OUT":"1.296228453956364"}}              |
|{"region":"Europe/Berlin","events":{"EV":"1.653001332208902","BATTERY_OUT":"1.619142268057457"}}              |
|{"region":"Europe/Berlin","events":{"EV":"2.292000000000129","BATTERY_OUT":"1.259414405510866"}}              |
|{"region":"Europe/Berlin","events":{"EV":"2.670999999999978","BATTERY_OUT":"3.294711253031282"}}              |
|{"region":"Europe/Berlin","events":{"EV":"1.172000000000001","BATTERY_OUT":"1.351665631988632"}

                                                                                

As we can see the json has only 2 properties region and events. We care only about the events. So we can define the schema of the \
**annotations** column as such:


In [183]:
annotations_schema = StructType(
    [
        StructField('region', StringType(), True),
        StructField('events', StringType(), True)
    ]
)

In [188]:
json_explode = data_local_date_df.withColumn('data', f.from_json("annotations", annotations_schema))
json_explode.select("data.events").filter(f.col("data.events").isNotNull()).show(truncate=False)

+--------------------------------------------------+
|events                                            |
+--------------------------------------------------+
|{"GRID_SELL":"1.45940454750907","PV":"1.473775"}  |
|{"PV":"1.266225"}                                 |
|{"BATTERY_IN":"1.420021373239755","PV":"1.441375"}|
|{"GRID_SELL":"1.273193767759511","PV":"1.2912"}   |
|{"EV":"2.75"}                                     |
|{"EV":"2.75"}                                     |
|{"BATTERY_IN":"1.271637606580326","PV":"1.308825"}|
|{"GRID_SELL":"1.504529714658364","PV":"1.527825"} |
|{"GRID_SELL":"1.48874948053448","PV":"1.527825"}  |
|{"EV":"1.035"}                                    |
|{"GRID_SELL":"1.363357640669544","PV":"1.39895"}  |
|{"GRID_SELL":"1.061857672908919","PV":"1.077475"} |
|{"EV":"1.343129999999995"}                        |
|{"BATTERY_IN":"1.380478844510013","PV":"1.4016"}  |
|{"EV":"2.75","BATTERY_OUT":"2.778265004340575"}   |
|{"EV":"2.75"}                                

Inspecting the events column we can see that we have the exact schema from the task: \
*GRID_SELL*, *PV*, *BATTERY_IN*, *EV*, *BATTERY_OUT* \
So we can define a schema for the events.

In [189]:
events_schema = StructType ([
        StructField("GRID_SELL", StringType(), True),
        StructField("PV", StringType(), True),
        StructField("EV", StringType(), True),
        StructField("BATTERY_IN", StringType(), True),
        StructField("BATTERY_OUT", StringType(), True),
]);

### Solution
Now we just need to create the columns using the schemas we've analysed before.

First we need to create 2 columns. Let's call them data and events \
Data column will contain the annotations column but with the defined schema \
Events column will contain all the events properties we defined in events_schema

In [194]:
json_explode = (data_local_date.withColumn('data', f.from_json("annotations", schema))
    .withColumn("events", f.from_json("data.events", eventsSchema)));

Now we will just extract each json property to its own column. \
**Note** We will check if the property is null and replace the null value with 0. \
After we create the new columns we will drop the data and events column.

In [195]:
consumers_data_df = (
    json_explode.withColumn("sent_to_ev", f.when(f.col("events.EV").isNull(), 0).otherwise(f.col("events.EV")))
    .withColumn("sent_to_battery", f.when(f.col("events.BATTERY_IN").isNull(), 0).otherwise(f.col("events.BATTERY_IN")))
    .withColumn("received_from_pv", f.when(f.col("events.PV").isNull(), 0).otherwise(f.col("events.PV")))
    .withColumn("sent_to_grid", f.when(f.col("events.GRID_SELL").isNull(), 0).otherwise(f.col("events.GRID_SELL")))
    .withColumn("received_from_battery", f.when(f.col("events.BATTERY_OUT").isNull(), 0).otherwise(f.col("events.BATTERY_OUT")))
    .drop("data", "events")
)

### Result
If we select the new columns we will have them populated.

In [196]:
consumers_data_df.select("contract_id", "sent_to_ev", "sent_to_battery", 
                         "received_from_pv", "sent_to_grid", "received_from_battery").show()

+---------------+----------+---------------+----------------+----------------+---------------------+
|    contract_id|sent_to_ev|sent_to_battery|received_from_pv|    sent_to_grid|received_from_battery|
+---------------+----------+---------------+----------------+----------------+---------------------+
|04_02_111_CHR12|         0|              0|               0|               0|                    0|
|04_02_111_CHR12|         0|              0|               0|               0|                    0|
|04_02_111_CHR12|         0|              0|               0|               0|                    0|
|04_02_111_CHR12|         0|              0|               0|               0|                    0|
|04_02_111_CHR12|         0|              0|        1.473775|1.45940454750907|                    0|
|04_02_111_CHR12|         0|              0|        1.266225|               0|                    0|
|04_02_111_CHR12|         0|              0|               0|               0|             

## ***Task 8***
Validate **value** column and set "plausability_check_failed" in the column **value_source** if the column is in a bad state.

### Analyse data set
First we need to make some assumptions to validate the value column

After some aggregations we have these results \
The mean of column **value** is: 0.490498692649539 \
Entries which have the **value** greater than 1: 568729 \
Entries which have the **value** less than 1: 2778745 \
Entries which have the **value** greater than 5: 12944 \
Max value of **sent_to_ev**: 2.75 \
Max value of **sent_to_battery**: 2.238648024029528 \
Max value of **received_from_pv**: 2.262325 \
Max value of **sent_to_grid**: 2.259371834882435 \
Max value of **received_from_battery**: 3.570313934135474 \

The value column is the aggregation of all of these fields. \
That being said if we received 2.26 from pv and charge ev for 2.75 the value should be around 2.75 

:et's say we have maximum consumation: ev + battery + grid = 2.75 + 2.23 + 2.25 ~= 7
In no case i should have more than 7

In [207]:
print(consumers_data_df.filter(f.col("value") > 7).count())

[Stage 472:>                                                        (0 + 8) / 9]

10917


                                                                                

We have 10917 entries with value > 7 so they should be removed. \
Now we can search the fact that we have no user between 5 and 15 consumption value > 5 : 12944 entries (just 2000 from 3444114) \
Even more running this script will show that all users have 0 data about pv, ev and others. Such consumation with data is ridiculous.

In [208]:
consumers_data_df.select('contract_id', "sent_to_ev", "value", "sent_to_battery", "sent_to_grid", "received_from_pv","received_from_battery").filter((f.col("value") > 5)).sort(f.col("value")).show(n=1000)



+---------------+----------+-----------------+---------------+------------+----------------+---------------------+
|    contract_id|sent_to_ev|            value|sent_to_battery|sent_to_grid|received_from_pv|received_from_battery|
+---------------+----------+-----------------+---------------+------------+----------------+---------------------+
|04_02_111_CHR05|         0|           5.0012|              0|           0|               0|                    0|
|04_02_111_CHR48|         0|5.002406459379735|              0|           0|               0|                    0|
|04_02_111_CHR34|         0|           5.0028|              0|           0|               0|                    0|
|04_02_111_CHR40|         0|          5.00445|              0|           0|               0|                    0|
|04_02_111_CHR56|         0|5.005943964659869|              0|           0|               0|                    0|
|04_02_111_CHR04|         0|5.006135105072085|              0|           0|     

                                                                                

With all this in mind we can safely consider value > 5 as a "plausability_check_failed". 

Looking to interval value = 3-4 we find multiple caseses where no ev and other utilities can be found. Still there is a posibility these data are corect.
For interval value < 3 if there is at least on consumer the data might be right.
So we filter out all data with value > 5.

### Solution
Based on the analysis above we wil just filter the data which has *value* more than 5.

In [214]:
consumption_data = json_explode.withColumn("value_source", 
        f.when(f.col("value") > 5, f.lit("plausability_check_failed"))
        .otherwise(f.col("value_source")));

Now we need to write all of them in a file on disk.

In [None]:
consumption_data.filter(f.col("value_source") == "plausability_check_failed").write.format('csv').option('header', 'true').save('./data/InvalidMeasurments');

Next we will set all the columns **value** into null if the value_source is *plausability_check_failed*

In [216]:
consumption_data = consumption_data.withColumn("value", 
                f.when(f.col("value_source") == "plausability_check_failed", f.lit(None))
                .otherwise(f.col("value"))).cache();

### Result

We can see that we've found many entries which are wrong.

In [217]:
print(consumption_data.filter(f.col("value_source") == "plausability_check_failed").count());

[Stage 489:>                                                        (0 + 8) / 9]

12944


                                                                                

## ***Task 9***
If the **value** is **NULL** we should try to predict what value it should be.

### Solution
First we will try to get the mean from the last 8 weeks.

For this the first step is to remove the entries with **value_source** in status "plausability_check_failed"\
We will partition the data by 8 weeks. To do that we will just convert the timestamp to long\
and just removing the number of seconds of a day 86400 multiplied by the number of days in a week 7.\
We don't have problem with **null** values because the *mean* function handles them by default.

In [220]:
days = lambda i: i * 86400;

window_spec = Window.partitionBy(
    "contract_id", f.dayofweek("timestamp"), f.hour("timestamp"), f.minute("timestamp")
).orderBy(f.col("timestamp").cast("long")).rangeBetween(-days(8 * 7), 0);


df_avg = consumption_data.filter(f.col("value_source") != "plausability_check_failed").withColumn(
    "avg_value_last_8_weeks",
    f.mean(f.col("value")).over(window_spec)
)

Next we need to calculate the sum of the consumers: *sent_total* *sent_to_ev* and *sent_to_battery* \
And the sum of the producers: *received_from_battery* and *received_from_pv*

In [38]:
df_avg = (df_avg
          .withColumn("received_total", f.col("received_from_pv") + f.col("received_from_battery"))
          .withColumn("sent_total", f.col("sent_to_ev") + f.col("sent_to_battery") + f.col("sent_to_grid")));

Now the we will just give the column *value* the max of all of these 3 values. And we can drop all this values.

In [None]:
value_correct = df_avg.withColumn("value", f.when(f.col("value").isNull(), 
                f.greatest("received_total", "sent_total", "avg_value_last_8_weeks")).otherwise(f.col("value")));
value_correct = value_correct.drop("avg_value_last_8_weeks", "received_total", "sent_total");

### Result
We have no entris with **value** null

In [221]:
print(value_correct.filter(f.col("value").isNull()).count());



0


                                                                                

## ***Task 10***
Calculate *received_from_grid value*

### Solution
We just calculate the column via the formula. \
We will remove some unnecessary column from our dataset.

In [224]:
received_from_grid_df = value_correct.withColumn("received_from_grid", 
                f.col("sent_to_ev") + f.col("sent_to_battery") + f.col("sent_to_grid") 
                - f.col("received_from_pv") - f.col("received_from_battery"))
received_from_grid_df = received_from_grid_df.drop('annotations', 'timestamp', 'utc_date', 'region');

## ***Task 11***
Add tariff prices to the dataset.

### Solution
We will join the 2 tables by contract id and another condition that the local_timestamp is between the contract interval timestamps.

In [249]:
tariff_df = spark.read.format('json').load('./data/customer_tariff/json/');
cond = [(tariff_df.contract_id == value.contract_id) &  
        (tariff_df.target_local_start_timestamp.cast("timestamp") <= value.local_timestamp) & 
        (value.local_timestamp <= tariff_df.target_local_end_timestamp.cast("timestamp"))]

value_tarrif = value.join(tariff_df, cond).select(value["*"], 
        tariff_df["charge_type"],tariff_df["price"], tariff_df["target_local_end_timestamp"], 
        tariff_df["target_local_start_timestamp"], tariff_df["tariff_name"]).cache();

24/04/09 00:17:40 WARN CacheManager: Asked to cache already cached data.


## ***Task 12***
Calculate bill details

### Analyse

We want this function to be flexible and reliable. So we will make it easy to use by abstracting it in a function.\
The only thing we need to calculate in this function is the cost which for now it's not calulated in any priort steps.

To do so we need to match the **charge_type = buy** with the **received_from_grid** and\
to match the **charge_type = sell** with the **sent_from_grid**

### Solution

Let's create the function to calculate the price.

In [229]:
calculate_bought_price = f.when(f.col("charge_type") == "buy", f.col("price") * f.col("received_from_grid")).otherwise(0);
calculate_earn_price = f.when(f.col("charge_type") == "sell", f.col("price") * f.col("sent_to_grid")).otherwise(0);

Next we will create an enum for the type of the interval our caller can use.

In [231]:
from enum import Enum

class TimePeriod(Enum):
    Day = 1
    Week = 2
    Month = 3
    Year = 4

Lastly we will have our function which will do all the aggregation.\
First it will filter only the data we need from the data set so it will make the aggregation as fast as possible. \
Then it will aggregate all the date by summing them. \
The last step is to show the highest 10 bills from the aggregation.

In [256]:
from pyspark.sql import DataFrame


def process_time_period(period, date) -> DataFrame:
    if period == TimePeriod.Day: 
        final_bill = value_tarrif.filter((f.dayofyear(f.lit(date)) == f.dayofyear(f.col("local_date"))) & (f.year(f.lit(date)) == f.year(f.col("local_date"))));
    if period == TimePeriod.Week:
        final_bill = value_tarrif.filter((f.weekofyear(f.lit(date)) == f.weekofyear(f.col("local_date"))) & (f.year(f.lit(date)) == f.year(f.col("local_date"))));
    if period == TimePeriod.Month:
        final_bill = value_tarrif.filter((f.month(f.lit(date)) == f.month(f.col("local_date"))) & (f.year(f.lit(date)) == f.year(f.col("local_date"))));
    if period == TimePeriod.Year:
        final_bill = value_tarrif.filter(f.year(f.lit(date)) == f.year(f.col("local_date")));
        
    final_bill = final_bill.groupBy("contract_id").agg(
    f.sum(f.col("value")).alias("kWh_total"),
    f.sum(f.col("received_from_pv")).alias("kWh_from_PV"),
    f.sum(f.col("received_from_battery")).alias("kWh_from_battery"),
    f.sum(f.col("sent_to_ev")).alias("kWh_for_EV"),
    f.sum(f.col("received_from_grid")).alias("kWh_from_grid"),
    f.sum(calculate_bought_price).alias("price_billed"),
    f.sum(f.col("sent_to_grid")).alias("kWh_to_grid"),
    f.sum(calculate_earn_price).alias("price_cashback"),
    );
    final_bill = final_bill.withColumn("price_final", f.col("price_billed") - f.col("price_cashback")).sort(f.desc("price_final"));
    return final_bill;

### Result

Let's take an arbitrary date such as: 2023-04-10 and run all the functions.
We will use a utilitary function to trim the number of decimals for readability.

In [258]:
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

def dataframe_format_float(df: DataFrame, num_decimals=4) -> DataFrame:
    r = []
    for c in df.dtypes:
        name, dtype = c[0], c[1]
        if dtype in ['float', 'double']:
            r.append(F.round(name, num_decimals).alias(name))
        else:
            r.append(name)
    df = df.select(r)
    return df

In [259]:
dataframe_format_float(process_time_period(TimePeriod.Day, "2023-04-10")).show(n=10);



+---------------+---------+-----------+----------------+----------+-------------+------------+-----------+--------------+-----------+
|    contract_id|kWh_total|kWh_from_PV|kWh_from_battery|kWh_for_EV|kWh_from_grid|price_billed|kWh_to_grid|price_cashback|price_final|
+---------------+---------+-----------+----------------+----------+-------------+------------+-----------+--------------+-----------+
|10_01_147_CHR10| 154.5566|    82.1257|          2.9628|  105.4512|      20.3627|       3.987|        0.0|           0.0|      3.987|
|10_01_147_CHR04| 143.1789|    82.1257|          5.5243|   104.163|       16.513|      3.1342|        0.0|           0.0|     3.1342|
|10_01_147_CHR59| 154.8676|    82.1257|          2.6304|   100.103|       15.347|      3.1239|        0.0|           0.0|     3.1239|
|10_01_147_CHR61| 138.7411|    82.1257|          5.4699|   99.9229|      12.3273|      2.3607|        0.0|           0.0|     2.3607|
|10_01_147_CHR20|  147.416|    82.1257|          3.2481|   99.

                                                                                

In [260]:
dataframe_format_float(process_time_period(TimePeriod.Week, "2023-04-10")).show(n=10);



+---------------+---------+-----------+----------------+----------+-------------+------------+-----------+--------------+-----------+
|    contract_id|kWh_total|kWh_from_PV|kWh_from_battery|kWh_for_EV|kWh_from_grid|price_billed|kWh_to_grid|price_cashback|price_final|
+---------------+---------+-----------+----------------+----------+-------------+------------+-----------+--------------+-----------+
| 10_01_147_CHS1| 755.6515|   499.6478|         19.0332|   151.774|       -0.291|      -0.027|   359.0808|        7.0918|    -7.1188|
|10_01_147_CHR24| 730.7023|   499.6478|         21.1976|  148.6176|     -14.2685|     -1.5374|   352.8723|        5.8753|    -7.4128|
|10_01_147_CHR01| 773.2668|   499.6478|          8.6929|  148.4919|     -16.5812|     -1.6697|   333.4734|        6.9863|     -8.656|
|10_01_147_CHR11| 733.8127|   499.6478|         20.0897|  148.4919|      17.6149|      2.0275|   381.9489|       11.9932|    -9.9657|
|10_01_147_CHR39| 775.6658|   499.6478|         26.6244|  148.

                                                                                

In [261]:
dataframe_format_float(process_time_period(TimePeriod.Month, "2023-04-10")).show(n=10);



+---------------+---------+-----------+----------------+----------+-------------+------------+-----------+--------------+-----------+
|    contract_id|kWh_total|kWh_from_PV|kWh_from_battery|kWh_for_EV|kWh_from_grid|price_billed|kWh_to_grid|price_cashback|price_final|
+---------------+---------+-----------+----------------+----------+-------------+------------+-----------+--------------+-----------+
|10_01_147_CHR24| 3089.224|  2028.4156|         88.2519|  626.6839|       7.4029|      0.7977|  1476.8399|       24.5894|   -23.7917|
| 10_01_147_CHS1|3171.2793|  2028.4156|         75.0159|  627.1015|      61.5948|      5.7068|  1514.6631|       29.9146|   -24.2078|
|10_01_147_CHR01|3274.8048|  2028.4156|         33.7886|  626.6959|      21.3263|      2.1476|  1409.6188|       29.5315|    -27.384|
|10_01_147_CHR39|3179.0552|  2028.4156|          72.675|  628.0802|      88.1943|     12.1355|  1517.3645|        42.031|   -29.8955|
|10_01_147_CHR11|3064.0449|  2028.4156|         77.6795|  626.

                                                                                

In [262]:
dataframe_format_float(process_time_period(TimePeriod.Year, "2023-04-10")).show(n=10);



+---------------+----------+-----------+----------------+----------+-------------+------------+-----------+--------------+-----------+
|    contract_id| kWh_total|kWh_from_PV|kWh_from_battery|kWh_for_EV|kWh_from_grid|price_billed|kWh_to_grid|price_cashback|price_final|
+---------------+----------+-----------+----------------+----------+-------------+------------+-----------+--------------+-----------+
|10_01_147_CHR61|29272.4644| 12339.5934|        362.3952| 7745.8618|    2482.8294|    436.7663|  6846.2887|      306.7009|   130.0654|
|10_01_147_CHR10|28263.8229| 12339.5934|       1130.1423| 7745.6773|    2269.5714|    411.4239|  7512.3937|       282.466|   128.9579|
|10_01_147_CHR01|28866.0065| 12339.5934|         426.978| 7747.9904|    2709.3654|     275.821|  7195.2758|      157.7963|   118.0247|
|10_01_147_CHR24|26227.1599| 12339.5934|       1072.0628| 7735.6048|    2180.7301|    239.1873|  7592.0863|         135.5|   103.6873|
|10_01_147_CHR39|28109.1286| 12339.5934|        855.354

                                                                                