<h1>Feature Creation and Data Preprocessing Pipeline</h1>

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Set to True to print the bids schema and per-column null counts while preprocessing.
DEBUG = False

# Single-machine Spark session that uses every local core.
spark = (
    SparkSession.builder
    .appName("FeatureCreation")
    .master("local[*]")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)
sc = spark.sparkContext

In [0]:
# Import files
# Both CSVs have a header row and Spark infers the column types. The raw
# DataFrames are cached because later cells reuse them.
_csv_options = {"header": True, "inferSchema": True}
df_bids = spark.read.csv("bids.csv", **_csv_options).cache()
df_train = spark.read.csv("train.csv", **_csv_options).cache()

# check types
if DEBUG:
    df_bids.printSchema()

In [0]:
def ip_flag(df_bids: DataFrame) -> DataFrame:
    """
    Flag bidders that share at least one IP address with another bidder.

    Parameters:
        df_bids (DataFrame): Bids containing at least the bidder_id and ip columns.

    Returns:
        DataFrame: One row per bidder_id with a boolean shared_ip column. It is True
        when any IP used by the bidder was also used by a different bidder. Spark
        equi-joins never match null keys, so rows with a null ip contribute nothing;
        a bidder seen only with null IPs gets shared_ip = null.
    """
    from pyspark.sql import functions as F

    # Distinct (bidder, ip) pairs, so repeated bids from the same pair do not
    # inflate the per-IP bidder count below.
    bidder_ips = df_bids.select("bidder_id", "ip").distinct()

    # The pairs are distinct, so the row count per IP equals the number of
    # distinct bidders that used it. No need to build the id sets.
    bidders_per_ip = bidder_ips.groupBy("ip").agg(
        F.count(F.lit(1)).alias("n_bidders")
    )

    # A left join keeps every pair. A bidder shares an IP if any of its IPs is
    # used by more than one bidder.
    return (
        bidder_ips.join(bidders_per_ip, "ip", "left_outer")
        .groupBy("bidder_id")
        .agg((F.max("n_bidders") > 1).alias("shared_ip"))
    )

In [0]:
def convert_time(df_bids: DataFrame) -> DataFrame:
    """
    Parse the 'time' column as a timestamp and derive a calendar 'day' column.

    Parameters:
        df_bids (DataFrame): Bids containing a 'time' column.

    Returns:
        DataFrame: The same rows, with 'time' replaced by its timestamp value and a
        new 'day' column holding the date part of that timestamp.
    """
    # NOTE(review): to_timestamp() without a format is a plain cast. If 'time' was
    # inferred as an integer, Spark reads it as seconds since the epoch. Confirm
    # that this matches the units stored in bids.csv.
    with_timestamp = df_bids.withColumn("time", to_timestamp(col("time")))
    return with_timestamp.withColumn("day", to_date(col("time")))

In [0]:
def replace_nan_countries(df_bids: DataFrame) -> DataFrame:
    """
    Replace missing values in the 'country' column with the placeholder 'unk_ctry'.

    When DEBUG is set, the per-column null counts are printed before and after
    the replacement.

    Parameters:
        df_bids (DataFrame): The input DataFrame containing the 'country' column.

    Returns:
        DataFrame: The DataFrame with null 'country' values replaced by 'unk_ctry'.
    """
    from pyspark.sql import functions as F

    def _show_null_counts(df: DataFrame) -> None:
        # when() yields a non-null marker (1) only for null cells, so count()
        # counts exactly the nulls. Returning the column itself would yield
        # null for null cells, and count() would always report 0.
        df.select(
            [F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df.columns]
        ).show()

    if DEBUG:
        _show_null_counts(df_bids)

    filled = df_bids.fillna(value="unk_ctry", subset=["country"])

    if DEBUG:
        _show_null_counts(filled)

    return filled

In [0]:
def bids_per_auction(df_bids: DataFrame) -> DataFrame:
    """
    Count how many bids each bidder placed in each auction.

    Parameters:
        df_bids (DataFrame): Bids with 'bidder_id' and 'auction' columns.

    Returns:
        DataFrame: Columns bidder_id, auction and bids_count, one row per pair.
    """
    per_pair = df_bids.groupBy("bidder_id", "auction")
    return per_pair.agg(count(lit(1)).alias("bids_count"))

