Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tempo-cli command: convert parquet-2-to-3 #2828

Merged
merged 7 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [ENHANCEMENT] Assert ingestion rate limits as early as possible [#2640](https://github.com/grafana/tempo/pull/2703) (@mghildiy)
* [ENHANCEMENT] Add several metrics-generator fields to user-configurable overrides [#2711](https://github.com/grafana/tempo/pull/2711) (@kvrhdn)
* [ENHANCEMENT] Update /api/metrics/summary to correctly handle missing attributes and improve performance of TraceQL `select()` queries. [#2765](https://github.com/grafana/tempo/pull/2765) (@mdisibio)
* [ENHANCEMENT] Tempo CLI command to convert from vParquet2 -> 3. [#2828](https://github.com/grafana/tempo/pull/2828) (@joe-elliott)
* [ENHANCEMENT] Add `TempoUserConfigurableOverridesReloadFailing` alert [#2784](https://github.com/grafana/tempo/pull/2784) (@kvrhdn)
* [BUGFIX] Fix panic in metrics summary api [#2738](https://github.com/grafana/tempo/pull/2738) (@mdisibio)
* [BUGFIX] Fix node role auth IDMSv1 [#2760](https://github.com/grafana/tempo/pull/2760) (@coufalja)
Expand Down
134 changes: 134 additions & 0 deletions cmd/tempo-cli/cmd-convert-parquet-2to3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package main

import (
"context"
"fmt"
"io"
"os"

"github.com/google/uuid"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/encoding/vparquet2"
"github.com/grafana/tempo/tempodb/encoding/vparquet3"
"github.com/parquet-go/parquet-go"
)

// convertParquet2to3 holds the positional arguments of the
// `parquet convert-2-to-3` command.
type convertParquet2to3 struct {
	// In is the path of the vParquet2 data file to read.
	In string `arg:"" help:"The input parquet file to read from."`
	// Out is the folder the local backend writes the new vParquet3 block to.
	Out string `arg:"" help:"The output folder to write to."`
	// DedicatedColumns lists attributes, in TraceQL identifier syntax, to
	// store as dedicated columns. It is tagged optional because converting
	// with no dedicated columns is valid: Run handles an empty list.
	DedicatedColumns []string `arg:"" optional:"" help:"List of dedicated columns to convert"`
}

// Run reads the vParquet2 file at cmd.In and writes its traces as a new
// vParquet3 block under cmd.Out using the local backend.
//
// Each entry of cmd.DedicatedColumns is parsed as a TraceQL identifier. An
// identifier with resource scope becomes a resource-level dedicated column;
// every other identifier is treated as span-level. All dedicated columns are
// created with the string type.
//
// The block is written for the fixed tenant "test" with a freshly generated
// block ID. Any failure is returned wrapped with the step that produced it.
func (cmd *convertParquet2to3) Run() error {
	ctx := context.Background()

	// open the in file
	in, err := os.Open(cmd.In)
	if err != nil {
		return fmt.Errorf("opening input file: %w", err)
	}
	defer in.Close()

	inStat, err := in.Stat()
	if err != nil {
		return fmt.Errorf("reading input file info: %w", err)
	}

	pf, err := parquet.OpenFile(in, inStat.Size())
	if err != nil {
		return fmt.Errorf("opening %q as parquet: %w", cmd.In, err)
	}

	// create out block
	if cmd.Out == "" {
		cmd.Out = "./out"
	}
	outR, outW, _, err := local.New(&local.Config{
		Path: cmd.Out,
	})
	if err != nil {
		return fmt.Errorf("creating local backend at %q: %w", cmd.Out, err)
	}

	dedicatedCols := make([]backend.DedicatedColumn, 0, len(cmd.DedicatedColumns))
	for _, col := range cmd.DedicatedColumns {
		att, err := traceql.ParseIdentifier(col)
		if err != nil {
			return fmt.Errorf("parsing dedicated column %q: %w", col, err)
		}

		// Anything that is not explicitly resource-scoped is stored at span scope.
		scope := backend.DedicatedColumnScopeSpan
		if att.Scope == traceql.AttributeScopeResource {
			scope = backend.DedicatedColumnScopeResource
		}

		fmt.Println("scope", scope, "name", att.Name)

		dedicatedCols = append(dedicatedCols, backend.DedicatedColumn{
			Scope: scope,
			Name:  att.Name,
			Type:  backend.DedicatedColumnTypeString,
		})
	}

	blockCfg := &common.BlockConfig{
		BloomFP:             0.99,
		BloomShardSizeBytes: 1024 * 1024,
		Version:             vparquet3.VersionString,
		RowGroupSizeBytes:   100 * 1024 * 1024,
		DedicatedColumns:    dedicatedCols,
	}
	meta := &backend.BlockMeta{
		Version:          vparquet3.VersionString,
		BlockID:          uuid.New(),
		TenantID:         "test",
		TotalObjects:     1000000, // required for bloom filter calculations
		DedicatedColumns: dedicatedCols,
	}

	// create iterator over in file
	iter := &parquetIterator{
		r: parquet.NewGenericReader[*vparquet2.Trace](pf),
	}
	// Registered after the file's deferred Close, so the reader is released
	// before the file it reads from.
	defer iter.Close()

	if _, err := vparquet3.CreateBlock(ctx, blockCfg, meta, iter, backend.NewReader(outR), backend.NewWriter(outW)); err != nil {
		return fmt.Errorf("creating vParquet3 block in %q: %w", cmd.Out, err)
	}

	return nil
}

// parquetIterator walks the rows of a vParquet2 file one trace at a time. Its
// Next method returns each trace in proto form together with its trace ID.
type parquetIterator struct {
	// r is the typed parquet reader the traces are read from.
	r *parquet.GenericReader[*vparquet2.Trace]
	// i is a running counter used by Next to print progress.
	i int
}

// Next reads one trace from the underlying parquet reader and returns its
// trace ID together with the trace converted to proto form.
//
// It returns io.EOF once the reader is exhausted. Any other read error is
// returned unchanged. The context argument is unused.
func (i *parquetIterator) Next(_ context.Context) (common.ID, *tempopb.Trace, error) {
	traces := make([]*vparquet2.Trace, 1)

	n, err := i.r.Read(traces)
	// A reader may hand back the final row together with io.EOF. Only treat
	// the error as terminal when no row was read or it is a real failure;
	// otherwise the last trace of the file would be dropped.
	if err != nil && (n == 0 || err != io.EOF) {
		return nil, nil, err
	}
	if n == 0 {
		// No row and no error: refuse to continue rather than dereference
		// the nil trace left in the buffer.
		return nil, nil, fmt.Errorf("parquet reader returned no rows and no error")
	}

	// Count only traces that were actually read, and report progress every
	// 1000 of them.
	i.i++
	if i.i%1000 == 0 {
		fmt.Println(i.i)
	}

	pqTrace := traces[0]
	pbTrace := vparquet2.ParquetTraceToTempopbTrace(pqTrace)
	return pqTrace.TraceID, pbTrace, nil
}

// Close shuts down the wrapped parquet reader.
func (i *parquetIterator) Close() {
	// Close has no return value, so the reader's close error is
	// intentionally dropped.
	reader := i.r
	_ = reader.Close()
}
3 changes: 2 additions & 1 deletion cmd/tempo-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ var cli struct {

Parquet struct {
Convert convertParquet `cmd:"" help:"convert from an existing file to tempodb parquet schema"`
Convert1to2 convertParquet1to2 `cmd:"" help:"convert an exiting vParquet file to vParquet2 schema"`
Convert1to2 convertParquet1to2 `cmd:"" help:"convert an existing vParquet file to vParquet2 schema"`
Convert2to3 convertParquet2to3 `cmd:"" help:"convert an existing vParquet2 file to vParquet3 block"`
} `cmd:""`

Migrate struct {
Expand Down
20 changes: 20 additions & 0 deletions docs/sources/tempo/operations/tempo_cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,26 @@ Arguments:
```bash
tempo-cli parquet convert data.parquet out.parquet
```
## Parquet convert 2 to 3 command
Converts a vParquet2 file (the actual `data.parquet` of a block) to a vParquet3 block with an optional list of dedicated columns.
This utility command is useful when testing the impact of different combinations of dedicated columns. Columns given
with the `resource.` scope become resource-level dedicated columns; all other columns are treated as span-level.

```bash
tempo-cli parquet convert-2-to-3 <in file> <out path> <list of dedicated columns>
```

Arguments:
- `in file` Filename of an existing vParquet2 file containing Tempo trace data
- `out path` Path to write the vParquet3 block to.
- `list of dedicated columns` Additional params indicating which columns to make dedicated. Max 10. Dedicated columns
should be named using TraceQL syntax with scope, e.g. `span.db.statement` or `resource.namespace`.

**Example:**
```bash
tempo-cli parquet convert-2-to-3 data.parquet ./out span.db.statement span.db.name
```


## Migrate tenant command
Copy blocks from one backend and tenant to another. Blocks can be copied within the same backend or between two
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/vparquet2/block_findtracebyid.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func findTraceByID(ctx context.Context, traceID common.ID, meta *backend.BlockMe
}

// convert to proto trace and return
return parquetTraceToTempopbTrace(tr), nil
return ParquetTraceToTempopbTrace(tr), nil
}

// binarySearch that finds exact matching entry. Returns non-zero index when found, or -1 when not found
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/vparquet2/block_findtracebyid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestBackendBlockFindTraceByID(t *testing.T) {

// Now find and verify all test traces
for _, tr := range traces {
wantProto := parquetTraceToTempopbTrace(tr)
wantProto := ParquetTraceToTempopbTrace(tr)

gotProto, err := b.FindTraceByID(ctx, tr.TraceID, common.DefaultSearchOptions())
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/vparquet2/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func parquetToProtoEvents(parquetEvents []Event) []*v1_trace.Span_Event {
return protoEvents
}

func parquetTraceToTempopbTrace(parquetTrace *Trace) *tempopb.Trace {
func ParquetTraceToTempopbTrace(parquetTrace *Trace) *tempopb.Trace {
protoTrace := &tempopb.Trace{}
protoTrace.Batches = make([]*v1_trace.ResourceSpans, 0, len(parquetTrace.ResourceSpans))

Expand Down
10 changes: 5 additions & 5 deletions tempodb/encoding/vparquet2/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ func TestProtoParquetRoundTrip(t *testing.T) {
// Proto -> Parquet -> Proto

traceIDA := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
expectedTrace := parquetTraceToTempopbTrace(fullyPopulatedTestTrace(traceIDA))
expectedTrace := ParquetTraceToTempopbTrace(fullyPopulatedTestTrace(traceIDA))

parquetTrace := traceToParquet(traceIDA, expectedTrace, nil)
actualTrace := parquetTraceToTempopbTrace(parquetTrace)
actualTrace := ParquetTraceToTempopbTrace(parquetTrace)
assert.Equal(t, expectedTrace, actualTrace)
}

Expand All @@ -50,14 +50,14 @@ func TestProtoParquetRando(t *testing.T) {
expectedTrace := test.MakeTrace(batches, id)

parqTr := traceToParquet(id, expectedTrace, trp)
actualTrace := parquetTraceToTempopbTrace(parqTr)
actualTrace := ParquetTraceToTempopbTrace(parqTr)
require.Equal(t, expectedTrace, actualTrace)
}
}

func TestFieldsAreCleared(t *testing.T) {
traceID := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
complexTrace := parquetTraceToTempopbTrace(fullyPopulatedTestTrace(traceID))
complexTrace := ParquetTraceToTempopbTrace(fullyPopulatedTestTrace(traceID))
simpleTrace := &tempopb.Trace{
Batches: []*v1_trace.ResourceSpans{
{
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestFieldsAreCleared(t *testing.T) {
tr := &Trace{}
_ = traceToParquet(traceID, complexTrace, tr)
parqTr := traceToParquet(traceID, simpleTrace, tr)
actualTrace := parquetTraceToTempopbTrace(parqTr)
actualTrace := ParquetTraceToTempopbTrace(parqTr)
require.Equal(t, simpleTrace, actualTrace)
}

Expand Down
4 changes: 2 additions & 2 deletions tempodb/encoding/vparquet2/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func (b *walBlock) FindTraceByID(_ context.Context, id common.ID, _ common.Searc
return nil, errors.Wrap(err, "error reading row from backend")
}

trp := parquetTraceToTempopbTrace(tr)
trp := ParquetTraceToTempopbTrace(tr)

trs = append(trs, trp)
}
Expand Down Expand Up @@ -791,7 +791,7 @@ func (i *commonIterator) Next(ctx context.Context) (common.ID, *tempopb.Trace, e
return nil, nil, err
}

tr := parquetTraceToTempopbTrace(t)
tr := ParquetTraceToTempopbTrace(t)
return id, tr, nil
}

Expand Down