## Creating Spark Session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.window import *
from pyspark.sql.functions import col, to_timestamp, when, to_date, mean, stddev, count, min, max, lit, row_number, round

In [2]:
spark = SparkSession.builder \
                    .master("local")\
                    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")\
                    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
                    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
                    .getOrCreate()

## Reading and Initial Processing

In [18]:
path = "file:/home/alphawrath/Data/Users/lionswrath/cloudwalk/transactions.csv"

df = spark.read\
          .option("header", True)\
          .option("delimiter", ",")\
          .format("csv")\
          .load(path)

In [20]:
df = df.withColumn('transaction_id', col('transaction_id').cast('int'))\
       .withColumn('merchant_id', col('merchant_id').cast('int'))\
       .withColumn('user_id', col('user_id').cast('int'))\
       .withColumn('transaction_date', to_timestamp(col('transaction_date')))\
       .withColumn('transaction_amount', col('transaction_amount').cast('decimal(12,2)'))\
       .withColumn('device_id', col('device_id').cast('int'))\
       .withColumn('has_cbk', when(col('has_cbk') == 'FALSE', False).otherwise(True))

In [21]:
df.show()

+--------------+-----------+-------+----------------+--------------------+------------------+---------+-------+
|transaction_id|merchant_id|user_id|     card_number|    transaction_date|transaction_amount|device_id|has_cbk|
+--------------+-----------+-------+----------------+--------------------+------------------+---------+-------+
|      21320398|      29744|  97051|434505******9116|2019-12-01 23:16:...|            374.56|   285475|  false|
|      21320399|      92895|   2708|444456******4210|2019-12-01 22:45:...|            734.87|   497105|   true|
|      21320400|      47759|  14777|425850******7024|2019-12-01 22:22:...|            760.36|     null|  false|
|      21320401|      68657|  69758|464296******3991|2019-12-01 21:59:...|           2556.13|     null|   true|
|      21320402|      54075|  64367|650487******6116|2019-12-01 21:30:...|             55.36|   860232|  false|
|      21320403|      59566|  40759|516292******8220|2019-12-01 21:25:...|             60.49|   192705| 

In [24]:
df.coalesce(1)\
  .write\
  .format("delta")\
  .mode('overwrite')\
  .save("spark-warehouse/tb_transactions.delta")

## Exploratory Analysis

### Merchant Chargeback Rate

In [6]:
df_m_totals        = df.groupBy('merchant_id')\
                       .count()\
                       .sort(col('count').desc())\
                       .withColumnRenamed('count', 'total_purchases')

df_m_chargebacks   = df.groupBy('merchant_id', 'has_cbk')\
                     .count().orderBy(col('merchant_id').desc())

df_m_chargebacks_f = df_m_chargebacks.filter(~col('has_cbk'))\
                                     .select('merchant_id','count')\
                                     .withColumnRenamed('count', 'sum_f')
df_m_chargebacks_t = df_m_chargebacks.filter( col('has_cbk'))\
                                     .select('merchant_id','count')\
                                     .withColumnRenamed('count', 'sum_t')

# The merchant chargeback rate is a rate for fraud prone 
# payment detection based on the number of chargebacks of 
# the store. Maybe a minimum number of purchases is needed
# to filter stores that are in the beginning.
df_m_rates = df_m_totals.join(df_m_chargebacks_f, 'merchant_id', 'left')\
                      .join(df_m_chargebacks_t, 'merchant_id', 'left')\
                      .withColumn('sum_f', when(col('sum_f').isNull(), 0).otherwise(col('sum_f')))\
                      .withColumn('sum_t', when(col('sum_t').isNull(), 0).otherwise(col('sum_t')))\
                      .withColumn('rate', ((col('sum_t') * 100) / col('total_purchases')).cast('decimal(10,2)') )\
                      .orderBy(col('rate').desc())\
                      .withColumn('is_suspicious', when((col('rate') > 80) & (col('total_purchases') > 4), True).otherwise(False))

