In [46]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
import logging
from pyspark.sql.functions import regexp_replace, col, when, sum as spark_sum, to_date, window, lit
import pandas as pd


In [47]:
# Create a logger
logger = logging.getLogger('pyspark_project')
logger.setLevel(logging.INFO)  # Set the logging level

# Create a file handler that logs messages to a file
file_handler = logging.FileHandler('spark_project.log')
file_handler.setLevel(logging.INFO)  # Set the handler level

# Create a formatter and set it for the handler
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)

# Add the handler to the logger
logger.addHandler(file_handler)

In [48]:
##importing all the UDF functions
from fetch import get_all_companies_df, \
get_company_df,\
get_exchange_rates_df,\
get_all_sepa_transactions_df,\
get_all_swift_transactions_df

In [49]:
#All the pandas dataframes
limit = 1000

print("Company Details")
display(get_all_companies_df()) #this will fetch all the companies thus without any set limit

# print("Company Details by ID")
# display(get_company_df())

print("Exchange Rates")
display(get_exchange_rates_df())

print("Currency EUR")
display(get_all_sepa_transactions_df(1000))

print("Currency USD")
display(get_all_swift_transactions_df(limit))


Company Details


Unnamed: 0,id,ibans,name,address
0,1,['FR5875918640140818391314958'],Legendre Maury S.A.,Flat 6\nSara ford\nClarkstad\nW3 3XF
1,2,['GB51SJEV83747591864014'],Poirier S.A.R.L.,66 Gallagher river\nWest Jacobburgh\nB0E 5TY
2,3,['DE14065609541598144038'],Bourdon S.A.,"62, boulevard de Imbert\n52593 Hebert-les-Bains"
3,4,['DE77193919112673928344'],Samson,Nisapad 85\n2112 OC\nKootstertille
4,5,['NL28SZUK8962850954'],Trespa,rue Carpentier\n08913 David
...,...,...,...,...
95,96,['FR8971028934697447354528337'],Cameron-Buckley,"70, chemin Mathieu\n61934 Diallo-les-Bains"
96,97,['GB03FIGW90406560954159'],Fischer,Matthiasstraat 87\n4571 JK\nRhoon
97,98,['NL64VYXQ3747591864'],Scholz AG,"14, chemin Alice Guyot\n25452 Perrin"
98,99,['DE25158328581042042324'],Franke,Döhnring 864\n53661 Soltau


Exchange Rates


Unnamed: 0,currency,usd_rate,eur_rate
0,EUR,1.129,1.0
1,GBP,1.3469,0.88048
2,USD,1.0,1.0545
3,CHF,0.9188,0.9879


Currency EUR


Unnamed: 0,id,payer,receiver,amount,currency,timestamp
0,321655fb-baaa-4c1f-9db5-a18e6dcf2a9b,DE28769442194283747591,DE83269553931086167923,46698.5370,EUR,2021-01-05T23:27:55.037925Z
1,c2dac74a-1429-47e3-b79a-bc45443fad91,GB14BJFP97283080256499,FR7209925764719308621072695,78540.6545,EUR,2021-01-09T16:14:41.160841Z
2,da546b40-4090-491a-b461-ef96ae7411b0,GB51SJEV83747591864014,FR6703114243904065609541598,57279.2525,EUR,2021-01-09T16:14:41.160841Z
3,f4ddd063-8b6a-423e-83be-414859ac6229,DE22852399214609282672,DE28769442194283747591,84210.2831,EUR,2021-01-13T18:35:09.5248Z
4,9370bff3-a206-413f-814f-e91f0ed45bf1,GB53SGBR62107269553931,NL86QGLJ2281132931,19508.4356,EUR,2021-01-14T05:36:06.595899Z
...,...,...,...,...,...,...
495,f074bc52-b8d8-48cd-86ff-9038120beff3,DE45935285711210464241,NL29JIUW1905724186,47891.9428,EUR,2023-05-14T16:11:16.045573Z
496,76688b98-0099-48eb-a20e-ab1e891cf0be,FR9053931086167923317090557,NL73QLRU1042042324,5004.9713,EUR,2023-05-17T09:49:54.526892Z
497,6c8e20f3-56eb-4ed7-bdff-4a34ca025ce3,GB82GLCT20327785811877,FR9053931086167923317090557,43240.1286,EUR,2023-05-27T20:28:14.926212Z
498,20329834-e639-407f-a768-53e7e50d11ba,FR6914403835422811329318493,GB52OFVC96730974809855,98827.7993,EUR,2023-05-31T16:46:22.309889Z


