Skip to content

Commit

Permalink
multiple improvements to telegram auth
Browse files Browse the repository at this point in the history
- sync code style with remark42
- improve parseError returned error description
- handle empty response case in BotInfo
- remove unused fields from botInfo object
- remove unnecessary returned error from Run
- separate cleanup and processUpdates tickers
- increase processUpdates ticker from 1s to 5s
- set expired requests cleanup ticker to 5m
- increase test coverage
  • Loading branch information
paskal committed Oct 31, 2021
1 parent 3387ae6 commit b7b1f7e
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 78 deletions.
118 changes: 67 additions & 51 deletions provider/telegram.go
Expand Up @@ -56,7 +56,8 @@ type TelegramAPI interface {
}

// changed in tests
var tgPollInterval = time.Second
var apiPollInterval = time.Second * 5 // interval to check updates from Telegram API and answer to users
var expiredCleanupInterval = time.Minute * 5 // interval to check and clean up expired notification requests

// Run starts processing login requests sent in Telegram
// Blocks caller
Expand All @@ -73,21 +74,23 @@ func (th *TelegramHandler) Run(ctx context.Context) error {

th.username = info.Username

ticker := time.NewTicker(tgPollInterval)
processUpdatedTicker := time.NewTicker(apiPollInterval)
cleanupTicker := time.NewTicker(expiredCleanupInterval)

for {
select {
case <-ctx.Done():
ticker.Stop()
processUpdatedTicker.Stop()
cleanupTicker.Stop()
return ctx.Err()
case <-ticker.C:
err := th.processUpdates(ctx)
case <-processUpdatedTicker.C:
updates, err := th.Telegram.GetUpdates(ctx)
if err != nil {
th.Logf("Error while processing updates: %v", err)
th.Logf("Error while getting telegram updates: %v", err)
continue
}

// Purge expired requests
th.processUpdates(ctx, updates)
case <-cleanupTicker.C:
now := time.Now()
th.requests.Lock()
for key, req := range th.requests.data {
Expand All @@ -100,6 +103,7 @@ func (th *TelegramHandler) Run(ctx context.Context) error {
}
}

// telegramUpdate contains update information, which is used from whole telegram API response
type telegramUpdate struct {
Result []struct {
UpdateID int `json:"update_id"`
Expand All @@ -116,12 +120,7 @@ type telegramUpdate struct {

// processUpdates processes a batch of updates from telegram servers
// Returns offset for subsequent calls
func (th *TelegramHandler) processUpdates(ctx context.Context) error {
updates, err := th.Telegram.GetUpdates(ctx)
if err != nil {
return err
}

func (th *TelegramHandler) processUpdates(ctx context.Context, updates *telegramUpdate) {
for _, update := range updates.Result {
if update.Message.Chat.Type != "private" {
continue
Expand Down Expand Up @@ -173,10 +172,46 @@ func (th *TelegramHandler) processUpdates(ctx context.Context) error {
th.Logf("failed to notify telegram peer: %v", err)
}
}
}

// addToken adds token
func (th *TelegramHandler) addToken(token string, expires time.Time) error {
th.requests.Lock()
if th.requests.data == nil {
th.requests.Unlock()
return errors.New("run goroutine is not running")
}
th.requests.data[token] = tgAuthRequest{
expires: expires,
}
th.requests.Unlock()
return nil
}

// checkToken verifies incoming token, returns the user address if it's confirmed and empty string otherwise
func (th *TelegramHandler) checkToken(token string) (*authtoken.User, error) {
th.requests.RLock()
authRequest, ok := th.requests.data[token]
th.requests.RUnlock()

if !ok {
return nil, errors.New("request is not found")
}

if time.Now().After(authRequest.expires) {
th.requests.Lock()
delete(th.requests.data, token)
th.requests.Unlock()
return nil, errors.New("request expired")
}

if !authRequest.confirmed {
return nil, errors.New("request is not verified yet")
}

return authRequest.user, nil
}

// Name of the provider
func (th *TelegramHandler) Name() string { return th.ProviderName }

Expand All @@ -195,16 +230,11 @@ func (th *TelegramHandler) LoginHandler(w http.ResponseWriter, r *http.Request)
return
}

th.requests.Lock()
if th.requests.data == nil {
th.requests.Unlock()
rest.SendErrorJSON(w, r, th.L, http.StatusInternalServerError, errors.New("run goroutine is not running"), "failed to process login request")
err = th.addToken(token, time.Now().Add(tgAuthRequestLifetime))
if err != nil {
rest.SendErrorJSON(w, r, th.L, http.StatusInternalServerError, err, "failed to process login request")
return
}
th.requests.data[token] = tgAuthRequest{
expires: time.Now().Add(tgAuthRequestLifetime),
}
th.requests.Unlock()

rest.RenderJSON(w, struct {
Token string `json:"token"`
Expand All @@ -215,25 +245,13 @@ func (th *TelegramHandler) LoginHandler(w http.ResponseWriter, r *http.Request)
}

// GET /login?token=blah
th.requests.RLock()
authRequest, ok := th.requests.data[queryToken]
th.requests.RUnlock()

if !ok || time.Now().After(authRequest.expires) {
th.requests.Lock()
delete(th.requests.data, queryToken)
th.requests.Unlock()

rest.SendErrorJSON(w, r, nil, http.StatusNotFound, nil, "request expired")
return
}

if !authRequest.confirmed {
rest.SendErrorJSON(w, r, nil, http.StatusNotFound, nil, "request not yet confirmed")
authUser, err := th.checkToken(queryToken)
if err != nil {
rest.SendErrorJSON(w, r, nil, http.StatusNotFound, err, err.Error())
return
}

u, err := setAvatar(th.AvatarSaver, *authRequest.user, &http.Client{Timeout: 5 * time.Second})
u, err := setAvatar(th.AvatarSaver, *authUser, &http.Client{Timeout: 5 * time.Second})
if err != nil {
rest.SendErrorJSON(w, r, th.L, http.StatusInternalServerError, err, "failed to save avatar to proxy")
return
Expand Down Expand Up @@ -363,9 +381,8 @@ func (tg *tgAPI) Avatar(ctx context.Context, id int) (string, error) {
return avatarURL, nil
}

// botInfo structure contains information about telegram bot, which is used from whole telegram API response
type botInfo struct {
ID int `json:"id"`
Name string `json:"first_name"`
Username string `json:"username"`
}

Expand All @@ -377,16 +394,17 @@ func (tg *tgAPI) BotInfo(ctx context.Context) (*botInfo, error) {

err := tg.request(ctx, "getMe", &resp)
if err != nil {
return nil, errors.Wrap(err, "failed to fetch bot info")
return nil, err
}
if resp.Result == nil {
return nil, errors.New("received empty result")
}

return resp.Result, nil
}

func (tg *tgAPI) request(ctx context.Context, method string, data interface{}) error {
repeat := repeater.NewDefault(3, time.Millisecond*50)

return repeat.Do(ctx, func() error {
return repeater.NewDefault(3, time.Millisecond*50).Do(ctx, func() error {
url := fmt.Sprintf("https://api.telegram.org/bot%s/%s", tg.token, method)

req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
Expand All @@ -401,7 +419,7 @@ func (tg *tgAPI) request(ctx context.Context, method string, data interface{}) e
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return tg.parseError(resp.Body)
return tg.parseError(resp.Body, resp.StatusCode)
}

if err = json.NewDecoder(resp.Body).Decode(data); err != nil {
Expand All @@ -412,14 +430,12 @@ func (tg *tgAPI) request(ctx context.Context, method string, data interface{}) e
})
}

func (tg *tgAPI) parseError(r io.Reader) error {
var tgErr = struct {
func (tg *tgAPI) parseError(r io.Reader, statusCode int) error {
tgErr := struct {
Description string `json:"description"`
}{}

if err := json.NewDecoder(r).Decode(&tgErr); err != nil {
return errors.Wrap(err, "can't decode error")
return errors.Errorf("unexpected telegram API status code %d", statusCode)
}

return errors.Errorf("telegram returned error: %v", tgErr.Description)
return errors.Errorf("unexpected telegram API status code %d, error: %q", statusCode, tgErr.Description)
}

0 comments on commit b7b1f7e

Please sign in to comment.