## Process with Spark - ETL

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, max, min, when, lag, mean, unix_timestamp
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("ETL script for TalkingData AdTracking Fraud Detection using PySpark") \
    .getOrCreate()


In [35]:
# Read raw data
raw_data = spark.read.csv('train_subset.csv', header=True, inferSchema=True)
raw_data.show(3)

In [38]:
# Feature engineering
window_ip = Window.partitionBy("ip").orderBy('click_time')

data = raw_data.withColumn("IP_first_click", when(min("click_time").over(window_ip) == col("click_time"), 1).otherwise(0)) \
                .withColumn("IP_click_count", count("*").over(window_ip)) \
                .withColumn("IP_download_count", sum("is_attributed").over(window_ip)) \
                .withColumn("IP_download_ratio", col("IP_download_count") / col("IP_click_count")) \
                .withColumn("last_click_time", lag("click_time", 1).over(window_ip)) \
                .withColumn("IP_time_since_last_click", when(col("last_click_time").isNull(), -1).otherwise((unix_timestamp(col("click_time")) - unix_timestamp(col("last_click_time")))))
data.show(3)

+---+---+------+---+-------+-------------------+-------------+--------------+--------------+-----------------+-----------------+-------------------+------------------------+
| ip|app|device| os|channel|         click_time|is_attributed|IP_first_click|IP_click_count|IP_download_count|IP_download_ratio|    last_click_time|IP_time_since_last_click|
+---+---+------+---+-------+-------------------+-------------+--------------+--------------+-----------------+-----------------+-------------------+------------------------+
|  9| 18|     1| 13|    107|2017-11-06 16:02:30|            0|             1|             1|                0|              0.0|               null|                      -1|
|  9| 18|     1| 13|    107|2017-11-06 16:02:41|            0|             0|             2|                0|              0.0|2017-11-06 16:02:30|                      11|
|  9| 18|     1| 13|    107|2017-11-06 16:24:54|            0|             0|             3|                0|              0.0|20

In [40]:
# Calculate IP address statistics - will be used for prediction
ip_stats = data.groupBy('ip').agg(
    count('*').alias('IP_click_count'),
    sum('is_attributed').alias('IP_download_count'),
    (sum('is_attributed') / count('*')).alias('IP_download_ratio'),
    max('click_time').alias('last_click_time'))

+------+--------------+-----------------+--------------------+-------------------+
|    ip|IP_click_count|IP_download_count|   IP_download_ratio|    last_click_time|
+------+--------------+-----------------+--------------------+-------------------+
| 50223|          1137|                2|0.001759014951627089|2017-11-06 18:31:11|
|149177|            24|                0|                 0.0|2017-11-06 16:53:42|
| 37489|            87|                0|                 0.0|2017-11-06 18:27:43|
+------+--------------+-----------------+--------------------+-------------------+
only showing top 3 rows



### Combine the code into ETL script and test output

In [45]:
%%writefile etl_spark.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, max, min, when, lag, mean, unix_timestamp
from pyspark.sql.window import Window
import argparse

def main(args):
    spark = SparkSession.builder \
        .appName("ETL script for TalkingData AdTracking Fraud Detection using PySpark") \
        .getOrCreate()

    # Read raw data
    raw_data = spark.read.csv(args.input_file, header=True, inferSchema=True)

    # Feature engineering
    window_ip = Window.partitionBy("ip").orderBy('click_time')

    data = raw_data.withColumn("IP_first_click", when(min("click_time").over(window_ip) == col("click_time"), 1).otherwise(0)) \
                .withColumn("IP_click_count", count("*").over(window_ip)) \
                .withColumn("IP_download_count", sum("is_attributed").over(window_ip)) \
                .withColumn("IP_download_ratio", col("IP_download_count") / col("IP_click_count")) \
                .withColumn("last_click_time", lag("click_time", 1).over(window_ip)) \
                .withColumn("IP_time_since_last_click", when(col("last_click_time").isNull(), -1).otherwise((unix_timestamp(col("click_time")) - unix_timestamp(col("last_click_time")))))

    # Calculate IP address statistics
    ip_stats = data.groupBy('ip').agg(
        count('*').alias('IP_click_count'),
        sum('is_attributed').alias('IP_download_count'),
        (sum('is_attributed') / count('*')).alias('IP_download_ratio'),
        max('click_time').alias('last_click_time')
    )

    # Save the processed data
    data.write.parquet(args.processed_data_output)

    # Save the IP address statistics
    ip_stats.write.parquet(args.ip_stats_output)

    spark.stop()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='ETL script for TalkingData AdTracking Fraud Detection using PySpark')
    parser.add_argument('--input_file', type=str, required=True, help='Input file path for raw data')
    parser.add_argument('--processed_data_output', type=str, required=True, help='Output file path for processed data')
    parser.add_argument('--ip_stats_output', type=str, required=True, help='Output file path for IP address statistics')

    args = parser.parse_args()
    main(args)


