Skip to content

Commit

Permalink
Yea man we need UDF power for GEO IP
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardcapriolo committed May 22, 2012
1 parent 8a20ce1 commit db9baaa
Show file tree
Hide file tree
Showing 7 changed files with 537 additions and 1 deletion.
11 changes: 10 additions & 1 deletion README.md
@@ -1,4 +1,13 @@
hive-geoip
==========

GeoIP Functions for hive
GeoIP Functions for hive

add file GeoIP.dat;
add jar geo-ip-java.jar;
add jar hive-udf-geo-ip-jtg.jar;
create temporary function geoip as 'com.jointhegrid.hive.udf.GenericUDFGeoIP';
select geoip(first, 'COUNTRY_NAME', './GeoIP.dat' ) from a;

You need a geoip database extracted(separately licenced) found here:
http://geolite.maxmind.com/download/geoip/database/GeoLiteCountry/GeoIP.dat.gz
117 changes: 117 additions & 0 deletions pom.xml
@@ -0,0 +1,117 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.m6d</groupId>
<artifactId>hive-geoip</artifactId>
<name>hive-geoip</name>
<version>1.0.0-SNAPSHOT</version>
<description>GeoIP in hive</description>
<packaging>jar</packaging>

<properties></properties>
<dependencies>

<dependency>
<groupId>org.kohsuke</groupId>
<artifactId>geoip</artifactId>
<version>1.2.5</version>
</dependency>

<dependency>
<groupId>com.jointhegrid</groupId>
<artifactId>hive_test</artifactId>
<version>4.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-test</artifactId>
<version>0.20.2</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.7</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<pluginManagement>
<plugins>

<plugin>
<configuration>
<serverId>apache-main</serverId>
<url>http://www.apache.org/dist/hadoop/common/hadoop-0.20.2</url>
<fromFile>hadoop-0.20.2.tar.gz</fromFile>
<toDir>${project.build.directory}/hadoop</toDir>
</configuration>

<groupId>org.codehaus.mojo</groupId>
<artifactId>wagon-maven-plugin</artifactId>
<version>1.0-beta-3</version>
<executions>
<execution>
<id>download-hadoop</id>
<phase>pre-integration-test</phase>
<goals>
<goal>download-single</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<configuration>
<executable>tar</executable>
<arguments>
<argument>-xf</argument>
<argument>${project.build.directory}/hadoop/hadoop-0.20.2.tar.gz</argument>
<argument>-C</argument>
<argument>${project.build.directory}</argument>
</arguments>
</configuration>
</plugin>



<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.5.1</version>
<configuration>
<projectNameTemplate>[artifactId]</projectNameTemplate>
<wtpmanifest>true</wtpmanifest>
<wtpapplicationxml>true</wtpapplicationxml>
<wtpversion>1.5</wtpversion>
<additionalBuildcommands>
<buildcommand>org.eclipse.jdt.core.javabuilder</buildcommand>
<buildcommand>org.maven.ide.eclipse.maven2Builder</buildcommand>
</additionalBuildcommands>
<additionalProjectnatures>
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
<projectnature>org.maven.ide.eclipse.maven2Nature</projectnature>
</additionalProjectnatures>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
197 changes: 197 additions & 0 deletions src/main/java/com/jointhegrid/udf/geoip/GenericUDFGeoIP.java
@@ -0,0 +1,197 @@
package com.jointhegrid.udf.geoip;

import com.maxmind.geoip.LookupService;
import com.maxmind.geoip.Location;
import com.maxmind.geoip.Country;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.Set;
import java.util.Arrays;

