Подключаем нужные библиотеки.

In [1]:
from datetime import timedelta
import pandas as pd
from feast.data_format import ParquetFormat
from feast import (Entity, Feature, FeatureView, FileSource, ValueType, FeatureStore, Field)
from feast.types import Int64, UnixTimestamp, String, Float32
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.file import SavedDatasetFileStorage
import numpy as np
from feast.dqm.profilers.ge_profiler import ge_profiler
from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.dataset import PandasDataset

### Создание FeatureView на основе данных кредитной истории клиента

Прописываем пути к локальным данным.

In [2]:
# Local parquet files that back the two batch sources used in this notebook.
credit_history_path = "data/credit_history.parquet"
zipcode_path = "data/zipcode_table.parquet"

Конструируем фичи.

In [3]:
# Feature view over the geographic (zipcode) data.

# The "zipcode" entity names the column used as the join key when features
# from different FeatureViews are combined.
zipcode = Entity(name="zipcode", value_type=ValueType.INT64)

# Batch source for the zipcode features: a parquet file at `zipcode_path`
# whose "event_timestamp" column is declared as the timestamp field.
zipcode_source = FileSource(
    path=zipcode_path,
    file_format=ParquetFormat(),
    timestamp_field="event_timestamp",
)

# The view gets a name, the entity it is keyed on, the fields taken from the
# batch source and a ttl. The schema lists the string-typed columns first,
# then the integer-typed ones.
zipcode_features = FeatureView(
    name="zipcode_features",
    entities=[zipcode],
    ttl=timedelta(days=3650),
    schema=(
        [
            Field(name=column, dtype=String)
            for column in ("city", "state", "location_type")
        ]
        + [
            Field(name=column, dtype=Int64)
            for column in ("tax_returns_filed", "population", "total_wages")
        ]
    ),
    source=zipcode_source,
)

# Same pattern for the credit-history data: entity, batch source, feature view.

dob_ssn = Entity(
    name="dob_ssn",
    value_type=ValueType.STRING,
    description="Date of birth and last four digits of social security number",
)

credit_history_source = FileSource(
    path=credit_history_path,
    file_format=ParquetFormat(),
    timestamp_field="event_timestamp",
)

# Every credit-history feature is declared as Int64, so the schema is
# generated from the column names.
credit_history = FeatureView(
    name="credit_history",
    entities=[dob_ssn],
    ttl=timedelta(days=140),
    schema=[
        Field(name=column, dtype=Int64)
        for column in (
            "credit_card_due",
            "mortgage_due",
            "student_loan_due",
            "vehicle_loan_due",
            "hard_pulls",
            "missed_payments_2y",
            "missed_payments_1y",
            "missed_payments_6m",
            "bankruptcies",
        )
    ],
    source=credit_history_source,
)

Инициализируем расположение локального хранилища фич.

In [4]:
# Open the feature store, using the current directory as the repository path.
store = FeatureStore(repo_path=".")



Сохраняем.

In [5]:
# Register both entities and both feature views with the store.
store.apply([zipcode, zipcode_features, dob_ssn, credit_history])

### Подготавливаем данные клиента для обучения

Загружаем таблицу с заявками на кредит.

In [6]:
# Loan applications table; passed further down as the entity dataframe when
# historical features are requested.
loans = pd.read_parquet("data/loan_table.parquet")

In [7]:
# Preview the first rows of the loan table.
loans.head()

Unnamed: 0,loan_id,dob_ssn,zipcode,person_age,person_income,person_home_ownership,person_emp_length,loan_intent,loan_amnt,loan_int_rate,loan_status,event_timestamp,created_timestamp
0,10000,19530219_5179,76104,22,59000,RENT,123.0,PERSONAL,35000,16.02,1,2021-08-25 20:34:41.361000+00:00,2021-08-25 20:34:41.361000+00:00
1,10001,19520816_8737,70380,21,9600,OWN,5.0,EDUCATION,1000,11.14,0,2021-08-25 20:16:20.128000+00:00,2021-08-25 20:16:20.128000+00:00
2,10002,19860413_2537,97039,25,9600,MORTGAGE,1.0,MEDICAL,5500,12.87,1,2021-08-25 19:57:58.896000+00:00,2021-08-25 19:57:58.896000+00:00
3,10003,19760701_8090,63785,23,65500,RENT,4.0,MEDICAL,35000,15.23,1,2021-08-25 19:39:37.663000+00:00,2021-08-25 19:39:37.663000+00:00
4,10004,19830125_8297,82223,24,54400,RENT,8.0,MEDICAL,35000,14.27,1,2021-08-25 19:21:16.430000+00:00,2021-08-25 19:21:16.430000+00:00


