In [1]:
from typing import List
from collections import deque
from pandas import DataFrame
from pyspark import SparkContext, SparkConf
from scipy.stats import entropy
from catagories import *
from copy import deepcopy
from operator import add
import pandas as pd
import time
import pyspark
import pickle

In [2]:
class Node:

    def __init__(self, id: str, level: int):
        self.id: str = id
        self.children = dict()  # split attribute value : child
        self.level = level
        self.split_attr = -1
        self.is_leaf = False
        self.prediction = -1

    def __str__(self):
        dashes = '|---' * self.level
        res = f"{str(dashes)}{self.id} \n"
        for child in self.children.values():
            res = res + str(child)
        return res


In [24]:
def build_subtree_in_memory(df: DataFrame, root: Node, available_attributes: List, max_level: int,
                            entropy_threshold: float, cnt_threshold: int):
    _df = df[df[9] == root.id]
    if len(_df) == 0:
        return

    flag = True
    for _ in available_attributes:
        if _:
            flag = False
            break
    if flag:
        root.is_leaf = True
        root.prediction = int(_df.mode(axis=0).at[0, 8])
        return

    if len(_df) <= cnt_threshold or root.level >= max_level:
        root.is_leaf = True
        root.prediction = int(_df.mode(axis=0).at[0, 8])
        return

    arrival_delay_count = [0] * len(time_level_map)
    for i, _ in enumerate(time_level_map.values()):
        arrival_delay_count[i] = len(_df[_df[catagory_map['arrival_delay']] == _])
    arrival_delay_prob = [_ / sum(arrival_delay_count) for _ in arrival_delay_count]
    h_y = entropy(arrival_delay_prob, base=2)

    if h_y < entropy_threshold:
        root.is_leaf = True
        root.prediction = int(_df.mode(axis=0).at[0, 8])
        return

    conditional_entropies = [float('inf')] * (len(catagory_map) - 1)
    for k, v in catagory_map.items():  # iterate through all the possible attributes
        if v > 7:
            continue
        if available_attributes[v]:
            h_y_V = []
            for val in catagories[v]:  # iterate through all the possible values for an attribute
                # X = v
                _df_ = _df[_df[v] == val]
                if len(_df_) == 0:
                    continue
                # compute H(Y|X=v)
                conditional_arrival_delay_count = [0] * len(time_level_map)
                for i, _ in enumerate(time_level_map.values()):
                    conditional_arrival_delay_count[i] = len(_df_[_df_[catagory_map['arrival_delay']] == _])
                conditional_arrival_delay_prob = [_ / sum(conditional_arrival_delay_count) for _ in
                                                  conditional_arrival_delay_count]
                h_y_v = entropy(conditional_arrival_delay_prob, base=2)
                h_y_V.append((h_y_v, len(_df_) / len(_df)))
            # compute H(Y|X)
            h_y_x = 0
            for a, b in h_y_V:
                h_y_x += a * b
            conditional_entropies[v] = h_y_x
    max_ig, split_attr = float('-inf'), -1
    for i in range(len(conditional_entropies)):
        if h_y - conditional_entropies[i] > max_ig:
            max_ig = h_y - conditional_entropies[i]
            split_attr = i
    root.split_attr = split_attr
    available_attributes[split_attr] = False
    for idx, val in enumerate(catagories[split_attr]):
        new_id = root.id + '#' + str(idx)
        for _ in range(len(df)):
            if df.at[_, split_attr] == val and df.at[_, 9] == root.id:
                df.at[_, 9] = new_id
        root.children[val] = Node(new_id, root.level + 1)
        root.split_attr = split_attr
        build_subtree_in_memory(df, root.children[val], deepcopy(available_attributes), max_level, entropy_threshold,
                                cnt_threshold)

