Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions src/openai/_utils/_datetime_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from __future__ import annotations

import re
from typing import Dict, Union, Optional
from typing import Union, Optional
from datetime import date, datetime, timezone, timedelta

from .._types import StrBytesIntFloat
Expand All @@ -31,13 +31,27 @@


def _get_numeric(value: StrBytesIntFloat, native_expected_type: str) -> Union[None, int, float]:
# Small local variable for faster lookup in exceptions
type_err = TypeError
val_err = ValueError
float_cast = float

if isinstance(value, (int, float)):
return value

# Fast path checks for bytes and str types to avoid try/except overhead on non-castable types
if isinstance(value, (bytes, str)):
try:
return float_cast(value)
except val_err:
return None

# At this point, only unexpected types will reach here
try:
return float(value)
except ValueError:
return float_cast(value)
except val_err:
return None
except TypeError:
except type_err:
raise TypeError(f"invalid type; expected {native_expected_type}, string, bytes, int or float") from None


Expand Down Expand Up @@ -69,38 +83,43 @@ def _parse_timezone(value: Optional[str]) -> Union[None, int, timezone]:
def parse_datetime(value: Union[datetime, StrBytesIntFloat]) -> datetime:
"""
Parse a datetime/int/float/string and return a datetime.datetime.

This function supports time zone offsets. When the input contains one,
the output uses a timezone with a fixed offset from UTC.

Raise ValueError if the input is well formatted but not a valid datetime.
Raise ValueError if the input isn't well formatted.
"""
if isinstance(value, datetime):
return value

number = _get_numeric(value, "datetime")
if number is not None:
return _from_unix_seconds(number)

if isinstance(value, bytes):
value = value.decode()

assert not isinstance(value, (float, int))

match = datetime_re.match(value)
if match is None:
raise ValueError("invalid datetime format")

kw = match.groupdict()
if kw["microsecond"]:
kw["microsecond"] = kw["microsecond"].ljust(6, "0")
# Direct extraction to minimize dict allocations and .items() iteration
gd = match.groupdict()
year = int(gd["year"])
month = int(gd["month"])
day = int(gd["day"])
hour = int(gd["hour"])
minute = int(gd["minute"])

# microsecond padding only if present
microsecond_str = gd["microsecond"]
if microsecond_str:
microsecond = int(microsecond_str.ljust(6, "0"))
else:
microsecond = 0

second = int(gd["second"]) if gd["second"] else 0

tzinfo = _parse_timezone(kw.pop("tzinfo"))
kw_: Dict[str, Union[None, int, timezone]] = {k: int(v) for k, v in kw.items() if v is not None}
kw_["tzinfo"] = tzinfo
tzinfo = _parse_timezone(gd["tzinfo"])

return datetime(**kw_) # type: ignore
return datetime(year, month, day, hour, minute, second, microsecond, tzinfo=tzinfo)


def parse_date(value: Union[date, StrBytesIntFloat]) -> date:
Expand Down