In [16]:
from NaiveBayes import spark, process_dataframe, sc
from math import log

## Training

In [17]:
df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv("../../DataFiles/airline-train.csv")

df, string_indexers, bucketizers = process_dataframe(df)

In [18]:
LABEL_COL = "satisfaction_index"
FEATURES_COL = list(set(df.columns) - set([LABEL_COL]))
print("Label used: ", LABEL_COL)
print("Features used: ", FEATURES_COL)

Label used:  satisfaction_index
Features used:  ['Food and drink', 'Age_bucket', 'Gender_index', 'Customer Type_index', 'On-board service', 'Leg room service', 'Flight Distance_bucket', 'Type of Travel_index', 'Arrival Delay in Minutes_bucket', 'Checkin service', 'Inflight entertainment', 'Baggage handling', 'Departure/Arrival time convenient', 'Inflight service', 'Seat comfort', 'Inflight wifi service', 'Departure Delay in Minutes_bucket', 'Class_index', 'Cleanliness', 'Gate location', 'Online boarding', 'Ease of Online booking']


In [19]:
# Prior probability
def mapper(rows):
    result = []
    for row in rows:
        result.append((row[LABEL_COL], 1))
    return result

def reducer(row):
    key, iterable = row
    count = 0
    for val in iterable:
        count += val
    return [(key, count)]

prior = df.rdd.mapPartitions(mapper).groupByKey().flatMap(reducer).collect()
total = sum([x[1] for x in prior])
prior_dict = dict([(x[0], log(x[1]/total)) for x in prior])
print("Prior probability: ", prior_dict)



Prior probability:  {0: -0.5680907948876823, 1: -0.8361084357191146}


                                                                                

In [20]:
# Posterior probability
def mapper(rows):
    result = []
    dict = {}

    for row in rows:
        for feature in FEATURES_COL:
            if row[LABEL_COL] not in dict:
                dict[row[LABEL_COL]] = {}
            if feature not in dict[row[LABEL_COL]]:
                dict[row[LABEL_COL]][feature] = {}
            if row[feature] not in dict[row[LABEL_COL]][feature]:
                dict[row[LABEL_COL]][feature][row[feature]] = 0
            dict[row[LABEL_COL]][feature][row[feature]] += 1

    for label in dict:
        for feature in dict[label]:
            for value in dict[label][feature]:
                result.append((label, (feature, value, dict[label][feature][value])))
    return result

def reducer(row):
    key, iterable = row
    dict = {}
    iterable = list(iterable)
    total_count = 0
    result = []

    for i in range(len(iterable)):
        feature = iterable[i][0]
        value = iterable[i][1]
        value_count = iterable[i][2]
        total_count += value_count
        if feature not in dict:
            dict[feature] = {}
        if value not in dict[feature]:
            dict[feature][value] = 0
        dict[feature][value] += value_count

    for feature in dict:
        for value in dict[feature]:
            result.append((key, feature, value, dict[feature][value] / total_count))
    return result

posterior = df.rdd.mapPartitions(mapper).groupByKey().flatMap(reducer).collect()
posterior_dict={}
for i in range(len(posterior)):
    label = posterior[i][0]
    feature = posterior[i][1]
    value = posterior[i][2]
    prob = posterior[i][3]
    if label not in posterior_dict:
        posterior_dict[label] = {}
    if feature not in posterior_dict[label]:
        posterior_dict[label][feature] = {}
    posterior_dict[label][feature][value] = log(prob)

print("Posterior probability: ", posterior_dict)



Posterior probability:  {0: {'Food and drink': {5: -4.857352089312658, 1: -4.837422463623326, 2: -4.567654112941535, 4: -4.718045179935243, 3: -4.566163466470388, 0: -10.045834659807948}, 'Age_bucket': {0: -5.08086940254488, 2: -5.199821345406245, 6: -5.803737392238248, 5: -5.758560324868133, 1: -5.319237292484483, 8: -5.599409022657341, 4: -5.432129571370019, 9: -5.106107335134743, 3: -5.297028059337139, 7: -5.650944685203309}, 'Gender_index': {1: -3.810374065226739, 0: -3.758673368704047}, 'Customer Type_index': {0: -3.3735982992324356, 1: -4.49287507488633}, 'On-board service': {4: -4.506954906004798, 1: -4.90783276979125, 2: -4.77558613461609, 3: -4.417250495097482, 5: -5.031159916236591, 0: -12.972574061874987}, 'Leg room service': {3: -4.483780344980447, 5: -4.913403229305589, 4: -4.680192478873656, 2: -4.5155555934948195, 1: -5.056739215390963, 0: -8.354158649136874}, 'Flight Distance_bucket': {2: -5.226561667157795, 0: -5.284729672199096, 3: -5.2082780554244685, 6: -5.295019250

                                                                                

In [21]:
def predict(row):
    dict = {}# Dictionary of label and probability
    for label in prior_dict:
        prob = prior_dict[label]
        for feature in FEATURES_COL:
            if row[feature] in posterior_dict[label][feature]:
                prob += posterior_dict[label][feature][row[feature]]
            else:
                prob += log(0.000001)# Smoothing
        dict[label] = prob
    return max(dict, key=dict.get)

In [22]:
# Trainig accuracy
def mapper(rows):
    result = []
    for row in rows:
        if predict(row) == row[LABEL_COL]:
            result.append((0,1))
        else:
            result.append((0,0))
    return result
  
def reducer(row):
    count = 0
    for val in row[1]:
        count += val
    return [(0, count)]

correct = df.rdd.mapPartitions(mapper).groupByKey().flatMap(reducer).collect()[0][1]
total = df.count()
print("Training accuracy: ", correct / total)

                                                                                

Training accuracy:  0.8941154893140529


## Validation

In [23]:
df_val = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv("../../DataFiles/airline-val.csv")
df_val,_,_ = process_dataframe(df_val, string_indexers, bucketizers)

In [24]:
# validation accuracy
correct = df_val.rdd.mapPartitions(mapper).groupByKey().flatMap(reducer).collect()[0][1]
total = df_val.count()
print("Validation accuracy: ", correct / total)

[Stage 73:>                                                         (0 + 1) / 1]

Validation accuracy:  0.8908585331942996


                                                                                