In [39]:
import os
import multiprocessing
import pandas as pd
import json
import logging
import re
from itertools import repeat
import datetime
import time
from natsort import natsorted
import shutil

import tempfile
import csv

### Original functions

In [3]:
def remove_vectors(json_fields, single=False):
    # Simple function to remove the vector from results in json file might be we remove that part from the json file
    if single:
        return json_fields.replace(":vector", "")
    for i in range(len(json_fields)):
        json_fields[i] = json_fields[i].replace(":vector", "")
    return json_fields

In [4]:
def log_subprocess_output(pipe):
    for line in iter(pipe.readline, b''):  # b'\n'-separated lines
        print('Subprocess Line: %r', line)

In [5]:
def create_bins(lower_bound, width, quantity):
    """ create_bins returns an equal-width (distance) partitioning.
        It returns an ascending list of tuples, representing the intervals.
        A tuple bins[i], i.e. (bins[i][0], bins[i][1])  with i > 0
        and i < quantity, satisfies the following conditions:
            (1) bins[i][0] + width == bins[i][1]
            (2) bins[i-1][0] + width == bins[i][0] and
                bins[i-1][1] + width == bins[i][1]
    """
    bins = []
    for low in range(lower_bound, lower_bound + quantity * width + 1, width):
        bins.append((low, low + width))
    return bins

In [6]:
def tidy_data(temp_file, real_vector_path, json_fields, output_csv):
    temp_file_pt = open(temp_file, "w+")

    # Simply remove the :vector part of vector names from both sets of vectors.
    found_vector = False
    for field in json_fields:
        if ":vector" in field:
            found_vector = True
            break

    if found_vector:
        json_fields = remove_vectors(json_fields)

    print("Beginning parsing of vector file: {}".format(real_vector_path))

    # Read the file and retrieve the list of vectors
    read_vector_file(temp_file_pt, real_vector_path, json_fields)

    print("Finished parsing of vector file: {}".format(real_vector_path))

    # Ensure we are at the start of the file for sorting
    temp_file_pt.seek(0)

    results = []
    orphans = pd.DataFrame()
    
    # Tell pandas to read the data in chunks
    chunks = pd.read_csv(temp_file_pt, chunksize=1e6)
    
    for chunk in chunks:

        # Add the previous orphans to the chunk
        chunk = pd.concat((orphans, chunk))
        
        # Determine which rows are orphans
        last_val = chunk["NodeID"].iloc[-1]
        is_orphan = chunk["NodeID"] == last_val
        
        # Put the new orphans aside
        chunk, orphans = chunk[~is_orphan], chunk[is_orphan]
        
        # Parse the vector file to ensure it is formatted correclty.
        chunk['seq'] = chunk.groupby(["EventNumber", "StatisticName"]).cumcount()
        chunk = chunk.pivot_table("Value", ["EventNumber", "Time", "NodeID", "seq"], "StatisticName")
        chunk.reset_index(inplace=True)
        chunk = chunk.drop(["seq"], axis=1)
        
        results.append(chunk)
        
    print("Finsihed parsing output file: {}".format(output_csv))

    # Remove our temporary file.
    os.remove(temp_file_pt.name)
    print("Removed the temporary file")
    
    resulting_df = pd.concat(results, sort=False, axis=0, ignore_index=True)
    
    resulting_df.to_csv(output_csv)

    return resulting_df

In [7]:
def bin_fields(df, fields, bin_width=10, bin_quantity=49):
    """
    Bins multiple dfs into a single dictionary that can be used as an average for multiple fields across multiple
    runs
    :param df: dataframe to bin
    :param fields: fields to be binned.
    :param bin_width: width of each bin
    :param bin_quantity: total number of bins
    :return:
    """
    bins = create_bins(lower_bound=0, width=bin_width, quantity=bin_quantity)
    distances = []
    overall_fields = {}
    for interval in bins:
        upper_b = interval[1]
        distances.append(upper_b)

    for field in fields:
        print("{} being binned".format(field))
        overall_fields[field] = []

    overall_fields["distance"] = distances

    distance_col = config["results"]["distance"]

    for i in range(len(bins)):
        lower_b = bins[i][0]
        upper_b = bins[i][1]
        fields_temp = df[(df[distance_col] >= lower_b) & (df[distance_col] < upper_b)]
        for field in fields:
            if i < len(overall_fields[field]):
                overall_fields[field][i] = (fields_temp[field].mean() + overall_fields[field][i]) / 2
            else:
                overall_fields[field].append(fields_temp[field].mean())

    return overall_fields

