In [19]:
# Two lists of input file names.
small = ['small1.json', 'small2.json']
large = ['01_0.json']

# Inputs handed to `vw.train` in the next cell.
files = small

In [20]:
from VwPipeline import Loggers, Handlers
from VwPipeline.VwCache import VwCache
from VwPipeline.Vw import Vw
from VwPipeline.VwOpts import dimension, product
import pandas as pd

# Path to the local vw executable (machine-specific absolute path; adjust before running).
vw_path = r'C:\vw\vw.exe'

# Cache rooted at the relative `_cache` directory; the recorded output of this
# cell lists prediction files stored beneath it.
cache = VwCache(r'_cache')
vw = Vw(
    vw_path,
    cache,
    handlers=[Handlers.WidgetHandler()],
    )

# A single `#base` dimension holding one command-line string, wrapped in a DataFrame.
opts = pd.DataFrame(product(
    dimension('#base', ['--ccb_explore_adf --dsjson --compressed --synthcover --power_t 0  -P 1 --preserve_performance_counters --save_resume']),
))
# Train on `files` with `-p` passed as the third argument (presumably the outputs
# to collect -- confirm against VwPipeline), then read the `-p` entry of the first
# result row. The recorded output shows one path per input file.
preds = vw.train(files, opts, ['-p'])
prediction_files = preds.iloc[0]['!Outputs']['-p']
prediction_files

HBox(children=(HTML(value='Total'), FloatProgress(value=0.0, max=1.0), HTML(value='')))

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=2.0), HTML(value='')))

['_cache\\cache-p\\9994b6a3ba3c46af807259c8b5d13f70',
 '_cache\\cache-p\\0bc0717ead83e333f080721b1355126e']

# Dataflow

In [57]:
import json
import pandas as pd

from itertools import chain
from pathlib import Path



class FileSizeHasher:
    """Fingerprints a file by its size in bytes.

    ``extension`` is the suffix ``FilesPipeline`` appends to an output path
    to name the sidecar file that stores the fingerprint.
    """
    extension = 'size'

    def evaluate(self, path):
        """Return the size, in bytes, of the file located at ``path``."""
        file_info = Path(path).stat()
        return file_info.st_size

class FilesPipeline:
    """Applies a line-stream processor to files, reusing outputs that are up to date.

    Next to every output the pipeline keeps a sidecar file named
    ``<output>.<hasher.extension>`` holding the fingerprint of the input the
    output was produced from. An output counts as current when the stored
    fingerprint equals the input's present fingerprint.
    """
    # Default fingerprinting strategy, shared by instances that do not pass their own.
    hasher = FileSizeHasher()

    def __init__(self, hasher = None):
        """Use ``hasher`` when given, otherwise the class-level default.

        A hasher must expose ``extension`` (str) and ``evaluate(path)``.
        """
        self.hasher = hasher if hasher is not None else self.hasher

    def _load_hash(self, path):
        """Return the integer fingerprint stored for output ``path``, or ``None``.

        ``None`` is returned when the sidecar file is missing, cannot be read,
        or does not contain an integer.
        """
        hash_path = Path(f'{path}.{self.hasher.extension}')
        if not hash_path.exists():
            return None
        try:
            # Context manager so the handle is released even if parsing fails.
            with open(hash_path, 'r') as f:
                return int(f.read())
        except (OSError, ValueError):
            return None

    def _is_in_sync(self, inp, output):
        """Return True when ``output`` was generated from the current ``inp``."""
        input_hash = self.hasher.evaluate(inp)
        output_hash = self._load_hash(output)
        # Compare against None explicitly: 0 is a legitimate fingerprint
        # (an empty input file) and must not be mistaken for "missing".
        return (input_hash is not None
                and output_hash is not None
                and input_hash == output_hash)

    def _sync(self, inp, output):
        """Record the current fingerprint of ``inp`` in the sidecar of ``output``."""
        with open(f'{output}.{self.hasher.extension}','w') as f:
            f.write(str(self.hasher.evaluate(inp)))

    def process(self,
        files,
        processor,
        path_gen=None,
        process=False):
        """Run ``processor`` over every file and return the output paths.

        Args:
            files: iterable of input paths.
            processor: callable taking an open text file (an iterable of
                lines) and returning an iterable of strings to write.
            path_gen: callable mapping an input path to an output path.
                Defaults to ``<input>.<processor.__name__>``.
            process: when True, regenerate outputs even if they are in sync.

        Returns:
            List of output paths that exist after processing, in input order.
        """
        path_gen = path_gen or (lambda f: f'{f}.{processor.__name__}')
        result = []
        for path_in in files:
            print(f'Processing {path_in}...')
            path_out = path_gen(path_in)
            Path(path_out).parent.mkdir(parents=True, exist_ok=True)
            if process or not self._is_in_sync(path_in, path_out):
                # Open the input first so a missing input fails before the
                # existing output is truncated.
                with open(path_in) as fin, open(path_out, 'w') as fout:
                    fout.writelines(processor(fin))
                self._sync(path_in, path_out)
            if Path(path_out).exists():
                result.append(path_out)
        return result

