Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Sources/SparkConnect/Catalog.swift
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public actor Catalog: Sendable {
catalog.tableExists = tableExists
return catalog
})
return "true" == (try await df.collect().first!.get(0) as! String)
return try await df.collect()[0].getAsBool(0)
}

/// Check if the table or view with the specified name exists. This can either be a temporary
Expand All @@ -270,7 +270,7 @@ public actor Catalog: Sendable {
catalog.tableExists = tableExists
return catalog
})
return "true" == (try await df.collect().first!.get(0) as! String)
return try await df.collect()[0].getAsBool(0)
}

/// Check if the function with the specified name exists. This can either be a temporary function
Expand All @@ -287,7 +287,7 @@ public actor Catalog: Sendable {
catalog.functionExists = functionExists
return catalog
})
return "true" == (try await df.collect().first!.get(0) as! String)
return try await df.collect()[0].getAsBool(0)
}

/// Check if the function with the specified name exists in the specified database under the Hive
Expand All @@ -305,7 +305,7 @@ public actor Catalog: Sendable {
catalog.functionExists = functionExists
return catalog
})
return "true" == (try await df.collect().first!.get(0) as! String)
return try await df.collect()[0].getAsBool(0)
}

/// Caches the specified table in-memory.
Expand Down Expand Up @@ -338,7 +338,7 @@ public actor Catalog: Sendable {
catalog.isCached = isCached
return catalog
})
return "true" == (try await df.collect().first!.get(0) as! String)
return try await df.collect()[0].getAsBool(0)
}

/// Invalidates and refreshes all the cached data and metadata of the given table.
Expand Down Expand Up @@ -407,7 +407,7 @@ public actor Catalog: Sendable {
catalog.dropTempView = dropTempView
return catalog
})
return "true" == (try await df.collect().first!.get(0) as! String)
return try await df.collect().first!.getAsBool(0)
}

/// Drops the global temporary view with the given view name in the catalog. If the view has been
Expand All @@ -423,6 +423,6 @@ public actor Catalog: Sendable {
catalog.dropGlobalTempView = dropGlobalTempView
return catalog
})
return "true" == (try await df.collect().first!.get(0) as! String)
return try await df.collect()[0].getAsBool(0)
}
}
30 changes: 25 additions & 5 deletions Sources/SparkConnect/DataFrame.swift
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,34 @@ public actor DataFrame: Sendable {
for i in 0..<batch.length {
var values: [Sendable?] = []
for column in batch.columns {
let str = column.array as! AsString
if column.data.isNull(i) {
values.append(nil)
} else if column.data.type.info == ArrowType.ArrowBinary {
let binary = str.asString(i).utf8.map { String(format: "%02x", $0) }.joined(separator: " ")
values.append("[\(binary)]")
} else {
values.append(str.asString(i))
let array = column.array
switch column.data.type.info {
case .primitiveInfo(.boolean):
values.append(array.asAny(i) as? Bool)
case .primitiveInfo(.int8):
values.append(array.asAny(i) as? Int8)
case .primitiveInfo(.int16):
values.append(array.asAny(i) as? Int16)
case .primitiveInfo(.int32):
values.append(array.asAny(i) as? Int32)
case .primitiveInfo(.int64):
values.append(array.asAny(i) as! Int64)
case .primitiveInfo(.float):
values.append(array.asAny(i) as? Float)
case .primitiveInfo(.double):
values.append(array.asAny(i) as? Double)
case .primitiveInfo(.date32):
values.append(array.asAny(i) as! Date)
case ArrowType.ArrowBinary:
values.append((array as! AsString).asString(i).utf8)
case .complexInfo(.strct):
values.append((array as! AsString).asString(i))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't support nested types for now, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, all complex types are still under development.

default:
values.append(array.asAny(i) as? String)
}
}
}
result.append(Row(valueArray: values))
Expand Down
16 changes: 6 additions & 10 deletions Sources/SparkConnect/Row.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public struct Row: Sendable, Equatable {
return values[i]
}

public func getAsBool(_ i: Int) throws -> Bool {
return try get(i) as! Bool
}

public static func == (lhs: Row, rhs: Row) -> Bool {
if lhs.values.count != rhs.values.count {
return false
Expand All @@ -59,16 +63,8 @@ public struct Row: Sendable, Equatable {
return true
} else if let a = x as? Bool, let b = y as? Bool {
return a == b
} else if let a = x as? Int, let b = y as? Int {
return a == b
} else if let a = x as? Int8, let b = y as? Int8 {
return a == b
} else if let a = x as? Int16, let b = y as? Int16 {
return a == b
} else if let a = x as? Int32, let b = y as? Int32 {
return a == b
} else if let a = x as? Int64, let b = y as? Int64 {
return a == b
} else if let a = x as? any FixedWidthInteger, let b = y as? any FixedWidthInteger {
return Int64(a) == Int64(b)
} else if let a = x as? Float, let b = y as? Float {
return a == b
} else if let a = x as? Double, let b = y as? Double {
Expand Down
Loading
Loading