In [8]:
def combine_results(combined, results):
    for result in results:
        for field in result:
            if field in combined:
                for i in range(len(result[field])):
                    combined[field][i] = (combined[field][i] + result[field][i]) / 2
            else:
                combined[field] = result[field]
    return combined

In [9]:
def parse_data(results_dirs, now):
    combined_results = {}

    configs = []
    for config in config["config_names"]:
        config_data = config["config_names"][config]
        if config_data["repeat"] != 0:
            if "naming" in config_data and len(config_data["naming"]) > 0:
                for name in config_data["naming"]:
                    configs.append(name)
            else:
                configs.append(config)

    for result_dir, config_name in zip(results_dirs, configs):

        folder_name = os.path.basename(result_dir)

        print("Dealing with config: {} of result folder: {}".format(config_name, folder_name))
        combined_results[config_name] = {}

        orig_loc = os.getcwd()

        print("Moving to results dir: {}".format(result_dir))
        os.chdir(result_dir)

        runs = []

        for run in os.listdir(result_dir):
            if ".vec" in run:
                runs.append(run)

        num_processes = config["parallel_processes"]
        if num_processes > multiprocessing.cpu_count():
            ("Too many processes, going to revert to total - 1")
            num_processes = multiprocessing.cpu_count() - 1

        print("Number of files to parse : {}".format(len(runs)))
        number_of_batches = len(runs) // num_processes
        if number_of_batches == 0:
            number_of_batches = 1

        i = 0
        while i < len(runs):
            if len(runs) < num_processes:
                num_processes = len(runs)
            print(
                "Starting up processes, batch {}/{}".format((i // num_processes) + 1, number_of_batches))
            pool = multiprocessing.Pool(processes=num_processes)

            multiple_results = pool.starmap(filter_data, zip(runs[i:i + num_processes], repeat(config_name), repeat(now), repeat(orig_loc)))
            pool.close()
            pool.join()

            combined_results[config_name] = combine_results(combined_results[config_name], multiple_results)

            print("Batch {}/{} complete".format((i // num_processes) + 1, number_of_batches))

            i += num_processes

        print("Moving back to original location: {}".format(orig_loc))
        os.chdir(orig_loc)

    processed_file = "{}/data/processed_data/{}-{}.json".format(os.getcwd(), experiment_type, now)
    print("Writing processed data to {}".format(processed_file))
    with open(processed_file, "w") as json_output:
        json.dump(combined_results, json_output)

    return processed_file

In [10]:
def filter_data(raw_data_file, config_name, now, orig_loc):

    run_num = raw_data_file.split(".")[0]

    temp_file_name = run_num + ".csv"

    print("File being parsed: {}".format(temp_file_name))

    output_csv_dir = "{}/data/raw_data/{}/{}".format(orig_loc, experiment_type, config_name)

    os.makedirs(output_csv_dir, exist_ok=True)

    output_csv = "{}/{}-{}.csv".format(output_csv_dir, run_num, now)

    print("Raw output file: {}".format(output_csv))

    vector_df = tidy_data(temp_file_name, raw_data_file, results["filtered_vectors"], output_csv)

    print("Completed tidying of dataframes")
    
    return

    graphs = config["results"]["graphs"]
    print("The data for the following graphs must be prepared {}".format(graphs))

    if ":vector" in config["results"]["decoded"]:
        # Assuming if decoded contains :vector then fails will too.
        config["results"]["decoded"]  = remove_vectors(config["results"]["decoded"], single=True)
        config["results"]["distance"] = remove_vectors(config["results"]["distance"], single=True)
        config["results"]["fails"]    = remove_vectors(config["results"]["fails"])

    fields = []
    if "pdr-dist" in graphs:
        print("Calculating pdr for pdr graph")
        fields.append(results["decoded"])
    if "error-dist" in graphs:
        for fail in config["results"]["fails"]:
            fields.append(fail)
        fields.append(results["decoded"])

    print("Binning all the necessary information for the graphs")
    binned_results = bin_fields(vector_df, fields)

    del vector_df

    print("Completed data parsing for this run")
    return binned_results

### New parsing system

In [11]:
def parse_vector_desc_line(line):
    try:
        # Converts a vector description line to a dictionary for use in parsing later
        node_id_pattern = re.compile("\[\d+\]")

        vector_line_dict = {"nodeID": None, "vectorName": None, "ETV": True}
        split_line = line.split(" ")
        vector_num = int(split_line[1])
        match = node_id_pattern.search(split_line[2])
        nodeID = int(match.group().strip("[]"))
        vector_name = split_line[3]
        vector_name = vector_name.split(":")[0]
        if "ETV" in split_line[4]:
            ETV = True
        else:
            ETV = False

        vector_line_dict["nodeID"] = nodeID
        vector_line_dict["vectorName"] = vector_name
        vector_line_dict["ETV"] = ETV

        return vector_num, vector_line_dict
    except AttributeError as e:
        print("Line: {} : Could not be parsed".format(line))
        return None, None

In [12]:
def parse_vector_line(line):
    # Simple function to split a vector line and convert to floats.
    try:
        line = bytes(line, 'utf-8').decode('utf-8', 'ignore')
        split_nums = line.split()
        for i in range(len(split_nums)):
            split_nums[i] = float(split_nums[i])
        return split_nums
    except ValueError as e:
        print("Line: {} could not be converted due to bad encoding".format(line))
        return

In [13]:
def prepare_csv_line(vector_dict, vector_id, parsed_vec):
    # Parses the vector line information to be written to the csv file.
    node_id = vector_dict[vector_id]["nodeID"]
    vector_name = vector_dict[vector_id]["vectorName"]
    if vector_dict[vector_id]["ETV"]:
        etv = parsed_vec[1]
        time = parsed_vec[2]
        value = parsed_vec[3]
    else:
        etv = None
        time = parsed_vec[1]
        value = parsed_vec[2]

    csv_line = [node_id, time, vector_name, value]
    return csv_line, time

In [62]:
def setup_chunk_writer(output_file, chunk_num, title_line):
    # Setup our chunk writer
    
    # First create a folder to hold chunks
    chunk_folder = output_file.split(".")[0]
    os.makedirs(chunk_folder, exist_ok=True)
    chunk_name = "{}/chunk-{}.csv".format(chunk_folder, chunk_num)
    
    # Create the chunk file and create a csv writer which uses it
    print("Setting up new chunk: {}".format(chunk_name))
    temp_file_pt = open(chunk_name, "w+")
    output_writer = csv.writer(temp_file_pt, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    output_writer.writerow(title_line)
    
    return temp_file_pt, output_writer

In [63]:
def read_vector_file(output_file, vector_path, stats, chunk_size=1e+8):
    """
    chunk_size: the time between different files 1.5s as default
    """
    # Reads the csv file, parses it and writes to a temp file for use later in generating a DF and CSV file.
    vector_dict = {}
    no_interest_vectors = {} # Probably don't need to remember one's we don't care for.

    chunk_times = []
    chunk_info = {}
    
    last_time = -1
    current_chunk_index = 0

    # Patterns which identify vector declaration lines and result lines
    vector_dec_line_pattern = re.compile("^vector")
    vector_res_line_pattern = re.compile("^\d+")

    vector_file = open(vector_path, "r")

    # Stores lines appearing before their declaration. Files are oddly formatted, this is purely safety ensuring we
    # don't accidentally miss anything.
    early_vectors = tempfile.NamedTemporaryFile(mode="r+")

    # Prepare and write out first line format NodeID, EventNumber, Time, Stat1, Stat2, Stat3, ...
    title_line = ["NodeID", "Time", "StatisticName", "Value"]
    
    temp_file_pt, writer = setup_chunk_writer(output_file, current_chunk_index, title_line)
    chunk_info["CurrentChunk"] = {"file": temp_file_pt, "writer": writer}
    
    for line in vector_file:
        if vector_dec_line_pattern.match(line):
            # if line matches a vector declaration, parse the vector description
            vector_num, vec_dict = parse_vector_desc_line(line)
            if vector_num is None and vec_dict is None:
                continue
            if vec_dict["vectorName"] in stats:
                # Vector is of interest, add it to our overall dictionary and update it's index.
                vector_dict[vector_num] = vec_dict
            else:
                # Mark this as a vector we don't care about.
                no_interest_vectors[vector_num] = None

        elif vector_res_line_pattern.match(line):
            # {"nodeID": None, "vectorName": None, "ETV": True} This is what it looks like
            parsed_vec = parse_vector_line(line)
            # If the previous step fails then we can simply continue to the next line ignoring this line.
            if parsed_vec is None:
                continue
            vector_id = parsed_vec[0]
            if vector_id in vector_dict:
                # Write out to a csv file correctly
                csv_line, time = prepare_csv_line(vector_dict, vector_id, parsed_vec)

                if time > last_time:
                    chunk_info["CurrentChunk"]["writer"].writerow(csv_line)
                    if chunk_info["CurrentChunk"]["file"].tell() >= chunk_size:
                        print("Time ending this chunk:{}".format(time))
                        
                        # This chunk is old and as such can be placed into the previous chunks
                        chunk_info[time] = {"file": chunk_info["CurrentChunk"]["file"], "writer": chunk_info["CurrentChunk"]["writer"]}
                        chunk_times.append(time)
                        last_time = time
                        current_chunk_index += 1
                        
                        # This file is at max size, create a new writer
                        temp_file_pt, writer = setup_chunk_writer(output_file, current_chunk_index, title_line)
                        # Update current chunk writer to point at this new one.
                        chunk_info["CurrentChunk"] = {"file": temp_file_pt, "writer": writer}
                if time <= last_time:
                    for chunk_time in chunk_times:
                        if time < chunk_time:
                            chunk_info[chunk_time]["writer"].writerow(csv_line)
            else:
                if vector_id not in no_interest_vectors:
                    # Write the line out in case we found it before declaration. Only if it is of possible interest.
                    early_vectors.write(line)

    # Rewind the early vectors file so we can search it for missed vectors
    early_vectors.seek(0)

    for line in early_vectors:
        print("We have early vectors")
        # Parse the line again.
        parsed_vec = parse_vector_line(line)
        vector_id = parsed_vec[0]
        # check for the vector
        if vector_id in vector_dict:
            # If we have it create the csv line and write it our
            csv_line = prepare_csv_line(vector_dict, vector_id, parsed_vec)
            output_writer.writerow(csv_line)

    # Close our vector file.
    vector_file.close()

In [64]:
def csv_pivot(directory):
    orig_loc = os.getcwd()
    os.chdir(directory)
    
    csv_files = os.listdir()
    csv_files = natsorted(csv_files)
    header = True
    for csv_file in csv_files:
        if ".csv" in csv_file:
            print("Dealing with chunk file: {}".format(csv_file))
            chunk_df = pd.read_csv(csv_file)

            chunk_df = chunk_df.infer_objects()

            chunk_df = chunk_df.sort_values(by=["NodeID", "Time"])
            # Parse the vector file to ensure it is formatted correclty.
            chunk_df['seq'] = chunk_df.groupby(["Time", "NodeID", "StatisticName"]).cumcount()

            chunk_df = chunk_df.pivot_table("Value", ["Time", "NodeID", "seq"], "StatisticName")
            chunk_df.reset_index(inplace=True)
            chunk_df = chunk_df.drop(["seq"], axis=1)
            chunk_df.to_csv(csv_file, index=False, header=header)
            header = False

            del chunk_df
            
    os.chdir(orig_loc)

In [82]:
def combine_files(csv_directory, outfile):
    destination = open(outfile,'wb')
    
    orig_loc = os.getcwd()
    os.chdir(csv_directory)
    
    csv_files = os.listdir()
    csv_files = natsorted(csv_files)
    header = True
    for csv_file in csv_files:
        if ".csv" in csv_file and csv_file != outfile:
            print("Dealing with chunk file: {}".format(csv_file))
            shutil.copyfileobj(open(csv_file,'rb'), destination)
            os.remove(csv_file)
    destination.close()
    
    os.chdir(orig_loc)
    
    os.rmdir(csv_directory)

In [66]:
vector_dir = "/Users/brianmccarthy/git_repos/results-analysis/data/omnet/cv2x/test"

vector_file_name_long = "long.vec"
vector_file_name_short = "short.vec"

vector_path_long = vector_dir + vector_file_name_long
vector_path_short = vector_dir + vector_file_name_short

config_name_long = "long-test"
config_name_short = "short-test"

experiment_type = "cv2x"

json_path = "/Users/brianmccarthy/git_repos/results-analysis/configs/cv2x.json"

now = datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S")

orig_loc = "/Users/brianmccarthy/git_repos/results-analysis"

In [67]:
with open(json_path, "r") as json_file:
    config = json.load(json_file)["cv2x"]
    results = config["results"]

In [85]:
temp_file = "long.csv"
real_vector_path = "long.vec"

### Setup for dealing with results

In [69]:
json_fields = results["filtered_vectors"]
# Simply remove the :vector part of vector names from both sets of vectors.
found_vector = False
for field in json_fields:
    if ":vector" in field:
        found_vector = True
        break

if found_vector:
    json_fields = remove_vectors(json_fields)

In [87]:
os.chdir(vector_dir)

# Read in all the vectors
start_time = time.time()
overall_start_time = start_time

read_vector_file(temp_file, real_vector_path, json_fields)
print("Reading Execution took {}s".format(time.time() - start_time))
start_time = time.time()
csv_pivot()
print("Pivoting Execution took {}s".format(time.time() - start_time))
start_time = time.time()
combine_files(, )
print("Copying Execution took {}s".format(time.time() - start_time))

print("Overall Execution took {}s".format(time.time() - overall_start_time))

os.chdir(orig_loc)

Setting up new chunk: long/chunk-0.csv
Time ending this chunk:500.824
Setting up new chunk: long/chunk-1.csv
Time ending this chunk:501.227
Setting up new chunk: long/chunk-2.csv
Time ending this chunk:501.574
Setting up new chunk: long/chunk-3.csv
Time ending this chunk:501.885
Setting up new chunk: long/chunk-4.csv
Time ending this chunk:502.225
Setting up new chunk: long/chunk-5.csv
Time ending this chunk:502.544
Setting up new chunk: long/chunk-6.csv
Time ending this chunk:502.866
Setting up new chunk: long/chunk-7.csv
Time ending this chunk:503.215
Setting up new chunk: long/chunk-8.csv
Time ending this chunk:503.577
Setting up new chunk: long/chunk-9.csv
Time ending this chunk:503.92
Setting up new chunk: long/chunk-10.csv
Time ending this chunk:504.275
Setting up new chunk: long/chunk-11.csv
Time ending this chunk:504.586
Setting up new chunk: long/chunk-12.csv
Time ending this chunk:504.962
Setting up new chunk: long/chunk-13.csv
Time ending this chunk:505.3
Setting up new chun

FileNotFoundError: [Errno 2] No such file or directory: 'short'

### Changing parsed result files

In [None]:
df = pd.read_csv("/Users/brianmccarthy/git_repos/results-analysis/data/raw_data/cv2x/long-test/longRun-1-2019-09-19-16:17:32.csv")

In [None]:
df.head()

In [None]:
df = df.infer_objects()

In [None]:
df.describe()