diff --git a/academic_observatory_workflows/database/sql/export_access_types.sql.jinja2 b/academic_observatory_workflows/database/sql/export_access_types.sql.jinja2 deleted file mode 100644 index 5d1b6c0a..00000000 --- a/academic_observatory_workflows/database/sql/export_access_types.sql.jinja2 +++ /dev/null @@ -1,27 +0,0 @@ -{# -Copyright 2020 Curtin University. -Authors: James Diprose, Richard Hosking -#} -{# -The purpose of this script it to export the access_types section from any of the aggregration tables (country, institution, group, etc) -Primarily, the goal is to create what is a nested array, and turn that into a flat table that can be exported into Elasticsearch for use with Kibana -#} - -SELECT - id as {{ aggregate }}_id, - name as {{ aggregate }}_name, - country as {{ aggregate }}_country, - country_code as {{ aggregate }}_country_code, - region as {{ aggregate }}_region, - subregion as {{ aggregate }}_subregion, - coordinates as {{ aggregate }}_coordinates, - DATE(time_period, 12, 31) AS published_year, - access_type.access_type as access_types_access_type, - access_type.status as access_types_status, - access_type.label as access_types_label, - access_type.total_outputs as access_types_total_outputs, - access_type.outputs_with_citations as access_types_outputs_with_citations, - access_type.outputs_without_citations as access_types_outputs_without_citations, - access_type.citations.openalex.total_citations as access_types_total_citations, -FROM `{{ table_id }}`, UNNEST( access_types.breakdown ) as access_type -ORDER BY id, published_year ASC \ No newline at end of file diff --git a/academic_observatory_workflows/database/sql/export_disciplines.sql.jinja2 b/academic_observatory_workflows/database/sql/export_disciplines.sql.jinja2 deleted file mode 100644 index 9741fa76..00000000 --- a/academic_observatory_workflows/database/sql/export_disciplines.sql.jinja2 +++ /dev/null @@ -1,43 +0,0 @@ -{# -Copyright 2020 Curtin University. -Authors: Richard Hosking -#} -{# -The purpose of this script it to export the disciplines section from any of the aggregration tables (country, institution, group, etc) -Primarily, the goal is to create what is a nested array, and turn that into a flat table that can be exported into Elasticsearch for use with Kibana -#} - -SELECT - id as {{ aggregate }}_id, - name as {{ aggregate }}_name, - country as {{ aggregate }}_country, - country_code as {{ aggregate }}_country_code, - region as {{ aggregate }}_region, - subregion as {{ aggregate }}_subregion, - coordinates as {{ aggregate }}_coordinates, - DATE(time_period, 12, 31) AS published_year, - discipline.field as disciplines_field, - discipline.total_outputs as disciplines_total_outputs, - discipline.sum_of_scores as disciplines_sum_of_scores, - discipline.num_oa_outputs as disciplines_num_oa_outputs, - discipline.num_green_outputs as disciplines_num_green_outputs, - discipline.num_gold_outputs as disciplines_num_gold_outputs, - discipline.num_gold_just_doaj_outputs as disciplines_num_gold_just_doaj_outputs, - discipline.num_hybrid_outputs as disciplines_num_hybrid_outputs, - discipline.num_bronze_outputs as disciplines_num_bronze_outputs, - discipline.num_green_only_outputs as disciplines_num_green_only_outputs, - ROUND(SAFE_DIVIDE( ( discipline.num_oa_outputs ) * 100 , discipline.total_outputs ), 2) as disciplines_percent_oa, - ROUND(SAFE_DIVIDE( ( discipline.num_green_outputs ) * 100 , discipline.total_outputs ), 2) as disciplines_percent_green, - ROUND(SAFE_DIVIDE( ( discipline.num_gold_outputs ) * 100 , discipline.total_outputs ), 2) as disciplines_percent_gold, - discipline.citations.openalex.total_citations as disciplines_total_citations, - discipline.funding.total_funded_outputs as disciplines_total_funded_outputs, - discipline.funding.num_international_outputs as disciplines_num_international_funded_outputs, - discipline.funding.num_domestic_outputs as disciplines_num_domestic_funded_outputs, - discipline.funding.num_international_and_domestic_outputs as disciplines_num_international_and_domestic_funded_outputs, - discipline.funding.num_government_outputs as disciplines_num_government_funded_outputs, - discipline.funding.num_private_outputs as disciplines_num_private_funded_outputs, - discipline.funding.num_government_and_private_outputs as disciplines_num_government_and_private_funded_outputs -FROM - `{{ table_id }}`, - UNNEST(disciplines.level0) as discipline -ORDER BY id, published_year ASC \ No newline at end of file diff --git a/academic_observatory_workflows/database/sql/export_dois.sql.jinja2 b/academic_observatory_workflows/database/sql/export_dois.sql.jinja2 deleted file mode 100644 index 5b826d93..00000000 --- a/academic_observatory_workflows/database/sql/export_dois.sql.jinja2 +++ /dev/null @@ -1,40 +0,0 @@ -{# -Copyright 2020 Curtin University. -Authors: Richard Hosking -#} -{# -The purpose of this script it to export a subsection of fields from the DOI table. -The whole table is very large, and while this export is not small, it is more managable and targets the key fields required for search and dashboarding -#} - -SELECT - doi, - mag.PaperId as mag_paperId, - crossref.title as title, - mag.abstract as abstract, - DATE(crossref.published_year, 12, 31) AS published_year, - unpaywall.output_type, - crossref.publisher, - unpaywall.journal_name as journal, - mag.CitationCount as citation_count, - unpaywall.is_oa, - unpaywall.gold, - unpaywall.green, - unpaywall.bronze, - unpaywall.hybrid, - unpaywall.green_only, - - ARRAY(SELECT DisplayName FROM UNNEST(mag.fields.fields.level_0)) as level_0, - ARRAY(SELECT DisplayName FROM UNNEST(mag.fields.fields.level_1)) as level_1, - ARRAY(SELECT DisplayName FROM UNNEST(mag.fields.fields.level_2)) as level_2, - ARRAY(SELECT DisplayName FROM UNNEST(mag.fields.fields.level_3)) as level_3, - - ARRAY(SELECT identifier FROM UNNEST( affiliations.institutions ) WHERE identifier IS NOT NULL) as institutions, - ARRAY(SELECT identifier FROM UNNEST( affiliations.countries ) WHERE identifier IS NOT NULL) as countries, - ARRAY(SELECT identifier FROM UNNEST( affiliations.regions ) WHERE identifier IS NOT NULL) as regions, - ARRAY(SELECT identifier FROM UNNEST( affiliations.groupings ) WHERE identifier IS NOT NULL) as groupings, - ARRAY(SELECT identifier FROM UNNEST( affiliations.funders ) WHERE identifier IS NOT NULL) as funders, - ARRAY(SELECT OriginalAuthor FROM UNNEST( mag.authors.authors ) WHERE OriginalAuthor IS NOT NULL) as authors -FROM - `{{ table_id }}` -WHERE crossref.published_year > 2000 \ No newline at end of file diff --git a/academic_observatory_workflows/database/sql/export_events.sql.jinja2 b/academic_observatory_workflows/database/sql/export_events.sql.jinja2 deleted file mode 100644 index 9bee93e5..00000000 --- a/academic_observatory_workflows/database/sql/export_events.sql.jinja2 +++ /dev/null @@ -1,27 +0,0 @@ -{# -Copyright 2020 Curtin University. -Authors: James Diprose, Richard Hosking -#} -{# -The purpose of this script it to export the events section from any of the aggregration tables (country, institution, group, etc) -Primarily, the goal is to create what is a nested array, and turn that into a flat table that can be exported into Elasticsearch for use with Kibana -#} - -SELECT - id as {{ aggregate }}_id, - name as {{ aggregate }}_name, - country as {{ aggregate }}_country, - country_code as {{ aggregate }}_country_code, - region as {{ aggregate }}_region, - subregion as {{ aggregate }}_subregion, - coordinates as {{ aggregate }}_coordinates, - DATE(time_period, 12, 31) AS published_year, - events.source as events_source, - events.total_outputs as events_total_outputs, - events.num_oa_outputs as events_num_oa_outputs, - events.num_green_outputs as events_num_green_outputs, - events.num_gold_outputs as events_num_gold_outputs -FROM - `{{ table_id }}`, - UNNEST(events) as events -ORDER BY id, published_year ASC \ No newline at end of file diff --git a/academic_observatory_workflows/database/sql/export_metrics.sql.jinja2 b/academic_observatory_workflows/database/sql/export_metrics.sql.jinja2 deleted file mode 100644 index 652cb965..00000000 --- a/academic_observatory_workflows/database/sql/export_metrics.sql.jinja2 +++ /dev/null @@ -1,41 +0,0 @@ -{# -Copyright 2020 Curtin University. -Authors: James Diprose, Richard Hosking -#} -{# -The purpose of this script it to export the yearly metrics of any aggregration tables (country, institution, group, etc) -#} - -SELECT - id as {{ aggregate }}_id, - name as {{ aggregate }}_name, - country as {{ aggregate }}_country, - country_code as {{ aggregate }}_country_code, - region as {{ aggregate }}_region, - subregion as {{ aggregate }}_subregion, - coordinates as {{ aggregate }}_coordinates, - DATE(time_period, 12, 31) AS published_year, - total_outputs as metrics_total_outputs, - access_types.oa.total_outputs as metrics_num_oa_outputs, - total_outputs - access_types.oa.total_outputs AS metrics_num_not_oa_outputs, - access_types.green.total_outputs as metrics_num_green_outputs, - access_types.gold.total_outputs as metrics_num_gold_outputs, - access_types.hybrid.total_outputs as metrics_num_hybrid_outputs, - access_types.bronze.total_outputs as metrics_num_bronze_outputs, - access_types.green_only.total_outputs as metrics_num_green_only_outputs, - access_types.gold_doaj.total_outputs as metrics_num_gold_doaj_outputs, - access_types.oa.percent AS metrics_percent_oa, - access_types.green.percent as metrics_percent_green, - access_types.green_only.percent as metrics_percent_green_only, - access_types.gold.percent as metrics_percent_gold, - access_types.gold_doaj.percent as metrics_percent_gold_doaj, - access_types.hybrid.percent as metrics_percent_hybrid, - access_types.bronze.percent as metrics_percent_bronze, - citations.openalex.total_citations as metrics_total_citations, - citations.openalex.citations_per_output as metrics_citations_per_output, - citations.openalex.outputs_with_citations as metrics_outputs_with_citations, - citations.openalex.outputs_without_citations as metrics_outputs_without_citations, - citations.openalex.citations_per_cited_output as metrics_citations_per_cited_output -FROM - `{{ table_id }}` -ORDER BY id, published_year ASC \ No newline at end of file diff --git a/academic_observatory_workflows/database/sql/export_output_types.sql.jinja2 b/academic_observatory_workflows/database/sql/export_output_types.sql.jinja2 deleted file mode 100644 index 53e9c5cb..00000000 --- a/academic_observatory_workflows/database/sql/export_output_types.sql.jinja2 +++ /dev/null @@ -1,34 +0,0 @@ -{# -Copyright 2020 Curtin University. -Authors: James Diprose, Richard Hosking -#} -{# -The purpose of this script it to export the output_types section from any of the aggregration tables (country, institution, group, etc) -Primarily, the goal is to create what is a nested array, and turn that into a flat table that can be exported into Elasticsearch for use with Kibana -#} - -SELECT - id as {{ aggregate }}_id, - name as {{ aggregate }}_name, - country as {{ aggregate }}_country, - country_code as {{ aggregate }}_country_code, - region as {{ aggregate }}_region, - subregion as {{ aggregate }}_subregion, - coordinates as {{ aggregate }}_coordinates, - DATE(time_period, 12, 31) AS published_year, - output_types.output_type as output_types_output_type, - output_types.total_outputs as output_types_total_outputs, - output_types.num_oa_outputs as output_types_num_oa_outputs, - output_types.num_green_outputs as output_types_num_green_outputs, - output_types.num_gold_outputs as output_types_num_gold_outputs, - output_types.num_gold_just_doaj_outputs as output_types_num_gold_just_doaj_outputs, - output_types.num_hybrid_outputs as output_types_num_hybrid_outputs, - output_types.num_bronze_outputs as output_types_num_bronze_outputs, - output_types.num_green_only_outputs as output_types_num_green_only_outputs, - ROUND(SAFE_DIVIDE( ( output_types.num_oa_outputs ) * 100 , output_types.total_outputs ), 2) as output_types_percent_oa, - ROUND(SAFE_DIVIDE( ( output_types.num_green_outputs ) * 100 , output_types.total_outputs ), 2) as output_types_percent_green, - ROUND(SAFE_DIVIDE( ( output_types.num_gold_outputs ) * 100 , output_types.total_outputs ), 2) as output_types_percent_gold -FROM - `{{ table_id }}`, - UNNEST(output_types) as output_types -ORDER BY id, published_year ASC \ No newline at end of file diff --git a/academic_observatory_workflows/database/sql/export_relations.sql.jinja2 b/academic_observatory_workflows/database/sql/export_relations.sql.jinja2 deleted file mode 100644 index 0b760e53..00000000 --- a/academic_observatory_workflows/database/sql/export_relations.sql.jinja2 +++ /dev/null @@ -1,44 +0,0 @@ -{# -Copyright 2020 Curtin University. -Authors: Richard Hosking -#} -{# -The purpose of this script it to export a range of sections from any of the aggregration tables (country, institution, group, etc) -The sections this applies to are: institution, memember, country, funders, publishers and journals -Primarily, the goal is to create what is a nested array, and turn that into a flat table that can be exported into Elasticsearch for use with Kibana -#} - -SELECT - entity.id as {{ aggregate }}_id, - entity.name as {{ aggregate }}_name, - entity.country as {{ aggregate }}_country, - entity.country_code as {{ aggregate }}_country_code, - entity.region as {{ aggregate }}_region, - entity.subregion as {{ aggregate }}_subregion, - entity.coordinates as {{ aggregate }}_coordinates, - DATE(entity.time_period, 12, 31) AS published_year, - relation.id as {{ facet }}_id, - relation.name as {{ facet }}_name, - relation.country as {{ facet }}_country, - relation.country_code as {{ facet }}_country_code, - relation.region as {{ facet }}_region, - relation.subregion as {{ facet }}_subregion, - relation.coordinates as {{ facet }}_coordinates, - relation.total_outputs as {{ facet }}_total_outputs, - relation.num_oa_outputs as {{ facet }}_num_oa_outputs, - relation.num_green_outputs as {{ facet }}_num_green_outputs, - relation.num_gold_outputs as {{ facet }}_num_gold_outputs, - relation.num_gold_just_doaj_outputs as {{ facet }}_num_gold_just_doaj_outputs, - relation.num_hybrid_outputs as {{ facet }}_num_hybrid_outputs, - relation.num_bronze_outputs as {{ facet }}_num_bronze_outputs, - relation.num_green_only_outputs as {{ facet }}_num_green_only_outputs, - relation.citations.openalex as {{ facet }}_total_citations, - ROUND(SAFE_DIVIDE( ( relation.num_oa_outputs ) * 100 , relation.total_outputs ), 2) as {{ facet }}_percent_oa, - ROUND(SAFE_DIVIDE( ( relation.num_green_outputs ) * 100 , relation.total_outputs ), 2) as {{ facet }}_percent_green, - ROUND(SAFE_DIVIDE( ( relation.num_gold_outputs ) * 100 , relation.total_outputs ), 2) as {{ facet }}_percent_gold, - relation.percentage_of_all_outputs as {{ facet }}_percent_of_all_outputs, - relation.percentage_of_all_oa as {{ facet }}_percent_of_all_oa -FROM - `{{ table_id }}` as entity, - UNNEST({{ facet }}) as relation -ORDER BY entity.id, published_year ASC \ No newline at end of file diff --git a/academic_observatory_workflows/database/sql/export_unique_list.sql.jinja2 b/academic_observatory_workflows/database/sql/export_unique_list.sql.jinja2 deleted file mode 100644 index 049b0d64..00000000 --- a/academic_observatory_workflows/database/sql/export_unique_list.sql.jinja2 +++ /dev/null @@ -1,20 +0,0 @@ -{# -Copyright 2020 Curtin University. -Authors: Richard Hosking -#} -{# -The purpose of this script it to export a unique list of entities for each of the aggregation tables (institution, country, group, etc) -This is to improve the performance of drop-down lists in Elasticsearch and Kibana -#} - -SELECT - entity.id as {{ aggregate }}_id, - ANY_VALUE(entity.name) as {{ aggregate }}_name, - ANY_VALUE(entity.country) as {{ aggregate }}_country, - ANY_VALUE(entity.country_code) as {{ aggregate }}_country_code, - ANY_VALUE(entity.region) as {{ aggregate }}_region, - ANY_VALUE(entity.subregion) as {{ aggregate }}_subregion, - ANY_VALUE(entity.coordinates) as {{ aggregate }}_coordinates, -FROM - `{{ table_id }}` as entity -GROUP BY entity.id \ No newline at end of file diff --git a/academic_observatory_workflows/workflows/doi_workflow.py b/academic_observatory_workflows/workflows/doi_workflow.py index 5db3a7c8..be9d79ca 100644 --- a/academic_observatory_workflows/workflows/doi_workflow.py +++ b/academic_observatory_workflows/workflows/doi_workflow.py @@ -203,110 +203,6 @@ def make_dataset_transforms( ) -def make_elastic_tables( - aggregate_table_name: str, - relate_to_institutions: bool = False, - relate_to_countries: bool = False, - relate_to_groups: bool = False, - relate_to_members: bool = False, - relate_to_journals: bool = False, - relate_to_funders: bool = False, - relate_to_publishers: bool = False, -): - # Always export - tables = [ - { - "file_name": make_sql_jinja2_filename("export_unique_list"), - "aggregate": aggregate_table_name, - "facet": "unique_list", - }, - { - "file_name": make_sql_jinja2_filename("export_access_types"), - "aggregate": aggregate_table_name, - "facet": "access_types", - }, - { - "file_name": make_sql_jinja2_filename("export_disciplines"), - "aggregate": aggregate_table_name, - "facet": "disciplines", - }, - { - "file_name": make_sql_jinja2_filename("export_output_types"), - "aggregate": aggregate_table_name, - "facet": "output_types", - }, - {"file_name": make_sql_jinja2_filename("export_events"), "aggregate": aggregate_table_name, "facet": "events"}, - { - "file_name": make_sql_jinja2_filename("export_metrics"), - "aggregate": aggregate_table_name, - "facet": "metrics", - }, - ] - - # Optional Relationships - export_relations = make_sql_jinja2_filename("export_relations") - if relate_to_institutions: - tables.append( - { - "file_name": export_relations, - "aggregate": aggregate_table_name, - "facet": "institutions", - } - ) - if relate_to_countries: - tables.append( - { - "file_name": export_relations, - "aggregate": aggregate_table_name, - "facet": "countries", - } - ) - if relate_to_groups: - tables.append( - { - "file_name": export_relations, - "aggregate": aggregate_table_name, - "facet": "groupings", - } - ) - if relate_to_members: - tables.append( - { - "file_name": export_relations, - "aggregate": aggregate_table_name, - "facet": "members", - } - ) - if relate_to_journals: - tables.append( - { - "file_name": export_relations, - "aggregate": aggregate_table_name, - "facet": "journals", - } - ) - - if relate_to_funders: - tables.append( - { - "file_name": export_relations, - "aggregate": aggregate_table_name, - "facet": "funders", - } - ) - - if relate_to_publishers: - tables.append( - { - "file_name": export_relations, - "aggregate": aggregate_table_name, - "facet": "publishers", - } - ) - - return tables - - def fetch_ror_affiliations(repository_institution: str, num_retries: int = 3) -> Dict: """Fetch the ROR affiliations for a given affiliation string. @@ -343,9 +239,7 @@ def get_snapshot_date(project_id: str, dataset_id: str, table_id: str, snapshot_ if len(table_shard_dates): shard_date = table_shard_dates[0] else: - raise AirflowException( - f"{table_id} with a table shard date <= {snapshot_date} not found" - ) + raise AirflowException(f"{table_id} with a table shard date <= {snapshot_date} not found") return shard_date @@ -508,7 +402,6 @@ def __init__( bq_intermediate_dataset_id: str = "observatory_intermediate", bq_dashboards_dataset_id: str = "coki_dashboards", bq_observatory_dataset_id: str = "observatory", - bq_elastic_dataset_id: str = "data_export", bq_unpaywall_dataset_id: str = "unpaywall", bq_ror_dataset_id: str = "ror", api_dataset_id: str = "doi", @@ -525,9 +418,8 @@ def __init__( :param bq_intermediate_dataset_id: the BigQuery intermediate dataset id. :param bq_dashboards_dataset_id: the BigQuery dashboards dataset id. :param bq_observatory_dataset_id: the BigQuery observatory dataset id. - :param bq_elastic_dataset_id: the BigQuery elastic dataset id. - :param bq_unpaywall_dataset_id: the BigQuery elastic dataset id. - :param bq_ror_dataset_id: the BigQuery elastic dataset id. + :param bq_unpaywall_dataset_id: the BigQuery Unpaywall dataset id. + :param bq_ror_dataset_id: the BigQuery ROR dataset id. :param api_dataset_id: the DOI dataset id. :param max_fetch_threads: maximum number of threads to use when fetching. :param start_date: the start date. @@ -554,7 +446,6 @@ def __init__( self.bq_intermediate_dataset_id = bq_intermediate_dataset_id self.bq_dashboards_dataset_id = bq_dashboards_dataset_id self.bq_observatory_dataset_id = bq_observatory_dataset_id - self.bq_elastic_dataset_id = bq_elastic_dataset_id self.bq_unpaywall_dataset_id = bq_unpaywall_dataset_id self.bq_ror_dataset_id = bq_ror_dataset_id self.api_dataset_id = api_dataset_id @@ -636,16 +527,6 @@ def create_tasks(self): self.add_task(self.update_table_descriptions) self.add_task(self.copy_to_dashboards) self.add_task(self.create_dashboard_views) - - # Export for Elastic - with self.parallel_tasks(): - # Remove the author aggregation from the list of aggregations to reduce cluster size on Elastic - for agg in self.remove_aggregations(self.AGGREGATIONS, {"author"}): - task_id = f"export_{agg.table_name}" - self.add_task( - self.export_for_elastic, op_kwargs={"aggregation": agg, "task_id": task_id}, task_id=task_id - ) - self.add_task(self.add_new_dataset_releases) def make_release(self, **kwargs) -> SnapshotRelease: @@ -672,7 +553,6 @@ def create_datasets(self, release: SnapshotRelease, **kwargs): (self.bq_intermediate_dataset_id, "Intermediate processing dataset for the Academic Observatory."), (self.bq_dashboards_dataset_id, "The latest data for display in the COKI dashboards."), (self.bq_observatory_dataset_id, "The Academic Observatory dataset."), - (self.bq_elastic_dataset_id, "The Academic Observatory dataset for Elasticsearch."), ] for dataset_id, description in datasets: @@ -891,62 +771,6 @@ def create_dashboard_views(self, release: SnapshotRelease, **kwargs): view_id = bq_table_id(self.output_project_id, self.bq_dashboards_dataset_id, view_name) bq_create_view(view_id=view_id, query=query) - def export_for_elastic(self, release: SnapshotRelease, **kwargs): - """Export data in a de-nested form for Elasticsearch.""" - - agg = kwargs["aggregation"] - tables = make_elastic_tables( - agg.table_name, - relate_to_institutions=agg.relate_to_institutions, - relate_to_countries=agg.relate_to_countries, - relate_to_groups=agg.relate_to_groups, - relate_to_members=agg.relate_to_members, - relate_to_journals=agg.relate_to_journals, - relate_to_funders=agg.relate_to_funders, - relate_to_publishers=agg.relate_to_publishers, - ) - - # Calculate the number of parallel queries. Since all of the real work is done on BigQuery run each export task - # in a separate thread so that they can be done in parallel. - num_queries = min(len(tables), MAX_QUERIES) - results = [] - - with ThreadPoolExecutor(max_workers=num_queries) as executor: - futures = list() - futures_msgs = {} - for table in tables: - template_file_name = table["file_name"] - aggregate = table["aggregate"] - facet = table["facet"] - - msg = f"Exporting file_name={template_file_name}, aggregate={aggregate}, facet={facet}" - logging.info(msg) - input_table_id = bq_sharded_table_id( - self.output_project_id, self.bq_observatory_dataset_id, agg.table_name, release.snapshot_date - ) - output_table_id = bq_sharded_table_id( - self.output_project_id, self.bq_elastic_dataset_id, f"ao_{aggregate}_{facet}", release.snapshot_date - ) - future = executor.submit( - export_aggregate_table, template_file_name, input_table_id, output_table_id, aggregate, facet - ) - - futures.append(future) - futures_msgs[future] = msg - - # Wait for completed tasks - for future in as_completed(futures): - success = future.result() - msg = futures_msgs[future] - results.append(success) - if success: - logging.info(f"Exporting feed success: {msg}") - else: - logging.error(f"Exporting feed failed: {msg}") - - success = all(results) - set_task_state(success, kwargs["task_id"]) - def add_new_dataset_releases(self, release: SnapshotRelease, **kwargs): """Adds release information to API.""" @@ -986,25 +810,3 @@ def remove_aggregations( aggregations_removed.append(agg) return aggregations_removed - - -def export_aggregate_table( - template_file_name: str, - input_table_id: str, - output_table_id: str, - aggregate: str, - facet: str, -): - template_path = os.path.join(sql_folder(), template_file_name) - sql = render_template( - template_path, - table_id=input_table_id, - aggregate=aggregate, - facet=facet, - ) - success = bq_create_table_from_query( - sql=sql, - table_id=output_table_id, - ) - - return success diff --git a/academic_observatory_workflows/workflows/tests/test_doi_workflow.py b/academic_observatory_workflows/workflows/tests/test_doi_workflow.py index 02c06c61..7961a413 100644 --- a/academic_observatory_workflows/workflows/tests/test_doi_workflow.py +++ b/academic_observatory_workflows/workflows/tests/test_doi_workflow.py @@ -37,7 +37,6 @@ from academic_observatory_workflows.workflows.doi_workflow import ( DoiWorkflow, make_dataset_transforms, - make_elastic_tables, fetch_ror_affiliations, ror_to_ror_hierarchy_index, ) @@ -247,24 +246,7 @@ def test_dag_structure(self): "create_subregion": ["update_table_descriptions"], "update_table_descriptions": ["copy_to_dashboards"], "copy_to_dashboards": ["create_dashboard_views"], - "create_dashboard_views": [ - "export_country", - "export_funder", - "export_group", - "export_institution", - "export_journal", - "export_publisher", - "export_region", - "export_subregion", - ], - "export_country": ["add_new_dataset_releases"], - "export_funder": ["add_new_dataset_releases"], - "export_group": ["add_new_dataset_releases"], - "export_institution": ["add_new_dataset_releases"], - "export_journal": ["add_new_dataset_releases"], - "export_publisher": ["add_new_dataset_releases"], - "export_region": ["add_new_dataset_releases"], - "export_subregion": ["add_new_dataset_releases"], + "create_dashboard_views": ["add_new_dataset_releases"], "add_new_dataset_releases": [], }, dag, @@ -316,7 +298,6 @@ def test_telescope(self): bq_intermediate_dataset_id = env.add_dataset(prefix="intermediate") bq_dashboards_dataset_id = env.add_dataset(prefix="dashboards") bq_observatory_dataset_id = env.add_dataset(prefix="observatory") - bq_elastic_dataset_id = env.add_dataset(prefix="elastic") bq_settings_dataset_id = env.add_dataset(prefix="settings") dataset_transforms = make_dataset_transforms( input_project_id=self.project_id, @@ -344,7 +325,6 @@ def test_telescope(self): bq_intermediate_dataset_id=bq_intermediate_dataset_id, bq_dashboards_dataset_id=bq_dashboards_dataset_id, bq_observatory_dataset_id=bq_observatory_dataset_id, - bq_elastic_dataset_id=bq_elastic_dataset_id, bq_unpaywall_dataset_id=fake_dataset_id, bq_ror_dataset_id=fake_dataset_id, transforms=dataset_transforms, @@ -546,33 +526,6 @@ def test_telescope(self): table_id = bq_table_id(self.project_id, bq_dashboards_dataset_id, f"{table_name}_comparison") self.assert_table_integrity(table_id) - # Test create exported tables for Elasticsearch - # Remove author from AGGREGATIONS list to save space on Elastic. - for agg in DoiWorkflow.remove_aggregations(DoiWorkflow, DoiWorkflow.AGGREGATIONS, {"author"}): - table_name = agg.table_name - task_id = f"export_{table_name}" - ti = env.run_task(task_id) - self.assertEqual(expected_state, ti.state) - - # Check that the correct tables exist for each aggregation - tables = make_elastic_tables( - agg.table_name, - relate_to_institutions=agg.relate_to_institutions, - relate_to_countries=agg.relate_to_countries, - relate_to_groups=agg.relate_to_groups, - relate_to_members=agg.relate_to_members, - relate_to_journals=agg.relate_to_journals, - relate_to_funders=agg.relate_to_funders, - relate_to_publishers=agg.relate_to_publishers, - ) - for table in tables: - aggregate = table["aggregate"] - facet = table["facet"] - table_id = bq_sharded_table_id( - self.project_id, bq_elastic_dataset_id, f"ao_{aggregate}_{facet}", snapshot_date - ) - self.assert_table_integrity(table_id) - # add_dataset_release_task dataset_releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=workflow.api_dataset_id) self.assertEqual(len(dataset_releases), 0)