-
Notifications
You must be signed in to change notification settings - Fork 66
/
pgslice.rb
702 lines (582 loc) · 22 KB
/
pgslice.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
require "cgi"
require "date"
require "uri"
require "pg"
require "slop"
require "pgslice/version"
module PgSlice
# Raised by Client#abort for expected, user-facing failures such as bad usage,
# missing tables, or server errors surfaced by run_query.
class Error < StandardError
end
class Client
# Positional arguments (the command has already been shifted off by
# initialize) and the option hash produced by parse_args.
attr_reader :arguments, :options
# Supported partition periods, mapped to the to_char() format string that
# legacy trigger functions embedded. prep uses the keys to validate the period
# argument. settings_from_trigger searches a function definition for these
# strings to recover the period. Frozen so the shared table cannot be mutated
# at runtime.
SQL_FORMAT = {
  day: "YYYYMMDD",
  month: "YYYYMM"
}.freeze
# Parses the command line and remembers which sub-command was requested.
# Both standard streams are put in sync mode so writes are flushed immediately.
def initialize(args)
  [$stdout, $stderr].each { |stream| stream.sync = true }
  parse_args(args)
  # the first positional argument names the command; the rest are its operands
  @command = arguments.shift
end
# Runs the sub-command chosen in initialize. With no command it prints the
# list of commands; an unknown command raises PgSlice::Error via abort.
# Does nothing when --version was given. Any open connection is always closed.
def perform
  return if @exit

  handlers = {
    "prep" => :prep,
    "add_partitions" => :add_partitions,
    "fill" => :fill,
    "swap" => :swap,
    "unswap" => :unswap,
    "unprep" => :unprep,
    "analyze" => :analyze
  }

  if handlers.key?(@command)
    send(handlers[@command])
  elsif @command.nil?
    log "Commands: add_partitions, analyze, fill, prep, swap, unprep, unswap"
  else
    abort "Unknown command: #{@command}"
  end
ensure
  @connection.close if @connection
end
protected
# commands
# Creates <table>_intermediate, the table that will later replace <table>.
#
# Usage: pgslice prep <table> <column> <period>
#        pgslice prep <table> --no-partition
#
# On servers >= 10 (unless --trigger-based), the intermediate table is
# declaratively partitioned by RANGE on <column>, and the settings are stored
# in a table comment.
#
# Otherwise it is a copy of <table> (LIKE ... INCLUDING ALL plus foreign keys).
# For trigger-based partitioning, an insert trigger is also added whose
# function rejects every row until add_partitions replaces it, and the
# settings are stored in the trigger's comment.
#
# Raises PgSlice::Error (via abort) on bad usage, a missing table or column,
# an existing intermediate table, or an unknown period.
def prep
  table, column, period = arguments
  intermediate_table = "#{table}_intermediate"
  trigger_name = self.trigger_name(table)

  if options[:no_partition]
    abort "Usage: pgslice prep <table> --no-partition" if arguments.length != 1
    abort "Can't use --trigger-based and --no-partition" if options[:trigger_based]
  else
    abort "Usage: pgslice prep <table> <column> <period>" if arguments.length != 3
  end
  abort "Table not found: #{table}" unless table_exists?(table)
  abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table)

  partitioned = !options[:no_partition]
  if partitioned
    abort "Column not found: #{column}" unless columns(table).include?(column)
    abort "Invalid period: #{period}" unless SQL_FORMAT[period.to_sym]
  end

  queries = []
  declarative = server_version_num >= 100000 && !options[:trigger_based]

  # settings string read back later by settings_from_trigger
  settings = "column:#{column},period:#{period},cast:#{column_cast(table, column)}" if partitioned

  if declarative && partitioned
    queries << <<-SQL
CREATE TABLE #{quote_ident(intermediate_table)} (LIKE #{quote_ident(table)} INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING STORAGE INCLUDING COMMENTS) PARTITION BY RANGE (#{quote_ident(column)});
    SQL
    queries << <<-SQL