Currency USD


Unnamed: 0,id,sender,beneficiary,amount,currency,timestamp
0,aafad863-2487-4a47-a6ab-f2148904b49f,DE16190572418695454227,GB41QLOD12104642412552,32777.3687,USD,2021-01-01T07:50:28.308495Z
1,c0ae366c-14e5-40de-af2b-f666a5e3042f,AT571436619956067656,CH9443043897520109122,58409.7006,GBP,2021-01-01T09:24:38.686109Z
2,98517a20-7779-4822-9c77-16386c1f6feb,NL60UXDS1126739283,FR7868098366976944219428374,86500.9390,CHF,2021-01-01T16:52:30.067201Z
3,1f3a4d6b-d377-4a8f-af7a-0d35b78822fc,AT858792009978489872,AT202035283792830612,60469.1585,USD,2021-01-01T18:28:43.838303Z
4,c8a32704-f06e-48e4-8be0-0f352130d89c,ES7702940454537092395387,FI1335283792830612,60469.1585,USD,2021-01-01T18:28:43.838303Z
...,...,...,...,...,...,...
995,5ce38e5d-f793-483a-975e-4e93f9a3f934,vutmbvreqymwlu,rofx3319la48jr30k8v2b3h6n7s2a5s3n4p6t9,4072.0000,XXX111XXX,2021-03-03T15:36:09.482687Z
996,7bab3714-320b-4a5f-bf37-a0afae7be47e,AT747255153722410438,CH3394633276677060089,30880.2752,EUR,2021-03-03T18:35:48.077191Z
997,aa69a9a7-6914-47a5-a01c-e01d5e971abf,pqitvggqbjtrvh,muvq8039fn21jo45o2t0w8v5k4n8m3p7f0s2s4,5942.0000,,2021-03-03T18:35:48.077191Z
998,c954403d-1f30-4a32-9570-6c1ccaed1f3a,FR7868098366976944219428374,NL09OGWL4528337449,28419.6349,GBP,2021-03-03T18:35:48.077191Z


In [50]:
# Initialize a Spark session
try:

    spark = SparkSession.builder \
        .appName("CashFlow") \
        .getOrCreate()


# Log the Spark session start
    logger.info("Spark session started.")

except Exception as e:
    logger.error("Error: Spark session could not start: {e}")


In [51]:
##Converting to spark DataFrame
companies_all_data = get_all_companies_df()
exchange_all_data = get_exchange_rates_df()
sepa_all_data = get_all_sepa_transactions_df(limit)
swift_all_data = get_all_swift_transactions_df(limit)
 
# Create a DataFrame
all_companies_df_spark = spark.createDataFrame(companies_all_data)
exchange_df_spark = spark.createDataFrame(exchange_all_data)
sepa_df_spark = spark.createDataFrame(sepa_all_data)
swift_df_spark = spark.createDataFrame(swift_all_data)

In [52]:
##Renameing the columns
sepa_df_spark = sepa_df_spark.withColumn("Sender", col("payer"))

# Select relevant columns and rename beneficiary to receival
sepa_df_spark = sepa_df_spark.select("id", col("payer").alias("Sender"),"receiver", "amount", "currency", "timestamp")  
sepa_df_spark.show()

