## The Flyweight pattern

The Flyweight pattern is a memory optimization pattern.
The Flyweight pattern ensures that objects that share a state can use the same
memory for their shared state.
Each Flyweight object has no specific state of its own.
Any time it needs to perform
an operation on SpecificState, that state needs to be passed into the Flyweight by
the calling code as an argument value.


![](uml/flyweight_pattern.png)



### A Flyweight example

![](uml/flyweight_example.png)

In [42]:
import abc
import weakref
from dataclasses import dataclass
from math import radians, floor, fmod
from typing import (
    Optional, cast, Container, overload, Union, Iterator
)

@dataclass(frozen=True)
class Point:
    """
    An immutable position, stored as signed decimal degrees.

    Positive latitude is north and positive longitude is east.
    ``str()`` renders degrees and decimal minutes with a hemisphere letter;
    the ``lat`` and ``lon`` properties give the same angles in radians.

    >>> p = Point.from_bytes(b"4916.45", b"N", b"12311.12", b"W")
    >>> str(p)
    '(49°16.4500N, 123°11.1200W)'
    >>> p.lat
    0.8599964445097726
    >>> p.lon
    -2.1499896568333883
    """
    latitude: float
    longitude: float

    @classmethod
    def from_bytes(cls,
        latitude: bytes,
        N_S: bytes,
        longitude: bytes,
        E_W: bytes,
    ) -> "Point":
        """
        Build a point from the raw byte fields of a GPS message.

        :param latitude: ``ddmm.mmmm`` -- two degree digits, then minutes.
        :param N_S: hemisphere letter; ``N`` (any case) is positive,
            every other value is treated as south.
        :param longitude: ``dddmm.mmmm`` -- three degree digits, then minutes.
        :param E_W: hemisphere letter; ``E`` (any case) is positive,
            every other value is treated as west.
        :raises ValueError: if a numeric field cannot be converted by ``float``.
        """
        lat_deg = float(latitude[:2]) + float(latitude[2:]) / 60
        lat_sign = 1 if N_S.upper() == b"N" else -1
        lon_deg = float(longitude[:3]) + float(longitude[3:]) / 60
        lon_sign = 1 if E_W.upper() == b"E" else -1
        # Use cls so that subclasses get an instance of their own type.
        return cls(lat_deg * lat_sign, lon_deg * lon_sign)

    @staticmethod
    def _degrees_minutes(angle: float) -> tuple[float, float]:
        """Split an angle in degrees into whole degrees and decimal minutes."""
        # Round to the displayed precision (4 decimals of a minute) *before*
        # splitting, so a value such as 49.9999999999 carries into the
        # degrees instead of being printed as 49 degrees 60.0000 minutes.
        total_minutes = round(abs(angle) * 60, 4)
        return divmod(total_minutes, 60)

    def __str__(self) -> str:
        lat_deg, lat_min_sec = self._degrees_minutes(self.latitude)
        lat_dir = "N" if self.latitude > 0 else "S"
        lon_deg, lon_min_sec = self._degrees_minutes(self.longitude)
        lon_dir = "E" if self.longitude > 0 else "W"
        return (
            f"({lat_deg:02.0f}°{lat_min_sec:07.4f}{lat_dir}, "
            f"{lon_deg:03.0f}°{lon_min_sec:07.4f}{lon_dir})"
        )

    @property
    def lat(self) -> float:
        """Latitude in radians."""
        return radians(self.latitude)

    @property
    def lon(self) -> float:
        """Longitude in radians."""
        return radians(self.longitude)
        

In [36]:
from collections.abc import Sequence
# Notebook inspection: display the abstract methods that a concrete
# Sequence subclass has to implement.
Sequence.__abstractmethods__

frozenset({'__getitem__', '__len__'})