def files_2_csvs(
    files,
    processor,
    path_gen=None,
    process=False):
    """Convert each input file to a CSV, skipping inputs whose CSV is up to date.

    Args:
        files: iterable of input paths.
        processor: callable taking an open text file and returning records
            accepted by ``pd.DataFrame`` (e.g. an iterable of dicts).
        path_gen: callable mapping an input path to the CSV path.
            Defaults to ``<input>.csv``.
        process: when True, regenerate outputs even if they are in sync.

    Returns:
        List of CSV paths that exist after processing. Inputs producing no
        rows write no CSV and are therefore left out.
    """
    path_gen = path_gen or (lambda f: f'{f}.csv')
    # The sync bookkeeping lives on FilesPipeline; there are no module-level
    # `_is_in_sync` / `_sync` functions in this notebook.
    tracker = FilesPipeline()
    result = []
    for f in files:
        print(f'Processing {f}...')
        output = path_gen(f)
        Path(output).parent.mkdir(parents=True, exist_ok=True)
        if process or not tracker._is_in_sync(f, output):
            # Build the frame while the input is open, then release the handle.
            with open(f) as fin:
                df = pd.DataFrame(processor(fin))
            if len(df) > 0:
                df.to_csv(output, index=False)
            tracker._sync(f, output)
        if Path(output).exists():
            result.append(output)
    return result

def csvs_2_rows(files, processors=None):
    """Lazily yield one ``ChainMap`` per row of the given CSV files.

    Args:
        files: iterable of CSV paths; each is read only when iteration reaches it.
        processors: callables taking a row (``pandas.Series``) and returning
            a dict. Their results are layered in a ``ChainMap``, so for
            duplicate keys the earliest processor wins. When empty or None,
            each row is converted to a plain dict.

    Yields:
        ``collections.ChainMap`` combining the processor outputs for one row.
    """
    # Imported locally: the notebook imports ChainMap only in a later cell.
    from collections import ChainMap

    if not processors:
        # A Series iterates over its values, not its labels, so wrapping it
        # directly in a ChainMap would break key iteration; use a dict instead.
        processors = [lambda row: row.to_dict()]
    for path in files:
        for _, row in pd.read_csv(path).iterrows():
            yield ChainMap(*[p(row) for p in processors])

def ndjsons_2_rows(files, processors=None):
    """Lazily yield the parsed records of newline-delimited JSON files.

    Args:
        files: iterable of paths; each file is opened when iteration reaches
            it and closed once its lines are exhausted.
        processors: optional callables taking a parsed record and returning a
            dict. When given, their results are layered in a ``ChainMap``
            (earliest processor wins on duplicate keys), mirroring
            ``csvs_2_rows``. When empty or None, the parsed record is yielded
            unchanged.

    Yields:
        The parsed JSON object of each non-blank line, or a ``ChainMap`` of
        processor outputs when processors are supplied.
    """
    # Imported locally: the notebook imports ChainMap only in a later cell.
    from collections import ChainMap

    for path in files:
        with open(path) as fin:
            for line in fin:
                # Blank lines carry no record and would make json.loads fail.
                if not line.strip():
                    continue
                parsed = json.loads(line)
                if processors:
                    yield ChainMap(*[p(parsed) for p in processors])
                else:
                    yield parsed

# Parsers

In [77]:
import itertools
import json
import uuid
import pandas as pd
from collections import ChainMap

