In [13]:
import requests
import pandas as pd
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
sql_server_pth="/opt/sqljdbc_12.8/enu/jars/mssql-jdbc-12.8.1.jre8.jar"
spark = SparkSession.builder\
    .config("spark.jars.packages", "com.microsoft.sqlserver:mssql-jdbc:9.4.0.jre8")\
    .getOrCreate()
sc = spark.sparkContext

In [14]:
from pyspark.sql.types import *
schema=StructType([StructField('end_of_period', StringType(), True), 
                    StructField('loan_number', StringType(), True), 
                    StructField('region', StringType(), True), 
                    StructField('country_code', StringType(), True), 
                    StructField('country', StringType(), True), 
                    StructField('borrower', StringType(), True), 
                    StructField('guarantor_country_code', StringType(), True), 
                    StructField('guarantor', StringType(), True), 
                    StructField('loan_type', StringType(), True), 
                    StructField('loan_status', StringType(), True), 
                    StructField('interest_rate', DoubleType(), True), 
                    StructField('currency_of_commitment', StringType(), True),
                    StructField('project_id', StringType(), True), 
                    StructField('project_name_', StringType(), True), 
                    StructField('original_principal_amount', DoubleType(), True), 
                    StructField('cancelled_amount', DoubleType(), True), 
                    StructField('undisbursed_amount', DoubleType(), True), 
                    StructField('disbursed_amount', DoubleType(), True), 
                    StructField('repaid_to_ibrd', DoubleType(), True), 
                    StructField('due_to_ibrd', DoubleType(), True), 
                    StructField('exchange_adjustment', DoubleType(), True), 
                    StructField('borrowers_obligation', DoubleType(), True), 
                    StructField('sold_3rd_party', DoubleType(), True), 
                    StructField('repaid_3rd_party', DoubleType(), True), 
                    StructField('due_3rd_party', DoubleType(), True), 
                    StructField('loans_held', DoubleType(), True), 
                    StructField('first_repayment_date', StringType(), True), 
                    StructField('last_repayment_date', StringType(), True), 
                    StructField('agreement_signing_date', StringType(), True), 
                    StructField('board_approval_date', StringType(), True), 
                    StructField('effective_date_most_recent', StringType(), True), 
                    StructField('closed_date_most_recent', StringType(), True), 
                    StructField('last_disbursement_date', StringType(), True)])
limit=50000
offset=600000
rows = 1342181

url='https://datacatalogapi.worldbank.org/dexapps/fone/api/apiservice?datasetId=DS00975&resourceId=RS00905&type=json'

params = {
        "top": limit,
        "skip": offset
        }
response = requests.get(url, params=params)
if response.status_code == 200:
    data = response.json()
    data = data.get('data')
    df=spark.createDataFrame(data,schema=schema)


'limit=50000\noffset=0\nrows = 1342181\n\nurl=\'https://datacatalogapi.worldbank.org/dexapps/fone/api/apiservice?datasetId=DS00975&resourceId=RS00905&type=json\'\n\nparams = {\n        "top": limit,\n        "skip": offset\n        }\nresponse = requests.get(url, params=params)\nif response.status_code == 200:\n    data = response.json()\n    data = data.get(\'data\')\n    df=spark.createDataFrame(data,schema=schema)\n'

In [15]:
limit=50000
offset=0
rows = 1342181
rows_runs=600000
df = spark.createDataFrame([], schema=schema)
url='https://datacatalogapi.worldbank.org/dexapps/fone/api/apiservice?datasetId=DS00975&resourceId=RS00905&type=json'
while offset < rows_runs:
    params = {
            "top": limit,
            "skip": offset
            }
    response = requests.get(url, params=params)
    if response.status_code == 200:
        data = response.json()
        data = data.get('data')
        if not data:
            break
        df_temp=spark.createDataFrame(data,schema=schema)
        df = df.union(df_temp)
        offset += limit

In [16]:
from pyspark.sql.functions import lower
dates=["30-Jun-2024","30-Jun-2023","30-Jun-2022","30-Jun-2021","30-Jun-2020","30-Jun-2019","30-Jun-2018","30-Jun-2017","30-Jun-2016","30-Jun-2015","30-Jun-2014"
       ,'30-Jun-2013',"30-Jun-2012","30-Jun-2011"]

columns_to_drop=['currency_of_commitment','exchange_adjustment','last_disbursement_date','agreement_signing_date','effective_date_most_recent','closed_date_most_recent']

df_filtered=df.filter(df.end_of_period.isin(dates))
df_dropped=df_filtered.drop(*columns_to_drop)



