In [6]:
from pyspark.sql import SparkSession
# Single SparkSession shared by every cell; getOrCreate() hands back the active
# session when one already exists, so re-running this cell is harmless.
spark = (
    SparkSession.builder
    .appName('xente')
    .getOrCreate()
)
spark

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import pyspark.sql.functions as F
import shap
import catboost
from catboost import CatBoostClassifier, Pool
from pyod.models.lscp import LSCP
from pyod.models.knn import KNN

from imblearn.over_sampling import SMOTENC
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import (accuracy_score, classification_report, confusion_matrix,
                              f1_score, precision_score, recall_score, roc_curve, roc_auc_score)
from pyspark.sql.functions import (mean, dayofmonth, hour, dayofweek,
                                                                   month, weekofyear, dayofyear,
                                                                   format_number, col)
from pyspark.ml.feature import StringIndexer

In [2]:
def read_data_from_web(url):
    """Download a CSV with pandas and return it as a Spark DataFrame.

    The file is fully loaded into driver memory by ``pd.read_csv`` before being
    converted with the module-level ``spark`` session.

    Parameters
    ----------
    url : str
        Any location ``pd.read_csv`` accepts (HTTP(S) URL or local path).
    """
    pandas_frame = pd.read_csv(url)
    return spark.createDataFrame(pandas_frame)

# Training data is downloaded from Google Drive. A local copy can be read instead with
# spark.read.csv('training.csv', header=True, inferSchema=True).
TRAIN_DATA_URL = "https://drive.google.com/uc?export=download&id=1NrtVkKv8n_g27w5elq9HWZA1i8aFBW0G"
fraud_data = read_data_from_web(TRAIN_DATA_URL)
# Handle on the raw training frame so feature engineering can be restarted without re-downloading.
df_backup = fraud_data

In [7]:
# Restart feature engineering from the raw training frame. withColumn/join return new
# Spark DataFrames, so rebinding the name undoes everything applied below.
fraud_data = df_backup

### Feature engineering

In [8]:
def remove_feature(data, columns_in):
    """Return ``data`` without the columns named in ``columns_in``.

    The names are unpacked into positional arguments of ``data.drop``, matching
    the varargs form of Spark's ``DataFrame.drop``.
    """
    names_to_drop = tuple(columns_in)
    return data.drop(*names_to_drop)

def clean_data(fraud_data, columns_to_remove):
    """Drop ``columns_to_remove`` from ``fraud_data`` and return the resulting frame.

    Thin wrapper over ``remove_feature``. The notebook calls it on both the
    training and the test frame with the same column list.
    """
    return remove_feature(fraud_data, columns_to_remove)

In [10]:
# Number of fraudulent transactions in the training data.
# NOTE(review): the saved output of this cell is "Unsupported class file major version 55"
# (Java 11 bytecode), which presumably means Spark 2.x running on a Java 11 JVM — TODO confirm runtime.
fraud_data.where(F.col('FraudResult') == 1).count()

IllegalArgumentException: 'Unsupported class file major version 55'

In [9]:
# Share of fraudulent rows; passed as `contamination` (expected outlier fraction) to the detectors below.
fraud_count = fraud_data.where(F.col('FraudResult') == 1).count()
contamination = fraud_count / fraud_data.count()

IllegalArgumentException: 'Unsupported class file major version 55'

In [None]:
# Tier each transaction by how far its Value exceeds the dataset mean:
# > 1000x mean -> 3, > 100x -> 2, > 10x -> 1, otherwise 0.
avg_value = fraud_data.agg(F.avg('Value')).first()[0]
value_strategy = F.when(F.col('Value') > avg_value * 1000, 3)
for multiplier, tier in ((100, 2), (10, 1)):
    value_strategy = value_strategy.when(F.col('Value') > avg_value * multiplier, tier)
fraud_data = fraud_data.withColumn('ValueStrategy', value_strategy.otherwise(0))

