In [1]:
from pyspark.sql.functions import col, collect_set, regexp_replace, coalesce, udf, asc, sort_array, regexp_replace, substring
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

In [2]:
# Alphabet used to turn IBAN characters into numbers: a character's index in this
# string is its numeric value (0-9 map to themselves, A -> 10, ..., Z -> 35).
IBAN_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"

# Per-country IBAN layout, keyed by the two-letter country code.
# "length" is the total IBAN length and is the only field Iban.validate_iban reads.
# NOTE(review): "bank_code" / "account_number" look like [start, end) offsets into
# the IBAN string; nothing in this file reads them - confirm before relying on them.
bban_format = {
    "DE": {
        "composition": "{bank_code:08d}{account_number:010d}",
        "country": "Germany",
        "length": 22,
        "bank_code": [4, 12],
        "account_number": [12, 22],
    },
}


class Iban(object):
    """Helpers for validating IBANs and computing their check digits (mod-97 scheme).

    An IBAN is laid out as: 2-letter country code, 2 check digits, then the BBAN.
    The divisor 97 is the constant of the check-digit algorithm: letters are
    replaced by their numeric values (A -> 10 ... Z -> 35), the digits are
    concatenated into one integer, and that integer is reduced modulo 97.
    """

    @classmethod
    def validate_iban(cls, iban):
        """Return True when ``iban`` has a known country, the right length and a valid checksum.

        Args:
            iban: IBAN without separators, uppercase (e.g. "DE89370400440532013000").
                ``None`` or any non-string value is treated as invalid.

        Returns:
            bool: True only if the country code is present in ``bban_format``, the
            length matches that country's "length", every character is in
            ``IBAN_ALPHABET`` and the rearranged number is congruent to 1 modulo 97.
        """
        # A SQL NULL reaches a Python UDF as None; slicing it would raise TypeError.
        if not isinstance(iban, str):
            return False
        # check if country code is valid,
        # then check if iban length is correct for that country
        length_for_country = bban_format.get(iban[:2], {}).get("length", 0)
        if not length_for_country or len(iban) != length_for_country:
            return False
        # Lowercase letters, whitespace or punctuation would make the numeric
        # conversion raise ValueError; such input is simply not a valid IBAN.
        if any(c not in IBAN_ALPHABET for c in iban):
            return False
        return cls._to_base_10_Str(iban) % 97 == 1

    @classmethod
    def _to_base_10_Str(cls, iban):
        """Convert an IBAN to the integer used by the mod-97 check.

        Despite the name, the result is an ``int``, not a string.

        Raises:
            ValueError: if ``iban`` contains a character outside ``IBAN_ALPHABET``.
        """
        # move first 4 chars (country code + check digits) to the end
        result = iban[4:] + iban[:4]
        # 0->0, 1->1, ..., A -> 10, B->11, ..., Z->35
        result = "".join(str(IBAN_ALPHABET.index(c)) for c in result)
        # int() parses the whole digit string; leading zeros do not affect the value
        return int(result)

    @classmethod
    def _iban_check_sum(cls, country_code, bban):
        """Compute the two check digits for ``country_code`` + ``bban``.

        The check digits are obtained by subtracting the mod-97 remainder from 98
        and padding the result with a leading zero if necessary.

        Args:
            country_code: two-letter uppercase country code, e.g. "DE".
            bban: the country-specific account part, characters from ``IBAN_ALPHABET``.

        Returns:
            str: the check digits as a zero-padded two-character string.

        Raises:
            ValueError: if either argument contains a character outside ``IBAN_ALPHABET``.
        """
        # "00" stands in for the still-unknown check digits
        check_string = bban + country_code + "00"
        check_string = "".join(str(IBAN_ALPHABET.index(c)) for c in check_string)
        check_sum = 98 - int(check_string) % 97
        return "%02d" % check_sum
    