def is_ccb_decision(line):
    """Return True when ``line`` begins with the ``{"Timestamp"`` key."""
    decision_prefix = '{"Timestamp"'
    return line.startswith(decision_prefix)
    
class UniformSampler:
    """Deterministic sub-sampler that accepts every ``period``-th line.

    ``period`` is ``int(1 / fraction)``, so the effective rate is
    ``1 / period``; this equals ``fraction`` only when ``1 / fraction`` is a
    whole number (0.3, for instance, samples one line in three).
    """
    # Class-level starting value; the first call to `do` creates the
    # per-instance counter.
    counter = 0

    def __init__(self, fraction):
        """Create a sampler keeping roughly ``fraction`` of the lines.

        Args:
            fraction: positive sampling rate. Values of 1 or more keep every line.

        Raises:
            ValueError: if ``fraction`` is zero or negative.
        """
        if fraction <= 0:
            raise ValueError(f'fraction must be positive, got {fraction!r}')
        # Clamp to 1 so fraction > 1 cannot yield a zero period, which would
        # make the modulo in `do` divide by zero.
        self.period = max(1, int(1/fraction))

    def do(self, line):
        """Advance the counter and report whether this line is sampled.

        ``line`` itself is not inspected; only the call count matters.
        """
        self.counter = (self.counter + 1) % self.period
        return self.counter == 0

def sample(fraction):
    """Return True with probability ``fraction`` using one uniform random draw."""
    from random import random as draw
    return draw() < fraction

class DsJsonCcb:
    """Filters and slims down CCB dsjson decision lines.

    ``processors`` maps a location in the decision record to the callables
    that extract the fields to keep from it:

    * ``'/'``         -- the top-level record
    * ``'c'``         -- the shared context (``record['c']``)
    * ``'_multi'``    -- each entry of ``record['c']['_multi']``
    * ``'_slots'``    -- each entry of ``record['c']['_slots']``
    * ``'_outcomes'`` -- each entry of ``record['_outcomes']``

    Every callable takes the object at that location and returns a dict; the
    dicts of one location are merged through a ``ChainMap`` (the earliest
    callable wins on duplicate keys). ``filters`` are line predicates; a line
    is kept only if every filter accepts it.
    """
    default_top_processor = lambda o: {
        'Timestamp': o['Timestamp'],
        '_skipLearn': o.get('_skipLearn', False),
        'pdrop': o.get('pdrop', 0.0)}

    default_slot_processor = lambda o: {
        '_inc': o.get('_inc', [])}

    default_outcome_processor = lambda o: {
        '_label_cost': o['_label_cost'],
        '_id': o['_id'],
        '_a': o['_a'],
        '_p': o['_p']}

    # Class-level defaults, shared by every instance that does not override them.
    processors = {
        '/': [default_top_processor],
        'c': [],
        '_multi': [],
        '_slots': [default_slot_processor],
        '_outcomes': [default_outcome_processor]
    }

    filters = [is_ccb_decision]

    # Extra extractors read by `ndjson_2_slots`. Empty defaults keep that
    # method from failing with AttributeError; assign lists of callables on
    # an instance to add custom columns.
    context_processors = ()
    slot_processors = ()

    def __init__(self, processors = None, filters = None):
        """Override the class-level ``processors`` and/or ``filters`` when given."""
        self.processors = processors if processors is not None else self.processors
        self.filters = filters if filters is not None else self.filters

    def _extract(self, key, obj):
        """Merge the outputs of every processor registered under ``key`` for ``obj``."""
        return dict(ChainMap(*[p(obj) for p in self.processors[key]]))

    def process_decision(self, line):
        """Parse one decision line and return the reduced record as a JSON string."""
        parsed = json.loads(line)
        context = parsed['c']

        top = self._extract('/', parsed)
        shared = self._extract('c', context)
        actions = [self._extract('_multi', o) for o in context['_multi']]
        slots = [self._extract('_slots', o) for o in context['_slots']]
        outcomes = [self._extract('_outcomes', o) for o in parsed['_outcomes']]

        # The ChainMap layering is kept as is: it decides both key precedence
        # and the key order of the emitted JSON.
        result = dict(ChainMap(top, {'c': dict(ChainMap(shared, {'_multi': actions, '_slots': slots})), '_outcomes': outcomes}))
        return json.dumps(result)

    def ndjson_2_slots(self, line):
        """Expand one JSON line into an iterator of per-slot ``ChainMap`` rows.

        Reads ``shared``, ``_multi``, ``_slots`` and ``slots`` from the parsed
        record.
        NOTE(review): these keys differ from the layout `process_decision`
        reads and writes -- confirm which producer this method expects.
        """
        parsed = json.loads(line)
        shared = parsed['shared']
        session = {'Session': str(uuid.uuid4()),
                    'T': pd.to_datetime(shared['T']),
                    'NumActions': len(parsed['_multi']),
                    'NumSlots': len(parsed['_slots']),
                    'SkipLearn': shared.get('_skipLearn', False),
                    'Pdrop': shared.get('pdrop', 0.0)}
        session_custom = [p(shared) for p in self.context_processors]

        slots = [ChainMap({'SlotIdx': i,
                    'Reward': o['Reward'],
                    'Id': o['Id'],
                    'ActionsPerSlot': len(o['_a']),
                    'Chosen': o['Chosen'],
                    'Prob': o['P']},
                    *[p(o) for p in self.slot_processors])
                 for i, o in enumerate(parsed['slots'])]

        return map(lambda s: ChainMap(session, *session_custom, s), slots)

    def process(self, lines):
        """Filter ``lines`` and, when processors are configured, reduce each one.

        Returns a lazy iterator of newline-terminated JSON strings, or the
        filtered lines themselves when ``self.processors`` has at most one entry.
        """
        for f in self.filters:
            # Pass the predicate itself. A `lambda l: f(l)` wrapper would look
            # `f` up only when the lazy filter runs, i.e. after the loop has
            # finished, so every stage would end up calling the last filter.
            lines = filter(f, lines)
        if len(self.processors) > 1:
            return map(lambda l: f'{self.process_decision(l)}\n', lines)
        return lines
    

