In [2]:

!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark


import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

import findspark
findspark.init()


In [3]:
from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder.appName("BigDataFraudDetection").getOrCreate()
spark


In [4]:
from google.colab import files
uploaded = files.upload()

Saving bankdataset.xlsx.zip to bankdataset.xlsx.zip


In [5]:
!unzip -o bankdataset.xlsx.zip


Archive:  bankdataset.xlsx.zip
  inflating: bankdataset.xlsx        


In [6]:
!pip install openpyxl




In [7]:
import pandas as pd

# Read Excel file (already uploaded and extracted)
excel_data = pd.read_excel("bankdataset.xlsx", engine="openpyxl")

# Save to CSV again
excel_data.to_csv("bankdataset.csv", index=False)


In [8]:
import os

# Check if the file is there
os.listdir()


['.config',
 'bankdataset.csv',
 'spark-3.4.1-bin-hadoop3.tgz',
 'bankdataset.xlsx.zip',
 'bankdataset.xlsx',
 'spark-3.4.1-bin-hadoop3',
 'sample_data']

In [9]:
df = spark.read.csv("bankdataset.csv", header=True, inferSchema=True)
df.show(5)


+----------+-------------+--------+------+-----------------+
|      Date|       Domain|Location| Value|Transaction_count|
+----------+-------------+--------+------+-----------------+
|2022-01-01|    RESTRAUNT|    Bhuj|365554|             1932|
|2022-01-01|  INVESTMENTS|Ludhiana|847444|             1721|
|2022-01-01|       RETAIL|     Goa|786941|             1573|
|2022-01-01|INTERNATIONAL| Mathura|368610|             2049|
|2022-01-01|    RESTRAUNT| Madurai|615681|             1519|
+----------+-------------+--------+------+-----------------+
only showing top 5 rows



In [10]:
df.printSchema()


root
 |-- Date: date (nullable = true)
 |-- Domain: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Value: integer (nullable = true)
 |-- Transaction_count: integer (nullable = true)



In [11]:
df.show(5, truncate=False)


+----------+-------------+--------+------+-----------------+
|Date      |Domain       |Location|Value |Transaction_count|
+----------+-------------+--------+------+-----------------+
|2022-01-01|RESTRAUNT    |Bhuj    |365554|1932             |
|2022-01-01|INVESTMENTS  |Ludhiana|847444|1721             |
|2022-01-01|RETAIL       |Goa     |786941|1573             |
|2022-01-01|INTERNATIONAL|Mathura |368610|2049             |
|2022-01-01|RESTRAUNT    |Madurai |615681|1519             |
+----------+-------------+--------+------+-----------------+
only showing top 5 rows



In [12]:
from pyspark.sql import functions as F


In [13]:
# Check if there are any missing/null values
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+----+------+--------+-----+-----------------+
|Date|Domain|Location|Value|Transaction_count|
+----+------+--------+-----+-----------------+
|   0|     0|       0|    0|                0|
+----+------+--------+-----+-----------------+



In [14]:
# Create a new column: average value per transaction
df = df.withColumn("Average_Per_Transaction", F.col("Value") / F.col("Transaction_count"))

# Show the updated table
df.select("Date", "Domain", "Location", "Value", "Transaction_count", "Average_Per_Transaction").show(5, truncate=False)


+----------+-------------+--------+------+-----------------+-----------------------+
|Date      |Domain       |Location|Value |Transaction_count|Average_Per_Transaction|
+----------+-------------+--------+------+-----------------+-----------------------+
|2022-01-01|RESTRAUNT    |Bhuj    |365554|1932             |189.21014492753622     |
|2022-01-01|INVESTMENTS  |Ludhiana|847444|1721             |492.4137129575828      |
|2022-01-01|RETAIL       |Goa     |786941|1573             |500.2803560076287      |
|2022-01-01|INTERNATIONAL|Mathura |368610|2049             |179.89751098096633     |
|2022-01-01|RESTRAUNT    |Madurai |615681|1519             |405.31994733377223     |
+----------+-------------+--------+------+-----------------+-----------------------+
only showing top 5 rows



In [15]:
# Extract day of the week from the Date column
df = df.withColumn("Day_Of_Week", F.date_format("Date", "EEEE"))

# Show updated table with new column
df.select("Date", "Day_Of_Week", "Domain", "Location", "Value").show(10, truncate=False)


