In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark import SparkContext

In [3]:
sc = SparkContext.getOrCreate()

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()

In [5]:
data = spark.read.csv('D:\CDAC\Project\IBRD\IBRD_Statement_Of_Loans_-_Historical_Data_20240207.csv',header=True)

In [6]:
data.show(10)

+--------------------+-----------+--------------------+------------+-----------+-------------------+----------------------+---------+---------+------------+-------------+----------------------+----------+--------------------+-------------------------+----------------+------------------+----------------+--------------+-----------+-------------------+---------------------+--------------+----------------+-------------+----------+--------------------+--------------------+----------------------+--------------------+----------------------------+-------------------------+----------------------+
|       End of Period|Loan Number|              Region|Country Code|    Country|           Borrower|Guarantor Country Code|Guarantor|Loan Type| Loan Status|Interest Rate|Currency of Commitment|Project ID|       Project Name |Original Principal Amount|Cancelled Amount|Undisbursed Amount|Disbursed Amount|Repaid to IBRD|Due to IBRD|Exchange Adjustment|Borrower's Obligation|Sold 3rd Party|Repaid 3rd Party|

In [7]:
# Get the number of rows
num_rows = data.count()

# Get the number of columns
num_columns = len(data.columns)

print("Shape of the DataFrame: ({}, {})".format(num_rows, num_columns))

Shape of the DataFrame: (1278141, 33)


In [8]:
# Identify and remove duplicate rows
data = data.dropDuplicates()

# Count the number of rows after removing duplicates
num_rows_no_duplicates = data.count()

print("Number of duplicate rows removed:", num_rows - num_rows_no_duplicates)

Number of duplicate rows removed: 93


In [9]:
num_rows = data.count()
print("Shape of the DataFrame: ({}, {})".format(num_rows, num_columns))

Shape of the DataFrame: (1278048, 33)


In [10]:
# Get the data types of columns in the DataFrame
column_types = data.dtypes

# Print the data types
for column_name, data_type in column_types:
    print(f"Column '{column_name}' : '{data_type}'")

Column 'End of Period' : 'string'
Column 'Loan Number' : 'string'
Column 'Region' : 'string'
Column 'Country Code' : 'string'
Column 'Country' : 'string'
Column 'Borrower' : 'string'
Column 'Guarantor Country Code' : 'string'
Column 'Guarantor' : 'string'
Column 'Loan Type' : 'string'
Column 'Loan Status' : 'string'
Column 'Interest Rate' : 'string'
Column 'Currency of Commitment' : 'string'
Column 'Project ID' : 'string'
Column 'Project Name ' : 'string'
Column 'Original Principal Amount' : 'string'
Column 'Cancelled Amount' : 'string'
Column 'Undisbursed Amount' : 'string'
Column 'Disbursed Amount' : 'string'
Column 'Repaid to IBRD' : 'string'
Column 'Due to IBRD' : 'string'
Column 'Exchange Adjustment' : 'string'
Column 'Borrower's Obligation' : 'string'
Column 'Sold 3rd Party' : 'string'
Column 'Repaid 3rd Party' : 'string'
Column 'Due 3rd Party' : 'string'
Column 'Loans Held' : 'string'
Column 'First Repayment Date' : 'string'
Column 'Last Repayment Date' : 'string'
Column 'Agreem

### Value count for target column i.e Loan Status 

In [11]:
# Group by the "Loan Status" column and count the occurrences of each unique value
loan_status_counts = data.groupBy("Loan Status").count()

# Show the value counts
loan_status_counts.show()

+-------------------+------+
|        Loan Status| count|
+-------------------+------+
|         Disbursing| 81199|
|Disbursing&Repaying| 11733|
|           Approved|  7136|
|               NULL|    48|
|    Fully Cancelled| 18475|
|          Cancelled| 12678|
|             Signed|  4613|
|          Effective|  4893|
|         Terminated|  7665|
|    Fully Disbursed| 23500|
|          Disbursed| 83398|
|           Repaying| 89279|
|            8000000|    11|
|  Fully Transferred| 18690|
|             Repaid|370482|
|       Fully Repaid|544246|
|              Draft|     1|
|         Negotiated|     1|
+-------------------+------+



