In [None]:
# !pip install kfp
# !pip install google-cloud-aiplatform
# !pip install google-cloud-pipeline-components

In [None]:
import kfp
from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient
from google.cloud import aiplatform
# from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components import aiplatform as gcc_aip

from kfp import components as comp
import kfp.dsl as dsl

# --- GCP project and pipeline identity ---
project_id = 'project-id'
region = 'us-central1'
pipeline_name = 'earthquake-prediction'

# --- Cloud Storage bucket and the pipeline root inside it ---
main_bucket_name = 'bucket-name'
pipeline_root_path = f'gs://{main_bucket_name}/pipelines/'

# --- Model identifier and bucket-relative locations (all end with '/') ---
model_name = '5390f2dc404cbe0cd01427a938160354'
problem_statement_file = 'problem_statement/weekly.json'
raw_location = 'data/quakes/raw/'
silver_location = 'data/quakes/silver/'
gold_location = f'data/quakes/gold/{model_name}/'
artifact_location = f'models/{model_name}/'
predictions_location = f'data/predictions/{model_name}/'
metrics_location = f'data/metrics/{model_name}/'

In [None]:
import google

def save_pipeline(pipeline, bucket_name, files_path):
    """Upload a local pipeline definition file to Cloud Storage.

    Args:
        pipeline: Local file name to upload; it is also appended to
            ``files_path`` to build the destination object name.
        bucket_name: Name of the destination bucket.
        files_path: Object-name prefix inside the bucket, concatenated as-is
            with ``pipeline``.
    """
    # Import the submodule explicitly: a bare `import google` does not load
    # `google.cloud.storage`, so `google.cloud.storage.Client` would only
    # resolve if some other import happened to load it first.
    from google.cloud import storage

    #### Get the bucket that the file will be uploaded to
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)

    #### Create a new blob
    my_file = bucket.blob(files_path + pipeline)

    #### Upload from file
    my_file.upload_from_filename(pipeline, content_type = 'application/json')

In [None]:
def get_data_from_url(
    url: str,
    delta_days: int,
    downloaded_data_path: comp.OutputPath('csv')
) -> str:
    """Download recent events as CSV and hand them to the next component.

    Queries ``url`` with ``format=csv`` for the window that ends tomorrow
    (local clock) and starts ``delta_days`` days before that, then writes the
    response body to ``downloaded_data_path``.

    Args:
        url: Endpoint to query.
        delta_days: Length of the requested window, in days.
        downloaded_data_path: Output file that receives the CSV text.

    Returns:
        ``'<local timestamp>/all_quakes.csv'`` on success, or ``''`` when the
        request failed. On failure nothing is written to
        ``downloaded_data_path``.
    """

    import requests
    from datetime import datetime, timedelta

    # (connect, read) timeouts in seconds. Without a timeout `requests` waits
    # forever, so the Timeout handler below could never fire on a stalled read.
    request_timeout = (10, 300)

    # get time
    request_time = datetime.now().astimezone().strftime('%Y-%m-%dT%H-%M-%S-%Z') + '/all_quakes.csv'

    # get request params
    end_range = datetime.now() + timedelta(days = 1)
    begin_range = end_range - timedelta(days = delta_days)

    query = {
        'format': 'csv',
        'starttime': begin_range.strftime('%Y-%m-%d'),
        'endtime': end_range.strftime('%Y-%m-%d'),
    }

    # make request
    # The specific handlers must stay ahead of RequestException (their base class).
    try:
        with requests.get(url, params = query, timeout = request_timeout) as response:
            response.raise_for_status()
            response_content = response.content
    except requests.exceptions.Timeout:
        print('Timeout Exception')
        return ''
    except requests.exceptions.TooManyRedirects:
        print('Too Many Redirects Exception')
        return ''
    except requests.exceptions.HTTPError:
        print('Http Exception')
        return ''
    except requests.exceptions.ConnectionError:
        print('Error Connecting')
        return ''
    except requests.exceptions.RequestException:
        print('Request Exception')
        return ''
    except Exception:
        # `Exception` rather than a bare `except:` so KeyboardInterrupt and
        # SystemExit are not swallowed.
        print('Error requesting file')
        return ''

    # dump for next component
    with open(downloaded_data_path, 'w', encoding = 'utf-8') as text_file:
        text_file.write(response_content.decode('utf-8'))

    return request_time

get_data_from_url_op = comp.create_component_from_func(
    get_data_from_url,
    base_image = 'python:3.7',
    packages_to_install = [
        'requests',
    ],
)

In [None]:
def save_data_to_gcp(
    file_to_save_path: comp.InputPath('csv'),
    file_name: str,
    bucket_name: str,
    bucket_folder: str,
):
    """Upload the text produced by the previous component to Cloud Storage.

    Args:
        file_to_save_path: Local file holding the text to upload.
        file_name: Object name, appended to ``bucket_folder``.
        bucket_name: Destination bucket.
        bucket_folder: Object-name prefix inside the bucket, concatenated
            as-is with ``file_name``.
    """

    from google.cloud import storage

    # read from last step
    with open(file_to_save_path, 'r', encoding = 'utf-8') as text_file:
        input_file = text_file.read()

    # nothing to upload: `read()` returns '' (never None) for an empty file,
    # so test for emptiness rather than for None
    if not input_file:
        print('No response')
        return

    # get the bucket that the file will be uploaded to
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)

    # create a new blob
    my_file = bucket.blob(bucket_folder + file_name)

    # upload from csv
    my_file.upload_from_string(input_file, content_type = 'text/csv')

save_data_to_gcp_op = comp.create_component_from_func(
    save_data_to_gcp,
    base_image = 'path-to-artifact-registry/python-gcp:basic',
    packages_to_install = [],
)

In [None]:
def clean_data(
    csv_file_path: comp.InputPath('csv'),
    cleaned_data_path: comp.OutputPath('csv'),
):
    """Select, sanitise and date-parse the downloaded events.

    Keeps the columns ``time``, ``id``, ``latitude``, ``longitude``, ``depth``
    and ``mag``; clamps negative depths to 0; replaces ``time`` with a ``date``
    column built from the part before the ``'T'``; drops rows with missing
    values.

    Args:
        csv_file_path: CSV file written by the download step.
        cleaned_data_path: Output file. Despite the ``'csv'`` type label the
            DataFrame is written with ``joblib.dump``.
    """

    import joblib
    import pandas as pd

    # convert to DataFrame
    df = pd.read_csv(csv_file_path)

    # select info to be used
    df = df.loc[:, ['time', 'id', 'latitude', 'longitude', 'depth', 'mag']].copy()

    # weird values
    z_0 = df['depth'] < 0
    number_above = int(z_0.sum())
    # guard the ratio: a header-only download yields an empty frame
    share_above = number_above / len(df) if len(df) > 0 else 0.0
    print(f'Depth above sup: {number_above:,d} ({share_above:.2%})')
    df.loc[z_0, 'depth'] = 0

    # data parsing
    date_col = 'time'
    # `.str[0]` takes the date part without relying on `expand = True`
    # producing a column labelled 0, so an empty frame is handled too
    dates = pd.to_datetime(df[date_col].str.split('T').str[0], format = '%Y-%m-%d')
    df = pd.concat((df, dates.rename('date')), axis = 1)
    df = df.drop(date_col, axis = 1)

    # drop NA
    len_before = len(df)
    df = df.dropna()
    len_after = len(df)
    dropped_events = len_before - len_after
    if dropped_events == 0:
        print('No dropped events')
    else:
        print(f'Dropped events: {dropped_events:,d} ({dropped_events / len_before:.2%})')

    # dump df for next component
    joblib.dump(df, cleaned_data_path)

