Otel cleanup (#62)
UTXOnly committed Jul 4, 2024
1 parent a7bb700 commit 23fec39
Showing 4 changed files with 64 additions and 104 deletions.
25 changes: 3 additions & 22 deletions docker_stuff/config-opentelemetry.yaml
@@ -2,7 +2,9 @@ receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
      grpc:
        endpoint: 0.0.0.0:4317

  docker_stats:
    endpoint: unix:///var/run/docker.sock
@@ -49,24 +51,6 @@ receivers:
      network:
      processes:

  prometheus:
    config:
      scrape_configs:
        - job_name: 'otelcol'
          scrape_interval: 10s
          static_configs:
            - targets: ['0.0.0.0:8888']

  filelog:
    include_file_path: true
    poll_interval: 500ms
    include:
      - /var/log/*/app.log
    operators:
      - type: json_parser
      - type: time_parser
        parse_from: attributes.time
        layout: '%Y-%m-%dT%H:%M:%S%z'

processors:
  batch:
@@ -103,6 +87,7 @@ processors:
        value: nostpy-otel
        action: upsert


connectors:
  datadog/connector:

@@ -124,10 +109,6 @@
      receivers: [datadog/connector, otlp, hostmetrics, docker_stats]
      processors: [memory_limiter, batch, resourcedetection, attributes]
      exporters: [datadog/exporter]
    logs:
      receivers: [otlp, filelog]
      processors: [memory_limiter, batch, resourcedetection, attributes]
      exporters: [datadog/exporter]
  telemetry:
    logs:
      level: "debug"
13 changes: 6 additions & 7 deletions docker_stuff/docker-compose.yaml
@@ -5,7 +5,7 @@ services:
      context: .
      dockerfile: Dockerfile.websocket_handler
    environment:
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://172.28.0.7:4317
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://opentelemetry-collector:4317
      - EVENT_HANDLER_SVC=${EVENT_HANDLER_SVC}
      - EVENT_HANDLER_PORT=${EVENT_HANDLER_PORT}
      - WS_PORT=${WS_PORT}
@@ -20,15 +20,15 @@ services:
      context: .
      dockerfile: Dockerfile.event_handler
    environment:
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://172.28.0.7:4317
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://opentelemetry-collector:4317
      - EVENT_HANDLER_PORT=${EVENT_HANDLER_PORT}
      - REDIS_HOST=${REDIS_HOST}
      - REDIS_PORT=${REDIS_PORT}
      - PGDATABASE_WRITE=${PGDATABASE_WRITE}
      - PGUSER_WRITE=${PGUSER_WRITE}
      - PGPASSWORD_WRITE=${PGPASSWORD_WRITE}
      - PGPORT_WRITE=${PGPORT_WRITE}
      - PGHOST_WRITE=${PGHOST_WRITE=}
      - PGHOST_WRITE=${PGHOST_WRITE}
      - PGDATABASE_READ=${PGDATABASE_READ}
      - PGUSER_READ=${PGUSER_READ}
      - PGPASSWORD_READ=${PGPASSWORD_READ}
@@ -57,7 +57,7 @@ services:
      - ./postgresql.conf:/postgresql.conf
      - ./postgresql/data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${PGUSER}"] # database username here - nostr, should be changed if other user
      test: ["CMD-SHELL", "pg_isready -U ${PGUSER_WRITE}"] # database username here - nostr, should be changed if other user
      interval: 10s
      timeout: 5s
      retries: 5
@@ -78,7 +78,7 @@ services:
    environment:
      - DOMAIN=${DOMAIN}
      - DOCKER_SVC=172.17.0.1
      - SVC_PORT=${EVENT_HANDLER_PORT}
      - SVC_PORT=${WS_PORT}
      - VERSION=${VERSION}
      - CONTACT=${CONTACT}
      - HEX_PUBKEY=${HEX_PUBKEY}
@@ -92,7 +92,7 @@ services:
      - websocket_handler

  opentelemetry-collector:
    image: otel/opentelemetry-collector-contrib:latest
    image: otel/opentelemetry-collector-contrib:0.103.1
    environment:
      - DD_API_KEY=${DD_API_KEY}
    volumes:
@@ -107,7 +107,6 @@ services:
    ports:
      - "55680:55680"
      - "4317:4317"
      - "4318:4318"
    command: ["--config=/etc/otel/config.yaml"]

volumes:
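
The endpoint change above (172.28.0.7 -> opentelemetry-collector) leans on Docker's embedded DNS: on a compose network, a service name resolves to whatever address that container currently holds, so the wiring survives IP reassignment across restarts. A quick hypothetical check, run from inside any container on the same network:

import socket

# Resolve the compose service name via Docker's embedded DNS; the address
# printed is whatever Docker assigned to the collector on this run.
print(socket.gethostbyname("opentelemetry-collector"))
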
116 changes: 48 additions & 68 deletions docker_stuff/python_stuff/event_handler.py
@@ -39,7 +39,7 @@
)
tracer = trace.get_tracer(__name__)

otlp_exporter = OTLPSpanExporter()
otlp_exporter = OTLPSpanExporter(endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"))
span_processor = BatchSpanProcessor(otlp_exporter)
otlp_tracer = trace.get_tracer_provider().add_span_processor(span_processor)

@@ -61,7 +61,6 @@
redis_client = redis.Redis(host=os.getenv("REDIS_HOST"), port=os.getenv("REDIS_PORT"))



def get_conn_str(db_suffix: str) -> str:
    return (
        f"dbname={os.getenv(f'PGDATABASE_{db_suffix}')} "
@@ -71,24 +70,23 @@ def get_conn_str(db_suffix: str) -> str:
        f"port={os.getenv(f'PGPORT_{db_suffix}')} "
    )


@asynccontextmanager
async def lifespan(app: FastAPI):
    conn_str_write = get_conn_str('WRITE')
    conn_str_read = get_conn_str('READ')
    conn_str_write = get_conn_str("WRITE")
    conn_str_read = get_conn_str("READ")
    logger.info(f"Write conn string is: {conn_str_write}")
    logger.info(f"Read conn string is: {conn_str_read}")

    app.write_pool = AsyncConnectionPool(conninfo=conn_str_write)
    app.read_pool = AsyncConnectionPool(conninfo=conn_str_read)

    yield

    await app.write_pool.close()
    await app.read_pool.close()




app = FastAPI(lifespan=lifespan)
FastAPIInstrumentor.instrument_app(app)

@@ -101,7 +99,7 @@ def initialize_db() -> None:
    """
    try:
        logger.info(f"conn string is {get_conn_str('WRITE')}")
        conn = psycopg.connect(get_conn_str('WRITE'))
        conn = psycopg.connect(get_conn_str("WRITE"))
        with conn.cursor() as cur:
            # Create events table if it doesn't already exist
            cur.execute(
@@ -133,6 +131,27 @@ def initialize_db() -> None:
        logger.info(f"Error occurred during database initialization: {caught_error}")


async def set_span_attributes(
    span, db_system: str, db_statement: str, service_name: str, operation_name: str
):
    span.set_attribute(SpanAttributes.DB_SYSTEM, db_system)
    span.set_attribute(SpanAttributes.DB_STATEMENT, db_statement)
    span.set_attribute("service.name", service_name)
    span.set_attribute("operation.name", operation_name)


async def execute_sql_with_tracing(app, sql_query: str, span_name: str):
    with tracer.start_as_current_span(span_name) as span:
        current_span = trace.get_current_span()
        await set_span_attributes(
            current_span, "postgresql", sql_query, "postgres", "postgres.query"
        )
        async with app.read_pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(query=sql_query)
                return await cur.fetchall()


@app.post("/new_event")
async def handle_new_event(request: Request) -> JSONResponse:
    event_dict = await request.json()
@@ -173,12 +192,10 @@ async def handle_new_event(request: Request) -> JSONResponse:
                    return event_obj.evt_response(
                        results_status="flase", http_status_code=200
                    )

                else:
                    try:
                        logger.debug(f"Adding event id: {event_obj.event_id}")
                        await event_obj.add_event(conn, cur)

                    except psycopg.IntegrityError:
                        conn.rollback()
                        logger.info(
@@ -196,11 +213,9 @@ async def handle_new_event(request: Request) -> JSONResponse:
                            http_status_code=400,
                            message="error: failed to add event",
                        )

                    return event_obj.evt_response(
                        results_status="true", http_status_code=200
                    )

    except Exception:
        logger.debug("Entering gen exc")
        conn.rollback()
@@ -240,66 +255,31 @@ async def handle_subscription(request: Request) -> JSONResponse:
        )

        if cached_results is None:
            with tracer.start_as_current_span("SELECT * FROM EVENTS") as parent:
                current_span = trace.get_current_span()
                current_span.set_attribute(SpanAttributes.DB_SYSTEM, "postgresql")
                current_span.set_attribute(SpanAttributes.DB_STATEMENT, sql_query)
                current_span.set_attribute("service.name", "postgres")
                current_span.set_attribute("operation.name", "postgres.query")
                async with app.read_pool.connection() as conn:
                    async with conn.cursor() as cur:
                        await cur.execute(query=sql_query)
                        query_results = await cur.fetchall()
                        if query_results:
                            parsed_results = await subscription_obj.query_result_parser(
                                query_results
                            )
                            serialized_events = json.dumps(parsed_results)
                            redis_client.setex(
                                str(raw_filters_copy), 240, serialized_events
                            )
                            logger.debug(
                                f"Caching results , keys: {str(raw_filters_copy)} value is : {serialized_events}"
                            )
                            return_response = subscription_obj.sub_response_builder(
                                "EVENT",
                                subscription_obj.subscription_id,
                                serialized_events,
                                200,
                            )
                            return return_response

                        else:
                            redis_client.setex(str(raw_filters_copy), 240, "")
                            return subscription_obj.sub_response_builder(
                                "EOSE", subscription_obj.subscription_id, "", 200
                            )

        elif cached_results:
            event_type = "EVENT"
            try:
                parse_var = json.loads(cached_results.decode("utf-8"))
                results_json = json.dumps(parse_var)
            except:
                logger.debug("Empty cache results, sending EOSE")
            if not parse_var:
                event_type = "EOSE"
                results_json = ""
            return subscription_obj.sub_response_builder(
                event_type, subscription_obj.subscription_id, results_json, 200
            query_results = await execute_sql_with_tracing(
                app, sql_query, "SELECT * FROM EVENTS"
            )

        else:
            return subscription_obj.sub_response_builder(
                "EOSE", subscription_obj.subscription_id, "", 200
            )

            if query_results:
                parsed_results = await subscription_obj.query_result_parser(
                    query_results
                )
                serialized_events = json.dumps(parsed_results)
                redis_client.setex(str(raw_filters_copy), 240, serialized_events)
                logger.debug(
                    f"Caching results, keys: {str(raw_filters_copy)} value is: {serialized_events}"
                )
                return subscription_obj.sub_response_builder(
                    "EVENT", subscription_obj.subscription_id, serialized_events, 200
                )
            else:
                redis_client.setex(str(raw_filters_copy), 240, "")
                return subscription_obj.sub_response_builder(
                    "EOSE", subscription_obj.subscription_id, "", 200
                )
    except psycopg.Error as exc:
        logger.error(f"Error occurred: {str(exc)}", exc_info=True)
        return subscription_obj.sub_response_builder(
            "EOSE", subscription_obj.subscription_id, "", 500
        )

    except Exception as exc:
        logger.error(f"General exception occurred: {exc}", exc_info=True)
        return subscription_obj.sub_response_builder(
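
The net effect of this refactor is that the span-attribute boilerplate in handle_subscription collapses into the new execute_sql_with_tracing helper, so every traced read query gets the same span shape. A minimal self-contained sketch of that pattern, using the real OpenTelemetry API but an illustrative query string:

from opentelemetry import trace
from opentelemetry.semconv.trace import SpanAttributes

tracer = trace.get_tracer(__name__)
sql_query = "SELECT 1;"  # illustrative; the handler builds this from the subscription filters

# Open a span around the query and stamp the same attributes the new helper
# sets: the DB system, the statement itself, and the service/operation names.
with tracer.start_as_current_span("SELECT * FROM EVENTS") as span:
    span.set_attribute(SpanAttributes.DB_SYSTEM, "postgresql")
    span.set_attribute(SpanAttributes.DB_STATEMENT, sql_query)
    span.set_attribute("service.name", "postgres")
    span.set_attribute("operation.name", "postgres.query")
    # ...execute sql_query against the read pool here...
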
14 changes: 7 additions & 7 deletions docker_stuff/python_stuff/websocket_handler.py
@@ -34,19 +34,17 @@
)
tracer = trace.get_tracer(__name__)

otlp_exporter = OTLPSpanExporter()
span_processor = BatchSpanProcessor(
    otlp_exporter
)
otlp_exporter = OTLPSpanExporter(endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"))
span_processor = BatchSpanProcessor(otlp_exporter)
otlp_tracer = trace.get_tracer_provider().add_span_processor(span_processor)


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

EVENT_HANDLER_SVC= os.getenv("EVENT_HANDLER_SVC")
EVENT_HANDLER_PORT= os.getenv("EVENT_HANDLER_PORT")
EVENT_HANDLER_SVC = os.getenv("EVENT_HANDLER_SVC")
EVENT_HANDLER_PORT = os.getenv("EVENT_HANDLER_PORT")


async def handle_websocket_connection(
@@ -215,7 +213,9 @@ async def send_subscription_to_handler(
rate_limiter = TokenBucketRateLimiter(tokens_per_second=1, max_tokens=50000)

try:
    start_server = websockets.serve(handle_websocket_connection, "0.0.0.0", os.getenv("WS_PORT"))
    start_server = websockets.serve(
        handle_websocket_connection, "0.0.0.0", os.getenv("WS_PORT")
    )
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
