In [1]:
data_file_path="/home/jovyan/work/finance_complaint/finance_artifact/data_ingestion/feature_store/finance_complaint"

In [2]:
from finance_complaint.entity.spark_manager import spark_session

In [3]:
df = spark_session.read.parquet(data_file_path)

In [4]:
df.count()

2786775

In [99]:
from pyspark.sql.functions import col

In [14]:
df.select('complaint_id')

DataFrame[complaint_id: string]

## Conf

In [100]:
def update_column_attribute(df):
    for column in df.columns:
        setattr(df,column,column)

In [101]:
update_column_attribute(df)

## Printing unique values in each column

In [15]:
for column in df.columns:
    print(f"{column}:{df.select(column).distinct().count()}")

company:6464
company_public_response:12
company_response:9
complaint_id:2774363
complaint_what_happened:892308
consumer_consent_provided:6
consumer_disputed:3
date_received:3833
date_sent_to_company:3834
issue:165
product:18
state:64
sub_issue:222
sub_product:77
submitted_via:7
tags:4
timely:2
zip_code:34118


In [102]:
complaint_table="complaint"
df.createOrReplaceTempView(complaint_table)

In [23]:
sql = spark_session.sql

In [103]:
n_row = df.count()

In [35]:
#Target column
df.groupBy(df.consumer_disputed).count().collect()

+-----------------+-------+
|consumer_disputed|  count|
+-----------------+-------+
|              N/A|2017177|
|               No| 621087|
|              Yes| 148511|
+-----------------+-------+



In [36]:
df.printSchema()

root
 |-- company: string (nullable = true)
 |-- company_public_response: string (nullable = true)
 |-- company_response: string (nullable = true)
 |-- complaint_id: string (nullable = true)
 |-- complaint_what_happened: string (nullable = true)
 |-- consumer_consent_provided: string (nullable = true)
 |-- consumer_disputed: string (nullable = true)
 |-- date_received: string (nullable = true)
 |-- date_sent_to_company: string (nullable = true)
 |-- issue: string (nullable = true)
 |-- product: string (nullable = true)
 |-- state: string (nullable = true)
 |-- sub_issue: string (nullable = true)
 |-- sub_product: string (nullable = true)
 |-- submitted_via: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- timely: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [104]:
missing_target_df = sql(f"select * from {complaint_table} where {df.consumer_disputed} ='N/A' ")

In [105]:
df = sql(f"select * from {complaint_table} where {df.consumer_disputed} <>'N/A' ")

In [106]:
complaint_table="complaint"
df.createOrReplaceTempView(complaint_table)

In [107]:
update_column_attribute(df)

In [110]:

def perform_null_analysis(df,table_name):
    null_value_analysis=[]
    n_row=df.count()
    for column in df.columns:
        print(column)
        response = sql(f"select {n_row} as  total_row,count(*) as null_row_{column},(count(*)*100)/{n_row} as missing_percentage,'{column}' as  column_name from {table_name} where {column} is null").collect()
        null_value_analysis.append(response)
    return null_value_analysis

In [111]:
null_report = perform_null_analysis(df,complaint_table)

company
company_public_response
company_response
complaint_id
complaint_what_happened
consumer_consent_provided
consumer_disputed
date_received
date_sent_to_company
issue
product
state
sub_issue
sub_product
submitted_via
tags
timely
zip_code


In [112]:
def unwanted_column_by_missing_percentage(null_value_analysis,per_thres=20):
    columns= []
    for row in null_value_analysis:
        row_info=row[0]
        if row_info.missing_percentage>per_thres:
            print(row_info)
            columns.append(row_info.column_name)

    return columns


In [113]:
columns = unwanted_column_by_missing_percentage(null_value_analysis=null_report)

Row(total_row=769598, null_row_company_public_response=572886, missing_percentage=74.4396425146635, column_name='company_public_response')
Row(total_row=769598, null_row_sub_issue=454896, missing_percentage=59.10826171585685, column_name='sub_issue')
Row(total_row=769598, null_row_sub_product=235106, missing_percentage=30.549195813918434, column_name='sub_product')
Row(total_row=769598, null_row_tags=660916, missing_percentage=85.8780818037469, column_name='tags')


In [132]:
def drop_column(df,columns):
    selected_column = list(filter(lambda x:x not in columns,df.columns))
    selected_column = ",".join(selected_column)
    df= sql(f"select {selected_column}  from {complaint_table} ")
    return df

In [133]:
df=drop_column(df,columns)

In [139]:
columns = perform_null_analysis(df,complaint_table)

company
company_response
complaint_id
complaint_what_happened
consumer_consent_provided
consumer_disputed
date_received
date_sent_to_company
issue
product
state
submitted_via
timely
zip_code


