Changes from all commits
42 commits
00e9a9c
Added PostgresSchemaRegistry.java
suddendust Dec 28, 2025
31846e9
Spotless
suddendust Dec 28, 2025
2fdbf0e
WIP
suddendust Dec 29, 2025
1727dd0
Spotless
suddendust Dec 29, 2025
a62fbc2
Remove unused method in SchemaRegistry
suddendust Dec 29, 2025
6b7595b
Remove unused method in ColumnMetadata
suddendust Dec 29, 2025
7b4ef2a
WIP
suddendust Dec 29, 2025
598cb25
WIP
suddendust Dec 29, 2025
9c173b9
Configure cache expiry and cooldown
suddendust Dec 29, 2025
7bf77c5
Added PostgresMetadataFetcherTest
suddendust Dec 29, 2025
6d03cd5
WIP
suddendust Dec 29, 2025
c3f5f7e
Added docs on thread safety
suddendust Dec 29, 2025
827381f
Added PostgresSchemaRegistryIntegrationTest.java
suddendust Dec 29, 2025
602037b
WIP
suddendust Dec 29, 2025
c8a53eb
WIP
suddendust Dec 30, 2025
31f16e2
Refactor
suddendust Jan 4, 2026
c139bee
Merge branch 'schema_cache' into pg_write_create
suddendust Jan 4, 2026
75150e3
Merge branch 'main' of github.com:hypertrace/document-store into sche…
suddendust Jan 11, 2026
5412f9b
Merge branch 'schema_cache' into pg_write_create
suddendust Jan 12, 2026
bf5ca5c
WIP
suddendust Jan 12, 2026
57b623a
Implement create for flat collections
suddendust Jan 12, 2026
233c9c4
Merge branch 'main' of github.com:hypertrace/document-store into pg_w…
suddendust Jan 12, 2026
9f8811e
Fix compilation issue
suddendust Jan 12, 2026
bfd6651
Refactor
suddendust Jan 14, 2026
70ec4b3
Enhanced CreateResult.java and others
suddendust Jan 14, 2026
6d1c277
Added more test cases
suddendust Jan 14, 2026
910ef8c
Spotless
suddendust Jan 14, 2026
3e2c178
WIP
suddendust Jan 14, 2026
9061e24
Add more test coverage
suddendust Jan 14, 2026
c3024b0
Added `bestEfforts` configuration to PG custom parameters.
suddendust Jan 16, 2026
900c87f
Create MissingColumnStrategy.java
suddendust Jan 19, 2026
daac3c1
Merge branch 'main' of github.com:hypertrace/document-store into pg_w…
suddendust Jan 19, 2026
5872b2c
Implement createOrReplace
suddendust Jan 19, 2026
b624905
Remove inadvertent change
suddendust Jan 19, 2026
e604144
WIP
suddendust Jan 19, 2026
91fb483
WIP
suddendust Jan 20, 2026
bc73765
Merge branch 'main' of github.com:hypertrace/document-store into pg_w…
suddendust Jan 20, 2026
a4f72e2
WIP
suddendust Jan 21, 2026
40adf62
Fix failing test case
suddendust Jan 21, 2026
06d7351
Remove createAndReplaceAndReturn impl
suddendust Jan 21, 2026
aaef758
Fix failing test cases
suddendust Jan 21, 2026
4cba90d
wip
Jan 29, 2026
@@ -23,4 +23,6 @@ public interface ColumnMetadata {
* @return whether this column is an array type
*/
boolean isArray();

boolean isPrimaryKey();
}
@@ -38,4 +38,12 @@ public interface SchemaRegistry<T extends ColumnMetadata> {
* @return optional of the col metadata.
*/
Optional<T> getColumnOrRefresh(String tableName, String colName);

/**
* Returns the primary key column name for the given table.
*
* @param tableName the table name
* @return optional of the primary key column name
*/
Optional<String> getPrimaryKeyColumn(String tableName);
}
@@ -19,5 +19,10 @@ public enum MissingColumnStrategy {
* a field doesn't match the schema. The write operation will fail.
*/
THROW,
IGNORE_DOCUMENT
/** Ignore the entire document if it doesn't match the schema. */
IGNORE_DOCUMENT;

public static MissingColumnStrategy defaultStrategy() {
return SKIP;
}
}
@@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.sql.Array;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
@@ -16,15 +17,9 @@
import java.sql.Timestamp;
import java.sql.Types;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.hypertrace.core.documentstore.BulkArrayValueUpdateRequest;
import org.hypertrace.core.documentstore.BulkDeleteResult;
import org.hypertrace.core.documentstore.BulkUpdateRequest;
@@ -52,6 +47,7 @@
import org.hypertrace.core.documentstore.postgres.update.FlatUpdateContext;
import org.hypertrace.core.documentstore.postgres.update.parser.FlatCollectionSubDocSetOperatorParser;
import org.hypertrace.core.documentstore.postgres.update.parser.FlatCollectionSubDocUpdateOperatorParser;
import org.hypertrace.core.documentstore.postgres.utils.PostgresUtils;
import org.hypertrace.core.documentstore.query.Query;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
@@ -72,6 +68,7 @@ public class FlatPostgresCollection extends PostgresCollection {
private static final String WRITE_NOT_SUPPORTED =
"Write operations are not supported for flat collections yet!";
private static final String MISSING_COLUMN_STRATEGY_CONFIG = "missingColumnStrategy";
private static final String DEFAULT_PRIMARY_KEY_COLUMN = "key";

private static final Map<UpdateOperator, FlatCollectionSubDocUpdateOperatorParser>
SUB_DOC_UPDATE_PARSERS = Map.of(SET, new FlatCollectionSubDocSetOperatorParser());
@@ -88,24 +85,35 @@ public class FlatPostgresCollection extends PostgresCollection {
final PostgresClient client,
final String collectionName,
final PostgresLazyilyLoadedSchemaRegistry schemaRegistry) {
this(client, collectionName, schemaRegistry, null);
}

FlatPostgresCollection(
final PostgresClient client,
final String collectionName,
final PostgresLazyilyLoadedSchemaRegistry schemaRegistry,
final MissingColumnStrategy missingColumnStrategy) {
super(client, collectionName);
this.schemaRegistry = schemaRegistry;
this.missingColumnStrategy = parseMissingColumnStrategy(client.getCustomParameters());
this.missingColumnStrategy =
missingColumnStrategy != null
? missingColumnStrategy
: parseMissingColumnStrategy(client.getCustomParameters());
}

private static MissingColumnStrategy parseMissingColumnStrategy(Map<String, String> params) {
String value = params.get(MISSING_COLUMN_STRATEGY_CONFIG);
if (value == null || value.isEmpty()) {
return MissingColumnStrategy.SKIP; // default
return MissingColumnStrategy.defaultStrategy();
}
try {
return MissingColumnStrategy.valueOf(value.toUpperCase());
} catch (IllegalArgumentException e) {
LOGGER.warn(
"Invalid missingColumnStrategy value: '{}', using default SKIP. Valid values: {}",
value,
java.util.Arrays.toString(MissingColumnStrategy.values()));
return MissingColumnStrategy.SKIP;
Arrays.toString(MissingColumnStrategy.values()));
return MissingColumnStrategy.defaultStrategy();
}
}
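// Illustration only (not part of this diff): how the strategy string resolves, assuming the
// maps below mirror what client.getCustomParameters() returns.
//   parseMissingColumnStrategy(Map.of())                                           -> SKIP (default)
//   parseMissingColumnStrategy(Map.of("missingColumnStrategy", "ignore_document")) -> IGNORE_DOCUMENT (case-insensitive)
//   parseMissingColumnStrategy(Map.of("missingColumnStrategy", "unknown"))         -> SKIP, after logging a warning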

@@ -215,7 +223,7 @@ public CreateResult create(Key key, Document document) throws IOException {

@Override
public boolean createOrReplace(Key key, Document document) throws IOException {
throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED);
return createOrReplaceWithRetry(key, document, false);
}

@Override
@@ -487,6 +495,12 @@ private CreateResult createWithRetry(Key key, Document document, boolean isRetry
try {
TypedDocument parsed = parseDocument(document, tableName, skippedFields);

// Add the key as the primary key column
String pkColumn = getPKForTable(tableName);
String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn);
PostgresDataType pkType = getPrimaryKeyType(tableName, pkColumn);
parsed.add(quotedPkColumn, key.toString(), pkType, false);

// If IGNORE_DOCUMENT strategy and any fields were skipped, ignore the entire document
if (missingColumnStrategy == MissingColumnStrategy.IGNORE_DOCUMENT
&& !skippedFields.isEmpty()) {
@@ -495,12 +509,6 @@ private CreateResult createWithRetry(Key key, Document document, boolean isRetry
return new CreateResult(CreateStatus.IGNORED, isRetry, skippedFields);
}

// if there are no valid columns in the document
if (parsed.isEmpty()) {
LOGGER.warn("No valid columns found in the document for table: {}", tableName);
return new CreateResult(CreateStatus.FAILED, isRetry, skippedFields);
}

String sql = buildInsertSql(parsed.getColumns());
LOGGER.debug("Insert SQL: {}", sql);

@@ -543,12 +551,18 @@ private TypedDocument parseDocument(
continue;
}

if (columnMetadata.get().isPrimaryKey()) {
// PK is added by the caller
continue;
}

PostgresDataType type = columnMetadata.get().getPostgresType();
boolean isArray = columnMetadata.get().isArray();

try {
Object value = extractValue(fieldValue, type, isArray);
typedDocument.add("\"" + fieldName + "\"", value, type, isArray);
typedDocument.add(
PostgresUtils.wrapFieldNamesWithDoubleQuotes(fieldName), value, type, isArray);
} catch (Exception e) {
if (missingColumnStrategy == MissingColumnStrategy.THROW) {
throw new SchemaMismatchException(
@@ -589,12 +603,132 @@ private int executeUpdate(String sql, TypedDocument parsed) throws SQLException
}
}

private boolean createOrReplaceWithRetry(Key key, Document document, boolean isRetry)
throws IOException {
String tableName = tableIdentifier.getTableName();
List<String> skippedFields = new ArrayList<>();

try {
TypedDocument parsed = parseDocument(document, tableName, skippedFields);

// Add the key as the primary key column
String pkColumn = getPKForTable(tableName);
String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn);
PostgresDataType pkType = getPrimaryKeyType(tableName, pkColumn);
parsed.add(quotedPkColumn, key.toString(), pkType, false);

String sql = buildUpsertSql(parsed.getColumns(), quotedPkColumn);
LOGGER.debug("Upsert SQL: {}", sql);

return executeUpsert(sql, parsed);

} catch (PSQLException e) {
return handlePSQLExceptionForUpsert(e, key, document, tableName, isRetry);
} catch (SQLException e) {
LOGGER.error("SQLException in createOrReplace. key: {} content: {}", key, document, e);
throw new IOException(e);
}
}

/**
* Builds a PostgreSQL upsert (INSERT ... ON CONFLICT DO UPDATE) SQL statement.
*
* <p>This method constructs an atomic upsert query that:
*
* <ul>
* <li>Inserts a new row if no conflict on the primary key
* <li>Updates all non-PK columns if a row with the same PK already exists
* </ul>
*
* <p><b>Generated SQL pattern:</b>
*
* <pre>{@code
* INSERT INTO table (col1, col2, pk_col)
* VALUES (?, ?, ?)
* ON CONFLICT (pk_col) DO UPDATE SET col1 = EXCLUDED.col1, col2 = EXCLUDED.col2
* RETURNING (xmax = 0) AS is_insert
* }</pre>
*
* <p><b>The EXCLUDED table:</b> In PostgreSQL's ON CONFLICT clause, {@code EXCLUDED} is a special
* table that references the row that would have been inserted (the "proposed" row). This allows
* us to update existing rows with the new values without re-specifying them.
*
* <p><b>The RETURNING clause:</b> {@code (xmax = 0) AS is_insert} is a PostgreSQL trick to
* determine if the operation was an INSERT or UPDATE:
*
* <ul>
* <li>{@code xmax} is a system column that stores the transaction ID of the deleting/updating
* transaction
* <li>For a freshly inserted row, {@code xmax = 0} (no prior transaction modified it)
* <li>For an updated row, {@code xmax != 0} (the UPDATE sets it to the current transaction ID)
* <li>Thus, {@code is_insert = true} means INSERT, {@code is_insert = false} means UPDATE
* </ul>
*
* @param columns List of quoted column names to include in the upsert (including PK)
* @param pkColumn The quoted primary key column name used for conflict detection
* @return The complete upsert SQL statement with placeholders for values
*/
private String buildUpsertSql(List<String> columns, String pkColumn) {
String columnList = String.join(", ", columns);
String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new));

// Build SET clause for non-PK columns: col = EXCLUDED.col
String setClause =
columns.stream()
.filter(col -> !col.equals(pkColumn))
.map(col -> col + " = EXCLUDED." + col)
.collect(Collectors.joining(", "));

return String.format(
"INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s RETURNING (xmax = 0) AS is_insert",
tableIdentifier, columnList, placeholders, pkColumn, setClause);
}
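// Illustration only (not part of this diff): for a hypothetical collection whose tableIdentifier
// renders as my_table, with quoted data columns "name" and "age" and primary key "key",
// buildUpsertSql would produce (as a single statement):
//   INSERT INTO my_table ("name", "age", "key") VALUES (?, ?, ?)
//   ON CONFLICT ("key") DO UPDATE SET "name" = EXCLUDED."name", "age" = EXCLUDED."age"
//   RETURNING (xmax = 0) AS is_insert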

private boolean executeUpsert(String sql, TypedDocument parsed) throws SQLException {
try (Connection conn = client.getPooledConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
int index = 1;
for (String column : parsed.getColumns()) {
setParameter(
conn,
ps,
index++,
parsed.getValue(column),
parsed.getType(column),
parsed.isArray(column));
}
try (ResultSet rs = ps.executeQuery()) {
if (rs.next()) {
// is_insert is true if xmax = 0 (freshly inserted row) and false if an existing row was
// updated. This lets us distinguish creates from replaces.
return rs.getBoolean("is_insert");
}
}
return false;
}
}

private boolean handlePSQLExceptionForUpsert(
PSQLException e, Key key, Document document, String tableName, boolean isRetry)
throws IOException {
if (!isRetry && shouldRefreshSchemaAndRetry(e.getSQLState())) {
LOGGER.info(
"Schema mismatch detected during upsert (SQLState: {}), refreshing schema and retrying. key: {}",
e.getSQLState(),
key);
schemaRegistry.invalidate(tableName);
return createOrReplaceWithRetry(key, document, true);
}
LOGGER.error("SQLException in createOrReplace. key: {} content: {}", key, document, e);
throw new IOException(e);
}

private CreateResult handlePSQLExceptionForCreate(
PSQLException e, Key key, Document document, String tableName, boolean isRetry)
throws IOException {
if (!isRetry && shouldRefreshSchemaAndRetry(e.getSQLState())) {
LOGGER.info(
"Schema mismatch detected (SQLState: {}), refreshing schema and retrying. key: {}",
"Schema mismatch detected during create (SQLState: {}), refreshing schema and retrying. key: {}",
e.getSQLState(),
key);
schemaRegistry.invalidate(tableName);
@@ -613,6 +747,17 @@ private boolean shouldRefreshSchemaAndRetry(String sqlState) {
|| PSQLState.DATATYPE_MISMATCH.getState().equals(sqlState);
}

private String getPKForTable(String tableName) {
return schemaRegistry.getPrimaryKeyColumn(tableName).orElse(DEFAULT_PRIMARY_KEY_COLUMN);
}

private PostgresDataType getPrimaryKeyType(String tableName, String pkColumn) {
return schemaRegistry
.getColumnOrRefresh(tableName, pkColumn)
.map(PostgresColumnMetadata::getPostgresType)
.orElse(PostgresDataType.TEXT);
}
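// Illustration only (not part of this diff): both helpers above degrade gracefully when the
// registry has no metadata for the table.
//   getPKForTable("some_table")            -> "key" (DEFAULT_PRIMARY_KEY_COLUMN) if no PK column is reported
//   getPrimaryKeyType("some_table", "key") -> PostgresDataType.TEXT if the column metadata cannot be resolved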

/**
* Typed document contains field information along with the field type. Uses LinkedHashMaps keyed
* by column name. LinkedHashMap preserves insertion order for consistent parameter binding.
@@ -629,10 +774,6 @@ void add(String column, Object value, PostgresDataType type, boolean isArray) {
arrays.put(column, isArray);
}

boolean isEmpty() {
return values.isEmpty();
}

List<String> getColumns() {
return new ArrayList<>(values.keySet());
}
@@ -721,8 +862,9 @@ private void setParameter(
}

if (isArray) {
Object[] arrayValues = (value instanceof Object[]) ? (Object[]) value : new Object[] {value};
java.sql.Array sqlArray = conn.createArrayOf(type.getSqlType(), arrayValues);
// todo: Maybe check if the value is actually an array
Object[] arrayValues = (Object[]) value;
Array sqlArray = conn.createArrayOf(type.getSqlType(), arrayValues);
ps.setArray(index, sqlArray);
return;
}
@@ -26,6 +26,7 @@
import org.hypertrace.core.documentstore.model.config.ConnectionConfig;
import org.hypertrace.core.documentstore.model.config.DatastoreConfig;
import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig;
import org.hypertrace.core.documentstore.model.options.MissingColumnStrategy;
import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -181,6 +182,15 @@ public Collection getCollectionForType(String collectionName, DocumentType docum
}
}

public Collection getFlatCollection(
String collectionName, MissingColumnStrategy missingColumnStrategy) {
return new FlatPostgresCollection(
client,
collectionName,
(PostgresLazyilyLoadedSchemaRegistry) schemaRegistry,
missingColumnStrategy);
}
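// Illustration only (not part of this diff): hypothetical caller code for the new accessor.
// A non-null strategy takes precedence over any missingColumnStrategy set in the custom parameters.
//   Collection flatUsers = datastore.getFlatCollection("users", MissingColumnStrategy.IGNORE_DOCUMENT);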

@Override
public boolean healthCheck() {
String healthCheckSql = "SELECT 1;";
@@ -170,4 +170,19 @@ private boolean canRefresh(String tableName) {
}
return Duration.between(lastRefresh, Instant.now()).compareTo(refreshCooldown) >= 0;
}

/**
* Returns the primary key column name for the given table.
*
* @param tableName the name of the table
* @return optional of the primary key column name, or empty if no primary key is found
*/
@Override
public Optional<String> getPrimaryKeyColumn(String tableName) {
Map<String, PostgresColumnMetadata> schema = getSchema(tableName);
return schema.values().stream()
.filter(PostgresColumnMetadata::isPrimaryKey)
.map(PostgresColumnMetadata::getName)
.findFirst();
}
}