In [6]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, regexp_replace


# Create SparkSession
spark = SparkSession.builder.appName("TestGetNumbersFromString").getOrCreate()


def get_numbers_from_string(df: DataFrame, column: str) -> DataFrame:
    """
    Extract all numbers from a single string column containing a mix of strings and integers.

    Args:
        df: Input PySpark DataFrame.
        column: Name of the column to process.

    Returns:
        DataFrame with an additional column containing all extracted numbers.

    Example:
        >>> df = spark.createDataFrame([("123ABC456",), ("789DEF012",)], ["mixed_column"])
        >>> result = get_numbers_from_string(df, "mixed_column")
        >>> result.show()
        +------------+------------------------+
        |mixed_column|mixed_column_only_numbers|
        +------------+------------------------+
        |   123ABC456|                   123456|
        |   789DEF012|                   789012|
        +------------+------------------------+
    """
    return df.withColumn(
        f"{column}_only_numbers",
        regexp_replace(col(column).cast(StringType()), r"[^0-9]", ""),
    )


# Create an example dataset
schema = StructType([StructField("mixed_column", StringType(), True)])

data = [
    ("123ABC456DEF789",),
    ("987DEF654ABC321",),
    ("111AAA222BBB333",),
    ("XYZ789",),
    ("456ABC",),
]

example_df = spark.createDataFrame(data, schema)

print("Original DataFrame:")
example_df.show()

# Test the function
print("\nExtracting all numbers:")
result = get_numbers_from_string(example_df, "mixed_column")
result.show()

# Stop the SparkSession
spark.stop()

Original DataFrame:


                                                                                

+---------------+
|   mixed_column|
+---------------+
|123ABC456DEF789|
|987DEF654ABC321|
|111AAA222BBB333|
|         XYZ789|
|         456ABC|
+---------------+


Extracting all numbers:
+---------------+-------------------------+
|   mixed_column|mixed_column_only_numbers|
+---------------+-------------------------+
|123ABC456DEF789|                123456789|
|987DEF654ABC321|                987654321|
|111AAA222BBB333|                111222333|
|         XYZ789|                      789|
|         456ABC|                      456|
+---------------+-------------------------+



In [22]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import StringType, StructType, StructField

# Create SparkSession
spark = SparkSession.builder.appName("TestGetTextFromString").getOrCreate()


def get_text_from_string(df: DataFrame, column: str) -> DataFrame:
    """
    Extract all alphabetic characters from a single string column containing a mix of strings and other characters.

    Args:
        df: Input PySpark DataFrame.
        column: Name of the column to process.

    Returns:
        DataFrame with an additional column containing all extracted alphabetic characters.

    Example:
        >>> df = spark.createDataFrame([("123ABC456",), ("DEF789GHI",)], ["mixed_column"])
        >>> result = get_text_from_string(df, "mixed_column")
        >>> result.show()
        +------------+----------------------+
        |mixed_column|mixed_column_only_text|
        +------------+----------------------+
        |   123ABC456|                   ABC|
        |   DEF789GHI|                DEFGHI|
        +------------+----------------------+
    """
    return df.withColumn(
        f"{column}_only_text",
        regexp_replace(col(column).cast(StringType()), r"[^a-zA-Z]", ""),
    )


# Create an example dataset
schema = StructType([StructField("mixed_column", StringType(), True)])

data = [
    ("123ABC456DEF789",),
    ("XYZ987DEF654ABC",),
    ("111aaa222BBB333",),
    ("789XYZ",),
    ("ABCdef123",),
]

example_df = spark.createDataFrame(data, schema)

print("Original DataFrame:")
example_df.show()

# Test the function
print("\nExtracting all text:")
result = get_text_from_string(example_df, "mixed_column")
result.show()

# Stop the SparkSession

Original DataFrame:


[Stage 0:>                                                          (0 + 1) / 1]

+---------------+
|   mixed_column|
+---------------+
|123ABC456DEF789|
|XYZ987DEF654ABC|
|111aaa222BBB333|
|         789XYZ|
|      ABCdef123|
+---------------+


Extracting all text:
+---------------+----------------------+
|   mixed_column|mixed_column_only_text|
+---------------+----------------------+
|123ABC456DEF789|                ABCDEF|
|XYZ987DEF654ABC|             XYZDEFABC|
|111aaa222BBB333|                aaaBBB|
|         789XYZ|                   XYZ|
|      ABCdef123|                ABCdef|
+---------------+----------------------+



                                                                                

In [23]:
from pyspark.sql import DataFrame, Column
from pyspark.sql.functions import col, lit, lower, trim, when
from pyspark.sql.types import StringType
from typing import List, Optional, Union, Dict


def _validate_column_existence(df: DataFrame, columns: Optional[List[str]]) -> None:
    """
    Validates the existence of specified columns in the DataFrame.

    Args:
        df: Input PySpark DataFrame.
        columns: List of columns to validate.

    Raises:
        ValueError: If the DataFrame is empty or if any specified columns don't exist.
    """
    if df.rdd.isEmpty():
        raise ValueError("Input DataFrame is empty.")

    if columns:
        non_existent_cols = set(columns) - set(df.columns)
        if non_existent_cols:
            raise ValueError(f"Columns not found in DataFrame: {non_existent_cols}")


def _get_string_columns(df: DataFrame) -> List[str]:
    """
    Retrieves all string columns from the DataFrame.

    Args:
        df: Input PySpark DataFrame.

    Returns:
        List of string column names.
    """
    return [
        field.name
        for field in df.schema.fields
        if isinstance(field.dataType, StringType)
    ]


def _create_missing_value_expression(column: str, custom_patterns: List[str]) -> Column:
    """
    Creates an expression to identify missing values based on default and custom patterns.

    Args:
        column: Column name for which the missing value expression is created.
        custom_patterns: List of custom regex patterns to identify missing values.

    Returns:
        Column expression with missing values replaced by None.
    """
    base_patterns = [r"^[\s\W]*$", r"^\s*$", r".*null.*", r".*n/a.*"]
    all_patterns = base_patterns + custom_patterns
    pattern = "|".join(f"({p})" for p in all_patterns)

    return when(
        trim(lower(col(column))).rlike(pattern) | col(column).isNull(), lit(None)
    ).otherwise(col(column))


def replace_missing_values_with_none(
    df: DataFrame,
    columns: Optional[Union[str, List[str]]] = None,
    missing_patterns: Optional[Dict[str, List[str]]] = None,
) -> DataFrame:
    """
    Replace missing values with None in specified columns of a PySpark DataFrame.

    Args:
        df: Input PySpark DataFrame.
        columns: Column(s) to process. If None, all string columns are processed.
        missing_patterns: Custom patterns to identify missing values for specific columns.

    Returns:
        DataFrame with missing values replaced by None.

    Raises:
        ValueError: If the input DataFrame is empty or if specified columns don't exist.
    """
    columns_to_validate = [columns] if isinstance(columns, str) else columns
    _validate_column_existence(df, columns_to_validate)

    columns = (
        _get_string_columns(df)
        if columns is None
        else [columns] if isinstance(columns, str) else columns
    )
    missing_patterns = missing_patterns or {}

    for column in columns:
        custom_patterns = missing_patterns.get(column, [])
        df = df.withColumn(
            column, _create_missing_value_expression(column, custom_patterns)
        )

    return df


# Define schema for test data
schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("value", StringType(), True),
        StructField("comment", StringType(), True),
    ]
)