COMMENT ON TABLE #{quote_ident(intermediate_table)} is '#{settings}';
    SQL
  else
    queries << <<-SQL
CREATE TABLE #{quote_ident(intermediate_table)} (LIKE #{quote_ident(table)} INCLUDING ALL);
    SQL
    # foreign keys are added explicitly; LIKE does not copy them
    foreign_keys(table).each do |fk_def|
      queries << "ALTER TABLE #{quote_ident(intermediate_table)} ADD #{fk_def};"
    end
  end

  if partitioned && !declarative
    queries << <<-SQL
CREATE FUNCTION #{quote_ident(trigger_name)}()
RETURNS trigger AS $$
BEGIN
RAISE EXCEPTION 'Create partitions first.';
END;
$$ LANGUAGE plpgsql;
    SQL
    queries << <<-SQL
CREATE TRIGGER #{quote_ident(trigger_name)}
BEFORE INSERT ON #{quote_ident(intermediate_table)}
FOR EACH ROW EXECUTE PROCEDURE #{quote_ident(trigger_name)}();
    SQL
    queries << <<-SQL
COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(intermediate_table)} is '#{settings}';
    SQL
  end

  run_queries(queries)
end
# Undoes prep. It drops <table>_intermediate with CASCADE, which also removes
# dependent objects such as child partitions. It also drops the trigger
# function that prep may have created, if present.
# Usage: pgslice unprep <table>
def unprep
  table = arguments.first
  abort "Usage: pgslice unprep <table>" unless arguments.length == 1

  intermediate_table = "#{table}_intermediate"
  abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table)

  run_queries([
    "DROP TABLE #{quote_ident(intermediate_table)} CASCADE;",
    "DROP FUNCTION IF EXISTS #{quote_ident(trigger_name(table))}();"
  ])
end
# Creates partitions for <table>, or for <table>_intermediate with
# --intermediate. There is one partition per period, from --past periods before
# the current UTC period to --future periods after it. Partitions that already
# exist are skipped.
#
# Each new partition receives the primary key, non-primary indexes and foreign
# keys of a reference table (schema_table below). For trigger-based
# partitioning, the insert trigger function is regenerated so it routes rows
# to every existing partition.
#
# Raises PgSlice::Error (via abort) on bad usage, a missing table, or when no
# partitioning settings are found.
def add_partitions
  original_table = arguments.first
  table = options[:intermediate] ? "#{original_table}_intermediate" : original_table
  trigger_name = self.trigger_name(original_table)

  abort "Usage: pgslice add_partitions <table>" if arguments.length != 1
  abort "Table not found: #{table}" unless table_exists?(table)

  future = options[:future]
  past = options[:past]
  range = (-1 * past)..future

  period, field, cast, needs_comment, declarative = settings_from_trigger(original_table, table)
  unless period
    message = "No settings found: #{table}"
    message = "#{message}\nDid you mean to use --intermediate?" unless options[:intermediate]
    abort message
  end

  queries = []

  if needs_comment
    # persist recovered settings so later runs can read them from the comment
    queries << "COMMENT ON TRIGGER #{quote_ident(trigger_name)} ON #{quote_ident(table)} is 'column:#{field},period:#{period},cast:#{cast}';"
  end

  # first date of the current period, based on the UTC date
  today = round_date(DateTime.now.new_offset(0).to_date, period)
  partition_format = name_format(period)

  # table whose indexes, foreign keys and primary key are copied to new partitions
  schema_table =
    if !declarative
      table
    elsif options[:intermediate]
      original_table
    else
      "#{original_table}_#{today.strftime(partition_format)}"
    end

  index_defs = execute("SELECT pg_get_indexdef(indexrelid) FROM pg_index WHERE indrelid = #{regclass(schema, schema_table)} AND indisprimary = 'f'").map { |r| r["pg_get_indexdef"] }
  fk_defs = foreign_keys(schema_table)
  primary_key = self.primary_key(schema_table)

  added_partitions = []
  range.each do |n|
    day = advance_date(today, period, n)
    partition_name = "#{original_table}_#{day.strftime(partition_format)}"
    next if table_exists?(partition_name)
    added_partitions << partition_name

    if declarative
      queries << <<-SQL
