Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
bug in arrowtable accounting
Browse files Browse the repository at this point in the history
  • Loading branch information
tgruben committed Apr 7, 2023
1 parent 3e22826 commit fffcb0b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 7 deletions.
9 changes: 7 additions & 2 deletions apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (e *executor) IvyReduce(reduceCode string, opCode string, opt *ExecOptions)
if v == nil {
return prev
}

if accumulator == nil {
switch val := v.(type) {
case *dataframe.DataFrame:
Expand Down Expand Up @@ -228,7 +229,6 @@ func (e *executor) executeApplyShard(ctx context.Context, qcx *Qcx, index string
if err != nil {
return nil, err
}
defer table.Release()
df, err := dataframe.NewDataFrameFromTable(e.pool, table)
if err != nil {
return nil, err
Expand All @@ -250,6 +250,7 @@ func (e *executor) executeApplyShard(ctx context.Context, qcx *Qcx, index string
if err != nil {
return nil, fmt.Errorf("ivy map error: %w", err)
}

return context.Global("_"), nil
}

Expand Down Expand Up @@ -407,7 +408,11 @@ func (sf *ShardFile) Process(cs *ChangesetRequest) error {
}
fname := sf.dest + sf.executor.TableExtension()
sf.executor.arrowmu.Lock()
delete(sf.executor.arrowCache, fname)
t, ok := sf.executor.arrowCache[fname]
if ok {
t.Release()
delete(sf.executor.arrowCache, fname)
}
sf.executor.arrowmu.Unlock()
return os.Rename(rtemp+sf.executor.TableExtension(), fname)
}
Expand Down
16 changes: 12 additions & 4 deletions arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,14 +435,23 @@ func (e *executor) dataFrameExists(fname string) bool {
return true
}

func (e *executor) dumpCache(msg string) {
e.arrowmu.Lock()
defer e.arrowmu.Unlock()
vprint.VV("dump:%v", msg)
for k, v := range e.arrowCache {
table := v
fname := k
vprint.VV(" cache name:%v numcols:%v numrows:%v", fname, table.NumCols(), table.NumRows())

}
}

func (e *executor) getDataTable(ctx context.Context, fname string) (arrow.Table, error) {
e.arrowmu.Lock()
table, ok := e.arrowCache[fname]
e.arrowmu.Unlock()
if ok {
vprint.VV("returning table from cache name:%v numcols:%v numrows:%v", fname, table.NumCols(), table.NumRows())

table.Retain()
return table, nil
}
// ignoring the passed in allocatorsince where caching
Expand All @@ -458,7 +467,6 @@ func (e *executor) getDataTable(ctx context.Context, fname string) (arrow.Table,
e.arrowmu.Lock()
e.arrowCache[fname] = table
e.arrowmu.Unlock()
vprint.VV("returning new table and cacheing at cache name:%v numcols:%v numrows:%v", fname, table.NumCols(), table.NumRows())
return table, nil
}

Expand Down
1 change: 0 additions & 1 deletion executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6587,7 +6587,6 @@ func (e *executor) mapperLocal(ctx context.Context, shards []uint64, mapFn mapFu
}

ch := make(chan mapResponse, len(shards))

expected := 0
shardLoop:
for _, shard := range shards {
Expand Down

0 comments on commit fffcb0b

Please sign in to comment.