Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 125 additions & 42 deletions src/together/abstract/api_requestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import aiohttp
import requests
from tqdm.utils import CallbackIOWrapper
from typing_extensions import Literal


if sys.version_info >= (3, 8):
Expand Down Expand Up @@ -249,84 +250,166 @@ def request(
resp, got_stream = self._interpret_response(result, stream)
return resp, got_stream, self.api_key

@overload
async def arequest(
self,
options: TogetherRequest,
stream: Literal[True],
request_timeout: float | Tuple[float, float] | None = ...,
) -> Tuple[AsyncGenerator[TogetherResponse, None], bool, str]:
pass
# Use async context manager for session lifetime management for speed and correctness.
async with AioHTTPSession() as session:
result = None
try:
result = await self.arequest_raw(
options,
session,
request_timeout=request_timeout,
)
resp, got_stream = await self._interpret_async_response(result, stream)
except Exception:
if result is not None:
result.release()
raise
if got_stream:
async def wrap_resp() -> AsyncGenerator[TogetherResponse, None]:
try:
async for r in resp:
yield r
finally:
result.release()
return wrap_resp(), got_stream, self.api_key # type: ignore
else:
result.release()
return resp, got_stream, self.api_key # type: ignore

@overload
async def arequest(
self,
options: TogetherRequest,
*,
stream: Literal[True],
request_timeout: float | Tuple[float, float] | None = ...,
) -> Tuple[AsyncGenerator[TogetherResponse, None], bool, str]:
pass
# Use async context manager for session lifetime management for speed and correctness.
async with AioHTTPSession() as session:
result = None
try:
result = await self.arequest_raw(
options,
session,
request_timeout=request_timeout,
)
resp, got_stream = await self._interpret_async_response(result, stream)
except Exception:
if result is not None:
result.release()
raise
if got_stream:
async def wrap_resp() -> AsyncGenerator[TogetherResponse, None]:
try:
async for r in resp:
yield r
finally:
result.release()
return wrap_resp(), got_stream, self.api_key # type: ignore
else:
result.release()
return resp, got_stream, self.api_key # type: ignore

@overload
async def arequest(
self,
options: TogetherRequest,
stream: Literal[False] = ...,
request_timeout: float | Tuple[float, float] | None = ...,
) -> Tuple[TogetherResponse, bool, str]:
pass
# Use async context manager for session lifetime management for speed and correctness.
async with AioHTTPSession() as session:
result = None
try:
result = await self.arequest_raw(
options,
session,
request_timeout=request_timeout,
)
resp, got_stream = await self._interpret_async_response(result, stream)
except Exception:
if result is not None:
result.release()
raise
if got_stream:
async def wrap_resp() -> AsyncGenerator[TogetherResponse, None]:
try:
async for r in resp:
yield r
finally:
result.release()
return wrap_resp(), got_stream, self.api_key # type: ignore
else:
result.release()
return resp, got_stream, self.api_key # type: ignore

@overload
async def arequest(
self,
options: TogetherRequest,
stream: bool = ...,
request_timeout: float | Tuple[float, float] | None = ...,
) -> Tuple[TogetherResponse | AsyncGenerator[TogetherResponse, None], bool, str]:
pass
# Use async context manager for session lifetime management for speed and correctness.
async with AioHTTPSession() as session:
result = None
try:
result = await self.arequest_raw(
options,
session,
request_timeout=request_timeout,
)
resp, got_stream = await self._interpret_async_response(result, stream)
except Exception:
if result is not None:
result.release()
raise
if got_stream:
async def wrap_resp() -> AsyncGenerator[TogetherResponse, None]:
try:
async for r in resp:
yield r
finally:
result.release()
return wrap_resp(), got_stream, self.api_key # type: ignore
else:
result.release()
return resp, got_stream, self.api_key # type: ignore

async def arequest(
self,
options: TogetherRequest,
stream: bool = False,
request_timeout: float | Tuple[float, float] | None = None,
) -> Tuple[TogetherResponse | AsyncGenerator[TogetherResponse, None], bool, str]:
ctx = AioHTTPSession()
session = await ctx.__aenter__()
result = None
try:
result = await self.arequest_raw(
options,
session,
request_timeout=request_timeout,
)
resp, got_stream = await self._interpret_async_response(result, stream)
except Exception:
# Close the request before exiting session context.
if result is not None:
result.release()
await ctx.__aexit__(None, None, None)
raise
if got_stream:

async def wrap_resp() -> AsyncGenerator[TogetherResponse, None]:
assert isinstance(resp, AsyncGenerator)
try:
async for r in resp:
yield r
finally:
# Close the request before exiting session context. Important to do it here
# as if stream is not fully exhausted, we need to close the request nevertheless.
# Use async context manager for session lifetime management for speed and correctness.
async with AioHTTPSession() as session:
result = None
try:
result = await self.arequest_raw(
options,
session,
request_timeout=request_timeout,
)
resp, got_stream = await self._interpret_async_response(result, stream)
except Exception:
if result is not None:
result.release()
await ctx.__aexit__(None, None, None)

return wrap_resp(), got_stream, self.api_key # type: ignore
else:
# Close the request before exiting session context.
result.release()
await ctx.__aexit__(None, None, None)
return resp, got_stream, self.api_key # type: ignore
raise
if got_stream:
async def wrap_resp() -> AsyncGenerator[TogetherResponse, None]:
try:
async for r in resp:
yield r
finally:
result.release()
return wrap_resp(), got_stream, self.api_key # type: ignore
else:
result.release()
return resp, got_stream, self.api_key # type: ignore

@classmethod
def handle_error_response(
Expand Down
42 changes: 14 additions & 28 deletions src/together/resources/audio/transcriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,31 +193,27 @@ async def create(
files_data: Dict[str, Union[Tuple[None, str], BinaryIO]] = {}
params_data = {}

if isinstance(file, (str, Path)):
if isinstance(file, str) and file.startswith(("http://", "https://")):
# Cache attribute lookups and avoid repeated conversions
file_is_str = isinstance(file, str)
file_is_path = isinstance(file, Path)
if file_is_str or file_is_path:
if file_is_str and file.startswith(("http://", "https://")):
# URL string - send as multipart field
files_data["file"] = (None, file)
else:
# Local file path
file_path = Path(file)
file_path = file if file_is_path else Path(file)
files_data["file"] = open(file_path, "rb")
else:
# File object
files_data["file"] = file

# Build request parameters
param_format = response_format if isinstance(response_format, str) else getattr(response_format, "value", response_format)
params_data.update(
{
"model": model,
"response_format": (
response_format
if isinstance(response_format, str)
else (
response_format.value
if hasattr(response_format, "value")
else response_format
)
),
"response_format": param_format,
"temperature": temperature,
}
)
Expand All @@ -229,21 +225,13 @@ async def create(
params_data["prompt"] = prompt

if timestamp_granularities is not None:
params_data["timestamp_granularities"] = (
timestamp_granularities
if isinstance(timestamp_granularities, str)
else (
timestamp_granularities.value
if hasattr(timestamp_granularities, "value")
else timestamp_granularities
)
)
param_granularities = timestamp_granularities if isinstance(timestamp_granularities, str) else getattr(timestamp_granularities, "value", timestamp_granularities)
params_data["timestamp_granularities"] = param_granularities

# Add any additional kwargs
# Convert boolean values to lowercase strings for proper form encoding
for key, value in kwargs.items():
if isinstance(value, bool):
params_data[key] = str(value).lower()
params_data[key] = "true" if value else "false"
else:
params_data[key] = value

Expand All @@ -258,12 +246,10 @@ async def create(
)
finally:
# Close file if we opened it
if files_data and "file" in files_data:
file_obj = files_data.get("file")
if file_obj is not None and hasattr(file_obj, "close") and not isinstance(file_obj, tuple):
try:
# Only close if it's a file object (not a tuple for URL)
file_obj = files_data["file"]
if hasattr(file_obj, "close") and not isinstance(file_obj, tuple):
file_obj.close()
file_obj.close()
except:
pass

Expand Down