In [2]:
import os.path
import numpy as np
import copy
import networkx as nx
import os
import pickle
import time

In [3]:
arquivo = os.path.join('projeto.csv')
dataRDD = (sc.textFile(arquivo, 8))
RDD1 = dataRDD.map(lambda l: l.split(',')).map(lambda l: {'label':float(l[0]),'features':[float(l[1]),float(l[2]),l[3]]})
RDD = RDD1.filter(lambda v: v['label']<=1.61) #removing outliers
weights = [.999,.001]
seed = 42
lpRDD, lpTest = RDD.randomSplit(weights, seed)

In [4]:
def preparaMMU(t):
    """Prepares data from the function minmax_unique.

    Args:
        t (dict): An example from the dataset

    Returns:
        tuple: A tuple with tuples for the values of all numeric attributes 
               and a set of the categorical attribute
                 ((attrib0,attrib0),(attrib1,attrib1), set((attrib2,)))
    """    
    return ((t['features'][0],t['features'][0]),(t['features'][1],t['features'][1]), set((t['features'][2],)))
    
def minmax_unique(t1,t2):
    """Calculates the min and max of the numeric attributes and 
       determine the unique categoric values in a single pass on the dataset.

    Args:
        t1,t2 (tuple): tuples received from the function preparaMMU

    Returns:
        tuple: A tuple with values((min value of attrib 0,max value of attrib 0),
                                   (min value of attrib 1,max value of attrib 1),
                                   {set with unique values of attrib 2})
    """
    return ((min(t1[0][0],t2[0][0]),max(t1[0][1],t2[0][1])),
            (min(t1[1][0],t2[1][0]),max(t1[1][1],t2[1][1])),
             t1[2].union(t2[2]))

In [5]:
def makeSplits(lpRDD):
    """Determine splits for all attributes.

    Args:
        lpRDD (RDD): The dataset. It can be complete or splited.

    Returns:
        tuple: A tuple with two lists (all_features, all_splits)
    """
    (min0,max0),(min1,max1),periods =lpRDD.map(preparaMMU).reduce(minmax_unique)
    
    periods = list(periods)

    
    distsplits = [i*(max0-min0)/5. for i in range(1,5)]
    timesplits = [i*(max1-min1)/5. for i in range(1,5)]    
    perisplits = [tuple([periods[j] for j in range(i+1)]) for i in range(len(periods)-1)]

    feats = [0,1,2,3]
    splits = [distsplits,timesplits,perisplits,['full']] 
    return (feats,splits)

In [6]:
def contador(lp, feats, splits):
    """Prepare data for the soma function. Given a feature and a split candidate, returns tuples for the left 
       and right splits based on the feature and split value.

    Args:
        lp (dict): An example from the dataset
        feats (list): list with features indexes on lp
        splits (list): list of lists with splits candidates for all attributes

    Returns:
        r (list): list of tuples, where each tuple contains:
        
        for numeric attributes:
            ((feat,split),(0,label.value),(0,label.value^2),(0,1)) if feat.value >= split-num
            ((feat,split),(label.value,0),(label.value^2,0),(1,0)) if feat.value < split-num
        for categoric attributes:
            ((feat,split),(0,label.value),(0,label.value^2),(0,1)) if feat.value in split-cat
            ((feat,split),(label.value,0),(label.value^2,0),(1,0)) if feat.value not in split-cat
        for the target attribute:
            ((label,'full'),(0,label.value),(0,label.value^2),(0,1))
        
    """
    r = []
    for feat in feats:
        splitss = splits[feat]
        for split in splitss:
            if feat==3:
                r.append( ((feat,split), ((0,lp['label']), (0,lp['label']**2), (0,1))))
            elif feat==0 or feat==1:
                if lp['features'][feat] >= split:
                    r.append( ((feat,split), ((0,lp['label']), (0,lp['label']**2), (0,1))))
                else:
                    r.append( ((feat,split), ((lp['label'],0), (lp['label']**2,0), (1,0))))
                
            elif feat==2:                
                if lp['features'][feat] in split:
                    r.append( ((feat,split), ((0,lp['label']), (0,lp['label']**2), (0,1))))
                else:
                    r.append( ((feat,split), ((lp['label'],0), (lp['label']**2,0), (1,0))))
                
    return r

