In [None]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window

MONGO_CONN = "<connection_string>"

database="fraud-detection"
collection="txn-data"

In [None]:
pipeline = []
df=(spark.read.format("mongodb").\
	option('spark.mongodb.connection.uri', MONGO_CONN).\
	option('spark.mongodb.database', database).\
	option('spark.mongodb.collection', collection).\
	option("forceDeleteTempCheckpointLocation", "true").load())

In [None]:
columns = ["_id","cc_num",'category','amt','zip','lat','long', 'gender','city_pop','merch_lat','merch_long','dob','trans_date_trans_time','is_fraud']
fdf = df.select(*columns)

In [None]:
from datetime import datetime, timedelta
@F.udf()
def convert_dob_dt(time):
    format = "%Y-%m-%d"
    return datetime.strptime(time,format)
@F.udf(T.IntegerType())
def convert_datetime(time):
    format = "%Y-%m-%d %H:%M:%S"
    return datetime.strptime(time,format)
@F.udf(T.IntegerType())
def get_hour(time):
    return int(time.hour)
@F.udf(T.IntegerType())
def get_day(time):
    return time.day
@F.udf(T.IntegerType())
def get_month(time):
    return time.month
@F.udf(T.IntegerType())
def get_year(time):
    return time.year

In [None]:
fdf = fdf.withColumn("trans_date_trans_time", convert_datetime("trans_date_trans_time")).withColumn("dob",convert_dob_dt("dob"))
fdf = fdf.withColumn("hour", get_hour("trans_date_trans_time")).withColumn("day",get_day("trans_date_trans_time")).withColumn("month",get_month("trans_date_trans_time"))
fdf = fdf.withColumn("age", (get_year("trans_date_trans_time") - get_year("dob")))

In [None]:
f_columns = ["_id","cc_num",'category', 'gender','amt','zip','lat','long','city_pop','merch_lat','merch_long','age','hour','day','month','is_fraud']

In [None]:
cat_lookup = fdf.select("category").distinct().withColumn("cat_id", F.monotonically_increasing_id())
gender_lookup = fdf.select("gender").distinct().withColumn("gender_id", F.monotonically_increasing_id())

cat_lookup.write.format("mongodb").option('spark.mongodb.connection.uri', MONGO_CONN).\
    option('spark.mongodb.database', database).\
	option('spark.mongodb.collection', "category_lookup").\
	option("forceDeleteTempCheckpointLocation", "true").mode("overwrite").save()
gender_lookup.write.format("mongodb").option('spark.mongodb.connection.uri', MONGO_CONN).\
    option('spark.mongodb.database', database).\
	option('spark.mongodb.collection', "gender_lookup").\
	option("forceDeleteTempCheckpointLocation", "true").mode("overwrite").save()

In [None]:
fdf = fdf.select(*f_columns)
fdf = fdf.withColumn("amt",fdf.amt.cast("double"))
fdf = fdf.withColumn("lat",fdf.lat.cast("double"))
fdf = fdf.withColumn("long",fdf.long.cast("double"))
fdf = fdf.withColumn("merch_lat",fdf.merch_lat.cast("double"))
fdf = fdf.withColumn("merch_long",fdf.merch_long.cast("double"))
fdf = fdf.withColumn("is_fraud",fdf.is_fraud.cast("integer"))
fdf = fdf.withColumn("city_pop",fdf.city_pop.cast("integer"))
fdf = fdf.withColumn("zip",fdf.zip.cast("integer"))
fdf = fdf.join(F.broadcast(cat_lookup),on="category")
fdf = fdf.join(F.broadcast(gender_lookup),on="gender")
fdf = fdf.drop("category")
fdf = fdf.drop("gender")
fdf.show(10,False)

