Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions ghost/core/core/server/adapters/lib/redis/AdapterCacheRedis.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class AdapterCacheRedis extends BaseCacheAdapter {
this.refreshAheadFactor = config.refreshAheadFactor || 0;
this.getTimeoutMilliseconds = config.getTimeoutMilliseconds || null;
this.currentlyExecutingBackgroundRefreshes = new Set();
this.currentlyExecutingReads = new Map();
this._keyPrefix = config.keyPrefix || '';
this._prefixHashInitInFlight = null;
this.redisClient.on('error', this.handleRedisError);
Expand Down Expand Up @@ -240,9 +241,30 @@ class AdapterCacheRedis extends BaseCacheAdapter {
}
return result;
} else {
const data = await fetchData();
await this.set(key, data); // We don't use `internalKey` here because `set` handles it
return data;
if (!internalKey) {
return fetchData();
}
if (this.currentlyExecutingReads.has(internalKey)) {
return this.currentlyExecutingReads.get(internalKey);
}
const fetchPromise = fetchData();
const resultPromise = fetchPromise.catch((err) => {
Comment thread
allouis marked this conversation as resolved.
logging.error(err);
});
fetchPromise.then(async (data) => {
try {
debug('set', internalKey);
await this.cache.set(internalKey, data);
} catch (err) {
logging.error(err);
}
}).catch(() => {
Comment thread
allouis marked this conversation as resolved.
// fetchData rejection — already logged by resultPromise
}).finally(() => {
this.currentlyExecutingReads.delete(internalKey);
});
this.currentlyExecutingReads.set(internalKey, resultPromise);
return resultPromise;
}
} catch (err) {
logging.error(err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,28 @@ SCENARIOS.forEach(({label, wrap}) => {
});
});

describe('concurrent cache miss coalescing', function () {
it('calls fetchData only once when multiple callers miss simultaneously', async function () {
const cache = createCache();
const fetcher = sinon.stub().resolves('shared');

const [v1, v2, v3] = await Promise.all([
cache.get('same-key', fetcher),
cache.get('same-key', fetcher),
cache.get('same-key', fetcher)
]);

assert.equal(fetcher.callCount, 1);
assert.equal(v1, 'shared');
assert.equal(v2, 'shared');
assert.equal(v3, 'shared');

const cached = await cache.get('same-key', fetcher);
assert.equal(fetcher.callCount, 1);
assert.equal(cached, 'shared');
});
});

describe('get with fetchData (error paths)', function () {
it('does not cache errors — a subsequent call retries fetchData', async function () {
const cache = createCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,145 @@ describe('Adapter Cache Redis', function () {
sinon.assert.calledOnce(logging.error);
});

describe('concurrent cache miss coalescing', function () {
it('calls fetchData only once when multiple callers miss simultaneously', async function () {
const KEY = 'concurrent-miss';
const cacheStub = createCacheStub();
cacheStub.get.resolves(null);
cacheStub.set.resolves();
const cache = new RedisCache({cache: cacheStub});

const fetchData = sinon.stub().resolves('shared value');

const [v1, v2, v3] = await Promise.all([
cache.get(KEY, fetchData),
cache.get(KEY, fetchData),
cache.get(KEY, fetchData)
]);

assert.equal(fetchData.callCount, 1);
assert.equal(v1, 'shared value');
assert.equal(v2, 'shared value');
assert.equal(v3, 'shared value');
});

it('propagates fetchData rejection to all concurrent callers', async function () {
const KEY = 'concurrent-miss-error';
const cacheStub = createCacheStub();
cacheStub.get.resolves(null);
const cache = new RedisCache({cache: cacheStub});

const fetchData = sinon.stub().rejects(new Error('upstream down'));

const [v1, v2] = await Promise.all([
cache.get(KEY, fetchData),
cache.get(KEY, fetchData)
]);

assert.equal(fetchData.callCount, 1);
assert.equal(v1, undefined);
assert.equal(v2, undefined);
sinon.assert.calledOnce(logging.error);
});

it('allows retry after a coalesced fetch rejection', async function () {
const KEY = 'concurrent-miss-retry';
let cachedValue = null;
const cacheStub = createCacheStub();
cacheStub.get.callsFake(async () => cachedValue);
cacheStub.set.callsFake(async (_key, value) => {
cachedValue = value;
});
const cache = new RedisCache({cache: cacheStub});

const fetchData = sinon.stub();
fetchData.onFirstCall().rejects(new Error('transient'));
fetchData.onSecondCall().resolves('recovered');

await Promise.all([
cache.get(KEY, fetchData),
cache.get(KEY, fetchData)
]);
assert.equal(fetchData.callCount, 1);

const value = await cache.get(KEY, fetchData);
assert.equal(fetchData.callCount, 2);
assert.equal(value, 'recovered');
});

it('does not coalesce fetches across a prefix_hash cycle (reset)', async function () {
let prefixHash = 'gen1';
const cacheStore = new Map();
const redisClient = {
on: sinon.stub(),
get: sinon.stub().callsFake(async (k) => {
if (k === 'prefix_hash') {
return prefixHash;
}
return null;
}),
set: sinon.stub().resolves('OK')
};
const cacheInstance = {
get: sinon.stub().callsFake(async k => cacheStore.get(k) ?? null),
set: sinon.stub().callsFake(async (k, v) => {
cacheStore.set(k, v);
}),
ttl: sinon.stub(),
store: {getClient: () => redisClient}
};
const cache = new RedisCache({cache: cacheInstance});

let resolveSlow;
const slowFetch = sinon.stub().returns(new Promise((resolve) => {
resolveSlow = resolve;
}));
const fastFetch = sinon.stub().resolves('post-reset');

const pA = cache.get('k', slowFetch);
await new Promise((resolve) => {
setTimeout(resolve, 0);
});
assert.equal(slowFetch.callCount, 1);

prefixHash = 'gen2';

const pB = cache.get('k', fastFetch);
await new Promise((resolve) => {
setTimeout(resolve, 0);
});
assert.equal(fastFetch.callCount, 1, 'post-reset caller must not join pre-reset in-flight fetch');

resolveSlow('pre-reset');
const [vA, vB] = await Promise.all([pA, pB]);

assert.equal(vA, 'pre-reset');
assert.equal(vB, 'post-reset');
assert.equal(cacheStore.get('gen1k'), 'pre-reset');
assert.equal(cacheStore.get('gen2k'), 'post-reset');
});

it('fetches independently for different keys', async function () {
const cacheStub = createCacheStub();
cacheStub.get.resolves(null);
cacheStub.set.resolves();
const cache = new RedisCache({cache: cacheStub});

const fetchA = sinon.stub().resolves('value-a');
const fetchB = sinon.stub().resolves('value-b');

const [v1, v2] = await Promise.all([
cache.get('key-a', fetchA),
cache.get('key-b', fetchB)
]);

assert.equal(fetchA.callCount, 1);
assert.equal(fetchB.callCount, 1);
assert.equal(v1, 'value-a');
assert.equal(v2, 'value-b');
});
});

it('returns the cached value when background refresh fails', async function () {
const KEY = 'bg-refresh-error';
let cachedValue = null;
Expand Down
Loading