@@ -68,15 +68,6 @@ func sameProfileStream(i int) *testProfile {
 	return tp
 }
 
-func threeProfileStreams(i int) *testProfile {
-	tp := sameProfileStream(i)
-	streams := []string{"stream-a", "stream-b", "stream-c"}
-
-	tp.lbls = phlaremodel.LabelsFromStrings("job", "test", "stream", streams[i%3])
-	tp.populateFingerprint()
-	return tp
-}
-
 func readFullParquetFile[M any](t *testing.T, path string) ([]M, uint64) {
 	f, err := os.Open(path)
 	require.NoError(t, err)
@@ -99,7 +90,10 @@ func readFullParquetFile[M any](t *testing.T, path string) ([]M, uint64) {
 	return slice, numRGs
 }
 
-func TestProfileStore_Ingestion(t *testing.T) {
+// TestProfileStore_RowGroupSplitting tests that the profile store splits row
+// groups when certain limits are reached. It also checks that on flushing, the
+// block is aggregated correctly. All ingestion uses the same profile series.
+func TestProfileStore_RowGroupSplitting(t *testing.T) {
 	var (
 		ctx   = testContext(t)
 		store = newProfileStore(ctx)
@@ -169,3 +163,51 @@ func TestProfileStore_Ingestion(t *testing.T) {
 		})
 	}
 }
+
+func threeProfileStreams(i int) *testProfile {
+	tp := sameProfileStream(i)
+	streams := []string{"stream-a", "stream-b", "stream-c"}
+
+	tp.lbls = phlaremodel.LabelsFromStrings("job", "test", "stream", streams[i%3])
+	tp.populateFingerprint()
+	return tp
+}
+
+// TestProfileStore_Ingestion_SeriesIndexes checks that during ingestion the
+// profile store temporarily writes row groups out to disk, and that when
+// finishing up the block it combines those files and updates the seriesIndex,
+// which is only known once the TSDB index has been written to disk.
+func TestProfileStore_Ingestion_SeriesIndexes(t *testing.T) {
+	var (
+		ctx   = testContext(t)
+		store = newProfileStore(ctx)
+	)
+	path := t.TempDir()
+	require.NoError(t, store.Init(path, defaultParquetConfig))
+
+	for i := 0; i < 9; i++ {
+		p := threeProfileStreams(i)
+		require.NoError(t, store.ingest(ctx, []*schemav1.Profile{&p.p}, p.lbls, p.profileName, emptyRewriter()))
+	}
+
+	// flush index
+	indexPath := path + "/" + block.IndexFilename
+	require.NoError(t, store.index.WriteTo(ctx, indexPath))
+
+	// flush profiles and ensure the correct number of rows and row groups is created
+	numRows, numRGs, err := store.Flush()
+	require.NoError(t, err)
+	assert.Equal(t, uint64(9), numRows)
+	assert.Equal(t, uint64(1), numRGs)
+
+	// now compare the written parquet file
+	rows, numRGs := readFullParquetFile[*schemav1.Profile](t, path+"/profiles.parquet")
+	require.Equal(t, 9, len(rows))
+	assert.Equal(t, uint64(1), numRGs)
+	// rows are expected in seriesIndex order and then by timeNanos
+	for i := 0; i < 9; i++ {
+		id := i%3*3 + i/3 // maps row order back to ingest order: 0,3,6,1,4,7,2,5,8
+		assert.Equal(t, fmt.Sprintf("00000000-0000-0000-0000-%012d", id), rows[i].ID.String())
+		assert.Equal(t, uint32(i/3), rows[i].SeriesIndex)
+	}
+}
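For reference, a minimal standalone sketch (not part of this diff) of the permutation the final assertion loop relies on: nine profiles are ingested round-robin across three streams (ingest index i goes to stream i%3), and after the flush the rows are sorted by seriesIndex and then by timeNanos, so row i must hold the profile that was ingested at index i%3*3 + i/3.

```go
package main

import "fmt"

func main() {
	// Row i of the flushed parquet file (sorted by seriesIndex, then
	// timeNanos) maps back to ingest index i%3*3 + i/3.
	for i := 0; i < 9; i++ {
		fmt.Println(i%3*3 + i/3) // prints 0, 3, 6, 1, 4, 7, 2, 5, 8
	}
}
```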