In [1]:
import pandas as pd
import numpy as np

In [2]:
from pyspark.sql import SparkSession

In [347]:
# Reuse the active Spark session if one exists, otherwise start one named 'practice2'.
spark = SparkSession.builder.appName('practice2').getOrCreate()

### Creating Dummy Data

In [4]:
# Seed NumPy's global RNG; the dummy data below is drawn from it.
np.random.seed(1)

In [None]:
# Columns to generate: ID, SCORE, LABEL, PROB, SEGMENT

In [61]:
train_size = 100000
val_size = 50000


def _make_dummy_frame(first_id, n_rows):
    """Build n_rows of random dummy scoring data.

    IDs are a random permutation of first_id .. first_id + n_rows - 1,
    LABEL is 0.0 or 1.0, PROB is a float in [0, 1), SCORE is an integer in
    0..1000 and SEGMENT is an integer in 1..5.
    """
    candidate_ids = list(range(first_id, first_id + n_rows))
    # The dict values are evaluated top to bottom, which fixes the order in
    # which the global NumPy RNG is consumed.
    columns = {
        "ID": np.random.choice(candidate_ids, size=n_rows, replace=False),
        "LABEL": np.random.choice([0.0, 1.0], size=n_rows, replace=True),
        "PROB": np.random.sample(size=n_rows),
        "SCORE": np.random.randint(low=0, high=1001, size=n_rows),
        "SEGMENT": np.random.randint(low=1, high=6, size=n_rows),
    }
    return pd.DataFrame(columns)


# Validation IDs continue directly after the training IDs, so the two sets never overlap.
train = _make_dummy_frame(1, train_size)
val = _make_dummy_frame(train_size + 1, val_size)

In [62]:
train.head()

Unnamed: 0,ID,LABEL,PROB,SCORE,SEGMENT
0,77756,0.0,0.436554,542,2
1,16806,1.0,0.290832,951,2
2,82834,1.0,0.310882,619,3
3,37926,1.0,0.674866,526,5
4,8119,1.0,0.855252,382,2


In [63]:
train.describe()

Unnamed: 0,ID,LABEL,PROB,SCORE,SEGMENT
count,100000.0,100000.0,100000.0,100000.0,100000.0
mean,50000.5,0.5,0.499487,500.30075,2.99641
std,28867.657797,0.500003,0.289166,289.674883,1.415124
min,1.0,0.0,5e-06,0.0,1.0
25%,25000.75,0.0,0.249247,249.0,2.0
50%,50000.5,0.5,0.497934,501.0,3.0
75%,75000.25,1.0,0.74995,753.0,4.0
max,100000.0,1.0,0.99998,1000.0,5.0


In [64]:
val.head()

Unnamed: 0,ID,LABEL,PROB,SCORE,SEGMENT
0,105967,0.0,0.614261,867,2
1,116279,1.0,0.675801,641,4
2,138185,0.0,0.871711,560,2
3,141765,0.0,0.994826,278,5
4,133599,1.0,0.865611,367,2


In [65]:
val.describe()

Unnamed: 0,ID,LABEL,PROB,SCORE,SEGMENT
count,50000.0,50000.0,50000.0,50000.0,50000.0
mean,125000.5,0.5023,0.500456,501.43056,2.99696
std,14433.901067,0.5,0.288183,289.530136,1.413998
min,100001.0,0.0,7e-06,0.0,1.0
25%,112500.75,0.0,0.251939,251.0,2.0
50%,125000.5,1.0,0.500318,500.0,3.0
75%,137500.25,1.0,0.750513,753.0,4.0
max,150000.0,1.0,0.999998,1000.0,5.0


In [66]:
# index=False keeps the pandas row index out of the files, so each CSV holds only the data columns.
train.to_csv("./dummy_train.csv", index=False)
val.to_csv("./dummy_val.csv", index=False)

In [67]:
# train = pd.read_csv("./dummy_train.csv")
# val = pd.read_csv("./dummy_val.csv")

### Calculating AUC score using pandas and scikit-learn

In [69]:
from sklearn.metrics import roc_auc_score

In [70]:
# train2 = train.withColumn("LABEL", train.call_time.cast('int'))
# df2 = train.withColumn("LABEL",col("LABEL").cast('int'))

