In [18]:
import time

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

spark = SparkSession.builder.appName('nb').getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

# Helper function to read the heart disease dataset
def readHeartDataset():
    """Load the heart dataset from 'heart.dat'.

    The file is read as space-separated values with an explicit all-float
    schema, and the 'heart-disease' column is shifted down by one.

    Returns:
        A ``(df, feature_cols, label_col)`` tuple: the loaded DataFrame, the
        list of feature column names, and the name of the label column.
    """
    # Column roles
    label_col = 'heart-disease'
    feature_cols = ['age', 'sex', 'chest-pain', 'rest-bp', 'serum-chol', 'fasting-blood-sugar', 'electrocardiographic', 'max-heart-rate', 'angina', 'oldpeak', 'slope', 'major-vessels', 'thal']

    # Explicit schema: the file has no header row to take names from
    heart_schema = 'age float, sex float, `chest-pain` float, `rest-bp` float, `serum-chol` float, `fasting-blood-sugar` float, electrocardiographic float, `max-heart-rate` float, angina float, oldpeak float, slope float, `major-vessels` float, thal float, `heart-disease` float'
    heart_df = spark.read.csv('heart.dat', sep=' ', schema=heart_schema)

    # Shift the label down by one
    # NOTE(review): presumably recodes 1/2 labels to 0/1 - confirm against the data file
    heart_df = heart_df.withColumn(label_col, heart_df[label_col] - 1)

    return (heart_df, feature_cols, label_col)

# Helper method to read airline satisfaction dataset
def readAirlineSatisfactionDataset():
    """Load and preprocess the airline satisfaction dataset from 'airline.csv'.

    The CSV is read with a header row and inferred types, the '_c0' column is
    dropped, and each categorical string column is replaced by a numeric
    index column produced by ``StringIndexer``.

    Returns:
        A ``(df, feature_cols, label_col)`` tuple: the preprocessed DataFrame,
        the list of feature column names, and the name of the label column
        ('Satisfaction Index').
    """
    file_location = 'airline.csv'

    # Load training data
    df = spark.read.csv(file_location, header=True, inferSchema=True)

    # Drop the '_c0' column before encoding
    df = df.drop('_c0')

    # Categorical source column -> numeric index column. Indexers are fitted
    # one after another in this order, each on the current DataFrame.
    index_columns = {
        'Gender': 'Gender Index',
        'Customer Type': 'Customer Type Index',
        'Type of Travel': 'Type of Travel Index',
        'Class': 'Class Index',
        'satisfaction': 'Satisfaction Index',
    }
    for source_col, index_col in index_columns.items():
        indexer = StringIndexer(inputCol=source_col, outputCol=index_col)
        df = indexer.fit(df).transform(df)

    # The raw string columns are no longer needed once indexed
    df = df.drop(*index_columns)

    # Set feature columns.
    # 'id' is deliberately NOT a feature (it stays in the DataFrame).
    # NOTE(review): 'id' is presumably a per-row identifier - confirm.
    # An identifier has no predictive meaning, and multinomial Naive Bayes
    # treats feature values as count-like magnitudes, so a large id would
    # dominate the likelihood.
    feature_cols = ['Age', 'Flight Distance', 'Inflight wifi service', 'Departure/Arrival time convenient',
                    'Ease of Online booking', 'Gate location', 'Food and drink', 'Online boarding', 'Seat comfort',
                    'Inflight entertainment', 'On-board service', 'Leg room service', 'Baggage handling', 'Checkin service',
                    'Inflight service', 'Cleanliness', 'Departure Delay in Minutes', 'Arrival Delay in Minutes',
                    'Gender Index', 'Customer Type Index', 'Type of Travel Index', 'Class Index']

    # Set label column
    label_col = 'Satisfaction Index'

    return (df, feature_cols, label_col)

# Helper method to read credit card fraud dataset
def readFraudDataset():
    """Load and preprocess the credit card fraud dataset from 'fraudTrain.csv'.

    Unused columns are dropped, the categorical columns 'category', 'gender'
    and 'job' are replaced by ``StringIndexer`` index columns, and the four
    coordinate columns are shifted by +200.

    Returns:
        A ``(df, feature_cols, label_col)`` tuple: the preprocessed DataFrame,
        the list of feature column names, and the name of the label column
        ('is_fraud').
    """
    fraud_df = spark.read.csv('fraudTrain.csv', header=True, inferSchema=True)

    # Remove columns that are not used by the model
    unused_cols = ['_c0', 'trans_date_trans_time', 'cc_num', 'merchant', 'first', 'last', 'street',
                   'city', 'state', 'zip', 'dob', 'trans_num', 'unix_time']
    fraud_df = fraud_df.drop(*unused_cols)

    # Index each categorical column into '<name>Index', then discard the originals
    categorical_cols = ['category', 'gender', 'job']
    for name in categorical_cols:
        encoder = StringIndexer(inputCol=name, outputCol=name + 'Index')
        fraud_df = encoder.fit(fraud_df).transform(fraud_df)
    fraud_df = fraud_df.drop(*categorical_cols)

    # Shift every coordinate column by +200
    # NOTE(review): presumably to keep the values non-negative for multinomial Naive Bayes - confirm
    for coord in ('lat', 'long', 'merch_lat', 'merch_long'):
        fraud_df = fraud_df.withColumn(coord, col(coord) + 200)

    feature_cols = ['amt', 'lat', 'long', 'city_pop', 'merch_lat', 'merch_long', 'categoryIndex', 'genderIndex', 'jobIndex']
    label_col = 'is_fraud'

    return (fraud_df, feature_cols, label_col)