In [84]:
class Buffer(Sequence[int]):
    """
    A read-only sequence of byte values wrapped around a ``bytes`` object.

    Only ``__getitem__`` and ``__len__`` are required by ``Sequence``; the
    remaining sequence behaviour (``index``, ``in``, ...) is inherited.
    Indexing with an int yields one byte value; slicing yields ``bytes``.
    """

    def __init__(self, content: bytes) -> None:
        self.content = content

    @overload
    def __getitem__(self, index: int) -> int:
        ...

    @overload
    def __getitem__(self, index: slice) -> bytes:
        ...

    def __getitem__(self, index: Union[int, slice]) -> Union[int, bytes]:
        # Delegate directly: bytes already implements both access forms.
        data = self.content
        return data[index]

    def __iter__(self) -> Iterator[int]:
        data = self.content
        return iter(data)

    def __len__(self) -> int:
        data = self.content
        return len(data)

In [85]:
class GPSError(Exception):
    """Signals that a GPS message in a buffer could not be processed."""


class Message(abc.ABC):
    """
    Flyweight view of one comma-delimited message inside a shared Buffer.

    The object does not copy message bytes. It keeps a weak reference to the
    buffer, the message's starting offset, and the positions of the field
    delimiters; fields are sliced out of the buffer on demand. The same
    object can be re-pointed at another message with ``from_buffer``.

    Subclasses say which field numbers hold the position data.
    """

    # Upper bound on how many bytes are scanned for the closing "*".
    # NOTE(review): presumably the NMEA 0183 maximum sentence length -- confirm.
    _MAX_LENGTH = 82

    def __init__(self) -> None:
        # Declarations only: the attributes are created by from_buffer().
        self.buffer: weakref.ReferenceType[Buffer]
        self.offset: int
        self.end: Optional[int]
        self.commas: list[int]

    def from_buffer(self, buffer: Buffer, offset: int) -> "Message":
        """
        Point this flyweight at the message starting at ``offset``.

        Records the position of every "," up to and including the closing
        "*", and sets ``end`` to three bytes past the "*" (the "*" plus two
        following characters).

        :param buffer: the shared byte buffer; only weakly referenced.
        :param offset: index of the message's first byte.
        :returns: ``self``, so calls can be chained.
        :raises GPSError: if no "*" is found within the scan window or
            before the end of the buffer.
        """
        self.buffer = weakref.ref(buffer)
        self.offset = offset
        # commas[0] is the start of the message, so that field k always
        # spans commas[k] + 1 .. commas[k + 1].
        self.commas = [offset]
        self.end = None
        comma, star = ord(b","), ord(b"*")
        # Never read past the end of the buffer: a truncated final message
        # must be reported as GPSError, not as a stray IndexError.
        limit = min(offset + self._MAX_LENGTH, len(buffer))
        for index in range(offset, limit):
            octet = buffer[index]
            if octet == comma:
                self.commas.append(index)
            elif octet == star:
                self.commas.append(index)
                self.end = index + 3
                break
        if self.end is None:
            raise GPSError("Incomplete")
        return self

    def __getitem__(self, field: int) -> bytes:
        """
        Return the bytes of field number ``field`` (field 0 is the text
        between the first byte of the message and the first comma).

        :raises RuntimeError: if ``from_buffer`` has not been called or the
            buffer has been garbage collected.
        """
        if not hasattr(self, "buffer") or (buffer := self.buffer()) is None:
            raise RuntimeError("Broken reference")
        start, end = self.commas[field] + 1, self.commas[field + 1]
        return buffer[start: end]

    def get_fix(self) -> Point:
        """Build a Point from the four position fields of this message."""
        return Point.from_bytes(
            self.latitude(), self.lat_n_s(), self.longitude(), self.lon_e_w()
        )

    @abc.abstractmethod
    def latitude(self) -> bytes:
        """Return the raw latitude field."""
        ...

    @abc.abstractmethod
    def lat_n_s(self) -> bytes:
        """Return the raw latitude hemisphere field."""
        ...

    @abc.abstractmethod
    def longitude(self) -> bytes:
        """Return the raw longitude field."""
        ...

    @abc.abstractmethod
    def lon_e_w(self) -> bytes:
        """Return the raw longitude hemisphere field."""
        ...
        