class VwPredictionsCcb:
    """Parses prediction files whose lines hold comma-separated ``key:value`` pairs."""

    @staticmethod
    def line_2_slot(line):
        """Parse one ``key:value,key:value`` line into ``{key: float(value)}``.

        Keys stay strings. ``float`` tolerates surrounding whitespace, so a
        trailing newline is harmless.
        """
        slot = {}
        for pair in line.split(','):
            parts = pair.split(':')
            slot[parts[0]] = float(parts[1])
        return slot

    @staticmethod
    def lines_2_slots(lines):
        """Lazily parse every line that has content, skipping blank ones.

        Empty strings are skipped as well as whitespace-only lines; an empty
        string would otherwise reach ``line_2_slot`` and raise IndexError.
        """
        return map(VwPredictionsCcb.line_2_slot, filter(lambda l: l.strip(), lines))

    @staticmethod
    def files_2_slots(files):
        """Lazily yield the parsed slots of all ``files``, in order.

        Each file is opened when iteration reaches it and closed as soon as
        its lines are exhausted.
        """
        for path in files:
            with open(path) as fin:
                yield from VwPredictionsCcb.lines_2_slots(fin)

# Processors

In [62]:
def timestamp(row):
    """Row processor exposing the row's ``'Timestamp'`` value under the key ``'T'``."""
    stamp = row['Timestamp']
    return dict(T=stamp)

In [63]:
# Reduce the small inputs with the default parser (default filter and
# processors). `path_gen` places each output under a `processed` directory and
# `process=True` regenerates outputs even when the stored fingerprint matches.
parser = DsJsonCcb()
pipeline = FilesPipeline()

result = pipeline.process(small, parser.process, path_gen=lambda p: fr'processed\{p}', process=True)
result

Processing small1.json...
Processing small2.json...


['processed\\small1.json', 'processed\\small2.json']

In [64]:
# Second pass over the outputs of the previous cell with an accept-everything
# filter in place of the default `is_ccb_decision`. No `path_gen` is given, so
# the output name is derived from `processor.__name__` (the recorded output
# shows the `.process` suffix).
parser = DsJsonCcb(filters=[lambda l: True])
result = pipeline.process(result, parser.process, process=True)
result

Processing processed\small1.json...
Processing processed\small2.json...


