Skip to content
Permalink
Browse files
Merge pull request #27 from mikewalch/apache-updates
Apache updates
  • Loading branch information
keith-turner committed Jul 8, 2016
2 parents 2312d0c + a22a7ef commit 5f1ea06d517bc2d57d8d509b7d108cf6c32b6907
Show file tree
Hide file tree
Showing 17 changed files with 102 additions and 92 deletions.
@@ -3,6 +3,9 @@
BIN_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
PC_HOME=$( cd "$( dirname "$BIN_DIR" )" && pwd )

# stop if any command fails
set -e

if [ "$#" -ne 1 ]; then
echo "Usage : $0 <TXT FILES DIR>"
exit
@@ -49,7 +52,7 @@ $PC_HOME/bin/copy-jars.sh $FLUO_HOME $PC_HOME
$FLUO_HOME/bin/fluo exec $APP phrasecount.cmd.Setup $APP_PROPS pcExport >> $APP_PROPS

$FLUO_HOME/bin/fluo init $APP -f
$FLUO_HOME/bin/fluo exec $APP io.fluo.recipes.accumulo.cmds.OptimizeTable
$FLUO_HOME/bin/fluo exec $APP org.apache.fluo.recipes.accumulo.cmds.OptimizeTable
$FLUO_HOME/bin/fluo start $APP
$FLUO_HOME/bin/fluo info $APP

@@ -60,7 +63,7 @@ $FLUO_HOME/bin/fluo exec $APP phrasecount.cmd.Load $APP_PROPS $TXT_DIR
$FLUO_HOME/bin/fluo wait $APP

#print phrase counts
$FLUO_HOME/bin/fluo exec $APP phrasecount.cmd.Print $APP_PROPS pcExport
$FLUO_HOME/bin/fluo exec $APP phrasecount.cmd.Print $APP_PROPS pcExport | tail

$FLUO_HOME/bin/fluo stop $APP

23 pom.xml
@@ -2,7 +2,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.fluo</groupId>
<groupId>org.apache.fluo</groupId>
<artifactId>phrasecount</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
@@ -13,8 +13,8 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<accumulo.version>1.6.1</accumulo.version>
<fluo.version>1.0.0-beta-2</fluo.version>
<fluo-recipes.version>1.0.0-beta-1</fluo-recipes.version>
<fluo.version>1.0.0-beta-3-SNAPSHOT</fluo.version>
<fluo-recipes.version>1.0.0-beta-3-SNAPSHOT</fluo-recipes.version>
</properties>

<build>
@@ -34,7 +34,7 @@
<version>2.10</version>
<configuration>
<!--define the specific dependencies to copy into the Fluo application dir-->
<includeArtifactIds>fluo-recipes-core,fluo-recipes-accumulo,kryo,minlog,reflectasm,objenesis</includeArtifactIds>
<includeArtifactIds>fluo-recipes-core,fluo-recipes-accumulo,fluo-recipes-kryo,kryo,minlog,reflectasm,objenesis</includeArtifactIds>
</configuration>
</plugin>
</plugins>
@@ -53,34 +53,39 @@
<version>1.32</version>
</dependency>
<dependency>
<groupId>io.fluo</groupId>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-api</artifactId>
<version>${fluo.version}</version>
</dependency>
<dependency>
<groupId>io.fluo</groupId>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-core</artifactId>
<version>${fluo.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.fluo</groupId>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-recipes-core</artifactId>
<version>${fluo-recipes.version}</version>
</dependency>
<dependency>
<groupId>io.fluo</groupId>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-recipes-accumulo</artifactId>
<version>${fluo-recipes.version}</version>
</dependency>
<dependency>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-recipes-kryo</artifactId>
<version>${fluo-recipes.version}</version>
</dependency>

<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>${accumulo.version}</version>
</dependency>
<dependency>
<groupId>io.fluo</groupId>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-mini</artifactId>
<version>${fluo.version}</version>
<scope>test</scope>
@@ -1,12 +1,13 @@
package phrasecount;

import io.fluo.api.config.FluoConfiguration;
import io.fluo.api.config.ObserverConfiguration;
import io.fluo.recipes.accumulo.export.AccumuloExporter;
import io.fluo.recipes.accumulo.export.TableInfo;
import io.fluo.recipes.export.ExportQueue;
import io.fluo.recipes.map.CollisionFreeMap;
import io.fluo.recipes.serialization.KryoSimplerSerializer;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverConfiguration;
import org.apache.fluo.recipes.accumulo.export.AccumuloExport;
import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
import org.apache.fluo.recipes.accumulo.export.TableInfo;
import org.apache.fluo.recipes.export.ExportQueue;
import org.apache.fluo.recipes.kryo.KryoSimplerSerializer;
import org.apache.fluo.recipes.map.CollisionFreeMap;
import phrasecount.pojos.Counts;
import phrasecount.pojos.PcKryoFactory;

@@ -59,9 +60,8 @@ public static void configure(FluoConfiguration fluoConfig, Options opts) {
opts.phraseCountMapBuckets));

