Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix filesystem layout timestamps with milliseconds #1286

Merged
merged 8 commits into from
Apr 26, 2024
8 changes: 8 additions & 0 deletions dlt/common/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,14 @@ def to_py_date(value: datetime.date) -> datetime.date:
return value


def datetime_to_timestamp(moment: Union[datetime.datetime, pendulum.DateTime]) -> int:
    """Convert a datetime to a Unix timestamp expressed in whole seconds.

    Args:
        moment: The datetime to convert.

    Returns:
        The value of `moment.timestamp()` with its fractional part dropped
        by `int()` (truncation toward zero, not rounding).
    """
    epoch_seconds = moment.timestamp()
    return int(epoch_seconds)


def datetime_to_timestamp_ms(moment: Union[datetime.datetime, pendulum.DateTime]) -> int:
    """Convert a datetime to a Unix timestamp expressed in whole milliseconds.

    The result is computed with integer arithmetic. Multiplying the float
    returned by `moment.timestamp()` by 1000 can land just below an exact
    millisecond because of binary floating-point rounding, and truncating
    that product then yields a value that is one millisecond too small.

    Args:
        moment: The datetime to convert.

    Returns:
        Milliseconds since the Unix epoch. Sub-millisecond precision is
        discarded by truncating toward zero.
    """
    # With the microseconds zeroed the timestamp is an integral float, so the
    # conversion to int loses nothing.
    whole_seconds = int(moment.replace(microsecond=0).timestamp())
    total_microseconds = whole_seconds * 1_000_000 + moment.microsecond
    # Truncate toward zero (rather than floor) so that pre-epoch moments keep
    # the same rounding direction as int() applied to a float.
    milliseconds = abs(total_microseconds) // 1000
    return milliseconds if total_microseconds >= 0 else -milliseconds


