### Importando as dependecias

In [1]:
import csv
from pyspark.sql.types import StringType
import random
from math import log, sqrt
import logging

### Função que retorna um RDD a partir de um CSV

In [2]:
def csv_to_rdd(filepath):
    rdd = sc.textFile(filepath).mapPartitions(lambda line: csv.reader(line, delimiter=','))
    rdd_2 = rdd.map(lambda row: (int(row[0]), row[1], row[2], int(row[3]), row[4],row[5], row[6], row[7], row[8], int(row[9]),int(row[10]), int(row[11]), row[12], row[13]))
    rdd_3 = rdd_2.map(lambda x: x.strip() if isinstance(x, str) else x)
    return rdd_3

## DecisionTreeClassifier

### Fit

In [3]:
def choose_random_features(row):
    num_features = len(row) - 1
    return random.sample(range(num_features), int(sqrt(num_features)))

In [4]:
def get_features_subset(row, features_indexes):
    return [row[i] for i in features_indexes]

In [5]:
def unique_counts(rows):
    results = {}
    for row in rows:
        r = row[len(row) - 1]
        if r not in results:
            results[r] = 0
        results[r] += 1
    return results

In [6]:
def entropy(rows):
    log2 = lambda x: log(x) / log(2)
    results = unique_counts(rows)
    ent = 0.0
    for r in results.keys():
        p = float(results[r]) / len(rows)
        ent = ent - p * log2(p)
    return ent

In [7]:
def divide_set(rows, column, value):
    split_function = None
    if isinstance(value, int) or isinstance(value, float):
        split_function = lambda row: row[column] >= value
    else:
        split_function = lambda row: row[column] == value

    set1 = [row for row in rows if split_function(row)]
    set2 = [row for row in rows if not split_function(row)]

    return set1, set2

In [8]:
def build_tree(rows,depth):
    if len(rows) == 0:
        return 0
    if depth == 0:
        return 1

    current_score = entropy(rows)
    best_gain = 0.0
    best_criteria = None
    best_sets = None
    column_count = len(rows[0]) - 1

    for col in range(0, column_count):
        column_values = {}
        for row in rows:
            column_values[row[col]] = 1
        for value in column_values.keys():
            set1, set2 = divide_set(rows, col, value)

            p = float(len(set1)) / len(rows)
            gain = current_score - p * entropy(set1) - (1 - p) * entropy(set2)
            if gain > best_gain and len(set1) > 0 and len(set2) > 0:
                best_gain = gain
                best_criteria = (col, value)
                best_sets = (set1, set2)

    if best_gain > 0:
        trueBranch = build_tree(best_sets[0], depth - 1)
        falseBranch = build_tree(best_sets[1], depth - 1)
        return {'col':best_criteria[0], 'value':best_criteria[1], 'results':None, 'tb':trueBranch, 'fb':falseBranch}
    else:
        return {'col':-1, 'value':None, 'results':unique_counts(rows), 'tb':None, 'fb':None}

In [9]:
def tree_fit(rows, features_indexes):
    if len(rows) < 1:
        raise ValueError("Nao ha amostras suficientes no dataset de entrada.")

    rows = [get_features_subset(row, features_indexes) + [row[-1]] for row in rows]
    
    #return (build_tree(rows,-1), features_indexes)
    return (build_tree(rows,-1), features_indexes)

### Predict

In [10]:
def tree_classify(observation, tree):
    if tree['results'] is not None:
        return list(tree['results'].keys())[0]
    else:
        v = observation[tree['col']]
        branch = None
        if isinstance(v, int) or isinstance(v, float):
            if v >= tree['value']:
                branch = tree['tb']
            else:
                branch = tree['fb']
        else:
            if v == tree['value']:
                branch = tree['tb']
            else:
                branch = tree['fb']
        return tree_classify(observation, branch)

In [11]:
def tree_predict(features, features_indexes, tree):
    if not all(i in range(len(features))
            for i in features_indexes):
        raise ValueError("As variaveis passadas nao batem com o conjunto utilizado para treino")
    features = get_features_subset(features, features_indexes)

    return tree_classify(features, tree)

### Random Forest Classifier

In [12]:
#def predict(trees, feature, features_indexes):
def predict(trees, feature):
    predictions = []

    for tree in trees:
        predictions.append(tree_predict(feature, tree[1], tree[0]))

    return max(set(predictions), key=predictions.count)

### Lendo o dataset e quebrando ele em 60 partições para que o Random Forest tenha 60 Árvores

In [13]:
data = csv_to_rdd("../data/income.csv").repartition(60)

### Função que cria lista de listas em cada partição, pois precisamos de um RDD do tipo [indice da partição, ((linha 1),(linha 2)...)]

In [14]:
def dataset_tree(partition):
    final_iterator = []
    for sub_list in partition:
        final_iterator.append(tuple(sub_list))
    return iter(final_iterator)

### Cirando um RDD de treino e um RDD de teste

In [15]:
weights = [.7, .3]
seed = 42
train, test = data.randomSplit(weights, seed)

In [16]:
train.getNumPartitions()

60

In [17]:
train.count()

22727

In [24]:
train_ = train.sample(False,0.5, seed)

In [25]:
train_.count()

11370

In [35]:
train_withReplacement = train.union(train_).repartition(60)

In [36]:
train_withReplacement.count()

34097

In [38]:
train_withReplacement.getNumPartitions()

60

### Executando o Random Forest e calculando tempo de execução

In [39]:
from timeit import default_timer as timer

#Inicio da execucao
start = timer()

#Criando o dataset que sera utilizado em cada uma das 60 arvores
rdd_trees = train_withReplacement.mapPartitionsWithIndex(lambda index, part: (yield index, list(dataset_tree(part))))

#seleciona aleatoriamente as variaveis que serao utilizadas
#features_indexes = choose_random_features(rdd_trees.first()[1][0])

#Calcula e retorna cada uma das 60 árvores
#result_trees = rdd_trees.map(lambda x: tree_fit(x[1], features_indexes)).collect()
result_trees = rdd_trees.map(lambda x: tree_fit(x[1], choose_random_features(x[1][0]))).collect()

#Prediz a classe de cada linha do dataset de teste
#my_predict = test.map(lambda x: (predict(result_trees, x[:-1], features_indexes), x[-1]))
my_predict = test.map(lambda x: (predict(result_trees, x[:-1]), x[-1]))

#Calcula taxa de erro
total_count = my_predict.count()

errors_count = my_predict.filter(lambda x: x[0]!= x[1]).count()

print ('Error rate: ' + str(errors_count / total_count * 100))

end = timer()

print('Execution took: %s secs' % (end - start))

Error rate: 18.954647142566607
Execution took: 16.193546270000297 secs