In [141]:
unwanted_column_by_missing_percentage(columns)

[]

In [115]:
#dropping feature as we have found more than 20% of null value in above columns

In [143]:
df.collect()

+--------------------+--------------------+------------+-----------------------+-------------------------+-----------------+--------------------+--------------------+--------------------+--------------------+-----+-------------+------+--------+
|             company|    company_response|complaint_id|complaint_what_happened|consumer_consent_provided|consumer_disputed|       date_received|date_sent_to_company|               issue|             product|state|submitted_via|timely|zip_code|
+--------------------+--------------------+------------+-----------------------+-------------------------+-----------------+--------------------+--------------------+--------------------+--------------------+-----+-------------+------+--------+
|       EQUIFAX, INC.|Closed with expla...|     2390217|                       |     Consent not provided|               No|2017-03-16T12:00:...|2017-03-17T12:00:...|Credit monitoring...|    Credit reporting|   CT|          Web|   Yes|   06226|
|TRANSUNION INTERM..

In [144]:
## Unique values in each columns



Total number of row: 769598


In [145]:
print(f"Total number of row: {df.count()}")
for column in df.columns:
    print(f"{column}:{df.select(column).distinct().count()}")

company:4284
company_response:7
complaint_id:765945
complaint_what_happened:160969
consumer_consent_provided:6
consumer_disputed:2
date_received:1940
date_sent_to_company:2024
issue:99
product:13
state:63
submitted_via:6
timely:2
zip_code:28717


In [146]:
update_column_attribute(df)

In [147]:
df=drop_column(df,columns=[df.complaint_id])

In [148]:
update_column_attribute(df)

In [149]:
df.printSchema()

root
 |-- company: string (nullable = true)
 |-- company_response: string (nullable = true)
 |-- complaint_what_happened: string (nullable = true)
 |-- consumer_consent_provided: string (nullable = true)
 |-- consumer_disputed: string (nullable = true)
 |-- date_received: string (nullable = true)
 |-- date_sent_to_company: string (nullable = true)
 |-- issue: string (nullable = true)
 |-- product: string (nullable = true)
 |-- state: string (nullable = true)
 |-- submitted_via: string (nullable = true)
 |-- timely: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [150]:
print(f"Total number of row: {df.count()}")
for column in df.columns:
    print(f"{column}:{df.select(column).distinct().count()}")

Total number of row: 769598
company:4284
company_response:7
complaint_what_happened:160969
consumer_consent_provided:6
consumer_disputed:2
date_received:1940
date_sent_to_company:2024
issue:99
product:13
state:63
submitted_via:6
timely:2
zip_code:28717


In [154]:
df.groupBy(df.company_response).count().show()
df.groupBy(df.consumer_consent_provided).count().show()
df.groupBy(df.consumer_disputed).count().show()
df.groupBy(df.product).count().show()
df.groupBy(df.submitted_via).count().show()
df.groupBy(df.timely).count().show()


+--------------------+------+
|    company_response| count|
+--------------------+------+
|   Untimely response|  2901|
|Closed with non-m...| 95968|
|Closed with monet...| 51597|
|Closed with expla...|580728|
|              Closed| 17691|
|Closed without re...| 16061|
|  Closed with relief|  4652|
+--------------------+------+

+-------------------------+------+
|consumer_consent_provided| count|
+-------------------------+------+
|                     null|    38|
|        Consent withdrawn|     8|
|                    Other|  8492|
|         Consent provided|164887|
|     Consent not provided|125995|
|                      N/A|470178|
+-------------------------+------+

+-----------------+------+
|consumer_disputed| count|
+-----------------+------+
|               No|621087|
|              Yes|148511|
+-----------------+------+

+--------------------+------+
|             product| count|
+--------------------+------+
|     Debt collection|146584|
|    Virtual currency|    18|
|    

In [156]:

df.groupBy(df.company_response).count().collect()
df.groupBy(df.consumer_consent_provided).count().collect()
df.groupBy(df.consumer_disputed).count().collect()
df.groupBy(df.product).count().collect()
df.groupBy(df.submitted_via).count().collect()
df.groupBy(df.timely).count().collect()


[Row(timely='No', count=21331), Row(timely='Yes', count=748267)]

In [158]:
df.groupBy(df.product).count().collect()

[Row(product='Debt collection', count=146584),
 Row(product='Virtual currency', count=18),
 Row(product='Payday loan', count=5580),
 Row(product='Money transfers', count=5387),
 Row(product='Checking or savings account', count=3),
 Row(product='Mortgage', count=226616),
 Row(product='Prepaid card', count=3833),
 Row(product='Credit reporting', count=141194),
 Row(product='Consumer Loan', count=31756),
 Row(product='Credit card', count=88332),
 Row(product='Bank account or service', count=86578),
 Row(product='Other financial service', count=1062),
 Row(product='Student loan', count=32655)]

