Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean device feature state in small chunks to prevent Gladys from being stuck #1610

Merged
merged 3 commits into from Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion front/src/config/i18n/en.json
Expand Up @@ -1540,7 +1540,8 @@
"monthly-device-state-aggregate": "Monthly sensors aggregation",
"daily-device-state-aggregate": "Daily sensors aggregation",
"hourly-device-state-aggregate": "Hourly sensors aggregation",
"gladys-gateway-backup": "Gladys Plus backup"
"gladys-gateway-backup": "Gladys Plus backup",
"device-state-purge-single-feature": "Single device feature states clean"
},
"jobErrors": {
"purged-when-restarted": "Gladys Assistant restarted while this job was still running, so it was purged. It doesn't mean the job has failed, it's a normal behavior."
Expand Down
3 changes: 2 additions & 1 deletion front/src/config/i18n/fr.json
Expand Up @@ -1540,7 +1540,8 @@
"monthly-device-state-aggregate": "Aggrégation donnée capteur mensuelle",
"daily-device-state-aggregate": "Aggrégation donnée capteur journalière",
"hourly-device-state-aggregate": "Aggrégation donnée capteur horaire",
"gladys-gateway-backup": "Sauvegarde Gladys Plus"
"gladys-gateway-backup": "Sauvegarde Gladys Plus",
"device-state-purge-single-feature": "Nettoyage des états d'un appareil"
},
"jobErrors": {
"purged-when-restarted": "Gladys Assistant a redémarré alors que cette tâche était en cours. Cela ne veut pas dire que cette tâche a échouée, c'est un comportement normal."
Expand Down
15 changes: 5 additions & 10 deletions server/lib/device/device.create.js
Expand Up @@ -2,7 +2,6 @@ const Promise = require('bluebird');
const { BadParameters } = require('../../utils/coreErrors');
const db = require('../../models');
const { EVENTS } = require('../../utils/constants');
const logger = require('../../utils/logger');

const getByExternalId = async (externalId) => {
return db.Device.findOne({
Expand Down Expand Up @@ -154,17 +153,13 @@ async function create(device) {
});

// We purge states of all device features that were marked as "keep_history = false"
await Promise.each(deviceFeaturesIdsToPurge, async (deviceFeaturesIdToPurge) => {
await this.purgeStatesByFeatureId(deviceFeaturesIdToPurge);
// We do this asynchronously with an event, so it doesn't block the current request
// Also, the function called will delete as slowly as possible the event
// To make sure that Gladys is not locked during this time
deviceFeaturesIdsToPurge.forEach((deviceFeatureIdToPurge) => {
this.eventManager.emit(EVENTS.DEVICE.PURGE_STATES_SINGLE_FEATURE, deviceFeatureIdToPurge);
});

if (deviceFeaturesIdsToPurge.length > 0) {
// If we don't run a VACUUM, the database file size will stay the same
// Read: https://www.sqlite.org/lang_vacuum.html
logger.info('Running VACUUM command to free up space.');
await db.sequelize.query('VACUUM;');
}

// we get the whole device from the DB to avoid
// having a partial final object
const newDevice = (await getByExternalId(device.external_id)).get({ plain: true });
Expand Down
95 changes: 89 additions & 6 deletions server/lib/device/device.purgeStatesByFeatureId.js
@@ -1,22 +1,105 @@
const { QueryTypes } = require('sequelize');
const Promise = require('bluebird');
const db = require('../../models');
const logger = require('../../utils/logger');

/**
* @description Purge device states of a specific feature
* @param {string} deviceFeatureId - Id of a device feature.
* @param {string} jobId - Id of the job.
* @returns {Promise} Resolve when finished.
* @example
* device.purgeStatesByFeatureId('d47b481b-a7be-4224-9850-313cdb8a4065');
*/
async function purgeStatesByFeatureId(deviceFeatureId) {
async function purgeStatesByFeatureId(deviceFeatureId, jobId) {
logger.info(`Purging states of device feature ${deviceFeatureId}`);
const queryInterface = db.sequelize.getQueryInterface();
await queryInterface.bulkDelete('t_device_feature_state', {
device_feature_id: deviceFeatureId,

const numberOfDeviceFeatureStateToDelete = await db.DeviceFeatureState.count({
where: {
device_feature_id: deviceFeatureId,
},
});

const numberOfDeviceFeatureStateAggregateToDelete = await db.DeviceFeatureStateAggregate.count({
where: {
device_feature_id: deviceFeatureId,
},
});
await queryInterface.bulkDelete('t_device_feature_state_aggregate', {
device_feature_id: deviceFeatureId,

logger.info(
`Purging "${deviceFeatureId}": ${numberOfDeviceFeatureStateToDelete} states & ${numberOfDeviceFeatureStateAggregateToDelete} aggregates to delete.`,
);

const numberOfIterationsStates = Math.ceil(
numberOfDeviceFeatureStateToDelete / this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH,
);
const iterator = [...Array(numberOfIterationsStates)];

const numberOfIterationsStatesAggregates = Math.ceil(
numberOfDeviceFeatureStateAggregateToDelete / this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH,
);
const iteratorAggregates = [...Array(numberOfIterationsStatesAggregates)];

const total = numberOfIterationsStates + numberOfIterationsStatesAggregates;
let currentBatch = 0;
let currentProgressPercent = 0;

// We only save progress to DB if it changed
// Because saving progress is expensive (DB write + Websocket call)
const updateProgressIfNeeded = async () => {
currentBatch += 1;
const newProgressPercent = Math.round((currentBatch * 100) / total);
if (currentProgressPercent !== newProgressPercent) {
currentProgressPercent = newProgressPercent;
await this.job.updateProgress(jobId, currentProgressPercent);
}
};

await Promise.each(iterator, async () => {
await db.sequelize.query(
`
DELETE FROM t_device_feature_state WHERE id IN (
SELECT id FROM t_device_feature_state
WHERE device_feature_id = :id
LIMIT :limit
);
`,
{
replacements: {
id: deviceFeatureId,
limit: this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH,
},
type: QueryTypes.SELECT,
},
);
await updateProgressIfNeeded();
await Promise.delay(this.WAIT_TIME_BETWEEN_DEVICE_FEATURE_CLEAN_BATCH);
});

await Promise.each(iteratorAggregates, async () => {
await db.sequelize.query(
`
DELETE FROM t_device_feature_state_aggregate WHERE id IN (
SELECT id FROM t_device_feature_state_aggregate
WHERE device_feature_id = :id
LIMIT :limit
);
`,
{
replacements: {
id: deviceFeatureId,
limit: this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH,
},
type: QueryTypes.SELECT,
},
);
await updateProgressIfNeeded();
await Promise.delay(this.WAIT_TIME_BETWEEN_DEVICE_FEATURE_CLEAN_BATCH);
});
return {
numberOfDeviceFeatureStateToDelete,
numberOfDeviceFeatureStateAggregateToDelete,
};
}

module.exports = {
Expand Down
14 changes: 13 additions & 1 deletion server/lib/device/index.js
@@ -1,4 +1,4 @@
const { EVENTS } = require('../../utils/constants');
const { EVENTS, JOB_TYPES } = require('../../utils/constants');
const { eventFunctionWrapper } = require('../../utils/functionsWrapper');

// Categories of DeviceFeatures
Expand Down Expand Up @@ -49,12 +49,20 @@ const DeviceManager = function DeviceManager(
this.variable = variable;
this.job = job;

this.STATES_TO_PURGE_PER_DEVICE_FEATURE_CLEAN_BATCH = 1000;
this.WAIT_TIME_BETWEEN_DEVICE_FEATURE_CLEAN_BATCH = 100;

// initialize all types of device feature categories
this.camera = new CameraManager(this.stateManager, messageManager, eventManager, this);
this.lightManager = new LightManager(eventManager, messageManager, this);
this.temperatureSensorManager = new TemperatureSensorManager(eventManager, messageManager, this);
this.humiditySensorManager = new HumiditySensorManager(eventManager, messageManager, this);

this.purgeStatesByFeatureId = this.job.wrapper(
JOB_TYPES.DEVICE_STATES_PURGE_SINGLE_FEATURE,
this.purgeStatesByFeatureId.bind(this),
);

this.devicesByPollFrequency = {};
// listen to events
this.eventManager.on(EVENTS.DEVICE.NEW_STATE, this.newStateEvent.bind(this));
Expand All @@ -66,6 +74,10 @@ const DeviceManager = function DeviceManager(
EVENTS.DEVICE.CALCULATE_HOURLY_AGGREGATE,
eventFunctionWrapper(this.onHourlyDeviceAggregateEvent.bind(this)),
);
this.eventManager.on(
EVENTS.DEVICE.PURGE_STATES_SINGLE_FEATURE,
eventFunctionWrapper(this.purgeStatesByFeatureId.bind(this)),
);
};

DeviceManager.prototype.add = add;
Expand Down
3 changes: 2 additions & 1 deletion server/lib/job/job.wrapper.js
Expand Up @@ -14,8 +14,9 @@ function wrapper(type, func) {
let job;
try {
job = await this.start(type);
await func(...args, job.id);
const res = await func(...args, job.id);
await this.finish(job.id, JOB_STATUS.SUCCESS);
return res;
} catch (error) {
if (job) {
const data = {
Expand Down
4 changes: 3 additions & 1 deletion server/test/controllers/device/device.controller.test.js
Expand Up @@ -4,6 +4,7 @@ const EventEmitter = require('events');
const { fake } = require('sinon');
const db = require('../../../models');
const Device = require('../../../lib/device');
const Job = require('../../../lib/job');

const { authenticatedRequest } = require('../request.test');

Expand Down Expand Up @@ -67,7 +68,8 @@ describe('GET /api/v1/device_feature/aggregated_states', () => {
getValue: fake.resolves(null),
};
const event = new EventEmitter();
const device = new Device(event, {}, {}, {}, {}, variable);
const job = new Job(event);
const device = new Device(event, {}, {}, {}, {}, variable, job);
await device.calculateAggregate('hourly');
await device.calculateAggregate('daily');
await device.calculateAggregate('monthly');
Expand Down
9 changes: 6 additions & 3 deletions server/test/lib/device/camera/camera.command.test.js
Expand Up @@ -2,9 +2,12 @@ const EventEmitter = require('events');
const { assert, fake } = require('sinon');
const Device = require('../../../../lib/device');
const StateManager = require('../../../../lib/state');
const Job = require('../../../../lib/job');

const event = new EventEmitter();

const job = new Job(event);

const messageManager = {
replyByIntent: fake.resolves(true),
};
Expand All @@ -19,7 +22,7 @@ const RANDOM_IMAGE =
describe('Camera.command', () => {
it('should respond with image from camera', async () => {
const stateManager = new StateManager(event);
const deviceManager = new Device(event, messageManager, stateManager, {});
const deviceManager = new Device(event, messageManager, stateManager, {}, {}, {}, job);
const context = {};
await deviceManager.camera.command(
message,
Expand All @@ -46,7 +49,7 @@ describe('Camera.command', () => {
});
it('should respond camera not found', async () => {
const stateManager = new StateManager(event);
const deviceManager = new Device(event, messageManager, stateManager, {});
const deviceManager = new Device(event, messageManager, stateManager, {}, {}, {}, job);
const context = {};
await deviceManager.camera.command(
message,
Expand All @@ -73,7 +76,7 @@ describe('Camera.command', () => {
});
it('should respond camera not found', async () => {
const stateManager = new StateManager(event);
const deviceManager = new Device(event, messageManager, stateManager, {});
const deviceManager = new Device(event, messageManager, stateManager, {}, {}, {}, job);
const context = {};
await deviceManager.camera.command(
message,
Expand Down
4 changes: 3 additions & 1 deletion server/test/lib/device/camera/camera.get.test.js
Expand Up @@ -2,13 +2,15 @@ const EventEmitter = require('events');
const { expect } = require('chai');
const Device = require('../../../../lib/device');
const StateManager = require('../../../../lib/state');
const Job = require('../../../../lib/job');

const event = new EventEmitter();
const job = new Job(event);

describe('Camera.get', () => {
it('should return list of cameras', async () => {
const stateManager = new StateManager(event);
const deviceManager = new Device(event, {}, stateManager, {});
const deviceManager = new Device(event, {}, stateManager, {}, {}, {}, job);
const cameras = await deviceManager.camera.get();
expect(cameras).to.be.instanceOf(Array);
cameras.forEach((camera) => {
Expand Down
8 changes: 5 additions & 3 deletions server/test/lib/device/camera/camera.getImage.test.js
Expand Up @@ -2,16 +2,18 @@ const EventEmitter = require('events');
const { expect, assert } = require('chai');
const Device = require('../../../../lib/device');
const StateManager = require('../../../../lib/state');
const Job = require('../../../../lib/job');

const RANDOM_IMAGE =
'image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z/C/HgAGgwJ/lK3Q6wAAAABJRU5ErkJggg==';

const event = new EventEmitter();
const job = new Job(event);

describe('Camera.getImage', () => {
it('should return camera image', async () => {
const stateManager = new StateManager(event);
const deviceManager = new Device(event, {}, stateManager, {});
const deviceManager = new Device(event, {}, stateManager, {}, {}, {}, job);
stateManager.setState('device', 'test-camera', {
features: [
{
Expand All @@ -28,7 +30,7 @@ describe('Camera.getImage', () => {
});
it('should return camera not found', async () => {
const stateManager = new StateManager(event);
const deviceManager = new Device(event, {}, stateManager, {});
const deviceManager = new Device(event, {}, stateManager, {}, {}, {}, job);
stateManager.setState('device', 'test-camera', {
features: [
{
Expand All @@ -45,7 +47,7 @@ describe('Camera.getImage', () => {
});
it('should return camera not found', async () => {
const stateManager = new StateManager(event);
const deviceManager = new Device(event, {}, stateManager, {});
const deviceManager = new Device(event, {}, stateManager, {}, {}, {}, job);
stateManager.setState('device', 'test-camera-2', {
features: [
{
Expand Down
4 changes: 3 additions & 1 deletion server/test/lib/device/camera/camera.getImageInRoom.test.js
Expand Up @@ -2,16 +2,18 @@ const EventEmitter = require('events');
const { expect } = require('chai');
const Device = require('../../../../lib/device');
const StateManager = require('../../../../lib/state');
const Job = require('../../../../lib/job');

const event = new EventEmitter();
const job = new Job(event);

const RANDOM_IMAGE =
'image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z/C/HgAGgwJ/lK3Q6wAAAABJRU5ErkJggg==';

describe('Camera.getImageInRoom', () => {
it('should return image', async () => {
const stateManager = new StateManager(event);
const deviceManager = new Device(event, {}, stateManager, {});
const deviceManager = new Device(event, {}, stateManager, {}, {}, {}, job);
stateManager.setState('device', 'test-camera', {
features: [
{
Expand Down
10 changes: 6 additions & 4 deletions server/test/lib/device/camera/camera.setImage.test.js
Expand Up @@ -2,16 +2,18 @@ const EventEmitter = require('events');
const { expect, assert } = require('chai');
const Device = require('../../../../lib/device');
const StateManager = require('../../../../lib/state');
const Job = require('../../../../lib/job');

const RANDOM_IMAGE =
'image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z/C/HgAGgwJ/lK3Q6wAAAABJRU5ErkJggg==';

const event = new EventEmitter();
const job = new Job(event);

describe('Camera.setImage', () => {
it('should set image', async () => {
const stateManager = new StateManager(event);
const deviceManager = new Device(event, {}, stateManager, {});
const deviceManager = new Device(event, {}, stateManager, {}, {}, {}, job);
stateManager.setState('device', 'test-camera', {
features: [
{
Expand All @@ -29,7 +31,7 @@ describe('Camera.setImage', () => {
});
it('should return camera not found', async () => {
const stateManager = new StateManager(event);
const deviceManager = new Device(event, {}, stateManager, {});
const deviceManager = new Device(event, {}, stateManager, {}, {}, {}, job);
stateManager.setState('device', 'test-camera', {
features: [
{
Expand All @@ -46,7 +48,7 @@ describe('Camera.setImage', () => {
});
it('should return camera feature not found', async () => {
const stateManager = new StateManager(event);
const deviceManager = new Device(event, {}, stateManager, {});
const deviceManager = new Device(event, {}, stateManager, {}, {}, {}, job);
stateManager.setState('device', 'test-camera', {
features: [
{
Expand All @@ -61,7 +63,7 @@ describe('Camera.setImage', () => {
});
it('should return image too big', async () => {
const stateManager = new StateManager(event);
const deviceManager = new Device(event, {}, stateManager, {});
const deviceManager = new Device(event, {}, stateManager, {}, {}, {}, job);
stateManager.setState('device', 'test-camera', {
features: [
{
Expand Down