In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=f3d221f3283c3f98eabd4c74a7f513ef46a811add3e1593fd911759a2a2482a8
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
# Mount Google Drive at /content/drive; every data, pickle and model path used
# below lives under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import DecisionTreeRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType, StringType, FloatType
from pyspark.sql.functions import col, when
from pyspark.sql.functions import struct
from pyspark.sql.functions import regexp_extract, regexp_replace
from pyspark.sql.types import NumericType
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import to_date, month, year
import pickle
import time

In [None]:
# Create (or reuse) the Spark session shared by every cell below.
# The memory, executor and GC settings are passed through unchanged as
# session-level configuration.
spark = (
    SparkSession.builder
    .appName("Bank Project")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")
    .config("spark.executor.instances", "4")
    .config("spark.executor.cores", "2")
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35")
    .getOrCreate()
)

In [None]:
# Read the test data as CSV: the first line is the header and column types are
# inferred by Spark.
test = spark.read.csv("/content/drive/MyDrive/Bank Project/test", header=True, inferSchema=True)

In [None]:
# Load the pickled collection of columns that the next cell passes to select().
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only load artifacts produced by this project.
with open('/content/drive/MyDrive/Bank Project/columns_.pkl', 'rb') as file:
    final_cols = pickle.load(file)

In [None]:
# Keep only the columns named in final_cols (in that order).
test = test.select(*final_cols)

In [None]:
def drop_rows(data):
    """Keep only the rows whose share of null columns is at most 80%.

    A temporary ``null_percentage`` column is computed with a Python UDF over
    a struct of all columns, used for filtering, and removed again before the
    DataFrame is returned.
    """
    def _pct_missing(record):
        # `record` is the struct of every column of one row.
        missing = sum(1 for field in record if field is None)
        return (missing / len(record)) * 100

    pct_missing_udf = udf(_pct_missing, FloatType())

    with_pct = data.withColumn("null_percentage", pct_missing_udf(struct(*data.columns)))
    return with_pct.filter(col("null_percentage") <= 80).drop("null_percentage")

In [None]:
def process_term(data):
    """Replace ``term`` with the first run of digits it contains.

    The result is still a string column; no numeric cast happens here.
    """
    first_digits = regexp_extract(col('term'), r'(\d+)', 1)
    return data.withColumn('term', first_digits)

In [None]:
def process_int_rate(data):
    """Remove every '%' character from ``int_rate`` (no numeric cast is done here)."""
    without_percent = regexp_replace(col('int_rate'), '%', '')
    return data.withColumn('int_rate', without_percent)

In [None]:
def process_revol_util(data):
    """Remove every '%' character from ``revol_util`` (no numeric cast is done here).

    Fix: the previous version read from ``int_rate`` and therefore overwrote
    ``revol_util`` with a copy of the interest rate. The source column is now
    ``revol_util`` itself.

    NOTE(review): if the training pipeline contained the same copy-paste
    error, models trained on it saw ``int_rate`` in this column — confirm
    and retrain if necessary.
    """
    data = data.withColumn('revol_util', regexp_replace(col('revol_util'), '%', ''))
    return data

In [None]:
def process_emp_length(data):
    """Turn ``emp_length`` into an integer column.

    '+' characters are stripped, the first run of digits is extracted, and
    the result is cast to ``int``.
    """
    years = regexp_extract(
        regexp_replace(col('emp_length'), r'\+', ''),
        r'(\d+)',
        1,
    ).cast('int')
    return data.withColumn('emp_length', years)

In [None]:
def process_grade(data):
    """Encode ``sub_grade`` ('A1' ... 'G5') as an integer 0-34.

    Values outside that set (including null) become -1.
    """
    # Five sub-grades per letter: A1 -> 0, A2 -> 1, ..., B1 -> 5, ..., G5 -> 34.
    rank_by_sub_grade = {
        f'{letter}{step}': letter_pos * 5 + (step - 1)
        for letter_pos, letter in enumerate('ABCDEFG')
        for step in range(1, 6)
    }

    def _rank(sub_grade):
        return rank_by_sub_grade.get(sub_grade, -1)

    rank_udf = udf(_rank, IntegerType())
    return data.withColumn('sub_grade', rank_udf(col('sub_grade')))

