In [1]:
# Stretch the notebook container to the full browser width.
# `display` and `HTML` are imported from the public `IPython.display` module;
# importing `display` from `IPython.core.display` is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

## Importing useful libraries

In [2]:
import warnings
warnings.filterwarnings('ignore')
import sys
import os
import pandas as pd
import numpy as np
import joblib
import multiprocessing as mp
import json

### Add the directories that hold our helper modules to the import path. For example, '/home/shared/utils' contains the db_utils module, which is used to connect to the server without exposing the credentials.

In [3]:
# Prepend the module search locations in one step. The list is written in
# final priority order (first entry wins), which is the order three
# successive insert(0, ...) calls would have produced.
sys.path[:0] = [
    '/home/vishal/refactoring_pipeline',
    '/home/shared/utils',
    os.getcwd(),
]

### The helper module provides fetch_checking_acct_txns, which is used to determine which of a candidate's available accounts is a checking account. Similarly, the EDA module provides plots that are useful for analysis.

### The query module lets us establish a connection without adding credentials to the Python file; it uses a YAML config file to establish the connection.

In [None]:
from db_utils import *
from helper import fetch_checking_acct_txns
import EDA as eda
import query as q

### NCPU is the number of worker processes used for multiprocessing. The expression below uses all the CPU cores available in the system except 2; if the system has 2 or fewer cores, it uses only one core. (Fun fact: the server we are working on has 16 CPU cores!)

In [None]:
# Number of worker processes for the pools below: leave two cores free,
# but never drop below one worker.
NCPU = max(mp.cpu_count() - 2, 1)

### Query to fetch required data from iloans!!

In [None]:
def fetch_required_bank_reports(start, end):
    """Run the loan / bank-report query through ``q.iloans`` and return its result.

    Args:
        start: lower bound for LN.OriginationDate. The value is interpolated
            verbatim into the SQL text, so it must already be a valid SQL
            literal (the notebook passes a quoted string such as "'2020-01-07'").
        end: upper bound for LN.OriginationDate, same format as ``start``.

    Returns:
        Whatever ``q.iloans`` returns for the query (the notebook treats it
        as a pandas DataFrame).
    """
    # NOTE(review): the bounds are pasted into the SQL string as-is; only call
    # this with trusted values.
    sql = f'''
                SELECT
                    LN.LoanId,
                    GCD.TimeAdded,
                    LN.OriginationDate,
                    LN.FirstName,
                    LN.LastName,
                    LN.Campaign,
                    LN.OriginalPrincipal,
                    LN.ReUppedPrincipal,
                    LN.DateOfBirth,
                    LN.BankName,
                    LN.MonthlyGrossIncome,
                    LN.EmployerName,
                    LN.IsFirstDefault,
                    GCD.BankTransactionId,
                    GCD.BankReportData
                FROM view_FCL_Loan LN
                LEFT JOIN view_FCL_GetCreditDataLoan GCDL on LN.LoanId = GCDL.LoanId
                LEFT JOIN view_FCL_GetCreditData GCD on GCD.BankTransactionId = GCDL.BankTransactionId
                WHERE LN.OriginationDate >= {start}
                AND LN.OriginationDate <= {end}
                AND LN.IsFirstDefault IS NOT NULL
                AND LN.MerchantId IN (15, 18)
                AND GCD.ReportStatus  = 'COMPLETE' 
            '''
    return q.iloans(sql)

### Query to fetch the json data which has all the income related information from bankapp