/**
* GenericUDFGeoIP is a Hive User Defined Function that allows you to lookup
* database information on a given ip.
* argument 0 should be an IP string
* argument 1 should be one of the following values:
* COUNTRY_NAME, COUNTRY_CODE ,AREA_CODE
* CITY,DMA_CODE,LATITUDE,LONGITUDE,METRO_CODE,POSTAL_CODE, REGION, ORG, ID
* argument 2 should be the filename for you geo-ip database
*
* <pre>
* Usage:
* add file GeoIP.dat;
* add jar geo-ip-java.jar;
* add jar hive-udf-geo-ip-jtg.jar;
* create temporary function geoip as 'com.jointhegrid.hive.udf.GenericUDFGeoIP';
* select geoip(first, 'COUNTRY_NAME', './GeoIP.dat' ) from a;
* </pre>
* @author ecapriolo
*/

@Description(
name = "geoip",
value = "_FUNC_(ip,property,database) - loads database into GEO-IP lookup "+
"service, then looks up 'property' of ip. "
)

public class GenericUDFGeoIP extends GenericUDF {

private String ipString = null;
private Long ipLong = null;
private String property;
private String database;
private LookupService ls;

private static final String COUNTRY_NAME = "COUNTRY_NAME";
private static final String COUNTRY_CODE = "COUNTRY_CODE";
private static final String AREA_CODE = "AREA_CODE";
private static final String CITY = "CITY";
private static final String DMA_CODE = "DMA_CODE";
private static final String LATITUDE = "LATITUDE";
private static final String LONGITUDE = "LONGITUDE";
private static final String METRO_CODE = "METRO_CODE";
private static final String POSTAL_CODE = "POSTAL_CODE";
private static final String REGION = "REGION";
private static final String ORG = "ORG";
private static final String ID = "ID";

private static final Set<String> COUNTRY_PROPERTIES =
new CopyOnWriteArraySet<String>(Arrays.asList(
new String[] {COUNTRY_NAME, COUNTRY_CODE}));

private static final Set<String> LOCATION_PROPERTIES =
new CopyOnWriteArraySet<String>(Arrays.asList(
new String[] {AREA_CODE, CITY, DMA_CODE, LATITUDE, LONGITUDE, METRO_CODE, POSTAL_CODE, REGION}));

PrimitiveObjectInspector [] argumentOIs;

@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException {

argumentOIs = new PrimitiveObjectInspector [arguments.length];

if ( arguments.length != 3) {
throw new UDFArgumentLengthException(
"The function GenericUDFGeoIP( 'input', 'resultfield', 'datafile' ) "
+ " accepts 3 arguments.");
}

if (!(arguments[0] instanceof StringObjectInspector) && !(arguments[0] instanceof LongObjectInspector)) {
throw new UDFArgumentTypeException(0,
"The first 3 parameters of GenericUDFGeoIP('input', 'resultfield', 'datafile')"
+ " should be string.");
}
argumentOIs[0] = (PrimitiveObjectInspector) arguments[0];

for (int i = 1; i < arguments.length; i++) {
if (!(arguments[i] instanceof StringObjectInspector )) {
throw new UDFArgumentTypeException(i,
"The first 3 parameters of GenericUDFGeoIP('input', 'resultfield', 'datafile')"
+ " should be string.");
}
argumentOIs[i] = (StringObjectInspector) arguments[i];
}
return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
PrimitiveCategory.STRING);
}

@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
if (argumentOIs[0] instanceof LongObjectInspector) {
this.ipLong = ((LongObjectInspector)argumentOIs[0]).get(arguments[0].get());
} else {
this.ipString = ((StringObjectInspector)argumentOIs[0]).getPrimitiveJavaObject(arguments[0].get());
}
this.property = ((StringObjectInspector)argumentOIs[1]).getPrimitiveJavaObject(arguments[1].get());

if (this.property != null) {
this.property = this.property.toUpperCase();
}

if (ls ==null){
if (argumentOIs.length == 3){
this.database = ((StringObjectInspector)argumentOIs[1]).getPrimitiveJavaObject(arguments[2].get());
File f = new File(database);
if (!f.exists()){
throw new HiveException(database+" does not exist");
}
try {
ls = new LookupService ( f , LookupService.GEOIP_MEMORY_CACHE );
} catch (IOException ex){
throw new HiveException (ex);
}
}
/** // how to do this???
if (argumentOIs.length == 2) {
URL u = getClass().getClassLoader().getResource("GeoIP.dat");
try {
System.out.println("f exists ?"+ new File(u.getFile()).exists() );
ls = new LookupService ( u.getFile() );
} catch (IOException ex){ throw new HiveException (ex); }
}
* */
} // ls null ?

