# Changelog

* il timebucket non parte dal giorno precedente ma dalla mezzanotte (punto 0) del giorno corrente, allineata alla granularità scelta

* il timebucket supporta nativamente h, d, w, m, y

* il timebucket non è soggetto a problemi di timezone/summer time

* maggior chiarezza del range dei timebucket (es. 1h: 00:00:00 - 00:59:59)


# TODO:

* Riconvertire i bucket_start e bucket_end in datetime

In [13]:
from pyspark.sql import SparkSession
from ts_train.step.core import AbstractPipelineStep
from ts_train.step.time_bucketing import TimeBucketing
from pyspark.sql import Row
from pydantic import BaseModel, StrictStr
from pyspark.sql import functions as F
from pyspark.sql.functions import expr

# Notebook display tweak: inject CSS so <pre> blocks keep `white-space: pre`
# (wide table output is not wrapped)
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [2]:
from pandas.tseries.offsets import DateOffset
    
def get_data_offset(time_bucket_size,time_bucket_granularity) -> DateOffset:
    """
    Get the offset for the provided bucket size and granularity.

    Only the first character of the granularity is inspected, case-insensitively,
    so "h", "H" and "hours" all select hours.

    Args:
        time_bucket_size: Number of granularity units that make up one bucket.
        time_bucket_granularity: Granularity name; its first character must be
            one of h (hours), d (days), w (weeks), m (months), y (years).

    Returns:
        offset (DateOffset): Offset for the provided bucket size and granularity.

    Raises:
        ValueError: If the granularity is empty or its first character is not
            one of the supported initials.
    """

    # Available Granularity:
    # https://pandas.pydata.org/docs/user_guide/timeseries.html#timeseries-offset-aliases

    # Maps the granularity initial to the DateOffset keyword it selects.
    # NOTE: "M" means months, so a value such as "minutes" also resolves to months.
    offset_keyword_by_initial = {
        "H": "hours",
        "D": "days",
        "W": "weeks",
        "M": "months",
        "Y": "years",
    }

    # An empty granularity has no first character; treat it as unsupported
    # instead of letting the indexing fail with an unrelated IndexError.
    initial = time_bucket_granularity[0].upper() if time_bucket_granularity else ""
    offset_keyword = offset_keyword_by_initial.get(initial)
    if offset_keyword is None:
        raise ValueError(
            f"Granularity {time_bucket_granularity} not supported"
        )

    return DateOffset(**{offset_keyword: time_bucket_size})

# pytest

In [3]:
from functools import reduce
from pyspark.sql.functions import col, lit
from typing import List

def test_null_values(df, time_column_name: str) -> None:
    """Assert that column ``time_column_name`` of ``df`` holds no null values.

    Args:
        df: DataFrame to inspect; it is filtered with ``where`` and counted.
        time_column_name: Name of the column that must not contain nulls.

    Raises:
        AssertionError: If at least one row has a null in the column.
    """
    null_rows = df.where(col(time_column_name).isNull())
    has_nulls = null_rows.count() > 0
    assert not has_nulls, f"Column '{time_column_name}' contains null values."

# Checks that no data was corrupted during the pandas -> Spark conversion
def test_time_range(bucket_df, min_date, max_date):
    """Assert that the bucket DataFrame spans the expected time range.

    The first row's ``bucket_start`` must equal ``str(min_date)`` and the last
    row's ``bucket_end`` must be strictly greater than ``str(max_date)``.

    Args:
        bucket_df: DataFrame with ``bucket_start`` and ``bucket_end`` columns.
        min_date: Expected start of the first bucket (compared as a string).
        max_date: Date the last bucket end must exceed (compared as a string).

    Raises:
        AssertionError: If either boundary check fails.
    """
    expected_start = str(min_date)
    expected_end = str(max_date)

    first_row = bucket_df.first()
    last_row = bucket_df.tail(1)[0]
    first_element = first_row["bucket_start"]
    last_element = last_row["bucket_end"]

    assert first_element == expected_start, f"first_element: {first_element} min_date: {expected_start}"

    # The last bucket end already includes the offset, so it has to be
    # greater than max_date
    assert last_element > expected_end, f"last_element: {last_element} > max_date: {expected_end}"


