In [1]:
# Install PySpark

!pip install pyspark



In [2]:
# Install Smogn

# Ask professor about Python Implementation

! pip install smogn



In [3]:
# Set Python Path
import os
import sys

# Root of the distributed-resampling checkout. Set the
# DISTRIBUTED_RESAMPLING_ROOT environment variable to point at a checkout on
# another machine; the default is the original hard-coded location, so
# existing setups keep working unchanged.
PROJECT_ROOT = os.environ.get(
    "DISTRIBUTED_RESAMPLING_ROOT",
    "/Users/anushreekulai/Documents/distributed-resampling/",
)

# Make both the repository root and its src/ folder importable. The membership
# check keeps re-runs of this cell from appending duplicate entries.
for _path in (PROJECT_ROOT, os.path.join(PROJECT_ROOT, "src")):
    if _path not in sys.path:
        sys.path.append(_path)

In [4]:
# Note: Java was installed separately, since PySpark requires a Java runtime.

In [5]:
# Install PyArrow

! pip install pyarrow



In [None]:
import time

import pandas as pd
from pyspark.sql import SparkSession
from smogn import smoter

from src.relevance.phi import Phi
from src.sampling.mixed_sampling.distributed_smogn import DistributedSMOGN
from src.sampling.over_sampling.distributed_ros import DistributedROS
from src.sampling.under_sampling.distributed_rus import DistributedRUS

# Filesystem layout; all paths are relative to the notebook's working directory.
DATA_DIR = "../data"
DATA_RAW_DIR = f"{DATA_DIR}/raw"              # raw input CSVs
DATA_PROCESSED_DIR = f"{DATA_DIR}/processed"  # per-dataset train/test output

RESULT_DIR = "../results"
# Execution-time results are written directly into RESULT_DIR.
RESULT_EXECUTION_TIME_DIR = RESULT_DIR
# Fixed: the literal was missing its `f` prefix, so the constant contained the
# text "{RESULT_DIR}/predictive_performance" instead of the interpolated path.
RESULT_PREDICTIVE_PERFORMANCE_DIR = f"{RESULT_DIR}/predictive_performance"

# Dataset name -> name of its label column. Insertion order is the order in
# which the datasets are iterated.
DATASETS = dict(
    boston="HousValue",
    Abalone="Rings",
    bank8FM="rej",
    heat="heat",
    cpuSm="usr",
    energy="Appliances",
    superconductivity="critical_temp",
)

# Catalogue of resampling strategies keyed by a short identifier. Every entry
# carries a display name, a type tag ("dist" / "seq" -- presumably distributed
# vs. sequential, TODO confirm) and the sampler object; the Distributed SMOGN
# entry additionally lists partition counts.
# NOTE(review): no code visible in this notebook reads this table -- confirm
# whether it is still needed.
EXPERIMENTS = dict(
    ros=dict(name="ROS", type="dist", sampler=DistributedROS),
    rus=dict(name="RUS", type="dist", sampler=DistributedRUS),
    smogn=dict(name="SMOGN", type="seq", sampler=smoter),
    dist_smogn=dict(
        name="Distributed SMOGN",
        type="dist",
        sampler=DistributedSMOGN,
        k_partitions=[2, 4, 8],
    ),
)

# Get (or create) the Spark session used by this notebook: master 'local[4]',
# application name 'Distributed Resampling'.
spark = (
    SparkSession.builder
    .master('local[4]')
    .appName('Distributed Resampling')
    .getOrCreate()
)

# Accumulator for measured run times; starts empty.
execution_times = dict()

# adding debugging print statement

# adding exception handling for all the sampling techniques

import os  # local import: only needed here, to create the output folders

# Seed for the train/test split so that repeated runs on the same input and
# Spark configuration use the same seed instead of a fresh random one.
SPLIT_SEED = 42


