Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions bin/send_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import click

from sentry.taskdemo import say_hello


def produce_activations(num_activations: int):
    """Enqueue ``num_activations`` say_hello task activations onto kafka.

    Each activation receives its zero-based sequence number (as a string)
    as the task argument.
    """
    for serialized_index in map(str, range(num_activations)):
        say_hello.delay(serialized_index)


@click.command()
@click.option(
    "--num-activations",
    type=int,
    default=1,
    show_default=True,
    help="Number of task activations to send to kafka",
)
def main(num_activations: int):
    """CLI entry point: send task activations to kafka.

    Bug fix: the original was missing ``@click.command()``, so ``@click.option``
    never wrapped ``main`` into a click Command — the ``main()`` call below
    invoked the raw function with no arguments and raised TypeError.
    """
    produce_activations(num_activations)


if __name__ == "__main__":
    main()
226 changes: 225 additions & 1 deletion src/sentry/runner/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import signal
from multiprocessing import cpu_count
from typing import Any
from threading import Thread

import click

Expand Down Expand Up @@ -253,6 +253,230 @@ def taskworker(rpc_host: str, max_task_count: int, **options: Any) -> None:
raise SystemExit(exitcode)


@run.command()
@click.option("--consumer-path", type=str, help="Path to taskbroker binary")
@click.option(
    "--num-consumers",
    type=int,
    help="Number of consumers to run in the consumer group",
    default=8,
    show_default=True,
)
@click.option(
    "--num-messages",
    type=int,
    help="Number of messages to send to the kafka topic",
    default=80_000,
    show_default=True,
)
@click.option(
    "--num-restarts",
    type=int,
    help="Number of restarts for each consumers",
    default=24,
    show_default=True,
)
@click.option(
    "--min-restart-duration",
    type=int,
    help="Minimum number of seconds between each restarts per consumer",
    default=1,
    show_default=True,
)
@click.option(
    "--max-restart-duration",
    type=int,
    help="Maximum number of seconds between each restarts per consumer",
    default=30,
    show_default=True,
)
@log_options()
@configuration
def taskbroker_integration_test(
    consumer_path: str,
    num_consumers: int,
    num_messages: int,
    num_restarts: int,
    min_restart_duration: int,
    max_restart_duration: int,
) -> None:
    """Chaos-test the taskbroker consumer group end to end.

    Produces ``num_messages`` demo task activations to the ``task-worker``
    kafka topic, starts ``num_consumers`` taskbroker processes, repeatedly
    SIGINTs/restarts each of them, and finally verifies via the consumers'
    sqlite databases that every partition was consumed without offset gaps.

    Requires a running ``sentry_kafka`` docker container and assumes a
    sibling ``../taskbroker`` checkout for config/db files — TODO confirm
    both prerequisites are documented for users of this command.
    """
    import random
    import sqlite3
    import subprocess
    import threading
    import time
    from pathlib import Path

    import yaml

    def manage_consumer(
        consumer_index: int,
        consumer_path: str,
        config_file: str,
        iterations: int,
        min_sleep: int,
        max_sleep: int,
        log_file_path: str,
    ) -> None:
        # Start a consumer process, let it run for a random interval, then
        # SIGINT it and assert it shut down cleanly; repeat `iterations` times.
        # Config path is loop-invariant, so compute it once.
        config_file_path = f"../taskbroker/tests/{config_file}"
        with open(log_file_path, "a") as log_file:
            print(f"Starting consumer {consumer_index}, writing log file to {log_file_path}")
            for i in range(iterations):
                process = subprocess.Popen([consumer_path, "-c", config_file_path], stderr=log_file)
                time.sleep(random.randint(min_sleep, max_sleep))
                print(
                    f"Sending SIGINT to consumer {consumer_index}, {iterations - i - 1} SIGINTs remaining"
                )
                process.send_signal(signal.SIGINT)
                try:
                    return_code = process.wait(timeout=10)
                    assert return_code == 0
                except Exception:
                    # Didn't exit cleanly in time — force-kill so the next
                    # iteration can start fresh.
                    process.kill()

    if num_consumers < 1:
        print("Number of consumers must be greater than 0")
        return

    # First check if taskdemo topic exists
    print("Checking if taskdemo topic already exists")
    check_topic_cmd = [
        "docker",
        "exec",
        "sentry_kafka",
        "kafka-topics",
        "--bootstrap-server",
        "localhost:9092",
        "--list",
    ]
    result = subprocess.run(check_topic_cmd, check=True, capture_output=True, text=True)
    topics = result.stdout.strip().split("\n")
    # Create taskdemo Kafka topic with 32 partitions
    if "task-worker" not in topics:
        print("task-worker topic does not exist, creating it with 32 partitions")
        create_topic_cmd = [
            "docker",
            "exec",
            "sentry_kafka",
            "kafka-topics",
            "--bootstrap-server",
            "localhost:9092",
            "--create",
            "--topic",
            "task-worker",
            "--partitions",
            "32",
            "--replication-factor",
            "1",
        ]
        subprocess.run(create_topic_cmd, check=True)
    else:
        print("Taskdemo topic already exists, making sure it has 32 partitions")
        try:
            create_topic_cmd = [
                "docker",
                "exec",
                "sentry_kafka",
                "kafka-topics",
                "--bootstrap-server",
                "localhost:9092",
                "--alter",
                "--topic",
                "task-worker",
                "--partitions",
                "32",
            ]
            subprocess.run(create_topic_cmd, check=True)
        except Exception:
            # --alter fails if the topic already has 32 partitions; that is
            # the desired state, so ignore the error.
            pass

    # Create config files for consumers. Each consumer gets its own sqlite
    # db (timestamped to avoid clashing with previous runs) and grpc port.
    print("Creating config files for consumers in taskbroker/tests")
    consumer_configs = {
        f"config_{i}.yml": {
            "db_path": f"db_{i}_{int(time.time())}.sqlite",
            "kafka_topic": "task-worker",
            "kafka_consumer_group": "task-worker-integration-test",
            "kafka_auto_offset_reset": "earliest",
            "grpc_port": 50051 + i,
        }
        for i in range(num_consumers)
    }

    test_dir = Path("../taskbroker/tests")
    test_dir.mkdir(parents=True, exist_ok=True)

    for filename, config in consumer_configs.items():
        with open(test_dir / filename, "w") as f:
            yaml.safe_dump(config, f)

    # Produce the test messages to the taskdemo topic.
    from sentry.taskdemo import say_hello

    for i in range(num_messages):
        print(f"Sending message: {i}", end="\r")
        say_hello.delay("hello world")
    print(f"\nDone: sent {num_messages} messages")

    # Run all consumers (with chaos restarts) concurrently and wait for
    # every one to finish its restart schedule.
    threads: list[Thread] = []
    for i in range(num_consumers):
        thread = threading.Thread(
            target=manage_consumer,
            args=(
                i,
                consumer_path,
                f"config_{i}.yml",
                num_restarts,
                min_restart_duration,
                max_restart_duration,
                f"consumer_{i}.log",
            ),
        )
        thread.start()
        threads.append(thread)

    for t in threads:
        t.join()

    # Verify completeness: ATTACH every consumer's sqlite db and union all
    # inflight_taskactivations tables, then per partition compare the offset
    # span against the row count. delta == 0 means no offset was skipped.
    attach_db_stmt = "".join(
        [
            f"attach '{config['db_path']}' as {config['db_path'].replace('.sqlite', '')};\n"
            for config in consumer_configs.values()
        ]
    )
    from_stmt = "\nUNION ALL\n".join(
        [
            f" SELECT * FROM {config['db_path'].replace('.sqlite', '')}.inflight_taskactivations"
            for config in consumer_configs.values()
        ]
    )
    query = f"""
    SELECT
        partition,
        (max(offset) - min(offset)) + 1 AS offset_diff,
        count(*) AS occ,
        (max(offset) - min(offset)) + 1 - count(offset) AS delta
    FROM (
        {from_stmt}
    )
    GROUP BY partition
    ORDER BY partition;
    """

    con = sqlite3.connect(consumer_configs["config_0.yml"]["db_path"])
    cursor = con.cursor()
    cursor.executescript(attach_db_stmt)
    rows = cursor.execute(query).fetchall()
    # Assert that each value in the delta (fourth) column is 0.
    assert all(row[3] == 0 for row in rows)
    print("Taskbroker integration test completed successfully.")


@run.command()
@click.option(
"--pidfile",
Expand Down
14 changes: 14 additions & 0 deletions src/sentry/taskdemo/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from __future__ import annotations

import logging

from sentry.taskworker.registry import taskregistry

# Module-level logger (currently unused here; see note in say_hello).
logger = logging.getLogger(__name__)
# Task namespace for demo tasks; functions register into it via
# @demotasks.register(...).
demotasks = taskregistry.create_namespace(name="demos")


@demotasks.register(name="demos.say_hello")
def say_hello(name):
    """Demo task: write the given name to stdout.

    NOTE: uses print instead of logger.info("hello %s", name) for now —
    logging needs fixing since this runs in another process.
    """
    print(f"{name}")  # noqa
Loading