In [None]:
def fetch_required_bank_app(start, end):
    """Run the bankapp loan/json query through ``q.bankapp`` and return its result.

    Args:
        start: lower bound for ``entered_date``. It is interpolated verbatim
            into the SQL text as the first argument of STR_TO_DATE, so it must
            already be a quoted '%Y-%m-%d' literal (e.g. "'2020-01-07'").
        end: upper bound for ``entered_date``, same format as ``start``.

    Returns:
        Whatever ``q.bankapp`` returns for the query (the notebook treats it
        as a pandas DataFrame with LoanId and json columns).
    """
    # NOTE(review): the bounds are pasted into the SQL string as-is; only call
    # this with trusted values.
    sql = f'''
                SELECT
                    loan_id as LoanId,
                    json
                FROM loan
                WHERE campaign like '%Production%'
                AND STR_TO_DATE(entered_date, '%m/%d/%Y') >= STR_TO_DATE({start}, '%Y-%m-%d')
                AND STR_TO_DATE(entered_date, '%m/%d/%Y') <= STR_TO_DATE({end}, '%Y-%m-%d')
             '''
    return q.bankapp(sql)

### Fetching the primary checking account, because a candidate may have multiple checking accounts

In [None]:
def get_primary_account(bankreport):
    """
    Pick the primary checking account of a bank report.

    The primary account is the value of the 'account_number' column that
    occurs most often among the rows returned by fetch_checking_acct_txns.

    Args:
    bankreport (json): bank report handed to fetch_checking_acct_txns

    Returns:
    account number : most frequent account number, or None when the report
    yields no checking-account transactions
    """
    checking_txns = fetch_checking_acct_txns(bankreport)
    if checking_txns.empty:
        # No checking-account transactions: there is no primary account.
        return None
    txns_per_account = checking_txns['account_number'].value_counts()
    return txns_per_account.idxmax()

### We work only with applicants who have at least 60 days of transactions, measured between the first and the last posted transaction of the primary account

In [None]:
def get_transaction_days_count(primary_account, bank_report, min_days=60):
    """Check whether an account's transactions span at least ``min_days`` days.

    The span is the number of days between the earliest and the latest
    'posted_date' among the checking transactions of ``primary_account``.

    Args:
    primary_account: account number of the primary account
    bank_report (str): bank report handed to fetch_checking_acct_txns
    min_days (int): minimum required span in days (default 60)

    Returns:
    True or False (bool). False is also returned when the report has no
    checking transactions or none of them belongs to ``primary_account``.
    """
    df_checking_txns = fetch_checking_acct_txns(bank_report)
    if df_checking_txns.empty:
        return False

    is_primary = df_checking_txns['account_number'] == primary_account
    # Convert defensively so that string or date values subtract cleanly, and
    # drop unparsable/missing dates so they cannot poison min()/max().
    posted_dates = pd.to_datetime(df_checking_txns.loc[is_primary, 'posted_date']).dropna()
    if posted_dates.empty:
        # No usable transaction for this account: indexing the first/last
        # row would raise IndexError, so report "not enough history".
        return False

    txn_days_count = (posted_dates.max() - posted_dates.min()).days
    return bool(txn_days_count >= min_days)

### The function below returns all the transactions for a candidate

In [None]:
def get_transaction_time_series(primary_account, bank_report, loan_id):
    """Return the primary account's checking transactions with parsed dates.

    Args:
    primary_account: account number whose transactions are kept
    bank_report (str): bank report handed to fetch_checking_acct_txns
    loan_id: value written to the 'LoanId' column of every returned row

    Returns:
    df_txns (pandas dataframe): rows of the checking transactions whose
    'account_number' equals ``primary_account``, with 'posted_date' converted
    by pd.to_datetime and an added 'LoanId' column. None when the report has
    no checking transactions.
    """
    df_checking_txns = fetch_checking_acct_txns(bank_report)
    if df_checking_txns.empty:
        return None

    is_primary = df_checking_txns['account_number'] == primary_account
    # Take an explicit copy: the columns assigned below must land on an
    # independent frame, not on a slice of df_checking_txns.
    df_txns = df_checking_txns.loc[is_primary, :].copy()
    df_txns['posted_date'] = pd.to_datetime(df_txns['posted_date'])
    df_txns['LoanId'] = loan_id
    return df_txns

