Navigation Menu

Skip to content

Commit

Permalink
Removed proto in favor of reflect
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardcapriolo committed Sep 25, 2012
1 parent e961ae3 commit c535b6d
Showing 1 changed file with 50 additions and 43 deletions.
93 changes: 50 additions & 43 deletions src/main/java/com/m6d/hive/protobuf/ProtobufDeserializer.java
Expand Up @@ -48,9 +48,8 @@
import org.apache.hadoop.io.Writable;

import com.google.protobuf.GeneratedMessage;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import com.google.protobuf.Message;


//import prototest.Ex;

Expand All @@ -77,18 +76,20 @@ public class ProtobufDeserializer implements Deserializer{
List<ObjectInspector> keyOIs = new ArrayList<ObjectInspector>();
List<ObjectInspector> valueOIs = new ArrayList<ObjectInspector>();
Class[] parameters = new Class[]{ new byte[0].getClass() };
//Class[] parameters = new Class[] { InputStream.class };

Map<ClassMethod,Method> cached= new HashMap<ClassMethod,Method>();
Map<ClassMethod,Method> cachedHas= new HashMap<ClassMethod,Method>();
Map<ClassMethod,Method> cachedHas= new HashMap<ClassMethod,Method>();
Map<ClassMethod,FieldDescriptor> protoCache= new HashMap<ClassMethod,FieldDescriptor>();
Map<ClassMethod,FieldDescriptor> protoHasCache= new HashMap<ClassMethod,FieldDescriptor>();


Method parseFrom = null;
Method vparseFrom = null;

List<Object> row = new ArrayList<Object>();
List<Object> keyRow = new ArrayList<Object>();
List<Object> valueRow = new ArrayList<Object>();

public ProtobufDeserializer() {
}

Expand Down Expand Up @@ -136,13 +137,11 @@ public Object deserialize(Writable field) throws SerDeException {
if (parseFrom != null) {
byte [] b = new byte [key.getLength()];
System.arraycopy(key.getBytes(), 0, b, 0, key.getLength());
//ByteArrayInputStream b = new ByteArrayInputStream(key.getBytes(),0,key.getLength());
parsedResult = parseFrom.invoke(null, b);
}
if (vparseFrom != null) {
byte [] c = new byte [ value.getLength()];
System.arraycopy(value.getBytes(), 0, c, 0, value.getLength());
//ByteArrayInputStream c = new ByteArrayInputStream(value.getBytes(),0,value.getLength());
vparsedResult = vparseFrom.invoke(null, c);
}
} catch (IllegalAccessException ex) {
Expand All @@ -153,9 +152,6 @@ public Object deserialize(Writable field) throws SerDeException {
throw new SerDeException(ex.getMessage(), ex);
}

//key struct<name:string,id:int,email:string,hobby:struct<name:string,serializedsize:int>> from deserializer,
//value struct<name:string,id:int,email:string,hobby:struct<name:string,serializedsize:int>,serializedsize:int> from deserializer

row.clear();
keyRow.clear();
if (parseFrom !=null){
Expand Down Expand Up @@ -184,19 +180,32 @@ public Object deserialize(Writable field) throws SerDeException {

public void matchProtoToRow(Object proto, List<Object> row,
List<ObjectInspector> ois, List<String> columnNames) throws Exception{
Message m =(Message) proto;
for (int i = 0;i<columnNames.size();i++){
switch (ois.get(i).getCategory()){
case PRIMITIVE:

if (this.reflectHas(proto,columnNames.get(i))){
row.add(reflectGet(proto,columnNames.get(i)));
} else {
row.add(null);
row.add(null);
}
/*
* This is the protobuf alternative to reflection
* (Did not find it to be significantly faster)
if (this.protoHas(m,columnNames.get(i))){
//row.add(reflectGet(proto,columnNames.get(i)));
//row.add(this.protoGet(proto, columnNames.get(i)));
row.add(this.protoCacheGet(m, columnNames.get(i)));
} else {
row.add(null);
}*/
break;
case LIST:
//there is no hasList in proto a null list is an empty list
//if (this.reflectHas(proto,columnNames.get(i))){
Object listObject = reflectGet(proto,columnNames.get(i));
/* there is no hasList in proto a null list is an empty list.
* no need to call hasX
*/
Object listObject = reflectGet(m,columnNames.get(i));
ListObjectInspector li = (ListObjectInspector) ois.get(i);
ObjectInspector subOi =li.getListElementObjectInspector();
if (subOi.getCategory()==Category.PRIMITIVE){
Expand Down Expand Up @@ -227,7 +236,9 @@ public void matchProtoToRow(Object proto, List<Object> row,
//here
break;
case STRUCT:
//row.add(null);
/* Alternative protobuf implementation
if (this.protoHas(m,columnNames.get(i))){
Object subObject =protoCacheGet(m,columnNames.get(i)); */
if (this.reflectHas(proto,columnNames.get(i))){
Object subObject =reflectGet(proto,columnNames.get(i));
List<Object> subList = new ArrayList<Object>();
Expand All @@ -240,14 +251,12 @@ public void matchProtoToRow(Object proto, List<Object> row,
subOis.add(s.getFieldObjectInspector());
}
matchProtoToRow(subObject,subList,subOis,subCols);

row.add(subList);
} else {
row.add( null);
}
break;
}

}
}

Expand Down Expand Up @@ -442,37 +451,36 @@ public Object protoGet(Object o , String prop) throws Exception{
return m.getField( m.getDescriptorForType().findFieldByName(prop) );
}

Map<ClassMethod,FieldDescriptor> protoCache= new HashMap<ClassMethod,FieldDescriptor>();

public Object protoCacheGet(Object o, String prop) throws Exception{
public Object protoCacheGet(Message m, String prop) throws Exception{
prop = prop.toLowerCase();
if (prop.equals("serializedsize")){
return reflectGet(o,prop);
}
if (prop.endsWith("count")){
return reflectGet(o,prop);
return reflectGet(m,prop);
}
GeneratedMessage m = (GeneratedMessage) o;

StringBuilder sb = new StringBuilder();
sb.append("get");
sb.append(prop);
ClassMethod cm = new ClassMethod(o.getClass(),sb.toString());
ClassMethod cm = new ClassMethod(m.getClass(),prop);
FieldDescriptor f =this.protoCache.get(cm);
if (f == null){
//System.out.println("prop" + prop);

f = m.getDescriptorForType().findFieldByName(prop);
if (f==null){
} else {
}
this.protoCache.put(cm, f);
}

return m.getField(f);
}

public boolean protoHas(Message m , String prop) throws Exception {
prop=prop.toLowerCase();
if (prop.contains("count")){
return reflectHas(m,prop);
}
ClassMethod meth = new ClassMethod(m.getClass(),prop);
FieldDescriptor f =protoHasCache.get(meth);
if (f == null){
f = m.getDescriptorForType().findFieldByName(prop);
protoHasCache.put(meth, f);
}
return m.hasField(f);
}

public boolean reflectHas(Object o, String prop) {
public boolean reflectHas(Object o, String prop) throws Exception{
Method m = null;
Object result = null;
StringBuilder sb = new StringBuilder();
Expand All @@ -495,11 +503,7 @@ public boolean reflectHas(Object o, String prop) {
return true;
} else {
this.cachedHas.put(cm,m);
try {
result = m.invoke(o, new Object[0]);
} catch (Exception ex){
throw new RuntimeException(ex);
}
result = m.invoke(o, new Object[0]);
return (Boolean.TRUE.equals(result));
}
}
Expand Down Expand Up @@ -549,4 +553,7 @@ public boolean isaList(Class<?> c) {
return false;
}
}



}

0 comments on commit c535b6d

Please sign in to comment.