In [None]:
# Sign of Amount: 1 for positive, -1 for negative, 0 otherwise.
fraud_data = fraud_data.withColumn(
    "Operation",
    F.when(F.col('Amount') > 0, 1).when(F.col('Amount') < 0, -1).otherwise(0),
)

# Calendar parts of the transaction timestamp.
for new_col, extractor in (('Hour', hour),
                           ('DayOfWeek', F.dayofweek),
                           ('DayOfYear', dayofyear),
                           ('WeekOfYear', weekofyear)):
    fraud_data = fraud_data.withColumn(new_col, extractor(F.col('TransactionStartTime')))

# Value normalised by each calendar part (all of them start at 1, so no zero divisors).
for new_col, divisor in (('Vl_per_weekYr', 'WeekOfYear'),
                         ('Vl_per_dayWk', 'DayOfWeek'),
                         ('Vl_per_dayYr', 'DayOfYear')):
    fraud_data = fraud_data.withColumn(new_col, F.col('Value') / F.col(divisor))

In [None]:
items_list = ['ProductId','ProviderId']
# Means are taken from the frame as it was before the loop starts adding joins.
gen_train_data = fraud_data

for item in items_list:
    mean_column_name = 'avg_vl_{0}'.format(item)
    ratio_column_name = 'Rt_avg_vl_{0}'.format(item)

    # Average Value per distinct key of `item`.
    item_means = (gen_train_data
                  .groupBy(item)
                  .agg(F.avg('Value').alias(mean_column_name)))

    # Inner join: rows whose key finds no match (e.g. null keys) are dropped, and
    # Spark does not preserve row order across the join.
    fraud_data = fraud_data.join(item_means, on=item)

    # Relative deviation of the transaction's Value from its group mean.
    fraud_data = fraud_data.withColumn(
        ratio_column_name,
        (F.col('Value') - F.col(mean_column_name)) / F.col(mean_column_name),
    )

In [None]:
# Columns not used as model inputs: identifiers, raw timestamp/amount, and the
# intermediate group means (only their ratios are kept).
columns_to_remove = ['CurrencyCode', 'CountryCode', 'AccountId', 'SubscriptionId', 'CustomerId',
                     'TransactionStartTime', 'Amount', 'DayOfYear',
                     'avg_vl_ProductId', 'avg_vl_ProviderId']
fraud_data = clean_data(fraud_data, columns_to_remove=columns_to_remove)

In [None]:
# Heatmap of the k features most correlated with FraudResult.
fraud_data_toCorr = fraud_data.toPandas()
# The frame still holds string id columns (ProviderId, ProductId, ...). pandas >= 2.0 raises
# on non-numeric columns in DataFrame.corr(), and older versions silently dropped them,
# so restrict to numeric columns explicitly.
corr_matrix = fraud_data_toCorr.select_dtypes(include='number').corr()
k = 15  # number of variables for heatmap
cols = corr_matrix.nlargest(k, 'FraudResult')['FraudResult'].index
# Reuse the pandas correlations (pairwise NaN handling) rather than recomputing with np.corrcoef.
cm = corr_matrix.loc[cols, cols]
sns.set(font_scale=1.25, rc={'figure.figsize': (8, 8)})
fig, ax = plt.subplots()
hm = sns.heatmap(cm, cbar=True, annot=True, square=True, fmt='.3f', annot_kws={'size': 8},
                 yticklabels=cols.values, xticklabels=cols.values, ax=ax)
ax.set_title('Top {0} features by correlation with FraudResult'.format(k))
plt.show()

In [None]:
label = 'FraudResult'

# Categorical model inputs (handed to SMOTENC / CatBoost as categorical).
# NOTE(review): TransactionId (and possibly BatchId) look like per-row identifiers — as a
# categorical feature it presumably cannot generalise to unseen rows; confirm it belongs here.
categorical_features = ['ProviderId', 'ProductId', 'TransactionId', 'BatchId',
                        'ProductCategory', 'ChannelId', 'PricingStrategy']

