In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler

## SPARK SESSION

In [2]:
# Build the SparkSession for this notebook (or reuse one that already exists),
# with Hive support enabled so that Hive tables can be queried through spark.sql.
spark = (
    SparkSession.builder
    .appName("sparkhive")
    .enableHiveSupport()
    .getOrCreate()
)

## Data Extraction from Hive

In [3]:
# The historical data is read from Hive to build the training dataset.
historical_data_query = "SELECT * FROM fraud_project.fraud_full_load_external"
df = spark.sql(historical_data_query)

In [17]:
# Preview the first 20 rows of the raw data (long cell values are truncated).
df.show(n=20, truncate=True)

+----+----------------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-------+
|step|transaction_type|   amount|   nameorig|oldbalanceorg|newbalanceorig|   namedest|oldbalancedest|newbalancedest|isfraud|isflaggedfraud| row_id|
+----+----------------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-------+
| 157|        CASH_OUT|243195.42| C557292767|      30945.0|           0.0|C2133715645|     247810.02|     443321.12|      0|             0|1000001|
| 157|         PAYMENT| 26450.15| C547756324|        139.0|           0.0| M366988082|           0.0|           0.0|      0|             0|1000002|
| 157|        CASH_OUT| 200987.4|C2119056835|          0.0|           0.0|C1913140449|     3156111.5|     3357099.0|      0|             0|1000003|
| 157|         PAYMENT|  5636.54|C1661110193|     101817.0|      96180.46|M1937624524|           0.0|           

## Transformation
Steps: null values, duplicates, feature selection, scaling.

In [18]:
# Checking for null values
columns = df.columns

# Count the nulls of every column in ONE aggregation (a single Spark job over
# the table) instead of launching a separate filter + count job per column.
# `when(...)` without `otherwise` yields NULL for rows that do not match, and
# `count` skips NULLs, so each aggregate is the number of null entries in that
# column. A global aggregation always returns exactly one row, even when the
# DataFrame is empty (all counts are then 0).
null_counts_row = df.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in columns]
).first()

# The row's values are in the same order as `columns`.
null_counts_per_column = list(null_counts_row)

# Display the null counts for each column
for col_name, null_count in zip(columns, null_counts_per_column):
    print(f"Null count in '{col_name}': {null_count}")

# Imputation techniques: placeholder only; nothing is imputed in this cell.

Null count in 'step': 0
Null count in 'transaction_type': 0
Null count in 'amount': 0
Null count in 'nameorig': 0
Null count in 'oldbalanceorg': 0
Null count in 'newbalanceorig': 0
Null count in 'namedest': 0
Null count in 'oldbalancedest': 0
Null count in 'newbalancedest': 0
Null count in 'isfraud': 0
Null count in 'isflaggedfraud': 0
Null count in 'row_id': 0


In [None]:
# Checking for duplicated rows
# Remove exact duplicates (rows that match on every column) from the DataFrame.
# NOTE(review): the comparison includes every column, `row_id` among them; if
# `row_id` is unique per row this can never find a duplicate. Confirm.
deduplicated_df = df.dropDuplicates()

# Row counts before and after deduplication
total_rows = df.count()
deduplicated_rows = deduplicated_df.count()

# The difference between the two counts is the number of duplicate rows
duplicate_count = total_rows - deduplicated_rows

print("Total Rows:", total_rows)
print("Duplicate Rows:", duplicate_count)

In [4]:
# Feature selection: keep the row identifier, the transaction type, the
# amount/balance columns and the fraud label; all other columns are dropped.
selected_columns = [
    "row_id",
    "transaction_type",
    "amount",
    "oldbalanceorg",
    "newbalanceorig",
    "oldbalancedest",
    "newbalancedest",
    "isfraud",
]
df_new = df.select(selected_columns)
df_new.show()