Overwriting etl_spark.py


In [None]:
%%python3 /content/etl_spark.py --input_file 'train_subset.csv' --processed_data_output 'train_output' --ip_stats_output 'ip_stats_output'

In [53]:
# Load the parquet files from the processed_data_output folder into a DataFrame
processed_data = spark.read.parquet("train_output")

# Display the first few rows of the DataFrame
processed_data.show(3)


+---+---+------+---+-------+-------------------+-------------+--------------+--------------+-----------------+-----------------+-------------------+------------------------+
| ip|app|device| os|channel|         click_time|is_attributed|IP_first_click|IP_click_count|IP_download_count|IP_download_ratio|    last_click_time|IP_time_since_last_click|
+---+---+------+---+-------+-------------------+-------------+--------------+--------------+-----------------+-----------------+-------------------+------------------------+
| 10| 64|     1| 22|    459|2017-11-06 16:06:34|            0|             1|             1|                0|              0.0|               null|                      -1|
| 10| 18|     1| 19|    121|2017-11-06 16:22:46|            0|             0|             3|                0|              0.0|2017-11-06 16:06:34|                     972|
| 10| 15|     1| 19|    245|2017-11-06 16:22:46|            0|             0|             3|                0|              0.0|20

## Train with Spark

In [54]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Feature columns
feature_columns = ['app', 'device', 'os', 'channel', "IP_first_click", "IP_time_since_last_click", "IP_click_count", "IP_download_ratio"]

# Prepare features
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
processed_data = assembler.transform(processed_data)

# Train-test split
train_data, test_data = processed_data.randomSplit([0.7, 0.3], seed=42)

# Train the model
rf = RandomForestClassifier(featuresCol="features", labelCol="is_attributed", numTrees=100, seed=42)
model = rf.fit(train_data)

# Model evaluation
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="is_attributed", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)

print(f"AUC: {auc}")

AUC: 0.9876090623850763


### Run the script for training and evaluation

In [56]:
%%writefile train_rf.py
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import argparse

def main(args):
    spark = SparkSession.builder \
        .appName("Training script for TalkingData AdTracking Fraud Detection using PySpark") \
        .getOrCreate()

    # Read processed data
    processed_data = spark.read.parquet(args.processed_data_input)

    # Feature columns
    feature_columns = ['app', 'device', 'os', 'channel', "IP_first_click", "IP_time_since_last_click", "IP_click_count", "IP_download_ratio"]

    # Prepare features
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    processed_data = assembler.transform(processed_data)

    # Train-test split
    train_data, test_data = processed_data.randomSplit([0.7, 0.3], seed=42)

    # Train the model
    rf = RandomForestClassifier(featuresCol="features", labelCol="is_attributed", numTrees=100, seed=42)
    model = rf.fit(train_data)

    # Model evaluation
    predictions = model.transform(test_data)
    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="is_attributed", metricName="areaUnderROC")
    auc = evaluator.evaluate(predictions)

    print(f"AUC: {auc}")

    # Save the model
    model.write().overwrite().save(args.model_output_path)

    spark.stop()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Training script for TalkingData AdTracking Fraud Detection using PySpark')
    parser.add_argument('--processed_data_input', type=str, required=True, help='Input file path for processed data')
    parser.add_argument('--model_output_path', type=str, required=True, help='Output path for the trained model')

    args = parser.parse_args()
    main(args)


Overwriting train_rf.py


In [58]:
%%python3 train_rf.py --processed_data_input 'train_output' --model_output_path 'model_output'

AUC: 0.9875399297370214


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/24 05:12:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/24 05:12:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/04/24 05:14:25 WARN BlockManager: Persisting block rdd_30_0 to disk instead.
23/04/24 05:14:26 WARN MemoryStore: Not enough space to cache rdd_30_1 in memory! (computed 62.9 MiB so far)
23/04/24 05:14:26 WARN BlockManager: Persisting block rdd_30_1 to disk instead.
[Stage 8:>                                                          (0 + 2) / 2]23/04/24 05:15:13 WARN MemoryStore: Not enough space to cache rdd_30_0 in memory! (computed 147.2 MiB so far)
23/04/24 05:15:14 WARN MemoryStore: Not enough space to cache rdd_30_1 in memory! (computed 226.4 MiB so far)
23/04/24 05:15:49 WARN MemoryStore: Not enough space to c