In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark import SparkContext
from pyspark.conf import SparkConf

In [4]:
conf = SparkConf()

    # Download mongo-spark-connector and its dependencies.
conf.set("spark.jars.packages",
             "org.mongodb.spark:mongo-spark-connector:10.0.1")
mongo_conn = f"mongodb://localhost:27017/fraud_detection.my_collection"
    # Set up read connection :
conf.set("spark.mongodb.read.connection.uri", mongo_conn)
conf.set("spark.mongodb.write.database", "fraud_detecion")
conf.set("spark.mongodb.write.collection", "my_collection")

    # Set up write connection
conf.set("spark.mongodb.read.connection.uri", mongo_conn)
conf.set("spark.mongodb.write.database", "fraud_detecion")
conf.set("spark.mongodb.write.collection", "my_collection")
    # If you need to update instead of inserting :
conf.set("spark.mongodb.write.operationType", "update")
SparkContext(conf=conf)

spark= SparkSession \
        .builder \
        .appName('myApp') \
        .getOrCreate()

:: loading settings :: url = jar:file:/opt/apache-spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/fernblade/.ivy2/cache
The jars for the packages stored in: /home/fernblade/.ivy2/jars
org.mongodb.spark#mongo-spark-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9bf92a5c-de3a-416e-8a32-9995ce34c9bd;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector;10.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.5.1 in central
	[4.5.1] org.mongodb#mongodb-driver-sync;[4.5.0,4.5.99)
	found org.mongodb#bson;4.5.1 in central
	found org.mongodb#mongodb-driver-core;4.5.1 in central
:: resolution report :: resolve 1697ms :: artifacts dl 8ms
	:: modules in use:
	org.mongodb#bson;4.5.1 from central in [default]
	org.mongodb#mongodb-driver-core;4.5.1 from central in [default]
	org.mongodb#mongodb-driver-sync;4.5.1 from central in [default]
	org.mongodb.spark#mongo-spark-connector;10.0.1 from central in [default]
	---------------------------------------------------------------------
	|               

23/04/26 16:16:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
df = spark.read.format("mongodb").load()
# df.repartition(100)

In [6]:
df.rdd.getNumPartitions()

16

In [5]:
# spark = SparkSession.builder.appName("FraudDetection").getOrCreate()



df1 = spark.read.csv("./HI-Small_Trans.csv", header=True, inferSchema=True)
df2 = spark.read.csv("./LI-Small_Trans.csv", header=True, inferSchema=True)

# # Perform join operation
df = df1.union(df2)
# # Write data to Cassandra




                                                                                

In [None]:
df = df.repartition(1)

In [7]:
df.count()

                                                                                

12002394

In [8]:
df.head()

Row(Timestamp='2022/09/01 00:20', From Bank=10, From_Account='8000EBD30', To Bank=10, To_Account='8000EBD30', Amount Received=3697.34, Receiving Currency='US Dollar', Amount Paid=3697.34, Payment Currency='US Dollar', Payment Format='Reinvestment', Is Laundering=0)

In [9]:
# Drop rows with missing values
df = df.na.drop()

# Fill missing values with a default value
df = df.na.fill({"Amount Received": 0, "Amount Paid": 0})

# Replace missing values with a specified value
df = df.na.replace(["Reinvestment"], ["Unknown"], "Payment Format")


In [10]:
from pyspark.sql.functions import to_timestamp

df = df.withColumn("Timestamp", to_timestamp(col("Timestamp"), "yyyy/MM/dd HH:mm"))


In [11]:
df.head(5)