In [0]:
def mean_bids_per_auction(df_bids: DataFrame) -> DataFrame:
    """
    Average, over the auctions a bidder joined, of that bidder's bids per auction.

    Parameters:
        df_bids (DataFrame): Bids with 'bidder_id' and 'auction' columns.

    Returns:
        DataFrame: Columns bidder_id and mean_bids_per_auction.
    """
    per_auction = bids_per_auction(df_bids)
    return per_auction.groupBy("bidder_id").agg(
        avg("bids_count").alias("mean_bids_per_auction")
    )

In [0]:
def unique_devices_per_bidder(df_bids: DataFrame) -> DataFrame:
    """
    Number of distinct non-null devices each bidder used.

    Parameters:
        df_bids (DataFrame): Bids with 'bidder_id' and 'device' columns.

    Returns:
        DataFrame: Columns bidder_id and unique_devices.
    """
    n_devices = countDistinct(col("device")).alias("unique_devices")
    return df_bids.groupBy("bidder_id").agg(n_devices)

In [0]:
def unique_ips_per_bidder(df_bids: DataFrame) -> DataFrame:
    """
    Number of distinct non-null IP addresses each bidder used.

    Parameters:
        df_bids (DataFrame): Bids with 'bidder_id' and 'ip' columns.

    Returns:
        DataFrame: Columns bidder_id and unique_ips.
    """
    n_ips = countDistinct(col("ip")).alias("unique_ips")
    return df_bids.groupBy("bidder_id").agg(n_ips)

In [0]:
def unique_countries_per_bidder(df_bids: DataFrame) -> DataFrame:
    """
    Number of distinct non-null countries each bidder bid from.

    Parameters:
        df_bids (DataFrame): Bids with 'bidder_id' and 'country' columns.

    Returns:
        DataFrame: Columns bidder_id and unique_countries.
    """
    n_countries = countDistinct(col("country")).alias("unique_countries")
    return df_bids.groupBy("bidder_id").agg(n_countries)

In [0]:
def auction_frequency(df_bids: DataFrame) -> DataFrame:
    """
    Number of distinct non-null auctions each bidder took part in.

    Parameters:
        df_bids (DataFrame): Bids with 'bidder_id' and 'auction' columns.

    Returns:
        DataFrame: Columns bidder_id and auction_frequency.
    """
    n_auctions = countDistinct(col("auction")).alias("auction_frequency")
    return df_bids.groupBy("bidder_id").agg(n_auctions)

In [0]:
def unique_url_per_bidder(df_bids: DataFrame) -> DataFrame:
    """
    Number of distinct non-null URLs each bidder's bids came from.

    Parameters:
        df_bids (DataFrame): Bids with 'bidder_id' and 'url' columns.

    Returns:
        DataFrame: Columns bidder_id and unique_url.
    """
    n_urls = countDistinct(col("url")).alias("unique_url")
    return df_bids.groupBy("bidder_id").agg(n_urls)

In [0]:
def unique_merchandise_per_bidder(df_bids: DataFrame) -> DataFrame:
    """
    Number of distinct non-null merchandise categories each bidder bid on.

    Parameters:
        df_bids (DataFrame): Bids with 'bidder_id' and 'merchandise' columns.

    Returns:
        DataFrame: Columns bidder_id and unique_merchandise.
    """
    n_merchandise = countDistinct(col("merchandise")).alias("unique_merchandise")
    return df_bids.groupBy("bidder_id").agg(n_merchandise)

In [0]:
def main_merchandise(df_bids: DataFrame) -> DataFrame:
    """
    Find each bidder's most frequent merchandise category.

    Bids are counted per (bidder_id, merchandise), and the category with the
    highest count wins. When several categories tie for the top count, the
    bidder is labelled "multiple".

    Parameters:
        df_bids (DataFrame): Bids with 'bidder_id' and 'merchandise' columns.

    Returns:
        DataFrame: Columns bidder_id and top_merchandise, one row per bidder.
    """
    from pyspark.sql import functions as F

    counts = df_bids.groupBy("bidder_id", "merchandise").count()
    top_count = counts.groupBy("bidder_id").agg(F.max("count").alias("count"))

    # Every category whose count equals the bidder's maximum, ties included.
    top = counts.join(top_count, ["bidder_id", "count"]).select(
        "bidder_id", "merchandise"
    )

    # Resolve ties in one lazy aggregation. countDistinct tells whether the
    # bidder has several top categories; when there is exactly one, max()
    # returns it. No eager count() jobs or cached intermediates are needed.
    return (
        top.groupBy("bidder_id")
        .agg(
            F.countDistinct("merchandise").alias("n_top"),
            F.max("merchandise").alias("best"),
        )
        .select(
            "bidder_id",
            F.when(F.col("n_top") > 1, "multiple")
            .otherwise(F.col("best"))
            .alias("top_merchandise"),
        )
    )