In [None]:
def process_emp_length(data):
    """Turn ``emp_length`` into an integer column.

    Steps, applied in order to the same column: strip '+' characters, keep the
    first run of digits, cast to ``int``.

    NOTE(review): this cell re-defines ``process_emp_length`` with the same
    logic as an earlier cell; being later in the notebook, this is the
    definition in effect. One of the two cells can be deleted.
    """
    return (
        data
        .withColumn('emp_length', regexp_replace(col('emp_length'), r'\+', ''))
        .withColumn('emp_length', regexp_extract(col('emp_length'), r'(\d+)', 1))
        .withColumn('emp_length', col('emp_length').cast('int'))
    )

In [None]:
def process_emp_title(data):
    """Bucket ``emp_title`` into a profession category and one-hot encode it.

    The free-text title is lower-cased and matched by substring against the
    keyword lists below; the first category with a matching keyword wins.
    Empty/null titles and titles with no match become 'Unknown'. The category
    is then indexed and one-hot encoded into ``emp_title_encoded``; the
    intermediate ``emp_title_index`` and the original ``emp_title`` columns
    are dropped.

    NOTE(review): the StringIndexer and OneHotEncoder are fit on ``data``
    itself, so the encoding depends on the data passed in — confirm this
    matches how the training features were encoded.
    """
    # Order matters: rules are evaluated top to bottom.
    category_rules = (
        ('Management and Executive Roles',
         ['manager', 'director', 'vp', 'executive', 'head', 'chief', 'president', 'supervisor', 'coordinator']),
        ('Administrative and Support Roles',
         ['assistant', 'coordinator', 'clerk', 'office', 'receptionist', 'secretary', 'administrator', 'data entry', 'support', 'specialist', 'scheduler']),
        ('Technical and Engineering Roles',
         ['engineer', 'technician', 'developer', 'it', 'analyst', 'architect', 'programmer', 'consultant', 'tech', 'network', 'systems', 'software', 'hardware']),
        ('Healthcare and Social Services Roles',
         ['nurse', 'therapist', 'counselor', 'healthcare', 'clinician', 'practitioner', 'physician', 'medical', 'pharmacist', 'social worker', 'aide', 'caregiver']),
        ('Sales and Customer Service Roles',
         ['sales', 'customer service', 'representative', 'associate', 'agent', 'account manager', 'client service', 'business development', 'account executive']),
        ('Education and Training Roles',
         ['teacher', 'instructor', 'professor', 'lecturer', 'tutor', 'educator', 'trainer', 'academic advisor']),
        ('Skilled Trades and Labor',
         ['technician', 'mechanic', 'electrician', 'plumber', 'carpenter', 'welder', 'machinist', 'laborer', 'foreman', 'operator']),
        ('Creative and Design Roles',
         ['designer', 'artist', 'graphic designer', 'creative director', 'art director', 'web designer', 'illustrator', 'photographer', 'stylist']),
        ('Finance and Accounting Roles',
         ['accountant', 'auditor', 'financial analyst', 'controller', 'bookkeeper', 'tax preparer', 'finance manager', 'investment analyst']),
        ('Legal and Compliance Roles',
         ['attorney', 'lawyer', 'paralegal', 'legal assistant', 'compliance officer', 'legal advisor', 'legal secretary']),
    )

    def _categorize(title):
        if not title:
            return 'Unknown'
        lowered = title.lower()
        for label, keywords in category_rules:
            if any(keyword in lowered for keyword in keywords):
                return label
        return 'Unknown'

    categorize_udf = udf(_categorize, StringType())
    categorized = data.withColumn('emp_title', categorize_udf(col('emp_title')))

    indexer = StringIndexer(inputCol='emp_title', outputCol='emp_title_index')
    indexed = indexer.fit(categorized).transform(categorized)

    encoder = OneHotEncoder(inputCols=['emp_title_index'], outputCols=['emp_title_encoded'])
    encoded = encoder.fit(indexed).transform(indexed)

    return encoded.drop('emp_title_index').drop('emp_title')

