Skip to content

Commit

Permalink
Upgrade elasticsearch to 0.90.7
Browse files Browse the repository at this point in the history
Part of solution for sonian#19. New cluster state acknowledgement mechanism is not implemented yet. So, some requests will not be acknowledged when zookeeper state publishing is enabled.
  • Loading branch information
imotov committed Nov 25, 2013
1 parent c411faa commit c79ec14
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 105 deletions.
12 changes: 6 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@
</parent>

<properties>
<elasticsearch.version>0.90.0</elasticsearch.version>
<elasticsearch.version>0.90.7</elasticsearch.version>
<zookeeper.version>3.3.6</zookeeper.version>
</properties>

<repositories>
<repository>
<id>sonatype</id>
<name>elasticsearch sonatype repo</name>
<url>https://oss.sonatype.org/content/repositories/releases</url>
</repository>
<repository>
<id>sonatype</id>
<name>elasticsearch sonatype repo</name>
<url>https://oss.sonatype.org/content/repositories/releases</url>
</repository>
</repositories>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import com.sonian.elasticsearch.action.zookeeper.TransportNodesZooKeeperStatusAction;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;

import java.io.IOException;

Expand All @@ -48,7 +48,7 @@ public RestZooKeeperStatusAction(Settings settings, Client client, RestControlle

@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
String[] nodesIds = RestActions.splitNodes(request.param("nodeId"));
String[] nodesIds = Strings.splitStringByCommaToArray((request.param("nodeId")));
NodesZooKeeperStatusRequest zooKeeperStatusRequest = new NodesZooKeeperStatusRequest(nodesIds);
zooKeeperStatusRequest.zooKeeperTimeout(request.paramAsTime("timeout", TimeValue.timeValueSeconds(10)));
transportNodesZooKeeperStatusAction.execute(zooKeeperStatusRequest, new ActionListener<NodesZooKeeperStatusResponse>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -118,7 +119,7 @@ public void publish(ClusterState state) throws ElasticSearchException, Interrupt
* @throws ElasticSearchException
* @throws InterruptedException
*/
public ClusterState retrieve(final NewClusterStateListener newClusterStateListener) throws ElasticSearchException, InterruptedException {
public ClusterState retrieve(final PublishClusterStateAction.NewClusterStateListener newClusterStateListener) throws ElasticSearchException, InterruptedException {
publishingLock.lock();
try {
if (!lifecycle.started()) {
Expand Down Expand Up @@ -161,7 +162,7 @@ public void onNodeDataChanged(String id) {
throw new ZooKeeperIncompatibleStateVersionException("Expected: " + clusterStateVersion() + ", actual: " + clusterStateVersion);
}

ClusterState.Builder builder = ClusterState.newClusterStateBuilder()
ClusterState.Builder builder = ClusterState.builder()
.version(buf.readLong());
for (ClusterStatePart<?> part : this.parts) {
builder = part.set(builder, buf.readString());
Expand Down Expand Up @@ -200,16 +201,26 @@ private void cleanClusterStateNode() throws ElasticSearchException, InterruptedE
for (String part : parts) {
// Don't delete the part node itself. Other nodes might already have watchers set on this node
if (!"parts".equals(part)) {
zooKeeperClient.deleteNodeRecursively(environment.stateNodePath() + "/" + part);
zooKeeperClient.deleteNodeRecursively(environment.stateNodePath() + "/" + part);
}
}
}

private void updateClusterState(NewClusterStateListener newClusterStateListener) {
private void updateClusterState(PublishClusterStateAction.NewClusterStateListener newClusterStateListener) {
try {
ClusterState clusterState = retrieve(newClusterStateListener);
if (clusterState != null) {
newClusterStateListener.onNewClusterState(clusterState);
newClusterStateListener.onNewClusterState(clusterState, new PublishClusterStateAction.NewClusterStateListener.NewStateProcessed() {
@Override
public void onNewClusterStateProcessed() {
// ignore
}

@Override
public void onNewClusterStateFailed(Throwable t) {
// ignore
}
});
}
} catch (ZooKeeperClientSessionExpiredException ex) {
// Ignore session should be restarted
Expand All @@ -235,10 +246,6 @@ protected String clusterStateVersion() {
}


public interface NewClusterStateListener {
public void onNewClusterState(ClusterState clusterState);
}

// TODO: this logic should be moved to the actual classes that represent parts of Cluster State after zookeeper-
// based discovery is merged to master.
private void initClusterStatePersistence() {
Expand Down
Loading

0 comments on commit c79ec14

Please sign in to comment.