In [1]:
import numpy as np
from typing import List
from collections import deque
import matplotlib.pyplot as plt
from flowprintOptimal.sekigo.flowUtils.commons import loadFlows
from flowprintOptimal.sekigo.core.flowRepresentation import PacketFlowRepressentation
import math
from joblib import Parallel, delayed
from tqdm import tqdm
from GGFast.grouper import Grouper
from flowprintOptimal.sekigo.utils.documentor import Documenter
from flowprintOptimal.sekigo.flowUtils.dataGetter import getTrainTestOOD
from GGFast.utils import getGrouperData, getLVectorFromFlowRep
from GGFast.core import LVector, Snippet, SnippetScorer
from GGFast.gathering import Gathering
from GGFast.commons import getLabelMapping, loadLVectors
import heapq
import os
import pickle
import json
from flowprintOptimal.sekigo.flowUtils.dataGetterV2 import readTrainTestOODFlows

In [2]:
# Experiment configuration consumed by the data-loading cells below.
configs = {
    "data_config": {
        "dataset_name": "vnat",
        "subsampleConfig": None,  # e.g. dict(max_gap = 20, min_gap = 5)
        "max_flow_length": 100,  # in seconds (each flow sample cannot exceed this length)
        "test_size": 0.2,
        "ood_classes": [],
        "do_balance": False,
        "data_type": "packet_representation",
    }
}

# An empty ood_classes list selects the classification-only directory.
base_dir_path = (
    "data/ClassificationOODFlows"
    if configs["data_config"]["ood_classes"]
    else "data/ClassificationOnlyFlows"
)

In [3]:
# Read the six flow collections for the configured dataset from base_dir_path.
(
    train_packet_flows,
    test_packet_flows,
    ood_packet_flows,
    train_timeslot_flows,
    test_timeslot_flows,
    ood_timeslot_flows,
) = readTrainTestOODFlows(
    base_path=base_dir_path,
    dataset_name=configs["data_config"]["dataset_name"],
)

In [4]:
def getTruncatePacketRep(flows : List[PacketFlowRepressentation], limit = 30):
    """Return a new list where no flow is longer than `limit`.

    A flow with ``len(flow) <= limit`` is passed through as the same object;
    a longer flow is replaced by ``flow.getSubFlow(0, limit)``.
    """
    return [
        flow if len(flow) <= limit else flow.getSubFlow(0, limit)
        for flow in flows
    ]



# Truncate the train and test flows with getTruncatePacketRep's default limit.
# The OOD flows are not truncated here.
train_packet_flows, test_packet_flows = (
    getTruncatePacketRep(flows=split)
    for split in (train_packet_flows, test_packet_flows)
)

In [5]:
def preProcessFlows(flows, scale = 1500):
    """Rescale every flow's ``lengths`` in place and round to whole numbers.

    Each entry of ``flow.lengths`` is multiplied by ``scale`` and passed
    through ``np.round``; the resulting list replaces ``flow.lengths``.

    Args:
        flows: iterable of objects exposing a ``lengths`` sequence.
        scale: multiplier applied before rounding. Defaults to 1500, the value
            that used to be hard-coded (presumably the lengths are normalised
            by a 1500-byte maximum — TODO confirm).

    Returns:
        None. The flows are modified in place.

    Note:
        This is not idempotent: calling it twice on the same flows applies
        the scaling twice.
    """
    for flow in flows:
        flow.lengths = [np.round(length * scale) for length in flow.lengths]

In [6]:
# Rescale the packet lengths of all three splits in place.
# NOTE: preProcessFlows mutates its input, so re-running this cell without
# reloading the flows multiplies the lengths again.
preProcessFlows(train_packet_flows)
preProcessFlows(test_packet_flows)
preProcessFlows(ood_packet_flows)