In [None]:
def process_home_ownership(data):
    """Integer-code ``home_ownership`` and one-hot encode it.

    RENT -> 0, MORTGAGE -> 1, OWN -> 2, anything else (including null) -> 3.
    The coded column is encoded into ``home_ownership_encoded`` with the
    encoder's default ``dropLast`` setting, then dropped.

    NOTE(review): the encoder is fit on ``data`` itself — confirm the
    resulting vector width matches the one used in training.
    """
    ownership_codes = {'RENT': 0, 'MORTGAGE': 1, 'OWN': 2}
    to_code = udf(lambda value: ownership_codes.get(value, 3), IntegerType())

    coded = data.withColumn('home_ownership', to_code(col('home_ownership')))

    encoder = OneHotEncoder(inputCols=['home_ownership'], outputCols=['home_ownership_encoded'])
    return encoder.fit(coded).transform(coded).drop('home_ownership')

In [None]:
def process_verification_status(data):
    """Integer-code ``verification_status`` and one-hot encode it.

    Verified -> 0, Source Verified -> 1, Not Verified -> 2, anything else
    (including null) -> 3. The coded column is encoded into
    ``verification_status_encoded`` with ``dropLast=False`` and then dropped.
    """
    status_codes = {'Verified': 0, 'Source Verified': 1, 'Not Verified': 2}
    to_code = udf(lambda value: status_codes.get(value, 3), IntegerType())

    coded = data.withColumn('verification_status', to_code(col('verification_status')))

    encoder = OneHotEncoder(
        inputCols=['verification_status'],
        outputCols=['verification_status_encoded'],
        dropLast=False,
    )
    return encoder.fit(coded).transform(coded).drop('verification_status')

In [None]:
def process_loan_status(data):
    """Integer-code ``loan_status`` and one-hot encode it.

    The six known statuses get codes 0-5 in the order listed below; anything
    else (including null) gets 6. The coded column is encoded into
    ``loan_status_encoded`` with the encoder's default ``dropLast`` setting,
    then dropped.
    """
    known_statuses = (
        'Fully Paid',
        'Charged Off',
        'Current',
        'In Grace Period',
        'Late (16-30 days)',
        'Late (31-120 days)',
    )
    status_codes = {status: code for code, status in enumerate(known_statuses)}
    to_code = udf(lambda value: status_codes.get(value, 6), IntegerType())

    coded = data.withColumn('loan_status', to_code(col('loan_status')))

    encoder = OneHotEncoder(inputCols=['loan_status'], outputCols=['loan_status_encoded'])
    return encoder.fit(coded).transform(coded).drop('loan_status')

In [None]:
def process_purpose(data):
    """Integer-code ``purpose`` and one-hot encode it.

    The fourteen known purposes get codes 0-13 in the order listed below;
    anything else (including null) gets 14. The coded column is encoded into
    ``purpose_encoded`` with ``dropLast=False`` and then dropped.
    """
    known_purposes = (
        'wedding',
        'educational',
        'other',
        'small_business',
        'debt_consolidation',
        'credit_card',
        'moving',
        'vacation',
        'renewable_energy',
        'house',
        'car',
        'major_purchase',
        'medical',
        'home_improvement',
    )
    purpose_codes = {purpose: code for code, purpose in enumerate(known_purposes)}
    to_code = udf(lambda value: purpose_codes.get(value, 14), IntegerType())

    coded = data.withColumn('purpose', to_code(col('purpose')))

    encoder = OneHotEncoder(inputCols=['purpose'], outputCols=['purpose_encoded'], dropLast=False)
    return encoder.fit(coded).transform(coded).drop('purpose')

In [None]:
def process_addr_state(data):
    """Replace ``addr_state`` with an integer code.

    The fifty state abbreviations listed below get codes 0-49 by position;
    any other value (including null) gets 50. No one-hot encoding is applied.
    """
    known_states = (
        'AK', 'AL', 'AR', 'AZ', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA',
        'HI', 'IA', 'ID', 'IL', 'IN', 'KS', 'KY', 'LA', 'MA', 'MD',
        'ME', 'MI', 'MN', 'MO', 'MS', 'MT', 'NC', 'ND', 'NE', 'NH',
        'NJ', 'NM', 'NV', 'NY', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC',
        'SD', 'TN', 'TX', 'UT', 'VA', 'VT', 'WA', 'WI', 'WV', 'WY',
    )
    state_codes = {state: code for code, state in enumerate(known_states)}
    to_code = udf(lambda value: state_codes.get(value, 50), IntegerType())

    return data.withColumn('addr_state', to_code(col('addr_state')))

