# Automatic Fs low values reduction calculator
This script automatically calculates the config parameter `min_value_for_not_reduce` for Fs small values reduction (for noisy Fs). It was written both for research (prior to writing production code) and for POCs (running versions of the product without automatic Fs low values reduction calculation in the production code).

### Assumptions:
This script assumes that there's an accessible MongoDB instance in which the collections whose names start with `scored___aggr_event` are populated. These are used in order to find which Fs are noisy (such that low values reduction can help reduce the noise).

### Configuration:
* `mongo_ip` should be configured with the right ip.
* `verbose` can be set to `True` in order to print additional diagnostic output.
* `show_graphs` should be set to `True` only when you want to display graphs (typically in a research environment).
* `aggregated_feature_event_prevalance_stats_path` is the path to the version of the configuration installed for the customer. The reason this is needed is so we can undo the reduction done in runtime - so we can see the real values and scores and decide on the right new reduction (which might be different than what we've set during the installation process).

### Output:
The names of the Fs that should be reduced are printed, each followed by a number - this is the `min_value_for_not_reduce` parameter. All the other parameters (`max_value_for_fully_reduce` and `reducing_factor`) should be set manually.

In [None]:
# --- Configuration ---
# Host of the mongo instance to query (passed to pymongo.MongoClient below).
# The commented-out lines are alternative hosts.
#mongo_ip = 'localhost'
mongo_ip = '192.168.45.44'
#mongo_ip = 'tc-agent3'
# NOTE(review): not read directly in this notebook - presumably consulted by utils.print_verbose; confirm.
verbose = True
# Path of the installed low-values-reduction configuration; it is parsed by
# load_old_low_values_reducers so the reduction applied at runtime can be undone.
aggregated_feature_event_prevalance_stats_path = r'C:\Users\yoelz\projects\fortscale-core\fortscale\fortscale-streaming\config\aggregated-feature_event-prevalance-stats.properties'
#aggregated_feature_event_prevalance_stats_path = '/home/cloudera/fortscale/streaming/config/aggregated-feature_event-prevalance-stats.properties'

In [None]:
# Decide whether graphs can be shown: get_ipython only exists when running
# under IPython/Jupyter, so anywhere else the call raises NameError.
try:
    get_ipython()
    show_graphs = True
    import matplotlib.pyplot as plt
    # IPython magic: render matplotlib figures inside the notebook
    %matplotlib inline
except NameError:
    # not running under IPython - show_hist will only print, never plot
    show_graphs = False

In [None]:
import pymongo
import itertools
import copy
import json
import re
import time
import datetime
import sys
sys.path.append('..')
from entities import Entities
from utils import print_verbose

In [None]:
# IPython magics: enable the autoreload extension in mode 2, so imported modules
# are re-imported before each cell execution (edits to local modules are picked up
# without restarting the kernel).
%load_ext autoreload
%autoreload 2

In [None]:
#%%javascript
#//IPython.load_extensions('usability\\execute_time\\ExecuteTime');

In [None]:
# remember when the run started, so the total run time can be reported later on
start_time = time.time()

In [None]:
# handle to the `fortscale` database on the configured mongo host (port 27017)
db = pymongo.MongoClient(mongo_ip, 27017).fortscale

In [None]:
def query_hist_of_value_and_score(collection):
    '''
    Build a histogram of (aggregated_feature_value, score) pairs of a collection.

    The documents of the collection are grouped (inside mongo) by their
    `aggregated_feature_value` and `score` fields, and every group is counted.

    :param collection: an object exposing pymongo's `aggregate(pipeline)` method.
    :return: dict mapping (aggregated_feature_value, score) tuples to the number
             of documents having exactly this value and score.
    '''
    pipeline = [
        # keep only the two fields we care about
        {
            '$project': {
                'aggregated_feature_value': 1,
                'score': 1
            }
        },
        # count the documents sharing the same (value, score) pair
        {
            '$group': {
                '_id': {
                    'aggregated_feature_value': '$aggregated_feature_value',
                    'score': '$score'
                },
                'count': {'$sum': 1}
            }
        },
        # flatten the grouping key back into top level fields
        {
            '$project': {
                'aggregated_feature_value': '$_id.aggregated_feature_value',
                'score': '$_id.score',
                'count': 1,
                '_id': 0
            }
        },
        {
            '$sort': {
                'score': 1
            }
        }
    ]
    res = collection.aggregate(pipeline)
    if isinstance(res, dict):
        # in some versions of pymongo the result is a dict (with 'ok' and 'result' fields) instead of a cursor.
        # isinstance is used (rather than an exact type comparison) so dict subclasses are recognized as well
        entries = res['result']
    else:
        entries = res
    return dict(((entry['aggregated_feature_value'], entry['score']), entry['count']) for entry in entries)

In [None]:
def get_all_collection_names():
    '''
    Return the names of the collections of the global `db`.

    With pymongo newer than 2.7 all the collection names are returned
    (via `db.collection_names()`). With older versions the `listCollections`
    command is issued directly, and only the names found in the first batch of
    its reply which start with 'scored___aggr_event' are returned.

    :return: list of collection names.
    '''
    is_newer_than_2_7 = tuple(pymongo.version_tuple[:2]) > (2, 7)
    if not is_newer_than_2_7:
        first_batch = db.command('listCollections')['cursor']['firstBatch']
        return [entry['name']
                for entry in first_batch
                if entry['name'].startswith('scored___aggr_event')]
    return db.collection_names()

def load_fs_from_mongo():
    '''
    Load the (value, score) histograms of all the scored F collections.

    Every collection whose name starts with 'scored___aggr_event' and ends with
    'hourly' or 'daily' is queried using query_hist_of_value_and_score.

    :return: dict of the form {'hourly': {...}, 'daily': {...}}. Each inner dict
             maps a collection name (with the trailing '_hourly' / '_daily'
             stripped) to the histogram of that collection.
    '''
    # materialized as a list, since the names are iterated more than once below
    # (a lazy filter object would be exhausted after the first loop)
    f_collection_names = [name
                          for name in get_all_collection_names()
                          if name.startswith('scored___aggr_event')]
    print_verbose('collection names:')
    for collection_name in f_collection_names:
        print_verbose('\t', collection_name)
    print('')
    fs = {}
    for f_type in ['hourly', 'daily']:
        suffix_length = len('_' + f_type)
        fs[f_type] = {}
        for collection_name in f_collection_names:
            if collection_name.endswith(f_type):
                print_verbose('quering', collection_name, '...')
                fs[f_type][collection_name[:-suffix_length]] = query_hist_of_value_and_score(db[collection_name])
    return fs

def load_old_low_values_reducers(path = None):
    '''
    TODO: this function was copied from "alphas and betas.ipynb". it should be refactored into a utils library.

    Parse the low values reduction parameters out of a configuration file.

    Every line of the file matching the reduction configuration pattern
    contributes one entry. Lines which don't match are ignored.

    :param path: path of the configuration file. When None (the default) the
                 global `aggregated_feature_event_prevalance_stats_path` is used.
    :return: dict mapping an F name to a dict with the float fields
             'reducing_factor', 'max_value_for_fully_reduce' and
             'min_value_for_not_reduce'.
    :raise Exception: if the same F name is configured more than once.
    '''
    if path is None:
        path = aggregated_feature_event_prevalance_stats_path
    # compiled once (outside of the loop). a raw string is used so the backslashes reach the regex engine untouched
    reduction_config_pattern = re.compile(r'fortscale\.aggr_event\..*\.(.*)\.fortscale\.score\..*\.reduction\.configs=.*"reducingFactor":(\d+\.?\d*).*"maxValueForFullyReduce":(\d+\.?\d*).*"minValueForNotReduce":(\d+\.?\d*)')
    res = {}
    with open(path, 'r') as f:
        for line in f:
            match = reduction_config_pattern.search(line)
            if match is None:
                continue
            f_name, reducing_factor, max_value_for_fully_reduce, min_value_for_not_reduce = match.groups()
            if f_name in res:
                raise Exception(f_name + ' was already encountered')
            res[f_name] = {
                'reducing_factor': float(reducing_factor),
                'max_value_for_fully_reduce': float(max_value_for_fully_reduce),
                'min_value_for_not_reduce': float(min_value_for_not_reduce)
            }
    return res

def calc_reducing_factor(value, min_value_for_not_reduce, max_value_for_fully_reduce, reducing_factor):
    '''
    TODO: this function was copied from "alphas and betas.ipynb". it should be refactored into a utils library.

    Calculate the factor a score is multiplied by, given the F's value:
    * values up to (and including) max_value_for_fully_reduce get reducing_factor.
    * values of at least min_value_for_not_reduce get 1 (no reduction).
    * values in between get a linear interpolation between reducing_factor and 1.
    '''
    if value <= max_value_for_fully_reduce:
        return reducing_factor
    if value < min_value_for_not_reduce:
        # how far the value is along the way from "fully reduce" to "not reduce"
        distance_from_full_reduction = value - max_value_for_fully_reduce
        transition_length = min_value_for_not_reduce - max_value_for_fully_reduce
        progress = 1. * distance_from_full_reduction / transition_length
        return reducing_factor + (1 - reducing_factor) * progress
    return 1

def cancel_low_values_reduction_for_single_score(score, value, min_value_for_not_reduce, max_value_for_fully_reduce, reducing_factor):
    '''
    Undo the low values reduction which was applied to a single score.

    :param score: the (possibly reduced) score.
    :param value: the F's value which produced the score.
    :param min_value_for_not_reduce: reduction parameter. None means that no
                                     reduction was applied, so the score is returned as is.
    :param max_value_for_fully_reduce: reduction parameter.
    :param reducing_factor: reduction parameter.
    :return: the score divided by the factor calc_reducing_factor gives for the value.
    :raise ZeroDivisionError: if the calculated factor is 0 (a score which was
                              multiplied by 0 can't be restored).
    '''
    # identity check: None is a singleton, and `!= None` depends on the operand's __ne__
    if min_value_for_not_reduce is None:
        return score
    return score / calc_reducing_factor(value, min_value_for_not_reduce, max_value_for_fully_reduce, reducing_factor)

def cancel_low_values_reduction(fs):
    '''
    Undo the low values reduction which was applied (at runtime) to the scores.

    The reduction parameters are taken from load_old_low_values_reducers. Fs
    without configured parameters are left untouched.

    :param fs: dict of the form returned by load_fs_from_mongo. It is not
               modified - a deep copy is made.
    :return: a copy of fs in which the histogram of every F with configured
             reduction parameters is keyed by (value, unreduced score).
    '''
    old = load_old_low_values_reducers()
    fs = copy.deepcopy(fs)
    prefix_length = len('scored___aggr_event__')
    for f_type in ['hourly', 'daily']:
        # iterate over a snapshot, because entries of fs[f_type] are replaced inside the loop
        for collection_name, hist in list(fs[f_type].items()):
            # the configuration refers to Fs by name - without the collection prefix and with the type suffix
            f_name = (collection_name + '_' + f_type)[prefix_length:]
            old_f_reducer = old.get(f_name)
            if old_f_reducer is None:
                continue
            unreduced_hist = {}
            for (value, score), count in hist.items():
                unreduced_score = cancel_low_values_reduction_for_single_score(score,
                                                                               value,
                                                                               min_value_for_not_reduce = old_f_reducer['min_value_for_not_reduce'],
                                                                               max_value_for_fully_reduce = old_f_reducer['max_value_for_fully_reduce'],
                                                                               reducing_factor = old_f_reducer['reducing_factor'])
                unreduced_hist[(value, unreduced_score)] = count
            fs[f_type][collection_name] = unreduced_hist
    return fs

# Reuse fs_before_canceling_reduction if an earlier execution of this cell has already
# defined it (referencing the bare name raises NameError otherwise) - mongo is queried
# only when it's missing. Presumably this avoids repeating the queries whenever the cell is re-run.
try:
    fs_before_canceling_reduction
except NameError:
    fs_before_canceling_reduction = load_fs_from_mongo()
# the histograms used from here on are keyed by the unreduced scores
fs = cancel_low_values_reduction(fs_before_canceling_reduction)

In [None]:
def create_values_hist(hist, score_to_weight):
    '''
    Collapse a (value, score) histogram into a weighted histogram of values.

    Every (value, score) entry contributes score_to_weight(score) * count to its
    value. Entries whose contribution isn't positive are dropped.

    :param hist: dict mapping (value, score) tuples to counts.
    :param score_to_weight: function mapping a score to a numeric weight.
    :return: dict mapping a value to the sum of its positive weighted counts.
    '''
    values_hist = {}
    for (value, score), count in hist.items():
        weighted_count = score_to_weight(score) * count
        if weighted_count > 0:
            values_hist[value] = values_hist.get(value, 0) + weighted_count
    return values_hist
    
def show_hist(hist, score_to_weight, max_value = 20):
    '''
    Print (and, when show_graphs is set, plot) the weighted values histogram.

    :param hist: dict mapping (value, score) tuples to counts.
    :param score_to_weight: function mapping a score to a numeric weight
                            (see create_values_hist).
    :param max_value: values above it are left out. It's also the upper limit
                      of the plotted x axis.
    '''
    hist = create_values_hist(hist, score_to_weight)
    hist = dict((value, count) for value, count in hist.items() if value <= max_value)
    print_verbose(hist)
    if not show_graphs:
        return
    # if hist has only one entry, matplotlib will fail to plot it:
    hist.setdefault(0, 0.00001)
    hist.setdefault(1, 0.00001)
    # build both lists from the same iteration, so every value is aligned with its weight
    values = list(hist)
    weights = [hist[value] for value in values]
    fig, ax = plt.subplots()
    fig.set_figwidth(20)
    fig.set_figheight(3)
    plt.xlim(0, max_value)
    plt.hist(values,
             weights = weights,
             bins = 1000,
             histtype='stepfilled')
    plt.xticks(range(max_value))
    plt.xlabel('value', fontsize = 20)
    plt.ylabel('count', fontsize = 20)
    plt.show()
    
def show_hists(fs, f_type, score_to_weight, max_value = 20):
    '''
    Show (using show_hist) the weighted values histogram of every F of the given type.

    :param fs: dict of the form returned by load_fs_from_mongo.
    :param f_type: the key of fs to go over ('hourly' or 'daily').
    :param score_to_weight: function mapping a score to a numeric weight.
    :param max_value: passed on to show_hist.
    '''
    for collection_name, hist in fs[f_type].items():
        print_verbose()
        print_verbose(collection_name + '_' + f_type + ':')
        print_verbose('(creating the histogram using score_to_weight)')
        show_hist(hist, score_to_weight = score_to_weight, max_value = max_value)
        
def create_score_to_weight_squared(min_score):
    '''
    Create a function mapping a score to a weight.

    The weight is 1 for a score of 100 and drops quadratically with the distance
    from 100. It's clipped to 0 once the distance reaches (100 - min_score).
    '''
    def score_to_weight_squared(score):
        relative_distance = (score - 100) / (100.0 - min_score)
        unclipped_weight = 1 - relative_distance ** 2
        return max(0, unclipped_weight)
    return score_to_weight_squared

# the weighting used throughout the notebook: scores of 80 and below get a weight of 0
score_to_weight_squared_min_80 = create_score_to_weight_squared(80)

In [None]:
# show the weighted values histograms (limited to values of up to max_value) of all the daily Fs, and then of all the hourly Fs
max_value = 20
show_hists(fs, 'daily', score_to_weight = score_to_weight_squared_min_80, max_value = max_value)
show_hists(fs, 'hourly', score_to_weight = score_to_weight_squared_min_80, max_value = max_value)

In [None]:
def calc_min_value_for_not_reduce(hist, score_to_weight, max_value = 20):
    '''
    Heuristically find the min_value_for_not_reduce parameter of a single F.

    The weighted values histogram (see create_values_hist) is scanned from the
    lowest value upwards, looking for a peak of noise at the low values:
    * a peak starts at the first value (not above max_value) whose weighted count
      is bigger than 10 and exceeds the biggest count seen so far by more than
      15% of the total count.
    * once a peak has started, the first value (not above max_value) whose count
      is less than 85% of (the previous value's count + 1) ends it. This value
      is the result.
    * the scan gives up once more than half of the total count was passed,
      unless a peak has started at most 1 below the current value.

    :param hist: dict mapping (value, score) tuples to counts.
    :param score_to_weight: function mapping a score to a numeric weight.
    :param max_value: peaks can start and end only at values up to it.
    :return: the value which ends the peak, or None if there's no such value.
    '''
    hist = create_values_hist(hist, score_to_weight)
    total_count = sum(hist.values())
    if total_count == 0:
        return None
    cumsum = 0
    prev_count = 0
    max_count_seen = 0
    peak_start = None
    min_value_for_not_reduce = None
    for value, count in sorted(hist.items(), key = lambda value_and_count: value_and_count[0]):
        # don't overdo it (we don't want to reduce everything)
        if 1. * cumsum / total_count > 0.5 and (peak_start is None or value - peak_start > 1):
            break

        # don't bother if there are not enough candidates to be considered as noise:
        is_enough_noise_absolutely = count > 10

        # peaks can exist only at low values:
        if value <= max_value:
            if peak_start is None and is_enough_noise_absolutely and 1. * (count - max_count_seen) / total_count > 0.15:
                peak_start = value
            # the + 1 keeps the denominator positive when the previous count is 0
            if peak_start is not None and 1. * count / (prev_count + 1) < 0.85:
                min_value_for_not_reduce = value
                break

        cumsum += count
        prev_count = count
        max_count_seen = max(max_count_seen, count)

    return min_value_for_not_reduce

def calc_min_value_for_not_reduce_for_hists(fs, f_type, score_to_weight, max_value = 20):
    '''
    Print the min_value_for_not_reduce parameter of every F of the given type.

    A header is printed first. Then, for every F for which
    calc_min_value_for_not_reduce finds a result, a line of the form
    "<collection name>_<f_type>: <min_value_for_not_reduce>" is printed.

    :param fs: dict of the form returned by load_fs_from_mongo.
    :param f_type: the key of fs to go over ('hourly' or 'daily').
    :param score_to_weight: function mapping a score to a numeric weight.
    :param max_value: passed on to calc_min_value_for_not_reduce.
    '''
    # every print call gets a single string, so the output is the same whether print is a statement or a function
    print('')
    print('----------------------------------------------------------------------')
    print(' '.join(['-------- Calculating min_value_for_not_reduce for all', str(f_type), 'Fs -------']))
    print('----------------------------------------------------------------------')
    for collection_name, hist in fs[f_type].items():
        min_value_for_not_reduce = calc_min_value_for_not_reduce(hist, score_to_weight = score_to_weight, max_value = max_value)
        if min_value_for_not_reduce is not None:
            print(' '.join([collection_name + '_' + f_type + ':', str(min_value_for_not_reduce)]))
    
# the script's output: the suggested min_value_for_not_reduce of the daily Fs, and then of the hourly Fs (max_value is left at its default)
calc_min_value_for_not_reduce_for_hists(fs, 'daily', score_to_weight = score_to_weight_squared_min_80)
calc_min_value_for_not_reduce_for_hists(fs, 'hourly', score_to_weight = score_to_weight_squared_min_80)

In [None]:
# report how long has passed since start_time was taken (truncated to whole seconds)
print_verbose("The script's run time was", datetime.timedelta(seconds = int(time.time() - start_time)))

In [None]:
# NOTE(review): this cell uses its own hard-coded host ('tc-agent7') rather than the configured mongo_ip - confirm this is intended.
# NOTE(review): Entities is a project helper; presumably query() returns a truthy value when there's something new to persist, and save() writes it to `path` - confirm.
entities = Entities(path = 'entities-TCAGENT7-new.txt', mongo_ip = 'tc-agent7')
# print is called with a single string, so the output is the same whether print is a statement or a function
print('Querying entities...')
if entities.query(start_time = None, end_time = None):
    print('Saving...')
    entities.save()

In [None]:
def find_bad_values(entities, score_to_weight, is_daily):
    '''
    Find F values which got a score in spite of being close to the context's history.

    The entities are processed in the order of their 'startTime'. For every F
    (and every context) the values seen so far are kept. A value is considered
    "bad" if it's less than 2 away from the mean of the values previously seen
    for the same F and context, and its score has a positive weight.

    The longest history encountered is printed as (length, F name, context id).

    :param entities: object whose iterate(is_daily = ...) yields dicts with the
                     keys 'startTime', 'contextId' and 'includedAggrFeatureEvents'
                     (a list of dicts with 'type', 'name', 'value' and 'score').
    :param score_to_weight: function mapping a score to a numeric weight.
    :param is_daily: passed on to entities.iterate.
    :return: tuple of two dicts:
             * F name -> {bad value -> sum of the weights of its occurrences}.
             * F name -> {context id -> list of the values seen, in time order}.
    '''
    f_to_bad_values_hist = {}
    f_to_contexts = {}
    longest_history = (0, None)
    for e in sorted(entities.iterate(is_daily = is_daily), key = lambda e: e['startTime']):
        for a in e['includedAggrFeatureEvents']:
            if a['type'] != 'F':
                continue
            context_to_history = f_to_contexts.setdefault(a['name'], {})
            history = context_to_history.setdefault(e['contextId'], [])
            weight = score_to_weight(a['score'])
            # 1. * forces a true division: integer values would otherwise give a floored mean under python 2
            if len(history) > 0 and abs(1. * sum(history) / len(history) - a['value']) < 2 and weight > 0:
                bad_values_hist = f_to_bad_values_hist.setdefault(a['name'], {})
                bad_values_hist[a['value']] = bad_values_hist.get(a['value'], 0) + weight
            history.append(a['value'])
            if len(history) > longest_history[0]:
                longest_history = (len(history), a['name'], e['contextId'])
    print(longest_history)
    return f_to_bad_values_hist, f_to_contexts
    
# res is the tuple (F name -> bad values histogram, F name -> context -> values history) of the daily entities
res = find_bad_values(entities, score_to_weight = score_to_weight_squared_min_80, is_daily = True)

In [None]:
# display the bad values histograms (the first element of the tuple returned by find_bad_values)
res[0]