Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic UDF and performance improvements #56

Merged
merged 8 commits into from
Jul 1, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

<artifact:dependencies pathId="dependencies.class.path">
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="0.20.2"/>
<dependency groupId="org.apache.hive" artifactId="hive-exec" version="0.10.0"/>
<dependency groupId="org.apache.hive" artifactId="hive-serde" version="0.10.0"/>
<dependency groupId="org.apache.hive" artifactId="hive-exec" version="0.12.0"/>
<dependency groupId="org.apache.hive" artifactId="hive-serde" version="0.12.0"/>
<dependency groupId="com.esri.geometry" artifactId="esri-geometry-api" version="1.1.1"/>
</artifact:dependencies>

Expand Down
62 changes: 52 additions & 10 deletions hive/src/main/java/com/esri/hadoop/hive/GeometryUtils.java
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBinaryObjectInspector;
import org.apache.hadoop.io.BytesWritable;


import com.esri.core.geometry.*;
import com.esri.core.geometry.ogc.*;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
public class GeometryUtils {

private static final int SIZE_WKID = 4;
Expand Down Expand Up @@ -47,10 +49,18 @@ public int getIndex(){
OGCType.ST_MULTIPOLYGON
};

public static final WritableBinaryObjectInspector geometryTransportObjectInspector = PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
public static final WritableBinaryObjectInspector geometryTransportObjectInspector =
PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;


private static Cache<BytesWritable, OGCGeometry> geometryCache =
CacheBuilder.newBuilder()
.weakKeys() // don't hold on to BytesWritable keys if they aren't referenced anymore
.expireAfterAccess(10, TimeUnit.SECONDS)
.maximumSize(10000)
.build();

/**
*
* @param geomref1
* @param geomref2
* @return return true if both geometries are in the same spatial reference
Expand All @@ -67,11 +77,27 @@ public static BytesWritable geometryToEsriShapeBytesWritable(Geometry geometry,
return serialize(geometry, wkid, type);
}

public static BytesWritable geometryToEsriShapeBytesWritable(OGCGeometry geometry) {
return serialize(geometry);
public static BytesWritable geometryToEsriShapeBytesWritable(OGCGeometry geometry) {
return new CachedGeometryBytesWritable(geometry);
}

public static OGCGeometry geometryFromEsriShape(BytesWritable geomref) {

// this geomref might actually be a LazyGeometryBytesWritable which
// means we don't need to deserialize from bytes
if (geomref instanceof CachedGeometryBytesWritable) {
return ((CachedGeometryBytesWritable)geomref).getGeometry();
}

// check for a cache hit to previously created geometries
OGCGeometry cachedGeom = geometryCache.getIfPresent(geomref);
if (cachedGeom != null) {
return cachedGeom;
}

// not in cache or instance of LazyGeometryBytesWritable. now
// need to create the geometry from its bytes

ByteBuffer bbuf = ByteBuffer.allocate(4);
bbuf.order(ByteOrder.LITTLE_ENDIAN);

Expand All @@ -91,7 +117,9 @@ public static OGCGeometry geometryFromEsriShape(BytesWritable geomref) {
spatialReference = SpatialReference.create(wkid);
}
Geometry esriGeom = GeometryEngine.geometryFromEsriShape(shapeBytes, Geometry.Type.Unknown);
return OGCGeometry.createFromEsriGeometry(esriGeom, spatialReference);
OGCGeometry createdGeom = OGCGeometry.createFromEsriGeometry(esriGeom, spatialReference);
geometryCache.put(geomref, createdGeom); // add newly created geometry to cache
return createdGeom;
}
}
}
Expand Down Expand Up @@ -149,9 +177,9 @@ public static OGCType getInferredOGCType(Geometry geom){
return OGCType.ST_MULTIPOINT;
case Point:
return OGCType.ST_POINT;
default:
return OGCType.UNKNOWN;
}

return OGCType.UNKNOWN;
}

private static byte[] getShapeBytes(BytesWritable geomref){
Expand Down Expand Up @@ -248,8 +276,22 @@ private static BytesWritable serialize(Geometry geometry, int wkid, OGCType type
setWKID(hiveGeometryBytes, wkid);
setType(hiveGeometryBytes, type);

return new BytesWritable(shapeWithData);
BytesWritable ret = new BytesWritable(shapeWithData);

return ret;
}



public static class CachedGeometryBytesWritable extends BytesWritable {
OGCGeometry cachedGeom;

public CachedGeometryBytesWritable(OGCGeometry geom) {
cachedGeom = geom;
super.set(serialize(cachedGeom));
}

public OGCGeometry getGeometry() {
return cachedGeom;
}
}
}
112 changes: 112 additions & 0 deletions hive/src/main/java/com/esri/hadoop/hive/HiveGeometryOIHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.esri.hadoop.hive;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

import com.esri.core.geometry.ogc.OGCGeometry;

public class HiveGeometryOIHelper {

static Logger LOG = Logger.getLogger(HiveGeometryOIHelper.class);

private PrimitiveObjectInspector oi;
private int argIndex;
private boolean isConstant;

OGCGeometry constantGeometry;

private HiveGeometryOIHelper(ObjectInspector oi, int argIndex) {
this.oi = (PrimitiveObjectInspector)oi;
this.argIndex = argIndex;

// constant geometries only need to be processed once and can
// be optimized in certain operations
isConstant = ObjectInspectorUtils.isConstantObjectInspector(oi);
}

public static HiveGeometryOIHelper create(ObjectInspector oi, int argIndex) throws UDFArgumentException {
if (oi.getCategory() != Category.PRIMITIVE) {
throw new UDFArgumentException("Only primitive types current supported");
}

return new HiveGeometryOIHelper(oi, argIndex);
}

/**
* Gets whether this geometry argument is constant.
*
* @return
*/
public boolean isConstant() {
return isConstant;
}

/**
* Returns the cached constant geometry object.
*
* @return cache geometry, or null if not constant
*/
public OGCGeometry getConstantGeometry() {
return constantGeometry;
}

/**
* Reads the corresponding geometry from the deferred object list
* or returns the cached geometry if argument is constant.
*
* @param args
* @return
*/
public OGCGeometry getGeometry(DeferredObject[] args) {
if (isConstant) {
if (constantGeometry == null) {
constantGeometry = getGeometry(args[argIndex]);
}

return constantGeometry;
} else {
// not constant, so we have to rebuild the geometry
// on every call
return getGeometry(args[argIndex]);
}
}

private OGCGeometry getGeometry(DeferredObject arg) {
Object writable;
try {
writable = oi.getPrimitiveWritableObject(arg.get());
} catch (HiveException e) {
LOG.error("Failed to get writable", e);
return null;
}

if (writable == null) {
return null;
}

switch (oi.getPrimitiveCategory()) {
case BINARY: return GeometryUtils.geometryFromEsriShape((BytesWritable)writable);
case STRING: return OGCGeometry.fromText(((Text)writable).toString());
default: return null;
}
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder();

builder.append("HiveGeometryHelper(");
builder.append("constant=" + isConstant + ";");
builder.append(")");

return builder.toString();
}
}
57 changes: 19 additions & 38 deletions hive/src/main/java/com/esri/hadoop/hive/ST_Contains.java
Original file line number Diff line number Diff line change
@@ -1,48 +1,29 @@
package com.esri.hadoop.hive;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.hive.ql.udf.UDFType;