[Row(Timestamp=datetime.datetime(2022, 9, 1, 0, 20), From Bank=10, From_Account='8000EBD30', To Bank=10, To_Account='8000EBD30', Amount Received=3697.34, Receiving Currency='US Dollar', Amount Paid=3697.34, Payment Currency='US Dollar', Payment Format='Unknown', Is Laundering=0),
 Row(Timestamp=datetime.datetime(2022, 9, 1, 0, 20), From Bank=3208, From_Account='8000F4580', To Bank=1, To_Account='8000F5340', Amount Received=0.01, Receiving Currency='US Dollar', Amount Paid=0.01, Payment Currency='US Dollar', Payment Format='Cheque', Is Laundering=0),
 Row(Timestamp=datetime.datetime(2022, 9, 1, 0, 0), From Bank=3209, From_Account='8000F4670', To Bank=3209, To_Account='8000F4670', Amount Received=14675.57, Receiving Currency='US Dollar', Amount Paid=14675.57, Payment Currency='US Dollar', Payment Format='Unknown', Is Laundering=0),
 Row(Timestamp=datetime.datetime(2022, 9, 1, 0, 2), From Bank=12, From_Account='8000F5030', To Bank=12, To_Account='8000F5030', Amount Received=2806.97, Recei

In [12]:
from pyspark.sql.types import DoubleType

# Convert Amount Received and Amount Paid columns to DoubleType
df = df.withColumn("Amount Received", col("Amount Received").cast(DoubleType()))
df = df.withColumn("Amount Paid", col("Amount Paid").cast(DoubleType()))


In [13]:
from pyspark.sql.functions import hour, dayofmonth, month, year
from pyspark.sql.types import IntegerType

# Convert Timestamp column to TimestampType
df = df.withColumn("Timestamp", df["Timestamp"].cast("timestamp"))

# Extract Hour, Day, Month, and Year from Timestamp
df = df.withColumn("Hour", hour(df["Timestamp"]).cast(IntegerType()))
df = df.withColumn("Day", dayofmonth(df["Timestamp"]).cast(IntegerType()))
df = df.withColumn("Month", month(df["Timestamp"]).cast(IntegerType()))
df = df.withColumn("Year", year(df["Timestamp"]).cast(IntegerType()))


In [14]:
from pyspark.ml.feature import StringIndexer

# Create StringIndexer for Payment Format
payment_format_indexer = StringIndexer(inputCol="Payment Format", outputCol="Payment Format Index")

# Fit and transform the DataFrame
df = payment_format_indexer.fit(df).transform(df)

# Drop the original Payment Format column
df = df.drop("Payment Format")


                                                                                

In [15]:
receiving_currency_indexer = StringIndexer(inputCol="Receiving Currency", outputCol="Receiving Currency Index")

# Create StringIndexer for Payment Currency
payment_currency_indexer = StringIndexer(inputCol="Payment Currency", outputCol="Payment Currency Index")

# Fit and transform the DataFrame
df = receiving_currency_indexer.fit(df).transform(df)
df = payment_currency_indexer.fit(df).transform(df)

# Drop the original Receiving Currency and Payment Currency columns
df = df.drop("Receiving Currency", "Payment Currency")

                                                                                

In [16]:
df.head(5)

[Row(Timestamp=datetime.datetime(2022, 9, 1, 0, 20), From Bank=10, From_Account='8000EBD30', To Bank=10, To_Account='8000EBD30', Amount Received=3697.34, Amount Paid=3697.34, Is Laundering=0, Hour=0, Day=1, Month=9, Year=2022, Payment Format Index=4.0, Receiving Currency Index=0.0, Payment Currency Index=0.0),
 Row(Timestamp=datetime.datetime(2022, 9, 1, 0, 20), From Bank=3208, From_Account='8000F4580', To Bank=1, To_Account='8000F5340', Amount Received=0.01, Amount Paid=0.01, Is Laundering=0, Hour=0, Day=1, Month=9, Year=2022, Payment Format Index=0.0, Receiving Currency Index=0.0, Payment Currency Index=0.0),
 Row(Timestamp=datetime.datetime(2022, 9, 1, 0, 0), From Bank=3209, From_Account='8000F4670', To Bank=3209, To_Account='8000F4670', Amount Received=14675.57, Amount Paid=14675.57, Is Laundering=0, Hour=0, Day=1, Month=9, Year=2022, Payment Format Index=4.0, Receiving Currency Index=0.0, Payment Currency Index=0.0),
 Row(Timestamp=datetime.datetime(2022, 9, 1, 0, 2), From Bank=12

In [17]:
df=df.drop("TimeStamp")

In [18]:
df=df.drop("From_Account","To_Account")

In [19]:
print(df)

DataFrame[From Bank: int, To Bank: int, Amount Received: double, Amount Paid: double, Is Laundering: int, Hour: int, Day: int, Month: int, Year: int, Payment Format Index: double, Receiving Currency Index: double, Payment Currency Index: double]


In [20]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Extract X_train, y_train, X_test, y_test from the DataFrames
X_train = train_data.drop("Is Laundering")  # Drop the 'Is Laundering' column to get features
y_train = train_data.select("Is Laundering")  # Select only the 'Is Laundering' column as the label
X_test = test_data.drop("Is Laundering")
y_test = test_data.select("Is Laundering")


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Assemble features into a vector column
assembler = VectorAssembler(inputCols=X_train.columns, outputCol="features")

# Create a Decision Tree Classifier
dt = DecisionTreeClassifier(featuresCol="features", labelCol="Is Laundering")

# Create a pipeline with the assembler and the Decision Tree classifier
pipeline = Pipeline(stages=[assembler, dt])

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluate the model's performance



[Stage 24:>                                                        (0 + 8) / 16]

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import IntegerType
evaluator = MulticlassClassificationEvaluator(labelCol="Is Laundering", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy: {:.4f}".format(accuracy))