In [None]:
def process_initial_list_status(data):
    """Integer-code ``initial_list_status`` and one-hot encode it.

    'f' -> 0, 'w' -> 1, anything else (including null) -> 2. The coded column
    is encoded into ``initial_list_status_encoded`` with ``dropLast=False``
    and then dropped.
    """
    list_status_codes = {'f': 0, 'w': 1}
    to_code = udf(lambda value: list_status_codes.get(value, 2), IntegerType())

    coded = data.withColumn('initial_list_status', to_code(col('initial_list_status')))

    encoder = OneHotEncoder(
        inputCols=['initial_list_status'],
        outputCols=['initial_list_status_encoded'],
        dropLast=False,
    )
    return encoder.fit(coded).transform(coded).drop('initial_list_status')

In [None]:
def from_time_to_numeric(data):
    """Split each 'MMM-yyyy' date column into numeric month and year columns.

    For every listed column ``c`` the string is parsed with the pattern
    ``MMM-yyyy``; ``month_c`` and ``year_c`` are added and ``c`` itself is
    dropped. A scratch column named ``date`` is used during parsing and
    removed again.
    """
    date_columns = ('earliest_cr_line', 'issue_d', 'last_pymnt_d', 'next_pymnt_d', 'last_credit_pull_d')
    for name in date_columns:
        data = (
            data
            .withColumn("date", to_date(col(name), "MMM-yyyy"))
            .withColumn(f"month_{name}", month(col("date")))
            .withColumn(f"year_{name}", year(col("date")))
            .drop('date', name)
        )
    return data

In [None]:
def process_delinq_2yrs(data):
    """Bucket ``delinq_2yrs`` into the integer classes 0-4.

    0, 1 and 2 are kept as-is; 3 and 4 both map to 3; every other value
    (including null) maps to 4.
    """
    buckets = {0: 0, 1: 1, 2: 2, 3: 3, 4: 3}
    to_bucket = udf(lambda count: buckets.get(count, 4), IntegerType())

    return data.withColumn('delinq_2yrs', to_bucket(col('delinq_2yrs')))

In [None]:
def process_acc_now_delinq(data):
    """Binarise ``acc_now_delinq``: 0 stays 0, every other value (including null) becomes 1."""
    def _flag(count):
        return 0 if count == 0 else 1

    flag_udf = udf(_flag, IntegerType())
    return data.withColumn('acc_now_delinq', flag_udf(col('acc_now_delinq')))

In [None]:
def cast_int(data):
    """Cast every string-typed column of ``data`` to ``integer``; other columns are untouched."""
    string_columns = [name for name, type_name in data.dtypes if type_name == 'string']
    for name in string_columns:
        data = data.withColumn(name, col(name).cast('integer'))
    return data

In [None]:
def replace_nulls_with_medians(data):
    """Fill nulls in a positional subset of columns with stored medians.

    The affected columns are chosen by position: 0-6, 8-12, 14-27 and the
    last ten columns of ``data``. For each of them the median is looked up by
    name in the ``train_parameters`` CSV (columns ``column_name`` and
    ``median``) and substituted wherever the column is null.

    Changes from the previous version: the unused ``medians`` local is gone,
    and the medians are collected from Spark once instead of running one
    filter+collect job per column.

    Uses the notebook-level ``spark`` session.

    Raises:
        KeyError: if a selected column has no row in ``train_parameters``
            (previously an uninformative IndexError).
    """
    columns = data.columns
    # NOTE(review): positional slices silently select different columns if the
    # column order of `data` changes — confirm against the training notebook.
    columnset = columns[:7] + columns[8:13] + columns[14:28] + columns[-10:]

    train_parameters = spark.read.csv('/content/drive/MyDrive/Bank Project/train_parameters', header=True, inferSchema=True)

    # One pass over the (small) parameter table; the first row per column name
    # wins, matching the previous `collect()[0]` lookup.
    median_by_column = {}
    for row in train_parameters.select('column_name', 'median').collect():
        median_by_column.setdefault(row['column_name'], row['median'])

    for col_name in columnset:
        if col_name not in median_by_column:
            raise KeyError(f"No median found in train_parameters for column '{col_name}'")
        median_value = median_by_column[col_name]

        data = data.withColumn(col_name, when(col(col_name).isNull(), median_value).otherwise(col(col_name)))

    return data

