Commit b7c4c05

initial commit

keegansmith21 committed Jun 22, 2023
1 parent 736f40f commit b7c4c05
Showing 2 changed files with 310 additions and 0 deletions.
@@ -0,0 +1,26 @@
[
{
"mode": "NULLABLE",
"name": "orcid",
"type": "STRING",
"description": "ORCID iD."
},
{
"mode": "NULLABLE",
"name": "path",
"type": "STRING",
"description": "ORCID iD in URI form."
},
{
"mode": "NULLABLE",
"name": "date_created",
"type": "DATE",
"description": "The date this ID was created"
},
{
"mode": "NULLABLE",
"name": "last_modified",
"type": "DATE",
"description": "The most recent modification instance for this ID"
}
]
284 changes: 284 additions & 0 deletions academic_observatory_workflows/workflows/orcid_telescope.py
@@ -0,0 +1,284 @@
# Copyright 2023 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# Author: Keegan Smith

from __future__ import annotations

import datetime
import logging
import os
import pathlib
import time
from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor
from datetime import timedelta
from typing import List, Dict, Tuple
import tarfile

import jsonlines
import pendulum
import requests
from airflow.exceptions import AirflowSkipException
from airflow.models.taskinstance import TaskInstance
from airflow.operators.dummy import DummyOperator
from airflow.hooks.base import BaseHook
import boto3
from google.cloud import bigquery
from google.cloud.bigquery import SourceFormat

from academic_observatory_workflows.config import schema_folder as default_schema_folder, Tag
from observatory.api.client.model.dataset_release import DatasetRelease
from observatory.platform.airflow import PreviousDagRunSensor, is_first_dag_run
from observatory.platform.api import get_dataset_releases, get_latest_dataset_release
from observatory.platform.api import make_observatory_api
from observatory.platform.bigquery import (
bq_table_id,
bq_find_schema,
bq_load_table,
bq_upsert_records,
bq_snapshot,
bq_sharded_table_id,
bq_create_dataset,
bq_delete_records,
)
from observatory.platform.config import AirflowConns
from observatory.platform.files import list_files, yield_jsonl, merge_update_files, save_jsonl_gz, load_csv
from observatory.platform.gcs import gcs_upload_files, gcs_blob_uri, gcs_blob_name_from_path
from observatory.platform.observatory_config import CloudWorkspace
from observatory.platform.utils.url_utils import get_user_agent, retry_get_url
from observatory.platform.workflows.workflow import Workflow, ChangefileRelease, cleanup, set_task_state

ORCID_AWS_SUMMARIES_BUCKET = "v2.0-summaries"
ORCID_AWS_LAMBDA_BUCKET = "orcid-lambda-file"
ORCID_LAMBDA_OBJECT = "last_modified.csv.tar"
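# Note: ORCID distributes its public data file via AWS S3. The Lambda object above is a tarred CSV
# manifest listing each ORCID iD together with its creation and last-modified timestamps, which can be
# used to work out which records have changed since the previous sync.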


class OrcidRelease(ChangefileRelease):
def __init__(
self,
*,
dag_id: str,
run_id: str,
start_date: pendulum.DateTime,
end_date: pendulum.DateTime,
):
"""Construct a CrossrefEventsRelease instance
:param dag_id: the id of the DAG.
:param start_date: the start_date of the release. Inclusive.
:param end_date: the end_date of the release. Exclusive.
"""

super().__init__(
dag_id=dag_id,
run_id=run_id,
start_date=start_date,
end_date=end_date,
)
self.upsert_table_file_path = os.path.join(self.transform_folder, "upsert_table.jsonl")
self.delete_table_file_path = os.path.join(self.transform_folder, "delete_table.jsonl")

self.lambda_download_file_path = os.path.join(self.download_folder, "lambda", "orcid_lambda.csv.tar")
        self.lambda_transform_file_path = os.path.join(self.transform_folder, "lambda", "orcid_lambda.jsonl.gz")

@property
def lambda_extract_files(self):
return list_files(self.extract_folder, r"^.*\.csv$") # Match files ending with .csv


class OrcidTelescope(Workflow):
    """ORCID telescope"""

def __init__(
self,
dag_id: str,
cloud_workspace: CloudWorkspace,
bq_dataset_id: str = "orcid",
bq_summary_table_name: str = "orcid",
bq_lambda_table_name: str = "orcid_lambda",
schema_folder: str = os.path.join(default_schema_folder(), "orcid"),
dataset_description: str = "The ORCID dataset",
table_description: str = "The ORCID dataset",
max_processes: int = os.cpu_count(),
api_dataset_id: str = "orcid",
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
aws_orcid_conn_id: str = "aws_orcid",
start_date: pendulum.DateTime = pendulum.datetime(2023, 6, 1),
schedule_interval: str = "@weekly",
queue: str = "remote_queue",
catchup: bool = False,
):
"""Construct an ORCID telescope instance"""

self.dag_id = dag_id
self.cloud_workspace = cloud_workspace
self.bq_dataset_id = bq_dataset_id
self.bq_summary_table_name = bq_summary_table_name
self.bq_lambda_table_name = bq_lambda_table_name
self.api_dataset_id = api_dataset_id
self.schema_folder = schema_folder
self.dataset_description = dataset_description
self.table_description = table_description
self.max_processes = max_processes
self.observatory_api_conn_id = observatory_api_conn_id
self.aws_orcid_conn_id = aws_orcid_conn_id
self.start_date = start_date
self.schedule_interval = schedule_interval
self.queue = queue
self.catchup = catchup

external_task_id = "dag_run_complete"
self.add_operator(
PreviousDagRunSensor(
dag_id=self.dag_id,
external_task_id=external_task_id,
execution_delta=timedelta(days=7), # To match the @weekly schedule_interval
)
)
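        # Task order: check dependencies, transfer the ORCID data, then process and load the Lambda manifest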
self.add_task(self.check_dependencies)
self.add_task(self.transfer_orcid)
self.add_task(self.process_lambda)
self.add_task(self.bq_load_lambda)
# self.add_task(self.upsert_files)

# self.add_task(self.add_new_dataset_release)
# self.add_task(self.cleanup)

# The last task that the next DAG run's ExternalTaskSensor waits for.
self.add_operator(
DummyOperator(
task_id=external_task_id,
)
)

@property
def aws_orcid_key(self) -> Tuple[str, str]:
"""Return API login and password"""
connection = BaseHook.get_connection(self.aws_orcid_conn_id)
return connection.login, connection.password

def make_release(self, **kwargs) -> OrcidRelease:
is_first_run = is_first_dag_run(kwargs["dag_run"])
releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=self.api_dataset_id)

# Get start date
        if is_first_run:
            assert (
                len(releases) == 0
            ), "make_release: there should be no DatasetReleases stored in the Observatory API on the first DAG run."
            start_date = pendulum.instance(datetime.datetime.min)
        else:
            assert (
                len(releases) >= 1
            ), "make_release: there should be at least 1 DatasetRelease in the Observatory API after the first DAG run."
            start_date = kwargs["data_interval_start"]

return OrcidRelease(
dag_id=self.dag_id,
run_id=kwargs["run_id"],
start_date=start_date,
end_date=kwargs["data_interval_end"],
)

def transfer_orcid(self):
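        """Placeholder for the ORCID data transfer step; not implemented in this commit."""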
pass

def process_lambda(self, release: OrcidRelease, **kwargs):
"""Downloads, extracts, transforms and uploads the ORCID Lambda manifest file"""
aws_key_id, aws_key = self.aws_orcid_key
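        # Create an S3 client authenticated with the ORCID AWS credentials from the Airflow connection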
s3client = boto3.client("s3", aws_access_key_id=aws_key_id, aws_secret_access_key=aws_key)

        # Download the Lambda manifest tarball from the ORCID S3 bucket
        os.makedirs(os.path.dirname(release.lambda_download_file_path), exist_ok=True)  # ensure the local subfolder exists
        s3client.download_file(ORCID_AWS_LAMBDA_BUCKET, ORCID_LAMBDA_OBJECT, release.lambda_download_file_path)

# Extract
with tarfile.open(release.lambda_download_file_path) as lambda_tar:
lambda_tar.extractall(release.extract_folder)
assert len(release.lambda_extract_files) == 1, "Unexpected number of files in extract folder"

# Transform
orcid_lambda = load_csv(release.lambda_extract_files[0])
with ProcessPoolExecutor(max_workers=self.max_processes) as executor:
futures = []
for row in orcid_lambda:
future = executor.submit(transform_item, row)
futures.append(future)

            transformed = []
            finished = 0
            for future in as_completed(futures):
                transformed.append(future.result())
                finished += 1
                if finished % 100000 == 0:
                    logging.info(f"Transformed {finished}/{len(orcid_lambda)} rows")
        # Ensure the transform subfolder exists, then save the transformed rows as gzipped JSON lines
        os.makedirs(os.path.dirname(release.lambda_transform_file_path), exist_ok=True)
        save_jsonl_gz(release.lambda_transform_file_path, transformed)
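        # Upload the transformed file to the Google Cloud Storage transform bucket for BigQuery loading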
gcs_upload_files(
bucket_name=self.cloud_workspace.transform_bucket, file_paths=[release.lambda_transform_file_path]
)

def bq_load_lambda(self, release: OrcidRelease, **kwargs):
bq_create_dataset(
project_id=self.cloud_workspace.project_id,
dataset_id=self.bq_dataset_id,
location=self.cloud_workspace.data_location,
description=self.dataset_description,
)

        # Load the transformed Lambda manifest file from the Google Cloud Storage transform bucket into BigQuery
        uri = gcs_blob_uri(
            self.cloud_workspace.transform_bucket,
            gcs_blob_name_from_path(release.lambda_transform_file_path),
        )
        table_id = bq_sharded_table_id(
            self.cloud_workspace.project_id, self.bq_dataset_id, self.bq_lambda_table_name, release.end_date
        )
        schema_file_path = bq_find_schema(path=self.schema_folder, table_name=self.bq_lambda_table_name)
success = bq_load_table(
uri=uri,
table_id=table_id,
schema_file_path=schema_file_path,
source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
table_description="The ORCID Lambda Manifest",
ignore_unknown_values=True,
)
        set_task_state(success, self.bq_load_lambda.__name__, release)


def transform_item(item):
"""Transform a single Crossref Metadata JSON value.
:param item: a JSON value.
:return: the transformed item.
"""

if isinstance(item, dict):
new = {}
for k, v in item.items():
# Replace hyphens with underscores for BigQuery compatibility
k = k.replace("-", "_")

            # Blank out creation/modification dates that do not match the expected format
            if k == "date_created" or k == "last_modified":
                try:
                    datetime.datetime.strptime(v, "%Y-%m-%dT%H:%M:%SZ")
                except ValueError:
                    v = ""

new[k] = transform_item(v)
return new
elif isinstance(item, list):
return [transform_item(i) for i in item]
else:
return item
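# Illustrative example (hypothetical input): transform_item({"date-created": "2015-01-01T12:00:00Z",
# "last-modified": "not a date"}) returns {"date_created": "2015-01-01T12:00:00Z", "last_modified": ""}.
# Hyphens in keys are replaced with underscores and unparseable dates are blanked.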
