Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Sep 27, 2023
1 parent e038498 commit f0b7413
Show file tree
Hide file tree
Showing 16 changed files with 951 additions and 649 deletions.
9 changes: 7 additions & 2 deletions ci/scripts/integration_arrow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ arrow_dir=${1}
gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration

pip install -e $arrow_dir/dev/archery[integration]
# For C# C Data Interface testing
pip install pythonnet

# Get more detailed context on crashes
export PYTHONFAULTHANDLER=1

# Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1
time archery integration \
Expand All @@ -31,8 +36,8 @@ time archery integration \
--run-flight \
--with-cpp=1 \
--with-csharp=1 \
--with-java=1 \
--with-js=1 \
--with-java=0 \
--with-js=0 \
--with-go=1 \
--gold-dirs=$gold_dir/0.14.1 \
--gold-dirs=$gold_dir/0.17.1 \
Expand Down
3 changes: 2 additions & 1 deletion csharp/src/Apache.Arrow/ArrowBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ public void Dispose()

internal bool TryExport(ExportedAllocationOwner newOwner, out IntPtr ptr)
{
if (_memoryOwner == null && IsEmpty)
if (IsEmpty)
{
// _memoryOwner could be anything (for example null or a NullMemoryOwner), but it doesn't matter here
ptr = IntPtr.Zero;
return true;
}
Expand Down
15 changes: 12 additions & 3 deletions csharp/src/Apache.Arrow/C/CArrowArray.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

using System;
using System.Diagnostics;
using System.Runtime.InteropServices;

namespace Apache.Arrow.C
Expand Down Expand Up @@ -68,16 +69,24 @@ public unsafe struct CArrowArray
/// </remarks>
public static void Free(CArrowArray* array)
{
if (array->release != default)
{
CallReleaseFunc(array);
Marshal.FreeHGlobal((IntPtr)array);
}

/// <summary>
/// Call the array's release func, if set.
/// </summary>
public static void CallReleaseFunc(CArrowArray* array) {
if (array->release != default) {
// Call release if not already called.
#if NET5_0_OR_GREATER
array->release(array);
#else
Marshal.GetDelegateForFunctionPointer<CArrowArrayExporter.ReleaseArrowArray>(array->release)(array);
#endif
Debug.Assert(array->release == default,
"Calling the CArrowArray release func should have set it to NULL");
}
Marshal.FreeHGlobal((IntPtr)array);
}
}
}
52 changes: 35 additions & 17 deletions csharp/src/Apache.Arrow/C/CArrowArrayExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@


