# About
* **Author**: Adil Rashitov (adil@wastelabs.co)
* **Created at**: 17.06.2022
* **Issue**: https://github.com/AtmosOne/research_kedro_microservice/issues/8


In [None]:
# Imports / Configs / Global vars

# Import of native python tools
import os
import json
from functools import reduce

# Import of base ML stack libs
import numpy as np
import sklearn as sc

# Multiprocessing for Mac / Linux
import platform
platform.system()
if platform.system() == 'Darwin':
    from multiprocess import Pool
else:
    from multiprocessing import Pool

# Visualization libraries
import plotly.express as px

# Logging configuration
import logging
logging.basicConfig(format='[ %(asctime)s ][ %(levelname)s ]: %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
logger = logging.getLogger()
logger.setLevel(logging.INFO)


# IPython configs
# `display` and `HTML` come from the public `IPython.display` module; importing
# them from `IPython.core.display` is deprecated.
from IPython.display import display, HTML
from IPython.core.interactiveshell import InteractiveShell

# Inject CSS that sets the notebook `.container` element to 100% width.
display(HTML("<style>.container { width:100% !important; }</style>"))
# Echo every top-level expression of a cell, not only the last one.
InteractiveShell.ast_node_interactivity = 'all'

# Pandas configs
import pandas as pd
import geopandas as gpd
pd.options.display.max_rows = 350
pd.options.display.max_columns = 250

# Jupyter configs
%load_ext autoreload
%autoreload 2
%config Completer.use_jedi = False

# GLOBAL VARS
from pathlib import Path
import sys

# Parent of the current working directory. `Path.parent` works with any path
# separator, whereas splitting the cwd on '/' returns the cwd itself on Windows.
PROJECT_DIR = str(Path.cwd().parent)
# SRC_DIR is kept as an alias of PROJECT_DIR: both names point at one directory.
SRC_DIR = PROJECT_DIR

# Make the project directory importable from this notebook. A single check is
# enough because SRC_DIR and PROJECT_DIR are the same path.
if PROJECT_DIR not in sys.path:
    sys.path.append(PROJECT_DIR)

# Context management
* Kedro
* Data

In [None]:
# AWS
# 1. Initialization kedro session
from pathlib import Path
from kedro.framework.startup import bootstrap_project
from kedro.framework.session import KedroSession
from kedro.extras.extensions.ipython import _find_kedro_project
from kedro.framework.context.context import KedroContext


def get_kedro_context(env: str) -> KedroContext:
    """Create a Kedro session for the surrounding project and load its context.

    The project path is resolved by passing the current working directory to
    Kedro's ``_find_kedro_project`` helper. The project is then bootstrapped
    and a new ``KedroSession`` is created for the requested environment.

    Args:
        env (str): Environment name forwarded to ``KedroSession.create``.

    Returns:
        KedroContext: Context loaded from the newly created session.

    Raises:
        RuntimeError: If ``_find_kedro_project`` returns ``None`` for the
            current working directory.
    """
    # 1. Locate the project and bootstrap it.
    startup_path = Path.cwd()
    project_path = _find_kedro_project(startup_path)
    if project_path is None:
        # Fail early with an actionable message instead of handing a missing
        # path to bootstrap_project.
        raise RuntimeError(
            f"No Kedro project found starting from '{startup_path}'."
        )
    metadata = bootstrap_project(project_path)

    # 2. Initialize the session and load the context.
    session = KedroSession.create(
        metadata.package_name,
        project_path,
        extra_params=None,
        env=env,
    )
    return session.load_context()


# Kedro context created with the "test" environment.
test = get_kedro_context("test")

# Main
Main code goes below

In [None]:
# Keyword arguments later unpacked into `boto3.client`. The key pair is read
# from the process environment (a missing variable raises KeyError); the
# region is fixed.
credentials = dict(
    aws_access_key_id=os.environ['aws_access_key_id'],
    aws_secret_access_key=os.environ['aws_secret_access_key'],
    region_name='us-east-2',
)


In [None]:
import boto3


# AWS Batch client built from the `credentials` mapping.
batch_client = boto3.client(service_name="batch", **credentials)

In [None]:
task_name = "TruckRoutePlans_2022-03-25_IWS_1_small"
JOB_DEFINITION = "test-research-kedro-microservice-job-definition"
JOB_DEFINIITON = JOB_DEFINITION  # original (misspelled) name, kept as an alias
JOB_QUEUE = "test-research-kedro-microservice-spot"
SHARE_ID = "A1*"

# Submit one AWS Batch job that runs the geocoding pipeline with `--env=test`.
# The task identifier reaches the container through the `task_id` environment
# variable and is taken from `task_name`, so the job name and the task being
# processed cannot drift apart.
# Never paste API keys or other secrets into this notebook; supply them to the
# job through the job definition or a secrets store instead.
batch_response = batch_client.submit_job(
    jobName=f"{task_name}__v1",
    jobDefinition=JOB_DEFINITION,
    schedulingPriorityOverride=123,
    jobQueue=JOB_QUEUE,
    shareIdentifier=SHARE_ID,
    containerOverrides={
        'command': [
            "kedro",
            "run",
            "--pipeline=geocoding.geodata_gov_hk.v1",
            "--env=test",
        ],
        'environment': [
            {
                'name': 'task_id',
                'value': task_name,
            },
        ],
    },
)

In [None]:
# Show the raw response returned by `submit_job`.
batch_response

In [None]:
# Describe the job submitted above rather than a job ID pasted from an earlier
# run, which goes stale as soon as a new job is submitted.
status = batch_client.describe_jobs(jobs=[batch_response["jobId"]])

In [None]:
# Show the raw response returned by `describe_jobs`.
status

In [None]:
from typing import Any, List
from pydantic import BaseModel, parse_obj_as


class JobStatus(BaseModel):
    """Single job entry held in the ``jobs`` list of ``JobStatusResponse``."""
    # String-typed fields of the job entry.
    status: str
    jobQueue: str
    jobId: str
    jobName: str
    jobArn: str
    jobDefinition: str
    # Left as `Any`: no structure is declared for these fields here.
    container: Any
    resourceRequirements: Any
    timeout: Any
        

class JobStatusResponse(BaseModel):
    """Top-level model holding a ``jobs`` list of ``JobStatus`` entries."""
    jobs: List[JobStatus]


# Validate the `describe_jobs` response held in `status` against
# `JobStatusResponse` and show the parsed result as a plain dict.
parse_obj_as(JobStatusResponse, status).dict()
