Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn Feldman committed Mar 26, 2015
1 parent a7e08db commit b98cdce
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 40 deletions.
Expand Up @@ -22,7 +22,7 @@
/** /**
* Classy class class. * Classy class class.
*/ */
public interface EsIndexCache { public interface IndexCache {
String[] getIndexes(IndexAlias alias, AliasedEntityIndex.AliasType aliasType); String[] getIndexes(IndexAlias alias, AliasedEntityIndex.AliasType aliasType);


void invalidate(IndexAlias alias); void invalidate(IndexAlias alias);
Expand Down
Expand Up @@ -51,7 +51,7 @@ protected void configure() {
bind(EntityIndexFactory.class).to(EsEntityIndexFactoryImpl.class); bind(EntityIndexFactory.class).to(EsEntityIndexFactoryImpl.class);
bind(AliasedEntityIndex.class).to(EsEntityIndexImpl.class); bind(AliasedEntityIndex.class).to(EsEntityIndexImpl.class);
bind(EntityIndex.class).to(EsEntityIndexImpl.class); bind(EntityIndex.class).to(EsEntityIndexImpl.class);
bind(EsIndexCache.class).to(EsIndexCacheImpl.class); bind(IndexCache.class).to(EsIndexCacheImpl.class);


bind(IndexIdentifier.class).to(IndexIdentifierImpl.class); bind(IndexIdentifier.class).to(IndexIdentifierImpl.class);


Expand Down
Expand Up @@ -37,7 +37,6 @@
import org.apache.usergrid.persistence.map.impl.MapScopeImpl; import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.ShardOperationFailedException;
Expand Down Expand Up @@ -73,7 +72,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{
private final MapManager mapManager; private final MapManager mapManager;
private final AliasedEntityIndex entityIndex; private final AliasedEntityIndex entityIndex;
private final IndexBufferProducer indexBatchBufferProducer; private final IndexBufferProducer indexBatchBufferProducer;
private final EsIndexCache indexCache; private final IndexCache indexCache;
private final IndexFig indexFig; private final IndexFig indexFig;
private final EsProvider esProvider; private final EsProvider esProvider;
private final IndexAlias alias; private final IndexAlias alias;
Expand All @@ -85,7 +84,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{
@Inject @Inject
public EsApplicationEntityIndexImpl(@Assisted ApplicationScope appScope, final AliasedEntityIndex entityIndex, final IndexFig config, public EsApplicationEntityIndexImpl(@Assisted ApplicationScope appScope, final AliasedEntityIndex entityIndex, final IndexFig config,
final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider, final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider,
final EsIndexCache indexCache, final MetricsFactory metricsFactory, final IndexCache indexCache, final MetricsFactory metricsFactory,
final MapManagerFactory mapManagerFactory, final IndexFig indexFig, final IndexIdentifier indexIdentifier){ final MapManagerFactory mapManagerFactory, final IndexFig indexFig, final IndexIdentifier indexIdentifier){
this.entityIndex = entityIndex; this.entityIndex = entityIndex;
this.indexBatchBufferProducer = indexBatchBufferProducer; this.indexBatchBufferProducer = indexBatchBufferProducer;
Expand Down
Expand Up @@ -39,7 +39,7 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{


private final IndexFig config; private final IndexFig config;
private final EsProvider provider; private final EsProvider provider;
private final EsIndexCache indexCache; private final IndexCache indexCache;
private final IndexBufferProducer indexBatchBufferProducer; private final IndexBufferProducer indexBatchBufferProducer;
private final MetricsFactory metricsFactory; private final MetricsFactory metricsFactory;
private final MapManagerFactory mapManagerFactory; private final MapManagerFactory mapManagerFactory;
Expand All @@ -57,7 +57,7 @@ public ApplicationEntityIndex load( ApplicationScope scope ) {
} ); } );


@Inject @Inject
public EsEntityIndexFactoryImpl( final IndexFig config, final EsProvider provider, final EsIndexCache indexCache, public EsEntityIndexFactoryImpl( final IndexFig config, final EsProvider provider, final IndexCache indexCache,
final IndexBufferProducer indexBatchBufferProducer, final IndexBufferProducer indexBatchBufferProducer,
final MetricsFactory metricsFactory, final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory, final MapManagerFactory mapManagerFactory,
final IndexFig indexFig, final AliasedEntityIndex entityIndex, final IndexIdentifier indexIdentifier ){ final IndexFig indexFig, final AliasedEntityIndex entityIndex, final IndexIdentifier indexIdentifier ){
Expand Down
Expand Up @@ -24,7 +24,6 @@
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.usergrid.persistence.core.future.BetterFuture; import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.migration.data.VersionedData; import org.apache.usergrid.persistence.core.migration.data.VersionedData;
Expand All @@ -40,13 +39,7 @@
import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;


Expand All @@ -60,8 +53,6 @@
import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexMissingException;


import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.rest.action.admin.indices.alias.delete.AliasesMissingException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -92,7 +83,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {




private final EsProvider esProvider; private final EsProvider esProvider;
private final IndexFig config;


//number of times to wait for the index to refresh properly. //number of times to wait for the index to refresh properly.
private static final int MAX_WAITS = 10; private static final int MAX_WAITS = 10;
Expand All @@ -107,7 +97,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
private static final MatchAllQueryBuilder MATCH_ALL_QUERY_BUILDER = QueryBuilders.matchAllQuery(); private static final MatchAllQueryBuilder MATCH_ALL_QUERY_BUILDER = QueryBuilders.matchAllQuery();
private final IndexIdentifier indexIdentifier; private final IndexIdentifier indexIdentifier;


private EsIndexCache aliasCache; private IndexCache aliasCache;
private Timer mappingTimer; private Timer mappingTimer;
private Timer refreshTimer; private Timer refreshTimer;
private Meter refreshIndexMeter; private Meter refreshIndexMeter;
Expand All @@ -117,16 +107,15 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {




@Inject @Inject
public EsEntityIndexImpl( final IndexFig config, public EsEntityIndexImpl(
final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider, final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider,
final EsIndexCache indexCache, final MetricsFactory metricsFactory, final IndexCache indexCache, final MetricsFactory metricsFactory,
final IndexFig indexFig, final IndexIdentifier indexIdentifier ) { final IndexFig indexFig, final IndexIdentifier indexIdentifier ) {
this.indexBatchBufferProducer = indexBatchBufferProducer; this.indexBatchBufferProducer = indexBatchBufferProducer;
this.indexFig = indexFig; this.indexFig = indexFig;
this.indexIdentifier = indexIdentifier; this.indexIdentifier = indexIdentifier;


this.esProvider = provider; this.esProvider = provider;
this.config = config;
this.alias = indexIdentifier.getAlias(); this.alias = indexIdentifier.getAlias();
this.aliasCache = indexCache; this.aliasCache = indexCache;
this.addTimer = metricsFactory this.addTimer = metricsFactory
Expand Down
Expand Up @@ -50,7 +50,7 @@
* Cache for Es index operations * Cache for Es index operations
*/ */
@Singleton @Singleton
public class EsIndexCacheImpl implements EsIndexCache { public class EsIndexCacheImpl implements IndexCache {


private static final Logger logger = LoggerFactory.getLogger( EsEntityIndexImpl.class ); private static final Logger logger = LoggerFactory.getLogger( EsEntityIndexImpl.class );
private final ListeningScheduledExecutorService refreshExecutors; private final ListeningScheduledExecutorService refreshExecutors;
Expand Down
Expand Up @@ -23,22 +23,16 @@
import org.apache.usergrid.persistence.core.migration.data.VersionedData; import org.apache.usergrid.persistence.core.migration.data.VersionedData;
import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.AliasedEntityIndex; import org.apache.usergrid.persistence.index.AliasedEntityIndex;
import org.apache.usergrid.persistence.index.IndexAlias;
import org.apache.usergrid.persistence.index.IndexFig; import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexIdentifier; import org.apache.usergrid.persistence.index.IndexIdentifier;
import org.apache.usergrid.persistence.index.EsIndexCache; import org.apache.usergrid.persistence.index.IndexCache;
import org.apache.usergrid.persistence.index.impl.EsProvider; import org.apache.usergrid.persistence.index.impl.EsProvider;
import org.apache.usergrid.persistence.index.impl.IndexingUtils;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.AdminClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import rx.Observable; import rx.Observable;


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** /**
* Classy class class. * Classy class class.
*/ */
Expand All @@ -48,12 +42,12 @@ public class EsIndexDataMigrationImpl implements DataMigration<ApplicationScope>
private final EsProvider provider; private final EsProvider provider;
private final IndexFig indexFig; private final IndexFig indexFig;
private final IndexIdentifier indexIdentifier; private final IndexIdentifier indexIdentifier;
private final EsIndexCache indexCache; private final IndexCache indexCache;
private final VersionedData dataVersion; private final VersionedData dataVersion;
private static final Logger log = LoggerFactory.getLogger(EsIndexDataMigrationImpl.class); private static final Logger log = LoggerFactory.getLogger(EsIndexDataMigrationImpl.class);


@Inject @Inject
public EsIndexDataMigrationImpl(AliasedEntityIndex entityIndex, EsProvider provider, IndexFig indexFig, IndexIdentifier indexIdentifier, EsIndexCache indexCache){ public EsIndexDataMigrationImpl(AliasedEntityIndex entityIndex, EsProvider provider, IndexFig indexFig, IndexIdentifier indexIdentifier, IndexCache indexCache){
this.entityIndex = entityIndex; this.entityIndex = entityIndex;
this.provider = provider; this.provider = provider;
this.indexFig = indexFig; this.indexFig = indexFig;
Expand All @@ -65,7 +59,9 @@ public EsIndexDataMigrationImpl(AliasedEntityIndex entityIndex, EsProvider provi
@Override @Override
public int migrate(int currentVersion, MigrationDataProvider<ApplicationScope> migrationDataProvider, ProgressObserver observer) { public int migrate(int currentVersion, MigrationDataProvider<ApplicationScope> migrationDataProvider, ProgressObserver observer) {
final AdminClient adminClient = provider.getClient().admin(); final AdminClient adminClient = provider.getClient().admin();
final int latestVersion = dataVersion.getImplementationVersion();


observer.start();
migrationDataProvider.getData().flatMap(applicationScope -> { migrationDataProvider.getData().flatMap(applicationScope -> {
LegacyIndexIdentifier legacyIndexIdentifier = new LegacyIndexIdentifier(indexFig, applicationScope); LegacyIndexIdentifier legacyIndexIdentifier = new LegacyIndexIdentifier(indexFig, applicationScope);
String[] indexes = indexCache.getIndexes(legacyIndexIdentifier.getAlias(), AliasedEntityIndex.AliasType.Read); String[] indexes = indexCache.getIndexes(legacyIndexIdentifier.getAlias(), AliasedEntityIndex.AliasType.Read);
Expand All @@ -76,11 +72,16 @@ public int migrate(int currentVersion, MigrationDataProvider<ApplicationScope> m
aliasesRequestBuilder = adminClient.indices().prepareAliases(); aliasesRequestBuilder = adminClient.indices().prepareAliases();
// add read alias // add read alias
aliasesRequestBuilder.addAlias(index, indexIdentifier.getAlias().getReadAlias()); aliasesRequestBuilder.addAlias(index, indexIdentifier.getAlias().getReadAlias());
observer.update(latestVersion,"EsIndexDataMigrationImpl: fixed index: " + index );
})
.doOnError(error -> {
log.error("failed to migrate index", error);
observer.failed(latestVersion,"EsIndexDataMigrationImpl: failed to migrate",error);
}) })
.doOnError(error -> log.error("failed to migrate index", error)) .doOnCompleted(() -> observer.complete())
.toBlocking().lastOrDefault(null); .toBlocking().lastOrDefault(null);


return dataVersion.getImplementationVersion(); return latestVersion;
} }


@Override @Override
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.apache.usergrid.persistence.index.guice; package org.apache.usergrid.persistence.index.guice;




import com.amazonaws.services.opsworks.model.App;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.TypeLiteral; import com.google.inject.TypeLiteral;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
Expand All @@ -29,11 +28,9 @@
import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.IndexBufferProducer; import org.apache.usergrid.persistence.index.IndexBufferProducer;
import org.apache.usergrid.persistence.index.IndexFig; import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexIdentifier;
import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl; import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
import org.apache.usergrid.persistence.index.EsIndexCache; import org.apache.usergrid.persistence.index.IndexCache;
import org.apache.usergrid.persistence.index.impl.EsProvider; import org.apache.usergrid.persistence.index.impl.EsProvider;
import org.apache.usergrid.persistence.index.migration.EsIndexDataMigrationImpl;
import org.apache.usergrid.persistence.index.migration.LegacyIndexIdentifier; import org.apache.usergrid.persistence.index.migration.LegacyIndexIdentifier;
import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.safehaus.guicyfig.GuicyFigModule; import org.safehaus.guicyfig.GuicyFigModule;
Expand Down Expand Up @@ -63,17 +60,18 @@ public void configureMigrationProvider(){
}); });
install( new GuicyFigModule(IndexTestFig.class) ); install( new GuicyFigModule(IndexTestFig.class) );
} }

public static class TestAllApplicationsObservable implements MigrationDataProvider<ApplicationScope>{ public static class TestAllApplicationsObservable implements MigrationDataProvider<ApplicationScope>{


final ApplicationScope appScope = new ApplicationScopeImpl(new SimpleId(UUID.randomUUID(),"application")); final ApplicationScope appScope = new ApplicationScopeImpl(new SimpleId(UUID.randomUUID(),"application"));


@Inject @Inject
public TestAllApplicationsObservable(final IndexFig config, public TestAllApplicationsObservable(
final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider, final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider,
final EsIndexCache indexCache, final MetricsFactory metricsFactory, final IndexCache indexCache, final MetricsFactory metricsFactory,
final IndexFig indexFig){ final IndexFig indexFig){
LegacyIndexIdentifier legacyIndexIdentifier = new LegacyIndexIdentifier(indexFig,appScope); LegacyIndexIdentifier legacyIndexIdentifier = new LegacyIndexIdentifier(indexFig,appScope);
EntityIndex entityIndex = new EsEntityIndexImpl(config,indexBatchBufferProducer,provider,indexCache,metricsFactory,indexFig,legacyIndexIdentifier); EntityIndex entityIndex = new EsEntityIndexImpl(indexBatchBufferProducer,provider,indexCache,metricsFactory,indexFig,legacyIndexIdentifier);
entityIndex.addIndex(null, 1, 0, indexFig.getWriteConsistencyLevel()); entityIndex.addIndex(null, 1, 0, indexFig.getWriteConsistencyLevel());
} }


Expand Down
@@ -0,0 +1,112 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. The ASF licenses this file to You
* * under the Apache License, Version 2.0 (the "License"); you may not
* * use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License. For additional information regarding
* * copyright in this work, please see the NOTICE file in the top level
* * directory of this distribution.
*
*/
package org.apache.usergrid.persistence.index.impl;

import com.google.inject.Inject;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
import org.apache.usergrid.persistence.core.migration.data.ProgressObserver;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.core.test.UseModules;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.guice.TestIndexModule;
import org.apache.usergrid.persistence.index.migration.EsIndexDataMigrationImpl;
import org.apache.usergrid.persistence.index.migration.IndexDataVersions;
import org.apache.usergrid.persistence.index.migration.LegacyIndexIdentifier;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.UUID;

import static junit.framework.Assert.fail;
import static junit.framework.TestCase.assertEquals;

/**
* Classy class class.
*/
@RunWith(EsRunner.class)
@UseModules({ TestIndexModule.class })
public class IndexMigrationTest extends BaseIT{


@Inject
public IndexFig fig;
@Inject
public IndexIdentifier indexIdentifier;

@Inject
public IndexCache indexCache;

@Inject
public EsProvider provider;

@Inject
public MigrationDataProvider<ApplicationScope> applicationScopeMigrationDataProvider;

@Inject
public AliasedEntityIndex ei;
@Inject
public IndexBufferProducer indexBatchBufferProducer;
@Inject
public MetricsFactory metricsFactory;

@Test
public void TestMigrate(){
EsIndexDataMigrationImpl indexDataMigration = new EsIndexDataMigrationImpl(ei,provider, fig, indexIdentifier,indexCache);
ProgressObserver po = new ProgressObserver() {
@Override
public void start() {

}

@Override
public void complete() {

}

@Override
public void failed(int migrationVersion, String reason) {
fail(reason);
}

@Override
public void failed(int migrationVersion, String reason, Throwable throwable) {
fail(reason);
}

@Override
public void update(int migrationVersion, String message) {

}
};
final ApplicationScope appScope = new ApplicationScopeImpl(new SimpleId(UUID.randomUUID(),"application"));

LegacyIndexIdentifier legacyIndexIdentifier = new LegacyIndexIdentifier(fig,appScope);

TestIndexModule.TestAllApplicationsObservable obs = new TestIndexModule.TestAllApplicationsObservable(indexBatchBufferProducer,provider,indexCache,metricsFactory,fig);
int version = indexDataMigration.migrate(0, applicationScopeMigrationDataProvider, po );
assertEquals(version, IndexDataVersions.SINGLE_INDEX.getVersion());
}
}



0 comments on commit b98cdce

Please sign in to comment.