26 changes: 19 additions & 7 deletions backend/app/dependencies.py
@@ -3,6 +3,9 @@

import boto3
import pika
import aio_pika
from aio_pika.abc import AbstractChannel

from app.config import settings
from app.search.connect import connect_elasticsearch
from minio import Minio
@@ -74,18 +77,27 @@ async def get_external_fs() -> AsyncGenerator[Minio, None]:
    yield file_system


def get_rabbitmq() -> BlockingChannel:
async def get_rabbitmq() -> AbstractChannel:
    """Client to connect to RabbitMQ for listeners/extractors interactions."""
    credentials = pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASS)
    parameters = pika.ConnectionParameters(
        settings.RABBITMQ_HOST, credentials=credentials
    )
    RABBITMQ_URL = f"amqp://{settings.RABBITMQ_USER}:{settings.RABBITMQ_PASS}@{settings.RABBITMQ_HOST}/"

    logger.debug("Connecting to rabbitmq at %s", settings.RABBITMQ_HOST)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    connection = await aio_pika.connect_robust(RABBITMQ_URL)
    channel = await connection.channel()

    logger.debug("get_rabbitmq() returning channel of type %s", type(channel))
    return channel


# Keep the old blocking client for extractors that still need it
def get_blocking_rabbitmq() -> BlockingChannel:
    """Legacy blocking RabbitMQ client (for extractors that need it)."""
    credentials = pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASS)
    parameters = pika.ConnectionParameters(
        settings.RABBITMQ_HOST, credentials=credentials
    )
    connection = pika.BlockingConnection(parameters)
    return connection.channel()


async def get_elasticsearchclient():
    es = await connect_elasticsearch()
    return es
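
A minimal usage sketch of the new async dependency; the route path and queue name below are illustrative, not part of this diff:

from fastapi import APIRouter, Depends
from aio_pika.abc import AbstractChannel

from app import dependencies

router = APIRouter()


@router.post("/demo/queue-check")  # illustrative route, not in this PR
async def queue_check(
    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
):
    # Unlike pika's synchronous queue_declare(), declaration is a coroutine in aio_pika.
    queue = await rabbitmq_client.declare_queue("demo.check", durable=True)
    return {"queue": queue.name}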
70 changes: 36 additions & 34 deletions backend/app/rabbitmq/listeners.py
@@ -17,43 +17,39 @@
from app.routers.users import get_user_job_key
from fastapi import Depends
from pika.adapters.blocking_connection import BlockingChannel
import aio_pika
from aio_pika.abc import AbstractChannel


async def create_reply_queue():
    channel: BlockingChannel = dependencies.get_rabbitmq()

    if (
        config_entry := await ConfigEntryDB.find_one({"key": "instance_id"})
    ) is not None:
async def create_reply_queue(channel: AbstractChannel):
    if (config_entry := await ConfigEntryDB.find_one({"key": "instance_id"})) is not None:
        instance_id = config_entry.value
    else:
        # If no ID has been generated for this instance, generate a random 10-character alphanumeric identifier
        instance_id = "".join(
            random.choice(
                string.ascii_uppercase + string.ascii_lowercase + string.digits
            )
            random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits)
            for _ in range(10)
        )
        config_entry = ConfigEntryDB(key="instance_id", value=instance_id)
        await config_entry.insert()

    queue_name = "clowder.%s" % instance_id
    channel.exchange_declare(exchange="clowder", durable=True)
    result = channel.queue_declare(
        queue=queue_name, durable=True, exclusive=False, auto_delete=False
    )
    queue_name = result.method.queue
    channel.queue_bind(exchange="clowder", queue=queue_name)
    return queue_name
    queue_name = f"clowder.{instance_id}"

    # Exchange/queue declaration and binding are coroutines in aio_pika
    exchange = await channel.declare_exchange("clowder", durable=True)
    queue = await channel.declare_queue(queue_name, durable=True, exclusive=False, auto_delete=False)
    await queue.bind(exchange)

    return queue.name


async def submit_file_job(
    file_out: FileOut,
    routing_key: str,
    parameters: dict,
    user: UserOut,
    rabbitmq_client: BlockingChannel,
    rabbitmq_client: AbstractChannel,
):
    # Create an entry in job history with unique ID
    job = EventListenerJobDB(
        listener_id=routing_key,
@@ -65,6 +61,7 @@ async def submit_file_job(
    )
    await job.insert()

    current_secretKey = await get_user_job_key(user.email)
    msg_body = EventListenerJobMessage(
        filename=file_out.name,
@@ -75,15 +72,19 @@ job_id=str(job.id),
        job_id=str(job.id),
        parameters=parameters,
    )
    reply_to = await create_reply_queue()

    # Declare the reply queue on this channel; the message itself is published
    # through the default exchange, so routing_key addresses the extractor queue directly
    reply_to = await create_reply_queue(rabbitmq_client)
    rabbitmq_client.basic_publish(
        exchange="",
        routing_key=routing_key,
        body=json.dumps(msg_body.dict(), ensure_ascii=False),
        properties=pika.BasicProperties(
            content_type="application/json", delivery_mode=1, reply_to=reply_to
    await rabbitmq_client.default_exchange.publish(
        aio_pika.Message(
            body=json.dumps(msg_body.dict(), ensure_ascii=False).encode("utf-8"),
            content_type="application/json",
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            reply_to=reply_to,
        ),
        routing_key=routing_key,
    )
    return str(job.id)

@@ -93,7 +94,7 @@ async def submit_dataset_job(
    routing_key: str,
    parameters: dict,
    user: UserOut,
    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
    rabbitmq_client: AbstractChannel,
):
    # Create an entry in job history with unique ID
    job = EventListenerJobDB(
@@ -113,13 +114,14 @@ job_id=str(job.id),
        job_id=str(job.id),
        parameters=parameters,
    )
    reply_to = await create_reply_queue()
    rabbitmq_client.basic_publish(
        exchange="",
        routing_key=routing_key,
        body=json.dumps(msg_body.dict(), ensure_ascii=False),
        properties=pika.BasicProperties(
            content_type="application/json", delivery_mode=1, reply_to=reply_to
    reply_to = await create_reply_queue(rabbitmq_client)
    await rabbitmq_client.default_exchange.publish(
        aio_pika.Message(
            body=json.dumps(msg_body.dict(), ensure_ascii=False).encode("utf-8"),
            content_type="application/json",
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            reply_to=reply_to,
        ),
        routing_key=routing_key,
    )
    return str(job.id)
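
The reply_to queue created above is consumed elsewhere, outside this diff. A sketch of what an aio_pika consumer for it could look like, assuming the same RABBITMQ_* settings and a queue name returned by create_reply_queue:

import asyncio

import aio_pika

from app.config import settings


async def consume_replies(queue_name: str) -> None:
    url = f"amqp://{settings.RABBITMQ_USER}:{settings.RABBITMQ_PASS}@{settings.RABBITMQ_HOST}/"
    connection = await aio_pika.connect_robust(url)
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue(queue_name, durable=True)
        async with queue.iterator() as messages:
            async for message in messages:
                async with message.process():  # acknowledges on successful exit
                    print(message.body.decode("utf-8"))

# e.g. asyncio.run(consume_replies("clowder.EXAMPLE1234"))  # illustrative queue name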
12 changes: 7 additions & 5 deletions backend/app/routers/datasets.py
@@ -70,6 +70,8 @@
from fastapi.security import HTTPBearer
from minio import Minio
from pika.adapters.blocking_connection import BlockingChannel
import aio_pika
from aio_pika.abc import AbstractChannel
from pymongo import DESCENDING
from rocrate.model.person import Person
from rocrate.rocrate import ROCrate
@@ -944,7 +946,7 @@ async def save_file(
    fs: Minio = Depends(dependencies.get_fs),
    file: UploadFile = File(...),
    es=Depends(dependencies.get_elasticsearchclient),
    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
    allow: bool = Depends(Authorization("uploader")),
):
    if (dataset := await DatasetDB.get(PydanticObjectId(dataset_id))) is not None:
@@ -996,7 +998,7 @@ async def save_files(
    user=Depends(get_current_user),
    fs: Minio = Depends(dependencies.get_fs),
    es=Depends(dependencies.get_elasticsearchclient),
    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
    allow: bool = Depends(Authorization("uploader")),
):
    if (dataset := await DatasetDB.get(PydanticObjectId(dataset_id))) is not None:
@@ -1056,7 +1058,7 @@ async def save_local_file(
    folder_id: Optional[str] = None,
    user=Depends(get_current_user),
    es=Depends(dependencies.get_elasticsearchclient),
    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
    allow: bool = Depends(Authorization("uploader")),
):
    if (dataset := await DatasetDB.get(PydanticObjectId(dataset_id))) is not None:
@@ -1110,7 +1112,7 @@ async def create_dataset_from_zip(
    fs: Minio = Depends(dependencies.get_fs),
    file: UploadFile = File(...),
    es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
    token: str = Depends(get_token),
):
    if file.filename.endswith(".zip") is False:
@@ -1427,7 +1429,7 @@ async def get_dataset_extract(
    # parameters don't have a fixed model shape
    parameters: dict = None,
    user=Depends(get_current_user),
    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
    allow: bool = Depends(Authorization("uploader")),
):
    if extractorName is None:
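
Because these routers now receive an AbstractChannel, the publish path can be unit-tested without a live broker by stubbing the channel. A self-contained sketch, assuming made-up test values that are not from this PR:

import asyncio
import json
from unittest.mock import AsyncMock

import aio_pika


async def main() -> None:
    # Stand-in for the channel injected by Depends(dependencies.get_rabbitmq).
    channel = AsyncMock()

    # Mirror the publish call made in submit_file_job / submit_dataset_job.
    await channel.default_exchange.publish(
        aio_pika.Message(
            body=json.dumps({"job_id": "example"}).encode("utf-8"),
            content_type="application/json",
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            reply_to="clowder.EXAMPLE1234",
        ),
        routing_key="example.extractor",
    )
    channel.default_exchange.publish.assert_awaited_once()


asyncio.run(main())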
5 changes: 3 additions & 2 deletions backend/app/routers/feeds.py
@@ -4,7 +4,8 @@
from beanie.operators import Or, RegEx
from fastapi import APIRouter, Depends, HTTPException
from pika.adapters.blocking_connection import BlockingChannel

import aio_pika
from aio_pika.abc import AbstractChannel
from app.deps.authorization_deps import FeedAuthorization, ListenerAuthorization
from app.keycloak_auth import get_current_user, get_current_username
from app.models.feeds import FeedDB, FeedIn, FeedOut
@@ -41,7 +42,7 @@ async def check_feed_listeners(
    es_client,
    file_out: FileOut,
    user: UserOut,
    rabbitmq_client: BlockingChannel,
    rabbitmq_client: AbstractChannel,
):
    """Automatically submit new file to listeners on feeds that fit the search criteria."""
    listener_ids_found = []