In [None]:
import pandas as pd
from faker import Faker
import random

fake = Faker()

# One generator per output column. Dict insertion order is the CSV column
# order, and also the order in which Faker is called for every record.
field_generators = {
    'cc_num': lambda: fake.credit_card_number(card_type=None),
    'first': lambda: fake.first_name(),
    'last': lambda: fake.last_name(),
    'gender': lambda: fake.random_element(elements=('M', 'F')),
    'street': lambda: fake.street_address(),
    'city': lambda: fake.city(),
    'state': lambda: fake.state(),
    'zip': lambda: fake.zipcode(),
    'lat': lambda: fake.latitude(),
    'long': lambda: fake.longitude(),
    'job': lambda: fake.job(),
    # Date of birth for an adult between 18 and 90, stored as an ISO date string.
    'dob': lambda: fake.date_of_birth(minimum_age=18, maximum_age=90).strftime('%Y-%m-%d'),
}

# Build 100 synthetic customer records, one dict per customer.
customers = [
    {field: generate() for field, generate in field_generators.items()}
    for _ in range(100)
]

# Persist the customers so the later cells can load them from disk.
df_customers = pd.DataFrame(customers)
df_customers.to_csv('customer.csv', index=False)

# Show the first rows as a quick sanity check of the generated data.
print(df_customers.head())


In [None]:
pip install pandas


In [None]:
import pandas as pd
import random
from faker import Faker
from datetime import datetime
import time

fake = Faker()

# Number of synthetic transactions to generate.
NUM_TRANSACTIONS = 10000

# Load customer data
df_customers = pd.read_csv('customer.csv')

# Define some categories for transactions
categories = ['food', 'entertainment', 'groceries', 'gas_transport', 'clothing', 'health', 'misc_net', 'misc_pos']

# Pick the customer of every transaction in a single call. Sampling with
# replacement is equivalent to drawing one random customer per transaction,
# but avoids calling DataFrame.sample() once per generated row.
sampled_customers = (
    df_customers[['cc_num', 'first', 'last']]
    .sample(n=NUM_TRANSACTIONS, replace=True)
    .to_dict('records')
)

# Generate transaction data based on customers
transaction_data = []
for customer in sampled_customers:
    # One random timestamp per transaction; the date, time and epoch fields
    # below are all derived from it so they stay consistent with each other.
    transaction_date = fake.date_time_this_year()
    transaction_data.append({
        'cc_num': customer['cc_num'],
        'first': customer['first'],
        'last': customer['last'],
        'trans_num': fake.uuid4(),
        'trans_date': transaction_date.strftime('%Y-%m-%d'),
        'trans_time': transaction_date.strftime('%H:%M:%S'),
        # time.mktime interprets the naive timestamp in the local time zone.
        'unix_time': int(time.mktime(transaction_date.timetuple())),
        'category': random.choice(categories),
        'amt': round(random.uniform(1, 1000), 2),
        'merchant': fake.company(),
        'merch_lat': fake.latitude(),
        'merch_long': fake.longitude(),
        # The label is a coin flip: it is not correlated with any feature.
        'is_fraud': random.choice([0, 1])
    })

df_transactions = pd.DataFrame(transaction_data)
df_transactions.to_csv('transaction_training.csv', index=False)

# Display first few rows to ensure transactions are generated correctly
print(df_transactions.head())


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, DoubleType
from math import sqrt
import datetime

import os

# MongoDB Atlas connection string (including the database name), read from the
# MONGODB_ATLAS_URI environment variable so that no credentials are stored in
# the notebook.
# NOTE: the password that used to be hard-coded here has been exposed and
# should be rotated in Atlas.
atlas_connection_string = os.environ.get("MONGODB_ATLAS_URI")
if not atlas_connection_string:
    raise RuntimeError(
        "Set the MONGODB_ATLAS_URI environment variable to the MongoDB Atlas "
        "connection string before running this cell."
    )

# Configure the connector defaults. The target collection is passed through
# the connector's own "collection" settings: appending "&collection=..." to
# the URI only adds an unknown option to the MongoDB connection string.
spark = SparkSession.builder \
    .appName("FraudDetection") \
    .config("spark.mongodb.input.uri", atlas_connection_string) \
    .config("spark.mongodb.input.collection", "customers") \
    .config("spark.mongodb.output.uri", atlas_connection_string) \
    .config("spark.mongodb.output.collection", "customers") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

