Skip to content

Commit

Permalink
Fixing deps & serialization for RTView
Browse files Browse the repository at this point in the history
 - hoodie-hadoop-mr now needs objectsize bundled
 - Also updated docs with additional tuning tips
  • Loading branch information
vinothchandar authored and vinothchandar committed Jun 11, 2018
1 parent 85dd265 commit 8f1d362
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
5 changes: 4 additions & 1 deletion docs/configurations.md
Expand Up @@ -159,13 +159,16 @@ summary: "Here we list all possible configurations and what they mean"
Writing data via Hoodie happens as a Spark job and thus general rules of spark debugging applies here too. Below is a list of things to keep in mind, if you are looking to improving performance or reliability.

- **Right operations** : Use `bulkinsert` to load new data into a table, and there on use `upsert`/`insert`. Difference between them is that bulk insert uses a disk based write path to scale to load large inputs without need to cache it.
- **Input Parallelism** : By default, Hoodie tends to over-partition input (i.e `withParallelism(1500)`), to ensure each Spark partition stays within the 2GB limit for inputs upto 500GB. Bump this up accordingly if you have larger inputs
- **Input Parallelism** : By default, Hoodie tends to over-partition input (i.e `withParallelism(1500)`), to ensure each Spark partition stays within the 2GB limit for inputs upto 500GB. Bump this up accordingly if you have larger inputs. We recommend having shuffle parallelism `hoodie.[insert|upsert|bulkinsert].shuffle.parallelism` such that its atleast input_data_size/500MB
- **Off-heap memory** : Hoodie writes parquet files and that needs good amount of off-heap memory proportional to schema width. Consider setting something like `spark.yarn.executor.memoryOverhead` or `spark.yarn.driver.memoryOverhead`, if you are running into such failures.
- **Spark Memory** : Typically, hoodie needs to be able to read a single file into memory to perform merges or compactions and thus the executor memory should be sufficient to accomodate this. In addition, Hoodie caches the input to be able to intelligently place data and thus leaving some `spark.storage.memoryFraction` will generally help boost performance.
- **Sizing files** : Set `limitFileSize` above judiciously, to balance ingest/write latency vs number of files & consequently metadata overhead associated with it.
- **Timeseries/Log data** : Default configs are tuned for database/nosql changelogs where individual record sizes are large. Another very popular class of data is timeseries/event/log data that tends to be more volumnious with lot more records per partition. In such cases
- Consider tuning the bloom filter accuracy via `.bloomFilterFPP()/bloomFilterNumEntries()` to achieve your target index look up time
- Consider making a key that is prefixed with time of the event, which will enable range pruning & significantly speeding up index lookup.
- **GC Tuning** : Please be sure to follow garbage collection tuning tips from Spark tuning guide to avoid OutOfMemory errors
- [Must] Use G1/CMS Collector. Sample CMS Flags to add to spark.executor.extraJavaOptions : ``-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof`
- If it keeps OOMing still, reduce spark memory conservatively: `spark.memory.fraction=0.2, spark.memory.storageFraction=0.2` allowing it to spill rather than OOM. (reliably slow vs crashing intermittently)

Below is a full working production config

Expand Down
6 changes: 6 additions & 0 deletions hoodie-hadoop-mr/pom.xml
Expand Up @@ -79,6 +79,11 @@
<groupId>com.twitter</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<dependency>
<groupId>com.twitter.common</groupId>
<artifactId>objectsize</artifactId>
<version>0.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
Expand Down Expand Up @@ -114,6 +119,7 @@
<includes>
<include>com.uber.hoodie:hoodie-common</include>
<include>com.twitter:parquet-avro</include>
<include>com.twitter.common:objectsize</include>
</includes>
</artifactSet>
</configuration>
Expand Down
Expand Up @@ -63,22 +63,22 @@ public String getBasePath() {
}

private static void writeString(String str, DataOutput out) throws IOException {
byte[] pathBytes = str.getBytes(StandardCharsets.UTF_8);
out.writeInt(pathBytes.length);
out.write(pathBytes);
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
out.writeInt(bytes.length);
out.write(bytes);
}

private static String readString(DataInput in) throws IOException {
byte[] pathBytes = new byte[in.readInt()];
in.readFully(pathBytes);
return new String(pathBytes, StandardCharsets.UTF_8);
byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
return new String(bytes, StandardCharsets.UTF_8);
}


@Override
public void write(DataOutput out) throws IOException {
super.write(out);

writeString(basePath, out);
writeString(maxCommitTime, out);
out.writeInt(deltaFilePaths.size());
for (String logFilePath : deltaFilePaths) {
Expand All @@ -89,7 +89,7 @@ public void write(DataOutput out) throws IOException {
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);

basePath = readString(in);
maxCommitTime = readString(in);
int totalLogFiles = in.readInt();
deltaFilePaths = new ArrayList<>(totalLogFiles);
Expand Down

0 comments on commit 8f1d362

Please sign in to comment.