# Get all string columns
string_columns = [field.name for field in df_dropped.schema.fields if (field.dataType== StringType())]
df_lower=df_dropped
# Convert all string columns to uppercase
for col_name in string_columns:
    df_lower = df_lower.withColumn(col_name, lower(df_dropped[col_name]))


In [17]:
import csv

# Open the CSV file
with open('Status_Cleaning.csv', mode='r', newline='', encoding='utf-8') as file:
    # Create a DictReader object
    csv_reader = csv.reader(file)
    status = {}
    next(csv_reader)
    for row in csv_reader:
        key = row[0].lower()   # Assuming the first column is the key
        value = row[1].lower() # Assuming the second column is the value
        status[key] = value

# Print the result
print(status)

with open('loan_status_BK.csv', mode='r', newline='', encoding='utf-8') as file:
    # Create a DictReader object
    csv_reader = csv.reader(file)
    loan_status_bk = {}
    next(csv_reader)
    for row in csv_reader:
        key = row[0].lower()   # Assuming the first column is the key
        value = row[1].lower() # Assuming the second column is the value
        loan_status_bk[key] = value
print(loan_status_bk)

#####################################################################################

with open('Type_Cleaning.csv', mode='r', newline='', encoding='utf-8') as file:
    # Create a DictReader object
    csv_reader = csv.reader(file)
    type_dict = {}
    next(csv_reader)
    for row in csv_reader:
        key = row[0].lower()   # Assuming the first column is the key
        value = row[1].lower() # Assuming the second column is the value
        type_dict[key] = value

# Print the result
print(type_dict)

with open('loan_type_BK.csv', mode='r', newline='', encoding='utf-8') as file:
    # Create a DictReader object
    csv_reader = csv.reader(file)
    loan_type_bk = {}
    next(csv_reader)
    for row in csv_reader:
        key = row[0].lower()   # Assuming the first column is the key
        value = row[1].lower() # Assuming the second column is the value
        loan_type_bk[key] = value
print(loan_type_bk)

#####################################################################################


with open("Countries_Cleaning.csv", mode='r', newline='', encoding='utf-8') as file:
    # Create a DictReader object
    csv_reader = csv.reader(file)
    countries = {}
    next(csv_reader)
    for row in csv_reader:
        key = row[0].lower()   # Assuming the first column is the key
        value = row[1].lower() # Assuming the second column is the value
        countries[key] = value

# Print the result
print(countries)

with open('country_BK.csv', mode='r', newline='', encoding='utf-8') as file:
    # Create a DictReader object
    csv_reader = csv.reader(file)
    country_bk = {}
    next(csv_reader)
    for row in csv_reader:
        key = row[0].lower()   # Assuming the first column is the key
        value = row[1].lower() # Assuming the second column is the value
        country_bk[key] = value
print(country_bk)

#####################################################################################

with open('Regions_Cleaning.csv', mode='r', newline='', encoding='utf-8') as file:
    # Create a DictReader object
    csv_reader = csv.reader(file)
    regions = {}
    next(csv_reader)
    for row in csv_reader:
        key = row[0].lower()   # Assuming the first column is the key
        value = row[1].lower() # Assuming the second column is the value
        regions[key] = value
print(regions)

with open('regions_BK.csv', mode='r', newline='', encoding='utf-8') as file:
    # Create a DictReader object
    csv_reader = csv.reader(file)
    regions_Bk = {}
    next(csv_reader)
    for row in csv_reader:
        key = row[0].lower()   # Assuming the first column is the key
        value = row[1].lower() # Assuming the second column is the value
        regions_Bk[key] = value
print(regions_Bk)

#####################################################################################


with open('Borrower_cleaning.csv', mode='r', newline='', encoding='utf-8') as file:
    # Create a DictReader object
    csv_reader = csv.reader(file)
    borrower_dict = {}
    next(csv_reader)
    for row in csv_reader:
        key = row[0].lower()   # Assuming the first column is the key
        value = row[1].lower() # Assuming the second column is the value
        borrower_dict[key] = value
print(borrower_dict)

with open('borrower_BK_updated.csv', mode='r', newline='', encoding='utf-8') as file:
    # Create a DictReader object
    csv_reader = csv.reader(file)
    borrower_bk = {}
    next(csv_reader)
    for row in csv_reader:
        key = row[0].lower()   # Assuming the first column is the key
        value = row[1].lower() # Assuming the second column is the value
        borrower_bk[key] = value
print(borrower_bk)

