Skip to content

Commit

Permalink
GH-35988: [C#] The C data interface implementation can leak on import (
Browse files Browse the repository at this point in the history
…#35996)

### What changes are included in this PR?

To ensure proper cleanup, the importers for arrays and streams now immediately copy the contents of the C structure into the imported class and mark the source structure as released, so the imported object owns the release callback.
The exporters no longer require that the target structure appear uninitialized.

### Are these changes tested?

Existing tests pass. We do not yet seem to have a good way to test for memory leaks, so no new tests have been added.

### Are there any user-facing changes?

No.

* Closes: #35988

Authored-by: Curt Hagenlocher <curt@hagenlocher.org>
Signed-off-by: Eric Erhardt <eric.erhardt@microsoft.com>
  • Loading branch information
CurtHagenlocher authored Jun 14, 2023
1 parent 4a53764 commit 1e2dfce
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 53 deletions.
4 changes: 0 additions & 4 deletions csharp/src/Apache.Arrow/C/CArrowArrayExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ public static unsafe void ExportArray(IArrowArray array, CArrowArray* cArray)
{
throw new ArgumentNullException(nameof(cArray));
}
if (cArray->release != null)
{
throw new ArgumentException("Cannot export array to a struct that is already initialized.", nameof(cArray));
}

ExportedAllocationOwner allocationOwner = new ExportedAllocationOwner();
try
Expand Down
31 changes: 22 additions & 9 deletions csharp/src/Apache.Arrow/C/CArrowArrayImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public static class CArrowArrayImporter
/// IArrowArray importedArray = CArrowArrayImporter.ImportArray(importedPtr);
/// </code>
/// </examples>
/// <param name="ptr">The pointer to the array being imported</param>
/// <param name="type">The type of the array being imported</param>
/// <returns>The imported C# array</returns>
public static unsafe IArrowArray ImportArray(CArrowArray* ptr, IArrowType type)
{
ImportedArrowArray importedArray = null;
Expand Down Expand Up @@ -74,6 +77,9 @@ public static unsafe IArrowArray ImportArray(CArrowArray* ptr, IArrowType type)
/// RecordBatch batch = CArrowArrayImporter.ImportRecordBatch(importedPtr, schema);
/// </code>
/// </examples>
/// <param name="ptr">The pointer to the record batch being imported</param>
/// <param name="schema">The schema type of the record batch being imported</param>
/// <returns>The imported C# record batch</returns>
public static unsafe RecordBatch ImportRecordBatch(CArrowArray* ptr, Schema schema)
{
ImportedArrowArray importedArray = null;
Expand All @@ -90,42 +96,49 @@ public static unsafe RecordBatch ImportRecordBatch(CArrowArray* ptr, Schema sche

private sealed unsafe class ImportedArrowArray : ImportedAllocationOwner
{
private readonly CArrowArray* _cArray;
private readonly CArrowArray _cArray;

public ImportedArrowArray(CArrowArray* cArray)
{
if (cArray == null)
{
throw new ArgumentNullException(nameof(cArray));
}
_cArray = cArray;
if (_cArray->release == null)
if (cArray->release == null)
{
throw new ArgumentException("Tried to import an array that has already been released.", nameof(cArray));
}
_cArray = *cArray;
cArray->release = null;
}

protected override void FinalRelease()
{
if (_cArray->release != null)
if (_cArray.release != null)
{
_cArray->release(_cArray);
fixed (CArrowArray* cArray = &_cArray)
{
cArray->release(cArray);
}
}
}

public IArrowArray GetAsArray(IArrowType type)
{
return ArrowArrayFactory.BuildArray(GetAsArrayData(_cArray, type));
fixed (CArrowArray* cArray = &_cArray)
{
return ArrowArrayFactory.BuildArray(GetAsArrayData(cArray, type));
}
}

public RecordBatch GetAsRecordBatch(Schema schema)
{
IArrowArray[] arrays = new IArrowArray[schema.FieldsList.Count];
for (int i = 0; i < _cArray->n_children; i++)
for (int i = 0; i < _cArray.n_children; i++)
{
arrays[i] = ArrowArrayFactory.BuildArray(GetAsArrayData(_cArray->children[i], schema.FieldsList[i].DataType));
arrays[i] = ArrowArrayFactory.BuildArray(GetAsArrayData(_cArray.children[i], schema.FieldsList[i].DataType));
}
return new RecordBatch(schema, arrays, checked((int)_cArray->length));
return new RecordBatch(schema, arrays, checked((int)_cArray.length));
}

private ArrayData GetAsArrayData(CArrowArray* cArray, IArrowType type)
Expand Down
4 changes: 0 additions & 4 deletions csharp/src/Apache.Arrow/C/CArrowArrayStreamExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ public static unsafe void ExportArrayStream(IArrowArrayStream arrayStream, CArro
{
throw new ArgumentNullException(nameof(arrayStream));
}
if (cArrayStream->release != null)
{
throw new ArgumentException("Cannot export array to a struct that is already initialized.", nameof(cArrayStream));
}

cArrayStream->private_data = ExportedArrayStream.Export(arrayStream);
cArrayStream->get_schema = (delegate* unmanaged[Stdcall]<CArrowArrayStream*, CArrowSchema*, int>)s_getSchemaArrayStream.Pointer;
Expand Down
54 changes: 22 additions & 32 deletions csharp/src/Apache.Arrow/C/CArrowArrayStreamImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,16 @@ public static class CArrowArrayStreamImporter
/// IArrowArrayStream importedStream = CArrowArrayStreamImporter.ImportStream(importedPtr);
/// </code>
/// </examples>
/// <param name="ptr">The pointer to the stream being imported</param>
/// <returns>The imported C# array stream</returns>
public static unsafe IArrowArrayStream ImportArrayStream(CArrowArrayStream* ptr)
{
return new ImportedArrowArrayStream(ptr);
}

private sealed unsafe class ImportedArrowArrayStream : IArrowArrayStream
{
private readonly CArrowArrayStream* _cArrayStream;
private readonly CArrowArrayStream _cArrayStream;
private readonly Schema _schema;
private bool _disposed;

Expand All @@ -59,29 +61,21 @@ public ImportedArrowArrayStream(CArrowArrayStream* cArrayStream)
{
throw new ArgumentNullException(nameof(cArrayStream));
}
_cArrayStream = cArrayStream;
if (_cArrayStream->release == null)
if (cArrayStream->release == null)
{
throw new ArgumentException("Tried to import an array stream that has already been released.", nameof(cArrayStream));
}

CArrowSchema* cSchema = CArrowSchema.Create();
try
CArrowSchema cSchema = new CArrowSchema();
int errno = cArrayStream->get_schema(cArrayStream, &cSchema);
if (errno != 0)
{
int errno = _cArrayStream->get_schema(_cArrayStream, cSchema);
if (errno != 0)
{
throw new Exception($"Unexpected error recieved from external stream. Errno: {errno}");
}
_schema = CArrowSchemaImporter.ImportSchema(cSchema);
}
finally
{
if (_schema == null)
{
CArrowSchema.Free(cSchema);
}
throw new Exception($"Unexpected error recieved from external stream. Errno: {errno}");
}
_schema = CArrowSchemaImporter.ImportSchema(&cSchema);

_cArrayStream = *cArrayStream;
cArrayStream->release = null;
}

~ImportedArrowArrayStream()
Expand All @@ -99,24 +93,17 @@ public ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancell
}

RecordBatch result = null;
CArrowArray* cArray = CArrowArray.Create();
try
CArrowArray cArray = new CArrowArray();
fixed (CArrowArrayStream* cArrayStream = &_cArrayStream)
{
int errno = _cArrayStream->get_next(_cArrayStream, cArray);
int errno = cArrayStream->get_next(cArrayStream, &cArray);
if (errno != 0)
{
throw new Exception($"Unexpected error recieved from external stream. Errno: {errno}");
}
if (cArray->release != null)
if (cArray.release != null)
{
result = CArrowArrayImporter.ImportRecordBatch(cArray, _schema);
}
}
finally
{
if (result == null)
{
CArrowArray.Free(cArray);
result = CArrowArrayImporter.ImportRecordBatch(&cArray, _schema);
}
}

Expand All @@ -125,10 +112,13 @@ public ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancell

public void Dispose()
{
if (!_disposed && _cArrayStream->release != null)
if (!_disposed && _cArrayStream.release != null)
{
_disposed = true;
_cArrayStream->release(_cArrayStream);
fixed (CArrowArrayStream * cArrayStream = &_cArrayStream)
{
cArrayStream->release(cArrayStream);
}
}
GC.SuppressFinalize(this);
}
Expand Down
4 changes: 0 additions & 4 deletions csharp/src/Apache.Arrow/C/CArrowSchemaExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ public static unsafe void ExportType(IArrowType datatype, CArrowSchema* schema)
{
throw new ArgumentNullException(nameof(schema));
}
if (schema->release != null)
{
throw new ArgumentException("Cannot export schema to a struct that is already initialized.");
}

schema->format = StringUtil.ToCStringUtf8(GetFormat(datatype));
schema->name = null;
Expand Down
1 change: 1 addition & 0 deletions csharp/test/Apache.Arrow.Tests/CDataInterfaceDataTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public unsafe void CallsReleaseForInvalid()
CArrowArrayImporter.ImportArray(cArray, GetTestArray().Data.DataType);
});
Assert.True(wasCalled);

CArrowArray.Free(cArray);

GC.KeepAlive(releaseCallback);
Expand Down
5 changes: 5 additions & 0 deletions csharp/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ public unsafe void ImportArray()
Assert.Null(importedArray.GetString(2));
Assert.Equal("foo", importedArray.GetString(3));
Assert.Equal("bar", importedArray.GetString(4));

CArrowArray.Free(cArray);
}

[SkippableFact]
Expand Down Expand Up @@ -488,6 +490,7 @@ public unsafe void ImportRecordBatch()

Schema schema = CArrowSchemaImporter.ImportSchema(cSchema);
RecordBatch recordBatch = CArrowArrayImporter.ImportRecordBatch(cArray, schema);
CArrowArray.Free(cArray);

Assert.Equal(5, recordBatch.Length);

Expand Down Expand Up @@ -563,6 +566,8 @@ public unsafe void ImportArrayStream()
}

IArrowArrayStream stream = CArrowArrayStreamImporter.ImportArrayStream(cArrayStream);
CArrowArrayStream.Free(cArrayStream);

var batch1 = stream.ReadNextRecordBatchAsync().Result;
Assert.Equal(5, batch1.Length);

Expand Down

0 comments on commit 1e2dfce

Please sign in to comment.