In [1]:
# imports
import re
import time
import pickle
import logging
import gc
import os

import pandas as pd
import numpy as np
import math as m
import matplotlib.pyplot as plt

from scipy import stats


# Notebook-wide logger: emits DEBUG and above with a timestamp and level prefix.
log = logging.getLogger('log')
log.setLevel(logging.DEBUG)

formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

lhnd = logging.StreamHandler()
lhnd.setLevel(logging.DEBUG)
lhnd.setFormatter(formatter)

# logging.getLogger returns the same logger object on every call, so re-running
# this cell would otherwise attach one more handler each time and every message
# would be printed once per handler. Detach leftovers before adding the new one.
for stale_handler in list(log.handlers):
    log.removeHandler(stale_handler)

log.addHandler(lhnd)

In [2]:
def lmap(f, arr):
    """Apply ``f`` to each element of ``arr`` and return the results as a list."""
    return [f(item) for item in arr]

def lfilter(f, arr):
    """Return a list of the elements of ``arr`` that ``filter(f, arr)`` lets through."""
    kept = filter(f, arr)
    return [*kept]

def foreach(it, f):
    """Call ``f`` on every element of ``it`` for its side effects; returns None."""
    for element in it:
        f(element)

In [3]:
class Event:
    """A timestamped event identified by a signed id.

    The constructor receives the signed id. Its sign is stored in ``sub``
    (True only for strictly positive ids) and its magnitude in ``id``;
    ``raw_id()`` reconstructs the signed value.
    """

    def __init__(self, ts, id):
        # ts may be passed as text; it is normalised to int (ValueError if not numeric).
        self.ts = int(ts)
        self.id = abs(id)
        self.sub = id > 0

    def raw_id(self):
        """Return the signed id: ``id`` when ``sub`` is true, ``-id`` otherwise."""
        return self.id if self.sub else -self.id

    def __eq__(self, other):
        """Two events are equal when both their signed id and timestamp match."""
        try:
            return self.raw_id() == other.raw_id() and self.ts == other.ts
        except AttributeError:
            # Not event-like: let Python fall back to its default comparison
            # instead of raising from inside ``==``.
            return NotImplemented

    def __hash__(self):
        # Hash exactly the pair that __eq__ compares so that equal events
        # always land in the same bucket of a set or dict.
        return hash((self.ts, self.raw_id()))

    def __repr__(self):
        return str(self)

    def __str__(self):
        return '({}; {})'.format(self.ts, self.raw_id())

In [4]:
def raw_data_filter(raw):
    """Parse raw session lines into ``Event`` lists and filter them.

    Each element of ``raw`` is a string of comma-separated tokens. For every
    token, the text before the first ';' is used as the timestamp and the text
    after the last ';' as the signed id. Tokens that raise ``ValueError``
    during conversion are skipped (and counted in a debug log line).

    After parsing, an event is kept when ``e.sub`` is true, or when ``e.id``
    equals the signed id of some event seen anywhere in ``raw`` (the lookup is
    over the whole batch, not per session). Sessions left with no events are
    removed.

    Args:
        raw: sequence of strings, one session per element.

    Returns:
        dict mapping the position of the line within ``raw`` to its list of
        surviving ``Event`` objects; positions of empty sessions are absent.
    """
    total = len(raw)
    started = time.time()

    # Mapping to events
    res = dict()
    skipped = 0

    for line_no, line in enumerate(raw):
        events = []

        for token in line.rstrip().split(','):
            ts_text = re.sub(';.*', '', token)
            id_text = re.sub('.*;', '', token)
            try:
                events.append(Event(ts_text, int(id_text)))
            except ValueError:
                # Best effort: a token that is not "<int>;<int>" is dropped,
                # but counted so the loss is visible in the log.
                skipped += 1

        if line_no % 10000 == 0:
            log.debug("%d %% of mapping is done.", line_no / total * 100)

        res[line_no] = events

    log.debug("Mapping finished in %d sec.", time.time() - started)
    if skipped:
        log.debug("Skipped %d malformed tokens during mapping.", skipped)

    started = time.time()

    # Signed ids of every parsed event, across all sessions of this batch.
    subs = {event.raw_id() for session in res.values() for event in session}

    # Drop unsub without sub
    empty = []

    # Only values of existing keys are replaced here, which is safe while
    # iterating; key removal is deferred until the loop is over.
    for line_no, session in res.items():
        if line_no % 10000 == 0:
            log.debug("%d %% of filtering is done.", line_no / total * 100)

        kept = [event for event in session if event.sub or event.id in subs]
        res[line_no] = kept

        if not kept:
            empty.append(line_no)

    # Drop empty
    for line_no in empty:
        del res[line_no]

    log.debug("Filtering finished in %d sec.", time.time() - started)

    return res

