In [198]:
from __future__ import print_function
import datetime
from functools import reduce
import os

import pandas as pd
import numpy as np
%matplotlib nbagg
import matplotlib.pyplot as plt
import time

# Do not truncate values.
# None means "no column-width limit"; the older -1 spelling was deprecated in
# pandas 1.0 and is rejected by newer releases.
pd.set_option('display.max_colwidth', None)

In [233]:
# Validates that the union of the dataset ids accessed on every day of a week is the same as the
# set of dataset ids accessed in that week
def validate_days_in_weeks(week_ts, weeks_ds, days_ds):
    """Check that the days of a week add up to the same datasets as the week itself.

    @week_ts: timestamp identifying the week to check
    @weeks_ds: DataFrame with 'week_ts' and 'datasets_set' columns (one row per week)
    @days_ds: DataFrame with 'week_ts' and 'datasets_set' columns (one row per day)
    @return: True when both sets of dataset ids are equal, False otherwise.
             When the sets differ in size, the two sizes are also printed.
    Raises IndexError when 'week_ts' has no row in 'weeks_ds'.
    """
    # Dataset ids recorded for the whole week (first row matching 'week_ts')
    week_rows = weeks_ds[weeks_ds['week_ts'] == week_ts]
    week_ids = set(week_rows.datasets_set.values[0])

    # Union of the dataset ids recorded on each day that belongs to that week
    day_rows = days_ds[days_ds['week_ts'] == week_ts]
    day_ids = set()
    for daily_ids in day_rows['datasets_set']:
        day_ids |= set(daily_ids)

    # A size mismatch is reported on stdout and settles the answer right away
    if len(week_ids) != len(day_ids):
        print(str(len(week_ids)) + " != " + str(len(day_ids)))
        return False

    # Same size: both sets still have to contain exactly the same ids
    return day_ids == week_ids


# Receives a weeks DataFrame ('weeks_df') and returns a list of datasets sets
# sorted chronologically (by week_ts)
# @weeks_df: a pandas DataFrame of the form:
#    ------------------------------------------------
#    |week_ts       |datasets_set                   |
#    ------------------------------------------------
#    |1.561594e+09  |[12686513, 13766504, 14689984] |
#    |1.361594e+09  |[15686513, 16766504]           |
#    |1.761594e+09  |[17686513, 18766504, 13689984] |
#    ------------------------------------------------
#    where:
#     'week_ts' is a Linux timestamp that identifies the week and
#     'datasets_set' is an array of datasets IDs that were accessed in that week
#    @return: [{15686513, 16766504},{12686513, 13766504, 14689984},{17686513, 18766504, 13689984}]
#
def get_sorted_list_of_datasets_sets(weeks_df):
    """Return the weekly dataset ids as a list of sets, oldest week first.

    @weeks_df: DataFrame with 'week_ts' and 'datasets_set' columns
    @return: list with one set per week, ordered by ascending 'week_ts'
    """
    # Order the weeks chronologically and renumber the rows 0..n-1 so that
    # position i in the frame is the i-th week
    ordered = weeks_df.sort_values('week_ts').reset_index(drop=True)

    # count() yields one value per column; the 'week_ts' entry is the number
    # of weeks to walk through
    n_weeks = ordered.count().week_ts

    # Each array of ids becomes a set
    return [set(ordered.datasets_set[position]) for position in range(0, n_weeks)]

