Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor track_deletes #1892

Merged
merged 13 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 11 additions & 12 deletions packages/jobs/lib/crons/autoIdleDemo.integration.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { describe, it, beforeAll, expect, vi } from 'vitest';
import type { Environment } from '@nangohq/shared';
import {
seeders,
multipleMigrations,
configService,
environmentService,
connectionService,
createSync,
db,
Expand All @@ -18,12 +18,12 @@ import {
import { exec } from './autoIdleDemo.js';
import { nanoid } from 'nanoid';

const envName = 'dev';

describe('Auto Idle Demo', () => {
describe('Auto Idle Demo', async () => {
let env: Environment;
beforeAll(async () => {
await multipleMigrations();
await seeders.createConfigSeeds(envName);
env = await seeders.createEnvironmentSeed('dev');
await seeders.createConfigSeeds(env);
});

it('should delete syncs', async () => {
Expand All @@ -33,32 +33,31 @@ describe('Auto Idle Demo', () => {
});

const connName = nanoid();
const env = await environmentService.createEnvironment(0, envName);
await configService.createProviderConfig({
unique_key: DEMO_GITHUB_CONFIG_KEY,
provider: 'github',
environment_id: env!.id,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated to track_deletes: making environment id not optional, same from sync and syncJob below

environment_id: env.id,
oauth_client_id: '',
oauth_client_secret: ''
});
const conn = await connectionService.upsertConnection(connName, DEMO_GITHUB_CONFIG_KEY, 'github', {} as any, {}, env!.id, 0);
const conn = await connectionService.upsertConnection(connName, DEMO_GITHUB_CONFIG_KEY, 'github', {} as any, {}, env.id, 0);
const connId = conn[0]!.id;
const sync = (await createSync(connId, DEMO_SYNC_NAME))!;
await createSchedule(sync.id!, '86400', 0, ScheduleStatus.RUNNING, nanoid());
const schedBefore = await getSchedule(sync.id!);
await createSchedule(sync.id, '86400', 0, ScheduleStatus.RUNNING, nanoid());
const schedBefore = await getSchedule(sync.id);
expect(schedBefore?.status).toBe(ScheduleStatus.RUNNING);

// First execution nothings happen
await exec();

const schedMid = await getSchedule(sync.id!);
const schedMid = await getSchedule(sync.id);
expect(schedMid?.status).toBe(ScheduleStatus.RUNNING);

// Second execution it should pick the old sync
await db.knex.from('_nango_syncs').update({ updated_at: new Date(Date.now() - 86400 * 2 * 1000) });
await exec();

const schedAfter = await getSchedule(sync.id!);
const schedAfter = await getSchedule(sync.id);
expect(schedAfter?.status).toBe(ScheduleStatus.PAUSED);
});
});
3 changes: 1 addition & 2 deletions packages/jobs/lib/crons/deleteSyncsData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ export async function exec(): Promise<void> {
// ----
// hard delete records
const res = await syncDataService.deleteRecordsBySyncId({ syncId: sync.id, limit: limitRecords });
telemetry.increment(MetricTypes.JOBS_DELETE_SYNCS_DATA_RECORDS, res.totalRecords);
telemetry.increment(MetricTypes.JOBS_DELETE_SYNCS_DATA_DELETES, res.totalDeletes);
telemetry.increment(MetricTypes.JOBS_DELETE_SYNCS_DATA_RECORDS, res.totalDeletedRecords);
}
});

Expand Down
63 changes: 7 additions & 56 deletions packages/persist/lib/controllers/persist.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ type RecordRequest = Request<
providerConfigKey: string;
connectionId: string;
activityLogId: number;
trackDeletes: boolean;
lastSyncDate: Date;
},
any,
Record<string, any>
Expand Down Expand Up @@ -70,20 +68,10 @@ class PersistController {
public async saveRecords(req: RecordRequest, res: Response, next: NextFunction) {
const {
params: { environmentId, nangoConnectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, connectionId, trackDeletes, lastSyncDate, activityLogId }
body: { model, records, providerConfigKey, connectionId, activityLogId }
} = req;
const persist = async (dataRecords: DataRecord[]) => {
return await dataService.upsert(
dataRecords,
'_nango_sync_data_records',
'external_id',
nangoConnectionId,
model,
activityLogId,
environmentId,
trackDeletes,
false
);
return await dataService.upsert(dataRecords, nangoConnectionId, model, activityLogId, environmentId, false);
};
const result = await PersistController.persistRecords({
persistType: 'save',
Expand All @@ -95,8 +83,6 @@ class PersistController {
syncJobId,
model,
records,
trackDeletes,
lastSyncDate,
activityLogId,
softDelete: false,
persistFunction: persist
Expand All @@ -111,20 +97,10 @@ class PersistController {
public async deleteRecords(req: RecordRequest, res: Response, next: NextFunction) {
const {
params: { environmentId, nangoConnectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, connectionId, trackDeletes, lastSyncDate, activityLogId }
body: { model, records, providerConfigKey, connectionId, activityLogId }
} = req;
const persist = async (dataRecords: DataRecord[]) => {
return await dataService.upsert(
dataRecords,
'_nango_sync_data_records',
'external_id',
nangoConnectionId,
model,
activityLogId,
environmentId,
trackDeletes,
true
);
return await dataService.upsert(dataRecords, nangoConnectionId, model, activityLogId, environmentId, true);
};
const result = await PersistController.persistRecords({
persistType: 'delete',
Expand All @@ -136,8 +112,6 @@ class PersistController {
syncJobId,
model,
records,
trackDeletes,
lastSyncDate,
activityLogId,
softDelete: true,
persistFunction: persist
Expand All @@ -152,18 +126,10 @@ class PersistController {
public async updateRecords(req: RecordRequest, res: Response, next: NextFunction) {
const {
params: { environmentId, nangoConnectionId, syncId, syncJobId },
body: { model, records, providerConfigKey, connectionId, trackDeletes, lastSyncDate, activityLogId }
body: { model, records, providerConfigKey, connectionId, activityLogId }
} = req;
const persist = async (dataRecords: DataRecord[]) => {
return await dataService.updateRecord(
dataRecords,
'_nango_sync_data_records',
'external_id',
nangoConnectionId,
model,
activityLogId,
environmentId
);
return await dataService.update(dataRecords, nangoConnectionId, model, activityLogId, environmentId);
};
const result = await PersistController.persistRecords({
persistType: 'update',
Expand All @@ -175,8 +141,6 @@ class PersistController {
syncJobId,
model,
records,
trackDeletes,
lastSyncDate,
activityLogId,
softDelete: false,
persistFunction: persist
Expand All @@ -198,8 +162,6 @@ class PersistController {
syncJobId,
model,
records,
trackDeletes,
lastSyncDate,
activityLogId,
softDelete,
persistFunction
Expand All @@ -213,8 +175,6 @@ class PersistController {
syncJobId: number;
model: string;
records: Record<string, any>[];
trackDeletes: boolean;
lastSyncDate: Date;
activityLogId: number;
softDelete: boolean;
persistFunction: (records: DataRecord[]) => Promise<UpsertResponse>;
Expand Down Expand Up @@ -242,16 +202,7 @@ class PersistController {
success,
error,
response: formattedRecords
} = syncDataService.formatDataRecords(
records as unknown as DataResponse[],
nangoConnectionId,
model,
syncId,
syncJobId,
lastSyncDate,
trackDeletes,
softDelete
);
} = syncDataService.formatDataRecords(records as unknown as DataResponse[], nangoConnectionId, model, syncId, syncJobId, softDelete);

if (!success || formattedRecords === null) {
await createActivityLogMessage({
Expand Down
8 changes: 1 addition & 7 deletions packages/persist/lib/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,7 @@ const validateRecordsRequest = validateRequest({
records: z.any().array().nonempty(),
providerConfigKey: z.string(),
connectionId: z.string(),
activityLogId: z.number(),
lastSyncDate: z
.string()
.datetime()
.transform((value) => new Date(value))
.pipe(z.date()) as unknown as z.ZodDate,
trackDeletes: z.boolean()
activityLogId: z.number()
})
});
const recordPath = '/environment/:environmentId/connection/:nangoConnectionId/sync/:syncId/job/:syncJobId/records';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
const RECORDS_TABLE = '_nango_sync_data_records';
exports.up = async function (knex) {
await knex.transaction((trx) => {
return trx
.raw(
`
CREATE OR REPLACE FUNCTION ${RECORDS_TABLE}_reset_created_at()
RETURNS TRIGGER AS $$
BEGIN
IF OLD.external_deleted_at IS NOT NULL AND NEW.external_deleted_at IS NULL THEN
NEW.created_at = NOW();
NEW.updated_at = NOW();
NEW.external_is_deleted = FALSE;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no way to have this logic in the code? I'm really not fond of sql logic, it's hard to debug and easy to forget

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am also not a huge fan of the fact it is easy to forget. The alternative would be to fetch all existing records and compare before merging

`
)
.then(function () {
return trx.raw(`
CREATE TRIGGER ${RECORDS_TABLE}_reset_created_at_trigger
BEFORE UPDATE ON ${RECORDS_TABLE}
FOR EACH ROW
EXECUTE FUNCTION ${RECORDS_TABLE}_reset_created_at();
`);
});
});
};

exports.down = async function (knex) {
await knex.transaction((trx) => {
return trx.raw(`DROP TRIGGER IF EXISTS ${RECORDS_TABLE}_reset_created_at_trigger ON ${RECORDS_TABLE};`).then(function () {
return trx.raw(`DROP FUNCTION IF EXISTS ${RECORDS_TABLE}_reset_created_at();`);
});
});
};
56 changes: 21 additions & 35 deletions packages/shared/lib/db/seeders/config.seeder.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,26 @@
import { schema } from '../database.js';
import configService from '../../services/config.service.js';
import environmentService from '../../services/environment.service.js';
import type { Config as ProviderConfig } from '../../models/Provider.js';
import type { Environment } from '../../models/Environment.js';

export const createConfigSeeds = async (environmentName = ''): Promise<Environment[]> => {
let result: Environment[];
if (environmentName) {
result = [(await environmentService.createEnvironment(0, environmentName))!];
} else {
result = await schema().select('*').from('_nango_environments');
}

for (const row of result) {
const { id: environment_id } = row;
await configService.createProviderConfig({
unique_key: Math.random().toString(36).substring(7),
provider: 'google',
environment_id
} as ProviderConfig);
await configService.createProviderConfig({
unique_key: Math.random().toString(36).substring(7),
provider: 'google',
environment_id
} as ProviderConfig);
await configService.createProviderConfig({
unique_key: Math.random().toString(36).substring(7),
provider: 'google',
environment_id
} as ProviderConfig);
await configService.createProviderConfig({
unique_key: Math.random().toString(36).substring(7),
provider: 'notion',
environment_id
} as ProviderConfig);
}

return result;
export const createConfigSeeds = async (env: Environment): Promise<void> => {
await configService.createProviderConfig({
unique_key: Math.random().toString(36).substring(7),
provider: 'google',
environment_id: env.id
} as ProviderConfig);
await configService.createProviderConfig({
unique_key: Math.random().toString(36).substring(7),
provider: 'google',
environment_id: env.id
} as ProviderConfig);
await configService.createProviderConfig({
unique_key: Math.random().toString(36).substring(7),
provider: 'google',
environment_id: env.id
} as ProviderConfig);
await configService.createProviderConfig({
unique_key: Math.random().toString(36).substring(7),
provider: 'notion',
environment_id: env.id
} as ProviderConfig);
};
47 changes: 9 additions & 38 deletions packages/shared/lib/db/seeders/connection.seeder.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,17 @@
import db, { schema } from '../database.js';
import db from '../database.js';
import connectionService from '../../services/connection.service.js';
import environmentService from '../../services/environment.service.js';
import type { AuthCredentials } from '../../models/Auth.js';
import type { Environment } from '../../models/Environment.js';

export const createConnectionSeeds = async (environmentName = ''): Promise<number[]> => {
let result;
if (environmentName) {
result = [await environmentService.createEnvironment(0, environmentName)];
} else {
result = await schema().select('*').from('_nango_environments');
}

const connections = [];

for (const { id: environment_id } of result) {
const connectionParams = [
[Math.random().toString(36).substring(7)],
[Math.random().toString(36).substring(7)],
[Math.random().toString(36).substring(7)],
[Math.random().toString(36).substring(7)]
];
export const createConnectionSeeds = async (env: Environment): Promise<number[]> => {
const connectionIds = [];

const connectionIds = [];

for (const [name] of connectionParams) {
const [result] = (await connectionService.upsertConnection(
`conn-${name}`,
`provider-${name}`,
'google',
{} as AuthCredentials,
{},
environment_id,
0
)) as { id: number }[];
const { id: connection_id } = result as { id: number };
connectionIds.push(connection_id);
}

connections.push(...connectionIds);
for (let i = 0; i < 4; i++) {
const name = Math.random().toString(36).substring(7);
const result = await connectionService.upsertConnection(`conn-${name}`, `provider-${name}`, 'google', {} as AuthCredentials, {}, env.id, 0);
connectionIds.push(...result.map((res) => res.id));
}

return connections;
return connectionIds;
};

export const deleteAllConnectionSeeds = async (): Promise<void> => {
Expand Down
10 changes: 10 additions & 0 deletions packages/shared/lib/db/seeders/environment.seeder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import type { Environment } from '../../models/Environment.js';
import environmentService from '../../services/environment.service.js';

export async function createEnvironmentSeed(envName: string = 'test'): Promise<Environment> {
const env = await environmentService.createEnvironment(0, envName);
if (!env) {
throw new Error('Failed to create environment');
}
return env;
}