From 3954412fd16b2a0001e6f7012fee70b08a8709df Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 18 Nov 2024 15:08:58 -0800 Subject: [PATCH 1/7] test(kafka-test): adds a simple test for task worker integrations --- bin/send_tasks.py | 23 +++++++++++++++++++++++ src/sentry/taskdemo/__init__.py | 14 ++++++++++++++ 2 files changed, 37 insertions(+) create mode 100644 bin/send_tasks.py create mode 100644 src/sentry/taskdemo/__init__.py diff --git a/bin/send_tasks.py b/bin/send_tasks.py new file mode 100644 index 00000000000000..1b29aee48d4f3b --- /dev/null +++ b/bin/send_tasks.py @@ -0,0 +1,23 @@ +import click + +from sentry.taskdemo import say_hello + + +def produce_activations(num_activations: int): + for i in range(num_activations): + say_hello.delay(f"{i}") + + +@click.option( + "--num-activations", + type=int, + default=1, + show_default=True, + help="Number of task activations to send to kafka", +) +def main(num_activations: int): + produce_activations(num_activations) + + +if __name__ == "__main__": + main() diff --git a/src/sentry/taskdemo/__init__.py b/src/sentry/taskdemo/__init__.py new file mode 100644 index 00000000000000..d330b1e2d1d5f6 --- /dev/null +++ b/src/sentry/taskdemo/__init__.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +import logging + +from sentry.taskworker.registry import taskregistry + +logger = logging.getLogger(__name__) +demotasks = taskregistry.create_namespace(name="demos") + + +@demotasks.register(name="demos.say_hello") +def say_hello(name): + # logger.info("hello %s", name) need to fix logging now that we are running this in another process + print(f"{name}") # noqa From 6650cdc3ca4d983f04605bf6b846caa4597c43a1 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Wed, 20 Nov 2024 14:56:57 -0500 Subject: [PATCH 2/7] wip integration test --- src/sentry/runner/commands/run.py | 134 ++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index 1c17cbf8d85712..a5c745f5c877cc 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -253,6 +253,140 @@ def taskworker(rpc_host: str, max_task_count: int, **options: Any) -> None: raise SystemExit(exitcode) +@run.command() +@click.option("--rust-binary", help="Path to taskbroker brinary") +@log_options() +@configuration +def taskbroker_integration_test(rust_binary: str) -> None: + import datetime + import random + import subprocess + import threading + import time + import uuid + from pathlib import Path + + import yaml + + from sentry.utils import json + + def manage_consumer( + rust_binary: str, config_file: str, iterations: int, min_sleep: int, max_sleep: int + ) -> None: + for _ in range(iterations): + process = subprocess.Popen([rust_binary, "-c", config_file]) + time.sleep(random.randint(min_sleep, max_sleep)) + process.send_signal(signal.SIGINT) + try: + return_code = process.wait(timeout=10) + assert return_code == 0 + except Exception: + process.kill() + + # First check if taskdemo topic exists + print("Checking if taskdemo topic already exists") + check_topic_cmd = [ + "docker", + "exec", + "sentry_kafka", + "kafka-topics", + "--bootstrap-server", + "localhost:9092", + "--list", + ] + result = subprocess.run(check_topic_cmd, check=True, capture_output=True, text=True) + topics = result.stdout.strip().split("\n") + + # Create taskdemo Kafka topic with 32 partitions + if "task-worker" not in topics: + print("task-worker topic does not exist, creating it with 32 partitions") + create_topic_cmd = [ + "docker", + "exec", + "sentry_kafka", + "kafka-topics", + "--bootstrap-server", + "localhost:9092", + "--create", + "--topic", + "task-worker", + "--partitions", + "32", + "--replication-factor", + "1", + ] + subprocess.run(create_topic_cmd, check=True) + else: + print("Taskdemo topic already exists, making sure it has 32 partitions") + try: + create_topic_cmd = [ + "docker", + "exec", + "sentry_kafka", + "kafka-topics", + "--bootstrap-server", + "localhost:9092", + "--alter", + "--topic", + "task-worker", + "--partitions", + "32", + ] + subprocess.run(create_topic_cmd, check=True) + except Exception: + pass + + # Create config files for consumers + print("Creating config files for consumers in taskbroker/tests") + consumer_configs = { + "config_0.yml": { + "db_path": "db_0.sqlite", + "kafka_topic": "task-worker", + "kafka_consumer_group": "task-worker-integration-test", + "grpc_port": 50051, + }, + "config_1.yml": { + "db_path": "db_1.sqlite", + "kafka_topic": "task-worker", + "kafka_consumer_group": "task-worker-integration-test", + "grpc_port": 50052, + }, + "config_2.yml": { + "db_path": "db_2.sqlite", + "kafka_topic": "task-worker", + "kafka_consumer_group": "task-worker-integration-test", + "grpc_port": 50053, + }, + "config_3.yml": { + "db_path": "db_3.sqlite", + "kafka_topic": "task-worker", + "kafka_consumer_group": "task-worker-integration-test", + "grpc_port": 50054, + }, + } + + test_dir = Path("../taskbroker/tests") + test_dir.mkdir(parents=True, exist_ok=True) + + for filename, config in consumer_configs.items(): + with open(test_dir / filename, "w") as f: + yaml.safe_dump(config, f) + + try: + manage_consumer(rust_binary, "config_0.yml", 1, 3, 10) + + # Produce a test message to the taskdemo topic + from sentry.taskdemo import say_hello + + for i in range(10): + print(f"Sending messages {i}") + say_hello.delay("hello world") + + manage_consumer(rust_binary, "config_0.yml", 1, 8, 10) + except Exception: + raise + + @run.command() @click.option( "--pidfile", From fb5056fb702c4b47cd4e3c7c7a15e8cca73bd899 Mon Sep 17 00:00:00 2001 From: John Yang Date: Wed, 20 Nov 2024 13:02:11 -0800 Subject: [PATCH 3/7] use auto offset reset setting --- src/sentry/runner/commands/run.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index a5c745f5c877cc..f18dd87f74ef5f 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -343,24 +343,28 @@ def manage_consumer( "db_path": "db_0.sqlite", "kafka_topic": "task-worker", "kafka_consumer_group": "task-worker-integration-test", + "kafka_auto_offset_reset": "earliest", "grpc_port": 50051, }, "config_1.yml": { "db_path": "db_1.sqlite", "kafka_topic": "task-worker", "kafka_consumer_group": "task-worker-integration-test", + "kafka_auto_offset_reset": "earliest", "grpc_port": 50052, }, "config_2.yml": { "db_path": "db_2.sqlite", "kafka_topic": "task-worker", "kafka_consumer_group": "task-worker-integration-test", + "kafka_auto_offset_reset": "earliest", "grpc_port": 50053, }, "config_3.yml": { "db_path": "db_3.sqlite", "kafka_topic": "task-worker", "kafka_consumer_group": "task-worker-integration-test", + "kafka_auto_offset_reset": "earliest", "grpc_port": 50054, }, } @@ -373,8 +377,6 @@ def manage_consumer( yaml.safe_dump(config, f) try: - manage_consumer(rust_binary, "config_0.yml", 1, 3, 10) - # Produce a test message to the taskdemo topic from sentry.taskdemo import say_hello From 67c5fd623a5a8aee6d081429143e2136791d5a82 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Wed, 20 Nov 2024 16:26:02 -0500 Subject: [PATCH 4/7] fix config file bug --- db_0.sqlite | Bin 0 -> 4096 bytes db_0.sqlite-shm | Bin 0 -> 32768 bytes db_0.sqlite-wal | Bin 0 -> 399672 bytes src/sentry/runner/commands/run.py | 3 ++- taskbroker-inflight.sqlite | Bin 0 -> 20480 bytes taskbroker-inflight.sqlite-wal | 0 6 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 db_0.sqlite create mode 100644 db_0.sqlite-shm create mode 100644 db_0.sqlite-wal create mode 100644 taskbroker-inflight.sqlite create mode 100644 taskbroker-inflight.sqlite-wal diff --git a/db_0.sqlite b/db_0.sqlite new file mode 100644 index 0000000000000000000000000000000000000000..7ee7c113a09428e4daafacb6e70a35d18573e608 GIT binary patch literal 4096 zcmWFz^vNtqRY=P(%1ta$FlG>7U}9o$P*7lCU|@t|AVoG{WYDWB;00+HAlr;ljiVtj n8UmvsFd71*Aut*OqaiRF0;3@?8UmvsFd71*Aut*O6ovo*4{!$i literal 0 HcmV?d00001 diff --git a/db_0.sqlite-shm b/db_0.sqlite-shm new file mode 100644 index 0000000000000000000000000000000000000000..ee7f8e5e79dfebe442c83cf8320912f2fd02a4f7 GIT binary patch literal 32768 zcmeI)%T7~K6o>JTO9k;(MX=sb5Jm0^Dk>FFQ7T>l0j2c)gjrxz9f`|M`7rRa#Nsy^2--`R^8H(;TWLROPufJg3U<;&ak_lqa5fAe`IE znUdV2xlzSg;vCU`6~EhWk8=J?h0k0HD4>7>3Mim}0tzUgfC36Apnw7jD4>7>3Mim} z0tzUgfC36Apnw7jD4>7>3Mim}0tzUgfC36Apnw7jD4>7>3Mim}0tzUgfC36Apg_U` zP2paLt2|A3s^b)B1cByov&z9n2x0~W5*KJG-J6s6SO+T500M1vaG0Z{Il)P~$#8~q z^l*_r`nkd&&l(`55frF}Ks$#x!ZD81Nf)O$%~{TKfnF|gnE{3vWt{6wa+4|UFwH$? zxz9r$u~3UZ_EaD-fnlDo$a9ui;U%lAvCb=ACpO22W$Mcf4nlEsFIE>tqG0DzH@5ZZ=au0RX`E3h+wKsR&UH8mo zyOPoU_OI@xU;3+Gop8>3myP9I!CN=EmgWS7J-e+f@h(ANZQIySyZ@Zh=0Cq9c)5C8!X009sH0T2KI5C8!X0D;4bK>nm; zYS#Su$&YvDs5S16W-lDSWOVJ=s!5s~TQ`xdeB5^W@Zh46L1ARk;$?$E%ST3elj75}8*JOnW`)Ho zRxBG_w46V$c+_6)+H0~K)(VT4tyo-f%)P|5H~61wG&i=^4L*MG;W8(k*48!aocYPb z*o5O=!a%V#{3zX!TUU5IS_xxRs@&tR>(iy)<*)O(z2JxkE`8hgethFIeg4j(DZ&2> z8wh{^2!H?xfB*=900@8p2!H?xfWY)5;M)k=10BJ?eB`RnAHA;anvDNlVHcR5XB^)O z1V8`;KmY_l00ck)1V8`;KmY_n0%dUoOF!}bcfNVf{L^CM2s*>37vc)`IgX&CcWWVz zpm%Hk!pLjlPY?hB5C8!X009sH0T2KI5GW(?u_N14vrauVd6iiZn2$|(<72B<=SFih zyN24iv5SgA^LqO`$^_^2T9t(7V~((L<$e4XcI-CRi+UgJ1*9Hv@lA>B)uM&6gDS3KWwsWa~8a_(fbQSr6cPpthqE zMhnfxCfreHLs5n=)c!a%p2FX`q=nDX+=gt`(=#Wn=;)edpIT4KUJvuARPRV9;t0Y3 zh65Hy@Vi^zdE77lcJD9zID+KB^$GS58wh{^2!H?xfB*=900@8p2!H?xfWR9_;DThT z`z)nj=z-TU#E-Yq`%h5O9^{M_{)e&^5b+xX1wJOA;|=l_v?twIjLAdcY0 z%kLZ8X`QS2aRkYMKP1>cY#;yvAOHd&00JNY0w4eaAOHd&00PsHKzn<4H~U(J7=XYo z@U-6l+snEx{VwtjPQw$9uLJ@h00JNY0w4eaAOHd&00JNY0@VckTmhYdUEmjcK7Z5b zEqks+-oa{6*aHF}00JNY0w4eaAOHd&00JNY0@ILyU#F1U1(Lt|$jUuY1^Qo3uz%P<00ck)1V8`;KmY_l00ck)1VCV#5y+>KJwtgpnI0NSCT4r8sS-;U zO~v$xEPGVI4<>GOrbiDIy zcfw(nE7*xGizE2u6UXjJ|K&H6#k_+*OAP#Mnw@%l9S{Hk5C8!X009sH0T2KI5C8!X zI7|qf*gn)9XVG9N%Jc#k+`a9@Z~x8fm-%{uw!RmbUV#0=1_B@e0w4eaAOHd&00JNY z0w6Fg2;@_35ihx6>x$`0rtLT$(NxJ4siQh3F&xL%byLp>aV=W*lJCC7_mXe=&X&iM zO?k;v5RhOUyG$?e-Q8Q){@}A~Z}#;9Z3800@8p2!H?xfB*=900@AY&?1=@Rm zUeF7$|FD4o2!H?xfB*=900@8p2!OyLOdy|XFL=qj3$f%1QFPTKhGtWRP}?$CNVcO= zL(*hTS5!A6#I=|rFZm~b{i$bK@sg{eD5?#->prhxYtgcoyqIx)lM&^HXdK+!i0y+GI} zpckmE7r1xFrf)v;;BY}N&@r&Ks25m^90P~k4FDe)1V8`;KmY_l00ck)1V8`;K;Ymb z;CuPw3~(I>Rxj|<@7{IGqg_jG^7R6pnTNPufc=RL1V8`;KmY_l00ck)1P%=X`E+N& zNuDf(k{g;Od#Yn7uBWil-m)dxx+U6{D5;t!Im}5O*P>-7c`@Po&ig;xY$$nK&5Y~J zORmIt$%EseM-+O2z)KFjKtKZZ2m%t&3)I#NTrRxpu77#UkACIn7w8-~)7J}}$ubN; zF>q+y0r4q800ck)1V8`;KmY_l00ck)1P%~^Lco8FA+qxj&mDrV1-b@KObna|#lS&)^T$U60T2KI5C8!X009sH0T2KI5ZM0&3IYEy z#?-Fqq8I4?>kt3t-+!4QzFr`e-chC(U=Oi@00@8p2!H?xfWW~qU}lF;&U6%6#k43vkyk z^a30O>JgkB_=ceu2uMIL5RiagptfG%g>Ss&$z9L?cG%A^km`Ruq8I3Y9aRYqwrLPP z4+ww&2!H?xfB*=900@8p2!KFq1bi=loXI$KSm^~$e#dIE{)C&ezFr`m`dWovfbGBr z0w4eaAOHd&a8MA)r_&K9xo3&8Y7!PUE>l8H#q@N`)MQK3B&-liSs&NdVbr#etpm0ColiqvA$lQyX)pEy#U*V4Fo^{1V8`;4jh4ex;x?|mlR!d zWNLe^V-qSXrsQhWwroqIhAx>b-FjS$mYw9qNb*;ny|LLy@{W3uCx{u8%e(JPV!2}7xV&wZ6EmsxSIp@ z2qGxDB4dRTQ&1}-#I=~BvEu2?cQ!j#)OV7XvPzT-Ziu*0))o*|6qTy16VMBA-*88v zS9A{zB@?qvSu{LHXX^n`AygC%R;a^dJ;hXH$#E4aBgD0sqF%A_;qA@#it>{#tHDVZ zW0VuYaXxB7FTnc*^a9WeG}a6JeC>VT8XY@J@$~|k{%b4r0{z!QG4KX21@Lwt00JNY z0w4eaAOHd&00JOzco8TB{Kq&sGSCaO`v?qMS4>wjZO8G5rb?_TucJC9F&tK;S2y*2 z(hJa!0k#_(2!H?xyb%QQ>7Iz=OVo%Ysk)^bhDQ|3uw2CvZdMR(9SwO4oN0OD{c94?)c)zxsL;Sfe;1q3k3ci zc;8IH z*9-LYUy$g(pgh9>6a#O>QV8z>0w4eaAOHd&00JNY0w4ea(~*Gh;*N9W^gu7r;UiFp zqN^S;G@G)xAKNlaQ*=~nFn^e)E2_Ky^a2-Oojw1$WA7Gxy+CjK#yWZdb^sd)fWUqu zkWcqUoaCzPIHqJcy5Z_X6sf6;mPy&LV5l|=4rEU9xE3ut$%~QX4`1_8vvGj??j$dz zV-J0R&(Sn4+;_%l&sZJ65!p3ueNgVJ#+TSgVR8 zhhBj93g`tm3g`s_6wnKB63`0-UUKLK0us;*)Yc1pKXcEs@40bX%GV3@_78HsK>y&t znFBip*7m;+#lU`B4B~}B00ck)1V8`;KmY_l00cnbP$l5IxZ@mFz0eDE`Uv6@d>&xE zz|BYe=JJ1hO<(Hk1^U`H)YS{HL)buIY6;}ieG$i(s%cas4%5dtBB8El*osNn9$8e0 zYIqiNlE<`Y+3{73B!70#ZOulKchzxxNvhH~k~}ybx&Hdh5+|V@DHbtG7o<%&(v#3KQMW>oeDcdDxgt!(hJIRZY zq0kEi)@jnDzUMtfJ^t}efz!Vq)@Dd;Z z0w4eaAOHd&00JNY0w8d(5GcC18@ng_p%+N`2ozaXR8Q4(*%gWIP|G%T&m@{;+ODKi zI&Jg<=|?ACxa(iKj`Z~c1BubbdV$e>?#lV}K*UMz$hNI2%ug_%h zuG-YmGeTU8mYw9qNb-$OZ)-M^JXJ4}+>jebk{hK+^Uxg%y+B~XMSg*>S0KLt?-kGs za1_uBa1_uBa1_uBa1_uBa1_uBa1_uB1Sp^v;3S|I2)yLb3j`#f7pSclxcH`dCx30= zzkcZJ1qS+d*VGI2?M79CeYS|;i68(1AOHd&00JNY0w4eaAn=9|@Lk+-F4h6)1=0mh zs;Dt3fMQdPWj{BFsVk1HyOt?Zw%1jsyIpe0^7zMVdcuDON8Ys zMug=nmn{>{9$q@MX!u;=tif|nDYOXRQCPfe#bVyFcyv9T%#HaUM_9UiWbll^;fmwy zJTL3!gvCqGsC}F|j_ZuFrPvbokw=ye4OTqgaj7%z=5p?2v{C83C)e3-mT_1$T4**l z;f^{RCJS`xe4O>wPvP%ee9>%<<~C%jo}M{rMMu{x`_yD&Y{GFb$zC|lmXV`$LvCH+ z@n}8Fqf)&io%xfJsaf;qCqLd1QQoA)ZiDTg5Ml9(70U(}E$7cG9<^7y_L}U5wY9pDdx>jr@ITdPZfva^ zeEi_UWllP+t!vgf^XtK{gfS{r?(x_5>C)&@Sknh~fv^1dqbt+re(AwZb}%{cn%^Wf z@LHX`gNP%Lr1`QqB1;Qpb)l>+AgU-Kj)0>Gitm+&To5rpr6TVjM^P1Bf;fUe@P|5u zVRQ-N2zakR905mxI0B9WaReL%;s`hj#1RB25J$jCAdVnV&m)c?Ab~gnmo53b2G}p( z1i!(>qi=g8j-YRQzYt^|L?nUF3>yf500@8p2!H?xfB*=9001QJRfuX!nrum!1w6qtQQ|T>tf^`%uQapz*i()yl!%hJGo%e^>+;WzmH9x z*_lG#!2rb!)F}*5r1l*J^a6o&LRO23J3}Wq@(u<*jx_WF90l|O90l|O90l|O90l|O z90l|O0Sf2^I0@(l0xvoA0s#r=1&Sn?Nx;t)kP7SqM||YMe>(AoHGlB!0?Gcp3HA>g z2!H?xfB*=900@8p2!H?xfB*XP5FUPyOkl{|K0%i;*${_K_`zx9i4m-}%9NA!KVVH`oi|UjYO_00ck) z1V8`;KmY_l;Ef^RySNR76(Q7F0z6Gri7R@R;TWPWYPv;;OGI%d^a4Ev1cpo%Q*%5; zqQul?Q!@-n^$ba|JkvEb=mp&NLVvQEj>MHr+i^UisgfyDM|DhMIF7CB=Dz6#&Y%3+ zDHs3r@kw7V(BAu0K`+4m!v+E%00JNY0w4eaAOHd&00M_9fqZ&q#7Q2vGS3KcEn0Sx z7bD4ce*BSUBgyMK$xBY~=AGow3k0bskzXK4_RTKrGQGg(zHs-3$;B`Jxvv*EvTrfh z3-m4SA1qWVgks=O9SiZfK>!3m00ck)1V8`;KmY{x8v);S%YwH(MWwFm$SRR^iK@2f z+SId&=4q;BA6fA4LN6d{M3Pk9(hb8Sie*@?Y#5Gb>#8JqtnecA0#ZkzKM_S&Jz{7! zRR~L)V3?-psMKJHG+j~mqh8?n1)q5Ld(Lg!o^d97#?}k>-Ir6nX(M zL;<}3^a6492o`=K_4~|wKKc<~FK|@foJM+qIeokPU+=#bIR*}?n*%;92!H?xfB*=9 z00@8p2!Oz!M! z5~!(*mPwhX+hE!W*_C_!M$ii+O<6QNM`uVxCIt{hL!_Q5vw{kWEIF=np!EWurl;L? z%B5e}?&}3QGY@dR0Q(ag2!H?xfB*=900@8p2ps$b^68@@PV%_bTt6Q!kR-R2oN;o20TPIrIYD9SXewM*+P6M*+P6 zM*+P+2?e{cmFWd${Pr6g{&IkR$kz)T)B9*aFVOqwz=;Fb)u>3oGYmj6aIj7g@VP($ z1V8`;KmY_l00cnbFeTu7Wo4BTYABjZT~^zjP|I{I7W^)04$&OlQjhT)#Tm(<7ZCe> zPO7F+jW{e3z9SL_)UXwkvOTh>64mga7ogKeFOWX{8}FTCSdaO7fmHgAGQ9wMhz$fl z00ck)1V8`;4lV-u)G-Awd3V7}9=8U_2yrc@$V-0DQ=j>6YhH3iRvUWB`Ej0P9C`un z1B70Hqkvw3qkvw3qkvw3qkvwZgo54J%k%Io>L*qf@(axB{d!Y+f!?q8eHw~^ zgKLU}PXhuV00JNY0w4eaAOHf>mw@lRb}Y-3bWJo|Mu_D zp7jAgzreA*S2xrP^j_Up$}j-Mz(FuU#76)D5C8!X009sH0T2LzL!W@}l{HOGvvs0T znHt2Bb?SLU)pbj;Y(sTra%{m54!wY?5Y?76*`l6HEYUM%M|Wf*S+-_6s;xmUK)d~} z!UPYpY7*wsmsz?AQ!zc=G8r~alUUc1^mL&=siMZD0E$gDmPElIrmi@)?pmfu+1|rV zFYwqCcc}BeutW9r0^MCVRp|xTE^HtG0w4eaAaI}vXL z%{s~3%bnn|8aTm?#tGLIqblJ#^a6pu2l)j!3DhGfDJP&8fL?&DjVr5EKravs6=~=N zI11r&Bv%(TPc@J88^u|5p%*ZZ@Hs&*kYq^^ zG)Jbk=Q=EVf~=U5t5MssEsYwwWI``s>TM^s4|TIkOC^>r`eF$p%O2G|$rELl8kQ<~ zipEBel>b9tOq|#G((~J{z3p20kN>^&MPDzF>Ab#%UV!by1_B@e0wC~)63C~Ii#W-h zm>Cs|B#&t^MNaakcl}+nPV$aAPI4m24fCxNspLctU0=`(a3=@!0s#u>1vm-l1)vwG za*`vzKyZDep%>sNpcmjMpcg2iU=!Oiy}lc0)1n zhAt2AW*`6pAOHd&00JNY0w8d(5%9g&wx$rxbu5WWA~9T-x|-uUvMM@~Cu_QXJoExb z`Unh#I*!f)-8E4quIO2YV~9*Gp<9HwL=nD7@!0g`RYv=`fkMCR6_u9aj z13L!R`V|SF7})=dBfJ9$fB*=900@8p2!H?x94G?5S5}vaLaFMws>YH75YZx*p)>O~ z^<>Y}37zdXinGu|FJMD2Aou!RMbwBSsk)^bhDQ|3uw2MQAaPp4qyWT z5ZF%y^6A+TCwbgmI3vWhm?9_nZ8zQ0tdqQ}+zHOpt+Sdh4I{}lt+FOL^a7z}A9?|9 z*@s?$qkvu@Kmok~Cjq?x^a9WeKrb*~7Dr@hA@l;0)C3CX1xhy%wy-JF3;gsqi_YJk zc*jqDy}+A#_U@Bjpl5IYPxjM-2`>u*AOHd&00JNY0w4eaATU(~d~c(tTB=P9QMDY2 z$Sf||Qyhabf37DRrf92g@*BnZA)psfj`BINWbdA$QrAzbKqOtFsx7)U^=zVfnrcBW z;6g9pNS%IHF=f&49G#T}5EVj2(GaO;%B-M*B1?{|NF9a#L=;{1h@sh(HL@+kG(|_H z29s21y0Sm@0>_*w7=U76Di_RnEf4?!5C8!X009sH0TB383HaVdSyydC)`><;m&nwlo~X($b6qnh zq|FopbNohe-WKQuyrX?i7NJBph-bLELu^$sbkDPGjp&}`Sc(C?fM@mjoK)FyOv!L` z28tC|pr$TbCS{&(gJ~yZ=mn%hLoYD?{$b~5|9kyceZ4?`a&-f}06U1SWdiy1oQRV= zZsg7gaV@6EN&fItw>9e|PuFph>vH3o&~Bdu54i5*CjvvM6PRh?+zlMR8bc zvS|^=5DhxlZxrWKf?hy9#^>bv*%e&NuwBU}9?OAm%7#h|*`cy3vy1`I3yA#%DS)Q2 z#1jq+Y$hlpO_ zqDQX0_0=!h|KaNe1`_8t)(f1!kHc9$JvZVcj~ml6LR^a}a*}U(;VaEL$-B!po)^wIKvq~9;`sv%D#*Jr09!@^EzL8e%tQ7AN=W(d%nG+Xct(TNG$Dtx$lME zpZEMI^Y7i?Nq;l7t!qo?-*$Ye{mQmna;yovKz>fLU;{`GvGkntJk?Z*rHjmx=@D7> zsP0J~3p1gHrAnTn`Pt+ew`kc0@Z{E4@9dhfbMu$Cw=bSIb57Qs$W30lAV;%nE?zge z#+_U+>H0f{{oltX&+MF0(*!_RCdft=*Vu6-NDvjifWRt-FdKp*ojtkEcC*>BiBSf!L zjK(&Ss=k;{_eW3|tYn>SiaIepi+Gx6QHM&3PBoWOwoA+iaV=U#QS2&D@BYEACc8?# z>U@$Kb6KiEUkT=Q>AnaGHO4TT5#n03jH1{pp51d>v%O;9QIt>xBUC=!8$}TluEtQr zv}hScu~$5N%|p%hiq3EV@Jj}9BcdcX8bR1#E?3oT?1`X=EBc!e;##zfqSz}oJ+h_Q zUa{{esxFXB1VwCVVy?G^3iXPu7~qE|t|JJhmG-7|?MnXIEQ7hgt*Ytb@_(!65xoz3=&w(_$c_6ntR z*2|BFy&^asqUemEh;wB6Q>@0W&X!RWd&SQCKih1tsE;D%0#Pc5%~VGLMK@cRBxc8@ zgU<+YEm}rV>=hdy-rj7lC_m|8rx0UE1jl)&NVP{uC^0$j+4T|AqGb}rPVvcKf2!F| zQI|x_NFi3T468>T}G25_p#dO)Y;5Z)9REZ6i4lB?>42O-Ky2yW3w$^?a>e%VeRFluE}$fm=OmPZW#E~f-I)iw z?@8a0`fArro!56<-F`(yO@jQK5UMgS4%xJX8d9%i0f2?1F&|h+_(%;4Nr(a#5^gMl zg@mJkg@mJkg@mJkg@mJkg@mJkg@mJkg@mJkg(N@$3kfFy3rSIC#q0t}-|HXf1tW`vzV~bo6db^j|Zuqjzin!a8lS8w5ZA1V8`;KmY_l00ck)1QH)RvOP8H)KimJ znK^2WyJHjH_}Hq|xlv#HOYPj)MKm|IZi0z&YwzzkeRy!u$e=K?Xz{W^p zV~((L<@)aY(@|DY$31<&49a=Peu5i}ixu+CbgzqRUUbbQ}Z&^ILo=)b*{Es6n zT|P2+#^7+p@pYb;b#ubvrDxPWP94W}M%hwq3H!(+ONRz4p6|HS8FzCzcQV?j^xj;Y zJX&ZrHsOvs8zu{M>U^B_)lcE?Tzt`Nj^;LGtDc@YX+=lZEc?`CVr;^3FUejw&X$p* zbVF`k;qhoa%%f7hBc1t^lBrqq=O;hjSvbw3>@_bLT|2g__yw|+kK4*lYvs|>>Ac9D z%=#yz$h-PvI&RjU99v&FVIza@XXm_h-iu#;Y=V9L@o{!W*lYM_Uy8`jq9M(!o9y}|!f zqq(uQZt(Gg50^RVw6?BU=ghAMzY@l%RJo^}4J*OgzObeb>;f--YSVq++9*8WCnIbd zc-bF)(gUvzyqtNc``+}9)YrOh?)+TGHSHVQHY6`djQWT7X@hz`l6xNN`M_KU^?c|i zhi{no3iyUO3iyUO3iyUO3iyUO3iyUO3iyUO3iyTt6z~mm67UU|QgA>$&-+9e^q!a< z7t%icg5KF$K6MQH)z#zw@>lkOUEtX#Csyr#$1i@<$AScU62&-z&MR4lfwDM)ViW=5 z2xM&mOE4J*XEfHyBaWaUpZ=HIdx!FJGFb@jiOZvaObf`gP@MvIx@20&&j}|j%DlLM zzG)fq*Kn-Jr(r7!<}zv6iZ}|`iZ}|`iZ}|`iZ}|`iZ}|`iUJg{6_rRZ@qET#f)sO3 z9G25AP9l+t+69ttH8K}|cF6Ga3-okcod~iC^zZF^uJ@^)$1)Fe?@Hg{=N3q=X5|X@ zZ9_eRh*|*k2!i1W^$0?>0O}F&UV*%v90im`90jsYa1>A$1t?IDfRjKy0@NcwJp$At zKs^G~BiOfk1b#e0I?xNW-|13&=r6~8yFfa1TeME$-RXa3JFtNO2!H?xfB*;_Z~`!~ z!^AG?a;fr9XjgtyB<;MXTB=R_qU?@DWR_9eQyhcU8J9fSFh!dcBZzC!a$fCwZ}p2D zZGQ09W(z^rD{`cnjVt?TW>w)5nAihjQ5q$GISQ2gnNS zMS`kz@^YklDdGsK`IJ?6MH~U2%pi_{qd*)1M}as3jskH690lSCI10oOlu)otaz};g z`NMKD#z`QKfY&1^vkN@F{i7e6xAfXaD(wR8?~K+dM1BEQhrCAd$HsG6wYM*+J)uo{M4ARtk$7r5e*|8w-w zFWm4%wOye9smz1j_oVOeGY-Hm0K33KSuI3wF};LXnyQ9N!I5%4!BjSE5UhnFbN!y( zunV}J!V+1^mSpReXj`JBYM$gsE$cXU-{R{yci#WmXOfHO&76~UCvuaQF7ONbT)b{_ zjXSwu()D)?`@fG(p4pjd3s04QE=%WH6-7lmSrqGOXClWt!xsq7b=kTXc7dvOZy6uh z1*-T&#tP&ejL7F<7vRet*abKW*abKW*ai3su)RB63@fs%sGh3nvMUnZp_Xmxo=G&x zv|UN3l%=zcY0>h$}=jYuN?*?uK2UKh zE&#g#p`vJr)HB&wZ7QMPnc^Ik8j>b!x}v(MM?fS=)h*pH zJfc{J<;sTPc($%elE)xr1d&BkH)_%H1h;5-+w{nmW({u? z^q39OEZHc7xqdzky8vGd!!E!V!)-5xi(y9|hk4$*0>7s$^IO<~HsxDGu%yL@=f z)bdNgE)bc3)Bsdz7r5Z2GAJhWr9kq!;ME2X+D21z;D* z`UxhGU!eNF9<{NePGPthhF*X#hG7>77sF44i($_aWz{6i4J=baO~v$d%hY5`(C^G4UJyPytuAAJv(>x z?J+7hgk7NVvKM**u`!@ZyTBbs9C`Gs6FWxg*#&w&j5vbw(jTx3Op#q6eFy9UunWL0 z0K0%6r&8m#1-k(93xx4$HS_{Z5c_Z8Vpvji&5^0?xsFXJQy)pL#>6(ZrBOqdOeT|# zYtiyzxTqp|<=GpX^$d4})BQ3PiD<@nhJ)jQwTPjBT_9KtA7Xle{M^vIrOb=#q|>t- zhu1;i%v2&-d0`i*aeIVafZZNNE|ID3L1lmtN6-*ZrCs3iXTJT-3-8(T!TNTA%m;l~jk#{ikTR<-` zMS6j*n_w4!T>y3g*ahP40?ZwHW4IWO%N{VT^9wK(unTa{@PXG0%b@ zzu@FP=DqD{U!7=Q-E~vv^&MCD@9lf8_o<%8G7og`O5f3bMO!wxI&nUG=DuwC^xU3s zR!3x$usmZTE1anikwv^YisG=GW2VLO!iWab`^B|r*~M}1t$te6EiZiKyKPOSMa^`F zP|Ngwx+*u!1jCO9-kyAVP6UOltF|HQL?fn4WNK1RRArZF9?Lss8zM`J6W3yjP(1w9 zZOx)c*Fz!4pa_nKDBcu7;i;Bt6ZV}gMl^w^F3`aLyoroeebjxUizwR4E(Pw`EQeC6?&&8`D@kJ`FHlP}m>%GO60;rIwpB&7O-W?hV0KNoitMo( z4h)cLQ-=l0#68-9zja#&gqLfW~_Ya!wD_!M%fhW+C)Vddlw!jz9t3gE*9!cI8L7{3I z8yOsyg3%EPbv?sYOv?7iqDoZ5v)E-5)1qY*#a{93p4*!375k2&qE{pm$ta4$as&x_ zMe3LWiteGIWMa0gvRHaUVb_D}u{$cYOvhqltE4$Zb9BqUnZ~qexmOhD756;#nP%@G zb*>L37%R(<^Rbj4=R-y6=mLp!frRVdMqSHb`2}p^F+GVX8!9nmhsvsKdR|6|YcWM6 z_B`^y_9kao<=>Q#6mtDrXMY5#6&K zOEHv;5Z9t*62(FD{vSTt?4Vh1t|=3#-mIdCA~+tRI5L8QC7JLPmAbAYt3=Wzb~hDW zn|d~zZfdG!vt+h0Em}rV95i=6|BYq`&H5PF_KH0>Kiq7us6W??nP64UHQBPh%r3C?KR>u&=eEE3av$^F z7VQG5>tPqDEaL{d0Iz75sdrWC!MeZv4$tQ@{CId(!Y*)F&g#Ps0=qyY-8$?7e5|Ol z3$WYP-q5lfx9XoJc7god&=|sOh%u&)Y4W8@cA>x+(s-7KANb;}?gcB^- z1%m4%4Z8qG0lNT40lPp6MWtO}sq?+I?W**?zRv5pU7#xqy+C<+ci07B7jP%5tVQ*# zb9>jl_Y{ZzOZj#C31a(aRM+)xgo%$s&!&kv3WSGAD@HBmW4*dQ>G zk!ZRt*afNqMHT9>3j{M<=mi22unWL0z@-%-3fKks1p>POM*+J)2}Pw{;J^Rv_5c3c zg@66qzK*LSc7e`e#1Twhx*%K(*Aq1HIDv5fQZ@tR$HM^%aRmITgkAu8frU&mszm1c zY=C+oTnxuW2^_?61o^pPKm<#-byx;Ogrj23WiRXkF@Rtfs0I`nEAp@l1ou+d1p*SV z3xsn`=mmJMfL(y2fL(y2fL)-3qS7vK!Q;0+(*A*SzFx2kw4K573#6{^%61O7oq_xU zunVlI;UsSuN06TzUTIAFdRRv6R8M8ge1OO=Pz@+D*YDp8yFkrU3M~h(9e_j$)n~fw zQh;3`1`zB5)qo;n1?&QR2?)CYpX(#P07n7407n7407n74KnX=fs-DAgwZc)9=NI_? z^KU!vn!B#LtJ*Hm{}l2L`q2c(CeQ3d96>0ZfL%Z)wPf-9KX=JWZsTau2|J}OvPs~|g%P!D&H|i0< zE-+b;kY0j7_XU-a#>@b_wDL0w*B!0s#rc5pZKs z8hU}y9U4v56H{&AkgG?KpBu^{SgM}s_VpU1>Iuyr4VS&J3)H%mHn0o4^Z(s`&QHGR zwAZ!^^nMX_3ZWNZqKQaI1?&Q_3+zX`K+j{a3&1V_y8!G0{%jC-ftoiP)G6dna_9v( zipn~L<*9n&st_K;bqe!yLj#90?-2eX(tNofY~aomnKWB)v3l97Ml;mHE>I09vh0Oj zpw_K40;p0i@cVnb=PpPN{dqmRK+lJ}cct%WzoIRhT%9QCV75tm=Zym7x#e#yR&91q4-DIs?%jNg{(A#S&4R3kO0ao{xSGKByT_7Y} ZbJ+{KKnx(*1*!qTE None: for _ in range(iterations): - process = subprocess.Popen([rust_binary, "-c", config_file]) + config_file_path = f"../taskbroker/tests/{config_file}" + process = subprocess.Popen([rust_binary, "-c", config_file_path]) time.sleep(random.randint(min_sleep, max_sleep)) process.send_signal(signal.SIGINT) try: diff --git a/taskbroker-inflight.sqlite b/taskbroker-inflight.sqlite new file mode 100644 index 0000000000000000000000000000000000000000..9bdfdfc9fcb3ef259ab413f0ccb91b0269e59b3d GIT binary patch literal 20480 zcmeI&&ui0Q7zgmA?KWGsb>g9Tdr#uP>e`9W!JO@u&9bbWZ3^xnAxqwNFH3)zyd6UX zMW+{UBA&$41dkpB^*RUs1M10(7ykhdDtPcse>t<|>S26e5}G&f^Cs{6c|ssb%hy!K zDXBJ{y1~h`G$DmT(iK7^NeYQ|NUZ(h)g%3i_>~WA2W^I=^S4@K@t;y;bXOXG6n`_i zJ3bQ>jQtRR00bZa0SG_<0uX=z1n!>-k0fSiWcPTU&<1nKvAgFKEh=Qu4Jot5uhBlFKh0-ELSGwRG_)wjhSAp{$VDyU`X-lRN|NG9c-p-!50am;PK&T8EWxc++RpS1d0d+FSwj_W0( z>wP$x7IjUBwK@sYl(SAba>b&mWDA3S-BxphnwxIBe$-_|x2f6oMx}FBr~Tpg2N#>13k@gA$sl|` zhMw~835&bR3GdD_CRREASP}~g1Rwwb2tWV=5P$##AOHafKmY>&O~95%qRZ1!J8U}C z5TEAz?@Ng9bys>SeKDCzCDT(RotjByX403w&3}4+=Jc-@h4b{y)YE#?&AH W0SG_<0uX=z1Rwwb2tWV=Lf{WE5lSrp literal 0 HcmV?d00001 diff --git a/taskbroker-inflight.sqlite-wal b/taskbroker-inflight.sqlite-wal new file mode 100644 index 00000000000000..e69de29bb2d1d6 From cde40ce5b39b668efc1ba5a319ad3be4acbc2583 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Wed, 20 Nov 2024 17:09:33 -0500 Subject: [PATCH 5/7] create consumer in separate threads and close in variable time --- db_0.sqlite | Bin 4096 -> 0 bytes db_0.sqlite-shm | Bin 32768 -> 0 bytes db_0.sqlite-wal | Bin 399672 -> 0 bytes src/sentry/runner/commands/run.py | 24 +++++++++++++++++++++++- 4 files changed, 23 insertions(+), 1 deletion(-) delete mode 100644 db_0.sqlite delete mode 100644 db_0.sqlite-shm delete mode 100644 db_0.sqlite-wal diff --git a/db_0.sqlite b/db_0.sqlite deleted file mode 100644 index 7ee7c113a09428e4daafacb6e70a35d18573e608..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 4096 zcmWFz^vNtqRY=P(%1ta$FlG>7U}9o$P*7lCU|@t|AVoG{WYDWB;00+HAlr;ljiVtj n8UmvsFd71*Aut*OqaiRF0;3@?8UmvsFd71*Aut*O6ovo*4{!$i diff --git a/db_0.sqlite-shm b/db_0.sqlite-shm deleted file mode 100644 index ee7f8e5e79dfebe442c83cf8320912f2fd02a4f7..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 32768 zcmeI)%T7~K6o>JTO9k;(MX=sb5Jm0^Dk>FFQ7T>l0j2c)gjrxz9f`|M`7rRa#Nsy^2--`R^8H(;TWLROPufJg3U<;&ak_lqa5fAe`IE znUdV2xlzSg;vCU`6~EhWk8=J?h0k0HD4>7>3Mim}0tzUgfC36Apnw7jD4>7>3Mim} z0tzUgfC36Apnw7jD4>7>3Mim}0tzUgfC36Apnw7jD4>7>3Mim}0tzUgfC36Apg_U` zP2paLt2|A3s^b)B1cByov&z9n2x0~W5*KJG-J6s6SO+T500M1vaG0Z{Il)P~$#8~q z^l*_r`nkd&&l(`55frF}Ks$#x!ZD81Nf)O$%~{TKfnF|gnE{3vWt{6wa+4|UFwH$? zxz9r$u~3UZ_EaD-fnlDo$a9ui;U%lAvCb=ACpO22W$Mcf4nlEsFIE>tqG0DzH@5ZZ=au0RX`E3h+wKsR&UH8mo zyOPoU_OI@xU;3+Gop8>3myP9I!CN=EmgWS7J-e+f@h(ANZQIySyZ@Zh=0Cq9c)5C8!X009sH0T2KI5C8!X0D;4bK>nm; zYS#Su$&YvDs5S16W-lDSWOVJ=s!5s~TQ`xdeB5^W@Zh46L1ARk;$?$E%ST3elj75}8*JOnW`)Ho zRxBG_w46V$c+_6)+H0~K)(VT4tyo-f%)P|5H~61wG&i=^4L*MG;W8(k*48!aocYPb z*o5O=!a%V#{3zX!TUU5IS_xxRs@&tR>(iy)<*)O(z2JxkE`8hgethFIeg4j(DZ&2> z8wh{^2!H?xfB*=900@8p2!H?xfWY)5;M)k=10BJ?eB`RnAHA;anvDNlVHcR5XB^)O z1V8`;KmY_l00ck)1V8`;KmY_n0%dUoOF!}bcfNVf{L^CM2s*>37vc)`IgX&CcWWVz zpm%Hk!pLjlPY?hB5C8!X009sH0T2KI5GW(?u_N14vrauVd6iiZn2$|(<72B<=SFih zyN24iv5SgA^LqO`$^_^2T9t(7V~((L<$e4XcI-CRi+UgJ1*9Hv@lA>B)uM&6gDS3KWwsWa~8a_(fbQSr6cPpthqE zMhnfxCfreHLs5n=)c!a%p2FX`q=nDX+=gt`(=#Wn=;)edpIT4KUJvuARPRV9;t0Y3 zh65Hy@Vi^zdE77lcJD9zID+KB^$GS58wh{^2!H?xfB*=900@8p2!H?xfWR9_;DThT z`z)nj=z-TU#E-Yq`%h5O9^{M_{)e&^5b+xX1wJOA;|=l_v?twIjLAdcY0 z%kLZ8X`QS2aRkYMKP1>cY#;yvAOHd&00JNY0w4eaAOHd&00PsHKzn<4H~U(J7=XYo z@U-6l+snEx{VwtjPQw$9uLJ@h00JNY0w4eaAOHd&00JNY0@VckTmhYdUEmjcK7Z5b zEqks+-oa{6*aHF}00JNY0w4eaAOHd&00JNY0@ILyU#F1U1(Lt|$jUuY1^Qo3uz%P<00ck)1V8`;KmY_l00ck)1VCV#5y+>KJwtgpnI0NSCT4r8sS-;U zO~v$xEPGVI4<>GOrbiDIy zcfw(nE7*xGizE2u6UXjJ|K&H6#k_+*OAP#Mnw@%l9S{Hk5C8!X009sH0T2KI5C8!X zI7|qf*gn)9XVG9N%Jc#k+`a9@Z~x8fm-%{uw!RmbUV#0=1_B@e0w4eaAOHd&00JNY z0w6Fg2;@_35ihx6>x$`0rtLT$(NxJ4siQh3F&xL%byLp>aV=W*lJCC7_mXe=&X&iM zO?k;v5RhOUyG$?e-Q8Q){@}A~Z}#;9Z3800@8p2!H?xfB*=900@AY&?1=@Rm zUeF7$|FD4o2!H?xfB*=900@8p2!OyLOdy|XFL=qj3$f%1QFPTKhGtWRP}?$CNVcO= zL(*hTS5!A6#I=|rFZm~b{i$bK@sg{eD5?#->prhxYtgcoyqIx)lM&^HXdK+!i0y+GI} zpckmE7r1xFrf)v;;BY}N&@r&Ks25m^90P~k4FDe)1V8`;KmY_l00ck)1V8`;K;Ymb z;CuPw3~(I>Rxj|<@7{IGqg_jG^7R6pnTNPufc=RL1V8`;KmY_l00ck)1P%=X`E+N& zNuDf(k{g;Od#Yn7uBWil-m)dxx+U6{D5;t!Im}5O*P>-7c`@Po&ig;xY$$nK&5Y~J zORmIt$%EseM-+O2z)KFjKtKZZ2m%t&3)I#NTrRxpu77#UkACIn7w8-~)7J}}$ubN; zF>q+y0r4q800ck)1V8`;KmY_l00ck)1P%~^Lco8FA+qxj&mDrV1-b@KObna|#lS&)^T$U60T2KI5C8!X009sH0T2KI5ZM0&3IYEy z#?-Fqq8I4?>kt3t-+!4QzFr`e-chC(U=Oi@00@8p2!H?xfWW~qU}lF;&U6%6#k43vkyk z^a30O>JgkB_=ceu2uMIL5RiagptfG%g>Ss&$z9L?cG%A^km`Ruq8I3Y9aRYqwrLPP z4+ww&2!H?xfB*=900@8p2!KFq1bi=loXI$KSm^~$e#dIE{)C&ezFr`m`dWovfbGBr z0w4eaAOHd&a8MA)r_&K9xo3&8Y7!PUE>l8H#q@N`)MQK3B&-liSs&NdVbr#etpm0ColiqvA$lQyX)pEy#U*V4Fo^{1V8`;4jh4ex;x?|mlR!d zWNLe^V-qSXrsQhWwroqIhAx>b-FjS$mYw9qNb*;ny|LLy@{W3uCx{u8%e(JPV!2}7xV&wZ6EmsxSIp@ z2qGxDB4dRTQ&1}-#I=~BvEu2?cQ!j#)OV7XvPzT-Ziu*0))o*|6qTy16VMBA-*88v zS9A{zB@?qvSu{LHXX^n`AygC%R;a^dJ;hXH$#E4aBgD0sqF%A_;qA@#it>{#tHDVZ zW0VuYaXxB7FTnc*^a9WeG}a6JeC>VT8XY@J@$~|k{%b4r0{z!QG4KX21@Lwt00JNY z0w4eaAOHd&00JOzco8TB{Kq&sGSCaO`v?qMS4>wjZO8G5rb?_TucJC9F&tK;S2y*2 z(hJa!0k#_(2!H?xyb%QQ>7Iz=OVo%Ysk)^bhDQ|3uw2CvZdMR(9SwO4oN0OD{c94?)c)zxsL;Sfe;1q3k3ci zc;8IH z*9-LYUy$g(pgh9>6a#O>QV8z>0w4eaAOHd&00JNY0w4ea(~*Gh;*N9W^gu7r;UiFp zqN^S;G@G)xAKNlaQ*=~nFn^e)E2_Ky^a2-Oojw1$WA7Gxy+CjK#yWZdb^sd)fWUqu zkWcqUoaCzPIHqJcy5Z_X6sf6;mPy&LV5l|=4rEU9xE3ut$%~QX4`1_8vvGj??j$dz zV-J0R&(Sn4+;_%l&sZJ65!p3ueNgVJ#+TSgVR8 zhhBj93g`tm3g`s_6wnKB63`0-UUKLK0us;*)Yc1pKXcEs@40bX%GV3@_78HsK>y&t znFBip*7m;+#lU`B4B~}B00ck)1V8`;KmY_l00cnbP$l5IxZ@mFz0eDE`Uv6@d>&xE zz|BYe=JJ1hO<(Hk1^U`H)YS{HL)buIY6;}ieG$i(s%cas4%5dtBB8El*osNn9$8e0 zYIqiNlE<`Y+3{73B!70#ZOulKchzxxNvhH~k~}ybx&Hdh5+|V@DHbtG7o<%&(v#3KQMW>oeDcdDxgt!(hJIRZY zq0kEi)@jnDzUMtfJ^t}efz!Vq)@Dd;Z z0w4eaAOHd&00JNY0w8d(5GcC18@ng_p%+N`2ozaXR8Q4(*%gWIP|G%T&m@{;+ODKi zI&Jg<=|?ACxa(iKj`Z~c1BubbdV$e>?#lV}K*UMz$hNI2%ug_%h zuG-YmGeTU8mYw9qNb-$OZ)-M^JXJ4}+>jebk{hK+^Uxg%y+B~XMSg*>S0KLt?-kGs za1_uBa1_uBa1_uBa1_uBa1_uBa1_uB1Sp^v;3S|I2)yLb3j`#f7pSclxcH`dCx30= zzkcZJ1qS+d*VGI2?M79CeYS|;i68(1AOHd&00JNY0w4eaAn=9|@Lk+-F4h6)1=0mh zs;Dt3fMQdPWj{BFsVk1HyOt?Zw%1jsyIpe0^7zMVdcuDON8Ys zMug=nmn{>{9$q@MX!u;=tif|nDYOXRQCPfe#bVyFcyv9T%#HaUM_9UiWbll^;fmwy zJTL3!gvCqGsC}F|j_ZuFrPvbokw=ye4OTqgaj7%z=5p?2v{C83C)e3-mT_1$T4**l z;f^{RCJS`xe4O>wPvP%ee9>%<<~C%jo}M{rMMu{x`_yD&Y{GFb$zC|lmXV`$LvCH+ z@n}8Fqf)&io%xfJsaf;qCqLd1QQoA)ZiDTg5Ml9(70U(}E$7cG9<^7y_L}U5wY9pDdx>jr@ITdPZfva^ zeEi_UWllP+t!vgf^XtK{gfS{r?(x_5>C)&@Sknh~fv^1dqbt+re(AwZb}%{cn%^Wf z@LHX`gNP%Lr1`QqB1;Qpb)l>+AgU-Kj)0>Gitm+&To5rpr6TVjM^P1Bf;fUe@P|5u zVRQ-N2zakR905mxI0B9WaReL%;s`hj#1RB25J$jCAdVnV&m)c?Ab~gnmo53b2G}p( z1i!(>qi=g8j-YRQzYt^|L?nUF3>yf500@8p2!H?xfB*=9001QJRfuX!nrum!1w6qtQQ|T>tf^`%uQapz*i()yl!%hJGo%e^>+;WzmH9x z*_lG#!2rb!)F}*5r1l*J^a6o&LRO23J3}Wq@(u<*jx_WF90l|O90l|O90l|O90l|O z90l|O0Sf2^I0@(l0xvoA0s#r=1&Sn?Nx;t)kP7SqM||YMe>(AoHGlB!0?Gcp3HA>g z2!H?xfB*=900@8p2!H?xfB*XP5FUPyOkl{|K0%i;*${_K_`zx9i4m-}%9NA!KVVH`oi|UjYO_00ck) z1V8`;KmY_l;Ef^RySNR76(Q7F0z6Gri7R@R;TWPWYPv;;OGI%d^a4Ev1cpo%Q*%5; zqQul?Q!@-n^$ba|JkvEb=mp&NLVvQEj>MHr+i^UisgfyDM|DhMIF7CB=Dz6#&Y%3+ zDHs3r@kw7V(BAu0K`+4m!v+E%00JNY0w4eaAOHd&00M_9fqZ&q#7Q2vGS3KcEn0Sx z7bD4ce*BSUBgyMK$xBY~=AGow3k0bskzXK4_RTKrGQGg(zHs-3$;B`Jxvv*EvTrfh z3-m4SA1qWVgks=O9SiZfK>!3m00ck)1V8`;KmY{x8v);S%YwH(MWwFm$SRR^iK@2f z+SId&=4q;BA6fA4LN6d{M3Pk9(hb8Sie*@?Y#5Gb>#8JqtnecA0#ZkzKM_S&Jz{7! zRR~L)V3?-psMKJHG+j~mqh8?n1)q5Ld(Lg!o^d97#?}k>-Ir6nX(M zL;<}3^a6492o`=K_4~|wKKc<~FK|@foJM+qIeokPU+=#bIR*}?n*%;92!H?xfB*=9 z00@8p2!Oz!M! z5~!(*mPwhX+hE!W*_C_!M$ii+O<6QNM`uVxCIt{hL!_Q5vw{kWEIF=np!EWurl;L? z%B5e}?&}3QGY@dR0Q(ag2!H?xfB*=900@8p2ps$b^68@@PV%_bTt6Q!kR-R2oN;o20TPIrIYD9SXewM*+P6M*+P6 zM*+P+2?e{cmFWd${Pr6g{&IkR$kz)T)B9*aFVOqwz=;Fb)u>3oGYmj6aIj7g@VP($ z1V8`;KmY_l00cnbFeTu7Wo4BTYABjZT~^zjP|I{I7W^)04$&OlQjhT)#Tm(<7ZCe> zPO7F+jW{e3z9SL_)UXwkvOTh>64mga7ogKeFOWX{8}FTCSdaO7fmHgAGQ9wMhz$fl z00ck)1V8`;4lV-u)G-Awd3V7}9=8U_2yrc@$V-0DQ=j>6YhH3iRvUWB`Ej0P9C`un z1B70Hqkvw3qkvw3qkvw3qkvwZgo54J%k%Io>L*qf@(axB{d!Y+f!?q8eHw~^ zgKLU}PXhuV00JNY0w4eaAOHf>mw@lRb}Y-3bWJo|Mu_D zp7jAgzreA*S2xrP^j_Up$}j-Mz(FuU#76)D5C8!X009sH0T2LzL!W@}l{HOGvvs0T znHt2Bb?SLU)pbj;Y(sTra%{m54!wY?5Y?76*`l6HEYUM%M|Wf*S+-_6s;xmUK)d~} z!UPYpY7*wsmsz?AQ!zc=G8r~alUUc1^mL&=siMZD0E$gDmPElIrmi@)?pmfu+1|rV zFYwqCcc}BeutW9r0^MCVRp|xTE^HtG0w4eaAaI}vXL z%{s~3%bnn|8aTm?#tGLIqblJ#^a6pu2l)j!3DhGfDJP&8fL?&DjVr5EKravs6=~=N zI11r&Bv%(TPc@J88^u|5p%*ZZ@Hs&*kYq^^ zG)Jbk=Q=EVf~=U5t5MssEsYwwWI``s>TM^s4|TIkOC^>r`eF$p%O2G|$rELl8kQ<~ zipEBel>b9tOq|#G((~J{z3p20kN>^&MPDzF>Ab#%UV!by1_B@e0wC~)63C~Ii#W-h zm>Cs|B#&t^MNaakcl}+nPV$aAPI4m24fCxNspLctU0=`(a3=@!0s#u>1vm-l1)vwG za*`vzKyZDep%>sNpcmjMpcg2iU=!Oiy}lc0)1n zhAt2AW*`6pAOHd&00JNY0w8d(5%9g&wx$rxbu5WWA~9T-x|-uUvMM@~Cu_QXJoExb z`Unh#I*!f)-8E4quIO2YV~9*Gp<9HwL=nD7@!0g`RYv=`fkMCR6_u9aj z13L!R`V|SF7})=dBfJ9$fB*=900@8p2!H?x94G?5S5}vaLaFMws>YH75YZx*p)>O~ z^<>Y}37zdXinGu|FJMD2Aou!RMbwBSsk)^bhDQ|3uw2MQAaPp4qyWT z5ZF%y^6A+TCwbgmI3vWhm?9_nZ8zQ0tdqQ}+zHOpt+Sdh4I{}lt+FOL^a7z}A9?|9 z*@s?$qkvu@Kmok~Cjq?x^a9WeKrb*~7Dr@hA@l;0)C3CX1xhy%wy-JF3;gsqi_YJk zc*jqDy}+A#_U@Bjpl5IYPxjM-2`>u*AOHd&00JNY0w4eaATU(~d~c(tTB=P9QMDY2 z$Sf||Qyhabf37DRrf92g@*BnZA)psfj`BINWbdA$QrAzbKqOtFsx7)U^=zVfnrcBW z;6g9pNS%IHF=f&49G#T}5EVj2(GaO;%B-M*B1?{|NF9a#L=;{1h@sh(HL@+kG(|_H z29s21y0Sm@0>_*w7=U76Di_RnEf4?!5C8!X009sH0TB383HaVdSyydC)`><;m&nwlo~X($b6qnh zq|FopbNohe-WKQuyrX?i7NJBph-bLELu^$sbkDPGjp&}`Sc(C?fM@mjoK)FyOv!L` z28tC|pr$TbCS{&(gJ~yZ=mn%hLoYD?{$b~5|9kyceZ4?`a&-f}06U1SWdiy1oQRV= zZsg7gaV@6EN&fItw>9e|PuFph>vH3o&~Bdu54i5*CjvvM6PRh?+zlMR8bc zvS|^=5DhxlZxrWKf?hy9#^>bv*%e&NuwBU}9?OAm%7#h|*`cy3vy1`I3yA#%DS)Q2 z#1jq+Y$hlpO_ zqDQX0_0=!h|KaNe1`_8t)(f1!kHc9$JvZVcj~ml6LR^a}a*}U(;VaEL$-B!po)^wIKvq~9;`sv%D#*Jr09!@^EzL8e%tQ7AN=W(d%nG+Xct(TNG$Dtx$lME zpZEMI^Y7i?Nq;l7t!qo?-*$Ye{mQmna;yovKz>fLU;{`GvGkntJk?Z*rHjmx=@D7> zsP0J~3p1gHrAnTn`Pt+ew`kc0@Z{E4@9dhfbMu$Cw=bSIb57Qs$W30lAV;%nE?zge z#+_U+>H0f{{oltX&+MF0(*!_RCdft=*Vu6-NDvjifWRt-FdKp*ojtkEcC*>BiBSf!L zjK(&Ss=k;{_eW3|tYn>SiaIepi+Gx6QHM&3PBoWOwoA+iaV=U#QS2&D@BYEACc8?# z>U@$Kb6KiEUkT=Q>AnaGHO4TT5#n03jH1{pp51d>v%O;9QIt>xBUC=!8$}TluEtQr zv}hScu~$5N%|p%hiq3EV@Jj}9BcdcX8bR1#E?3oT?1`X=EBc!e;##zfqSz}oJ+h_Q zUa{{esxFXB1VwCVVy?G^3iXPu7~qE|t|JJhmG-7|?MnXIEQ7hgt*Ytb@_(!65xoz3=&w(_$c_6ntR z*2|BFy&^asqUemEh;wB6Q>@0W&X!RWd&SQCKih1tsE;D%0#Pc5%~VGLMK@cRBxc8@ zgU<+YEm}rV>=hdy-rj7lC_m|8rx0UE1jl)&NVP{uC^0$j+4T|AqGb}rPVvcKf2!F| zQI|x_NFi3T468>T}G25_p#dO)Y;5Z)9REZ6i4lB?>42O-Ky2yW3w$^?a>e%VeRFluE}$fm=OmPZW#E~f-I)iw z?@8a0`fArro!56<-F`(yO@jQK5UMgS4%xJX8d9%i0f2?1F&|h+_(%;4Nr(a#5^gMl zg@mJkg@mJkg@mJkg@mJkg@mJkg@mJkg@mJkg(N@$3kfFy3rSIC#q0t}-|HXf1tW`vzV~bo6db^j|Zuqjzin!a8lS8w5ZA1V8`;KmY_l00ck)1QH)RvOP8H)KimJ znK^2WyJHjH_}Hq|xlv#HOYPj)MKm|IZi0z&YwzzkeRy!u$e=K?Xz{W^p zV~((L<@)aY(@|DY$31<&49a=Peu5i}ixu+CbgzqRUUbbQ}Z&^ILo=)b*{Es6n zT|P2+#^7+p@pYb;b#ubvrDxPWP94W}M%hwq3H!(+ONRz4p6|HS8FzCzcQV?j^xj;Y zJX&ZrHsOvs8zu{M>U^B_)lcE?Tzt`Nj^;LGtDc@YX+=lZEc?`CVr;^3FUejw&X$p* zbVF`k;qhoa%%f7hBc1t^lBrqq=O;hjSvbw3>@_bLT|2g__yw|+kK4*lYvs|>>Ac9D z%=#yz$h-PvI&RjU99v&FVIza@XXm_h-iu#;Y=V9L@o{!W*lYM_Uy8`jq9M(!o9y}|!f zqq(uQZt(Gg50^RVw6?BU=ghAMzY@l%RJo^}4J*OgzObeb>;f--YSVq++9*8WCnIbd zc-bF)(gUvzyqtNc``+}9)YrOh?)+TGHSHVQHY6`djQWT7X@hz`l6xNN`M_KU^?c|i zhi{no3iyUO3iyUO3iyUO3iyUO3iyUO3iyUO3iyTt6z~mm67UU|QgA>$&-+9e^q!a< z7t%icg5KF$K6MQH)z#zw@>lkOUEtX#Csyr#$1i@<$AScU62&-z&MR4lfwDM)ViW=5 z2xM&mOE4J*XEfHyBaWaUpZ=HIdx!FJGFb@jiOZvaObf`gP@MvIx@20&&j}|j%DlLM zzG)fq*Kn-Jr(r7!<}zv6iZ}|`iZ}|`iZ}|`iZ}|`iZ}|`iUJg{6_rRZ@qET#f)sO3 z9G25AP9l+t+69ttH8K}|cF6Ga3-okcod~iC^zZF^uJ@^)$1)Fe?@Hg{=N3q=X5|X@ zZ9_eRh*|*k2!i1W^$0?>0O}F&UV*%v90im`90jsYa1>A$1t?IDfRjKy0@NcwJp$At zKs^G~BiOfk1b#e0I?xNW-|13&=r6~8yFfa1TeME$-RXa3JFtNO2!H?xfB*;_Z~`!~ z!^AG?a;fr9XjgtyB<;MXTB=R_qU?@DWR_9eQyhcU8J9fSFh!dcBZzC!a$fCwZ}p2D zZGQ09W(z^rD{`cnjVt?TW>w)5nAihjQ5q$GISQ2gnNS zMS`kz@^YklDdGsK`IJ?6MH~U2%pi_{qd*)1M}as3jskH690lSCI10oOlu)otaz};g z`NMKD#z`QKfY&1^vkN@F{i7e6xAfXaD(wR8?~K+dM1BEQhrCAd$HsG6wYM*+J)uo{M4ARtk$7r5e*|8w-w zFWm4%wOye9smz1j_oVOeGY-Hm0K33KSuI3wF};LXnyQ9N!I5%4!BjSE5UhnFbN!y( zunV}J!V+1^mSpReXj`JBYM$gsE$cXU-{R{yci#WmXOfHO&76~UCvuaQF7ONbT)b{_ zjXSwu()D)?`@fG(p4pjd3s04QE=%WH6-7lmSrqGOXClWt!xsq7b=kTXc7dvOZy6uh z1*-T&#tP&ejL7F<7vRet*abKW*abKW*ai3su)RB63@fs%sGh3nvMUnZp_Xmxo=G&x zv|UN3l%=zcY0>h$}=jYuN?*?uK2UKh zE&#g#p`vJr)HB&wZ7QMPnc^Ik8j>b!x}v(MM?fS=)h*pH zJfc{J<;sTPc($%elE)xr1d&BkH)_%H1h;5-+w{nmW({u? z^q39OEZHc7xqdzky8vGd!!E!V!)-5xi(y9|hk4$*0>7s$^IO<~HsxDGu%yL@=f z)bdNgE)bc3)Bsdz7r5Z2GAJhWr9kq!;ME2X+D21z;D* z`UxhGU!eNF9<{NePGPthhF*X#hG7>77sF44i($_aWz{6i4J=baO~v$d%hY5`(C^G4UJyPytuAAJv(>x z?J+7hgk7NVvKM**u`!@ZyTBbs9C`Gs6FWxg*#&w&j5vbw(jTx3Op#q6eFy9UunWL0 z0K0%6r&8m#1-k(93xx4$HS_{Z5c_Z8Vpvji&5^0?xsFXJQy)pL#>6(ZrBOqdOeT|# zYtiyzxTqp|<=GpX^$d4})BQ3PiD<@nhJ)jQwTPjBT_9KtA7Xle{M^vIrOb=#q|>t- zhu1;i%v2&-d0`i*aeIVafZZNNE|ID3L1lmtN6-*ZrCs3iXTJT-3-8(T!TNTA%m;l~jk#{ikTR<-` zMS6j*n_w4!T>y3g*ahP40?ZwHW4IWO%N{VT^9wK(unTa{@PXG0%b@ zzu@FP=DqD{U!7=Q-E~vv^&MCD@9lf8_o<%8G7og`O5f3bMO!wxI&nUG=DuwC^xU3s zR!3x$usmZTE1anikwv^YisG=GW2VLO!iWab`^B|r*~M}1t$te6EiZiKyKPOSMa^`F zP|Ngwx+*u!1jCO9-kyAVP6UOltF|HQL?fn4WNK1RRArZF9?Lss8zM`J6W3yjP(1w9 zZOx)c*Fz!4pa_nKDBcu7;i;Bt6ZV}gMl^w^F3`aLyoroeebjxUizwR4E(Pw`EQeC6?&&8`D@kJ`FHlP}m>%GO60;rIwpB&7O-W?hV0KNoitMo( z4h)cLQ-=l0#68-9zja#&gqLfW~_Ya!wD_!M%fhW+C)Vddlw!jz9t3gE*9!cI8L7{3I z8yOsyg3%EPbv?sYOv?7iqDoZ5v)E-5)1qY*#a{93p4*!375k2&qE{pm$ta4$as&x_ zMe3LWiteGIWMa0gvRHaUVb_D}u{$cYOvhqltE4$Zb9BqUnZ~qexmOhD756;#nP%@G zb*>L37%R(<^Rbj4=R-y6=mLp!frRVdMqSHb`2}p^F+GVX8!9nmhsvsKdR|6|YcWM6 z_B`^y_9kao<=>Q#6mtDrXMY5#6&K zOEHv;5Z9t*62(FD{vSTt?4Vh1t|=3#-mIdCA~+tRI5L8QC7JLPmAbAYt3=Wzb~hDW zn|d~zZfdG!vt+h0Em}rV95i=6|BYq`&H5PF_KH0>Kiq7us6W??nP64UHQBPh%r3C?KR>u&=eEE3av$^F z7VQG5>tPqDEaL{d0Iz75sdrWC!MeZv4$tQ@{CId(!Y*)F&g#Ps0=qyY-8$?7e5|Ol z3$WYP-q5lfx9XoJc7god&=|sOh%u&)Y4W8@cA>x+(s-7KANb;}?gcB^- z1%m4%4Z8qG0lNT40lPp6MWtO}sq?+I?W**?zRv5pU7#xqy+C<+ci07B7jP%5tVQ*# zb9>jl_Y{ZzOZj#C31a(aRM+)xgo%$s&!&kv3WSGAD@HBmW4*dQ>G zk!ZRt*afNqMHT9>3j{M<=mi22unWL0z@-%-3fKks1p>POM*+J)2}Pw{;J^Rv_5c3c zg@66qzK*LSc7e`e#1Twhx*%K(*Aq1HIDv5fQZ@tR$HM^%aRmITgkAu8frU&mszm1c zY=C+oTnxuW2^_?61o^pPKm<#-byx;Ogrj23WiRXkF@Rtfs0I`nEAp@l1ou+d1p*SV z3xsn`=mmJMfL(y2fL(y2fL)-3qS7vK!Q;0+(*A*SzFx2kw4K573#6{^%61O7oq_xU zunVlI;UsSuN06TzUTIAFdRRv6R8M8ge1OO=Pz@+D*YDp8yFkrU3M~h(9e_j$)n~fw zQh;3`1`zB5)qo;n1?&QR2?)CYpX(#P07n7407n7407n74KnX=fs-DAgwZc)9=NI_? z^KU!vn!B#LtJ*Hm{}l2L`q2c(CeQ3d96>0ZfL%Z)wPf-9KX=JWZsTau2|J}OvPs~|g%P!D&H|i0< zE-+b;kY0j7_XU-a#>@b_wDL0w*B!0s#rc5pZKs z8hU}y9U4v56H{&AkgG?KpBu^{SgM}s_VpU1>Iuyr4VS&J3)H%mHn0o4^Z(s`&QHGR zwAZ!^^nMX_3ZWNZqKQaI1?&Q_3+zX`K+j{a3&1V_y8!G0{%jC-ftoiP)G6dna_9v( zipn~L<*9n&st_K;bqe!yLj#90?-2eX(tNofY~aomnKWB)v3l97Ml;mHE>I09vh0Oj zpw_K40;p0i@cVnb=PpPN{dqmRK+lJ}cct%WzoIRhT%9QCV75tm=Zym7x#e#yR&91q4-DIs?%jNg{(A#S&4R3kO0ao{xSGKByT_7Y} ZbJ+{KKnx(*1*!qTE Date: Wed, 20 Nov 2024 18:09:25 -0800 Subject: [PATCH 6/7] parameterize input and redirect output to file --- src/sentry/runner/commands/run.py | 163 +++++++++++++++++++----------- taskbroker-inflight.sqlite | Bin 20480 -> 0 bytes taskbroker-inflight.sqlite-wal | 0 3 files changed, 105 insertions(+), 58 deletions(-) delete mode 100644 taskbroker-inflight.sqlite delete mode 100644 taskbroker-inflight.sqlite-wal diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index c9db7258065b73..b770d6a249fd59 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -4,7 +4,7 @@ import os import signal from multiprocessing import cpu_count -from typing import Any +from threading import Thread import click @@ -254,35 +254,84 @@ def taskworker(rpc_host: str, max_task_count: int, **options: Any) -> None: @run.command() -@click.option("--rust-binary", help="Path to taskbroker brinary") +@click.option("--consumer-path", type=str, help="Path to taskbroker brinary") +@click.option( + "--num-consumers", + type=int, + help="Number of consumers to run in the consumer group", + default=8, + show_default=True, +) +@click.option( + "--num-messages", + type=int, + help="Number of messages to send to the kafka topic", + default=80_000, + show_default=True, +) +@click.option( + "--num-restarts", + type=int, + help="Number of restarts for each consumers", + default=24, + show_default=True, +) +@click.option( + "--min-restart-duration", + type=int, + help="Minimum number of seconds between each restarts per consumer", + default=1, + show_default=True, +) +@click.option( + "--max-restart-duration", + type=int, + help="Maximum number of seconds between each restarts per consumer", + default=30, + show_default=True, +) @log_options() @configuration -def taskbroker_integration_test(rust_binary: str) -> None: - import datetime +def taskbroker_integration_test( + consumer_path: str, + num_consumers: int, + num_messages: int, + num_restarts: int, + min_restart_duration: int, + max_restart_duration: int, +) -> None: import random import subprocess import threading import time - import uuid from pathlib import Path import yaml - from sentry.utils import json - def manage_consumer( - rust_binary: str, config_file: str, iterations: int, min_sleep: int, max_sleep: int + consumer_index: int, + consumer_path: str, + config_file: str, + iterations: int, + min_sleep: int, + max_sleep: int, + log_file_path: str, ) -> None: - for _ in range(iterations): - config_file_path = f"../taskbroker/tests/{config_file}" - process = subprocess.Popen([rust_binary, "-c", config_file_path]) - time.sleep(random.randint(min_sleep, max_sleep)) - process.send_signal(signal.SIGINT) - try: - return_code = process.wait(timeout=10) - assert return_code == 0 - except Exception: - process.kill() + with open(log_file_path, "a") as log_file: + print(f"Starting consumer {consumer_index}, writing log file to {log_file_path}") + for i in range(iterations): + config_file_path = f"../taskbroker/tests/{config_file}" + process = subprocess.Popen([consumer_path, "-c", config_file_path], stderr=log_file) + time.sleep(random.randint(min_sleep, max_sleep)) + print( + f"Sending SIGINT to consumer {consumer_index}, {iterations - i - 1} SIGINTs remaining" + ) + process.send_signal(signal.SIGINT) + try: + return_code = process.wait(timeout=10) + assert return_code == 0 + except Exception: + process.kill() # First check if taskdemo topic exists print("Checking if taskdemo topic already exists") @@ -340,34 +389,14 @@ def manage_consumer( # Create config files for consumers print("Creating config files for consumers in taskbroker/tests") consumer_configs = { - "config_0.yml": { - "db_path": "db_0.sqlite", - "kafka_topic": "task-worker", - "kafka_consumer_group": "task-worker-integration-test", - "kafka_auto_offset_reset": "earliest", - "grpc_port": 50051, - }, - "config_1.yml": { - "db_path": "db_1.sqlite", - "kafka_topic": "task-worker", - "kafka_consumer_group": "task-worker-integration-test", - "kafka_auto_offset_reset": "earliest", - "grpc_port": 50052, - }, - "config_2.yml": { - "db_path": "db_2.sqlite", + f"config_{i}.yml": { + "db_path": f"db_{i}.sqlite", "kafka_topic": "task-worker", "kafka_consumer_group": "task-worker-integration-test", "kafka_auto_offset_reset": "earliest", - "grpc_port": 50053, - }, - "config_3.yml": { - "db_path": "db_3.sqlite", - "kafka_topic": "task-worker", - "kafka_consumer_group": "task-worker-integration-test", - "kafka_auto_offset_reset": "earliest", - "grpc_port": 50054, - }, + "grpc_port": 50051 + i, + } + for i in range(num_consumers) } test_dir = Path("../taskbroker/tests") @@ -381,26 +410,23 @@ def manage_consumer( # Produce a test message to the taskdemo topic from sentry.taskdemo import say_hello - for i in range(10): - print(f"Sending messages {i}") + for i in range(num_messages): + print(f"Sending message: {i}", end="\r") say_hello.delay("hello world") + print(f"\nDone: sent {num_messages} messages") - consumer_params = [ - {"config": "config_0.yml", "iterations": 1, "min_sleep": 5, "max_sleep": 6}, - {"config": "config_1.yml", "iterations": 1, "min_sleep": 14, "max_sleep": 15}, - {"config": "config_2.yml", "iterations": 1, "min_sleep": 14, "max_sleep": 15}, - {"config": "config_3.yml", "iterations": 1, "min_sleep": 14, "max_sleep": 15}, - ] - threads = [] - for consumer_param in consumer_params: + threads: list[Thread] = [] + for i in range(num_consumers): thread = threading.Thread( target=manage_consumer, args=( - rust_binary, - consumer_param["config"], - consumer_param["iterations"], - consumer_param["min_sleep"], - consumer_param["max_sleep"], + i, + consumer_path, + f"config_{i}.yml", + num_restarts, + min_restart_duration, + max_restart_duration, + f"consumer_{i}.log", ), ) thread.start() @@ -408,9 +434,30 @@ def manage_consumer( for t in threads: t.join() + except Exception: raise + query_prelude = "".join([f"attach 'db_{i}.sqlite' as db{i};\n" for i in range(num_consumers)]) + + from_stmt = "\nUNION ALL\n".join( + [f" SELECT * FROM db{i}.inflight_taskactivations" for i in range(num_consumers)] + ) + query = f""" +{query_prelude} +SELECT + partition, + (max(offset) - min(offset)) + 1 AS offset_diff, + count(*) AS occ, + (max(offset) - min(offset)) + 1 - count(offset) AS delta +FROM ( +{from_stmt} +) +GROUP BY partition +ORDER BY partition; + """ + print(f"DONE!!!\nUse the following query to validate sqlite integrity:\n{query}") + @run.command() @click.option( diff --git a/taskbroker-inflight.sqlite b/taskbroker-inflight.sqlite deleted file mode 100644 index 9bdfdfc9fcb3ef259ab413f0ccb91b0269e59b3d..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 20480 zcmeI&&ui0Q7zgmA?KWGsb>g9Tdr#uP>e`9W!JO@u&9bbWZ3^xnAxqwNFH3)zyd6UX zMW+{UBA&$41dkpB^*RUs1M10(7ykhdDtPcse>t<|>S26e5}G&f^Cs{6c|ssb%hy!K zDXBJ{y1~h`G$DmT(iK7^NeYQ|NUZ(h)g%3i_>~WA2W^I=^S4@K@t;y;bXOXG6n`_i zJ3bQ>jQtRR00bZa0SG_<0uX=z1n!>-k0fSiWcPTU&<1nKvAgFKEh=Qu4Jot5uhBlFKh0-ELSGwRG_)wjhSAp{$VDyU`X-lRN|NG9c-p-!50am;PK&T8EWxc++RpS1d0d+FSwj_W0( z>wP$x7IjUBwK@sYl(SAba>b&mWDA3S-BxphnwxIBe$-_|x2f6oMx}FBr~Tpg2N#>13k@gA$sl|` zhMw~835&bR3GdD_CRREASP}~g1Rwwb2tWV=5P$##AOHafKmY>&O~95%qRZ1!J8U}C z5TEAz?@Ng9bys>SeKDCzCDT(RotjByX403w&3}4+=Jc-@h4b{y)YE#?&AH W0SG_<0uX=z1Rwwb2tWV=Lf{WE5lSrp diff --git a/taskbroker-inflight.sqlite-wal b/taskbroker-inflight.sqlite-wal deleted file mode 100644 index e69de29bb2d1d6..00000000000000 From 1f71bd03b65d081df622b80db20791c4a81969eb Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Fri, 22 Nov 2024 17:17:16 -0500 Subject: [PATCH 7/7] use exist sqlite to validate --- src/sentry/runner/commands/run.py | 52 +++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index b770d6a249fd59..4d9e7891d3323e 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -301,6 +301,7 @@ def taskbroker_integration_test( max_restart_duration: int, ) -> None: import random + import sqlite3 import subprocess import threading import time @@ -333,6 +334,10 @@ def manage_consumer( except Exception: process.kill() + if num_consumers < 1: + print("Number of consumers must be greater than 0") + return + # First check if taskdemo topic exists print("Checking if taskdemo topic already exists") check_topic_cmd = [ @@ -346,7 +351,6 @@ def manage_consumer( ] result = subprocess.run(check_topic_cmd, check=True, capture_output=True, text=True) topics = result.stdout.strip().split("\n") - # Create taskdemo Kafka topic with 32 partitions if "task-worker" not in topics: print("task-worker topic does not exist, creating it with 32 partitions") @@ -390,7 +394,7 @@ def manage_consumer( print("Creating config files for consumers in taskbroker/tests") consumer_configs = { f"config_{i}.yml": { - "db_path": f"db_{i}.sqlite", + "db_path": f"db_{i}_{int(time.time())}.sqlite", "kafka_topic": "task-worker", "kafka_consumer_group": "task-worker-integration-test", "kafka_auto_offset_reset": "earliest", @@ -438,25 +442,39 @@ def manage_consumer( except Exception: raise - query_prelude = "".join([f"attach 'db_{i}.sqlite' as db{i};\n" for i in range(num_consumers)]) - + attach_db_stmt = "".join( + [ + f"attach '{config['db_path']}' as {config['db_path'].replace('.sqlite', '')};\n" + for config in consumer_configs.values() + ] + ) from_stmt = "\nUNION ALL\n".join( - [f" SELECT * FROM db{i}.inflight_taskactivations" for i in range(num_consumers)] + [ + f" SELECT * FROM {config['db_path'].replace('.sqlite', '')}.inflight_taskactivations" + for config in consumer_configs.values() + ] ) query = f""" -{query_prelude} -SELECT - partition, - (max(offset) - min(offset)) + 1 AS offset_diff, - count(*) AS occ, - (max(offset) - min(offset)) + 1 - count(offset) AS delta -FROM ( -{from_stmt} -) -GROUP BY partition -ORDER BY partition; + SELECT + partition, + (max(offset) - min(offset)) + 1 AS offset_diff, + count(*) AS occ, + (max(offset) - min(offset)) + 1 - count(offset) AS delta + FROM ( + {from_stmt} + ) + GROUP BY partition + ORDER BY partition; """ - print(f"DONE!!!\nUse the following query to validate sqlite integrity:\n{query}") + + con = sqlite3.connect(consumer_configs["config_0.yml"]["db_path"]) + cur = con.cursor() + cur.executescript(attach_db_stmt) + res = cur.execute(query) + assert all( + [res[3] == 0 for res in res.fetchall()] + ) # Assert that each value in the delta (fourth) column is 0 + print("Taskbroker integration test completed successfully.") @run.command()