def get_freed_recalled_and_ws_sizes(weeks_list, policy, datasets_size):
    """Simulate a delete policy week by week.

    The working set of a week is the union of the datasets accessed in that
    week and in the 'policy' - 1 weeks before it. A dataset is freed when the
    week that leaves the window was the only one in the window holding it, and
    it is recalled when it is accessed again after having been freed.

    @weeks_list: list of sets of dataset ids, one per week, in chronological order
    @policy: number of weeks that make up the working-set window
    @datasets_size: DataFrame passed through to get_dataset_set_bytes
    @return: (freed_per_week, recalled_per_week, working_set_size_per_week),
             three lists with one entry per week of 'weeks_list'
    """
    freed = set()
    recalled_per_week = []
    freed_per_week = []
    working_set_size_per_week = []

    # During the first 'policy' weeks nothing can have been freed nor recalled,
    # so those weeks get empty sets. The working set of each of these weeks is
    # the accumulated set of datasets accessed so far.
    # The warm-up is capped at the number of weeks available so that a policy
    # longer than the data does not index past the end of 'weeks_list'.
    warmup_weeks = min(policy, len(weeks_list))
    current_working_set = set()
    for i in range(0, warmup_weeks):
        # A fresh set per week: the returned lists never share one object
        recalled_per_week.append(set())
        freed_per_week.append(set())
        current_working_set = current_working_set.union(weeks_list[i])
        working_set_size_per_week.append(
            get_dataset_set_bytes(current_working_set, datasets_size))

    # For each week in the list, starting on the first week after the policy
    for i in range(policy, len(weeks_list)):
        # Intermediate working set: the datasets accessed between the week
        # leaving the window (old_week) and the current week (new_week),
        # both excluded
        int_ws_from = i - policy + 1
        int_ws = set()
        for j in range(int_ws_from, i):
            int_ws.update(weeks_list[j])
        new_week = weeks_list[i]
        old_week = weeks_list[int_ws_from - 1]

        current_working_set = int_ws.union(new_week)
        # Freed: only the leaving week kept these datasets in the window
        to_free = old_week - current_working_set
        # Recalled: new to the window AND freed at some earlier point
        to_recall = (new_week - int_ws.union(old_week)).intersection(freed)

        working_set_size_per_week.append(
            get_dataset_set_bytes(current_working_set, datasets_size))
        freed.update(to_free)
        recalled_per_week.append(to_recall)
        freed_per_week.append(to_free)

    return freed_per_week, recalled_per_week, working_set_size_per_week

def get_size_of_datasets_sets(datasets_set, datasets_size):
    """Return the total size of every set of datasets in a list.

    @datasets_set: iterable of collections of dataset ids (e.g. one set per week)
    @datasets_size: DataFrame with 'd_dataset_id' and 'dataset_size' columns
    @return: list with the summed 'dataset_size' of each collection, in order
    Raises IndexError when a dataset id has no row in 'datasets_size'.
    """
    # Build the id -> size lookup once instead of scanning the whole DataFrame
    # for every single dataset id. setdefault keeps the FIRST row of a repeated
    # id, which is the row a filtered '.values[0]' would pick.
    size_by_id = {}
    for known_id, known_size in zip(datasets_size['d_dataset_id'].values,
                                    datasets_size['dataset_size'].values):
        size_by_id.setdefault(known_id, known_size)

    week_sizes = []
    for week in datasets_set:
        total_size = 0
        for dataset_id in week:
            try:
                total_size = total_size + size_by_id[dataset_id]
            except KeyError:
                # Same exception type as indexing an empty selection
                raise IndexError(
                    "dataset id {0!r} not found in datasets_size".format(dataset_id))
        week_sizes.append(total_size)
    return week_sizes