In [102]:
def calculate_auc(data, round_off=3):
    """Return the ROC AUC of data['PROB'] against data['LABEL'].

    Parameters
    ----------
    data : mapping-like with 'LABEL' and 'PROB' entries (here a pandas DataFrame)
    round_off : number of decimal places kept in the result

    Returns
    -------
    The value of roc_auc_score(LABEL, PROB) rounded to ``round_off`` decimals.
    """
    y_true = data['LABEL']
    y_score = data['PROB']
    raw_auc = roc_auc_score(y_true, y_score)
    return round(raw_auc, round_off)

In [103]:
# Score both pandas frames with calculate_auc and report the rounded results.
train_auc_score = calculate_auc(train)
val_auc_score = calculate_auc(val)
print(f"Train AUC : {train_auc_score}")
print(f"Val AUC : {val_auc_score}")

Train AUC : 0.499
Val AUC : 0.498


In [107]:
def get_gini(auc_score, round_off=3):
    """Convert an AUC into a Gini coefficient.

    Computes ``2 * auc_score - 1`` and rounds it to ``round_off`` decimals.
    """
    gini = 2 * auc_score - 1
    return round(gini, round_off)
# Derive Gini from the already-rounded AUC values computed above.
train_gini = get_gini(train_auc_score)
val_gini = get_gini(val_auc_score)
print(f"Train Gini : {train_gini}")
print(f"Val Gini : {val_gini}")

Train Gini : -0.002
Val Gini : -0.004


## Using PySpark

### Reading the CSVs into PySpark DataFrames

In [73]:
# Read the CSVs written by the pandas section. header=True takes column names from the
# first row; inferSchema=True lets Spark infer column types instead of reading strings.
train_pyspark = spark.read.csv('./dummy_train.csv', header=True, inferSchema=True)
val_pyspark = spark.read.csv('./dummy_val.csv', header=True, inferSchema=True)

In [74]:
train_pyspark.head(5)

[Row(ID=77756, LABEL=0.0, PROB=0.43655406692733745, SCORE=542, SEGMENT=2),
 Row(ID=16806, LABEL=1.0, PROB=0.29083223676639114, SCORE=951, SEGMENT=2),
 Row(ID=82834, LABEL=1.0, PROB=0.3108823196506507, SCORE=619, SEGMENT=3),
 Row(ID=37926, LABEL=1.0, PROB=0.6748656638696303, SCORE=526, SEGMENT=5),
 Row(ID=8119, LABEL=1.0, PROB=0.8552521826576316, SCORE=382, SEGMENT=2)]

In [75]:
val_pyspark.head(5)

[Row(ID=105967, LABEL=0.0, PROB=0.6142610586471573, SCORE=867, SEGMENT=2),
 Row(ID=116279, LABEL=1.0, PROB=0.6758005764366875, SCORE=641, SEGMENT=4),
 Row(ID=138185, LABEL=0.0, PROB=0.8717114002906813, SCORE=560, SEGMENT=2),
 Row(ID=141765, LABEL=0.0, PROB=0.9948260188796219, SCORE=278, SEGMENT=5),
 Row(ID=133599, LABEL=1.0, PROB=0.8656106744998311, SCORE=367, SEGMENT=2)]

### AUC and GINI

In [108]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [86]:
def get_score_labels_RDD(df, score_col="PROB", label_col="LABEL"):
    """Extract (score, label) pairs from a DataFrame as an RDD of float tuples.

    Parameters
    ----------
    df : DataFrame exposing ``select(...)`` and ``.rdd`` (a PySpark DataFrame)
    score_col : name of the predicted-score column (default "PROB")
    label_col : name of the ground-truth column (default "LABEL")

    Returns
    -------
    The result of mapping every selected row to ``(float(score), float(label))``.

    Both values are cast to float so that a label column inferred as integer
    still yields floating-point pairs. float() raises on null values.
    """
    selected_rows = df.select([score_col, label_col]).rdd
    return selected_rows.map(lambda row: (float(row[0]), float(row[1])))

In [87]:
# Build the (PROB, LABEL) pair RDD for train and peek at the first five pairs.
train_score_labels = get_score_labels_RDD(train_pyspark)
train_score_labels.take(5)

