forked from mperham/data_fabric
/
connection_proxy.rb
161 lines (136 loc) · 4.29 KB
/
connection_proxy.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
module DataFabric
module ActiveRecordConnectionMethods
def self.included(base)
base.alias_method_chain :reload, :master
end
def reload_with_master(*args, &block)
connection.with_master { reload_without_master }
end
end
class StringProxy
def initialize(&block)
@proc = block
end
def to_s
@proc.call
end
end
class PoolProxy
def initialize(proxy)
@proxy = proxy
end
def connection
@proxy
end
def release_connection
DataFabric.logger.debug { 'data_fabric does not implement release_connection' }
end
def spec
@proxy.spec
end
def with_connection
yield @proxy
end
def connected?
@proxy.connected?
end
end
class ConnectionProxy
cattr_accessor :shard_pools
attr_accessor :spec
def initialize(model_class, options)
@model_class = model_class
@replicated = options[:replicated]
@shard_group = options[:shard_by]
@prefix = options[:prefix]
set_role('slave') if @replicated
@model_class.send :include, ActiveRecordConnectionMethods if @replicated
end
delegate :insert, :update, :delete, :create_table, :rename_table, :drop_table, :add_column, :remove_column,
:change_column, :change_column_default, :rename_column, :add_index, :remove_index, :initialize_schema_information,
:dump_schema_information, :execute, :execute_ignore_duplicate, :to => :master
delegate :insert_many, :to => :master # ar-extensions bulk insert support
def transaction(start_db_transaction = true, &block)
# Transaction is not re-entrant in SQLite 3 so we
# need to track if we've already started an XA to avoid
# calling it twice.
return yield if in_transaction?
with_master do
connection.transaction(start_db_transaction, &block)
end
end
def method_missing(method, *args, &block)
DataFabric.logger.debug { "Calling #{method} on #{connection}" }
connection.send(method, *args, &block)
end
def connection_name
connection_name_builder.join('_')
end
def with_master
# Allow nesting of with_master.
old_role = current_role
set_role('master')
yield
ensure
set_role(old_role)
end
def connected?
current_pool.connected?
end
def connection
current_pool.connection
end
private
def in_transaction?
current_role == 'master'
end
def current_pool
name = connection_name
self.class.shard_pools[name] ||= begin
config = ActiveRecord::Base.configurations[name]
raise ArgumentError, "Unknown database config: #{name}, have #{ActiveRecord::Base.configurations.inspect}" unless config
ActiveRecord::ConnectionAdapters::ConnectionPool.new(spec_for(config))
end
end
def spec_for(config)
# XXX This looks pretty fragile. Will break if AR changes how it initializes connections and adapters.
config = config.symbolize_keys
adapter_method = "#{config[:adapter]}_connection"
initialize_adapter(config[:adapter])
@spec = ActiveRecord::Base::ConnectionSpecification.new(config, adapter_method)
end
def initialize_adapter(adapter)
begin
require 'rubygems'
gem "activerecord-#{adapter}-adapter"
require "active_record/connection_adapters/#{adapter}_adapter"
rescue LoadError
begin
require "active_record/connection_adapters/#{adapter}_adapter"
rescue LoadError
raise "Please install the #{adapter} adapter: `gem install activerecord-#{adapter}-adapter` (#{$!})"
end
end
end
def connection_name_builder
@connection_name_builder ||= begin
clauses = []
clauses << @prefix if @prefix
clauses << @shard_group if @shard_group
clauses << StringProxy.new { DataFabric.active_shard(@shard_group) } if @shard_group
clauses << RAILS_ENV
clauses << StringProxy.new { current_role } if @replicated
clauses
end
end
def set_role(role)
Thread.current[:data_fabric_role] = role
end
def current_role
Thread.current[:data_fabric_role] || 'slave'
end
def master
with_master { return connection }
end
end
end