In [None]:
# Load the transaction workbook into pandas a single time
df1 = pd.read_excel('test_4.xlsx')

# Describe every workbook column to Spark as a nullable string field
df1_schema = StructType([
    StructField(column_label, StringType(), True)
    for column_label in df1.columns
])

# Hand the pandas frame to Spark using that all-string schema
df = spark.createDataFrame(df1, schema=df1_schema)

# Reference workbooks used by the second part of the logic.
# Each one is read with pandas and converted to a Spark DataFrame whose
# columns are all declared as nullable strings.

# Workbook 1: 'Modified_Cerlic_Banks.xlsx'
cerlic_df = pd.read_excel('Modified_Cerlic_Banks.xlsx')
cerlic_df1 = StructType([
    StructField(column_label, StringType(), True)
    for column_label in cerlic_df.columns
])
cerlic_banks_df = spark.createDataFrame(cerlic_df, schema=cerlic_df1)

# Workbook 2: 'Related party statistics .xlsx'
related_party_df = pd.read_excel('Related party statistics .xlsx')
related_party_df1 = StructType([
    StructField(column_label, StringType(), True)
    for column_label in related_party_df.columns
])
# NOTE: from here on related_party_df is the Spark DataFrame, not the pandas one
related_party_df = spark.createDataFrame(related_party_df, schema=related_party_df1)

# Keyword -> category mapping table (columns used later: KEYWORDS,
# CATEGORY_LEVEL1, CATEGORY_LEVEL2)
keywords_df = pd.read_csv("classification_keywords.csv")

# Load the corporate keywords from a JSON file.
# The encoding is pinned to UTF-8 so the result does not depend on the
# platform's default locale encoding.
with open('corporate_keywords.json', 'r', encoding='utf-8') as file:
    data = json.load(file)

# Raises KeyError if the JSON document has no 'corporate_keywords' entry
corporate_keywords = data['corporate_keywords']

# Step 2: Convert the pandas DataFrame to a dictionary for keyword lookup.
# keywords_dict maps a lower-cased keyword to its
# (CATEGORY_LEVEL1, CATEGORY_LEVEL2) pair. A keyword that appears in several
# rows keeps the categories of the last row that mentions it.
keywords_dict = {}
for index, row in keywords_df.iterrows():
    raw_keywords = row['KEYWORDS']

    # An empty cell is read as NaN; str(NaN) would register the keyword "nan",
    # which then matches unrelated text such as "finance".
    if pd.isna(raw_keywords):
        continue

    keywords_str = str(raw_keywords).strip()
    try:
        if keywords_str.startswith("[") and keywords_str.endswith("]"):
            # Cell holds a Python-style list literal, e.g. "['a', 'b']"
            keywords_list = ast.literal_eval(keywords_str)
        else:
            # Cell holds a plain comma-separated list
            keywords_list = [part.strip() for part in keywords_str.split(",")]
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
        # Only a malformed list literal is tolerated; report which row and why
        print(f"Skipping row {index}: cannot parse KEYWORDS {keywords_str!r} ({e})\n")
        continue

    categories = (row['CATEGORY_LEVEL1'], row['CATEGORY_LEVEL2'])
    for keyword in keywords_list:
        keyword = str(keyword).lower()
        # A blank keyword (trailing/double comma, '' in a list) is a substring
        # of every string and would classify every transaction.
        if not keyword.strip():
            continue
        keywords_dict[keyword] = categories

# Function to set category levels
def set_category_levels(base_txn_text, benef_name):
    """Return the (level1, level2) categories of the first matching keyword.

    Both inputs are lower-cased (None is treated as an empty string) and the
    entries of ``keywords_dict`` are tried in insertion order; a keyword
    matches when it is a substring of either text. When nothing matches the
    result is ``('OTHER TRANSFER', 'OTHER')``.
    """
    txn_text = base_txn_text.lower() if base_txn_text is not None else ''
    name_text = benef_name.lower() if benef_name is not None else ''

    matching_levels = (
        (level1, level2)
        for keyword, (level1, level2) in keywords_dict.items()
        if keyword in txn_text or keyword in name_text
    )
    return next(matching_levels, ('OTHER TRANSFER', 'OTHER'))

