In [2]:
from pyspark.ml.clustering import KMeans,BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import SparkSession
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import logging
import pandas as pd
import pyarrow.parquet as pq
import numpy as np
import os
from pyspark.ml.functions import vector_to_array
from itertools import chain
import databricks.koalas as ks
from pyspark.sql import DataFrame
from functools import reduce

# Build (or reuse) the Spark session used by every cell below:
# master "local[1]", application name "sunshine_v2".
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("sunshine_v2")
    .getOrCreate()
)

# SparkContext backing the session above.
sc = spark.sparkContext

### Fonction des détections communes

In [4]:
def commun1(date):
    """Return the detections common to FRA and Sunshine for a single day.

    The detection list CSV is restricted to rows whose ``Date_DETECTION``
    (parsed as ``yyyy-MM-dd``) matches ``date``, then inner-joined on
    ``msisdn`` with that day's prediction CSV. According to the original
    notebook note, the fraud list spans 1-22 December 2021.

    Parameters
    ----------
    date : str
        Day to extract, formatted ``yyyyMMdd`` (e.g. ``"20211201"``).

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``date_appel``, ``distributeur``, ``nbre_fois_detectes``,
        ``moyenne_probability``, ``probability`` and ``msisdn``.
    """
    # NOTE(review): both input paths are absolute and machine-specific.
    detections_path = "/Users/youssouf/Downloads/SIMBOX_SUNSHINE_DEC_0122.csv"
    predictions_path = f"/Users/youssouf/Downloads/prediction_rf_{date}.csv"
    wanted_columns = [
        "date_appel",
        "distributeur",
        "nbre_fois_detectes",
        "moyenne_probability",
        "probability",
        "msisdn",
    ]

    # Detection date re-expressed as yyyyMMdd so it can be compared with `date`.
    detection_day = date_format(to_date(col("Date_DETECTION"), "yyyy-MM-dd"), "yyyyMMdd")

    detections = (
        spark.read
        .option("header", True)
        .option("delimiter", ",")
        .csv(detections_path)
        .withColumn("date_detection", detection_day)
        .withColumnRenamed("MSISDN_SB", "msisdn")
        .filter(col("date_detection") == date)
    )

    predictions = (
        spark.read
        .option("header", True)
        .option("delimiter", ";")
        .csv(predictions_path)
    )

    return predictions.join(detections, ["msisdn"], "inner").select(*wanted_columns)

In [5]:
commun1("20211201").show(2) ## common detections obtained for day 20211201

+----------+------------+------------------+-------------------+-----------------+----------+
|date_appel|distributeur|nbre_fois_detectes|moyenne_probability|      probability|    msisdn|
+----------+------------+------------------+-------------------+-----------------+----------+
|  20211201|         SII|                 1|   0.65886954361059| 0.65886954361059|0703152224|
|  20211201|         AJS|                 1|  0.718101178241166|0.718101178241166|0703624390|
+----------+------------+------------------+-------------------+-----------------+----------+
only showing top 2 rows



### Fonctions de calcul des Intervalles de Confiance

In [6]:
def confidence_interval(day, alpha):
    """Print and return the central percentile interval of one day's common probabilities.

    The bounds are empirical percentiles of the ``probability`` values
    returned by ``commun1(day)``: the lower bound is the
    ``(1 - alpha) / 2`` quantile and the upper bound the
    ``alpha + (1 - alpha) / 2`` quantile. This is a percentile interval of
    the observed values, not a parametric interval around their mean.

    Parameters
    ----------
    day : str
        Day to analyse, formatted ``yyyyMMdd``.
    alpha : float
        Coverage of the interval as a fraction (e.g. ``0.95`` keeps the
        central 95% of the values).

    Returns
    -------
    list
        ``[lower, upper]`` bounds, on the same 0-1 scale as the probabilities.

    Raises
    ------
    ValueError
        If there is no common detection for ``day``.
    """
    # Project to the single column that is used before collecting to the driver.
    probabilities = (
        commun1(day)
        .select("probability")
        .toPandas()["probability"]
        .astype(float)
        .to_list()
    )
    if not probabilities:
        raise ValueError(f"no common detection found for day {day!r}")

    tail = (1.0 - alpha) / 2.0
    lower = np.percentile(probabilities, tail * 100)
    upper = np.percentile(probabilities, (alpha + tail) * 100)
    print('%.1f confidence interval %.1f%% and %.1f%%' % (alpha*100, lower*100, upper*100))
    return [lower, upper]

In [7]:
# Percentile interval of the common-detection probabilities for 20211201, alpha = 0.95.
confidence_interval("20211201", 0.95)

95.0 confidence interval 53.2% and 77.8%


In [8]:
# Percentile interval of the common-detection probabilities for 20211202, alpha = 0.99.
confidence_interval("20211202", 0.99)

99.0 confidence interval 51.0% and 83.6%


