In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

In [2]:
spark = SparkSession.builder.appName("score_lowcost").getOrCreate()
sc = spark.sparkContext

In [3]:
perimetre = spark.read.csv("data_clients/sample_perimetre.csv", header=True)
histo_client_raw = spark.read.csv("data_clients/sample_histo_client.csv", header=True)
histo_train_raw = spark.read.csv("data_clients/sample_histo_train.csv", header=True)
histo_lowcost_raw = spark.read.csv("data_clients/sample_histo_lowcost.csv", header=True)
visites_raw = spark.read.csv("data_clients/sample_visites.csv", header=True)

# Columns conversion

In [4]:
# The function

def cast_columns_of_df(df, cols_to_cast, col_to_keep, cast_type='double'):
    """cast continuous columns into double since all columns are """
    return df.select(col_to_keep + [(df[feature].cast('double'))
                    for feature in cols_to_cast if 'ID_CLIENT' not in feature])

In [5]:
# Application

client_cols_to_keep = ["ID_CLIENT", 'LBL_STATUT_CLT','LBL_GEO_AIR',
            'LBL_SEG_COMPORTEMENTAL','LBL_GEO_TRAIN','LBL_GRP_SEGMENT_NL',
            'LBL_SEGMENT_ANTICIPATION','FLG_CMD_CARTE_1225']

histo_train = cast_columns_of_df(histo_train_raw, histo_train_raw.columns,["ID_CLIENT"], cast_type='double')
histo_lowcost = cast_columns_of_df(histo_lowcost_raw, histo_lowcost_raw.columns,["ID_CLIENT"], cast_type='double')

visites = cast_columns_of_df(visites_raw, visites_raw.columns,["ID_CLIENT"], cast_type='double')

histo_client = cast_columns_of_df(histo_client_raw,["anciennete", "recence_cmd", "AGE"],
                                  client_cols_to_keep,cast_type='double')

## Joining

In [6]:
col_to_keep = "ID_CLIENT"

lowcost = perimetre.join(histo_client_raw, how="left", on=col_to_keep).join(histo_train_raw, how="left", on=col_to_keep).join(histo_lowcost_raw, how="left", on=col_to_keep).join(visites_raw, how="left", on=col_to_keep)

### Checking same number of users

In [12]:
lowcost.count() == perimetre.count()

True

### Save data to disk

In [None]:
#lowcost.write.save("data_clients/lowcost.csv",format="csv")

In [16]:
# Number of columns
list(enumerate(lowcost.columns))

[(0, 'ID_CLIENT'),
 (1, 'anciennete'),
 (2, 'recence_cmd'),
 (3, 'AGE'),
 (4, 'LBL_STATUT_CLT'),
 (5, 'LBL_GEO_AIR'),
 (6, 'LBL_GRP_SEGMENT_NL'),
 (7, 'LBL_SEG_COMPORTEMENTAL'),
 (8, 'LBL_GEO_TRAIN'),
 (9, 'LBL_SEGMENT_ANTICIPATION'),
 (10, 'FLG_CMD_CARTE_1225'),
 (11, 'nb_od'),
 (12, 'mean_nb_passagers'),
 (13, 'mean_duree_voyage'),
 (14, 'mean_mt_voyage'),
 (15, 'mean_tarif_loisir'),
 (16, 'mean_classe_1'),
 (17, 'mean_pointe'),
 (18, 'mean_depart_we'),
 (19, 'flg_cmd_lowcost'),
 (20, 'flg_track_nl_lowcost'),
 (21, 'flg_track_nl'),
 (22, 'days_since_last_visit'),
 (23, 'tx_conversion')]

In [17]:
# Distinct values
lowcost.select("LBL_STATUT_CLT").distinct().show()

+--------------------+
|      LBL_STATUT_CLT|
+--------------------+
|         Moyen moins|
|Non present dans ...|
|    Nouveau prospect|
|            Prospect|
|          Tres petit|
|                null|
|               Petit|
|             Inactif|
|       Nouveau actif|
|               Grand|
|          Tres grand|
|          Moyen plus|
+--------------------+



# NaN handling

