
Yea man we need UDF power for GEO IP

1 parent 8a20ce1 commit db9baaac9d8b3caf1537d768d6955460a1e5621d @edwardcapriolo committed May 22, 2012
11 README.md
@@ -1,4 +1,13 @@
hive-geoip
==========
-GeoIP Functions for hive
+GeoIP Functions for hive
+
+ add file GeoIP.dat;
+ add jar geo-ip-java.jar;
+ add jar hive-udf-geo-ip-jtg.jar;
+ create temporary function geoip as 'com.jointhegrid.hive.udf.GenericUDFGeoIP';
+ select geoip(first, 'COUNTRY_NAME', './GeoIP.dat' ) from a;
+
+You need a GeoIP database, which is separately licensed and must be extracted from the archive found here:
+http://geolite.maxmind.com/download/geoip/database/GeoLiteCountry/GeoIP.dat.gz
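+
+For example, once the function is registered, other properties such as CITY can be requested (reusing the table `a` and column `first` from the snippet above):
+
+ select geoip(first, 'CITY', './GeoIP.dat' ) from a;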
117 pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.m6d</groupId>
+ <artifactId>hive-geoip</artifactId>
+ <name>hive-geoip</name>
+ <version>1.0.0-SNAPSHOT</version>
+ <description>GeoIP in hive</description>
+ <packaging>jar</packaging>
+
+ <properties></properties>
+ <dependencies>
+
+ <dependency>
+ <groupId>org.kohsuke</groupId>
+ <artifactId>geoip</artifactId>
+ <version>1.2.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.jointhegrid</groupId>
+ <artifactId>hive_test</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-test</artifactId>
+ <version>0.20.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.7</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+
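+ <!-- Download the Hadoop 0.20.2 tarball used by the hive_test integration tests. -->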
+ <plugin>
+ <configuration>
+ <serverId>apache-main</serverId>
+ <url>http://www.apache.org/dist/hadoop/common/hadoop-0.20.2</url>
+ <fromFile>hadoop-0.20.2.tar.gz</fromFile>
+ <toDir>${project.build.directory}/hadoop</toDir>
+ </configuration>
+
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>wagon-maven-plugin</artifactId>
+ <version>1.0-beta-3</version>
+ <executions>
+ <execution>
+ <id>download-hadoop</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>download-single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
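+ <!-- Unpack the downloaded Hadoop tarball into the build directory before the integration tests run. -->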
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2.1</version>
+ <configuration>
+ <executable>tar</executable>
+ <arguments>
+ <argument>-xf</argument>
+ <argument>${project.build.directory}/hadoop/hadoop-0.20.2.tar.gz</argument>
+ <argument>-C</argument>
+ <argument>${project.build.directory}</argument>
+ </arguments>
+ </configuration>
+ </plugin>
+
+
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.5.1</version>
+ <configuration>
+ <projectNameTemplate>[artifactId]</projectNameTemplate>
+ <wtpmanifest>true</wtpmanifest>
+ <wtpapplicationxml>true</wtpapplicationxml>
+ <wtpversion>1.5</wtpversion>
+ <additionalBuildcommands>
+ <buildcommand>org.eclipse.jdt.core.javabuilder</buildcommand>
+ <buildcommand>org.maven.ide.eclipse.maven2Builder</buildcommand>
+ </additionalBuildcommands>
+ <additionalProjectnatures>
+ <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+ <projectnature>org.maven.ide.eclipse.maven2Nature</projectnature>
+ </additionalProjectnatures>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+</project>
197 src/main/java/com/jointhegrid/udf/geoip/GenericUDFGeoIP.java
@@ -0,0 +1,197 @@
+package com.jointhegrid.udf.geoip;
+
+import com.maxmind.geoip.LookupService;
+import com.maxmind.geoip.Location;
+import com.maxmind.geoip.Country;
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.Set;
+import java.util.Arrays;
+
+/**
+ * GenericUDFGeoIP is a Hive user-defined function that looks up
+ * GeoIP database information for a given IP address.
+ * Argument 0 should be an IP string.
+ * Argument 1 should be one of the following values:
+ * COUNTRY_NAME, COUNTRY_CODE, AREA_CODE,
+ * CITY, DMA_CODE, LATITUDE, LONGITUDE, METRO_CODE, POSTAL_CODE, REGION, ORG, ID
+ * Argument 2 should be the filename of your GeoIP database.
+ *
+ * <pre>
+ * Usage:
+ * add file GeoIP.dat;
+ * add jar geo-ip-java.jar;
+ * add jar hive-udf-geo-ip-jtg.jar;
+ * create temporary function geoip as 'com.jointhegrid.hive.udf.GenericUDFGeoIP';
+ * select geoip(first, 'COUNTRY_NAME', './GeoIP.dat' ) from a;
+ * </pre>
+ * @author ecapriolo
+ */
+
+@Description(
+ name = "geoip",
+ value = "_FUNC_(ip,property,database) - loads database into GEO-IP lookup "+
+ "service, then looks up 'property' of ip. "
+)
+
+public class GenericUDFGeoIP extends GenericUDF {
+
+ private String ipString = null;
+ private Long ipLong = null;
+ private String property;
+ private String database;
+ private LookupService ls;
+
+ private static final String COUNTRY_NAME = "COUNTRY_NAME";
+ private static final String COUNTRY_CODE = "COUNTRY_CODE";
+ private static final String AREA_CODE = "AREA_CODE";
+ private static final String CITY = "CITY";
+ private static final String DMA_CODE = "DMA_CODE";
+ private static final String LATITUDE = "LATITUDE";
+ private static final String LONGITUDE = "LONGITUDE";
+ private static final String METRO_CODE = "METRO_CODE";
+ private static final String POSTAL_CODE = "POSTAL_CODE";
+ private static final String REGION = "REGION";
+ private static final String ORG = "ORG";
+ private static final String ID = "ID";
+
+ private static final Set<String> COUNTRY_PROPERTIES =
+ new CopyOnWriteArraySet<String>(Arrays.asList(
+ new String[] {COUNTRY_NAME, COUNTRY_CODE}));
+
+ private static final Set<String> LOCATION_PROPERTIES =
+ new CopyOnWriteArraySet<String>(Arrays.asList(
+ new String[] {AREA_CODE, CITY, DMA_CODE, LATITUDE, LONGITUDE, METRO_CODE, POSTAL_CODE, REGION}));
+
+ PrimitiveObjectInspector [] argumentOIs;
+
+ @Override
+ public ObjectInspector initialize(ObjectInspector[] arguments)
+ throws UDFArgumentException {
+
+ argumentOIs = new PrimitiveObjectInspector [arguments.length];
+
+ if ( arguments.length != 3) {
+ throw new UDFArgumentLengthException(
+ "The function GenericUDFGeoIP( 'input', 'resultfield', 'datafile' ) "
+ + " accepts 3 arguments.");
+ }
+
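+ // The IP may be passed either as a string ("x.x.x.x") or as a numeric (bigint) value.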
+ if (!(arguments[0] instanceof StringObjectInspector) && !(arguments[0] instanceof LongObjectInspector)) {
+ throw new UDFArgumentTypeException(0,
+ "The first 3 parameters of GenericUDFGeoIP('input', 'resultfield', 'datafile')"
+ + " should be string.");
+ }
+ argumentOIs[0] = (PrimitiveObjectInspector) arguments[0];
+
+ for (int i = 1; i < arguments.length; i++) {
+ if (!(arguments[i] instanceof StringObjectInspector )) {
+ throw new UDFArgumentTypeException(i,
+ "The first 3 parameters of GenericUDFGeoIP('input', 'resultfield', 'datafile')"
+ + " should be string.");
+ }
+ argumentOIs[i] = (StringObjectInspector) arguments[i];
+ }
+ return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
+ PrimitiveCategory.STRING);
+ }
+
+ @Override
+ public Object evaluate(DeferredObject[] arguments) throws HiveException {
+ if (argumentOIs[0] instanceof LongObjectInspector) {
+ this.ipLong = ((LongObjectInspector)argumentOIs[0]).get(arguments[0].get());
+ } else {
+ this.ipString = ((StringObjectInspector)argumentOIs[0]).getPrimitiveJavaObject(arguments[0].get());
+ }
+ this.property = ((StringObjectInspector)argumentOIs[1]).getPrimitiveJavaObject(arguments[1].get());
+
+ if (this.property != null) {
+ this.property = this.property.toUpperCase();
+ }
+
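+ // Lazily open the GeoIP database on the first call; the LookupService is then reused for subsequent rows.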
+ if (ls == null) {
+ if (argumentOIs.length == 3){
+ this.database = ((StringObjectInspector)argumentOIs[2]).getPrimitiveJavaObject(arguments[2].get());
+ File f = new File(database);
+ if (!f.exists()){
+ throw new HiveException(database+" does not exist");
+ }
+ try {
+ ls = new LookupService ( f , LookupService.GEOIP_MEMORY_CACHE );
+ } catch (IOException ex){
+ throw new HiveException (ex);
+ }
+ }
+ /* Possible future enhancement (currently disabled): load GeoIP.dat from the classpath when only two arguments are given.
+ if (argumentOIs.length == 2) {
+ URL u = getClass().getClassLoader().getResource("GeoIP.dat");
+ try {
+ System.out.println("f exists ?"+ new File(u.getFile()).exists() );
+ ls = new LookupService ( u.getFile() );
+ } catch (IOException ex){ throw new HiveException (ex); }
+ }
+ */
+ } // end of the lazy ls == null initialization block
+
+ if (COUNTRY_PROPERTIES.contains(this.property)) {
+ Country country = ipString != null ? ls.getCountry(ipString) : ls.getCountry(ipLong);
+ if (country == null) {
+ return null;
+ } else if (this.property.equals(COUNTRY_NAME)) {
+ return country.getName();
+ } else if (this.property.equals(COUNTRY_CODE)) {
+ return country.getCode();
+ }
+ assert(false);
+ } else if (LOCATION_PROPERTIES.contains(this.property)) {
+ Location loc = ipString != null ? ls.getLocation(ipString) : ls.getLocation(ipLong);
+ if (loc == null) {
+ return null;
+ }
+ // location-based properties
+ if (this.property.equals(AREA_CODE)) {
+ return loc.area_code + "";
+ } else if (this.property.equals(CITY)) {
+ return loc.city == null ? null : loc.city + "";
+ } else if (this.property.equals(DMA_CODE)) {
+ return loc.dma_code + "";
+ } else if (this.property.equals(LATITUDE)) {
+ return loc.latitude + "";
+ } else if (this.property.equals(LONGITUDE)) {
+ return loc.longitude + "";
+ } else if (this.property.equals(METRO_CODE)) {
+ return loc.metro_code + "";
+ } else if (this.property.equals(POSTAL_CODE)) {
+ return loc.postalCode == null ? null : loc.postalCode + "";
+ } else if (this.property.equals(REGION)) {
+ return loc.region == null ? null : loc.region + "";
+ }
+ assert(false);
+ } else if (this.property.equals(ORG)) {
+ return ipString != null ? ls.getOrg(ipString) : ls.getOrg(ipLong);
+ } else if (this.property.equals(ID)) {
+ return ipString != null ? ls.getID(ipString) : ls.getID(ipLong);
+ }
+
+ return null;
+ }
+
+ @Override
+ public String getDisplayString(String[] children) {
+ assert(children.length == 3);
+ return "GenericUDFGeoIP ( "+children[0]+", "+children[1]+", "+children[2]+" )";
+ }
+}
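
For reference, here is a minimal standalone sketch of the MaxMind lookup API that this UDF wraps. It is an illustration only: the class name GeoIpDemo is hypothetical, and it assumes geo-ip-java.jar is on the classpath and GeoIP.dat is in the working directory.

    import com.maxmind.geoip.Country;
    import com.maxmind.geoip.LookupService;
    import java.io.File;
    import java.io.IOException;

    public class GeoIpDemo {
      public static void main(String[] args) throws IOException {
        // Load the country database into memory, as the UDF does on its first call.
        LookupService ls = new LookupService(
            new File("GeoIP.dat"), LookupService.GEOIP_MEMORY_CACHE);
        Country c = ls.getCountry("209.191.139.200"); // the IP used in the unit test
        System.out.println(c.getCode() + " " + c.getName());
      }
    }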
54 src/test/java/com/jointhegrid/udf/geoip/GenericUDFGeoIPTest.java
@@ -0,0 +1,54 @@
+package com.jointhegrid.udf.geoip;
+
+import java.io.OutputStreamWriter;
+import java.io.BufferedWriter;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import java.util.List;
+import java.util.Arrays;
+import java.io.IOException;
+import com.jointhegrid.hive_test.HiveTestService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class GenericUDFGeoIPTest extends HiveTestService {
+
+ public GenericUDFGeoIPTest() throws IOException {
+ super();
+ }
+
+ public void testCollect() throws Exception {
+ Path p = new Path(this.ROOT_DIR, "rankfile");
+
+ FSDataOutputStream o = this.getFileSystem().create(p);
+ BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(o));
+ bw.write("209.191.139.200\n");
+ bw.write("twelve\n");
+ bw.close();
+
+ String jarFile;
+ jarFile = GenericUDFGeoIP.class.getProtectionDomain().getCodeSource().getLocation().getFile();
+ client.execute("add jar " + jarFile);
+ jarFile = com.maxmind.geoip.LookupService.class.getProtectionDomain().getCodeSource().getLocation().getFile();
+ client.execute("add jar " + jarFile);
+ // Download GeoIP.dat first, or add it to the test resources.
+ client.execute(" add file /tmp/GeoIP.dat");
+
+ client.execute("create temporary function geoip as 'com.jointhegrid.udf.geoip.GenericUDFGeoIP'");
+ client.execute("create table ips ( ip string) row format delimited fields terminated by '09' lines terminated by '10'");
+ client.execute("load data local inpath '" + p.toString() + "' into table ips");
+
+ client.execute("select geoip(ip, 'COUNTRY_NAME', './GeoIP.dat') FROM ips");
+ List<String> expected = Arrays.asList("United States","N/A");
+ assertEquals(expected, client.fetchAll());
+
+
+ client.execute("drop table ips");
+ }
+}
56 src/test/resources/hive-exec-log4j.properties
@@ -0,0 +1,56 @@
+# Define some default values that can be overridden by system properties
+hive.root.logger=INFO,FA
+#hive.root.logger=DEBUG,console
+hive.log.dir=/tmp/${user.name}
+hive.log.file=${hive.query.id}.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hive.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=DEBUG
+
+#
+# File Appender
+#
+
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.File=${hive.log.dir}/${hive.log.file}
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+log4j.appender.FA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#custom logging levels
+#log4j.logger.xxx=DEBUG
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
+
+
+#log4j.category.DataNucleus=ERROR,FA
+#log4j.category.Datastore=ERROR,FA
+#log4j.category.Datastore.Schema=ERROR,FA
+#log4j.category.JPOX.Datastore=ERROR,FA
+#log4j.category.JPOX.Plugin=ERROR,FA
+#log4j.category.JPOX.MetaData=ERROR,FA
+#log4j.category.JPOX.Query=ERROR,FA
+#log4j.category.JPOX.General=ERROR,FA
+#log4j.category.JPOX.Enhancer=ERROR,FA
+
62 src/test/resources/hive-log4j.properties
@@ -0,0 +1,62 @@
+# Define some default values that can be overridden by system properties
+hive.root.logger=WARN,DRFA
+#hive.root.logger=DEBUG,console
+hive.log.dir=/tmp/${user.name}
+hive.log.file=hive.log
+
+# Define the root logger to the system property "hive.root.logger".
+log4j.rootLogger=${hive.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=WARN
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hive.log.dir}/${hive.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#custom logging levels
+#log4j.logger.xxx=DEBUG
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
+
+
+log4j.category.DataNucleus=ERROR,DRFA
+log4j.category.Datastore=ERROR,DRFA
+log4j.category.Datastore.Schema=ERROR,DRFA
+log4j.category.JPOX.Datastore=ERROR,DRFA
+log4j.category.JPOX.Plugin=ERROR,DRFA
+log4j.category.JPOX.MetaData=ERROR,DRFA
+log4j.category.JPOX.Query=ERROR,DRFA
+log4j.category.JPOX.General=ERROR,DRFA
+log4j.category.JPOX.Enhancer=ERROR,DRFA
+
41 src/test/resources/hive-site.xml
@@ -0,0 +1,41 @@
+
+<configuration>
+
+<property>
+ <name>hive.mapred.reduce.tasks.speculative.execution</name>
+ <value>false</value>
+ <description>Whether speculative execution for reducers should be turned on. </description>
+</property>
+
+
+
+<property>
+ <name>javax.jdo.option.ConnectionURL</name>
+ <!--<value>jdbc:derby:;databaseName=/tmp/metastore_db;create=true</value>-->
+ <value>jdbc:derby:memory:metastore_db;create=true</value>
+ <description>JDBC connect string for a JDBC metastore</description>
+</property>
+
+<property>
+ <name>hive.metastore.warehouse.dir</name>
+ <value>/tmp/warehouse</value>
+ <description>location of default database for the warehouse</description>
+</property>
+
+<!--
+<property>
+ <name>hive.jar.path</name>
+ <value>/home/edward/.m2/repository/org/apache/hive/hive-exec/0.7.1-SNAPSHOT/hive-exec-0.7.1-SNAPSHOT.jar</value>
+ <description>location of default database for the warehouse</description>
+</property>
+-->
+
+<!--
+<property>
+ <name>hive.metastore.local</name>
+ <value>true</value>
+ <description>controls whether to connect to a remote metastore server or to open a new metastore server in the Hive client JVM</description>
+</property>
+-->
+
+</configuration>
