From 62f710dce9f898cc43bc497580aca23f9ecd4a8d Mon Sep 17 00:00:00 2001 From: Charini Nanayakkara Date: Tue, 7 Jul 2020 09:26:53 +1000 Subject: [PATCH 1/5] Initial implementation of sub provider retrieval from Smithsonian at API level --- .../dags/provider_api_scripts/smithsonian.py | 27 ++++++- .../provider_api_scripts/test_smithsonian.py | 31 +++++--- .../dags/util/loader/provider_details.py | 73 +++++++++++++++++++ 3 files changed, 118 insertions(+), 13 deletions(-) diff --git a/src/cc_catalog_airflow/dags/provider_api_scripts/smithsonian.py b/src/cc_catalog_airflow/dags/provider_api_scripts/smithsonian.py index c36d811f..8325aa7e 100644 --- a/src/cc_catalog_airflow/dags/provider_api_scripts/smithsonian.py +++ b/src/cc_catalog_airflow/dags/provider_api_scripts/smithsonian.py @@ -15,6 +15,8 @@ from common.storage import image from common import requester +from util.loader import provider_details as prov + logger = logging.getLogger(__name__) API_KEY = os.getenv('DATA_GOV_API_KEY') @@ -24,7 +26,9 @@ API_ROOT = 'https://api.si.edu/openaccess/api/v1.0/' SEARCH_ENDPOINT = API_ROOT + 'search' UNITS_ENDPOINT = API_ROOT + 'terms/unit_code' -PROVIDER = 'smithsonian' +PROVIDER = prov.SMITHSONIAN_DEFAULT_PROVIDER +# SUB_PROVIDERS is a collection of all providers within smithsonian +SUB_PROVIDERS = prov.SMITHSONIAN_SUB_PROVIDERS ZERO_URL = 'https://creativecommons.org/publicdomain/zero/1.0/' DEFAULT_PARAMS = { 'api_key': API_KEY, @@ -218,13 +222,16 @@ def _process_response_json(response_json): for row in rows: image_list = _get_image_list(row) if image_list: + meta_data = _extract_meta_data(row) + source = _extract_source(meta_data) total_images = _process_image_list( image_list, _get_foreign_landing_url(row), _get_title(row), _get_creator(row), - _extract_meta_data(row), - _extract_tags(row) + meta_data, + _extract_tags(row), + source ) return total_images @@ -313,6 +320,16 @@ def _extract_meta_data(row, description_types=DESCRIPTION_TYPES): return {k: v for (k, v) in meta_data.items() if v is not None} +def _extract_source(meta_data, sub_providers=SUB_PROVIDERS): + unit_code = meta_data.get('unit_code').strip() + source = next((s for s in sub_providers if unit_code in + sub_providers[s]), None) + if source is None: + raise Exception( + f"An unknown unit code value {unit_code} encountered ") + return source + + def _extract_tags(row, tag_types=TAG_TYPES): indexed_structured = _get_indexed_structured_dict(row) tag_lists_generator = ( @@ -387,6 +404,7 @@ def _process_image_list( creator, meta_data, tags, + source, license_url=ZERO_URL ): total_images = None @@ -404,7 +422,8 @@ def _process_image_list( title=title, creator=creator, meta_data=meta_data, - raw_tags=tags + raw_tags=tags, + source=source ) return total_images diff --git a/src/cc_catalog_airflow/dags/provider_api_scripts/test_smithsonian.py b/src/cc_catalog_airflow/dags/provider_api_scripts/test_smithsonian.py index 5b0ef0eb..2eac897f 100644 --- a/src/cc_catalog_airflow/dags/provider_api_scripts/test_smithsonian.py +++ b/src/cc_catalog_airflow/dags/provider_api_scripts/test_smithsonian.py @@ -247,6 +247,7 @@ def test_process_response_json_uses_required_getters(): creator_list = ['creator0', 'creator1'] metadata_list = ['metadata0', 'metadata1'] tags_list = ['tags0', 'tags1'] + source_list = ['source0', 'source1'] get_row_list = patch.object(si, '_get_row_list', return_value=row_list) process_image_list = patch.object( @@ -264,6 +265,9 @@ def test_process_response_json_uses_required_getters(): si, '_extract_meta_data', side_effect=metadata_list ) ext_tags = patch.object(si, '_extract_tags', side_effect=tags_list) + ext_source = patch.object( + si, '_extract_source', side_effect=source_list + ) with\ get_row_list as mock_get_row_list,\ @@ -271,8 +275,9 @@ def test_process_response_json_uses_required_getters(): get_flu as mock_get_foreign_landing_url,\ get_title as mock_get_title,\ get_creator as mock_get_creator,\ - ext_meta_data as mock_extract_meta_data,\ - ext_tags as mock_extract_tags,\ + ext_meta_data as mock_extract_meta_data, \ + ext_tags as mock_extract_tags, \ + ext_source as mock_extract_source, \ process_image_list as mock_process_image_list: si._process_response_json(response_json) @@ -284,7 +289,8 @@ def test_process_response_json_uses_required_getters(): title_list[0], creator_list[0], metadata_list[0], - tags_list[0] + tags_list[0], + source_list[0] ), call( image_lists[1], @@ -292,7 +298,8 @@ def test_process_response_json_uses_required_getters(): title_list[1], creator_list[1], metadata_list[1], - tags_list[1] + tags_list[1], + source_list[1] ) ] mock_get_row_list.assert_called_once_with(response_json) @@ -303,6 +310,7 @@ def test_process_response_json_uses_required_getters(): assert mock_get_creator.mock_calls == getter_calls_list assert mock_extract_meta_data.mock_calls == getter_calls_list assert mock_extract_tags.mock_calls == getter_calls_list + assert mock_extract_source.mock_calls == [call(m) for m in metadata_list] def test_get_row_list_with_no_rows(): @@ -898,8 +906,9 @@ def test_check_type_with_bad_inputs(required_type, good_indices, default): foreign_identifier='id_one', title='The Title', creator='Alice', - meta_data={'meta': 'data'}, + meta_data={'unit_code': 'NMNHBOTANY'}, raw_tags=['tag', 'list'], + source='smithsonian_national_museum_of_natural_history', ), call( foreign_landing_url='https://foreignlanding.url', @@ -909,8 +918,9 @@ def test_check_type_with_bad_inputs(required_type, good_indices, default): foreign_identifier='id_two', title='The Title', creator='Alice', - meta_data={'meta': 'data'}, + meta_data={'unit_code': 'NMNHBOTANY'}, raw_tags=['tag', 'list'], + source='smithsonian_national_museum_of_natural_history', ) ] ), @@ -946,8 +956,9 @@ def test_check_type_with_bad_inputs(required_type, good_indices, default): foreign_identifier='id_two', title='The Title', creator='Alice', - meta_data={'meta': 'data'}, + meta_data={'unit_code': 'NMNHBOTANY'}, raw_tags=['tag', 'list'], + source='smithsonian_national_museum_of_natural_history', ) ] ), @@ -983,8 +994,9 @@ def test_check_type_with_bad_inputs(required_type, good_indices, default): foreign_identifier='id_one', title='The Title', creator='Alice', - meta_data={'meta': 'data'}, + meta_data={'unit_code': 'NMNHBOTANY'}, raw_tags=['tag', 'list'], + source='smithsonian_national_museum_of_natural_history', ) ] ) @@ -999,8 +1011,9 @@ def test_process_image_list(input_media, expect_calls): foreign_landing_url='https://foreignlanding.url', title='The Title', creator='Alice', - meta_data={'meta': 'data'}, + meta_data={'unit_code': 'NMNHBOTANY'}, tags=['tag', 'list'], + source='smithsonian_national_museum_of_natural_history', license_url='https://license.url' ) assert expect_calls == mock_add_item.mock_calls diff --git a/src/cc_catalog_airflow/dags/util/loader/provider_details.py b/src/cc_catalog_airflow/dags/util/loader/provider_details.py index dff0efcc..e4618acf 100644 --- a/src/cc_catalog_airflow/dags/util/loader/provider_details.py +++ b/src/cc_catalog_airflow/dags/util/loader/provider_details.py @@ -37,3 +37,76 @@ EUROPEANA_SUB_PROVIDERS = { 'wellcome_collection': "Wellcome Collection" } + +# Smithsonian parameters +SMITHSONIAN_DEFAULT_PROVIDER = 'smithsonian' + +SMITHSONIAN_SUB_PROVIDERS = { + 'smithsonian_national_museum_of_natural_history': { + 'NMNHANTHRO', # NMNH - Paleobiology Dept. + 'NMNHBIRDS', # NMNH - Vertebrate Zoology - Birds Division + 'NMNHBOTANY', # NMNH - Botany Dept. + 'NMNHEDUCATION', # NMNH - Education & Outreach + 'NMNHENTO', # NMNH - Entomology Dept. + 'NMNHFISHES', # NMNH - Vertebrate Zoology - Fishes Division + 'NMNHHERPS', # NMNH - Vertebrate Zoology - Herpetology Division + 'NMNHINV', # NMNH - Invertebrate Zoology Dept. + 'NMNHMAMMALS', # NMNH - Vertebrate Zoology - Mammals Division + 'NMNHMINSCI', # NMNH - Mineral Sciences Dept. + 'NMNHPALEO' # NMNH - Paleobiology Dept. + }, + 'smithsonian_anacostia_museum': { + 'ACM' # Anacostia Community Museum + }, + 'smithsonian_cooper_hewitt_museum': { + 'CHNDM' # Cooper Hewitt, Smithsonian Design Museum + }, + 'smithsonian_field_book_project': { + 'FBR' # Smithsonian Field Book Project + }, + 'smithsonian_freer_gallery_of_art': { + 'FSG' # Freer Gallery of Art and Arthur M. Sackler Gallery + }, + 'smithsonian_gardens': { + 'HAC' # Smithsonian Gardens + }, + 'smithsonian_hirshhorn_museum': { + 'HMSG' # Hirshhorn Museum and Sculpture Garden + }, + 'smithsonian_anthropological_archives': { + 'NAA' # National Anthropological Archives + }, + 'smithsonian_air_and_space_museum': { + 'NASM' # National Air and Space Museum + }, + 'smithsonian_african_american_history_museum': { + 'NMAAHC' # National Museum of African American History and Culture + }, + 'smithsonian_american_history_museum': { + 'NMAH' # National Museum of American History + }, + 'smithsonian_american_indian_museum': { + 'NMAI' # National Museum of the American Indian + }, + 'smithsonian_african_art_museum': { + 'NMAfA' # National Museum of African Art + }, + 'smithsonian_portrait_gallery': { + 'NPG' # National Portrait Gallery + }, + 'smithsonian_postal_museum': { + 'NPM' # National Postal Museum + }, + 'smithsonian_zoo_and_conservation': { + 'NZP' # Smithsonian's National Zoo & Conservation Biology Institute + }, + 'smithsonian_american_art_museum': { + 'SAAM' # Smithsonian American Art Museum + }, + 'smithsonian_institution_archives': { + 'SIA' # Smithsonian Institution Archives + }, + 'smithsonian_libraries': { + 'SIL' # Smithsonian Libraries + }, +} From 83c08809ff807826e29af1e546d9d5ef06c58399 Mon Sep 17 00:00:00 2001 From: Charini Nanayakkara Date: Tue, 7 Jul 2020 21:16:19 +1000 Subject: [PATCH 2/5] Add test for sub-provider retrieval from Smithsonian at API level --- .../provider_api_scripts/test_smithsonian.py | 36 +++++ .../smithsonian/sub_provider_example.json | 147 ++++++++++++++++++ 2 files changed, 183 insertions(+) create mode 100644 src/cc_catalog_airflow/dags/provider_api_scripts/tests/resources/smithsonian/sub_provider_example.json diff --git a/src/cc_catalog_airflow/dags/provider_api_scripts/test_smithsonian.py b/src/cc_catalog_airflow/dags/provider_api_scripts/test_smithsonian.py index 2eac897f..8af71a05 100644 --- a/src/cc_catalog_airflow/dags/provider_api_scripts/test_smithsonian.py +++ b/src/cc_catalog_airflow/dags/provider_api_scripts/test_smithsonian.py @@ -20,6 +20,12 @@ ) +def _get_resource_json(json_name): + with open(os.path.join(RESOURCES, json_name)) as f: + resource_json = json.load(f) + return resource_json + + def test_get_hash_prefixes_with_len_one(): expect_prefix_list = [ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', @@ -1017,3 +1023,33 @@ def test_process_image_list(input_media, expect_calls): license_url='https://license.url' ) assert expect_calls == mock_add_item.mock_calls + + +def test_process_image_data_with_sub_provider(): + response = _get_resource_json('sub_provider_example.json') + with patch.object( + si.image_store, + 'add_item', + return_value=100 + ) as mock_add_item: + total_images = si._process_response_json(response) + + expect_meta_data = { + 'unit_code': 'SIA', + 'data_source': 'Smithsonian Institution Archives' + } + + mock_add_item.assert_called_once_with( + foreign_landing_url=None, + image_url='https://ids.si.edu/ids/deliveryService?id=SIA-SIA2010-2358', + thumbnail_url='https://ids.si.edu/ids/deliveryService?id=SIA-SIA2010-2358&max=150', + license_url='https://creativecommons.org/publicdomain/zero/1.0/', + foreign_identifier='SIA-SIA2010-2358', + creator='Gruber, Martin A', + title='Views of the National Zoological Park in Washington, DC, showing Elephant', + meta_data=expect_meta_data, + raw_tags=[ + '1920s', '1910s', 'Archival materials', 'Photographs', 'Animals'], + source='smithsonian_institution_archives' + ) + assert total_images == 100 diff --git a/src/cc_catalog_airflow/dags/provider_api_scripts/tests/resources/smithsonian/sub_provider_example.json b/src/cc_catalog_airflow/dags/provider_api_scripts/tests/resources/smithsonian/sub_provider_example.json new file mode 100644 index 00000000..d5d24355 --- /dev/null +++ b/src/cc_catalog_airflow/dags/provider_api_scripts/tests/resources/smithsonian/sub_provider_example.json @@ -0,0 +1,147 @@ +{ + "status": 200, + "responseCode": 1, + "response": { + "rows": [ + { + "id": "edanmdm-siris_arc_291918", + "title": "Views of the National Zoological Park in Washington, DC, showing Elephant", + "unitCode": "SIA", + "type": "edanmdm", + "url": "edanmdm:siris_arc_291918", + "content": { + "descriptiveNonRepeating": { + "record_ID": "siris_arc_291918", + "online_media": { + "mediaCount": 1, + "media": [ + { + "thumbnail": "https://ids.si.edu/ids/deliveryService?id=SIA-SIA2010-2358&max=150", + "idsId": "SIA-SIA2010-2358", + "usage": { + "access": "CC0", + "text": "" + }, + "type": "Images", + "content": "https://ids.si.edu/ids/deliveryService?id=SIA-SIA2010-2358" + } + ] + }, + "unit_code": "SIA", + "title_sort": "VIEWS OF THE NATIONAL ZOOLOGICAL PARK IN WASHINGTON DC SHOWING ELEPHANT", + "title": { + "label": "Title", + "content": "Views of the National Zoological Park in Washington, DC, showing Elephant" + }, + "metadata_usage": { + "access": "CC0" + }, + "data_source": "Smithsonian Institution Archives" + }, + "indexedStructured": { + "date": [ + "1920s", + "1910s" + ], + "object_type": [ + "Archival materials", + "Photographs" + ], + "name": [ + { + "type": "personal_main", + "content": "Gruber, Martin A" + }, + { + "type": "corporate_subj", + "content": "National Zoological Park (U.S.)" + } + ], + "topic": [ + "Animals" + ], + "usage_flag": [ + "Personal Paper Deposit" + ], + "online_media_type": [ + "Images" + ] + }, + "freetext": { + "date": [ + { + "label": "Date", + "content": "1919" + }, + { + "label": "Date", + "content": "C. 1920-1924" + } + ], + "identifier": [ + { + "label": "Local number", + "content": "SIA RU007355 [SIA2010-2358]" + } + ], + "notes": [ + { + "label": "Cite as", + "content": "Smithsonian Institution Archives, Record Unit 7355, Martin A. Gruber Photograph Collection, Image No. SIA2010-2358" + }, + { + "label": "Repository Loc.", + "content": "Smithsonian Institution Archives, Capital Gallery, Suite 3000, MRC 507; 600 Maryland Avenue, SW; Washington, DC 20024-2520" + } + ], + "name": [ + { + "label": "Creator", + "content": "Gruber, Martin A" + }, + { + "label": "Subject", + "content": "National Zoological Park (U.S.)" + } + ], + "topic": [ + { + "label": "Topic", + "content": "Animals" + } + ], + "dataSource": [ + { + "label": "Data Source", + "content": "Smithsonian Institution Archives" + } + ], + "objectRights": [ + { + "label": "Restrictions & Rights", + "content": "No access restrictions Many of SIA's holdings are located off-site, and advance notice is recommended to consult a collection. Please email the SIA Reference Team at osiaref@si.edu" + }, + { + "label": "Restrictions & Rights", + "content": "No Copyright - United States" + } + ], + "objectType": [ + { + "label": "Type", + "content": "Black-and-white photographs" + } + ] + } + }, + "hash": "fffe9e6a7103a3449d84f3acf75ae7260e1c0386", + "docSignature": "b943f314acfdbcd3ed740a1c4bb2774f1e0abc24_c58eacf430afa83b10f6a76fc64459d9", + "timestamp": "1580116221", + "lastTimeUpdated": "1580116213", + "version": "" + } + ], + "rowCount": 734, + "message": "content found" + } +} \ No newline at end of file From 1341dd03f8d87441f4ba3e7aa8ad7310ad6be600 Mon Sep 17 00:00:00 2001 From: Charini Nanayakkara Date: Tue, 7 Jul 2020 22:19:20 +1000 Subject: [PATCH 3/5] Initial implementation of Smithsonian sub-provider retrieval at DB level --- .../dags/util/loader/sql.py | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/src/cc_catalog_airflow/dags/util/loader/sql.py b/src/cc_catalog_airflow/dags/util/loader/sql.py index 7b6209b4..772cb766 100644 --- a/src/cc_catalog_airflow/dags/util/loader/sql.py +++ b/src/cc_catalog_airflow/dags/util/loader/sql.py @@ -505,3 +505,56 @@ def update_europeana_sub_providers( Drop the temporary table """ postgres.run(f'DROP TABLE public.{temp_table};') + + +def update_smithsonian_sub_providers( + postgres_conn_id, + image_table=IMAGE_TABLE_NAME, + default_provider=prov.SMITHSONIAN_DEFAULT_PROVIDER, + sub_providers=prov.SMITHSONIAN_SUB_PROVIDERS +): + postgres = PostgresHook(postgres_conn_id=postgres_conn_id) + + """ + Select all records where the source value is not yet updated + """ + select_query = dedent( + f''' + SELECT foreign_id, + {col.META_DATA} ->> 'unit_code' AS unit_code + FROM {image_table} + WHERE + {col.PROVIDER} = '{default_provider}' + AND + {col.SOURCE} = '{default_provider}'; + ''' + ) + + selected_records = postgres.get_records(select_query) + + """ + Set the source value of each selected row to the sub-provider value + corresponding to unit code. If the unit code is unknown, an error is thrown + """ + for row in selected_records: + foreign_id = row[0] + unit_code = row[1] + + source = next((s for s in sub_providers if unit_code in + sub_providers[s]), None) + if source is None: + raise Exception( + f"An unknown unit code value {unit_code} encountered ") + + postgres.run( + dedent( + f''' + UPDATE {image_table} + SET {col.SOURCE} = '{source}' + WHERE + {image_table}.{col.PROVIDER} = '{default_provider}' + AND + MD5({image_table}.{col.FOREIGN_ID}) = MD5('{foreign_id}'); + ''' + ) + ) From 9e9ba965bd8eab972bea6954488c3b84371fa8a1 Mon Sep 17 00:00:00 2001 From: Charini Nanayakkara Date: Wed, 8 Jul 2020 07:51:14 +1000 Subject: [PATCH 4/5] Add test case for checking Smithsonian sub-provider retrieval at DB level --- .../dags/util/loader/sql.py | 2 +- .../dags/util/loader/test_sql.py | 63 +++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/src/cc_catalog_airflow/dags/util/loader/sql.py b/src/cc_catalog_airflow/dags/util/loader/sql.py index 772cb766..5e725605 100644 --- a/src/cc_catalog_airflow/dags/util/loader/sql.py +++ b/src/cc_catalog_airflow/dags/util/loader/sql.py @@ -520,7 +520,7 @@ def update_smithsonian_sub_providers( """ select_query = dedent( f''' - SELECT foreign_id, + SELECT {col.FOREIGN_ID}, {col.META_DATA} ->> 'unit_code' AS unit_code FROM {image_table} WHERE diff --git a/src/cc_catalog_airflow/dags/util/loader/test_sql.py b/src/cc_catalog_airflow/dags/util/loader/test_sql.py index 7a3545de..d05da13b 100644 --- a/src/cc_catalog_airflow/dags/util/loader/test_sql.py +++ b/src/cc_catalog_airflow/dags/util/loader/test_sql.py @@ -1240,3 +1240,66 @@ def test_update_europeana_sub_providers(postgres_with_load_and_image_table): assert actual_row[5] == 'wellcome_collection' else: assert actual_row[6] == 'b' and actual_row[5] == 'europeana' + + +def test_update_smithsonian_sub_providers(postgres_with_load_and_image_table): + postgres_conn_id = POSTGRES_CONN_ID + load_table = TEST_LOAD_TABLE + image_table = TEST_IMAGE_TABLE + identifier = TEST_ID + + FID_A = 'a' + FID_B = 'b' + IMG_URL_A = 'https://images.com/a/img.jpg' + IMG_URL_B = 'https://images.com/b/img.jpg' + PROVIDER = 'smithsonian' + LICENSE = 'by-nc-nd' + META_DATA_A = { + 'unit_code': 'SIA', + 'data_source': 'Smithsonian Institution Archives' + } + META_DATA_B = { + 'unit_code': 'NMNHBIRDS', + 'data_source': 'NMNH - Vertebrate Zoology - Birds Division' + } + + insert_data_query = ( + f"INSERT INTO {load_table} VALUES" + f"('{FID_A}',null,'{IMG_URL_A}',null,null,null,null,'{LICENSE}',null," + f"null,null,null,'{json.dumps(META_DATA_A)}',null,null," + f"'{PROVIDER}','{PROVIDER}')," + f"('{FID_B}',null,'{IMG_URL_B}',null,null,null,null,'{LICENSE}',null," + f"null,null,null,'{json.dumps(META_DATA_B)}',null,null," + f"'{PROVIDER}','{PROVIDER}');" + ) + + postgres_with_load_and_image_table.cursor.execute(insert_data_query) + postgres_with_load_and_image_table.connection.commit() + sql.upsert_records_to_image_table( + postgres_conn_id, + identifier, + image_table=image_table + ) + postgres_with_load_and_image_table.connection.commit() + postgres_with_load_and_image_table.cursor.execute( + f"DELETE FROM {load_table};" + ) + postgres_with_load_and_image_table.connection.commit() + + sql.update_smithsonian_sub_providers( + postgres_conn_id, + image_table + ) + postgres_with_load_and_image_table.connection.commit() + postgres_with_load_and_image_table.cursor.execute( + f"SELECT * FROM {image_table};" + ) + actual_rows = postgres_with_load_and_image_table.cursor.fetchall() + assert len(actual_rows) == 2 + + for actual_row in actual_rows: + if actual_row[6] == 'a': + assert actual_row[5] == 'smithsonian_institution_archives' + else: + assert actual_row[6] == 'b' and actual_row[5] == \ + 'smithsonian_national_museum_of_natural_history' From 88914f30da863acf071251f70b9a0d5512088707 Mon Sep 17 00:00:00 2001 From: Charini Nanayakkara Date: Wed, 8 Jul 2020 08:03:22 +1000 Subject: [PATCH 5/5] Add workflow for updating Smithsonian sub-providers --- ...mithsonian_sub_provider_update_workflow.py | 66 +++++++++++++++++++ .../dags/test_sub_provider_update_workflow.py | 9 +++ .../dags/util/loader/operators.py | 12 ++++ 3 files changed, 87 insertions(+) create mode 100644 src/cc_catalog_airflow/dags/smithsonian_sub_provider_update_workflow.py diff --git a/src/cc_catalog_airflow/dags/smithsonian_sub_provider_update_workflow.py b/src/cc_catalog_airflow/dags/smithsonian_sub_provider_update_workflow.py new file mode 100644 index 00000000..6fe393fa --- /dev/null +++ b/src/cc_catalog_airflow/dags/smithsonian_sub_provider_update_workflow.py @@ -0,0 +1,66 @@ +""" +This file configures the Apache Airflow DAG to update the database table to +reflect appropriate Smithsonian sub provider names in the source field +""" + +from datetime import datetime, timedelta +import logging +import os +import util.operator_util as ops +from airflow import DAG + +from util.loader import operators + + +logging.basicConfig( + format='%(asctime)s - %(name)s - %(levelname)s: %(message)s', + level=logging.INFO +) + +logger = logging.getLogger(__name__) + +DAG_ID = 'smithsonian_sub_provider_update_workflow' +DB_CONN_ID = os.getenv('OPENLEDGER_CONN_ID', 'postgres_openledger_testing') +CONCURRENCY = 5 + +DAG_DEFAULT_ARGS = { + 'owner': 'data-eng-admin', + 'depends_on_past': False, + 'start_date': datetime(2020, 1, 15), + 'email_on_retry': False, + 'retries': 2, + 'retry_delay': timedelta(seconds=15), + 'schedule_interval': None, +} + + +def create_dag( + dag_id=DAG_ID, + args=DAG_DEFAULT_ARGS, + concurrency=CONCURRENCY, + max_active_runs=CONCURRENCY, + postgres_conn_id=DB_CONN_ID, +): + dag = DAG( + dag_id=dag_id, + default_args=args, + concurrency=concurrency, + max_active_runs=max_active_runs, + catchup=False, + schedule_interval=None, + ) + + with dag: + start_task = ops.get_log_operator(dag, dag.dag_id, 'Starting') + run_task = operators.get_smithsonian_sub_provider_update_operator( + dag, + postgres_conn_id + ) + end_task = ops.get_log_operator(dag, dag.dag_id, 'Finished') + + start_task >> run_task >> end_task + + return dag + + +globals()[DAG_ID] = create_dag() diff --git a/src/cc_catalog_airflow/dags/test_sub_provider_update_workflow.py b/src/cc_catalog_airflow/dags/test_sub_provider_update_workflow.py index 10c44f7f..a0d1a783 100644 --- a/src/cc_catalog_airflow/dags/test_sub_provider_update_workflow.py +++ b/src/cc_catalog_airflow/dags/test_sub_provider_update_workflow.py @@ -21,3 +21,12 @@ def test_europeana_dag_loads_with_no_errors(tmpdir): FILE_DIR, 'europeana_sub_provider_update_workflow.py')) assert len(dag_bag.import_errors) == 0 assert len(dag_bag.dags) == 1 + + +def test_smithsonian_dag_loads_with_no_errors(tmpdir): + tmp_directory = str(tmpdir) + dag_bag = DagBag(dag_folder=tmp_directory, include_examples=False) + dag_bag.process_file(os.path.join( + FILE_DIR, 'smithsonian_sub_provider_update_workflow.py')) + assert len(dag_bag.import_errors) == 0 + assert len(dag_bag.dags) == 1 diff --git a/src/cc_catalog_airflow/dags/util/loader/operators.py b/src/cc_catalog_airflow/dags/util/loader/operators.py index 5fb9f562..6cc20a06 100644 --- a/src/cc_catalog_airflow/dags/util/loader/operators.py +++ b/src/cc_catalog_airflow/dags/util/loader/operators.py @@ -192,3 +192,15 @@ def get_europeana_sub_provider_update_operator( op_args=[postgres_conn_id], dag=dag ) + + +def get_smithsonian_sub_provider_update_operator( + dag, + postgres_conn_id, +): + return PythonOperator( + task_id='update_smithsonian_sub_providers', + python_callable=sql.update_smithsonian_sub_providers, + op_args=[postgres_conn_id], + dag=dag + )