Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .github/workflows/paimon-python-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ on:
push:
pull_request:
paths-ignore:
- 'dev/**'
- 'java_based_implementation/paimon-python-java-bridge/**'
- '**/*.md'

env:
JDK_VERSION: 8

concurrency:
group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
cancel-in-progress: true
Expand All @@ -37,6 +38,11 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Set up JDK ${{ env.JDK_VERSION }}
uses: actions/setup-java@v2
with:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- name: Run lint-python.sh
run: |
chmod +x dev/lint-python.sh
Expand Down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ dev/.stage.txt
dev/download
dev/log
dist/
paimon_python.egg-info/
*.egg-info/
30 changes: 14 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,28 @@ This repo is for Apache Paimon Python SDK.
You can verify the setup by right-clicking on any file or folder in the flink-python project
and running "External Tools" → "flake8".

# Usage

## Java-Based Implementation
## Check

We can use `py4j` to leverage Java code to read Paimon data. This section describes how to use this implementation.
We provide script to check codes.

### Set Environment Variables
```shell
./dev/lint-python.sh # execute all checks
./dev/lint-python.sh -h # run this to see more usages
```

`py4j` need to access a JVM, so we should set JVM arguments (optional) and Java classpath. A convenient way is using
`os` packages to set environment variables which only affect current process.
## Build

