Write relations directly as part of batch add / update of entities.
This implies a slight change to the CommonDatabase contract.

The huge LDAP dataset is now down to about 10 seconds to write, on both Postgres and SQLite.
freben committed Dec 14, 2020
1 parent 5153e3e commit 6b37c95
Showing 6 changed files with 169 additions and 176 deletions.
26 changes: 26 additions & 0 deletions .changeset/purple-cycles-switch.md
@@ -0,0 +1,26 @@
---
'@backstage/plugin-catalog-backend': minor
---

Write relations directly as part of batch add / update of entities.

This implies a slight change to the `CommonDatabase` contract:

## `addEntity` removed

This method was unused by the core, and is made redundant by `addEntities`.

If you were using `addEntity`, please call `addEntities` instead, passing an
array with a single element.
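
For example, a call that previously used `addEntity` can be rewritten roughly
like this (a minimal sketch, assuming `db` is a `CommonDatabase`, `tx` is an
open transaction, and `entity` / `locationId` are whatever you passed before):

```ts
// Before (method no longer exists); the old call looked roughly like:
// const response = await db.addEntity(tx, { locationId, entity });

// After: wrap the request in a single-element array and unwrap the response.
const [response] = await db.addEntities(tx, [
  { locationId, entity, relations: [] }, // `relations` is new, see below
]);
```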

## `DbEntityRequest` has a new field `relations`

This is the structure that is passed to `addEntities` and `updateEntity`.
Previously you had to call `setRelations` separately; the relations are now
written directly as part of `addEntities` and `updateEntity`.

If you were using `addEntities` or `updateEntity` directly, please adapt your
code to add the `relations` array to each request. If you were calling
`setRelations` separately alongside these methods, you no longer need to do so
once the relations are included in each `DbEntityRequest`.
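
Roughly, the migration looks like this (sketch only; `relations` is the same
`EntityRelationSpec[]` you previously passed to `setRelations`, and the
etag/generation arguments are placeholders):

```ts
// Before: write the entity, then write its relations in a separate step.
// const [res] = await db.addEntities(tx, [{ locationId, entity }]);
// await db.setRelations(tx, res.entity.metadata.uid!, relations);

// After: the relations ride along with each request.
await db.addEntities(tx, [{ locationId, entity, relations }]);

// The same applies to updates:
await db.updateEntity(
  tx,
  { locationId, entity, relations },
  matchingEtag,
  matchingGeneration,
);
```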
22 changes: 10 additions & 12 deletions plugins/catalog-backend/src/catalog/DatabaseEntitiesCatalog.test.ts
@@ -80,9 +80,10 @@ describe('DatabaseEntitiesCatalog', () => {
'kind=b,metadata.namespace=d,metadata.name=c',
),
);
expect(db.setRelations).toHaveBeenCalledTimes(1);
expect(db.setRelations).toHaveBeenCalledWith(expect.anything(), 'u', []);
expect(db.addEntities).toHaveBeenCalledTimes(1);
expect(db.addEntities).toHaveBeenCalledWith(expect.anything(), [
{ entity: expect.anything(), relations: [] },
]);
expect(result).toEqual([{ entityId: 'u' }]);
});

@@ -113,9 +114,10 @@ describe('DatabaseEntitiesCatalog', () => {
'kind=b,metadata.namespace=d,metadata.name=c',
),
);
expect(db.setRelations).toHaveBeenCalledTimes(1);
expect(db.setRelations).toHaveBeenCalledWith(expect.anything(), 'u', []);
expect(db.addEntities).toHaveBeenCalledTimes(1);
expect(db.addEntities).toHaveBeenCalledWith(expect.anything(), [
{ entity: expect.anything(), relations: [] },
]);
expect(transaction.rollback).toBeCalledTimes(1);
expect(result).toEqual([{ entityId: 'u' }]);
});
@@ -145,11 +147,7 @@ describe('DatabaseEntitiesCatalog', () => {
},
},
};
db.entities.mockResolvedValue([
{
entity: dbEntity,
},
]);
db.entities.mockResolvedValue([{ entity: dbEntity }]);
db.addEntities.mockResolvedValue([
{ entity: { ...entity, metadata: { ...entity.metadata, uid: 'u' } } },
]);
@@ -161,7 +159,7 @@
);

expect(db.entities).toHaveBeenCalledTimes(2);
expect(db.setRelations).toHaveBeenCalledTimes(1);
expect(db.addEntities).toHaveBeenCalledTimes(1);
expect(result).toEqual([
{
entityId: 'u',
@@ -237,12 +235,11 @@ describe('DatabaseEntitiesCatalog', () => {
x: 'b',
},
},
relations: [],
},
'e',
1,
);
expect(db.setRelations).toHaveBeenCalledTimes(1);
expect(db.setRelations).toHaveBeenCalledWith(expect.anything(), 'u', []);
expect(result).toEqual([{ entityId: 'u' }]);
});

@@ -315,6 +312,7 @@ describe('DatabaseEntitiesCatalog', () => {
x: 'b',
},
},
relations: [],
},
'e',
1,
87 changes: 37 additions & 50 deletions plugins/catalog-backend/src/catalog/DatabaseEntitiesCatalog.ts
@@ -18,7 +18,6 @@ import { ConflictError, NotFoundError } from '@backstage/backend-common';
import {
Entity,
entityHasChanges,
EntityRelationSpec,
generateUpdatedEntity,
getEntityName,
LOCATION_ANNOTATION,
@@ -111,47 +110,45 @@ export class DatabaseEntitiesCatalog implements EntitiesCatalog {
outputEntities?: boolean;
},
): Promise<EntityUpsertResponse[]> {
// Group the entities by unique kind+namespace combinations. The reason for
// Group the requests by unique kind+namespace combinations. The reason for
// this is that the change detection and merging logic requires finding
// pre-existing versions of the entities in the database. Those queries are
// easier and faster to make if every batch revolves around a single kind-
// namespace pair.
const entitiesByKindAndNamespace = groupBy(requests, ({ entity }) => {
const requestsByKindAndNamespace = groupBy(requests, ({ entity }) => {
const name = getEntityName(entity);
return `${name.kind}:${name.namespace}`.toLowerCase();
});

// Go through the requests in reasonable batch sizes. Sometimes, sources
// produce tens of thousands of entities, and those are too large batch
// sizes to reasonably send to the database.
const batches = Object.values(requestsByKindAndNamespace)
.map(requests => chunk(requests, BATCH_SIZE))
.flat();

// Bound the number of concurrent batches. We want a bit of concurrency for
// performance reasons, but not so much that we starve the connection pool
// or start thrashing.
const limiter = limiterFactory(BATCH_CONCURRENCY);
const tasks = new Array<Promise<EntityUpsertResponse[]>>();

for (const groupRequests of Object.values(entitiesByKindAndNamespace)) {
// Go through the new entities in reasonable chunk sizes (sometimes,
// sources produce tens of thousands of entities, and those are too large
// batch sizes to reasonably send to the database)
for (const batch of chunk(groupRequests, BATCH_SIZE)) {
tasks.push(
limiter(async () => {
// Retry the batch write a few times to deal with contention
for (let attempt = 1; ; ++attempt) {
try {
return this.batchAddOrUpdateEntitiesSingleBatch(batch, options);
} catch (e) {
if (e instanceof ConflictError && attempt < BATCH_ATTEMPTS) {
this.logger.warn(
`Failed to write batch at attempt ${attempt}/${BATCH_ATTEMPTS}, ${e}`,
);
} else {
throw e;
}
}
const tasks = batches.map(batch =>
limiter(async () => {
// Retry the batch write a few times to deal with contention
for (let attempt = 1; ; ++attempt) {
try {
return this.batchAddOrUpdateEntitiesSingleBatch(batch, options);
} catch (e) {
if (e instanceof ConflictError && attempt < BATCH_ATTEMPTS) {
this.logger.warn(
`Failed to write batch at attempt ${attempt}/${BATCH_ATTEMPTS}, ${e}`,
);
} else {
throw e;
}
}),
);
}
}
}
}
}),
);

const responses = await Promise.all(tasks);
return responses.flat();
@@ -203,7 +200,7 @@ export class DatabaseEntitiesCatalog implements EntitiesCatalog {
// likely want to figure out a way to avoid that
const entityId = entity.metadata.uid;
if (entityId) {
await this.setRelations(entityId, relations, tx);
await this.database.setRelations(tx, entityId, relations);
responses.push({ entityId });
}
}
@@ -232,15 +229,6 @@ export class DatabaseEntitiesCatalog implements EntitiesCatalog {
});
}

// Set the relations originating from an entity using the DB layer
private async setRelations(
originatingEntityId: string,
relations: EntityRelationSpec[],
tx: Transaction,
): Promise<void> {
await this.database.setRelations(tx, originatingEntityId, relations);
}

// Given a batch of entities that were just read from a location, take them
// into consideration by comparing against the existing catalog entities and
// produce the list of entities to be added, and the list of entities to be
@@ -321,17 +309,17 @@ export class DatabaseEntitiesCatalog implements EntitiesCatalog {

const res = await this.database.addEntities(
tx,
requests.map(({ entity }) => ({ locationId, entity })),
requests.map(({ entity, relations }) => ({
locationId,
entity,
relations,
})),
);

const responses = res.map(({ entity }) => ({
entityId: entity.metadata.uid!,
}));

for (const [index, { entityId }] of responses.entries()) {
await this.setRelations(entityId, requests[index].relations, tx);
}

this.logger.debug(
`Added ${requests.length} entities in ${durationText(markTimestamp)}`,
);
@@ -350,11 +338,10 @@
const responses: EntityUpsertResponse[] = [];

// TODO(freben): Still not batched
for (const entity of requests) {
const res = await this.addOrUpdateEntity(entity.entity, tx, locationId);
for (const request of requests) {
const res = await this.addOrUpdateEntity(tx, request, locationId);
const entityId = res.metadata.uid!;
responses.push({ entityId });
await this.setRelations(entityId, entity.relations, tx);
}

this.logger.debug(
@@ -366,8 +353,8 @@

// TODO(freben): Incorporate this into batchUpdate which is the only caller
private async addOrUpdateEntity(
entity: Entity,
tx: Transaction,
{ entity, relations }: EntityUpsertRequest,
locationId?: string,
): Promise<Entity> {
// Find a matching (by uid, or by compound name, depending on the given
@@ -383,13 +370,13 @@
const updated = generateUpdatedEntity(existing.entity, entity);
response = await this.database.updateEntity(
tx,
{ locationId, entity: updated },
{ locationId, entity: updated, relations },
existing.entity.metadata.etag,
existing.entity.metadata.generation,
);
} else {
const added = await this.database.addEntities(tx, [
{ locationId, entity },
{ locationId, entity, relations },
]);
response = added[0];
}
