Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
2f3b565
CROSSLINK-250 multiple symbols per tenant
adamdickmeiss Apr 14, 2026
c10cca0
SymbolChecker used in regular broker API
adamdickmeiss Apr 14, 2026
ce2d125
SymbolChecker used for sse as well
adamdickmeiss Apr 15, 2026
7bfad4d
Merge branch 'main' into CROSSLINK-250-multiple-symbols-per-tenant
adamdickmeiss Apr 15, 2026
7358b71
Better check for okapi mode /broker prefix
adamdickmeiss Apr 15, 2026
135e953
SymbolChecker fixes
adamdickmeiss Apr 15, 2026
b56dc44
takes api.symbolChecker
adamdickmeiss Apr 15, 2026
3c43bdc
common.Tenant gone
adamdickmeiss Apr 15, 2026
dd0e565
Merge branch 'main' into CROSSLINK-250-multiple-symbols-per-tenant
adamdickmeiss Apr 15, 2026
cf699c0
No need for ptr
adamdickmeiss Apr 16, 2026
5193cee
Merge branch 'main' into CROSSLINK-250-multiple-symbols-per-tenant
adamdickmeiss Apr 16, 2026
ae01bfe
Fix test
adamdickmeiss Apr 16, 2026
3a5e318
Deal with symbols in GetIllTransactions
adamdickmeiss Apr 16, 2026
cc36ae7
Rename
adamdickmeiss Apr 16, 2026
7af44ef
Merge remote-tracking branch 'origin/main' into CROSSLINK-250-multipl…
adamdickmeiss Apr 16, 2026
8599b18
Rework tenant API
adamdickmeiss Apr 17, 2026
f74a54a
check body
adamdickmeiss Apr 17, 2026
d917ab7
Merge branch 'main' into CROSSLINK-250-multiple-symbols-per-tenant
adamdickmeiss Apr 17, 2026
0554300
tenant in separate package
adamdickmeiss Apr 17, 2026
ca320a5
Merge branch 'main' into CROSSLINK-250-multiple-symbols-per-tenant
adamdickmeiss Apr 17, 2026
749ce65
Avoid url as variable
adamdickmeiss Apr 17, 2026
4d3b117
Fix typo
adamdickmeiss Apr 17, 2026
bd22f49
Use r.Context for sse_broker
adamdickmeiss Apr 17, 2026
191f7d1
Use cqlbuilder
adamdickmeiss Apr 17, 2026
0e803fb
Create context from request in most cases
adamdickmeiss Apr 17, 2026
4d39c98
Do not call GetCachedPeersBySymbols if tenant sym == supplied syml
adamdickmeiss Apr 17, 2026
731f2e7
Minor adjustments to testing
adamdickmeiss Apr 17, 2026
ca1e0b5
Merge branch 'main' into CROSSLINK-250-multiple-symbols-per-tenant
adamdickmeiss Apr 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 59 additions & 80 deletions broker/api/api-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/indexdata/crosslink/broker/adapter"
"github.com/indexdata/crosslink/broker/service"
"github.com/indexdata/crosslink/broker/tenant"
"github.com/indexdata/crosslink/directory"

"github.com/google/uuid"
Expand All @@ -34,36 +35,23 @@ var LIMIT_DEFAULT int32 = 10
var ARCHIVE_PROCESS_STARTED = "Archive process started"

type ApiHandler struct {
limitDefault int32
eventRepo events.EventRepo
illRepo ill_db.IllRepo
tenant common.Tenant
limitDefault int32
eventRepo events.EventRepo
illRepo ill_db.IllRepo
tenantContext tenant.TenantContext
}