[(0.43655406692733745, 0.0),
 (0.29083223676639114, 1.0),
 (0.3108823196506507, 1.0),
 (0.6748656638696303, 1.0),
 (0.8552521826576316, 1.0)]

In [88]:
# Same extraction for the validation set.
val_score_labels = get_score_labels_RDD(val_pyspark)
val_score_labels.take(5)

[(0.6142610586471573, 0.0),
 (0.6758005764366875, 1.0),
 (0.8717114002906813, 0.0),
 (0.9948260188796219, 0.0),
 (0.8656106744998311, 1.0)]

In [83]:
# type(train_view)

In [93]:
def get_auc(inp_score_label, round_off=3):
    """Return the area under the ROC curve for an RDD of (score, label) pairs.

    Parameters
    ----------
    inp_score_label : RDD passed straight to BinaryClassificationMetrics
    round_off : number of decimal places kept in the result

    Returns
    -------
    ``metrics.areaUnderROC`` rounded to ``round_off`` decimals.
    """
    metrics = BinaryClassificationMetrics(inp_score_label)
    try:
        return round(metrics.areaUnderROC, round_off)
    finally:
        # Release the intermediate RDDs the metrics object caches while computing.
        metrics.unpersist()

In [None]:
# NOTE(review): this re-defines get_gini, which is already defined earlier in the
# notebook in the pandas section.
def get_gini(auc_score, round_off=3):
    """Convert an AUC into a Gini coefficient.

    Computes ``2 * auc_score - 1`` and rounds it to ``round_off`` decimals.
    """
    gini = 2 * auc_score - 1
    return round(gini, round_off)

In [113]:
# AUC computed by Spark's BinaryClassificationMetrics from the (score, label) RDDs.
train_auc = get_auc(train_score_labels)
val_auc = get_auc(val_score_labels)
print(f"train_auc : {train_auc}\nval_auc : {val_auc}")

train_auc : 0.499
val_auc : 0.498


In [114]:
# Gini from the Spark AUCs; this rebinds train_gini / val_gini from the pandas section.
train_gini = get_gini(train_auc)
val_gini = get_gini(val_auc)

In [115]:
print(f"train_gini : {train_gini}\nval_gini : {val_gini}")

train_gini : -0.002
val_gini : -0.004


### Decile Creation

In [117]:
from pyspark.ml.feature import QuantileDiscretizer

In [242]:
def createBucketizer(input_df, input_col="PROB", output_col="BUCKET", numBuckets=10, relativeError=0.0001, handleInvalid="error"):
    """Fit a QuantileDiscretizer on ``input_df`` and return the fitted model.

    All keyword arguments are forwarded unchanged to QuantileDiscretizer
    (``input_col`` -> inputCol, ``output_col`` -> outputCol).
    """
    discretizer = QuantileDiscretizer(
        inputCol=input_col,
        outputCol=output_col,
        numBuckets=numBuckets,
        relativeError=relativeError,
        handleInvalid=handleInvalid,
    )
    return discretizer.fit(input_df)

In [243]:
def get_buckets(bucketizer, inp_df):
    """Apply a fitted bucketizer and reverse its bucket numbering.

    The bucketizer assigns index 0 to the lowest-valued bucket. The index is
    replaced by ``n_buckets - index`` so that bucket 1 holds the highest values
    and bucket ``n_buckets`` the lowest.

    Parameters
    ----------
    bucketizer : fitted model exposing ``transform``, ``getSplits`` and ``getOutputCol``
    inp_df : DataFrame to bucket

    Returns
    -------
    The transformed DataFrame with the bucketizer's output column re-numbered.
    """
    # Read both from the model instead of hard-coding 10 / "BUCKET", so a
    # bucketizer fitted with another bucket count or output column is still
    # ranked 1..n_buckets.
    n_buckets = len(bucketizer.getSplits()) - 1
    bucket_col = bucketizer.getOutputCol()

    res = bucketizer.transform(inp_df)
    return res.withColumn(bucket_col, n_buckets - res[bucket_col])

In [244]:
# Fit the bucket boundaries on the training data and inspect the resulting split points.
train_bucketizer = createBucketizer(train_pyspark)
train_splits = train_bucketizer.getSplits()
train_splits

