# Flyte Demo

- Register all tasks
- Create a data preparation workflow by composing these tasks
- Create an orchestration workflow by composing other workflows
- Visualize the output

In [19]:
# Live-demo setup: target our own staging environment.
import os

# Point the Flyte platform URL at staging through the process environment.
os.environ.update(FLYTE_PLATFORM_URL="flyte-staging.lyft.net")

# Version label handed to the registration calls in the cells below.
version = "fake10"

In [14]:
# Register the tasks, workflows and launch plans that already exist in the
# `workflows` package with the system.
import os

from flytekit.clis.sdk_in_container.register import register_all, register_tasks_only
from flytekit.configuration import set_flyte_config_file

# Statement order is deliberate: the explicit configuration-path assignment
# runs after set_flyte_config_file(), so it has the final say on that variable.
os.environ.update(
    FLYTE_INTERNAL_IMAGE="docker.io/lyft/flytekubecondemo2019:bb1f44c83a6e9bc6e8bc9eb98bf9c4a8e76880c7"
)
set_flyte_config_file("staging.config")
os.environ.update(FLYTE_INTERNAL_CONFIGURATION_PATH="/app/staging.config")

packages_to_register = ["workflows"]
register_all(
    "flytekubecondemo2019",
    "development",
    packages_to_register,
    test=False,
    version=version,
)

Running task, workflow, and launch plan registration for flytekubecondemo2019, development, ['workflows'] with version fake9
Registering Task:                workflows.classifier_evaluate_workflow.analyze_prediction_results
Registering Task:                workflows.classifier_evaluate_workflow.fetch_model
Registering Task:                workflows.classifier_evaluate_workflow.evaluate_on_datasets
Registering Task:                workflows.classifier_train_workflow.rearrange_data
Registering Task:                workflows.classifier_evaluate_workflow.generate_predictions
Registering Workflow:            workflows.classifier_evaluate_workflow.ClassifierEvaluateWorkflow
Registering Launch Plan:         workflows.classifier_evaluate_workflow.ClassifierEvaluateWorkflow
Registering Launch Plan:         workflows.classifier_evaluate_workflow.evaluate_lp
Registering Task:                workflows.classifier_evaluate_workflow.predict
Registering Task:                workflows.classifier_evalua

# Step 1: Data Preparation Workflow

- Map-style job to download all videos from remote locations
- Map-style job to extract frames from all videos
- Map-style job to run luminance algorithm and select frames from videos

In [15]:

from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Output, Input
from workflows.data_preparation_workflow import DEFAULT_RANDOM_SEED, DEFAULT_LUMINANCE_N_CLUSTERS, DEFAULT_LUMINANCE_SAMPLE_SIZE, download_videos, extract_from_video_collections, luminance_select_collections

@workflow_class
class DataPreparationWorkflow:
    """Download videos, extract their frames, then select frames by luminance.

    The three steps form a chain: frame extraction consumes the downloaded
    stream blobs, and luminance selection consumes the extracted raw frames.
    """

    # ---- Inputs: which streams to fetch and from where ----
    streams_external_storage_prefix = Input(Types.String, required=True)
    streams_names = Input([Types.String], required=True)
    stream_extension = Input(Types.String, default="avi")

    # video_external_paths = Input([Types.String], required=True)

    # ---- Inputs: parameters forwarded to the luminance selection step ----
    sampling_random_seed = Input(Types.Integer, default=DEFAULT_RANDOM_SEED)
    sampling_n_clusters = Input(Types.Integer, default=DEFAULT_LUMINANCE_N_CLUSTERS)
    sampling_sample_size = Input(Types.Integer, default=DEFAULT_LUMINANCE_SAMPLE_SIZE)

    # ---- Step 1: download the requested streams ----
    download_video_task = download_videos(
        streams_external_storage_prefix=streams_external_storage_prefix,
        streams_names=streams_names,
        stream_extension=stream_extension)

    # ---- Step 2: extract frames from the downloaded streams ----
    extract_from_video_collection_task = extract_from_video_collections(
        video_blobs=download_video_task.outputs.downloaded_streams_blobs)

    # ---- Step 3: run luminance-based selection on the raw frames ----
    luminance_select_collections_task = luminance_select_collections(
        raw_frames_mpblobs=extract_from_video_collection_task.outputs.raw_frames_mpblobs,
        n_clusters=sampling_n_clusters,
        sample_size=sampling_sample_size,
        random_seed=sampling_random_seed)

    # ---- Outputs ----
    selected_frames_mpblobs = Output(
        luminance_select_collections_task.outputs.selected_image_mpblobs,
        sdk_type=[Types.MultiPartBlob],
    )
    selected_frames_mpblobs_metadata = Output(
        luminance_select_collections_task.outputs.selected_file_names,
        sdk_type=[[Types.String]],
    )
    # The stream names are passed through unchanged as an output.
    streams_names_out = Output(streams_names, sdk_type=[Types.String])


