In [1]:
import sys

In [6]:
# Install/upgrade python-dotenv using the interpreter that runs this kernel
# (`sys.executable`), so the package lands in the kernel's own environment.
# Requires the `import sys` cell to have been executed first.
# NOTE(review): the package is not version-pinned — consider pinning for reproducibility.
!{sys.executable} -m pip install -qU python-dotenv

In [2]:
import os
from dotenv import find_dotenv

# Folder (next to the .env file located by find_dotenv()) that holds the project packages.
package_root = "src"

# NOTE(review): find_dotenv() returns "" when no .env file is found; the path below then
# degrades to the relative "src" — confirm that fallback is acceptable.
package_dir = os.path.join(os.path.dirname(find_dotenv()), package_root)

# Append only once so re-running this cell does not pile up duplicate sys.path entries.
if package_dir not in sys.path:
    sys.path.append(package_dir)

In [3]:
# Display the module search path to confirm the package directory was appended.
sys.path

['/home/ajkdrag/work',
 '/usr/local/spark/python/lib/py4j-0.10.9-src.zip',
 '/usr/local/spark/python',
 '/home/ajkdrag/work',
 '/opt/conda/envs/python37/lib/python37.zip',
 '/opt/conda/envs/python37/lib/python3.7',
 '/opt/conda/envs/python37/lib/python3.7/lib-dynload',
 '',
 '/opt/conda/envs/python37/lib/python3.7/site-packages',
 '/opt/conda/envs/python37/lib/python3.7/site-packages/IPython/extensions',
 '/home/ajkdrag/.ipython',
 '/home/ajkdrag/src']

In [4]:
# Load IPython's autoreload extension; mode 2 reloads all imported modules before each
# cell runs, so edits to the local packages are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

## Spark setup

In [5]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

In [6]:
# JAR files handed to Spark through `spark.jars` when the session is built:
# hadoop-aws, the AWS SDK bundle, and the Spark Cassandra connector assembly.
_spark_jars_dir = "/usr/local/spark/jars"
jars = [
    f"{_spark_jars_dir}/{jar_name}"
    for jar_name in (
        "hadoop-aws-3.2.0.jar",
        "aws-java-sdk-bundle-1.11.375.jar",
        "spark-cassandra-connector-assembly_2.12-3.0.0.jar",
    )
]

In [7]:
# Connection option merged into every Cassandra read/write via `.options(**hosts)`.
hosts = {
    "spark.cassandra.connection.host": "cassandra",
}

In [8]:
# Create (or reuse) the SparkSession for this notebook: connects to the standalone
# master, sets executor memory, and registers the extra JARs listed in `jars`.
spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "480m")
    .config("spark.jars", ",".join(jars))
    .getOrCreate()
)

In [9]:
# Display the session object to confirm it was created.
spark

In [10]:
# Keep a handle on the SparkContext; it is used for the Hadoop configuration and RDD test below.
sc = spark.sparkContext

In [11]:
# Configure the S3A filesystem on the session's Hadoop configuration so `s3a://` paths
# resolve against the endpoint at http://minio:9000. Credentials come from the environment;
# a missing variable raises KeyError here rather than failing later inside Spark.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "http://minio:9000")
hadoop_conf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
hadoop_conf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
hadoop_conf.set("fs.s3a.path.style.access", "true")
# Keys set directly on the Hadoop Configuration are read as-is. The "spark.hadoop." prefix
# is only stripped for properties supplied through SparkConf, so it must not be used here
# (with the prefix these two settings were silently ignored).
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")

## Testing Spark

In [12]:
# Tiny RDD used as a smoke test that the cluster accepts work.
vals = sc.parallelize([1, 2, 3])

In [13]:
# Run an action on the test RDD; 1 + 2 + 3 should come back as 6.
vals.sum()

6

In [90]:
# Options identifying the customer table for the Cassandra data source.
# NOTE(review): `Config` is not imported or defined in any cell of this notebook, and these
# flat attributes do not match the nested `config` object parsed further down — this cell
# relies on hidden kernel state and will fail on a fresh Restart & Run All.
spark_opts_cust_table = {
    "keyspace": Config.keyspace,
    "table": Config.table_customer
}