# Create test data
data = [
    ("1", "John", "", "Normal"),
    ("2", "Jane", "n/a", "Contains null"),
    ("3", "Bob", "@#$", "Special characters"),
    ("4", "Alice", "N/A", "Common missing value indicator"),
    ("5", "Charlie", " ", "Empty string"),
    ("6", "David", "NULL", "Uppercase NULL"),
    ("7", "Eve", "nice", "Only whitespace"),
    ("8", "Frank", "---", "Dashes"),
    ("9", "Grace", "null", "Lowercase null"),
    ("10", "Henry", "None", "None string"),
]

# Create DataFrame
df = spark.createDataFrame(data, schema)

print("Original DataFrame:")
df.show(truncate=False)

# Test scenario 1: Replace missing values in all string columns
result1 = replace_missing_values_with_none(df)
print("\nScenario 1: Replace missing values in all string columns:")
result1.show(truncate=False)

# Test scenario 2: Replace missing values in specific columns
result2 = replace_missing_values_with_none(df, columns=["value", "comment"])
print("\nScenario 2: Replace missing values in 'value' and 'comment' columns:")
result2.show(truncate=False)

# Test scenario 3: Use custom missing patterns
custom_patterns = {"value": [r"---", r"None"], "comment": [r"Normal"]}
result3 = replace_missing_values_with_none(
    df, columns=["value", "comment"], missing_patterns=custom_patterns
)
print("\nScenario 3: Use custom missing patterns:")
result3.show(truncate=False)

# Stop SparkSession

Original DataFrame:
+---+-------+-----+------------------------------+
|id |name   |value|comment                       |
+---+-------+-----+------------------------------+
|1  |John   |     |Normal                        |
|2  |Jane   |n/a  |Contains null                 |
|3  |Bob    |@#$  |Special characters            |
|4  |Alice  |N/A  |Common missing value indicator|
|5  |Charlie|     |Empty string                  |
|6  |David  |NULL |Uppercase NULL                |
|7  |Eve    |nice |Only whitespace               |
|8  |Frank  |---  |Dashes                        |
|9  |Grace  |null |Lowercase null                |
|10 |Henry  |None |None string                   |
+---+-------+-----+------------------------------+


Scenario 1: Replace missing values in all string columns:
+---+-------+-----+------------------------------+
|id |name   |value|comment                       |
+---+-------+-----+------------------------------+
|1  |John   |null |Normal                        |
|2

In [73]:
from pyspark.sql import DataFrame
from typing import Union
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import re


def replace_substring_after_string(
    df: DataFrame,
    column: str,
    target: str,
    after_string: str,
    replacement: Union[str, None],
    whole_word: bool = False,
) -> DataFrame:
    """
    Replace or remove a substring, but only after a specified string occurs in the text.

    This function replaces a substring in a single column of a PySpark DataFrame
    after a specified string. It supports whole word replacements and character removal.

    Args:
        df (DataFrame): Input DataFrame.
        column (str): Name of the column to process.
        target (str): The substring to replace.
        after_string (str): The string after which the replacement should occur.
        replacement (Union[str, None]): The replacement string. Use an empty string or None to remove the substring.
        whole_word (bool): If True, only whole words will be replaced. Defaults to False.

    Returns:
        DataFrame: DataFrame with the substring replaced or removed after the specified string.

    Raises:
        ValueError: If the specified column does not exist in the DataFrame.
        TypeError: If the input types are incorrect.

    Examples:
        >>> df = spark.createDataFrame([
        ...     ("The quick brown fox jumps over the quick dog. The fox is quick.",)
        ... ], ["text"])
        >>> # Example 1: Replace "quick" with "slow" after "over the"
        >>> result1 = replace_substring_after_string(
        ...     df=df,
        ...     column="text",
        ...     target="quick",
        ...     after_string="over the",
        ...     replacement="slow"
        ... )
        >>> result1.show(truncate=False)
        +-----------------------------------------------------------------------------------+
        |text                                                                               |
        +-----------------------------------------------------------------------------------+
        |The quick brown fox jumps over the slow dog. The fox is quick.                     |
        +-----------------------------------------------------------------------------------+
    """
    # Error checking
    if column not in df.columns:
        raise ValueError(f"Column '{column}' does not exist in the DataFrame.")

    if not isinstance(target, str):
        raise TypeError("The 'target' must be a string.")

    if not isinstance(after_string, str):
        raise TypeError("The 'after_string' must be a string.")

    if not (isinstance(replacement, str) or replacement is None):
        raise TypeError("The 'replacement' must be a string or None.")

    if replacement is None:
        replacement = ""

    # Define the UDF function
    def replace_after(text):
        if text is None:
            return None

        # Find the last occurrence of after_string
        after_pos = text.find(after_string)
        if after_pos == -1:
            # after_string not found, return original text
            return text

        # Position after after_string
        start_pos = after_pos + len(after_string)

        before = text[:start_pos]
        after = text[start_pos:]

        # Prepare the pattern for target
        if whole_word:
            pattern = r"\b" + re.escape(target) + r"\b"
        else:
            pattern = re.escape(target)

        # Replace the target in the after part
        after_modified = re.sub(pattern, replacement, after)

        # Return the concatenation
        return before + after_modified

    # Register the UDF
    replace_udf = udf(replace_after, StringType())

    # Apply the UDF to the DataFrame
    df = df.withColumn(column, replace_udf(col(column)))

    return df


# Test example dataset
df = spark.createDataFrame(
    [("The quick brown fox jumps over the quick dog. The fox is quick.",)], ["text"]
)

# Example 1: Replace "quick" with "slow" after "over the"
result1 = replace_substring_after_string(
    df=df, column="text", target="quick", after_string="over the", replacement="slow"
)
result1.show(truncate=False)

# Example 2: Remove "fox" after "The" (last occurrence)
result2 = replace_substring_after_string(
    df=df, column="text", target="fox", after_string="The", replacement=""
)
result2.show(truncate=False)

# Example 3: Replace "quick" with "swift" after "The" using whole word option
result3 = replace_substring_after_string(
    df=df,
    column="text",
    target="quick",
    after_string="The",
    replacement="swift",
    whole_word=True,
)
result3.show(truncate=False)

+-------------------------------------------------------------+
|text                                                         |
+-------------------------------------------------------------+
|The quick brown fox jumps over the slow dog. The fox is slow.|
+-------------------------------------------------------------+

+---------------------------------------------------------+
|text                                                     |
+---------------------------------------------------------+
|The quick brown  jumps over the quick dog. The  is quick.|
+---------------------------------------------------------+

+---------------------------------------------------------------+
|text                                                           |
+---------------------------------------------------------------+
|The swift brown fox jumps over the swift dog. The fox is swift.|
+---------------------------------------------------------------+



In [75]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test").getOrCreate()

# Sample data to test the function with a timestamp column
data = [
    ("2023-09-28 12:45:00",),
    ("2024-01-01 10:15:30",),
    ("2023-12-25 23:59:59",),
]

# Create DataFrame
df_test = spark.createDataFrame(data, ["timestamp_col"])

# Display the original dataset
print("Original DataFrame:")
df_test.show(truncate=False)


# Function to cast timestamp to date
def cast_timestamp_to_date(df: DataFrame, columns: list[str]) -> DataFrame:
    """
    Casts the specified timestamp columns to date format in the given DataFrame.

    Parameters
    ----------
    df : DataFrame
        The input DataFrame containing the timestamp columns to be cast.
    columns : list of str
        A list of column names in the DataFrame that need to be cast to date format.

    Returns
    -------
    DataFrame
        The DataFrame with the specified timestamp columns cast to date format.
    """
    for column in columns:
        df = df.withColumn(column, to_date(col(column)))
    return df