+--------------------+--------------------+--------------------+----------+--------+--------------------+
|                  id|              Sender|            receiver|    amount|currency|           timestamp|
+--------------------+--------------------+--------------------+----------+--------+--------------------+
|321655fb-baaa-4c1...|DE287694421942837...|DE832695539310861...| 46698.537|     EUR|2021-01-05T23:27:...|
|c2dac74a-1429-47e...|GB14BJFP972830802...|FR720992576471930...|78540.6545|     EUR|2021-01-09T16:14:...|
|da546b40-4090-491...|GB51SJEV837475918...|FR670311424390406...|57279.2525|     EUR|2021-01-09T16:14:...|
|f4ddd063-8b6a-423...|DE228523992146092...|DE287694421942837...|84210.2831|     EUR|2021-01-13T18:35:...|
|9370bff3-a206-413...|GB53SGBR621072695...|  NL86QGLJ2281132931|19508.4356|     EUR|2021-01-14T05:36:...|
|31697b46-aa31-4cc...|  NL56PCSS6306993503|FR731676155117771...| 2833.9843|     EUR|2021-01-14T18:40:...|
|3b11242e-28a0-4cd...|  NL79ORKQ1187701939|GB1

In [53]:
#Converting Swift data currencies to EUR
 
try:
    # Join the SWIFT transactions with currency rates
    joined_df = swift_df_spark.join(exchange_df_spark, on="currency", how="inner")
    
    # Convert amount to EUR
    converted_df = joined_df.withColumn("amount", col("amount").cast("float") / col("eur_rate"))
    
    # Set currency to 'EUR' for all records
    converted_df_with_eur = converted_df.withColumn("currency", lit("EUR"))
    
    # Select relevant columns and rename beneficiary to receiver
    swift_final_df = converted_df_with_eur.select(
        "id", 
        "sender", 
        col("beneficiary").alias("receiver"), 
        "amount", 
        "currency", 
        "timestamp"
    )
    
    # Show the final DataFrame with the renamed column
    swift_final_df.show()
    
    # Count the records
    swift_final_df.count()

    logger.info("SWIFT table currencies converted to EUR.")

except Exception as e:
    logger.error("Error: SWIFT table currencies conversion to EUR stopped.: {e}")



+--------------------+--------------------+--------------------+------------------+--------+--------------------+
|                  id|              sender|            receiver|            amount|currency|           timestamp|
+--------------------+--------------------+--------------------+------------------+--------+--------------------+
|98517a20-7779-482...|  NL60UXDS1126739283|FR786809836697694...| 87560.41856463205|     EUR|2021-01-01T16:52:...|
|c7c85b83-eeee-474...|AT836336939647096974|  FI3011113343325027|28577.210022522522|     EUR|2021-01-03T01:49:...|
|4f9af593-d67f-426...|GB83MWOM599511488...|DE233542281132931...| 92025.38212369673|     EUR|2021-01-03T09:17:...|
|e2cf8c97-44f4-4f2...|GB35AZDS391911267...|DE228523992146092...| 811.5851955141588|     EUR|2021-01-03T09:17:...|
|c8b0db56-9e26-484...|DE393664215183988...|DE366816761551177...|21114.639481475857|     EUR|2021-01-05T09:07:...|
|d63dce9f-ff5d-449...|ES644568241717306...|AT570709402024025968| 30323.20373393056|     

In [54]:
##Merging the swift and sepa dataframe to create a MasterDataFrame

try:
    final_merged_df = sepa_df_spark.union(swift_final_df)
    logger.info("Merging SWIFT & SEPA tables into one")  
except Exception as e:
    logger.error("Error: Merging SWIFT & SEPA tables into one failed: {e}")


# Show the merged DataFrame
print("Merged DataFrame:") 
final_merged_df.count()
final_merged_df.show()
final_merged_df.describe()

Merged DataFrame:
+--------------------+--------------------+--------------------+----------+--------+--------------------+
|                  id|              Sender|            receiver|    amount|currency|           timestamp|
+--------------------+--------------------+--------------------+----------+--------+--------------------+
|321655fb-baaa-4c1...|DE287694421942837...|DE832695539310861...| 46698.537|     EUR|2021-01-05T23:27:...|
|c2dac74a-1429-47e...|GB14BJFP972830802...|FR720992576471930...|78540.6545|     EUR|2021-01-09T16:14:...|
|da546b40-4090-491...|GB51SJEV837475918...|FR670311424390406...|57279.2525|     EUR|2021-01-09T16:14:...|
|f4ddd063-8b6a-423...|DE228523992146092...|DE287694421942837...|84210.2831|     EUR|2021-01-13T18:35:...|
|9370bff3-a206-413...|GB53SGBR621072695...|  NL86QGLJ2281132931|19508.4356|     EUR|2021-01-14T05:36:...|
|31697b46-aa31-4cc...|  NL56PCSS6306993503|FR731676155117771...| 2833.9843|     EUR|2021-01-14T18:40:...|
|3b11242e-28a0-4cd...|  NL79