def _run_sampler(announcement, sampler_call, timings, timing_key, output_csv,
                 failure_label, spark_result=True):
    """Time one resampling strategy and save its output as CSV.

    Only ``sampler_call()`` is inside the timed window; converting the result
    and writing the file happen afterwards.

    Args:
        announcement: progress message printed before the run starts.
        sampler_call: zero-argument callable that performs the resampling and
            returns the resampled data.
        timings: dict that receives the elapsed seconds (rounded to 3 decimals)
            under ``timing_key``.
        timing_key: label of this strategy inside ``timings``.
        output_csv: path of the CSV file to write.
        failure_label: prefix of the message printed when the run fails.
        spark_result: True when the result must be converted with
            ``toPandas()`` before writing; False when it can be written with
            ``to_csv`` directly.

    Any ``Exception`` is printed and deliberately swallowed so one failing
    sampler does not abort the remaining runs. ``KeyboardInterrupt`` is not an
    ``Exception`` subclass and therefore still stops the notebook.
    """
    try:
        print(announcement)
        start_time = time.time()
        resampled = sampler_call()
        end_time = time.time()
        timings[timing_key] = round(end_time - start_time, 3)
        if spark_result:
            resampled = resampled.toPandas()
        resampled.to_csv(output_csv, index=False)
    except Exception as e:
        print(f"{failure_label}: {e}")


for dataset, label_col in DATASETS.items():

    print(dataset)
    DATA_PROCESSED_TRAIN_DIR = f"{DATA_PROCESSED_DIR}/{dataset}/train"
    DATA_PROCESSED_TEST_DIR = f"{DATA_PROCESSED_DIR}/{dataset}/test"
    # to_csv does not create missing folders, so make sure they exist first.
    os.makedirs(DATA_PROCESSED_TRAIN_DIR, exist_ok=True)
    os.makedirs(DATA_PROCESSED_TEST_DIR, exist_ok=True)

    print("Reading DF")
    df = pd.read_csv(f"{DATA_RAW_DIR}/{dataset}.csv")

    print("Reading DF in spark!")
    df = spark.createDataFrame(df)

    print("Calculating Phi value")
    relevance_col = "phi"
    df = Phi(input_col=label_col, output_col=relevance_col).transform(df)

    print("Splitting and Pre-processing")
    train, test = df.randomSplit(weights=[0.8, 0.2], seed=SPLIT_SEED)
    # The relevance column is kept only for the test split, where it is saved
    # to its own file below.
    train = train.drop(relevance_col)
    test = test.toPandas()
    phi = test.pop(relevance_col)

    print("Saving the CSV files")
    test.to_csv(f"{DATA_PROCESSED_TEST_DIR}/{dataset}.csv", index=False)
    phi.to_csv(f"{DATA_PROCESSED_TEST_DIR}/{dataset}_phi.csv", index=False)

    execution_times[dataset] = {}

    # Unsampled training split.
    train_base = train.toPandas()
    train_base.to_csv(f"{DATA_PROCESSED_TRAIN_DIR}/{dataset}.csv", index=False)

    # One row per sampler run:
    # (announcement, timing key, file suffix, failure label, sampler call, result is a Spark DataFrame?)
    # The lambdas are invoked within this same iteration, so they see the
    # current `train` and `label_col`.
    sampler_runs = [
        ("Initializing Distributed RUS", "RUS", "rus",
         "Exception found in Distributed RUS",
         lambda: DistributedRUS(label_col=label_col, k_partitions=1).transform(train),
         True),
        ("Initializing Distributed ROS", "ROS", "ros",
         "Exception found in Distributed ROS",
         lambda: DistributedROS(label_col=label_col, k_partitions=1).transform(train),
         True),
        # NOTE(review): the Spark -> pandas conversion of `train` is inside the
        # timed call for SMOGN, exactly as in the original code.
        ("Initializing SMOGN", "SMOGN", "smogn",
         "Exception found in SMOGN",
         lambda: smoter(data=train.toPandas(), y=label_col),
         False),
        ("Initializing Distributed SMOGN with 2 partitions!",
         "Distributed SMOGN (k_partitions = 2)", "dist_smogn_2",
         "Found exception in DistSMOGN-2P",
         lambda: DistributedSMOGN(label_col=label_col, k_partitions=2).transform(train),
         True),
        ("Initializing Distributed SMOGN with 4 partitions!",
         "Distributed SMOGN (k_partitions = 4)", "dist_smogn_4",
         "Found exception in DistSMOGN-4P",
         lambda: DistributedSMOGN(label_col=label_col, k_partitions=4).transform(train),
         True),
        ("Initializing Distributed SMOGN with 8 partitions!",
         "Distributed SMOGN (k_partitions = 8)", "dist_smogn_8",
         "Found exception in Dist-SMOGN-8P",
         lambda: DistributedSMOGN(label_col=label_col, k_partitions=8).transform(train),
         True),
    ]

    for announcement, timing_key, suffix, failure_label, sampler_call, spark_result in sampler_runs:
        _run_sampler(
            announcement,
            sampler_call,
            execution_times[dataset],
            timing_key,
            f"{DATA_PROCESSED_TRAIN_DIR}/{dataset}_{suffix}.csv",
            failure_label,
            spark_result=spark_result,
        )


