Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 166 additions & 155 deletions package-lock.json

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cap-js-community/event-queue",
"version": "1.11.0-beta.5",
"version": "1.11.0",
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",
"main": "src/index.js",
"types": "src/index.d.ts",
Expand Down Expand Up @@ -47,24 +47,24 @@
"node": ">=18"
},
"dependencies": {
"@sap/xssec": "^4.6.0",
"cron-parser": "^5.3.1",
"@sap/xssec": "^4.10.0",
"cron-parser": "^5.4.0",
"redis": "^4.7.0",
"verror": "^1.10.1",
"yaml": "^2.7.1"
},
"devDependencies": {
"@cap-js/cds-test": "^0.4.0",
"@cap-js/hana": "^2.2.0",
"@cap-js/sqlite": "^2.0.1",
"@sap/cds": "^9.3.1",
"@sap/cds-dk": "^9.3.1",
"@cap-js/hana": "^2.3.3",
"@cap-js/sqlite": "^2.0.3",
"@sap/cds": "^9.4.1",
"@sap/cds-dk": "^9.4.1",
"eslint": "^8.57.0",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-jest": "^28.6.0",
"eslint-plugin-node": "^11.1.0",
"express": "^4.21.2",
"hdb": "^2.25.1",
"hdb": "^2.26.1",
"jest": "^29.7.0",
"prettier": "^2.8.8",
"sqlite3": "^5.1.7",
Expand Down
2 changes: 1 addition & 1 deletion src/redis/redisPub.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ const broadcastEvent = async (tenantId, events, forceBroadcast = false) => {
for (let i = 0; i < TRIES_FOR_PUBLISH_PERIODIC_EVENT; i++) {
const result = eventConfig.multiInstanceProcessing
? false
: await distributedLock.checkLockExistsAndReturnValue(context, [type, subType].join("##"));
: await distributedLock.checkLockExists(context, [type, subType].join("##"));
if (result) {
logger.debug("skip publish redis event as no lock is available", {
type,
Expand Down
6 changes: 3 additions & 3 deletions src/runner/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ const _acquireRunId = async (context) => {
overrideValue: true,
});
} else {
runId = await distributedLock.checkLockExistsAndReturnValue(context, EVENT_QUEUE_RUN_ID, {
runId = await distributedLock.getValue(context, EVENT_QUEUE_RUN_ID, {
tenantScoped: false,
});
}
Expand All @@ -422,7 +422,7 @@ const _calculateOffsetForFirstRun = async () => {
try {
await trace(dummyContext, "calculateOffsetForFirstRun", async () => {
if (eventQueueConfig.redisEnabled) {
let lastRunTs = await distributedLock.checkLockExistsAndReturnValue(dummyContext, EVENT_QUEUE_RUN_TS, {
let lastRunTs = await distributedLock.getValue(dummyContext, EVENT_QUEUE_RUN_TS, {
tenantScoped: false,
});
if (!lastRunTs) {
Expand All @@ -434,7 +434,7 @@ const _calculateOffsetForFirstRun = async () => {
if (couldSetValue) {
lastRunTs = ts;
} else {
lastRunTs = await distributedLock.checkLockExistsAndReturnValue(dummyContext, EVENT_QUEUE_RUN_TS, {
lastRunTs = await distributedLock.getValue(dummyContext, EVENT_QUEUE_RUN_TS, {
tenantScoped: false,
});
}
Expand Down
31 changes: 0 additions & 31 deletions src/shared/cdsHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,38 +165,7 @@ const getAllTenantIds = async () => {
}, []);
};

const TENANT_COLUMNS = ["subscribedSubdomain", "createdAt", "modifiedAt"];

const getAllTenantWithMetadata = async () => {
const response = await _getAllTenantBase();
if (!response) {
return null;
}

return response.reduce(async (result, row) => {
const tenantId = row.subscribedTenantId ?? row.tenant;
result = await result;
if (await common.isTenantIdValidCb(TenantIdCheckTypes.eventProcessing, tenantId)) {
const data = Object.entries(row).reduce(
(result, [key, value]) => {
if (TENANT_COLUMNS.includes(key)) {
result[key] = value;
} else {
result.metadata[key] = value;
}
return result;
},
{ metadata: {} }
);
data.metadata = JSON.stringify(data.metadata);
result.push(data);
}
return result;
}, []);
};

module.exports = {
executeInNewTransaction,
getAllTenantIds,
getAllTenantWithMetadata,
};
2 changes: 1 addition & 1 deletion src/shared/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ const hashStringTo32Bit = (value) => crypto.createHash("sha256").update(String(v
const _getNewAuthContext = async (tenantId) => {
try {
if (!_getNewAuthContext._xsuaaService) {
_getNewAuthContext._xsuaaService = new xssec.XsuaaService(cds.requires.auth.credentials);
_getNewAuthContext._xsuaaService = new xssec.XsuaaService(cds.requires["xsuaa-eventQueue"]?.credentials);
}
const authService = _getNewAuthContext._xsuaaService;
const token = await authService.fetchClientCredentialsToken({ zid: tenantId });
Expand Down
24 changes: 17 additions & 7 deletions src/shared/distributedLock.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,21 @@ const releaseLock = async (context, key, { tenantScoped = true } = {}) => {
}
};

const checkLockExistsAndReturnValue = async (context, key, { tenantScoped = true } = {}) => {
const checkLockExists = async (context, key, { tenantScoped = true } = {}) => {
const fullKey = _generateKey(context, tenantScoped, key);
if (config.redisEnabled) {
return await _checkLockExistsRedis(context, fullKey);
return !!(await _getLockValueRedis(context, fullKey));
} else {
return await _checkLockExistsDb(context, fullKey);
return !!(await _getLockValueDb(context, fullKey));
}
};

const getValue = async (context, key, { tenantScoped = true } = {}) => {
const fullKey = _generateKey(context, tenantScoped, key);
if (config.redisEnabled) {
return await _getLockValueRedis(context, fullKey);
} else {
return await _getLockValueDb(context, fullKey);
}
};

Expand Down Expand Up @@ -106,12 +115,12 @@ const _renewLockRedis = async (context, fullKey, expiryTime, { value = "true" }
return result === REDIS_COMMAND_OK;
};

const _checkLockExistsRedis = async (context, fullKey) => {
const _getLockValueRedis = async (context, fullKey) => {
const client = await redis.createMainClientAndConnect(config.redisOptions);
return await client.exists(fullKey);
return await client.get(fullKey);
};

const _checkLockExistsDb = async (context, fullKey) => {
const _getLockValueDb = async (context, fullKey) => {
let result;
await cdsHelper.executeInNewTransaction(context, "distributedLock-checkExists", async (tx) => {
result = await tx.run(SELECT.one.from(config.tableNameEventLock).where("code =", fullKey));
Expand Down Expand Up @@ -259,7 +268,8 @@ const shutdownHandler = async () => {
module.exports = {
acquireLock,
releaseLock,
checkLockExistsAndReturnValue,
checkLockExists,
getValue,
setValueWithExpire,
shutdownHandler,
renewLock,
Expand Down
8 changes: 0 additions & 8 deletions srv/service/admin-service.cds
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,4 @@ service EventQueueAdminService {
@mandatory
subType: String) returns Boolean;
}

@readonly
@cds.persistence.skip
entity Tenant {
Key ID: String;
subdomain: String;
metadata: String;
}
}
11 changes: 1 addition & 10 deletions srv/service/admin-service.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
"use strict";

const cds = require("@sap/cds");
const cdsHelper = require("../../src/shared/cdsHelper");
const { EventProcessingStatus } = require("../../src");
const config = require("../../src/config");
const distributedLock = require("../../src/shared/distributedLock");
const redisPub = require("../../src/redis/redisPub");

module.exports = class AdminService extends cds.ApplicationService {
async init() {
const { Event: EventService, Tenant, Lock: LockService } = this.entities();
const { Event: EventService, Lock: LockService } = this.entities();
const { Event: EventDb } = cds.db.entities("sap.eventqueue");
const { landscape, space } = this.getLandscapeAndSpace();

Expand All @@ -18,9 +17,6 @@ module.exports = class AdminService extends cds.ApplicationService {
req.reject(403, "Admin service is disabled by configuration");
}

if (req.target.name === Tenant.name) {
return;
}
const headers = Object.assign({}, req.headers, req.req?.headers);
const tenant = headers["z-id"] ?? req.data.tenant;

Expand Down Expand Up @@ -61,11 +57,6 @@ module.exports = class AdminService extends cds.ApplicationService {
}));
});

this.on("READ", Tenant, async () => {
const tenants = await cdsHelper.getAllTenantWithMetadata();
return tenants ?? [];
});

this.on("setStatusAndAttempts", async (req) => {
const tenant = req.headers["z-id"];
cds.log("eventQueue").info("Restarting processing for event queue");
Expand Down
8 changes: 4 additions & 4 deletions test-integration/runner.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -495,12 +495,12 @@ describe("runner", () => {
});

it("acquireRunId should set ts", async () => {
let runTs = await distributedLock.checkLockExistsAndReturnValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
let runTs = await distributedLock.getValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
tenantScoped: false,
});
expect(runTs).toBeFalsy();
await runner.__._acquireRunId();
runTs = await distributedLock.checkLockExistsAndReturnValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
runTs = await distributedLock.getValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
tenantScoped: false,
});
expect(runTs).toBeDefined();
Expand All @@ -512,7 +512,7 @@ describe("runner", () => {
jest.useFakeTimers();
const systemTime = Date.now();
jest.setSystemTime(systemTime);
const runTs = await distributedLock.checkLockExistsAndReturnValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
const runTs = await distributedLock.getValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
tenantScoped: false,
});
const expectedTs = new Date(runTs).getTime() + configInstance.runInterval - systemTime;
Expand All @@ -528,7 +528,7 @@ describe("runner", () => {
jest.useFakeTimers();
const systemTime = Date.now();
jest.setSystemTime(systemTime);
const runTs = await distributedLock.checkLockExistsAndReturnValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
const runTs = await distributedLock.getValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
tenantScoped: false,
});
const expectedTs = new Date(runTs).getTime() + configInstance.runInterval - systemTime;
Expand Down
33 changes: 0 additions & 33 deletions test/__snapshots__/admin-service.test.js.snap
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ exports[`admin-service-test metadata snapshot 1`] = `
<EntityContainer Name="EntityContainer">
<EntitySet Name="Event" EntityType="EventQueueAdminService.Event"/>
<EntitySet Name="Lock" EntityType="EventQueueAdminService.Lock"/>
<EntitySet Name="Tenant" EntityType="EventQueueAdminService.Tenant"/>
</EntityContainer>
<EntityType Name="Event">
<Key>
Expand Down Expand Up @@ -65,14 +64,6 @@ exports[`admin-service-test metadata snapshot 1`] = `
<Property Name="ttl" Type="Edm.Int32"/>
<Property Name="createdAt" Type="Edm.Int32"/>
</EntityType>
<EntityType Name="Tenant">
<Key>
<PropertyRef Name="ID"/>
</Key>
<Property Name="ID" Type="Edm.String" Nullable="false"/>
<Property Name="subdomain" Type="Edm.String"/>
<Property Name="metadata" Type="Edm.String"/>
</EntityType>
<Action Name="setStatusAndAttempts" IsBound="true" EntitySetPath="in">
<Parameter Name="in" Type="EventQueueAdminService.Event"/>
<Parameter Name="tenant" Type="Edm.String"/>
Expand Down Expand Up @@ -212,23 +203,6 @@ exports[`admin-service-test metadata snapshot 1`] = `
<Annotations Target="EventQueueAdminService.releaseLock(EventQueueAdminService.Lock)/subType">
<Annotation Term="Common.FieldControl" EnumMember="Common.FieldControlType/Mandatory"/>
</Annotations>
<Annotations Target="EventQueueAdminService.EntityContainer/Tenant">
<Annotation Term="Capabilities.DeleteRestrictions">
<Record Type="Capabilities.DeleteRestrictionsType">
<PropertyValue Property="Deletable" Bool="false"/>
</Record>
</Annotation>
<Annotation Term="Capabilities.InsertRestrictions">
<Record Type="Capabilities.InsertRestrictionsType">
<PropertyValue Property="Insertable" Bool="false"/>
</Record>
</Annotation>
<Annotation Term="Capabilities.UpdateRestrictions">
<Record Type="Capabilities.UpdateRestrictionsType">
<PropertyValue Property="Updatable" Bool="false"/>
</Record>
</Annotation>
</Annotations>
</Schema>
</edmx:DataServices>
</edmx:Edmx>"
Expand All @@ -249,10 +223,3 @@ exports[`admin-service-test read entities: Lock 1`] = `
"value": [],
}
`;

exports[`admin-service-test read entities: Tenant 1`] = `
{
"@odata.context": "$metadata#Tenant",
"value": [],
}
`;
2 changes: 1 addition & 1 deletion test/redisPubSub.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const setTimeoutSpy = jest.spyOn(global, "setTimeout").mockImplementation((first
});

const distributedLock = require("../src/shared/distributedLock");
const checkLockExistsSpy = jest.spyOn(distributedLock, "checkLockExistsAndReturnValue");
const checkLockExistsSpy = jest.spyOn(distributedLock, "checkLockExists");
const config = require("../src/config");
const redisPub = require("../src/redis/redisPub");
const redisSub = require("../src/redis/redisSub");
Expand Down