In [None]:
import json
from copy import copy
from datetime import datetime
from pathlib import Path
from typing import List

import numpy as np
import pandas as pd

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, rand

from sklearn.metrics import confusion_matrix, f1_score, roc_auc_score, roc_curve

## Check Spark connection

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Desired configuration. NOTE: SparkContext.getOrCreate() returns an already running
# context unchanged (e.g. one started by the notebook server), in which case these
# settings are ignored; 'spark.driver.memory' in particular only takes effect if it is
# set before the driver JVM starts (spark-submit / --driver-memory).
conf = SparkConf().setAppName('spark-shell').set('spark.driver.memory', '4g')
sc = SparkContext.getOrCreate(conf=conf)

# SparkSession is the entry point for spark.sql(...) since Spark 2.0; SQLContext is
# deprecated and only wraps a SparkSession. The builder reuses the context above.
spark = SparkSession.builder.getOrCreate()
print(f'Spark {sc.version}, application: {sc.appName}')

## Global constants

In [None]:
KycDM: str = 'zt_dm_kyc_data'            # KYC schema (read access)
arfsDM: str = 'zt_dm_aso_dfm_arfs'       # ARFS schema with write access; working tables are created here
target_table: str = 'tmp_target_table'   # table built from the "red" and "green" clients

filter_date: str = '2021-07-22'          # date (YYYY-MM-DD) used to look up calendar_id for filtering fct_* tables
max_score: int = 1000                    # maximum risk score
middle: int = 500                        # "middle" risk value
uncertanity: int = 150                   # half-width of the "yellow" band around `middle`; 0 = no yellow band
ignored_typology: List[int] = [6, ]      # dubious_id values of typologies skipped during training

# Backward-compatible alias for the original, misspelled name.
ignored_typolygy: List[int] = ignored_typology

In [None]:
# Resolve the dim_calendar id for `filter_date`; feature_dataframe() uses it to
# restrict fct_* tables that have a calendar_id column. On any failure this falls
# back to None, i.e. the fct_* tables are read unfiltered.
calendar_id = None
try:
    # The date must be a quoted SQL literal: unquoted, 2021-07-22 is parsed as the
    # arithmetic expression 2021 - 7 - 22.
    calendar_rows = spark.sql(
        f"select id from {KycDM}.dim_calendar where day = '{filter_date}'"
    ).collect()
    if calendar_rows:
        # Keep the scalar id so it can be interpolated into a filter expression.
        calendar_id = calendar_rows[0]['id']
    else:
        print(f'Warning: no dim_calendar row for {filter_date}')
except Exception as exc:
    # Best-effort: continue without the date filter, but report why.
    print(f'Warning: could not resolve calendar_id ({exc})')
print(f'Choose calendar_id: {calendar_id}')

# Visualization

In [None]:
import plotly

from plotly.graph_objs import Scatter, Layout


def plot(plot_dct, height=500, width=500, **kwargs):
    """Render a plotly figure as an HTML <div> and print it for the notebook frontend.

    Args:
        plot_dct: figure spec accepted by plotly.offline.plot (dict with 'data'/'layout').
        height: height of the wrapping <div>, in pixels.
        width: width of the wrapping <div>, in pixels.
        **kwargs: extra keyword arguments for plotly.offline.plot; 'output_type' is
            always forced to 'div' so the HTML string is returned rather than written to a file.
    """
    kwargs['output_type'] = 'div'
    plot_str = plotly.offline.plot(plot_dct, **kwargs)
    # NOTE(review): the 'Angular' prefix is presumably a display directive for the hosting
    # notebook (e.g. Zeppelin's %angular) — confirm it is recognised as written.
    print(f'Angular <div style="height: {height}px; width: {width}px"> {plot_str} </div>')

    
def plotxy(x, y):
    """Plot `y` against `x` as a single plotly scatter trace with an empty title."""
    figure = {
        'data': [Scatter(x=x, y=y)],
        'layout': {'title': {'text': ''}},
    }
    plot(figure)
    
    
