Added backup_workspace function #30

Open · wants to merge 2 commits into base: main
170 changes: 170 additions & 0 deletions src/lrmaCU/terra/workspace_utils.py
@@ -22,6 +22,18 @@ def get_workspace_bucket(ns: str, ws: str, max_attempts: int = 2) -> str:
return _query_workspace(ns, ws, max_attempts)['bucketName']


def get_workspace_attributes(ns: str, ws: str, max_attempts: int = 2) -> dict[str, str]:
Collaborator:
I'd suggest renaming this function a bit (sorry for naming the existing one sub-optimally): get_workspace_all_attributes, to avoid (easy-to-make) typos.
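
For instance, a minimal sketch of the suggested rename (illustrative, not part of this diff):

def get_workspace_all_attributes(ns: str, ws: str, max_attempts: int = 2) -> dict[str, str]:
    """
    Get names and values of all attributes in a workspace.
    """
    return _query_workspace(ns, ws, max_attempts)['attributes']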

"""
Get names and values of all attributes in a workspace.

:param ns: Namespace of the workspace.
:param ws: Name of the workspace.
:param max_attempts: Maximum number of attempts to make for the API call.
:return: Dict mapping attribute names to their values.
"""
return _query_workspace(ns, ws, max_attempts)['attributes']


def get_workspace_attribute(ns: str, ws: str, attribute_name: str, max_attempts: int = 2):
"""
Get value of an existing attribute.
@@ -244,3 +256,161 @@ def _filter_sub_by_timestamp(submissions: List[dict],
"""
return [sub for sub in submissions
if earlier_cutoff <= parser.parse(sub['submissionDate']) <= later_cutoff]

def backup_workspace(ns: str, ws: str, backup_bucket: str, max_attempts: int = 2) -> None:
"""
Back up a workspace by copying all tables, notebooks, workflow information, and job history to a given Google Cloud Storage bucket.

:param ns: Namespace of the workspace to back up.
:param ws: Name of the workspace to back up.
:param backup_bucket: Destination bucket into which to save the backup.
:param max_attempts: Maximum number of attempts to make for each API call.
:return:
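
Example (illustrative; the namespace, workspace, and bucket names are hypothetical)::

    backup_workspace("my-namespace", "my-workspace", "gs://my-backup-bucket")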
"""

# Track start and end time so we can report how long the backup took:
start_time = time.time()

# Remove the leading `gs://` scheme if present:
backup_bucket = backup_bucket.removeprefix("gs://")

# Get our entity types so we know what to dump:
entity_types = retry_fiss_api_call('list_entity_types', max_attempts, ns, ws).json()

timestamp = datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
workspace_bucket = get_workspace_bucket(ns, ws, max_attempts)
backup_folder_path = timestamp

# Get a handle to the destination backup bucket:
storage_client = storage.Client()
bucket = storage_client.bucket(backup_bucket)

# Back up the entities:
# Iterate over entity types and dump each one to a separate TSV:
print(f"Writing workspace entities to backup dir:")
print(f"gs://{backup_bucket}/{backup_folder_path}/tables")
for et in entity_types:
print(f"\t{et}")
tbl, _ = load_table(ns, ws, et)
tbl = fix_nans(tbl)
table_name = f"{timestamp}_{ns}_{ws}_{et}.tsv.gz"

# Write our table to our bucket:
blob = bucket.blob(f"{backup_folder_path}/tables/{table_name}")

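# Serialize the table to TSV in memory, then gzip-compress and upload: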
with io.StringIO() as buf:
tbl.to_csv(buf, sep="\t", index=False)
with blob.open('wb') as f:
f.write(gzip.compress(bytes(buf.getvalue(), 'utf-8')))
print('Done.')

# Now back up the notebooks:
print("Writing notebooks to backup dir:")
print(f"gs://{backup_bucket}/{backup_folder_path}/notebooks")
for notebook_blob in storage_client.list_blobs(workspace_bucket, prefix='notebooks'):
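# Strip the leading "notebooks/" folder from the blob name: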
original_name = notebook_blob.name[notebook_blob.name.find("/") + 1:]
print(f"\t{original_name}")
notebook_name = f"{timestamp}_{ns}_{ws}_{original_name}"
bucket.copy_blob(notebook_blob, bucket, new_name=f"{backup_folder_path}/notebooks/{notebook_name}")
print("Done.")

# Now store workspace attributes:
ws_dict = _query_workspace(ns, ws, max_attempts)
ws_attributes_dict = ws_dict['attributes']

# Write our dict to our bucket:
print(f"Writing workspace attributes to backup dir:")
print(f"gs://{backup_bucket}/{backup_folder_path}")
_write_json_file_to_bucket(f"{backup_folder_path}", bucket, timestamp, ns, ws, ws_attributes_dict,
f"workspace_attributes")

print(f"Done.")
print()

# Then store the workspace metadata (everything except the attributes):
del ws_dict['attributes']

print("Writing workspace metadata to backup dir:")
print(f"gs://{backup_bucket}/{backup_folder_path}")
_write_json_file_to_bucket(backup_folder_path, bucket, timestamp, ns, ws, ws_dict, "workspace_metadata")
print("Done.")

# Now store workspace method (i.e. workflow) information:
response = retry_fiss_api_call('list_workspace_configs', max_attempts, ns, ws, True)
workflow_dict = response.json()

# Sort our workflows in alphabetical order:
workflow_dict = sorted(workflow_dict, key=lambda k: k['name'])

print(f"Writing workflow high-level information to backup dir:")
print(f"gs://{backup_bucket}/{backup_folder_path}")

_write_json_file_to_bucket(f"{backup_folder_path}", bucket, timestamp, ns, ws, workflow_dict,
f"workflows")
print(f'Done.')

# Get the metadata, default inputs, and default outputs for each workflow:
print(f"Writing workflow metadata, inputs, and outputs to backup dir:")
print(f"gs://{backup_bucket}/{backup_folder_path}/workflows")

for workflow_name in [w["name"] for w in workflow_dict]:
print(f"\t{workflow_name}")

response = retry_fiss_api_call('get_workspace_config', max_attempts, ns, ws, ns, workflow_name)
metadata = response.json()
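# Split the inputs and outputs out of the config so each lands in its own file: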
inputs = metadata['inputs']
outputs = metadata['outputs']
del metadata['inputs']
del metadata['outputs']

_write_json_file_to_bucket(f"{backup_folder_path}/workflows", bucket, timestamp, ns, ws, inputs,
f"{workflow_name}_workflow_inputs")

_write_json_file_to_bucket(f"{backup_folder_path}/workflows", bucket, timestamp, ns, ws, outputs,
f"{workflow_name}_workflow_outputs")

_write_json_file_to_bucket(f"{backup_folder_path}/workflows", bucket, timestamp, ns, ws, outputs,
f"{workflow_name}_workflow_metadata")
print(f"Done")

# Finally store submissions:
response = retry_fiss_api_call('list_submissions', max_attempts, ns, ws)
submissions_dict = response.json()

# Sort our submissions from most recent to least recent:
submissions_dict = sorted(submissions_dict, key=lambda k: k['submissionDate'], reverse=True)

print(f"Writing workspace job history to backup dir:")
print(f"gs://{backup_bucket}/{backup_folder_path}")
_write_json_file_to_bucket(backup_folder_path, bucket, timestamp, ns, ws, submissions_dict, "workspace_job_history")
print(f"Done.")

# Track end time so we can calculate elapsed time for backup:
end_time = time.time()

now_et = datetime.datetime.now(tz=pytz.timezone('America/New_York'))
time_string = now_et.strftime("%A %B %d at %H:%M:%S ET")
print(f"Backup completed on {time_string}")
print(f"Backup location: gs://{backup_bucket}/{backup_folder_path}")
print(f"Elapsed time: {end_time - start_time:2.2f}s")


def _write_json_file_to_bucket(backup_folder_path, bucket, timestamp, namespace, workspace, json_object, file_base_name):
"""
Write a JSON object to a GCS bucket.
:param backup_folder_path: The folder path in the bucket to write to.
:param bucket: The bucket to write to.
:param timestamp: The timestamp of the backup.
:param namespace: The namespace of the workspace.
:param workspace: The name of the workspace.
:param json_object: The JSON object to write.
:param file_base_name: The base name of the file to write.
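
Example (illustrative; assumes ``bucket`` is a google.cloud.storage Bucket handle)::

    _write_json_file_to_bucket("20240101T000000", bucket, "20240101T000000",
                               "my-namespace", "my-workspace", {"key": "value"}, "example")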
"""
# Serialize the object to JSON, gzip-compress it, and write it to the bucket:
blob = bucket.blob(f"{backup_folder_path}/{timestamp}_{namespace}_{workspace}_{file_base_name}.json.gz")
with blob.open('wb') as f:
f.write(gzip.compress(bytes(json.dumps(json_object, indent=4), 'utf-8')))