From d55d94667e8b5f52b72f574cc19414765d5b34d0 Mon Sep 17 00:00:00 2001 From: Joaquim d'Souza Date: Thu, 14 May 2026 09:38:17 +0200 Subject: [PATCH 1/2] feat: lock on email, not session ID --- .../src/Services/ActionNetworkService.php | 69 +++++++++++++++++-- .../join-block/src/Services/JoinService.php | 53 +++++++++----- .../join-block/src/Services/StripeService.php | 14 ++++ packages/join-block/tests/SessionLockTest.php | 20 +++--- .../tests/SessionLockTestProcess.php | 14 ++-- 5 files changed, 133 insertions(+), 37 deletions(-) diff --git a/packages/join-block/src/Services/ActionNetworkService.php b/packages/join-block/src/Services/ActionNetworkService.php index 06bc3f2..7095093 100644 --- a/packages/join-block/src/Services/ActionNetworkService.php +++ b/packages/join-block/src/Services/ActionNetworkService.php @@ -117,6 +117,11 @@ public static function signup($data) } public static function personExists($email) + { + return self::getPerson($email) !== null; + } + + private static function getPerson($email) { global $joinBlockLog; @@ -127,7 +132,7 @@ public static function personExists($email) ]; // TODO: remove after REI debugging complete - $joinBlockLog->info("Action Network personExists request query: " . json_encode($query)); + $joinBlockLog->info("Action Network getPerson request query: " . json_encode($query)); $response = $client->request( "GET", @@ -141,17 +146,66 @@ public static function personExists($email) ); $data = json_decode($response->getBody()->getContents(), true); - return !empty($data["_embedded"]["osdi:people"]); + $people = $data["_embedded"]["osdi:people"] ?? []; + return $people[0] ?? null; } + /** + * Returns the names of tags applied to the person, or null if the person does not exist. + */ + private static function getPersonTagNames($email) + { + $person = self::getPerson($email); + if ($person === null) { + return null; + } + + $taggingsHref = $person["_links"]["osdi:taggings"]["href"] ?? null; + if (!$taggingsHref) { + return []; + } + + $client = new Client(); + $headers = ["OSDI-API-Token" => Settings::get("ACTION_NETWORK_API_KEY")]; + + $tagNames = []; + $nextHref = $taggingsHref; + while ($nextHref) { + $response = $client->request("GET", $nextHref, ["headers" => $headers]); + $body = json_decode($response->getBody()->getContents(), true); + $taggings = $body["_embedded"]["osdi:taggings"] ?? []; + foreach ($taggings as $tagging) { + $tagHref = $tagging["_links"]["osdi:tag"]["href"] ?? null; + if (!$tagHref) { + continue; + } + $tagResponse = $client->request("GET", $tagHref, ["headers" => $headers]); + $tagBody = json_decode($tagResponse->getBody()->getContents(), true); + if (!empty($tagBody["name"])) { + $tagNames[] = $tagBody["name"]; + } + } + $nextHref = $body["_links"]["next"]["href"] ?? null; + } + + return $tagNames; + } + + // Callers must hold the per-email JoinService lock — see JoinService::acquireLock(). + // The get/check/post sequence is otherwise a TOCTOU race. public static function addTag($email, $tag) { global $joinBlockLog; - if (!self::personExists($email)) { + $tagNames = self::getPersonTagNames($email); + if ($tagNames === null) { $joinBlockLog->warning("Skipping Action Network addTag('$tag') for $email: person does not exist"); return; } + if (in_array($tag, $tagNames, true)) { + $joinBlockLog->info("Skipping Action Network addTag('$tag') for $email: tag already applied"); + return; + } $client = new Client(); @@ -182,14 +236,21 @@ public static function addTag($email, $tag) ); } + // Callers must hold the per-email JoinService lock — see JoinService::acquireLock(). + // The get/check/post sequence is otherwise a TOCTOU race. public static function removeTag($email, $tag) { global $joinBlockLog; - if (!self::personExists($email)) { + $tagNames = self::getPersonTagNames($email); + if ($tagNames === null) { $joinBlockLog->warning("Skipping Action Network removeTag('$tag') for $email: person does not exist"); return; } + if (!in_array($tag, $tagNames, true)) { + $joinBlockLog->info("Skipping Action Network removeTag('$tag') for $email: tag not applied"); + return; + } $client = new Client(); diff --git a/packages/join-block/src/Services/JoinService.php b/packages/join-block/src/Services/JoinService.php index 4d3b334..aa6b946 100644 --- a/packages/join-block/src/Services/JoinService.php +++ b/packages/join-block/src/Services/JoinService.php @@ -22,51 +22,68 @@ private static function formatDob($day, $month, $year) public static function handleJoin($data) { + global $joinBlockLog; + $lockFile = null; try { - $sessionToken = $data['sessionToken'] ?? null; - $lockFile = self::lockSession($sessionToken); + $lockKey = $data['email'] ?? null; + if (!$lockKey) { + $lockKey = $data['sessionToken'] ?? null; + if ($lockKey) { + $joinBlockLog->warning("handleJoin called without email; falling back to sessionToken for lock key"); + } + } + $lockFile = self::acquireLock($lockKey); $chargebeeCustomer = self::tryHandleJoin($data); do_action('ck_join_flow_success', $data, $chargebeeCustomer); } catch (\Exception $e) { do_action('ck_join_flow_error', $data, $e); throw $e; } finally { - self::unlockSession($lockFile); + self::releaseLock($lockFile); } return $chargebeeCustomer; } /** - * This is a BLOCKING lock, which means threads will sleep - * until they can get the lock. This forces sequential execution, - * which means no race conditions. + * Acquire a BLOCKING exclusive lock keyed by an opaque string (typically + * an email address). Threads sleep until they can get the lock, forcing + * sequential execution and avoiding race conditions on per-person CRM + * mutations across the /join endpoint and webhook handlers. * * We still need to handle duplicate join requests, by * e.g. making sure the code doesn't create a subscription * if one already exists. - * + * + * NOTE: flock() is per-host. If this app is ever deployed across multiple + * PHP-FPM hosts, this lock no longer protects — a distributed lock + * (Redis, DB row lock) would be needed. + * * @return resource The file handle of the lock file */ - public static function lockSession($sessionToken) + public static function acquireLock($key) { global $joinBlockLog; - if (!$sessionToken) { - throw new \Exception("Unable to lock session: no token provided"); + if (!$key) { + throw new \Exception("Unable to acquire lock: no key provided"); } - $joinBlockLog->info("Locking session $sessionToken"); + // Normalize so emails with differing case / whitespace collide on the + // same lock, and so the resulting filename is safe for any tmp dir. + $normalizedKey = sha1(strtolower(trim((string) $key))); + + $joinBlockLog->info("Locking key $normalizedKey"); // Use WordPress get_temp_dir() as lock directory, this must be writable // otherwise many WordPress features do not work (e.g. file uploads) - $lockFilepath = get_temp_dir() . '/' . $sessionToken; + $lockFilepath = get_temp_dir() . '/join-lock-' . $normalizedKey; // Ignore fopen() error, as it is necessary for flock() // phpcs:ignore WordPress.WP.AlternativeFunctions.file_system_operations_fopen $lockFile = fopen($lockFilepath, 'w'); if (!$lockFile) { - $joinBlockLog->error("Could not use lockfile for session $sessionToken"); + $joinBlockLog->error("Could not use lockfile for key $normalizedKey"); throw new \Exception("Unable to open lock file: " . esc_html($lockFilepath)); } @@ -78,11 +95,11 @@ public static function lockSession($sessionToken) if (!$lockSuccess) { // phpcs:ignore WordPress.WP.AlternativeFunctions.file_system_operations_fclose fclose($lockFile); - $joinBlockLog->error("Could not lock session $sessionToken"); - throw new \Exception("Unable to lock session: " . esc_html($sessionToken)); + $joinBlockLog->error("Could not lock key $normalizedKey"); + throw new \Exception("Unable to acquire lock: " . esc_html($normalizedKey)); } - $joinBlockLog->info("Locked session $sessionToken"); + $joinBlockLog->info("Locked key $normalizedKey"); // Lock acquired return $lockFile; @@ -91,7 +108,7 @@ public static function lockSession($sessionToken) /** * @param resource $lockFile The file handle of the lock file */ - public static function unlockSession($lockFile) + public static function releaseLock($lockFile) { global $joinBlockLog; @@ -111,7 +128,7 @@ public static function unlockSession($lockFile) // See: https://www.man7.org/linux/man-pages/man2/flock.2.html // phpcs:ignore WordPress.WP.AlternativeFunctions.unlink_unlink @unlink($fileInfo['uri']); - $joinBlockLog->info("Unlocked session {$fileInfo['uri']}"); + $joinBlockLog->info("Unlocked {$fileInfo['uri']}"); } /** diff --git a/packages/join-block/src/Services/StripeService.php b/packages/join-block/src/Services/StripeService.php index 62f95df..d20ea49 100644 --- a/packages/join-block/src/Services/StripeService.php +++ b/packages/join-block/src/Services/StripeService.php @@ -737,6 +737,11 @@ public static function handleWebhook($event) $customerId = null; $customerLapsed = false; $lapseTrigger = null; + // Per-email lock acquired lazily when we resolve the customer's + // email. Serialises CRM mutations against the /join endpoint and + // other concurrent webhook deliveries for the same person. + // See JoinService::acquireLock(). + $lockFile = null; try { switch ($event['type']) { @@ -792,6 +797,7 @@ public static function handleWebhook($event) if (!empty($invoice['customer'])) { $email = self::getEmailForCustomer($customerId); if ($email) { + $lockFile = JoinService::acquireLock($email); $context = ['provider' => 'stripe', 'trigger' => 'invoice_payment_failed_retry_scheduled', 'event' => $event]; if (JoinService::shouldMarkMemberLapsing($email, $context)) { JoinService::toggleMemberLapsing($email, true, $context); @@ -808,6 +814,7 @@ public static function handleWebhook($event) if (!empty($invoice['customer'])) { $email = self::getEmailForCustomer($customerId); if ($email) { + $lockFile = JoinService::acquireLock($email); $context = ['provider' => 'stripe', 'trigger' => 'invoice_paid', 'event' => $event]; if (JoinService::shouldUnlapseMember($email, $context)) { JoinService::toggleMemberLapsed($email, false, null, $context); @@ -830,6 +837,8 @@ public static function handleWebhook($event) $joinBlockLog->info("Syncing updated customer details for Stripe customer {$customer['id']} ($email)"); + $lockFile = JoinService::acquireLock($email); + $personData = self::extractPersonDataFromStripeCustomer($customer); $mergeFields = self::extractMailchimpMergeFieldsFromStripeCustomer($customer); @@ -868,6 +877,7 @@ public static function handleWebhook($event) $email = self::getEmailForCustomer($cid); if ($email) { + $lockFile = JoinService::acquireLock($email); $activeStatuses = ['active', 'trialing']; $lapsedStatuses = ['unpaid', 'incomplete_expired']; $lapsingStatuses = ['past_due']; @@ -906,6 +916,7 @@ public static function handleWebhook($event) $joinBlockLog->warning("Tier change detected but could not resolve email for customer {$subscription['customer']}"); break; } + $lockFile = $lockFile ?: JoinService::acquireLock($email); ['previousPriceId' => $previousPriceId, 'currentPriceId' => $currentPriceId] = $priceChange; $newPlan = Settings::getMembershipPlanByPriceId($currentPriceId); @@ -968,6 +979,7 @@ public static function handleWebhook($event) if ($customerLapsed) { $email = self::getEmailForCustomer($customerId); if ($email) { + $lockFile = $lockFile ?: JoinService::acquireLock($email); $context = ['provider' => 'stripe', 'trigger' => $lapseTrigger, 'event' => $event]; if (JoinService::shouldLapseMember($email, $context)) { JoinService::toggleMemberLapsed($email, true, null, $context); @@ -977,6 +989,8 @@ public static function handleWebhook($event) } catch (\Exception $e) { $c = $customerId ?: "(unknown)"; $joinBlockLog->error("Error handling Stripe webhook for customer $c: " . $e->getMessage()); + } finally { + JoinService::releaseLock($lockFile); } } diff --git a/packages/join-block/tests/SessionLockTest.php b/packages/join-block/tests/SessionLockTest.php index 34ee583..8cf21a8 100644 --- a/packages/join-block/tests/SessionLockTest.php +++ b/packages/join-block/tests/SessionLockTest.php @@ -10,39 +10,43 @@ class SessionLockTest extends TestCase public int $unsafeCounter = 0; /** - * Test the session lock works. This is tested by making sure that + * Test the lock works. This is tested by making sure that * processes that use the lock do not run in parallel. */ - public function testSessionLock(): void + public function testLockSerializesByKey(): void { $scriptPath = __DIR__ . "/SessionLockTestProcess.php"; - $sessionId = microtime(true); + // Use an email-shaped key to cover the typical webhook/join lock-key form. + $lockKey = "test+" . microtime(true) . "@example.com"; $logFile = __DIR__ . "/../logs/tests.log"; // Ensure clean log file output // phpcs:ignore WordPress.WP.AlternativeFunctions.unlink_unlink @unlink($logFile); + $shellKey = escapeshellarg($lockKey); + // Start two processes in parallel // Each script takes 1 second to complete. Therefore, if // they run in parallel, they should both complete in a little // over 1 second. However, if they run in series, they will not // both be complete until after 2 seconds. - exec("php $scriptPath $sessionId > /dev/null 2>&1 &"); - exec("php $scriptPath $sessionId > /dev/null 2>&1 &"); + exec("php $scriptPath $shellKey > /dev/null 2>&1 &"); + exec("php $scriptPath $shellKey > /dev/null 2>&1 &"); sleep(3); // phpcs:ignore WordPress.WP.AlternativeFunctions.file_get_contents_file_get_contents $logs = file_get_contents($logFile); - # The two processes use the same session lock, so they must not overlap. + # The two processes use the same lock key, so they must not overlap. # If they ran in parallel the log would read WORKING -> WORKING -> DONE -> DONE. # Sequential execution instead reads WORKING -> DONE -> WORKING -> DONE # (the second process can only start WORKING once the first has released the lock). - # The session id is included on each line to scope the match to this test run. + # The lock key is included on each line to scope the match to this test run. # We don't assert on the "Unlocked" line here: it is logged after flock() releases # the lock, so the next process can legitimately log "WORKING" before it appears. + $quotedKey = preg_quote($lockKey, '#'); $matched = preg_match( - "#WORKING $sessionId.*DONE $sessionId.*WORKING $sessionId.*DONE $sessionId#s", + "#WORKING $quotedKey.*DONE $quotedKey.*WORKING $quotedKey.*DONE $quotedKey#s", $logs ); $this->assertTrue((bool) $matched, "Should have expected sequence of logs:\n$logs"); diff --git a/packages/join-block/tests/SessionLockTestProcess.php b/packages/join-block/tests/SessionLockTestProcess.php index 11c23ab..04c8986 100644 --- a/packages/join-block/tests/SessionLockTestProcess.php +++ b/packages/join-block/tests/SessionLockTestProcess.php @@ -2,7 +2,7 @@ require_once(__DIR__ . "/../vendor/autoload.php"); -// Load WordPress functions required by JoinService::lockSession() +// Load WordPress functions required by JoinService::acquireLock() define( 'ABSPATH', __DIR__ . '/../wordpress/' ); define( 'WPINC', 'wp-includes' ); require_once(__DIR__ . "/../wordpress/wp-includes/formatting.php"); @@ -28,15 +28,15 @@ $joinBlockLogFile = fopen($joinBlockLogLocation, 'a'); $joinBlockLog->pushHandler(new StreamHandler($joinBlockLogFile, Level::Info)); -$sessionId = $argv[1]; +$lockKey = $argv[1]; -$joinBlockLog->info("Testing lock with sessionId $sessionId"); +$joinBlockLog->info("Testing lock with key $lockKey"); -$lockFile = JoinService::lockSession($sessionId); +$lockFile = JoinService::acquireLock($lockKey); -$joinBlockLog->info("WORKING $sessionId"); +$joinBlockLog->info("WORKING $lockKey"); // Simulate work sleep(1); -$joinBlockLog->info("DONE $sessionId"); +$joinBlockLog->info("DONE $lockKey"); -JoinService::unlockSession($lockFile); +JoinService::releaseLock($lockFile); From 24b86ce3de7cb149fc4e4b7e2087b0ee08c2b45c Mon Sep 17 00:00:00 2001 From: Joaquim d'Souza Date: Thu, 14 May 2026 11:03:46 +0200 Subject: [PATCH 2/2] feat: guard against infinite action network tag lookup --- .../src/Services/ActionNetworkService.php | 86 +++++++++++++++---- 1 file changed, 69 insertions(+), 17 deletions(-) diff --git a/packages/join-block/src/Services/ActionNetworkService.php b/packages/join-block/src/Services/ActionNetworkService.php index 7095093..7c44b56 100644 --- a/packages/join-block/src/Services/ActionNetworkService.php +++ b/packages/join-block/src/Services/ActionNetworkService.php @@ -10,6 +10,18 @@ class ActionNetworkService { + // Bounds for getPersonTagNames(). Action Network's default page size is 25, + // so 10 pages covers people with up to ~250 taggings. People with more than + // that fall back to the un-optimised path (just call the API and let + // Action Network handle the no-op). + private const MAX_TAGGING_PAGES = 10; + private const HTTP_TIMEOUT_SECONDS = 10; + + // Sentinel returned by getPersonTagNames() when enumeration was aborted + // (page cap hit, HTTP failure). Distinct from null (person not found) and + // from [] (person has zero tags). + private const TAG_ENUMERATION_ABORTED = false; + public static function signup($data) { global $joinBlockLog; @@ -151,10 +163,15 @@ private static function getPerson($email) } /** - * Returns the names of tags applied to the person, or null if the person does not exist. + * Returns the names of tags applied to the person, or null if the person + * does not exist, or self::TAG_ENUMERATION_ABORTED (false) if we hit the + * page cap or an HTTP error. Callers should treat the aborted sentinel as + * "unknown" and fall back to calling the underlying API anyway. */ private static function getPersonTagNames($email) { + global $joinBlockLog; + $person = self::getPerson($email); if ($person === null) { return null; @@ -166,26 +183,57 @@ private static function getPersonTagNames($email) } $client = new Client(); - $headers = ["OSDI-API-Token" => Settings::get("ACTION_NETWORK_API_KEY")]; + $requestOptions = [ + "headers" => ["OSDI-API-Token" => Settings::get("ACTION_NETWORK_API_KEY")], + "timeout" => self::HTTP_TIMEOUT_SECONDS, + "connect_timeout" => self::HTTP_TIMEOUT_SECONDS, + ]; $tagNames = []; $nextHref = $taggingsHref; - while ($nextHref) { - $response = $client->request("GET", $nextHref, ["headers" => $headers]); - $body = json_decode($response->getBody()->getContents(), true); - $taggings = $body["_embedded"]["osdi:taggings"] ?? []; - foreach ($taggings as $tagging) { - $tagHref = $tagging["_links"]["osdi:tag"]["href"] ?? null; - if (!$tagHref) { - continue; + $page = 0; + try { + while ($nextHref) { + if ($page >= self::MAX_TAGGING_PAGES) { + $joinBlockLog->warning( + "Action Network getPersonTagNames($email): aborting enumeration after " . + self::MAX_TAGGING_PAGES . " pages — falling back to un-optimised path" + ); + return self::TAG_ENUMERATION_ABORTED; + } + $page++; + + $response = $client->request("GET", $nextHref, $requestOptions); + $body = json_decode($response->getBody()->getContents(), true); + $taggings = $body["_embedded"]["osdi:taggings"] ?? []; + foreach ($taggings as $tagging) { + $tagHref = $tagging["_links"]["osdi:tag"]["href"] ?? null; + if (!$tagHref) { + continue; + } + $tagResponse = $client->request("GET", $tagHref, $requestOptions); + $tagBody = json_decode($tagResponse->getBody()->getContents(), true); + if (!empty($tagBody["name"])) { + $tagNames[] = $tagBody["name"]; + } } - $tagResponse = $client->request("GET", $tagHref, ["headers" => $headers]); - $tagBody = json_decode($tagResponse->getBody()->getContents(), true); - if (!empty($tagBody["name"])) { - $tagNames[] = $tagBody["name"]; + $candidateNext = $body["_links"]["next"]["href"] ?? null; + // Defensive: Action Network shouldn't return a cyclic next link, + // but guard against it explicitly rather than spinning. + if ($candidateNext === $nextHref) { + $joinBlockLog->warning( + "Action Network getPersonTagNames($email): next link did not advance — aborting enumeration" + ); + return self::TAG_ENUMERATION_ABORTED; } + $nextHref = $candidateNext; } - $nextHref = $body["_links"]["next"]["href"] ?? null; + } catch (\Exception $e) { + $joinBlockLog->warning( + "Action Network getPersonTagNames($email): enumeration failed (" . $e->getMessage() . + ") — falling back to un-optimised path" + ); + return self::TAG_ENUMERATION_ABORTED; } return $tagNames; @@ -202,10 +250,12 @@ public static function addTag($email, $tag) $joinBlockLog->warning("Skipping Action Network addTag('$tag') for $email: person does not exist"); return; } - if (in_array($tag, $tagNames, true)) { + if (is_array($tagNames) && in_array($tag, $tagNames, true)) { $joinBlockLog->info("Skipping Action Network addTag('$tag') for $email: tag already applied"); return; } + // $tagNames === TAG_ENUMERATION_ABORTED falls through and we apply the + // tag anyway — Action Network treats a repeat add as a no-op. $client = new Client(); @@ -247,10 +297,12 @@ public static function removeTag($email, $tag) $joinBlockLog->warning("Skipping Action Network removeTag('$tag') for $email: person does not exist"); return; } - if (!in_array($tag, $tagNames, true)) { + if (is_array($tagNames) && !in_array($tag, $tagNames, true)) { $joinBlockLog->info("Skipping Action Network removeTag('$tag') for $email: tag not applied"); return; } + // $tagNames === TAG_ENUMERATION_ABORTED falls through and we send the + // remove anyway — Action Network treats removing an absent tag as a no-op. $client = new Client();