Прописываем фичи, которыми мы хотим обогатить таблицу с заявками.

In [8]:
# Feature references in "<feature view>:<feature>" form, grouped by view.
# The resulting order is all zipcode features first, then all credit-history
# features.
feast_features = [
    f"{view}:{feature}"
    for view, features in (
        (
            "zipcode_features",
            (
                "city",
                "state",
                "location_type",
                "tax_returns_filed",
                "population",
                "total_wages",
            ),
        ),
        (
            "credit_history",
            (
                "credit_card_due",
                "mortgage_due",
                "student_loan_due",
                "vehicle_loan_due",
                "hard_pulls",
                "missed_payments_2y",
                "missed_payments_1y",
                "missed_payments_6m",
                "bankruptcies",
            ),
        ),
    )
    for feature in features
]

## Great Expectations 

Great Expectations – это открытый фреймворк для профилирования, тестирования и документирования данных. 

Объединяем фичи из zipcode_features и credit_history (feast_features) с таблицей loans: выполняем join таблицы loans по столбцу zipcode (с zipcode_features) и по столбцу dob_ssn (с credit_history).

In [None]:
# Request the listed features for the rows of `loans`. Despite its name,
# `training_df` holds the object returned by the store here; it is only turned
# into a DataFrame later via `.to_df()`.
training_df = store.get_historical_features(
    entity_df=loans, 
    features=feast_features
)

Сохраняем объединенную таблицу. 

In [None]:
# Persist the retrieval result as a saved dataset named 'my_training_ds',
# with 'my_training_ds.parquet' as its file storage path.
store.create_saved_dataset(
    from_=training_df,
    name='my_training_ds',
    storage=SavedDatasetFileStorage(path='my_training_ds.parquet')
)

Загружаем сохраненную таблицу.

In [None]:
# Look the saved dataset up by name and preview its first rows.
ds = store.get_saved_dataset('my_training_ds')
ds.to_df().head()

### Исследование данных

Для понимания специфики данных data scientist может использовать разные методы их исследования. <br> В данном примере будет использована библиотека pandas-profiling.

In [None]:
from pandas_profiling import ProfileReport

In [None]:
# Materialise the saved dataset as a DataFrame for exploration.
df = ds.to_df()

In [None]:
# Build the profiling report for the dataset.
profile = ProfileReport(df, title="Pandas Profiling Report", explorative=True)

In [None]:
# `display` is imported from its public location, IPython.display; importing
# it from IPython.core.display is deprecated. It is called explicitly so the
# import is actually used and the report is rendered wherever this statement
# sits in the cell.
from IPython.display import display

display(profile)

### Составление профиля данных

На основе полученных знаний о данных сформируем профайлер для нашего обучающего датасета.

In [None]:
@ge_profiler
def stats_profiler(ds: PandasDataset) -> ExpectationSuite:
    """Declare the expectations the training dataset has to satisfy.

    Args:
        ds: Dataset on which the expectations are registered.

    Returns:
        The expectation suite collected on ``ds``. Failed expectations are
        kept in the suite (``discard_failed_expectations=False``).
    """
    expected_columns = [
        "loan_id", "dob_ssn", "zipcode", "person_age", "person_income",
        "person_home_ownership", "person_emp_length", "loan_intent",
        "loan_amnt", "loan_int_rate", "loan_status", "event_timestamp",
        "created_timestamp", "city", "state", "location_type",
        "tax_returns_filed", "population", "total_wages", "credit_card_due",
        "mortgage_due", "student_loan_due", "vehicle_loan_due", "hard_pulls",
        "missed_payments_2y", "missed_payments_1y", "missed_payments_6m",
        "bankruptcies",
    ]

    # The table must contain these columns, in this order.
    ds.expect_table_columns_to_match_ordered_list(expected_columns)

    # Client age must be between 18 and 90 for every row (the data contains
    # clients with an age of 144).
    ds.expect_column_values_to_be_between(
        "person_age", min_value=18, max_value=90, mostly=1.0
    )

    # The "was the loan repaid" flag must be binary.
    ds.expect_column_values_to_be_in_set("loan_status", [0, 1])

    # Income is capped at 140000 (described by the author as the 95th
    # percentile). With mostly=1.0 no value may exceed the cap; lower `mostly`
    # (0.95 was tried earlier) to tolerate some outliers.
    ds.expect_column_values_to_be_between(
        "person_income", min_value=0, max_value=140000, mostly=1.0
    )

    # A column can also be checked against a type.
    ds.expect_column_values_to_be_of_type("city", "str")

    return ds.get_expectation_suite(discard_failed_expectations=False)

