Permalink
Browse files

Add list parsing to UTF-8 EventFormats.

  • Loading branch information...
1 parent 711005e commit 1dd5df959b9cb8cdc69da542de672939edaa00e6 @kimballa kimballa committed May 18, 2011
View
@@ -9,7 +9,7 @@ most important new features:
-- dependencies: LIST<T>, MAP<KT,VT>
-- multi-threading, distribution, scalability
- -- documentation debt: lists and list functions.
+ -- documentation debt: lists and list functions. (list.delim param for eventfmt)
Types:
@@ -24,9 +24,6 @@ Types:
foldl(fn<x, t> f, x initial, list<t> lst) -> folds f over elements of lst, using
initial as the first accumulator value.
- -- All list constructions need to use Collections.immutableList() to protect them.
- -- Need to read delimited lists from UTF-8 formats.
-
-- BINARY type should be able to specify encoding when converting to/from STRING.
Aggregation:
View
@@ -235,6 +235,12 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.6</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>5.14.10</version>
@@ -58,7 +58,7 @@ public Object eval(EventWrapper event, Object... args) {
out.addAll(in);
}
- return out;
+ return Collections.unmodifiableList(out);
}
@Override
@@ -54,7 +54,7 @@ public Object eval(EventWrapper event, Object... args) {
for (Object arg : args) {
out.add(arg);
}
- return out;
+ return Collections.unmodifiableList(out);
}
@Override
@@ -17,25 +17,18 @@
package com.odiago.flumebase.io;
-import java.io.UnsupportedEncodingException;
-
-import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.avro.util.Utf8;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.flume.core.Event;
-import com.odiago.flumebase.lang.PreciseType;
-import com.odiago.flumebase.lang.Timestamp;
import com.odiago.flumebase.lang.Type;
import com.odiago.flumebase.util.Ref;
@@ -59,6 +52,10 @@
public static final String NULL_STR_PARAM = "null.sequence";
public static final String DEFAULT_NULL_STR = "\\N";
+ /** key in the stream properties that specifies a split token for LIST<T> items. */
+ public static final String LIST_SEPARATOR_PARAM = "list.delim";
+ public static final String DEFAULT_LIST_SEPARATOR = "|";
+
/** CharBuffers wrapping the text of each column. */
private ArrayList<CharBuffer> mColTexts;
@@ -73,8 +70,12 @@
/** An escape sequence that specifies that the current field is a null string. */
private String mNullStr;
+ /** Delimiter for list items. */
+ private String mListSep;
+
protected CachingTextEventParser() {
mNullStr = DEFAULT_NULL_STR;
+ mListSep = DEFAULT_LIST_SEPARATOR;
init();
}
@@ -85,6 +86,11 @@ protected CachingTextEventParser(Map<String, String> params) {
mNullStr = DEFAULT_NULL_STR;
}
+ mListSep = params.get(LIST_SEPARATOR_PARAM);
+ if (null == mListSep) {
+ mListSep = DEFAULT_LIST_SEPARATOR;
+ }
+
init();
}
@@ -114,7 +120,6 @@ public void reset(Event e) {
*/
protected Object parseAndCache(CharBuffer chars, int colIdx, Type expectedType)
throws ColumnParseException {
- Type.TypeName primitiveTypeName = expectedType.getPrimitiveTypeName();
String debugInputString = null;
if (LOG.isDebugEnabled()) {
@@ -123,61 +128,7 @@ protected Object parseAndCache(CharBuffer chars, int colIdx, Type expectedType)
debugInputString = chars.toString();
}
- // TODO(aaron): Test how this handles a field that is an empty string.
- Object out = null;
- switch (primitiveTypeName) {
- case BINARY:
- try {
- out = ByteBuffer.wrap(chars.toString().getBytes("UTF-8"));
- } catch (UnsupportedEncodingException uee) {
- // Shouldn't ever be able to get here.
- // (http://download.oracle.com/javase/6/docs/api/java/nio/charset/Charset.html)
- LOG.error("Your JVM doesn't support UTF-8. This is really, really bad.");
- throw new ColumnParseException(uee);
- }
- break;
- case BOOLEAN:
- out = CharBufferUtils.parseBool(chars);
- break;
- case INT:
- out = CharBufferUtils.parseInt(chars);
- break;
- case BIGINT:
- out = CharBufferUtils.parseLong(chars);
- break;
- case FLOAT:
- out = CharBufferUtils.parseFloat(chars);
- break;
- case DOUBLE:
- out = CharBufferUtils.parseDouble(chars);
- break;
- case STRING:
- String asStr = chars.toString();
- if (expectedType.isNullable() && asStr.equals(mNullStr)) {
- out = null;
- } else {
- out = new Utf8(asStr);
- }
- break;
- case TIMESTAMP:
- out = CharBufferUtils.parseLong(chars);
- if (null != out) {
- out = new Timestamp((Long) out);
- }
- break;
- case TIMESPAN:
- // TODO: This should return a TimeSpan object, which is actually two
- // fields. We need to work on this... it should not just be a 'long'
- // representation.
- out = CharBufferUtils.parseLong(chars);
- break;
- case PRECISE:
- PreciseType preciseType = PreciseType.toPreciseType(expectedType);
- out = preciseType.parseStringInput(chars.toString());
- break;
- default:
- throw new ColumnParseException("Cannot parse recursive types");
- }
+ Object out = CharBufferUtils.parseType(chars, expectedType, mNullStr, mListSep);
while(mColumnValues.size() < colIdx) {
// Add nulls to the list to increase the memoized size up to this column.
@@ -192,10 +143,10 @@ protected Object parseAndCache(CharBuffer chars, int colIdx, Type expectedType)
mColumnValues.add(out);
mColumnNulls.add(Boolean.valueOf(out == null));
- //if (LOG.isDebugEnabled()) {
- // LOG.debug("Parsed string [" + debugInputString + "] with expected type ["
- // + expectedType + "] for column idx=" + colIdx + "; result is [" + out + "]");
- //}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Parsed string [" + debugInputString + "] with expected type ["
+ + expectedType + "] for column idx=" + colIdx + "; result is [" + out + "]");
+ }
return out;
}
@@ -17,11 +17,27 @@
package com.odiago.flumebase.io;
+import java.io.UnsupportedEncodingException;
+
+import java.nio.ByteBuffer;
import java.nio.CharBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.avro.util.Utf8;
+
+import org.apache.commons.lang.text.StrTokenizer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.odiago.flumebase.lang.ListType;
+import com.odiago.flumebase.lang.PreciseType;
+import com.odiago.flumebase.lang.Timestamp;
+import com.odiago.flumebase.lang.Type;
+
/**
* Utility methods for parsing string-based values without
* requiring that they be incorporated into a String object.
@@ -151,4 +167,93 @@ public static String parseString(CharBuffer chars) throws ColumnParseException {
return chars.toString();
}
+ /**
+ * Parses a CharBuffer into a list of values, all of some other type.
+ * The buffer is converted to a String and split with a StrTokenizer
+ * whose delimiter is the first character of listDelim only; each
+ * token is then wrapped in a CharBuffer and parsed via parseType()
+ * as a listItemType.
+ * NOTE(review): empty tokens are handled according to StrTokenizer's
+ * defaults -- confirm whether empty list items are meant to be dropped.
+ * @param chars the unparsed characters representing the whole list
+ * @param listItemType the expected type of each item in the list
+ * @param nullStr a token indicating a null String instance.
+ * @param listDelim the list item separator; must be non-empty because
+ * charAt(0) is applied to it, and characters after the first are unused.
+ * @return an unmodifiable list of the parsed items, in token order.
+ * @throws ColumnParseException if parseType() fails on any item.
+ */
+ public static List<Object> parseList(CharBuffer chars, Type listItemType,
+ String nullStr, String listDelim) throws ColumnParseException {
+ StrTokenizer tokenizer = new StrTokenizer(chars.toString(), listDelim.charAt(0));
+ List<Object> out = new ArrayList<Object>();
+
+ while (tokenizer.hasNext()) {
+ String part = (String) tokenizer.next();
+ out.add(parseType(CharBuffer.wrap(part), listItemType, nullStr, listDelim));
+ }
+
+ return Collections.unmodifiableList(out);
+ }
+
+ /**
+ * Parses a CharBuffer into a value of a given expected type.
+ * Dispatches on expectedType.getPrimitiveTypeName():
+ * BINARY yields a ByteBuffer over the UTF-8 bytes of the text;
+ * BOOLEAN, INT, BIGINT, FLOAT and DOUBLE delegate to the matching
+ * CharBufferUtils.parseXxx() method; STRING yields a Utf8, or null when
+ * expectedType is nullable and the text equals nullStr; TIMESTAMP is
+ * parsed as a long and wrapped in a Timestamp when non-null; TIMESPAN
+ * is parsed as a long; PRECISE delegates to PreciseType.parseStringInput();
+ * LIST delegates to parseList() with the list's element type.
+ * @param chars the unparsed characters representing the value
+ * @param expectedType the expected type of the final value
+ * @param nullStr a token indicating a null String instance.
+ * @param listDelim the separator between list items; only consulted
+ * for LIST types, where it is passed through to parseList().
+ * @return the parsed value, which may be null.
+ * @throws ColumnParseException if the type name is not one handled
+ * by the switch below, or if UTF-8 encoding is unavailable for BINARY.
+ */
+ public static Object parseType(CharBuffer chars, Type expectedType,
+ String nullStr, String listDelim) throws ColumnParseException {
+ Type.TypeName primitiveTypeName = expectedType.getPrimitiveTypeName();
+
+ // TODO(aaron): Test how this handles a field that is an empty string.
+ Object out = null;
+ switch (primitiveTypeName) {
+ case BINARY:
+ try {
+ out = ByteBuffer.wrap(chars.toString().getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException uee) {
+ // Shouldn't ever be able to get here.
+ // (http://download.oracle.com/javase/6/docs/api/java/nio/charset/Charset.html)
+ LOG.error("Your JVM doesn't support UTF-8. This is really, really bad.");
+ throw new ColumnParseException(uee);
+ }
+ break;
+ case BOOLEAN:
+ out = CharBufferUtils.parseBool(chars);
+ break;
+ case INT:
+ out = CharBufferUtils.parseInt(chars);
+ break;
+ case BIGINT:
+ out = CharBufferUtils.parseLong(chars);
+ break;
+ case FLOAT:
+ out = CharBufferUtils.parseFloat(chars);
+ break;
+ case DOUBLE:
+ out = CharBufferUtils.parseDouble(chars);
+ break;
+ case STRING:
+ String asStr = chars.toString();
+ if (expectedType.isNullable() && asStr.equals(nullStr)) {
+ out = null;
+ } else {
+ out = new Utf8(asStr);
+ }
+ break;
+ case TIMESTAMP:
+ out = CharBufferUtils.parseLong(chars);
+ if (null != out) {
+ out = new Timestamp((Long) out);
+ }
+ break;
+ case TIMESPAN:
+ // TODO: This should return a TimeSpan object, which is actually two
+ // fields. We need to work on this... it should not just be a 'long'
+ // representation.
+ out = CharBufferUtils.parseLong(chars);
+ break;
+ case PRECISE:
+ PreciseType preciseType = PreciseType.toPreciseType(expectedType);
+ out = preciseType.parseStringInput(chars.toString());
+ break;
+ case LIST:
+ out = parseList(chars, ListType.toListType(expectedType).getElementType(),
+ nullStr, listDelim);
+ break;
+ default:
+ throw new ColumnParseException("Cannot parse recursive types");
+ }
+
+ return out;
+ }
+
}
@@ -123,6 +123,21 @@ public Object coerceValue(Type valType, Object val) {
return out;
}
}
+
+ /**
+ * Given a ListType or a wrapper around the ListType, return the
+ * inner ListType object, or null if this cannot be performed.
+ * A NullableType is unwrapped via asNullable().getInnerType() and the
+ * result is examined recursively.
+ * @param t the type to unwrap.
+ * @return t itself if it is a ListType, the ListType found inside a
+ * NullableType wrapper, or null for any other type; callers must
+ * check for null before dereferencing the result.
+ */
+ public static ListType toListType(Type t) {
+ if (t instanceof ListType) {
+ return (ListType) t;
+ } else if (t instanceof NullableType) {
+ NullableType nul = t.asNullable();
+ return toListType(nul.getInnerType());
+ } else {
+ return null; // Didn't match.
+ }
+ }
}
@@ -98,10 +98,7 @@ public boolean isPrimitive() {
* @return the TypeName of the non-null type being wrapped.
*/
public TypeName getPrimitiveTypeName() {
- if (!mType.isPrimitive()) {
- return null;
- }
- return mType.getTypeName();
+ return mType.getPrimitiveTypeName();
}
@Override
@@ -203,11 +203,9 @@ public TypeName getTypeName() {
/**
* If this is a primitive type, return the TypeName it represents.
+ * Otherwise, return the TypeName representing the type class/constructor.
*/
public TypeName getPrimitiveTypeName() {
- if (!isPrimitive()) {
- return null;
- }
return mTypeName;
}
@@ -252,7 +250,7 @@ public boolean isNumeric() {
}
/** @return true if this is a concrete type that can actually be instantiated
- * with values (i.e., not a typeclass). */
+ * with values or selected (i.e., not a typeclass). */
public boolean isConcrete() {
return isScalar();
}
@@ -582,10 +582,10 @@ private void resolveIdentifier(SymbolTable fieldsSymTab, String fieldName,
}
Type fieldType = fieldSym.getType();
- if (!fieldType.isPrimitive()) {
- // This name refers to a stream or other complex object. We can't
+ if (!fieldType.isConcrete()) {
+ // This name refers to a stream or other ephemeral type. We can't
// select that.
- throw new TypeCheckException("Cannot select non-primitive entity \""
+ throw new TypeCheckException("Cannot select non-concrete entity \""
+ fieldName + "\"");
}
Oops, something went wrong.

0 comments on commit 1dd5df9

Please sign in to comment.