# Helper method to read particle dataset
def readParticleDataset():
    """Load and preprocess the particle dataset from 'SUSY.csv'.

    The header-less CSV is read with inferred types, the columns are given
    names, and a fixed constant is added to selected columns.

    Returns:
        A ``(df, feature_cols, label_col)`` tuple: the preprocessed DataFrame,
        the list of feature column names (every column except the label), and
        the name of the label column ('class').
    """
    label_col = 'class'
    column_names = ['class', 'lepton_1_pT', 'lepton_1_eta', 'lepton_1_phi', 'lepton_2_pT', 'lepton_2_eta', 'lepton_2_phi', 'missing_energy_magnitude', 'missing_energy_phi', 'MET_rel', 'axial_MET', 'M_R', 'M_TR_2', 'R', 'MT2', 'S_R', 'M_Delta_R', 'dPhi_r_b', 'cos_theta_r1']

    # The file has no header, so name the positional columns explicitly
    particle_df = spark.read.csv('SUSY.csv', inferSchema=True).toDF(*column_names)

    # Constant added to each listed column
    # NOTE(review): presumably chosen to make these columns non-negative - confirm against the data
    offsets = {
        'lepton_1_eta': 3,
        'lepton_1_phi': 2,
        'lepton_2_eta': 3,
        'lepton_2_phi': 2,
        'missing_energy_phi': 2,
        'axial_MET': 17,
    }
    for name, shift in offsets.items():
        particle_df = particle_df.withColumn(name, col(name) + shift)

    # Every named column other than the label is a feature
    feature_cols = [name for name in column_names if name != label_col]

    return (particle_df, feature_cols, label_col)

# Read dataset (change to test different datasets)
(df, feature_cols, label_col) = readParticleDataset()

# Assemble the features into a single vector column.
# handleInvalid="skip" is passed so rows with invalid feature values are skipped.
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features', handleInvalid="skip")

# Split the data into training and test sets (fixed seed so the split is repeatable)
(training_data, test_data) = df.randomSplit([0.7, 0.3], seed=1)
training_data.show(n = 3)

# Initialize Naive Bayes model; it reads the assembler's 'features' output column
nb = NaiveBayes(smoothing=1.0, modelType='multinomial', labelCol=label_col)
pipeline = Pipeline(stages=[assembler, nb])

# Start timer.
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# elapsed time; time.time() is wall-clock and can jump if the system clock is adjusted.
start = time.perf_counter()

# Train the model
model = pipeline.fit(training_data)

# Test the model. show() is an action, so the work needed to produce the
# displayed predictions happens inside the timed region.
predictions = model.transform(test_data)
predictions.show(n=5)

# End timer
end = time.perf_counter()

# Print the time taken (seconds)
print(f'Time taken: {end - start}')

# Evaluate the model's performance and print accuracy
evaluator = MulticlassClassificationEvaluator(metricName='accuracy', labelCol=label_col)
accuracy = evaluator.evaluate(predictions)
print(f'Accuracy: {accuracy}')

+----+---+----------+-------+----------+-------------------+--------------------+--------------+------+-------+-----+-------------+----+-------------+
| age|sex|chest-pain|rest-bp|serum-chol|fasting-blood-sugar|electrocardiographic|max-heart-rate|angina|oldpeak|slope|major-vessels|thal|heart-disease|
+----+---+----------+-------+----------+-------------------+--------------------+--------------+------+-------+-----+-------------+----+-------------+
|29.0|1.0|       2.0|  130.0|     204.0|                0.0|                 2.0|         202.0|   0.0|    0.0|  1.0|          0.0| 3.0|          0.0|
|34.0|0.0|       2.0|  118.0|     210.0|                0.0|                 0.0|         192.0|   0.0|    0.7|  1.0|          0.0| 3.0|          0.0|
|34.0|1.0|       1.0|  118.0|     182.0|                0.0|                 2.0|         174.0|   0.0|    0.0|  1.0|          0.0| 3.0|          0.0|
+----+---+----------+-------+----------+-------------------+--------------------+-------------