Тестируем наш профайлер (должны отобразиться все тесты)

In [None]:
# Run the profiler against the saved dataset to inspect the resulting profile.
ds.get_profile(profiler=stats_profiler)

Теперь мы можем создать ссылку на проверку из набора данных.

In [None]:
# Turn the saved dataset plus the profiler into a validation reference, which
# is passed to `.to_df(validation_reference=...)` further down.
validation_reference = ds.as_reference(name='test',profiler=stats_profiler)

И сразу проверим наш job на соответствие.

In [None]:
from feast.dqm.errors import ValidationFailed

In [None]:
# Placeholder for the validation report; it is assigned in the validation cell
# when ValidationFailed is raised.
fail_str = None

In [None]:
def beautifull_report(report_list):
    """Print a per-column summary of failed expectation results.

    Args:
        report_list: Either an already-parsed list of result dicts, or any
            object whose ``str()`` is a JSON array of such dicts (the form
            the original implementation parsed). Each dict is expected to
            hold an "expectation_config" (with "expectation_type" and
            "kwargs") and, usually, a "result" mapping.

    Returns:
        None. The summary is written to stdout.

    Raises:
        ValueError or SyntaxError: if the text form of ``report_list`` is
            neither valid JSON nor a Python literal.
    """
    import ast
    import json

    if isinstance(report_list, (list, tuple)):
        reports = list(report_list)
    else:
        text = str(report_list)
        try:
            # Parse as JSON instead of eval()-ing text patched with
            # str.replace: eval executes arbitrary code, and the blanket
            # replacements also rewrote "true"/"false"/"null" occurring
            # inside string values such as column names.
            reports = json.loads(text)
        except ValueError:
            # Fall back to a Python-literal rendering (single quotes,
            # True/False/None), which the eval-based version accepted too.
            reports = ast.literal_eval(text)

    # Group the results by column, keeping first-seen column order.
    by_column = {}
    for report in reports:
        kwargs = report["expectation_config"].get("kwargs") or {}
        # Table-level expectations have no "column" kwarg.
        column = kwargs.get("column", "<table>")
        by_column.setdefault(column, []).append(report)

    for column, column_reports in by_column.items():
        print(column)
        print("*" * 20)
        for report in column_reports:
            # Not every expectation type reports every counter; missing
            # values are printed as None instead of raising KeyError.
            result = report.get("result") or {}
            unexpected = result.get("partial_unexpected_counts") or []

            print("Expectation type: ", report["expectation_config"]["expectation_type"])
            print("Element count: ", result.get("element_count"))
            print("Missing count: ", result.get("missing_count"))
            print("Unexpected count: ", result.get("unexpected_count"))
            print("Top 5 unexpected elenents list: ", unexpected[:5])
            print("-" * 40)
        print()



### Валидация и получение отчета

In [None]:
# Materialise the training set while validating it against the reference.
# On failure, keep the report in `fail_str`, print the per-column summary and
# then the raw report.
try:
    df = training_df.to_df(validation_reference=validation_reference)
except ValidationFailed as validation_error:
    fail_str = validation_error.validation_report
    beautifull_report(fail_str)
    print(fail_str)

Видим, что тест на возраст не прошел, мы можем изменить исходники или добавить предобработку данных перед обучением.

