In [1]:
from pyspark.sql import Column as PySparkColumn
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window

from pyspark.sql.types import DoubleType

from scipy.stats import norm

from typing import List, Union, NamedTuple

In [2]:
def get_local_spark_session() -> SparkSession:
    """creates a local spark session"""
    return SparkSession.builder.getOrCreate()


spark = get_local_spark_session()

spark.sparkContext.setLogLevel("OFF")

23/12/31 22:35:04 WARN Utils: Your hostname, pc resolves to a loopback address: 127.0.1.1; using 192.168.15.88 instead (on interface wlp7s0)
23/12/31 22:35:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/12/31 22:35:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Auxiliary Functions & Transformation Objects

In [3]:
# I decided to create both objects to make these domain metrics, concepts and transformations reusable somehow for other datasets

class TransformedColumns(NamedTuple):

    CARD_SCHEME_INFO = (
        F.when(F.col("CARD_SCHEME_LOCAL").isNotNull(),F.concat_ws("_",F.col("CARD_SCHEME"),F.col("CARD_SCHEME_LOCAL"))).otherwise(F.col("CARD_SCHEME")).alias("CARD_SCHEME_INFO")
    )
    
    THREE_DS_CHALLENGED = (
        F.when(F.col("IS_THREE_DS_CHALLENGED"),F.lit(True)).otherwise(False).alias("THREE_DS_CHALLENGED")
    )
    
    MCC = (
        F.regexp_extract("MERCHANT_CATEGORY_CODE", r"\d+", 0).alias("MCC")
    )
    
    BILLING_ADDRESS_PROVIDED = (
        F.when(F.col("BILLING_ADDRESS_COUNTRY").isNotNull(),F.lit(True)).otherwise(False).alias("BILLING_ADDRESS_PROVIDED")
    )
    
    CVV_PROVIDED = (
        F.when(F.col("CVV_PRESENT"),F.lit(True)).otherwise(False).alias("CVV_PROVIDED")
    )
    
    
class Metrics(NamedTuple):


    COUNT_RECEIVED = F.sum(F.lit(1)).alias("count_received")
    COUNT_AUTHORISED = F.sum("IS_AUTHORISED").alias("count_approved")

    AUTH_RATE_COUNT = F.round(COUNT_AUTHORISED / COUNT_RECEIVED,4).alias("auth_rate_count")

In [4]:
# The choice of pyspark for this exercise was because payment transactional data can be quite big.
# In scenarios where we need to collect and process a huge amount of days, for instance, can be almost 
# impossible or will demand large amounts of computation resources in frameworks like Pandas


def read_csv(spark_session, path: str) -> DataFrame:
    """loads csv data from a file path inferring the schema"""
    return (
        spark_session.read.format("com.databricks.spark.csv")
        .option("header", "true")
        .option("treatEmptyValuesAsNulls", "true")
        .option("inferSchema", "true")
        .load(path)
    )


def add_z_score_column(df: DataFrame) -> DataFrame:
    """adds a z score column for 2 population proportions"""
    a1 = F.col("true_count_approved")
    a2 = F.col("false_count_approved")
    n1 = F.col("true_count_received")
    n2 = F.col("false_count_received")
    
    prob1 = F.col("true_auth_rate_count")
    prob2 = F.col("false_auth_rate_count")
    
    p1_minus_p2 = prob1 - prob2
    pop_prob = (a1+a2)/(n1+n2)
    
    raw_z_score = p1_minus_p2 / F.sqrt( (pop_prob*(1-pop_prob)) * ((1/n1)+(1/n2)) )
    
    cdf = spark.udf.register("norm_cdf", lambda x: float(norm.cdf(x)), DoubleType())
    
    return df.withColumn("z_score", F.round(raw_z_score, 6))


def add_p_value_column(df: DataFrame) -> DataFrame:
    """adds a p-value column for a given z-score"""
    
    #norm.cdf generates numpy float types, therefore the use of udf in order to convert it to a spark native type
    cdf = spark.udf.register("norm_cdf", lambda x: float(norm.cdf(abs(x))), DoubleType()) 
    df = (
        df.where(F.col("z_score").isNotNull())
        .where(F.col("true_count_received")>3) #filtering up at least 3 observations in each group to avoid unreasonable p-values
        .where(F.col("false_count_received")>3)
    )
               
    return df.withColumn("p_value", F.round(2 * (1 - cdf(F.col("z_score"))),6))


