In [1]:
import sys
from IPython.display import display, Math, Latex
import matplotlib.pyplot as plt
import plotly.plotly as py
import numpy as np
import seaborn as sns
import pandas as pd
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf,col
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
print(sys.version)

3.6.2 |Anaconda, Inc.| (default, Sep 30 2017, 18:42:57) 
[GCC 7.2.0]


# Presentation

Il existe dans les banques une équipe complète qui s'occupe de vérifier ces erreurs et de relancer ces calculs pendant la journée afin de minimiser les dégâts. Les membres de cette équipe essayent chaque jour de limiter les défaillances des énormes calculateurs. Cependant, vu l'énorme volume de données, toujours en augmentation, ces opérateurs ont une charge très importante.  


Chaque jour les membres de cette équipe reçoivent un jeu de données qui contient des centaines de millions de lignes. Chaque ligne correspond à un risque calculé par rapport à un scénario et une maturité. Ce risque est recalculé tous les jours en fonction des données de marché et de plusieurs autres facteurs. Cependant pour détecter ces erreurs, les opérateurs sont contraints à faire des agrégations sur les données et grâce à ces agrégations, ils essayent d'estimer si les résultats sont bons ou pas et relancent tout un scop de données, s'ils doutent de son résultat.

On n'a aucune indication dur les données pour savoir si une ligne (maturité d'un deal) correspond à une anomalie ou pas. Le seul indicateur qui nous permet d'identifier si le deal est une dernière version (correcte) ou une version intermédiaire (anomalie) c'est une colonne technique nommée $\textbf{ENDDATE}$. Chaque version du deal est caractérisée par une STARTDATE et une ENDDATE. la STARTDATE indique l'heur à laquelle les calculs de risque sur le deal ont été lancés. La ENDATE peut prendre deux valeurs:

- ENDDATE $\textbf{=}$ NULL : Dernière version du deal (version correcte).
- ENDDATE $\textbf{!=}$ NULL : Version intermédiaire (anomalie). La valeur de la ENDDATE de cette dernière sera égale à la valeur de la STARTDATE de la version suivante.
	
Ci-dessous on peut voir un résumé des étapes par lesquelles passe un deal dans une journée:

<img src="DEAL.png">

# Generate Random  data

In [4]:
import random
import datetime
import time

def date_time_range(start, end, format , prop):

    stime = time.mktime(time.strptime(start, format))
    etime = time.mktime(time.strptime(end, format))

    ptime = stime + prop * (etime - stime)

    return time.strftime(format, time.localtime(ptime))

def time_data(start_date_data,end_date_data):
    start_date = date_time_range(start_date_data,end_date_data,'%d/%m/%Y %I:%M %p', random.random())
    asofDate = start_date[0:10]
    rand = random.random()
    if rand > 0.5:
        end_date = None
    else:
        end_date = date_time_range(start_date, asofDate + " 11:59 PM",'%d/%m/%Y %I:%M %p', random.random())
    return asofDate,start_date,end_date

In [5]:
"""
Input : N_data : nombre de données à générer
        start_date_data : date du début des données générées
        end_date_data : date de fin des données générées
output : données d'analyse de risque généré aléatoirement
"""
def generate_market_risk_data(N_data,start_date_data,end_date_data):
    random_Data = pd.DataFrame(np.random.randint(0,100,size=(N_data, 2)),\
                    columns=["KEY","RISK_AMOUNT"])
    market_risk_data = sqlContext.createDataFrame(random_Data)
# Genrate random Maturity
    maturity = ["1Y","2Y","3M","2M","MAR18","SEP18","DEC17"]

    rdd = market_risk_data.rdd.map(lambda x: (x['KEY'], random.choice(maturity),\
                                          time_data(start_date_data,end_date_data), x['RISK_AMOUNT']))\
                    .map(lambda x: (x[2][0],x[0],x[1],x[2][1],x[2][2],x[3]))
    market_risk_data = rdd.toDF(['ASOFDATE','KEY','MATURITY', 'START_DATE','END_DATE','RISK_AMOUNT'])
    market_risk_data.show()
    return market_risk_data

