diff --git a/observatory_platform/schema/dataset_release.json b/observatory_platform/schema/dataset_release.json
index fb787c20a..df7ce0873 100644
--- a/observatory_platform/schema/dataset_release.json
+++ b/observatory_platform/schema/dataset_release.json
@@ -6,7 +6,7 @@
     "description": "The Airflow DAG ID, e.g. doi_workflow"
   },
   {
-    "name": "dataset_id",
+    "name": "entity_id",
     "mode": "REQUIRED",
     "type": "STRING",
     "description": "A unique identifier to represent the dataset being processed."
@@ -83,4 +83,5 @@
     "type": "TIMESTAMP",
     "description": "The date that this record was modified."
   }
-]
\ No newline at end of file
+]
+
diff --git a/observatory_platform/tests/test_dataset_api.py b/observatory_platform/tests/test_dataset_api.py
index 89c2bdcd7..2866bd6dd 100644
--- a/observatory_platform/tests/test_dataset_api.py
+++ b/observatory_platform/tests/test_dataset_api.py
@@ -32,18 +32,18 @@ def __init__(self, *args, **kwargs):
 
     def test_add_dataset_release(self):
         env = SandboxEnvironment(project_id=self.project_id, data_location=self.data_location)
         bq_dataset_id = env.add_dataset(prefix="dataset_api")
-        api = DatasetAPI(project_id=self.project_id, dataset_id=bq_dataset_id, location=self.data_location)
+        api = DatasetAPI(bq_project_id=self.project_id, bq_dataset_id=bq_dataset_id, location=self.data_location)
         with env.create():
             api.seed_db()
 
             # Add dataset release
             dag_id = "doi_workflow"
-            dataset_id = "doi"
+            entity_id = "doi"
             dt = pendulum.now()
             expected = DatasetRelease(
                 dag_id=dag_id,
-                dataset_id=dataset_id,
+                entity_id=entity_id,
                 dag_run_id="test",
                 created=dt,
                 modified=dt,
@@ -68,19 +68,19 @@ def test_add_dataset_release(self):
     def test_get_dataset_releases(self):
         env = SandboxEnvironment(project_id=self.project_id, data_location=self.data_location)
         bq_dataset_id = env.add_dataset(prefix="dataset_api")
-        api = DatasetAPI(project_id=self.project_id, dataset_id=bq_dataset_id, location=self.data_location)
+        api = DatasetAPI(bq_project_id=self.project_id, bq_dataset_id=bq_dataset_id, location=self.data_location)
         expected = []
         with env.create():
             api.seed_db()
 
             # Create dataset releases
             dag_id = "doi_workflow"
-            dataset_id = "doi"
+            entity_id = "doi"
             for i in range(10):
                 dt = pendulum.now()
                 release = DatasetRelease(
                     dag_id=dag_id,
-                    dataset_id=dataset_id,
+                    entity_id=entity_id,
                     dag_run_id="test",
                     created=dt,
                     modified=dt,
@@ -92,44 +92,44 @@
             expected.sort(key=lambda r: r.created, reverse=True)
 
             # Get releases
-            actual = api.get_dataset_releases(dag_id=dag_id, dataset_id=dataset_id)
+            actual = api.get_dataset_releases(dag_id=dag_id, entity_id=entity_id)
             self.assertListEqual(expected, actual)
 
     def test_is_first_release(self):
         env = SandboxEnvironment(project_id=self.project_id, data_location=self.data_location)
         bq_dataset_id = env.add_dataset(prefix="dataset_api")
-        api = DatasetAPI(project_id=self.project_id, dataset_id=bq_dataset_id, location=self.data_location)
+        api = DatasetAPI(bq_project_id=self.project_id, bq_dataset_id=bq_dataset_id, location=self.data_location)
         with env.create():
             api.seed_db()
 
             dag_id = "doi_workflow"
-            dataset_id = "doi"
+            entity_id = "doi"
 
             # Is first release
-            is_first = api.is_first_release(dag_id=dag_id, dataset_id=dataset_id)
+            is_first = api.is_first_release(dag_id=dag_id, entity_id=entity_id)
             self.assertTrue(is_first)
 
             # Not first release
             dt = pendulum.now()
             release = DatasetRelease(
                 dag_id=dag_id,
-                dataset_id=dataset_id,
+                entity_id=entity_id,
                 dag_run_id="test",
                 created=dt,
                 modified=dt,
             )
             api.add_dataset_release(release)
-            is_first = api.is_first_release(dag_id=dag_id, dataset_id=dataset_id)
+            is_first = api.is_first_release(dag_id=dag_id, entity_id=entity_id)
             self.assertFalse(is_first)
 
     def test_get_latest_dataset_release(self):
         dag_id = "doi_workflow"
-        dataset_id = "doi"
+        entity_id = "doi"
         dt = pendulum.now()
         releases = [
             DatasetRelease(
                 dag_id=dag_id,
-                dataset_id=dataset_id,
+                entity_id=entity_id,
                 dag_run_id="test",
                 created=dt,
                 modified=dt,
@@ -137,7 +137,7 @@ def test_get_latest_dataset_release(self):
             ),
             DatasetRelease(
                 dag_id=dag_id,
-                dataset_id=dataset_id,
+                entity_id=entity_id,
                 dag_run_id="test",
                 created=dt,
                 modified=dt,
@@ -145,7 +145,7 @@ def test_get_latest_dataset_release(self):
             ),
             DatasetRelease(
                 dag_id=dag_id,
-                dataset_id=dataset_id,
+                entity_id=entity_id,
                 dag_run_id="test",
                 created=dt,
                 modified=dt,
@@ -154,7 +154,7 @@ def test_get_latest_dataset_release(self):
             ),
         ]
         env = SandboxEnvironment(project_id=self.project_id, data_location=self.data_location)
         bq_dataset_id = env.add_dataset(prefix="dataset_api")
-        api = DatasetAPI(project_id=self.project_id, dataset_id=bq_dataset_id, location=self.data_location)
+        api = DatasetAPI(bq_project_id=self.project_id, bq_dataset_id=bq_dataset_id, location=self.data_location)
         with env.create():
             api.seed_db()
@@ -163,7 +163,7 @@ def test_get_latest_dataset_release(self):
             for release in releases:
                 api.add_dataset_release(release)
 
-            latest = api.get_latest_dataset_release(dag_id=dag_id, dataset_id=dataset_id, date_key="snapshot_date")
+            latest = api.get_latest_dataset_release(dag_id=dag_id, entity_id=entity_id, date_key="snapshot_date")
             self.assertEqual(releases[-1], latest)
 
     def test_build_schedule(self):
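For reference (not part of the patch): a minimal sketch of how calling code adapts to the renamed arguments in this change. The import path, project ID, and dataset values below are assumptions for illustration, not taken from the repository; the method and parameter names are as they appear in the diff above.

    import pendulum

    # Assumed import path; adjust to wherever DatasetAPI and DatasetRelease actually live.
    from observatory_platform.dataset_api import DatasetAPI, DatasetRelease

    # Constructor arguments renamed: project_id -> bq_project_id, dataset_id -> bq_dataset_id.
    api = DatasetAPI(bq_project_id="my-gcp-project", bq_dataset_id="dataset_api", location="us")
    api.seed_db()

    now = pendulum.now()
    release = DatasetRelease(
        dag_id="doi_workflow",
        entity_id="doi",  # was dataset_id; the schema field in dataset_release.json is renamed to match
        dag_run_id="manual__example",
        created=now,
        modified=now,
    )
    api.add_dataset_release(release)

    # Query methods now key on entity_id rather than dataset_id.
    releases = api.get_dataset_releases(dag_id="doi_workflow", entity_id="doi")
    first = api.is_first_release(dag_id="doi_workflow", entity_id="doi")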