def add_significance_column(df: DataFrame, i: float) -> DataFrame:
    """adds a boolean column that indicates statistical significance at a given i level"""
    return df.withColumn(f"significant_at_{str(i)[-2:]}", F.when(F.col("p_value")<=i, F.lit(True)).otherwise(False))


def get_metrics_df_display(df: DataFrame, 
                           dimensions: List[Union[PySparkColumn, str]], 
                           pivot_col: Union[PySparkColumn, str], 
                           metrics: Metrics = Metrics()) -> DataFrame:
    """
    Auxiliary function for code organization purposes only. It could be converted into a more robust Python lib or in an 
    automated pipeline transformation in the future, but my focus was more on the potential data insights of the dataset
    """
    df_metrics = (
        df
        .groupby(*dimensions)
        .pivot(pivot_col)
        .agg(
            metrics.COUNT_AUTHORISED,
            metrics.COUNT_RECEIVED,
            metrics.AUTH_RATE_COUNT
        )
        .withColumn("total_count", F.col("false_count_received")+F.col("true_count_received"))
        .sort(F.desc("total_count"))
        .transform(add_z_score_column)
        .transform(add_p_value_column)
        .transform(add_significance_column, 0.05)
        .drop("false_count_approved","true_count_approved")
    )
    
    df_metrics.show(truncate=False)

## Datasets

In [5]:
transformed_cols = TransformedColumns()
metrics = Metrics()

df_transactions = read_csv(spark, "Downloads/transactions.csv") 
df_cards = read_csv(spark, "Downloads/cards.csv")

df_payments = (
    # I decided to focus on this particular subset as it seems to be the bulk of the main 
    # dataset and for simplification purposes to make the discussions more objective
    df_transactions
    .join(df_cards, on="CARD_FINGERPRINT", how="left")
    .where(F.col("ACQUIRER_COUNTRY_NAME")=="FR")
    .where(~F.col("IS_CARD_VERIFICATION"))
    .where(transformed_cols.MCC==5999)
    .where(F.col("card_type")=="debit")
)

## Internal Optimization & Network Token Application

In [6]:
get_metrics_df_display(
    df=df_payments.where(~F.col("IS_RETRIED")), 
    dimensions=[transformed_cols.CARD_SCHEME_INFO], 
    pivot_col="IS_OPTIMISED",
)

+---------------------------+--------------------+---------------------+-------------------+--------------------+-----------+--------+--------+-----------------+
|CARD_SCHEME_INFO           |false_count_received|false_auth_rate_count|true_count_received|true_auth_rate_count|total_count|z_score |p_value |significant_at_05|
+---------------------------+--------------------+---------------------+-------------------+--------------------+-----------+--------+--------+-----------------+
|visa_cartes_bancaires      |53                  |0.717                |1081               |0.8418              |1134       |2.395583|0.016594|true             |
|visa                       |34                  |0.6176               |951                |0.939               |985        |7.120188|0.0     |true             |
|mastercard_cartes_bancaires|49                  |0.7959               |846                |0.8215              |895        |0.453601|0.650116|false            |
|mastercard                 

                                                                                

In [7]:
get_metrics_df_display(
    df=df_payments.where(~F.col("IS_RETRIED")), 
    dimensions=[transformed_cols.CARD_SCHEME_INFO], 
    pivot_col="IS_NETWORK_TOKEN_USED",
)

+---------------------------+--------------------+---------------------+-------------------+--------------------+-----------+---------+--------+-----------------+
|CARD_SCHEME_INFO           |false_count_received|false_auth_rate_count|true_count_received|true_auth_rate_count|total_count|z_score  |p_value |significant_at_05|
+---------------------------+--------------------+---------------------+-------------------+--------------------+-----------+---------+--------+-----------------+
|visa_cartes_bancaires      |775                 |0.8129               |359                |0.8858              |1134       |3.083696 |0.002044|true             |
|visa                       |830                 |0.9301               |155                |0.9161              |985        |-0.618655|0.536144|false            |
|mastercard_cartes_bancaires|840                 |0.8155               |55                 |0.8909              |895        |1.410402 |0.158421|false            |
|mastercard           

In [8]:
(df_payments
 .where(~F.col("IS_RETRIED"))
 .groupBy("IS_OPTIMISED","IS_NETWORK_TOKEN_USED")
 .agg(F.sum(F.lit(1)).alias("count"),metrics.AUTH_RATE_COUNT)
 .sort(F.desc("count"))
 .show(truncate=False))

