Skip to content

Commit

Permalink
Updated naming convention
Browse files Browse the repository at this point in the history
  • Loading branch information
keegansmith21 committed May 29, 2024
1 parent 9188399 commit afc5fa2
Showing 1 changed file with 32 additions and 32 deletions.
64 changes: 32 additions & 32 deletions observatory_platform/dataset_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
class DatasetRelease:
id: int
dag_id: str
dataset_id: str
entity_id: str
dag_run_id: str
data_interval_start: pendulum.DateTime
data_interval_end: pendulum.DateTime
Expand All @@ -55,7 +55,7 @@ def __init__(
self,
*,
dag_id: str,
dataset_id: str,
entity_id: str,
dag_run_id: str,
created: pendulum.DateTime,
modified: pendulum.DateTime,
Expand All @@ -72,7 +72,7 @@ def __init__(
"""Construct a DatasetRelease object.
:param dag_id: the DAG ID.
:param dataset_id: the dataset ID.
:param entity_id: the release's unique entity ID.
:param dag_run_id: the DAG's run ID.
:param created: datetime created in UTC.
:param modified: datetime modified in UTC.
Expand All @@ -89,7 +89,7 @@ def __init__(
"""

self.dag_id = dag_id
self.dataset_id = dataset_id
self.entity_id = entity_id
self.dag_run_id = dag_run_id
self.data_interval_start = data_interval_start
self.data_interval_end = data_interval_end
Expand All @@ -113,7 +113,7 @@ def from_dict(_dict: Dict) -> DatasetRelease:

return DatasetRelease(
dag_id=_dict["dag_id"],
dataset_id=_dict["dataset_id"],
entity_id=_dict["entity_id"],
dag_run_id=_dict["dag_run_id"],
created=bq_timestamp_to_pendulum(_dict["created"]),
modified=bq_timestamp_to_pendulum(_dict["modified"]),
Expand All @@ -136,7 +136,7 @@ def to_dict(self) -> Dict:

return dict(
dag_id=self.dag_id,
dataset_id=self.dataset_id,
entity_id=self.entity_id,
dag_run_id=self.dag_run_id,
created=self.created.to_iso8601_string(),
modified=self.modified.to_iso8601_string(),
Expand All @@ -160,31 +160,31 @@ def __eq__(self, other):
class DatasetAPI:
def __init__(
self,
project_id: str = None,
dataset_id: str = "dataset_api",
table_id: str = "dataset_releases",
bq_project_id: str = None,
bq_dataset_id: str = "dataset_api",
bq_table_id: str = "dataset_releases",
location: str = "us",
client: Optional[bigquery.Client] = None,
):
"""Create a DatasetAPI instance.
:param project_id: the BigQuery project ID.
:param dataset_id: the BigQuery dataset ID.
:param table_id: the BigQuery table ID.
:param bq_project_id: the BigQuery project ID.
:param bq_dataset_id: the BigQuery dataset ID.
:param bq_table_id: the BigQuery table ID.
:param location: the BigQuery dataset location.
:param client: Optional BigQuery client.
"""

parts = []
if project_id is None:
project_id = get_bigquery_default_project()
parts.append(project_id)
parts.append(dataset_id)
parts.append(table_id)

self.project_id = project_id
self.dataset_id = dataset_id
self.table_id = table_id
if bq_project_id is None:
bq_project_id = get_bigquery_default_project()
parts.append(bq_project_id)
parts.append(bq_dataset_id)
parts.append(bq_table_id)

self.bq_project_id = bq_project_id
self.bq_dataset_id = bq_dataset_id
self.bq_table_id = bq_table_id
self.location = location
self.client = client
self.full_table_id = ".".join(parts)
Expand All @@ -198,8 +198,8 @@ def seed_db(self):

# Create BigQuery dataset if it does not exist
bq_create_dataset(
project_id=self.project_id,
dataset_id=self.dataset_id,
project_id=self.bq_project_id,
dataset_id=self.bq_dataset_id,
location=self.location,
description="Observatory Platform Dataset Release API",
client=self.client,
Expand Down Expand Up @@ -232,12 +232,12 @@ def add_dataset_release(self, release: DatasetRelease):
raise Exception("Failed to add dataset release")

def get_dataset_releases(
self, *, dag_id: str, dataset_id: str, date_key: str = "created", limit: int | None = None
self, *, dag_id: str, entity_id: str, date_key: str = "created", limit: int | None = None
) -> List[DatasetRelease]:
"""Get a list of dataset releases for a given dataset.
:param dag_id: dag id.
:param dataset_id: Dataset id.
:param entity_id: Dataset id.
:param date_key: the date key to use when sorting by date. One of: "created", "modified", "data_interval_start",
"data_interval_end", "snapshot_date", "partition_date", "changefile_start_date" or "changefile_end_date".
:param limit: the maximum number of rows to return.
Expand All @@ -257,7 +257,7 @@ def get_dataset_releases(
if date_key not in valid_date_keys:
raise ValueError(f"get_dataset_releases: invalid date_key: {date_key}, should be one of: {valid_date_keys}")

sql = [f"SELECT * FROM `{self.full_table_id}` WHERE dag_id = '{dag_id}' AND dataset_id = '{dataset_id}'"]
sql = [f"SELECT * FROM `{self.full_table_id}` WHERE dag_id = '{dag_id}' AND entity_id = '{entity_id}'"]
sql.append(f"ORDER BY {date_key} DESC")
if limit is not None:
sql.append(f"LIMIT {limit}")
Expand All @@ -270,31 +270,31 @@ def get_dataset_releases(

return results

def get_latest_dataset_release(self, *, dag_id: str, dataset_id: str, date_key: str) -> Optional[DatasetRelease]:
def get_latest_dataset_release(self, *, dag_id: str, entity_id: str, date_key: str) -> Optional[DatasetRelease]:
"""Get the latest dataset release.
:param dag_id: the Airflow DAG id.
:param dataset_id: the dataset id.
:param entity_id: the dataset id.
:param date_key: the date key. One of: "created", "modified", "data_interval_start", "data_interval_end",
"snapshot_date", "partition_date", "changefile_start_date" or "changefile_end_date".
:return: the latest release or None if there is no release.
"""

releases = self.get_dataset_releases(dag_id=dag_id, dataset_id=dataset_id, date_key=date_key, limit=1)
releases = self.get_dataset_releases(dag_id=dag_id, entity_id=entity_id, date_key=date_key, limit=1)
if len(releases) == 0:
return None
return releases[0]

def is_first_release(self, *, dag_id: str, dataset_id: str) -> bool:
def is_first_release(self, *, dag_id: str, entity_id: str) -> bool:
"""Use the API to check whether this is the first release of a dataset, i.e., are there no dataset release records.
:param dag_id: DAG ID.
:param dataset_id: dataset id.
:param entity_id: dataset id.
:return: Whether this is the first release.
"""

results = bq_run_query(
f"SELECT COUNT(*) as count FROM `{self.full_table_id}` WHERE dag_id = '{dag_id}' AND dataset_id = '{dataset_id}'",
f"SELECT COUNT(*) as count FROM `{self.full_table_id}` WHERE dag_id = '{dag_id}' AND entity_id = '{entity_id}'",
client=self.client,
)
count = results[0]["count"]
Expand Down

0 comments on commit afc5fa2

Please sign in to comment.