# The category UDF returns a struct with two non-nullable string fields
schema = StructType([
    StructField(field_name, StringType(), False)
    for field_name in ("category_level1", "category_level2")
])

# Wrap set_category_levels as a Spark UDF producing that struct
category_udf = udf(set_category_levels, returnType=schema)

def classify_transaction(benef_ifsc, benef_account_no, source, benef_name):
    """Label a beneficiary as 'ybl_corp', 'ybl_ind', 'non_ybl_cor' or 'non_ybl_ind'.

    Args:
        benef_ifsc: Beneficiary IFSC value; the ``ybl_*`` labels are used when
            it starts with "YESB". May be None.
        benef_account_no: Accepted for signature compatibility; not used.
        source: 'current' forces 'ybl_corp' and 'saving' forces 'ybl_ind' for
            a "YESB" beneficiary. Any other value (including None) defers to
            the corporate-keyword check on the name.
        benef_name: Beneficiary name; may be None (treated as empty).

    Returns:
        One of the four label strings listed above.
    """
    is_ybl = bool(benef_ifsc) and benef_ifsc.startswith("YESB")

    # For a "YESB" beneficiary an explicit source decides the label on its
    # own, so the keyword scan is skipped entirely.
    if is_ybl:
        if source == 'current':
            return 'ybl_corp'
        if source == 'saving':
            return 'ybl_ind'

    # Spark hands SQL NULL to a Python UDF as None; calling .lower() on it
    # would abort the whole job.
    name = str(benef_name).lower() if benef_name else ''

    # The name is lower-cased, so the keywords are lower-cased as well to keep
    # the comparison case-insensitive. Blank keywords are ignored because ''
    # is a substring of every name.
    is_corporate = any(
        keyword and str(keyword).lower() in name
        for keyword in corporate_keywords
    )

    if is_ybl:
        return 'ybl_corp' if is_corporate else 'ybl_ind'
    return 'non_ybl_cor' if is_corporate else 'non_ybl_ind'

# Wrap classify_transaction as a string-returning Spark UDF
classify_transaction_udf = udf(classify_transaction, returnType=StringType())

# The UDF always receives a 'source' argument, so supply an all-null column
# when the input data does not carry one
if 'source' not in df.columns:
    df = df.withColumn('source', lit(None))

# Store the UDF's label in 'cor_ind_benf'; a null 'source' is handled inside
# the UDF
classification_inputs = [
    col(column_name)
    for column_name in ('benef_ifsc', 'benef_account_no', 'source', 'benef_name')
]
df = df.withColumn('cor_ind_benf', classify_transaction_udf(*classification_inputs))

# Compute both category levels in one UDF call and keep the resulting struct
# in a temporary 'categories' column
category_struct = category_udf(col('base_txn_text'), col('benef_name'))
df = df.withColumn('categories', category_struct)

# Copy the first struct field out into its own top-level column
level1_field = col('categories')['category_level1']
df = df.withColumn('category_level1', level1_field)

# Define the UDF to check if all words in remitter_name are in benef_name
def all_words_present(remitter_name, benef_name):
    """Return True when every word of ``remitter_name`` occurs in ``benef_name``.

    The comparison is case-insensitive and works on whitespace-separated
    words, so repeated or leading/trailing whitespace does not matter.

    Args:
        remitter_name: Remitter name, or None.
        benef_name: Beneficiary name, or None.

    Returns:
        False when either name is None or the remitter name contains no
        words; otherwise whether the remitter's words are a subset of the
        beneficiary's words.
    """
    # Spark passes SQL NULL as None; a missing name can never be a match
    if remitter_name is None or benef_name is None:
        return False

    # split() with no argument drops empty tokens, unlike split(' ') which
    # turns "a  b" into ['a', '', 'b'] and breaks the subset test
    remitter_words = set(remitter_name.lower().split())
    if not remitter_words:
        # An empty set is a subset of everything; a blank name is not a match
        return False

    benef_words = set(benef_name.lower().split())
    return remitter_words.issubset(benef_words)