'''
def test_bucket_size(date_range, time_bucket_size, time_bucket_granularity) -> None:
    if len(date_range) > 1:
        
        differences = [date_range[i] - date_range[i-1] for i in range(1, len(date_range))]
        num_of_differences = len(set(differences))

        if time_bucket_granularity.upper() == "Y":
            max_num_possible_diffences = 2 
        elif time_bucket_granularity.upper() == "M":
            max_num_possible_diffences = 3
        else:
            max_num_possible_diffences = 1
        
        # some years are 365 days long, some are 366 days long, So for years we can have 1 or 2 differences            
        assert num_of_differences <= max_num_possible_diffences, f"num_of_differences: {num_of_differences} > 2, differences: {differences}, date_range: {date_range}"
        
        
        if time_bucket_granularity.upper() == "H" or time_bucket_granularity == "D":
            bucket_size = pd.Timedelta(f"{time_bucket_size}{time_bucket_granularity}")
            for i in range(1, len(date_range)):
                assert date_range[i] - date_range[i-1] == bucket_size , f"date_range[i] - date_range[i-1]: {date_range[i] - date_range[i-1]} != bucket_size: {bucket_size}"
'''


def test_buckets_monotonicity(df, time_column_name: str) -> None:
    """Assert that ``time_column_name`` is strictly increasing in row order.

    The column is collected and every collected row is compared with the one
    before it, in the order ``collect()`` returns them.

    Args:
        df: DataFrame to inspect.
        time_column_name: Name of the column whose values must increase.

    Raises:
        AssertionError: If a row is not strictly greater than the previous one.
    """
    all_dates = df.select(time_column_name).collect()

    # Pair each row with its predecessor; row_index is the position of the later row.
    consecutive_rows = zip(all_dates, all_dates[1:])
    for row_index, (previous_date, current_date) in enumerate(consecutive_rows, start=1):
        assert current_date > previous_date, (
            f"during test for {time_column_name}, at row {row_index}, "
            f"the date {current_date} is <= than the date at row {row_index - 1} "
            f"that is {previous_date}"
        )
        

    
def test_all_buckets_are_equidistant_multi_user(df, time_column_name: str, identifier_cols_name: List[str]) -> None:
    """Assert that, for every user, consecutive timestamps are equally spaced.

    Timestamps are converted to unix seconds, grouped by the identifier
    columns, sorted within each group, and the gaps between neighbours are
    required to be all equal. Users with fewer than three timestamps pass
    trivially because they have at most one gap.

    Args:
        df: DataFrame holding the identifier columns and the time column.
        time_column_name: Name of the column converted with ``unix_timestamp``.
        identifier_cols_name: Names of the columns that identify one user.

    Raises:
        AssertionError: If any user has two different gaps between timestamps.
    """
    with_unix = df.withColumn(
        "timestamp_unix", F.unix_timestamp(time_column_name)
    )
    # collect_list gives no ordering guarantee after the shuffle, so the list
    # is sorted to make the gap computation deterministic.
    timestamps_per_user = with_unix.groupBy(*identifier_cols_name).agg(
        F.sort_array(F.collect_list("timestamp_unix")).alias("timestamps_list")
    )

    # One collect returns each user's identifiers together with its timestamps;
    # no per-user filter is needed, which also keeps users whose identifier is
    # null from being lost by an equality filter.
    for user_row in timestamps_per_user.collect():
        user_timestamps = user_row["timestamps_list"]
        differences = [
            later - earlier
            for earlier, later in zip(user_timestamps, user_timestamps[1:])
        ]

        user_identifier_values = {
            col_name: user_row[col_name] for col_name in identifier_cols_name
        }
        # All gaps are equal exactly when there is at most one distinct gap
        assert len(set(differences)) <= 1, (
            f"buckets of user {user_identifier_values} are not equidistant, "
            f"differences: {differences}"
        )

def test_all_transactions_are_in_the_correct_bucket(df, time_column_name):
    """Assert that every transaction time lies inside its own bucket.

    For each row the check is ``bucket_start <= time < bucket_end``, evaluated
    on unix-second values.

    Args:
        df: DataFrame with the time column plus ``bucket_start`` and
            ``bucket_end`` columns.
        time_column_name: Name of the column holding the transaction time.

    Raises:
        AssertionError: If a transaction time falls outside its bucket.
    """
    # NOTE(review): the upper bound is strict. The outputs recorded in this
    # notebook show bucket_end values such as 23:59:59, so a transaction at
    # exactly bucket_end would fail here - confirm whether the end is inclusive.
    unix_df = df.select(
        F.unix_timestamp(time_column_name).alias("data"),
        F.unix_timestamp("bucket_start").alias("bucket_start"),
        F.unix_timestamp("bucket_end").alias("bucket_end"),
    )

    # A single collect keeps the three values of each row together instead of
    # relying on three separate collects returning rows in the same order.
    for i, row in enumerate(unix_df.collect()):
        bucket_start = row["bucket_start"]
        transaction_time = row["data"]
        bucket_end = row["bucket_end"]
        assert bucket_start <= transaction_time < bucket_end, (
            f"{bucket_start} <= {transaction_time} < {bucket_end} is False for i = {i}"
        )

# Creazione dei dati

In [4]:

import pandas as pd
from datetime import datetime

# Provided timestamps, one single-element tuple per transaction
data = [
    ("2023-03-25 23:00:00",),
    ("2023-03-28 04:00:00",),
    ("2023-03-26 05:00:00",),
    ("2023-03-23 07:00:00",),
    ("2023-03-25 01:00:00",),
    ("2023-03-25 22:00:00",),
    ("2023-03-25 23:00:00",),
    ("2023-03-25 04:00:00",),
    ("2023-03-25 04:00:00",),
    ("2023-03-26 02:00:00",),
    ("2023-03-25 02:00:00",),
    ("2023-03-26 02:00:00",),
    ("2023-03-21 02:00:00",),
    ("2023-03-28 02:00:00",),
]

# Parse the provided timestamp strings into datetime objects
dates = [
    datetime.strptime(raw_timestamp, "%Y-%m-%d %H:%M:%S")
    for (raw_timestamp,) in data
]

# Deterministic user ids and amounts
user_ids = [1, 2, 3]
importo_values = [
    20.0, 30.0, 40.0, 23., 40.0, 50.0, 60.0, 70.0,
    20.0, 30.0, 40.0, 23., 40.0, 50.0, 60.0,
]

# Build the DataFrame rows for every date except the last one.
# NOTE: the index is (i % len(user_ids)) - 1, so the first row uses index -1
# and the assigned ids cycle 3, 1, 2 through negative indexing.
data_rows = [
    (user_ids[(i % len(user_ids)) - 1], dates[i], importo_values[i])
    for i in range(len(dates) - 1)
]

# The last date always goes to user 3 with the last amount
data_rows.append((3, dates[-1], importo_values[-1]))

# Create the pandas DataFrame and sort it by user and timestamp
df = pd.DataFrame(data_rows, columns=["id_utente", "data", "importo"])
sorted_df = df.sort_values(by=["id_utente", "data"])

# Create the Spark session
spark = (
    SparkSession.builder
    .appName("Date Addition with date_add")
    .getOrCreate()
)

original_df = spark.createDataFrame(sorted_df)

# Print the DataFrame
original_df.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/22 15:30:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+---------+-------------------+-------+
|id_utente|               data|importo|
+---------+-------------------+-------+
|        1|2023-03-25 01:00:00|   40.0|
|        1|2023-03-25 02:00:00|   40.0|
|        1|2023-03-25 04:00:00|   70.0|
|        1|2023-03-28 04:00:00|   30.0|
|        2|2023-03-25 04:00:00|   20.0|
|        2|2023-03-25 22:00:00|   50.0|
|        2|2023-03-26 03:00:00|   23.0|
|        2|2023-03-26 05:00:00|   40.0|
|        3|2023-03-21 02:00:00|   40.0|
|        3|2023-03-23 07:00:00|   23.0|
|        3|2023-03-25 23:00:00|   20.0|
|        3|2023-03-25 23:00:00|   60.0|
|        3|2023-03-26 03:00:00|   30.0|
|        3|2023-03-28 02:00:00|   60.0|
+---------+-------------------+-------+



                                                                                

# Utilizzo time bucket step

In [5]:
# Configure a time bucketing step of size 1 with "days" granularity on the "data" column
time_bucket_step = TimeBucketing(
    time_column_name="data",
    time_bucket_size=1,
    time_bucket_granularity="days",
)

# Run the step, then order the result by user and transaction time
time_bucket_df = (
    time_bucket_step._process(original_df, spark)
    .sort("id_utente", "data")
)
time_bucket_df.show(truncate=False)

