Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 82 lines (68 sloc) 3.066 kb
2656717 @fiorix added support for unix socket
authored
1 # coding: utf-8
2 # Copyright 2009 Alexandre Fiori
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
62e0a85 @fiorix support for travis-ci
authored
16 import os
17
2656717 @fiorix added support for unix socket
authored
18 import txredisapi as redis
19
20 from twisted.internet import base
21 from twisted.internet import defer
22 from twisted.trial import unittest
23
24 base.DelayedCall.debug = False
25 redis_sock = "/tmp/redis.sock"
26
62e0a85 @fiorix support for travis-ci
authored
27
90b473f @jeethu Skip tests without hacks
jeethu authored
28 class TestUnixConnectionMethods(unittest.TestCase):
2656717 @fiorix added support for unix socket
authored
29 @defer.inlineCallbacks
30 def test_UnixConnection(self):
31 db = yield redis.UnixConnection(redis_sock, reconnect=False)
32 self.assertEqual(isinstance(db, redis.UnixConnectionHandler), True)
33 yield db.disconnect()
34
35 @defer.inlineCallbacks
36 def test_UnixConnectionDB1(self):
37 db = yield redis.UnixConnection(redis_sock, dbid=1, reconnect=False)
38 self.assertEqual(isinstance(db, redis.UnixConnectionHandler), True)
39 yield db.disconnect()
40
41 @defer.inlineCallbacks
42 def test_UnixConnectionPool(self):
43 db = yield redis.UnixConnectionPool(redis_sock, poolsize=2,
44 reconnect=False)
45 self.assertEqual(isinstance(db, redis.UnixConnectionHandler), True)
46 yield db.disconnect()
47
48 @defer.inlineCallbacks
49 def test_lazyUnixConnection(self):
50 db = redis.lazyUnixConnection(redis_sock, reconnect=False)
51 self.assertEqual(isinstance(db._connected, defer.Deferred), True)
52 db = yield db._connected
53 self.assertEqual(isinstance(db, redis.UnixConnectionHandler), True)
54 yield db.disconnect()
55
56 @defer.inlineCallbacks
57 def test_lazyUnixConnectionPool(self):
58 db = redis.lazyUnixConnectionPool(redis_sock, reconnect=False)
59 self.assertEqual(isinstance(db._connected, defer.Deferred), True)
60 db = yield db._connected
61 self.assertEqual(isinstance(db, redis.UnixConnectionHandler), True)
62 yield db.disconnect()
63
64 @defer.inlineCallbacks
65 def test_ShardedUnixConnection(self):
66 paths = [redis_sock]
67 db = yield redis.ShardedUnixConnection(paths, reconnect=False)
68 self.assertEqual(isinstance(db,
69 redis.ShardedUnixConnectionHandler), True)
70 yield db.disconnect()
71
72 @defer.inlineCallbacks
73 def test_ShardedUnixConnectionPool(self):
74 paths = [redis_sock]
75 db = yield redis.ShardedUnixConnectionPool(paths, reconnect=False)
76 self.assertEqual(isinstance(db,
77 redis.ShardedUnixConnectionHandler), True)
78 yield db.disconnect()
90b473f @jeethu Skip tests without hacks
jeethu authored
79
80 if not os.path.exists(redis_sock):
81 TestUnixConnectionMethods.skip = True
Something went wrong with that request. Please try again.