In [None]:
# Number of chunks the raw data is divided into; also the default upper bound
# of the chunk range handled by split_mapping and load_chuncks.
chunks_count = 15

def split_mapping(raw, start, finish = chunks_count):
    """Filter chunks ``start`` .. ``finish - 1`` of ``raw`` and pickle each one.

    ``raw`` is cut into ``chunks_count`` consecutive slices of
    ``len(raw) // chunks_count`` elements; the last slice also takes the
    remainder. Each selected slice is passed through ``raw_data_filter`` and
    the result is written to 'data/raw-chunk-<i>'. Because every slice is
    filtered on its own, the keys of each pickled dict are positions relative
    to the start of that slice.

    Args:
        raw: sequence of raw session lines.
        start: index of the first chunk to process.
        finish: index one past the last chunk to process.
    """
    chunk_len = len(raw) // chunks_count
    dexes = [chunk_len * i for i in range(chunks_count)]
    dexes.append(len(raw))

    log.debug(dexes)

    # The chunk files go into 'data/'; make sure it exists before writing.
    os.makedirs('data', exist_ok=True)

    for i in range(start, finish):
        lo, hi = dexes[i], dexes[i + 1]
        log.info('Processing range %d:%d', lo, hi)

        filtered = raw_data_filter(raw[lo:hi])

        fname = 'data/raw-chunk-' + str(i)

        # Run a collection pass before serialising the chunk.
        gc.collect()

        with open(fname, 'wb') as f:
            pickle.dump(filtered, f, pickle.HIGHEST_PROTOCOL)

        log.info('Dumped to the file %s', fname)

        # Drop the reference first: otherwise this chunk stays alive while the
        # next one is being built and the collection below cannot release it.
        del filtered
        gc.collect()

def load_chuncks(start, finish = chunks_count):
    """Load the pickled chunks ``start`` .. ``finish - 1`` from 'data/raw-chunk-<i>'.

    Args:
        start: index of the first chunk file to read.
        finish: index one past the last chunk file to read.

    Returns:
        list with one unpickled object per chunk, in chunk order.

    Raises:
        OSError: if a chunk file cannot be opened.
    """
    res = []

    for i in range(start, finish):
        # Timer gets its own name so the ``start`` parameter is not overwritten.
        began = time.time()

        fname = 'data/raw-chunk-' + str(i)

        # NOTE(security): pickle.load can execute arbitrary code; only read
        # chunk files that this notebook produced itself.
        with open(fname, 'rb') as f:
            res.append(pickle.load(f))

        log.debug("Loading finished in %d sec.", time.time() - began)

    return res



In [None]:
# Chunking is skipped when the first chunk file is already on disk.
# NOTE(review): only chunk 0 is checked, so an earlier run that stopped midway
# is treated as complete — delete 'data/raw-chunk-0' to force a rebuild.
processed = os.path.isfile('data/raw-chunk-0')

if not processed:
    # The context manager closes the file as soon as the lines are read.
    with open("data/sessions_public.txt", "r") as sessions_file:
        data_file = sessions_file.readlines()

    # Chunks are produced in two batches with a collection pass in between.
    split_mapping(data_file, 0, 8)

    gc.collect()

    split_mapping(data_file, 8)

    # Release the raw lines once every chunk has been written.
    data_file = None

    gc.collect()

    