['processed\\small1.json.process', 'processed\\small2.json.process']

In [79]:
# fraction 0.5 gives a period of 2, i.e. every second line is accepted.
sampler = UniformSampler(0.5)

# The sampler replaces the default filter list, so `is_ccb_decision` is not
# applied in this run. The sampler's counter persists across files and re-runs.
parser = DsJsonCcb(filters=[lambda l: sampler.do(l)])
result = pipeline.process(small, parser.process, process=True)
result

Processing small1.json...
Processing small2.json...


['small1.json.process', 'small2.json.process']

In [None]:
# Build a frame from the CSV rows whose 'SkipLearn' value is False.
# NOTE(review): `slot_files` is only assigned in the next cell, so this cell
# fails on a fresh kernel when run in order.
# NOTE(review): with `timestamp` as the only processor each row carries just
# the key 'T', so the 'SkipLearn' lookup looks like it would raise KeyError --
# confirm.
pd.DataFrame(filter(lambda s: s['SkipLearn']==False,
    csvs_2_rows(slot_files, processors = [
        timestamp
    ])))

In [None]:
# Convert the small inputs to per-slot CSV files next to a `processed` directory.
files = small

# NOTE(review): the DsJsonCcb class defined in this notebook has no
# `lines_2_slots` attribute (it defines `process_decision`, `ndjson_2_slots`
# and `process`), so this line raises AttributeError as written.
parser = DsJsonCcb()
slot_files= files_2_csvs(files, parser.lines_2_slots, path_gen=lambda p: fr'processed\{p}.txt')

In [None]:
# Pair each learnable slot row with a prediction row by position.
# `slots`, `preds` and `ds` are lazy, single-use iterators: consuming `ds`
# also consumes `slots` and `preds`.
slots = filter(lambda s: s['SkipLearn']==False, csvs_2_rows(slot_files))
preds = VwPredictionsCcb.files_2_slots(prediction_files)

# In each merged ChainMap the slot row (kv[0]) takes precedence over the prediction (kv[1]).
ds = map(lambda kv: ChainMap(kv[0], kv[1]), zip(slots, preds))

In [None]:
# Materialise the joined rows; this exhausts `ds` (and with it `slots` and `preds`).
pd.DataFrame(ds)

In [None]:
# Predictions on their own, re-read from the prediction files.
pd.DataFrame(VwPredictionsCcb.files_2_slots(prediction_files))

In [None]:
# Same join as `ds`, built inline.
# NOTE(review): `slots` and `preds` are single-use iterators; if `ds` was
# already materialised above they are exhausted and this frame comes out empty.
pd.DataFrame(map(lambda kv: ChainMap(kv[0], kv[1]), zip(slots, preds)))

In [None]:
# NOTE(review): `lines_2_slots` is not defined on the DsJsonCcb class in this
# notebook, and the file opened here is never closed.
parser = DsJsonCcb()
list(parser.lines_2_slots(open(small[0])))

In [None]:
# NOTE(review): depends on the missing `DsJsonCcb.lines_2_slots`; the file
# handle opened here is never closed.
large_result = parser.lines_2_slots(open(large[0]))

In [None]:
# Pull a single item from `large_result` (assigned in the previous cell).
next(large_result)

In [None]:
# NOTE(review): `DsJsonCcb.files_2_slots`, `inputs`, `cp` and `prediction_file`
# are not defined anywhere in the visible notebook -- this looks like a
# leftover from an earlier version of the API.
list(zip(DsJsonCcb.files_2_slots(inputs,context_processors = [cp]), VwPredictionsCcb.lines_2_slots(open(prediction_file))))

In [None]:
# NOTE(review): `DsJsonCcb.files_2_slots`, `inputs` and `cp` are not defined in
# the visible notebook.
list(DsJsonCcb.files_2_slots(inputs,context_processors = [cp]))

In [None]:
import pandas as pd
import json
import uuid
import itertools
import pytz