# Create a launch plan for the workflow defined above.
data_prep = DataPreparationWorkflow.create_launch_plan()

# The workflow and its launch plan are registered with identical arguments.
data_prep_registration_args = (
    "flytekubecondemo2019",
    "development",
    "workflows.driver_workflow.DataPreparationWorkflow",
    version,
)
DataPreparationWorkflow.register(*data_prep_registration_args)
data_prep.register(*data_prep_registration_args)

'lp:flytekubecondemo2019:development:workflows.driver_workflow.DataPreparationWorkflow:fake9'

In [16]:
# Kick off an execution
data_prep_inputs = {
    'streams_external_storage_prefix': 's3://lyft-modelbuilder/metadata/_FlyteKubeconDemo2019Dataset/streams',
    # Two camera streams ("-1" and "-2") per recording.
    'streams_names': [
        "1537396038_cam-rgb-1", "1537396038_cam-rgb-2",
        "1537396662_cam-rgb-1", "1537396662_cam-rgb-2",
        "1537396790_cam-rgb-1", "1537396790_cam-rgb-2",
        "1537396942_cam-rgb-1", "1537396942_cam-rgb-2",
        "1538521877_cam-rgb-1", "1538521877_cam-rgb-2",
        "1538521964_cam-rgb-1", "1538521964_cam-rgb-2",
        "1538522195_cam-rgb-1", "1538522195_cam-rgb-2",
        "1538522386_cam-rgb-1", "1538522386_cam-rgb-2",
        "1538522615_cam-rgb-1", "1538522615_cam-rgb-2",
        "1538522881_cam-rgb-1", "1538522881_cam-rgb-2",
        "1538523052_cam-rgb-1", "1538523052_cam-rgb-2",
        "1538523280_cam-rgb-1", "1538523280_cam-rgb-2",
        "1538523741_cam-rgb-1", "1538523741_cam-rgb-2",
        "1538523916_cam-rgb-1", "1538523916_cam-rgb-2",
        "1538524089_cam-rgb-1", "1538524089_cam-rgb-2",
        "1539390982_cam-rgb-1", "1539390982_cam-rgb-2",
        "1539391169_cam-rgb-1", "1539391169_cam-rgb-2",
        "1539391321_cam-rgb-1", "1539391321_cam-rgb-2",
        "1539391462_cam-rgb-1", "1539391462_cam-rgb-2",
        "1539391643_cam-rgb-1", "1539391643_cam-rgb-2",
        "1539391807_cam-rgb-1", "1539391807_cam-rgb-2",
        "1539391941_cam-rgb-1", "1539391941_cam-rgb-2",
        "1539471715_cam-rgb-1", "1539471715_cam-rgb-2",
        "1539471904_cam-rgb-1", "1539471904_cam-rgb-2",
        "1539475636_cam-rgb-1", "1539475636_cam-rgb-2",
    ],
}
data_prep_exec = data_prep.execute(
    "flytekubecondemo2019",
    "development",
    inputs=data_prep_inputs,
)

print("Started execution: {}".format(data_prep_exec.id))

Started execution: ex:flytekubecondemo2019:development:fcb7cc1984d89420ab75


# Step 2: Orchestrator Workflow

