Add aws_bucket_test_env
jdddog committed Apr 28, 2023 · 1 parent 7fdf53e · commit 0d4b381
Showing 2 changed files with 103 additions and 32 deletions.
@@ -74,8 +74,10 @@
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
from multiprocessing import Process
from typing import Dict, List, Optional
from typing import Tuple
from unittest.mock import patch

import boto3
import croniter
import freezegun
import google
@@ -108,6 +110,7 @@
from sftpserver.stub_sftp import StubServer, StubSFTPServer

from observatory.api.testing import ObservatoryApiEnvironment
from observatory.platform.bigquery import bq_create_dataset
from observatory.platform.bigquery import (
bq_sharded_table_id,
bq_load_table,
@@ -174,6 +177,59 @@ def save_empty_file(path: str, file_name: str) -> str:
return file_path


@contextlib.contextmanager
def bq_dataset_test_env(*, project_id: str, location: str, prefix: str):
client = bigquery.Client()
dataset_id = prefix + "_" + random_id()
try:
bq_create_dataset(project_id=project_id, dataset_id=dataset_id, location=location)
yield dataset_id
finally:
client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)


@contextlib.contextmanager
def aws_bucket_test_env(
*, aws_key: Tuple[str, str], prefix: str, expiration_days=1, region_name: str = "ap-southeast-2"
) -> str:
# Create an S3 client
aws_access_key_id, aws_secret_access_key = aws_key
session = boto3.Session(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name,
)
s3 = session.client("s3", region_name=region_name)
bucket_name = prefix + "-" + random_id()
try:
s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={"LocationConstraint": region_name})
# Set up the lifecycle configuration
lifecycle_configuration = {
"Rules": [
{"ID": "ExpireObjects", "Status": "Enabled", "Filter": {}, "Expiration": {"Days": expiration_days}}
]
}

# Apply the lifecycle configuration to the bucket
s3.put_bucket_lifecycle_configuration(Bucket=bucket_name, LifecycleConfiguration=lifecycle_configuration)
yield bucket_name
except Exception as e:
raise e
finally:
# Get a reference to the bucket
s3_resource = session.resource("s3")
bucket = s3_resource.Bucket(bucket_name)

# Delete all objects and versions in the bucket
bucket.objects.all().delete()
bucket.object_versions.all().delete()

# Delete the bucket
bucket.delete()

print(f"Bucket {bucket_name} deleted")


class ObservatoryEnvironment:
OBSERVATORY_HOME_KEY = "OBSERVATORY_HOME"

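For context, below is a minimal sketch of how the new aws_bucket_test_env context manager (added above in observatory.platform.observatory_environment) might be used in a test. Only aws_bucket_test_env itself and the standard boto3 calls come from real APIs; the environment variable names, bucket prefix, object key, and test body are illustrative assumptions, not part of this commit.

# Illustrative usage sketch only; the credentials source, prefix and object key are assumptions.
import os
import unittest

import boto3

from observatory.platform.observatory_environment import aws_bucket_test_env


class TestAwsBucketExample(unittest.TestCase):
    def test_put_and_get_object(self):
        # Assumes AWS credentials are supplied via environment variables.
        aws_key = (os.environ["AWS_ACCESS_KEY_ID"], os.environ["AWS_SECRET_ACCESS_KEY"])
        region_name = "ap-southeast-2"

        with aws_bucket_test_env(aws_key=aws_key, prefix="obs-test") as bucket_name:
            # The bucket only exists inside this block: objects are set to expire
            # after expiration_days and the bucket itself is deleted on exit.
            client = boto3.Session(
                aws_access_key_id=aws_key[0],
                aws_secret_access_key=aws_key[1],
                region_name=region_name,
            ).client("s3")
            client.put_object(Bucket=bucket_name, Key="hello.txt", Body=b"hello world")
            body = client.get_object(Bucket=bucket_name, Key="hello.txt")["Body"].read()
            self.assertEqual(b"hello world", body)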
79 changes: 47 additions & 32 deletions tests/observatory/platform/test_bigquery.py
@@ -14,7 +14,6 @@

# Author: James Diprose, Aniek Roelofs

import contextlib
import datetime
import json
import os
@@ -53,10 +52,7 @@
bq_query_bytes_budget_check,
)
from observatory.platform.gcs import gcs_delete_old_buckets_with_prefix, gcs_upload_file
from observatory.platform.observatory_environment import (
random_id,
test_fixtures_path,
)
from observatory.platform.observatory_environment import random_id, test_fixtures_path, bq_dataset_test_env


class TestGoogleCloudUtilsNoAuth(unittest.TestCase):
@@ -68,17 +64,6 @@ def test_bigquery_sharded_table_id(self):
self.assertEqual(expected, actual)


@contextlib.contextmanager
def bq_test_env(*, project_id: str, location: str, prefix: str):
client = bigquery.Client()
dataset_id = prefix + "_" + random_id()
try:
bq_create_dataset(project_id=project_id, dataset_id=dataset_id, location=location)
yield dataset_id
finally:
client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)


class TestBigQuery(unittest.TestCase):
__init__already = False

@@ -116,7 +101,7 @@ def test_bq_select_latest_table(self, mock_sel_table_suffixes):
self.assertEqual(table_id, actual_table_id)

def test_bq_create_dataset(self):
# This test doesn't use bq_test_env as it is testing bq_create_dataset
# This test doesn't use bq_dataset_test_env as it is testing bq_create_dataset
dataset_id = self.prefix + "_" + random_id()
client = bigquery.Client()
try:
@@ -131,7 +116,9 @@ def test_bq_create_empty_table(self):
test_data_path = test_fixtures_path("utils")
schema_file_path = os.path.join(test_data_path, "people_schema.json")

with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
# Create table with all parameters set
table_id = bq_table_id(self.gc_project_id, dataset_id, "with_schema")
table = bq_create_empty_table(
@@ -228,15 +215,19 @@ def test_bq_run_query(self):
self.assertRaises(Exception, bq_run_query, query, bytes_budget=1000)

def test_bq_copy_table(self):
with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
src_table_id = self.patents_table_id
dst_table_id = bq_table_id(self.gc_project_id, dataset_id, "figures")
success = bq_copy_table(src_table_id=src_table_id, dst_table_id=dst_table_id)
self.assertTrue(success)
self.assertTrue(bq_table_exists(table_id=dst_table_id))

def test_bq_create_view(self):
with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
# Create a view
query = f"SELECT * FROM `{self.patents_table_id}` LIMIT 3"
view_id = bq_table_id(self.gc_project_id, dataset_id, "test_view")
@@ -261,7 +252,9 @@ def test_bq_create_table_from_query_without_schema(self):
SELECT * FROM presidents
"""

with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
# Test with clustering fields
table_id = bq_table_id(self.gc_project_id, dataset_id, "clustered")
success = bq_create_table_from_query(
@@ -292,7 +285,9 @@ def test_bq_create_table_from_query_with_schema(self):
SELECT * FROM presidents
"""

with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
with CliRunner().isolated_filesystem():
schema = [
{"mode": "NULLABLE", "name": "name", "type": "STRING", "description": "Foo Bar"},
@@ -334,7 +329,9 @@ def test_bq_create_table_from_query_bytes_within_budget(self):
SELECT * FROM presidents
"""

with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
table_id = bq_table_id(self.gc_project_id, dataset_id, "presidents")
success = bq_create_table_from_query(
sql=query,
@@ -356,7 +353,9 @@ def test_bq_create_table_from_query_bytes_over_budget(self):
SELECT * FROM presidents
"""

with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
table_id = bq_table_id(self.gc_project_id, dataset_id, "presidents")
self.assertRaises(
Exception,
@@ -368,7 +367,9 @@ def test_bq_create_table_from_query_bytes_over_budget(self):
)

def test_bq_list_datasets_with_prefix(self):
with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
# Get list of datasets under project
dataset_list = bq_list_datasets_with_prefix()
dataset_names = [dataset.dataset_id for dataset in dataset_list]
@@ -410,7 +411,9 @@ def test_bq_select_table_shard_dates(self):
release_3 = pendulum.date(year=2019, month=7, day=1)
query = f"SELECT * FROM `{self.patents_table_id}` LIMIT 1"

with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
bq_create_table_from_query(
sql=query,
table_id=bq_sharded_table_id(self.gc_project_id, dataset_id, "fundref", release_1),
@@ -489,7 +492,9 @@ def test_bq_load_table(self):
json_file_path = os.path.join(test_data_path, "people.jsonl")
json_blob_name = f"people_{random_id()}.jsonl"

with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
try:
# Upload CSV to storage bucket
result, upload = gcs_upload_file(
@@ -578,7 +583,9 @@ def test_bq_select_columns(self):
)

def test_bq_upsert_records(self):
with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
# Create main table
main_table_id = bq_table_id(self.gc_project_id, dataset_id, "presidents")
sql = """
@@ -666,7 +673,9 @@ def insert_data(
)
self.assertTrue(success)

with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
main_table_id = bq_table_id(self.gc_project_id, dataset_id, "presidents")
delete_table_id = bq_table_id(self.gc_project_id, dataset_id, "presidents_deletes")
insert_data(main_table_id, delete_table_id)
@@ -690,7 +699,9 @@ def insert_data(
self.assertEqual(expected, results)

# Delete records: different primary key
with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
main_table_id = bq_table_id(self.gc_project_id, dataset_id, "presidents")
delete_table_id = bq_table_id(self.gc_project_id, dataset_id, "presidents_deletes")
insert_data(main_table_id, delete_table_id, delete_key="hello")
@@ -714,7 +725,9 @@ def insert_data(
self.assertEqual(expected, results)

# Delete records: add a prefix
with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
main_table_id = bq_table_id(self.gc_project_id, dataset_id, "presidents")
delete_table_id = bq_table_id(self.gc_project_id, dataset_id, "presidents_deletes")
insert_data(main_table_id, delete_table_id, main_prefix="President", delete_prefix="")
@@ -740,7 +753,9 @@ def insert_data(
self.assertEqual(expected, results)

def test_bq_snapshot(self):
with bq_test_env(project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix) as dataset_id:
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
# First copy data to src_table_id
src_table_id = bq_table_id(self.gc_project_id, dataset_id, "figures")
success = bq_copy_table(
