Skip to content

Commit

Permalink
feat: Added logs for schemaReader, schemaWriter, tenantReader, tenant…
Browse files Browse the repository at this point in the history
…Writer and watch.
  • Loading branch information
saifxd7 committed Nov 17, 2023
1 parent a367d87 commit 4c9d45c
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 0 deletions.
89 changes: 89 additions & 0 deletions internal/storage/postgres/schemaReader.go
Expand Up @@ -5,6 +5,8 @@ import (
"database/sql"
"errors"

"log/slog"

"github.com/Masterminds/squirrel"
"go.opentelemetry.io/otel/codes"

Expand Down Expand Up @@ -34,6 +36,8 @@ func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string)
ctx, span := tracer.Start(ctx, "schema-reader.read-schema")
defer span.End()

slog.Info("Reading schema: ", slog.Any("tenant_id", tenantID), slog.Any("version", version))

builder := r.database.Builder.Select("name, serialized_definition, version").From(SchemaDefinitionTable).Where(squirrel.Eq{"version": version, "tenant_id": tenantID})

var query string
Expand All @@ -43,14 +47,22 @@ func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Error in building SQL query: ", slog.Any("error", err))

return nil, errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
}

slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args))

var rows *sql.Rows
rows, err = r.database.DB.QueryContext(ctx, query, args...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Error in executing query: ", slog.Any("error", err))

return nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
}
defer rows.Close()
Expand All @@ -62,23 +74,36 @@ func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Error scanning rows: ", slog.Any("error", err))

return nil, err
}
definitions = append(definitions, sd.Serialized())
}
if err = rows.Err(); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Error iterating over rows: ", slog.Any("error", err))

return nil, err
}

slog.Info("Successfully retrieved", slog.Any("schema definitions", len(definitions)))

sch, err = schema.NewSchemaFromStringDefinitions(false, definitions...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Failed while creating schema from definitions: ", slog.Any("error", err))

return nil, err
}

slog.Info("Successfully created schema.")

return sch, err
}

Expand All @@ -87,6 +112,8 @@ func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name,
ctx, span := tracer.Start(ctx, "schema-reader.read-entity-definition")
defer span.End()

slog.Info("Reading entity definition: ", slog.Any("tenant_id", tenantID), slog.Any("version", version))

builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1)

var query string
Expand All @@ -96,14 +123,22 @@ func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name,
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Error building SQL query: ", slog.Any("error", err))

return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
}

slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args))

var def storage.SchemaDefinition
row := r.database.DB.QueryRowContext(ctx, query, args...)
if err = row.Err(); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Error executing query: ", slog.Any("error", err))

return nil, "", errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
}

Expand All @@ -113,6 +148,9 @@ func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name,
if errors.Is(err, sql.ErrNoRows) {
return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND.String())
}

slog.Error("Error scanning rows: ", slog.Any("error", err))

return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SCAN.String())
}

Expand All @@ -121,10 +159,16 @@ func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name,
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Failed while creating schema from definitions: ", slog.Any("error", err))

return nil, "", err
}

definition, err = schema.GetEntityByName(sch, name)

slog.Info("Successfully retrieved", slog.Any("schema definition", definition))

return definition, def.Version, err
}

Expand All @@ -133,6 +177,8 @@ func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, name, v
ctx, span := tracer.Start(ctx, "schema-reader.read-rule-definition")
defer span.End()

slog.Info("Reading rule definition: ", slog.Any("tenant_id", tenantID), slog.Any("name", name), slog.Any("version", version))

builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1)

var query string
Expand All @@ -142,35 +188,59 @@ func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, name, v
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Error building SQL query: ", slog.Any("error", err))

return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
}

slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args))

var def storage.SchemaDefinition
row := r.database.DB.QueryRowContext(ctx, query, args...)
if err = row.Err(); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Error executing query: ", slog.Any("error", err))

return nil, "", errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
}

if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil {

slog.Error("Error scanning row: ", slog.Any("error", err))

span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
if errors.Is(err, sql.ErrNoRows) {

slog.Error("Rule not found in the database")

return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND.String())
}

slog.Error("Error scanning row values: ", slog.Any("error", err))

return nil, "", errors.New(base.ErrorCode_ERROR_CODE_SCAN.String())
}

slog.Info("Successfully retrieved rule definition for: ", slog.Any("name", name))

var sch *base.SchemaDefinition
sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized())
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Error creating schema from definition: ", slog.Any("error", err))

return nil, "", err
}

definition, err = schema.GetRuleByName(sch, name)
slog.Info("Successfully created rule definition")

return definition, def.Version, err
}

Expand All @@ -179,26 +249,45 @@ func (r *SchemaReader) HeadVersion(ctx context.Context, tenantID string) (versio
ctx, span := tracer.Start(ctx, "schema-reader.head-version")
defer span.End()

slog.Info("Finding the latest version fo the schema for: ", slog.String("tenant_id", tenantID))

var query string
var args []interface{}
query, args, err = r.database.Builder.
Select("version").From(SchemaDefinitionTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("version DESC").Limit(1).
ToSql()
if err != nil {

slog.Error("Failed to build SQL query: ", slog.Any("error", err))

span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return "", errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
}

slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args))