# Cast the timestamp column to date
df_result = cast_timestamp_to_date(df_test, ["timestamp_col"])

# Display the result
print("DataFrame after casting timestamp to date:")
df_result.show(truncate=False)

Original DataFrame:
+-------------------+
|timestamp_col      |
+-------------------+
|2023-09-28 12:45:00|
|2024-01-01 10:15:30|
|2023-12-25 23:59:59|
+-------------------+

DataFrame after casting timestamp to date:
+-------------+
|timestamp_col|
+-------------+
|2023-09-28   |
|2024-01-01   |
|2023-12-25   |
+-------------+



In [76]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, col

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("test").getOrCreate()

# Sample data for testing
data = [
    ("2023-09-28",),
    ("2024-01-01",),
    ("2023-12-25",),
]

# Create DataFrame
df_test = spark.createDataFrame(data, ["date_col"])

from pyspark.sql import DataFrame
from pyspark.sql.functions import date_format, col


def add_day_of_week_column(
    df: DataFrame, date_column: str, new_column_name: str = "day_of_week"
) -> DataFrame:
    """
    Adds a new column to the DataFrame with the day of the week as long text (e.g., 'Monday')
    derived from a date column.

    Parameters
    ----------
    df : DataFrame
        The input DataFrame containing the date column.
    date_column : str
        The name of the date column from which the day of the week is derived.
    new_column_name : str, optional
        The name of the new column containing the day of the week. Defaults to "day_of_week".

    Returns
    -------
    DataFrame
        DataFrame with a new column containing the day of the week as long text.

    Examples
    --------
    >>> df = spark.createDataFrame([("2023-09-28",), ("2024-01-01",)], ["date_col"])
    >>> df_with_day_of_week = add_day_of_week_column(df, "date_col")
    >>> df_with_day_of_week.show(truncate=False)
    +----------+------------+
    |date_col  |day_of_week |
    +----------+------------+
    |2023-09-28|Thursday    |
    |2024-01-01|Monday      |
    +----------+------------+
    """
    return df.withColumn(new_column_name, date_format(col(date_column), "EEEE"))


# Apply the function
df_with_day_of_week = add_day_of_week_column(df_test, "date_col")

# Display the result
df_with_day_of_week.show(truncate=False)

+----------+-----------+
|date_col  |day_of_week|
+----------+-----------+
|2023-09-28|Thursday   |
|2024-01-01|Monday     |
|2023-12-25|Monday     |
+----------+-----------+



24/09/28 18:58:47 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 2073263 ms exceeds timeout 120000 ms
24/09/28 18:58:48 WARN SparkContext: Killing executors is not supported by current scheduler.
24/09/28 18:58:48 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.

24/09/28 20:43:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Input DataFrame:
+-------+-----+------+----------+
|   name|group|gender|birth_date|
+-------+-----+------+----------+
|  Alice|    A|  Male|2023-09-01|
|    Bob|    B|Female|2023-08-15|
|Charlie|    A|  Male|2023-07-20|
|  David|    A|  Male|2023-09-01|
|    Eve|    B|Female|2023-08-15|
+-------+-----+------+----------+



                                                                                

IllegalArgumentException: requirement failed: Output column group_indexed already exists.

In [38]:
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession


def extract_email_from_column(df: DataFrame, column_name: str) -> DataFrame:
    """
    Extract email addresses from a given string column in a PySpark DataFrame.

    Parameters
    ----------
    df : DataFrame
        The input PySpark DataFrame containing the column with email addresses.
    column_name : str
        The name of the column from which to extract the email address.

    Returns
    -------
    DataFrame
        A new PySpark DataFrame with an additional column 'extracted_email' containing the extracted emails.

    Example
    -------
    df = extract_email_from_column(input_df, 'text_column')
    """
    # Define the regular expression pattern for matching email addresses
    email_pattern = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"

    # Apply regexp_extract to extract the email addresses from the specified column
    df_with_email = df.withColumn(
        "extracted_email", F.regexp_extract(F.col(column_name), email_pattern, 0)
    )

    return df_with_email


# Example usage:
if __name__ == "__main__":
    spark = SparkSession.builder.appName("EmailExtractor").getOrCreate()

    # Example DataFrame
    data = [
        ("Contact me at john.doe@example.com",),
        ("My email is jane_doe123@company.org",),
        ("Reach out via admin@service.net",),
    ]

    df = spark.createDataFrame(data, ["text_column"])

    # Extract emails
    result_df = extract_email_from_column(df, "text_column")

    # Show the result
    result_df.show(truncate=False)

[Stage 0:>                                                          (0 + 1) / 1]

+-----------------------------------+-----------------------+
|text_column                        |extracted_email        |
+-----------------------------------+-----------------------+
|Contact me at john.doe@example.com |john.doe@example.com   |
|My email is jane_doe123@company.org|jane_doe123@company.org|
|Reach out via admin@service.net    |admin@service.net      |
+-----------------------------------+-----------------------+



                                                                                

In [40]:
def extract_street_and_house_number(df, address_column):
    """
    Extracts German street names and house numbers from an address column in a PySpark DataFrame.
    Adds two new columns: 'street_name' and 'house_number'.

    Parameters:
    df (DataFrame): Input PySpark DataFrame.
    address_column (str): Name of the column containing the address strings.

    Returns:
    DataFrame: PySpark DataFrame with additional 'street_name' and 'house_number' columns.
    """
    from pyspark.sql.functions import regexp_extract, trim, col

    # Regular expression pattern to match street name and house number
    pattern = r"^(.+?)(?:\s*)(\d+\s*[a-zA-Z]?(\s*[-/]{1,2}\s*\d+\s*[a-zA-Z]?)*\s*)$"

    # Extract street_name and house_number using regex and trim any extra spaces
    df = df.withColumn(
        "street_name", trim(regexp_extract(col(address_column), pattern, 1))
    )
    df = df.withColumn(
        "house_number", trim(regexp_extract(col(address_column), pattern, 2))
    )

    return df


from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pyspark.sql.functions as F


def test_extract_street_and_house_number_complex_cases():
    # Initialize Spark session
    spark = (
        SparkSession.builder.master("local")
        .appName("StreetExtractorComplexCasesTest")
        .getOrCreate()
    )

    # Example data covering provided edge cases
    test_data = [
        ("Musterstrasse 123",),  # Standard address
        ("Beispielallee 45- A",),  # Address with spaces and dash
        (
            "Beispielallee 45 -A",
        ),  # Address with spaces and dash in a different position
        (
            "Lindenstrasse 12 - 14",
        ),  # Address with range and extra spaces around the dash
        (
            "Hauptstrasse 25a / 2",
        ),  # Address with letter and fractional house number with spaces around slash
        (
            "Hauptstrasse 25b/ 2c",
        ),  # Complex fractional house number with letters and varying spaces
    ]

    # Create DataFrame
    df = spark.createDataFrame(test_data, ["address_column"])

    # Apply the extract_street_and_house_number function
    df_result = extract_street_and_house_number(df, "address_column")

    # Show the output to verify the result
    df_result.show(truncate=False)

    # Expected results based on provided address column examples
    expected_results = [
        ("Musterstrasse 123", "Musterstrasse", "123"),
        ("Beispielallee 45- A", "Beispielallee", "45- A"),
        ("Beispielallee 45 -A", "Beispielallee", "45 -A"),
        ("Lindenstrasse 12 - 14", "Lindenstrasse", "12 - 14"),
        ("Hauptstrasse 25a / 2", "Hauptstrasse", "25a / 2"),
        ("Hauptstrasse 25b/ 2c", "Hauptstrasse", "25b/ 2c"),
    ]

    # Assertion: Compare the expected and actual results
    for row, expected in zip(df_result.collect(), expected_results):
        assert (
            row["address_column"] == expected[0]
        ), f"Address mismatch: {row['address_column']} != {expected[0]}"
        assert (
            row["street_name"] == expected[1]
        ), f"Street mismatch: {row['street_name']} != {expected[1]}"
        assert (
            row["house_number"] == expected[2]
        ), f"House number mismatch: {row['house_number']} != {expected[2]}"

    print("All complex test cases passed.")