clean_data_op = comp.create_component_from_func(
    clean_data,
    base_image = 'path-to-artifact-registry/python-pandas:basic',
    packages_to_install = [],
)

In [None]:
def merge_new_data(
    new_data_path: comp.InputPath('csv'),
    bucket_name: str,
    file_path: str,
    file_name: str,
    silver_data_path: comp.OutputPath('csv'),
) -> bool:
    """Append events with unseen ids to the historical (silver) dataset.

    Args:
        new_data_path: joblib dump of the cleaned new events.
        bucket_name: Bucket holding the silver CSV.
        file_path: Object-name prefix of the silver CSV.
        file_name: File name of the silver CSV.
        silver_data_path: Output file; always receives a joblib dump of the
            silver DataFrame (merged when there are new events, unchanged
            otherwise), because downstream components read it unconditionally.

    Returns:
        ``True`` when at least one new event was appended, else ``False``.
    """

    import joblib
    import pandas as pd

    # read from last step
    df_new = joblib.load(new_data_path)

    # read source file
    df_hist = pd.read_csv('gs://' + bucket_name + '/' + file_path + file_name)
    print(f'Silver events: {len(df_hist):,d}')

    # check if there is new information
    # TODO: check if there is change, not only new records
    if len(df_new) == 0:
        new_events = df_new
    else:
        new_events = df_new[~df_new['id'].isin(df_hist['id'])].copy()
    print(f'New events: {len(new_events):,d}')

    if len(new_events) == 0:
        # Nothing to add. Still emit the historical data: returning before
        # writing the output would leave the next component without an input.
        joblib.dump(df_hist, silver_data_path)
        return False

    # NOTE(review): `df_hist` is read without date parsing while the new
    # events presumably carry a parsed `date` column - confirm the mixed
    # column is handled downstream.
    df = pd.concat([df_hist, new_events])
    len_df, nunique_ids = len(df), df['id'].nunique()

    print(f'Total events: {len_df:,d} | Unique: {nunique_ids:,d}')
    assert(len_df == nunique_ids)

    # dump df for next component
    joblib.dump(df, silver_data_path)

    return True

merge_new_data_op = comp.create_component_from_func(
    merge_new_data,
    base_image = 'path-to-artifact-registry/python-pandas:basic',
    packages_to_install = [],
)

In [None]:
def save_df_to_gcp(
    file_to_save_path: comp.InputPath('csv'),
    file_name: str,
    suffix: str,
    bucket_name: str,
    bucket_folder: str,
) -> str:
    """Write the DataFrame from the previous component to Cloud Storage as CSV.

    The destination is
    ``gs://<bucket_name>/<bucket_folder><file_name>[_<suffix>].csv``; the
    ``_<suffix>`` part is omitted when ``suffix`` is empty.

    Args:
        file_to_save_path: joblib dump of the DataFrame to save.
        file_name: Base file name, without extension.
        suffix: Optional suffix appended after an underscore.
        bucket_name: Destination bucket.
        bucket_folder: Object-name prefix, concatenated as-is.

    Returns:
        ``'saved'`` after writing, or ``''`` when the loaded object is ``None``.
    """

    import joblib

    # read from last step
    df = joblib.load(file_to_save_path)

    # if None exit
    if df is None:
        print('No response')
        # the component is declared `-> str`, so return a string, not None
        return ''

    # save to GCS
    file_and_suffix = file_name + ('_' + suffix if suffix != '' else '') + '.csv'
    df.to_csv('gs://' + bucket_name + '/' + bucket_folder + file_and_suffix, index = False)

    return 'saved'

save_df_to_gcp_op = comp.create_component_from_func(
    save_df_to_gcp,
    base_image = 'path-to-artifact-registry/python-pandas:basic',
    packages_to_install = [],
)

In [None]:
from typing import NamedTuple

def get_problem_statement(
    bucket_name: str,
    source_file: str,
) -> NamedTuple(
    'OpOutputs',
    [
        ('main_id', str),
        ('time_ref', str),
        ('time_frequency', str),
        ('target_raw', str),
        ('early_warning_number', int),
        ('range_warning_number', int),
        ('pad_df', int),
        ('event_reference', float),
        ('degrees_latitude_grid', int),
        ('km_depth_grid', int),
        ('min_latitude', int),
        ('max_latitude', int),
        ('min_longitude', int),
        ('max_longitude', int),
        ('time_cut', str),
    ]
):
    """Read the problem-statement JSON from Cloud Storage and expose its fields.

    Args:
        bucket_name: Bucket holding the JSON file.
        source_file: Object name of the JSON file inside the bucket.

    Returns:
        A named tuple with one entry per declared output, each taken from the
        JSON key of the same name.

    Raises:
        FileNotFoundError: If ``source_file`` does not exist in the bucket.
        KeyError: If the JSON lacks one of the expected keys.
    """

    import json
    from google.cloud import storage
    from collections import namedtuple

    # get bucket
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)

    # get blob (`get_blob` returns None when the object does not exist)
    my_file = bucket.get_blob(source_file)
    if my_file is None:
        raise FileNotFoundError(f'Problem statement not found: gs://{bucket_name}/{source_file}')

    # download from json
    problem_statement_config = json.loads(my_file.download_as_text())

    # Must stay in the same order as the outputs declared in the signature.
    field_names = [
        'main_id',
        'time_ref',
        'time_frequency',
        'target_raw',
        'early_warning_number',
        'range_warning_number',
        'pad_df',
        'event_reference',
        'degrees_latitude_grid',
        'km_depth_grid',
        'min_latitude',
        'max_latitude',
        'min_longitude',
        'max_longitude',
        'time_cut',
    ]

    config = namedtuple('OpOutputs', field_names)

    return config(*[problem_statement_config[name] for name in field_names])

get_problem_statement_op = comp.create_component_from_func(
    get_problem_statement,
    base_image = 'path-to-artifact-registry/python-gcp:basic',
    packages_to_install = [],
)

