diff --git a/observatory-platform/observatory/platform/files.py b/observatory-platform/observatory/platform/files.py index 8e061053d..26283e851 100644 --- a/observatory-platform/observatory/platform/files.py +++ b/observatory-platform/observatory/platform/files.py @@ -238,6 +238,17 @@ def save_jsonl_gz(file_path: str, data: List[Dict]) -> None: jsonl_gzip_file.write(bytes_io.getvalue()) +def save_jsonl(file_path: str, data: List[Dict]) -> None: + """Takes a list of dictionaries and writes this to a jsonl file. + :param file_path: Path to the .jsonl.gz file + :param data: a list of dictionaries that can be written out with jsonlines + :return: None. + """ + + with jsonlines.Writer(file_path) as writer: + writer.write_all(data) + + def load_file(file_name: str, modes="r"): """Load a file. diff --git a/observatory-platform/observatory/platform/workflows/workflow.py b/observatory-platform/observatory/platform/workflows/workflow.py index 60cf905ef..d5d9bbbe6 100644 --- a/observatory-platform/observatory/platform/workflows/workflow.py +++ b/observatory-platform/observatory/platform/workflows/workflow.py @@ -217,8 +217,8 @@ def __init__( *, dag_id: str, run_id: str, - changefile_start_date: pendulum.DateTime = None, - changefile_end_date: pendulum.DateTime = None, + start_date: pendulum.DateTime = None, + end_date: pendulum.DateTime = None, sequence_start: int = None, sequence_end: int = None, ): @@ -226,27 +226,27 @@ def __init__( :param dag_id: the DAG ID. :param run_id: the DAG's run ID. - :param changefile_start_date: the date of the first changefile processed in this release. - :param changefile_end_date: the date of the last changefile processed in this release. + :param start_date: the date of the first changefile processed in this release. + :param end_date: the date of the last changefile processed in this release. :param sequence_start: the starting sequence number of files that make up this release. :param sequence_end: the end sequence number of files that make up this release. """ super().__init__(dag_id=dag_id, run_id=run_id) - self.changefile_start_date = changefile_start_date - self.changefile_end_date = changefile_end_date + self.start_date = start_date + self.end_date = end_date self.sequence_start = sequence_start self.sequence_end = sequence_end - changefile = f"changefile_{changefile_start_date.format(DATE_TIME_FORMAT)}_to_{changefile_end_date.format(DATE_TIME_FORMAT)}" + changefile = f"changefile_{start_date.format(DATE_TIME_FORMAT)}_to_{end_date.format(DATE_TIME_FORMAT)}" self.download_folder = make_workflow_folder(self.dag_id, run_id, changefile, "download") self.extract_folder = make_workflow_folder(self.dag_id, run_id, changefile, "extract") self.transform_folder = make_workflow_folder(self.dag_id, run_id, changefile, "transform") def __str__(self): return ( - f"Release(dag_id={self.dag_id}, run_id={self.run_id}, changefile_start_date={self.changefile_start_date}, " - f"changefile_end_date={self.changefile_end_date}, sequence_start={self.sequence_start}, sequence_end={self.sequence_end})" + f"Release(dag_id={self.dag_id}, run_id={self.run_id}, start_date={self.start_date}, " + f"end_date={self.end_date}, sequence_start={self.sequence_start}, sequence_end={self.sequence_end})" )