- Use data preparation, training and evaluation workflows to compose an orchestrator workflow
- Kick off an execution and wait for completion
- Visualize the output

In [20]:
# The Driver Workflow
import ujson
from flytekit.common.tasks.task import SdkTask
from flytekit.sdk.tasks import python_task, inputs, outputs
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Output, Input
from workflows.classifier_evaluate_workflow import evaluate_lp
from workflows.classifier_train_workflow import train_lp, DEFAULT_VALIDATION_DATA_RATIO, \
    DEFAULT_TRAINING_VALIDATION_CONFIG_FILE
from workflows.data_preparation_workflow import data_prep
from workflows.driver_workflow import pick_second

# Consume a task from a different project: it is looked up by its full
# identifier (project, domain, name and a pinned version).
confusion_matrix_task_id = {
    "project": "kubecondemo2019-metrics",
    "domain": "development",
    "name": "demo_metrics.tasks.confusion_matrix.confusion_matrix",
    "version": "66b463748f25ef71c8cd4eb3001f00eafb83efc6",
}
compute_confusion_matrix = SdkTask.fetch(**confusion_matrix_task_id)


def _load_default_training_validation_config():
    """Parse the default training/validation config file and return the result.

    The file is read inside a ``with`` block so that its handle is closed as
    soon as the contents have been read, instead of being left open.

    Returns:
        Whatever ``ujson.loads`` produces for the file's contents.
    """
    with open(DEFAULT_TRAINING_VALIDATION_CONFIG_FILE) as config_file:
        return ujson.loads(config_file.read())


@workflow_class
class Orchestrator:
    """Compose data preparation, training, evaluation and a confusion-matrix task.

    Data flows through the steps as follows:
      * ``prepare`` receives the stream-related inputs.
      * ``train`` and ``evaluate`` both consume the outputs of ``prepare``.
      * ``pick_second`` takes the trained models and exposes a ``second`` output,
        which ``evaluate`` uses as its model.
      * ``confusion_matrix_task`` consumes the ground truths and predictions
        produced by ``evaluate``.
    """

    # Define inputs
    streams_external_storage_prefix = Input(Types.String, required=True)
    streams_names = Input([Types.String], required=True)
    stream_extension = Input(Types.String, default="avi")

    streams_metadata_path = Input(Types.String, required=True)
    # The default is read from disk once, when the class body is evaluated.
    training_validation_config_json = Input(Types.Generic,
                                            default=_load_default_training_validation_config())
    validation_data_ratio = Input(Types.Float, default=DEFAULT_VALIDATION_DATA_RATIO)

    # Define workflow steps
    prepare = data_prep(
        streams_external_storage_prefix=streams_external_storage_prefix,
        streams_names=streams_names,
        stream_extension=stream_extension)

    train = train_lp(
        available_streams_names=prepare.outputs.streams_names_out,
        available_streams_mpblobs=prepare.outputs.selected_frames_mpblobs,
        streams_metadata_path=streams_metadata_path,
        training_validation_config_json=training_validation_config_json,
        validation_data_ratio=validation_data_ratio
    )

    # The attribute reuses the name of the imported `pick_second` task. The
    # right-hand side still resolves to the module-level import, because the
    # class namespace holds no `pick_second` until this assignment completes.
    pick_second = pick_second(models=train.outputs.trained_models)

    # Evaluation is given the same config JSON input that training receives.
    evaluate = evaluate_lp(
        available_streams_names=prepare.outputs.streams_names_out,
        available_streams_mpblobs=prepare.outputs.selected_frames_mpblobs,
        streams_metadata_path=streams_metadata_path,
        evaluation_config_json=training_validation_config_json,
        model=pick_second.outputs.second,
        validation_data_ratio=validation_data_ratio
    )

    confusion_matrix_task = compute_confusion_matrix(
        y_true=evaluate.outputs.ground_truths,
        y_pred=evaluate.outputs.predictions,
        title="Confusion Matrix",
        normalize=True,
        classes=["busy", "clear"],
    )

    # Define workflow outputs
    # The two outputs below are currently disabled:
    # ground_truths = Output(evaluate.outputs.ground_truths, sdk_type=[Types.Integer])
    # predictions = Output(evaluate.outputs.predictions, sdk_type=[Types.Integer])
    confusion_matrix_image = Output(confusion_matrix_task.outputs.visual, sdk_type=Types.Blob)


