Commit: Add docstrings
jdddog committed Feb 26, 2024
1 parent dc68b59 commit 66d9608
Showing 6 changed files with 106 additions and 1 deletion.
40 changes: 40 additions & 0 deletions observatory_platform/airflow/release.py
@@ -67,26 +67,51 @@ def __init__(self, *, dag_id: str, run_id: str):

    @property
    def workflow_folder(self):
        """Get the path to the workflow folder, namespaced to a DAG run. Can contain multiple release folders.
        :return: path to folder.
        """

        return make_workflow_folder(self.dag_id, self.run_id)

    @property
    def release_folder(self):
        """Get the path to the release folder, which resides inside the workflow folder.
        :return: path to folder.
        """

        raise NotImplementedError("self.release_folder should be implemented by subclasses")

    @property
    def download_folder(self):
        """Get the path to the download folder, which contains downloaded files. Resides in a release folder.
        :return: path to folder.
        """

        path = os.path.join(self.release_folder, "download")
        os.makedirs(path, exist_ok=True)
        return path

    @property
    def extract_folder(self):
        """Get the path to the extract folder, which contains extracted files. Resides in a release folder.
        :return: path to folder.
        """

        path = os.path.join(self.release_folder, "extract")
        os.makedirs(path, exist_ok=True)
        return path

    @property
    def transform_folder(self):
        """Get the path to the transform folder, which contains transformed files. Resides in a release folder.
        :return: path to folder.
        """

        path = os.path.join(self.release_folder, "transform")
        os.makedirs(path, exist_ok=True)
        return path
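Taken together, these properties give every release a conventional download, extract and transform layout on disk, created lazily on first access. Below is a minimal sketch of how a concrete subclass might use them; the MyRelease class, the import path and the file names are hypothetical, and the enclosing base class is assumed to be named Release:

import os

# Assumed import path; Release and make_workflow_folder are used in this module.
from observatory_platform.airflow.release import Release, make_workflow_folder

class MyRelease(Release):
    @property
    def release_folder(self):
        return make_workflow_folder(self.dag_id, self.run_id, "my_release")

release = MyRelease(dag_id="my_dag", run_id="manual__2024-02-26T00:00:00+00:00")
raw_file = os.path.join(release.download_folder, "data.jsonl")   # created under .../my_release/download
out_file = os.path.join(release.transform_folder, "data.jsonl")  # created under .../my_release/transform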
@@ -115,6 +140,11 @@ def __init__(

    @property
    def release_folder(self):
        """Get the path to the release folder, which resides inside the workflow folder.
        :return: path to folder.
        """

        return make_workflow_folder(self.dag_id, self.run_id, f"snapshot_{self.snapshot_date.format(DATE_TIME_FORMAT)}")

    def __str__(self):

@@ -141,6 +171,11 @@ def __init__(

    @property
    def release_folder(self):
        """Get the path to the release folder, which resides inside the workflow folder.
        :return: path to folder.
        """

        return make_workflow_folder(
            self.dag_id, self.run_id, f"partition_{self.partition_date.format(DATE_TIME_FORMAT)}"
        )
@@ -178,6 +213,11 @@ def __init__(

    @property
    def release_folder(self):
        """Get the path to the release folder, which resides inside the workflow folder.
        :return: path to folder.
        """

        return make_workflow_folder(
            self.dag_id,
            self.run_id,
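Each subclass above keys its release folder to a piece of release metadata (a snapshot date, a partition date, and a third variant whose folder name is collapsed in this view), so separate runs of the same workflow never collide on disk. A hedged sketch of the snapshot case follows; the SnapshotRelease constructor is collapsed above, so the assumption is that it accepts dag_id, run_id and a pendulum snapshot_date:

import pendulum

from observatory_platform.airflow.release import SnapshotRelease  # assumed import path

release = SnapshotRelease(
    dag_id="doi_workflow",
    run_id="scheduled__2024-02-25T00:00:00+00:00",
    snapshot_date=pendulum.datetime(2024, 2, 25),
)
# The folder name embeds the formatted snapshot date, e.g. snapshot_<date>.
print(release.release_folder)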
10 changes: 10 additions & 0 deletions observatory_platform/airflow/workflow.py
@@ -54,6 +54,11 @@ def get_data_path() -> str:


def fetch_workflows() -> List[Workflow]:
"""Fetches Workflow instances from the WORKFLOWS Airflow Variable.
:return: a list of Workflow instances.
"""

workflows = []
workflows_str = Variable.get(AirflowVars.WORKFLOWS)
logging.info(f"workflows_str: {workflows_str}")
Expand All @@ -69,6 +74,11 @@ def fetch_workflows() -> List[Workflow]:


def load_dags_from_config():
"""Loads DAGs from a workflow config file, stored in the WORKFLOWS Airflow Variable.
:return: None.
"""

for workflow in fetch_workflows():
dag_id = workflow.dag_id
logging.info(f"Making Workflow: {workflow.name}, dag_id={dag_id}")
Expand Down
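Because load_dags_from_config is driven entirely by the WORKFLOWS Airflow Variable, registering a new workflow is a configuration change rather than a code change. A hedged sketch of seeding that Variable; only dag_id and name are confirmed by this diff, so the variable key and the class_name field are illustrative assumptions:

import json

from airflow.models import Variable

# Hypothetical payload: dag_id and name appear in the code above, while
# class_name is an assumption about the rest of the Workflow schema.
workflows = [
    {
        "dag_id": "my_workflow",
        "name": "My Workflow",
        "class_name": "my_package.workflows.my_workflow",
    }
]
Variable.set("workflows", json.dumps(workflows))  # key assumed; the code reads AirflowVars.WORKFLOWS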
11 changes: 11 additions & 0 deletions observatory_platform/dataset_api.py
@@ -105,6 +105,12 @@ def __init__(

    @staticmethod
    def from_dict(_dict: Dict) -> DatasetRelease:
        """Create a DatasetRelease instance from a dictionary.
        :param _dict: the dictionary.
        :return: DatasetRelease instance.
        """

        return DatasetRelease(
            dag_id=_dict["dag_id"],
            dataset_id=_dict["dataset_id"],

@@ -123,6 +129,11 @@ def from_dict(_dict: Dict) -> DatasetRelease:
        )

    def to_dict(self) -> Dict:
        """Convert this DatasetRelease instance to a dictionary.
        :return: a dictionary representation of this DatasetRelease instance.
        """

        return dict(
            dag_id=self.dag_id,
            dataset_id=self.dataset_id,
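from_dict and to_dict are intended as inverses, which is what lets a DatasetRelease round-trip through JSON payloads. A minimal sketch under that assumption; only dag_id and dataset_id are visible in these hunks, so the remaining constructor fields are elided:

from observatory_platform.dataset_api import DatasetRelease

# Hypothetical payload; the full field list is collapsed in this diff, so in
# practice the dictionary would carry the remaining required fields too.
payload = {"dag_id": "my_dag", "dataset_id": "my_dataset"}

release = DatasetRelease.from_dict(payload)

# A round trip through to_dict and from_dict should preserve every field.
assert DatasetRelease.from_dict(release.to_dict()).to_dict() == release.to_dict()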
6 changes: 6 additions & 0 deletions observatory_platform/sandbox/sandbox_environment.py
@@ -115,6 +115,12 @@ def __init__(

    @property
    def cloud_workspace(self) -> CloudWorkspace:
        """Return a default CloudWorkspace instance constructed from the sandbox project_id, download_bucket,
        transform_bucket and data_location variables.
        :return: the CloudWorkspace instance.
        """

        return CloudWorkspace(
            project_id=self.project_id,
            download_bucket=self.download_bucket,
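In tests, this property saves constructing a CloudWorkspace by hand each time the sandbox is used. A hedged usage sketch; the SandboxEnvironment constructor arguments shown here are assumptions, since its signature is collapsed above:

from observatory_platform.sandbox.sandbox_environment import SandboxEnvironment  # assumed import path

# Hypothetical construction; the real constructor may take different arguments.
env = SandboxEnvironment(project_id="my-project-id", data_location="us")

# Buckets and project come from the sandbox itself, so code under test can be
# handed a single ready-made workspace object.
workspace = env.cloud_workspace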
38 changes: 38 additions & 0 deletions observatory_platform/sandbox/test_utils.py
@@ -68,6 +68,11 @@ def __init__(self, *args, **kwargs):

    @property
    def fake_cloud_workspace(self):
        """Return a fake CloudWorkspace instance for mocking.
        :return: a CloudWorkspace instance.
        """

        return CloudWorkspace(
            project_id="project-id",
            download_bucket="download_bucket",

@@ -328,6 +333,23 @@ def save_empty_file(path: str, file_name: str) -> str:

@contextlib.contextmanager
def bq_dataset_test_env(*, project_id: str, location: str, prefix: str):
"""Creates a BigQuery dataset with a random ID for testing purposes and deletes the dataset and its contents
when the context exits. The BigQuery dataset is created with the given project_id, in the given location and with
the given prefix. See code example below for usage:
.. code-block:: python
with bq_dataset_test_env(
project_id=self.gc_project_id, location=self.gc_location, prefix=self.prefix
) as dataset_id:
print(f"My dataset id: {dataset_id}")
:param project_id: the BigQuery project ID.
:param location: the BigQuery dataset location.
:param prefix: a prefix to add to the dataset ID.
:return: yields the BigQuery dataset ID.
"""

client = bigquery.Client()
dataset_id = prefix + "_" + random_id()
try:
Expand All @@ -339,6 +361,22 @@ def bq_dataset_test_env(*, project_id: str, location: str, prefix: str):

@contextlib.contextmanager
def aws_bucket_test_env(*, prefix: str, region_name: str, expiration_days=1) -> str:
"""Creates an AWS bucket with a random ID for testing purposes and deletes the bucket and its contents
when the context exits. The AWS bucket is named with the given prefix and created in the given region.
See code example below for usage:
.. code-block:: python
with aws_bucket_test_env(prefix="my-prefix", region_name=self.aws_region_name) as aws_bucket_name:
print(f"My AWS bucket name: {aws_bucket_name}")
:param prefix: a prefix for the AWS bucket name.
:param region_name: the AWS region where the bucket should be created.
:param expiration_days: the number of days for objects on the bucket to automatically expire, in case deleting
of the bucket or its contents fail.
:return: yields the AWS bucket name.
"""

# Create an S3 client
s3 = boto3.Session().client("s3", region_name=region_name)
bucket_name = f"obs-test-{prefix}-{random_id()}"
Expand Down
2 changes: 1 addition & 1 deletion observatory_platform/sftp.py
@@ -50,7 +50,7 @@ def __init__(self, dag_id: str, sftp_conn_id: str, sftp_root: str = "/"):
"""Initialise SftpFolders.
:param dag_id: the dag id (namespace + organisation name)
:param sftp_conn_id: TODO
:param sftp_conn_id: the Airflow Connection ID for the SFTP server.
:param sftp_root: optional root to be added to sftp home path
"""
self.dag_id = dag_id
Expand Down
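With the TODO replaced, every constructor argument is now documented. A minimal instantiation sketch using the signature from the hunk header above; the connection ID and root values are illustrative:

from observatory_platform.sftp import SftpFolders

# sftp_conn_id must name an Airflow Connection configured for the target SFTP
# server; the values here are placeholders.
folders = SftpFolders(dag_id="my_dag", sftp_conn_id="sftp_service", sftp_root="/upload")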
