In [None]:
# Register a Jupyter kernel spec named "conda_python3" for the current user,
# then install Metaflow using whichever `python` is first on PATH.
# NOTE(review): the Metaflow install is unpinned — consider pinning a version
# so that re-running the notebook later resolves the same release.
!python -m ipykernel install --user --name=conda_python3
!python -m pip install metaflow

In [None]:
%%writefile metaflow_example.py
import os
# Set before the metaflow import below. Presumably Metaflow reads this variable
# at import time to decide which file suffixes are included in the code package
# it ships to remote steps — TODO confirm against the Metaflow configuration docs.
os.environ["METAFLOW_PACKAGE_SUFFIXES"] = ".py,.ipynb,.R,.RDS"

from metaflow import FlowSpec, Parameter, step, batch, conda_base, conda
import random
import boto3
import logging
from botocore.exceptions import ClientError

def upload_s3_file(filename, bucket):
    """Upload a local file to an S3 bucket, using the file name as the object key.

    Args:
        filename: Path of the local file to upload. The same string is used
            verbatim as the S3 object key.
        bucket: Name of the destination S3 bucket.

    Returns:
        True if the upload succeeded, False if S3 reported a failure (the
        error is printed before returning).
    """
    s3_client = boto3.client('s3')

    # The managed transfer behind upload_file() re-raises service-side
    # ClientErrors as boto3.exceptions.S3UploadFailedError, which is not a
    # ClientError subclass. Catch both so the failure branch is reachable and
    # the function honours its boolean contract instead of raising.
    try:
        s3_client.upload_file(filename, bucket, filename)
    except (ClientError, boto3.exceptions.S3UploadFailedError) as e:
        print(f"Something went wrong - file not uploaded successfully on s3 bucket {bucket}")
        print(e)
        return False

    print(f"File uploaded successfully on s3 bucket {bucket}")
    return True

@conda_base(libraries = {"papermill" : "2.3.3"})
class MetaflowExample(FlowSpec):
    """Four-step example flow: start -> local step -> @batch step -> end.

    Every step runs with the conda libraries declared in ``@conda_base``; the
    ``@batch`` step additionally declares ``jupyter``. The final step writes a
    small text file and uploads it to S3 as evidence that the run completed.

    Parameters (supplied on the command line):
        data_bucket_name: S3 bucket that receives the proof file.
        success_file_name: Local file name (and S3 key) of the proof file.
    """

    data_bucket_name = Parameter(
        'data_bucket_name',
        help = 'S3 data bucket name',
        required = True,
        type = str
    )

    success_file_name = Parameter(
        'success_file_name',
        help = 'Metaflow successful run proof file name',
        required = True,
        type = str
    )

    @step
    def start(self):
        """Initialise the values consumed by the downstream steps."""
        print("starting run with metaflow")

        # Default papermill parameters
        self.n = 10  # how many random integers each step generates
        self.r = 20  # inclusive upper bound of the generated integers
        self.p = 0   # only printed in the visible code; not used by later steps

        print(f"parameters evaluation\n n: {self.n}\n r: {self.r}\n p: {self.p}")

        self.next(self.execute_notebook_local)

    @step
    def execute_notebook_local(self):
        """Generate and print ``n`` random integers in ``[1, r]``.

        NOTE(review): despite the name, no notebook is executed in the visible
        code; this step only produces sample data.
        """
        print("executing notebook locally")

        data = [random.randint(1, self.r) for _ in range(self.n)]
        print(f"generated data: {data}")

        self.next(self.execute_notebook_remote)

    @batch
    @conda(libraries = {"jupyter": "1.0.0"})
    @step
    def execute_notebook_remote(self):
        """Same data generation as the local step, decorated with ``@batch``.

        NOTE(review): despite the name, no notebook is executed in the visible
        code; this step only produces sample data.
        """
        print("executing notebook remotely")

        data = [random.randint(1, self.r) for _ in range(self.n)]
        print(f"generated data: {data}")

        self.next(self.end)

    @step
    def end(self):
        """Write the proof file, upload it to S3 and finish the flow.

        Raises:
            RuntimeError: If ``upload_s3_file`` reports that the upload failed.
        """
        print("saving txt file as a proof of successfull run")
        # Context manager closes the handle even if write() raises.
        with open(self.success_file_name, "w") as proof_file:
            proof_file.write("Metaflow local and remote run successfully!")

        # upload_s3_file signals failure through its return value; do not
        # report a successful run when the proof never reached the bucket.
        uploaded = upload_s3_file(self.success_file_name, self.data_bucket_name)
        if not uploaded:
            raise RuntimeError(
                f"Could not upload {self.success_file_name} to s3 bucket {self.data_bucket_name}"
            )

        print("ending run with metaflow")
 
# Script entry point: the flow is only instantiated when this file is executed
# directly (e.g. `python metaflow_example.py ... run ...`), not when imported.
if __name__ == "__main__":
    MetaflowExample()

In [None]:
# Run the flow written by the previous cell, using the conda environment and
# with Metaflow's pylint check disabled.
# NOTE(review): `$data_bucket_name` and `$success_file_name` are not defined in
# any visible cell. They must already exist as notebook variables (or shell
# environment variables) before this cell runs, otherwise the two required
# flow parameters receive empty values — confirm where they are set.
!python metaflow_example.py --environment=conda --no-pylint run --data_bucket_name $data_bucket_name --success_file_name $success_file_name