Refactor internals to allow better memory efficiency #510
Conversation
```
refs::Vector{Vector{String}}
buf::Vector{UInt8}
tapes::Vector{Vector{UInt64}}
columns::Vector{AbstractVector}
```
The change here moves from storing the raw tapes plus additional info to storing the final `Column` structures directly. That moves a bit of work from `getproperty` on `CSV.File` previously to `CSV.File` construction.
The main advantage of this change is that it removes a bunch of duplicated logic that existed between tables.jl and iteration.jl, since row iterating on `CSV.File` now just makes a lazy `Row` wrapper that indexes the already-created `CSV.Column`s.
```
@@ -96,6 +101,17 @@ function checkvalidsource(source)
    throw(ArgumentError("\"$source\" is not a valid file"))
end

function allocate(rowsguess, ncols, typecodes)
    tapes = Vector{UInt64}[Mmap.mmap(Vector{UInt64}, usermissing(typecodes[i]) ? 0 : rowsguess) for i = 1:ncols]
```
Changes here include:
- If a user passes `Missing` as a column type, we avoid allocating any tape/poslen vectors at all; any value found in the cell while parsing will still be parsed, but no value/position/length will be stored anywhere. The `CSV.Column{Missing}` just checks that `getindex` is in bounds and then returns `missing`. This can help a lot for large files that have entire columns of null values (though you have to pass `Missing` as the column type manually). It also provides a way we could do really efficient "baked in" column selection while parsing: we would just set the non-selected columns' type to `Missing` and they would essentially get skipped.
- The other change here is that if the user provides the column type for any other kind of column, we don't allocate the `poslen` vector. The `poslen` vector for a column tracks the position and length of each cell, in case we need to promote the column to a string at any point later on while parsing. For user-provided types, the user is essentially declaring the column type should be X, so we don't need to track this additional info (which costs memory). Previously, the column `tape` stored the raw parsed values and poslen values in the same array; separating them into `tape` and `poslen` allows us to skip the `poslen` when it's not needed.
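As a usage sketch of the first bullet (the file and column name here are hypothetical, not from the PR):

```julia
using CSV

# Hypothetical file where column `:notes` is entirely null. Passing `Missing`
# as its type means no tape/poslen vectors are allocated for it at all;
# indexing only bounds-checks and returns `missing`.
f = CSV.File("data.csv"; types=Dict(:notes => Missing))
f.notes[1]  # missing
```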
src/CSV.jl
Outdated

```
end
return File(getname(source), names, finaltypes, rows, ncols, eq, categorical, finalrefs, buf, tapes)
if threaded === true
    columns = AbstractVector[ApplyArray(vcat, (Column{_eltype(finaltypes[i]), finaltypes[i]}(tapes[j][i], rows[j], eq, categorical, finalrefs[i], buf, finaltypes[i] >: Int64 ? uint64(INT_SENTINELS[intsentinel.x]) : sentinelvalue(Base.nonmissingtype(finaltypes[i]))) for j = 1:Threads.nthreads())...) for i = 1:ncols]
```
For the multithreaded case, each thread parses an equal chunk of the file. Previously, we then allocated a final, huge tape and copied each thread's chunk into it.
Here, we're using the quite-well-optimized `LazyArrays.Vcat` type to lazily `vcat` the threads' tapes into a final "column". So essentially, in the non-threaded case you get back a `Vector{Column}`, and in the multithreaded case you get back a `Vector{LazyArrays.Vcat{Column}}`. I've tried to do pretty thorough benchmarking/stress-testing on `LazyArrays.Vcat`, and I've been really impressed with the optimizations they've implemented for broadcasting, reductions, etc.
I also feel like we've already conditioned users to know that in the `copycols=false` case, you're getting some kind of optimized "view" `AbstractVector`, and to use `copycols=true` if they need a final, full `Vector`.
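A minimal illustration of the lazy concatenation (a toy sketch with plain vectors standing in for the per-thread `Column`s):

```julia
using LazyArrays

# Two "thread chunks" of a column, lazily concatenated without copying:
chunk1 = [1.0, 2.0, 3.0]
chunk2 = [4.0, 5.0]
col = ApplyArray(vcat, chunk1, chunk2)

length(col)  # 5
col[4]       # 4.0 -- indexes into chunk2 without materializing a new array
sum(col)     # 15.0 -- reductions work directly on the lazy structure
```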
```
else
    columns = AbstractVector[Column{_eltype(finaltypes[i]), finaltypes[i]}(tapes[i], rows, eq, categorical, finalrefs[i], buf, finaltypes[i] >: Int64 ? uint64(INT_SENTINELS[intsentinel.x]) : sentinelvalue(Base.nonmissingtype(finaltypes[i]))) for i = 1:ncols]
end
lookup = Dict(k => v for (k, v) in zip(names, columns))
```
I've seen a couple comments, and we had at least one issue opened, about the cost of `getproperty` on `CSV.Row` and `CSV.Row2`, so throwing a lookup Dict into `CSV.File` and `CSV.Rows` seems like a small price to pay for faster property access later.
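The idea can be sketched like this (toy names and columns, not the PR's actual fields):

```julia
# Build the name => column lookup once at construction time...
names = [:a, :b]
columns = AbstractVector[[1, 2, 3], [1.5, 2.5, 3.5]]
lookup = Dict(k => v for (k, v) in zip(names, columns))

# ...so later property access is a single hash lookup instead of a
# linear search through the names vector:
getcolumn(name::Symbol) = lookup[name]
getcolumn(:b)  # [1.5, 2.5, 3.5]
```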
```
end
end
@sync for j = 1:N
    @static if VERSION >= v"1.3-DEV"
```
Using `LazyArrays` also made me realize we can clean up the ref-array syncing code in the multithreaded case. Instead of a two-step, complicated process, we now just do the single loop above and recode a pooled column's refs if needed.
```
oldtapes = tapes
newtapelen = ceil(Int64, tapelen * 1.5)
newtapes = Vector{UInt64}[Mmap.mmap(Vector{UInt64}, newtapelen) for i = 1:ncols]
# if our initial row estimate was too few, we need to reallocate our tapes/poslens to read the rest of the file
```
This should be a much smarter algorithm for re-allocating column tapes if we've underestimated the # of rows to parse. Before, we were just blindly doing `tapelen * 1.5`, which could be disastrous on really large files. We now look at how many bytes are left in the file vs. how many bytes and rows we've parsed so far, and make a smarter guess at how many rows are left in the file. We then allocate the new tapes one at a time, freeing the previous column's old tape before allocating the next column's new tape.
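A rough sketch of the re-estimation logic described above (a hypothetical helper, not the PR's exact code):

```julia
# Estimate how many rows remain, based on the average width of rows parsed
# so far, rather than blindly growing the tapes by 1.5x.
function estimate_remaining_rows(bytesparsed, bytestotal, rowsparsed)
    bytesperrow = bytesparsed / rowsparsed  # average observed row width
    bytesleft = bytestotal - bytesparsed
    return ceil(Int64, bytesleft / bytesperrow)
end

estimate_remaining_rows(500, 2000, 100)  # 300: ~5 bytes/row, 1500 bytes left
```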
src/CSV.jl
Outdated

```
intsent = uint64(INT_SENTINELS[intsentinel.x])
for i = 1:(row - 1)
    @inbounds z = tape[i]
    @inbounds tape[i] = ifelse(z === intsent, sentinelvalue(Float64), uint64(Float64(int64(z))))
```
In the past, we didn't go back and recode already-parsed Int64 values to Float64, but stored a bit to say whether a value was originally an `Int64` or not. We now don't keep the extra `poslen` vectors around after parsing (since when we finish, we know the final types for each column and can free that memory up).
Now, if we've been parsing an `Int64` column that needs to be promoted to `Float64`, we just go recode the past values, which means less work in `CSV.Column` indexing and one less thing to keep track of.
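The recoding pass can be sketched roughly like this (`uint64`/`int64` here are assumed bitcast helpers mirroring the ones in the snippet above; sentinel handling is omitted for brevity):

```julia
# Assumed reinterpretation helpers, mirroring the PR's int64/uint64:
uint64(x::Float64) = Core.bitcast(UInt64, x)
int64(x::UInt64)   = Core.bitcast(Int64, x)

# A tape holding already-parsed Int64 values as raw bits:
tape = UInt64[Core.bitcast(UInt64, Int64(7)), Core.bitcast(UInt64, Int64(-2))]

# Promote in place: reinterpret each entry as Int64, convert to Float64,
# and store the Float64 bits back into the same slot:
for i in eachindex(tape)
    tape[i] = uint64(Float64(int64(tape[i])))
end

Core.bitcast(Float64, tape[1])  # 7.0
```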
```
missingvalue(offlen) && return missing
return getvalue(Base.nonmissingtype(T), f, ind, offlen, col)
@inline function Base.getproperty(row::Row{Int}, name::Symbol)
    column = getcolumn(getfile(row), name)
```
This is where we get to remove a lot of duplicated logic from tables.jl and just index the `CSV.Column` for our `CSV.Row`.
```
return x
end

@inline Base.@propagate_inbounds function Parsers.parse(::Type{T}, r::Row2, i::Int) where {T}
```
These two functions now allow an even more efficient way to parse values when iterating `CSV.Rows`, like:

```julia
for row in CSV.Rows(file)
    int = Parsers.parse(Int64, row, :column1)
end
```

which avoids the intermediate `String` allocation incurred by `row.column1`.
```
@@ -97,15 +99,15 @@ gettypecodes(x::Dict{TypeCode, TypeCode}) = x
# bit patterns for missing value, int value, escaped string, position and len in tape parsing
const MISSING_BIT = 0x8000000000000000
missingvalue(x::UInt64) = (x & MISSING_BIT) == MISSING_BIT
sentinelvalue(::Type{Float64}) = Core.bitcast(UInt64, NaN) | MISSING_BIT
```
One wrinkle I didn't anticipate from separating our previous `tape` into `tape` + `poslen` is how to encode `missing` values. Before, we always had a `poslen` value around where we could steal a bit, but now we don't keep `poslen` around after parsing. It turns out that for all types but `Int64`, we can find an unused bit pattern to use as a sentinel for `missing` (we use an obscure `NaN` bit pattern for `Float64`, and `Core.bitcast(UInt64, typemin(Int64))` for all other types).
I wrestled with what to do for `Int64`, since every bit pattern is a valid `Int64`. I first considered `typemin(Int64)`, but worried it would turn out to be a more frequent value in the wild than a handful of random `Int64`s (I'm thinking of manually encoded sentinels where people use `typemin(Int64)` themselves to mean something special). The approach here has 4 random `Int64` values, positive and negative, and if we encounter one while parsing, we'll use the next one and recode the old sentinels. I think statistically we're in the clear here, but it is a bit unfortunate that this is so much uglier than for other types.
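To illustrate the `Float64` sentinel from the snippet above (the exact detection predicate here is an assumption; collision-freedom is a probabilistic argument, not a guarantee):

```julia
const MISSING_BIT = 0x8000000000000000

# The sentinel is an ordinary NaN's bits with the top (sign) bit forced on,
# i.e. a "negative NaN" payload that normal parsing won't produce:
sentinel = Core.bitcast(UInt64, NaN) | MISSING_BIT

# Detection is by exact bit equality with the sentinel, so ordinary values
# -- including ordinary NaNs -- are unaffected:
is_missing_f64(x::UInt64) = x === sentinel

is_missing_f64(Core.bitcast(UInt64, -1.5))  # false: sign bit alone isn't enough
is_missing_f64(Core.bitcast(UInt64, NaN))   # false: plain NaN's sign bit is clear
is_missing_f64(sentinel)                    # true
```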
You mean that an error is thrown if all 4 values are encountered in the data? I guess that's OK, but on the contrary it would be a real problem if a missing value were silently generated instead of the actual value.
Hmmm, actually we don't throw an error if we encounter all 4, but we probably should do a fatal error, or perhaps allow the user to manually pass a sentinel to use for Int.
I'd throw an error for now and see whether that works. Providing a sentinel manually is really not user-friendly.
BTW, rather than random values, we could probably find good values by hand. For example, `typemax(Int) - 1` is a really good candidate. On the contrary, anything close to 0 should be avoided.
Why do we choose 4 sentinels? My intuition is that `INT_SENTINEL` could be a `Ref`. We initialize it somehow (we can discuss how). If we hit a collision (in practice I think it will be extremely unlikely), then we just carefully generate a new value to store in this `Ref`, making sure it was never encountered yet. This way there is no limit of 4 values, and moreover the code should be simpler (we do not need to keep track of the counter).
This is what I understand (and that is why I think picking a random sentinel as you have described is OK). I just do not see a reason why you allow only up to 4 sentinels. You could just `push!` a new sentinel if needed (making sure to lock this vector in case multithreading is used).
Of course 4 should be enough in practice, but why allow the possibility of error, even a minute one, when it seems you can easily avoid it altogether?
> (making sure to lock this vector in case multithreading is used)
So..........it turns out, I somehow completely didn't consider the multithreaded case in all of this and it's all obviously wrong (the recoding, bumping sentinels, and not just for Int types, but other types are affected as well). This probably explains why we still see issues in #495 and #484. I think I have an idea of how to adjust all this for multithreaded, so I'll take a stab at it.
Thank you - please let me know when you are done so we can come back to this excellent PR.
Ok, just pushed some new commits. The changes are these:
- Instead of using a single `AtomicVector` that each thread accesses/updates, we have a per-thread types vector.
- Each thread starts "fresh" when parsing, detecting values and updating/promoting types as its chunk of the file is parsed.
- After all threads have finished, and similar to what we were already doing for pooled columns, we now "sync" the types detected by each thread and adjust if needed; e.g. if one thread parsed all Ints, it will have an Int column type, but another thread may have encountered a String value in the same column and promoted it. In our "sync" phase, we promote the types from each thread for each column, so that column's type would be promoted to String and we would recode the Int column values in the first thread.
- The recoding/promoting of the "sync" phase isn't too bad/complex, and should be fairly rare, so I'm not too worried about any kind of serious performance hit.
- For the Int64 sentinel story, we now allocate an Int64 sentinel per column per thread, and do a similar "sync" afterwards, to ensure each column is encoded with a single sentinel value. It's a pain and a lot of code for how statistically unlikely it will be to ever hit that case, but whatever, we handle it and I just need to find a way to test it. Basically the logic is: do the per-thread parsing; afterwards, set our `intsentinels` equal to the first thread's `intsentinels`, then check whether any column in any other thread ended up with a different int sentinel than the starting one (already pretty unlikely). If so, we bump that column's int sentinel to the one that thread ended up with (since we know it's valid for that thread). Then, for any column whose int sentinel isn't the starting one, we do a pass checking whether the new int sentinel equals any parsed value in any other thread. If so, we bump the int sentinel again and re-check all the threads until we find a sentinel that isn't present in the parsed values of any thread. We then do the actual recoding of each thread's sentinel values to the new, unique int sentinel for that column. Whew......
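The type-sync phase in the third bullet can be sketched as follows (toy `promote_typecode` stand-in; the real one works over the package's full typecode lattice):

```julia
# Toy promotion rule: equal types stay, any conflict promotes to String.
promote_typecode(a, b) = a === b ? a : String

# Two threads' detected types for two columns:
perthread = [[Int64, Int64],    # thread 1 saw Ints in both columns
             [Int64, String]]   # thread 2 hit a String in column 2

# Promote elementwise across threads to get the final column types:
final = reduce((a, b) -> promote_typecode.(a, b), perthread)
final  # [Int64, String] -- thread 1's column-2 values would now be recoded
```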
I think that in order to test the `Int64` sentinel story, we have to generate data that contains exactly the sentinels you are expecting to use (I have not dug into the code yet, but I assume you are generating them in a way that can be made reproducible, e.g. by setting appropriate seeds for the PRNG).
```
end
end

Base.size(a::AtomicVector) = size(a.A)
Base.IndexStyle(::Type{A}) where {A <: AtomicVector} = Base.IndexLinear()

Base.getindex(a::AtomicVector, i::Int) = a.A[i][]
Base.setindex!(a::AtomicVector, x, i::Int) = setindex!(a.A[i], x)
for typ in Base.Threads.atomictypes
```
This updates our `AtomicVector` type to be more efficient. Instead of storing a `Threads.Atomic{T}` for each element and paying extra pointer indirections for each load/store, we generate the LLVM intrinsic instructions ourselves for `getindex`/`setindex!` for each type.
Long term it'd be great to have a ThreadedDataStructures.jl package or something where this can live, but for now, this does what we want/need for multithreaded parsing, and it's pretty simple.
For reference, I basically copied the `llvmcall`s from Base and used CUDAnative.jl and tips from @vchuravy/@maleadt.
```
    """, Cvoid, Tuple{Ptr{$typ}, $typ}, pointer(x.A, i), v)
end

function unset!(A::Vector, i::Int, row, x)
```
We're being a bit tricky here, but it's in the name of memory efficiency. First we `finalize` our mmapped array, which calls `munmap` underneath, then we call `jl_arrayunset`, which deletes the `Vector{UInt64}` Julia structure. This will mostly be called when columns get promoted from some type to `String`, in which case we don't need both `tape` and `poslen`, so we get rid of `poslen` to avoid extra allocations.
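A sketch of the trick (this leans on the internal `jl_arrayunset` C entry point, which assumes the pre-1.11 array internals; illustration only, not a stable API):

```julia
using Mmap

# Two mmapped "poslen" vectors; suppose column 1's is no longer needed.
poslens = Vector{UInt64}[Mmap.mmap(Vector{UInt64}, 8) for _ = 1:2]

function unset!(A::Vector, i::Int)
    finalize(A[i])  # run the mmap finalizer now => munmap the backing memory
    ccall(:jl_arrayunset, Cvoid, (Any, UInt), A, i - 1)  # clear slot (0-based)
    return A
end

unset!(poslens, 1)
isassigned(poslens, 1)  # false -- the slot is now #undef, memory released
```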
A little more commentary on this PR for those interested: this is about my 4th iteration of internals refactoring over the past couple weeks. I tried out a bunch of different approaches to achieving goals of better memory footprint and overall performance/simplicity. Obviously, it's a big lift due to the amount of complexity we already employ to get various performance levels. Most of my initial approaches tried limited versions of the current PR, but tried to leave the existing encoding for
Also before I merge, I want to make sure to go through docs and update things with some of these optimizations documented; i.e. for extremely large files, it's best to provide types manually, or set column types to
Codecov Report

```
@@           Coverage Diff            @@
##           master     #510    +/-   ##
========================================
- Coverage   83.34%   72.34%      -11%
========================================
  Files           7        7
  Lines        1243     1179       -64
========================================
- Hits         1036      853      -183
- Misses        207      326      +119
```

Continue to review full report at Codecov.
* adding LazyArrays.jl requirements (see #512)
* Update Project.toml
Ok, update here: I pushed some more fixes that I think get this into "correct" territory. I still want to run some more performance tests on large files and add some tests for some of the new code paths, but I'd appreciate it if anyone has comments here, or more simply just tries this branch out on a few files and lets me know if they see anything (
I don't really have serious remarks to make here. Feel free to point me at particular changes if you want.
src/CSV.jl
Outdated

````
@@ -116,7 +137,7 @@ end
By supporting the Tables.jl interface, a `CSV.File` can also be a table input to any other table sink function. Like:

```julia
# materialize a csv file as a DataFrame
# materialize a csv file as a DataFrame, allowing DataFrames to take ownership of the CSV.File columns
````
"Take ownership" isn't very explicit without more explanation. Maybe just "without copying columns"?
Updated
```
# promote typecodes from each thread
for col = 1:ncols
    for i = 1:N
        @inbounds typecodes[col] = promote_typecode(typecodes[col], perthreadtypecodes[i][col])
```
Why apply `@inbounds` only to this line?
Applied `@inbounds` to the other line.
Co-Authored-By: Milan Bouchet-Valat <nalimilan@club.fr>
Co-Authored-By: Milan Bouchet-Valat <nalimilan@club.fr>
Just tested latest master, and CSV.jl is basically on par with data.table on my 16GB RAM laptop on this 5GB CSV: https://www.kaggle.com/xiaodaizj/mortgageriskfeaturetools
Also fixes #508, #507, #501, #500, #488, #484, #432, and #424.
To aid in reviewing such a large PR, I'm going to add some more detailed comments around the specific code changes, and people can chime in with thoughts/concerns where appropriate.