From 45939da419fa5ec3b1f5c30dfd04225c73c648f0 Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Wed, 2 Oct 2024 15:47:30 +0200 Subject: [PATCH 01/11] sketch local checkpoint --- .../scala/org/apache/spark/sql/Dataset.scala | 15 ++++++- .../apache/spark/sql/CheckpointSuite.scala | 15 +++++-- python/pyspark/sql/classic/dataframe.py | 14 ++++++- python/pyspark/sql/connect/dataframe.py | 11 ++++- .../pyspark/sql/connect/proto/commands_pb2.py | 14 +++---- .../sql/connect/proto/commands_pb2.pyi | 29 ++++++++++++- python/pyspark/sql/dataframe.py | 8 +++- .../sql/tests/connect/test_connect_basic.py | 1 + python/pyspark/sql/tests/test_dataframe.py | 10 ++++- .../org/apache/spark/sql/api/Dataset.scala | 42 ++++++++++++++++--- .../protobuf/spark/connect/commands.proto | 3 ++ .../connect/planner/SparkConnectPlanner.scala | 16 +++++-- .../scala/org/apache/spark/sql/Dataset.scala | 11 ++++- .../org/apache/spark/sql/DatasetSuite.scala | 20 +++++++++ 14 files changed, 179 insertions(+), 30 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 6bae04ef80231..1d95ec1c067af 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1115,13 +1115,20 @@ class Dataset[T] private[sql] ( } /** @inheritdoc */ - protected def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] = { + protected def checkpoint( + eager: Boolean, + reliableCheckpoint: Boolean, + storageLevel: Option[StorageLevel]): Dataset[T] = { sparkSession.newDataset(agnosticEncoder) { builder => val command = sparkSession.newCommand { builder => - builder.getCheckpointCommandBuilder + val checkpointBuilder = builder.getCheckpointCommandBuilder .setLocal(!reliableCheckpoint) .setEager(eager) .setRelation(this.plan.getRoot) + storageLevel.foreach { storageLevel => + checkpointBuilder.setStorageLevel( + StorageLevelProtoConverter.toConnectProtoType(storageLevel)) + } } val responseIter = sparkSession.execute(command) try { @@ -1304,6 +1311,10 @@ class Dataset[T] private[sql] ( /** @inheritdoc */ override def localCheckpoint(eager: Boolean): Dataset[T] = super.localCheckpoint(eager) + /** @inheritdoc */ + override def localCheckpoint(eager: Boolean, storageLevel: Option[StorageLevel]): Dataset[T] = + super.localCheckpoint(eager, storageLevel) + /** @inheritdoc */ override def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = super.joinWith(other, condition) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CheckpointSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CheckpointSuite.scala index e57b051890f56..6d4c3ac7e3474 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CheckpointSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CheckpointSuite.scala @@ -27,6 +27,7 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.apache.spark.SparkException import org.apache.spark.connect.proto import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession, SQLHelper} +import org.apache.spark.storage.StorageLevel class CheckpointSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelper { @@ -50,12 +51,20 @@ class CheckpointSuite extends ConnectFunSuite with RemoteSparkSession with SQLHe 
checkFragments(captureStdOut(block), fragmentsToCheck) } - test("checkpoint") { + test("localCheckpoint") { val df = spark.range(100).localCheckpoint() testCapturedStdOut(df.explain(), "ExistingRDD") } - test("checkpoint gc") { + test("localCheckpoint with StorageLevel") { + // We don't have a way to reach into the server and assert the storage level server side, but + // this test should cover for unexpected errors in the API. + val df = + spark.range(100).localCheckpoint(eager = true, storageLevel = Some(StorageLevel.DISK_ONLY)) + df.collect() + } + + test("localCheckpoint gc") { val df = spark.range(100).localCheckpoint(eager = true) val encoder = df.agnosticEncoder val dfId = df.plan.getRoot.getCachedRemoteRelation.getRelationId @@ -77,7 +86,7 @@ class CheckpointSuite extends ConnectFunSuite with RemoteSparkSession with SQLHe // This test is flaky because cannot guarantee GC // You can locally run this to verify the behavior. - ignore("checkpoint gc derived DataFrame") { + ignore("localCheckpoint gc derived DataFrame") { var df1 = spark.range(100).localCheckpoint(eager = true) var derived = df1.repartition(10) val encoder = df1.agnosticEncoder diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py index 9f9dedbd38207..aac05c632c1c9 100644 --- a/python/pyspark/sql/classic/dataframe.py +++ b/python/pyspark/sql/classic/dataframe.py @@ -360,8 +360,18 @@ def checkpoint(self, eager: bool = True) -> ParentDataFrame: jdf = self._jdf.checkpoint(eager) return DataFrame(jdf, self.sparkSession) - def localCheckpoint(self, eager: bool = True) -> ParentDataFrame: - jdf = self._jdf.localCheckpoint(eager) + def localCheckpoint( + self, eager: bool = True, storageLevel: Optional[StorageLevel] = None + ) -> ParentDataFrame: + gateway = self._sc._gateway + if storageLevel is None: + javaStorageLevelOpt = gateway.jvm.scala.Option.empty() + else: + javaStorageLevelOpt = gateway.jvm.scala.Option( + self._sc._getJavaStorageLevel(storageLevel) + ) + + jdf = self._jdf.localCheckpoint(eager, javaStorageLevelOpt) return DataFrame(jdf, self.sparkSession) def withWatermark(self, eventTime: str, delayThreshold: str) -> ParentDataFrame: diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 136fe60532df4..ee39c2f65307c 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -71,7 +71,10 @@ from pyspark.util import PythonEvalType from pyspark.storagelevel import StorageLevel import pyspark.sql.connect.plan as plan -from pyspark.sql.connect.conversion import ArrowTableToRowsConversion +from pyspark.sql.connect.conversion import ( + ArrowTableToRowsConversion, + storage_level_to_proto, +) from pyspark.sql.connect.group import GroupedData from pyspark.sql.connect.merge import MergeIntoWriter from pyspark.sql.connect.readwriter import DataFrameWriter, DataFrameWriterV2 @@ -2173,8 +2176,12 @@ def checkpoint(self, eager: bool = True) -> ParentDataFrame: assert isinstance(checkpointed._plan, plan.CachedRemoteRelation) return checkpointed - def localCheckpoint(self, eager: bool = True) -> ParentDataFrame: + def localCheckpoint( + self, eager: bool = True, storageLevel: Optional[StorageLevel] = None + ) -> ParentDataFrame: cmd = plan.Checkpoint(child=self._plan, local=True, eager=eager) + if storageLevel is not None: + cmd.storage_level.CopyFrom(storage_level_to_proto(storage_level)) _, properties, self._execution_info = self._session.client.execute_command( cmd.command(self._session.client) ) 
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py index 43390ffa36d33..562e9d817f5fe 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.py +++ b/python/pyspark/sql/connect/proto/commands_pb2.py @@ -35,7 +35,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\r\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18\x03 \x01(\x0b\x32).spark.connect.CreateDataFrameViewCommandH\x00R\x13\x63reateDataframeView\x12O\n\x12write_operation_v2\x18\x04 \x01(\x0b\x32\x1f.spark.connect.WriteOperationV2H\x00R\x10writeOperationV2\x12<\n\x0bsql_command\x18\x05 \x01(\x0b\x32\x19.spark.connect.SqlCommandH\x00R\nsqlCommand\x12k\n\x1cwrite_stream_operation_start\x18\x06 \x01(\x0b\x32(.spark.connect.WriteStreamOperationStartH\x00R\x19writeStreamOperationStart\x12^\n\x17streaming_query_command\x18\x07 \x01(\x0b\x32$.spark.connect.StreamingQueryCommandH\x00R\x15streamingQueryCommand\x12X\n\x15get_resources_command\x18\x08 \x01(\x0b\x32".spark.connect.GetResourcesCommandH\x00R\x13getResourcesCommand\x12t\n\x1fstreaming_query_manager_command\x18\t \x01(\x0b\x32+.spark.connect.StreamingQueryManagerCommandH\x00R\x1cstreamingQueryManagerCommand\x12m\n\x17register_table_function\x18\n \x01(\x0b\x32\x33.spark.connect.CommonInlineUserDefinedTableFunctionH\x00R\x15registerTableFunction\x12\x81\x01\n$streaming_query_listener_bus_command\x18\x0b \x01(\x0b\x32/.spark.connect.StreamingQueryListenerBusCommandH\x00R streamingQueryListenerBusCommand\x12\x64\n\x14register_data_source\x18\x0c \x01(\x0b\x32\x30.spark.connect.CommonInlineUserDefinedDataSourceH\x00R\x12registerDataSource\x12t\n\x1f\x63reate_resource_profile_command\x18\r \x01(\x0b\x32+.spark.connect.CreateResourceProfileCommandH\x00R\x1c\x63reateResourceProfileCommand\x12Q\n\x12\x63heckpoint_command\x18\x0e \x01(\x0b\x32 .spark.connect.CheckpointCommandH\x00R\x11\x63heckpointCommand\x12\x84\x01\n%remove_cached_remote_relation_command\x18\x0f \x01(\x0b\x32\x30.spark.connect.RemoveCachedRemoteRelationCommandH\x00R!removeCachedRemoteRelationCommand\x12_\n\x18merge_into_table_command\x18\x10 \x01(\x0b\x32$.spark.connect.MergeIntoTableCommandH\x00R\x15mergeIntoTableCommand\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textensionB\x0e\n\x0c\x63ommand_type"\xaa\x04\n\nSqlCommand\x12\x14\n\x03sql\x18\x01 \x01(\tB\x02\x18\x01R\x03sql\x12;\n\x04\x61rgs\x18\x02 \x03(\x0b\x32#.spark.connect.SqlCommand.ArgsEntryB\x02\x18\x01R\x04\x61rgs\x12@\n\x08pos_args\x18\x03 \x03(\x0b\x32!.spark.connect.Expression.LiteralB\x02\x18\x01R\x07posArgs\x12Z\n\x0fnamed_arguments\x18\x04 \x03(\x0b\x32-.spark.connect.SqlCommand.NamedArgumentsEntryB\x02\x18\x01R\x0enamedArguments\x12\x42\n\rpos_arguments\x18\x05 \x03(\x0b\x32\x19.spark.connect.ExpressionB\x02\x18\x01R\x0cposArguments\x12-\n\x05input\x18\x06 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x1aZ\n\tArgsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x37\n\x05value\x18\x02 
\x01(\x0b\x32!.spark.connect.Expression.LiteralR\x05value:\x02\x38\x01\x1a\\\n\x13NamedArgumentsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12/\n\x05value\x18\x02 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x05value:\x02\x38\x01"\x96\x01\n\x1a\x43reateDataFrameViewCommand\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12\x1b\n\tis_global\x18\x03 \x01(\x08R\x08isGlobal\x12\x18\n\x07replace\x18\x04 \x01(\x08R\x07replace"\xca\x08\n\x0eWriteOperation\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1b\n\x06source\x18\x02 \x01(\tH\x01R\x06source\x88\x01\x01\x12\x14\n\x04path\x18\x03 \x01(\tH\x00R\x04path\x12?\n\x05table\x18\x04 \x01(\x0b\x32\'.spark.connect.WriteOperation.SaveTableH\x00R\x05table\x12:\n\x04mode\x18\x05 \x01(\x0e\x32&.spark.connect.WriteOperation.SaveModeR\x04mode\x12*\n\x11sort_column_names\x18\x06 \x03(\tR\x0fsortColumnNames\x12\x31\n\x14partitioning_columns\x18\x07 \x03(\tR\x13partitioningColumns\x12\x43\n\tbucket_by\x18\x08 \x01(\x0b\x32&.spark.connect.WriteOperation.BucketByR\x08\x62ucketBy\x12\x44\n\x07options\x18\t \x03(\x0b\x32*.spark.connect.WriteOperation.OptionsEntryR\x07options\x12-\n\x12\x63lustering_columns\x18\n \x03(\tR\x11\x63lusteringColumns\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x82\x02\n\tSaveTable\x12\x1d\n\ntable_name\x18\x01 \x01(\tR\ttableName\x12X\n\x0bsave_method\x18\x02 \x01(\x0e\x32\x37.spark.connect.WriteOperation.SaveTable.TableSaveMethodR\nsaveMethod"|\n\x0fTableSaveMethod\x12!\n\x1dTABLE_SAVE_METHOD_UNSPECIFIED\x10\x00\x12#\n\x1fTABLE_SAVE_METHOD_SAVE_AS_TABLE\x10\x01\x12!\n\x1dTABLE_SAVE_METHOD_INSERT_INTO\x10\x02\x1a[\n\x08\x42ucketBy\x12.\n\x13\x62ucket_column_names\x18\x01 \x03(\tR\x11\x62ucketColumnNames\x12\x1f\n\x0bnum_buckets\x18\x02 \x01(\x05R\nnumBuckets"\x89\x01\n\x08SaveMode\x12\x19\n\x15SAVE_MODE_UNSPECIFIED\x10\x00\x12\x14\n\x10SAVE_MODE_APPEND\x10\x01\x12\x17\n\x13SAVE_MODE_OVERWRITE\x10\x02\x12\x1d\n\x19SAVE_MODE_ERROR_IF_EXISTS\x10\x03\x12\x14\n\x10SAVE_MODE_IGNORE\x10\x04\x42\x0b\n\tsave_typeB\t\n\x07_source"\xdc\x06\n\x10WriteOperationV2\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1d\n\ntable_name\x18\x02 \x01(\tR\ttableName\x12\x1f\n\x08provider\x18\x03 \x01(\tH\x00R\x08provider\x88\x01\x01\x12L\n\x14partitioning_columns\x18\x04 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x13partitioningColumns\x12\x46\n\x07options\x18\x05 \x03(\x0b\x32,.spark.connect.WriteOperationV2.OptionsEntryR\x07options\x12_\n\x10table_properties\x18\x06 \x03(\x0b\x32\x34.spark.connect.WriteOperationV2.TablePropertiesEntryR\x0ftableProperties\x12\x38\n\x04mode\x18\x07 \x01(\x0e\x32$.spark.connect.WriteOperationV2.ModeR\x04mode\x12J\n\x13overwrite_condition\x18\x08 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x12overwriteCondition\x12-\n\x12\x63lustering_columns\x18\t \x03(\tR\x11\x63lusteringColumns\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 
\x01(\tR\x05value:\x02\x38\x01"\x9f\x01\n\x04Mode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\x0f\n\x0bMODE_CREATE\x10\x01\x12\x12\n\x0eMODE_OVERWRITE\x10\x02\x12\x1d\n\x19MODE_OVERWRITE_PARTITIONS\x10\x03\x12\x0f\n\x0bMODE_APPEND\x10\x04\x12\x10\n\x0cMODE_REPLACE\x10\x05\x12\x1a\n\x16MODE_CREATE_OR_REPLACE\x10\x06\x42\x0b\n\t_provider"\xd8\x06\n\x19WriteStreamOperationStart\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x16\n\x06\x66ormat\x18\x02 \x01(\tR\x06\x66ormat\x12O\n\x07options\x18\x03 \x03(\x0b\x32\x35.spark.connect.WriteStreamOperationStart.OptionsEntryR\x07options\x12:\n\x19partitioning_column_names\x18\x04 \x03(\tR\x17partitioningColumnNames\x12:\n\x18processing_time_interval\x18\x05 \x01(\tH\x00R\x16processingTimeInterval\x12%\n\ravailable_now\x18\x06 \x01(\x08H\x00R\x0c\x61vailableNow\x12\x14\n\x04once\x18\x07 \x01(\x08H\x00R\x04once\x12\x46\n\x1e\x63ontinuous_checkpoint_interval\x18\x08 \x01(\tH\x00R\x1c\x63ontinuousCheckpointInterval\x12\x1f\n\x0boutput_mode\x18\t \x01(\tR\noutputMode\x12\x1d\n\nquery_name\x18\n \x01(\tR\tqueryName\x12\x14\n\x04path\x18\x0b \x01(\tH\x01R\x04path\x12\x1f\n\ntable_name\x18\x0c \x01(\tH\x01R\ttableName\x12N\n\x0e\x66oreach_writer\x18\r \x01(\x0b\x32\'.spark.connect.StreamingForeachFunctionR\rforeachWriter\x12L\n\rforeach_batch\x18\x0e \x01(\x0b\x32\'.spark.connect.StreamingForeachFunctionR\x0c\x66oreachBatch\x12\x36\n\x17\x63lustering_column_names\x18\x0f \x03(\tR\x15\x63lusteringColumnNames\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07triggerB\x12\n\x10sink_destination"\xb3\x01\n\x18StreamingForeachFunction\x12\x43\n\x0fpython_function\x18\x01 \x01(\x0b\x32\x18.spark.connect.PythonUDFH\x00R\x0epythonFunction\x12\x46\n\x0escala_function\x18\x02 \x01(\x0b\x32\x1d.spark.connect.ScalarScalaUDFH\x00R\rscalaFunctionB\n\n\x08\x66unction"\xd4\x01\n\x1fWriteStreamOperationStartResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12<\n\x18query_started_event_json\x18\x03 \x01(\tH\x00R\x15queryStartedEventJson\x88\x01\x01\x42\x1b\n\x19_query_started_event_json"A\n\x18StreamingQueryInstanceId\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x15\n\x06run_id\x18\x02 \x01(\tR\x05runId"\xf8\x04\n\x15StreamingQueryCommand\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x18\n\x06status\x18\x02 \x01(\x08H\x00R\x06status\x12%\n\rlast_progress\x18\x03 \x01(\x08H\x00R\x0clastProgress\x12)\n\x0frecent_progress\x18\x04 \x01(\x08H\x00R\x0erecentProgress\x12\x14\n\x04stop\x18\x05 \x01(\x08H\x00R\x04stop\x12\x34\n\x15process_all_available\x18\x06 \x01(\x08H\x00R\x13processAllAvailable\x12O\n\x07\x65xplain\x18\x07 \x01(\x0b\x32\x33.spark.connect.StreamingQueryCommand.ExplainCommandH\x00R\x07\x65xplain\x12\x1e\n\texception\x18\x08 \x01(\x08H\x00R\texception\x12k\n\x11\x61wait_termination\x18\t \x01(\x0b\x32<.spark.connect.StreamingQueryCommand.AwaitTerminationCommandH\x00R\x10\x61waitTermination\x1a,\n\x0e\x45xplainCommand\x12\x1a\n\x08\x65xtended\x18\x01 \x01(\x08R\x08\x65xtended\x1aL\n\x17\x41waitTerminationCommand\x12"\n\ntimeout_ms\x18\x02 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_msB\t\n\x07\x63ommand"\xf5\x08\n\x1bStreamingQueryCommandResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12Q\n\x06status\x18\x02 
\x01(\x0b\x32\x37.spark.connect.StreamingQueryCommandResult.StatusResultH\x00R\x06status\x12j\n\x0frecent_progress\x18\x03 \x01(\x0b\x32?.spark.connect.StreamingQueryCommandResult.RecentProgressResultH\x00R\x0erecentProgress\x12T\n\x07\x65xplain\x18\x04 \x01(\x0b\x32\x38.spark.connect.StreamingQueryCommandResult.ExplainResultH\x00R\x07\x65xplain\x12Z\n\texception\x18\x05 \x01(\x0b\x32:.spark.connect.StreamingQueryCommandResult.ExceptionResultH\x00R\texception\x12p\n\x11\x61wait_termination\x18\x06 \x01(\x0b\x32\x41.spark.connect.StreamingQueryCommandResult.AwaitTerminationResultH\x00R\x10\x61waitTermination\x1a\xaa\x01\n\x0cStatusResult\x12%\n\x0estatus_message\x18\x01 \x01(\tR\rstatusMessage\x12*\n\x11is_data_available\x18\x02 \x01(\x08R\x0fisDataAvailable\x12*\n\x11is_trigger_active\x18\x03 \x01(\x08R\x0fisTriggerActive\x12\x1b\n\tis_active\x18\x04 \x01(\x08R\x08isActive\x1aH\n\x14RecentProgressResult\x12\x30\n\x14recent_progress_json\x18\x05 \x03(\tR\x12recentProgressJson\x1a\'\n\rExplainResult\x12\x16\n\x06result\x18\x01 \x01(\tR\x06result\x1a\xc5\x01\n\x0f\x45xceptionResult\x12\x30\n\x11\x65xception_message\x18\x01 \x01(\tH\x00R\x10\x65xceptionMessage\x88\x01\x01\x12$\n\x0b\x65rror_class\x18\x02 \x01(\tH\x01R\nerrorClass\x88\x01\x01\x12$\n\x0bstack_trace\x18\x03 \x01(\tH\x02R\nstackTrace\x88\x01\x01\x42\x14\n\x12_exception_messageB\x0e\n\x0c_error_classB\x0e\n\x0c_stack_trace\x1a\x38\n\x16\x41waitTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminatedB\r\n\x0bresult_type"\xbd\x06\n\x1cStreamingQueryManagerCommand\x12\x18\n\x06\x61\x63tive\x18\x01 \x01(\x08H\x00R\x06\x61\x63tive\x12\x1d\n\tget_query\x18\x02 \x01(\tH\x00R\x08getQuery\x12|\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32\x46.spark.connect.StreamingQueryManagerCommand.AwaitAnyTerminationCommandH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x12n\n\x0c\x61\x64\x64_listener\x18\x05 \x01(\x0b\x32I.spark.connect.StreamingQueryManagerCommand.StreamingQueryListenerCommandH\x00R\x0b\x61\x64\x64Listener\x12t\n\x0fremove_listener\x18\x06 \x01(\x0b\x32I.spark.connect.StreamingQueryManagerCommand.StreamingQueryListenerCommandH\x00R\x0eremoveListener\x12\'\n\x0elist_listeners\x18\x07 \x01(\x08H\x00R\rlistListeners\x1aO\n\x1a\x41waitAnyTerminationCommand\x12"\n\ntimeout_ms\x18\x01 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_ms\x1a\xcd\x01\n\x1dStreamingQueryListenerCommand\x12)\n\x10listener_payload\x18\x01 \x01(\x0cR\x0flistenerPayload\x12U\n\x17python_listener_payload\x18\x02 \x01(\x0b\x32\x18.spark.connect.PythonUDFH\x00R\x15pythonListenerPayload\x88\x01\x01\x12\x0e\n\x02id\x18\x03 \x01(\tR\x02idB\x1a\n\x18_python_listener_payloadB\t\n\x07\x63ommand"\xb4\x08\n"StreamingQueryManagerCommandResult\x12X\n\x06\x61\x63tive\x18\x01 \x01(\x0b\x32>.spark.connect.StreamingQueryManagerCommandResult.ActiveResultH\x00R\x06\x61\x63tive\x12`\n\x05query\x18\x02 \x01(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceH\x00R\x05query\x12\x81\x01\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32K.spark.connect.StreamingQueryManagerCommandResult.AwaitAnyTerminationResultH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x12#\n\x0c\x61\x64\x64_listener\x18\x05 \x01(\x08H\x00R\x0b\x61\x64\x64Listener\x12)\n\x0fremove_listener\x18\x06 \x01(\x08H\x00R\x0eremoveListener\x12{\n\x0elist_listeners\x18\x07 
\x01(\x0b\x32R.spark.connect.StreamingQueryManagerCommandResult.ListStreamingQueryListenerResultH\x00R\rlistListeners\x1a\x7f\n\x0c\x41\x63tiveResult\x12o\n\x0e\x61\x63tive_queries\x18\x01 \x03(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceR\ractiveQueries\x1as\n\x16StreamingQueryInstance\x12\x37\n\x02id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x02id\x12\x17\n\x04name\x18\x02 \x01(\tH\x00R\x04name\x88\x01\x01\x42\x07\n\x05_name\x1a;\n\x19\x41waitAnyTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminated\x1aK\n\x1eStreamingQueryListenerInstance\x12)\n\x10listener_payload\x18\x01 \x01(\x0cR\x0flistenerPayload\x1a\x45\n ListStreamingQueryListenerResult\x12!\n\x0clistener_ids\x18\x01 \x03(\tR\x0blistenerIdsB\r\n\x0bresult_type"\xad\x01\n StreamingQueryListenerBusCommand\x12;\n\x19\x61\x64\x64_listener_bus_listener\x18\x01 \x01(\x08H\x00R\x16\x61\x64\x64ListenerBusListener\x12\x41\n\x1cremove_listener_bus_listener\x18\x02 \x01(\x08H\x00R\x19removeListenerBusListenerB\t\n\x07\x63ommand"\x83\x01\n\x1bStreamingQueryListenerEvent\x12\x1d\n\nevent_json\x18\x01 \x01(\tR\teventJson\x12\x45\n\nevent_type\x18\x02 \x01(\x0e\x32&.spark.connect.StreamingQueryEventTypeR\teventType"\xcc\x01\n"StreamingQueryListenerEventsResult\x12\x42\n\x06\x65vents\x18\x01 \x03(\x0b\x32*.spark.connect.StreamingQueryListenerEventR\x06\x65vents\x12\x42\n\x1blistener_bus_listener_added\x18\x02 \x01(\x08H\x00R\x18listenerBusListenerAdded\x88\x01\x01\x42\x1e\n\x1c_listener_bus_listener_added"\x15\n\x13GetResourcesCommand"\xd4\x01\n\x19GetResourcesCommandResult\x12U\n\tresources\x18\x01 \x03(\x0b\x32\x37.spark.connect.GetResourcesCommandResult.ResourcesEntryR\tresources\x1a`\n\x0eResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32".spark.connect.ResourceInformationR\x05value:\x02\x38\x01"X\n\x1c\x43reateResourceProfileCommand\x12\x38\n\x07profile\x18\x01 \x01(\x0b\x32\x1e.spark.connect.ResourceProfileR\x07profile"C\n"CreateResourceProfileCommandResult\x12\x1d\n\nprofile_id\x18\x01 \x01(\x05R\tprofileId"d\n!RemoveCachedRemoteRelationCommand\x12?\n\x08relation\x18\x01 \x01(\x0b\x32#.spark.connect.CachedRemoteRelationR\x08relation"t\n\x11\x43heckpointCommand\x12\x33\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x08relation\x12\x14\n\x05local\x18\x02 \x01(\x08R\x05local\x12\x14\n\x05\x65\x61ger\x18\x03 \x01(\x08R\x05\x65\x61ger"\xe8\x03\n\x15MergeIntoTableCommand\x12*\n\x11target_table_name\x18\x01 \x01(\tR\x0ftargetTableName\x12\x43\n\x11source_table_plan\x18\x02 \x01(\x0b\x32\x17.spark.connect.RelationR\x0fsourceTablePlan\x12\x42\n\x0fmerge_condition\x18\x03 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x0emergeCondition\x12>\n\rmatch_actions\x18\x04 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x0cmatchActions\x12I\n\x13not_matched_actions\x18\x05 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x11notMatchedActions\x12[\n\x1dnot_matched_by_source_actions\x18\x06 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x19notMatchedBySourceActions\x12\x32\n\x15with_schema_evolution\x18\x07 \x01(\x08R\x13withSchemaEvolution*\x85\x01\n\x17StreamingQueryEventType\x12\x1e\n\x1aQUERY_PROGRESS_UNSPECIFIED\x10\x00\x12\x18\n\x14QUERY_PROGRESS_EVENT\x10\x01\x12\x1a\n\x16QUERY_TERMINATED_EVENT\x10\x02\x12\x14\n\x10QUERY_IDLE_EVENT\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + 
b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\r\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18\x03 \x01(\x0b\x32).spark.connect.CreateDataFrameViewCommandH\x00R\x13\x63reateDataframeView\x12O\n\x12write_operation_v2\x18\x04 \x01(\x0b\x32\x1f.spark.connect.WriteOperationV2H\x00R\x10writeOperationV2\x12<\n\x0bsql_command\x18\x05 \x01(\x0b\x32\x19.spark.connect.SqlCommandH\x00R\nsqlCommand\x12k\n\x1cwrite_stream_operation_start\x18\x06 \x01(\x0b\x32(.spark.connect.WriteStreamOperationStartH\x00R\x19writeStreamOperationStart\x12^\n\x17streaming_query_command\x18\x07 \x01(\x0b\x32$.spark.connect.StreamingQueryCommandH\x00R\x15streamingQueryCommand\x12X\n\x15get_resources_command\x18\x08 \x01(\x0b\x32".spark.connect.GetResourcesCommandH\x00R\x13getResourcesCommand\x12t\n\x1fstreaming_query_manager_command\x18\t \x01(\x0b\x32+.spark.connect.StreamingQueryManagerCommandH\x00R\x1cstreamingQueryManagerCommand\x12m\n\x17register_table_function\x18\n \x01(\x0b\x32\x33.spark.connect.CommonInlineUserDefinedTableFunctionH\x00R\x15registerTableFunction\x12\x81\x01\n$streaming_query_listener_bus_command\x18\x0b \x01(\x0b\x32/.spark.connect.StreamingQueryListenerBusCommandH\x00R streamingQueryListenerBusCommand\x12\x64\n\x14register_data_source\x18\x0c \x01(\x0b\x32\x30.spark.connect.CommonInlineUserDefinedDataSourceH\x00R\x12registerDataSource\x12t\n\x1f\x63reate_resource_profile_command\x18\r \x01(\x0b\x32+.spark.connect.CreateResourceProfileCommandH\x00R\x1c\x63reateResourceProfileCommand\x12Q\n\x12\x63heckpoint_command\x18\x0e \x01(\x0b\x32 .spark.connect.CheckpointCommandH\x00R\x11\x63heckpointCommand\x12\x84\x01\n%remove_cached_remote_relation_command\x18\x0f \x01(\x0b\x32\x30.spark.connect.RemoveCachedRemoteRelationCommandH\x00R!removeCachedRemoteRelationCommand\x12_\n\x18merge_into_table_command\x18\x10 \x01(\x0b\x32$.spark.connect.MergeIntoTableCommandH\x00R\x15mergeIntoTableCommand\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textensionB\x0e\n\x0c\x63ommand_type"\xaa\x04\n\nSqlCommand\x12\x14\n\x03sql\x18\x01 \x01(\tB\x02\x18\x01R\x03sql\x12;\n\x04\x61rgs\x18\x02 \x03(\x0b\x32#.spark.connect.SqlCommand.ArgsEntryB\x02\x18\x01R\x04\x61rgs\x12@\n\x08pos_args\x18\x03 \x03(\x0b\x32!.spark.connect.Expression.LiteralB\x02\x18\x01R\x07posArgs\x12Z\n\x0fnamed_arguments\x18\x04 \x03(\x0b\x32-.spark.connect.SqlCommand.NamedArgumentsEntryB\x02\x18\x01R\x0enamedArguments\x12\x42\n\rpos_arguments\x18\x05 \x03(\x0b\x32\x19.spark.connect.ExpressionB\x02\x18\x01R\x0cposArguments\x12-\n\x05input\x18\x06 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x1aZ\n\tArgsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x37\n\x05value\x18\x02 \x01(\x0b\x32!.spark.connect.Expression.LiteralR\x05value:\x02\x38\x01\x1a\\\n\x13NamedArgumentsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12/\n\x05value\x18\x02 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x05value:\x02\x38\x01"\x96\x01\n\x1a\x43reateDataFrameViewCommand\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12\x1b\n\tis_global\x18\x03 
\x01(\x08R\x08isGlobal\x12\x18\n\x07replace\x18\x04 \x01(\x08R\x07replace"\xca\x08\n\x0eWriteOperation\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1b\n\x06source\x18\x02 \x01(\tH\x01R\x06source\x88\x01\x01\x12\x14\n\x04path\x18\x03 \x01(\tH\x00R\x04path\x12?\n\x05table\x18\x04 \x01(\x0b\x32\'.spark.connect.WriteOperation.SaveTableH\x00R\x05table\x12:\n\x04mode\x18\x05 \x01(\x0e\x32&.spark.connect.WriteOperation.SaveModeR\x04mode\x12*\n\x11sort_column_names\x18\x06 \x03(\tR\x0fsortColumnNames\x12\x31\n\x14partitioning_columns\x18\x07 \x03(\tR\x13partitioningColumns\x12\x43\n\tbucket_by\x18\x08 \x01(\x0b\x32&.spark.connect.WriteOperation.BucketByR\x08\x62ucketBy\x12\x44\n\x07options\x18\t \x03(\x0b\x32*.spark.connect.WriteOperation.OptionsEntryR\x07options\x12-\n\x12\x63lustering_columns\x18\n \x03(\tR\x11\x63lusteringColumns\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x82\x02\n\tSaveTable\x12\x1d\n\ntable_name\x18\x01 \x01(\tR\ttableName\x12X\n\x0bsave_method\x18\x02 \x01(\x0e\x32\x37.spark.connect.WriteOperation.SaveTable.TableSaveMethodR\nsaveMethod"|\n\x0fTableSaveMethod\x12!\n\x1dTABLE_SAVE_METHOD_UNSPECIFIED\x10\x00\x12#\n\x1fTABLE_SAVE_METHOD_SAVE_AS_TABLE\x10\x01\x12!\n\x1dTABLE_SAVE_METHOD_INSERT_INTO\x10\x02\x1a[\n\x08\x42ucketBy\x12.\n\x13\x62ucket_column_names\x18\x01 \x03(\tR\x11\x62ucketColumnNames\x12\x1f\n\x0bnum_buckets\x18\x02 \x01(\x05R\nnumBuckets"\x89\x01\n\x08SaveMode\x12\x19\n\x15SAVE_MODE_UNSPECIFIED\x10\x00\x12\x14\n\x10SAVE_MODE_APPEND\x10\x01\x12\x17\n\x13SAVE_MODE_OVERWRITE\x10\x02\x12\x1d\n\x19SAVE_MODE_ERROR_IF_EXISTS\x10\x03\x12\x14\n\x10SAVE_MODE_IGNORE\x10\x04\x42\x0b\n\tsave_typeB\t\n\x07_source"\xdc\x06\n\x10WriteOperationV2\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1d\n\ntable_name\x18\x02 \x01(\tR\ttableName\x12\x1f\n\x08provider\x18\x03 \x01(\tH\x00R\x08provider\x88\x01\x01\x12L\n\x14partitioning_columns\x18\x04 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x13partitioningColumns\x12\x46\n\x07options\x18\x05 \x03(\x0b\x32,.spark.connect.WriteOperationV2.OptionsEntryR\x07options\x12_\n\x10table_properties\x18\x06 \x03(\x0b\x32\x34.spark.connect.WriteOperationV2.TablePropertiesEntryR\x0ftableProperties\x12\x38\n\x04mode\x18\x07 \x01(\x0e\x32$.spark.connect.WriteOperationV2.ModeR\x04mode\x12J\n\x13overwrite_condition\x18\x08 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x12overwriteCondition\x12-\n\x12\x63lustering_columns\x18\t \x03(\tR\x11\x63lusteringColumns\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01"\x9f\x01\n\x04Mode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\x0f\n\x0bMODE_CREATE\x10\x01\x12\x12\n\x0eMODE_OVERWRITE\x10\x02\x12\x1d\n\x19MODE_OVERWRITE_PARTITIONS\x10\x03\x12\x0f\n\x0bMODE_APPEND\x10\x04\x12\x10\n\x0cMODE_REPLACE\x10\x05\x12\x1a\n\x16MODE_CREATE_OR_REPLACE\x10\x06\x42\x0b\n\t_provider"\xd8\x06\n\x19WriteStreamOperationStart\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x16\n\x06\x66ormat\x18\x02 \x01(\tR\x06\x66ormat\x12O\n\x07options\x18\x03 \x03(\x0b\x32\x35.spark.connect.WriteStreamOperationStart.OptionsEntryR\x07options\x12:\n\x19partitioning_column_names\x18\x04 \x03(\tR\x17partitioningColumnNames\x12:\n\x18processing_time_interval\x18\x05 
\x01(\tH\x00R\x16processingTimeInterval\x12%\n\ravailable_now\x18\x06 \x01(\x08H\x00R\x0c\x61vailableNow\x12\x14\n\x04once\x18\x07 \x01(\x08H\x00R\x04once\x12\x46\n\x1e\x63ontinuous_checkpoint_interval\x18\x08 \x01(\tH\x00R\x1c\x63ontinuousCheckpointInterval\x12\x1f\n\x0boutput_mode\x18\t \x01(\tR\noutputMode\x12\x1d\n\nquery_name\x18\n \x01(\tR\tqueryName\x12\x14\n\x04path\x18\x0b \x01(\tH\x01R\x04path\x12\x1f\n\ntable_name\x18\x0c \x01(\tH\x01R\ttableName\x12N\n\x0e\x66oreach_writer\x18\r \x01(\x0b\x32\'.spark.connect.StreamingForeachFunctionR\rforeachWriter\x12L\n\rforeach_batch\x18\x0e \x01(\x0b\x32\'.spark.connect.StreamingForeachFunctionR\x0c\x66oreachBatch\x12\x36\n\x17\x63lustering_column_names\x18\x0f \x03(\tR\x15\x63lusteringColumnNames\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07triggerB\x12\n\x10sink_destination"\xb3\x01\n\x18StreamingForeachFunction\x12\x43\n\x0fpython_function\x18\x01 \x01(\x0b\x32\x18.spark.connect.PythonUDFH\x00R\x0epythonFunction\x12\x46\n\x0escala_function\x18\x02 \x01(\x0b\x32\x1d.spark.connect.ScalarScalaUDFH\x00R\rscalaFunctionB\n\n\x08\x66unction"\xd4\x01\n\x1fWriteStreamOperationStartResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12<\n\x18query_started_event_json\x18\x03 \x01(\tH\x00R\x15queryStartedEventJson\x88\x01\x01\x42\x1b\n\x19_query_started_event_json"A\n\x18StreamingQueryInstanceId\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x15\n\x06run_id\x18\x02 \x01(\tR\x05runId"\xf8\x04\n\x15StreamingQueryCommand\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x18\n\x06status\x18\x02 \x01(\x08H\x00R\x06status\x12%\n\rlast_progress\x18\x03 \x01(\x08H\x00R\x0clastProgress\x12)\n\x0frecent_progress\x18\x04 \x01(\x08H\x00R\x0erecentProgress\x12\x14\n\x04stop\x18\x05 \x01(\x08H\x00R\x04stop\x12\x34\n\x15process_all_available\x18\x06 \x01(\x08H\x00R\x13processAllAvailable\x12O\n\x07\x65xplain\x18\x07 \x01(\x0b\x32\x33.spark.connect.StreamingQueryCommand.ExplainCommandH\x00R\x07\x65xplain\x12\x1e\n\texception\x18\x08 \x01(\x08H\x00R\texception\x12k\n\x11\x61wait_termination\x18\t \x01(\x0b\x32<.spark.connect.StreamingQueryCommand.AwaitTerminationCommandH\x00R\x10\x61waitTermination\x1a,\n\x0e\x45xplainCommand\x12\x1a\n\x08\x65xtended\x18\x01 \x01(\x08R\x08\x65xtended\x1aL\n\x17\x41waitTerminationCommand\x12"\n\ntimeout_ms\x18\x02 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_msB\t\n\x07\x63ommand"\xf5\x08\n\x1bStreamingQueryCommandResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12Q\n\x06status\x18\x02 \x01(\x0b\x32\x37.spark.connect.StreamingQueryCommandResult.StatusResultH\x00R\x06status\x12j\n\x0frecent_progress\x18\x03 \x01(\x0b\x32?.spark.connect.StreamingQueryCommandResult.RecentProgressResultH\x00R\x0erecentProgress\x12T\n\x07\x65xplain\x18\x04 \x01(\x0b\x32\x38.spark.connect.StreamingQueryCommandResult.ExplainResultH\x00R\x07\x65xplain\x12Z\n\texception\x18\x05 \x01(\x0b\x32:.spark.connect.StreamingQueryCommandResult.ExceptionResultH\x00R\texception\x12p\n\x11\x61wait_termination\x18\x06 \x01(\x0b\x32\x41.spark.connect.StreamingQueryCommandResult.AwaitTerminationResultH\x00R\x10\x61waitTermination\x1a\xaa\x01\n\x0cStatusResult\x12%\n\x0estatus_message\x18\x01 \x01(\tR\rstatusMessage\x12*\n\x11is_data_available\x18\x02 
\x01(\x08R\x0fisDataAvailable\x12*\n\x11is_trigger_active\x18\x03 \x01(\x08R\x0fisTriggerActive\x12\x1b\n\tis_active\x18\x04 \x01(\x08R\x08isActive\x1aH\n\x14RecentProgressResult\x12\x30\n\x14recent_progress_json\x18\x05 \x03(\tR\x12recentProgressJson\x1a\'\n\rExplainResult\x12\x16\n\x06result\x18\x01 \x01(\tR\x06result\x1a\xc5\x01\n\x0f\x45xceptionResult\x12\x30\n\x11\x65xception_message\x18\x01 \x01(\tH\x00R\x10\x65xceptionMessage\x88\x01\x01\x12$\n\x0b\x65rror_class\x18\x02 \x01(\tH\x01R\nerrorClass\x88\x01\x01\x12$\n\x0bstack_trace\x18\x03 \x01(\tH\x02R\nstackTrace\x88\x01\x01\x42\x14\n\x12_exception_messageB\x0e\n\x0c_error_classB\x0e\n\x0c_stack_trace\x1a\x38\n\x16\x41waitTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminatedB\r\n\x0bresult_type"\xbd\x06\n\x1cStreamingQueryManagerCommand\x12\x18\n\x06\x61\x63tive\x18\x01 \x01(\x08H\x00R\x06\x61\x63tive\x12\x1d\n\tget_query\x18\x02 \x01(\tH\x00R\x08getQuery\x12|\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32\x46.spark.connect.StreamingQueryManagerCommand.AwaitAnyTerminationCommandH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x12n\n\x0c\x61\x64\x64_listener\x18\x05 \x01(\x0b\x32I.spark.connect.StreamingQueryManagerCommand.StreamingQueryListenerCommandH\x00R\x0b\x61\x64\x64Listener\x12t\n\x0fremove_listener\x18\x06 \x01(\x0b\x32I.spark.connect.StreamingQueryManagerCommand.StreamingQueryListenerCommandH\x00R\x0eremoveListener\x12\'\n\x0elist_listeners\x18\x07 \x01(\x08H\x00R\rlistListeners\x1aO\n\x1a\x41waitAnyTerminationCommand\x12"\n\ntimeout_ms\x18\x01 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_ms\x1a\xcd\x01\n\x1dStreamingQueryListenerCommand\x12)\n\x10listener_payload\x18\x01 \x01(\x0cR\x0flistenerPayload\x12U\n\x17python_listener_payload\x18\x02 \x01(\x0b\x32\x18.spark.connect.PythonUDFH\x00R\x15pythonListenerPayload\x88\x01\x01\x12\x0e\n\x02id\x18\x03 \x01(\tR\x02idB\x1a\n\x18_python_listener_payloadB\t\n\x07\x63ommand"\xb4\x08\n"StreamingQueryManagerCommandResult\x12X\n\x06\x61\x63tive\x18\x01 \x01(\x0b\x32>.spark.connect.StreamingQueryManagerCommandResult.ActiveResultH\x00R\x06\x61\x63tive\x12`\n\x05query\x18\x02 \x01(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceH\x00R\x05query\x12\x81\x01\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32K.spark.connect.StreamingQueryManagerCommandResult.AwaitAnyTerminationResultH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x12#\n\x0c\x61\x64\x64_listener\x18\x05 \x01(\x08H\x00R\x0b\x61\x64\x64Listener\x12)\n\x0fremove_listener\x18\x06 \x01(\x08H\x00R\x0eremoveListener\x12{\n\x0elist_listeners\x18\x07 \x01(\x0b\x32R.spark.connect.StreamingQueryManagerCommandResult.ListStreamingQueryListenerResultH\x00R\rlistListeners\x1a\x7f\n\x0c\x41\x63tiveResult\x12o\n\x0e\x61\x63tive_queries\x18\x01 \x03(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceR\ractiveQueries\x1as\n\x16StreamingQueryInstance\x12\x37\n\x02id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x02id\x12\x17\n\x04name\x18\x02 \x01(\tH\x00R\x04name\x88\x01\x01\x42\x07\n\x05_name\x1a;\n\x19\x41waitAnyTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminated\x1aK\n\x1eStreamingQueryListenerInstance\x12)\n\x10listener_payload\x18\x01 \x01(\x0cR\x0flistenerPayload\x1a\x45\n ListStreamingQueryListenerResult\x12!\n\x0clistener_ids\x18\x01 \x03(\tR\x0blistenerIdsB\r\n\x0bresult_type"\xad\x01\n 
StreamingQueryListenerBusCommand\x12;\n\x19\x61\x64\x64_listener_bus_listener\x18\x01 \x01(\x08H\x00R\x16\x61\x64\x64ListenerBusListener\x12\x41\n\x1cremove_listener_bus_listener\x18\x02 \x01(\x08H\x00R\x19removeListenerBusListenerB\t\n\x07\x63ommand"\x83\x01\n\x1bStreamingQueryListenerEvent\x12\x1d\n\nevent_json\x18\x01 \x01(\tR\teventJson\x12\x45\n\nevent_type\x18\x02 \x01(\x0e\x32&.spark.connect.StreamingQueryEventTypeR\teventType"\xcc\x01\n"StreamingQueryListenerEventsResult\x12\x42\n\x06\x65vents\x18\x01 \x03(\x0b\x32*.spark.connect.StreamingQueryListenerEventR\x06\x65vents\x12\x42\n\x1blistener_bus_listener_added\x18\x02 \x01(\x08H\x00R\x18listenerBusListenerAdded\x88\x01\x01\x42\x1e\n\x1c_listener_bus_listener_added"\x15\n\x13GetResourcesCommand"\xd4\x01\n\x19GetResourcesCommandResult\x12U\n\tresources\x18\x01 \x03(\x0b\x32\x37.spark.connect.GetResourcesCommandResult.ResourcesEntryR\tresources\x1a`\n\x0eResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32".spark.connect.ResourceInformationR\x05value:\x02\x38\x01"X\n\x1c\x43reateResourceProfileCommand\x12\x38\n\x07profile\x18\x01 \x01(\x0b\x32\x1e.spark.connect.ResourceProfileR\x07profile"C\n"CreateResourceProfileCommandResult\x12\x1d\n\nprofile_id\x18\x01 \x01(\x05R\tprofileId"d\n!RemoveCachedRemoteRelationCommand\x12?\n\x08relation\x18\x01 \x01(\x0b\x32#.spark.connect.CachedRemoteRelationR\x08relation"\xcd\x01\n\x11\x43heckpointCommand\x12\x33\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x08relation\x12\x14\n\x05local\x18\x02 \x01(\x08R\x05local\x12\x14\n\x05\x65\x61ger\x18\x03 \x01(\x08R\x05\x65\x61ger\x12\x45\n\rstorage_level\x18\x04 \x01(\x0b\x32\x1b.spark.connect.StorageLevelH\x00R\x0cstorageLevel\x88\x01\x01\x42\x10\n\x0e_storage_level"\xe8\x03\n\x15MergeIntoTableCommand\x12*\n\x11target_table_name\x18\x01 \x01(\tR\x0ftargetTableName\x12\x43\n\x11source_table_plan\x18\x02 \x01(\x0b\x32\x17.spark.connect.RelationR\x0fsourceTablePlan\x12\x42\n\x0fmerge_condition\x18\x03 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x0emergeCondition\x12>\n\rmatch_actions\x18\x04 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x0cmatchActions\x12I\n\x13not_matched_actions\x18\x05 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x11notMatchedActions\x12[\n\x1dnot_matched_by_source_actions\x18\x06 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x19notMatchedBySourceActions\x12\x32\n\x15with_schema_evolution\x18\x07 \x01(\x08R\x13withSchemaEvolution*\x85\x01\n\x17StreamingQueryEventType\x12\x1e\n\x1aQUERY_PROGRESS_UNSPECIFIED\x10\x00\x12\x18\n\x14QUERY_PROGRESS_EVENT\x10\x01\x12\x1a\n\x16QUERY_TERMINATED_EVENT\x10\x02\x12\x14\n\x10QUERY_IDLE_EVENT\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) @@ -71,8 +71,8 @@ _WRITESTREAMOPERATIONSTART_OPTIONSENTRY._serialized_options = b"8\001" _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._options = None _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_options = b"8\001" - _STREAMINGQUERYEVENTTYPE._serialized_start = 11162 - _STREAMINGQUERYEVENTTYPE._serialized_end = 11295 + _STREAMINGQUERYEVENTTYPE._serialized_start = 11252 + _STREAMINGQUERYEVENTTYPE._serialized_end = 11385 _COMMAND._serialized_start = 167 _COMMAND._serialized_end = 1847 _SQLCOMMAND._serialized_start = 1850 @@ -167,8 +167,8 @@ _CREATERESOURCEPROFILECOMMANDRESULT._serialized_end = 10448 _REMOVECACHEDREMOTERELATIONCOMMAND._serialized_start = 10450 
_REMOVECACHEDREMOTERELATIONCOMMAND._serialized_end = 10550 - _CHECKPOINTCOMMAND._serialized_start = 10552 - _CHECKPOINTCOMMAND._serialized_end = 10668 - _MERGEINTOTABLECOMMAND._serialized_start = 10671 - _MERGEINTOTABLECOMMAND._serialized_end = 11159 + _CHECKPOINTCOMMAND._serialized_start = 10553 + _CHECKPOINTCOMMAND._serialized_end = 10758 + _MERGEINTOTABLECOMMAND._serialized_start = 10761 + _MERGEINTOTABLECOMMAND._serialized_end = 11249 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi index 2dedcdfc8e3e4..6192a29607cbf 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.pyi +++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi @@ -2188,6 +2188,7 @@ class CheckpointCommand(google.protobuf.message.Message): RELATION_FIELD_NUMBER: builtins.int LOCAL_FIELD_NUMBER: builtins.int EAGER_FIELD_NUMBER: builtins.int + STORAGE_LEVEL_FIELD_NUMBER: builtins.int @property def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation: """(Required) The logical plan to checkpoint.""" @@ -2197,22 +2198,46 @@ class CheckpointCommand(google.protobuf.message.Message): """ eager: builtins.bool """(Required) Whether to checkpoint this dataframe immediately.""" + @property + def storage_level(self) -> pyspark.sql.connect.proto.common_pb2.StorageLevel: + """(Optional) For local checkpoint, the storage level to use.""" def __init__( self, *, relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ..., local: builtins.bool = ..., eager: builtins.bool = ..., + storage_level: pyspark.sql.connect.proto.common_pb2.StorageLevel | None = ..., ) -> None: ... def HasField( - self, field_name: typing_extensions.Literal["relation", b"relation"] + self, + field_name: typing_extensions.Literal[ + "_storage_level", + b"_storage_level", + "relation", + b"relation", + "storage_level", + b"storage_level", + ], ) -> builtins.bool: ... def ClearField( self, field_name: typing_extensions.Literal[ - "eager", b"eager", "local", b"local", "relation", b"relation" + "_storage_level", + b"_storage_level", + "eager", + b"eager", + "local", + b"local", + "relation", + b"relation", + "storage_level", + b"storage_level", ], ) -> None: ... + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_storage_level", b"_storage_level"] + ) -> typing_extensions.Literal["storage_level"] | None: ... global___CheckpointCommand = CheckpointCommand diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 5906108163b46..7cc2f18b329c6 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1015,7 +1015,9 @@ def checkpoint(self, eager: bool = True) -> "DataFrame": """ ... - def localCheckpoint(self, eager: bool = True) -> "DataFrame": + def localCheckpoint( + self, eager: bool = True, storageLevel: Optional[StorageLevel] = None + ) -> "DataFrame": """Returns a locally checkpointed version of this :class:`DataFrame`. Checkpointing can be used to truncate the logical plan of this :class:`DataFrame`, which is especially useful in iterative algorithms where the plan may grow exponentially. Local checkpoints @@ -1032,6 +1034,10 @@ def localCheckpoint(self, eager: bool = True) -> "DataFrame": eager : bool, optional, default True Whether to checkpoint this :class:`DataFrame` immediately. + storageLevel : :class:`StorageLevel`, optional, default None + The StorageLevel with which the checkpoint will be stored. + If not specified, default for RDD local checkpoints. 
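A minimal PySpark usage sketch for the new storageLevel parameter documented above (illustrative only: it assumes the API lands exactly as added in this patch and that an active SparkSession named spark exists):

    from pyspark import StorageLevel

    df = spark.range(10)
    # Ask for the checkpointed data to be kept on disk only, rather than the
    # default storage level used for RDD local checkpoints.
    df = df.localCheckpoint(eager=True, storageLevel=StorageLevel.DISK_ONLY)
    df.count()  # with eager=True the checkpoint is already materialized and is reused here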
+ Returns ------- :class:`DataFrame` diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index f0637056ab8f9..419f77afb8983 100755 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -24,6 +24,7 @@ from contextlib import redirect_stdout import datetime +from pyspark import StorageLevel from pyspark.util import is_remote_only from pyspark.errors import PySparkTypeError, PySparkValueError from pyspark.sql import SparkSession as PySparkSession, Row diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index 8ec0839ec1fe4..1a1cbb4c702a5 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -951,11 +951,19 @@ def test_union_classmethod_usage(self): def test_isinstance_dataframe(self): self.assertIsInstance(self.spark.range(1), DataFrame) - def test_checkpoint_dataframe(self): + def test_local_checkpoint_dataframe(self): with io.StringIO() as buf, redirect_stdout(buf): self.spark.range(1).localCheckpoint().explain() self.assertIn("ExistingRDD", buf.getvalue()) + def test_local_checkpoint_dataframe_with_storage_level(self): + # We don't have a way to reach into the server and assert the storage level server side, but + # this test should cover for unexpected errors in the API. + df = self.connect.range(10).localCheckpoint( + eager = True, storageLevel = StorageLevel.DISK_ONLY + ) + df.collect() + def test_transpose(self): df = self.spark.createDataFrame([{"a": "x", "b": "y", "c": "z"}]) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala index 06a6148a7c188..3b279e09617d8 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala @@ -312,7 +312,8 @@ abstract class Dataset[T] extends Serializable { * @group basic * @since 2.1.0 */ - def checkpoint(): Dataset[T] = checkpoint(eager = true, reliableCheckpoint = true) + def checkpoint(): Dataset[T] = + checkpoint(eager = true, reliableCheckpoint = true, storageLevel = None) /** * Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the @@ -332,7 +333,7 @@ abstract class Dataset[T] extends Serializable { * @since 2.1.0 */ def checkpoint(eager: Boolean): Dataset[T] = - checkpoint(eager = eager, reliableCheckpoint = true) + checkpoint(eager = eager, reliableCheckpoint = true, storageLevel = None) /** * Eagerly locally checkpoints a Dataset and return the new Dataset. Checkpointing can be used @@ -343,7 +344,8 @@ abstract class Dataset[T] extends Serializable { * @group basic * @since 2.3.0 */ - def localCheckpoint(): Dataset[T] = checkpoint(eager = true, reliableCheckpoint = false) + def localCheckpoint(): Dataset[T] = + checkpoint(eager = true, reliableCheckpoint = false, storageLevel = None) /** * Locally checkpoints a Dataset and return the new Dataset. Checkpointing can be used to @@ -363,7 +365,29 @@ abstract class Dataset[T] extends Serializable { * @since 2.3.0 */ def localCheckpoint(eager: Boolean): Dataset[T] = - checkpoint(eager = eager, reliableCheckpoint = false) + checkpoint(eager = eager, reliableCheckpoint = false, storageLevel = None) + + /** + * Locally checkpoints a Dataset and return the new Dataset. 
Checkpointing can be used to + * truncate the logical plan of this Dataset, which is especially useful in iterative algorithms + * where the plan may grow exponentially. Local checkpoints are written to executor storage and + * despite potentially faster they are unreliable and may compromise job completion. + * + * @param eager + * Whether to checkpoint this dataframe immediately + * @param storageLevel + * Option. If defined, StorageLevel with which to checkpoint the data. + * @note + * When checkpoint is used with eager = false, the final data that is checkpointed after the + * first action may be different from the data that was used during the job due to + * non-determinism of the underlying operation and retries. If checkpoint is used to achieve + * saving a deterministic snapshot of the data, eager = true should be used. Otherwise, it is + * only deterministic after the first execution, after the checkpoint was finalized. + * @group basic + * @since 2.3.0 + */ + def localCheckpoint(eager: Boolean, storageLevel: Option[StorageLevel]): Dataset[T] = + checkpoint(eager = eager, reliableCheckpoint = false, storageLevel = storageLevel) /** * Returns a checkpointed version of this Dataset. @@ -373,8 +397,14 @@ abstract class Dataset[T] extends Serializable { * @param reliableCheckpoint * Whether to create a reliable checkpoint saved to files inside the checkpoint directory. If * false creates a local checkpoint using the caching subsystem - */ - protected def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] + * @param storageLevel + * Option. If defined, StorageLevel with which to checkpoint the data. + * Only with reliableCheckpoint = false. + */ + protected def checkpoint( + eager: Boolean, + reliableCheckpoint: Boolean, + storageLevel: Option[StorageLevel]): Dataset[T] /** * Defines an event time watermark for this [[Dataset]]. A watermark tracks a point in time diff --git a/sql/connect/common/src/main/protobuf/spark/connect/commands.proto b/sql/connect/common/src/main/protobuf/spark/connect/commands.proto index 71189a3c43a19..a01d4369a7aed 100644 --- a/sql/connect/common/src/main/protobuf/spark/connect/commands.proto +++ b/sql/connect/common/src/main/protobuf/spark/connect/commands.proto @@ -507,6 +507,9 @@ message CheckpointCommand { // (Required) Whether to checkpoint this dataframe immediately. bool eager = 3; + + // (Optional) For local checkpoint, the storage level to use. 
+ optional StorageLevel storage_level = 4; } message MergeIntoTableCommand { diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 231e54ff77d29..092bcb0cdef95 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -3354,9 +3354,19 @@ class SparkConnectPlanner( responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = { val target = Dataset .ofRows(session, transformRelation(checkpointCommand.getRelation)) - val checkpointed = target.checkpoint( - eager = checkpointCommand.getEager, - reliableCheckpoint = !checkpointCommand.getLocal) + val checkpointed = if (checkpointCommand.getLocal) { + val storageLevelOpt = if (checkpointCommand.hasStorageLevel) { + Some(StorageLevelProtoConverter.toStorageLevel(checkpointCommand.getStorageLevel)) + } else { + None + } + target.localCheckpoint( + eager = checkpointCommand.getEager, + storageLevel = storageLevelOpt + ) + } else { + target.checkpoint(eager = checkpointCommand.getEager) + } val dfId = UUID.randomUUID().toString logInfo(log"Caching DataFrame with id ${MDC(DATAFRAME_ID, dfId)}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 18fc5787a1583..21c7b8206cca5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -540,13 +540,18 @@ class Dataset[T] private[sql]( def isStreaming: Boolean = logicalPlan.isStreaming /** @inheritdoc */ - protected[sql] def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] = { + protected[sql] def checkpoint( + eager: Boolean, + reliableCheckpoint: Boolean, + storageLevel: Option[StorageLevel]): Dataset[T] = { val actionName = if (reliableCheckpoint) "checkpoint" else "localCheckpoint" withAction(actionName, queryExecution) { physicalPlan => val internalRdd = physicalPlan.execute().map(_.copy()) if (reliableCheckpoint) { + assert(storageLevel.isEmpty, "StorageLevel should not be defined for reliableCheckpoint") internalRdd.checkpoint() } else { + storageLevel.foreach(storageLevel => internalRdd.persist(storageLevel)) internalRdd.localCheckpoint() } @@ -1810,6 +1815,10 @@ class Dataset[T] private[sql]( /** @inheritdoc */ override def localCheckpoint(eager: Boolean): Dataset[T] = super.localCheckpoint(eager) + /** @inheritdoc */ + override def localCheckpoint(eager: Boolean, storageLevel: Option[StorageLevel]): Dataset[T] = + super.localCheckpoint(eager, storageLevel) + /** @inheritdoc */ override def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] = super.joinWith(other, condition) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 089ce79201dd8..0b88640d9aaa8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1849,6 +1849,26 @@ class DatasetSuite extends QueryTest } } + test("Dataset().localCheckpoint() lazy with StorageLevel") { + val df = spark.range(10).repartition($"id" % 2) + val checkpointedDf = df.localCheckpoint(eager = false, Some(StorageLevel.DISK_ONLY)) + val checkpointedPlan = 
checkpointedDf.queryExecution.analyzed + val rdd = checkpointedPlan.asInstanceOf[LogicalRDD].rdd + assert(rdd.getStorageLevel == StorageLevel.DISK_ONLY) + assert(!rdd.isCheckpointed) + checkpointedDf.collect() + assert(rdd.isCheckpointed) + } + + test("Dataset().localCheckpoint() eager with StorageLevel") { + val df = spark.range(10).repartition($"id" % 2) + val checkpointedDf = df.localCheckpoint(eager = true, Some(StorageLevel.DISK_ONLY)) + val checkpointedPlan = checkpointedDf.queryExecution.analyzed + val rdd = checkpointedPlan.asInstanceOf[LogicalRDD].rdd + assert(rdd.isCheckpointed) + assert(rdd.getStorageLevel == StorageLevel.DISK_ONLY) + } + test("identity map for primitive arrays") { val arrayByte = Array(1.toByte, 2.toByte, 3.toByte) val arrayInt = Array(1, 2, 3) From 1b9ea26c3f86fad9434b24aae8aa61387d76b5a6 Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Wed, 2 Oct 2024 16:16:19 +0200 Subject: [PATCH 02/11] revert test_connect_basic - already tested in test_dataframe --- python/pyspark/sql/tests/connect/test_connect_basic.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index 419f77afb8983..f0637056ab8f9 100755 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -24,7 +24,6 @@ from contextlib import redirect_stdout import datetime -from pyspark import StorageLevel from pyspark.util import is_remote_only from pyspark.errors import PySparkTypeError, PySparkValueError from pyspark.sql import SparkSession as PySparkSession, Row From a153d9df24dbe8cd763da2eb3b3904a218efdcf2 Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Wed, 2 Oct 2024 16:18:40 +0200 Subject: [PATCH 03/11] docs --- python/pyspark/sql/dataframe.py | 1 + sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 7cc2f18b329c6..b2fd1412a991a 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1028,6 +1028,7 @@ def localCheckpoint( .. versionchanged:: 4.0.0 Supports Spark Connect. + Added storageLevel parameter. Parameters ---------- diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala index 3b279e09617d8..91339936bc12c 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala @@ -384,7 +384,7 @@ abstract class Dataset[T] extends Serializable { * saving a deterministic snapshot of the data, eager = true should be used. Otherwise, it is * only deterministic after the first execution, after the checkpoint was finalized. 
* @group basic - * @since 2.3.0 + * @since 4.0.0 */ def localCheckpoint(eager: Boolean, storageLevel: Option[StorageLevel]): Dataset[T] = checkpoint(eager = eager, reliableCheckpoint = false, storageLevel = storageLevel) From adde2e988a06de7e6413692b9d6d0b95b567e586 Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Thu, 3 Oct 2024 14:53:55 +0200 Subject: [PATCH 04/11] fix --- python/pyspark/sql/tests/test_dataframe.py | 2 +- .../src/main/scala/org/apache/spark/sql/api/Dataset.scala | 4 ++-- .../spark/sql/connect/planner/SparkConnectPlanner.scala | 5 +---- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index 1a1cbb4c702a5..d3b942fbe7b93 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -959,7 +959,7 @@ def test_local_checkpoint_dataframe(self): def test_local_checkpoint_dataframe_with_storage_level(self): # We don't have a way to reach into the server and assert the storage level server side, but # this test should cover for unexpected errors in the API. - df = self.connect.range(10).localCheckpoint( + df = self.spark.range(10).localCheckpoint( eager = True, storageLevel = StorageLevel.DISK_ONLY ) df.collect() diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala index 91339936bc12c..d38ceea5c1751 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala @@ -398,8 +398,8 @@ abstract class Dataset[T] extends Serializable { * Whether to create a reliable checkpoint saved to files inside the checkpoint directory. If * false creates a local checkpoint using the caching subsystem * @param storageLevel - * Option. If defined, StorageLevel with which to checkpoint the data. - * Only with reliableCheckpoint = false. + * Option. If defined, StorageLevel with which to checkpoint the data. Only with + * reliableCheckpoint = false. 
*/ protected def checkpoint( eager: Boolean, diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 092bcb0cdef95..2881b02c87978 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -3360,10 +3360,7 @@ class SparkConnectPlanner( } else { None } - target.localCheckpoint( - eager = checkpointCommand.getEager, - storageLevel = storageLevelOpt - ) + target.localCheckpoint(eager = checkpointCommand.getEager, storageLevel = storageLevelOpt) } else { target.checkpoint(eager = checkpointCommand.getEager) } From 4c38a7706a816f87fd7f169a0609d2bf2c9d0bde Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Thu, 3 Oct 2024 18:33:18 +0200 Subject: [PATCH 05/11] fix --- python/pyspark/sql/classic/dataframe.py | 2 +- python/pyspark/sql/tests/test_dataframe.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py index aac05c632c1c9..1d6bbe052613a 100644 --- a/python/pyspark/sql/classic/dataframe.py +++ b/python/pyspark/sql/classic/dataframe.py @@ -367,7 +367,7 @@ def localCheckpoint( if storageLevel is None: javaStorageLevelOpt = gateway.jvm.scala.Option.empty() else: - javaStorageLevelOpt = gateway.jvm.scala.Option( + javaStorageLevelOpt = gateway.jvm.scala.Option.apply( self._sc._getJavaStorageLevel(storageLevel) ) diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index d3b942fbe7b93..79c9e947352aa 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -959,9 +959,7 @@ def test_local_checkpoint_dataframe(self): def test_local_checkpoint_dataframe_with_storage_level(self): # We don't have a way to reach into the server and assert the storage level server side, but # this test should cover for unexpected errors in the API. 
- df = self.spark.range(10).localCheckpoint( - eager = True, storageLevel = StorageLevel.DISK_ONLY - ) + df = self.spark.range(10).localCheckpoint(eager=True, storageLevel=StorageLevel.DISK_ONLY) df.collect() def test_transpose(self): From 039e2be93e23e3d2db822afa4d866894ef54e82c Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Mon, 7 Oct 2024 11:35:55 +0200 Subject: [PATCH 06/11] fix compile --- python/pyspark/sql/connect/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index ee39c2f65307c..e8060b2709765 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -2181,7 +2181,7 @@ def localCheckpoint( ) -> ParentDataFrame: cmd = plan.Checkpoint(child=self._plan, local=True, eager=eager) if storageLevel is not None: - cmd.storage_level.CopyFrom(storage_level_to_proto(storage_level)) + cmd.storage_level.CopyFrom(storage_level_to_proto(storageLevel)) _, properties, self._execution_info = self._session.client.execute_command( cmd.command(self._session.client) ) From 94890e0c492d550e5e1b1e4e7dcd26c4dd558711 Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Mon, 7 Oct 2024 14:45:39 +0200 Subject: [PATCH 07/11] fix python more --- python/pyspark/sql/connect/dataframe.py | 4 +--- python/pyspark/sql/connect/plan.py | 22 +++++++++++++++------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index e8060b2709765..fb537537d2dcb 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -2179,9 +2179,7 @@ def checkpoint(self, eager: bool = True) -> ParentDataFrame: def localCheckpoint( self, eager: bool = True, storageLevel: Optional[StorageLevel] = None ) -> ParentDataFrame: - cmd = plan.Checkpoint(child=self._plan, local=True, eager=eager) - if storageLevel is not None: - cmd.storage_level.CopyFrom(storage_level_to_proto(storageLevel)) + cmd = plan.Checkpoint(child=self._plan, local=True, eager=eager, storage_level=storageLevel) _, properties, self._execution_info = self._session.client.execute_command( cmd.command(self._session.client) ) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index fbed0eabc684f..b74f863db1e83 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -1868,21 +1868,29 @@ def command(self, session: "SparkConnectClient") -> proto.Command: class Checkpoint(LogicalPlan): - def __init__(self, child: Optional["LogicalPlan"], local: bool, eager: bool) -> None: + def __init__( + self, + child: Optional["LogicalPlan"], + local: bool, + eager: bool, + storage_level: Optional[StorageLevel] = None, + ) -> None: super().__init__(child) self._local = local self._eager = eager + self._storage_level = storage_level def command(self, session: "SparkConnectClient") -> proto.Command: cmd = proto.Command() assert self._child is not None - cmd.checkpoint_command.CopyFrom( - proto.CheckpointCommand( - relation=self._child.plan(session), - local=self._local, - eager=self._eager, - ) + checkpoint_command = proto.CheckpointCommand( + relation=self._child.plan(session), + local=self._local, + eager=self._eager, ) + if self._storage_level is not None: + checkpoint_command.storage_level.CopyFrom(storage_level_to_proto(self._storage_level)) + cmd.checkpoint_command.CopyFrom(checkpoint_command) return cmd From 
6e2247fdee6d9998b664917a22a330df0650abf3 Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Mon, 7 Oct 2024 18:25:29 +0200 Subject: [PATCH 08/11] more python lint --- python/pyspark/sql/classic/dataframe.py | 1 + python/pyspark/sql/connect/dataframe.py | 5 +---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py index 1d6bbe052613a..cf56933a9e346 100644 --- a/python/pyspark/sql/classic/dataframe.py +++ b/python/pyspark/sql/classic/dataframe.py @@ -364,6 +364,7 @@ def localCheckpoint( self, eager: bool = True, storageLevel: Optional[StorageLevel] = None ) -> ParentDataFrame: gateway = self._sc._gateway + assert gateway is not None if storageLevel is None: javaStorageLevelOpt = gateway.jvm.scala.Option.empty() else: diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index fb537537d2dcb..212d52ed2e955 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -71,10 +71,7 @@ from pyspark.util import PythonEvalType from pyspark.storagelevel import StorageLevel import pyspark.sql.connect.plan as plan -from pyspark.sql.connect.conversion import ( - ArrowTableToRowsConversion, - storage_level_to_proto, -) +from pyspark.sql.connect.conversion import ArrowTableToRowsConversion from pyspark.sql.connect.group import GroupedData from pyspark.sql.connect.merge import MergeIntoWriter from pyspark.sql.connect.readwriter import DataFrameWriter, DataFrameWriterV2 From 07760a3b3d2bdbbc47bf9765cf4e5c07eb5bcd6e Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Tue, 8 Oct 2024 14:21:22 +0200 Subject: [PATCH 09/11] remove Option --- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../scala/org/apache/spark/sql/CheckpointSuite.scala | 2 +- .../main/scala/org/apache/spark/sql/api/Dataset.scala | 6 +++--- .../sql/connect/planner/SparkConnectPlanner.scala | 10 ++++++---- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 4 ++-- 6 files changed, 14 insertions(+), 12 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 1d95ec1c067af..2fcea3636cf20 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1312,7 +1312,7 @@ class Dataset[T] private[sql] ( override def localCheckpoint(eager: Boolean): Dataset[T] = super.localCheckpoint(eager) /** @inheritdoc */ - override def localCheckpoint(eager: Boolean, storageLevel: Option[StorageLevel]): Dataset[T] = + override def localCheckpoint(eager: Boolean, storageLevel: StorageLevel): Dataset[T] = super.localCheckpoint(eager, storageLevel) /** @inheritdoc */ diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CheckpointSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CheckpointSuite.scala index 6d4c3ac7e3474..0d9685d9c710f 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CheckpointSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CheckpointSuite.scala @@ -60,7 +60,7 @@ class CheckpointSuite extends ConnectFunSuite with RemoteSparkSession with SQLHe // We don't have a way to reach into the server and assert the storage level server side, but 
lper { // We don't have a way to reach into the server and assert the storage level server side, but
// this test should cover for unexpected errors in the API. val df = - spark.range(100).localCheckpoint(eager = true, storageLevel = Some(StorageLevel.DISK_ONLY)) + spark.range(100).localCheckpoint(eager = true, storageLevel = StorageLevel.DISK_ONLY) df.collect() } diff --git a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala index d38ceea5c1751..2d140f21c6b95 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/api/Dataset.scala @@ -376,7 +376,7 @@ abstract class Dataset[T] extends Serializable { * @param eager * Whether to checkpoint this dataframe immediately * @param storageLevel - * Option. If defined, StorageLevel with which to checkpoint the data. + * StorageLevel with which to checkpoint the data. * @note * When checkpoint is used with eager = false, the final data that is checkpointed after the * first action may be different from the data that was used during the job due to @@ -386,8 +386,8 @@ abstract class Dataset[T] extends Serializable { * @group basic * @since 4.0.0 */ - def localCheckpoint(eager: Boolean, storageLevel: Option[StorageLevel]): Dataset[T] = - checkpoint(eager = eager, reliableCheckpoint = false, storageLevel = storageLevel) + def localCheckpoint(eager: Boolean, storageLevel: StorageLevel): Dataset[T] = + checkpoint(eager = eager, reliableCheckpoint = false, storageLevel = Some(storageLevel)) /** * Returns a checkpointed version of this Dataset. diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 2881b02c87978..25fd7d13b7d48 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -3355,12 +3355,14 @@ class SparkConnectPlanner( val target = Dataset .ofRows(session, transformRelation(checkpointCommand.getRelation)) val checkpointed = if (checkpointCommand.getLocal) { - val storageLevelOpt = if (checkpointCommand.hasStorageLevel) { - Some(StorageLevelProtoConverter.toStorageLevel(checkpointCommand.getStorageLevel)) + if (checkpointCommand.hasStorageLevel) { + target.localCheckpoint( + eager = checkpointCommand.getEager, + storageLevel = + StorageLevelProtoConverter.toStorageLevel(checkpointCommand.getStorageLevel)) } else { - None + target.localCheckpoint(eager = checkpointCommand.getEager) } - target.localCheckpoint(eager = checkpointCommand.getEager, storageLevel = storageLevelOpt) } else { target.checkpoint(eager = checkpointCommand.getEager) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 21c7b8206cca5..4778f401dba20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1816,7 +1816,7 @@ class Dataset[T] private[sql]( override def localCheckpoint(eager: Boolean): Dataset[T] = super.localCheckpoint(eager) /** @inheritdoc */ - override def localCheckpoint(eager: Boolean, storageLevel: Option[StorageLevel]): Dataset[T] = + override def localCheckpoint(eager: Boolean, storageLevel: StorageLevel): Dataset[T] = super.localCheckpoint(eager, storageLevel) /** @inheritdoc */ diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 0b88640d9aaa8..85f296665b6e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1851,7 +1851,7 @@ class DatasetSuite extends QueryTest test("Dataset().localCheckpoint() lazy with StorageLevel") { val df = spark.range(10).repartition($"id" % 2) - val checkpointedDf = df.localCheckpoint(eager = false, Some(StorageLevel.DISK_ONLY)) + val checkpointedDf = df.localCheckpoint(eager = false, StorageLevel.DISK_ONLY) val checkpointedPlan = checkpointedDf.queryExecution.analyzed val rdd = checkpointedPlan.asInstanceOf[LogicalRDD].rdd assert(rdd.getStorageLevel == StorageLevel.DISK_ONLY) @@ -1862,7 +1862,7 @@ class DatasetSuite extends QueryTest test("Dataset().localCheckpoint() eager with StorageLevel") { val df = spark.range(10).repartition($"id" % 2) - val checkpointedDf = df.localCheckpoint(eager = true, Some(StorageLevel.DISK_ONLY)) + val checkpointedDf = df.localCheckpoint(eager = true, StorageLevel.DISK_ONLY) val checkpointedPlan = checkpointedDf.queryExecution.analyzed val rdd = checkpointedPlan.asInstanceOf[LogicalRDD].rdd assert(rdd.isCheckpointed) From 71fd29a58840c80b1eda37eba745cd35d28ef6ee Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Tue, 8 Oct 2024 17:23:11 +0200 Subject: [PATCH 10/11] also fix python --- python/pyspark/sql/classic/dataframe.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py index cf56933a9e346..148a1de0a06de 100644 --- a/python/pyspark/sql/classic/dataframe.py +++ b/python/pyspark/sql/classic/dataframe.py @@ -363,16 +363,10 @@ def checkpoint(self, eager: bool = True) -> ParentDataFrame: def localCheckpoint( self, eager: bool = True, storageLevel: Optional[StorageLevel] = None ) -> ParentDataFrame: - gateway = self._sc._gateway - assert gateway is not None if storageLevel is None: - javaStorageLevelOpt = gateway.jvm.scala.Option.empty() + jdf = self._jdf.localCheckpoint(eager) else: - javaStorageLevelOpt = gateway.jvm.scala.Option.apply( - self._sc._getJavaStorageLevel(storageLevel) - ) - - jdf = self._jdf.localCheckpoint(eager, javaStorageLevelOpt) + jdf = self._jdf.localCheckpoint(eager, self._sc._getJavaStorageLevel(storageLevel)) return DataFrame(jdf, self.sparkSession) def withWatermark(self, eventTime: str, delayThreshold: str) -> ParentDataFrame: From 730cb77c63ec969cf80e662392863df7e1274bf1 Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Wed, 9 Oct 2024 11:32:02 +0200 Subject: [PATCH 11/11] retrigger CI
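Usage sketch (illustrative only, not part of the patch series): the calls below show how the API introduced by this series is intended to be invoked once it is applied. It assumes a running SparkSession bound to `spark`; the sample data, variable names, and chosen storage level are arbitrary.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    // Assumed local session, purely for the sketch.
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.range(1000).repartition(10)

    // Lazy local checkpoint: the returned Dataset is backed by an RDD marked for
    // persistence at DISK_ONLY; it is materialized and checkpointed when the first
    // action runs.
    val lazyCp = df.localCheckpoint(eager = false, storageLevel = StorageLevel.DISK_ONLY)
    lazyCp.count() // materializes and finalizes the local checkpoint

    // Eager local checkpoint: checkpointed immediately at the given storage level.
    val eagerCp = df.localCheckpoint(eager = true, storageLevel = StorageLevel.DISK_ONLY)

The PySpark equivalent, as exercised in test_dataframe.py above, is df.localCheckpoint(eager=True, storageLevel=StorageLevel.DISK_ONLY).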