import com.esri.core.geometry.ogc.OGCGeometry;
import com.esri.core.geometry.OperatorContains;
import com.esri.core.geometry.OperatorSimpleRelation;

@UDFType(deterministic = true)
@Description(
name = "ST_Contains",
value = "_FUNC_(geometry1, geometry2) - return true if geometry1 contains geometry2",
extended = "Example:\n" +
"SELECT _FUNC_(st_polygon(1,1, 1,4, 4,4, 4,1), st_point(2, 3) from src LIMIT 1; -- return true\n" +
"SELECT _FUNC_(st_polygon(1,1, 1,4, 4,4, 4,1), st_point(8, 8) from src LIMIT 1; -- return false"
)

name = "ST_Contains",
value = "_FUNC_(geometry1, geometry2) - return true if geometry1 contains geometry2",
extended = "Example:\n" +
"SELECT _FUNC_(st_polygon(1,1, 1,4, 4,4, 4,1), st_point(2, 3) from src LIMIT 1; -- return true\n" +
"SELECT _FUNC_(st_polygon(1,1, 1,4, 4,4, 4,1), st_point(8, 8) from src LIMIT 1; -- return false"
)
public class ST_Contains extends ST_GeometryRelational {

static final Log LOG = LogFactory.getLog(ST_Contains.class.getName());
final BooleanWritable resultBoolean = new BooleanWritable();

public BooleanWritable evaluate(BytesWritable geometryref1, BytesWritable geometryref2)
{
if (geometryref1 == null || geometryref2 == null ||
geometryref1.getLength() == 0 || geometryref2.getLength() == 0) {
LogUtils.Log_ArgumentsNull(LOG);
return null;
}

if (!GeometryUtils.compareSpatialReferences(geometryref1, geometryref2)) {
LogUtils.Log_SRIDMismatch(LOG, geometryref1, geometryref2);
return null;
}

OGCGeometry ogcGeom1 = GeometryUtils.geometryFromEsriShape(geometryref1);
OGCGeometry ogcGeom2 = GeometryUtils.geometryFromEsriShape(geometryref2);
if (ogcGeom1 == null || ogcGeom2 == null){
LogUtils.Log_ArgumentsNull(LOG);
return null;
}

resultBoolean.set(ogcGeom1.contains(ogcGeom2));
return resultBoolean;
@Override
protected OperatorSimpleRelation getRelationOperator() {
return OperatorContains.local();
}

@Override
public String getDisplayString(String[] args) {
return String.format("returns true if %s contains %s", args[0], args[1]);
}

}

40 changes: 9 additions & 31 deletions hive/src/main/java/com/esri/hadoop/hive/ST_Crosses.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package com.esri.hadoop.hive;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;

import com.esri.core.geometry.ogc.OGCGeometry;
import com.esri.core.geometry.OperatorCrosses;
import com.esri.core.geometry.OperatorSimpleRelation;

@Description(
name = "ST_Crosses",
Expand All @@ -16,34 +13,15 @@
"SELECT _FUNC_(st_linestring(2,0, 2,3), st_polygon(1,1, 1,4, 4,4, 4,1)) from src LIMIT 1; -- return true\n" +
"SELECT _FUNC_(st_linestring(0,2, 0,1), ST_linestring(2,0, 1,0)) from src LIMIT 1; -- return false"
)

public class ST_Crosses extends ST_GeometryRelational {

static final Log LOG = LogFactory.getLog(ST_Crosses.class.getName());
final BooleanWritable resultBoolean = new BooleanWritable();

public BooleanWritable evaluate(BytesWritable geometryref1, BytesWritable geometryref2)
{
if (geometryref1 == null || geometryref2 == null ||
geometryref1.getLength() == 0 || geometryref2.getLength() == 0) {
LogUtils.Log_ArgumentsNull(LOG);
return null;
}

if (!GeometryUtils.compareSpatialReferences(geometryref1, geometryref2)) {
LogUtils.Log_SRIDMismatch(LOG, geometryref1, geometryref2);
return null;
}

OGCGeometry ogcGeom1 = GeometryUtils.geometryFromEsriShape(geometryref1);
OGCGeometry ogcGeom2 = GeometryUtils.geometryFromEsriShape(geometryref2);
if (ogcGeom1 == null || ogcGeom2 == null){
LogUtils.Log_ArgumentsNull(LOG);
return null;
}

resultBoolean.set(ogcGeom1.crosses(ogcGeom2));
return resultBoolean;
@Override
protected OperatorSimpleRelation getRelationOperator() {
return OperatorCrosses.local();
}

@Override
public String getDisplayString(String[] args) {
return String.format("returns true if %s crosses %s", args[0], args[1]);
}
}
Loading