In [0]:
def main_country(df_bids: DataFrame) -> DataFrame:
    """
    Find the country each bidder placed the most bids from.

    Bids are counted per (bidder_id, country), and the country with the highest
    count wins. When several countries tie for the top count, the bidder is
    labelled "multiple".

    Parameters:
        df_bids (DataFrame): Bids with 'bidder_id' and 'country' columns.

    Returns:
        DataFrame: Columns bidder_id and top_country, one row per bidder.
    """
    from pyspark.sql import functions as F

    counts = df_bids.groupBy("bidder_id", "country").count()
    top_count = counts.groupBy("bidder_id").agg(F.max("count").alias("count"))

    # Every country whose count equals the bidder's maximum, ties included.
    top = counts.join(top_count, ["bidder_id", "count"]).select(
        "bidder_id", "country"
    )

    # Resolve ties in one lazy aggregation. countDistinct tells whether the
    # bidder has several top countries; when there is exactly one, max()
    # returns it. No eager count() jobs or cached intermediates are needed.
    return (
        top.groupBy("bidder_id")
        .agg(
            F.countDistinct("country").alias("n_top"),
            F.max("country").alias("best"),
        )
        .select(
            "bidder_id",
            F.when(F.col("n_top") > 1, "multiple")
            .otherwise(F.col("best"))
            .alias("top_country"),
        )
    )

In [0]:
def median_time_diff_per_bidder(df_bids: DataFrame) -> DataFrame:
    """
    Compute the approximate median gap between consecutive bids of each bidder.

    Within each bidder, bids are ordered by 'time'. The gap to the previous bid is
    taken in whole seconds, with both timestamps cast to long. A bidder with a
    single bid has no gap and therefore a null median. Those nulls are filled
    with the (approximate) median of the other bidders' values by a Spark ML
    Imputer.

    Parameters:
        df_bids (DataFrame): Bids with 'bidder_id' and a timestamp 'time' column.

    Returns:
        DataFrame: Columns bidder_id and median_time_diff, one row per bidder.
    """
    from pyspark.ml.feature import Imputer
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    by_bidder_in_time = Window.partitionBy("bidder_id").orderBy("time")

    # Gap to the previous bid; null for each bidder's first bid.
    gaps = df_bids.withColumn(
        "time_diff",
        F.col("time").cast("long")
        - F.lag("time", 1).over(by_bidder_in_time).cast("long"),
    )

    medians = gaps.groupBy("bidder_id").agg(
        F.expr("percentile_approx(time_diff, 0.5)").alias("median_time_diff")
    )

    # The input and output column names are the same, so the Imputer replaces
    # the nulls in place.
    imputer = Imputer(
        inputCols=["median_time_diff"],
        outputCols=["median_time_diff"],
    ).setStrategy("median")
    return imputer.fit(medians).transform(medians)

In [0]:
def bidder_entropy(df_bids: DataFrame) -> DataFrame:
    """
    Shannon entropy (base 2) of how each bidder's bids are spread over days.

    For a bidder with n_d bids on day d and N bids in total, this is
    -sum_d (n_d / N) * log2(n_d / N). A bidder active on a single day scores 0,
    and spreading the bids evenly over more days raises the score.

    Parameters:
        df_bids (DataFrame): Bids with 'bidder_id' and a 'day' column.

    Returns:
        DataFrame: Columns bidder_id and day_entropy, one row per bidder.
    """
    from pyspark.sql.functions import col, log2
    from pyspark.sql.functions import sum as spark_sum

    daily = df_bids.groupBy("bidder_id", "day").count()
    totals = daily.groupBy("bidder_id").agg(spark_sum("count").alias("total_bids"))

    # Share of the bidder's bids that fall on this day.
    share = col("count") / col("total_bids")
    terms = daily.join(totals, "bidder_id").select(
        "bidder_id", (-share * log2(share)).alias("entropy_component")
    )
    return terms.groupBy("bidder_id").agg(
        spark_sum("entropy_component").alias("day_entropy")
    )

