In [54]:
# all import
from functools import reduce
from itertools import islice
import multiprocessing
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

## Mushroom data cleaning
This part clean the data and spilt the data for using.

In [55]:
# Mushroom Data Set
# https://archive.ics.uci.edu/ml/datasets/Mushroom

# here the path might be different when the file dir have changed
data = pd.read_csv('Downloads/mushrooms.csv', delimiter=',', header=0)
#exclude any training example that has missing values for stalk root
data = data[data['stalk-root'] != '?']

#first we must do a bit of data inspection and cleaning
#y is the first col of the data
y = data.iloc[:,0]
#X is the data except the first col
X = data.iloc[:,1:]
#Cn is the class name, here we actually have 2 class e and p
Cn=len(np.unique(y))

n,d = X.shape

print ("initial samples: {}".format(n))
print ("number of features: {}".format(d))
print ("number of class labels: {}".format(Cn))
print ()

print ("Class Labels are: {}".format(np.unique(y)))
print ()

print ("Take a look at unique outcomes per feature")
for i in range(0,d):
	print ("{}th: {}".format(i,np.unique(X.iloc[:,i])))

print ()
print ("Remove stalk-root feature because it has some missing data")
print ("Remove veil-type feature because it is always 'p'")
#X = np.delete(X,(10,15),axis=1)
X = X.drop(labels="stalk-root",axis=1)
X = X.drop(labels="veil-type",axis=1)

n,d = X.shape
# dictionary master list of unique features
featureDict = {}
for i in range(0,d):
	featureDict[i]= np.unique(X.iloc[:,i])

print ()
print ("After removing the two features")
print ("number of features: {}".format(d))

# split data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y)
n_train = len(X_train)
n_test = len(X_test)

print ()
print ("number of training samples: {}".format(n_train))
print ("number of test samples: {}".format(n_test))

# Isolate the training set based on classification label
X_train_e = X_train[y_train=='e']
X_train_p = X_train[y_train=='p']

initial samples: 5644
number of features: 22
number of class labels: 2

Class Labels are: ['e' 'p']

Take a look at unique outcomes per feature
0th: ['b' 'c' 'f' 'k' 's' 'x']
1th: ['f' 'g' 's' 'y']
2th: ['b' 'c' 'e' 'g' 'n' 'p' 'w' 'y']
3th: ['f' 't']
4th: ['a' 'c' 'f' 'l' 'm' 'n' 'p']
5th: ['a' 'f']
6th: ['c' 'w']
7th: ['b' 'n']
8th: ['g' 'h' 'k' 'n' 'p' 'r' 'u' 'w' 'y']
9th: ['e' 't']
10th: ['b' 'c' 'e' 'r']
11th: ['f' 'k' 's' 'y']
12th: ['f' 'k' 's' 'y']
13th: ['b' 'c' 'g' 'n' 'p' 'w' 'y']
14th: ['b' 'c' 'g' 'n' 'p' 'w' 'y']
15th: ['p']
16th: ['w' 'y']
17th: ['n' 'o' 't']
18th: ['e' 'l' 'n' 'p']
19th: ['h' 'k' 'n' 'r' 'u' 'w']
20th: ['a' 'c' 'n' 's' 'v' 'y']
21th: ['d' 'g' 'l' 'm' 'p' 'u']

Remove stalk-root feature because it has some missing data
Remove veil-type feature because it is always 'p'

After removing the two features
number of features: 20

number of training samples: 4233
number of test samples: 1411


##  Sequence MapReduce

In [56]:
def seq_map(col):
    #print(type(col))
    resultDict = {}
#  put all attribute to the list
    attributeList = list(np.unique(col))
#  for every attribute calculate the total number
    for attribute in attributeList:
        sumValue = col.isin([attribute]).sum()
        resultDict[attribute] = sumValue
    return pd.Series(resultDict)

def seq_reduce(result,mappedData):
    #print(type(mappedData))
    attribute = mappedData[0]
    #
    #sumValues = mappedData[1][mappedData[1].notnull()].sum(axis = 0)
    #attributeValue = mappedData[1][mappedData[1].notnull()] / sumValues

    # calculate the probability P(Attribute | Class)
    attributeValue = mappedData[1][mappedData[1].notnull()] / len(mappedData)
    result[attribute] = attributeValue
    # print(result)
    return result

def seq_MapReduce(data):
    # print(data.apply(seq_map, axis=0))
    return pd.DataFrame(reduce(seq_reduce,data.apply(seq_map, axis=0).items(),{}))

In [57]:

#testing block
#a = X_train_e.apply(seq_map, axis=0)
#print(pd.DataFrame(reduce(seq_reduce,a.items(),{})))
#print(seq_MapReduce(X_train_e))

## ParallelMapReduce

In [58]:
def mapper_run(data):
         result = data.apply(seq_map,axis=0)
         return result

def reduce_run(data):
         result = reduce(seq_reduce, data.items(),{})
         return result