In [None]:
def fill_missing_values(spark_df):
    """Impute missing values with stored medians and pre-trained tree models.

    Steps:
      1. Load three pickled column lists (median-imputed columns,
         model-imputed columns, model feature columns).
      2. Keep only the feature columns, the three delinquency columns and the
         model-imputed columns.
      3. Fill nulls in the median-imputed columns from ``train_parameters``.
      4. For every model-imputed column, load ``<column>_dt_model``, predict
         from the feature columns, and use the row's own prediction wherever
         the column is null.

    Fixes over the previous version:
      * Model imputation was wrong: predictions were collected into a
        ``{column value: prediction}`` dict and null rows looked up the key
        ``None``, so every null row received the same arbitrary prediction
        (and the whole frame was pulled to the driver). Each row now gets its
        own prediction, computed entirely inside Spark.
      * The trailing ``unpersist()`` was called on a derived DataFrame, not
        the cached one, and so released nothing; it has been removed.
      * The list comprehension no longer shadows ``pyspark.sql.functions.col``.
      * The VectorAssembler is built once, and both median columns are
        collected in a single job.

    Uses the notebook-level ``spark`` session. Returns the imputed DataFrame.
    """
    # NOTE(review): pickle.load can execute arbitrary code embedded in the
    # file; only load artifacts produced by this project.
    with open('/content/drive/MyDrive/Bank Project/median_replacement.pkl', 'rb') as file:
        median_replacement_columns = pickle.load(file)

    with open('/content/drive/MyDrive/Bank Project/model_replacement.pkl', 'rb') as file:
        model_replacement_columns = pickle.load(file)

    with open('/content/drive/MyDrive/Bank Project/train_colls.pkl', 'rb') as file:
        train_colls = pickle.load(file)

    relevant_columns = set(train_colls + ['acc_now_delinq', 'delinq_2yrs', 'delinq_amnt'] + model_replacement_columns)
    spark_df = spark_df.select([name for name in spark_df.columns if name in relevant_columns])

    # The selected input stays cached for downstream actions. Everything below
    # is lazy, so unpersisting before returning would discard the cache before
    # anything had read from it.
    spark_df = spark_df.cache()

    median_parameters = spark.read.csv('/content/drive/MyDrive/Bank Project/train_parameters', header=True, inferSchema=True)
    median_dict = {
        row['column_name']: row['median']
        for row in median_parameters.select('column_name', 'median').collect()
    }

    for column in median_replacement_columns:
        if column in median_dict:
            spark_df = spark_df.fillna(median_dict[column], subset=[column])

    # The feature layout is the same for every model, so build the assembler once.
    assembler = VectorAssembler(inputCols=train_colls, outputCol='features')

    for column in model_replacement_columns:
        model_path = f'/content/drive/MyDrive/Bank Project/{column}_dt_model'
        model = DecisionTreeRegressionModel.load(model_path)

        output_columns = spark_df.columns
        predictions = model.transform(assembler.transform(spark_df))

        # Use the row's own prediction where the value is missing, then drop
        # the helper columns by re-selecting the original column list.
        spark_df = predictions.withColumn(
            column,
            F.when(F.col(column).isNull(), F.col('prediction')).otherwise(F.col(column)),
        ).select(*output_columns)

    return spark_df

