From 4c9d45cadea8a7ecb45480f640d7023b838cc06e Mon Sep 17 00:00:00 2001 From: Saif Date: Thu, 16 Nov 2023 23:50:41 -0800 Subject: [PATCH] feat: Added logs for schemaReader, schemaWriter, tenantReader, tenantWriter and watch. --- internal/storage/postgres/schemaReader.go | 89 +++++++++++++++++++++++ internal/storage/postgres/schemaWriter.go | 13 ++++ internal/storage/postgres/tenantReader.go | 27 +++++++ internal/storage/postgres/tenantWriter.go | 20 +++++ internal/storage/postgres/watch.go | 48 ++++++++++++ 5 files changed, 197 insertions(+) diff --git a/internal/storage/postgres/schemaReader.go b/internal/storage/postgres/schemaReader.go index 363d2842c..e46d82d87 100644 --- a/internal/storage/postgres/schemaReader.go +++ b/internal/storage/postgres/schemaReader.go @@ -5,6 +5,8 @@ import ( "database/sql" "errors" + "log/slog" + "github.com/Masterminds/squirrel" "go.opentelemetry.io/otel/codes" @@ -34,6 +36,8 @@ func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) ctx, span := tracer.Start(ctx, "schema-reader.read-schema") defer span.End() + slog.Info("Reading schema: ", slog.Any("tenant_id", tenantID), slog.Any("version", version)) + builder := r.database.Builder.Select("name, serialized_definition, version").From(SchemaDefinitionTable).Where(squirrel.Eq{"version": version, "tenant_id": tenantID}) var query string @@ -43,14 +47,22 @@ func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Error in building SQL query: ", slog.Any("error", err)) + return nil, errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String()) } + slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args)) + var rows *sql.Rows rows, err = r.database.DB.QueryContext(ctx, query, args...) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Error in executing query: ", slog.Any("error", err)) + return nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String()) } defer rows.Close() @@ -62,6 +74,9 @@ func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Error scanning rows: ", slog.Any("error", err)) + return nil, err } definitions = append(definitions, sd.Serialized()) @@ -69,16 +84,26 @@ func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) if err = rows.Err(); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Error iterating over rows: ", slog.Any("error", err)) + return nil, err } + slog.Info("Successfully retrieved", slog.Any("schema definitions", len(definitions))) + sch, err = schema.NewSchemaFromStringDefinitions(false, definitions...) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Failed while creating schema from definitions: ", slog.Any("error", err)) + return nil, err } + slog.Info("Successfully created schema.") + return sch, err } @@ -87,6 +112,8 @@ func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name, ctx, span := tracer.Start(ctx, "schema-reader.read-entity-definition") defer span.End() + slog.Info("Reading entity definition: ", slog.Any("tenant_id", tenantID), slog.Any("version", version)) + builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1) var query string @@ -96,14 +123,22 @@ func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name, if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Error building SQL query: ", slog.Any("error", err)) + return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String()) } + slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args)) + var def storage.SchemaDefinition row := r.database.DB.QueryRowContext(ctx, query, args...) if err = row.Err(); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Error executing query: ", slog.Any("error", err)) + return nil, "", errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String()) } @@ -113,6 +148,9 @@ func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name, if errors.Is(err, sql.ErrNoRows) { return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND.String()) } + + slog.Error("Error scanning rows: ", slog.Any("error", err)) + return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SCAN.String()) } @@ -121,10 +159,16 @@ func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name, if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Failed while creating schema from definitions: ", slog.Any("error", err)) + return nil, "", err } definition, err = schema.GetEntityByName(sch, name) + + slog.Info("Successfully retrieved", slog.Any("schema definition", definition)) + return definition, def.Version, err } @@ -133,6 +177,8 @@ func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, name, v ctx, span := tracer.Start(ctx, "schema-reader.read-rule-definition") defer span.End() + slog.Info("Reading rule definition: ", slog.Any("tenant_id", tenantID), slog.Any("name", name), slog.Any("version", version)) + builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1) var query string @@ -142,35 +188,59 @@ func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, name, v if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Error building SQL query: ", slog.Any("error", err)) + return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String()) } + slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args)) + var def storage.SchemaDefinition row := r.database.DB.QueryRowContext(ctx, query, args...) if err = row.Err(); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Error executing query: ", slog.Any("error", err)) + return nil, "", errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String()) } if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil { + + slog.Error("Error scanning row: ", slog.Any("error", err)) + span.RecordError(err) span.SetStatus(codes.Error, err.Error()) if errors.Is(err, sql.ErrNoRows) { + + slog.Error("Rule not found in the database") + return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND.String()) } + + slog.Error("Error scanning row values: ", slog.Any("error", err)) + return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SCAN.String()) } + slog.Info("Successfully retrieved rule definition for: ", slog.Any("name", name)) + var sch *base.SchemaDefinition sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized()) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Error creating schema from definition: ", slog.Any("error", err)) + return nil, "", err } definition, err = schema.GetRuleByName(sch, name) + slog.Info("Successfully created rule definition") + return definition, def.Version, err } @@ -179,26 +249,45 @@ func (r *SchemaReader) HeadVersion(ctx context.Context, tenantID string) (versio ctx, span := tracer.Start(ctx, "schema-reader.head-version") defer span.End() + slog.Info("Finding the latest version fo the schema for: ", slog.String("tenant_id", tenantID)) + var query string var args []interface{} query, args, err = r.database.Builder. Select("version").From(SchemaDefinitionTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("version DESC").Limit(1). ToSql() if err != nil { + + slog.Error("Failed to build SQL query: ", slog.Any("error", err)) + span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return "", errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String()) } + + slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args)) + row := r.database.DB.QueryRowContext(ctx, query, args...) err = row.Scan(&version) if err != nil { + + slog.Error("Error while scanning row: ", slog.Any("error", err)) + span.RecordError(err) span.SetStatus(codes.Error, err.Error()) if errors.Is(err, sql.ErrNoRows) { + + slog.Error("Schema not found in the database.") + return "", errors.New(base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND.String()) } + + slog.Error("Error while scanning row values: ", slog.Any("error", err)) + return "", err } + slog.Info("Successfully found the latest schema version: ", slog.Any("version", version)) + return version, nil } diff --git a/internal/storage/postgres/schemaWriter.go b/internal/storage/postgres/schemaWriter.go index ee153d994..5ad768f60 100644 --- a/internal/storage/postgres/schemaWriter.go +++ b/internal/storage/postgres/schemaWriter.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "errors" + "log/slog" otelCodes "go.opentelemetry.io/otel/codes" @@ -32,6 +33,8 @@ func (w *SchemaWriter) WriteSchema(ctx context.Context, schemas []storage.Schema ctx, span := tracer.Start(ctx, "schema-writer.write-schema") defer span.End() + slog.Info("Writing schemas to the database", slog.Any("number_of_schemas", len(schemas))) + insertBuilder := w.database.Builder.Insert(SchemaDefinitionTable).Columns("name, serialized_definition, version, tenant_id") for _, schema := range schemas { @@ -43,17 +46,27 @@ func (w *SchemaWriter) WriteSchema(ctx context.Context, schemas []storage.Schema query, args, err = insertBuilder.ToSql() if err != nil { + + slog.Error("Error while building SQL query: ", slog.Any("error", err)) + span.RecordError(err) span.SetStatus(otelCodes.Error, err.Error()) return errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String()) } + slog.Debug("Executing SQL insert query: ", slog.Any("query", query), slog.Any("arguments", args)) + _, err = w.database.DB.ExecContext(ctx, query, args...) if err != nil { + + slog.Error("Failed to execute insert query: ", slog.Any("error", err)) + span.RecordError(err) span.SetStatus(otelCodes.Error, err.Error()) return err } + slog.Info("Successfully wrote schemas to the database. ", slog.Any("number_of_schemas", len(schemas))) + return nil } diff --git a/internal/storage/postgres/tenantReader.go b/internal/storage/postgres/tenantReader.go index 957695698..d2dc07cb3 100644 --- a/internal/storage/postgres/tenantReader.go +++ b/internal/storage/postgres/tenantReader.go @@ -5,6 +5,8 @@ import ( "database/sql" "errors" + "log/slog" + "github.com/Masterminds/squirrel" "go.opentelemetry.io/otel/codes" @@ -34,11 +36,16 @@ func (r *TenantReader) ListTenants(ctx context.Context, pagination database.Pagi ctx, span := tracer.Start(ctx, "tenant-reader.list-tenants") defer span.End() + slog.Info("Listing tenants with pagination: ", slog.Any("pagination", pagination)) + builder := r.database.Builder.Select("id, name, created_at").From(TenantsTable) if pagination.Token() != "" { var t database.ContinuousToken t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode() if err != nil { + + slog.Error("Failed to decode pagination token. ", slog.Any("error", err)) + span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return nil, nil, err @@ -55,14 +62,22 @@ func (r *TenantReader) ListTenants(ctx context.Context, pagination database.Pagi if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Error while building SQL query: ", slog.Any("error", err)) + return nil, nil, errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String()) } + slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args)) + var rows *sql.Rows rows, err = r.database.DB.QueryContext(ctx, query, args...) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Failed to execute query: ", slog.Any("error", err)) + return nil, nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String()) } defer rows.Close() @@ -75,6 +90,9 @@ func (r *TenantReader) ListTenants(ctx context.Context, pagination database.Pagi if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Failed to scan rows: ", slog.Any("error", err)) + return nil, nil, err } lastID = sd.ID @@ -83,12 +101,21 @@ func (r *TenantReader) ListTenants(ctx context.Context, pagination database.Pagi if err = rows.Err(); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + + slog.Error("Error iterating over rows: ", slog.Any("error", err)) + return nil, nil, err } + slog.Info("Successfully listed tenants. ", slog.Any("number_of_tenants", len(tenants))) + if len(tenants) > int(pagination.PageSize()) { + + slog.Info("Returning tenants with a continuous token. ", slog.Any("page_size", pagination.PageSize())) return tenants[:pagination.PageSize()], utils.NewContinuousToken(lastID).Encode(), nil } + slog.Info("Returning all tenants with no continuous token.") + return tenants, database.NewNoopContinuousToken().Encode(), nil } diff --git a/internal/storage/postgres/tenantWriter.go b/internal/storage/postgres/tenantWriter.go index 2b5d45ea6..d3452319c 100644 --- a/internal/storage/postgres/tenantWriter.go +++ b/internal/storage/postgres/tenantWriter.go @@ -7,6 +7,8 @@ import ( "strings" "time" + "log/slog" + "github.com/Masterminds/squirrel" otelCodes "go.opentelemetry.io/otel/codes" "google.golang.org/protobuf/types/known/timestamppb" @@ -35,20 +37,33 @@ func (w *TenantWriter) CreateTenant(ctx context.Context, id, name string) (resul ctx, span := tracer.Start(ctx, "tenant-writer.create-tenant") defer span.End() + slog.Info("Creating new Tenant: ", slog.Any("id", id), slog.Any("name", name)) + var createdAt time.Time query := w.database.Builder.Insert(TenantsTable).Columns("id, name").Values(id, name).Suffix("RETURNING created_at").RunWith(w.database.DB) err = query.QueryRowContext(ctx).Scan(&createdAt) if err != nil { + + slog.Error("Error while creating tenant: ", slog.Any("error", err)) + span.RecordError(err) span.SetStatus(otelCodes.Error, err.Error()) if strings.Contains(err.Error(), "duplicate key value") { + + slog.Error("Duplicate key violation: Tenant with ID already exists", slog.Any("id", id)) + return nil, errors.New(base.ErrorCode_ERROR_CODE_UNIQUE_CONSTRAINT.String()) } + + slog.Error("Error executing query: ", slog.Any("error", err)) + return nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String()) } + slog.Info("Successfully created Tenant", slog.Any("id", id), slog.Any("name", name), slog.Any("createdAt", createdAt)) + return &base.Tenant{ Id: id, Name: name, @@ -61,17 +76,22 @@ func (w *TenantWriter) DeleteTenant(ctx context.Context, tenantID string) (resul ctx, span := tracer.Start(ctx, "tenant-writer.delete-tenant") defer span.End() + slog.Info("Deleting Tenant: ", slog.Any("tenant_id", tenantID)) + var name string var createdAt time.Time query := w.database.Builder.Delete(TenantsTable).Where(squirrel.Eq{"id": tenantID}).Suffix("RETURNING name, created_at").RunWith(w.database.DB) err = query.QueryRowContext(ctx).Scan(&name, &createdAt) if err != nil { + slog.Error("Error while deleting tenant: ", slog.Any("error", err)) span.RecordError(err) span.SetStatus(otelCodes.Error, err.Error()) return nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String()) } + slog.Info("Successfully deleted Tenant") + return &base.Tenant{ Id: tenantID, Name: name, diff --git a/internal/storage/postgres/watch.go b/internal/storage/postgres/watch.go index 254753d30..2b786e777 100644 --- a/internal/storage/postgres/watch.go +++ b/internal/storage/postgres/watch.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "log/slog" "time" "google.golang.org/protobuf/proto" @@ -49,12 +50,17 @@ func (w *Watch) Watch(ctx context.Context, tenantID, snap string) (<-chan *base. changes := make(chan *base.DataChanges, w.bufferSize) errs := make(chan error, 1) + slog.Info("Watching for changes in the database. ", slog.Any("tenant_id", tenantID), slog.Any("snapshot", snap)) + // Decode the snapshot value. // The snapshot value represents a point in the history of the database. st, err := snapshot.EncodedToken{Value: snap}.Decode() if err != nil { // If there is an error in decoding the snapshot, send the error and return. errs <- err + + slog.Error("Failed to decode snapshot.", slog.Any("error", err)) + return changes, errs } @@ -73,6 +79,9 @@ func (w *Watch) Watch(ctx context.Context, tenantID, snap string) (<-chan *base. recentIDs, err := w.getRecentXIDs(ctx, cr, tenantID) if err != nil { // If there is an error in getting recent transaction IDs, send the error and return. + + slog.Error("Error getting recent transaction. ", slog.Any("error", err)) + errs <- err return } @@ -83,6 +92,9 @@ func (w *Watch) Watch(ctx context.Context, tenantID, snap string) (<-chan *base. updates, err := w.getChanges(ctx, id, tenantID) if err != nil { // If there is an error in getting the changes, send the error and return. + + slog.Error("Failed to get changes for transaction. ", slog.Any("id", id), slog.Any("error", err)) + errs <- err return } @@ -90,7 +102,9 @@ func (w *Watch) Watch(ctx context.Context, tenantID, snap string) (<-chan *base. // Send the changes, but respect the context cancellation. select { case changes <- updates: // Send updates to the changes channel. + slog.Info("Sent updates to the changes channel for transaction. ", slog.Any("id", id)) case <-ctx.Done(): // If the context is done, send an error and return. + slog.Error("Context canceled, stopping watch.") errs <- errors.New(base.ErrorCode_ERROR_CODE_CANCELLED.String()) return } @@ -105,7 +119,9 @@ func (w *Watch) Watch(ctx context.Context, tenantID, snap string) (<-chan *base. select { case <-sleep.C: // If the timer is done, continue the loop. + slog.Info("No recent transaction IDs, waiting for changes...") case <-ctx.Done(): // If the context is done, send an error and return. + slog.Error("Context canceled, stopping watch.") errs <- errors.New(base.ErrorCode_ERROR_CODE_CANCELLED.String()) return } @@ -113,6 +129,8 @@ func (w *Watch) Watch(ctx context.Context, tenantID, snap string) (<-chan *base. } }() + slog.Info("Watch started successfully.") + // Return the channels that the caller will listen to for changes and errors. return changes, errs } @@ -146,12 +164,20 @@ func (w *Watch) getRecentXIDs(ctx context.Context, value uint64, tenantID string // Convert the builder to a SQL query and arguments. query, args, err := builder.ToSql() if err != nil { + + slog.Error("Error while building SQL query. ", slog.Any("error", err)) + return nil, err } + slog.Debug("Executing SQL query to get recent transaction: ", slog.Any("query", query), slog.Any("arguments", args)) + // Execute the SQL query. rows, err := w.database.DB.QueryContext(ctx, query, args...) if err != nil { + + slog.Error("Failed to execute SQL query. ", slog.Any("error", err)) + return nil, err } defer rows.Close() @@ -162,6 +188,9 @@ func (w *Watch) getRecentXIDs(ctx context.Context, value uint64, tenantID string var xid types.XID8 err := rows.Scan(&xid) if err != nil { + + slog.Error("Error while scanning row. ", slog.Any("error", err)) + return nil, err } xids = append(xids, xid) @@ -170,9 +199,13 @@ func (w *Watch) getRecentXIDs(ctx context.Context, value uint64, tenantID string // Check for errors that could have occurred during iteration. err = rows.Err() if err != nil { + + slog.Error("Failed to iterate over rows. ", slog.Any("error", err)) + return nil, err } + slog.Info("Successfully retrieved recent transaction. ", slog.Any("ids", xids)) return xids, nil } @@ -188,6 +221,8 @@ func (w *Watch) getChanges(ctx context.Context, value types.XID8, tenantID strin // Initialize a new TupleChanges instance. changes := &base.DataChanges{} + slog.Info("Retrieving changes for transaction. ", slog.Any("id", value), slog.Any("tenant_id", tenantID)) + // Construct the SQL SELECT statement for retrieving the changes from the RelationTuplesTable. tbuilder := w.database.Builder.Select("entity_type, entity_id, relation, subject_type, subject_id, subject_relation, expired_tx_id"). From(RelationTuplesTable). @@ -199,13 +234,17 @@ func (w *Watch) getChanges(ctx context.Context, value types.XID8, tenantID strin // Generate the SQL query and arguments. tquery, targs, err := tbuilder.ToSql() if err != nil { + slog.Error("Error while building SQL query for relation tuples", slog.Any("error", err)) return nil, err } + slog.Debug("Executing SQL query for relation tuples. ", slog.Any("query", tquery), slog.Any("arguments", targs)) + // Execute the SQL query and retrieve the result rows. var trows *sql.Rows trows, err = w.database.DB.QueryContext(ctx, tquery, targs...) if err != nil { + slog.Error("Failed to execute SQL query for relation tuples. ", slog.Any("error", err)) return nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String()) } // Ensure the rows are closed after processing. @@ -220,12 +259,16 @@ func (w *Watch) getChanges(ctx context.Context, value types.XID8, tenantID strin aquery, aargs, err := abuilder.ToSql() if err != nil { + slog.Error("Error while building SQL query for attributes. ", slog.Any("error", err)) return nil, err } + slog.Debug("Executing SQL query for attributes. ", slog.Any("query", aquery), slog.Any("arguments", aargs)) + var arows *sql.Rows arows, err = w.database.DB.QueryContext(ctx, aquery, aargs...) if err != nil { + slog.Error("Error while executing SQL query for attributes. ", slog.Any("error", err)) return nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String()) } // Ensure the rows are closed after processing. @@ -242,6 +285,7 @@ func (w *Watch) getChanges(ctx context.Context, value types.XID8, tenantID strin // Scan the result row into a RelationTuple instance. err = trows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation, &expiredXID) if err != nil { + slog.Error("Error while scanning row for relation tuples. ", slog.Any("error", err)) return nil, err } @@ -271,12 +315,14 @@ func (w *Watch) getChanges(ctx context.Context, value types.XID8, tenantID strin // Scan the result row into a RelationTuple instance. err = trows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueBytes, &expiredXID) if err != nil { + slog.Error("Error while scanning row for attributes", slog.Any("error", err)) return nil, err } rt.Value = &anypb.Any{} err = proto.Unmarshal(valueBytes, rt.Value) if err != nil { + slog.Error("Failed to unmarshal attribute value", slog.Any("error", err)) return nil, err } @@ -295,6 +341,8 @@ func (w *Watch) getChanges(ctx context.Context, value types.XID8, tenantID strin }) } + slog.Info("Successfully retrieved changes for transaction. ", slog.Any("id", value)) + // Return the changes and no error. return changes, nil }