In [7]:
def input_df(df):
    ds = df.select('ID_CLIENT',
    f.when(df.LBL_GEO_TRAIN.isin(['Toulouse', 'Lille', 'Dijon',
                                  'Lyon', 'Marseille', 'Paris',
                                  'Nice', 'Limoges','Rouen','Rennes',
                                  'Montpellier', 'Bordeaux', 'Metz',
                                  'Strasbourg']), df.LBL_GEO_TRAIN)\
               .otherwise('na').alias('geo_train'),
    f.when(df.LBL_GEO_AIR.isin(['Aéroports de Paris Orly',
                                'Aéroport de Bâle-Mulhouse / Bassel',
                                'Aéroport Lille Lesquin', 'Aéroport de Rennes',
                                'Aéroport de Nantes Atlantique',
                                'Aéroport de Marseille Provence  (MRS)', 
                                'Aéroport de Bordeaux Mérignac',
                                'Aéroports de Paris Roissy-Charles-de Gaulle', 
                                "Aéroport de Nice Côte d'Azur",
                                'Aéroport de Strasbourg',
                                'Aéroport de Lyon - Saint Exupéry', 
                                'Aéroport de Toulouse Blagnac']), df.LBL_GEO_AIR)\
               .otherwise('na').alias('geo_air'),
    f.when(df.FLG_CMD_CARTE_1225 == '1', '1')\
                   .otherwise('0').alias('cc_jeunes'),
    f.when(df.LBL_STATUT_CLT.isin(['Tres grand', 'Nouveau actif',
                                   'Moyen moins', ' Prospect', ' Petit',
                                   'Inactif', 'Tres petit',
                                   'Nouveau prospect', 'Moyen plus',
                                   'Grand']), df.LBL_STATUT_CLT)\
                   .otherwise('na').alias('segt_rfm'),
    f.when(df.LBL_SEGMENT_ANTICIPATION.isin(['Peu Anticipateur', 'Tres Anticipateur',
                                             'Anticipateur', 'Mixte', 'Non Anticipateur',
                                             'Non Defini']), df.LBL_SEGMENT_ANTICIPATION)\
                   .otherwise('na').alias('segt_anticipation'),
    f.when(df.LBL_SEG_COMPORTEMENTAL.isin(['Mono-commande',
                                           'Comportement Pro',
                                           'Exclusifs Agence', 
                                           'Anticipateurs Methodiques',
                                           'Chasseurs Bons Plans', 
                                           'Rythmes scolaires', 'Nouveaux',
                                           'Sans contraintes']),
           df.LBL_SEG_COMPORTEMENTAL).otherwise('na').alias('segt_comportemental'), 
    f.when(df.LBL_GRP_SEGMENT_NL.isin(['Endormi', 'Spectateur', 'Acteur',
                                       'Eteint', 'Non defini']),
           df.LBL_GRP_SEGMENT_NL).otherwise('na').alias('segt_nl'),
    f.when(((df.AGE > 0) & (df.AGE < 100)), df.AGE)\
                   .otherwise(-1).alias('age'),
    f.when(df.recence_cmd >= 0, df.recence_cmd)\
                   .otherwise(-1).alias('recence_cmd'),
    f.when(((df.mean_duree_voyage > 0) & (df.mean_duree_voyage < 750)),
           df.mean_duree_voyage).otherwise(-1).alias('mean_duree_voyage'),
    f.when(df.days_since_last_visit >= 0, df.days_since_last_visit)\
                   .otherwise(-1).alias('recence_visite'),
    f.when(df.mean_mt_voyage > 0, df.mean_mt_voyage)\
                   .otherwise(-1).alias('mean_mt_voyage'),
    f.when(df.anciennete >= 0, df.anciennete)\
                   .otherwise(-1).alias('anciennete'),
    f.when(df.nb_od > 0, df.nb_od)\
                   .otherwise(-1).alias('nb_od'),
    f.when(df.mean_nb_passagers > 0, df.mean_nb_passagers)\
                   .otherwise(-1).alias('mean_nb_passagers'),
    f.when(df.mean_tarif_loisir >= 0, df.mean_tarif_loisir)\
                   .otherwise(-1).alias('mean_tarif_loisir'),
    f.when(df.mean_classe_1 >= 0, df.mean_classe_1)\
                   .otherwise(-1).alias('mean_classe_1'),
    f.when(df.mean_pointe >= 0, df.mean_pointe)\
                   .otherwise(-1).alias('mean_pointe'),
    f.when(df.mean_depart_we >= 0, df.mean_depart_we)\
                   .otherwise(-1).alias('mean_depart_we'),
    f.when(df.tx_conversion >= 0, df.tx_conversion)\
                   .otherwise(-1).alias('tx_conversion'),
    f.when(df.flg_cmd_lowcost == 1, '1')\
                   .otherwise('0').alias('flg_cmd_lowcost'),
    f.when(df.flg_track_nl_lowcost == 1, '1')\
                   .otherwise('0').alias('flg_track_nl_lowcost'), 
    f.when(df.flg_track_nl == 1, '1')\
                   .otherwise('0').alias('flg_track_nl'))
    
    return ds