In [12]:
from pyspark.sql.functions import when

# Replace values in the "Loan Status" column
data = data.withColumn("Loan Status", 
                       when(data["Loan Status"] == "Fully Repaid", "Repaid")
                       .when(data["Loan Status"] == "Fully Cancelled", "Cancelled")
                       .when(data["Loan Status"] == "Fully Transferred", "Disbursed")
                       .when(data["Loan Status"] == "Fully Disbursed", "Disbursed")
                       .when(data["Loan Status"] == "Disbursing&Repaying", "Disbursing")
                       .when(data["Loan Status"] == "Repaying", "Effective")
                       .when(data["Loan Status"] == "Negotiated", "Signed")
                       .when(data["Loan Status"] == "Draft", "Signed")
                       .otherwise(data["Loan Status"]))

# Group by the "Loan Status" column and count the occurrences of each unique value
loan_status_counts = data.groupBy("Loan Status").count()

# Show the value counts
loan_status_counts.show()

+-----------+------+
|Loan Status| count|
+-----------+------+
| Disbursing| 92932|
|   Approved|  7136|
|       NULL|    48|
|  Cancelled| 31153|
|     Signed|  4615|
|  Effective| 94172|
| Terminated|  7665|
|  Disbursed|125588|
|    8000000|    11|
|     Repaid|914728|
+-----------+------+



## Converting Date columns to DateTime format columns

In [13]:
from pyspark.sql.functions import to_date

# Set the legacy time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Convert columns to datetime format
data = data.withColumn("First Repayment Date", to_date("First Repayment Date", "MM/dd/yyyy"))
data = data.withColumn("Last Repayment Date", to_date("Last Repayment Date", "MM/dd/yyyy"))
data = data.withColumn("Agreement Signing Date", to_date("Agreement Signing Date", "MM/dd/yyyy"))
data = data.withColumn("Board Approval Date", to_date("Board Approval Date", "MM/dd/yyyy"))
data = data.withColumn("Effective Date (Most Recent)", to_date("Effective Date (Most Recent)", "MM/dd/yyyy"))
data = data.withColumn("Closed Date (Most Recent)", to_date("Closed Date (Most Recent)", "MM/dd/yyyy"))
data = data.withColumn("Last Disbursement Date", to_date("Last Disbursement Date", "MM/dd/yyyy"))


# Convert column to datetime format
data = data.withColumn("End of Period", to_date("End of Period", "MM/dd/yyyy HH:mm:ss"))


# Get the data types of columns in the DataFrame
column_types = data.dtypes

# Print the data types
for column_name, data_type in column_types:
    print(f"Column '{column_name}' : '{data_type}'")

Column 'End of Period' : 'date'
Column 'Loan Number' : 'string'
Column 'Region' : 'string'
Column 'Country Code' : 'string'
Column 'Country' : 'string'
Column 'Borrower' : 'string'
Column 'Guarantor Country Code' : 'string'
Column 'Guarantor' : 'string'
Column 'Loan Type' : 'string'
Column 'Loan Status' : 'string'
Column 'Interest Rate' : 'string'
Column 'Currency of Commitment' : 'string'
Column 'Project ID' : 'string'
Column 'Project Name ' : 'string'
Column 'Original Principal Amount' : 'string'
Column 'Cancelled Amount' : 'string'
Column 'Undisbursed Amount' : 'string'
Column 'Disbursed Amount' : 'string'
Column 'Repaid to IBRD' : 'string'
Column 'Due to IBRD' : 'string'
Column 'Exchange Adjustment' : 'string'
Column 'Borrower's Obligation' : 'string'
Column 'Sold 3rd Party' : 'string'
Column 'Repaid 3rd Party' : 'string'
Column 'Due 3rd Party' : 'string'
Column 'Loans Held' : 'string'
Column 'First Repayment Date' : 'date'
Column 'Last Repayment Date' : 'date'
Column 'Agreement Si

### Converting numerical columns to float

