Skip to content

Commit

Permalink
Merge pull request #56 from Esri/genericudf
Browse files Browse the repository at this point in the history
Generic UDF and performance improvements
  • Loading branch information
climbage committed Jul 1, 2014
2 parents 169edb0 + 5a27498 commit 8fcba2e
Show file tree
Hide file tree
Showing 16 changed files with 342 additions and 268 deletions.
4 changes: 2 additions & 2 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

<artifact:dependencies pathId="dependencies.class.path">
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="0.20.2"/>
<dependency groupId="org.apache.hive" artifactId="hive-exec" version="0.10.0"/>
<dependency groupId="org.apache.hive" artifactId="hive-serde" version="0.10.0"/>
<dependency groupId="org.apache.hive" artifactId="hive-exec" version="0.12.0"/>
<dependency groupId="org.apache.hive" artifactId="hive-serde" version="0.12.0"/>
<dependency groupId="com.esri.geometry" artifactId="esri-geometry-api" version="1.1.1"/>
</artifact:dependencies>

Expand Down
62 changes: 52 additions & 10 deletions hive/src/main/java/com/esri/hadoop/hive/GeometryUtils.java
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBinaryObjectInspector;
import org.apache.hadoop.io.BytesWritable;


import com.esri.core.geometry.*;
import com.esri.core.geometry.ogc.*;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
public class GeometryUtils {

private static final int SIZE_WKID = 4;
Expand Down Expand Up @@ -47,10 +49,18 @@ public int getIndex(){
OGCType.ST_MULTIPOLYGON
};

public static final WritableBinaryObjectInspector geometryTransportObjectInspector = PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
public static final WritableBinaryObjectInspector geometryTransportObjectInspector =
PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;


private static Cache<BytesWritable, OGCGeometry> geometryCache =
CacheBuilder.newBuilder()
.weakKeys() // don't hold on to BytesWritable keys if they aren't referenced anymore
.expireAfterAccess(10, TimeUnit.SECONDS)
.maximumSize(10000)
.build();

/**
*
* @param geomref1
* @param geomref2
* @return return true if both geometries are in the same spatial reference
Expand All @@ -67,11 +77,27 @@ public static BytesWritable geometryToEsriShapeBytesWritable(Geometry geometry,
return serialize(geometry, wkid, type);
}

public static BytesWritable geometryToEsriShapeBytesWritable(OGCGeometry geometry) {
return serialize(geometry);
public static BytesWritable geometryToEsriShapeBytesWritable(OGCGeometry geometry) {
return new CachedGeometryBytesWritable(geometry);
}

public static OGCGeometry geometryFromEsriShape(BytesWritable geomref) {

// this geomref might actually be a CachedGeometryBytesWritable, which
// means we don't need to deserialize from bytes
if (geomref instanceof CachedGeometryBytesWritable) {
return ((CachedGeometryBytesWritable)geomref).getGeometry();
}

// check for a cache hit to previously created geometries
OGCGeometry cachedGeom = geometryCache.getIfPresent(geomref);
if (cachedGeom != null) {
return cachedGeom;
}

// not in the cache and not a CachedGeometryBytesWritable, so we now
// need to create the geometry from its bytes

ByteBuffer bbuf = ByteBuffer.allocate(4);
bbuf.order(ByteOrder.LITTLE_ENDIAN);

Expand All @@ -91,7 +117,9 @@ public static OGCGeometry geometryFromEsriShape(BytesWritable geomref) {
spatialReference = SpatialReference.create(wkid);
}
Geometry esriGeom = GeometryEngine.geometryFromEsriShape(shapeBytes, Geometry.Type.Unknown);
return OGCGeometry.createFromEsriGeometry(esriGeom, spatialReference);
OGCGeometry createdGeom = OGCGeometry.createFromEsriGeometry(esriGeom, spatialReference);
geometryCache.put(geomref, createdGeom); // add newly created geometry to cache
return createdGeom;
}
}
}
Expand Down Expand Up @@ -149,9 +177,9 @@ public static OGCType getInferredOGCType(Geometry geom){
return OGCType.ST_MULTIPOINT;
case Point:
return OGCType.ST_POINT;
default:
return OGCType.UNKNOWN;
}

return OGCType.UNKNOWN;
}

private static byte[] getShapeBytes(BytesWritable geomref){
Expand Down Expand Up @@ -248,8 +276,22 @@ private static BytesWritable serialize(Geometry geometry, int wkid, OGCType type
setWKID(hiveGeometryBytes, wkid);
setType(hiveGeometryBytes, type);

return new BytesWritable(shapeWithData);
BytesWritable ret = new BytesWritable(shapeWithData);

return ret;
}



/**
 * A {@link BytesWritable} that keeps a reference to the {@link OGCGeometry}
 * it was built from, so the geometry can be handed back without being
 * decoded from the serialized bytes again.
 */
public static class CachedGeometryBytesWritable extends BytesWritable {
    // Geometry this writable was created from; returned as-is by getGeometry().
    OGCGeometry cachedGeom;

