Skip to content

Commit

Permalink
feat(ledger): Add groupBy (lvl segment) to /volumes endpoint (#1417)
Browse files Browse the repository at this point in the history
Co-authored-by: agourdel <gourdel.alexandre@gmail.com>
Co-authored-by: David Ragot <david@formance.com>
  • Loading branch information
3 people committed Apr 12, 2024
1 parent 76c4cc3 commit 187af68
Show file tree
Hide file tree
Showing 13 changed files with 302 additions and 39 deletions.
5 changes: 5 additions & 0 deletions components/fctl/cmd/ledger/volumes/list.go
Expand Up @@ -28,6 +28,7 @@ type ListController struct {
pitFlag string
ootFlag string
useInsertionDateFlag string
groupByFlag string
}

var _ fctl.Controller[*ListStore] = (*ListController)(nil)
Expand Down Expand Up @@ -68,6 +69,7 @@ func (c *ListController) Run(cmd *cobra.Command, args []string) (fctl.Renderable
cursor := fctl.GetString(cmd, c.cursorFlag)
pageSize := int64(fctl.GetInt(cmd, c.pageSizeFlag))
useInsertionDate := fctl.GetBool(cmd, c.useInsertionDateFlag)
groupBy := int64(fctl.GetInt(cmd, c.groupByFlag))

pit, err := fctl.GetDateTime(cmd, c.pitFlag)
if err != nil {
Expand Down Expand Up @@ -96,6 +98,7 @@ func (c *ListController) Run(cmd *cobra.Command, args []string) (fctl.Renderable
Cursor: &cursor,
PageSize: &pageSize,
InsertionDate: &useInsertionDate,
GroupBy: &groupBy,
}

response, err := store.Client().Ledger.V2GetVolumesWithBalances(cmd.Context(), request)
Expand Down Expand Up @@ -151,6 +154,7 @@ func NewListController() *ListController {
pitFlag: "end-time",
ootFlag: "start-time",
useInsertionDateFlag: "insertion-date",
groupByFlag: "group-by",
}
}

Expand All @@ -165,6 +169,7 @@ func NewListCommand() *cobra.Command {
fctl.WithStringFlag(c.pitFlag, "", "PIT (Point in Time)"),
fctl.WithStringFlag(c.ootFlag, "", "OOT (Origin of Time)"),
fctl.WithBoolFlag(c.useInsertionDateFlag, false, "Use insertion date"),
fctl.WithIntFlag(c.groupByFlag, 0, "Group by level of segment of the address"),
fctl.WithStringFlag(c.addressFlag, "", "Filter accounts with address"),
fctl.WithStringSliceFlag(c.metadataFlag, []string{}, "Filter accounts with metadata"),
fctl.WithStringFlag(c.cursorFlag, "", "Cursor pagination"),
Expand Down
2 changes: 1 addition & 1 deletion components/ledger/internal/api/v2/controllers_volumes.go
Expand Up @@ -17,7 +17,7 @@ func getVolumesWithBalances(w http.ResponseWriter, r *http.Request) {
l := backend.LedgerFromContext(r.Context())

query, err := bunpaginate.Extract[ledgerstore.GetVolumesWithBalancesQuery](r, func() (*ledgerstore.GetVolumesWithBalancesQuery, error) {
options, err := getPaginatedQueryOptionsOfPITOOTFilterForVolumes(r)
options, err := getPaginatedQueryOptionsOfFiltersForVolumes(r)
if err != nil {
return nil, err
}
Expand Down
22 changes: 18 additions & 4 deletions components/ledger/internal/api/v2/controllers_volumes_test.go
Expand Up @@ -30,7 +30,7 @@ func TestGetVolumes(t *testing.T) {
name string
queryParams url.Values
body string
expectQuery ledgerstore.PaginatedQueryOptions[ledgerstore.PITFilterForVolumes]
expectQuery ledgerstore.PaginatedQueryOptions[ledgerstore.FiltersForVolumes]
expectStatusCode int
expectedErrorCode string
}
Expand All @@ -40,7 +40,7 @@ func TestGetVolumes(t *testing.T) {
testCases := []testCase{
{
name: "basic",
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterForVolumes{
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.FiltersForVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &before,
OOT: &zero,
Expand All @@ -53,7 +53,7 @@ func TestGetVolumes(t *testing.T) {
{
name: "using metadata",
body: `{"$match": { "metadata[roles]": "admin" }}`,
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterForVolumes{
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.FiltersForVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &before,
OOT: &zero,
Expand All @@ -65,7 +65,7 @@ func TestGetVolumes(t *testing.T) {
{
name: "using account",
body: `{"$match": { "account": "foo" }}`,
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.PITFilterForVolumes{
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.FiltersForVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &before,
OOT: &zero,
Expand All @@ -80,6 +80,20 @@ func TestGetVolumes(t *testing.T) {
expectStatusCode: http.StatusBadRequest,
expectedErrorCode: v2.ErrValidation,
},
{
name: "using pit",
queryParams: url.Values{
"pit": []string{before.Format(time.RFC3339Nano)},
"groupBy": []string{"3"},
},
expectQuery: ledgerstore.NewPaginatedQueryOptions(ledgerstore.FiltersForVolumes{
PITFilter: ledgerstore.PITFilter{
PIT: &before,
OOT: &zero,
},
GroupLvl: 3,
}).WithPageSize(v2.DefaultPageSize),
},
}

for _, testCase := range testCases {
Expand Down
26 changes: 19 additions & 7 deletions components/ledger/internal/api/v2/utils.go
Expand Up @@ -3,10 +3,10 @@ package v2
import (
"io"
"net/http"

"github.com/formancehq/stack/libs/go-libs/time"
"strconv"

"github.com/formancehq/stack/libs/go-libs/bun/bunpaginate"
"github.com/formancehq/stack/libs/go-libs/time"

"github.com/formancehq/ledger/internal/storage/ledgerstore"
sharedapi "github.com/formancehq/stack/libs/go-libs/api"
Expand Down Expand Up @@ -74,17 +74,29 @@ func getPITFilterWithVolumes(r *http.Request) (*ledgerstore.PITFilterWithVolumes
}, nil
}

func getPITOOTFilterForVolumes(r *http.Request) (*ledgerstore.PITFilterForVolumes, error) {
func getFiltersForVolumes(r *http.Request) (*ledgerstore.FiltersForVolumes, error) {
pit, err := getPITOOTFilter(r)
if err != nil {
return nil, err
}

useInsertionDate := sharedapi.QueryParamBool(r, "insertionDate")
groupLvl := 0

return &ledgerstore.PITFilterForVolumes{
groupLvlStr := r.URL.Query().Get("groupBy")
if groupLvlStr != "" {
groupLvlInt, err := strconv.Atoi(groupLvlStr)
if err != nil {
return nil, err
}
if groupLvlInt > 0 {
groupLvl = groupLvlInt
}
}
return &ledgerstore.FiltersForVolumes{
PITFilter: *pit,
UseInsertionDate: useInsertionDate,
GroupLvl: uint(groupLvl),
}, nil
}

Expand Down Expand Up @@ -121,13 +133,13 @@ func getPaginatedQueryOptionsOfPITFilterWithVolumes(r *http.Request) (*ledgersto
WithPageSize(pageSize)), nil
}

func getPaginatedQueryOptionsOfPITOOTFilterForVolumes(r *http.Request) (*ledgerstore.PaginatedQueryOptions[ledgerstore.PITFilterForVolumes], error) {
func getPaginatedQueryOptionsOfFiltersForVolumes(r *http.Request) (*ledgerstore.PaginatedQueryOptions[ledgerstore.FiltersForVolumes], error) {
qb, err := getQueryBuilder(r)
if err != nil {
return nil, err
}

pitFilter, err := getPITOOTFilterForVolumes(r)
filtersForVolumes, err := getFiltersForVolumes(r)
if err != nil {
return nil, err
}
Expand All @@ -137,7 +149,7 @@ func getPaginatedQueryOptionsOfPITOOTFilterForVolumes(r *http.Request) (*ledgers
return nil, err
}

return pointer.For(ledgerstore.NewPaginatedQueryOptions(*pitFilter).
return pointer.For(ledgerstore.NewPaginatedQueryOptions(*filtersForVolumes).
WithPageSize(pageSize).
WithQueryBuilder(qb)), nil
}
3 changes: 2 additions & 1 deletion components/ledger/internal/storage/ledgerstore/utils.go
Expand Up @@ -249,7 +249,8 @@ type PITFilterWithVolumes struct {
ExpandEffectiveVolumes bool `json:"effectiveVolumes"`
}

type PITFilterForVolumes struct {
type FiltersForVolumes struct {
PITFilter
UseInsertionDate bool
GroupLvl uint
}
26 changes: 16 additions & 10 deletions components/ledger/internal/storage/ledgerstore/volumes.go
Expand Up @@ -2,6 +2,7 @@ package ledgerstore

import (
"context"
"fmt"

ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/stack/libs/go-libs/bun/bunpaginate"
Expand Down Expand Up @@ -57,21 +58,26 @@ func (store *Store) volumesQueryContext(qb lquery.Builder, q GetVolumesWithBalan

func (store *Store) buildVolumesWithBalancesQuery(query *bun.SelectQuery, q GetVolumesWithBalancesQuery, where string, args []any) *bun.SelectQuery {

pitFilter := q.Options.Options
filtersForVolumes := q.Options.Options
dateFilterColumn := "effective_date"

if pitFilter.UseInsertionDate {
if filtersForVolumes.UseInsertionDate {
dateFilterColumn = "insertion_date"
}

query = query.
ColumnExpr("account_address as account").
Column("asset").
ColumnExpr("sum(case when not is_source then amount else 0 end) as input").
ColumnExpr("sum(case when is_source then amount else 0 end) as output").
ColumnExpr("sum(case when not is_source then amount else -amount end) as balance").
Table("moves")

if filtersForVolumes.GroupLvl > 0 {
query = query.ColumnExpr(fmt.Sprintf(`(array_to_string((string_to_array(account_address, ':'))[1:LEAST(array_length(string_to_array(account_address, ':'),1),%d)],':')) as account`, filtersForVolumes.GroupLvl))
} else {
query = query.ColumnExpr("account_address as account")
}

if where != "" {
query = query.
Join(`join lateral (
Expand All @@ -84,9 +90,9 @@ func (store *Store) buildVolumesWithBalancesQuery(query *bun.SelectQuery, q GetV

query = query.
Where("ledger = ?", store.name).
Apply(filterPIT(pitFilter.PIT, dateFilterColumn)).
Apply(filterOOT(pitFilter.OOT, dateFilterColumn)).
GroupExpr("account_address, asset")
Apply(filterPIT(filtersForVolumes.PIT, dateFilterColumn)).
Apply(filterOOT(filtersForVolumes.OOT, dateFilterColumn)).
GroupExpr("account, asset")

return query
}
Expand All @@ -104,17 +110,17 @@ func (store *Store) GetVolumesWithBalances(ctx context.Context, q GetVolumesWith
}
}

return paginateWithOffsetWithoutModel[PaginatedQueryOptions[PITFilterForVolumes], ledger.VolumesWithBalanceByAssetByAccount](
store, ctx, (*bunpaginate.OffsetPaginatedQuery[PaginatedQueryOptions[PITFilterForVolumes]])(&q),
return paginateWithOffsetWithoutModel[PaginatedQueryOptions[FiltersForVolumes], ledger.VolumesWithBalanceByAssetByAccount](
store, ctx, (*bunpaginate.OffsetPaginatedQuery[PaginatedQueryOptions[FiltersForVolumes]])(&q),
func(query *bun.SelectQuery) *bun.SelectQuery {
return store.buildVolumesWithBalancesQuery(query, q, where, args)
},
)
}

type GetVolumesWithBalancesQuery bunpaginate.OffsetPaginatedQuery[PaginatedQueryOptions[PITFilterForVolumes]]
type GetVolumesWithBalancesQuery bunpaginate.OffsetPaginatedQuery[PaginatedQueryOptions[FiltersForVolumes]]

func NewGetVolumesWithBalancesQuery(opts PaginatedQueryOptions[PITFilterForVolumes]) GetVolumesWithBalancesQuery {
func NewGetVolumesWithBalancesQuery(opts PaginatedQueryOptions[FiltersForVolumes]) GetVolumesWithBalancesQuery {
return GetVolumesWithBalancesQuery{
PageSize: opts.PageSize,
Order: bunpaginate.OrderAsc,
Expand Down

0 comments on commit 187af68

Please sign in to comment.