Write data to files split by topic and rolled over on size or a timeout. Files can be compressed using lzo, snappy or gzip, or written as parquet.
This allows the user to write data and have the api take care of splitting the data into files based on keys, e.g. topic-datetime, and rolling the data over on a timeout or size.
As of version 1.0.0: depends on Java 1.8
Pre 1.0.0: depends on Java 1.7
For the function sent to the write function: before version 0.5.0 the argument was a single ^DataOutputStream out; from 0.5.0 onward a map is passed to the function with the keys out, future-file-name, file, codec and file-key.
(require '[fileape.core :refer :all])
(import '[java.io File DataOutputStream])
(defn callback-f [{:keys [file]}]
  (prn "File rolled " file))

(def ape2 (ape {:codec :gzip
                :base-dir "testdir"
                :check-freq 5000
                :rollover-size 134217728
                :rollover-timeout 60000
                :roll-callbacks [callback-f]}))

(write ape2 "abc-123" (fn [{:keys [^DataOutputStream out]}]
                        (.writeInt out (int 1))))
;keys sent to the file write function above are
; out               ^java.io.DataOutputStream
; future-file-name  ^String the file name after rolling
; file              ^java.io.File the file that is being written to
; codec             ^clojure.lang.Keyword
; file-key          the key used to write the data
; record-counter    ^java.util.concurrent.atomic.AtomicLong (a helper function is provided, see record-count)
(close ape2)
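The record-counter key above can be used to count records written to the current file, for example so that roll callbacks can inspect how many records a rolled file contains. A minimal sketch, incrementing the AtomicLong directly from the writer function (the record-count helper mentioned above wraps this counter; its exact signature is not shown here):

(import '[java.util.concurrent.atomic AtomicLong])

(def ape3 (ape {:codec :gzip :base-dir "testdir"}))

(write ape3 "abc-123" (fn [{:keys [^DataOutputStream out ^AtomicLong record-counter]}]
                        (.writeInt out (int 1))
                        ;; count the record just written
                        (.incrementAndGet record-counter)))

(close ape3)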
import java.io.DataOutputStream;
import java.util.HashMap;
import java.util.Map;
import clojure.lang.AFn;
import clojure.lang.Keyword;
import fileape.FileApeConnector;
import fileape.FileApeConnector.Writer;
public class Test {

    public static void main(String[] args) {
        final Map<String, Object> conf = new HashMap<String, Object>();
        conf.put("base-dir", "/tmp/test");
        conf.put("codec", "gzip");
        conf.put("check-freq", 5000);
        conf.put("rollover-size", 134217728);
        conf.put("rollover-timeout", 60000);
        conf.put("roll-callbacks", new AFn() {
            @SuppressWarnings("unchecked")
            @Override
            public Object invoke(Object arg) {
                Map<Object, Object> fileData = (Map<Object, Object>) arg;
                System.out.println("File rolled: " + fileData.get(Keyword.find("file")));
                return null;
            }
        });

        //create the connector
        final Object connector = FileApeConnector.create(conf);

        //using a string
        FileApeConnector.write(connector, "test", "Hi\n");
        //using bytes
        FileApeConnector.write(connector, "test", "Hi\n".getBytes());
        //using a callback function
        FileApeConnector.write(connector, "test", new Writer() {
            //to get the file data map use the write_data method
            @Override
            public void write(DataOutputStream out) throws Exception {
                out.writeChars("Hi\n");
            }
        });

        FileApeConnector.close(connector);
    }
}
LZO is distributed under the GPL 3 license, and therefore fileape comes in two versions: fileape (Apache 2) and fileape-lzo (GPL).
The api is exactly the same; the only differences are the license and the extra libraries required for lzo compression.
For more information on setting up lzo correctly see: https://code.google.com/p/hadoop-gpl-packing/
[fileape-lzo "0.4.4-SNAPSHOT"]
(require '[fileape.core :refer :all])
(import '[java.io File DataOutputStream])
(defn callback-f [{:keys [file]}]
  (prn "File rolled " file))

(def ape2 (ape {:codec :lzo
                :base-dir "testdir"
                :check-freq 5000
                :rollover-size 134217728
                :rollover-timeout 60000
                :roll-callbacks [callback-f]}))

(write ape2 "abc-123" (fn [^DataOutputStream o]
                        (.writeInt o (int 1))))
(close ape2)
Note on Schemas
Schemas can either be loaded automatically from an avro schema registry by specifying :avro-schema-registry-url, or supplied manually with a mock registry passed through using :schema-client. With the latter, any valid schema client (io.confluent.kafka.schemaregistry.client.SchemaRegistryClient) can be used.
(require '[fileape.avro.writer :as awriter] :reload)
(require '[fileape.core :as ape] :reload)
(import '(org.apache.avro Schema$Parser))
(import '(org.apache.avro.generic IndexedRecord GenericData$Record))
(import '(io.confluent.kafka.schemaregistry.client CachedSchemaRegistryClient MockSchemaRegistryClient))
(defn ^IndexedRecord record [schema]
  (GenericData$Record. schema))

(defn test-record [sc v]
  (let [^IndexedRecord r (record sc)]
    (.put r 0 (System/currentTimeMillis))
    (.put r 1 (str v))
    r))

(def SCHEMA (.parse (Schema$Parser.) "{\"type\":\"record\",\"name\":\"encryption_output\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"line\",\"type\":\"string\"}]}"))

(def schema-client (doto (MockSchemaRegistryClient.)
                     (.register "mytopic" SCHEMA)))
(def ape2 (ape/ape {:codec :avro :base-dir "/tmp/testdir" :avro-codec "snappy" :schema-client schema-client}))
;;for each key a valid avro schema must exist in the schema registry or be supplied by the schema-client
(ape/write ape2 "mytopic" (fn [{:keys [avro]}]
                            (awriter/write! avro
                                            (test-record SCHEMA 1))))
(ape/close ape2)
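As noted above, instead of passing a :schema-client the registry can be pointed to directly with :avro-schema-registry-url. A minimal sketch, assuming a schema registry is reachable at the hypothetical URL http://localhost:8081 and already holds a schema for each key written to:

(require '[fileape.core :as ape])

(def ape-registry (ape/ape {:codec :avro
                            :base-dir "/tmp/testdir"
                            :avro-codec "snappy"
                            ;; hypothetical registry URL; schemas are fetched from here per key
                            :avro-schema-registry-url "http://localhost:8081"}))

(ape/close ape-registry)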
Note on Schemas
This parquet implementation supports the Hive way of representing Maps and Lists.
Note how repeated attributes are represented: if you want a repeated element it needs to be a List.
Lists
Hive lists are marked in the schema with originalType == java.util.List
And the schema is:
optional group addresses (LIST) {
  repeated group bag {
    optional group array_element {
      optional binary country;
      optional binary city;
    }
  }
}
I know this is weird but this is how Hive likes it in parquet.
Maps
Maps are groups and groups are Maps.
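For illustration, a sketch of a record matching the addresses (LIST) schema above, assuming the parquet writer accepts a plain Clojure map per record and a vector of maps for the Hive-style list elements (the field names come from the schema above; the exact in-memory representation may differ):

(require '[fileape.parquet.writer :as pwriter])

;; full message schema containing the Hive-style LIST group shown above
(def addresses-schema
  (pwriter/parse-schema
    "message person {
       required binary name;
       optional group addresses (LIST) {
         repeated group bag {
           optional group array_element {
             optional binary country;
             optional binary city; } } } }"))

;; assumed representation: a map per record, a vector of maps for the list elements
(def example-record
  {"name"      "abc"
   "addresses" [{"country" "de" "city" "berlin"}
                {"country" "us" "city" "nyc"}]})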
Extra support or change
To change what other original types can be written, use the multimethod fileape.parquet.write-support/write-extended-val.
(require '[fileape.parquet.writer :as pwriter] :reload)
(require '[fileape.core :as ape] :reload)
(def schema (pwriter/parse-schema "message test{ required binary name; required int32 id;}"))
(def ape2 (ape/ape {:codec :parquet :base-dir "/tmp/testdir" :parquet-codec "gzip" :message-type schema}))
;;for each key we must register a parquet schema and codec
;; the fileape.parquet.writer namespace has helper functions for compiling parquet definition schemas
(ape/update-env! ape2 "a" :parquet-codec :gzip :message-type schema)
;;; note that the callback function sets a :parquet key
;;; this contains the open parquet file context
(ape/write ape2 "a" (fn [{:keys [parquet]}]
                      (pwriter/write! parquet
                                      {"name" "abc" "id" 1})))
(ape/close ape2)
File writing is done asynchronously, so no exceptions will be thrown while calling ape/write.
Exceptions are passed to an error handler as (error-handler exception writer-function), where writer-function is the function that was sent to ape/write.
(require '[fileape.core :as ape] :reload)
(defn prn-errors [exception f] (prn "Error " exception " f " f))
(def ape2 (ape/ape {:base-dir "/tmp/testdir" :error-handler prn-errors}))
(ape/write ape2 "test" (fn [_] (throw (RuntimeException. "Test"))))
;; "Error " #<RuntimeException java.lang.RuntimeException: Test> " f " #<user$eval9783$fn__9784 user$eval9783$fn__9784@4194c025>
Name | Description
---- | -----------
:codec | the codec to use: :gzip, :gzip-native, :snappy, :bzip2, :none
:base-dir | the base directory or directories to use; if a list is specified a directory is selected at random on file create
:rollover-size | size in bytes at which the file is rolled
:rollover-timeout | if no bytes were written to the file for this period of time in milliseconds, it is rolled
:check-freq | frequency in milliseconds at which the open files are checked
:parallel-files | performance property; for each topic n=parallel-files files will be created
:rollover-abs-timeout | a file is only kept open for rollover-abs-timeout milliseconds, default is Long/MAX_VALUE
:retries | number of retries when creating resources associated with the file, default is 3
:retry-sleep | number of milliseconds to sleep between each retry, default 500ms
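For illustration, a configuration sketch combining several of the options above (values and paths are arbitrary; a list of base directories is given here as a vector):

(require '[fileape.core :as ape])

(def ape-cfg (ape/ape {:codec                :snappy
                       ;; one of the base dirs is picked at random per file
                       :base-dir             ["/data/disk1" "/data/disk2"]
                       :rollover-size        134217728
                       :rollover-timeout     60000
                       :rollover-abs-timeout (* 30 60 1000)
                       :check-freq           5000
                       :parallel-files       2
                       :retries              3
                       :retry-sleep          500}))

(ape/close ape-cfg)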
Name | Description
---- | -----------
:parquet-block-size | 136314880; the bigger this value the more memory is used
:parquet-page-size | 1048576
:parquet-memory-min-chunk-size | default 1048576, disable with 0
:parquet-codec | e.g. :gzip, sets the parquet compression
:parquet-enable-dictionary | false
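For illustration, a parquet configuration sketch using the options above (the schema value is assumed to come from fileape.parquet.writer/parse-schema as in the earlier example):

(def parquet-ape (ape/ape {:codec                     :parquet
                           :base-dir                  "/tmp/testdir"
                           :message-type              schema ;; from pwriter/parse-schema
                           :parquet-codec             :gzip
                           :parquet-block-size        136314880
                           :parquet-page-size         1048576
                           :parquet-enable-dictionary false}))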
Name | Description
---- | -----------
:avro-schema-registry-url | url to the avro schema registry
:avro-codec | valid values are "deflate", "snappy", "bzip2"; any invalid or empty value will default to "deflate"
:schema-client | either specify :avro-schema-registry-url or :schema-client; schema-client is any valid SchemaRegistryClient instance and, if supplied, will be used instead of the registry url to get schemas
Configuration can be overridden for specific keys passed into write, while the default top level configuration applies for other keys.
The following properties can be configured (example shown for k = :mytopic); the key passed into write can be either a string, symbol or keyword, all would map to :mytopic. A configuration sketch follows the list below.
:mytopic.codec
:mytopic.base-dir
:mytopic.rollover-size
:mytopic.rollover-timeout
:mytopic.parquet-block-size
:mytopic.parquet-page-size
:mytopic.parquet-memory-min-chunk-size
:mytopic.parquet-codec
:mytopic.parquet-enable-dictionary
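A sketch of such an override, assuming the dotted keys are passed in the same configuration map as the top level options (here data written with the key :mytopic uses snappy and a larger rollover size, while all other keys use the defaults):

(require '[fileape.core :as ape])
(import '[java.io DataOutputStream])

(def ape-per-key (ape/ape {:codec                 :gzip
                           :base-dir              "/tmp/testdir"
                           :rollover-size         134217728
                           ;; overrides applied only to data written with the key "mytopic"/:mytopic
                           :mytopic.codec         :snappy
                           :mytopic.rollover-size 268435456}))

(ape/write ape-per-key "mytopic" (fn [{:keys [^DataOutputStream out]}]
                                   (.writeInt out (int 1))))

(ape/close ape-per-key)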
When files are closed they are renamed. In Java this operation delegates to the platform and is platform and even file-system dependent. In my own personal experience I have seen situations where files are perfectly flushed, closed and then renamed, yet a find command running in the background sees the file, passes it to another process, and that process throws a FileNotFoundException, or a mv fails with "No such file or directory". Even worse, I've seen files marked as corrupt that are fine when I run the same command again; this suggests that the file rename/move is still being performed while the file is already available for reading. Note all this was seen while using java.io.File.renameTo.
The fileape.io package uses (Files/move (.toPath source-file) (.toPath dest-file) (into-array CopyOption [StandardCopyOption/ATOMIC_MOVE StandardCopyOption/COPY_ATTRIBUTES])) to try to do an atomic move, BUT if the filesystem doesn't support ATOMIC_MOVE an exception is thrown and the api falls back to commons-io FileUtils to ensure the file is at least properly moved. This cannot solve the issue at hand, i.e. moves/renames are not atomic, and with FileUtils it is possible that the whole file is copied, again allowing other applications to see a partial file.
Use of FileLocks: I don't think file locks will solve the issue here as they are also just advisory and very much platform dependent.
If your filesystem doesn't provide atomic moves then either read only files that are a few seconds old, or better yet, have a mechanism (like with parquet files) to check if the file is valid and, if invalid, retry the check up to some kind of timeout.
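A minimal reader-side sketch of that retry idea, with a hypothetical valid-file? predicate (for parquet this could be a footer check; the predicate is not part of fileape):

(defn wait-until-valid
  "Calls (valid-file? f) every sleep-ms until it returns true or timeout-ms elapses; returns true/false."
  [valid-file? ^java.io.File f timeout-ms sleep-ms]
  (let [deadline (+ (System/currentTimeMillis) timeout-ms)]
    (loop []
      (cond
        (valid-file? f)                         true
        (> (System/currentTimeMillis) deadline) false
        :else (do (Thread/sleep sleep-ms) (recur))))))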
Thrown by org.apache.parquet.hadoop.MemoryManager updateAllocation. It's not clear to me why this should be an exception, but in case you get it, disable this check by setting :parquet-memory-min-chunk-size to 0.
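For example (a minimal sketch, reusing a schema parsed as in the parquet example above):

(def ape-no-min-chunk (ape/ape {:codec :parquet
                                :base-dir "/tmp/testdir"
                                :message-type schema
                                ;; disable the MemoryManager minimum chunk size check
                                :parquet-memory-min-chunk-size 0}))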
Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.