In [27]:
import time
import warnings
import os
import sys
import pandas as pd
import cProfile
from pyspark.sql import SparkSession
# When the kernel was started inside the "notebooks" folder, step up one level
# so the project-relative imports and data paths used below resolve.
if os.path.split(os.getcwd())[-1] == "notebooks":
    os.chdir("..")

# Point both the PySpark workers and the driver at the interpreter that is
# running this kernel.
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
})
from src.relevance.phi import Phi
from src.sampling.mixed_sampling.distributed_smogn import DistributedSMOGN

In [28]:
# Input locations (relative to the project root).
DATA_DIR = "data"
DATA_RAW_DIR = f"{DATA_DIR}/raw"
DATA_PROCESSED_DIR = f"{DATA_DIR}/processed"

# Output locations (relative to the project root).
RESULT_DIR = "results"
# Execution-time results are written directly into the results root.
RESULT_EXECUTION_TIME_DIR = RESULT_DIR
# Fixed: this was a plain string without the f-prefix, so the value was the
# literal text "{RESULT_DIR}/predictive_performance".
RESULT_PREDICTIVE_PERFORMANCE_DIR = f"{RESULT_DIR}/predictive_performance"

In [None]:
# Benchmark datasets, in processing order: dataset name -> name of the column
# used as the regression label. The dataset name is also used to build the raw
# and processed file paths.
DATASETS = dict([
    ("boston", "HousValue"),
    ("Abalone", "Rings"),
    ("bank8FM", "rej"),
    ("heat", "heat"),
    ("cpuSm", "usr"),
    ("energy", "Appliances"),
    ("superconductivity", "critical_temp"),
    ("flights", "ActualElapsedTime"),
    ("power", "Global_active_power"),
    ("sales", "Sale Amount"),
])

def generateDF(dataset: str, raw_dir=None) -> pd.DataFrame:
    """Load one raw dataset into a pandas DataFrame, imputing missing values.

    Three datasets get special handling; every other name is read verbatim
    from ``<raw_dir>/<dataset>.csv``:

    * ``"flights"`` - reads a fixed subset of columns from ``flights.csv``;
      missing ``DepDelayMinutes`` become 0 and missing ``AirTime``,
      ``ActualElapsedTime``, ``TaxiOut`` and ``TaxiIn`` become the column median.
    * ``"power"`` - reads ``power.txt`` (``;``-separated, ``?`` marks a missing
      value) and fills missing values with the column mean.
    * ``"sales"`` - reads a fixed subset of columns from ``sales.csv`` and fills
      missing ``Property Type`` / ``Residential Type`` with the column mode.

    Args:
        dataset: Dataset name; selects the file and the imputation rules.
        raw_dir: Directory containing the raw files. Defaults to the
            module-level ``DATA_RAW_DIR`` when omitted.

    Returns:
        The loaded (and, for the special cases, imputed) DataFrame.
    """
    if raw_dir is None:
        raw_dir = DATA_RAW_DIR

    if dataset == "flights":
        df = pd.read_csv(
            f"{raw_dir}/flights.csv",
            usecols=["ActualElapsedTime", "CRSElapsedTime", "DepDelayMinutes", "AirTime", "Distance",
                     "TaxiOut", "TaxiIn", "DayOfWeek", "Month", "Quarter"],
        )
        # Assign the filled column back instead of `df[col].fillna(..., inplace=True)`:
        # the chained in-place form operates on a temporary and does not update
        # `df` under pandas Copy-on-Write.
        df["DepDelayMinutes"] = df["DepDelayMinutes"].fillna(0)
        for col in ("AirTime", "ActualElapsedTime", "TaxiOut", "TaxiIn"):
            df[col] = df[col].fillna(df[col].median())
    elif dataset == "power":
        df = pd.read_csv(
            f"{raw_dir}/power.txt",
            sep=';',
            usecols=["Global_active_power", "Global_reactive_power", "Voltage", "Global_intensity",
                     "Sub_metering_1", "Sub_metering_2", "Sub_metering_3"],
            na_values='?',
        )
        df = df.fillna(df.mean())
    elif dataset == "sales":
        df = pd.read_csv(
            f"{raw_dir}/sales.csv",
            usecols=["List Year", "Assessed Value", "Sale Amount", "Sales Ratio", "Property Type", "Residential Type"],
            dtype={"Property Type": "category", "Residential Type": "category"},
        )
        for col in ("Property Type", "Residential Type"):
            # mode()[0] is the most frequent category of the column.
            df[col] = df[col].fillna(df[col].mode()[0])
    else:
        df = pd.read_csv(f"{raw_dir}/{dataset}.csv")
    return df

