In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, count, avg, when, expr, lit, to_date
from datetime import datetime, timedelta

In [2]:
# Get (or reuse) the Spark session for this notebook, pointed at the
# master URL spark://spark-master:7077.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("Generate Rolling Features")
    .getOrCreate()
)

In [3]:
# Load the partitioned transactions and derive the two helper columns used
# by every later cell:
#   account_id - "<From Bank>_<From Account>" (source account key)
#   date       - calendar date taken from the `ts` column
raw_df = (
    spark.read.parquet("hdfs://hadoop:9000/data/transactions/partitioned")
    .withColumn("account_id", expr("concat(`From Bank`, '_', `From Account`)"))
    .withColumn("date", to_date("ts"))
)

In [4]:
# Build one feature frame per calendar day. For each day, an account's activity
# on that day ("today_*") is compared with its activity in the preceding
# look-back window ("feature_*").
LOOKBACK_DAYS = 5           # lower bound is exclusive, so the window holds the 4 days before to_day
SMALL_TX_THRESHOLD = 10000  # "Amount Paid" below this counts as a small transaction
EPSILON = 1e-6              # added to ratio denominators so they are never exactly zero

# Distinct transaction dates, oldest first so the per-day frames (and the union
# built from them) come out in a reproducible order. A null `date` (e.g. from a
# null `ts`) is skipped: `None - timedelta(...)` would raise a TypeError below.
dates = sorted(
    r["date"]
    for r in raw_df.select("date").distinct().collect()
    if r["date"] is not None
)

all_features = []

for to_day in dates:
    from_day = to_day - timedelta(days=LOOKBACK_DAYS)

    today_df = raw_df.filter(col("date") == to_day)
    # Both bounds are strict: from_day itself and to_day itself are excluded.
    batch_df = raw_df.filter((col("date") > from_day) & (col("date") < to_day))

    # Per-account baseline over the look-back window.
    batch_agg = batch_df.groupBy("account_id").agg(
        count("*").alias("feature_tx_count"),
        avg("Amount Paid").alias("feature_avg_amount"),
        countDistinct("To Account").alias("feature_unique_targets"),
        count(when(col("Amount Paid") < SMALL_TX_THRESHOLD, True)).alias("feature_small_tx")
    )

    # Per-account activity on to_day.
    today_agg = today_df.groupBy("account_id").agg(
        count("*").alias("today_tx"),
        avg("Amount Paid").alias("today_avg"),
        countDistinct("To Account").alias("today_targets")
    )

    # Outer join keeps accounts present on only one side; their missing
    # aggregates become 0. NOTE: an account with no rows in the window therefore
    # has 0 baselines, so the ratio features below reduce to today's value
    # divided by EPSILON (very large numbers).
    joined = batch_agg.join(today_agg, on="account_id", how="outer").fillna(0)

    result = (
        joined
        .withColumn("tx_count_delta", col("today_tx") - col("feature_tx_count"))
        .withColumn(
            "avg_amount_spike",
            (col("today_avg") - col("feature_avg_amount")) / (col("feature_avg_amount") + lit(EPSILON))
        )
        .withColumn(
            "target_growth",
            col("today_targets") / (col("feature_unique_targets") + lit(EPSILON))
        )
        .withColumn(
            "smurfing_score",
            col("today_tx") / (col("today_avg") + lit(EPSILON))
        )
        # Placeholder columns: always 0 in this notebook.
        .withColumn("round_trip_combined", lit(0))
        .withColumn("avg_round_trip_len", lit(0))
        # Stored as a string, e.g. "2022-09-07".
        .withColumn("feature_date", lit(str(to_day)))
    )

    all_features.append(result)

In [6]:
# Stack the per-day feature frames into one DataFrame.
if not all_features:
    # Fail with an explicit message instead of a bare IndexError on all_features[0].
    raise ValueError("No per-day feature frames were produced; nothing to union or write.")

final_df = all_features[0]
for daily_features in all_features[1:]:
    # Positional union is safe here: every frame is built by the same
    # sequence of transformations and so has the same column order.
    final_df = final_df.union(daily_features)

# Persist the result, replacing any previous run's output.
features_path = "hdfs://hadoop:9000/data/transactions/features/batch_combined_rolling"
final_df.write.mode("overwrite").parquet(features_path)

# Preview from the files just written. Calling show() on final_df instead would
# trigger a second execution of the uncached aggregation/join/union lineage.
spark.read.parquet(features_path).select(
    "account_id", "feature_date", "tx_count_delta", "avg_amount_spike", "target_growth",
    "smurfing_score", "round_trip_combined", "avg_round_trip_len"
).show(10, truncate=False)

