Terraform deploy may 2023 fixes (#620)
jdddog committed Jun 13, 2023
1 parent 19eb82b commit 431da66
Showing 6 changed files with 26 additions and 23 deletions.
7 changes: 4 additions & 3 deletions observatory-platform/observatory/platform/bigquery.py
@@ -43,7 +43,7 @@

# BigQuery single query byte limit.
# Daily limit is set in Terraform
BIGQUERY_SINGLE_QUERY_BYTE_LIMIT = 1024 * 1024 * 1024 * 1024 * 1 # 1 TiB
BIGQUERY_SINGLE_QUERY_BYTE_LIMIT = int(2 * 2**40) # 2 TiB


def assert_table_id(table_id: str):
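For context, 2 * 2**40 is 2,199,023,255,552 bytes, i.e. 2 TiB. A minimal sketch, assuming the google-cloud-bigquery client, of how a per-query byte cap like this is typically applied through maximum_bytes_billed; the project id, dataset, and SQL below are placeholders rather than values from this repository:

```python
from google.cloud import bigquery

BIGQUERY_SINGLE_QUERY_BYTE_LIMIT = int(2 * 2**40)  # 2 TiB = 2,199,023,255,552 bytes

# Hypothetical project id; the real client configuration lives elsewhere in the platform.
client = bigquery.Client(project="my-project")

# A query that would bill more than the limit fails instead of running.
job_config = bigquery.QueryJobConfig(maximum_bytes_billed=BIGQUERY_SINGLE_QUERY_BYTE_LIMIT)
query_job = client.query("SELECT * FROM `my-project.my_dataset.my_table`", job_config=job_config)
rows = query_job.result()
```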
@@ -919,10 +919,11 @@ def bq_upsert_records(
main_columns = bq_select_columns(table_id=main_table_id)
upsert_columns = bq_select_columns(table_id=upsert_table_id)

# Assert that the column names and data types in main_table and upsert_table are the same
# Assert that the column names and data types in main_table and upsert_table are the same and in the same order
# Must be in same order for upsert to work
assert (
main_columns == upsert_columns
), f"bq_upsert_records: columns in {main_table_id} do not match {upsert_table_id}"
), f"bq_upsert_records: columns in {main_table_id} do not match {upsert_table_id} or are not in the same order"

# Check that primary_key is in both tables
# The data_type of primary_key must match because of the above assert
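To see why order matters in the assert above, a small self-contained sketch; the column lists are invented, whereas the real code obtains them from bq_select_columns:

```python
# Columns as (name, data_type) pairs in table order (illustrative values only).
main_columns = [("id", "STRING"), ("title", "STRING"), ("year", "INT64")]
upsert_columns = [("id", "STRING"), ("year", "INT64"), ("title", "STRING")]

# Same columns and types, but not in the same order: an order-insensitive
# comparison passes, while the strict equality used by bq_upsert_records fails.
assert sorted(main_columns) == sorted(upsert_columns)
assert main_columns == upsert_columns, "columns do not match or are not in the same order"  # AssertionError
```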
16 changes: 2 additions & 14 deletions observatory-platform/observatory/platform/terraform/main.tf
@@ -321,7 +321,7 @@ resource "google_storage_bucket_iam_member" "observatory_airflow_bucket_observat

# Necessary to define the network so that the VMs can talk to the Cloud SQL database.
locals {
network_name = "observatory-network"
network_name = "ao-network"
}

resource "google_compute_network" "observatory_network" {
@@ -381,19 +381,6 @@ resource "google_service_networking_connection" "private_vpc_connection" {
depends_on = [google_project_service.services]
}

resource "random_id" "database_protector" {
count = var.environment == "production" ? 1 : 0
byte_length = 8
keepers = {
observatory_db_instance = google_sql_database_instance.observatory_db_instance.id
airflow_db = google_sql_database.airflow_db.id
users = google_sql_user.observatory_user.id
}
lifecycle {
prevent_destroy = true
}
}

resource "random_id" "airflow_db_name_suffix" {
byte_length = 4
}
@@ -417,6 +404,7 @@ resource "google_sql_database_instance" "observatory_db_instance" {
location = var.google_cloud.data_location
start_time = var.cloud_sql_database.backup_start_time
}
deletion_protection_enabled = true # Stops the machine being deleted at the GCP platform level
}
}

@@ -18,11 +18,13 @@ output "airflow_worker_vm_ip_address" {
output "airflow_main_vm_external_ip" {
value = try(google_compute_address.airflow_main_vm_static_external_ip, null)
description = "The external IP address of the Airflow Main VM."
sensitive = true
}

output "airflow_worker_vm_external_ip" {
value = try(google_compute_address.airflow_worker_vm_static_external_ip, null)
description = "The external IP address of the Airflow Worker VM."
sensitive = true
}

output "airflow_main_vm_script" {
@@ -314,6 +314,9 @@ def __init__(
:param tags: List of dag tags.
"""

if tags is None:
tags = []

super().__init__(
dag_id=dag_id,
start_date=start_date,
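The tags guard added above follows the standard Python idiom of defaulting to None instead of a mutable list; a minimal standalone illustration, where make_dag is a hypothetical function rather than part of this codebase:

```python
from typing import List, Optional


def make_dag(dag_id: str, tags: Optional[List[str]] = None) -> List[str]:
    # A literal default of tags=[] would be shared between every call,
    # so None is the sentinel and a fresh list is created per call.
    if tags is None:
        tags = []
    tags.append(dag_id)
    return tags


assert make_dag("dag_a") == ["dag_a"]
assert make_dag("dag_b") == ["dag_b"]  # with a shared default this would be ["dag_a", "dag_b"]
```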
@@ -392,9 +395,10 @@ def list_release_info(self, **kwargs):

# Create a dictionary to store the latest table IDs for each table name
latest_table_ids = dict()
snapshot_date = kwargs["next_execution_date"].subtract(microseconds=1).date()
snapshot_date = pendulum.datetime(snapshot_date.year, snapshot_date.month, snapshot_date.day)
snapshot_date = kwargs["data_interval_end"]
logging.info("Searching export tables")
for table_id in all_table_ids:
logging.info(f"Found: {table_id}")
_, _, _, table_name, shard_date = bq_table_id_parts(table_id)
if shard_date is None:
raise AirflowException(
@@ -403,22 +407,30 @@

# Only include tables made on or before the current snapshot_date
if shard_date > snapshot_date:
logging.info(f"Skipping as shard_date > snapshot_date: {table_id}")
continue

# Sort tables by their shard date
if table_name not in latest_table_ids:
logging.info(f"Adding to latest_table_ids: {table_id}")
latest_table_ids[table_name] = table_id
else:
latest_table_id = latest_table_ids[table_name]
_, _, _, _, latest_shard_date = bq_table_id_parts(latest_table_id)
if shard_date > latest_shard_date:
logging.info(f"Updating latest_table_ids: {table_id}")
latest_table_ids[shard_date] = table_id

# Make a list of the latest table_ids
logging.info("Exporting tables:")
table_ids = []
for _, table_id in latest_table_ids.items():
logging.info(table_id)
table_ids.append(table_id)

if len(table_ids) == 0:
raise AirflowException("No tables found to export")

# Push table ids and release date
ti: TaskInstance = kwargs["ti"]
ti.xcom_push(Workflow.RELEASE_INFO, {"snapshot_date": snapshot_date.format("YYYYMMDD"), "table_ids": table_ids})
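A self-contained sketch of what list_release_info is doing above: keep the latest shard per table name that falls on or before the snapshot date (now taken from data_interval_end rather than next_execution_date minus a microsecond), and fail when nothing qualifies. The table ids and dates are invented, and the real code parses ids with bq_table_id_parts:

```python
import pendulum

# In the real task, snapshot_date comes from the Airflow context's data_interval_end.
snapshot_date = pendulum.datetime(2023, 6, 11)

all_table_ids = [
    "project.dataset.author20230604",
    "project.dataset.author20230611",
    "project.dataset.funder20230611",
    "project.dataset.funder20230618",  # after the snapshot date, so skipped
]

latest_table_ids = {}
for table_id in all_table_ids:
    # Shard date is the trailing YYYYMMDD suffix of the table name.
    name_part = table_id.rsplit(".", 1)[1]
    table_name, shard_date = name_part[:-8], pendulum.from_format(name_part[-8:], "YYYYMMDD")
    if shard_date > snapshot_date:
        continue
    # Keep only the newest shard seen so far for each table name.
    current = latest_table_ids.get(table_name)
    if current is None or shard_date > current[0]:
        latest_table_ids[table_name] = (shard_date, table_id)

table_ids = [table_id for _, table_id in latest_table_ids.values()]
if not table_ids:
    raise ValueError("No tables found to export")

assert table_ids == ["project.dataset.author20230611", "project.dataset.funder20230611"]
```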
2 changes: 1 addition & 1 deletion tests/observatory/platform/cli/test_cli_functional.py
@@ -207,7 +207,7 @@ def copy_workflows_project(self, temp_dir: str):
test_fixtures_path("cli", self.workflows_package_name), os.path.join(temp_dir, self.workflows_package_name)
)

def assert_dags_loaded(self, expected_dag_ids: Set, config: ObservatoryConfig, dag_check_timeout: int = 30):
def assert_dags_loaded(self, expected_dag_ids: Set, config: ObservatoryConfig, dag_check_timeout: int = 180):
"""Assert that DAGs loaded into Airflow.
:param expected_dag_ids: the expected DAG ids.
@@ -393,7 +393,7 @@ def test_telescope(self, mock_kibana):

# Run Dummy Dags
execution_date = pendulum.datetime(year=2021, month=5, day=16)
snapshot_date = pendulum.datetime(year=2021, month=5, day=22)
snapshot_date = pendulum.datetime(year=2021, month=5, day=23)
expected_state = "success"
doi_dag = make_dummy_dag(dag_id_sensor, execution_date)
with env.create_dag_run(doi_dag, execution_date):
@@ -475,15 +475,15 @@ def test_telescope(self, mock_kibana):

# Test delete_stale_indices task
# Artificially load extra indices for ao-author
elastic.create_index("ao-author-20210523")
elastic.create_index("ao-author-20210524")
elastic.create_index("ao-author-20210525")
elastic.create_index("ao-author-20210526")
ti = env.run_task(workflow.delete_stale_indices.__name__)
self.assertEqual(expected_state, ti.state)
indices_after_cleanup = set(elastic.list_indices("ao-author-*"))
self.assertEqual(len(indices_after_cleanup), 2)
self.assertTrue("ao-author-20210526" in indices_after_cleanup)
self.assertTrue("ao-author-20210525" in indices_after_cleanup)
self.assertTrue("ao-author-20210524" in indices_after_cleanup)

# Test list create_kibana_index_patterns info task
expected_index_pattern_id = expected_alias_id
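The assertions above exercise delete_stale_indices; assuming the cleanup keeps the two newest date-suffixed indices per index name and removes the rest, the behaviour can be sketched as follows (index names are illustrative):

```python
# YYYYMMDD suffixes mean lexicographic order equals date order.
indices = [
    "ao-author-20210523",
    "ao-author-20210524",
    "ao-author-20210525",
    "ao-author-20210526",
]
num_keep = 2

keep = set(sorted(indices)[-num_keep:])
stale = [name for name in indices if name not in keep]

assert keep == {"ao-author-20210525", "ao-author-20210526"}
assert stale == ["ao-author-20210523", "ao-author-20210524"]
```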