In [30]:
# Spark settings applied to the session builder, in order.
# NOTE(review): 'spark.storage.memoryFraction' looks like a legacy
# memory-management key; confirm it is still honoured by the Spark version in use.
_SPARK_SETTINGS = {
    'spark.executor.memory': '4g',          # Memory per executor
    'spark.driver.memory': '4g',            # Memory for the driver
    'spark.executor.cores': '2',            # Number of cores per executor
    'spark.sql.shuffle.partitions': '200',  # Adjust shuffle partitions
    'spark.storage.memoryFraction': '0.8',  # Allocate more memory for caching
    'spark.network.timeout': '800s',        # Increase network timeout for large data
}

# Local-mode session using all available cores.
_spark_builder = SparkSession.builder.appName('Distributed Resampling').master('local[*]')
for _conf_key, _conf_value in _SPARK_SETTINGS.items():
    _spark_builder = _spark_builder.config(_conf_key, _conf_value)

spark = _spark_builder.getOrCreate()

In [31]:
# Benchmark results, filled per dataset by the benchmark loop below.
execution_times = dict()

In [None]:
# Silence all warnings for the duration of the benchmark.
warnings.filterwarnings('ignore')

K_PARTITIONS = (2, 4, 8)  # DistributedSMOGN partition counts to benchmark
N_RUNS = 5                # Timed repetitions per dataset and partition count
SPLIT_SEED = 42           # Fixed seed so the train/test split is reproducible

# NOTE: the per-iteration `spark.conf.set("spark.local.dir", "/new/temp/dir")`
# was removed. It pointed at a placeholder path and was issued against an
# already-running session; the scratch directory has to be configured before
# the session is created (e.g. via `.config(...)` on the builder).

for dataset, label_col in DATASETS.items():
    print(dataset)
    DATA_PROCESSED_TRAIN_DIR = f"{DATA_PROCESSED_DIR}/{dataset}/train"
    DATA_PROCESSED_TEST_DIR = f"{DATA_PROCESSED_DIR}/{dataset}/test"
    # to_csv does not create missing parent directories.
    os.makedirs(DATA_PROCESSED_TRAIN_DIR, exist_ok=True)
    os.makedirs(DATA_PROCESSED_TEST_DIR, exist_ok=True)

    df = spark.createDataFrame(generateDF(dataset))

    # Phi adds a relevance column; it is dropped from the training data and
    # stored in a separate file for the test data.
    relevance_col = "phi"
    df = Phi(input_col=label_col, output_col=relevance_col).transform(df)

    train, test = df.randomSplit(weights=[0.8, 0.2], seed=SPLIT_SEED)
    train = train.drop(relevance_col)
    test = test.toPandas()
    phi = test.pop(relevance_col)

    test.to_csv(f"{DATA_PROCESSED_TEST_DIR}/{dataset}.csv", index=False)
    phi.to_csv(f"{DATA_PROCESSED_TEST_DIR}/{dataset}_phi.csv", index=False)

    # Un-resampled training data, kept as the baseline.
    train_base = train.toPandas()
    train_base.to_csv(f"{DATA_PROCESSED_TRAIN_DIR}/{dataset}.csv", index=False)

    run_times = {f"Distributed SMOGN (k_partitions = {k})": [] for k in K_PARTITIONS}

    for run_idx in range(N_RUNS):
        print(f"RUN: {run_idx + 1}")
        for k in K_PARTITIONS:
            # NOTE(review): only `transform` is timed. If DistributedSMOGN.transform
            # is lazy, the actual work happens in `toPandas()` below and is not
            # measured - confirm against the implementation.
            start_time = time.perf_counter()
            train_dist_smogn = DistributedSMOGN(label_col=label_col, k_partitions=k).transform(train)
            elapsed = time.perf_counter() - start_time
            run_times[f"Distributed SMOGN (k_partitions = {k})"].append(elapsed)
            # Each run overwrites the previous run's file for the same k.
            train_dist_smogn.toPandas().to_csv(
                f"{DATA_PROCESSED_TRAIN_DIR}/{dataset}_dist_smogn_{k}.csv", index=False
            )

    # Store the average execution time (seconds, 3 decimals) per configuration.
    execution_times[dataset] = {
        config: round(sum(times) / len(times), 3)
        for config, times in run_times.items()
    }