In [159]:
ONE_HOT_FEATURE = [df.company_response,df.consumer_consent_provided,df.submitted_via,df.timely]
BINARY_ENCODING = [df.product,]
TARGET_FEATURE = [df.consumer_disputed]

#df.company_response No null value
#df.consumer_consent_provided  replace null with top category 
#df.consumer_disputed target feature label encoding
#df.product one hot encoding 

#df.product no null value


In [162]:
remaining_column = list(filter(lambda x: x not in ONE_HOT_FEATURE+BINARY_ENCODING+TARGET_FEATURE,df.columns))

In [163]:
remaining_column

['company',
 'complaint_what_happened',
 'date_received',
 'date_sent_to_company',
 'issue',
 'state',
 'zip_code']

In [169]:
df.groupBy(df.company).count().count()

4284

In [None]:
FREQUENCY_ENCODING = [df.company]

In [180]:
df.select(df.company).count()-df.select(df.company).dropna().count()


0

In [186]:
REPLACE_NULL_WITH_TOP_VALUE = [df.zip_code,df.state,df.consumer_consent_provided]

In [188]:
REPLACE_NULL_WITH_TOP_VALUE

['zip_code', 'state', 'consumer_consent_provided']

In [189]:
print(f"Total number of row: {df.count()}")
for column in remaining_column:
    print(f"{column}:  {df.select(column).distinct().count()}")

Total number of row: 769598
company:  4284
complaint_what_happened:  160969
date_received:  1940
date_sent_to_company:  2024
issue:  99
state:  63
zip_code:  28717


In [190]:
df.groupBy(df.issue).count().show()

+--------------------+------+
|               issue| count|
+--------------------+------+
|Communication tac...| 23965|
|Application proce...|   536|
|Advertising and m...|  2948|
|Balance transfer fee|   216|
|Customer service/...|   284|
|        Adding money|   202|
|Closing/Cancellin...|  6345|
|Credit card prote...|  2701|
|Received a loan I...|   619|
|Can't stop charge...|   515|
|          Bankruptcy|   447|
|                Fees|   234|
|Forbearance / Wor...|   550|
|Credit determination|  3016|
|Loan modification...|112115|
|    Cash advance fee|   193|
|Other transaction...|  1501|
|Customer service ...|  3491|
|      Getting a loan|   666|
|  Delinquent account|  3224|
+--------------------+------+
only showing top 20 rows



In [192]:
remaining_column

['company',
 'complaint_what_happened',
 'date_received',
 'date_sent_to_company',
 'issue',
 'state',
 'zip_code']

In [195]:
df.select(df.complaint_what_happened)[3].collect()

IndexError: list index out of range

In [197]:
sql(f"select {df.complaint_what_happened} from {complaint_table} limit 5").collect()

[Row(complaint_what_happened=''),
 Row(complaint_what_happened=''),
 Row(complaint_what_happened=''),
 Row(complaint_what_happened='around XX/XX/2016 i got a check from tenant. I am a XXXX and I accept a check from him after that i found bedbugs that he brought from work. \nI have to give him notice for moving because my apartment had a carpet floor. \nMy roommate got a problem and moving out. and he also gave me a checked and cash in the same time. I after that time i got that the checked was report for lost. \nand Fraud also. Bank of America close my account and report me I am a thief and fraud with check. I have a paper that give him to sign and let him know about the payment after he left for bedbugs treatment. I need bank of America resolve this problem that i am a XXXX that he used to lived before he changed the address. i can proof with letter that he report that was lost. but its not he was lie to bank. Cheating on my with the checked.'),
 Row(complaint_what_happened='')]

In [198]:
TOKENIZER_FEATURE = [df.complaint_what_happened]

In [199]:
ONE_HOT_FEATURE

['company_response', 'consumer_consent_provided', 'submitted_via', 'timely']

In [204]:
FREQUENCY_ENCODING = [df.company,df.issue,df.state,df.zip_code]

In [201]:
TARGET_FEATURE

['consumer_disputed']

In [202]:
TOKENIZER_FEATURE

['complaint_what_happened']

In [203]:
remaining_column

['company',
 'complaint_what_happened',
 'date_received',
 'date_sent_to_company',
 'issue',
 'state',
 'zip_code']

In [213]:
from pyspark.sql.types import TimestampType

In [216]:
df=df.withColumn(df.date_received,col(df.date_received).cast(TimestampType()))

In [222]:
update_column_attribute(df)

In [219]:
df=df.withColumn(df.date_sent_to_company,col(df.date_sent_to_company).cast(TimestampType()))
df.printSchema()

