# <span style="color:#ff5f27;">👨🏻‍🏫 Lending Club Dataset</span>

## <span style="color:#ff5f27;">Feature pipeline</span>

This notebook:

* reads raw data from a parquet file suffixed with a date; the notebook should be parameterized with the date of the data to process
* creates features for 2 feature groups - `loans` and `applicants`
* creates a Great Expectations rule to validate feature data before it is written to the feature group
* inserts the dataframes of engineered features into our 2 feature groups
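
One way to parameterize the date is sketched below. It assumes the date arrives through an environment variable; the variable name `FEATURE_DATE` and the helper `dataset_url` are hypothetical and not part of the original notebook.

```python
import os
from datetime import datetime

# Base path of the date-suffixed files read later in this notebook.
BASE_URL = "https://repo.hops.works/dev/jdowling"


def dataset_url(kind: str, date_str: str) -> str:
    """Build the URL of a date-suffixed parquet file, e.g. loans-2023-05-10.parquet."""
    # Fail early if the date is not in YYYY-MM-DD form.
    datetime.strptime(date_str, "%Y-%m-%d")
    return f"{BASE_URL}/{kind}-{date_str}.parquet"


# FEATURE_DATE is a hypothetical variable name; fall back to the date used below.
todays_date = os.environ.get("FEATURE_DATE", "2023-05-10")
loans_url = dataset_url("loans", todays_date)
print(loans_url)
```

Validating the date before building the URL means a malformed parameter fails immediately rather than as a confusing download error.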

## <span style="color:#ff5f27;">📝 Imports </span>


In [1]:
!pip install --quiet hopsworks 

In [2]:
import pandas as pd
import numpy as np
import os
from features import loans, applicants
import time

pd.set_option('display.float_format', '{:.2f}'.format)
pd.set_option('display.max_columns', 50)
pd.set_option('display.max_rows', 50)

## <span style="color:#ff5f27;"> 💽 Loading the Data </span>


In [3]:
todays_date = "2023-05-10"

### <span style="color:#ff5f27;">⛳️ Loans Data </span>


In [4]:
loans_df = pd.read_parquet(
    "https://repo.hops.works/dev/jdowling/loans-{}.parquet".format(todays_date))
loans_df.head(3)

Unnamed: 0,id,loan_amnt,term,int_rate,installment,grade,sub_grade,issue_d,loan_status,purpose,title,address
0,0,10000.0,36 months,11.44,329.48,B,B4,Jan-2015,Fully Paid,vacation,Vacation,"0174 Michelle Gateway\r\nMendozaberg, OK 22690"
1,1,8000.0,36 months,11.99,265.68,B,B5,Jan-2015,Fully Paid,debt_consolidation,Debt consolidation,"1076 Carney Fort Apt. 347\r\nLoganmouth, SD 05113"
2,2,15600.0,36 months,10.49,506.97,B,B3,Jan-2015,Fully Paid,credit_card,Credit card refinancing,"87025 Mark Dale Apt. 269\r\nNew Sabrina, WV 05113"


In [5]:
# Iterate over unique values in the 'issue_d' column of the loans_df DataFrame
for month in loans_df.issue_d.unique():
    # Create a new DataFrame (loans_month_df) containing only rows where 'issue_d' is equal to the current month
    loans_month_df = loans_df.loc[loans_df['issue_d'] == month]

In [6]:
loans_df.describe()

Unnamed: 0,id,loan_amnt,int_rate,installment
count,396030.0,396030.0,396030.0,396030.0
mean,198014.5,14113.89,13.64,431.85
std,114324.16,8357.44,4.47,250.73
min,0.0,500.0,5.32,16.08
25%,99007.25,8000.0,10.49,250.33
50%,198014.5,12000.0,13.33,375.43
75%,297021.75,20000.0,16.49,567.3
max,396029.0,40000.0,30.99,1533.81


In [7]:
loans_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 396030 entries, 0 to 396029
Data columns (total 12 columns):
 #   Column       Non-Null Count   Dtype  
---  ------       --------------   -----  
 0   id           396030 non-null  int64  
 1   loan_amnt    396030 non-null  float64
 2   term         396030 non-null  object 
 3   int_rate     396030 non-null  float64
 4   installment  396030 non-null  float64
 5   grade        396030 non-null  object 
 6   sub_grade    396030 non-null  object 
 7   issue_d      396030 non-null  object 
 8   loan_status  396030 non-null  object 
 9   purpose      396030 non-null  object 
 10  title        394275 non-null  object 
 11  address      396030 non-null  object 
dtypes: float64(3), int64(1), object(8)
memory usage: 36.3+ MB


### <span style="color:#ff5f27;">⛳️ Applicants Data </span>


In [8]:
applicants_df = pd.read_parquet(
    "https://repo.hops.works/dev/jdowling/applicants-{}.parquet".format(todays_date))
applicants_df.head(3)

Unnamed: 0,emp_title,emp_length,home_ownership,annual_inc,verification_status,dti,earliest_cr_line,open_acc,pub_rec,revol_bal,revol_util,total_acc,initial_list_status,application_type,mort_acc,pub_rec_bankruptcies,id
0,Marketing,10+ years,RENT,117000.0,Not Verified,26.24,Jun-1990,16.0,0.0,36369.0,41.8,25.0,w,INDIVIDUAL,0.0,0.0,0
1,Credit analyst,4 years,MORTGAGE,65000.0,Not Verified,22.05,Jul-2004,17.0,0.0,20131.0,53.3,27.0,f,INDIVIDUAL,3.0,0.0,1
2,Statistician,< 1 year,RENT,43057.0,Source Verified,12.79,Aug-2007,13.0,0.0,11987.0,92.2,26.0,f,INDIVIDUAL,0.0,0.0,2


In [9]:
# Get unique values in the 'earliest_cr_line' column of the applicants_df DataFrame
unique_months = applicants_df.earliest_cr_line.unique()

# Iterate over unique values in the 'earliest_cr_line' column
for month in unique_months:
    # Create a new DataFrame (applicants_month_df) containing only rows where 'earliest_cr_line' is equal to the current month
    applicants_month_df = applicants_df.loc[applicants_df['earliest_cr_line'] == month]

In [10]:
applicants_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 396030 entries, 0 to 396029
Data columns (total 17 columns):
 #   Column                Non-Null Count   Dtype  
---  ------                --------------   -----  
 0   emp_title             373103 non-null  object 
 1   emp_length            377729 non-null  object 
 2   home_ownership        396030 non-null  object 
 3   annual_inc            396030 non-null  float64
 4   verification_status   396030 non-null  object 
 5   dti                   396030 non-null  float64
 6   earliest_cr_line      396030 non-null  object 
 7   open_acc              396030 non-null  float64
 8   pub_rec               396030 non-null  float64
 9   revol_bal             396030 non-null  float64
 10  revol_util            395754 non-null  float64
 11  total_acc             396030 non-null  float64
 12  initial_list_status   396030 non-null  object 
 13  application_type      396030 non-null  object 
 14  mort_acc              358235 non-null  float64
 15  

## <span style="color:#ff5f27;">🛠️ Feature Engineering</span>

In [11]:
# Apply a function to transform the 'home_ownership' column in the applicants_df DataFrame
applicants_df['home_ownership'] = applicants_df.home_ownership.apply(
    applicants.home_ownership,
)

# Count how often each value occurs in the 'home_ownership' column after the transformation
home_ownership_counts = applicants_df.home_ownership.value_counts()

In [12]:
# Convert the 'earliest_cr_line' column in the applicants_df DataFrame to datetime format
applicants_df['earliest_cr_line'] = pd.to_datetime(applicants_df['earliest_cr_line'])

# Convert the 'issue_d' column in the loans_df DataFrame to datetime format
loans_df['issue_d'] = pd.to_datetime(loans_df['issue_d'])
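
Both columns hold strings such as `Jan-2015`, so the format can also be passed explicitly instead of being inferred. The following is a minimal sketch on toy values, assuming every entry follows the `Mon-YYYY` layout seen in the samples above and that month abbreviations are parsed in an English locale.

```python
import pandas as pd

# Toy values in the same "Mon-YYYY" layout as the raw columns.
raw = pd.Series(["Jun-1990", "Jul-2004", "Aug-2007"])

# An explicit format avoids per-element format inference.
parsed = pd.to_datetime(raw, format="%b-%Y")
print(parsed)
```

With no day in the input, each value is parsed as the first day of its month.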



## ✔️ `dti`, `open_acc`, `revol_bal`, `revol_util`, & `total_acc`

> - `dti`: A ratio calculated using the borrower’s total monthly debt payments on the total debt obligations, excluding mortgage and the requested LC loan, divided by the borrower’s self-reported monthly income.
> - `open_acc`: The number of open credit lines in the borrower's credit file.
> - `revol_bal`: Total credit revolving balance
> - `revol_util`: Revolving line utilization rate, or the amount of credit the borrower is using relative to all available revolving credit.
> - `total_acc`: The total number of credit lines currently in the borrower's credit file
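
As a worked example of the two ratios defined above, the sketch below uses made-up numbers and assumes both features are expressed as percentages, which the sample values (for example a `dti` of 26.24) suggest.

```python
# Illustrative numbers only; they are not taken from the dataset.
monthly_debt_payments = 1_200.0  # excludes mortgage and the requested loan
monthly_income = 5_000.0         # self-reported

# Debt-to-income ratio as a percentage.
dti = 100 * monthly_debt_payments / monthly_income

revolving_balance = 3_000.0
revolving_credit_limit = 12_000.0

# Share of available revolving credit currently in use, as a percentage.
revol_util = 100 * revolving_balance / revolving_credit_limit

print(f"dti={dti:.2f}, revol_util={revol_util:.2f}")
```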

In [13]:
# Apply a function to transform the 'pub_rec' column in the applicants_df DataFrame
applicants_df['pub_rec'] = applicants_df.pub_rec.apply(
    applicants.pub_rec,
)

# Apply a function to transform the 'pub_rec_bankruptcies' column in the applicants_df DataFrame
applicants_df['pub_rec_bankruptcies'] = applicants_df.pub_rec_bankruptcies.apply(
    applicants.pub_rec_bankruptcies,
)

## <span style="color:#ff5f27;">🔄 Data PreProcessing</span>

**Section Goals:** 
> - Remove or fill any missing data. 
> - Remove unnecessary or repetitive features. 
> - Convert categorical string features to dummy variables.

Realistically, there are too many unique job titles to convert them into dummy variables, so we remove the `emp_title` column.

In [14]:
applicants_df.drop('emp_title', axis=1, inplace=True)

Charge-off rates are extremely similar across all employment lengths, so we drop the `emp_length` column.

In [15]:
applicants_df.drop('emp_length', axis=1, inplace=True)
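
A claim like this could be checked with a group-wise rate. The sketch below runs on toy data and assumes the status label `Charged Off`; in this notebook `loan_status` lives in `loans_df` and `emp_length` in `applicants_df`, so the two frames would first need to be joined on `id`.

```python
import pandas as pd

# Toy data standing in for a join of loans and applicants on id.
toy = pd.DataFrame({
    "emp_length": ["1 year", "1 year", "10+ years", "10+ years", "10+ years"],
    "loan_status": ["Charged Off", "Fully Paid", "Fully Paid", "Charged Off", "Fully Paid"],
})

# Fraction of charged-off loans within each employment-length group.
charge_off_rate = (
    toy["loan_status"].eq("Charged Off")
    .groupby(toy["emp_length"])
    .mean()
)
print(charge_off_rate)
```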

The `title` column is simply a string subcategory/description of the `purpose` column, so we drop it.

In [16]:
loans_df.drop('title', axis=1, inplace=True)

### `mort_acc`

There are many ways to deal with this missing data. We could build a simple model (such as a linear model) to fill it in, fill it in based on the mean of the other columns, or bin the column into categories and treat NaN as its own category. There is no 100% correct approach!

Let's try the `fillna()` approach. We group the dataframe by `total_acc` and calculate the mean `mort_acc` for each `total_acc` value, which gives the result below:

In [17]:
total_acc_avg = applicants.mean_mort_acc(applicants_df)
total_acc_avg.head(3)

Unnamed: 0,total_acc,mean_mort_acc
0,2.0,0.0
1,3.0,0.05
2,4.0,0.07


In [18]:
applicants_df['mort_acc'] = applicants_df.apply(
    lambda x: applicants.fill_mort_acc(
        x['total_acc'], 
        x['mort_acc'], 
        total_acc_avg['mean_mort_acc'],
    ), 
    axis=1,
)
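
The helpers `applicants.mean_mort_acc` and `applicants.fill_mort_acc` live in the `features` module and are not shown here. The sketch below illustrates the same idea (fill a missing `mort_acc` with the mean for its `total_acc` group) in vectorized form on toy data; it is not the helpers' actual implementation.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "total_acc": [2.0, 2.0, 3.0, 3.0, 3.0],
    "mort_acc": [0.0, np.nan, 1.0, 3.0, np.nan],
})

# Mean mort_acc for each total_acc value, broadcast back to every row
# (NaNs are ignored when the mean is computed).
group_mean = toy.groupby("total_acc")["mort_acc"].transform("mean")

# Replace only the missing entries with their group's mean.
toy["mort_acc"] = toy["mort_acc"].fillna(group_mean)
print(toy)
```

Using `transform` keeps the result aligned with the original rows, which avoids a row-wise `apply` over the whole dataframe.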

### `revol_util` & `pub_rec_bankruptcies`
These two features have missing values, but in fewer than 0.5% of the rows, so we remove the rows that are missing values in those columns with `dropna()`.

In [19]:
applicants_df.dropna(inplace=True)
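
The share of missing values can be checked before dropping rows. The sketch below uses toy data; passing `subset` is an optional variation that limits `dropna()` to the named columns.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "revol_util": [41.8, np.nan, 92.2, 21.5],
    "pub_rec_bankruptcies": [0.0, 0.0, 0.0, 1.0],
})

# Percentage of missing values per column.
missing_pct = toy.isna().mean() * 100
print(missing_pct)

# Restrict dropna to the columns of interest so NaNs elsewhere do not remove rows.
cleaned = toy.dropna(subset=["revol_util", "pub_rec_bankruptcies"])
print(len(cleaned))
```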

### <span style="color:#ff5f27;"> 🧮 Categorical Variables</span>


### `term`

### `grade` & `sub_grade`

We know that `grade` is just a sub-feature of `sub_grade`, so we are going to drop it.

In [20]:
loans_df.drop('grade', axis=1, inplace=True)

### `address`
We are going to engineer a `zip_code` column by extracting the zip code (the last five characters) from the `address` column.

In [21]:
loans_df['zip_code'] = loans_df.apply(
    lambda x: loans.zipcode(x['address'][-5:]), 
    axis=1,
)
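
The slicing step on its own can also be written as a vectorized string operation. The sketch below covers only the slice; `loans.zipcode` is not shown in this notebook and may apply further mapping that is not reproduced here.

```python
import pandas as pd

# Two addresses copied from the sample rows above.
addresses = pd.Series([
    "0174 Michelle Gateway\r\nMendozaberg, OK 22690",
    "1076 Carney Fort Apt. 347\r\nLoganmouth, SD 05113",
])

# Last five characters of each address; kept as strings so leading zeros survive.
zip_codes = addresses.str[-5:]
print(zip_codes.tolist())
```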

In [22]:
loans_df.zip_code.value_counts()

zip_code
0        91226
70466    56985
30723    56546
22690    56527
48052    55917
29597    45471
11650    11226
93700    11151
86630    10981
Name: count, dtype: int64

In [23]:
loans_df.drop('address', axis=1, inplace=True)

### `issue_d` 

This is the `event_time` for the loan being issued.

In [24]:
loans_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 396030 entries, 0 to 396029
Data columns (total 10 columns):
 #   Column       Non-Null Count   Dtype         
---  ------       --------------   -----         
 0   id           396030 non-null  int64         
 1   loan_amnt    396030 non-null  float64       
 2   term         396030 non-null  object        
 3   int_rate     396030 non-null  float64       
 4   installment  396030 non-null  float64       
 5   sub_grade    396030 non-null  object        
 6   issue_d      396030 non-null  datetime64[ns]
 7   loan_status  396030 non-null  object        
 8   purpose      396030 non-null  object        
 9   zip_code     396030 non-null  object        
dtypes: datetime64[ns](1), float64(3), int64(1), object(5)
memory usage: 30.2+ MB


### `earliest_cr_line`
This appears to be a historical time stamp feature. Extract the year from this feature using a `.apply()` function, then convert it to a numeric feature.

In [25]:
# applicants_df['earliest_cr_line_year'] = applicants_df.apply(
#     lambda x: applicants.earliest_cr_line(x['earliest_cr_line']), axis=1)
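
Because `earliest_cr_line` was converted to a datetime earlier, the year could also be read through the `.dt` accessor. This is a minimal sketch on toy data and an alternative to the commented-out `.apply()` call, not what `applicants.earliest_cr_line` does.

```python
import pandas as pd

toy = pd.DataFrame({
    "earliest_cr_line": pd.to_datetime(["1990-06-01", "2004-07-01"]),
})

# .dt.year yields an integer column directly, so no further conversion is needed.
toy["earliest_cr_line_year"] = toy["earliest_cr_line"].dt.year
print(toy)
```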

In [26]:
from great_expectations.core import ExpectationSuite, ExpectationConfiguration

expectation_suite = ExpectationSuite(
    expectation_suite_name="transaction_suite")

expectation_suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column":"int_rate", 
            "min_value":-2.0,
            "max_value":2000.0,
        }
    )
)

{"kwargs": {"column": "int_rate", "min_value": -2.0, "max_value": 2000.0}, "meta": {}, "expectation_type": "expect_column_values_to_be_between"}
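
To make clear what this expectation asserts, the sketch below runs an equivalent check in plain pandas on toy values. It only illustrates the rule and is not how Great Expectations implements it. Note that the bounds are very loose compared with the observed `int_rate` range of 5.32 to 30.99.

```python
import pandas as pd

# Toy interest rates taken from the sample rows above.
int_rate = pd.Series([11.44, 11.99, 10.49])
min_value, max_value = -2.0, 2000.0

# between() is inclusive on both ends by default.
in_range = int_rate.between(min_value, max_value)

# Number of values that fall outside the allowed range.
unexpected_count = int((~in_range).sum())
print(unexpected_count)
```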

## <span style="color:#ff5f27;"> 🔮 Connect to Hopsworks Feature Store</span>

In [27]:
import hopsworks

project = hopsworks.login()

fs = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/537749






## <span style="color:#ff5f27;"> 🪄 Creating Feature Groups </span>


In [28]:
loans_fg = fs.get_or_create_feature_group(
    name="loans",
    version=1,
    description="Lending Club Loans",
    online_enabled=True,
    primary_key=['id'],
    event_time='issue_d',
    expectation_suite=expectation_suite,
)

In [29]:
applicants_fg = fs.get_or_create_feature_group(
    name="applicants",
    version=1,
    description="Lending Club Loan Applicants",
    online_enabled=True,
    primary_key=['id'],
#     partition_key=['earliest_cr_line_year'],
    event_time='earliest_cr_line',
)

### Configure upload batch size for performance (latency vs throughput)

```
# Pass Kafka producer settings through write_options
loans_fg.insert(
    loans_df,
    write_options={
        "kafka_producer_config": {
            "linger.ms": 20,
            "batch.size": 1000000,
            "acks": 1,
            "max.in.flight.requests.per.connection": 5,
            "message.max.bytes": 2000000,
            "batch.num.messages": 200000,
            "buffer.memory": 335544320,
            "queue.buffering.max.messages": 10000000,
            "debug": "broker,topic,msg,queue"
        }
    },
)
```

In [30]:
# Insert data into the "loans" feature group
loans_fg.insert(loans_df)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/537749/fs/533572/fg/722287
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/537749/fs/533572/fg/722287




Uploading Dataframe: 0.00% |          | Rows 0/396030 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: loans_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/537749/jobs/named/loans_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x71a6a63ba1e0>,
 {
   "results": [
     {
       "expectation_config": {
         "kwargs": {
           "column": "int_rate",
           "min_value": -2.0,
           "max_value": 2000.0
         },
         "meta": {
           "expectationId": 452609
         },
         "expectation_type": "expect_column_values_to_be_between"
       },
       "success": true,
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       },
       "result": {
         "element_count": 396030,
         "missing_count": 0,
         "missing_percent": 0.0,
         "unexpected_count": 0,
         "unexpected_percent": 0.0,
         "unexpected_percent_total": 0.0,
         "unexpected_percent_nonmissing": 0.0,
         "partial_unexpected_list": []
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-04-15T11:56:54.000795Z"
       }
     }
   ],
   "suc

In [31]:
# Insert data into the "applicants" feature group
applicants_fg.insert(applicants_df)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/537749/fs/533572/fg/722288


Uploading Dataframe: 0.00% |          | Rows 0/395219 | Elapsed Time: 00:00 | Remaining Time: ?

%5|1713182666.478|REQTMOUT|rdkafka#producer-3| [thrd:ssl://3.147.195.6:9092/bootstrap]: ssl://3.147.195.6:9092/3: Timed out ProduceRequest in flight (after 47374ms, timeout #0)
%4|1713182666.479|REQTMOUT|rdkafka#producer-3| [thrd:ssl://3.147.195.6:9092/bootstrap]: ssl://3.147.195.6:9092/3: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1713182666.479|FAIL|rdkafka#producer-3| [thrd:ssl://3.147.195.6:9092/bootstrap]: ssl://3.147.195.6:9092/3: 1 request(s) timed out: disconnect (after 226960ms in state UP)


Launching job: applicants_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/537749/jobs/named/applicants_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x71a6a63d10d0>, None)

## <span style="color:#ff5f27;">📖 Update the description of any features found in the data dictionary</span>

Loop through the data dictionary. For each entry, if there is a corresponding feature in a feature group, update its description.

In [32]:
metadata = pd.read_csv("https://repo.hops.works/dev/jdowling/LCDataDictionary.csv")
metadata.head()

Unnamed: 0,LoanStatNew,Description
0,acc_now_delinq,The number of accounts on which the borrower i...
1,acc_open_past_24mths,Number of trades opened in past 24 months.
2,addr_state,The state provided by the borrower in the loan...
3,all_util,Balance to credit limit on all trades
4,annual_inc,The self-reported annual income provided by th...


In [33]:
# For each (name, description) pair in LCDataDictionary.csv, try to set the feature
# description in loans_fg and applicants_fg. If the feature doesn't exist in a feature
# group, an exception is raised and we skip it.
for name, description in zip(metadata['LoanStatNew'], metadata['Description']):
    for fg in (loans_fg, applicants_fg):
        try:
            f = fg.get_feature(name)
            fg.update_feature_description(name, description)
            print("Updating description of feature: {}".format(f.name))
        except Exception:
            pass  # feature not in this feature group; do nothing

Updating description of feature: annual_inc
Updating description of feature: application_type
Updating description of feature: dti
Updating description of feature: earliest_cr_line
Updating description of feature: home_ownership
Updating description of feature: id
Updating description of feature: id
Updating description of feature: initial_list_status
Updating description of feature: installment
Updating description of feature: int_rate
Updating description of feature: issue_d
Updating description of feature: loan_amnt
Updating description of feature: loan_status
Updating description of feature: mort_acc
Updating description of feature: open_acc
Updating description of feature: pub_rec
Updating description of feature: pub_rec_bankruptcies
Updating description of feature: purpose
Updating description of feature: revol_bal
Updating description of feature: revol_util
Updating description of feature: sub_grade
Updating description of feature: term
Updating description of feature: total_acc
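
The matching step can also be done up front, without relying on exceptions. The sketch below uses a hypothetical helper, `matching_descriptions`, with toy data and placeholder descriptions. In the notebook, the feature names would come from each feature group's schema.

```python
import pandas as pd


def matching_descriptions(feature_names, data_dictionary):
    """Return {feature name: description} for dictionary rows that name a known feature."""
    known = set(feature_names)
    rows = data_dictionary[data_dictionary["LoanStatNew"].isin(known)]
    return dict(zip(rows["LoanStatNew"], rows["Description"]))


# Toy data dictionary; the descriptions are placeholders.
toy_dictionary = pd.DataFrame({
    "LoanStatNew": ["annual_inc", "all_util", "dti"],
    "Description": ["Self-reported annual income", "Balance to credit limit", "Debt-to-income ratio"],
})

updates = matching_descriptions(["annual_inc", "dti", "id"], toy_dictionary)
print(updates)
```

Only the names present in both the feature list and the dictionary are returned, so each update call can then be made without a `try`/`except`.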

---
## <span style="color:#ff5f27;">⏭️ **Next:** Part 02: Training Pipeline</span>

In the following notebook you will use your feature groups to create a dataset you can train a model on.