```python
import os
We provide script to build wheel.

os.environ['PYPAIMON_JAVA_CLASSPATH'] = '/path/to/dependent_jars/*'
os.environ['_PYPAIMON_JVM_ARGS'] = 'jvm_arg1 jvm_arg2 ...'
```shell
./dev/build-wheels.sh
```

NOTE: the package has set paimon core and hadoop dependencies. If you just test in local or run code in hadoop, you doesn't
need to set classpath. If you need other dependencies such as OSS/S3 filesystem jars, or special catalog which isn't implemented
in paimon core, please download jars and set classpath.
The target wheel is under `dist/`

# Usage

# API Reference
TODO
See Apache Paimon Python API [Doc](https://paimon.apache.org/docs/master/program-api/python-api/).



Expand Down
25 changes: 25 additions & 0 deletions paimon_python_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################

from .split import Split
from .table_read import TableRead
from .table_scan import TableScan, Plan
from .read_builder import ReadBuilder
from .commit_message import CommitMessage
from .table_commit import BatchTableCommit
from .table_write import BatchTableWrite
from .write_builder import BatchWriteBuilder
from .table import Table
from .catalog import Catalog

__all__ = [
'Split',
'TableRead',
'TableScan',
'Plan',
'ReadBuilder',
'CommitMessage',
'BatchTableCommit',
'BatchTableWrite',
'BatchWriteBuilder',
'Table',
'Catalog'
]
2 changes: 1 addition & 1 deletion paimon_python_api/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#################################################################################

from abc import ABC, abstractmethod
from paimon_python_api.table import Table
from paimon_python_api import Table


class Catalog(ABC):
Expand Down
3 changes: 1 addition & 2 deletions paimon_python_api/read_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
#################################################################################

from abc import ABC, abstractmethod
from paimon_python_api.table_read import TableRead
from paimon_python_api.table_scan import TableScan
from paimon_python_api import TableRead, TableScan
from typing import List


Expand Down
3 changes: 1 addition & 2 deletions paimon_python_api/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
#################################################################################

from abc import ABC, abstractmethod
from paimon_python_api.read_builder import ReadBuilder
from paimon_python_api.write_builder import BatchWriteBuilder
from paimon_python_api import ReadBuilder, BatchWriteBuilder


class Table(ABC):
Expand Down
2 changes: 1 addition & 1 deletion paimon_python_api/table_commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#################################################################################

from abc import ABC, abstractmethod
from paimon_python_api.commit_message import CommitMessage
from paimon_python_api import CommitMessage
from typing import List


Expand Down
7 changes: 4 additions & 3 deletions paimon_python_api/table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@
# limitations under the License.
#################################################################################

import pyarrow as pa

from abc import ABC, abstractmethod
from pyarrow import RecordBatchReader
from paimon_python_api.split import Split
from paimon_python_api import Split
from typing import List


class TableRead(ABC):
"""To read data from data splits."""

@abstractmethod
def create_reader(self, splits: List[Split]) -> RecordBatchReader:
def create_reader(self, splits: List[Split]) -> pa.RecordBatchReader:
"""Return a reader containing batches of pyarrow format."""
2 changes: 1 addition & 1 deletion paimon_python_api/table_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from abc import ABC, abstractmethod
from typing import List
from paimon_python_api.split import Split
from paimon_python_api import Split


class TableScan(ABC):
Expand Down
7 changes: 4 additions & 3 deletions paimon_python_api/table_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
# limitations under the License.
#################################################################################

import pyarrow as pa

from abc import ABC, abstractmethod
from paimon_python_api.commit_message import CommitMessage
from pyarrow import RecordBatch
from paimon_python_api import CommitMessage
from typing import List


class BatchTableWrite(ABC):
"""A table write for batch processing. Recommended for one-time committing."""

@abstractmethod
def write(self, record_batch: RecordBatch):
def write(self, record_batch: pa.RecordBatch):
""" Write a batch to the writer. */"""

@abstractmethod
Expand Down
3 changes: 1 addition & 2 deletions paimon_python_api/write_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
#################################################################################

from abc import ABC, abstractmethod
from paimon_python_api.table_commit import BatchTableCommit
from paimon_python_api.table_write import BatchTableWrite
from paimon_python_api import BatchTableCommit, BatchTableWrite


class BatchWriteBuilder(ABC):
Expand Down
36 changes: 36 additions & 0 deletions paimon_python_java/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from .util import constants
from .pypaimon import (Catalog, Table, ReadBuilder, TableScan, Plan, Split, TableRead,
BatchWriteBuilder, BatchTableWrite, CommitMessage, BatchTableCommit)

__all__ = [
'constants',
'Catalog',
'Table',
'ReadBuilder',
'TableScan',
'Plan',
'Split',
'TableRead',
'BatchWriteBuilder',
'BatchTableWrite',
'CommitMessage',
'BatchTableCommit'
]
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
# limitations under the License.
################################################################################

import importlib
import os
import platform
import signal

from subprocess import Popen, PIPE
from java_based_implementation.util.constants import (PYPAIMON_JVM_ARGS, PYPAIMON_MAIN_ARGS,
PYPAIMON_MAIN_CLASS)
from java_based_implementation.util.setup_utils import get_classpath
from paimon_python_java import constants


def on_windows():
Expand All @@ -47,9 +46,9 @@ def launch_gateway_server_process(env):
java_executable = find_java_executable()
# TODO construct Java module log settings
log_settings = []
jvm_args = env.get(PYPAIMON_JVM_ARGS, '').split()
classpath = get_classpath(env)
main_args = env.get(PYPAIMON_MAIN_ARGS, '').split()
jvm_args = env.get(constants.PYPAIMON_JVM_ARGS, '').split()
classpath = _get_classpath(env)
main_args = env.get(constants.PYPAIMON_MAIN_ARGS, '').split()
command = [
java_executable,
*jvm_args,
Expand All @@ -60,7 +59,7 @@ def launch_gateway_server_process(env):
"-cp",
classpath,
"-c",
PYPAIMON_MAIN_CLASS,
constants.PYPAIMON_MAIN_CLASS,
*main_args
]

Expand All @@ -72,3 +71,20 @@ def preexec_func():
preexec_fn = preexec_func
return Popen(list(filter(lambda c: len(c) != 0, command)),
stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env)


_JAVA_IMPL_MODULE = 'paimon_python_java'
_JAVA_DEPS = 'java_dependencies'
_JAVA_BRIDGE = 'paimon-python-java-bridge'


def _get_classpath(env):
user_defined = env.get(constants.PYPAIMON_JAVA_CLASSPATH)

module = importlib.import_module(_JAVA_IMPL_MODULE)
builtin_java_bridge = os.path.join(*module.__path__, _JAVA_DEPS, _JAVA_BRIDGE + '.jar')

if user_defined is None:
return builtin_java_bridge
else:
return os.pathsep.join([builtin_java_bridge, user_defined])
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
CallbackServerParameters)
from threading import RLock

from java_based_implementation.gateway_server import launch_gateway_server_process
from java_based_implementation.util.constants import PYPAIMON_CONN_INFO_PATH
from java_based_implementation.util.exceptions import install_py4j_hooks
from paimon_python_java.gateway_server import launch_gateway_server_process
from paimon_python_java import constants
from paimon_python_java.util.exceptions import install_py4j_hooks

_gateway = None
_lock = RLock()
Expand Down Expand Up @@ -73,7 +73,7 @@ def launch_gateway():
os.unlink(conn_info_file)

env = dict(os.environ)
env[PYPAIMON_CONN_INFO_PATH] = conn_info_file
env[constants.PYPAIMON_CONN_INFO_PATH] = conn_info_file

p = launch_gateway_server_process(env)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@

<groupId>org.apache.paimon</groupId>
<artifactId>paimon-python-java-bridge</artifactId>
<version>0.9-SNAPSHOT</version>
<version>0.1-SNAPSHOT</version>
<name>Paimon : Python-Java Bridge</name>

<packaging>jar</packaging>

<properties>
<paimon.version>0.9-SNAPSHOT</paimon.version>
<paimon.version>0.9.0</paimon.version>
<flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
<py4j.version>0.10.9.7</py4j.version>
<slf4j.version>1.7.32</slf4j.version>
<log4j.version>2.17.1</log4j.version>
<spotless.version>2.13.0</spotless.version>
<spotless.delimiter>package</spotless.delimiter>
<arrow.version>14.0.0</arrow.version>
<target.java.version>1.8</target.java.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -137,6 +138,23 @@
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
<!-- The semantics of this option are reversed, see MCOMPILER-209. -->
<useIncrementalCompilation>false</useIncrementalCompilation>
<compilerArgs>
<!-- Prevents recompilation due to missing package-info.class, see MCOMPILER-205 -->
<arg>-Xpkginfo:always</arg>
<arg>-Xlint:deprecation</arg>
</compilerArgs>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
Expand Down
Loading