# Small Data

In [6]:
try:
  from pyspark.sql import SparkSession
  pyspark_available = 'Y'
except:
  pyspark_available = 'N'

# If PySpark is not installed, then go through all these steps

if pyspark_available == 'N':
  # Update Installer
  !apt-get update

  # Intsall Java
  !apt-get install openjdk-8-jdk-headless -qq > /dev/null

  # install spark (change the version number if needed)
  !wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

  # unzip the spark file to the current folder
  !tar xf spark-3.0.0-bin-hadoop3.2.tgz

  # set your spark folder to your system path environment.
  import os
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
  os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

  # install findspark using pip
  !pip install -q findspark

  import findspark
  findspark.init()

  from pyspark.sql import SparkSession
  spark = SparkSession.builder.master("local[*]").getOrCreate()

  # To access Google Cloud Storage
  from google.cloud import storage
  import google.auth

  !pip install gcsfs
  import gcsfs

  from google.colab import auth
  auth.authenticate_user()

  credentials, default_project_id = google.auth.default()
  !gcloud config set project {project_id}
else:
    # Spark / PySpark already pre-installed in the environment
    print("PySpark already pre-installed!")

PySpark already pre-installed!


In [8]:
from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer, HashingTF
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import lit

# Normally, we will not need Pandas if we are working with PySpark (since PySpark provides the dataframe capabilities)
# However, we will need pandas just for one step in this task: for reading files from Google Cloud Storage
#     This is because it's not straightforward to set the configurations correctly for letting spark read from GCS
from pandas import DataFrame, read_csv
# Added this line in SP24 since pandas removed iteritems from DataFrame object in 2024
DataFrame.iteritems = DataFrame.items

from google.cloud import storage

client = storage.Client()

print(f"Package imports done")

Package imports done


In [9]:
 #Configure the following to use YOUR GCP setup

# 1. Configure the Project ID (not Project Name!!!) as per your GCP Dataproc setup
project_id = 'colivar8-cis415-fall24a'

# 2. Configure Bucket name as per your Google Cloud Storage setup
bucket = 'colivar8_data_for_gcp_labs'

# 3. Configure the path to the movie reviews data file as per your Google Cloud Storage setup
#    If your setup is exactly as per the instructions in GCP Lab 1c and in this lab:
#       --- you will not need to make any changes to the below line.
#    If your setup is different (due to whatever reason - doesn't matter),
#       --- just update the below line to reflect the path as per YOUR Google Cloud Storage setup
path_to_data_files = "/data_for_fraud_project/"

# 3. Configure the appropriate data file to be used for the task
#       Uncomment one of the two lines below based on the following:
#          In Google Colab, you should build/test with SMALL DATA
#          In GCP, first you should run with SMALL DATA
#             and finally, you should run with BIG DATA

#fraud_detection = "big_fraud_detection_dataset.csv"
fraud_detection = "small_fraud_detection_dataset.csv"

# Lastly, we will build the full path of the data file and confirm it's correct
# You do not need to change this line
full_file_path = "gs://" + bucket + path_to_data_files + fraud_detection

# Let's print out all the configurations and ensure that they are correct
print(f"ProjectID (and not the Project Name) is: {project_id}")
print(f"Bucket name is: {bucket}")
if fraud_detection == "small_fraud_detection_dataset.csv":
  print(f"We will run this task for SMALL DATA ({fraud_detection})")
elif fraud_detection == "big_fraud_detection_dataset.csv":
  print(f"We will run this task for BIG DATA ({fraud_detection})")
else:
  print("-"*20)
  print(f"Incorrect data file name - {fraud_detection}!! CHECK & FIX!!!")
  print("-"*20)

print(f"Full path to the data file is {full_file_path}")

ProjectID (and not the Project Name) is: colivar8-cis415-fall24a
Bucket name is: colivar8_data_for_gcp_labs
We will run this task for SMALL DATA (small_fraud_detection_dataset.csv)
Full path to the data file is gs://colivar8_data_for_gcp_labs/data_for_fraud_project/small_fraud_detection_dataset.csv


