Skip to content

Commit

Permalink
server,kvserver/rangelog: extract implementation of rangelog writer
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
ajwerner committed Oct 11, 2022
1 parent 953873d commit 0c9a5da
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 69 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/raftutil:raftutil_test",
"//pkg/kv/kvserver/rangefeed:rangefeed",
"//pkg/kv/kvserver/rangefeed:rangefeed_test",
"//pkg/kv/kvserver/rangelog:rangelog",
"//pkg/kv/kvserver/rditer:rditer",
"//pkg/kv/kvserver/rditer:rditer_test",
"//pkg/kv/kvserver/readsummary/rspb:rspb",
Expand Down Expand Up @@ -2470,6 +2471,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/raftentry:get_x_data",
"//pkg/kv/kvserver/raftutil:get_x_data",
"//pkg/kv/kvserver/rangefeed:get_x_data",
"//pkg/kv/kvserver/rangelog:get_x_data",
"//pkg/kv/kvserver/rditer:get_x_data",
"//pkg/kv/kvserver/readsummary:get_x_data",
"//pkg/kv/kvserver/readsummary/rspb:get_x_data",
Expand Down
3 changes: 0 additions & 3 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,11 @@ go_library(
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/rpc/nodedialer",
"//pkg/security/username",
"//pkg/server/telemetry",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigstore",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
Expand Down
56 changes: 1 addition & 55 deletions pkg/kv/kvserver/range_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@ package kvserver

import (
"context"
"encoding/json"
"time"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
Expand Down Expand Up @@ -65,7 +61,7 @@ func (w *wrappedRangeLogWriter) WriteRangeLogEvent(
if c := w.getCounter(event.EventType); c != nil {
c.Inc(1)
}
if w.shouldWrite() {
if w.shouldWrite() && w.underlying != nil {
return w.underlying.WriteRangeLogEvent(ctx, txn, event)
}
return nil
Expand All @@ -84,56 +80,6 @@ func maybeLogRangeLogEvent(ctx context.Context, event kvserverpb.RangeLogEvent)
event.EventType, event.RangeID, info)
}

// internalExecutorRangeLogWriter implements RangeLogWriter using the
// InternalExecutor.
type internalExecutorRangeLogWriter struct {
ie sqlutil.InternalExecutor
}

// WriteRangeLogEvent implements RangeLogWriter. It writes the event to the
// system.rangelog table in the provided transaction.
func (s *internalExecutorRangeLogWriter) WriteRangeLogEvent(
ctx context.Context, txn *kv.Txn, event kvserverpb.RangeLogEvent,
) error {
const insertEventTableStmt = `
INSERT INTO system.rangelog (
timestamp, "rangeID", "storeID", "eventType", "otherRangeID", info
)
VALUES(
$1, $2, $3, $4, $5, $6
)
`
args := []interface{}{
event.Timestamp,
event.RangeID,
event.StoreID,
event.EventType.String(),
nil, // otherRangeID
nil, // info
}
if event.OtherRangeID != 0 {
args[4] = event.OtherRangeID
}
if event.Info != nil {
infoBytes, err := json.Marshal(*event.Info)
if err != nil {
return err
}
args[5] = string(infoBytes)
}

rows, err := s.ie.ExecEx(ctx, "log-range-event", txn,
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
insertEventTableStmt, args...)
if err != nil {
return err
}
if rows != 1 {
return errors.Errorf("%d rows affected by log insertion; expected exactly one row affected.", rows)
}
return nil
}

// logSplit logs a range split event into the event table. The affected range is
// the range which previously existed and is being split in half; the "other"
// range is the new range which is being created.
Expand Down
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/rangelog/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "rangelog",
srcs = ["rangelog.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangelog",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/security/username",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"@com_github_cockroachdb_errors//:errors",
],
)

get_x_data(name = "get_x_data")
79 changes: 79 additions & 0 deletions pkg/kv/kvserver/rangelog/rangelog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// Package rangelog implements kvserver.RangeLogWriter
package rangelog

import (
"context"
"encoding/json"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/errors"
)

// Writer implements kvserver.RangeLogWriter using the InternalExecutor.
type Writer struct {
ie sqlutil.InternalExecutor
}

// NewWriter returns a new Writer which implements kvserver.RangeLogWriter
// using the InternalExecutor.
func NewWriter(ie sqlutil.InternalExecutor) *Writer {
return &Writer{ie: ie}
}

// WriteRangeLogEvent implements kvserver.RangeLogWriter. It writes the event
// to the system.rangelog table in the provided transaction.
func (s *Writer) WriteRangeLogEvent(
ctx context.Context, txn *kv.Txn, event kvserverpb.RangeLogEvent,
) error {
const insertEventTableStmt = `
INSERT INTO system.rangelog (
timestamp, "rangeID", "storeID", "eventType", "otherRangeID", info
)
VALUES(
$1, $2, $3, $4, $5, $6
)
`
args := []interface{}{
event.Timestamp,
event.RangeID,
event.StoreID,
event.EventType.String(),
nil, // otherRangeID
nil, // info
}
if event.OtherRangeID != 0 {
args[4] = event.OtherRangeID
}
if event.Info != nil {
infoBytes, err := json.Marshal(*event.Info)
if err != nil {
return err
}
args[5] = string(infoBytes)
}

rows, err := s.ie.ExecEx(ctx, "log-range-event", txn,
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
insertEventTableStmt, args...)
if err != nil {
return err
}
if rows != 1 {
return errors.Errorf("%d rows affected by log insertion; expected exactly one row affected.", rows)
}
return nil
}
8 changes: 1 addition & 7 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
Expand Down Expand Up @@ -995,9 +994,6 @@ type StoreConfig struct {
ClosedTimestampSender *sidetransport.Sender
ClosedTimestampReceiver sidetransportReceiver

// SQLExecutor is used by the store to execute SQL statements.
SQLExecutor sqlutil.InternalExecutor

// TimeSeriesDataStore is an interface used by the store's time series
// maintenance queue to dispatch individual maintenance tasks.
TimeSeriesDataStore TimeSeriesDataStore
Expand Down Expand Up @@ -1228,9 +1224,7 @@ func NewStore(
return cfg.LogRangeAndNodeEvents &&
logRangeAndNodeEventsEnabled.Get(&cfg.Settings.SV)
},
&internalExecutorRangeLogWriter{
ie: cfg.SQLExecutor,
},
cfg.RangeLogWriter,
)

s.draining.Store(false)
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ go_library(
"//pkg/kv/kvserver/protectedts/ptprovider",
"//pkg/kv/kvserver/protectedts/ptreconcile",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rangelog",
"//pkg/kv/kvserver/reports",
"//pkg/multitenant",
"//pkg/multitenant/multitenantio",
Expand Down
5 changes: 3 additions & 2 deletions pkg/server/loss_of_quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -50,7 +51,7 @@ func logPendingLossOfQuorumRecoveryEvents(ctx context.Context, stores *kvserver.
}

func publishPendingLossOfQuorumRecoveryEvents(
ctx context.Context, stores *kvserver.Stores, stopper *stop.Stopper,
ctx context.Context, ie sqlutil.InternalExecutor, stores *kvserver.Stores, stopper *stop.Stopper,
) {
_ = stopper.RunAsyncTask(ctx, "publish-loss-of-quorum-events", func(ctx context.Context) {
if err := stores.VisitStores(func(s *kvserver.Store) error {
Expand All @@ -59,7 +60,7 @@ func publishPendingLossOfQuorumRecoveryEvents(
s.Engine(),
func(ctx context.Context, record loqrecoverypb.ReplicaRecoveryRecord) (bool, error) {
sqlExec := func(ctx context.Context, stmt string, args ...interface{}) (int, error) {
return s.GetStoreConfig().SQLExecutor.ExecEx(ctx, "", nil,
return ie.ExecEx(ctx, "", nil,
sessiondata.InternalExecutorOverride{User: username.RootUserName()}, stmt, args...)
}
if err := loqrecovery.UpdateRangeLogWithRecovery(ctx, sqlExec, record); err != nil {
Expand Down
7 changes: 5 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptreconcile"
serverrangefeed "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangelog"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/reports"
"github.com/cockroachdb/cockroach/pkg/obs"
"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
Expand Down Expand Up @@ -653,7 +654,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
ScanMaxIdleTime: cfg.ScanMaxIdleTime,
HistogramWindowInterval: cfg.HistogramWindowInterval(),
StorePool: storePool,
SQLExecutor: internalExecutor,
LogRangeAndNodeEvents: cfg.EventLogEnabled,
RangeDescriptorCache: distSender.RangeDescriptorCache(),
TimeSeriesDataStore: tsDB,
Expand All @@ -667,6 +667,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
SpanConfigsDisabled: cfg.SpanConfigsDisabled,
SnapshotApplyLimit: cfg.SnapshotApplyLimit,
SnapshotSendLimit: cfg.SnapshotSendLimit,
RangeLogWriter: rangelog.NewWriter(internalExecutor),
}

if storeTestingKnobs := cfg.TestingKnobs.Store; storeTestingKnobs != nil {
Expand Down Expand Up @@ -1654,7 +1655,9 @@ func (s *Server) PreStart(ctx context.Context) error {
// range logs. We do it as a separate stage to log events early just in case
// startup fails, and write to range log once the server is running as we need
// to run sql statements to update rangelog.
publishPendingLossOfQuorumRecoveryEvents(ctx, s.node.stores, s.stopper)
publishPendingLossOfQuorumRecoveryEvents(
ctx, s.node.execCfg.InternalExecutor, s.node.stores, s.stopper,
)

log.Event(ctx, "server initialized")

Expand Down

0 comments on commit 0c9a5da

Please sign in to comment.