def plot_roc_curve(y_true, y_prob, text=''):
    """Plot the ROC curve (TPR vs. FPR) of scores `y_prob` against binary labels `y_true`.

    `text` is appended to the chart title.
    """
    false_pos_rate, true_pos_rate, _thresholds = roc_curve(y_true, y_prob)
    curve = Scatter(x=false_pos_rate, y=true_pos_rate)
    title = 'ROC Кривая ' + text
    plot({'data': [curve], 'layout': {'title': {'text': title}}})

# portfolio recovery

In [None]:
def feature_dataframe(target_df, typology, verbose=False, join_how='left'):
    """Attach the typology's risk-flag columns to `target_df`.

    For every fct_* table referenced by ``typology['risks']`` (except
    fct_shady_risks, which does not take part in training) the risk columns are
    selected as ``<table_name>__<column_name>`` and joined on ``client_id``.

    Args:
        target_df: Spark DataFrame with a ``client_id`` column, or None to start
            from the first feature table.
        typology: dict whose 'risks' entry is a list of dicts with 'table_name'
            and 'column_name' keys.
        verbose: print per-table column counts and the total.
        join_how: Spark join type. Defaults to 'left' so that clients without a
            row in some fct_* table (or an empty table) are kept with null flags
            instead of being dropped from the dataset.

    Returns:
        The joined Spark DataFrame, or `target_df` unchanged if no table applies.

    Raises:
        Exception: if a required risk column is missing from its table.
    """
    risks = [risk for risk in typology['risks'] if risk['table_name'] != 'fct_shady_risks']
    # sorted() makes the table/column order reproducible across runs (set order is not).
    table_names = sorted({risk['table_name'] for risk in risks})
    total_risks: int = 0

    for table_name in table_names:
        risk_columns = [risk['column_name'] for risk in risks if risk['table_name'] == table_name]
        total_risks += len(risk_columns)
        if verbose:
            print(f'{table_name}:\t\t\t{len(risk_columns)} columns')

        df = spark.sql(f'select * from {KycDM}.{table_name}')

        # Every scenario column listed for this typology must exist in the fct_ table.
        absent_columns = [r for r in risk_columns if r not in df.schema.names]
        if absent_columns:
            raise Exception(f'Cant find {";".join(absent_columns)} in {KycDM}.{table_name}')

        # Filter by date when calendar_id was resolved and the table has that column.
        if calendar_id is not None and 'calendar_id' in df.schema.names:
            df = df.filter(f'calendar_id = {calendar_id}')

        # Emptiness check that fetches at most one row instead of counting the table.
        if not df.head(1):
            print(f'Warning\t\t{table_name} is empty')

        select_columns = ['client_id'] + [
            f'{column_name} as {table_name}__{column_name}' for column_name in risk_columns
        ]
        df = df.selectExpr(*select_columns)

        if target_df is None:
            target_df = df
        else:
            target_df = target_df.join(df, on=['client_id'], how=join_how)

    if verbose:
        print('0'*50, f'\nTotal Columns: {total_risks}')
    return target_df

## Create DataSet

