Skip to content

Commit

Permalink
feat: overhaul backup and restore process
Browse files Browse the repository at this point in the history
  • Loading branch information
CoolFool committed Sep 12, 2023
1 parent 1f49893 commit 3d1b5be
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 34 deletions.
52 changes: 22 additions & 30 deletions app/tasks/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,29 @@ def backup_collection(collections):
snapshot_url = ""
hosts = get_hosts_for_backup()
hosts = Hosts(**hosts)
client = QdrantClient(
host=settings.QDRANT_DB_HOST,
port=settings.QDRANT_DB_PORT,
grpc_port=settings.QDRANT_DB_GRPC_PORT,
prefer_grpc=True,
)
for collection in collections.collections:
try:
snapshot = client.create_snapshot(collection_name=collection)
except RpcError as rpc_error:
if rpc_error.code() == StatusCode.NOT_FOUND:
logger.warn(
f"{collection} replica not found on host {client._client._host}"
)
return
logger.exception(rpc_error)
"""
Qdrant doesn't report which qdrant node (host) actually has the collection snapshot.
So when you send a request for creating a collection snapshot to the qdrant service endpoint, it will successfully create the snapshot, but
when you request the snapshot from the service, the request will hit any one of the qdrant nodes, and if that node doesn't contain the snapshot,
it will return a 404 and the backup operation will fail.
To fix this, we check every host to see if it contains the snapshot, and once the right host is found we process it further.
"""
"""The collection on each host will be different as qdrant replicates the collection across. So, if at the time
of creating the snapshot the host that is behind a load balancer doesn't have the latest collection it will take snapshot of a older one from some other node and store it there.
So, we go one by one to all the hosts and create a collection snapshot for each one of them. That way when we recover the collection on a particular
host, the collection state on the host will be reset to exactly the way it was at the time of backup.
This of course won't be required if the replication_factor and write_consistency_factor for the collection are set in a way that a collection or the records
in a collection are only accepted once they are replicated across all the nodes."""
for host in hosts.hosts:
client = QdrantClient(
host=host.host,
port=host.port,
grpc_port=host.grpc_port,
prefer_grpc=True,
)
try:
snapshot = client.create_snapshot(collection_name=collection)
except RpcError as rpc_error:
if rpc_error.code() == StatusCode.NOT_FOUND:
logger.warn(
f"{collection} replica not found on host {client._client._host}"
)
return
logger.exception(rpc_error)
snapshot_url = f"http://{host.host}:{host.port}/collections/{collection}/snapshots/{snapshot.name}"
if (
verify_collection_snapshot(snapshot_url=snapshot_url)
Expand All @@ -57,20 +56,13 @@ def backup_collection(collections):
logger.info(
f"{collection} snapshot {snapshot.name} found on host {host.host}"
)
client_with_snapshot = QdrantClient(
host=host.host,
port=host.port,
grpc_port=host.grpc_port,
prefer_grpc=True,
)
threads.append(
Thread(
target=backup_collection_handler,
args=(client_with_snapshot, collection, snapshot),
args=(client, collection, snapshot),
)
)
threads[-1].start()
break
else:
logger.warn(
f"{collection} snapshot {snapshot.name} not found on host {host.host}"
Expand Down
3 changes: 3 additions & 0 deletions app/tasks/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

@app.task()
def ingest(batch):
"""
We use the official Qdrant library's .upsert with "batch" because we don't have to create a new PointStruct object for every point being inserted.
"""
client = QdrantClient(
host=settings.QDRANT_DB_HOST,
port=settings.QDRANT_DB_PORT,
Expand Down
5 changes: 4 additions & 1 deletion app/tasks/restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
from app.tasks.utils import restore_collection_handler

"""
In distributed mode, full storage recovery can't be performed using the API. Only collection restoration is supported
In distributed mode, full storage recovery can't be performed using the API. Only collection restoration is supported.
Restoring the collection means restoring the "collection" on a "host"
(OR)
resetting the state of the collection on a host to a particular point in time (i.e., the time of backup)
"""


Expand Down
12 changes: 9 additions & 3 deletions app/tasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ def get_latest_snapshot(s3, prefix):

def get_snapshot_signed_url_from_s3(s3, prefix):
latest_snapshot = get_latest_snapshot(s3=s3, prefix=prefix)
if latest_snapshot is None:
logger.error(f"-> Snapshot not found with prefix {prefix}")
return None, None
url = s3.generate_presigned_url(
ClientMethod="get_object",
Params={"Bucket": settings.S3_BUCKET_NAME, "Key": latest_snapshot["Key"]},
Expand Down Expand Up @@ -65,11 +68,14 @@ def restore_collection_handler(host, collection_name, snapshot_url):
endpoint_url=settings.AWS_ENDPOINT_URL,
config=Config(signature_version="s3v4"),
)
prefix = f"collection/{collection_name}"
prefix = f"collection/{host.host}/{collection_name}"
logger.info("-> Generating S3 Pre-signed URL")
snapshot_path,latest_snapshot = get_snapshot_signed_url_from_s3(s3_client, prefix)
if snapshot_path is None or latest_snapshot is None:
logger.error(f"-> Snapshot not found for host {host.host}")
return
logger.info(
f"-> Starting recovery process for collection {collection_name} on host {host.host} with snapshot {latest_snapshot}"
f"-> Starting recovery process for collection {collection_name} on host {host.host} with snapshot {latest_snapshot['Key']}"
)
return recover_collection_from_snapshot(host, snapshot_path, collection_name)

Expand Down Expand Up @@ -221,7 +227,7 @@ def get_hosts_for_backup():

def backup_collection_handler(client, collection_name, snapshot):
snapshot_url = f"http://{client._client._host}:{client._client._port}/collections/{collection_name}/snapshots/{snapshot.name}"
prefix = f"collection/{collection_name}"
prefix = f"collection/{client._client._host}/{collection_name}"
logger.info(
f"Starting backup handling process with snapshot: {snapshot.name} on host {client._client._host}"
)
Expand Down
2 changes: 2 additions & 0 deletions poetry.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[virtualenvs]
create = true

0 comments on commit 3d1b5be

Please sign in to comment.