[-inf,
 0.09860318820772884,
 0.19944321897725403,
 0.2990615528534277,
 0.39873221748411125,
 0.4979335591313695,
 0.5995457170330696,
 0.7013894392499551,
 0.8003422961531734,
 0.9001488110014753,
 inf]

In [245]:
# Both sets are bucketed with the train-fitted bucketizer, so they share the same bucket edges.
train_bucketed = get_buckets(train_bucketizer, train_pyspark)
val_bucketed = get_buckets(train_bucketizer, val_pyspark)

In [246]:
train_bucketed.show(5)

+-----+-----+-------------------+-----+-------+------+
|   ID|LABEL|               PROB|SCORE|SEGMENT|BUCKET|
+-----+-----+-------------------+-----+-------+------+
|77756|  0.0|0.43655406692733745|  542|      2|   6.0|
|16806|  1.0|0.29083223676639114|  951|      2|   8.0|
|82834|  1.0| 0.3108823196506507|  619|      3|   7.0|
|37926|  1.0| 0.6748656638696303|  526|      5|   4.0|
| 8119|  1.0| 0.8552521826576316|  382|      2|   2.0|
+-----+-----+-------------------+-----+-------+------+
only showing top 5 rows



In [247]:
train_bucketed.groupBy('BUCKET').count().orderBy('BUCKET').show()

+------+-----+
|BUCKET|count|
+------+-----+
|   1.0|10010|
|   2.0| 9994|
|   3.0|10001|
|   4.0| 9994|
|   5.0|10002|
|   6.0|10003|
|   7.0| 9996|
|   8.0|10001|
|   9.0|10009|
|  10.0| 9990|
+------+-----+



In [248]:
val_bucketed.show(5)

+------+-----+------------------+-----+-------+------+
|    ID|LABEL|              PROB|SCORE|SEGMENT|BUCKET|
+------+-----+------------------+-----+-------+------+
|105967|  0.0|0.6142610586471573|  867|      2|   4.0|
|116279|  1.0|0.6758005764366875|  641|      4|   4.0|
|138185|  0.0|0.8717114002906813|  560|      2|   2.0|
|141765|  0.0|0.9948260188796219|  278|      5|   1.0|
|133599|  1.0|0.8656106744998311|  367|      2|   2.0|
+------+-----+------------------+-----+-------+------+
only showing top 5 rows



In [249]:
val_bucketed.groupBy('BUCKET').count().orderBy('BUCKET').show()

+------+-----+
|BUCKET|count|
+------+-----+
|   1.0| 4916|
|   2.0| 5140|
|   3.0| 4783|
|   4.0| 5126|
|   5.0| 5160|
|   6.0| 5118|
|   7.0| 4901|
|   8.0| 4950|
|   9.0| 5021|
|  10.0| 4885|
+------+-----+



### BAD Capture Rate

In [284]:
# NOTE(review): this import rebinds the built-in sum/max/min/round to PySpark column
# functions for every cell executed afterwards; plain-Python uses of those names
# (e.g. round(<float>, 3)) will no longer work once this cell has run.
from pyspark.sql.functions import sum, avg, max, min, round, count

# Per bucket: number of bads (sum of LABEL) and that count as a percentage of all bads
# in the training set. The overall total is collected to the driver while the
# expression is being built.
train_bads = (
    train_bucketed
    .groupBy('BUCKET')
    .agg(
        sum("LABEL").alias("COUNT_BADS"),
        round(
            (sum("LABEL") / train_bucketed.agg({'LABEL': 'sum'}).collect()[0][0]) * 100,
            2,
        ).alias("BAD_CAPTURE_RATE(%)"),
        # round(avg("LABEL")*100, 2).alias("ACTUAL_BAD_RATE(%)")
    )
    .orderBy('BUCKET')
)
train_bads.show()

+------+----------+-------------------+
|BUCKET|COUNT_BADS|BAD_CAPTURE_RATE(%)|
+------+----------+-------------------+
|   1.0|    4953.0|               9.91|
|   2.0|    5044.0|              10.09|
|   3.0|    4919.0|               9.84|
|   4.0|    5026.0|              10.05|
|   5.0|    5030.0|              10.06|
|   6.0|    5034.0|              10.07|
|   7.0|    5039.0|              10.08|
|   8.0|    4988.0|               9.98|
|   9.0|    4999.0|               10.0|
|  10.0|    4968.0|               9.94|
+------+----------+-------------------+