In [None]:
def filter_and_preprocess(
    silver_path: comp.InputPath('csv'),
    event_reference: float,
    min_latitude: int,
    max_latitude: int,
    min_longitude: int,
    max_longitude: int,
    time_cut: str,
    filtered_path: comp.OutputPath('csv'),
):
    """Restrict the silver data to the study region/period and derive features.

    Adds ``event`` (1 when ``mag >= event_reference``, else 0), keeps the rows
    inside the inclusive latitude/longitude box and strictly after
    ``time_cut``, then adds ``energy = 10 ** (5.24 + 1.44 * mag)``.

    Args:
        silver_path: joblib dump of the silver DataFrame.
        event_reference: Magnitude threshold that marks a row as an event.
        min_latitude, max_latitude: Inclusive latitude bounds.
        min_longitude, max_longitude: Inclusive longitude bounds.
        time_cut: Exclusive lower date bound, formatted ``YYYY-MM-DD``.
        filtered_path: Output file; receives a joblib dump of the result.
    """

    import joblib
    import pandas as pd
    import numpy as np
    from datetime import datetime
    import zucaml.zucaml as ml

    # load the previous step's frame and make sure `date` is a real datetime
    quakes = joblib.load(silver_path)
    quakes['date'] = pd.to_datetime(quakes['date'], format = '%Y-%m-%d', exact = True)

    # flag rows at or above the reference magnitude
    quakes['event'] = ((quakes['mag'] >= event_reference) * 1).astype(np.uint8)

    # spatial bounding box (both ends inclusive)
    inside_latitude = (quakes['latitude'] >= min_latitude) & (quakes['latitude'] <= max_latitude)
    inside_longitude = (quakes['longitude'] >= min_longitude) & (quakes['longitude'] <= max_longitude)
    quakes = quakes.loc[inside_latitude & inside_longitude].reset_index(drop = True)

    # temporal cut (strictly after the cut date)
    cut_date = datetime.strptime(time_cut, '%Y-%m-%d')
    quakes = quakes.loc[quakes['date'] > cut_date].reset_index(drop = True)

    # energy derived from magnitude
    quakes['energy'] = np.power(10, 5.24 + 1.44 * quakes['mag'])

    # summary of what was kept
    number_events = quakes['event'].sum()
    min_date = quakes['date'].min()
    max_date = quakes['date'].max()

    print(f'Min date:\t{min_date}')
    print(f'Max date:\t{max_date}')
    print(f'Number of events:\t{number_events:,d}')
    ml.print_memory(quakes)

    # hand over to the next component
    joblib.dump(quakes, filtered_path)

filter_and_preprocess_op = comp.create_component_from_func(
    filter_and_preprocess,
    base_image = 'path-to-artifact-registry/zucaml:basic',
    packages_to_install = [],
)

In [None]:
from typing import NamedTuple

def reindex_df(
    filtered_path: comp.InputPath('csv'),
    degrees_latitude_grid: int,
    km_depth_grid: int,
    pad_df: int,
    main_id: str,
    time_ref: str,
    time_frequency: str,
    date_offset: int,
    is_training: str,
    grid_path: comp.OutputPath('csv'),
) -> NamedTuple(
    'OpOutputs',
    [
        ('x_degre_km', float),
        ('y_degre_km', float),
        ('dx', int),
        ('dy', int),
        ('range_x_min', int),
        ('range_x_max', int),
        ('range_x_step', int),
        ('range_y_min', int),
        ('range_y_max', int),
        ('range_y_step', int),
        ('range_z_min', int),
        ('range_z_max', int),
        ('range_z_step', int),
    ]
):
    """Snap events onto a regular x/y/z grid and reindex them over time.

    Steps: round longitude/latitude/depth onto grid cells (``x``/``y``/``z``)
    and label each row with ``zone_frame = 'x|y|z'``; drop rows earlier than
    the minimum date plus ``date_offset`` days; optionally pad; reindex with
    ``ml.reindex_by_minmax``; binarise ``event``.

    Args:
        filtered_path: joblib dump of the filtered events.
        degrees_latitude_grid: Grid step along latitude (``dy``).
        km_depth_grid: Grid step along depth (``dz``).
        pad_df: Whether to pad when training (interpreted with ``bool``).
        main_id: Passed to ``ml.reindex_by_minmax`` as ``item``.
        time_ref: Passed to ``ml.reindex_by_minmax`` as ``time_ref``.
        time_frequency: Passed to ``ml.reindex_by_minmax`` as ``time_freq``.
        date_offset: Days added to the minimum date to obtain the first date kept.
        is_training: ``'True'`` for training. Any other value forces padding
            and, when the latest date is before now, appends one dummy row
            dated now.
        grid_path: Output file; receives a joblib dump of the result.

    Returns:
        The two km-per-degree constants, ``dx``/``dy`` and the
        ``start``/``stop``/``step`` of the x, y and z grid ranges.
    """

    import joblib
    from collections import namedtuple
    import pandas as pd
    import numpy as np
    from datetime import datetime, timedelta
    import zucaml.zucaml as ml

    # read from last step
    df = joblib.load(filtered_path)

    # grid
    # presumably kilometres per degree of longitude (x) and of latitude (y)
    # for the study region - TODO confirm
    x_degre_km = 94.2
    y_degre_km = 111.2

    dy = degrees_latitude_grid
    dx = int(round(dy * y_degre_km / x_degre_km))
    dz = km_depth_grid

    grid_values = [
        ('y', 'latitude', dy),
        ('x', 'longitude', dx),
        ('z', 'depth', dz)
    ]

    # round each coordinate to the nearest multiple of its step, counted from
    # the (rounded) minimum of that coordinate
    for new_feature, old_feature, increment in grid_values:

        old_feature_min = int(round(df[old_feature].min()))

        df[new_feature] = df[old_feature] - old_feature_min
        df[new_feature] = df[new_feature] / increment
        df[new_feature] = df[new_feature].round().astype(int)
        df[new_feature] = df[new_feature] * increment
        df[new_feature] = df[new_feature] + old_feature_min

    assert(sum(df['z'] < 0) == 0)

    df['zone_frame'] = df['x'].astype(str) + '|' + df['y'].astype(str) + '|' + df['z'].astype(str)

    min_x = df['x'].min()
    max_x = df['x'].max()
    min_y = df['y'].min()
    max_y = df['y'].max()
    min_z = df['z'].min()
    max_z = df['z'].max()

    # `max + step` as stop so that the maximum itself is included
    range_x = range(min_x, max_x + dx, dx)
    range_y = range(min_y, max_y + dy, dy)
    range_z = range(min_z, max_z + dz, dz)

    all_zone_frames = [str(x) + '|' + str(y) + '|' + str(z) for x in range_x for y in range_y for z in range_z]

    used_x = df['x'].nunique()
    used_y = df['y'].nunique()
    used_z = df['z'].nunique()
    used_time = df['date'].nunique()

    print(f'Unique x:\t\t{used_x:,d}')
    print(f'Unique y:\t\t{used_y:,d}')
    print(f'Unique z:\t\t{used_z:,d}')
    print(f'Unique time:\t\t{used_time:,d}')
    print(f'All zones:\t\t{len(all_zone_frames):,d}')

    df = df.drop(['longitude', 'latitude', 'depth'], axis = 1)

    # offset date
    min_date = df['date'].min()
    print(f'Offset date. Min before: {str(min_date)} Records: {len(df):,d}')

    min_date = min_date + timedelta(days = date_offset)
    df = df.loc[df['date'] >= min_date].copy().reset_index().drop('index', axis = 1)

    min_date = df['date'].min()
    print(f'Offset date. Min after: {str(min_date)} Records: {len(df):,d}')

    # pad
    if is_training == 'True':
        pad_df = bool(pad_df)
    else:
        pad_df = True
        this_max_date = df['date'].max()
        ref_day = datetime.now()
        print(f'date: {str(this_max_date)} - reference day: {str(ref_day)}')
        if (this_max_date < ref_day):
            print('Adding dummy')
            len_before = len(df)

            # clone the first row and neutralise it, so the data reaches "now"
            record_dummy = df.iloc[0].copy()
            record_dummy['date'] = ref_day
            record_dummy['id'] = 'non_existant'
            for feature in ['mag', 'event', 'energy']:
                record_dummy[feature] = 0

            # `DataFrame.append` was removed in pandas 2.0; this is the
            # concat it performed internally for a Series argument
            df = pd.concat([df, record_dummy.to_frame().T.infer_objects()], ignore_index = True)

            len_after = len(df)
            print(f'Rows before: {len_before:,d} - Rows after: {len_after:,d}')
            assert(len_after - len_before == 1)
            new_max_date = df['date'].max()
            print(f'New max date: {str(new_max_date)}')
    if pad_df:

        zero_fill = ['mag', 'event', 'energy']
        other_fill = {'id': 'non_existant'}

        # presumably adds a row at the min/max date for every zone frame -
        # TODO confirm against zucaml
        df = ml.pad(df, 'zone_frame', 'date', all_zone_frames, 'min', zero_fill, other_fill)
        df = ml.pad(df, 'zone_frame', 'date', all_zone_frames, 'max', zero_fill, other_fill)

        # rebuild the coordinates from the zone label after padding
        df['x'] = df['zone_frame'].str.split('|').str[0].astype(int)
        df['y'] = df['zone_frame'].str.split('|').str[1].astype(int)
        df['z'] = df['zone_frame'].str.split('|').str[2].astype(int)

        assert(df.isna().sum().sum() == 0)

    # reindex
    df = ml.reindex_by_minmax(
        df = df.drop(['mag', 'x', 'y', 'z'], axis = 1),
        item = main_id,
        time_ref = time_ref,
        time_freq = time_frequency,
        forwardfill_features = [],
        backfill_features = [],
        zerofill_features = ['energy', 'event'],
    )

    assert(df.isna().sum().sum() == 0)

    # back to a 0/1 flag after reindexing
    df['event'] = ((df['event'] > 0) * 1).astype(np.uint8)

    df['x'] = df['zone_frame'].str.split('|').str[0].astype(int)
    df['y'] = df['zone_frame'].str.split('|').str[1].astype(int)
    df['z'] = df['zone_frame'].str.split('|').str[2].astype(int)

    # print info
    ml.print_memory(df)

    # dump df for next component
    joblib.dump(df, grid_path)

    # return variables
    variables = namedtuple(
        'OpOutputs',
        [
            'x_degre_km',
            'y_degre_km',
            'dx',
            'dy',
            'range_x_min',
            'range_x_max',
            'range_x_step',
            'range_y_min',
            'range_y_max',
            'range_y_step',
            'range_z_min',
            'range_z_max',
            'range_z_step',
        ]
    )

    return variables(
        x_degre_km,
        y_degre_km,
        dx,
        dy,
        range_x.start,
        range_x.stop,
        range_x.step,
        range_y.start,
        range_y.stop,
        range_y.step,
        range_z.start,
        range_z.stop,
        range_z.step,
    )

