/
conn.rb
119 lines (102 loc) · 2.52 KB
/
conn.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
require 'thread'
require 'uri'
require 'pg'
module QC
  # Manages the single PostgreSQL connection used by QC: lazy connection
  # setup from a database URL, serialized statement execution, LISTEN/NOTIFY
  # waiting and a simple transaction wrapper.
  #
  # Every method is callable directly on the module (+extend self+).
  module Conn
    extend self

    # Guards #execute so only one thread at a time runs a statement.
    @exec_mutex = Mutex.new

    # Runs a SQL statement on the shared connection.
    #
    # @param stmt [String] SQL text
    # @param params [Array] optional bind parameters handed to the driver
    # @return [nil, Object, Array] nil when no rows came back, the row itself
    #   when exactly one came back, otherwise an array of all rows
    # @raise [PG::Error] re-raised after logging and dropping the connection
    def execute(stmt, *params)
      @exec_mutex.synchronize do
        log(:at => "exec_sql", :sql => stmt.inspect)
        begin
          # The driver is given nil rather than an empty parameter list.
          binds = params.empty? ? nil : params
          rows = connection.exec(stmt, binds).to_a
          rows.length > 1 ? rows : rows.pop
        rescue PG::Error => e
          log(:error => e.inspect)
          # Drop the connection so the next call reconnects.
          disconnect
          raise
        end
      end
    end

    # Listens on a notification channel, waits up to WAIT_TIME for a
    # notification, then stops listening and discards queued notifications.
    #
    # @param chan [String] channel name; embedded double quotes are doubled
    #   so the name always stays inside the quoted identifier
    def wait(chan)
      ident = '"' + chan.gsub('"', '""') + '"'
      execute('LISTEN ' + ident)
      wait_for_notify(WAIT_TIME)
      execute('UNLISTEN ' + ident)
      drain_notify
    end

    # Wraps the given block in BEGIN / COMMIT and issues ROLLBACK when
    # anything is raised.
    #
    # @yield the work to perform inside the transaction
    # @raise [Exception] whatever the block (or BEGIN/COMMIT) raised
    def transaction
      execute("BEGIN")
      yield
      execute("COMMIT")
    rescue Exception => e
      # Deliberately rescues Exception: a rollback is attempted for every
      # kind of failure, and the exception is always re-raised below.
      begin
        execute("ROLLBACK")
      rescue StandardError => rollback_error
        # A failing ROLLBACK must not hide the error that caused it.
        log(:at => "rollback_failed", :error => rollback_error.inspect)
      end
      raise e
    end

    # @return [Boolean] true when the connection reports an idle
    #   transaction status
    def transaction_idle?
      connection.transaction_status == PG::Connection::PQTRANS_IDLE
    end

    # Returns the current connection, establishing it on first use.
    #
    # @return [PG::Connection]
    def connection
      @connection ||= connect
    end

    # Installs an externally created connection.
    #
    # @param connection [PG::Connection]
    # @raise [ArgumentError] when given anything but a PG::Connection
    def connection=(connection)
      unless connection.is_a? PG::Connection
        c = connection.class
        err = "connection must be an instance of PG::Connection, but was #{c}"
        raise(ArgumentError, err)
      end
      @connection = connection
    end

    # Closes the current connection, if there is one, and forgets it.
    # Reads @connection directly so that disconnecting while not connected
    # does not open a new connection just to close it.
    def disconnect
      @connection.finish if @connection
    ensure
      @connection = nil
    end

    # Opens a new connection from #db_url and sets its application name.
    #
    # @return [PG::Connection]
    def connect
      log(:at => "establish_conn")
      conn = PG::Connection.connect(*normalize_db_url(db_url))
      log(:error => conn.error) if conn.status != PG::Connection::CONNECTION_OK
      # Escape the name so a quote inside it cannot break the statement.
      app_name = conn.escape_string(QC::APP_NAME.to_s)
      conn.exec("SET application_name = '#{app_name}'")
      conn
    end

    # Converts a parsed database URL into the positional argument list
    # passed to PG::Connection.connect.
    #
    # @param url [URI] parsed database URL
    # @return [Array] host, port, opts, tty, database name, user, password
    def normalize_db_url(url)
      host = url.host
      host = host.gsub(/%2F/i, '/') if host
      [
        host, # host or percent-encoded socket path
        url.port || 5432,
        nil, '', #opts, tty
        url.path.delete("/"), # database name
        url.user,
        url.password
      ]
    end

    # Returns the parsed database URL, read once from QC_DATABASE_URL or
    # DATABASE_URL and then cached.
    #
    # @return [URI]
    # @raise [ArgumentError] when neither environment variable is set
    def db_url
      @db_url ||= URI.parse(
        ENV["QC_DATABASE_URL"] ||
        ENV["DATABASE_URL"] ||
        raise(ArgumentError, "missing QC_DATABASE_URL or DATABASE_URL")
      )
    end

    private

    # Forwards a log message to QC.log.
    def log(msg)
      QC.log(msg)
    end

    # Waits on the connection for a notification and collects the payload
    # of whatever the driver yields.
    #
    # @param t [Numeric] timeout handed to the driver
    # @return [Array] collected payloads (empty when nothing was yielded)
    def wait_for_notify(t)
      msgs = []
      connection.wait_for_notify(t) { |_event, _pid, msg| msgs << msg }
      msgs
    end

    # Reads queued notifications until the driver reports none are left.
    def drain_notify
      until connection.notifies.nil?
        log(:at => "drain_notifications")
      end
    end
  end
end