boston
RUN: 1
RUN: 2


In [None]:
# # Disabled scratch cell: profiles the resampling run on the first 10,000 rows of the sales dataset with cProfile.
# def profile_cell():
#     warnings.filterwarnings('ignore', category=FutureWarning)
#     dataset = "sales"
#     label_col = "Sale Amount"

#     DATA_PROCESSED_TRAIN_DIR = f"{DATA_PROCESSED_DIR}/{dataset}/train"
#     DATA_PROCESSED_TEST_DIR = f"{DATA_PROCESSED_DIR}/{dataset}/test"
#     spark.conf.set("spark.local.dir", "/new/temp/dir")
#     df = pd.read_csv(f"{DATA_RAW_DIR}/{dataset}.csv", usecols=["List Year", "Assessed Value", "Sale Amount", "Sales Ratio", "Property Type", "Residential Type"], dtype={"Property Type": "category", "Residential Type": "category"}, nrows=10000)
#     df.loc[:, 'Property Type'] = df['Property Type'].fillna(df['Property Type'].mode()[0])
#     df.loc[:, 'Residential Type'] = df['Residential Type'].fillna(df['Residential Type'].mode()[0])

#     df = spark.createDataFrame(df)

#     relevance_col = "phi"
#     df = Phi(input_col=label_col, output_col=relevance_col).transform(df)

#     train, test = df.randomSplit(weights=[0.8, 0.2])
#     train = train.drop(relevance_col)
#     test = test.toPandas()
#     phi = test.pop(relevance_col)

#     test.to_csv(f"{DATA_PROCESSED_TEST_DIR}/{dataset}.csv", index=False)
#     phi.to_csv(f"{DATA_PROCESSED_TEST_DIR}/{dataset}_phi.csv", index=False)

#     execution_times[dataset] = {"Distributed SMOGN (k_partitions = 2)": [],
#                                 "Distributed SMOGN (k_partitions = 4)": [],
#                                 "Distributed SMOGN (k_partitions = 8)": []}

#     train_base = train.toPandas()
#     train_base.to_csv(f"{DATA_PROCESSED_TRAIN_DIR}/{dataset}.csv", index=False)

#     for _ in range(5):  # Perform 5 runs for each dataset
#         # k_partitions = 2
#         start_time = time.time()
#         train_dist_smogn_2 = DistributedSMOGN(label_col=label_col, k_partitions=2).transform(train)
#         end_time = time.time()
#         execution_times[dataset]["Distributed SMOGN (k_partitions = 2)"].append(end_time - start_time)
#         train_dist_smogn_2.toPandas().to_csv(f"{DATA_PROCESSED_TRAIN_DIR}/{dataset}_dist_smogn_2.csv", index=False)

#         # k_partitions = 4
#         start_time = time.time()
#         train_dist_smogn_4 = DistributedSMOGN(label_col=label_col, k_partitions=4).transform(train)
#         end_time = time.time()
#         execution_times[dataset]["Distributed SMOGN (k_partitions = 4)"].append(end_time - start_time)
#         train_dist_smogn_4.toPandas().to_csv(f"{DATA_PROCESSED_TRAIN_DIR}/{dataset}_dist_smogn_4.csv", index=False)

#         # k_partitions = 8
#         start_time = time.time()
#         train_dist_smogn_8 = DistributedSMOGN(label_col=label_col, k_partitions=8).transform(train)
#         end_time = time.time()
#         execution_times[dataset]["Distributed SMOGN (k_partitions = 8)"].append(end_time - start_time)
#         train_dist_smogn_8.toPandas().to_csv(f"{DATA_PROCESSED_TRAIN_DIR}/{dataset}_dist_smogn_8.csv", index=False)

#     # Calculate average execution times for each configuration
#     execution_times[dataset] = {
#         config: round(sum(times) / len(times), 3)
#         for config, times in execution_times[dataset].items()
#     }

#     pass

# cProfile.run('profile_cell()', 'profile_output.prof')

In [None]:
# Write the averaged execution times: one column per dataset, one row per
# configuration. The directory is created first because to_csv does not create
# missing parent directories.
os.makedirs(RESULT_EXECUTION_TIME_DIR, exist_ok=True)
pd.DataFrame(data=execution_times).to_csv(f"{RESULT_EXECUTION_TIME_DIR}/execution_time.csv", index=True)