In [1]:
from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import monotonically_increasing_id

from sklearn.metrics import accuracy_score
import os
import math

In [2]:
# Initialize SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("nb_map_reduce") \
    .getOrCreate()

25/04/28 20:11:01 WARN Utils: Your hostname, m-Laptop resolves to a loopback address: 127.0.1.1; using 192.168.1.6 instead (on interface wlp3s0)
25/04/28 20:11:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/28 20:11:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/28 20:11:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# relative paths
current_dir = os.getcwd()
data_relative_path = os.path.join('..', 'splitted_data')

# read the data
x_train = spark.read.csv(os.path.join(current_dir, data_relative_path, 'X_train.csv'), header=True, inferSchema=True)
x_test = spark.read.csv(os.path.join(current_dir, data_relative_path, 'X_test.csv'), header=True, inferSchema=True)
y_train = spark.read.csv(os.path.join(current_dir, data_relative_path, 'y_train.csv'), header=True, inferSchema=True)
y_test = spark.read.csv(os.path.join(current_dir, data_relative_path, 'y_test.csv'), header=True, inferSchema=True)

                                                                                

In [4]:
# Create unique IDs (join label with features)
x_train = x_train.withColumn("index", monotonically_increasing_id())
y_train = y_train.withColumn("index", monotonically_increasing_id())
x_test = x_test.withColumn("index", monotonically_increasing_id())
y_test = y_test.withColumn("index", monotonically_increasing_id())

train_df = x_train.join(y_train, "index").drop("index")
test_df = x_test.join(y_test, "index").drop("index")

# Rename "TARGET" -> "label"
train_df = train_df.withColumnRenamed("TARGET", "label")
test_df = test_df.withColumnRenamed("TARGET", "label")

