-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bugfix/ingest to embed message exchange #113
Conversation
@@ -59,11 +59,12 @@ def ingest_file(self, file: File): | |||
logging.info(f"written {len(items)} chunks to elasticsearch") | |||
|
|||
for chunk in chunks: | |||
queue_item = EmbedQueueItem(model=env.embedding_model, sentence=chunk.text) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the key change
|
||
poll_thread = threading.Thread(target=poll_queue_every, args=(queue_uri, queue_name, queue_poll_interval)) | ||
poll_thread.start() | ||
thread = threading.Thread(target=subscribe_to_queue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think we always want to run this thread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree
|
||
connection = None | ||
|
||
for i in range(max_connection_attempts): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pika
already handles retries
@@ -171,62 +161,24 @@ def embed_sentences(model: str, sentences: list[str]): | |||
return output | |||
|
|||
|
|||
def poll_queue_every(queue_uri: str, queue_name: str, interval: int = 5): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ive tested this IRL, there is no need to poll as pika handles this via the callback mechanism. see guidance-for-review
ch.basic_ack(delivery_tag=method.delivery_tag) | ||
ch.stop_consuming() | ||
finally: | ||
ch.basic_ack(delivery_tag=method.delivery_tag) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this is basic_nack
then the unprocessed items remain in the queue.... tbh i wasnt sure what we want to do here?
|
||
poll_thread = threading.Thread(target=poll_queue_every, args=(queue_uri, queue_name, queue_poll_interval)) | ||
poll_thread.start() | ||
thread = threading.Thread(target=subscribe_to_queue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree
Context
In #108 I erroneously introduced an error where I put a
Chunk
rather than aEmbedQueueItem
message into the embed queueChanges proposed in this pull request
ingest
app i now put anEmbedQueueItem
onto the embed queueGuidance to review
upload a file via: http://0.0.0.0:5002/docs#/file/create_upload_file_file_post
watch http://localhost:15672/#/queues and see that:
docker compose logs -f emebd
Relevant links
n/a
Things to check