From 5838e3d6909e7e42e89ef0363ddb14e6703db233 Mon Sep 17 00:00:00 2001
From: Enoch Tang
Date: Tue, 26 Nov 2024 16:54:37 -0500
Subject: [PATCH 01/35] create template for sentry ci and end to end pytest

---
 .github/workflows/ci.yml                      |  42 +++-
 python/__init__.py                            |   0
 python/__pycache__/__init__.cpython-311.pyc   | Bin 0 -> 160 bytes
 ...r_rebalancing.cpython-311-pytest-8.3.3.pyc | Bin 0 -> 9509 bytes
 .../test_consumer_rebalancing.py              | 179 ++++++++++++++++++
 python/pyproject.toml                         |   9 +
 python/requirements-dev.txt                   |   1 +
 7 files changed, 230 insertions(+), 1 deletion(-)
 create mode 100644 python/__init__.py
 create mode 100644 python/__pycache__/__init__.cpython-311.pyc
 create mode 100644 python/integration_tests/__pycache__/test_consumer_rebalancing.cpython-311-pytest-8.3.3.pyc
 create mode 100644 python/integration_tests/test_consumer_rebalancing.py
 create mode 100644 python/pyproject.toml
 create mode 100644 python/requirements-dev.txt

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 0eee45d7..2c2b942d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -32,7 +32,7 @@ jobs:
         run: cargo clippy --workspace --all-features --tests -- -D clippy::all
 
   test:
-    name: Test (ubuntu)
+    name: Tests (ubuntu)
     runs-on: ubuntu-latest
 
     steps:
@@ -56,3 +56,43 @@ jobs:
         with:
           command: test
           args: --all
+
+  sentry:
+    name: Sentry End-To-End Tests
+    runs-on: ubuntu-latest
+    timeout-minutes: 60
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v2
+
+      - name: Registry login
+        run: |
+          echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin
+
+      - name: Checkout sentry
+        uses: actions/checkout@v2
+        with:
+          repository: getsentry/sentry
+          path: sentry
+
+      - name: Setup steps
+        id: setup
+        run: |
+          pip install --upgrade pip wheel
+          # We cannot execute actions that are not placed under .github of the main repo
+          mkdir -p .github/actions
+          cp -r sentry/.github/actions/* .github/actions
+
+      - name: Setup Sentry
+        id: setup-sentry
+        uses: ./.github/actions/setup-sentry
+        with:
+          workdir: sentry
+          kafka: true
+
+      - name: Run taskbroker end-to-end tests
+        working-directory: sentry
+        run: |
+          echo "TODO: Send tasks to sentry-kafka"
+          echo "TODO: Run pytest"
diff --git a/python/__init__.py b/python/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/python/__pycache__/__init__.cpython-311.pyc b/python/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..f81692639a199d0ba2e86e67e5a0e670bc662b40
GIT binary patch
[160-byte compiled .pyc payload omitted]

diff --git a/python/integration_tests/__pycache__/test_consumer_rebalancing.cpython-311-pytest-8.3.3.pyc b/python/integration_tests/__pycache__/test_consumer_rebalancing.cpython-311-pytest-8.3.3.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..a331555ddb9de5f761c8450b52168bb1057a1c74
GIT binary patch
[9509-byte compiled .pyc payload omitted]
diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py
new file mode 100644
index 00000000..ac01a390
--- /dev/null
+++ b/python/integration_tests/test_consumer_rebalancing.py
@@ -0,0 +1,179 @@
+import random
+import sqlite3
+import subprocess
+import threading
+import time
+import signal
+from pathlib import Path
+
+import yaml
+from threading import Thread
+
+
+def manage_consumer(
+    consumer_index: int,
+    consumer_path: str,
+    config_file: str,
+    iterations: int,
+    min_sleep: int,
+    max_sleep: int,
+    log_file_path: str,
+) -> None:
+    with open(log_file_path, "a") as log_file:
+        print(f"Starting consumer {consumer_index}, writing log file to {log_file_path}")
+        for i in range(iterations):
+            config_file_path = f"integration_tests/tmp/{config_file}"
+            process = subprocess.Popen([consumer_path, "-c", config_file_path], stderr=log_file)
+            time.sleep(random.randint(min_sleep, max_sleep))
+            print(
+                f"Sending SIGINT to consumer {consumer_index}, {iterations - i - 1} SIGINTs remaining"
+            )
+            process.send_signal(signal.SIGINT)
+            try:
+                return_code = process.wait(timeout=10)
+                assert return_code == 0
+            except Exception:
+                process.kill()
+
+
+def test_rebalancing_only_processed_once():
+    # Test configuration
+    consumer_path = "../target/debug/taskbroker"
+    num_consumers = 8
+    num_messages = 80_000
+    num_restarts = 1
+    min_restart_duration = 5
+    max_restart_duration = 20
+
+    # First check if taskdemo topic exists
+    print("Checking if taskdemo topic already exists")
+    check_topic_cmd = [
+        "docker",
+        "exec",
+        "sentry_kafka",
+        "kafka-topics",
+        "--bootstrap-server",
+        "localhost:9092",
+        "--list",
+    ]
+    result = subprocess.run(check_topic_cmd, check=True, capture_output=True, text=True)
+    topics = result.stdout.strip().split("\n")
+
+    # Create taskdemo Kafka topic with 32 partitions
+    if "task-worker" not in topics:
+        print("task-worker topic does not exist, creating it with 32 partitions")
+        create_topic_cmd = [
+            "docker",
+            "exec",
+            "sentry_kafka",
+            "kafka-topics",
+            "--bootstrap-server",
+            "localhost:9092",
+            "--create",
+            "--topic",
+            "task-worker",
+            "--partitions",
+            "32",
+            "--replication-factor",
+            "1",
+        ]
+        subprocess.run(create_topic_cmd, check=True)
+    else:
+        print("Taskdemo topic already exists, making sure it has 32 partitions")
+        try:
+            create_topic_cmd = [
+                "docker",
+                "exec",
+                "sentry_kafka",
+                "kafka-topics",
+                "--bootstrap-server",
+                "localhost:9092",
+                "--alter",
+                "--topic",
+                "task-worker",
+                "--partitions",
+                "32",
+            ]
+            subprocess.run(create_topic_cmd, check=True)
+        except Exception:
+            pass
+
+    # Create config files for consumers
+    print("Creating config files for consumers in taskbroker/tests")
+    consumer_configs = {}
+    for i in range(num_consumers):
+        curr_time = int(time.time())
+        consumer_configs[f"config_{i}.yml"] = {
+            "db_name": f"db_{i}_{curr_time}",
+            "db_path": f"integration_tests/tmp/db_{i}_{curr_time}.sqlite",
+            "kafka_topic": "task-worker",
+            "kafka_consumer_group": "task-worker-integration-test",
+            "kafka_auto_offset_reset": "earliest",
+            "grpc_port": 50051 + i,
+        }
+
+    config_dir = Path("integration_tests/tmp")
+    config_dir.mkdir(parents=True, exist_ok=True)
+
+    for filename, config in consumer_configs.items():
+        with open(config_dir / filename, "w") as f:
+            yaml.safe_dump(config, f)
+
+    try:
+        # TODO: Use sentry run CLI to produce messages to topic
+        threads: list[Thread] = []
+        for i in range(num_consumers):
+            thread = threading.Thread(
+                target=manage_consumer,
+                args=(
+                    i,
+                    consumer_path,
+                    f"config_{i}.yml",
+                    num_restarts,
+                    min_restart_duration,
+                    max_restart_duration,
+                    f"integration_tests/tmp/consumer_{i}.log",
+                ),
+            )
+            thread.start()
+            threads.append(thread)
+
+        for t in threads:
+            t.join()
+
+    except Exception:
+        raise
+
+    attach_db_stmt = "".join(
+        [
+            f"attach '{config['db_path']}' as {config['db_path'].replace('.sqlite', '')};\n"
+            for config in consumer_configs.values()
+        ]
+    )
+    from_stmt = "\nUNION ALL\n".join(
+        [
+            f"    SELECT * FROM {config['db_path'].replace('.sqlite', '')}.inflight_taskactivations"
+            for config in consumer_configs.values()
+        ]
+    )
+    query = f"""
+        SELECT
+            partition,
+            (max(offset) - min(offset)) + 1 AS offset_diff,
+            count(*) AS occ,
+            (max(offset) - min(offset)) + 1 - count(offset) AS delta
+        FROM (
+            {from_stmt}
+        )
+        GROUP BY partition
+        ORDER BY partition;
+    """
+
+    con = sqlite3.connect(consumer_configs["config_0.yml"]["db_path"])
+    cur = con.cursor()
+    cur.executescript(attach_db_stmt)
+    res = cur.execute(query)
+    assert all(
+        [res[3] == 0 for res in res.fetchall()]
+    )  # Assert that each value in the delta (fourth) column is 0
+    print("Taskbroker integration test completed successfully.")
\ No newline at end of file
diff --git a/python/pyproject.toml b/python/pyproject.toml
new file mode 100644
index 00000000..36d99426
--- /dev/null
+++ b/python/pyproject.toml
@@ -0,0 +1,9 @@
+[build-system]
+requires = ["setuptools>=42", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[tool.pytest.ini_options]
+asyncio_mode = "auto"
+testpaths = ["integration_tests"]
+python_files = ["test_*.py"]
+python_functions = ["test_*"]
\ No newline at end of file
diff --git a/python/requirements-dev.txt b/python/requirements-dev.txt
new file mode 100644
index 00000000..4ceaa0e4
--- /dev/null
+++ b/python/requirements-dev.txt
@@ -0,0 +1 @@
+pytest>=7.0.0
\ No newline at end of file
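The validation query in this first patch leans on a single invariant: per partition, `(max(offset) - min(offset)) + 1 - count(offset)` is zero exactly when every offset in the observed range was stored once. A skipped offset leaves the delta positive; a duplicated write drives it negative. A minimal, self-contained sketch of the same check follows — the table shape matches the patch, but the sample offsets are invented for illustration:

```python
import sqlite3

# Toy version of the per-partition gap/duplicate check used by the test.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE inflight_taskactivations (partition INT, offset INT)")
con.executemany(
    "INSERT INTO inflight_taskactivations VALUES (?, ?)",
    [(0, 1), (0, 2), (0, 3), (1, 5), (1, 7)],  # partition 1 never stored offset 6
)
rows = con.execute(
    """
    SELECT partition, (max(offset) - min(offset)) + 1 - count(offset) AS delta
    FROM inflight_taskactivations
    GROUP BY partition
    ORDER BY partition
    """
).fetchall()
assert rows == [(0, 0), (1, 1)]  # partition 0 is clean, partition 1 has a gap
```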
From d967e5aef0c5d30c6bef5b443e4b8db45f5f17a5 Mon Sep 17 00:00:00 2001
From: Enoch Tang
Date: Tue, 26 Nov 2024 17:12:20 -0500
Subject: [PATCH 02/35] fix path to sqlite3

---
 ...r_rebalancing.cpython-311-pytest-8.3.3.pyc | Bin 9509 -> 9561 bytes
 .../test_consumer_rebalancing.py              |  16 +++++++++-------
 python/requirements-dev.txt                   |   3 ++-
 3 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/python/integration_tests/__pycache__/test_consumer_rebalancing.cpython-311-pytest-8.3.3.pyc b/python/integration_tests/__pycache__/test_consumer_rebalancing.cpython-311-pytest-8.3.3.pyc
index a331555ddb9de5f761c8450b52168bb1057a1c74..4db1be0b6260025d33da5b09fb79b2fbda66e926 100644
GIT binary patch
[binary .pyc delta omitted]
diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py
index ac01a390..88fde901 100644
--- a/python/integration_tests/test_consumer_rebalancing.py
+++ b/python/integration_tests/test_consumer_rebalancing.py
@@ -1,13 +1,15 @@
+import os
 import random
+import signal
 import sqlite3
 import subprocess
 import threading
 import time
-import signal
+
 from pathlib import Path
+from threading import Thread
 
 import yaml
-from threading import Thread
 
 
 def manage_consumer(
@@ -101,11 +103,11 @@ def test_rebalancing_only_processed_once():
     # Create config files for consumers
     print("Creating config files for consumers in taskbroker/tests")
     consumer_configs = {}
+    curr_time = int(time.time())
     for i in range(num_consumers):
-        curr_time = int(time.time())
         consumer_configs[f"config_{i}.yml"] = {
             "db_name": f"db_{i}_{curr_time}",
-            "db_path": f"integration_tests/tmp/db_{i}_{curr_time}.sqlite",
+            "db_path": f"{os.getcwd()}/integration_tests/tmp/db_{i}_{curr_time}.sqlite",
             "kafka_topic": "task-worker",
             "kafka_consumer_group": "task-worker-integration-test",
             "kafka_auto_offset_reset": "earliest",
@@ -146,13 +148,13 @@ def test_rebalancing_only_processed_once():
 
     attach_db_stmt = "".join(
         [
-            f"attach '{config['db_path']}' as {config['db_path'].replace('.sqlite', '')};\n"
+            f"ATTACH DATABASE '{config['db_path']}' AS {config['db_name']};\n"
             for config in consumer_configs.values()
         ]
     )
     from_stmt = "\nUNION ALL\n".join(
         [
-            f"    SELECT * FROM {config['db_path'].replace('.sqlite', '')}.inflight_taskactivations"
+            f"    SELECT * FROM {config['db_name']}.inflight_taskactivations"
             for config in consumer_configs.values()
         ]
     )
@@ -176,4 +178,4 @@ def test_rebalancing_only_processed_once():
     assert all(
         [res[3] == 0 for res in res.fetchall()]
     )  # Assert that each value in the delta (fourth) column is 0
-    print("Taskbroker integration test completed successfully.")
\ No newline at end of file
+    print("Taskbroker integration test completed successfully.")
diff --git a/python/requirements-dev.txt b/python/requirements-dev.txt
index 4ceaa0e4..22fac044 100644
--- a/python/requirements-dev.txt
+++ b/python/requirements-dev.txt
@@ -1 +1,2 @@
-pytest>=7.0.0
\ No newline at end of file
+pytest>=7.0.0
+pylint>=3.3.0
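The `ATTACH DATABASE ... AS {db_name}` rewrite in patch 02 is more than cosmetic: an ATTACH alias must be a bare SQL identifier, and the old alias derived from the (now absolute) file path carried slashes and dots that do not parse. A rough sketch of the pattern the test settles on, using throwaway temp files in place of the per-consumer databases:

```python
import sqlite3
import tempfile
from pathlib import Path

tmp = Path(tempfile.mkdtemp())
names = ["db_0", "db_1"]
for name in names:
    # Stand-in for one consumer's sqlite file containing the shared table.
    db = sqlite3.connect(tmp / f"{name}.sqlite")
    db.execute("CREATE TABLE inflight_taskactivations (partition INT, offset INT)")
    db.commit()
    db.close()

con = sqlite3.connect(":memory:")
for name in names:
    path = tmp / f"{name}.sqlite"
    # The alias is the bare db_name; the quoted path is only the file location.
    con.execute(f"ATTACH DATABASE '{path}' AS {name}")

union = "\nUNION ALL\n".join(
    f"SELECT * FROM {name}.inflight_taskactivations" for name in names
)
print(con.execute(f"SELECT count(*) FROM ({union})").fetchone())  # -> (0,)
```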
From 4b9c1e53b15b2cef282f75cf0d19319ed687e425 Mon Sep 17 00:00:00 2001
From: Enoch Tang
Date: Tue, 26 Nov 2024 17:14:27 -0500
Subject: [PATCH 03/35] clean up

---
 .gitignore                                       |   7 ++++++-
 python/__pycache__/__init__.cpython-311.pyc      | Bin 160 -> 0 bytes
 ...umer_rebalancing.cpython-311-pytest-8.3.3.pyc | Bin 9561 -> 0 bytes
 python/pyproject.toml                            |   2 +-
 python/requirements-dev.txt                      |   1 -
 5 files changed, 7 insertions(+), 3 deletions(-)
 delete mode 100644 python/__pycache__/__init__.cpython-311.pyc
 delete mode 100644 python/integration_tests/__pycache__/test_consumer_rebalancing.cpython-311-pytest-8.3.3.pyc

diff --git a/.gitignore b/.gitignore
index b0338720..6c289643 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,4 +8,9 @@
 *.sqlite-shm
 *.sqlite-wal
 
-.VERSION
\ No newline at end of file
+# Python
+**/__pycache__/
+**/.pytest_cache/
+**/integration_tests/tmp/
+
+.VERSION
diff --git a/python/__pycache__/__init__.cpython-311.pyc b/python/__pycache__/__init__.cpython-311.pyc
deleted file mode 100644
index f81692639a199d0ba2e86e67e5a0e670bc662b40..0000000000000000000000000000000000000000
GIT binary patch
[160-byte compiled .pyc payload omitted]

diff --git a/python/integration_tests/__pycache__/test_consumer_rebalancing.cpython-311-pytest-8.3.3.pyc b/python/integration_tests/__pycache__/test_consumer_rebalancing.cpython-311-pytest-8.3.3.pyc
deleted file mode 100644
index 4db1be0b6260025d33da5b09fb79b2fbda66e926..0000000000000000000000000000000000000000
GIT binary patch
[9561-byte compiled .pyc payload omitted]
z9wl7@Pq0J__L5CtN_N3fVgzT&(d;S-E~}xTsd-9p|07*;HoFy<8jcjWRv!F)?X{Zb zlB?*t)Nn-B8Ya6T8z5FwwASQgnN&iHi3UfDExJFV$%sX2ZUd?PAhqWuQlrFetwt_; zz``@o%F~0o2>!)f17Mz;93wE#b~K(@#xS~;x3;&wHAR7g*yIWpX)DEX>VtPmKDpu4 zfJ4z?>hDs+rL$_=%A>@BbK(QHFjQpG0hS>)F|H<;52{SJ3aC5HK0?=Rr@#tfpMLa(&`MZ zp;GWC%;#W}1daKWhs|1Wh$&OM;NU}GAn=;DH{Z4=U0-jp%35kKwy!nKtI5f#FK``$ z8@rChAFR#QcZ)&LZ&#_K*s&HiIht52@6~X&?Fah-BHLe3#%>U{Zp=y8_AxrO-6n4n zv%ckc%gNHR%0j~6OWv^kuk?maVW`*%`(yaFz0@V|Hs!q3Yv8amBIZ^QMohd#+Hf(v z))Z0`lv0})|5$3mh?G>}##EM;3%2JQ812odJw#lS1u^-oYsaJm8 zq%HQgaPBMiNxu?Cg{VAbQeA57IxBAo;CkfgCc3Q#E*~~9&KiFYf_C zUDWcHMxR#cUqaPql|UG~W!Hkee)(^Vx5L@oJdW)~eX-x#*IKFvd%dN;W=rwY=Eceh zk^`V6;}XVOTeVIMsgM&TKAV$uQAha%2(E>% zDVe{Lk{9^hyLgDaWQb`YT@~TTAs9Qsbx3f*lOqUlQIn$p3hroB;~>6pMfDMpH=zCn z8;zP=RNL-dg-|pKVOctr&;qY$GMZ zm`cYAy&FYc5b;BE!8IoRJPkR(LVweIM4O_tDDqO>Vw|K?fTjN7j}Xl}RTp&DqFO}s zw4%0jV%pMCLwcbR*3FaLVqOjFwedwc7t1A+k|;yG2T=9QlYAnU&kf}zL@??K#Mm~uE= z*smocnj+*_E}LG8)gyXQfMP-%JE$ij4g+HJ{2Vw*GXR#U8fEu*P4F<#RkKsvDSFMh zwp8IK6m}NwZ=ByORra6I3jk|&OQ$Emi7NQK@64V!HOo((Jn6Nl5052HeR#&!)#u+> zeYfqPtTs@V9WqI9otr*6ee66x!q1#N^>$rDJ!7eCGM$=VkYl(+VQ;2B(i7I?19$=X z8GvS$kvLTR|8@0iWcMn7m+5MjUhBH!schb*uy5;ki+adK6yQI+Mvg=uRHKO1jq_*J zQaub=_hEfwMEEEV>3E|Q;dk&8xN7R#Mo1--*1n0{Vpbj=iD*QLgthU{F^CZKXrzI?arV?Zr}?87%+WQPPMtkIeb&-+Kx-?o_1(S`H@mhw zEK7+2O>VH1VAoORt`lfIm0e#-wnrHIYFS5ZfkbR11{JCuj+(#XI=KrxCA2-bPHYb6 zO97;L>aG9Vz)WI&v5Yn)IwvEN_wO(@3J9+!Ng-c4j5WI=Qp8(Q?U0F#*a-#{m=fKE zV6w0qjg)@=gEeSmczHZDV0tt8v?z-LFD)i8Nt#?trfXsx1dePjFKbR|J9@R~?zlLW-ykuqGLaV6dPvOYuxv z^dI^>giYlog{8Uyi!N~MvI@H2fvmey9X>O5j7-NZypu#6K zX)z50(cLV_(xQPEnO(NV|G7e%t9+JPqkct;ZWkV$0Llp)^ToCn?` z;UX}OxDB700aLgA6r)9M0yCT_>33i~mZ^uKoy%`mL!AIRdX`UCJ3;{Z`W2?T+V={; zXHLrDRlIx3%-#yKS7G+nZ0;~@K?5wGg2b@Dd-Z2GPu-pTj#19Nr*Q9=x%VsF`^z&` zu4DCs2ONK&}d8X#1+?_XP6xSBGwOsP)vinTMeMWJg zsoDJfU_T9@xVxWiqC6d|ekD9~M^@aEW%p#oJqZZh|Al$zQ)Tz*iu<(UK3%f~Ji(fc z!rBx>a5bY`JbwqI4wadS z3NxWF6A$}GAN23K-@mKeKUwLY#A9;12FmX475DY+>s;{q$jz(e_Mtl+WiDFbqNted zcY~OZpS`(vZKBK$R@gy>9YhsDUjsZw>}jKDHnd<`hS=z2wmq*o!w&yLe#?przaEsz z!NE#!5Y~Zbs|Hr4);l*+(OQ06ayoGE?iOxQ7=^4Z@l-*_i8r7yL;F> z@}PI(e(yxNcXy?Ccg+C}kI`X#ir}%`(dB$f0a%``u~d+M5ZHDnfJ=kkdf z>*L%tKjrPHwIN0sV0jvLgRgycTG-?-?om(1AV{Kv@@%PMQ;QmL}k|{&q|6(L>a?hrGwU z_CI=PfcHAPX7)Mm?PF#Rc{Oj2-+i^s@wQtsu^arC#HX69n(D^ z2<*p*h@Y+DdugKzzS`<_x(UaBJRq3o*8CQ-h!|QUvIPJJhVnu>by0IlNDk{I1e(8* zMG8CkHk#+5nH@cfK%ZTYxZDRbIbks^9wxtqI*bdY5J1>AG+jmG^pYR9to^E#e_8ug zsi0!qRVttuca`!jYriVh=+doFmb*%MmbG6Kalb-YZa9(XfogYO%?{6p;oh19p17%d zYA(dwRA+b1gP4~Jg=;>lod?AZKJ2?8ujvr=DhwNt8Wrt diff --git a/python/pyproject.toml b/python/pyproject.toml index 36d99426..d7c27fbb 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -6,4 +6,4 @@ build-backend = "setuptools.build_meta" asyncio_mode = "auto" testpaths = ["integration_tests"] python_files = ["test_*.py"] -python_functions = ["test_*"] \ No newline at end of file +python_functions = ["test_*"] diff --git a/python/requirements-dev.txt b/python/requirements-dev.txt index 22fac044..f77c910f 100644 --- a/python/requirements-dev.txt +++ b/python/requirements-dev.txt @@ -1,2 +1 @@ pytest>=7.0.0 -pylint>=3.3.0 From 80ab3763e1b8ca629962e71b4cf31f9719574888 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Tue, 26 Nov 2024 17:19:56 -0500 Subject: [PATCH 04/35] add make command --- Makefile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Makefile b/Makefile index d15e4bd8..6e298667 100644 --- a/Makefile +++ b/Makefile @@ -42,6 +42,10 @@ test: cargo test .PHONY: test +integration-test: + cd python && pytest +.PHONY: integration-test + # Help help: ## this help From 
9986d38b70dba2243e017398668c56287b9e021f Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Wed, 27 Nov 2024 17:50:46 -0500 Subject: [PATCH 05/35] update num restarts and clean up comments --- .github/workflows/ci.yml | 40 ------------------- Makefile | 4 ++ .../test_consumer_rebalancing.py | 21 +++++----- 3 files changed, 16 insertions(+), 49 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2c2b942d..eef52acb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -56,43 +56,3 @@ jobs: with: command: test args: --all - - sentry: - name: Sentry End-To-End Tests - runs-on: ubuntu-latest - timeout-minutes: 60 - - steps: - - name: Checkout code - uses: actions/checkout@v2 - - - name: Registry login - run: | - echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin - - - name: Checkout sentry - uses: actions/checkout@v2 - with: - repository: getsentry/sentry - path: sentry - - - name: Setup steps - id: setup - run: | - pip install --upgrade pip wheel - # We cannot execute actions that are not placed under .github of the main repo - mkdir -p .github/actions - cp -r sentry/.github/actions/* .github/actions - - - name: Setup Sentry - id: setup-sentry - uses: ./.github/actions/setup-sentry - with: - workdir: sentry - kafka: true - - - name: Run taskbroker end-to-end tests - working-directory: sentry - run: | - echo "TODO: Send tasks to sentry-kafka" - echo "TODO: Run pytest" diff --git a/Makefile b/Makefile index 6e298667..02a0c8e3 100644 --- a/Makefile +++ b/Makefile @@ -42,6 +42,10 @@ test: cargo test .PHONY: test +install-py-dev: + pip install -r python/requirements-dev.txt +.PHONY: install-py-dev + integration-test: cd python && pytest .PHONY: integration-test diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 88fde901..e21f8316 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -25,7 +25,9 @@ def manage_consumer( print(f"Starting consumer {consumer_index}, writing log file to {log_file_path}") for i in range(iterations): config_file_path = f"integration_tests/tmp/{config_file}" - process = subprocess.Popen([consumer_path, "-c", config_file_path], stderr=log_file) + process = subprocess.Popen( + [consumer_path, "-c", config_file_path], stderr=subprocess.STDOUT, stdout=log_file + ) time.sleep(random.randint(min_sleep, max_sleep)) print( f"Sending SIGINT to consumer {consumer_index}, {iterations - i - 1} SIGINTs remaining" @@ -37,18 +39,17 @@ def manage_consumer( except Exception: process.kill() - -def test_rebalancing_only_processed_once(): +def test_tasks_written_once_during_rebalancing(): # Test configuration consumer_path = "../target/debug/taskbroker" num_consumers = 8 num_messages = 80_000 - num_restarts = 1 + num_restarts = 4 min_restart_duration = 5 max_restart_duration = 20 - # First check if taskdemo topic exists - print("Checking if taskdemo topic already exists") + # First check if task-worker topic exists + print("Checking if task-worker topic already exists") check_topic_cmd = [ "docker", "exec", @@ -61,9 +62,9 @@ def test_rebalancing_only_processed_once(): result = subprocess.run(check_topic_cmd, check=True, capture_output=True, text=True) topics = result.stdout.strip().split("\n") - # Create taskdemo Kafka topic with 32 partitions + # Create/Update task-worker Kafka topic with 32 partitions if "task-worker" not in topics: - print("task-worker 
topic does not exist, creating it with 32 partitions") + print("Task-worker topic does not exist, creating it with 32 partitions") create_topic_cmd = [ "docker", "exec", @@ -81,7 +82,7 @@ def test_rebalancing_only_processed_once(): ] subprocess.run(create_topic_cmd, check=True) else: - print("Taskdemo topic already exists, making sure it has 32 partitions") + print("Task-worker topic already exists, making sure it has 32 partitions") try: create_topic_cmd = [ "docker", @@ -123,6 +124,7 @@ def test_rebalancing_only_processed_once(): try: # TODO: Use sentry run CLI to produce messages to topic + threads: list[Thread] = [] for i in range(num_consumers): thread = threading.Thread( @@ -146,6 +148,7 @@ def test_rebalancing_only_processed_once(): except Exception: raise + # Validate that all tasks were written once during rebalancing attach_db_stmt = "".join( [ f"ATTACH DATABASE '{config['db_path']}' AS {config['db_name']};\n" From 31ebf8e66c5ac381f9594c6ffc5cffd1d191e47c Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Thu, 28 Nov 2024 14:52:54 -0500 Subject: [PATCH 06/35] clean up and fix db path --- .github/workflows/ci.yml | 6 ++ Makefile | 2 +- python/integration_tests/helpers.py | 67 +++++++++++++ .../test_consumer_rebalancing.py | 98 ++++++------------- 4 files changed, 104 insertions(+), 69 deletions(-) create mode 100644 python/integration_tests/helpers.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index df7dcd28..7792eff8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -78,3 +78,9 @@ jobs: with: command: test args: --all + + - name: Run Integration Tests + env: + SENTRY_PATH: ${{ github.workspace }}/sentry + run: | + python -m pytest tests/integration diff --git a/Makefile b/Makefile index 11710868..f10e016a 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,7 @@ install-py-dev: .PHONY: install-py-dev integration-test: - cd python && pytest + python -m pytest python/integration_tests .PHONY: integration-test # Help diff --git a/python/integration_tests/helpers.py b/python/integration_tests/helpers.py new file mode 100644 index 00000000..52dbefd0 --- /dev/null +++ b/python/integration_tests/helpers.py @@ -0,0 +1,67 @@ +import subprocess +from pathlib import Path + +TASKBROKER_ROOT = Path(__file__).parent.parent.parent +TASKBROKER_BIN = TASKBROKER_ROOT / "target/debug/taskbroker" +TESTS_OUTPUT_PATH = Path(__file__).parent / ".tests_output" + + +def check_topic_exists(topic_name: str): + # Checks if a topic exists in sentry_kafka + check_topic_cmd = [ + "docker", + "exec", + "sentry_kafka", + "kafka-topics", + "--bootstrap-server", + "localhost:9092", + "--list", + ] + result = subprocess.run(check_topic_cmd, check=True, capture_output=True, text=True) + topics = result.stdout.strip().split("\n") + + return topic_name in topics + + +def create_topic(topic_name: str, num_partitions: int): + create_topic_cmd = [ + "docker", + "exec", + "sentry_kafka", + "kafka-topics", + "--bootstrap-server", + "localhost:9092", + "--create", + "--topic", + topic_name, + "--partitions", + str(num_partitions), + "--replication-factor", + "1", + ] + subprocess.run(create_topic_cmd, check=True) + + +def update_topic_partitions(topic_name: str, num_partitions: int): + try: + create_topic_cmd = [ + "docker", + "exec", + "sentry_kafka", + "kafka-topics", + "--bootstrap-server", + "localhost:9092", + "--alter", + "--topic", + topic_name, + "--partitions", + str(num_partitions), + ] + subprocess.run(create_topic_cmd, check=True) + except Exception: + pass + + +def 
send_messages_to_kafka(num_messages: int): + # if sentry_path exists in environment, use sentry-cli to send messages to kafka + pass \ No newline at end of file diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index e21f8316..92cd2e1d 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -1,21 +1,23 @@ import os import random +import shutil import signal import sqlite3 import subprocess import threading import time -from pathlib import Path from threading import Thread import yaml +from python.integration_tests.helpers import TASKBROKER_BIN, TESTS_OUTPUT_PATH, check_topic_exists, create_topic, update_topic_partitions + def manage_consumer( consumer_index: int, consumer_path: str, - config_file: str, + config_file_path: str, iterations: int, min_sleep: int, max_sleep: int, @@ -24,7 +26,6 @@ def manage_consumer( with open(log_file_path, "a") as log_file: print(f"Starting consumer {consumer_index}, writing log file to {log_file_path}") for i in range(iterations): - config_file_path = f"integration_tests/tmp/{config_file}" process = subprocess.Popen( [consumer_path, "-c", config_file_path], stderr=subprocess.STDOUT, stdout=log_file ) @@ -39,92 +40,46 @@ def manage_consumer( except Exception: process.kill() + def test_tasks_written_once_during_rebalancing(): # Test configuration - consumer_path = "../target/debug/taskbroker" + consumer_path = str(TASKBROKER_BIN) num_consumers = 8 num_messages = 80_000 - num_restarts = 4 + num_restarts = 1 min_restart_duration = 5 - max_restart_duration = 20 - - # First check if task-worker topic exists - print("Checking if task-worker topic already exists") - check_topic_cmd = [ - "docker", - "exec", - "sentry_kafka", - "kafka-topics", - "--bootstrap-server", - "localhost:9092", - "--list", - ] - result = subprocess.run(check_topic_cmd, check=True, capture_output=True, text=True) - topics = result.stdout.strip().split("\n") - - # Create/Update task-worker Kafka topic with 32 partitions - if "task-worker" not in topics: + max_restart_duration = 25 + topic_name = "task-worker" + curr_time = int(time.time()) + + # Check if topic exists, and create/update it with 32 partitions if it doesn't + if not check_topic_exists("task-worker"): print("Task-worker topic does not exist, creating it with 32 partitions") - create_topic_cmd = [ - "docker", - "exec", - "sentry_kafka", - "kafka-topics", - "--bootstrap-server", - "localhost:9092", - "--create", - "--topic", - "task-worker", - "--partitions", - "32", - "--replication-factor", - "1", - ] - subprocess.run(create_topic_cmd, check=True) + create_topic(topic_name, 32) else: print("Task-worker topic already exists, making sure it has 32 partitions") - try: - create_topic_cmd = [ - "docker", - "exec", - "sentry_kafka", - "kafka-topics", - "--bootstrap-server", - "localhost:9092", - "--alter", - "--topic", - "task-worker", - "--partitions", - "32", - ] - subprocess.run(create_topic_cmd, check=True) - except Exception: - pass + update_topic_partitions(topic_name, 32) # Create config files for consumers - print("Creating config files for consumers in taskbroker/tests") + print("Creating config files for consumers") consumer_configs = {} - curr_time = int(time.time()) for i in range(num_consumers): + db_name = f"db_{i}_{curr_time}" consumer_configs[f"config_{i}.yml"] = { - "db_name": f"db_{i}_{curr_time}", - "db_path": 
f"{os.getcwd()}/integration_tests/tmp/db_{i}_{curr_time}.sqlite", + "db_name": db_name, + "db_path": str(TESTS_OUTPUT_PATH / f"{db_name}.sqlite"), "kafka_topic": "task-worker", - "kafka_consumer_group": "task-worker-integration-test", + "kafka_consumer_group": "task-worker", "kafka_auto_offset_reset": "earliest", "grpc_port": 50051 + i, } - config_dir = Path("integration_tests/tmp") - config_dir.mkdir(parents=True, exist_ok=True) - for filename, config in consumer_configs.items(): - with open(config_dir / filename, "w") as f: + with open(str(TESTS_OUTPUT_PATH / filename), "w") as f: yaml.safe_dump(config, f) try: # TODO: Use sentry run CLI to produce messages to topic - threads: list[Thread] = [] for i in range(num_consumers): thread = threading.Thread( @@ -132,11 +87,11 @@ def test_tasks_written_once_during_rebalancing(): args=( i, consumer_path, - f"config_{i}.yml", + str(TESTS_OUTPUT_PATH / f"config_{i}.yml"), num_restarts, min_restart_duration, max_restart_duration, - f"integration_tests/tmp/consumer_{i}.log", + str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), ), ) thread.start() @@ -155,6 +110,7 @@ def test_tasks_written_once_during_rebalancing(): for config in consumer_configs.values() ] ) + print(attach_db_stmt) from_stmt = "\nUNION ALL\n".join( [ f" SELECT * FROM {config['db_name']}.inflight_taskactivations" @@ -173,12 +129,18 @@ def test_tasks_written_once_during_rebalancing(): GROUP BY partition ORDER BY partition; """ + query = f"SELECT * FROM {consumer_configs['config_0.yml']['db_name']}.inflight_taskactivations;" con = sqlite3.connect(consumer_configs["config_0.yml"]["db_path"]) cur = con.cursor() cur.executescript(attach_db_stmt) res = cur.execute(query) + assert all( [res[3] == 0 for res in res.fetchall()] ) # Assert that each value in the delta (fourth) column is 0 print("Taskbroker integration test completed successfully.") + + # Clean up test output files + print(f"Cleaning up test output files in {TESTS_OUTPUT_PATH}") + shutil.rmtree(TESTS_OUTPUT_PATH) From 313d6e4a9eceb9eb4b90294e272ce347051fc08d Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Thu, 28 Nov 2024 16:24:07 -0500 Subject: [PATCH 07/35] send messages to kafka from sentry --- python/integration_tests/helpers.py | 98 ++++++++++++------- .../test_consumer_rebalancing.py | 35 ++++--- python/requirements-dev.txt | 3 +- 3 files changed, 86 insertions(+), 50 deletions(-) diff --git a/python/integration_tests/helpers.py b/python/integration_tests/helpers.py index 52dbefd0..3cbc4bfd 100644 --- a/python/integration_tests/helpers.py +++ b/python/integration_tests/helpers.py @@ -1,4 +1,5 @@ import subprocess +from os import environ from pathlib import Path TASKBROKER_ROOT = Path(__file__).parent.parent.parent @@ -6,43 +7,48 @@ TESTS_OUTPUT_PATH = Path(__file__).parent / ".tests_output" -def check_topic_exists(topic_name: str): - # Checks if a topic exists in sentry_kafka - check_topic_cmd = [ - "docker", - "exec", - "sentry_kafka", - "kafka-topics", - "--bootstrap-server", - "localhost:9092", - "--list", - ] - result = subprocess.run(check_topic_cmd, check=True, capture_output=True, text=True) - topics = result.stdout.strip().split("\n") +def check_topic_exists(topic_name: str) -> bool: + try: + check_topic_cmd = [ + "docker", + "exec", + "sentry_kafka", + "kafka-topics", + "--bootstrap-server", + "localhost:9092", + "--list", + ] + result = subprocess.run(check_topic_cmd, check=True, capture_output=True, text=True) + topics = result.stdout.strip().split("\n") - return topic_name in topics + return topic_name in 
topics + except Exception as e: + raise Exception(f"Failed to check if topic exists: {e}") -def create_topic(topic_name: str, num_partitions: int): - create_topic_cmd = [ - "docker", - "exec", - "sentry_kafka", - "kafka-topics", - "--bootstrap-server", - "localhost:9092", - "--create", - "--topic", - topic_name, - "--partitions", - str(num_partitions), - "--replication-factor", - "1", - ] - subprocess.run(create_topic_cmd, check=True) +def create_topic(topic_name: str, num_partitions: int) -> None: + try: + create_topic_cmd = [ + "docker", + "exec", + "sentry_kafka", + "kafka-topics", + "--bootstrap-server", + "localhost:9092", + "--create", + "--topic", + topic_name, + "--partitions", + str(num_partitions), + "--replication-factor", + "1", + ] + subprocess.run(create_topic_cmd, check=True) + except Exception as e: + raise Exception(f"Failed to create topic: {e}") -def update_topic_partitions(topic_name: str, num_partitions: int): +def update_topic_partitions(topic_name: str, num_partitions: int) -> None: try: create_topic_cmd = [ "docker", @@ -59,9 +65,31 @@ def update_topic_partitions(topic_name: str, num_partitions: int): ] subprocess.run(create_topic_cmd, check=True) except Exception: + # Command fails topic already has the correct number of partitions. Try to continue. pass -def send_messages_to_kafka(num_messages: int): - # if sentry_path exists in environment, use sentry-cli to send messages to kafka - pass \ No newline at end of file +def send_messages_to_kafka(num_messages: int) -> None: + path_to_sentry = environ.get("SENTRY_PATH") + if not path_to_sentry: + raise Exception("SENTRY_PATH not set in environment. This is required to send messages to kafka") + + try: + send_tasks_cmd = [ + "cd", + path_to_sentry, + "&&", + "sentry", + "run", + "taskbroker-send-tasks", + "--task-function-path", + "sentry.taskworker.tasks.examples.say_hello", + "--args", + "'[\"foobar\"]'", + "--repeat", + str(num_messages), + ] + subprocess.run(send_tasks_cmd, check=True) + print(f"Sent {num_messages} messages to kafka") + except Exception as e: + raise Exception(f"Failed to send messages to kafka: {e}") diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 92cd2e1d..baa9c1b1 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -1,4 +1,3 @@ -import os import random import shutil import signal @@ -11,7 +10,14 @@ import yaml -from python.integration_tests.helpers import TASKBROKER_BIN, TESTS_OUTPUT_PATH, check_topic_exists, create_topic, update_topic_partitions +from python.integration_tests.helpers import ( + TASKBROKER_BIN, + TESTS_OUTPUT_PATH, + check_topic_exists, + create_topic, + update_topic_partitions, + send_messages_to_kafka +) def manage_consumer( @@ -41,27 +47,29 @@ def manage_consumer( process.kill() -def test_tasks_written_once_during_rebalancing(): +def test_tasks_written_once_during_rebalancing() -> None: # Test configuration consumer_path = str(TASKBROKER_BIN) num_consumers = 8 num_messages = 80_000 - num_restarts = 1 + num_restarts = 4 + num_partitions = 32 min_restart_duration = 5 - max_restart_duration = 25 + max_restart_duration = 20 topic_name = "task-worker" curr_time = int(time.time()) - # Check if topic exists, and create/update it with 32 partitions if it doesn't + # Ensure topic has correct number of partitions if not check_topic_exists("task-worker"): - print("Task-worker topic does not exist, creating it with 32 partitions") 
- create_topic(topic_name, 32) + print(f"Task-worker topic does not exist, creating it with {num_partitions} partitions") + create_topic(topic_name, num_partitions) else: - print("Task-worker topic already exists, making sure it has 32 partitions") - update_topic_partitions(topic_name, 32) + print(f"Task-worker topic already exists, making sure it has {num_partitions} partitions") + update_topic_partitions(topic_name, num_partitions) # Create config files for consumers print("Creating config files for consumers") + TESTS_OUTPUT_PATH.mkdir(exist_ok=True) consumer_configs = {} for i in range(num_consumers): db_name = f"db_{i}_{curr_time}" @@ -79,7 +87,7 @@ def test_tasks_written_once_during_rebalancing(): yaml.safe_dump(config, f) try: - # TODO: Use sentry run CLI to produce messages to topic + send_messages_to_kafka(num_messages) threads: list[Thread] = [] for i in range(num_consumers): thread = threading.Thread( @@ -100,8 +108,8 @@ def test_tasks_written_once_during_rebalancing(): for t in threads: t.join() - except Exception: - raise + except Exception as e: + raise Exception(f"Error running taskbroker: {e}") # Validate that all tasks were written once during rebalancing attach_db_stmt = "".join( @@ -110,7 +118,6 @@ def test_tasks_written_once_during_rebalancing(): for config in consumer_configs.values() ] ) - print(attach_db_stmt) from_stmt = "\nUNION ALL\n".join( [ f" SELECT * FROM {config['db_name']}.inflight_taskactivations" diff --git a/python/requirements-dev.txt b/python/requirements-dev.txt index f77c910f..ba183e2f 100644 --- a/python/requirements-dev.txt +++ b/python/requirements-dev.txt @@ -1 +1,2 @@ -pytest>=7.0.0 +pytest>=8.3.3 +pyyaml>=6.0.2 From aaaa86798a14f703a4a5abfba110f3a9b01bb9db Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Thu, 28 Nov 2024 16:25:50 -0500 Subject: [PATCH 08/35] update gitignore --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 6c289643..529812c3 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,6 @@ # Python **/__pycache__/ **/.pytest_cache/ -**/integration_tests/tmp/ +**/integration_tests/.tests_output/ .VERSION From ae7904ffa2d2fda7bce78d9e133a19a32bc9689d Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Thu, 28 Nov 2024 16:39:11 -0500 Subject: [PATCH 09/35] use rnd display andom seed value --- .../integration_tests/test_consumer_rebalancing.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index baa9c1b1..16435d20 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -27,6 +27,7 @@ def manage_consumer( iterations: int, min_sleep: int, max_sleep: int, + random_seed: int, log_file_path: str, ) -> None: with open(log_file_path, "a") as log_file: @@ -35,6 +36,7 @@ def manage_consumer( process = subprocess.Popen( [consumer_path, "-c", config_file_path], stderr=subprocess.STDOUT, stdout=log_file ) + random.seed(random_seed) time.sleep(random.randint(min_sleep, max_sleep)) print( f"Sending SIGINT to consumer {consumer_index}, {iterations - i - 1} SIGINTs remaining" @@ -59,6 +61,17 @@ def test_tasks_written_once_during_rebalancing() -> None: topic_name = "task-worker" curr_time = int(time.time()) + print(f"""Running test with the following configuration: + num of consumers: {num_consumers}, + num of messages: {num_messages}, + num of restarts: {num_restarts}, + num of partitions: 
{num_partitions}, + min restart duration: {min_restart_duration} seconds, + max restart duration: {max_restart_duration} seconds, + topic name: {topic_name} + random seed value: {curr_time} + """) + # Ensure topic has correct number of partitions if not check_topic_exists("task-worker"): print(f"Task-worker topic does not exist, creating it with {num_partitions} partitions") @@ -99,6 +112,7 @@ def test_tasks_written_once_during_rebalancing() -> None: num_restarts, min_restart_duration, max_restart_duration, + curr_time, str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), ), ) From 79eb4a26c4099637ddbef74bbc73d7fabe0fb2e3 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Thu, 28 Nov 2024 16:48:23 -0500 Subject: [PATCH 10/35] fix path to test in ci --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7792eff8..1943cef1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -81,6 +81,6 @@ jobs: - name: Run Integration Tests env: - SENTRY_PATH: ${{ github.workspace }}/sentry + SENTRY_PATH: sentry/ run: | - python -m pytest tests/integration + python -m pytest python/integration_tests From 94635967702c07b795f23953b3b452e661150a7f Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Fri, 29 Nov 2024 15:23:39 -0500 Subject: [PATCH 11/35] produce to kafka with sentry --- .github/workflows/ci.yml | 10 ++-- .gitignore | 1 + Makefile | 2 + python/integration_tests/helpers.py | 56 ++++++++++++------- .../test_consumer_rebalancing.py | 15 +++-- python/requirements-dev.txt | 4 ++ 6 files changed, 56 insertions(+), 32 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1943cef1..9ad5de86 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -79,8 +79,10 @@ jobs: command: test args: --all - - name: Run Integration Tests - env: - SENTRY_PATH: sentry/ + - name: Install Python Dependencies run: | - python -m pytest python/integration_tests + make install-py-dev + + - name: Run Python Integration Tests + run: | + make integration-test diff --git a/.gitignore b/.gitignore index 529812c3..0e745d47 100644 --- a/.gitignore +++ b/.gitignore @@ -12,5 +12,6 @@ **/__pycache__/ **/.pytest_cache/ **/integration_tests/.tests_output/ +**/.venv .VERSION diff --git a/Makefile b/Makefile index f10e016a..d3fe3e7c 100644 --- a/Makefile +++ b/Makefile @@ -43,10 +43,12 @@ test: .PHONY: test install-py-dev: + source python/.venv/bin/activate pip install -r python/requirements-dev.txt .PHONY: install-py-dev integration-test: + source python/.venv/bin/activate python -m pytest python/integration_tests .PHONY: integration-test diff --git a/python/integration_tests/helpers.py b/python/integration_tests/helpers.py index 3cbc4bfd..6c160e8a 100644 --- a/python/integration_tests/helpers.py +++ b/python/integration_tests/helpers.py @@ -1,6 +1,12 @@ +import orjson import subprocess -from os import environ +import time + +from kafka import KafkaProducer from pathlib import Path +from uuid import uuid4 +from sentry_protos.sentry.v1.taskworker_pb2 import RetryState, TaskActivation +from google.protobuf.timestamp_pb2 import Timestamp TASKBROKER_ROOT = Path(__file__).parent.parent.parent TASKBROKER_BIN = TASKBROKER_ROOT / "target/debug/taskbroker" @@ -69,27 +75,37 @@ def update_topic_partitions(topic_name: str, num_partitions: int) -> None: pass -def send_messages_to_kafka(num_messages: int) -> None: - path_to_sentry = environ.get("SENTRY_PATH") - if not path_to_sentry: - 
raise Exception("SENTRY_PATH not set in environment. This is required to send messages to kafka") +def serialize_task_activation(args: list, kwargs: dict) -> bytes: + retry_state = RetryState( + attempts=0, + kind="sentry.taskworker.retry.Retry", + discard_after_attempt=None, + deadletter_after_attempt=None, + ) + pending_task_payload = TaskActivation( + id=uuid4().hex, + namespace="integration_tests", + taskname="integration_tests.say_hello", + parameters=orjson.dumps({"args": args, "kwargs": kwargs}), + retry_state=retry_state, + received_at=Timestamp(seconds=int(time.time())), + ).SerializeToString() + + return pending_task_payload + +def send_messages_to_kafka(topic_name: str, num_messages: int) -> None: try: - send_tasks_cmd = [ - "cd", - path_to_sentry, - "&&", - "sentry", - "run", - "taskbroker-send-tasks", - "--task-function-path", - "sentry.taskworker.tasks.examples.say_hello", - "--args", - "'[\"foobar\"]'", - "--repeat", - str(num_messages), - ] - subprocess.run(send_tasks_cmd, check=True) + producer = KafkaProducer( + bootstrap_servers=['localhost:9092'], + ) + + for _ in range(num_messages): + task_message = serialize_task_activation(["foobar"], {}) + future = producer.send(topic_name, task_message) + future.get(timeout=10) # make sure messages were successfully sent + producer.flush() + producer.close() print(f"Sent {num_messages} messages to kafka") except Exception as e: raise Exception(f"Failed to send messages to kafka: {e}") diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 16435d20..5a6e1d50 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -36,7 +36,7 @@ def manage_consumer( process = subprocess.Popen( [consumer_path, "-c", config_file_path], stderr=subprocess.STDOUT, stdout=log_file ) - random.seed(random_seed) + # random.seed(random_seed) time.sleep(random.randint(min_sleep, max_sleep)) print( f"Sending SIGINT to consumer {consumer_index}, {iterations - i - 1} SIGINTs remaining" @@ -73,11 +73,11 @@ def test_tasks_written_once_during_rebalancing() -> None: """) # Ensure topic has correct number of partitions - if not check_topic_exists("task-worker"): - print(f"Task-worker topic does not exist, creating it with {num_partitions} partitions") + if not check_topic_exists(topic_name): + print(f"{topic_name} topic does not exist, creating it with {num_partitions} partitions") create_topic(topic_name, num_partitions) else: - print(f"Task-worker topic already exists, making sure it has {num_partitions} partitions") + print(f"{topic_name} topic already exists, making sure it has {num_partitions} partitions") update_topic_partitions(topic_name, num_partitions) # Create config files for consumers @@ -89,8 +89,8 @@ def test_tasks_written_once_during_rebalancing() -> None: consumer_configs[f"config_{i}.yml"] = { "db_name": db_name, "db_path": str(TESTS_OUTPUT_PATH / f"{db_name}.sqlite"), - "kafka_topic": "task-worker", - "kafka_consumer_group": "task-worker", + "kafka_topic": topic_name, + "kafka_consumer_group": topic_name, "kafka_auto_offset_reset": "earliest", "grpc_port": 50051 + i, } @@ -100,7 +100,7 @@ def test_tasks_written_once_during_rebalancing() -> None: yaml.safe_dump(config, f) try: - send_messages_to_kafka(num_messages) + send_messages_to_kafka(topic_name, num_messages) threads: list[Thread] = [] for i in range(num_consumers): thread = threading.Thread( @@ -150,7 +150,6 @@ def 
test_tasks_written_once_during_rebalancing() -> None: GROUP BY partition ORDER BY partition; """ - query = f"SELECT * FROM {consumer_configs['config_0.yml']['db_name']}.inflight_taskactivations;" con = sqlite3.connect(consumer_configs["config_0.yml"]["db_path"]) cur = con.cursor() diff --git a/python/requirements-dev.txt b/python/requirements-dev.txt index ba183e2f..d0a38afa 100644 --- a/python/requirements-dev.txt +++ b/python/requirements-dev.txt @@ -1,2 +1,6 @@ +kafka-python>=2.0.2 +orjson>=3.10.12 +protobuf>=5.29.0 pytest>=8.3.3 pyyaml>=6.0.2 +sentry-protos>=0.1.37 From 6690164de769d86b0f6f1ba21f3a143d3c25529d Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Fri, 29 Nov 2024 15:30:27 -0500 Subject: [PATCH 12/35] clean up --- .github/workflows/ci.yml | 6 ++++-- Makefile | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9ad5de86..d7a46ea9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -81,8 +81,10 @@ jobs: - name: Install Python Dependencies run: | - make install-py-dev + python -m venv python/.venv + . python/.venv/bin/activate + pip install -r python/requirements-dev.txt - name: Run Python Integration Tests run: | - make integration-test + python -m pytest python/integration_tests diff --git a/Makefile b/Makefile index d3fe3e7c..2dd09cea 100644 --- a/Makefile +++ b/Makefile @@ -43,12 +43,12 @@ test: .PHONY: test install-py-dev: - source python/.venv/bin/activate + . python/.venv/bin/activate pip install -r python/requirements-dev.txt .PHONY: install-py-dev integration-test: - source python/.venv/bin/activate + . python/.venv/bin/activate python -m pytest python/integration_tests .PHONY: integration-test From 189b57812f41a635b44ce7421760a033e608e68e Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Fri, 29 Nov 2024 15:37:38 -0500 Subject: [PATCH 13/35] accept test_seed environment variable --- python/integration_tests/test_consumer_rebalancing.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 5a6e1d50..d6716344 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -1,3 +1,4 @@ +import os import random import shutil import signal @@ -36,7 +37,6 @@ def manage_consumer( process = subprocess.Popen( [consumer_path, "-c", config_file_path], stderr=subprocess.STDOUT, stdout=log_file ) - # random.seed(random_seed) time.sleep(random.randint(min_sleep, max_sleep)) print( f"Sending SIGINT to consumer {consumer_index}, {iterations - i - 1} SIGINTs remaining" @@ -61,6 +61,10 @@ def test_tasks_written_once_during_rebalancing() -> None: topic_name = "task-worker" curr_time = int(time.time()) + random_seed = int(os.environ.get("TEST_SEED")) + if not random_seed: + random_seed = curr_time + print(f"""Running test with the following configuration: num of consumers: {num_consumers}, num of messages: {num_messages}, @@ -69,8 +73,9 @@ def test_tasks_written_once_during_rebalancing() -> None: min restart duration: {min_restart_duration} seconds, max restart duration: {max_restart_duration} seconds, topic name: {topic_name} - random seed value: {curr_time} + random seed value: {random_seed} """) + random.seed(curr_time) # Ensure topic has correct number of partitions if not check_topic_exists(topic_name): From 9d152c98db422d26adfdf8d4d55fe8e0f4a11b3f Mon Sep 17 00:00:00 2001 From: Enoch Tang 
Date: Fri, 29 Nov 2024 15:42:37 -0500 Subject: [PATCH 14/35] small fix --- python/integration_tests/test_consumer_rebalancing.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index d6716344..dc1a5ca5 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -61,9 +61,7 @@ def test_tasks_written_once_during_rebalancing() -> None: topic_name = "task-worker" curr_time = int(time.time()) - random_seed = int(os.environ.get("TEST_SEED")) - if not random_seed: - random_seed = curr_time + random_seed = int(os.environ.get("TEST_SEED")) if os.environ.get("TEST_SEED") else curr_time print(f"""Running test with the following configuration: num of consumers: {num_consumers}, From 537eaa88fbcf080d4a3b73e13504bf2cddc1144e Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Sat, 30 Nov 2024 12:55:27 -0500 Subject: [PATCH 15/35] use confluent-kafka --- python/integration_tests/helpers.py | 17 +++++++++-------- python/pyproject.toml | 1 - python/requirements-dev.txt | 6 +++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/python/integration_tests/helpers.py b/python/integration_tests/helpers.py index 6c160e8a..bcc7f491 100644 --- a/python/integration_tests/helpers.py +++ b/python/integration_tests/helpers.py @@ -2,7 +2,7 @@ import subprocess import time -from kafka import KafkaProducer +from confluent_kafka import Producer from pathlib import Path from uuid import uuid4 from sentry_protos.sentry.v1.taskworker_pb2 import RetryState, TaskActivation @@ -96,16 +96,17 @@ def serialize_task_activation(args: list, kwargs: dict) -> bytes: def send_messages_to_kafka(topic_name: str, num_messages: int) -> None: try: - producer = KafkaProducer( - bootstrap_servers=['localhost:9092'], - ) + producer = Producer({ + 'bootstrap.servers': 'localhost:9092', + 'broker.address.family': 'v4' + }) for _ in range(num_messages): task_message = serialize_task_activation(["foobar"], {}) - future = producer.send(topic_name, task_message) - future.get(timeout=10) # make sure messages were successfully sent + producer.produce(topic_name, task_message) + + producer.poll(5) # trigger delivery reports producer.flush() - producer.close() - print(f"Sent {num_messages} messages to kafka") + print(f"Sent {num_messages} messages to kafka topic {topic_name}") except Exception as e: raise Exception(f"Failed to send messages to kafka: {e}") diff --git a/python/pyproject.toml b/python/pyproject.toml index d7c27fbb..4a65d254 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -3,7 +3,6 @@ requires = ["setuptools>=42", "wheel"] build-backend = "setuptools.build_meta" [tool.pytest.ini_options] -asyncio_mode = "auto" testpaths = ["integration_tests"] python_files = ["test_*.py"] python_functions = ["test_*"] diff --git a/python/requirements-dev.txt b/python/requirements-dev.txt index d0a38afa..1853c2d8 100644 --- a/python/requirements-dev.txt +++ b/python/requirements-dev.txt @@ -1,6 +1,6 @@ -kafka-python>=2.0.2 -orjson>=3.10.12 -protobuf>=5.29.0 +confluent_kafka>=2.3.0 +orjson>=3.10.10 +protobuf>=5.28.3 pytest>=8.3.3 pyyaml>=6.0.2 sentry-protos>=0.1.37 From 2f3868a4406fa095dc3dbc78a5e840c6d7baf780 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Mon, 2 Dec 2024 14:15:26 -0500 Subject: [PATCH 16/35] print query result --- python/integration_tests/test_consumer_rebalancing.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 
deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index dc1a5ca5..b90fbacf 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -146,7 +146,7 @@ def test_tasks_written_once_during_rebalancing() -> None: partition, (max(offset) - min(offset)) + 1 AS offset_diff, count(*) AS occ, - (max(offset) - min(offset)) + 1 - count(offset) AS delta + (max(offset) - min(offset)) + 1 - count(*) AS delta FROM ( {from_stmt} ) @@ -157,10 +157,11 @@ def test_tasks_written_once_during_rebalancing() -> None: con = sqlite3.connect(consumer_configs["config_0.yml"]["db_path"]) cur = con.cursor() cur.executescript(attach_db_stmt) - res = cur.execute(query) + res = cur.execute(query).fetchall() + print(res) assert all( - [res[3] == 0 for res in res.fetchall()] + [row[3] == 0 for row in res] ) # Assert that each value in the delta (fourth) column is 0 print("Taskbroker integration test completed successfully.") From 1c83e311bccf7d6a1cc77acae8a1e547e2d8bb23 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Mon, 2 Dec 2024 15:28:09 -0500 Subject: [PATCH 17/35] upload artifact --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d7a46ea9..c5929df1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -87,4 +87,5 @@ jobs: - name: Run Python Integration Tests run: | + export PYTEST_ADDOPTS="" python -m pytest python/integration_tests From 242c0476e1cf6efe24983c48b521cf660cd6a80e Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 14:59:08 -0800 Subject: [PATCH 18/35] pretty print output --- .../test_consumer_rebalancing.py | 54 ++++++++++++------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index b90fbacf..83521a1a 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -17,25 +17,29 @@ check_topic_exists, create_topic, update_topic_partitions, - send_messages_to_kafka + send_messages_to_kafka, ) def manage_consumer( - consumer_index: int, - consumer_path: str, - config_file_path: str, - iterations: int, - min_sleep: int, - max_sleep: int, - random_seed: int, - log_file_path: str, + consumer_index: int, + consumer_path: str, + config_file_path: str, + iterations: int, + min_sleep: int, + max_sleep: int, + random_seed: int, + log_file_path: str, ) -> None: with open(log_file_path, "a") as log_file: - print(f"Starting consumer {consumer_index}, writing log file to {log_file_path}") + print( + f"Starting consumer {consumer_index}, writing log file to {log_file_path}" + ) for i in range(iterations): process = subprocess.Popen( - [consumer_path, "-c", config_file_path], stderr=subprocess.STDOUT, stdout=log_file + [consumer_path, "-c", config_file_path], + stderr=subprocess.STDOUT, + stdout=log_file, ) time.sleep(random.randint(min_sleep, max_sleep)) print( @@ -54,16 +58,19 @@ def test_tasks_written_once_during_rebalancing() -> None: consumer_path = str(TASKBROKER_BIN) num_consumers = 8 num_messages = 80_000 - num_restarts = 4 + num_restarts = 1 num_partitions = 32 min_restart_duration = 5 max_restart_duration = 20 topic_name = "task-worker" curr_time = int(time.time()) - random_seed = int(os.environ.get("TEST_SEED")) if os.environ.get("TEST_SEED") else 
curr_time + random_seed = ( + int(os.environ.get("TEST_SEED")) if os.environ.get("TEST_SEED") else curr_time + ) - print(f"""Running test with the following configuration: + print( + f"""Running test with the following configuration: num of consumers: {num_consumers}, num of messages: {num_messages}, num of restarts: {num_restarts}, @@ -72,15 +79,20 @@ def test_tasks_written_once_during_rebalancing() -> None: max restart duration: {max_restart_duration} seconds, topic name: {topic_name} random seed value: {random_seed} - """) + """ + ) random.seed(curr_time) # Ensure topic has correct number of partitions if not check_topic_exists(topic_name): - print(f"{topic_name} topic does not exist, creating it with {num_partitions} partitions") + print( + f"{topic_name} topic does not exist, creating it with {num_partitions} partitions" + ) create_topic(topic_name, num_partitions) else: - print(f"{topic_name} topic already exists, making sure it has {num_partitions} partitions") + print( + f"{topic_name} topic already exists, making sure it has {num_partitions} partitions" + ) update_topic_partitions(topic_name, num_partitions) # Create config files for consumers @@ -158,7 +170,13 @@ def test_tasks_written_once_during_rebalancing() -> None: cur = con.cursor() cur.executescript(attach_db_stmt) res = cur.execute(query).fetchall() - print(res) + print( + f"\n{'Partition'.rjust(16)}{'Expected'.rjust(16)}{'Actual'.rjust(16)}{'Diff'.rjust(16)}" + ) + for partition, expected_row_count, actual_row_count, diff in res: + print( + f"{str(partition).rjust(16)}{str(expected_row_count).rjust(16)}{str(actual_row_count).rjust(16)}{str(diff).rjust(16)}" + ) assert all( [row[3] == 0 for row in res] From 8dfbc71dcc54c1e6559d02cd537106038da7dcef Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 15:14:18 -0800 Subject: [PATCH 19/35] show duplicate rows --- .../test_consumer_rebalancing.py | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 83521a1a..7185fecb 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -65,9 +65,7 @@ def test_tasks_written_once_during_rebalancing() -> None: topic_name = "task-worker" curr_time = int(time.time()) - random_seed = ( - int(os.environ.get("TEST_SEED")) if os.environ.get("TEST_SEED") else curr_time - ) + random_seed = 1733180863 print( f"""Running test with the following configuration: @@ -181,7 +179,21 @@ def test_tasks_written_once_during_rebalancing() -> None: assert all( [row[3] == 0 for row in res] ) # Assert that each value in the delta (fourth) column is 0 - print("Taskbroker integration test completed successfully.") + + query = f""" + SELECT partition, offset, count(*) as count + FROM ( + {from_stmt} + ) + GROUP BY partition, offset + HAVING count > 1 + """ + res = cur.execute(query).fetchall() + print(f"\n{'Partition'.rjust(16)}{'Offset'.rjust(16)}{'count'.rjust(16)}") + for partition, offset, count in res: + print( + f"{str(partition).rjust(16)}{str(offset).rjust(16)}{str(count).rjust(16)}" + ) # Clean up test output files print(f"Cleaning up test output files in {TESTS_OUTPUT_PATH}") From a89166610755556c9b8ab74ed76ba492b97362e8 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 15:19:41 -0800 Subject: [PATCH 20/35] move assertion to end of test --- .../integration_tests/test_consumer_rebalancing.py | 12 ++++++------ 1 file 
changed, 6 insertions(+), 6 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 7185fecb..93d94c3c 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -167,19 +167,15 @@ def test_tasks_written_once_during_rebalancing() -> None: con = sqlite3.connect(consumer_configs["config_0.yml"]["db_path"]) cur = con.cursor() cur.executescript(attach_db_stmt) - res = cur.execute(query).fetchall() + row_count = cur.execute(query).fetchall() print( f"\n{'Partition'.rjust(16)}{'Expected'.rjust(16)}{'Actual'.rjust(16)}{'Diff'.rjust(16)}" ) - for partition, expected_row_count, actual_row_count, diff in res: + for partition, expected_row_count, actual_row_count, diff in row_count: print( f"{str(partition).rjust(16)}{str(expected_row_count).rjust(16)}{str(actual_row_count).rjust(16)}{str(diff).rjust(16)}" ) - assert all( - [row[3] == 0 for row in res] - ) # Assert that each value in the delta (fourth) column is 0 - query = f""" SELECT partition, offset, count(*) as count FROM ( @@ -195,6 +191,10 @@ def test_tasks_written_once_during_rebalancing() -> None: f"{str(partition).rjust(16)}{str(offset).rjust(16)}{str(count).rjust(16)}" ) + assert all( + [row[3] == 0 for row in row_count] + ) # Assert that each value in the delta (fourth) column is 0 + # Clean up test output files print(f"Cleaning up test output files in {TESTS_OUTPUT_PATH}") shutil.rmtree(TESTS_OUTPUT_PATH) From 537a7c3ea037b7a9e43c6e6523ecdb1e98d56ed5 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 15:30:08 -0800 Subject: [PATCH 21/35] print inserted offset and partition --- .github/workflows/ci.yml | 2 +- python/integration_tests/test_consumer_rebalancing.py | 5 +++++ src/consumer/inflight_activation_writer.rs | 7 +++++-- src/consumer/kafka.rs | 1 - 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c5929df1..215e4e5c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -88,4 +88,4 @@ jobs: - name: Run Python Integration Tests run: | export PYTEST_ADDOPTS="" - python -m pytest python/integration_tests + python -m pytest python/integration_tests -s -vv diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 93d94c3c..fd3dab05 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -191,6 +191,11 @@ def test_tasks_written_once_during_rebalancing() -> None: f"{str(partition).rjust(16)}{str(offset).rjust(16)}{str(count).rjust(16)}" ) + for i in range(num_consumers): + print(f"=== consumer {i} log ===") + with open(str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r") as f: + print(f.read()) + assert all( [row[3] == 0 for row in row_count] ) # Assert that each value in the delta (fourth) column is 0 diff --git a/src/consumer/inflight_activation_writer.rs b/src/consumer/inflight_activation_writer.rs index cbf4411b..67aacbe2 100644 --- a/src/consumer/inflight_activation_writer.rs +++ b/src/consumer/inflight_activation_writer.rs @@ -63,14 +63,17 @@ impl Reducer for InflightActivationWriter { if self.buffer.is_empty() { return Ok(()); } - let res = self + info!( + "Inserting ({}, {})", + self.buffer[0].partition, self.buffer[0].offset + ); + let _ = self .store .store(replace( &mut self.buffer, 
Vec::with_capacity(self.config.max_buf_len), )) .await?; - info!("Inserted {:?} entries", res.rows_affected); Ok(()) } diff --git a/src/consumer/kafka.rs b/src/consumer/kafka.rs index bf77b0bb..d8b58b9d 100644 --- a/src/consumer/kafka.rs +++ b/src/consumer/kafka.rs @@ -791,7 +791,6 @@ pub async fn commit( _rendezvous_guard: oneshot::Sender<()>, ) -> Result<(), Error> { while let Some(msgs) = receiver.recv().await { - debug!("Storing offsets"); let mut highwater_mark = HighwaterMark::new(); msgs.0.iter().for_each(|msg| highwater_mark.track(msg)); consumer.store_offsets(&highwater_mark.into()).unwrap(); From c5dabdaa1a0f32060bcf55d877a676e7031e0505 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 15:54:26 -0800 Subject: [PATCH 22/35] print highwater mark stored --- src/consumer/kafka.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/consumer/kafka.rs b/src/consumer/kafka.rs index d8b58b9d..35d73009 100644 --- a/src/consumer/kafka.rs +++ b/src/consumer/kafka.rs @@ -748,7 +748,7 @@ impl CommitClient for StreamConsumer { } } -#[derive(Default)] +#[derive(Default, Debug)] struct HighwaterMark { data: HashMap<(String, i32), i64>, } @@ -793,6 +793,7 @@ pub async fn commit( while let Some(msgs) = receiver.recv().await { let mut highwater_mark = HighwaterMark::new(); msgs.0.iter().for_each(|msg| highwater_mark.track(msg)); + debug!("Storing {:?}", highwater_mark); consumer.store_offsets(&highwater_mark.into()).unwrap(); } debug!("Shutdown complete"); From 96a4e9ced6ac364f375e2f77f868086ea385eac6 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 16:36:16 -0800 Subject: [PATCH 23/35] revert 2b9064c4f9cae9e8babfcdf9024c494192645c53 --- src/consumer/kafka.rs | 45 +++++++++++++++---------------------------- 1 file changed, 15 insertions(+), 30 deletions(-) diff --git a/src/consumer/kafka.rs b/src/consumer/kafka.rs index 35d73009..1f5fcc11 100644 --- a/src/consumer/kafka.rs +++ b/src/consumer/kafka.rs @@ -32,7 +32,7 @@ use tokio::{ mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, oneshot, }, - task::{self, JoinError, JoinSet}, + task::{self, JoinSet}, time::{self, sleep, MissedTickBehavior}, }; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -60,46 +60,32 @@ pub async fn start_consumer( .subscribe(topics) .expect("Can't subscribe to specified topics"); - select! 
{ - res = handle_os_signals(event_sender.clone()) => { - info!("Received shutdown signal, shutting down consumer"); - match res { - Ok(res) => Ok(res), - Err(e) => Err(anyhow!("Error in OS signals handler: {}", e)), - } - } - res = handle_consumer_client(consumer.clone(), client_shutdown_receiver) => { - info!("Consumer client shutdown"); - match res { - Ok(res) => Ok(res), - Err(e) => Err(anyhow!("Error in consumer client: {}", e)), - } - } - res = handle_events(consumer, event_receiver, client_shutdown_sender, spawn_actors) => { - info!("Events handler shutdown"); - res - } - } + handle_os_signals(event_sender.clone()); + handle_consumer_client(consumer.clone(), client_shutdown_receiver); + handle_events( + consumer, + event_receiver, + client_shutdown_sender, + spawn_actors, + ) + .await } -pub async fn handle_os_signals( - event_sender: UnboundedSender<(Event, SyncSender<()>)>, -) -> Result<(), JoinError> { +pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>) { let guard = elegant_departure::get_shutdown_guard(); tokio::spawn(async move { let _ = guard.wait().await; info!("Cancellation token received, shutting down consumer"); let (rendezvous_sender, _) = sync_channel(0); let _ = event_sender.send((Event::Shutdown, rendezvous_sender)); - }) - .await + }); } #[instrument(skip(consumer, shutdown))] -pub async fn handle_consumer_client( +pub fn handle_consumer_client( consumer: Arc>, shutdown: oneshot::Receiver<()>, -) -> Result<(), JoinError> { +) { task::spawn_blocking(|| { Handle::current().block_on(async move { select! { @@ -114,8 +100,7 @@ pub async fn handle_consumer_client( } debug!("Shutdown complete"); }); - }) - .await + }); } #[derive(Debug)] From 22ec37aa8fd0d959e6406d5c0433163328237991 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 16:43:35 -0800 Subject: [PATCH 24/35] remove extra logging --- .../test_consumer_rebalancing.py | 21 ++++++++++++------- src/consumer/inflight_activation_writer.rs | 7 ++----- src/consumer/kafka.rs | 3 +-- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index fd3dab05..aeb38ed8 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -191,15 +191,20 @@ def test_tasks_written_once_during_rebalancing() -> None: f"{str(partition).rjust(16)}{str(offset).rjust(16)}{str(count).rjust(16)}" ) - for i in range(num_consumers): - print(f"=== consumer {i} log ===") - with open(str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r") as f: - print(f.read()) - - assert all( - [row[3] == 0 for row in row_count] - ) # Assert that each value in the delta (fourth) column is 0 + if not all([row[3] == 0 for row in row_count]): + print( + "Test failed! 
Got duplicate/missing kafka messages in sqlite, dumping logs" + ) + for i in range(num_consumers): + print(f"=== consumer {i} log ===") + with open( + str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r" + ) as f: + print(f.read()) # Clean up test output files print(f"Cleaning up test output files in {TESTS_OUTPUT_PATH}") shutil.rmtree(TESTS_OUTPUT_PATH) + + if not all([row[3] == 0 for row in row_count]): + assert False diff --git a/src/consumer/inflight_activation_writer.rs b/src/consumer/inflight_activation_writer.rs index 67aacbe2..cbf4411b 100644 --- a/src/consumer/inflight_activation_writer.rs +++ b/src/consumer/inflight_activation_writer.rs @@ -63,17 +63,14 @@ impl Reducer for InflightActivationWriter { if self.buffer.is_empty() { return Ok(()); } - info!( - "Inserting ({}, {})", - self.buffer[0].partition, self.buffer[0].offset - ); - let _ = self + let res = self .store .store(replace( &mut self.buffer, Vec::with_capacity(self.config.max_buf_len), )) .await?; + info!("Inserted {:?} entries", res.rows_affected); Ok(()) } diff --git a/src/consumer/kafka.rs b/src/consumer/kafka.rs index 1f5fcc11..7ec232db 100644 --- a/src/consumer/kafka.rs +++ b/src/consumer/kafka.rs @@ -733,7 +733,7 @@ impl CommitClient for StreamConsumer { } } -#[derive(Default, Debug)] +#[derive(Default)] struct HighwaterMark { data: HashMap<(String, i32), i64>, } @@ -778,7 +778,6 @@ pub async fn commit( while let Some(msgs) = receiver.recv().await { let mut highwater_mark = HighwaterMark::new(); msgs.0.iter().for_each(|msg| highwater_mark.track(msg)); - debug!("Storing {:?}", highwater_mark); consumer.store_offsets(&highwater_mark.into()).unwrap(); } debug!("Shutdown complete"); From 40908029b0ab09ab7a47e6b4640684a11eaf5759 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 16:54:29 -0800 Subject: [PATCH 25/35] add more restarts --- python/integration_tests/test_consumer_rebalancing.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index aeb38ed8..8d4cfe07 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -58,10 +58,10 @@ def test_tasks_written_once_during_rebalancing() -> None: consumer_path = str(TASKBROKER_BIN) num_consumers = 8 num_messages = 80_000 - num_restarts = 1 + num_restarts = 8 num_partitions = 32 - min_restart_duration = 5 - max_restart_duration = 20 + min_restart_duration = 1 + max_restart_duration = 12 topic_name = "task-worker" curr_time = int(time.time()) From 86eccafc1cf0408d3e83a93987fd16eacde7d933 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 17:00:17 -0800 Subject: [PATCH 26/35] increase max pending count --- python/integration_tests/test_consumer_rebalancing.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 8d4cfe07..dfede22e 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -102,6 +102,7 @@ def test_tasks_written_once_during_rebalancing() -> None: consumer_configs[f"config_{i}.yml"] = { "db_name": db_name, "db_path": str(TESTS_OUTPUT_PATH / f"{db_name}.sqlite"), + "max_pending_count": 8192, "kafka_topic": topic_name, "kafka_consumer_group": topic_name, "kafka_auto_offset_reset": "earliest", @@ -154,9 +155,9 @@ def 
test_tasks_written_once_during_rebalancing() -> None: query = f""" SELECT partition, - (max(offset) - min(offset)) + 1 AS offset_diff, - count(*) AS occ, - (max(offset) - min(offset)) + 1 - count(*) AS delta + (max(offset) - min(offset)) + 1 AS expected, + count(*) AS actual, + (max(offset) - min(offset)) + 1 - count(*) AS diff FROM ( {from_stmt} ) @@ -168,6 +169,7 @@ def test_tasks_written_once_during_rebalancing() -> None: cur = con.cursor() cur.executescript(attach_db_stmt) row_count = cur.execute(query).fetchall() + print(query) print( f"\n{'Partition'.rjust(16)}{'Expected'.rjust(16)}{'Actual'.rjust(16)}{'Diff'.rjust(16)}" ) @@ -185,6 +187,7 @@ def test_tasks_written_once_during_rebalancing() -> None: HAVING count > 1 """ res = cur.execute(query).fetchall() + print(query) print(f"\n{'Partition'.rjust(16)}{'Offset'.rjust(16)}{'count'.rjust(16)}") for partition, offset, count in res: print( From b99a6626088991e343b177cfb0a030831db97844 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 17:12:04 -0800 Subject: [PATCH 27/35] more restarts --- python/integration_tests/test_consumer_rebalancing.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index dfede22e..ee16cc22 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -58,10 +58,10 @@ def test_tasks_written_once_during_rebalancing() -> None: consumer_path = str(TASKBROKER_BIN) num_consumers = 8 num_messages = 80_000 - num_restarts = 8 + num_restarts = 16 num_partitions = 32 min_restart_duration = 1 - max_restart_duration = 12 + max_restart_duration = 24 topic_name = "task-worker" curr_time = int(time.time()) @@ -146,9 +146,9 @@ def test_tasks_written_once_during_rebalancing() -> None: for config in consumer_configs.values() ] ) - from_stmt = "\nUNION ALL\n".join( + from_stmt = "\n UNION ALL\n".join( [ - f" SELECT * FROM {config['db_name']}.inflight_taskactivations" + f" SELECT * FROM {config['db_name']}.inflight_taskactivations" for config in consumer_configs.values() ] ) @@ -159,7 +159,7 @@ def test_tasks_written_once_during_rebalancing() -> None: count(*) AS actual, (max(offset) - min(offset)) + 1 - count(*) AS diff FROM ( - {from_stmt} +{from_stmt} ) GROUP BY partition ORDER BY partition; From 1498b26fa074a0aa5665d9e0b72fde4f41154508 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 17:18:53 -0800 Subject: [PATCH 28/35] more restart durations --- python/integration_tests/test_consumer_rebalancing.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index ee16cc22..71da334a 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -43,7 +43,7 @@ def manage_consumer( ) time.sleep(random.randint(min_sleep, max_sleep)) print( - f"Sending SIGINT to consumer {consumer_index}, {iterations - i - 1} SIGINTs remaining" + f"Sending SIGINT to consumer {consumer_index}, {iterations - i - 1} SIGINTs remaining for that consumer" ) process.send_signal(signal.SIGINT) try: @@ -61,7 +61,7 @@ def test_tasks_written_once_during_rebalancing() -> None: num_restarts = 16 num_partitions = 32 min_restart_duration = 1 - max_restart_duration = 24 + max_restart_duration = 30 topic_name = "task-worker" curr_time = 
int(time.time()) @@ -171,7 +171,7 @@ def test_tasks_written_once_during_rebalancing() -> None: row_count = cur.execute(query).fetchall() print(query) print( - f"\n{'Partition'.rjust(16)}{'Expected'.rjust(16)}{'Actual'.rjust(16)}{'Diff'.rjust(16)}" + f"{'Partition'.rjust(16)}{'Expected'.rjust(16)}{'Actual'.rjust(16)}{'Diff'.rjust(16)}" ) for partition, expected_row_count, actual_row_count, diff in row_count: print( @@ -181,7 +181,7 @@ def test_tasks_written_once_during_rebalancing() -> None: query = f""" SELECT partition, offset, count(*) as count FROM ( - {from_stmt} +{from_stmt} ) GROUP BY partition, offset HAVING count > 1 From 34c98cb329ecadb7e91a6859254092f18f60ee97 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 17:23:07 -0800 Subject: [PATCH 29/35] better logs --- python/integration_tests/test_consumer_rebalancing.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 71da334a..5fabe0f8 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -169,6 +169,7 @@ def test_tasks_written_once_during_rebalancing() -> None: cur = con.cursor() cur.executescript(attach_db_stmt) row_count = cur.execute(query).fetchall() + print("======== Verify number of rows based on max and min offset ========") print(query) print( f"{'Partition'.rjust(16)}{'Expected'.rjust(16)}{'Actual'.rjust(16)}{'Diff'.rjust(16)}" @@ -187,6 +188,7 @@ def test_tasks_written_once_during_rebalancing() -> None: HAVING count > 1 """ res = cur.execute(query).fetchall() + print("======== Verify all (partition, offset) are unique ========") print(query) print(f"\n{'Partition'.rjust(16)}{'Offset'.rjust(16)}{'count'.rjust(16)}") for partition, offset, count in res: From feccd50377ecc7f4907aeec3623dc96501a7555c Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 18:19:36 -0800 Subject: [PATCH 30/35] pretty print output --- Makefile | 6 ++++-- .../integration_tests/test_consumer_rebalancing.py | 13 ++++++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/Makefile b/Makefile index 2dd09cea..12ba7251 100644 --- a/Makefile +++ b/Makefile @@ -38,18 +38,20 @@ format: ## Run autofix mode for formatting and lint # Tests -test: +unit-test: cargo test .PHONY: test install-py-dev: + python -m venv python/.venv . python/.venv/bin/activate pip install -r python/requirements-dev.txt .PHONY: install-py-dev integration-test: + python -m venv python/.venv . 
python/.venv/bin/activate - python -m pytest python/integration_tests + python -m pytest python/integration_tests -s -vv .PHONY: integration-test # Help diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 5fabe0f8..722a63d3 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -68,7 +68,8 @@ def test_tasks_written_once_during_rebalancing() -> None: random_seed = 1733180863 print( - f"""Running test with the following configuration: + f""" +Running test with the following configuration: num of consumers: {num_consumers}, num of messages: {num_messages}, num of restarts: {num_restarts}, @@ -152,8 +153,7 @@ def test_tasks_written_once_during_rebalancing() -> None: for config in consumer_configs.values() ] ) - query = f""" - SELECT + query = f""" SELECT partition, (max(offset) - min(offset)) + 1 AS expected, count(*) AS actual, @@ -170,7 +170,9 @@ def test_tasks_written_once_during_rebalancing() -> None: cur.executescript(attach_db_stmt) row_count = cur.execute(query).fetchall() print("======== Verify number of rows based on max and min offset ========") + print("Query:") print(query) + print("Result:") print( f"{'Partition'.rjust(16)}{'Expected'.rjust(16)}{'Actual'.rjust(16)}{'Diff'.rjust(16)}" ) @@ -179,8 +181,7 @@ def test_tasks_written_once_during_rebalancing() -> None: f"{str(partition).rjust(16)}{str(expected_row_count).rjust(16)}{str(actual_row_count).rjust(16)}{str(diff).rjust(16)}" ) - query = f""" - SELECT partition, offset, count(*) as count + query = f""" SELECT partition, offset, count(*) as count FROM ( {from_stmt} ) @@ -189,7 +190,9 @@ def test_tasks_written_once_during_rebalancing() -> None: """ res = cur.execute(query).fetchall() print("======== Verify all (partition, offset) are unique ========") + print("Query:") print(query) + print("Result:") print(f"\n{'Partition'.rjust(16)}{'Offset'.rjust(16)}{'count'.rjust(16)}") for partition, offset, count in res: print( From fa44ef85f1863384fbd57372d848106b257ae309 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 18:30:26 -0800 Subject: [PATCH 31/35] pretty print output --- python/integration_tests/test_consumer_rebalancing.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 722a63d3..902f8e9c 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -162,14 +162,13 @@ def test_tasks_written_once_during_rebalancing() -> None: {from_stmt} ) GROUP BY partition - ORDER BY partition; - """ + ORDER BY partition;""" con = sqlite3.connect(consumer_configs["config_0.yml"]["db_path"]) cur = con.cursor() cur.executescript(attach_db_stmt) row_count = cur.execute(query).fetchall() - print("======== Verify number of rows based on max and min offset ========") + print("\n======== Verify number of rows based on max and min offset ========") print("Query:") print(query) print("Result:") @@ -186,10 +185,9 @@ def test_tasks_written_once_during_rebalancing() -> None: {from_stmt} ) GROUP BY partition, offset - HAVING count > 1 - """ + HAVING count > 1""" res = cur.execute(query).fetchall() - print("======== Verify all (partition, offset) are unique ========") + print("\n======== Verify all (partition, offset) are unique ========") print("Query:") print(query) print("Result:") 
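The verification these patches keep refining reduces to one idea: attach every consumer's sqlite file to a single connection, union their inflight_taskactivations tables, and check that each partition's offsets are gap-free and unique. A condensed sketch of that check, assuming two illustrative database files (the test builds the real list from each consumer's generated config and attaches them with executescript):

    import sqlite3

    db_paths = ["consumer_0.sqlite", "consumer_1.sqlite"]  # illustrative paths

    con = sqlite3.connect(":memory:")  # scratch connection to attach everything to
    cur = con.cursor()
    for i, path in enumerate(db_paths):
        cur.execute(f"ATTACH DATABASE '{path}' AS db_{i}")

    union = " UNION ALL ".join(
        f"SELECT partition, offset FROM db_{i}.inflight_taskactivations"
        for i in range(len(db_paths))
    )
    # With no duplicates and no gaps, (max(offset) - min(offset)) + 1 equals
    # the row count per partition; the second query in the test exists because
    # a duplicate and a gap of equal size would cancel out here.
    rows = cur.execute(
        f"SELECT partition, (max(offset) - min(offset)) + 1 - count(*) AS delta "
        f"FROM ({union}) GROUP BY partition"
    ).fetchall()
    assert all(delta == 0 for _, delta in rows)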
From 8b91ceeb4c634fca6ce98bc1da7bb45d9c41a031 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 2 Dec 2024 19:00:23 -0800 Subject: [PATCH 32/35] pretty print output --- python/integration_tests/test_consumer_rebalancing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 902f8e9c..51a4caf4 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -191,7 +191,7 @@ def test_tasks_written_once_during_rebalancing() -> None: print("Query:") print(query) print("Result:") - print(f"\n{'Partition'.rjust(16)}{'Offset'.rjust(16)}{'count'.rjust(16)}") + print(f"{'Partition'.rjust(16)}{'Offset'.rjust(16)}{'count'.rjust(16)}") for partition, offset, count in res: print( f"{str(partition).rjust(16)}{str(offset).rjust(16)}{str(count).rjust(16)}" ) From b3c53f2d428bcd967b17903257ed569aa61f38c1 Mon Sep 17 00:00:00 2001 From: John Yang Date: Tue, 3 Dec 2024 07:39:04 -0800 Subject: [PATCH 33/35] add cargo build to integration test recipe --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index 12ba7251..38826ecf 100644 --- a/Makefile +++ b/Makefile @@ -49,6 +49,7 @@ install-py-dev: .PHONY: install-py-dev integration-test: + cargo build python -m venv python/.venv . python/.venv/bin/activate python -m pytest python/integration_tests -s -vv .PHONY: integration-test From 5f2553982cdb38aa710d9e84028a7e46dc31853b Mon Sep 17 00:00:00 2001 From: John Yang Date: Tue, 3 Dec 2024 07:49:40 -0800 Subject: [PATCH 34/35] add more messages --- python/integration_tests/test_consumer_rebalancing.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 51a4caf4..20e5066f 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -57,11 +57,11 @@ def test_tasks_written_once_during_rebalancing() -> None: # Test configuration consumer_path = str(TASKBROKER_BIN) num_consumers = 8 - num_messages = 80_000 + num_messages = 100_000 num_restarts = 16 num_partitions = 32 min_restart_duration = 1 - max_restart_duration = 30 + max_restart_duration = 10 topic_name = "task-worker" curr_time = int(time.time()) @@ -103,7 +103,7 @@ def test_tasks_written_once_during_rebalancing() -> None: consumer_configs[f"config_{i}.yml"] = { "db_name": db_name, "db_path": str(TESTS_OUTPUT_PATH / f"{db_name}.sqlite"), - "max_pending_count": 8192, + "max_pending_count": 16384, "kafka_topic": topic_name, "kafka_consumer_group": topic_name, "kafka_auto_offset_reset": "earliest", From f3261b87bd60efa099e3fde9f74d461004d96bd9 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Tue, 3 Dec 2024 12:05:07 -0500 Subject: [PATCH 35/35] clean up and fix seed value --- python/integration_tests/test_consumer_rebalancing.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 20e5066f..15121762 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -1,4 +1,3 @@ -import os import random import shutil import signal @@ -28,7 +27,6 @@ def manage_consumer( iterations: int, min_sleep: int, max_sleep: int, - random_seed: int, log_file_path: str, ) -> None: with
open(log_file_path, "a") as log_file: @@ -65,8 +63,6 @@ def test_tasks_written_once_during_rebalancing() -> None: topic_name = "task-worker" curr_time = int(time.time()) - random_seed = 1733180863 - print( f""" Running test with the following configuration: @@ -77,10 +73,10 @@ def test_tasks_written_once_during_rebalancing() -> None: min restart duration: {min_restart_duration} seconds, max restart duration: {max_restart_duration} seconds, topic name: {topic_name} - random seed value: {random_seed} + random seed value: 42 """ ) - random.seed(curr_time) + random.seed(42) # Ensure topic has correct number of partitions if not check_topic_exists(topic_name): @@ -127,7 +123,6 @@ def test_tasks_written_once_during_rebalancing() -> None: num_restarts, min_restart_duration, max_restart_duration, - curr_time, str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), ), )
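Patch 35 pins the seed, so the sleep intervals driving each SIGINT now come from a reproducible sequence. A small sketch of what that buys, using the suite's final constants (8 consumers, 16 restarts, 1 to 10 second sleeps); note that the consumer threads in the test share one module-level RNG, so which thread consumes which draw still depends on scheduling:

    import random

    random.seed(42)
    # Same seed, same draw sequence: the restart timing behind a failing CI
    # run can be regenerated locally instead of chasing a fresh interleaving.
    draws = [random.randint(1, 10) for _ in range(8 * 16)]
    print(draws[:16])  # stable across runs for a given Python version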