In [1]:
import kfp
import kfp.dsl as dsl
import kfp.components as comp
from kfp.components import InputPath, OutputPath

In [2]:
def set_config(index_pkl: OutputPath("pickle"),
               config_pkl: OutputPath("pickle"), 
               datetime_pkl: OutputPath("pickle")) -> list:
    """Build the QuakeFlow run configuration and split the time range into work units.

    Writes three pickles:
      * ``config_pkl``   -- dict with region bounds (degrees and km), time window,
        station selection and association (DBSCAN / GMM) settings.
      * ``datetime_pkl`` -- ``{"starttimes": [...], "interval": timedelta(hours=1)}``
        with one naive ``datetime`` per hour in ``[starttime, endtime)``.
      * ``index_pkl``    -- ``num_parallel`` lists; list ``k`` holds the positions in
        ``starttimes`` assigned round-robin to worker ``k``.

    Returns:
        ``list(range(num_parallel))``; used as the fan-out items of ``ParallelFor``.
    """
    import obspy
    import pickle
    import datetime
    import numpy as np

    # ---- Region of interest: a 1x1 degree box around the Salton Sea ----
    pi = 3.1415926
    degree2km = pi * 6371 / 180
    center = (-115.53, 32.98)
    horizontal_degree = 1.0
    vertical_degree = 1.0
    half_x_km = horizontal_degree * 2 * degree2km / 2
    half_y_km = vertical_degree * 2 * degree2km / 2

    # ---- Time window (endtime is exclusive) ----
    starttime = obspy.UTCDateTime("2020-10-01T00")
    endtime = obspy.UTCDateTime("2020-10-03T00")

    # ---- Station selection; broader alternative: HN?,HH?,BH?,EH? ----
    network_list = "CI"
    channel_list = "HHE,HHN,HHZ"

    config = {
        "center": center,
        "xlim_degree": [center[0] - horizontal_degree / 2, center[0] + horizontal_degree / 2],
        "ylim_degree": [center[1] - vertical_degree / 2, center[1] + vertical_degree / 2],
        "xlim_km": [-half_x_km, half_x_km],
        "ylim_km": [-half_y_km, half_y_km],
        "degree2km": degree2km,
        "starttime": starttime.datetime,
        "endtime": endtime.datetime,
        "networks": network_list,
        "channels": channel_list,
        "client": "SCEDC",
        # ---- association ----
        "dims": ['x(km)', 'y(km)', 'z(km)'],
        "use_dbscan": True,
        "use_amplitude": True,
        # Bounds for (x, y, z, t); x/y padded by 1 km beyond the region box.
        "bfgs_bounds": ((-half_x_km - 1, half_x_km + 1),
                        (-half_y_km - 1, half_y_km + 1),
                        (0, 21), (None, None)),
        "dbscan_eps": np.sqrt(horizontal_degree**2 + vertical_degree**2)*degree2km/(6.0/1.75)/2,
        "dbscan_min_samples": 10,
        "min_picks_per_eq": 6,
        "oversample_factor": 5,
    }
    with open(config_pkl, "wb") as fp:
        pickle.dump(config, fp)

    # One work unit per hour.
    one_hour = datetime.timedelta(hours=1)
    starttimes = []
    cursor = starttime
    while cursor < endtime:
        starttimes.append(cursor.datetime)
        cursor += one_hour
    with open(datetime_pkl, "wb") as fp:
        pickle.dump({"starttimes": starttimes, "interval": one_hour}, fp)

    # Round-robin assignment of hours to workers: worker k gets k, k+n, k+2n, ...
    num_parallel = 2
    idx = [list(range(k, len(starttimes), num_parallel)) for k in range(num_parallel)]
    with open(index_pkl, "wb") as fp:
        pickle.dump(idx, fp)

    return list(range(num_parallel))

In [3]:
import pickle

# Local smoke test of the config step: writes the three pickles into the CWD.
idx = set_config(index_pkl="index.pkl", config_pkl="config.pkl", datetime_pkl="datetimes.pkl")
print(idx)

with open("datetimes.pkl", mode="rb") as fp:
    data = pickle.load(fp)
print(data)