{'repaid': 'fully repaid', 'cancelled': 'fully cancelled', 'disbursed': 'fully disbursed'}
{'disbursing': '0', 'disbursing&repaying': '1', 'effective': '2', 'fully cancelled': '3', 'fully disbursed': '4', 'fully repaid': '5', 'fully transferred': '6', 'repaying': '7', 'terminated': '8'}
{'scp eur': 'scp', 'scp jpy': 'scp', 'scp usd': 'scp', 'npl': 'non pool'}
{'cpl': '0', 'fsl': '1', 'non pool': '2', 'pool loan': '3', 'scl': '4', 'scp': '5', 'scpd': '6', 'scpm': '7', 'sngl crncy': '8'}
{'turkiye': 'turkey', 'macedonia, former yugoslav republic': 'macedonia', 'macedonia, former yugoslav republic of': 'macedonia', 'north macedonia': 'macedonia', 'czechia': 'czech republic', 'viet nam': 'vietnam', 'israel': 'zionist state'}
{'albania': '0', 'algeria': '1', 'angola': '2', 'antigua and barbuda': '3', 'argentina': '4', 'armenia': '5', 'australia': '6', 'austria': '7', 'azerbaijan': '8', 'bahamas, the': '9', 'bangladesh': '10', 'barbados': '11', 'belarus': '12', 'belgium': '13', 'belize': '14

In [18]:
from pyspark.sql.functions import lit, create_map,col, when
#mapping_status = create_map([lit(x) for pair in status.items() for x in pair])
df_status=df_lower.replace(status,subset=['loan_status']).replace(type_dict,subset=['loan_type'])
df_countries_cleaned=df_status.replace(countries,subset=['country','guarantor'])
df_regions_cleaned=df_countries_cleaned.replace(regions,subset=['region'])


df_borrower_cleaned = df_regions_cleaned
for country, borrower in borrower_dict.items():
    df_borrower_cleaned = df_borrower_cleaned.withColumn(
        "borrower",
        when(col("country") == country, borrower).otherwise(col("borrower"))
    )


In [19]:
country_BK_map = create_map([lit(x) for pair in country_bk.items() for x in pair])
borrower_BK_map = create_map([lit(x) for pair in borrower_bk.items() for x in pair])
regions_Bk_map = create_map([lit(x) for pair in regions_Bk.items() for x in pair])
loan_status_Bk_map = create_map([lit(x) for pair in loan_status_bk.items() for x in pair])
loan_type_Bk_map = create_map([lit(x) for pair in loan_type_bk.items() for x in pair])


df_null_filled=df_borrower_cleaned.na.fill('not_specified',subset=['borrower','guarantor'])

df_regions_bk=df_null_filled.withColumn("region_BK", regions_Bk_map[col("region")])
df_countries_bk = df_regions_bk.withColumn("country_BK", country_BK_map[col("country")])
df_guaranter_bk = df_countries_bk.withColumn("guarantor_BK", country_BK_map[col("guarantor")])
df_borrower_bk= df_guaranter_bk.withColumn("borrower_BK",borrower_BK_map[col("borrower")])
df_loan_status_bk = df_borrower_bk.withColumn("loan_status_BK",loan_status_Bk_map[col("loan_status")])
df_loan_type_bk = df_loan_status_bk.withColumn("loan_type_BK",loan_type_Bk_map[col("loan_type")])

In [20]:
from pyspark.sql import Window
from pyspark.sql.functions import last
import sys
# Assuming df is your DataFrame and 'column_name' is the column you want to forward-fill
window_spec = Window.orderBy('loan_number').rowsBetween(-sys.maxsize, 0)

df_filled_project = df_loan_type_bk.withColumn(
    'project_name_',
    last('project_name_', ignorenulls=True).over(window_spec))
df_final=df_filled_project.withColumn("repaid",col('repaid_to_ibrd')+col('repaid_3rd_party'))\
    .withColumn("due",col('due_to_ibrd')+col('due_3rd_party'))\
        .drop('repaid_to_ibrd','repaid_3rd_party','due_to_ibrd','due_3rd_party','sold_3rd_party','loans_held')

In [22]:
url = "jdbc:sqlserver://192.168.1.6;databaseName=Loan_data"
table_name = "LoanData"
username='maher'
password='12345'

properties = {
    "user": username,
    'password': password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"}
df_final.write.jdbc(url=url, table=table_name, mode='overwrite', properties=properties)

24/09/26 00:59:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/26 00:59:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/26 00:59:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/26 00:59:04 WARN TaskSetManager: Stage 6 contains a task of very large size (2106 KiB). The maximum recommended task size is 1000 KiB.
24/09/26 00:59:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/26 00:59:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                  