In [10]:
spark = SparkSession.builder.appName("FraudDetection").getOrCreate()

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import split, col

In [12]:
# Read the data file
print(f"Reading the data file: {full_file_path}")

pandas_df = read_csv(full_file_path, sep=",")
print(f"Pandas data frame is ready with the data.  Now converting it to a Spark Dataframe")
cleaned_pandas_df = pandas_df.dropna()

spark_df = spark.createDataFrame(cleaned_pandas_df)
print(f"Spark data frame is ready with the data.  Let's check the first few rows...")

# Check the first few records in the data
spark_df.show()

# How many records got loaded?
print(f"Total number of records from data file = {spark_df.count()}")

Reading the data file: gs://colivar8_data_for_gcp_labs/data_for_fraud_project/small_fraud_detection_dataset.csv
Pandas data frame is ready with the data.  Now converting it to a Spark Dataframe
Spark data frame is ready with the data.  Let's check the first few rows...


24/10/10 04:17:11 WARN TaskSetManager: Stage 0 contains a task of very large size (1367 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+------------------+----------------+-----------+--------------------+----------------+----------+-----------+-----------+------------+
|Transaction_Amount|Transaction_Type|Account_Age|Transaction_Location|Transaction_Time|Fraudulent|Customer_ID|Loan_Amount|Loan_Purpose|
+------------------+----------------+-----------+--------------------+----------------+----------+-----------+-----------+------------+
|           2199.93|        In-store|       21.0|             Chicago|            0:57|         0|  CUST91186|   36021.94|    Business|
|            875.52|        In-store|       18.0|         Los Angeles|           11:41|         0|  CUST15505|   10584.69|    Personal|
|           1109.42|        In-store|       16.0|       San Francisco|           17:26|         0|  CUST48110|   91714.73|   Education|
|           4052.24|        In-store|        8.0|             Houston|           23:43|         0|  CUST15744|   36170.59|   Education|
|            571.92|          Online|       14.0

24/10/10 04:17:18 WARN TaskSetManager: Stage 1 contains a task of very large size (1367 KiB). The maximum recommended task size is 1000 KiB.

Total number of records from data file = 49961


                                                                                

In [13]:
# Code that GPT gave me for step 2 in the project
# Step 2: Load the small dataset
data = spark_df

# Step 3: EDA - Display dataset schema and first few rows
data.printSchema()
data.show(5)

# Step 5: Feature engineering
# Convert categorical variables into numeric format using StringIndexer
indexer = StringIndexer(inputCols=["Transaction_Type", "Transaction_Location"],
                        outputCols=["Transaction_Type_Index", "Transaction_Location_Index"])
data = indexer.fit(data).transform(data)

# Step 6: Feature selection
# Select relevant features and target variable for modeling
assembler = VectorAssembler(
    inputCols=["Transaction_Amount", "Account_Age", "Transaction_Type_Index", "Transaction_Location_Index"],
    outputCol="features"
)
data = assembler.transform(data).select("features", "Fraudulent")

# Step 7: Splitting data into training and validation sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Step 8: Training the first model - Logistic Regression
lr = LogisticRegression(labelCol="Fraudulent", featuresCol="features")
lr_model = lr.fit(train_data)

# Step 9: Training the second model - Random Forest
rf = RandomForestClassifier(labelCol="Fraudulent", featuresCol="features", numTrees=100)
rf_model = rf.fit(train_data)

# Step 10: Evaluating both models
evaluator = BinaryClassificationEvaluator(labelCol="Fraudulent", rawPredictionCol="rawPrediction")

# Logistic Regression evaluation
lr_predictions = lr_model.transform(test_data)
lr_auc = evaluator.evaluate(lr_predictions)
print(f"Logistic Regression AUC: {lr_auc}")

# Random Forest evaluation
rf_predictions = rf_model.transform(test_data)
rf_auc = evaluator.evaluate(rf_predictions)
print(f"Random Forest AUC: {rf_auc}")

# Step 11: Conclusion - Once the ML pipeline is tested in the small data environment,
# it can be deployed in the big data environment, starting with the small dataset
# and scaling up to the larger dataset.

spark.stop()

root
 |-- Transaction_Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Account_Age: double (nullable = true)
 |-- Transaction_Location: string (nullable = true)
 |-- Transaction_Time: string (nullable = true)
 |-- Fraudulent: long (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- Loan_Amount: double (nullable = true)
 |-- Loan_Purpose: string (nullable = true)



24/10/10 04:17:45 WARN TaskSetManager: Stage 4 contains a task of very large size (1367 KiB). The maximum recommended task size is 1000 KiB.


+------------------+----------------+-----------+--------------------+----------------+----------+-----------+-----------+------------+
|Transaction_Amount|Transaction_Type|Account_Age|Transaction_Location|Transaction_Time|Fraudulent|Customer_ID|Loan_Amount|Loan_Purpose|
+------------------+----------------+-----------+--------------------+----------------+----------+-----------+-----------+------------+
|           2199.93|        In-store|       21.0|             Chicago|            0:57|         0|  CUST91186|   36021.94|    Business|
|            875.52|        In-store|       18.0|         Los Angeles|           11:41|         0|  CUST15505|   10584.69|    Personal|
|           1109.42|        In-store|       16.0|       San Francisco|           17:26|         0|  CUST48110|   91714.73|   Education|
|           4052.24|        In-store|        8.0|             Houston|           23:43|         0|  CUST15744|   36170.59|   Education|
|            571.92|          Online|       14.0

24/10/10 04:17:46 WARN TaskSetManager: Stage 5 contains a task of very large size (1367 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 04:17:53 WARN TaskSetManager: Stage 8 contains a task of very large size (1367 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 04:18:09 WARN TaskSetManager: Stage 9 contains a task of very large size (1367 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 04:18:11 WARN TaskSetManager: Stage 10 contains a task of very large size (1412 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 04:18:12 WARN TaskSetManager: Stage 11 contains a task of very large size (1412 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 04:18:12 WARN TaskSetManager: Stage 12 contains a task of very large size (1412 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 04:18:12 WARN TaskSetManager: Stage 13 contains a task of very large size (1367 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 

Logistic Regression AUC: 0.508714282327229


24/10/10 04:18:35 WARN TaskSetManager: Stage 42 contains a task of very large size (1367 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Random Forest AUC: 0.4967845023863872


# Big Data

In [17]:
from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer, HashingTF
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import lit

# Normally, we will not need Pandas if we are working with PySpark (since PySpark provides the dataframe capabilities)
# However, we will need pandas just for one step in this task: for reading files from Google Cloud Storage
#     This is because it's not straightforward to set the configurations correctly for letting spark read from GCS
from pandas import DataFrame, read_csv
# Added this line in SP24 since pandas removed iteritems from DataFrame object in 2024
DataFrame.iteritems = DataFrame.items

from google.cloud import storage

client = storage.Client()

print(f"Package imports done")

Package imports done


In [18]:
 #Configure the following to use YOUR GCP setup

# 1. Configure the Project ID (not Project Name!!!) as per your GCP Dataproc setup
project_id = 'colivar8-cis415-fall24a'

# 2. Configure Bucket name as per your Google Cloud Storage setup
bucket = 'colivar8_data_for_gcp_labs'

# 3. Configure the path to the movie reviews data file as per your Google Cloud Storage setup
#    If your setup is exactly as per the instructions in GCP Lab 1c and in this lab:
#       --- you will not need to make any changes to the below line.
#    If your setup is different (due to whatever reason - doesn't matter),
#       --- just update the below line to reflect the path as per YOUR Google Cloud Storage setup
path_to_data_files = "/data_for_fraud_project/"

# 3. Configure the appropriate data file to be used for the task
#       Uncomment one of the two lines below based on the following:
#          In Google Colab, you should build/test with SMALL DATA
#          In GCP, first you should run with SMALL DATA
#             and finally, you should run with BIG DATA

fraud_detection = "big_fraud_detection_dataset.csv"
#fraud_detection = "small_fraud_detection_dataset.csv"

# Lastly, we will build the full path of the data file and confirm it's correct
# You do not need to change this line
full_file_path = "gs://" + bucket + path_to_data_files + fraud_detection

# Let's print out all the configurations and ensure that they are correct
print(f"ProjectID (and not the Project Name) is: {project_id}")
print(f"Bucket name is: {bucket}")
if fraud_detection == "small_fraud_detection_dataset.csv":
  print(f"We will run this task for SMALL DATA ({fraud_detection})")
elif fraud_detection == "big_fraud_detection_dataset.csv":
  print(f"We will run this task for BIG DATA ({fraud_detection})")
else:
  print("-"*20)
  print(f"Incorrect data file name - {fraud_detection}!! CHECK & FIX!!!")
  print("-"*20)

print(f"Full path to the data file is {full_file_path}")

ProjectID (and not the Project Name) is: colivar8-cis415-fall24a
Bucket name is: colivar8_data_for_gcp_labs
We will run this task for BIG DATA (big_fraud_detection_dataset.csv)
Full path to the data file is gs://colivar8_data_for_gcp_labs/data_for_fraud_project/big_fraud_detection_dataset.csv


In [19]:
spark = SparkSession.builder.appName("FraudDetection").getOrCreate()

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import split, col

In [21]:
# Read the data file
print(f"Reading the data file: {full_file_path}")

pandas_df = read_csv(full_file_path, sep=",")
print(f"Pandas data frame is ready with the data.  Now converting it to a Spark Dataframe")
cleaned_pandas_df = pandas_df.dropna()

spark_df = spark.createDataFrame(cleaned_pandas_df)
print(f"Spark data frame is ready with the data.  Let's check the first few rows...")

# Check the first few records in the data
spark_df.show()

# How many records got loaded?
print(f"Total number of records from data file = {spark_df.count()}")

Reading the data file: gs://colivar8_data_for_gcp_labs/data_for_fraud_project/big_fraud_detection_dataset.csv
Pandas data frame is ready with the data.  Now converting it to a Spark Dataframe
Spark data frame is ready with the data.  Let's check the first few rows...


24/10/10 04:24:42 WARN TaskSetManager: Stage 0 contains a task of very large size (27676 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 04:24:49 WARN TaskSetManager: Stage 1 contains a task of very large size (27676 KiB). The maximum recommended task size is 1000 KiB.


+------------------+----------------+-----------+--------------------+----------------+----------+-----------+-----------+------------+
|Transaction_Amount|Transaction_Type|Account_Age|Transaction_Location|Transaction_Time|Fraudulent|Customer_ID|Loan_Amount|Loan_Purpose|
+------------------+----------------+-----------+--------------------+----------------+----------+-----------+-----------+------------+
|           1645.59|        In-store|       16.0|            New York|           19:12|         0|  CUST70769|   97913.21|    Business|
|           2852.75|          Online|        5.0|             Houston|            6:57|         0|  CUST79634|   69864.83|   Education|
|            730.57|          Online|        9.0|            New York|           17:37|         0|  CUST24562|    47731.3|    Personal|
|           1008.33|        In-store|       18.0|            New York|             0:0|         0|  CUST85889|    4698.02|    Business|
|           2298.86|             ATM|       24.0



Total number of records from data file = 999001


                                                                                

In [None]:
# Code that GPT gave me for step 2 in the project
# Step 2: Load the small dataset
data = spark_df

# Step 3: EDA - Display dataset schema and first few rows
data.printSchema()
data.show(5)

# Step 5: Feature engineering
# Convert categorical variables into numeric format using StringIndexer
indexer = StringIndexer(inputCols=["Transaction_Type", "Transaction_Location"],
                        outputCols=["Transaction_Type_Index", "Transaction_Location_Index"])
data = indexer.fit(data).transform(data)

# Step 6: Feature selection
# Select relevant features and target variable for modeling
assembler = VectorAssembler(
    inputCols=["Transaction_Amount", "Account_Age", "Transaction_Type_Index", "Transaction_Location_Index"],
    outputCol="features"
)
data = assembler.transform(data).select("features", "Fraudulent")

# Step 7: Splitting data into training and validation sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Step 8: Training the first model - Logistic Regression
lr = LogisticRegression(labelCol="Fraudulent", featuresCol="features")
lr_model = lr.fit(train_data)

# Step 9: Training the second model - Random Forest
rf = RandomForestClassifier(labelCol="Fraudulent", featuresCol="features", numTrees=100)
rf_model = rf.fit(train_data)

# Step 10: Evaluating both models
evaluator = BinaryClassificationEvaluator(labelCol="Fraudulent", rawPredictionCol="rawPrediction")

# Logistic Regression evaluation
lr_predictions = lr_model.transform(test_data)
lr_auc = evaluator.evaluate(lr_predictions)
print(f"Logistic Regression AUC: {lr_auc}")

# Random Forest evaluation
rf_predictions = rf_model.transform(test_data)
rf_auc = evaluator.evaluate(rf_predictions)
print(f"Random Forest AUC: {rf_auc}")

# Step 11: Conclusion - Once the ML pipeline is tested in the small data environment,
# it can be deployed in the big data environment, starting with the small dataset
# and scaling up to the larger dataset.

spark.stop()

root
 |-- Transaction_Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Account_Age: double (nullable = true)
 |-- Transaction_Location: string (nullable = true)
 |-- Transaction_Time: string (nullable = true)
 |-- Fraudulent: long (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- Loan_Amount: double (nullable = true)
 |-- Loan_Purpose: string (nullable = true)



24/10/10 04:26:40 WARN TaskSetManager: Stage 4 contains a task of very large size (27676 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+------------------+----------------+-----------+--------------------+----------------+----------+-----------+-----------+------------+
|Transaction_Amount|Transaction_Type|Account_Age|Transaction_Location|Transaction_Time|Fraudulent|Customer_ID|Loan_Amount|Loan_Purpose|
+------------------+----------------+-----------+--------------------+----------------+----------+-----------+-----------+------------+
|           1645.59|        In-store|       16.0|            New York|           19:12|         0|  CUST70769|   97913.21|    Business|
|           2852.75|          Online|        5.0|             Houston|            6:57|         0|  CUST79634|   69864.83|   Education|
|            730.57|          Online|        9.0|            New York|           17:37|         0|  CUST24562|    47731.3|    Personal|
|           1008.33|        In-store|       18.0|            New York|             0:0|         0|  CUST85889|    4698.02|    Business|
|           2298.86|             ATM|       24.0

24/10/10 04:26:47 WARN TaskSetManager: Stage 5 contains a task of very large size (27676 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 04:27:05 WARN TaskSetManager: Stage 8 contains a task of very large size (27676 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 04:27:27 WARN TaskSetManager: Stage 9 contains a task of very large size (27676 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 04:27:38 WARN TaskSetManager: Stage 10 contains a task of very large size (27683 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 04:27:39 WARN TaskSetManager: Stage 11 contains a task of very large size (27683 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 04:27:39 WARN TaskSetManager: Stage 12 contains a task of very large size (27676 KiB). The maximum recommended task size is 1000 KiB.
24/10/10 04:27:40 WARN TaskSetManager: Stage 13 contains a task of very large size (27676 KiB). The maximum recommended task size is 1000 KiB.
24