In [0]:
def count_nulls(df_to_count: DataFrame):
    """
    Print a one-row table with the number of nulls in every column.

    Parameters:
        df_to_count (DataFrame): The DataFrame to inspect.

    Returns:
        None. The counts are displayed with DataFrame.show().
    """
    from pyspark.sql.functions import col
    from pyspark.sql.functions import sum as spark_sum

    # isNull() cast to int is 1 for a null cell and 0 otherwise, so the sum
    # over a column is that column's null count.
    null_tallies = (
        spark_sum(col(name).isNull().cast("int")).alias(name)
        for name in df_to_count.columns
    )
    df_to_count.agg(*null_tallies).show()

In [0]:
def remove_strings(df_features: DataFrame):
    """
    Label-encode the top_country and top_merchandise columns.

    Each of the two columns is replaced by a numeric "<name>_index" column. The
    values come from a StringIndexer fitted on the DataFrame itself, and the
    original string column is dropped. All other columns (including bidder_id)
    are left untouched.

    Parameters:
        df_features (DataFrame): Features containing top_country and top_merchandise.

    Returns:
        DataFrame: The features with top_country_index and top_merchandise_index
        in place of the two string columns.
    """
    from pyspark.ml.feature import StringIndexer

    encodings = (
        ("top_country", "top_country_index"),
        ("top_merchandise", "top_merchandise_index"),
    )
    encoded = df_features
    for source_col, index_col in encodings:
        model = StringIndexer(inputCol=source_col, outputCol=index_col).fit(encoded)
        encoded = model.transform(encoded).drop(source_col)
    return encoded

In [0]:
def move_outcome(df_features: DataFrame):
    """
    Reorder the columns so that 'outcome' comes last.

    Parameters:
        df_features (DataFrame): Features that include an 'outcome' column.

    Returns:
        DataFrame: The same data with every other column in its original order,
        followed by 'outcome'.
    """
    leading = [name for name in df_features.columns if name != "outcome"]
    return df_features.select(*leading, "outcome")

In [0]:
def create_features(
    df_bidders: DataFrame, df_bids: DataFrame, keep_strings=False
) -> DataFrame:
    """
    Build the per-bidder feature table.

    Parameters:
        df_bidders (DataFrame): Bidders to featurise; needs 'bidder_id' and 'outcome'.
        df_bids (DataFrame): Raw bids for (at least) those bidders.
        keep_strings (bool, optional): If False (default), the top_country and
            top_merchandise columns are label-encoded via remove_strings.

    Returns:
        DataFrame: One row per bidder with all features and 'outcome' as the last
        column. Every feature is attached with an inner join, so bidders with no
        bids do not appear.
    """
    # Preprocess the raw bids once: fill missing countries, then parse the times
    # and add 'day'.
    df_bids = convert_time(replace_nan_countries(df_bids)).cache()

    # Keep only the bids of the bidders being featurised, so the per-bidder
    # aggregations below do not process anyone else.
    df_bids_main = (
        df_bidders.join(df_bids, "bidder_id", "inner")
        .select(df_bids["*"], df_bidders["outcome"])
        .cache()
    )
    df_bids_base = df_bidders.select("bidder_id", "outcome")

    # These features depend only on each bidder's own bids.
    per_bidder_features = [
        mean_bids_per_auction,
        unique_devices_per_bidder,
        unique_ips_per_bidder,
        unique_countries_per_bidder,
        auction_frequency,
        unique_url_per_bidder,
        unique_merchandise_per_bidder,
        main_merchandise,
        main_country,
        median_time_diff_per_bidder,
        bidder_entropy,
    ]
    features_df = df_bids_base
    for feature in per_bidder_features:
        features_df = features_df.join(feature(df_bids_main), "bidder_id")

    # ip_flag compares bidders with each other, so it must see every bid in the
    # dataset, not only those of df_bidders.
    features_df = features_df.join(ip_flag(df_bids), "bidder_id")

    if not keep_strings:
        features_df = remove_strings(features_df)
    return move_outcome(features_df)