In [None]:
def create_dataset(typology, return_assembler=False, seed=None):
    """Build a class-balanced, vectorised training set for one typology.

    Positives ("red" clients) are rows of {arfsDM}.{target_table} with this
    typology's dubious_id; negatives ("green", label = 0) are sampled down to at
    most the red count. Risk flags are attached via feature_dataframe(), cast to
    boolean (missing -> False) and assembled into a 'features' vector. The result
    is saved to {arfsDM}.tmp_train_test_dataset and read back from there.

    Args:
        typology: dict with 'dubious_id', 'description' and 'risks' entries.
        return_assembler: also return the VectorAssembler used.
        seed: optional seed for sampling the green clients (None = random).

    Returns:
        The Spark DataFrame, or ``(DataFrame, VectorAssembler)`` if return_assembler.

    Raises:
        Re-raises any error from saving/reloading the table after printing it.
    """
    print(f"{typology['dubious_id']} - {typology['description']}\n")

    # Only the "red" clients of this typology
    target_df = spark.sql(
        f'select client_id, label from {arfsDM}.{target_table} where dubious_id = {typology["dubious_id"]}'
    )
    # Only the "green" clients
    greens = spark.sql(f'select client_id, label from {arfsDM}.{target_table} where label = 0')

    # Balance the classes: sample ~20% more greens than reds, then cap at the red count.
    # sample() without replacement requires a fraction in [0, 1].
    red_count = target_df.count()
    green_count = greens.count()
    if green_count:
        green_fraction = min(1.0, red_count / green_count * 1.2)
        target_df = target_df.union(greens.sample(fraction=green_fraction, seed=seed).limit(red_count))
    else:
        print('Warning\t\tno green clients found')

    df = feature_dataframe(target_df, typology, verbose=True)

    print('\t\t\tБаланс классов')
    df.groupby('label').count().show()

    # Cast every risk flag to boolean in one projection (column order preserved);
    # nulls (client absent from an fct_ table, empty tables, unparsable values) become False.
    print('Преобразование типов')
    input_cols = [c for c in df.columns if c not in ('client_id', 'label')]
    df = df.select(*[col(c).cast('boolean').alias(c) if c in input_cols else col(c) for c in df.columns])
    if input_cols:
        df = df.fillna(False, subset=input_cols)

    print('Сборка колончатых признаков')
    # Combine all columns except 'label' and 'client_id' into one vector column.
    assembler = VectorAssembler().setInputCols(input_cols).setOutputCol('features')
    assembler_df = assembler.transform(df)

    output_table = f'{arfsDM}.tmp_train_test_dataset'
    try:
        print(f'Сохранение в {output_table}')
        assembler_df.write.mode('overwrite').saveAsTable(output_table)
        print('DONE')
        # Continue from the saved table, whose lineage is a plain table scan.
        assembler_df = spark.sql(f'select * from {output_table}')
    except Exception as e:
        print(f"{typology['dubious_id']}: {e}")
        # Returning None here would only fail later at the caller's tuple unpacking.
        raise

    if return_assembler:
        return assembler_df, assembler
    return assembler_df

# Learn Models

In [None]:
def _best_f1_threshold(y_true, scores, thresholds):
    """Return ``(threshold, f1)`` maximising F1 for the rule ``score > threshold``.

    Vectorised with sorted scores + searchsorted instead of one f1_score call per
    threshold. Ties resolve to the first threshold; F1 with no positives counts as 0.
    """
    y = np.asarray(y_true, dtype=bool)
    s = np.asarray(scores, dtype=float)
    th = np.asarray(thresholds)
    pos = np.sort(s[y])
    neg = np.sort(s[~y])
    # Number of positive / negative scores strictly above each threshold.
    tp = pos.size - np.searchsorted(pos, th, side='right')
    fp = neg.size - np.searchsorted(neg, th, side='right')
    # F1 = 2TP / (2TP + FP + FN), with FN = P - TP.
    denom = tp + fp + pos.size
    f1 = np.divide(2.0 * tp, denom, out=np.zeros(th.shape, dtype=float), where=denom > 0)
    best = int(np.argmax(f1))
    return th[best].item(), float(f1[best])


def _evaluate_candidate(candidate, test):
    """Score one fitted candidate on `test` and store its metrics on `candidate`.

    Adds 'f1', 'threshold', 'auc_roc', 'gini' and 'confusion_matrix'. The score is
    the regression prediction clipped to at most `max_score`.
    """
    preds = (
        candidate['model'].transform(test)
        .select('client_id', 'label', 'prediction')
        .toPandas()
    )
    y_true = (preds['label'] > 0).to_numpy()
    scores = preds['prediction'].clip(upper=max_score).to_numpy()

    threshold, best_f1 = _best_f1_threshold(y_true, scores, range(max_score))
    y_pred = scores > threshold

    try:
        auc = roc_auc_score(y_true, scores / max_score)
    except ValueError:
        # ROC AUC is undefined when the test split contains a single class.
        auc = float('nan')

    candidate.update(
        f1=best_f1,
        threshold=threshold,
        auc_roc=auc,
        gini=2 * auc - 1,
        confusion_matrix=confusion_matrix(y_true, y_pred, labels=[False, True]),
    )
    return candidate