DataFrame[summary: string, id: string, Sender: string, receiver: string, amount: string, currency: string, timestamp: string]

In [55]:
##Clearing the companies ibans column
 
# Use regexp_replace to remove square brackets and single quotes
cleaned_all_companies_df = all_companies_df_spark.withColumn("ibans", regexp_replace(col("ibans"), "[\\[\\]' ]", ""))
 
# Show the result
cleaned_all_companies_df.show()
cleaned_all_companies_df.describe()

+---+--------------------+--------------------+--------------------+
| id|               ibans|                name|             address|
+---+--------------------+--------------------+--------------------+
|  1|FR587591864014081...| Legendre Maury S.A.|Flat 6\nSara ford...|
|  2|GB51SJEV837475918...|    Poirier S.A.R.L.|66 Gallagher rive...|
|  3|DE140656095415981...|        Bourdon S.A.|62, boulevard de ...|
|  4|DE771939191126739...|              Samson|Nisapad 85\n2112 ...|
|  5|  NL28SZUK8962850954|              Trespa|rue Carpentier\n0...|
|  6|FR839628509541608...|    Remmers & Gruijl|Orhan-Schottin-St...|
|  7|DE192726665299152...|Houghton, Murray ...|Jakesingel 30\n94...|
|  8|GB31YQDI403835422...|      Leblanc S.A.S.|61, chemin de Fou...|
|  9|  NL06DCVI2439040656| Guillon Lesage S.A.|avenue de Lebreto...|
| 10|  NL76AWSK4159814403|      Hayes and Sons|Schomberweg 2\n55...|
| 11|DE091679668416445...|    Wagenvoort Groep|94, boulevard Mor...|
| 12|  NL28SGBR6210726955|      Fi

DataFrame[summary: string, id: string, ibans: string, name: string, address: string]

In [56]:
###Company wise current account balance 

try:
    # Join the DataFrames on ibans with sender and receiver
    # We will perform two separate joins: one for sender and one for receiver
    sender_join = final_merged_df.join(cleaned_all_companies_df, final_merged_df.Sender == cleaned_all_companies_df.ibans, "inner") \
        .select(cleaned_all_companies_df.name, final_merged_df.amount, final_merged_df.currency, final_merged_df.timestamp)

    receiver_join = final_merged_df.join(cleaned_all_companies_df, final_merged_df.receiver == cleaned_all_companies_df.ibans, "inner") \
        .select(cleaned_all_companies_df.name, final_merged_df.amount, final_merged_df.currency, final_merged_df.timestamp)

    # Rename the amount columns to indicate whether it's from sender or receiver
    sender_join = sender_join.withColumnRenamed("amount", "sent_amount")
    receiver_join = receiver_join.withColumnRenamed("amount", "received_amount")

    # Combine sender and receiver data
    combined_df = sender_join.join(receiver_join, ["name", "currency", "timestamp"], "outer")   

    # Handle null values by replacing them with 0
    combined_df = combined_df.fillna(0)

    # Calculate the total amount for each company
    master_table_df = combined_df.groupBy("name", "currency", "timestamp") \
        .agg((F.sum("sent_amount") - F.sum("received_amount")).alias("current_amount"))

    # Show the resulting master table
    master_table_df.show(truncate=False)

    logger.info("Company wise current account balance table formed.")

except Exception as e:
    logger.error("Error: Company wise current account balance table failed: {e}")

