Skip to content

Commit 89fc762

Browse files
authored
feat: Sockets Preview (#231)
* Sockets WIP * Sockets WIP * Sockets WIP: fix dateParser * Sockets WIP: fix react loading Fixes #221
1 parent aa736b1 commit 89fc762

30 files changed

+22422
-18368
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
class LocalSubscriptionStore {
2+
constructor(options) {
3+
options = options || {};
4+
this.connections = {};
5+
this.hearBeatInterval = options.heartBeatInterval || 60;
6+
}
7+
8+
async getSubscription(connectionId, subscriptionId) {
9+
const connection = this.getConnection(connectionId);
10+
return connection.subscriptions[subscriptionId];
11+
}
12+
13+
async subscribe(connectionId, subscriptionId, subscription) {
14+
const connection = this.getConnection(connectionId);
15+
connection.subscriptions[subscriptionId] = {
16+
...subscription,
17+
timestamp: new Date()
18+
};
19+
}
20+
21+
async unsubscribe(connectionId, subscriptionId) {
22+
const connection = this.getConnection(connectionId);
23+
delete connection.subscriptions[subscriptionId];
24+
}
25+
26+
async getAllSubscriptions() {
27+
return Object.keys(this.connections).map(connectionId => {
28+
Object.keys(this.connections[connectionId].subscriptions).filter(
29+
subscriptionId => new Date().getTime() -
30+
this.connections[connectionId].subscriptions[subscriptionId].timestamp.getTime() >
31+
this.hearBeatInterval * 4 * 1000
32+
).forEach(subscriptionId => { delete this.connections[connectionId].subscriptions[subscriptionId]; });
33+
34+
return Object.keys(this.connections[connectionId].subscriptions)
35+
.map(subscriptionId => ({
36+
connectionId,
37+
...this.connections[connectionId].subscriptions[subscriptionId]
38+
}));
39+
}).reduce((a, b) => a.concat(b), []);
40+
}
41+
42+
async cleanupSubscriptions(connectionId) {
43+
delete this.connections[connectionId];
44+
}
45+
46+
async getAuthContext(connectionId) {
47+
return this.getConnection(connectionId).authContext;
48+
}
49+
50+
async setAuthContext(connectionId, authContext) {
51+
this.getConnection(connectionId).authContext = authContext;
52+
}
53+
54+
getConnection(connectionId) {
55+
if (!this.connections[connectionId]) {
56+
this.connections[connectionId] = { subscriptions: {} };
57+
}
58+
return this.connections[connectionId];
59+
}
60+
}
61+
62+
module.exports = LocalSubscriptionStore;
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
const UserError = require('./UserError');
2+
3+
const methodParams = {
4+
load: ['query'],
5+
sql: ['query'],
6+
meta: [],
7+
subscribe: ['query'],
8+
unsubscribe: []
9+
};
10+
11+
class SubscriptionServer {
12+
constructor(apiGateway, sendMessage, subscriptionStore) {
13+
this.apiGateway = apiGateway;
14+
this.sendMessage = sendMessage;
15+
this.subscriptionStore = subscriptionStore;
16+
}
17+
18+
resultFn(connectionId, messageId) {
19+
return (message, { status } = {}) => this.sendMessage(connectionId, { messageId, message, status: status || 200 });
20+
}
21+
22+
async processMessage(connectionId, message, isSubscription) {
23+
let context = {};
24+
try {
25+
if (typeof message === 'string') {
26+
message = JSON.parse(message);
27+
}
28+
if (message.authorization) {
29+
const newContext = {};
30+
await this.apiGateway.checkAuthFn(newContext, message.authorization);
31+
await this.subscriptionStore.setAuthContext(connectionId, newContext);
32+
this.sendMessage(connectionId, { handshake: true });
33+
return;
34+
}
35+
36+
if (message.unsubscribe) {
37+
await this.subscriptionStore.unsubscribe(connectionId, message.unsubscribe);
38+
return;
39+
}
40+
41+
if (!message.messageId) {
42+
throw new UserError(`messageId is required`);
43+
}
44+
45+
context = await this.subscriptionStore.getAuthContext(connectionId);
46+
47+
if (!context) {
48+
await this.sendMessage(
49+
connectionId,
50+
{
51+
messageId: message.messageId,
52+
message: { error: 'Not authorized' },
53+
status: 403
54+
}
55+
);
56+
return;
57+
}
58+
59+
if (!methodParams[message.method]) {
60+
throw new UserError(`Unsupported method: ${message.method}`);
61+
}
62+
63+
const allowedParams = methodParams[message.method];
64+
const params = allowedParams.map(k => ({ [k]: (message.params || {})[k] }))
65+
.reduce((a, b) => ({ ...a, ...b }), {});
66+
await this.apiGateway[message.method]({
67+
...params,
68+
context,
69+
isSubscription,
70+
res: this.resultFn(connectionId, message.messageId),
71+
subscriptionState: async () => {
72+
const subscription = await this.subscriptionStore.getSubscription(connectionId, message.messageId);
73+
return subscription && subscription.state;
74+
},
75+
subscribe: async (state) => this.subscriptionStore.subscribe(connectionId, message.messageId, {
76+
message,
77+
state
78+
}),
79+
unsubscribe: async () => this.subscriptionStore.unsubscribe(connectionId, message.messageId)
80+
});
81+
await this.sendMessage(connectionId, { messageProcessedId: message.messageId });
82+
} catch (e) {
83+
this.apiGateway.handleError({
84+
e,
85+
query: message.query,
86+
res: this.resultFn(connectionId, message.messageId),
87+
context
88+
});
89+
}
90+
}
91+
92+
async processSubscriptions() {
93+
const allSubscriptions = await this.subscriptionStore.getAllSubscriptions();
94+
await Promise.all(allSubscriptions.map(async subscription => {
95+
await this.processMessage(subscription.connectionId, subscription.message, true);
96+
}));
97+
}
98+
99+
async disconnect(connectionId) {
100+
await this.subscriptionStore.cleanupSubscriptions(connectionId);
101+
}
102+
}
103+
104+
module.exports = SubscriptionServer;

packages/cubejs-api-gateway/dateParser.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ const UserError = require('./UserError');
55
module.exports = (dateString) => {
66
let momentRange;
77
dateString = dateString.toLowerCase();
8-
if (dateString.match(/(this|last)\s+(day|week|month|year|quarter)/)) {
9-
const match = dateString.match(/(this|last)\s+(day|week|month|year|quarter)/);
8+
if (dateString.match(/(this|last)\s+(day|week|month|year|quarter|hour|minute|second)/)) {
9+
const match = dateString.match(/(this|last)\s+(day|week|month|year|quarter|hour|minute|second)/);
1010
let start = moment();
1111
let end = moment();
1212
if (match[1] === 'last') {
@@ -15,18 +15,18 @@ module.exports = (dateString) => {
1515
}
1616
const span = match[2] === 'week' ? 'isoWeek' : match[2];
1717
momentRange = [start.startOf(span), end.endOf(span)];
18-
} else if (dateString.match(/last\s+(\d+)\s+(day|week|month|year|quarter)/)) {
19-
const match = dateString.match(/last\s+(\d+)\s+(day|week|month|year|quarter)/);
18+
} else if (dateString.match(/last\s+(\d+)\s+(day|week|month|year|quarter|hour|minute|second)/)) {
19+
const match = dateString.match(/last\s+(\d+)\s+(day|week|month|year|quarter|hour|minute|second)/);
2020
const span = match[2] === 'week' ? 'isoWeek' : match[2];
2121
momentRange = [
2222
moment().add(-parseInt(match[1], 10) - 1, match[2]).startOf(span),
2323
moment().add(-1, match[2]).endOf(span)
2424
];
2525
} else if (dateString.match(/today/)) {
26-
momentRange = [moment(), moment()];
26+
momentRange = [moment().startOf('day'), moment().endOf('day')];
2727
} else if (dateString.match(/yesterday/)) {
2828
const yesterday = moment().add(-1, 'day');
29-
momentRange = [yesterday, yesterday];
29+
momentRange = [yesterday.startOf('day'), yesterday.endOf('day')];
3030
} else {
3131
const results = chrono.parse(dateString);
3232
if (!results) {
@@ -40,5 +40,5 @@ module.exports = (dateString) => {
4040
results[0].start.moment()
4141
];
4242
}
43-
return momentRange.map(d => d.format(moment.HTML5_FMT.DATE));
43+
return momentRange.map(d => d.format(moment.HTML5_FMT.DATETIME_LOCAL_MS));
4444
};

0 commit comments

Comments
 (0)