# TODOs

[X] Implement a date-delta check: once a file is more than N days older than the current day, stop reading

[X] Currently overwrites data that already exists. This needs to stop

[ ] Store the files and put them into folders so you don't rip your eyes out when opening the folder

In [25]:
import time
import dask.bag as db
import json
from zefr.mlpl.util import feature_importance_plot, train_validation_plot, glob_s3
from tqdm import tqdm_notebook as tqdm
from datetime import datetime

# Wall-clock timestamp captured when this cell runs; echoed for reference.
start_time = time.time()
print(start_time)

# --- Dask client -------------------------------------------------------------
# Passed as `timeout_sec` when the client is created further down.
CLIENT_INIT_TIMEOUT = 1200
DASK_SCHEDULER = "tcp://dask.zefr.com:8798"  # Ian's scheduler

# --- Pipeline ----------------------------------------------------------------
# S3 prefix that is globbed for input files; the result is walked in reverse
# order by the download loop.
TRAINING_PATH = "s3://zefr/kafka/prod/topics/qz-qzapi-review"
DATA_SRC = glob_s3(TRAINING_PATH)

# Local directory prefix (note the trailing slash) the downloaded deltas go to.
STORAGE_PATH = "../../avro_deltas/"
# Bookkeeping file, stored under STORAGE_PATH, listing inputs already handled
# so they are not written again (TODO item 2 above).
SEEN_FILES = "seen_files.txt"

# Cut-off used by the download loop to stop at old files; infinity disables it.
DAYS_DIFFERENCE = float("inf")

1571266832.231919


# Setup Scheduler

In [6]:

# ----------------------------------------------------------------------------
#  This cell addresses issue DSMP-241.  When the dask cluster is started and 
#  the notebook is run, there's a race condition between the workers being
#  available and the use of the workers.  If an attempt to use the workers
#  is initiated before the workers are available, a TimeoutError indicating
#  no workers are available will be raised by Dask. 
#
#  This code creates the client and if the desired number of workers are
#  not detected, it will restart the client (with exponential backoff) until 
#  the workers come up or a timeout occurs.
# ----------------------------------------------------------------------------


from dask.distributed import Client

from typing import Callable, Optional, TypeVar, Union
import time

# Type of the seed value handed to exp_backoff (its `init_data` argument).
T = TypeVar("T")
# Type of the state exp_backoff creates, tests and eventually returns.
S = TypeVar("S")

def exp_backoff(
    init_data: T,
    init_fn: Callable[[T], S],
    state_transition_fn: Callable[[S, T], S],
    acceptance_criteria: Callable[[S], bool],
    init_delay_sec: float = 1,
    delay_exponent: float = 2,
    timeout_sec: float = 30,
    logger = None,
    no_sleep_til_brooklyn: bool = False
) -> Union[TimeoutError, S]:
    """Retry a state transition with exponentially growing delays.

    A state is created with ``init_fn(init_data)``.  While
    ``acceptance_criteria(state)`` is false and the delay budget is not used
    up, the function waits, then replaces the state with
    ``state_transition_fn(state, init_data)``.

    Args:
        init_data: Seed value passed to ``init_fn`` and to every transition.
        init_fn: Builds the first state from ``init_data``.
        state_transition_fn: Produces the next state from the current state
            and ``init_data``.
        acceptance_criteria: Returns True once a state is good enough.
        init_delay_sec: First delay; must be non-negative.
        delay_exponent: Factor the delay is multiplied by after each retry;
            must be >= 1.
        timeout_sec: Budget the accumulated delay is checked against.
        logger: Optional logger; receives an info record per real sleep and
            an error record on failure.
        no_sleep_til_brooklyn: When True the delays are only accounted for,
            not actually slept (and the per-retry info log is skipped).

    Returns:
        The accepted state, or a ``TimeoutError`` *instance* (returned, not
        raised) when the budget ran out first.
    """
    assert init_delay_sec >= 0, f"init_delay_sec must be non-negative, found: {init_delay_sec}"
    assert delay_exponent >= 1, f"delay_exponent must be >= 1, found: {delay_exponent}"

    attempts = 0
    delay_total = 0
    next_delay = init_delay_sec
    state = init_fn(init_data)

    while True:
        if acceptance_criteria(state):
            return state

        # Stop once the accumulated delay, scaled by the exponent, reaches the
        # budget.
        # NOTE(review): with init_delay_sec == 0 the accumulated delay never
        # grows, so this check can never end the loop.
        if not (delay_total * delay_exponent < timeout_sec):
            break

        # The console message is emitted even when sleeping is disabled.
        print(f"Retrying. sleeping {next_delay} seconds")
        if no_sleep_til_brooklyn is False:
            if logger is not None:
                logger.info(f"Retrying. sleeping {next_delay} seconds")
            time.sleep(next_delay)

        delay_total += next_delay
        next_delay *= delay_exponent
        attempts += 1
        state = state_transition_fn(state, init_data)

    failure_msg = (
        "Retries unsuccessful. "
        f"retries: {attempts}, "
        f"agg_delay_time: {delay_total}, "
        f"timeout_sec: {timeout_sec}, "
        f"init_delay_sec: {init_delay_sec}, "
        f"delay_exponent: {delay_exponent}"
    )
    if logger is not None:
        logger.error(failure_msg)
    return TimeoutError(failure_msg)

