Skip to content

Commit

Permalink
HIVE-3012 hive custom scripts do not work well if the data contains n…
Browse files Browse the repository at this point in the history
…ew lines (njain via kevinwilfong)

git-svn-id: https://svn.apache.org/repos/asf/hive/trunk@1336986 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Kevin Wilfong committed May 11, 2012
1 parent 3c1893a commit 88582ee
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 2 deletions.
1 change: 1 addition & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ public static enum ConfVars {
"org.apache.hadoop.hive.ql.exec.TextRecordReader"),
HIVESCRIPTRECORDWRITER("hive.script.recordwriter",
"org.apache.hadoop.hive.ql.exec.TextRecordWriter"),
HIVESCRIPTESCAPENEWLINES("hive.script.escape.newlines", false),

// HWI
HIVEHWILISTENHOST("hive.hwi.listen.host", "0.0.0.0"),
Expand Down
11 changes: 11 additions & 0 deletions conf/hive-default.xml.template
Original file line number Diff line number Diff line change
Expand Up @@ -1272,4 +1272,15 @@
</description>
</property>

<property>
<name>hive.script.escape.newlines</name>
<value>false</value>
<description>
      This adds an option to escape the newlines when they are passed to the
      user script. This is useful if the hive tables contain data that can
      contain newlines.
</description>
</property>

</configuration>

22 changes: 22 additions & 0 deletions data/scripts/newline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys

# For each input row, ignore its content and emit the literal four-character
# sequence 1\n2 (a backslash followed by 'n', NOT a real newline -- the
# backslash is escaped in this source).  The newline.q test feeds this
# output back through Hive with hive.script.escape.newlines=true, which is
# expected to turn the \n sequence into a real newline on read.
for line in sys.stdin:
    print "1\\n2"
12 changes: 11 additions & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.LineRecordReader.LineReader;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;

/**
* TextRecordReader.
*
Expand All @@ -36,11 +39,13 @@ public class TextRecordReader implements RecordReader {
private LineReader lineReader;
private InputStream in;
private Text row;
private Configuration conf;

/**
 * Prepares this reader to consume text records from the given stream.
 *
 * @param in   input stream the records are read from
 * @param conf job configuration; retained because next() consults it for
 *             the hive.script.escape.newlines setting
 * @param tbl  table properties (unused by this reader)
 * @throws IOException declared for the RecordReader contract
 */
public void initialize(InputStream in, Configuration conf, Properties tbl)
    throws IOException {
  this.in = in;
  this.conf = conf;
  lineReader = new LineReader(in, conf);
}

public Writable createRow() throws IOException {
Expand All @@ -53,7 +58,12 @@ public int next(Writable row) throws IOException {
return -1;
}

return lineReader.readLine((Text) row);
int bytesConsumed = lineReader.readLine((Text) row);

if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPENEWLINES)) {
return HiveUtils.unescapeNewLine((Text) row);
}
return bytesConsumed;
}

public void close() throws IOException {
Expand Down
12 changes: 11 additions & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.conf.HiveConf;

/**
* TextRecordWriter.
Expand All @@ -32,15 +34,23 @@
public class TextRecordWriter implements RecordWriter {

private OutputStream out;
private Configuration conf;

/**
 * Captures the destination stream and the job configuration for later use
 * by write().
 *
 * @param out  stream that written records are sent to
 * @param conf job configuration; retained because write() consults it for
 *             the hive.script.escape.newlines setting
 * @throws IOException declared for the RecordWriter contract
 */
public void initialize(OutputStream out, Configuration conf)
    throws IOException {
  this.conf = conf;
  this.out = out;
}

/**
 * Writes one row as a line of text followed by a newline separator.
 * When hive.script.escape.newlines is enabled, newline bytes inside the
 * row are escaped first so the row stays on a single line.
 *
 * @param row the record to write; must be a {@link Text}
 * @throws IOException if writing to the underlying stream fails
 */
public void write(Writable row) throws IOException {
  Text data = (Text) row;

  boolean escapeEnabled =
      HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESCRIPTESCAPENEWLINES);
  Text outputText = escapeEnabled ? HiveUtils.escapeNewLine(data) : data;

  out.write(outputText.getBytes(), 0, outputText.getLength());
  out.write(Utilities.newLineCode);
}

Expand Down
66 changes: 66 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ReflectionUtils;

/**
Expand Down Expand Up @@ -96,6 +97,71 @@ public static String escapeString(String str) {
return (escape.toString());
}

// Two-byte sequence (backslash, 'n') substituted for a raw newline byte.
static final byte[] newLineEscapeBytes = "\\n".getBytes();
// Single raw newline byte restored by unescapeNewLine.
static final byte[] newLineUnescapeBytes = "\n".getBytes();
// Two-byte sequence (backslash, backslash) substituted for a raw backslash.
static final byte[] backSlashEscapeBytes = "\\\\".getBytes();

/**
 * Escapes a row of text so it survives newline-delimited transport to a
 * user script: each newline byte becomes the two-byte sequence "\n" and
 * each backslash byte becomes "\\".
 *
 * Backslashes must be escaped as well; otherwise a row that already
 * contains the literal characters backslash+'n' would be indistinguishable
 * from an escaped newline, and unescapeNewLine could not restore the
 * original data (this was a bug in the initial version, which left
 * backslashes untouched).
 *
 * @param text the row to escape; not modified
 * @return a new Text holding the escaped bytes
 */