In [86]:
class GPGGA(Message):
    """
    Message flyweight for ``$GPGGA`` sentences.

    The position is carried in fields 2 to 5: latitude, N/S, longitude, E/W.

    >>> raw = Buffer(b"$GPGGA,170834,4124.8963,N,08151.6838,W,1,05,1.5,280.2,M,-34.0,M,,*75")
    >>> m = GPGGA()
    >>> m.from_buffer(raw, 0)  # doctest: +ELLIPSIS
    <__main__.GPGGA object at ...>
    >>> fix = m.get_fix()
    >>> fix
    Point(latitude=41.41493833333333, longitude=-81.86139666666666)
    >>> fix.lat
    0.7228270334270795
    >>> fix.lon
    -1.4287509021144442
    """

    def latitude(self) -> bytes:
        """Field 2: the latitude digits."""
        lat_field = self[2]
        return lat_field

    def lat_n_s(self) -> bytes:
        """Field 3: the latitude hemisphere letter."""
        hemisphere = self[3]
        return hemisphere

    def longitude(self) -> bytes:
        """Field 4: the longitude digits."""
        lon_field = self[4]
        return lon_field

    def lon_e_w(self) -> bytes:
        """Field 5: the longitude hemisphere letter."""
        hemisphere = self[5]
        return hemisphere


In [88]:
class GPGLL(Message):
    """
    Message flyweight for ``$GPGLL`` sentences.

    The position is carried in fields 1 to 4: latitude, N/S, longitude, E/W.

    >>> raw = Buffer(b"$GPGLL,3751.65,S,14507.36,E*77")
    >>> m = GPGLL()
    >>> m.from_buffer(raw, 0)  # doctest: +ELLIPSIS
    <__main__.GPGLL object at ...>
    >>> fix = m.get_fix()
    >>> fix
    Point(latitude=-37.86083333333333, longitude=145.12266666666667)
    >>> fix.lat
    -0.6607961992154864
    >>> fix.lon
    2.5328683526075575
    """

    def latitude(self) -> bytes:
        """Field 1: the latitude digits."""
        lat_field = self[1]
        return lat_field

    def lat_n_s(self) -> bytes:
        """Field 2: the latitude hemisphere letter."""
        hemisphere = self[2]
        return hemisphere

    def longitude(self) -> bytes:
        """Field 3: the longitude digits."""
        lon_field = self[3]
        return lon_field

    def lon_e_w(self) -> bytes:
        """Field 4: the longitude hemisphere letter."""
        hemisphere = self[4]
        return hemisphere


In [111]:
class GPRMC(Message):
    """
    Message flyweight for ``$GPRMC`` sentences.

    The position is carried in fields 3 to 6: latitude, N/S, longitude, E/W.

    >>> raw = Buffer(b"$GPRMC,225446,A,4916.45,N,12311.12,W,000.5,054.7,191194,020.3,E*68")
    >>> m = GPRMC()
    >>> m.from_buffer(raw, 0)  # doctest: +ELLIPSIS
    <__main__.GPRMC object at ...>
    >>> fix = m.get_fix()
    >>> fix
    Point(latitude=49.274166666666666, longitude=-123.18533333333333)
    >>> fix.lat
    0.8599964445097726
    >>> fix.lon
    -2.1499896568333883
    """

    def latitude(self) -> bytes:
        """Field 3: the latitude digits."""
        lat_field = self[3]
        return lat_field

    def lat_n_s(self) -> bytes:
        """Field 4: the latitude hemisphere letter."""
        hemisphere = self[4]
        return hemisphere

    def longitude(self) -> bytes:
        """Field 5: the longitude digits."""
        lon_field = self[5]
        return lon_field

    def lon_e_w(self) -> bytes:
        """Field 6: the longitude hemisphere letter."""
        hemisphere = self[6]
        return hemisphere

In [115]:
def message_factory(header: bytes) -> Message:
    if header == b"GPGGA":
        return GPGGA()
    elif header == b"GPGLL":
        return GPGLL()
    elif header == b"GPRMC":
        return GPRMC()
    else:
        raise ValueError("Invalid Header")
    