+------------------------+----------------+------+-----+-------+------------------+--------+------------------+------------------+---+----+---+-----+--------+------+---------+
|_id                     |cc_num          |amt   |zip  |lat    |long              |city_pop|merch_lat         |merch_long        |age|hour|day|month|is_fraud|cat_id|gender_id|
+------------------------+----------------+------+-----+-------+------------------+--------+------------------+------------------+---+----+---+-----+--------+------+---------+
|63d0c513c9e4437035cd9efb|2703186189652095|4.97  |28654|36.0788|-81.1781          |3495    |36.011293         |-82.048315        |31 |0   |1  |1    |0       |1     |0        |
|63d0c513c9e4437035cd9efc|630423337322    |107.23|99160|48.8878|-118.2105         |149     |49.159046999999994|-118.186462       |41 |0   |1  |1    |0       |2     |0        |
|63d0c513c9e4437035cd9efd|38859492057661  |220.11|83252|42.1808|-112.262          |4154    |43.150704         |-112.1544

In [None]:
fdf.write.mode("overwrite").format("delta").saveAsTable("fraud_demo_txn_data")

In [None]:
fdf = spark.read.table("fraud_demo_txn_data")

In [None]:
from databricks.feature_store import feature_table
from databricks.feature_store import FeatureStoreClient
fs = FeatureStoreClient()

features_df = fdf.drop("is_fraud")

customer_feature_table = fs.create_table(
  name='default.bfsi_txn_features',
  primary_keys='_id',
  schema=features_df.schema,
  description='Transaction features'
)

fs.write_table(
  name='default.bfsi_txn_features',
  df = features_df,
  mode = 'overwrite'
)

2023/01/25 18:33:21 INFO databricks.feature_store._compute_client._compute_client: Created feature table 'hive_metastore.default.bfsi_txn_features'.


In [None]:
fdf.printSchema()

root
 |-- _id: string (nullable = true)
 |-- cc_num: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- cat_id: long (nullable = true)
 |-- gender_id: long (nullable = true)



In [None]:
fdf.show(10,False)

+------------------------+-------------------+------+-----+-------+---------+--------+------------------+------------------+---+----+---+-----+--------+------+---------+
|_id                     |cc_num             |amt   |zip  |lat    |long     |city_pop|merch_lat         |merch_long        |age|hour|day|month|is_fraud|cat_id|gender_id|
+------------------------+-------------------+------+-----+-------+---------+--------+------------------+------------------+---+----+---+-----+--------+------+---------+
|63d0c5fdc9e4437035d30475|3559679414981506   |54.39 |13367|43.7893|-75.4156 |8830    |43.583257         |-76.060536        |28 |2   |15 |6    |0       |7     |0        |
|63d0c5fdc9e4437035d30476|4147608975828480   |160.8 |56019|44.1111|-94.9134 |914     |43.70858          |-95.353617        |75 |2   |15 |6    |0       |13    |1        |
|63d0c5fdc9e4437035d30477|4400011257587661852|15.93 |68859|41.4972|-98.7858 |509     |41.975213000000004|-98.388599        |39 |2   |15 |6    |0      

In [None]:
fdf.schema

Out[76]: StructType([StructField('_id', StringType(), True), StructField('cc_num', StringType(), True), StructField('amt', DoubleType(), True), StructField('zip', IntegerType(), True), StructField('lat', DoubleType(), True), StructField('long', DoubleType(), True), StructField('city_pop', IntegerType(), True), StructField('merch_lat', DoubleType(), True), StructField('merch_long', DoubleType(), True), StructField('age', IntegerType(), True), StructField('hour', IntegerType(), True), StructField('day', IntegerType(), True), StructField('month', IntegerType(), True), StructField('is_fraud', IntegerType(), True), StructField('cat_id', LongType(), True), StructField('gender_id', LongType(), True)])

In [None]:
user_df = df.select("first","last", "amt", "is_fraud").groupby("first","last").agg(F.sum("amt").alias("total"),F.avg("amt").alias("avg_txn_amt"),F.sum(F.lit(1)).alias("ntxn"), F.sum("is_fraud").alias("ftxn"))

In [None]:
user_df.count()

Out[78]: 973