def soma(t1,t2):
    """Sums tuples with the same key received from contador function using reduceByKey.
       Number of tuples returned equals to the sum of the number of splits of each attribute plus one.

    Args:
        t1 (tuple): A tuple containing values from the left or right split 
                    or all data for the target value
        t2 (tuple): A tuple containing values from the left or right split 
                    or all data for the target value

    Returns:
        tuple of tuples: Tuple indexed by key value (feat, split) with values:
                         ((sum of left split values,sum of right split values),
                          (squared sum of left split values, squared sum of right split values),
                          (count of left split values, count of right split values))
                        or
                         ((0,sum of target values),
                          (0, squared sum of target values),
                          (0, count of target values))
                        
    """
    return  ((t1[0][0]+t2[0][0],t1[0][1]+t2[0][1]), 
             (t1[1][0]+t2[1][0],t1[1][1]+t2[1][1]),
             (t1[2][0]+t2[2][0],t1[2][1]+t2[2][1]))

In [7]:
def calc_stats(t):
    """Calculates the mean, variance and the coefficient of variation (CV) for each (feat, split) 
       and for the full (unsplited) data.

    Args:
        t (tuple): A tuple received from the function soma

    Returns:
        A tuple with tuples ((feat,split),(left variance, left CV, left count),
                                          (right variance, right CV, right count) )
    """
    if t[1][2][0] > 0:
        mmenor = t[1][0][0] / t[1][2][0]
        sdmenor = (t[1][1][0] - (t[1][0][0]**2 / t[1][2][0]))/(t[1][2][0])
        cvmenor = np.sqrt(sdmenor)/mmenor*100
        menor = (sdmenor, cvmenor,t[1][2][0])
    else:
        menor = (0, 0, 0)
    
    if t[1][2][1] > 0:    
        mmaior = t[1][0][1] / t[1][2][1]
        sdmaior = (t[1][1][1] - (t[1][0][1]**2 / t[1][2][1]))/(t[1][2][1])
        cvmaior = np.sqrt(sdmaior)/mmaior*100
        maior = (sdmaior, cvmaior,t[1][2][1])
    else:
        maior = (0, 0, 0)
    return (t[0], menor, maior)

def informationGain(t,c):
    """Calculates de Information Gain.

    Args:
        t (tuple): tuple with ((feat,split),(left variance, left CV, left count),
                                                 (right variance, right CV, right count))
        c (tuple): tuple with ((target,full),(0, 0, 0),(full variance, full CV, full count))

    Returns:
        tuple with ((feat,split), informationGain, (left CV,right CV))
    """
    return ((t[0], c[2][0] - (t[1][2]/c[2][2])*t[1][0] - (t[2][2]/c[2][2])*t[2][0], (t[1][1], t[2][1]))) 

In [8]:
def infoGainSplit(lpRDD,feats,splits):
    """Select the tuple (feat,split) with the greatest information gain

    Args:
        lpRDD (RDD): The dataset. It can be complete or splited.
        feats (list): List with the attribute indexes.
        splits (list): List with all the splits candidates.

    Returns:
        e (tuple): A tuple containing the split with max information gain. Also contains left and right CVs.
                   ((feat,split), maxInformationGain, (left CV,right CV))
    """
    results = lpRDD.flatMap(lambda lp: contador(lp,feats,splits)).reduceByKey(soma)    
    statistics = results.map(lambda c: calc_stats(c))    
    classe = statistics.filter(lambda x: x[0][0]==3).collect()[0]
    es = statistics.filter(lambda x: x[0][0]<3).map(lambda x: informationGain(x,classe)).collect()          
    e = max(es, key = lambda t: t[1]) #atributo / split escolhido   
    return e

In [9]:
def splitData(lpRDD, e):
    """Splits the data given a chosen (feat,split).

    Args:
        lpRDD (RDD): The dataset. It can be complete or splited.
        e (tuple): Chosen (feat,split) with max Information Gain.

    Returns:
        tuple: A tuple with RDDs splited at attribute feat and value split: (leftRDD,rightRDD)
    """
    if e[0][0]<2:
            menorRDD = lpRDD.filter(lambda v: v['features'][e[0][0]]<e[0][1])
            maiorRDD = lpRDD.filter(lambda v: v['features'][e[0][0]]>e[0][1])
    elif e[0][0]==2:
            menorRDD = lpRDD.filter(lambda v: v['features'][e[0][0]] in e[0][1])
            maiorRDD = lpRDD.filter(lambda v: v['features'][e[0][0]] not in e[0][1])
    return (menorRDD,maiorRDD)