# Get the set of datasets recalled in every day of a given week
def get_datasets_recalled_per_day(recalled_set, week_ts, days_df):
    """Assign every dataset recalled in a week to the day it was recalled on.

    @recalled_set: collection of dataset ids recalled during the week
    @week_ts: timestamp identifying the week
    @days_df: DataFrame with 'week_ts', 'day_ts' and 'datasets_set' columns
    @return: dict mapping each 'day_ts' of the week to the set of recalled
             dataset ids attributed to that day (possibly empty)
    """
    # Select the rows of the week ONCE; they do not change while looping over
    # the recalled datasets, so there is no need to filter the DataFrame again
    # for every (dataset, day) pair.
    week_days = days_df[days_df['week_ts'] == week_ts]
    day_stamps = week_days['day_ts'].values

    # Datasets accessed on each day. If a day appears in more than one row,
    # the first row is the one that counts.
    first_datasets_of_day = {}
    for day_ts, day_datasets in zip(day_stamps, week_days['datasets_set'].values):
        first_datasets_of_day.setdefault(day_ts, day_datasets)
    days_in_order = [(day_ts, first_datasets_of_day[day_ts]) for day_ts in day_stamps]

    datasets_recalled_per_day = dict()
    for day_ts in day_stamps:
        datasets_recalled_per_day[day_ts] = set()

    for recalled_dataset in recalled_set:
        # Walk the days of the week in the order they appear in 'days_df'
        for day_ts, day_datasets in days_in_order:
            if recalled_dataset in day_datasets:
                datasets_recalled_per_day[day_ts].add(recalled_dataset)
                # If a dataset was accessed more than once within the same week
                # it was only recalled the first time since the minimum delete
                # policy is 1 week
                break

    return datasets_recalled_per_day


def get_dataset_set_bytes(datasets_set, datasets_size):
    """Add up the sizes of a collection of datasets.

    @datasets_set: collection of dataset ids
    @datasets_size: DataFrame with 'd_dataset_id' and 'dataset_size' columns
    @return: sum of the 'dataset_size' of every id (0 for an empty collection)
    Raises IndexError when an id has no row in 'datasets_size'.
    """
    accumulated = 0
    for current_id in datasets_set:
        # The first row whose id matches provides the size
        matching_rows = datasets_size[datasets_size['d_dataset_id'] == current_id]
        accumulated += matching_rows['dataset_size'].values[0]
    return accumulated

def read_and_prepare_data(test=False, data_dir="/Users/ddavila/projects/DOMA/data/model"):
    """Read the per-month parquet files and build the days and weeks DataFrames.

    @test: when True read the test files (df1.parquet, df2.parquet), otherwise
           read the six monthly files days_201901.parquet .. days_201906.parquet
    @data_dir: directory that contains the parquet files; the default is the
               location the notebook was written against
    @return: (all_days, all_weeks)
             all_days:  one row per (week_ts, day_ts) with a 'datasets_set' set
             all_weeks: one row per week_ts with a 'datasets_set' set
    """
    if test:
        file_names = ["df" + str(i) + ".parquet" for i in range(1, 3)]
    else:
        file_names = ["days_20190" + str(i) + ".parquet" for i in range(1, 7)]

    days_df_list = []
    for file_name in file_names:
        inputfile = os.path.join(data_dir, file_name)
        # Progress is only reported when reading the real data
        if not test:
            print("reading: " + inputfile)
        days_df_list.append(pd.read_parquet(inputfile))

    # Data is coming separated in months, let's concatenate all these months
    # to make a single DataFrame containing all the data
    all_days = pd.concat(days_df_list)
    all_days = all_days.reset_index()

    # Transform 'datasets_set' from array to list, so that the lists of a group
    # can be concatenated with sum() in the aggregations below
    all_days['datasets_set'] = all_days['datasets_set'].apply(list)

    # Make sure that the same week_ts + day_ts doesn't exist in more than 1 month
    # and if so, group it together
    all_days = all_days.groupby(['week_ts', 'day_ts']).agg({'datasets_set': sum})
    all_days = all_days.reset_index()

    # Create a new DataFrame 'all_weeks' where we group all days, within a week, together
    all_weeks = all_days.groupby(['week_ts']).agg({'datasets_set': sum})

    # Transform the 'datasets_set' from list to set, to remove duplicates
    all_weeks['datasets_set'] = all_weeks['datasets_set'].apply(set)
    all_weeks = all_weeks.reset_index()

    # Transform the 'datasets_set' from list to set, to remove duplicates
    all_days['datasets_set'] = all_days['datasets_set'].apply(set)
    # NOTE(review): the index of 'all_days' was already reset after the groupby,
    # so this second reset only adds an 'index' column. It is kept so that the
    # returned frame has the same columns as before.
    all_days = all_days.reset_index()

    return all_days, all_weeks