def _datetime_from_ts_or_iso(
value: Union[int, float, str]
) -> Union[pendulum.DateTime, pendulum.Date, pendulum.Time]:
Expand Down
14 changes: 9 additions & 5 deletions dlt/destinations/path_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from dlt.common import logger
from dlt.common.pendulum import pendulum
from dlt.common.storages.load_package import ParsedLoadJobFileName
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.time import (
ensure_pendulum_datetime,
datetime_to_timestamp,
datetime_to_timestamp_ms,
)
from dlt.destinations.exceptions import (
CantExtractTablePrefix,
InvalidFilesystemLayout,
Expand Down Expand Up @@ -91,8 +95,8 @@ def prepare_datetime_params(
current_timestamp: pendulum.DateTime = None
if load_package_timestamp:
current_timestamp = ensure_pendulum_datetime(load_package_timestamp)
params["load_package_timestamp"] = str(int(current_timestamp.timestamp()))
params["load_package_timestamp_ms"] = current_timestamp.format("SSS")
params["load_package_timestamp"] = str(datetime_to_timestamp(current_timestamp))
params["load_package_timestamp_ms"] = str(datetime_to_timestamp_ms(current_timestamp))

if not current_datetime:
if current_timestamp:
Expand All @@ -102,8 +106,8 @@ def prepare_datetime_params(
logger.info("current_datetime is not set, using pendulum.now()")
current_datetime = pendulum.now()

params["timestamp"] = str(int(current_datetime.timestamp()))
params["timestamp_ms"] = current_datetime.format("SSS")
params["timestamp"] = str(datetime_to_timestamp(current_datetime))
params["timestamp_ms"] = str(datetime_to_timestamp_ms(current_datetime))
params["curr_date"] = str(current_datetime.date())

for format_string in DATETIME_PLACEHOLDERS:
Expand Down
9 changes: 7 additions & 2 deletions docs/website/docs/dlt-ecosystem/destinations/filesystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,14 @@ Keep in mind all values are lowercased.
:::

* `timestamp` - the current timestamp in Unix Timestamp format rounded to seconds
* `timestamp_ms` - the current timestamp in Unix Timestamp format rounded to milliseconds
* `timestamp_ms` - the current timestamp in Unix Timestamp format in milliseconds
* `load_package_timestamp` - timestamp from [load package](../../general-usage/destination-tables.md#load-packages-and-load-ids) in Unix Timestamp format rounded to seconds
* `load_package_timestamp_ms` - timestamp from [load package](../../general-usage/destination-tables.md#load-packages-and-load-ids) in Unix Timestamp format rounded to milliseconds
* `load_package_timestamp_ms` - timestamp from [load package](../../general-usage/destination-tables.md#load-packages-and-load-ids) in Unix Timestamp format in milliseconds

:::note
Both `timestamp_ms` and `load_package_timestamp_ms` are whole numbers of milliseconds (e.g., 12334455233), not fractional seconds, so they provide millisecond precision without a decimal point.
:::

* Years
* `YYYY` - 2024, 2025
* `Y` - 2024, 2025
Expand Down
24 changes: 24 additions & 0 deletions tests/common/test_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
timestamp_within,
ensure_pendulum_datetime,
ensure_pendulum_date,
datetime_to_timestamp,
datetime_to_timestamp_ms,
)
from dlt.common.typing import TAnyDateTime

Expand Down Expand Up @@ -100,3 +102,25 @@ def test_ensure_pendulum_date_utc() -> None:
assert ensure_pendulum_date(
datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone(timedelta(hours=8)))
) == pendulum.date(2020, 12, 31)


# Ten instants on 2024-04-26 (UTC) spaced 70 minutes (4200 seconds) apart,
# starting at 05:16:22.738029. Each entry pairs the datetime with its expected
# Unix timestamp in seconds and in milliseconds.
test_timestamps = [
    (
        pendulum.DateTime(2024, 4, 26, *divmod(316 + 70 * step, 60), 22, 738029).in_tz("UTC"),
        1714108582 + 4200 * step,
        (1714108582 + 4200 * step) * 1000 + 738,
    )
    for step in range(10)
]


@pytest.mark.parametrize("datetime_obj,timestamp,timestamp_ms", test_timestamps)
def test_datetime_to_timestamp_helpers(
    datetime_obj: pendulum.DateTime, timestamp: int, timestamp_ms: int
) -> None:
    """Check both timestamp helpers against the precomputed expected values."""
    # Seconds first, then milliseconds.
    seconds = datetime_to_timestamp(datetime_obj)
    assert seconds == timestamp

    milliseconds = datetime_to_timestamp_ms(datetime_obj)
    assert milliseconds == timestamp_ms
12 changes: 12 additions & 0 deletions tests/destinations/test_path_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,18 @@ def dummy_callback2(*args, **kwargs):
True,
[],
),
(
"{Y}/{timestamp_ms}/{table_name}",
f"{frozen_datetime.year}/{str(int(frozen_datetime.timestamp()*1000))}/mocked-table",
True,
[],
),
(
"{Y}/{load_package_timestamp_ms}/{table_name}",
f"{frozen_datetime.year}/{str(int(frozen_datetime.timestamp()*1000))}/mocked-table",
True,
[],
),
("{load_id}/{ext}/{table_name}", "mocked-load-id/jsonl/mocked-table", True, []),
("{HH}/{mm}/{schema_name}", f"{frozen_datetime.format('HH/mm')}/schema-name", True, []),
("{type}/{bobo}/{table_name}", "one-for-all/is-name/mocked-table", True, []),
Expand Down
12 changes: 11 additions & 1 deletion tests/load/pipeline/test_filesystem_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import posixpath
from pathlib import Path
from typing import Any, Callable, List, Dict, cast

from pytest_mock import MockerFixture
import dlt
import pytest

Expand Down Expand Up @@ -224,7 +226,9 @@ def some_source():
"{table_name}/{DD}/{HH}/{m}/{load_id}.{file_id}.{ext}",
"{table_name}/{D}/{HH}/{mm}/{load_id}.{file_id}.{ext}",
"{table_name}/{timestamp}/{load_id}.{file_id}.{ext}",
"{table_name}/{timestamp_ms}/{load_id}.{file_id}.{ext}",
"{table_name}/{load_package_timestamp}/{d}/{load_id}.{file_id}.{ext}",
"{table_name}/{load_package_timestamp_ms}/{d}/{load_id}.{file_id}.{ext}",
(
"{table_name}/{YYYY}/{YY}/{Y}/{MMMM}/{MMM}/{MM}/{M}/{DD}/{D}/"
"{HH}/{H}/{ddd}/{dd}/{d}/{ss}/{s}/{Q}/{timestamp}/{curr_date}/{load_id}.{file_id}.{ext}"
Expand All @@ -238,7 +242,7 @@ def some_source():

@pytest.mark.parametrize("layout", TEST_LAYOUTS)
def test_filesystem_destination_extended_layout_placeholders(
layout: str, default_buckets_env: str
layout: str, default_buckets_env: str, mocker: MockerFixture
) -> None:
data = load_json_case("simple_row")
call_count = 0
Expand All @@ -263,6 +267,12 @@ def count(*args, **kwargs) -> Any:
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage"
os.environ["DATA_WRITER__DISABLE_COMPRESSION"] = "TRUE"

# We patch pendulum.from_timestamp because this test checks whether the
# load package exists under a given path. Without the patch, small
# differences between the calculated timestamps would make the test flaky.
mocker.patch("pendulum.from_timestamp", return_value=now)
fs_destination = filesystem(
layout=layout,
extra_placeholders=extra_placeholders,
Expand Down
Loading