# Wrap all_words_present as a boolean Spark UDF
all_words_present_udf = udf(all_words_present, returnType=BooleanType())

# category_level2 is 'PERSONAL TRANSFER' when the UDF reports that the
# remitter's words all appear in the beneficiary name; otherwise it takes the
# keyword-derived value stored in the 'categories' struct
names_overlap = all_words_present_udf(col('remitter_name'), col('benef_name'))
keyword_level2 = col('categories')['category_level2']
df = df.withColumn(
    'category_level2',
    when(names_overlap, 'PERSONAL TRANSFER').otherwise(keyword_level2)
)

# The helper columns are no longer needed
df = df.drop('categories', 'source')


# Define clean string UDF
def clean_string(s):
    """Return ``s`` lower-cased with every space character removed.

    Falsy input (None or an empty string) yields an empty string. Only the
    space character is removed; other whitespace is kept.
    """
    if not s:
        return ""
    without_spaces = "".join(s.split(" "))
    return without_spaces.lower()
clean_string_udf = udf(clean_string, returnType=StringType())

# Normalise the name columns of both reference tables with clean_string
for column_name in ("Borrower Name", "Name of Lender Bank"):
    cerlic_banks_df = cerlic_banks_df.withColumn(
        column_name, clean_string_udf(cerlic_banks_df[column_name])
    )
for column_name in ("Related Party Name", "Borrower Name"):
    related_party_df = related_party_df.withColumn(
        column_name, clean_string_udf(related_party_df[column_name])
    )


# Collect each reference table to the driver and broadcast the resulting row
# lists so the lookup UDFs can read them through `.value`
cerlic_rows = cerlic_banks_df.collect()
broadcast_cerlic_banks = spark.sparkContext.broadcast(cerlic_rows)
related_party_rows = related_party_df.collect()
broadcast_related_party = spark.sparkContext.broadcast(related_party_rows)


def classify_bank_v2(beneficiary_bank_name, remitter_name, direction, category_level1, category_level2):
    """Classify the beneficiary bank of an outgoing personal transfer.

    Args:
        beneficiary_bank_name: Bank name of the beneficiary; may be None.
        remitter_name: Name of the remitter; may be None.
        direction: 'in' or 'out'.
        category_level1: Level-1 category of the transaction; may be None.
        category_level2: Level-2 category of the transaction; may be None.

    Returns:
        'N/A' for direction 'in'. For direction 'out' with a personal-transfer
        category: 'Lending Bank' when (remitter, bank) appears in
        ``broadcast_cerlic_banks``, otherwise 'Non Lending Bank'. None in
        every other case (other direction or other category).
    """
    def _normalize(text):
        # Same normalisation the reference tables receive before they are
        # broadcast (spaces removed, lower-cased); without it a raw name such
        # as "Acme Ltd" never equals the stored "acmeltd".
        return str(text).replace(" ", "").lower() if text else ""

    def _fold(category):
        # Categories are compared case-insensitively: the pipeline writes
        # 'PERSONAL TRANSFER' while this check was spelled 'Personal Transfer'.
        return str(category).strip().lower() if category else ""

    # Directly return 'N/A' if direction is 'in'
    if direction == 'in':
        return 'N/A'
    if direction != 'out':
        return None

    is_personal_transfer = (
        _fold(category_level1) == 'personal transfer'
        or _fold(category_level2) in ('personal transfer', 'transfer self')
    )
    if not is_personal_transfer:
        return None

    remitter_key = _normalize(remitter_name)
    bank_key = _normalize(beneficiary_bank_name)
    # A blank name must not pair up with a blank cell of the reference table
    if not remitter_key or not bank_key:
        return 'Non Lending Bank'

    for row in broadcast_cerlic_banks.value:
        if remitter_key == row['Borrower Name'] and bank_key == row['Name of Lender Bank']:
            return 'Lending Bank'
    return 'Non Lending Bank'