# Create a launch plan for the orchestrator workflow.
orchestrator_lp = Orchestrator.create_launch_plan()

# The workflow and its launch plan are registered with identical arguments.
orchestrator_registration_args = (
    "flytekubecondemo2019",
    "development",
    "workflows.driver_workflow.DriverWorkflow",
    version,
)
Orchestrator.register(*orchestrator_registration_args)
orchestrator_lp.register(*orchestrator_registration_args)

'lp:flytekubecondemo2019:development:workflows.driver_workflow.RegisterNotebook:fake10'

In [21]:
# Kick off an execution
orchestrator_inputs = {
    'streams_external_storage_prefix': 's3://lyft-modelbuilder/metadata/_FlyteKubeconDemo2019Dataset/streams',
    # Two camera streams ("-1" and "-2") per recording.
    'streams_names': [
        "1537396038_cam-rgb-1", "1537396038_cam-rgb-2",
        "1537396662_cam-rgb-1", "1537396662_cam-rgb-2",
        "1537396790_cam-rgb-1", "1537396790_cam-rgb-2",
        "1537396942_cam-rgb-1", "1537396942_cam-rgb-2",
        "1538521877_cam-rgb-1", "1538521877_cam-rgb-2",
        "1538521964_cam-rgb-1", "1538521964_cam-rgb-2",
        "1538522195_cam-rgb-1", "1538522195_cam-rgb-2",
        "1538522386_cam-rgb-1", "1538522386_cam-rgb-2",
        "1538522615_cam-rgb-1", "1538522615_cam-rgb-2",
        "1538522881_cam-rgb-1", "1538522881_cam-rgb-2",
        "1538523052_cam-rgb-1", "1538523052_cam-rgb-2",
        "1538523280_cam-rgb-1", "1538523280_cam-rgb-2",
        "1538523741_cam-rgb-1", "1538523741_cam-rgb-2",
        "1538523916_cam-rgb-1", "1538523916_cam-rgb-2",
        "1538524089_cam-rgb-1", "1538524089_cam-rgb-2",
        "1539390982_cam-rgb-1", "1539390982_cam-rgb-2",
        "1539391169_cam-rgb-1", "1539391169_cam-rgb-2",
        "1539391321_cam-rgb-1", "1539391321_cam-rgb-2",
        "1539391462_cam-rgb-1", "1539391462_cam-rgb-2",
        "1539391643_cam-rgb-1", "1539391643_cam-rgb-2",
        "1539391807_cam-rgb-1", "1539391807_cam-rgb-2",
        "1539391941_cam-rgb-1", "1539391941_cam-rgb-2",
        "1539471715_cam-rgb-1", "1539471715_cam-rgb-2",
        "1539471904_cam-rgb-1", "1539471904_cam-rgb-2",
        "1539475636_cam-rgb-1", "1539475636_cam-rgb-2",
    ],
    'streams_metadata_path': 's3://lyft-modelbuilder/metadata/_FlyteKubeconDemo2019Dataset/metadata/streams_metadata.json',
}
execution = orchestrator_lp.execute(
    "flytekubecondemo2019",
    "development",
    inputs=orchestrator_inputs,
)

print("Started execution: {}".format(execution.id))

Started execution: ex:flytekubecondemo2019:development:f60884584e83c48afb12


In [None]:
# Wait for the execution started above to complete, then print its outputs.
execution.wait_for_completion()
execution_outputs = execution.outputs
print(execution_outputs)

In [None]:
# Download the confusion-matrix image from the execution outputs and show it.
from IPython.display import Image

local_file = "/tmp/cm-4.png"
confusion_matrix_blob = execution.outputs["confusion_matrix_image"]
confusion_matrix_blob.download(local_file, overwrite=True)

# Last expression of the cell, so the image is rendered as the cell output.
Image(filename=local_file)