From b1b4a58a9338f81a8977371f01f48a42730f1acd Mon Sep 17 00:00:00 2001 From: felix0496 Date: Tue, 29 Nov 2022 14:00:15 +0100 Subject: [PATCH 1/4] save weak supervision stats as pickle --- controller/integration.py | 21 +++++++++++++++++++-- start | 15 +++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/controller/integration.py b/controller/integration.py index a34f621..7942d07 100644 --- a/controller/integration.py +++ b/controller/integration.py @@ -1,6 +1,8 @@ +import os from typing import Any, Dict, List, Tuple import traceback import pandas as pd +import pickle from collections import defaultdict from submodules.model.models import ( @@ -25,7 +27,8 @@ def fit_predict( task_type, df = collect_data(project_id, labeling_task_id, True) try: if task_type == enums.LabelingTaskType.CLASSIFICATION.value: - results = integrate_classification(df) + results = integrate_classification(project_id, labeling_task_id, df) + else: results = integrate_extraction(df) weak_supervision.store_data( @@ -48,9 +51,23 @@ def fit_predict( ) -def integrate_classification(df: pd.DataFrame): +def integrate_classification(project_id: str, labeling_task_id: str, df: pd.DataFrame): cnlm = util.get_cnlm_from_df(df) weak_supervision_results = cnlm.weakly_supervise() + + with open( + os.path.join( + "/inference", project_id, f"weak-supervision-{labeling_task_id}.pkl" + ), + "wb", + ) as f: + stats_df = cnlm.quality_metrics() + if len(stats_df) != 0: + stats_lkp = stats_df.set_index(["identifier", "label_name"]).to_dict( + orient="index" + ) + pickle.dump(stats_lkp, f) + return_values = defaultdict(list) for record_id, ( label_id, diff --git a/start b/start index b066f6d..5b2dc13 100755 --- a/start +++ b/start @@ -5,6 +5,20 @@ echo -ne 'stopping old container...' docker stop refinery-weak-supervisor > /dev/null 2>&1 echo -ne '\t [done]\n' +INFERENCE_DIR=${PWD%/*}/dev-setup/inference/ +if [ ! -d "$_DIR" ] +then + INFERENCE_DIR=${PWD%/*/*}/dev-setup/inference/ + if [ ! 
-d "$INFERENCE_DIR" ] + then + # to include volume for local development, use the dev-setup model data folder: + # alternative use manual logic with + # -v /path/to/dev-setup/model-data:/models \ + echo "Can't find model data directory: $INFERENCE_DIR -> stopping" + exit 1 + fi +fi + echo -ne 'building container...' docker build -t refinery-weak-supervisor-dev -f dev.Dockerfile . > /dev/null 2>&1 echo -ne '\t\t [done]\n' @@ -17,6 +31,7 @@ docker run -d --rm \ -e WS_NOTIFY_ENDPOINT="http://refinery-websocket:8080" \ --mount type=bind,source="$(pwd)"/,target=/app \ -v /var/run/docker.sock:/var/run/docker.sock \ +-v "$INFERENCE_DIR":/inference \ --network dev-setup_default \ refinery-weak-supervisor-dev > /dev/null 2>&1 echo -ne '\t\t\t [done]\n' From 63b8b814bf46184d8b861b7e7d8726fd023bec5b Mon Sep 17 00:00:00 2001 From: felix0496 Date: Wed, 30 Nov 2022 16:02:06 +0100 Subject: [PATCH 2/4] adds endpoint to export weak supervision statistics --- app.py | 27 ++++++++++++++++---- controller/integration.py | 53 ++++++++++++++++++++++++++------------- 2 files changed, 58 insertions(+), 22 deletions(-) diff --git a/app.py b/app.py index 53dd07b..e790bc6 100644 --- a/app.py +++ b/app.py @@ -1,4 +1,5 @@ -from fastapi import FastAPI +from fastapi import FastAPI, HTTPException, responses +from pydantic import BaseModel from controller import stats from controller import integration @@ -7,8 +8,6 @@ # API creation and description app = FastAPI() -from pydantic import BaseModel - class WeakSupervisionRequest(BaseModel): project_id: str @@ -29,6 +28,11 @@ class SourceStatsRequest(BaseModel): user_id: str +class ExportWsStatsRequest(BaseModel): + project_id: str + labeling_task_id: str + + @app.post("/fit_predict") async def weakly_supervise(request: WeakSupervisionRequest) -> int: session_token = general.get_ctx_token() @@ -43,7 +47,7 @@ async def weakly_supervise(request: WeakSupervisionRequest) -> int: @app.post("/labeling_task_statistics") -async def calculate_stats(request: 
TaskStatsRequest): +async def calculate_task_stats(request: TaskStatsRequest): session_token = general.get_ctx_token() stats.calculate_quality_statistics_for_labeling_task( request.project_id, request.labeling_task_id, request.user_id @@ -53,7 +57,7 @@ async def calculate_stats(request: TaskStatsRequest): @app.post("/source_statistics") -async def calculate_stats(request: SourceStatsRequest): +async def calculate_source_stats(request: SourceStatsRequest): session_token = general.get_ctx_token() has_coverage = stats.calculate_quantity_statistics_for_labeling_task_from_source( request.project_id, request.source_id, request.user_id @@ -64,3 +68,16 @@ async def calculate_stats(request: SourceStatsRequest): ) general.remove_and_refresh_session(session_token) return None, 200 + + +@app.post("/export_ws_stats") +async def export_ws_stats(request: ExportWsStatsRequest) -> responses.HTMLResponse: + session_token = general.get_ctx_token() + status_code, message = integration.export_weak_supervision_stats( + request.project_id, request.labeling_task_id + ) + general.remove_and_refresh_session(session_token) + + if status_code != 200: + raise HTTPException(status_code=status_code, detail=message) + return responses.HTMLResponse(status_code=status_code) diff --git a/controller/integration.py b/controller/integration.py index 7942d07..7bbd3df 100644 --- a/controller/integration.py +++ b/controller/integration.py @@ -27,7 +27,7 @@ def fit_predict( task_type, df = collect_data(project_id, labeling_task_id, True) try: if task_type == enums.LabelingTaskType.CLASSIFICATION.value: - results = integrate_classification(project_id, labeling_task_id, df) + results = integrate_classification(df) else: results = integrate_extraction(df) @@ -40,7 +40,7 @@ def fit_predict( weak_supervision_task_id, with_commit=True, ) - except: + except Exception: print(traceback.format_exc(), flush=True) general.rollback() weak_supervision.update_state( @@ -51,23 +51,42 @@ def fit_predict( ) -def 
integrate_classification(project_id: str, labeling_task_id: str, df: pd.DataFrame): - cnlm = util.get_cnlm_from_df(df) - weak_supervision_results = cnlm.weakly_supervise() +def export_weak_supervision_stats( + project_id: str, labeling_task_id: str +) -> Tuple[int, str]: - with open( - os.path.join( - "/inference", project_id, f"weak-supervision-{labeling_task_id}.pkl" - ), - "wb", - ) as f: - stats_df = cnlm.quality_metrics() - if len(stats_df) != 0: - stats_lkp = stats_df.set_index(["identifier", "label_name"]).to_dict( - orient="index" - ) - pickle.dump(stats_lkp, f) + task_type, df = collect_data(project_id, labeling_task_id, False) + try: + if task_type == enums.LabelingTaskType.CLASSIFICATION.value: + cnlm = util.get_cnlm_from_df(df) + stats_df = cnlm.quality_metrics() + if len(stats_df) != 0: + stats_lkp = stats_df.set_index(["identifier", "label_name"]).to_dict( + orient="index" + ) + else: + return 404, "Can't compute weak supervision" + + os.makedirs(os.path.join("/inference", project_id), exist_ok=True) + with open( + os.path.join( + "/inference", project_id, f"weak-supervision-{labeling_task_id}.pkl" + ), + "wb", + ) as f: + pickle.dump(stats_lkp, f) + else: + return 404, f"Task type {task_type} not implemented" + except Exception: + print(traceback.format_exc(), flush=True) + general.rollback() + return 500, "Internal server error" + return 200, "OK" + +def integrate_classification(df: pd.DataFrame): + cnlm = util.get_cnlm_from_df(df) + weak_supervision_results = cnlm.weakly_supervise() return_values = defaultdict(list) for record_id, ( label_id, From 08232b79ac3b1afae67606153dc6258985288487 Mon Sep 17 00:00:00 2001 From: felix0496 Date: Thu, 12 Jan 2023 14:06:27 +0100 Subject: [PATCH 3/4] adds export of weak supervision statistics for extraction tasks --- controller/integration.py | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/controller/integration.py b/controller/integration.py index 
7bbd3df..47b5256 100644 --- a/controller/integration.py +++ b/controller/integration.py @@ -60,23 +60,28 @@ def export_weak_supervision_stats( if task_type == enums.LabelingTaskType.CLASSIFICATION.value: cnlm = util.get_cnlm_from_df(df) stats_df = cnlm.quality_metrics() - if len(stats_df) != 0: - stats_lkp = stats_df.set_index(["identifier", "label_name"]).to_dict( - orient="index" - ) - else: - return 404, "Can't compute weak supervision" - - os.makedirs(os.path.join("/inference", project_id), exist_ok=True) - with open( - os.path.join( - "/inference", project_id, f"weak-supervision-{labeling_task_id}.pkl" - ), - "wb", - ) as f: - pickle.dump(stats_lkp, f) + elif task_type == enums.LabelingTaskType.INFORMATION_EXTRACTION.value: + enlm = util.get_enlm_from_df(df) + stats_df = enlm.quality_metrics() else: return 404, f"Task type {task_type} not implemented" + + if len(stats_df) != 0: + stats_lkp = stats_df.set_index(["identifier", "label_name"]).to_dict( + orient="index" + ) + else: + return 404, "Can't compute weak supervision" + + os.makedirs(os.path.join("/inference", project_id), exist_ok=True) + with open( + os.path.join( + "/inference", project_id, f"weak-supervision-{labeling_task_id}.pkl" + ), + "wb", + ) as f: + pickle.dump(stats_lkp, f) + except Exception: print(traceback.format_exc(), flush=True) general.rollback() From be61d1c92d9f1d011ad580f1b24deb0c9d5d93ec Mon Sep 17 00:00:00 2001 From: felix0496 Date: Mon, 30 Jan 2023 16:53:18 +0100 Subject: [PATCH 4/4] pr comments --- start | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/start b/start index 5b2dc13..cd6c64f 100755 --- a/start +++ b/start @@ -11,9 +11,9 @@ then INFERENCE_DIR=${PWD%/*/*}/dev-setup/inference/ if [ ! 
-d "$INFERENCE_DIR" ]
     then
-        # to include volume for local development, use the dev-setup model data folder:
+        # to include volume for local development, use the dev-setup inference folder:
         # alternative use manual logic with
-        # -v /path/to/dev-setup/model-data:/models \
+        # -v /path/to/dev-setup/inference:/inference \
         echo "Can't find model data directory: $INFERENCE_DIR -> stopping"
         exit 1
     fi