In [91]:
# Load the customer table from Cassandra using the table options plus the host setting.
df = (
    spark.read
    .format(Config.cassandra_format)
    .options(**spark_opts_cust_table, **hosts)
    .load()
)

In [92]:
# Preview up to five rows of the customer table as currently stored in Cassandra.
df.show(n=5)

+------+----+---+-----+------+---+----+---+----+-----+------+---+
|cc_num|city|dob|first|gender|job|last|lat|long|state|street|zip|
+------+----+---+-----+------+---+----+---+----+-----+------+---+
+------+----+---+-----+------+---+----+---+----+-----+------+---+



# Job 1: Import to Cassandra

## Loading customer df to Cassandra

In [16]:
from schemas.structs import customer_schema

In [17]:
# Read the raw customer CSV with an explicit schema; the first line of the file is a header.
customer_df = spark.read.csv(
    Config.path_customer,
    schema=customer_schema,
    header=True,
)

In [18]:
# Preview the first five customer rows parsed from the CSV.
customer_df.show(n=5)

+----------------+-------+--------+------+--------------------+--------------+-----+-----+-------+---------+--------------------+-------------------+
|          cc_num|  first|    last|gender|              street|          city|state|  zip|    lat|     long|                 job|                dob|
+----------------+-------+--------+------+--------------------+--------------+-----+-----+-------+---------+--------------------+-------------------+
|3526015186182660|   Carl|   Gomez|     M|204 Cohen Meadow ...|Hathaway Pines|   CA|95233|38.1919|-120.3644|Data processing m...|1958-10-11 18:30:00|
|4170242670039985|Rebecca|Trujillo|     F|       242 Cody Pass|      Colstrip|   MT|59323|45.9344|-106.6368|          Air broker|1983-08-08 18:30:00|
|   4006862159277| Cheryl|    Rice|     F|0771 Solis Road A...|      Brooklyn|   NY|11228|40.6174| -74.0121|     Tourism officer|1957-07-23 18:30:00|
|3593533875650654|  Cindy|     Ray|     F|     09110 Marie Run|        Oswego|   IL|60543|41.6849| -

In [52]:
# Options for the customer table, this time including `confirm.truncate`, which
# accompanies the mode("overwrite") write of this table (the Spark Cassandra connector
# documents this flag as the explicit opt-in for truncating the target table).
# NOTE(review): `Config` is not defined in any visible cell — hidden kernel state.
spark_opts_cust_table = {
    "keyspace": Config.keyspace,
    "table": Config.table_customer,
    "confirm.truncate": True
}

In [48]:
# Write customer_df to the Cassandra customer table in overwrite mode.
# NOTE(review): this depends on `spark_opts_cust_table` containing "confirm.truncate";
# make sure the options cell that adds it has been run before this one.
(
    customer_df.write
        .format(Config.cassandra_format)
        .mode("overwrite")
        .options(**spark_opts_cust_table, **hosts)
        .save()
)

In [49]:
# Read the customer table back from Cassandra and preview five rows to verify the write.
(
    spark.read
        .format(Config.cassandra_format)
        .options(**spark_opts_cust_table, **hosts)
        .load()
        .show(n=5)
)

+---------------+-----------+-------------------+-------+------+--------------------+-------+-------+---------+-----+--------------------+-----+
|         cc_num|       city|                dob|  first|gender|                 job|   last|    lat|     long|state|              street|  zip|
+---------------+-----------+-------------------+-------+------+--------------------+-------+-------+---------+-----+--------------------+-----+
|180036251237802|  Salt Lick|1956-07-19 18:30:00|Melissa|     F|Psychologist, for...|  James| 38.104| -83.6316|   KY|     537 Bryant Mall|40371|
|   676165681542|      Salem|1954-07-03 18:30:00|   John|     M|Human resources o...| Garcia|44.9039|-123.0445|   OR|34153 Maria Mountain|97302|
| 38535403302699|     London|1970-04-16 18:30:00|William|     M|Insurance risk su...|   Reed|39.9001| -83.4439|   OH|334 Adam Lodge Su...|43140|
| 30157941709315|  Caledonia|1974-11-03 18:30:00|Maurice|     M|Hydrographic surv...|  Simon|43.6221| -91.4837|   MN|031 Jessica H

