Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
Changed IO to use socket objects recv and send functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonas Trnstrm committed Jan 26, 2012
1 parent 05a0c1d commit 85f855f
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 1,298 deletions.
66 changes: 22 additions & 44 deletions Connection.cpp
Expand Up @@ -79,7 +79,6 @@ Connection::Connection (UMConnectionCAPI *_capi)

m_timeout = -1;
m_state = NONE;
m_sockfd = -1;
m_sockInst = NULL;
m_errno = -1;
memcpy (&m_capi, _capi, sizeof (UMConnectionCAPI));
Expand Down Expand Up @@ -141,22 +140,20 @@ bool Connection::readSocket()
if (bytesToRecv == 0)
{
// Socket buffer got full!

setError("Socket receive buffer full", 0, UME_OTHER);
return false;
}
else
if (bytesToRecv > 65536)
{
bytesToRecv = 65536;
}


int recvResult = recv (m_sockfd, m_reader.getWritePtr(), bytesToRecv, MSG_NOSIGNAL);
int recvResult = m_capi.recvSocket(m_sockInst, m_reader.getWritePtr(), bytesToRecv);

if (recvResult == -1)
{
if (SocketWouldBlock(m_sockfd))
{
return true;
}

int sockError = SocketGetLastError();
setError("Socket receive failed", sockError, UME_OTHER);
return false;
}
else
Expand All @@ -178,17 +175,16 @@ bool Connection::writeSocket()
assert (bytesToSend > 0);
assert (bytesToSend < m_writer.getEnd() - m_writer.getStart());

int sendResult = send (m_sockfd, m_writer.getReadCursor(), bytesToSend, MSG_NOSIGNAL);
int sendResult = m_capi.sendSocket(m_sockInst, m_writer.getReadCursor(), bytesToSend);

if (sendResult == -1)
{
if (SocketWouldBlock(m_sockfd))
{
return true;
}

PRINTMARK();
setError("Socket send failed", SocketGetLastError(), UME_OTHER);
return false;
}
else
if (sendResult == 0)
{
setError("Connection reset by peer when receiving", 0, UME_OTHER);
return false;
}

Expand All @@ -212,17 +208,16 @@ bool Connection::close(void)

if (!sendPacket())
{
m_capi.clearException();
m_capi.clearException();
}
}

if (m_sockInst)
{
m_capi.closeSocket(m_sockInst);
m_capi.clearException();
m_capi.deleteSocket(m_sockInst);
m_capi.clearException();
m_sockInst = NULL;
m_capi.closeSocket(m_sockInst);
m_capi.clearException();
m_capi.deleteSocket(m_sockInst);
m_sockInst = NULL;
return true;
}
}
Expand All @@ -232,7 +227,6 @@ bool Connection::close(void)

bool Connection::connectSocket()
{
PRINTMARK();
if (!m_capi.connectSocket(m_sockInst, m_host.c_str(), m_port))
{
return false;
Expand Down Expand Up @@ -365,13 +359,6 @@ bool Connection::recvPacket()
{
break;
}

time_t tsStart = time (0);

if (!m_capi.wouldBlock(m_sockInst, m_sockfd, UMC_READ, m_timeout == -1 ? 10 : m_timeout))
{
return false;
}
}

return true;
Expand All @@ -396,12 +383,7 @@ bool Connection::sendPacket()
{
break;
}

if (!m_capi.wouldBlock(m_sockInst, m_sockfd, UMC_WRITE, m_timeout == -1 ? 10 : m_timeout))
{
return false;
}
}
}

