Skip to content

Commit

Permalink
refactor worker session search
Browse files Browse the repository at this point in the history
  • Loading branch information
Vadman97 committed Apr 26, 2024
1 parent 260600e commit d5078a0
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 38 deletions.
42 changes: 30 additions & 12 deletions backend/public-graph/graph/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,18 @@ func (r *Resolver) IndexSessionClickhouse(ctx context.Context, session *model.Se
return r.DataSyncQueue.Submit(ctx, strconv.Itoa(session.ID), &kafka_queue.Message{Type: kafka_queue.SessionDataSync, SessionDataSync: &kafka_queue.SessionDataSyncArgs{SessionID: session.ID}})
}

func (r *Resolver) getSession(ctx context.Context, sessionSecureID string) (*model.Session, error) {
sessionObj, found := r.SessionCache.Get(sessionSecureID)
if !found {
if err := r.DB.WithContext(ctx).Where(&model.Session{SecureID: sessionSecureID}).Limit(1).Take(&sessionObj).Error; err != nil {
retErr := e.Wrapf(err, "error reading from session %v", sessionSecureID)
log.WithContext(ctx).WithField("sessionSecureID", sessionSecureID).WithError(retErr).Error("failed to get session")
return nil, err
}
}
return sessionObj, nil
}

func (r *Resolver) getExistingSession(ctx context.Context, projectID int, secureID string) (*model.Session, error) {
existingSessionObj := &model.Session{}
if err := r.DB.WithContext(ctx).Model(&existingSessionObj).Where(&model.Session{SecureID: secureID}).Take(&existingSessionObj).Error; err != nil {
Expand Down Expand Up @@ -1551,11 +1563,16 @@ func (r *Resolver) IdentifySessionImpl(ctx context.Context, sessionSecureID stri
return nil
}

func (r *Resolver) AddSessionPropertiesImpl(ctx context.Context, sessionID int, propertiesObject interface{}) error {
func (r *Resolver) AddSessionPropertiesImpl(ctx context.Context, sessionSecureID string, propertiesObject interface{}) error {
outerSpan, ctx := util.StartSpanFromContext(ctx, "public-graph.AddSessionPropertiesImpl",
util.ResourceName("go.sessions.AddSessionPropertiesImpl"))
defer outerSpan.Finish()

sessionObj, err := r.getSession(ctx, sessionSecureID)
if err != nil {
return err
}

obj, ok := propertiesObject.(map[string]interface{})
if !ok {
return e.New("error converting userObject interface type")
Expand All @@ -1564,7 +1581,7 @@ func (r *Resolver) AddSessionPropertiesImpl(ctx context.Context, sessionID int,
for k, v := range obj {
fields[k] = fmt.Sprintf("%v", v)
}
err := r.AppendProperties(ctx, sessionID, fields, PropertyType.SESSION)
err = r.AppendProperties(ctx, sessionObj.ID, fields, PropertyType.SESSION)
if err != nil {
return e.Wrap(err, "error adding set of properties to db")
}
Expand Down Expand Up @@ -2243,11 +2260,16 @@ func (r *Resolver) ProcessBackendPayloadImpl(ctx context.Context, sessionSecureI
}

// Deprecated, left for backward compatibility with older client versions. Use AddTrackProperties instead
func (r *Resolver) AddTrackPropertiesImpl(ctx context.Context, sessionID int, propertiesObject interface{}) error {
func (r *Resolver) AddTrackPropertiesImpl(ctx context.Context, sessionSecureID string, propertiesObject interface{}) error {
outerSpan, ctx := util.StartSpanFromContext(ctx, "public-graph.AddTrackPropertiesImpl",
util.ResourceName("go.sessions.AddTrackPropertiesImpl"))
defer outerSpan.Finish()

sessionObj, err := r.getSession(ctx, sessionSecureID)
if err != nil {
return err
}

obj, ok := propertiesObject.(map[string]interface{})
if !ok {
return e.New("error converting userObject interface type")
Expand All @@ -2259,7 +2281,7 @@ func (r *Resolver) AddTrackPropertiesImpl(ctx context.Context, sessionID int, pr
return e.New("therewasonceahumblebumblebeeflyingthroughtheforestwhensuddenlyadropofwaterfullyencasedhimittookhimasecondtofigureoutthathesinaraindropsuddenlytheraindrophitthegroundasifhewasdivingintoapoolandheflewawaywithnofurtherissues")
}
}
err := r.AppendProperties(ctx, sessionID, fields, PropertyType.TRACK)
err = r.AppendProperties(ctx, sessionObj.ID, fields, PropertyType.TRACK)
if err != nil {
return e.Wrap(err, "error adding set of properties to db")
}
Expand Down Expand Up @@ -2455,14 +2477,10 @@ func (r *Resolver) ProcessPayload(ctx context.Context, sessionSecureID string, e
return e.New("ProcessPayload called without secureID")
}

sessionObj, found := r.SessionCache.Get(sessionSecureID)
if !found {
if err := r.DB.WithContext(ctx).Where(&model.Session{SecureID: sessionSecureID}).Limit(1).Take(&sessionObj).Error; err != nil {
retErr := e.Wrapf(err, "error reading from session %v", sessionSecureID)
log.WithContext(ctx).Error(retErr)
querySessionSpan.Finish(retErr)
return retErr
}
sessionObj, err := r.getSession(ctx, sessionSecureID)
if err != nil {
querySessionSpan.Finish(err)
return err
}
querySessionSpan.SetAttribute("secure_id", sessionObj.SecureID)
querySessionSpan.SetAttribute("project_id", sessionObj.ProjectID)
Expand Down
28 changes: 2 additions & 26 deletions backend/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,22 +297,6 @@ func (w *Worker) scanSessionPayload(ctx context.Context, manager *payload.Payloa
return nil
}

func (w *Worker) getSessionID(ctx context.Context, sessionSecureID string) (id int, err error) {
s, _ := util.StartSpanFromContext(ctx, "getSessionID", util.ResourceName("worker.getSessionID"))
s.SetAttribute("secure_id", sessionSecureID)
defer s.Finish()
if sessionSecureID == "" {
return 0, e.New("getSessionID called with no secure id")
}
session := &model.Session{}
w.Resolver.DB.Select("id").Where(&model.Session{SecureID: sessionSecureID}).Take(&session)
if session.ID == 0 {
return 0, e.New(fmt.Sprintf("no session found for secure id: '%s'", sessionSecureID))
}
id = session.ID
return
}

func (w *Worker) processPublicWorkerMessage(ctx context.Context, task *kafkaqueue.Message) error {
switch task.Type {
case kafkaqueue.PushPayload:
Expand Down Expand Up @@ -375,23 +359,15 @@ func (w *Worker) processPublicWorkerMessage(ctx context.Context, task *kafkaqueu
if task.AddTrackProperties == nil {
break
}
sessionID, err := w.getSessionID(ctx, task.AddTrackProperties.SessionSecureID)
if err != nil {
return err
}
if err := w.PublicResolver.AddTrackPropertiesImpl(ctx, sessionID, task.AddTrackProperties.PropertiesObject); err != nil {
if err := w.PublicResolver.AddTrackPropertiesImpl(ctx, task.AddTrackProperties.SessionSecureID, task.AddTrackProperties.PropertiesObject); err != nil {
log.WithContext(ctx).WithError(err).WithField("type", task.Type).Error("failed to process task")
return err
}
case kafkaqueue.AddSessionProperties:
if task.AddSessionProperties == nil {
break
}
sessionID, err := w.getSessionID(ctx, task.AddSessionProperties.SessionSecureID)
if err != nil {
return err
}
if err := w.PublicResolver.AddSessionPropertiesImpl(ctx, sessionID, task.AddSessionProperties.PropertiesObject); err != nil {
if err := w.PublicResolver.AddSessionPropertiesImpl(ctx, task.AddSessionProperties.SessionSecureID, task.AddSessionProperties.PropertiesObject); err != nil {
log.WithContext(ctx).WithError(err).WithField("type", task.Type).Error("failed to process task")
return err
}
Expand Down

0 comments on commit d5078a0

Please sign in to comment.