-
Notifications
You must be signed in to change notification settings - Fork 566
Make ExchangeService more or less thread safe #428
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,7 @@ | |
| import java.net.URI; | ||
| import java.net.URISyntaxException; | ||
| import java.security.GeneralSecurityException; | ||
| import java.security.SecureRandom; | ||
| import java.text.DateFormat; | ||
| import java.text.SimpleDateFormat; | ||
| import java.util.Date; | ||
|
|
@@ -40,6 +41,7 @@ | |
| import java.util.Map; | ||
| import java.util.Random; | ||
| import java.util.TimeZone; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
|
|
||
| import javax.xml.stream.XMLStreamException; | ||
| import javax.xml.stream.XMLStreamWriter; | ||
|
|
@@ -92,7 +94,7 @@ public abstract class ExchangeServiceBase implements Closeable { | |
| /** | ||
| * The binary secret. | ||
| */ | ||
| private static byte[] binarySecret; | ||
| private static byte[] binarySecret = generateSessionKey(); | ||
|
|
||
| /** | ||
| * The timeout. | ||
|
|
@@ -132,24 +134,26 @@ public abstract class ExchangeServiceBase implements Closeable { | |
| /** | ||
| * The requested server version. | ||
| */ | ||
| private ExchangeVersion requestedServerVersion = ExchangeVersion.Exchange2010_SP2; | ||
| private final ExchangeVersion requestedServerVersion; | ||
|
|
||
| /** | ||
| * The server info. | ||
| */ | ||
| private ExchangeServerInfo serverInfo; | ||
|
|
||
| private Map<String, String> httpHeaders = new HashMap<String, String>(); | ||
|
|
||
| private Map<String, String> httpResponseHeaders = new HashMap<String, String>(); | ||
| private Map<String, String> httpHeaders = new ConcurrentHashMap<String, String>(); | ||
|
|
||
| private WebProxy webProxy; | ||
|
|
||
| protected CloseableHttpClient httpClient; | ||
| private volatile CloseableHttpClient httpClient; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really need volatile here? Doesn't the use of synchronized between the two if checks does the trick of creating a barrier that makes the compiler assume that non local variables may be changed by another thread?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, volatile is absolutely required here in order to ensure coherency between multiple threads. Please see https://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java for a full explanation. |
||
|
|
||
| private final Object httpClientLock = new Object(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why you didnt use ReentrantLocks?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because synchronized keyword locking is reentrant. If we wanted to avoid use of the synchronized keyword and implement the locking manually then ReentrantLock, as you said, would be the way to go... But I don't see the need for that complexity (the keyword approach is more readable). |
||
|
|
||
| private final CookieStore cookieStore = new BasicCookieStore(); | ||
|
|
||
| protected HttpClientContext httpContext; | ||
| private volatile CloseableHttpClient httpPoolingClient; | ||
|
|
||
| protected CloseableHttpClient httpPoolingClient; | ||
| private final Object httpPoolingClientLock = new Object(); | ||
|
|
||
| private int maximumPoolingConnections = 10; | ||
|
|
||
|
|
@@ -161,7 +165,7 @@ public abstract class ExchangeServiceBase implements Closeable { | |
| /** | ||
| * Default UserAgent. | ||
| */ | ||
| private static String defaultUserAgent = "ExchangeServicesClient/" + EwsUtilities.getBuildVersion(); | ||
| private static final String defaultUserAgent = "ExchangeServicesClient/" + EwsUtilities.getBuildVersion(); | ||
|
|
||
| /** | ||
| * Initializes a new instance. | ||
|
|
@@ -170,13 +174,11 @@ public abstract class ExchangeServiceBase implements Closeable { | |
| * every other constructor. | ||
| */ | ||
| protected ExchangeServiceBase() { | ||
| setUseDefaultCredentials(true); | ||
| initializeHttpClient(); | ||
| initializeHttpContext(); | ||
| this(ExchangeVersion.Exchange2010_SP2); | ||
| } | ||
|
|
||
| protected ExchangeServiceBase(ExchangeVersion requestedServerVersion) { | ||
| this(); | ||
| setUseDefaultCredentials(true); | ||
| this.requestedServerVersion = requestedServerVersion; | ||
| } | ||
|
|
||
|
|
@@ -194,30 +196,6 @@ protected ExchangeServiceBase(ExchangeServiceBase service, ExchangeVersion reque | |
| this.httpHeaders = service.getHttpHeaders(); | ||
| } | ||
|
|
||
| private void initializeHttpClient() { | ||
| Registry<ConnectionSocketFactory> registry = createConnectionSocketFactoryRegistry(); | ||
| HttpClientConnectionManager httpConnectionManager = new BasicHttpClientConnectionManager(registry); | ||
| AuthenticationStrategy authStrategy = new CookieProcessingTargetAuthenticationStrategy(); | ||
|
|
||
| httpClient = HttpClients.custom() | ||
| .setConnectionManager(httpConnectionManager) | ||
| .setTargetAuthenticationStrategy(authStrategy) | ||
| .build(); | ||
| } | ||
|
|
||
| private void initializeHttpPoolingClient() { | ||
| Registry<ConnectionSocketFactory> registry = createConnectionSocketFactoryRegistry(); | ||
| PoolingHttpClientConnectionManager httpConnectionManager = new PoolingHttpClientConnectionManager(registry); | ||
| httpConnectionManager.setMaxTotal(maximumPoolingConnections); | ||
| httpConnectionManager.setDefaultMaxPerRoute(maximumPoolingConnections); | ||
| AuthenticationStrategy authStrategy = new CookieProcessingTargetAuthenticationStrategy(); | ||
|
|
||
| httpPoolingClient = HttpClients.custom() | ||
| .setConnectionManager(httpConnectionManager) | ||
| .setTargetAuthenticationStrategy(authStrategy) | ||
| .build(); | ||
| } | ||
|
|
||
| /** | ||
| * Sets the maximum number of connections for the pooling connection manager which is used for | ||
| * subscriptions. | ||
|
|
@@ -230,6 +208,9 @@ private void initializeHttpPoolingClient() { | |
| public void setMaximumPoolingConnections(int maximumPoolingConnections) { | ||
| if (maximumPoolingConnections < 1) | ||
| throw new IllegalArgumentException("maximumPoolingConnections must be 1 or greater"); | ||
| if (httpPoolingClient != null) { | ||
| throw new IllegalStateException("Cannot change the maximumPoolingConnections setting after a request has been made"); | ||
| } | ||
| this.maximumPoolingConnections = maximumPoolingConnections; | ||
| } | ||
|
|
||
|
|
@@ -252,29 +233,37 @@ protected Registry<ConnectionSocketFactory> createConnectionSocketFactoryRegistr | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * (Re)initializes the HttpContext object. This removes any existing state (mainly cookies). Use an own | ||
| * cookie store, instead of the httpClient's global store, so cookies get reset on reinitialization | ||
| */ | ||
| private void initializeHttpContext() { | ||
| CookieStore cookieStore = new BasicCookieStore(); | ||
| httpContext = HttpClientContext.create(); | ||
| protected HttpClientContext createHttpClientContext() { | ||
| HttpClientContext httpContext = HttpClientContext.create(); | ||
| httpContext.setCookieStore(cookieStore); | ||
| return httpContext; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| try { | ||
| httpClient.close(); | ||
| } catch (IOException e) { | ||
| LOG.debug(e); | ||
| if(httpClient!=null) { | ||
| synchronized(httpClientLock){ | ||
| if(httpClient!=null) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe it will be better to move this code in something like IOUtils.closeQuietly to prevent copy-paste (and cleanup other places)?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, I can do that. But wouldn't it be better to do that in another pull request? I'm happy to do it here, but to me, it seems like this pull request should solve one issue (thread safety) and not expand to other issues (like this kind of refactoring/cleanup). Let me know. |
||
| try { | ||
| httpClient.close(); | ||
| } catch (IOException e) { | ||
| LOG.debug(e); | ||
| } | ||
| httpClient = null; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (httpPoolingClient != null) { | ||
| try { | ||
| httpPoolingClient.close(); | ||
| } catch (IOException e) { | ||
| LOG.debug(e); | ||
| synchronized(httpPoolingClientLock){ | ||
| if (httpPoolingClient != null) { | ||
| try { | ||
| httpPoolingClient.close(); | ||
| } catch (IOException e) { | ||
| LOG.debug(e); | ||
| } | ||
| httpPoolingClient = null; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -322,7 +311,8 @@ protected HttpWebRequest prepareHttpWebRequestForUrl(URI url, boolean acceptGzip | |
| throw new ServiceLocalException(strErr); | ||
| } | ||
|
|
||
| HttpClientWebRequest request = new HttpClientWebRequest(httpClient, httpContext); | ||
| HttpClientWebRequest request = new HttpClientWebRequest(getHttpClient(), createHttpClientContext()); | ||
| request.setProxy(getWebProxy()); | ||
| prepareHttpWebRequestForUrl(url, acceptGzipEncoding, allowAutoRedirect, request); | ||
|
|
||
| return request; | ||
|
|
@@ -352,11 +342,8 @@ protected HttpWebRequest prepareHttpPoolingWebRequestForUrl(URI url, boolean acc | |
| throw new ServiceLocalException(strErr); | ||
| } | ||
|
|
||
| if (httpPoolingClient == null) { | ||
| initializeHttpPoolingClient(); | ||
| } | ||
|
|
||
| HttpClientWebRequest request = new HttpClientWebRequest(httpPoolingClient, httpContext); | ||
| HttpClientWebRequest request = new HttpClientWebRequest(getHttpPoolingClient(), createHttpClientContext()); | ||
| request.setProxy(getWebProxy()); | ||
| prepareHttpWebRequestForUrl(url, acceptGzipEncoding, allowAutoRedirect, request); | ||
|
|
||
| return request; | ||
|
|
@@ -383,8 +370,6 @@ private void prepareHttpWebRequestForUrl(URI url, boolean acceptGzipEncoding, bo | |
| prepareCredentials(request); | ||
|
|
||
| request.prepareConnection(); | ||
|
|
||
| httpResponseHeaders.clear(); | ||
| } | ||
|
|
||
| protected void prepareCredentials(HttpWebRequest request) throws ServiceLocalException, URISyntaxException { | ||
|
|
@@ -644,8 +629,8 @@ public void setCredentials(ExchangeCredentials credentials) { | |
| this.credentials = credentials; | ||
| this.useDefaultCredentials = false; | ||
|
|
||
| // Reset the httpContext, to remove any existing authentication cookies from subsequent request | ||
| initializeHttpContext(); | ||
| // Reset the cookies, to remove any existing authentication cookies from subsequent request | ||
| cookieStore.clear(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -673,8 +658,8 @@ public void setUseDefaultCredentials(boolean value) { | |
| this.credentials = null; | ||
| } | ||
|
|
||
| // Reset the httpContext, to remove any existing authentication cookies from subsequent request | ||
| initializeHttpContext(); | ||
| // Reset the cookies, to remove any existing authentication cookies from subsequent request | ||
| cookieStore.clear(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -820,6 +805,48 @@ public void setWebProxy(WebProxy value) { | |
| public Map<String, String> getHttpHeaders() { | ||
| return this.httpHeaders; | ||
| } | ||
|
|
||
| protected final CloseableHttpClient getHttpClient(){ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How to configure/use custom httpClient or poolingHttpClient (with some additional configuration)? Maybe it will be better to move some part of code in factory methods?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, I can do that. But wouldn't it be better to do that in another pull request? I'm happy to do it here, but to me, it seems like this pull request should solve one issue (thread safety) and not expand to other issues (like this one, which has already been reported separately as #423). Let me know. |
||
| CloseableHttpClient ret = httpClient; | ||
| if(ret == null){ | ||
| synchronized(httpClientLock){ | ||
| ret = httpClient; | ||
| if(ret == null){ | ||
| Registry<ConnectionSocketFactory> registry = createConnectionSocketFactoryRegistry(); | ||
| HttpClientConnectionManager httpConnectionManager = new BasicHttpClientConnectionManager(registry); | ||
| AuthenticationStrategy authStrategy = new CookieProcessingTargetAuthenticationStrategy(); | ||
|
|
||
| httpClient = ret = HttpClients.custom() | ||
| .setConnectionManager(httpConnectionManager) | ||
| .setTargetAuthenticationStrategy(authStrategy) | ||
| .build(); | ||
| } | ||
| } | ||
| } | ||
| return ret; | ||
| } | ||
|
|
||
| protected final CloseableHttpClient getHttpPoolingClient(){ | ||
| CloseableHttpClient ret = httpClient; | ||
| if(ret == null){ | ||
| synchronized(httpPoolingClientLock){ | ||
| ret = httpPoolingClient; | ||
| if(ret == null){ | ||
| Registry<ConnectionSocketFactory> registry = createConnectionSocketFactoryRegistry(); | ||
| PoolingHttpClientConnectionManager poolingHttpConnectionManager = new PoolingHttpClientConnectionManager(registry); | ||
| poolingHttpConnectionManager.setMaxTotal(maximumPoolingConnections); | ||
| poolingHttpConnectionManager.setDefaultMaxPerRoute(maximumPoolingConnections); | ||
| AuthenticationStrategy authStrategy = new CookieProcessingTargetAuthenticationStrategy(); | ||
|
|
||
| httpPoolingClient = ret = HttpClients.custom() | ||
| .setConnectionManager(poolingHttpConnectionManager) | ||
| .setTargetAuthenticationStrategy(authStrategy) | ||
| .build(); | ||
| } | ||
| } | ||
| } | ||
| return ret; | ||
| } | ||
|
|
||
| // Events | ||
|
|
||
|
|
@@ -859,44 +886,21 @@ public void setOnSerializeCustomSoapHeaders(List<ICustomXmlSerialization> onSeri | |
| public void processHttpResponseHeaders(TraceFlags traceType, HttpWebRequest request) | ||
| throws XMLStreamException, IOException, EWSHttpException { | ||
| this.traceHttpResponseHeaders(traceType, request); | ||
| this.saveHttpResponseHeaders(request.getResponseHeaders()); | ||
| } | ||
|
|
||
| /** | ||
| * Save the HTTP response headers. | ||
| * | ||
| * @param headers The response headers | ||
| */ | ||
| private void saveHttpResponseHeaders(Map<String, String> headers) { | ||
| this.httpResponseHeaders.clear(); | ||
|
|
||
| for (String key : headers.keySet()) { | ||
| this.httpResponseHeaders.put(key, headers.get(key)); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Gets a collection of HTTP headers from the last response. | ||
| * @return HTTP response headers | ||
| */ | ||
| public Map<String, String> getHttpResponseHeaders() { | ||
| return this.httpResponseHeaders; | ||
| } | ||
|
|
||
| /** | ||
| * Gets the session key. | ||
| * @return session key | ||
| */ | ||
| public static byte[] getSessionKey() { | ||
| return ExchangeServiceBase.binarySecret; | ||
| } | ||
|
|
||
| private static byte[] generateSessionKey(){ | ||
| // this has to be computed only once. | ||
| synchronized (ExchangeServiceBase.class) { | ||
| if (ExchangeServiceBase.binarySecret == null) { | ||
| Random randomNumberGenerator = new Random(); | ||
| ExchangeServiceBase.binarySecret = new byte[256 / 8]; | ||
| randomNumberGenerator.nextBytes(binarySecret); | ||
| } | ||
|
|
||
| return ExchangeServiceBase.binarySecret; | ||
| } | ||
| Random randomNumberGenerator = new SecureRandom(); | ||
| final byte[] bytes = new byte[256 / 8]; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. while refactoring please avoid magic numbers.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As you noted, I only refactored this magic number containing code; I don't know the significance of the magic numbers (256 and 8). Do you know or do you know how I can find out so I make them into appropriately named constants? I see these lines were added in https://github.com/OfficeDev/ews-java-api/blame/45e3fe6301e5824a1186e9e279200df5d57d73fe/src/main/java/microsoft/exchange/webservices/data/ExchangeServiceBase.java but the commit comment ("Initial Commit") isn't helpful, and I don't know where else to look.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is an existing issue and is unrelated, we should keep it as is. If you desire to fix it, then do it as a separate PR. Let's keep PRs focused on one issue.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, we'll handle the changing of "256" and "8" into constants in another issue/PR. Sounds good! |
||
| randomNumberGenerator.nextBytes(bytes); | ||
| return bytes; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need concurrent hash map for instance variables on ExchangeService? Are we designing with the assumption that:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we do need to use ConcurrentHashMap here. I believe that ExchangeService should represent an Exchange server, and as such, it can be used by multiple threads concurrently.
The API currently allows the user to change the httpHeaders (IMHO, this variable would be better named "httpRequestHeaders" for clarity) at any time. In other words, the httpHeaders could have entries added or removed while another thread is also adding or removing entries while another thread is actually doing requests/responses. For that reason, httpHeaders must be thread safe. In my opinion, the API right now is definitely not ideal - that's why I recommended the API be changed to use an immutable ExchangeService with a builder approach, that way, the request headers would be provided using the builder and thereafter be immutable, eliminating this complexity - but we should handle that in a separate pull request.
If a user wants to connect to multiple Exchange servers, then they'll need one instance of ExchangeService for each server.
A user could create multiple ExchangeService instances for the same Exchange server, but that's not necessary, is less efficient, and more complex to write.
So, directly replying to each point:
A single instance of ExchangeService can be used by a single server: Yes.
Multiple threads can run in parallel each using its own instance of ExchangeService: Yes
Static methods are thread safe: Yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a typo that was critical here:
"A single instance of ExchangeService can be used by a single server." I meant a single instance is used by a single thread. i.e. ExchangeService is not thread safe, but it is OK to have 3 threads using 3 different instances of ExchangeService connecting to same or difference servers in parallel.
It seems that you want to have a single instance of ExchangeService used by multiple threads in parallel. What is the motivation for that. That adds complexity and will have weird cases if the state can depend on the request (e.g. if headers may need to change based on request for example).
If we go with the model that I'm suggesting, then that we don't need this protection against concurrency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way ExchangeService is implemented today it isn't thread safe even when you create new instances of it for each thread - see #371.
If it is thread safe (as this PR makes it), ExchangeService becomes: