# Time Analysis


In [1]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise start a new one
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/03 09:02:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/03 09:02:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/06/03 09:02:24 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


## Import Libraries


In [1]:
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from functools import reduce
from typing import List, Union
from pyspark.sql.functions import broadcast
from pyspark.sql.window import Window

## Read data


In [8]:
# Raw export: semicolon-separated, header row, every column read as string (no schema inference)
df = spark.read.csv("Raw/data.csv", header=True, inferSchema=False, sep=";")
# Parse "Datum" into a timestamp. `H` is hour-of-day (0-23); the previous `h` is the
# 12-hour clock, which without an AM/PM marker maps 12:xx to 00:xx and cannot parse
# 13:00-23:59. NOTE(review): assumes the export uses a 24-hour clock — confirm against Raw/data.csv.
df = df.withColumn("Datum", F.to_timestamp("Datum", "d.M.yy H:mm"))
# drop cases with missing case_key
df = df.filter(F.col("case_key").isNotNull())

In [17]:
def enrich_data_with_mapping(
    main_df: DataFrame,
    map_df: DataFrame,
    main_keys: Union[str, List[str]],
    map_keys: Union[str, List[str]],
) -> DataFrame:
    """
    Left-join main_df with the columns of map_df on positionally paired keys.

    main_keys[i] is matched against map_keys[i]. If the key combination is not
    unique in map_df, map_df is first grouped by its keys and every non-key column
    is collapsed with collect_list. Those columns therefore become arrays; nulls are
    skipped and element order is not guaranteed. If map_df has duplicate keys but no
    non-key columns, the duplicate rows are simply removed.

    Columns of map_df whose name already exists in main_df are dropped, so main_df's
    version always wins. The map-side key columns are kept under the prefix
    "map_df_" (e.g. "map_df_CASE_KEY") and are null for unmatched main_df rows.
    map_df is broadcast, so it should be small enough to ship to every executor.

    Parameters:
    main_df (DataFrame): The main dataframe; all of its rows are kept.
    map_df (DataFrame): The dataframe with additional information.
    main_keys (Union[str, List[str]]): The key(s) to join main_df on.
    map_keys (Union[str, List[str]]): The key(s) to join map_df on, paired
        position by position with main_keys.

    Returns:
    DataFrame: main_df plus the "map_df_"-prefixed key column(s) and the
    non-conflicting columns of map_df.

    Raises:
    ValueError: If a key list is empty, the key lists differ in length, or a key
        is not a column of its dataframe.
    """
    # Normalise keys to fresh lists (never mutate what the caller passed in)
    main_keys = [main_keys] if isinstance(main_keys, str) else list(main_keys)
    map_keys = [map_keys] if isinstance(map_keys, str) else list(map_keys)

    # Keys are paired positionally, so both lists must be non-empty and equally long;
    # otherwise reduce() fails on an empty list or surplus keys are silently ignored
    if not main_keys or not map_keys:
        raise ValueError("main_keys and map_keys must not be empty")
    if len(main_keys) != len(map_keys):
        raise ValueError(
            "main_keys and map_keys must have the same length "
            f"({len(main_keys)} != {len(map_keys)})"
        )

    # Check that the keys exist in their respective dataframes
    for key in main_keys:
        if key not in main_df.columns:
            raise ValueError(f"Key '{key}' not found in main_df columns")
    for key in map_keys:
        if key not in map_df.columns:
            raise ValueError(f"Key '{key}' not found in map_df columns")

    # Detect duplicate key combinations in map_df. A dedicated alias avoids clashing
    # with a key column literally named "count"; take(1) only has to fetch one
    # offending group instead of counting all of them.
    row_count_col = "__enrich_row_count"
    has_duplicates = bool(
        map_df.groupBy(map_keys)
        .agg(F.count(F.lit(1)).alias(row_count_col))
        .filter(F.col(row_count_col) > 1)
        .take(1)
    )

    if has_duplicates:
        value_columns = [c for c in map_df.columns if c not in map_keys]
        if value_columns:
            # One row per key; every non-key column becomes an array of its values
            map_df_aggregated = map_df.groupBy(map_keys).agg(
                *[F.collect_list(c).alias(c) for c in value_columns]
            )
        else:
            # Only key columns: agg() needs at least one expression, and removing
            # duplicates is enough to stop the join from multiplying main_df rows
            map_df_aggregated = map_df.dropDuplicates(map_keys)
    else:
        map_df_aggregated = map_df

    # Prefix every map-side column so nothing is ambiguous in the join condition
    prefix = "map_df_"
    map_df_aliased = map_df_aggregated.select(
        *[F.col(c).alias(f"{prefix}{c}") for c in map_df_aggregated.columns]
    )
    map_keys_aliased = [f"{prefix}{key}" for key in map_keys]

    # AND together one equality per (main key, map key) pair
    join_condition = reduce(
        lambda acc, cond: acc & cond,
        [
            F.col(main_key) == F.col(map_key)
            for main_key, map_key in zip(main_keys, map_keys_aliased)
        ],
    )

    # Restore original names for map columns that do not collide with main_df.
    # NOTE: a map key whose name is absent from main_df therefore appears twice
    # (prefixed and unprefixed); kept for backward compatibility.
    main_columns = set(main_df.columns)
    map_value_columns = [
        F.col(f"{prefix}{c}").alias(c)
        for c in map_df_aggregated.columns
        if c not in main_columns
    ]

    # Broadcast the (expected small) lookup side to avoid shuffling main_df
    map_df_broadcast = broadcast(
        map_df_aliased.select(*map_keys_aliased, *map_value_columns)
    )

    return main_df.join(map_df_broadcast, join_condition, "left")


