Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions server/php-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "server/php-engine.h"
#include "server/php-script-run-once-invoker.h"

#include <cassert>
#include <cerrno>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <fcntl.h>
#include <fstream>
#include <limits>
#include <netdb.h>
#include <netinet/in.h>
#include <optional>
#include <poll.h>
#include "re2/re2.h"
#include <string>
Expand Down Expand Up @@ -838,7 +841,6 @@ int rpcx_func_wakeup(connection *c) {

int rpcx_func_close(connection *c, int who __attribute__((unused))) {
auto *D = TCP_RPC_DATA(c);

auto *worker = reinterpret_cast<PhpWorker *>(D->extra);
if (worker != nullptr) {
worker->terminate(1, script_error_t::rpc_connection_close, "rpc connection close");
Expand Down Expand Up @@ -1439,7 +1441,7 @@ void reopen_json_log() {
}
}

void generic_event_loop(WorkerType worker_type, bool invoke_dummy_self_rpc_request) noexcept {
void generic_event_loop(WorkerType worker_type, bool run_once_mode_enabled) noexcept {
if (master_flag && logname_pattern != nullptr) {
reopen_logs();
reopen_json_log();
Expand All @@ -1450,6 +1452,9 @@ void generic_event_loop(WorkerType worker_type, bool invoke_dummy_self_rpc_reque
double last_cron_time = 0;
double next_create_outbound = 0;

// Runner for --once=N mode, initialized only when needed. In special run once prefork mode it's initialized ONLY at general workers
auto &run_once_invoker = vk::singleton<PhpScriptRunOnceInvoker>::get();

switch (worker_type) {
case WorkerType::general_worker: {
const auto &http_server_ctx = vk::singleton<HttpServerContext>::get();
Expand All @@ -1465,23 +1470,8 @@ void generic_event_loop(WorkerType worker_type, bool invoke_dummy_self_rpc_reque
rpc_sfd = rpc_server_ctx.worker_socket_fd();
}

if (invoke_dummy_self_rpc_request) {
int pipe_fd[2];
pipe(pipe_fd);

int read_fd = pipe_fd[0];
int write_fd = pipe_fd[1];

rpc_client_methods.rpc_ready = nullptr;
epoll_insert_pipe(pipe_for_read, read_fd, &ct_php_rpc_client, &rpc_client_methods);

int q[6];
int qsize = 6 * sizeof(int);
q[2] = TL_RPC_INVOKE_REQ;
for (int i = 0; i < run_once_count; i++) {
prepare_rpc_query_raw(i, q, qsize, crc32c_partial);
assert(write(write_fd, q, (size_t)qsize) == qsize);
}
if (run_once_mode_enabled) {
run_once_invoker.init(run_once_count);
}

if (http_sfd >= 0) {
Expand Down Expand Up @@ -1547,6 +1537,10 @@ void generic_event_loop(WorkerType worker_type, bool invoke_dummy_self_rpc_reque
vkprintf (1, "epoll_work(): %d out of %d connections, network buffers: %d used, %d out of %d allocated\n",
active_connections, maxconn, NB_used, NB_alloc, NB_max);
}
// Continue sending run_once messages in batches to avoid pipe buffer overflow. Do it ONLY if we don't have some php scripts running
if (run_once_invoker.enabled() && run_once_invoker.has_pending() && !php_worker_run_flag) {
run_once_invoker.invoke_run_once();
}

epoll_work(57);

Expand Down Expand Up @@ -2430,8 +2424,13 @@ bool check_server_options() {
const auto& rpc_server_ctx = vk::singleton<RpcServerContext>::get();
size_t general_workers_cnt = vk::singleton<WorkersControl>::get().get_count(WorkerType::general_worker);

if (run_once && (http_server_ctx.server_enabled() || rpc_server_ctx.server_enabled())) {
kprintf("You can't start RPC or HTTP server in run-once mode\n");
return false;
}

if (!master_flag && (http_server_ctx.server_enabled() || rpc_server_ctx.server_enabled())) {
kprintf("Server mode is not supported without workers, yo must specify -f/--workers-num <n>\n");
kprintf("Server mode is not supported without workers, you must specify -f/--workers-num <n>\n");
return false;
}

Expand Down
1 change: 1 addition & 0 deletions server/php-engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ extern command_t command_net_write_rpc_base;
extern conn_target_t rpc_ct;

void send_rpc_query(connection *c, int op, long long id, int *q, int qsize) ubsan_supp("alignment");
void prepare_rpc_query_raw(int packet_id, int *q, int qsize, unsigned (*crc32_partial_custom)(const void *q, long len, unsigned crc32_complement));
void on_net_event(int event_status);
void create_delayed_send_query(conn_target_t *t, command_t *command, double finish_time);

Expand Down
85 changes: 85 additions & 0 deletions server/php-script-run-once-invoker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2026 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "server/php-script-run-once-invoker.h"
#include "server/php-engine.h"

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <unistd.h>

#include "common/crc32c.h"
#include "common/kprintf.h"
#include "common/pipe-utils.h"
#include "common/tl/constants/common.h"
#include "net/net-connections.h"
#include "net/net-socket.h"
#include "net/net-tcp-rpc-client.h"

// These are defined in php-engine.cpp
extern conn_type_t ct_php_rpc_client;
extern tcp_rpc_client_functions rpc_client_methods;

void PhpScriptRunOnceInvoker::init(int total_run_once_count) {
remaining_count_ = total_run_once_count;

int pipe_fd[2];
if (pipe(pipe_fd) != 0) {
kprintf("Failed to create pipe for run_once mode: %s\n", strerror(errno));
exit(1);
}

int read_fd = pipe_fd[0];
write_fd_ = pipe_fd[1];

bool ok = set_fd_nonblocking(write_fd_);
if (!ok) {
kprintf("Failed to set pipe non-blocking: %s\n", strerror(errno));
exit(1);
}

rpc_client_methods.rpc_ready = nullptr;
auto* connection = epoll_insert_pipe(pipe_for_read, read_fd, &ct_php_rpc_client, &rpc_client_methods);
if (connection == nullptr) {
kprintf("Failed to insert pipe to epoll reactor\n");
exit(1);
}
}

bool PhpScriptRunOnceInvoker::invoke_run_once(int runs_count) {
if (remaining_count_ <= 0 || write_fd_ == -1) {
return false;
}

int q[6];
int qsize = 6 * sizeof(int);
q[2] = TL_RPC_INVOKE_REQ;

int batch_size = std::min(remaining_count_, runs_count);
for (int i = 0; i < batch_size; i++) {
prepare_rpc_query_raw(next_packet_id_, q, qsize, crc32c_partial);
ssize_t written = write(write_fd_, q, static_cast<size_t>(qsize));
if (written != qsize) {
if (written == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
break;
}
// Other error or partial write - this shouldn't happen
kprintf("Failed to write to run_once pipe: %s\n", strerror(errno));
exit(1);
}
--remaining_count_;
++next_packet_id_;
}

return true;
}

bool PhpScriptRunOnceInvoker::has_pending() const {
return remaining_count_ > 0;
}

bool PhpScriptRunOnceInvoker::enabled() const {
return remaining_count_ != -1;
}
36 changes: 36 additions & 0 deletions server/php-script-run-once-invoker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2026 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include "common/mixin/not_copyable.h"
#include "common/smart_ptrs/singleton.h"

// Helper class to run PHP scripts N times via self-RPC requests through a pipe
// Avoids blocking on pipe write by writing messages in batches from within the event loop
class PhpScriptRunOnceInvoker : vk::not_copyable {
public:
static constexpr int DEFAULT_RUNS_BATCH_SIZE = 128;

// Initialize the pipe and register read end with epoll
void init(int total_run_once_count);

// Try to send a batch of run-once trigger messages to the pipe
// Returns true if there are more messages to send, false when done
bool invoke_run_once(int runs_count = DEFAULT_RUNS_BATCH_SIZE);

// Check if we still have messages pending
bool has_pending() const;

bool enabled() const;

private:
int remaining_count_{-1};
int write_fd_{-1};
int next_packet_id_{0};

PhpScriptRunOnceInvoker() = default;

friend class vk::singleton<PhpScriptRunOnceInvoker>;
};
2 changes: 2 additions & 0 deletions server/server.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ prepend(KPHP_SERVER_SOURCES ${BASE_DIR}/server/
numa-configuration.cpp
php-engine-vars.cpp
php-engine.cpp
php-script-run-once-invoker.cpp
php-lease.cpp
php-master.cpp
php-master-restart.cpp
Expand All @@ -34,6 +35,7 @@ if(COMPILER_CLANG)
allow_deprecated_declarations(${BASE_DIR}/server/json-logger.cpp)
allow_deprecated_declarations(${BASE_DIR}/server/lease-config-parser.cpp)
allow_deprecated_declarations(${BASE_DIR}/server/php-engine.cpp)
allow_deprecated_declarations(${BASE_DIR}/server/php-script-run-once-invoker.cpp)
allow_deprecated_declarations(${BASE_DIR}/server/php-master.cpp)
allow_deprecated_declarations(${BASE_DIR}/server/server-config.cpp)
endif()
Expand Down
Empty file.
15 changes: 15 additions & 0 deletions tests/python/tests/run_once_prefork/php/index.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

function suspend_($time) {
sched_yield_sleep($time);
return null;
}

function suspend($time) {
$f = fork(suspend_($time));
wait($f);
}

echo "PID: " . posix_getpid() . " - before suspend\n";
suspend(0.001);
echo "Done - after suspend\n";
76 changes: 76 additions & 0 deletions tests/python/tests/run_once_prefork/test_run_once_prefork.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import pytest

from python.lib.testcase import WebServerAutoTestCase

@pytest.mark.k2_skip_suite
class TestRunOncePrefork(WebServerAutoTestCase):
@classmethod
def extra_class_setup(cls):
cls.web_server.update_options(options={"--http-port": None})

def _start_run_once_server(self, runs_count, workers_num=1, auto_start=True):
"""Create a KPHP server configured for run-once prefork mode"""
self.web_server.update_options(options={
"--once={}".format(runs_count): True,
"--workers-num": workers_num,
})
self.web_server.restart()

def test_run_once_single(self):
"""Test basic run-once prefork mode: worker must execute script N times, then restart the process and repeat"""
runs_count = 1
self._start_run_once_server(runs_count=runs_count, workers_num=1)
self.web_server.assert_stats(
{
"kphp_server.workers_general_requests_total_incoming_queries": self.cmpGe(2),
"kphp_server.server_workers_started": self.cmpGe(2)
},
timeout=5
)
self.web_server.stop()

def test_run_once_with_multiple_workers(self):
"""Test run-once with multiple workers"""
runs_count = 10
workers_num = 4
self._start_run_once_server(runs_count=runs_count, workers_num=workers_num)

self.web_server.assert_stats(
{
"kphp_server.workers_general_requests_total_incoming_queries": self.cmpGe(runs_count * workers_num),
"kphp_server.server_workers_started": self.cmpGe(workers_num)
},
timeout=5
)

self.web_server.stop()

def test_run_once_large_batch(self):
"""Test run-once with large number of runs (tests batching)"""
runs_count = 1000
self._start_run_once_server(runs_count=runs_count, workers_num=1)

self.web_server.assert_stats(
{
"kphp_server.workers_general_requests_total_incoming_queries": self.cmpGe(runs_count),
"kphp_server.server_workers_started": self.cmpGe(1)
},
timeout=5
)

self.web_server.stop()

def test_run_once_infinite(self):
"""Test run-once with large number of runs (tests batching)"""
runs_count = 2**31 - 1 # max int32
self._start_run_once_server(runs_count=runs_count, workers_num=1)

self.web_server.assert_stats(
{
"kphp_server.workers_general_requests_total_incoming_queries": self.cmpGe(200),
"kphp_server.server_workers_started": 1
},
timeout=5
)

self.web_server.stop()
Loading