# Run the test
if __name__ == "__main__":
    test_extract_street_and_house_number_complex_cases()

24/09/29 11:35:23 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+---------------------+-------------+------------+
|address_column       |street_name  |house_number|
+---------------------+-------------+------------+
|Musterstrasse 123    |Musterstrasse|123         |
|Beispielallee 45- A  |             |            |
|Beispielallee 45 -A  |             |            |
|Lindenstrasse 12 - 14|Lindenstrasse|12 - 14     |
|Hauptstrasse 25a / 2 |Hauptstrasse |25a / 2     |
|Hauptstrasse 25b/ 2c |Hauptstrasse |25b/ 2c     |
+---------------------+-------------+------------+



AssertionError: Street mismatch:  != Beispielallee

In [60]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col,
    regexp_replace,
    regexp_extract,
    when,
    length,
    lit,
    concat,
)


def adjust_contract_number_format(df: DataFrame, column: str) -> DataFrame:
    """
    Adjusts the contract number format in the specified column of a PySpark DataFrame.
    The function transforms various input formats into the standardized format: "XXCC-NNNNNNN".

    Args:
        df (DataFrame): Input PySpark DataFrame.
        column (str): Name of the column containing the contract numbers.

    Returns:
        DataFrame: The original DataFrame with an additional 'Vertragsnummer' column.
    """

    # Clean the input by removing any non-alphanumeric characters (excluding spaces)
    df = df.withColumn("clean_input", regexp_replace(col(column), r"[^A-Za-z0-9 ]", ""))

    # Define regular expression patterns for different input formats
    patterns = [
        # Pattern 1: Leading zeros + Two digits + Two letters + Seven digits
        (
            r"^0*(\d{2})([A-Za-z ]{2})(\d{7})$",
            lambda m: f"{m.group(1)}{m.group(2)}-{m.group(3)}",
        ),
        # Pattern 2: Seven digits + Two letters + Two digits
        (
            r"^(\d{7})([A-Za-z ]{2})(\d{2})$",
            lambda m: f"{m.group(3)}{m.group(2)}-{m.group(1)}",
        ),
        # Pattern 3: Seven digits + Two digits + Two letters
        (
            r"^(\d{7})(\d{2})([A-Za-z ]{2})$",
            lambda m: f"{m.group(2)}{m.group(3)}-{m.group(1)}",
        ),
        # Pattern 4: Two digits + Two letters + Seven digits
        (
            r"^(\d{2})([A-Za-z ]{2})(\d{7})$",
            lambda m: f"{m.group(1)}{m.group(2)}-{m.group(3)}",
        ),
        # Pattern 5: Leading zeros + Seven digits + Two letters + Two digits
        (
            r"^0*(\d{7})([A-Za-z ]{2})(\d{2})$",
            lambda m: f"{m.group(3)}{m.group(2)}-{m.group(1)}",
        ),
        # Pattern 6: Leading zeros + Seven digits + Two digits + Two letters
        (
            r"^0*(\d{7})(\d{2})([A-Za-z ]{2})$",
            lambda m: f"{m.group(2)}{m.group(3)}-{m.group(1)}",
        ),
    ]

    # Apply patterns sequentially
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    def transform_contract_number(value):
        import re

        for pattern, formatter in patterns:
            match = re.match(pattern, value)
            if match:
                return formatter(match)
        # If no pattern matches, return the original value
        return value

    # Register the UDF
    transform_udf = udf(transform_contract_number, StringType())

    # Apply the UDF to create the 'Vertragsnummer' column
    df = df.withColumn("Vertragsnummer", transform_udf(col("clean_input")))

    # Drop the intermediate 'clean_input' column
    df = df.drop("clean_input")

    return df


# Sample Data
data = [
    ("23 B1234567",),
    ("1234567AB23",),
    ("1234567-23AB",),
    ("023AB1234567",),
    ("45CD7654321",),
    ("7654321CD45",),
    ("7654321-45CD",),
    ("045CD7654321",),
    ("12EF0987654",),
    ("0987654EF12",),
    ("0987654-12EF",),
    ("012EF0987654",),
    ("67GH3456789",),
    ("3456789GH67",),
    ("3456789-67GH",),
    ("067GH3456789",),
    ("34JK9876543",),
    ("9876543JK34",),
    ("9876543-34JK",),
    ("034JK9876543",),
    ("034 K9876543",),
    ("22 L9876543",),
    ("001 L9876543",),
    ("00001 L9876543",),
    ("220129876543",),
    ("16720129876543",),
]
df = spark.createDataFrame(data, ["input_column"])

# Apply the function
result_df = adjust_contract_number_format(df, "input_column")

# Show the result
result_df.show(truncate=False)

+------------+--------------+
|input_column|Vertragsnummer|
+------------+--------------+
|23 B1234567 |23 B-1234567  |
|1234567AB23 |23AB-1234567  |
|1234567-23AB|23AB-1234567  |
|023AB1234567|23AB-1234567  |
|45CD7654321 |45CD-7654321  |
|7654321CD45 |45CD-7654321  |
|7654321-45CD|45CD-7654321  |
|045CD7654321|45CD-7654321  |
|12EF0987654 |12EF-0987654  |
|0987654EF12 |12EF-0987654  |
|0987654-12EF|12EF-0987654  |
|012EF0987654|12EF-0987654  |
|67GH3456789 |67GH-3456789  |
|3456789GH67 |67GH-3456789  |
|3456789-67GH|67GH-3456789  |
|067GH3456789|67GH-3456789  |
|34JK9876543 |34JK-9876543  |
|9876543JK34 |34JK-9876543  |
|9876543-34JK|34JK-9876543  |
|034JK9876543|34JK-9876543  |
+------------+--------------+
only showing top 20 rows



In [62]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, regexp_replace, udf
from pyspark.sql.types import StringType
import re

# Initialize a Spark session
spark = SparkSession.builder.master("local").appName("PySparkTest").getOrCreate()

# Define a sample dataset for testing
data = {
    "input_string": [
        "AB-12 1234567 CD34",  # Example with 7 digits and letters
        "00123456789XY",  # Example with leading zeros and 7 digits
        "9876543ZAB123",  # Example with digits followed by letters
        "NothingHere123",  # Example with no 7-digit sequence
        "12-1234567",  # 7 digits with letters directly after
        "RandomStringWithoutNumbers",  # No digits
        "X001234567 YZ",  # Leading zeros followed by 7 digits
    ]
}

# Convert the data into a Pandas DataFrame first for testing
pandas_df = pd.DataFrame(data)

# Convert Pandas DataFrame to PySpark DataFrame
spark_df = spark.createDataFrame(pandas_df)