def train_model(train, test):
    """Fit a small grid of linear regressions on `train` and return the best one on `test`.

    The label is regressed directly (no intercept); the prediction, clipped to
    `max_score`, is used as a risk score. For each grid point the F1-optimal
    threshold over 0..max_score-1 is found, along with ROC AUC and Gini.

    Args:
        train: Spark DataFrame with 'client_id', 'label' and 'features'.
        test: Spark DataFrame with the same columns, used for evaluation.

    Returns:
        dict for the candidate with the highest F1, with keys 'id', 'type',
        'description', 'params', 'parent', 'model' (fitted model), 'f1',
        'threshold', 'auc_roc', 'gini' and 'confusion_matrix'.
    """
    lr = LinearRegression(featuresCol='features', labelCol='label', fitIntercept=False)
    lr_grid = (
        ParamGridBuilder()
        .addGrid(lr.regParam, [0., 0.02])
        .addGrid(lr.elasticNetParam, [0., 0.4])
        .build()
    )
    models = [
        {
            'id': f'LR_{i}',
            'parent': lr,
            'type': 'lr',
            'params': params,
            'description': str({k.name: v for k, v in params.items()}),
        }
        for i, params in enumerate(lr_grid)
    ]

    for model in models:
        print(f"Fitting {model['id']}: {model['description']}", end=' ')
        start = datetime.now()
        # fit() with a param map trains a copy with those params; `lr` itself is unchanged.
        model['model'] = model['parent'].fit(train, model['params'])
        delta = int((datetime.now() - start).total_seconds())
        print(f'{delta // 60} min {delta % 60} sec')
    print('Fitting Done\nОценка модели...')

    for model in models:
        print('-'*20, f"\nEvaluating Model {model['id']}: {model['description']}")
        _evaluate_candidate(model, test)
        print(f"f1: {model['f1']:.4f} at threshold {model['threshold']}, AUC_ROC: {model['auc_roc']:.4f}")
        print(model['confusion_matrix'])

    # Highest F1 wins (first one on ties).
    return max(models, key=lambda m: m['f1'])

## Get typologies

In [None]:
risk_columns_query = f"\
    SELECT DISTINCT w.ID, w.DUBIOUS_ID, d.description as dubiuos_description, w.table_name, w.column_name, r.scenario_code, r.description as scenario_description \
    FROM {KycDM}.hw_dubious_weight as w \
    LEFT JOIN {KycDM}.dim_risk_columns as r \
    ON w.table_name = r.table_name and w.column_name = r.column_name \
    LEFT JOIN {KycDM}.dim_dubious as d \
    ON w.dubious_id = d.id \
    "
# (typology, scenario) pairs with descriptions; the LEFT JOINs keep weight rows whose
# scenario/typology is missing from the dimension tables (descriptions are then null).
risks_columns = spark.sql(risk_columns_query)

# Global list of typologies (later cells train on it and dump it as the "weights").
weights = []
dubious = risks_columns.select(col('dubious_id'), col('dubiuos_description')).distinct().collect()

for dub_id, description in dubious:
    weights.append({
        "dubious_id": dub_id,
        "description": description,
        'medium_ppm': int(middle - uncertanity),
        'high_ppm': int(middle + uncertanity),
        'risks': [],
    })

