Skip to content

Commit 0ff4f45

Browse files
author
Vihang Karajgaonkar
committed
IMPALA-8847: Ignore add partition events with empty partition list
Certain Hive queries like "alter table <table> add if not exists partition (<part_spec>)" generate an add_partition event even if no partition was actually added (for example, because it already exists). Such events have an empty partition list in the event message, which trips the Preconditions check in the AddPartitionEvent. This causes the event processor to go into the error state. The only way to recover in such a case is to issue invalidate metadata. The patch adds logic to ignore such events. Testing: 1. Added a test case which reproduces the issue. The test case works after the patch is applied. Change-Id: I877ce6233934e7090cd18e497f748bc6479838cb Reviewed-on: http://gerrit.cloudera.org:8080/14049 Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Reviewed-by: Vihang Karajgaonkar <vihang@cloudera.com>
1 parent d5d3ace commit 0ff4f45

File tree

3 files changed

+162
-10
lines changed

3 files changed

+162
-10
lines changed

fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,7 +1331,7 @@ protected static String getStringProperty(
13311331
}
13321332

13331333
public static class AddPartitionEvent extends TableInvalidatingEvent {
1334-
private final Partition lastAddedPartition_;
1334+
private Partition lastAddedPartition_;
13351335
private final List<Partition> addedPartitions_;
13361336

13371337
/**
@@ -1350,12 +1350,16 @@ private AddPartitionEvent(CatalogServiceCatalog catalog, Metrics metrics,
13501350
.getAddPartitionMessage(event.getMessage());
13511351
addedPartitions_ =
13521352
Lists.newArrayList(addPartitionMessage_.getPartitionObjs());
1353-
Preconditions.checkState(addedPartitions_.size() > 0);
1354-
// when multiple partitions are added in HMS they are all added as one transaction
1355-
// Hence all the partitions which are present in the message must have the same
1356-
// serviceId and version if it is set. hence it is fine to just look at the
1357-
// last added partition in the list and use it for the self-event ids
1358-
lastAddedPartition_ = addedPartitions_.get(addedPartitions_.size() - 1);
1353+
// it is possible that the added partitions is empty in certain cases. See
1354+
// IMPALA-8847 for example
1355+
if (!addedPartitions_.isEmpty()) {
1356+
// when multiple partitions are added in HMS they are all added as one
1357+
// transaction Hence all the partitions which are present in the message must
1358+
// have the same serviceId and version if it is set. hence it is fine to just
1359+
// look at the last added partition in the list and use it for the self-event
1360+
// ids
1361+
lastAddedPartition_ = addedPartitions_.get(addedPartitions_.size() - 1);
1362+
}
13591363
msTbl_ = addPartitionMessage_.getTableObj();
13601364
} catch (Exception ex) {
13611365
throw new MetastoreNotificationException(ex);
@@ -1364,12 +1368,15 @@ private AddPartitionEvent(CatalogServiceCatalog catalog, Metrics metrics,
13641368

13651369
@Override
13661370
public void process() throws MetastoreNotificationException, CatalogException {
1371+
// bail out early if there are no partitions to process
1372+
if (addedPartitions_.isEmpty()) {
1373+
infoLog("Partition list is empty. Ignoring this event.");
1374+
return;
1375+
}
13671376
if (isSelfEvent()) {
13681377
infoLog("Not processing the event as it is a self-event");
13691378
return;
13701379
}
1371-
// Notification is created for newly created partitions only. We need not worry
1372-
// about "IF NOT EXISTS".
13731380
try {
13741381
// Reload the whole table if it's a transactional table.
13751382
if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
@@ -1541,7 +1548,6 @@ private DropPartitionEvent(CatalogServiceCatalog catalog, Metrics metrics,
15411548
msTbl_ = Preconditions.checkNotNull(dropPartitionMessage.getTableObj());
15421549
droppedPartitions_ = dropPartitionMessage.getPartitions();
15431550
Preconditions.checkNotNull(droppedPartitions_);
1544-
Preconditions.checkState(droppedPartitions_.size() > 0);
15451551
} catch (Exception ex) {
15461552
throw new MetastoreNotificationException(
15471553
debugString("Could not parse event message. "
@@ -1553,6 +1559,12 @@ private DropPartitionEvent(CatalogServiceCatalog catalog, Metrics metrics,
15531559

15541560
@Override
15551561
public void process() throws MetastoreNotificationException {
1562+
// we have seen cases where a add_partition event is generated with empty
1563+
// partition list (see IMPALA-8547 for details. Make sure that droppedPartitions
1564+
// list is not empty
1565+
if (droppedPartitions_.isEmpty()) {
1566+
infoLog("Partition list is empty. Ignoring this event.");
1567+
}
15561568
// We do not need self event as dropPartition() call is a no-op if the directory
15571569
// doesn't exist.
15581570
try {

tests/custom_cluster/test_event_processing.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
2727
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
2828
from tests.util.hive_utils import HiveDbWrapper
29+
from tests.util.event_processor_utils import EventProcessorUtils
2930

3031

3132
@SkipIfS3.hive
@@ -129,6 +130,54 @@ def run_test_insert_events(self, is_transactional=False):
129130
" year=2019 and id=101" % (db_name, TBL_INSERT_PART))
130131
assert data.split('\t') == ['101', 'z', '28', '3', '2019']
131132

133+
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
@SkipIfHive2.acid
def test_empty_partition_events_transactional(self, unique_database):
  """Runs the empty-partition-list event scenario against a transactional
  (insert-only) table."""
  self._run_test_empty_partition_events(unique_database, is_transactional=True)
139+
140+
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
def test_empty_partition_events(self, unique_database):
  """Runs the empty-partition-list event scenario against a non-transactional
  table."""
  self._run_test_empty_partition_events(unique_database, is_transactional=False)
145+
146+
def _run_test_empty_partition_events(self, unique_database, is_transactional):
  """Checks that partition events which carry no partitions do not break the
  event processor.

  Creates a partitioned table through Hive, then issues add/drop partition
  statements, including the "if not exists" / "if exists" variants that may produce
  events with an empty partition list (see IMPALA-8847). After each step the
  partitions seen by Impala are verified and the event processor is expected to
  remain in the ACTIVE state.

  Args:
    unique_database: name of the database in which the test table is created.
    is_transactional: when True the table is created as an insert-only
      transactional table.
  """
  tbl_properties = ""
  if is_transactional:
    tbl_properties = "TBLPROPERTIES ('transactional'='true'," \
        "'transactional_properties'='insert_only')"
  test_tbl = unique_database + ".test_events"
  # Partition info is compared as a list of tuples, so membership checks must use
  # a one-element tuple. Note that ('2019') is just the string '2019' and would
  # make the "not in" assertions below trivially true.
  partition_row = ('2019',)

  self.run_stmt_in_hive(
      "create table {0} (key string, value string) "
      "partitioned by (year int) {1} stored as parquet".format(
          test_tbl, tbl_properties))
  EventProcessorUtils.wait_for_event_processing(self.hive_client)
  self.client.execute("describe {0}".format(test_tbl))

  # A regular add partition must show up in Impala.
  self.run_stmt_in_hive(
      "alter table {0} add partition (year=2019)".format(test_tbl))
  EventProcessorUtils.wait_for_event_processing(self.hive_client)
  assert [partition_row] == self.get_impala_partition_info(test_tbl, 'year')

  # Adding the same partition again with "if not exists" must be a no-op.
  self.run_stmt_in_hive(
      "alter table {0} add if not exists partition (year=2019)".format(test_tbl))
  EventProcessorUtils.wait_for_event_processing(self.hive_client)
  assert [partition_row] == self.get_impala_partition_info(test_tbl, 'year')
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  # A regular drop partition must remove the partition from Impala.
  self.run_stmt_in_hive(
      "alter table {0} drop partition (year=2019)".format(test_tbl))
  EventProcessorUtils.wait_for_event_processing(self.hive_client)
  assert partition_row not in self.get_impala_partition_info(test_tbl, 'year')
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  # Dropping the already dropped partition with "if exists" must be a no-op.
  self.run_stmt_in_hive(
      "alter table {0} drop if exists partition (year=2019)".format(test_tbl))
  EventProcessorUtils.wait_for_event_processing(self.hive_client)
  assert partition_row not in self.get_impala_partition_info(test_tbl, 'year')
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
179+
180+
132181
def wait_for_insert_event_processing(self, previous_event_id):
133182
"""Waits until the event processor has finished processing insert events. Since two
134183
events are created for every insert done through hive, we wait until the event id is
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
2+
# distributed with this work for additional information
3+
# regarding copyright ownership. The ASF licenses this file
4+
# to you under the Apache License, Version 2.0 (the
5+
# "License"); you may not use this file except in compliance
6+
# with the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing,
11+
# software distributed under the License is distributed on an
12+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
# KIND, either express or implied. See the License for the
14+
# specific language governing permissions and limitations
15+
# under the License.
16+
#
17+
# Impala tests for Hive Metastore, covering the expected propagation
18+
# of metadata from Hive to Impala or Impala to Hive. Each test
19+
# modifies the metadata via Hive and checks that the modification
20+
# succeeded by querying Impala, or vice versa.
21+
22+
import requests
23+
import time
24+
import json
25+
from tests.common.environ import build_flavor_timeout
26+
27+
28+
class EventProcessorUtils(object):
  """Test helpers for querying the state of the catalog's metastore event processor.

  The metrics are read from the "/events" page served at DEFAULT_CATALOG_URL.
  """

  DEFAULT_CATALOG_URL = "http://localhost:25020"

  @staticmethod
  def wait_for_event_processing(hive_client, timeout=10):
    """Waits till the event processor has synced to the latest event id from metastore
    or the timeout value in seconds whichever is earlier.

    Args:
      hive_client: metastore client used to fetch the current notification event id.
      timeout: maximum number of seconds to wait for the sync; must be positive.

    Returns:
      True once the event processor has caught up.

    Raises:
      Exception: if the event processor did not catch up within 'timeout' seconds.
    """
    assert timeout > 0
    assert hive_client is not None
    current_event_id = EventProcessorUtils.get_current_notification_id(hive_client)
    end_time = time.time() + timeout
    while time.time() < end_time:
      if EventProcessorUtils.get_last_synced_event_id() >= current_event_id:
        break
      time.sleep(0.1)
    else:
      # Only reached when the deadline passed without the loop hitting 'break'.
      raise Exception(
          "Event processor did not sync till last known event id {0} "
          "within {1} seconds".format(current_event_id, timeout))
    # Wait for catalog update to be propagated.
    time.sleep(build_flavor_timeout(6, slow_build_timeout=10))
    return True

  @staticmethod
  def _parse_metrics(metrics_text):
    """Parses text made of "name: value" lines into a dictionary of stripped strings.

    Each line is split on the first ':' only, so values which themselves contain ':'
    are preserved. Lines without a ':' separator (including blank lines) are skipped.
    """
    metrics = {}
    for line in metrics_text.strip().splitlines():
      name, separator, value = line.partition(':')
      if not separator:
        continue
      metrics[name.strip()] = value.strip()
    return metrics

  @staticmethod
  def get_event_processor_metrics():
    """Scrapes the catalog's /events webpage and return a dictionary with the event
    processor metrics. Both the keys and the values of the dictionary are strings."""
    response = requests.get("%s/events?json" % EventProcessorUtils.DEFAULT_CATALOG_URL)
    assert response.status_code == requests.codes.ok
    varz_json = json.loads(response.text)
    return EventProcessorUtils._parse_metrics(varz_json["event_processor_metrics"])

  @staticmethod
  def get_last_synced_event_id():
    """Returns the last_synced_event_id as an int."""
    metrics = EventProcessorUtils.get_event_processor_metrics()
    assert 'last-synced-event-id' in metrics
    return int(metrics['last-synced-event-id'])

  @staticmethod
  def get_event_processor_status():
    """
    Returns the current status of the EventsProcessor
    """
    metrics = EventProcessorUtils.get_event_processor_metrics()
    assert 'status' in metrics
    return metrics['status']

  @staticmethod
  def get_current_notification_id(hive_client):
    """Returns the current notification from metastore"""
    assert hive_client is not None
    # NOTE(review): the result is compared against an int in
    # wait_for_event_processing() - confirm that the client returns a value that is
    # comparable with an int.
    return hive_client.get_current_notificationEventId()

0 commit comments

Comments
 (0)