# Persist all timings: one column per dataset, one row per sampler run.
os.makedirs(RESULT_EXECUTION_TIME_DIR, exist_ok=True)
pd.DataFrame(data=execution_times).to_csv(f"{RESULT_EXECUTION_TIME_DIR}/execution_time.csv", index=True)

In [6]:
import time

import pandas as pd
from pyspark.sql import SparkSession
from smogn import smoter

from src.relevance.phi import Phi
from src.sampling.mixed_sampling.distributed_smogn import DistributedSMOGN
from src.sampling.over_sampling.distributed_ros import DistributedROS
from src.sampling.under_sampling.distributed_rus import DistributedRUS

In [7]:
# Filesystem layout; all paths are relative to the notebook's working directory.
DATA_DIR = "../data"
DATA_RAW_DIR = f"{DATA_DIR}/raw"              # raw input CSVs
DATA_PROCESSED_DIR = f"{DATA_DIR}/processed"  # per-dataset train/test output

RESULT_DIR = "../results"
# Execution-time results are written directly into RESULT_DIR.
RESULT_EXECUTION_TIME_DIR = RESULT_DIR
# Fixed: the literal was missing its `f` prefix, so the constant contained the
# text "{RESULT_DIR}/predictive_performance" instead of the interpolated path.
RESULT_PREDICTIVE_PERFORMANCE_DIR = f"{RESULT_DIR}/predictive_performance"

In [8]:
# Dataset name -> name of its label column. Insertion order is the order in
# which the datasets are iterated.
DATASETS = dict(
    boston="HousValue",
    Abalone="Rings",
    bank8FM="rej",
    heat="heat",
    cpuSm="usr",
    energy="Appliances",
    superconductivity="critical_temp",
)

# Catalogue of resampling strategies keyed by a short identifier. Every entry
# carries a display name, a type tag ("dist" / "seq" -- presumably distributed
# vs. sequential, TODO confirm) and the sampler object; the Distributed SMOGN
# entry additionally lists partition counts.
# NOTE(review): no code visible in this notebook reads this table -- confirm
# whether it is still needed.
EXPERIMENTS = dict(
    ros=dict(name="ROS", type="dist", sampler=DistributedROS),
    rus=dict(name="RUS", type="dist", sampler=DistributedRUS),
    smogn=dict(name="SMOGN", type="seq", sampler=smoter),
    dist_smogn=dict(
        name="Distributed SMOGN",
        type="dist",
        sampler=DistributedSMOGN,
        k_partitions=[2, 4, 8],
    ),
)

