catalog-backend: split into smaller transactions for refreshes
freben committed Dec 10, 2020
1 parent 0451ac9 · commit fb386b7
Showing 2 changed files with 77 additions and 72 deletions.
5 changes: 5 additions & 0 deletions .changeset/curvy-dodos-fry.md
@@ -0,0 +1,5 @@
+---
+'@backstage/plugin-catalog-backend': patch
+---
+
+Break the refresh loop into several smaller transactions
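The changeset describes the core of this commit: instead of a single long-lived database transaction wrapping the entire refresh loop, each batch write now runs in its own short transaction (see the diff of DatabaseEntitiesCatalog.ts below). A minimal TypeScript sketch of that before/after shape, using hypothetical Database, Tx, Batch, and writeBatch stand-ins rather than the actual catalog-backend types:

// Illustrative sketch only; Database, Tx, Batch, and writeBatch are hypothetical
// stand-ins, not the real catalog-backend API.
type Tx = unknown;
interface Database {
  transaction<T>(fn: (tx: Tx) => Promise<T>): Promise<T>;
}
type Batch = { entityRef: string }[];

// Before: one transaction spans every batch in the refresh.
async function refreshInOneTransaction(
  database: Database,
  batches: Batch[],
  writeBatch: (batch: Batch, tx: Tx) => Promise<void>,
): Promise<void> {
  await database.transaction(async tx => {
    for (const batch of batches) {
      await writeBatch(batch, tx);
    }
  });
}

// After: each batch commits (or rolls back) in its own short transaction,
// so contention, failures, and dry-run rollbacks only affect that batch.
async function refreshInSmallTransactions(
  database: Database,
  batches: Batch[],
  writeBatch: (batch: Batch, tx: Tx) => Promise<void>,
): Promise<void> {
  for (const batch of batches) {
    await database.transaction(async tx => {
      await writeBatch(batch, tx);
    });
  }
}

Keeping each transaction to one batch keeps locks short and lets a single failing batch be retried without discarding the work done for the others.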
144 changes: 72 additions & 72 deletions plugins/catalog-backend/src/catalog/DatabaseEntitiesCatalog.ts
@@ -152,46 +152,48 @@ export class DatabaseEntitiesCatalog implements EntitiesCatalog {
   ): Promise<EntityUpsertResponse[]> {
     const locationId = options?.locationId;
 
-    return await this.database.transaction(async tx => {
-      // Group the entities by unique kind+namespace combinations
-      const entitiesByKindAndNamespace = groupBy(requests, ({ entity }) => {
-        const name = getEntityName(entity);
-        return `${name.kind}:${name.namespace}`.toLowerCase();
-      });
-
-      const limiter = limiterFactory(BATCH_CONCURRENCY);
-      const tasks: Promise<EntityUpsertResponse[]>[] = [];
-
-      for (const groupRequests of Object.values(entitiesByKindAndNamespace)) {
-        const { kind, namespace } = getEntityName(groupRequests[0].entity);
-
-        // Go through the new entities in reasonable chunk sizes (sometimes,
-        // sources produce tens of thousands of entities, and those are too large
-        // batch sizes to reasonably send to the database)
-        for (const batch of chunk(groupRequests, BATCH_SIZE)) {
-          tasks.push(
-            limiter(async () => {
-              const first = serializeEntityRef(batch[0].entity);
-              const last = serializeEntityRef(batch[batch.length - 1].entity);
-              let modifiedEntityIds: EntityUpsertResponse[] = [];
-
-              this.logger.debug(
-                `Considering batch ${first}-${last} (${batch.length} entries)`,
-              );
-
-              // Retry the batch write a few times to deal with contention
-              const context = {
-                kind,
-                namespace,
-                locationId,
-              };
-              for (let attempt = 1; attempt <= BATCH_ATTEMPTS; ++attempt) {
-                try {
+    // Group the entities by unique kind+namespace combinations
+    const entitiesByKindAndNamespace = groupBy(requests, ({ entity }) => {
+      const name = getEntityName(entity);
+      return `${name.kind}:${name.namespace}`.toLowerCase();
+    });
+
+    const limiter = limiterFactory(BATCH_CONCURRENCY);
+    const tasks: Promise<EntityUpsertResponse[]>[] = [];
+
+    for (const groupRequests of Object.values(entitiesByKindAndNamespace)) {
+      const { kind, namespace } = getEntityName(groupRequests[0].entity);
+
+      // Go through the new entities in reasonable chunk sizes (sometimes,
+      // sources produce tens of thousands of entities, and those are too large
+      // batch sizes to reasonably send to the database)
+      for (const batch of chunk(groupRequests, BATCH_SIZE)) {
+        tasks.push(
+          limiter(async () => {
+            const first = serializeEntityRef(batch[0].entity);
+            const last = serializeEntityRef(batch[batch.length - 1].entity);
+
+            this.logger.debug(
+              `Considering batch ${first}-${last} (${batch.length} entries)`,
+            );
+
+            // Retry the batch write a few times to deal with contention
+            const context = {
+              kind,
+              namespace,
+              locationId,
+            };
+
+            for (let attempt = 1; ; ++attempt) {
+              try {
+                return await this.database.transaction(async tx => {
+                  let modifiedEntityIds = new Array<EntityUpsertResponse>();
                   const { toAdd, toUpdate, toIgnore } = await this.analyzeBatch(
                     batch,
                     context,
                     tx,
                   );
+
                   if (toAdd.length) {
                     modifiedEntityIds.push(
                       ...(await this.batchAdd(toAdd, context, tx)),
@@ -202,6 +204,7 @@ export class DatabaseEntitiesCatalog implements EntitiesCatalog {
                       ...(await this.batchUpdate(toUpdate, context, tx)),
                     );
                   }
+
                   // TODO(Rugvip): We currently always update relations, but we
                   // likely want to figure out a way to avoid that
                   for (const { entity, relations } of toIgnore) {
@@ -212,49 +215,46 @@ export class DatabaseEntitiesCatalog implements EntitiesCatalog {
                     }
                   }
 
-                  break;
-                } catch (e) {
-                  if (e instanceof ConflictError && attempt < BATCH_ATTEMPTS) {
-                    this.logger.warn(
-                      `Failed to write batch at attempt ${attempt}/${BATCH_ATTEMPTS}, ${e}`,
-                    );
-                  } else {
-                    throw e;
-                  }
-                }
-              }
-
-              if (options?.outputEntities) {
-                const writtenEntities = await this.database.entities(
-                  tx,
-                  EntityFilters.ofMatchers({
-                    'metadata.uid': modifiedEntityIds.map(e => e.entityId),
-                  }),
-                );
-
-                modifiedEntityIds = writtenEntities.map(e => ({
-                  entityId: e.entity.metadata.uid!,
-                  entity: e.entity,
-                }));
-              }
-
-              return modifiedEntityIds;
-            }),
-          );
-        }
-      }
-
-      const entityUpserts = (await Promise.all(tasks)).flat();
-
-      if (options?.dryRun) {
-        // If this is only a dry run, cancel the database transaction even if it was successful.
-        await tx.rollback();
-
-        this.logger.debug(`Performed successful dry run of adding entities`);
-      }
-
-      return entityUpserts;
-    });
+                  if (options?.outputEntities) {
+                    const writtenEntities = await this.database.entities(
+                      tx,
+                      EntityFilters.ofMatchers({
+                        'metadata.uid': modifiedEntityIds.map(e => e.entityId),
+                      }),
+                    );
+                    modifiedEntityIds = writtenEntities.map(e => ({
+                      entityId: e.entity.metadata.uid!,
+                      entity: e.entity,
+                    }));
+                  }
+
+                  if (options?.dryRun) {
+                    // If this is only a dry run, cancel the database transaction even if it was successful.
+                    await tx.rollback();
+                    this.logger.debug(
+                      `Performed successful dry run of adding entities`,
+                    );
+                  }
+
+                  return modifiedEntityIds;
+                });
+              } catch (e) {
+                if (e instanceof ConflictError && attempt < BATCH_ATTEMPTS) {
+                  this.logger.warn(
+                    `Failed to write batch at attempt ${attempt}/${BATCH_ATTEMPTS}, ${e}`,
+                  );
+                } else {
+                  throw e;
+                }
+              }
+            }
+          }),
+        );
+      }
+    }
+
+    const entityUpserts = (await Promise.all(tasks)).flat();
+    return entityUpserts;
   }
 
   // Set the relations originating from an entity using the DB layer
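A note on the retry loop introduced above: for (let attempt = 1; ; ++attempt) has no exit condition in its header. On success the function returns from inside the transaction callback; on a ConflictError it retries with a fresh transaction until BATCH_ATTEMPTS is exhausted, and any other error (or the final failed attempt) is rethrown. A small self-contained sketch of that pattern, where runInTransaction and attemptWrite are hypothetical stand-ins for the catalog's database layer:

// Illustrative sketch only; ConflictError and BATCH_ATTEMPTS mirror names from the
// diff above, while runInTransaction and attemptWrite are hypothetical stand-ins.
class ConflictError extends Error {}
const BATCH_ATTEMPTS = 3;

async function writeWithRetry<T>(
  runInTransaction: <R>(fn: () => Promise<R>) => Promise<R>,
  attemptWrite: () => Promise<T>,
): Promise<T> {
  // No loop condition: the only exits are the return on success and the
  // throw on a non-conflict error or on the final failed attempt.
  for (let attempt = 1; ; ++attempt) {
    try {
      return await runInTransaction(attemptWrite);
    } catch (e) {
      if (e instanceof ConflictError && attempt < BATCH_ATTEMPTS) {
        continue; // write contention: retry in a fresh transaction
      }
      throw e;
    }
  }
}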