risk_data = generate_market_risk_data(10000,"30/11/2016 1:30 PM", "27/12/2016 11:59 PM")

+----------+---+--------+-------------------+-------------------+-----------+
|  ASOFDATE|KEY|MATURITY|         START_DATE|           END_DATE|RISK_AMOUNT|
+----------+---+--------+-------------------+-------------------+-----------+
|14/12/2016| 22|      1Y|14/12/2016 04:36 PM|14/12/2016 10:50 PM|          6|
|16/12/2016| 31|   SEP18|16/12/2016 01:43 AM|16/12/2016 08:59 PM|          3|
|04/12/2016| 22|      2M|04/12/2016 08:58 PM|04/12/2016 09:32 PM|          9|
|23/12/2016| 86|   MAR18|23/12/2016 07:59 PM|23/12/2016 09:04 PM|         68|
|14/12/2016| 58|   SEP18|14/12/2016 01:42 AM|14/12/2016 05:35 AM|         14|
|12/12/2016| 35|      2Y|12/12/2016 08:47 AM|               null|          0|
|16/12/2016| 75|      1Y|16/12/2016 10:49 AM|16/12/2016 12:30 PM|         25|
|08/12/2016| 18|   MAR18|08/12/2016 08:30 PM|               null|         57|
|04/12/2016| 27|      3M|04/12/2016 08:52 PM|04/12/2016 11:35 PM|          7|
|20/12/2016| 43|   SEP18|20/12/2016 04:44 AM|20/12/2016 03:17 PM

# Preprocess data

Les données traitées sont sous le format $\textbf{spark DataFrame}$ qui est une table structurée en forme de tableau avec en plus des metadatas permettant à spark de faire des optimisations sur le traitement des données. Cette table ne contient pas d'index. Elle ressemble à une table SQL avec un système de (clé, valeur). La clé d'un deal est constitué par plusieurs colonnes. Pour simplifier la compréhension des données, on va considérer qu'un deal a une seule colonne comme clé et que cette clé est unique.

La variation par rapport au bon résultat de la journée peut être égale à 0 pour des versions intermédiaires. Ce qui signifie que la relance était inutile et la même donnée se trouvent à la fois sur la version relancée (par erreur) et la bonne version du jour. C'est grâce à un seuil fixé sur cette colonne qu'on pourra $\textbf{labéliser}$ les anomalies dans le data set.

Cette colonne permet aussi d'éliminer les réplications de la même donnée (ou sensiblement proches). 

Grâce à la colonne rajouté le problème s'est transformé d'un problème de machine learning non superviser à un problème supervisé. l'énorme volumétrie des données incite à la recherche d'une solution simple d'un point de vue calcule afin de classifier les deals en deals contenant des anomalies et des deals corrects. Cette solution doit être adapté pour recevoir les informations du deal le matin et prédire si elles sont bonnes ou pas.

In [6]:
"""
Input : données de risque de marché
output : données avec un colonne permettant de renseigné sur la variation d'une version intermédiaire
         par rapport au résultat final de la journée
"""


from pyspark.sql.functions import broadcast,lit

def variation(y,x): 
    if x != 0:
        return (y-x)/x
    else:
        return None
    
risk_variation = udf(variation, FloatType())

def add_Risk_Variation(risk_data):
    last_versions = risk_data.filter(risk_data.END_DATE.isNull())
    intermediate = risk_data.filter(risk_data.END_DATE.isNotNull())
    last_versions_copy = last_versions.select('ASOFDATE','RISK_AMOUNT','KEY','MATURITY')\
                                             .withColumnRenamed("RISK_AMOUNT","LAST_RISK_AMOUNT")\
                                             .withColumnRenamed("KEY","KEY2")\
                                             .withColumnRenamed("ASOFDATE","ASOFDATE2")\
                                             .withColumnRenamed("MATURITY","MATURITY2")
    joined_intermediate_last_version = intermediate.join(last_versions_copy, (intermediate.KEY == last_versions_copy.KEY2)&\
                                                        (intermediate.ASOFDATE == last_versions_copy.ASOFDATE2)&\
                                                        (intermediate.MATURITY == last_versions_copy.MATURITY2))
    data_with_variation = joined_intermediate_last_version.withColumn("VARIATION_AMOUNT",\
                    risk_variation(joined_intermediate_last_version.RISK_AMOUNT,joined_intermediate_last_version.LAST_RISK_AMOUNT))
    last_versions = last_versions.withColumn("Variation_Amount", lit(None).cast(FloatType()))\
                                 .withColumn("LAST_RISK_AMOUNT", lit(None).cast(FloatType()))
    drop_list = ['KEY2', 'ASOFDATE2','MATURITY2']

    data_with_variation = data_with_variation.select([column for column in data_with_variation.columns if column not in drop_list])
    Final_Frame = last_versions.unionAll(data_with_variation) 
    Final_Frame.show()
    return Final_Frame