In [7]:
# Extract per-direction lengths and labels from the training flows, then
# build one Grouper per direction with the same threshold.
(
    inbound_lengths,
    inbound_classes,
    outbound_lengths,
    outbound_classes,
) = getGrouperData(flows=train_packet_flows)
# NOTE(review): the "forward" grouper is built from the inbound data and the
# "backward" grouper from the outbound data — confirm this pairing is intended.
forward_grouper = Grouper(features=inbound_lengths, labels=inbound_classes, threshold=0.001)
backward_grouper = Grouper(features=outbound_lengths, labels=outbound_classes, threshold=0.001)

In [8]:
# Containers for the L-vectors of the train and test splits.
train_l_vectors, test_l_vectors = [], []

In [9]:
# Convert every packet flow into an L-vector using the two groupers.
# The lists are rebuilt from scratch here (instead of appending to lists
# created in another cell) so that re-running this cell cannot duplicate entries.
train_l_vectors = [
    getLVectorFromFlowRep(flow_rep=flow_rep, forward_grouper=forward_grouper, backward_grouper=backward_grouper)
    for flow_rep in tqdm(train_packet_flows)
]

test_l_vectors = [
    getLVectorFromFlowRep(flow_rep=flow_rep, forward_grouper=forward_grouper, backward_grouper=backward_grouper)
    for flow_rep in tqdm(test_packet_flows)
]

100%|██████████| 2734/2734 [00:01<00:00, 1698.57it/s]
100%|██████████| 684/684 [00:00<00:00, 1767.53it/s]


In [10]:
def saveLvectors(path, l_vectors : "List[LVector]"):
    """Serialise L-vectors to a JSON file and return the serialised records.

    Each L-vector is turned into a dict of its attributes. In the ``lv1`` ..
    ``lv5`` entries every element ``x`` is replaced by the string
    ``str(int(x[0])) + str(x[1])`` (e.g. ``(64.0, '->')`` becomes ``'64->'``).
    All other attributes are written unchanged.

    The conversion works on a copy of each object's ``__dict__``. Editing
    ``__dict__`` directly would overwrite the ``lv1`` .. ``lv5`` attributes of
    the live L-vector objects with the string form.

    Args:
        path: destination file path; the file is overwritten.
        l_vectors: objects exposing ``lv1`` .. ``lv5`` as sequences of
            indexable pairs, plus any other JSON-serialisable attributes.

    Returns:
        The list of dicts that was written to ``path``.
    """
    encoded_keys = ("lv1", "lv2", "lv3", "lv4", "lv5")

    data = []
    for l_vector in l_vectors:
        # Shallow copy: the replaced entries below are new lists, so the
        # original object is left untouched.
        record = dict(l_vector.__dict__)
        for key in encoded_keys:
            record[key] = [str(int(item[0])) + str(item[1]) for item in record[key]]
        data.append(record)

    with open(path, 'w') as f:
        json.dump(data, f)
    return data

In [11]:
# Take the dataset name from `configs` so the file name cannot drift from the
# dataset that was actually loaded (resolves to "vnat" with the current config).
data = saveLvectors(
    path=f"GGFast/storage/{configs['data_config']['dataset_name']}_train_l_vectors.json",
    l_vectors=train_l_vectors,
)

In [12]:
# Take the dataset name from `configs` so the file name cannot drift from the
# dataset that was actually loaded (resolves to "vnat" with the current config).
data = saveLvectors(
    path=f"GGFast/storage/{configs['data_config']['dataset_name']}_test_l_vectors.json",
    l_vectors=test_l_vectors,
)

In [13]:
# Show the first record returned by the most recent saveLvectors call.
data[0]

