In [1]:
import os
from collections import defaultdict

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import Row
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Naive Bayes Classifier")
    .getOrCreate()
)

# Locations of the preprocessed CSV files, relative to the working directory.
# NOTE(review): the recorded run of this cell failed inside spark.read.csv with
# a Hadoop RpcException; presumably the scheme-less path is being resolved
# against a remote default filesystem. Confirm, and use an explicit file:// URI
# if the data is meant to be read from local disk.
current_dir = os.getcwd()
relative_path_train = os.path.join('..', 'data', 'preprocessed_train_data.csv')
relative_path_test = os.path.join('..', 'data', 'preprocessed_test_data.csv')

# Read both splits with a header row and inferred column types.
train_df, test_df = (
    spark.read.csv(csv_path, header=True, inferSchema=True)
    for csv_path in (relative_path_train, relative_path_test)
)

# Every column except the target goes into a single "features" vector column.
feature_columns = [name for name in train_df.columns if name != "satisfaction"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

train_features, test_features = (
    assembler.transform(frame).select("features", "satisfaction")
    for frame in (train_df, test_df)
)

# The scaler is fitted on the training split only and then applied to both
# splits, writing its output to "scaled_features".
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(train_features)
train_scaled, test_scaled = (
    scaler_model.transform(frame)
    for frame in (train_features, test_features)
)

# calculate prior probabilities
def mapper_prior_probability(data):
    """Count how many rows of ``data`` fall into each ``satisfaction`` class.

    Args:
        data: Object exposing ``groupBy(...).count().collect()`` whose collected
            rows can be indexed by ``"satisfaction"`` and ``"count"``.

    Returns:
        ``{"class_counts": {label: row_count}}``.
    """
    per_class = {}
    for record in data.groupBy("satisfaction").count().collect():
        per_class[record["satisfaction"]] = record["count"]
    return {"class_counts": per_class}

def reducer_prior_probability(mapped_results):
    """Turn per-class counts into prior probabilities.

    Args:
        mapped_results: Dict whose values are ``{label: count}`` dicts; counts
            for the same label found under different keys are added together.

    Returns:
        ``{label: count / total_count}`` over all labels seen.
    """
    totals = {}
    for partial in mapped_results.values():
        for label in partial:
            totals[label] = totals.get(label, 0) + partial[label]

    grand_total = sum(totals.values())

    priors = {}
    for label, seen in totals.items():
        priors[label] = seen / grand_total
    return priors

# Count the training rows per class, then normalise the counts into the
# prior probability of each class.
mapped_results_prior = mapper_prior_probability(train_scaled)
class_probabilities_prior = reducer_prior_probability(mapped_results_prior)

# calculate feature counts
def _iter_counted_entries(features):
    """Return an iterator over the ``(index, value)`` entries of a vector to count.

    Vectors that expose ``indices`` (the sparse layout) contribute exactly their
    stored entries. Vectors without ``indices`` (the dense layout) contribute
    their non-zero entries, so both layouts count the same positions.
    """
    indices = getattr(features, "indices", None)
    if indices is not None:
        return zip(indices, features.values)
    return (
        (idx, value) for idx, value in enumerate(features.values) if value != 0
    )


def mapper(data):
    """Count feature values per class over an iterable of rows.

    Args:
        data: Iterable of row-like records supporting ``row["satisfaction"]``
            and ``row["features"]``. The feature vector must expose ``values``
            and may expose ``indices``.

    Returns:
        A one-element list holding
        ``{label: {"feature_<idx>": {str(value): count}}}``.
        The result is built from plain dicts: nested ``defaultdict`` objects
        with lambda factories cannot be pickled, which breaks as soon as Spark
        has to move the result between processes.
    """
    feature_counts = {}
    for row in data:
        label = row["satisfaction"]
        for idx, value in _iter_counted_entries(row["features"]):
            per_value = (
                feature_counts
                .setdefault(label, {})
                .setdefault(f"feature_{idx}", {})
            )
            value_key = str(value)
            per_value[value_key] = per_value.get(value_key, 0) + 1
    return [feature_counts]

def _iter_count_entries(partial_counts):
    """Yield ``(label, feature, value, count)`` from one partial count dict.

    Two layouts are understood:
      * nested: ``{label: {feature: {value: count}}}``
      * flat:   ``{(label, feature, value): count}``
    """
    for key, payload in partial_counts.items():
        if isinstance(payload, dict):
            # Nested layout: ``key`` is the label itself.
            for feature, by_value in payload.items():
                for value, count in by_value.items():
                    yield key, feature, value, count
        else:
            # Flat layout: ``key`` is a (label, feature, value) triple.
            yield key[0], key[1], key[2], payload


def reducer(mapped_results):
    """Merge several partial feature-count dicts into one nested dict.

    Args:
        mapped_results: Iterable of partial count dicts, each either nested
            (``{label: {feature: {value: count}}}``) or keyed by flat
            ``(label, feature, value)`` tuples.

    Returns:
        ``{label: {feature: {value: total_count}}}``. The inputs are not
        modified.
    """
    feature_counts = {}

    for f_count in mapped_results:
        for label, feature, value, count in _iter_count_entries(f_count):
            by_value = feature_counts.setdefault(label, {}).setdefault(feature, {})
            by_value[value] = by_value.get(value, 0) + count
    return feature_counts

def _label_count_pairs(partition_counts):
    """Split one ``mapper`` result into ``(label, {feature: {value: count}})`` pairs.

    The nested mappings are copied into plain dicts so the pairs can be
    pickled when Spark shuffles them.
    """
    return [
        (label, {feature: dict(by_value) for feature, by_value in features.items()})
        for label, features in partition_counts.items()
    ]


def _merge_label_counts(left, right):
    """Add two ``{feature: {value: count}}`` dicts into a new dict."""
    merged = {feature: dict(by_value) for feature, by_value in left.items()}
    for feature, by_value in right.items():
        bucket = merged.setdefault(feature, {})
        for value, count in by_value.items():
            bucket[value] = bucket.get(value, 0) + count
    return merged


# `mapper` consumes an iterable of rows, so it is run once per partition
# (mapPartitions) rather than once per row. Its per-partition dicts are then
# flattened to (label, counts) pairs, because reduceByKey needs key/value pairs
# and calls its function with two values of the same key.
feature_counts_rdd = (
    train_scaled.rdd
    .mapPartitions(mapper)
    .flatMap(_label_count_pairs)
    .reduceByKey(_merge_label_counts)
)
# calculate features probabilities
def mapper_train_naive_bayes_feature_probabilities(feature_counts):
    """Flatten a ``{label: features}`` dict into a list of ``(label, features)`` pairs.

    Args:
        feature_counts: Dict mapping each label to its feature-count data.

    Returns:
        A list of ``(label, features)`` tuples in the dict's iteration order.
    """
    return list(feature_counts.items())


def reducer_train_naive_bayes_feature_probabilities(mapped_results):
    """Convert per-label feature counts into conditional probabilities.

    Args:
        mapped_results: Iterable of ``(label, {feature: {value: count}})``
            pairs. The same label may appear more than once.

    Returns:
        ``{label: {feature: {value: probability}}}`` where the probabilities
        of one feature's values sum to 1 for each label.
    """
    # Merge the raw counts first. Normalising each partial result on its own
    # and then adding the fractions would make a label that appears in several
    # partial results end up with probabilities summing to more than 1.
    merged_counts = {}
    for label, features in mapped_results:
        label_counts = merged_counts.setdefault(label, {})
        for feature, values in features.items():
            value_counts = label_counts.setdefault(feature, {})
            for value, count in values.items():
                value_counts[value] = value_counts.get(value, 0) + count

    feature_probabilities = {}
    for label, label_counts in merged_counts.items():
        feature_probabilities[label] = {}
        for feature, value_counts in label_counts.items():
            total_feature_count = sum(value_counts.values())
            feature_probabilities[label][feature] = {
                value: count / total_feature_count
                for value, count in value_counts.items()
            }

    return feature_probabilities

def _partition_feature_probabilities(label_count_pairs):
    """Normalise every ``(label, counts)`` pair of one partition.

    Returns the ``(label, {feature: {value: probability}})`` pairs for the
    labels found in this partition; an empty partition yields nothing.
    """
    return reducer_train_naive_bayes_feature_probabilities(label_count_pairs).items()


# feature_counts_rdd comes out of reduceByKey, so its elements are (key, value)
# pairs with each key present once. Every pair in a partition is processed:
# the previous `next(partition)` only looked at the first record and raised on
# empty partitions. No second reduceByKey is needed, and the reducer could not
# serve as one anyway, since reduceByKey calls its function with two values.
# NOTE(review): assumes the values are {feature: {value: count}} dicts keyed by
# label - confirm against how feature_counts_rdd is built.
feature_probabilities_rdd = feature_counts_rdd.mapPartitions(
    _partition_feature_probabilities
)

24/05/07 12:22:07 WARN Utils: Your hostname, donia-Latitude-E6520 resolves to a loopback address: 127.0.1.1; using 192.168.1.7 instead (on interface wlp3s0)
24/05/07 12:22:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/07 12:22:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/07 12:22:29 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: preprocessed_train_data.csv.
org.apache.hadoop.ipc.RpcException: RPC response has invalid length
	at org.apache.hadoop.ipc.Client$IpcStreams.readResponse(Client.java:1933)
	at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1238)
	at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1134)