func NewApiHandler(eventRepo events.EventRepo, illRepo ill_db.IllRepo, tenant common.Tenant, limitDefault int32) ApiHandler {
func NewApiHandler(eventRepo events.EventRepo, illRepo ill_db.IllRepo, tenantContext tenant.TenantContext, limitDefault int32) ApiHandler {
return ApiHandler{
eventRepo: eventRepo,
illRepo: illRepo,
tenant: tenant,
limitDefault: limitDefault,
eventRepo: eventRepo,
illRepo: illRepo,
tenantContext: tenantContext,
limitDefault: limitDefault,
}
}

func (a *ApiHandler) isOwner(trans *ill_db.IllTransaction, tenant *string, requesterSymbol *string) bool {
if tenant == nil && requesterSymbol != nil {
return trans.RequesterSymbol.String == *requesterSymbol
}
if !a.tenant.IsSpecified() {
return true
}
if tenant == nil {
return false
}
return trans.RequesterSymbol.String == a.tenant.GetSymbol(*tenant)
}

func (a *ApiHandler) getIllTranFromParams(ctx common.ExtendedContext, w http.ResponseWriter,
okapiTenant *string, requesterSymbol *string, requesterReqId *oapi.RequesterRequestId,
r *http.Request, requesterSymbol *string, requesterReqId *oapi.RequesterRequestId,
illTransactionId *oapi.IllTransactionId) (*ill_db.IllTransaction, error) {
var tran ill_db.IllTransaction
var err error
Expand All @@ -86,10 +74,21 @@ func (a *ApiHandler) getIllTranFromParams(ctx common.ExtendedContext, w http.Res
addInternalError(ctx, w, err)
return nil, err
}
if !a.isOwner(&tran, okapiTenant, requesterSymbol) {
return nil, nil
tenant := a.tenantContext.WithRequest(ctx, r, requesterSymbol)
syms, err := tenant.GetSymbols()
if err != nil {
addBadRequestError(ctx, w, err)
return nil, err
}
if syms == nil {
return &tran, nil
}
return &tran, nil
for _, s := range syms {
if s == tran.RequesterSymbol.String {
return &tran, nil
}
}
return nil, nil
}

func (a *ApiHandler) Get(w http.ResponseWriter, r *http.Request) {
Expand All @@ -109,10 +108,10 @@ func (a *ApiHandler) GetEvents(w http.ResponseWriter, r *http.Request, params oa
if params.IllTransactionId != nil {
logParams["IllTransactionId"] = *params.IllTransactionId
}
ctx := common.CreateExtCtxWithArgs(context.Background(), &common.LoggerArgs{
ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{
Other: logParams,
})
tran, err := a.getIllTranFromParams(ctx, w, params.XOkapiTenant, params.RequesterSymbol,
tran, err := a.getIllTranFromParams(ctx, w, r, params.RequesterSymbol,
params.RequesterReqId, params.IllTransactionId)
if err != nil {
return
Expand All @@ -138,7 +137,7 @@ func (a *ApiHandler) GetEvents(w http.ResponseWriter, r *http.Request, params oa
}

func (a *ApiHandler) GetIllTransactions(w http.ResponseWriter, r *http.Request, params oapi.GetIllTransactionsParams) {
ctx := common.CreateExtCtxWithArgs(context.Background(), &common.LoggerArgs{
ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{
Other: map[string]string{"method": "GetIllTransactions"},
})
var resp oapi.IllTransactions
Expand All @@ -156,7 +155,7 @@ func (a *ApiHandler) GetIllTransactions(w http.ResponseWriter, r *http.Request,
}
var fullCount int64
if params.RequesterReqId != nil {
tran, err := a.getIllTranFromParams(ctx, w, params.XOkapiTenant, params.RequesterSymbol,
tran, err := a.getIllTranFromParams(ctx, w, r, params.RequesterSymbol,
params.RequesterReqId, nil)
if err != nil {
return
Expand All @@ -165,60 +164,39 @@ func (a *ApiHandler) GetIllTransactions(w http.ResponseWriter, r *http.Request,
fullCount = 1
resp.Items = append(resp.Items, toApiIllTransaction(r, *tran))
}
} else if a.tenant.IsSpecified() {
var symbol string
if params.XOkapiTenant != nil {
symbol = a.tenant.GetSymbol(*params.XOkapiTenant)
} else if params.RequesterSymbol != nil {
symbol = *params.RequesterSymbol
}
if symbol == "" {
writeJsonResponse(w, resp)
return
}
dbparams := ill_db.GetIllTransactionsByRequesterSymbolParams{
Limit: limit,
Offset: offset,
RequesterSymbol: pgtype.Text{
String: symbol,
Valid: true,
},
}
var trans []ill_db.IllTransaction
var err error
trans, fullCount, err = a.illRepo.GetIllTransactionsByRequesterSymbol(ctx, dbparams, cql)
if err != nil { //DB error
addInternalError(ctx, w, err)
return
}
for _, t := range trans {
resp.Items = append(resp.Items, toApiIllTransaction(r, t))
}
} else {
dbparams := ill_db.ListIllTransactionsParams{
Limit: limit,
Offset: offset,
}
var trans []ill_db.IllTransaction
var err error
trans, fullCount, err = a.illRepo.ListIllTransactions(ctx, dbparams, cql)
if err != nil { //DB error
addInternalError(ctx, w, err)
tenant := a.tenantContext.WithRequest(ctx, r, params.RequesterSymbol)
symbols, err := tenant.GetSymbols()
if err != nil {
addBadRequestError(ctx, w, err)
return
Comment thread
adamdickmeiss marked this conversation as resolved.
}
for _, t := range trans {
resp.Items = append(resp.Items, toApiIllTransaction(r, t))
if symbols == nil || len(symbols) > 0 {
dbparams := ill_db.ListIllTransactionsParams{
Limit: limit,
Offset: offset,
}
var trans []ill_db.IllTransaction
var err error
trans, fullCount, err = a.illRepo.ListIllTransactions(ctx, dbparams, cql, symbols)
if err != nil { //DB error
addInternalError(ctx, w, err)
return
}
for _, t := range trans {
resp.Items = append(resp.Items, toApiIllTransaction(r, t))
}
}
}
resp.About = CollectAboutData(fullCount, offset, limit, r)
writeJsonResponse(w, resp)
}

func (a *ApiHandler) GetIllTransactionsId(w http.ResponseWriter, r *http.Request, id string, params oapi.GetIllTransactionsIdParams) {
ctx := common.CreateExtCtxWithArgs(context.Background(), &common.LoggerArgs{
ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{
Other: map[string]string{"method": "GetIllTransactionsId", "id": id},
})
tran, err := a.getIllTranFromParams(ctx, w, params.XOkapiTenant, params.RequesterSymbol,
tran, err := a.getIllTranFromParams(ctx, w, r, params.RequesterSymbol,
nil, &id)
if err != nil {
return
Expand All @@ -231,7 +209,7 @@ func (a *ApiHandler) GetIllTransactionsId(w http.ResponseWriter, r *http.Request
}

func (a *ApiHandler) DeleteIllTransactionsId(w http.ResponseWriter, r *http.Request, id string) {
ctx := common.CreateExtCtxWithArgs(context.Background(), &common.LoggerArgs{
ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{
Other: map[string]string{"method": "DeleteIllTransactionsId", "id": id},
})
trans, err := a.illRepo.GetIllTransactionById(ctx, id)
Expand Down Expand Up @@ -269,7 +247,7 @@ func (a *ApiHandler) returnHttpError(ctx common.ExtendedContext, w http.Response
}

func (a *ApiHandler) GetPeers(w http.ResponseWriter, r *http.Request, params oapi.GetPeersParams) {
ctx := common.CreateExtCtxWithArgs(context.Background(), &common.LoggerArgs{
ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{
Other: map[string]string{"method": "GetPeers"},
})
dbparams := ill_db.ListPeersParams{
Expand Down Expand Up @@ -308,7 +286,7 @@ func (a *ApiHandler) GetPeers(w http.ResponseWriter, r *http.Request, params oap
}

func (a *ApiHandler) PostPeers(w http.ResponseWriter, r *http.Request) {
ctx := common.CreateExtCtxWithArgs(context.Background(), &common.LoggerArgs{
ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{
Other: map[string]string{"method": "PostPeers"},
})
var newPeer oapi.Peer
Expand Down Expand Up @@ -376,7 +354,7 @@ func (a *ApiHandler) PostPeers(w http.ResponseWriter, r *http.Request) {
}

func (a *ApiHandler) DeletePeersId(w http.ResponseWriter, r *http.Request, id string) {
ctx := common.CreateExtCtxWithArgs(context.Background(), &common.LoggerArgs{
ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{
Other: map[string]string{"method": "DeletePeersSymbol", "id": id},
})
err := a.illRepo.WithTxFunc(ctx, func(repo ill_db.IllRepo) error {
Expand Down Expand Up @@ -434,7 +412,7 @@ func (a *ApiHandler) DeletePeersId(w http.ResponseWriter, r *http.Request, id st
}

func (a *ApiHandler) GetPeersId(w http.ResponseWriter, r *http.Request, id string) {
ctx := common.CreateExtCtxWithArgs(context.Background(), &common.LoggerArgs{
ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{
Other: map[string]string{"method": "GetPeersSymbol", "id": id},
})
peer, err := a.illRepo.GetPeerById(ctx, id)
Expand Down Expand Up @@ -462,7 +440,7 @@ func (a *ApiHandler) GetPeersId(w http.ResponseWriter, r *http.Request, id strin
}

func (a *ApiHandler) PutPeersId(w http.ResponseWriter, r *http.Request, id string) {
ctx := common.CreateExtCtxWithArgs(context.Background(), &common.LoggerArgs{
ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{
Other: map[string]string{"method": "PutPeersSymbol", "id": id},
})
peer, err := a.illRepo.GetPeerById(ctx, id)
Expand Down Expand Up @@ -568,10 +546,10 @@ func (a *ApiHandler) GetLocatedSuppliers(w http.ResponseWriter, r *http.Request,
if params.IllTransactionId != nil {
logParams["IllTransactionId"] = *params.IllTransactionId
}
ctx := common.CreateExtCtxWithArgs(context.Background(), &common.LoggerArgs{
ctx := common.CreateExtCtxWithArgs(r.Context(), &common.LoggerArgs{
Other: logParams,
})
tran, err := a.getIllTranFromParams(ctx, w, params.XOkapiTenant, params.RequesterSymbol,
tran, err := a.getIllTranFromParams(ctx, w, r, params.RequesterSymbol,
params.RequesterReqId, params.IllTransactionId)
if err != nil {
return
Expand All @@ -598,6 +576,7 @@ func (a *ApiHandler) GetLocatedSuppliers(w http.ResponseWriter, r *http.Request,

func (a *ApiHandler) PostArchiveIllTransactions(w http.ResponseWriter, r *http.Request, params oapi.PostArchiveIllTransactionsParams) {
logParams := map[string]string{"method": "PostArchiveIllTransactions", "ArchiveDelay": params.ArchiveDelay, "ArchiveStatus": params.ArchiveStatus}
// a background process so use background context instead of request context to avoid cancellation when request is finished
ctx := common.CreateExtCtxWithArgs(context.Background(), &common.LoggerArgs{
Other: logParams,
})
Expand Down
22 changes: 0 additions & 22 deletions broker/api/common.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package api

import (
"errors"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/indexdata/crosslink/broker/common"
"github.com/indexdata/crosslink/broker/oapi"
)

Expand Down Expand Up @@ -134,23 +132,3 @@ func CollectAboutData(fullCount int64, offset int32, limit int32, r *http.Reques
}
return about
}

func GetSymbolForRequest(r *http.Request, tenantResolver common.Tenant, tenant *string, symbol *string) (string, error) {
if IsBrokerRequest(r) {
if tenantResolver.IsSpecified() {
if tenant == nil {
return "", errors.New("X-Okapi-Tenant must be specified")
} else {
return tenantResolver.GetSymbol(*tenant), nil
}
} else {
return "", errors.New("tenant mapping must be specified")
}
} else {
if symbol == nil || *symbol == "" {
return "", errors.New("symbol must be specified")
} else {
return *symbol, nil
}
}
}
70 changes: 0 additions & 70 deletions broker/api/common_test.go
Original file line number Diff line number Diff line change
@@ -1,82 +1,12 @@
package api

import (
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/indexdata/crosslink/broker/common"
"github.com/stretchr/testify/assert"
)

func TestGetSymbolForRequest(t *testing.T) {
req, _ := http.NewRequest("GET", "/broker/patron_request", strings.NewReader("{"))
req.RequestURI = "/broker/patron_request"
tenant := "req"
resolved, err := GetSymbolForRequest(req, common.NewTenant("ISIL:{tenant}"), &tenant, nil)
assert.NoError(t, err)
assert.Equal(t, "ISIL:REQ", resolved)

resolved, err = GetSymbolForRequest(req, common.NewTenant("ISIL:{tenant}"), nil, nil)
assert.Equal(t, "X-Okapi-Tenant must be specified", err.Error())
assert.Equal(t, "", resolved)

resolved, err = GetSymbolForRequest(req, common.NewTenant(""), &tenant, nil)
assert.Equal(t, "tenant mapping must be specified", err.Error())
assert.Equal(t, "", resolved)
}

func TestWithBrokerPrefix(t *testing.T) {
brokerReq, _ := http.NewRequest("GET", "/broker/patron_request", strings.NewReader("{"))
brokerReq.RequestURI = "/broker/patron_request"
assert.True(t, IsBrokerRequest(brokerReq))
assert.Equal(t, "/broker/patron_requests/1", WithBrokerPrefix(brokerReq, "/patron_requests/1"))
assert.Equal(t, "/broker/", WithBrokerPrefix(brokerReq, ""))
assert.Equal(t, "/broker/", WithBrokerPrefix(brokerReq, "/"))

regularReq, _ := http.NewRequest("GET", "/patron_request", strings.NewReader("{"))
regularReq.RequestURI = "/patron_request"
assert.False(t, IsBrokerRequest(regularReq))
assert.Equal(t, "/patron_requests/1", WithBrokerPrefix(regularReq, "/patron_requests/1"))
assert.Equal(t, "/", WithBrokerPrefix(regularReq, ""))
assert.Equal(t, "/", WithBrokerPrefix(regularReq, "/"))
}

func TestPathAndQuery(t *testing.T) {
assert.Equal(t, "/patron_requests/1/items", Path("patron_requests", "1", "items"))
assert.Equal(t, "/patron_requests/1/items", Path("/patron_requests/", "/1/", "/items/"))
assert.Equal(t, "/patron%20requests/a%2Fb%3Fx/items", Path("patron requests", "a/b?x", "items"))

values := Query("symbol", "ISIL:REQ", "offset", "10", "dangling")
assert.Equal(t, "ISIL:REQ", values.Get("symbol"))
assert.Equal(t, "10", values.Get("offset"))
assert.Empty(t, values.Get("dangling"))
}

func TestLink(t *testing.T) {
regularReq := httptest.NewRequest("GET", "https://example.org/patron_requests", nil)
regularReq.RequestURI = "/patron_requests"
link := Link(regularReq, Path("patron_requests", "1", "items"), Query("symbol", "ISIL:REQ", "q", "a b"))
assert.Equal(t, "https://example.org/patron_requests/1/items?q=a+b&symbol=ISIL%3AREQ", link)

brokerReq := httptest.NewRequest("GET", "https://example.org/broker/patron_requests", nil)
brokerReq.RequestURI = "/broker/patron_requests"
brokerLink := Link(brokerReq, Path("patron_requests", "1", "items"), Query("symbol", "ISIL:REQ"))
assert.Equal(t, "https://example.org/broker/patron_requests/1/items?symbol=ISIL%3AREQ", brokerLink)
}

func TestLinkRel(t *testing.T) {
req := httptest.NewRequest("GET", "https://example.org/patron_requests/1", nil)
req.RequestURI = "/patron_requests/1"

currentLink := LinkRel(req, "", Query("symbol", "ISIL:REQ"))
assert.Equal(t, "https://example.org/patron_requests/1?symbol=ISIL%3AREQ", currentLink)

relativeLink := LinkRel(req, "items", Query("symbol", "ISIL:REQ"))
assert.Equal(t, "https://example.org/patron_requests/1/items?symbol=ISIL%3AREQ", relativeLink)
}

func TestCollectAboutDataLastLink(t *testing.T) {
reqOffset0 := httptest.NewRequest("GET", "http://localhost/ill_transactions?symbol=ISIL:DK-BIB1&offset=0", nil)
reqOffset10 := httptest.NewRequest("GET", "http://localhost/ill_transactions?symbol=ISIL:DK-BIB1&offset=10", nil)
Expand Down
Loading
Loading