{'lv1': ['64->',
  '60<-',
  '52->',
  '154->',
  '52<-',
  '1470<-',
  '294<-',
  '52->',
  '191->',
  '52<-',
  '95->',
  '52<-',
  '95<-',
  '137<-',
  '52->'],
 'lv2': ['22->',
  '19<-',
  '10->',
  '112->',
  '11<-',
  '1151<-',
  '251<-',
  '10->',
  '149->',
  '11<-',
  '53->',
  '11<-',
  '54<-',
  '96<-',
  '10->'],
 'lv3': ['22->',
  '0<-',
  '10->',
  '112->',
  '0<-',
  '0<-',
  '0<-',
  '10->',
  '149->',
  '0<-',
  '53->',
  '0<-',
  '0<-',
  '0<-',
  '10->'],
 'lv4': ['0->',
  '19<-',
  '0->',
  '0->',
  '11<-',
  '1151<-',
  '251<-',
  '0->',
  '0->',
  '11<-',
  '0->',
  '11<-',
  '54<-',
  '96<-',
  '0->'],
 'lv5': ['0->',
  '0<-',
  '0->',
  '0->',
  '0<-',
  '0<-',
  '0<-',
  '0->',
  '0->',
  '0<-',
  '0->',
  '0<-',
  '0<-',
  '0<-',
  '0->'],
 'class_type': 'MAIL'}

In [9]:
# NOTE(review): this definition shadows the `Gathering` imported from
# GGFast.gathering in the import cell.
class Gathering:
    """Enumerates n-gram Snippet candidates from L-vectors and keeps the best-scoring ones.

    Attributes are set in ``__init__``:
        n_gram_range: ``[low, high]`` inclusive range of n-gram sizes.
        top_k: maximum number of candidates retained by ``getCandidates``.
    """

    def __init__(self, n_gram_range = (1, 4), top_k = 25000):
        # Store a private copy so instances never share the default sequence
        # or alias the caller's list.
        self.n_gram_range = list(n_gram_range)
        self.top_k = top_k

    def __getCandidatesFromEncoding(self, encoding : list, tp : int):
        """Build the Snippets for every n-gram window of one encoding.

        Each window starting at index ``i`` yields three Snippets, in this
        order: position ``i + 1``, position ``-(len(encoding) - i)``, and the
        wildcard position ``"*"``. All are created with ``negation=False``.

        Args:
            encoding: the sequence to slide the n-gram window over.
            tp: value forwarded to ``Snippet(tp=...)``.

        Returns:
            List of Snippets, ordered by n-gram size and then by window start.
        """
        n_items = len(encoding)
        low, high = self.n_gram_range[0], self.n_gram_range[1]

        snippets = []
        for n_gram in range(low, high + 1):
            if n_items < n_gram:
                # Larger windows cannot fit either.
                break
            for start in range(n_items - n_gram + 1):
                stop = start + n_gram
                # Each Snippet gets its own slice, as before, so the sequences are never aliased.
                for position in (start + 1, -(n_items - start), "*"):
                    snippets.append(
                        Snippet(sequence= encoding[start:stop], position= position, negation= False, tp= tp)
                    )
        return snippets

    def getSnippetsFromSingleLvector(self, lvector : "LVector"):
        """Return the Snippets of all five encodings (lv1..lv5) of one L-vector.

        ``tp`` is the encoding's index: 1 for ``lv1`` up to 5 for ``lv5``.
        """
        encodings = (lvector.lv1, lvector.lv2, lvector.lv3, lvector.lv4, lvector.lv5)

        snippets = []
        for tp, encoding in enumerate(encodings, start=1):
            snippets.extend(self.__getCandidatesFromEncoding(encoding= encoding, tp= tp))
        return snippets

    def getCandidates(self, l_vectors : "List[LVector]", snippet_scorer : "SnippetScorer"):
        """Score every Snippet of every L-vector and keep the ``top_k`` best.

        Args:
            l_vectors: L-vectors to enumerate Snippets from.
            snippet_scorer: object whose ``score(snippet)`` returns a comparable score.

        Returns:
            A heapq min-heap (plain list) of ``(score, snippet)`` tuples holding
            at most ``top_k`` entries.
        """
        top_snippets = []  # min-heap of (score, snippet)

        # One progress bar over the L-vectors instead of a new bar per L-vector.
        for l_vector in tqdm(l_vectors):
            for snippet in self.getSnippetsFromSingleLvector(lvector= l_vector):
                candidate = (snippet_scorer.score(snippet), snippet)
                if len(top_snippets) < self.top_k:
                    heapq.heappush(top_snippets, candidate)
                else:
                    # Heap is full: insert and drop the smallest in a single
                    # step, so it never grows past top_k.
                    heapq.heappushpop(top_snippets, candidate)

        return top_snippets




    