# Numerical model inputs; these are also the only columns the outlier detectors see.
numerical_features = ['ValueStrategy', 'Value', 'Operation', 'Hour', 'DayOfWeek', 'WeekOfYear',
                      'Vl_per_weekYr', 'Vl_per_dayWk', 'Vl_per_dayYr',
                      'Rt_avg_vl_ProductId', 'Rt_avg_vl_ProviderId']

# All model inputs: categoricals first, then numericals. `+` builds a new list, so later
# appends to all_features leave the two source lists untouched.
all_features = categorical_features + numerical_features

In [None]:
# Display the fraud fraction that is passed to the outlier detectors as `contamination`.
contamination

### Separate features and label (the whole training set is used; there is no hold-out split)

In [None]:
# From here on the work is done in pandas; the whole training set is collected to the driver.
fraud_data = fraud_data.toPandas()
X_train, y_train = fraud_data[all_features], fraud_data[label]

In [None]:
# Fraudulent rows only. It is used only to build X_outliers_toTrain, which is not used later in this notebook.
X_outliers = fraud_data.loc[fraud_data['FraudResult'] == 1]

In [None]:
# Outlier detectors are fitted on the numerical columns only.
X_train_toTrain, X_outliers_toTrain = X_train[numerical_features], X_outliers[numerical_features]

### Isolation Forest

In [None]:
# Unsupervised detector; `contamination` (fraud share of the training data) sets its threshold.
# The `behaviour` argument was deprecated in scikit-learn 0.22 and removed in 0.24 (passing it
# raises TypeError); the 'new' behaviour it used to select has been the default since 0.22.
if_clf = IsolationForest(random_state=42, contamination=contamination, n_jobs=10)

In [None]:
# y is accepted for API compatibility but ignored by IsolationForest (unsupervised).
if_clf.fit(X=X_train_toTrain, y=y_train)

In [None]:
detector_input = X_train[numerical_features]
# IsolationForest.predict returns +1 for inliers and -1 for outliers.
predictions_if = if_clf.predict(detector_input)

In [None]:
# Work on an explicit copy. X_train is a column subset of fraud_data, so assigning into it
# can raise pandas' SettingWithCopyWarning, and an alias would also add the column to X_train.
X_train_if = X_train.copy()
X_train_if['IsolationForest'] = predictions_if

In [None]:
# Recode the detector output as a 0/1 flag: inlier (+1) -> 0, outlier (-1) -> 1.
X_train_if = X_train_if.assign(IsolationForest=X_train_if['IsolationForest'].map({1: 0, -1: 1}))

### LSCP

In [None]:
from pyod.models.lof import LOF

# Two LOF base detectors for LSCP, built as separate instances with the same settings.
# NOTE(review): identical configurations give LSCP no diversity to choose between —
# consider varying n_neighbors.
detector_list = [LOF(contamination=contamination, n_jobs=10) for _ in range(2)]

In [None]:
lscp_clf = LSCP(
    detector_list=detector_list,
    contamination=contamination,
    random_state=42,
)

In [None]:
# Unsupervised fit on the numerical training columns.
lscp_clf.fit(X=X_train_toTrain)

In [None]:
detector_input = X_train[numerical_features]
# pyod detectors label inliers 0 and outliers 1, so no recoding is needed.
predictions_lscp = lscp_clf.predict(detector_input)

In [None]:
# assign() returns a new frame. Assigning through an alias would silently add LSCP to
# X_train_if as well, blurring the stage-by-stage frames.
X_train_if_lscp = X_train_if.assign(LSCP=predictions_lscp)

### KNN

In [None]:
# kNN outlier detector scoring each row by the mean distance to its 2 nearest neighbours.
knn_clf = KNN(
    contamination=contamination,
    n_neighbors=2,
    method='mean',
    n_jobs=10,
)

In [None]:
# y is ignored by pyod's unsupervised detectors; kept for API symmetry.
knn_clf.fit(X=X_train_toTrain, y=y_train)

In [None]:
detector_input = X_train[numerical_features]
# 0 = inlier, 1 = outlier.
predictions_knn = knn_clf.predict(detector_input)