# Attach the risk scenarios to each typology: one collect() grouped in Python instead
# of a separate filter-and-collect Spark job per typology (which also broke on a null id).
risk_rows = risks_columns.select('DUBIOUS_ID', 'ID', 'table_name', 'column_name', 'scenario_description').collect()
risks_by_typology = {}
for dub_id, risk_id, table_name, column_name, description in risk_rows:
    # fct_shady_risks holds the special lists: always the maximum weight, NOT used in training.
    weight = max_score if table_name == "fct_shady_risks" else 0
    risks_by_typology.setdefault(dub_id, []).append({
        'id': risk_id,
        'table_name': table_name,
        'column_name': column_name,
        'description': description,
        'weight': weight,
    })
for typology in weights:
    typology['risks'].extend(risks_by_typology.get(typology['dubious_id'], []))

print(f"Всего типологий риска: {len(weights)}\n")
for t in weights:
    # description can be null because of the LEFT JOIN to dim_dubious
    print(f"id{t['dubious_id']} - {(t['description'] or '')[:30]}: {len(t['risks'])} сценариев")

## Learn models

In [None]:
# Trained models per typology: dubious_id -> {'dubious_id', 'model', 'assembler'}.
models = {}

for typology in weights:
    # Skip typologies excluded from training
    if typology['dubious_id'] in ignored_typology:
        continue

    dataset, assembler = create_dataset(typology, return_assembler=True)
    train, test = dataset.randomSplit([0.8, 0.2], seed=100500)

    print("\nОбучение моделей...")
    model = train_model(train, test)

    # Keep the selected model for this typology
    models[typology['dubious_id']] = {
        "dubious_id": typology['dubious_id'],
        "model": model,
        "assembler": assembler,
    }

    print("\nЛучшая модель:")
    print(f"f1 score: {model['f1']}, \nAUC_ROC score: {round(model['auc_roc'], 3)}, \ngini: {model['gini']}")

    # Fitted regression model and its coefficients
    lr_model = model['model']
    coeffs = lr_model.coefficients

    # VectorAssembler's 'ml_attr' metadata lists one attribute per input column,
    # grouped by attribute type; 'idx' is the position in the feature vector.
    attr_groups = train.schema['features'].metadata['ml_attr']['attrs']
    feature_attrs = [attr for group in attr_groups.values() for attr in group]

    # Map each assembled column name to its rounded coefficient
    weights_from_model = {attr['name']: round(float(coeffs[attr['idx']]), 0) for attr in feature_attrs}

    for risk in typology['risks']:
        # fct_shady_risks is not trained: keep its default weight
        if risk['table_name'] == 'fct_shady_risks':
            continue
        risk['weight'] = weights_from_model[f"{risk['table_name']}__{risk['column_name']}"]

    # Thresholds derived from the model's F1-optimal threshold, stored on the typology
    # so they are exported together with the weights.
    # NOTE(review): 'threshold' is already on the 0..max_score scale — confirm the /2 is intended.
    typology['medium_ppm'] = int(model['threshold'] / 2 - uncertanity)
    typology['high_ppm'] = int(model['threshold'] / 2 + uncertanity)

    # ROC curve on the test split, using the same clipped score as train_model
    preds = lr_model.transform(test).select('label', 'prediction').toPandas()
    y_true = preds['label'] > 0
    probs = preds['prediction'].clip(upper=max_score) / max_score

    print(model["id"], model["description"])

    plot_roc_curve(y_true, probs, typology['description'] or '')

## Save weights to a JSON file

In [None]:
# Timestamped output file name, e.g. weights_2021-07-22_10:11:12.json
now = str(datetime.now().replace(microsecond=0)).replace(" ", "_")

filename = f"../FR/PythonProjects/ML_weights/weights_{now}.json"
# Create the target directory if needed instead of failing with FileNotFoundError.
Path(filename).parent.mkdir(parents=True, exist_ok=True)

# UTF-8 with ensure_ascii=False keeps the Russian descriptions readable in the file.
with open(filename, "w", encoding="utf-8") as f:
    json.dump({'data': weights}, f, ensure_ascii=False)

print(f"Файл: {filename} - сохранен!")