Skip to content

Commit

Permalink
[SUP-49] add daily job to delete sessions according to a SESSION_RETE…
Browse files Browse the repository at this point in the history
…NTION_DAYS env var (#8813)

## Summary
- in the `All` runtime, start a job which runs on start up and every 24
hours that runs session deletion logic to remove any session data from
all projects older than `SESSION_RETENTION_DAYS`
<!--
Ideally, there is an attached GitHub issue that will describe the "why".

If relevant, use this section to call out any additional information
you'd like to _highlight_ to the reviewer.
-->

## How did you test this change?
- tested locally with different `SESSION_RETENTION_DAYS`, confirmed the
appropriate sessions were deleted from clickhouse, postgres, and the
local filesystem
<!--
Frontend - Leave a screencast or a screenshot to visually describe the
changes.
-->

## Are there any deployment considerations?
- no
<!--
 Backend - Do we need to consider migrations or backfilling data?
-->

## Does this work require review from our design team?
- no
<!--
 Request review from julian-highlight / our design team 
-->
  • Loading branch information
mayberryzane committed Jun 27, 2024
1 parent 95cec49 commit bada438
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func main() {
)
hlog.Init()
lambda.StartWithOptions(
h.DeleteSessionBatchFromOpenSearch,
h.DeleteSessionBatchFromClickhouse,
lambda.WithEnableSIGTERM(highlight.Stop),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func main() {
)
hlog.Init()
lambda.StartWithOptions(
h.DeleteSessionBatchFromS3,
h.DeleteSessionBatchFromObjectStorage,
lambda.WithEnableSIGTERM(highlight.Stop),
)
}
127 changes: 76 additions & 51 deletions backend/lambda-functions/deleteSessions/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,42 +9,40 @@ import (
"github.com/openlyinc/pointy"
log "github.com/sirupsen/logrus"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid"
"github.com/highlight-run/highlight/backend/clickhouse"
"github.com/highlight-run/highlight/backend/email"
"github.com/highlight-run/highlight/backend/lambda-functions/deleteSessions/utils"
"github.com/highlight-run/highlight/backend/model"
modelInputs "github.com/highlight-run/highlight/backend/private-graph/graph/model"
storage "github.com/highlight-run/highlight/backend/storage"
"github.com/highlight-run/highlight/backend/util"
"github.com/pkg/errors"
"github.com/sendgrid/sendgrid-go"
"github.com/sendgrid/sendgrid-go/helpers/mail"
"gorm.io/gorm"
)

type Handlers interface {
DeleteSessionBatchFromOpenSearch(context.Context, utils.BatchIdResponse) (*utils.BatchIdResponse, error)
DeleteSessionBatchFromClickhouse(context.Context, utils.BatchIdResponse) (*utils.BatchIdResponse, error)
DeleteSessionBatchFromPostgres(context.Context, utils.BatchIdResponse) (*utils.BatchIdResponse, error)
DeleteSessionBatchFromS3(context.Context, utils.BatchIdResponse) (*utils.BatchIdResponse, error)
DeleteSessionBatchFromObjectStorage(context.Context, utils.BatchIdResponse) (*utils.BatchIdResponse, error)
GetSessionIdsByQuery(context.Context, utils.QuerySessionsInput) ([]utils.BatchIdResponse, error)
SendEmail(context.Context, utils.QuerySessionsInput) error
}

type handlers struct {
db *gorm.DB
clickhouseClient *clickhouse.Client
s3ClientEast2 *s3.Client
sendgridClient *sendgrid.Client
storageClient storage.Client
}

func InitHandlers(db *gorm.DB, clickhouseClient *clickhouse.Client, s3ClientEast2 *s3.Client, sendgridClient *sendgrid.Client) *handlers {
func InitHandlers(db *gorm.DB, clickhouseClient *clickhouse.Client, sendgridClient *sendgrid.Client, storageClient storage.Client) *handlers {
return &handlers{
db: db,
clickhouseClient: clickhouseClient,
s3ClientEast2: s3ClientEast2,
sendgridClient: sendgridClient,
storageClient: storageClient,
}
}

Expand All @@ -60,27 +58,17 @@ func NewHandlers() *handlers {
log.WithContext(ctx).Fatal(errors.Wrap(err, "error creating clickhouse client"))
}

cfgEast2, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-2"))
s3Client, err := storage.NewS3Client(ctx)
if err != nil {
log.WithContext(ctx).Fatal(errors.Wrap(err, "error loading default from config"))
log.WithContext(ctx).Fatal(errors.Wrap(err, "error creating s3 storage client"))
}
s3ClientEast2 := s3.NewFromConfig(cfgEast2, func(o *s3.Options) {
o.UsePathStyle = true
})

sendgridClient := sendgrid.NewSendClient(os.Getenv("SENDGRID_API_KEY"))

return InitHandlers(db, clickhouseClient, s3ClientEast2, sendgridClient)
return InitHandlers(db, clickhouseClient, sendgridClient, s3Client)
}

func (h *handlers) getSessionClientAndBucket(sessionId int) (*s3.Client, *string) {
client := h.s3ClientEast2
bucket := pointy.String(storage.S3SessionsPayloadBucketNameNew)

return client, bucket
}

func (h *handlers) DeleteSessionBatchFromOpenSearch(ctx context.Context, event utils.BatchIdResponse) (*utils.BatchIdResponse, error) {
func (h *handlers) DeleteSessionBatchFromClickhouse(ctx context.Context, event utils.BatchIdResponse) (*utils.BatchIdResponse, error) {
sessionIds, err := utils.GetSessionIdsInBatch(h.db, event.TaskId, event.BatchId)
if err != nil {
return nil, errors.Wrap(err, "error getting session ids to delete")
Expand Down Expand Up @@ -125,41 +113,16 @@ func (h *handlers) DeleteSessionBatchFromPostgres(ctx context.Context, event uti
return &event, nil
}

func (h *handlers) DeleteSessionBatchFromS3(ctx context.Context, event utils.BatchIdResponse) (*utils.BatchIdResponse, error) {
func (h *handlers) DeleteSessionBatchFromObjectStorage(ctx context.Context, event utils.BatchIdResponse) (*utils.BatchIdResponse, error) {
sessionIds, err := utils.GetSessionIdsInBatch(h.db, event.TaskId, event.BatchId)
if err != nil {
return nil, errors.Wrap(err, "error getting session ids to delete")
}

for _, sessionId := range sessionIds {
client, bucket := h.getSessionClientAndBucket(sessionId)

versionPart := "v2/"
devStr := ""
if util.IsDevOrTestEnv() {
devStr = "dev/"
}

prefix := fmt.Sprintf("%s%s%d/%d/", versionPart, devStr, event.ProjectId, sessionId)
options := s3.ListObjectsV2Input{
Bucket: bucket,
Prefix: &prefix,
}
output, err := client.ListObjectsV2(ctx, &options)
if err != nil {
return nil, errors.Wrap(err, "error listing objects in S3")
}

for _, object := range output.Contents {
options := s3.DeleteObjectInput{
Bucket: bucket,
Key: object.Key,
}
if !event.DryRun {
_, err := client.DeleteObject(ctx, &options)
if err != nil {
return nil, errors.Wrap(err, "error deleting objects from S3")
}
if !event.DryRun {
if err := h.storageClient.DeleteSessionData(ctx, event.ProjectId, sessionId); err != nil {
return nil, err
}
}
}
Expand Down Expand Up @@ -234,3 +197,65 @@ func (h *handlers) SendEmail(ctx context.Context, event utils.QuerySessionsInput

return nil
}

func (h *handlers) DeleteSessions(ctx context.Context, projectId int, startDate time.Time, endDate time.Time, query string) {
batches, err := h.GetSessionIdsByQuery(ctx, utils.QuerySessionsInput{
ProjectId: projectId,
Params: modelInputs.QueryInput{
DateRange: &modelInputs.DateRangeRequiredInput{
StartDate: startDate,
EndDate: endDate,
},
Query: query,
},
})
if err != nil {
log.WithContext(ctx).Error(err)
return
}

if len(batches) == 0 {
log.WithContext(ctx).Warnf("SessionDeleteJob - no sessions to delete for projectId %d, continuing", projectId)
return
}

log.WithContext(ctx).Infof("SessionDeleteJob - %d batches to delete for projectId %d", len(batches), projectId)

for _, batch := range batches {
log.WithContext(ctx).Infof("SessionDeleteJob - deleting sessions in batch %s for projectId %d", batch.BatchId, batch.ProjectId)

if _, err := h.DeleteSessionBatchFromPostgres(ctx, batch); err != nil {
log.WithContext(ctx).Error(err)
return
}
if _, err := h.DeleteSessionBatchFromObjectStorage(ctx, batch); err != nil {
log.WithContext(ctx).Error(err)
return
}
if _, err := h.DeleteSessionBatchFromClickhouse(ctx, batch); err != nil {
log.WithContext(ctx).Error(err)
return
}

log.WithContext(ctx).Infof("SessionDeleteJob - finished deleting sessions in batch %s for projectId %d", batch.BatchId, batch.ProjectId)
}
}

func (h *handlers) ProcessRetentionDeletions(ctx context.Context, retentionDays int) {
var projectIds []int
if err := h.db.Model(&model.Project{}).Select("id").Find(&projectIds).Error; err != nil {
log.WithContext(ctx).Error(err)
return
}

now := time.Now()
endDate := now.AddDate(0, 0, -1*retentionDays)
startDate := endDate.AddDate(-10, 0, 0)

for _, id := range projectIds {
projectId := id
// Only delete sessions which have not been viewed to avoid deleting any useful sessions
// Users can manually remove any others with the "delete sessions" button
go h.DeleteSessions(ctx, projectId, startDate, endDate, "viewed_by_anyone=false")
}
}
4 changes: 2 additions & 2 deletions backend/lambda-functions/deleteSessions/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ func main() {
if _, err := h.DeleteSessionBatchFromPostgres(ctx, out[0]); err != nil {
log.WithContext(ctx).Fatal(err)
}
if _, err := h.DeleteSessionBatchFromS3(ctx, out[0]); err != nil {
if _, err := h.DeleteSessionBatchFromObjectStorage(ctx, out[0]); err != nil {
log.WithContext(ctx).Fatal(err)
}
if _, err := h.DeleteSessionBatchFromOpenSearch(ctx, out[0]); err != nil {
if _, err := h.DeleteSessionBatchFromClickhouse(ctx, out[0]); err != nil {
log.WithContext(ctx).Fatal(err)
}

Expand Down
1 change: 1 addition & 0 deletions backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ func main() {
go w.GetPublicWorker(kafkaqueue.TopicTypeTraces)(ctx)
go w.StartLogAlertWatcher(ctx)
go w.StartMetricMonitorWatcher(ctx)
go w.StartSessionDeleteJob(ctx)
go func() {
w.ReportStripeUsage(ctx)
for range time.Tick(time.Hour) {
Expand Down
26 changes: 14 additions & 12 deletions backend/private-graph/graph/schema.resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/highlight-run/highlight/backend/integrations/cloudflare"
"github.com/highlight-run/highlight/backend/integrations/height"
kafka_queue "github.com/highlight-run/highlight/backend/kafka-queue"
delete_handlers "github.com/highlight-run/highlight/backend/lambda-functions/deleteSessions/handlers"
"github.com/highlight-run/highlight/backend/lambda-functions/deleteSessions/utils"
utils2 "github.com/highlight-run/highlight/backend/lambda-functions/sessionExport/utils"
"github.com/highlight-run/highlight/backend/model"
Expand Down Expand Up @@ -4291,10 +4292,6 @@ func (r *mutationResolver) DeleteDashboard(ctx context.Context, id int) (bool, e

// DeleteSessions is the resolver for the deleteSessions field.
func (r *mutationResolver) DeleteSessions(ctx context.Context, projectID int, params modelInputs.QueryInput, sessionCount int) (bool, error) {
if util.IsDevOrTestEnv() {
return false, nil
}

project, err := r.isUserInProject(ctx, projectID)
if err != nil {
return false, err
Expand Down Expand Up @@ -4328,14 +4325,19 @@ func (r *mutationResolver) DeleteSessions(ctx context.Context, projectID int, pa
return false, err
}

_, err = r.StepFunctions.DeleteSessionsByQuery(ctx, utils.QuerySessionsInput{
ProjectId: projectID,
Email: email,
FirstName: firstName,
Params: params,
SessionCount: sessionCount,
DryRun: util.IsDevOrTestEnv(),
})
if util.IsInDocker() {
deleteHandlers := delete_handlers.InitHandlers(r.DB, r.ClickhouseClient, nil, r.StorageClient)
deleteHandlers.DeleteSessions(ctx, projectID, params.DateRange.StartDate, params.DateRange.EndDate, params.Query)
} else {
_, err = r.StepFunctions.DeleteSessionsByQuery(ctx, utils.QuerySessionsInput{
ProjectId: projectID,
Email: email,
FirstName: firstName,
Params: params,
SessionCount: sessionCount,
DryRun: util.IsDevOrTestEnv(),
})
}

if err != nil {
return false, err
Expand Down
50 changes: 49 additions & 1 deletion backend/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"encoding/json"
"encoding/pem"
"fmt"
hredis "github.com/highlight-run/highlight/backend/redis"
"io"
"net/http"
"os"
Expand All @@ -17,6 +16,8 @@ import (
"sync"
"time"

hredis "github.com/highlight-run/highlight/backend/redis"

"github.com/go-chi/chi"
"github.com/rs/cors"
"github.com/samber/lo"
Expand Down Expand Up @@ -100,6 +101,7 @@ type Client interface {
UploadAsset(ctx context.Context, uuid string, contentType string, reader io.Reader, retentionPeriod privateModel.RetentionPeriod) error
ReadGitHubFile(ctx context.Context, repoPath string, fileName string, version string) ([]byte, error)
PushGitHubFile(ctx context.Context, repoPath string, fileName string, version string, fileBytes []byte) (*int64, error)
DeleteSessionData(ctx context.Context, projectId int, sessionId int) error
}

type FilesystemClient struct {
Expand Down Expand Up @@ -473,6 +475,52 @@ func (f *FilesystemClient) SetupHTTPSListener(r chi.Router) {
})
}

func (f *FilesystemClient) DeleteSessionData(ctx context.Context, projectId int, sessionId int) error {
if f.fsRoot == "" {
return errors.New("f.fsRoot cannot be empty")
}
if err := os.RemoveAll(fmt.Sprintf("%s/%d/%d", f.fsRoot, projectId, sessionId)); err != nil {
return err
}
if err := os.RemoveAll(fmt.Sprintf("%s/raw-events/%d/%d", f.fsRoot, projectId, sessionId)); err != nil {
return err
}
return nil
}

func (s *S3Client) DeleteSessionData(ctx context.Context, projectId int, sessionId int) error {
client, bucket := s.getSessionClientAndBucket(sessionId)

versionPart := "v2/"
devStr := ""
if util.IsDevOrTestEnv() {
devStr = "dev/"
}

prefix := fmt.Sprintf("%s%s%d/%d/", versionPart, devStr, projectId, sessionId)
options := s3.ListObjectsV2Input{
Bucket: bucket,
Prefix: &prefix,
}
output, err := client.ListObjectsV2(ctx, &options)
if err != nil {
return errors.Wrap(err, "error listing objects in S3")
}

for _, object := range output.Contents {
options := s3.DeleteObjectInput{
Bucket: bucket,
Key: object.Key,
}
_, err := client.DeleteObject(ctx, &options)
if err != nil {
return errors.Wrap(err, "error deleting objects from S3")
}
}

return nil
}

func NewFSClient(_ context.Context, origin, fsRoot string) (*FilesystemClient, error) {
return &FilesystemClient{origin: origin, fsRoot: fsRoot, redis: hredis.NewClient()}, nil
}
Expand Down
Loading

1 comment on commit bada438

@vercel
Copy link

@vercel vercel bot commented on bada438 Jun 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.