+------------+---------------------+-----+---------------+
|IS_OPTIMISED|IS_NETWORK_TOKEN_USED|count|auth_rate_count|
+------------+---------------------+-----+---------------+
|true        |false                |2802 |0.8712         |
|true        |true                 |699  |0.9056         |
|false       |false                |157  |0.7006         |
+------------+---------------------+-----+---------------+



In [9]:
get_metrics_df_display(
    df=df_payments.where(~F.col("IS_RETRIED")), 
    dimensions=[transformed_cols.CARD_SCHEME_INFO,"ISSUER_NAME"], 
    pivot_col="IS_NETWORK_TOKEN_USED",
)

+---------------------------+-----------------------------------------+--------------------+---------------------+-------------------+--------------------+-----------+---------+--------+-----------------+
|CARD_SCHEME_INFO           |ISSUER_NAME                              |false_count_received|false_auth_rate_count|true_count_received|true_auth_rate_count|total_count|z_score  |p_value |significant_at_05|
+---------------------------+-----------------------------------------+--------------------+---------------------+-------------------+--------------------+-----------+---------+--------+-----------------+
|visa_cartes_bancaires      |BPCE                                     |232                 |0.8922               |249                |0.8795              |481        |-0.437355|0.661854|false            |
|visa                       |CAIXABANK S.A.                           |248                 |0.9315               |75                 |0.9067              |323        |-0.717576|0.4

## Three DS

In [10]:
(df_payments
 .where(~F.col("IS_RETRIED"))
 .groupBy("HAS_THREE_DS_EXEMPTION","HAS_THREE_DS")
 .agg(F.sum(F.lit(1)).alias("count"),metrics.AUTH_RATE_COUNT)
 .sort(F.desc("count"))
 .show(truncate=False))

+----------------------+------------+-----+---------------+
|HAS_THREE_DS_EXEMPTION|HAS_THREE_DS|count|auth_rate_count|
+----------------------+------------+-----+---------------+
|true                  |false       |3205 |0.9423         |
|false                 |true        |261  |0.1533         |
|false                 |false       |116  |0.8276         |
|true                  |true        |76   |0.3684         |
+----------------------+------------+-----+---------------+



In [11]:
get_metrics_df_display(
    df=df_payments.where(~F.col("IS_RETRIED")), 
    dimensions=[transformed_cols.CARD_SCHEME_INFO,"HAS_THREE_DS"], 
    pivot_col="HAS_THREE_DS_EXEMPTION",
)

+---------------------------+------------+--------------------+---------------------+-------------------+--------------------+-----------+--------+--------+-----------------+
|CARD_SCHEME_INFO           |HAS_THREE_DS|false_count_received|false_auth_rate_count|true_count_received|true_auth_rate_count|total_count|z_score |p_value |significant_at_05|
+---------------------------+------------+--------------------+---------------------+-------------------+--------------------+-----------+--------+--------+-----------------+
|visa_cartes_bancaires      |false       |39                  |0.7949               |961                |0.9272              |1000       |3.020237|0.002526|true             |
|visa                       |false       |24                  |0.8333               |932                |0.9442              |956        |2.28433 |0.022352|true             |
|mastercard_cartes_bancaires|false       |39                  |0.8974               |716                |0.9413              

In [12]:
get_metrics_df_display(
    df=df_payments.where(~F.col("IS_RETRIED")), 
    dimensions=[transformed_cols.CARD_SCHEME_INFO,"HAS_THREE_DS_EXEMPTION"], 
    pivot_col="HAS_THREE_DS",
)

+---------------------------+----------------------+--------------------+---------------------+-------------------+--------------------+-----------+----------+--------+-----------------+
|CARD_SCHEME_INFO           |HAS_THREE_DS_EXEMPTION|false_count_received|false_auth_rate_count|true_count_received|true_auth_rate_count|total_count|z_score   |p_value |significant_at_05|
+---------------------------+----------------------+--------------------+---------------------+-------------------+--------------------+-----------+----------+--------+-----------------+
|visa_cartes_bancaires      |true                  |961                 |0.9272               |26                 |0.5                 |987        |-7.744884 |0.0     |true             |
|mastercard_cartes_bancaires|true                  |716                 |0.9413               |40                 |0.275               |756        |-14.058622|0.0     |true             |
|mastercard                 |true                  |596          

In [13]:
(df_payments
 .where(~F.col("IS_RETRIED"))
 .groupBy(transformed_cols.CARD_SCHEME_INFO)
 .pivot("HAS_THREE_DS_EXEMPTION")
 .agg(
     F.round(F.avg("TRANSACTION_AMOUNT_EUR")).alias("avg_ticket"),
     F.round(F.max("TRANSACTION_AMOUNT_EUR")).alias("max_ticket")
 )
 .show(truncate=False))