CREATE TABLE #{quote_ident(partition_name)} PARTITION OF #{quote_ident(table)} FOR VALUES FROM (#{sql_date(day, cast, false)}) TO (#{sql_date(advance_date(day, period, 1), cast, false)});
      SQL
    else
      queries << <<-SQL
CREATE TABLE #{quote_ident(partition_name)}
(CHECK (#{quote_ident(field)} >= #{sql_date(day, cast)} AND #{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}))
INHERITS (#{quote_ident(table)});
      SQL
    end

    queries << "ALTER TABLE #{quote_ident(partition_name)} ADD PRIMARY KEY (#{quote_ident(primary_key)});" if primary_key
    index_defs.each do |index_def|
      # retarget the definition at the partition and drop the index name so the server picks one
      queries << index_def.sub(/ ON \S+ USING /, " ON #{quote_ident(partition_name)} USING ").sub(/ INDEX .+ ON /, " INDEX ON ") + ";"
    end
    fk_defs.each do |fk_def|
      queries << "ALTER TABLE #{quote_ident(partition_name)} ADD #{fk_def};"
    end
  end

  unless declarative
    # rebuild the trigger function from every partition (existing + just added)
    current_defs = []
    future_defs = []
    past_defs = []
    partition_tables = (existing_partitions(original_table) + added_partitions).uniq.sort
    partition_tables.each do |partition_table|
      day = DateTime.strptime(partition_table.split("_").last, partition_format)
      partition_name = "#{original_table}_#{day.strftime(partition_format)}"

      sql = "(NEW.#{quote_ident(field)} >= #{sql_date(day, cast)} AND NEW.#{quote_ident(field)} < #{sql_date(advance_date(day, period, 1), cast)}) THEN
INSERT INTO #{quote_ident(partition_name)} VALUES (NEW.*);"

      # `day` is the first date of the partition's period and `today` the first
      # date of the current period, so equality identifies the current partition.
      # A test like advance_date(day, period, 1) < today can never hold once
      # day >= today.
      if day.to_date < today
        past_defs << sql
      elsif day.to_date == today
        current_defs << sql
      else
        future_defs << sql
      end
    end

    # order by current period, future periods asc, past periods desc
    trigger_defs = current_defs + future_defs + past_defs.reverse

    if trigger_defs.any?
      queries << <<-SQL
CREATE OR REPLACE FUNCTION #{quote_ident(trigger_name)}()
RETURNS trigger AS $$
BEGIN
IF #{trigger_defs.join("\n ELSIF ")}
ELSE
RAISE EXCEPTION 'Date out of range. Ensure partitions are created.';
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
      SQL
    end
  end

  run_queries(queries) if queries.any?
end
# Copies rows from a source table into a destination table in batches of
# --batch-size primary-key values. Copying starts after the highest id already
# in the destination, or after --start.
#
# Without --swapped: <table> -> <table>_intermediate.
# With --swapped:    <table>_retired -> <table>.
#
# --source-table / --dest-table override either side, --where adds a filter,
# and --sleep pauses between batches. When partitioning settings are found,
# only rows inside the span covered by existing partitions are copied.
#
# Raises PgSlice::Error (via abort) on bad usage, missing tables, or a
# reference table without a primary key.
def fill
  table = arguments.first
  abort "Usage: pgslice fill <table>" if arguments.length != 1

  source_table = options[:source_table]
  dest_table = options[:dest_table]
  if options[:swapped]
    source_table ||= retired_name(table)
    dest_table ||= table
  else
    source_table ||= table
    dest_table ||= intermediate_name(table)
  end

  abort "Table not found: #{source_table}" unless table_exists?(source_table)
  abort "Table not found: #{dest_table}" unless table_exists?(dest_table)

  period, field, cast, _needs_comment, declarative = settings_from_trigger(table, dest_table)

  if period
    partition_format = name_format(period)
    partitions = existing_partitions(table)
    if partitions.any?
      # copy only [first partition start, last partition start + 1 period)
      starting_time = DateTime.strptime(partitions.first.split("_").last, partition_format)
      ending_time = advance_date(DateTime.strptime(partitions.last.split("_").last, partition_format), period, 1)
    end
  end

  # for declarative partitioning the primary key is looked up on a partition,
  # not on the parent
  schema_table = period && declarative ? partitions.last : table
  primary_key = self.primary_key(schema_table)
  abort "No primary key" unless primary_key

  max_source_id = max_id(source_table, primary_key)
  max_dest_id =
    if options[:start]
      options[:start]
    elsif options[:swapped]
      max_id(dest_table, primary_key, where: options[:where], below: max_source_id)
    else
      max_id(dest_table, primary_key, where: options[:where])
    end

  if max_dest_id == 0 && !options[:swapped]
    # empty destination: begin just before the first in-range source row
    min_source_id = min_id(source_table, primary_key, field, cast, starting_time, options[:where])
    max_dest_id = min_source_id - 1 if min_source_id
  end

  starting_id = max_dest_id
  fields = columns(source_table).map { |c| quote_ident(c) }.join(", ")
  batch_size = options[:batch_size]

  i = 1
  batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil
  # batch_count is negative when the destination is already ahead of the source
  log_sql "/* nothing to fill */" if batch_count <= 0

  while starting_id < max_source_id
    where = "#{quote_ident(primary_key)} > #{starting_id} AND #{quote_ident(primary_key)} <= #{starting_id + batch_size}"
    where << " AND #{quote_ident(field)} >= #{sql_date(starting_time, cast)} AND #{quote_ident(field)} < #{sql_date(ending_time, cast)}" if starting_time
    where << " AND #{options[:where]}" if options[:where]

    query = <<-SQL
/* #{i} of #{batch_count} */
INSERT INTO #{quote_ident(dest_table)} (#{fields})
SELECT #{fields} FROM #{quote_ident(source_table)}
WHERE #{where}
    SQL
    run_query(query)

    starting_id += batch_size
    i += 1

    # pause only when another batch will actually run
    sleep(options[:sleep]) if options[:sleep] && starting_id < max_source_id
  end
end
# Swaps the intermediate table into place in one transaction:
# <table> becomes <table>_retired and <table>_intermediate becomes <table>.
# Sequences owned by columns of the original <table> are re-owned by the new
# <table>. On servers >= 9.3, SET LOCAL lock_timeout (--lock-timeout) runs first.
def swap
  table = arguments.first
  intermediate_table = intermediate_name(table)
  retired_table = retired_name(table)

  abort "Usage: pgslice swap <table>" if arguments.length != 1
  abort "Table not found: #{table}" unless table_exists?(table)
  abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table)
  abort "Table already exists: #{retired_table}" if table_exists?(retired_table)

  renames = [
    "ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(retired_table)};",
    "ALTER TABLE #{quote_ident(intermediate_table)} RENAME TO #{quote_ident(table)};"
  ]
  # looked up now, before the renames run, so these belong to the original table
  ownership = sequences(table).map do |seq|
    "ALTER SEQUENCE #{quote_ident(seq["sequence_name"])} OWNED BY #{quote_ident(table)}.#{quote_ident(seq["related_column"])};"
  end

  queries = renames + ownership
  queries.unshift("SET LOCAL lock_timeout = '#{options[:lock_timeout]}';") if server_version_num >= 90300
  run_queries(queries)
end
# Reverses swap in one transaction: <table> is renamed back to
# <table>_intermediate and <table>_retired back to <table>. Sequences owned by
# columns of the current <table> are re-owned by the restored one. Unlike swap,
# no lock_timeout is set.
def unswap
  table = arguments.first
  intermediate_table = intermediate_name(table)
  retired_table = retired_name(table)

  abort "Usage: pgslice unswap <table>" if arguments.length != 1
  abort "Table not found: #{table}" unless table_exists?(table)
  abort "Table not found: #{retired_table}" unless table_exists?(retired_table)
  abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table)

  renames = [
    "ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(intermediate_table)};",
    "ALTER TABLE #{quote_ident(retired_table)} RENAME TO #{quote_ident(table)};"
  ]
  ownership = sequences(table).map do |seq|
    "ALTER SEQUENCE #{quote_ident(seq["sequence_name"])} OWNED BY #{quote_ident(table)}.#{quote_ident(seq["related_column"])};"
  end

  run_queries(renames + ownership)
end
# Runs ANALYZE VERBOSE on every partition of <table> and then on the parent.
# The parent is <table> with --swapped, otherwise <table>_intermediate.
# Each statement runs on its own, outside a transaction.
def analyze
  table = arguments.first
  parent_table = options[:swapped] ? table : intermediate_name(table)
  abort "Usage: pgslice analyze <table>" if arguments.length != 1

  statements = (existing_partitions(table) + [parent_table]).map do |name|
    "ANALYZE VERBOSE #{quote_ident(name)};"
  end
  run_queries_without_transaction(statements)
end
# arguments
# Parses +args+ with Slop. Positional arguments go to @arguments; options go to
# @options, keyed by symbols such as :batch_size and :dry_run.
# --version prints the gem version and sets @exit so perform does nothing.
# Slop parse errors are re-raised as PgSlice::Error via abort.
def parse_args(args)
  parsed = Slop.parse(args) do |opt|
    opt.boolean "--intermediate"
    opt.boolean "--swapped"
    opt.float "--sleep"
    opt.integer "--future", default: 0
    opt.integer "--past", default: 0
    opt.integer "--batch-size", default: 10000
    opt.boolean "--dry-run", default: false
    opt.boolean "--no-partition", default: false
    opt.boolean "--trigger-based", default: false
    opt.integer "--start"
    opt.string "--url"
    opt.string "--source-table"
    opt.string "--dest-table"
    opt.string "--where"
    opt.string "--lock-timeout", default: "5s"
    opt.on("-v", "--version", "print the version") do
      log PgSlice::VERSION
      @exit = true
    end
  end

  @arguments = parsed.arguments
  @options = parsed.to_hash
rescue Slop::Error => e
  abort e.message
end
# output
# Writes a human-facing message to stderr; with no argument, a blank line.
def log(message = nil)
  $stderr.puts(message)
end

# Writes SQL (or, with no argument, a blank separator line) to stdout.
def log_sql(message = nil)
  $stdout.puts(message)
end

# Overrides Kernel#abort. Instead of exiting the process, it raises
# PgSlice::Error carrying +message+.
def abort(message)
  raise(PgSlice::Error, message)
end
# database connection
# Lazily opens and memoizes the PG connection given by --url or the
# PGSLICE_URL environment variable. Percent-encoded URL components are decoded
# before being passed to PG. The optional ?schema= query parameter sets @schema
# (default "public").
#
# Raises PgSlice::Error (via abort) when no URL is configured, the URL cannot
# be parsed, or the connection cannot be established.
def connection
  @connection ||= begin
    url = options[:url] || ENV["PGSLICE_URL"]
    abort "Set PGSLICE_URL or use the --url option" unless url

    uri = URI.parse(url)
    uri_parser = URI::Parser.new
    config = {
      host: uri.host,
      port: uri.port,
      dbname: uri.path.sub(/\A\//, ""),
      user: uri.user,
      password: uri.password,
      connect_timeout: 1
    }.reject { |_, value| value.to_s.empty? }
    # decode string components into a new hash rather than writing into the
    # hash while iterating over it
    config = config.map { |key, value| [key, value.is_a?(String) ? uri_parser.unescape(value) : value] }.to_h
    @schema = CGI.parse(uri.query.to_s)["schema"][0] || "public"
    PG::Connection.new(config)
  end
rescue URI::InvalidURIError => e
  abort e.message
rescue PG::ConnectionBad => e
  abort e.message
end
# Schema used for all catalog lookups. Reading it forces the connection to
# open first, because that is where @schema is parsed from the URL.
def schema
  connection
  @schema
end

# Runs +query+ with optional bind +params+ and returns the rows as an array of
# string-keyed hashes.
def execute(query, params = [])
  result = connection.exec_params(query, params)
  result.to_a
end
# Runs +queries+ inside one transaction and echoes BEGIN/COMMIT around them,
# so the logged SQL mirrors what is executed. Outside --dry-run, client
# messages below WARNING are suppressed for the transaction.
def run_queries(queries)
  connection.transaction do
    execute("SET LOCAL client_min_messages TO warning") unless options[:dry_run]
    ["BEGIN;", nil].each { |line| log_sql(line) }
    run_queries_without_transaction(queries)
    log_sql "COMMIT;"
  end
end

# Logs +query+ and executes it unless --dry-run. Server errors are re-raised
# as PgSlice::Error, prefixed with the error class name. Each query is followed
# by a blank line in the log.
def run_query(query)
  log_sql query
  begin
    execute(query) unless options[:dry_run]
  rescue PG::ServerError => e
    abort("#{e.class.name}: #{e.message}")
  end
  log_sql
end

# Runs each query in order via run_query, without opening a transaction.
def run_queries_without_transaction(queries)
  queries.each { |query| run_query(query) }
end
# Server version as an integer (SHOW server_version_num). Callers compare it
# against 100000 (declarative partitioning) and 90300 (lock_timeout).
def server_version_num
  row = execute("SHOW server_version_num").first
  row["server_version_num"].to_i
end
# Names of partitions of +table+ in the current schema: "<table>_" followed by
# 6 to 8 digits (YYYYMM or YYYYMMDD). The result keeps the name order returned
# by existing_tables.
def existing_partitions(table)
  prefix = "#{table}_"
  pattern = /\A#{Regexp.escape(prefix)}\d{6,8}\z/
  existing_tables(like: "#{prefix}%").select { |name| pattern =~ name }
end
# Table names in the current schema that match the SQL LIKE pattern +like+,
# sorted by name.
def existing_tables(like:)
  rows = execute("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = $1 AND tablename LIKE $2", [schema, like])
  rows.map { |row| row["tablename"] }.sort
end
# True when a table named exactly +table+ exists in the current schema.
# The comparison is exact equality. LIKE is avoided because "_" is a LIKE
# wildcard, so "users_intermediate" would also match "usersXintermediate".
def table_exists?(table)
  query = "SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = $1 AND tablename = $2"
  execute(query, [schema, table]).any?
end
# Column names of +table+ in the current schema. The query has no ORDER BY,
# so callers should not rely on a particular order.
def columns(table)
  sql = "SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2"
  execute(sql, [schema, table]).map { |row| row["column_name"] }
end
# Name of a primary-key column of +table+ in the current schema, or nil when
# the table has no primary key. Query adapted from
# http://stackoverflow.com/a/20537829
# NOTE(review): only the first returned row is used, so a composite primary
# key yields just one of its columns.
def primary_key(table)
  query = <<-SQL
SELECT
pg_attribute.attname,
format_type(pg_attribute.atttypid, pg_attribute.atttypmod)
FROM
pg_index, pg_class, pg_attribute, pg_namespace
WHERE
relname = $2 AND
indrelid = pg_class.oid AND
nspname = $1 AND
pg_class.relnamespace = pg_namespace.oid AND
pg_attribute.attrelid = pg_class.oid AND
pg_attribute.attnum = any(pg_index.indkey) AND
indisprimary
  SQL
  rows = execute(query, [schema, table])
  rows.empty? ? nil : rows[0]["attname"]
end
# Largest +primary_key+ value in +table+, or 0 when there are no matching rows.
# Optionally limited to ids <= +below+ and/or a raw SQL +where+ condition.
def max_id(table, primary_key, below: nil, where: nil)
  filters = []
  filters << "#{quote_ident(primary_key)} <= #{below}" if below
  filters << where if where

  sql = "SELECT MAX(#{quote_ident(primary_key)}) FROM #{quote_ident(table)}"
  sql += " WHERE #{filters.join(" AND ")}" unless filters.empty?
  # MAX over no rows is NULL, which nil.to_i turns into 0
  execute(sql)[0]["max"].to_i
end
# Smallest +primary_key+ value in +table+. It is optionally limited to rows
# whose +column+ is >= +starting_time+ (formatted with +cast+) and/or to a raw
# SQL +where+ condition. Returns 1 when nothing matches.
def min_id(table, primary_key, column, cast, starting_time, where)
  filters = []
  filters << "#{quote_ident(column)} >= #{sql_date(starting_time, cast)}" if starting_time
  filters << where if where

  sql = "SELECT MIN(#{quote_ident(primary_key)}) FROM #{quote_ident(table)}"
  sql += " WHERE #{filters.join(" AND ")}" unless filters.empty?
  minimum = execute(sql)[0]["min"]
  (minimum || 1).to_i
end
# True when a trigger named +trigger_name+ exists on +table+.
def has_trigger?(trigger_name, table)
  row = fetch_trigger(trigger_name, table)
  !row.nil?
end
# Row whose "comment" key holds the comment on +table+. The key's value is nil
# when the table has no comment.
def fetch_comment(table)
  sql = "SELECT obj_description(#{regclass(schema, table)}) AS comment"
  execute(sql).first
end
# Sequences in the current schema that depend on a column of +table+. Each
# row has "sequence_name" and "related_column"; swap and unswap use them to
# transfer sequence ownership.
# Query adapted from http://www.dbforums.com/showthread.php?1667561-How-to-list-sequences-and-the-columns-by-SQL
def sequences(table)
  sql = <<-SQL
SELECT
a.attname as related_column,
s.relname as sequence_name
FROM pg_class s
JOIN pg_depend d ON d.objid = s.oid
JOIN pg_class t ON d.objid = s.oid AND d.refobjid = t.oid
JOIN pg_attribute a ON (d.refobjid, d.refobjsubid) = (a.attrelid, a.attnum)
JOIN pg_namespace n ON n.oid = s.relnamespace
WHERE s.relkind = 'S'
AND n.nspname = $1
AND t.relname = $2
  SQL
  binds = [schema, table]
  execute(sql, binds)
end
# helpers
# Name of the insert trigger (and its function) for +table+.
def trigger_name(table)
  table.to_s + "_insert_trigger"
end

# Name of the staging table that prep creates for +table+.
def intermediate_name(table)
  table.to_s + "_intermediate"
end

# Name the original +table+ is renamed to by swap.
def retired_name(table)
  table.to_s + "_retired"
end
# Cast used for partition bounds on +column+ of +table+. It is "timestamptz"
# for timestamp with time zone columns and "date" for every other type.
def column_cast(table, column)
  row = execute("SELECT data_type FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 AND column_name = $3", [schema, table, column]).first
  if row["data_type"] == "timestamp with time zone"
    "timestamptz"
  else
    "date"
  end
end
# SQL literal for +time+. timestamptz uses "YYYY-MM-DD HH:MM:SS UTC"; any
# other cast uses "YYYY-MM-DD". "::<cast>" is appended unless +add_cast+ is
# false; partition bounds in FOR VALUES are written without a cast.
def sql_date(time, cast, add_cast = true)
  fmt = cast == "timestamptz" ? "%Y-%m-%d %H:%M:%S UTC" : "%Y-%m-%d"
  literal = "'" + time.strftime(fmt) + "'"
  return literal unless add_cast
  "#{literal}::#{cast}"
end
# strftime pattern for partition-name suffixes: "%Y%m%d" for daily periods and
# "%Y%m" for everything else (monthly).
def name_format(period)
  period.to_sym == :day ? "%Y%m%d" : "%Y%m"
end
# Truncates +date+ to the start of its period: unchanged for :day, otherwise
# the first day of its month. Always returns a Date.
def round_date(date, period)
  d = date.to_date
  return d if period.to_sym == :day
  Date.new(d.year, d.month)
end
# Moves +date+ forward by +count+ periods (backward if negative): days for
# :day, months otherwise. Always returns a Date.
def advance_date(date, period, count = 1)
  d = date.to_date
  period.to_sym == :day ? d.next_day(count) : d.next_month(count)
end
# Quotes +value+ as an SQL identifier (table, column, trigger or sequence name)
# using the pg gem's PG::Connection.quote_ident, so it can be interpolated into
# generated statements.
def quote_ident(value)
PG::Connection.quote_ident(value)
end
# Returns an SQL expression casting the schema-qualified name of +table+ to
# regclass, so catalog queries can filter by the table's OID. The qualified
# name sits inside a single-quoted SQL literal, so any single quote in it is
# doubled to keep the literal well-formed.
def regclass(schema, table)
  qualified = "#{quote_ident(schema)}.#{quote_ident(table)}"
  "'#{qualified.gsub("'", "''")}'::regclass"
end
# Row for the trigger named +trigger_name+ on +table+. Its "comment" key holds
# the trigger's comment (or nil). Returns nil when no such trigger exists.
def fetch_trigger(trigger_name, table)
  sql = "SELECT obj_description(oid, 'pg_trigger') AS comment FROM pg_trigger WHERE tgname = $1 AND tgrelid = #{regclass(schema, table)}"
  execute(sql, [trigger_name]).first
end
# Recovers the partitioning settings that prep recorded for +table+.
#
# Sources are tried in this order:
# 1. the comment on the insert trigger of +original_table+ (trigger-based);
# 2. the table's own comment (declarative);
# 3. the trigger function's source, searched for a known to_char() format
#    and a to_char(NEW.<column>, ...) call.
#
# Returns [period, field, cast, needs_comment, declarative], or [] when no
# settings can be determined.
# - needs_comment is true when the stored comment was missing or lacked a
#   cast, and should be rewritten.
# - cast defaults to "date" (settings from 0.2.3 and earlier).
# - declarative is true when +table+ has no such trigger.
def settings_from_trigger(original_table, table)
  trigger_name = self.trigger_name(original_table)
  needs_comment = false
  trigger_comment = fetch_trigger(trigger_name, table)
  comment = trigger_comment || fetch_comment(table)

  # the comment row can exist with a NULL comment; treat that as "no settings"
  comment_text = comment && comment["comment"]
  if comment_text
    # format: "column:<field>,period:<period>,cast:<cast>"
    field, period, cast = comment_text.split(",").map { |v| v.split(":").last }
  end

  unless period
    needs_comment = true
    function_def = execute("select pg_get_functiondef(oid) from pg_proc where proname = $1", [trigger_name])[0]
    return [] unless function_def
    function_def = function_def["pg_get_functiondef"]
    sql_format = SQL_FORMAT.find { |_, f| function_def.include?("'#{f}'") }
    return [] unless sql_format
    period = sql_format[0]
    field_match = /to_char\(NEW\.(\w+),/.match(function_def)
    return [] unless field_match
    field = field_match[1]
  end

  # backwards compatibility with 0.2.3 and earlier (pre-timestamptz support)
  unless cast
    cast = "date"
    # update comment to explicitly define cast
    needs_comment = true
  end

  [period, field, cast, needs_comment, !trigger_comment]
end
# Definitions of the foreign-key constraints on +table+ (pg_get_constraintdef
# output), suitable for "ALTER TABLE ... ADD <definition>".
def foreign_keys(table)
  sql = "SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = #{regclass(schema, table)} AND contype ='f'"
  execute(sql).map { |row| row["pg_get_constraintdef"] }
end
# server_version_num is defined once, alongside the other catalog query helpers.
end
end