try:
    csv_file_path = "customer.csv"
    df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

    # Select the first 10 customers
    first_10_customers = df.limit(10)

    # Write the first 10 customers to MongoDB (appends to rtfs.customers)
    first_10_customers.write.format("mongo").mode("append").option("database", "rtfs").option("collection", "customers").save()

    # Print the message
    print("First 10 customers posted into DB")
finally:
    # Always release the Spark session, even when the read or write fails.
    spark.stop()



# # Define UDFs to calculate age and distance
# def calculate_age(dob):
#     return datetime.datetime.now().year - datetime.datetime.strptime(dob, '%Y-%m-%d').year

# def calculate_distance(lat1, lon1, lat2, lon2):
#     return sqrt((lat1 - lat2)**2 + (lon1 - lon2)**2)

# age_udf = udf(calculate_age, IntegerType())
# distance_udf = udf(calculate_distance, DoubleType())

# # Read the CSV files into Spark DataFrames
# df_customers = spark.read.csv("customer.csv", header=True, inferSchema=True)
# df_transactions = spark.read.csv("transaction_training.csv", header=True, inferSchema=True)
# # Calculate age
# df_customers = df_customers.withColumn('age', age_udf(col('dob')))

# # Join transaction data with customer data to get customer latitude and longitude
# data = df_transactions.join(df_customers, df_transactions.cc_num == df_customers.cc_num)

# # Calculate distance using customer and merchant locations
# data = data.withColumn('distance', distance_udf(
#     col('lat'), col('long'), col('merch_lat'), col('merch_long')
# ))

# # Write data to MongoDB
# try:
#     data.write.format("mongo").mode("append").save()
#     print("Data has been posted in MongoDB")
# except Exception as e:
#     print(f"An error occurred: {e}")


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, year, lit
from pyspark.sql.types import IntegerType, DoubleType
import datetime
import math

import os

# MongoDB Atlas connection string, read from the MONGODB_ATLAS_URI environment
# variable so that no credentials are stored in the notebook.
# NOTE: the password that used to be hard-coded here has been exposed and
# should be rotated in Atlas.
atlas_connection_string = os.environ.get("MONGODB_ATLAS_URI")
if not atlas_connection_string:
    raise RuntimeError(
        "Set the MONGODB_ATLAS_URI environment variable to the MongoDB Atlas "
        "connection string before running this cell."
    )

# UDF to calculate age based on birth year
def calculate_age(birth_year):
    """Return the difference between the current calendar year and birth_year.

    Only the year is considered, so the result can be one higher than the
    true age when the birthday has not yet occurred this year.

    Args:
        birth_year: Year of birth as an integer, or None when it is unknown.

    Returns:
        The current year minus birth_year, or None when birth_year is None
        (so a missing value yields a null instead of raising inside a UDF).
    """
    if birth_year is None:
        return None
    return datetime.date.today().year - birth_year

# Wrap calculate_age as a Spark UDF whose declared return type is IntegerType.
calculate_age_udf = udf(calculate_age, IntegerType())

# UDF to calculate distance using the Haversine formula
def calculate_distance(customer_lat, customer_lon, merchant_lat, merchant_lon):
    """Return the great-circle distance in kilometres between two points.

    Uses the Haversine formula with an Earth radius of 6371 km.

    Args:
        customer_lat: Customer latitude in degrees.
        customer_lon: Customer longitude in degrees.
        merchant_lat: Merchant latitude in degrees.
        merchant_lon: Merchant longitude in degrees.

    Returns:
        The distance in kilometres as a float, or None when any coordinate is
        None (e.g. a transaction without a matching customer after a left
        join), so a missing value yields a null instead of raising.
    """
    coordinates = (customer_lat, customer_lon, merchant_lat, merchant_lon)
    if any(value is None for value in coordinates):
        return None

    earth_radius = 6371  # Kilometers

    # Convert to radians; float() also accepts Decimal values.
    customer_lat_rad, customer_lon_rad, merchant_lat_rad, merchant_lon_rad = (
        math.radians(float(value)) for value in coordinates
    )

    # Haversine formula
    dlon = merchant_lon_rad - customer_lon_rad
    dlat = merchant_lat_rad - customer_lat_rad
    a = math.sin(dlat / 2) ** 2 + math.cos(customer_lat_rad) * math.cos(merchant_lat_rad) * math.sin(dlon / 2) ** 2
    # Rounding can push `a` marginally outside [0, 1] for (nearly) antipodal
    # points, which would make sqrt(1 - a) fail; clamp it back into range.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    return earth_radius * c