+--------------------------+--------+---------------------------+-------------------+
|name                      |currency|timestamp                  |current_amount     |
+--------------------------+--------+---------------------------+-------------------+
|Bosch & van Haspengouw    |EUR     |2021-11-12T21:48:42.752304Z|-25644.941         |
|Gröttner                  |EUR     |2022-03-20T03:46:55.566935Z|-42050.1207        |
|Hanegraaff & Hekker       |EUR     |2021-11-26T03:00:39.389898Z|50783.115          |
|Le Gall SARL              |EUR     |2021-01-30T19:27:03.43481Z |56307.1015625      |
|Poirier S.A.R.L.          |EUR     |2021-02-06T18:04:17.050639Z|-7201.8525515647225|
|Remmers & Gruijl          |EUR     |2021-01-21T02:34:02.50913Z |67464.69784391133  |
|Simpson, Thompson and Lamb|EUR     |2021-03-23T23:27:53.407815Z|25688.251          |
|Kostolzin AG              |EUR     |2023-02-18T20:51:39.519305Z|-39156.9565        |
|Kostolzin AG              |EUR     |2023-03-28T22:44:

In [57]:
# ##Current account balance sheet by company name
# from pyspark.sql import functions as F
 
# def get_company_balance(company_name):
#     try:
#         # Join the DataFrames on ibans with sender and receiver
#         sender_join = final_merged_df.join(cleaned_all_companies_df, final_merged_df.Sender == cleaned_all_companies_df.ibans, "inner") \
#             .select(cleaned_all_companies_df.name, final_merged_df.amount, final_merged_df.currency, final_merged_df.timestamp)
#         receiver_join = final_merged_df.join(cleaned_all_companies_df, final_merged_df.receiver == cleaned_all_companies_df.ibans, "inner") \
#             .select(cleaned_all_companies_df.name, final_merged_df.amount, final_merged_df.currency, final_merged_df.timestamp)
#         # Rename the amount columns to indicate whether it's from sender or receiver
#         sender_join = sender_join.withColumnRenamed("amount", "sent_amount")
#         receiver_join = receiver_join.withColumnRenamed("amount", "received_amount")
#         # Combine sender and receiver data
#         combined_df = sender_join.join(receiver_join, ["name", "currency", "timestamp"], "outer")
#         # Handle null values by replacing them with 0
#         combined_df = combined_df.fillna(0)
#         # Calculate the total amount for each company
#         master_table_df = combined_df.groupBy("name", "currency", "timestamp") \
#             .agg((F.sum("sent_amount") - F.sum("received_amount")).alias("current_amount"))
#         # Filter by the provided company name
#         company_balance_df = master_table_df.filter(F.col("name") == company_name)
#         # Show the current balance for the given company
#         if company_balance_df.count() > 0:
#             company_balance_df.show(truncate=False)
#             logger.info(f"Current balance for {company_name} displayed.")
        
#         else:
#             logger.info(f"No balance found for {company_name}.")
#     except Exception as e:
#         logger.error(f"Error: Unable to fetch balance for {company_name}: {str(e)}")
 
# # Example usage:
# get_company_balance("Didier")

In [58]:
##Country-code to Country mapping
# Path to the CSV file
path = r'C:\Work\Freelancing\1\CashFlow\data\IBAN_country_code_map.csv'
 
# Read the CSV into a Pandas DataFrame
country_code_df_pd = pd.read_csv(path)
 
# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Create DataFrame Example") \
    .getOrCreate()
 
# Convert Pandas DataFrame to Spark DataFrame
all_countries_df_spark = spark.createDataFrame(country_code_df_pd)
 
# Show the Spark DataFrame
all_countries_df_spark.show()
all_countries_df_spark.describe()

+-------------------+------------+
|            Country|Alpha-2_code|
+-------------------+------------+
|        Afghanistan|          AF|
|            Albania|          AL|
|            Algeria|          DZ|
|     American Samoa|          AS|
|            Andorra|          AD|
|             Angola|          AO|
|           Anguilla|          AI|
|         Antarctica|          AQ|
|Antigua and Barbuda|          AG|
|          Argentina|          AR|
|            Armenia|          AM|
|              Aruba|          AW|
|          Australia|          AU|
|            Austria|          AT|
|         Azerbaijan|          AZ|
|      Bahamas (the)|          BS|
|            Bahrain|          BH|
|         Bangladesh|          BD|
|           Barbados|          BB|
|            Belarus|          BY|
+-------------------+------------+
only showing top 20 rows



DataFrame[summary: string, Country: string, Alpha-2_code: string]

In [59]:
##List of countries the client transacts within