// setup an export queue to to send phrase count updates to an Accumulo table
ExportQueue.configure(fluoConfig, new ExportQueue.Options(EXPORT_QUEUE_ID, PhraseExporter.class,
String.class, Counts.class, opts.exportQueueBuckets));
AccumuloExporter.setExportTableInfo(fluoConfig.getAppConfiguration(), EXPORT_QUEUE_ID,
ExportQueue.configure(fluoConfig, new ExportQueue.Options(EXPORT_QUEUE_ID, AccumuloExporter.class.getName(), String.class.getName(), AccumuloExport.class.getName(), opts.exportQueueBuckets));
AccumuloExporter.setExportTableInfo(fluoConfig, EXPORT_QUEUE_ID,
new TableInfo(opts.instance, opts.zookeepers, opts.user, opts.password, opts.exportTable));
}
}
@@ -1,8 +1,8 @@
package phrasecount;

import io.fluo.api.data.Column;
import io.fluo.api.types.StringEncoder;
import io.fluo.api.types.TypeLayer;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.recipes.types.StringEncoder;
import org.apache.fluo.recipes.types.TypeLayer;

public class Constants {

@@ -1,8 +1,8 @@
package phrasecount;

import io.fluo.api.client.Loader;
import io.fluo.api.client.TransactionBase;
import io.fluo.api.types.TypedTransactionBase;
import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.recipes.types.TypedTransactionBase;
import phrasecount.pojos.Document;

import static phrasecount.Constants.DOC_CONTENT_COL;
@@ -4,12 +4,12 @@
import java.util.Map;
import java.util.Map.Entry;

import io.fluo.api.client.TransactionBase;
import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
import io.fluo.api.observer.AbstractObserver;
import io.fluo.api.types.TypedTransactionBase;
import io.fluo.recipes.map.CollisionFreeMap;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.fluo.recipes.map.CollisionFreeMap;
import org.apache.fluo.recipes.types.TypedTransactionBase;
import phrasecount.pojos.Counts;
import phrasecount.pojos.Document;

@@ -1,19 +1,28 @@
package phrasecount;

import java.util.Collection;
import java.util.Collections;
import java.util.List;

import io.fluo.recipes.accumulo.export.AccumuloExporter;
import org.apache.accumulo.core.data.Mutation;
import org.apache.fluo.recipes.accumulo.export.AccumuloExport;
import phrasecount.pojos.Counts;
import phrasecount.query.PhraseCountTable;

/**
* Glue code to convert {@link Counts} objects from the export queue to Mutations to write to Accumulo.
*/
public class PhraseExporter extends AccumuloExporter<String, Counts> {
public class PhraseExport implements AccumuloExport<String> {

private Counts pc;

public PhraseExport(){}

public PhraseExport(Counts pc){
this.pc = pc;
}

@Override
protected List<Mutation> convert(String phrase, long seq, Counts pc) {
public Collection<Mutation> toMutations(String phrase, long seq) {
return Collections.singletonList(PhraseCountTable.createMutation(phrase, seq, pc));
}
}
@@ -1,18 +1,18 @@
package phrasecount;

import java.util.Iterator;
import java.util.Optional;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import io.fluo.api.client.TransactionBase;
import io.fluo.api.observer.Observer.Context;
import io.fluo.recipes.export.Export;
import io.fluo.recipes.export.ExportQueue;
import io.fluo.recipes.map.CollisionFreeMap;
import io.fluo.recipes.map.Combiner;
import io.fluo.recipes.map.Update;
import io.fluo.recipes.map.UpdateObserver;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.observer.Observer.Context;
import org.apache.fluo.recipes.export.Export;
import org.apache.fluo.recipes.export.ExportQueue;
import org.apache.fluo.recipes.map.CollisionFreeMap;
import org.apache.fluo.recipes.map.Combiner;
import org.apache.fluo.recipes.map.Update;
import org.apache.fluo.recipes.map.UpdateObserver;
import phrasecount.pojos.Counts;

import static phrasecount.Constants.EXPORT_QUEUE_ID;
@@ -46,7 +46,7 @@ public Optional<Counts> combine(String key, Iterator<Counts> updates) {
*/
public static class PcmUpdateObserver extends UpdateObserver<String, Counts> {

private ExportQueue<String, Counts> pcEq;
private ExportQueue<String, PhraseExport> pcEq;

@Override
public void init(String mapId, Context observerContext) throws Exception {
@@ -55,11 +55,11 @@ public void init(String mapId, Context observerContext) throws Exception {

@Override
public void updatingValues(TransactionBase tx, Iterator<Update<String, Counts>> updates) {
Iterator<Export<String, Counts>> exports = Iterators.transform(updates,
new Function<Update<String, Counts>, Export<String, Counts>>() {
Iterator<Export<String, PhraseExport>> exports = Iterators.transform(updates,
new Function<Update<String, Counts>, Export<String, PhraseExport>>() {
@Override
public Export<String, Counts> apply(Update<String, Counts> update) {
return new Export<>(update.getKey(), update.getNewValue().get());
public Export<String, PhraseExport> apply(Update<String, Counts> update) {
return new Export<>(update.getKey(), new PhraseExport(update.getNewValue().get()));
}
});

@@ -4,10 +4,10 @@

import com.google.common.base.Charsets;
import com.google.common.io.Files;
import io.fluo.api.client.FluoClient;
import io.fluo.api.client.FluoFactory;
import io.fluo.api.client.LoaderExecutor;
import io.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.config.FluoConfiguration;
import phrasecount.DocumentLoader;
import phrasecount.pojos.Document;

@@ -8,16 +8,15 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import io.fluo.api.client.FluoAdmin.InitOpts;
import io.fluo.api.client.FluoFactory;
import io.fluo.api.config.FluoConfiguration;
import io.fluo.api.mini.MiniFluo;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.minicluster.MemoryUnit;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.MiniAccumuloConfig;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.fluo.api.client.FluoAdmin.InitOpts;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.mini.MiniFluo;
import phrasecount.Application;

public class Mini {
@@ -90,10 +89,7 @@ public static void main(String[] args) throws Exception {

MiniFluo miniFluo = FluoFactory.newMiniFluo(fluoConfig);

PropertiesConfiguration propsConfig = new PropertiesConfiguration();
propsConfig.copy(miniFluo.getClientConfiguration());

propsConfig.save(params.args.get(1));
miniFluo.getClientConfiguration().save(new File(params.args.get(1)));

System.out.println();
System.out.println("Wrote : " + params.args.get(1));
@@ -3,16 +3,16 @@
import java.io.File;
import java.util.Map.Entry;

import io.fluo.api.client.FluoClient;
import io.fluo.api.client.FluoFactory;
import io.fluo.api.client.Snapshot;
import io.fluo.api.config.FluoConfiguration;
import io.fluo.api.config.ScannerConfiguration;
import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
import io.fluo.api.data.Span;
import io.fluo.api.iterator.ColumnIterator;
import io.fluo.api.iterator.RowIterator;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ScannerConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.api.iterator.ColumnIterator;
import org.apache.fluo.api.iterator.RowIterator;
import phrasecount.Constants;
import phrasecount.pojos.PhraseAndCounts;
import phrasecount.query.PhraseCountTable;
@@ -2,12 +2,11 @@

import java.io.File;

import io.fluo.api.config.FluoConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.fluo.api.config.FluoConfiguration;
import phrasecount.Application;
import phrasecount.Application.Options;

@@ -34,10 +33,6 @@ public static void main(String[] args) throws Exception {

FluoConfiguration observerConfig = new FluoConfiguration();
Application.configure(observerConfig, opts);

PropertiesConfiguration propsConfig = new PropertiesConfiguration();
propsConfig.setDelimiterParsingDisabled(true);
propsConfig.copy(observerConfig);
propsConfig.save(System.out);
observerConfig.save(System.out);
}
}
@@ -4,10 +4,10 @@
import java.util.SortedSet;
import java.util.TreeSet;

import io.fluo.api.config.FluoConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.hadoop.io.Text;

/**
@@ -2,12 +2,14 @@

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.pool.KryoFactory;
import phrasecount.PhraseExport;

public class PcKryoFactory implements KryoFactory {
@Override
public Kryo create() {
Kryo kryo = new Kryo();
kryo.register(Counts.class, 9);
kryo.register(PhraseExport.class, 10);
return kryo;
}
}

0 comments on commit 5f1ea06

Please sign in to comment.