Skip to content

Commit

Permalink
tune message consumption in production (#8352)
Browse files Browse the repository at this point in the history
## Summary

Production performance tuning
* Caches billing queries that can be slow and cause high Postgres CPU usage
* Cleans up duplicate session id query code
* Fixes stripe billing for overages on annual subscriptions
* Fixes network request parsing error
https://app.highlight.io/1/errors/ckj5r5aj8wfnyrd5Fv7K7p0wGlb5
* Removes unused index
* Removes the old `sessions to process` query, which is no longer used
(Redis is now used to track sessions to process)
* Supports programmatic sampling of consumer spans

## How did you test this change?

local deploy, ec2 deploy

## Are there any deployment considerations?

no

## Does this work require review from our design team?

no
  • Loading branch information
Vadman97 committed May 1, 2024
1 parent 28de613 commit dc81d0f
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 112 deletions.
2 changes: 1 addition & 1 deletion backend/kafka-queue/kafkaqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const ConsumerGroupName = "group-default"

const (
TaskRetries = 2
prefetchQueueCapacity = 1_000
prefetchQueueCapacity = 100
MaxMessageSizeBytes = 256 * 1024 * 1024 // MiB
)

Expand Down
2 changes: 1 addition & 1 deletion backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func main() {
samplingMap = map[trace.SpanKind]float64{
trace.SpanKindUnspecified: 1. / 1_000_000,
trace.SpanKindInternal: 1. / 1_000_000,
trace.SpanKindConsumer: 1. / 1_00,
trace.SpanKindConsumer: util.ConsumerSpanSamplingRate(),
// report `sampling`
trace.SpanKindServer: 1.,
// report all customer data
Expand Down
4 changes: 2 additions & 2 deletions backend/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,8 @@ type Session struct {
Fingerprint int `json:"fingerprint"`
// User provided identifier (see IdentifySession)
Identifier string `json:"identifier"`
ProjectID int `json:"project_id" gorm:"index:idx_project_id_email"`
Email *string `json:"email" gorm:"index:idx_project_id_email"`
ProjectID int `json:"project_id"`
Email *string `json:"email"`
// Location data based off user ip (see InitializeSession)
IP string `json:"ip"`
City string `json:"city"`
Expand Down
58 changes: 32 additions & 26 deletions backend/pricing/pricing.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/marketplacemetering"
"github.com/aws/aws-sdk-go-v2/service/marketplacemetering/types"
"github.com/aws/smithy-go/ptr"
"github.com/google/uuid"
"github.com/stripe/stripe-go/v76"

"github.com/highlight-run/highlight/backend/redis"
Expand Down Expand Up @@ -312,14 +313,15 @@ func GetSessions7DayAverage(ctx context.Context, DB *gorm.DB, ccClient *clickhou
return avg, nil
}

func GetWorkspaceSessionsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace) (int64, error) {
func GetWorkspaceSessionsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, redisClient *redis.Client, workspace *model.Workspace) (int64, error) {
meterSpan, _ := util.StartSpanFromContext(ctx, "pricing.GetWorkspaceSessionsMeter",
util.ResourceName("GetWorkspaceSessionsMeter"),
util.Tag("workspace_id", workspace.ID))
defer meterSpan.Finish()

var meter int64
if err := DB.WithContext(ctx).Raw(`
res, err := redis.CachedEval(ctx, redisClient, fmt.Sprintf(`workspace-sessions-meter-%d`, workspace.ID), 5*time.Second, time.Minute, func() (*int64, error) {
var meter int64
if err := DB.WithContext(ctx).Raw(`
WITH billing_start AS (
SELECT COALESCE(next_invoice_date - interval '1 month', billing_period_start, date_trunc('month', now(), 'UTC'))
FROM workspaces
Expand Down Expand Up @@ -350,10 +352,12 @@ func GetWorkspaceSessionsMeter(ctx context.Context, DB *gorm.DB, ccClient *click
UNION ALL SELECT COALESCE(SUM(count), 0) FROM materialized_rows
WHERE date < (SELECT MAX(date) FROM materialized_rows)
) a`, sql.Named("workspace_id", workspace.ID)).
Scan(&meter).Error; err != nil {
return 0, e.Wrap(err, "error querying for session meter")
}
return meter, nil
Scan(&meter).Error; err != nil {
return nil, e.Wrap(err, "error querying for session meter")
}
return &meter, nil
})
return pointy.Int64Value(res, 0), err
}

func GetErrors7DayAverage(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace) (float64, error) {
Expand All @@ -370,14 +374,15 @@ func GetErrors7DayAverage(ctx context.Context, DB *gorm.DB, ccClient *clickhouse
return avg, nil
}

func GetWorkspaceErrorsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace) (int64, error) {
func GetWorkspaceErrorsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, redisClient *redis.Client, workspace *model.Workspace) (int64, error) {
meterSpan, _ := util.StartSpanFromContext(ctx, "pricing.GetWorkspaceErrorsMeter",
util.ResourceName("GetWorkspaceErrorsMeter"),
util.Tag("workspace_id", workspace.ID))
defer meterSpan.Finish()

var meter int64
if err := DB.WithContext(ctx).Raw(`
res, err := redis.CachedEval(ctx, redisClient, fmt.Sprintf(`workspace-errors-meter-%d`, workspace.ID), 5*time.Second, time.Minute, func() (*int64, error) {
var meter int64
if err := DB.WithContext(ctx).Raw(`
WITH billing_start AS (
SELECT COALESCE(next_invoice_date - interval '1 month', billing_period_start, date_trunc('month', now(), 'UTC'))
FROM workspaces
Expand All @@ -404,10 +409,12 @@ func GetWorkspaceErrorsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickho
UNION ALL SELECT COALESCE(SUM(count), 0) FROM materialized_rows
WHERE date < (SELECT MAX(date) FROM materialized_rows)
) a`, sql.Named("workspace_id", workspace.ID)).
Scan(&meter).Error; err != nil {
return 0, e.Wrap(err, "error querying for session meter")
}
return meter, nil
Scan(&meter).Error; err != nil {
return nil, e.Wrap(err, "error querying for error meter")
}
return &meter, nil
})
return pointy.Int64Value(res, 0), err
}

func get7DayAverageImpl(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace, productType model.PricingProductType) (float64, error) {
Expand Down Expand Up @@ -477,15 +484,15 @@ func GetLogs7DayAverage(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.C
return get7DayAverageImpl(ctx, DB, ccClient, workspace, model.PricingProductTypeLogs)
}

func GetWorkspaceLogsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace) (int64, error) {
func GetWorkspaceLogsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, redis *redis.Client, workspace *model.Workspace) (int64, error) {
return getWorkspaceMeterImpl(ctx, DB, ccClient, workspace, model.PricingProductTypeLogs)
}

func GetTraces7DayAverage(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace) (float64, error) {
return get7DayAverageImpl(ctx, DB, ccClient, workspace, model.PricingProductTypeTraces)
}

func GetWorkspaceTracesMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace) (int64, error) {
func GetWorkspaceTracesMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, redis *redis.Client, workspace *model.Workspace) (int64, error) {
return getWorkspaceMeterImpl(ctx, DB, ccClient, workspace, model.PricingProductTypeTraces)
}

Expand Down Expand Up @@ -859,7 +866,7 @@ func (w *Worker) CalculateOverages(ctx context.Context, workspaceID int) (Worksp
usage[model.PricingProductTypeMembers] = calculateOverage(workspace, membersLimit, membersMeter)

// Update sessions overage
sessionsMeter, err := GetWorkspaceSessionsMeter(ctx, w.db, w.ccClient, workspace)
sessionsMeter, err := GetWorkspaceSessionsMeter(ctx, w.db, w.ccClient, w.redis, workspace)
if err != nil {
return nil, e.Wrap(err, "BILLING_ERROR error getting sessions meter")
}
Expand All @@ -870,7 +877,7 @@ func (w *Worker) CalculateOverages(ctx context.Context, workspaceID int) (Worksp
usage[model.PricingProductTypeSessions] = calculateOverage(workspace, &sessionsLimit, sessionsMeter)

// Update errors overage
errorsMeter, err := GetWorkspaceErrorsMeter(ctx, w.db, w.ccClient, workspace)
errorsMeter, err := GetWorkspaceErrorsMeter(ctx, w.db, w.ccClient, w.redis, workspace)
if err != nil {
return nil, e.Wrap(err, "BILLING_ERROR error getting errors meter")
}
Expand All @@ -881,7 +888,7 @@ func (w *Worker) CalculateOverages(ctx context.Context, workspaceID int) (Worksp
usage[model.PricingProductTypeErrors] = calculateOverage(workspace, &errorsLimit, errorsMeter)

// Update logs overage
logsMeter, err := GetWorkspaceLogsMeter(ctx, w.db, w.ccClient, workspace)
logsMeter, err := GetWorkspaceLogsMeter(ctx, w.db, w.ccClient, w.redis, workspace)
if err != nil {
return nil, e.Wrap(err, "BILLING_ERROR error getting errors meter")
}
Expand All @@ -892,7 +899,7 @@ func (w *Worker) CalculateOverages(ctx context.Context, workspaceID int) (Worksp
usage[model.PricingProductTypeLogs] = calculateOverage(workspace, &logsLimit, logsMeter)

// Update traces overage
tracesMeter, err := GetWorkspaceTracesMeter(ctx, w.db, w.ccClient, workspace)
tracesMeter, err := GetWorkspaceTracesMeter(ctx, w.db, w.ccClient, w.redis, workspace)
if err != nil {
return nil, e.Wrap(err, "BILLING_ERROR error getting traces meter")
}
Expand Down Expand Up @@ -1000,9 +1007,8 @@ func (w *Worker) reportStripeUsage(ctx context.Context, workspaceID int) error {

// For non-monthly subscriptions, set PendingInvoiceItemInterval to 'month' if not set
// so that overage is reported via monthly invoice items.
if interval != model.PricingSubscriptionIntervalMonthly &&
subscription.PendingInvoiceItemInterval != nil &&
subscription.PendingInvoiceItemInterval.Interval != stripe.SubscriptionPendingInvoiceItemIntervalIntervalMonth {
if interval != model.PricingSubscriptionIntervalMonthly {
log.WithContext(ctx).WithField("workspaceID", workspaceID).Info("configuring monthly invoices for non-monthly subscription")
updated, err := w.stripeClient.Subscriptions.Update(subscription.ID, &stripe.SubscriptionParams{
PendingInvoiceItemInterval: &stripe.SubscriptionPendingInvoiceItemIntervalParams{
Interval: stripe.String(string(stripe.SubscriptionPendingInvoiceItemIntervalIntervalMonth)),
Expand Down Expand Up @@ -1241,15 +1247,15 @@ func calculateOverage(workspace *model.Workspace, limit *int64, meter int64) int

func (w *Worker) AddOrUpdateOverageItem(newPrice *stripe.Price, invoiceLine *stripe.InvoiceLineItem, customer *stripe.Customer, subscription *stripe.Subscription, overage int64) error {
// if the price is a metered recurring subscription, use subscription items and usage records
if newPrice.Recurring != nil && newPrice.Recurring.UsageType == stripe.PriceRecurringUsageTypeMetered {
if newPrice.Recurring != nil {
var subscriptionItemID string
// if a subscription item doesn't exist for this price yet, create it
if invoiceLine == nil || invoiceLine.SubscriptionItem.ID == "" {
params := &stripe.SubscriptionItemParams{
Subscription: &subscription.ID,
Price: &newPrice.ID,
}
params.SetIdempotencyKey(subscription.ID + ":" + newPrice.ID + ":item")
params.SetIdempotencyKey(subscription.ID + ":" + newPrice.ID + ":item:" + uuid.New().String())
subscriptionItem, err := w.stripeClient.SubscriptionItems.New(params)
if err != nil {
return e.Wrapf(err, "BILLING_ERROR failed to add invoice item for usage record item; invoiceLine=%+v, priceID=%s, subscriptionID=%s", invoiceLine, newPrice.ID, subscription.ID)
Expand Down Expand Up @@ -1282,7 +1288,7 @@ func (w *Worker) AddOrUpdateOverageItem(newPrice *stripe.Price, invoiceLine *str
Price: &newPrice.ID,
Quantity: stripe.Int64(overage),
}
params.SetIdempotencyKey(customer.ID + ":" + subscription.ID + ":" + newPrice.ID)
params.SetIdempotencyKey(customer.ID + ":" + subscription.ID + ":" + newPrice.ID + ":" + uuid.New().String())
if _, err := w.stripeClient.InvoiceItems.New(params); err != nil {
return e.Wrap(err, "BILLING_ERROR failed to add invoice item")
}
Expand Down
8 changes: 4 additions & 4 deletions backend/private-graph/graph/schema.resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6745,7 +6745,7 @@ func (r *queryResolver) BillingDetails(ctx context.Context, workspaceID int) (*m
var tracesAvg float64

g.Go(func() error {
sessionsMeter, err = pricing.GetWorkspaceSessionsMeter(ctx, r.DB, r.ClickhouseClient, workspace)
sessionsMeter, err = pricing.GetWorkspaceSessionsMeter(ctx, r.DB, r.ClickhouseClient, r.Redis, workspace)
if err != nil {
return e.Wrap(err, "error from get quota")
}
Expand All @@ -6761,23 +6761,23 @@ func (r *queryResolver) BillingDetails(ctx context.Context, workspaceID int) (*m
})

g.Go(func() error {
errorsMeter, err = pricing.GetWorkspaceErrorsMeter(ctx, r.DB, r.ClickhouseClient, workspace)
errorsMeter, err = pricing.GetWorkspaceErrorsMeter(ctx, r.DB, r.ClickhouseClient, r.Redis, workspace)
if err != nil {
return e.Wrap(err, "error querying errors meter")
}
return nil
})

g.Go(func() error {
logsMeter, err = pricing.GetWorkspaceLogsMeter(ctx, r.DB, r.ClickhouseClient, workspace)
logsMeter, err = pricing.GetWorkspaceLogsMeter(ctx, r.DB, r.ClickhouseClient, r.Redis, workspace)
if err != nil {
return e.Wrap(err, "error querying logs meter")
}
return nil
})

g.Go(func() error {
tracesMeter, err = pricing.GetWorkspaceTracesMeter(ctx, r.DB, r.ClickhouseClient, workspace)
tracesMeter, err = pricing.GetWorkspaceTracesMeter(ctx, r.DB, r.ClickhouseClient, r.Redis, workspace)
return err
})

Expand Down

0 comments on commit dc81d0f

Please sign in to comment.