From af0d4abc408c7e7541f016b5980450a72036a2ad Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat Date: Mon, 1 Oct 2018 10:47:06 +0530 Subject: [PATCH 1/5] HIVE-20542: Insert NULL value for columns of NOTIFICATION_LOG for which values are not available When no database is associated with an event we insert 'null' as database name in the metastore. With this commit, we insert NULL as database name. When no tablename is associated with an event we insert an empty string as table name in the metastore. With this commit, we insert NULL as table name. Even if a catalog name is associated with an event, addNotificationLog() doesn't insert catalog in the metastore. With this commit we take care of that as well. Ashutosh Bapat. --- .../listener/DbNotificationListener.java | 69 ++++++++++++++++--- 1 file changed, 58 insertions(+), 11 deletions(-) diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 4313e127502b..f492fc6759ed 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -24,6 +24,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -871,19 +872,65 @@ private void addNotificationLog(NotificationEvent event, ListenerEvent listenerE long nextNLId = getNextNLId(stmt, sqlGenerator, "org.apache.hadoop.hive.metastore.model.MNotificationLog"); - String insertVal = "(" + nextNLId + "," + nextEventId + "," + now() + ", ?, ?," + - quoteString(" ") + ",?, ?)"; + String insertVal; + String columns; + List params = new ArrayList(); + + // Construct the values string, parameters and column string step 
by step simultaneously so + // that the positions of columns and of their corresponding values do not go out of sync. + + // Notification log id + columns = "\"NL_ID\""; + insertVal = "" + nextNLId; + + // Event id + columns = columns + ", \"EVENT_ID\""; + insertVal = insertVal + "," + nextEventId; + + // Event time + columns = columns + ", \"EVENT_TIME\""; + insertVal = insertVal + "," + now(); + + // Event type + columns = columns + ", \"EVENT_TYPE\""; + insertVal = insertVal + ", ?"; + params.add(event.getEventType()); + + // Message + columns = columns + ", \"MESSAGE\""; + insertVal = insertVal + ", ?"; + params.add(event.getMessage()); + + // Message format + columns = columns + ", \"MESSAGE_FORMAT\""; + insertVal = insertVal + ", ?"; + params.add(event.getMessageFormat()); + + // Database name, optional + if (event.getDbName() != null) { + columns = columns + ", \"DB_NAME\""; + insertVal = insertVal + ", ?"; + params.add(event.getDbName()); + } - s = "insert into \"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", " + - " \"EVENT_TYPE\", \"DB_NAME\", " + - " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\") VALUES " + insertVal; - List params = Arrays.asList( - event.getEventType(), event.getDbName(), event.getMessage(), event.getMessageFormat()); - pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + // Table name, optional + if (event.getTableName() != null) { + columns = columns + ", \"TBL_NAME\""; + insertVal = insertVal + ", ?"; + params.add(event.getTableName()); + } - LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">", - quoteString(event.getEventType()), quoteString(event.getDbName()), - quoteString(event.getMessage()), quoteString(event.getMessageFormat())); + // Catalog name, optional + if (event.getCatName() != null) { + columns = columns + ", \"CAT_NAME\""; + insertVal = insertVal + ", ?"; + params.add(event.getCatName()); + } + + s = "insert into \"NOTIFICATION_LOG\" (" + columns + ") VALUES (" + 
insertVal + ")"; + pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); + LOG.debug("Going to execute insert <" + s + "> with parameters (" + + String.join(", ", params) + ")"); pst.execute(); // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners. From 0a0d18ad93ec1547bd1d88d793dbcfda8fe99c46 Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat Date: Mon, 1 Oct 2018 11:41:15 +0530 Subject: [PATCH 2/5] HIVE-20542: Modify query used to count the number of events to be replicated incrementally The query used to count the events for a given incremental replication 1. does not count events with NULL database, table or catalog names. 2. does not consider toEventId and limit for the given incremental replication. Ashutosh Bapat. --- .../hive/ql/exec/repl/ReplDumpTask.java | 3 +- .../hive/ql/metadata/events/EventUtils.java | 10 +- .../api/NotificationEventsCountRequest.java | 206 +++++++++++++++++- .../gen/thrift/gen-php/metastore/Types.php | 46 ++++ .../thrift/gen-py/hive_metastore/ttypes.py | 28 ++- .../gen/thrift/gen-rb/hive_metastore_types.rb | 6 +- .../src/main/thrift/hive_metastore.thrift | 4 +- .../hadoop/hive/metastore/ObjectStore.java | 44 +++- 8 files changed, 331 insertions(+), 16 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 487208054ae6..fc05bf45bb09 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -170,7 +170,8 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive ? 
work.dbNameOrPattern : "?"; replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(), - evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName)); + evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo, + work.maxEventLimit())); replLogger.startLog(); while (evIter.hasNext()) { NotificationEvent ev = evIter.next(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java index 66abd5152ebb..d7f539f43c68 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java @@ -36,7 +36,8 @@ public class EventUtils { public interface NotificationFetcher { int getBatchSize() throws IOException; long getCurrentNotificationEventId() throws IOException; - long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException; + long getDbNotificationEventsCount(long fromEventId, String dbName, Long toEventId, + int limit) throws IOException; List getNextNotificationEvents( long pos, IMetaStoreClient.NotificationFilter filter) throws IOException; } @@ -78,10 +79,15 @@ public long getCurrentNotificationEventId() throws IOException { } @Override - public long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException { + public long getDbNotificationEventsCount(long fromEventId, String dbName, Long toEventId, + int limit) throws IOException { try { NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName); + if (toEventId != null) + rqst.setToEventId(toEventId); + if (limit > 0) + rqst.setLimit(limit); return hiveDb.getMSC().getNotificationEventsCount(rqst).getEventsCount(); } catch (TException e) { throw new IOException(e); diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java index a4a5218f91b2..95af1a455b32 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java @@ -41,6 +41,8 @@ private static final org.apache.thrift.protocol.TField FROM_EVENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("fromEventId", org.apache.thrift.protocol.TType.I64, (short)1); private static final org.apache.thrift.protocol.TField DB_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("dbName", org.apache.thrift.protocol.TType.STRING, (short)2); private static final org.apache.thrift.protocol.TField CAT_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("catName", org.apache.thrift.protocol.TType.STRING, (short)3); + private static final org.apache.thrift.protocol.TField TO_EVENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("toEventId", org.apache.thrift.protocol.TType.I64, (short)4); + private static final org.apache.thrift.protocol.TField LIMIT_FIELD_DESC = new org.apache.thrift.protocol.TField("limit", org.apache.thrift.protocol.TType.I64, (short)5); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -51,12 +53,16 @@ private long fromEventId; // required private String dbName; // required private String catName; // optional + private long toEventId; // optional + private long limit; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ public enum _Fields implements org.apache.thrift.TFieldIdEnum { FROM_EVENT_ID((short)1, "fromEventId"), DB_NAME((short)2, "dbName"), - CAT_NAME((short)3, "catName"); + CAT_NAME((short)3, "catName"), + TO_EVENT_ID((short)4, "toEventId"), + LIMIT((short)5, "limit"); private static final Map byName = new HashMap(); @@ -77,6 +83,10 @@ public static _Fields findByThriftId(int fieldId) { return DB_NAME; case 3: // CAT_NAME return CAT_NAME; + case 4: // TO_EVENT_ID + return TO_EVENT_ID; + case 5: // LIMIT + return LIMIT; default: return null; } @@ -118,8 +128,10 @@ public String getFieldName() { // isset id assignments private static final int __FROMEVENTID_ISSET_ID = 0; + private static final int __TOEVENTID_ISSET_ID = 1; + private static final int __LIMIT_ISSET_ID = 2; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.CAT_NAME}; + private static final _Fields optionals[] = {_Fields.CAT_NAME,_Fields.TO_EVENT_ID,_Fields.LIMIT}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -129,6 +141,10 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.CAT_NAME, new org.apache.thrift.meta_data.FieldMetaData("catName", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.TO_EVENT_ID, new org.apache.thrift.meta_data.FieldMetaData("toEventId", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.LIMIT, new org.apache.thrift.meta_data.FieldMetaData("limit", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(NotificationEventsCountRequest.class, metaDataMap); } @@ -158,6 +174,8 @@ public NotificationEventsCountRequest(NotificationEventsCountRequest other) { if (other.isSetCatName()) { this.catName = other.catName; } + this.toEventId = other.toEventId; + this.limit = other.limit; } public NotificationEventsCountRequest deepCopy() { @@ -170,6 +188,10 @@ public void clear() { this.fromEventId = 0; this.dbName = null; this.catName = null; + setToEventIdIsSet(false); + this.toEventId = 0; + setLimitIsSet(false); + this.limit = 0; } public long getFromEventId() { @@ -240,6 +262,50 @@ public void setCatNameIsSet(boolean value) { } } + public long getToEventId() { + return this.toEventId; + } + + public void setToEventId(long toEventId) { + this.toEventId = toEventId; + setToEventIdIsSet(true); + } + + public void unsetToEventId() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TOEVENTID_ISSET_ID); + } + + /** Returns true if field toEventId is set (has been assigned a value) and false otherwise */ + public boolean isSetToEventId() { + return EncodingUtils.testBit(__isset_bitfield, __TOEVENTID_ISSET_ID); + } + + public void setToEventIdIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TOEVENTID_ISSET_ID, value); + } + + public long getLimit() { + return this.limit; + } + + public void setLimit(long limit) { + this.limit = limit; + setLimitIsSet(true); + } + + public void unsetLimit() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __LIMIT_ISSET_ID); + } + + /** Returns true if field limit is set (has been assigned a value) and false otherwise */ + public boolean isSetLimit() { + return EncodingUtils.testBit(__isset_bitfield, __LIMIT_ISSET_ID); + } + + public void setLimitIsSet(boolean value) { + 
__isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __LIMIT_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case FROM_EVENT_ID: @@ -266,6 +332,22 @@ public void setFieldValue(_Fields field, Object value) { } break; + case TO_EVENT_ID: + if (value == null) { + unsetToEventId(); + } else { + setToEventId((Long)value); + } + break; + + case LIMIT: + if (value == null) { + unsetLimit(); + } else { + setLimit((Long)value); + } + break; + } } @@ -280,6 +362,12 @@ public Object getFieldValue(_Fields field) { case CAT_NAME: return getCatName(); + case TO_EVENT_ID: + return getToEventId(); + + case LIMIT: + return getLimit(); + } throw new IllegalStateException(); } @@ -297,6 +385,10 @@ public boolean isSet(_Fields field) { return isSetDbName(); case CAT_NAME: return isSetCatName(); + case TO_EVENT_ID: + return isSetToEventId(); + case LIMIT: + return isSetLimit(); } throw new IllegalStateException(); } @@ -341,6 +433,24 @@ public boolean equals(NotificationEventsCountRequest that) { return false; } + boolean this_present_toEventId = true && this.isSetToEventId(); + boolean that_present_toEventId = true && that.isSetToEventId(); + if (this_present_toEventId || that_present_toEventId) { + if (!(this_present_toEventId && that_present_toEventId)) + return false; + if (this.toEventId != that.toEventId) + return false; + } + + boolean this_present_limit = true && this.isSetLimit(); + boolean that_present_limit = true && that.isSetLimit(); + if (this_present_limit || that_present_limit) { + if (!(this_present_limit && that_present_limit)) + return false; + if (this.limit != that.limit) + return false; + } + return true; } @@ -363,6 +473,16 @@ public int hashCode() { if (present_catName) list.add(catName); + boolean present_toEventId = true && (isSetToEventId()); + list.add(present_toEventId); + if (present_toEventId) + list.add(toEventId); + + boolean present_limit = true && (isSetLimit()); + list.add(present_limit); + if 
(present_limit) + list.add(limit); + return list.hashCode(); } @@ -404,6 +524,26 @@ public int compareTo(NotificationEventsCountRequest other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetToEventId()).compareTo(other.isSetToEventId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetToEventId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.toEventId, other.toEventId); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetLimit()).compareTo(other.isSetLimit()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetLimit()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.limit, other.limit); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -445,6 +585,18 @@ public String toString() { } first = false; } + if (isSetToEventId()) { + if (!first) sb.append(", "); + sb.append("toEventId:"); + sb.append(this.toEventId); + first = false; + } + if (isSetLimit()) { + if (!first) sb.append(", "); + sb.append("limit:"); + sb.append(this.limit); + first = false; + } sb.append(")"); return sb.toString(); } @@ -522,6 +674,22 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, NotificationEventsC org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 4: // TO_EVENT_ID + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.toEventId = iprot.readI64(); + struct.setToEventIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 5: // LIMIT + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.limit = iprot.readI64(); + struct.setLimitIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -550,6 +718,16 @@ public void 
write(org.apache.thrift.protocol.TProtocol oprot, NotificationEvents oprot.writeFieldEnd(); } } + if (struct.isSetToEventId()) { + oprot.writeFieldBegin(TO_EVENT_ID_FIELD_DESC); + oprot.writeI64(struct.toEventId); + oprot.writeFieldEnd(); + } + if (struct.isSetLimit()) { + oprot.writeFieldBegin(LIMIT_FIELD_DESC); + oprot.writeI64(struct.limit); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -573,10 +751,22 @@ public void write(org.apache.thrift.protocol.TProtocol prot, NotificationEventsC if (struct.isSetCatName()) { optionals.set(0); } - oprot.writeBitSet(optionals, 1); + if (struct.isSetToEventId()) { + optionals.set(1); + } + if (struct.isSetLimit()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); if (struct.isSetCatName()) { oprot.writeString(struct.catName); } + if (struct.isSetToEventId()) { + oprot.writeI64(struct.toEventId); + } + if (struct.isSetLimit()) { + oprot.writeI64(struct.limit); + } } @Override @@ -586,11 +776,19 @@ public void read(org.apache.thrift.protocol.TProtocol prot, NotificationEventsCo struct.setFromEventIdIsSet(true); struct.dbName = iprot.readString(); struct.setDbNameIsSet(true); - BitSet incoming = iprot.readBitSet(1); + BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { struct.catName = iprot.readString(); struct.setCatNameIsSet(true); } + if (incoming.get(1)) { + struct.toEventId = iprot.readI64(); + struct.setToEventIdIsSet(true); + } + if (incoming.get(2)) { + struct.limit = iprot.readI64(); + struct.setLimitIsSet(true); + } } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php index 775c9d97a9b3..5fd5d782ea7a 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php @@ -22619,6 +22619,14 @@ class 
NotificationEventsCountRequest { * @var string */ public $catName = null; + /** + * @var int + */ + public $toEventId = null; + /** + * @var int + */ + public $limit = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -22635,6 +22643,14 @@ public function __construct($vals=null) { 'var' => 'catName', 'type' => TType::STRING, ), + 4 => array( + 'var' => 'toEventId', + 'type' => TType::I64, + ), + 5 => array( + 'var' => 'limit', + 'type' => TType::I64, + ), ); } if (is_array($vals)) { @@ -22647,6 +22663,12 @@ public function __construct($vals=null) { if (isset($vals['catName'])) { $this->catName = $vals['catName']; } + if (isset($vals['toEventId'])) { + $this->toEventId = $vals['toEventId']; + } + if (isset($vals['limit'])) { + $this->limit = $vals['limit']; + } } } @@ -22690,6 +22712,20 @@ public function read($input) $xfer += $input->skip($ftype); } break; + case 4: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->toEventId); + } else { + $xfer += $input->skip($ftype); + } + break; + case 5: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->limit); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -22718,6 +22754,16 @@ public function write($output) { $xfer += $output->writeString($this->catName); $xfer += $output->writeFieldEnd(); } + if ($this->toEventId !== null) { + $xfer += $output->writeFieldBegin('toEventId', TType::I64, 4); + $xfer += $output->writeI64($this->toEventId); + $xfer += $output->writeFieldEnd(); + } + if ($this->limit !== null) { + $xfer += $output->writeFieldBegin('limit', TType::I64, 5); + $xfer += $output->writeI64($this->limit); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py 
b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 9d3885cf2e99..03c2a4ee95a3 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -15814,6 +15814,8 @@ class NotificationEventsCountRequest: - fromEventId - dbName - catName + - toEventId + - limit """ thrift_spec = ( @@ -15821,12 +15823,16 @@ class NotificationEventsCountRequest: (1, TType.I64, 'fromEventId', None, None, ), # 1 (2, TType.STRING, 'dbName', None, None, ), # 2 (3, TType.STRING, 'catName', None, None, ), # 3 + (4, TType.I64, 'toEventId', None, None, ), # 4 + (5, TType.I64, 'limit', None, None, ), # 5 ) - def __init__(self, fromEventId=None, dbName=None, catName=None,): + def __init__(self, fromEventId=None, dbName=None, catName=None, toEventId=None, limit=None,): self.fromEventId = fromEventId self.dbName = dbName self.catName = catName + self.toEventId = toEventId + self.limit = limit def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -15852,6 +15858,16 @@ def read(self, iprot): self.catName = iprot.readString() else: iprot.skip(ftype) + elif fid == 4: + if ftype == TType.I64: + self.toEventId = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I64: + self.limit = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -15874,6 +15890,14 @@ def write(self, oprot): oprot.writeFieldBegin('catName', TType.STRING, 3) oprot.writeString(self.catName) oprot.writeFieldEnd() + if self.toEventId is not None: + oprot.writeFieldBegin('toEventId', TType.I64, 4) + oprot.writeI64(self.toEventId) + oprot.writeFieldEnd() + if self.limit is not None: + oprot.writeFieldBegin('limit', TType.I64, 5) + 
oprot.writeI64(self.limit) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -15890,6 +15914,8 @@ def __hash__(self): value = (value * 31) ^ hash(self.fromEventId) value = (value * 31) ^ hash(self.dbName) value = (value * 31) ^ hash(self.catName) + value = (value * 31) ^ hash(self.toEventId) + value = (value * 31) ^ hash(self.limit) return value def __repr__(self): diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index 26b89c0fef33..2eea181d85e9 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -3518,11 +3518,15 @@ class NotificationEventsCountRequest FROMEVENTID = 1 DBNAME = 2 CATNAME = 3 + TOEVENTID = 4 + LIMIT = 5 FIELDS = { FROMEVENTID => {:type => ::Thrift::Types::I64, :name => 'fromEventId'}, DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'}, - CATNAME => {:type => ::Thrift::Types::STRING, :name => 'catName', :optional => true} + CATNAME => {:type => ::Thrift::Types::STRING, :name => 'catName', :optional => true}, + TOEVENTID => {:type => ::Thrift::Types::I64, :name => 'toEventId', :optional => true}, + LIMIT => {:type => ::Thrift::Types::I64, :name => 'limit', :optional => true} } def struct_fields; FIELDS; end diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index a2a6740452e4..4b7b61520a2d 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -1147,7 +1147,9 @@ struct CurrentNotificationEventId { struct NotificationEventsCountRequest { 1: required i64 fromEventId, 2: required string dbName, - 3: optional string 
catName + 3: optional string catName, + 4: optional i64 toEventId, + 5: optional i64 limit } struct NotificationEventsCountResponse { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index b98b4b4005fd..9bcb77ff2a59 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -10254,14 +10254,46 @@ public NotificationEventsCountResponse getNotificationEventsCount(NotificationEv long fromEventId = rqst.getFromEventId(); String inputDbName = rqst.getDbName(); String catName = rqst.isSetCatName() ? rqst.getCatName() : getDefaultCatalog(conf); - String queryStr = "select count(eventId) from " + MNotificationLog.class.getName() - + " where eventId > fromEventId && dbName == inputDbName && catalogName == catName"; + long toEventId; + String paramSpecs; + List paramVals = new ArrayList(); + + // Build the query to count events, part by part + String queryStr = "select count(eventId) from " + MNotificationLog.class.getName(); + // count fromEventId onwards events + queryStr = queryStr + " where eventId > fromEventId"; + paramSpecs = "java.lang.Long fromEventId"; + paramVals.add(new Long(fromEventId)); + + // dbName and catalogName could be NULL in case of transaction related events. These events + // are required to be processed for transaction consistency. 
+ queryStr = queryStr + " && (dbName == inputDbName || dbName == null)"; + paramSpecs = paramSpecs + ", java.lang.String inputDbName"; + paramVals.add(inputDbName); + + queryStr = queryStr + " && (catalogName == catName || catalogName == null)"; + paramSpecs = paramSpecs +", java.lang.String catName"; + paramVals.add(catName); + + // count events up to and including toEventId, if specified + if (rqst.isSetToEventId()) { + toEventId = rqst.getToEventId(); + queryStr = queryStr + " && eventId <= toEventId"; + paramSpecs = paramSpecs + ", java.lang.Long toEventId"; + paramVals.add(new Long(toEventId)); + } + query = pm.newQuery(queryStr); - query.declareParameters("java.lang.Long fromEventId, java.lang.String inputDbName," + - " java.lang.String catName"); - result = (Long) query.execute(fromEventId, inputDbName, catName); + query.declareParameters(paramSpecs); + result = (Long) query.executeWithArray(paramVals.toArray()); commited = commitTransaction(); - return new NotificationEventsCountResponse(result.longValue()); + + // Cap the event count by limit if specified. + long eventCount = result.longValue(); + if (rqst.isSetLimit() && eventCount > rqst.getLimit()) + eventCount = rqst.getLimit(); + + return new NotificationEventsCountResponse(eventCount); } finally { rollbackAndCleanup(commited, query); } From 52caa7452dbf5f560ef793074f6d9fceadd6482e Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat Date: Thu, 4 Oct 2018 09:38:44 +0530 Subject: [PATCH 3/5] HIVE-20644: Add tests for testing getNotificationEventsCount(). Ashutosh Bapat. 
--- .../listener/TestDbNotificationListener.java | 58 +++++++++++++++++++ .../TestReplicationScenariosAcidTables.java | 16 ++++- .../hive/ql/parse/WarehouseInstance.java | 15 +++++ 3 files changed, 88 insertions(+), 1 deletion(-) diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index a00ea173ae97..fc858587f982 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -54,6 +54,8 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.ResourceType; @@ -290,6 +292,19 @@ public void tearDown() { MockMetaStoreEventListener.clearEvents(); } + // Test if the number of events after fromEventId with the given database name is the same as + // expected. A null toEventId is treated as unbounded; likewise a null limit means no limit. 
+ private void testEventCounts(String dbName, long fromEventId, Long toEventId, Integer limit, + long expectedCount) throws Exception { + NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName); + + if (toEventId != null) + rqst.setToEventId(toEventId); + if (limit != null) + rqst.setLimit(limit); + + assertEquals(expectedCount, msClient.getNotificationEventsCount(rqst).getEventsCount()); + } @Test public void createDatabase() throws Exception { @@ -331,6 +346,10 @@ public void createDatabase() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(1, rsp.getEventsSize()); + + // There's only one event corresponding to CREATE DATABASE + testEventCounts(dbName, firstEventId, null, null, 1); + testEventCounts(dbName2, firstEventId, null, null, 0); } @Test @@ -348,6 +367,7 @@ public void dropDatabase() throws Exception { // Two events: one for create db and other for drop db assertEquals(2, rsp.getEventsSize()); + testEventCounts(dbName, firstEventId, null, null, 2); // Read event from notification NotificationEvent event = rsp.getEvents().get(1); @@ -378,6 +398,7 @@ public void dropDatabase() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(3, rsp.getEventsSize()); + testEventCounts(dbName2, firstEventId, null, null,1); } @Test @@ -433,6 +454,7 @@ public void createTable() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(1, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 1); } @Test @@ -491,6 +513,7 @@ public void alterTable() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(2, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 2); } @Test @@ -557,6 +580,7 @@ public void dropTable() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(3, rsp.getEventsSize()); + 
testEventCounts(defaultDbName, firstEventId, null, null, 3); } @Test @@ -626,6 +650,7 @@ public void addPartition() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(2, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 2); } @Test @@ -694,6 +719,7 @@ public void alterPartition() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(3, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 3); } @Test @@ -768,6 +794,7 @@ public void dropPartition() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(4, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 4); } @Test @@ -863,6 +890,7 @@ public void exchangePartition() throws Exception { MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 3); MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2); MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + testEventCounts(dbName, firstEventId, null, null, 5); } @Test @@ -921,6 +949,7 @@ public void createFunction() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(1, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 1); } @Test @@ -975,6 +1004,7 @@ public void dropFunction() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(3, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 3); } @Test @@ -1030,6 +1060,7 @@ public void insertTable() throws Exception { // Verify the eventID was passed to the non-transactional listener MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 2); MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + 
testEventCounts(defaultDbName, firstEventId, null, null, 2); } @Test @@ -1096,6 +1127,7 @@ public void insertPartition() throws Exception { MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 3); MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2); MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1); + testEventCounts(defaultDbName, firstEventId, null, null, 3); } @@ -1190,10 +1222,12 @@ public void sqlInsertTable() throws Exception { event = rsp.getEvents().get(5); assertEquals(firstEventId + 6, event.getEventId()); assertEquals(EventType.DROP_TABLE.toString(), event.getEventType()); + testEventCounts(defaultDbName, firstEventId, null, null, 6); } @Test public void sqlCTAS() throws Exception { + String defaultDbName = "default"; String sourceTblName = "sqlctasins1"; String targetTblName = "sqlctasins2"; // Event 1 @@ -1219,10 +1253,12 @@ public void sqlCTAS() throws Exception { event = rsp.getEvents().get(4); assertEquals(firstEventId + 5, event.getEventId()); assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType()); + testEventCounts(defaultDbName, firstEventId, null, null, 6); } @Test public void sqlTempTable() throws Exception { + String defaultDbName = "default"; String tempTblName = "sqltemptbl"; driver.run("create temporary table " + tempTblName + " (c int)"); driver.run("insert into table " + tempTblName + " values (1)"); @@ -1230,6 +1266,7 @@ public void sqlTempTable() throws Exception { // Get notifications from metastore NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(0, rsp.getEventsSize()); + testEventCounts(defaultDbName, firstEventId, null, null, 0); } @Test @@ -1253,6 +1290,7 @@ public void sqlDb() throws Exception { @Test public void sqlInsertPartition() throws Exception { + String defaultDbName = "default"; String tblName = "sqlinsptn"; // Event 1 driver.run("create table 
" + tblName + " (c int) partitioned by (ds string)"); @@ -1264,6 +1302,13 @@ public void sqlInsertPartition() throws Exception { driver.run("insert into table " + tblName + " partition (ds) values (3, 'today')"); // Event 9, 10 driver.run("alter table " + tblName + " add partition (ds = 'yesterday')"); + + testEventCounts(defaultDbName, firstEventId, null, null, 10); + // Test a limit higher than available events + testEventCounts(defaultDbName, firstEventId, null, 100, 10); + // Test toEventId lower than current eventId + testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 5, null, 5); + // Event 10, 11, 12 driver.run("insert into table " + tblName + " partition (ds = 'yesterday') values (2)"); // Event 12, 13, 14 @@ -1330,6 +1375,9 @@ public void sqlInsertPartition() throws Exception { assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType()); assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); + // Test fromEventId different from the very first + testEventCounts(defaultDbName, event.getEventId(), null, null, 3); + event = rsp.getEvents().get(21); assertEquals(firstEventId + 22, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); @@ -1345,6 +1393,16 @@ public void sqlInsertPartition() throws Exception { assertEquals(firstEventId + 24, event.getEventId()); assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType()); assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); + testEventCounts(defaultDbName, firstEventId, null, null, 24); + + // Test a limit within the available events + testEventCounts(defaultDbName, firstEventId, null, 10, 10); + // Test toEventId greater than current eventId + testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, null, 24); + // Test toEventId greater than current eventId with some limit within available events + testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, 10, 10); + // Test toEventId 
greater than current eventId with some limit beyond available events + testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, 50, 24); } private void verifyInsert(NotificationEvent event, String dbName, String tblName) throws Exception { diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index e043e5446fd1..23664c67af41 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -470,24 +470,38 @@ public void run() { @Test public void testOpenTxnEvent() throws Throwable { String tableName = testName.getMethodName(); + // Bootstrap load causes 4 events: 2 for open and commit transaction and 2 alter database events WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); + // Load will start and commit a transaction, thus producing 2 events apart from the events + // corresponding to other load activities. 
replica.load(replicatedDbName, bootStrapDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) .verifyResult(bootStrapDump.lastReplicationId); - // create table will start and coomit the transaction + // create table will start and commit the transaction, thus producing 3 events primary.run("use " + primaryDbName) .run("CREATE TABLE " + tableName + " (key int, value int) PARTITIONED BY (load_date date) " + "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')") .run("SHOW TABLES LIKE '" + tableName + "'") .verifyResult(tableName) + // insert will start and commit transaction along with adding and altering a + // partition, thus producing total 5 events .run("INSERT INTO " + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)") + // select will start and commit a transaction, producing 2 events .run("select key from " + tableName) .verifyResult("1"); WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); + + // Test number of events. We expect total 17 events as follows + // 4 for dump + 2 for load + 3 for create table + 5 for insert + 2 for select + 1 for open + // transaction of incremental dump. 
+ long lastReplId = Long.parseLong(bootStrapDump.lastReplicationId); + primary.testEventCounts(primaryDbName, lastReplId, null, null, 17); + + // Test load replica.load(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) .verifyResult(incrementalDump.lastReplicationId); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 1e3478d71822..33b3f985d8f6 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest; import org.apache.hadoop.hive.metastore.api.SQLForeignKey; @@ -416,6 +417,20 @@ ReplicationV1CompatRule getReplivationV1CompatRule(List testsToSkip) { return new ReplicationV1CompatRule(client, hiveConf, testsToSkip); } + // Test if the number of events between the given event ids and with the given database name is + // the same as expected. A null toEventId or a null limit leaves that bound unset on the request.
+ public void testEventCounts(String dbName, long fromEventId, Long toEventId, Integer limit, + long expectedCount) throws Exception { + NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName); + + if (toEventId != null) + rqst.setToEventId(toEventId); + if (limit != null) + rqst.setLimit(limit); + + assertEquals(expectedCount, client.getNotificationEventsCount(rqst).getEventsCount()); + } + @Override public void close() throws IOException { if (miniDFSCluster != null && miniDFSCluster.isClusterUp()) { From a2969724faa235e691bbdbaa648a6188936ba477 Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat Date: Fri, 19 Oct 2018 15:03:06 +0530 Subject: [PATCH 4/5] HIVE-20542: Address Sankar's comments. Also corrected an instance of call to addNotificationLog() to which database name was being passed without converting it into a lower case. --- .../listener/DbNotificationListener.java | 18 ++++++++---- .../TestReplicationScenariosAcidTables.java | 12 +------- .../hive/ql/exec/repl/ReplDumpTask.java | 6 ++++ .../hive/ql/metadata/events/EventUtils.java | 12 ++++++-- .../hadoop/hive/metastore/ObjectStore.java | 28 ++++++++++++++----- .../hadoop/hive/metastore/txn/TxnHandler.java | 2 +- 6 files changed, 50 insertions(+), 28 deletions(-) diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index f492fc6759ed..e41230a18c56 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -907,24 +907,30 @@ private void addNotificationLog(NotificationEvent event, ListenerEvent listenerE params.add(event.getMessageFormat()); // Database name, optional - if (event.getDbName() != null) { + String dbName = event.getDbName(); + if 
(dbName != null) { + assert dbName.equals(dbName.toLowerCase()); columns = columns + ", \"DB_NAME\""; insertVal = insertVal + ", ?"; - params.add(event.getDbName()); + params.add(dbName); } // Table name, optional - if (event.getTableName() != null) { + String tableName = event.getTableName(); + if (tableName != null) { + assert tableName.equals(tableName.toLowerCase()); columns = columns + ", \"TBL_NAME\""; insertVal = insertVal + ", ?"; - params.add(event.getTableName()); + params.add(tableName); } // Catalog name, optional - if (event.getCatName() != null) { + String catName = event.getCatName(); + if (catName != null) { + assert catName.equals(catName.toLowerCase()); columns = columns + ", \"CAT_NAME\""; insertVal = insertVal + ", ?"; - params.add(event.getCatName()); + params.add(catName); } s = "insert into \"NOTIFICATION_LOG\" (" + columns + ") VALUES (" + insertVal + ")"; diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 23664c67af41..23ca85020e83 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -470,36 +470,26 @@ public void run() { @Test public void testOpenTxnEvent() throws Throwable { String tableName = testName.getMethodName(); - // Bootstrap load causes 4 events: 2 for open and commit transaction and 2 alter database events WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); - // Load will start and commit a transaction, thus producing 2 events apart from the events - // corresponding to other load activities. 
replica.load(replicatedDbName, bootStrapDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) .verifyResult(bootStrapDump.lastReplicationId); - // create table will start and commit the transaction, thus producing 3 events primary.run("use " + primaryDbName) .run("CREATE TABLE " + tableName + " (key int, value int) PARTITIONED BY (load_date date) " + "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')") .run("SHOW TABLES LIKE '" + tableName + "'") .verifyResult(tableName) - // insert will start and commit transaction along with adding and altering a - // partition, thus producing total 5 events .run("INSERT INTO " + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)") - // select will start and commit a transaction, producing 2 events .run("select key from " + tableName) .verifyResult("1"); WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); - // Test number of events. We expect total 17 events as follows - // 4 for dump + 2 for load + 3 for create table + 5 for insert + 2 for select + 1 for open - // transaction of incremental dump. 
long lastReplId = Long.parseLong(bootStrapDump.lastReplicationId); - primary.testEventCounts(primaryDbName, lastReplId, null, null, 17); + primary.testEventCounts(primaryDbName, lastReplId, null, null, 20); // Test load replica.load(replicatedDbName, incrementalDump.dumpLocation) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index fc05bf45bb09..07d093f9d047 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -166,6 +166,12 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive evFetcher, work.eventFrom, work.maxEventLimit(), evFilter); lastReplId = work.eventTo; + + // Right now the only pattern allowed to be specified is *, which matches all the database + // names. So passing dbname as is works since getDbNotificationEventsCount can exclude filter + // on database name when it's *. In future, if we support more elaborate patterns, we will + // have to pass DatabaseAndTableFilter created above to getDbNotificationEventsCount() to get + // correct event count. String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty()) ? work.dbNameOrPattern : "?"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java index d7f539f43c68..f9252712b3f5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java @@ -82,12 +82,18 @@ public long getCurrentNotificationEventId() throws IOException { public long getDbNotificationEventsCount(long fromEventId, String dbName, Long toEventId, int limit) throws IOException { try { + // Number of events is always bounded by limit, which when non-positive, will result + // in no events being counted.
+ if (limit <= 0) { + return 0; + } + NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName); - if (toEventId != null) + if (toEventId != null) { rqst.setToEventId(toEventId); - if (limit > 0) - rqst.setLimit(limit); + } + rqst.setLimit(limit); return hiveDb.getMSC().getNotificationEventsCount(rqst).getEventsCount(); } catch (TException e) { throw new IOException(e); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 9bcb77ff2a59..7de46c94ffc3 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -10258,6 +10258,10 @@ public NotificationEventsCountResponse getNotificationEventsCount(NotificationEv String paramSpecs; List paramVals = new ArrayList(); + // We store a catalog name in lower case in metastore and also use the same way everywhere in + // hive. + assert catName.equals(catName.toLowerCase()); + // Build the query to count events, part by part String queryStr = "select count(eventId) from " + MNotificationLog.class.getName(); // count fromEventId onwards events @@ -10265,12 +10269,21 @@ public NotificationEventsCountResponse getNotificationEventsCount(NotificationEv paramSpecs = "java.lang.Long fromEventId"; paramVals.add(new Long(fromEventId)); - // dbName and catalogName could be NULL in case of transaction related events. These events - // are required to be processed for transaction consistency. - queryStr = queryStr + " && (dbName == inputDbName || dbName == null)"; - paramSpecs = paramSpecs + ", java.lang.String inputDbName"; - paramVals.add(inputDbName); - + // Input database name can be a database name or a *. 
In the first case we add a filter + // condition on dbName column, but not in the second case, since a * means all the + // databases. In case we support more elaborate database name patterns in future, we will + // have to apply a method similar to getNextNotification() method of MetaStoreClient. + if (!inputDbName.equals("*")) { + // dbName could be NULL in case of transaction related events, which also need to be + // counted. + queryStr = queryStr + " && (dbName == inputDbName || dbName == null)"; + paramSpecs = paramSpecs + ", java.lang.String inputDbName"; + // We store a database name in lower case in metastore. + paramVals.add(inputDbName.toLowerCase()); + } + + // catName could be NULL in case of transaction related events, which also need to be + // counted. queryStr = queryStr + " && (catalogName == catName || catalogName == null)"; paramSpecs = paramSpecs +", java.lang.String catName"; paramVals.add(catName); @@ -10290,8 +10303,9 @@ public NotificationEventsCountResponse getNotificationEventsCount(NotificationEv // Cap the event count by limit if specified. 
long eventCount = result.longValue(); - if (rqst.isSetLimit() && eventCount > rqst.getLimit()) + if (rqst.isSetLimit() && eventCount > rqst.getLimit()) { eventCount = rqst.getLimit(); + } return new NotificationEventsCountResponse(eventCount); } finally { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 351fafd933e0..0bb739fc2229 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -1652,7 +1652,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds if (transactionalListeners != null) { MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, EventMessage.EventType.ALLOC_WRITE_ID, - new AllocWriteIdEvent(txnToWriteIds, rqst.getDbName(), rqst.getTableName(), null), + new AllocWriteIdEvent(txnToWriteIds, dbName, tblName, null), dbConn, sqlGenerator); } From fab29d9daf2c9485125aa3d28c2595a6c4fc0b7c Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat Date: Mon, 22 Oct 2018 12:32:00 +0530 Subject: [PATCH 5/5] HIVE-20542: Address Hive-QA's comments --- .../hcatalog/listener/TestDbNotificationListener.java | 11 ++++++----- .../hadoop/hive/ql/parse/WarehouseInstance.java | 6 ++++-- .../org/apache/hadoop/hive/metastore/ObjectStore.java | 4 ++-- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index fc858587f982..725221439fa3 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -55,7 +55,6 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; -import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.ResourceType; @@ -298,10 +297,12 @@ private void testEventCounts(String dbName, long fromEventId, Long toEventId, In long expectedCount) throws Exception { NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName); - if (toEventId != null) + if (toEventId != null) { rqst.setToEventId(toEventId); - if (limit != null) + } + if (limit != null) { rqst.setLimit(limit); + } assertEquals(expectedCount, msClient.getNotificationEventsCount(rqst).getEventsCount()); } @@ -348,7 +349,7 @@ public void createDatabase() throws Exception { assertEquals(1, rsp.getEventsSize()); // There's only one event corresponding to CREATE DATABASE - testEventCounts(dbName, firstEventId, null, null, 1); + testEventCounts(dbName, firstEventId, null, null, 1); testEventCounts(dbName2, firstEventId, null, null, 0); } @@ -398,7 +399,7 @@ public void dropDatabase() throws Exception { } rsp = msClient.getNextNotification(firstEventId, 0, null); assertEquals(3, rsp.getEventsSize()); - testEventCounts(dbName2, firstEventId, null, null,1); + testEventCounts(dbName2, firstEventId, null, null, 1); } @Test diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 33b3f985d8f6..aa26cc949ad3 100644 --- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -423,10 +423,12 @@ public void testEventCounts(String dbName, long fromEventId, Long toEventId, Int long expectedCount) throws Exception { NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName); - if (toEventId != null) + if (toEventId != null) { rqst.setToEventId(toEventId); - if (limit != null) + } + if (limit != null) { rqst.setLimit(limit); + } assertEquals(expectedCount, client.getNotificationEventsCount(rqst).getEventsCount()); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 7de46c94ffc3..f001dcb4980c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -10267,7 +10267,7 @@ public NotificationEventsCountResponse getNotificationEventsCount(NotificationEv // count fromEventId onwards events queryStr = queryStr + " where eventId > fromEventId"; paramSpecs = "java.lang.Long fromEventId"; - paramVals.add(new Long(fromEventId)); + paramVals.add(Long.valueOf(fromEventId)); // Input database name can be a database name or a *. In the first case we add a filter // condition on dbName column, but not in the second case, since a * means all the @@ -10293,7 +10293,7 @@ public NotificationEventsCountResponse getNotificationEventsCount(NotificationEv toEventId = rqst.getToEventId(); queryStr = queryStr + " && eventId <= toEventId"; paramSpecs = paramSpecs + ", java.lang.Long toEventId"; - paramVals.add(new Long(toEventId)); + paramVals.add(Long.valueOf(toEventId)); } query = pm.newQuery(queryStr);