In [7]:
def get_dask_client(
    scheduler_uri: Optional[str] = None, 
    min_workers: int = 0,
    init_delay_sec: float = 1,
    delay_exponent: float = 2,
    timeout_sec: float = 30,
    logger = None
) -> Client:
    """Connect to a Dask scheduler and wait until enough workers respond.

    The client is created once; while fewer than ``min_workers`` workers
    answer, the client is restarted under ``exp_backoff`` until the workers
    show up or the delay budget is exhausted.

    Args:
        scheduler_uri: Address handed to ``Client``.
        min_workers: Minimum number of workers that must respond.
        init_delay_sec: First backoff delay, forwarded to ``exp_backoff``.
        delay_exponent: Backoff growth factor, forwarded to ``exp_backoff``.
        timeout_sec: Backoff budget, forwarded to ``exp_backoff``.
        logger: Optional logger, forwarded to ``exp_backoff``.

    Returns:
        The connected ``Client``.

    Raises:
        TimeoutError: The required workers did not appear within the budget.
    """

    def _connect(uri):
        return Client(uri)

    def _restart(client, _uri):
        # Hand the *same* client back as the next state instead of relying on
        # whatever restart() happens to return; the acceptance check below
        # needs a Client object.
        client.restart()
        return client

    def _enough_workers(client):
        # One log entry is reported per responding worker.
        return min_workers <= len(client.get_worker_logs())

    err_or_client = exp_backoff(
        init_data=scheduler_uri,
        init_fn=_connect,
        state_transition_fn=_restart,
        acceptance_criteria=_enough_workers,
        init_delay_sec=init_delay_sec,
        delay_exponent=delay_exponent,
        timeout_sec=timeout_sec,
        logger=logger,
    )

    # exp_backoff returns (rather than raises) the TimeoutError; raise it here.
    if isinstance(err_or_client, TimeoutError):
        raise err_or_client
    return err_or_client

In [22]:
# Connect to the shared scheduler.
# NOTE(review): with min_workers=0 the worker-count check is satisfied by any
# number of workers (including none), so no backoff happens here -- confirm
# that this is intended.
client = get_dask_client(
    DASK_SCHEDULER,
    min_workers=0,
    timeout_sec=CLIENT_INIT_TIMEOUT,
)
client

0,1
Client  Scheduler: tcp://dask.zefr.com:8798  Dashboard: http://dask.zefr.com:8799/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [34]:
def filter_none_records(video_message):
    """Return True only when both 'metadata' and 'payload' are not None.

    'payload' is not looked up when 'metadata' is already None.
    """
    if video_message['metadata'] is None:
        return False
    if video_message['payload'] is None:
        return False
    return True