+---------------------------+----------------+----------------+---------------+---------------+
|CARD_SCHEME_INFO           |false_avg_ticket|false_max_ticket|true_avg_ticket|true_max_ticket|
+---------------------------+----------------+----------------+---------------+---------------+
|visa_cartes_bancaires      |93.0            |760.0           |48.0           |242.0          |
|mastercard                 |71.0            |320.0           |52.0           |230.0          |
|mastercard_cartes_bancaires|86.0            |432.0           |42.0           |236.0          |
|visa                       |161.0           |1480.0          |52.0           |239.0          |
+---------------------------+----------------+----------------+---------------+---------------+



In [14]:
(df_payments
 .where(~F.col("IS_RETRIED"))
 .where(F.col("TRANSACTION_AMOUNT_EUR")<55)
 .groupBy(transformed_cols.CARD_SCHEME_INFO)
 .pivot("HAS_THREE_DS_EXEMPTION")
 .count()
 .show(truncate=False))
 

+---------------------------+-----+----+
|CARD_SCHEME_INFO           |false|true|
+---------------------------+-----+----+
|visa_cartes_bancaires      |68   |700 |
|mastercard                 |22   |401 |
|mastercard_cartes_bancaires|65   |594 |
|visa                       |22   |618 |
+---------------------------+-----+----+



## Retries

In [15]:
get_metrics_df_display(
    df=df_payments.where(F.col("IS_RETRIED")), 
    dimensions=[transformed_cols.CARD_SCHEME_INFO], 
    pivot_col="IS_OPTIMISED",
)

+---------------------------+--------------------+---------------------+-------------------+--------------------+-----------+---------+--------+-----------------+
|CARD_SCHEME_INFO           |false_count_received|false_auth_rate_count|true_count_received|true_auth_rate_count|total_count|z_score  |p_value |significant_at_05|
+---------------------------+--------------------+---------------------+-------------------+--------------------+-----------+---------+--------+-----------------+
|visa_cartes_bancaires      |21                  |0.9524               |407                |0.7912              |428        |-1.797759|0.072215|false            |
|mastercard_cartes_bancaires|14                  |1.0                  |264                |0.9205              |278        |-1.096931|0.272672|false            |
|visa                       |26                  |0.9615               |38                 |0.2368              |64         |-5.705933|0.0     |true             |
|mastercard           

In [16]:
get_metrics_df_display(
    df=df_payments.where(F.col("IS_RETRIED")), 
    dimensions=[transformed_cols.CARD_SCHEME_INFO], 
    pivot_col="CVV_PRESENT",
)

+---------------------------+--------------------+---------------------+-------------------+--------------------+-----------+---------+--------+-----------------+
|CARD_SCHEME_INFO           |false_count_received|false_auth_rate_count|true_count_received|true_auth_rate_count|total_count|z_score  |p_value |significant_at_05|
+---------------------------+--------------------+---------------------+-------------------+--------------------+-----------+---------+--------+-----------------+
|visa_cartes_bancaires      |216                 |0.9167               |212                |0.6792              |428        |-6.130812|0.0     |true             |
|mastercard_cartes_bancaires|176                 |0.9432               |102                |0.8922              |278        |-1.550861|0.120935|false            |
|visa                       |19                  |0.3158               |45                 |0.6222              |64         |2.244201 |0.024819|true             |
|mastercard           

In [17]:
get_metrics_df_display(
    df=df_payments.where(F.col("IS_RETRIED")), 
    dimensions=[transformed_cols.CARD_SCHEME_INFO, "IS_NETWORK_TOKEN_USED"], 
    pivot_col="CVV_PRESENT",
)

+---------------------------+---------------------+--------------------+---------------------+-------------------+--------------------+-----------+---------+--------+-----------------+
|CARD_SCHEME_INFO           |IS_NETWORK_TOKEN_USED|false_count_received|false_auth_rate_count|true_count_received|true_auth_rate_count|total_count|z_score  |p_value |significant_at_05|
+---------------------------+---------------------+--------------------+---------------------+-------------------+--------------------+-----------+---------+--------+-----------------+
|mastercard_cartes_bancaires|false                |170                 |0.9412               |100                |0.91                |270        |-0.967937|0.333076|false            |
|visa_cartes_bancaires      |false                |166                 |0.8916               |96                 |0.6875              |262        |-4.114867|3.9E-5  |true             |
|visa_cartes_bancaires      |true                 |50                  |1.0