Skip to content

Commit

Permalink
merged
Browse files Browse the repository at this point in the history
  • Loading branch information
epalace committed Feb 23, 2012
2 parents 9359b38 + 03d1ea8 commit d18d08f
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 290 deletions.
20 changes: 17 additions & 3 deletions core/src/main/java/com/datasalt/pangool/io/BaseComparator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.datasalt.pangool.io;

import static com.datasalt.pangool.serialization.tuples.PangoolSerialization.NULL_LENGTH;

import java.io.IOException;
import java.io.Serializable;

Expand All @@ -24,6 +26,7 @@

import com.datasalt.pangool.cogroup.TupleMRConfig;
import com.datasalt.pangool.cogroup.TupleMRException;
import com.datasalt.pangool.serialization.tuples.PangoolSerialization;
import com.datasalt.pangool.serialization.tuples.SingleFieldDeserializer;

@SuppressWarnings("serial")
Expand Down Expand Up @@ -58,16 +61,27 @@ public Configuration getConf() {
return conf;
}

/**
* Objects can be null.
*/
@Override
public abstract int compare(T o1, T o2);
public abstract int compare(T object1, T object2);

@SuppressWarnings("unchecked")
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {

object1 = (T) fieldDeser1.deserialize(b1, s1);
object2 = (T) fieldDeser2.deserialize(b2, s2);
if (l1 == NULL_LENGTH) {
object1 = null;
} else {
object1 = (T) fieldDeser1.deserialize(b1, s1);
}
if (l2 == NULL_LENGTH) {
object2 = null;
} else {
object2 = (T) fieldDeser2.deserialize(b2, s2);
}

} catch(IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.hadoop.conf.Configuration;

import com.datasalt.pangool.cogroup.TupleMRConfig;
import com.datasalt.pangool.cogroup.TupleMRConfigBuilder;
import com.datasalt.pangool.cogroup.sorting.Criteria;
import com.datasalt.pangool.cogroup.sorting.Criteria.SortElement;
Expand Down Expand Up @@ -51,18 +52,17 @@ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
throw new RuntimeException(e);
}
}

