Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

public class TestKeyValueSet {
Expand Down
12 changes: 0 additions & 12 deletions tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,12 @@

import com.google.protobuf.ServiceException;
import org.apache.commons.cli.*;
import org.apache.commons.lang.StringUtils;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TajoIdUtils;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.sql.SQLException;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

public class TajoGetConf {
private static final org.apache.commons.cli.Options options;
Expand Down
12 changes: 12 additions & 0 deletions tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ public boolean getBool(ConfigKey key) {
return getBool(key, null);
}

/**
 * Sets an int value under the given config key's name.
 *
 * NOTE(review): unlike getInt(ConfigKey, Integer), this setter does not call
 * assertRegisteredEnum(key) — confirm whether unregistered keys should also be
 * rejected on write.
 *
 * @param key config key; its keyname() is used as the underlying property name
 * @param val value to store
 */
public void setInt(ConfigKey key, int val) {
  setInt(key.keyname(), val);
}

public int getInt(ConfigKey key, Integer defaultVal) {
assertRegisteredEnum(key);

Expand All @@ -120,6 +124,10 @@ public int getInt(ConfigKey key) {
return getInt(key, null);
}

/**
 * Sets a long value under the given config key's name.
 *
 * NOTE(review): unlike getLong(ConfigKey, Long), this setter does not call
 * assertRegisteredEnum(key) — confirm whether unregistered keys should also be
 * rejected on write.
 *
 * @param key config key; its keyname() is used as the underlying property name
 * @param val value to store
 */
public void setLong(ConfigKey key, long val) {
  setLong(key.keyname(), val);
}

public long getLong(ConfigKey key, Long defaultVal) {
assertRegisteredEnum(key);

Expand All @@ -141,6 +149,10 @@ public long getLong(ConfigKey key) {
return getLong(key, null);
}

/**
 * Sets a float value under the given config key's name.
 *
 * NOTE(review): unlike getFloat(ConfigKey, Float), this setter does not call
 * assertRegisteredEnum(key) — confirm whether unregistered keys should also be
 * rejected on write.
 *
 * @param key config key; its keyname() is used as the underlying property name
 * @param val value to store
 */
public void setFloat(ConfigKey key, float val) {
  setFloat(key.keyname(), val);
}

public float getFloat(ConfigKey key, Float defaultVal) {
assertRegisteredEnum(key);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public int bitsLength() {
public void fromByteBuffer(ByteBuffer byteBuffer) {
clear();
int i = 0;
while(byteBuffer.hasRemaining()) {
while(i < data.length && byteBuffer.hasRemaining()) {
data[i] = byteBuffer.get();
i++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.gson.annotations.Expose;
import org.apache.tajo.OverridableConf;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.json.CommonGsonHelper;
import org.apache.tajo.json.GsonObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
Expand Down Expand Up @@ -777,6 +778,11 @@ public PhysicalExec createShuffleFileWritePlan(TaskAttemptContext ctx,
return new RangeShuffleFileWriteExec(ctx, sm, subOp, plan.getInSchema(), plan.getInSchema(), sortSpecs);

case NONE_SHUFFLE:
// if there is no given NULL CHAR property in the table property and the query is neither CTAS or INSERT,
// we set DEFAULT NULL CHAR to the table property.
if (!ctx.getQueryContext().containsKey(SessionVars.NULL_CHAR)) {
plan.getOptions().set(StorageConstants.CSVFILE_NULL, TajoConf.ConfVars.$CSVFILE_NULL.defaultVal);
}
return new StoreTableExec(ctx, plan, subOp);

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*/
public abstract class PersistentStoreNode extends UnaryNode implements Cloneable {
@Expose protected StoreType storageType = StoreType.CSV;
@Expose protected KeyValueSet options;
@Expose protected KeyValueSet options = new KeyValueSet();

protected PersistentStoreNode(int pid, NodeType nodeType) {
super(pid, nodeType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,20 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.engine.planner.logical.CreateTableNode;
import org.apache.tajo.engine.planner.logical.InsertNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.StoreTableNode;
import org.apache.tajo.storage.Appender;
import org.apache.tajo.storage.StorageManagerFactory;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.worker.TaskAttemptContext;

import java.io.IOException;
Expand All @@ -50,6 +53,14 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
protected final int [] keyIds;
protected final String [] keyNames;

protected Appender appender;

// for file punctuation
protected TableStats aggregatedStats; // for aggregating all stats of written files
protected long maxPerFileSize = Long.MAX_VALUE; // default max file size is 2^63
protected int writtenFileNum = 0; // how many file are written so far?
protected Path lastFileName; // latest written file name

public ColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) {
super(context, plan.getInSchema(), plan.getOutSchema(), child);
this.plan = plan;
Expand All @@ -67,6 +78,12 @@ public ColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, Ph
meta = CatalogUtil.newTableMeta(plan.getStorageType());
}

PhysicalPlanUtil.setNullCharIfNecessary(context.getQueryContext(), plan, meta);

if (context.getQueryContext().containsKey(SessionVars.MAX_OUTPUT_FILE_SIZE)) {
maxPerFileSize = context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB;
}

// Find column index to name subpartition directory path
keyNum = this.plan.getPartitionMethod().getExpressionSchema().size();

Expand Down Expand Up @@ -107,33 +124,45 @@ public void init() throws IOException {
if (!fs.exists(storeTablePath.getParent())) {
fs.mkdirs(storeTablePath.getParent());
}

aggregatedStats = new TableStats();
}

/**
 * Builds the data file path for one partition directory:
 * {@code <parent of storeTablePath>/<partition>/<file name of storeTablePath>}.
 *
 * @param partition partition directory name
 * @return path of the data file under that partition directory
 */
protected Path getDataFile(String partition) {
  Path tableDir = storeTablePath.getParent();
  String fileName = storeTablePath.getName();
  return StorageUtil.concatPath(tableDir, partition, fileName);
}

protected Appender makeAppender(String partition) throws IOException {
Path dataFile = getDataFile(partition);
FileSystem fs = dataFile.getFileSystem(context.getConf());
protected Appender getNextPartitionAppender(String partition) throws IOException {
lastFileName = getDataFile(partition);
FileSystem fs = lastFileName.getFileSystem(context.getConf());

if (fs.exists(dataFile.getParent())) {
LOG.info("Path " + dataFile.getParent() + " already exists!");
if (fs.exists(lastFileName.getParent())) {
LOG.info("Path " + lastFileName.getParent() + " already exists!");
} else {
fs.mkdirs(dataFile.getParent());
LOG.info("Add subpartition path directory :" + dataFile.getParent());
fs.mkdirs(lastFileName.getParent());
LOG.info("Add subpartition path directory :" + lastFileName.getParent());
}

if (fs.exists(dataFile)) {
LOG.info("File " + dataFile + " already exists!");
FileStatus status = fs.getFileStatus(dataFile);
if (fs.exists(lastFileName)) {
LOG.info("File " + lastFileName + " already exists!");
FileStatus status = fs.getFileStatus(lastFileName);
LOG.info("File size: " + status.getLen());
}

Appender appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
appender.enableStats();
appender.init();
openAppender(0);

return appender;
}

/**
 * Opens a fresh appender on the latest partition file. A positive suffixId
 * appends {@code "_<suffixId>"} to the file name so that successive output
 * files of the same partition do not collide.
 *
 * @param suffixId 0 to use the file name as-is; greater than 0 to add a suffix
 * @throws IOException if the appender cannot be created or initialized
 */
public void openAppender(int suffixId) throws IOException {
  // suffixId == 0 keeps lastFileName unchanged; otherwise derive a suffixed name.
  Path targetPath = (suffixId > 0)
      ? new Path(lastFileName + "_" + suffixId)
      : lastFileName;

  appender = StorageManagerFactory
      .getStorageManager(context.getConf())
      .getAppender(meta, outSchema, targetPath);
  appender.enableStats();
  appender.init();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.tajo.engine.planner.physical;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.datum.Datum;
Expand All @@ -39,8 +37,6 @@
* This class is a physical operator to store at column partitioned table.
*/
public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec {
private static Log LOG = LogFactory.getLog(HashBasedColPartitionStoreExec.class);

private final Map<String, Appender> appenderMap = new HashMap<String, Appender>();

public HashBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child)
Expand All @@ -56,7 +52,7 @@ private Appender getAppender(String partition) throws IOException {
Appender appender = appenderMap.get(partition);

if (appender == null) {
appender = makeAppender(partition);
appender = getNextPartitionAppender(partition);
appenderMap.put(partition, appender);
} else {
appender = appenderMap.get(partition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@

package org.apache.tajo.engine.planner.physical;

import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.storage.StorageConstants;

import java.util.Stack;

public class PhysicalPlanUtil {
Expand All @@ -36,4 +43,60 @@ public PhysicalExec visit(PhysicalExec exec, Stack<PhysicalExec> stack, Class<?
}
}
}

/**
* Set nullChar to TableMeta according to file format
*
* @param meta TableMeta
* @param nullChar A character for NULL representation
*/
/**
 * Stores the NULL-representation character in the TableMeta, using the option
 * key that matches the table's file format (CSV, RCFile, or SequenceFile).
 * Formats without a text serializer are left untouched.
 *
 * @param meta table metadata to update
 * @param nullChar character (string) used to represent NULL values
 */
private static void setNullCharForTextSerializer(TableMeta meta, String nullChar) {
  final String nullKey;
  switch (meta.getStoreType()) {
    case CSV:
      nullKey = StorageConstants.CSVFILE_NULL;
      break;
    case RCFILE:
      nullKey = StorageConstants.RCFILE_NULL;
      break;
    case SEQUENCEFILE:
      nullKey = StorageConstants.SEQUENCEFILE_NULL;
      break;
    default: // no text serializer for this format: nothing to set
      return;
  }
  meta.putOption(nullKey, nullChar);
}

/**
* Check if TableMeta contains NULL char property according to file format
*
* @param meta Table Meta
* @return True if TableMeta contains NULL char property according to file format
*/
/**
 * Reports whether the TableMeta already carries a NULL-char option for its
 * file format. Formats without a text serializer always yield {@code false}.
 *
 * @param meta table metadata to inspect
 * @return true if the format-specific NULL char option is present
 */
public static boolean containsNullChar(TableMeta meta) {
  String nullKey = null;
  switch (meta.getStoreType()) {
    case CSV:
      nullKey = StorageConstants.CSVFILE_NULL;
      break;
    case RCFILE:
      nullKey = StorageConstants.RCFILE_NULL;
      break;
    case SEQUENCEFILE:
      nullKey = StorageConstants.SEQUENCEFILE_NULL;
      break;
    default: // non-text formats have no NULL char option
      break;
  }
  return nullKey != null && meta.containsOption(nullKey);
}

/**
* Set session variable null char TableMeta if necessary
*
* @param context QueryContext
* @param plan StoreTableNode
* @param meta TableMeta
*/
/**
 * Copies the session-level NULL char into the TableMeta when appropriate.
 * The table's own property (already in TableMeta) takes priority over the
 * session variable, and INSERT plans are never modified.
 *
 * @param context query context holding session variables
 * @param plan    store node whose type decides whether to apply the setting
 * @param meta    table metadata to update
 */
public static void setNullCharIfNecessary(QueryContext context, PersistentStoreNode plan,
                                          TableMeta meta) {
  // INSERT keeps whatever the target table already defines.
  if (plan.getType() == NodeType.INSERT) {
    return;
  }
  // Table property is first priority; session variable is second.
  if (containsNullChar(meta)) {
    return;
  }
  if (context.containsKey(SessionVars.NULL_CHAR)) {
    setNullCharForTextSerializer(meta, context.get(SessionVars.NULL_CHAR));
  }
}
}
Loading