In [None]:
import pandas as pd
from imblearn.over_sampling import SMOTE, SMOTENC
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.functions import vector_to_array
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType, ArrayType
from sklearn.model_selection import train_test_split

In [None]:
# Load the cleaned transaction data and decide which columns get standardized.
spark = SparkSession.builder.appName("FraudDataNormalization").getOrCreate()
# NOTE(review): "fruad" looks like a typo but is part of the S3 path itself;
# presumably it matches the real bucket name — confirm before changing it.
s3_path = "s3://credit-transaction-fruad-new/data_cleaned.csv"
df = spark.read.csv(s3_path, header=True, inferSchema=True)

# The label plus indicator-style columns (time-of-day / weekend flags and the
# `category_*` one-hot columns — presumably 0/1) are kept out of scaling.
exclude_cols = [
    "is_fraud",
    "trans_time_is_night", "trans_date_is_weekend",
    "category_entertainment", "category_food_dining", "category_gas_transport",
    "category_grocery_net", "category_grocery_pos", "category_health_fitness",
    "category_home", "category_kids_pets", "category_misc_net", "category_misc_pos",
    "category_personal_care", "category_shopping_net", "category_shopping_pos",
    "category_travel"
]

# Every integral / floating type name `df.dtypes` can report. Matching only
# "int" and "double" silently skipped columns inferred as e.g. "bigint"
# (integers beyond the 32-bit range): they were never scaled, yet still reached
# the model in raw units because the feature matrix keeps every non-label column.
NUMERIC_DTYPES = ("tinyint", "smallint", "int", "bigint", "float", "double")
numeric_cols = [
    name
    for name, dtype in df.dtypes
    if dtype in NUMERIC_DTYPES and name not in exclude_cols
]
# Fail here with a clear message rather than deep inside VectorAssembler.
if not numeric_cols:
    raise ValueError("No numeric feature columns found to scale; check the input schema.")


In [None]:
# Standardize the continuous columns to zero mean / unit variance and write the
# scaled values back under their original column names.
# NOTE(review): the scaler is fit on *all* rows here, before the train/test split
# performed later in the notebook, so test-set means/stds leak into the training
# features. For an unbiased evaluation, split first and fit the scaler on the
# training rows only.
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features_vector")
df_vector = assembler.transform(df)

scaler = StandardScaler(inputCol="features_vector", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(df_vector)
df_scaled = scaler_model.transform(df_vector)

# vector_to_array is a native conversion: no Python UDF round-trip, and it keeps
# float64 precision (an ArrayType(FloatType()) UDF truncated every value to float32).
df_scaled = df_scaled.withColumn("scaled_array", vector_to_array(col("scaled_features")))

# Replace each scaled column in place with a single projection (a withColumn per
# column in a loop adds one plan node each). Column order matches the input frame;
# the intermediate vector/array columns are dropped.
helper_cols = {"features_vector", "scaled_features", "scaled_array"}
scaled_index = {name: i for i, name in enumerate(numeric_cols)}
df_final = df_scaled.select(*[
    col("scaled_array")[scaled_index[c]].alias(c) if c in scaled_index else col(c)
    for c in df_scaled.columns
    if c not in helper_cols
])

In [None]:
# ------------------ Convert to pandas ------------------
# Collect the scaled Spark frame onto the driver; everything below is in-memory pandas.
df_pd = df_final.toPandas()
print("Data loaded & normalized. Shape:", df_pd.shape, end="\n\n")
print("[Original class distribution]")
print(df_pd["is_fraud"].value_counts())

# ------------------ Train-test split ------------------
# Separate label from features; stratifying on the label keeps the fraud ratio
# the same in the train and test splits.
X, y = df_pd.drop(columns=["is_fraud"]), df_pd["is_fraud"]

X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    stratify=y,
    test_size=0.2,
    random_state=42,
)

print("\n".join(
    f"{label}: {frame.shape}"
    for label, frame in (
        ("X_train shape", X_train),
        ("y_train shape", y_train),
        ("X_test shape ", X_test),
        ("y_test shape ", y_test),
    )
))


In [None]:
# Oversample the minority (fraud) class on the training split only; the test
# split keeps its natural class ratio.
#
# The indicator-style columns listed in `exclude_cols` (presumably 0/1 flags and
# one-hot categories) are passed to SMOTENC as categorical. Plain SMOTE
# interpolates them into fractional values that no real transaction can have.
# SMOTENC assigns each one the most frequent value among the nearest neighbours.
# NOTE(review): each one-hot column is still voted independently, so a synthetic
# row may end up with zero or several `category_*` flags set. Resampling on a
# single label-encoded category column would avoid that.
binary_cols = [c for c in X_train.columns if c in exclude_cols]
if binary_cols and len(binary_cols) < X_train.shape[1]:
    smote = SMOTENC(
        categorical_features=[X_train.columns.get_loc(c) for c in binary_cols],
        random_state=42,
    )
else:
    # SMOTENC needs at least one categorical and one continuous feature.
    smote = SMOTE(random_state=42)
X_train_res, y_train_res = smote.fit_resample(X_train, y_train)

print("\n[Class distribution after SMOTE on train set]")
print(y_train_res.value_counts())
print(y_train_res.value_counts(normalize=True).round(4))

# ------------------ Save datasets to S3 ------------------
s3_output_prefix = "s3://credit-transaction-fruad-new/processed/"

X_train_res.to_csv(s3_output_prefix + "X_train_res.csv", index=False)
y_train_res.to_csv(s3_output_prefix + "y_train_res.csv", index=False)
X_test.to_csv(s3_output_prefix + "X_test.csv", index=False)
y_test.to_csv(s3_output_prefix + "y_test.csv", index=False)

print("\nAll datasets saved to S3 under:", s3_output_prefix)