This repository has been archived by the owner on Oct 13, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 21
/
connections.py
174 lines (147 loc) · 5.49 KB
/
connections.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import sys
import time
import datetime
import socket
import umysql
import pymysql.connections
from pymysql.constants import FIELD_TYPE
from .util import setdocstring
from .cursors import Cursor
from .err import (
map_umysql_error_to_umysqldb_exception,
map_runtime_error_to_umysqldb_exception,
Error,
OperationalError,
)
from .times import (
encode_struct_time,
encode_timedelta,
encode_time,
TimeDelta_or_None,
)
encoders = {
time.struct_time: encode_struct_time,
datetime.timedelta: encode_timedelta,
datetime.time: encode_time,
}
decoders = {
FIELD_TYPE.TIME: TimeDelta_or_None,
FIELD_TYPE.NEWDECIMAL: float,
}
def notouch(x):
return x
class ResultSet(object):
def __init__(self, affected_rows=None, insert_id=None, description=None,
rows=None):
self.affected_rows = affected_rows
self.insert_id = insert_id
self.description = description
self.rows = rows
class Connection(pymysql.connections.Connection):
"""MySQL Database Connection Object"""
@setdocstring(pymysql.connections.Connection.__init__)
def __init__(self, *args, **kwargs):
if 'cursorclass' not in kwargs:
kwargs['cursorclass'] = Cursor
if 'conv' not in kwargs:
kwargs['conv'] = decoders
if 'charset' not in kwargs:
kwargs['charset'] = 'utf8'
self._umysql_conn = umysql.Connection()
super(Connection, self).__init__(*args, **kwargs)
@setdocstring(pymysql.connections.Connection.set_charset)
def set_charset(self, charset):
if charset:
self._umysql_conn.query("SET NAMES %s", (charset,))
self.charset = charset
@setdocstring(pymysql.connections.Connection.autocommit)
def autocommit(self, value):
self._umysql_conn.query("SET AUTOCOMMIT = %s", (value,))
@setdocstring(pymysql.connections.Connection.commit)
def commit(self):
self.query('COMMIT')
@setdocstring(pymysql.connections.Connection.rollback)
def rollback(self):
self.query("ROLLBACK")
@setdocstring(pymysql.connections.Connection.close)
def close(self):
if not self._umysql_conn.is_connected():
raise Error("Already closed")
self._umysql_conn.close()
def _connect(self):
try:
self._umysql_conn.connect(self.host, self.port, self.user,
self.password, self.db or '', False, self.charset)
if self.sql_mode is not None:
c = self.cursor()
c.execute("SET sql_mode=%s", (self.sql_mode,))
if self.init_command is not None:
c = self.cursor()
c.execute(self.init_command)
self.commit()
if self.autocommit_mode is not None:
self.autocommit(self.autocommit_mode)
except socket.error, e:
raise OperationalError(2003, "Can't connect to MySQL server on %r (%s)" % (
self.host, e.args[0]))
except umysql.Error, exc:
traceback = sys.exc_info()[2]
exc = map_umysql_error_to_umysqldb_exception(exc)
raise exc, None, traceback
# internal use only (called from cursor)
def query(self, sql, args=()):
args = self._convert_args(args)
try:
result_set = self._umysql_conn.query(sql, args)
except umysql.Error, exc:
traceback = sys.exc_info()[2]
exc = map_umysql_error_to_umysqldb_exception(exc)
raise exc, None, traceback
except RuntimeError, exc:
traceback = sys.exc_info()[2]
exc = map_runtime_error_to_umysqldb_exception(exc)
raise exc, None, traceback
else:
self._result = self._convert_result_set(result_set)
return self._result.affected_rows
def _convert_args(self, args):
args = tuple(encoders.get(type(arg), notouch)(arg)
for arg in args)
return args
def _convert_result_set(self, result_set):
if isinstance(result_set, tuple):
rs = ResultSet(affected_rows=result_set[0],
insert_id=result_set[1])
else:
converters = [self.decoders.get(field[1]) for field in
result_set.fields]
rows = tuple(tuple(conv(data) if conv and data is not None else data
for data, conv in zip(row, converters))
for row in result_set.rows)
description = tuple(f + (None,) * (7 - len(f))
for f in result_set.fields)
rs = ResultSet(description=description, rows=rows,
affected_rows=len(rows))
return rs
# _mysql support
def get_proto_info(self):
raise NotImplementedError("umysql has no proto info")
def get_server_info(self):
raise NotImplementedError("umysql has no server info")
def thread_id(self):
raise NotImplementedError("umysql has no thread info")
def ping(self, reconnect=False):
flag = False
if self._umysql_conn.is_connected():
try:
self._umysql_conn.query("select 1;")
except:
self._umysql_conn.close()
flag = True
else:
flag = True
if flag:
if reconnect:
self._connect()
else:
raise OperationalError(2006, 'MySQL server has gone away')