IMPALA-12601: Add a fully partitioned TPC-DS database
The current tpcds dataset has only the store_sales table fully partitioned
and leaves the other fact tables unpartitioned. This is intended to keep
data loading fast during tests. However, it is not an accurate reflection
of the larger-scale TPC-DS datasets, where all fact tables are partitioned.
The Impala planner may change the details of a query plan when a partition
column exists.

This patch adds a new dataset, tpcds_partitioned, which loads a fully
partitioned TPC-DS database in Parquet format named
tpcds_partitioned_parquet_snap. The dataset cannot be loaded
independently: it requires the base 'tpcds' database from the tpcds
dataset to be loaded first. The load-tpcds-data function in
testdata/bin/create-load-data.sh shows how to load this dataset.
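
Because the new workload depends on the already-loaded base 'tpcds'
database, the two loads have to run serially. Below is a minimal sketch of
that ordering, modelled on load-tpcds-data; driving bin/load-data.py
directly with these flags is an illustrative assumption, not part of the
patch.

```python
# Sketch of the required load ordering, mirroring load-tpcds-data in
# testdata/bin/create-load-data.sh. The real script wraps each workload in
# run-step-backgroundable and per-workload log files.
import os
import subprocess

IMPALA_HOME = os.environ["IMPALA_HOME"]

# 'tpcds' must finish before 'tpcds_partitioned' starts, because the
# partitioned load requires the base tpcds database to already exist.
for workload in ("tpcds", "tpcds_partitioned"):
    subprocess.check_call(
        ["impala-python", "-u",
         os.path.join(IMPALA_HOME, "bin", "load-data.py"),
         "--workloads", workload, "-e", "core"])
```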

This patch also changes PlannerTest#testProcessingCost to target
tpcds_partitioned_parquet_snap instead of tpcds_parquet. Other planner
tests that currently target tpcds_parquet will be gradually changed to
test against tpcds_partitioned_parquet_snap in follow-up patches.

The new database adds a couple of seconds to the "Computing table stats"
step, but the load itself is negligible because it runs in parallel with
the TPC-H and functional-query loads. The total loading time for the three
datasets remains similar after this patch.

This patch also adds several improvements in the following files:

bin/load-data.py:
- Log elapsed time on serial steps.

testdata/bin/create-load-data.sh:
- Rename MSG to LOAD_MSG to avoid a collision with the variable of the
  same name in ./testdata/bin/run-step.sh.

testdata/bin/generate-schema-statements.py:
- Remove redundant FILE_FORMAT_MAP.
- Add build_partitioned_load to simplify expressing partitioned insert
  queries in SQL templates (see the sketch after this list).

testdata/datasets/tpcds/tpcds_schema_template.sql:
- Reorder schema template to load all dimension tables before fact tables.
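
The -- partitioned_insert annotation takes the form
"-- partitioned_insert: <partition_column>,<min>,<max>,<batch_size>" and is
expanded by generate-schema-statements.py into a series of batched INSERT
statements, each covering a range of partition-key values; the final batch
also picks up the NULL partition. The sketch below reproduces that
expansion in a self-contained form. The annotation format and predicate
shapes follow the new code, while the template text, column name, and
value range are hypothetical.

```python
import re

# Same annotation format as PARTITIONED_INSERT_RE in
# testdata/bin/generate-schema-statements.py.
PARTITIONED_INSERT_RE = re.compile(
    r'-- partitioned_insert: ([a-z_]+),(\d+),(\d+),(\d+)')

# Hypothetical template; the real ones live in
# testdata/datasets/tpcds/tpcds_schema_template.sql.
TEMPLATE = """-- partitioned_insert: ss_sold_date_sk,2450816,2452642,500
INSERT OVERWRITE TABLE {db_name}.store_sales PARTITION (ss_sold_date_sk)
SELECT {hint} * FROM tpcds.store_sales {part_predicate};"""


def expand(template, params):
    m = PARTITIONED_INSERT_RE.search(template)
    col = m.group(1)
    lo, hi, batch = int(m.group(2)), int(m.group(3)), int(m.group(4))
    insert = PARTITIONED_INSERT_RE.sub("", template)
    statements = []
    first = lo
    while first < hi:
        if first + batch >= hi:
            # Last batch: remaining partitions plus the NULL partition,
            # unless this is also the only batch.
            pred = "" if first == lo else (
                "WHERE %d <= %s OR %s IS NULL" % (first, col, col))
        elif first == lo:
            pred = "WHERE %s < %d" % (col, first + batch)
        else:
            pred = "WHERE %d <= %s AND %s < %d" % (first, col, col,
                                                   first + batch)
        statements.append(insert.format(part_predicate=pred, **params))
        first += batch
    return "\n".join(statements)


# Prints four INSERT statements, one per batch of 500 date_sk values.
print(expand(TEMPLATE, {"db_name": "tpcds_partitioned_parquet_snap",
                        "hint": "/* +shuffle, clustered */"}))
```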

Testing:
- Pass core tests.

Change-Id: I3a2e66c405639554f325ae78c66628d464f6c453
Reviewed-on: http://gerrit.cloudera.org:8080/20756
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
rizaon authored and Impala Public Jenkins committed Dec 16, 2023
1 parent 1141a6a commit 8661f92
Showing 14 changed files with 2,857 additions and 1,987 deletions.
52 changes: 34 additions & 18 deletions bin/load-data.py
@@ -34,7 +34,7 @@
import traceback

from optparse import OptionParser
from tests.beeswax.impala_beeswax import *
from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
from multiprocessing.pool import ThreadPool

LOG = logging.getLogger('load-data.py')
@@ -159,11 +159,17 @@ def exec_hive_query_from_file_beeline(file_name):

return is_success

def exec_hbase_query_from_file(file_name):

def exec_hbase_query_from_file(file_name, step_name):
if not os.path.exists(file_name): return
LOG.info('Begin step "%s".' % step_name)
start_time = time.time()
hbase_cmd = "hbase shell %s" % file_name
LOG.info('Executing HBase Command: %s' % hbase_cmd)
exec_cmd(hbase_cmd, error_msg='Error executing hbase create commands')
total_time = time.time() - start_time
LOG.info('End step "%s". Total time: %.2fs\n' % (step_name, total_time))


# KERBEROS TODO: fails when kerberized and impalad principal isn't "impala"
def exec_impala_query_from_file(file_name):
@@ -263,7 +269,8 @@ def exec_hadoop_fs_cmd(args, exit_on_error=True):
exec_cmd(cmd, error_msg="Error executing Hadoop command, exiting",
exit_on_error=exit_on_error)

def exec_query_files_parallel(thread_pool, query_files, execution_type):

def exec_query_files_parallel(thread_pool, query_files, execution_type, step_name):
"""Executes the query files provided using the execution engine specified
in parallel using the given thread pool. Aborts immediately if any execution
encounters an error."""
@@ -274,16 +281,23 @@ def exec_query_files_parallel(thread_pool, query_files, execution_type):
elif execution_type == 'hive':
execution_function = exec_hive_query_from_file_beeline

LOG.info('Begin step "%s".' % step_name)
start_time = time.time()
for result in thread_pool.imap_unordered(execution_function, query_files):
if not result:
thread_pool.terminate()
sys.exit(1)
total_time = time.time() - start_time
LOG.info('End step "%s". Total time: %.2fs\n' % (step_name, total_time))


def impala_exec_query_files_parallel(thread_pool, query_files, step_name):
exec_query_files_parallel(thread_pool, query_files, 'impala', step_name)


def impala_exec_query_files_parallel(thread_pool, query_files):
exec_query_files_parallel(thread_pool, query_files, 'impala')
def hive_exec_query_files_parallel(thread_pool, query_files, step_name):
exec_query_files_parallel(thread_pool, query_files, 'hive', step_name)

def hive_exec_query_files_parallel(thread_pool, query_files):
exec_query_files_parallel(thread_pool, query_files, 'hive')

def main():
logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%H:%M:%S')
@@ -446,38 +460,40 @@ def log_file_list(header, file_list):
# so they're done at the end. Finally, the Hbase Tables that have been filled with data
# need to be flushed.

impala_exec_query_files_parallel(thread_pool, impala_create_files)
impala_exec_query_files_parallel(thread_pool, impala_create_files, "Impala Create")

# There should be at most one hbase creation script
assert(len(hbase_create_files) <= 1)
for hbase_create in hbase_create_files:
exec_hbase_query_from_file(hbase_create)
exec_hbase_query_from_file(hbase_create, "HBase Create")

# If this is loading text tables plus multiple other formats, the text tables
# need to be loaded first
assert(len(hive_load_text_files) <= 1)
hive_exec_query_files_parallel(thread_pool, hive_load_text_files)
hive_exec_query_files_parallel(thread_pool, hive_load_text_files, "Hive Load Text")
# IMPALA-9923: Run ORC serially separately from other non-text formats. This hacks
# around flakiness seen when loading this in parallel. This should be removed as
# soon as possible.
# around flakiness seen when loading this in parallel (see IMPALA-12630 comments for
# broken tests). This should be removed as soon as possible.
assert(len(hive_load_orc_files) <= 1)
hive_exec_query_files_parallel(thread_pool, hive_load_orc_files)
hive_exec_query_files_parallel(thread_pool, hive_load_orc_files, "Hive Load ORC")

# Load all non-text formats (goes parallel)
hive_exec_query_files_parallel(thread_pool, hive_load_nontext_files)
hive_exec_query_files_parallel(thread_pool, hive_load_nontext_files,
"Hive Load Non-Text")

assert(len(hbase_postload_files) <= 1)
for hbase_postload in hbase_postload_files:
exec_hbase_query_from_file(hbase_postload)
exec_hbase_query_from_file(hbase_postload, "HBase Post-Load")

# Invalidate so that Impala sees the loads done by Hive before loading Parquet/Kudu
# Note: This only invalidates tables for this workload.
assert(len(invalidate_files) <= 1)
if impala_load_files:
impala_exec_query_files_parallel(thread_pool, invalidate_files)
impala_exec_query_files_parallel(thread_pool, impala_load_files)
impala_exec_query_files_parallel(thread_pool, invalidate_files,
"Impala Invalidate 1")
impala_exec_query_files_parallel(thread_pool, impala_load_files, "Impala Load")
# Final invalidate for this workload
impala_exec_query_files_parallel(thread_pool, invalidate_files)
impala_exec_query_files_parallel(thread_pool, invalidate_files, "Impala Invalidate 2")
loading_time_map[workload] = time.time() - start_time

total_time = 0.0
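
The elapsed-time logging added above uses one pattern for every serial
step: log a begin line, run the step, then log an end line with the
wall-clock time. For illustration only, a hypothetical context-manager
form of the same pattern (not part of the patch) could look like this:

```python
import logging
import time
from contextlib import contextmanager

LOG = logging.getLogger('load-data.py')


@contextmanager
def timed_step(step_name):
    # Same log format as the new step logging in bin/load-data.py.
    LOG.info('Begin step "%s".' % step_name)
    start_time = time.time()
    try:
        yield
    finally:
        LOG.info('End step "%s". Total time: %.2fs\n'
                 % (step_name, time.time() - start_time))


# Usage sketch, equivalent to exec_hbase_query_from_file(f, "HBase Create"):
# with timed_step("HBase Create"):
#     exec_cmd("hbase shell %s" % f,
#              error_msg='Error executing hbase create commands')
```
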
3 changes: 2 additions & 1 deletion fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1410,7 +1410,8 @@ public void testProcessingCost() {
options.setProcessing_cost_min_threads(2);
options.setMax_fragment_instances_per_node(16);
runPlannerTestFile(
"tpcds-processing-cost", "tpcds_parquet", options, tpcdsParquetTestOptions());
"tpcds-processing-cost", "tpcds_partitioned_parquet_snap", options,
tpcdsParquetTestOptions());
}

/**
3 changes: 2 additions & 1 deletion testdata/bin/compute-table-stats.sh
@@ -44,5 +44,6 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
fi
${COMPUTE_STATS_SCRIPT} --db_names=tpch,tpch_parquet,tpch_orc_def \
--table_names=customer,lineitem,nation,orders,part,partsupp,region,supplier
${COMPUTE_STATS_SCRIPT} --db_names=tpch_nested_parquet,tpcds,tpcds_parquet
${COMPUTE_STATS_SCRIPT} \
--db_names=tpch_nested_parquet,tpcds,tpcds_parquet,tpcds_partitioned_parquet_snap
${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu,tpch_kudu
15 changes: 10 additions & 5 deletions testdata/bin/create-load-data.sh
@@ -234,12 +234,12 @@ function load-data {
TABLE_FORMATS=${3:-}
FORCE_LOAD=${4:-}

MSG="Loading workload '$WORKLOAD'"
LOAD_MSG="Loading workload '$WORKLOAD'"
ARGS=("--workloads $WORKLOAD")
MSG+=" using exploration strategy '$EXPLORATION_STRATEGY'"
LOAD_MSG+=" using exploration strategy '$EXPLORATION_STRATEGY'"
ARGS+=("-e $EXPLORATION_STRATEGY")
if [ $TABLE_FORMATS ]; then
MSG+=" in table formats '$TABLE_FORMATS'"
LOAD_MSG+=" in table formats '$TABLE_FORMATS'"
ARGS+=("--table_formats $TABLE_FORMATS")
fi
if [ $LOAD_DATA_ARGS ]; then
@@ -282,7 +282,7 @@ function load-data {
fi

LOG_FILE=${IMPALA_DATA_LOADING_LOGS_DIR}/${LOG_BASENAME}
echo "$MSG. Logging to ${LOG_FILE}"
echo "$LOAD_MSG. Logging to ${LOG_FILE}"
# Use unbuffered logging by executing with -u
if ! impala-python -u ${IMPALA_HOME}/bin/load-data.py ${ARGS[@]} &> ${LOG_FILE}; then
echo Error loading data. The end of the log file is:
@@ -291,6 +291,11 @@
fi
}

function load-tpcds-data {
load-data "tpcds" "core"
load-data "tpcds_partitioned" "core"
}

function cache-test-tables {
echo CACHING tpch.nation AND functional.alltypestiny
# uncaching the tables first makes this operation idempotent.
@@ -570,7 +575,7 @@ if [ $SKIP_METADATA_LOAD -eq 0 ]; then
run-step-backgroundable "Loading functional-query data" load-functional-query.log \
load-data "functional-query" "exhaustive"
run-step-backgroundable "Loading TPC-H data" load-tpch.log load-data "tpch" "core"
run-step-backgroundable "Loading TPC-DS data" load-tpcds.log load-data "tpcds" "core"
run-step-backgroundable "Loading TPC-DS data" load-tpcds.log load-tpcds-data
run-step-wait-all
# Load tpch nested data.
# TODO: Hacky and introduces more complexity into the system, but it is expedient.
126 changes: 86 additions & 40 deletions testdata/bin/generate-schema-statements.py
@@ -96,23 +96,19 @@
#
from __future__ import absolute_import, division, print_function
from builtins import object
import collections
import csv
import glob
import json
import math
import os
import random
import re
import shutil
import subprocess
import sys
import tempfile
from itertools import product
from optparse import OptionParser
from tests.common.environ import HIVE_MAJOR_VERSION
from tests.util.test_file_parser import *
from tests.common.test_dimensions import *
from tests.util.test_file_parser import parse_table_constraints, parse_test_file
from tests.common.test_dimensions import (
FILE_FORMAT_TO_STORED_AS_MAP, TableFormatInfo, get_dataset_from_workload,
load_table_info_dimension)

parser = OptionParser()
parser.add_option("-e", "--exploration_strategy", dest="exploration_strategy",
@@ -195,20 +191,6 @@
'none': '',
}

FILE_FORMAT_MAP = {
'text': 'TEXTFILE',
'seq': 'SEQUENCEFILE',
'rc': 'RCFILE',
'orc': 'ORC',
'parquet': 'PARQUET',
'hudiparquet': 'HUDIPARQUET',
'avro': 'AVRO',
'hbase': "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'",
'kudu': "KUDU",
'iceberg': "ICEBERG",
'json': "JSONFILE",
}

HIVE_TO_AVRO_TYPE_MAP = {
'STRING': 'string',
'INT': 'int',
@@ -240,6 +222,14 @@

KNOWN_EXPLORATION_STRATEGIES = ['core', 'pairwise', 'exhaustive']

PARTITIONED_INSERT_RE = re.compile(
# Capture multi insert specification.
# Each group represent partition column, min value, max value,
# and num partition per insert.
r'-- partitioned_insert: ([a-z_]+),(\d+),(\d+),(\d+)')

HINT_SHUFFLE = "/* +shuffle, clustered */"

def build_create_statement(table_template, table_name, db_name, db_suffix,
file_format, compression, hdfs_location,
force_reload):
@@ -257,11 +247,12 @@ def build_create_statement(table_template, table_name, db_name, db_suffix,
# Remove location part from the format string
table_template = table_template.replace("LOCATION '{hdfs_location}'", "")

create_stmt += table_template.format(db_name=db_name,
db_suffix=db_suffix,
table_name=table_name,
file_format=FILE_FORMAT_MAP[file_format],
hdfs_location=hdfs_location)
create_stmt += table_template.format(
db_name=db_name,
db_suffix=db_suffix,
table_name=table_name,
file_format=FILE_FORMAT_TO_STORED_AS_MAP[file_format],
hdfs_location=hdfs_location)
return create_stmt


Expand Down Expand Up @@ -473,17 +464,59 @@ def build_impala_parquet_codec_statement(codec):
parquet_codec = IMPALA_PARQUET_COMPRESSION_MAP[codec]
return IMPALA_COMPRESSION_CODEC % parquet_codec


def build_partitioned_load(insert, re_match, can_hint, params):
part_col = re_match.group(1)
min_val = int(re_match.group(2))
max_val = int(re_match.group(3))
batch = int(re_match.group(4))
insert = PARTITIONED_INSERT_RE.sub("", insert)
statements = []
params["hint"] = HINT_SHUFFLE if can_hint else ''
first_part = min_val
while first_part < max_val:
if first_part + batch >= max_val:
# This is the last batch
if first_part == min_val:
# There is only 1 batch in this insert. No predicate needed.
params["part_predicate"] = ''
else:
# Insert the remaining partitions + NULL partition
params["part_predicate"] = "WHERE {0} <= {1} OR {2} IS NULL".format(
first_part, part_col, part_col)
elif first_part == min_val:
# This is the first batch.
params["part_predicate"] = "WHERE {0} < {1}".format(
part_col, first_part + batch)
else:
# This is the middle batch.
params["part_predicate"] = "WHERE {0} <= {1} AND {2} < {3}".format(
first_part, part_col, part_col, first_part + batch)
statements.append(insert.format(**params))
first_part += batch
return "\n".join(statements)


def build_insert_into_statement(insert, db_name, db_suffix, table_name, file_format,
hdfs_path, for_impala=False):
insert_hint = ""
if for_impala and (file_format == 'parquet' or is_iceberg_table(file_format)):
insert_hint = "/* +shuffle, clustered */"
insert_statement = insert.format(db_name=db_name,
db_suffix=db_suffix,
table_name=table_name,
hdfs_location=hdfs_path,
impala_home=os.getenv("IMPALA_HOME"),
hint=insert_hint)
can_hint = for_impala and (file_format == 'parquet' or is_iceberg_table(file_format))
params = {
"db_name": db_name,
"db_suffix": db_suffix,
"table_name": table_name,
"hdfs_location": hdfs_path,
"impala_home": os.getenv("IMPALA_HOME"),
"hint": "",
"part_predicate": ""
}

m = PARTITIONED_INSERT_RE.search(insert)
if m:
insert_statement = build_partitioned_load(insert, m, can_hint, params)
else:
if can_hint:
params["hint"] = HINT_SHUFFLE
insert_statement = insert.format(**params)

# Kudu tables are managed and don't support OVERWRITE, so we replace OVERWRITE
# with INTO to make this a regular INSERT.
@@ -539,6 +572,7 @@ def build_insert(insert, db_name, db_suffix, file_format,
for_impala) + "\n"
return output


def build_load_statement(load_template, db_name, db_suffix, table_name):
# hbase does not need the hdfs path.
if table_name.startswith('hbase'):
@@ -547,12 +581,24 @@ def build_load_statement(load_template, db_name, db_suffix, table_name):
db_suffix=db_suffix)
else:
base_load_dir = os.getenv("REMOTE_LOAD", os.getenv("IMPALA_HOME"))
load_template = load_template.format(table_name=table_name,
db_name=db_name,
db_suffix=db_suffix,
impala_home = base_load_dir)
params = {
"db_name": db_name,
"db_suffix": db_suffix,
"table_name": table_name,
"impala_home": base_load_dir,
"hint": "",
"part_predicate": ""
}

m = PARTITIONED_INSERT_RE.search(load_template)
if m:
load_template = build_partitioned_load(load_template, m, False, params)
else:
load_template = load_template.format(**params)

return load_template


def build_hbase_create_stmt(db_name, table_name, column_families, region_splits):
hbase_table_name = "{db_name}_hbase.{table_name}".format(db_name=db_name,
table_name=table_name)