In [9]:
def confidence_interval1(df, alpha):
    """Print and return the central percentile interval of ``df["probability"]``.

    Same computation as ``confidence_interval`` but applied to an already
    loaded Spark DataFrame: the bounds are the ``(1 - alpha) / 2`` and
    ``alpha + (1 - alpha) / 2`` empirical quantiles of the ``probability``
    column (a percentile interval of the observed values, not a parametric
    interval around their mean).

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame exposing a ``probability`` column convertible to float.
    alpha : float
        Coverage of the interval as a fraction (e.g. ``0.95``).

    Returns
    -------
    list
        ``[lower, upper]`` bounds, on the same scale as the probabilities.

    Raises
    ------
    ValueError
        If ``df`` has no rows.
    """
    # Only `probability` is used, so avoid shipping every column to the driver.
    probabilities = (
        df.select("probability")
        .toPandas()["probability"]
        .astype(float)
        .to_list()
    )
    if not probabilities:
        raise ValueError("cannot compute an interval from an empty DataFrame")

    tail = (1.0 - alpha) / 2.0
    lower = np.percentile(probabilities, tail * 100)
    upper = np.percentile(probabilities, (alpha + tail) * 100)
    print('%.1f confidence interval %.1f%% and %.1f%%' % (alpha*100, lower*100, upper*100))
    return [lower, upper]

### Fonction d'union des df

In [10]:
## 2 fonctions : *ar* et *union_commun_pred*
## *ar* est juste une fonction intermédiaire
## *union_commun_pred* retourne les détections communes (FRA - SUNSHINE) et les prédictions ; elle prend en paramètre le nombre de jours
## à unir...
def ar(x):
    """Prefix ``x`` with a single "0" when it is at most one character long.

    Strings of two or more characters are returned unchanged.
    """
    return x if len(x) > 1 else "0" + x

def union_commun_pred(nb):
    """Union the common detections and the predictions of the first ``nb`` days of December 2021.

    For each day from ``20211201`` up to day ``nb`` of the month, the
    common detections (``commun1``) and the day's prediction CSV are
    collected, then each list is unioned into a single DataFrame.

    Parameters
    ----------
    nb : int
        Number of days to union, counted from 1 December 2021
        (``1 <= nb <= 31``).

    Returns
    -------
    list
        ``[union_commun, union_pred]`` where ``union_commun`` has the
        columns returned by ``commun1`` and ``union_pred`` has the columns
        ``msisdn``, ``probability`` and ``date_appel``.

    Raises
    ------
    ValueError
        If ``nb`` is outside ``1..31``.
    """
    if not 1 <= nb <= 31:
        raise ValueError(f"nb must be between 1 and 31, got {nb!r}")

    commun_frames = []
    prediction_frames = []

    # Only the requested days are read: day numbers 1..nb, zero-padded to two digits.
    for day_number in range(1, nb + 1):
        day = f"202112{day_number:02d}"
        commun_frames.append(commun1(day))
        prediction_frames.append(
            spark.read
            .option("header", True)
            .option("delimiter", ";")
            .csv(f"/Users/youssouf/Downloads/prediction_rf_{day}.csv")
            .select("msisdn", "probability", "date_appel")
        )

    # DataFrame.union resolves columns by position; within each list every
    # frame is built with the same column order, so the union is well-defined.
    union_commun = reduce(DataFrame.union, commun_frames)
    union_pred = reduce(DataFrame.union, prediction_frames)
    return [union_commun, union_pred]

In [11]:
union_commun_pred(2)[0].show(2)  # common detections requested for 2 days (20211201 - 20211202)

+----------+------------+------------------+-------------------+-----------------+----------+
|date_appel|distributeur|nbre_fois_detectes|moyenne_probability|      probability|    msisdn|
+----------+------------+------------------+-------------------+-----------------+----------+
|  20211201|         SII|                 1|   0.65886954361059| 0.65886954361059|0703152224|
|  20211201|         AJS|                 1|  0.718101178241166|0.718101178241166|0703624390|
+----------+------------+------------------+-------------------+-----------------+----------+
only showing top 2 rows



In [12]:
union_commun_pred(2)[1].show(2)  # predictions requested for 2 days (20211201 - 20211202)

+----------+------------------+----------+
|    msisdn|       probability|date_appel|
+----------+------------------+----------+
|0700004911|0.5160806693366838|  20211201|
|0700006491|0.6706355589116454|  20211201|
+----------+------------------+----------+
only showing top 2 rows



In [13]:
union_commun_pred(3)[0].show(2)  # common detections requested for 3 days (20211201 - 20211202 - 20211203)

+----------+------------+------------------+-------------------+-----------------+----------+
|date_appel|distributeur|nbre_fois_detectes|moyenne_probability|      probability|    msisdn|
+----------+------------+------------------+-------------------+-----------------+----------+
|  20211201|         SII|                 1|   0.65886954361059| 0.65886954361059|0703152224|
|  20211201|         AJS|                 1|  0.718101178241166|0.718101178241166|0703624390|
+----------+------------+------------------+-------------------+-----------------+----------+
only showing top 2 rows



In [14]:
union_commun_pred(3)[1].show(2)  # predictions requested for 3 days (20211201 - 20211202 - 20211203)

+----------+------------------+----------+
|    msisdn|       probability|date_appel|
+----------+------------------+----------+
|0700004911|0.5160806693366838|  20211201|
|0700006491|0.6706355589116454|  20211201|
+----------+------------------+----------+
only showing top 2 rows



