Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Script API in python. #860

Merged
merged 2 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ export class BaseClient {
* @returns a value that depends on the script that was executed.
*
* @example
* const luaScript = "return \{ KEYS[1], ARGV[1] \}";
* const luaScript = new Script("return \{ KEYS[1], ARGV[1] \}");
* const scriptOptions = \{
* keys: ["foo"],
* args: ["bar"],
Expand Down
3 changes: 3 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
SlotType,
)

from .glide import Script

__all__ = [
"BaseClientConfiguration",
"ClusterClientConfiguration",
Expand All @@ -54,6 +56,7 @@
"RedisClient",
"RedisClusterClient",
"RedisCredentials",
"Script",
"NodeAddress",
"Transaction",
"ClusterTransaction",
Expand Down
2 changes: 1 addition & 1 deletion python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async def exec(
If the transaction failed due to a WATCH command, `exec` will return `None`.
"""
commands = transaction.commands[:]
return await self.execute_transaction(commands, route)
return await self._execute_transaction(commands, route)

async def config_resetstat(
self,
Expand Down
40 changes: 39 additions & 1 deletion python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from glide.protobuf.redis_request_pb2 import RequestType
from glide.routes import Route

from ..glide import Script


class ConditionalChange(Enum):
"""
Expand Down Expand Up @@ -182,12 +184,20 @@ async def _execute_command(
route: Optional[Route] = ...,
) -> TResult: ...

async def execute_transaction(
async def _execute_transaction(
self,
commands: List[Tuple[RequestType.ValueType, List[str]]],
route: Optional[Route] = None,
) -> List[TResult]: ...

async def _execute_script(
self,
hash: str,
keys: Optional[List[str]] = None,
args: Optional[List[str]] = None,
route: Optional[Route] = None,
) -> TResult: ...

async def set(
self,
key: str,
Expand Down Expand Up @@ -1234,3 +1244,31 @@ async def zrem(
int,
await self._execute_command(RequestType.Zrem, [key] + members),
)

async def invoke_script(
self,
script: Script,
keys: Optional[List[str]] = None,
args: Optional[List[str]] = None,
) -> TResult:
"""
Invokes a Lua script with its keys and arguments.
This method simplifies the process of invoking scripts on a Redis server by using an object that represents a Lua script.
The script loading, argument preparation, and execution will all be handled internally.
If the script has not already been loaded, it will be loaded automatically using the Redis `SCRIPT LOAD` command.
After that, it will be invoked using the Redis `EVALSHA` command.

Args:
script (Script): The Lua script to execute.
keys (List[str]): The keys that are used in the script.
args (List[str]): The arguments for the script.

Returns:
TResult: a value that depends on the script that was executed.

Examples:
>>> lua_script = Script("return { KEYS[1], ARGV[1] }")
>>> await invoke_script(lua_script, keys=["foo"], args=["bar"] );
["foo", "bar"]
"""
return await self._execute_script(script.get_hash(), keys, args)
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def exec(
If the transaction failed due to a WATCH command, `exec` will return `None`.
"""
commands = transaction.commands[:]
return await self.execute_transaction(commands)
return await self._execute_transaction(commands)

async def select(self, index: int) -> TOK:
"""Change the currently selected Redis database.
Expand Down
5 changes: 5 additions & 0 deletions python/python/glide/glide.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ class Level(Enum):

def is_lower(self, level: Level) -> bool: ...

class Script:
def __init__(self, code: str) -> None: ...
def get_hash(self) -> str: ...
def __del__(self) -> None: ...

def start_socket_listener_external(init_callback: Callable) -> None: ...
def value_from_pointer(pointer: int) -> TResult: ...
def create_leaked_value(message: str) -> int: ...
Expand Down
31 changes: 24 additions & 7 deletions python/python/glide/redis_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,9 @@ async def _execute_command(
request.single_command.request_type = request_type
request.single_command.args_array.args[:] = args # TODO - use arg pointer
set_protobuf_route(request, route)
# Create a response future for this request and add it to the available
# futures map
response_future = self._get_future(request.callback_idx)
self._create_write_task(request)
await response_future
return response_future.result()
return await self._write_request_await_response(request)

async def execute_transaction(
async def _execute_transaction(
self,
commands: List[Tuple[RequestType.ValueType, List[str]]],
route: Optional[Route] = None,
Expand All @@ -223,6 +218,28 @@ async def execute_transaction(
transaction_commands.append(command)
request.transaction.commands.extend(transaction_commands)
set_protobuf_route(request, route)
return await self._write_request_await_response(request)

async def _execute_script(
self,
hash: str,
keys: Optional[List[str]] = None,
args: Optional[List[str]] = None,
route: Optional[Route] = None,
) -> TResult:
if self._is_closed:
raise ClosingError(
"Unable to execute requests; the client is closed. Please create a new client."
)
request = RedisRequest()
request.callback_idx = self._get_callback_index()
request.script_invocation.hash = hash
request.script_invocation.args[:] = args if args is not None else []
request.script_invocation.keys[:] = keys if keys is not None else []
set_protobuf_route(request, route)
return await self._write_request_await_response(request)

async def _write_request_await_response(self, request: RedisRequest):
# Create a response future for this request and add it to the available
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the lines from here to the end of the function are repeating both in execute_command and execute_pipeline, lets move it to a separate function:

    async def _write_request_await_response(self, request: RedisRequest):
        # Create a response future for this request and add it to the available
        # futures map
        response_future = self._get_future(request.callback_idx)
        self._create_write_task(request)
        await response_future
        return response_future.result()

# futures map
response_future = self._get_future(request.callback_idx)
Expand Down
27 changes: 26 additions & 1 deletion python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing import Dict, List, TypeVar, Union, cast

import pytest
from glide import ClosingError, RequestError, TimeoutError
from glide import ClosingError, RequestError, Script, TimeoutError
from glide.async_commands.core import (
ConditionalChange,
ExpireOptions,
Expand Down Expand Up @@ -1229,3 +1229,28 @@ async def test_timeout_exception_with_blpop(self, redis_client: TRedisClient):
key = get_random_string(10)
with pytest.raises(TimeoutError) as e:
await redis_client.custom_command(["BLPOP", key, "1"])


@pytest.mark.asyncio
class TestScripts:
@pytest.mark.smoke_test
@pytest.mark.parametrize("cluster_mode", [True, False])
async def test_script(self, redis_client: TRedisClient):
key1 = get_random_string(10)
key2 = get_random_string(10)
script = Script("return 'Hello'")
assert await redis_client.invoke_script(script) == "Hello"

script = Script("return redis.call('SET', KEYS[1], ARGV[1])")
assert (
await redis_client.invoke_script(script, keys=[key1], args=["value1"])
== "OK"
)
# Reuse the same script with different parameters.
assert (
await redis_client.invoke_script(script, keys=[key2], args=["value2"])
== "OK"
)
script = Script("return redis.call('GET', KEYS[1])")
assert await redis_client.invoke_script(script, keys=[key1]) == "value1"
assert await redis_client.invoke_script(script, keys=[key2]) == "value2"
23 changes: 23 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,33 @@ impl Level {
}
}

#[pyclass]
pub struct Script {
hash: String,
}

#[pymethods]
impl Script {
#[new]
fn new(code: String) -> Self {
let hash = glide_core::scripts_container::add_script(&code);
Script { hash }
}

fn get_hash(&self) -> String {
self.hash.clone()
}

fn __del__(&mut self) {
glide_core::scripts_container::remove_script(&self.hash);
}
}

/// A Python module implemented in Rust.
#[pymodule]
fn glide(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<Level>()?;
m.add_class::<Script>()?;
m.add(
"DEFAULT_TIMEOUT_IN_MILLISECONDS",
DEFAULT_TIMEOUT_IN_MILLISECONDS,
Expand Down
Loading