df_m_rates.show()

+-----------+---------------+-----+-----+------+-------------+
|merchant_id|total_purchases|sum_f|sum_t|  rate|is_suspicious|
+-----------+---------------+-----+-----+------+-------------+
|      84902|              1|    0|    1|100.00|        false|
|      11470|              1|    0|    1|100.00|        false|
|      56977|              1|    0|    1|100.00|        false|
|      92215|              1|    0|    1|100.00|        false|
|      73922|              1|    0|    1|100.00|        false|
|      52897|              3|    0|    3|100.00|        false|
|      62194|              1|    0|    1|100.00|        false|
|      18768|              1|    0|    1|100.00|        false|
|      72723|              4|    0|    4|100.00|        false|
|      41354|              3|    0|    3|100.00|        false|
|      25932|              4|    0|    4|100.00|        false|
|       2842|              1|    0|    1|100.00|        false|
|       1175|              1|    0|    1|100.00|       

In [7]:
df_m_rates.write\
          .format("delta")\
          .mode('overwrite')\
          .save("spark-warehouse/tb_merchant_cbk_rate.delta")

### Consumer Chargeback Rate

In [8]:
df_p_totals        = df.groupBy('user_id')\
                       .count()\
                       .sort(col('count').desc())\
                       .withColumnRenamed('count', 'total_purchases')

df_p_chargebacks   = df.groupBy('user_id', 'has_cbk')\
                     .count().orderBy(col('user_id').desc())

df_p_chargebacks_f = df_p_chargebacks.filter(~col('has_cbk'))\
                                     .select('user_id','count')\
                                     .withColumnRenamed('count', 'sum_f')
df_p_chargebacks_t = df_p_chargebacks.filter( col('has_cbk'))\
                                     .select('user_id','count')\
                                     .withColumnRenamed('count', 'sum_t')

# The consumer  chargeback rate is a rate for fraud prone 
# payment detection based on the number of chargebacks of 
# the consumer. Maybe a minimum number of purchases is needed
# to filter stores that are in the beginning.
df_p_rates = df_p_totals.join(df_p_chargebacks_f, 'user_id', 'left')\
                      .join(df_p_chargebacks_t, 'user_id', 'left')\
                      .withColumn('sum_f', when(col('sum_f').isNull(), 0).otherwise(col('sum_f')))\
                      .withColumn('sum_t', when(col('sum_t').isNull(), 0).otherwise(col('sum_t')))\
                      .withColumn('rate', ((col('sum_t') * 100) / col('total_purchases')).cast('decimal(10,2)') )\
                      .orderBy(col('rate').desc())\
                      .withColumn('is_suspicious', when((col('rate') > 50) & (col('total_purchases') > 3), True).otherwise(False))

df_p_rates.show(n=100)

+-------+---------------+-----+-----+------+-------------+
|user_id|total_purchases|sum_f|sum_t|  rate|is_suspicious|
+-------+---------------+-----+-----+------+-------------+
|  24543|              2|    0|    2|100.00|        false|
|  24027|              1|    0|    1|100.00|        false|
|  97150|              1|    0|    1|100.00|        false|
|  70557|              2|    0|    2|100.00|        false|
|  17929|              6|    0|    6|100.00|         true|
|  86219|              1|    0|    1|100.00|        false|
|  62888|              1|    0|    1|100.00|        false|
|  76819|              4|    0|    4|100.00|         true|
|  44531|              3|    0|    3|100.00|        false|
|  66979|              3|    0|    3|100.00|        false|
|  61047|              1|    0|    1|100.00|        false|
|  27987|              1|    0|    1|100.00|        false|
|  30874|              4|    0|    4|100.00|         true|
|  75710|             10|    0|   10|100.00|         tru

In [9]:
df_p_rates.write\
          .format("delta")\
          .mode('overwrite')\
          .save("spark-warehouse/tb_consumer_cbk_rate.delta")

### Frequency of purchases per user

