# KaggleOps Template

## Overview

KaggleOpsで利用するための実行用ノートブックのひな形です。

## Import packages

In [None]:
# default packages
import logging
import os
import pathlib
import typing as t

In [None]:
# third party packages
import google.cloud.storage as gcs
import toml
import yaml

## Preset

In [None]:
# Debug switch: selects the DEBUG log level when True and INFO otherwise.
MODE_DEBUG = True

In [None]:
# Module-level logger. The root logger is configured at DEBUG level when
# MODE_DEBUG is set and at INFO level otherwise.
_logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.DEBUG if MODE_DEBUG else logging.INFO
)

In [None]:
# Git settings passed to clone_git: the GitHub repository in "owner/name"
# form, the branch to use, and the project directory (relative to the clone
# root) that becomes the working directory.
GIT_REPOSITORY = "iimuz/til"
GIT_BRANCH = "master"
PROJECT_DIR = "machine_learning/kaggle_ops"

In [None]:
# Path of the dotenv file whose variables are loaded by load_env and passed
# to the training command.
DOTENV_PATH = pathlib.Path("/content/.env")

In [None]:
# Path of the YAML experiment config; it is written by the %%writefile cell
# and handed to the training script and to upload_mlruns_to_gcs.
CONFIG_PATH = pathlib.Path("models/config.yml")

## Check gpu

In [None]:
def show_gpu_info() -> None:
    """GPU情報を確認する."""
    gpu_info = !nvidia-smi
    gpu_info = "\n".join(gpu_info)
    _logger.info(gpu_info)


show_gpu_info()

## Clone repository

In [None]:
def clone_git(repository: str,branch: str, project_dir: str) -> None:
    """git repositoryをクローンして、利用するディレクトリに移動する."""
    clone_path = pathlib.Path("/content").joinpath(repository.split("/")[-1])

    if not clone_path.exists():
        !git clone https://github.com/{repository}.git {clone_path}
        %cd {clone_path}
        !git checkout -b {branch} origin/{branch}
    else:
        %cd {clone_path}
        !git fetch origin --prune
        !git merge --ff origin/{branch}

    %cd {project_dir}
    _logger.info(f"current direcotry: {pathlib.Path().resolve()}")


clone_git(GIT_REPOSITORY, GIT_BRANCH, PROJECT_DIR)

## Install packages

In [None]:
def install_packages() -> None:
    """pyprojectから依存パッケージを導入する.
    
    Notes:
        poetryをインストールして、poetry installを利用すると、
        colabで必要なパッケージの依存関係を壊すようなのでpipでインストール.
    """
    config = toml.load("pyproject.toml")
    package_list = config["tool"]["poetry"]["dependencies"]
    for package in package_list.keys():
        if package == "python":
            continue

        !pip install -q {package}


install_packages()

## Set environments

In [None]:
import dotenv

In [None]:
def load_env(filepath: pathlib.Path) -> None:
    """Load the dotenv file at ``filepath`` via ``dotenv.load_dotenv``.

    Args:
        filepath: Path of the dotenv file to load.
    """
    dotenv.load_dotenv(filepath)


load_env(DOTENV_PATH)

## GCloud settings

In [None]:
# GCP project id and GCS bucket name, read from the environment; each falls
# back to an empty string when its variable is not set.
GCLOUD_PROJECT = os.environ.get("GCLOUD_PROJECT_ID", "")
GCS_BUCKET_NAME = os.environ.get("GCS_BUCKET_NAME", "")

In [None]:
# Authenticate the user through google.colab. When the import fails
# (presumably because the notebook is not running on Colab) the ImportError
# is logged at debug level and authentication is skipped.
try:
    import google.colab.auth as gca
    gca.authenticate_user()
except ImportError as e:
    _logger.debug(e)

In [None]:
# Point the gcloud CLI at the configured project.
!gcloud config set project {GCLOUD_PROJECT}
# NOTE(review): a value of False leaves usage reporting enabled - confirm
# that this is the intended setting.
!gcloud config set core/disable_usage_reporting False

In [None]:
# Export the project id as GOOGLE_CLOUD_PROJECT in this process's environment
# (presumably for the Google Cloud client libraries - TODO confirm).
os.environ["GOOGLE_CLOUD_PROJECT"] = GCLOUD_PROJECT

## Download mlruns

In [None]:
def download_mlruns(project: str, bucket_name: str) -> None:
    """Restore mlruns metadata from GCS.

    Every blob named ``mlruns/.../meta.yaml`` is downloaded below the local
    tracking directory (``MLFLOW_TRACKING_URI``, defaulting to
    ``data/processed/mlruns``) with the leading ``mlruns`` component removed.

    Notes:
        Only ``meta.yaml`` files are restored, on the assumption that
        experiments are not resumed.

    Args:
        project: GCP project id used to create the storage client.
        bucket_name: Name of the bucket to read from.
    """
    client = gcs.Client(project=project)
    tracking_dir = pathlib.Path(
        os.environ.get("MLFLOW_TRACKING_URI", "data/processed/mlruns")
    )

    for blob in client.list_blobs(bucket_name):
        # Blob names always use "/" as the separator. Working on ``parts``
        # avoids indexing ``parents`` with -1, which raises IndexError for
        # single-component names on Python versions before 3.10.
        parts = pathlib.PurePosixPath(blob.name).parts
        if len(parts) < 2 or parts[0] != "mlruns" or parts[-1] != "meta.yaml":
            continue

        download_path = tracking_dir.joinpath(*parts[1:])
        download_path.parent.mkdir(exist_ok=True, parents=True)
        blob.download_to_filename(str(download_path))