+-------+----------------+---------+-------------+--------------+--------------+--------------+-------+
| row_id|transaction_type|   amount|oldbalanceorg|newbalanceorig|oldbalancedest|newbalancedest|isfraud|
+-------+----------------+---------+-------------+--------------+--------------+--------------+-------+
|1000001|        CASH_OUT|243195.42|      30945.0|           0.0|     247810.02|     443321.12|      0|
|1000002|         PAYMENT| 26450.15|        139.0|           0.0|           0.0|           0.0|      0|
|1000003|        CASH_OUT| 200987.4|          0.0|           0.0|     3156111.5|     3357099.0|      0|
|1000004|         PAYMENT|  5636.54|     101817.0|      96180.46|           0.0|           0.0|      0|
|1000005|         CASH_IN|147929.05|     429283.0|     577212.06|           0.0|           0.0|      0|
|1000006|         CASH_IN|  5907.47|    577212.06|      583119.5|     252544.69|     246637.22|      0|
|1000007|         CASH_IN|  92038.0|     583119.5|      675157.5

In [19]:
# Print the schema of the feature-selected DataFrame: each column's name,
# data type and nullability.
df_new.printSchema()

root
 |-- row_id: integer (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- amount: float (nullable = true)
 |-- oldbalanceorg: float (nullable = true)
 |-- newbalanceorig: float (nullable = true)
 |-- oldbalancedest: float (nullable = true)
 |-- newbalancedest: float (nullable = true)
 |-- isfraud: integer (nullable = true)



In [20]:
 # z-score scaling
# Standardises the numeric amount/balance columns: with withMean=True and
# withStd=True the scaler centres each feature on its mean and scales it to
# unit standard deviation.

# List of columns to scale (exclude "transaction_type", "isfraud", and "row_id")
columns_to_scale = ["amount", "oldbalanceorg", "newbalanceorig", "oldbalancedest", "newbalancedest"]

# Create a VectorAssembler to assemble the selected columns into a feature vector.
# NOTE: despite its name, the "scaled_features" column holds the RAW (unscaled)
# vector; the standardised vector is written to "scaled_features_scaled". The
# column names are kept unchanged so the output schema stays stable.
assembler = VectorAssembler(inputCols=columns_to_scale, outputCol="scaled_features")

# Assemble the features into a vector.
# Fix: start from the feature-selected frame `df_new` rather than the raw `df`,
# so the columns dropped during feature selection do not reappear in the output.
df_assembled = assembler.transform(df_new)

# Create a StandardScaler object
scaler = StandardScaler(inputCol="scaled_features", outputCol="scaled_features_scaled", withMean=True, withStd=True)

# Fit the scaler (computes per-feature mean and standard deviation), then transform
scaler_model = scaler.fit(df_assembled)
scaled_df = scaler_model.transform(df_assembled)

# Show the resulting DataFrame with scaled features
scaled_df.show()


Total Amount of Rows: 1020002
Total Amount of Columns: 8


## Load back to Hive

In [None]:
# Save the scaled DataFrame as a Hive table, replacing the table if it already exists.
# The database/table name below is a placeholder.
scaled_df.write.saveAsTable("your_hive_database.your_hive_table", mode="overwrite")


## Save using other PySpark formats

In [None]:
# Persist the scaled DataFrame in several file formats (paths are placeholders).
# The save mode is set explicitly to "overwrite", matching the Hive write: with
# the default mode ("error"/"errorifexists") each write raises as soon as the
# target path exists, so this cell could not be re-run.

# Save as Parquet
scaled_df.write.mode("overwrite").parquet("path_to_save/your_file.parquet")

# Save as Avro
# NOTE: the "avro" source is provided by the external spark-avro module, which
# must be available on the Spark classpath for this write to work.
scaled_df.write.mode("overwrite").format("avro").save("path_to_save/your_file.avro")

# Save as ORC
scaled_df.write.mode("overwrite").orc("path_to_save/your_file.orc")

# Save as JSON
scaled_df.write.mode("overwrite").json("path_to_save/your_file.json")