In [10]:
# Ideally a history of each client must be maintained
# with the common number of purchases over the time.
df_p_purchases = df.withColumn('date', to_date(col('transaction_date')))\
                   .groupBy('user_id', 'date')\
                   .count()\
                   .sort(col('count').desc())

# Calculate the mean and deviation for each consumer
# Null standard deviation is removed because is trivial
df_p_purchases_agg = df_p_purchases.groupBy('user_id')\
                                   .agg(
                                        count('count').alias('count'),
                                        mean('count').alias('avg'), 
                                        stddev('count').alias('stddev'))\
                                   .sort(col('avg').desc(), col('stddev').desc())\
                                   .withColumn('avg', col('avg').cast('decimal(12,2)'))\
                                   .withColumn('stddev', col('stddev').cast('decimal(12,2)'))

# Most likely clients follow an binomial distribution or
# similar and a general analysis like this does not fit here
# df_p_purchases.groupBy().agg(mean('count'), stddev('count')).show()

# Mean and standard deviation of purchases by a consumer
# A day of buying can be considered suspicious if there is
# days with more purchases than round(avg+ stddev)
df_p_purchases_agg.show()
#df_p_purchases.show()

df_p_purchase_row = df.withColumn('date', to_date(col('transaction_date')))\
                      .withColumn('purchase_num', row_number().over(Window.partitionBy('user_id', 'date').orderBy('transaction_date')))\
                      .join(df_p_purchases_agg, 'user_id', 'left')\
                      .withColumn('is_suspicious', when(col('purchase_num') > round(col('avg') + col('stddev')), True).otherwise(False))\
                      .select('transaction_id', 'user_id', 'is_suspicious')

df_p_purchase_row.show()

+-------+-----+----+------+
|user_id|count| avg|stddev|
+-------+-----+----+------+
|  79054|    2|8.50|  2.12|
|  96025|    2|7.00|  5.66|
|  49106|    1|7.00|  null|
|   3584|    1|6.00|  null|
|  75710|    2|5.00|  2.83|
|  56877|    2|4.50|  3.54|
|   7281|    1|4.00|  null|
|  76837|    1|4.00|  null|
|  31819|    1|4.00|  null|
|  23204|    1|4.00|  null|
|  76819|    1|4.00|  null|
|  74585|    1|4.00|  null|
|  81152|    1|4.00|  null|
|  11750|   10|3.10|  2.38|
|  67519|    2|3.00|  2.83|
|  83722|    2|3.00|  1.41|
|  21768|    2|3.00|  1.41|
|  40779|    2|3.00|  1.41|
|   9669|    1|3.00|  null|
|  62541|    1|3.00|  null|
+-------+-----+----+------+
only showing top 20 rows

+--------------+-------+-------------+
|transaction_id|user_id|is_suspicious|
+--------------+-------+-------------+
|      21320649|   7725|        false|
|      21321455|  11065|        false|
|      21321447|  11065|        false|
|      21322366|  17246|        false|
|      21322358|  17246|     

### Value of the purchases per Merchant

In [11]:
# For stores, some purchases can be considered suspicious if the
# total amount goes over multiple times the amount purchased by
# by the majority of other customers.

df_m_values = df.filter(~col('has_cbk'))\
                .groupBy('merchant_id')\
                .agg(
                    count('transaction_amount').alias('count'),
                    min('transaction_amount').alias('min'),
                    max('transaction_amount').alias('max'),
                    mean('transaction_amount').alias('avg'), 
                    stddev('transaction_amount').alias('stddev'))\
                .filter(~col('stddev').isNull())\
                .sort(col('avg').desc(), col('stddev').desc())\
                .withColumn('avg', col('avg').cast('decimal(12,2)'))\
                .withColumn('stddev', col('stddev').cast('decimal(12,2)'))

df_m_values_val = df.join(df_m_values, 'merchant_id', 'left')\
                      .withColumn('is_suspicious', when((col('count') > 4) & (col('transaction_amount') > lit(2) * (col('avg') + col('stddev'))), True).otherwise(False))\
                      .select('transaction_id', 'merchant_id', 'is_suspicious')

