Skip to content
This repository has been archived by the owner on Jan 26, 2024. It is now read-only.

Commit

Permalink
Fixing transaction boundaries to avoid locking db when the target hos…
Browse files Browse the repository at this point in the history
…t takes a long time to respond. Code tidy
  • Loading branch information
codebox committed Nov 18, 2011
1 parent a8918bf commit e658a25
Showing 1 changed file with 119 additions and 86 deletions.
205 changes: 119 additions & 86 deletions bmsync/src/bmsync.c
Expand Up @@ -59,11 +59,59 @@ Contains the entry-point for the bmsync command-line utility.

static struct SyncPrefs prefs = {0, 0, NULL, 0, DEFAULT_PORT, NULL, NULL};
static SOCKET doConnect(char* host, char* alias, int port);
static int parseData(SOCKET fd, char* alias, int*);
static int parseData(SOCKET fd, char* alias, time_t ts, int* rowCount);
static struct Data* parseRow(char* row);
static int sendRequest(SOCKET fd, long ts, char* host, int port);
static time_t getMaxTsForHost(char* alias);

static int syncWithHost(char* host, char* alias, int port){
resetStatusMsg();

time_t maxTsForHost = getMaxTsForHost(alias);

// Attempt to connect to the host/port specified
SOCKET sockFd = doConnect(host, alias, port);
int status = (sockFd == 0) ? FAIL : SUCCESS;

if (status == SUCCESS) {
statusMsg(MSG_CONNECTED);

// We're in - send a request for the data
status = sendRequest(sockFd, maxTsForHost, host, port);
if (status == SUCCESS){
// Parse the response
int rowCount;
status = parseData(sockFd, alias, maxTsForHost, &rowCount);

if (status == SUCCESS){
// It all worked ok
statusMsg("%d new row%s", rowCount, (rowCount == 1 ? "" : "s"));

} else {
// Something was wrong with the response
logMsg(LOG_ERR, "unable to parse sync response row");
}

} else {
// Unable to send the request to the remote host
logMsg(LOG_ERR, "unable to send sync request to %s:%d", host, port);
}

#ifdef _WIN32
closesocket(sockFd);
#else
close(sockFd);
#endif

} else {
// Unable to connect
logMsg(LOG_ERR, "failed to connect to %s:%d", host, port);
}
printf("\n");

return status;
}

int main(int argc, char **argv){
printf(COPYRIGHT);
fflush(stdout);
Expand Down Expand Up @@ -93,77 +141,37 @@ int main(int argc, char **argv){
openDb();
dbVersionCheck();
setupDb();

int i;
SOCKET sockFd;
char* host;
char* alias;
int port = prefs.port;

#ifdef _WIN32
WSADATA wsaData;
int rc = WSAStartup(MAKEWORD(2,2), &wsaData);
if (rc != 0) {
logMsg(LOG_ERR, "WSAStartup returned error %d", rc);
exit(1);
status = FAIL;
}
#endif

for (i=0; i<prefs.hostCount; i++) {
resetStatusMsg();
host = prefs.hosts[i];
alias = (prefs.alias == NULL) ? host : prefs.alias;

// We want one transaction per host, insertion of all the data for a single host must be atomic
beginTrans(TRUE);

time_t ts = getMaxTsForHost(alias);

// Attempt to connect to the host/port specified
sockFd = doConnect(host, alias, port);
if (sockFd != FAIL){
statusMsg(MSG_CONNECTED);

// We're in - send a request for the data
int sendResult = sendRequest(sockFd, ts, host, port);
if (sendResult == SUCCESS){
// Parse the response
int rowCount;
int parseResult = parseData(sockFd, alias, &rowCount);

if (parseResult == SUCCESS){
// It all worked ok, commit the transaction
commitTrans();
statusMsg("%d new row%s", rowCount, (rowCount == 1 ? "" : "s"));

} else {
// Something was wrong with the response, end the transaction
logMsg(LOG_ERR, "unable to parse sync response row");
rollbackTrans();
}

} else {
// Unable to send the request to the remote host, end the transaction
logMsg(LOG_ERR, "unable to send sync request to %s:%d", host, port);
rollbackTrans();
}
close(sockFd);

} else {
// Unable to connect, end the transaction
logMsg(LOG_ERR, "failed to connect to %s:%d", host, port);
rollbackTrans();
}
printf("\n");
}

if (status != FAIL){
for (i=0; i<prefs.hostCount; i++) {
char* host = prefs.hosts[i];
char* alias = (prefs.alias == NULL) ? host : prefs.alias;

int statusForThisHost = syncWithHost(host, alias, port);
if (statusForThisHost != SUCCESS){
status = FAIL; // Carry on even if one host fails
}
}
}
closeDb();
}

#ifdef _WIN32
WSACleanup();
#endif

return 0;
return (status == FAIL) ? 1 : 0;
}