In [9]:
# Get (or create) the Spark session used by this notebook: master 'local[4]',
# application name 'Distributed Resampling'.
spark = (
    SparkSession.builder
    .master('local[4]')
    .appName('Distributed Resampling')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/05 00:36:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
# Accumulator for measured run times; starts empty.
execution_times = dict()

In [11]:
# adding debugging print statement

# adding exception handling for all the sampling techniques

import os  # local import: only needed here, to create the output folders

# Seed for the train/test split so that repeated runs on the same input and
# Spark configuration use the same seed instead of a fresh random one.
SPLIT_SEED = 42


def _run_sampler(announcement, sampler_call, timings, timing_key, output_csv,
                 failure_label, spark_result=True):
    """Time one resampling strategy and save its output as CSV.

    Only ``sampler_call()`` is inside the timed window; converting the result
    and writing the file happen afterwards.

    Args:
        announcement: progress message printed before the run starts.
        sampler_call: zero-argument callable that performs the resampling and
            returns the resampled data.
        timings: dict that receives the elapsed seconds (rounded to 3 decimals)
            under ``timing_key``.
        timing_key: label of this strategy inside ``timings``.
        output_csv: path of the CSV file to write.
        failure_label: prefix of the message printed when the run fails.
        spark_result: True when the result must be converted with
            ``toPandas()`` before writing; False when it can be written with
            ``to_csv`` directly.

    Any ``Exception`` is printed and deliberately swallowed so one failing
    sampler does not abort the remaining runs. ``KeyboardInterrupt`` is not an
    ``Exception`` subclass and therefore still stops the notebook.
    """
    try:
        print(announcement)
        start_time = time.time()
        resampled = sampler_call()
        end_time = time.time()
        timings[timing_key] = round(end_time - start_time, 3)
        if spark_result:
            resampled = resampled.toPandas()
        resampled.to_csv(output_csv, index=False)
    except Exception as e:
        print(f"{failure_label}: {e}")


for dataset, label_col in DATASETS.items():

    print(dataset)
    DATA_PROCESSED_TRAIN_DIR = f"{DATA_PROCESSED_DIR}/{dataset}/train"
    DATA_PROCESSED_TEST_DIR = f"{DATA_PROCESSED_DIR}/{dataset}/test"
    # to_csv does not create missing folders, so make sure they exist first.
    os.makedirs(DATA_PROCESSED_TRAIN_DIR, exist_ok=True)
    os.makedirs(DATA_PROCESSED_TEST_DIR, exist_ok=True)

    print("Reading DF")
    df = pd.read_csv(f"{DATA_RAW_DIR}/{dataset}.csv")

    print("Reading DF in spark!")
    df = spark.createDataFrame(df)

    print("Calculating Phi value")
    relevance_col = "phi"
    df = Phi(input_col=label_col, output_col=relevance_col).transform(df)

    print("Splitting and Pre-processing")
    train, test = df.randomSplit(weights=[0.8, 0.2], seed=SPLIT_SEED)
    # The relevance column is kept only for the test split, where it is saved
    # to its own file below.
    train = train.drop(relevance_col)
    test = test.toPandas()
    phi = test.pop(relevance_col)

    print("Saving the CSV files")
    test.to_csv(f"{DATA_PROCESSED_TEST_DIR}/{dataset}.csv", index=False)
    phi.to_csv(f"{DATA_PROCESSED_TEST_DIR}/{dataset}_phi.csv", index=False)

    execution_times[dataset] = {}

    # Unsampled training split.
    train_base = train.toPandas()
    train_base.to_csv(f"{DATA_PROCESSED_TRAIN_DIR}/{dataset}.csv", index=False)

    # One row per sampler run:
    # (announcement, timing key, file suffix, failure label, sampler call, result is a Spark DataFrame?)
    # The lambdas are invoked within this same iteration, so they see the
    # current `train` and `label_col`.
    sampler_runs = [
        ("Initializing Distributed RUS", "RUS", "rus",
         "Exception found in Distributed RUS",
         lambda: DistributedRUS(label_col=label_col, k_partitions=1).transform(train),
         True),
        ("Initializing Distributed ROS", "ROS", "ros",
         "Exception found in Distributed ROS",
         lambda: DistributedROS(label_col=label_col, k_partitions=1).transform(train),
         True),
        # NOTE(review): the Spark -> pandas conversion of `train` is inside the
        # timed call for SMOGN, exactly as in the original code.
        ("Initializing SMOGN", "SMOGN", "smogn",
         "Exception found in SMOGN",
         lambda: smoter(data=train.toPandas(), y=label_col),
         False),
        ("Initializing Distributed SMOGN with 2 partitions!",
         "Distributed SMOGN (k_partitions = 2)", "dist_smogn_2",
         "Found exception in DistSMOGN-2P",
         lambda: DistributedSMOGN(label_col=label_col, k_partitions=2).transform(train),
         True),
        ("Initializing Distributed SMOGN with 4 partitions!",
         "Distributed SMOGN (k_partitions = 4)", "dist_smogn_4",
         "Found exception in DistSMOGN-4P",
         lambda: DistributedSMOGN(label_col=label_col, k_partitions=4).transform(train),
         True),
        ("Initializing Distributed SMOGN with 8 partitions!",
         "Distributed SMOGN (k_partitions = 8)", "dist_smogn_8",
         "Found exception in Dist-SMOGN-8P",
         lambda: DistributedSMOGN(label_col=label_col, k_partitions=8).transform(train),
         True),
    ]

    for announcement, timing_key, suffix, failure_label, sampler_call, spark_result in sampler_runs:
        _run_sampler(
            announcement,
            sampler_call,
            execution_times[dataset],
            timing_key,
            f"{DATA_PROCESSED_TRAIN_DIR}/{dataset}_{suffix}.csv",
            failure_label,
            spark_result=spark_result,
        )

boston
Reading DF
Reading DF in spark!
Calculating Phi value
Splitting and Pre-processing


                                                                                

Saving the CSV files
Initializing Distributed RUS


                                                                                

22/12/05 00:37:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


[Stage 12:>                                                         (0 + 4) / 4]

22/12/05 00:37:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


                                                                                

22/12/05 00:37:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 0

dist_matrix: 100%|##############################| 75/75 [00:01<00:00, 50.11it/s]
synth_matrix: 100%|############################| 75/75 [00:00<00:00, 861.21it/s]
r_index: 100%|#################################| 52/52 [00:00<00:00, 936.51it/s]


Initializing Distributed SMOGN with 2 partitions!
22/12/05 00:37:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can

[Stage 58:>                                                         (0 + 1) / 1]                                                                                

22/12/05 00:37:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:20 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/12/05 00:37:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradat

                                                                                

22/12/05 00:37:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 0

                                                                                

22/12/05 00:37:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 0

Calculating Phi value
Splitting and Pre-processing


                                                                                

Saving the CSV files
Initializing Distributed RUS
22/12/05 00:37:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/05 00:37:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can

Initializing SMOGN


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data.iloc[:, j] = pd.Categorical(pd.factorize(
dist_matrix: 100%|############################| 359/359 [00:33<00:00, 10.69it/s]
synth_matrix: 100%|##########################| 359/359 [00:00<00:00, 448.97it/s]
r_index: 100%|#################################| 49/49 [00:00<00:00, 855.21it/s]
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data.iloc[:, j] = pd.Categorical(pd.factorize(
dist_matrix:  82%|#######################     | 640/778 [02:06<00:27,  5.05it/s]


KeyboardInterrupt: 

In [None]:
import os  # local import: only needed here, to create the results folder

# Persist all timings: one column per dataset, one row per sampler run.
# to_csv does not create missing folders, so make sure the target exists first.
os.makedirs(RESULT_EXECUTION_TIME_DIR, exist_ok=True)
pd.DataFrame(data=execution_times).to_csv(f"{RESULT_EXECUTION_TIME_DIR}/execution_time.csv", index=True)