df_m_values.show()
df_m_values_val.show()

+-----------+-----+-------+-------+-------+-------+
|merchant_id|count|    min|    max|    avg| stddev|
+-----------+-----+-------+-------+-------+-------+
|      61529|    2|4043.96|4070.87|4057.42|  19.03|
|      79266|    2|4006.85|4008.79|4007.82|   1.37|
|      81977|    2|3721.73|4072.37|3897.05| 247.94|
|      17304|    4|2463.69|4091.83|3642.10| 786.64|
|       1569|    2|3258.81|3334.50|3296.66|  53.52|
|       5832|    2|3001.65|3449.01|3225.33| 316.33|
|      24295|    2|3059.48|3087.71|3073.60|  19.96|
|      70943|    2|3067.61|3074.36|3070.99|   4.77|
|      65330|    2|2020.44|3996.79|3008.62|1397.49|
|      30121|    5|1866.69|3197.53|2857.33| 557.63|
|        341|    2|1809.16|3816.19|2812.68|1419.18|
|      55531|    2|2690.80|2726.27|2708.54|  25.08|
|      50970|    4|2521.23|2757.53|2616.09| 110.69|
|      36929|   13|   5.43|4077.07|2542.33|1501.98|
|      72057|    2| 700.80|4067.90|2384.35|2380.90|
|      84546|    2|2001.97|2750.30|2376.14| 529.15|
|      42687

### Value of the purchase by Consumer

In [12]:
# For stores, some purchases can be considered suspicious if the
# total amount goes over multiple times the amount purchased by
# by the majority of other customers.

df_p_values = df.filter(~col('has_cbk'))\
                .groupBy('user_id')\
                .agg(
                    count('transaction_amount').alias('count'),
                    min('transaction_amount').alias('min'),
                    max('transaction_amount').alias('max'),
                    mean('transaction_amount').alias('avg'), 
                    stddev('transaction_amount').alias('stddev'))\
                .filter(~col('stddev').isNull())\
                .sort(col('avg').desc(), col('stddev').desc())\
                .withColumn('avg', col('avg').cast('decimal(12,2)'))\
                .withColumn('stddev', col('stddev').cast('decimal(12,2)'))

df_p_values_val = df.join(df_p_values, 'user_id', 'left')\
                      .withColumn('is_suspicious', when((col('count') > 4) & (col('transaction_amount') > lit(2) * (col('avg') + col('stddev'))), True).otherwise(False))\
                      .select('transaction_id', 'user_id', 'is_suspicious')

df_p_values.show()
df_p_values_val.show()

+-------+-----+-------+-------+-------+-------+
|user_id|count|    min|    max|    avg| stddev|
+-------+-----+-------+-------+-------+-------+
|  25385|    2|4043.96|4070.87|4057.42|  19.03|
|   5537|    2|4006.85|4008.79|4007.82|   1.37|
|  49106|    7|1583.22|4077.07|3655.35| 915.94|
|  42388|    2|3518.81|3574.11|3546.46|  39.10|
|  31561|    3|2463.69|4091.83|3521.14| 916.74|
|  40779|    6|1866.69|3197.53|2893.78| 506.69|
|  57594|    2|2771.39|2821.35|2796.37|  35.33|
|  17407|    2|2690.80|2726.27|2708.54|  25.08|
|  79054|    2|1972.13|3165.49|2568.81| 843.83|
|  28518|    2| 700.80|4067.90|2384.35|2380.90|
|  55544|    2|2001.97|2750.30|2376.14| 529.15|
|   1516|    2|1354.65|3375.01|2364.83|1428.61|
|  20191|    2|2257.53|2431.18|2344.36| 122.79|
|  37373|    2|1551.68|2633.30|2092.49| 764.82|
|  58011|    2|1958.62|2030.52|1994.57|  50.84|
|  27939|    3|1177.96|3066.53|1859.00|1048.61|
|  63096|    2|1427.98|2076.23|1752.11| 458.38|
|  15629|    2| 625.75|2701.08|1663.42|1