# Wrap calculate_distance as a Spark UDF whose declared return type is DoubleType.
calculate_distance_udf = udf(calculate_distance, DoubleType())

spark = SparkSession.builder \
    .appName("FraudDetection") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

try:
    # Read customer data
    customer_df = spark.read.csv("customer.csv", header=True, inferSchema=True)

    # Read transaction data
    transaction_df = spark.read.csv("transaction_training.csv", header=True, inferSchema=True)

    # Add 'age' feature based on date of birth ('dob' is in 'yyyy-MM-dd' format):
    # extract the birth year, then derive the age from it.
    customer_df = customer_df.withColumn("birth_year", year(col("dob")))
    customer_df = customer_df.withColumn("age", calculate_age_udf(col("birth_year")))

    # Join each transaction with its customer on cc_num. The transaction file
    # already carries 'first' and 'last', so drop them from the customer side
    # to avoid duplicate column names in the joined result.
    joined_df = transaction_df.join(
        customer_df.drop("first", "last"), on="cc_num", how="left"
    )

    # Add 'distance' feature: Haversine distance between customer and merchant
    joined_df = joined_df.withColumn(
        "distance",
        calculate_distance_udf(
            col("lat"), col("long"),  # Customer location columns
            col("merch_lat"), col("merch_long")  # Merchant location columns
        )
    )

    # Split data into fraud and non-fraud. 'is_fraud' is written as 0/1 in the
    # CSV, so compare against the integer values rather than boolean literals.
    fraud_df = joined_df.where(col("is_fraud") == 1)
    non_fraud_df = joined_df.where(col("is_fraud") == 0)

    # Write fraud data to MongoDB
    fraud_df.write.format("mongo") \
        .mode("append") \
        .option("uri", atlas_connection_string) \
        .option("database", "rtfs") \
        .option("collection", "fraud") \
        .save()

    # Write non-fraud data to MongoDB
    non_fraud_df.write.format("mongo") \
        .mode("append") \
        .option("uri", atlas_connection_string) \
        .option("database", "rtfs") \
        .option("collection", "non-fraud") \
        .save()
finally:
    # Always release the Spark session, even when a read or write fails.
    spark.stop()


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, col, rand
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

import os

# MongoDB Atlas connection string, read from the MONGODB_ATLAS_URI environment
# variable so that no credentials are stored in the notebook.
# NOTE: the password that used to be hard-coded here has been exposed and
# should be rotated in Atlas.
atlas_connection_string = os.environ.get("MONGODB_ATLAS_URI")
if not atlas_connection_string:
    raise RuntimeError(
        "Set the MONGODB_ATLAS_URI environment variable to the MongoDB Atlas "
        "connection string before running this cell."
    )

# Row caps used when sampling each class for the training set below.
num_non_fraud_transactions = 1000
num_fraud_transactions = 500

# Initialize Spark session
spark = SparkSession.builder \
    .appName("FraudDetection") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

