In [None]:
import pandas as pd
import numpy as np
from path import Path  # pip install --user path.py
import re
from IPython.display import display
from pprint import pprint
import netCDF4
from IPython.core.debugger import Pdb
from collections import namedtuple
from random import randint, choice

## Work with topology

In [None]:
def build_extended_node_pairs(node_pairs):
    """Return a lookup table of node pairs that contains both directions.

    `node_pairs` must have `node1` and `node2` columns. Every row is
    duplicated with the two node columns swapped, rows that repeat an
    already seen (node1, node2) combination are discarded, and the result
    is indexed by (node1, node2). All remaining columns are carried over
    unchanged.
    """
    swapped = node_pairs.rename(columns={"node1": "node2", "node2": "node1"})
    both_directions = pd.concat(
        [node_pairs, swapped],
        ignore_index=True,
        verify_integrity=True,
    )
    # A pair whose two nodes are equal is its own mirror image, so it shows up
    # twice after the concat; keep only the first occurrence of each pair.
    unique_pairs = both_directions.drop_duplicates(subset=["node1", "node2"])
    return unique_pairs.set_index(["node1", "node2"], verify_integrity=True)

In [None]:
# Load the node-pair table (without its "shortest_path" column) and the table
# of classes from pickles in the current working directory.
# NOTE(review): unpickling can execute arbitrary code -- only load trusted files.
node_pairs = pd.read_pickle("paths_with_classes.pkl").drop("shortest_path", axis=1)
classes = pd.read_pickle("classes.pkl")

In [None]:
# Pair table indexed by (node1, node2) and containing both directions of every pair.
extended_node_pairs = build_extended_node_pairs(node_pairs)

## This program only works with benchmark results that have the following properties

In [None]:
# Grid of message lengths the benchmark is expected to have been run with.
MSG_LEN_START = 0      # first message length (inclusive)
MSG_LEN_END = 10000    # exclusive upper bound: the last message length is 9900
MSG_LEN_STEP = 100     # distance between consecutive message lengths

# Every message length of the grid, in increasing order.
LENGTHS = range(MSG_LEN_START, MSG_LEN_END, MSG_LEN_STEP)
# Zero-based index of the last message length in LENGTHS.
STEPS_COUNT = len(LENGTHS) - 1

# The grid must end exactly on MSG_LEN_END, i.e. the step must divide the span.
assert MSG_LEN_END == MSG_LEN_START + MSG_LEN_STEP * (STEPS_COUNT + 1)

## Code for loading benchmark results

In [None]:
# Result of one benchmark run:
#   hostnames -- the hosts that took part in the run
#   medians   -- dict: message length -> table with columns node1, node2, ping
TestResults = namedtuple("TestResults", "hostnames medians")

In [None]:
def matrix_to_table(matrix):
    """Unpivot a square ping matrix into a long table.

    Each cell of `matrix` becomes one row of the result: `node1` is the row
    label of the cell, `node2` its column label and `ping` the cell value.
    The result has a fresh integer index.
    """
    cells = matrix.stack()
    return cells.rename_axis(["node1", "node2"]).reset_index(name="ping")

In [None]:
def read_benchmark_hostnames(path_to_file):
    """Lazily yield the short host name found at the start of every line.

    `path_to_file` must provide a `lines()` method returning the lines of the
    hosts file. Each line has to start with `n` followed by five digits and a
    dot (e.g. "n48003."); only the part before the dot is yielded. A line that
    does not match makes the generator fail with AttributeError when it is
    reached.
    """
    hostname_pattern = re.compile(r"^(n\d{5})\.")
    return (
        hostname_pattern.match(raw_line).group(1)
        for raw_line in path_to_file.lines()
    )

