
Understanding Write Append #114

Closed
JohnOmernik opened this issue Mar 24, 2017 · 16 comments

@JohnOmernik

So when we write with append=True, are there any good docs on how this works? Right now, I am managing the data files myself.

Ideally, we'd append to a file in the directory and keep appending until it reaches a certain size, then roll over to a new file (for performance reasons tied to block sizes in HDFS). Would it be reasonable, once the file reaches a certain size, to move the current file to a different name, so that the next write just starts a new file?

so let's say I set a filename to myoutput.parq...

fname = "myoutput.parq"
fastparquet.write(fname, df, append=True)
if sizeof(fname) > 512m: # this is peudo code
curtime = int(time.time())
mv fname curtime.parq

Would that work as intended? I like the append option, but I want to ensure I am managing my outputs well.

Thanks!

@martindurant
Member

While this should work, I would warn against repeatedly using append with small writes - accumulate the data in memory first and write infrequently. The reason is that each append will create a new row-group and write to the metadata, which will eventually become very large as a result.

Something like

import time
import pandas as pd
import fastparquet

out = []
while True:
    out.append(get_next_row())  # placeholder: however you fetch the next record
    if len(out) > 100000000:    # or some number known to have memory footprint < 512M
        break
df = pd.DataFrame(out)
fastparquet.write("part.%i.parq" % time.time(), df)
or
fastparquet.write('output.parq', df, file_scheme='hive', append=True)

In the former case you could use fastparquet.writer.merge to create the combined metadata when you are done. The 'hive' case already splits the output into files.
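For the former case, a minimal sketch of the merge step (file names here are illustrative):

import glob
import fastparquet

# gather the per-batch files written above and create a combined _metadata
# file next to them, so they can later be read as a single dataset
parts = sorted(glob.glob("part.*.parq"))
fastparquet.writer.merge(parts)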

@JohnOmernik
Author

JohnOmernik commented Mar 24, 2017 via email

@martindurant
Member

Ah, lambda architecture! It is common to have the batch and "speed"/streaming layer separate, perhaps only guaranteeing (eventual) accuracy on the batch side.
I would say that parquet is not really meant for incremental updates; it is batch-oriented. Why not directly pass the JSON lines or dictionaries (in a file, in-memory store, whatever)? Your query can read from both sources.

@JohnOmernik
Author

It is common to have them separate, but when it comes to moving ideas from R&D (batch) to models running on real-time data, having those layers merged sure is nice. The way I am trying to set things up is to have all data streaming on Kafka, and then use a tool like fastparquet to subscribe to the topics that are long-term-storage-type topics. I could just write the JSON to files, but then I'd need another process to move it into efficient long-term storage. I am trying to simplify my administrative overhead, as well as those "aha" moments when an analyst says "Look at this stream of data, we should store this".

That said, my question about reading and merging was kind of a silly question, wasn't it? Of COURSE fastparquet has that feature!

My process is as follows (rough sketch in code below):

Write appends in smallish batches to a file: /app/data/2017-03-25/mycurrent.parq

When that file's size reaches a max (after the current write), mv the file to ./part/storage_%curepoch%.parq (ex: ./2017-03-25/storage_12345678.parq)

Then read that file into a new dataframe all at once, and turn around and write it to ./.tmp/storage_12345678.parq

Then do one more move of ./.tmp/storage_12345678.parq to ./2017-03-25/storage_12345678.parq

The reason for the .tmp is that directories starting with . are ignored by Apache Drill (and others, I think), so while that large file is being written we don't get errors about partial files. Then it's just a filesystem move (instant) to get the file over the other.

Since that dataframe is written as one file, it should eliminate the multiple small row-groups.
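Roughly (a sketch only; the paths, the size threshold, and batch_df, the small dataframe built from the Kafka messages, are illustrative):

import os
import time
import fastparquet

DATA_DIR = "/app/data/2017-03-25"   # illustrative; derived from the current date in practice
CURRENT = os.path.join(DATA_DIR, "mycurrent.parq")
MAX_SIZE = 512 * 1024 ** 2          # roll over at roughly 512 MB

def append_batch(batch_df):
    # small appends accumulate row-groups in the "current" file
    fastparquet.write(CURRENT, batch_df, append=os.path.exists(CURRENT))
    if os.path.getsize(CURRENT) < MAX_SIZE:
        return
    name = "storage_%d.parq" % int(time.time())
    rolled = os.path.join(DATA_DIR, name)
    os.rename(CURRENT, rolled)                        # roll the full file aside
    df = fastparquet.ParquetFile(rolled).to_pandas()  # read it back in one go
    tmp_dir = os.path.join(DATA_DIR, ".tmp")
    os.makedirs(tmp_dir, exist_ok=True)               # hidden dir, ignored by Drill
    fastparquet.write(os.path.join(tmp_dir, name), df)  # rewrite in a single pass
    os.rename(os.path.join(tmp_dir, name), rolled)      # instant move over the rolled file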

In addition, since I am doing this all in Docker containers, instead of the word "storage" I pass a unique key that the Python script reads from an environment variable and substitutes for that word. Thus, if I use "HOSTNAME", the HOSTNAME env of my Docker container lets me run multiple instances against a larger Kafka stream, each subscribing to its own partitions, effectively parallelizing the outputs. Weee.

I think this will work; I'm testing over the weekend. I will report back.

@martindurant
Member

Many small operations, but each should be fast, so perhaps this will work for you.
You should know that a parquet file with a single row-group will be smaller than a file with many row-groups, because of the extra metadata and headers for each row-group and the reduced opportunity for efficient compression. If the row-groups are very small, the difference could be substantial.
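For example (file name illustrative), the rewrite step can force everything into a single row-group by passing an explicit offsets list to write:

import fastparquet

# a single starting offset ([0]) puts all rows into one row-group
fastparquet.write("storage_12345678.parq", df, row_group_offsets=[0])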

@JohnOmernik
Author

I know this is closed; however, I thought I'd mention that I used a method for tracking the files I've written, and when they reach a certain size, I do the merge. I am still toying with how to handle multiple writers going at once (large-throughput data sets). At the base I have one output per writer, and I am trying to come up with a decent way to merge those. The writer I built is a generic tool for reading JSON records off a Kafka queue, handling partitioning etc., providing some basic config, and writing to a location on a POSIX filesystem. (This doesn't really help HDFS folks yet... maybe next feature?) It can be found here: https://github.com/JohnOmernik/pyetl

@martindurant
Member

Writing to HDFS ought to be simple: replace s3fs in the example with hdfs3.

@sethusabarish

@martindurant hdfs3 filesystem objects are not callable themselves, but fastparquet expects open_with to be:

"open_with functions provided to the constructor must be callable with f(path, mode) and produce an open file context"

@martindurant
Member

@sethusabarish , the example would look something like this

import fastparquet as fp
import hdfs3
hdfs = hdfs3.HDFileSystem(host='myhost', port=9000, ...)
pf = fp.ParquetFile('/user/myuser/file.parquet', open_with=hdfs.open)

@sethusabarish

Hi @martindurant, thanks! I just happened to figure it out from the test code; it had slipped past me somehow.

I think we could add an example to the documentation.

@martindurant
Member

Are you interested in providing a PR?

@sethusabarish

Excuse my naivety. What's a PR?

@martindurant
Member

A pull request (one of the tabs at the head of this page): a change someone wants to make to the code in the repository. This is how the code evolves: commits are made to some branch on a copy of this repository in someone else's account (a fork), and then "submit pull request" is selected to notify the admins of the proposed change.

@sethusabarish

Sure will do.

@sethusabarish

sethusabarish commented Sep 20, 2017

@martindurant I encountered an exception when I tried to do an append:

write('<HDFS_PATH>', <DATAFRAME>, open_with=fs.open, append=True)

Exception: Mode must be 'rb' (read), 'wb' (write, new file), or 'ab' (append).

Did I miss something? I prefer the file scheme to be 'simple'. Does it have something to do with append? (Pyarrow looks to support only 'ab' and not 'rb+'.)

@martindurant
Member

Ah, indeed: HDFS does not allow writing to a file anywhere except at the end, which means that you cannot append with the simple file scheme. (Also, if you will be writing with hive mode, you generally need to provide a mkdirs function to write as well.)
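A hedged sketch of what that could look like (host, port, and paths are illustrative, and hdfs3's mkdir is assumed to be sufficient for the mkdirs hook):

import fastparquet as fp
import hdfs3

hdfs = hdfs3.HDFileSystem(host='myhost', port=9000)
# hive mode appends by adding new part files and rewriting the metadata,
# so nothing needs to be modified in place on HDFS
fp.write('/user/myuser/output.parq', df, file_scheme='hive', append=True,
         open_with=hdfs.open, mkdirs=hdfs.mkdir)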