In [14]:
from pyspark.sql.functions import col

# Convert columns to float format
data = data.withColumn("Interest Rate", col("Interest Rate").cast("float"))
data = data.withColumn("Currency of Commitment", col("Currency of Commitment").cast("float"))
data = data.withColumn("Original Principal Amount", col("Original Principal Amount").cast("float"))

data = data.withColumn("Cancelled Amount", col("Cancelled Amount").cast("float"))
data = data.withColumn("Undisbursed Amount", col("Undisbursed Amount").cast("float"))
data = data.withColumn("Disbursed Amount", col("Disbursed Amount").cast("float"))

data = data.withColumn("Repaid to IBRD", col("Repaid to IBRD").cast("float"))
data = data.withColumn("Due to IBRD", col("Due to IBRD").cast("float"))
data = data.withColumn("Exchange Adjustment", col("Exchange Adjustment").cast("float"))


data = data.withColumn("Borrower's Obligation", col("Borrower's Obligation").cast("float"))
data = data.withColumn("Sold 3rd Party", col("Sold 3rd Party").cast("float"))
data = data.withColumn("Repaid 3rd Party", col("Repaid 3rd Party").cast("float"))

data = data.withColumn("Due 3rd Party", col("Due 3rd Party").cast("float"))
data = data.withColumn("Loans Held", col("Loans Held").cast("float"))
# data = data.withColumn("Original Principal Amount", col("Original Principal Amount").cast("float"))

# Get the data types of columns in the DataFrame
column_types = data.dtypes

# Print the data types
for column_name, data_type in column_types:
    print(f"Column '{column_name}' : '{data_type}'")

Column 'End of Period' : 'date'
Column 'Loan Number' : 'string'
Column 'Region' : 'string'
Column 'Country Code' : 'string'
Column 'Country' : 'string'
Column 'Borrower' : 'string'
Column 'Guarantor Country Code' : 'string'
Column 'Guarantor' : 'string'
Column 'Loan Type' : 'string'
Column 'Loan Status' : 'string'
Column 'Interest Rate' : 'float'
Column 'Currency of Commitment' : 'float'
Column 'Project ID' : 'string'
Column 'Project Name ' : 'string'
Column 'Original Principal Amount' : 'float'
Column 'Cancelled Amount' : 'float'
Column 'Undisbursed Amount' : 'float'
Column 'Disbursed Amount' : 'float'
Column 'Repaid to IBRD' : 'float'
Column 'Due to IBRD' : 'float'
Column 'Exchange Adjustment' : 'float'
Column 'Borrower's Obligation' : 'float'
Column 'Sold 3rd Party' : 'float'
Column 'Repaid 3rd Party' : 'float'
Column 'Due 3rd Party' : 'float'
Column 'Loans Held' : 'float'
Column 'First Repayment Date' : 'date'
Column 'Last Repayment Date' : 'date'
Column 'Agreement Signing Date' : 

In [15]:
from pyspark.sql.functions import col, sum

# Count the total number of rows in the DataFrame
total_rows = data.count()

# Calculate the percentage of null values in each column
null_percentages = data.agg(*[
    (sum(col(c).isNull().cast("int")) / total_rows * 100).alias(c)
    for c in data.columns
])

# Show the percentage of null values for each column
null_percentages.show()

+------------------+-----------+------+------------+-------+------------------+----------------------+-----------------+--------------------+--------------------+-----------------+----------------------+--------------------+-----------------+-------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+----------------------+--------------------+----------------------------+-------------------------+----------------------+
|     End of Period|Loan Number|Region|Country Code|Country|          Borrower|Guarantor Country Code|        Guarantor|           Loan Type|         Loan Status|    Interest Rate|Currency of Commitment|          Project ID|    Project Name |Original Principal Amount|    Cancelled Amount|  Undisbursed Amount|    Disbursed Amount|     

 ## New dataframe i.e feature_data  selecting columns that are revelant and not selecting columns having imbalance data and null values more than 30%

