# Data Exploration


In [118]:
# import pyspark and start a session
from datetime import datetime
from pyspark.sql import SparkSession

from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    TimestampType,
    DoubleType,
    IntegerType,
    BooleanType,
)

import pyspark.sql.functions as F

# import the window function module
from pyspark.sql.window import Window

# import row_number function
from pyspark.sql.functions import row_number

spark = SparkSession.builder.appName("spark").getOrCreate()

**Data Description:**

**1. VorgangsID:**

- This column comprises identifiers for distinct processes, where each process may consist of multiple rows within the dataset.

**2. Datum:**

- Indicates the date and time for each event (Funktion) associated with a particular process (VorgangsID). The date is formatted as `YYYY-MM-DD HH:MM`.

**3. Funktion:**

- Represents various events within each process (VorgangsID), with unique values including:
  - `Antrag (E-Sign)`
  - `Antrag (Papier)`
  - `Antrag (VE)`
  - `Remote E-Sign`
  - `Antrag Eingereicht`
  - `Antrag Start`
  - `Antrag Vollstaendig`
  - `Berechnung`
  - `Erstellt`
  - `Geloescht`
  - `Laden`
  - `Sync`
  - `Vorschlag`

**4. Tarifname:**

- Contains the name of the tariff associated with each process.

**5. Sparte:**

- Indicates the category of insurance, with values such as:
  - `BU`
  - `BUZ`
  - `EU`
  - `EUZ`
  - `KLV`
  - `KLVZ`
  - `PKV`
  - `PKVZ`
  - `RIS`
  - `RISZ`
  - `RUV`
  - `RUVZ`
  - `SBU`

**6. Betrag:**

- Denotes the amount associated with each process.

**7. Beitragssumme:**

- Represents the sum of contributions related to each process.

**8. Zahlweise:**

- Indicates the payment frequency, with values including:
  - `Einmal`
  - `Jährlich`
  - `Monatlich`
  - `Quartal`
  - `Sofort`
  - `Zweijährlich`

**9. Waehrung:**

- Almost exclusively contains the value `EUR` denoting currency.

**10. Bewertungsstuffe:** - Contains numeric values ranging from 0 to 5.

**11. Vorschlagsnummer:** - Contains identifiers for proposals associated with processes.

**12. Benutzername:** - Contains the name of the user associated with each process.

**13. Benutzergruppe:** - Represents the user group, with possible values including: - `Berater` - `Kunde` - `Kundenberater` - `Kundenbetreuer` - `Kundenbetreuerin`

**14. SichtenID:** - Contains identifiers for views related to processes.

**15. IDzurAggtNr:** - Contains identifiers for agents associated with processes.

**16. KZ_Risikoprüfung:** - Represents a boolean value indicating risk assessment.

**17. AIS_CaseID:** - Contains identifiers for health checks within life insurance processes.

**18. EsignFaehig:** - Represents a boolean value indicating electronic signature capability.

**19. Antragsnummer:** - Contains identifiers for applications associated with processes.

**20. Vertriebsstelle:** - Contains identifiers for sales points associated with processes.


In [131]:
# Define a function to convert string to timestamp
def str_to_timestamp(str_date):
    return datetime.strptime(str_date, "%Y-%m-%d %H:%M:%S")


# Define schema for the DataFrame
schema = StructType(
    [
        StructField("VorgangsID", StringType(), True),
        StructField("Datum", TimestampType(), True),
        StructField("Funktion", StringType(), True),
        StructField("Tarifname", StringType(), True),
        StructField("Sparte", StringType(), True),
        StructField("Betrag", DoubleType(), True),
        StructField("Beitragssumme", DoubleType(), True),
        StructField("Zahlweise", StringType(), True),
        StructField("Waehrung", StringType(), True),
        StructField("Bewertungsstuffe", IntegerType(), True),
        StructField("Vorschlagsnummer", StringType(), True),
        StructField("Benutzername", StringType(), True),
        StructField("Benutzergruppe", StringType(), True),
        StructField("SichtenID", StringType(), True),
        StructField("IDzurAggtNr", StringType(), True),
        StructField("KZ_Risikoprüfung", BooleanType(), True),
        StructField("AIS_CaseID", StringType(), True),
        StructField("EsignFaehig", BooleanType(), True),
        StructField("Antragsnummer", StringType(), True),
        StructField("Vertriebsstelle", StringType(), True),
    ]
)