### This function returns the number of income sources and the income cycle of each income source separated by +. For ex, if income sources are 2, the income cycles would be like in_cycle1+in_cycle2, where in_cycle1 is the income cycle of the first income source and similarly the second part for the second income respectively

In [None]:
def get_income_sources_and_cycle(json_string, loan_id):
    """Extract the number of income sources and their income cycles.

    Reads ['incomeReview']['data'] from the JSON document, takes
    'incomeSources' as the number of sources and joins the 'incomeCycle' of
    the first that-many entries of 'sources' with '+'.

    Args:
    json_string (str): JSON document containing the income review
    loan_id: identifier echoed back as the first element of the result

    Returns:
    list: [loan_id, no_sources, cycles] on success, where no_sources is the
    raw 'incomeSources' value and cycles is the '+'-joined string.
    [0, 0, 0] when the document cannot be parsed or lacks the expected
    structure.
    """
    try:
        # Parse once instead of once per income source.
        income_data = json.loads(json_string)['incomeReview']['data']
        no_sources = income_data['incomeSources']
        sources = income_data['sources']
        cycles = '+'.join(
            sources[position]['incomeCycle'] for position in range(int(no_sources))
        )
        return [loan_id, no_sources, cycles]
    except (ValueError, KeyError, IndexError, TypeError):
        # Malformed / incomplete JSON (json.JSONDecodeError is a ValueError),
        # non-numeric source count, or non-string cycle values. Anything else
        # (e.g. KeyboardInterrupt) is deliberately allowed to propagate.
        return [0, 0, 0]

### The function below returns the candidate's credit transactions (amount > 0) from the primary account whose category is Income, Payroll or Paycheck, sorted by posting date. Transactions in any other category are not returned.

In [None]:
def get_income(primary_acct, bank_report, loanid):
    """Return the income-like credit transactions of the primary account.

    Keeps transactions with a positive 'amount' whose 'category' is
    'Income', 'Payroll' or 'Paycheck', ordered by 'posted_date'.

    Args:
    primary_acct: account number of the primary account
    bank_report (str): bank report handed to get_transaction_time_series
    loanid: loan identifier handed to get_transaction_time_series

    Returns:
    list of dict: one record per kept transaction with the keys
    'posted_date', 'amount', 'category', 'type' and 'memo'. An empty list
    when get_transaction_time_series yields no transactions (None).
    """
    transactions = get_transaction_time_series(primary_acct, bank_report, loanid)
    if transactions is None:
        # The report had no checking transactions; calling .sort_values on
        # None would raise AttributeError.
        return []

    transactions = transactions.sort_values(by='posted_date').reset_index(drop=True)
    is_credit = transactions['amount'] > 0
    is_income_category = transactions['category'].isin(['Income', 'Payroll', 'Paycheck'])
    income_txns = transactions.loc[
        is_credit & is_income_category,
        ['posted_date', 'amount', 'category', 'type', 'memo'],
    ]
    return income_txns.to_dict('records')

### The function below cleans up the bank-report data: loan IDs arrive as floats, so it converts them to strings (dropping the decimal part), and it strips the time-of-day part from the TimeAdded column, keeping only the date

In [None]:
def modify_statement(df):
    """Normalise the LoanId and TimeAdded columns of the bank-report frame.

    LoanId is cast to str and cut at the first '.', so a float-formatted id
    such as '123.0' becomes '123'. TimeAdded is reduced to its calendar date
    (the time of day is dropped) and stored again as a pandas datetime.

    Args:
        df (pandas df): frame with 'LoanId' and 'TimeAdded' columns; it is
            modified in place.
    Returns:
        pandas df: the same frame object, after modification
    """
    loan_id_text = df['LoanId'].astype(str)
    df['LoanId'] = loan_id_text.map(lambda raw: raw.partition('.')[0])

    calendar_dates = df['TimeAdded'].map(lambda stamp: stamp.date())
    df['TimeAdded'] = pd.to_datetime(calendar_dates)
    return df