# Example usage: a tiny in-memory demo of enrich_data_with_mapping.
# Inside a Jupyter kernel __name__ is "__main__", so this guard always runs here.
if __name__ == "__main__":
    # getOrCreate() hands back the already-active session when there is one
    spark = SparkSession.builder.appName("ProcessMining").getOrCreate()

    # Event log with two activities of case "A1"
    schema = ["CASE_KEY", "Datum", "Funktion", "Tarifname"]
    data = [
        ("A1", "2022-01-23 08:00:00", "Antrag Start", None),
        ("A1", "2022-01-23 08:10:00", "Fristablauf ext. Signatur", "BU"),
    ]
    main_df = spark.createDataFrame(data, schema)

    # Lookup table: "A1" occurs twice, so its Info values are collected into a list;
    # its Datum column is dropped because main_df already has a Datum column
    map_schema = ["CASE_KEY", "Datum", "Info"]
    map_data = [
        ("A1", "2022-01-23 08:00:00", "Additional Info 1"),
        ("A1", "2022-01-23 08:10:00", "Additional Info 2"),
        ("A2", "2022-01-23 08:00:00", "Additional Info 3"),
    ]
    map_df = spark.createDataFrame(map_data, map_schema)

    enriched_df = enrich_data_with_mapping(
        main_df=main_df,
        map_df=map_df,
        main_keys=["CASE_KEY"],
        map_keys=["CASE_KEY"],
    )
    enriched_df.show(truncate=False)

+--------+-------------------+-------------------------+---------+---------------+--------------------------------------+
|CASE_KEY|Datum              |Funktion                 |Tarifname|map_df_CASE_KEY|Info                                  |
+--------+-------------------+-------------------------+---------+---------------+--------------------------------------+
|A1      |2022-01-23 08:00:00|Antrag Start             |null     |A1             |[Additional Info 1, Additional Info 2]|
|A1      |2022-01-23 08:10:00|Fristablauf ext. Signatur|BU       |A1             |[Additional Info 1, Additional Info 2]|
+--------+-------------------+-------------------------+---------+---------------+--------------------------------------+



In [4]:
# close the spark session
# spark.stop()