Skip to content

Commit

Permalink
coord: test that type OIDs match PostgreSQL
Browse files Browse the repository at this point in the history
And actually wire that test up in CI so that it doesn't break.
  • Loading branch information
benesch committed Feb 14, 2022
1 parent 0e05e03 commit 21943fc
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 62 deletions.
4 changes: 3 additions & 1 deletion ci/test/cargo-test/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@

from materialize import ROOT, ci_util, spawn
from materialize.mzcompose import Composition, Service
from materialize.mzcompose.services import Kafka, SchemaRegistry, Zookeeper
from materialize.mzcompose.services import Kafka, Postgres, SchemaRegistry, Zookeeper

SERVICES = [
Zookeeper(),
Kafka(),
SchemaRegistry(),
Postgres(),
Service(
name="ci-cargo-test",
config={
Expand All @@ -23,6 +24,7 @@
"ZOOKEEPER_ADDR=zookeeper:2181",
"KAFKA_ADDRS=kafka:9092",
"SCHEMA_REGISTRY_URL=http://schema-registry:8081",
"POSTGRES_URL=postgres://postgres:postgres@postgres",
"MZ_SOFT_ASSERTIONS=1",
"MZ_PERSIST_EXTERNAL_STORAGE_TEST_S3_BUCKET=mtlz-test-persist-1d-lifecycle-delete",
"AWS_DEFAULT_REGION",
Expand Down
5 changes: 1 addition & 4 deletions src/coord/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use mz_dataflow_types::{
sources::{AwsExternalId, SourceConnector, Timeline},
};
use mz_expr::{ExprHumanizer, GlobalId, MirScalarExpr, OptimizedMirRelationExpr};
use mz_pgrepr::oid::FIRST_USER_OID;
use mz_repr::{RelationDesc, ScalarType};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{Expr, Raw};
Expand Down Expand Up @@ -76,10 +77,6 @@ pub use crate::catalog::error::ErrorKind;
const SYSTEM_CONN_ID: u32 = 0;
const SYSTEM_USER: &str = "mz_system";

// TODO@jldlaughlin: Better assignment strategy for system type OIDs.
// https://github.com/MaterializeInc/materialize/pull/4316#discussion_r496238962
pub const FIRST_USER_OID: u32 = 20_000;

/// A `Catalog` keeps track of the SQL objects known to the planner.
///
/// For each object, it keeps track of both forward and reverse dependencies:
Expand Down
223 changes: 168 additions & 55 deletions src/coord/src/catalog/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2232,108 +2232,221 @@ impl BUILTINS {
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::env;

use tokio_postgres::NoTls;

use mz_ore::task;
use mz_pgrepr::oid::FIRST_MATERIALIZE_OID;

use super::*;

// Connect to a running Postgres server and verify that our builtin functions
// match it, in addition to some other things. Ignored because we don't have a
// postgres server running during normal tests.
// Connect to a running Postgres server and verify that our builtin
// types and functions match it, in addition to some other things.
#[tokio::test]
#[ignore]
async fn verify_function_sanity() -> Result<(), anyhow::Error> {
async fn compare_builtins_postgres() -> Result<(), anyhow::Error> {
// Verify that all builtin functions:
// - have a unique OID
// - if they have a postgres counterpart (same oid) then they have matching name
// Note: Use Postgres 13 when testing because older version don't have all
// functions.
let (client, connection) =
tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
let (client, connection) = tokio_postgres::connect(
&env::var("POSTGRES_URL").unwrap_or_else(|_| "host=localhost user=postgres".into()),
NoTls,
)
.await?;

task::spawn(|| "verify_function_sanity", async move {
task::spawn(|| "compare_builtin_postgres", async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
panic!("connection error: {}", e);
}
});

let rows = client
.query(
"SELECT oid, proname, proargtypes, prorettype, proretset FROM pg_proc",
&[],
)
.await?;

struct PgProc {
name: String,
arg_oids: Vec<u32>,
ret_oid: Option<u32>,
ret_set: bool,
}

let mut pg_proc = HashMap::new();
for row in rows {
let oid: u32 = row.get("oid");
pg_proc.insert(
oid,
PgProc {
struct PgType {
name: String,
ty: String,
elem: u32,
array: u32,
}

let pg_proc: HashMap<_, _> = client
.query(
"SELECT oid, proname, proargtypes, prorettype, proretset FROM pg_proc",
&[],
)
.await?
.into_iter()
.map(|row| {
let oid: u32 = row.get("oid");
let pg_proc = PgProc {
name: row.get("proname"),
arg_oids: row.get("proargtypes"),
ret_oid: row.get("prorettype"),
ret_set: row.get("proretset"),
},
);
}
};
(oid, pg_proc)
})
.collect();

let mut oids = HashSet::new();
let pg_type: HashMap<_, _> = client
.query(
"SELECT oid, typname, typtype::text, typelem, typarray FROM pg_type",
&[],
)
.await?
.into_iter()
.map(|row| {
let oid: u32 = row.get("oid");
let pg_type = PgType {
name: row.get("typname"),
ty: row.get("typtype"),
elem: row.get("typelem"),
array: row.get("typarray"),
};
(oid, pg_type)
})
.collect();

let mut proc_oids = HashSet::new();
let mut type_oids = HashSet::new();

for builtin in BUILTINS.values() {
if let Builtin::Func(func) = builtin {
for imp in func.inner.func_impls() {
// Verify that all function OIDs are unique.
assert!(oids.insert(imp.oid), "{} reused oid {}", func.name, imp.oid);
match builtin {
Builtin::Type(ty) => {
// Verify that all type OIDs are unique.
assert!(
type_oids.insert(ty.oid),
"{} reused oid {}",
ty.name,
ty.oid
);

if imp.oid > 16_000 {
// High OIDS are reserved in materialize and don't have postgres counterparts.
if ty.oid >= FIRST_MATERIALIZE_OID {
// High OIDs are reserved in Materialize and don't have
// PostgreSQL counterparts.
continue;
}

// For functions that have a postgres counterpart, verify that the name and
// oid match.
let pg_fn = pg_proc.get(&imp.oid).unwrap_or_else(|| {
panic!("pg_proc missing function {}: oid {}", func.name, imp.oid)
// For types that have a PostgreSQL counterpart, verify that
// the name and oid match.
let pg_ty = pg_type.get(&ty.oid).unwrap_or_else(|| {
panic!("pg_proc missing type {}: oid {}", ty.name, ty.oid)
});
assert_eq!(
func.name, pg_fn.name,
"funcs with oid {} don't match names: {} in mz, {} in pg",
imp.oid, func.name, pg_fn.name
ty.name, pg_ty.name,
"oid {} has name {} in postgres; expected {}",
ty.oid, pg_ty.name, ty.name,
);

// Complain, but don't fail, if argument oids don't match.
// TODO: make these match.
if imp.arg_oids != pg_fn.arg_oids {
println!(
"funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
imp.oid, func.name, imp.arg_oids, pg_fn.arg_oids
);
// Ensure the type matches.
match ty.details.typ {
CatalogType::Array { element_id } => {
let elem_ty = match BUILTINS.get(&element_id) {
Some(Builtin::Type(ty)) => ty,
_ => panic!("{} is unexpectedly not a type", element_id),
};
assert_eq!(
pg_ty.elem, elem_ty.oid,
"type {} has mismatched element OIDs",
ty.name
)
}
CatalogType::Pseudo => {
assert_eq!(
pg_ty.ty, "p",
"type {} is not a pseudo type as expected",
ty.name
)
}
_ => {
assert_eq!(
pg_ty.ty, "b",
"type {} is not a base type as expected",
ty.name
)
}
}

if imp.return_oid != pg_fn.ret_oid {
println!(
"funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
imp.oid, func.name, imp.return_oid, pg_fn.ret_oid
);
// Ensure the array type reference is correct.
match ty.details.array_id {
Some(array_id) => {
let array_ty = match BUILTINS.get(&array_id) {
Some(Builtin::Type(ty)) => ty,
_ => panic!("{} is unexpectedly not a type", array_id),
};
assert_eq!(
pg_ty.array, array_ty.oid,
"type {} has mismatched array OIDs",
ty.name,
);
}
None => assert_eq!(
pg_ty.array, 0,
"type {} does not have an array type in mz but does in pg",
ty.name,
),
}
}
Builtin::Func(func) => {
continue;
for imp in func.inner.func_impls() {
// Verify that all function OIDs are unique.
assert!(
proc_oids.insert(imp.oid),
"{} reused oid {}",
func.name,
imp.oid
);

if imp.return_is_set != pg_fn.ret_set {
println!(
"funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
if imp.oid >= FIRST_MATERIALIZE_OID {
// High OIDs are reserved in Materialize and don't have
// postgres counterparts.
continue;
}

// For functions that have a postgres counterpart, verify that the name and
// oid match.
let pg_fn = pg_proc.get(&imp.oid).unwrap_or_else(|| {
panic!("pg_proc missing function {}: oid {}", func.name, imp.oid)
});
assert_eq!(
func.name, pg_fn.name,
"funcs with oid {} don't match names: {} in mz, {} in pg",
imp.oid, func.name, pg_fn.name
);

// Complain, but don't fail, if argument oids don't match.
// TODO: make these match.
if imp.arg_oids != pg_fn.arg_oids {
println!(
"funcs with oid {} ({}) don't match arguments: {:?} in mz, {:?} in pg",
imp.oid, func.name, imp.arg_oids, pg_fn.arg_oids
);
}

if imp.return_oid != pg_fn.ret_oid {
println!(
"funcs with oid {} ({}) don't match return types: {:?} in mz, {:?} in pg",
imp.oid, func.name, imp.return_oid, pg_fn.ret_oid
);
}

if imp.return_is_set != pg_fn.ret_set {
println!(
"funcs with oid {} ({}) don't match set-returning value: {:?} in mz, {:?} in pg",
imp.oid, func.name, imp.return_is_set, pg_fn.ret_set
);
}
}
}
_ => (),
}
}

Expand Down
11 changes: 9 additions & 2 deletions src/pgrepr/src/oid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,15 @@

#![allow(missing_docs)]

//! Reserved OIDs through Materialized.
//! PostgreSQL OID constants.

/// The first OID that is assigned by Materialize rather than PostgreSQL.
pub const FIRST_MATERIALIZE_OID: u32 = 16384;

/// The first OID that is assigned to user objects rather than system builtins.
pub const FIRST_USER_OID: u32 = 20_000;

// Materialize-specific builtin OIDs.
pub const TYPE_LIST_OID: u32 = 16_384;
pub const TYPE_MAP_OID: u32 = 16_385;
pub const FUNC_CEIL_F32_OID: u32 = 16_386;
Expand Down Expand Up @@ -78,4 +86,3 @@ pub const FUNC_MZ_DATE_BIN_HOPPING_UNIX_EPOCH_TSTZ_OID: u32 = 16_450;
pub const FUNC_MZ_DATE_BIN_HOPPING_TS_OID: u32 = 16_451;
pub const FUNC_MZ_DATE_BIN_HOPPING_TSTZ_OID: u32 = 16_452;
pub const FUNC_MZ_TYPE_NAME: u32 = 16_453;
// next ID: 16_454

0 comments on commit 21943fc

Please sign in to comment.