Py4JJavaError: An error occurred while calling o27.csv.
: org.apache.hadoop.ipc.RpcException: RPC response has invalid length
	at org.apache.hadoop.ipc.Client$IpcStreams.readResponse(Client.java:1933)
	at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1238)
	at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1134)


In [None]:
# collectAsMap() is an action: it runs the lazy pipeline above and returns the
# RDD's (key, value) pairs to the driver as a plain dict.
feature_probabilities_dict = feature_probabilities_rdd.collectAsMap()

In [None]:
import math

def mapper_predict_naive_bayes(class_probabilities, feature_probabilities_rdd, sample,
                               min_probability=1e-9):
    """Compute the log-probability score of each class for a given sample.

    Args:
        class_probabilities: ``{label: prior_probability}``.
        feature_probabilities_rdd: Either a dict
            ``{label: {feature: {value: probability}}}`` or an object with a
            ``collectAsMap()`` method returning such a dict (e.g. a pair RDD).
            Passing the already collected dict is preferred: an RDD is
            collected on every call and cannot be used from code that runs on
            executors.
        sample: ``{feature: value}`` describing one record. Keys and values
            must use the same representation as the probability table.
        min_probability: Floor used when a feature or value is missing from
            the table for a label, so that ``math.log`` is never called on 0.

    Returns:
        ``{label: log(prior) + sum(log(P(feature=value | label)))}``.
    """
    if hasattr(feature_probabilities_rdd, "collectAsMap"):
        feature_probabilities = feature_probabilities_rdd.collectAsMap()
    else:
        feature_probabilities = feature_probabilities_rdd

    log_probs = {}

    for label, class_prob in class_probabilities.items():
        label_table = feature_probabilities.get(label, {})
        log_prob = math.log(class_prob)
        for feature, value in sample.items():
            # Unknown features/values fall back to 0 and are then floored.
            feature_prob = label_table.get(feature, {}).get(value, 0)
            log_prob += math.log(max(feature_prob, min_probability))

        log_probs[label] = log_prob

    return log_probs