+---------+-------------------+-------+-------------------+-------------------+
|id_utente|data               |importo|bucket_start       |bucket_end         |
+---------+-------------------+-------+-------------------+-------------------+
|1        |2023-03-25 01:00:00|40.0   |2023-03-25 00:00:00|2023-03-25 23:59:59|
|1        |2023-03-25 02:00:00|40.0   |2023-03-25 00:00:00|2023-03-25 23:59:59|
|1        |2023-03-25 04:00:00|70.0   |2023-03-25 00:00:00|2023-03-25 23:59:59|
|1        |2023-03-28 04:00:00|30.0   |2023-03-28 00:00:00|2023-03-28 23:59:59|
|2        |2023-03-25 04:00:00|20.0   |2023-03-25 00:00:00|2023-03-25 23:59:59|
|2        |2023-03-25 22:00:00|50.0   |2023-03-25 00:00:00|2023-03-25 23:59:59|
|2        |2023-03-26 03:00:00|23.0   |2023-03-26 00:00:00|2023-03-26 23:59:59|
|2        |2023-03-26 05:00:00|40.0   |2023-03-26 00:00:00|2023-03-26 23:59:59|
|3        |2023-03-21 02:00:00|40.0   |2023-03-21 00:00:00|2023-03-21 23:59:59|
|3        |2023-03-23 07:00:00|23.0   |2

# Aggregation

In [8]:
# Sum "importo" per user and bucket, ordered by user and bucket start
aggregated_df = (
    time_bucket_df
    .groupBy("id_utente", "bucket_start", "bucket_end")
    .agg(F.sum("importo").alias("importo"))
    .orderBy("id_utente", "bucket_start")
)

# Uncomment to keep only user 2:
#aggregated_df = aggregated_df.filter(aggregated_df["id_utente"] == 2)

aggregated_df.show(truncate=False)


+---------+-------------------+-------------------+-------+
|id_utente|bucket_start       |bucket_end         |importo|
+---------+-------------------+-------------------+-------+
|1        |2023-03-25 00:00:00|2023-03-25 23:59:59|150.0  |
|1        |2023-03-28 00:00:00|2023-03-28 23:59:59|30.0   |
|2        |2023-03-25 00:00:00|2023-03-25 23:59:59|70.0   |
|2        |2023-03-26 00:00:00|2023-03-26 23:59:59|63.0   |
|3        |2023-03-21 00:00:00|2023-03-21 23:59:59|40.0   |
|3        |2023-03-23 00:00:00|2023-03-23 23:59:59|23.0   |
|3        |2023-03-25 00:00:00|2023-03-25 23:59:59|80.0   |
|3        |2023-03-26 00:00:00|2023-03-26 23:59:59|30.0   |
|3        |2023-03-28 00:00:00|2023-03-28 23:59:59|60.0   |
+---------+-------------------+-------------------+-------+



# Creazione del filling step

In [16]:
# Add two extra columns to the aggregated data; both are computed as
# id_utente + 1 (they are deterministic, not random)
new_aggregated_df = (
    aggregated_df
    .withColumn("id_utente_2", col("id_utente") + 1)
    .withColumn("importo_2", col("id_utente") + 1)
)

new_aggregated_df.show(truncate=False)

+---------+-------------------+-------------------+-------+-----------+---------+
|id_utente|bucket_start       |bucket_end         |importo|id_utente_2|importo_2|
+---------+-------------------+-------------------+-------+-----------+---------+
|1        |2023-03-25 00:00:00|2023-03-25 23:59:59|150.0  |2          |2        |
|1        |2023-03-28 00:00:00|2023-03-28 23:59:59|30.0   |2          |2        |
|2        |2023-03-25 00:00:00|2023-03-25 23:59:59|70.0   |3          |3        |
|2        |2023-03-26 00:00:00|2023-03-26 23:59:59|63.0   |3          |3        |
|3        |2023-03-21 00:00:00|2023-03-21 23:59:59|40.0   |4          |4        |
|3        |2023-03-23 00:00:00|2023-03-23 23:59:59|23.0   |4          |4        |
|3        |2023-03-25 00:00:00|2023-03-25 23:59:59|80.0   |4          |4        |
|3        |2023-03-26 00:00:00|2023-03-26 23:59:59|30.0   |4          |4        |
|3        |2023-03-28 00:00:00|2023-03-28 23:59:59|60.0   |4          |4        |
+---------+-----

