Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tune message consumption in production #8352

Merged
merged 13 commits into from
May 1, 2024
2 changes: 1 addition & 1 deletion backend/kafka-queue/kafkaqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const ConsumerGroupName = "group-default"

const (
TaskRetries = 2
prefetchQueueCapacity = 1_000
prefetchQueueCapacity = 100
MaxMessageSizeBytes = 256 * 1024 * 1024 // MiB
)

Expand Down
2 changes: 1 addition & 1 deletion backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func main() {
samplingMap = map[trace.SpanKind]float64{
trace.SpanKindUnspecified: 1. / 1_000_000,
trace.SpanKindInternal: 1. / 1_000_000,
trace.SpanKindConsumer: 1. / 1_00,
trace.SpanKindConsumer: util.ConsumerSpanSamplingRate(),
Vadman97 marked this conversation as resolved.
Show resolved Hide resolved
// report `sampling`
trace.SpanKindServer: 1.,
// report all customer data
Expand Down
4 changes: 2 additions & 2 deletions backend/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,8 @@ type Session struct {
Fingerprint int `json:"fingerprint"`
// User provided identifier (see IdentifySession)
Identifier string `json:"identifier"`
ProjectID int `json:"project_id" gorm:"index:idx_project_id_email"`
Email *string `json:"email" gorm:"index:idx_project_id_email"`
ProjectID int `json:"project_id"`
Email *string `json:"email"`
// Location data based off user ip (see InitializeSession)
IP string `json:"ip"`
City string `json:"city"`
Expand Down
58 changes: 32 additions & 26 deletions backend/pricing/pricing.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/marketplacemetering"
"github.com/aws/aws-sdk-go-v2/service/marketplacemetering/types"
"github.com/aws/smithy-go/ptr"
"github.com/google/uuid"
"github.com/stripe/stripe-go/v76"

"github.com/highlight-run/highlight/backend/redis"
Expand Down Expand Up @@ -312,14 +313,15 @@ func GetSessions7DayAverage(ctx context.Context, DB *gorm.DB, ccClient *clickhou
return avg, nil
}

func GetWorkspaceSessionsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace) (int64, error) {
func GetWorkspaceSessionsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, redisClient *redis.Client, workspace *model.Workspace) (int64, error) {
meterSpan, _ := util.StartSpanFromContext(ctx, "pricing.GetWorkspaceSessionsMeter",
util.ResourceName("GetWorkspaceSessionsMeter"),
util.Tag("workspace_id", workspace.ID))
defer meterSpan.Finish()

var meter int64
if err := DB.WithContext(ctx).Raw(`
res, err := redis.CachedEval(ctx, redisClient, fmt.Sprintf(`workspace-sessions-meter-%d`, workspace.ID), 5*time.Second, time.Minute, func() (*int64, error) {
var meter int64
if err := DB.WithContext(ctx).Raw(`
WITH billing_start AS (
SELECT COALESCE(next_invoice_date - interval '1 month', billing_period_start, date_trunc('month', now(), 'UTC'))
FROM workspaces
Expand Down Expand Up @@ -350,10 +352,12 @@ func GetWorkspaceSessionsMeter(ctx context.Context, DB *gorm.DB, ccClient *click
UNION ALL SELECT COALESCE(SUM(count), 0) FROM materialized_rows
WHERE date < (SELECT MAX(date) FROM materialized_rows)
) a`, sql.Named("workspace_id", workspace.ID)).
Scan(&meter).Error; err != nil {
return 0, e.Wrap(err, "error querying for session meter")
}
return meter, nil
Scan(&meter).Error; err != nil {
return nil, e.Wrap(err, "error querying for session meter")
}
return &meter, nil
})
return pointy.Int64Value(res, 0), err
}

func GetErrors7DayAverage(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace) (float64, error) {
Expand All @@ -370,14 +374,15 @@ func GetErrors7DayAverage(ctx context.Context, DB *gorm.DB, ccClient *clickhouse
return avg, nil
}

func GetWorkspaceErrorsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace) (int64, error) {
func GetWorkspaceErrorsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, redisClient *redis.Client, workspace *model.Workspace) (int64, error) {
meterSpan, _ := util.StartSpanFromContext(ctx, "pricing.GetWorkspaceErrorsMeter",
util.ResourceName("GetWorkspaceErrorsMeter"),
util.Tag("workspace_id", workspace.ID))
defer meterSpan.Finish()

var meter int64
if err := DB.WithContext(ctx).Raw(`
res, err := redis.CachedEval(ctx, redisClient, fmt.Sprintf(`workspace-errors-meter-%d`, workspace.ID), 5*time.Second, time.Minute, func() (*int64, error) {
var meter int64
if err := DB.WithContext(ctx).Raw(`
WITH billing_start AS (
SELECT COALESCE(next_invoice_date - interval '1 month', billing_period_start, date_trunc('month', now(), 'UTC'))
FROM workspaces
Expand All @@ -404,10 +409,12 @@ func GetWorkspaceErrorsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickho
UNION ALL SELECT COALESCE(SUM(count), 0) FROM materialized_rows
WHERE date < (SELECT MAX(date) FROM materialized_rows)
) a`, sql.Named("workspace_id", workspace.ID)).
Scan(&meter).Error; err != nil {
return 0, e.Wrap(err, "error querying for session meter")
}
return meter, nil
Scan(&meter).Error; err != nil {
return nil, e.Wrap(err, "error querying for error meter")
}
return &meter, nil
})
return pointy.Int64Value(res, 0), err
}

func get7DayAverageImpl(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace, productType model.PricingProductType) (float64, error) {
Expand Down Expand Up @@ -477,15 +484,15 @@ func GetLogs7DayAverage(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.C
return get7DayAverageImpl(ctx, DB, ccClient, workspace, model.PricingProductTypeLogs)
}

func GetWorkspaceLogsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace) (int64, error) {
func GetWorkspaceLogsMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, redis *redis.Client, workspace *model.Workspace) (int64, error) {
return getWorkspaceMeterImpl(ctx, DB, ccClient, workspace, model.PricingProductTypeLogs)
}

func GetTraces7DayAverage(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace) (float64, error) {
return get7DayAverageImpl(ctx, DB, ccClient, workspace, model.PricingProductTypeTraces)
}

func GetWorkspaceTracesMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, workspace *model.Workspace) (int64, error) {
func GetWorkspaceTracesMeter(ctx context.Context, DB *gorm.DB, ccClient *clickhouse.Client, redis *redis.Client, workspace *model.Workspace) (int64, error) {
return getWorkspaceMeterImpl(ctx, DB, ccClient, workspace, model.PricingProductTypeTraces)
}

Expand Down Expand Up @@ -859,7 +866,7 @@ func (w *Worker) CalculateOverages(ctx context.Context, workspaceID int) (Worksp
usage[model.PricingProductTypeMembers] = calculateOverage(workspace, membersLimit, membersMeter)

// Update sessions overage
sessionsMeter, err := GetWorkspaceSessionsMeter(ctx, w.db, w.ccClient, workspace)
sessionsMeter, err := GetWorkspaceSessionsMeter(ctx, w.db, w.ccClient, w.redis, workspace)
if err != nil {
return nil, e.Wrap(err, "BILLING_ERROR error getting sessions meter")
}
Expand All @@ -870,7 +877,7 @@ func (w *Worker) CalculateOverages(ctx context.Context, workspaceID int) (Worksp
usage[model.PricingProductTypeSessions] = calculateOverage(workspace, &sessionsLimit, sessionsMeter)

// Update errors overage
errorsMeter, err := GetWorkspaceErrorsMeter(ctx, w.db, w.ccClient, workspace)
errorsMeter, err := GetWorkspaceErrorsMeter(ctx, w.db, w.ccClient, w.redis, workspace)
if err != nil {
return nil, e.Wrap(err, "BILLING_ERROR error getting errors meter")
}
Expand All @@ -881,7 +888,7 @@ func (w *Worker) CalculateOverages(ctx context.Context, workspaceID int) (Worksp
usage[model.PricingProductTypeErrors] = calculateOverage(workspace, &errorsLimit, errorsMeter)

// Update logs overage
logsMeter, err := GetWorkspaceLogsMeter(ctx, w.db, w.ccClient, workspace)
logsMeter, err := GetWorkspaceLogsMeter(ctx, w.db, w.ccClient, w.redis, workspace)
if err != nil {
return nil, e.Wrap(err, "BILLING_ERROR error getting errors meter")
}
Expand All @@ -892,7 +899,7 @@ func (w *Worker) CalculateOverages(ctx context.Context, workspaceID int) (Worksp
usage[model.PricingProductTypeLogs] = calculateOverage(workspace, &logsLimit, logsMeter)

// Update traces overage
tracesMeter, err := GetWorkspaceTracesMeter(ctx, w.db, w.ccClient, workspace)
tracesMeter, err := GetWorkspaceTracesMeter(ctx, w.db, w.ccClient, w.redis, workspace)
if err != nil {
return nil, e.Wrap(err, "BILLING_ERROR error getting traces meter")
}
Expand Down Expand Up @@ -1000,9 +1007,8 @@ func (w *Worker) reportStripeUsage(ctx context.Context, workspaceID int) error {

// For non-monthly subscriptions, set PendingInvoiceItemInterval to 'month' if not set
// so that overage is reported via monthly invoice items.
if interval != model.PricingSubscriptionIntervalMonthly &&
subscription.PendingInvoiceItemInterval != nil &&
subscription.PendingInvoiceItemInterval.Interval != stripe.SubscriptionPendingInvoiceItemIntervalIntervalMonth {
if interval != model.PricingSubscriptionIntervalMonthly {
log.WithContext(ctx).WithField("workspaceID", workspaceID).Info("configuring monthly invoices for non-monthly subscription")
updated, err := w.stripeClient.Subscriptions.Update(subscription.ID, &stripe.SubscriptionParams{
PendingInvoiceItemInterval: &stripe.SubscriptionPendingInvoiceItemIntervalParams{
Interval: stripe.String(string(stripe.SubscriptionPendingInvoiceItemIntervalIntervalMonth)),
Expand Down Expand Up @@ -1241,15 +1247,15 @@ func calculateOverage(workspace *model.Workspace, limit *int64, meter int64) int

func (w *Worker) AddOrUpdateOverageItem(newPrice *stripe.Price, invoiceLine *stripe.InvoiceLineItem, customer *stripe.Customer, subscription *stripe.Subscription, overage int64) error {
// if the price is a metered recurring subscription, use subscription items and usage records
if newPrice.Recurring != nil && newPrice.Recurring.UsageType == stripe.PriceRecurringUsageTypeMetered {
if newPrice.Recurring != nil {
var subscriptionItemID string
// if the subscription item doesn't create for this price, create it
if invoiceLine == nil || invoiceLine.SubscriptionItem.ID == "" {
params := &stripe.SubscriptionItemParams{
Subscription: &subscription.ID,
Price: &newPrice.ID,
}
params.SetIdempotencyKey(subscription.ID + ":" + newPrice.ID + ":item")
params.SetIdempotencyKey(subscription.ID + ":" + newPrice.ID + ":item:" + uuid.New().String())
subscriptionItem, err := w.stripeClient.SubscriptionItems.New(params)
if err != nil {
return e.Wrapf(err, "BILLING_ERROR failed to add invoice item for usage record item; invoiceLine=%+v, priceID=%s, subscriptionID=%s", invoiceLine, newPrice.ID, subscription.ID)
Expand Down Expand Up @@ -1282,7 +1288,7 @@ func (w *Worker) AddOrUpdateOverageItem(newPrice *stripe.Price, invoiceLine *str
Price: &newPrice.ID,
Quantity: stripe.Int64(overage),
}
params.SetIdempotencyKey(customer.ID + ":" + subscription.ID + ":" + newPrice.ID)
params.SetIdempotencyKey(customer.ID + ":" + subscription.ID + ":" + newPrice.ID + ":" + uuid.New().String())
if _, err := w.stripeClient.InvoiceItems.New(params); err != nil {
return e.Wrap(err, "BILLING_ERROR failed to add invoice item")
}
Expand Down
8 changes: 4 additions & 4 deletions backend/private-graph/graph/schema.resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6745,7 +6745,7 @@ func (r *queryResolver) BillingDetails(ctx context.Context, workspaceID int) (*m
var tracesAvg float64

g.Go(func() error {
sessionsMeter, err = pricing.GetWorkspaceSessionsMeter(ctx, r.DB, r.ClickhouseClient, workspace)
sessionsMeter, err = pricing.GetWorkspaceSessionsMeter(ctx, r.DB, r.ClickhouseClient, r.Redis, workspace)
if err != nil {
return e.Wrap(err, "error from get quota")
}
Expand All @@ -6761,23 +6761,23 @@ func (r *queryResolver) BillingDetails(ctx context.Context, workspaceID int) (*m
})

g.Go(func() error {
errorsMeter, err = pricing.GetWorkspaceErrorsMeter(ctx, r.DB, r.ClickhouseClient, workspace)
errorsMeter, err = pricing.GetWorkspaceErrorsMeter(ctx, r.DB, r.ClickhouseClient, r.Redis, workspace)
if err != nil {
return e.Wrap(err, "error querying errors meter")
}
return nil
})

g.Go(func() error {
logsMeter, err = pricing.GetWorkspaceLogsMeter(ctx, r.DB, r.ClickhouseClient, workspace)
logsMeter, err = pricing.GetWorkspaceLogsMeter(ctx, r.DB, r.ClickhouseClient, r.Redis, workspace)
if err != nil {
return e.Wrap(err, "error querying logs meter")
}
return nil
})

g.Go(func() error {
tracesMeter, err = pricing.GetWorkspaceTracesMeter(ctx, r.DB, r.ClickhouseClient, workspace)
tracesMeter, err = pricing.GetWorkspaceTracesMeter(ctx, r.DB, r.ClickhouseClient, r.Redis, workspace)
return err
})

Expand Down