In [10]:
# NOTE(review): `l_vectors` is not assigned in any cell shown in this notebook
# (only train_l_vectors / test_l_vectors are). Confirm where it comes from,
# otherwise this cell fails on a fresh kernel.
snippet_scorer = SnippetScorer(l_vectors= l_vectors)

In [11]:
# Candidate generator: n-gram sizes 1 through 4, keeping at most 25000 candidates.
gathering = Gathering(
    n_gram_range=[1, 4],
    top_k=25_000,
)

In [12]:
from multiprocessing import Process, Queue
import time
n_scoring_processes = 16  # number of scoreSnippetsFromQueue worker processes started below
snippets_publish_batch = 50  # number of snippets per list that putSnippetsIntoQueue puts on snippet_queue

def putLVectorIntoQueue(l_vector_queue : Queue, l_vectors : "List[LVector]", max_pending = 10000):
    """Feed L-vectors into a queue, then append a ``None`` end-of-stream marker.

    Args:
        l_vector_queue: queue receiving one item per L-vector.
        l_vectors: the L-vectors to publish, in order.
        max_pending: back-pressure limit. While the queue reports more than
            this many items, the producer sleeps instead of publishing.
            Defaults to 10000, the value that used to be hard-coded.

    Note:
        ``multiprocessing.Queue.qsize()`` is approximate and is documented to
        raise ``NotImplementedError`` on platforms such as macOS.
    """
    for l_vector in l_vectors:
        # Wait for consumers to catch up before adding more work.
        while l_vector_queue.qsize() > max_pending:
            time.sleep(1)
        l_vector_queue.put(l_vector)
    # Sentinel: tells the consumer that no more L-vectors will arrive.
    l_vector_queue.put(None)

def putSnippetsIntoQueue(l_vector_queue : Queue, snippet_queue : Queue, gathering : "Gathering", batch_size = None):
    """Turn queued L-vectors into batches of snippets on ``snippet_queue``.

    Reads L-vectors from ``l_vector_queue`` until the ``None`` sentinel
    arrives, expands each with ``gathering.getSnippetsFromSingleLvector`` and
    publishes the snippets as lists of ``batch_size`` items. Batches may span
    several L-vectors, and a final partial batch is flushed at shutdown.

    Args:
        l_vector_queue: source queue of L-vectors, terminated by ``None``.
        snippet_queue: destination queue receiving lists of snippets,
            followed by a single ``None``.
        gathering: object providing ``getSnippetsFromSingleLvector(lvector=...)``.
        batch_size: snippets per published list. ``None`` (the default) falls
            back to the module-level ``snippets_publish_batch``.
    """
    if batch_size is None:
        batch_size = snippets_publish_batch

    pending = []
    while True:
        l_vector = l_vector_queue.get()
        # Identity check: the sentinel is the None object itself, so this must
        # not depend on how the queued objects implement ==.
        if l_vector is None:
            if pending:
                snippet_queue.put(pending)
            l_vector_queue.put(None) # to signal siblings
            snippet_queue.put(None) # to signal the scoring process
            break

        for snippet in gathering.getSnippetsFromSingleLvector(lvector= l_vector):
            # Back-pressure: pause while the scorers are far behind.
            while snippet_queue.qsize() > 1000:
                time.sleep(1)
            pending.append(snippet)

            # >= rather than == so a batch is still flushed if it overshoots.
            if len(pending) >= batch_size:
                snippet_queue.put(pending)
                pending = []
            
        
       