In [239]:
train_bucketed.agg({'LABEL': 'sum'}).collect()[0][0]

50000.0

### PSI

In [260]:
from pyspark.sql.functions import log
def get_PSI(ref_df, score_df, numBuckets=10):
    """Compute the population stability index of ``score_df`` against ``ref_df``.

    Both inputs must carry a 'BUCKET' column and an 'ID' column. For each bucket
    the share of rows (in percent) is computed in both frames, and the function
    returns ``sum((A - B) * ln(A / B))`` with A the score share and B the
    reference share.

    Parameters
    ----------
    ref_df : reference (e.g. training) DataFrame
    score_df : DataFrame whose bucket distribution is compared to ``ref_df``
    numBuckets : unused; kept so existing callers keep working

    Returns
    -------
    The summed PSI value collected to the driver.

    NOTE(review): the shares are expressed in percent, so the result is exactly
    100x the PSI computed from proportions; keep that in mind when comparing
    against rule-of-thumb thresholds, and confirm this scale is intended.
    """
    ref_total = ref_df.agg({'ID': 'count'}).collect()[0][0]
    score_total = score_df.agg({'ID': 'count'}).collect()[0][0]

    ref_grp = ref_df.groupby('BUCKET') \
        .agg(((count("ID") / ref_total) * 100).alias("TRAIN_BUCKET_POP(%)"))
    score_grp = score_df.groupby('BUCKET') \
        .agg(((count("ID") / score_total) * 100).alias("SCORE_BUCKET_POP(%)"))

    # Join the reference table built from ref_df (not a notebook-level global).
    # Left join: buckets absent from score_df presumably yield nulls that the final sum skips.
    final_grp = ref_grp.join(score_grp, on='BUCKET', how='left')
    final_grp = final_grp.withColumn("A-B", final_grp["SCORE_BUCKET_POP(%)"] - final_grp["TRAIN_BUCKET_POP(%)"])
    final_grp = final_grp.withColumn("ln(A/B)", log(final_grp["SCORE_BUCKET_POP(%)"] / final_grp["TRAIN_BUCKET_POP(%)"]))
    final_grp = final_grp.withColumn("PSI", final_grp["A-B"] * final_grp["ln(A/B)"])
    return final_grp.agg({"PSI": 'sum'}).collect()[0][0]

In [261]:
# Train is the reference distribution; validation is the distribution being checked.
PSI = get_PSI(train_bucketed, val_bucketed)
print(f"PSI : {PSI}")

PSI : 0.06233653331089773


In [252]:
# Share of training rows in each bucket, in percent.
train_grp = (
    train_bucketed
    .groupby('BUCKET')
    .agg(
        ((count("ID") / train_bucketed.agg({'ID': 'count'}).collect()[0][0]) * 100)
        .alias("TRAIN_BUCKET_POP(%)")
    )
    .orderBy('BUCKET')
)
train_grp.show()

+------+-------------------+
|BUCKET|TRAIN_BUCKET_POP(%)|
+------+-------------------+
|   1.0|              10.01|
|   2.0|              9.994|
|   3.0|             10.001|
|   4.0|              9.994|
|   5.0| 10.001999999999999|
|   6.0|             10.003|
|   7.0|  9.995999999999999|
|   8.0|             10.001|
|   9.0|             10.009|
|  10.0|               9.99|
+------+-------------------+



In [253]:
# Share of validation rows in each bucket, in percent.
val_grp = (
    val_bucketed
    .groupby('BUCKET')
    .agg(
        ((count("ID") / val_bucketed.agg({'ID': 'count'}).collect()[0][0]) * 100)
        .alias("VAL_BUCKET_POP(%)")
    )
    .orderBy('BUCKET')
)
val_grp.show()

+------+------------------+
|BUCKET| VAL_BUCKET_POP(%)|
+------+------------------+
|   1.0|             9.832|
|   2.0|10.280000000000001|
|   3.0| 9.565999999999999|
|   4.0|            10.252|
|   5.0|             10.32|
|   6.0|            10.236|
|   7.0|             9.802|
|   8.0|               9.9|
|   9.0|            10.042|
|  10.0|              9.77|
+------+------------------+