# Sample data
data_1 = [
    (
        "ID1",
        str_to_timestamp("2024-04-27 08:30:00"),
        "Berechnung",
        "Tariff1",
        "BU",
        1000.0,
        500.0,
        "Jährlich",
        "EUR",
        3,
        "Vorschlag1",
        "User1",
        "Berater",
        "View1",
        "Agent1",
        True,
        "Case1",
        True,
        "App1",
        "SalesPoint1",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 10:45:00"),
        "Antrag (Papier)",
        "Tariff2",
        "EU",
        1500.0,
        750.0,
        "Monatlich",
        "EUR",
        4,
        "Vorschlag2",
        "User2",
        "Kunde",
        "View2",
        "Agent2",
        False,
        "Case2",
        False,
        "App2",
        "SalesPoint2",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 12:00:00"),
        "Laden",
        "Tariff3",
        "BU",
        2000.0,
        1000.0,
        "Jährlich",
        "EUR",
        5,
        "Vorschlag3",
        "User3",
        "Berater",
        "View3",
        "Agent3",
        True,
        "Case3",
        True,
        "App3",
        "SalesPoint3",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 13:15:00"),
        "Sync",
        "Tariff4",
        "EU",
        2500.0,
        1250.0,
        "Monatlich",
        "EUR",
        6,
        "Vorschlag4",
        "User4",
        "Kunde",
        "View4",
        "Agent4",
        False,
        "Case4",
        False,
        "App4",
        "SalesPoint4",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 14:30:00"),
        "Antrag Start",
        "Tariff5",
        "BU",
        3000.0,
        1500.0,
        "Jährlich",
        "EUR",
        7,
        "Vorschlag5",
        "User5",
        "Berater",
        "View5",
        "Agent5",
        True,
        "Case5",
        True,
        "App5",
        "SalesPoint5",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 15:45:00"),
        "Antrag (E-Sign)",
        "Tariff6",
        "EU",
        3500.0,
        1750.0,
        "Monatlich",
        "EUR",
        8,
        "Vorschlag6",
        "User6",
        "Kunde",
        "View6",
        "Agent6",
        False,
        "Case6",
        False,
        "App6",
        "SalesPoint6",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 17:00:00"),
        "Geloescht",
        "Tariff7",
        "BU",
        4000.0,
        2000.0,
        "Jährlich",
        "EUR",
        9,
        "Vorschlag7",
        "User7",
        "Berater",
        "View7",
        "Agent7",
        True,
        "Case7",
        True,
        "App7",
        "SalesPoint7",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 18:15:00"),
        "Antrag (Papier)",
        "Tariff8",
        "EU",
        4500.0,
        2250.0,
        "Monatlich",
        "EUR",
        10,
        "Vorschlag8",
        "User8",
        "Kunde",
        "View8",
        "Agent8",
        False,
        "Case8",
        False,
        "App8",
        "SalesPoint8",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 19:30:00"),
        "Laden",
        "Tariff9",
        "BU",
        5000.0,
        2500.0,
        "Jährlich",
        "EUR",
        11,
        "Vorschlag9",
        "User9",
        "Berater",
        "View9",
        "Agent9",
        True,
        "Case9",
        True,
        "App9",
        "SalesPoint9",
    ),
    (
        "ID2",
        str_to_timestamp("2024-04-27 20:45:00"),
        "Sync",
        "Tariff10",
        "EU",
        5500.0,
        2750.0,
        "Monatlich",
        "EUR",
        12,
        "Vorschlag10",
        "User10",
        "Kunde",
        "View10",
        "Agent10",
        False,
        "Case10",
        False,
        "App10",
        "SalesPoint10",
    ),
]

