In [None]:
# Pinned to the versions this notebook was last run with (see the install log below).
# python-multipart is required by FastAPI for the Form/File parameters used in app.py.
%pip install -q fastapi==0.112.1 nest-asyncio pyngrok==7.2.0 uvicorn==0.30.6 h2o==3.46.0.4 python-multipart

Collecting fastapi
  Downloading fastapi-0.112.1-py3-none-any.whl.metadata (27 kB)
Collecting pyngrok
  Downloading pyngrok-7.2.0-py3-none-any.whl.metadata (7.4 kB)
Collecting uvicorn
  Downloading uvicorn-0.30.6-py3-none-any.whl.metadata (6.6 kB)
Collecting h2o
  Downloading h2o-3.46.0.4.tar.gz (265.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m265.3/265.3 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting starlette<0.39.0,>=0.37.2 (from fastapi)
  Downloading starlette-0.38.2-py3-none-any.whl.metadata (5.9 kB)
Collecting h11>=0.8 (from uvicorn)
  Downloading h11-0.14.0-py3-none-any.whl.metadata (8.2 kB)
Downloading fastapi-0.112.1-py3-none-any.whl (93 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m93.2/93.2 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pyngrok-7.2.0-py3-none-any.whl (22 kB)
Downloading uvicorn-0.30.6-py3-none-any.whl (62 kB)
[2K   [90m━━━━━━

In [None]:
import os
from getpass import getpass

from pyngrok import ngrok

# Take the token from the NGROK_AUTHTOKEN environment variable, or prompt for it,
# so the secret is never typed into (and saved with) the notebook itself.
ngrok_authtoken = os.environ.get("NGROK_AUTHTOKEN") or getpass("ngrok authtoken: ")
ngrok.set_auth_token(ngrok_authtoken)

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [None]:
%%writefile app.py
import asyncio
import math
import sched
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from io import StringIO

import h2o
import pandas as pd
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.responses import JSONResponse
from h2o.automl import H2OAutoML

app = FastAPI()
# Attach to the H2O cluster once, when this module is imported.
h2o.init()

# In-memory task registry: task_id -> {'status': ..., plus 'model_details' or 'error'}.
# It lives only inside this process, so it is lost whenever the server restarts.
tasks = dict()
# Queue of pending training jobs; entries are added by /train and drained by run_scheduler().
scheduler = sched.scheduler(timefunc=time.time, delayfunc=time.sleep)

# The four predefined statuses a task can be in
class TaskStatus:
    """String constants for the lifecycle states of a training task.

    These plain strings are what is stored under ``tasks[task_id]['status']``
    and therefore what the API reports back to clients.
    """
    WAITING: str = "waiting"
    IN_PROGRESS: str = "in_progress"
    DONE: str = "done"
    ERROR: str = "error"

# Limit the rows of the dataset to keep AutoML training time short.
def limit_rows(df, max_rows=500, random_state=42):
    """Drop incomplete rows and cap the dataset at ``max_rows`` rows.

    Args:
        df: input pandas DataFrame; it is not modified.
        max_rows: maximum number of rows to keep.
        random_state: seed for the row sample, so the same input always
            yields the same subset.

    Returns:
        A DataFrame holding only the rows of ``df`` with no missing values.
        If more than ``max_rows`` such rows exist, it holds a random sample
        of exactly ``max_rows`` of them, with the original index labels kept.
        The result is empty if every row has at least one missing value.
    """
    complete_rows = df.dropna()
    if len(complete_rows) > max_rows:
        complete_rows = complete_rows.sample(n=max_rows, random_state=random_state)
    return complete_rows

# A leader worse than these thresholds triggers an extra DeepLearning AutoML run.
LOGLOSS_THRESHOLD = 0.2  # classification: higher logloss is worse
R2_THRESHOLD = 0.8  # regression: lower r2 is worse
# A numeric target with fewer distinct values than this is treated as classes.
MAX_CLASSIFICATION_LEVELS = 10


def _detect_problem_type(h2o_df, y):
    """Return "classification" or "regression" for target column ``y``.

    The target is classification if it is not numeric, or if it is numeric
    with fewer than ``MAX_CLASSIFICATION_LEVELS`` distinct values. In that
    case the column is also converted to a factor in place on ``h2o_df``.
    """
    is_numeric = h2o_df[y].isnumeric()[0]
    if not is_numeric or h2o_df[y].unique().nrow < MAX_CLASSIFICATION_LEVELS:
        h2o_df[y] = h2o_df[y].asfactor()
        return "classification"
    return "regression"


def _evaluation_metrics(model):
    """Return the metrics dict used to judge ``model``.

    Cross-validation metrics are preferred. Training metrics of boosted trees
    are optimistically biased, which makes the thresholds and the model
    comparison meaningless. Training metrics are used only when the model has
    no cross-validation metrics.
    """
    performance = model.model_performance(xval=True)
    if performance is None:
        performance = model.model_performance()
    return performance._metric_json


def _is_unsatisfying(prob_type, metrics):
    """True if the metrics miss the threshold for this problem type."""
    if prob_type == "classification":
        return float(metrics['logloss']) > LOGLOSS_THRESHOLD
    return float(metrics['r2']) < R2_THRESHOLD


def _is_better(prob_type, candidate, incumbent):
    """True if ``candidate`` metrics beat ``incumbent`` (lower logloss / higher r2)."""
    if prob_type == "classification":
        return float(candidate['logloss']) < float(incumbent['logloss'])
    return float(candidate['r2']) > float(incumbent['r2'])


def _json_safe(value):
    """Map non-finite floats to None; JSONResponse refuses to encode NaN/inf."""
    if isinstance(value, float) and not math.isfinite(value):
        return None
    return value


def _build_model_details(model, model_path, prob_type, metrics):
    """Collect the identifying info and key metrics of the chosen model."""
    details = {
        'model_id': model.model_id,
        'model_type': model.algo,
        'model_path': model_path,
        'model_category': metrics['model_category'],
    }
    if prob_type == "classification":
        details.update({
            # Multinomial metrics may not carry an 'AUC' entry, so do not index it directly.
            'AUC': metrics.get('AUC'),
            'logloss': metrics['logloss'],
            'MSE': metrics['MSE'],
        })
    else:
        details.update({
            'MSE': metrics['MSE'],
            'RMSE': metrics['RMSE'],
            'R2': metrics['r2'],
        })
    return {key: _json_safe(value) for key, value in details.items()}


def train_model_task(task_id, file_content):
    """Train an H2O model on an uploaded CSV and record the outcome in ``tasks``.

    The last CSV column is the target. AutoML first tries GLM, GBM and XGBoost.
    If the leader misses the metric threshold, a small DeepLearning AutoML run
    is tried and kept only if it scores better. The chosen model is saved
    under ./models.

    Args:
        task_id: key of an existing entry in ``tasks``.
        file_content: raw bytes of the uploaded CSV file.

    On success the entry gets 'model_details' and status DONE. On any failure
    it gets status ERROR and the exception text under 'error'.
    """
    try:
        tasks[task_id]['status'] = TaskStatus.IN_PROGRESS
        # 'utf-8-sig' also strips a leading byte-order mark (common in Excel exports)
        # that would otherwise end up in the first column's name.
        df = pd.read_csv(StringIO(file_content.decode('utf-8-sig')))
        df = limit_rows(df)
        h2o_df = h2o.H2OFrame(df)
        x = h2o_df.columns
        y = x[-1]  # the target is assumed to be the last column
        x.remove(y)
        prob_type = _detect_problem_type(h2o_df, y)

        # First pass: three algorithms, at most 10 models.
        aml = H2OAutoML(max_models=10, seed=1, include_algos=["GLM", "GBM", "XGBoost"])
        aml.train(x=x, y=y, training_frame=h2o_df)
        model = aml.leader
        if model is None:
            raise RuntimeError("AutoML did not produce any model")
        metrics = _evaluation_metrics(model)

        if _is_unsatisfying(prob_type, metrics):
            # Second pass: DeepLearning only, at most 2 models, to limit training time.
            aml_dl = H2OAutoML(max_models=2, seed=1, include_algos=["DeepLearning"])
            aml_dl.train(x=x, y=y, training_frame=h2o_df)
            dl_model = aml_dl.leader
            # The leader can be None if this run built no model; keep the first leader then.
            if dl_model is not None:
                dl_metrics = _evaluation_metrics(dl_model)
                if _is_better(prob_type, dl_metrics, metrics):
                    model, metrics = dl_model, dl_metrics

        model_path = h2o.save_model(model=model, path="./models", force=True)
        model_details = _build_model_details(model, model_path, prob_type, metrics)

        # Store the details before flipping the status, so a concurrent /predict
        # never observes "done" without model_details.
        tasks[task_id]['model_details'] = model_details
        tasks[task_id]['status'] = TaskStatus.DONE
    except Exception as e:
        tasks[task_id]['status'] = TaskStatus.ERROR
        tasks[task_id]['error'] = str(e)

# Strong references to in-flight scheduler runs: the event loop keeps only weak
# references to tasks, so an unreferenced task may be garbage-collected early.
_background_tasks = set()


@app.post('/train')
async def train_model(file: UploadFile = File(...)):
    """Queue a training job for the uploaded CSV and return its task id.

    The upload is read before the task is registered. If reading fails, no
    task is left stuck in the "waiting" state.
    """
    file_content = await file.read()
    task_id = str(uuid.uuid4())
    tasks[task_id] = {'status': TaskStatus.WAITING}
    # Queue the job with no delay and a fixed priority of 1, so jobs are ordered by submission time.
    scheduler.enter(0, 1, train_model_task, (task_id, file_content))
    runner = asyncio.create_task(run_scheduler())
    _background_tasks.add(runner)
    runner.add_done_callback(_background_tasks.discard)
    return JSONResponse(content={'task_id': task_id})

@app.get('/monitor/{task_id}')
async def monitor_task(task_id: str):
    """Report the stored state of a training task.

    Returns the task's dict as JSON ('status', plus 'model_details' or 'error'
    once set), or a 404 error payload when the id is unknown.
    """
    entry = tasks.get(task_id)
    if entry:
        return JSONResponse(content=entry)
    return JSONResponse(status_code=404, content={'error': 'Task not found'})

def _predict_records(model_path, input_df):
    """Score ``input_df`` with the H2O model saved at ``model_path``.

    Makes blocking calls to the H2O cluster, so ``predict_model`` runs it in a
    worker thread.

    Returns:
        The predictions as a list of dicts, one per input row. Unlike a
        DataFrame, this is JSON-serializable.
    """
    h2o_input_df = h2o.H2OFrame(input_df)
    model = h2o.load_model(model_path)
    predictions_df = model.predict(h2o_input_df).as_data_frame()
    return predictions_df.to_dict(orient="records")


@app.post('/predict')
async def predict_model(task_id: str = Form(...), file: UploadFile = File(...)):
    """Run the model trained by task ``task_id`` on an uploaded CSV.

    Responses:
        404 if the task is unknown or has no saved model path;
        409 if training failed (the stored error is included);
        200 with {"message": "Task is still in progress"} while waiting or running;
        400 if the upload cannot be decoded or parsed as CSV;
        otherwise 200 with one prediction record per input row.
    """
    task_details = tasks.get(task_id)
    if task_details is None:
        return JSONResponse(status_code=404, content={'error': 'Task not found'})

    status = task_details['status']
    if status == TaskStatus.ERROR:
        # A failed task will never produce a model; report it instead of "in progress".
        return JSONResponse(
            status_code=409,
            content={'error': 'Training failed', 'details': task_details.get('error')},
        )
    if status != TaskStatus.DONE:
        return JSONResponse(content={"message": "Task is still in progress"})

    model_details = task_details.get('model_details')
    if model_details is None or model_details.get('model_path') is None:
        return JSONResponse(status_code=404, content={'error': 'Model path not found'})

    try:
        # 'utf-8-sig' strips a leading byte-order mark, matching how training decodes CSVs.
        csv_text = (await file.read()).decode('utf-8-sig')
        input_df = pd.read_csv(StringIO(csv_text))
    except ValueError as e:
        # UnicodeDecodeError and pandas' ParserError / EmptyDataError all derive from ValueError.
        return JSONResponse(status_code=400, content={'error': f'Invalid CSV file: {e}'})

    # H2O client calls block on round-trips to the cluster; keep them off the event loop.
    loop = asyncio.get_running_loop()
    records = await loop.run_in_executor(
        None, _predict_records, model_details['model_path'], input_df
    )
    return JSONResponse(content=records)

# One dedicated worker thread drains the scheduler. Queued training jobs then run
# one at a time in submission order instead of concurrently on the default pool.
_scheduler_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="train-scheduler")


async def run_scheduler():
    """Drain ``scheduler`` in a background thread without blocking the event loop.

    Every call submits ``scheduler.run`` to the same single-thread executor, so
    calls are serialized. A drain that starts after a job was queued runs that
    job if an earlier drain has not already picked it up.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(_scheduler_executor, scheduler.run)

@app.on_event("startup")
async def startup_event():
    """When the application starts, launch one background drain of the training scheduler."""
    asyncio.get_running_loop().create_task(run_scheduler())


Overwriting app.py


In [None]:
import uvicorn
from pyngrok import ngrok

# FastAPI needs python-multipart to parse the Form/File parameters used in app.py.
%pip install -q python-multipart

# Stop any ngrok process left over from a previous run of this cell, then expose port 8000.
ngrok.kill()
tunnel = ngrok.connect(8000)
public_url = tunnel.public_url
print(f"Public URL: {public_url}")

# Blocks until interrupted. --reload restarts the server whenever app.py changes,
# which also discards the in-memory task registry defined in app.py.
!uvicorn app:app --host 0.0.0.0 --port 8000 --reload

Public URL: NgrokTunnel: "https://20af-34-83-164-103.ngrok-free.app" -> "http://localhost:8000"
[32mINFO[0m:     Will watch for changes in these directories: ['/content']
[32mINFO[0m:     Uvicorn running on [1mhttp://0.0.0.0:8000[0m (Press CTRL+C to quit)
[32mINFO[0m:     Started reloader process [[36m[1m3627[0m] using [36m[1mStatReload[0m
Checking whether there is an H2O instance running at http://localhost:54321. connected.
--------------------------  -----------------------------------------------------------------------------------------
H2O_cluster_uptime:         10 mins 40 secs
H2O_cluster_timezone:       Etc/UTC
H2O_data_parsing_timezone:  UTC
H2O_cluster_version:        3.46.0.4
H2O_cluster_version_age:    1 month and 6 days
H2O_cluster_name:           H2O_from_python_unknownUser_p5tqe7
H2O_cluster_total_nodes:    1
H2O_cluster_free_memory:    3.167 Gb
H2O_cluster_total_cores:    2
H2O_cluster_allowed_cores:  2
H2O_cluster_status:         locked, healthy
H2O_conn



[32mINFO[0m:     Shutting down
[32mINFO[0m:     Waiting for application shutdown.
[32mINFO[0m:     Application shutdown complete.
[32mINFO[0m:     Finished server process [[36m3633[0m]
[32mINFO[0m:     Stopping reloader process [[36m[1m3627[0m]