In [8]:
enriched_market_risk_data = add_Risk_Variation(risk_data)

+----------+---+--------+-------------------+--------+-----------+----------------+----------------+
|  ASOFDATE|KEY|MATURITY|         START_DATE|END_DATE|RISK_AMOUNT|Variation_Amount|LAST_RISK_AMOUNT|
+----------+---+--------+-------------------+--------+-----------+----------------+----------------+
|12/12/2016| 35|      2Y|12/12/2016 08:47 AM|    null|          0|            null|            null|
|08/12/2016| 18|   MAR18|08/12/2016 08:30 PM|    null|         57|            null|            null|
|01/12/2016| 91|      1Y|01/12/2016 08:38 AM|    null|         90|            null|            null|
|05/12/2016| 87|      2Y|05/12/2016 11:42 PM|    null|          0|            null|            null|
|13/12/2016|  3|      2M|13/12/2016 05:15 PM|    null|         28|            null|            null|
|10/12/2016| 19|   SEP18|10/12/2016 03:32 PM|    null|         72|            null|            null|
|06/12/2016| 51|   DEC17|06/12/2016 05:05 AM|    null|         23|            null|        

In [10]:
"""
input : données d'analyse de risque (chaque ligne représente un rique par rapport à une maturité)
output: trasposition des données (maturité en colonne)
"""

import pyspark.sql.functions as func

def pivot_data(data):
    # add column for thresholding
    add_minimum_variation = data.groupBy(["ASOFDATE","KEY","START_DATE"]).agg(func.min((data.Variation_Amount))\
                    .alias("MINIMUM_VARIATION"))
    # Pivot data
    pivot_maturities = data.groupBy(["ASOFDATE","KEY","START_DATE"]).pivot("MATURITY").agg(func.first("RISK_AMOUNT"))
    pivot_maturities = pivot_maturities.withColumnRenamed("ASOFDATE","ASOFDATE1")\
                                        .withColumnRenamed("START_DATE","START_DATE1")\
                                        .withColumnRenamed("KEY","KEY1")\
                                        .na.fill(0)
    
    pivoted_data = add_minimum_variation.join(pivot_maturities, (add_minimum_variation.ASOFDATE == pivot_maturities.ASOFDATE1)&\
                                             (add_minimum_variation.KEY == pivot_maturities.KEY1)&\
                                             (add_minimum_variation.START_DATE == pivot_maturities.START_DATE1))
    drop_list = ['KEY1', 'ASOFDATE1','MATURITY1','START_DATE1']

    pivoted_data = pivoted_data.select([column for column in pivoted_data.columns if column not in drop_list])
    return pivoted_data
    

In [11]:
pivoted_data = pivot_data(enriched_market_risk_data)
pivoted_data.show()

