In [0]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession \
    .builder \
        .appName("Financial Transaction") \
            .config("spark.ui.port","0") \
                .config("spark.sql.warehouse.dir",f"/user/{username}/warehouse") \
                    .enableHiveSupport() \
                        .master("yarn") \
                            .getOrCreate()

In [0]:
from pyspark.sql.types import *
transaction_schema = StructType([
    StructField("id",IntegerType()),
    StructField("date",TimestampType()),
    StructField("client_id",IntegerType()),
    StructField("card_id", IntegerType()),
    StructField("amount", StringType()),
    StructField("use_chip", StringType()),
    StructField("Merchent_id", IntegerType()),
    StructField("merchent_city",StringType()),
    StructField("merchent_state",StringType()),
    StructField("zip", DoubleType()),
    StructField("mcc", IntegerType()),
    StructField("error", StringType())
])

In [0]:
transaction_raw_df = spark.read \
    .format("csv") \
        .schema(transaction_schema) \
            .load("dbfs:/FileStore/FinancialTransaction/transaction_raw")

In [0]:
transaction_raw_df.show(truncate=False)

+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+-------+----+-----+
|id     |date               |client_id|card_id|amount |use_chip          |Merchent_id|merchent_city  |merchent_state|zip    |mcc |error|
+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+-------+----+-----+
|7475327|2010-01-01 00:01:00|1556     |2972   |$-77.00|Swipe Transaction |59935      |Beulah         |ND            |58523.0|5499|null |
|7475328|2010-01-01 00:02:00|561      |4575   |$14.57 |Swipe Transaction |67570      |Bettendorf     |IA            |52722.0|5311|null |
|7475329|2010-01-01 00:02:00|1129     |102    |$80.00 |Swipe Transaction |27092      |Vista          |CA            |92084.0|4829|null |
|7475331|2010-01-01 00:05:00|430      |2860   |$200.00|Swipe Transaction |27092      |Crown Point    |IN            |46307.0|4829|null |
|7475332|2010-01-01 00:06:00|848      |39

In [0]:
from pyspark.sql.functions import regexp_replace,col

In [0]:
transaction_amount_df = transaction_raw_df.withColumn("amount", regexp_replace(col("amount"), r"[$]" , "").cast('float')) \
    .withColumnRenamed("amount", "amount_dollar")

In [0]:
transaction_final_df = transaction_amount_df

In [0]:
transaction_final_df.show(100, truncate=False)

+-------+-------------------+---------+-------+-------------+------------------+-----------+-----------------+--------------+-------+----+-----+
|id     |date               |client_id|card_id|amount_dollar|use_chip          |Merchent_id|merchent_city    |merchent_state|zip    |mcc |error|
+-------+-------------------+---------+-------+-------------+------------------+-----------+-----------------+--------------+-------+----+-----+
|7475327|2010-01-01 00:01:00|1556     |2972   |-77.0        |Swipe Transaction |59935      |Beulah           |ND            |58523.0|5499|null |
|7475328|2010-01-01 00:02:00|561      |4575   |14.57        |Swipe Transaction |67570      |Bettendorf       |IA            |52722.0|5311|null |
|7475329|2010-01-01 00:02:00|1129     |102    |80.0         |Swipe Transaction |27092      |Vista            |CA            |92084.0|4829|null |
|7475331|2010-01-01 00:05:00|430      |2860   |200.0        |Swipe Transaction |27092      |Crown Point      |IN            |46307

In [0]:
mcc_schema = StructType([
    StructField("mcc_code",IntegerType()),
    StructField("Description", StringType())
])

In [0]:
mcc_raw_df = spark.read \
    .format("csv") \
        .schema(mcc_schema) \
        .load("dbfs:/FileStore/FinancialTransaction/mcc_raw")

In [0]:
mcc_final_df = mcc_raw_df

In [0]:
mcc_raw_df.show(truncate=False)

+--------+------------------------------------------+
|mcc_code|Description                               |
+--------+------------------------------------------+
|5812    |Eating Places and Restaurants             |
|5541    |Service Stations                          |
|7996    |Amusement Parks, Carnivals, Circuses      |
|5411    |Grocery Stores, Supermarkets              |
|4784    |Tolls and Bridge Fees                     |
|4900    |Utilities - Electric, Gas, Water, Sanitary|
|5942    |Book Stores                               |
|5814    |Fast Food Restaurants                     |
|4829    |Money Transfer                            |
|5311    |Department Stores                         |
|5211    |Lumber and Building Materials             |
|5310    |Discount Stores                           |
|3780    |Computer Network Services                 |
|5499    |Miscellaneous Food Stores                 |
|4121    |Taxicabs and Limousines                   |
|5300    |Wholesale Clubs   

