Skip to content

Commit a6bb126

Browse files
committed
db: add BlobFileCreated and BlobFileDeleted events
Add two new events to the EventListener: one invoked when a new blob file is created and when a blob file is deleted. For now these are largely untested while blob files are still being integrated. Informs #112.
1 parent da6410b commit a6bb126

File tree

3 files changed

+112
-16
lines changed

3 files changed

+112
-16
lines changed

compaction.go

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,15 @@ func (k compactionKind) String() string {
174174
return "?"
175175
}
176176

177+
// compactingOrFlushing returns "flushing" if the compaction kind is a flush,
178+
// otherwise it returns "compacting".
179+
func (k compactionKind) compactingOrFlushing() string {
180+
if k == compactionKindFlush {
181+
return "flushing"
182+
}
183+
return "compacting"
184+
}
185+
177186
// compaction is a table compaction from one level to the next, starting from a
178187
// given version.
179188
type compaction struct {
@@ -3143,7 +3152,7 @@ func (d *DB) compactAndWrite(
31433152
}
31443153
// Create a new table.
31453154
writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
3146-
objMeta, tw, err := d.newCompactionOutput(jobID, c, writerOpts)
3155+
objMeta, tw, err := d.newCompactionOutputTable(jobID, c, writerOpts)
31473156
if err != nil {
31483157
return runner.Finish().WithError(err)
31493158
}
@@ -3271,43 +3280,56 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
32713280
return ve, nil
32723281
}
32733282

