Skip to content

Commit

Permalink
refresh table after schema change
Browse files Browse the repository at this point in the history
  • Loading branch information
Hayden-Yu committed May 21, 2019
1 parent cee3949 commit 0edba27
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 25 deletions.
6 changes: 3 additions & 3 deletions src/core/service/force-schema.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ export class ForceSchemaService {
});
}

public static describeObject(objectName: string): Promise<DescribeSObjectResult> {
public static describeObject(objectName: string, timestamp?: string): Promise<DescribeSObjectResult> {
return new Promise((resolve, reject) => {
sf.describe(objectName, (err, meta) => {
logger.debug(`describing ${objectName} sf object`);
logger.debug(`describing ${objectName} sf object${timestamp ? ` if modified after ${timestamp}` : ''}`);
if (err) {
logger.error(err.message);
return reject(err);
}
return resolve(meta);
});
}, timestamp);
});
}

Expand Down
63 changes: 41 additions & 22 deletions src/core/synchronize-tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,57 @@ async function massDeleteRecords(name: string, ids: string[]) {
connection.release();
}

async function loadFromScratch(schema: DescribeSObjectResult) {
logger.info(`initialize postgres table ${schema.name}`);
PostgresDBService.createSobjectTable(schema);
const soql = ForceSchemaService.generateSelectStar(schema);
return synchronizeTableWithPagination(await ForceDataService.query(soql), schema);
}

async function updateTable(schema: DescribeSObjectResult, lastSync: string, currentTime: string) {
const processes = [];
let recentUpdates = await ForceDataService.getRecentUpdates(schema.name, lastSync, currentTime);
logger.info(`found ${recentUpdates.length} recent updates on ${schema.name}`);
const soql = ForceSchemaService.generateSelectStar(schema);
while(recentUpdates.length) {
const chunk = recentUpdates.slice(0,SOQL_WHERE_IN_SIZE);
processes.push(
synchronizeTableWithPagination(
await ForceDataService.query(`${soql} WHERE Id IN (${chunk.map(id=>`'${id}'`).join(',')})`),
schema));
logger.info(`synchronized ${chunk.length} updated ${schema.name} records`);
recentUpdates = recentUpdates.slice(SOQL_WHERE_IN_SIZE);
}
let recentDeletes = await ForceDataService.getRecentDeletes(schema.name, lastSync, currentTime);
logger.info(`found ${recentUpdates.length} recent deletes on ${schema.name}`);
if (recentDeletes.length) {
processes.push(massDeleteRecords(schema.name, recentDeletes));
}
return processes;
}

async function synchronizeTable(name: string) {
const schema = await ForceSchemaService.describeObject(name);
const currentTime = (new Date()).toISOString();
const syncHistory = await database.query(`SELECT id, ts FROM ${SCHEMA}.internal_syncHistory WHERE objectName='${name}' ORDER BY id DESC;`);
const soql = ForceSchemaService.generateSelectStar(schema);
const schema = await ForceSchemaService.describeObject(name);

const processes: Promise<any>[] = [
database.query(`INSERT INTO ${SCHEMA}.internal_syncHistory (objectName, ts) VALUES ('${name}','${currentTime}');`),
];
if (!syncHistory.rows.length) {
logger.info(`initialize postgres table ${name}`);
PostgresDBService.createSobjectTable(schema);
processes.push(synchronizeTableWithPagination(await ForceDataService.query(soql), schema));
processes.push(loadFromScratch(schema));
} else {
const lastSync = moment.tz(syncHistory.rows[0]['ts'], 'UTC').toISOString();
let recentUpdates = await ForceDataService.getRecentUpdates(name, lastSync, currentTime);
logger.info(`found ${recentUpdates.length} recent updates on ${name}`);
while(recentUpdates.length) {
const chunk = recentUpdates.slice(0,SOQL_WHERE_IN_SIZE);
processes.push(
synchronizeTableWithPagination(
await ForceDataService.query(`${soql} WHERE Id IN (${chunk.map(id=>`'${id}'`).join(',')})`),
schema));
logger.info(`synchronized ${chunk.length} updated ${name} records`);
recentUpdates = recentUpdates.slice(SOQL_WHERE_IN_SIZE);
}
let recentDeletes = await ForceDataService.getRecentDeletes(name, lastSync, currentTime);
logger.info(`found ${recentUpdates.length} recent deletes on ${name}`);
if (recentDeletes.length) {
processes.push(massDeleteRecords(name, recentDeletes));
const lastSync = moment.tz(syncHistory.rows[0]['ts'], 'UTC');
const schemaChanged = !(await ForceSchemaService.describeObject(name, lastSync.format('ddd, D MMM YYYY HH:mm:ss') + ' GMT'));
if (schemaChanged) {
logger.warning(`${name} schema changed, performing full table refresh`);
processes.push(database.query(`DROP TABLE ${SCHEMA}.${name};`).then(() => loadFromScratch(schema)));
} else {
(await updateTable(schema, lastSync.toISOString(), currentTime))
.forEach(el => processes.push(el));
}
await Promise.all(processes);
}
await Promise.all(processes);
}

export function synchronizeTables(): Promise<any> {
Expand Down

0 comments on commit 0edba27

Please sign in to comment.