In [25]:
def find_best_split(rdd, available_attributes, entropy_threshold):
    arrival_delay_cnt = rdd.map(lambda x: (x[8], 1)).reduceByKey(add).map(lambda x: x[1]).collect()
    arrival_delay_prob = [_ / sum(arrival_delay_cnt) for _ in arrival_delay_cnt]
    h_y = entropy(arrival_delay_prob, base=2)
    if h_y <= entropy_threshold:
        return -1
    total_count = rdd.count()
    splits = dict()
    for attr, values in catagories.items():
        if attr < len(available_attributes) and available_attributes[attr]:
            h_y_v = []
            for value in values:
                rdd_value = rdd.filter(lambda x: x[attr] == value)
                value_cnt = rdd_value.map(lambda x: (x[8], 1)).reduceByKey(add).map(lambda x: x[1]).collect()
                value_prob = [_ / sum(value_cnt) for _ in value_cnt]
                h_y_v.append((rdd_value.count() / total_count, entropy(value_prob, base=2)))
            h_y_x = 0
            for k, v in h_y_v:
                h_y_x += k * v
            splits[attr] = h_y_x
    best_split, best_ig = -1, float('-inf')
    for attr, entro in splits.items():
        if h_y - entro > best_ig:
            best_ig = h_y - entro
            best_split = attr
    return best_split

In [26]:
def get_mode(rdd):
    return sorted(rdd.map(lambda x: (x[8], 1)).reduceByKey(add).collect(), key=lambda x: x[1])[-1][0]

In [27]:
def build_tree(rdd, root: Node, max_level: int, entropy_threshold: float, cnt_threshold: int):
    queue = deque()
    level = 0
    attributes = [True] * (len(catagory_map) - 1)
    rdd.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
    queue.append((root, attributes, rdd))
    while queue:
        size = len(queue)
        level += 1
        print(f"level:{level}")
        for _ in range(size):
            cur, available_attributes, cur_rdd = queue.popleft()
            print(f"current Node: {cur.id}")
            cnt = cur_rdd.count()
            # print(f"current rdd size: {cnt}")
            if 0 < cnt <= 5000:
                df = pd.DataFrame(cur_rdd.collect())
                build_subtree_in_memory(df, cur, deepcopy(available_attributes), max_level, entropy_threshold,
                                        cnt_threshold)
            elif cnt > 5000:
                if cur.level >= max_level:
                    cur.is_leaf = True
                    cur.prediction = get_mode(cur_rdd)
                    cur_rdd.unpersist()
                    continue
                split_attr = find_best_split(cur_rdd, available_attributes, entropy_threshold)
                if split_attr == -1:
                    cur.is_leaf = True
                    cur.prediction = get_mode(cur_rdd)
                    cur_rdd.unpersist()
                    continue
                available_attributes[split_attr] = False
                for value in catagories[split_attr]:
                    new_id = cur.id + '#' + str(value)
                    child = Node(new_id, level)
                    cur.split_attr = split_attr
                    cur.children[value] = child
                    new_rdd = cur_rdd.filter(lambda x: x[split_attr] == value).map(
                        lambda x: x[:9] + [new_id]).persist(pyspark.StorageLevel.MEMORY_AND_DISK)
                    # print(f"attribute {split_attr}, value {value}, number ", new_rdd.count())
                    if new_rdd.count() > 0:
                        queue.append((child, deepcopy(available_attributes), new_rdd))
                    else:
                        new_rdd.unpersist()
            cur_rdd.unpersist()

In [3]:
def find_prediction(model, data: List):
    if len(data) == 0:
        return -1
    cur = model
    while cur and not cur.is_leaf:
        idx = data[cur.split_attr]
        # print(cur.split_attr)
        # print(idx)
        # print(cur.children)
        cur = cur.children[idx]
    if not cur:
        return -1
    else:
        return cur.prediction

In [34]:
def train(max_level, entropy_threshold, cnt_threshold):
    appName = "FlightDelayPrediction"
    conf = SparkConf() \
        .setAppName(appName) \
        .set("spark.executor.memory", "16g") \
        .set("spark.driver.memory", "16g") \
        .set("spark.memory.offHeap.enabled", True) \
        .set("spark.memory.offHeap.size", "16g")
    sc = SparkContext(conf=conf)
    start = time.time()
    model = Node('root', 0)
    data_path = './data/train.csv'
    origin_data = sc.textFile(data_path).map(lambda line: line.split(',')).map(
        lambda x: [eval(item) for item in x] + ['root'])
    build_tree(origin_data, model, max_level, entropy_threshold, cnt_threshold)
    end = time.time()
    total_time = end - start
    print(f"Total time: {total_time // 60} min {total_time % 60} s")
    print(model)
    sc.stop()
    file = './model_0_5.pkl'
    with open(file, 'ab') as f:
        pickle.dump(model, f)