return true;
}
Expand Down Expand Up @@ -477,8 +459,8 @@ bool Connection::connect(const char *_host, int _port, const char *_username, co
m_charset = _charset;

PRINTMARK();
m_sockInst = m_capi.createSocket(AF_INET, SOCK_STREAM, 0);
m_sockInst = m_capi.getSocket();
if (m_sockInst == NULL)
{
m_dbgMethodProgress --;
Expand All @@ -494,10 +476,6 @@ bool Connection::connect(const char *_host, int _port, const char *_username, co
}
}

PRINTMARK();
m_sockfd = m_capi.getSocketFD(m_sockInst);


if (!connectSocket())
{
m_dbgMethodProgress --;
Expand Down
1 change: 0 additions & 1 deletion Connection.h
Expand Up @@ -94,7 +94,6 @@ class Connection
std::string m_database;
bool m_autoCommit;
MYSQL_CHARSETS m_charset;
SOCKET m_sockfd;
void *m_sockInst;
PacketReader m_reader;
PacketWriter m_writer;
Expand Down
93 changes: 38 additions & 55 deletions io_cpython.c
Expand Up @@ -85,7 +85,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//#define PRINTMARK() fprintf(stderr, "%s: MARK(%d)\n", __FILE__, __LINE__)
#define PRINTMARK()

void *API_createSocket(int family, int type, int proto)
void *API_getSocket()
{
/* Create a normal socket */
PyObject *sockobj;
Expand Down Expand Up @@ -169,35 +169,6 @@ int API_setTimeout(void *sock, int timeoutSec)

}


int API_getSocketFD(void *sock)
{
int ret;
PyObject *fdobj;
PRINTMARK();

fdobj = PyObject_CallMethod ((PyObject *) sock, "fileno", NULL);
PRINTMARK();

if (fdobj == NULL)
{
PRINTMARK();
return -1;
}

if (!PyInt_Check(fdobj))
{
Py_XDECREF(fdobj);
PRINTMARK();
return -1;
}

ret = PyInt_AS_LONG(fdobj);

Py_DECREF(fdobj);
return ret;
}

void API_closeSocket(void *sock)
{
PyObject *res = PyObject_CallMethod( (PyObject *) sock, "close", NULL);
Expand All @@ -220,7 +191,6 @@ int API_connectSocket(void *sock, const char *host, int port)
{
PyObject *res;
PyObject *addrTuple;
PyObject *argList;
PyObject *connectStr;

PRINTMARK();
Expand All @@ -247,37 +217,50 @@ int API_connectSocket(void *sock, const char *host, int port)
return 1;
}

int API_wouldBlock(void *sock, int fd, int ops, int timeout)
int API_recvSocket(void *sock, char *buffer, int cbBuffer)
{
struct timeval tv;
int result;
fd_set readSet;
fd_set writeSet;

tv.tv_sec = timeout;
tv.tv_usec = 0;
PyObject *res;
PyObject *bufSize;
PyObject *funcStr;
int ret;

FD_ZERO(&writeSet);
FD_ZERO(&readSet);
funcStr = PyString_FromString("recv");
bufSize = PyInt_FromLong(cbBuffer);
res = PyObject_CallMethodObjArgs ((PyObject *) sock, funcStr, bufSize, NULL);
Py_DECREF(funcStr);
Py_DECREF(bufSize);

switch (ops)
if (res == NULL)
{
case AMC_READ:
FD_SET (fd, &readSet);
break;

case AMC_WRITE:
FD_SET (fd, &writeSet);
break;
return -1;
}
Py_BEGIN_ALLOW_THREADS
result = select (fd + 1, &readSet, &writeSet, NULL, &tv);
Py_END_ALLOW_THREADS

if (result < 1)
ret = (int) PyString_GET_SIZE(res);
memcpy (buffer, PyString_AS_STRING(res), ret);
Py_DECREF(res);
return ret;
}

int API_sendSocket(void *sock, const char *buffer, int cbBuffer)
{
PyObject *res;
PyObject *pybuffer;
PyObject *funcStr;
int ret;

funcStr = PyString_FromString("send");
pybuffer = PyString_FromStringAndSize(buffer, cbBuffer);
res = PyObject_CallMethodObjArgs ((PyObject *) sock, funcStr, pybuffer, NULL);
Py_DECREF(funcStr);
Py_DECREF(pybuffer);

if (res == NULL)
{
return 0;
return -1;
}

return 1;
ret = (int) PyInt_AsLong(res);
Py_DECREF(res);
return ret;
}

0 comments on commit 85f855f

Please sign in to comment.