data_2 = [
    (
        "ID1",
        str_to_timestamp("2024-04-27 08:30:00"),
        "Berechnung",
        "Tariff1",
        "BU",
        1000.0,
        500.0,
        "Jährlich",
        "EUR",
        3,
        "Vorschlag1",
        "User1",
        "Berater",
        "View1",
        "Agent1",
        True,
        "Case1",
        True,
        "App1",
        "SalesPoint1",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 10:45:00"),
        "Antrag (Papier)",
        "Tariff2",
        "EU",
        1500.0,
        750.0,
        "Monatlich",
        "EUR",
        4,
        "Vorschlag2",
        "User2",
        "Kunde",
        "View2",
        "Agent2",
        False,
        "Case2",
        False,
        "App2",
        "SalesPoint2",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 12:00:00"),
        "Laden",
        "Tariff3",
        "BU",
        2000.0,
        1000.0,
        "Jährlich",
        "EUR",
        5,
        "Vorschlag3",
        "User3",
        "Berater",
        "View3",
        "Agent3",
        True,
        "Case3",
        True,
        "App3",
        "SalesPoint3",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 13:15:00"),
        "Sync",
        "Tariff4",
        "EU",
        2500.0,
        1250.0,
        "Monatlich",
        "EUR",
        6,
        "Vorschlag4",
        "User4",
        "Kunde",
        "View4",
        "Agent4",
        False,
        "Case4",
        False,
        "App4",
        "SalesPoint4",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 14:30:00"),
        "Antrag Start",
        "Tariff5",
        "BU",
        3000.0,
        1500.0,
        "Jährlich",
        "EUR",
        7,
        "Vorschlag5",
        "User5",
        "Berater",
        "View5",
        "Agent5",
        True,
        "Case5",
        True,
        "App5",
        "SalesPoint5",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 15:45:00"),
        "Antrag (E-Sign)",
        "Tariff6",
        "EU",
        3500.0,
        1750.0,
        "Monatlich",
        "EUR",
        8,
        "Vorschlag6",
        "User6",
        "Kunde",
        "View6",
        "Agent6",
        False,
        "Case6",
        False,
        "App6",
        "SalesPoint6",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 17:00:00"),
        "Geloescht",
        "Tariff7",
        "BU",
        4000.0,
        2000.0,
        "Jährlich",
        "EUR",
        9,
        "Vorschlag7",
        "User7",
        "Berater",
        "View7",
        "Agent7",
        True,
        "Case7",
        True,
        "App7",
        "SalesPoint7",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 18:15:00"),
        "Antrag (Papier)",
        "Tariff8",
        "EU",
        4500.0,
        2250.0,
        "Monatlich",
        "EUR",
        10,
        "Vorschlag8",
        "User8",
        "Kunde",
        "View8",
        "Agent8",
        False,
        "Case8",
        False,
        "App8",
        "SalesPoint8",
    ),
    (
        "ID1",
        str_to_timestamp("2024-04-27 19:30:00"),
        "Laden",
        "Tariff9",
        "BU",
        5000.0,
        2500.0,
        "Jährlich",
        "EUR",
        11,
        "Vorschlag9",
        "User9",
        "Berater",
        "View9",
        "Agent9",
        True,
        "Case9",
        True,
        "App9",
        "SalesPoint9",
    ),
    (
        "ID2",
        str_to_timestamp("2024-04-27 20:45:00"),
        "Sync",
        "Tariff10",
        "EU",
        5500.0,
        2750.0,
        "Monatlich",
        "EUR",
        12,
        "Vorschlag10",
        "User10",
        "Kunde",
        "View10",
        "Agent10",
        False,
        "Case10",
        False,
        "App10",
        "SalesPoint10",
    ),
]
# Create DataFrame
df_1 = spark.createDataFrame(data_1, schema)

df_2 = spark.createDataFrame(data_2, schema)

# Data Testing and Comparison

**Information:**
I regularly update the dataset by adding new data (rows) based on the column names.

**Goal:**

- I want to know how much new rows are added based on the column names in each update.
- I want to compare the existing rows with the new rows based on the column names.
- I want to know if the new rows are consistent with the existing rows based on the column names.


In [145]:
def compare_rows(df_old, df_new):
    """
    Compare the number of rows between two Spark DataFrames.

    Parameters:
    - df_old (DataFrame): The original DataFrame.
    - df_new (DataFrame): The new DataFrame.

    Returns:
    None

    This function compares the number of rows between two Spark DataFrames.
    It prints a message indicating whether there are updates, deletions, or no changes.

    Example:
    >>> df1 = spark.createDataFrame([(1, 'A'), (2, 'B'), (3, 'C')], ['ID', 'Value'])
    >>> df2 = spark.createDataFrame([(1, 'A'), (2, 'B'), (4, 'D')], ['ID', 'Value'])
    >>> compare_rows(df1, df2)
    Updated: 1 row
    """
    rows_old = df_old.count()
    rows_new = df_new.count()
    if rows_old == rows_new:
        print("No Update")
    elif rows_old > rows_new:
        print(f"Deleted: {rows_old - rows_new} rows")
    else:
        print(f"Updated: {rows_new - rows_old} rows")
        print(f"Total Rows: {rows_new} rows")

In [146]:
# Test the function
compare_rows(df_2, df_1)

No Update


In [149]:
def get_schema_change(df_old, df_new):
    """
    Compare the schema of two Spark DataFrames and display the schema changes.

    Parameters:
    - df_old (DataFrame): The original DataFrame.
    - df_new (DataFrame): The new DataFrame.

    Returns:
    None

    This function compares the schema of two Spark DataFrames and prints any changes detected.
    It checks for changes in field names, data types, and the number of fields.

    Example:
    >>> df1 = spark.createDataFrame([(1, 'A'), (2, 'B'), (3, 'C')], ['ID', 'Value'])
    >>> df2 = spark.createDataFrame([(1, 'A'), (2, 'B'), (4, 'D')], ['ID', 'Value', 'NewColumn'])
    >>> get_schema_change(df1, df2)
    Number of Fields Change: 2 -> 3
    """

    schema_old = df_old.schema
    schema_new = df_new.schema

    if schema_old == schema_new:
        print("No Schema Change")
    else:
        if len(schema_old) == len(schema_new):
            for i in range(len(schema_old)):
                if schema_old[i].name != schema_new[i].name:
                    print(
                        f"Field Name Change: {schema_old[i].name} -> {schema_new[i].name}"
                    )
                if schema_old[i].dataType != schema_new[i].dataType:
                    print(
                        f"Data Type Change: {schema_old[i].dataType} -> {schema_new[i].dataType}"
                    )
        else:
            print(f"Number of Fields Change: {len(schema_old)} -> {len(schema_new)}")

