GH-34858: [Swift] Initial reader impl (apache#34842)
- Initial check-in of the Swift Arrow reader implementation (a usage sketch follows the changed-files summary below)
- Bug fixes found during reader testing
- Class/method access modifier changes (mostly from internal to public)

* Closes: apache#34858

Authored-by: Alva Bandy <abandy@live.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
abandy authored and ArgusLi committed May 15, 2023
1 parent 60349b1 commit a324fdf
Showing 26 changed files with 3,555 additions and 60 deletions.
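
Before the per-file diffs, here is a minimal usage sketch of the reader this commit introduces, based on the `ArrowReader.fromFile(_:)` API and `ArrowError` type shown in `ArrowReader.swift` below; the file name is hypothetical and is not part of this change.

```swift
import Foundation
import Arrow

// Hypothetical input file; any Arrow IPC file produced by the data generator would do.
let fileURL = URL(fileURLWithPath: "testdata_double.arrow")

do {
    // fromFile(_:) validates the ARROW1 marker, then decodes the Flatbuffers
    // footer and every record batch it references.
    let recordBatches = try ArrowReader().fromFile(fileURL)
    print("read \(recordBatches.count) record batch(es)")
} catch {
    print("failed to read arrow file: \(error)")
}
```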
24 changes: 24 additions & 0 deletions ci/docker/ubuntu-swift.dockerfile
@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

FROM swift:5.7.3

# Install golang
RUN apt-get update -y -q && \
    apt-get install -y -q --no-install-recommends \
        golang-go && \
    apt-get clean
8 changes: 7 additions & 1 deletion ci/scripts/swift_test.sh
@@ -19,8 +19,14 @@

set -ex

source_dir=${1}/swift/Arrow
data_gen_dir=${1}/swift/data-generator/swift-datagen
pushd ${data_gen_dir}
go get -d ./...
go run main.go
cp *.arrow ../../Arrow
popd

source_dir=${1}/swift/Arrow
pushd ${source_dir}
swift test
popd
1 change: 1 addition & 0 deletions dev/release/rat_exclude_files.txt
@@ -149,3 +149,4 @@ r/tools/nixlibs-allowlist.txt
.gitattributes
ruby/red-arrow/.yardopts
.github/pull_request_template.md
swift/data-generator/swift-datagen/go.sum
5 changes: 4 additions & 1 deletion docker-compose.yml
@@ -921,7 +921,10 @@ services:
    # docker-compose build ubuntu-swift
    # docker-compose run --rm ubuntu-swift
    # Parameters:
    image: swift:5.7.3
    image: ubuntu-swift
    build:
      context: .
      dockerfile: ci/docker/ubuntu-swift.dockerfile
    shm_size: *shm-size
    volumes: *ubuntu-volumes
    command: >
11 changes: 10 additions & 1 deletion swift/Arrow/Package.swift
@@ -22,17 +22,26 @@ import PackageDescription

let package = Package(
    name: "Arrow",
    platforms: [
        .macOS(.v10_14)
    ],
    products: [
        // Products define the executables and libraries a package produces, and make them visible to other packages.
        .library(
            name: "Arrow",
            targets: ["Arrow"]),
    ],
    dependencies: [
        .package(url: "https://github.com/google/flatbuffers.git", from: "23.3.3")
    ],
    targets: [
        // Targets are the basic building blocks of a package. A target can define a module or a test suite.
        // Targets can depend on other targets in this package, and on products in packages this package depends on.
        .target(
            name: "Arrow"),
            name: "Arrow",
            dependencies: [
                .product(name: "FlatBuffers", package: "flatbuffers")
            ]),
        .testTarget(
            name: "ArrowTests",
            dependencies: ["Arrow"]),
7 changes: 7 additions & 0 deletions swift/Arrow/README.md
@@ -47,3 +47,10 @@ An implementation of Arrow targeting Swift.
- Data Types
- Fields
- Schema

## Test data generation

Test data files for the reader tests are generated by an executable written in Go; its source is included in the data-generator directory.
```sh
$ go build -o swift-datagen
```
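
The generated `.arrow` files are copied next to the Swift package by `ci/scripts/swift_test.sh` (see above) before `swift test` runs, and the reader tests read them back. Below is a minimal sketch of such a test; the file name and test class are hypothetical, and the actual tests added in this commit may differ.

```swift
import Foundation
import XCTest
@testable import Arrow

final class GeneratedDataReaderTests: XCTestCase {
    func testReadGeneratedFile() throws {
        // swift_test.sh copies the generator's *.arrow output into swift/Arrow,
        // so the file is resolved relative to the package directory.
        let fileURL = URL(fileURLWithPath: "testdata_double.arrow") // hypothetical name
        let recordBatches = try ArrowReader().fromFile(fileURL)
        XCTAssertFalse(recordBatches.isEmpty)
    }
}
```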
9 changes: 9 additions & 0 deletions swift/Arrow/Sources/Arrow/ArrowBuffer.swift
@@ -34,6 +34,15 @@ public class ArrowBuffer {
        self.rawPointer.deallocate()
    }

    static func createBuffer(_ data: [UInt8], length: UInt) -> ArrowBuffer {
        let byteCount = UInt(data.count)
        let capacity = alignTo64(byteCount)
        let memory = MemoryAllocator(64)
        let rawPointer = memory.allocateArray(Int(capacity))
        rawPointer.copyMemory(from: data, byteCount: data.count)
        return ArrowBuffer(length: length, capacity: capacity, rawPointer: rawPointer)
    }

    static func createBuffer(_ length: UInt, size: UInt, doAlign: Bool = true) -> ArrowBuffer {
        let actualLen = max(length, ArrowBuffer.min_length)
        let byteCount = size * actualLen
8 changes: 6 additions & 2 deletions swift/Arrow/Sources/Arrow/ArrowBufferBuilder.swift
@@ -77,10 +77,11 @@ public class FixedBufferBuilder<T>: BaseBufferBuilder<T>, ArrowBufferBuilder {
        }

        if let val = newValue {
            BitUtility.setBit(index + self.offset, buffer: self.nulls)
            self.values.rawPointer.advanced(by: byteIndex).storeBytes(of: val, as: T.self)
        } else {
            self.nullCount += 1
            BitUtility.setBit(index + self.offset, buffer: self.nulls)
            BitUtility.clearBit(index + self.offset, buffer: self.nulls)
            self.values.rawPointer.advanced(by: byteIndex).storeBytes(of: defaultVal, as: T.self)
        }
    }
@@ -150,6 +151,7 @@ public class BoolBufferBuilder: BaseBufferBuilder<Bool>, ArrowBufferBuilder {
        }

        if newValue != nil {
            BitUtility.setBit(index + self.offset, buffer: self.nulls)
            if newValue == true {
                BitUtility.setBit(index + self.offset, buffer: self.values)
            } else {
@@ -158,7 +160,7 @@

        } else {
            self.nullCount += 1
            BitUtility.setBit(index + self.offset, buffer: self.nulls)
            BitUtility.clearBit(index + self.offset, buffer: self.nulls)
            BitUtility.clearBit(index + self.offset, buffer: self.values)
        }
    }
@@ -225,6 +227,8 @@ public class VariableBufferBuilder<T>: BaseBufferBuilder<T>, ArrowBufferBuilder

        if isNull {
            self.nullCount += 1
            BitUtility.clearBit(index + self.offset, buffer: self.nulls)
        } else {
            BitUtility.setBit(index + self.offset, buffer: self.nulls)
        }

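The builder fixes above and the `ArrowData.isNull` change below both follow the Arrow validity-bitmap convention: a set bit marks a present value, a cleared bit marks a null. The snippet below is a small illustrative sketch of that convention using the `ArrowBuffer.createBuffer` and `BitUtility` calls visible in this diff (access levels permitting); it is not part of the commit.

```swift
// Validity-bitmap convention relied on after this change:
//   bit set   -> the slot holds a value
//   bit clear -> the slot is null
let validity = ArrowBuffer.createBuffer(8, size: UInt(MemoryLayout<UInt8>.size))
BitUtility.setBit(0, buffer: validity)    // slot 0 is valid
BitUtility.clearBit(1, buffer: validity)  // slot 1 is null
assert(BitUtility.isSet(0, buffer: validity))
assert(!BitUtility.isSet(1, buffer: validity))
```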
14 changes: 7 additions & 7 deletions swift/Arrow/Sources/Arrow/ArrowData.swift
@@ -18,11 +18,11 @@
import Foundation

public class ArrowData {
    let type: ArrowType.Info
    let buffers: [ArrowBuffer]
    let nullCount: UInt
    let length: UInt
    let stride: Int
    public let type: ArrowType.Info
    public let buffers: [ArrowBuffer]
    public let nullCount: UInt
    public let length: UInt
    public let stride: Int

    init(_ type: ArrowType.Info, buffers: [ArrowBuffer], nullCount: UInt, stride: Int) throws {
        switch(type) {
@@ -43,8 +43,8 @@ public class ArrowData {
        self.stride = stride
    }

    func isNull(_ at: UInt) -> Bool {
    public func isNull(_ at: UInt) -> Bool {
        let nullBuffer = buffers[0];
        return nullBuffer.length == 0 || BitUtility.isSet(at, buffer: nullBuffer)
        return nullBuffer.length > 0 && !BitUtility.isSet(at, buffer: nullBuffer)
    }
}
163 changes: 163 additions & 0 deletions swift/Arrow/Sources/Arrow/ArrowReader.swift
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import FlatBuffers
import Foundation

public enum ArrowError: Error {
    case runtimeError(String)
}

let FILEMARKER = "ARROW1"
let CONTINUATIONMARKER = -1

public class ArrowReader {
    private struct DataLoadInfo {
        let recordBatch: org_apache_arrow_flatbuf_RecordBatch
        let field: org_apache_arrow_flatbuf_Field
        let nodeIndex: Int32
        let bufferIndex: Int32
        let fileData: Data
        let messageOffset: Int64
    }

    private func loadSchema(_ schema: org_apache_arrow_flatbuf_Schema) throws -> ArrowSchema {
        let builder = ArrowSchema.Builder()
        for index in 0 ..< schema.fieldsCount {
            let field = schema.fields(at: index)!
            let arrowField = ArrowField(field.name!, type: findArrowType(field), isNullable: field.nullable)
            builder.addField(arrowField)
            if field.typeType == .struct_ {
                throw ValidationError.unknownType
            }
        }

        return builder.finish()
    }

    private func loadPrimitiveData(_ loadInfo: DataLoadInfo) throws -> ChunkedArrayHolder {
        let node = loadInfo.recordBatch.nodes(at: loadInfo.nodeIndex)!
        try validateBufferIndex(loadInfo.recordBatch, index: loadInfo.bufferIndex)
        let nullBuffer = loadInfo.recordBatch.buffers(at: loadInfo.bufferIndex)!
        let arrowNullBuffer = makeBuffer(nullBuffer, fileData: loadInfo.fileData,
                                         length: UInt(node.nullCount), messageOffset: loadInfo.messageOffset)
        try validateBufferIndex(loadInfo.recordBatch, index: loadInfo.bufferIndex + 1)
        let valueBuffer = loadInfo.recordBatch.buffers(at: loadInfo.bufferIndex + 1)!
        let arrowValueBuffer = makeBuffer(valueBuffer, fileData: loadInfo.fileData,
                                          length: UInt(node.length), messageOffset: loadInfo.messageOffset)
        return try makeArrayHolder(loadInfo.field, buffers: [arrowNullBuffer, arrowValueBuffer])
    }

    private func loadVariableData(_ loadInfo: DataLoadInfo) throws -> ChunkedArrayHolder {
        let node = loadInfo.recordBatch.nodes(at: loadInfo.nodeIndex)!
        try validateBufferIndex(loadInfo.recordBatch, index: loadInfo.bufferIndex)
        let nullBuffer = loadInfo.recordBatch.buffers(at: loadInfo.bufferIndex)!
        let arrowNullBuffer = makeBuffer(nullBuffer, fileData: loadInfo.fileData,
                                         length: UInt(node.nullCount), messageOffset: loadInfo.messageOffset)
        try validateBufferIndex(loadInfo.recordBatch, index: loadInfo.bufferIndex + 1)
        let offsetBuffer = loadInfo.recordBatch.buffers(at: loadInfo.bufferIndex + 1)!
        let arrowOffsetBuffer = makeBuffer(offsetBuffer, fileData: loadInfo.fileData,
                                           length: UInt(node.length), messageOffset: loadInfo.messageOffset)
        try validateBufferIndex(loadInfo.recordBatch, index: loadInfo.bufferIndex + 2)
        let valueBuffer = loadInfo.recordBatch.buffers(at: loadInfo.bufferIndex + 2)!
        let arrowValueBuffer = makeBuffer(valueBuffer, fileData: loadInfo.fileData,
                                          length: UInt(node.length), messageOffset: loadInfo.messageOffset)
        return try makeArrayHolder(loadInfo.field, buffers: [arrowNullBuffer, arrowOffsetBuffer, arrowValueBuffer])
    }

    private func loadRecordBatch(_ message: org_apache_arrow_flatbuf_Message, schema: org_apache_arrow_flatbuf_Schema,
                                 data: Data, messageEndOffset: Int64) throws -> RecordBatch {
        let recordBatch = message.header(type: org_apache_arrow_flatbuf_RecordBatch.self)
        let nodesCount = recordBatch?.nodesCount ?? 0
        var bufferIndex: Int32 = 0
        var columns: [ChunkedArrayHolder] = []
        let arrowSchema = try loadSchema(schema)
        for nodeIndex in 0 ..< nodesCount {
            let field = schema.fields(at: nodeIndex)!
            let loadInfo = DataLoadInfo(recordBatch: recordBatch!, field: field,
                                        nodeIndex: nodeIndex, bufferIndex: bufferIndex,
                                        fileData: data, messageOffset: messageEndOffset)
            if isFixedPrimitive(field.typeType) {
                let holder = try loadPrimitiveData(loadInfo)
                columns.append(holder)
                bufferIndex += 2
            } else {
                let holder = try loadVariableData(loadInfo)
                columns.append(holder)
                bufferIndex += 3
            }
        }

        return RecordBatch(arrowSchema, columns: columns)
    }

    public func fromStream(_ fileData: Data) throws -> [RecordBatch] {
        let footerLength = fileData.withUnsafeBytes { rawBuffer in
            rawBuffer.loadUnaligned(fromByteOffset: fileData.count - 4, as: Int32.self)
        }

        var recordBatchs: [RecordBatch] = []
        let footerStartOffset = fileData.count - Int(footerLength + 4)
        let footerData = fileData[footerStartOffset...]
        let footerBuffer = ByteBuffer(data: footerData)
        let footer = org_apache_arrow_flatbuf_Footer.getRootAsFooter(bb: footerBuffer)
        for index in 0 ..< footer.recordBatchesCount {
            let recordBatch = footer.recordBatches(at: index)!
            var messageLength = fileData.withUnsafeBytes { rawBuffer in
                rawBuffer.loadUnaligned(fromByteOffset: Int(recordBatch.offset), as: Int32.self)
            }

            var messageOffset: Int64 = 1
            if messageLength == CONTINUATIONMARKER {
                messageOffset += 1
                messageLength = fileData.withUnsafeBytes { rawBuffer in
                    rawBuffer.loadUnaligned(
                        fromByteOffset: Int(recordBatch.offset + Int64(MemoryLayout<Int32>.size)),
                        as: Int32.self)
                }
            }

            let messageStartOffset = recordBatch.offset + (Int64(MemoryLayout<Int32>.size) * messageOffset)
            let messageEndOffset = messageStartOffset + Int64(messageLength)
            let recordBatchData = fileData[messageStartOffset ... messageEndOffset]
            let mbb = ByteBuffer(data: recordBatchData)
            let message = org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: mbb)
            switch message.headerType {
            case .recordbatch:
                let recordBatch = try loadRecordBatch(message, schema: footer.schema!,
                                                      data: fileData, messageEndOffset: messageEndOffset)
                recordBatchs.append(recordBatch)
            default:
                print("Unhandled header type: \(message.headerType)")
            }
        }

        return recordBatchs
    }

    public func fromFile(_ fileURL: URL) throws -> [RecordBatch] {
        let fileData = try Data(contentsOf: fileURL)
        if !validateFileData(fileData) {
            throw ArrowError.runtimeError("Not a valid arrow file.")
        }

        let markerLength = FILEMARKER.utf8.count
        let footerLengthEnd = Int(fileData.count - markerLength)
        let data = fileData[..<(footerLengthEnd)]
        return try fromStream(data)
    }
}
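
For reference, `fromFile`/`fromStream` above rely on the Arrow IPC file layout: the file begins and ends with the `ARROW1` magic, and the four bytes just before the trailing magic hold the footer length, with the Flatbuffers footer immediately before that. Below is a standalone sketch of the trailer parsing, independent of the `validateFileData` helper referenced above (whose implementation is not part of this diff); like the reader, it assumes a little-endian host.

```swift
import Foundation

// Arrow IPC file trailer: ... | footer bytes | Int32 footer length | "ARROW1"
func footerLength(of fileData: Data) -> Int32? {
    let magic = Data("ARROW1".utf8)
    guard fileData.count > magic.count + 4,
          fileData.suffix(magic.count) == magic else {
        return nil // not an Arrow IPC file
    }
    let lengthOffset = fileData.count - magic.count - 4
    return fileData.withUnsafeBytes { rawBuffer in
        rawBuffer.loadUnaligned(fromByteOffset: lengthOffset, as: Int32.self)
    }
}
```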