def generate_f_name(s3_f_name):
    """
    Given a path from glob_s3, build the local file name for that avro delta.

    The path is expected to end in ``.../year=YYYY/month=MM/day=DD/<file>``, e.g.
    s3://zefr/kafka/prod/topics/qz-qzapi-review/year=2019/month=10/day=14/qz-qzapi-review+37+0000037164.avro
    which becomes ``2019_10_14_qz-qzapi-review+37+0000037164.avro``.

    Returns the name, or False (after printing diagnostics) when the path is
    too short or a date component is not an integer.
    """

    name_split = s3_f_name.split('/')
    if len(name_split) < 4:
        print('Failed on: {}'.format(s3_f_name))
        print('Expected a path ending in year=YYYY/month=MM/day=DD/<file>')
        print('*' * 10)
        return False

    # Use the text after the last '=' instead of a fixed-width slice, so
    # non-zero-padded values such as 'day=4' are parsed too ('day=4'[-2:]
    # would yield '=4').
    year, month, day = (part.rpartition('=')[2] for part in name_split[-4:-1])
    qz_id = name_split[-1]  # 'qz-qzapi-review+37+0000037164.avro'

    try:
        year = int(year)  # 'year=2019'
        month = int(month)  # 'month=10'
        day = int(day)  # 'day=14'

    except ValueError as e:
        print('Failed on: {}'.format(s3_f_name))
        print('Tried to convert a non-integer char into an integer: {}'.format(e))
        print('year: {}'.format(year))
        print('month: {}'.format(month))
        print('day: {}'.format(day))
        print('*' * 10)
        return False

    return '{}_{}_{}_{}'.format(year, month, day, qz_id)


def generate_date(s3_f_name):
    """
    Return the partition date encoded in an S3 path as a datetime (midnight).

    The path is expected to end in ``.../year=YYYY/month=MM/day=DD/<file>``.

    Raises ValueError when the path has too few components or the components
    do not form a valid date.
    """
    name_split = s3_f_name.split('/')

    # Use the text after the last '=' instead of a fixed-width slice, so
    # non-zero-padded values such as 'day=4' are parsed too.
    year, month, day = (part.rpartition('=')[2] for part in name_split[-4:-1])

    return datetime.strptime("{}/{}/{}".format(day, month, year), "%d/%m/%Y")


# Load and Filter Data

In [17]:
# Load the names of inputs handled by earlier runs.  On a first run the
# bookkeeping file does not exist yet; treat that as "nothing seen so far"
# instead of failing.
try:
    with open('{}{}'.format(STORAGE_PATH, SEEN_FILES), 'r') as f:
        # One name per line; blank lines are ignored.
        previously_seen = {line.strip() for line in f if line.strip()}
except FileNotFoundError:
    previously_seen = set()

In [19]:
seen_files = set()
todays_date = datetime.today()

# We go from most recent to most distant date
# (assumes DATA_SRC is ordered oldest -> newest -- TODO confirm against glob_s3).
for f_name in tqdm(DATA_SRC[::-1]):

    # A path without a parseable date is skipped rather than aborting the run.
    try:
        file_date = generate_date(f_name)
    except ValueError as e:
        print('Skipping {}: could not parse a date from the path ({})'.format(f_name, e))
        continue

    # Compare whole days: DAYS_DIFFERENCE is a plain number (float('inf') by
    # default), and a timedelta cannot be ordered against a number directly.
    if abs(todays_date - file_date).days > DAYS_DIFFERENCE:
        break  # Gone beyond #days we want to consider, and since sorted recent -> distant, break

    # Already have the data, no sense in downloading it again
    if f_name in previously_seen:
        continue

    # Work out the output name before the expensive read, so a bad path
    # costs nothing.
    name = generate_f_name(f_name)
    if not name:  # There was an error generating the name
        continue

    # Read the deltas
    source = db.read_avro(f_name)

    # Extract the data with meaningful information
    resp = source.filter(filter_none_records).compute()

    with open('{}{}'.format(STORAGE_PATH, name), 'w') as f:
        json.dump(resp, f)

    seen_files.add(f_name)

HBox(children=(IntProgress(value=0, max=4525), HTML(value='')))




In [21]:
# Append this run's files to the bookkeeping file.  Opening with 'w' would
# erase the entries written by earlier runs, so those files would be
# downloaded again on a later run.
with open('{}{}'.format(STORAGE_PATH, SEEN_FILES), 'a') as f:
    f.writelines("{}\n".format(sf) for sf in sorted(seen_files))