In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark=SparkSession.builder\
                  .appName('samyurta')\
                   .getOrCreate()

In [9]:
df = spark.read.parquet('hdfs://localhost:9000/ingest/ml_dataset/gsa_lam_demo_int_table')

# Data used

In [10]:
df.show()

+---------+----------------+-----------+---------+-----+----------------+---------------+---------------+--------+------+---------+---------+--------------+--------------------+--------------------+-------------+----------------+-------------+--------+--------------+--------------------+-----------------+----------+----------------+---------+---------------+----------------+------+----------------+--------------------+---------+----------+---------+--------------+-------+---------+-------------------------+---------------------------+------------+--------------------+--------------------+----------+----------+------+--------------------+---------+----------+-------------+-------------+--------------+-----------------+-------------+--------------------+--------------------+------------------------+--------------------+-------------------+---------------+----------------------------+------------------+--------------------+--------------------+--------------------+--------------------+---

# EDA

## Columns

In [11]:
df.columns

['acid',
 'entity_cre_flg',
 'acct_prefix',
 'acct_num',
 'bacid',
 'foracid',
 'acct_name',
 'acct_short_name',
 'cust_id',
 'emp_id',
 'cif_id',
 'schm_code',
 'schm_desc',
 'acct_opn_date',
 'acct_cls_date',
 'clr_bal_amt',
 'acct_cls_flg',
 'tot_mod_times',
 'ledg_num',
 'un_clr_bal_amt',
 'inter_sol_access_flg',
 'purge_allowed_flg',
 'purge_text',
 'acct_mgr_user_id',
 'schm_type',
 'partitioned_flg',
 'partitioned_type',
 'sol_id',
 'del_flg',
 'sol_desc',
 'city_code',
 'state_code',
 'bank_code',
 'extn_cntr_code',
 'br_code',
 'accountid',
 'preferred_mobile_alert_no',
 'preferred_mobile_alert_type',
 'submitforkyc',
 'kyc_reviewdate',
 'kyc_date',
 'riskrating',
 'salutation',
 'gender',
 'cust_dob',
 'education',
 'occupation',
 'cust_language',
 'demographicid',
 'marital_status',
 'employment_status',
 'income_nature',
 'annual_salary_income',
 'annual_rental_income',
 'annual_stock_bond_income',
 'annual_others_income',
 'annual_total_income',
 'salotherincome2',
 'selfe

## Count Category of sol_desc

In [6]:
from pyspark.sql.functions import desc, asc

data=df.groupBy('sol_desc').count()
data1=data.orderBy(desc('count'))
data1.show()

[Stage 4:>                                                        (0 + 40) / 40]

+--------------------+-----+
|            sol_desc|count|
+--------------------+-----+
|HEAD OFF, BABARMAHAL|26272|
|MAIN BRN, BABARMAHAL|13616|
|     KAMALADI BRANCH|11344|
|  PUTALISADAK BRANCH| 5544|
|        TANDI BRANCH| 3920|
|      HETAUDA BRANCH| 3320|
|      ITAHARI BRANCH| 3312|
|      BIRGUNJ BRANCH| 3240|
|      GONGABU BRANCH| 3040|
|  MAHADEVBESI BRANCH| 2968|
|     DHANGADI BRANCH| 2952|
|   HATTIGAUDA BRANCH| 2952|
|    MANBHAWAN BRANCH| 2808|
|      SURKHET BRANCH| 2768|
|      POKHARA BRANCH| 2704|
|       DHARAN BRANCH| 2624|
|KHICHAPOKHARI BRANCH| 2560|
|   SHANKHAMUL BRANCH| 2328|
|   BIRATNAGAR BRANCH| 2320|
|    BIRTAMODE BRANCH| 2304|
+--------------------+-----+
only showing top 20 rows



                                                                                

## 1.Feature engineering 
### Filter the DataFrame to select only the rows where branch name is "HEAD OFF, BABARMAHAL"

In [7]:
# Filter the DataFrame to select only the rows where branch name is "HEAD OFF, BABARMAHAL"
df = df.filter(df.sol_desc == "HEAD OFF, BABARMAHAL" )

# Save the filtered DataFrame to a new CSV file
df.write.csv("head_off_babarmahal.csv", mode="overwrite")


24/12/05 13:58:20 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

## Count the category of salutation

In [7]:
data=df.groupby('salutation').count()
data.show()

+----------+-----+
|salutation|count|
+----------+-----+
|       MS.| 4160|
|      MRS.| 6472|
|       MR.|15560|
|      MAST|    8|
|      MISS|   72|
+----------+-----+



## Count the category of employement_status

In [8]:
data=df.groupby('employment_status').count()
data.show()

+-----------------+-----+
|employment_status|count|
+-----------------+-----+
|         Employed|  872|
|         Salaried|11088|
|            Other|14184|
|        Housewife|   56|
|          Retired|   24|
|       Unemployed|   40|
|             null|    8|
+-----------------+-----+



### Update gender based on salutation

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Initialize Spark session
spark = SparkSession.builder.appName("GenderUpdate").getOrCreate()


# Update gender based on salutation
df = df.withColumn(
    'gender',
    when(col('salutation') == 'MR.', 'Male')
    .when(col('salutation') == 'MRS.', 'Female')
    .when(col('salutation') == 'MS.', 'Female')
    .when(col('salutation') == 'MAST', 'Male')
    .when(col('salutation') == 'MISS', 'Female')
    .when(col('salutation') == 'M/S', 'org')
    .otherwise(col('gender'))
)

# # Show the updated DataFrame
df.show()


### Calculate kyc_flag on the basis of kyc_date,whether it is null or not.

In [10]:
df = df.withColumn("kyc_flag", when(col("kyc_date").isNotNull(), 1).otherwise(0))
df.select("kyc_flag").show()

+--------+
|kyc_flag|
+--------+
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
|       1|
+--------+
only showing top 20 rows



### Calculated age of customer on the basis of column cust_dob

In [11]:
from pyspark.sql.functions import col, current_date, datediff, floor

# Calculate age
df = df.withColumn('cust_age', floor(datediff(current_date(), col('cust_dob')) / 365.25))

# Show the updated DataFrame
df.select('cust_dob','cust_age').show()


+--------------------+--------+
|            cust_dob|cust_age|
+--------------------+--------+
|1987-03-23 00:00:...|      37|
|1987-03-23 00:00:...|      37|
|1987-03-23 00:00:...|      37|
|1987-03-23 00:00:...|      37|
|1987-03-23 00:00:...|      37|
|1987-03-23 00:00:...|      37|
|1987-03-23 00:00:...|      37|
|1987-03-23 00:00:...|      37|
|1990-12-13 00:00:...|      33|
|1990-12-13 00:00:...|      33|
|1990-12-13 00:00:...|      33|
|1990-12-13 00:00:...|      33|
|1990-12-13 00:00:...|      33|
|1990-12-13 00:00:...|      33|
|1990-12-13 00:00:...|      33|
|1990-12-13 00:00:...|      33|
|1967-01-10 00:00:...|      57|
|1967-01-10 00:00:...|      57|
|1967-01-10 00:00:...|      57|
|1967-01-10 00:00:...|      57|
+--------------------+--------+
only showing top 20 rows



### Categorized the (occupation,schm_desc)column and keep top 10 priorities ,put left category into others.

In [12]:
from pyspark.sql import functions as F

# Group by 'occupation' and count occurrences
data = df.groupBy('occupation').count()

# Order by count in descending order
data = data.orderBy(F.desc('count'))

# Get the top 10 categories
top_10_categories = [row['occupation'] for row in data.limit(10).collect()]

# Create a new column 'category_occ' with 'occupation' if it's in the top 10, otherwise mark as 'Other'
data = data.withColumn('category_occ', F.when(F.col('occupation').isin(top_10_categories), F.col('occupation')).otherwise('OTHER'))

# Group by 'category_occ' and sum counts
grouped_data = data.groupBy('category_occ').agg(F.sum('count').alias('total_count'))

# Combine null and "Other" categories
combined_count = grouped_data.filter((F.col('category_occ').isNull()) | (F.col('category_occ') == 'OTHER')) \
                             .agg(F.sum('total_count').alias('total_count')).collect()[0]['total_count']

# Filter out null and "Other" rows, then add a new row for the combined "Other" category
grouped_data = grouped_data.filter(~((F.col('category_occ').isNull()) | (F.col('category_occ') == 'OTHER'))) \
                           .union(df.sql_ctx.createDataFrame([('OTHER', combined_count)], ['category_occ', 'total_count']))

# Show the result
grouped_data.orderBy(F.desc('total_count')).show(10)

# Join the grouped_data with df, renaming the column to avoid ambiguity
df = df.join(data.select('occupation', 'category_occ'), on='occupation', how='left') \
       .withColumnRenamed('category_occ', 'category_occ_type')

# Replace null values with 'Other'
df = df.withColumn('category_occ', F.when(F.col('category_occ_type').isNull(), 'OTHER').otherwise(F.col('category_occ_type')))

# Drop the temporary 'category_occ_type' column if not needed
df = df.drop('category_occ_type')

# Show the modified df DataFrame
#df.show()


                                                                                

+------------+-----------+
|category_occ|total_count|
+------------+-----------+
|       OTHER|      17816|
|       SALPN|       7968|
|       FININ|        112|
|       BUSIN|         72|
|        CMIG|         72|
|        AGRI|         72|
|       FOREM|         56|
|        HMMK|         48|
|       SELFE|         32|
|       RETIR|         24|
+------------+-----------+



In [13]:
from pyspark.sql import functions as F

# Group by 'schm_desc' and count occurrences
data = df.groupBy('schm_desc').count()

# Order by count in descending order
data = data.orderBy(F.desc('count'))

# Get the top 10 categories
top_10_categories = [row['schm_desc'] for row in data.limit(10).collect()]

# Create a new column 'category_schm_desc' with 'schm_desc' if it's in the top 10, otherwise mark as 'Other'
data = data.withColumn('category_schm', F.when(F.col('schm_desc').isin(top_10_categories), F.col('schm_desc')).otherwise('Other'))

# Group by 'category_schm_desc' and sum counts
grouped_data = data.groupBy('category_schm').agg(F.sum('count').alias('total_count'))

# Show the result
grouped_data.orderBy(F.desc('total_count')).show(10)

# Join the grouped_data with df
df = df.join(data.select('schm_desc', 'category_schm'), on='schm_desc', how='left')

# Show the modified df DataFrame
# df.show()




+--------------------+-----------+
|       category_schm|total_count|
+--------------------+-----------+
|STAFF HOUSING LOA...|      11176|
|  STAFF VEHICLE LOAN|       9184|
|     CCBL STAFF LOAN|       3216|
|          STAFF LOAN|       1168|
|STAFF HOUSING LOA...|        992|
|STAFF EARTHQUAKE ...|        528|
|    STAFF OTHER LOAN|          8|
+--------------------+-----------+



In [14]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("SelectAndSaveToCSV") \
    .getOrCreate()

# Select specific columns
selected_columns_df = df.select(
    'annual_others_income',
    'totalhouseholdincm',
    'cust_age',
    'category_occ',
    'category_schm',
    'annual_salary_income',
    'employment_status',
    'employersname',
    'income_nature',
    'riskrating',
    'marital_status',
    'annual_total_income',
    'dis_amt',
    'kyc_flag'
)

selected_columns_df.show()

selected_columns_df.write.csv('selected_data.csv', header=True, mode='overwrite')




+--------------------+------------------+--------+------------+--------------------+--------------------+-----------------+-------------+-------------+----------+--------------+-------------------+------------+--------+
|annual_others_income|totalhouseholdincm|cust_age|category_occ|       category_schm|annual_salary_income|employment_status|employersname|income_nature|riskrating|marital_status|annual_total_income|     dis_amt|kyc_flag|
+--------------------+------------------+--------+------------+--------------------+--------------------+-----------------+-------------+-------------+----------+--------------+-------------------+------------+--------+
|                null|              null|      37|       OTHER|STAFF HOUSING LOA...|            500000.0|         Salaried|  PRABHU BANK|          001|    MEDIUM|             M|            50000.0|2005000.0000|       1|
|                null|              null|      37|       OTHER|STAFF HOUSING LOA...|            500000.0|         Salari

In [15]:
selected_columns_df.columns

['annual_others_income',
 'totalhouseholdincm',
 'cust_age',
 'category_occ',
 'category_schm',
 'annual_salary_income',
 'employment_status',
 'employersname',
 'income_nature',
 'riskrating',
 'marital_status',
 'annual_total_income',
 'dis_amt',
 'kyc_flag']

In [16]:
selected_columns_df.count()

26272

### Drop Duplicate Data

In [17]:
from pyspark.sql import SparkSession
import pandas as pd

# Initialize Spark session
spark = SparkSession.builder \
    .appName("DropDuplicates") \
    .getOrCreate()


df = spark.read.csv('selected_data.csv', header=True, inferSchema=True)

# Drop duplicate rows based on all columns
df_no_duplicates = df.dropDuplicates()








In [18]:
df_no_duplicates.count()

3236

In [19]:
selected_df=df_no_duplicates.fillna('Other')


3236

In [20]:
data=selected_df.groupby('employment_status').count()


+-----------------+-----+
|employment_status|count|
+-----------------+-----+
|         Employed|  108|
|         Salaried| 1379|
|            Other| 1734|
|        Housewife|    7|
|          Retired|    3|
|       Unemployed|    5|
+-----------------+-----+



###  Remove '.' and '-' from employersname

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Clean Data") \
    .getOrCreate()


# Remove rows with None in specific columns
cleaned_df = df.filter(
    col("totalhouseholdincm").isNotNull() & 
    col("annual_salary_income").isNotNull()
)

# Remove '.' and '-' from employersname
cleaned_df = selected_df.withColumn(
    "employersname",
    regexp_replace(col("employersname"), "[.-]", "")
)

cleaned_df.show()


+--------------------+------------------+--------+------------+--------------------+--------------------+-----------------+--------------------+-------------+----------+--------------+-------------------+---------+--------+
|annual_others_income|totalhouseholdincm|cust_age|category_occ|       category_schm|annual_salary_income|employment_status|       employersname|income_nature|riskrating|marital_status|annual_total_income|  dis_amt|kyc_flag|
+--------------------+------------------+--------+------------+--------------------+--------------------+-----------------+--------------------+-------------+----------+--------------+-------------------+---------+--------+
|                null|              null|      34|       OTHER|STAFF HOUSING LOA...|                 1.0|            Other|               Other|          001|       LOW|             S|                1.0|1500000.0|       1|
|                null|              null|      34|       OTHER|  STAFF VEHICLE LOAN|                 1.0

### Change the datatypes in required format

In [22]:
cleaned_df = selected_df.withColumn("dis_amt", col("dis_amt").cast("double"))

In [24]:
df = cleaned_df.na.drop(subset=[ 'totalhouseholdincm', 'cust_age', 'category_occ', 'category_schm',
        'annual_salary_income', 'employment_status', 'employersname',
        'income_nature', 'riskrating', 'marital_status', 'annual_total_income',
        'kyc_flag', 'dis_amt'])

### Removes rows where annual_salary_income, annual_total_income, or totalhouseholdincm contain values in scientific notation (e.g., 1.23E+05)

In [26]:
df = df.filter(~df.annual_salary_income.rlike(r'[0-9\.]+E[+-]?[0-9]+'))
df = df.filter(~when(df.annual_total_income == 1, True).otherwise(False))
df = df.filter(~when(df.totalhouseholdincm == 1, True).otherwise(False))
df = df.filter(~df.annual_total_income.rlike(r'[0-9\.]+E[+-]?[0-9]+'))
df = df.filter(~df.totalhouseholdincm.rlike(r'[0-9\.]+E[+-]?[0-9]+'))

### Convert categorical data into categorical values using stringindexer('cust_age', 'category_occ', 'category_schm','employment_status','income_nature', 'riskrating', 'marital_status', 'kyc_flag')

In [27]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col

# Create a SparkSession
spark = SparkSession.builder \
    .appName("StringIndexer Example") \
    .getOrCreate()


# Apply StringIndexer
indexer = StringIndexer(
    inputCols=[
         'cust_age', 'category_occ', 'category_schm',
        'employment_status', 
        'income_nature', 'riskrating', 'marital_status', 
        'kyc_flag'
    ],
    outputCols=[
        'cust_age_index', 'category_occ_index', 'category_schm_index',
 'employment_status_index', 
        'income_nature_index', 'riskrating_index', 'marital_status_index', 
        'kyc_flag_index'
    ]
).fit(cleaned_df)

# Transform the DataFrame
indexed_df = indexer.transform(df)

# Rearrange columns to place indexed columns next to their original columns
final_columns = []
for col_name in df.columns:
    final_columns.append(col_name)
    if col_name in indexer.getInputCols():
        final_columns.append(col_name + "_index")

# Select the columns in the desired order
indexed_df = indexed_df.select(final_columns)
indexed_df.show(5)


+--------------------+------------------+--------+--------------+------------+------------------+--------------------+-------------------+--------------------+-----------------+-----------------------+-------------+-------------+-------------------+----------+----------------+--------------+--------------------+-------------------+---------+--------+--------------+
|annual_others_income|totalhouseholdincm|cust_age|cust_age_index|category_occ|category_occ_index|       category_schm|category_schm_index|annual_salary_income|employment_status|employment_status_index|employersname|income_nature|income_nature_index|riskrating|riskrating_index|marital_status|marital_status_index|annual_total_income|  dis_amt|kyc_flag|kyc_flag_index|
+--------------------+------------------+--------+--------------+------------+------------------+--------------------+-------------------+--------------------+-----------------+-----------------------+-------------+-------------+-------------------+----------+----

In [28]:
indexed_df.columns

['annual_others_income',
 'totalhouseholdincm',
 'cust_age',
 'cust_age_index',
 'category_occ',
 'category_occ_index',
 'category_schm',
 'category_schm_index',
 'annual_salary_income',
 'employment_status',
 'employment_status_index',
 'employersname',
 'income_nature',
 'income_nature_index',
 'riskrating',
 'riskrating_index',
 'marital_status',
 'marital_status_index',
 'annual_total_income',
 'dis_amt',
 'kyc_flag',
 'kyc_flag_index']

# Model used: Linear Regression

In [29]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# List of input column names
input_column_names = [
'annual_others_income',
 'totalhouseholdincm',
 'annual_salary_income',
 'annual_total_income',
 'kyc_flag','cust_age_index', 'category_occ_index', 'category_schm_index',
 'employment_status_index', 
        'income_nature_index', 'riskrating_index', 'marital_status_index', 
        'kyc_flag_index'
]

# Define the VectorAssembler using the input_column_names variable
featureassembler = VectorAssembler(inputCols=input_column_names, outputCol='Features')

# Transform the DataFrame
output = featureassembler.transform(indexed_df)

# Display the output DataFrame
#output.show(2)

# Select the features and the target variable
to_predict = output.select("Features", 'dis_amt')

# Display the DataFrame to be predicted
#to_predict.show(2)

# Split the data into training and testing sets
train_X, test_X = to_predict.randomSplit([0.8, 0.2])

# Define and train the Linear Regression model
reg = LinearRegression(featuresCol='Features', labelCol='dis_amt')
reg_model = reg.fit(train_X)




+--------------------+------------------+--------+--------------+------------+------------------+--------------------+-------------------+--------------------+-----------------+-----------------------+-------------+-------------+-------------------+----------+----------------+--------------+--------------------+-------------------+---------+--------+--------------+--------------------+
|annual_others_income|totalhouseholdincm|cust_age|cust_age_index|category_occ|category_occ_index|       category_schm|category_schm_index|annual_salary_income|employment_status|employment_status_index|employersname|income_nature|income_nature_index|riskrating|riskrating_index|marital_status|marital_status_index|annual_total_income|  dis_amt|kyc_flag|kyc_flag_index|            Features|
+--------------------+------------------+--------+--------------+------------+------------------+--------------------+-------------------+--------------------+-----------------+-----------------------+-------------+-------

24/06/12 06:47:44 WARN util.Instrumentation: [a8e59bb4] regParam is zero, which might cause numerical instability and overfitting.
24/06/12 06:47:44 WARN netlib.InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/06/12 06:47:44 WARN netlib.InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
24/06/12 06:47:44 WARN netlib.InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/06/12 06:47:44 WARN util.Instrumentation: [a8e59bb4] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.


## Prediction

In [30]:
# Make predictions on the test set
pred = reg_model.transform(test_X)

# Display the predictions
pred.show(5)

+--------------------+---------+------------------+
|            Features|  dis_amt|        prediction|
+--------------------+---------+------------------+
|(13,[1,2,3,4,5],[...|2500000.0|2426439.0253924625|
|(13,[1,2,3,4,5],[...|3000000.0|2325473.1811085376|
|(13,[1,2,3,4,5,6]...|2008000.0|1904412.4608594459|
|(13,[1,2,3,4,5,6,...|3200000.0|2250975.7663144227|
|(13,[1,2,3,4,5,6,...|1050000.0| 2317568.850685691|
+--------------------+---------+------------------+
only showing top 5 rows



## Model Evaluation

In [31]:
from pyspark.ml.evaluation import RegressionEvaluator

# Define and train the Linear Regression model
reg = LinearRegression(featuresCol='Features', labelCol='dis_amt')
reg_model = reg.fit(train_X)

# Make predictions on the test set
pred = reg_model.transform(test_X)

# Calculate R2 score
evaluator = RegressionEvaluator(labelCol="dis_amt", predictionCol="prediction", metricName="r2")
r2_score = evaluator.evaluate(pred)

# Display the R2 score
print("R2 Score:", r2_score)


24/06/12 06:47:46 WARN util.Instrumentation: [b356eca3] regParam is zero, which might cause numerical instability and overfitting.
24/06/12 06:47:47 WARN util.Instrumentation: [b356eca3] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.


R2 Score: 0.0927701089506926


In [32]:
coefficients = reg_model.coefficients

## Coefficient

In [33]:
from pyspark.ml.linalg import DenseVector
import pandas as pd



coefficients_df = pd.DataFrame(list(zip(input_column_names, coefficients)), columns=['input_column_names', 'Coefficient'])


coefficients_df['Coefficient'] = coefficients_df['Coefficient'].abs()


coefficients_df = coefficients_df.sort_values(by='Coefficient', ascending=False)

print(coefficients_df)


         input_column_names    Coefficient
11     marital_status_index  828324.278856
7       category_schm_index  656457.114589
10         riskrating_index  517783.835915
9       income_nature_index  356039.743040
6        category_occ_index  156278.403505
4                  kyc_flag   47096.001582
12           kyc_flag_index   47096.001582
8   employment_status_index   24421.652400
5            cust_age_index    4206.910178
2      annual_salary_income       0.375952
1        totalhouseholdincm       0.123746
3       annual_total_income       0.123746
0      annual_others_income       0.000000
