-
Notifications
You must be signed in to change notification settings - Fork 127
/
common.go
187 lines (157 loc) · 9 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package utils
import (
"context"
"errors"
"fmt"
"log/slog"
"math"
"strings"
"time"
"go.opentelemetry.io/otel/codes"
"golang.org/x/exp/rand"
"go.opentelemetry.io/otel/trace"
"github.com/Masterminds/squirrel"
base "github.com/Permify/permify/pkg/pb/base/v1"
)
const (
// BulkEntityFilterTemplate is a fmt format string (not a parameterized query) that
// lists distinct entity IDs found in either relation_tuples or attributes.
// Its four %s verbs are, in order: the tenant ID and the entity type (each wrapped
// in single quotes by the template itself), followed by two raw SQL conditions.
// BulkEntityFilterQuery fills them with the created/expired snapshot conditions.
// NOTE(review): DISTINCT ON (entity_id) is used without ORDER BY, so which row's id
// is kept per entity_id looks unspecified — confirm callers do not rely on it.
BulkEntityFilterTemplate = `
WITH entities AS (
(SELECT id, entity_id, entity_type, tenant_id, created_tx_id, expired_tx_id FROM relation_tuples)
UNION ALL
(SELECT id, entity_id, entity_type, tenant_id, created_tx_id, expired_tx_id FROM attributes)
), filtered_entities AS (
SELECT DISTINCT ON (entity_id) id, entity_id
FROM entities
WHERE tenant_id = '%s'
AND entity_type = '%s'
AND %s
AND %s
)
SELECT id, entity_id
FROM filtered_entities`
// TransactionTemplate inserts a row into transactions for the tenant bound to $1
// and returns the new row's id.
TransactionTemplate = `INSERT INTO transactions (tenant_id) VALUES ($1) RETURNING id`
// InsertTenantTemplate inserts a tenant with id $1 and name $2 and returns its created_at.
InsertTenantTemplate = `INSERT INTO tenants (id, name) VALUES ($1, $2) RETURNING created_at`
// DeleteTenantTemplate deletes the tenant whose id is $1 and returns the deleted
// row's name and created_at.
DeleteTenantTemplate = `DELETE FROM tenants WHERE id = $1 RETURNING name, created_at`
)
// SnapshotQuery narrows a SELECT builder to the rows that are visible at the
// snapshot recorded for the transaction identified by value.
//
// Two conditions are appended through successive Where calls:
//   - created: created_tx_id is visible in that transaction's snapshot, or is the
//     transaction itself;
//   - expired: expired_tx_id is either not visible in that snapshot or is '0'
//     (the row has not been expired), and it is not the transaction itself.
//
// The value is rendered into the SQL text as an xid8 literal rather than bound as
// a query argument.
func SnapshotQuery(sl squirrel.SelectBuilder, value uint64) squirrel.SelectBuilder {
	// Render the transaction ID once; every fragment below reuses it.
	txID := fmt.Sprintf("'%v'::xid8", value)
	snapshotSub := "(select snapshot from transactions where id = " + txID + ")"

	created := squirrel.Or{
		squirrel.Expr("pg_visible_in_snapshot(created_tx_id, " + snapshotSub + ") = true"),
		squirrel.Expr("created_tx_id = " + txID),
	}

	expired := squirrel.And{
		squirrel.Or{
			squirrel.Expr("pg_visible_in_snapshot(expired_tx_id, " + snapshotSub + ") = false"),
			squirrel.Expr("expired_tx_id = '0'::xid8"),
		},
		squirrel.Expr("expired_tx_id <> " + txID),
	}

	return sl.Where(created).Where(expired)
}
// snapshotQuery builds the two raw SQL conditions that restrict rows to those
// visible at the snapshot recorded for the transaction identified by value.
//
// The first result is the "created" condition: created_tx_id is visible in the
// snapshot or equals the transaction ID. The second is the "expired" condition:
// expired_tx_id is not visible in the snapshot or is '0' (not expired), and it
// differs from the transaction ID. Both come back already parenthesized so they
// can be dropped straight into a WHERE clause.
func snapshotQuery(value uint64) (string, string) {
	// The transaction ID as an xid8 literal, shared by every fragment.
	txID := fmt.Sprintf("'%v'::xid8", value)
	snapshotSub := "(SELECT snapshot FROM transactions WHERE id = " + txID + ")"

	created := "(" +
		"pg_visible_in_snapshot(created_tx_id, " + snapshotSub + ") = true" +
		" OR " +
		"created_tx_id = " + txID +
		")"

	notYetExpired := "(" +
		"pg_visible_in_snapshot(expired_tx_id, " + snapshotSub + ") = false" +
		" OR " +
		"expired_tx_id = '0'::xid8" +
		")"
	expired := "(" + notYetExpired + " AND " + "expired_tx_id <> " + txID + ")"

	return created, expired
}
// BulkEntityFilterQuery renders BulkEntityFilterTemplate into a complete SQL
// statement for the given tenant, entity type and snapshot transaction ID.
//
// tenantID and entityType are interpolated into single-quoted SQL string
// literals, so any embedded single quote is doubled first; without that, a
// quote in either value would terminate the literal early and corrupt (or
// inject into) the statement. The snapshot conditions come from snapshotQuery
// and are inserted as raw SQL.
//
// NOTE(review): quote doubling is sufficient only when the server treats
// backslashes in ordinary string literals literally (standard_conforming_strings
// = on, the PostgreSQL default) — confirm the deployment does not disable it.
func BulkEntityFilterQuery(tenantID, entityType string, snap uint64) string {
	// Escape a value for use inside a '...' SQL string literal.
	quoteLiteral := func(s string) string {
		return strings.ReplaceAll(s, "'", "''")
	}

	createdWhere, expiredWhere := snapshotQuery(snap)
	return fmt.Sprintf(
		BulkEntityFilterTemplate,
		quoteLiteral(tenantID),
		quoteLiteral(entityType),
		createdWhere,
		expiredWhere,
	)
}
// GenerateGCQuery returns a Squirrel DELETE builder used for garbage collection
// on the given table.
//
// The resulting statement removes rows that satisfy both conditions:
//   - expired_tx_id <> '0': the row carries a real expiry transaction ID;
//   - expired_tx_id < value: that expiry precedes the supplied cutoff.
//
// The cutoff is rendered into the SQL text as an xid8 literal rather than bound
// as a query argument.
func GenerateGCQuery(table string, value uint64) squirrel.DeleteBuilder {
	cutoff := fmt.Sprintf("'%v'::xid8", value)

	return squirrel.Delete(table).
		Where(squirrel.Expr("expired_tx_id <> '0'::xid8")).
		Where(squirrel.Expr("expired_tx_id < " + cutoff))
}
// HandleError classifies err, logs it, and returns a new error whose message is
// the string form of a standard error code.
//
// Classification happens in this order:
//  1. context-related (see IsContextRelatedError): logged at debug level and
//     reported as ERROR_CODE_CANCELLED;
//  2. serialization-related (see IsSerializationRelatedError): logged at debug
//     level and reported as ERROR_CODE_SERIALIZATION;
//  3. anything else: logged at error level, recorded on span with an error
//     status, and reported as the caller-supplied errorCode.
//
// Only the third case touches the span. The original err is not wrapped in the
// returned error.
func HandleError(ctx context.Context, span trace.Span, err error, errorCode base.ErrorCode) error {
	switch {
	case IsContextRelatedError(ctx, err):
		slog.Debug("A context-related error occurred", slog.String("error", err.Error()))
		return errors.New(base.ErrorCode_ERROR_CODE_CANCELLED.String())

	case IsSerializationRelatedError(err):
		slog.Debug("A serialization-related error occurred", slog.String("error", err.Error()))
		return errors.New(base.ErrorCode_ERROR_CODE_SERIALIZATION.String())
	}

	// Operational failure: surface it in both the log and the trace.
	slog.Error("An operational error occurred", slog.Any("error", err))
	span.RecordError(err)
	span.SetStatus(codes.Error, err.Error())

	return errors.New(errorCode.String())
}
// IsContextRelatedError reports whether a failure should be attributed to the
// context or connection going away rather than to the operation itself.
//
// It returns true when:
//   - ctx itself has been cancelled or has exceeded its deadline, regardless of
//     err; or
//   - err wraps context.Canceled or context.DeadlineExceeded; or
//   - err's message contains "conn closed".
//
// A nil err is safe to pass: it is never context-related on its own, so the
// result then depends only on the state of ctx.
func IsContextRelatedError(ctx context.Context, err error) bool {
	if ctxErr := ctx.Err(); errors.Is(ctxErr, context.Canceled) || errors.Is(ctxErr, context.DeadlineExceeded) {
		return true
	}

	// Guard before calling err.Error(); a nil error would otherwise panic.
	if err == nil {
		return false
	}

	return errors.Is(err, context.Canceled) ||
		errors.Is(err, context.DeadlineExceeded) ||
		strings.Contains(err.Error(), "conn closed")
}
// IsSerializationRelatedError reports whether err looks like a database
// serialization failure, judged by its message: it returns true when the
// message contains "could not serialize" or "duplicate key value".
//
// A nil err is safe to pass and yields false.
func IsSerializationRelatedError(err error) bool {
	// Guard before calling err.Error(); a nil error would otherwise panic.
	if err == nil {
		return false
	}

	msg := err.Error()
	return strings.Contains(msg, "could not serialize") ||
		strings.Contains(msg, "duplicate key value")
}
// WaitWithBackoff blocks for an exponentially growing, jittered delay before a
// retry, returning early if ctx is done.
//
// The base delay is 20ms doubled once per retry and capped at 1s; a random
// jitter of up to half the base delay is then added. The chosen delay is logged
// at warn level (in milliseconds) together with tenantID. The function returns
// nothing, so callers that need to know whether the wait was cut short should
// inspect ctx afterwards.
func WaitWithBackoff(ctx context.Context, tenantID string, retries int) {
	// math.Min also absorbs the +Inf that math.Pow yields for very large retry counts.
	backoff := time.Duration(math.Min(float64(20*time.Millisecond)*math.Pow(2, float64(retries)), float64(1*time.Second)))
	jitter := time.Duration(rand.Float64() * float64(backoff) * 0.5)
	nextBackoff := backoff + jitter

	slog.Warn("waiting before retry", slog.String("tenant_id", tenantID), slog.Int64("backoff_duration", nextBackoff.Milliseconds()))

	// Use an explicit timer so it is released immediately when the context wins
	// the race, instead of lingering until it fires as time.After would.
	timer := time.NewTimer(nextBackoff)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-ctx.Done():
	}
}