### Fonctions de filtre en utilisant les bornes des intervalles de confiance

In [17]:
def cf_filter2(date, alpha):
    """Count rows before and after filtering one day's predictions by the percentile interval.

    The interval bounds are computed by ``confidence_interval1`` on the
    common detections of ``date``; the day's predictions are then restricted
    to probabilities inside those bounds and joined again with the
    detection list.

    Parameters
    ----------
    date : str
        Day to analyse, formatted ``yyyyMMdd``.
    alpha : float
        Coverage passed to ``confidence_interval1`` (e.g. ``0.95``).

    Returns
    -------
    dict
        ``{"BEFORE": [n_common, n_predictions], "AFTER": [n_common, n_predictions]}``
        where every value is a distinct-row count, before and after the
        probability filter.
    """
    # NOTE(review): both input paths are absolute and machine-specific.
    wanted_columns = [
        "date_appel",
        "distributeur",
        "nbre_fois_detectes",
        "moyenne_probability",
        "probability",
        "msisdn",
    ]

    # Detection date re-expressed as yyyyMMdd so it can be compared with `date`.
    detection_day = date_format(to_date(col("Date_DETECTION"), "yyyy-MM-dd"), "yyyyMMdd")
    detections = (
        spark.read
        .option("header", True)
        .option("delimiter", ",")
        .csv("/Users/youssouf/Downloads/SIMBOX_SUNSHINE_DEC_0122.csv")
        .withColumn("date_detection", detection_day)
        .withColumnRenamed("MSISDN_SB", "msisdn")
        .filter(col("date_detection") == date)
    )

    predictions = (
        spark.read
        .option("header", True)
        .option("delimiter", ";")
        .csv(f"/Users/youssouf/Downloads/prediction_rf_{date}.csv")
    )

    def _common_with_detections(prediction_frame):
        # Predictions that also appear in the day's detection list.
        return prediction_frame.join(detections, ["msisdn"], "inner").select(*wanted_columns)

    common = _common_with_detections(predictions)
    common_before = common.distinct().count()

    bounds = confidence_interval1(common, alpha)
    lower = bounds[0]
    upper = bounds[1]

    probability = col("probability")
    predictions_kept = predictions.filter((probability >= lower) & (probability <= upper))
    common_after = _common_with_detections(predictions_kept).distinct().count()

    return {
        "BEFORE": [common_before, predictions.distinct().count()],
        "AFTER": [common_after, predictions_kept.distinct().count()],
    }

In [18]:
# Distinct-row counts before/after the interval filter for day 20211201, alpha = 0.99.
cf_filter2("20211201", 0.99)

99.0 confidence interval 52.7% and 77.8%


{'BEFORE': [146, 980611], 'AFTER': [145, 534987]}

In [19]:
# Distinct-row counts before/after the interval filter for day 20211202, alpha = 0.95.
cf_filter2("20211202", 0.95)

95.0 confidence interval 52.0% and 83.5%


{'BEFORE': [130, 660417], 'AFTER': [126, 430834]}

In [16]:
def cf_filter3(nb, alpha):
    """Count rows before and after the percentile-interval filter on multi-day unions.

    Works on the frames returned by ``union_commun_pred(nb)``. For each of
    the two frames (common detections, predictions) the interval is computed
    by ``confidence_interval1`` on that frame's own ``probability`` column,
    and distinct rows are counted before and after keeping only the
    probabilities inside the bounds.

    Parameters
    ----------
    nb : int
        Number of days to union, forwarded to ``union_commun_pred``.
    alpha : float
        Coverage passed to ``confidence_interval1`` (e.g. ``0.95``).

    Returns
    -------
    dict
        ``{"PRED": {"BEFORE": ..., "AFTER": ...}, "COMMUN": {"BEFORE": ..., "AFTER": ...}}``
        with the prediction counts under ``"PRED"`` and the common-detection
        counts under ``"COMMUN"``.
    """
    frames = union_commun_pred(nb)
    commun = frames[0]
    pred = frames[1]
    probability = col("probability")

    print(" ***** COMMUN ***** ")
    bounds = confidence_interval1(commun, alpha)
    lower, upper = bounds[0], bounds[1]
    commun_before = commun.distinct().count()
    commun_after = (
        commun.filter((probability >= lower) & (probability <= upper))
        .distinct()
        .count()
    )

    print(" ***** PRED ***** ")
    # NOTE(review): these bounds come from the predictions themselves, whereas
    # cf_filter2 applies the common-detection bounds to the predictions.
    # Confirm which behaviour is intended.
    bounds = confidence_interval1(pred, alpha)
    lower, upper = bounds[0], bounds[1]
    pred_before = pred.distinct().count()
    pred_after = (
        pred.filter((probability >= lower) & (probability <= upper))
        .distinct()
        .count()
    )

    # Each label now carries the counts of the frame it names.
    return {
        "PRED": {"BEFORE": pred_before, "AFTER": pred_after},
        "COMMUN": {"BEFORE": commun_before, "AFTER": commun_after},
    }