def classify_linkage_v2(benef_name, remitter_name, direction):
    """Look up the linkage between a remitter and a beneficiary.

    Args:
        benef_name: Beneficiary name; may be None.
        remitter_name: Remitter name; may be None.
        direction: 'in' or 'out'.

    Returns:
        'N/A' for direction 'in'. For direction 'out', the 'Linkages' value of
        the first row of ``broadcast_related_party`` whose
        'Related Party Name' / 'Borrower Name' equal the normalised
        beneficiary / remitter names. The string 'None' when there is no such
        row or the direction is anything else.
    """
    def _normalize(text):
        # Same normalisation the reference table receives before it is
        # broadcast (spaces removed, lower-cased); raw names never match it.
        return str(text).replace(" ", "").lower() if text else ""

    if direction == 'in':
        return 'N/A'

    if direction == 'out':
        benef_key = _normalize(benef_name)
        remitter_key = _normalize(remitter_name)
        # A blank name must not pair up with a blank cell of the reference table
        if benef_key and remitter_key:
            for row in broadcast_related_party.value:
                if benef_key == row['Related Party Name'] and remitter_key == row['Borrower Name']:
                    return row['Linkages']
    return 'None'

def flag_related_party_match(beneficiary_name, remitter_name, direction):
    """Flag whether an outgoing transfer goes to a listed related party.

    Args:
        beneficiary_name: Beneficiary name; may be None.
        remitter_name: Remitter name; may be None.
        direction: 'in' or 'out'.

    Returns:
        'N/A' for direction 'in'. 'Y' for direction 'out' when a row of
        ``broadcast_related_party`` has 'Related Party Name' / 'Borrower Name'
        equal to the normalised beneficiary / remitter names. 'N' otherwise.
    """
    def _normalize(text):
        # Same normalisation the reference table receives before it is
        # broadcast (spaces removed, lower-cased); raw names never match it.
        return str(text).replace(" ", "").lower() if text else ""

    if direction == 'in':
        return 'N/A'

    if direction == 'out':
        benef_key = _normalize(beneficiary_name)
        remitter_key = _normalize(remitter_name)
        # A blank name must not pair up with a blank cell of the reference table
        if benef_key and remitter_key:
            for row in broadcast_related_party.value:
                if benef_key == row['Related Party Name'] and remitter_key == row['Borrower Name']:
                    return 'Y'
    return 'N'

# Wrap the three lookup functions as string-returning Spark UDFs
flag_related_party_match_udf = udf(flag_related_party_match, StringType())
classify_bank_with_direction_udf = udf(classify_bank_v2, StringType())
classify_linkage_with_direction_udf = udf(classify_linkage_v2, StringType())

# Applying the flagging UDF to the dataframe
df = df.withColumn(
    'Related Party Match',
    flag_related_party_match_udf(col('benef_name'), col('remitter_name'), col('direction'))
)

# Applying modified classification functions with direction.
# The fifth argument of classify_bank_v2 is category_level2; passing
# category_level1 twice meant the level-2 category was never inspected.
df = df.withColumn(
    'Bank Classification',
    classify_bank_with_direction_udf(
        col('beneficiary_bank_name'),
        col('remitter_name'),
        col('direction'),
        col('category_level1'),
        col('category_level2'),
    )
)

# classify_linkage_v2 compares its first argument with 'Related Party Name',
# so it needs the beneficiary's name (as the flag UDF above receives), not the
# beneficiary's bank name.
df = df.withColumn(
    'Linkage Classification',
    classify_linkage_with_direction_udf(col('benef_name'), col('remitter_name'), col('direction'))
)
df.show()