## Merge analysis data

In [17]:
df_analysis = df.join(df_m_rates.select('merchant_id', col('is_suspicious').alias('merchant_rate')), 'merchant_id', 'left')\
                .join(df_p_rates.select('user_id', col('is_suspicious').alias('consumer_rate')), 'user_id', 'left')\
                .join(df_p_purchase_row.select('transaction_id', col('is_suspicious').alias('consumer_freq')), 'transaction_id', 'left')\
                .join(df_m_values_val.select('transaction_id', col('is_suspicious').alias('consumer_val')), 'transaction_id', 'left')\
                .join(df_p_values_val.select('transaction_id', col('is_suspicious').alias('merchant_val')), 'transaction_id', 'left')\
                .select(
                    'transaction_id', 
                    'merchant_id', 
                    'user_id', 
                    'device_id',
                    'merchant_rate', 
                    'consumer_rate', 
                    'consumer_freq', 
                    'consumer_val',
                    'merchant_val')

df_analysis.show()

+--------------+-----------+-------+---------+-------------+-------------+-------------+------------+------------+
|transaction_id|merchant_id|user_id|device_id|merchant_rate|consumer_rate|consumer_freq|consumer_val|merchant_val|
+--------------+-----------+-------+---------+-------------+-------------+-------------+------------+------------+
|      21320490|      30121|  40779|   571604|        false|        false|        false|       false|       false|
|      21320615|       5142|  84481|   417598|        false|        false|        false|       false|       false|
|      21320647|      24427|  76740|     null|        false|        false|        false|       false|       false|
|      21320976|      92057|  71366|     null|        false|        false|        false|       false|       false|
|      21320979|      79123|  31269|   896735|        false|        false|        false|       false|       false|
|      21321147|      36617|  79054|   101848|        false|         true|      

In [14]:
df_analysis.write.mode('overwrite').csv('res/analysis_all.csv')

In [15]:
df_analysis = df.filter(~col('has_cbk'))\
                .join(df_m_rates.select('merchant_id', col('is_suspicious').alias('merchant_rate')), 'merchant_id', 'left')\
                .join(df_p_rates.select('user_id', col('is_suspicious').alias('consumer_rate')), 'user_id', 'left')\
                .join(df_p_purchase_row.select('transaction_id', col('is_suspicious').alias('consumer_freq')), 'transaction_id', 'left')\
                .join(df_m_values_val.select('transaction_id', col('is_suspicious').alias('consumer_val')), 'transaction_id', 'left')\
                .join(df_p_values_val.select('transaction_id', col('is_suspicious').alias('merchant_val')), 'transaction_id', 'left')\
                .select(
                    'transaction_id', 
                    'merchant_id', 
                    'user_id',
                    'device_id',
                    'merchant_rate', 
                    'consumer_rate', 
                    'consumer_freq', 
                    'consumer_val',
                    'merchant_val')

df_analysis.show()

+--------------+-----------+-------+---------+-------------+-------------+-------------+------------+------------+
|transaction_id|merchant_id|user_id|device_id|merchant_rate|consumer_rate|consumer_freq|consumer_val|merchant_val|
+--------------+-----------+-------+---------+-------------+-------------+-------------+------------+------------+
|      21320490|      30121|  40779|   571604|        false|        false|        false|       false|       false|
|      21320615|       5142|  84481|   417598|        false|        false|        false|       false|       false|
|      21320647|      24427|  76740|     null|        false|        false|        false|       false|       false|
|      21320976|      92057|  71366|     null|        false|        false|        false|       false|       false|
|      21320979|      79123|  31269|   896735|        false|        false|        false|       false|       false|
|      21321209|      78654|  94895|   635203|        false|        false|      

In [16]:
df_analysis.write.mode('overwrite').csv('res/analysis_no_cbk.csv')