download_mlruns(GCLOUD_PROJECT, GCS_BUCKET_NAME)

## Run an experiment

In [None]:
%%writefile {CONFIG_PATH}
experiment_name: "ColabTest"
uri: "/content/til/machine_learning/kaggle_ops/"
git_version: null

In [None]:
# ここではPYTHONPATHは書き換えられない
!env $(cat {DOTENV_PATH} | xargs) | python src/models/mlproject.py {CONFIG_PATH}

## Upload mlruns to GCS

In [None]:
def load_yaml(filepath: pathlib.Path) -> t.Dict:
    """Read a YAML file and return its parsed content.

    Args:
        filepath: Path of the YAML file to read.

    Returns:
        The document as parsed by the safe loader.
    """
    with open(filepath, "r") as stream:
        return yaml.safe_load(stream)


def search_experiment_dir(
    mlruns_dir: pathlib.Path,
    experiment_name: str,
) -> pathlib.Path:
    """Find the experiment directory whose ``meta.yaml`` has the given name.

    Args:
        mlruns_dir: Directory whose direct children are candidate experiment
            directories.
        experiment_name: Value compared against the ``name`` key of each
            child's ``meta.yaml``.

    Returns:
        The first matching child, or an empty ``pathlib.Path()`` when no
        child matches.
    """
    for exp_dir in pathlib.Path(mlruns_dir).glob("*"):
        try:
            meta = load_yaml(exp_dir.joinpath("meta.yaml"))
        except (FileNotFoundError, NotADirectoryError) as e:
            # No meta.yaml below this entry (a plain file, or a directory
            # without metadata): skip it rather than falling through with an
            # undefined or stale ``meta``.
            _logger.debug(e)
            continue

        # An empty or malformed meta.yaml is treated as "no match".
        if isinstance(meta, dict) and meta.get("name") == experiment_name:
            return exp_dir

    return pathlib.Path()


def mlruns_files(experiment_dir: pathlib.Path) -> t.Dict[str, pathlib.Path]:
    """Map blob names to the local files of an experiment directory.

    Args:
        experiment_dir: Experiment directory that is searched recursively.

    Returns:
        A dict whose keys are ``mlruns/<directory name>/<relative path>``
        (always "/"-separated) and whose values are the matching local file
        paths. Directories are not included.
    """
    # ``name`` rather than ``stem`` keeps directory names containing a dot
    # intact; the pure POSIX path keeps blob names "/"-separated on any OS.
    blob_root = pathlib.PurePosixPath("mlruns", experiment_dir.name)

    files = {}
    for filepath in experiment_dir.glob("**/*"):
        if not filepath.is_file():
            continue
        relative = filepath.relative_to(experiment_dir).as_posix()
        files[blob_root.joinpath(relative).as_posix()] = filepath

    return files


def upload_mlruns_to_gcs(
    config_path: pathlib.Path,
    project_name: str,
    bucket_name: str,
) -> None:
    """Upload the configured experiment's files that are not yet in GCS.

    The experiment name is read from the ``experiment_name`` key of the
    config file, and its directory is looked up below the local tracking
    directory (``MLFLOW_TRACKING_URI``, defaulting to
    ``data/processed/mlruns``). Files whose blob name already exists in the
    bucket are skipped.

    Args:
        config_path: Path of the YAML experiment config.
        project_name: GCP project id used to create the storage client.
        bucket_name: Name of the destination bucket.
    """
    config = load_yaml(config_path)
    experiment_name = config["experiment_name"]

    mlflow_tracking_uri = os.environ.get("MLFLOW_TRACKING_URI", "data/processed/mlruns")
    exp_dir = search_experiment_dir(pathlib.Path(mlflow_tracking_uri), experiment_name)
    if exp_dir == pathlib.Path():
        # search_experiment_dir returns an empty path when nothing matches;
        # globbing that path would upload the whole working directory.
        _logger.warning("experiment directory not found: %s", experiment_name)
        return
    candidates = mlruns_files(exp_dir)

    client = gcs.Client(project=project_name)
    files_in_bucket = {blob.name for blob in client.list_blobs(bucket_name)}
    bucket = client.get_bucket(bucket_name)

    # Upload only what the bucket does not already contain.
    for blob_name in sorted(candidates.keys() - files_in_bucket):
        bucket.blob(blob_name).upload_from_filename(str(candidates[blob_name]))


upload_mlruns_to_gcs(CONFIG_PATH, GCLOUD_PROJECT, GCS_BUCKET_NAME)