In [0]:
# Build the training feature table (string features label-encoded, since keep_strings defaults to False).
ft = create_features(df_train, df_bids)

In [0]:
ft.show()

In [0]:
# Sanity check: number of nulls in every feature column.
count_nulls(ft)

In [0]:
# Row count of the feature table (one row per bidder, assuming bidder_id is unique in train.csv).
print(ft.select("bidder_id").count())

In [0]:
# Save features csv
# NOTE(review): toPandas() collects the whole table to the driver, and to_csv also writes the
# pandas index as an unnamed first column. Confirm downstream readers expect it (else index=False).
ft.toPandas().to_csv("features.csv")

### TIME ANALYSIS

In [0]:
from time import time
from pyspark.sql import DataFrame


def time_analysis(df: DataFrame, func, fractions=None) -> dict:
    """
    Time `func` on growing prefixes (via DataFrame.limit) of a DataFrame.

    Each prefix is cached and materialised before the clock starts. The
    measurement therefore covers only `func` itself, not loading the subset.
    The cached prefix is released afterwards.

    Parameters:
    df : DataFrame
        The DataFrame to take prefixes of.
    func : function
        Called with each prefix as its only argument. It should run a Spark action,
        otherwise only lazy plan construction is timed.
    fractions : iterable of float, optional
        Fractions of df's rows to time. Defaults to 0.1, 0.2, ..., 1.0.

    Returns:
    dict
        Mapping of fraction -> seconds taken by func on that prefix.
    """
    from time import perf_counter

    if fractions is None:
        fractions = [percentage / 100 for percentage in range(10, 110, 10)]

    results = {}
    df_length = df.count()

    for fraction in fractions:
        num_rows = int(df_length * fraction)
        partial_df = df.limit(num_rows).cache()
        # cache() is lazy: run an action so the subset is materialised now,
        # rather than inside the timed call.
        partial_df.count()
        try:
            # perf_counter is monotonic and meant for interval timing, unlike time().
            start_time = perf_counter()
            func(partial_df)
            elapsed = perf_counter() - start_time
        finally:
            # Free the subset so cached copies do not accumulate across fractions.
            partial_df.unpersist()

        results[float(fraction)] = elapsed

    return results

In [0]:
import matplotlib.pyplot as plt

plt.rcParams.update({"font.size": 16})


def plot_time_analysis(times_partitioning: dict, plot_title: str):
    """
    Plot runtime against DataFrame fraction, save it as a PDF and show it.

    Parameters:
    times_partitioning : dict
        Mapping of DataFrame fraction -> seconds taken (as returned by time_analysis).
    plot_title : str
        Used only to derive the output file name. Alphanumeric characters and
        underscores are kept and ".pdf" is appended; if nothing is left, the name
        "time_analysis.pdf" is used. The title is not drawn on the figure.
    """
    # Draw on a dedicated figure, so a call never lands on a figure left open
    # earlier (e.g. under a non-interactive backend, where plt.show() keeps
    # figures around).
    fig, ax = plt.subplots()
    ax.plot(list(times_partitioning.keys()), list(times_partitioning.values()))
    ax.set_xlabel("Fraction of DataFrame")
    ax.set_ylabel("Time taken (s)")

    # Keep only filename-safe characters (drops spaces and punctuation).
    plot_filename = "".join(e for e in plot_title if e.isalnum() or e == "_")
    if not plot_filename:
        # Avoid writing a hidden ".pdf" file when the title has no usable characters.
        plot_filename = "time_analysis"

    fig.tight_layout()
    fig.savefig(plot_filename + ".pdf")
    plt.show()

In [0]:
def wrapper(df_bids):
    """Build the df_train features from df_bids and force full evaluation."""
    # collect() is the action that makes Spark run the whole plan; a write to
    # a file would work as well.
    create_features(df_train, df_bids).collect()

In [0]:
# Time the full feature pipeline on 10%, 20%, ..., 100% of the raw bids.
times = time_analysis(df_bids, wrapper)

In [0]:
# Saves the plot as FeatureCreationTimeAnalysis.pdf (title reduced to filename-safe characters).
plot_time_analysis(times, "Feature Creation Time Analysis")