In [0]:
mcc_raw_df.groupBy("Description").count().show(100,truncate=False)

+---------------------------------------------------+-----+
|Description                                        |count|
+---------------------------------------------------+-----+
|Cleaning and Maintenance Services                  |1    |
|Local and Suburban Commuter Transportation         |1    |
|Floor Covering Stores                              |1    |
|Grocery Stores, Supermarkets                       |1    |
|Books, Periodicals, Newspapers                     |1    |
|Wholesale Clubs                                    |1    |
|Podiatrists                                        |1    |
|Computer Network Services                          |1    |
|Heating, Plumbing, Air Conditioning Contractors    |1    |
|Tools, Parts, Supplies Manufacturing               |1    |
|Steel Drums and Barrels                            |1    |
|Miscellaneous Metals                               |1    |
|Cruise Lines                                       |1    |
|Leather Goods                          

In [0]:
cards_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("client_id", IntegerType(), True),
    StructField("card_brand", StringType(), True),
    StructField("card_type", StringType(), True),
    StructField("card_number", LongType(), True),
    StructField("card_expiry", StringType(), True),
    StructField("cvv", IntegerType(), True),
    StructField("has_chip", StringType(), True),
    StructField("num_cards_issued", IntegerType(), True),
    StructField("credit_limit", StringType(), True),
    StructField("acct_open_date", StringType(), True),
    StructField("year_pin_last_changed", IntegerType(), True),
    StructField("card_on_dark_web", StringType(), True)
])

In [0]:
cards_raw_df = spark.read \
    .format("csv") \
        .schema(cards_schema) \
        .load("dbfs:/FileStore/FinancialTransaction/cards_raw")

In [0]:
cards_raw_df.display()

id,client_id,card_brand,card_type,card_number,card_expiry,cvv,has_chip,num_cards_issued,credit_limit,acct_open_date,year_pin_last_changed,card_on_dark_web
4524,825,Visa,Debit,4344676511950444,12/2022,623,YES,2,$24295,09/2002,2008,No
2731,825,Visa,Debit,4956965974959986,12/2020,393,YES,2,$21968,04/2014,2014,No
3701,825,Visa,Debit,4582313478255491,02/2024,719,YES,2,$46414,07/2003,2004,No
42,825,Visa,Credit,4879494103069057,08/2024,693,NO,1,$12400,01/2003,2012,No
4659,825,Mastercard,Debit (Prepaid),5722874738736011,03/2009,75,YES,1,$28,09/2008,2009,No
4537,1746,Visa,Credit,4404898874682993,09/2003,736,YES,1,$27500,09/2003,2012,No
1278,1746,Visa,Debit,4001482973848631,07/2022,972,YES,2,$28508,02/2011,2011,No
3687,1746,Mastercard,Debit,5627220683410948,06/2022,48,YES,2,$9022,07/2003,2015,No
3465,1746,Mastercard,Debit (Prepaid),5711382187309326,11/2020,722,YES,2,$54,06/2010,2015,No
3754,1746,Mastercard,Debit (Prepaid),5766121508358701,02/2023,908,YES,1,$99,07/2006,2012,No


In [0]:
from pyspark.sql.functions import *

In [0]:
cards_date_df = cards_raw_df.withColumn("card_expiry", to_date(col("card_expiry"), "MM/yyyy").cast('date')) \
    .withColumn("acct_open_date", to_date(col("acct_open_date"), "MM/yyyy")) \
    .withColumn("acct_open_month", month(col("acct_open_date")).cast('int')) \
    .withColumn("acct_open_year", year(col("acct_open_date")).cast('int')) \
    .withColumn("credit_limit", regexp_replace(col("credit_limit"), r"[$]", "").cast('int')) \
    .withColumnRenamed("credit_limit", "credit_limit_dollar")

In [0]:
cards_date_df.display()

