diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8cef181 --- /dev/null +++ b/.gitignore @@ -0,0 +1,17 @@ +*.iml +*.ipr +*.iws +target/ +/var +pom.xml.versionsBackup +test-output/ +/atlassian-ide-plugin.xml +.idea +.DS_Store +.classpath +.settings +.project +temp-testng-customsuite.xml +test-output +.externalToolBuilders +*~ diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..284b36a --- /dev/null +++ b/pom.xml @@ -0,0 +1,95 @@ + + 4.0.0 + + org.acz.hive + hive-serde + 1.0-SNAPSHOT + jar + + hive-serde + https://github.com/electrum/hive-serde + + + UTF-8 + + + + + cloudera-releases + Cloudera Releases Repository + https://repository.cloudera.com/content/repositories/releases/ + + + + + + joda-time + joda-time + 1.6.2 + + + + org.codehaus.jackson + jackson-core-asl + 1.8.5 + + + + org.codehaus.jackson + jackson-mapper-asl + 1.8.5 + + + + org.apache.hadoop + hadoop-core + 0.20.2-cdh3u1 + provided + + + + org.apache.hadoop.hive + hive-serde + 0.7.1-cdh3u1 + provided + + + + org.testng + testng + 6.2 + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + 1.6 + 1.6 + + + + maven-assembly-plugin + + + jar-with-dependencies + + + + + package + + single + + + + + + + diff --git a/src/main/java/org/acz/hive/serde/CaseInsensitiveMap.java b/src/main/java/org/acz/hive/serde/CaseInsensitiveMap.java new file mode 100644 index 0000000..62d0787 --- /dev/null +++ b/src/main/java/org/acz/hive/serde/CaseInsensitiveMap.java @@ -0,0 +1,91 @@ +package org.acz.hive.serde; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class CaseInsensitiveMap + implements Map +{ + Map map = new HashMap(); + + @Override + public int size() + { + return map.size(); + } + + @Override + public boolean isEmpty() + { + return map.isEmpty(); + } + + @Override + public boolean containsKey(Object key) + { + return map.containsKey(convertKey(key)); + } + + @Override + public boolean containsValue(Object value) + { + return map.containsValue(value); + } + + @Override + public V get(Object key) + { + return map.get(convertKey(key)); + } + + @Override + public V remove(Object key) + { + return map.remove(convertKey(key)); + } + + @Override + public void clear() + { + map.clear(); + } + + @Override + public Set keySet() + { + throw new UnsupportedOperationException("keySet not supported"); + } + + @Override + public Collection values() + { + return map.values(); + } + + @Override + public Set> entrySet() + { + throw new UnsupportedOperationException("entrySet not supported"); + } + + @Override + public void putAll(Map m) + { + for (Entry entry : m.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + } + + @Override + public V put(String key, V value) + { + return map.put(convertKey(key), value); + } + + private static String convertKey(Object key) + { + return ((String) key).toLowerCase(); + } +} diff --git a/src/main/java/org/acz/hive/serde/JsonEventSerde.java b/src/main/java/org/acz/hive/serde/JsonEventSerde.java new file mode 100644 index 0000000..04f4042 --- /dev/null +++ b/src/main/java/org/acz/hive/serde/JsonEventSerde.java @@ -0,0 +1,217 @@ +package org.acz.hive.serde; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.hadoop.io.Writable; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.Version; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.module.SimpleModule; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static java.lang.String.format; +import static java.util.Arrays.asList; +import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.getStructTypeInfo; +import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo; +import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfosFromTypeString; + +public class JsonEventSerde + implements SerDe +{ + public static final DateTimeFormatter ISO_FORMATTER = ISODateTimeFormat.dateTime().withZone(DateTimeZone.UTC); + public static final DateTimeFormatter HIVE_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd' 'HH:mm:ss").withZone(DateTimeZone.UTC); + + private final ObjectMapper objectMapper; + private List columnNames; + private List columnTypes; + private ObjectInspector rowObjectInspector; + private Map columnNameMap; + private Integer timestampColumn; + + public JsonEventSerde() + { + SimpleModule module = new SimpleModule("module", Version.unknownVersion()); + module.addAbstractTypeMapping(Map.class, CaseInsensitiveMap.class); + objectMapper = new ObjectMapper(); + objectMapper.registerModule(module); + } + + @Override + public void initialize(Configuration configuration, Properties table) + throws SerDeException + { + String columnNamesProperty = table.getProperty(Constants.LIST_COLUMNS); + if ((columnNamesProperty == null) || columnNamesProperty.isEmpty()) { + throw new SerDeException("table has no columns"); + } + String columnTypesProperty = table.getProperty(Constants.LIST_COLUMN_TYPES); + if ((columnTypesProperty == null) || columnTypesProperty.isEmpty()) { + throw new SerDeException("table has no column types"); + } + + columnNames = asList(columnNamesProperty.toLowerCase().split(",")); + columnTypes = getTypeInfosFromTypeString(columnTypesProperty); + if (columnNames.size() != columnTypes.size()) { + throw new SerDeException(format("columns size (%s) does not match column types size (%s)", columnNames.size(), columnTypes.size())); + } + + rowObjectInspector = getStandardJavaObjectInspectorFromTypeInfo(getStructTypeInfo(columnNames, columnTypes)); + + columnNameMap = mapColumns(columnNames); + timestampColumn = columnNameMap.get("ts"); + } + + private static Map mapColumns(List columnNames) + { + Map map = new HashMap(); + for (int i = 0; i < columnNames.size(); i++) { + map.put(columnNames.get(i), i); + } + return map; + } + + @Override + public Class getSerializedClass() + { + throw new UnsupportedOperationException("serialization not supported"); + } + + @Override + public Writable serialize(Object o, ObjectInspector objectInspector) + throws SerDeException + { + throw new UnsupportedOperationException("serialization not supported"); + } + + @Override + public Object deserialize(Writable writable) + throws SerDeException + { + if (!(writable instanceof BinaryComparable)) { + throw new SerDeException("expected BinaryComparable: " + writable.getClass().getName()); + } + BinaryComparable binary = (BinaryComparable) writable; + + try { + JsonParser jsonParser = objectMapper.getJsonFactory().createJsonParser(binary.getBytes(), 0, binary.getLength()); + return buildStruct(objectMapper.readTree(jsonParser)); + } + catch (IOException e) { + throw new SerDeException("error parsing JSON", e); + } + } + + @Override + public ObjectInspector getObjectInspector() + throws SerDeException + { + return rowObjectInspector; + } + + private Object[] buildStruct(JsonNode tree) + throws IOException, SerDeException + { + Object[] struct = new Object[columnNames.size()]; + + if (!tree.has("data")) { + throw new IOException("data field is missing"); + } + JsonNode dataNode = tree.get("data"); + if (!dataNode.isObject()) { + throw new IOException("data field is not an object"); + } + + Iterator> fields = dataNode.getFields(); + while (fields.hasNext()) { + Map.Entry entry = fields.next(); + String key = entry.getKey().toLowerCase(); + JsonNode node = entry.getValue(); + + Integer columnIndex = columnNameMap.get(key); + if (columnIndex != null) { + struct[columnIndex] = getNodeValue(node, key, columnTypes.get(columnIndex)); + } + } + + if (timestampColumn != null) { + struct[timestampColumn] = HIVE_FORMATTER.print(parseTimestamp(tree)); + } + + return struct; + } + + private Object getNodeValue(JsonNode node, String columnName, TypeInfo typeInfo) + throws IOException, SerDeException + { + switch (typeInfo.getCategory()) { + case LIST: + return objectMapper.readValue(node, List.class); + case MAP: + return objectMapper.readValue(node, Map.class); + case PRIMITIVE: + PrimitiveTypeInfo ptypeInfo = (PrimitiveTypeInfo) typeInfo; + switch (ptypeInfo.getPrimitiveCategory()) { + case VOID: + throw new SerDeException("cannot deserialize to VOID type for column " + columnName); + case UNKNOWN: + throw new SerDeException("cannot deserialize to UNKNOWN type for column " + columnName); + case BOOLEAN: + return node.getBooleanValue(); + case BYTE: + return (byte) node.getIntValue(); + case SHORT: + return (short) node.getIntValue(); + case INT: + return node.getIntValue(); + case LONG: + return node.getLongValue(); + case FLOAT: + return (float) node.getDoubleValue(); + case DOUBLE: + return node.getDoubleValue(); + case STRING: + return node.getTextValue(); + default: + throw new SerDeException("unhandled primitive type: " + ptypeInfo.getPrimitiveCategory()); + } + default: + throw new SerDeException(format("unexpected type category (%s) for column: %s", typeInfo.getCategory(), columnName)); + } + } + + private static long parseTimestamp(JsonNode tree) + throws IOException + { + if (!tree.has("timestamp")) { + throw new IOException("timestamp field is missing"); + } + JsonNode node = tree.get("timestamp"); + if (!node.isTextual()) { + throw new IOException("timestamp field is not text"); + } + String timestamp = node.getTextValue(); + try { + return ISO_FORMATTER.parseMillis(timestamp); + } + catch (Exception e) { + throw new IOException("invalid timestamp: " + timestamp); + } + } +} diff --git a/src/test/java/org/acz/hive/serde/TestCaseInsensitiveMap.java b/src/test/java/org/acz/hive/serde/TestCaseInsensitiveMap.java new file mode 100644 index 0000000..84eae21 --- /dev/null +++ b/src/test/java/org/acz/hive/serde/TestCaseInsensitiveMap.java @@ -0,0 +1,70 @@ +package org.acz.hive.serde; + +import org.codehaus.jackson.Version; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.module.SimpleModule; +import org.testng.annotations.Test; + +import java.util.Map; + +import static java.lang.String.format; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +public class TestCaseInsensitiveMap +{ + @Test + public void testMap() + { + Map map = new CaseInsensitiveMap(); + map.put("HELLO", "world"); + + assertEquals(map.get("hello"), "world"); + assertEquals(map.get("HELLO"), "world"); + assertEquals(map.get("Hello"), "world"); + + assertTrue(map.containsKey("hello")); + assertTrue(map.containsKey("HELLO")); + assertTrue(map.containsKey("Hello")); + + assertNull(map.get("foo")); + assertFalse(map.containsKey("foo")); + } + + @Test + public void testObjectMapper() + throws Exception + { + SimpleModule module = new SimpleModule("module", Version.unknownVersion()); + module.addAbstractTypeMapping(Map.class, CaseInsensitiveMap.class); + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(module); + + Map map = objectMapper.readValue("{\"HELLO\":\"world\"}", Map.class); + + assertTrue(map instanceof CaseInsensitiveMap); + + assertInstanceOf(map, CaseInsensitiveMap.class); + + assertEquals(map.get("hello"), "world"); + assertEquals(map.get("HELLO"), "world"); + assertEquals(map.get("Hello"), "world"); + + assertTrue(map.containsKey("hello")); + assertTrue(map.containsKey("HELLO")); + assertTrue(map.containsKey("Hello")); + + assertNull(map.get("foo")); + assertFalse(map.containsKey("foo")); + } + + private static void assertInstanceOf(Object actual, Class expectedType) + { + if (!expectedType.isInstance(actual)) { + fail(format("expected %s to be an instance of %s", actual.getClass().getName(), expectedType.getName())); + } + } +}