In [None]:
def scale_data(spark, data, min_max_path):
    """Min-max scale the columns of ``data`` using stored min/max values.

    Args:
        spark: SparkSession used to read the min/max table.
        data: DataFrame to scale.
        min_max_path: CSV (with header) whose first column holds column names
            and which has ``min`` and ``max`` columns.

    Returns:
        The DataFrame with the column at position 10 moved to the end and
        every column that has a min/max entry replaced by
        ``(value - min) / (max - min)``. ``acc_now_delinq`` and
        ``delinq_2yrs`` are left unscaled. A column whose min equals its max
        is filled with 0, and a column without an entry is left as-is; both
        cases print a message.

    Change from the previous version: the min/max table is collected once
    instead of running two Spark jobs (filter + first) per column.
    """
    min_max_spark_df = spark.read.csv(min_max_path, header=True, inferSchema=True)

    column_name_col = min_max_spark_df.columns[0]

    # First row per column name wins, matching the previous `.first()` lookup.
    bounds = {}
    for row in min_max_spark_df.select(column_name_col, 'min', 'max').collect():
        bounds.setdefault(row[0], (row[1], row[2]))

    # Move the column at index 10 to the last position.
    # NOTE(review): positional reordering depends on the incoming column order — confirm.
    cols_ = data.columns
    new_cols = cols_[:10] + cols_[11:] + [cols_[10]]
    data = data.select(new_cols)

    for column in data.columns:
        if column in ('acc_now_delinq', 'delinq_2yrs'):
            continue

        if column not in bounds:
            print(f"Min or Max value for column {column} not found in the min_max_df.")
            continue

        min_val, max_val = bounds[column]
        if min_val == max_val:
            print(f"Column {column} has min_val == max_val == {min_val}. Filling this column with zeros.")
            data = data.withColumn(column, F.lit(0))
        else:
            data = data.withColumn(column, (F.col(column) - min_val) / (max_val - min_val))

    return data

In [None]:
def round_data(data):
    """Round every numeric column of ``data`` to 4 decimal places; other columns are untouched."""
    numeric_columns = [
        field.name
        for field in data.schema.fields
        if isinstance(field.dataType, NumericType)
    ]
    for name in numeric_columns:
        data = data.withColumn(name, F.round(F.col(name), 4))
    return data

In [None]:
def handle_outliers(data):
    """Replace values at or beyond the stored whiskers with the scaled median.

    The last column of ``data`` selects which pickled whisker file is used
    (``delinq_amnt``, ``delinq_2yrs`` or ``acc_now_delinq``). For every
    non-vector column, values ``<=`` the lower whisker or ``>=`` the upper
    whisker are replaced by ``(median - min) / (max - min)`` computed from
    the ``train_parameters`` CSV; nulls stay null.

    Changes from the previous version:
      * An unexpected last column now raises a clear ValueError instead of
        failing later with an unbound ``whisker_path``.
      * Median/min/max are collected once instead of three Spark jobs per
        column.
      * ``max == min`` no longer raises ZeroDivisionError; the scaled median
        is taken as 0.0, in line with ``scale_data`` filling such columns
        with zeros.
      * The dead ``outliers`` flag assignments are removed.

    Uses the notebook-level ``spark`` session.

    Raises:
        ValueError: last column is not one of the three supported targets, or
            (from ``list.index``) a column is missing from the whisker file.
        KeyError: a column has no row in ``train_parameters``.
    """
    whisker_paths = {
        'delinq_amnt': '/content/drive/MyDrive/Bank Project/whiskers_delinq_amnt.pkl',
        'delinq_2yrs': '/content/drive/MyDrive/Bank Project/whiskers_delinq_2yrs.pkl',
        'acc_now_delinq': '/content/drive/MyDrive/Bank Project/whiskers_acc_now_delinq.pkl',
    }
    target = data.columns[-1]
    if target not in whisker_paths:
        raise ValueError(
            f"Last column '{target}' is not a supported target; expected one of {sorted(whisker_paths)}"
        )

    # NOTE(review): pickle.load can execute arbitrary code embedded in the
    # file; only load artifacts produced by this project.
    with open(whisker_paths[target], 'rb') as file:
        whiskers = pickle.load(file)

    train_parameters = spark.read.csv('/content/drive/MyDrive/Bank Project/train_parameters', header=True, inferSchema=True)

    # First row per column name wins, matching the previous `collect()[0]` lookup.
    stats = {}
    for row in train_parameters.select('column_name', 'median', 'min', 'max').collect():
        stats.setdefault(row['column_name'], (row['median'], row['min'], row['max']))

    for field in data.schema.fields:
        column = field.name
        # Vector columns (e.g. one-hot encodings) carry no scalar outliers.
        if isinstance(field.dataType, VectorUDT):
            continue

        if column not in stats:
            raise KeyError(f"No median/min/max found in train_parameters for column '{column}'")
        median_value, min_value, max_value = stats[column]

        value_range = max_value - min_value
        scaled_median = (median_value - min_value) / value_range if value_range != 0 else 0.0

        column_index = whiskers['column'].index(column)
        lower_whisker = whiskers['lower_whisker'][column_index]
        upper_whisker = whiskers['upper_whisker'][column_index]

        data = data.withColumn(
            column,
            when(col(column).isNull(), None)
            .when(col(column) <= lower_whisker, scaled_median)
            .when(col(column) >= upper_whisker, scaled_median)
            .otherwise(col(column))
        )
    return data

