현재 team module로 확장가능성이 적음.

상관이 있음?

이후에 teamspace를 만든다 치면,

같은 모듈을 배포하는 user들을 모아다가, 이것을 하나의 모듈그룹으로 보면 됨.

그럼 여기서 모듈들을 합쳐주기만 하면 됨.

In [35]:
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, UniqueConstraint, Boolean
from sqlalchemy.orm import declarative_base, relationship, Session, Mapped
from typing import *

Base = declarative_base()

class Module(Base):
    __tablename__ = "modules"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    owner_id = Column(Integer, ForeignKey("users.id"))
    owner = relationship("User", back_populates="modules")


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    modules = relationship("Module", back_populates="owner")

user = User(name="crimson")


In [26]:
def generate_view_class_code(
    model_cls: Type[Any],
    include_fields: List[str],
    with_setter: bool = True,
    with_deleter: bool = False,
    class_name: str = None,
) -> str:
    """
    Generate a property-based wrapper class definition string for a given SQLAlchemy model.

    Args:
        model_cls (Type[Any]): The SQLAlchemy model class to wrap.
        include_fields (List[str]): List of field names to expose as properties.
        with_setter (bool): Whether to generate setter methods for the properties.
        with_deleter (bool): Whether to generate deleter methods for the properties.
        class_name (str, optional): Custom name for the generated view class. Defaults to {ModelName}View.

    Returns:
        str: Python source code for the wrapper class definition.
    """
    model_name = model_cls.__name__
    view_name = class_name or f"{model_name}View"
    lines = []

    lines.append(f"class {view_name}:")
    lines.append(f"    def __init__(self, model: {model_name}):")
    lines.append(f"        self._model = model\n")

    for field in include_fields:
        lines.append(f"    @property")
        lines.append(f"    def {field}(self):")
        lines.append(f'        """Access the {field} field."""')
        lines.append(f"        return self._model.{field}\n")

        if with_setter:
            lines.append(f"    @{field}.setter")
            lines.append(f"    def {field}(self, value):")
            lines.append(f"        # TODO: Add validation or transformation if needed")
            lines.append(f"        self._model.{field} = value\n")

        if with_deleter:
            lines.append(f"    @{field}.deleter")
            lines.append(f"    def {field}(self):")
            lines.append(f"        del self._model.{field}\n")

    return "\n".join(lines)


In [28]:
code = generate_view_class_code(
    model_cls=User,
    include_fields=["id", "name", "modules"],
    with_setter=True,
    with_deleter=True
)

with open("user_view.py", "w") as f:
    f.write(code)


In [46]:
from typing import get_type_hints, Any

def extract_fields(cls: type) -> dict[str, Any]:
    """
    Generalized field extractor:
    - Supports dataclasses, Pydantic models, and regular Python classes.
    """
    try:
        # dataclass
        from dataclasses import is_dataclass, fields as dc_fields
        if is_dataclass(cls):
            return {f.name: f.type for f in dc_fields(cls)}

        # Pydantic
        if hasattr(cls, "__fields__"):  # Pydantic BaseModel
            return {k: v.outer_type_ for k, v in cls.__fields__.items()}

        # General Python class
        return get_type_hints(cls)

    except Exception:
        return {}



In [62]:
def get_sqlalchemy_model_fields(cls):
    column_fields = list(cls.__table__.columns.keys())
    relationship_fields = list(cls.__mapper__.relationships.keys())
    return {
        "columns": column_fields,
        "relationships": relationship_fields,
    }


In [63]:
get_sqlalchemy_model_fields(User)

{'columns': ['id', 'name'], 'relationships': ['modules']}

In [64]:
def get_sqlalchemy_field_types(cls):
    columns = {
        col.name: col.type.python_type.__name__
        for col in cls.__table__.columns
    }
    relationships = {
        rel.key: f"List[{rel.mapper.class_.__name__}]" if rel.uselist else rel.mapper.class_.__name__
        for rel in cls.__mapper__.relationships
    }
    return {**columns, **relationships}


In [65]:
get_sqlalchemy_field_types(User)

{'id': 'int', 'name': 'str', 'modules': 'List[Module]'}

In [1]:
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, UniqueConstraint, Boolean
from sqlalchemy.orm import declarative_base, relationship, Session, Mapped
from typing import *

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    modules = relationship("Module", back_populates="owner")