In [254]:
# temp1 = train_grp.withColumn("BUCKET_POP(%)") - val_grp.withColumn("BUCKET_POP(%)")
# temp

In [257]:
# Put the train and validation bucket shares side by side, keyed on BUCKET.
temp = train_grp.join(val_grp, on='BUCKET', how='left').orderBy('BUCKET')
temp.show()

+------+-------------------+------------------+
|BUCKET|TRAIN_BUCKET_POP(%)| VAL_BUCKET_POP(%)|
+------+-------------------+------------------+
|   1.0|              10.01|             9.832|
|   2.0|              9.994|10.280000000000001|
|   3.0|             10.001| 9.565999999999999|
|   4.0|              9.994|            10.252|
|   5.0| 10.001999999999999|             10.32|
|   6.0|             10.003|            10.236|
|   7.0|  9.995999999999999|             9.802|
|   8.0|             10.001|               9.9|
|   9.0|             10.009|            10.042|
|  10.0|               9.99|              9.77|
+------+-------------------+------------------+



In [258]:
from pyspark.sql.functions import log
# Per-bucket PSI terms: (A - B) * ln(A / B), with A = validation share and B = train share.
temp = temp.withColumn("A-B", temp["VAL_BUCKET_POP(%)"]-temp["TRAIN_BUCKET_POP(%)"])
temp = temp.withColumn("ln(A/B)", log(temp["VAL_BUCKET_POP(%)"]/temp["TRAIN_BUCKET_POP(%)"]))
temp = temp.withColumn("PSI", temp["A-B"] * temp["ln(A/B)"])
temp.show()

+------+-------------------+------------------+--------------------+--------------------+--------------------+
|BUCKET|TRAIN_BUCKET_POP(%)| VAL_BUCKET_POP(%)|                 A-B|             ln(A/B)|                 PSI|
+------+-------------------+------------------+--------------------+--------------------+--------------------+
|   1.0|              10.01|             9.832|-0.17799999999999905|-0.01794222106339...|0.003193715349284365|
|   2.0|              9.994|10.280000000000001| 0.28600000000000136|0.028215347105005832|0.008069589272031706|
|   3.0|             10.001| 9.565999999999999| -0.4350000000000005|-0.04446994273627385|0.019344425090279147|
|   4.0|              9.994|            10.252|  0.2580000000000009|0.025487895579811504|0.006575877059591391|
|   5.0| 10.001999999999999|             10.32|  0.3180000000000014| 0.03129868705670495|0.009952982484032218|
|   6.0|             10.003|            10.236| 0.23300000000000054| 0.02302587029449887|0.005365027778618249|
|

In [259]:
# Total PSI is the sum of the per-bucket terms.
PSI = temp.agg({"PSI":'sum'}).collect()[0][0]
PSI

0.06233653331089773

### KS

In [344]:
from pyspark.sql import Window
from pyspark.sql import functions as F