root
 |-- company: string (nullable = true)
 |-- company_response: string (nullable = true)
 |-- complaint_what_happened: string (nullable = true)
 |-- consumer_consent_provided: string (nullable = true)
 |-- consumer_disputed: string (nullable = true)
 |-- date_received: timestamp (nullable = true)
 |-- date_sent_to_company: timestamp (nullable = true)
 |-- issue: string (nullable = true)
 |-- product: string (nullable = true)
 |-- state: string (nullable = true)
 |-- submitted_via: string (nullable = true)
 |-- timely: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [225]:
df.select([df.date_received,df.date_sent_to_company]).show()

+-------------------+--------------------+
|      date_received|date_sent_to_company|
+-------------------+--------------------+
|2017-03-16 17:00:00| 2017-03-17 17:00:00|
|2017-02-08 17:00:00| 2017-02-08 17:00:00|
|2017-01-25 17:00:00| 2017-01-25 17:00:00|
|2017-01-13 17:00:00| 2017-01-13 17:00:00|
|2017-01-31 17:00:00| 2017-02-07 17:00:00|
|2017-01-06 17:00:00| 2017-01-06 17:00:00|
|2017-03-01 17:00:00| 2017-03-01 17:00:00|
|2017-02-19 17:00:00| 2017-02-19 17:00:00|
|2017-02-15 17:00:00| 2017-02-15 17:00:00|
|2017-03-20 17:00:00| 2017-03-21 17:00:00|
|2017-03-08 17:00:00| 2017-03-10 17:00:00|
|2017-02-14 17:00:00| 2017-02-17 17:00:00|
|2017-02-17 17:00:00| 2017-02-23 17:00:00|
|2017-02-23 17:00:00| 2017-02-28 17:00:00|
|2017-02-03 17:00:00| 2017-02-07 17:00:00|
|2017-01-27 17:00:00| 2017-01-27 17:00:00|
|2017-02-03 17:00:00| 2017-02-03 17:00:00|
|2017-01-11 17:00:00| 2017-01-12 17:00:00|
|2017-02-14 17:00:00| 2017-02-14 17:00:00|
|2017-01-21 17:00:00| 2017-01-21 17:00:00|
+----------

In [226]:
from pyspark.sql.types import  LongType

In [233]:
df = df.withColumn("diff_in_days",(col(df.date_sent_to_company).cast(LongType())-col(df.date_received).cast(LongType()))/(60*60*24))

In [234]:
update_column_attribute(df)

In [235]:
remove_column = [df.date_received,df.date_sent_to_company]

In [243]:
df=df.drop(col(df.date_received)).drop(col(df.date_sent_to_company))

+--------------------+--------------------+-----------------------+-------------------------+-----------------+--------------------+--------------------+-----+-------------+------+--------+------------+
|             company|    company_response|complaint_what_happened|consumer_consent_provided|consumer_disputed|               issue|             product|state|submitted_via|timely|zip_code|diff_in_days|
+--------------------+--------------------+-----------------------+-------------------------+-----------------+--------------------+--------------------+-----+-------------+------+--------+------------+
|       EQUIFAX, INC.|Closed with expla...|                       |     Consent not provided|               No|Credit monitoring...|    Credit reporting|   CT|          Web|   Yes|   06226|         1.0|
|TRANSUNION INTERM...|Closed with expla...|                       |     Consent not provided|               No|Incorrect informa...|    Credit reporting|   MO|          Web|   Yes|   63134

In [249]:
NUMERICAL_FEATURE = [df.diff_in_days,]
ONE_HOT_FEATURE+\
FREQUENCY_ENCODING+\
BINARY_ENCODING+\
TARGET_FEATURE

['company_response',
 'consumer_consent_provided',
 'submitted_via',
 'timely',
 'company',
 'issue',
 'state',
 'zip_code',
 'issue',
 'consumer_disputed']

In [250]:
BINARY_ENCODING

['issue']

In [251]:
FREQUENCY_ENCODING=FREQUENCY_ENCODING+BINARY_ENCODING

In [254]:
FREQUENCY_ENCODING.remove('issue')

In [255]:
FREQUENCY_ENCODING

['company', 'state', 'zip_code', 'issue']

In [256]:
ONE_HOT_FEATURE

['company_response', 'consumer_consent_provided', 'submitted_via', 'timely']

In [None]:

data_file_path="/home/jovyan/work/finance_complaint/finance_artifact/data_preprocessing/20220907_063829/complaint_data"

In [3]:
from finance_complaint.entity.spark_manager import spark_session
df = spark_session.read.parquet(data_file_path)

RuntimeError: Java gateway process exited before sending its port number