+----------+-----------+-------------+---------+-------+
|Date      |Day_Of_Week|Domain       |Location |Value  |
+----------+-----------+-------------+---------+-------+
|2022-01-01|Saturday   |RESTRAUNT    |Bhuj     |365554 |
|2022-01-01|Saturday   |INVESTMENTS  |Ludhiana |847444 |
|2022-01-01|Saturday   |RETAIL       |Goa      |786941 |
|2022-01-01|Saturday   |INTERNATIONAL|Mathura  |368610 |
|2022-01-01|Saturday   |RESTRAUNT    |Madurai  |615681 |
|2022-01-01|Saturday   |INTERNATIONAL|Daman    |1191092|
|2022-01-01|Saturday   |INTERNATIONAL|Buxar    |968883 |
|2022-01-01|Saturday   |PUBLIC       |Trichy   |1030297|
|2022-01-01|Saturday   |RESTRAUNT    |Kullu    |688655 |
|2022-01-01|Saturday   |MEDICAL      |Hyderabad|1174302|
+----------+-----------+-------------+---------+-------+
only showing top 10 rows



In [16]:
# Save your Spark DataFrame to CSV
df.write.option("header", True).csv("final_transaction_data")


In [17]:
# Zip all parts into one file
!zip -r final_transaction_data.zip final_transaction_data


  adding: final_transaction_data/ (stored 0%)
  adding: final_transaction_data/_SUCCESS (stored 0%)
  adding: final_transaction_data/part-00001-f88acb75-1159-4d57-a829-e882f07801af-c000.csv (deflated 70%)
  adding: final_transaction_data/part-00000-f88acb75-1159-4d57-a829-e882f07801af-c000.csv (deflated 70%)
  adding: final_transaction_data/.part-00001-f88acb75-1159-4d57-a829-e882f07801af-c000.csv.crc (deflated 0%)
  adding: final_transaction_data/._SUCCESS.crc (stored 0%)
  adding: final_transaction_data/.part-00000-f88acb75-1159-4d57-a829-e882f07801af-c000.csv.crc (deflated 0%)


In [18]:
from google.colab import files
files.download("final_transaction_data.zip")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [19]:
import pandas as pd

# Load the final CSV file from Spark output
df = pd.read_csv("bankdataset.csv")

# Create 'Average_Per_Transaction' column if not present
df["Average_Per_Transaction"] = df["Value"] / df["Transaction_count"]

# Simulate Risk Flag (1 = risky, 0 = safe)
df["Risk_Flag"] = df["Average_Per_Transaction"].apply(lambda x: 1 if x > 500 else 0)

df.head()


Unnamed: 0,Date,Domain,Location,Value,Transaction_count,Average_Per_Transaction,Risk_Flag
0,2022-01-01,RESTRAUNT,Bhuj,365554,1932,189.210145,0
1,2022-01-01,INVESTMENTS,Ludhiana,847444,1721,492.413713,0
2,2022-01-01,RETAIL,Goa,786941,1573,500.280356,1
3,2022-01-01,INTERNATIONAL,Mathura,368610,2049,179.897511,0
4,2022-01-01,RESTRAUNT,Madurai,615681,1519,405.319947,0


In [20]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline


In [21]:
# Features we want to use
features = ["Domain", "Location", "Value", "Transaction_count"]

# Target column: Risk_Flag (1 = risky, 0 = normal)
X = df[features]
y = df["Risk_Flag"]


In [22]:
# Split into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


In [23]:
# Encode categorical columns using OneHotEncoder
categorical_features = ["Domain", "Location"]
numerical_features = ["Value", "Transaction_count"]

preprocessor = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown='ignore'), categorical_features),
    ],
    remainder='passthrough'  # Keep numerical columns as-is
)

# Create the pipeline: preprocessing + model
pipeline = Pipeline(steps=[
    ("preprocess", preprocessor),
    ("model", LogisticRegression())
])


In [24]:
pipeline.fit(X_train, y_train)


The format of the columns of the 'remainder' transformer in ColumnTransformer.transformers_ will change in version 1.7 to match the format of the other transformers.
At the moment the remainder columns are stored as indices (of type int). With the same ColumnTransformer configuration, in the future they will be stored as column names (of type str).



In [25]:
# Predict on the full dataset (all 1 million rows)
y_pred_all = pipeline.predict(X)

# Add prediction back to the original DataFrame
df["Predicted_Risk"] = y_pred_all

# Save full dataset with predictions
df.to_csv("full_predictions.csv", index=False)

# Download
from google.colab import files
files.download("full_predictions.csv")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>