if (COUNTRY_PROPERTIES.contains(this.property)) {
Country country = ipString != null ? ls.getCountry(ipString) : ls.getCountry(ipLong);
if (country == null) {
return null;
} else if (this.property.equals(COUNTRY_NAME)) {
return country.getName();
} else if (this.property.equals(COUNTRY_CODE)) {
return country.getCode();
}
assert(false);
} else if (LOCATION_PROPERTIES.contains(this.property)) {
Location loc = ipString != null ? ls.getLocation(ipString) : ls.getLocation(ipLong);
if (loc == null) {
return null;
}
//country
if (this.property.equals(AREA_CODE)) {
return loc.area_code + "";
} else if (this.property.equals(CITY)) {
return loc.city == null ? null : loc.city + "";
} else if (this.property.equals(DMA_CODE)) {
return loc.dma_code + "";
} else if (this.property.equals(LATITUDE)) {
return loc.latitude + "";
} else if (this.property.equals(LONGITUDE)) {
return loc.longitude + "";
} else if (this.property.equals(METRO_CODE)) {
return loc.metro_code + "";
} else if (this.property.equals(POSTAL_CODE)) {
return loc.postalCode == null ? null : loc.postalCode + "";
} else if (this.property.equals(REGION)) {
return loc.region == null ? null : loc.region + "";
}
assert(false);
} else if (this.property.equals(ORG)) {
return ipString != null ? ls.getOrg(ipString) : ls.getOrg(ipLong);
} else if (this.property.equals(ID)) {
return ipString != null ? ls.getID(ipString) : ls.getID(ipLong);
}

return null;
}

@Override
public String getDisplayString(String[] children) {
assert(children.length == 3);
return "GenericUDFGeoIP ( "+children[0]+", "+children[1]+", "+children[2]+" )";
}
}
54 changes: 54 additions & 0 deletions src/test/java/com/jointhegrid/udf/geoip/GenericUDFGeoIPTest.java
@@ -0,0 +1,54 @@
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package com.jointhegrid.udf.geoip;

import java.io.OutputStreamWriter;
import java.io.BufferedWriter;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import java.util.List;
import java.util.Arrays;
import java.io.IOException;
import com.jointhegrid.hive_test.HiveTestService;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;

public class GenericUDFGeoIPTest extends HiveTestService {

public GenericUDFGeoIPTest() throws IOException {
super();
}

public void testCollect() throws Exception {
Path p = new Path(this.ROOT_DIR, "rankfile");

FSDataOutputStream o = this.getFileSystem().create(p);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(o));
bw.write("209.191.139.200\n");
bw.write("twelve\n");
bw.close();

String jarFile;
jarFile = GenericUDFGeoIP.class.getProtectionDomain().getCodeSource().getLocation().getFile();
client.execute("add jar " + jarFile);
jarFile = com.maxmind.geoip.LookupService.class.getProtectionDomain().getCodeSource().getLocation().getFile();
client.execute("add jar " + jarFile);
//download this or put in reasources
client.execute(" add file /tmp/GeoIP.dat");

client.execute("create temporary function geoip as 'com.jointhegrid.udf.geoip.GenericUDFGeoIP'");
client.execute("create table ips ( ip string) row format delimited fields terminated by '09' lines terminated by '10'");
client.execute("load data local inpath '" + p.toString() + "' into table ips");

client.execute("select geoip(ip, 'COUNTRY_NAME', './GeoIP.dat') FROM ips");
List<String> expected = Arrays.asList("United States","N/A");
assertEquals(expected, client.fetchAll());


client.execute("drop table ips");
}
}

0 comments on commit db9baaa

Please sign in to comment.