Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alco/send enum sql to satellite #1166

Closed
wants to merge 11 commits into from
125 changes: 122 additions & 3 deletions clients/typescript/src/_generated/protocol/satellite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ export interface SatInStartReplicationReq {
* observed additional data before disconnect
*/
observedTransactionData: Long[];
/**
* The SQL dialect used by the client
* Defaults to SQLite if not specified
*/
sqlDialect?: SatInStartReplicationReq_Dialect | undefined;
}

export enum SatInStartReplicationReq_Option {
Expand All @@ -130,6 +135,12 @@ export enum SatInStartReplicationReq_Option {
UNRECOGNIZED = -1,
}

export enum SatInStartReplicationReq_Dialect {
SQLITE = 0,
POSTGRES = 1,
UNRECOGNIZED = -1,
}

/** (Producer) The result of the start replication requests */
export interface SatInStartReplicationResp {
$type: "Electric.Satellite.SatInStartReplicationResp";
Expand Down Expand Up @@ -418,22 +429,32 @@ export interface SatOpMigrate {
*/
version: string;
/**
* a list of sql ddl statements to apply, converted from the pg originals
* A list of SQL DDL statements to apply, translated from Postgres to SQLite dialect.
*
* The migration machinery converts an `ALTER TABLE action1, action2, action3;`
* query into a set of 3: `ALTER TABLE action1; ALTER TABLE action2,` etc
* so we need to support 1+ statements for every migration event.
*
* There is an exception for enum types. Since SQLite does not have a matching concept,
* the original Postgres DDL statement `CREATE TYPE ... AS ENUM (...)` is included as is,
* without translation.
*/
stmts: SatOpMigrate_Stmt[];
/**
* The resulting table definition after applying these migrations
* (a DDL statement can only affect one table at a time).
*/
table?: SatOpMigrate_Table | undefined;
table?:
| SatOpMigrate_Table
| undefined;
/** This field is set if stmts includes a single item which is an enum type definition. */
enumType?: SatOpMigrate_EnumType | undefined;
}

export enum SatOpMigrate_Type {
CREATE_TABLE = 0,
CREATE_INDEX = 1,
CREATE_ENUM_TYPE = 2,
ALTER_ADD_COLUMN = 6,
UNRECOGNIZED = -1,
}
Expand All @@ -457,9 +478,15 @@ export interface SatOpMigrate_PgColumnType {
size: number[];
}

/** reserved 2; */
export interface SatOpMigrate_Column {
$type: "Electric.Satellite.SatOpMigrate.Column";
name: string;
/**
* deprecated
* leaving it here to avoid breaking TypeScript tests that have hard-coded,
* base64-encoded SatOpMigrate messages.
*/
sqliteType: string;
pgType: SatOpMigrate_PgColumnType | undefined;
}
Expand All @@ -482,6 +509,12 @@ export interface SatOpMigrate_Table {
pks: string[];
}

export interface SatOpMigrate_EnumType {
$type: "Electric.Satellite.SatOpMigrate.EnumType";
name: string;
values: string[];
}

/** (Consumer) Request for new subscriptions */
export interface SatSubsReq {
$type: "Electric.Satellite.SatSubsReq";
Expand Down Expand Up @@ -1102,6 +1135,7 @@ function createBaseSatInStartReplicationReq(): SatInStartReplicationReq {
subscriptionIds: [],
schemaVersion: undefined,
observedTransactionData: [],
sqlDialect: undefined,
};
}

Expand All @@ -1128,6 +1162,9 @@ export const SatInStartReplicationReq = {
writer.uint64(v);
}
writer.ldelim();
if (message.sqlDialect !== undefined) {
writer.uint32(56).int32(message.sqlDialect);
}
return writer;
},

Expand Down Expand Up @@ -1193,6 +1230,13 @@ export const SatInStartReplicationReq = {
}

break;
case 7:
if (tag !== 56) {
break;
}

message.sqlDialect = reader.int32() as any;
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -1213,6 +1257,7 @@ export const SatInStartReplicationReq = {
message.subscriptionIds = object.subscriptionIds?.map((e) => e) || [];
message.schemaVersion = object.schemaVersion ?? undefined;
message.observedTransactionData = object.observedTransactionData?.map((e) => Long.fromValue(e)) || [];
message.sqlDialect = object.sqlDialect ?? undefined;
return message;
},
};
Expand Down Expand Up @@ -2709,7 +2754,7 @@ export const SatOpRow = {
messageTypeRegistry.set(SatOpRow.$type, SatOpRow);

function createBaseSatOpMigrate(): SatOpMigrate {
return { $type: "Electric.Satellite.SatOpMigrate", version: "", stmts: [], table: undefined };
return { $type: "Electric.Satellite.SatOpMigrate", version: "", stmts: [], table: undefined, enumType: undefined };
}

export const SatOpMigrate = {
Expand All @@ -2725,6 +2770,9 @@ export const SatOpMigrate = {
if (message.table !== undefined) {
SatOpMigrate_Table.encode(message.table, writer.uint32(26).fork()).ldelim();
}
if (message.enumType !== undefined) {
SatOpMigrate_EnumType.encode(message.enumType, writer.uint32(34).fork()).ldelim();
}
return writer;
},

Expand Down Expand Up @@ -2756,6 +2804,13 @@ export const SatOpMigrate = {

message.table = SatOpMigrate_Table.decode(reader, reader.uint32());
continue;
case 4:
if (tag !== 34) {
break;
}

message.enumType = SatOpMigrate_EnumType.decode(reader, reader.uint32());
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -2776,6 +2831,9 @@ export const SatOpMigrate = {
message.table = (object.table !== undefined && object.table !== null)
? SatOpMigrate_Table.fromPartial(object.table)
: undefined;
message.enumType = (object.enumType !== undefined && object.enumType !== null)
? SatOpMigrate_EnumType.fromPartial(object.enumType)
: undefined;
return message;
},
};
Expand Down Expand Up @@ -3168,6 +3226,67 @@ export const SatOpMigrate_Table = {

messageTypeRegistry.set(SatOpMigrate_Table.$type, SatOpMigrate_Table);

function createBaseSatOpMigrate_EnumType(): SatOpMigrate_EnumType {
return { $type: "Electric.Satellite.SatOpMigrate.EnumType", name: "", values: [] };
}

export const SatOpMigrate_EnumType = {
$type: "Electric.Satellite.SatOpMigrate.EnumType" as const,

encode(message: SatOpMigrate_EnumType, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.name !== "") {
writer.uint32(10).string(message.name);
}
for (const v of message.values) {
writer.uint32(18).string(v!);
}
return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): SatOpMigrate_EnumType {
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseSatOpMigrate_EnumType();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
if (tag !== 10) {
break;
}

message.name = reader.string();
continue;
case 2:
if (tag !== 18) {
break;
}

message.values.push(reader.string());
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skipType(tag & 7);
}
return message;
},

create<I extends Exact<DeepPartial<SatOpMigrate_EnumType>, I>>(base?: I): SatOpMigrate_EnumType {
return SatOpMigrate_EnumType.fromPartial(base ?? {});
},

fromPartial<I extends Exact<DeepPartial<SatOpMigrate_EnumType>, I>>(object: I): SatOpMigrate_EnumType {
const message = createBaseSatOpMigrate_EnumType();
message.name = object.name ?? "";
message.values = object.values?.map((e) => e) || [];
return message;
},
};

messageTypeRegistry.set(SatOpMigrate_EnumType.$type, SatOpMigrate_EnumType);

function createBaseSatSubsReq(): SatSubsReq {
return { $type: "Electric.Satellite.SatSubsReq", subscriptionId: "", shapeRequests: [] };
}
Expand Down
10 changes: 9 additions & 1 deletion clients/typescript/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,18 @@ export interface ElectricConfig {
connectionBackOffOptions?: ConnectionBackOffOptions
}

export type ElectricConfigWithDialect = ElectricConfig & {
dialect?: 'SQLite' | 'Postgres'
}

export type HydratedConfig = {
auth: AuthConfig
replication: {
host: string
port: number
ssl: boolean
timeout: number
dialect: 'SQLite' | 'Postgres'
}
debug: boolean
connectionBackOffOptions: ConnectionBackOffOptions
Expand All @@ -68,7 +73,9 @@ export type InternalElectricConfig = {
connectionBackOffOptions?: ConnectionBackOffOptions
}

export const hydrateConfig = (config: ElectricConfig): HydratedConfig => {
export const hydrateConfig = (
config: ElectricConfigWithDialect
): HydratedConfig => {
const auth = config.auth ?? {}

const debug = config.debug ?? false
Expand All @@ -86,6 +93,7 @@ export const hydrateConfig = (config: ElectricConfig): HydratedConfig => {
port: port,
ssl: sslEnabled,
timeout: config.timeout ?? 3000,
dialect: config.dialect ?? 'SQLite',
}

const {
Expand Down
4 changes: 2 additions & 2 deletions clients/typescript/src/electric/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ElectricConfig, hydrateConfig } from '../config/index'
import { ElectricConfigWithDialect, hydrateConfig } from '../config/index'
import { DatabaseAdapter } from './adapter'
import { BundleMigrator, Migrator } from '../migrators/index'
import { EventNotifier, Notifier } from '../notifiers/index'
Expand Down Expand Up @@ -47,7 +47,7 @@ export const electrify = async <DB extends DbSchema<any>>(
dbDescription: DB,
adapter: DatabaseAdapter,
socketFactory: SocketFactory,
config: ElectricConfig = {},
config: ElectricConfigWithDialect = {},
opts?: Omit<ElectrifyOptions, 'adapter' | 'socketFactory'>
): Promise<ElectricClient<DB>> => {
setLogLevel(config.debug ? 'TRACE' : 'WARN')
Expand Down
20 changes: 9 additions & 11 deletions clients/typescript/src/migrators/triggers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,7 @@ type ForeignKey = {
}

type ColumnName = string
type SQLiteType = string
type PgType = string
type ColumnType = {
sqliteType: SQLiteType
pgType: PgType
}
type ColumnType = string
type ColumnTypes = Record<ColumnName, ColumnType>

export type Table = {
Expand Down Expand Up @@ -283,17 +278,20 @@ function joinColsForJSON(
// Perform transformations on some columns to ensure consistent
// serializability into JSON
const transformIfNeeded = (col: string, targetedCol: string) => {
const tpes = colTypes[col]
const sqliteType = tpes.sqliteType
const pgType = tpes.pgType
const pgType = colTypes[col]

// cast REALs, INT8s, BIGINTs to TEXT to work around SQLite's `json_object` bug
if (sqliteType === 'REAL' || pgType === 'INT8' || pgType === 'BIGINT') {
if (
pgType === 'FLOAT4' ||
pgType === 'REAL' ||
pgType === 'INT8' ||
pgType === 'BIGINT'
) {
return `cast(${targetedCol} as TEXT)`
}

// transform blobs/bytestrings into hexadecimal strings for JSON encoding
if (sqliteType === 'BLOB' || pgType === 'BYTEA') {
if (pgType === 'BYTEA') {
return `CASE WHEN ${targetedCol} IS NOT NULL THEN hex(${targetedCol}) ELSE NULL END`
}
return targetedCol
Expand Down
12 changes: 11 additions & 1 deletion clients/typescript/src/satellite/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
Root,
RootClientImpl,
SatRpcRequest,
SatInStartReplicationReq_Dialect,
} from '../_generated/protocol/satellite'
import {
getObjFromString,
Expand Down Expand Up @@ -131,6 +132,7 @@ type EventEmitter = AsyncEventEmitter<Events>

export class SatelliteClient implements Client {
private opts: Required<SatelliteClientOpts>
private dialect: SatInStartReplicationReq_Dialect

private emitter: EventEmitter

Expand Down Expand Up @@ -194,6 +196,10 @@ export class SatelliteClient implements Client {
this.emitter = new AsyncEventEmitter<Events>()

this.opts = { ...satelliteClientDefaults, ...opts }
this.dialect =
opts.dialect === 'SQLite'
? SatInStartReplicationReq_Dialect.SQLITE
: SatInStartReplicationReq_Dialect.POSTGRES
this.socketFactory = socketFactory

this.inbound = this.resetInboundReplication()
Expand Down Expand Up @@ -365,7 +371,10 @@ export class SatelliteClient implements Client {
)
)
}
request = SatInStartReplicationReq.fromPartial({ schemaVersion })
request = SatInStartReplicationReq.fromPartial({
schemaVersion,
sqlDialect: this.dialect,
})
} else {
Log.info(
`starting replication with lsn: ${base64.fromBytes(
Expand All @@ -376,6 +385,7 @@ export class SatelliteClient implements Client {
lsn,
subscriptionIds,
observedTransactionData,
sqlDialect: this.dialect,
})
}

Expand Down
1 change: 1 addition & 0 deletions clients/typescript/src/satellite/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export interface SatelliteClientOpts {
ssl: boolean
timeout: number
pushPeriod?: number
dialect: 'SQLite' | 'Postgres'
}

export const validateConfig = (config: any) => {
Expand Down
Loading
Loading