From db9baaac9d8b3caf1537d768d6955460a1e5621d Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Tue, 22 May 2012 15:09:07 -0400 Subject: [PATCH] Yea man we need UDF power for GEO IP --- README.md | 11 +- pom.xml | 117 +++++++++++ .../udf/geoip/GenericUDFGeoIP.java | 197 ++++++++++++++++++ .../udf/geoip/GenericUDFGeoIPTest.java | 54 +++++ src/test/resources/hive-exec-log4j.properties | 56 +++++ src/test/resources/hive-log4j.properties | 62 ++++++ src/test/resources/hive-site.xml | 41 ++++ 7 files changed, 537 insertions(+), 1 deletion(-) create mode 100644 pom.xml create mode 100644 src/main/java/com/jointhegrid/udf/geoip/GenericUDFGeoIP.java create mode 100644 src/test/java/com/jointhegrid/udf/geoip/GenericUDFGeoIPTest.java create mode 100644 src/test/resources/hive-exec-log4j.properties create mode 100644 src/test/resources/hive-log4j.properties create mode 100644 src/test/resources/hive-site.xml diff --git a/README.md b/README.md index c485c50..b2a7f8c 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,13 @@ hive-geoip ========== -GeoIP Functions for hive \ No newline at end of file +GeoIP Functions for hive + + add file GeoIP.dat; + add jar geo-ip-java.jar; + add jar hive-udf-geo-ip-jtg.jar; + create temporary function geoip as 'com.jointhegrid.hive.udf.GenericUDFGeoIP'; + select geoip(first, 'COUNTRY_NAME', './GeoIP.dat' ) from a; + +You need a geoip database extracted(separately licenced) found here: +http://geolite.maxmind.com/download/geoip/database/GeoLiteCountry/GeoIP.dat.gz diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..5303efc --- /dev/null +++ b/pom.xml @@ -0,0 +1,117 @@ + + + 4.0.0 + com.m6d + hive-geoip + hive-geoip + 1.0.0-SNAPSHOT + GeoIP in hive + jar + + + + + + org.kohsuke + geoip + 1.2.5 + + + + com.jointhegrid + hive_test + 4.0.0-SNAPSHOT + + + org.apache.hadoop + hadoop-core + 0.20.2 + + + org.apache.hadoop + hadoop-test + 0.20.2 + + + + junit + junit + 4.7 + test + + + + + + + + + + + apache-main + 
http://www.apache.org/dist/hadoop/common/hadoop-0.20.2 + hadoop-0.20.2.tar.gz + ${project.build.directory}/hadoop + + + org.codehaus.mojo + wagon-maven-plugin + 1.0-beta-3 + + + download-hadoop + pre-integration-test + + download-single + + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.2.1 + + tar + + -xf + ${project.build.directory}/hadoop/hadoop-0.20.2.tar.gz + -C + ${project.build.directory} + + + + + + + + org.apache.maven.plugins + maven-eclipse-plugin + 2.5.1 + + [artifactId] + true + true + 1.5 + + org.eclipse.jdt.core.javabuilder + org.maven.ide.eclipse.maven2Builder + + + org.eclipse.jdt.core.javanature + org.maven.ide.eclipse.maven2Nature + + + + + maven-compiler-plugin + + 1.6 + 1.6 + + + + + + diff --git a/src/main/java/com/jointhegrid/udf/geoip/GenericUDFGeoIP.java b/src/main/java/com/jointhegrid/udf/geoip/GenericUDFGeoIP.java new file mode 100644 index 0000000..6343651 --- /dev/null +++ b/src/main/java/com/jointhegrid/udf/geoip/GenericUDFGeoIP.java @@ -0,0 +1,197 @@ +package com.jointhegrid.udf.geoip; + +import com.maxmind.geoip.LookupService; +import com.maxmind.geoip.Location; +import com.maxmind.geoip.Country; +import java.io.File; +import java.io.IOException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; +import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.exec.Description; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; 
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.Set; +import java.util.Arrays; + +/** + * GenericUDFGeoIP is a Hive User Defined Function that allows you to lookup + * database information on a given ip. + * argument 0 should be an IP string + * argument 1 should be one of the following values: + * COUNTRY_NAME, COUNTRY_CODE ,AREA_CODE + * CITY,DMA_CODE,LATITUDE,LONGITUDE,METRO_CODE,POSTAL_CODE, REGION, ORG, ID + * argument 2 should be the filename for you geo-ip database + * + *
+ * Usage:
+ * add file GeoIP.dat;
+ * add jar geo-ip-java.jar;
+ * add jar hive-udf-geo-ip-jtg.jar;
+ * create temporary function geoip as 'com.jointhegrid.udf.geoip.GenericUDFGeoIP';
+ * select geoip(first, 'COUNTRY_NAME',  './GeoIP.dat' ) from a;
+ * 
+ * @author ecapriolo + */ + +@Description( + name = "geoip", + value = "_FUNC_(ip,property,database) - loads database into GEO-IP lookup "+ + "service, then looks up 'property' of ip. " +) + +public class GenericUDFGeoIP extends GenericUDF { + + private String ipString = null; + private Long ipLong = null; + private String property; + private String database; + private LookupService ls; + + private static final String COUNTRY_NAME = "COUNTRY_NAME"; + private static final String COUNTRY_CODE = "COUNTRY_CODE"; + private static final String AREA_CODE = "AREA_CODE"; + private static final String CITY = "CITY"; + private static final String DMA_CODE = "DMA_CODE"; + private static final String LATITUDE = "LATITUDE"; + private static final String LONGITUDE = "LONGITUDE"; + private static final String METRO_CODE = "METRO_CODE"; + private static final String POSTAL_CODE = "POSTAL_CODE"; + private static final String REGION = "REGION"; + private static final String ORG = "ORG"; + private static final String ID = "ID"; + + private static final Set COUNTRY_PROPERTIES = + new CopyOnWriteArraySet(Arrays.asList( + new String[] {COUNTRY_NAME, COUNTRY_CODE})); + + private static final Set LOCATION_PROPERTIES = + new CopyOnWriteArraySet(Arrays.asList( + new String[] {AREA_CODE, CITY, DMA_CODE, LATITUDE, LONGITUDE, METRO_CODE, POSTAL_CODE, REGION})); + + PrimitiveObjectInspector [] argumentOIs; + + @Override + public ObjectInspector initialize(ObjectInspector[] arguments) + throws UDFArgumentException { + + argumentOIs = new PrimitiveObjectInspector [arguments.length]; + + if ( arguments.length != 3) { + throw new UDFArgumentLengthException( + "The function GenericUDFGeoIP( 'input', 'resultfield', 'datafile' ) " + + " accepts 3 arguments."); + } + + if (!(arguments[0] instanceof StringObjectInspector) && !(arguments[0] instanceof LongObjectInspector)) { + throw new UDFArgumentTypeException(0, + "The first 3 parameters of GenericUDFGeoIP('input', 'resultfield', 'datafile')" + + " 
should be string."); + } + argumentOIs[0] = (PrimitiveObjectInspector) arguments[0]; + + for (int i = 1; i < arguments.length; i++) { + if (!(arguments[i] instanceof StringObjectInspector )) { + throw new UDFArgumentTypeException(i, + "The first 3 parameters of GenericUDFGeoIP('input', 'resultfield', 'datafile')" + + " should be string."); + } + argumentOIs[i] = (StringObjectInspector) arguments[i]; + } + return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector( + PrimitiveCategory.STRING); + } + + @Override + public Object evaluate(DeferredObject[] arguments) throws HiveException { + if (argumentOIs[0] instanceof LongObjectInspector) { + this.ipLong = ((LongObjectInspector)argumentOIs[0]).get(arguments[0].get()); + } else { + this.ipString = ((StringObjectInspector)argumentOIs[0]).getPrimitiveJavaObject(arguments[0].get()); + } + this.property = ((StringObjectInspector)argumentOIs[1]).getPrimitiveJavaObject(arguments[1].get()); + + if (this.property != null) { + this.property = this.property.toUpperCase(); + } + + if (ls ==null){ + if (argumentOIs.length == 3){ + this.database = ((StringObjectInspector)argumentOIs[1]).getPrimitiveJavaObject(arguments[2].get()); + File f = new File(database); + if (!f.exists()){ + throw new HiveException(database+" does not exist"); + } + try { + ls = new LookupService ( f , LookupService.GEOIP_MEMORY_CACHE ); + } catch (IOException ex){ + throw new HiveException (ex); + } + } + /** // how to do this??? + if (argumentOIs.length == 2) { + URL u = getClass().getClassLoader().getResource("GeoIP.dat"); + try { + System.out.println("f exists ?"+ new File(u.getFile()).exists() ); + ls = new LookupService ( u.getFile() ); + } catch (IOException ex){ throw new HiveException (ex); } + } + * */ + } // ls null ? + + if (COUNTRY_PROPERTIES.contains(this.property)) { + Country country = ipString != null ? 
ls.getCountry(ipString) : ls.getCountry(ipLong); + if (country == null) { + return null; + } else if (this.property.equals(COUNTRY_NAME)) { + return country.getName(); + } else if (this.property.equals(COUNTRY_CODE)) { + return country.getCode(); + } + assert(false); + } else if (LOCATION_PROPERTIES.contains(this.property)) { + Location loc = ipString != null ? ls.getLocation(ipString) : ls.getLocation(ipLong); + if (loc == null) { + return null; + } + //country + if (this.property.equals(AREA_CODE)) { + return loc.area_code + ""; + } else if (this.property.equals(CITY)) { + return loc.city == null ? null : loc.city + ""; + } else if (this.property.equals(DMA_CODE)) { + return loc.dma_code + ""; + } else if (this.property.equals(LATITUDE)) { + return loc.latitude + ""; + } else if (this.property.equals(LONGITUDE)) { + return loc.longitude + ""; + } else if (this.property.equals(METRO_CODE)) { + return loc.metro_code + ""; + } else if (this.property.equals(POSTAL_CODE)) { + return loc.postalCode == null ? null : loc.postalCode + ""; + } else if (this.property.equals(REGION)) { + return loc.region == null ? null : loc.region + ""; + } + assert(false); + } else if (this.property.equals(ORG)) { + return ipString != null ? ls.getOrg(ipString) : ls.getOrg(ipLong); + } else if (this.property.equals(ID)) { + return ipString != null ? ls.getID(ipString) : ls.getID(ipLong); + } + + return null; + } + + @Override + public String getDisplayString(String[] children) { + assert(children.length == 3); + return "GenericUDFGeoIP ( "+children[0]+", "+children[1]+", "+children[2]+" )"; + } +} diff --git a/src/test/java/com/jointhegrid/udf/geoip/GenericUDFGeoIPTest.java b/src/test/java/com/jointhegrid/udf/geoip/GenericUDFGeoIPTest.java new file mode 100644 index 0000000..40abcd5 --- /dev/null +++ b/src/test/java/com/jointhegrid/udf/geoip/GenericUDFGeoIPTest.java @@ -0,0 +1,54 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. 
+ */ +package com.jointhegrid.udf.geoip; + +import java.io.OutputStreamWriter; +import java.io.BufferedWriter; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import java.util.List; +import java.util.Arrays; +import java.io.IOException; +import com.jointhegrid.hive_test.HiveTestService; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; + +public class GenericUDFGeoIPTest extends HiveTestService { + + public GenericUDFGeoIPTest() throws IOException { + super(); + } + + public void testCollect() throws Exception { + Path p = new Path(this.ROOT_DIR, "rankfile"); + + FSDataOutputStream o = this.getFileSystem().create(p); + BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(o)); + bw.write("209.191.139.200\n"); + bw.write("twelve\n"); + bw.close(); + + String jarFile; + jarFile = GenericUDFGeoIP.class.getProtectionDomain().getCodeSource().getLocation().getFile(); + client.execute("add jar " + jarFile); + jarFile = com.maxmind.geoip.LookupService.class.getProtectionDomain().getCodeSource().getLocation().getFile(); + client.execute("add jar " + jarFile); + //download this or put in reasources + client.execute(" add file /tmp/GeoIP.dat"); + + client.execute("create temporary function geoip as 'com.jointhegrid.udf.geoip.GenericUDFGeoIP'"); + client.execute("create table ips ( ip string) row format delimited fields terminated by '09' lines terminated by '10'"); + client.execute("load data local inpath '" + p.toString() + "' into table ips"); + + client.execute("select geoip(ip, 'COUNTRY_NAME', './GeoIP.dat') FROM ips"); + List expected = Arrays.asList("United States","N/A"); + assertEquals(expected, client.fetchAll()); + + + client.execute("drop table ips"); + } +} diff --git a/src/test/resources/hive-exec-log4j.properties b/src/test/resources/hive-exec-log4j.properties new file mode 100644 index 0000000..9ad9546 --- /dev/null +++ 
b/src/test/resources/hive-exec-log4j.properties @@ -0,0 +1,56 @@ +# Define some default values that can be overridden by system properties +hive.root.logger=INFO,FA +#hive.root.logger=DEBUG,console +hive.log.dir=/tmp/${user.name} +hive.log.file=${hive.query.id}.log + +# Define the root logger to the system property "hadoop.root.logger". +log4j.rootLogger=${hive.root.logger}, EventCounter + +# Logging Threshold +log4j.threshhold=DEBUG + +# +# File Appender +# + +log4j.appender.FA=org.apache.log4j.FileAppender +log4j.appender.FA.File=${hive.log.dir}/${hive.log.file} +log4j.appender.FA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n +# Debugging Pattern format +log4j.appender.FA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + +# +# console +# Add "console" to rootlogger above if you want to use this +# + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n + +#custom logging levels +#log4j.logger.xxx=DEBUG + +# +# Event Counter Appender +# Sends counts of logging messages at different severity levels to Hadoop Metrics. 
+# +log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter + + +#log4j.category.DataNucleus=ERROR,FA +#log4j.category.Datastore=ERROR,FA +#log4j.category.Datastore.Schema=ERROR,FA +#log4j.category.JPOX.Datastore=ERROR,FA +#log4j.category.JPOX.Plugin=ERROR,FA +#log4j.category.JPOX.MetaData=ERROR,FA +#log4j.category.JPOX.Query=ERROR,FA +#log4j.category.JPOX.General=ERROR,FA +#log4j.category.JPOX.Enhancer=ERROR,FA + diff --git a/src/test/resources/hive-log4j.properties b/src/test/resources/hive-log4j.properties new file mode 100644 index 0000000..a225d36 --- /dev/null +++ b/src/test/resources/hive-log4j.properties @@ -0,0 +1,62 @@ +# Define some default values that can be overridden by system properties +hive.root.logger=WARN,DRFA +#hive.root.logger=DEBUG,console +hive.log.dir=/tmp/${user.name} +hive.log.file=hive.log + +# Define the root logger to the system property "hadoop.root.logger". +log4j.rootLogger=${hive.root.logger}, EventCounter + +# Logging Threshold +log4j.threshhold=WARN + +# +# Daily Rolling File Appender +# + +log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFA.File=${hive.log.dir}/${hive.log.file} + +# Rollver at midnight +log4j.appender.DRFA.DatePattern=.yyyy-MM-dd + +# 30-day backup +#log4j.appender.DRFA.MaxBackupIndex=30 +log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n +# Debugging Pattern format +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + +# +# console +# Add "console" to rootlogger above if you want to use this +# + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n + +#custom logging levels +#log4j.logger.xxx=DEBUG + +# +# 
Event Counter Appender +# Sends counts of logging messages at different severity levels to Hadoop Metrics. +# +log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter + + +log4j.category.DataNucleus=ERROR,DRFA +log4j.category.Datastore=ERROR,DRFA +log4j.category.Datastore.Schema=ERROR,DRFA +log4j.category.JPOX.Datastore=ERROR,DRFA +log4j.category.JPOX.Plugin=ERROR,DRFA +log4j.category.JPOX.MetaData=ERROR,DRFA +log4j.category.JPOX.Query=ERROR,DRFA +log4j.category.JPOX.General=ERROR,DRFA +log4j.category.JPOX.Enhancer=ERROR,DRFA + diff --git a/src/test/resources/hive-site.xml b/src/test/resources/hive-site.xml new file mode 100644 index 0000000..23b82eb --- /dev/null +++ b/src/test/resources/hive-site.xml @@ -0,0 +1,41 @@ + + + + + hive.mapred.reduce.tasks.speculative.execution + false + Whether speculative execution for reducers should be turned on. + + + + + + javax.jdo.option.ConnectionURL + + jdbc:derby:memory:metastore_db;create=true + JDBC connect string for a JDBC metastore + + + + hive.metastore.warehouse.dir + /tmp/warehouse + location of default database for the warehouse + + + + + + +