static time_t getMaxTsForHost(char* alias){
Expand Down Expand Up @@ -253,67 +261,92 @@ static int readLine(SOCKET fd, char* line){
return FALSE;
}

static int parseData(SOCKET fd, char* alias, int* rowCount){
// Handle the response that is returned from the host
int result = SUCCESS;
char line[MAX_LINE_LEN + 1];

// First read all the HTTP headers
while(readLine(fd, line)) {
static int checkHttpHeaders(SOCKET fd){
char line[MAX_LINE_LEN + 1];

while(readLine(fd, line)) {
if (startsWith(line, "HTTP")) {
// Check the server returned an HTTP 200 response code
char responseCode[4];
sscanf(line, "%*s %s %*s", responseCode);
if (strcmp("200", responseCode) != 0){
statusMsg("Bad HTTP response code: %s", responseCode);
result = FAIL;
break;
return FAIL;
}

} else if (startsWith(line, HEADER_CONTENT_TYPE)) {
// Check that the content-type is what we expect
if (strstr(line, SYNC_CONTENT_TYPE) == NULL){
statusMsg("Bad content type: %s", line);
result = FAIL;
break;
return FAIL;
}

} else if (startsWith(line, HTTP_EOL)) {
// Reached the end of the headers
break;
}
}

return SUCCESS;
}

static int parseData(SOCKET fd, char* alias, time_t prevMaxTsForHost, int* rowCount){
// Handle the response that is returned from the host
char line[MAX_LINE_LEN + 1];

// First read all the HTTP headers
int status = checkHttpHeaders(fd);

if (result == FAIL){
if (status == FAIL){
// There was a problem with the headers, so stop now
return FAIL;
}

// Read the data one row at a time
struct Data* data;
int resultCount = 0, rc;

while(readLine(fd, line)) {
data = parseRow(line);
if (data == NULL){
statusMsg("Malformed data returned from host");
result = FAIL;
break;
}

setHost(data, alias);
rc = insertData(data);
if (rc == FAIL){
statusMsg("Unable to insert data into local db");
result = FAIL;
break;
}
freeData(data);
resultCount++;
// We want one transaction per host, insertion of all the data for a single host must be atomic
beginTrans(TRUE);

/* There is a small chance that something added rows for the host we are currently syncing with
since we last checked the database (which was before the sync request was sent). Check the db again
within the same transaction that we will use to insert the new values, and bail out if things have
changed. */
time_t currMaxTsForHost = getMaxTsForHost(alias);
if (currMaxTsForHost != prevMaxTsForHost){
statusMsg("Data for host %s has changed in local database during sync - aborting", alias);
status = FAIL;
} else {
// Read the data one row at a time
while(readLine(fd, line)) {
data = parseRow(line);
if (data == NULL){
statusMsg("Malformed data returned from host");
status = FAIL;
break;
}

setHost(data, alias);
rc = insertData(data);
if (rc == FAIL){
statusMsg("Unable to insert data into local db");
status = FAIL;
break;
}
freeData(data);
resultCount++;
}
}

if (status == SUCCESS){
commitTrans();
} else {
rollbackTrans();
}

*rowCount = resultCount;

return result;
return status;
}

static SOCKET doConnect(char* host, char* alias, int port){
Expand Down

0 comments on commit e658a25

Please sign in to comment.