In [None]:
# Repeat the retrieval without a validation reference and convert the result
# straight to a DataFrame; from here on `training_df` is a DataFrame.
training_df = store.get_historical_features(
    entity_df=loans, 
    features=feast_features
).to_df()

In [None]:
# Dump the complete, unfiltered training table to CSV (without the index).
# NOTE(review): assumes the test_data/ directory already exists — confirm.
training_df.to_csv("test_data/credit_history_all.csv", index=False)

### Подготавливаем данные клиента для обучения

In [None]:
# Keep only rows inside the same ranges the data profile checks: age within
# 18..90 and income within 0..140000, both ends inclusive. The earlier strict
# `< 90` / `< 140000` comparison dropped rows sitting exactly on the upper
# bounds and let values below the lower bounds through, so the cleaned table
# did not match the validated ranges.
training_df = training_df[
    training_df["person_age"].between(18, 90)
    & training_df["person_income"].between(0, 140000)
]

In [None]:
# Dump the filtered training table to CSV (without the index).
# NOTE(review): assumes the test_data/ directory already exists — confirm.
training_df.to_csv("test_data/credit_history_clear.csv", index=False)

In [None]:
from sklearn import tree
from sklearn.preprocessing import OrdinalEncoder

In [None]:
# Ordinal-encode the categorical columns on a copy of the training table, so
# that `training_df` itself keeps the original values.
categorical_features = [
    "person_home_ownership",
    "loan_intent",
    "city",
    "state",
    "location_type",
]
encoder = OrdinalEncoder()
transform_training_df = training_df.copy()
transform_training_df[categorical_features] = encoder.fit_transform(
    training_df[categorical_features]
)

# Model inputs: every column except the target, the two timestamps and the
# id / join-key columns, arranged in sorted column order.
target = "loan_status"
train_X = transform_training_df.drop(
    columns=[
        target,
        "event_timestamp",
        "created_timestamp",
        "loan_id",
        "zipcode",
        "dob_ssn",
    ]
)
train_X = train_X.reindex(columns=sorted(train_X.columns))
train_Y = transform_training_df[target]

### Разделяем подготовленные данные на тестовую и обучающую выборки

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the rows as the test set.
# NOTE(review): no random_state is passed, so the split (and the metrics
# computed from it) will differ between runs — confirm this is intended.
x_train, x_test, y_train, y_test = train_test_split(train_X, train_Y, test_size=0.10)

In [None]:
# Preview the first rows of the feature matrix.
train_X.head()

In [None]:
# Show the target column.
train_Y

### Производим обучение модели

In [None]:
# Fit a tree with default settings on the complete (unsplit) data, with the
# columns passed in sorted order. `classifier` is rebound to a depth-limited
# tree in the following cell.
classifier = tree.DecisionTreeClassifier()
classifier.fit(train_X[sorted(train_X)], train_Y)

In [None]:
# Fit a depth-limited tree on the training split only, with the columns passed
# in sorted order.
max_depth = 22
classifier = tree.DecisionTreeClassifier(max_depth = max_depth)
classifier.fit(x_train[sorted(x_train)], y_train)

### Подсчитываем метрики и смотрим на качество обучения модели

In [None]:
# Pass the test columns in the same sorted order the classifier was fitted
# with, rather than relying on `x_test` already being sorted upstream.
predictions = classifier.predict(x_test[sorted(x_test)])

In [None]:
from sklearn.metrics import accuracy_score, make_scorer

# Accuracy of the predictions on the held-out test split.
accuracy = accuracy_score(y_true=y_test, y_pred = predictions)
print(accuracy)

In [None]:
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

def eval_metrics(actual, pred):
    """Return ``(rmse, mae, r2)`` for predictions ``pred`` against ``actual``.

    RMSE is the square root of the mean squared error; MAE and R2 come
    directly from the corresponding scoring functions.
    """
    squared_error = mean_squared_error(actual, pred)
    return (
        np.sqrt(squared_error),
        mean_absolute_error(actual, pred),
        r2_score(actual, pred),
    )

In [None]:
# Compute RMSE, MAE and R2 for the test-set predictions and print them,
# one value per line.
rmse, mae, r2 = eval_metrics(y_test, predictions)
print(rmse, mae, r2, sep="\n")