    /**
     * Stores the geometry and fills this writable with its serialized form.
     *
     * @param geom geometry to remember and serialize
     */
    public CachedGeometryBytesWritable(OGCGeometry geom) {
        this.cachedGeom = geom;
        // super.set is used deliberately so the payload is written through
        // BytesWritable's own implementation.
        super.set(serialize(geom));
    }

    /**
     * @return the geometry instance supplied at construction time
     */
    public OGCGeometry getGeometry() {
        return this.cachedGeom;
    }
}
}
112 changes: 112 additions & 0 deletions hive/src/main/java/com/esri/hadoop/hive/HiveGeometryOIHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.esri.hadoop.hive;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

import com.esri.core.geometry.ogc.OGCGeometry;

/**
 * Binds one argument position of a Hive generic UDF to the primitive
 * {@link ObjectInspector} that describes it and converts that argument into
 * an {@link OGCGeometry} on demand.
 *
 * <p>When the inspector reports the argument as constant, the first non-null
 * geometry that is read is remembered and reused on later calls.
 */
public class HiveGeometryOIHelper {

    static final Logger LOG = Logger.getLogger(HiveGeometryOIHelper.class);

    private final PrimitiveObjectInspector oi;
    private final int argIndex;
    private final boolean isConstant;

    // Lazily filled by getGeometry(DeferredObject[]) for constant arguments.
    // null means "not resolved yet", so a constant that resolves to null is
    // looked up again on each call.
    OGCGeometry constantGeometry;

    /**
     * Use {@link #create(ObjectInspector, int)}; it verifies the inspector is
     * primitive before the cast performed here.
     */
    private HiveGeometryOIHelper(ObjectInspector oi, int argIndex) {
        this.oi = (PrimitiveObjectInspector) oi;
        this.argIndex = argIndex;

        // constant geometries only need to be processed once and can
        // be optimized in certain operations
        this.isConstant = ObjectInspectorUtils.isConstantObjectInspector(oi);
    }

    /**
     * Creates a helper for the argument at {@code argIndex}.
     *
     * @param oi       inspector describing the argument
     * @param argIndex position of the argument in the UDF's argument array
     * @return a helper bound to the given inspector and position
     * @throws UDFArgumentException if the inspector's category is not
     *                              {@link Category#PRIMITIVE}
     */
    public static HiveGeometryOIHelper create(ObjectInspector oi, int argIndex) throws UDFArgumentException {
        if (oi.getCategory() != Category.PRIMITIVE) {
            throw new UDFArgumentException("Only primitive types current supported");
        }

        return new HiveGeometryOIHelper(oi, argIndex);
    }

    /**
     * Gets whether this geometry argument is constant.
     *
     * @return {@code true} if the inspector passed to {@link #create} was
     *         reported as a constant object inspector
     */
    public boolean isConstant() {
        return isConstant;
    }

    /**
     * Returns the cached constant geometry object.
     *
     * @return the cached geometry, or {@code null} if the argument is not
     *         constant or no non-null geometry has been read yet
     */
    public OGCGeometry getConstantGeometry() {
        return constantGeometry;
    }

    /**
     * Reads the corresponding geometry from the deferred object list
     * or returns the cached geometry if the argument is constant.
     *
     * @param args deferred arguments of the current UDF invocation; the
     *             element at this helper's argument index is read
     * @return the geometry for this argument, or {@code null} if it could not
     *         be obtained (see {@link #getGeometry(DeferredObject)})
     */
    public OGCGeometry getGeometry(DeferredObject[] args) {
        if (!isConstant) {
            // not constant, so we have to rebuild the geometry on every call
            return getGeometry(args[argIndex]);
        }

        if (constantGeometry == null) {
            constantGeometry = getGeometry(args[argIndex]);
        }
        return constantGeometry;
    }

    /**
     * Converts a single deferred argument into a geometry.
     *
     * <p>BINARY values are decoded with
     * {@link GeometryUtils#geometryFromEsriShape(BytesWritable)} and STRING
     * values are parsed with {@link OGCGeometry#fromText(String)}.
     *
     * @param arg deferred argument to evaluate
     * @return the geometry, or {@code null} when evaluating the argument
     *         throws a {@link HiveException} (which is logged), when the
     *         value is null, or when the primitive category is neither
     *         BINARY nor STRING
     */
    private OGCGeometry getGeometry(DeferredObject arg) {
        Object writable;
        try {
            writable = oi.getPrimitiveWritableObject(arg.get());
        } catch (HiveException e) {
            // Best effort: report the failure and treat the row as null.
            LOG.error("Failed to get writable", e);
            return null;
        }

        if (writable == null) {
            return null;
        }

        switch (oi.getPrimitiveCategory()) {
        case BINARY:
            return GeometryUtils.geometryFromEsriShape((BytesWritable) writable);
        case STRING:
            return OGCGeometry.fromText(((Text) writable).toString());
        default:
            return null;
        }
    }