reindex_df_op = comp.create_component_from_func(
    reindex_df,
    base_image = 'path-to-artifact-registry/zucaml:basic',
    packages_to_install = [],
)

In [None]:
def get_neighbours_df(
    grid_path: comp.InputPath('csv'),
    x_degre_km: float,
    y_degre_km: float,
    dx: int,
    dy: int,
    range_x_min: int,
    range_x_max: int,
    range_x_step: int,
    range_y_min: int,
    range_y_max: int,
    range_y_step: int,
    range_z_min: int,
    range_z_max: int,
    range_z_step: int,
    full_path: comp.OutputPath('csv'),
):
    """Add ``energy_neighbours``: per date, the energy summed over a zone's neighbours.

    Two zone frames are neighbours when they differ and, on every axis, the
    other zone's coordinate is in the neighbour list of this zone's
    coordinate: the value itself and the adjacent grid values for x and y;
    every value within ``max(x_degre_km * dx, y_degre_km * dy)`` for z.

    Args:
        grid_path: joblib dump of the gridded data (needs ``zone_frame``,
            ``date`` and ``energy`` columns).
        x_degre_km, y_degre_km, dx, dy: Used to derive the z distance above.
        range_x_min, range_x_max, range_x_step: ``range`` arguments of the x grid.
        range_y_min, range_y_max, range_y_step: ``range`` arguments of the y grid.
        range_z_min, range_z_max, range_z_step: ``range`` arguments of the z grid.
        full_path: Output file; receives a joblib dump of the result.
    """

    import joblib
    import pandas as pd
    import zucaml.zucaml as ml

    # read from last step
    df = joblib.load(grid_path)

    #### aux func
    def get_xyz(zone_frame):
        # 'x|y|z' -> (x, y, z) as ints
        x, y, z = zone_frame.split('|')
        return int(x), int(y), int(z)

    #### get neighbours
    def get_neighbours(zone_frame, neighbours, used_zone_frames):

        this_neighbours = []

        this_x, this_y, this_z = get_xyz(zone_frame)

        for zf in used_zone_frames:
            x, y, z = get_xyz(zf)

            if zone_frame != zf and x in neighbours['x'][this_x] and y in neighbours['y'][this_y] and z in neighbours['z'][this_z]:
                this_neighbours.append(zf)

        return this_neighbours

    #### calculate distance in xy plane
    distance = max(x_degre_km * dx, y_degre_km * dy)

    #### aux variable
    range_x = range(range_x_min, range_x_max, range_x_step)
    range_y = range(range_y_min, range_y_max, range_y_step)
    range_z = range(range_z_min, range_z_max, range_z_step)
    ranges = {'x': range_x, 'y': range_y}

    #### neighbours coordinates
    neighbours = {}

    #### neighbours coordinates - xy: the value itself plus the adjacent grid values
    for dim in ['x', 'y']:

        neighbours[dim] = {}

        grid_values = list(ranges[dim])

        for i, d in enumerate(grid_values):
            neighbours[dim][d] = [d]
            if i > 0:
                neighbours[dim][d].append(grid_values[i - 1])
            if i < len(grid_values) - 1:
                neighbours[dim][d].append(grid_values[i + 1])

    #### neighbours coordinates - z: every depth within `distance`
    neighbours['z'] = {}

    for z in range_z:

        neighbours['z'][z] = [z]

        for z2 in range_z:
            if abs(z - z2) <= distance and z != z2:
                neighbours['z'][z].append(z2)

    #### neighbours per zone frame
    zone_frames_neighbours = {}

    used_zone_frames = df['zone_frame'].unique()

    for zone_frame in used_zone_frames:
        zone_frames_neighbours[zone_frame] = get_neighbours(zone_frame, neighbours, used_zone_frames)

    def get_energy_neighbours(df, used_zone_frames, zone_frames_neighbours):

        new_feature = 'energy_neighbours'

        dfs = []

        for zone_frame in used_zone_frames:

            df_zone = df.loc[df['zone_frame'] == zone_frame].copy()

            # energy of all neighbouring zones, summed per date
            df_zone_neighbours = df.loc[df['zone_frame'].isin(zone_frames_neighbours[zone_frame])].copy()
            df_zone_neighbours = df_zone_neighbours.groupby(['date']).agg({'energy': 'sum'}).reset_index()

            # Write straight into the final column. A dedicated column per zone
            # would build a rows x zones frame only to row-sum it afterwards.
            df_zone_neighbours = df_zone_neighbours.rename({'energy': new_feature}, axis = 1)

            df_zone_neighbours = df_zone_neighbours.loc[df_zone_neighbours[new_feature] != 0].copy()

            df_zone = pd.merge(
                df_zone,
                df_zone_neighbours,
                how = 'left',
                on = ['date'],
                suffixes = ['_repeated_left', 'repeated_right'],
            )

            dfs.append(df_zone)

        dfs = pd.concat(dfs)

        for feat in dfs:
            if 'repeated' in feat:
                print(f'Warning: repeated features')

        assert(len(dfs) == len(df))

        # dates with no (or zero) neighbour energy come out of the left join as NaN
        dfs[new_feature] = dfs[new_feature].fillna(0)

        assert(dfs.isna().sum().sum() == 0)

        return dfs

    df = get_energy_neighbours(df, used_zone_frames, zone_frames_neighbours)

    # print info
    ml.print_memory(df)

    # dump df for next component
    joblib.dump(df, full_path)

