2 changes: 1 addition & 1 deletion Sources/SparkConnect/SparkConnectClient.swift
@@ -1288,7 +1288,7 @@ public actor SparkConnectClient
defineFlow.dataflowGraphID = dataflowGraphID
defineFlow.flowName = flowName
defineFlow.targetDatasetName = targetDatasetName
- defineFlow.plan = relation
+ defineFlow.relation = relation

var pipelineCommand = Spark_Connect_PipelineCommand()
pipelineCommand.commandType = .defineFlow(defineFlow)
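The one-line client fix above tracks the protobuf field rename from `plan` to `relation` on `PipelineCommand.DefineFlow`; the wire field number stays 4 (see the decoder change later in this diff), so the rename is source-breaking but wire-compatible. A minimal sketch of the renamed generated accessors, as they appear further down in pipelines.pb.swift:

var defineFlow = Spark_Connect_PipelineCommand.DefineFlow()
defineFlow.relation = Spark_Connect_Relation()   // renamed from `plan`; still field number 4 on the wire
assert(defineFlow.hasRelation)                   // presence accessor renamed accordingly
defineFlow.clearRelation()                       // clear accessor renamed accordingly
assert(!defineFlow.hasRelation)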
291 changes: 288 additions & 3 deletions Sources/SparkConnect/base.pb.swift

Large diffs are not rendered by default.

261 changes: 259 additions & 2 deletions Sources/SparkConnect/expressions.pb.swift

Large diffs are not rendered by default.

84 changes: 83 additions & 1 deletion Sources/SparkConnect/ml.pb.swift
@@ -116,6 +116,14 @@ struct Spark_Connect_MlCommand: Sendable {
set {command = .createSummary(newValue)}
}

var getModelSize: Spark_Connect_MlCommand.GetModelSize {
get {
if case .getModelSize(let v)? = command {return v}
return Spark_Connect_MlCommand.GetModelSize()
}
set {command = .getModelSize(newValue)}
}

var unknownFields = SwiftProtobuf.UnknownStorage()

enum OneOf_Command: Equatable, Sendable {
@@ -128,6 +136,7 @@
case cleanCache(Spark_Connect_MlCommand.CleanCache)
case getCacheInfo(Spark_Connect_MlCommand.GetCacheInfo)
case createSummary(Spark_Connect_MlCommand.CreateSummary)
case getModelSize(Spark_Connect_MlCommand.GetModelSize)

}

@@ -399,6 +408,28 @@ struct Spark_Connect_MlCommand: Sendable {
fileprivate var _dataset: Spark_Connect_Relation? = nil
}

/// This is for querying the model's estimated in-memory size.
struct GetModelSize: Sendable {
// SwiftProtobuf.Message conformance is added in an extension below. See the
// `Message` and `Message+*Additions` files in the SwiftProtobuf library for
// methods supported on all messages.

var modelRef: Spark_Connect_ObjectRef {
get {return _modelRef ?? Spark_Connect_ObjectRef()}
set {_modelRef = newValue}
}
/// Returns true if `modelRef` has been explicitly set.
var hasModelRef: Bool {return self._modelRef != nil}
/// Clears the value of `modelRef`. Subsequent reads from it will return its default value.
mutating func clearModelRef() {self._modelRef = nil}

var unknownFields = SwiftProtobuf.UnknownStorage()

init() {}

fileprivate var _modelRef: Spark_Connect_ObjectRef? = nil
}

init() {}
}
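The new `GetModelSize` message joins the `MlCommand` oneof as field 10. A minimal sketch of constructing the command, assuming `Spark_Connect_ObjectRef` exposes an `id` property as the other cached-object ML commands do; the id value is hypothetical:

var getModelSize = Spark_Connect_MlCommand.GetModelSize()
getModelSize.modelRef.id = "model-123"   // hypothetical cached-model reference id

var mlCommand = Spark_Connect_MlCommand()
mlCommand.getModelSize = getModelSize    // generated accessor sets command = .getModelSize(...)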

@@ -532,7 +563,7 @@ fileprivate let _protobuf_package = "spark.connect"

extension Spark_Connect_MlCommand: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding {
static let protoMessageName: String = _protobuf_package + ".MlCommand"
- static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{1}fit\0\u{1}fetch\0\u{1}delete\0\u{1}write\0\u{1}read\0\u{1}evaluate\0\u{3}clean_cache\0\u{3}get_cache_info\0\u{3}create_summary\0")
+ static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{1}fit\0\u{1}fetch\0\u{1}delete\0\u{1}write\0\u{1}read\0\u{1}evaluate\0\u{3}clean_cache\0\u{3}get_cache_info\0\u{3}create_summary\0\u{3}get_model_size\0")

mutating func decodeMessage<D: SwiftProtobuf.Decoder>(decoder: inout D) throws {
while let fieldNumber = try decoder.nextFieldNumber() {
@@ -657,6 +688,19 @@ extension Spark_Connect_MlCommand: SwiftProtobuf.Message, SwiftProtobuf._Message
self.command = .createSummary(v)
}
}()
case 10: try {
var v: Spark_Connect_MlCommand.GetModelSize?
var hadOneofValue = false
if let current = self.command {
hadOneofValue = true
if case .getModelSize(let m) = current {v = m}
}
try decoder.decodeSingularMessageField(value: &v)
if let v = v {
if hadOneofValue {try decoder.handleConflictingOneOf()}
self.command = .getModelSize(v)
}
}()
default: break
}
}
@@ -704,6 +748,10 @@ extension Spark_Connect_MlCommand: SwiftProtobuf.Message, SwiftProtobuf._Message
guard case .createSummary(let v)? = self.command else { preconditionFailure() }
try visitor.visitSingularMessageField(value: v, fieldNumber: 9)
}()
case .getModelSize?: try {
guard case .getModelSize(let v)? = self.command else { preconditionFailure() }
try visitor.visitSingularMessageField(value: v, fieldNumber: 10)
}()
case nil: break
}
try unknownFields.traverse(visitor: &visitor)
@@ -1046,6 +1094,40 @@ extension Spark_Connect_MlCommand.CreateSummary: SwiftProtobuf.Message, SwiftPro
}
}

extension Spark_Connect_MlCommand.GetModelSize: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding {
static let protoMessageName: String = Spark_Connect_MlCommand.protoMessageName + ".GetModelSize"
static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{3}model_ref\0")

mutating func decodeMessage<D: SwiftProtobuf.Decoder>(decoder: inout D) throws {
while let fieldNumber = try decoder.nextFieldNumber() {
// The use of inline closures is to circumvent an issue where the compiler
// allocates stack space for every case branch when no optimizations are
// enabled. https://github.com/apple/swift-protobuf/issues/1034
switch fieldNumber {
case 1: try { try decoder.decodeSingularMessageField(value: &self._modelRef) }()
default: break
}
}
}

func traverse<V: SwiftProtobuf.Visitor>(visitor: inout V) throws {
// The use of inline closures is to circumvent an issue where the compiler
// allocates stack space for every if/case branch local when no optimizations
// are enabled. https://github.com/apple/swift-protobuf/issues/1034 and
// https://github.com/apple/swift-protobuf/issues/1182
try { if let v = self._modelRef {
try visitor.visitSingularMessageField(value: v, fieldNumber: 1)
} }()
try unknownFields.traverse(visitor: &visitor)
}

static func ==(lhs: Spark_Connect_MlCommand.GetModelSize, rhs: Spark_Connect_MlCommand.GetModelSize) -> Bool {
if lhs._modelRef != rhs._modelRef {return false}
if lhs.unknownFields != rhs.unknownFields {return false}
return true
}
}
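With the `SwiftProtobuf.Message` conformance above, `GetModelSize` round-trips through the standard binary wire format like any other generated message. A quick sketch under the same assumption about `ObjectRef.id`; the id value is hypothetical:

func roundTripGetModelSize() throws {
  var msg = Spark_Connect_MlCommand.GetModelSize()
  msg.modelRef.id = "model-123"                    // hypothetical reference id

  let bytes = try msg.serializedData()             // encodes field 1 (model_ref)
  let decoded = try Spark_Connect_MlCommand.GetModelSize(serializedData: bytes)
  assert(decoded == msg && decoded.hasModelRef)    // uses the generated == and hasModelRef
}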

extension Spark_Connect_MlCommandResult: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding {
static let protoMessageName: String = _protobuf_package + ".MlCommandResult"
static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{1}param\0\u{1}summary\0\u{3}operator_info\0")
91 changes: 62 additions & 29 deletions Sources/SparkConnect/pipelines.pb.swift
@@ -360,37 +360,26 @@ struct Spark_Connect_PipelineCommand: Sendable {
mutating func clearTargetDatasetName() {self._targetDatasetName = nil}

/// An unresolved relation that defines the dataset's flow.
- var plan: Spark_Connect_Relation {
- get {return _plan ?? Spark_Connect_Relation()}
- set {_plan = newValue}
+ var relation: Spark_Connect_Relation {
+ get {return _relation ?? Spark_Connect_Relation()}
+ set {_relation = newValue}
}
- /// Returns true if `plan` has been explicitly set.
- var hasPlan: Bool {return self._plan != nil}
- /// Clears the value of `plan`. Subsequent reads from it will return its default value.
- mutating func clearPlan() {self._plan = nil}
+ /// Returns true if `relation` has been explicitly set.
+ var hasRelation: Bool {return self._relation != nil}
+ /// Clears the value of `relation`. Subsequent reads from it will return its default value.
+ mutating func clearRelation() {self._relation = nil}

/// SQL configurations set when running this flow.
var sqlConf: Dictionary<String,String> = [:]

- /// If true, this flow will only be run once per full refresh.
- var once: Bool {
- get {return _once ?? false}
- set {_once = newValue}
- }
- /// Returns true if `once` has been explicitly set.
- var hasOnce: Bool {return self._once != nil}
- /// Clears the value of `once`. Subsequent reads from it will return its default value.
- mutating func clearOnce() {self._once = nil}

var unknownFields = SwiftProtobuf.UnknownStorage()

init() {}

fileprivate var _dataflowGraphID: String? = nil
fileprivate var _flowName: String? = nil
fileprivate var _targetDatasetName: String? = nil
- fileprivate var _plan: Spark_Connect_Relation? = nil
- fileprivate var _once: Bool? = nil
+ fileprivate var _relation: Spark_Connect_Relation? = nil
}

/// Resolves all datasets and flows and starts a pipeline update. Should be called after all
@@ -410,11 +399,40 @@ struct Spark_Connect_PipelineCommand: Sendable {
/// Clears the value of `dataflowGraphID`. Subsequent reads from it will return its default value.
mutating func clearDataflowGraphID() {self._dataflowGraphID = nil}

/// List of datasets to reset and recompute.
var fullRefreshSelection: [String] = []

/// Perform a full graph reset and recompute.
var fullRefreshAll: Bool {
get {return _fullRefreshAll ?? false}
set {_fullRefreshAll = newValue}
}
/// Returns true if `fullRefreshAll` has been explicitly set.
var hasFullRefreshAll: Bool {return self._fullRefreshAll != nil}
/// Clears the value of `fullRefreshAll`. Subsequent reads from it will return its default value.
mutating func clearFullRefreshAll() {self._fullRefreshAll = nil}

/// List of datasets to update.
var refreshSelection: [String] = []

/// If true, the run will not actually execute any flows, but will only validate the graph and
/// check for any errors. This is useful for testing and validation purposes.
var dry: Bool {
get {return _dry ?? false}
set {_dry = newValue}
}
/// Returns true if `dry` has been explicitly set.
var hasDry: Bool {return self._dry != nil}
/// Clears the value of `dry`. Subsequent reads from it will return its default value.
mutating func clearDry() {self._dry = nil}

var unknownFields = SwiftProtobuf.UnknownStorage()

init() {}

fileprivate var _dataflowGraphID: String? = nil
fileprivate var _fullRefreshAll: Bool? = nil
fileprivate var _dry: Bool? = nil
}
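Together, the new `StartRun` fields support selective refreshes and validation-only runs. A minimal sketch, assuming the `commandType` oneof exposes a `.startRun` case by analogy with `.defineFlow` above; the graph ID and dataset names are hypothetical:

var startRun = Spark_Connect_PipelineCommand.StartRun()
startRun.dataflowGraphID = "graph-123"          // hypothetical graph id
startRun.refreshSelection = ["sales", "users"]  // incrementally refresh only these datasets
startRun.dry = true                             // validate the graph without executing any flows

var pipelineCommand = Spark_Connect_PipelineCommand()
pipelineCommand.commandType = .startRun(startRun)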

/// Parses the SQL file and registers all datasets and flows.
@@ -894,7 +912,7 @@ extension Spark_Connect_PipelineCommand.DefineDataset: SwiftProtobuf.Message, Sw

extension Spark_Connect_PipelineCommand.DefineFlow: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding {
static let protoMessageName: String = Spark_Connect_PipelineCommand.protoMessageName + ".DefineFlow"
- static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{3}dataflow_graph_id\0\u{3}flow_name\0\u{3}target_dataset_name\0\u{1}plan\0\u{3}sql_conf\0\u{1}once\0")
+ static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{3}dataflow_graph_id\0\u{3}flow_name\0\u{3}target_dataset_name\0\u{1}relation\0\u{3}sql_conf\0")

mutating func decodeMessage<D: SwiftProtobuf.Decoder>(decoder: inout D) throws {
while let fieldNumber = try decoder.nextFieldNumber() {
@@ -905,9 +923,8 @@
case 1: try { try decoder.decodeSingularStringField(value: &self._dataflowGraphID) }()
case 2: try { try decoder.decodeSingularStringField(value: &self._flowName) }()
case 3: try { try decoder.decodeSingularStringField(value: &self._targetDatasetName) }()
- case 4: try { try decoder.decodeSingularMessageField(value: &self._plan) }()
+ case 4: try { try decoder.decodeSingularMessageField(value: &self._relation) }()
case 5: try { try decoder.decodeMapField(fieldType: SwiftProtobuf._ProtobufMap<SwiftProtobuf.ProtobufString,SwiftProtobuf.ProtobufString>.self, value: &self.sqlConf) }()
- case 6: try { try decoder.decodeSingularBoolField(value: &self._once) }()
default: break
}
}
@@ -927,33 +944,29 @@
try { if let v = self._targetDatasetName {
try visitor.visitSingularStringField(value: v, fieldNumber: 3)
} }()
- try { if let v = self._plan {
+ try { if let v = self._relation {
try visitor.visitSingularMessageField(value: v, fieldNumber: 4)
} }()
if !self.sqlConf.isEmpty {
try visitor.visitMapField(fieldType: SwiftProtobuf._ProtobufMap<SwiftProtobuf.ProtobufString,SwiftProtobuf.ProtobufString>.self, value: self.sqlConf, fieldNumber: 5)
}
- try { if let v = self._once {
- try visitor.visitSingularBoolField(value: v, fieldNumber: 6)
- } }()
try unknownFields.traverse(visitor: &visitor)
}

static func ==(lhs: Spark_Connect_PipelineCommand.DefineFlow, rhs: Spark_Connect_PipelineCommand.DefineFlow) -> Bool {
if lhs._dataflowGraphID != rhs._dataflowGraphID {return false}
if lhs._flowName != rhs._flowName {return false}
if lhs._targetDatasetName != rhs._targetDatasetName {return false}
- if lhs._plan != rhs._plan {return false}
+ if lhs._relation != rhs._relation {return false}
if lhs.sqlConf != rhs.sqlConf {return false}
- if lhs._once != rhs._once {return false}
if lhs.unknownFields != rhs.unknownFields {return false}
return true
}
}
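Note that dropping `once` (field 6) does not make older payloads undecodable: with SwiftProtobuf's default handling, an unrecognized field lands in `unknownFields` and survives re-serialization. A small sketch under that assumption; the two-byte payload hand-encodes the dropped field (tag 0x30 = field 6, varint wire type; value 0x01 = true):

import Foundation

func decodeOldDefineFlow() throws {
  let oldPayload = Data([0x30, 0x01])  // hand-encoded `once = true` from an older client
  let flow = try Spark_Connect_PipelineCommand.DefineFlow(serializedData: oldPayload)
  assert(flow.unknownFields.data == oldPayload)  // dropped field is preserved, not lost
}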

extension Spark_Connect_PipelineCommand.StartRun: SwiftProtobuf.Message, SwiftProtobuf._MessageImplementationBase, SwiftProtobuf._ProtoNameProviding {
static let protoMessageName: String = Spark_Connect_PipelineCommand.protoMessageName + ".StartRun"
- static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{3}dataflow_graph_id\0")
+ static let _protobuf_nameMap = SwiftProtobuf._NameMap(bytecode: "\0\u{3}dataflow_graph_id\0\u{3}full_refresh_selection\0\u{3}full_refresh_all\0\u{3}refresh_selection\0\u{1}dry\0")

mutating func decodeMessage<D: SwiftProtobuf.Decoder>(decoder: inout D) throws {
while let fieldNumber = try decoder.nextFieldNumber() {
@@ -962,6 +975,10 @@
// enabled. https://github.com/apple/swift-protobuf/issues/1034
switch fieldNumber {
case 1: try { try decoder.decodeSingularStringField(value: &self._dataflowGraphID) }()
case 2: try { try decoder.decodeRepeatedStringField(value: &self.fullRefreshSelection) }()
case 3: try { try decoder.decodeSingularBoolField(value: &self._fullRefreshAll) }()
case 4: try { try decoder.decodeRepeatedStringField(value: &self.refreshSelection) }()
case 5: try { try decoder.decodeSingularBoolField(value: &self._dry) }()
default: break
}
}
@@ -975,11 +992,27 @@
try { if let v = self._dataflowGraphID {
try visitor.visitSingularStringField(value: v, fieldNumber: 1)
} }()
if !self.fullRefreshSelection.isEmpty {
try visitor.visitRepeatedStringField(value: self.fullRefreshSelection, fieldNumber: 2)
}
try { if let v = self._fullRefreshAll {
try visitor.visitSingularBoolField(value: v, fieldNumber: 3)
} }()
if !self.refreshSelection.isEmpty {
try visitor.visitRepeatedStringField(value: self.refreshSelection, fieldNumber: 4)
}
try { if let v = self._dry {
try visitor.visitSingularBoolField(value: v, fieldNumber: 5)
} }()
try unknownFields.traverse(visitor: &visitor)
}

static func ==(lhs: Spark_Connect_PipelineCommand.StartRun, rhs: Spark_Connect_PipelineCommand.StartRun) -> Bool {
if lhs._dataflowGraphID != rhs._dataflowGraphID {return false}
if lhs.fullRefreshSelection != rhs.fullRefreshSelection {return false}
if lhs._fullRefreshAll != rhs._fullRefreshAll {return false}
if lhs.refreshSelection != rhs.refreshSelection {return false}
if lhs._dry != rhs._dry {return false}
if lhs.unknownFields != rhs.unknownFields {return false}
return true
}