This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Add reingestion workflow for Metropolitan (#819)
* Add reingestion workflow for Metropolitan

* Bump max active tasks to 2

* Update DAG docs
stacimc committed Oct 25, 2022
1 parent 1506d6d commit a9cdab5
Showing 2 changed files with 23 additions and 50 deletions.
51 changes: 4 additions & 47 deletions DAGs.md
@@ -100,9 +100,10 @@ The following are DAGs grouped by their primary tag:

 | DAG ID | Schedule Interval |
 | --- | --- |
-| [`europeana_reingestion_workflow`](#europeana_reingestion_workflow) | `@weekly` |
-| [`flickr_reingestion_workflow`](#flickr_reingestion_workflow) | `@weekly` |
-| [`wikimedia_reingestion_workflow`](#wikimedia_reingestion_workflow) | `@weekly` |
+| `europeana_reingestion_workflow` | `@weekly` |
+| `flickr_reingestion_workflow` | `@weekly` |
+| `metropolitan_museum_reingestion_workflow` | `@weekly` |
+| `wikimedia_commons_reingestion_workflow` | `@weekly` |


 # DAG documentation
@@ -112,9 +113,7 @@ The following is documentation associated with each DAG (where available):
 1. [`airflow_log_cleanup`](#airflow_log_cleanup)
 1. [`audio_data_refresh`](#audio_data_refresh)
 1. [`check_silenced_dags`](#check_silenced_dags)
-1. [`europeana_reingestion_workflow`](#europeana_reingestion_workflow)
 1. [`europeana_workflow`](#europeana_workflow)
-1. [`flickr_reingestion_workflow`](#flickr_reingestion_workflow)
 1. [`flickr_workflow`](#flickr_workflow)
 1. [`freesound_workflow`](#freesound_workflow)
 1. [`image_data_refresh`](#image_data_refresh)
@@ -131,7 +130,6 @@ The following is documentation associated with each DAG (where available):
 1. [`smithsonian_workflow`](#smithsonian_workflow)
 1. [`stocksnap_workflow`](#stocksnap_workflow)
 1. [`wikimedia_commons_workflow`](#wikimedia_commons_workflow)
-1. [`wikimedia_reingestion_workflow`](#wikimedia_reingestion_workflow)
 1. [`wordpress_workflow`](#wordpress_workflow)


@@ -214,19 +212,6 @@ after the issue has been resolved.
 The DAG runs weekly.


-## `europeana_reingestion_workflow`
-
-
-Content Provider: Europeana
-
-ETL Process: Use the API to identify all CC licensed images.
-
-Output: TSV file containing the images and the
-respective meta-data.
-
-Notes: https://www.europeana.eu/api/v2/search.json
-
-
 ## `europeana_workflow`


@@ -240,20 +225,6 @@ Output: TSV file containing the images and the
 Notes: https://www.europeana.eu/api/v2/search.json


-## `flickr_reingestion_workflow`
-
-
-Content Provider: Flickr
-
-ETL Process: Use the API to identify all CC licensed images.
-
-Output: TSV file containing the images and the
-respective meta-data.
-
-Notes: https://www.flickr.com/help/terms/api
-Rate limit: 3600 requests per hour.
-
-
 ## `flickr_workflow`


@@ -537,20 +508,6 @@ Notes: https://stocksnap.io/api/load-photos/date/desc/1
 ## `wikimedia_commons_workflow`


-Content Provider: Wikimedia Commons
-
-ETL Process: Use the API to identify all CC-licensed images.
-
-Output: TSV file containing the image, the respective
-meta-data.
-
-Notes: https://commons.wikimedia.org/wiki/API:Main_page
-No rate limit specified.
-
-
-## `wikimedia_reingestion_workflow`
-
-
 Content Provider: Wikimedia Commons

 ETL Process: Use the API to identify all CC-licensed images.
22 changes: 19 additions & 3 deletions openverse_catalog/dags/providers/provider_reingestion_workflows.py
@@ -1,6 +1,7 @@
 from dataclasses import dataclass
 from datetime import datetime, timedelta

+from providers.provider_api_scripts.metropolitan_museum import MetMuseumDataIngester
 from providers.provider_api_scripts.wikimedia_commons import (
     WikimediaCommonsDataIngester,
 )
@@ -41,10 +42,14 @@ class ProviderReingestionWorkflow(ProviderWorkflow):
     schedule_string: str = "@weekly"
     dated: bool = True

+    def __post_init__(self):
+        if not self.dag_id:
+            self.dag_id = f"{self.provider_script}_reingestion_workflow"
+

 PROVIDER_REINGESTION_WORKFLOWS = [
     ProviderReingestionWorkflow(
-        dag_id="europeana_reingestion_workflow",
+        # 60 total reingestion days
         provider_script="europeana",
         start_date=datetime(2013, 11, 21),
         max_active_tasks=3,
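
The `__post_init__` hook in the hunk above is why the explicit `dag_id` arguments can be dropped, and why the Wikimedia DAG is renamed from `wikimedia_reingestion_workflow` to `wikimedia_commons_reingestion_workflow` in DAGs.md. A minimal sketch of that fallback follows. `WorkflowSketch` is a hypothetical stand-in, not the real `ProviderReingestionWorkflow`, and the empty-string default for `dag_id` is an assumption, since the parent class is not shown in this diff.

```python
from dataclasses import dataclass


@dataclass
class WorkflowSketch:
    """Hypothetical stand-in for ProviderReingestionWorkflow (not the real class)."""

    provider_script: str
    dag_id: str = ""  # assumed default; the real default lives in the parent class

    def __post_init__(self):
        # Same fallback as in the diff: derive the ID only when none was given.
        if not self.dag_id:
            self.dag_id = f"{self.provider_script}_reingestion_workflow"


derived = WorkflowSketch(provider_script="metropolitan_museum")
explicit = WorkflowSketch(provider_script="wikimedia_commons", dag_id="custom_id")

print(derived.dag_id)   # metropolitan_museum_reingestion_workflow
print(explicit.dag_id)  # custom_id
```

An explicitly passed `dag_id` still wins, so the derived name only applies to entries that omit it.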
@@ -54,7 +59,7 @@ class ProviderReingestionWorkflow(ProviderWorkflow):
         three_month_list_length=40,
     ),
     ProviderReingestionWorkflow(
-        dag_id="flickr_reingestion_workflow",
+        # 128 total reingestion days
         provider_script="flickr",
         pull_timeout=timedelta(minutes=30),
         daily_list_length=7,
@@ -65,7 +70,18 @@ class ProviderReingestionWorkflow(ProviderWorkflow):
         six_month_list_length=40,
     ),
     ProviderReingestionWorkflow(
-        dag_id="wikimedia_reingestion_workflow",
+        # 64 total reingestion days
+        provider_script="metropolitan_museum",
+        ingestion_callable=MetMuseumDataIngester,
+        max_active_tasks=2,
+        pull_timeout=timedelta(hours=12),
+        daily_list_length=6,
+        one_month_list_length=9,
+        three_month_list_length=18,
+        six_month_list_length=30,
+    ),
+    ProviderReingestionWorkflow(
+        # 64 total reingestion days
         provider_script="wikimedia_commons",
         ingestion_callable=WikimediaCommonsDataIngester,
         pull_timeout=timedelta(minutes=90),
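
The comment on the new `metropolitan_museum` entry gives 64 total reingestion days. As a quick check, the sketch below sums the four list lengths configured for that entry. It assumes each list entry corresponds to one distinct reingestion day; the diff itself does not define the lists.

```python
# Values copied from the metropolitan_museum entry in the hunk above.
met_list_lengths = {
    "daily_list_length": 6,
    "one_month_list_length": 9,
    "three_month_list_length": 18,
    "six_month_list_length": 30,
}

# Assumption: every list entry is one distinct reingestion day.
days_from_lists = sum(met_list_lengths.values())
print(days_from_lists)  # 63
```

The lists account for 63 days, one fewer than the comment states, so the comment presumably counts one additional day that is not visible in this diff.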