id,client_id,card_brand,card_type,card_number,card_expiry,cvv,has_chip,num_cards_issued,credit_limit_dollar,acct_open_date,year_pin_last_changed,card_on_dark_web,acct_open_month,acct_open_year
4524,825,Visa,Debit,4344676511950444,2022-12-01,623,YES,2,24295,2002-09-01,2008,No,9,2002
2731,825,Visa,Debit,4956965974959986,2020-12-01,393,YES,2,21968,2014-04-01,2014,No,4,2014
3701,825,Visa,Debit,4582313478255491,2024-02-01,719,YES,2,46414,2003-07-01,2004,No,7,2003
42,825,Visa,Credit,4879494103069057,2024-08-01,693,NO,1,12400,2003-01-01,2012,No,1,2003
4659,825,Mastercard,Debit (Prepaid),5722874738736011,2009-03-01,75,YES,1,28,2008-09-01,2009,No,9,2008
4537,1746,Visa,Credit,4404898874682993,2003-09-01,736,YES,1,27500,2003-09-01,2012,No,9,2003
1278,1746,Visa,Debit,4001482973848631,2022-07-01,972,YES,2,28508,2011-02-01,2011,No,2,2011
3687,1746,Mastercard,Debit,5627220683410948,2022-06-01,48,YES,2,9022,2003-07-01,2015,No,7,2003
3465,1746,Mastercard,Debit (Prepaid),5711382187309326,2020-11-01,722,YES,2,54,2010-06-01,2015,No,6,2010
3754,1746,Mastercard,Debit (Prepaid),5766121508358701,2023-02-01,908,YES,1,99,2006-07-01,2012,No,7,2006


In [0]:
cards_final_df = cards_date_df.drop("acct_open_date")

In [0]:
train_schema = StructType([
    StructField("trasaction_id", LongType()),
    StructField("is_fraud_label", StringType())
])

In [0]:
train_final_df = spark.read \
    .format("csv") \
        .schema(train_schema) \
        .load("dbfs:/FileStore/FinancialTransaction/train_raw")

In [0]:
train_final_df.show()

+-------------+--------------+
|trasaction_id|is_fraud_label|
+-------------+--------------+
|     10649266|            No|
|     23410063|            No|
|      9316588|            No|
|     12478022|            No|
|      9558530|            No|
|     12532830|            No|
|     19526714|            No|
|      9906964|            No|
|     13224888|            No|
|     13749094|            No|
|     12303776|            No|
|     19480376|            No|
|     11716050|            No|
|     20025400|            No|
|      7661688|            No|
|     16662807|            No|
|     21419778|            No|
|     18011186|            No|
|     23289598|            No|
|     11644547|            No|
+-------------+--------------+
only showing top 20 rows



In [0]:
user_df_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("current_age", IntegerType(), True),
    StructField("retirement_age", IntegerType(), True),
    StructField("birth_year", IntegerType(), True),
    StructField("birth_month", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("address", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("per_capita_income", StringType(), True),
    StructField("yearly_income", StringType(), True),
    StructField("total_debt", StringType(), True),
    StructField("credit_score", IntegerType(), True),
    StructField("num_credit_cards", IntegerType(), True)
])

In [0]:
user_df = spark.read \
    .format("csv") \
        .schema(user_df_schema) \
            .load("dbfs:/FileStore/FinancialTransaction/user_raw")

In [0]:
user_df.display()

id,current_age,retirement_age,birth_year,birth_month,gender,address,latitude,longitude,per_capita_income,yearly_income,total_debt,credit_score,num_credit_cards
825,53,66,1966,11,Female,462 Rose Lane,34.15,-117.76,$29278,$59696,$127613,787,5
1746,53,68,1966,12,Female,3606 Federal Boulevard,40.76,-73.74,$37891,$77254,$191349,701,5
1718,81,67,1938,11,Female,766 Third Drive,34.02,-117.89,$22681,$33483,$196,698,5
708,63,63,1957,1,Female,3 Madison Street,40.71,-73.99,$163145,$249925,$202328,722,4
1164,43,70,1976,9,Male,9620 Valley Stream Drive,37.76,-122.44,$53797,$109687,$183855,675,1
68,42,70,1977,10,Male,58 Birch Lane,41.55,-90.6,$20599,$41997,$0,704,3
1075,36,67,1983,12,Female,5695 Fifth Street,38.22,-85.74,$25258,$51500,$102286,672,3
1711,26,67,1993,12,Male,1941 Ninth Street,45.51,-122.64,$26790,$54623,$114711,728,1
1116,81,66,1938,7,Female,11 Spruce Avenue,40.32,-75.32,$26273,$42509,$2895,755,5
1752,34,60,1986,1,Female,887 Grant Street,29.97,-92.12,$18730,$38190,$81262,810,1


