Skip to content

Commit 76858c4

Browse files
FrankLi123kallydev
authored andcommitted
feat(worker/polymarket): add polymarket worker (#540)
1 parent 51c209f commit 76858c4

File tree

14 files changed

+6503
-48
lines changed

14 files changed

+6503
-48
lines changed
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package polymarket
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math/big"
7+
8+
"github.com/ethereum/go-ethereum/common"
9+
"github.com/rss3-network/node/config"
10+
"github.com/rss3-network/node/internal/engine"
11+
source "github.com/rss3-network/node/internal/engine/source/ethereum"
12+
"github.com/rss3-network/node/provider/ethereum"
13+
"github.com/rss3-network/node/provider/ethereum/contract"
14+
"github.com/rss3-network/node/provider/ethereum/contract/polymarket"
15+
"github.com/rss3-network/node/provider/ethereum/token"
16+
"github.com/rss3-network/node/schema/worker/decentralized"
17+
"github.com/rss3-network/protocol-go/schema"
18+
activityx "github.com/rss3-network/protocol-go/schema/activity"
19+
"github.com/rss3-network/protocol-go/schema/metadata"
20+
"github.com/rss3-network/protocol-go/schema/network"
21+
"github.com/rss3-network/protocol-go/schema/tag"
22+
"github.com/rss3-network/protocol-go/schema/typex"
23+
"github.com/samber/lo"
24+
"github.com/shopspring/decimal"
25+
"go.uber.org/zap"
26+
)
27+
28+
var _ engine.Worker = (*worker)(nil)
29+
30+
type worker struct {
31+
ethereumClient ethereum.Client
32+
tokenClient token.Client
33+
ctfExchange *polymarket.CTFExchangeFilterer
34+
}
35+
36+
func (w *worker) Name() string {
37+
return decentralized.Polymarket.String()
38+
}
39+
40+
func (w *worker) Platform() string {
41+
return decentralized.PlatformPolymarket.String()
42+
}
43+
44+
func (w *worker) Network() []network.Network {
45+
return []network.Network{
46+
network.Polygon,
47+
}
48+
}
49+
50+
func (w *worker) Tags() []tag.Tag {
51+
return []tag.Tag{
52+
tag.Collectible,
53+
}
54+
}
55+
56+
func (w *worker) Types() []schema.Type {
57+
return []schema.Type{
58+
typex.CollectibleTrade,
59+
}
60+
}
61+
62+
func (w *worker) Filter() engine.DataSourceFilter {
63+
return &source.Filter{
64+
LogAddresses: []common.Address{
65+
polymarket.AddressPolyMarketCTFExchange,
66+
polymarket.AddressPolyMarketNegRiskCTFExchange,
67+
},
68+
LogTopics: []common.Hash{
69+
polymarket.EventOrderFinalized,
70+
},
71+
}
72+
}
73+
74+
func (w *worker) Transform(ctx context.Context, task engine.Task) (*activityx.Activity, error) {
75+
polygonTask, ok := task.(*source.Task)
76+
if !ok {
77+
return nil, fmt.Errorf("invalid task type %T", task)
78+
}
79+
80+
activity, err := task.BuildActivity(activityx.WithActivityPlatform(w.Platform()))
81+
if err != nil {
82+
return nil, fmt.Errorf("build activity: %w", err)
83+
}
84+
85+
for _, log := range polygonTask.Receipt.Logs {
86+
if len(log.Topics) == 0 {
87+
continue
88+
}
89+
90+
var (
91+
actions []*activityx.Action
92+
err error
93+
)
94+
95+
switch {
96+
case w.matchOrderFinalizedLog(polygonTask, log):
97+
actions, err = w.transformOrderFinalizedLog(ctx, polygonTask, log)
98+
99+
default:
100+
zap.L().Warn("unsupported log", zap.String("task", polygonTask.ID()), zap.Uint("topic.index", log.Index))
101+
continue
102+
}
103+
104+
if err != nil {
105+
zap.L().Warn("handle polymarket order transaction", zap.Error(err), zap.String("worker", w.Name()), zap.String("task", polygonTask.ID()))
106+
107+
return nil, err
108+
}
109+
110+
activity.Actions = append(activity.Actions, actions...)
111+
}
112+
113+
activity.Type = typex.CollectibleTrade
114+
115+
return activity, nil
116+
}
117+
118+
func (w *worker) matchOrderFinalizedLog(_ *source.Task, log *ethereum.Log) bool {
119+
return contract.MatchEventHashes(log.Topics[0], polymarket.EventOrderFinalized) &&
120+
contract.MatchAddresses(log.Address, polymarket.AddressPolyMarketCTFExchange, polymarket.AddressPolyMarketNegRiskCTFExchange)
121+
}
122+
123+
func (w *worker) transformOrderFinalizedLog(ctx context.Context, task *source.Task, log *ethereum.Log) ([]*activityx.Action, error) {
124+
var err error
125+
126+
// CTF and NegRiskCTF shares the same OrderFilled struct
127+
event, err := w.ctfExchange.ParseOrderFilled(log.Export())
128+
129+
if err != nil {
130+
return nil, fmt.Errorf("parse OrderFilled event: %w", err)
131+
}
132+
133+
buyAction, sellAction, err := w.buildMarketTradeAction(ctx, task, task.ChainID, event.Maker, event.Taker, event.MakerAssetId, event.TakerAssetId, event.OrderHash, event.MakerAmountFilled, event.TakerAmountFilled)
134+
if err != nil {
135+
return nil, fmt.Errorf("build market trade action: %w", err)
136+
}
137+
138+
return []*activityx.Action{buyAction, sellAction}, nil
139+
}
140+
141+
func (w *worker) buildMarketTradeAction(ctx context.Context, _ *source.Task, chainID uint64, maker, taker common.Address, makerAssetID, takerAssetID *big.Int, _ [32]byte, makerAmountFilled, takerAmountFilled *big.Int) (*activityx.Action, *activityx.Action, error) {
142+
makerAmountFilledDecimal := decimal.NewFromBigInt(makerAmountFilled, 0)
143+
takerAmountFilledDecimal := decimal.NewFromBigInt(takerAmountFilled, 0)
144+
145+
var takerTokenAddress *common.Address
146+
if takerAssetID.Cmp(big.NewInt(0)) == 0 {
147+
takerTokenAddress = nil
148+
} else {
149+
address := common.HexToAddress(polymarket.AddressPolyMarketConditionTokens.String())
150+
takerTokenAddress = &address
151+
}
152+
153+
takerToken, err := w.tokenClient.Lookup(ctx, chainID, takerTokenAddress, takerAssetID, nil)
154+
155+
if err != nil {
156+
return nil, nil, fmt.Errorf("lookup taker token: %w", err)
157+
}
158+
159+
takerToken.Value = &takerAmountFilledDecimal
160+
161+
var makerTokenAddress *common.Address
162+
163+
if makerAssetID.Cmp(big.NewInt(0)) == 0 {
164+
makerTokenAddress = nil
165+
} else {
166+
address := common.HexToAddress(polymarket.AddressPolyMarketConditionTokens.String())
167+
makerTokenAddress = &address
168+
}
169+
170+
makerToken, err := w.tokenClient.Lookup(ctx, chainID, makerTokenAddress, makerAssetID, nil)
171+
if err != nil {
172+
return nil, nil, fmt.Errorf("lookup maker token: %w", err)
173+
}
174+
175+
makerToken.Value = &makerAmountFilledDecimal
176+
177+
buyAction := &activityx.Action{
178+
Type: typex.CollectibleTrade,
179+
Platform: w.Platform(),
180+
From: taker.String(),
181+
To: maker.String(),
182+
Metadata: metadata.CollectibleTrade{
183+
Action: metadata.ActionCollectibleTradeBuy,
184+
Token: *takerToken,
185+
Cost: makerToken,
186+
},
187+
}
188+
189+
// Sell action (from the maker's perspective)
190+
sellAction := &activityx.Action{
191+
Type: typex.CollectibleTrade,
192+
Platform: w.Platform(),
193+
From: maker.String(),
194+
To: taker.String(),
195+
Metadata: metadata.CollectibleTrade{
196+
Action: metadata.ActionCollectibleTradeSell,
197+
Token: *takerToken,
198+
Cost: makerToken,
199+
},
200+
}
201+
202+
return buyAction, sellAction, nil
203+
}
204+
205+
func NewWorker(config *config.Module) (engine.Worker, error) {
206+
instance := worker{
207+
ctfExchange: lo.Must(polymarket.NewCTFExchangeFilterer(ethereum.AddressGenesis, nil)),
208+
// negRiskCTF: lo.Must(polymarket.NewNegRiskCTFExchangeFilterer(ethereum.AddressGenesis, nil)),
209+
}
210+
211+
var err error
212+
213+
if instance.ethereumClient, err = ethereum.Dial(context.Background(), config.Endpoint.URL, config.Endpoint.BuildEthereumOptions()...); err != nil {
214+
return nil, fmt.Errorf("initialize ethereum client: %w", err)
215+
}
216+
217+
instance.tokenClient = token.NewClient(instance.ethereumClient)
218+
219+
return &instance, nil
220+
}

0 commit comments

Comments
 (0)