release-23.2: jobs: fix bug in WriteChunkedFileToJobInfo during overwriting #113290

Merged 1 commit into release-23.2 on Oct 30, 2023
3 changes: 3 additions & 0 deletions pkg/jobs/execution_detail_utils.go
@@ -60,6 +60,9 @@ func WriteProtobinExecutionDetailFile(
// `~profiler/` prefix. Files written using this method can be read using
// `ReadExecutionDetailFile` and will show up in the list of files displayed on
// the job's Advanced Debugging DBConsole page.
//
// This method clears any existing file with the same filename before writing a
// new one.
func WriteExecutionDetailFile(
ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID,
) error {
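Because WriteExecutionDetailFile now clears any same-named file, a caller that wants to keep every capture needs a distinct filename per write. A minimal caller-side sketch (not part of this PR; the helper name and timestamp format are hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

// uniqueDetailFilename suffixes a UTC timestamp so that repeated captures
// never collide and therefore never overwrite one another.
func uniqueDetailFilename(base string, ts time.Time) string {
	return fmt.Sprintf("%s.%s", base, ts.UTC().Format("20060102T150405.000"))
}

func main() {
	fmt.Println(uniqueDetailFilename("goroutines.txt", time.Now()))
	// e.g. goroutines.txt.20231030T120000.000
}
```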
14 changes: 12 additions & 2 deletions pkg/jobs/job_info_utils.go
@@ -27,12 +27,22 @@ const bundleChunkSize = 1 << 20 // 1 MiB
const finalChunkSuffix = "#_final"

// WriteChunkedFileToJobInfo will break up data into chunks of a fixed size, and
// gzip compress them before writing them to the job_info table
// gzip compress them before writing them to the job_info table. This method
// clears any existing chunks with the same filename before writing the new
// chunks, so a caller that wishes to preserve history must use a unique
// filename.
func WriteChunkedFileToJobInfo(
ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID,
) error {
finalChunkName := filename + finalChunkSuffix
jobInfo := InfoStorageForJob(txn, jobID)

// Clear any existing chunks with the same filename before writing new chunks.
// We clear all rows with info keys in [filename, filename#_final~).
if err := jobInfo.DeleteRange(ctx, filename, finalChunkName+"~"); err != nil {
return err
}

var chunkCounter int
var chunkName string
for len(data) > 0 {
@@ -46,7 +56,7 @@ func WriteChunkedFileToJobInfo(
chunk = chunk[:chunkSize]
} else {
// This is the last chunk we will write, assign it a sentinel file name.
chunkName = filename + finalChunkSuffix
chunkName = finalChunkName
}
data = data[len(chunk):]
var err error
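The fix hinges on the lexicographic layout of the chunk keys: every chunk of a file sorts inside the half-open range [filename, finalChunkName+"~"). A standalone sketch of that ordering (the intermediate chunk-key format below is an assumption; only the #_final sentinel appears in this diff):

```go
package main

import "fmt"

func main() {
	const filename = "bundle.zip"
	finalChunkName := filename + "#_final"

	// Hypothetical chunk keys. Any "filename#<digits>" key sorts before
	// "filename#_final" because '_' (0x5f) compares greater than every digit
	// byte, and finalChunkName itself sorts before finalChunkName+"~".
	keys := []string{
		filename + "#0001",
		filename + "#0002",
		finalChunkName,
	}

	lo, hi := filename, finalChunkName+"~" // the DeleteRange bounds in the fix
	for _, k := range keys {
		fmt.Printf("%q in [%q, %q): %t\n", k, lo, hi, k >= lo && k < hi)
	}
	// All three print true, so the DeleteRange removes every chunk of the old
	// file regardless of how many chunks it had.
}
```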
96 changes: 96 additions & 0 deletions pkg/jobs/job_info_utils_test.go
@@ -55,6 +55,10 @@ func TestReadWriteChunkedFileToJobInfo(t *testing.T) {
name: "file greater than 1MiB",
data: make([]byte, 1<<20+1), // 1 MiB + 1 byte
},
{
name: "file much greater than 1MiB",
data: make([]byte, 10<<20), // 10 MiB
},
}

db := s.InternalDB().(isql.DB)
@@ -78,3 +82,95 @@ func TestReadWriteChunkedFileToJobInfo(t *testing.T) {
})
}
}

func TestOverwriteChunkingWithVariableLengths(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

rng, _ := randutil.NewTestRand()
ctx := context.Background()
params := base.TestServerArgs{}
params.Knobs.JobsTestingKnobs = NewTestingKnobsWithShortIntervals()
s := serverutils.StartServerOnly(t, params)
defer s.Stopper().Stop(ctx)

tests := []struct {
name string
numChunks int
data []byte
}{
{
name: "zero chunks",
data: []byte{},
numChunks: 0,
},
{
name: "one chunk",
numChunks: 1,
},
{
name: "two chunks",
numChunks: 2,
},
{
name: "five chunks",
numChunks: 5,
},
}

db := s.InternalDB().(isql.DB)
generateData := func(numChunks int) []byte {
data := make([]byte, (1<<20)*numChunks)
if len(data) > 0 {
randutil.ReadTestdataBytes(rng, data)
}
return data
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.data = generateData(tt.numChunks)
// Write the first file; this will generate a certain number of chunks.
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
}))

// Overwrite the file with fewer chunks; this should delete the extra
// chunks before writing the new ones.
t.Run("overwrite with fewer chunks", func(t *testing.T) {
lessChunks := tt.numChunks - 1
if lessChunks >= 0 {
tt.data = generateData(lessChunks)
var got []byte
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
if err != nil {
return err
}
got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123))
return err
}))
require.Equal(t, tt.data, got)
}
})

// Overwrite the file with more chunks; this should delete the existing
// chunks before writing the new ones.
t.Run("overwrite with more chunks", func(t *testing.T) {
moreChunks := tt.numChunks + 1
tt.data = generateData(moreChunks)
var got []byte
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
if err != nil {
return err
}
got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123))
return err
}))
require.Equal(t, tt.data, got)
})
})
}
}
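To see why the DeleteRange matters, here is a toy simulation (an ordinary map standing in for the job_info table; not CockroachDB code, and the intermediate key format is assumed) of the bug the test above guards against: shrinking a file without first deleting the old keys leaves a stale middle chunk that a key-ordered read would splice into the new contents.

```go
package main

import (
	"fmt"
	"strings"
)

// write stores chunks under "name#NNNN" keys with a "#_final" sentinel for
// the last chunk, in the spirit of WriteChunkedFileToJobInfo (the exact
// intermediate format is not shown in this diff). deleteFirst toggles the
// fix: clearing old keys before writing new ones.
func write(store map[string]string, name string, chunks []string, deleteFirst bool) {
	if deleteFirst {
		for k := range store {
			if strings.HasPrefix(k, name) {
				delete(store, k)
			}
		}
	}
	for i, c := range chunks {
		key := fmt.Sprintf("%s#%04d", name, i)
		if i == len(chunks)-1 {
			key = name + "#_final"
		}
		store[key] = c
	}
}

func main() {
	// Buggy overwrite: 3 chunks, then 2, with no delete. The old middle
	// chunk "f#0001" survives and would corrupt a key-ordered read.
	store := map[string]string{}
	write(store, "f", []string{"a", "b", "c"}, false)
	write(store, "f", []string{"x", "y"}, false)
	fmt.Println(len(store)) // 3: f#0000="x", stale f#0001="b", f#_final="y"

	// Fixed overwrite: delete-then-write leaves only the new chunks.
	store = map[string]string{}
	write(store, "f", []string{"a", "b", "c"}, false)
	write(store, "f", []string{"x", "y"}, true)
	fmt.Println(len(store)) // 2: f#0000="x", f#_final="y"
}
```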