'''
selects the class with the highest probability as the predicted class. 
'''
def reducer_predict_naive_bayes(mapped_results):
    """Return the label with the largest log-probability.

    Args:
        mapped_results: ``{label: log_probability}``.

    Returns:
        The first label whose score is strictly greater than every score seen
        before it, or ``None`` when no score exceeds negative infinity
        (including when ``mapped_results`` is empty).
    """
    best_label, best_score = None, float('-inf')

    for candidate in mapped_results:
        score = mapped_results[candidate]
        if not (score > best_score):
            continue
        best_label, best_score = candidate, score

    return best_label

In [None]:
def predict_partition(iterator):
    """Predict a class for every sample of one partition.

    Args:
        iterator: Iterable of ``{feature: value}`` sample dicts.

    Yields:
        The predicted label for each sample, in input order.
    """
    for sample in iterator:
        # Use the collected probability dict, not the RDD: this function runs
        # on executors, and Spark does not allow an RDD to be referenced from
        # inside a transformation.
        log_probs = mapper_predict_naive_bayes(class_probabilities_prior, feature_probabilities_dict, sample)
        yield reducer_predict_naive_bayes(log_probs)

def _row_to_sample(row):
    """Turn a row's ``features`` vector into a ``{"feature_<idx>": str(value)}`` dict.

    This mirrors the keys built while counting features during training
    (``feature_<idx>`` names, stringified values, non-zero entries only).
    ``row.asDict()`` would instead produce column-name keys and include the
    label and vector columns, none of which match the probability table.
    """
    features = row["features"]
    indices = getattr(features, "indices", None)
    if indices is not None:
        entries = zip(indices, features.values)
    else:
        # Dense layout: keep the non-zero entries, like the sparse layout does.
        entries = (
            (idx, value) for idx, value in enumerate(features.values) if value != 0
        )
    return {f"feature_{idx}": str(value) for idx, value in entries}


# Make predictions
predictions_rdd = test_scaled.rdd.map(_row_to_sample) \
    .mapPartitions(predict_partition)

# Collect predictions into a list
predictions = predictions_rdd.collect()


In [2]:
# Use %pip (not !pip) so the package is installed into the running kernel's
# environment, and pin the version so the notebook is reproducible.
# NOTE(review): this cell must run before the cell that imports pyspark.
%pip install pyspark==3.5.1

  from pkg_resources import load_entry_point
Processing /home/donia/.cache/pip/wheels/da/78/6d/54350e0243f65f77dccf6ebe2ed5559faf6900559e904fb957/pyspark-3.5.1-py2.py3-none-any.whl
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[K     |████████████████████████████████| 200 kB 27 kB/s eta 0:00:01
[?25hInstalling collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.1