In [0]:
user_final_df = user_df.withColumn("per_capita_income", regexp_replace(col("per_capita_income"), r"[$]", "").cast('int')) \
    .withColumn("total_debt", regexp_replace(col("total_debt"), r"[$]", "").cast('int')) \
    .withColumn("yearly_income", regexp_replace(col("yearly_income"), r"[$]", "").cast('int')) \
    .withColumnRenamed("per_capita_income", "per_capita_income_dol") \
    .withColumnRenamed("total_debt", "total_debt_dol") \
    .withColumnRenamed("yearly_income", "yearly_income_dol")

In [0]:
user_final_df.display()

id,current_age,retirement_age,birth_year,birth_month,gender,address,latitude,longitude,per_capita_income_dol,yearly_income_dol,total_debt_dol,credit_score,num_credit_cards
825,53,66,1966,11,Female,462 Rose Lane,34.15,-117.76,29278,59696,127613,787,5
1746,53,68,1966,12,Female,3606 Federal Boulevard,40.76,-73.74,37891,77254,191349,701,5
1718,81,67,1938,11,Female,766 Third Drive,34.02,-117.89,22681,33483,196,698,5
708,63,63,1957,1,Female,3 Madison Street,40.71,-73.99,163145,249925,202328,722,4
1164,43,70,1976,9,Male,9620 Valley Stream Drive,37.76,-122.44,53797,109687,183855,675,1
68,42,70,1977,10,Male,58 Birch Lane,41.55,-90.6,20599,41997,0,704,3
1075,36,67,1983,12,Female,5695 Fifth Street,38.22,-85.74,25258,51500,102286,672,3
1711,26,67,1993,12,Male,1941 Ninth Street,45.51,-122.64,26790,54623,114711,728,1
1116,81,66,1938,7,Female,11 Spruce Avenue,40.32,-75.32,26273,42509,2895,755,5
1752,34,60,1986,1,Female,887 Grant Street,29.97,-92.12,18730,38190,81262,810,1


Fraud Trasactions

In [0]:
transaction_final_df.createOrReplaceTempView("trasactions")

In [0]:
spark.sql("select count(*) from trasactions").show()

+--------+
|count(1)|
+--------+
|13305915|
+--------+



In [0]:
mcc_final_df.createOrReplaceTempView("mcc")

In [0]:
spark.sql("select count(*) from mcc").show()

+--------+
|count(1)|
+--------+
|     109|
+--------+



In [0]:
train_final_df.createOrReplaceTempView("trainlabels")

In [0]:
user_final_df.createOrReplaceTempView("users")

In [0]:
cards_final_df.createOrReplaceTempView("cards")

In [0]:
spark.sql("""SELECT count(*) FROM trasactions a
          JOIN mcc b on a.mcc == b.mcc_code """).show()

+--------+
|count(1)|
+--------+
|13305915|
+--------+



In [0]:
trans_df = spark.sql("""SELECT a.id as transaction_id,d.is_fraud_label,a.merchent_id,a.client_id as user_id, a.amount_dollar, a.use_chip,b.description,c.address as user_address from trasactions a
          JOIN mcc b on a.mcc == b.mcc_code
          JOIN users c on a.client_id == c.id
          JOIN trainlabels d on a.id == d.trasaction_id
          """)

In [0]:
Fraud_transaction_df = trans_df.withColumn("is_trasaction_fraud", when(col("is_fraud_label") == 'Yes', "Fraud Transaction").otherwise("Trasaction Done"))

In [0]:
Fraud_transaction_df.show()

