Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ log4j.rootLogger=${gora.root.logger}
#special logging requirements for some commandline tools
log4j.logger.org.apache.gora.compiler.GoraCompiler=INFO,console
log4j.logger.org.apache.avro.specific.SpecificCompiler=INFO,console
log4j.logger.org.apache.gora.dynamodb.compiler.GoraDynamoDBCompiler=INFO,console
log4j.logger.org.apache.gora.dynamodb.compiler.GoraDynamoDBCompiler=DEBUG,console
log4j.logger.org.apache.gora.tutorial.log.LogManager=INFO,console
log4j.logger.org.apache.gora.tutorial.log.LogAnalytics=INFO,console

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,8 @@ public void put(K key, T value) {
private Object getFieldValue(Schema fieldSchema, Type type, Object fieldValue ){
switch(type) {
case RECORD:
Persistent persistent = (Persistent) fieldValue;
Persistent newRecord = (Persistent) SpecificData.get().newRecord(persistent, persistent.getSchema());
PersistentBase persistent = (PersistentBase) fieldValue;
PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema());
for (Field member: fieldSchema.getFields()) {
if (member.pos() == 0 || !persistent.isDirty()) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,31 +42,31 @@ public class GoraRecordReader<K, T extends PersistentBase> extends RecordReader<

protected Query<K,T> query;
protected Result<K,T> result;

private GoraRecordCounter counter = new GoraRecordCounter();

/**
 * Builds a record reader over the given Gora query, capping the number of
 * records fetched per batch with the {@code gora.buffer.read.limit}
 * configuration value (falling back to the compiled-in default).
 *
 * @param query   the Gora query whose results this reader will iterate
 * @param context Hadoop task attempt context supplying the job configuration
 */
public GoraRecordReader(Query<K,T> query, TaskAttemptContext context) {
this.query = query;

Configuration configuration = context.getConfiguration();
int recordsMax = configuration.getInt(BUFFER_LIMIT_READ_NAME, BUFFER_LIMIT_READ_VALUE);

// Check if result set will at least contain 2 rows
if (recordsMax <= 1) {
LOG.info("Limit " + recordsMax + " changed to " + BUFFER_LIMIT_READ_VALUE);
recordsMax = BUFFER_LIMIT_READ_VALUE;
}

// counter appears to track batch boundaries for re-querying — confirm
// against GoraRecordCounter.isModulo() usage in nextKeyValue()
counter.setRecordsMax(recordsMax);
LOG.info("gora.buffer.read.limit = " + recordsMax);

this.query.setLimit(recordsMax);
}

/**
 * Executes the wrapped query against the data store and caches its
 * result set for subsequent key/value iteration.
 *
 * @throws Exception if the underlying data store query fails
 */
public void executeQuery() throws Exception {
this.result = query.execute();
}

@Override
public K getCurrentKey() throws IOException, InterruptedException {
return result.getKey();
Expand All @@ -80,47 +80,47 @@ public T getCurrentValue() throws IOException, InterruptedException {
@Override
public float getProgress() throws IOException, InterruptedException {
try{
return result.getProgress();
}
catch(Exception e){
return 0;
}
return result.getProgress();
}
catch(Exception e){
return 0;
}
}

@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException { }
throws IOException, InterruptedException { }

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
try{
if (counter.isModulo()) {
boolean firstBatch = (this.result == null);
if (! firstBatch) {
this.query.setStartKey(this.result.getKey());
if (this.query.getLimit() == counter.getRecordsMax()) {
this.query.setLimit(counter.getRecordsMax() + 1);
}
}
if (this.result != null) {
this.result.close();
}
executeQuery();
if (! firstBatch) {
// skip first result
this.result.next();
}
}
counter.increment();
return this.result.next();
}
catch(Exception e){
LOG.error("Error reading Gora records: {}", e.getMessage());
throw new RuntimeException(e);
}
try{
if (counter.isModulo()) {
boolean firstBatch = (this.result == null);
if (! firstBatch) {
this.query.setStartKey(this.result.getKey());
if (this.query.getLimit() == counter.getRecordsMax()) {
this.query.setLimit(counter.getRecordsMax() + 1);
}
}
if (this.result != null) {
this.result.close();
}

executeQuery();

if (! firstBatch) {
// skip first result
this.result.next();
}
}

counter.increment();
return this.result.next();
}
catch(Exception e){
LOG.error("Error reading Gora records: {}", e.getMessage());
throw new RuntimeException(e);
}
}

//@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.util.AvroUtils;
import org.apache.hadoop.io.serializer.Deserializer;

Expand All @@ -33,19 +33,19 @@
* with {@link BinaryDecoder}.
*/
public class PersistentDeserializer
implements Deserializer<Persistent> {
implements Deserializer<PersistentBase> {

private BinaryDecoder decoder;
private Class<? extends Persistent> persistentClass;
private Class<? extends PersistentBase> persistentClass;
private boolean reuseObjects;
private SpecificDatumReader<Persistent> datumReader;
private SpecificDatumReader<PersistentBase> datumReader;

public PersistentDeserializer(Class<? extends Persistent> c, boolean reuseObjects) {
public PersistentDeserializer(Class<? extends PersistentBase> c, boolean reuseObjects) {
this.persistentClass = c;
this.reuseObjects = reuseObjects;
try {
Schema schema = AvroUtils.getSchema(persistentClass);
datumReader = new SpecificDatumReader<>(schema);
datumReader = new SpecificDatumReader<PersistentBase>(schema);

} catch (Exception ex) {
throw new RuntimeException(ex);
Expand All @@ -67,7 +67,7 @@ public void open(InputStream in) throws IOException {
public void close() throws IOException { }

@Override
public Persistent deserialize(Persistent persistent) throws IOException {
public PersistentBase deserialize(PersistentBase persistent) throws IOException {
return datumReader.read(reuseObjects ? persistent : null, decoder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,25 @@
*/
package org.apache.gora.mapreduce;

import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;

public class PersistentSerialization
implements Serialization<Persistent> {
public class PersistentSerialization implements Serialization<PersistentBase> {

@Override
public boolean accept(Class<?> c) {
return Persistent.class.isAssignableFrom(c);
return PersistentBase.class.isAssignableFrom(c);
}

@Override
public Deserializer<Persistent> getDeserializer(Class<Persistent> c) {
public Deserializer<PersistentBase> getDeserializer(Class<PersistentBase> c) {
return new PersistentDeserializer(c, true);
}

@Override
public Serializer<Persistent> getSerializer(Class<Persistent> c) {
public Serializer<PersistentBase> getSerializer(Class<PersistentBase> c) {
return new PersistentSerializer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.hadoop.io.serializer.Serializer;

/**
* Hadoop serializer using Avro's {@link SpecificDatumWriter}
* with {@link BinaryEncoder}.
*/
public class PersistentSerializer implements Serializer<Persistent> {
public class PersistentSerializer implements Serializer<PersistentBase> {

private SpecificDatumWriter<Persistent> datumWriter;
private SpecificDatumWriter<PersistentBase> datumWriter;
private BinaryEncoder encoder;

public PersistentSerializer() {
Expand All @@ -58,7 +58,7 @@ public void open(OutputStream out) throws IOException {
* Do the serialization of the {@link Persistent} object
*/
@Override
public void serialize(Persistent persistent) throws IOException {
public void serialize(PersistentBase persistent) throws IOException {
datumWriter.setSchema(persistent.getSchema());

datumWriter.write(persistent, encoder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.avro.Schema.Field;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
Expand Down Expand Up @@ -170,7 +169,7 @@ public T get(K key, String[] fields) {
/**
* Returns a clone with exactly the requested fields shallowly copied
*/
private static<T extends Persistent> T getPersistent(T obj, String[] fields) {
private static<T extends PersistentBase> T getPersistent(T obj, String[] fields) {
List<Field> otherFields = obj.getSchema().getFields();
String[] otherFieldStrings = new String[otherFields.size()];
for(int i = 0; i<otherFields.size(); i++ ){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@
import java.util.List;

import org.apache.avro.Schema.Field;
import org.apache.avro.specific.SpecificRecord;

import org.apache.gora.persistency.Dirtyable;

/**
* Objects that are persisted by Gora implements this interface.
*/
public interface Persistent extends SpecificRecord, Dirtyable {
public interface Persistent extends Dirtyable {

public static String DIRTY_BYTES_FIELD_NAME = "__g__dirty";

Expand Down
17 changes: 17 additions & 0 deletions gora-core/src/main/java/org/apache/gora/persistency/Tombstone.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.persistency;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.gora.persistency.Persistent;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.util.StringUtils;

/**
Expand Down Expand Up @@ -54,21 +55,28 @@ public abstract class WSDataStoreBase<K, T extends Persistent>
* Properties object
*/
protected Properties properties;

/**
* Determines if an schema will be automatically created.
*/
protected boolean autoCreateSchema;

/**
 * Default no-arg constructor; key/persistent classes and properties are
 * populated later via {@code initialize(Class, Class, Properties)}.
 */
public WSDataStoreBase() {
}

@Override
/**
* Initializes the web services backed data store
*/
@Override
public void initialize(Class<K> keyClass, Class<T> persistentClass,
Properties properties) {
setKeyClass(keyClass);
setPersistentClass(persistentClass);
autoCreateSchema = DataStoreFactory.getAutoCreateSchema(properties, this);
this.properties = properties;
}

@Override
Expand Down
10 changes: 5 additions & 5 deletions gora-core/src/main/java/org/apache/gora/util/AvroUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;

/**
* An utility class for Avro related tasks
Expand Down Expand Up @@ -65,7 +65,7 @@ public static Object getEnumValue(Schema schema, int enumOrdinal) {
/**
* Returns the schema of the class
*/
public static Schema getSchema(Class<? extends Persistent> clazz)
public static Schema getSchema(Class<? extends PersistentBase> clazz)
throws SecurityException, NoSuchFieldException, IllegalArgumentException,
IllegalAccessException {

Expand All @@ -80,7 +80,7 @@ public static Schema getSchema(Class<? extends Persistent> clazz)
* the persistent object to get the fields names from
* @return the field names
*/
public static String[] getPersistentFieldNames(Persistent persistent) {
public static String[] getPersistentFieldNames(PersistentBase persistent) {
return getSchemaFieldNames(persistent.getSchema());
}

Expand All @@ -100,10 +100,10 @@ public static String[] getSchemaFieldNames(Schema schema) {
return fieldNames;
}

public static <T extends Persistent> T deepClonePersistent(T persistent) {
public static <T extends PersistentBase> T deepClonePersistent(T persistent) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
BinaryEncoder enc = EncoderFactory.get().binaryEncoder(bos, null);
SpecificDatumWriter<Persistent> writer = new SpecificDatumWriter<>(
SpecificDatumWriter<PersistentBase> writer = new SpecificDatumWriter<>(
persistent.getSchema());
try {
writer.write(persistent, enc);
Expand Down
Loading