pyLocked provides utility classes to `[R]Lock` resources

Eyal-Shalev/pylocked

pylocked provides utility classes and functions that guard any resource or function with a lock, making access to it safe across threads or across concurrent asyncio tasks.

Install

From pip:

pip install pylocked

From repository:

pip install git+https://github.com/Eyal-Shalev/pylocked

Examples

AsyncIO

pylocked.asyncio.AsyncLocked

from pylocked.asyncio import AsyncLocked

locked_arr: AsyncLocked[list[int]] = AsyncLocked([])

async def double():
  async with locked_arr as arr:
    for i in range(len(arr)):
      arr[i] += arr[i]

async def reset():
  await locked_arr.replace(list(range(10)))

async def duplicate():
  await locked_arr.update(lambda arr: arr * 2)
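
To make the pattern above concrete, here is a minimal sketch of how a lock-guarded value could be built on `asyncio.Lock`. The class name `GuardedValue` is hypothetical and this is not pylocked's implementation; it only mirrors the `async with`, `replace` and `update` operations shown in the example, under the assumption that each one holds the lock for its whole duration.

```python
import asyncio
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class GuardedValue(Generic[T]):
    """Hypothetical stand-in for a lock-guarded value (not pylocked's code)."""

    def __init__(self, value: T) -> None:
        self._value = value
        self._lock = asyncio.Lock()

    async def __aenter__(self) -> T:
        # Hold the lock for the whole `async with` body.
        await self._lock.acquire()
        return self._value

    async def __aexit__(self, *exc_info: object) -> None:
        self._lock.release()

    async def replace(self, value: T) -> None:
        # Swap the guarded value while holding the lock.
        async with self._lock:
            self._value = value

    async def update(self, fn: Callable[[T], T]) -> None:
        # Compute a new value from the old one while holding the lock.
        async with self._lock:
            self._value = fn(self._value)


async def demo() -> list:
    guarded = GuardedValue([])
    await guarded.replace(list(range(3)))      # value is now [0, 1, 2]
    await guarded.update(lambda arr: arr * 2)  # value is now [0, 1, 2, 0, 1, 2]
    async with guarded as arr:
        for i in range(len(arr)):
            arr[i] += arr[i]                   # double each element in place
        return list(arr)


result = asyncio.run(demo())
```

Because `asyncio.Lock` is not reentrant, calling `replace` or `update` from inside the `async with` body of this sketch would wait forever; whether pylocked behaves the same way is not stated here.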

pylocked.asyncio.async_locked()

from __future__ import annotations  # lets the class refer to itself in annotations

from typing import Optional

from pylocked.asyncio import async_locked

class LockedSingleton:
    _instance: Optional[LockedSingleton] = None

    @async_locked
    @staticmethod
    async def get_instance() -> LockedSingleton:
        if LockedSingleton._instance is None:
            LockedSingleton._instance = LockedSingleton()
        return LockedSingleton._instance
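
For intuition about what a locking decorator does for a coroutine function, the following sketch serializes calls with one `asyncio.Lock` per decorated function. The name `serialize_calls` and the module-level singleton are hypothetical and chosen for illustration; this is an assumption about the general technique, not pylocked's actual code.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, Optional, TypeVar

R = TypeVar("R")


def serialize_calls(fn: Callable[..., Awaitable[R]]) -> Callable[..., Awaitable[R]]:
    """Hypothetical decorator: only one call to fn runs at a time."""
    lock: Optional[asyncio.Lock] = None

    @functools.wraps(fn)
    async def wrapper(*args: Any, **kwargs: Any) -> R:
        nonlocal lock
        if lock is None:
            # Created lazily so the lock belongs to the running event loop.
            lock = asyncio.Lock()
        async with lock:
            return await fn(*args, **kwargs)

    return wrapper


created = 0
_instance = None


@serialize_calls
async def get_instance() -> object:
    global created, _instance
    if _instance is None:
        # Suspension point: without the lock, every task would pass the
        # check above before any of them assigns the instance.
        await asyncio.sleep(0)
        _instance = object()
        created += 1
    return _instance


async def main() -> bool:
    results = await asyncio.gather(*(get_instance() for _ in range(5)))
    return all(r is results[0] for r in results)


all_same = asyncio.run(main())
```

The lock matters only when the guarded function contains an `await`; a coroutine with no suspension point cannot be interleaved with other tasks in the first place.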

Threading

pylocked.threading.RLocked

To guard the value with a non-reentrant threading.Lock instead, use pylocked.threading.Locked.

from pylocked.threading import RLocked

locked_arr: RLocked[list[int]] = RLocked([])

def double():
  with locked_arr as arr:
    for i in range(len(arr)):
      arr[i] += arr[i]

def reset():
  locked_arr.replace(list(range(10)))

def duplicate():
  locked_arr.update(lambda arr: arr * 2)
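
The choice between the two variants comes down to reentrancy in the underlying standard-library primitives. The short sketch below uses only `threading.RLock` and `threading.Lock` directly (no pylocked calls) to show that a thread may re-acquire an `RLock` it already holds, while a plain `Lock` refuses a second acquisition.

```python
import threading

rlock = threading.RLock()
lock = threading.Lock()

with rlock:
    # The owning thread can acquire an RLock again without blocking.
    nested_rlock_ok = rlock.acquire(blocking=False)
    if nested_rlock_ok:
        rlock.release()  # balance the nested acquire

with lock:
    # A plain Lock is already held, so a non-blocking acquire fails.
    # A blocking acquire here would deadlock the thread.
    nested_lock_ok = lock.acquire(blocking=False)
```

Here `nested_rlock_ok` is `True` and `nested_lock_ok` is `False`. A reentrant lock is the safer default when guarded code may call back into other guarded code on the same thread.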

pylocked.threading.rlocked()

To guard the function with a non-reentrant threading.Lock instead, use pylocked.threading.locked().

from __future__ import annotations  # lets the class refer to itself in annotations

from typing import Optional

from pylocked.threading import rlocked

class LockedSingleton:
    _instance: Optional[LockedSingleton] = None

    @rlocked
    @staticmethod
    def get_instance() -> LockedSingleton:
        if LockedSingleton._instance is None:
            LockedSingleton._instance = LockedSingleton()
        return LockedSingleton._instance
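
As a rough illustration of the decorator form, the sketch below wraps a function so that every call runs under one shared `threading.RLock`. The name `with_rlock` is hypothetical and the code is an assumption about the general technique rather than pylocked's implementation. The recursive function shows why reentrancy can matter: each recursive call passes through the wrapper again on the same thread.

```python
import functools
import threading
from typing import Any, Callable, TypeVar

R = TypeVar("R")


def with_rlock(fn: Callable[..., R]) -> Callable[..., R]:
    """Hypothetical decorator: guard every call to fn with one shared RLock."""
    lock = threading.RLock()

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> R:
        with lock:
            return fn(*args, **kwargs)

    return wrapper


@with_rlock
def factorial(n: int) -> int:
    # The recursive call re-enters the wrapper while the lock is held.
    # An RLock allows this; a plain threading.Lock would deadlock here.
    return 1 if n <= 1 else n * factorial(n - 1)


results = []
threads = [
    threading.Thread(target=lambda: results.append(factorial(5)))
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

After the threads finish, `results` holds four copies of `120`, one per thread.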
