Skip to content

Commit

Permalink
fix(database): initial db schema registration (#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
kon14 committed Sep 23, 2022
1 parent 8fd20e0 commit 235955e
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 92 deletions.
21 changes: 8 additions & 13 deletions modules/database/src/Database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,22 @@ export default class DatabaseModule extends ManagedModule<void> {
}

async preServerStart() {
this._activeAdapter.setGrpcSdk(this.grpcSdk);
this._activeAdapter.connect();
await this._activeAdapter.ensureConnected();
await this._activeAdapter.init(this.grpcSdk);
await this.registerMetrics();
}

async onServerStart() {
const declaredSchemaExists = await this._activeAdapter.checkDeclaredSchemaExistence();
await this._activeAdapter.createSchemaFromAdapter(
models.DeclaredSchema,
false,
!declaredSchemaExists,
false,
false,
);
const modelPromises = Object.values(models).flatMap((model: ConduitSchema) => {
if (model.name === '_DeclaredSchema') return [];
return this._activeAdapter.createSchemaFromAdapter(model, false);
});
await Promise.all(modelPromises);
await this._activeAdapter.retrieveForeignSchemas();
await this._activeAdapter.recoverSchemasFromDatabase();
await runMigrations(this._activeAdapter);
Expand All @@ -118,13 +119,7 @@ export default class DatabaseModule extends ManagedModule<void> {
this.grpcSdk.bus?.subscribe('database:create:schema', async schemaStr => {
const syncSchema: ConduitDatabaseSchema = JSON.parse(schemaStr); // @dirty-type-cast
delete (syncSchema as any).fieldHash;
await this._activeAdapter.createSchemaFromAdapter(
syncSchema,
false,
false,
false,
true,
);
await this._activeAdapter.createSchemaFromAdapter(syncSchema, false, false, true);
});
this.grpcSdk.bus?.subscribe('database:delete:schema', async schemaName => {
await this._activeAdapter.deleteSchema(schemaName, false, '', true);
Expand Down Expand Up @@ -199,7 +194,7 @@ export default class DatabaseModule extends ManagedModule<void> {
}
schema.ownerModule = call.metadata!.get('module-name')![0] as string;
await this._activeAdapter
.createSchemaFromAdapter(schema, false, true, true)
.createSchemaFromAdapter(schema, false, true)
.then((schemaAdapter: Schema) => {
callback(
null,
Expand Down
33 changes: 17 additions & 16 deletions modules/database/src/adapters/DatabaseAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import ObjectHash from 'object-hash';
export abstract class DatabaseAdapter<T extends Schema> {
protected readonly maxConnTimeoutMs: number;
protected grpcSdk: ConduitGrpcSdk;
private legacyDeployment = false; // unprefixed declared schema collection
registeredSchemas: Map<string, ConduitSchema>;
models: { [name: string]: T } = {};
foreignSchemaCollections: Set<string> = new Set([]); // not in DeclaredSchemas
Expand All @@ -25,15 +26,26 @@ export abstract class DatabaseAdapter<T extends Schema> {
: this.maxConnTimeoutMs;
}

async init(grpcSdk: ConduitGrpcSdk) {
this.grpcSdk = grpcSdk;
this.connect();
await this.ensureConnected();
this.legacyDeployment = await this.hasLegacyCollections();
}

protected abstract connect(): void;

protected abstract ensureConnected(): Promise<void>;

/**
* Introspects all schemas of current db connection, registers them to Conduit
*/
abstract introspectDatabase(): Promise<ConduitSchema[]>;

/**
* Check Declared Schema Existence
* Checks whether DeclaredSchema collection name is unprefixed
*/
abstract checkDeclaredSchemaExistence(): Promise<boolean>;
protected abstract hasLegacyCollections(): Promise<boolean>;

/**
* Retrieves all schemas not related to Conduit and stores them in adapter
Expand All @@ -44,14 +56,12 @@ export abstract class DatabaseAdapter<T extends Schema> {
* Registers a schema, creates its collection in the database and updates routing types
* @param {ConduitSchema} schema
* @param {boolean} imported Whether schema is an introspected schema
* @param {boolean} cndPrefix Whether to prefix the collection name with a Conduit system schema identifier (cnd_)
* @param {boolean} gRPC Merge existing extensions before stitching schema from gRPC
* @param {boolean} instanceSync Do not republish schema changes for multi-instance sync calls
*/
async createSchemaFromAdapter(
schema: ConduitSchema,
imported = false,
cndPrefix = true,
gRPC = false,
instanceSync = false,
): Promise<Schema> {
Expand All @@ -62,11 +72,11 @@ export abstract class DatabaseAdapter<T extends Schema> {
this.foreignSchemaCollections.delete(schema.collectionName);
} else {
let collectionName = this.getCollectionName(schema);
if (cndPrefix && !this.models['_DeclaredSchema']) {
if (!this.legacyDeployment && !this.models['_DeclaredSchema']) {
collectionName = collectionName.startsWith('_')
? `cnd${collectionName}`
: `cnd_${collectionName}`;
} else if (cndPrefix) {
} else if (schema.name !== '_DeclaredSchema') {
const declaredSchema = await this.models['_DeclaredSchema'].findOne({
name: schema.name,
});
Expand All @@ -75,7 +85,7 @@ export abstract class DatabaseAdapter<T extends Schema> {
? `cnd${collectionName}`
: `cnd_${collectionName}`;
} else {
//recover collection name from DeclaredSchema
// recover collection name from DeclaredSchema
collectionName = declaredSchema.collectionName;
}
}
Expand Down Expand Up @@ -223,21 +233,12 @@ export abstract class DatabaseAdapter<T extends Schema> {
!!model.modelOptions.conduit?.imported,
true,
false,
false,
);
});

await Promise.all(models);
}

abstract connect(): void;

abstract ensureConnected(): Promise<void>;

setGrpcSdk(grpcSdk: ConduitGrpcSdk) {
this.grpcSdk = grpcSdk;
}

setSchemaExtension(
schemaName: string,
extOwner: string,
Expand Down
50 changes: 25 additions & 25 deletions modules/database/src/adapters/mongoose-adapter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,27 @@ export class MongooseAdapter extends DatabaseAdapter<MongooseSchema> {
this.mongoose = new Mongoose();
}

async ensureConnected(): Promise<void> {
protected connect() {
this.mongoose = new Mongoose();
ConduitGrpcSdk.Logger.log('Connecting to database...');
this.mongoose
.connect(this.connectionString, this.options)
.then(() => {
deepPopulate = deepPopulate(this.mongoose);
})
.catch(err => {
ConduitGrpcSdk.Logger.error('Unable to connect to the database: ', err);
throw new Error();
})
.then(() => {
ConduitGrpcSdk.Logger.log('Mongoose connection established successfully');
});
this.mongoose.set('debug', () => {
ConduitGrpcSdk.Metrics?.increment('database_queries_total');
});
}

protected async ensureConnected(): Promise<void> {
return new Promise<void>((resolve, reject) => {
const db = this.mongoose.connection;
db.on('connected', () => {
Expand Down Expand Up @@ -62,24 +82,10 @@ export class MongooseAdapter extends DatabaseAdapter<MongooseSchema> {
});
}

connect() {
this.mongoose = new Mongoose();
ConduitGrpcSdk.Logger.log('Connecting to database...');
this.mongoose
.connect(this.connectionString, this.options)
.then(() => {
deepPopulate = deepPopulate(this.mongoose);
})
.catch(err => {
ConduitGrpcSdk.Logger.error('Unable to connect to the database: ', err);
throw new Error();
})
.then(() => {
ConduitGrpcSdk.Logger.log('Mongoose connection established successfully');
});
this.mongoose.set('debug', () => {
ConduitGrpcSdk.Metrics?.increment('database_queries_total');
});
protected async hasLegacyCollections() {
return !!(await this.mongoose.connection.db.listCollections().toArray()).find(
c => c.name === '_declaredschemas',
);
}

async retrieveForeignSchemas(): Promise<void> {
Expand Down Expand Up @@ -230,12 +236,6 @@ export class MongooseAdapter extends DatabaseAdapter<MongooseSchema> {
throw new GrpcError(status.NOT_FOUND, `Schema ${schemaName} not defined yet`);
}

async checkDeclaredSchemaExistence() {
return !!(await this.mongoose.connection.db.listCollections().toArray()).find(
c => c.name === '_declaredschemas',
);
}

async deleteSchema(
schemaName: string,
deleteData: boolean,
Expand Down
76 changes: 38 additions & 38 deletions modules/database/src/adapters/sequelize-adapter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,47 @@ export class SequelizeAdapter extends DatabaseAdapter<SequelizeSchema> {
this.connectionUri = connectionUri;
}

connect() {
protected connect() {
this.sequelize = new Sequelize(this.connectionUri, { logging: false });
}

protected async ensureConnected() {
let error;
ConduitGrpcSdk.Logger.log('Connecting to database...');
for (let i = 0; i < this.maxConnTimeoutMs / 200; i++) {
try {
await this.sequelize.authenticate();
ConduitGrpcSdk.Logger.log('Sequelize connection established successfully');
return;
} catch (err: any) {
error = err;
if (error.original.code !== 'ECONNREFUSED') break;
await sleep(200);
}
}
if (error) {
ConduitGrpcSdk.Logger.error('Unable to connect to the database: ', error);
throw new Error();
}
}

protected async hasLegacyCollections() {
return (
(
await this.sequelize.query(
`SELECT EXISTS (
SELECT FROM
information_schema.tables
WHERE
table_schema LIKE '${sqlSchemaName}' AND
table_type LIKE 'BASE TABLE' AND
table_name = '_DeclaredSchema'
);`,
)
)[0][0] as { exists: boolean }
).exists;
}

async retrieveForeignSchemas(): Promise<void> {
const declaredSchemas = await this.getSchemaModel('_DeclaredSchema').model.findMany(
{},
Expand Down Expand Up @@ -192,23 +229,6 @@ export class SequelizeAdapter extends DatabaseAdapter<SequelizeSchema> {
return this.models[schema.name];
}

async checkDeclaredSchemaExistence() {
return (
(
await this.sequelize.query(
`SELECT EXISTS (
SELECT FROM
information_schema.tables
WHERE
table_schema LIKE '${sqlSchemaName}' AND
table_type LIKE 'BASE TABLE' AND
table_name = '_DeclaredSchema'
);`,
)
)[0][0] as { exists: boolean }
).exists;
}

async deleteSchema(
schemaName: string,
deleteData: boolean,
Expand Down Expand Up @@ -262,24 +282,4 @@ export class SequelizeAdapter extends DatabaseAdapter<SequelizeSchema> {
}
throw new GrpcError(status.NOT_FOUND, `Schema ${schemaName} not defined yet`);
}

async ensureConnected() {
let error;
ConduitGrpcSdk.Logger.log('Connecting to database...');
for (let i = 0; i < this.maxConnTimeoutMs / 200; i++) {
try {
await this.sequelize.authenticate();
ConduitGrpcSdk.Logger.log('Sequelize connection established successfully');
return;
} catch (err: any) {
error = err;
if (error.original.code !== 'ECONNREFUSED') break;
await sleep(200);
}
}
if (error) {
ConduitGrpcSdk.Logger.error('Unable to connect to the database: ', error);
throw new Error();
}
}
}

0 comments on commit 235955e

Please sign in to comment.