In [16]:
# Select the columns you want to copy to the new DataFrame
feature_data = data.select("End of Period", "Region", "Country", "Borrower", "Guarantor", "Loan Type", "Loan Status", "Interest Rate",
                           "Original Principal Amount", "Cancelled Amount", "Undisbursed Amount", "Project Name ",
                          "Disbursed Amount", "Repaid to IBRD", "Due to IBRD", "Exchange Adjustment", "Borrower's Obligation", 
                           "Loans Held", "First Repayment Date", "Last Repayment Date")

In [17]:
from pyspark.sql.functions import col, sum

# Count the number of null values in each column
null_counts = feature_data.select([sum(col(c).isNull().cast("int")).alias(c) for c in feature_data.columns])

# Show the null value counts
null_counts.show()

+-------------+------+-------+--------+---------+---------+-----------+-------------+-------------------------+----------------+------------------+-------------+----------------+--------------+-----------+-------------------+---------------------+----------+--------------------+-------------------+
|End of Period|Region|Country|Borrower|Guarantor|Loan Type|Loan Status|Interest Rate|Original Principal Amount|Cancelled Amount|Undisbursed Amount|Project Name |Disbursed Amount|Repaid to IBRD|Due to IBRD|Exchange Adjustment|Borrower's Obligation|Loans Held|First Repayment Date|Last Repayment Date|
+-------------+------+-------+--------+---------+---------+-----------+-------------+-------------------------+----------------+------------------+-------------+----------------+--------------+-----------+-------------------+---------------------+----------+--------------------+-------------------+
|           11|     0|      0|    8837|    74047|       48|         48|        30290|               

## Value counts in Project name column

In [18]:
# Count the occurrences of each unique value in the Project Name column
project_name_counts = data.groupBy("Project Name ").count().orderBy(col("count").desc())

# Show the count of occurrences for each unique value in the Project Name column
project_name_counts.show()

+-------------------+------+
|      Project Name | count|
+-------------------+------+
|               NULL|159162|
|              POWER|  5575|
|       EDUCATION II|  4488|
|              SAL I|  3398|
|      EDUCATION III|  3128|
|        HIGHWAYS II|  3128|
|        HIGHWAYS IV|  2856|
|          POWER III|  2855|
|                SAL|  2846|
|        RAILWAYS II|  2584|
|             SAL II|  2582|
|       HIGHWAYS III|  2448|
|        EDUCATION I|  2448|
|           POWER II|  2448|
|AGRICULTURAL CREDIT|  2312|
|            POWER I|  2312|
|GENERAL DEVELOPMENT|  2176|
|           POWER IV|  2176|
| POWER TRANSMISSION|  2171|
|             HEALTH|  2169|
+-------------------+------+
only showing top 20 rows



## Filling up null values 

In [19]:
from pyspark.sql.functions import mean, col

# List of numerical columns
numerical_columns = ['Interest Rate','Original Principal Amount', 'Cancelled Amount', 'Undisbursed Amount', 
                     'Disbursed Amount', 'Repaid to IBRD', 'Due to IBRD', 'Exchange Adjustment', 
                     "Borrower's Obligation", 'Loans Held']

# Calculate the mean value for each numerical column
mean_values = feature_data.select(*(mean(col(c)).alias(c) for c in numerical_columns)).collect()[0].asDict()

# Fill null values with the mean value of each numerical column
for column in numerical_columns:
    feature_data = feature_data.withColumn(column, when(col(column).isNull(), mean_values[column]).otherwise(col(column)))
    
    
    


# List of string columns
string_columns = ['Borrower', 'Loan Type', 'Loan Status']

# Calculate the mode value for each string column
mode_values = {}
for column in string_columns:
    mode_value = feature_data.groupBy(column).count().orderBy(col("count").desc()).select(column).first()[0]
    mode_values[column] = mode_value

# Fill null values with the mode value of each string column
for column in string_columns:
    feature_data = feature_data.withColumn(column, when(col(column).isNull(), mode_values[column]).otherwise(col(column)))
    