class DsJson:
    """Static helpers that turn dsjson log lines into dicts and DataFrames.

    Line kinds are recognised by their leading JSON key: ``{"Timestamp"``
    (CCB decision), ``{"_label_cost"`` (CB event) and ``{"RewardValue``
    (dangling reward).
    """

    @staticmethod
    def is_ccb_event(line):
        """Return True when ``line`` is valid JSON starting with ``{"Timestamp"``."""
        # Cheap prefix test first, so JSON is parsed only for candidate lines.
        if not isinstance(line, str) or not line.startswith('{"Timestamp"'):
            return False
        try:
            json.loads(line)
        except ValueError:
            # json.JSONDecodeError is a ValueError: truncated or corrupt line.
            return False
        return True

    @staticmethod
    def is_cb_event(line):
        """Return True when ``line`` starts with ``{"_label_cost"``."""
        return line.startswith('{"_label_cost"')

    @staticmethod
    def is_dangling_reward(line):
        """Return True when ``line`` starts with ``{"RewardValue``."""
        return line.startswith('{"RewardValue')

    @staticmethod
    def get_timestamp(line):
        """Return the timestamp of ``line`` as parsed by ``pd.to_datetime``.

        Dangling rewards are read from ``EnqueuedTimeUtc``, every other line
        from ``Timestamp``.
        NOTE(review): relies on a ``NaiveJson`` helper that is not defined in
        the visible part of this notebook -- confirm it is in scope.
        """
        obj = NaiveJson(line)
        if DsJson.is_dangling_reward(line):
            return pd.to_datetime(obj.get_string("EnqueuedTimeUtc"))
        return pd.to_datetime(obj.get_string("Timestamp"))

    @staticmethod
    def context(line):
        """Return the ``c`` object of a decision line as a newline-terminated JSON string."""
        parsed = json.loads(line)
        return json.dumps(parsed['c']) + '\n'

    @staticmethod
    def dangling_reward(line):
        """Parse a dangling-reward line into a ``Timestamp``/``EventId``/``Reward`` dict."""
        parsed = json.loads(line)
        return {'Timestamp': pd.to_datetime(parsed['EnqueuedTimeUtc']), 'EventId': parsed['EventId'], 'Reward': parsed['RewardValue']}

    @staticmethod
    def analyze_observations(obj):
        """Count the observations in ``obj`` and return ``(rewards, activations)``.

        An entry whose ``ActionTaken`` equals True counts as an activation;
        every other entry counts as a reward.
        """
        rewards = 0
        activations = 0
        for o in obj:
            if 'ActionTaken' in o and o['ActionTaken']==True:
                activations += 1
            else:
                rewards += 1

        return rewards, activations

    @staticmethod
    def ccb_event(line):
        """Parse one CCB decision line.

        Returns:
            ``(session, slots, multi)``, the argument triple expected by
            ``ccb_2_cb``: a dict of per-decision fields, one dict per entry
            of ``_outcomes`` and one dict per entry of ``c._multi``.
        """
        parsed = json.loads(line)
        session = {'Session': str(uuid.uuid4()),
                 'Timestamp': pd.to_datetime(parsed['Timestamp']),
                 'NumActions': len(parsed['c']['_multi']),
                 'NumSlots': len(parsed['c']['_slots']),
                 'VWState': parsed['VWState']['m'],
                 'SkipLearn': False if '_skipLearn' not in parsed else parsed['_skipLearn'],
                 'StringLen': len(line),
                 'Pdrop': 0.0 if 'pdrop' not in parsed else parsed['pdrop']}

        multi = [{'Id': o['c']['Id'],
                  'Len': len(json.dumps(o))}
                 for o in parsed['c']['_multi']]

        slots = []
        for i, o in enumerate(parsed['_outcomes']):
            r, a = DsJson.analyze_observations(o['_o'])
            # The first entry of `_a` / `_p` is the chosen action and its probability.
            chosen = multi[o['_a'][0]]
            slots.append({'SlotIdx': i,
                    'Cost': o['_label_cost'],
                    'EventId': o['_id'],
                    'ActionsPerSlot': len(o['_a']),
                    'Chosen': o['_a'][0],
                    'Prob': o['_p'][0],
                    'Rewards': r,
                    'Activations': a,
                    'Product': chosen['Id'],
                    'ChosenActionLen': chosen['Len']})

        # `ccb_events` and `ccb_stats` unpack this result straight into
        # `ccb_2_cb(session, slots, multi)`, so return the triple rather than
        # per-action rows (which dropped `slots` altogether).
        return session, slots, multi

    @staticmethod
    def ccb_2_cb(session, slots, multi):
        """Flatten one decision into per-slot rows: session fields merged into each slot.

        ``multi`` is accepted for signature symmetry with ``ccb_event`` and is not used.
        """
        return [dict(session, **s) for s in slots]

    @staticmethod
    def ccb_as_cb_to_stats(df):
        """Aggregate per-slot rows into per-minute sums.

        ``df`` must be indexed by a datetime index named ``Timestamp`` and
        provide the columns ``HasObservation``, ``Pdrop``, ``Cost``,
        ``SlotIdx``, ``Chosen``, ``Prob`` and ``ActionsPerSlot``. The input
        frame is left untouched.
        NOTE(review): the slot rows built by ``ccb_event`` contain no
        ``HasObservation`` column -- confirm where it is expected to come from.
        """
        # Work on a copy so the caller's frame does not gain the helper columns.
        result = df.copy()
        result['TimestampFloor'] = result.index.floor('1min')
        result['TimestampFloor'] = result['TimestampFloor'].dt.tz_localize(None)

        # Weight that undoes the logging drop probability.
        keep_prob = 1 - result['Pdrop']
        first_slot = (result['SlotIdx']==0).astype(int)
        slot_is_chosen = (result['SlotIdx']==result['Chosen']).astype(int)

        result['Observations'] = result['HasObservation'].astype(int).div(keep_prob)
        result['Rewards'] = -result['Cost'].div(keep_prob)
        result['Events'] = 1
        result['EventsLogged'] = result['Events']
        result['Events'] = result['Events'].div(keep_prob)
        result['RewardsSlot1'] = result['Rewards'].mul(first_slot)
        result['EventsSlot1'] = result['Events'].mul(first_slot)
        result['RewardsIps1'] = result['Rewards'].mul(slot_is_chosen).div(result['Prob'])
        result['EventsIps1'] = result['Events'].mul(slot_is_chosen).div(result['Prob'])
        result['RewardsIps1Slot1'] = result['RewardsIps1'].mul(first_slot)
        result['EventsIps1Slot1'] = result['EventsIps1'].mul(first_slot)
        result['RewardsIpsR'] = result['Rewards'].mul(result['ActionsPerSlot']).div(result['Prob'])
        result['EventsIpsR'] = result['Events'].mul(result['ActionsPerSlot']).div(result['Prob'])
        result['RewardsIpsRSlot1'] = result['RewardsIpsR'].mul(first_slot)
        result['EventsIpsRSlot1'] = result['EventsIpsR'].mul(first_slot)

        stat_columns = ['TimestampFloor', 'Observations', 'Rewards', 'Events', 'RewardsSlot1', 'EventsSlot1', 'RewardsIps1', 'EventsIps1', 'RewardsIps1Slot1', 'EventsIps1Slot1', 'RewardsIpsR', 'EventsIpsR', 'RewardsIpsRSlot1', 'EventsIpsRSlot1', 'EventsLogged']
        return (result[stat_columns]
                .reset_index()
                .drop('Timestamp', axis=1)
                .rename(columns = {'TimestampFloor': 'Timestamp'})
                .groupby('Timestamp')
                .sum())

    @staticmethod
    def get_title_from_obj(action):
        """Return the action's ``Title`` or, failing that, ``ProductTitle``; None if neither exists."""
        c = action['c']
        if 'Title' in c:
            return c['Title']
        elif 'ProductTitle' in c:
            return c['ProductTitle']
        return None

    @staticmethod
    def ccb_action(line):
        """Parse one CCB decision line into one row per action.

        Actions that were not chosen for any slot keep ``SlotIdx`` -1 and
        zero ``Cost`` / ``Prob``; chosen actions receive the values of the
        slot that picked them.
        """
        parsed = json.loads(line)
        session = {'Session': parsed['_outcomes'][0]['_id'], 'Timestamp': pd.to_datetime(parsed['Timestamp'])}
        multi = [{'Id': o['c']['Id'],
                  'Index': i,
                  'ChannelId': o['c']['Id'],
                  'Title': DsJson.get_title_from_obj(o),
                  'SlotIdx': -1,
                  'Cost': 0,
                  'Prob': 0,
                  'ActionLen': len(str(o)),
                  'CLen': len(o['c']),
                  'DLen': len(o['d']),
                  'ELen': len(o['e']),
                  'HLen': len(o['h'])}
                 for i, o in enumerate(parsed['c']['_multi'])]
        for i, o in enumerate(parsed['_outcomes']):
            chosen = multi[o['_a'][0]]
            chosen['SlotIdx'] = i
            chosen['Cost'] = o['_label_cost']
            chosen['Prob'] = o['_p'][0]
        return [dict(session, **m) for m in multi]

    @staticmethod
    def dangling_reward_lines(lines):
        """Lazily keep only the dangling-reward lines."""
        return filter(DsJson.is_dangling_reward, lines)

    @staticmethod
    def ccb_decision_lines(lines):
        """Lazily keep only the valid CCB decision lines."""
        return filter(DsJson.is_ccb_event, lines)

    @staticmethod
    def dangling_rewards(lines):
        """Return a DataFrame of dangling rewards, indexed by ``Timestamp`` when non-empty."""
        df = pd.DataFrame(
            map(DsJson.dangling_reward, DsJson.dangling_reward_lines(lines)))
        return df.set_index('Timestamp') if len(df) > 0 else df

    @staticmethod
    def _slot_rows(lines):
        """Lazily yield the per-slot rows of every CCB decision in ``lines``."""
        events = map(lambda l: DsJson.ccb_2_cb(*DsJson.ccb_event(l)), DsJson.ccb_decision_lines(lines))
        # from_iterable keeps the pipeline lazy instead of unpacking all events up front.
        return itertools.chain.from_iterable(events)

    @staticmethod
    def ccb_events(lines):
        """Return a DataFrame with one row per slot of every CCB decision (default index)."""
        return pd.DataFrame(DsJson._slot_rows(lines))

    @staticmethod
    def ccb_stats(lines):
        """Return per-minute statistics for the CCB decisions found in ``lines``."""
        df = pd.DataFrame(DsJson._slot_rows(lines))
        return DsJson.ccb_as_cb_to_stats(df.set_index('Timestamp'))

    @staticmethod
    def ccb_actions(lines):
        """Return a ``Timestamp``-indexed DataFrame with one row per action per decision."""
        actions = map(DsJson.ccb_action, DsJson.ccb_decision_lines(lines))
        df = pd.DataFrame(itertools.chain.from_iterable(actions))
        return df.set_index('Timestamp')

    @staticmethod
    def contexts(lines):
        """Lazily yield the JSON context string of every CCB decision line."""
        return map(DsJson.context, DsJson.ccb_decision_lines(lines))

    @staticmethod
    def first_timestamp(lines):
        """Return the timestamp of the first line of ``lines``.

        ``iter`` lets this accept lists as well as iterators and files.
        Raises StopIteration when ``lines`` is empty.
        """
        line = next(iter(lines))
        return DsJson.get_timestamp(line)
    
