This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Preprocess logs #4

Merged 7 commits on Mar 5, 2019
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -4,7 +4,7 @@ version: 2
references:
docker_golang: &docker_golang
docker:
- image: golang:1.11
- image: golang:1.12
environment:
PGHOST: "127.0.0.1"
PGUSER: "postgres"
193 changes: 131 additions & 62 deletions cmd/pgreplay/main.go
@@ -1,6 +1,7 @@
package main

import (
"bufio"
"fmt"
stdlog "log"
"net/http"
@@ -21,25 +22,32 @@ var logger kitlog.Logger
var Version string // assigned during build

var (
app = kingpin.New("pgreplay", "Replay Postgres logs against database").Version(Version)
host = app.Flag("host", "PostgreSQL database host").Required().String()
port = app.Flag("port", "PostgreSQL database port").Default("5432").Uint16()
datname = app.Flag("database", "PostgreSQL root database").Default("postgres").String()
user = app.Flag("user", "PostgreSQL root user").Default("postgres").String()
errlogFile = app.Flag("errlog-file", "Path to PostgreSQL errlog").Required().ExistingFile()
replayRate = app.Flag("replay-rate", "Rate of playback, will execute queries at Nx speed").Default("1").Float()
app = kingpin.New("pgreplay", "Replay Postgres logs against database").Version(Version)

// Global flags applying to every command
debug = app.Flag("debug", "Enable debug logging").Default("false").Bool()
startFlag = app.Flag("start", "Play logs from this time onward ("+pgreplay.PostgresTimestampFormat+")").String()
finishFlag = app.Flag("finish", "Stop playing logs at this time ("+pgreplay.PostgresTimestampFormat+")").String()
debug = app.Flag("debug", "Enable debug logging").Default("false").Bool()
pollInterval = app.Flag("poll-interval", "Interval between polling for finish").Default("5s").Duration()
metricsAddress = app.Flag("metrics-address", "Address to bind HTTP metrics listener").Default("127.0.0.1").String()
metricsPort = app.Flag("metrics-port", "Port to bind HTTP metrics listener").Default("9445").Uint16()

preprocess = app.Command("preprocess", "Process an errlog file into a pgreplay preprocessed log")
preprocessJSONOutput = preprocess.Flag("json-output", "JSON output file").Required().String()
preprocessErrlogFile = preprocess.Flag("errlog-file", "Path to PostgreSQL errlog").Required().ExistingFile()

run = app.Command("run", "Replay from log files against a real database")
runHost = run.Flag("host", "PostgreSQL database host").Required().String()
runPort = run.Flag("port", "PostgreSQL database port").Default("5432").Uint16()
runDatname = run.Flag("database", "PostgreSQL root database").Default("postgres").String()
runUser = run.Flag("user", "PostgreSQL root user").Default("postgres").String()
runReplayRate = run.Flag("replay-rate", "Rate of playback, will execute queries at Nx speed").Default("1").Float()
runPollInterval = run.Flag("poll-interval", "Interval between polling for finish").Default("5s").Duration()
runErrlogFile = run.Flag("errlog-file", "Path to PostgreSQL errlog").ExistingFile()
runJSONFile = run.Flag("json-file", "Path to preprocessed pgreplay JSON log file").ExistingFile()
)

func main() {
if _, err := app.Parse(os.Args[1:]); err != nil {
kingpin.Fatalf("%s, try --help", err)
}
command := kingpin.MustParse(app.Parse(os.Args[1:]))

logger = kitlog.NewLogfmtLogger(kitlog.NewSyncWriter(os.Stderr))
logger = level.NewFilter(logger, level.AllowInfo())
@@ -57,24 +65,115 @@ func main() {
http.ListenAndServe(fmt.Sprintf("%s:%v", *metricsAddress, *metricsPort), nil)
}()

errlog, err := os.Open(*errlogFile)
if err != nil {
logger.Log("event", "logfile.error", "error", err)
os.Exit(255)
var err error
var start, finish *time.Time

if start, err = parseTimestamp(*startFlag); err != nil {
kingpin.Fatalf("--start flag %s", err)
}

database, err := pgreplay.NewDatabase(pgx.ConnConfig{
Host: *host,
Port: *port,
Database: *datname,
User: *user,
})
if finish, err = parseTimestamp(*finishFlag); err != nil {
kingpin.Fatalf("--finish flag %s", err)
}

if err != nil {
logger.Log("event", "postgres.error", "error", err)
os.Exit(255)
switch command {
case preprocess.FullCommand():
output, err := os.Create(*preprocessJSONOutput)
if err != nil {
kingpin.Fatalf("failed to create output file: %v", err)
}

items := parseErrlog(openLogfile(*preprocessErrlogFile))
for item := range pgreplay.NewStreamer(start, finish).Filter(items) {
bytes, err := pgreplay.ItemMarshalJSON(item)
if err != nil {
kingpin.Fatalf("failed to serialize item: %v", err)
}

if _, err := output.Write(append(bytes, byte('\n'))); err != nil {
kingpin.Fatalf("failed to write to output file: %v", err)
}
}

case run.FullCommand():
var items chan pgreplay.Item

if *runJSONFile != "" {
items = parseJSONlog(openLogfile(*runJSONFile))
} else if *runErrlogFile != "" {
items = parseErrlog(openLogfile(*runErrlogFile))
} else {
kingpin.Fatalf("must provide either an errlog or jsonlog")
}

database, err := pgreplay.NewDatabase(
pgx.ConnConfig{
Host: *runHost,
Port: *runPort,
Database: *runDatname,
User: *runUser,
},
)

if err != nil {
logger.Log("event", "postgres.error", "error", err)
os.Exit(255)
}

stream, err := pgreplay.NewStreamer(start, finish).Stream(items, *runReplayRate)
if err != nil {
kingpin.Fatalf("failed to start streamer: %s", err)
}

errs, consumeDone := database.Consume(stream)
poller := time.NewTicker(*runPollInterval)

var status int

for {
select {
case err := <-errs:
if err != nil {
logger.Log("event", "consume.error", "error", err)
}
case err := <-consumeDone:
if err != nil {
status = 255
}

logger.Log("event", "consume.finished", "error", err, "status", status)
os.Exit(status)

// Poll our consumer to determine how much work remains
case <-poller.C:
if connections, items := database.Pending(); connections > 0 {
logger.Log("event", "consume.pending", "connections", connections, "items", items)
}
}
}
}
}

func parseJSONlog(file *os.File) chan pgreplay.Item {
items := make(chan pgreplay.Item)

go func() {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
item, err := pgreplay.ItemUnmarshalJSON([]byte(line))
if err == nil {
items <- item
}
}

close(items)
}()

return items
}

func parseErrlog(errlog *os.File) chan pgreplay.Item {
items, logerrs, done := pgreplay.Parse(errlog)

go func() {
@@ -87,47 +186,17 @@ func main() {
}
}()

var start, finish *time.Time

if start, err = parseTimestamp(*startFlag); err != nil {
kingpin.Fatalf("--start flag %s", err)
}

if finish, err = parseTimestamp(*finishFlag); err != nil {
kingpin.Fatalf("--finish flag %s", err)
}
return items
}

stream, err := pgreplay.NewStreamer(start, finish).Stream(items, *replayRate)
func openLogfile(path string) *os.File {
file, err := os.Open(path)
if err != nil {
kingpin.Fatalf("failed to start streamer: %s", err)
logger.Log("event", "logfile.error", "path", path, "error", err)
os.Exit(255)
}

errs, consumeDone := database.Consume(stream)
poller := time.NewTicker(*pollInterval)

var status int

for {
select {
case err := <-errs:
if err != nil {
logger.Log("event", "consume.error", "error", err)
}
case err := <-consumeDone:
if err != nil {
status = 255
}

logger.Log("event", "consume.finished", "error", err, "status", status)
os.Exit(status)

// Poll our consumer to determine how much work remains
case <-poller.C:
if connections, items := database.Pending(); connections > 0 {
logger.Log("event", "consume.pending", "connections", connections, "items", items)
}
}
}
return file
}

// parseTimestamp parses a Postgres-friendly timestamp
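
Taken together, main.go now exposes two subcommands: preprocess converts a raw errlog into a pgreplay JSON log, and run replays either format against a target database. A plausible pair of invocations, assuming the flags defined above (file paths and rate are illustrative):

    pgreplay preprocess --errlog-file errlog.log --json-output preprocessed.json
    pgreplay run --host 127.0.0.1 --json-file preprocessed.json --replay-rate 2

The preprocessed file is newline-delimited JSON: the preprocess command writes one marshaled item per line, and parseJSONlog reads them back with a bufio.Scanner, skipping lines that fail to unmarshal. A minimal sketch of that round-trip, using a stand-in struct since Item's concrete types live in the pgreplay package:

    package main

    import (
        "bufio"
        "bytes"
        "encoding/json"
        "fmt"
    )

    // entry stands in for a pgreplay Item; the real wire format is whatever
    // ItemMarshalJSON emits.
    type entry struct {
        Session string `json:"session"`
        Query   string `json:"query"`
    }

    func main() {
        var buf bytes.Buffer

        // Write side: one JSON document per line, as the preprocess command does.
        for _, e := range []entry{{"1", "select 1"}, {"2", "select now()"}} {
            b, err := json.Marshal(e)
            if err != nil {
                panic(err)
            }
            buf.Write(append(b, '\n'))
        }

        // Read side: scan line by line and unmarshal, as parseJSONlog does,
        // dropping any line that does not parse.
        scanner := bufio.NewScanner(&buf)
        for scanner.Scan() {
            var e entry
            if err := json.Unmarshal(scanner.Bytes(), &e); err == nil {
                fmt.Println(e.Session, e.Query)
            }
        }
    }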
28 changes: 16 additions & 12 deletions pkg/pgreplay/database.go
@@ -100,17 +100,8 @@ func (d *Database) Consume(items chan Item) (chan error, chan error) {
conn.In() <- item
}

// Flush disconnects down each of our connection sessions, ensuring even connections
// that we don't have disconnects for in our logs get closed.
for _, conn := range d.conns {
if !conn.IsAlive() {
// Non-blocking channel op to avoid read-write-race between checking whether the
// connection is alive and the channel having been closed
select {
case conn.In() <- &Disconnect{}:
default:
}
}
conn.Close()
}

// Wait for every connection to terminate
@@ -138,7 +129,7 @@ func (d *Database) Connect(item Item) (*Conn, error) {
return nil, err
}

return &Conn{conn, channels.NewInfiniteChannel()}, nil
return &Conn{conn, channels.NewInfiniteChannel(), sync.Once{}}, nil
}

// Pending returns a count of the connections that are still active, and how many items
@@ -162,14 +153,19 @@ func (d *Database) Pending() (connections, items int) {
type Conn struct {
*pgx.Conn
channels.Channel
sync.Once
}

func (c *Conn) Close() {
c.Once.Do(c.Channel.Close)
}

// Start begins to process the items that are placed into the Conn's channel. We'll finish
// once the connection has died or we run out of items to process.
func (c *Conn) Start() error {
items := make(chan Item)
channels.Unwrap(c.Channel, items)
defer c.Channel.Close()
defer c.Close()

for item := range items {
if item == nil {
@@ -187,5 +183,13 @@
}
}

// If we're still alive after consuming all our items, assume that we finished
// processing our logs before we saw this connection be disconnected. We should
// terminate ourselves by handling our own disconnect, so we can know when all our
// connections are done.
if c.IsAlive() {
Disconnect{}.Handle(c.Conn)
}

return nil
}
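
The sync.Once now embedded in Conn makes Close idempotent: the flush loop in Consume and the deferred cleanup in Start can both call Close without racing to close the same channel twice, which replaces the earlier non-blocking-send workaround. Here is the pattern in isolation, a sketch rather than pgreplay's actual API:

    package main

    import "sync"

    // onceChan is a hypothetical wrapper showing the close-once pattern Conn
    // uses: the Once ensures the channel is closed at most one time.
    type onceChan struct {
        ch   chan struct{}
        once sync.Once
    }

    // Close may be called any number of times from any goroutine; only the
    // first call closes the channel, so repeated calls cannot panic.
    func (c *onceChan) Close() {
        c.once.Do(func() { close(c.ch) })
    }

    func main() {
        c := &onceChan{ch: make(chan struct{})}
        c.Close()
        c.Close() // safe: the Once swallows the second close
    }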
14 changes: 7 additions & 7 deletions pkg/pgreplay/integration/testdata/structure.sql
@@ -34,7 +34,7 @@ SET default_tablespace = '';
SET default_with_oids = false;

--
-- Name: logs; Type: TABLE; Schema: public; Owner: lawrence
-- Name: logs; Type: TABLE; Schema: public; Owner: postgres
--

CREATE TABLE public.logs (
@@ -44,10 +44,10 @@ CREATE TABLE public.logs (
);


ALTER TABLE public.logs OWNER TO lawrence;
ALTER TABLE public.logs OWNER TO postgres;

--
-- Name: logs_id_seq; Type: SEQUENCE; Schema: public; Owner: lawrence
-- Name: logs_id_seq; Type: SEQUENCE; Schema: public; Owner: postgres
--

CREATE SEQUENCE public.logs_id_seq
@@ -59,24 +59,24 @@ CREATE SEQUENCE public.logs_id_seq
CACHE 1;


ALTER TABLE public.logs_id_seq OWNER TO lawrence;
ALTER TABLE public.logs_id_seq OWNER TO postgres;

--
-- Name: logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: lawrence
-- Name: logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: postgres
--

ALTER SEQUENCE public.logs_id_seq OWNED BY public.logs.id;


--
-- Name: logs id; Type: DEFAULT; Schema: public; Owner: lawrence
-- Name: logs id; Type: DEFAULT; Schema: public; Owner: postgres
--

ALTER TABLE ONLY public.logs ALTER COLUMN id SET DEFAULT nextval('public.logs_id_seq'::regclass);


--
-- Name: logs logs_pkey; Type: CONSTRAINT; Schema: public; Owner: lawrence
-- Name: logs logs_pkey; Type: CONSTRAINT; Schema: public; Owner: postgres
--

ALTER TABLE ONLY public.logs
7 changes: 4 additions & 3 deletions pkg/pgreplay/parse.go
@@ -131,6 +131,7 @@ func ParseItem(logline string, unbounds map[SessionID]*Execute, buffer []byte) (
// for this session, as this log line will confirm the unbound query has no parameters.
if strings.HasPrefix(msg, LogDuration) {
if unbound, ok := unbounds[details.SessionID]; ok {
delete(unbounds, details.SessionID)
return unbound.Bind(nil), nil
}

@@ -139,7 +140,7 @@

// LOG: statement: select pg_reload_conf();
if strings.HasPrefix(msg, LogStatement) {
return &Statement{details, strings.TrimPrefix(msg, LogStatement)}, nil
return Statement{details, strings.TrimPrefix(msg, LogStatement)}, nil
}

// LOG: execute <unnamed>: select pg_sleep($1)
@@ -200,12 +201,12 @@

// LOG: connection authorized: user=postgres database=postgres
if strings.HasPrefix(msg, LogConnectionAuthorized) {
return &Connect{details}, nil
return Connect{details}, nil
}

// LOG: disconnection: session time: 0:00:03.861 user=postgres database=postgres host=192.168.99.1 port=51529
if strings.HasPrefix(msg, LogConnectionDisconnect) {
return &Disconnect{details}, nil
return Disconnect{details}, nil
}

// LOG: connection received: host=192.168.99.1 port=52188
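
A note on the first parse.go hunk: deleting the session's entry when the duration line binds it means each buffered unbound execute is claimed exactly once, where previously a later duration line for the same session could have re-bound the stale query. The claim-and-remove shape, sketched with simplified types (the real map holds *Execute values keyed by SessionID):

    package main

    import "fmt"

    // claim removes and returns the pending query for a session, if any.
    // Deleting inside the lookup guarantees an entry is bound at most once.
    func claim(unbounds map[string]string, session string) (string, bool) {
        query, ok := unbounds[session]
        if ok {
            delete(unbounds, session)
        }
        return query, ok
    }

    func main() {
        unbounds := map[string]string{"5c9f": "select pg_sleep($1)"}
        if q, ok := claim(unbounds, "5c9f"); ok {
            fmt.Println("first duration line binds:", q)
        }
        if _, ok := claim(unbounds, "5c9f"); !ok {
            fmt.Println("second duration line finds nothing to bind")
        }
    }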