def get_ks(data=None, target="LABEL", prob="PROB"):
    """Build a per-bucket KS table and print the largest KS value.

    Parameters
    ----------
    data : DataFrame with a 'BUCKET' column plus the ``target`` and ``prob`` columns
    target : name of the event-indicator column (events are summed; non-events
        are computed as ``1 - target``)
    prob : name of the score column used for each bucket's min/max

    Returns
    -------
    DataFrame ordered by BUCKET with columns min_prob, max_prob, events,
    nonevents, event_rate, nonevent_rate, cum_eventrate, cum_noneventrate
    (both rounded to 3 decimals) and KS (rounded cumulative difference * 100).

    Side effect: prints "KS is <max KS>% at decile <bucket>".
    """
    nonevent_col = "LABEL0"
    data = data.withColumn(nonevent_col, 1 - data[target])

    # Overall event / non-event totals, fetched in a single Spark job.
    totals = data.agg(
        F.sum(target).alias("events"),
        F.sum(nonevent_col).alias("nonevents"),
    ).collect()[0]
    total_events = totals["events"]
    total_nonevents = totals["nonevents"]

    # F.* is used explicitly so the function does not depend on the built-in
    # min/max/sum/round having been rebound to PySpark functions elsewhere.
    kstable = (
        data.groupby("BUCKET")
        .agg(
            F.min(prob).alias("min_prob"),
            F.max(prob).alias("max_prob"),
            F.sum(target).alias("events"),
            F.sum(nonevent_col).alias("nonevents"),
        )
        .withColumn("event_rate", F.col("events") / total_events)
        .withColumn("nonevent_rate", F.col("nonevents") / total_nonevents)
    )

    # NOTE(review): the window orders by min_prob ascending, so the running sums
    # start at the lowest-probability bucket and the highest-probability bucket
    # ends at 1.0. KS tables are conventionally accumulated from the
    # highest-scoring bucket downwards - confirm this direction is intended.
    running = Window.orderBy("min_prob").rowsBetween(Window.unboundedPreceding, Window.currentRow)

    kstable = (
        kstable
        .withColumn("cum_eventrate", F.sum("event_rate").over(running))
        .withColumn("cum_noneventrate", F.sum("nonevent_rate").over(running))
        # KS is derived from the unrounded cumulative rates; display rounding happens after.
        .withColumn("KS", F.round(F.col("cum_eventrate") - F.col("cum_noneventrate"), 3) * 100)
        .withColumn("cum_eventrate", F.round(F.col("cum_eventrate"), 3))
        .withColumn("cum_noneventrate", F.round(F.col("cum_noneventrate"), 3))
        .orderBy("BUCKET", ascending=True)
    )

    # Row with the largest KS; ties go to the lowest bucket number.
    top_row = kstable.orderBy(F.desc("KS"), F.asc("BUCKET")).first()
    if top_row is not None:
        print("KS is " + str(top_row["KS"]) + "%" + " at decile " + str(top_row["BUCKET"]))
    return kstable

In [345]:
# get_ks prints the headline KS line itself; show() then renders the per-bucket table.
train_ks = get_ks(train_bucketed)
train_ks.show()

KS is 0.3% at decile 4.0
+------+--------------------+-------------------+------+---------+----------+-------------+-------------+----------------+----+
|BUCKET|            min_prob|           max_prob|events|nonevents|event_rate|nonevent_rate|cum_eventrate|cum_noneventrate|  KS|
+------+--------------------+-------------------+------+---------+----------+-------------+-------------+----------------+----+
|   1.0|  0.9001488110014753|  0.999979839176757|4953.0|   5057.0|   0.09906|      0.10114|          1.0|             1.0| 0.0|
|   2.0|  0.8003422961531734| 0.9001473080139691|5044.0|   4950.0|   0.10088|        0.099|        0.901|           0.899| 0.2|
|   3.0|  0.7013894392499551| 0.8003368942123185|4919.0|   5082.0|   0.09838|      0.10164|          0.8|             0.8| 0.0|
|   4.0|  0.5995457170330696|  0.701386326802304|5026.0|   4968.0|   0.10052|      0.09936|        0.702|           0.698| 0.3|
|   5.0|  0.4979335591313695| 0.5995409069864155|5030.0|   4972.0|    0.1006|  

In [346]:
# Same KS table for the validation set.
val_ks = get_ks(val_bucketed)
val_ks.show()

KS is 0.4% at decile 4.0
+------+--------------------+-------------------+------+---------+-------------------+-------------------+-------------+----------------+----+
|BUCKET|            min_prob|           max_prob|events|nonevents|         event_rate|      nonevent_rate|cum_eventrate|cum_noneventrate|  KS|
+------+--------------------+-------------------+------+---------+-------------------+-------------------+-------------+----------------+----+
|   1.0|  0.9001775261767412| 0.9999978105929195|2460.0|   2456.0|0.09794943260999403|0.09869399236487844|          1.0|             1.0| 0.0|
|   2.0|  0.8003509759230957| 0.9001475274125148|2579.0|   2561.0|0.10268763687039618|0.10291340164757887|        0.902|           0.901| 0.1|
|   3.0|   0.701395174286093| 0.8003221477827438|2361.0|   2422.0|0.09400756520007963|0.09732770745428973|        0.799|           0.798| 0.1|
|   4.0|  0.5995527103214316| 0.7013052080974834|2589.0|   2537.0|0.10308580529564006|0.10194896524010448|        0.7