forked from pat/thinking-sphinx
/
postgresql_adapter.rb
188 lines (167 loc) · 4.57 KB
/
postgresql_adapter.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
module ThinkingSphinx
class PostgreSQLAdapter < AbstractAdapter
# Installs the SQL helpers this adapter's generated queries call:
# first the array_accum aggregate, then the crc32 function.
# Returns whatever create_crc32_function returns.
def setup
  create_array_accum_function
  create_crc32_function
end
# Returns the identifier string for this database type.
# NOTE(review): presumably consumed as the Sphinx source type - confirm
# against the callers.
def sphinx_identifier
  'pgsql'
end
# Builds a SQL expression that joins the comma-separated columns in +clause+
# with the || operator, placing +separator+ (as a SQL string literal)
# between them.
#
# A clause that already starts a line with COALESCE is assumed to be a list
# of COALESCE(...) calls and is only re-joined; otherwise every column is
# wrapped in a NULL-safe varchar cast first.
def concatenate(clause, separator = ' ')
  glue = " || '#{separator}' || "

  # Splitting on "), " drops the closing paren of each item but the last,
  # so it is put back in front of the glue.
  return clause.split('), ').join(")#{glue}") if clause[/^COALESCE/]

  wrapped = clause.split(', ').map do |column|
    "CAST(COALESCE(#{column}::varchar, '') as varchar)"
  end
  wrapped.join(glue)
end
# Builds a SQL expression that aggregates +clause+ across grouped rows into
# one string, with NULLs replaced by '0' and values joined by +separator+.
#
# Servers reporting version 80400 or later use array_agg; older ones use
# the array_accum aggregate.
def group_concatenate(clause, separator = ' ')
  aggregate = server_version >= 80400 ? 'array_agg' : 'array_accum'
  "array_to_string(#{aggregate}(COALESCE(#{clause}, '0')), '#{separator}')"
end
# Returns +clause+ unchanged; this adapter applies no string cast.
def cast_to_string(clause)
  clause
end
# Builds a SQL expression converting the timestamp +clause+ to its epoch
# value, cast to bigint when the configuration's use_64_bit flag is set and
# to int otherwise.
def cast_to_datetime(clause)
  integer_type = ThinkingSphinx::Configuration.instance.use_64_bit ? 'bigint' : 'int'
  "cast(extract(epoch from #{clause}) as #{integer_type})"
end
# Returns +clause+ unchanged; this adapter applies no unsigned cast.
def cast_to_unsigned(clause)
  clause
end
# Builds a SQL expression casting +clause+ to INT8 using the :: cast operator.
def cast_to_int(clause)
  format('%s::INT8', clause)
end
# Wraps +clause+ in COALESCE so that NULLs are replaced by +default+.
#
# How +default+ is rendered depends on its class:
# * String  - quoted as a SQL string literal (not escaped)
# * nil     - the SQL keyword NULL
# * Integer - the number with a ::bigint cast
# * other   - interpolated as-is
#
# Integer is matched rather than Fixnum: Fixnum was deprecated in Ruby 2.4
# and removed in 3.2, where referencing it raises NameError.
def convert_nulls(clause, default = '')
  fallback =
    case default
    when String   then "'#{default}'"
    when NilClass then 'NULL'
    when Integer  then "#{default}::bigint"
    else default
    end

  "COALESCE(#{clause}, #{fallback})"
end
# Renders a Ruby truthy/falsy +value+ as the SQL literal TRUE or FALSE.
def boolean(value)
  return 'TRUE' if value

  'FALSE'
end
# Builds a SQL call to the crc32 function for +clause+.
# When +blank_to_null+ is truthy the clause is first wrapped in NULLIF so
# that empty strings are passed to crc32 as NULL.
def crc(clause, blank_to_null = false)
  argument = blank_to_null ? "NULLIF(#{clause},'')" : clause
  "crc32(#{argument})"
end
# Always returns nil: this adapter supplies no UTF-8 pre-query statement.
# NOTE(review): presumably callers skip a nil pre-query - confirm.
def utf8_query_pre
  nil
end
# Builds a SQL expression for the moment +diff+ seconds before
# current_timestamp.
def time_difference(diff)
  interval = "interval '#{diff} seconds'"
  "current_timestamp - #{interval}"
end
# Returns the SQL statement that switches the session time zone to UTC.
def utc_query_pre
  %(SET TIME ZONE 'UTC')