## Loading fraud/non-fraud df to Cassandra

In [41]:
from schemas.structs import fraud_transaction_schema
from utils.general import get_haversine_distance_udf

In [34]:
# Load the transactions CSV and normalise its date/time columns.
transaction_df = (
    spark.read
    .csv(Config.path_transactions, schema=fraud_transaction_schema, header=True)
    # Keep only the part of trans_date that precedes the "T" separator.
    .withColumn("trans_date", F.split(F.col("trans_date"), "T")[0])
    # Rebuild trans_time as a timestamp parsed from "<trans_date> <trans_time>".
    .withColumn(
        "trans_time",
        F.to_timestamp(F.concat_ws(" ", "trans_date", "trans_time")),
    )
)

In [35]:
# Preview the first five transactions after the date/time normalisation.
transaction_df.show(n=5)

+----------------+-------+-------+--------------------+----------+-------------------+----------+-------------+--------------------+-----+---------+-----------+--------+
|          cc_num|  first|   last|           trans_num|trans_date|         trans_time| unix_time|     category|            merchant|  amt|merch_lat| merch_long|is_fraud|
+----------------+-------+-------+--------------------+----------+-------------------+----------+-------------+--------------------+-----+---------+-----------+--------+
| 180094108369013|   John|Holland|80f5177be11f0bcd7...|2012-01-01|2012-01-01 00:12:15|1325376735|personal_care|         Hills-Boyer| 64.0|39.011566|-119.937831|     0.0|
|4368593032190508|  Carla|Fleming|7933d389bf8ef8a11...|2012-01-01|2012-01-01 00:16:58|1325377018|gas_transport|      Friesen-DAmore|133.0|40.149071| -75.589697|     0.0|
|   4361355512072|Matthew| Nelson|1467c318b5d73d22d...|2012-01-01|2012-01-01 00:36:42|1325378202|entertainment|         Larson-Moen|119.0|47.297797| -

In [36]:
# Check column types; trans_time should now be a timestamp while trans_date stays a string.
transaction_df.printSchema()