In [10]:
def makeTree(lpRDD, maxDepth=1, currDepth=0, st=None, arvore={}, pos=0):
    """Build the Decision Tree model. This function should not be called by the user. 
       Call buildTree instead.

    Args:
        lpRDD (RDD): The dataset. It can be complete or splited.
        maxDepth (int): the max depth the tree can grow.
        currDepth (int): Current depth of the tree. Should not be changed.
        st (tuple): Tuple with the statistics of the chosen split. Should not be changed.
        arvore (dict): The model which will be returned at the end of the processing. Should not be changed.
        pos (int): Position of the node on the tree. Should not be changed.

    Returns:
        str: A string with 's' added to it.
    """
    if st!=None:
        if st[0]<10 or maxDepth<currDepth:
            media = lpRDD.map(lambda x: x['label']).mean()
            arvore[pos] = media
            return arvore 
    

    feats,splits = makeSplits(lpRDD)
    
    e = infoGainSplit(lpRDD,feats,splits)
    
    menorRDD,maiorRDD = splitData(lpRDD, e)
    
    arvore[pos] = e[0]
    
    amenor = makeTree(menorRDD, maxDepth, currDepth+1,(e[2][0], e[1]),arvore,pos=(2*pos+1))
    amaior = makeTree(maiorRDD, maxDepth, currDepth+1,(e[2][1], e[1]),arvore,pos=(2*pos+2))
    return arvore

In [11]:
def buildTree(lpRDD, maxDepth=1):
    """Build the Decision Tree model.

    Args:
        lpRDD (RDD): The full dataset.
        maxDepth (int): the max depth the tree can grow (deeper leafs will be on depth maxDepth+1)

    Returns:
        dict: The decision Tree model.
    """
    return makeTree(lpRDD, maxDepth=1, currDepth=0, st=None, arvore={}, pos=0)
    

In [19]:
def predict(arvore, lp):
    def pred(arvore, lp, i=0):
        """Makes a prediction.

        Args:
            arvore (dict): The decision Tree model.
            lp (dict): A new data we wish to determine the target value.
            i (int): position on the tree. Should not be changed by the user.

        Returns:
            str: A string with 's' added to it.
        """
        if type(arvore[i]) != tuple:
            if arvore[i]!=0:
                return (lp['label'], arvore[i])
            else:
                return (lp['label'], lp['label'])

        node = arvore[i]

        if node[0]<2:
            if lp['features'][node[0]]<node[1]:
                return pred(arvore, lp, 2*i+1)
            else:
                return pred(arvore, lp, 2*i+2)
        elif node[0]==2:
            if lp['features'][node[0]] in node[1]:
                return pred(arvore, lp, 2*i+1)
            else:
                return pred(arvore, lp, 2*i+2)
    return pred(arvore, lp)

In [13]:
def calcError(lpTest, arvore):
    """Calculates the Residual Sum of Squares for predictions on a test set.

    Args:
        lpTest (RDD): A test dataset with data unseen on training.
        arvore (dict): The decision Tree model.

    Returns:
        int: The Residual Sum of Squares of the predictions.
    """
    sqr = lpTest.map(lambda x: predict(arvore, x)).map(lambda x: (x[0]-x[1])**2).sum()
    return sqr

In [14]:
def savetree(arvore, filename):
    """Saves the model to disk

    Args:
        arvore (dict): The decision Tree model.
        filename (str): String with the saved model filename

    """
    with open(filename, 'wb') as handle:
        pickle.dump(arvore, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [15]:
def readtree(filename):
    """Load the model from disk.

    Args:
        filename (str): String with the saved model filename

    Returns:
        arvore (dict): The decision Tree model.
    """
    with open(filename, 'rb') as handle:
        arvore = pickle.load(handle)
    return arvore

In [16]:
def treeimage(arvore, filename):
    """Generates a visualization of the model and save to an image file.
       Needs PyDot and standalone application Graphviz installed.

    Args:
        arvore (dict): The decision Tree model.
        filename (str): the image filename.
        
    """
    G = nx.DiGraph()
    for v,l in arvore.items():
        if type(l)==tuple:
            if type(l[1])!=tuple:
                G.add_node(v,label=(l[0],'{0:.2f}'.format(l[1])))
            else:
                G.add_node(v,label=l)
            
        else:
            G.add_node(v,label='{0:.2f}'.format(l))

    for v in G.nodes():
    
        if 2*v+1 in G:
            G.add_edge(v,2*v+1)
        if 2*v+2 in G:
            G.add_edge(v,2*v+2)

    os.environ["PATH"] += os.pathsep + 'C:/Program Files (x86)/Graphviz2.38/bin/'
    p=nx.drawing.nx_pydot.to_pydot(G)
    p.write_png(filename)

In [17]:
ti=time.time()
arvore = buildTree(lpRDD,maxDepth=1)
tf = time.time()
print('Tempo total de construção paralelizado (local[4]): %.2f segundos'%(tf-ti))

Tempo total de construção paralelizado (local[4]): 297.28 segundos


In [20]:
print(calcError(lpTest, arvore))

103.74703903274495
