Commit

Updated naming convention (#658)
keegansmith21 committed May 31, 2024
1 parent 17e2949 commit 7b06f0f
Showing 3 changed files with 53 additions and 52 deletions.
64 changes: 32 additions & 32 deletions observatory_platform/dataset_api.py
@@ -37,7 +37,7 @@
 class DatasetRelease:
     id: int
     dag_id: str
-    dataset_id: str
+    entity_id: str
     dag_run_id: str
     data_interval_start: pendulum.DateTime
     data_interval_end: pendulum.DateTime
@@ -55,7 +55,7 @@ def __init__(
         self,
         *,
         dag_id: str,
-        dataset_id: str,
+        entity_id: str,
         dag_run_id: str,
         created: pendulum.DateTime,
         modified: pendulum.DateTime,
@@ -72,7 +72,7 @@ def __init__(
         """Construct a DatasetRelease object.

         :param dag_id: the DAG ID.
-        :param dataset_id: the dataset ID.
+        :param entity_id: the release's unique entity ID.
         :param dag_run_id: the DAG's run ID.
         :param created: datetime created in UTC.
         :param modified: datetime modified in UTC.
@@ -89,7 +89,7 @@ def __init__(
         """

         self.dag_id = dag_id
-        self.dataset_id = dataset_id
+        self.entity_id = entity_id
         self.dag_run_id = dag_run_id
         self.data_interval_start = data_interval_start
         self.data_interval_end = data_interval_end
@@ -113,7 +113,7 @@ def from_dict(_dict: Dict) -> DatasetRelease:

         return DatasetRelease(
             dag_id=_dict["dag_id"],
-            dataset_id=_dict["dataset_id"],
+            entity_id=_dict["entity_id"],
             dag_run_id=_dict["dag_run_id"],
             created=bq_timestamp_to_pendulum(_dict["created"]),
             modified=bq_timestamp_to_pendulum(_dict["modified"]),
@@ -136,7 +136,7 @@ def to_dict(self) -> Dict:

         return dict(
             dag_id=self.dag_id,
-            dataset_id=self.dataset_id,
+            entity_id=self.entity_id,
             dag_run_id=self.dag_run_id,
             created=self.created.to_iso8601_string(),
             modified=self.modified.to_iso8601_string(),
@@ -160,31 +160,31 @@ def __eq__(self, other):
 class DatasetAPI:
     def __init__(
         self,
-        project_id: str = None,
-        dataset_id: str = "dataset_api",
-        table_id: str = "dataset_releases",
+        bq_project_id: str = None,
+        bq_dataset_id: str = "dataset_api",
+        bq_table_id: str = "dataset_releases",
         location: str = "us",
         client: Optional[bigquery.Client] = None,
     ):
         """Create a DatasetAPI instance.

-        :param project_id: the BigQuery project ID.
-        :param dataset_id: the BigQuery dataset ID.
-        :param table_id: the BigQuery table ID.
+        :param bq_project_id: the BigQuery project ID.
+        :param bq_dataset_id: the BigQuery dataset ID.
+        :param bq_table_id: the BigQuery table ID.
         :param location: the BigQuery dataset location.
         :param client: Optional BigQuery client.
         """

         parts = []
-        if project_id is None:
-            project_id = get_bigquery_default_project()
-        parts.append(project_id)
-        parts.append(dataset_id)
-        parts.append(table_id)
-
-        self.project_id = project_id
-        self.dataset_id = dataset_id
-        self.table_id = table_id
+        if bq_project_id is None:
+            bq_project_id = get_bigquery_default_project()
+        parts.append(bq_project_id)
+        parts.append(bq_dataset_id)
+        parts.append(bq_table_id)
+
+        self.bq_project_id = bq_project_id
+        self.bq_dataset_id = bq_dataset_id
+        self.bq_table_id = bq_table_id
         self.location = location
         self.client = client
         self.full_table_id = ".".join(parts)
@@ -198,8 +198,8 @@ def seed_db(self):

         # Create BigQuery dataset if it does not exist
         bq_create_dataset(
-            project_id=self.project_id,
-            dataset_id=self.dataset_id,
+            project_id=self.bq_project_id,
+            dataset_id=self.bq_dataset_id,
             location=self.location,
             description="Observatory Platform Dataset Release API",
             client=self.client,
@@ -232,12 +232,12 @@ def add_dataset_release(self, release: DatasetRelease):
             raise Exception("Failed to add dataset release")

     def get_dataset_releases(
-        self, *, dag_id: str, dataset_id: str, date_key: str = "created", limit: int | None = None
+        self, *, dag_id: str, entity_id: str, date_key: str = "created", limit: int | None = None
     ) -> List[DatasetRelease]:
         """Get a list of dataset releases for a given dataset.

         :param dag_id: dag id.
-        :param dataset_id: Dataset id.
+        :param entity_id: Dataset id.
         :param date_key: the date key to use when sorting by date. One of: "created", "modified", "data_interval_start",
         "data_interval_end", "snapshot_date", "partition_date", "changefile_start_date" or "changefile_end_date".
         :param limit: the maximum number of rows to return.
@@ -257,7 +257,7 @@ def get_dataset_releases(
         if date_key not in valid_date_keys:
             raise ValueError(f"get_dataset_releases: invalid date_key: {date_key}, should be one of: {valid_date_keys}")

-        sql = [f"SELECT * FROM `{self.full_table_id}` WHERE dag_id = '{dag_id}' AND dataset_id = '{dataset_id}'"]
+        sql = [f"SELECT * FROM `{self.full_table_id}` WHERE dag_id = '{dag_id}' AND entity_id = '{entity_id}'"]
         sql.append(f"ORDER BY {date_key} DESC")
         if limit is not None:
             sql.append(f"LIMIT {limit}")
@@ -270,31 +270,31 @@

         return results

-    def get_latest_dataset_release(self, *, dag_id: str, dataset_id: str, date_key: str) -> Optional[DatasetRelease]:
+    def get_latest_dataset_release(self, *, dag_id: str, entity_id: str, date_key: str) -> Optional[DatasetRelease]:
         """Get the latest dataset release.

         :param dag_id: the Airflow DAG id.
-        :param dataset_id: the dataset id.
+        :param entity_id: the dataset id.
         :param date_key: the date key. One of: "created", "modified", "data_interval_start", "data_interval_end",
         "snapshot_date", "partition_date", "changefile_start_date" or "changefile_end_date".
         :return: the latest release or None if there is no release.
         """

-        releases = self.get_dataset_releases(dag_id=dag_id, dataset_id=dataset_id, date_key=date_key, limit=1)
+        releases = self.get_dataset_releases(dag_id=dag_id, entity_id=entity_id, date_key=date_key, limit=1)
         if len(releases) == 0:
             return None
         return releases[0]

-    def is_first_release(self, *, dag_id: str, dataset_id: str) -> bool:
+    def is_first_release(self, *, dag_id: str, entity_id: str) -> bool:
         """Use the API to check whether this is the first release of a dataset, i.e., are there no dataset release records.

         :param dag_id: DAG ID.
-        :param dataset_id: dataset id.
+        :param entity_id: dataset id.
         :return: Whether this is the first release.
         """

         results = bq_run_query(
-            f"SELECT COUNT(*) as count FROM `{self.full_table_id}` WHERE dag_id = '{dag_id}' AND dataset_id = '{dataset_id}'",
+            f"SELECT COUNT(*) as count FROM `{self.full_table_id}` WHERE dag_id = '{dag_id}' AND entity_id = '{entity_id}'",
             client=self.client,
         )
         count = results[0]["count"]
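
For code that consumes this API, the rename is a breaking change. A minimal migration sketch under the defaults shown above (the project ID, DAG ID, run ID and entity ID below are placeholders, not values from this commit):

```python
import pendulum

from observatory_platform.dataset_api import DatasetAPI, DatasetRelease

# BigQuery-related constructor arguments now carry a bq_ prefix.
api = DatasetAPI(bq_project_id="my-project")  # bq_dataset_id/bq_table_id keep their defaults
api.seed_db()  # create the BigQuery dataset and table if they do not exist

# Release lookups now key on entity_id instead of dataset_id.
if api.is_first_release(dag_id="doi_workflow", entity_id="doi"):
    now = pendulum.now()
    api.add_dataset_release(
        DatasetRelease(
            dag_id="doi_workflow",
            entity_id="doi",
            dag_run_id="manual__2024-05-31",
            created=now,
            modified=now,
        )
    )

latest = api.get_latest_dataset_release(dag_id="doi_workflow", entity_id="doi", date_key="created")
```
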
5 changes: 3 additions & 2 deletions observatory_platform/schema/dataset_release.json
@@ -6,7 +6,7 @@
     "description": "The Airflow DAG ID, e.g. doi_workflow"
   },
   {
-    "name": "dataset_id",
+    "name": "entity_id",
     "mode": "REQUIRED",
     "type": "STRING",
     "description": "A unique identifier to represent the dataset being processed."
@@ -83,4 +83,5 @@
     "type": "TIMESTAMP",
     "description": "The date that this record was modified."
   }
-]
+]
+
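For illustration, a release row under the renamed schema, shaped the way DatasetRelease.to_dict() emits it (all values are invented, and optional date fields are omitted):

```python
# Hypothetical row in the dataset_releases table after this commit:
# the second schema field is entity_id, previously dataset_id.
row = {
    "dag_id": "doi_workflow",
    "entity_id": "doi",  # renamed from "dataset_id"
    "dag_run_id": "manual__2024-05-31",
    "created": "2024-05-31T00:00:00+00:00",
    "modified": "2024-05-31T00:00:00+00:00",
}
```
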
36 changes: 18 additions & 18 deletions observatory_platform/tests/test_dataset_api.py
@@ -32,18 +32,18 @@ def __init__(self, *args, **kwargs):
     def test_add_dataset_release(self):
         env = SandboxEnvironment(project_id=self.project_id, data_location=self.data_location)
         bq_dataset_id = env.add_dataset(prefix="dataset_api")
-        api = DatasetAPI(project_id=self.project_id, dataset_id=bq_dataset_id, location=self.data_location)
+        api = DatasetAPI(bq_project_id=self.project_id, bq_dataset_id=bq_dataset_id, location=self.data_location)
         with env.create():
             api.seed_db()

             # Add dataset release
             dag_id = "doi_workflow"
-            dataset_id = "doi"
+            entity_id = "doi"

             dt = pendulum.now()
             expected = DatasetRelease(
                 dag_id=dag_id,
-                dataset_id=dataset_id,
+                entity_id=entity_id,
                 dag_run_id="test",
                 created=dt,
                 modified=dt,
@@ -68,19 +68,19 @@ def test_add_dataset_release(self):
     def test_get_dataset_releases(self):
         env = SandboxEnvironment(project_id=self.project_id, data_location=self.data_location)
         bq_dataset_id = env.add_dataset(prefix="dataset_api")
-        api = DatasetAPI(project_id=self.project_id, dataset_id=bq_dataset_id, location=self.data_location)
+        api = DatasetAPI(bq_project_id=self.project_id, bq_dataset_id=bq_dataset_id, location=self.data_location)
         expected = []
         with env.create():
             api.seed_db()

             # Create dataset releases
             dag_id = "doi_workflow"
-            dataset_id = "doi"
+            entity_id = "doi"
             for i in range(10):
                 dt = pendulum.now()
                 release = DatasetRelease(
                     dag_id=dag_id,
-                    dataset_id=dataset_id,
+                    entity_id=entity_id,
                     dag_run_id="test",
                     created=dt,
                     modified=dt,
@@ -92,60 +92,60 @@
             expected.sort(key=lambda r: r.created, reverse=True)

             # Get releases
-            actual = api.get_dataset_releases(dag_id=dag_id, dataset_id=dataset_id)
+            actual = api.get_dataset_releases(dag_id=dag_id, entity_id=entity_id)
             self.assertListEqual(expected, actual)

     def test_is_first_release(self):
         env = SandboxEnvironment(project_id=self.project_id, data_location=self.data_location)
         bq_dataset_id = env.add_dataset(prefix="dataset_api")
-        api = DatasetAPI(project_id=self.project_id, dataset_id=bq_dataset_id, location=self.data_location)
+        api = DatasetAPI(bq_project_id=self.project_id, bq_dataset_id=bq_dataset_id, location=self.data_location)
         with env.create():
             api.seed_db()

             dag_id = "doi_workflow"
-            dataset_id = "doi"
+            entity_id = "doi"

             # Is first release
-            is_first = api.is_first_release(dag_id=dag_id, dataset_id=dataset_id)
+            is_first = api.is_first_release(dag_id=dag_id, entity_id=entity_id)
             self.assertTrue(is_first)

             # Not first release
             dt = pendulum.now()
             release = DatasetRelease(
                 dag_id=dag_id,
-                dataset_id=dataset_id,
+                entity_id=entity_id,
                 dag_run_id="test",
                 created=dt,
                 modified=dt,
             )
             api.add_dataset_release(release)
-            is_first = api.is_first_release(dag_id=dag_id, dataset_id=dataset_id)
+            is_first = api.is_first_release(dag_id=dag_id, entity_id=entity_id)
             self.assertFalse(is_first)

     def test_get_latest_dataset_release(self):
         dag_id = "doi_workflow"
-        dataset_id = "doi"
+        entity_id = "doi"
         dt = pendulum.now()
         releases = [
             DatasetRelease(
                 dag_id=dag_id,
-                dataset_id=dataset_id,
+                entity_id=entity_id,
                 dag_run_id="test",
                 created=dt,
                 modified=dt,
                 snapshot_date=pendulum.datetime(2022, 1, 1),
             ),
             DatasetRelease(
                 dag_id=dag_id,
-                dataset_id=dataset_id,
+                entity_id=entity_id,
                 dag_run_id="test",
                 created=dt,
                 modified=dt,
                 snapshot_date=pendulum.datetime(2023, 1, 1),
             ),
             DatasetRelease(
                 dag_id=dag_id,
-                dataset_id=dataset_id,
+                entity_id=entity_id,
                 dag_run_id="test",
                 created=dt,
                 modified=dt,
@@ -154,7 +154,7 @@
         ]
         env = SandboxEnvironment(project_id=self.project_id, data_location=self.data_location)
         bq_dataset_id = env.add_dataset(prefix="dataset_api")
-        api = DatasetAPI(project_id=self.project_id, dataset_id=bq_dataset_id, location=self.data_location)
+        api = DatasetAPI(bq_project_id=self.project_id, bq_dataset_id=bq_dataset_id, location=self.data_location)

         with env.create():
             api.seed_db()
@@ -163,7 +163,7 @@
             for release in releases:
                 api.add_dataset_release(release)

-            latest = api.get_latest_dataset_release(dag_id=dag_id, dataset_id=dataset_id, date_key="snapshot_date")
+            latest = api.get_latest_dataset_release(dag_id=dag_id, entity_id=entity_id, date_key="snapshot_date")
             self.assertEqual(releases[-1], latest)

     def test_build_schedule(self):
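
Beyond the renamed keyword arguments exercised by these tests, a short standalone sketch of sorted retrieval under the new names (the project and entity values are placeholders; date_key and limit are example choices):

```python
from observatory_platform.dataset_api import DatasetAPI

api = DatasetAPI(bq_project_id="my-project")
# date_key must be one of the valid keys listed in get_dataset_releases' docstring.
releases = api.get_dataset_releases(
    dag_id="doi_workflow",
    entity_id="doi",
    date_key="snapshot_date",
    limit=3,
)
for release in releases:
    print(release.entity_id, release.snapshot_date)
```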