    @Override
    public String toString() {
        return "HiveGeometryHelper(constant=" + isConstant + ";)";
    }
}
57 changes: 19 additions & 38 deletions hive/src/main/java/com/esri/hadoop/hive/ST_Contains.java
Original file line number Diff line number Diff line change
@@ -1,48 +1,29 @@
package com.esri.hadoop.hive;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.hive.ql.udf.UDFType;

import com.esri.core.geometry.ogc.OGCGeometry;
import com.esri.core.geometry.OperatorContains;
import com.esri.core.geometry.OperatorSimpleRelation;

@UDFType(deterministic = true)
@Description(
name = "ST_Contains",
value = "_FUNC_(geometry1, geometry2) - return true if geometry1 contains geometry2",
extended = "Example:\n" +
"SELECT _FUNC_(st_polygon(1,1, 1,4, 4,4, 4,1), st_point(2, 3) from src LIMIT 1; -- return true\n" +
"SELECT _FUNC_(st_polygon(1,1, 1,4, 4,4, 4,1), st_point(8, 8) from src LIMIT 1; -- return false"
)

name = "ST_Contains",
value = "_FUNC_(geometry1, geometry2) - return true if geometry1 contains geometry2",
extended = "Example:\n" +
"SELECT _FUNC_(st_polygon(1,1, 1,4, 4,4, 4,1), st_point(2, 3) from src LIMIT 1; -- return true\n" +
"SELECT _FUNC_(st_polygon(1,1, 1,4, 4,4, 4,1), st_point(8, 8) from src LIMIT 1; -- return false"
)
public class ST_Contains extends ST_GeometryRelational {

static final Log LOG = LogFactory.getLog(ST_Contains.class.getName());
final BooleanWritable resultBoolean = new BooleanWritable();

public BooleanWritable evaluate(BytesWritable geometryref1, BytesWritable geometryref2)
{
if (geometryref1 == null || geometryref2 == null ||
geometryref1.getLength() == 0 || geometryref2.getLength() == 0) {
LogUtils.Log_ArgumentsNull(LOG);
return null;
}

if (!GeometryUtils.compareSpatialReferences(geometryref1, geometryref2)) {
LogUtils.Log_SRIDMismatch(LOG, geometryref1, geometryref2);
return null;
}

OGCGeometry ogcGeom1 = GeometryUtils.geometryFromEsriShape(geometryref1);
OGCGeometry ogcGeom2 = GeometryUtils.geometryFromEsriShape(geometryref2);
if (ogcGeom1 == null || ogcGeom2 == null){
LogUtils.Log_ArgumentsNull(LOG);
return null;
}

resultBoolean.set(ogcGeom1.contains(ogcGeom2));
return resultBoolean;
@Override
protected OperatorSimpleRelation getRelationOperator() {
return OperatorContains.local();
}

@Override
public String getDisplayString(String[] args) {
return String.format("returns true if %s contains %s", args[0], args[1]);
}

}

40 changes: 9 additions & 31 deletions hive/src/main/java/com/esri/hadoop/hive/ST_Crosses.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package com.esri.hadoop.hive;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;

import com.esri.core.geometry.ogc.OGCGeometry;
import com.esri.core.geometry.OperatorCrosses;
import com.esri.core.geometry.OperatorSimpleRelation;

@Description(
name = "ST_Crosses",
Expand All @@ -16,34 +13,15 @@
"SELECT _FUNC_(st_linestring(2,0, 2,3), st_polygon(1,1, 1,4, 4,4, 4,1)) from src LIMIT 1; -- return true\n" +
"SELECT _FUNC_(st_linestring(0,2, 0,1), ST_linestring(2,0, 1,0)) from src LIMIT 1; -- return false"
)

public class ST_Crosses extends ST_GeometryRelational {

static final Log LOG = LogFactory.getLog(ST_Crosses.class.getName());
final BooleanWritable resultBoolean = new BooleanWritable();

public BooleanWritable evaluate(BytesWritable geometryref1, BytesWritable geometryref2)
{
if (geometryref1 == null || geometryref2 == null ||
geometryref1.getLength() == 0 || geometryref2.getLength() == 0) {
LogUtils.Log_ArgumentsNull(LOG);
return null;
}

if (!GeometryUtils.compareSpatialReferences(geometryref1, geometryref2)) {
LogUtils.Log_SRIDMismatch(LOG, geometryref1, geometryref2);
return null;
}

OGCGeometry ogcGeom1 = GeometryUtils.geometryFromEsriShape(geometryref1);
OGCGeometry ogcGeom2 = GeometryUtils.geometryFromEsriShape(geometryref2);
if (ogcGeom1 == null || ogcGeom2 == null){
LogUtils.Log_ArgumentsNull(LOG);
return null;
}

resultBoolean.set(ogcGeom1.crosses(ogcGeom2));
return resultBoolean;
@Override
protected OperatorSimpleRelation getRelationOperator() {
return OperatorCrosses.local();
}

@Override
public String getDisplayString(String[] args) {
return String.format("returns true if %s crosses %s", args[0], args[1]);
}
}
Loading

0 comments on commit 8fcba2e

Please sign in to comment.