try:
    # Read fraud and non-fraud data from MongoDB
    fraud_df = spark.read.format("mongo") \
        .option("uri", atlas_connection_string) \
        .option("database", "rtfs") \
        .option("collection", "fraud") \
        .load()

    non_fraud_df = spark.read.format("mongo") \
        .option("uri", atlas_connection_string) \
        .option("database", "rtfs") \
        .option("collection", "non-fraud") \
        .load()

    # Combine fraud and non-fraud data. unionByName matches columns by name,
    # so a different column order in the two collections cannot silently mix
    # values up (plain union is positional).
    combined_df = fraud_df.unionByName(non_fraud_df)
    combined_df = combined_df.withColumn("is_fraud", col("is_fraud").cast("double"))

    # Randomly sample each class: shuffle on a temporary random column and
    # keep the first N rows. limit() can only shrink a class, never grow it.
    # NOTE(review): fraud is capped by num_non_fraud_transactions and
    # non-fraud by num_fraud_transactions, giving up to 1000 fraud vs 500
    # non-fraud rows. Kept as in the original; confirm the intended ratio.
    fraud_oversampled_df = fraud_df.withColumn("weight", rand()).orderBy("weight").limit(num_non_fraud_transactions)
    non_fraud_undersampled_df = non_fraud_df.withColumn("weight", rand()).orderBy("weight").limit(num_fraud_transactions)

    # Combine the sampled classes and remove the temporary weight column
    balanced_df = fraud_oversampled_df.unionByName(non_fraud_undersampled_df).drop("weight")

    # Ensure the label column of the data that is actually used for training
    # has a numeric (double) type.
    balanced_df = balanced_df.withColumn("is_fraud", col("is_fraud").cast("double"))

    # Define columns for transformation
    categorical_cols = ["category", "merchant", "gender", "job"]
    date_cols = ["trans_date", "dob"]
    time_cols = ["trans_time"]
    numeric_cols = ["amt", "lat", "long", "merch_lat", "merch_long"]

    # Extract features from date and time columns
    for col_name in date_cols:
        balanced_df = balanced_df.withColumn(f"{col_name}_year", year(col(col_name)))
        balanced_df = balanced_df.withColumn(f"{col_name}_month", month(col(col_name)))
        balanced_df = balanced_df.withColumn(f"{col_name}_day", dayofmonth(col(col_name)))

    for col_name in time_cols:
        balanced_df = balanced_df.withColumn(f"{col_name}_hour", hour(col(col_name)))
        balanced_df = balanced_df.withColumn(f"{col_name}_minute", minute(col(col_name)))

    # The sample depends on an unseeded rand(), so every recomputation would
    # pick different rows. Cache and materialise it once so that the
    # train/test split and the final fit all see the same data.
    balanced_df = balanced_df.cache()
    balanced_df.count()

    # Create StringIndexer and OneHotEncoder stages for categorical columns.
    # handleInvalid="keep" maps categories that were not seen during fitting
    # (e.g. a merchant that only occurs in the test split) to an extra bucket
    # instead of failing at transform time.
    string_indexer_stages = [
        StringIndexer(inputCol=name, outputCol=name + "_indexed", handleInvalid="keep")
        for name in categorical_cols
    ]
    onehot_encoder_stages = [
        OneHotEncoder(inputCol=name + "_indexed", outputCol=name + "_encoded")
        for name in categorical_cols
    ]

    # Combine all feature columns
    feature_cols = numeric_cols + \
        [name + "_year" for name in date_cols] + \
        [name + "_month" for name in date_cols] + \
        [name + "_day" for name in date_cols] + \
        [name + "_hour" for name in time_cols] + \
        [name + "_minute" for name in time_cols] + \
        [name + "_encoded" for name in categorical_cols]

    # Create VectorAssembler to combine all encoded features
    vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    # Define the classifier
    classifier = RandomForestClassifier(labelCol="is_fraud", featuresCol="features")

    # Create the pipeline
    pipeline = Pipeline(stages=string_indexer_stages + onehot_encoder_stages + [vector_assembler, classifier])

    # Split data into training and testing sets
    training_df, testing_df = balanced_df.randomSplit([0.8, 0.2], seed=42)

    # Train the model on the training data
    model = pipeline.fit(training_df)

    # Evaluate the model on the testing data
    predictions = model.transform(testing_df)

    # Select example rows to display.
    predictions.select("prediction", "is_fraud", "probability").show(5)

    # Accuracy, precision, recall and F1 are multiclass metrics computed from
    # the predicted label. BinaryClassificationEvaluator only supports the
    # area-under-curve metrics, so it is used for AUC-ROC alone.
    multiclass_evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction")
    evaluator = BinaryClassificationEvaluator(
        labelCol="is_fraud", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
    )

    accuracy = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "accuracy"})
    print(f"Accuracy: {accuracy:.4f}")

    # Calculate (weighted) precision, recall, and F1-score
    precision = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "weightedPrecision"})
    recall = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "weightedRecall"})
    f1 = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName: "f1"})

    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-Score: {f1:.4f}")

    # Calculate AUC-ROC
    auc_roc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
    print(f"AUC-ROC: {auc_roc:.4f}")

    # Train the model on the full dataset
    model = pipeline.fit(balanced_df)

    # Save the model
    model_path = "Model"
    model.write().overwrite().save(model_path)
finally:
    # Always release the Spark session, even when training or saving fails.
    spark.stop()
