Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
jdddog committed May 11, 2023
1 parent 3436180 commit ce6ce1e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
11 changes: 11 additions & 0 deletions observatory-platform/observatory/platform/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,17 @@ def save_jsonl_gz(file_path: str, data: List[Dict]) -> None:
jsonl_gzip_file.write(bytes_io.getvalue())


def save_jsonl(file_path: str, data: List[Dict]) -> None:
"""Takes a list of dictionaries and writes this to a jsonl file.
:param file_path: Path to the .jsonl.gz file
:param data: a list of dictionaries that can be written out with jsonlines
:return: None.
"""

with jsonlines.Writer(file_path) as writer:
writer.write_all(data)


def load_file(file_name: str, modes="r"):
"""Load a file.
Expand Down
18 changes: 9 additions & 9 deletions observatory-platform/observatory/platform/workflows/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,36 +217,36 @@ def __init__(
*,
dag_id: str,
run_id: str,
changefile_start_date: pendulum.DateTime = None,
changefile_end_date: pendulum.DateTime = None,
start_date: pendulum.DateTime = None,
end_date: pendulum.DateTime = None,
sequence_start: int = None,
sequence_end: int = None,
):
"""Construct a ChangefileRelease instance
:param dag_id: the DAG ID.
:param run_id: the DAG's run ID.
:param changefile_start_date: the date of the first changefile processed in this release.
:param changefile_end_date: the date of the last changefile processed in this release.
:param start_date: the date of the first changefile processed in this release.
:param end_date: the date of the last changefile processed in this release.
:param sequence_start: the starting sequence number of files that make up this release.
:param sequence_end: the end sequence number of files that make up this release.
"""

super().__init__(dag_id=dag_id, run_id=run_id)
self.changefile_start_date = changefile_start_date
self.changefile_end_date = changefile_end_date
self.start_date = start_date
self.end_date = end_date
self.sequence_start = sequence_start
self.sequence_end = sequence_end

changefile = f"changefile_{changefile_start_date.format(DATE_TIME_FORMAT)}_to_{changefile_end_date.format(DATE_TIME_FORMAT)}"
changefile = f"changefile_{start_date.format(DATE_TIME_FORMAT)}_to_{end_date.format(DATE_TIME_FORMAT)}"
self.download_folder = make_workflow_folder(self.dag_id, run_id, changefile, "download")
self.extract_folder = make_workflow_folder(self.dag_id, run_id, changefile, "extract")
self.transform_folder = make_workflow_folder(self.dag_id, run_id, changefile, "transform")

def __str__(self):
return (
f"Release(dag_id={self.dag_id}, run_id={self.run_id}, changefile_start_date={self.changefile_start_date}, "
f"changefile_end_date={self.changefile_end_date}, sequence_start={self.sequence_start}, sequence_end={self.sequence_end})"
f"Release(dag_id={self.dag_id}, run_id={self.run_id}, start_date={self.start_date}, "
f"end_date={self.end_date}, sequence_start={self.sequence_start}, sequence_end={self.sequence_end})"
)


Expand Down

0 comments on commit ce6ce1e

Please sign in to comment.