root
 |-- cc_num: string (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- trans_date: string (nullable = true)
 |-- trans_time: timestamp (nullable = true)
 |-- unix_time: long (nullable = true)
 |-- category: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: double (nullable = true)



In [37]:
# Add the customer's age in completed years as of today.
# months_between / 12 counts whole calendar years; the previous datediff / 365 ignored
# leap days and therefore bumped the age a couple of weeks before the actual birthday.
# NOTE(review): the age is relative to the run date, not the transaction date, so the
# value changes over time — confirm this is what the downstream features expect.
customer_age_df = customer_df.withColumn(
    "age",
    F.floor(F.months_between(F.current_date(), F.to_date("dob")) / 12).cast(T.IntegerType()),
)

In [38]:
# Preview three customers with the derived `age` column.
customer_age_df.show(n=3)

+----------------+-------+--------+------+--------------------+--------------+-----+-----+-------+---------+--------------------+-------------------+---+
|          cc_num|  first|    last|gender|              street|          city|state|  zip|    lat|     long|                 job|                dob|age|
+----------------+-------+--------+------+--------------------+--------------+-----+-----+-------+---------+--------------------+-------------------+---+
|3526015186182660|   Carl|   Gomez|     M|204 Cohen Meadow ...|Hathaway Pines|   CA|95233|38.1919|-120.3644|Data processing m...|1958-10-11 18:30:00| 64|
|4170242670039985|Rebecca|Trujillo|     F|       242 Cody Pass|      Colstrip|   MT|59323|45.9344|-106.6368|          Air broker|1983-08-08 18:30:00| 39|
|   4006862159277| Cheryl|    Rice|     F|0771 Solis Road A...|      Brooklyn|   NY|11228|40.6174| -74.0121|     Tourism officer|1957-07-23 18:30:00| 65|
+----------------+-------+--------+------+--------------------+-------------

In [43]:
# Enrich each transaction with customer attributes and keep only the columns written to Cassandra.
processed_df = (
    transaction_df
    # Customer data is broadcast to every executor; "cust" disambiguates its lat/long columns.
    .join(F.broadcast(customer_age_df).alias("cust"), on="cc_num", how="inner")
    # Distance computed by the project UDF from the customer and merchant coordinates,
    # rounded to 2 decimals (presumably haversine, per the UDF's name — defined in utils.general).
    .withColumn(
        "distance",
        F.round(
            get_haversine_distance_udf("cust.lat", "cust.long", "merch_lat", "merch_long"),
            2,
        ),
    )
    .select(
        "cc_num",
        "trans_num",
        "trans_time",
        "category",
        "merchant",
        "amt",
        "merch_lat",
        "merch_long",
        "distance",
        "age",
        "is_fraud",
    )
)

In [44]:
# Preview two enriched transactions.
processed_df.show(n=2)

+----------------+--------------------+-------------------+-------------+--------------+-----+---------+-----------+--------+---+--------+
|          cc_num|           trans_num|         trans_time|     category|      merchant|  amt|merch_lat| merch_long|distance|age|is_fraud|
+----------------+--------------------+-------------------+-------------+--------------+-----+---------+-----------+--------+---+--------+
| 180094108369013|80f5177be11f0bcd7...|2012-01-01 00:12:15|personal_care|   Hills-Boyer| 64.0|39.011566|-119.937831|    1.23| 73|     0.0|
|4368593032190508|7933d389bf8ef8a11...|2012-01-01 00:16:58|gas_transport|Friesen-DAmore|133.0|40.149071| -75.589697|    3.51| 62|     0.0|
+----------------+--------------------+-------------------+-------------+--------------+-----+---------+-----------+--------+---+--------+
only showing top 2 rows



In [45]:
# Cache processed_df, which feeds both the fraud and the non-fraud filters that follow.
processed_df.cache()

DataFrame[cc_num: string, trans_num: string, trans_time: timestamp, category: string, merchant: string, amt: double, merch_lat: double, merch_long: double, distance: double, age: int, is_fraud: double]

In [46]:
# Split the enriched transactions by label: is_fraud == 1 vs. is_fraud == 0.
fraud_df = processed_df.where(processed_df["is_fraud"] == 1)
non_fraud_df = processed_df.where(processed_df["is_fraud"] == 0)

In [47]:
# Preview three fraudulent transactions.
fraud_df.show(n=3)

+----------------+--------------------+-------------------+--------------+--------------+------+---------+-----------+--------+---+--------+
|          cc_num|           trans_num|         trans_time|      category|      merchant|   amt|merch_lat| merch_long|distance|age|is_fraud|
+----------------+--------------------+-------------------+--------------+--------------+------+---------+-----------+--------+---+--------+
|5157436163845247|8e0299d3779108d4d...|2012-01-02 01:23:34|health_fitness|Dietrich-Fadel|1774.0|33.619662|-117.391852|  225.49| 32|     1.0|
|5157436163845247|2fe127c95a68344e6...|2012-01-02 01:52:06|  shopping_pos|     Lynch Ltd|2013.0|34.570986| -118.71129|   89.74| 32|     1.0|
|5157436163845247|80d10820173241448...|2012-01-02 02:00:37|  shopping_pos|  Hudson-Grady|1801.0|36.988153|-120.376765|  154.79| 32|     1.0|
+----------------+--------------------+-------------------+--------------+--------------+------+---------+-----------+--------+---+--------+
only showing 

In [54]:
# Options for the fraud-transaction table; `confirm.truncate` accompanies the
# mode("overwrite") write of fraud_df.
# NOTE(review): `Config` is not defined in any visible cell — hidden kernel state.
spark_opts_fraud_table = {
    "keyspace": Config.keyspace,
    "table": Config.table_fraud,
    "confirm.truncate": True
}

In [53]:
# Write the fraudulent transactions to their Cassandra table in overwrite mode.
# NOTE(review): requires `spark_opts_fraud_table` to be defined first — keep the options
# cell above this one in execution order.
(
    fraud_df.write
        .format(Config.cassandra_format)
        .mode("overwrite")
        .options(**spark_opts_fraud_table, **hosts)
        .save()
)

In [55]:
# Options for the non-fraud-transaction table; `confirm.truncate` accompanies the
# mode("overwrite") write of non_fraud_df.
# NOTE(review): `Config` is not defined in any visible cell — hidden kernel state.
spark_opts_non_fraud_table = {
    "keyspace": Config.keyspace,
    "table": Config.table_non_fraud,
    "confirm.truncate": True
}

In [56]:
# Write the non-fraudulent transactions to their Cassandra table in overwrite mode.
(
    non_fraud_df.write
        .format(Config.cassandra_format)
        .mode("overwrite")
        .options(**spark_opts_non_fraud_table, **hosts)
        .save()
)

In [57]:
# Read the fraud table back from Cassandra and preview five rows to verify the write.
(
    spark.read
        .format(Config.cassandra_format)
        .options(**spark_opts_fraud_table, **hosts)
        .load()
        .show(n=5)
)

+---------------+-------------------+---+------+------------+--------+--------+---------+----------+--------------------+--------------------+
|         cc_num|         trans_time|age|   amt|    category|distance|is_fraud|merch_lat|merch_long|            merchant|           trans_num|
+---------------+-------------------+---+------+------------+--------+--------+---------+----------+--------------------+--------------------+
|341559343212109|2012-01-17 23:36:55| 78|1082.0|shopping_net|    52.7|     1.0|39.810603|-88.442308|    Schmidt and Sons|4e8d75754e4e9b6df...|
|341559343212109|2012-01-17 23:17:40| 78|1376.0| grocery_pos|  169.17|     1.0|37.921349|-86.744044|        Bailey-Morar|f154f3c85a0239b84...|
|341559343212109|2012-01-17 23:09:48| 78| 248.0| grocery_pos|   152.7|     1.0|38.021388|-86.344542|           Kunze Inc|ff94e13997e34c06c...|
|341559343212109|2012-01-17 22:41:58| 78| 998.0|shopping_net|   52.01|     1.0|38.990974|-87.871855|Reichert, Rowe an...|c11e2d86100c70885...|

In [58]:
# Read the non-fraud table back from Cassandra and preview five rows to verify the write.
(
    spark.read
        .format(Config.cassandra_format)
        .options(**spark_opts_non_fraud_table, **hosts)
        .load()
        .show(n=5)
)

+--------------+-------------------+---+-----+-------------+--------+--------+---------+----------+--------------------+--------------------+
|        cc_num|         trans_time|age|  amt|     category|distance|is_fraud|merch_lat|merch_long|            merchant|           trans_num|
+--------------+-------------------+---+-----+-------------+--------+--------+---------+----------+--------------------+--------------------+
|30157941709315|2012-02-01 05:05:11| 48|375.0|     misc_net|    0.65|     0.0|43.627917|-91.484569|      Flatley-Durgan|600e35d550f6046ce...|
|30157941709315|2012-01-31 15:14:54| 48|357.0|entertainment|    1.94|     0.0|43.608473|-91.472015|Schaefer, Fay and...|c6ecaa2e1f29ee311...|
|30157941709315|2012-01-30 21:51:14| 48|408.0|entertainment|    2.21|     0.0|43.637434| -91.49716|Parker, Nolan and...|bcc9e3c696ce6f154...|
|30157941709315|2012-01-30 19:31:03| 48| 57.0|  food_dining|    0.94|     0.0|43.621082|-91.474712|        Leannon-Ward|27e66cdb5f564e756...|
|30157

## Loading fraud/non-fraud df to Cassandra (using package)

In [46]:
import importlib
from main import parse_config, init_or_get_spark

In [47]:
# JSON configuration file handed to parse_config in the next cell.
# NOTE(review): relative path — it resolves against the kernel's working directory,
# so it only works when the notebook is started from the expected folder.
config_file = "../src/configs/base_config.json"

In [48]:
# Parse the JSON file into the project's config object (spark / cassandra / s3 / ml sections).
config = parse_config(config_file)

In [49]:
# Display the parsed configuration for inspection.
config

Config(spark=SparkConfig(app_name='fraud_detection', master='spark://spark-master:7077', jars='/usr/local/spark/jars/hadoop-aws-3.2.0.jar,/usr/local/spark/jars/aws-java-sdk-bundle-1.11.375.jar,/usr/local/spark/jars/spark-cassandra-connector-assembly_2.12-3.0.0.jar', exec_mem='480m', cassandra_format='org.apache.spark.sql.cassandra', cassandra_host='cassandra'), cassandra=CassandraConfig(keyspace='creditcard', table_customer='customer', table_fraud='fraud_transaction', table_non_fraud='non_fraud_transaction'), s3=S3Config(path_transactions='s3a://realtime-ml/data/raw/transactions.csv', path_customer='s3a://realtime-ml/data/raw/customer.csv', path_ml_artifacts='s3a://realtime-ml/ml_artifacts/'), ml=MLConfig(feature_cols=['cc_num', 'category', 'merchant', 'distance', 'amt', 'age'], train_pct=0.7, seed=123, feature_col_name='features', label_col_name='label', kmeans_extra_args={'maxIter': 30, 'seed': 123}, model_extra_args={'maxBins': 700, 'seed': 123}))

In [50]:
# Show the run identifier attached to this config object.
config.run_id

'9d024f2b89d44f768f3d33872e2656ef'

In [17]:
# Display the session that is passed to the packaged jobs below.
# NOTE(review): this is the session built earlier in the notebook; `init_or_get_spark`
# is imported above but not called in any visible cell.
spark

In [18]:
# Name of the job package; the next cell imports `jobs.<job_name>.entrypoint` and runs it.
job_name = "import_to_cassandra"

In [19]:
# Import the selected job's entrypoint module and execute it with the session and config.
job_module = importlib.import_module(f"jobs.{job_name}.entrypoint")
job_module.run(spark, config)



# Job 2: Fraud Detection Training

In [40]:
# Switch to the training job; the cells below import `jobs.<job_name>.entrypoint` and run it.
job_name = "fraud_detection_training"

In [41]:
import numpy as np
# Seed NumPy's global random generator in this kernel process.
# NOTE(review): the three training runs recorded below print different final values
# (0.99975…, 0.99951…, 0.98841…) despite this seed and the seeds in `config`, so the
# job is not deterministic as-is — confirm which source of randomness is still unseeded.
np.random.seed(1111)

In [51]:
# Import the training job's entrypoint module and run it (first recorded run).
job_module = importlib.import_module(f"jobs.{job_name}.entrypoint")
job_module.run(spark, config)

{'maxIter': 30, 'seed': 123}
{'maxBins': 700, 'seed': 123}
0.9997526339502256


In [26]:
# Re-run the training job's entrypoint with the same session and config (second recorded run).
job_module = importlib.import_module(f"jobs.{job_name}.entrypoint")
job_module.run(spark, config)

{'maxIter': 30, 'seed': 123}
{'maxBins': 700, 'seed': 123}
0.9995154474091957


In [27]:
# Re-run the training job's entrypoint with the same session and config (third recorded run).
job_module = importlib.import_module(f"jobs.{job_name}.entrypoint")
job_module.run(spark, config)

{'maxIter': 30, 'seed': 123}
{'maxBins': 700, 'seed': 123}
0.9884113299530023