[0, 1]
{'starttimes': [datetime.datetime(2020, 10, 1, 0, 0), datetime.datetime(2020, 10, 1, 1, 0), datetime.datetime(2020, 10, 1, 2, 0), datetime.datetime(2020, 10, 1, 3, 0), datetime.datetime(2020, 10, 1, 4, 0), datetime.datetime(2020, 10, 1, 5, 0), datetime.datetime(2020, 10, 1, 6, 0), datetime.datetime(2020, 10, 1, 7, 0), datetime.datetime(2020, 10, 1, 8, 0), datetime.datetime(2020, 10, 1, 9, 0), datetime.datetime(2020, 10, 1, 10, 0), datetime.datetime(2020, 10, 1, 11, 0), datetime.datetime(2020, 10, 1, 12, 0), datetime.datetime(2020, 10, 1, 13, 0), datetime.datetime(2020, 10, 1, 14, 0), datetime.datetime(2020, 10, 1, 15, 0), datetime.datetime(2020, 10, 1, 16, 0), datetime.datetime(2020, 10, 1, 17, 0), datetime.datetime(2020, 10, 1, 18, 0), datetime.datetime(2020, 10, 1, 19, 0), datetime.datetime(2020, 10, 1, 20, 0), datetime.datetime(2020, 10, 1, 21, 0), datetime.datetime(2020, 10, 1, 22, 0), datetime.datetime(2020, 10, 1, 23, 0), datetime.datetime(2020, 10, 2, 0, 0), datetime.date

In [4]:
# Wrap set_config as a lightweight KFP component (runs in a stock Python image).
config_op = comp.func_to_container_op(
    set_config,
    base_image='python:3.8',
    packages_to_install=["numpy", "obspy"],
)

In [5]:
def download_events(config_pkl: InputPath("pickle"),
                    event_csv: OutputPath(str)):
    """Download the reference earthquake catalog for the configured region and time window.

    Queries the IRIS FDSN event service with the bounds stored in ``config_pkl`` and
    writes a tab-separated catalog to ``event_csv`` with the columns
    time, x(km), y(km), z(km), mag, longitude, latitude, depth(m), sorted by time.
    x/y are offsets from ``config["center"]`` scaled by ``config["degree2km"]``.

    Events lacking a magnitude or a depth are kept; the missing values are written
    as empty fields instead of aborting the step.

    Args:
        config_pkl: Pickled config dict produced by ``set_config``.
        event_csv: Output path of the tab-separated catalog.
    """
    import pickle
    import obspy
    from obspy.clients.fdsn import Client
    import pandas as pd

    columns = ["time", "x(km)", "y(km)", "z(km)", "mag", "longitude", "latitude", "depth(m)"]

    with open(config_pkl, "rb") as fp:
        config = pickle.load(fp)

    ####### IRIS catalog ########
    # With ``filename`` set, get_events saves the QuakeML to disk instead of returning it.
    Client("IRIS").get_events(starttime=config["starttime"],
                              endtime=config["endtime"],
                              minlongitude=config["xlim_degree"][0],
                              maxlongitude=config["xlim_degree"][1],
                              minlatitude=config["ylim_degree"][0],
                              maxlatitude=config["ylim_degree"][1],
                              filename='events.xml')
    events = obspy.read_events('events.xml')
    print(f"Number of events: {len(events)}")

    ####### Save catalog ########
    lon0, lat0 = config["center"]
    degree2km = config["degree2km"]
    records = []
    for event in events:
        origin = event.origins[0]
        depth = origin.depth
        records.append({
            "time": origin.time.datetime,
            "x(km)": (origin.longitude - lon0) * degree2km,
            "y(km)": (origin.latitude - lat0) * degree2km,
            "z(km)": depth / 1e3 if depth is not None else None,
            # Some catalog entries carry no magnitude; keep the event anyway.
            "mag": event.magnitudes[0].mag if event.magnitudes else None,
            "longitude": origin.longitude,
            "latitude": origin.latitude,
            "depth(m)": depth,
        })
    # Explicit columns keep sort/to_csv working even when no events were returned.
    catalog = pd.DataFrame(records, columns=columns).sort_values(["time"])
    catalog.to_csv(event_csv,
                   sep="\t", index=False, float_format="%.3f",
                   date_format='%Y-%m-%dT%H:%M:%S.%f',
                   columns=columns)

In [6]:
# download_events("config.pkl", "events.csv")

In [7]:
# Containerised download_events step.
download_events_op = comp.func_to_container_op(
    download_events,
    base_image='python:3.8',
    packages_to_install=["obspy", "pandas"],
)

In [8]:
def download_stations(config_pkl: InputPath("pickle"),
                      station_csv: OutputPath(str),
                      station_pkl: OutputPath("pickle")):
    """Download station metadata (with instrument response) for the configured region.

    Outputs:
      * ``station_csv`` -- tab-separated table indexed by
        ``"<net>.<sta>.<loc>.<channel code minus last char>"`` with x/y/z in km
        (x/y relative to ``config["center"]``; z is negative elevation),
        latitude, longitude, elevation(m), unit (lower-cased sensitivity input
        units), comma-joined component letters and comma-joined sensitivities.
        Location/unit columns come from the first channel seen for each id.
      * ``station_pkl`` -- the pickled ObsPy ``Inventory`` read back from disk.

    Args:
        config_pkl: Pickled config dict produced by ``set_config``.
        station_csv: Output path of the station table.
        station_pkl: Output path of the pickled inventory.
    """
    import pickle
    from collections import defaultdict

    import obspy
    import pandas as pd
    from obspy.clients.fdsn import Client

    with open(config_pkl, "rb") as fp:
        config = pickle.load(fp)

    ####### Download stations ########
    # With ``filename`` set, the StationXML is saved to disk and re-read below.
    Client("IRIS").get_stations(network=config["networks"],
                                station="*",
                                starttime=config["starttime"],
                                endtime=config["endtime"],
                                minlongitude=config["xlim_degree"][0],
                                maxlongitude=config["xlim_degree"][1],
                                minlatitude=config["ylim_degree"][0],
                                maxlatitude=config["ylim_degree"][1],
                                channel=config["channels"],
                                level="response",
                                filename="stations.xml")
    stations = obspy.read_inventory("stations.xml")
    n_stations = sum(len(net) for net in stations)
    print("Number of stations: {}".format(n_stations))

    ####### Save stations ########
    lon0, lat0 = config["center"]
    degree2km = config["degree2km"]
    station_locs = defaultdict(dict)
    for net in stations:
        for sta in net:
            for cha in sta:
                x_km = (cha.longitude - lon0) * degree2km
                y_km = (cha.latitude - lat0) * degree2km
                z_km = -cha.elevation / 1e3
                # Channels of one instrument share everything but the last code letter.
                sid = f"{net.code}.{sta.code}.{cha.location_code}.{cha.code[:-1]}"
                sensitivity = cha.response.instrument_sensitivity
                comp_code = cha.code[-1]
                resp_str = f"{sensitivity.value:.2f}"
                if sid not in station_locs:
                    station_locs[sid] = {
                        "x(km)": x_km,
                        "y(km)": y_km,
                        "z(km)": z_km,
                        "longitude": cha.longitude,
                        "latitude": cha.latitude,
                        "elevation(m)": cha.elevation,
                        "component": comp_code,
                        "response": resp_str,
                        "unit": sensitivity.input_units.lower(),
                    }
                else:
                    station_locs[sid]["component"] += f",{comp_code}"
                    station_locs[sid]["response"] += f",{resp_str}"

    station_table = pd.DataFrame.from_dict(station_locs, orient='index')
    station_table.to_csv(station_csv,
                         sep="\t", float_format="%.3f",
                         index_label="station",
                         columns=["x(km)", "y(km)", "z(km)", "latitude", "longitude",
                                  "elevation(m)", "unit", "component", "response"])

    with open(station_pkl, "wb") as fp:
        pickle.dump(stations, fp)

In [9]:
# download_stations("config.pkl", "stations.csv", "stations.pkl")

In [10]:
# Containerised download_stations step.
download_stations_op = comp.func_to_container_op(
    download_stations,
    base_image='python:3.8',
    packages_to_install=["obspy", "pandas"],
)

In [11]:
def download_waveform(i: int, 
                      index_pkl: InputPath("pickle"),
                      config_pkl: InputPath("pickle"),
                      datetime_pkl: InputPath("pickle"),
                      station_pkl: InputPath("pickle"),
                      fname_csv: OutputPath(str),
                      data_path:str = "/tmp"
                     ) -> str:
    """Download one MiniSEED file per time window assigned to worker ``i``.

    For every window index in ``index[i]`` the waveforms of all stations in the
    pickled inventory are requested from ``config["client"]``, merged into one
    stream and written to ``<data_path>/waveforms/<start>.mseed``. Each station
    request is retried up to ``max_retry`` times with a 1 s pause. Windows for
    which nothing could be downloaded are skipped (ObsPy cannot write an empty
    stream) instead of failing the whole step.

    Args:
        i: Worker/partition index into the list stored in ``index_pkl``.
        index_pkl: Pickled per-worker lists of window indices.
        config_pkl: Pickled config dict (uses ``client`` and ``channels``).
        datetime_pkl: Pickled ``{"starttimes": [...], "interval": timedelta}``.
        station_pkl: Pickled ObsPy ``Inventory`` from ``download_stations``.
        fname_csv: Output file listing the written MiniSEED names, with a
            ``fname`` header line; consumed as PhaseNet's ``--data_list``.
        data_path: Base directory (the mounted volume) for the waveforms.

    Returns:
        The directory the MiniSEED files were written to.
    """
    import os
    import pickle
    import time

    import obspy
    from obspy.clients.fdsn import Client

    max_retry = 10

    with open(index_pkl, "rb") as fp:
        window_ids = pickle.load(fp)[i]
    with open(config_pkl, "rb") as fp:
        config = pickle.load(fp)
    with open(datetime_pkl, "rb") as fp:
        windows = pickle.load(fp)
    starttimes = windows["starttimes"]
    interval = windows["interval"]
    with open(station_pkl, "rb") as fp:
        stations = pickle.load(fp)

    waveform_dir = os.path.join(data_path, "waveforms")
    os.makedirs(waveform_dir, exist_ok=True)

    ####### Download data ########
    client = Client(config["client"])
    fname_list = ["fname"]  # header line of the file list
    # Use a distinct loop name so the partition index ``i`` is not shadowed.
    for window_id in window_ids:
        starttime = obspy.UTCDateTime(starttimes[window_id])
        endtime = starttime + interval
        fname = "{}.mseed".format(starttime.datetime.strftime("%Y-%m-%dT%H:%M:%S"))
        stream = obspy.Stream()
        print(f"{fname} download starts")
        for network in stations:
            for station in network:
                print(f"********{network.code}.{station.code}********")
                err = None
                for _ in range(max_retry):
                    try:
                        stream += client.get_waveforms(network.code, station.code, "*",
                                                       config["channels"], starttime, endtime)
                        break
                    except Exception as e:
                        print("Error {}.{}: {}".format(network.code, station.code, e))
                        err = e
                        time.sleep(1)
                else:
                    # Loop finished without ``break``: every attempt failed.
                    print(f"{fname}: MAX {max_retry} retries reached : {network.code}.{station.code} with error: {err}")

        if len(stream) == 0:
            # Stream.write raises on an empty stream; skip this window and do not
            # list it, so downstream picking only sees files that exist.
            print(f"{fname}: no waveforms downloaded, skipped")
            continue

        stream.write(os.path.join(waveform_dir, fname))
        print(f"{fname} download succeeds")
        fname_list.append(fname)

    with open(fname_csv, "w") as fp:
        fp.write("\n".join(fname_list))

    return waveform_dir

In [12]:
# waveform_path = download_waveform(0, "index.pkl", "config.pkl", "datetimes.pkl", "stations.pkl", "fname.csv")
#                                   s3_url="f84738f4ce13.ngrok.io", secure=True)

In [13]:
# Containerised download_waveform step.
download_waveform_op = comp.func_to_container_op(
    download_waveform,
    base_image='python:3.8',
    packages_to_install=["obspy"],
)

In [14]:
def phasenet_op(data_path: str, 
                data_list: str, 
                stations: str):
    """Build the PhaseNet picking step as a raw ``dsl.ContainerOp``.

    Args:
        data_path: Directory holding the MiniSEED files (``--data_dir``).
        data_list: File list from ``download_waveform``; materialised inside the
            container through ``dsl.InputArgumentPath``.
        stations: Station table; also materialised through ``dsl.InputArgumentPath``.

    Returns:
        A ContainerOp exposing ``/opt/results/picks.json`` as output ``"picks"``.
    """
    cli_args = [
        'predict.py',
        '--model', "model/190703-214543",
        '--data_dir', data_path,
        '--data_list', dsl.InputArgumentPath(data_list),
        '--stations', dsl.InputArgumentPath(stations),
        '--input_mseed',
        '--amplitude',
    ]
    # NOTE(review): --result_dir is not passed, so the picks path below assumes
    # predict.py's default result directory inside the image -- confirm.
    picks_path = "/opt/results/picks.json"
    return dsl.ContainerOp(
        name='PhaseNet picking',
        image="zhuwq0/phasenet:latest",
        command=['python'],
        arguments=cli_args,
        file_outputs={"picks": picks_path},
    )

In [15]:
# !python PhaseNet/phasenet/predict.py --model=PhaseNet/model/190703-214543 --data_list=fname.csv --data_dir=waveforms --stations=stations.csv --input_mseed --amplitude

In [16]:
def gmma(i: int,
         index_pkl: InputPath("pickle"),
         config_pkl: InputPath("pickle"),
         pick_json: InputPath("json"),
         station_csv: InputPath(str),
         catalog_csv: OutputPath(str),
         bucket_name:str = "catalogs",
         s3_url:str = "localhost:9000", 
         secure:bool = True) -> str:
    """Associate PhaseNet picks into earthquake events for one partition.

    Picks are clustered in time with DBSCAN. A ``gmma`` Bayesian Gaussian mixture
    is then fitted to each cluster to estimate event location, origin time and,
    when ``config["use_amplitude"]``, magnitude. Events supported by fewer than
    ``config["min_picks_per_eq"]`` picks are dropped.

    The catalog is written to ``catalog_csv`` and uploaded to MinIO as
    ``catalog_{i:04d}.csv``.

    Args:
        i: Partition index (selects ``index[i]`` from ``index_pkl``).
        index_pkl: Pickled per-partition window indices from ``set_config``.
        config_pkl: Pickled config dict from ``set_config``.
        pick_json: JSON list of picks written by PhaseNet.
        station_csv: Tab-separated station table from ``download_stations``.
        catalog_csv: Output path for this partition's catalog.
        bucket_name: MinIO bucket for the uploaded catalog.
        s3_url: MinIO endpoint.
        secure: Whether to talk to MinIO over TLS.

    Returns:
        Object name of the uploaded catalog in ``bucket_name``.
    """
    import os
    import json
    import pickle
    from datetime import datetime

    import numpy as np
    import pandas as pd
    from sklearn.cluster import DBSCAN
    from gmma import mixture
    from minio import Minio

    # Credentials default to the previously hard-coded values; set
    # MINIO_ACCESS_KEY / MINIO_SECRET_KEY in the container to override them.
    minioClient = Minio(s3_url,
                        access_key=os.environ.get("MINIO_ACCESS_KEY", "minio"),
                        secret_key=os.environ.get("MINIO_SECRET_KEY", "minio123"),
                        secure=secure)
    if not minioClient.bucket_exists(bucket_name):
        minioClient.make_bucket(bucket_name)

    with open(index_pkl, "rb") as fp:
        index = pickle.load(fp)
    # Also validates ``i``; output names are derived from ``i`` (not index[i][0]),
    # so an empty partition no longer raises IndexError.
    print(f"Partition {i}: {len(index[i])} time window(s)")

    with open(config_pkl, "rb") as fp:
        config = pickle.load(fp)

    catalog_dir = os.path.join("/tmp/", bucket_name)
    os.makedirs(catalog_dir, exist_ok=True)

    time_format = "%Y-%m-%dT%H:%M:%S.%f"
    dims = config["dims"]
    ndim = len(dims)

    def to_seconds(t):
        """Parse a pick timestamp string into POSIX seconds (naive datetime, local time)."""
        return datetime.strptime(t, time_format).timestamp()

    def from_seconds(ts):
        """Format POSIX seconds back into timestamp strings truncated to milliseconds."""
        return [datetime.fromtimestamp(x).strftime(time_format)[:-3] for x in ts]

    def convert_picks(picks, stations, config):
        """Turn pick dicts into arrays: data (n, 2) = [time_s, log10 amp], locs (n, ndim)."""
        data, locs, phase_type, phase_weight = [], [], [], []
        for pick in picks:
            # NOTE(review): assumes amp is in m/s; *1e2 converts to cm/s.
            data.append([to_seconds(pick["timestamp"]), np.log10(pick["amp"]*1e2)])
            locs.append(stations.loc[pick["id"]][config["dims"]].values.astype("float"))
            phase_type.append(pick["type"].lower())
            phase_weight.append(pick["prob"])
        # reshape keeps 2-D shapes when there are no picks.
        data = np.array(data).reshape(-1, 2)
        locs = np.array(locs).reshape(-1, ndim)
        phase_weight = np.array(phase_weight)[:, np.newaxis]
        return data, locs, phase_type, phase_weight

    def association(data, locs, phase_type, phase_weight, stations, config):
        """Cluster picks in time, fit a GMM per cluster and return a list of event dicts."""
        events = []
        if len(data) == 0:
            return events

        db = DBSCAN(eps=config["dbscan_eps"], min_samples=config["dbscan_min_samples"]).fit(data[:, 0:1])
        labels = db.labels_
        phase_type = np.array(phase_type)
        num_sta = len(stations)
        for k in set(labels):
            if k == -1:  # DBSCAN noise
                continue

            class_mask = (labels == k)
            data_ = data[class_mask]
            locs_ = locs[class_mask]
            phase_type_ = phase_type[class_mask]
            phase_weight_ = phase_weight[class_mask]

            num_event_ = min(max(int(len(data_)/num_sta*config["oversample_factor"]), 1), len(data_))
            t_range = max(data_[:, 0].max() - data_[:, 0].min(), 1)
            # Initial centers: station centroid at z=0, origin times spread over the cluster.
            centers_init = np.vstack([np.ones(num_event_)*np.mean(stations["x(km)"]),
                                      np.ones(num_event_)*np.mean(stations["y(km)"]),
                                      np.zeros(num_event_),
                                      np.linspace(data_[:, 0].min()-0.1*t_range,
                                                  data_[:, 0].max()+0.1*t_range,
                                                  num_event_)]).T  # (n_event, ndim + 1)

            if config["use_amplitude"]:
                covariance_prior = np.array([[1, 0], [0, 1]]) * 3
            else:
                covariance_prior = np.array([[1]])
                # Fit on arrival times only; must match the 1x1 prior above.
                data_ = data_[:, 0:1]

            gmm = mixture.BayesianGaussianMixture(n_components=num_event_,
                                                  weight_concentration_prior=1000/num_event_,
                                                  mean_precision_prior=0.3/t_range,
                                                  covariance_prior=covariance_prior,
                                                  init_params="centers",
                                                  centers_init=centers_init,
                                                  station_locs=locs_,
                                                  phase_type=phase_type_,
                                                  phase_weight=phase_weight_,
                                                  loss_type="l1",
                                                  bounds=config["bfgs_bounds"],
                                                  max_covar=10.0,
                                                  reg_covar=0.1,
                                                  ).fit(data_)

            pred = gmm.predict(data_)
            prob_eq = gmm.predict_proba(data_).mean(axis=0)
            # Keep components supported by enough picks.
            keep = np.array([np.sum(pred == j) >= config["min_picks_per_eq"] for j in range(len(prob_eq))],
                            dtype=bool)

            time = from_seconds(gmm.centers_[keep, ndim])
            loc = gmm.centers_[keep, :ndim]
            if config["use_amplitude"]:
                mag = gmm.centers_[keep, ndim + 1]
            else:
                mag = np.full(len(time), np.nan)  # no magnitude without amplitudes
            std_eq = gmm.covariances_[keep, ...]

            for j in range(len(time)):
                event = {"time": time[j],
                         "magnitude": mag[j],
                         "std": std_eq[j].tolist()}
                for d, dim in enumerate(dims):
                    event[dim] = loc[j][d]
                events.append(event)

        return events

    with open(pick_json, "r") as fp:
        picks = json.load(fp)
    stations = pd.read_csv(station_csv, delimiter="\t", index_col="station")
    data, locs, phase_type, phase_weight = convert_picks(picks, stations, config)
    events = association(data, locs, phase_type, phase_weight, stations, config)
    catalog = pd.DataFrame(events, columns=["time"] + dims + ["magnitude", "std"])

    csv_kwargs = dict(sep="\t", index=False,
                      float_format="%.3f",
                      date_format='%Y-%m-%dT%H:%M:%S.%f')
    catalog.to_csv(catalog_csv, **csv_kwargs)

    # One name for the local copy, the uploaded object and the return value.
    catalog_name = f"catalog_{i:04d}.csv"
    local_path = os.path.join(catalog_dir, catalog_name)
    catalog.to_csv(local_path, **csv_kwargs)
    minioClient.fput_object(bucket_name, catalog_name, local_path)

    return catalog_name
    

In [17]:
# catalog_path = gmma(1, "index.pkl", "config.pkl", "./results/picks.json", "stations.csv", "catalog.csv", #)
#                     bucket_name="catalogs", s3_url="localhost:9000", secure=False)

In [18]:
# Containerised GMMA association step.
gmma_op = comp.func_to_container_op(
    gmma,
    base_image='python:3.8',
    packages_to_install=["pandas", "numpy", "scikit-learn", "minio", "gmma"],
)

In [19]:
def combine_catalog(catalog_csv: OutputPath(str),
                    bucket_name:str = "catalogs",
                    s3_url:str = "localhost:9000", 
                    secure:bool = True):
    """Merge the per-partition ``catalog_*.csv`` objects in MinIO into one catalog.

    Downloads every object whose name starts with ``catalog_`` into ``/tmp``,
    concatenates them (all columns read as strings) and sorts by ``time``. The
    result is uploaded as ``combined_catalog.csv`` and copied to ``catalog_csv``.
    If no partition catalogs exist, an empty ``catalog_csv`` is written.

    Args:
        catalog_csv: Output path of the combined catalog.
        bucket_name: MinIO bucket holding the partition catalogs.
        s3_url: MinIO endpoint.
        secure: Whether to talk to MinIO over TLS.
    """
    import os
    import shutil
    from glob import glob

    import pandas as pd
    from minio import Minio

    # Credentials default to the previously hard-coded values; set
    # MINIO_ACCESS_KEY / MINIO_SECRET_KEY in the container to override them.
    minioClient = Minio(s3_url,
                        access_key=os.environ.get("MINIO_ACCESS_KEY", "minio"),
                        secret_key=os.environ.get("MINIO_SECRET_KEY", "minio123"),
                        secure=secure)

    def tmp_path(name):
        return os.path.join("/tmp/", name)

    # Only partition catalogs are needed (skips e.g. an older combined_catalog.csv).
    for obj in minioClient.list_objects(bucket_name, prefix="catalog_", recursive=True):
        minioClient.fget_object(bucket_name, obj.object_name, tmp_path(obj.object_name))

    files_catalog = sorted(glob(tmp_path("catalog_*.csv")))
    if not files_catalog:
        with open(catalog_csv, "w"):
            pass
        print("No catalog_*.csv found!")
        return

    combined_catalog = pd.concat([pd.read_csv(f, sep="\t", dtype=str) for f in files_catalog]).sort_values(by="time")
    combined_path = tmp_path("combined_catalog.csv")
    combined_catalog.to_csv(combined_path, sep="\t", index=False)
    minioClient.fput_object(bucket_name, "combined_catalog.csv", combined_path)
    shutil.copyfile(combined_path, catalog_csv)


In [20]:
# combine_catalog("catalog.csv", bucket_name="catalogs", s3_url="localhost:9000", secure=True)

In [21]:
# Containerised catalog-merging step.
combine_op = comp.func_to_container_op(
    combine_catalog,
    base_image='python:3.8',
    packages_to_install=["pandas", "minio"],
)


In [25]:
# Define the pipeline
@dsl.pipeline(name='QuakeFlow', description='')
def quakeflow_pipeline(data_path:str = "/tmp/", 
                       bucket_catalog:str = "catalogs",
                       s3_url:str="localhost:9000", 
                       secure:bool=False):
    """QuakeFlow pipeline.

    The steps are:
      1. Config.
      2. Reference catalog and station download.
      3. Per partition, in parallel: waveform download, PhaseNet picking and
         GMMA association.
      4. Combined catalog.

    Args:
        data_path: Mount point of the per-partition volume shared by the
            download, PhaseNet and GMMA steps.
        bucket_catalog: MinIO bucket for per-partition and combined catalogs.
        s3_url: MinIO endpoint.
        secure: Whether to talk to MinIO over TLS.
    """
    config = config_op()

    # Reference catalog; not consumed by later steps.
    events = download_events_op(config.outputs["config_pkl"])
    stations = download_stations_op(config.outputs["config_pkl"])

    # One iteration per partition returned by set_config.
    with kfp.dsl.ParallelFor(config.outputs["output"]) as i:

        # Kubernetes object names must be RFC 1123 (lowercase, '-', no '_').
        # NOTE(review): created once per iteration -- confirm PVC names do not
        # collide across iterations.
        vop_ = dsl.VolumeOp(name="Create volume",
                            resource_name="data-volume",
                            size="10Gi",
                            modes=dsl.VOLUME_MODE_RWO)

        download_op_ = download_waveform_op(i,
                                            config.outputs["index_pkl"],
                                            config.outputs["config_pkl"],
                                            config.outputs["datetime_pkl"],
                                            stations.outputs["station_pkl"],
                                            data_path=data_path
                                            ).add_pvolumes({data_path: vop_.volume})

        # "Output" is download_waveform's return value: the waveform directory.
        phasenet_op_ = phasenet_op(download_op_.outputs["Output"],
                                   download_op_.outputs["fname_csv"],
                                   stations.outputs["station_csv"]
                                   ).add_pvolumes({data_path: download_op_.pvolume})

        gmma_op_ = gmma_op(i,
                           config.outputs["index_pkl"],
                           config.outputs["config_pkl"],
                           phasenet_op_.outputs["picks"],
                           stations.outputs["station_csv"],
                           bucket_name=bucket_catalog,
                           s3_url=s3_url,
                           secure=secure
                           ).add_pvolumes({data_path: phasenet_op_.pvolume})

    combine_op_ = combine_op(bucket_name=bucket_catalog, s3_url=s3_url, secure=secure).after(gmma_op_)
    # Never reuse a cached result: the bucket contents change between runs.
    combine_op_.execution_options.caching_strategy.max_cache_staleness = "P0D"

In [26]:
import os

# Kubeflow Pipelines endpoint. The default is the ngrok tunnel used originally.
# Override it with the KFP_HOST environment variable, e.g. '127.0.0.1:8080' for a
# port-forwarded cluster or a *.pipelines.googleusercontent.com host on GCP.
KFP_HOST = os.environ.get("KFP_HOST", 'https://97745bc77884.ngrok.io/')
client = kfp.Client(host=KFP_HOST)

In [27]:
experiment_name = 'QuakeFlow'
pipeline_func = quakeflow_pipeline
run_name = f"{pipeline_func.__name__}_run"

# Run-time pipeline parameters; s3_url is the MinIO endpoint reachable from the pods.
arguments = {
    "data_path": "/tmp/",
    "bucket_catalog": "catalogs",
    "s3_url": "10.97.200.84:9000",
    "secure": False,
}

# Write the compiled pipeline package (QuakeFlow.zip) for manual upload as well.
kfp.compiler.Compiler().compile(pipeline_func, f"{experiment_name}.zip")

# Submit the pipeline function directly as a new run.
results = client.create_run_from_pipeline_func(
    pipeline_func,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)