In [5]:
# merge features into one vector (all columns except label)
feature_columns = [col for col in train_df.columns if col != "label"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

train_features = assembler.transform(train_df).select("features", "label")
test_features = assembler.transform(test_df).select("features", "label")

# collect labels to list
train_labels_list = train_features.select("label").rdd.flatMap(lambda x: x).collect()
test_labels_list = test_features.select("label").rdd.flatMap(lambda x: x).collect()

                                                                                

In [6]:
def discretize_features(df, num_bins=10):

    feature_stats = df.select("features").rdd.map(lambda row: row["features"]) \
        .aggregate((None, None),
                #    seq op
                    lambda acc, features: (
                        features if acc[0] is None else [min(a, b) for a, b in zip(acc[0], features)],
                        features if acc[1] is None else [max(a, b) for a, b in zip(acc[1], features)]
                    ),
                #    comb op (partitions)
                    lambda acc1, acc2: (
                        acc1[0] if acc2[0] is None else (acc2[0] if acc1[0] is None else [min(a, b) for a, b in zip(acc1[0], acc2[0])]),
                        acc1[1] if acc2[1] is None else (acc2[1] if acc1[1] is None else [max(a, b) for a, b in zip(acc1[1], acc2[1])])
                    ))
    
    min_values, max_values = feature_stats

    def discretize_vector(vec):
        features = vec["features"]
        discrete_features = {}
        for i, val in enumerate(features):
            if val is not None:
                min_val = min_values[i]
                max_val = max_values[i]
                if min_val == max_val:
                    bin_idx = 0
                else:
                    bin_width = (max_val - min_val) / num_bins
                    bin_idx = int((val - min_val) / bin_width)
                    bin_idx = min(bin_idx, num_bins - 1)
            else:
                bin_idx = 0
            discrete_features[f"feature_{i}"] = f"bin_{bin_idx}"
        return {"features": discrete_features, "label": vec["label"]}
    
    return df.rdd.map(discretize_vector)


# Discretize continuous features
train_discrete = discretize_features(train_features)
test_discrete = discretize_features(test_features)

# calc prior prob (map/reduce) ex: [("1", 1), ("1", 1), ("0", 1)] => [("1", 2), ("0", 1)]
class_counts = train_discrete.map(lambda row: (row["label"], 1)) \
                          .reduceByKey(lambda a, b: a + b) \
                          .collect()

# calc prior prob ( p(label) = label_count / total_count )
total_samples = sum(count for _, count in class_counts)
class_probabilities = {label: count / total_samples for label, count in class_counts}

                                                                                

In [7]:
# feature_vec_count (map/reduce) ex: [((1, "feature_0", "bin_3"), 1), ((1, "feature_1", "bin_5"), 1)] => sum by key
feature_counts_rdd = train_discrete.flatMap(lambda row: [
    ((row["label"], feature_name, feature_value), 1) 
    for feature_name, feature_value in row["features"].items()
]) \
.reduceByKey(lambda a, b: a + b)

feature_vec_count = feature_counts_rdd.collect()

# {label -> {feature_name -> {feature_value -> {count}}}}
feature_counts_dict = {}
for (label, feature_name, feature_value), count in feature_vec_count:
    if label not in feature_counts_dict:
        feature_counts_dict[label] = {}
    if feature_name not in feature_counts_dict[label]:
        feature_counts_dict[label][feature_name] = {}
    feature_counts_dict[label][feature_name][feature_value] = count

# for each feature, get all its unique values
all_feature_values = {}
for label in feature_counts_dict:
    for feature_name in feature_counts_dict[label]:
        if feature_name not in all_feature_values:
            all_feature_values[feature_name] = set()
        all_feature_values[feature_name].update(feature_counts_dict[label][feature_name].keys())

# feature prob (likelihood) - Laplace smoothing
# p(feature_value | label) = (count(label, feature name, feature value) + 1) / (count(label) + unique_values)
feature_probabilities = {}
for label in feature_counts_dict:
    feature_probabilities[label] = {}
    for feature_name in feature_counts_dict[label]:
        feature_probabilities[label][feature_name] = {}
        total_count = sum(feature_counts_dict[label][feature_name].values())
        unique_values = len(all_feature_values[feature_name])
        
        for feature_value in all_feature_values[feature_name]:
            count = feature_counts_dict[label][feature_name].get(feature_value, 0)
            feature_probabilities[label][feature_name][feature_value] = (count + 1) / (total_count + unique_values)

                                                                                

In [8]:
# broadcast to workers
feature_probs_broadcast = spark.sparkContext.broadcast(feature_probabilities)
class_probs_broadcast = spark.sparkContext.broadcast(class_probabilities)

In [9]:
# Prediction function (map)
def predict_sample(vec):
    features = vec["features"]
    feature_probs = feature_probs_broadcast.value
    class_probs = class_probs_broadcast.value
    
    # calc log prob for each class
    log_probs = {}
    for label in class_probs:
        log_prob = math.log(class_probs[label]) # prior prob
        # feature probs
        for feature_name, feature_value in features.items():
            if (label in feature_probs and 
                feature_name in feature_probs[label] and 
                feature_value in feature_probs[label][feature_name]):
                prob = feature_probs[label][feature_name][feature_value]
            else: 
                # in case of unseen feature/value (take small prob)
                prob = 1e-10 
            log_prob += math.log(max(prob, 1e-10)) # avoid log(0)
        log_probs[label] = log_prob
    
    # get the class that has higher prob (reduce)
    predicted_label = max(log_probs, key=log_probs.get)
    return predicted_label

# collect predictions (map)
test_predictions = test_discrete.map(predict_sample).collect()
train_predictions = train_discrete.map(predict_sample).collect()

# Evaluate the model
train_accuracy = accuracy_score(train_labels_list, train_predictions)
test_accuracy = accuracy_score(test_labels_list, test_predictions)

# Print results
print(f"Training Accuracy: {train_accuracy:.2f}")
print(f"Testing Accuracy: {test_accuracy:.2f}")

# stop spark session
spark.stop()

                                                                                

Training Accuracy: 0.91
Testing Accuracy: 0.91
