Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ jobs:
\"lint\" : \"true\",
\"k8s-integration-tests\" : \"true\",
\"breaking-changes-buf\" : \"true\",
\"golang\" : \"true\",
}"
echo $precondition # For debugging
# Remove `\n` to avoid "Invalid format" error
Expand Down Expand Up @@ -530,6 +531,95 @@ jobs:
against: 'https://github.com/apache/spark.git#branch=branch-3.4,subdir=connector/connect/common/src/main'


# Build and test the Spark Connect Go client
golang:
needs: [precondition, infra-image]
# always run if golang == 'true', even infra-image is skip (such as non-master job)
if: always() && fromJson(needs.precondition.outputs.required).golang == 'true'
name: Golang
runs-on: ubuntu-22.04
env:
LC_ALL: C.UTF-8
LANG: C.UTF-8
PYSPARK_DRIVER_PYTHON: python3.9
PYSPARK_PYTHON: python3.9
GITHUB_PREV_SHA: ${{ github.event.before }}
container:
image: ${{ needs.precondition.outputs.image_url }}
steps:
- name: Checkout Spark repository
uses: actions/checkout@v3
with:
fetch-depth: 0
repository: apache/spark
ref: ${{ inputs.branch }}
- name: Add GITHUB_WORKSPACE to git trust safe.directory
run: |
git config --global --add safe.directory ${GITHUB_WORKSPACE}
- name: Sync the current branch with the latest in Apache Spark
if: github.repository != 'apache/spark'
run: |
echo "APACHE_SPARK_REF=$(git rev-parse HEAD)" >> $GITHUB_ENV
git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF#refs/heads/}
git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' merge --no-commit --progress --squash FETCH_HEAD
git -c user.name='Apache Spark Test Account' -c user.email='sparktestacc@gmail.com' commit -m "Merged commit" --allow-empty
# Cache local repositories. Note that GitHub Actions cache has a 2G limit.
- name: Cache Scala, SBT and Maven
uses: actions/cache@v3
with:
path: |
build/apache-maven-*
build/scala-*
build/*.jar
~/.sbt
key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
restore-keys: |
build-
# NOTE(review): the two cache keys below reuse the "docs-" prefix from the
# docs/lint job; consider a golang-specific prefix so this job does not share
# (and potentially thrash) the docs job's cache namespace — TODO confirm intent.
- name: Cache Coursier local repository
uses: actions/cache@v3
with:
path: ~/.cache/coursier
key: docs-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
restore-keys: |
docs-coursier-
- name: Cache Maven local repository
uses: actions/cache@v3
with:
path: ~/.m2/repository
key: docs-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
docs-maven-
- name: Install Java 8
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 8
# NOTE(review): this step mixes `apt update` with `apt-get install`;
# prefer one tool consistently (e.g. `apt-get` for both) in CI scripts.
- name: Install Golang
run: |
sudo apt update
sudo apt-get install -y golang-go
# NOTE(review): these pinned linter/doc dependencies appear copied from the
# lint job; verify which of them this Go job actually needs.
- name: Install Python linter dependencies
run: |
# TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes.
# See also https://github.com/sphinx-doc/sphinx/issues/7551.
# Jinja2 3.0.0+ causes error when building with Sphinx.
# See also https://issues.apache.org/jira/browse/SPARK-35375.
python3.9 -m pip install 'flake8==3.9.0' pydata_sphinx_theme 'mypy==0.920' 'pytest==7.1.3' 'pytest-mypy-plugins==1.9.3' numpydoc 'jinja2<3.0.0' 'black==22.6.0'
python3.9 -m pip install 'pandas-stubs==1.2.0.53' ipython 'grpcio==1.48.1' 'grpc-stubs==1.24.11' 'googleapis-common-protos-stubs==2.2.0'
# `buf` is needed so `make` can regenerate the protobuf Go bindings.
- name: Install dependencies for Python code generation
run: |
# See more in "Installation" https://docs.buf.build/installation#tarball
curl -LO https://github.com/bufbuild/buf/releases/download/v1.18.0/buf-Linux-x86_64.tar.gz
mkdir -p $HOME/buf
tar -xvzf buf-Linux-x86_64.tar.gz -C $HOME/buf --strip-components 1
python3.9 -m pip install 'protobuf==3.19.5' 'mypy-protobuf==3.3.0'
- name: Run Build & Test
run: |
export PATH=$PATH:$HOME/buf/bin
cd connector/connect/client/go
make
make fulltest

# Static analysis, and documentation build
lint:
needs: [precondition, infra-image]
Expand Down
8 changes: 8 additions & 0 deletions connector/connect/client/go/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# All generated files
internal/generated

cmd/spark-connect-example-raw-grpc-client/spark-connect-example-raw-grpc-client
cmd/spark-connect-example-spark-session/spark-connect-example-spark-session

# Ignore Coverage Files
coverage*
94 changes: 94 additions & 0 deletions connector/connect/client/go/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
FIRST_GOPATH := $(firstword $(subst :, ,$(GOPATH)))
PKGS := $(shell go list ./... | grep -v /tests | grep -v /xcpb | grep -v /gpb)
GOFILES_NOVENDOR := $(shell find . -name vendor -prune -o -type f -name '*.go' -not -name '*.pb.go' -print)
GOFILES_BUILD := $(shell find . -type f -name '*.go' -not -name '*_test.go')
PROTOFILES := $(shell find . -name vendor -prune -o -type f -name '*.proto' -print)

ALLGOFILES := $(shell find . -type f -name '*.go')
DATE := $(shell date -u -d "@$(SOURCE_DATE_EPOCH)" '+%FT%T%z' 2>/dev/null || date -u '+%FT%T%z')

BUILDFLAGS_NOPIE :=
#BUILDFLAGS_NOPIE := -trimpath -ldflags="-s -w -X main.version=$(GOPASS_VERSION) -X main.commit=$(GOPASS_REVISION) -X main.date=$(DATE)" -gcflags="-trimpath=$(GOPATH)" -asmflags="-trimpath=$(GOPATH)"
BUILDFLAGS ?= $(BUILDFLAGS_NOPIE) -buildmode=pie
TESTFLAGS ?=
PWD := $(shell pwd)
PREFIX ?= $(GOPATH)
BINDIR ?= $(PREFIX)/bin
GO := GO111MODULE=on go
GOOS ?= $(shell go version | cut -d' ' -f4 | cut -d'/' -f1)
GOARCH ?= $(shell go version | cut -d' ' -f4 | cut -d'/' -f2)
TAGS ?= netgo
SHELL = bash

BINARIES := cmd/spark-connect-example-raw-grpc-client/spark-connect-example-raw-grpc-client cmd/spark-connect-example-spark-session/spark-connect-example-spark-session

# Define the location of SPARK_HOME because we need that to depend on the build paths
MAKEFILE_DIR:=$(shell dirname $(realpath $(firstword $(MAKEFILE_LIST))))

PROTO_SRC = $(shell find internal/generated -type f -name *.proto )
REGEN_SCRIPT = $(MAKEFILE_DIR)/../../../../dev/connect-gen-protos.sh


OK := $(shell tput setaf 6; echo ' [OK]'; tput sgr0;)

all: build

build: $(BUILD_OUTPUT) $(BINARIES)

internal/generated.out:
@echo -n ">> BUILD, output = $@"
@$(REGEN_SCRIPT)
@touch internal/generated.out
@printf '%s\n' '$(OK)'

$(GOFILES_BUILD): internal/generated.out

$(BUILD_OUTPUT): $(GOFILES_BUILD)
@echo -n ">> BUILD, output = $@"
@$(GO) build -o $@ $(BUILDFLAGS)
@printf '%s\n' '$(OK)'


lint: $(BUILD_OUTPUT)
@golangci-lint run

fmt:
@gofumpt -extra -w $(ALLGOFILES)

# Run unit tests (verbose) for every package in $(PKGS); abort on first failure.
# Note: the `@` echo-suppression prefix is only valid at the start of a recipe
# line — the original had a stray `@` inside the $(foreach) body, which the
# shell would receive literally as the command `@echo` and fail on.
test: $(BUILD_OUTPUT)
	@echo ">> TEST, \"verbose\""
	@$(foreach pkg, $(PKGS),\
		echo -n "     ";\
		$(GO) test -v -run '(Test|Example)' $(BUILDFLAGS) $(TESTFLAGS) $(pkg) || exit 1;)

# Run unit tests for every package with atomic coverage, aggregating the
# per-package profiles into coverage-all.out and rendering an HTML report.
# Uses $(GO) (not bare `go`) so GO111MODULE=on applies, consistent with the
# rest of this Makefile.
fulltest: $(BUILD_OUTPUT)
	@echo ">> TEST, \"coverage\""
	@echo "mode: atomic" > coverage-all.out
	@$(foreach pkg, $(PKGS),\
		echo -n "     ";\
		$(GO) test -run '(Test|Example)' $(BUILDFLAGS) $(TESTFLAGS) -coverprofile=coverage.out -covermode=atomic $(pkg) || exit 1;\
		tail -n +2 coverage.out >> coverage-all.out;)
	@$(GO) tool cover -html=coverage-all.out -o coverage-all.html


cmd/spark-connect-example-raw-grpc-client/spark-connect-example-raw-grpc-client: $(GOFILES_BUILD)
@echo ">> BUILD, output = $@"
@cd $(dir $@) && $(GO) build -o $(notdir $@) $(BUILDFLAGS)
@printf '%s\n' '$(OK)'

cmd/spark-connect-example-spark-session/spark-connect-example-spark-session: $(GOFILES_BUILD)
@echo ">> BUILD, output = $@"
@cd $(dir $@) && $(GO) build -o $(notdir $@) $(BUILDFLAGS)
@printf '%s\n' '$(OK)'


clean:
@echo -n ">> CLEAN"
@$(GO) clean -i ./...
@rm -rf ./internal/generated
@rm -f ./internal/generated.out
@rm -f ./coverage-all.html
@rm -f ./coverage-all.out
@rm -f ./coverage.out
@find . -type f -name "coverage.out" -delete
@printf '%s\n' '$(OK)'
99 changes: 99 additions & 0 deletions connector/connect/client/go/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
## Summary

This folder contains the Spark Connect Go client implementation. You can reference the Go module (library) in this folder
to write a Spark Connect Go application that connects to a remote Spark driver (Spark Connect server).

## Spark Connect Go Application Example

A very simple example in Go looks like the following:

```
func main() {
remote := "localhost:15002"
spark, _ := sql.SparkSession.Builder.Remote(remote).Build()
defer spark.Stop()

df, _ := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
df.Show(100, false)
}
```

## High Level Design

The following [diagram](https://textik.com/#ac299c8f32c4c342) shows the main code in the current prototype:

```
+-------------------+
| |
| dataFrameImpl |
| |
+-------------------+
|
|
+
+-------------------+
| |
| sparkSessionImpl |
| |
+-------------------+
|
|
+
+---------------------------+ +----------------+
| | | |
| SparkConnectServiceClient |--------------+| Spark Driver |
| | | |
+---------------------------+ +----------------+

```

`SparkConnectServiceClient` is a gRPC client that talks to the Spark Driver. `sparkSessionImpl` generates `dataFrameImpl`
instances. `dataFrameImpl` uses the gRPC client in `sparkSessionImpl` to communicate with the Spark Driver.

We will mimic the logic of the Spark Connect Scala implementation and adopt common Go practices, e.g. returning an
`error` value for error handling.

## How to Run Spark Connect Go Application

1. Install Golang: https://go.dev/doc/install.

2. Download a Spark distribution (3.4.0+) and extract it.

3. Start Spark Connect server by running command:

```
sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0
```

4. In this repo, `cd connector/connect/client/go`, run Go application:

```
go run examples/spark-connect-example-spark-session/main.go
```

## How to Generate protobuf Go Binding

```
make internal/generated.out
```

## Inner Development Loop

Build all targets:

```
make
```

Run all tests:

```
make test
```


Run all tests with code coverage:

```
make fulltest
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"flag"
proto "github.com/apache/spark/connector/connect/client/go/v_3_4/internal/generated"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"time"
)

var (
remote = flag.String("remote", "localhost:15002", "the remote address of Spark Connect server to connect to")
)

// main connects to a Spark Connect server over plaintext gRPC and issues a
// ConfigRequest (GetAll) as a connectivity smoke test, logging the response.
func main() {
	// BUG FIX: flag.Parse() was missing, so the -remote command-line flag was
	// silently ignored and *remote always held its default value.
	flag.Parse()

	// Dial without TLS; the example server is assumed to be local/trusted.
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}

	conn, err := grpc.Dial(*remote, opts...)
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
	defer conn.Close()

	client := proto.NewSparkConnectServiceClient(conn)

	// Bound the whole RPC with a 30-second timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Ask the server for all of its configuration values; a fresh UUID serves
	// as the session identifier.
	configRequest := proto.ConfigRequest{
		SessionId: uuid.NewString(),
		Operation: &proto.ConfigRequest_Operation{
			OpType: &proto.ConfigRequest_Operation_GetAll{
				GetAll: &proto.ConfigRequest_GetAll{},
			},
		},
	}
	configResponse, err := client.Config(ctx, &configRequest)
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}

	log.Printf("configResponse: %v", configResponse)
}
Loading