Skip to content

Commit

Permalink
Kruto
Browse files Browse the repository at this point in the history
  • Loading branch information
Satont committed Dec 23, 2023
1 parent a11a828 commit d1a63da
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 6 deletions.
53 changes: 50 additions & 3 deletions internal/messagesender/temporal/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,66 @@ package temporal

import (
"context"
"fmt"
"time"

"github.com/google/uuid"
"github.com/satont/twitch-notifier/internal/messagesender"
"github.com/satont/twitch-notifier/pkg/logger"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"go.uber.org/fx"
)

func NewTemporal() *Temporal {
return &Temporal{}
type TemporalOpts struct {
fx.In

Workflow *Workflow
Logger logger.Logger
}

func NewTemporal(opts TemporalOpts) (*Temporal, error) {
cl, err := client.Dial(client.Options{})
if err != nil {
return nil, err
}

return &Temporal{
client: cl,
workflow: opts.Workflow,
logger: opts.Logger,
}, nil
}

const queueName = "message-sender"

type Temporal struct {
client client.Client
workflow *Workflow
logger logger.Logger
}

var _ messagesender.MessageSender = (*Temporal)(nil)

func (m *Temporal) SendMessageTelegram(ctx context.Context, opts messagesender.TelegramOpts) error {
func (c *Temporal) SendMessageTelegram(ctx context.Context, opts messagesender.TelegramOpts) error {
workflowOptions := client.StartWorkflowOptions{
ID: fmt.Sprintf("MSG: Telegram to %s #%s", uuid.NewString(), opts.ServiceChatID),
TaskQueue: queueName,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 5,
InitialInterval: 10 * time.Second,
},
}

we, err := c.client.ExecuteWorkflow(ctx, workflowOptions, c.workflow.SendTelegram, thumbnailUrl)
if err != nil {
return err
}

err = we.Get(ctx, nil)
if err != nil {
return err
}

return nil
}
7 changes: 6 additions & 1 deletion internal/messagesender/temporal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package temporal

import (
"github.com/satont/twitch-notifier/internal/messagesender"
"github.com/satont/twitch-notifier/pkg/logger"
"go.temporal.io/sdk/workflow"
"go.uber.org/fx"
)

type WorkflowOpts struct {
fx.In

Logger logger.Logger
}

func NewWorkflow(opts WorkflowOpts) *Workflow {
Expand All @@ -17,4 +20,6 @@ func NewWorkflow(opts WorkflowOpts) *Workflow {
type Workflow struct {
}

func (c *Workflow) SendTelegram(ctx workflow.Context, opts messagesender.TelegramOpts)
func (c *Workflow) SendTelegram(ctx workflow.Context, opts messagesender.TelegramOpts) error {
return nil
}
2 changes: 1 addition & 1 deletion internal/thumbnailchecker/temporal/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c *Temporal) ValidateThumbnail(ctx context.Context, thumbnailUrl string) e
return err
}

err = we.Get(context.Background(), nil)
err = we.Get(ctx, nil)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion internal/thumbnailchecker/temporal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type WorkerOpts struct {
}

func NewWorker(opts WorkerOpts) error {
// The client and worker are heavyweight objects that should be created once per process.
temporalClient, err := client.Dial(
client.Options{
Logger: log.NewStructuredLogger(opts.Logger.GetSlog()),
Expand Down

0 comments on commit d1a63da

Please sign in to comment.