# Replace null values in the 'Project Name', 'Guarantor' column with 'Unknown'
feature_data = feature_data.withColumn("Project Name ", when(col("Project Name ").isNull(), "Unknown").otherwise(col("Project Name ")))
feature_data = feature_data.withColumn("Guarantor", when(col("Guarantor").isNull(), "Unknown").otherwise(col("Guarantor")))
    
 
    
    
# List of date columns
date_columns = ['End of Period', 'First Repayment Date', 'Last Repayment Date']


# Calculate the mode value for each date column
mode_values = {}
for column in date_columns:
    mode_value = feature_data.groupBy(column).count().orderBy(col("count").desc()).select(column).first()[0]
    mode_values[column] = mode_value

# Fill null values with the mode value of each date column
for column in date_columns:
    feature_data = feature_data.withColumn(column, when(col(column).isNull(), mode_values[column]).otherwise(col(column)))   


In [20]:
from pyspark.sql.functions import col, lit

# Count the total number of rows in the DataFrame
total_rows = feature_data.count()

# Calculate the percentage of null values in each column
null_percentage = feature_data.agg(*[(sum(col(c).isNull().cast("int")) * 100 / lit(total_rows)).alias(c) for c in feature_data.columns])

# Show the null percentages
null_percentage.show()

+-------------+------+-------+--------+---------+---------+-----------+-------------+-------------------------+----------------+------------------+-------------+----------------+--------------+-----------+-------------------+---------------------+----------+--------------------+-------------------+
|End of Period|Region|Country|Borrower|Guarantor|Loan Type|Loan Status|Interest Rate|Original Principal Amount|Cancelled Amount|Undisbursed Amount|Project Name |Disbursed Amount|Repaid to IBRD|Due to IBRD|Exchange Adjustment|Borrower's Obligation|Loans Held|First Repayment Date|Last Repayment Date|
+-------------+------+-------+--------+---------+---------+-----------+-------------+-------------------------+----------------+------------------+-------------+----------------+--------------+-----------+-------------------+---------------------+----------+--------------------+-------------------+
|          0.0|   0.0|    0.0|     0.0|      0.0|      0.0|        0.0|          0.0|               

## New column of date difference of Last Repayment Date and First Repayment Date

In [21]:
from pyspark.sql.functions import datediff

# Calculate the difference in days between the two dates
feature_data = feature_data.withColumn("Days Difference", datediff("Last Repayment Date", "First Repayment Date"))

# Drop the 'First Repayment Date' and 'Last Repayment Date' columns
feature_data = feature_data.drop("First Repayment Date", "Last Repayment Date")

# Show the DataFrame with the new 'Days Difference' column
# Display the list of columns
print(feature_data.columns)

['End of Period', 'Region', 'Country', 'Borrower', 'Guarantor', 'Loan Type', 'Loan Status', 'Interest Rate', 'Original Principal Amount', 'Cancelled Amount', 'Undisbursed Amount', 'Project Name ', 'Disbursed Amount', 'Repaid to IBRD', 'Due to IBRD', 'Exchange Adjustment', "Borrower's Obligation", 'Loans Held', 'Days Difference']


In [22]:
# Get the data types of columns in the DataFrame
column_types = feature_data.dtypes

# Print the data types
for column_name, data_type in column_types:
    print(f"Column '{column_name}' : '{data_type}'")

Column 'End of Period' : 'date'
Column 'Region' : 'string'
Column 'Country' : 'string'
Column 'Borrower' : 'string'
Column 'Guarantor' : 'string'
Column 'Loan Type' : 'string'
Column 'Loan Status' : 'string'
Column 'Interest Rate' : 'double'
Column 'Original Principal Amount' : 'double'
Column 'Cancelled Amount' : 'double'
Column 'Undisbursed Amount' : 'double'
Column 'Project Name ' : 'string'
Column 'Disbursed Amount' : 'double'
Column 'Repaid to IBRD' : 'double'
Column 'Due to IBRD' : 'double'
Column 'Exchange Adjustment' : 'double'
Column 'Borrower's Obligation' : 'double'
Column 'Loans Held' : 'double'
Column 'Days Difference' : 'int'


In [23]:
feature_data.show(10)