@Override
public void setConf(Configuration conf){
super.setConf(conf);
if (conf != null){
List<SortElement> sortElements = grouperConf.getCommonCriteria().getElements();
int numGroupByFields = grouperConf.getGroupByFields().size();
List<SortElement> groupSortElements = new ArrayList<SortElement>();
groupSortElements.addAll(sortElements);
groupSortElements = groupSortElements.subList(0,numGroupByFields);
groupSortBy = new Criteria(groupSortElements);
TupleMRConfigBuilder.initializeComparators(conf, grouperConf);
}
}
public void setConf(Configuration conf) {
super.setConf(conf);
List<SortElement> sortElements = grouperConf.getCommonCriteria().getElements();
int numGroupByFields = grouperConf.getGroupByFields().size();
List<SortElement> groupSortElements = new ArrayList<SortElement>();
groupSortElements.addAll(sortElements);
groupSortElements = groupSortElements.subList(0,numGroupByFields);
groupSortBy = new Criteria(groupSortElements);
TupleMRConfigBuilder.initializeComparators(conf, grouperConf);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.hadoop.io.WritableComparator.readLong;
import static org.apache.hadoop.io.WritableComparator.readVInt;
import static org.apache.hadoop.io.WritableComparator.readVLong;
import static com.datasalt.pangool.serialization.tuples.PangoolSerialization.NULL_LENGTH;

import java.io.IOException;

Expand All @@ -43,6 +44,7 @@
import com.datasalt.pangool.io.tuple.Schema;
import com.datasalt.pangool.io.tuple.Schema.Field;
import com.datasalt.pangool.io.tuple.Schema.InternalType;
import com.datasalt.pangool.serialization.tuples.PangoolSerialization;

@SuppressWarnings("rawtypes")
public class SortComparator implements RawComparator<ITuple>, Configurable {
Expand Down Expand Up @@ -216,26 +218,16 @@ protected int compare(byte[] b1, int s1,byte[] b2, int s2,Schema schema,Criteria
int[] lengths2 = getHeaderLengthAndFieldLength(b2, o.offset2, type);
int dataSize1 = lengths1[1];
int dataSize2 = lengths2[1];
int comparison;
if (dataSize1 < 0){
if (dataSize2 < 0){
o.offset1 += lengths1[0]; //header
o.offset2 += lengths2[0]; // header
comparison=0; //object1 and object2 nulls
} else {
comparison = -1;//object1 null and object2 not null
}
} else {
if (dataSize2 < 0){
comparison= 1;
} else {
int totalField1Size = lengths1[0] + lengths1[1]; // Header size + data size
int totalField2Size = lengths2[0] + lengths2[1]; // Header size + data size
comparison = comparator.compare(b1, o.offset1, totalField1Size, b2, o.offset2, totalField2Size);
o.offset1 += totalField1Size;
o.offset2 += totalField2Size;
}
}
boolean isNull1 = (dataSize1 == NULL_LENGTH);
boolean isNull2 = (dataSize2 == NULL_LENGTH);
int nullAdjustedDataSize1 = isNull1 ? 0 : dataSize1;
int nullAdjustedDataSize2 = isNull2 ? 0 : dataSize2;
int totalField1Size = lengths1[0] + nullAdjustedDataSize1; // Header size + data size
int totalField2Size = lengths2[0] + nullAdjustedDataSize2; // Header size + data size
int comparison = comparator.compare(b1, o.offset1, (isNull1) ? NULL_LENGTH : totalField1Size,
b2, o.offset2, (isNull2) ? NULL_LENGTH : totalField2Size);
o.offset1 += totalField1Size;
o.offset2 += totalField2Size;
if(comparison != 0) {
return (sort == Order.ASC) ? comparison : -comparison;
}
Expand Down Expand Up @@ -352,13 +344,15 @@ protected int compare(byte[] b1, int s1,byte[] b2, int s2,Schema schema,Criteria

/**
* Return the header length and the field length for a field
* of the given type in the given position at the buffer
* of the given type in the given position at the buffer.
* Length can be {@link PangoolSerialization#NULL_LENGTH} in
* the case of null objects.
*/
public static int[] getHeaderLengthAndFieldLength(byte[] b1, int offset1, Class<?> type) throws IOException {
if(type == Integer.class ) {
return new int[]{0, Integer.SIZE / 8};
} else if(type == String.class ) {
return new int[]{0, readVInt(b1, offset1)};
return new int[]{0, readVInt(b1, offset1) + WritableUtils.decodeVIntSize(b1[offset1])};
} else if(type == Long.class ) {
return new int[]{0, Long.SIZE / 8};
} else if(type == VIntWritable.class || type.isEnum()) {
Expand All @@ -373,6 +367,7 @@ public static int[] getHeaderLengthAndFieldLength(byte[] b1, int offset1, Class<
return new int[]{0, 1};
} else {
// The rest of types has a VInt with length as header
// In the case of null objects, length is negative.
return new int[]{WritableUtils.decodeVIntSize(b1[offset1]), readVInt(b1, offset1)};
}
}
Expand All @@ -381,7 +376,7 @@ public static int[] getHeaderLengthAndFieldLength(byte[] b1, int offset1, Class<
public Configuration getConf() {
return conf;
}

@Override
public void setConf(Configuration conf) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public class PangoolSerialization implements Serialization<DatumWrapper<ITuple>>
private Configuration conf;
private com.datasalt.pangool.serialization.hadoop.HadoopSerialization ser;
private TupleMRConfig grouperConfig;

// Length that represents a null object
public static final int NULL_LENGTH = -1;

public PangoolSerialization() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.datasalt.pangool.io.tuple.ITuple;
import com.datasalt.pangool.io.tuple.Schema;
import com.datasalt.pangool.io.tuple.Schema.Field;
import com.datasalt.pangool.io.tuple.Schema.InternalType;
import com.datasalt.pangool.serialization.hadoop.HadoopSerialization;

public class PangoolSerializer implements Serializer<DatumWrapper<ITuple>> {
Expand Down Expand Up @@ -133,33 +134,37 @@ private void write(Schema destinationSchema, ITuple tuple,
output.writeDouble((Double) element);
} else if (fieldType == Float.class) {
output.writeFloat((Float) element);
} else if (fieldType == String.class) {
if (element == null) {
EMPTY_TEXT.write(output);
} else if (element instanceof Text) {
((Text) element).write(output);
} else if (element instanceof String) {
HELPER_TEXT.set((String) element);
} else if(fieldType == String.class) {
if (element instanceof Text){
((Text)element).write(output);
} else if (element instanceof String){
HELPER_TEXT.set((String)element);
HELPER_TEXT.write(output);
} else {
raiseClassCastException(null,fieldName,element,fieldType);
}
} else if (fieldType == Boolean.class) {
output.write((Boolean) element ? 1 : 0);
} else if (fieldType.isEnum()) {
writeEnum((Enum<?>) element, fieldType, fieldName, output);
} else {
writeCustomObject(element, output);
// Non of the other types. Then it is a custom object
writeCustomObject(element,output);
}
} catch (ClassCastException e) {
raiseClassCastException(fieldName, element, fieldType);
} // end for
}


} catch(ClassCastException e) {
raiseClassCastException(e, fieldName, element, fieldType);
} catch(NullPointerException e) {
raiseNullInstanceException(e, field, element);
}


} // end for
}

private void writeCustomObject(Object element, DataOutput output)
throws IOException {
if (element == null) {
WritableUtils.writeVInt(output, -1);

private void writeCustomObject(Object element, DataOutput output) throws IOException{
if(element == null) {
WritableUtils.writeVInt(output, PangoolSerialization.NULL_LENGTH);
} else {
tmpOutputBuffer.reset();
ser.ser(element, tmpOutputBuffer);
Expand All @@ -179,12 +184,25 @@ private void writeEnum(Enum<?> element, Class<?> expectedType,
}
WritableUtils.writeVInt(output, e.ordinal());
}

private void raiseClassCastException(String fieldName, Object element,
Class<?> expectedType) throws IOException {
throw new IOException("Field '" + fieldName + "' contains '" + element
+ "' which is " + element.getClass().getName()
+ ".The expected type is " + expectedType.getName());

private void raiseClassCastException(ClassCastException cause, String fieldName,Object element,Class<?> expectedType) throws IOException {
if (element.getClass() == expectedType) {
// We don't know what is happening
throw cause;
} else {
// We now types does not match.
throw new IOException("Field '" + fieldName + "' contains '" + element + "' which is "
+ element.getClass().getName() + ".The expected type is " + expectedType.getName());
}
}


private void raiseNullInstanceException(NullPointerException cause, Field field,Object element) throws IOException {
if (element == null && field.getInternalType() != InternalType.OBJECT) {
// Element can only be null for objects (InternalType.OBJECT
throw new IOException("Field '" + field.getName() + "' of type " + field.getType() +" cannot contain null value. Basic types does not support null values. Only fields of internalType == InternalType.OBJECT can");
} else {
// We don't know the cause.
throw cause;
}
}
}
28 changes: 25 additions & 3 deletions core/src/main/java/com/datasalt/pangool/utils/DCUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ public class DCUtils {

private static Logger log = LoggerFactory.getLogger(DCUtils.class);

static final String TMP_FOLDER = "Pangool_instances_cache";

private static File ensureTmpFolder() {
File tmpFolder = new File(System.getProperty("java.io.tmpdir"), TMP_FOLDER);
if (!tmpFolder.exists()) {
tmpFolder.mkdir();
}
return tmpFolder;
}

/**
* Utility method for serializing an object and saving it in the Distributed Cache.
* <p>
Expand All @@ -65,7 +75,8 @@ public class DCUtils {
*/
public static void serializeToDC(Object obj, String serializeToLocalFile, Configuration conf) throws FileNotFoundException, IOException,
URISyntaxException {
File file = new File(System.getProperty("java.io.tmpdir"), serializeToLocalFile);
File file = new File(ensureTmpFolder(), serializeToLocalFile);
file.deleteOnExit();
ObjectOutput out = new ObjectOutputStream(new FileOutputStream(file));
out.writeObject(obj);
out.close();
Expand Down Expand Up @@ -112,8 +123,8 @@ public static <T> T loadSerializedObjectInDC(Configuration conf, Class<T> objCla
* working on testing environments. We know files are
*/
if (path == null) {
String tmpdir = System.getProperty("java.io.tmpdir");
log.debug("[profile] Not found in DC. Looking in " + System.getProperty("java.io.tmpdir") + " folder");
String tmpdir = ensureTmpFolder().toString();
log.debug("[profile] Not found in DC. Looking in " + tmpdir + " folder");
path = locateFileInFolder(tmpdir, fileName);
}

Expand Down Expand Up @@ -166,4 +177,15 @@ public static Path locateFileInDC(Configuration conf, String filePostFix) throws
return locatedFile;
}

/**
* The methods of this class creates some temporary files
* for serializing instances. This method removes them.
*/
public static void cleanupTemporaryInstanceCache() {
File cacheFolder = ensureTmpFolder();
File[] files = cacheFolder.listFiles();
for (File f: files) {
f.delete();
}
}
}
13 changes: 10 additions & 3 deletions core/src/test/java/com/datasalt/pangool/BaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,17 @@ protected static void fillTuple(boolean isRandom,ITuple tuple, int minIndex, int
} else if(fieldType == Float.class) {
tuple.set(i, isRandom ? random.nextFloat() : 0f);
} else if(fieldType == String.class) {
if(!isRandom || random.nextBoolean()) {
tuple.set(i, "");
if (isRandom) {
switch (random.nextInt(2)) {
case 0:
tuple.set(i, "");
break;
case 1:
tuple.set(i, random.nextLong() + "");
break;
}
} else {
tuple.set(i, random.nextLong() + "");
tuple.set(i, "");
}
} else if(fieldType.isEnum()) {
Method method = fieldType.getMethod("values", (Class[])null);
Expand Down

0 comments on commit d18d08f

Please sign in to comment.