using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Apache.Arrow.Memory;
Expand Down Expand Up @@ -59,8 +60,6 @@ public static unsafe void ExportArray(IArrowArray array, CArrowArray* cArray)
try
{
ConvertArray(allocationOwner, array.Data, cArray);
cArray->release = ReleaseArrayPtr;
cArray->private_data = FromDisposable(allocationOwner);
allocationOwner = null;
}
finally
Expand Down Expand Up @@ -102,8 +101,6 @@ public static unsafe void ExportRecordBatch(RecordBatch batch, CArrowArray* cArr
try
{
ConvertRecordBatch(allocationOwner, batch, cArray);
cArray->release = ReleaseArrayPtr;
cArray->private_data = FromDisposable(allocationOwner);
allocationOwner = null;
}
finally
Expand All @@ -118,7 +115,7 @@ private unsafe static void ConvertArray(ExportedAllocationOwner sharedOwner, Arr
cArray->offset = array.Offset;
cArray->null_count = array.NullCount;
cArray->release = ReleaseArrayPtr;
cArray->private_data = null;
cArray->private_data = MakePrivateData(sharedOwner);

cArray->n_buffers = array.Buffers?.Length ?? 0;
cArray->buffers = null;
Expand All @@ -131,7 +128,7 @@ private unsafe static void ConvertArray(ExportedAllocationOwner sharedOwner, Arr
IntPtr ptr;
if (!buffer.TryExport(sharedOwner, out ptr))
{
throw new NotSupportedException($"An ArrowArray of type {array.DataType.TypeId} could not be exported");
throw new NotSupportedException($"An ArrowArray of type {array.DataType.TypeId} could not be exported: failed on buffer #{i}");
}
cArray->buffers[i] = (byte*)ptr;
}
Expand All @@ -144,15 +141,15 @@ private unsafe static void ConvertArray(ExportedAllocationOwner sharedOwner, Arr
cArray->children = (CArrowArray**)sharedOwner.Allocate(IntPtr.Size * array.Children.Length);
for (int i = 0; i < array.Children.Length; i++)
{
cArray->children[i] = CArrowArray.Create();
cArray->children[i] = MakeArray(sharedOwner);
ConvertArray(sharedOwner, array.Children[i], cArray->children[i]);
}
}

cArray->dictionary = null;
if (array.Dictionary != null)
{
cArray->dictionary = CArrowArray.Create();
cArray->dictionary = MakeArray(sharedOwner);
ConvertArray(sharedOwner, array.Dictionary, cArray->dictionary);
}
}
Expand All @@ -163,20 +160,24 @@ private unsafe static void ConvertRecordBatch(ExportedAllocationOwner sharedOwne
cArray->offset = 0;
cArray->null_count = 0;
cArray->release = ReleaseArrayPtr;
cArray->private_data = null;
cArray->private_data = MakePrivateData(sharedOwner);

cArray->n_buffers = 1;
cArray->buffers = (byte**)sharedOwner.Allocate(IntPtr.Size);

cArray->n_children = batch.ColumnCount;
cArray->children = null;
// XXX sharing the same ExportedAllocationOwner for all columns
// and child arrays makes memory tracking inflexible.
// If the consumer keeps only a single record batch column,
// the entire record batch memory is nevertheless kept alive.
if (cArray->n_children > 0)
{
cArray->children = (CArrowArray**)sharedOwner.Allocate(IntPtr.Size * batch.ColumnCount);
int i = 0;
foreach (IArrowArray child in batch.Arrays)
{
cArray->children[i] = CArrowArray.Create();
cArray->children[i] = MakeArray(sharedOwner);
ConvertArray(sharedOwner, child.Data, cArray->children[i]);
i++;
}
Expand All @@ -190,26 +191,43 @@ private unsafe static void ConvertRecordBatch(ExportedAllocationOwner sharedOwne
#endif
private unsafe static void ReleaseArray(CArrowArray* cArray)
{
Dispose(&cArray->private_data);
for (long i = 0; i < cArray->n_children; i++)
{
CArrowArray.CallReleaseFunc(cArray->children[i]);
}
if (cArray->dictionary != null) {
CArrowArray.CallReleaseFunc(cArray->dictionary);
}
DisposePrivateData(&cArray->private_data);
cArray->release = default;
}

private unsafe static void* FromDisposable(IDisposable disposable)
private unsafe static CArrowArray* MakeArray(ExportedAllocationOwner sharedOwner)
{
var array = (CArrowArray*)sharedOwner.Allocate(sizeof(CArrowArray));
*array = default;
return array;
}

private unsafe static void* MakePrivateData(ExportedAllocationOwner sharedOwner)
{
GCHandle gch = GCHandle.Alloc(disposable);
GCHandle gch = GCHandle.Alloc(sharedOwner);
sharedOwner.IncRef();
return (void*)GCHandle.ToIntPtr(gch);
}

private unsafe static void Dispose(void** ptr)
private unsafe static void DisposePrivateData(void** ptr)
{
GCHandle gch = GCHandle.FromIntPtr((IntPtr)(*ptr));
GCHandle gch = GCHandle.FromIntPtr((IntPtr) (*ptr));
if (!gch.IsAllocated)
{
return;
}
((IDisposable)gch.Target).Dispose();
// We can't call IDisposable.Dispose() here as we create multiple
// GCHandles to the same object. Instead, refcounting ensures
// timely memory deallocation when all GCHandles are freed.
((ExportedAllocationOwner) gch.Target).DecRef();
gch.Free();
*ptr = null;
}
}
}
9 changes: 6 additions & 3 deletions csharp/src/Apache.Arrow/C/CArrowSchemaImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static class CArrowSchemaImporter
/// Typically, you will allocate an uninitialized CArrowSchema pointer,
/// pass that to external function, and then use this method to import
/// the result.
///
///
/// <code>
/// CArrowSchema* importedPtr = CArrowSchema.Create();
/// foreign_export_function(importedPtr);
Expand All @@ -62,7 +62,7 @@ public static unsafe ArrowType ImportType(CArrowSchema* ptr)
/// Typically, you will allocate an uninitialized CArrowSchema pointer,
/// pass that to external function, and then use this method to import
/// the result.
///
///
/// <code>
/// CArrowSchema* importedPtr = CArrowSchema.Create();
/// foreign_export_function(importedPtr);
Expand All @@ -87,7 +87,7 @@ public static unsafe Field ImportField(CArrowSchema* ptr)
/// Typically, you will allocate an uninitialized CArrowSchema pointer,
/// pass that to external function, and then use this method to import
/// the result.
///
///
/// <code>
/// CArrowSchema* importedPtr = CArrowSchema.Create();
/// foreign_export_function(importedPtr);
Expand Down Expand Up @@ -241,6 +241,9 @@ public ArrowType GetAsType()
};

string timezone = format.Substring(format.IndexOf(':') + 1);
if (timezone.Length == 0) {
timezone = null;
}
return new TimestampType(timeUnit, timezone);
}