### The below function converts the transaction statement into 4 buckets based on category of the transactions viz. Payroll, Paycheck, Income or Rest

In [None]:
def get_diff_inc_cat(k, income_records=None):
    """Split one applicant's income records into four category buckets.

    Args:
    k (int): position of the applicant in ``income_records``
    income_records (list, optional): list whose k-th element is a list of
        transaction dicts (as produced by get_income). Defaults to the
        notebook-level ``income_temp`` list.

    Returns:
    tuple of four pandas dataframes (txn_proll, txn_pcheq, txn_inc, txn_rest):
    the 'Payroll', 'Paycheck' and 'Income' transactions (each only when that
    category is among the 8 most frequent categories, otherwise an empty
    frame) and every transaction of any other category. Four empty frames
    when the applicant has no records.
    """
    if income_records is None:
        # Fall back to the notebook global for existing single-argument calls.
        income_records = income_temp

    # Build the frame once instead of once per comparison.
    txns = pd.DataFrame(income_records[k])
    if txns.empty:
        # Without rows there is no 'category' column to group on.
        return pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

    txns_per_category = txns.groupby('category')['posted_date'].count()
    top_categories = set(txns_per_category.nlargest(8).index)

    def bucket(category):
        # Rows of `category` if it is one of the most frequent ones.
        if category in top_categories:
            return txns[txns['category'] == category]
        return pd.DataFrame()

    txn_rest = txns[~txns['category'].isin(['Income', 'Paycheck', 'Payroll'])]
    return bucket('Payroll'), bucket('Paycheck'), bucket('Income'), txn_rest

### Fetching bankreports and modifying them, then fetching bankapp data

In [None]:
# The bounds are passed already wrapped in single quotes because the query
# builder interpolates them verbatim into the SQL text.
df_bankreports = fetch_required_bank_reports(start="'2020-01-07'", end="'2020-02-07'")

In [None]:
# Same date window as the bank-report query; the values are pre-quoted
# because they are interpolated verbatim into the SQL text.
df_bankapp = fetch_required_bank_app(start="'2020-01-07'", end="'2020-02-07'")

In [None]:
# Convert LoanId to a plain string and reduce TimeAdded to its calendar date
# (modify_statement edits the frame in place and returns it).
df_bankreports = modify_statement(df_bankreports)

In [None]:
# Peek at up to five rows. A fixed seed keeps the displayed rows stable across
# re-runs, and capping n avoids a ValueError on frames with fewer than 5 rows.
df_bankreports.sample(n=min(5, len(df_bankreports)), random_state=0)

In [None]:
# Peek at up to five rows. A fixed seed keeps the displayed rows stable across
# re-runs, and capping n avoids a ValueError on frames with fewer than 5 rows.
df_bankapp.sample(n=min(5, len(df_bankapp)), random_state=0)

### Merging the bankapp data with the bank reports

In [None]:
# Keep only loans present in both sources.
# NOTE(review): LoanId is a string in df_bankreports after modify_statement;
# confirm df_bankapp['LoanId'] has a matching type.
df = df_bankapp.merge(df_bankreports, on='LoanId', how='inner')

In [None]:
# Peek at up to five rows. A fixed seed keeps the displayed rows stable across
# re-runs, and capping n avoids a ValueError on frames with fewer than 5 rows.
df.sample(n=min(5, len(df)), random_state=0)

### Using the multiprocessing function to fetch the name of primary checking accounts

In [None]:
# One get_primary_account call per bank report, spread over NCPU workers.
# pool.map preserves input order, so the results line up with df's rows.
with mp.Pool(NCPU) as pool:
    result_primary_accts = pool.map(
        get_primary_account,
        df['BankReportData'],
    )

df['primary_account'] = result_primary_accts

In [None]:
# Column overview after adding primary_account (non-null counts and dtypes).
df.info()

### Similarly, keeping only those candidates that have at least 60 days of transactions

In [None]:
# Evaluate the 60-day history check for every (primary account, report) pair.
with mp.Pool(NCPU) as pool:
    txn_days_count = pool.starmap(
        get_transaction_days_count,
        zip(df['primary_account'], df['BankReportData']),
    )

# Despite its name, this column holds the outcome of the check, not a count.
df['txn_days_count'] = txn_days_count

# Keep only the rows whose check came back True.
has_gt_60_days_txns = df['txn_days_count'].eq(True)
df = df[has_gt_60_days_txns]

In [None]:
# Column overview after the 60-day filter.
df.info()

In [None]:
# Renumber rows 0..n-1 after filtering; later cells address rows of df and
# entries of the pool result lists by the same integer.
df = df.reset_index(drop = True)

### Fetching the income sources and their cycles respectively

In [None]:
# Parse every applicant's bankapp JSON in parallel; each result is a
# [LoanId, number of sources, '+'-joined cycles] triple.
with mp.Pool(NCPU) as pool:
    source_and_cycle = pool.starmap(
        get_income_sources_and_cycle,
        zip(df['json'], df['LoanId']),
    )

df_source_and_cycle = pd.DataFrame(
    source_and_cycle,
    columns=['LoanId', '#sources', 'in_cycles'],
)

In [None]:
# Peek at up to five rows. A fixed seed keeps the displayed rows stable across
# re-runs, and capping n avoids a ValueError on frames with fewer than 5 rows.
df_source_and_cycle.sample(n=min(5, len(df_source_and_cycle)), random_state=0)

### Merging the sources and cycle data we got to our previously available dataset

In [None]:
# Attach the income-source columns to every loan row.
# The right-hand side is reduced to one row per LoanId first: df can hold
# several rows for the same loan, and df_source_and_cycle has one row per df
# row, so merging the raw frame would multiply those rows. `validate`
# documents (and enforces) the many-to-one relationship.
# NOTE(review): if two rows of the same LoanId carry different JSON, the first
# one wins here; confirm that is acceptable.
df = pd.merge(
    df,
    df_source_and_cycle.drop_duplicates(subset='LoanId'),
    on='LoanId',
    how='left',
    validate='many_to_one',
)

In [None]:
# Peek at up to five rows. A fixed seed keeps the displayed rows stable across
# re-runs, and capping n avoids a ValueError on frames with fewer than 5 rows.
df.sample(n=min(5, len(df)), random_state=0)

### Taking a look at how income sources and cycles are distributed across the loan IDs

In [None]:
# Plot the value counts of the number of income sources and of the cycle
# combinations. NOTE(review): the meaning of `force` is defined in the EDA
# module and is not visible in this notebook.
eda.countplot_categorical_columns(df, cols = ['#sources', 'in_cycles'], force = True)

### Fetching required data for a particular index from the dataframe to crossverify things

In [None]:
# Spot check: identifying fields of the row with index label 195.
df.loc[195, ['BankTransactionId', 'primary_account', 'EmployerName', 'LoanId']]

### Checking the JSON to see the income status of an applicant. Here, 195 is the index

In [None]:
# Income-review section of the bankapp JSON for the same row (index label 195).
json.loads(df.loc[195, 'json'])['incomeReview']['data']

### Using the get_income function to fetch all the required income types

In [None]:
# Collect the income-like transactions of every applicant in parallel.
# starmap keeps input order, so income_temp[i] belongs to row i of df.
with mp.Pool(NCPU) as pool:
    income_temp = pool.starmap(
        get_income,
        zip(df['primary_account'], df['BankReportData'], df['LoanId']),
    )

### An example of how our fetched data looks like

In [None]:
# Raw get_income result (list of transaction dicts) for the entry at position 317.
income_temp[317]