get_neighbours_df_op = comp.create_component_from_func(
    get_neighbours_df,
    base_image = 'path-to-artifact-registry/zucaml:basic',
    packages_to_install = [],
)

In [None]:
def set_problem_statement(
    full_path: comp.InputPath('csv'),
    main_id: str,
    time_ref: str,
    target_raw: str,
    early_warning_number: int,
    range_warning_number: int,
    is_training: str,
    ps_path: comp.OutputPath('csv'),
):
    """Attach the prediction target to the data via ``ml.set_target``.

    Args:
        full_path: joblib dump of the data produced by the previous step.
        main_id: Forwarded to ``ml.set_target`` as ``item``.
        time_ref: Forwarded as ``time_ref``.
        target_raw: Forwarded as ``target``.
        early_warning_number: Forwarded as ``early_warning``.
        range_warning_number: Forwarded as ``range_warning``.
        is_training: ``drop_na_target`` is enabled only when this equals
            ``'True'``.
        ps_path: Output file; receives a joblib dump of the result.
    """

    import joblib
    import pandas as pd
    import numpy as np
    from datetime import datetime
    import zucaml.zucaml as ml

    # load the previous step's frame and build the target in one go
    labelled = ml.set_target(
        df = joblib.load(full_path),
        item = main_id,
        time_ref = time_ref,
        target = target_raw,
        early_warning = early_warning_number,
        range_warning = range_warning_number,
        drop_na_target = (is_training == 'True'),
    )

    # share of rows with a positive target
    positive_share = labelled['target'].sum() / len(labelled)
    print(f'Balance: {positive_share:.4%}')

    # memory footprint
    ml.print_memory(labelled)

    # hand over to the next component
    joblib.dump(labelled, ps_path)

set_problem_statement_op = comp.create_component_from_func(
    set_problem_statement,
    base_image = 'path-to-artifact-registry/zucaml:basic',
    packages_to_install = [],
)

In [None]:
def feature_engineering(
    ps_path: comp.InputPath('csv'),
    main_id: str,
    time_ref: str,
    target_raw: str,
    gold_data_path: comp.OutputPath('csv'),
):
    """Derive the model features (gold data) from the labelled dataset.

    Builds, through ``zucaml``: 30/90/180/330/360 rolling means of ``energy``
    and of ``energy_neighbours``; ratios of the 30/90/180/330 ``energy`` means
    to the 360 one; and a ``days.since.last`` tracker over rows where
    ``target_raw`` is positive. The result is sorted by ``zone_frame`` and
    ``date``.

    Args:
        ps_path: joblib dump of the labelled data.
        main_id: Forwarded to ``ml.create_reset`` as ``item``.
        time_ref: Forwarded to ``ml.create_reset`` and used as the base
            feature of the tracker.
        target_raw: Column whose positive values drive the tracker condition.
        gold_data_path: Output file; receives a joblib dump of the result.
    """

    import joblib
    import pandas as pd
    import numpy as np
    from datetime import datetime
    import zucaml.zucaml as ml

    # previous step's output
    gold = joblib.load(ps_path)

    # reset marker (dropped again before saving)
    gold = ml.create_reset(
        df = gold,
        item = main_id,
        time_ref = time_ref,
        order = None
    )

    # moving averages: all windows for `energy`, then all for `energy_neighbours`
    rolling_windows = [30, 90, 180, 330, 360]
    for base_feature in ['energy', 'energy_neighbours']:
        for window in rolling_windows:
            gold = ml.ts_feature(
                df = gold,
                feature_base = base_feature,
                func = 'rolling.mean',
                func_val = window,
                label = None,
            )

    # shorter-window means relative to the 360 mean
    long_mean = 'energy|rolling.mean#360'
    short_means = (
        'energy|rolling.mean#30',
        'energy|rolling.mean#90',
        'energy|rolling.mean#180',
        'energy|rolling.mean#330',
    )
    for short_mean in short_means:
        gold = ml.math_feature(
            df = gold,
            feature_1 = short_mean,
            feature_2 = long_mean,
            func = 'ratio',
            label = None,
        )

    # days since the last row where the raw target was positive
    had_event = gold[target_raw] > 0
    gold = ml.track_feature(
        df = gold,
        feature_base = time_ref,
        condition = had_event,
        track_window = 0,
        track_function = 'diff.days',
        label = 'days.since.last'
    )

    # tidy up and order
    gold = gold.drop('reset', axis = 1)
    gold = gold.sort_values(['zone_frame', 'date']).reset_index(drop = True)

    # class balance and memory footprint
    positive_share = gold['target'].sum() / len(gold)
    print(f'{positive_share:.6%}')
    ml.print_memory(gold)

    # hand over to the next component
    joblib.dump(gold, gold_data_path)

feature_engineering_op = comp.create_component_from_func(
    feature_engineering,
    base_image = 'path-to-artifact-registry/zucaml:basic',
    packages_to_install = [],
)

