### Why SBA Loan Approval?
During my MBA program, I thoroughly enjoyed delving into the financial statements of various businesses and gaining insights into the operational dynamics of diverse industries. This experience not only underscored the significance of small businesses but also shed light on the challenges faced by entrepreneurs when establishing their ventures, including the initial capital requirements.

Small business owners often turn to the Small Business Administration (SBA) for loans, as these loans are partially guaranteed by the SBA. In essence, the SBA assumes a portion of the losses in the event of loan default, which reduces the lender's risk and makes it easier for small businesses to obtain financing. However, the added risk the SBA takes on can make it challenging for businesses to be accepted into its loan programs. Given the current global impact of the COVID-19 pandemic, which has severely crippled numerous small businesses, the role of the SBA in facilitating their success has become even more crucial.

Intrigued by this scenario, I sought to explore the factors that determine the acceptance or rejection of an SBA loan, such as the industry in which the business operates, the loan size, and the guaranteed loan amount. By analyzing these characteristics, I aimed to ascertain whether an SBA loan should be accepted or declined.

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive



To choose between PySpark and Pandas for this project, I considered several factors and weighed them against the project's requirements and constraints.

Data Size and Scale:
Use Pandas if the dataset fits comfortably in memory (RAM) and is relatively small (typically up to a few gigabytes). Pandas is well-suited for small to medium-sized datasets.
Use PySpark when dealing with large-scale data that cannot fit into memory. PySpark is designed for big data and distributed computing, making it suitable for large datasets that require parallel processing.

Performance:
Pandas is often faster for small to medium-sized datasets due to its in-memory processing. If performance is crucial and the data fits in memory, Pandas might be the better choice.
PySpark shines when dealing with big data. Its distributed computing capabilities enable it to scale out and process large datasets efficiently across a cluster of machines.

Ecosystem and Libraries:
Pandas has a rich ecosystem of data analysis and visualization libraries, making it a good choice for data exploration and analysis.
PySpark has a more limited ecosystem but integrates well with other big data tools like Hadoop and Hive.

In summary, I chose PySpark even though this dataset is small enough for Pandas, because I wanted to explore big data technologies such as PySpark and Hive through this project and present what I learned.

In [2]:
pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=95a56a86e42dc56b6f900b4764e3712e4454cff2424ed04db997e7d67130c71d
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [4]:
# Create a SparkSession
spark = SparkSession.builder.appName("SBALoanAnalysis").getOrCreate()

# Load the SBA loan data
df = spark.read.csv("/content/drive/MyDrive/Credit Analysis/SBAnational.csv", header=True, inferSchema=True)

# Use a separate name for exploration (Note: PySpark DataFrames are immutable, so this is a second reference, not a physical copy)
df_copy = df




# **Data Cleaning, Formatting, and Feature Engineering**


In [5]:
# Display the first few rows
df_copy.show()

# Check the shape of the data as it stands
num_rows = df_copy.count()
num_columns = len(df_copy.columns)

# Print the shape
print("Number of rows:", num_rows)
print("Number of columns:", num_columns)

+-------------+--------------------+--------------+-----+-----+--------------------+---------+------+------------+----------+----+-----+--------+---------+-----------+-------------+----------+---------+------+----------+----------------+-----------------+------------+----------+------------+------------+------------+
|LoanNr_ChkDgt|                Name|          City|State|  Zip|                Bank|BankState| NAICS|ApprovalDate|ApprovalFY|Term|NoEmp|NewExist|CreateJob|RetainedJob|FranchiseCode|UrbanRural|RevLineCr|LowDoc|ChgOffDate|DisbursementDate|DisbursementGross|BalanceGross|MIS_Status|ChgOffPrinGr|      GrAppv|    SBA_Appv|
+-------------+--------------------+--------------+-----+-----+--------------------+---------+------+------------+----------+----+-----+--------+---------+-----------+-------------+----------+---------+------+----------+----------------+-----------------+------------+----------+------------+------------+------------+
|   1000014003|      ABC HOBBYCRAFT|    EVA

In [6]:
# Get the shape of the DataFrame
print("Number of rows and columns:", (df_copy.count(), len(df_copy.columns)))

Number of rows and columns: (899164, 27)


In [7]:
# Note: this import shadows Python's built-in sum for the rest of the notebook
from pyspark.sql.functions import col, sum
# Check for null values in each column
null_counts = df_copy.select([sum(col(column).isNull().cast("int")).alias(column) for column in df_copy.columns])

# Show the result
null_counts.show(vertical=True, truncate=False)

-RECORD 0-------------------
 LoanNr_ChkDgt     | 0      
 Name              | 3      
 City              | 30     
 State             | 14     
 Zip               | 0      
 Bank              | 1559   
 BankState         | 1566   
 NAICS             | 0      
 ApprovalDate      | 0      
 ApprovalFY        | 0      
 Term              | 0      
 NoEmp             | 0      
 NewExist          | 136    
 CreateJob         | 0      
 RetainedJob       | 0      
 FranchiseCode     | 0      
 UrbanRural        | 0      
 RevLineCr         | 4528   
 LowDoc            | 2582   
 ChgOffDate        | 736402 
 DisbursementDate  | 2431   
 DisbursementGross | 0      
 BalanceGross      | 0      
 MIS_Status        | 1997   
 ChgOffPrinGr      | 0      
 GrAppv            | 0      
 SBA_Appv          | 0      



I am not concerned with the ChgOffDate column as our focus lies in determining whether a loan gets charged off or not, rather than the specific timing of it. However, the other columns may present some issues. To address this, let us begin by eliminating records from these columns that contain null values.

I have chosen to remove the rows entirely instead of imputing the missing data. This decision is based on the fact that we have a substantial number of records to work with. Additionally, given the nature of the information, it is challenging to determine the most appropriate imputation method. For instance, whether a business is new or existing could be a crucial feature, and I do not want to make any assumptions about it.

By removing the rows with null values, we can ensure a more accurate and reliable dataset for our analysis.
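
To make the effect of dropping rows on a column subset concrete, here is a minimal plain-Python sketch (no Spark required). The toy rows and the helper `drop_rows_with_nulls` are hypothetical stand-ins, not part of the dataset or of any library. The idea it illustrates: a row is removed only when one of the listed columns is missing, so nulls in columns outside the subset, such as ChgOffDate, remain.

```python
# Hypothetical toy rows; None plays the role of a null value.
rows = [
    {"Name": "A", "NewExist": 1, "ChgOffDate": None},
    {"Name": None, "NewExist": 2, "ChgOffDate": "1-Jan-05"},
    {"Name": "C", "NewExist": None, "ChgOffDate": None},
    {"Name": "D", "NewExist": 2, "ChgOffDate": "3-Mar-09"},
]

def drop_rows_with_nulls(rows, subset):
    """Keep only rows where every column in `subset` is present (not None)."""
    return [row for row in rows if all(row[c] is not None for c in subset)]

cleaned = drop_rows_with_nulls(rows, subset=["Name", "NewExist"])
remaining_names = [row["Name"] for row in cleaned]

# ChgOffDate was not in the subset, so its nulls are still there.
chgoff_nulls_left = len([row for row in cleaned if row["ChgOffDate"] is None])
print(remaining_names, chgoff_nulls_left)
```

This is why a column that is deliberately left out of the subset can still report a large null count after the drop.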

In [8]:
# Drop null values from specified columns
columns_to_drop_na = ['Name', 'City', 'State', 'BankState', 'NewExist', 'RevLineCr', 'LowDoc', 'DisbursementDate', 'MIS_Status']
df_copy = df_copy.na.drop(subset=columns_to_drop_na)

In [9]:
# Check for null values in each column
null_counts = df_copy.select([sum(col(column).isNull().cast("int")).alias(column) for column in df_copy.columns])

# Show the result
null_counts.show(vertical=True, truncate=False)

-RECORD 0-------------------
 LoanNr_ChkDgt     | 0      
 Name              | 0      
 City              | 0      
 State             | 0      
 Zip               | 0      
 Bank              | 0      
 BankState         | 0      
 NAICS             | 0      
 ApprovalDate      | 0      
 ApprovalFY        | 0      
 Term              | 0      
 NoEmp             | 0      
 NewExist          | 0      
 CreateJob         | 0      
 RetainedJob       | 0      
 FranchiseCode     | 0      
 UrbanRural        | 0      
 RevLineCr         | 0      
 LowDoc            | 0      
 ChgOffDate        | 725316 
 DisbursementDate  | 0      
 DisbursementGross | 0      
 BalanceGross      | 0      
 MIS_Status        | 0      
 ChgOffPrinGr      | 0      
 GrAppv            | 0      
 SBA_Appv          | 0      



Next, I want to start making sure each field is the appropriate data type.

In [10]:
# Check data types of each feature
df_copy.dtypes

[('LoanNr_ChkDgt', 'bigint'),
 ('Name', 'string'),
 ('City', 'string'),
 ('State', 'string'),
 ('Zip', 'string'),
 ('Bank', 'string'),
 ('BankState', 'string'),
 ('NAICS', 'string'),
 ('ApprovalDate', 'string'),
 ('ApprovalFY', 'string'),
 ('Term', 'int'),
 ('NoEmp', 'int'),
 ('NewExist', 'int'),
 ('CreateJob', 'int'),
 ('RetainedJob', 'int'),
 ('FranchiseCode', 'int'),
 ('UrbanRural', 'int'),
 ('RevLineCr', 'string'),
 ('LowDoc', 'string'),
 ('ChgOffDate', 'string'),
 ('DisbursementDate', 'string'),
 ('DisbursementGross', 'string'),
 ('BalanceGross', 'string'),
 ('MIS_Status', 'string'),
 ('ChgOffPrinGr', 'string'),
 ('GrAppv', 'string'),
 ('SBA_Appv', 'string')]

Looks like we're going to need to make some changes here. I begin with the currency fields that are currently being read as strings rather than floats.

Let's see how they are being read right now.

In [11]:
selected_columns = ['DisbursementGross', 'BalanceGross', 'ChgOffPrinGr', 'GrAppv', 'SBA_Appv']
df_copy.select(selected_columns).show()

+-----------------+------------+------------+------------+------------+
|DisbursementGross|BalanceGross|ChgOffPrinGr|      GrAppv|    SBA_Appv|
+-----------------+------------+------------+------------+------------+
|      $60,000.00 |      $0.00 |      $0.00 | $60,000.00 | $48,000.00 |
|      $40,000.00 |      $0.00 |      $0.00 | $40,000.00 | $32,000.00 |
|     $287,000.00 |      $0.00 |      $0.00 |$287,000.00 |$215,250.00 |
|      $35,000.00 |      $0.00 |      $0.00 | $35,000.00 | $28,000.00 |
|     $229,000.00 |      $0.00 |      $0.00 |$229,000.00 |$229,000.00 |
|     $517,000.00 |      $0.00 |      $0.00 |$517,000.00 |$387,750.00 |
|     $600,000.00 |      $0.00 |$208,959.00 |$600,000.00 |$499,998.00 |
|      $45,000.00 |      $0.00 |      $0.00 | $45,000.00 | $36,000.00 |
|     $305,000.00 |      $0.00 |      $0.00 |$305,000.00 |$228,750.00 |
|      $70,000.00 |      $0.00 |      $0.00 | $70,000.00 | $56,000.00 |
|      $70,000.00 |      $0.00 |      $0.00 | $70,000.00 | $56,0

It looks like they're coming in as strings because the '$' sign and commas are included. I can't change the type to a float without removing those, so I make those edits here.
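
As a quick illustration of the cleaning step, the sketch below applies the same idea in plain Python to a few strings copied from the output above: remove the '$' sign and commas, then convert to a float. The helper name `parse_currency` is hypothetical and only used for this example.

```python
import re

def parse_currency(text):
    """Strip '$' and ',' from a currency string and return a float."""
    # Inside a character class, '$' is a literal character, so no escaping is needed.
    return float(re.sub(r"[$,]", "", text).strip())

samples = ["$60,000.00 ", "$0.00 ", "$215,250.00 "]
parsed = [parse_currency(s) for s in samples]
print(parsed)
```

Note the trailing space in the raw values; it has to be tolerated or trimmed before the conversion.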

In [12]:
from pyspark.sql.functions import regexp_replace

columns_to_clean = ['DisbursementGross', 'BalanceGross', 'ChgOffPrinGr', 'GrAppv', 'SBA_Appv']

for column in columns_to_clean:
    # Raw string avoids an invalid '\$' escape; '$' is literal inside a character class
    df_copy = df_copy.withColumn(column, regexp_replace(col(column), r'[$,]', '').cast('float'))

In [13]:
selected_columns = ['DisbursementGross', 'BalanceGross', 'ChgOffPrinGr', 'GrAppv', 'SBA_Appv']
df_copy.select(selected_columns).show()

+-----------------+------------+------------+--------+--------+
|DisbursementGross|BalanceGross|ChgOffPrinGr|  GrAppv|SBA_Appv|
+-----------------+------------+------------+--------+--------+
|          60000.0|         0.0|         0.0| 60000.0| 48000.0|
|          40000.0|         0.0|         0.0| 40000.0| 32000.0|
|         287000.0|         0.0|         0.0|287000.0|215250.0|
|          35000.0|         0.0|         0.0| 35000.0| 28000.0|
|         229000.0|         0.0|         0.0|229000.0|229000.0|
|         517000.0|         0.0|         0.0|517000.0|387750.0|
|         600000.0|         0.0|    208959.0|600000.0|499998.0|
|          45000.0|         0.0|         0.0| 45000.0| 36000.0|
|         305000.0|         0.0|         0.0|305000.0|228750.0|
|          70000.0|         0.0|         0.0| 70000.0| 56000.0|
|          70000.0|         0.0|         0.0| 70000.0| 56000.0|
|         150000.0|         0.0|         0.0|300000.0|225000.0|
|         253400.0|         0.0|        

Next, I take a look at ApprovalFY, which should be an integer but is coming up as a string type.

In [14]:
from pyspark.sql.functions import count, lit

# Cast the 'ApprovalFY' column to StringType and count the records for each distinct value
data_type_counts = df_copy.withColumn("ApprovalFY", col("ApprovalFY").cast("string")) \
    .groupBy("ApprovalFY") \
    .agg(count(lit(1)).alias("count")) \
    .orderBy("count", ascending=False)

data_type_counts.show()

+----------+-----+
|ApprovalFY|count|
+----------+-----+
|      2005|76906|
|      2006|75660|
|      2007|71406|
|      2004|68069|
|      2003|57686|
|      2002|44102|
|      1995|41632|
|      1996|39945|
|      2008|38618|
|      1997|37668|
|      1999|37275|
|      2000|37231|
|      2001|37162|
|      1998|35965|
|      1994|31454|
|      1993|23137|
|      1992|20669|
|      2009|18293|
|      2010|16026|
|      1991|15444|
+----------+-----+
only showing top 20 rows



In [15]:
unique_values = df_copy.select("ApprovalFY").distinct().rdd.flatMap(lambda x: x).collect()

# Show all unique values
for value in unique_values:
    print(value)

1987
1972
15-Oct-99
1988
1984
1982
2005
2000
2-May-88
1981
1978
1974
2009
2006
2004
1989
2011
2008
1999
1997
1973
2007
1983
1980
1986
1985
1979
1998
2001
2010
1990
2003
1991
2012
1977
2014
1971
2013
2002
1995
1976
1976A
1968
1992
24-Aug-94
1994
1996
1969
1993
28-Oct-94
31-Dec-02
1975
1970


We have a mixture of plain years and date-like strings (for example '15-Oct-99') here, with one value including an 'A' as well ('1976A'). I clean these next.
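
To show what a strip-then-cast cleanup does to each kind of value, here is a small plain-Python sketch. The helper `clean_fiscal_year` is hypothetical; it assumes that anything which is not a plain number after removing the 'A' should end up as a missing value, which is how I would expect a failed integer cast to behave.

```python
def clean_fiscal_year(value):
    """Drop a stray 'A' and return the year as an int, or None if it is not a plain number."""
    stripped = value.replace("A", "")
    return int(stripped) if stripped.isdigit() else None

# Three representative values taken from the list of unique values above.
examples = ["2005", "1976A", "15-Oct-99"]
cleaned_years = [clean_fiscal_year(v) for v in examples]
print(cleaned_years)
```

The date-like strings cannot be recovered this way, so they surface as missing values after cleaning.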

In [16]:
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import IntegerType

# Strip the stray 'A' (as in '1976A') with a built-in function instead of a Python UDF,
# then cast the column to IntegerType.
# Values that are not plain years (e.g. '15-Oct-99') become null after the cast.
df_copy = df_copy.withColumn('ApprovalFY', regexp_replace(col('ApprovalFY'), 'A', '').cast(IntegerType()))

In [17]:
unique_values_after = df_copy.select("ApprovalFY").distinct().rdd.flatMap(lambda x: x).collect()

# Show all unique values
for value in unique_values_after:
    print(value)

1990
2003
2007
1974
2006
1978
1997
1988
1973
1979
2004
1991
1982
1989
1998
1985
1987
2009
1980
2001
1972
1983
2005
1984
2000
1981
2010
1986
2011
2008
1999
1975
1977
2013
1994
1968
2014
1971
1969
1996
1970
2012
1995
1992
1976
2002
1993
None


Now I'll change the type of a few other columns as appropriate.

In [18]:
from pyspark.sql.types import IntegerType, StringType, FloatType

# Change the type of NewExist to an integer
df_copy = df_copy.withColumn("NewExist", df_copy["NewExist"].cast(IntegerType()))

# Change the type of Zip and UrbanRural to str (categorical)
df_copy = df_copy.withColumn("Zip", df_copy["Zip"].cast(StringType()))
df_copy = df_copy.withColumn("UrbanRural", df_copy["UrbanRural"].cast(StringType()))

# Change all currency-related fields to float values
currency_columns = ['DisbursementGross', 'BalanceGross', 'ChgOffPrinGr', 'GrAppv', 'SBA_Appv']
for col_name in currency_columns:
    df_copy = df_copy.withColumn(col_name, df_copy[col_name].cast(FloatType()))

# Check the data types
df_copy.printSchema()

root
 |-- LoanNr_ChkDgt: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: string (nullable = true)
 |-- Bank: string (nullable = true)
 |-- BankState: string (nullable = true)
 |-- NAICS: string (nullable = true)
 |-- ApprovalDate: string (nullable = true)
 |-- ApprovalFY: integer (nullable = true)
 |-- Term: integer (nullable = true)
 |-- NoEmp: integer (nullable = true)
 |-- NewExist: integer (nullable = true)
 |-- CreateJob: integer (nullable = true)
 |-- RetainedJob: integer (nullable = true)
 |-- FranchiseCode: integer (nullable = true)
 |-- UrbanRural: string (nullable = true)
 |-- RevLineCr: string (nullable = true)
 |-- LowDoc: string (nullable = true)
 |-- ChgOffDate: string (nullable = true)
 |-- DisbursementDate: string (nullable = true)
 |-- DisbursementGross: float (nullable = true)
 |-- BalanceGross: float (nullable = true)
 |-- MIS_Status: string (nullable = true)
 |-- ChgOffPrin

For this analysis, I care more about whether a business is a franchise than about its specific franchise code, so I create a flag field for this. Franchise codes of 0 and 1 are treated as non-franchise.

In [19]:
from pyspark.sql.functions import when

# Use the 'withColumn' method to create the 'IsFranchise' column
df_copy = df_copy.withColumn("IsFranchise", when(df_copy['FranchiseCode'] <= 1, 0).otherwise(1))

# Show the DataFrame with the new 'IsFranchise' column
df_copy.show()


+-------------+--------------------+--------------+-----+-----+--------------------+---------+------+------------+----------+----+-----+--------+---------+-----------+-------------+----------+---------+------+----------+----------------+-----------------+------------+----------+------------+--------+--------+-----------+
|LoanNr_ChkDgt|                Name|          City|State|  Zip|                Bank|BankState| NAICS|ApprovalDate|ApprovalFY|Term|NoEmp|NewExist|CreateJob|RetainedJob|FranchiseCode|UrbanRural|RevLineCr|LowDoc|ChgOffDate|DisbursementDate|DisbursementGross|BalanceGross|MIS_Status|ChgOffPrinGr|  GrAppv|SBA_Appv|IsFranchise|
+-------------+--------------------+--------------+-----+-----+--------------------+---------+------+------------+----------+----+-----+--------+---------+-----------+-------------+----------+---------+------+----------+----------------+-----------------+------------+----------+------------+--------+--------+-----------+
|   1000014003|      ABC HOBBYC

Next I look at some of the fields that are considered flags already but aren't necessarily in a useable format right now. These include the NewExist, RevLineCr, LowDoc, and MIS_Status fields.

In [20]:
# NewExist
# Check which values NewExist takes; only 1 (existing business) and 2 (new business) are valid
unique_values = df_copy.select("NewExist").distinct()

# Show all unique values in the NewExist column
unique_values.show(unique_values.count(), truncate=False)

+--------+
|NewExist|
+--------+
|1       |
|4       |
|7       |
|2       |
|0       |
|10      |
+--------+



In [21]:
# Keep records where NewExist == 1 or 2
df_copy = df_copy.filter((df_copy['NewExist'] == 1) | (df_copy['NewExist'] == 2))

# Create NewBusiness field where 0 = Existing business and 1 = New business; based on NewExist field
df_copy = df_copy.withColumn("NewBusiness", when(col("NewExist") == 1, 0).when(col("NewExist") == 2, 1).otherwise(None))

In [22]:
# NewExist
# Confirm that NewExist now contains only 1s and 2s
unique_values = df_copy.select("NewExist").distinct()

# Show all unique values in the NewExist column
unique_values.show(unique_values.count(), truncate=False)

+--------+
|NewExist|
+--------+
|1       |
|2       |
+--------+



In [23]:
# RevLineCr
# Check the unique values of RevLineCr
unique_values = df_copy.select("RevLineCr").distinct()

# Show all unique values in the RevLineCr column
unique_values.show(unique_values.count(), truncate=False)

+---------+
|RevLineCr|
+---------+
|3        |
|0        |
|T        |
|Y        |
|C        |
|N        |
|R        |
|1        |
|`        |
|,        |
|2        |
|7        |
|Q        |
|5        |
|.        |
|-        |
|A        |
|4        |
+---------+



In [24]:
# LowDoc
# Check the unique values of LowDoc
unique_values = df_copy.select("LowDoc").distinct()

# Show all unique values in the LowDoc column
unique_values.show(unique_values.count(), truncate=False)

+------+
|LowDoc|
+------+
|Y     |
|C     |
|A     |
|N     |
|S     |
|R     |
|1     |
|0     |
+------+



In [25]:
# Keep only records where both RevLineCr and LowDoc are 'Y' or 'N'
df_copy = df_copy.filter((col('RevLineCr').isin(['Y', 'N'])) & (col('LowDoc').isin(['Y', 'N'])))

# RevLineCr and LowDoc: 0 = No, 1 = Yes
df_copy = df_copy.withColumn('RevLineCr', when(col('RevLineCr') == 'N', 0).otherwise(1))
df_copy = df_copy.withColumn('LowDoc', when(col('LowDoc') == 'N', 0).otherwise(1))


In [26]:
# LowDoc
# Confirm that LowDoc now contains only 0s and 1s
unique_values = df_copy.select("LowDoc").distinct()

# Show all unique values in the LowDoc column
unique_values.show(unique_values.count(), truncate=False)

+------+
|LowDoc|
+------+
|1     |
|0     |
+------+



In [27]:
# RevLineCr
# Confirm that RevLineCr now contains only 0s and 1s
unique_values = df_copy.select("RevLineCr").distinct()

# Show all unique values in the RevLineCr column
unique_values.show(unique_values.count(), truncate=False)

+---------+
|RevLineCr|
+---------+
|1        |
|0        |
+---------+



In [28]:
# Create a new column 'Default' based on 'MIS_Status'
df_copy = df_copy.withColumn('Default', when(col('MIS_Status') == 'P I F', 0).otherwise(1))

# Count the values in the 'Default' column
df_copy.groupBy('Default').count().show()

+-------+------+
|Default| count|
+-------+------+
|      1|110549|
|      0|499589|
+-------+------+



In [29]:
# Double check DisbursementDate unique values
unique_values = df_copy.select("DisbursementDate").distinct()

# Show all unique values in the DisbursementDate column
unique_values.show(unique_values.count(), truncate=False)

+----------------+
|DisbursementDate|
+----------------+
|13-May-98       |
|18-Sep-81       |
|2-May-80        |
|15-May-85       |
|17-Jun-99       |
|22-Apr-99       |
|26-Jan-99       |
|3-Aug-87        |
|10-Mar-00       |
|2-Jun-89        |
|14-Jul-09       |
|6-Oct-00        |
|10-Apr-73       |
|16-Jun-00       |
|26-Jul-00       |
|16-Apr-10       |
|10-Aug-90       |
|20-Jul-10       |
|22-May-91       |
|17-Dec-97       |
|14-Nov-79       |
|28-Jul-72       |
|26-Sep-97       |
|15-Apr-99       |
|3-Jan-99        |
|14-Jun-99       |
|17-Sep-85       |
|22-Jan-99       |
|23-Sep-87       |
|26-Oct-87       |
|14-Feb-08       |
|26-Feb-08       |
|21-Mar-89       |
|17-Oct-89       |
|15-Jul-80       |
|30-Nov-06       |
|30-Jun-81       |
|30-Jan-79       |
|18-Apr-83       |
|4-Mar-87        |
|24-Sep-84       |
|3-Dec-84        |
|24-Apr-85       |
|28-Oct-86       |
|8-Oct-07        |
|22-May-87       |
|2-May-87        |
|17-Dec-79       |
|3-Jun-87        |
|1-Apr-87   

Now that the flag fields have been addressed, let's tackle the date fields.

In [30]:
# Double check ApprovalDate unique values
unique_values = df_copy.select("ApprovalDate").distinct()

# Show all unique values in the ApprovalDate column
unique_values.show(unique_values.count(), truncate=False)

+------------+
|ApprovalDate|
+------------+
|9-Feb-05    |
|18-Sep-81   |
|23-Jan-79   |
|13-May-98   |
|11-May-85   |
|15-May-85   |
|2-May-80    |
|30-Aug-07   |
|20-Jan-99   |
|26-Jan-99   |
|22-Apr-99   |
|3-Aug-87    |
|17-Jun-99   |
|18-Apr-09   |
|23-Feb-89   |
|2-Jun-89    |
|14-Jul-09   |
|10-Mar-00   |
|16-Jun-00   |
|26-Jul-00   |
|6-Oct-00    |
|16-Apr-10   |
|20-Jul-10   |
|10-Aug-90   |
|15-Apr-99   |
|22-Feb-06   |
|14-Apr-81   |
|26-Nov-82   |
|26-Sep-97   |
|17-Dec-97   |
|14-Nov-79   |
|17-Sep-85   |
|28-Jul-72   |
|22-Jan-99   |
|14-Feb-08   |
|26-Feb-08   |
|23-Sep-87   |
|14-Jun-99   |
|26-Oct-87   |
|17-Apr-08   |
|4-Mar-09    |
|1-Mar-89    |
|21-Mar-89   |
|17-Oct-89   |
|5-Oct-90    |
|7-Aug-07    |
|16-Oct-03   |
|22-Oct-04   |
|2-Nov-04    |
|16-Nov-04   |
|30-Jun-81   |
|5-Aug-05    |
|18-Apr-83   |
|6-Feb-84    |
|19-Sep-78   |
|24-Sep-84   |
|30-Jan-79   |
|3-Dec-84    |
|30-Nov-06   |
|11-Jan-07   |
|24-Apr-85   |
|17-Dec-79   |
|18-Mar-07   |
|8-Oct-07 

In [31]:
df_copy.printSchema()

root
 |-- LoanNr_ChkDgt: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: string (nullable = true)
 |-- Bank: string (nullable = true)
 |-- BankState: string (nullable = true)
 |-- NAICS: string (nullable = true)
 |-- ApprovalDate: string (nullable = true)
 |-- ApprovalFY: integer (nullable = true)
 |-- Term: integer (nullable = true)
 |-- NoEmp: integer (nullable = true)
 |-- NewExist: integer (nullable = true)
 |-- CreateJob: integer (nullable = true)
 |-- RetainedJob: integer (nullable = true)
 |-- FranchiseCode: integer (nullable = true)
 |-- UrbanRural: string (nullable = true)
 |-- RevLineCr: integer (nullable = false)
 |-- LowDoc: integer (nullable = false)
 |-- ChgOffDate: string (nullable = true)
 |-- DisbursementDate: string (nullable = true)
 |-- DisbursementGross: float (nullable = true)
 |-- BalanceGross: float (nullable = true)
 |-- MIS_Status: string (nullable = true)
 |-- ChgOff

In [32]:
# Convert the ApprovalDate and DisbursementDate columns from 'd-MMM-yy' strings to 'yyyy-MM-dd' date strings
# ChgOffDate is not converted since it is not of value and will be removed later
# Set the legacy time parser policy before parsing
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

from pyspark.sql.functions import to_date, date_format, unix_timestamp

# Parse each column as a date, then format it back as a 'yyyy-MM-dd' string
df_copy = df_copy.withColumn("ApprovalDate",
                   to_date(unix_timestamp("ApprovalDate", "dd-MMM-yy").cast("timestamp")))
df_copy = df_copy.withColumn("ApprovalDate",
                   date_format("ApprovalDate", "yyyy-MM-dd"))

df_copy = df_copy.withColumn("DisbursementDate",
                   to_date(unix_timestamp("DisbursementDate", "dd-MMM-yy").cast("timestamp")))
df_copy = df_copy.withColumn("DisbursementDate",
                   date_format("DisbursementDate", "yyyy-MM-dd"))

In [33]:
df_copy.show()

+-------------+--------------------+--------------+-----+-----+--------------------+---------+------+------------+----------+----+-----+--------+---------+-----------+-------------+----------+---------+------+----------+----------------+-----------------+------------+----------+------------+--------+--------+-----------+-----------+-------+
|LoanNr_ChkDgt|                Name|          City|State|  Zip|                Bank|BankState| NAICS|ApprovalDate|ApprovalFY|Term|NoEmp|NewExist|CreateJob|RetainedJob|FranchiseCode|UrbanRural|RevLineCr|LowDoc|ChgOffDate|DisbursementDate|DisbursementGross|BalanceGross|MIS_Status|ChgOffPrinGr|  GrAppv|SBA_Appv|IsFranchise|NewBusiness|Default|
+-------------+--------------------+--------------+-----+-----+--------------------+---------+------+------------+----------+----+-----+--------+---------+-----------+-------------+----------+---------+------+----------+----------------+-----------------+------------+----------+------------+--------+--------+----

DaysToDisbursement is the number of days between a loan's approval and the borrower's receipt of the funds. It matters to lenders because it affects customer satisfaction, risk management, competitiveness, regulatory compliance, and operational efficiency. Lenders aim to balance speedy disbursement against thorough due diligence to mitigate the risks associated with lending.
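
As a worked example of the calculation, the plain-Python sketch below parses two dates in the dataset's 'd-Mon-yy' format and takes their difference in days. The two dates are made up for illustration, and the helper `days_to_disbursement` is hypothetical; the format string assumes English month abbreviations.

```python
from datetime import datetime

DATE_FORMAT = "%d-%b-%y"  # e.g. '7-Feb-06'; %b assumes English month abbreviations

def days_to_disbursement(approval, disbursement):
    """Number of days from the approval date to the disbursement date."""
    approved = datetime.strptime(approval, DATE_FORMAT).date()
    disbursed = datetime.strptime(disbursement, DATE_FORMAT).date()
    return (disbursed - approved).days

# Hypothetical loan: approved 7 Feb 2006, disbursed 31 Mar 2006.
example_days = days_to_disbursement("7-Feb-06", "31-Mar-06")
print(example_days)
```

A negative result would mean the recorded disbursement precedes the approval, which is worth checking for in the real data.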

In [34]:
from pyspark.sql.functions import col, datediff
from pyspark.sql.types import IntegerType

# Calculate the number of days between DisbursementDate and ApprovalDate
df_copy = df_copy.withColumn(
    "DaysToDisbursement",
    datediff(col("DisbursementDate"), col("ApprovalDate"))
)

# Make sure the DaysToDisbursement column is stored as an integer
df_copy = df_copy.withColumn(
    "DaysToDisbursement",
    df_copy["DaysToDisbursement"].cast(IntegerType())
)

In [35]:
from pyspark.sql.functions import year

# Use the calendar year of DisbursementDate as DisbursementFY
df_copy = df_copy.withColumn("DisbursementFY", year(df_copy["DisbursementDate"]))

In [36]:
df_copy.show()

+-------------+--------------------+--------------+-----+-----+--------------------+---------+------+------------+----------+----+-----+--------+---------+-----------+-------------+----------+---------+------+----------+----------------+-----------------+------------+----------+------------+--------+--------+-----------+-----------+-------+------------------+--------------+
|LoanNr_ChkDgt|                Name|          City|State|  Zip|                Bank|BankState| NAICS|ApprovalDate|ApprovalFY|Term|NoEmp|NewExist|CreateJob|RetainedJob|FranchiseCode|UrbanRural|RevLineCr|LowDoc|ChgOffDate|DisbursementDate|DisbursementGross|BalanceGross|MIS_Status|ChgOffPrinGr|  GrAppv|SBA_Appv|IsFranchise|NewBusiness|Default|DaysToDisbursement|DisbursementFY|
+-------------+--------------------+--------------+-----+-----+--------------------+---------+------+------------+----------+----+-----+--------+---------+-----------+-------------+----------+---------+------+----------+----------------+---------

Another metric I was interested in exploring is whether the bank servicing the loan is in the same state as the business. My assumption is that it would be more difficult to service a loan for a business in another state and that this could have a negative impact on a business's ability to repay the loan.

In [37]:
# Create StateSame flag field which identifies where the business State is the same as the BankState
df_copy = df_copy.withColumn('StateSame', when(col('State') == col('BankState'), 1).otherwise(0))

The next field I create relates to the share of the loan that the SBA guaranteed. This is a unique feature of SBA loans: the SBA guarantees a percentage of the loan in the event of a loss. For example, if a business took out a $500,000 loan with a 50% SBA guarantee and was unable to repay $200,000 of it, the SBA would cover $100,000 of that loss. This makes these loans very attractive to small businesses, but it also increases the risk for the SBA, which is why an analysis like this is important. These loans are typically guaranteed on a percentage basis rather than for a specified dollar amount, so I create a field for the guaranteed percentage rather than using the guaranteed amount provided in the original dataset.
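
The arithmetic in the example above can be written out as a short plain-Python sketch. The helper `sba_loss_share` is hypothetical, and it assumes the SBA covers losses pro rata at the guaranteed percentage, as in the example.

```python
def sba_loss_share(loss, gross_approved, sba_approved):
    """Return (guarantee_pct, sba_covered_loss) for a given unpaid amount."""
    guarantee_pct = sba_approved / gross_approved
    return guarantee_pct, loss * guarantee_pct

# A 500,000 loan with 250,000 guaranteed (50%) and 200,000 left unpaid.
pct, covered = sba_loss_share(loss=200_000, gross_approved=500_000, sba_approved=250_000)
print(pct, covered)
```

The first returned value is the same ratio as the guaranteed amount divided by the gross approved amount, which is what a percentage-based feature captures.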

In [38]:
# Create SBA_AppvPct field since the guaranteed amount is based on a percentage of the gross loan amount rather than dollar amount in most situations
df_copy = df_copy.withColumn("SBA_AppvPct", col("SBA_Appv") / col("GrAppv"))

In [39]:
# Create AppvDisbursed flag field signifying if the loan amount disbursed was equal to the full amount approved
df_copy = df_copy.withColumn("AppvDisbursed", when(col("DisbursementGross") == col("GrAppv"), 1).otherwise(0))

In [40]:
df_copy.printSchema()

root
 |-- LoanNr_ChkDgt: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: string (nullable = true)
 |-- Bank: string (nullable = true)
 |-- BankState: string (nullable = true)
 |-- NAICS: string (nullable = true)
 |-- ApprovalDate: string (nullable = true)
 |-- ApprovalFY: integer (nullable = true)
 |-- Term: integer (nullable = true)
 |-- NoEmp: integer (nullable = true)
 |-- NewExist: integer (nullable = true)
 |-- CreateJob: integer (nullable = true)
 |-- RetainedJob: integer (nullable = true)
 |-- FranchiseCode: integer (nullable = true)
 |-- UrbanRural: string (nullable = true)
 |-- RevLineCr: integer (nullable = false)
 |-- LowDoc: integer (nullable = false)
 |-- ChgOffDate: string (nullable = true)
 |-- DisbursementDate: string (nullable = true)
 |-- DisbursementGross: float (nullable = true)
 |-- BalanceGross: float (nullable = true)
 |-- MIS_Status: string (nullable = true)
 |-- ChgOff

In [41]:
# Check the shape of the data as it stands
num_rows = df_copy.count()
num_columns = len(df_copy.columns)

# Print the shape
print("Number of rows:", num_rows)
print("Number of columns:", num_columns)

Number of rows: 610138
Number of columns: 35


The final two aspects I wish to examine pertain to whether a loan is supported by Real Estate and whether it remained active during the Great Recession (2007-2009). These factors were mentioned in the dataset's accompanying document, which outlines its educational application. I believe they hold significant importance and should be taken into consideration.

To determine if a loan is backed by Real Estate, I devised a flag that indicates whether the loan term is at least 20 years (240 months, since Term is recorded in months). Real estate-backed loans typically span this duration because the loan term is generally linked to the useful life of the assets used as collateral. Unfortunately, the data contains no explicit information on collateral.

Regarding loans active during the Great Recession, I established a flag for loans where the period between DisbursementFY and DisbursementFY plus the loan term (in years) overlaps the Great Recession (2007-2009).

In [42]:
from pyspark.sql import functions as F

# Field for loans backed by Real Estate (loans with a term of at least 20 years)
df_copy = df_copy.withColumn('RealEstate', F.when(df_copy['Term'] >= 240, 1).otherwise(0))

# Field for loans active during the Great Recession (2007-2009)
df_copy = df_copy.withColumn('GreatRecession', F.when(
    ((F.col('DisbursementFY') >= 2007) & (F.col('DisbursementFY') <= 2009)) |
    ((F.col('DisbursementFY') < 2007) & (F.col('DisbursementFY') + (F.col('Term') / 12) >= 2007)),
    1
).otherwise(0))
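As a quick sanity check of the two flag definitions, the same conditions can be written as plain Python functions and tried on a few hand-picked cases. This is only a sketch: the function names and the sample loans are hypothetical, and the logic is meant to mirror the Spark expressions above rather than replace them.

```python
def real_estate_flag(term_months: int) -> int:
    """1 if the term is at least 20 years (240 months), else 0."""
    return 1 if term_months >= 240 else 0


def great_recession_flag(disbursement_fy: int, term_months: int) -> int:
    """1 if the loan was active at some point during 2007-2009, else 0."""
    disbursed_in_window = 2007 <= disbursement_fy <= 2009
    still_active_in_2007 = (
        disbursement_fy < 2007 and disbursement_fy + term_months / 12 >= 2007
    )
    return 1 if (disbursed_in_window or still_active_in_2007) else 0


# Hypothetical loans: (DisbursementFY, Term in months)
samples = [(2005, 36), (2000, 60), (2008, 12), (2010, 120), (1995, 240)]
for fy, term in samples:
    print(fy, term, real_estate_flag(term), great_recession_flag(fy, term))
```

A loan disbursed in 2000 with a 60-month term ends in 2005 and is not flagged, while one disbursed in 2005 with a 36-month term runs into 2008 and is.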

When considering the time frame for the records used in the subsequent analysis and modeling, I find the rationale in the document accompanying the dataset well founded. It is outlined in that document's Section 3.3, Time Period. In summary, the focus is on default rates of loans disbursed through 2010, taking into account the impact of the Great Recession. Later loans are excluded because loan terms typically extend for five years or more, so more recent loans may not yet have reached a final outcome.

I have chosen to adopt this approach for my own analysis, so my selection criterion keeps only loans with a disbursement fiscal year of 2010 or earlier.

In [43]:
# Select only records with a disbursement year through 2010
df_copy = df_copy.filter(col('DisbursementFY') <= 2010)

# Check how many records remain
num_records = df_copy.count()
print("Number of remaining records:", num_records)

Number of remaining records: 591699


Now that we have ensured that each field we are concerned with has the correct data type, it is time to eliminate the fields that will not contribute significantly to our analysis. These fields are as follows:

1. LoanNr_ChkDgt: This field is only a record identifier and does not provide any value to the actual analysis. Note that it is not in the drop list below; it stays in the DataFrame and is used later to filter out individual records.

2. Zip: I assume this field is unlikely to contain any particularly significant information for our analysis.

3. ChgOffDate: This field is only applicable when a loan is charged off and is not relevant to our analysis.

4. NewExist: This field can be replaced by the NewBusiness flag field, which will provide the necessary information.

5. FranchiseCode: The FranchiseCode field can be replaced by the IsFranchise flag field, which will serve the same purpose.

6. MIS_Status: The Default field will replace MIS_Status as the target field for our analysis.

By removing these fields, we can streamline our analysis and focus on the most relevant data.

In [44]:
# List of columns to drop
columns_to_drop = ['NewExist', 'FranchiseCode', 'ChgOffDate', 'Zip', 'MIS_Status']

# Drop the specified columns in the PySpark DataFrame
df_copy = df_copy.drop(*columns_to_drop)

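I also wanted to remove records with a negative DaysToDisbursement, under the assumption that loan funds would not be disbursed before the loan was approved. Before applying that filter, I also add a flag for loans where more was disbursed than was approved.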

In [45]:
# Create flag to signify if a larger amount was disbursed than what the Bank had approved
# Likely RevLineCr?
df_copy = df_copy.withColumn('DisbursedGreaterAppv',
                            when(col('DisbursementGross') > col('GrAppv'), 1).otherwise(0))

In [46]:
# Filter records where 'DaysToDisbursement' is greater than or equal to 0
df_copy = df_copy.filter(df_copy['DaysToDisbursement'] >= 0)

# Count the number of remaining records
record_count = df_copy.count()

# Show the result
df_copy.show()
print("Number of remaining records:", record_count)
df_copy.printSchema()

+-------------+--------------------+--------------+-----+--------------------+---------+------+------------+----------+----+-----+---------+-----------+----------+---------+------+----------------+-----------------+------------+------------+--------+--------+-----------+-----------+-------+------------------+--------------+---------+-----------+-------------+----------+--------------+--------------------+
|LoanNr_ChkDgt|                Name|          City|State|                Bank|BankState| NAICS|ApprovalDate|ApprovalFY|Term|NoEmp|CreateJob|RetainedJob|UrbanRural|RevLineCr|LowDoc|DisbursementDate|DisbursementGross|BalanceGross|ChgOffPrinGr|  GrAppv|SBA_Appv|IsFranchise|NewBusiness|Default|DaysToDisbursement|DisbursementFY|StateSame|SBA_AppvPct|AppvDisbursed|RealEstate|GreatRecession|DisbursedGreaterAppv|
+-------------+--------------------+--------------+-----+--------------------+---------+------+------------+----------+----+-----+---------+-----------+----------+---------+------+--

In [47]:
# State unique values
unique_values = df_copy.select("State").distinct()

# Show all unique values in the 'State' column
unique_values.show(unique_values.count(), truncate=False)

+-----+
|State|
+-----+
|SC   |
|AZ   |
|LA   |
|MN   |
|NJ   |
|DC   |
|OR   |
|VA   |
|RI   |
|KY   |
|WY   |
|NH   |
|MI   |
|NV   |
|WI   |
|ID   |
|CA   |
|CT   |
|NE   |
|MT   |
|NC   |
|VT   |
|MD   |
|DE   |
|MO   |
|IL   |
|ME   |
|ND   |
|WA   |
|MS   |
|AL   |
|IN   |
|OH   |
|TN   |
|IA   |
|NM   |
|PA   |
|SD   |
|NY   |
|TX   |
|WV   |
|GA   |
|MA   |
|KS   |
|FL   |
|CO   |
|AK   |
|AR   |
|OK   |
|UT   |
|HI   |
+-----+



In [48]:
# Create a dictionary of state abbreviations and full names
state_abbreviations = {
    "SC": "South Carolina",
    "AZ": "Arizona",
    "LA": "Louisiana",
    "MN": "Minnesota",
    "NJ": "New Jersey",
    "DC": "District of Columbia",
    "OR": "Oregon",
    "VA": "Virginia",
    "RI": "Rhode Island",
    "KY": "Kentucky",
    "WY": "Wyoming",
    "NH": "New Hampshire",
    "MI": "Michigan",
    "NV": "Nevada",
    "WI": "Wisconsin",
    "ID": "Idaho",
    "CA": "California",
    "CT": "Connecticut",
    "NE": "Nebraska",
    "MT": "Montana",
    "NC": "North Carolina",
    "VT": "Vermont",
    "MD": "Maryland",
    "DE": "Delaware",
    "MO": "Missouri",
    "IL": "Illinois",
    "ME": "Maine",
    "ND": "North Dakota",
    "WA": "Washington",
    "MS": "Mississippi",
    "AL": "Alabama",
    "IN": "Indiana",
    "OH": "Ohio",
    "TN": "Tennessee",
    "IA": "Iowa",
    "NM": "New Mexico",
    "PA": "Pennsylvania",
    "SD": "South Dakota",
    "NY": "New York",
    "TX": "Texas",
    "WV": "West Virginia",
    "GA": "Georgia",
    "MA": "Massachusetts",
    "KS": "Kansas",
    "FL": "Florida",
    "CO": "Colorado",
    "AK": "Alaska",
    "AR": "Arkansas",
    "OK": "Oklahoma",
    "UT": "Utah",
    "HI": "Hawaii"
}

# Use a UDF to map state abbreviations to full names, overwriting the 'State' column
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

state_udf = udf(lambda code: state_abbreviations.get(code), StringType())
df_copy = df_copy.withColumn("State", state_udf(col("State")))
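One detail of this mapping is worth checking: `dict.get` returns `None` for a key that is missing from the dictionary, so any abbreviation not listed would end up as a null in the mapped column. The sketch below shows that behaviour in plain Python with a small subset of the dictionary. The names `lookup` and `lookup_keep_code` are hypothetical, and the fallback variant is only one possible alternative, not what the notebook does.

```python
# Small subset of the mapping, for illustration only
state_names = {"SC": "South Carolina", "AZ": "Arizona", "HI": "Hawaii"}


def lookup(code):
    """Same lookup the UDF performs: missing codes give None."""
    return state_names.get(code)


def lookup_keep_code(code):
    """Alternative: fall back to the original code instead of None."""
    return state_names.get(code, code)


print(lookup("AZ"))            # Arizona
print(lookup("PR"))            # None
print(lookup_keep_code("PR"))  # PR
```

Checking the distinct values and the null counts after the mapping is a simple way to confirm that every abbreviation was covered.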


In [49]:
# State unique values
unique_values = df_copy.select("State").distinct()

# Show all unique values in the 'State' column
unique_values.show(unique_values.count(), truncate=False)

+--------------------+
|State               |
+--------------------+
|Utah                |
|Hawaii              |
|Minnesota           |
|Ohio                |
|Arkansas            |
|Oregon              |
|Texas               |
|North Dakota        |
|Pennsylvania        |
|Connecticut         |
|Vermont             |
|Nebraska            |
|Nevada              |
|Washington          |
|Illinois            |
|Oklahoma            |
|District of Columbia|
|Delaware            |
|Alaska              |
|New Mexico          |
|West Virginia       |
|Missouri            |
|Rhode Island        |
|Georgia             |
|Montana             |
|Virginia            |
|Michigan            |
|North Carolina      |
|Wyoming             |
|Kansas              |
|New Jersey          |
|Maryland            |
|Alabama             |
|Arizona             |
|Iowa                |
|Massachusetts       |
|Kentucky            |
|Louisiana           |
|Mississippi         |
|Tennessee           |
|New Hampsh

In [50]:
# Check for null values in each column (F.sum is Spark's aggregate, not Python's built-in sum)
null_counts = df_copy.select([F.sum(F.col(column).isNull().cast("int")).alias(column) for column in df_copy.columns])

# Show the result
null_counts.show(vertical=True, truncate=False)

-RECORD 0-------------------
 LoanNr_ChkDgt        | 0   
 Name                 | 0   
 City                 | 0   
 State                | 0   
 Bank                 | 0   
 BankState            | 0   
 NAICS                | 0   
 ApprovalDate         | 0   
 ApprovalFY           | 0   
 Term                 | 0   
 NoEmp                | 0   
 CreateJob            | 0   
 RetainedJob          | 0   
 UrbanRural           | 0   
 RevLineCr            | 0   
 LowDoc               | 0   
 DisbursementDate     | 0   
 DisbursementGross    | 0   
 BalanceGross         | 0   
 ChgOffPrinGr         | 0   
 GrAppv               | 0   
 SBA_Appv             | 0   
 IsFranchise          | 0   
 NewBusiness          | 0   
 Default              | 0   
 DaysToDisbursement   | 0   
 DisbursementFY       | 0   
 StateSame            | 0   
 SBA_AppvPct          | 0   
 AppvDisbursed        | 0   
 RealEstate           | 0   
 GreatRecession       | 0   
 DisbursedGreaterAppv | 0   



In [51]:
# Create a dictionary of state abbreviations and full names
bankstate_abbreviations = {
    "SC": "South Carolina",
    "AZ": "Arizona",
    "LA": "Louisiana",
    "MN": "Minnesota",
    "NJ": "New Jersey",
    "DC": "District of Columbia",
    "OR": "Oregon",
    "VA": "Virginia",
    "RI": "Rhode Island",
    "KY": "Kentucky",
    "WY": "Wyoming",
    "NH": "New Hampshire",
    "MI": "Michigan",
    "NV": "Nevada",
    "WI": "Wisconsin",
    "ID": "Idaho",
    "CA": "California",
    "CT": "Connecticut",
    "NE": "Nebraska",
    "MT": "Montana",
    "NC": "North Carolina",
    "VT": "Vermont",
    "MD": "Maryland",
    "DE": "Delaware",
    "MO": "Missouri",
    "IL": "Illinois",
    "ME": "Maine",
    "GU": "Guam",
    "ND": "North Dakota",
    "WA": "Washington",
    "MS": "Mississippi",
    "AL": "Alabama",
    "IN": "Indiana",
    "OH": "Ohio",
    "TN": "Tennessee",
    "IA": "Iowa",
    "NM": "New Mexico",
    "PA": "Pennsylvania",
    "SD": "South Dakota",
    "NY": "New York",
    "TX": "Texas",
    "WV": "West Virginia",
    "GA": "Georgia",
    "MA": "Massachusetts",
    "KS": "Kansas",
    "FL": "Florida",
    "CO": "Colorado",
    "AK": "Alaska",
    "AR": "Arkansas",
    "OK": "Oklahoma",
    "PR": "Puerto Rico",
    "UT": "Utah",
    "HI": "Hawaii",
    "EN": "Unknown State"
}

# Use a UDF to map bank state abbreviations to full names, overwriting the 'BankState' column
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

bankstate_udf = udf(lambda code: bankstate_abbreviations.get(code), StringType())
df_copy = df_copy.withColumn("BankState", bankstate_udf(col("BankState")))


In [52]:
# BankState unique values
unique_values = df_copy.select("BankState").distinct()

# Show all unique values in the 'BankState' column
unique_values.show(unique_values.count(), truncate=False)

+--------------------+
|BankState           |
+--------------------+
|Utah                |
|Hawaii              |
|Minnesota           |
|Ohio                |
|Oregon              |
|Arkansas            |
|Texas               |
|North Dakota        |
|Pennsylvania        |
|Connecticut         |
|Nebraska            |
|Vermont             |
|Nevada              |
|Puerto Rico         |
|Washington          |
|Illinois            |
|Oklahoma            |
|District of Columbia|
|Delaware            |
|Alaska              |
|New Mexico          |
|West Virginia       |
|Missouri            |
|Rhode Island        |
|Georgia             |
|Montana             |
|Virginia            |
|Michigan            |
|Guam                |
|North Carolina      |
|Wyoming             |
|Kansas              |
|New Jersey          |
|Maryland            |
|Alabama             |
|Arizona             |
|Iowa                |
|Massachusetts       |
|Kentucky            |
|Louisiana           |
|Mississipp

In [53]:
# Remove records where BankState is an Unknown State
df_copy = df_copy.filter(col("BankState") != "Unknown State")

In [54]:
# Remove records where the NAICS code is 0
df_copy = df_copy.filter(col("NAICS") != 0)

In [55]:
# Display the first few rows
df_copy.show()

# Check the shape of the data as it stands
num_rows = df_copy.count()
num_columns = len(df_copy.columns)

# Print the shape
print("Number of rows:", num_rows)
print("Number of columns:", num_columns)

+-------------+--------------------+--------------+--------------+--------------------+--------------+------+------------+----------+----+-----+---------+-----------+----------+---------+------+----------------+-----------------+------------+------------+--------+--------+-----------+-----------+-------+------------------+--------------+---------+-----------+-------------+----------+--------------+--------------------+
|LoanNr_ChkDgt|                Name|          City|         State|                Bank|     BankState| NAICS|ApprovalDate|ApprovalFY|Term|NoEmp|CreateJob|RetainedJob|UrbanRural|RevLineCr|LowDoc|DisbursementDate|DisbursementGross|BalanceGross|ChgOffPrinGr|  GrAppv|SBA_Appv|IsFranchise|NewBusiness|Default|DaysToDisbursement|DisbursementFY|StateSame|SBA_AppvPct|AppvDisbursed|RealEstate|GreatRecession|DisbursedGreaterAppv|
+-------------+--------------------+--------------+--------------+--------------------+--------------+------+------------+----------+----+-----+---------+

In [56]:
# Copy for SQL
df_sql = df_copy

In [57]:
# Copy for PowerBI
df_powerbi = df_copy

In [58]:
from pyspark.sql.functions import col, substring
from pyspark.sql.types import StringType

# Create a new column 'Industry' with the first two characters of 'NAICS' as a StringType
df_powerbi = df_powerbi.withColumn("Industry", substring(col("NAICS").cast(StringType()), 1, 2))

# Define a mapping of industry codes to industry names
industry_mapping = {
    '11': 'Ag/For/Fish/Hunt',
    '21': 'Min/Quar/Oil_Gas_ext',
    '22': 'Utilities',
    '23': 'Construction',
    '31': 'Manufacturing',
    '32': 'Manufacturing',
    '33': 'Manufacturing',
    '42': 'Wholesale_trade',
    '44': 'Retail_trade',
    '45': 'Retail_trade',
    '48': 'Trans/Ware',
    '49': 'Trans/Ware',
    '51': 'Information',
    '52': 'Finance/Insurance',
    '53': 'RE/Rental/Lease',
    '54': 'Prof/Science/Tech',
    '55': 'Mgmt_comp',
    '56': 'Admin_sup/Waste_Mgmt_Rem',
    '61': 'Educational',
    '62': 'Healthcare/Social_assist',
    '71': 'Arts/Entertain/Rec',
    '72': 'Accom/Food_serv',
    '81': 'Other_no_pub',
    '92': 'Public_Admin'
}

# Use a UDF to map the two-digit industry codes to names, overwriting the 'Industry' column
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

industry_udf = udf(lambda code: industry_mapping.get(code), StringType())
df_powerbi = df_powerbi.withColumn("Industry", industry_udf(col("Industry")))

# List of columns to drop
columns_to_drop = ['NAICS']

# Drop the specified columns in the PySpark DataFrame
df_powerbi = df_powerbi.drop(*columns_to_drop)

# Show the resulting DataFrame
df_powerbi.show()

+-------------+--------------------+--------------+--------------+--------------------+--------------+------------+----------+----+-----+---------+-----------+----------+---------+------+----------------+-----------------+------------+------------+--------+--------+-----------+-----------+-------+------------------+--------------+---------+-----------+-------------+----------+--------------+--------------------+--------------------+
|LoanNr_ChkDgt|                Name|          City|         State|                Bank|     BankState|ApprovalDate|ApprovalFY|Term|NoEmp|CreateJob|RetainedJob|UrbanRural|RevLineCr|LowDoc|DisbursementDate|DisbursementGross|BalanceGross|ChgOffPrinGr|  GrAppv|SBA_Appv|IsFranchise|NewBusiness|Default|DaysToDisbursement|DisbursementFY|StateSame|SBA_AppvPct|AppvDisbursed|RealEstate|GreatRecession|DisbursedGreaterAppv|            Industry|
+-------------+--------------------+--------------+--------------+--------------------+--------------+------------+----------+

In [59]:
columns_to_convert = ['StateSame', 'IsFranchise', 'NewBusiness','RealEstate','GreatRecession','DisbursedGreaterAppv', 'Default','AppvDisbursed','RevLineCr','LowDoc' ]

# Use the 'when' function to perform the conversion
for column in columns_to_convert:
    df_powerbi = df_powerbi.withColumn(column, when(df_powerbi[column] == 1, "Yes").otherwise("No"))


In [60]:
columns_to_convert2 = ['UrbanRural']

# Use the 'when' function to map the UrbanRural codes to labels
for column in columns_to_convert2:
    df_powerbi = df_powerbi.withColumn(
        column,
        when(df_powerbi[column] == 1, "Urban")
        .when(df_powerbi[column] == 2, "Rural")
        .otherwise("Undefined"),
    )

In [61]:
# Show the resulting DataFrame
df_powerbi.show()

+-------------+--------------------+--------------+--------------+--------------------+--------------+------------+----------+----+-----+---------+-----------+----------+---------+------+----------------+-----------------+------------+------------+--------+--------+-----------+-----------+-------+------------------+--------------+---------+-----------+-------------+----------+--------------+--------------------+--------------------+
|LoanNr_ChkDgt|                Name|          City|         State|                Bank|     BankState|ApprovalDate|ApprovalFY|Term|NoEmp|CreateJob|RetainedJob|UrbanRural|RevLineCr|LowDoc|DisbursementDate|DisbursementGross|BalanceGross|ChgOffPrinGr|  GrAppv|SBA_Appv|IsFranchise|NewBusiness|Default|DaysToDisbursement|DisbursementFY|StateSame|SBA_AppvPct|AppvDisbursed|RealEstate|GreatRecession|DisbursedGreaterAppv|            Industry|
+-------------+--------------------+--------------+--------------+--------------------+--------------+------------+----------+

In [62]:
# Rename columns to more descriptive names
df_powerbi = df_powerbi.withColumnRenamed("CreateJob", "JobsCreated")
df_powerbi = df_powerbi.withColumnRenamed("RetainedJob", "JobsRetained")
df_powerbi = df_powerbi.withColumnRenamed("UrbanRural", "SettlementType")
df_powerbi = df_powerbi.withColumnRenamed("ChgOffPrinGr", "Charged-offAmount")

In [63]:
# Define the values to filter
values_to_filter = ["1127285007", "3645874010", "4480523009"]

# Filter out the rows with the specified values
df_powerbi = df_powerbi.filter(~col("LoanNr_ChkDgt").isin(values_to_filter))


In [64]:
# Show the resulting DataFrame
df_powerbi.show()
df_powerbi.printSchema()

+-------------+--------------------+--------------+--------------+--------------------+--------------+------------+----------+----+-----+-----------+------------+--------------+---------+------+----------------+-----------------+------------+-----------------+--------+--------+-----------+-----------+-------+------------------+--------------+---------+-----------+-------------+----------+--------------+--------------------+--------------------+
|LoanNr_ChkDgt|                Name|          City|         State|                Bank|     BankState|ApprovalDate|ApprovalFY|Term|NoEmp|JobsCreated|JobsRetained|SettlementType|RevLineCr|LowDoc|DisbursementDate|DisbursementGross|BalanceGross|Charged-offAmount|  GrAppv|SBA_Appv|IsFranchise|NewBusiness|Default|DaysToDisbursement|DisbursementFY|StateSame|SBA_AppvPct|AppvDisbursed|RealEstate|GreatRecession|DisbursedGreaterAppv|            Industry|
+-------------+--------------------+--------------+--------------+--------------------+--------------+

In [None]:
output_path = "/content/drive/MyDrive/pj notebooks/dataset/Processed_sbaloan.csv"

# Repartition the DataFrame to have a single partition
df_powerbi = df_powerbi.repartition(1)

# Export the DataFrame to a CSV file
df_powerbi.write.format("csv").option("header", "true").mode("overwrite").save(output_path)

In [65]:
# Create a new column 'Industry code' with the first two characters of 'NAICS' as a StringType
df_sql = df_sql.withColumn("Industry code", substring(col("NAICS").cast(StringType()), 1, 2))
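As a plain-Python illustration of what the `substring` call extracts, the sketch below takes the first two characters of a NAICS code and looks them up in a small subset of the `industry_mapping` dictionary defined earlier. The function name `naics_sector` and the sample codes are assumptions made for this example only.

```python
# Subset of the two-digit sector mapping used earlier in the notebook
sector_names = {
    "23": "Construction",
    "44": "Retail_trade",
    "45": "Retail_trade",
    "72": "Accom/Food_serv",
}


def naics_sector(naics):
    """Return (two-digit prefix, sector name or None) for a NAICS code."""
    prefix = str(naics)[:2]  # same characters as substring(col, 1, 2) in Spark
    return prefix, sector_names.get(prefix)


print(naics_sector("722110"))  # ('72', 'Accom/Food_serv')
print(naics_sector(451120))    # ('45', 'Retail_trade')
print(naics_sector("999999"))  # ('99', None)
```

A prefix that is missing from the lookup yields `None` here; in the SQL version further down, such rows would simply not match in the inner join.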


In [67]:
# List of columns to drop
columns_to_drop = ['NAICS']

# Drop the specified columns in the PySpark DataFrame
df_sql = df_sql.drop(*columns_to_drop)

In [66]:
# Show the resulting DataFrame
df_sql.show()
df_sql.printSchema()

+-------------+--------------------+--------------+--------------+--------------------+--------------+------+------------+----------+----+-----+---------+-----------+----------+---------+------+----------------+-----------------+------------+------------+--------+--------+-----------+-----------+-------+------------------+--------------+---------+-----------+-------------+----------+--------------+--------------------+-------------+
|LoanNr_ChkDgt|                Name|          City|         State|                Bank|     BankState| NAICS|ApprovalDate|ApprovalFY|Term|NoEmp|CreateJob|RetainedJob|UrbanRural|RevLineCr|LowDoc|DisbursementDate|DisbursementGross|BalanceGross|ChgOffPrinGr|  GrAppv|SBA_Appv|IsFranchise|NewBusiness|Default|DaysToDisbursement|DisbursementFY|StateSame|SBA_AppvPct|AppvDisbursed|RealEstate|GreatRecession|DisbursedGreaterAppv|Industry code|
+-------------+--------------------+--------------+--------------+--------------------+--------------+------+------------+----

In [68]:
# Load the industry type loan data
df_industry_type = spark.read.csv("/content/drive/MyDrive/Credit Analysis/industry_type.csv", header=True, inferSchema=True)

In [69]:
# Show the resulting DataFrame
df_industry_type.show()
df_industry_type.printSchema()

+-------------+--------------------+
|Industry_Code|            Industry|
+-------------+--------------------+
|           11|Agriculture/Fores...|
|           21|Mining/Quarrying/...|
|           22|           Utilities|
|           23|        Construction|
|           31|       Manufacturing|
|           32|       Manufacturing|
|           33|       Manufacturing|
|           42|     Wholesale Trade|
|           44|        Retail Trade|
|           45|        Retail Trade|
|           48|Transportation/Wa...|
|           49|Transportation/Wa...|
|           51|         Information|
|           52|Finance and Insur...|
|           53|Real Estate/Renta...|
|           54|Professional/Scie...|
|           55|Management of Com...|
|           56|Administrative/Su...|
|           61|Educational Services|
|           62|Healthcare and So...|
+-------------+--------------------+
only showing top 20 rows

root
 |-- Industry_Code: integer (nullable = true)
 |-- Industry: string (nullable = t

In [70]:
df_sql.createOrReplaceTempView("table1")
df_industry_type.createOrReplaceTempView("table2")

# Perform the SQL join operation
result = spark.sql("""
    SELECT *
    FROM table1
    JOIN table2
    ON table1.`Industry code` = table2.Industry_Code
""")

# Show the resulting DataFrame
result.show()

+-------------+--------------------+--------------+--------------+--------------------+--------------+------------+----------+----+-----+---------+-----------+----------+---------+------+----------------+-----------------+------------+------------+--------+--------+-----------+-----------+-------+------------------+--------------+---------+-----------+-------------+----------+--------------+--------------------+-------------+-------------+--------------------+
|LoanNr_ChkDgt|                Name|          City|         State|                Bank|     BankState|ApprovalDate|ApprovalFY|Term|NoEmp|CreateJob|RetainedJob|UrbanRural|RevLineCr|LowDoc|DisbursementDate|DisbursementGross|BalanceGross|ChgOffPrinGr|  GrAppv|SBA_Appv|IsFranchise|NewBusiness|Default|DaysToDisbursement|DisbursementFY|StateSame|SBA_AppvPct|AppvDisbursed|RealEstate|GreatRecession|DisbursedGreaterAppv|Industry code|Industry_Code|            Industry|
+-------------+--------------------+--------------+--------------+----

### **Find the total SBA-guaranteed amount approved for each bank.**

In [73]:
df_sql.createOrReplaceTempView("table1")
df_industry_type.createOrReplaceTempView("table2")
result = spark.sql("""
    SELECT Bank, SUM(SBA_Appv) AS TotalApprovedAmount
    FROM table1
    GROUP BY Bank
""")

# Show the resulting DataFrame
result.show()

+--------------------+-------------------+
|                Bank|TotalApprovedAmount|
+--------------------+-------------------+
|MANUFACTURERS & T...|       5.59067973E8|
|IOWA ST. BK & TR ...|           294500.0|
|BANK OF THE SOUTH...|          7007527.0|
|        VOYAGER BANK|        1.3516117E7|
|GREAT LAKES ASSET...|           1.7536E7|
|BRATTLEBORO S & L...|           959550.0|
|FIRST STATE BANK ...|          4407200.0|
|BK OF TUSCALOOSA ...|          1186750.0|
|FIRST COMMUN BK O...|          3768760.0|
|            NXT BANK|          1806385.0|
| NORTHERN BK & TR CO|          1295200.0|
|UNITED BANKERS' BANK|          1425000.0|
|LOUISIANA BUSINES...|          6741000.0|
|            ROYAL CU|           407750.0|
|        BANK OF HAYS|          5821413.0|
|  THE COMMUNITY BANK|          2008375.0|
|CARVER FEDERAL SA...|          6715000.0|
|WEST VIRGINIA CER...|           1.0471E7|
|MILWAUKEE ECONOMI...|           1.0632E7|
|     PROVIDENCE BANK|          2524200.0|
+----------

### **List the states with the highest and lowest average loan approval percentages.**

In [76]:
result = spark.sql("""
SELECT *
FROM (
    SELECT State, AVG(SBA_AppvPct) AS AvgApprovalPercentage
    FROM table1
    GROUP BY State
    ORDER BY AvgApprovalPercentage DESC LIMIT 1
) AS HighestAvg

UNION

SELECT *
FROM (
    SELECT State, AVG(SBA_AppvPct) AS AvgApprovalPercentage
    FROM table1
    GROUP BY State
    ORDER BY AvgApprovalPercentage ASC LIMIT 1
) AS LowestAvg

""")

# Show the resulting DataFrame
result.show()

+----------+---------------------+
|     State|AvgApprovalPercentage|
+----------+---------------------+
|   Alabama|   0.7988787586980274|
|New Jersey|   0.5791424841366196|
+----------+---------------------+
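The same highest/lowest selection can be sketched in plain Python, which may help when checking the query logic on a handful of rows. The records below are made-up values, not taken from the dataset, and `extreme_group_means` is a hypothetical helper name.

```python
from collections import defaultdict


def extreme_group_means(rows):
    """Return ((group, mean) with the highest mean, (group, mean) with the lowest)."""
    totals = defaultdict(lambda: [0.0, 0])  # group -> [running sum, count]
    for group, value in rows:
        totals[group][0] += value
        totals[group][1] += 1
    means = {group: total / count for group, (total, count) in totals.items()}
    highest = max(means.items(), key=lambda item: item[1])
    lowest = min(means.items(), key=lambda item: item[1])
    return highest, lowest


# Made-up (State, SBA_AppvPct) pairs for illustration
rows = [("A", 0.50), ("A", 1.00), ("B", 0.50), ("B", 0.50), ("C", 1.00), ("C", 1.00)]
highest, lowest = extreme_group_means(rows)
print(highest)  # ('C', 1.0)
print(lowest)   # ('B', 0.5)
```

Unlike the `UNION` result, which carries no guaranteed row order, the returned tuple makes explicit which entry is the maximum and which is the minimum.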



### **Find the top 10 industries with the highest average loan approval percentages.**

In [77]:
result = spark.sql("""
SELECT Industry, AVG(SBA_AppvPct) AS AvgApprovalPercentage
FROM table1
JOIN table2 ON table1.`Industry code` = table2.Industry_Code
GROUP BY Industry
ORDER BY AvgApprovalPercentage DESC LIMIT 10
""")

# Show the resulting DataFrame
result.show()

+--------------------+---------------------+
|            Industry|AvgApprovalPercentage|
+--------------------+---------------------+
|Agriculture/Fores...|   0.7714523362206839|
|Public Administra...|   0.7241498944901641|
|Mining/Quarrying/...|   0.7232823891476645|
|       Manufacturing|   0.6976164353976838|
|Healthcare and So...|   0.6963029612517334|
|Arts/Entertainmen...|   0.6861196412023837|
|Management of Com...|   0.6840707964601771|
|        Retail Trade|   0.6681262627850917|
|Other Services (e...|   0.6669841388480727|
|Accommodation/Foo...|   0.6666885910598301|
+--------------------+---------------------+



### **Identify the cities with the most loans for new businesses.**

In [78]:
result = spark.sql("""
SELECT City, COUNT(*) AS NewBusinessLoans
FROM table1
WHERE NewBusiness = 1
GROUP BY City
ORDER BY NewBusinessLoans DESC
""")

# Show the resulting DataFrame
result.show()

+--------------+----------------+
|          City|NewBusinessLoans|
+--------------+----------------+
|       CHICAGO|             977|
|      NEW YORK|             930|
|       HOUSTON|             920|
|   LOS ANGELES|             848|
|      BROOKLYN|             744|
|         MIAMI|             528|
|     LAS VEGAS|             498|
|SALT LAKE CITY|             464|
|   SPRINGFIELD|             415|
|        DALLAS|             403|
|  INDIANAPOLIS|             391|
|  PHILADELPHIA|             383|
|      COLUMBUS|             374|
|       ATLANTA|             361|
|     ROCHESTER|             359|
|        AUSTIN|             358|
|       SEATTLE|             358|
|     SAN DIEGO|             358|
|      PORTLAND|             354|
|   SAN ANTONIO|             340|
+--------------+----------------+
only showing top 20 rows



### **Calculate the average SBA-guaranteed loan amount and approval percentage for each industry.**

In [79]:
result = spark.sql("""
SELECT Industry, AVG(SBA_Appv) AS AvgLoanAmount, AVG(SBA_AppvPct) AS AvgApprovalPercentage
FROM table1
JOIN table2 ON table1.`Industry code` = table2.Industry_Code
GROUP BY Industry
""")

# Show the resulting DataFrame
result.show()

+--------------------+------------------+---------------------+
|            Industry|     AvgLoanAmount|AvgApprovalPercentage|
+--------------------+------------------+---------------------+
|Public Administra...| 63481.23888888889|   0.7241498944901641|
|Real Estate/Renta...| 79859.93679163035|    0.604887111290735|
|         Information| 90452.96610890527|   0.6273506215274965|
|Management of Com...|200507.21238938053|   0.6840707964601771|
|Transportation/Wa...| 68487.83116433615|   0.5883188332727214|
|Accommodation/Foo...| 160162.0923353034|   0.6666885910598301|
|        Construction| 79465.20900820474|   0.6175023934299787|
|Finance and Insur...| 60508.68158630903|    0.576864703186507|
|           Utilities|108458.05326876513|   0.6619810122431679|
|Administrative/Su...|55400.353851233434|   0.5894514205269253|
|Other Services (e...| 97903.76066884102|   0.6669841388480727|
|Mining/Quarrying/...|       191614.1544|   0.7232823891476645|
|     Wholesale Trade|154994.59240651043

### **Find the loan with the highest balance amount.**

In [80]:
result = spark.sql("""
SELECT *
FROM table1
WHERE BalanceGross = (SELECT MAX(BalanceGross) FROM table1)
""")

# Show the resulting DataFrame
result.show()

+-------------+--------------------+----------+-----+------------------+---------+------------+----------+----+-----+---------+-----------+----------+---------+------+----------------+-----------------+------------+------------+---------+--------+-----------+-----------+-------+------------------+--------------+---------+-----------+-------------+----------+--------------+--------------------+-------------+
|LoanNr_ChkDgt|                Name|      City|State|              Bank|BankState|ApprovalDate|ApprovalFY|Term|NoEmp|CreateJob|RetainedJob|UrbanRural|RevLineCr|LowDoc|DisbursementDate|DisbursementGross|BalanceGross|ChgOffPrinGr|   GrAppv|SBA_Appv|IsFranchise|NewBusiness|Default|DaysToDisbursement|DisbursementFY|StateSame|SBA_AppvPct|AppvDisbursed|RealEstate|GreatRecession|DisbursedGreaterAppv|Industry code|
+-------------+--------------------+----------+-----+------------------+---------+------------+----------+----+-----+---------+-----------+----------+---------+------+-----------

### **Determine the number of loans approved in each year.**

In [83]:
result = spark.sql("""
SELECT ApprovalFY, COUNT(*) AS NumLoansApproved
FROM table1
GROUP BY ApprovalFY
ORDER BY NumLoansApproved DESC
""")

# Show the resulting DataFrame
result.show()

+----------+----------------+
|ApprovalFY|NumLoansApproved|
+----------+----------------+
|      2007|           56999|
|      2006|           55706|
|      2005|           47455|
|      2004|           33076|
|      2008|           29464|
|      2003|           28228|
|      2002|           18945|
|      1995|           16342|
|      2009|           16230|
|      2001|           15651|
|      2010|           14743|
|      1994|           13780|
|      1996|           12786|
|      1997|           10620|
|      1993|           10322|
|      1992|            9433|
|      1999|            8760|
|      2000|            8588|
|      1998|            7907|
|      1991|            6955|
+----------+----------------+
only showing top 20 rows



### **List the banks that have approved loans for franchises and have the highest average loan amounts.**

In [84]:
result = spark.sql("""
SELECT Bank, AVG(SBA_Appv) AS AvgLoanAmount
FROM table1
WHERE IsFranchise = 1
GROUP BY Bank
ORDER BY AvgLoanAmount DESC
""")

# Show the resulting DataFrame
result.show()

+--------------------+------------------+
|                Bank|     AvgLoanAmount|
+--------------------+------------------+
| UNITED PACIFIC BANK|         4500000.0|
|HANA SMALL BUS. L...|2313498.6666666665|
|SAN FERNANDO VALL...|         1543000.0|
|HERITAGE BANK USA...|         1500000.0|
|FIRST NATIONAL BA...|         1500000.0|
|SEACOAST COMMERCE...|         1497600.0|
|     TRI-VALLEY BANK|         1424977.5|
|SOUTHEAST KENTUCK...|         1341000.0|
|           OPEN BANK|         1308750.0|
|MO-KAN DEVELOPMEN...|         1300000.0|
|CALIFORNIA COASTA...|         1298000.0|
|THREE RIVERS LOCA...|         1297000.0|
|     RIDGESTONE BANK|         1284975.0|
|    CONTINENTAL BANK|1283333.3333333333|
|      BLACKRIDGEBANK|         1259775.0|
|LOS ANGELES CNTY ...|         1229000.0|
|THE CORNER STONE ...|         1222500.0|
|TENNESSEE BUS. DE...|         1210000.0|
|1ST NATL BK OF CARMI|         1200000.0|
|GEORGIA CERT. DEV...|1172411.7647058824|
+--------------------+------------

### **Find the industry with the most loans for businesses in urban areas.**

In [85]:
result = spark.sql("""
SELECT Industry, COUNT(*) AS NumUrbanLoans
FROM table1
JOIN table2 ON table1.`Industry code` = table2.Industry_Code
WHERE UrbanRural = '1'
GROUP BY Industry
ORDER BY NumUrbanLoans DESC LIMIT 1
""")

# Show the resulting DataFrame
result.show()

+------------+-------------+
|    Industry|NumUrbanLoans|
+------------+-------------+
|Retail Trade|        45703|
+------------+-------------+



### **Calculate the total loan amount approved for each state in 1997 and 2006.**

In [86]:
result = spark.sql("""
SELECT State, ApprovalFY, SUM(SBA_Appv) AS TotalApprovedAmount
FROM table1
WHERE ApprovalFY IN (1997, 2006)
GROUP BY State, ApprovalFY
ORDER BY State, ApprovalFY
""")

# Show the resulting DataFrame
result.show()

+--------------------+----------+-------------------+
|               State|ApprovalFY|TotalApprovedAmount|
+--------------------+----------+-------------------+
|             Alabama|      1997|        2.1288803E7|
|             Alabama|      2006|        2.3561161E7|
|              Alaska|      1997|          5181588.0|
|              Alaska|      2006|          4675635.0|
|             Arizona|      1997|        1.4026428E7|
|             Arizona|      2006|        5.7089376E7|
|            Arkansas|      1997|        1.4555097E7|
|            Arkansas|      2006|        2.1272501E7|
|          California|      1997|       2.42874913E8|
|          California|      2006|       3.39108488E8|
|            Colorado|      1997|        3.1999033E7|
|            Colorado|      2006|        7.5072091E7|
|         Connecticut|      1997|        2.1229228E7|
|         Connecticut|      2006|        4.2544075E7|
|            Delaware|      1997|          2571465.0|
|            Delaware|      

### **Find the banks with the highest and lowest approval rates (SBA_AppvPct) for loans, considering only banks with a minimum of 100 loan approvals.**

In [87]:
result = spark.sql("""
WITH BankApprovalStats AS (
  SELECT
    Bank,
    COUNT(*) AS TotalApprovals,
    SUM(SBA_Appv) AS TotalApprovedAmount,
    AVG(SBA_AppvPct) AS AvgApprovalRate
  FROM
    table1
  GROUP BY
    Bank
  HAVING
    TotalApprovals >= 100
)
SELECT
  Bank,
  TotalApprovals,
  TotalApprovedAmount,
  AvgApprovalRate
FROM
  BankApprovalStats
WHERE
  AvgApprovalRate = (
    SELECT MAX(AvgApprovalRate) FROM BankApprovalStats
  )
  OR
  AvgApprovalRate = (
    SELECT MIN(AvgApprovalRate) FROM BankApprovalStats
  )
""")

# Show the resulting DataFrame
result.show()

+--------------------+--------------+-------------------+---------------+
|                Bank|TotalApprovals|TotalApprovedAmount|AvgApprovalRate|
+--------------------+--------------+-------------------+---------------+
|CERTIFIED DEVEL C...|           109|           5.0538E7|            1.0|
|COMMUNITY CAP. DE...|           258|           6.8732E7|            1.0|
|CAPITAL CERT. DEV...|           165|            8.456E7|            1.0|
|HOUSTON-GALVESTON...|           112|           5.1014E7|            1.0|
|CENTRAL MINNESOTA...|           126|           4.1292E7|            1.0|
|SBA - EDF ENFORCE...|           187|           8.1827E7|            1.0|
|DENVER URBAN ECON...|           129|           6.3739E7|            1.0|
|CAPITAL MATRIX, INC.|           282|           8.2514E7|            1.0|
|EMPIRE ST. CERT. ...|           601|       2.49059275E8|            1.0|
|CAPITAL ACCESS GR...|           191|          1.03457E8|            1.0|
|CEN CAL BUSINESS ...|           158| 
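
The many rows at 1.0 in this output follow from how the filter is written: comparing against `MAX(...)` and `MIN(...)` returns every bank tied at either extreme, not one bank per extreme. A minimal sketch of that behaviour, using Python's built-in `sqlite3` as a stand-in for Spark SQL, with invented bank names and values and the 100-loan threshold lowered to 2 so the toy data qualifies:

```python
import sqlite3

# Toy stand-in for table1; bank names and values are invented for illustration.
rows = [
    ("BANK A", 1.00), ("BANK A", 1.00),
    ("BANK B", 1.00), ("BANK B", 1.00),
    ("BANK C", 0.50), ("BANK C", 0.75),
    ("BANK D", 0.50), ("BANK D", 0.50),
    ("BANK E", 0.10),  # only one loan, so the HAVING threshold excludes it
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE table1 (Bank TEXT, SBA_AppvPct REAL)")
conn.executemany("INSERT INTO table1 VALUES (?, ?)", rows)

MIN_LOANS = 2  # the notebook uses 100; lowered here for the toy data

query = """
WITH BankApprovalStats AS (
  SELECT Bank, COUNT(*) AS TotalApprovals, AVG(SBA_AppvPct) AS AvgApprovalRate
  FROM table1
  GROUP BY Bank
  HAVING COUNT(*) >= ?
)
SELECT Bank, AvgApprovalRate
FROM BankApprovalStats
WHERE AvgApprovalRate = (SELECT MAX(AvgApprovalRate) FROM BankApprovalStats)
   OR AvgApprovalRate = (SELECT MIN(AvgApprovalRate) FROM BankApprovalStats)
ORDER BY AvgApprovalRate DESC, Bank
"""
extremes = conn.execute(query, (MIN_LOANS,)).fetchall()
print(extremes)
```

Both banks tied at the maximum come back alongside the single bank at the minimum, while BANK E never enters the comparison because it falls below the threshold. Adding an `ORDER BY AvgApprovalRate` to the notebook query would group the two extremes in the output.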

The query above calculates approval statistics for each bank: the total number of approvals, the total approved amount, and the average approval rate (SBA_AppvPct). The BankApprovalStats common table expression (CTE) filters out banks with fewer than 100 loan approvals.

The final result lists the banks with the highest and lowest average approval rates among those with at least 100 loan approvals. This can help identify banks whose average SBA_AppvPct is unusually high or low relative to their peers.

### **Find the top 5 states with the highest average loan approval percentages for new businesses in the "Construction" industry. Include only loans approved after 2000 and consider only banks with more than 500 approvals. Additionally, calculate the total number of loans and the total approved amount for each state in this category.**

In [89]:
result = spark.sql("""
WITH FilteredLoans AS (
  SELECT
    d1.State,
    d1.SBA_AppvPct,
    d1.SBA_Appv,
    d1.Bank,
    d2.Industry
  FROM
    table1 d1
  JOIN
    table2 d2
  ON
    d1.`Industry code` = d2.Industry_Code
  WHERE
    d1.ApprovalDate >= '2000-01-01'
    AND d1.NewBusiness = 1
    AND d2.Industry = 'Construction'
),
BankApprovalStats AS (
  SELECT
    Bank,
    COUNT(*) AS TotalApprovals,
    SUM(SBA_Appv) AS TotalApprovedAmount
  FROM
    FilteredLoans
  GROUP BY
    Bank
  HAVING
    TotalApprovals > 500
)
SELECT
  State,
  AVG(SBA_AppvPct) AS AvgApprovalRate,
  COUNT(*) AS NumLoans,
  SUM(SBA_Appv) AS TotalApprovedAmount
FROM
  FilteredLoans
WHERE
  Bank IN (SELECT Bank FROM BankApprovalStats)
GROUP BY
  State
ORDER BY
  AvgApprovalRate DESC
LIMIT
  5

""")

# Show the resulting DataFrame
result.show()

+------------+------------------+--------+-------------------+
|       State|   AvgApprovalRate|NumLoans|TotalApprovedAmount|
+------------+------------------+--------+-------------------+
|        Utah|0.5388888888888889|       9|           263300.0|
|Pennsylvania|0.5178571428571429|      14|           769900.0|
|    Colorado|0.5178571428571429|      28|           916800.0|
|        Ohio|0.5094339622641509|      53|          1780100.0|
|       Texas|0.5076021880166994|     171|          3096646.0|
+------------+------------------+--------+-------------------+



This query combines several SQL concepts: filtering, joining, subqueries, aggregation, and ordering. Here's a breakdown of what it does:

The FilteredLoans common table expression (CTE) joins table1 to table2 on the industry code and keeps only loans for new businesses in the "Construction" industry with an ApprovalDate on or after 2000-01-01.

The BankApprovalStats CTE counts the approvals and sums the approved amount for each bank, keeping only banks with more than 500 approvals. Because it reads from FilteredLoans, the threshold applies to each bank's new-business Construction loans in that period, not to its entire portfolio.

The main query calculates the average approval rate (SBA_AppvPct), the number of loans, and the total approved amount for each state, using only loans from banks that passed the threshold. It then sorts the states by average approval rate in descending order and keeps the top 5.

The result shows which states have the highest average approval rates for new Construction businesses under these filters.
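
The three stages can be traced on a handful of rows. The sketch below is illustrative only: it uses Python's built-in `sqlite3` instead of Spark SQL, a single hypothetical `loans` table with the industry name stored inline rather than joined from table2, invented rows, and a bank threshold of 2 instead of 500.

```python
import sqlite3

# Toy rows (invented for illustration). Columns mirror the ones the query uses.
rows = [
    # State, Bank, Industry, NewBusiness, ApprovalDate, SBA_AppvPct, SBA_Appv
    ("Texas", "BANK A", "Construction", 1, "2005-03-01", 0.50, 100.0),
    ("Texas", "BANK A", "Construction", 1, "2006-07-15", 0.75, 200.0),
    ("Ohio",  "BANK A", "Construction", 1, "2004-01-20", 0.50, 50.0),
    ("Ohio",  "BANK B", "Construction", 1, "2007-09-09", 0.90, 400.0),  # bank below threshold
    ("Utah",  "BANK A", "Retail Trade", 1, "2005-05-05", 0.80, 300.0),  # wrong industry
    ("Utah",  "BANK A", "Construction", 0, "2005-05-05", 0.80, 300.0),  # not a new business
    ("Utah",  "BANK A", "Construction", 1, "1998-05-05", 0.80, 300.0),  # approved before 2000
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE loans (State TEXT, Bank TEXT, Industry TEXT, NewBusiness INTEGER,"
    " ApprovalDate TEXT, SBA_AppvPct REAL, SBA_Appv REAL)"
)
conn.executemany("INSERT INTO loans VALUES (?, ?, ?, ?, ?, ?, ?)", rows)

MIN_APPROVALS = 2  # the notebook uses 500; lowered here for the toy data

query = """
WITH FilteredLoans AS (
  -- Stage 1: keep new-business Construction loans approved from 2000 onward
  SELECT State, Bank, SBA_AppvPct, SBA_Appv
  FROM loans
  WHERE ApprovalDate >= '2000-01-01' AND NewBusiness = 1 AND Industry = 'Construction'
),
BankApprovalStats AS (
  -- Stage 2: banks with enough loans *within the filtered set*
  SELECT Bank
  FROM FilteredLoans
  GROUP BY Bank
  HAVING COUNT(*) > ?
)
-- Stage 3: per-state aggregates over loans from the qualifying banks
SELECT State, AVG(SBA_AppvPct), COUNT(*), SUM(SBA_Appv)
FROM FilteredLoans
WHERE Bank IN (SELECT Bank FROM BankApprovalStats)
GROUP BY State
ORDER BY AVG(SBA_AppvPct) DESC
LIMIT 5
"""
top_states = conn.execute(query, (MIN_APPROVALS,)).fetchall()
print(top_states)
```

Utah drops out in stage 1, BANK B's Ohio loan drops out in stage 2, and the remaining Texas and Ohio loans are aggregated in stage 3. Small per-state counts like the ones in the notebook output (9 loans for Utah, 14 for Pennsylvania) are the expected result of stacking these filters.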