In [None]:
# New frame instead of mutating X_train_if_lscp through an alias.
X_train_if_lscp_knn = X_train_if_lscp.assign(Knn=predictions_knn)

#### New feature: CountDetection (number of detectors that flag the row; PercentDetection is not implemented)

In [None]:
# Training frame for balancing / CatBoost: model features plus the three detector flags.
final_data = X_train_if_lscp_knn

In [None]:
# How many of the three detectors flagged the row (each flag is 0/1, so 0-3).
final_data['CountDetection'] = final_data[['IsolationForest', 'LSCP', 'Knn']].sum(axis=1)

### Adding the detector outputs to the feature lists

In [None]:
# Register the detector outputs as extra categorical model inputs.
# Only names that are missing are appended, so re-running this cell does not
# duplicate entries in the feature lists.
detector_features = ['IsolationForest', 'LSCP', 'Knn', 'CountDetection']
for feature_list in (categorical_features, all_features):
    missing = [name for name in detector_features if name not in feature_list]
    feature_list.extend(missing)

In [None]:
# Column positions consumed by SMOTENC and the SHAP Pool. Both operate on frames whose
# columns are exactly `all_features`, so index into that list. final_data.columns only
# matches because its columns happen to be built in the same order.
categorical_features_dims = [all_features.index(name) for name in categorical_features]
numerical_features_dims = [all_features.index(name) for name in numerical_features]

### Class balancing (SMOTENC)

In [None]:
# Model inputs in `all_features` order (engineered features followed by the detector columns).
X_train = final_data[all_features]

In [None]:
# NOTE(review): recent imbalanced-learn releases deprecate SMOTENC's `n_jobs`
# (parallelism moves to the neighbours estimator) — TODO confirm against the installed version.
sm = SMOTENC(
    categorical_features=categorical_features_dims,
    random_state=42,
    n_jobs=10,
)

In [None]:
 X_smotenc, y_smotenc = sm.fit_sample(X_train[all_features], y_train)

In [None]:
# Wrap the resampled data back into labelled pandas objects for CatBoost and SHAP.
X_smotenc = pd.DataFrame(data=X_smotenc, columns=all_features)
y_smotenc = pd.DataFrame(data=y_smotenc, columns=[label])

In [None]:
sns.set(font_scale=1.25, rc={'figure.figsize': (4, 4)})
# Class counts after oversampling (expected to be balanced by SMOTENC's default strategy).
y_smotenc[label].value_counts().plot.bar(title='SMOTENC : Count - Fraud Result')

### CatBoost

In [None]:
model = CatBoostClassifier(
    depth=5,
    learning_rate=0.1,
    l2_leaf_reg=1,
    eval_metric='F1',
    random_seed=42,
)
# Categorical columns are passed by name; training progress is plotted, not logged.
model.fit(X_smotenc, y_smotenc, cat_features=categorical_features, verbose=False, plot=True)

In [None]:
shap.initjs()

# CatBoost's ShapValues output has one extra trailing column holding the expected
# (base) value, which is the same for every row.
shap_pool = Pool(X_smotenc, y_smotenc, cat_features=categorical_features_dims)
raw_shap = model.get_feature_importance(shap_pool, type='ShapValues')
expected_value = raw_shap[0, -1]
shap_values = raw_shap[:, :-1]

# Explain a single prediction: row 200 of the resampled training data.
shap.force_plot(expected_value, shap_values[200, :], X_smotenc.iloc[200, :])

In [None]:
shap.initjs()
# Summary of per-feature SHAP values across all resampled training rows.
shap.summary_plot(shap_values, X_smotenc)

### Evaluation on the test set

In [None]:
# Test data is read from a local copy. A download is presumably available at
# https://drive.google.com/uc?export=download&id=16cRQIFW6n2th2YOK7DEsp9dQgihHDuHa
# via read_data_from_web (TODO confirm it is the same file).
test_data = spark.read.csv('../data/xente_fraud_detection_test.csv', header=True, inferSchema=True)
# Untouched handle on the raw test frame.
test_backup = test_data