In [8]:
df_ = input_df(lowcost)
#df_ = input_df(df_nf)

In [12]:
#df_.show(3)

# MLib

In [13]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression

## features engineering et modélisation

In [14]:
def preprocessed_df(df, label="flg_cmd_lowcostIndex"):
    max_values_to_define_str_cols = 10
    id_col = 'ID_CLIENT'
    
    dty = dict(df.dtypes)
    str_cols = [k for k, v in dty.items() if v == 'string']
    str_cols.remove(id_col)
    
    for c in str_cols:
        stringIndexer = StringIndexer(inputCol=c, outputCol=c+"Index")
        model_str = stringIndexer.fit(df)
        df = model_str.transform(df).drop(c)

    input_cols = df.columns
    input_cols.remove(id_col)
    input_cols.remove(label)
    
    assembler = VectorAssembler(inputCols=input_cols,
                            outputCol="features")
    df = assembler.transform(df)
    
    featureIndexer = VectorIndexer(inputCol="features", 
                   outputCol="indexedFeatures", 
                   maxCategories=max_values_to_define_str_cols).fit(df)
    return featureIndexer.transform(df), df

In [15]:
data, dff = preprocessed_df(df_)

## Logistic Regration on the data

In [16]:
train_df, test_df = data.select("indexedFeatures","flg_cmd_lowcostIndex").randomSplit([0.4,0.6])

In [20]:
#train_df.show(5, False)

In [21]:
lr = LogisticRegression(labelCol="flg_cmd_lowcostIndex", featuresCol="indexedFeatures",elasticNetParam=0.5)
model_lr = lr.fit(train_df)

In [25]:
pred_lr = model_lr.evaluate(test_df).predictions

In [56]:
tp = pred_lr[(pred_lr.flg_cmd_lowcostIndex == 1.0) & (pred_lr.prediction == 1.0)].count()
tn = pred_lr[(pred_lr.flg_cmd_lowcostIndex == 0.0) & (pred_lr.prediction == 0.0)].count()
fp = pred_lr[(pred_lr.flg_cmd_lowcostIndex == 0.0) & (pred_lr.prediction == 1.0)].count()
fn = pred_lr[(pred_lr.flg_cmd_lowcostIndex == 1.0) & (pred_lr.prediction == 0.0)].count()

In [60]:
# Accurancy
float((tp+tn) /(pred_lr.count()))

0.9661135653485616

In [65]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [71]:
auc = BinaryClassificationEvaluator(labelCol="flg_cmd_lowcostIndex", metricName="").evaluate(pred_lr)
auc

0.8717803573957355

## Random forest

In [72]:
classifier = RandomForestClassifier(labelCol="flg_cmd_lowcostIndex", 
                                    featuresCol="indexedFeatures",
                                    maxDepth=15, numTrees=100)

model_rf = classifier.fit(data)

IllegalArgumentException: 'requirement failed: DecisionTree requires maxBins (= 32) to be at least as large as the number of values in each categorical feature, but categorical feature 11 has 68867 values. Considering remove this and other categorical features with a large number of values, or add more training examples.'