In [None]:
def import_data(directory):
    """Load one benchmark run from `directory` and return it as TestResults.

    Reads host names from "network_hosts.txt" and median pings from
    "network_median.nc". Fails an assertion unless the file describes a
    one-to-one test (test_type == 1) over exactly the message-length grid
    defined by MSG_LEN_START / MSG_LEN_END / MSG_LEN_STEP and its process
    count equals the number of host names.

    The returned `medians` is a dict: message length -> table with columns
    node1, node2, ping.
    """
    hosts_file = directory.joinpath("network_hosts.txt")
    hostnames = tuple(read_benchmark_hostnames(hosts_file))

    medians_file = directory.joinpath("network_median.nc")
    with netCDF4.Dataset(medians_file, "r") as dataset:
        step_len = dataset["step_length"][0]
        start_len = dataset["begin_mes_length"][0]
        end_len = dataset["end_mes_length"][0]

        # Refuse data that does not match what the rest of the notebook assumes.
        assert len(hostnames) == dataset["proc_num"][0]
        assert dataset["test_type"][0] == 1  # one-to-one
        assert start_len == MSG_LEN_START
        assert end_len == MSG_LEN_END
        assert step_len == MSG_LEN_STEP

        # One square matrix per message length, labelled with the host names on
        # both axes, immediately unpivoted into a (node1, node2, ping) table.
        # Everything is read while the dataset is still open.
        tables = {}
        for length, index in zip(LENGTHS, range(STEPS_COUNT + 1)):
            matrix = pd.DataFrame(
                dataset["data"][index], index=hostnames, columns=hostnames
            )
            tables[length] = matrix_to_table(matrix)

    return TestResults(hostnames=hostnames, medians=tables)

In [None]:
# Load one benchmark run.
# NOTE(review): absolute, machine-specific path -- adjust before running elsewhere.
test_results = import_data(Path("/home/shibbiry/Dropbox/documents/msu/bachelors_thesis_cluster_topology/test_results/2017-02-12__118_nodes/"))

## Predict using topology and benchmark results

In [None]:
def check_all_classes_covered(extended_node_pairs, hostnames, all_classes=None):
    """Fail an assertion if some class of pairs was not covered by the test.

    Parameters:
        extended_node_pairs: table indexed by (node1, node2) with a `class_`
            column.
        hostnames: hosts that took part in the benchmark; every ordered
            combination of two of them counts as a tested pair.
        all_classes: sized collection of all known classes. Defaults to the
            notebook-level `classes` table.

    Raises:
        AssertionError: the number of distinct classes among the tested pairs
            differs from `len(all_classes)`.
    """
    if all_classes is None:
        all_classes = classes  # notebook-level table of all known classes

    hostnames = tuple(hostnames)  # iterated twice below, so must be re-iterable
    pairs_tested = pd.DataFrame(
        [(node1, node2) for node1 in hostnames for node2 in hostnames],
        columns=["node1", "node2"],
    )
    pairs_tested_with_class = pairs_tested \
        .join(extended_node_pairs, on=["node1", "node2"])

    # The left join leaves NaN for tested pairs that are absent from the
    # topology table (e.g. a node paired with itself). NaN is not a class and
    # must not be counted as one.
    covered_classes = pairs_tested_with_class["class_"].dropna().unique()
    assert len(covered_classes) == len(all_classes), \
        "benchmark covers {} classes of pairs, expected {}".format(
            len(covered_classes), len(all_classes)
        )

In [None]:
class Predictor():
    """Predicts ping for a packet with specific message_size between 2 nodes,
    measured in seconds.

    The prediction for a pair is the mean, over all benchmarked pairs of the
    same class, of the median pings measured for that message length.
    """
    def __init__(self, extended_node_pairs, test_results):
        """Build the (msg_len, class_) -> mean ping lookup table.

        Parameters:
            extended_node_pairs: table indexed by (node1, node2) with a
                `class_` column.
            test_results: TestResults whose `medians` maps every message
                length in LENGTHS to a table with columns node1, node2, ping.

        Raises:
            AssertionError: the benchmark does not cover every class
                (see check_all_classes_covered).
        """
        per_length_means = {}
        for msg_len in LENGTHS:
            pings_with_class = test_results.medians[msg_len] \
                .join(extended_node_pairs, on=["node1", "node2"], how="left")
            # Keep only class_ and ping before averaging: the string columns
            # node1/node2 cannot be averaged (pandas >= 2.0 raises TypeError
            # instead of silently dropping them).
            per_length_means[msg_len] = pings_with_class[["class_", "ping"]] \
                .groupby("class_") \
                .mean()

        # reverse lookup table (by message length and class)
        self._data = pd.concat(per_length_means, names=["msg_len", "class_"]) \
            .rename(columns={"ping": "mean_of_medians_ping"})

        check_all_classes_covered(extended_node_pairs, test_results.hostnames)
        self._extended_node_pairs = extended_node_pairs

    def _get_class(self, node1, node2):
        """Return the class of the pair (node1, node2).

        Raises KeyError if the pair is not in the topology table."""
        # Explicit (row key, column) form: a bare `.loc[node1, node2]` is
        # ambiguous between "two index levels" and "row, column".
        return self._extended_node_pairs.loc[(node1, node2), "class_"]

    def predict_many(self, df):
        """df must have columns: msg_len, node1, node2.
        Returns table with rows in the same order, all columns of df kept
        and a `predicted_ping` column appended. The prediction is NaN for
        rows whose pair or message length is unknown."""
        return df \
            .join(self._extended_node_pairs, on=["node1", "node2"], how="left") \
            .join(self._data, on=["msg_len", "class_"], how="left") \
            .rename(columns={"mean_of_medians_ping": "predicted_ping"}) \
            .drop(labels=["class_"], axis=1)

    def predict(self, msg_len, node1, node2):
        """Return the predicted ping for a single message.

        This function takes about 1ms.
        Raises KeyError for an unknown pair or message length."""
        return self._data.loc[
            (msg_len, self._get_class(node1, node2)), "mean_of_medians_ping"
        ]