In [None]:
# Restart test feature engineering from the raw test frame.
test_data = test_backup

In [None]:
# Value tiers must use the TRAINING mean (df_backup is the raw training frame) so that a given
# Value lands in the same tier the model saw during training. The test set's own mean would
# shift the thresholds (train/test skew).
avg_value = df_backup.agg({'Value':'avg'}).collect()[0][0]
test_data = test_data.withColumn(
    'ValueStrategy',
    F.when(F.col("Value") > avg_value*1000, 3)
     .when(F.col("Value") > avg_value*100, 2)
     .when(F.col("Value") > avg_value*10, 1)
     .otherwise(0)
)

In [None]:
# Sign of Amount: 1 for positive, -1 for negative, 0 otherwise (same as the training features).
test_data = test_data.withColumn(
    "Operation",
    F.when(F.col('Amount') > 0, 1).when(F.col('Amount') < 0, -1).otherwise(0),
)

# Calendar parts of the transaction timestamp.
for new_col, extractor in (('Hour', hour),
                           ('DayOfWeek', F.dayofweek),
                           ('DayOfYear', dayofyear),
                           ('WeekOfYear', weekofyear)):
    test_data = test_data.withColumn(new_col, extractor(F.col('TransactionStartTime')))

# Value normalised by each calendar part.
for new_col, divisor in (('Vl_per_weekYr', 'WeekOfYear'),
                         ('Vl_per_dayWk', 'DayOfWeek'),
                         ('Vl_per_dayYr', 'DayOfYear')):
    test_data = test_data.withColumn(new_col, F.col('Value') / F.col(divisor))

In [None]:
items_list = ['ProductId','ProviderId']
# Test-set means are taken before any join below; they are only a fallback.
gen_train_data = test_data

for item in items_list:
    mean_column_name = 'avg_vl_{0}'.format(item)
    fallback_column_name = 'test_avg_vl_{0}'.format(item)
    ratio_column_name = 'Rt_avg_vl_{0}'.format(item)

    # Reference means come from the raw TRAINING frame (df_backup), so the ratio feature is on
    # the same scale the model was trained on rather than on test-set statistics.
    train_means = df_backup.groupBy(item).agg(F.avg('Value').alias(mean_column_name))
    # Per-key test means, used only for keys never seen in training. Inner-joining on them
    # drops null-key rows, as the training-side join does.
    test_means = gen_train_data.groupBy(item).agg(F.avg('Value').alias(fallback_column_name))

    test_data = (
        test_data
        .join(test_means, on=item)
        .join(train_means, on=item, how='left')
        .withColumn(mean_column_name,
                    F.coalesce(F.col(mean_column_name), F.col(fallback_column_name)))
        .drop(fallback_column_name)
    )

    # Relative deviation of the transaction's Value from its group mean.
    test_data = test_data.withColumn(
        ratio_column_name,
        (F.col('Value') - F.col(mean_column_name)) / F.col(mean_column_name),
    )

In [None]:
# Same columns as removed from the training frame: identifiers, raw timestamp/amount,
# and the intermediate group means.
columns_to_remove = ['CurrencyCode', 'CountryCode', 'AccountId', 'SubscriptionId', 'CustomerId',
                     'TransactionStartTime', 'Amount', 'DayOfYear',
                     'avg_vl_ProductId', 'avg_vl_ProviderId']
test_data = clean_data(test_data, columns_to_remove=columns_to_remove)

In [None]:
label = 'FraudResult'

# Base feature lists, rebuilt here because the training cells appended the detector
# columns to categorical_features / all_features.
categorical_features = ['ProviderId', 'ProductId', 'TransactionId', 'BatchId',
                        'ProductCategory', 'ChannelId', 'PricingStrategy']

numerical_features = ['ValueStrategy', 'Value', 'Operation', 'Hour', 'DayOfWeek', 'WeekOfYear',
                      'Vl_per_weekYr', 'Vl_per_dayWk', 'Vl_per_dayYr',
                      'Rt_avg_vl_ProductId', 'Rt_avg_vl_ProviderId']

