-
Notifications
You must be signed in to change notification settings - Fork 155
/
cassandra.rb
165 lines (140 loc) · 3.75 KB
/
cassandra.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
class Cassandra
def self.DEFAULT_TRANSPORT_WRAPPER
Thrift::FramedTransport
end
def login!(username, password)
@auth_request = CassandraThrift::AuthenticationRequest.new
@auth_request.credentials = {'username' => username, 'password' => password}
client.login(@auth_request)
end
def keyspace=(ks)
client.set_keyspace(ks) if check_keyspace(ks)
@schema = nil; @keyspace = ks
end
def keyspaces
client.describe_keyspaces.to_a.collect {|ksdef| ksdef.name }
end
def schema(load=true)
if !load && !@schema
Cassandra::Keyspace.new
else
@schema ||= client.describe_keyspace(@keyspace)
end
end
def schema_agreement?
client.describe_schema_versions().length == 1
end
def version
client.describe_version()
end
def cluster_name
@cluster_name ||= client.describe_cluster_name()
end
def ring
client.describe_ring(@keyspace)
end
def partitioner
client.describe_partitioner()
end
## Delete
# Remove all rows in the column family you request.
def truncate!(column_family)
#each_key(column_family) do |key|
# remove(column_family, key, options)
#end
client.truncate(column_family)
end
# Remove all rows in the keyspace.
def clear_keyspace!
schema.cf_defs.each { |cfdef| truncate!(cfdef.name) }
end
### Read
def add_column_family(cf_def)
begin
res = client.system_add_column_family(cf_def)
rescue CassandraThrift::TimedOutException => te
puts "Timed out: #{te.inspect}"
end
@schema = nil
res
end
def drop_column_family(cf_name)
begin
res = client.system_drop_column_family(cf_name)
rescue CassandraThrift::TimedOutException => te
puts "Timed out: #{te.inspect}"
end
@schema = nil
res
end
def rename_column_family(old_name, new_name)
begin
res = client.system_rename_column_family(old_name, new_name)
rescue CassandraThrift::TimedOutException => te
puts "Timed out: #{te.inspect}"
end
@schema = nil
res
end
def add_keyspace(ks_def)
begin
res = client.system_add_keyspace(ks_def)
rescue CassandraThrift::TimedOutException => toe
puts "Timed out: #{toe.inspect}"
rescue Thrift::TransportException => te
puts "Timed out: #{te.inspect}"
end
@keyspaces = nil
res
end
def drop_keyspace(ks_name)
begin
res = client.system_drop_keyspace(ks_name)
rescue CassandraThrift::TimedOutException => toe
puts "Timed out: #{toe.inspect}"
rescue Thrift::TransportException => te
puts "Timed out: #{te.inspect}"
end
keyspace = "system" if ks_name.eql?(@keyspace)
@keyspaces = nil
res
end
def rename_keyspace(old_name, new_name)
begin
res = client.system_rename_keyspace(old_name, new_name)
rescue CassandraThrift::TimedOutException => toe
puts "Timed out: #{toe.inspect}"
rescue Thrift::TransportException => te
puts "Timed out: #{te.inspect}"
end
keyspace = new_name if old_name.eql?(@keyspace)
@keyspaces = nil
res
end
protected
def client
if @client.nil? || @client.current_server.nil?
reconnect!
@client.set_keyspace(@keyspace) if check_keyspace
end
@client
end
def reconnect!
@servers = all_nodes
@client = new_client
end
def check_keyspace(ks = @keyspace)
!(unless (_keyspaces = keyspaces()).include?(ks)
raise AccessError, "Keyspace #{ks.inspect} not found. Available: #{_keyspaces.inspect}"
end)
end
def all_nodes
if @auto_discover_nodes && !@keyspace.eql?("system")
ips = (new_client.describe_ring(@keyspace).map {|range| range.endpoints}).flatten.uniq
port = @servers.first.split(':').last
ips.map{|ip| "#{ip}:#{port}" }
else
@servers
end
end
end