def scoreSnippetsFromQueue(snippet_queue : Queue, candidate_queue : Queue, snippet_scorer : "SnippetScorer"):
    """Score batches of snippets and publish ``(score, snippet)`` batches.

    Consumes lists of snippets from ``snippet_queue`` until the ``None``
    sentinel arrives. For each input batch exactly one list of
    ``(score, snippet)`` tuples is put on ``candidate_queue``, holding only
    that batch's results. On shutdown the sentinel is put back for sibling
    workers and one ``None`` is put on ``candidate_queue``.

    A fresh result list is built per batch. Reusing a single accumulating
    list would re-publish every earlier candidate with each batch.

    Args:
        snippet_queue: source queue of snippet lists, terminated by ``None``.
        candidate_queue: destination queue for scored batches.
        snippet_scorer: object providing ``score(snippet=...)``.
    """
    while True:
        snippets = snippet_queue.get()
        if snippets is None:
            snippet_queue.put(None) # to signal sibling processes
            break

        scored_batch = [
            (snippet_scorer.score(snippet= snippet), snippet)
            for snippet in snippets
        ]
        candidate_queue.put(scored_batch)
    # One sentinel per worker so the consumer can count finished workers.
    candidate_queue.put(None)


# Three-stage pipeline connected by queues:
#   putLVectorIntoQueue -> putSnippetsIntoQueue -> n_scoring_processes x scoreSnippetsFromQueue
# The main process consumes candidate_queue and keeps the best candidates.
l_vector_queue = Queue()
snippet_queue = Queue()
candidate_queue = Queue()

# NOTE(review): only the first L-vector (`l_vectors[:1]`) is fed into the
# pipeline — confirm this slice is intentional and not a debugging leftover.
# NOTE(review): `l_vectors` is not assigned in any cell shown in this notebook.
l_vector_process = Process(target= putLVectorIntoQueue, args=(l_vector_queue,l_vectors[:1]))
snippet_process = Process(target= putSnippetsIntoQueue,  args= (l_vector_queue, snippet_queue, gathering))
scoring_processes : List[Process] = []

for _ in range(n_scoring_processes):
    scoring_processes.append(Process(target= scoreSnippetsFromQueue, args= (snippet_queue, candidate_queue,snippet_scorer)))

l_vector_process.start()
snippet_process.start()

for scoring_process in scoring_processes:
    scoring_process.start()


top_candidates = []  # min-heap of (score, snippet)
sentinel_count = 0
# Every scoring process puts exactly one None when it exits, so the stream is
# complete once one sentinel per worker has been seen.
while sentinel_count < n_scoring_processes:
    candidates = candidate_queue.get()

    if candidates is None:
        print("one process ended")
        sentinel_count += 1
        continue

    for candidate in candidates:
        heapq.heappush(top_candidates,candidate)

    # Same limit the Gathering instance was configured with, instead of a
    # second hard-coded copy of the number.
    while len(top_candidates) > gathering.top_k:
        heapq.heappop(top_candidates)


# All output has been drained above; now wait for the workers to exit.
l_vector_process.join()
snippet_process.join()
for scoring_process in scoring_processes:
    scoring_process.join()


one process ended
one process ended
one process ended
one process ended
one process ended
one process ended
one process ended
one process ended
one process ended
one process ended
one process ended
one process ended
one process ended
one process ended
one process ended
one process ended


In [7]:
# The two largest (score, snippet) candidates, best first.
top_2 = heapq.nlargest(2, top_candidates)

In [8]:
# n-gram sequence of the best-scoring snippet.
top_2[0][1].sequence

[(0, '<-'), (0, '<-'), (0, '<-'), (12, '->'), (151, '->')]

In [11]:
# `tp` of the best-scoring snippet (Gathering sets tp=1..5 for encodings lv1..lv5).
top_2[0][1].tp

3

In [9]:
# Score of the best candidate.
top_2[0][0]

5.702333692464288