Skip to content

Commit

Permalink
Refactor track_deletes (#1892)
Browse files Browse the repository at this point in the history
## Describe your changes

Refactoring the track_deletes logic so that we no longer need extra
snapshot tables.
The logic is now simple: 
- `upsert` adds/updates records (or sets `deletedAt`), including updating
the sync_job_id, which serves as a generationId
- when the sync is finished, if `track_deletes == true`, then every record
without a `deletedAt` that doesn't have the same `sync_job_id` as the
current one is marked as deleted

## Issue ticket number and link
https://linear.app/nango/issue/NAN-599/track-deletes-refactoring

## Checklist before requesting a review (skip if just adding/editing
APIs & templates)
- [x] I added tests, otherwise the reason is: 
- [ ] I added observability, otherwise the reason is:
- [ ] I added analytics, otherwise the reason is:
  • Loading branch information
TBonnin committed Mar 25, 2024
1 parent 3c93972 commit 4ddac7f
Show file tree
Hide file tree
Showing 29 changed files with 698 additions and 1,114 deletions.
23 changes: 11 additions & 12 deletions packages/jobs/lib/crons/autoIdleDemo.integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { describe, it, beforeAll, expect, vi } from 'vitest';
import type { Environment } from '@nangohq/shared';
import {
seeders,
multipleMigrations,
configService,
environmentService,
connectionService,
createSync,
db,
Expand All @@ -18,12 +18,12 @@ import {
import { exec } from './autoIdleDemo.js';
import { nanoid } from 'nanoid';

const envName = 'dev';

describe('Auto Idle Demo', () => {
describe('Auto Idle Demo', async () => {
let env: Environment;
beforeAll(async () => {
await multipleMigrations();
await seeders.createConfigSeeds(envName);
env = await seeders.createEnvironmentSeed('dev');
await seeders.createConfigSeeds(env);
});

it('should delete syncs', async () => {
Expand All @@ -33,32 +33,31 @@ describe('Auto Idle Demo', () => {
});

const connName = nanoid();
const env = await environmentService.createEnvironment(0, envName);
await configService.createProviderConfig({
unique_key: DEMO_GITHUB_CONFIG_KEY,
provider: 'github',
environment_id: env!.id,
environment_id: env.id,
oauth_client_id: '',
oauth_client_secret: ''
});
const conn = await connectionService.upsertConnection(connName, DEMO_GITHUB_CONFIG_KEY, 'github', {} as any, {}, env!.id, 0);
const conn = await connectionService.upsertConnection(connName, DEMO_GITHUB_CONFIG_KEY, 'github', {} as any, {}, env.id, 0);
const connId = conn[0]!.id;
const sync = (await createSync(connId, DEMO_SYNC_NAME))!;
await createSchedule(sync.id!, '86400', 0, ScheduleStatus.RUNNING, nanoid());
const schedBefore = await getSchedule(sync.id!);
await createSchedule(sync.id, '86400', 0, ScheduleStatus.RUNNING, nanoid());
const schedBefore = await getSchedule(sync.id);
expect(schedBefore?.status).toBe(ScheduleStatus.RUNNING);

// First execution: nothing happens
await exec();

const schedMid = await getSchedule(sync.id!);
const schedMid = await getSchedule(sync.id);
expect(schedMid?.status).toBe(ScheduleStatus.RUNNING);

// Second execution it should pick the old sync
await db.knex.from('_nango_syncs').update({ updated_at: new Date(Date.now() - 86400 * 2 * 1000) });
await exec();

const schedAfter = await getSchedule(sync.id!);
const schedAfter = await getSchedule(sync.id);
expect(schedAfter?.status).toBe(ScheduleStatus.PAUSED);
});
});
3 changes: 1 addition & 2 deletions packages/jobs/lib/crons/deleteSyncsData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ export async function exec(): Promise<void> {
// ----
// hard delete records
const res = await syncDataService.deleteRecordsBySyncId({ syncId: sync.id, limit: limitRecords });
telemetry.increment(MetricTypes.JOBS_DELETE_SYNCS_DATA_RECORDS, res.totalRecords);
telemetry.increment(MetricTypes.JOBS_DELETE_SYNCS_DATA_DELETES, res.totalDeletes);
telemetry.increment(MetricTypes.JOBS_DELETE_SYNCS_DATA_RECORDS, res.totalDeletedRecords);
}
});

Expand Down
63 changes: 7 additions & 56 deletions packages/persist/lib/controllers/persist.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ type RecordRequest = Request<
providerConfigKey: string;
connectionId: string;
activityLogId: number;
trackDeletes: boolean;
lastSyncDate: Date;
},
any,
Record<string, any>
Expand Down Expand Up @@ -70,20 +68,10 @@ class PersistController {
public async saveRecords(req: RecordRequest, res: Response, next: NextFunction) {
const {
params: { environmentId, nangoConnectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, connectionId, trackDeletes, lastSyncDate, activityLogId }
body: { model, records, providerConfigKey, connectionId, activityLogId }
} = req;
const persist = async (dataRecords: DataRecord[]) => {
return await dataService.upsert(
dataRecords,
'_nango_sync_data_records',
'external_id',
nangoConnectionId,
model,
activityLogId,
environmentId,
trackDeletes,
false
);
return await dataService.upsert(dataRecords, nangoConnectionId, model, activityLogId, environmentId, false);
};
const result = await PersistController.persistRecords({
persistType: 'save',
Expand All @@ -95,8 +83,6 @@ class PersistController {
syncJobId,
model,
records,
trackDeletes,
lastSyncDate,
activityLogId,
softDelete: false,
persistFunction: persist
Expand All @@ -111,20 +97,10 @@ class PersistController {
public async deleteRecords(req: RecordRequest, res: Response, next: NextFunction) {
const {
params: { environmentId, nangoConnectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, connectionId, trackDeletes, lastSyncDate, activityLogId }
body: { model, records, providerConfigKey, connectionId, activityLogId }
} = req;
const persist = async (dataRecords: DataRecord[]) => {
return await dataService.upsert(
dataRecords,
'_nango_sync_data_records',
'external_id',
nangoConnectionId,
model,
activityLogId,
environmentId,
trackDeletes,
true
);
return await dataService.upsert(dataRecords, nangoConnectionId, model, activityLogId, environmentId, true);
};
const result = await PersistController.persistRecords({
persistType: 'delete',
Expand All @@ -136,8 +112,6 @@ class PersistController {
syncJobId,
model,
records,
trackDeletes,
lastSyncDate,
activityLogId,
softDelete: true,
persistFunction: persist
Expand All @@ -152,18 +126,10 @@ class PersistController {
public async updateRecords(req: RecordRequest, res: Response, next: NextFunction) {
const {
params: { environmentId, nangoConnectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, connectionId, trackDeletes, lastSyncDate, activityLogId }
body: { model, records, providerConfigKey, connectionId, activityLogId }
} = req;
const persist = async (dataRecords: DataRecord[]) => {
return await dataService.updateRecord(
dataRecords,
'_nango_sync_data_records',
'external_id',
nangoConnectionId,
model,
activityLogId,
environmentId
);
return await dataService.update(dataRecords, nangoConnectionId, model, activityLogId, environmentId);
};
const result = await PersistController.persistRecords({
persistType: 'update',
Expand All @@ -175,8 +141,6 @@ class PersistController {
syncJobId,
model,
records,
trackDeletes,
lastSyncDate,
activityLogId,
softDelete: false,
persistFunction: persist
Expand All @@ -198,8 +162,6 @@ class PersistController {
syncJobId,
model,
records,
trackDeletes,
lastSyncDate,
activityLogId,
softDelete,
persistFunction
Expand All @@ -213,8 +175,6 @@ class PersistController {
syncJobId: number;
model: string;
records: Record<string, any>[];
trackDeletes: boolean;
lastSyncDate: Date;
activityLogId: number;
softDelete: boolean;
persistFunction: (records: DataRecord[]) => Promise<UpsertResponse>;
Expand Down Expand Up @@ -242,16 +202,7 @@ class PersistController {
success,
error,
response: formattedRecords
} = syncDataService.formatDataRecords(
records as unknown as DataResponse[],
nangoConnectionId,
model,
syncId,
syncJobId,
lastSyncDate,
trackDeletes,
softDelete
);
} = syncDataService.formatDataRecords(records as unknown as DataResponse[], nangoConnectionId, model, syncId, syncJobId, softDelete);

if (!success || formattedRecords === null) {
await createActivityLogMessage({
Expand Down
8 changes: 1 addition & 7 deletions packages/persist/lib/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,7 @@ const validateRecordsRequest = validateRequest({
records: z.any().array().nonempty(),
providerConfigKey: z.string(),
connectionId: z.string(),
activityLogId: z.number(),
lastSyncDate: z
.string()
.datetime()
.transform((value) => new Date(value))
.pipe(z.date()) as unknown as z.ZodDate,
trackDeletes: z.boolean()
activityLogId: z.number()
})
});
const recordPath = '/environment/:environmentId/connection/:nangoConnectionId/sync/:syncId/job/:syncJobId/records';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
const RECORDS_TABLE = '_nango_sync_data_records';
exports.up = async function (knex) {
await knex.transaction((trx) => {
return trx
.raw(
`
CREATE OR REPLACE FUNCTION ${RECORDS_TABLE}_reset_created_at()
RETURNS TRIGGER AS $$
BEGIN
IF OLD.external_deleted_at IS NOT NULL AND NEW.external_deleted_at IS NULL THEN
NEW.created_at = NOW();
NEW.updated_at = NOW();
NEW.external_is_deleted = FALSE;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
`
)
.then(function () {
return trx.raw(`
CREATE TRIGGER ${RECORDS_TABLE}_reset_created_at_trigger
BEFORE UPDATE ON ${RECORDS_TABLE}
FOR EACH ROW
EXECUTE FUNCTION ${RECORDS_TABLE}_reset_created_at();
`);
});
});
};

exports.down = async function (knex) {
await knex.transaction((trx) => {
return trx.raw(`DROP TRIGGER IF EXISTS ${RECORDS_TABLE}_reset_created_at_trigger ON ${RECORDS_TABLE};`).then(function () {
return trx.raw(`DROP FUNCTION IF EXISTS ${RECORDS_TABLE}_reset_created_at();`);
});
});
};
56 changes: 21 additions & 35 deletions packages/shared/lib/db/seeders/config.seeder.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,26 @@
import { schema } from '../database.js';
import configService from '../../services/config.service.js';
import environmentService from '../../services/environment.service.js';
import type { Config as ProviderConfig } from '../../models/Provider.js';
import type { Environment } from '../../models/Environment.js';

export const createConfigSeeds = async (environmentName = ''): Promise<Environment[]> => {
let result: Environment[];
if (environmentName) {
result = [(await environmentService.createEnvironment(0, environmentName))!];
} else {
result = await schema().select('*').from('_nango_environments');
}

for (const row of result) {
const { id: environment_id } = row;
await configService.createProviderConfig({
unique_key: Math.random().toString(36).substring(7),
provider: 'google',
environment_id
} as ProviderConfig);
await configService.createProviderConfig({
unique_key: Math.random().toString(36).substring(7),
provider: 'google',
environment_id
} as ProviderConfig);
await configService.createProviderConfig({
unique_key: Math.random().toString(36).substring(7),
provider: 'google',
environment_id
} as ProviderConfig);
await configService.createProviderConfig({
unique_key: Math.random().toString(36).substring(7),
provider: 'notion',
environment_id
} as ProviderConfig);
}

return result;
/**
 * Seeds four provider configs for the given environment: three with provider
 * 'google' followed by one with provider 'notion'. Each config gets a random
 * `unique_key`, and the configs are created one after another.
 *
 * @param env - environment whose `id` is used as `environment_id` for every config
 */
export const createConfigSeeds = async (env: Environment): Promise<void> => {
    const providers = ['google', 'google', 'google', 'notion'];

    for (const provider of providers) {
        // Awaited inside the loop on purpose: configs are created sequentially, in list order.
        await configService.createProviderConfig({
            unique_key: Math.random().toString(36).substring(7),
            provider,
            environment_id: env.id
        } as ProviderConfig);
    }
};
47 changes: 9 additions & 38 deletions packages/shared/lib/db/seeders/connection.seeder.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,17 @@
import db, { schema } from '../database.js';
import db from '../database.js';
import connectionService from '../../services/connection.service.js';
import environmentService from '../../services/environment.service.js';
import type { AuthCredentials } from '../../models/Auth.js';
import type { Environment } from '../../models/Environment.js';

export const createConnectionSeeds = async (environmentName = ''): Promise<number[]> => {
let result;
if (environmentName) {
result = [await environmentService.createEnvironment(0, environmentName)];
} else {
result = await schema().select('*').from('_nango_environments');
}

const connections = [];

for (const { id: environment_id } of result) {
const connectionParams = [
[Math.random().toString(36).substring(7)],
[Math.random().toString(36).substring(7)],
[Math.random().toString(36).substring(7)],
[Math.random().toString(36).substring(7)]
];
export const createConnectionSeeds = async (env: Environment): Promise<number[]> => {
const connectionIds = [];

const connectionIds = [];

for (const [name] of connectionParams) {
const [result] = (await connectionService.upsertConnection(
`conn-${name}`,
`provider-${name}`,
'google',
{} as AuthCredentials,
{},
environment_id,
0
)) as { id: number }[];
const { id: connection_id } = result as { id: number };
connectionIds.push(connection_id);
}

connections.push(...connectionIds);
for (let i = 0; i < 4; i++) {
const name = Math.random().toString(36).substring(7);
const result = await connectionService.upsertConnection(`conn-${name}`, `provider-${name}`, 'google', {} as AuthCredentials, {}, env.id, 0);
connectionIds.push(...result.map((res) => res.id));
}

return connections;
return connectionIds;
};

export const deleteAllConnectionSeeds = async (): Promise<void> => {
Expand Down
10 changes: 10 additions & 0 deletions packages/shared/lib/db/seeders/environment.seeder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import type { Environment } from '../../models/Environment.js';
import environmentService from '../../services/environment.service.js';

/**
 * Creates an environment through `environmentService.createEnvironment`
 * (called with `0` as its first argument) and returns it.
 *
 * @param envName - name of the environment to create; defaults to 'test'
 * @returns the created environment
 * @throws Error when the service returns a falsy value instead of an environment
 */
export async function createEnvironmentSeed(envName: string = 'test'): Promise<Environment> {
    const created = await environmentService.createEnvironment(0, envName);
    if (created) {
        return created;
    }
    throw new Error('Failed to create environment');
}

0 comments on commit 4ddac7f

Please sign in to comment.