end
private
# Runs +command+ through execute_command, forwarding +output_error+.
# On JRuby (RUBY_PLATFORM == 'java') the call is additionally wrapped in
# connection.transaction; on other platforms it is made directly.
def execute(command, output_error = false)
  return execute_command(command, output_error) unless RUBY_PLATFORM == 'java'

  connection.transaction do
    execute_command command, output_error
  end
end
# Runs +command+ inside its own transaction, guarded by a savepoint named
# "ts". If the command raises a StandardError the error is swallowed
# (printed with puts when +output_error+ is truthy) and the savepoint is
# rolled back, so the surrounding transaction can still be committed.
def execute_command(command, output_error = false)
  ['begin', 'savepoint ts'].each { |statement| connection.execute statement }

  begin
    connection.execute command
  rescue StandardError => error
    puts error if output_error
    connection.execute 'rollback to savepoint ts'
  end

  ['release savepoint ts', 'commit'].each { |statement| connection.execute statement }
end
# Creates the array_accum aggregate on servers that need it.
#
# group_concatenate switches to array_agg only when server_version is
# 80400 or later, so every older server still needs array_accum. The skip
# threshold here therefore matches that cut-off; a lower threshold would
# leave some servers with queries that call an aggregate nobody created.
#
# Servers reporting a version above 80200 get the CREATE AGGREGATE form
# with the argument type in the signature; anything else (including an
# unknown version of 0) gets the form that uses basetype.
#
# Failures - e.g. presumably when the aggregate already exists - are
# swallowed by execute, since output_error is left at its default.
def create_array_accum_function
  return if server_version >= 80400

  if server_version > 80200
    execute <<-SQL
CREATE AGGREGATE array_accum (anyelement)
(
sfunc = array_append,
stype = anyarray,
initcond = '{}'
);
    SQL
  else
    execute <<-SQL
CREATE AGGREGATE array_accum
(
basetype = anyelement,
sfunc = array_append,
stype = anyarray,
initcond = '{}'
);
    SQL
  end
end
# Installs a crc32(text) SQL function, written in PL/pgSQL, that returns a
# bigint checksum of its argument.
#
# What the SQL function does, as written below:
# * returns 0 for a NULL or empty word;
# * otherwise starts from 4294967295 (0xFFFFFFFF), XORs in each byte of the
#   word (# is PostgreSQL's bitwise XOR) and, per byte, runs eight
#   shift-right steps that XOR in 3988292384 (0xEDB88320) whenever the low
#   bit is set;
# * finally XORs the accumulator with 4294967295 again.
def create_crc32_function
# Try to register the plpgsql language first. output_error is left at its
# default of false, so a failure here is swallowed silently by execute
# (presumably the common case is that the language already exists).
execute "CREATE LANGUAGE 'plpgsql';"
# The heredoc below processes backslash escapes, so every doubled backslash
# in this Ruby source reaches PostgreSQL as a single backslash.
function = <<-SQL
CREATE OR REPLACE FUNCTION crc32(word text)
RETURNS bigint AS $$
DECLARE tmp bigint;
DECLARE i int;
DECLARE j int;
DECLARE byte_length int;
DECLARE word_array bytea;
BEGIN
IF COALESCE(word, '') = '' THEN
return 0;
END IF;
i = 0;
tmp = 4294967295;
byte_length = bit_length(word) / 8;
word_array = decode(replace(word, E'\\\\', E'\\\\\\\\'), 'escape');
LOOP
tmp = (tmp # get_byte(word_array, i))::bigint;
i = i + 1;
j = 0;
LOOP
tmp = ((tmp >> 1) # (3988292384 * (tmp & 1)))::bigint;
j = j + 1;
IF j >= 8 THEN
EXIT;
END IF;
END LOOP;
IF i >= byte_length THEN
EXIT;
END IF;
END LOOP;
return (tmp # 4294967295);
END
$$ IMMUTABLE LANGUAGE plpgsql;
SQL
# Unlike the language creation above, errors while creating the function
# are printed (output_error = true), though still not raised.
execute function, true
end
# Returns the PostgreSQL server version as a single integer.
#
# On JRuby the number is assembled from the driver connection's major and
# minor version (major * 10000 + minor * 100). Elsewhere the raw
# connection's own server_version is used when it offers one; otherwise 0
# is returned, which the version checks in this adapter treat as "older
# than any threshold".
def server_version
  raw = connection.raw_connection

  if RUBY_PLATFORM == 'java'
    jdbc = raw.connection
    return (jdbc.server_major_version * 10000) +
           (jdbc.server_minor_version * 100)
  end

  raw.respond_to?(:server_version) ? raw.server_version : 0
end
end
end