# import

In [1]:
import pyspark
import numpy as np
import pandas as pd 

# session

In [2]:
from pyspark.sql import SparkSession

# creating a session

In [3]:
# Build (or reuse, if one is already running) the Spark session named 'SBA'.
spark = (
    SparkSession.builder
    .appName('SBA')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/30 22:49:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/30 22:49:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# data

In [4]:
# inferSchema=True lets Spark detect column types; without it every column is read as a string.
df = (
    spark.read
    .option('header', 'true')
    .csv('SBAnational.csv', inferSchema=True)
)

df.printSchema()

[Stage 1:>                                                          (0 + 4) / 4]

root
 |-- LoanNr_ChkDgt: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: string (nullable = true)
 |-- Bank: string (nullable = true)
 |-- BankState: string (nullable = true)
 |-- NAICS: string (nullable = true)
 |-- ApprovalDate: string (nullable = true)
 |-- ApprovalFY: string (nullable = true)
 |-- Term: integer (nullable = true)
 |-- NoEmp: integer (nullable = true)
 |-- NewExist: integer (nullable = true)
 |-- CreateJob: integer (nullable = true)
 |-- RetainedJob: integer (nullable = true)
 |-- FranchiseCode: integer (nullable = true)
 |-- UrbanRural: integer (nullable = true)
 |-- RevLineCr: string (nullable = true)
 |-- LowDoc: string (nullable = true)
 |-- ChgOffDate: string (nullable = true)
 |-- DisbursementDate: string (nullable = true)
 |-- DisbursementGross: string (nullable = true)
 |-- BalanceGross: string (nullable = true)
 |-- MIS_Status: string (nullable = true)
 |-- ChgOffPr



In [5]:
# change col names
from pyspark.sql.functions import col

# Define the function to convert column names to snake case
def camel_to_snake(column_name):
    """Return ``column_name`` lower-cased with '_' inserted before each upper-case letter.

    The first character is only lower-cased; no underscore is placed in front
    of it. Every later upper-case character becomes '_' followed by its
    lower-case form, so a run of capitals is split letter by letter
    ('NAICS' -> 'n_a_i_c_s') and an underscore followed by a capital ends up
    doubled ('SBA_Appv' -> 's_b_a__appv'). All other characters are kept as is.
    """
    first, rest = column_name[:1], column_name[1:]
    converted_rest = ''.join(
        '_' + letter.lower() if letter.isupper() else letter
        for letter in rest
    )
    return first.lower() + converted_rest

# Rename every column to snake case in a single pass. toDF() assigns all the
# new names positionally at once, instead of stacking one withColumnRenamed()
# projection per column on the query plan.
df = df.toDF(*[camel_to_snake(column) for column in df.columns])


In [6]:
# Pull the Spark DataFrame into pandas (toPandas() collects every row onto the driver)
pandas_df = df.toPandas()

# Per-column count of missing values
null_counts = pandas_df.isna().sum()
print(null_counts)

23/04/30 22:49:22 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

loan_nr__chk_dgt           0
name                       3
city                      30
state                     14
zip                        0
bank                    1559
bank_state              1566
n_a_i_c_s                  0
approval_date              0
approval_f_y               0
term                       0
no_emp                     0
new_exist                136
create_job                 0
retained_job               0
franchise_code             0
urban_rural                0
rev_line_cr             4528
low_doc                 2582
chg_off_date          736402
disbursement_date       2431
disbursement_gross         0
balance_gross              0
m_i_s__status           1997
chg_off_prin_gr            0
gr_appv                    0
s_b_a__appv                0
dtype: int64


In [7]:
# Keep only the columns used for modeling; every other column is dropped.
df = df.select(
    'zip',
    'term',
    'no_emp',
    'new_exist',
    'create_job',
    'retained_job',
    'franchise_code',
    'urban_rural',
    'rev_line_cr',
    'low_doc',
    'disbursement_gross',
    'balance_gross',
    'chg_off_prin_gr',
    'gr_appv',
    'state',
    's_b_a__appv',
    'm_i_s__status',
)


In [8]:
# Print the schema of the DataFrame after the column selection
df.printSchema()

root
 |-- zip: string (nullable = true)
 |-- term: integer (nullable = true)
 |-- no_emp: integer (nullable = true)
 |-- new_exist: integer (nullable = true)
 |-- create_job: integer (nullable = true)
 |-- retained_job: integer (nullable = true)
 |-- franchise_code: integer (nullable = true)
 |-- urban_rural: integer (nullable = true)
 |-- rev_line_cr: string (nullable = true)
 |-- low_doc: string (nullable = true)
 |-- disbursement_gross: string (nullable = true)
 |-- balance_gross: string (nullable = true)
 |-- chg_off_prin_gr: string (nullable = true)
 |-- gr_appv: string (nullable = true)
 |-- state: string (nullable = true)
 |-- s_b_a__appv: string (nullable = true)
 |-- m_i_s__status: string (nullable = true)



# drop rows with nulls in selected columns

In [9]:
# Drop rows that have a null in any of the listed columns
df = df.na.drop(subset=[
    'new_exist',
    'rev_line_cr',
    'low_doc',
    'state',
    'm_i_s__status',
])

In [10]:
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def convert_price_string(s):
    """Convert a currency-formatted string (e.g. '$1,234.56 ') to a whole-number int.

    Every character that is not a digit or '.' is removed, the remainder is
    parsed as a float, and the fractional part is truncated.

    Returns None (which Spark stores as null) when ``s`` is None or when what
    is left after stripping cannot be parsed as a number, so a single missing
    or malformed value does not fail the whole Spark job.

    NOTE(review): '-' and parentheses are stripped too, so a negative amount
    would come back positive -- confirm the data contains none.
    """
    if s is None:
        return None
    cleaned = re.sub(r'[^\d\.]', '', s)
    try:
        return int(float(cleaned))
    except (ValueError, OverflowError):
        # Nothing numeric left (e.g. '' or '.') or a value too large to represent
        return None


# Wrap the parser as a Spark UDF that produces an integer column
udf_convert_price_string = udf(convert_price_string, IntegerType())

# Currency columns that were read as strings and must be converted to integers
price_columns = [
    'disbursement_gross',
    'balance_gross',
    'chg_off_prin_gr',
    'gr_appv',
    's_b_a__appv',
]

# Replace each currency column in place with its parsed integer value
for price_column in price_columns:
    df = df.withColumn(price_column, udf_convert_price_string(df[price_column]))


In [11]:
# Print the schema again to check the column types after the currency conversion
df.printSchema()

root
 |-- zip: string (nullable = true)
 |-- term: integer (nullable = true)
 |-- no_emp: integer (nullable = true)
 |-- new_exist: integer (nullable = true)
 |-- create_job: integer (nullable = true)
 |-- retained_job: integer (nullable = true)
 |-- franchise_code: integer (nullable = true)
 |-- urban_rural: integer (nullable = true)
 |-- rev_line_cr: string (nullable = true)
 |-- low_doc: string (nullable = true)
 |-- disbursement_gross: integer (nullable = true)
 |-- balance_gross: integer (nullable = true)
 |-- chg_off_prin_gr: integer (nullable = true)
 |-- gr_appv: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- s_b_a__appv: integer (nullable = true)
 |-- m_i_s__status: string (nullable = true)



In [None]:
pandas_df.to