public static Text escapeNewLine(Text text) {
  int length = text.getLength();
  byte[] textBytes = text.getBytes();

  // Copy-then-clear keeps the backing buffer sized close to the input.
  Text escape = new Text(text);
  escape.clear();

  for (int i = 0; i < length; ++i) {
    int c = text.charAt(i);
    switch (c) {
    case '\n':
      escape.append(newLineEscapeBytes, 0, newLineEscapeBytes.length);
      break;
    case '\\':
      escape.append(backSlashEscapeBytes, 0, backSlashEscapeBytes.length);
      break;
    default:
      escape.append(textBytes, i, 1);
      break;
    }
  }
  return escape;
}

/**
 * Reverses escapeNewLine in place: "\n" becomes a raw newline and "\\"
 * becomes a single backslash.  A backslash followed by any other character
 * is kept as-is, as is a trailing unpaired backslash (the initial version
 * silently dropped it).
 *
 * Fix over the initial version: after consuming an escaped backslash pair
 * the slash state is reset, so input like backslash+backslash+'n' decodes
 * to backslash+'n' instead of backslash+newline.
 *
 * @param text the escaped row; replaced with the unescaped bytes
 * @return the unescaped length in bytes
 */
public static int unescapeNewLine(Text text) {
  // Work from a copy so the argument can be rebuilt in place.
  Text escape = new Text(text);
  text.clear();

  int length = escape.getLength();
  byte[] textBytes = escape.getBytes();

  boolean hadSlash = false;
  for (int i = 0; i < length; ++i) {
    int c = escape.charAt(i);
    switch (c) {
    case '\\':
      if (hadSlash) {
        // "\\" -> literal backslash; the pair is fully consumed.
        text.append(textBytes, i, 1);
        hadSlash = false;
      } else {
        hadSlash = true;
      }
      break;
    case 'n':
      if (hadSlash) {
        // "\n" -> raw newline.
        text.append(newLineUnescapeBytes, 0, newLineUnescapeBytes.length);
      } else {
        text.append(textBytes, i, 1);
      }
      hadSlash = false;
      break;
    default:
      if (hadSlash) {
        // Backslash before an unrecognized character: keep the backslash.
        text.append(textBytes, i - 1, 1);
        hadSlash = false;
      }
      text.append(textBytes, i, 1);
      break;
    }
  }
  if (hadSlash) {
    // Trailing unpaired backslash: preserve it rather than losing data.
    text.append(textBytes, length - 1, 1);
  }
  return text.getLength();
}

public static String lightEscapeString(String str) {
int length = str.length();
StringBuilder escape = new StringBuilder(length + 16);
Expand Down
9 changes: 9 additions & 0 deletions ql/src/test/queries/clientpositive/newline.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Regression test for HIVE-3012: TRANSFORM scripts and data containing
-- newlines.  newline.py emits the literal characters 1\n2 for every input
-- row; with hive.script.escape.newlines enabled the record reader is
-- expected to unescape that back into a real newline.
add file ../data/scripts/newline.py;
set hive.script.escape.newlines=true;

create table tmp_tmp(key string, value string) stored as rcfile;
insert overwrite table tmp_tmp
SELECT TRANSFORM(key, value) USING
'python newline.py' AS key, value FROM src limit 5;

-- Each transformed row should surface as two rows because of the restored
-- embedded newline (see newline.q.out).
select * from tmp_tmp;
39 changes: 39 additions & 0 deletions ql/src/test/results/clientpositive/newline.q.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
PREHOOK: query: create table tmp_tmp(key string, value string) stored as rcfile
PREHOOK: type: CREATETABLE
POSTHOOK: query: create table tmp_tmp(key string, value string) stored as rcfile
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: default@tmp_tmp
PREHOOK: query: insert overwrite table tmp_tmp
SELECT TRANSFORM(key, value) USING
'python newline.py' AS key, value FROM src limit 5
PREHOOK: type: QUERY
PREHOOK: Input: default@src
PREHOOK: Output: default@tmp_tmp
POSTHOOK: query: insert overwrite table tmp_tmp
SELECT TRANSFORM(key, value) USING
'python newline.py' AS key, value FROM src limit 5
POSTHOOK: type: QUERY
POSTHOOK: Input: default@src
POSTHOOK: Output: default@tmp_tmp
POSTHOOK: Lineage: tmp_tmp.key SCRIPT [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
POSTHOOK: Lineage: tmp_tmp.value SCRIPT [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
PREHOOK: query: select * from tmp_tmp
PREHOOK: type: QUERY
PREHOOK: Input: default@tmp_tmp
#### A masked pattern was here ####
POSTHOOK: query: select * from tmp_tmp
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tmp_tmp
#### A masked pattern was here ####
POSTHOOK: Lineage: tmp_tmp.key SCRIPT [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
POSTHOOK: Lineage: tmp_tmp.value SCRIPT [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ]
1
2 NULL
1
2 NULL
1
2 NULL
1
2 NULL
1
2 NULL

0 comments on commit 88582ee

Please sign in to comment.