Skip to content

Commit

Permalink
(WIP) Porting e2e tests for PG
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin-dp committed Mar 14, 2024
1 parent 21d114b commit 2a25ca7
Show file tree
Hide file tree
Showing 20 changed files with 163 additions and 83 deletions.
17 changes: 11 additions & 6 deletions clients/typescript/src/client/model/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ squelPostgres.registerValueHandler('bigint', function (bigint) {
type AnyFindInput = FindInput<any, any, any, any, any>

export class Builder {
private _fullyQualifiedTableName: string

constructor(
private _tableName: string,
private _fields: string[],
Expand All @@ -44,23 +46,26 @@ export class Builder {
>,
public dialect: Dialect
) {
this._fullyQualifiedTableName = `"${this._tableName}"`
if (dialect === 'Postgres') {
squelPostgres.cls.DefaultQueryBuilderOptions.nameQuoteCharacter = '"'
squelPostgres.cls.DefaultQueryBuilderOptions.autoQuoteTableNames = true
//squelPostgres.cls.DefaultQueryBuilderOptions.autoQuoteTableNames = true
squelPostgres.cls.DefaultQueryBuilderOptions.autoQuoteFieldNames = true
squelPostgres.cls.DefaultQueryBuilderOptions.autoQuoteAliasNames = true
// need to register it, otherwise squel complains that the Date type is not registered
// as Squel does not support it out-of-the-box but our Postgres drivers do support it.
squelPostgres.registerValueHandler(Date, (date) => date)
//this._fullyQualifiedTableName = `public."${this._tableName}"`
} else {
// Don't use numbered parameters if dialect is SQLite
squelPostgres.cls.DefaultQueryBuilderOptions.numberedParameters = false
//this._fullyQualifiedTableName = `main."${this._tableName}"`
}
}

create(i: CreateInput<any, any, any>): QueryBuilder {
// Make a SQL query out of the data
const query = squelPostgres.insert().into(this._tableName).setFields(i.data)
const query = squelPostgres.insert().into(this._fullyQualifiedTableName).setFields(i.data)

// Adds a `RETURNING` statement that returns all known fields
const queryWithReturn = this.returnAllFields(query)
Expand All @@ -70,7 +75,7 @@ export class Builder {
createMany(i: CreateManyInput<any>): QueryBuilder {
const insert = squelPostgres
.insert()
.into(this._tableName)
.into(this._fullyQualifiedTableName)
.setFieldsRows(i.data)
return i.skipDuplicates
? insert.onConflict() // adds "ON CONFLICT DO NOTHING" to the query
Expand Down Expand Up @@ -115,7 +120,7 @@ export class Builder {
i: DeleteManyInput<any>,
idRequired = false
): QueryBuilder {
const deleteQuery = squel.delete().from(this._tableName)
const deleteQuery = squel.delete().from(this._fullyQualifiedTableName)
const whereObject = i.where // safe because the schema for `where` adds an empty object as default which is provided if the `where` field is absent
const fields = this.getFields(whereObject, idRequired)
return addFilters(fields, whereObject, deleteQuery)
Expand All @@ -138,7 +143,7 @@ export class Builder {

const query = squelPostgres
.update()
.table(this._tableName)
.table(this._fullyQualifiedTableName)
.setFields(i.data)

// Adds a `RETURNING` statement that returns all known fields
Expand Down Expand Up @@ -170,7 +175,7 @@ export class Builder {
if (!this.shapeManager.hasBeenSubscribed(this._tableName))
Log.debug('Reading from unsynced table ' + this._tableName)

const query = squelPostgres.select().from(this._tableName) // specify from which table to select
const query = squelPostgres.select().from(this._fullyQualifiedTableName) // specify from which table to select
// only select the fields provided in `i.select` and the ones in `i.where`
const addFieldSelectionP = this.addFieldSelection.bind(
this,
Expand Down
101 changes: 53 additions & 48 deletions clients/typescript/src/drivers/node-postgres/database.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import pg from 'pg'
import type { Client } from 'pg'
import EmbeddedPostgres from 'embedded-postgres'
import { Row, Statement } from '../../util'

const originalGetTypeParser = pg.types.getTypeParser
Expand All @@ -13,67 +12,73 @@ export type QueryResult = {
export interface Database {
name: string
exec(statement: Statement): Promise<QueryResult>
stop(): Promise<void>
}

export class ElectricDatabase implements Database {
// Do not use this constructor directly.
// Create a Database instance using the static `init` method instead.
private constructor(
constructor(
public name: string,
private postgres: EmbeddedPostgres,
//private postgres: EmbeddedPostgres,
private db: Client
) {}

async exec(statement: Statement): Promise<QueryResult> {
const { rows, rowCount } = await this.db.query<Row>({
text: statement.sql,
values: statement.args,
types: {
// Modify the parser to not parse JSON values
// Instead, return them as strings
// our conversions will correctly parse them
getTypeParser: ((oid: number) => {
if (
oid === pg.types.builtins.JSON ||
oid === pg.types.builtins.JSONB
) {
return (val) => val
}
return originalGetTypeParser(oid)
}) as typeof pg.types.getTypeParser,
},
})
return {
rows,
rowsModified: rowCount ?? 0,
try {
const { rows, rowCount } = await this.db.query<Row>({
text: statement.sql,
values: statement.args,
types: {
// Modify the parser to not parse JSON values
// Instead, return them as strings
// our conversions will correctly parse them
getTypeParser: ((oid: number) => {
if (
oid === pg.types.builtins.JSON ||
oid === pg.types.builtins.JSONB
) {
return (val) => val
}
return originalGetTypeParser(oid)
}) as typeof pg.types.getTypeParser,
},
})
return {
rows,
rowsModified: rowCount ?? 0,
}
} catch (e) {
console.log("EXEC failed: " + JSON.stringify(e) + "\n" + "Statement was: " + JSON.stringify(statement))
throw e
}
}
}

async stop() {
await this.postgres.stop()
}
type StopFn = () => Promise<void>

// Creates and opens a DB backed by Postgres
static async init(config: PostgresConfig) {
// Initialize Postgres
const pg = new EmbeddedPostgres({
databaseDir: config.databaseDir,
user: config.user ?? 'postgres',
password: config.password ?? 'password',
port: config.port ?? 54321,
persistent: config.persistent ?? true,
})
/**
* Creates and opens a DB backed by Postgres
*/
export async function createEmbeddedPostgres(config: PostgresConfig): Promise<{ db: ElectricDatabase, stop: StopFn }> {
const EmbeddedPostgres = (await import('embedded-postgres')).default
// Initialize Postgres
const pg = new EmbeddedPostgres({
databaseDir: config.databaseDir,
user: config.user ?? 'postgres',
password: config.password ?? 'password',
port: config.port ?? 54321,
persistent: config.persistent ?? true,
})

await pg.initialise()
await pg.start()
await pg.createDatabase(config.name)
const db = pg.getPgClient()
await db.connect()
await pg.initialise()
await pg.start()
await pg.createDatabase(config.name)
const db = pg.getPgClient()
await db.connect()

// We use the database directory as the name
// because it uniquely identifies the DB
return new ElectricDatabase(config.databaseDir, pg, db)
// We use the database directory as the name
// because it uniquely identifies the DB
return {
db: new ElectricDatabase(config.databaseDir, db),
stop: () => pg.stop()
}
}

Expand Down
15 changes: 10 additions & 5 deletions clients/typescript/src/drivers/node-postgres/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { DatabaseAdapter as DatabaseAdapterI } from '../../electric/adapter'
import { DatabaseAdapter } from './adapter'
import { Database, ElectricDatabase } from './database'
import { Database, ElectricDatabase, createEmbeddedPostgres } from './database'
import { ElectricConfig } from '../../config'
import { electrify as baseElectrify, ElectrifyOptions } from '../../electric'
import { WebSocketWeb } from '../../sockets/web'
import { WebSocketNode } from '../../sockets/node'
import { ElectricClient, DbSchema } from '../../client/model'
import { PgBundleMigrator } from '../../migrators/bundle'

export { DatabaseAdapter, ElectricDatabase }
export { DatabaseAdapter, ElectricDatabase, createEmbeddedPostgres }
export type { Database }

/**
Expand All @@ -25,15 +25,20 @@ export const electrify = async <T extends Database, DB extends DbSchema<any>>(
const adapter = opts?.adapter || new DatabaseAdapter(db)
const migrator =
opts?.migrator || new PgBundleMigrator(adapter, dbDescription.pgMigrations)
const socketFactory = opts?.socketFactory || WebSocketWeb
const socketFactory = opts?.socketFactory || WebSocketNode
const prepare = async (_connection: DatabaseAdapterI) => undefined

const configWithDialect = {
...config,
dialect: 'Postgres',
} as const

const client = await baseElectrify(
dbName,
dbDescription,
adapter,
socketFactory,
config,
configWithDialect,
{
migrator,
prepare,
Expand Down
7 changes: 6 additions & 1 deletion clients/typescript/src/drivers/tauri-postgres/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@ export const electrify = async <T extends Database, DB extends DbSchema<any>>(
const socketFactory = opts?.socketFactory || WebSocketWeb
const prepare = async (_connection: DatabaseAdapterI) => undefined

const configWithDialect = {
...config,
dialect: 'Postgres',
} as const

const client = await baseElectrify(
dbName,
dbDescription,
adapter,
socketFactory,
config,
configWithDialect,
{
migrator,
prepare,
Expand Down
2 changes: 1 addition & 1 deletion clients/typescript/src/electric/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export const electrify = async <DB extends DbSchema<any>>(
configWithDefaults
)

const dialect = migrator.electricQueryBuilder.dialect
const dialect = configWithDefaults.replication.dialect
const electric = ElectricClient.create(
dbName,
dbDescription,
Expand Down
4 changes: 3 additions & 1 deletion clients/typescript/src/migrators/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import { SatOpMigrate } from '../_generated/protocol/satellite'
import { base64, getProtocolVersion } from '../util'
import { Migration } from './index'
import { generateTriggersForTable } from '../satellite/process'
import { QueryBuilder } from './query-builder'
import { sqliteBuilder, pgBuilder, QueryBuilder } from './query-builder'

export { sqliteBuilder, pgBuilder, QueryBuilder }

const metaDataSchema = z
.object({
Expand Down
2 changes: 1 addition & 1 deletion clients/typescript/src/satellite/oplog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ function deserialiseRow(str: string, rel: Pick<Relation, 'columns'>): Rec {

export const fromTransaction = (
transaction: DataTransaction,
relations: RelationsCache
relations: RelationsCache,
): OplogEntry[] => {
return transaction.changes.map((t) => {
const columnValues = t.record ? t.record : t.oldRecord!
Expand Down
5 changes: 5 additions & 0 deletions clients/typescript/src/satellite/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,7 @@ export class SatelliteProcess implements Satellite {
}

async _applyTransaction(transaction: Transaction) {
console.log("APPLY TX: " + JSON.stringify(transaction))
const origin = transaction.origin!
const commitTimestamp = new Date(transaction.commit_timestamp.toNumber())

Expand Down Expand Up @@ -1296,6 +1297,7 @@ export class SatelliteProcess implements Satellite {
const { statements, tablenames } = await this._apply(entries, origin)
entries.forEach((e) => opLogEntries.push(e))
statements.forEach((s) => {
console.log("DML stmt: " + JSON.stringify(s))
stmts.push(s)
})
tablenames.forEach((n) => tablenamesSet.add(n))
Expand All @@ -1305,6 +1307,7 @@ export class SatelliteProcess implements Satellite {
const affectedTables: Map<string, MigrationTable> = new Map()
changes.forEach((change) => {
const changeStmt = { sql: change.sql }
console.log("DDL stmt: " + JSON.stringify(changeStmt))
stmts.push(changeStmt)

if (
Expand Down Expand Up @@ -1372,11 +1375,13 @@ export class SatelliteProcess implements Satellite {
if (transaction.migrationVersion) {
// If a migration version is specified
// then the transaction is a migration
console.log("APPLYING MIGRATION")
await this.migrator.applyIfNotAlready({
statements: allStatements,
version: transaction.migrationVersion,
})
} else {
console.log("APPLYING TRANSACTION")
await this.adapter.runInTransaction(...allStatements)
}

Expand Down
5 changes: 3 additions & 2 deletions clients/typescript/test/support/node-postgres.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import fs from 'fs/promises'
import { ElectricDatabase } from '../../src/drivers/node-postgres'
import { createEmbeddedPostgres } from '../../src/drivers/node-postgres/database';

export async function makePgDatabase(
name: string,
port: number
): Promise<{ db: ElectricDatabase; stop: () => Promise<void> }> {
const db = await ElectricDatabase.init({
const { db, stop: stopPg } = await createEmbeddedPostgres({
name,
databaseDir: `./tmp-${name}`,
persistent: false,
port,
})

const stop = async () => {
await db.stop()
await stopPg()
await fs.rm(`./tmp-${name}`, { recursive: true, force: true })
}
return { db, stop }
Expand Down
6 changes: 6 additions & 0 deletions e2e/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ test_only:

test: deps pull test_only

test_pg:
DIALECT=Postgres make test

test_only_pg:
DIALECT=Postgres make test_only

pull:
docker compose -f services_templates.yaml pull \
postgresql
Expand Down
4 changes: 4 additions & 0 deletions e2e/common.mk
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ start_satellite_client_%:
docker compose -f ${DOCKER_COMPOSE_FILE} run \
--rm \
-e TERM=dumb \
-e DIALECT=${DIALECT} \
satellite_client_$*


Expand Down Expand Up @@ -134,3 +135,6 @@ single_test:

single_test_debug:
${LUX} --debug ${TEST}

single_test_pg:
DIALECT=Postgres ${LUX} --progress doc ${TEST}
3 changes: 3 additions & 0 deletions e2e/init.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
CREATE DATABASE e2e_client_1_db;
CREATE DATABASE e2e_client_2_db;

CREATE TABLE entries (
id UUID PRIMARY KEY,
content VARCHAR NOT NULL,
Expand Down
1 change: 1 addition & 0 deletions e2e/satellite_client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"better-sqlite3": "^8.4.0",
"electric-sql": "workspace:*",
"jsonwebtoken": "^9.0.0",
"pg": "^8.11.3",
"uuid": "^9.0.0",
"zod": "^3.21.4"
},
Expand Down
Loading

0 comments on commit 2a25ca7

Please sign in to comment.