diff --git a/rxf-server/src/main/java/rxf/server/GeoIpService.java b/rxf-server/src/main/java/rxf/server/GeoIpService.java index cb0ffccd..a24f1969 100644 --- a/rxf-server/src/main/java/rxf/server/GeoIpService.java +++ b/rxf-server/src/main/java/rxf/server/GeoIpService.java @@ -8,15 +8,19 @@ import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.net.Inet4Address; import java.net.InetAddress; import java.net.URL; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.CharBuffer; +import java.nio.IntBuffer; import java.nio.MappedByteBuffer; -import java.nio.channels.ClosedChannelException; +import java.nio.channels.FileChannel; +import java.nio.channels.ReadableByteChannel; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; @@ -35,6 +39,7 @@ import java.util.concurrent.SynchronousQueue; import one.xio.AsioVisitor; +import one.xio.HttpHeaders; import one.xio.HttpMethod; import org.apache.commons.compress.archivers.ArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; @@ -48,15 +53,14 @@ import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_WRITE; import static one.xio.HttpMethod.UTF8; -import static one.xio.HttpMethod.enqueue; import static one.xio.HttpMethod.wheresWaldo; import static rxf.server.BlobAntiPatternObject.EXECUTOR_SERVICE; import static rxf.server.BlobAntiPatternObject.GSON; import static rxf.server.BlobAntiPatternObject.ISO88591; import static rxf.server.BlobAntiPatternObject.createCouchConnection; import static rxf.server.BlobAntiPatternObject.fetchJsonByPath; -import static rxf.server.BlobAntiPatternObject.getReceiveBufferSize; import static rxf.server.BlobAntiPatternObject.inferRevision; +import static rxf.server.BlobAntiPatternObject.moveCaretToDoubleEol; import static rxf.server.BlobAntiPatternObject.recycleChannel; import static rxf.server.BlobAntiPatternObject.sortableInetAddress; @@ -115,6 +119,8 @@ public Map call() throws Exception { map.put("created", new Date()); HttpMethod.enqueue(couchConnection, OP_WRITE, new SendJsonVisitor(GSON.toJson(map).trim(), retVal, GEOIP_ROOTNODE)); String json = retVal.take(); + + m = GSON.fromJson(json, Map.class); } @@ -146,8 +152,7 @@ public void onWrite(SelectionKey key) throws Exception { String fn = GEOIP_CURRENT_LOCATIONS_CSV; int limit = d2.limit(); - final String revision = inferRevision(map); - String push = getBlobPutString(fn, limit, ctype, revision); + String push = getBlobPutString(fn, limit, ctype, inferRevision(map)); System.err.println("pushing: " + push); putFile(key, d2, push, retVal); } @@ -419,7 +424,6 @@ private static CharBuffer lookupMappedAddress(InetAddress inetAddress, Navigable * @throws javax.xml.xpath.XPathExpressionException * */ -/* static void startGeoIpService(final String dbinstance) throws IOException, XPathExpressionException, InterruptedException { final SynchronousQueue retVal = new SynchronousQueue(); SocketChannel connection = BlobAntiPatternObject.createCouchConnection(); @@ -474,15 +478,13 @@ public Object call() throws Exception { Map map = GSON.fromJson(take, Map.class); //happens if we need to create the 'current' geoip database. - if (map.containsKey("error") || System.getenv().containsKey(DEBUG_CREATEGEOIPINDEX)) { + if (map.containsKey("responseCode") || null != System.getenv(DEBUG_CREATEGEOIPINDEX)) { createGeoIpIndex(); } // happens every time we start { // ArrayList> cc = new ArrayList>(); - */ -/*cc.add*//* - + /*cc.add*/ indexMMBuf = EXECUTOR_SERVICE.submit( getMappedIndexFile(GEOIP_CURRENT_INDEX)).get(); @@ -551,7 +553,6 @@ public void onRead(SelectionKey key) throws Exception { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer dst1 = ByteBuffer.allocateDirect(BlobAntiPatternObject.getReceiveBufferSize()); int read1 = channel.read(dst1); - if(read1==1){throw new Error("geo files died while transferring "+path);} final long l2 = System.currentTimeMillis(); // System.err.println("response for "+path+": "+UTF8.decode((ByteBuffer) dst1.flip())) @@ -652,7 +653,7 @@ public void onConnect(SelectionKey key) throws IOException { } }); } -*/ + static String scrapeMaxMindUrl() throws IOException, XPathExpressionException { Tidy tidy = new Tidy(); tidy.setQuiet(true); @@ -785,75 +786,4 @@ public static Triple buildGeoIpFirstPass(Byte return new Triple(locations, indexBuf, locBuf); } - static final String s1 = "/geoip/current"; - - public static void startGeoIpService(String geoip) throws ClosedChannelException { - enqueue(createCouchConnection(), OP_CONNECT | OP_WRITE, new AsioVisitor.Impl() { - @Override - public void onWrite(SelectionKey key) throws Exception { - final SocketChannel channel = (SocketChannel) key.channel(); - final String s = "HEAD " + s1 + - " HTTP/1.1\r\n\r\n"; - channel.write(UTF8.encode(s)); - key.attach(new MyImpl(channel)); - } - }); - } - - private static class MyImpl extends AsioVisitor.Impl { - private final SocketChannel channel; - - public MyImpl(SocketChannel channel) { - this.channel = channel; - } - - @Override - public void onRead(final SelectionKey key) throws Exception { - - String headers[] = {"Etag"}; - String cookies[] = {BlobAntiPatternObject.MYGEOIPSTRING}; - final ByteBuffer cursor = ByteBuffer.allocateDirect(getReceiveBufferSize()); - final int read = channel.read(cursor); - Rfc822HeaderPrefix rfc822HeaderPrefix = new Rfc822HeaderPrefix(headers, cookies).apply((ByteBuffer) cursor.flip()); - Map headerStrings = rfc822HeaderPrefix.getHeaderStrings(); - Map cookieStrings = rfc822HeaderPrefix.getCookieStrings(); - ByteBuffer headerBuf = rfc822HeaderPrefix.getHeaderBuf(); - final String rescode = rfc822HeaderPrefix.getRescode(); - final ByteBuffer dst = rfc822HeaderPrefix.getCursor(); - - headerStrings.toString(); - cookieStrings.toString(); - headerBuf.asCharBuffer(); - Callable callable = new Callable() { - public Object call() throws Exception { - if (!rescode.startsWith("200")) { - createGeoIpIndex(); - } - - -// public static final String GEOIP_CURRENT_LOCATIONS_CSV = "/geoip/current/locations.csv"; -// public static final String GEOIP_CURRENT_INDEX = "/geoip/current/index"; - - final String s2 = "GET " + GEOIP_CURRENT_INDEX + " HTTP/1.1\r\nAccept: */*\r\n\r\n"; - key.interestOps(OP_WRITE).attach(new Impl() { - @Override - public void onWrite(SelectionKey key) throws Exception { - final int read = channel.write(UTF8.encode(s2)); - dst.flip(); - - - } - }); - - - return null; - } - }; - EXECUTOR_SERVICE.submit(callable); - - - } - - } - }