In [None]:
# Predictor built from the topology classes and the single benchmark run loaded above.
predictor = Predictor(extended_node_pairs, test_results)

## Test it

In [None]:
# Regression check against a previously recorded prediction. Compared with a
# tight relative tolerance (and no absolute tolerance, as the value is ~5e-6)
# rather than exact float equality, which breaks on harmless changes in
# floating-point summation order.
np.testing.assert_allclose(
    predictor.predict(4000, "n48003", "n48009"),
    5.0074094301694399e-06,
    rtol=1e-9,
    atol=0,
)

In [None]:
def get_random_samples(count):
    """Return `count` random prediction queries.

    The result has columns node1, node2 (rows sampled without replacement from
    the notebook-level `node_pairs`, so `count` must not exceed its length) and
    msg_len (drawn uniformly from the benchmarked message lengths).
    """
    # randint's upper bound is exclusive: STEPS_COUNT + 1 is needed so that
    # the last message length can be drawn as well.
    step_indices = np.random.randint(0, STEPS_COUNT + 1, size=count)
    return node_pairs.sample(n=count)[["node1", "node2"]] \
        .assign(msg_len=MSG_LEN_START + step_indices * MSG_LEN_STEP)

In [None]:
%%timeit
# Note: the measured time includes generating the 100000 random samples.
predictor.predict_many(get_random_samples(100000))

## Validation

In [None]:
def join_dict_to_table(dict_):
    """Flatten a dict of per-message-length tables into a single table.

    Can be used on test_results.medians.

    Takes a dict (msg_len -> df(node1, node2, ping)).

    Returns df(msg_len, node1, node2, ping) with a fresh integer index."""
    # The dict keys become the outer index level; the tables' own row numbers
    # become the inner one.
    stacked = pd.concat(dict_, names=["msg_len", "dumb_index"], verify_integrity=True)
    # The per-table row numbers carry no information: discard them, then turn
    # the remaining msg_len level into an ordinary column.
    keyed_by_length = stacked.reset_index(level="dumb_index", drop=True)
    return keyed_by_length.reset_index()

In [None]:
def join_ping_data(multiple_tests_results):
    """Concatenate the median pings of several benchmark runs.

    Takes an iterable of TestResults and returns one DataFrame with columns
    (msg_len, node1, node2, ping) and a fresh integer index. The rows of each
    run appear in the order the runs were given."""
    per_run_tables = [
        join_dict_to_table(single_run.medians)
        for single_run in multiple_tests_results
    ]
    return pd.concat(per_run_tables, ignore_index=True)

In [None]:
# All benchmark result directories. Sorted because the OS returns directory
# entries in arbitrary order, and the study/validation split below depends on
# which directory comes first.
# NOTE(review): absolute, machine-specific path -- adjust before running elsewhere.
TEST_RESULT_DIRECTORIES = sorted(
    Path("/home/shibbiry/Dropbox/documents/msu/bachelors_thesis_cluster_topology/test_results")
    .dirs()
)

In [None]:
# The first directory is used to build the predictor; all remaining directories
# are merged into a single (msg_len, node1, node2, ping) table for validation.
study_data = import_data(TEST_RESULT_DIRECTORIES[0])
validation_data = join_ping_data(import_data(directory) for directory in TEST_RESULT_DIRECTORIES[1:])

In [None]:
# Rebinds `predictor`: from here on it is built from the study data only.
predictor = Predictor(extended_node_pairs, study_data)

In [None]:
# Validation table with a `predicted_ping` column added next to the measured `ping`.
predictions = predictor.predict_many(validation_data)

In [None]:
# Squared difference between measured and predicted ping, one value per row.
error = (predictions["ping"] - predictions["predicted_ping"]) ** 2

In [None]:
# Mean squared error of the predictions on the validation data.
error.mean()