-
Notifications
You must be signed in to change notification settings - Fork 15
/
app.py
99 lines (83 loc) · 3.34 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import logging
import os
from time import perf_counter, time

from fastapi import FastAPI, Request
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
from prometheus_fastapi_instrumentator import Instrumentator
from pythonjsonlogger import jsonlogger
from starlette.middleware.base import BaseHTTPMiddleware

from .models import Decision, Post, PostList
from .sentiment import get_sentiment
# Configure JSON logging: a single stream handler on the root logger renders
# every record it receives with python-json-logger's JsonFormatter.
formatter = jsonlogger.JsonFormatter()
handler = logging.StreamHandler()
handler.setFormatter(formatter)

root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(handler)

# Remove any handlers attached to the uvicorn loggers and turn propagation on,
# so their records travel up to the root logger and come out in the same JSON
# format as everything else.
for _log in ("uvicorn", "uvicorn.error"):
    _uvicorn_logger = logging.getLogger(_log)
    _uvicorn_logger.handlers.clear()
    _uvicorn_logger.propagate = True

# The application re-creates access logs itself so they can carry additional
# structured fields. uvicorn's own access logger is therefore silenced: its
# handlers are removed and its records are not propagated to any parent.
_access_logger = logging.getLogger("uvicorn.access")
_access_logger.handlers.clear()
_access_logger.propagate = False
class LoggingMiddleware(BaseHTTPMiddleware):
    """Middleware that emits one structured log entry per handled request."""

    async def dispatch(self, request: Request, call_next):
        """Run the downstream handler and log how the request was handled.

        Args:
            request: The incoming request.
            call_next: Awaitable callable that forwards the request to the
                rest of the application and returns its response.

        Returns:
            The response produced by ``call_next``, unmodified.

        If ``call_next`` raises, the exception propagates and no entry is
        logged by this middleware.
        """
        # perf_counter() is monotonic, so the measured duration cannot jump or
        # go negative when the system clock is adjusted during the request.
        started = perf_counter()
        response = await call_next(request)
        process_time = perf_counter() - started
        logging.info(
            {
                "message": "request handled",
                "path": request.url.path,
                "method": request.method,
                "processing_time": process_time,
                "status_code": response.status_code,
                # Logged as a plain string so the entry does not depend on the
                # JSON encoder's fallback for non-serialisable objects.
                "query_params": str(request.query_params),
            },
        )
        return response
def otlp_traces_url(base_url: str) -> str:
    """Return the OTLP/HTTP traces URL for a collector base URL.

    Exactly one ``/`` separates the base URL from the ``v1/traces`` path,
    whether or not ``base_url`` already ends with a slash.

    Args:
        base_url: Collector base URL, e.g. ``http://collector:4318`` or
            ``http://collector:4318/``.

    Returns:
        ``base_url`` with trailing slashes removed and ``/v1/traces`` appended.
    """
    return base_url.rstrip("/") + "/v1/traces"


# Set up OpenTelemetry. Tracing is configured only when the
# OTEL_EXPORTER_OTLP_ENDPOINT environment variable is set to a non-empty value.
otel_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
if otel_endpoint:
    resource = Resource(attributes={SERVICE_NAME: "bsky-sentiment"})
    tp = TracerProvider(resource=resource, sampler=ParentBasedTraceIdRatio(0.2))
    tp.add_span_processor(
        BatchSpanProcessor(
            OTLPSpanExporter(
                # Plain concatenation produced e.g. "http://host:4318v1/traces"
                # when the configured endpoint had no trailing slash.
                endpoint=otlp_traces_url(otel_endpoint),
            )
        )
    )
    trace.set_tracer_provider(tp)
app = FastAPI()

# Attach OpenTelemetry instrumentation only when `otel_endpoint` is truthy,
# i.e. when an OTLP endpoint was configured.
if otel_endpoint:
    FastAPIInstrumentor.instrument_app(app)

# Attach Prometheus instrumentation and expose the metrics route, which is
# kept out of the generated API schema.
# NOTE: `latency_lowr_buckets` is the keyword as spelled by
# prometheus-fastapi-instrumentator; it is not a typo.
_instrumentator = Instrumentator().instrument(
    app,
    latency_lowr_buckets=[0.01, 0.05, 0.1, 0.2, 0.5, 1.0, 2.5, 5, 10],
)
_instrumentator.expose(app, include_in_schema=False)

# Register the request-logging middleware.
app.add_middleware(LoggingMiddleware)
@app.post("/analyze_sentiment", response_model=PostList)
async def analyze_sentiment(posts: PostList):
    """Attach a sentiment decision to every post in the request.

    Each post's ``text`` is passed to ``get_sentiment``; the returned
    sentiment and confidence score (converted to ``float``) are stored on the
    post as a ``Decision``.

    Args:
        posts: The submitted posts. Each entry is updated in place.

    Returns:
        The same ``PostList`` that was passed in, with ``decision`` set on
        every post.
    """
    for entry in posts.posts:
        label, score = get_sentiment(entry.text)
        entry.decision = Decision(sentiment=label, confidence_score=float(score))
    return posts