In [23]:
def filling(df,new_time_bucket_step,identifier_cols_name,time_bucket_col_name="bucket"):
    """Add the missing buckets between each user's first and last bucket.

    A timeline of buckets is built from ``df`` with the given time bucketing
    step. Each user (one combination of the identifier columns) is joined to
    the timeline buckets lying between its minimum ``bucket_start`` and its
    maximum ``bucket_end``. ``df`` is then right-joined to that per-user
    timeline and ``fillna(0)`` is applied to the result.

    Relies on the module-level ``spark`` session to build the bucket DataFrame.

    Args:
        df: Aggregated DataFrame with the identifier columns plus
            ``bucket_start`` and ``bucket_end``.
        new_time_bucket_step: Time bucketing step used to create the timeline.
            Its ``time_column_name`` is set to "bucket_start" while the
            timeline is built and restored afterwards.
        identifier_cols_name: Names of the columns that identify one user.
        time_bucket_col_name: Prefix for the intermediate start/end/min/max
            column names used inside this function.

    Returns:
        The filled DataFrame, ordered by the identifier columns,
        ``bucket_start`` and ``bucket_end``.
    """
    # Creates a list of identifier columns
    identifier_cols = [
        F.col(identifier_col_name)
        for identifier_col_name in identifier_cols_name
    ]

    # Creates aliases for simplicity and code readability
    time_bucket_start = f"{time_bucket_col_name}_start"
    time_bucket_end = f"{time_bucket_col_name}_end"
    min_time_bucket_start = f"min_{time_bucket_col_name}_start"
    max_time_bucket_end = f"max_{time_bucket_col_name}_end"

    # Keeps one record per user holding only its min start and max end
    ids_df = (
        df.select(
            *identifier_cols,
            F.col("bucket_start").alias(time_bucket_start),
            F.col("bucket_end").alias(time_bucket_end),
        )
        .groupBy(*identifier_cols)
        .agg(
            F.min(time_bucket_start).alias(min_time_bucket_start),
            F.max(time_bucket_end).alias(max_time_bucket_end),
        )
    )

    # Creates the new timeline with every bucket, using the step passed in
    # (not a module-level one). The step is pointed at "bucket_start" only for
    # the duration of the timeline creation so the caller's object is left
    # as it was received.
    previous_time_column_name = new_time_bucket_step.time_column_name
    new_time_bucket_step.time_column_name = "bucket_start"
    try:
        timeline, _, _ = new_time_bucket_step._create_timeline(df)
        bucket_df = new_time_bucket_step._create_df_with_buckets(spark, timeline)
    finally:
        new_time_bucket_step.time_column_name = previous_time_column_name

    # Moves the bucket end back by one second, then casts both bucket
    # boundaries to timestamp
    bucket_df = (
        bucket_df
        .withColumn("bucket_end", expr("bucket_end - interval 1 second"))
        .withColumn("bucket_start", col("bucket_start").cast("timestamp"))
        .withColumn("bucket_end", col("bucket_end").cast("timestamp"))
    )

    # Links every user to the timeline buckets inside its own [min, max] range
    inside_user_range = (
        (bucket_df["bucket_start"] >= ids_df[min_time_bucket_start])
        & (bucket_df["bucket_end"] <= ids_df[max_time_bucket_end])
    )
    result_df = ids_df.join(bucket_df, inside_user_range)

    # Selects the columns wanted in the per-user timeline
    all_timestamp_per_clients = result_df.select(
        *identifier_cols_name, "bucket_start", "bucket_end"
    ).orderBy(identifier_cols_name + ["bucket_start"])

    # Right-joins the input with the per-user timeline so every generated
    # bucket is kept, then replaces the resulting nulls with 0
    join_on_cols = identifier_cols_name + ["bucket_start", "bucket_end"]
    filled_df = (
        df.join(all_timestamp_per_clients, on=join_on_cols, how="right")
        .fillna(0)
    )

    return filled_df.orderBy(*join_on_cols)

# Columns that together identify one user
identifier_cols_name = ["id_utente", "id_utente_2"]

# Fill the gaps in every user's timeline and display the result
filled_df = filling(
    df=new_aggregated_df,
    new_time_bucket_step=time_bucket_step,
    identifier_cols_name=identifier_cols_name,
)
filled_df.show(truncate=False)

+---------+-----------+-------------------+-------------------+-------+---------+
|id_utente|id_utente_2|bucket_start       |bucket_end         |importo|importo_2|
+---------+-----------+-------------------+-------------------+-------+---------+
|1        |2          |2023-03-25 00:00:00|2023-03-25 23:59:59|150.0  |2        |
|1        |2          |2023-03-26 00:00:00|2023-03-26 23:59:59|0.0    |0        |
|1        |2          |2023-03-27 00:00:00|2023-03-27 23:59:59|0.0    |0        |
|1        |2          |2023-03-28 00:00:00|2023-03-28 23:59:59|30.0   |2        |
|2        |3          |2023-03-25 00:00:00|2023-03-25 23:59:59|70.0   |3        |
|2        |3          |2023-03-26 00:00:00|2023-03-26 23:59:59|63.0   |3        |
|3        |4          |2023-03-21 00:00:00|2023-03-21 23:59:59|40.0   |4        |
|3        |4          |2023-03-22 00:00:00|2023-03-22 23:59:59|0.0    |0        |
|3        |4          |2023-03-23 00:00:00|2023-03-23 23:59:59|23.0   |4        |
|3        |4    