Expand Down
20 changes: 20 additions & 0 deletions csharp/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
// limitations under the License.

using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading;

namespace Apache.Arrow.Memory
{
internal sealed class ExportedAllocationOwner : INativeAllocationOwner, IDisposable
{
private readonly List<IntPtr> _pointers = new List<IntPtr>();
private int _allocationSize;
private long _referenceCount;
private bool _disposed;

~ExportedAllocationOwner()
{
Expand All @@ -47,8 +51,23 @@ public void Release(IntPtr ptr, int offset, int length)
throw new InvalidOperationException();
}

public void IncRef()
{
Interlocked.Increment(ref _referenceCount);
}

public void DecRef()
{
if (Interlocked.Decrement(ref _referenceCount) == 0) {
Dispose();
}
}

public void Dispose()
{
if (_disposed) {
return;
}
for (int i = 0; i < _pointers.Count; i++)
{
if (_pointers[i] != IntPtr.Zero)
Expand All @@ -59,6 +78,7 @@ public void Dispose()
}
GC.RemoveMemoryPressure(_allocationSize);
GC.SuppressFinalize(this);
_disposed = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TargetFrameworks>net7.0</TargetFrameworks>
</PropertyGroup>

Expand All @@ -13,4 +14,4 @@
<ProjectReference Include="..\Apache.Arrow.Tests\Apache.Arrow.Tests.csproj" />
</ItemGroup>

</Project>
</Project>
79 changes: 79 additions & 0 deletions csharp/test/Apache.Arrow.IntegrationTest/CDataInterface.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.IO;
using Apache.Arrow.C;
using Apache.Arrow.Arrays;
using Apache.Arrow.Types;

namespace Apache.Arrow.IntegrationTest
{
/// <summary>
/// Bridge for C Data Interface integration testing.
/// These methods are called from the Python integration testing
/// harness provided by Archery.
/// </summary>
public static class CDataInterface
{
// Archery uses the `pythonnet` library (*) to invoke .Net DLLs.
// `pythonnet` is only able to marshal simple types such as int and
// str, which is why we provide trivial wrappers around other APIs.
//
// (*) https://pythonnet.github.io/

public static void Initialize()
{
// Allow debugging using Debug.WriteLine()
Trace.Listeners.Add(new ConsoleTraceListener());
}

public static unsafe Schema ImportSchema(long ptr)
{
return CArrowSchemaImporter.ImportSchema((CArrowSchema*) ptr);
}

public static unsafe void ExportSchema(Schema schema, long ptr)
{
CArrowSchemaExporter.ExportSchema(schema, (CArrowSchema*) ptr);
}

public static unsafe RecordBatch ImportRecordBatch(long ptr, Schema schema)
{
return CArrowArrayImporter.ImportRecordBatch((CArrowArray*) ptr, schema);
}

public static unsafe void ExportRecordBatch(RecordBatch batch, long ptr)
{
CArrowArrayExporter.ExportRecordBatch(batch, (CArrowArray*) ptr);
}

public static JsonFile ParseJsonFile(String jsonPath)
{
return JsonFile.Parse(new FileInfo(jsonPath));
}

public static long GetAllocatedBytes()
{
GC.Collect();
// XXX this doesn't seem to give stable and reliable measurements
var gcInfo = GC.GetGCMemoryInfo();
return gcInfo.PromotedBytes;
}
}
}
Loading

0 comments on commit f0b7413

Please sign in to comment.