+--------------+--------------+-----------+-------+-------------+------------------+--------------------+--------------------+-------------------+
|transaction_id|is_fraud_label|merchent_id|user_id|amount_dollar|          use_chip|         description|        user_address|is_trasaction_fraud|
+--------------+--------------+-----------+-------+-------------+------------------+--------------------+--------------------+-------------------+
|       7475341|            No|      33326|   1797|        43.33| Swipe Transaction|Taxicabs and Limo...|391 Martin Luther...|    Trasaction Done|
|       7475347|            No|      61195|    114|        -64.0| Swipe Transaction|    Service Stations|      112 Elm Avenue|    Trasaction Done|
|       7475363|            No|      81536|    285|          7.8| Swipe Transaction|     Discount Stores|       702 Elm Drive|    Trasaction Done|
|       7475378|            No|      29232|   1575|        17.14| Swipe Transaction|Taxicabs and Limo...|   23 12th Bo

In [0]:
Fraud_transaction_df.repartition(1) \
    .write \
        .format("csv") \
        .mode("overwrite") \
            .option("path", "dbfs:/FileStore/FinancialTransaction/transactionsdoneAndFraudCSV") \
                .save()

Creating external table on top of trsactions

In [0]:
spark.sql("CREATE DATABASE Financialtrasaction")

Out[127]: DataFrame[]

In [0]:
spark.sql("CREATE TABLE Financialtrasaction.trasactions (transaction_id INT,is_fraud_label VARCHAR(50),merchent_id INT,user_id INT,amount_dollar FLOAT,use_chip VARCHAR(50),description VARCHAR(100),user_address VARCHAR(150),is_trasaction_fraud VARCHAR(50)) USING CSV LOCATION 'dbfs:/FileStore/FinancialTransaction/transactionsdoneAndFraudCSV'")

Out[146]: DataFrame[]

In [0]:
spark.sql("DROP TABLE Financialtrasaction.trasactions")

Out[145]: DataFrame[]

In [0]:
spark.sql("SELECT * FROM Financialtrasaction.trasactions").show()

+--------------+--------------+-----------+-------+-------------+------------------+--------------------+--------------------+-------------------+
|transaction_id|is_fraud_label|merchent_id|user_id|amount_dollar|          use_chip|         description|        user_address|is_trasaction_fraud|
+--------------+--------------+-----------+-------+-------------+------------------+--------------------+--------------------+-------------------+
|       7475341|            No|      33326|   1797|        43.33| Swipe Transaction|Taxicabs and Limo...|391 Martin Luther...|    Trasaction Done|
|       7475347|            No|      61195|    114|        -64.0| Swipe Transaction|    Service Stations|      112 Elm Avenue|    Trasaction Done|
|       7475363|            No|      81536|    285|          7.8| Swipe Transaction|     Discount Stores|       702 Elm Drive|    Trasaction Done|
|       7475378|            No|      29232|   1575|        17.14| Swipe Transaction|Taxicabs and Limo...|   23 12th Bo

In [0]:
spark.sql("DESCRIBE EXTENDED Financialtrasaction.trasactions").show(50,truncate=False)

+----------------------------+----------------------------------------------------------------+-------+
|col_name                    |data_type                                                       |comment|
+----------------------------+----------------------------------------------------------------+-------+
|transaction_id              |int                                                             |null   |
|is_fraud_label              |varchar(50)                                                     |null   |
|merchent_id                 |int                                                             |null   |
|user_id                     |int                                                             |null   |
|amount_dollar               |float                                                           |null   |
|use_chip                    |varchar(50)                                                     |null   |
|description                 |varchar(100)                      

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-1869086002673465>:1[0m
[0;32m----> 1[0m host_name [38;5;241m=[39m dbutils[38;5;241m.[39mnotebook[38;5;241m.[39mentry_point[38;5;241m.[39mgetDbutils()[38;5;241m.[39mnotebook()[38;5;241m.[39mgetContext()[38;5;241m.[39mtags()[38;5;241m.[39mapply([38;5;124m'[39m[38;5;124mVikas[39m[38;5;124m'[39m)
[1;32m      2[0m [38;5;28mprint[39m(host_name)

File [0;32m/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321[0m, in [0;36mJavaMember.__call__[0;34m(self, *args)[0m
[1;32m   1315[0m command [38;5;241m=[39m proto[38;5;241m.[39mCALL_COMMAND_NAME [38;5;241m+[39m\
[1;32m   1316[0m     [38;5;28mself[39m[38;5;241m.[39mcommand_header [38;5;241m+[39m\
[1;32m   1317[0m     args_command [38;5;241m+[39m\
[1;32m   1318[0m     proto[38;5

Loan Risk