In [None]:
# Display the aggregated DataFrame without truncating column values
aggregated_df.show(truncate=False)

In [24]:
# Inspect the column types of the filled DataFrame
filled_df.schema

StructType([StructField('id_utente', LongType(), True), StructField('id_utente_2', LongType(), True), StructField('bucket_start', TimestampType(), True), StructField('bucket_end', TimestampType(), True), StructField('importo', DoubleType(), False), StructField('importo_2', LongType(), True)])

In [27]:
from pyspark.sql.functions import col, date_format

# Render bucket_start with the "yyyy-MM-dd HH:mm:ss" pattern; the recorded
# schema shows the column becomes a string after this step
bucket_start_as_text = date_format(col("bucket_start"), "yyyy-MM-dd HH:mm:ss")
result_df = filled_df.withColumn("bucket_start", bucket_start_as_text)
result_df.show(truncate=False)
result_df.schema


+---------+-----------+-------------------+-------------------+-------+---------+
|id_utente|id_utente_2|bucket_start       |bucket_end         |importo|importo_2|
+---------+-----------+-------------------+-------------------+-------+---------+
|1        |2          |2023-03-25 00:00:00|2023-03-25 23:59:59|150.0  |2        |
|1        |2          |2023-03-26 00:00:00|2023-03-26 23:59:59|0.0    |0        |
|1        |2          |2023-03-27 00:00:00|2023-03-27 23:59:59|0.0    |0        |
|1        |2          |2023-03-28 00:00:00|2023-03-28 23:59:59|30.0   |2        |
|2        |3          |2023-03-25 00:00:00|2023-03-25 23:59:59|70.0   |3        |
|2        |3          |2023-03-26 00:00:00|2023-03-26 23:59:59|63.0   |3        |
|3        |4          |2023-03-21 00:00:00|2023-03-21 23:59:59|40.0   |4        |
|3        |4          |2023-03-22 00:00:00|2023-03-22 23:59:59|0.0    |0        |
|3        |4          |2023-03-23 00:00:00|2023-03-23 23:59:59|23.0   |4        |
|3        |4    

StructType([StructField('id_utente', LongType(), True), StructField('id_utente_2', LongType(), True), StructField('bucket_start', StringType(), True), StructField('bucket_end', TimestampType(), True), StructField('importo', DoubleType(), False), StructField('importo_2', LongType(), True)])

In [28]:
# Same bucket_start formatting as the cell above, run again on filled_df
formatted_bucket_start = date_format(col("bucket_start"), "yyyy-MM-dd HH:mm:ss")
result_df = filled_df.withColumn("bucket_start", formatted_bucket_start)
result_df.show(truncate=False)
result_df.schema


+---------+-----------+-------------------+-------------------+-------+---------+
|id_utente|id_utente_2|bucket_start       |bucket_end         |importo|importo_2|
+---------+-----------+-------------------+-------------------+-------+---------+
|1        |2          |2023-03-25 00:00:00|2023-03-25 23:59:59|150.0  |2        |
|1        |2          |2023-03-26 00:00:00|2023-03-26 23:59:59|0.0    |0        |
|1        |2          |2023-03-27 00:00:00|2023-03-27 23:59:59|0.0    |0        |
|1        |2          |2023-03-28 00:00:00|2023-03-28 23:59:59|30.0   |2        |
|2        |3          |2023-03-25 00:00:00|2023-03-25 23:59:59|70.0   |3        |
|2        |3          |2023-03-26 00:00:00|2023-03-26 23:59:59|63.0   |3        |
|3        |4          |2023-03-21 00:00:00|2023-03-21 23:59:59|40.0   |4        |
|3        |4          |2023-03-22 00:00:00|2023-03-22 23:59:59|0.0    |0        |
|3        |4          |2023-03-23 00:00:00|2023-03-23 23:59:59|23.0   |4        |
|3        |4    

StructType([StructField('id_utente', LongType(), True), StructField('id_utente_2', LongType(), True), StructField('bucket_start', StringType(), True), StructField('bucket_end', TimestampType(), True), StructField('importo', DoubleType(), False), StructField('importo_2', LongType(), True)])