Skip to content

Commit

Permalink
didnt mean to commit this earlier
Browse files Browse the repository at this point in the history
  • Loading branch information
jnorthrup committed May 20, 2012
1 parent bea566e commit e50fa4b
Showing 1 changed file with 13 additions and 83 deletions.
96 changes: 13 additions & 83 deletions rxf-server/src/main/java/rxf/server/GeoIpService.java
Expand Up @@ -8,15 +8,19 @@
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.IntBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
Expand All @@ -35,6 +39,7 @@
import java.util.concurrent.SynchronousQueue;

import one.xio.AsioVisitor;
import one.xio.HttpHeaders;
import one.xio.HttpMethod;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
Expand All @@ -48,15 +53,14 @@
import static java.nio.channels.SelectionKey.OP_READ;
import static java.nio.channels.SelectionKey.OP_WRITE;
import static one.xio.HttpMethod.UTF8;
import static one.xio.HttpMethod.enqueue;
import static one.xio.HttpMethod.wheresWaldo;
import static rxf.server.BlobAntiPatternObject.EXECUTOR_SERVICE;
import static rxf.server.BlobAntiPatternObject.GSON;
import static rxf.server.BlobAntiPatternObject.ISO88591;
import static rxf.server.BlobAntiPatternObject.createCouchConnection;
import static rxf.server.BlobAntiPatternObject.fetchJsonByPath;
import static rxf.server.BlobAntiPatternObject.getReceiveBufferSize;
import static rxf.server.BlobAntiPatternObject.inferRevision;
import static rxf.server.BlobAntiPatternObject.moveCaretToDoubleEol;
import static rxf.server.BlobAntiPatternObject.recycleChannel;
import static rxf.server.BlobAntiPatternObject.sortableInetAddress;

Expand Down Expand Up @@ -115,6 +119,8 @@ public Map call() throws Exception {
map.put("created", new Date());
HttpMethod.enqueue(couchConnection, OP_WRITE, new SendJsonVisitor(GSON.toJson(map).trim(), retVal, GEOIP_ROOTNODE));
String json = retVal.take();


m = GSON.fromJson(json, Map.class);

}
Expand Down Expand Up @@ -146,8 +152,7 @@ public void onWrite(SelectionKey key) throws Exception {

String fn = GEOIP_CURRENT_LOCATIONS_CSV;
int limit = d2.limit();
final String revision = inferRevision(map);
String push = getBlobPutString(fn, limit, ctype, revision);
String push = getBlobPutString(fn, limit, ctype, inferRevision(map));
System.err.println("pushing: " + push);
putFile(key, d2, push, retVal);
}
Expand Down Expand Up @@ -419,7 +424,6 @@ private static CharBuffer lookupMappedAddress(InetAddress inetAddress, Navigable
* @throws javax.xml.xpath.XPathExpressionException
*
*/
/*
static void startGeoIpService(final String dbinstance) throws IOException, XPathExpressionException, InterruptedException {
final SynchronousQueue<String> retVal = new SynchronousQueue<String>();
SocketChannel connection = BlobAntiPatternObject.createCouchConnection();
Expand Down Expand Up @@ -474,15 +478,13 @@ public Object call() throws Exception {
Map map = GSON.fromJson(take, Map.class);

//happens if we need to create the 'current' geoip database.
if (map.containsKey("error") || System.getenv().containsKey(DEBUG_CREATEGEOIPINDEX)) {
if (map.containsKey("responseCode") || null != System.getenv(DEBUG_CREATEGEOIPINDEX)) {
createGeoIpIndex();
}
// happens every time we start
{
// ArrayList<Callable<MappedByteBuffer>> cc = new ArrayList<Callable<MappedByteBuffer>>();
*/
/*cc.add*//*
/*cc.add*/
indexMMBuf =
EXECUTOR_SERVICE.submit(
getMappedIndexFile(GEOIP_CURRENT_INDEX)).get();
Expand Down Expand Up @@ -551,7 +553,6 @@ public void onRead(SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer dst1 = ByteBuffer.allocateDirect(BlobAntiPatternObject.getReceiveBufferSize());
int read1 = channel.read(dst1);
if(read1==1){throw new Error("geo files died while transferring "+path);}
final long l2 = System.currentTimeMillis();
// System.err.println("response for "+path+": "+UTF8.decode((ByteBuffer) dst1.flip()))

Expand Down Expand Up @@ -652,7 +653,7 @@ public void onConnect(SelectionKey key) throws IOException {
}
});
}
*/

static String scrapeMaxMindUrl() throws IOException, XPathExpressionException {
Tidy tidy = new Tidy();
tidy.setQuiet(true);
Expand Down Expand Up @@ -785,75 +786,4 @@ public static Triple<Integer[], ByteBuffer, ByteBuffer> buildGeoIpFirstPass(Byte
return new Triple<Integer[], ByteBuffer, ByteBuffer>(locations, indexBuf, locBuf);
}

static final String s1 = "/geoip/current";

public static void startGeoIpService(String geoip) throws ClosedChannelException {
enqueue(createCouchConnection(), OP_CONNECT | OP_WRITE, new AsioVisitor.Impl() {
@Override
public void onWrite(SelectionKey key) throws Exception {
final SocketChannel channel = (SocketChannel) key.channel();
final String s = "HEAD " + s1 +
" HTTP/1.1\r\n\r\n";
channel.write(UTF8.encode(s));
key.attach(new MyImpl(channel));
}
});
}

private static class MyImpl extends AsioVisitor.Impl {
private final SocketChannel channel;

public MyImpl(SocketChannel channel) {
this.channel = channel;
}

@Override
public void onRead(final SelectionKey key) throws Exception {

String headers[] = {"Etag"};
String cookies[] = {BlobAntiPatternObject.MYGEOIPSTRING};
final ByteBuffer cursor = ByteBuffer.allocateDirect(getReceiveBufferSize());
final int read = channel.read(cursor);
Rfc822HeaderPrefix rfc822HeaderPrefix = new Rfc822HeaderPrefix(headers, cookies).apply((ByteBuffer) cursor.flip());
Map headerStrings = rfc822HeaderPrefix.getHeaderStrings();
Map cookieStrings = rfc822HeaderPrefix.getCookieStrings();
ByteBuffer headerBuf = rfc822HeaderPrefix.getHeaderBuf();
final String rescode = rfc822HeaderPrefix.getRescode();
final ByteBuffer dst = rfc822HeaderPrefix.getCursor();

headerStrings.toString();
cookieStrings.toString();
headerBuf.asCharBuffer();
Callable<Object> callable = new Callable<Object>() {
public Object call() throws Exception {
if (!rescode.startsWith("200")) {
createGeoIpIndex();
}


// public static final String GEOIP_CURRENT_LOCATIONS_CSV = "/geoip/current/locations.csv";
// public static final String GEOIP_CURRENT_INDEX = "/geoip/current/index";

final String s2 = "GET " + GEOIP_CURRENT_INDEX + " HTTP/1.1\r\nAccept: */*\r\n\r\n";
key.interestOps(OP_WRITE).attach(new Impl() {
@Override
public void onWrite(SelectionKey key) throws Exception {
final int read = channel.write(UTF8.encode(s2));
dst.flip();


}
});


return null;
}
};
EXECUTOR_SERVICE.submit(callable);


}

}

}

0 comments on commit e50fa4b

Please sign in to comment.