Improving out of box experience for data source #295

Merged: 2 commits, Jun 11, 2018
Changes from all commits
47 changes: 46 additions & 1 deletion docs/configurations.md
@@ -7,6 +7,8 @@ toc: false
summary: "Here we list all possible configurations and what they mean"
---

### Configuration

* [HoodieWriteConfig](#HoodieWriteConfig) <br/>
<span style="color:grey">Top Level Config which is passed in when HoodieWriteClent is created.</span>
- [withPath](#withPath) (hoodie_base_path) <br/>
@@ -152,4 +154,47 @@ summary: "Here we list all possible configurations and what they mean"
`instant_time <= END_INSTANTTIME` are fetched out.</span>


{% include callout.html content="Hoodie is a young project. A lot of pluggable interfaces and configurations to support diverse workloads need to be created. Get involved [here](https://github.com/uber/hoodie)" type="info" %}
### Tuning

Writing data via Hoodie happens as a Spark job, and thus the general rules of Spark debugging apply here too. Below is a list of things to keep in mind if you are looking to improve performance or reliability.

- **Right operations** : Use `bulkinsert` to load new data into a table, and thereafter use `upsert`/`insert`. The difference between them is that bulk insert uses a disk-based write path that can scale to load large inputs without needing to cache them.
- **Input Parallelism** : By default, Hoodie tends to over-partition input (i.e. `withParallelism(1500)`) to ensure each Spark partition stays within the 2GB limit for inputs up to 500GB. Bump this up accordingly if you have larger inputs. We recommend setting the shuffle parallelism `hoodie.[insert|upsert|bulkinsert].shuffle.parallelism` such that it is at least input_data_size/500MB (see the sketch right after this list).
- **Off-heap memory** : Hoodie writes parquet files, which needs a good amount of off-heap memory proportional to the schema width. Consider bumping `spark.yarn.executor.memoryOverhead` or `spark.yarn.driver.memoryOverhead` if you are running into such failures.
- **Spark Memory** : Typically, Hoodie needs to be able to read a single file into memory to perform merges or compactions, and thus the executor memory should be sufficient to accommodate this. In addition, Hoodie caches the input in order to intelligently place data, so leaving some `spark.storage.memoryFraction` will generally help boost performance.
- **Sizing files** : Set `limitFileSize` above judiciously to balance ingest/write latency against the number of files and, consequently, the metadata overhead associated with them.
- **Timeseries/Log data** : The default configs are tuned for database/nosql changelogs, where individual record sizes are large. Another very popular class of data is timeseries/event/log data, which tends to be more voluminous with a lot more records per partition. In such cases:
    - Consider tuning the bloom filter accuracy via `.bloomFilterFPP()`/`.bloomFilterNumEntries()` to achieve your target index lookup time.
    - Consider making the key prefixed with the time of the event, which enables range pruning and significantly speeds up index lookups.
- **GC Tuning** : Please be sure to follow the garbage collection tuning tips from the Spark tuning guide to avoid OutOfMemory errors.
    - [Must] Use the G1/CMS collector. Sample CMS flags to add to `spark.executor.extraJavaOptions`: `-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof`
    - If it still keeps OOMing, reduce Spark memory conservatively: `spark.memory.fraction=0.2, spark.memory.storageFraction=0.2`, allowing it to spill rather than OOM (reliably slow vs. crashing intermittently).
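
As a rough illustration of the write-path knobs above, here is a minimal sketch (not the definitive API usage) of a datasource write that sets the table name and shuffle parallelism explicitly. The table name, parallelism value and base path are placeholders, and other required write options (record key, partition path, etc.) are assumed to be configured elsewhere.

```
import org.apache.spark.sql.{DataFrame, SaveMode}

// Minimal sketch: upsert a batch into a Hoodie dataset with explicit shuffle parallelism.
// Size the parallelism to roughly input_data_size / 500MB, as recommended above.
// Note: Hoodie requires spark.serializer=org.apache.spark.serializer.KryoSerializer.
def upsertBatch(inputDF: DataFrame): Unit = {
  inputDF.write
    .format("com.uber.hoodie")
    .option("hoodie.table.name", "my_table")                // placeholder table name
    .option("hoodie.upsert.shuffle.parallelism", "1500")
    .option("hoodie.insert.shuffle.parallelism", "1500")
    .option("hoodie.bulkinsert.shuffle.parallelism", "1500")
    .mode(SaveMode.Append)
    .save("/tmp/hoodie/my_table")                           // placeholder base path
}
```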

Below is a full working production config

```
spark.driver.extraClassPath /etc/hive/conf
spark.driver.extraJavaOptions -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.driver.maxResultSize 2g
spark.driver.memory 4g
spark.executor.cores 1
spark.executor.extraJavaOptions -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.executor.id driver
spark.executor.instances 300
spark.executor.memory 6g
spark.rdd.compress true

spark.kryoserializer.buffer.max 512m
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.shuffle.memoryFraction 0.2
spark.shuffle.service.enabled true
spark.sql.hive.convertMetastoreParquet false
spark.storage.memoryFraction 0.6
spark.submit.deployMode cluster
spark.task.cpus 1
spark.task.maxFailures 4

spark.yarn.driver.memoryOverhead 1024
spark.yarn.executor.memoryOverhead 3072
spark.yarn.max.executor.failures 100
```
20 changes: 19 additions & 1 deletion docs/s3_filesystem.md
@@ -17,7 +17,9 @@ There are two configurations required for Hoodie-S3 compatibility:

### AWS Credentials

Add the required configs in your core-site.xml from where Hoodie can fetch them. Replace the `fs.defaultFS` with your S3 bucket name and Hoodie should be able to read/write from the bucket.
The simplest way to use Hoodie with S3 is to configure your `SparkSession` or `SparkContext` with S3 credentials. Hoodie will automatically pick them up and talk to S3.
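
As a minimal sketch (assuming the credentials are available as environment variables; the application name is a placeholder), the S3A credentials can be set on the Hadoop configuration carried by the `SparkContext`:

```
import org.apache.spark.sql.SparkSession

// Minimal sketch: supply S3A credentials through the SparkContext's Hadoop configuration,
// which Hoodie uses to resolve the FileSystem for the dataset base path.
val spark = SparkSession.builder()
  .appName("hoodie-s3-example")   // placeholder application name
  .getOrCreate()

val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))     // assumes creds in env vars
hadoopConf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
```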

Alternatively, add the required configs in your core-site.xml from where Hoodie can fetch them. Replace the `fs.defaultFS` with your S3 bucket name and Hoodie should be able to read/write from the bucket.

```
<property>
@@ -51,6 +53,22 @@ Add the required configs in your core-site.xml from where Hoodie can fetch them.
</property>
```


Utilities such as hoodie-cli or the deltastreamer tool can pick up S3 credentials via environment variables prefixed with `HOODIE_ENV_` (each `_DOT_` in the variable name stands in for a `.` in the corresponding Hadoop configuration key). For example, below is a bash snippet to set up such variables so that the CLI can work with datasets stored in S3:

```
export HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key=$accessKey
export HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key=$secretKey
export HOODIE_ENV_fs_DOT_s3_DOT_awsAccessKeyId=$accessKey
export HOODIE_ENV_fs_DOT_s3_DOT_awsSecretAccessKey=$secretKey
export HOODIE_ENV_fs_DOT_s3n_DOT_awsAccessKeyId=$accessKey
export HOODIE_ENV_fs_DOT_s3n_DOT_awsSecretAccessKey=$secretKey
export HOODIE_ENV_fs_DOT_s3n_DOT_impl=org.apache.hadoop.fs.s3a.S3AFileSystem
```



### AWS Libs

AWS Hadoop libraries to add to the classpath:
@@ -41,7 +41,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String TABLE_NAME = "hoodie.table.name";
private static final String BASE_PATH_PROP = "hoodie.base.path";
private static final String AVRO_SCHEMA = "hoodie.avro.schema";
private static final String DEFAULT_PARALLELISM = "200";
private static final String DEFAULT_PARALLELISM = "1500";
private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
@@ -74,6 +74,13 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
return reader.read(null, decoder);
}

public static boolean isMetadataField(String fieldName) {
return HoodieRecord.COMMIT_TIME_METADATA_FIELD.equals(fieldName)
|| HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName)
|| HoodieRecord.RECORD_KEY_METADATA_FIELD.equals(fieldName)
|| HoodieRecord.PARTITION_PATH_METADATA_FIELD.equals(fieldName)
|| HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName);
}

/**
* Adds the Hoodie metadata fields to the given schema
@@ -98,7 +105,9 @@ public static Schema addMetadataFields(Schema schema) {
parentFields.add(partitionPathField);
parentFields.add(fileNameField);
for (Schema.Field field : schema.getFields()) {
parentFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), null));
if (!isMetadataField(field.name())) {
parentFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), null));
}
}

Schema mergedSchema = Schema
6 changes: 6 additions & 0 deletions hoodie-hadoop-mr/pom.xml
@@ -79,6 +79,11 @@
<groupId>com.twitter</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<dependency>
<groupId>com.twitter.common</groupId>
<artifactId>objectsize</artifactId>
<version>0.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
@@ -114,6 +119,7 @@
<includes>
<include>com.uber.hoodie:hoodie-common</include>
<include>com.twitter:parquet-avro</include>
<include>com.twitter.common:objectsize</include>
</includes>
</artifactSet>
</configuration>
@@ -20,7 +20,6 @@
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.DatasetNotFoundException;
import com.uber.hoodie.exception.HoodieException;
import java.io.Serializable;
@@ -61,6 +60,9 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
private HashSet<String> nonHoodiePathCache;


private transient FileSystem fs;


public HoodieROTablePathFilter() {
hoodiePathCache = new HashMap<>();
nonHoodiePathCache = new HashSet<>();
@@ -79,7 +81,6 @@ private Path safeGetParentsParent(Path path) {
return null;
}


@Override
public boolean accept(Path path) {

@@ -88,9 +89,8 @@ public boolean accept(Path path) {
}
Path folder = null;
try {
FileSystem fs = path.getFileSystem(FSUtils.prepareHadoopConf(new Configuration()));
if (fs.isDirectory(path)) {
return true;
if (fs == null) {
fs = path.getFileSystem(new Configuration());
}

// Assumes path is a file
@@ -63,22 +63,22 @@ public String getBasePath() {
}

private static void writeString(String str, DataOutput out) throws IOException {
byte[] pathBytes = str.getBytes(StandardCharsets.UTF_8);
out.writeInt(pathBytes.length);
out.write(pathBytes);
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
out.writeInt(bytes.length);
out.write(bytes);
}

private static String readString(DataInput in) throws IOException {
byte[] pathBytes = new byte[in.readInt()];
in.readFully(pathBytes);
return new String(pathBytes, StandardCharsets.UTF_8);
byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
return new String(bytes, StandardCharsets.UTF_8);
}


@Override
public void write(DataOutput out) throws IOException {
super.write(out);

writeString(basePath, out);
writeString(maxCommitTime, out);
out.writeInt(deltaFilePaths.size());
for (String logFilePath : deltaFilePaths) {
@@ -89,7 +89,7 @@ public void write(DataOutput out) throws IOException {
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);

basePath = readString(in);
maxCommitTime = readString(in);
int totalLogFiles = in.readInt();
deltaFilePaths = new ArrayList<>(totalLogFiles);
2 changes: 1 addition & 1 deletion hoodie-spark/pom.xml
@@ -142,7 +142,7 @@
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>3.2.0</version>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
@@ -43,6 +43,16 @@ object AvroConversionUtils {
}
}

def getNewRecordNamespace(elementDataType: DataType,
currentRecordNamespace: String,
elementName: String): String = {

elementDataType match {
case StructType(_) => s"$currentRecordNamespace.$elementName"
case _ => currentRecordNamespace
}
}

def createConverterToAvro(dataType: DataType,
structName: String,
recordNamespace: String): (Any) => Any = {
@@ -60,7 +70,10 @@ object AvroConversionUtils {
case DateType => (item: Any) =>
if (item == null) null else item.asInstanceOf[Date].getTime
case ArrayType(elementType, _) =>
val elementConverter = createConverterToAvro(elementType, structName, recordNamespace)
val elementConverter = createConverterToAvro(
elementType,
structName,
getNewRecordNamespace(elementType, recordNamespace, structName))
(item: Any) => {
if (item == null) {
null
@@ -77,7 +90,10 @@ }
}
}
case MapType(StringType, valueType, _) =>
val valueConverter = createConverterToAvro(valueType, structName, recordNamespace)
val valueConverter = createConverterToAvro(
valueType,
structName,
getNewRecordNamespace(valueType, recordNamespace, structName))
(item: Any) => {
if (item == null) {
null
@@ -94,7 +110,10 @@ }
val schema: Schema = SchemaConverters.convertStructToAvro(
structType, builder, recordNamespace)
val fieldConverters = structType.fields.map(field =>
createConverterToAvro(field.dataType, field.name, recordNamespace))
createConverterToAvro(
field.dataType,
field.name,
getNewRecordNamespace(field.dataType, recordNamespace, field.name)))
(item: Any) => {
if (item == null) {
null
17 changes: 11 additions & 6 deletions hoodie-spark/src/main/scala/com/uber/hoodie/DefaultSource.scala
@@ -134,23 +134,29 @@ class DefaultSource extends RelationProvider
df: DataFrame): BaseRelation = {

val parameters = parametersWithWriteDefaults(optParams).toMap
val sparkContext = sqlContext.sparkContext
val path = parameters.get("path")
val tblName = parameters.get(HoodieWriteConfig.TABLE_NAME)
if (path.isEmpty || tblName.isEmpty) {
throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
}
val serializer = sparkContext.getConf.get("spark.serializer")
if (!serializer.equals("org.apache.spark.serializer.KryoSerializer")) {
throw new HoodieException(s"${serializer} serialization is not supported by hoodie. Please use kryo.")
}

val storageType = parameters(STORAGE_TYPE_OPT_KEY)
val operation = parameters(OPERATION_OPT_KEY)

// register classes & schemas
val structName = s"${tblName.get}_record"
val nameSpace = s"hoodie.${tblName.get}"
sqlContext.sparkContext.getConf.registerKryoClasses(
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
sqlContext.sparkContext.getConf.registerAvroSchemas(schema)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}");

// Convert to RDD[HoodieRecord]
val keyGenerator = DataSourceUtils.createKeyGenerator(
@@ -167,7 +173,7 @@ class DefaultSource extends RelationProvider


val basePath = new Path(parameters.get("path").get)
val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(basePath)

// Handle various save modes
@@ -190,12 +196,11 @@ class DefaultSource extends RelationProvider
properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tblName.get);
properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, storageType);
properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived");
HoodieTableMetaClient.initializePathAsHoodieDataset(
sqlContext.sparkContext.hadoopConfiguration, path.get, properties);
HoodieTableMetaClient.initializePathAsHoodieDataset(sparkContext.hadoopConfiguration, path.get, properties);
}

// Create a HoodieWriteClient & issue the write.
val client = DataSourceUtils.createHoodieClient(new JavaSparkContext(sqlContext.sparkContext),
val client = DataSourceUtils.createHoodieClient(new JavaSparkContext(sparkContext),
schema.toString,
path.get,
tblName.get,