First version

Fokko committed Dec 9, 2022
1 parent 9c68ca2 commit ad9dae5
Showing 10 changed files with 187 additions and 36 deletions.
59 changes: 59 additions & 0 deletions .github/workflows/python-integration.yml
@@ -0,0 +1,59 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: "Python CI"
on:
  push:
    branches:
    - 'master'
    - '0.**'
    tags:
    - 'apache-iceberg-**'
  pull_request:
    paths:
    - '.github/workflows/python-integration.yml'
    - 'python/**'

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: ${{ github.event_name == 'pull_request' }}

jobs:
  lint-and-test:
    runs-on: ubuntu-20.04

    steps:
      - uses: actions/checkout@v3
      - name: Install poetry
        run: pip install poetry
      - uses: actions/setup-python@v4
        with:
          python-version: '3.9'
          cache: poetry
          cache-dependency-path: |
            ./python/poetry.lock
      - name: Install
        working-directory: ./python
        run: make install
      - name: Run Apache-Spark setup
        working-directory: ./python/dev
        run: docker-compose up -d
      - name: Tests
        working-directory: ./python
        run: make test-integration
11 changes: 6 additions & 5 deletions python/Makefile
@@ -26,14 +26,15 @@ lint:
	poetry run pre-commit run --all-files

test:
-	poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m "not s3" ${PYTEST_ARGS}
+	poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m unmarked ${PYTEST_ARGS}
	poetry run coverage report -m --fail-under=90
	poetry run coverage html
	poetry run coverage xml

test-s3:
	sh ./dev/run-minio.sh
-	poetry run coverage run --source=pyiceberg/ -m pytest tests/ ${PYTEST_ARGS}
-	poetry run coverage report -m --fail-under=90
-	poetry run coverage html
-	poetry run coverage xml
+	poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m s3 ${PYTEST_ARGS}
+
+test-integration:
+	docker-compose -f dev/docker-compose.yml up -d
+	poetry run coverage run --source=pyiceberg/ -m pytest tests/ -m integration ${PYTEST_ARGS}
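Selecting tests with -m unmarked is not a built-in pytest feature; it presumably relies on a collection hook in the test suite's conftest.py that tags every test lacking one of the dedicated markers. A minimal sketch of such a hook, assuming a hypothetical python/tests/conftest.py (only the unmarked, s3 and integration marker names come from this commit; the marker would also need to be registered in pyproject.toml to silence warnings):

import pytest

SPECIAL_MARKERS = {"s3", "integration"}


def pytest_collection_modifyitems(config, items):
    # Tag every test that carries none of the dedicated markers, so that
    # `pytest -m unmarked` runs only the plain unit tests while `-m s3`
    # and `-m integration` select the special suites.
    for item in items:
        if not SPECIAL_MARKERS.intersection(marker.name for marker in item.iter_markers()):
            item.add_marker(pytest.mark.unmarked)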
5 changes: 3 additions & 2 deletions python/dev/Dockerfile
@@ -16,6 +16,7 @@ RUN apt-get update && \
# Optional env variables
ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"}
ENV HADOOP_HOME=${HADOOP_HOME:-"/opt/hadoop"}
+ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.5-src.zip:$PYTHONPATH

RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/spark-events
WORKDIR ${SPARK_HOME}
@@ -39,14 +40,14 @@ RUN curl https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-cl

COPY spark-defaults.conf /opt/spark/conf
ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"
-ENV SPARK_HOME="/opt/spark"

RUN chmod u+x /opt/spark/sbin/* && \
    chmod u+x /opt/spark/bin/*

COPY entrypoint.sh .
+COPY provision.py .

COPY data/ data/

ENTRYPOINT ["./entrypoint.sh"]
CMD ["notebook"]
CMD ["notebook"]
20 changes: 19 additions & 1 deletion python/dev/entrypoint.sh
@@ -1,7 +1,25 @@
#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#

start-master.sh -p 7077
start-worker.sh spark://spark-iceberg:7077
start-history-server.sh

-tail -f /dev/null
+python3 ./provision.py
43 changes: 21 additions & 22 deletions python/dev/provision.py
@@ -14,19 +14,25 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import time


-from pyspark.sql import SparkSession
import pyspark.sql.functions as F
+from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType

spark = SparkSession.builder.getOrCreate()

spark.sql("""
CREATE DATABASE IF NOT EXISTS default;
""")
print("Create database")

spark.sql(
"""
CREATE DATABASE default;
"""
)

print("Create beers table")

df = spark.read.csv('/opt/spark/data/beers.csv', header=True)
df = spark.read.csv("/opt/spark/data/beers.csv", header=True)

df = (
df.withColumn("abv", df.abv.cast(DoubleType()))
@@ -35,27 +41,20 @@
.withColumn("brewery_id", df.brewery_id.cast(IntegerType()))
.withColumn("ounces", df.ounces.cast(DoubleType()))
# Inject a NaN which is nice for testing
.withColumn("ibu", F.when(F.col("beer_id") == 2546, float('NaN')).otherwise(F.col("ibu")))
.withColumn("ibu", F.when(F.col("beer_id") == 2546, float("NaN")).otherwise(F.col("ibu")))
)

df.write.mode('overwrite').saveAsTable("default.beers")
df.write.saveAsTable("default.beers")

spark.sql("""
CALL system.rewrite_data_files(table => 'default.beers', strategy => 'sort', sort_order => 'brewery_id')
""").show()
print("Create breweries table")

df = spark.read.csv('/opt/spark/data/breweries.csv', header=True)
df = spark.read.csv("/opt/spark/data/breweries.csv", header=True)

df = (
df.withColumn("brewery_id", df.brewery_id.cast(IntegerType()))
)
df = df.withColumn("brewery_id", df.brewery_id.cast(IntegerType()))

df.write.mode('overwrite').saveAsTable("default.breweries")
df.write.saveAsTable("default.breweries")

spark.sql("""
ALTER TABLE default.breweries ADD PARTITION FIELD state
""").show()
print("Done!")

spark.sql("""
CALL system.rewrite_data_files('default.breweries')
""").show()
while True:
time.sleep(1)
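The NaN injected above for beer_id 2546 is what the integration tests further down rely on. A hedged sanity check that could be run against the same Spark session after provisioning (not part of this commit):

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative only: confirm the single injected NaN row is present in the
# provisioned table before the PyIceberg tests query it.
spark.table("default.beers").where(F.isnan("ibu")).select("beer_id", "ibu").show()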
2 changes: 1 addition & 1 deletion python/dev/spark-defaults.conf
@@ -26,4 +26,4 @@ spark.sql.defaultCatalog demo
spark.eventLog.enabled true
spark.eventLog.dir /home/iceberg/spark-events
spark.history.fs.logDirectory /home/iceberg/spark-events
-spark.sql.catalogImplementation in-memory
\ No newline at end of file
+spark.sql.catalogImplementation in-memory
2 changes: 1 addition & 1 deletion python/pyiceberg/catalog/__init__.py
@@ -154,7 +154,7 @@ def load_catalog(name: str, **properties: Optional[str]) -> Catalog:
        catalog_type = infer_catalog_type(name, conf)

    if catalog_type:
-        return AVAILABLE_CATALOGS[catalog_type](name, cast(dict[str, str], conf))
+        return AVAILABLE_CATALOGS[catalog_type](name, conf)

    raise ValueError(f"Could not initialize catalog with the following properties: {properties}")

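The changed line above drops the cast and hands the merged properties straight to the resolved catalog implementation. A hedged usage sketch (only load_catalog, the "type": "rest" property, and the local REST endpoint appear elsewhere in this commit; the rest is illustrative):

from pyiceberg.catalog import load_catalog

# Illustrative only: the properties dict is now passed through to the catalog
# constructor unchanged once the catalog type has been resolved.
catalog = load_catalog("local", **{"type": "rest", "uri": "http://localhost:8181"})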
6 changes: 3 additions & 3 deletions python/pyiceberg/table/__init__.py
@@ -86,11 +86,11 @@ def name(self) -> Identifier:
    def scan(
        self,
        row_filter: Optional[BooleanExpression] = None,
-        selected_fields: Tuple[str] = ("*",),
+        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        snapshot_id: Optional[int] = None,
        options: Properties = EMPTY_DICT,
-    ) -> TableScan[Any]:
+    ) -> DataScan:
        return DataScan(
            table=self,
            row_filter=row_filter or AlwaysTrue(),
@@ -169,7 +169,7 @@ def __eq__(self, other: Any) -> bool:
class TableScan(Generic[S], ABC):
    table: Table
    row_filter: BooleanExpression
-    selected_fields: Tuple[str]
+    selected_fields: Tuple[str, ...]
    case_sensitive: bool
    snapshot_id: Optional[int]
    options: Properties
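The two Tuple annotations changed above are a typing fix rather than a behaviour change: Tuple[str] describes a tuple of exactly one string, so multi-field selections would not type-check. A small illustration (not from the commit):

from typing import Tuple

# Tuple[str] means "exactly one str"; the default ("*",) fits, but a caller
# passing several field names would be rejected by mypy.
one_field: Tuple[str] = ("*",)

# Tuple[str, ...] is the variadic form used after this change: any number of strings.
many_fields: Tuple[str, ...] = ("beer_id", "ibu")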
7 changes: 6 additions & 1 deletion python/pyproject.toml
@@ -96,7 +96,8 @@ glue = ["boto3"]

[tool.pytest.ini_options]
markers = [
-    "s3: marks a test as requiring access to s3 compliant storage (use with --aws-access-key-id, ----aws-secret-access-key, and --endpoint-url args)"
+    "s3: marks a test as requiring access to s3 compliant storage (use with --aws-access-key-id, --aws-secret-access-key, and --endpoint-url args)",
+    "integration: marks integration tests against Apache Spark"
]

[tool.black]
@@ -214,5 +215,9 @@ ignore_missing_imports = true
module = "pyparsing.*"
ignore_missing_imports = true

+[[tool.mypy.overrides]]
+module = "pyspark.*"
+ignore_missing_imports = true
+
[tool.coverage.run]
source = ['pyiceberg/']
68 changes: 68 additions & 0 deletions python/tests/test_integration.py
@@ -0,0 +1,68 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name

import math

import pytest

from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.expressions import IsNaN, NotNaN
from pyiceberg.table import Table


@pytest.fixture()
def catalog() -> Catalog:
    return load_catalog(
        "local",
        **{
            "type": "rest",
            "uri": "http://localhost:8181",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
    )


@pytest.fixture()
def table_beers(catalog: Catalog) -> Table:
    return catalog.load_table("default.beers")


@pytest.mark.integration
def test_pyarrow_nan(table_beers: Table) -> None:
"""To check if we detect NaN values properly"""
arrow_table = table_beers.scan(row_filter=IsNaN("ibu"), selected_fields=("beer_id", "ibu")).to_arrow()
assert arrow_table[0][0].as_py() == 2546
assert math.isnan(arrow_table[1][0].as_py())


@pytest.mark.integration
def test_pyarrow_not_nan_count(table_beers: Table) -> None:
"""To check if exclude NaN values properly"""
not_nan = table_beers.scan(row_filter=NotNaN("ibu"), selected_fields=("beer_id", "ibu")).to_arrow()
total = table_beers.scan(selected_fields=("beer_id", "ibu")).to_arrow()
assert len(total) - 1 == len(not_nan)


@pytest.mark.integration
@pytest.mark.skip(reason="Seems to be a bug in the PyArrow to DuckDB conversion")
def test_duckdb_nan(table_beers: Table) -> None:
"""To check if we detect NaN values properly"""
con = table_beers.scan(row_filter=IsNaN("ibu"), selected_fields=("beer_id", "ibu")).to_duckdb("beers")
assert con.query("SELECT beer_id, ibu FROM beers WHERE ibu = 'NaN'").fetchone() == (2546, float("NaN"))
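A side note on the skipped test above: independent of the PyArrow-to-DuckDB issue given as the skip reason, the final assertion cannot pass as written, because float("NaN") never compares equal to another NaN in Python. A hedged variant of that assertion, reusing the con connection from the test (not part of this commit):

import math

# Illustrative only: fetch the row and check the NaN with math.isnan instead
# of relying on tuple equality against float("NaN").
beer_id, ibu = con.query("SELECT beer_id, ibu FROM beers WHERE ibu = 'NaN'").fetchone()
assert beer_id == 2546
assert math.isnan(ibu)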
