# <b><center>DarkVec: Automatic Analysis of Darknet Traffic with Word Embeddings</center></b>
## <b><center>Appendix 1: Corpus Generation</center></b>  

___

# <b>Table of Contents</b> <a name="toc"></a>
* [<b>State of Art Comparison</b>](#corpus)  
    * [DarkVec 5 Days](#c_darkvec)  
    * [DarkVec 30 Days Per-Service Language](#c_dante)  
    * [Dante 5 Days](#c_ip2vec)  
    * [IP2VEC 5 Days](#c_ip2vec)
* [<b>Grid Search for $k$</b>](#model)  
    * [Darkvec 30 days Single Language](#m_darkvec)  
    * [Darkvec 30 days Auto Language](#m_dante)  
    * [Darkvec 30 days Per-Service Language](#m_ip2vec)  
    
    
In this notebook we provide the snippets used to generate the corpora fed to the Word2Vec model. We generate a corpus for each of two different experiments:

1) State-of-the-art comparison, in which we compare our methodology with DANTE[ref] and IP2VEC[ref]

2) Grid Search for $k$, in which we generate the corpora for the different languages

<b>Note.</b> This notebook is designed to run on a Spark cluster. Moreover, given the size of the data, the computation takes a long time.

In [1]:
from config import *
SAVE = False

If the `SAVE` flag is set to `True`, the processed corpus is saved ***ADD CORPUS PATH***. In that case, the following snippet first copies the corpora we provide to a backup folder `corpus.BAK` and then removes all the `.txt.gz` and `.npz` files. At the bottom of the notebook we report a [snippet](#restore) for restoring our corpora. 

In [None]:
if SAVE:
    os.system(f'cp -r {CORPUS} {CORPUS}.BAK')
    folders = ['dante5', 'darkvec30single', 'darkvec5xserv',
               'darkvec30auto', 'darkvec30xserv', 'ip2vec5']
    for ff in folders:
        os.system(f'rm {CORPUS}/{ff}/*')

In [2]:
def get_time_windows(hours):
    import numpy as np
    cnt = 0
    slice_ = []
    slices = []
    hours = np.asarray(hours)
    for h in hours:
        slice_.append(h)
        cnt+=1
        if cnt == 4:
            cnt = 0
            slices.append(slice_)
            slice_ = []
    if hours.shape[0]%4!=0:
        slices.append(list(hours[-(hours.shape[0]%4):]))
    date_map = {}
    for i in range(len(slices)):
        for date in slices[i]:
            date_map[date] = f'w_{i}'
    
    return date_map

def load_data(partition): 
    """Distributed function to load the traces skipping the file header
    """
    for l in partition:
        if l.startswith("ts"): 
            continue    
        yield l.split(" ")

def convert_fields(partition):
    """Distributed function to convert the protocols of the traces from 
    the decimal notation to human readable version
    """
    for (ts, s_ip, d_ip, port, proto) in partition:
        try:
            proto = PROTO_CONVERSION[proto]
        except KeyError:
            # Protocol numbers missing from the conversion table
            proto = 'oth'

        yield (ts, s_ip, d_ip, port, proto)


#==============================================================================
# Corpus Generation
#==============================================================================
def convert_timestamp(partition):
    """Convert a timestamp to the datetime format, then
    extract the hours of the day and port/protocol pairs
    """
    from datetime import datetime
    
    for (ts, s_ip, d_ip, port, proto) in partition:
        new_ts = datetime.fromtimestamp(float(ts))
        hhdd = f'{new_ts.year}_{new_ts.month}_{new_ts.day}_{new_ts.hour}'
        dd = f'{new_ts.year}_{new_ts.month}_{new_ts.day}'
        pair = f'{port}_{proto}'
        
        yield ((pair, hhdd, dd), (ts, s_ip))
        
def extract_services(partition):
    import json
    SERVICES = ['ftp', 'telnet', 'p2p', 'netbios-smb', 'mail', 'dbs', 
                'http', 'ssh', 'netbios', 'proxy', 'dns', 'kerberos', 
                'unk_sys', 'unk_usr', 'unk_eph']
    # Services division by ports
    with open(f'{DATA}/services/services.json', 'r') as file:
        CLASSES = json.loads(file.read())
        
    def unknown_class(x):
        """Manage the unclassified ports. Three unknown ports ranges are
        used: 
        System:  0 <= port <= 1023
        User:  1024 <= port <= 49151
        Ephemeral:  49152 <= port <= 65535
        """
        x = x.split('/')[0]
        #System Ports
        if x!='-':
            if int(x) >= 0 and int(x) <= 1023: return 'sys'
            # User Ports
            elif int(x) >= 1024 and int(x) <= 49151:return 'usr'
            # Ephemeral Ports
            elif int(x) >= 49152 and int(x) <= 65535:return 'eph'
        else:
            return 'icmp'
    
    for ((x, hh, dd),(ts, ip)) in partition:
        if x in CLASSES: 
            service = CLASSES[x]
        else: 
            service = f'unk_{unknown_class(x)}'
        
        yield ((service, hh, dd),(ts, ip))
        
def check_dayflows(flows_to_filter):
    def run(partition):
        import numpy as np
        for ip, v_list in partition:
            x = np.where(np.asarray(v_list)>flows_to_filter)[0].shape[0]
            if flows_to_filter!=0:
                if x>=3:
                    yield ip
            else:
                yield ip
    return run

def get_slices(partition):
    """The keys are the port/protocol pairs and the hours of
    the day. Sort the IPs according to the timestamp
    """
    for k1, (k, v) in partition:
        v.sort(key=lambda x:x[0])
        v = [x[1] for x in v]
        yield (k1, (k, v))
        
        
def get_sentences(method, save=False):
    def run(partition):   
        """Split each file in sentences. One sentence per hour.
        Then save the file in corpus file
        """
        import numpy as np
        import gzip

        def save_corpus(corpus_method, fname, sentences):
            # The context manager flushes and closes the gzip file
            with gzip.open(f'{CORPUS}/{corpus_method}/{fname}.txt.gz', 'wt') as file:
                for line in sentences:
                    # One sentence per line, tokens separated by a space
                    file.write(" ".join(line)+"\n")

        def zero_padding_fname(day_limit):
            yyyy, mm, dd, hh = day_limit.split('_')

            if len(mm)==1: mm='0'+mm
            if len(dd)==1: dd='0'+dd
            if len(hh)==1: hh='0'+hh

            day_limit = f'{yyyy}_{mm}_{dd}_{hh}'

            return day_limit

        for k, v in partition:
            v = [(zero_padding_fname(hour), ip_list) for hour, ip_list in v]
            v.sort(key=lambda x:x[0])
            day_limit = v[-1][0]
            
            v = [np.asarray(x[1]) for x in v]
                
            if method in ['darkvec30xserv', 'darkvec30auto', 'darkvec5xserv']:
                fname = f'{day_limit}_{k[0]}'
            else:
                fname = f'{day_limit}'
            if save:
                save_corpus(corpus_method = method, 
                            fname = fname, 
                            sentences = v)

            yield fname
    
    return run

def sort_by_ts(partition):
    for k, v in partition:
        v.sort(key=lambda x:x[0])
        v = [x[1] for x in v]
        if k[-2] == '_':
            k1 = k[:-2]
        else:
            k1 = k[:-3]
        yield(k1,(k, v))
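
To make the helpers above more concrete, here is a minimal, Spark-free sketch of how flows become sentences: flows are grouped by hour, sorted by timestamp, and the sender IPs are emitted in arrival order. The function name `hourly_sentences` and the sample flows are hypothetical. The sketch also uses UTC so that the result is reproducible, whereas `convert_timestamp` uses the local time zone of the machine.

```python
from collections import defaultdict
from datetime import datetime, timezone

def hourly_sentences(flows):
    """Group (timestamp, source IP) flows by hour and return, for each hour,
    the source IPs ordered by arrival time (one sentence per hour)."""
    buckets = defaultdict(list)
    for ts, s_ip in flows:
        t = datetime.fromtimestamp(float(ts), tz=timezone.utc)
        # Zero-padded key, so that lexicographic order is chronological
        key = f'{t.year}_{t.month:02d}_{t.day:02d}_{t.hour:02d}'
        buckets[key].append((float(ts), s_ip))
    return {hour: [ip for _, ip in sorted(pairs)]
            for hour, pairs in sorted(buckets.items())}

# Hypothetical flows: (epoch seconds, sender IP)
flows = [(3700, '10.0.0.3'), (120, '10.0.0.2'), (60, '10.0.0.1'), (130, '10.0.0.1')]
sentences = hourly_sentences(flows)
```

Each value of `sentences` corresponds to one line of a corpus file; the same IP may appear several times in a sentence if it sends several packets within the hour.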

## State of Art Comparison

We compare our methodology with DANTE[ref] and IP2VEC[ref]. For each methodology we analyze 5 and 30 days of traffic. Since only DarkVec was able to complete the training on 30 days, we report the corpus generation for:

- DarkVec 5 days
- DarkVec 30 days
- Dante 5 days
- IP2VEC 5 days
    
By changing the experiment parameters it is possible to extend the generation to other experiments.

### DarkVec 5 days

Since processing the full dataset takes a long time, we comment out the original line

`
raw = sc.textFile(','.join(DEBUG.split(',')[-5:]))
`

and replace it with 

`
raw = sc.textFile(','.join(DEBUG.split(',')[-2:]))
`

which processes only the last two days of traffic and takes less time. This is for demonstration purposes only.

To run the original experiment, comment out the second line and uncomment the first one.
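
As a quick illustration of what the slicing does, assume that `DEBUG` is a comma-separated string of trace paths, one per day. The paths and the helper `last_days` below are made up for the example; in the notebook `DEBUG` comes from `config.py`.

```python
# Hypothetical value: five daily traces, oldest first
DEBUG_EXAMPLE = ','.join(f'traces/day{d:02d}.log.gz' for d in range(1, 6))

def last_days(paths, n):
    """Keep the last n comma-separated paths, still joined by commas."""
    return ','.join(paths.split(',')[-n:])

demo_paths = last_days(DEBUG_EXAMPLE, 2)   # demonstration run
full_paths = last_days(DEBUG_EXAMPLE, 5)   # original 5-day experiment
print(demo_paths)
```

The resulting string can be passed to `sc.textFile`, which accepts a comma-separated list of paths.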


In [12]:
#raw = sc.textFile(','.join(DEBUG.split(',')[-5:]))
raw = sc.textFile(','.join(DEBUG.split(',')[-2:]))

bro = raw.mapPartitions(load_data).repartition(1000)

In [13]:
# Senders seen at least 10 times; a set keeps the membership test in the next cell fast
filt = set(bro.map(lambda x: (x[2], 1)).reduceByKey(lambda x,y: x+y)\
          .filter(lambda x:x[1]>=10).map(lambda x: x[0]).collect())

In [14]:
_1 = bro.map(lambda x: (x[0], x[2], x[4], x[5], x[6])).mapPartitions(convert_fields)\
        .mapPartitions(convert_timestamp).filter(lambda x: x[1][1] in filt).map(lambda x: ((x[0][0]\
        .replace('_', '/'), x[0][1], x[0][2]), x[1])).mapPartitions(extract_services)

In [None]:
_1.groupByKey().map(lambda x: (x[0], list(x[1]))).map(lambda x: ((x[0][0], x[0][2]), (x[0][1], x[1])))\
  .mapPartitions(get_slices).groupByKey().map(lambda x: (x[0], list(x[1])))\
  .mapPartitions(get_sentences(method='darkvec5xserv', save=SAVE)).collect();

### Dante 5 days
Since processing the full dataset takes a long time, we comment out the original line

`
raw = sc.textFile(','.join(DEBUG.split(',')[-5:]))
`

and replace it with 

`
raw = sc.textFile(','.join(DEBUG.split(',')[-2:]))
`

which processes only the last two days of traffic and takes less time. This is for demonstration purposes only.

To run the original experiment, comment out the second line and uncomment the first one.


In [16]:
def convert_timestamp_DE(partition):
    """Convert a timestamp to the datetime format, then
    extract the hour of the day. The port is kept as is
    """
    from datetime import datetime
    
    for (ts, s_ip, port) in partition:
        new_ts = datetime.fromtimestamp(float(ts))
        hhdd = f'{new_ts.year}_{new_ts.month}_{new_ts.day}_{new_ts.hour}'
        dd = f'{new_ts.year}_{new_ts.month}_{new_ts.day}'
        
        yield (hhdd, (s_ip, (ts, port)))
        
def zero_padding_DE(partition):
    for (hhdd, (s_ip, (ts, port))) in partition:
        yyyy, mm, dd, hh = hhdd.split('_')

        if len(mm)==1: mm='0'+mm
        if len(dd)==1: dd='0'+dd
        if len(hh)==1: hh='0'+hh

        rebuilt = f'{yyyy}_{mm}_{dd}_{hh}'

        yield (rebuilt, (s_ip,(ts, port)))

def date_to_window(date_map):
    def run(partition):
        for k, v in partition:
            yield(date_map[k], v)
    return run

def sort_ports(partition):
    for k, v in partition:
        v.sort(key=lambda x: x[0])
        v = [x[1] for x in v]
        
        yield (k, v)
    
def process_window(method, save = False):
    import gzip
    def run(partition):
        for k, v in partition:
            a, b = k.split('_')
            if len(b) == 1:
                k = f'{a}_00{b}'
            elif len(b) == 2:
                k = f'{a}_0{b}'
                
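            # Open the output file once per window; it is closed after the loop
            file = gzip.open(f'{CORPUS}/{method}/{k}.txt.gz', 'wt') if save else None
            cnt = 0
            for (s_ip, p_list) in v:
                # Keep only the senders that contacted more than two ports
                if len(p_list) > 2:
                    cnt+=1
                    if save:
                        # One sentence per sender: its ports separated by spaces
                        file.write(' '.join(p_list)+'\n')
            if save:
                file.close()

            yield k, cnt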
    
    return run
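
As a rough, Spark-free sketch of what the helpers above are meant to produce, the snippet below maps sorted hour keys to windows of four hours (mirroring `get_time_windows`) and builds one sentence of ports per sender, keeping only senders with more than two ports in a window (mirroring `process_window`). The names `four_hour_windows` and `dante_sentences`, as well as the sample flows, are assumptions made for illustration.

```python
from collections import defaultdict

def four_hour_windows(hours, size=4):
    """Map each hour key to a window label w_0, w_1, ... of `size` hours."""
    ordered = sorted(hours)
    return {h: f'w_{i // size}' for i, h in enumerate(ordered)}

def dante_sentences(flows, date_map, min_ports=3):
    """flows: (hour key, sender IP, timestamp, port). Returns, per window, the
    port sequences (ordered by timestamp) of senders with >= min_ports ports."""
    per_sender = defaultdict(list)
    for hour, s_ip, ts, port in flows:
        per_sender[(date_map[hour], s_ip)].append((ts, port))
    corpus = defaultdict(list)
    for (window, s_ip), items in sorted(per_sender.items()):
        if len(items) >= min_ports:
            corpus[window].append([port for _, port in sorted(items)])
    return dict(corpus)

# Hypothetical data: six hour keys and a handful of flows
hours = [f'2021_03_02_{h:02d}' for h in range(6)]
date_map = four_hour_windows(hours)
flows = [
    ('2021_03_02_00', '10.0.0.1', 3.0, '445'),
    ('2021_03_02_00', '10.0.0.1', 1.0, '23'),
    ('2021_03_02_01', '10.0.0.1', 2.0, '80'),
    ('2021_03_02_02', '10.0.0.2', 4.0, '22'),
    ('2021_03_02_05', '10.0.0.1', 5.0, '23'),
]
corpus = dante_sentences(flows, date_map)
```

Here the words of a sentence are ports rather than IPs, and each sentence belongs to a single sender within a single window.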

In [17]:
#raw = sc.textFile(','.join(DEBUG.split(',')[-5:]))
raw = sc.textFile(','.join(DEBUG.split(',')[-2:]))
bro = raw.mapPartitions(load_data).repartition(1000)

In [18]:
# Senders seen more than 10 times; a set keeps the membership test in the next cell fast
filt = set(bro.map(lambda x: (x[2], 1)).reduceByKey(lambda x,y: x+y)\
          .filter(lambda x:x[1]>10).map(lambda x: x[0]).collect())

In [None]:
_2 = bro.map(lambda x: (x[0], x[2], x[4], x[5], x[6])).mapPartitions(convert_fields)\
        .map(lambda x: (x[0], x[1], x[2], x[3])).filter(lambda x: x[1] in filt)\
        .map(lambda x: (x[0], x[1], x[3])).mapPartitions(convert_timestamp_DE)\
        .mapPartitions(zero_padding_DE)
# Sort the zero-padded hour keys so that each window spans four consecutive hours
hours = sorted(_2.groupByKey().map(lambda x: x[0]).collect())
date_map = get_time_windows(hours)
_2.mapPartitions(date_to_window(date_map)).map(lambda x: ((x[0], x[1][0]), x[1][1]))\
  .groupByKey().map(lambda x: (x[0], list(x[1]))).sortByKey()\
  .mapPartitions(sort_ports).map(lambda x: (x[0][0], (x[0][1], x[1])))\
  .groupByKey().map(lambda x: (x[0], list(x[1])))\
  .mapPartitions(process_window(method = 'dante5', save=SAVE))\
  .collect()

### IP2VEC 5 days
Since processing the full dataset takes a long time, we comment out the original line

`
raw = sc.textFile(','.join(DEBUG.split(',')[-5:]))
`

and replace it with 

`
raw = sc.textFile(','.join(DEBUG.split(',')[-2:]))
`

which processes only the last two days of traffic and takes less time. This is for demonstration purposes only.

To run the original experiment, comment out the second line and uncomment the first one.


In [20]:
def gen_samples(partition):
    for sip, dip, dp, p in partition:
        yield (sip, dip)
        yield (sip, dp)
        yield (sip, p)
        yield (dp, dip)
        yield (p, dip)

def save_data(partition):
    import numpy as np
    import random
    
    chunk_id = random.randint(1, 100)
    X = []
    Y = []
    try:
        for x, y in partition:
            X.append(x)
            Y.append(y)
        X = np.array(X)
        Y = np.array(Y)

        np.savez_compressed(f'{CORPUS}/ip2vec5/{X[0]}{Y[0]}{Y[1]}{Y[2]}{Y[3]}', x=X, y=Y)
        
        yield len(X)
    except:
        pass
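
For a single flow, `gen_samples` emits five (input, context) pairs. The sketch below restates the function so that it runs on its own and applies it to a made-up flow; the addresses are documentation examples, not real traffic.

```python
def gen_samples(partition):
    """For each (source IP, destination IP, destination port, protocol) flow,
    emit the five (input, context) training pairs used in the notebook."""
    for sip, dip, dp, p in partition:
        yield (sip, dip)
        yield (sip, dp)
        yield (sip, p)
        yield (dp, dip)
        yield (p, dip)

# Hypothetical flow, only for illustration
flow = ('192.0.2.1', '198.51.100.7', '23', 'tcp')
pairs = list(gen_samples([flow]))
```

The first element of each pair ends up in the `x` array and the second in the `y` array written by `save_data`.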

In [21]:
#raw = sc.textFile(','.join(DEBUG.split(',')[-5:]))
raw = sc.textFile(','.join(DEBUG.split(',')[-2:]))
bro = raw.mapPartitions(load_data).repartition(1000)

In [22]:
# Senders seen at least 10 times; a set keeps the membership test in the next cell fast
filt = set(bro.map(lambda x: (x[2], 1)).reduceByKey(lambda x,y: x+y)\
          .filter(lambda x:x[1]>=10).map(lambda x: x[0]).collect())

In [23]:
_0 = bro.map(lambda x: (x[0], x[2], x[4], x[5], x[6])).filter(lambda x: x[1] in filt)\
        .mapPartitions(convert_fields)\
        .map(lambda x: (x[1], x[2], x[3], x[4])).map(lambda x: (x, 1))\
        .reduceByKey(lambda x,y:x+y).map(lambda x: x[0])

In [24]:
step1 = _0.mapPartitions(gen_samples)
step1 = step1.coalesce(50)
step1.mapPartitions(save_data).collect();

In [39]:
# Rename the saved chunks to doc01.npz, doc02.npz, ...
for doc_cnt, _file in enumerate(sorted(os.listdir(CORPUS+'/ip2vec5')), start=1):
    os.rename(f'{CORPUS}/ip2vec5/{_file}', f'{CORPUS}/ip2vec5/doc{doc_cnt:02d}.npz')

## Grid Search for $k$

According to the language definitions provided in the paper, we generate the DarkVec corpus for 30 days, with a different corpus for each language. Thus we generate:

- DarkVec 30 days single language. The corpus is the sequence of the IPs as they reach the darknet
- DarkVec 30 days auto-defined language. 11 languages (top-10 port/protocol pairs plus one for the others). For each language the corpus is the sequence of the IPs as they reach the respective darknet port/protocol pair
- DarkVec 30 days per-service language. Knowledge-based languages (`services/services.json`). For each language the corpus is the sequence of the IPs as they reach the respective set of darknet port/protocol pairs
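
A minimal sketch of the auto-defined language, assuming the flows are already reduced to port/protocol pair strings: the ten most frequent pairs each get their own language, and every other pair falls into `oth`. The helper name `auto_languages` and the toy counts are assumptions made for illustration; the notebook does the same with `reduceByKey`, `sortBy` and `select_ports`.

```python
from collections import Counter

def auto_languages(pairs, top_n=10):
    """Return a function mapping a port/protocol pair to its language:
    the pair itself if it is among the top_n most frequent, else 'oth'."""
    top = {pair for pair, _ in Counter(pairs).most_common(top_n)}
    return lambda pair: pair if pair in top else 'oth'

# Toy traffic: 12 distinct pairs with strictly decreasing frequencies (12, 11, ..., 1)
pairs = [f'{port}_tcp' for i, port in enumerate(range(20, 32)) for _ in range(12 - i)]
to_language = auto_languages(pairs)
languages = {to_language(p) for p in pairs}
```

With twelve distinct pairs, the two least frequent ones share the `oth` language, which gives the eleven languages mentioned above.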


Since processing the full dataset takes a long time, we comment out the original line

`
raw = sc.textFile(DEBUG)
`

and replace it with 

`
raw = sc.textFile(','.join(DEBUG.split(',')[-2:]))
`

which processes only the last two days of traffic and takes less time. This is for demonstration purposes only.

To run the original experiment, comment out the second line and uncomment the first one.


Distribute raw data

In [7]:
#raw = sc.textFile(DEBUG)
raw = sc.textFile(','.join(DEBUG.split(',')[-2:]))
bro = raw.mapPartitions(load_data).repartition(1000)

In [8]:
step1 = bro.map(lambda x: (x[0], x[2], x[4], x[5], x[6]))\
           .mapPartitions(convert_fields)\
           .mapPartitions(convert_timestamp)
step1_1 = step1.map(lambda x: ((x[0][0].replace('_', '/'), x[0][1], x[0][2]), x[1]))\
       .mapPartitions(extract_services)

### Darkvec 30 days Single Language

In [None]:
step1.map(lambda x: (x[0][1],(x[1][0], x[1][1]))).groupByKey()\
     .map(lambda x:( x[0], list(x[1]))).mapPartitions(sort_by_ts)\
     .groupByKey().map(lambda x: (x[0], list(x[1])))\
     .mapPartitions(get_sentences(method='darkvec30single', save=SAVE)).collect();

### Darkvec 30 days Auto Language

In [10]:
def select_ports(top10):
    def run(partition):
        for ((pp, hh, dd),(ts, ip)) in partition:
            if pp not in top10:
                pp = 'oth'
        
            yield ((pp, hh, dd),(ts, ip))
    return run

In [11]:
top10 = step1.map(lambda x: (x[0][0], 1)).reduceByKey(lambda x, y: x+y)\
          .sortBy(lambda x: x[1], ascending=False).take(10)
top10 = set([x[0] for x in top10])

In [None]:
step1.mapPartitions(select_ports(top10)).groupByKey().map(lambda x: (x[0], list(x[1])))\
     .map(lambda x: ((x[0][0], x[0][2]), (x[0][1], x[1]))).mapPartitions(get_slices)\
     .groupByKey().map(lambda x: (x[0], list(x[1])))\
     .mapPartitions(get_sentences(method='darkvec30auto', save=SAVE)).collect();

### Darkvec 30 days Per-Service Language

In [13]:
flows_to_filter = 0

In [14]:
filt = step1_1.map(lambda x: ((x[0][2], x[1][1]), 1)).reduceByKey(lambda x,y: x+y)\
              .map(lambda x: (x[0][1], x[1])).groupByKey()\
              .map(lambda x: (x[0], list(x[1])))\
              .mapPartitions(check_dayflows(flows_to_filter)).collect()
filt = set(filt)
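
`check_dayflows` keeps a sender only if it exceeds `flows_to_filter` flows on at least three distinct days; with `flows_to_filter = 0` every sender is kept. A Spark-free sketch of the same rule follows (the name `active_senders` and the counts are hypothetical):

```python
def active_senders(daily_counts, flows_to_filter):
    """daily_counts: {sender IP: [flows per day]}. Keep the senders with more than
    flows_to_filter flows on at least three days; a threshold of 0 disables the filter."""
    if flows_to_filter == 0:
        return set(daily_counts)
    return {ip for ip, counts in daily_counts.items()
            if sum(c > flows_to_filter for c in counts) >= 3}

# Hypothetical per-day flow counts for two senders
daily_counts = {'10.0.0.1': [12, 15, 11, 2], '10.0.0.2': [30, 1, 1]}
kept_all = active_senders(daily_counts, 0)
kept_busy = active_senders(daily_counts, 10)
```

The second sender is dropped with a threshold of 10 because it exceeds the threshold on one day only.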

Apply the filter and keep processing.

In [None]:
step1_1.filter(lambda x: x[1][1] in filt).groupByKey()\
        .map(lambda x: (x[0], list(x[1])))\
        .map(lambda x: ((x[0][0], x[0][2]), (x[0][1], x[1])))\
        .mapPartitions(get_slices)\
        .groupByKey().map(lambda x: (x[0], list(x[1])))\
        .mapPartitions(get_sentences(method='darkvec30xserv', save=SAVE))\
        .collect();

### Restore Originals <a id='restore'></a>

In [None]:
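# Restore only if the backup exists, otherwise the corpus would be lost
if os.path.isdir(f'{CORPUS}.BAK'):
    os.system(f'rm -rf {CORPUS}')
    os.system(f'mv {CORPUS}.BAK {CORPUS}');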