try:

    # Extract the first two characters from Sender and Receiver as country codes
    final_merged_df = final_merged_df.withColumn("Sender_Country_Code", F.substring(F.col("Sender"), 1, 2))
    final_merged_df = final_merged_df.withColumn("Receiver_Country_Code", F.substring(F.col("Receiver"), 1, 2))

    # Perform the join on Sender's country code and Alpha-2 code
    sender_joined_df = final_merged_df.join(
        all_countries_df_spark,
        final_merged_df["Sender_Country_Code"] == all_countries_df_spark["Alpha-2_code"],
        "inner"
    ).select("id", "Sender", "Receiver", "amount", "currency", "timestamp", "Country")

    # Perform the join on Receiver's country code and Alpha-2 code
    receiver_joined_df = final_merged_df.join(
        all_countries_df_spark,
        final_merged_df["Receiver_Country_Code"] == all_countries_df_spark["Alpha-2_code"],
        "inner"
    ).select("id", "Sender", "Receiver", "amount", "currency", "timestamp", "Country")

    # Union the results from both Sender and Receiver joins and get distinct countries
    transacting_countries_df = sender_joined_df.union(receiver_joined_df).select("Country").distinct()

    # Show the final list of distinct countries where the client transacts
    transacting_countries_df.show()
    logger.info("List of countries the client transacts within table formed")

except Exception as e:
    logger.error("Error: List of countries the client transacts within table failed: {e}")

+--------------------+
|             Country|
+--------------------+
|United Kingdom of...|
|             Germany|
|              France|
|             Finland|
|   Netherlands (the)|
|               Spain|
|         Switzerland|
|             Austria|
+--------------------+



In [60]:
# ## Creating Master dataset
# from pyspark.sql import functions as F

# try:

#     # Step 1: Join final_merged_df with cleaned_all_companies_df for the sender
#     sender_join = final_merged_df.join(cleaned_all_companies_df, final_merged_df.Sender == cleaned_all_companies_df.ibans, "inner") \
#         .select(cleaned_all_companies_df.name.alias("Sender_name"), final_merged_df.amount.alias("sent_amount"), final_merged_df.currency, final_merged_df.timestamp)

#     # Step 2: Join final_merged_df with cleaned_all_companies_df for the receiver
#     receiver_join = final_merged_df.join(cleaned_all_companies_df, final_merged_df.receiver == cleaned_all_companies_df.ibans, "inner") \
#         .select(cleaned_all_companies_df.name.alias("Receiver_name"), final_merged_df.amount.alias("received_amount"), final_merged_df.currency, final_merged_df.timestamp)

#     # Step 3: Combine sender and receiver data into one DataFrame
#     combined_df = sender_join.join(receiver_join, ["currency", "timestamp"], "outer")

#     # Step 4: Handle null values (if any) by replacing them with 0
#     combined_df = combined_df.fillna({"sent_amount": 0, "received_amount": 0})

#     # Step 5: Calculate the total balance for each company
#     master_table_df = combined_df.groupBy("Sender_name", "Receiver_name", "currency", "timestamp") \
#         .agg(
#             F.sum("sent_amount").alias("total_sent"),
#             F.sum("received_amount").alias("total_received"),
#             (F.sum("received_amount") - F.sum("sent_amount")).alias("balance")
#         )

#     # Show the resulting master table
#     master_table_df.show(truncate=False)

#     logger.info("Master dataset created.")    

# except Exception as e:
#     logger.error("Error: Master dataset creation failed: {e}")    


In [61]:
## Creating Master dataset

