Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/workflows/build_infra_images_cache.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ on:
- 'dev/spark-test-image/python-311/Dockerfile'
- 'dev/spark-test-image/python-312/Dockerfile'
- 'dev/spark-test-image/python-313/Dockerfile'
- 'dev/spark-test-image/python-313-nogil/Dockerfile'
- 'dev/spark-test-image/numpy-213/Dockerfile'
- '.github/workflows/build_infra_images_cache.yml'
# Create infra image when cutting down branches/tags
Expand Down Expand Up @@ -216,6 +217,19 @@ jobs:
- name: Image digest (PySpark with Python 3.13)
if: hashFiles('dev/spark-test-image/python-313/Dockerfile') != ''
run: echo ${{ steps.docker_build_pyspark_python_313.outputs.digest }}
- name: Build and push (PySpark with Python 3.13 no GIL)
if: hashFiles('dev/spark-test-image/python-313-nogil/Dockerfile') != ''
id: docker_build_pyspark_python_313_nogil
uses: docker/build-push-action@v6
with:
context: ./dev/spark-test-image/python-313-nogil/
push: true
tags: ghcr.io/apache/spark/apache-spark-github-action-image-pyspark-python-313-nogil-cache:${{ github.ref_name }}-static
cache-from: type=registry,ref=ghcr.io/apache/spark/apache-spark-github-action-image-pyspark-python-313-nogil-cache:${{ github.ref_name }}
cache-to: type=registry,ref=ghcr.io/apache/spark/apache-spark-github-action-image-pyspark-python-313-nogil-cache:${{ github.ref_name }},mode=max
- name: Image digest (PySpark with Python 3.13 no GIL)
if: hashFiles('dev/spark-test-image/python-313-nogil/Dockerfile') != ''
run: echo ${{ steps.docker_build_pyspark_python_313_nogil.outputs.digest }}
- name: Build and push (PySpark with Numpy 2.1.3)
if: hashFiles('dev/spark-test-image/numpy-213/Dockerfile') != ''
id: docker_build_pyspark_numpy_213
Expand Down
48 changes: 48 additions & 0 deletions .github/workflows/build_python_3.13_nogil.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: "Build / Python-only (master, Python 3.13 no GIL)"

on:
  schedule:
    # Run every third day at 19:00 UTC; also allow manual runs.
    - cron: '0 19 */3 * *'
  workflow_dispatch:

jobs:
  run-build:
    permissions:
      # Needed so the reusable workflow can push the test image to GHCR.
      packages: write
    name: Run
    uses: ./.github/workflows/build_and_test.yml
    # Scheduled builds should only run in the canonical repository, not forks.
    if: github.repository == 'apache/spark'
    with:
      java: 17
      branch: master
      hadoop: hadoop3
      # PYTHON_GIL=0 and the `t` interpreter select the free-threaded build.
      envs: >-
        {
          "PYSPARK_IMAGE_TO_TEST": "python-313-nogil",
          "PYTHON_TO_TEST": "python3.13t",
          "PYTHON_GIL": "0"
        }
      jobs: >-
        {
          "pyspark": "true",
          "pyspark-pandas": "true"
        }
80 changes: 80 additions & 0 deletions dev/spark-test-image/python-313-nogil/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Image for building and testing Spark branches. Based on Ubuntu 22.04.
# See also in https://hub.docker.com/_/ubuntu
FROM ubuntu:jammy-20240911.1
LABEL org.opencontainers.image.authors="Apache Spark project <dev@spark.apache.org>"
LABEL org.opencontainers.image.licenses="Apache-2.0"
LABEL org.opencontainers.image.ref.name="Apache Spark Infra Image For PySpark with Python 3.13 (no GIL)"
# Overwrite this label to avoid exposing the underlying Ubuntu OS version label
LABEL org.opencontainers.image.version=""

# Bump this date to invalidate the build cache and force a full refresh.
ENV FULL_REFRESH_DATE=20250407

ENV DEBIAN_FRONTEND=noninteractive
ENV DEBCONF_NONINTERACTIVE_SEEN=true

# Toolchain and native libraries required to build/test PySpark and its
# scientific-stack dependencies. `update`, `install` and the index cleanup
# share a single layer so a stale package index is never reused and the
# index files do not bloat the image.
RUN apt-get update && apt-get install -y \
    build-essential \
    ca-certificates \
    curl \
    gfortran \
    git \
    gnupg \
    libcurl4-openssl-dev \
    libfontconfig1-dev \
    libfreetype6-dev \
    libfribidi-dev \
    libgit2-dev \
    libharfbuzz-dev \
    libjpeg-dev \
    liblapack-dev \
    libopenblas-dev \
    libpng-dev \
    libpython3-dev \
    libssl-dev \
    libtiff5-dev \
    libxml2-dev \
    openjdk-17-jdk-headless \
    pkg-config \
    qpdf \
    software-properties-common \
    tzdata \
    wget \
    zlib1g-dev \
    && rm -rf /var/lib/apt/lists/*

# Install Python 3.13 (no GIL) from the deadsnakes PPA. Adding the PPA,
# refreshing the index and installing happen in one layer so the install can
# never run against a cached index that predates the PPA.
RUN add-apt-repository ppa:deadsnakes/ppa \
    && apt-get update && apt-get install -y \
    python3.13-nogil \
    && apt-get autoremove --purge -y \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*


ARG BASIC_PIP_PKGS="numpy pyarrow>=19.0.0 six==1.16.0 pandas==2.2.3 scipy plotly<6.0.0 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2"
ARG CONNECT_PIP_PKGS="grpcio==1.67.0 grpcio-status==1.67.0 protobuf==5.29.1 googleapis-common-protos==1.65.0 graphviz==0.20.3"


# Install Python 3.13 packages. `python3.13t` is the free-threaded interpreter.
RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.13t
# TODO: Add BASIC_PIP_PKGS and CONNECT_PIP_PKGS when it supports Python 3.13 free threaded
# TODO: Add lxml, grpcio, grpcio-status back when they support Python 3.13 free threaded
# NOTE: version specifiers are quoted — an unquoted `pkg>=x.y` is parsed by the
# shell as an output redirection to a file named `=x.y`, silently dropping the pin.
RUN python3.13t -m pip install --ignore-installed 'blinker>=1.6.2' # mlflow needs this
RUN python3.13t -m pip install 'numpy>=2.1' 'pyarrow>=19.0.0' six==1.16.0 pandas==2.2.3 scipy coverage matplotlib openpyxl jinja2 && \
    python3.13t -m pip cache purge
16 changes: 9 additions & 7 deletions python/pyspark/errors/exceptions/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pyspark.sql.connect.proto as pb2
import json
from typing import Dict, List, Optional, TYPE_CHECKING

Expand Down Expand Up @@ -43,6 +42,7 @@
)

if TYPE_CHECKING:
import pyspark.sql.connect.proto as pb2
from google.rpc.error_details_pb2 import ErrorInfo


Expand All @@ -55,7 +55,7 @@ class SparkConnectException(PySparkException):
def convert_exception(
info: "ErrorInfo",
truncated_message: str,
resp: Optional[pb2.FetchErrorDetailsResponse],
resp: Optional["pb2.FetchErrorDetailsResponse"],
display_server_stacktrace: bool = False,
) -> SparkConnectException:
converted = _convert_exception(info, truncated_message, resp, display_server_stacktrace)
Expand All @@ -65,9 +65,11 @@ def convert_exception(
def _convert_exception(
info: "ErrorInfo",
truncated_message: str,
resp: Optional[pb2.FetchErrorDetailsResponse],
resp: Optional["pb2.FetchErrorDetailsResponse"],
display_server_stacktrace: bool = False,
) -> SparkConnectException:
import pyspark.sql.connect.proto as pb2

raw_classes = info.metadata.get("classes")
classes: List[str] = json.loads(raw_classes) if raw_classes else []
sql_state = info.metadata.get("sqlState")
Expand Down Expand Up @@ -139,13 +141,13 @@ def _convert_exception(
)


def _extract_jvm_stacktrace(resp: pb2.FetchErrorDetailsResponse) -> str:
def _extract_jvm_stacktrace(resp: "pb2.FetchErrorDetailsResponse") -> str:
if len(resp.errors[resp.root_error_idx].stack_trace) == 0:
return ""

lines: List[str] = []

def format_stacktrace(error: pb2.FetchErrorDetailsResponse.Error) -> None:
def format_stacktrace(error: "pb2.FetchErrorDetailsResponse.Error") -> None:
message = f"{error.error_type_hierarchy[0]}: {error.message}"
if len(lines) == 0:
lines.append(error.error_type_hierarchy[0])
Expand Down Expand Up @@ -404,7 +406,7 @@ class PickleException(SparkConnectGrpcException, BasePickleException):


class SQLQueryContext(BaseQueryContext):
def __init__(self, q: pb2.FetchErrorDetailsResponse.QueryContext):
def __init__(self, q: "pb2.FetchErrorDetailsResponse.QueryContext"):
self._q = q

def contextType(self) -> QueryContextType:
Expand Down Expand Up @@ -441,7 +443,7 @@ def summary(self) -> str:


class DataFrameQueryContext(BaseQueryContext):
def __init__(self, q: pb2.FetchErrorDetailsResponse.QueryContext):
def __init__(self, q: "pb2.FetchErrorDetailsResponse.QueryContext"):
self._q = q

def contextType(self) -> QueryContextType:
Expand Down
4 changes: 0 additions & 4 deletions python/pyspark/ml/connect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
#

"""Spark Connect Python Client - ML module"""
from pyspark.sql.connect.utils import check_dependencies

check_dependencies(__name__)

from pyspark.ml.connect.base import (
Estimator,
Transformer,
Expand Down
3 changes: 2 additions & 1 deletion python/pyspark/ml/connect/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
HasFeaturesCol,
HasPredictionCol,
)
from pyspark.ml.connect.util import transform_dataframe_column

if TYPE_CHECKING:
from pyspark.ml._typing import ParamMap
Expand Down Expand Up @@ -188,6 +187,8 @@ def transform(
return self._transform(dataset)

def _transform(self, dataset: Union[DataFrame, pd.DataFrame]) -> Union[DataFrame, pd.DataFrame]:
from pyspark.ml.connect.util import transform_dataframe_column

input_cols = self._input_columns()
transform_fn = self._get_transform_fn()
output_cols = self._output_columns()
Expand Down
3 changes: 2 additions & 1 deletion python/pyspark/ml/connect/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from pyspark.ml.param.shared import HasLabelCol, HasPredictionCol, HasProbabilityCol
from pyspark.ml.connect.base import Evaluator
from pyspark.ml.connect.io_utils import ParamsReadWrite
from pyspark.ml.connect.util import aggregate_dataframe
from pyspark.sql import DataFrame


Expand Down Expand Up @@ -56,6 +55,8 @@ def _get_metric_update_inputs(self, dataset: "pd.DataFrame") -> Tuple[Any, Any]:
raise NotImplementedError()

def _evaluate(self, dataset: Union["DataFrame", "pd.DataFrame"]) -> float:
from pyspark.ml.connect.util import aggregate_dataframe

torch_metric = self._get_torch_metric()

def local_agg_fn(pandas_df: "pd.DataFrame") -> "pd.DataFrame":
Expand Down
5 changes: 4 additions & 1 deletion python/pyspark/ml/connect/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
)
from pyspark.ml.connect.base import Estimator, Model, Transformer
from pyspark.ml.connect.io_utils import ParamsReadWrite, CoreModelReadWrite
from pyspark.ml.connect.summarizer import summarize_dataframe


class MaxAbsScaler(Estimator, HasInputCol, HasOutputCol, ParamsReadWrite):
Expand Down Expand Up @@ -81,6 +80,8 @@ def __init__(self, *, inputCol: Optional[str] = None, outputCol: Optional[str] =
self._set(**kwargs)

def _fit(self, dataset: Union["pd.DataFrame", "DataFrame"]) -> "MaxAbsScalerModel":
from pyspark.ml.connect.summarizer import summarize_dataframe

input_col = self.getInputCol()

stat_res = summarize_dataframe(dataset, input_col, ["min", "max", "count"])
Expand Down Expand Up @@ -197,6 +198,8 @@ def __init__(self, inputCol: Optional[str] = None, outputCol: Optional[str] = No
self._set(**kwargs)

def _fit(self, dataset: Union[DataFrame, pd.DataFrame]) -> "StandardScalerModel":
from pyspark.ml.connect.summarizer import summarize_dataframe

input_col = self.getInputCol()

stat_result = summarize_dataframe(dataset, input_col, ["mean", "std", "count"])
Expand Down
11 changes: 10 additions & 1 deletion python/pyspark/ml/connect/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@

from pyspark.ml import functions as PyMLFunctions
from pyspark.sql.column import Column
from pyspark.sql.connect.functions.builtin import _invoke_function, _to_col, lit


if TYPE_CHECKING:
from pyspark.sql._typing import UserDefinedFunctionLike


def vector_to_array(col: Column, dtype: str = "float64") -> Column:
    # Deferred import — NOTE(review): presumably moved out of module scope so
    # importing this module does not require the Spark Connect dependency
    # chain at import time; confirm against pyspark.sql.connect.utils.
    from pyspark.sql.connect.functions.builtin import _invoke_function, _to_col, lit

    return _invoke_function("vector_to_array", _to_col(col), lit(dtype))


# Reuse the documented contract of the classic (non-Connect) implementation.
vector_to_array.__doc__ = PyMLFunctions.vector_to_array.__doc__


def array_to_vector(col: Column) -> Column:
    # Deferred import — NOTE(review): presumably moved out of module scope so
    # importing this module does not require the Spark Connect dependency
    # chain at import time; confirm against pyspark.sql.connect.utils.
    from pyspark.sql.connect.functions.builtin import _invoke_function, _to_col

    return _invoke_function("array_to_vector", _to_col(col))


Expand All @@ -49,6 +53,11 @@ def predict_batch_udf(*args: Any, **kwargs: Any) -> "UserDefinedFunctionLike":
def _test() -> None:
import os
import sys

if os.environ.get("PYTHON_GIL", "?") == "0":
print("Not supported in no-GIL mode", file=sys.stderr)
sys.exit(0)

import doctest
from pyspark.sql import SparkSession as PySparkSession
import pyspark.ml.connect.functions
Expand Down
4 changes: 4 additions & 0 deletions python/pyspark/ml/connect/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from pyspark.sql.connect.utils import check_dependencies

check_dependencies(__name__)

from typing import Optional, TYPE_CHECKING, List

import pyspark.sql.connect.proto as pb2
Expand Down
3 changes: 3 additions & 0 deletions python/pyspark/ml/connect/readwrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from pyspark.sql.connect.utils import check_dependencies

check_dependencies(__name__)

from typing import cast, Type, TYPE_CHECKING, Union, Dict, Any

Expand Down
4 changes: 4 additions & 0 deletions python/pyspark/ml/connect/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from pyspark.sql.connect.utils import check_dependencies

check_dependencies(__name__)

from typing import Any, List, TYPE_CHECKING, Mapping, Dict

import pyspark.sql.connect.proto as pb2
Expand Down
8 changes: 6 additions & 2 deletions python/pyspark/ml/connect/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
# limitations under the License.
#

from typing import Any, TypeVar, Callable, List, Tuple, Union, Iterable
from typing import Any, TypeVar, Callable, List, Tuple, Union, Iterable, TYPE_CHECKING

import pandas as pd

from pyspark import cloudpickle
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, pandas_udf
import pyspark.sql.connect.proto as pb2

if TYPE_CHECKING:
import pyspark.sql.connect.proto as pb2

FuncT = TypeVar("FuncT", bound=Callable[..., Any])

Expand Down Expand Up @@ -180,6 +182,8 @@ def transform_fn_pandas_udf(*s: "pd.Series") -> "pd.Series":

def _extract_id_methods(obj_identifier: str) -> Tuple[List["pb2.Fetch.Method"], str]:
"""Extract the obj reference id and the methods. Eg, model.summary"""
import pyspark.sql.connect.proto as pb2

method_chain = obj_identifier.split(".")
obj_ref = method_chain[0]
methods: List["pb2.Fetch.Method"] = []
Expand Down
Loading