-
Notifications
You must be signed in to change notification settings - Fork 5
/
subscriber.py
43 lines (29 loc) · 843 Bytes
/
subscriber.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import asyncio
import time

import aiohttp
import redis
from fastapi import FastAPI

from settings import settings
# Address that request() issues its GET against, assembled from settings.
URL = f"http://{settings.subscriber_url}:{settings.subscriber_port}/"

# FastAPI application object; the route decorator in this module attaches to it.
app = FastAPI()

# Redis client. decode_responses=True asks redis-py to return str, not bytes.
# NOTE(review): the setting is named ``subscriber_redis_url`` but is passed as
# ``host`` -- presumably it holds a bare hostname, not a redis:// URL; confirm.
db = redis.Redis(
    decode_responses=True,
    port=settings.subscriber_redis_port,
    host=settings.subscriber_redis_url,
)

# Pub/sub handle created from the client above; used by process_messages().
consumer = db.pubsub()
async def request(session):
    """GET the module-level ``URL`` through *session* and return its JSON body.

    *session* must provide an async-context-manager ``get`` (the caller in
    this module passes an ``aiohttp.ClientSession``). The HTTP status is not
    checked; whatever ``response.json()`` yields is returned as-is.
    """
    async with session.get(URL) as reply:
        payload = await reply.json()
    return payload
def process_messages():
    """Subscribe to ``settings.redis_channel`` and print every item received.

    This is a synchronous, blocking loop: it iterates ``consumer.listen()``
    and sleeps for half a second after printing each item. It only returns
    if that iterator ends.
    """
    consumer.subscribe(settings.redis_channel)
    incoming = consumer.listen()
    for event in incoming:
        print(event)
        # Half-second pause after each printed item.
        time.sleep(0.5)
async def task():
    """Fetch the subscriber endpoint once, then consume the Redis channel.

    Steps:
      1. Open a short-lived ``aiohttp.ClientSession`` and call ``request``;
         the JSON payload it returns is currently discarded.
      2. Run the blocking ``process_messages`` loop in the event loop's
         default thread-pool executor and wait for it.

    ``process_messages`` is synchronous (blocking Redis reads plus
    ``time.sleep``). Calling it directly inside this coroutine would stall
    the event loop, and with it every other request served by the
    application, for as long as the listener runs. Off-loading it to a worker
    thread keeps the loop responsive.

    Returns:
        None. The coroutine finishes only if ``process_messages`` returns.
    """
    # The HTTP session is only needed for the single fetch, so it is closed
    # before the long-running listening phase begins.
    async with aiohttp.ClientSession() as session:
        await request(session)

    # publish to queue
    # NOTE(review): the comment above looks like a placeholder -- nothing is
    # published here and the fetched payload is dropped; confirm intent.

    # NOTE(review): every call occupies one executor thread for as long as the
    # listener runs, and all calls share the module-level ``consumer``;
    # confirm that concurrent subscriptions are not expected.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, process_messages)
@app.get("/beers/subscribe")
async def get_beer_info():
    # Handler for GET /beers/subscribe: drives task() to completion.
    # Nothing is returned explicitly, so the handler's result is None.
    pending = task()
    await pending