In [150]:
# Test the function
compare_schema(df_1, df_2)

No Schema Change


In [147]:
def get_data_changes(df_old, df_new, key_column):
    """
    Compares two DataFrames based on a specified key column.

    Parameters:
    - df_old (DataFrame): The old DataFrame to compare.
    - df_new (DataFrame): The new DataFrame to compare.
    - key_column (str): The column name to use as the key for comparison.

    Returns:
    - DataFrame: A DataFrame containing rows that are different between the two input DataFrames but exist in either one.


    Example:
    >>> df1 = spark.createDataFrame([(1, 'A'), (2, 'B'), (3, 'C')], ['ID', 'Value'])
    >>> df2 = spark.createDataFrame([(1, 'A'), (2, 'D'), (4, 'E')], ['ID', 'Value'])
    >>> compare_dataframes(df1, df2, 'ID').show()
    +---+-----+
    | ID|Value|
    +---+-----+
    |  2|    D|
    |  4|    E|
    +---+-----+

    """

    intersection = df_old.select(key_column).intersect(
        df_new.select(key_column))

    df_old_filtered = df_old.join(intersection, key_column, "inner")
    df_new_filtered = df_new.join(intersection, key_column, "inner")

    df_old_filtered.cache()
    df_new_filtered.cache()

    df_union = df_old_filtered.unionByName(
        df_new_filtered, allowMissingColumns=True)

    diff_df = df_union.subtract(df_old_filtered).union(
        df_union.subtract(df_new_filtered)
    )

    return diff_df

In [148]:
# Test the function
diff_df = get_data_changes(df_1, df_2, "VorgangsID")
diff_df.show()

24/04/28 15:24:41 WARN CacheManager: Asked to cache already cached data.
24/04/28 15:24:41 WARN CacheManager: Asked to cache already cached data.

+----------+-----+--------+---------+------+------+-------------+---------+--------+----------------+----------------+------------+--------------+---------+-----------+----------------+----------+-----------+-------------+---------------+
|VorgangsID|Datum|Funktion|Tarifname|Sparte|Betrag|Beitragssumme|Zahlweise|Waehrung|Bewertungsstuffe|Vorschlagsnummer|Benutzername|Benutzergruppe|SichtenID|IDzurAggtNr|KZ_Risikoprüfung|AIS_CaseID|EsignFaehig|Antragsnummer|Vertriebsstelle|
+----------+-----+--------+---------+------+------+-------------+---------+--------+----------------+----------------+------------+--------------+---------+-----------+----------------+----------+-----------+-------------+---------------+
+----------+-----+--------+---------+------+------+-------------+---------+--------+----------------+----------------+------------+--------------+---------+-----------+----------------+----------+-----------+-------------+---------------+





In [142]:
def get_first_occurrence(df, key_column, date_column):
    """
    Display the first occurrence date for each distinct value in the specified columns.

    Parameters:
    - df (DataFrame): The input DataFrame.
    - key_column (str): The column name containing distinct values.
    - date_column (str): The column name containing dates.

    Returns:
    None

    This function takes a DataFrame as input and displays each distinct value of the key column
    along with the date of its first occurrence in the date column.

    Example:
    >>> df = spark.createDataFrame([(1, '2024-04-27'), (2, '2024-04-28'), (1, '2024-04-29')], ['ID', 'Date'])
    >>> get_first_occurrence(df, 'ID', 'Date')
    +---+----------+
    | ID|first_date|
    +---+----------+
    |  1|2024-04-27|
    |  2|2024-04-28|
    +---+----------+
    """

    window_spec = Window.partitionBy(key_column).orderBy(date_column)
    first_occurrence = row_number().over(window_spec)

    df = df.withColumn("rn", first_occurrence)

    return df.filter(F.col("rn") == 1).select(key_column, date_column)

In [143]:
# test the function
display(display_first_occurrence(df_1, "VorgangsID", "Datum"))

DataFrame[VorgangsID: string, Datum: timestamp]

In [151]:
# close the session
spark.stop()