row := r.database.DB.QueryRowContext(ctx, query, args...)
err = row.Scan(&version)
if err != nil {

slog.Error("Error while scanning row: ", slog.Any("error", err))

span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
if errors.Is(err, sql.ErrNoRows) {

slog.Error("Schema not found in the database.")

return "", errors.New(base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND.String())
}

slog.Error("Error while scanning row values: ", slog.Any("error", err))

return "", err
}

slog.Info("Successfully found the latest schema version: ", slog.Any("version", version))

return version, nil
}
13 changes: 13 additions & 0 deletions internal/storage/postgres/schemaWriter.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"errors"
"log/slog"

otelCodes "go.opentelemetry.io/otel/codes"

Expand Down Expand Up @@ -32,6 +33,8 @@ func (w *SchemaWriter) WriteSchema(ctx context.Context, schemas []storage.Schema
ctx, span := tracer.Start(ctx, "schema-writer.write-schema")
defer span.End()

slog.Info("Writing schemas to the database", slog.Any("number_of_schemas", len(schemas)))

insertBuilder := w.database.Builder.Insert(SchemaDefinitionTable).Columns("name, serialized_definition, version, tenant_id")

for _, schema := range schemas {
Expand All @@ -43,17 +46,27 @@ func (w *SchemaWriter) WriteSchema(ctx context.Context, schemas []storage.Schema

query, args, err = insertBuilder.ToSql()
if err != nil {

slog.Error("Error while building SQL query: ", slog.Any("error", err))

span.RecordError(err)
span.SetStatus(otelCodes.Error, err.Error())
return errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
}

slog.Debug("Executing SQL insert query: ", slog.Any("query", query), slog.Any("arguments", args))

_, err = w.database.DB.ExecContext(ctx, query, args...)
if err != nil {

slog.Error("Failed to execute insert query: ", slog.Any("error", err))

span.RecordError(err)
span.SetStatus(otelCodes.Error, err.Error())
return err
}

slog.Info("Successfully wrote schemas to the database. ", slog.Any("number_of_schemas", len(schemas)))

return nil
}
27 changes: 27 additions & 0 deletions internal/storage/postgres/tenantReader.go
Expand Up @@ -5,6 +5,8 @@ import (
"database/sql"
"errors"

"log/slog"

"github.com/Masterminds/squirrel"
"go.opentelemetry.io/otel/codes"

Expand Down Expand Up @@ -34,11 +36,16 @@ func (r *TenantReader) ListTenants(ctx context.Context, pagination database.Pagi
ctx, span := tracer.Start(ctx, "tenant-reader.list-tenants")
defer span.End()

slog.Info("Listing tenants with pagination: ", slog.Any("pagination", pagination))

builder := r.database.Builder.Select("id, name, created_at").From(TenantsTable)
if pagination.Token() != "" {
var t database.ContinuousToken
t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
if err != nil {

slog.Error("Failed to decode pagination token. ", slog.Any("error", err))

span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, nil, err
Expand All @@ -55,14 +62,22 @@ func (r *TenantReader) ListTenants(ctx context.Context, pagination database.Pagi
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Error while building SQL query: ", slog.Any("error", err))

return nil, nil, errors.New(base.ErrorCode_ERROR_CODE_SQL_BUILDER.String())
}

slog.Debug("Executing SQL query: ", slog.Any("query", query), slog.Any("arguments", args))

var rows *sql.Rows
rows, err = r.database.DB.QueryContext(ctx, query, args...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Failed to execute query: ", slog.Any("error", err))

return nil, nil, errors.New(base.ErrorCode_ERROR_CODE_EXECUTION.String())
}
defer rows.Close()
Expand All @@ -75,6 +90,9 @@ func (r *TenantReader) ListTenants(ctx context.Context, pagination database.Pagi
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Failed to scan rows: ", slog.Any("error", err))

return nil, nil, err
}
lastID = sd.ID
Expand All @@ -83,12 +101,21 @@ func (r *TenantReader) ListTenants(ctx context.Context, pagination database.Pagi
if err = rows.Err(); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

slog.Error("Error iterating over rows: ", slog.Any("error", err))

return nil, nil, err
}

slog.Info("Successfully listed tenants. ", slog.Any("number_of_tenants", len(tenants)))

if len(tenants) > int(pagination.PageSize()) {

slog.Info("Returning tenants with a continuous token. ", slog.Any("page_size", pagination.PageSize()))
return tenants[:pagination.PageSize()], utils.NewContinuousToken(lastID).Encode(), nil
}

slog.Info("Returning all tenants with no continuous token.")

return tenants, database.NewNoopContinuousToken().Encode(), nil
}

0 comments on commit 4c9d45c

Please sign in to comment.