Permalink
Browse files

Socket connection prototype

  • Loading branch information...
gegao committed Nov 18, 2013
1 parent 690cd1d commit da85e0f6cab9e3bbc19adb880360cdff107c4dda
Showing with 131 additions and 76 deletions.
  1. +131 −76 src/frontend/edu/brown/hstore/HStoreJVMSnapshotManager.java
@@ -1,7 +1,7 @@
package edu.brown.hstore;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.io.*;
import java.net.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
@@ -64,6 +64,11 @@
private TransactionResponse response;
private BlockingQueue<LocalTransaction> queue;
private ServerSocket serverSocket;
private DataOutputStream out;
private DataInputStream in;
// ----------------------------------------------------------------------------
// INITIALIZATION
@@ -90,6 +95,14 @@ public HStoreJVMSnapshotManager(HStoreSite hstore_site) {
hstore_site.getSiteId(), hstore_site.getLocalPartitionIds()));
// Incoming RPC Handler
serverSocket = null;
Integer local_port = null;
try {
local_port = this.catalog_site.getJVMSnapshot_port();
serverSocket = new ServerSocket(local_port);
} catch (IOException e) {
LOG.info("Could not listen on port: "+local_port);
}
}
@@ -114,27 +127,6 @@ public boolean isParent() {
return isParent;
}
// ----------------------------------------------------------------------------
// LISTENER THREAD
// ----------------------------------------------------------------------------
private class ListenerThread implements Runnable {
@Override
public void run() {
try {
if (debug.val)
LOG.debug("Parent start listening");
eventLoop.run();
} catch (Throwable ex) {
if (debug.val)
LOG.debug("ListenerThread error", ex);
}
if (debug.val)
LOG.debug("Never reach here");
}
}
/**
* Fork a new snapshot. This is a blocking call that will initialize the
* snapshot and set up the connection!
@@ -158,6 +150,7 @@ private boolean forkNewSnapShot() {
LOG.debug("Fork Child process " + pid);
snapshot_pid = pid;
// Connect to the child snapshot.
/*
InetSocketAddress destinationAddress = new InetSocketAddress(
this.catalog_site.getHost().getIpaddr(),
this.catalog_site.getJVMSnapshot_port());
@@ -200,6 +193,7 @@ private boolean forkNewSnapShot() {
if (debug.val)
LOG.debug("Site #" + this.getLocalSiteId()
+ " is connected to the new JVM snapshot");
*/
return true;
} else {
// child process
@@ -218,6 +212,40 @@ private boolean forkNewSnapShot() {
} catch (InterruptedException e) {
e.printStackTrace();
}
Socket kkSocket = null;
try {
int local_port = this.catalog_site.getJVMSnapshot_port();
kkSocket = new Socket("localhost", local_port);
out = new DataOutputStream(kkSocket.getOutputStream());
in = new DataInputStream(kkSocket.getInputStream());
while (true) {
int len = in.readInt();
if (len == 0) break;
byte[] barr = new byte[len];
in.readFully(barr);
FastDeserializer des = new FastDeserializer(barr);
LocalTransaction ts = new LocalTransaction(hstore_site);
try {
ts.readExternal(des);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
hstore_site.transactionQueue(ts);
}
in.close();
out.close();
kkSocket.close();
} catch (UnknownHostException e) {
System.exit(1);
} catch (IOException e) {
System.exit(1);
}
/*
// Initialize listener
this.eventLoop = new NIOEventLoop();
@@ -244,6 +272,7 @@ private boolean forkNewSnapShot() {
if (debug.val)
LOG.debug("Never reach here");
System.exit(-1);
*/
return false;
}
@@ -258,50 +287,6 @@ public void addTransactionRequest(LocalTransaction ts) {
}
}
// ----------------------------------------------------------------------------
// HSTORE RPC SERVICE METHODS
// ----------------------------------------------------------------------------
public void execTransactionRequest(LocalTransaction ts) {
if (debug.val)
LOG.debug("Send execTransactionRequest to the snapshot;");
if (!isParent)
return;
if (snapshot_pid == 0 || refresh == true) {
stopSnapshot();
if (!forkNewSnapShot()) {
stopSnapshot();
hstore_site.responseError(
ts.getClientHandle(),
Status.ABORT_CONNECTION_LOST,
"Forking Snapshot fails",
ts.getClientCallback(),
ts.getInitiateTime());
return;
};
refresh = false;
}
ByteString bs = ByteString.EMPTY;
try {
bs = ByteString.copyFrom(FastSerializer.serialize(ts));
} catch (IOException e) {
e.printStackTrace();
}
TransactionRequest tr = TransactionRequest.newBuilder().setRequest(bs)
.build();
if (debug.val)
LOG.debug("Send execTransactionRequest to the snapshot;");
JVMSnapshotTransactionCallback callback = new JVMSnapshotTransactionCallback(
hstore_site, ts);
ProtoRpcController rpc = new ProtoRpcController();
channel.execTransactionRequest(rpc, tr, callback);
rpc.block();
if (debug.val)
LOG.debug("Send finish;");
}
// ----------------------------------------------------------------------------
// HSTORE SNAPSHOT RPC SERVICE METHODS
// ----------------------------------------------------------------------------
@@ -363,7 +348,15 @@ public void sendResponseToParent(ClientResponseImpl response) {
} catch (IOException e) {
e.printStackTrace();
}
this.response = TransactionResponse.newBuilder().setOutput(bs).build();
//this.response = TransactionResponse.newBuilder().setOutput(bs).build();
try {
out.writeInt(bs.size());
out.write(bs.toByteArray());
out.flush();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (debug.val)
LOG.debug("Generate response "
+ this.response.toString());
@@ -425,17 +418,79 @@ public void run() {
e.printStackTrace();
continue;
}
this.execTransactionRequest(ts);
/*
synchronized (this) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (snapshot_pid == 0 || refresh == true) {
stopSnapshot();
if (!forkNewSnapShot()) {
stopSnapshot();
hstore_site.responseError(
ts.getClientHandle(),
Status.ABORT_CONNECTION_LOST,
"Forking Snapshot fails",
ts.getClientCallback(),
ts.getInitiateTime());
return;
};
refresh = false;
}
ByteString bs = ByteString.EMPTY;
try {
bs = ByteString.copyFrom(FastSerializer.serialize(ts));
} catch (IOException e) {
e.printStackTrace();
}
/*
TransactionRequest tr = TransactionRequest.newBuilder().setRequest(bs)
.build();
if (debug.val)
LOG.debug("Send execTransactionRequest to the snapshot;");
JVMSnapshotTransactionCallback callback = new JVMSnapshotTransactionCallback(
hstore_site, ts);
ProtoRpcController rpc = new ProtoRpcController();
channel.execTransactionRequest(rpc, tr, callback);
rpc.block();
if (debug.val)
LOG.debug("Send finish;");
*/
Socket clientSocket = null;
try {
clientSocket = serverSocket.accept();
} catch (IOException e) {
System.err.println("Accept failed.");
System.exit(1);
}
DataOutputStream out;
try {
out = new DataOutputStream(clientSocket.getOutputStream());
DataInputStream in = new DataInputStream(
clientSocket.getInputStream());
out.writeInt(bs.size());
out.write(bs.toByteArray());
out.flush();
int len = in.readInt();
byte[] barr = new byte[len];
in.readFully(barr);
FastDeserializer des = new FastDeserializer(barr);
ClientResponseImpl response = new ClientResponseImpl();
try {
response.readExternal(des);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (debug.val) LOG.debug("Msg: "+response.toString());
this.hstore_site.responseSend(ts, response);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

0 comments on commit da85e0f

Please sign in to comment.