def ccb_actions(file):
    """Read the UTF-8 log at path ``file`` and return ``DsJson.ccb_actions`` of its lines.

    The frame is built in full inside the ``with`` block, so the file can be
    closed before returning.
    """
    with open(file, 'r', encoding='utf-8') as lines:
        return DsJson.ccb_actions(lines)

def ccb_slots(file):
    """Read the UTF-8 log at path ``file`` and return ``DsJson.ccb_events`` of its lines.

    The frame is built in full inside the ``with`` block, so the file can be
    closed before returning.
    """
    with open(file, 'r', encoding='utf-8') as lines:
        return DsJson.ccb_events(lines)

def dangling_rewards(file):
    """Read the UTF-8 log at path ``file`` and return ``DsJson.dangling_rewards`` of its lines.

    The frame is built in full inside the ``with`` block, so the file can be
    closed before returning.
    """
    with open(file, 'r', encoding='utf-8') as lines:
        return DsJson.dangling_rewards(lines)

In [None]:
import pandas as pd

# Toy frame with a single column 'i' holding 0..15, used to try DataFrame.sample below.
df = pd.DataFrame([{'i': i} for i in range(16)])

In [None]:
# Display the toy frame.
df

In [None]:
# Random half of the rows; no random_state is passed, so the selection differs between runs.
df.sample(frac = 0.5)