In [1]:
import os
import numpy as np
import pandas as pd
import json 
import timeit , time

import threading
from concurrent import futures

In [2]:
from functools import partial

In [3]:
## Only this function should need to change when running on a different computer
def loadData(base_dir='/home/parth/Machine Learning/Datasets/North Corp/DTC/August' ,
             filePcode="Top_LMD_DTC_SPN.csv"):
    """Load one device-log CSV and the P-code lookup table from ``base_dir``.

    Parameters
    ----------
    base_dir : str
        Directory holding the CSV files. Defaults to the original author's
        local dataset folder, so calling ``loadData()`` behaves as before.
    filePcode : str
        File name (inside ``base_dir``) of the P-code lookup CSV. It is
        excluded from the candidate data files.

    Returns
    -------
    (data, pcode_data) : tuple of pandas.DataFrame
        ``data`` is read from the alphabetically first ``.csv`` file other
        than ``filePcode``; ``pcode_data`` is read from ``filePcode``.

    Raises
    ------
    FileNotFoundError
        If ``base_dir`` contains no data CSV besides ``filePcode``.
    """
    # os.listdir returns entries in arbitrary order; sorting makes the choice
    # of "first" data file reproducible across runs and machines.
    candidates = sorted(
        name for name in os.listdir(base_dir)
        if name.endswith(".csv") and name != filePcode
    )
    if not candidates:
        raise FileNotFoundError(
            "No data CSV (other than {!r}) found in {!r}".format(filePcode , base_dir)
        )
    data = pd.read_csv(os.path.join(base_dir , candidates[0]))
    pcode_data = pd.read_csv(os.path.join(base_dir , filePcode))
    return data , pcode_data

In [4]:
def preProcessData(data , pcode_data):
    """Split the log by device and attach the matching P-code to each row.

    For every distinct ``deviceID`` in ``data`` this builds one frame that:
    keeps the original row label in an ``index`` column (from reset_index),
    gains an ``spn_fmi`` column of ``(spn, fmi)`` tuples, is left-merged with
    the lookup table on ``spn_fmi`` so the ``'Error codes '`` column arrives
    as ``Pcode`` (NaN where there is no match), and is sorted by ``utc``.

    Side effect: an ``spn_fmi`` column of ``(SPN, FMI)`` tuples is added to
    the caller's ``pcode_data`` frame.

    Returns ``(device_data, ids)`` where ``device_data`` is the list of
    per-device frames, in the same order as the ``ids`` array.
    """
    ids = data.deviceID.unique()

    # Composite key on the lookup side (written onto the passed-in frame).
    pcode_data["spn_fmi"] = list(zip(pcode_data["SPN"] , pcode_data["FMI"]))
    lookup = pcode_data[["Error codes " , "spn_fmi"]]

    device_data = []
    for id_ in ids:
        frame = data[data.deviceID == id_].reset_index()
        # Same composite key on the device side.
        frame["spn_fmi"] = list(zip(frame["spn"] , frame["fmi"]))
        frame = pd.merge(frame , lookup , how='left' , on="spn_fmi")
        frame = frame.rename(columns={'Error codes ':'Pcode'})
        frame = frame.sort_values("utc" , ascending=True)
        device_data.append(frame)
    return device_data , ids

In [5]:
def getCodesAround(time_interval_before , time_interval_after , device_data_):
    """Return, for every coded row, the rows that fall in a window around it.

    Parameters
    ----------
    time_interval_before, time_interval_after : number
        Window extent in minutes before / after each coded row. Each is
        converted to whole seconds with ``int(minutes * 60)``.
    device_data_ : pandas.DataFrame
        Frame with a ``utc`` column and a ``Pcode`` column; rows whose
        ``Pcode`` is not NaN are the window centres.
        (``utc`` is treated as seconds -- the window arithmetic adds seconds
        to it directly.)

    Returns
    -------
    list of pandas.DataFrame
        One slice of ``device_data_`` per coded row, in row order, holding the
        rows with ``centre - before <= utc < centre + after`` (half-open, the
        same membership the previous ``range``-based test produced for
        integer timestamps). Empty list when no row carries a code.
    """
    seconds_before = int(time_interval_before * 60)
    seconds_after = int(time_interval_after * 60)
    utc = device_data_.utc
    critical_utcs = utc[device_data_.Pcode.notna()]
    # A vectorised comparison replaces building a range object per coded row
    # and testing membership against it; it also works when utc is not an
    # integer dtype, where range() would raise TypeError.
    return [
        device_data_[(utc >= centre - seconds_before) & (utc < centre + seconds_after)]
        for centre in critical_utcs
    ]

def getPDict(device_data , time , when='before'):
    """Count ``spn_fmi`` occurrences in the window before or after each code.

    ``time`` is the window length in minutes. With ``when='before'`` the
    window reaches ``time`` minutes back and 1/60 minute forward; any other
    value of ``when`` gives 1/60 minute back and ``time`` minutes forward.
    Returns one ``{spn_fmi: count}`` dict per window from getCodesAround.
    """
    if when == 'before':
        minutes_before , minutes_after = time , 1/60
    else:
        minutes_before , minutes_after = 1/60 , time
    windows = getCodesAround(minutes_before , minutes_after , device_data)
    return [window.spn_fmi.value_counts().to_dict() for window in windows]

In [6]:
def populateData(timeBefore , timeAfter , device_data):
    """Build the before / code / after summary for one device frame.

    Parameters
    ----------
    timeBefore, timeAfter : number
        Window lengths in minutes, forwarded to getPDict.
    device_data : pandas.DataFrame
        One device's frame; must have ``Pcode`` plus whatever getPDict reads.

    Returns
    -------
    dict
        ``"Before"`` and ``"After"`` are the lists of count dicts from
        getPDict; ``"Codes"`` is the list of non-NaN ``Pcode`` values in row
        order. All three lists have one entry per coded row.
    """
    # Read each code from its own row. The earlier approach took the first
    # code found in a window around the row, so two coded rows at the same or
    # adjacent second reported the wrong code and the list fell out of step
    # with "Before"/"After".
    coded_rows = device_data.Pcode.notna()
    codes = list(device_data.Pcode[coded_rows].values)
    return {
        "Before": getPDict(device_data , timeBefore , 'before'),
        "Codes": codes,
        "After": getPDict(device_data , timeAfter , 'after'),
    }
def getCompleteData(timeBefore , timeAfter , device_data , ids):
    """Run populateData on every device frame and tag each result with its id.

    Parameters
    ----------
    timeBefore, timeAfter : number
        Window lengths in minutes, passed through to populateData.
        (They were previously ignored in favour of a hard-coded 1 , 1.)
    device_data : sequence of pandas.DataFrame
        Per-device frames.
    ids : sequence
        Device identifiers, paired with ``device_data`` by position.

    Returns
    -------
    list of dict
        One ``{'data': <populateData result>, 'id': <device id>}`` per device.
    """
    return [
        {'data': populateData(timeBefore , timeAfter , device_data_) , 'id': id_}
        for device_data_ , id_ in zip(device_data , ids)
    ]

In [7]:
def dataPipeline(loadData=loadData , timeBefore=30 , timeAfter=30):
    """Load, preprocess and summarise the data, printing per-stage timings.

    Parameters
    ----------
    loadData : callable
        Zero-argument loader returning ``(data, pcode_data)``.
    timeBefore, timeAfter : number
        Window lengths in minutes handed to getCompleteData; the defaults
        match the values that used to be hard-coded here.

    Returns
    -------
    (data_processed, intermediate)
        ``data_processed`` is the getCompleteData result; ``intermediate`` is
        the ``(device_data, ids)`` tuple returned by preProcessData.
    """
    # perf_counter is a monotonic high-resolution clock, so the stage
    # durations cannot be skewed by wall-clock adjustments.
    start = time.perf_counter()
    unprocessed = loadData()
    loaded = time.perf_counter()
    intermediate = preProcessData(*unprocessed)
    preprocessed = time.perf_counter()
    data_processed = getCompleteData(timeBefore , timeAfter , *intermediate)
    finished = time.perf_counter()

    print(
        " Loading Time    : {:.2f} Seconds\n".format(loaded - start),
        "PreProcess Time : {:.2f} Seconds\n".format(preprocessed - loaded),
        "Process Time    : {:.2f} Seconds".format(finished - preprocessed),
    )
    return data_processed , intermediate

In [8]:
# Run the whole pipeline with the default loader. d is the processed
# per-device summary list; i is the (device_data, ids) tuple produced by
# preProcessData and is reused by the thread-pool cell.
d,i=dataPipeline()

 Loading Time    : 0.49 Seconds
 PreProcess Time : 0.26 Seconds
 Process Time    : 0.62 Seconds


In [13]:
# Bind both window lengths (30 minutes each) by keyword; the device frame is
# supplied per task below.
x = partial(populateData , timeBefore=30 , timeAfter=30 )
with futures.ThreadPoolExecutor(max_workers=5) as executor:
    a = time.time()
    # The lambda forwards each frame as the device_data keyword: passing it
    # positionally would collide with timeBefore, which x already binds.
    # i[0] is expected to be the list of per-device frames returned by
    # dataPipeline (requires that cell to have run first).
    z = executor.map(lambda y: x(device_data=y) , i[0])
# Leaving the with-block waits for every submitted task to finish, so the
# elapsed time printed here covers the whole batch. z is an iterator over the
# results, in the same order as the input frames.
print(time.time() - a)

0.7476403713226318


In [12]:
# Display the next result from the executor.map iterator (the first device on
# the first call). Each call consumes one item, so re-running this cell moves
# on to the following device.
next(z)

{'After': [{(0, 0): 27,
   (0, 4): 6,
   (0, 9): 25,
   (597, 2): 3,
   (736, 1): 4,
   (3155, 0): 27,
   (4208, 29): 4,
   (6517, 0): 13,
   (6517, 4): 5,
   (7168, 3): 25,
   (9241, 0): 26,
   (9241, 4): 22,
   (9472, 0): 7,
   (10857, 0): 26,
   (10857, 4): 13,
   (16384, 4): 3,
   (16384, 6): 1,
   (18194, 0): 12,
   (18194, 4): 25,
   (18565, 0): 26,
   (18565, 4): 26,
   (24576, 4): 1,
   (24576, 6): 1,
   (24576, 8): 1,
   (24576, 14): 1,
   (24576, 15): 1,
   (30528, 19): 4,
   (32318, 0): 1,
   (32318, 4): 24,
   (32768, 6): 1,
   (32768, 14): 1,
   (33428, 4): 1,
   (35068, 4): 16,
   (40960, 4): 1,
   (49152, 4): 3,
   (49152, 15): 3,
   (49333, 0): 9,
   (49333, 4): 25,
   (57344, 4): 1,
   (60366, 0): 26,
   (63486, 0): 28,
   (63486, 4): 28,
   (65535, 8): 27,
   (65536, 30): 1,
   (76393, 19): 27,
   (81966, 28): 4,
   (84676, 31): 26,
   (97078, 3): 4,
   (97854, 1): 27,
   (103781, 30): 27,
   (105728, 3): 26,
   (105728, 30): 25,
   (105728, 31): 27,
   (109411, 25): 

In [10]:
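# Disabled export. NOTE: `data_processed` only exists inside dataPipeline (its result is
# bound to `d` in this notebook), and str(...) stores the Python repr as a single JSON string.
#json.dump(str(data_processed) , open("file.json" , 'w+'))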