# Structural IBAN check expressed as a Spark Column predicate.
# The pattern '^[A-Z]{2}[0-9]{20}$' breaks down as:
# - `^`         - Matches the start of the string.
# - `[A-Z]{2}`  - Matches exactly two uppercase letters (the country-code position).
# - `[0-9]{20}` - Matches exactly 20 digits (the check digits followed by the account part).
# - `$`         - Matches the end of the string.
# Because the pattern is anchored at both ends, a matching value is exactly
# 22 characters long, so no separate length check is needed.
# Note that this regular expression does not validate the actual content of the IBAN,
# such as whether the country code is valid or whether the check digits are correct.
# It only checks that the IBAN has the correct structure and length.
def validate_iban(iban_col):
    """Return a boolean Column that is true where ``iban_col`` looks like a 22-char IBAN.

    Args:
        iban_col: the Column to test; the check is applied to this column only,
            whatever it is called in the DataFrame.

    Returns:
        The result of ``iban_col.rlike(...)`` with the anchored pattern above.
    """
    # TODO: Implement IBAN validation using a library or service
    return iban_col.rlike('^[A-Z]{2}[0-9]{20}$')

In [3]:
# Create a SparkSession
spark = SparkSession.builder.appName("Counterparty Data").getOrCreate()

# Define the schema for the input CSV file
schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("iban", StringType(), True)
    ])

# Load the CSV file into a Spark DataFrame
# The path is named once so the read call and the error message cannot drift apart.
input_path = 'source_1_2_2_1.csv'
try:
    df = spark.read.csv(input_path, header=True, schema=schema)
except Exception as e:
    print(f"Failed to load input file {input_path}: {e}")
    # Re-raise: every later cell needs `df`, so continuing would only fail
    # further down with a less helpful NameError.
    raise

In [4]:
# Preview the raw input rows; truncate=False keeps the full name and IBAN strings visible
df.show(truncate=False)

+---+-----------------------+---------------------------+
|id |name                   |iban                       |
+---+-----------------------+---------------------------+
|1  |Puls Technologies      |DE89 3704 0044 0532 0130 00|
|2  |Widget Corp            |DE93 1001 0010 0850 8833 10|
|3  |Data Dynamics          |DE75 1007 0024 0944 3786 00|
|4  |Invalid Corp           |INVALID                    |
|5  |Puls Technologies      |DE89 3704 0044 0532 0130 00|
|6  |Widget Corp.           |DE93 1001 0010 0850 8833 10|
|7  |Data Dynamics Inc      |DE75 1007 0024 0944 3786 00|
|8  |Puls Technologies GmbH |DE89 3704 0044 0532 0130 00|
|9  |Advanced Analytics     |DE71 1002 0890 0027 1738 12|
|10 |Dynamic Data           |DE14 1007 0000 0077 2277 00|
|11 |Widget Corp            |DE93 1001 0010 0850 8833 10|
|12 |Dynamic Data           |DE14 1007 0000 0077 2277 01|
|13 |Puls Technologies      |DE89 3704 0044 0532 0130 01|
|14 |Advanced Analytics Corp|DE71 1002 0890 0027 1738 13|
|15 |Data Dyna

In [5]:
# Strip the space separators from the raw IBAN into a new column,
# leaving the original 'iban' column untouched for later display.
iban_without_spaces = regexp_replace(col("iban"), " ", "")
df_1 = df.withColumn("iban_clean", iban_without_spaces)

In [6]:
# Show the raw IBAN next to its space-free 'iban_clean' form
df_1.show(truncate=False)

+---+-----------------------+---------------------------+----------------------+
|id |name                   |iban                       |iban_clean            |
+---+-----------------------+---------------------------+----------------------+
|1  |Puls Technologies      |DE89 3704 0044 0532 0130 00|DE89370400440532013000|
|2  |Widget Corp            |DE93 1001 0010 0850 8833 10|DE93100100100850883310|
|3  |Data Dynamics          |DE75 1007 0024 0944 3786 00|DE75100700240944378600|
|4  |Invalid Corp           |INVALID                    |INVALID               |
|5  |Puls Technologies      |DE89 3704 0044 0532 0130 00|DE89370400440532013000|
|6  |Widget Corp.           |DE93 1001 0010 0850 8833 10|DE93100100100850883310|
|7  |Data Dynamics Inc      |DE75 1007 0024 0944 3786 00|DE75100700240944378600|
|8  |Puls Technologies GmbH |DE89 3704 0044 0532 0130 00|DE89370400440532013000|
|9  |Advanced Analytics     |DE71 1002 0890 0027 1738 12|DE71100208900027173812|
|10 |Dynamic Data           

In [7]:
# Define a UDF that applies the Iban.validate_iban() method to a column.
# BooleanType() declares the UDF's return type; the lambda forwards each value
# it receives to Iban.validate_iban unchanged.
validate_iban_udf = udf(lambda iban: Iban.validate_iban(iban), BooleanType())

