Skip to content

Commit

Permalink
hepa: periodically refresh auth session
Browse files Browse the repository at this point in the history
  • Loading branch information
bnewbold committed Dec 23, 2023
1 parent b877bf6 commit cfbe117
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
8 changes: 8 additions & 0 deletions cmd/hepa/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ var runCmd = &cli.Command{
}
}()

if srv.engine.AdminClient != nil {
go func() {
if err := srv.RunRefreshAdminClient(ctx); err != nil {
slog.Error("session refresh failed", "err", err)
}
}()
}

// the main service loop
if err := srv.RunConsumer(ctx); err != nil {
return fmt.Errorf("failure consuming and processing firehose: %w", err)
Expand Down
38 changes: 38 additions & 0 deletions cmd/hepa/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,44 @@ func (s *Server) PersistCursor(ctx context.Context) error {
return err
}

// Periodically refreshes the engine's admin XRPC client JWT auth token.
//
// Expects to be run in a goroutine, and to be the only running code which touches the auth fields (aka, there is no locking).
// TODO: this is a hack until we have an XRPC client which handles these details automatically.
func (s *Server) RunRefreshAdminClient(ctx context.Context) error {
if s.engine.AdminClient == nil {
return nil
}
ac := s.engine.AdminClient
ticker := time.NewTicker(1 * time.Hour)
for {
select {
case <-ticker.C:
// uses a temporary xrpc client instead of the existing one because we need to put refreshJwt in the position of accessJwt, and that would cause an error for any concurrent requests
tmpClient := xrpc.Client{
Host: ac.Host,
Auth: &xrpc.AuthInfo{
Did: ac.Auth.Did,
Handle: ac.Auth.Handle,
AccessJwt: ac.Auth.RefreshJwt,
RefreshJwt: ac.Auth.RefreshJwt,
},
}
refresh, err := comatproto.ServerRefreshSession(ctx, &tmpClient)
if err != nil {
// don't return an error, just log, and attempt again on the next tick
s.logger.Error("failed to refresh admin client session", "err", err, "host", ac.Host)
} else {
s.engine.AdminClient.Auth.RefreshJwt = refresh.RefreshJwt
s.engine.AdminClient.Auth.AccessJwt = refresh.AccessJwt
s.logger.Info("refreshed admin client session")
}
case <-ctx.Done():
return nil
}
}
}

// this method runs in a loop, persisting the current cursor state every 5 seconds
func (s *Server) RunPersistCursor(ctx context.Context) error {

Expand Down

0 comments on commit cfbe117

Please sign in to comment.