From 733d58cd687517678ebb6e78890327796fd0aef6 Mon Sep 17 00:00:00 2001 From: Aaron Gresch Date: Thu, 21 Jun 2018 14:56:19 -0500 Subject: [PATCH] STORM-3117 prevent deleting blobs while topologies still active --- .../generated/IllegalStateException.java | 380 ++++++++++++++++++ .../org/apache/storm/generated/Nimbus.java | 130 +++++- .../utils/WrappedIllegalStateException.java | 32 ++ storm-client/src/py/storm/Nimbus.py | 20 +- storm-client/src/py/storm/ttypes.py | 66 +++ storm-client/src/storm.thrift | 6 +- .../apache/storm/daemon/nimbus/Nimbus.java | 27 +- 7 files changed, 649 insertions(+), 12 deletions(-) create mode 100644 storm-client/src/jvm/org/apache/storm/generated/IllegalStateException.java create mode 100644 storm-client/src/jvm/org/apache/storm/utils/WrappedIllegalStateException.java diff --git a/storm-client/src/jvm/org/apache/storm/generated/IllegalStateException.java b/storm-client/src/jvm/org/apache/storm/generated/IllegalStateException.java new file mode 100644 index 00000000000..4b16837aff0 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/generated/IllegalStateException.java @@ -0,0 +1,380 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Autogenerated by Thrift Compiler (0.11.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.storm.generated; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)") +public class IllegalStateException extends org.apache.storm.thrift.TException implements org.apache.storm.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.storm.thrift.protocol.TStruct STRUCT_DESC = new org.apache.storm.thrift.protocol.TStruct("IllegalStateException"); + + private static final org.apache.storm.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("msg", org.apache.storm.thrift.protocol.TType.STRING, (short)1); + + private static final org.apache.storm.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new IllegalStateExceptionStandardSchemeFactory(); + private static final org.apache.storm.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new IllegalStateExceptionTupleSchemeFactory(); + + private java.lang.String msg; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.storm.thrift.TFieldIdEnum { + MSG((short)1, "msg"); + + private static final java.util.Map byName = new java.util.HashMap(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // MSG + return MSG; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final java.util.Map<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.MSG, new org.apache.storm.thrift.meta_data.FieldMetaData("msg", org.apache.storm.thrift.TFieldRequirementType.REQUIRED, + new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.STRING))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.storm.thrift.meta_data.FieldMetaData.addStructMetaDataMap(IllegalStateException.class, metaDataMap); + } + + public IllegalStateException() { + } + + public IllegalStateException( + java.lang.String msg) + { + this(); + this.msg = msg; + } + + /** + * Performs a deep copy on other. + */ + public IllegalStateException(IllegalStateException other) { + if (other.is_set_msg()) { + this.msg = other.msg; + } + } + + public IllegalStateException deepCopy() { + return new IllegalStateException(this); + } + + @Override + public void clear() { + this.msg = null; + } + + public java.lang.String get_msg() { + return this.msg; + } + + public void set_msg(java.lang.String msg) { + this.msg = msg; + } + + public void unset_msg() { + this.msg = null; + } + + /** Returns true if field msg is set (has been assigned a value) and false otherwise */ + public boolean is_set_msg() { + return this.msg != null; + } + + public void set_msg_isSet(boolean value) { + if (!value) { + this.msg = null; + } + } + + public void setFieldValue(_Fields field, java.lang.Object value) { + switch (field) { + case MSG: + if (value == null) { + unset_msg(); + } else { + set_msg((java.lang.String)value); + } + break; + + } + } + + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case MSG: + return get_msg(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case MSG: + return is_set_msg(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof IllegalStateException) + return this.equals((IllegalStateException)that); + return false; + } + + public boolean equals(IllegalStateException that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_msg = true && this.is_set_msg(); + boolean that_present_msg = true && that.is_set_msg(); + if (this_present_msg || that_present_msg) { + if (!(this_present_msg && that_present_msg)) + return false; + if (!this.msg.equals(that.msg)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((is_set_msg()) ? 131071 : 524287); + if (is_set_msg()) + hashCode = hashCode * 8191 + msg.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(IllegalStateException other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(is_set_msg()).compareTo(other.is_set_msg()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_msg()) { + lastComparison = org.apache.storm.thrift.TBaseHelper.compareTo(this.msg, other.msg); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.storm.thrift.protocol.TProtocol iprot) throws org.apache.storm.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.storm.thrift.protocol.TProtocol oprot) throws org.apache.storm.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("IllegalStateException("); + boolean first = true; + + sb.append("msg:"); + if (this.msg == null) { + sb.append("null"); + } else { + sb.append(this.msg); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.storm.thrift.TException { + // check for required fields + if (!is_set_msg()) { + throw new org.apache.storm.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString()); + } + + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.storm.thrift.protocol.TCompactProtocol(new org.apache.storm.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.storm.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new org.apache.storm.thrift.protocol.TCompactProtocol(new org.apache.storm.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.storm.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class IllegalStateExceptionStandardSchemeFactory implements org.apache.storm.thrift.scheme.SchemeFactory { + public IllegalStateExceptionStandardScheme getScheme() { + return new IllegalStateExceptionStandardScheme(); + } + } + + private static class IllegalStateExceptionStandardScheme extends org.apache.storm.thrift.scheme.StandardScheme { + + public void read(org.apache.storm.thrift.protocol.TProtocol iprot, IllegalStateException struct) throws org.apache.storm.thrift.TException { + org.apache.storm.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.storm.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // MSG + if (schemeField.type == org.apache.storm.thrift.protocol.TType.STRING) { + struct.msg = iprot.readString(); + struct.set_msg_isSet(true); + } else { + org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + struct.validate(); + } + + public void write(org.apache.storm.thrift.protocol.TProtocol oprot, IllegalStateException struct) throws org.apache.storm.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.msg != null) { + oprot.writeFieldBegin(MSG_FIELD_DESC); + oprot.writeString(struct.msg); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class IllegalStateExceptionTupleSchemeFactory implements org.apache.storm.thrift.scheme.SchemeFactory { + public IllegalStateExceptionTupleScheme getScheme() { + return new IllegalStateExceptionTupleScheme(); + } + } + + private static class IllegalStateExceptionTupleScheme extends org.apache.storm.thrift.scheme.TupleScheme { + + @Override + public void write(org.apache.storm.thrift.protocol.TProtocol prot, IllegalStateException struct) throws org.apache.storm.thrift.TException { + org.apache.storm.thrift.protocol.TTupleProtocol oprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot; + oprot.writeString(struct.msg); + } + + @Override + public void read(org.apache.storm.thrift.protocol.TProtocol prot, IllegalStateException struct) throws org.apache.storm.thrift.TException { + org.apache.storm.thrift.protocol.TTupleProtocol iprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot; + struct.msg = iprot.readString(); + struct.set_msg_isSet(true); + } + } + + private static S scheme(org.apache.storm.thrift.protocol.TProtocol proto) { + return (org.apache.storm.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } +} + diff --git a/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java b/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java index 9fcc8ee89c6..de9a3b3224a 100644 --- a/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java +++ b/storm-client/src/jvm/org/apache/storm/generated/Nimbus.java @@ -85,7 +85,7 @@ public interface Iface { public java.nio.ByteBuffer downloadBlobChunk(java.lang.String session) throws AuthorizationException, org.apache.storm.thrift.TException; - public void deleteBlob(java.lang.String key) throws AuthorizationException, KeyNotFoundException, org.apache.storm.thrift.TException; + public void deleteBlob(java.lang.String key) throws AuthorizationException, KeyNotFoundException, IllegalStateException, org.apache.storm.thrift.TException; public ListBlobsResult listBlobs(java.lang.String session) throws org.apache.storm.thrift.TException; @@ -890,7 +890,7 @@ public java.nio.ByteBuffer recv_downloadBlobChunk() throws AuthorizationExceptio throw new org.apache.storm.thrift.TApplicationException(org.apache.storm.thrift.TApplicationException.MISSING_RESULT, "downloadBlobChunk failed: unknown result"); } - public void deleteBlob(java.lang.String key) throws AuthorizationException, KeyNotFoundException, org.apache.storm.thrift.TException + public void deleteBlob(java.lang.String key) throws AuthorizationException, KeyNotFoundException, IllegalStateException, org.apache.storm.thrift.TException { send_deleteBlob(key); recv_deleteBlob(); @@ -903,7 +903,7 @@ public void send_deleteBlob(java.lang.String key) throws org.apache.storm.thrift sendBase("deleteBlob", args); } - public void recv_deleteBlob() throws AuthorizationException, KeyNotFoundException, org.apache.storm.thrift.TException + public void recv_deleteBlob() throws AuthorizationException, KeyNotFoundException, IllegalStateException, org.apache.storm.thrift.TException { deleteBlob_result result = new deleteBlob_result(); receiveBase(result, "deleteBlob"); @@ -913,6 +913,9 @@ public void recv_deleteBlob() throws AuthorizationException, KeyNotFoundExceptio if (result.knf != null) { throw result.knf; } + if (result.ise != null) { + throw result.ise; + } return; } @@ -2460,7 +2463,7 @@ public void write_args(org.apache.storm.thrift.protocol.TProtocol prot) throws o prot.writeMessageEnd(); } - public Void getResult() throws AuthorizationException, KeyNotFoundException, org.apache.storm.thrift.TException { + public Void getResult() throws AuthorizationException, KeyNotFoundException, IllegalStateException, org.apache.storm.thrift.TException { if (getState() != org.apache.storm.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { throw new java.lang.IllegalStateException("Method call not finished!"); } @@ -4135,6 +4138,8 @@ public deleteBlob_result getResult(I iface, deleteBlob_args args) throws org.apa result.aze = aze; } catch (KeyNotFoundException knf) { result.knf = knf; + } catch (IllegalStateException ise) { + result.ise = ise; } return result; } @@ -6540,6 +6545,10 @@ public void onError(java.lang.Exception e) { result.knf = (KeyNotFoundException) e; result.set_knf_isSet(true); msg = result; + } else if (e instanceof IllegalStateException) { + result.ise = (IllegalStateException) e; + result.set_ise_isSet(true); + msg = result; } else if (e instanceof org.apache.storm.thrift.transport.TTransportException) { _LOGGER.error("TTransportException inside handler", e); fb.close(); @@ -29008,17 +29017,20 @@ public static class deleteBlob_result implements org.apache.storm.thrift.TBase byName = new java.util.HashMap(); @@ -29037,6 +29049,8 @@ public static _Fields findByThriftId(int fieldId) { return AZE; case 2: // KNF return KNF; + case 3: // ISE + return ISE; default: return null; } @@ -29084,6 +29098,8 @@ public java.lang.String getFieldName() { new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, AuthorizationException.class))); tmpMap.put(_Fields.KNF, new org.apache.storm.thrift.meta_data.FieldMetaData("knf", org.apache.storm.thrift.TFieldRequirementType.DEFAULT, new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, KeyNotFoundException.class))); + tmpMap.put(_Fields.ISE, new org.apache.storm.thrift.meta_data.FieldMetaData("ise", org.apache.storm.thrift.TFieldRequirementType.DEFAULT, + new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, IllegalStateException.class))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.storm.thrift.meta_data.FieldMetaData.addStructMetaDataMap(deleteBlob_result.class, metaDataMap); } @@ -29093,11 +29109,13 @@ public deleteBlob_result() { public deleteBlob_result( AuthorizationException aze, - KeyNotFoundException knf) + KeyNotFoundException knf, + IllegalStateException ise) { this(); this.aze = aze; this.knf = knf; + this.ise = ise; } /** @@ -29110,6 +29128,9 @@ public deleteBlob_result(deleteBlob_result other) { if (other.is_set_knf()) { this.knf = new KeyNotFoundException(other.knf); } + if (other.is_set_ise()) { + this.ise = new IllegalStateException(other.ise); + } } public deleteBlob_result deepCopy() { @@ -29120,6 +29141,7 @@ public deleteBlob_result deepCopy() { public void clear() { this.aze = null; this.knf = null; + this.ise = null; } public AuthorizationException get_aze() { @@ -29168,6 +29190,29 @@ public void set_knf_isSet(boolean value) { } } + public IllegalStateException get_ise() { + return this.ise; + } + + public void set_ise(IllegalStateException ise) { + this.ise = ise; + } + + public void unset_ise() { + this.ise = null; + } + + /** Returns true if field ise is set (has been assigned a value) and false otherwise */ + public boolean is_set_ise() { + return this.ise != null; + } + + public void set_ise_isSet(boolean value) { + if (!value) { + this.ise = null; + } + } + public void setFieldValue(_Fields field, java.lang.Object value) { switch (field) { case AZE: @@ -29186,6 +29231,14 @@ public void setFieldValue(_Fields field, java.lang.Object value) { } break; + case ISE: + if (value == null) { + unset_ise(); + } else { + set_ise((IllegalStateException)value); + } + break; + } } @@ -29197,6 +29250,9 @@ public java.lang.Object getFieldValue(_Fields field) { case KNF: return get_knf(); + case ISE: + return get_ise(); + } throw new java.lang.IllegalStateException(); } @@ -29212,6 +29268,8 @@ public boolean isSet(_Fields field) { return is_set_aze(); case KNF: return is_set_knf(); + case ISE: + return is_set_ise(); } throw new java.lang.IllegalStateException(); } @@ -29249,6 +29307,15 @@ public boolean equals(deleteBlob_result that) { return false; } + boolean this_present_ise = true && this.is_set_ise(); + boolean that_present_ise = true && that.is_set_ise(); + if (this_present_ise || that_present_ise) { + if (!(this_present_ise && that_present_ise)) + return false; + if (!this.ise.equals(that.ise)) + return false; + } + return true; } @@ -29264,6 +29331,10 @@ public int hashCode() { if (is_set_knf()) hashCode = hashCode * 8191 + knf.hashCode(); + hashCode = hashCode * 8191 + ((is_set_ise()) ? 131071 : 524287); + if (is_set_ise()) + hashCode = hashCode * 8191 + ise.hashCode(); + return hashCode; } @@ -29295,6 +29366,16 @@ public int compareTo(deleteBlob_result other) { return lastComparison; } } + lastComparison = java.lang.Boolean.valueOf(is_set_ise()).compareTo(other.is_set_ise()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_ise()) { + lastComparison = org.apache.storm.thrift.TBaseHelper.compareTo(this.ise, other.ise); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -29330,6 +29411,14 @@ public java.lang.String toString() { sb.append(this.knf); } first = false; + if (!first) sb.append(", "); + sb.append("ise:"); + if (this.ise == null) { + sb.append("null"); + } else { + sb.append(this.ise); + } + first = false; sb.append(")"); return sb.toString(); } @@ -29391,6 +29480,15 @@ public void read(org.apache.storm.thrift.protocol.TProtocol iprot, deleteBlob_re org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 3: // ISE + if (schemeField.type == org.apache.storm.thrift.protocol.TType.STRUCT) { + struct.ise = new IllegalStateException(); + struct.ise.read(iprot); + struct.set_ise_isSet(true); + } else { + org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -29414,6 +29512,11 @@ public void write(org.apache.storm.thrift.protocol.TProtocol oprot, deleteBlob_r struct.knf.write(oprot); oprot.writeFieldEnd(); } + if (struct.ise != null) { + oprot.writeFieldBegin(ISE_FIELD_DESC); + struct.ise.write(oprot); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -29438,19 +29541,25 @@ public void write(org.apache.storm.thrift.protocol.TProtocol prot, deleteBlob_re if (struct.is_set_knf()) { optionals.set(1); } - oprot.writeBitSet(optionals, 2); + if (struct.is_set_ise()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); if (struct.is_set_aze()) { struct.aze.write(oprot); } if (struct.is_set_knf()) { struct.knf.write(oprot); } + if (struct.is_set_ise()) { + struct.ise.write(oprot); + } } @Override public void read(org.apache.storm.thrift.protocol.TProtocol prot, deleteBlob_result struct) throws org.apache.storm.thrift.TException { org.apache.storm.thrift.protocol.TTupleProtocol iprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(2); + java.util.BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { struct.aze = new AuthorizationException(); struct.aze.read(iprot); @@ -29461,6 +29570,11 @@ public void read(org.apache.storm.thrift.protocol.TProtocol prot, deleteBlob_res struct.knf.read(iprot); struct.set_knf_isSet(true); } + if (incoming.get(2)) { + struct.ise = new IllegalStateException(); + struct.ise.read(iprot); + struct.set_ise_isSet(true); + } } } diff --git a/storm-client/src/jvm/org/apache/storm/utils/WrappedIllegalStateException.java b/storm-client/src/jvm/org/apache/storm/utils/WrappedIllegalStateException.java new file mode 100644 index 00000000000..3df9c0a9d57 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/utils/WrappedIllegalStateException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.utils; + +import org.apache.storm.generated.IllegalStateException; + +public class WrappedIllegalStateException extends IllegalStateException { + public WrappedIllegalStateException(String msg) { + super(msg); + } + + @Override + public String getMessage() { + return this.get_msg(); + } +} diff --git a/storm-client/src/py/storm/Nimbus.py b/storm-client/src/py/storm/Nimbus.py index 182dbe2b7f7..a4e64848e7c 100644 --- a/storm-client/src/py/storm/Nimbus.py +++ b/storm-client/src/py/storm/Nimbus.py @@ -1235,6 +1235,8 @@ def recv_deleteBlob(self): raise result.aze if result.knf is not None: raise result.knf + if result.ise is not None: + raise result.ise return def listBlobs(self, session): @@ -2879,6 +2881,9 @@ def process_deleteBlob(self, seqid, iprot, oprot): except KeyNotFoundException as knf: msg_type = TMessageType.REPLY result.knf = knf + except IllegalStateException as ise: + msg_type = TMessageType.REPLY + result.ise = ise except TApplicationException as ex: logging.exception('TApplication exception in handler') msg_type = TMessageType.EXCEPTION @@ -6928,12 +6933,14 @@ class deleteBlob_result(object): Attributes: - aze - knf + - ise """ - def __init__(self, aze=None, knf=None,): + def __init__(self, aze=None, knf=None, ise=None,): self.aze = aze self.knf = knf + self.ise = ise def read(self, iprot): if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: @@ -6956,6 +6963,12 @@ def read(self, iprot): self.knf.read(iprot) else: iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRUCT: + self.ise = IllegalStateException() + self.ise.read(iprot) + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -6974,6 +6987,10 @@ def write(self, oprot): oprot.writeFieldBegin('knf', TType.STRUCT, 2) self.knf.write(oprot) oprot.writeFieldEnd() + if self.ise is not None: + oprot.writeFieldBegin('ise', TType.STRUCT, 3) + self.ise.write(oprot) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -6995,6 +7012,7 @@ def __ne__(self, other): None, # 0 (1, TType.STRUCT, 'aze', [AuthorizationException, None], None, ), # 1 (2, TType.STRUCT, 'knf', [KeyNotFoundException, None], None, ), # 2 + (3, TType.STRUCT, 'ise', [IllegalStateException, None], None, ), # 3 ) diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py index 48d28866266..0d7ea12ce82 100644 --- a/storm-client/src/py/storm/ttypes.py +++ b/storm-client/src/py/storm/ttypes.py @@ -1921,6 +1921,67 @@ def __ne__(self, other): return not (self == other) +class IllegalStateException(TException): + """ + Attributes: + - msg + """ + + + def __init__(self, msg=None,): + self.msg = msg + + def read(self, iprot): + if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: + iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec]) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.msg = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot._fast_encode is not None and self.thrift_spec is not None: + oprot.trans.write(oprot._fast_encode(self, [self.__class__, self.thrift_spec])) + return + oprot.writeStructBegin('IllegalStateException') + if self.msg is not None: + oprot.writeFieldBegin('msg', TType.STRING, 1) + oprot.writeString(self.msg.encode('utf-8') if sys.version_info[0] == 2 else self.msg) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.msg is None: + raise TProtocolException(message='Required field msg is unset!') + return + + def __str__(self): + return repr(self) + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.items()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + + class KeyAlreadyExistsException(TException): """ Attributes: @@ -10201,6 +10262,11 @@ def __ne__(self, other): None, # 0 (1, TType.STRING, 'msg', 'UTF8', None, ), # 1 ) +all_structs.append(IllegalStateException) +IllegalStateException.thrift_spec = ( + None, # 0 + (1, TType.STRING, 'msg', 'UTF8', None, ), # 1 +) all_structs.append(KeyAlreadyExistsException) KeyAlreadyExistsException.thrift_spec = ( None, # 0 diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift index fefc6db0d41..724c5b43b02 100644 --- a/storm-client/src/storm.thrift +++ b/storm-client/src/storm.thrift @@ -153,6 +153,10 @@ exception KeyNotFoundException { 1: required string msg; } +exception IllegalStateException { + 1: required string msg; +} + exception KeyAlreadyExistsException { 1: required string msg; } @@ -744,7 +748,7 @@ service Nimbus { void setBlobMeta(1: string key, 2: SettableBlobMeta meta) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf); BeginDownloadResult beginBlobDownload(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf); binary downloadBlobChunk(1: string session) throws (1: AuthorizationException aze); - void deleteBlob(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf); + void deleteBlob(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf, 3: IllegalStateException ise); ListBlobsResult listBlobs(1: string session); //empty string "" means start at the beginning i32 getBlobReplication(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf); i32 updateBlobReplication(1: string key, 2: i32 replication) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf); diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index e19d0ef474a..3c167450961 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -87,6 +87,7 @@ import org.apache.storm.generated.ExecutorStats; import org.apache.storm.generated.ExecutorSummary; import org.apache.storm.generated.GetInfoOptions; +import org.apache.storm.generated.IllegalStateException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.generated.KeyAlreadyExistsException; import org.apache.storm.generated.KeyNotFoundException; @@ -202,6 +203,7 @@ import org.apache.storm.utils.VersionInfo; import org.apache.storm.utils.WrappedAlreadyAliveException; import org.apache.storm.utils.WrappedAuthorizationException; +import org.apache.storm.utils.WrappedIllegalStateException; import org.apache.storm.utils.WrappedInvalidTopologyException; import org.apache.storm.utils.WrappedNotAliveException; import org.apache.storm.validation.ConfigValidation; @@ -820,10 +822,18 @@ private static Map basicSupervisorDetailsMap(IStormCl return ret; } + /** + * NOTE: this can return false when a topology has just been activated. The topology may still be + * in the STORMS_SUBTREE. + */ private static boolean isTopologyActive(IStormClusterState state, String topoName) { return state.getTopoId(topoName).isPresent(); } + private static boolean isTopologyActiveOrActivating(IStormClusterState state, String topoName) { + return isTopologyActive(state, topoName) || state.activeStorms().contains(topoName); + } + private static Map tryReadTopoConf(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException, IOException { try { @@ -2672,7 +2682,12 @@ private ClusterSummary getClusterInfoImpl() throws Exception { summary.set_assigned_memoffheap(resources.getAssignedMemOffHeap()); summary.set_assigned_cpu(resources.getAssignedCpu()); } - summary.set_replication_count(getBlobReplicationCount(ConfigUtils.masterStormCodeKey(topoId))); + try { + summary.set_replication_count(getBlobReplicationCount(ConfigUtils.masterStormCodeKey(topoId))); + } catch (KeyNotFoundException e) { + // This could fail if a blob gets deleted by mistake. Don't crash nimbus. + LOG.error("Unable to find blob entry", e); + } topologySummaries.add(summary); } @@ -3555,8 +3570,16 @@ public ByteBuffer downloadBlobChunk(String session) throws AuthorizationExceptio } @Override - public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException, TException { + public void deleteBlob(String key) throws AuthorizationException, KeyNotFoundException, IllegalStateException, TException { try { + String topoName = ConfigUtils.getIdFromBlobKey(key); + if (topoName != null) { + if (isTopologyActiveOrActivating(stormClusterState, topoName)) { + String message = "Attempting to delete blob " + key + " from under active topology " + topoName; + LOG.warn(message); + throw new WrappedIllegalStateException(message); + } + } blobStore.deleteBlob(key, getSubject()); LOG.info("Deleted blob for key {}", key); } catch (Exception e) {