# Define the function to extract 7-digit numbers
def get_vsnr_from_string(df: DataFrame, column: str) -> DataFrame:
    """
    Extracts the 7-digit number from the specified column in a PySpark DataFrame
    and stores it in a new column called 'vsnr'.

    Args:
        df (DataFrame): Input PySpark DataFrame.
        column (str): Name of the column containing the input strings.

    Returns:
        DataFrame: The original DataFrame with an additional 'vsnr' column containing the 7-digit number.
    """

    # Clean the input by removing any non-alphanumeric characters (excluding spaces)
    df = df.withColumn("clean_input", regexp_replace(col(column), r"[^A-Za-z0-9 ]", ""))

    def extract_vsnr(value: str) -> str:
        """
        Extracts the first occurrence of 7 consecutive digits from the given value.

        Args:
            value (str): The input string from which the 7-digit number is extracted.

        Returns:
            str: The extracted 7-digit number, or an empty string if no match is found.
        """
        match = re.search(r"\d{7}", value)
        if match:
            return match.group(0)
        return ""

    # Register the UDF
    extract_vsnr_udf = udf(extract_vsnr, StringType())

    # Apply the UDF to create the 'vsnr' column
    df = df.withColumn("vsnr", extract_vsnr_udf(col("clean_input")))

    # Drop the intermediate 'clean_input' column
    df = df.drop("clean_input")

    return df


# Apply the function to the sample dataset
result_df = get_vsnr_from_string(spark_df, "input_string")

# Show the result
result_df.show(truncate=False)

+--------------------------+-------+
|input_string              |vsnr   |
+--------------------------+-------+
|AB-12 1234567 CD34        |1234567|
|00123456789XY             |0012345|
|9876543ZAB123             |9876543|
|NothingHere123            |       |
|12-1234567                |1212345|
|RandomStringWithoutNumbers|       |
|X001234567 YZ             |0012345|
+--------------------------+-------+



In [4]:
from pyspark.sql import DataFrame, Column
from pyspark.sql.functions import col
from functools import reduce
from typing import List, Union


def filter_rows_by_and_conditions(
    df: DataFrame, conditions: List[Union[Column, str]]
) -> DataFrame:
    """
    Filters a PySpark DataFrame by applying multiple conditions combined with AND logic.

    Parameters
    ----------
    df : DataFrame
        The input PySpark DataFrame to be filtered.
    conditions : List[Union[Column, str]]
        A list of conditions, where each condition is either a PySpark Column or a string
        representing a condition (e.g., "age > 30").

    Returns
    -------
    DataFrame
        The filtered DataFrame after applying all conditions combined with AND.

    Examples
    --------
    # Example 1: Filter for active users in New York with age >= 30
    conditions = [
        col("status") == "active",
        col("age") >= 30,
        col("city") == "New York"
    ]
    df_filtered = filter_rows_by_and_conditions(df, conditions)

    # Example 2: Filter for users whose city is either New York or Los Angeles
    conditions_isin = [
        col("city").isin(["New York", "Los Angeles"]),
        col("status") == "active"
    ]
    df_filtered_isin = filter_rows_by_and_conditions(df, conditions_isin)

    # Example 3: Filter for users whose city is NOT New York or Los Angeles
    conditions_is_not_in = [
        ~col("city").isin(["New York", "Los Angeles"]),
        col("status") == "active"
    ]
    df_filtered_is_not_in = filter_rows_by_and_conditions(df, conditions_is_not_in)

    # Example 4: If no conditions are provided, the original DataFrame is returned
    df_filtered_empty = filter_rows_by_and_conditions(df, [])
    """

    if not conditions:
        # If no conditions are provided, return the original DataFrame unfiltered
        return df

    # Convert any string conditions to PySpark column expressions
    condition_exprs = [
        col(condition) if isinstance(condition, str) else condition
        for condition in conditions
    ]

    # Combine all conditions with AND using reduce
    combined_condition = reduce(lambda cond1, cond2: cond1 & cond2, condition_exprs)

    # Apply the combined condition to filter the DataFrame
    return df.filter(combined_condition)


# Create sample data
data = [
    ("John Doe", 28, "active", "New York", "john.doe@gmail.com"),
    ("Jane Smith", 35, "inactive", "San Francisco", "jane.smith@yahoo.com"),
    ("Jim Brown", 40, "active", "New York", "jim.brown@gmail.com"),
    ("Lucy Black", 32, "active", "Los Angeles", "lucy.black@gmail.com"),
    ("Mike Davis", 25, "inactive", "New York", "mike.davis@hotmail.com"),
]