In [None]:
def make_predictions(
    gold_data_path: comp.InputPath('csv'),
    bucket_name: str,
    artifact_folder: str,
    step_x: int,
    step_y: int,
    step_z: int,
    time_frequency: str,
    early_warning_number: int,
    range_warning_number: int,
    date_offset: int,
    predictions_data_path: comp.OutputPath('csv'),
) -> str:
    """Score the most recent date of the gold data with the stored model.

    Loads ``notes.txt`` (JSON with ``features`` and ``threshold``) and
    ``model.pkl`` from ``artifact_folder``, predicts on the rows of the latest
    date and turns every grid point into a box (``*_min``/``*_max``) and a
    date window (``date_min``/``date_max``).

    Args:
        gold_data_path: joblib dump of the gold data.
        bucket_name: Bucket holding the model artifacts.
        artifact_folder: Object-name prefix of the artifacts.
        step_x, step_y, step_z: Grid steps; half of each (truncated) is
            subtracted/added to build the box.
        time_frequency: Contains ``'D'`` (days) or ``'W'`` (weeks) plus an
            integer multiplier, e.g. ``'1W'``.
        early_warning_number: Periods between ``date`` and ``date_min``.
        range_warning_number: Periods between ``date_min`` and ``date_max``.
        date_offset: Stored in the ``offset`` column and in the return value.
        predictions_data_path: Output file; written only when predicting.

    Returns:
        ``'<date_offset>_<local timestamp>'`` when the latest date is today or
        later and predictions were written, otherwise ``''``.
    """

    import joblib
    import pandas as pd
    import json
    from datetime import datetime, timedelta
    from pickle import loads
    from google.cloud import storage
    import zucaml.zucaml as ml

    # read from last step
    df = joblib.load(gold_data_path)

    # get bucket
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)

    # get blobs
    config_file = bucket.get_blob(artifact_folder + 'notes.txt')
    model_file = bucket.get_blob(artifact_folder + 'model.pkl')

    # download
    model_config = json.loads(config_file.download_as_text())
    # NOTE(review): unpickling executes arbitrary code - this is only safe as
    # long as the artifact bucket is trusted.
    model = loads(model_file.download_as_string())

    # select max date
    df = df.loc[df['date'] == df['date'].max()].copy().reset_index().drop('index', axis = 1)

    # make predictions
    df['probability'] = model.predict_proba(df[model_config['features']])[:, 1]
    df['prediction'] = (df['probability'] > model_config['threshold']) * 1

    # select fields
    df = df.loc[:, ['x', 'y', 'z', 'date', 'probability', 'prediction']].copy()

    # transform fields: grid point -> box, clamped at the surface
    for location, step in [('x', step_x), ('y', step_y), ('z', step_z)]:
        df[location + '_min'] = df[location] - int(step / 2)
        df[location + '_max'] = df[location] + int(step / 2)
    df.loc[df['z_min'] < 0, 'z_min'] = 0
    if 'D' in time_frequency:
        time_frequency_value = int(time_frequency.replace('D', ''))
        df['date_min'] = df['date'] + timedelta(days = early_warning_number * time_frequency_value)
        df['date_max'] = df['date_min'] + timedelta(days = range_warning_number * time_frequency_value)
    elif 'W' in time_frequency:
        time_frequency_value = int(time_frequency.replace('W', ''))
        df['date_min'] = df['date'] + timedelta(weeks = early_warning_number * time_frequency_value)
        df['date_max'] = df['date_min'] + timedelta(weeks = range_warning_number * time_frequency_value)
    else:
        # NOTE(review): in this case no date_min/date_max columns are added -
        # confirm whether that is acceptable downstream.
        print(f'Unknown timefrequency: {str(time_frequency)}')
    df['timestamp'] = datetime.now()
    df['offset'] = date_offset

    # check if is to predict
    dataframe_date = df['date'].iloc[0]
    this_day = datetime.now().date()
    print(f'date: {str(dataframe_date)} - today: {str(this_day)}')
    # Compare Timestamp with Timestamp: ordering a Timestamp against a
    # `datetime.date` raises TypeError in pandas 2.x. Midnight of `this_day`
    # gives the same answer as the former date-only comparison.
    if pd.Timestamp(dataframe_date) >= pd.Timestamp(this_day):

        df = df.drop(['x', 'y', 'z', 'date'], axis = 1)

        # print info
        number_predictions = df['prediction'].sum()
        print(f'Prediction true: {number_predictions:,d}')
        ml.print_memory(df)

        # dump df for next component
        joblib.dump(df, predictions_data_path)

        return str(date_offset) + '_' + datetime.now().astimezone().strftime('%Y-%m-%dT%H-%M-%S-%Z')
    else:
        return ''

make_predictions_op = comp.create_component_from_func(
    make_predictions,
    base_image = 'path-to-artifact-registry/zucaml:basic',
    packages_to_install = ['google-cloud-storage', 'gcsfs', 'fsspec'],
)

