-
Notifications
You must be signed in to change notification settings - Fork 914
/
AMQPFederationTarget.java
165 lines (139 loc) · 6.27 KB
/
AMQPFederationTarget.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConstants;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
/**
* This is the receiving side of an AMQP broker federation that occurs over an
* inbound connection from a remote peer. The federation target only comes into
* existence once a remote peer connects and successfully authenticates against
* a control link validation address. Only one federation target is allowed per
* connection.
*/
public class AMQPFederationTarget extends AMQPFederation {
private final AMQPConnectionContext connection;
private final AMQPFederationConfiguration configuration;
public AMQPFederationTarget(String name, AMQPFederationConfiguration configuration, AMQPSessionContext session, ActiveMQServer server) {
super(name, server);
Objects.requireNonNull(session, "Provided session instance cannot be null");
this.session = session;
this.connection = session.getAMQPConnectionContext();
this.connection.addLinkRemoteCloseListener(getName(), this::handleLinkRemoteClose);
this.configuration = configuration;
}
@Override
public AMQPConnectionContext getConnectionContext() {
return connection;
}
@Override
public AMQPSessionContext getSessionContext() {
return session;
}
@Override
public int getReceiverCredits() {
return configuration.getReceiverCredits();
}
@Override
public int getReceiverCreditsLow() {
return configuration.getReceiverCreditsLow();
}
@Override
public int getLargeMessageThreshold() {
return configuration.getLargeMessageThreshold();
}
@Override
public int getLinkAttachTimeout() {
return configuration.getLinkAttachTimeout();
}
@Override
public boolean isCoreMessageTunnelingEnabled() {
return configuration.isCoreMessageTunnelingEnabled();
}
@Override
public boolean isIgnoreQueueConsumerFilters() {
return configuration.isIgnoreSubscriptionFilters();
}
@Override
public boolean isIgnoreQueueConsumerPriorities() {
return configuration.isIgnoreSubscriptionPriorities();
}
@Override
protected void handleFederationStarted() throws ActiveMQException {
// Tag the session with Federation metadata which will allow local federation policies sent by
// the remote to apply checks when seeing local demand to determine if a federation consumer
// should cause remote receivers to be created.
//
// This currently is a session global tag which means any consumer created from this session in
// response to remote attach of said receiver is going to get caught by the filtering but as of
// now we shouldn't be creating consumers other than federation consumers but if that were to
// change we'd either need single new session for this federation instance or a session per
// consumer at the extreme which then requires that the protocol handling code add the metadata
// during the receiver attach on the remote.
try {
session.getSessionSPI().addMetaData(FederationConstants.FEDERATION_NAME, getName());
} catch (ActiveMQAMQPException e) {
throw e;
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException("Error while configuring interal session metadata");
}
super.handleFederationStarted();
}
private void handleLinkRemoteClose(Link link) {
// If the connection has already closed then we can ignore this event.
final Connection protonConnection = link.getSession().getConnection();
if (protonConnection.getLocalState() != EndpointState.ACTIVE) {
return;
}
// If the link is locally closed then we closed it intentionally and
// we can continue as normal otherwise we need to check on why it closed.
if (link.getLocalState() != EndpointState.ACTIVE) {
return;
}
// Did the federation links handle this so that we can ignore it?
// If not then we consider this a terminal outcome and close the connection.
if (!invokeLinkClosedInterceptors(link)) {
signalError(new ActiveMQAMQPInternalErrorException("Federation link closed unexpectedly: " + link.getName()));
}
}
@Override
protected void signalResourceCreateError(Exception cause) {
signalError(cause);
}
@Override
protected void signalError(Exception cause) {
final Symbol condition;
final String description = cause.getMessage();
if (cause instanceof ActiveMQAMQPException) {
condition = ((ActiveMQAMQPException) cause).getAmqpError();
} else {
condition = AmqpError.INTERNAL_ERROR;
}
connection.close(new ErrorCondition(condition, description));
}
}