# Define schema
columns = ["name", "age", "status", "city", "email"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Show original DataFrame
print("Original DataFrame:")
df.show()

# Example usage with various conditions
conditions = [
    col("status") == "active",  # Active users
    col("age") >= 30,  # Age greater or equal to 30
    col("city").like("New%"),  # City starts with 'New'
]

# Filtered DataFrame
df_filtered = filter_rows_by_and_conditions(df, conditions)

# Show filtered DataFrame
print("Filtered DataFrame:")
df_filtered.show()


### 1. Using "isin"
# Example: Filter for users who live in either "New York" or "Los Angeles"
conditions_isin = [
    col("city").isin(["New York", "Los Angeles"]),  # 'isin' condition
    col("status") == "active",
]

df_filtered_isin = filter_rows_by_and_conditions(df, conditions_isin)
print("Filtered DataFrame with 'isin':")
df_filtered_isin.show()

### 2. Using "is not in"
# Example: Filter for users who do not live in "New York" or "Los Angeles"
conditions_is_not_in = [
    ~col("city").isin(
        ["New York", "Los Angeles"]
    ),  # 'is not in' condition using '~' (negation)
    col("status") == "active",
]

df_filtered_is_not_in = filter_rows_by_and_conditions(df, conditions_is_not_in)
print("Filtered DataFrame with 'is not in':")
df_filtered_is_not_in.show()

# Additional Test Cases (Edge Cases)

### 3. Empty conditions list (edge case)
# Example: No conditions provided
df_filtered_empty = filter_rows_by_and_conditions(df, [])
print("Filtered DataFrame with no conditions (empty list):")
df_filtered_empty.show()

### 4. Condition based on column with `None` values (edge case)
# Add a `None` value to test for null handling
data_with_none = [
    ("John Doe", None, "active", "New York", "john.doe@gmail.com"),
    ("Jane Smith", 35, "inactive", "San Francisco", "jane.smith@yahoo.com"),
    ("Jim Brown", 40, "active", "New York", "jim.brown@gmail.com"),
    ("Lucy Black", 32, "active", "Los Angeles", "lucy.black@gmail.com"),
    ("Mike Davis", None, "inactive", "New York", "mike.davis@hotmail.com"),
]

df_with_none = spark.createDataFrame(data_with_none, columns)

# Example: Filter for users where age is not null and status is 'active'
conditions_none = [
    col("age").isNotNull(),  # Check for non-null ages
    col("status") == "active",
]

df_filtered_none = filter_rows_by_and_conditions(df_with_none, conditions_none)
print("Filtered DataFrame with non-null age:")
df_filtered_none.show()

### 5. Invalid column name (edge case)
# Example: Filter with an invalid column name
try:
    conditions_invalid = [
        col("non_existent_column") == "some_value"  # Non-existent column
    ]
    df_filtered_invalid = filter_rows_by_and_conditions(df, conditions_invalid)
    df_filtered_invalid.show()
except Exception as e:
    print(f"Error filtering with invalid column: {e}")

Original DataFrame:
+----------+---+--------+-------------+--------------------+
|      name|age|  status|         city|               email|
+----------+---+--------+-------------+--------------------+
|  John Doe| 28|  active|     New York|  john.doe@gmail.com|
|Jane Smith| 35|inactive|San Francisco|jane.smith@yahoo.com|
| Jim Brown| 40|  active|     New York| jim.brown@gmail.com|
|Lucy Black| 32|  active|  Los Angeles|lucy.black@gmail.com|
|Mike Davis| 25|inactive|     New York|mike.davis@hotmai...|
+----------+---+--------+-------------+--------------------+

Filtered DataFrame:
+---------+---+------+--------+-------------------+
|     name|age|status|    city|              email|
+---------+---+------+--------+-------------------+
|Jim Brown| 40|active|New York|jim.brown@gmail.com|
+---------+---+------+--------+-------------------+

Filtered DataFrame with 'isin':
+----------+---+------+-----------+--------------------+
|      name|age|status|       city|               email|
+--

In [5]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from typing import Union
from datetime import date


def filter_rows_by_date_range(
    df: DataFrame,
    date_column: str,
    start_date: Union[str, date],
    end_date: Union[str, date],
) -> DataFrame:
    """
    Filters a PySpark DataFrame to include only rows where the date_column is within the specified date range.

    Parameters
    ----------
    df : DataFrame
        The input PySpark DataFrame to be filtered.
    date_column : str
        The name of the column containing date values to be filtered.
    start_date : Union[str, date]
        The start date of the range (inclusive). Can be a string in 'YYYY-MM-DD' format or a datetime.date object.
    end_date : Union[str, date]
        The end date of the range (inclusive). Can be a string in 'YYYY-MM-DD' format or a datetime.date object.

    Returns
    -------
    DataFrame
        The filtered DataFrame where the date_column is within the specified range.

    Examples
    --------
    # Filter rows where the 'event_date' is between '2023-01-01' and '2023-12-31'
    df_filtered = filter_rows_by_date_range(df, "event_date", "2023-01-01", "2023-12-31")
    """

    return df.filter((col(date_column) >= start_date) & (col(date_column) <= end_date))

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `event_date` cannot be resolved. Did you mean one of the following? [`name`, `age`, `email`, `status`, `city`].;
'Filter (('event_date >= 2023-01-01) AND ('event_date <= 2023-12-31))
+- LogicalRDD [name#292, age#293L, status#294, city#295, email#296], false


In [27]:
import re
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, trim, udf
from pyspark.sql.types import StringType
from typing import Optional


def extract_contract_number(df: DataFrame, column: str) -> DataFrame:
    """
    Cleans and extracts the contract number from a given column in a PySpark DataFrame.

    The contract number will be formatted as 'XXCC-NNNNNNN' or 'XXC-NNNNNNN' if there's only one letter.
    If the contract number cannot be extracted, the result will be None.

    Args:
        df (DataFrame): The input PySpark DataFrame.
        column (str): The column name that contains the uncleaned contract numbers.

    Returns:
        DataFrame: The original DataFrame with an additional 'contract_number_cleaned' column that contains
                   the cleaned and formatted contract numbers.
    """

    # Clean the input by trimming spaces
    df = df.withColumn("clean_input", trim(col(column)))

    # Regular expressions for different valid patterns
    patterns = [
        # Pattern 1: Two or more digits (including leading zeros) + optional space + 1 or more letters + Seven digits
        (
            r"^(\d{1,3})\s*([A-Za-z]{1,3})(\d{7})$",
            lambda m: (
                f"{m.group(1)[-2:]}{m.group(2).strip()}-{m.group(3)}"
                if len(m.group(2).strip()) > 1
                else f"{m.group(1)[-2:]} {m.group(2).strip()}-{m.group(3)}"
            ),
        ),
        # Pattern 2: Seven digits (leading zeros optional) + 2 or 3 letters + Two digits
        (
            r"^(\d{7})([A-Za-z]{2,3})(\d{2})$",
            lambda m: f"{m.group(3)}{m.group(2)}-{m.group(1)}",
        ),
        # Pattern 3: Seven digits (with or without leading zeros) + optional dash + Two digits + 1 or more letters
        (
            r"^(\d{7})-?(\d{2})([A-Za-z]{1,3})$",
            lambda m: (
                f"{m.group(2)} {m.group(3).strip()}-{m.group(1)}"
                if len(m.group(3).strip()) == 1
                else f"{m.group(2)}{m.group(3)}-{m.group(1)}"
            ),
        ),
    ]

    def transform_contract_number(value: Optional[str]) -> Optional[str]:
        if value is None:
            return None

        # Standardize by removing any non-alphanumeric characters except spaces
        value = re.sub(r"[^A-Za-z0-9 ]", "", value)

        # Loop through the patterns and apply the first one that matches
        for pattern, formatter in patterns:
            match = re.match(pattern, value)
            if match:
                # Apply the formatter and ensure no digits are removed from the second part
                cleaned_value = formatter(match)
                return cleaned_value

        # Return None if no pattern matches
        return None

    # Register the UDF
    transform_udf = udf(transform_contract_number, StringType())

    # Apply the UDF to create the 'contract_number_cleaned' column
    df = df.withColumn("contract_number_cleaned", transform_udf(col("clean_input")))

    # Drop the intermediate 'clean_input' column
    df = df.drop("clean_input")

    return df


from pyspark.sql import SparkSession, Row

# Initialize SparkSession
spark = SparkSession.builder.master("local").appName("ContractNumberTest").getOrCreate()

# Creating a more comprehensive dataset with 15 different cases to test the function
data_15_cases = [
    {
        "uncleaned_contract_number": "003 B1234567",
        "contract_number": "003 B-1234567",
    },  # Valid with leading zero in the first part
    {
        "uncleaned_contract_number": "0234567AB230",
        "contract_number": "023AB-0234567",
    },  # Leading zero removed from the first part, second part intact
    {
        "uncleaned_contract_number": "023AB0234567",
        "contract_number": "23AB-0234567",
    },  # Leading zero in first part removed, second part intact
    {
        "uncleaned_contract_number": "123CD9876543",
        "contract_number": "123CD-9876543",
    },  # No leading zeros in second part, first part adjusted
    {
        "uncleaned_contract_number": "12EF8765432",
        "contract_number": "12EF-8765432",
    },  # No changes, correct format
    {
        "uncleaned_contract_number": "001GH8765432",
        "contract_number": "01GH-8765432",
    },  # Single leading zero in first part kept
    {
        "uncleaned_contract_number": "12 J8765432",
        "contract_number": "12 J-8765432",
    },  # Single character after digits
    {
        "uncleaned_contract_number": "045KL1234567",
        "contract_number": "45KL-1234567",
    },  # Leading zero removed from first part
    {
        "uncleaned_contract_number": "00098MN6543210",
        "contract_number": "98MN-6543210",
    },  # Multiple leading zeros in first part, removed
    {
        "uncleaned_contract_number": "56OP0987654",
        "contract_number": "56OP-0987654",
    },  # No leading zero in first part, second part intact with leading zeros
    {
        "uncleaned_contract_number": "009QR0123456",
        "contract_number": "09QR-0123456",
    },  # Leading zero in first part removed, second part intact
    {
        "uncleaned_contract_number": "7ST1234567",
        "contract_number": "07ST-1234567",
    },  # Only 1 digit before letters, zero added to make it two digits
    {
        "uncleaned_contract_number": "123UV0001234",
        "contract_number": "23UV-0001234",
    },  # Leading zero preserved in the second part
    {
        "uncleaned_contract_number": "002XY0123456",
        "contract_number": "02XY-0123456",
    },  # Leading zeros removed from the first part, second part intact
    {
        "uncleaned_contract_number": "90Z0123456",
        "contract_number": "90Z-0123456",
    },  # Valid with no changes
]

# Create a PySpark DataFrame from the dataset
df_15_cases = spark.createDataFrame([Row(**row) for row in data_15_cases])

# Apply the function you have to clean the contract numbers
df_cleaned_15_cases = extract_contract_number(df_15_cases, "uncleaned_contract_number")

# Show the cleaned DataFrame
df_cleaned_15_cases.show(truncate=False)

+-------------------------+---------------+-----------------------+
|uncleaned_contract_number|contract_number|contract_number_cleaned|
+-------------------------+---------------+-----------------------+
|003 B1234567             |003 B-1234567  |03 B-1234567           |
|0234567AB230             |023AB-0234567  |null                   |
|023AB0234567             |23AB-0234567   |23AB-0234567           |
|123CD9876543             |123CD-9876543  |23CD-9876543           |
|12EF8765432              |12EF-8765432   |12EF-8765432           |
|001GH8765432             |01GH-8765432   |01GH-8765432           |
|12 J8765432              |12 J-8765432   |12 J-8765432           |
|045KL1234567             |45KL-1234567   |45KL-1234567           |
|00098MN6543210           |98MN-6543210   |null                   |
|56OP0987654              |56OP-0987654   |56OP-0987654           |
|009QR0123456             |09QR-0123456   |09QR-0123456           |
|7ST1234567               |07ST-1234567   |7ST-1

In [63]:
import re
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, trim, udf
from pyspark.sql.types import StringType
from typing import Optional

def extract_contract_number(df: DataFrame, column: str, 
                            running_number_length: int = 2, 
                            policy_number_length: int = 7) -> DataFrame:
    """
    Cleans and extracts the contract number from a given column in a PySpark DataFrame.
    
    The contract number will be formatted based on the provided running_number_length and policy_number_length.
    
    Args:
        df (DataFrame): The input PySpark DataFrame.
        column (str): The column name that contains the uncleaned contract numbers.
        running_number_length (int): Length of the running number (digits before the insurance division).
                                     Default is 2.
        policy_number_length (int): Length of the policy number (digits after the insurance division).
                                    Default is 7.

    Returns:
        DataFrame: The original DataFrame with an additional 'contract_number_cleaned' column 
                   that contains the cleaned and formatted contract numbers.
    """

    # Clean the input by trimming spaces
    df = df.withColumn("clean_input", trim(col(column)))

    # Adjust the regex pattern dynamically based on the lengths of running_number_length and policy_number_length
    patterns = [
        # Pattern 1: Dynamic length for running number + optional space + 1 or more letters + dynamic length policy_number
        (
            rf"^(\d{{1,{running_number_length + 1}}})\s*([A-Za-z]{{1,3}})(\d{{{policy_number_length}}})$",
            lambda m: f"{m.group(1)[-running_number_length:]}{m.group(2).strip()}-{m.group(3)}"
            if len(m.group(2).strip()) > 1 else f"{m.group(1)[-running_number_length:]} {m.group(2).strip()}-{m.group(3)}",
        ),
        # Pattern 2: policy_number_length digits + 2 or 3 letters + running_number_length digits
        (
            rf"^(\d{{{policy_number_length}}})([A-Za-z]{{2,3}})(\d{{{running_number_length}}})$",
            lambda m: f"{m.group(3)}{m.group(2)}-{m.group(1)}",
        ),
        # Pattern 3: policy_number_length digits + optional dash + running_number_length digits + 1 or more letters
        (
            rf"^(\d{{{policy_number_length}}})-?(\d{{{running_number_length}}})([A-Za-z]{{1,3}})$",
            lambda m: f"{m.group(2)} {m.group(3).strip()}-{m.group(1)}"
            if len(m.group(3).strip()) == 1 else f"{m.group(2)}{m.group(3)}-{m.group(1)}",
        ),
    ]

    def transform_contract_number(value: Optional[str]) -> Optional[str]:
        if value is None:
            return None

        # Standardize by removing any non-alphanumeric characters except spaces
        value = re.sub(r"[^A-Za-z0-9 ]", "", value)

        # Loop through the patterns and apply the first one that matches
        for pattern, formatter in patterns:
            match = re.match(pattern, value)
            if match:
                # Apply the formatter and return the cleaned value
                return formatter(match)
        
        # Return None if no pattern matches
        return None

    # Register the UDF
    transform_udf = udf(transform_contract_number, StringType())

    # Apply the UDF to create the 'contract_number_cleaned' column
    df = df.withColumn("contract_number_cleaned", transform_udf(col("clean_input")))

    # Drop the intermediate 'clean_input' column
    df = df.drop("clean_input")

    return df


from pyspark.sql import SparkSession, Row

# Initialize SparkSession
spark = SparkSession.builder.master("local").appName("ContractNumberTest").getOrCreate()

# Creating a comprehensive dataset with 15 different cases to test the dynamic function
data_dynamic_test_cases = [
    {"uncleaned_contract_number": "03 B123456712451", "contract_number": "03 B-1234567123423"},   # Default case
    {"uncleaned_contract_number": "0234567AB23", "contract_number": "23AB-0234567"},   # Leading zero removed from first part
    {"uncleaned_contract_number": "023AB0234567", "contract_number": "23AB-0234567"},  # Leading zero in first part removed
    {"uncleaned_contract_number": "123CD9876543", "contract_number": "23CD-9876543"},  # Default format
    {"uncleaned_contract_number": "12EF876543234", "contract_number": "12EF-8765432"},   # Valid without changes
    {"uncleaned_contract_number": "001GH8765432", "contract_number": "01GH-8765432"},  # Leading zeros preserved in the first part
    {"uncleaned_contract_number": "112 J8765432", "contract_number": "12 J-8765432"},   # Single character
    {"uncleaned_contract_number": "045KL1234567", "contract_number": "45KL-1234567"},  # Leading zero removed in first part
    {"uncleaned_contract_number": "00098MN6543210", "contract_number": "98MN-6543210"}, # Leading zeros removed from first part
    {"uncleaned_contract_number": "560OP0987654", "contract_number": "56OP-0987654"},   # Leading zeros preserved in second part
    {"uncleaned_contract_number": "009QR0123456", "contract_number": "09QR-0123456"},  # Leading zero in first part removed
    {"uncleaned_contract_number": "7ST1234567", "contract_number": "07ST-1234567"},    # One digit before letters, zero added
    {"uncleaned_contract_number": "123UV0001234", "contract_number": "23UV-0001234"},  # Leading zeros preserved in second part
    {"uncleaned_contract_number": "002XY0123456", "contract_number": "02XY-0123456"},  # Leading zeros in the first part removed
    {"uncleaned_contract_number": "90Z0123456", "contract_number": "90Z-0123456"},     # Valid with no changes
]

# Create a PySpark DataFrame from the dataset
df_dynamic_test_cases = spark.createDataFrame([Row(**row) for row in data_dynamic_test_cases])

# Apply the dynamic function with default running_number_length = 4, policy_number_length = 10
df_cleaned_dynamic_test_cases = extract_contract_number(df_dynamic_test_cases, "uncleaned_contract_number", running_number_length=2, policy_number_length=7)

# Show the cleaned DataFrame
df_cleaned_dynamic_test_cases.show(truncate=False)


+-------------------------+------------------+-----------------------+
|uncleaned_contract_number|contract_number   |contract_number_cleaned|
+-------------------------+------------------+-----------------------+
|03 B123456712451         |03 B-1234567123423|null                   |
|0234567AB23              |23AB-0234567      |23AB-0234567           |
|023AB0234567             |23AB-0234567      |23AB-0234567           |
|123CD9876543             |23CD-9876543      |23CD-9876543           |
|12EF876543234            |12EF-8765432      |null                   |
|001GH8765432             |01GH-8765432      |01GH-8765432           |
|112 J8765432             |12 J-8765432      |12 J-8765432           |
|045KL1234567             |45KL-1234567      |45KL-1234567           |
|00098MN6543210           |98MN-6543210      |null                   |
|560OP0987654             |56OP-0987654      |60OP-0987654           |
|009QR0123456             |09QR-0123456      |09QR-0123456           |
|7ST12

In [65]:
import re
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, trim, udf
from pyspark.sql.types import StringType
from typing import Optional

def extract_policy_number(df: DataFrame, column: str, policy_number_length: int = 7) -> DataFrame:
    """
    Extracts the policy number from the uncleaned contract number in the specified column.
    
    The function extracts the sequence of digits based on the given policy_number_length 
    and stores the result in a new column called 'policy_number'.
    
    Args:
        df (DataFrame): Input PySpark DataFrame.
        column (str): Name of the column containing the uncleaned contract numbers.
        policy_number_length (int): Length of the policy number to extract (default: 7).

    Returns:
        DataFrame: The original DataFrame with an additional 'policy_number' column.
    """

    def extract_policy(value: Optional[str]) -> Optional[str]:
        if value is None:
            return None

        # Standardize by removing any non-alphanumeric characters except spaces
        value = re.sub(r"[^A-Za-z0-9 ]", "", value)

        # Use a regular expression to find the policy number (sequence of digits of specified length)
        pattern = rf"(\d{{{policy_number_length}}})"
        match = re.search(pattern, value)  # Extract the specified number of consecutive digits
        if match:
            return match.group(1)
        
        return None

    # Register the UDF for policy number extraction
    extract_policy_udf = udf(extract_policy, StringType())

    # Apply the UDF to extract the policy number
    df = df.withColumn("policy_number", extract_policy_udf(trim(col(column))))

    return df


# Example data
data = [
    {"uncleaned_contract_number": "23 B1234567"},
    {"uncleaned_contract_number": "1234567AB23"},
    {"uncleaned_contract_number": "1234567-23AB"},
    {"uncleaned_contract_number": "023AB1234567"},
    {"uncleaned_contract_number": "45CD7654321"},
    {"uncleaned_contract_number": "7654321CD45"},
    {"uncleaned_contract_number": "7654321-45CD"},
    {"uncleaned_contract_number": "045CD7654321"},
    {"uncleaned_contract_number": "12EF0987654"},
    {"uncleaned_contract_number": "0987654EF12"},
    {"uncleaned_contract_number": "0987654-12EF"},
    {"uncleaned_contract_number": "012EF0987654"},
    {"uncleaned_contract_number": "67GH3456789"},
    {"uncleaned_contract_number": "3456789GH67"},
    {"uncleaned_contract_number": "3456789-67GH"},
    {"uncleaned_contract_number": "067GH3456789"},
    {"uncleaned_contract_number": "34JK9876543"},
    {"uncleaned_contract_number": "9876543JK34"},
    {"uncleaned_contract_number": "9876543-34JK"},
    {"uncleaned_contract_number": "9876543-34LVB"},
    {"uncleaned_contract_number": "34LVB9876543"},
    {"uncleaned_contract_number": "34LVB 9876543"},
    {"uncleaned_contract_number": "9876543LVB34"}
]

# Create PySpark DataFrame
df = spark.createDataFrame([Row(**row) for row in data])

# Apply the function to extract policy numbers
df_with_policy_numbers = extract_policy_number(df, "uncleaned_contract_number")

# Show the resulting DataFrame with policy numbers
df_with_policy_numbers.show(truncate=False)


+-------------------------+-------------+
|uncleaned_contract_number|policy_number|
+-------------------------+-------------+
|23 B1234567              |1234567      |
|1234567AB23              |1234567      |
|1234567-23AB             |1234567      |
|023AB1234567             |1234567      |
|45CD7654321              |7654321      |
|7654321CD45              |7654321      |
|7654321-45CD             |7654321      |
|045CD7654321             |7654321      |
|12EF0987654              |0987654      |
|0987654EF12              |0987654      |
|0987654-12EF             |0987654      |
|012EF0987654             |0987654      |
|67GH3456789              |3456789      |
|3456789GH67              |3456789      |
|3456789-67GH             |3456789      |
|067GH3456789             |3456789      |
|34JK9876543              |9876543      |
|9876543JK34              |9876543      |
|9876543-34JK             |9876543      |
|9876543-34LVB            |9876543      |
+-------------------------+-------

In [67]:
import re
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, trim, udf
from pyspark.sql.types import StringType
from typing import Optional

def extract_running_number(df: DataFrame, column: str, running_number_length: int = 2) -> DataFrame:
    """
    Extracts the running number from the uncleaned contract number in the specified column.
    
    The function extracts the sequence of digits at the start of the uncleaned contract number 
    based on the given running_number_length and stores the result in a new column called 'running_number'.
    
    Args:
        df (DataFrame): Input PySpark DataFrame.
        column (str): Name of the column containing the uncleaned contract numbers.
        running_number_length (int): Maximum length of the running number to extract (default: 2).

    Returns:
        DataFrame: The original DataFrame with an additional 'running_number' column.
    """

    def extract_running(value: Optional[str]) -> Optional[str]:
        if value is None:
            return None

        # Standardize by removing any non-alphanumeric characters except spaces
        value = re.sub(r"[^A-Za-z0-9 ]", "", value)

        # Use a regular expression to find the running number (sequence of digits at the start)
        pattern = rf"^(\d{{1,{running_number_length}}})"  # Capture the first 1 to running_number_length digits
        match = re.search(pattern, value)  # Match running number at the start
        if match:
            return match.group(1)
        
        return None

    # Register the UDF for running number extraction
    extract_running_udf = udf(extract_running, StringType())

    # Apply the UDF to extract the running number
    df = df.withColumn("running_number", extract_running_udf(trim(col(column))))

    return df

# Example data
data = [
    {"uncleaned_contract_number": "23 B1234567"},
    {"uncleaned_contract_number": "1234567AB23"},
    {"uncleaned_contract_number": "1234567-23AB"},
    {"uncleaned_contract_number": "023AB1234567"},
    {"uncleaned_contract_number": "45CD7654321"},
    {"uncleaned_contract_number": "7654321CD45"},
    {"uncleaned_contract_number": "7654321-45CD"},
    {"uncleaned_contract_number": "045CD7654321"},
    {"uncleaned_contract_number": "12EF0987654"},
    {"uncleaned_contract_number": "0987654EF12"}
]

# Create PySpark DataFrame
df = spark.createDataFrame([Row(**row) for row in data])

# Apply the function with the default running_number_length = 2
df_with_running_numbers = extract_running_number(df, "uncleaned_contract_number")

# Show the resulting DataFrame with running numbers
df_with_running_numbers.show(truncate=False)


+-------------------------+--------------+
|uncleaned_contract_number|running_number|
+-------------------------+--------------+
|23 B1234567              |23            |
|1234567AB23              |12            |
|1234567-23AB             |12            |
|023AB1234567             |02            |
|45CD7654321              |45            |
|7654321CD45              |76            |
|7654321-45CD             |76            |
|045CD7654321             |04            |
|12EF0987654              |12            |
|0987654EF12              |09            |
+-------------------------+--------------+



24/10/04 17:39:26 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 951592 ms exceeds timeout 120000 ms
24/10/04 17:39:26 WARN SparkContext: Killing executors is not supported by current scheduler.
24/10/04 17:39:26 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.B