In [None]:
def calculate_metrics(
    dummy_input: str,
    main_bucket_name: str,
    silver_location: str,
    predictions_location: str,
    metrics_location: str,
    event_reference: float,
    min_latitude: int,
    max_latitude: int,
    min_longitude: int,
    max_longitude: int,
):
    """Score all stored positive predictions against the observed events.

    Reads every ``.csv`` under ``predictions_location`` and ``silver.csv``
    under ``silver_location``. An event matches a positive prediction when its
    date lies in ``[date_min, date_max]`` and its rounded
    longitude/latitude/depth lie inside the prediction box. Writes
    ``events.csv``, ``predictions.csv`` and ``metrics.csv`` under
    ``metrics_location``.

    Args:
        dummy_input: Not read by the function body.
        main_bucket_name: Bucket holding all inputs and outputs.
        silver_location: Object-name prefix of ``silver.csv``.
        predictions_location: Object-name prefix of the prediction files.
        metrics_location: Object-name prefix for the output files.
        event_reference: Minimum magnitude for an event to be scored.
        min_latitude, max_latitude: Inclusive latitude bounds of the study area.
        min_longitude, max_longitude: Inclusive longitude bounds of the study area.
    """

    from os import listdir
    from datetime import datetime, timedelta
    import pandas as pd
    import numpy as np
    from google.cloud import storage

    bucket_url = 'gs://' + main_bucket_name + '/'

    # ---- predictions: list first, then read every csv under the prefix ----
    storage_client = storage.Client()
    prediction_files = [
        blob.name
        for blob in storage_client.list_blobs(main_bucket_name, prefix = predictions_location)
        if blob.name.endswith('.csv')
    ]
    predictions = pd.concat([pd.read_csv(bucket_url + name) for name in prediction_files])
    for date_column in ['date_min', 'date_max']:
        predictions[date_column] = pd.to_datetime(predictions[date_column])
    # taken before filtering, so it covers negative predictions as well
    first_prediction = predictions['date_min'].min()

    # keep positive predictions only and number them from 0
    is_positive = predictions['prediction'] == 1
    predictions = predictions.loc[is_positive].reset_index(drop = True)
    predictions = predictions.reset_index().rename(columns = {'index': 'id_pred'})
    # not used further down; kept as in the original computation
    total_predictions = predictions[predictions['date_min'] <= datetime.now()]['id_pred'].nunique()

    # ---- events ----
    events = pd.read_csv(bucket_url + silver_location + 'silver.csv')
    events['date'] = pd.to_datetime(events['date'])
    assert(events['id'].nunique() == len(events))

    # magnitude, study area and period covered by predictions
    in_study = (
        (events['mag'] >= event_reference)
        & (events['longitude'] >= min_longitude)
        & (events['longitude'] <= max_longitude)
        & (events['latitude'] >= min_latitude)
        & (events['latitude'] <= max_latitude)
    )
    in_period = events['date'] >= first_prediction
    events = events.loc[in_study & in_period].reset_index(drop = True)

    for coordinate in ['longitude', 'latitude', 'depth']:
        events[coordinate] = events[coordinate].round()

    # ---- match events and predictions: time window first, then box ----
    event_dates = events['date'].values
    window_start = predictions['date_min'].values
    window_end = predictions['date_max'].values

    # all (event, prediction) pairs whose date falls inside the window
    in_window = (event_dates[:, None] >= window_start) & (event_dates[:, None] <= window_end)
    event_idx, prediction_idx = np.where(in_window)

    joined = pd.DataFrame(
        np.column_stack([events.values[event_idx], predictions.values[prediction_idx]]),
        columns = events.columns.append(predictions.columns)
    )

    joined['keep'] = True
    box_limits = [
        ('longitude', 'x_min', 'x_max'),
        ('latitude', 'y_min', 'y_max'),
        ('depth', 'z_min', 'z_max'),
    ]
    for coordinate, lower, upper in box_limits:
        joined['keep'] = joined['keep'] & (joined[coordinate] >= joined[lower])
        joined['keep'] = joined['keep'] & (joined[coordinate] <= joined[upper])
    joined = joined.loc[joined['keep']].reset_index(drop = True)

    # ---- label both sides ----
    events['predicted'] = 'Missed'
    events.loc[events['id'].isin(joined['id']), 'predicted'] = 'Predicted'

    predictions['correct'] = 'False alarm'
    predictions.loc[predictions['id_pred'].isin(joined['id_pred']), 'correct'] = 'Correct'
    # a window that ended less than a day ago (or is still open) is left blank
    still_open = predictions['date_max'] > datetime.now() - timedelta(days = 1)
    not_confirmed = predictions['correct'] != 'Correct'
    predictions.loc[still_open & not_confirmed, 'correct'] = ''

    # ---- metrics ----
    number_earthquakes_predicted = int((events['predicted'] == 'Predicted').sum())
    number_earthquakes_missed = int((events['predicted'] == 'Missed').sum())
    number_predictions_correct = int((predictions['correct'] == 'Correct').sum())
    number_predictions_false = int((predictions['correct'] == 'False alarm').sum())

    # epsilon keeps the ratios defined when a denominator would be zero
    epsilon = np.finfo(float).eps
    precision = number_predictions_correct / (number_predictions_correct + number_predictions_false + epsilon)
    recall = number_earthquakes_predicted / (number_earthquakes_predicted + number_earthquakes_missed + epsilon)

    def f_score(beta):
        return (1.0 + beta ** 2) * (precision * recall) / ((beta ** 2 * precision) + recall + epsilon)

    metrics = pd.DataFrame({
        'Predicted': [number_earthquakes_predicted],
        'Missed': [number_earthquakes_missed],
        'Correct': [number_predictions_correct],
        'False alarm': [number_predictions_false],
        'Precision': [precision],
        'Recall': [recall],
        'F0.5': [f_score(0.5)],
        'F1': [f_score(1.0)]
    })

    # ---- dump files ----
    output_prefix = bucket_url + metrics_location
    events.to_csv(output_prefix + 'events.csv', index = False)
    predictions.to_csv(output_prefix + 'predictions.csv', index = False)
    metrics.to_csv(output_prefix + 'metrics.csv', index = False)

calculate_metrics_op = comp.create_component_from_func(
    calculate_metrics,
    base_image = 'path-to-artifact-registry/python-pandas:basic',
    packages_to_install = ['google-cloud-storage'],
)

In [None]:
# Pipeline definition: download the latest events, merge them into the silver
# data, build the gold data, make predictions and, when a prediction was
# produced, store it and run the metrics component.
#
# Parameters (all supplied at run time):
#   is_training            forwarded to reindex_df_op and set_problem_statement_op
#   maint_bucket           bucket name handed to every step that touches storage
#   problem_statement_file source file read by get_problem_statement_op
#   date_offset            forwarded to reindex_df_op and make_predictions_op,
#                          and used as the suffix of the saved gold file
#   raw_location           bucket folder for the raw download
#   silver_location        bucket folder for the silver data
#   gold_location          bucket folder for the gold data
#   artifact_location      artifact folder given to make_predictions_op
#   predictions_location   bucket folder for the saved predictions
#   metrics_location       location given to calculate_metrics_op
#
# NOTE: documented with comments instead of a docstring so the function object
# handed to the KFP compiler is unchanged.
@kfp.dsl.pipeline(
    name = pipeline_name,
    pipeline_root = pipeline_root_path
)
def pipeline(
    is_training: str,
    maint_bucket: str,
    problem_statement_file: str,
    date_offset: int,
    raw_location: str,
    silver_location: str,
    gold_location: str,
    artifact_location: str,
    predictions_location: str,
    metrics_location: str,
):

    # fetch the last days of events from the USGS endpoint
    download = get_data_from_url_op(
        url = 'https://earthquake.usgs.gov/fdsnws/event/1/query',
        delta_days = 3,
    )
    downloaded = download.outputs

    # everything else only runs when the download step returned a non-empty name
    with dsl.Condition(downloaded['Output'] != '', name = 'Download ok'):

        # keep a copy of the raw download in the bucket
        save_data_to_gcp_op(
            file_to_save = downloaded['downloaded_data'],
            file_name = downloaded['Output'],
            bucket_name = maint_bucket,
            bucket_folder = raw_location,
        )

        cleaning = clean_data_op(
            downloaded['downloaded_data'],
        )

        merge = merge_new_data_op(
            new_data = cleaning.outputs['cleaned_data'],
            bucket_name = maint_bucket,
            file_path = silver_location,
            file_name = 'silver.csv',
        )
        merged = merge.outputs

        # the silver file is only rewritten when the merge step returned 'True'
        with dsl.Condition(merged['Output'] == 'True', name = 'New info'):

            save_df_to_gcp_op(
                file_to_save = merged['silver_data'],
                file_name = 'silver',
                suffix = '',
                bucket_name = maint_bucket,
                bucket_folder = silver_location,
            )

        problem_statement = get_problem_statement_op(
            bucket_name = maint_bucket,
            source_file = problem_statement_file,
        )
        config = problem_statement.outputs

        # arguments passed unchanged to both filter_and_preprocess_op and calculate_metrics_op
        event_filter = {
            key: config[key]
            for key in (
                'event_reference',
                'min_latitude',
                'max_latitude',
                'min_longitude',
                'max_longitude',
            )
        }

        filtering = filter_and_preprocess_op(
            silver = merged['silver_data'],
            **event_filter,
            time_cut = config['time_cut'],
        )

        gridding = reindex_df_op(
            filtered = filtering.outputs['filtered'],
            degrees_latitude_grid = config['degrees_latitude_grid'],
            km_depth_grid = config['km_depth_grid'],
            pad_df = config['pad_df'],
            main_id = config['main_id'],
            time_ref = config['time_ref'],
            time_frequency = config['time_frequency'],
            date_offset = date_offset,
            is_training = is_training,
        )
        grid = gridding.outputs

        # get_neighbours_df_op takes these grid outputs under the very same names
        neighbour_inputs = (
            'grid', 'x_degre_km', 'y_degre_km', 'dx', 'dy',
            'range_x_min', 'range_x_max', 'range_x_step',
            'range_y_min', 'range_y_max', 'range_y_step',
            'range_z_min', 'range_z_max', 'range_z_step',
        )
        neighbours = get_neighbours_df_op(
            **{key: grid[key] for key in neighbour_inputs}
        )

        statement = set_problem_statement_op(
            full = neighbours.outputs['full'],
            main_id = config['main_id'],
            time_ref = config['time_ref'],
            target_raw = config['target_raw'],
            early_warning_number = config['early_warning_number'],
            range_warning_number = config['range_warning_number'],
            is_training = is_training,
        )

        features = feature_engineering_op(
            ps = statement.outputs['ps'],
            main_id = config['main_id'],
            time_ref = config['time_ref'],
            target_raw = config['target_raw'],
        )
        gold = features.outputs['gold_data']

        save_df_to_gcp_op(
            file_to_save = gold,
            file_name = 'gold',
            suffix = str(date_offset),
            bucket_name = maint_bucket,
            bucket_folder = gold_location,
        )

        prediction = make_predictions_op(
            gold_data = gold,
            bucket_name = maint_bucket,
            artifact_folder = artifact_location,
            step_x = grid['range_x_step'],
            step_y = grid['range_y_step'],
            step_z = grid['range_z_step'],
            time_frequency = config['time_frequency'],
            early_warning_number = config['early_warning_number'],
            range_warning_number = config['range_warning_number'],
            date_offset = date_offset,
        )
        predicted = prediction.outputs

        # predictions are stored and scored only when the step returned a non-empty name
        with dsl.Condition(predicted['Output'] != '', name = 'New prediction'):

            stored = save_df_to_gcp_op(
                file_to_save = predicted['predictions_data'],
                file_name = 'predictions',
                suffix = predicted['Output'],
                bucket_name = maint_bucket,
                bucket_folder = predictions_location,
            )

            # dummy_input consumes the save step's output, so metrics run after the save
            calculate_metrics_op(
                dummy_input = stored.outputs['Output'],
                main_bucket_name = maint_bucket,
                silver_location = silver_location,
                predictions_location = predictions_location,
                metrics_location = metrics_location,
                **event_filter,
            )