def add_record_report(report, policy, max_ws_size, total_recalled_bytes, total_freed_bytes, max_recall, mr_week_ts, mr_day_ts):
    """Append the summary of one policy to 'report'.

    Byte amounts are stored already formatted by format_bytes; the policy and
    the two timestamps are stored as received.

    @report: list that receives the new record (modified in place)
    """
    report.append(dict(
        policy=policy,
        max_workingset_size=format_bytes(max_ws_size),
        total_recalled=format_bytes(total_recalled_bytes),
        total_freed=format_bytes(total_freed_bytes),
        max_recalled_per_day=format_bytes(max_recall),
        max_recalled_week_ts=mr_week_ts,
        max_recalled_day_ts=mr_day_ts,
    ))



def print_report(report):
    """Print the report records as a table.

    @report: list of records as built by add_record_report

    The week and day timestamps are converted to dates, but only the policy
    and the four byte columns are selected for printing.
    """
    table = pd.DataFrame(report)

    # Timestamps are expressed in seconds; keep only the date part
    for ts_column, date_column in (('max_recalled_week_ts', 'max_recalled_week'),
                                   ('max_recalled_day_ts', 'max_recalled_day')):
        table[date_column] = pd.to_datetime(table[ts_column], unit='s').dt.date

    printed_columns = ['policy', 'max_recalled_per_day', 'max_workingset_size','total_recalled', 'total_freed']
    print(table[printed_columns])

def format_bytes(size):
    """Format an amount of bytes with two decimals and a decimal (power of 1000) unit.

    @size: amount of bytes
    @return: string such as "1.50 KB"; the unit goes from B up to PB, and
             amounts beyond that are still expressed in PB
    """
    # Decimal units (1 KB = 1000 B); use 2**10 here for binary units instead
    power = 1000
    power_labels = ('B', 'KB', 'MB', 'GB', 'TB', 'PB')
    n = 0
    # '>=' so that exactly 1000 B is shown as 1.00 KB, and stop at the largest
    # known unit instead of running past the end of the labels
    while size >= power and n < len(power_labels) - 1:
        # float() keeps this a true division regardless of the operand types
        size = size / float(power)
        n += 1
    return "{0:.2f}".format(size) + " " + power_labels[n]

In [234]:
# Use this for REAL data
# Catalogue of datasets; only the id and the size of each dataset are needed
datasets_df = pd.read_parquet("/Users/ddavila/projects/DOMA/data/model/dataset.parquet/")
datasets_size = datasets_df.loc[:, ['d_dataset_id', 'dataset_size']]
# Daily and weekly access records built from the monthly files
days_df, weeks_df = read_and_prepare_data(test=False)

reading: /Users/ddavila/projects/DOMA/data/model/days_201901.parquet
reading: /Users/ddavila/projects/DOMA/data/model/days_201902.parquet
reading: /Users/ddavila/projects/DOMA/data/model/days_201903.parquet
reading: /Users/ddavila/projects/DOMA/data/model/days_201904.parquet
reading: /Users/ddavila/projects/DOMA/data/model/days_201905.parquet
reading: /Users/ddavila/projects/DOMA/data/model/days_201906.parquet


In [208]:
# Use this for TEST data
# Opt-in switch: this cell assigns the same variables as the REAL data cell,
# so running the notebook top to bottom would silently replace the real data
# with the test data. Set the flag to True to run the analysis on the test files.
USE_TEST_DATA = False
if USE_TEST_DATA:
    datasets_df = pd.read_parquet("/Users/ddavila/projects/DOMA/data/model/dfs.parquet/")
    datasets_size = datasets_df[['d_dataset_id', 'dataset_size']]
    days_df, weeks_df = read_and_prepare_data(test=True)

In [44]:
# Validate that the union of the sets of the days that belong to a week are the same
# as the set of the whole week.
# Note. This will make no sense if we calculate the 'weeks' dataset from the 'days'
#for week_ts in weeks_df['week_ts']:
#    print(str(week_ts) + ": " + str(validate_days_in_weeks(week_ts, weeks_df, days_df)))

