Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions uploader/app/action_description.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import json
from contextvars import ContextVar, Token

import attrs

_action_description: ContextVar[str | None] = ContextVar("action_description", default=None)


def build(task_id: str, run_id: str, parameters: dict[str, object]) -> str:
return json.dumps(
{"task_id": task_id, "run_id": run_id, "parameters": parameters},
sort_keys=True,
separators=(",", ":"),
)


def set_current(description: str) -> Token[str | None]:
return _action_description.set(description)


def reset_current(token: Token[str | None]) -> None:
_action_description.reset(token)


def current() -> str | None:
return _action_description.get()


def apply[T](body: T) -> T:
desc = current()
if desc is None:
return body
return attrs.evolve(body, action_description=desc)
13 changes: 8 additions & 5 deletions uploader/app/crossmatch/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import matplotlib.pyplot as plt
from psycopg import sql

import uploader.app.action_description as action_description
import uploader.app.report as report
from uploader.app import log
from uploader.app.crossmatch.models import (
Expand Down Expand Up @@ -361,11 +362,13 @@ def _write_crossmatch_results(
handle_call(
set_crossmatch_results.sync_detailed(
client=client,
body=SetCrossmatchResultsRequest(
statuses=StatusesPayload(
new=new_pl if new_pl is not None else UNSET,
existing=existing_pl if existing_pl is not None else UNSET,
collided=collided_pl if collided_pl is not None else UNSET,
body=action_description.apply(
SetCrossmatchResultsRequest(
statuses=StatusesPayload(
new=new_pl if new_pl is not None else UNSET,
existing=existing_pl if existing_pl is not None else UNSET,
collided=collided_pl if collided_pl is not None else UNSET,
),
),
),
)
Expand Down
5 changes: 4 additions & 1 deletion uploader/app/crossmatch/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from psycopg import sql

import uploader.app.action_description as action_description
import uploader.app.report as report
from uploader.app.storage import PgStorage
from uploader.app.upload import handle_call
Expand Down Expand Up @@ -69,7 +70,9 @@ def run_submit_crossmatch(
handle_call(
assign_record_pgcs.sync_detailed(
client=client,
body=AssignRecordPgcsRequest(record_ids=record_ids),
body=action_description.apply(
AssignRecordPgcsRequest(record_ids=record_ids),
),
)
)
submitted += len(record_ids)
Expand Down
13 changes: 8 additions & 5 deletions uploader/app/structured/designations/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import matplotlib.pyplot as plt
from psycopg import sql

import uploader.app.action_description as action_description
import uploader.app.report as report
from uploader.app import log
from uploader.app.display import format_table
Expand Down Expand Up @@ -162,11 +163,13 @@ def upload_designations(
handle_call(
save_structured_data.sync_detailed(
client=client,
body=SaveStructuredDataRequest(
catalog="designation",
columns=["design"],
ids=batch_ids,
data=batch_names,
body=action_description.apply(
SaveStructuredDataRequest(
catalog="designation",
columns=["design"],
ids=batch_ids,
data=batch_names,
),
),
)
)
Expand Down
15 changes: 9 additions & 6 deletions uploader/app/structured/geometry/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import numpy as np
from psycopg import sql

import uploader.app.action_description as action_description
import uploader.app.report as report
from uploader.app.display import format_table
from uploader.app.lib.expression import Expression, parse
Expand Down Expand Up @@ -255,12 +256,14 @@ def upload_geometry_isophotal(
handle_call(
save_structured_data.sync_detailed(
client=client,
body=SaveStructuredDataRequest(
catalog="geometry",
columns=geometry_columns,
ids=batch_ids,
data=batch_data,
units=geometry_units,
body=action_description.apply(
SaveStructuredDataRequest(
catalog="geometry",
columns=geometry_columns,
ids=batch_ids,
data=batch_data,
units=geometry_units,
),
),
),
)
Expand Down
15 changes: 9 additions & 6 deletions uploader/app/structured/icrs/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import numpy as np
from psycopg import sql

import uploader.app.action_description as action_description
import uploader.app.report as report
from uploader.app.display import format_table
from uploader.app.lib.expression import Expression, parse
Expand Down Expand Up @@ -193,12 +194,14 @@ def upload_icrs(
handle_call(
save_structured_data.sync_detailed(
client=client,
body=SaveStructuredDataRequest(
catalog="icrs",
columns=ICRS_COLUMNS,
ids=batch_ids,
data=batch_data,
units=units,
body=action_description.apply(
SaveStructuredDataRequest(
catalog="icrs",
columns=ICRS_COLUMNS,
ids=batch_ids,
data=batch_data,
units=units,
),
),
)
)
Expand Down
13 changes: 8 additions & 5 deletions uploader/app/structured/nature/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import matplotlib.pyplot as plt
from psycopg import sql

import uploader.app.action_description as action_description
import uploader.app.report as report
from uploader.app.display import format_table
from uploader.app.lib.rawdata import rawdata_batches
Expand Down Expand Up @@ -95,11 +96,13 @@ def upload_nature(
handle_call(
save_structured_data.sync_detailed(
client=client,
body=SaveStructuredDataRequest(
catalog="nature",
columns=NATURE_COLUMNS,
ids=batch_ids,
data=batch_data,
body=action_description.apply(
SaveStructuredDataRequest(
catalog="nature",
columns=NATURE_COLUMNS,
ids=batch_ids,
data=batch_data,
),
),
)
)
Expand Down
13 changes: 8 additions & 5 deletions uploader/app/structured/note.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from collections.abc import Callable

import uploader.app.action_description as action_description
import uploader.app.report as report
from uploader.app.upload import handle_call
from uploader.clients.gen.client import adminapi
Expand All @@ -20,11 +21,13 @@ def upload_note(
handle_call(
save_structured_data.sync_detailed(
client=client,
body=SaveStructuredDataRequest(
catalog="note",
columns=NOTE_COLUMNS,
ids=[record_id],
data=[[note]],
body=action_description.apply(
SaveStructuredDataRequest(
catalog="note",
columns=NOTE_COLUMNS,
ids=[record_id],
data=[[note]],
),
),
)
)
Expand Down
15 changes: 9 additions & 6 deletions uploader/app/structured/photometry/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from psycopg import sql

import uploader.app.action_description as action_description
import uploader.app.report as report
from uploader.app import log
from uploader.app.display import format_table
Expand Down Expand Up @@ -78,12 +79,14 @@ def upload_photometry_hyperleda(
handle_call(
save_structured_data.sync_detailed(
client=client,
body=SaveStructuredDataRequest(
catalog="photometry",
columns=PHOTOMETRY_COLUMNS,
ids=batch_ids,
data=batch_data,
units=PHOTOMETRY_UNITS,
body=action_description.apply(
SaveStructuredDataRequest(
catalog="photometry",
columns=PHOTOMETRY_COLUMNS,
ids=batch_ids,
data=batch_data,
units=PHOTOMETRY_UNITS,
),
),
)
)
Expand Down
15 changes: 9 additions & 6 deletions uploader/app/structured/redshift/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import numpy as np
from psycopg import sql

import uploader.app.action_description as action_description
import uploader.app.report as report
from uploader.app.display import format_table
from uploader.app.lib.rawdata import rawdata_batches
Expand Down Expand Up @@ -125,12 +126,14 @@ def upload_redshift(
handle_call(
save_structured_data.sync_detailed(
client=client,
body=SaveStructuredDataRequest(
catalog="redshift",
columns=REDSHIFT_COLUMNS,
ids=batch_ids,
data=batch_data,
units=REDSHIFT_UNITS,
body=action_description.apply(
SaveStructuredDataRequest(
catalog="redshift",
columns=REDSHIFT_COLUMNS,
ids=batch_ids,
data=batch_data,
units=REDSHIFT_UNITS,
),
),
)
)
Expand Down
33 changes: 20 additions & 13 deletions uploader/app/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pandas as pd

import uploader.app.action_description as action_description
import uploader.app.report as report
from uploader.app import interface, log
from uploader.app.display import format_table
Expand Down Expand Up @@ -103,10 +104,12 @@ def _upload(
resp = handle_call(
create_source.sync_detailed(
client=client,
body=models.CreateSourceRequest(
title=pub_name,
authors=pub_authors,
year=pub_year,
body=action_description.apply(
models.CreateSourceRequest(
title=pub_name,
authors=pub_authors,
year=pub_year,
),
),
)
)
Expand All @@ -116,12 +119,14 @@ def _upload(
resp = handle_call(
create_table.sync_detailed(
client=client,
body=models.CreateTableRequest(
table_name=table_name,
columns=schema,
bibcode=bibcode,
datatype=models.DataType[table_type],
description=table_description,
body=action_description.apply(
models.CreateTableRequest(
table_name=table_name,
columns=schema,
bibcode=bibcode,
datatype=models.DataType[table_type],
description=table_description,
),
),
)
)
Expand Down Expand Up @@ -163,9 +168,11 @@ def process_chunk(data: pd.DataFrame) -> None:
_ = handle_call(
add_data.sync_detailed(
client=client,
body=models.AddDataRequest(
table_name=table_name,
data=request_data,
body=action_description.apply(
models.AddDataRequest(
table_name=table_name,
data=request_data,
),
),
)
)
Expand Down
5 changes: 5 additions & 0 deletions uploader/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from pydantic import BaseModel

import uploader.app.action_description as action_description
import uploader.app.report as report
from uploader import history
from uploader.app.log import logger
Expand Down Expand Up @@ -127,6 +128,9 @@ def report_func(event: report.Event) -> None:

def worker() -> None:
nonlocal final_status, final_message
token = action_description.set_current(
action_description.build(task_id, run_id, form.model_dump(mode="json")),
)
try:
defn.handler(form, report_func)
except TaskCancelledError:
Expand All @@ -137,6 +141,7 @@ def worker() -> None:
message = f"{e}\n\n{traceback.format_exc()}"
append_report_event(report.ErrorEvent(message=message))
finally:
action_description.reset_current(token)
run.done.set()
if defn.rerunnable and final_status is not None:
history.append_entry(
Expand Down
Loading