+---------------+------------+--------------+----------------------+-------------------+---------------------+-------------------+------------------+
|account_id     |feature_date|tx_count_delta|avg_amount_spike      |target_growth      |smurfing_score       |round_trip_combined|avg_round_trip_len|
+---------------+------------+--------------+----------------------+-------------------+---------------------+-------------------+------------------+
|10057_803DE1580|2022-09-07  |-4            |-0.8970041049800832   |0.33333322222225925|1.2629587460042852E-4|0                  |0                 |
|10057_803F405F0|2022-09-07  |-3            |0.0                   |0.9999990000010001 |2.7207782543115594E-6|0                  |0                 |
|10057_8040BE8F0|2022-09-07  |-3            |-1.945478982957244E-16|0.9999990000010001 |6.26682645514253E-7  |0                  |0                 |
|10057_804109CC0|2022-09-07  |1             |7.231143E10           |1000000.0          |1.3829072388

In [7]:
# Read the persisted feature table back and pull it into a pandas DataFrame
# for model training.
features_uri = "hdfs://hadoop:9000/data/transactions/features/batch_combined_rolling"
df_spark = spark.read.parquet(features_uri)
df = df_spark.toPandas()

In [8]:
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import joblib

# Input features for the anomaly detector.
feature_cols = [
    "tx_count_delta", "avg_amount_spike", "target_growth",
    "smurfing_score", "round_trip_combined", "avg_round_trip_len",
]

X = df.loc[:, feature_cols]

# Standardise the features; the fitted scaler is kept so it can be saved
# alongside the model.
scaler = StandardScaler()
scaler.fit(X)
X_scaled = scaler.transform(X)

# Train the Isolation Forest on the standardised features.
model = IsolationForest(
    n_estimators=100,
    contamination=0.01,
    random_state=42,
)
model.fit(X_scaled)

In [9]:
# Save the fitted scaler and model together as a (scaler, model) tuple so
# both can be restored with a single joblib.load call.
fitted_artifacts = (scaler, model)
joblib.dump(fitted_artifacts, "iforest_model.pkl")
print("✅ Model saved as iforest_model.pkl")

✅ Model saved as iforest_model.pkl


In [10]:
# Score the training rows with the fitted model.
#   scores: lower value = more anomalous
#   labels: 1 = normal, -1 = anomaly
scores = model.decision_function(X_scaled)
labels = model.predict(X_scaled)

# Attach the score and a boolean anomaly flag as two new columns.
df = df.assign(score=scores, is_anomaly=(labels == -1))

df.loc[:, ["account_id", "feature_date", "score", "is_anomaly"]].head()

Unnamed: 0,account_id,feature_date,score,is_anomaly
0,10057_803DE1580,2022-09-05,0.342188,False
1,10057_803F405F0,2022-09-05,0.335669,False
2,10057_803FEFF90,2022-09-05,0.355646,False
3,10057_8040BE8F0,2022-09-05,0.351674,False
4,10057_80435C9F0,2022-09-05,0.355646,False


In [11]:
# First few rows flagged as anomalies, with all columns.
df.loc[df["is_anomaly"]].head()

Unnamed: 0,account_id,feature_tx_count,feature_avg_amount,feature_unique_targets,feature_small_tx,today_tx,today_avg,today_targets,tx_count_delta,avg_amount_spike,target_growth,smurfing_score,round_trip_combined,avg_round_trip_len,feature_date,score,is_anomaly
1502,116_80F742460,0,0.0,0,0,1,0.11,1,1,110000.0,1000000.0,9.090826,0,0,2022-09-05,-0.00815,True
2447,124_81342CBD1,2,0.012715,1,2,1,0.012715,1,-1,0.0,0.999999,78.641082,0,0,2022-09-05,-0.064386,True
2450,124_8136E3DD1,16,0.046963,4,16,4,0.03049,2,-12,-0.350751,0.5,131.185176,0,0,2022-09-05,-0.104716,True
2452,124_813747681,17,10.731555,2,17,4,0.536639,1,-13,-0.9499943,0.5,7.453793,0,0,2022-09-05,-0.053408,True
2453,124_8137484D1,4,0.027533,1,4,2,0.027532,1,-2,-1.260082e-16,0.999999,72.638785,0,0,2022-09-05,-0.068516,True


In [12]:
# Stop the Spark session; no Spark work follows in this notebook.
spark.stop()