In [2]:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()

In [3]:
txtFile=sc.textFile('../data/covtype.csv')
#Convert it into RDD of lists 
rdd=(txtFile.map(lambda x:x.split())
    .map(lambda x: x[0].strip("'").split(","))
    .map(lambda x:[float(v) for v in x])
    .map(lambda x: (x[-1]-1,x[0:-1])))

In [4]:
import itertools
soil_list =[]
for k in range(40):
    string = 'Soil_Type_' + str(k+1)
    soil_list.append(string)
WA_list =[]
for k in range(4):
    string = 'WA_' + str(k+1)
    WA_list.append(string)
names = [['Elevation'], ['Aspect'], ['Slope'], ['HDHyrdo'], ['VDHydro'], ['HDRoadways'], \
         ['9amHills'],['NoonHills'], ['3pmHills'], ['HDFirePoints'], WA_list,\
         soil_list, ['Cover_Type']]
columns = list(itertools.chain(*names))

In [5]:
import random

In [90]:
#selecting random features
m = 5 #No of features
indices = range(len(columns)-1)
random.shuffle(indices)
sampled_c = [columns[j] for j in indices[:m]]

In [91]:
#Code for implementing in rdd
%time sampled_rdd = rdd.map(lambda x: (x[0],[x[1][j] for j in indices[:m]]))

CPU times: user 24 µs, sys: 1 µs, total: 25 µs
Wall time: 28.1 µs


In [8]:
#Imports - move later to top
import numpy as np

In [9]:
#total counts of the data
t_count = rdd.count()

In [87]:
##Implementing the ID3 and testing the Entropy
test_index = 0
%time entrop_score = entropy_score(sampled_rdd,test_index)
%time entrop_score = entrop_score.take(1)[0][1]

CPU times: user 19.9 ms, sys: 5.2 ms, total: 25.1 ms
Wall time: 40 ms
CPU times: user 8.84 ms, sys: 2.15 ms, total: 11 ms
Wall time: 8 s


In [10]:
def entropy_score(sampled_rdd,index):
    new_rdd = sampled_rdd.map(lambda x:x[1][index]).map(lambda x:(x,1.0)).reduceByKey(lambda a,b:a+b)\
            .map(lambda x:(x[0],(-x[1]/t_count)*(np.log2(x[1]/t_count))))\
            .map(lambda x:(index,x[1])).reduceByKey(lambda a,b:a+b)
    return new_rdd

In [28]:
def get_all_scores(sampled_rdd,m):
    new_rdd = entropy_score(sampled_rdd,0)
    for idx in range(1,m):
        rdd_to_join = entropy_score(sampled_rdd,idx)
        new_rdd = new_rdd.union(rdd_to_join)
    return new_rdd

In [37]:
%time entropy_score(sampled_rdd,1)

[(1, 0.14900598458760123)]

In [29]:
%time new_rdd = sampled_rdd.map(lambda x:x[1][test_index]).map(lambda x:(x,1.0)).reduceByKey(lambda a,b:a+b)\
            .map(lambda x:(x[0],(-x[1]/t_count)*(np.log2(x[1]/t_count))))\
            .map(lambda x:(1,x[1])).reduceByKey(lambda a,b:a+b)

CPU times: user 14.6 ms, sys: 3.09 ms, total: 17.7 ms
Wall time: 31.9 ms


In [30]:
%time print get_all_scores(sampled_rdd,m).collect()

[(0, 0.16207143284308864), (3, 0.027547699424360673), (6, 0.0025077543955656055), (9, 0.0095463517593474476), (1, 0.14900598458760123), (4, 6.5981678852644112), (7, 11.375626917177499), (2, 0.71862713465838812), (5, 0.031695711359855931), (8, 7.2756316788852136)]
CPU times: user 159 ms, sys: 36 ms, total: 195 ms
Wall time: 1min 31s


In [88]:
def infoGain(sampled_rdd, attr=0, target_attr=0):
    subset_entropy = 0
    val_freq = sampled_rdd.map(lambda x:x[1][attr]).map(lambda x:(x,1.0)).reduceByKey(lambda a,b:a+b)\
            .map(lambda x:(x[0],x[1]/t_count))
    val_freq = val_freq.collectAsMap()
    classes = sampled_rdd.map(lambda x: x[1][attr]).distinct().collect()
    print 'numClasses:',len(classes)
    for cls in classes:
        sub_rdd=sampled_rdd.filter(lambda x: x[1][attr]==cls)
        try:
            if sub_rdd.isEmpty() == True:
                continue
        except:
            print "Error in calculation. Moving on"
            continue
        subs_entropy = entropy_score(sub_rdd,target_attr)
        entropy = subs_entropy.collect()
        entropy = entropy[0][1]
        subset_entropy += entropy*val_freq[cls]
    return entropy_score(sampled_rdd,target_attr).collect()[0][1] - subset_entropy




In [41]:
%time infoGain(sampled_rdd,attr=0,target_attr=1)

{0.0: 0.9762380122957873, 1.0: 0.02376198770421265}
0.530139880469
0.544515664964
CPU times: user 97.2 ms, sys: 22.2 ms, total: 119 ms
Wall time: 44.1 s


-0.39550968037623996

In [78]:
def choose_attribute(sampled_rdd, attributes, target_attr):
    best_gain = 0.0
    best_attr = None
    for attr in attributes:
        gain = infoGain(sampled_rdd,attr,target_attr)
        print attr,gain
        if (gain >= best_gain and attr != target_attr):
            best_gain = gain
            best_attr = attr
    
    return best_gain,best_attr

In [92]:
indices[:m]

[50, 41, 34, 39, 27]

In [93]:
%time choose_attribute(sampled_rdd,range(m),0)

numClasses: 2
0 0.00560298911277
numClasses: 2
1 -0.00235627158851
numClasses: 2
2 -0.00208572828512
numClasses: 2
3 -0.00650925980561
numClasses: 2
4 -0.00148830226104
CPU times: user 505 ms, sys: 114 ms, total: 619 ms
Wall time: 3min 46s


(0.0, None)