+----------+---+-------------------+-----------------+---+---+---+---+-----+-----+-----+
|  ASOFDATE|KEY|         START_DATE|MINIMUM_VARIATION| 1Y| 2M| 2Y| 3M|DEC17|MAR18|SEP18|
+----------+---+-------------------+-----------------+---+---+---+---+-----+-----+-----+
|26/12/2016| 50|26/12/2016 07:10 AM|             null|  0|  0|  0|  0|    0|   48|    0|
|04/12/2016| 58|04/12/2016 09:11 PM|             52.0|  0|  0|  0|  0|   98|    0|    0|
|04/12/2016| 88|04/12/2016 03:01 PM|             null|  0| 73|  0|  0|    0|    0|    0|
|19/12/2016| 11|19/12/2016 08:21 PM|             null|  0|  0|  0|  0|   71|    0|    0|
|05/12/2016| 86|05/12/2016 03:06 PM|             null|  0|  0|  0|  0|    0|   55|    0|
|10/12/2016| 69|10/12/2016 10:16 PM|             null|  0|  0|  0|  0|   30|    0|    0|
|26/12/2016| 33|26/12/2016 09:28 AM|             22.0|  0|  0|  0|  0|    0|    0|   87|
|26/12/2016| 57|26/12/2016 08:33 AM|             null|  0|  0|  0|  0|    0|   87|    0|
|13/12/2016| 14|13/12

In [12]:
"""
Input : donnée d'analyse de risque de marché pivoté
Output: donnée enréchie par la variation de chacune des maturité par rapport à la valeur du jour
        ouvret précédent
"""

def variation2(y,x): 
    if (x != 0) & (x != None):
        return (y-x)/x
    else:
        return None
    
day_variation = udf(variation2, FloatType())

def day_to_day_variation(data):
    maturities_list = [col for col in data.columns if col not in ["KEY","ASOFDATE","START_DATE","MINIMUM_VARIATION"]]
    asofDates = sorted(data.select("ASOFDATE").distinct().rdd.map(lambda x : x[0]).collect())
    indexed_asofDates = [ [i[0],i[1]] for i in zip(range(len(asofDates)), asofDates)]
    indexed_asofDates_Frame = sc.parallelize(indexed_asofDates).toDF(["index_date","ASOFDATE2"])
    indexed_asofDates_Frame
    indexed_data = data.join(indexed_asofDates_Frame, data.ASOFDATE == indexed_asofDates_Frame.ASOFDATE2)
    indexed_data = indexed_data.drop("ASOFDATE2")
    indexed_data_copy = indexed_data
    columns_renamed = [col+'1' for col in indexed_data.columns]
    indexed_data_copy = indexed_data_copy.toDF(*columns_renamed)
    
    
    day_to_day_risk = indexed_data.join(indexed_data_copy, 
                                             # need versioned data
                                             #(indexed_data.START_DATE == indexed_data_copy.START_DATE1)
                                             #(indexed_data.KEY == indexed_data_copy.KEY1)&\
                                             (indexed_data.index_date == indexed_data_copy.index_date1 - 1)&\
                                             (indexed_data_copy.MINIMUM_VARIATION1.isNull()))
    drop_list = ['KEY1', 'ASOFDATE1','MATURITY1','index_date1','START_DATE1','MINIMUM_VARIATION']

    
    for mat in maturities_list:
        day_to_day_risk = day_to_day_risk.withColumn("variation_"+mat,day_variation(mat,mat+'1'))

    day_to_day_risk = day_to_day_risk.select([column for column in day_to_day_risk.columns if column not in indexed_data_copy.columns])

    day_to_day_risk.show()
    
    return day_to_day_risk

data_with_variation = day_to_day_variation(pivoted_data)

+----------+---+-------------------+-----------------+---+---+---+---+-----+-----+-----+----------+------------+------------+------------+------------+---------------+---------------+---------------+
|  ASOFDATE|KEY|         START_DATE|MINIMUM_VARIATION| 1Y| 2M| 2Y| 3M|DEC17|MAR18|SEP18|index_date|variation_1Y|variation_2M|variation_2Y|variation_3M|variation_DEC17|variation_MAR18|variation_SEP18|
+----------+---+-------------------+-----------------+---+---+---+---+-----+-----+-----+----------+------------+------------+------------+------------+---------------+---------------+---------------+
|20/12/2016|  9|20/12/2016 06:17 AM|             null|  0|  0|  0| 31|    0|    0|    0|        19|        null|        null|        -1.0|        null|           null|           null|           null|
|20/12/2016|  9|20/12/2016 06:17 AM|             null|  0|  0|  0| 31|    0|    0|    0|        19|        null|        null|        null|        null|           -1.0|           null|           null|