# Demo: pick the flyweight class from the header bytes, attach it to the
# buffer, then show the resulting fix in both repr() and str() form.
buffer = Buffer(b"$GPGLL,3751.65,S,14507.36,E*77")
flyweight = message_factory(buffer[1:6])
print(type(flyweight).__name__)
print(flyweight.from_buffer(buffer, 0))
print(f"{flyweight.get_fix()!r}")
print(f"{flyweight.get_fix()}")

GPGLL
<__main__.GPGLL object at 0x7f45e117bc50>
Point(latitude=-37.86083333333333, longitude=145.12266666666667)
(37°51.6500S, 145°07.3600E)


In [112]:
class Client:
    """Scans a Buffer for "$"-prefixed messages and prints each position fix."""

    def __init__(self, buffer: Buffer) -> None:
        self.buffer = buffer

    def scan(self) -> None:
        """
        Walk the buffer from the start and print the fix of every message
        type that ``message_factory`` supports.

        Messages with an unsupported header are skipped. Scanning stops when
        no further "$" is found, or when a message has no closing "*".
        """
        end = 0
        while True:
            try:
                start = self.buffer.index(ord(b"$"), end)
            except ValueError:
                # No "$" found: no more messages
                break
            header = self.buffer[start + 1 : start + 6]
            try:
                m = message_factory(header)
            except ValueError:
                # Unsupported message type. message_factory signals this by
                # raising, so it has to be handled here -- otherwise it is
                # mistaken for "no more messages" and ends the scan early.
                try:
                    star = self.buffer.index(ord(b"*"), start)
                except ValueError:
                    # No final "*" found: last message damaged
                    break
                # Resume after the "*" and the two characters following it.
                end = star + 3
                continue
            try:
                m.from_buffer(self.buffer, start)
            except GPSError:
                # No final "*" found: last message damaged
                break
            end = cast(int, m.end)
            try:
                fix = m.get_fix()
            except ValueError:
                # Position fields that cannot be converted to numbers
                # (for example empty fields): skip this message and keep
                # scanning instead of abandoning the rest of the buffer.
                continue
            print(fix)
             
            
# Demo: a buffer holding several messages (the bytes literal starts with a
# newline, so the first "$" sits at index 1), scanned by a Client.
buffer = Buffer(b'''
$GPGGA,161229.487,3723.2475,N,12158.3416,W,1,07,1.0,9.0,M,,,,0000*18
$GPGLL,3723.2475,N,12158.3416,W,161229.487,A,A*41
$GPGSA,A,3,07,02,26,27,09,04,15,,,,,,1.8,1.0,1.5*33
$GPVTG,309.62,T,,M,0.13,N,0.2,K,A*23
$GPRMC,161229.487,A,3723.2475,N,12158.3416,W,0.13,309.62,120598,,*10
''')
c = Client(buffer)
len(buffer)
# A one-byte slice prints the same bytes object as bytes([buffer[1]]).
print(buffer[1:2])
print(buffer[2:7])
c.scan()

b'$'
b'GPGGA'
(37°23.2475N, 121°58.3416W)
(37°23.2475N, 121°58.3416W)
(37°23.2475N, 121°58.3416W)


In [117]:
if __name__ == '__main__':
    # Self-check for the notebook: run the doctests, export the notebook to
    # test.py, then type-check the exported script with mypy --strict.
    import doctest
    import subprocess
    name = "12-The Flyweight pattern"
    doctest.testmod(verbose=False)
    # Argument lists instead of shell command strings: the notebook name
    # contains spaces and needs no shell quoting this way.
    subprocess.run(
        ["jupyter", "nbconvert", "--to", "script", "--output", "test", name]
    )
    std_out = subprocess.run(
        ["mypy", "--strict", "test.py"], capture_output=True
    ).stdout
    # Tolerant decoding: a non-ASCII byte in the tool output must not turn
    # the report itself into a UnicodeDecodeError.
    print(std_out.decode('utf-8', errors='replace'))

[NbConvertApp] Converting notebook 12-The Flyweight pattern.ipynb to script
[NbConvertApp] Writing 8536 bytes to test.py


[1m[32mSuccess: no issues found in 1 source file[m