In [240]:
# Get a sorted list of the week sets so that the first element in the list
# would be the set of dataset IDs accessed in the first week and so on
weeks_list = get_sorted_list_of_datasets_sets(weeks_df)

# Week timestamps sorted the same way, so that weeks_ts[i] identifies the week
# of weeks_list[i]. They do not depend on the policy, so they are computed
# once, outside of the loop.
weeks_ts = weeks_df.sort_values('week_ts')['week_ts'].values

report = []
# Execute general algorithm for each of the different policies
for policy in range(3, 6):

    # Calculate for each week, the working_set size and the set of datasets freed and recalled depending
    # on the delete policy used
    datasets_freed, datasets_recalled, ws_sizes = get_freed_recalled_and_ws_sizes(weeks_list, policy, datasets_size)

    # Largest working set of any week (0 when there are no weeks at all)
    max_ws_size = max([0] + list(ws_sizes))

    # Calculate the day with more Bytes recalled, and the week it belongs to
    max_recall = 0
    mr_day_ts = 0
    mr_week_ts = 0
    for week_ts, recalled_in_week in zip(weeks_ts, datasets_recalled):
        recalled_datasets_per_day = get_datasets_recalled_per_day(recalled_in_week, week_ts, days_df)
        for day_ts, recalled_in_day in recalled_datasets_per_day.items():
            recalled_bytes_in_day = get_dataset_set_bytes(recalled_in_day, datasets_size)
            # Strict comparison: on a tie the earliest day found is kept
            if recalled_bytes_in_day > max_recall:
                max_recall = recalled_bytes_in_day
                mr_day_ts = day_ts
                mr_week_ts = week_ts

    # Calculate the amount of Bytes recalled and freed per week
    datasets_freed_sizes = get_size_of_datasets_sets(datasets_freed, datasets_size)
    datasets_recalled_sizes = get_size_of_datasets_sets(datasets_recalled, datasets_size)

    # Calculate totals
    total_freed_bytes = sum(datasets_freed_sizes)
    total_recalled_bytes = sum(datasets_recalled_sizes)

    add_record_report(report, policy, max_ws_size, total_recalled_bytes, total_freed_bytes, max_recall, mr_week_ts, mr_day_ts)


In [241]:
# Show the summary table with one row per policy stored in 'report'
print_report(report)


   policy max_recalled_per_day max_workingset_size total_recalled total_freed
0  3       670.78 TB            18.87 PB            17.32 PB       41.61 PB  
1  4       615.54 TB            20.07 PB            12.31 PB       35.46 PB  
2  5       590.30 TB            21.40 PB            9.40 PB        30.97 PB  


In [238]:
# NOTE(review): 'old_report' is not assigned in any visible cell; presumably it
# holds the records of an earlier run with policies 1 and 2 - confirm. On a
# fresh kernel this cell fails unless that run is reproduced first.
print_report(old_report)


   policy max_recalled_per_day max_workingset_size total_recalled total_freed
0  1       1.42 PB              14.75 PB            40.30 PB       70.37 PB  
1  2       812.02 TB            17.22 PB            25.37 PB       52.40 PB  


In [243]:
# Concatenate both lists of records into a single report.
# NOTE(review): 'old_report' is not assigned in any visible cell - confirm
# where it comes from before relying on this cell.
all_report= old_report + report

In [244]:
# Show the summary table for every record in 'all_report'
print_report(all_report)



   policy max_recalled_per_day max_workingset_size total_recalled total_freed
0  1       1.42 PB              14.75 PB            40.30 PB       70.37 PB  
1  2       812.02 TB            17.22 PB            25.37 PB       52.40 PB  
2  3       670.78 TB            18.87 PB            17.32 PB       41.61 PB  
3  4       615.54 TB            20.07 PB            12.31 PB       35.46 PB  
4  5       590.30 TB            21.40 PB            9.40 PB        30.97 PB  