3274-
// newCompactionOutput creates an object for a new table produced by a
3283+
// newCompactionOutputTable creates an object for a new table produced by a
32753284
// compaction or flush.
3276-
func (d *DB) newCompactionOutput(
3285+
func (d *DB) newCompactionOutputTable(
32773286
jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
32783287
) (objstorage.ObjectMetadata, sstable.RawWriter, error) {
3279-
writable, objMeta, err := d.newCompactionOutputObj(jobID, c, base.FileTypeTable)
3288+
writable, objMeta, err := d.newCompactionOutputObj(c, base.FileTypeTable)
32803289
if err != nil {
32813290
return objstorage.ObjectMetadata{}, nil, err
32823291
}
3283-
3284-
var reason string
3285-
if c.kind == compactionKindFlush {
3286-
reason = "flushing"
3287-
} else {
3288-
reason = "compacting"
3289-
}
32903292
d.opts.EventListener.TableCreated(TableCreateInfo{
32913293
JobID: int(jobID),
3292-
Reason: reason,
3294+
Reason: c.kind.compactingOrFlushing(),
32933295
Path: d.objProvider.Path(objMeta),
32943296
FileNum: objMeta.DiskFileNum,
32953297
})
3296-
32973298
writerOpts.SetInternal(sstableinternal.WriterOptions{
32983299
CacheOpts: sstableinternal.CacheOptions{
32993300
CacheHandle: d.cacheHandle,
33003301
FileNum: objMeta.DiskFileNum,
33013302
},
33023303
})
3303-
33043304
tw := sstable.NewRawWriterWithCPUMeasurer(writable, writerOpts, c.grantHandle)
33053305
return objMeta, tw, nil
33063306
}
33073307

3308+
// Allow the newCompactionOutputBlob method to be unused for now.
3309+
// TODO(jackson): Hook this up.
3310+
var _ = (*DB).newCompactionOutputBlob
3311+
3312+
// newCompactionOutputBlob creates an object for a new blob produced by a
3313+
// compaction or flush.
3314+
func (d *DB) newCompactionOutputBlob(
3315+
jobID JobID, c *compaction,
3316+
) (objstorage.Writable, objstorage.ObjectMetadata, error) {
3317+
writable, objMeta, err := d.newCompactionOutputObj(c, base.FileTypeBlob)
3318+
if err != nil {
3319+
return nil, objstorage.ObjectMetadata{}, err
3320+
}
3321+
d.opts.EventListener.BlobFileCreated(BlobFileCreateInfo{
3322+
JobID: int(jobID),
3323+
Reason: c.kind.compactingOrFlushing(),
3324+
Path: d.objProvider.Path(objMeta),
3325+
FileNum: objMeta.DiskFileNum,
3326+
})
3327+
return writable, objMeta, nil
3328+
}
3329+
33083330
// newCompactionOutputObj creates an object produced by a compaction or flush.
33093331
func (d *DB) newCompactionOutputObj(
3310-
jobID JobID, c *compaction, typ base.FileType,
3332+
c *compaction, typ base.FileType,
33113333
) (objstorage.Writable, objstorage.ObjectMetadata, error) {
33123334
diskFileNum := d.mu.versions.getNextDiskFileNum()
33133335

event.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,48 @@ func (i LevelInfo) SafeFormat(w redact.SafePrinter, _ rune) {
9797
redact.Safe(i.Score))
9898
}
9999

100+
// BlobFileCreateInfo contains the info for a blob file creation event.
101+
type BlobFileCreateInfo struct {
102+
JobID int
103+
// Reason is the reason for the table creation: "compacting", "flushing", or
104+
// "ingesting".
105+
Reason string
106+
Path string
107+
FileNum base.DiskFileNum
108+
}
109+
110+
func (i BlobFileCreateInfo) String() string {
111+
return redact.StringWithoutMarkers(i)
112+
}
113+
114+
// SafeFormat implements redact.SafeFormatter.
115+
func (i BlobFileCreateInfo) SafeFormat(w redact.SafePrinter, _ rune) {
116+
w.Printf("[JOB %d] %s: blob file created %s",
117+
redact.Safe(i.JobID), redact.Safe(i.Reason), i.FileNum)
118+
}
119+
120+
// BlobFileDeleteInfo contains the info for a blob file deletion event.
121+
type BlobFileDeleteInfo struct {
122+
JobID int
123+
Path string
124+
FileNum base.DiskFileNum
125+
Err error
126+
}
127+
128+
func (i BlobFileDeleteInfo) String() string {
129+
return redact.StringWithoutMarkers(i)
130+
}
131+
132+
// SafeFormat implements redact.SafeFormatter.
133+
func (i BlobFileDeleteInfo) SafeFormat(w redact.SafePrinter, _ rune) {
134+
if i.Err != nil {
135+
w.Printf("[JOB %d] blob file delete error %s: %s",
136+
redact.Safe(i.JobID), i.FileNum, i.Err)
137+
return
138+
}
139+
w.Printf("[JOB %d] blob file deleted %s", redact.Safe(i.JobID), i.FileNum)
140+
}
141+
100142
// CompactionInfo contains the info for a compaction event.
101143
type CompactionInfo struct {
102144
// JobID is the ID of the compaction job.
@@ -728,6 +770,12 @@ type EventListener struct {
728770
// operation such as flush or compaction.
729771
BackgroundError func(error)
730772

773+
// BlobFileCreated is invoked after a blob file has been created.
774+
BlobFileCreated func(BlobFileCreateInfo)
775+
776+
// BlobFileDeleted is invoked after a blob file has been deleted.
777+
BlobFileDeleted func(BlobFileDeleteInfo)
778+
731779
// DataCorruption is invoked when an on-disk corruption is detected. It should
732780
// not block, as it is called synchronously in read paths.
733781
DataCorruption func(DataCorruptionInfo)
@@ -827,6 +875,12 @@ func (l *EventListener) EnsureDefaults(logger Logger) {
827875
l.BackgroundError = func(error) {}
828876
}
829877
}
878+
if l.BlobFileCreated == nil {
879+
l.BlobFileCreated = func(info BlobFileCreateInfo) {}
880+
}
881+
if l.BlobFileDeleted == nil {
882+
l.BlobFileDeleted = func(info BlobFileDeleteInfo) {}
883+
}
830884
if l.DataCorruption == nil {
831885
if logger != nil {
832886
l.DataCorruption = func(info DataCorruptionInfo) {
@@ -912,6 +966,12 @@ func MakeLoggingEventListener(logger Logger) EventListener {
912966
BackgroundError: func(err error) {
913967
logger.Errorf("background error: %s", err)
914968
},
969+
BlobFileCreated: func(info BlobFileCreateInfo) {
970+
logger.Infof("%s", info)
971+
},
972+
BlobFileDeleted: func(info BlobFileDeleteInfo) {
973+
logger.Infof("%s", info)
974+
},
915975
DataCorruption: func(info DataCorruptionInfo) {
916976
logger.Errorf("%s", info)
917977
},
@@ -990,6 +1050,14 @@ func TeeEventListener(a, b EventListener) EventListener {
9901050
a.BackgroundError(err)
9911051
b.BackgroundError(err)
9921052
},
1053+
BlobFileCreated: func(info BlobFileCreateInfo) {
1054+
a.BlobFileCreated(info)
1055+
b.BlobFileCreated(info)
1056+
},
1057+
BlobFileDeleted: func(info BlobFileDeleteInfo) {
1058+
a.BlobFileDeleted(info)
1059+
b.BlobFileDeleted(info)
1060+
},
9931061
DataCorruption: func(info DataCorruptionInfo) {
9941062
a.DataCorruption(info)
9951063
b.DataCorruption(info)

obsolete_files.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,13 @@ func (cm *cleanupManager) deleteObsoleteObject(
286286
FileNum: fileNum,
287287
Err: err,
288288
})
289-
// TODO(jackson): Add BlobFileDeleted event.
289+
case base.FileTypeBlob:
290+
cm.opts.EventListener.BlobFileDeleted(BlobFileDeleteInfo{
291+
JobID: int(jobID),
292+
Path: path,
293+
FileNum: fileNum,
294+
Err: err,
295+
})
290296
}
291297
}
292298

0 commit comments

Comments
 (0)