def get_result(data):
    results = pd.DataFrame(data[0])
    for item in range(1, len(data)):
        results = pd.concat([results,pd.DataFrame(data[item])],axis=1)
    return results

class ParallelMapReduce(object):
    def __init__(self, map_func, reduce_func, num_workers=None):
        self.num_workers = num_workers
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def partition(self, n, iterable):
        i = iter(iterable)
        piece = list(islice(i, n))
        while piece:
            yield piece
            piece = list(islice(i, n))

    # def spilt_data(self,data):
    #     i = iter(data.columns.values)
    #     piece = data[i]
    #     while piece:
    #         yield piece
    #         piece = data[i]
    def mapper_run(self,data):
         result = data.apply(self.map_func,axis=0)
         return result

    def reduce_run(self,data):
         result = reduce(self.reduce_func, data.items(),{})
         return result

    def __call__(self, inputs):
        # print(type(inputs))
        # -------
        spilt_data = np.array_split(inputs,self.num_workers,axis = 1)
        spilt_data = pd.Series(spilt_data)
        # print("**********")
        # print(type(spilt_data))
        # -------
        values = self.pool.imap(mapper_run,spilt_data)
        # values = self.pool.map(self.map_func,spilt_data)
        #print ('>>> MAPPED VALUES (%s values): %s, ...' % (len(values), str(values[:10])))

        # deal with data again
        values = pd.concat(values,axis=1)
        # print("----------")
        # print(type(values))
        # print(values)
        # spilt data
        values = np.array_split(values,self.num_workers,axis=1)
        # print(type(values))
        # values = pd.DataFrame(values)
        # print(type(values))

        # values = self.pool.map(self.reduce_func({},values.items()),values)
        values = self.pool.map(reduce_run,values)
        # values = self.pool.map(reduce(self.reduce_func,values.items(),{}),values)
        values = get_result(values)
        # print ('>>> REDUCED VALUES', values)

        return values

In [59]:
# test block
#i = iter(X_train_e.columns.values)
#print(next(i))
#print(X_train_e["cap-shape"])
# mapreduce = ParallelMapReduce(seq_map, seq_reduce, 10)
# print (mapreduce(X_train_e))

##  Map reduce the data

In [60]:
#train from data in sequence version
trained_e = seq_MapReduce(X_train_e)
trained_p = seq_MapReduce(X_train_p)
trained_X = seq_MapReduce(X_train)
trained_y = y_train.value_counts()
trained_e.loc["r"] = 1
trained_p.loc["r"] = 1
#trained_y = seq_MapReduce(y_train)

In [61]:
# -----------here is the parallel version train---------------------------------------------
# mapreduce = ParallelMapReduce(seq_map, seq_reduce, 10)
# trained_e = mapreduce(X_train_e)
# trained_p = mapreduce(X_train_p)
# trained_X = mapreduce(X_train)
# trained_e.loc["r"] = 1
# trained_p.loc["r"] = 1
# --------------------------------------------------------------------------------------

##  Classifier
This classifier using the MAP rules to guss the class of the target mushroom

In [62]:
# probability dictionary in data frame of the class P
def probability_of_P(row):
    probabilityP = trained_y["p"] / len(y_train)
    probabilityResult = 1
    for item in row.items():
        probabilityResult = probabilityResult * trained_p[item[0]].loc[item[1]]
    probabilityResult = probabilityP * probabilityResult
    if np.isnan(probabilityResult):
        probabilityResult = 0
    #print(probabilityResult)
    return probabilityResult
#X_test.apply(probability_of_P,axis=1)

# probability dictionary in data frame of the class E
def probability_of_E(row):
    probabilityE = trained_y["e"] / len(y_train)
    probabilityResult = 1
    for item in row.items():
        probabilityResult = probabilityResult * trained_e[item[0]].loc[item[1]]
    probabilityResult = probabilityE * probabilityResult
    if np.isnan(probabilityResult):
        probabilityResult = 0
    #print(probabilityResult)
    return probabilityResult
#X_test.apply(probability_of_E,axis=1)

In [63]:
# compare the probability of both
def classifier(row):
    probabilityP = probability_of_P(row)
    probabilityE = probability_of_E(row)
    #print(probabilityP)
    # if probabilityP higher than probabilityE, make the decision p
    if probabilityP > probabilityE:
        result = pd.Series(["p"])
        return result
    # else, make the decision e
    else:
        result = pd.Series(["e"])
        return result


## Generation of test results

In [64]:
test_result_1 = X_test.apply(classifier,axis=1)
resultDict = dict(test_result_1[0])
correctDict = dict(y_test)
#print(dict(test_result_1[0]))
#print(dict(y_test))
#print(len(resultDict.items()& correctDict.items()))
print ()
print ("Probability of correct prediction:")
print ("{:.2%}".format(len(resultDict.items()& correctDict.items()) / len(correctDict)))


Probability of correct prediction:
99.08%
