Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
adding async events
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn Feldman committed Jul 13, 2015
1 parent a63c817 commit 09873ce
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 11 deletions.
Expand Up @@ -29,7 +29,7 @@
import com.google.common.base.Preconditions;
import org.apache.usergrid.corepersistence.CpEntityManager;
import org.apache.usergrid.corepersistence.asyncevents.model.*;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.index.*;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
Expand All @@ -38,9 +38,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.index.IndexService;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
Expand Down Expand Up @@ -255,7 +252,8 @@ private void handleMessages(final List<QueueMessage> messages) {

@Override
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
offer(new InitializeApplicationIndexEvent(applicationScope));
IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
}


Expand Down Expand Up @@ -379,8 +377,7 @@ public void handleInitializeApplicationIndex(final QueueMessage message) {
Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex");
Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType()));

final ApplicationScope applicationScope = event.getApplicationScope();
final IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
final IndexLocationStrategy indexLocationStrategy = event.getIndexLocationStrategy();
final EntityIndex index = entityIndexFactory.createEntityIndex(indexLocationStrategy);
index.initialize();
ack(message);
Expand Down
Expand Up @@ -22,6 +22,7 @@

import org.apache.usergrid.corepersistence.index.ReIndexAction;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.model.entity.Entity;
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.model.entity.Id;

import java.io.Serializable;
Expand All @@ -36,6 +37,9 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class AsyncEvent implements Serializable {

@JsonProperty
protected IndexLocationStrategy indexLocationStrategy;

@JsonProperty
protected EventType eventType;

Expand Down Expand Up @@ -75,9 +79,9 @@ public AsyncEvent(final EventType eventType,
this.creationTime = System.currentTimeMillis();
}

public AsyncEvent(EventType eventType, ApplicationScope applicationScope) {
public AsyncEvent(EventType eventType, IndexLocationStrategy indexLocationStrategy) {
this.eventType = eventType;
this.applicationScope = applicationScope;
this.indexLocationStrategy = indexLocationStrategy;
this.creationTime = System.currentTimeMillis();
}

Expand Down Expand Up @@ -132,6 +136,13 @@ protected void setApplicationScope(ApplicationScope applicationScope) {
this.applicationScope = applicationScope;
}

@JsonSerialize
public IndexLocationStrategy getIndexLocationStrategy() { return indexLocationStrategy; }

public void setIndexLocationStrategy( IndexLocationStrategy indexLocationStrategy ){
this.indexLocationStrategy = indexLocationStrategy;
}

@JsonSerialize()
public Edge getEdge() {
return edge;
Expand Down
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;

/**
* event to init app index
Expand All @@ -30,8 +31,8 @@ public class InitializeApplicationIndexEvent extends AsyncEvent {
public InitializeApplicationIndexEvent() {
super(EventType.APPLICATION_INDEX);
}
public InitializeApplicationIndexEvent(final ApplicationScope applicationScope) {
super(EventType.APPLICATION_INDEX, applicationScope);
public InitializeApplicationIndexEvent(final IndexLocationStrategy indexLocationStrategy) {
super(EventType.APPLICATION_INDEX, indexLocationStrategy);

}
}
@@ -0,0 +1,103 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. The ASF licenses this file to You
* * under the Apache License, Version 2.0 (the "License"); you may not
* * use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License. For additional information regarding
* * copyright in this work, please see the NOTICE file in the top level
* * directory of this distribution.
*
*/
package org.apache.usergrid.corepersistence.index;

import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.IndexAlias;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;

/**
* Classy class class.
*/
public class ReplicatedIndexLocationStrategy implements IndexLocationStrategy {

private ReplicatedIndexAlias alias;
private String rootName;
private String indexInitialName;
private ApplicationScope applicationScope;
private int numberShards;
private int numberReplicas;

public ReplicatedIndexLocationStrategy(){

}

public ReplicatedIndexLocationStrategy(IndexLocationStrategy indexLocationStrategy){
alias = new ReplicatedIndexAlias( indexLocationStrategy.getAlias() );
rootName = indexLocationStrategy.getIndexRootName();
indexInitialName = indexLocationStrategy.getIndexInitialName();
applicationScope = indexLocationStrategy.getApplicationScope();
numberShards = indexLocationStrategy.getNumberOfShards();
numberReplicas = indexLocationStrategy.getNumberOfReplicas();
}

@Override
public IndexAlias getAlias() {
return alias;
}

@Override
public String getIndexRootName() {
return rootName;
}

@Override
public String getIndexInitialName() {
return indexInitialName;
}

@Override
public ApplicationScope getApplicationScope() {
return applicationScope;
}

@Override
public int getNumberOfShards() {
return numberShards;
}

@Override
public int getNumberOfReplicas() {
return numberReplicas;
}

public static class ReplicatedIndexAlias implements IndexAlias{

private String readAlias;
private String writeAlias;

public ReplicatedIndexAlias(){

}
public ReplicatedIndexAlias(IndexAlias alias){
this.readAlias = alias.getReadAlias();
this.writeAlias = alias.getWriteAlias();
}
@Override
public String getReadAlias() {
return readAlias;
}

@Override
public String getWriteAlias() {
return writeAlias;
}
}
}

0 comments on commit 09873ce

Please sign in to comment.