In [8]:
# Add an 'is_valid' column holding the UDF result (length + mod-97 check) for each cleaned IBAN
df_1 = df_1.withColumn('is_valid', validate_iban_udf(col('iban_clean')))

In [9]:
# Order rows by raw IBAN first and by name second, both ascending,
# so entries sharing an IBAN end up next to each other.
sort_keys = [asc('iban'), asc('name')]
df_sorted = df_1.orderBy(*sort_keys)

In [10]:
# Inspect the sorted rows together with the checksum-based 'is_valid' flag
df_sorted.show(truncate=False)

+---+-----------------------+---------------------------+----------------------+--------+
|id |name                   |iban                       |iban_clean            |is_valid|
+---+-----------------------+---------------------------+----------------------+--------+
|10 |Dynamic Data           |DE14 1007 0000 0077 2277 00|DE14100700000077227700|false   |
|12 |Dynamic Data           |DE14 1007 0000 0077 2277 01|DE14100700000077227701|true    |
|22 |Dynamic Data           |DE14 1007 0000 0077 2277 02|DE14100700000077227702|false   |
|24 |Dynamic Data           |DE14 1007 0000 0077 2277 03|DE14100700000077227703|false   |
|9  |Advanced Analytics     |DE71 1002 0890 0027 1738 12|DE71100208900027173812|false   |
|14 |Advanced Analytics Corp|DE71 1002 0890 0027 1738 13|DE71100208900027173813|false   |
|21 |Advanced Analytics     |DE71 1002 0890 0027 1738 14|DE71100208900027173814|false   |
|26 |Advanced Analytics Corp|DE71 1002 0890 0027 1738 15|DE71100208900027173815|false   |
|3  |Data 

### The data is artificial, so for now we only check that the IBAN starts with a two-letter country code followed by 20 digits

df_2 = df_1.withColumn('is_valid', validate_iban(col('iban')))

In [11]:
 # Overwrite 'is_valid' with the regex-based structural check from validate_iban()
 # applied to the cleaned IBAN. No rows are dropped in this cell; the filtering
 # on 'is_valid' happens in a later cell.
df_2 = df_1.withColumn('is_valid', validate_iban(col('iban_clean')))

In [12]:
# Inspect the rows together with the structural 'is_valid' flag
df_2.show(truncate=False)

+---+-----------------------+---------------------------+----------------------+--------+
|id |name                   |iban                       |iban_clean            |is_valid|
+---+-----------------------+---------------------------+----------------------+--------+
|1  |Puls Technologies      |DE89 3704 0044 0532 0130 00|DE89370400440532013000|true    |
|2  |Widget Corp            |DE93 1001 0010 0850 8833 10|DE93100100100850883310|true    |
|3  |Data Dynamics          |DE75 1007 0024 0944 3786 00|DE75100700240944378600|true    |
|4  |Invalid Corp           |INVALID                    |INVALID               |false   |
|5  |Puls Technologies      |DE89 3704 0044 0532 0130 00|DE89370400440532013000|true    |
|6  |Widget Corp.           |DE93 1001 0010 0850 8833 10|DE93100100100850883310|true    |
|7  |Data Dynamics Inc      |DE75 1007 0024 0944 3786 00|DE75100700240944378600|true    |
|8  |Puls Technologies GmbH |DE89 3704 0044 0532 0130 00|DE89370400440532013000|true    |
|9  |Advan

In [14]:
  # Drop rows with invalid IBANs
# Keep only the rows whose boolean 'is_valid' flag is true
keep_valid = col('is_valid')
df_3 = df_2.where(keep_valid)

In [15]:
# Confirm that only rows flagged as valid remain
df_3.show(truncate=False)

+---+-----------------------+---------------------------+----------------------+--------+
|id |name                   |iban                       |iban_clean            |is_valid|
+---+-----------------------+---------------------------+----------------------+--------+
|1  |Puls Technologies      |DE89 3704 0044 0532 0130 00|DE89370400440532013000|true    |
|2  |Widget Corp            |DE93 1001 0010 0850 8833 10|DE93100100100850883310|true    |
|3  |Data Dynamics          |DE75 1007 0024 0944 3786 00|DE75100700240944378600|true    |
|5  |Puls Technologies      |DE89 3704 0044 0532 0130 00|DE89370400440532013000|true    |
|6  |Widget Corp.           |DE93 1001 0010 0850 8833 10|DE93100100100850883310|true    |
|7  |Data Dynamics Inc      |DE75 1007 0024 0944 3786 00|DE75100700240944378600|true    |
|8  |Puls Technologies GmbH |DE89 3704 0044 0532 0130 00|DE89370400440532013000|true    |
|9  |Advanced Analytics     |DE71 1002 0890 0027 1738 12|DE71100208900027173812|true    |
|10 |Dynam

In [16]:
# Split the cleaned IBAN into a grouping key and a trailing sub-account:
# 'bank_account' is the 16 characters starting at position 5 (1-based),
# 'subaccount' is the 2 characters starting at position 21.
iban_clean = col('iban_clean')
df_3 = (
    df_3
    .withColumn('bank_account', substring(iban_clean, 5, 16).cast(StringType()))
    .withColumn('subaccount', substring(iban_clean, 21, 2).cast(StringType()))
)

In [17]:
# Inspect the derived 'bank_account' and 'subaccount' columns
df_3.show(truncate=False)

+---+-----------------------+---------------------------+----------------------+--------+----------------+----------+
|id |name                   |iban                       |iban_clean            |is_valid|bank_account    |subaccount|
+---+-----------------------+---------------------------+----------------------+--------+----------------+----------+
|1  |Puls Technologies      |DE89 3704 0044 0532 0130 00|DE89370400440532013000|true    |3704004405320130|00        |
|2  |Widget Corp            |DE93 1001 0010 0850 8833 10|DE93100100100850883310|true    |1001001008508833|10        |
|3  |Data Dynamics          |DE75 1007 0024 0944 3786 00|DE75100700240944378600|true    |1007002409443786|00        |
|5  |Puls Technologies      |DE89 3704 0044 0532 0130 00|DE89370400440532013000|true    |3704004405320130|00        |
|6  |Widget Corp.           |DE93 1001 0010 0850 8833 10|DE93100100100850883310|true    |1001001008508833|10        |
|7  |Data Dynamics Inc      |DE75 1007 0024 0944 3786 00

In [18]:
# Group rows by bank account and gather the distinct sub-accounts, names and IBANs
def resolve_entities(df):
    """Collapse rows sharing a 'bank_account' into one row of sorted, de-duplicated lists.

    Args:
        df: DataFrame with 'bank_account', 'subaccount', 'name' and 'iban' columns.

    Returns:
        DataFrame with one row per 'bank_account' and the array columns
        'subaccount_list', 'name_list' and 'iban_list'.
    """
    def _sorted_distinct(source_column, alias):
        # collect_set de-duplicates; sort_array makes the element order deterministic
        return sort_array(collect_set(source_column)).alias(alias)

    aggregations = [
        _sorted_distinct('subaccount', 'subaccount_list'),
        _sorted_distinct('name', 'name_list'),
        _sorted_distinct('iban', 'iban_list'),
    ]
    return df.groupBy('bank_account').agg(*aggregations)

In [19]:
   
# Entity Resolution: collapse the valid rows into one row per bank account.
# NOTE(review): this rebinds df_3 to the aggregated frame, which no longer has the
# 'subaccount', 'name' or 'iban' columns that resolve_entities() reads, so
# re-running this cell without re-running the earlier ones will fail.
df_3 = resolve_entities(df_3)

In [20]:
# Show the resolved entities: one row per bank account with its collected lists
df_3.show(truncate=False)   

+----------------+------------------------+---------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|bank_account    |subaccount_list         |name_list                                    |iban_list                                                                                                                                                                     |
+----------------+------------------------+---------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1002089000271738|[12, 13, 14, 15]        |[Advanced Analytics, Advanced Analytics Corp]|[DE71 1002 0890 0027 1738 12, DE71 1002 0890 0027 1738 13, DE71 1002 0890 0027 1738 14, DE71 1002 0890 0027 1738 15]

In [21]:
# Print the schema of the aggregated frame to confirm the list columns are string arrays
df_3.printSchema()

root
 |-- bank_account: string (nullable = true)
 |-- subaccount_list: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- name_list: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- iban_list: array (nullable = false)
 |    |-- element: string (containsNull = false)



In [22]:
# Stop the SparkSession created at the top of the notebook
spark.stop()