# --- Module ---
class Module(Base):
    __tablename__ = "modules"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    owner_id = Column(Integer, ForeignKey("users.id"))
    repository = Column(String)
    index_url_base = Column(String)

    owner: Mapped[User] = relationship("User", back_populates="modules")
    categories: Mapped[List['Category']] = relationship("Category", back_populates="module")

    __table_args__ = (
        UniqueConstraint("owner_id", "repository", name="uq_owner_repo"),
    )

    # Property: collect releases from assembled categories only
    @property
    def assembled_releases(self) -> List['Release']:
        return [release
                for category in self.categories
                if category.assembled
                for release in category.releases]


# --- Category ---
class Category(Base):
    __tablename__ = "categories"
    id = Column(Integer, primary_key=True)
    name = Column(String)  # e.g., dev, prod
    module_id = Column(Integer, ForeignKey("modules.id"))
    index_url = Column(String)
    assembled = Column(Boolean, default=True)

    module: Mapped[Module] = relationship("Module", back_populates="categories")
    releases: Mapped[List['Release']] = relationship("Release", back_populates="category")


# --- Release ---
class Release(Base):
    __tablename__ = "releases"
    id = Column(Integer, primary_key=True)
    category_id = Column(Integer, ForeignKey("categories.id"))
    version = Column(String)  # e.g., "1.0.0"
    commit = Column(String)   # commit hash

    category: Mapped[Category] = relationship("Category", back_populates="releases")

    # Convenience property for reverse access
    @property
    def module(self) -> Module:
        return self.category.module if self.category else None

    __table_args__ = (
        UniqueConstraint("category_id", "version", name="uq_category_version"),
    )

In [2]:
def validate_release(new_release: Release):
    if not new_release.category.assembled:
        return
    module = new_release.category.module
    for cat in module.categories:
        if not cat.assembled:
            continue
        for rel in cat.releases:
            if rel.version == new_release.version:
                raise ValueError(f"Version {rel.version} already exists in assembled category '{cat.name}'")

def validate_category_assembly_change(category: Category, to_assembled: bool) -> None:
    if not to_assembled:
        return  # False → True가 아닌 경우는 무조건 OK

    # True로 바꾸려는 경우, 해당 카테고리의 version들이 기존 assembled들과 충돌하는지 확인
    version_set = {rel.version for rel in category.releases}
    for cat in category.module.categories:
        if cat.id == category.id or not cat.assembled:
            continue
        for rel in cat.releases:
            if rel.version in version_set:
                raise ValueError(
                    f"Cannot assemble category '{category.name}': version '{rel.version}' "
                    f"already exists in assembled category '{cat.name}'"
                )


In [3]:
from sqlalchemy.orm import Session
from sqlalchemy.exc import InvalidRequestError

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

class ReadOnlySession(Session):
    # def flush(self, *args, **kwargs):
    #    raise InvalidRequestError("Read-only session: flush() is disabled")

    def commit(self):
        raise InvalidRequestError("Read-only session: commit() is disabled")

    #def delete(self, instance):
    #    raise InvalidRequestError("Read-only session: delete() is disabled")

session = ReadOnlySession(engine)

user = User(name="crimson")
session.add(user)
# session.commit()

session.query(User).all()
# ro_session.commit()  # ❌ 예외 발생

session.delete(user)  # ❗ DB에는 아직 아무 일도 안 일어남
session.query(User).all()  # ✅ 여전히 보일 수도 있음 (autoflush 여부에 따라 다름)



[]

In [18]:
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

session = Session(engine)

user = User(name="crimson")
session.add(user)

session.query(User).all()

[<__main__.User at 0x7fc417482b90>]

In [3]:
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

session = Session(engine)

user = User(name="crimson")
session.add(user)
session.commit()

# Create module
module = Module(name="crimson-lib", owner=user, repository="crimson-lib", index_url_base="https://example.com/simple/")
session.add(module)
session.commit()

# Create categories
cat_dev = Category(name="dev", module=module, index_url="https://example.com/simple/dev/", assembled=True)
cat_test = Category(name="test", module=module, index_url="https://example.com/simple/test/", assembled=True)
session.add_all([cat_dev, cat_test])
session.commit()

# Add releases
rel1 = Release(version="1.0.0", commit="abcd1234", category=cat_dev)
session.add(rel1)
rel2 = Release(version="1.0.0", commit="efgh5678", category=cat_test)



In [4]:
validate_release(rel2)

  if not new_release.category.assembled:


ValueError: Version 1.0.0 already exists in assembled category 'dev'

In [None]:
session.add(rel2)
rel3 = Release(version="1.0.1", commit="ijkl9012", category=cat_test)
session.add(rel3)


In [None]:
session.commit()


In [7]:
rel2_queried = session.query(Release).filter(Release.commit=="efgh5678").first()

In [8]:
rel2_queried.category

<__main__.Category at 0x7fd500c3e9d0>

In [6]:
# --- Setup and example usage ---
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)


user = User(name="crimson")
session.add(user)
session.commit()

# Create module
module = Module(name="crimson-lib", owner=user, repository="crimson-lib", index_url_base="https://example.com/simple/")
session.add(module)
session.commit()

# Create categories
cat_dev = Category(name="dev", module=module, index_url="https://example.com/simple/dev/", assembled=False)
cat_test = Category(name="test", module=module, index_url="https://example.com/simple/test/", assembled=True)
session.add_all([cat_dev, cat_test])
session.commit()

# Add releases
rel1 = Release(version="1.0.0", commit="abcd1234", category=cat_dev)
rel2 = Release(version="1.0.0", commit="efgh5678", category=cat_test)
rel3 = Release(version="1.0.1", commit="ijkl9012", category=cat_test)
session.add_all([rel1, rel2, rel3])




In [8]:
cat_test.assembled = False

In [9]:
validate_category_assembly_change(cat_dev, to_assembled=True)

In [4]:
# --- Setup and example usage ---
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

results = []
with Session(engine) as session:
    # Create user
    user = User(name="crimson")
    session.add(user)
    session.commit()

    # Create module
    module = Module(name="crimson-lib", owner=user, repository="crimson-lib", index_url_base="https://example.com/simple/")
    session.add(module)
    session.commit()

    # Create categories
    cat_dev = Category(name="dev", module=module, index_url="https://example.com/simple/dev/", assembled=True)
    cat_test = Category(name="test", module=module, index_url="https://example.com/simple/test/", assembled=True)
    session.add_all([cat_dev, cat_test])
    session.commit()

    # Add releases
    rel1 = Release(version="1.0.0", commit="abcd1234", category=cat_dev)
    rel2 = Release(version="1.0.0", commit="efgh5678", category=cat_test)
    rel3 = Release(version="1.0.1", commit="ijkl9012", category=cat_test)
    session.add_all([rel1, rel2, rel3])
    session.commit()

    # Demonstrate access
    results.append(f"All categories for module '{module.name}': {[c.name for c in module.categories]}")
    results.append(f"All assembled releases: {[r.version for r in module.assembled_releases]}")

results


["All categories for module 'crimson-lib': ['dev', 'test']",
 "All assembled releases: ['1.0.0', '1.0.0', '1.0.1']"]

In [4]:
ALL_SESSIONS = []

def create_session():
    session = Session(engine)
    ALL_SESSIONS.append(session)
    return session

In [5]:
session1 = create_session()

In [7]:
modules = session1.query(Module).all()

In [9]:
modules[0].assembled_releases

[<__main__.Release at 0x7f1c544f4790>, <__main__.Release at 0x7f1c3d86d850>]

In [2]:
with Session(engine) as session:
    release = session.query(Release).filter_by(version="1.0.0").first()
    print("Release version:", release.version)
    print("Module name:", release.module.name)
    print("Owner name:", release.module.owner.name)


Release version: 1.0.0
Module name: crimson-file-loader
Owner name: crimson206


Team Extension

In [None]:
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Table
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

# Association table between User and Team
class TeamMember(Base):
    __tablename__ = "team_members"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    team_id = Column(Integer, ForeignKey("teams.id"))
    role = Column(String)  # optional: "owner", "member", etc.

# Team definition
class Team(Base):
    __tablename__ = "teams"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    members = relationship("User", secondary="team_members", back_populates="teams")
    modules = relationship("TeamModule", back_populates="team")

# Team-owned module, still extending from base
class TeamModule(Base):
    __tablename__ = "team_modules"
    id = Column(Integer, primary_key=True)

    # 기존 Module을 가리키는 외래키 (공통 정의 기반)
    module_id = Column(Integer, ForeignKey("modules.id"))
    team_id = Column(Integer, ForeignKey("teams.id"))

    module = relationship("Module")  # 자세한 정보는 이쪽에서 가져옴
    team = relationship("Team", back_populates="modules")