# Categoricals first, then numericals, as a new list.
all_features = categorical_features + numerical_features

In [None]:
# Collect the engineered test features to the driver as pandas.
test_data = test_data.toPandas()
X_test = test_data[list(all_features)]

In [None]:
# Positions of each feature group within X_test.
test_columns = X_test.columns
categorical_features_dims = [test_columns.get_loc(name) for name in categorical_features]
numerical_features_dims = [test_columns.get_loc(name) for name in numerical_features]

In [None]:
# IsolationForest labels: +1 inlier, -1 outlier.
predictions_if = if_clf.predict(X_test[numerical_features])
# X_test is a column subset of test_data; assign() builds a new frame instead of writing
# into it (which can raise SettingWithCopyWarning).
X_test = X_test.assign(IsolationForest=predictions_if)

In [None]:
# Recode to a 0/1 flag exactly as on the training side: inlier (+1) -> 0, outlier (-1) -> 1.
X_test = X_test.assign(IsolationForest=X_test['IsolationForest'].map({1: 0, -1: 1}))

In [None]:
# Stage name for the test frame carrying the IsolationForest flag.
X_test_if = X_test

In [None]:
# LSCP labels: 0 inlier, 1 outlier.
predictions_lscp = lscp_clf.predict(X_test[numerical_features])
# New frame rather than writing through the X_test_if/X_test alias.
X_test_if = X_test_if.assign(LSCP=predictions_lscp)

In [None]:
# Stage name for the test frame carrying the IsolationForest and LSCP flags.
X_test_if_lscp = X_test_if

In [None]:
# KNN labels: 0 inlier, 1 outlier.
predictions_knn = knn_clf.predict(X_test[numerical_features])
# New frame rather than writing through the alias.
X_test_if_lscp = X_test_if_lscp.assign(Knn=predictions_knn)

In [None]:
# Test frame fed to the model: engineered features plus the three detector flags.
final_test_data = X_test_if_lscp

In [None]:
# Number of detectors (0-3) that flagged each test row.
final_test_data['CountDetection'] = final_test_data[['IsolationForest', 'LSCP', 'Knn']].sum(axis=1)

In [None]:
# Select the training features by name and in training order, so the prediction does
# not depend on the test frame happening to have the same column layout.
final_result = model.predict(final_test_data[model.feature_names_])

In [None]:
def save_predictions_xente(file_name, transactions_list, predictions_list):
    """Write a ``TransactionId,FraudResult`` CSV submission file.

    Parameters
    ----------
    file_name : str or path-like
        Destination path; an existing file is overwritten.
    transactions_list : iterable
        Transaction identifiers, one per output row.
    predictions_list : iterable
        Predicted labels aligned with ``transactions_list``; each is written via ``int()``.

    Raises
    ------
    ValueError
        If the two inputs have different lengths. ``zip`` would otherwise silently
        drop the unmatched tail and produce an incomplete submission.
    """
    transactions = list(transactions_list)
    predictions = list(predictions_list)
    if len(transactions) != len(predictions):
        raise ValueError('got {0} transaction ids but {1} predictions'.format(
            len(transactions), len(predictions)))
    # `with` closes the file even if a write raises.
    with open(file_name, 'w') as file:
        file.write('TransactionId,FraudResult\n')
        for trans_id, value in zip(transactions, predictions):
            file.write('{0},{1}\n'.format(trans_id, int(value)))

In [None]:
output_file = 'xente_predictions.txt'
# Take the ids from the same frame the predictions were made on. The Spark joins that build
# the test features do not guarantee row order (and inner joins can drop rows), so ids read
# from the raw test_backup frame may not line up with final_result.
X_test_transactions_list = list(final_test_data['TransactionId'])

In [None]:
# Write the submission: one "TransactionId,FraudResult" row per id/prediction pair.
save_predictions_xente(output_file, X_test_transactions_list, final_result)

In [None]:
# Completion marker ("Concluido" is Portuguese for "finished").
print("Concluido")