Skip to content

Commit bd1fa1b

Browse files
committed
Ledger-lscc: Introduce queriabilty on to-be state
This CR introduces a query executor that operates on statedb and one or more state-updates on top. This enables a queriability on to-be committed state before committing the state. The combiner expects the input - a slice of queriable kv-data (either statedb or state-update batches). The slice is assumed to contain the kv-data in the decending order of updates - i.e, a kv-data present at an index 'i' is assumed to overwrite the kv-data present at the index 'i+1'. For instance, if a key is deleted from the kv-data present in the lowest index, the combiner will treated the key as deleted. The combiner offers a simple query interface that provides 'Get' and 'Range' query options. This is expected to be used in the state listener so that a state listener can query the to-be committed state. FAB-11560 #done Change-Id: Ie24e1e4e859b0e3393bc0854700a8a6579159167 Signed-off-by: manish <manish.sethi@gmail.com>
1 parent bbbd098 commit bd1fa1b

File tree

6 files changed

+750
-0
lines changed

6 files changed

+750
-0
lines changed
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package queryutil_test
8+
9+
import (
10+
"errors"
11+
"os"
12+
"testing"
13+
14+
"github.com/hyperledger/fabric/common/flogging"
15+
commonledger "github.com/hyperledger/fabric/common/ledger"
16+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/queryutil"
17+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/queryutil/mock"
18+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
19+
statedbmock "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/mock"
20+
"github.com/hyperledger/fabric/protos/ledger/queryresult"
21+
"github.com/stretchr/testify/assert"
22+
)
23+
24+
func TestMain(m *testing.M) {
25+
flogging.SetModuleLevel("util", "debug")
26+
flogging.SetModuleLevel("statedb", "debug")
27+
os.Exit(m.Run())
28+
}
29+
30+
func TestCombinerGetState(t *testing.T) {
31+
batch1 := statedb.NewUpdateBatch()
32+
batch1.Put("ns1", "key1", []byte("b1_value1"), nil)
33+
batch1.Delete("ns1", "key2", nil)
34+
batch1.Put("ns1", "key3", []byte("b1_value3"), nil)
35+
36+
batch2 := statedb.NewUpdateBatch()
37+
batch2.Put("ns1", "key1", []byte("b2_value1"), nil)
38+
batch2.Put("ns1", "key2", []byte("b2_value2"), nil)
39+
batch2.Put("ns1", "key3", []byte("b2_value3"), nil)
40+
41+
batch3 := statedb.NewUpdateBatch()
42+
batch3.Put("ns1", "key1", []byte("b3_value1"), nil)
43+
batch3.Put("ns1", "key2", []byte("b3_value2"), nil)
44+
batch3.Delete("ns1", "key3", nil)
45+
46+
combiner := &queryutil.QECombiner{
47+
QueryExecuters: []queryutil.QueryExecuter{
48+
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch1},
49+
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch2},
50+
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch3},
51+
}}
52+
53+
val, err := combiner.GetState("ns1", "key1")
54+
assert.NoError(t, err)
55+
assert.Equal(t, []byte("b1_value1"), val)
56+
57+
val, err = combiner.GetState("ns1", "key2")
58+
assert.NoError(t, err)
59+
assert.Nil(t, val)
60+
61+
val, err = combiner.GetState("ns1", "key3")
62+
assert.NoError(t, err)
63+
assert.Equal(t, []byte("b1_value3"), val)
64+
65+
combiner = &queryutil.QECombiner{
66+
QueryExecuters: []queryutil.QueryExecuter{
67+
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch3},
68+
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch2},
69+
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch1},
70+
}}
71+
val, err = combiner.GetState("ns1", "key1")
72+
assert.NoError(t, err)
73+
assert.Equal(t, []byte("b3_value1"), val)
74+
75+
val, err = combiner.GetState("ns1", "key2")
76+
assert.NoError(t, err)
77+
assert.Equal(t, []byte("b3_value2"), val)
78+
79+
val, err = combiner.GetState("ns1", "key3")
80+
assert.NoError(t, err)
81+
assert.Nil(t, val)
82+
}
83+
84+
func TestCombinerRangeScan(t *testing.T) {
85+
batch1 := statedb.NewUpdateBatch()
86+
batch1.Put("ns1", "key1", []byte("batch1_value1"), nil)
87+
batch1.Delete("ns1", "key2", nil)
88+
batch1.Put("ns1", "key3", []byte("batch1_value3"), nil)
89+
90+
batch2 := statedb.NewUpdateBatch()
91+
batch2.Put("ns1", "key1", []byte("batch2_value1"), nil)
92+
batch2.Put("ns1", "key2", []byte("batch2_value2"), nil)
93+
batch2.Delete("ns1", "key3", nil)
94+
batch2.Put("ns1", "key4", []byte("batch2_value4"), nil)
95+
96+
batch3 := statedb.NewUpdateBatch()
97+
batch3.Put("ns1", "key0", []byte("batch3_value0"), nil)
98+
batch3.Put("ns1", "key1", []byte("batch3_value1"), nil)
99+
batch3.Put("ns1", "key2", []byte("batch3_value2"), nil)
100+
batch3.Put("ns1", "key3", []byte("batch3_value3"), nil)
101+
batch3.Put("ns1", "key4", []byte("batch3_value4"), nil)
102+
batch3.Put("ns1", "key5", []byte("batch3_value5"), nil)
103+
104+
combiner := &queryutil.QECombiner{
105+
QueryExecuters: []queryutil.QueryExecuter{
106+
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch1},
107+
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch2},
108+
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch3},
109+
},
110+
}
111+
112+
itr, err := combiner.GetStateRangeScanIterator("ns1", "key1", "key4")
113+
assert.NoError(t, err)
114+
expectedResults := []*queryresult.KV{
115+
{Namespace: "ns1", Key: "key1", Value: []byte("batch1_value1")},
116+
{Namespace: "ns1", Key: "key3", Value: []byte("batch1_value3")},
117+
}
118+
testutilCheckIteratorResults(t, itr, expectedResults)
119+
120+
itr, err = combiner.GetStateRangeScanIterator("ns1", "key0", "key6")
121+
assert.NoError(t, err)
122+
expectedResults = []*queryresult.KV{
123+
{Namespace: "ns1", Key: "key0", Value: []byte("batch3_value0")},
124+
{Namespace: "ns1", Key: "key1", Value: []byte("batch1_value1")},
125+
{Namespace: "ns1", Key: "key3", Value: []byte("batch1_value3")},
126+
{Namespace: "ns1", Key: "key4", Value: []byte("batch2_value4")},
127+
{Namespace: "ns1", Key: "key5", Value: []byte("batch3_value5")},
128+
}
129+
testutilCheckIteratorResults(t, itr, expectedResults)
130+
131+
combiner = &queryutil.QECombiner{
132+
QueryExecuters: []queryutil.QueryExecuter{
133+
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch3},
134+
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch2},
135+
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch1},
136+
},
137+
}
138+
itr, err = combiner.GetStateRangeScanIterator("ns1", "key0", "key6")
139+
assert.NoError(t, err)
140+
expectedResults = []*queryresult.KV{
141+
{Namespace: "ns1", Key: "key0", Value: []byte("batch3_value0")},
142+
{Namespace: "ns1", Key: "key1", Value: []byte("batch3_value1")},
143+
{Namespace: "ns1", Key: "key2", Value: []byte("batch3_value2")},
144+
{Namespace: "ns1", Key: "key3", Value: []byte("batch3_value3")},
145+
{Namespace: "ns1", Key: "key4", Value: []byte("batch3_value4")},
146+
{Namespace: "ns1", Key: "key5", Value: []byte("batch3_value5")},
147+
}
148+
testutilCheckIteratorResults(t, itr, expectedResults)
149+
}
150+
151+
func TestGetStateError(t *testing.T) {
152+
qe1 := &mock.QueryExecuter{}
153+
qe1.GetStateReturns(&statedb.VersionedValue{Value: []byte("testValue")}, nil)
154+
qe2 := &mock.QueryExecuter{}
155+
qe2.GetStateReturns(nil, errors.New("Error for testing"))
156+
combiner1 := &queryutil.QECombiner{
157+
QueryExecuters: []queryutil.QueryExecuter{
158+
qe1, qe2,
159+
},
160+
}
161+
_, err := combiner1.GetState("ns", "key1")
162+
assert.NoError(t, err)
163+
164+
combiner2 := &queryutil.QECombiner{
165+
QueryExecuters: []queryutil.QueryExecuter{
166+
qe2, qe1,
167+
},
168+
}
169+
_, err = combiner2.GetState("ns", "key1")
170+
assert.Error(t, err)
171+
}
172+
173+
func TestGetRangeScanError(t *testing.T) {
174+
itr1 := &statedbmock.ResultsIterator{}
175+
itr1.NextReturns(
176+
&statedb.VersionedKV{
177+
CompositeKey: statedb.CompositeKey{Namespace: "ns", Key: "dummyKey"},
178+
VersionedValue: statedb.VersionedValue{Value: []byte("dummyVal")},
179+
},
180+
nil,
181+
)
182+
183+
qe1 := &mock.QueryExecuter{}
184+
qe1.GetStateRangeScanIteratorReturns(itr1, nil)
185+
qe2 := &mock.QueryExecuter{}
186+
qe2.GetStateRangeScanIteratorReturns(nil, errors.New("dummy error on getting the iterator"))
187+
combiner := &queryutil.QECombiner{
188+
QueryExecuters: []queryutil.QueryExecuter{
189+
qe1, qe2,
190+
},
191+
}
192+
_, err := combiner.GetStateRangeScanIterator("ns", "startKey", "endKey")
193+
assert.Error(t, err)
194+
}
195+
196+
func TestGetRangeScanUnderlyingIteratorReturnsError(t *testing.T) {
197+
itr1 := &statedbmock.ResultsIterator{}
198+
itr1.NextReturns(
199+
&statedb.VersionedKV{
200+
CompositeKey: statedb.CompositeKey{Namespace: "ns", Key: "dummyKey"},
201+
VersionedValue: statedb.VersionedValue{Value: []byte("dummyVal")},
202+
},
203+
nil,
204+
)
205+
206+
itr2 := &statedbmock.ResultsIterator{}
207+
itr2.NextReturns(
208+
nil,
209+
errors.New("dummyErrorOnIteratorNext"),
210+
)
211+
212+
qe1 := &mock.QueryExecuter{}
213+
qe1.GetStateRangeScanIteratorReturns(itr1, nil)
214+
qe2 := &mock.QueryExecuter{}
215+
qe2.GetStateRangeScanIteratorReturns(itr2, nil)
216+
combiner := &queryutil.QECombiner{
217+
QueryExecuters: []queryutil.QueryExecuter{
218+
qe1, qe2,
219+
},
220+
}
221+
_, err := combiner.GetStateRangeScanIterator("ns", "startKey", "endKey")
222+
assert.Error(t, err)
223+
}
224+
225+
func testutilCheckIteratorResults(t *testing.T, itr commonledger.ResultsIterator, expectedResults []*queryresult.KV) {
226+
results := []*queryresult.KV{}
227+
for {
228+
result, err := itr.Next()
229+
assert.NoError(t, err)
230+
if result == nil {
231+
break
232+
}
233+
results = append(results, result.(*queryresult.KV))
234+
}
235+
assert.Equal(t, expectedResults, results)
236+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package queryutil
8+
9+
import (
10+
"fmt"
11+
12+
commonledger "github.com/hyperledger/fabric/common/ledger"
13+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
14+
"github.com/hyperledger/fabric/protos/ledger/queryresult"
15+
)
16+
17+
type itrCombiner struct {
18+
namespace string
19+
holders []*itrHolder
20+
}
21+
22+
func newItrCombiner(namespace string, baseIterators []statedb.ResultsIterator) (*itrCombiner, error) {
23+
var holders []*itrHolder
24+
for _, itr := range baseIterators {
25+
res, err := itr.Next()
26+
if err != nil {
27+
for _, holder := range holders {
28+
holder.itr.Close()
29+
}
30+
return nil, err
31+
}
32+
if res != nil {
33+
holders = append(holders, &itrHolder{itr, res.(*statedb.VersionedKV)})
34+
}
35+
}
36+
return &itrCombiner{namespace, holders}, nil
37+
}
38+
39+
// Next returns the next eligible item from the underlying iterators.
40+
// This function evaluates the underlying iterators, and picks the one which is
41+
// gives the lexicographically smallest key. Then, it saves that value, and advances the chosen iterator.
42+
// If the chosen iterator is out of elements, then that iterator is closed, and removed from the list of iterators.
43+
func (combiner *itrCombiner) Next() (commonledger.QueryResult, error) {
44+
logger.Debugf("Iterators position at beginning: %s", combiner.holders)
45+
if len(combiner.holders) == 0 {
46+
return nil, nil
47+
}
48+
smallestHolderIndex := 0
49+
for i := 1; i < len(combiner.holders); i++ {
50+
smallestKey, holderKey := combiner.keyAt(smallestHolderIndex), combiner.keyAt(i)
51+
switch {
52+
case holderKey == smallestKey: // we found the same key in the lower order iterator (stale value of the key);
53+
// we already have the latest value for this key (in smallestHolder). Ignore this value and move the iterator
54+
// to next item (to a greater key) so that for next round of key selection, we do not consider this key again
55+
removed, err := combiner.moveItrAndRemoveIfExhausted(i)
56+
if err != nil {
57+
return nil, err
58+
}
59+
if removed { // if the current iterator is exhaused and hence removed, decrement the index
60+
// because indexes of the remaining iterators are decremented by one
61+
i--
62+
}
63+
case holderKey < smallestKey:
64+
smallestHolderIndex = i
65+
default:
66+
// the current key under evaluation is greater than the smallestKey - do nothing
67+
}
68+
}
69+
kv := combiner.kvAt(smallestHolderIndex)
70+
combiner.moveItrAndRemoveIfExhausted(smallestHolderIndex)
71+
if kv.IsDelete() {
72+
return combiner.Next()
73+
}
74+
logger.Debugf("Key [%s] selected from iterator at index [%d]", kv.Key, smallestHolderIndex)
75+
logger.Debugf("Iterators position at end: %s", combiner.holders)
76+
return &queryresult.KV{Namespace: combiner.namespace, Key: kv.Key, Value: kv.Value}, nil
77+
}
78+
79+
// moveItrAndRemoveIfExhausted moves the iterator at index i to the next item. If the iterator gets exhausted
80+
// then the iterator is removed from the underlying slice
81+
func (combiner *itrCombiner) moveItrAndRemoveIfExhausted(i int) (removed bool, err error) {
82+
holder := combiner.holders[i]
83+
exhausted, err := holder.moveToNext()
84+
if err != nil {
85+
return false, err
86+
}
87+
if exhausted {
88+
combiner.holders[i].itr.Close()
89+
combiner.holders = append(combiner.holders[:i], combiner.holders[i+1:]...)
90+
91+
}
92+
return exhausted, nil
93+
}
94+
95+
// kvAt returns the kv available from iterator at index i
96+
func (combiner *itrCombiner) kvAt(i int) *statedb.VersionedKV {
97+
return combiner.holders[i].kv
98+
}
99+
100+
// keyAt returns the key available from iterator at index i
101+
func (combiner *itrCombiner) keyAt(i int) string {
102+
return combiner.kvAt(i).Key
103+
}
104+
105+
// Close closes all the underlying iterators
106+
func (combiner *itrCombiner) Close() {
107+
for _, holder := range combiner.holders {
108+
holder.itr.Close()
109+
}
110+
}
111+
112+
// itrHolder encloses an iterator and keeps the next item available from the iterator in the buffer
113+
type itrHolder struct {
114+
itr statedb.ResultsIterator
115+
kv *statedb.VersionedKV
116+
}
117+
118+
// moveToNext fetches the next item to keep in buffer and returns true if the iterator is exhausted
119+
func (holder *itrHolder) moveToNext() (exhausted bool, err error) {
120+
var res statedb.QueryResult
121+
if res, err = holder.itr.Next(); err != nil {
122+
return false, err
123+
}
124+
if res != nil {
125+
holder.kv = res.(*statedb.VersionedKV)
126+
}
127+
return res == nil, nil
128+
}
129+
130+
// String returns the key that the holder has in the buffer for serving as a next key
131+
func (holder *itrHolder) String() string {
132+
return fmt.Sprintf("{%s}", holder.kv.Key)
133+
}

0 commit comments

Comments
 (0)