Skip to content

Commit

Permalink
repaired auto-init geo
Browse files Browse the repository at this point in the history
  • Loading branch information
jnorthrup committed May 20, 2012
1 parent e50fa4b commit 3c082ba
Showing 1 changed file with 159 additions and 166 deletions.
325 changes: 159 additions & 166 deletions rxf-server/src/main/java/rxf/server/GeoIpService.java
Expand Up @@ -228,10 +228,16 @@ public Map call() throws Exception {
}

public static String getBlobPutString(String fn, int limit, String ctype, String revision) {
return new StringBuilder().append("PUT ").append(fn).append("?rev=").append(revision)
final StringBuilder append = new StringBuilder().append("PUT ").append(fn);
if (revision != null && !revision.equals("null")) {

append.append("?rev=").append(revision);
}
append
.append(" HTTP/1.1\r\nContent-Length: ").append(limit)
.append("\r\nContent-Type: ").append(ctype)
.append("\r\nExpect: 100-continue\r\nAccept: */*\r\n\r\n").toString();
.append("\r\nExpect: 100-continue\r\nAccept: */*\r\n\r\n");
return append.toString();
}

static Pair<ByteBuffer, ByteBuffer> buildGeoIpSecondPass(Triple<Integer[], ByteBuffer, ByteBuffer> triple) throws UnknownHostException {
Expand Down Expand Up @@ -432,203 +438,190 @@ static void startGeoIpService(final String dbinstance) throws IOException, XPath
public void onRead(final SelectionKey key) throws IOException, InterruptedException {
final AsioVisitor parent = this;
final SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer dst = ByteBuffer.allocateDirect(BlobAntiPatternObject.getReceiveBufferSize());
int read = channel.read(dst);
dst.flip();
System.err.println("response: " + UTF8.decode((ByteBuffer) dst.duplicate().rewind()));
while (!Character.isWhitespace(dst.get())) ;
ByteBuffer d2 = dst.duplicate();
while (!Character.isWhitespace(dst.get())) ;
d2.limit(dst.position());
String s1 = UTF8.decode(d2).toString().trim();
int resultCode = Integer.parseInt(s1);

switch (resultCode) {
case 200:
case 201: {

final String keyDocument = GEOIP_ROOTNODE;

key.selector().wakeup();
key.interestOps(OP_WRITE).attach(new Impl() {


@Override
public void onWrite(final SelectionKey key) {

try {
String format = (MessageFormat.format("GET /{0} HTTP/1.1\r\n\r\n", keyDocument));
System.err.println("attempting connect: " + format.trim());
channel.write(UTF8.encode(format));
} catch (IOException e) {
e.printStackTrace(); //todo: verify for a purpose
}
key.attach(BlobAntiPatternObject.createJsonResponseReader(retVal));
key.selector().wakeup();
key.interestOps(OP_READ);
ByteBuffer cursor = ByteBuffer.allocateDirect(BlobAntiPatternObject.getReceiveBufferSize());
int read = channel.read(cursor);
final Rfc822HeaderPrefix apply = new Rfc822HeaderPrefix(new String[]{"Etag"}).apply((ByteBuffer) cursor.flip());
if (apply.getRescode().startsWith("20")) {

final String keyDocument = GEOIP_ROOTNODE;

key.selector().wakeup();
key.interestOps(OP_WRITE).attach(new Impl() {


@Override
public void onWrite(final SelectionKey key) {

try {
String format = (MessageFormat.format("GET /{0} HTTP/1.1\r\n\r\n", keyDocument));
System.err.println("attempting connect: " + format.trim());
channel.write(UTF8.encode(format));
} catch (IOException e) {
e.printStackTrace(); //todo: verify for a purpose
}
});
key.attach(BlobAntiPatternObject.createJsonResponseReader(retVal));
key.selector().wakeup();
key.interestOps(OP_READ);
}
});

Callable<Object> callable = new Callable<Object>() {
public Object call() throws Exception {
Callable<Object> callable = new Callable<Object>() {
public Object call() throws Exception {

String take = retVal.take();
key.attach(this);
System.err.println("rootnode: " + take);
Map map = GSON.fromJson(take, Map.class);
String take = retVal.take();
key.attach(this);
System.err.println("rootnode: " + take);
Map map = GSON.fromJson(take, Map.class);

//happens if we need to create the 'current' geoip database.
if (map.containsKey("responseCode") || null != System.getenv(DEBUG_CREATEGEOIPINDEX)) {
createGeoIpIndex();
}
//happens if we need to create the 'current' geoip database.
if (map.containsKey("error") || null != System.getenv(DEBUG_CREATEGEOIPINDEX)) {
createGeoIpIndex();
}
// happens every time we start
{
{
// ArrayList<Callable<MappedByteBuffer>> cc = new ArrayList<Callable<MappedByteBuffer>>();
/*cc.add*/
indexMMBuf =
EXECUTOR_SERVICE.submit(
getMappedIndexFile(GEOIP_CURRENT_INDEX)).get();
locationMMBuf = EXECUTOR_SERVICE.submit(
getMappedIndexFile(GEOIP_CURRENT_LOCATIONS_CSV)).get();
/*cc.add*/
indexMMBuf =
EXECUTOR_SERVICE.submit(
getMappedIndexFile(GEOIP_CURRENT_INDEX)).get();
locationMMBuf = EXECUTOR_SERVICE.submit(
getMappedIndexFile(GEOIP_CURRENT_LOCATIONS_CSV)).get();
// List<Future<MappedByteBuffer>> futures = EXECUTOR_SERVICE.invokeAll(cc);

ByteBuffer ix = (ByteBuffer) indexMMBuf.duplicate().clear();
ByteBuffer loc = (ByteBuffer) locationMMBuf.duplicate().clear();
ByteBuffer ix = (ByteBuffer) indexMMBuf.duplicate().clear();
ByteBuffer loc = (ByteBuffer) locationMMBuf.duplicate().clear();

indexMMBuf.clear();
IntBuffer intBuffer = indexMMBuf.asIntBuffer();
while (intBuffer.hasRemaining())
geoipMap.put(intBuffer.get() & IPMASK, intBuffer.get());
indexMMBuf.clear();
IntBuffer intBuffer = indexMMBuf.asIntBuffer();
while (intBuffer.hasRemaining())
geoipMap.put(intBuffer.get() & IPMASK, intBuffer.get());

//this should report 'Martinez'
testWalnutCreek(ix, loc, null, null);
//this should report 'Martinez'
testWalnutCreek(ix, loc, null, null);


if (null != System.getenv(GEOIP_BENCHMARK_ON_STARTUP)) {
for (int i = 0; i < 1000; i++) {
long l = System.currentTimeMillis();
if (null != System.getenv(GEOIP_BENCHMARK_ON_STARTUP)) {
for (int i = 0; i < 1000; i++) {
long l = System.currentTimeMillis();

Runtime.getRuntime().gc();
long l1 = Runtime.getRuntime().freeMemory();
runGeoIpLookupBenchMark((ByteBuffer) loc.clear(), null, null, (ByteBuffer) ix.clear());
long l2 = Runtime.getRuntime().freeMemory();
System.err.println(MessageFormat.format("{0}: {1} (ms) -----------------------------", i, System.currentTimeMillis() - l));
System.err.println(MessageFormat.format("freemem delta current:{0} before{1} delta(Mb):{2}", l2, l1, (l2 - l1) / (1024 * 1024)));
}
Runtime.getRuntime().gc();
long l1 = Runtime.getRuntime().freeMemory();
runGeoIpLookupBenchMark((ByteBuffer) loc.clear(), null, null, (ByteBuffer) ix.clear());
long l2 = Runtime.getRuntime().freeMemory();
System.err.println(MessageFormat.format("{0}: {1} (ms) -----------------------------", i, System.currentTimeMillis() - l));
System.err.println(MessageFormat.format("freemem delta current:{0} before{1} delta(Mb):{2}", l2, l1, (l2 - l1) / (1024 * 1024)));
}


}
return new Pair<ByteBuffer, ByteBuffer>(ix, loc);
}
return new Pair<ByteBuffer, ByteBuffer>(ix, loc);
}
}

Callable<MappedByteBuffer> getMappedIndexFile(final String path) throws IOException {
SocketChannel couchConnection = createCouchConnection();
final SynchronousQueue<MappedByteBuffer> retVal = new SynchronousQueue<MappedByteBuffer>();
Callable<MappedByteBuffer> getMappedIndexFile(final String path) throws IOException {
SocketChannel couchConnection = createCouchConnection();
final SynchronousQueue<MappedByteBuffer> retVal = new SynchronousQueue<MappedByteBuffer>();

Callable<MappedByteBuffer> callable = new Callable<MappedByteBuffer>() {
@Override
public MappedByteBuffer call() throws Exception {
return retVal.take(); //todo: verify for a purpose
}
};
Callable<MappedByteBuffer> callable = new Callable<MappedByteBuffer>() {
@Override
public MappedByteBuffer call() throws Exception {
return retVal.take(); //todo: verify for a purpose
}
};


HttpMethod.enqueue(couchConnection, OP_CONNECT | OP_WRITE, new Impl() {
HttpMethod.enqueue(couchConnection, OP_CONNECT | OP_WRITE, new Impl() {

@Override
public void onWrite(SelectionKey selectionKey) throws Exception {
mapTmpFile(selectionKey, path);
@Override
public void onWrite(SelectionKey selectionKey) throws Exception {
mapTmpFile(selectionKey, path);

}
}

void mapTmpFile(SelectionKey key, final String path) throws IOException {
String req = "GET " + path + " HTTP/1.1\r\n\r\n";
int write = ((SocketChannel) key.channel()).write(UTF8.encode(req));
key.selector().wakeup();
key.interestOps(OP_READ);
key.attach(new Impl() {
@Override
public void onRead(SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer dst1 = ByteBuffer.allocateDirect(BlobAntiPatternObject.getReceiveBufferSize());
int read1 = channel.read(dst1);
final long l2 = System.currentTimeMillis();
void mapTmpFile(SelectionKey key, final String path) throws IOException {
String req = "GET " + path + " HTTP/1.1\r\n\r\n";
int write = ((SocketChannel) key.channel()).write(UTF8.encode(req));
key.selector().wakeup();
key.interestOps(OP_READ);
key.attach(new Impl() {
@Override
public void onRead(SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer dst1 = ByteBuffer.allocateDirect(BlobAntiPatternObject.getReceiveBufferSize());
int read1 = channel.read(dst1);
final long l2 = System.currentTimeMillis();
// System.err.println("response for "+path+": "+UTF8.decode((ByteBuffer) dst1.flip()))


ByteBuffer headers = (ByteBuffer) moveCaretToDoubleEol((ByteBuffer) dst1.flip()).duplicate().flip();
while (!Character.isWhitespace(headers.get())) ;
ByteBuffer h2 = (ByteBuffer) headers.duplicate().position(headers.position());
while (!Character.isWhitespace(headers.get())) ;
h2.limit(headers.position() - 1);
int rc = Integer.parseInt(UTF8.decode(h2).toString().trim());
if (200 == rc) {
Map<String, int[]> hm = HttpHeaders.getHeaders((ByteBuffer) headers.rewind());
int[] ints = hm.get("Content-Length");
String cl = UTF8.decode((ByteBuffer) h2.clear().position(ints[0]).limit(ints[1])).toString().trim();
final long total = Long.parseLong(cl);

final File geoip = File.createTempFile("geoip", path.substring(path.length() - 5));
try {
geoip.createNewFile();
} catch (IOException e) {
e.printStackTrace(); //todo: verify for a purpose
}
final RandomAccessFile randomAccessFile = new RandomAccessFile(geoip, "rw");
final FileChannel fileChannel = randomAccessFile.getChannel();
final float pos1 = fileChannel.write(dst1);

key.attach(new Impl() {
long pos = (long) pos1;

private final SynchronousQueue<MappedByteBuffer> returnTo = retVal;

@Override
public void onRead(SelectionKey key) throws IOException, InterruptedException {
long l = fileChannel.transferFrom((ReadableByteChannel) key.channel(), pos, 16 * 1024 * 1024);
pos += l;
if (pos >= total) {
MappedByteBuffer map1 = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, total);
returnTo.put(map1);
key.attach(null);
long l1 = System.currentTimeMillis() - l2;
System.err.println(MessageFormat.format("file write ended: {0} {1}/{2} in {3} (ms) @ {4}M/s", geoip, total, randomAccessFile.length(), l1, (total / 1024. * 1024.) / l1 / 1000.));
geoip.deleteOnExit();
}
}
});
ByteBuffer headers = (ByteBuffer) moveCaretToDoubleEol((ByteBuffer) dst1.flip()).duplicate().flip();
while (!Character.isWhitespace(headers.get())) ;
ByteBuffer h2 = (ByteBuffer) headers.duplicate().position(headers.position());
while (!Character.isWhitespace(headers.get())) ;
h2.limit(headers.position() - 1);
int rc = Integer.parseInt(UTF8.decode(h2).toString().trim());
if (200 == rc) {
Map<String, int[]> hm = HttpHeaders.getHeaders((ByteBuffer) headers.rewind());
int[] ints = hm.get("Content-Length");
String cl = UTF8.decode((ByteBuffer) h2.clear().position(ints[0]).limit(ints[1])).toString().trim();
final long total = Long.parseLong(cl);

final File geoip = File.createTempFile("geoip", path.substring(path.length() - 5));
try {
geoip.createNewFile();
} catch (IOException e) {
e.printStackTrace(); //todo: verify for a purpose
}

final RandomAccessFile randomAccessFile = new RandomAccessFile(geoip, "rw");
final FileChannel fileChannel = randomAccessFile.getChannel();
final float pos1 = fileChannel.write(dst1);

key.attach(new Impl() {
long pos = (long) pos1;

private final SynchronousQueue<MappedByteBuffer> returnTo = retVal;

@Override
public void onRead(SelectionKey key) throws IOException, InterruptedException {
long l = fileChannel.transferFrom((ReadableByteChannel) key.channel(), pos, 16 * 1024 * 1024);
pos += l;
if (pos >= total) {
MappedByteBuffer map1 = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, total);
returnTo.put(map1);
key.attach(null);
long l1 = System.currentTimeMillis() - l2;
System.err.println(MessageFormat.format("file write ended: {0} {1}/{2} in {3} (ms) @ {4}M/s", geoip, total, randomAccessFile.length(), l1, (total / 1024. * 1024.) / l1 / 1000.));
geoip.deleteOnExit();
}
}
});
}
});
}
});
return callable;
}
};

BlobAntiPatternObject.EXECUTOR_SERVICE.submit(callable);
}
break;
default:
key.selector().wakeup();
key.interestOps(OP_WRITE);
key.attach(new Impl() {
@Override
public void onWrite(SelectionKey key) throws IOException {

String format = MessageFormat.format("PUT /{0} HTTP/1.1\r\nContent-Length: 0\r\nContent-type: application/json\r\n\r\n", dbinstance);
int write = ((SocketChannel) key.channel()).write(UTF8.encode(format));
key.selector().wakeup();
key.interestOps(OP_READ);
key.attach(parent);
}
});
}
});
}
});
return callable;
}
};

BlobAntiPatternObject.EXECUTOR_SERVICE.submit(callable);
} else {
key.selector().wakeup();
key.interestOps(OP_WRITE);
key.attach(new Impl() {
@Override
public void onWrite(SelectionKey key) throws IOException {

break;
String format = MessageFormat.format("PUT /{0} HTTP/1.1\r\nContent-Length: 0\r\nContent-type: application/json\r\n\r\n", dbinstance);
int write = ((SocketChannel) key.channel()).write(UTF8.encode(format));
key.selector().wakeup();
key.interestOps(OP_READ);
key.attach(parent);
}
});
}


}

@Override
Expand Down

0 comments on commit 3c082ba

Please sign in to comment.