### Loan IDs that appear to have been mislabelled by manual agents (I think so because they may use a different definition of how an income should behave in the bank statement; they may be correct, or they may not)

- 81672524189
- 20669855168

### The income_temp variable is a list of lists of dictionaries. Converting it to dataframe to analyse it rigorously

In [None]:
# Tabular view of entry 195's income records, indexed by posting date.
pd.DataFrame(income_temp[195]).set_index('posted_date')

### Analysing the buckets returned by get_diff_inc_cat

In [None]:
# Split entry 195's records into the Payroll / Paycheck / Income / other buckets.
txn_proll, txn_pcheq, txn_inc, txn_rest = get_diff_inc_cat(195)

In [None]:
# 'Payroll' bucket returned by get_diff_inc_cat.
txn_proll

In [None]:
# 'Income' bucket returned by get_diff_inc_cat.
txn_inc

In [None]:
# 'Paycheck' bucket returned by get_diff_inc_cat.
txn_pcheq

In [None]:
# Bucket with every transaction outside the Payroll / Paycheck / Income categories.
txn_rest

- Occurrences of categories like business services and music cause problems (index 425).
- Occurrences where income arrives under the 'Income' category as electronic deposits are hard to isolate (index 152).
- Paychecks are always correct.
- Mostly, whenever there are Payroll transactions there are no Income transactions, but the reverse is not true.

### Analysing in which categories do most of the income fall!!

In [None]:
# Applicants with exactly one income source. '#sources' is compared against
# the string '1' because the value is taken as-is from the bankapp JSON.
df_1_inc = df.loc[df['#sources'].eq('1')].reset_index(drop=True)

In [None]:
def analyse_cat_from_json(json_string):
    """Collect the categories of the transactions that match the first income source.

    Reads ['incomeReview']['data'] from the JSON document, takes the
    'sourceName' of the first entry in 'sources' and gathers the
    'categoryName' of every entry in 'incomeTransactions' whose 'memo' equals
    that source name.

    Args:
    json_string (str): JSON document containing the income review

    Returns:
    set: distinct category names found. The set holds whatever was collected
    before a missing key or malformed entry stopped the scan, so it is empty
    when 'incomeTransactions' is absent.

    Raises:
    Errors from parsing the document or from reading the first source
    (e.g. KeyError, IndexError, json.JSONDecodeError) propagate to the caller.
    """
    json_data = json.loads(json_string)['incomeReview']['data']
    source_name = json_data['sources'][0]['sourceName']

    categories = set()
    try:
        for transaction in json_data['incomeTransactions']:
            if transaction['memo'] == source_name:
                categories.add(transaction['categoryName'])
    except (KeyError, TypeError):
        # Best effort: keep what was collected so far. Only the expected
        # "missing/malformed data" errors are absorbed; a `return` inside
        # `finally` would silently swallow every exception, including
        # KeyboardInterrupt.
        pass
    return categories

In [None]:
# Income-review section for row 200 of the single-source applicants.
# `df_analyse` is not defined anywhere in this notebook; `df_1_inc` is the
# frame this section analyses.
json.loads(df_1_inc.loc[200, 'json'])['incomeReview']['data']

In [None]:
# One set of category names per single-source applicant, in row order.
with mp.Pool(NCPU) as pool:
    cat_temp_1_inc = pool.map(
        analyse_cat_from_json,
        df_1_inc['json'],
    )

In [None]:
# One category per applicant that has at least one matching transaction.
# The pool result is `cat_temp_1_inc` (the name `cat` only exists inside
# analyse_cat_from_json), and its items are sets, which cannot be indexed;
# sorting first makes the chosen category deterministic.
cat_1_inc = [sorted(categories)[0] for categories in cat_temp_1_inc if categories]

In [None]:
# Show the per-applicant categories (the list is named cat_1_inc; `cat_1`
# is never defined).
cat_1_inc