
IMPALA-1658: Add compatibility flag for Hive-Parquet-Timestamps

No changes were made to writing, or to reading files written by
Impala.

Hive writes TIMESTAMP values to parquet files differently than Impala
does. Hive converts the value from local time to UTC before writing;
Impala does not. This change adds a startup flag that will convert UTC
to local when reading files written by Hive.

The Hive-file detection actually checks for "parquet-mr" (the library
Hive uses) in the file metadata. There is a slight possibility that
TIMESTAMP values written by something other than Hive, but also with
parquet-mr, could become incorrect. That possibility should be very
small because TIMESTAMP values are stored and encoded in a non-standard
way that other applications are unlikely to be aware of.
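
For illustration only (not code from this patch), the detection amounts
to a check like the following, where the writer string is the
application recorded in the Parquet file metadata; the names here are
made up, the real check lives in HdfsParquetScanner::ColumnReader in
the diff below:

  #include <string>

  // Convert only when the startup flag is set, the column is a TIMESTAMP, and
  // the file was written by parquet-mr (the library Hive uses).
  static bool NeedsHiveUtcToLocalConversion(bool flag_enabled, bool is_timestamp_col,
                                            const std::string& writer_application) {
    return flag_enabled && is_timestamp_col && writer_application == "parquet-mr";
  }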

Flags from be/src/exec/hdfs-parquet-scanner.cc:
  -convert_legacy_hive_parquet_utc_timestamps (When true, TIMESTAMPs
    read from files written by Parquet-MR (used by Hive) will be
    converted from UTC to local time. Writes are unaffected.) type: bool
    default: false
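
For reference, enabling this behavior at daemon startup would presumably
use the standard gflags form, e.g. starting impalad with
-convert_legacy_hive_parquet_utc_timestamps=true; the custom cluster
test added below passes exactly that string via with_args().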

Change-Id: I79a499fe24049b7025ee2dd76c9c3e07010d346a
Reviewed-on: http://gerrit.cloudera.org:8080/35
Reviewed-by: Casey Ching <casey@cloudera.com>
Tested-by: Internal Jenkins
casey authored and Internal Jenkins committed Jan 22, 2015
1 parent e710591 commit 256ef1c1481a24c0d76e3e48ee1d3410f6dc760a
@@ -14,9 +14,11 @@

#include "exec/hdfs-parquet-scanner.h"

#include <limits> // for std::numeric_limits

#include <boost/algorithm/string.hpp>
#include <gflags/gflags.h>
#include <gutil/strings/substitute.h>
#include <limits> // for std::numeric_limits

#include "common/object-pool.h"
#include "exec/hdfs-scan-node.h"
@@ -46,6 +48,11 @@ using namespace boost::algorithm;
using namespace impala;
using namespace strings;

// Provide a workaround for IMPALA-1658.
DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
"When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
"be converted from UTC to local time. Writes are unaffected.");

// Max data page header size in bytes. This is an estimate and only needs to be an upper
// bound. It is theoretically possible to have a page header of any size due to string
// value statistics, but in practice we'll have trouble reading string values this large.
@@ -276,7 +283,12 @@ class HdfsParquetScanner::ColumnReader : public HdfsParquetScanner::BaseColumnRe
} else {
fixed_len_size_ = -1;
}
needs_conversion_ = desc_->type().type == TYPE_CHAR;
needs_conversion_ = desc_->type().type == TYPE_CHAR ||
// TODO: Add logic to detect file versions that have unconverted TIMESTAMP
// values. Currently all versions have converted values.
(FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
desc_->type().type == TYPE_TIMESTAMP &&
parent->file_version_.application == "parquet-mr");
}

protected:
@@ -332,7 +344,7 @@ class HdfsParquetScanner::ColumnReader : public HdfsParquetScanner::BaseColumnRe
}

// Converts and writes src into dst based on desc_->type()
void ConvertSlot(const T* src, void* dst, MemPool* pool) {
void ConvertSlot(const T* src, T* dst, MemPool* pool) {
DCHECK(false);
}

@@ -357,7 +369,7 @@ void HdfsParquetScanner::ColumnReader<StringValue>::CopySlot(

template<>
void HdfsParquetScanner::ColumnReader<StringValue>::ConvertSlot(
const StringValue* src, void* dst, MemPool* pool) {
const StringValue* src, StringValue* dst, MemPool* pool) {
DCHECK(desc_->type().type == TYPE_CHAR);
int len = desc_->type().len;
StringValue sv;
@@ -371,9 +383,17 @@ void HdfsParquetScanner::ColumnReader<StringValue>::ConvertSlot(
memcpy(sv.ptr, src->ptr, unpadded_len);
StringValue::PadWithSpaces(sv.ptr, len, unpadded_len);

if (desc_->type().IsVarLen()) *reinterpret_cast<StringValue*>(dst) = sv;
if (desc_->type().IsVarLen()) *dst = sv;
}

template<>
void HdfsParquetScanner::ColumnReader<TimestampValue>::ConvertSlot(
const TimestampValue* src, TimestampValue* dst, MemPool* pool) {
// Conversion should only happen when this flag is enabled.
DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
*dst = *src;
if (dst->HasDateAndTime()) dst->UtcToLocal();
}

class HdfsParquetScanner::BoolColumnReader : public HdfsParquetScanner::BaseColumnReader {
public:
@@ -37,6 +37,27 @@ int TimestampValue::Format(const DateTimeFormatContext& dt_ctx, int len, char* b
return TimestampParser::Format(dt_ctx, date_, time_, len, buff);
}

void TimestampValue::UtcToLocal() {

@Tagar commented on Apr 7, 2016:
This conversion code is very slow... It makes reading such tables 30x slower.
https://issues.cloudera.org/browse/IMPALA-3316

DCHECK(HasDateAndTime());
try {
tm temp_tm = to_tm(ptime(date_, time_)); // will throw if date/time is invalid
time_t utc = timegm(&temp_tm);
if (UNLIKELY(NULL == localtime_r(&utc, &temp_tm))) {
*this = ptime(not_a_date_time);
return;
}
// Unlikely but a time zone conversion may push the value over the min/max
// boundary resulting in an exception.
ptime local = ptime_from_tm(temp_tm);
// Neither time_t nor struct tm allow fractional seconds so they have to be handled
// separately.
local += nanoseconds(time_.fractional_seconds());
*this = local;
} catch (std::exception& from_boost) {
*this = ptime(not_a_date_time);
}
}

ostream& operator<<(ostream& os, const TimestampValue& timestamp_value) {
return os << timestamp_value.DebugString();
}
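
As context for the review comment above: the conversion runs once per
TIMESTAMP value read. A minimal standalone sketch of the same
timegm()/localtime_r() pattern (illustrative only, not patch code):

  #include <stdio.h>
  #include <string.h>
  #include <time.h>

  int main() {
    // Build 2010-01-01 00:00:00 as a broken-down UTC time.
    struct tm utc_tm;
    memset(&utc_tm, 0, sizeof(utc_tm));
    utc_tm.tm_year = 2010 - 1900;
    utc_tm.tm_mon = 0;
    utc_tm.tm_mday = 1;
    time_t utc = timegm(&utc_tm);  // interpret the struct tm as UTC
    struct tm local_tm;
    if (localtime_r(&utc, &local_tm) == NULL) return 1;  // conversion failed
    char buf[32];
    strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &local_tm);
    printf("local: %s\n", buf);  // result depends on the process's TZ setting
    return 0;
  }
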
@@ -126,6 +126,7 @@ class TimestampValue {
bool HasDate() const { return !date_.is_special(); }
bool HasTime() const { return !time_.is_special(); }
bool HasDateOrTime() const { return HasDate() || HasTime(); }
bool HasDateAndTime() const { return HasDate() && HasTime(); }

std::string DebugString() const {
std::stringstream ss;
@@ -166,6 +167,10 @@ class TimestampValue {
return temp;
}

// Converts from UTC to local time in-place. The caller must ensure the TimestampValue
// this function is called upon has both a valid date and time.
void UtcToLocal();

void set_date(const boost::gregorian::date d) { date_ = d; }
void set_time(const boost::posix_time::time_duration t) { time_ = t; }
const boost::gregorian::date& date() const { return date_; }
@@ -18,3 +18,7 @@ Populated with:
hive> set parquet.block.size=500;
hive> INSERT INTO TABLE tbl
SELECT l_comment FROM tpch.lineitem LIMIT 1000;

alltypesagg_hive_13_1.parquet:
Generated with parquet-mr version 1.5.0-cdh5.4.0-SNAPSHOT
hive> create table alltypesagg_hive_13_1 stored as parquet as select * from alltypesagg;
Binary file (alltypesagg_hive_13_1.parquet) not shown.
@@ -1277,6 +1277,31 @@ bad_parquet
field STRING
====
---- DATASET
-- IMPALA-1658: Timestamps written by Hive are local-to-UTC adjusted.
functional
---- BASE_TABLE_NAME
alltypesagg_hive_13_1
---- COLUMNS
id int
bool_col boolean
tinyint_col tinyint
smallint_col smallint
int_col int
bigint_col bigint
float_col float
double_col double
date_string_col string
string_col string
timestamp_col timestamp
year int
month int
day int
---- LOAD
`hadoop fs -mkdir /test-warehouse/alltypesagg_hive_13_1_parquet && \
hadoop fs -put -f ${IMPALA_HOME}/testdata/data/alltypesagg_hive_13_1.parquet \
/test-warehouse/alltypesagg_hive_13_1_parquet/
====
---- DATASET
-- Parquet file with invalid metadata size in the file footer.
functional
---- BASE_TABLE_NAME
@@ -39,6 +39,7 @@ table_name:bad_parquet, constraint:restrict_to, table_format:parquet/none/none
table_name:bad_metadata_len, constraint:restrict_to, table_format:parquet/none/none
table_name:bad_dict_page_offset, constraint:restrict_to, table_format:parquet/none/none
table_name:bad_compressed_size, constraint:restrict_to, table_format:parquet/none/none
table_name:alltypesagg_hive_13_1, constraint:restrict_to, table_format:parquet/none/none

# TODO: Support Avro. Data loading currently fails for Avro because complex types
# cannot be converted to the corresponding Avro types yet.
@@ -75,7 +76,7 @@ table_name:complex_view, constraint:restrict_to, table_format:seq/snap/block
table_name:view_view, constraint:restrict_to, table_format:text/none/none
table_name:view_view, constraint:restrict_to, table_format:seq/snap/block

# liketbl and tblwithraggedcolumns all have
# liketbl and tblwithraggedcolumns all have
# NULLs in primary key columns. hbase does not support
# writing NULLs to primary key columns.
table_name:liketbl, constraint:exclude, table_format:hbase/none/none
@@ -93,8 +94,8 @@ table_name:overflow, constraint:exclude, table_format:hbase/none/none
# seem to like this.
table_name:widerow, constraint:exclude, table_format:hbase/none/none

# nullformat_custom is used in null-insert tests, which user insert overwrite,
# which is not supported in hbase. The schema is also specified in HIVE_CREATE
# nullformat_custom is used in null-insert tests, which user insert overwrite,
# which is not supported in hbase. The schema is also specified in HIVE_CREATE
# with no corresponding LOAD statement.
table_name:nullformat_custom, constraint:exclude, table_format:hbase/none/none
table_name:unsupported_types, constraint:exclude, table_format:hbase/none/none
@@ -0,0 +1,106 @@
# Copyright (c) 2015 Cloudera, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Tests for IMPALA-1658

import pytest
import time

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite

class TestHiveParquetTimestampConversion(CustomClusterTestSuite):
  '''Hive writes timestamps in parquet files by first converting values from local time
     to UTC. The conversion was not expected (other file formats don't convert) and a
     startup flag was later added to adjust for this (IMPALA-1658). This file tests that
     the conversion and flag behave as expected.
  '''

  @classmethod
  def add_test_dimensions(cls):
    super(CustomClusterTestSuite, cls).add_test_dimensions()
    cls.TestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet' and
        v.get_value('table_format').compression_codec == 'none')

  def check_sanity(self, expect_converted_result):
    data = self.execute_query_expect_success(self.client, """
        SELECT COUNT(timestamp_col), COUNT(DISTINCT timestamp_col),
               MIN(timestamp_col), MAX(timestamp_col)
        FROM functional_parquet.alltypesagg_hive_13_1""")\
        .get_data()
    assert len(data) > 0
    rows = data.split("\n")
    assert len(rows) == 1
    values = rows[0].split("\t")
    assert len(values) == 4
    assert values[0] == "11000"
    assert values[1] == "10000"
    if expect_converted_result:
      # Doing easy time zone conversion in python seems to require a 3rd party lib,
      # so the only check will be that the value changed in some way.
      assert values[2] != "2010-01-01 00:00:00"
      assert values[3] != "2010-01-10 18:02:05.100000000"
    else:
      assert values[2] == "2010-01-01 00:00:00"
      assert values[3] == "2010-01-10 18:02:05.100000000"

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=true")
  def test_conversion(self, vector):
    tz_name = time.tzname[time.localtime().tm_isdst]
    self.check_sanity(tz_name not in ("UTC", "GMT"))
    # The value read from the Hive table should be the same as reading a UTC converted
    # value from the Impala table.
    data = self.execute_query_expect_success(self.client, """
        SELECT h.id, h.day, h.timestamp_col, i.timestamp_col
        FROM functional_parquet.alltypesagg_hive_13_1 h
        JOIN functional_parquet.alltypesagg
          i ON i.id = h.id AND i.day = h.day   -- serves as a unique key
        WHERE
          (h.timestamp_col IS NULL AND i.timestamp_col IS NOT NULL)
          OR (h.timestamp_col IS NOT NULL AND i.timestamp_col IS NULL)
          OR h.timestamp_col != FROM_UTC_TIMESTAMP(i.timestamp_col, '%s')
        """ % tz_name)\
        .get_data()
    assert len(data) == 0

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=false")
  def test_no_conversion(self, vector):
    self.check_sanity(False)
    # Without conversion all the values will be different.
    tz_name = time.tzname[time.localtime().tm_isdst]
    data = self.execute_query_expect_success(self.client, """
        SELECT h.id, h.day, h.timestamp_col, i.timestamp_col
        FROM functional_parquet.alltypesagg_hive_13_1 h
        JOIN functional_parquet.alltypesagg
          i ON i.id = h.id AND i.day = h.day   -- serves as a unique key
        WHERE h.timestamp_col != FROM_UTC_TIMESTAMP(i.timestamp_col, '%s')
        """ % tz_name)\
        .get_data()
    expected_row_count = 0 if tz_name in ("UTC", "GMT") else 10000
    assert len(data.split('\n')) == expected_row_count
    # A value should either stay null or stay not null.
    data = self.execute_query_expect_success(self.client, """
        SELECT h.id, h.day, h.timestamp_col, i.timestamp_col
        FROM functional_parquet.alltypesagg_hive_13_1 h
        JOIN functional_parquet.alltypesagg
          i ON i.id = h.id AND i.day = h.day   -- serves as a unique key
        WHERE
          (h.timestamp_col IS NULL AND i.timestamp_col IS NOT NULL)
          OR (h.timestamp_col IS NOT NULL AND i.timestamp_col IS NULL)
        """)\
        .get_data()
    assert len(data) == 0
