Commit 644351c

Merge branch 'main' of github.com:apache/iceberg-python into fd-integrate-residuals

2 parents: fafdb68 + bdb7c7d

40 files changed: +2710 −1858 lines

Makefile

Lines changed: 14 additions & 7 deletions

```diff
@@ -19,13 +19,20 @@
 help: ## Display this help
 	@awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m\033[0m\n"} /^[a-zA-Z_-]+:.*?##/ { printf " \033[36m%-20s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m%s\033[0m\n", substr($$0, 5) } ' $(MAKEFILE_LIST)

-install-poetry: ## Install poetry if the user has not done that yet.
-	@if ! command -v poetry &> /dev/null; then \
-		echo "Poetry could not be found. Installing..."; \
-		pip install --user poetry==2.0.1; \
-	else \
-		echo "Poetry is already installed."; \
-	fi
+POETRY_VERSION = 2.0.1
+install-poetry: ## Ensure Poetry is installed and the correct version is being used.
+	@if ! command -v poetry &> /dev/null; then \
+		echo "Poetry could not be found. Installing..."; \
+		pip install --user poetry==$(POETRY_VERSION); \
+	else \
+		INSTALLED_VERSION=$$(pip show poetry | grep Version | awk '{print $$2}'); \
+		if [ "$$INSTALLED_VERSION" != "$(POETRY_VERSION)" ]; then \
+			echo "Poetry version $$INSTALLED_VERSION does not match required version $(POETRY_VERSION). Updating..."; \
+			pip install --user --upgrade poetry==$(POETRY_VERSION); \
+		else \
+			echo "Poetry version $$INSTALLED_VERSION is already installed."; \
+		fi \
+	fi

 install-dependencies: ## Install dependencies including dev, docs, and all extras
 	poetry install --all-extras
```
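The new `install-poetry` target shells out to `pip show poetry` and compares the reported version against `POETRY_VERSION`, upgrading only on a mismatch. The same decision logic can be sketched in Python (a hedged illustration of what the Makefile does, not part of the repository):

```python
REQUIRED_VERSION = "2.0.1"  # mirrors POETRY_VERSION in the Makefile


def installed_version(pip_show_output):
    """Extract the Version field from `pip show poetry` output."""
    for line in pip_show_output.splitlines():
        if line.startswith("Version:"):
            return line.split(":", 1)[1].strip()
    return None  # poetry not installed


def needs_update(pip_show_output, required=REQUIRED_VERSION):
    """True when the installed version differs from the pinned one."""
    return installed_version(pip_show_output) != required


sample = "Name: poetry\nVersion: 1.8.3\nSummary: Python dependency management"
print(needs_update(sample))  # True: an older install triggers an upgrade
```

Note that pinning with an exact equality check also catches an installed version that is *newer* than the pin, which keeps CI and local environments aligned.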

dev/Dockerfile

Lines changed: 7 additions & 5 deletions

```diff
@@ -37,21 +37,23 @@ RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/
 WORKDIR ${SPARK_HOME}

 # Remember to also update `tests/conftest`'s spark setting
-ENV SPARK_VERSION=3.5.3
+ENV SPARK_VERSION=3.5.4
 ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
-ENV ICEBERG_VERSION=1.6.0
+ENV ICEBERG_VERSION=1.8.0
 ENV PYICEBERG_VERSION=0.8.1

 RUN curl --retry 5 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
  && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
  && rm -rf spark-${SPARK_VERSION}-bin-hadoop3.tgz

 # Download iceberg spark runtime
-RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}/${ICEBERG_VERSION}/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar -Lo iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar \
- && mv iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar /opt/spark/jars
+RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}/${ICEBERG_VERSION}/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar \
+ -Lo /opt/spark/jars/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar

 # Download AWS bundle
-RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
+RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar \
+ -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar

 COPY spark-defaults.conf /opt/spark/conf
 ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"
```
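Besides the version bumps, the Dockerfile change downloads the runtime jar straight into `/opt/spark/jars` with `curl -Lo <target>` instead of downloading to the working directory and `mv`-ing it, which drops one layer-bloating step. A small Python sketch (assumed values, not repository code) of how the Maven URL and target path are assembled from the `ENV` variables:

```python
# Values taken from the updated Dockerfile ENV lines.
ICEBERG_SPARK_RUNTIME_VERSION = "3.5_2.12"
ICEBERG_VERSION = "1.8.0"

# Maven layout: <repo>/<groupId as path>/<artifactId>/<version>/<artifact>-<version>.jar
jar = f"iceberg-spark-runtime-{ICEBERG_SPARK_RUNTIME_VERSION}-{ICEBERG_VERSION}.jar"
url = (
    "https://repo1.maven.org/maven2/org/apache/iceberg/"
    f"iceberg-spark-runtime-{ICEBERG_SPARK_RUNTIME_VERSION}/"
    f"{ICEBERG_VERSION}/{jar}"
)
# `curl -Lo` writes the response body directly to this path, no `mv` needed.
target = f"/opt/spark/jars/{jar}"

print(url)
print(target)
```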

mkdocs/docs/SUMMARY.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -30,6 +30,7 @@
 - [Verify a release](verify-release.md)
 - [How to release](how-to-release.md)
 - [Release Notes](https://github.com/apache/iceberg-python/releases)
+- [Nightly Build](nightly-build.md)
 - [Code Reference](reference/)

 <!-- markdown-link-check-enable-->
```

mkdocs/docs/api.md

Lines changed: 202 additions & 1 deletion

```diff
@@ -49,7 +49,7 @@ catalog:

 and loaded in python by calling `load_catalog(name="hive")` and `load_catalog(name="rest")`.

-This information must be placed inside a file called `.pyiceberg.yaml` located either in the `$HOME` or `%USERPROFILE%` directory (depending on whether the operating system is Unix-based or Windows-based, respectively) or in the `$PYICEBERG_HOME` directory (if the corresponding environment variable is set).
+This information must be placed inside a file called `.pyiceberg.yaml` located either in the `$HOME` or `%USERPROFILE%` directory (depending on whether the operating system is Unix-based or Windows-based, respectively), in the current working directory, or in the `$PYICEBERG_HOME` directory (if the corresponding environment variable is set).

 For more details on possible configurations refer to the [specific page](https://py.iceberg.apache.org/configuration/).
```
````diff
@@ -474,6 +474,71 @@ lat: [[48.864716],[52.371807],[53.11254],[37.773972]]
 long: [[2.349014],[4.896029],[6.0989],[-122.431297]]
 ```

+### Upsert
+
+PyIceberg supports upsert operations, meaning that it can merge an Arrow table into an Iceberg table. Rows are matched on the [identifier field](https://iceberg.apache.org/spec/?column-projection#identifier-field-ids). If a row is already in the table, it will be updated; if a row cannot be found, it will be inserted.
+
+Consider the following table, with some data:
+
+```python
+from pyiceberg.schema import Schema
+from pyiceberg.types import IntegerType, NestedField, StringType
+
+import pyarrow as pa
+
+schema = Schema(
+    NestedField(1, "city", StringType(), required=True),
+    NestedField(2, "inhabitants", IntegerType(), required=True),
+    # Mark city as the identifier field, also known as the primary key
+    identifier_field_ids=[1]
+)
+
+tbl = catalog.create_table("default.cities", schema=schema)
+
+arrow_schema = pa.schema(
+    [
+        pa.field("city", pa.string(), nullable=False),
+        pa.field("inhabitants", pa.int32(), nullable=False),
+    ]
+)
+
+# Write some data
+df = pa.Table.from_pylist(
+    [
+        {"city": "Amsterdam", "inhabitants": 921402},
+        {"city": "San Francisco", "inhabitants": 808988},
+        {"city": "Drachten", "inhabitants": 45019},
+        {"city": "Paris", "inhabitants": 2103000},
+    ],
+    schema=arrow_schema
+)
+tbl.append(df)
+```
+
+Next, we'll upsert an Arrow table into the Iceberg table:
+
+```python
+df = pa.Table.from_pylist(
+    [
+        # Will be updated, the inhabitants count has changed
+        {"city": "Drachten", "inhabitants": 45505},
+
+        # New row, will be inserted
+        {"city": "Berlin", "inhabitants": 3432000},
+
+        # Ignored, already exists in the table
+        {"city": "Paris", "inhabitants": 2103000},
+    ],
+    schema=arrow_schema
+)
+upd = tbl.upsert(df)
+
+assert upd.rows_updated == 1
+assert upd.rows_inserted == 1
+```
+
+PyIceberg will automatically detect which rows need to be updated, which need to be inserted, and which can simply be ignored.
+
 ## Inspecting tables

 To explore the table metadata, tables can be inspected.
````
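The matching rules the upsert documentation describes (update on a changed match, insert on a miss, ignore an identical row) can be sketched independently of Iceberg with plain dictionaries. This is a minimal illustration of the semantics, not the PyIceberg implementation:

```python
def upsert(existing, incoming):
    """Merge incoming rows into `existing`, keyed on 'city'.

    Returns (rows_updated, rows_inserted); identical rows are ignored.
    """
    updated = inserted = 0
    for row in incoming:
        key, value = row["city"], row["inhabitants"]
        if key not in existing:
            existing[key] = value          # new key: insert
            inserted += 1
        elif existing[key] != value:
            existing[key] = value          # key exists, value changed: update
            updated += 1
        # key exists with identical value: ignore
    return updated, inserted


table = {"Amsterdam": 921402, "San Francisco": 808988,
         "Drachten": 45019, "Paris": 2103000}
counts = upsert(table, [
    {"city": "Drachten", "inhabitants": 45505},    # update
    {"city": "Berlin", "inhabitants": 3432000},    # insert
    {"city": "Paris", "inhabitants": 2103000},     # ignored
])
print(counts)  # (1, 1), matching the rows_updated/rows_inserted assertions above
```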
````diff
@@ -1546,3 +1611,139 @@ df.show(2)

 (Showing first 2 rows)
 ```
+
+### Polars
+
+PyIceberg interfaces closely with Polars DataFrames and LazyFrames, which provide a fully lazily-optimized query engine on top of PyIceberg tables.
+
+<!-- prettier-ignore-start -->
+
+!!! note "Requirements"
+    This requires [`polars` to be installed](index.md).
+
+    ```python
+    pip install pyiceberg['polars']
+    ```
+<!-- prettier-ignore-end -->
+
+PyIceberg data can be analyzed and accessed through Polars using either a DataFrame or a LazyFrame.
+If your code utilizes the Apache Iceberg data scanning and retrieval API and then analyzes the resulting DataFrame in Polars, use the `table.scan().to_polars()` API.
+If the intent is to utilize Polars' high-performance filtering and retrieval functionalities, use the LazyFrame exported from the Iceberg table with the `table.to_polars()` API.
+
+```python
+# Get LazyFrame
+iceberg_table.to_polars()
+
+# Get DataFrame
+iceberg_table.scan().to_polars()
+```
+
+#### Working with Polars DataFrame
+
+PyIceberg makes it easy to filter out data from a huge table and pull it into a Polars DataFrame locally. This will only fetch the relevant Parquet files for the query and apply the filter, which reduces IO and therefore improves performance and reduces cost.
+
+```python
+schema = Schema(
+    NestedField(field_id=1, name='ticket_id', field_type=LongType(), required=True),
+    NestedField(field_id=2, name='customer_id', field_type=LongType(), required=True),
+    NestedField(field_id=3, name='issue', field_type=StringType(), required=False),
+    NestedField(field_id=4, name='created_at', field_type=TimestampType(), required=True),
+)
+
+iceberg_table = catalog.create_table(
+    identifier='default.product_support_issues',
+    schema=schema
+)
+
+pa_table_data = pa.Table.from_pylist(
+    [
+        {'ticket_id': 1, 'customer_id': 546, 'issue': 'User Login issue', 'created_at': 1650020000000000},
+        {'ticket_id': 2, 'customer_id': 547, 'issue': 'Payment not going through', 'created_at': 1650028640000000},
+        {'ticket_id': 3, 'customer_id': 548, 'issue': 'Error on checkout', 'created_at': 1650037280000000},
+        {'ticket_id': 4, 'customer_id': 549, 'issue': 'Unable to reset password', 'created_at': 1650045920000000},
+        {'ticket_id': 5, 'customer_id': 550, 'issue': 'Account locked', 'created_at': 1650054560000000},
+        {'ticket_id': 6, 'customer_id': 551, 'issue': 'Order not received', 'created_at': 1650063200000000},
+        {'ticket_id': 7, 'customer_id': 552, 'issue': 'Refund not processed', 'created_at': 1650071840000000},
+        {'ticket_id': 8, 'customer_id': 553, 'issue': 'Shipping address issue', 'created_at': 1650080480000000},
+        {'ticket_id': 9, 'customer_id': 554, 'issue': 'Product damaged', 'created_at': 1650089120000000},
+        {'ticket_id': 10, 'customer_id': 555, 'issue': 'Unable to apply discount code', 'created_at': 1650097760000000},
+        {'ticket_id': 11, 'customer_id': 556, 'issue': 'Website not loading', 'created_at': 1650106400000000},
+        {'ticket_id': 12, 'customer_id': 557, 'issue': 'Incorrect order received', 'created_at': 1650115040000000},
+        {'ticket_id': 13, 'customer_id': 558, 'issue': 'Unable to track order', 'created_at': 1650123680000000},
+        {'ticket_id': 14, 'customer_id': 559, 'issue': 'Order delayed', 'created_at': 1650132320000000},
+        {'ticket_id': 15, 'customer_id': 560, 'issue': 'Product not as described', 'created_at': 1650140960000000},
+        {'ticket_id': 16, 'customer_id': 561, 'issue': 'Unable to contact support', 'created_at': 1650149600000000},
+        {'ticket_id': 17, 'customer_id': 562, 'issue': 'Duplicate charge', 'created_at': 1650158240000000},
+        {'ticket_id': 18, 'customer_id': 563, 'issue': 'Unable to update profile', 'created_at': 1650166880000000},
+        {'ticket_id': 19, 'customer_id': 564, 'issue': 'App crashing', 'created_at': 1650175520000000},
+        {'ticket_id': 20, 'customer_id': 565, 'issue': 'Unable to download invoice', 'created_at': 1650184160000000},
+        {'ticket_id': 21, 'customer_id': 566, 'issue': 'Incorrect billing amount', 'created_at': 1650192800000000},
+    ], schema=iceberg_table.schema().as_arrow()
+)
+
+iceberg_table.append(
+    df=pa_table_data
+)
+
+iceberg_table.scan(
+    row_filter="ticket_id > 10",
+).to_polars()
+```
+
+This will return a Polars DataFrame:
+
+```
+shape: (11, 4)
+┌───────────┬─────────────┬────────────────────────────┬─────────────────────┐
+│ ticket_id ┆ customer_id ┆ issue                      ┆ created_at          │
+│ ---       ┆ ---         ┆ ---                        ┆ ---                 │
+│ i64       ┆ i64         ┆ str                        ┆ datetime[μs]        │
+╞═══════════╪═════════════╪════════════════════════════╪═════════════════════╡
+│ 11        ┆ 556         ┆ Website not loading        ┆ 2022-04-16 10:53:20 │
+│ 12        ┆ 557         ┆ Incorrect order received   ┆ 2022-04-16 13:17:20 │
+│ 13        ┆ 558         ┆ Unable to track order      ┆ 2022-04-16 15:41:20 │
+│ 14        ┆ 559         ┆ Order delayed              ┆ 2022-04-16 18:05:20 │
+│ 15        ┆ 560         ┆ Product not as described   ┆ 2022-04-16 20:29:20 │
+│ …         ┆ …           ┆ …                          ┆ …                   │
+│ 17        ┆ 562         ┆ Duplicate charge           ┆ 2022-04-17 01:17:20 │
+│ 18        ┆ 563         ┆ Unable to update profile   ┆ 2022-04-17 03:41:20 │
+│ 19        ┆ 564         ┆ App crashing               ┆ 2022-04-17 06:05:20 │
+│ 20        ┆ 565         ┆ Unable to download invoice ┆ 2022-04-17 08:29:20 │
+│ 21        ┆ 566         ┆ Incorrect billing amount   ┆ 2022-04-17 10:53:20 │
+└───────────┴─────────────┴────────────────────────────┴─────────────────────┘
+```
+
+#### Working with Polars LazyFrame
+
+PyIceberg supports creation of a Polars LazyFrame based on an Iceberg table.
+
+Using the above code example:
+
+```python
+lf = iceberg_table.to_polars().filter(pl.col("ticket_id") > 10)
+print(lf.collect())
+```
+
+The code snippet above returns a Polars LazyFrame and defines a filter to be executed by Polars:
+
+```
+shape: (11, 4)
+┌───────────┬─────────────┬────────────────────────────┬─────────────────────┐
+│ ticket_id ┆ customer_id ┆ issue                      ┆ created_at          │
+│ ---       ┆ ---         ┆ ---                        ┆ ---                 │
+│ i64       ┆ i64         ┆ str                        ┆ datetime[μs]        │
+╞═══════════╪═════════════╪════════════════════════════╪═════════════════════╡
+│ 11        ┆ 556         ┆ Website not loading        ┆ 2022-04-16 10:53:20 │
+│ 12        ┆ 557         ┆ Incorrect order received   ┆ 2022-04-16 13:17:20 │
+│ 13        ┆ 558         ┆ Unable to track order      ┆ 2022-04-16 15:41:20 │
+│ 14        ┆ 559         ┆ Order delayed              ┆ 2022-04-16 18:05:20 │
+│ 15        ┆ 560         ┆ Product not as described   ┆ 2022-04-16 20:29:20 │
+│ …         ┆ …           ┆ …                          ┆ …                   │
+│ 17        ┆ 562         ┆ Duplicate charge           ┆ 2022-04-17 01:17:20 │
+│ 18        ┆ 563         ┆ Unable to update profile   ┆ 2022-04-17 03:41:20 │
+│ 19        ┆ 564         ┆ App crashing               ┆ 2022-04-17 06:05:20 │
+│ 20        ┆ 565         ┆ Unable to download invoice ┆ 2022-04-17 08:29:20 │
+│ 21        ┆ 566         ┆ Incorrect billing amount   ┆ 2022-04-17 10:53:20 │
+└───────────┴─────────────┴────────────────────────────┴─────────────────────┘
+```
````