+-------------+--------------------+------------------+--------------------+------------------+---------+-----------+------------------+-------------------------+----------------+------------------+--------------------+----------------+--------------+-----------+--------------------+---------------------+-----------+---------------+
|End of Period|              Region|           Country|            Borrower|         Guarantor|Loan Type|Loan Status|     Interest Rate|Original Principal Amount|Cancelled Amount|Undisbursed Amount|       Project Name |Disbursed Amount|Repaid to IBRD|Due to IBRD| Exchange Adjustment|Borrower's Obligation| Loans Held|Days Difference|
+-------------+--------------------+------------------+--------------------+------------------+---------+-----------+------------------+-------------------------+----------------+------------------+--------------------+----------------+--------------+-----------+--------------------+---------------------+-----------+------------

## Model Building 

In [24]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# Define the columns to be encoded
columns_to_encode = ['Region', 'Loan Status', 'Loan Type', 'Country']

# Create a list to hold the stages of the pipeline
stages = []

# Loop through each column to be encoded
for col_name in columns_to_encode:
    # Create a StringIndexer for each column
    string_indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_index")

    # Create a OneHotEncoder for each column
    one_hot_encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[col_name + "_encoded"])

    # Add the stages to the list
    stages += [string_indexer, one_hot_encoder]

# Create the pipeline
pipeline = Pipeline(stages=stages)

# Fit and transform the pipeline to the feature_data
feature_data = pipeline.fit(feature_data).transform(feature_data)

# Show the first few rows of the encoded feature_data
feature_data.show(5)

+-------------+--------------------+------------------+--------------------+------------------+---------+-----------+------------------+-------------------------+----------------+------------------+--------------------+----------------+--------------+-----------+--------------------+---------------------+-----------+---------------+------------+--------------+-----------------+-------------------+---------------+-----------------+-------------+----------------+
|End of Period|              Region|           Country|            Borrower|         Guarantor|Loan Type|Loan Status|     Interest Rate|Original Principal Amount|Cancelled Amount|Undisbursed Amount|       Project Name |Disbursed Amount|Repaid to IBRD|Due to IBRD| Exchange Adjustment|Borrower's Obligation| Loans Held|Days Difference|Region_index|Region_encoded|Loan Status_index|Loan Status_encoded|Loan Type_index|Loan Type_encoded|Country_index| Country_encoded|
+-------------+--------------------+------------------+-------------

In [25]:
# Drop the 'End of Period' column from the DataFrame
feature_data = feature_data.drop("End of Period")

In [26]:
# Count the number of rows
num_rows = feature_data.count()

# Get the list of column names
columns = feature_data.columns
num_columns = len(columns)

print("Number of rows:", num_rows)
print("Number of columns:", num_columns)

Number of rows: 1278048
Number of columns: 26


In [27]:
# Get the data types of columns in the DataFrame
column_types = feature_data.dtypes

# Print the data types
for column_name, data_type in column_types:
    print(f"Column '{column_name}' : '{data_type}'")

Column 'Region' : 'string'
Column 'Country' : 'string'
Column 'Borrower' : 'string'
Column 'Guarantor' : 'string'
Column 'Loan Type' : 'string'
Column 'Loan Status' : 'string'
Column 'Interest Rate' : 'double'
Column 'Original Principal Amount' : 'double'
Column 'Cancelled Amount' : 'double'
Column 'Undisbursed Amount' : 'double'
Column 'Project Name ' : 'string'
Column 'Disbursed Amount' : 'double'
Column 'Repaid to IBRD' : 'double'
Column 'Due to IBRD' : 'double'
Column 'Exchange Adjustment' : 'double'
Column 'Borrower's Obligation' : 'double'
Column 'Loans Held' : 'double'
Column 'Days Difference' : 'int'
Column 'Region_index' : 'double'
Column 'Region_encoded' : 'vector'
Column 'Loan Status_index' : 'double'
Column 'Loan Status_encoded' : 'vector'
Column 'Loan Type_index' : 'double'
Column 'Loan Type_encoded' : 'vector'
Column 'Country_index' : 'double'
Column 'Country_encoded' : 'vector'