try:
    # Initialize Spark session
    spark = SparkSession.builder.appName("HistoricalDataAnalysis").getOrCreate()
    
    # Step 1: Join transactions with company details based on sender and receiver IBANs
    # Rename the `id` columns to avoid ambiguity
    final_merged_df = final_merged_df.withColumnRenamed("id", "transaction_id")
    cleaned_all_companies_df = cleaned_all_companies_df.withColumnRenamed("id", "company_id")
    
    # Join on sender IBAN to match sending company
    joined_sender = final_merged_df.join(cleaned_all_companies_df, final_merged_df.Sender == cleaned_all_companies_df.ibans, "left").select(
        col("transaction_id"),
        col("Sender"),
        col("receiver"),
        col("amount"),
        col("currency"),
        col("timestamp"),
        col("company_id").alias("sender_company_id"),
        col("name").alias("sender_name"),
        col("address").alias("sender_address")
    )

    # joined_sender.show()
    
    # Join on receiver IBAN to match receiving company
    joined_receiver = joined_sender.join(cleaned_all_companies_df, joined_sender.receiver == cleaned_all_companies_df.ibans, "left").select(
        col("transaction_id"),
        col("Sender"),
        col("receiver"),
        col("amount").cast("double"),
        col("currency"),
        col("timestamp"),
        col("sender_company_id"),
        col("sender_name"),
        col("sender_address"),
        col("company_id").alias("receiver_company_id"),
        col("name").alias("receiver_name"),
        col("address").alias("receiver_address")
    )
    

    # joined_receiver.show()

    # Step 2: Define logic for balance calculation
    transactions_with_balance = joined_receiver.withColumn(
        "balance_change", when(col("Sender").isNotNull(), -col("amount")).otherwise(col("amount"))
    )
    
    # Step 3: Convert timestamp to customizable time intervals
    transactions_with_window = transactions_with_balance.withColumn(
        "date", to_date(col("timestamp"))
    ).groupBy(
        "receiver", "sender", "currency", window(col("timestamp"), "1 day").alias("time_window")
    ).agg(
        spark_sum("balance_change").alias("daily_balance_change")
    )
    
    # Step 4: Calculate cumulative balance
    from pyspark.sql.window import Window
    from pyspark.sql.functions import sum as cumulative_sum
    
    # Define a window partitioned by account (IBAN) and ordered by date
    window_spec = Window.partitionBy("receiver").orderBy("time_window.start")
    
    # Calculate cumulative balance for each account (receiver)
    master_table = transactions_with_window.withColumn(
        "cumulative_balance", cumulative_sum("daily_balance_change").over(window_spec)
    )
    

    master_table = master_table.withColumn(
        "time_window_start", F.col("time_window.start").cast("string")
    ).withColumn(
        "time_window_end", F.col("time_window.end").cast("string")
    )
    master_table = master_table.drop("time_window")

    # Step 5: Show or save the final master table with historical balances
    master_table = master_table.select(
        "Sender","receiver", "time_window_start", "time_window_end", "daily_balance_change", "cumulative_balance", "currency"
    )

    print(type(master_table))
    master_table.show()

    logger.info("Master dataset created.")    

except Exception as e:
    logger.error("Error: Master dataset creation failed: {e}")  

<class 'pyspark.sql.dataframe.DataFrame'>
+--------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------+
|              Sender|            receiver|  time_window_start|    time_window_end|daily_balance_change| cumulative_balance|currency|
+--------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------+
|  FI6504642412552733|AT032800425799090512|2021-02-11 05:30:00|2021-02-12 05:30:00|  -2660.166764506292| -2660.166764506292|     EUR|
|CH929168662377097...|AT047462380110915427|2021-02-25 05:30:00|2021-02-26 05:30:00|  -85810.49186239323| -85810.49186239323|     EUR|
|AT245285711210464241|AT062552733074758078|2021-01-09 05:30:00|2021-01-10 05:30:00|  -82799.61755997571| -82799.61755997571|     EUR|
|AT867175208098368478|AT067695026513847278|2021-02-03 05:30:00|2021-02-04 05:30:00|  -8622.960873443668| -8622.960873443668|     EUR|
|ES815217620944239..

In [62]:
##Converting Spark dataframe to Pandas dataframe & Loading it into a csv
try:
    master_table_pandas_df = master_table.toPandas()
    logger.info("Converted master dataset from Spark dataframe to Pandas dataframe successfully.")
except Exception as e:
    logger.error("Error: Failed to convert spark df to Pandas df: {e}")  

try:
    file_path = "C:/Work/Freelancing/1/CashFlow/output/pandas_data.csv"
    # Write the Pandas DataFrame to a CSV file
    master_table_pandas_df.to_csv(file_path, index=False)
    logger.info("Loaded master dataset to csv successfully.")

except Exception as e:
    logger.error("Failed to load master dataset to csv: {e}")