In [None]:
# Row filtering and per-column cleaning, applied in order. Each step returns a
# new DataFrame that is rebound to `test`, so re-running this cell applies the
# steps again on already-processed data; restart from the load cell instead.
test = drop_rows(test)
test = process_term(test)
test = process_int_rate(test)
test = process_revol_util(test)
test = process_emp_length(test)
test = process_grade(test)
test = process_emp_title(test)
test = process_home_ownership(test)

In [None]:
# Integer-code and one-hot encode verification_status.
test = process_verification_status(test)

In [None]:
# Remaining categorical encodings, date splitting and delinquency bucketing.
# cast_int runs last so that any column still typed as string is cast to integer.
test = process_loan_status(test)
test = process_purpose(test)
test = process_addr_state(test)
test = process_initial_list_status(test)
test = from_time_to_numeric(test)
test = process_delinq_2yrs(test)
test = process_acc_now_delinq(test)
test = cast_int(test)

In [None]:
# Imputation, scaling and rounding.
test = replace_nulls_with_medians(test)
filled_missing_values = fill_missing_values(test)
# scale_data is defined as scale_data(spark, data, min_max_path); the previous
# one-argument call raised TypeError.
# NOTE(review): the min/max source is assumed to be the train_parameters file,
# which handle_outliers reads for `min`/`max` — confirm the path, and that the
# file's first column holds the column names, as scale_data expects.
scaled_data = scale_data(spark, filled_missing_values, '/content/drive/MyDrive/Bank Project/train_parameters')
rounded_data = round_data(scaled_data)

In [None]:
# Build one frame per target: the shared columns (positions 0-48 and 52
# onward) followed by exactly one of the columns at positions 49, 50 and 51 as
# the last column.
# NOTE(review): presumably positions 49/50/51 are acc_now_delinq / delinq_2yrs /
# delinq_amnt (handle_outliers keys off the last column's name) — confirm, since
# the indices are positional.
main_delinq_amnt = rounded_data.select(*rounded_data.columns[:49] + rounded_data.columns[52:] + [rounded_data.columns[51]])
main_acc_now_delinq = rounded_data.select(*rounded_data.columns[:49] + rounded_data.columns[52:] + [rounded_data.columns[49]])
main_delinq_2yrs = rounded_data.select(*rounded_data.columns[:49] + rounded_data.columns[52:] + [rounded_data.columns[50]])

In [None]:
# Apply outlier replacement to each target-specific frame; handle_outliers
# picks its whisker file from the name of the frame's last column.
final_delinq_amnt = handle_outliers(main_delinq_amnt)
final_acc_now_delinq = handle_outliers(main_acc_now_delinq)
final_delinq_2yrs = handle_outliers(main_delinq_2yrs)

In [None]:
# Persist the three processed frames as Parquet, overwriting any previous output.
final_delinq_amnt.write.parquet('/content/drive/MyDrive/Bank Project/final_delinq_amnt_test', mode='overwrite')
final_acc_now_delinq.write.parquet('/content/drive/MyDrive/Bank Project/final_acc_now_delinq_test', mode='overwrite')
final_delinq_2yrs.write.parquet('/content/drive/MyDrive/Bank Project/final_delinq_2yrs_test', mode='overwrite')