In [37]:
"""
Input: data  : données enrechies
       threshold: seuil pour labélisé les données
Output: labélise les données intermédiaire ayant une variation (par rapport au dernier résultat de la journée)
        supérieur à -threshold- comme anomalies
"""
from pyspark.ml.feature import VectorAssembler

def labelize_data(data,threshold):

    drop_columns = ["ASOFDATE","KEY","START_DATE","MINIMUM_VARIATION","label","index_date"]
    labeled_data = data.withColumn("label",func.when(data.MINIMUM_VARIATION > threshold, 1).otherwise(0))
    inputColumns =  [m for m in labeled_data.columns if m not in drop_columns]
    labeled_data = labeled_data.na.fill(0)
    assembler = VectorAssembler(inputCols= inputColumns,outputCol="features")
    ready_for_ML = assembler.transform(labeled_data)
    ready_for_ML.show()
    return ready_for_ML

labeled_data = labelize_data(data_with_variation,0.05)

+----------+---+-------------------+-----------------+---+---+---+---+-----+-----+-----+----------+------------+------------+------------+------------+---------------+---------------+---------------+-----+--------------------+
|  ASOFDATE|KEY|         START_DATE|MINIMUM_VARIATION| 1Y| 2M| 2Y| 3M|DEC17|MAR18|SEP18|index_date|variation_1Y|variation_2M|variation_2Y|variation_3M|variation_DEC17|variation_MAR18|variation_SEP18|label|            features|
+----------+---+-------------------+-----------------+---+---+---+---+-----+-----+-----+----------+------------+------------+------------+------------+---------------+---------------+---------------+-----+--------------------+
|27/12/2016| 74|27/12/2016 12:43 PM|             62.0|  0| 80|  0|  0|    0|    0|    0|        26|         0.0|         0.0|         0.0|         0.0|           -1.0|            0.0|            0.0|    1|(14,[1,11],[80.0,...|
|27/12/2016| 74|27/12/2016 12:43 PM|             62.0|  0| 80|  0|  0|    0|    0|    0|    

In [77]:
"""
Input : data: données prête pour la partie machine learning
        date: date de séparation entre le train et le test
Output: modéle de classification des anomalies (randomForest)
"""

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def data_classifier(data,date):
    labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)


# Split the data into training and test sets (30% held out for testing)
    trainingData = data.filter(data.ASOFDATE < date)
    testData = data.filter(data.ASOFDATE >= date)

# Train a RandomForest model.
    rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features", numTrees=3)

# Convert indexed labels back to original labels.
    labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                                   labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
    pipeline = Pipeline(stages=[labelIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
    model = pipeline.fit(trainingData)

# Make predictions.
    predictions = model.transform(testData)

    evaluator = MulticlassClassificationEvaluator(
            labelCol="indexedLabel", predictionCol="prediction", metricName="f1")
    f1 = evaluator.evaluate(predictions)
    return model,predictions,f1

In [78]:
model,prediction,evaluator =  data_classifier(labeled_data,"20/12/2016")

In [79]:
print(f1)

1.0


In [80]:
predictionction.show()

+----------+---+-------------------+-----------------+---+---+---+---+-----+-----+-----+----------+------------+------------+------------+------------+---------------+---------------+---------------+-----+--------------------+------------+-------------+--------------------+----------+--------------+
|  ASOFDATE|KEY|         START_DATE|MINIMUM_VARIATION| 1Y| 2M| 2Y| 3M|DEC17|MAR18|SEP18|index_date|variation_1Y|variation_2M|variation_2Y|variation_3M|variation_DEC17|variation_MAR18|variation_SEP18|label|            features|indexedLabel|rawPrediction|         probability|prediction|predictedLabel|
+----------+---+-------------------+-----------------+---+---+---+---+-----+-----+-----+----------+------------+------------+------------+------------+---------------+---------------+---------------+-----+--------------------+------------+-------------+--------------------+----------+--------------+
|20/12/2016| 71|20/12/2016 08:47 AM|              0.0|  0| 52|  0|  0|    0|    0|    0|        1