# Compile the pipeline into a local JSON job spec named after the pipeline
# (dashes replaced by underscores), then upload that file to the main bucket
# under pipelines/json/.
compiler.Compiler().compile(
    package_path = f"{pipeline_name.replace('-', '_')}.json",
    pipeline_func = pipeline,
)

save_pipeline(
    pipeline = f"{pipeline_name.replace('-', '_')}.json",
    bucket_name = main_bucket_name,
    files_path = 'pipelines/json/',
)

In [None]:
# from datetime import datetime

# api_client = AIPlatformClient(
#     project_id = project_id,
#     region = region
# )
# parameter_values = {
#     'is_training': 'False',
#     'maint_bucket': main_bucket_name,
#     'problem_statement_file': problem_statement_file,
#     'date_offset': 5,
#     'raw_location': raw_location,
#     'silver_location': silver_location,
#     'gold_location': gold_location,
#     'artifact_location': artifact_location,
#     'predictions_location': predictions_location,
#     'metrics_location': metrics_location,
# }

# run_time = datetime.now().strftime('%Y%m%d%H%m%S%f')

# api_client.create_run_from_job_spec(
#     job_spec_path = pipeline_root_path + 'json/' + pipeline_name.replace('-', '_') + '.json',
#     job_id = pipeline_name.replace('-', '') + '{0}'.format(run_time),
#     pipeline_root = pipeline_root_path,
#     enable_caching = False,
#     parameter_values = parameter_values
# )

In [None]:
# def local_get_data_from_url(url, delta_days, end_date):

#     import requests
#     from datetime import datetime, timedelta
#     import pandas as pd

#     # get request params
#     if end_date is None:
#         end_range = datetime.now() + timedelta(days = 1)
#     else:
#         end_range = end_date - timedelta(days = 1)
#     begin_range = end_range - timedelta(days = delta_days)
#     end_range = end_range.strftime('%Y-%m-%d')
#     begin_range = begin_range.strftime('%Y-%m-%d')

#     query = {'format': 'csv', 'starttime': begin_range, 'endtime': end_range}
    
#     # get time
#     request_name = begin_range + '_' + end_range + '_.csv'

#     response_content = None

#     # make request
#     try:
#         with requests.get(url, params = query) as response:
#             response.raise_for_status()
#             response_content = response.content
#     except requests.exceptions.Timeout:
#         print('Timeout Exception')
#         return ''
#     except requests.exceptions.TooManyRedirects:
#         print('Too Many Redirects Exception')
#         return ''
#     except requests.exceptions.HTTPError:
#         print(query)
#         print('Http Exception')
#         return ''
#     except requests.exceptions.ConnectionError:
#         print('Error Connecting')
#         return ''
#     except requests.exceptions.RequestException:
#         print('Request Exception')
#         return ''
#     except:
#         print('Error requesting file')
#         return ''

#     # dump for next component
#     with open('./temp/' + request_name, 'w') as text_file:
#         text_file.write(response_content.decode('utf-8'))
        
#     df = pd.read_csv('./temp/' + request_name)
    
#     df['date'] = pd.to_datetime(df['time'].str.split('T').str[0], format = '%Y-%m-%d', exact = True)
    
#     assert((df['date'].max() - df['date'].min()).days + 1 == delta_days == df['date'].nunique())

#     return df['date'].min()

# min_date = None

# for i in range(25):
#     min_date = local_get_data_from_url('https://earthquake.usgs.gov/fdsnws/event/1/query', 29, min_date)

In [None]:
# from os import listdir
# from os.path import isfile, join
# import pandas as pd

# onlyfiles = [f for f in listdir('./temp/') if f.endswith('.csv')]

# dfs = []

# for f in onlyfiles:
#     dfs.append(pd.read_csv('./temp/' + f))
    
# df = pd.concat(dfs)

# assert(df['id'].nunique() == len(df))

# def local_clean_data(df):

#     import joblib
#     import pandas as pd
    
#     # select info to be used
#     df = df.loc[:, ['time', 'id', 'latitude', 'longitude', 'depth', 'mag']].copy()
    
#     # weird values
#     z_0 = df['depth'] < 0
#     print(f'Depth above sup: {sum(z_0):,d} ({sum(z_0) / len(df):.2%})')
#     df.loc[z_0, 'depth'] = 0
    
#     # data parsing
#     date_col = 'time'
#     datetimes = df[date_col].str.split('T', expand = True)
#     dates = pd.to_datetime(datetimes.loc[:, 0], format = '%Y-%m-%d')
#     df = pd.concat((df, dates.rename('date')), axis = 1)
#     df = df.drop(date_col, axis = 1)
    
#     # drop NA
#     len_before = len(df)
#     df = df.dropna()
#     len_after = len(df)
#     dropped_events = len_before - len_after
#     if (dropped_events) == 0:
#         print('No dropped events')
#     else:
#         print(f'Dropped events: {dropped_events:,d} ({dropped_events / len_before:.2%})')
        
#     return df

# df = local_clean_data(df)

# df.to_csv('silver.csv', index = False)

# df[:5]