Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions app/data/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
CIResultObjectCollision,
CIResultObjectExisting,
CIResultObjectNew,
DesignationRecord,
ICRSRecord,
NatureRecord,
Record,
RecordCrossmatch,
RecordWithPGC,
RedshiftRecord,
)
from app.data.model.redshift import RedshiftCatalogObject
from app.data.model.table import (
Expand Down Expand Up @@ -46,7 +49,10 @@
"CIResultObjectCollision",
"CIResultObjectExisting",
"CIResultObjectNew",
"DesignationRecord",
"ICRSRecord",
"NatureRecord",
"RedshiftRecord",
"Record",
"RecordWithPGC",
"Layer2CatalogObject",
Expand Down
1 change: 0 additions & 1 deletion app/data/model/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class RawCatalog(enum.Enum):
aggregated data on layer 2.
"""

ALL = "all"
ICRS = "icrs"
DESIGNATION = "designation"
REDSHIFT = "redshift"
Expand Down
25 changes: 25 additions & 0 deletions app/data/model/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,28 @@ class NatureRecord:
pgc: int
record_id: str
type_name: str


@dataclass
class ICRSRecord:
    """A per-record ICRS astrometry entry for an object identified by its PGC number."""

    # PGC number of the object this record belongs to
    pgc: int
    # identifier of the originating layer-0 record
    record_id: str
    # right ascension (units not specified here — presumably degrees, TODO confirm)
    ra: float
    # uncertainty of ra
    e_ra: float
    # declination (same unit convention as ra)
    dec: float
    # uncertainty of dec
    e_dec: float


@dataclass
class RedshiftRecord:
    """A per-record redshift (cz) entry for an object identified by its PGC number."""

    # PGC number of the object this record belongs to
    pgc: int
    # identifier of the originating layer-0 record
    record_id: str
    # redshift expressed as velocity c*z (units not specified here — TODO confirm)
    cz: float
    # uncertainty of cz
    e_cz: float


@dataclass
class DesignationRecord:
    """A per-record designation (catalog name) entry for an object identified by its PGC number."""

    # PGC number of the object this record belongs to
    pgc: int
    # identifier of the originating layer-0 record
    record_id: str
    # designation string of the object
    design: str
110 changes: 68 additions & 42 deletions app/data/repositories/layer1.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,70 +76,96 @@ def save_data(self, records: list[model.Record]) -> None:
object_count=len(table_items),
)

def get_new_observations(
self, dt: datetime.datetime, limit: int, offset: int, catalog: model.RawCatalog
) -> list[model.RecordWithPGC]:
"""
Returns all objects that were modified since `dt`.
`limit` is the number of PGC numbers to select, not the final number of objects.
As such, this function will return around
`limit * (average_number_of_observations_per_PGC)` objects, not `limit`.

`offset` is the first PGC number from which to start selecting.

This makes the function safe for aggregation - for each returned PGC all of its objects will be returned.
"""
object_cls = model.get_catalog_object_type(catalog)

query = f"""SELECT *
FROM {object_cls.layer1_table()} AS l1
def get_new_nature_records(self, dt: datetime.datetime, limit: int, offset: int) -> list[model.NatureRecord]:
    """Return nature records modified since `dt`, paginated by PGC number.

    `limit` bounds the number of distinct PGC numbers selected, not the number
    of returned records: every record belonging to a selected PGC is returned,
    which keeps the result safe for per-PGC aggregation. `offset` is the PGC
    number after which the selection starts.
    """
    query = """SELECT o.pgc, l1.record_id, l1.type_name
        FROM nature.data AS l1
        JOIN layer0.records AS o ON l1.record_id = o.id
        WHERE o.pgc IN (
            SELECT DISTINCT o.pgc
            FROM nature.data AS l1
            JOIN layer0.records AS o ON l1.record_id = o.id
            WHERE o.modification_time > %s AND o.pgc > %s
            ORDER BY o.pgc
            LIMIT %s
        )
        ORDER BY o.pgc ASC"""

    records: list[model.NatureRecord] = []
    for row in self._storage.query(query, params=[dt, offset, limit]):
        records.append(
            model.NatureRecord(
                pgc=int(row["pgc"]),
                record_id=row["record_id"],
                type_name=row["type_name"],
            )
        )
    return records

record_data: dict[tuple[int, str], list[model.CatalogObject]] = {}

for row in rows:
record_id = row.pop("record_id")
pgc = int(row.pop("pgc"))
catalog_object = object_cls.from_layer1(row)

key = (pgc, record_id)
if key not in record_data:
record_data[key] = []
record_data[key].append(catalog_object)

records: list[model.RecordWithPGC] = []
for (pgc, record_id), catalog_objects in record_data.items():
record_info = model.Record(id=record_id, data=catalog_objects)
records.append(model.RecordWithPGC(pgc, record_info))

return records
def get_new_icrs_records(self, dt: datetime.datetime, limit: int, offset: int) -> list[model.ICRSRecord]:
    """Return ICRS astrometry records modified since `dt`, paginated by PGC number.

    `limit` bounds the number of distinct PGC numbers selected, not the number
    of returned records: every record belonging to a selected PGC is returned,
    which keeps the result safe for per-PGC aggregation. `offset` is the PGC
    number after which the selection starts.
    """
    query = """SELECT o.pgc, l1.record_id, l1.ra, l1.e_ra, l1.dec, l1.e_dec
        FROM icrs.data AS l1
        JOIN layer0.records AS o ON l1.record_id = o.id
        WHERE o.pgc IN (
            SELECT DISTINCT o.pgc
            FROM icrs.data AS l1
            JOIN layer0.records AS o ON l1.record_id = o.id
            WHERE o.modification_time > %s AND o.pgc > %s
            ORDER BY o.pgc
            LIMIT %s
        )
        ORDER BY o.pgc ASC"""

    records: list[model.ICRSRecord] = []
    for row in self._storage.query(query, params=[dt, offset, limit]):
        records.append(
            model.ICRSRecord(
                pgc=int(row["pgc"]),
                record_id=row["record_id"],
                ra=float(row["ra"]),
                e_ra=float(row["e_ra"]),
                dec=float(row["dec"]),
                e_dec=float(row["e_dec"]),
            )
        )
    return records

def get_new_nature_records(self, dt: datetime.datetime, limit: int, offset: int) -> list[model.NatureRecord]:
query = """SELECT o.pgc, l1.record_id, l1.type_name
FROM nature.data AS l1
def get_new_redshift_records(self, dt: datetime.datetime, limit: int, offset: int) -> list[model.RedshiftRecord]:
    """Return redshift (cz) records modified since `dt`, paginated by PGC number.

    `limit` bounds the number of distinct PGC numbers selected, not the number
    of returned records: every record belonging to a selected PGC is returned,
    which keeps the result safe for per-PGC aggregation. `offset` is the PGC
    number after which the selection starts.
    """
    query = """SELECT o.pgc, l1.record_id, l1.cz, l1.e_cz
        FROM cz.data AS l1
        JOIN layer0.records AS o ON l1.record_id = o.id
        WHERE o.pgc IN (
            SELECT DISTINCT o.pgc
            FROM cz.data AS l1
            JOIN layer0.records AS o ON l1.record_id = o.id
            WHERE o.modification_time > %s AND o.pgc > %s
            ORDER BY o.pgc
            LIMIT %s
        )
        ORDER BY o.pgc ASC"""

    records: list[model.RedshiftRecord] = []
    for row in self._storage.query(query, params=[dt, offset, limit]):
        records.append(
            model.RedshiftRecord(
                pgc=int(row["pgc"]),
                record_id=row["record_id"],
                cz=float(row["cz"]),
                e_cz=float(row["e_cz"]),
            )
        )
    return records

def get_new_designation_records(
    self, dt: datetime.datetime, limit: int, offset: int
) -> list[model.DesignationRecord]:
    """Return designation records modified since `dt`, paginated by PGC number.

    `limit` bounds the number of distinct PGC numbers selected, not the number
    of returned records: every record belonging to a selected PGC is returned,
    which keeps the result safe for per-PGC aggregation. `offset` is the PGC
    number after which the selection starts.
    """
    query = """SELECT o.pgc, l1.record_id, l1.design
        FROM designation.data AS l1
        JOIN layer0.records AS o ON l1.record_id = o.id
        WHERE o.pgc IN (
            SELECT DISTINCT o.pgc
            FROM designation.data AS l1
            JOIN layer0.records AS o ON l1.record_id = o.id
            WHERE o.modification_time > %s AND o.pgc > %s
            ORDER BY o.pgc
            LIMIT %s
        )
        ORDER BY o.pgc ASC"""

    records: list[model.DesignationRecord] = []
    for row in self._storage.query(query, params=[dt, offset, limit]):
        records.append(
            model.DesignationRecord(
                pgc=int(row["pgc"]),
                record_id=row["record_id"],
                design=row["design"],
            )
        )
    return records

def query_records(
self,
Expand Down
42 changes: 8 additions & 34 deletions app/data/repositories/layer2/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ def update_last_update_time(self, dt: datetime.datetime, catalog: model.RawCatal
params=[dt, catalog.value],
)

def get_column_units(self, schema: str, table: str) -> dict[str, str]:
    """Return a mapping column name -> unit for the columns of `schema`.`table`.

    Only columns whose metadata declares a non-NULL 'unit' entry are included.
    """
    query = (
        "SELECT column_name, param->>'unit' as unit FROM meta.column_info "
        "WHERE schema_name = %s AND table_name = %s AND param->>'unit' IS NOT NULL"
    )
    units: dict[str, str] = {}
    for row in self._storage.query(query, params=[schema, table]):
        units[row["column_name"]] = row["unit"]
    return units

def get_orphaned_pgcs(self, catalogs: list[model.RawCatalog]) -> dict[str, list[int]]:
result: dict[str, list[int]] = {}
for catalog in catalogs:
Expand Down Expand Up @@ -64,40 +72,6 @@ def remove_pgcs(self, catalogs: list[model.RawCatalog], pgcs: list[int]) -> None
query = f"DELETE FROM {layer2_table} WHERE pgc = ANY(%s)"
self._storage.exec(query, params=[pgcs])

def save_data(self, objects: list[model.Layer2CatalogObject]):
objects_by_table: dict[str, list[model.Layer2CatalogObject]] = {}
for obj in objects:
table = obj.catalog_object.layer2_table()
if table not in objects_by_table:
objects_by_table[table] = []
objects_by_table[table].append(obj)

for table, table_objects in objects_by_table.items():
if len(table_objects) == 0:
continue

columns = table_objects[0].catalog_object.layer2_keys() + ["pgc"]

params = []
for obj in table_objects:
data = obj.catalog_object.layer2_data()
data["pgc"] = obj.pgc

params.extend([data.get(column, None) for column in columns])

placeholders = f"({','.join(['%s'] * len(columns))})"

value_groups = ",".join([placeholders] * len(table_objects))
on_conflict_statement = ", ".join([f"{column} = EXCLUDED.{column}" for column in columns])

query = f"""
INSERT INTO {table} ({", ".join(columns)})
VALUES {value_groups}
ON CONFLICT (pgc) DO UPDATE SET {on_conflict_statement}
"""

self._storage.exec(query, params=params)

def save(self, table: str, columns: list[str], pgcs: list[int], data: list[list[Any]]) -> None:
if not pgcs:
return
Expand Down
3 changes: 3 additions & 0 deletions app/lib/logging/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from app.lib.logging.table import print_table

__all__ = ["print_table"]
55 changes: 55 additions & 0 deletions app/lib/logging/table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from collections.abc import Sequence


def print_table(
headers: Sequence[str],
rows: Sequence[Sequence[str | int]],
sections: Sequence[tuple[str, Sequence[Sequence[str | int]]]] | None = None,
min_column_widths: Sequence[int] | None = None,
) -> None:
ncols = len(headers)
min_widths = list(min_column_widths) if min_column_widths else [0] * ncols
while len(min_widths) < ncols:
min_widths.append(0)

def cell_str(c: str | int) -> str:
return str(c)

def col_width(col_index: int, extra_rows: Sequence[Sequence[str | int]]) -> int:
candidates = [len(headers[col_index]), min_widths[col_index]]
for row in rows:
candidates.append(len(cell_str(row[col_index])))
if sections:
for title, section_rows in sections:
if col_index == 0:
candidates.append(len(title))
for row in section_rows:
candidates.append(len(cell_str(row[col_index])))
for row in extra_rows:
candidates.append(len(cell_str(row[col_index])))
return max(candidates)

widths = [col_width(i, ()) for i in range(ncols)]
alignments = ["<"] + [">"] * (ncols - 1)

def sep_line() -> str:
return "+" + "+".join("-" * (w + 2) for w in widths) + "+"

def data_line(cells: Sequence[str | int], align: Sequence[str] | None = None) -> str:
al = align if align else alignments
return "| " + " | ".join(f"{cell_str(c):{al[i]}{widths[i]}}" for i, c in enumerate(cells)) + " |"

lines = [sep_line(), data_line(headers), sep_line()]
for row in rows:
lines.append(data_line(row))
lines.append(sep_line())

if sections:
for title, section_rows in sections:
lines.append(data_line([title] + [""] * (ncols - 1)))
for row in section_rows:
lines.append(data_line(row))
lines.append(sep_line())

for line in lines:
print(line)
Loading
Loading