diff --git a/src/lrmaCU/terra/workspace_utils.py b/src/lrmaCU/terra/workspace_utils.py
index 49d0392..dc499e2 100644
--- a/src/lrmaCU/terra/workspace_utils.py
+++ b/src/lrmaCU/terra/workspace_utils.py
@@ -22,6 +22,18 @@ def get_workspace_bucket(ns: str, ws: str, max_attempts: int = 2) -> str:
     return _query_workspace(ns, ws, max_attempts)['bucketName']


+def get_workspace_attributes(ns: str, ws: str, max_attempts: int = 2) -> dict:
+    """
+    Get names and values of all attributes in a workspace.
+
+    :param ns: Namespace of the workspace.
+    :param ws: Name of the workspace.
+    :param max_attempts: Maximum number of attempts to make for the underlying API call.
+    :return: A dict mapping attribute names to their values.
+    """
+    return _query_workspace(ns, ws, max_attempts)['attributes']
+
+
 def get_workspace_attribute(ns: str, ws: str, attribute_name: str, max_attempts: int = 2):
     """
     Get value of an existing attribute.
@@ -244,3 +256,161 @@ def _filter_sub_by_timestamp(submissions: List[dict],
     """
     return [sub for sub in submissions if earlier_cutoff <= parser.parse(sub['submissionDate']) <= later_cutoff]

+
+def backup_workspace(ns: str, ws: str, backup_bucket: str, max_attempts: int = 2) -> None:
+    """
+    Back up a workspace by copying all tables, notebooks, workflow configurations, and job history to a given Google bucket.
+
+    :param ns: Namespace of the workspace to back up.
+    :param ws: Name of the workspace to back up.
+    :param backup_bucket: Destination bucket into which to save the backup.
+    :param max_attempts: Maximum number of attempts to make for each API call.
+    :return:
+    """
+
+    # Track the start time so we can report how long the backup took:
+    start_time = time.time()
+
+    # Remove leading `gs://` if present:
+    backup_bucket = backup_bucket.replace("gs://", "")
+
+    # Get our entity types so we know which tables to dump:
+    entity_types = fapi.list_entity_types(ns, ws).json()
+
+    timestamp = datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
+    workspace_bucket = get_workspace_bucket(ns, ws, max_attempts)
+    backup_folder_path = timestamp
+
+    # Get a handle on the destination backup bucket:
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(backup_bucket)
+
+    # Back up the entities:
+    # Iterate over entity types and dump each one to a separate gzipped TSV:
+    print("Writing workspace entities to backup dir:")
+    print(f"gs://{backup_bucket}/{backup_folder_path}/tables")
+    for et in entity_types:
+        print(f"\t{et}")
+        tbl, _ = load_table(ns, ws, et)
+        tbl = fix_nans(tbl)
+        table_name = f"{timestamp}_{ns}_{ws}_{et}.tsv.gz"
+
+        # Write our table to our bucket:
+        blob = bucket.blob(f"{backup_folder_path}/tables/{table_name}")
+
+        with io.StringIO() as buf:
+            tbl.to_csv(buf, sep="\t", index=False)
+            with blob.open('wb') as f:
+                f.write(gzip.compress(bytes(buf.getvalue(), 'utf-8')))
+    print('Done.')
+
+    # Now back up the notebooks:
+    print("Writing notebooks to backup dir:")
+    print(f"gs://{backup_bucket}/{backup_folder_path}/notebooks")
+    for notebook_blob in storage_client.list_blobs(workspace_bucket, prefix='notebooks'):
+        original_name = notebook_blob.name[notebook_blob.name.find("/") + 1:]
+        print(f"\t{original_name}")
+        notebook_name = f"{timestamp}_{ns}_{ws}_{original_name}"
+        bucket.copy_blob(notebook_blob, bucket, new_name=f"{backup_folder_path}/notebooks/{notebook_name}")
+    print("Done.")
+
+    # Now store workspace attributes:
+    ws_dict = _query_workspace(ns, ws, max_attempts)
+    ws_attributes_dict = ws_dict['attributes']
+
+    # Write our dict to our bucket:
+    print("Writing workspace attributes to backup dir:")
+    print(f"gs://{backup_bucket}/{backup_folder_path}")
_write_json_file_to_bucket(f"{backup_folder_path}", bucket, timestamp, ns, ws, ws_attributes_dict, + f"workspace_attributes") + + print(f"Done.") + print() + + # and store the workspace metadata: + del ws_dict['attributes'] + + print(f"Writing workspace metadata to backup dir:") + print(f"gs://{backup_bucket}/{backup_folder_path}") + _write_json_file_to_bucket(f"{backup_folder_path}", bucket, timestamp, ns, ws, ws_dict, + f"workspace_metadata") + print(f"Done.") + + # Now store workspace method (i.e. workflow) information: + response = lrma_workspace_utils.retry_fiss_api_call('list_workspace_configs', 2, ns, ws, True) + workflow_dict = response.json() + + # Sort our workflows in alphabetical order: + workflow_dict = sorted(workflow_dict, key=lambda k: k['name']) + + print(f"Writing workflow high-level information to backup dir:") + print(f"gs://{backup_bucket}/{backup_folder_path}") + + _write_json_file_to_bucket(f"{backup_folder_path}", bucket, timestamp, ns, ws, workflow_dict, + f"workflows") + print(f'Done.') + + # Get the metadata, default inputs, and default outputs for each workflow: + print(f"Writing workflow metadata, inputs, and outputs to backup dir:") + print(f"gs://{backup_bucket}/{backup_folder_path}/workflows") + + for workflow_name in [w["name"] for w in workflow_dict]: + print(f"\t{workflow_name}") + + response = lrma_workspace_utils.retry_fiss_api_call('get_workspace_config', 2, ns, ws, ns, + workflow_name) + metadata = response.json() + inputs = metadata['inputs'] + outputs = metadata['outputs'] + del metadata['inputs'] + del metadata['outputs'] + + _write_json_file_to_bucket(f"{backup_folder_path}/workflows", bucket, timestamp, ns, ws, inputs, + f"{workflow_name}_workflow_inputs") + + _write_json_file_to_bucket(f"{backup_folder_path}/workflows", bucket, timestamp, ns, ws, outputs, + f"{workflow_name}_workflow_outputs") + + _write_json_file_to_bucket(f"{backup_folder_path}/workflows", bucket, timestamp, ns, ws, outputs, + f"{workflow_name}_workflow_metadata") + print(f"Done") + + # Finally store submissions: + response = lrma_workspace_utils.retry_fiss_api_call('list_submissions', 2, ns, ws) + submissions_dict = response.json() + + # Sort our submissions from most recent to least recent: + submissions_dict = sorted(submissions_dict, key=lambda k: k['submissionDate'], reverse=True) + + print(f"Writing workspace job history to backup dir:") + print(f"gs://{backup_bucket}/{backup_folder_path}") + _write_json_file_to_bucket(backup_folder_path, bucket, timestamp, ns, ws, submissions_dict, "workspace_job_history") + print(f"Done.") + + # Track end time so we can calculate elapsed time for backup: + end_time = time.time() + + now_utc = datetime.datetime.utcnow() + timezone = pytz.timezone('America/New_York') + now_et = now_utc.astimezone(timezone) + time_string = now_et.strftime("%A %B %d at %H:%M:%S ET") + print(f"Backup completed on {time_string}") + print(f"Backup location: gs://{backup_bucket}/{backup_folder_path}") + print(f"Elapsed time: {end_time - start_time:2.2f}s") + + +def _write_json_file_to_bucket(backup_folder_path, bucket, timestamp, namespace, workspace, json_object, file_base_name): + """ + Write a JSON object to a GCS bucket. + :param backup_folder_path: The folder path in the bucket to write to. + :param bucket: The bucket to write to. + :param timestamp: The timestamp of the backup. + :param namespace: The namespace of the workspace. + :param workspace: The name of the workspace. + :param json_object: The JSON object to write. 
+    :param file_base_name: The base name of the file to write.
+    """
+    # Serialize our object, compress it, and write it to our bucket:
+    blob = bucket.blob(f"{backup_folder_path}/{timestamp}_{namespace}_{workspace}_{file_base_name}.json.gz")
+    with blob.open('wb') as f:
+        f.write(gzip.compress(bytes(json.dumps(json_object, indent=4), 'utf-8')))
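
Example usage (a minimal sketch; the namespace, workspace, and bucket names below are placeholders, and application-default Google credentials with read access to the workspace bucket and write access to the backup bucket are assumed):

from lrmaCU.terra.workspace_utils import backup_workspace

# Placeholder names; replace with a real Terra namespace/workspace and a writable GCS bucket.
backup_workspace(ns="my-terra-namespace",
                 ws="my-terra-workspace",
                 backup_bucket="gs://my-backup-bucket",
                 max_attempts=3)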