In [37]:
train()

-1

In [4]:
def check_accuracy():
    with open('./model.pkl', 'rb') as f:
        model = pickle.load(f)
        test_data = pd.read_csv("./data/test.csv", header=None)
        total_data = len(test_data)
        correct_num = 0
        result = dict()
        correct = dict()
        true = dict()
        for i in range(1, 10):
            result[i] = 0;
            correct[i] = 0;
            true[i] = 0
        for i in range(total_data):
            data_point = test_data.iloc[i,:].values
            try:
                pred = find_prediction(model, data_point)
                result[pred] += 1
                true[data_point[-1]] += 1
            except:
                print(data_point)
            if pred == data_point[8]:
                correct_num += 1
                correct[pred] += 1
        print(result)
        print(correct)
        print(true)
        print(f"Prediction Accuracy: {correct_num / total_data}")

In [6]:
check_accuracy()

[2 8 3 8 3 3 3 4 7]
[3 3 6 5 3 1 4 1 6]
{1: 9, 2: 374, 3: 646857, 4: 2686093, 5: 1161053, 6: 30216, 7: 315278, 8: 286301, 9: 298423}
{1: 5, 2: 182, 3: 233746, 4: 1020333, 5: 469821, 6: 10803, 7: 124852, 8: 166697, 9: 258695}
{1: 665, 2: 101945, 3: 875369, 4: 1528655, 5: 1407082, 6: 545335, 7: 370960, 8: 294037, 9: 300556}
Prediction Accuracy: 0.42125345140273784


In [47]:
appName = "FlightDelayPrediction"
conf = SparkConf() \
    .setAppName(appName) \
    .set("spark.executor.memory", "16g") \
    .set("spark.driver.memory", "16g") \
    .set("spark.memory.offHeap.enabled", True) \
    .set("spark.memory.offHeap.size", "16g")
sc = SparkContext.getOrCreate(conf=conf)
data_path = './data/train.csv'
origin_data = sc.textFile(data_path).map(lambda line: line.split(',')).map(lambda x: (eval(x[-1]), 1)).reduceByKey(add)
origin_data.collect()

[(1, 6266),
 (2, 917678),
 (3, 7890911),
 (4, 13749076),
 (5, 12650664),
 (6, 4900722),
 (7, 3347555),
 (8, 2652638),
 (9, 2705917)]

In [7]:
with open('./model.pkl', 'rb') as f:
    model = pickle.load(f)
    print(model)

root 
|---root#1 
|---root#2 
|---|---root#2#0 
|---|---|---root#2#0#0 
|---|---|---root#2#0#1 
|---|---|---|---root#2#0#1#0 
|---|---|---|---root#2#0#1#1 
|---|---|---|---root#2#0#1#2 
|---|---|---|---root#2#0#1#3 
|---|---|---root#2#0#2 
|---|---|---root#2#0#3 
|---|---|---|---root#2#0#3#0 
|---|---|---|---root#2#0#3#1 
|---|---|---|---root#2#0#3#2 
|---|---|---|---root#2#0#3#3 
|---|---|---|---root#2#0#3#4 
|---|---|---|---root#2#0#3#5 
|---|---|---|---root#2#0#3#6 
|---|---|---|---root#2#0#3#7 
|---|---|---root#2#0#4 
|---|---|---root#2#0#5 
|---|---|---|---root#2#0#5#0 
|---|---|---|---root#2#0#5#1 
|---|---|---|---root#2#0#5#2 
|---|---|---root#2#0#6 
|---|---|---root#2#0#7 
|---|---root#2#1 
|---|---root#2#2 
|---|---|---root#2#2#0 
|---|---|---root#2#2#1 
|---|---|---root#2#2#2 
|---|---|---root#2#2#3 
|---|---|---|---root#2#2#3#0 
|---|---|---|---root#2#2#3#1 
|---|---|---|---root#2#2#3#2 
|---|---|---root#2#2#4 
|---|---|---root#2#2#5 
|---|---|---root#2#2#6 
|---|---|---root