Skip to content

Commit

Permalink
[CARBONDATA-2418] [Presto] [S3] Fixed Presto Can't Query CarbonData W…
Browse files Browse the repository at this point in the history
…hen CarbonStore is at S3

This closes #2287
  • Loading branch information
anubhav100 authored and jackylk committed Jun 18, 2018
1 parent e7fed36 commit dc4f87b
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 4 deletions.
28 changes: 28 additions & 0 deletions integration/presto/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,34 @@ Please follow the below steps to query carbondata in presto
For example, if you have a schema named 'default' stored in hdfs://namenode:9000/test/carbondata/,
Then set carbondata-store=hdfs://namenode:9000/test/carbondata

#### Connecting to carbondata store on s3
* In case you want to query carbonstore on S3 using S3A api put following additional properties inside $PRESTO_HOME$/etc/catalog/carbondata.properties
```
Required properties
fs.s3a.access.key={value}
fs.s3a.secret.key={value}
Optional properties
fs.s3a.endpoint={value}
```
* In case you want to query carbonstore on s3 using S3 api put following additional properties inside $PRESTO_HOME$/etc/catalog/carbondata.properties
```
fs.s3.awsAccessKeyId={value}
fs.s3.awsSecretAccessKey={value}
```
* In case You want to query carbonstore on s3 using S3N api put following additional properties inside $PRESTO_HOME$/etc/catalog/carbondata.properties
```
fs.s3n.awsAccessKeyId={value}
fs.s3n.awsSecretAccessKey={value}
```
Replace the schema-store-path with the absolute path of the parent directory of the schema.
For example, if you have a schema named 'default' stored in a bucket s3a://s3-carbon/store,
Then set carbondata-store=s3a://s3-carbon/store

#### Unsafe Properties
enable.unsafe.in.query.processing property by default is true in CarbonData system, the carbon.unsafe.working.memory.in.mb
property defines the limit for Unsafe Memory usage in Mega Bytes, the default value is 512 MB.
If your tables are big you can increase the unsafe memory, or disable unsafe via setting enable.unsafe.in.query.processing=false.
Expand Down
49 changes: 48 additions & 1 deletion integration/presto/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,54 @@
<artifactId>lz4-java</artifactId>
<version>1.4.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.7.4</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.2</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.carbondata.presto;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.presto.impl.CarbonTableConfig;
import org.apache.carbondata.presto.impl.CarbonTableReader;
import com.facebook.presto.spi.*;
import com.facebook.presto.spi.connector.ConnectorMetadata;
Expand All @@ -37,6 +40,9 @@
import static org.apache.carbondata.presto.Types.checkType;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;

public class CarbondataMetadata implements ConnectorMetadata {
private final String connectorId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.carbondata.presto.impl;

import io.airlift.configuration.Config;

import javax.validation.constraints.NotNull;

import io.airlift.configuration.Config;

/**
* Configuration read from etc/catalog/carbondata.properties
*/
Expand All @@ -32,6 +32,14 @@ public class CarbonTableConfig {
private String storePath;
private String unsafeMemoryInMb;
private String enableUnsafeInQueryExecution;
private String s3A_acesssKey;
private String s3A_secretKey;
private String s3_acesssKey;
private String s3_secretKey;
private String s3N_acesssKey;
private String s3N_secretKey;
private String endPoint;


@NotNull public String getDbPath() {
return dbPath;
Expand Down Expand Up @@ -79,4 +87,73 @@ public CarbonTableConfig setEnableUnsafeInQueryExecution(String enableUnsafeInQu
this.enableUnsafeInQueryExecution = enableUnsafeInQueryExecution;
return this;
}

public String getS3A_AcesssKey() {
return s3A_acesssKey;
}

public String getS3A_SecretKey() {
return s3A_secretKey;
}

public String getS3_AcesssKey() {
return s3_acesssKey;
}

public String getS3_SecretKey() {
return s3_secretKey;
}

public String getS3N_AcesssKey() {
return s3N_acesssKey;
}

public String getS3N_SecretKey() {
return s3N_secretKey;
}

public String getS3EndPoint() {
return endPoint;
}


@Config("fs.s3a.access.key")
public CarbonTableConfig setS3A_AcesssKey(String s3A_acesssKey) {
this.s3A_acesssKey = s3A_acesssKey;
return this;
}

@Config("fs.s3a.secret.key")
public CarbonTableConfig setS3A_SecretKey(String s3A_secretKey) {
this.s3A_secretKey = s3A_secretKey;
return this;
}

@Config("fs.s3.awsAccessKeyId")
public CarbonTableConfig setS3_AcesssKey(String s3_acesssKey) {
this.s3_acesssKey = s3_acesssKey;
return this;
}

@Config("fs.s3.awsSecretAccessKey")
public CarbonTableConfig setS3_SecretKey(String s3_secretKey) {
this.s3_secretKey = s3_secretKey;
return this;
}
@Config("fs.s3n.awsAccessKeyId")
public CarbonTableConfig setS3N_AcesssKey(String s3N_acesssKey) {
this.s3N_acesssKey = s3N_acesssKey;
return this;
}

@Config("fs.s3.awsSecretAccessKey")
public CarbonTableConfig setS3N_SecretKey(String s3N_secretKey) {
this.s3N_secretKey = s3N_secretKey;
return this;
}
@Config("fs.s3a.endpoint")
public CarbonTableConfig setS3EndPoint(String endPoint) {
this.endPoint = endPoint;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -79,6 +80,9 @@
import org.apache.thrift.TBase;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;

/**
* CarbonTableReader will be a facade of these utils
Expand All @@ -98,7 +102,7 @@ public class CarbonTableReader {
return CarbonTablePath.isCarbonDataFile(path.getName());
}
};
private CarbonTableConfig config;
public CarbonTableConfig config;
/**
* The names of the tables under the schema (this.carbonFileList).
*/
Expand Down Expand Up @@ -132,6 +136,7 @@ public class CarbonTableReader {
this.config = requireNonNull(config, "CarbonTableConfig is null");
this.carbonCache = new AtomicReference(new HashMap());
tableList = new ConcurrentSet<>();
setS3Properties();
}

/**
Expand Down Expand Up @@ -506,5 +511,21 @@ private CarbonTableInputFormat<Object> createInputFormat( Configuration conf,
return format;
}

private void setS3Properties(){
FileFactory.getConfiguration()
.set(ACCESS_KEY, Objects.toString(config.getS3A_AcesssKey(),""));
FileFactory.getConfiguration()
.set(SECRET_KEY, Objects.toString(config.getS3A_SecretKey()));
FileFactory.getConfiguration().set(CarbonCommonConstants.S3_ACCESS_KEY,
Objects.toString(config.getS3_AcesssKey(),""));
FileFactory.getConfiguration().set(CarbonCommonConstants.S3_SECRET_KEY,
Objects.toString(config.getS3_SecretKey()));
FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_ACCESS_KEY,
Objects.toString(config.getS3N_AcesssKey(),""));
FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_SECRET_KEY,
Objects.toString(config.getS3N_SecretKey(),""));
FileFactory.getConfiguration().set(ENDPOINT,
Objects.toString(config.getS3EndPoint(),""));
}

}

0 comments on commit dc4f87b

Please sign in to comment.