In [1]:
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

SQLALCHEMY_DATABASE_URL = "sqlite:///./myapi.db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

naming_convention = {
    "ix": 'ix_%(column_0_label)s',
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(column_0_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s"
}
Base.metadata = MetaData(naming_convention=naming_convention)

  Base = declarative_base()


In [2]:
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Table
from sqlalchemy.orm import relationship

question_voter = Table(
    'question_voter',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('user.id'), primary_key=True),
    Column('question_id', Integer, ForeignKey('question.id'), primary_key=True)
)

answer_voter = Table(
    'answer_voter',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('user.id'), primary_key=True),
    Column('answer_id', Integer, ForeignKey('answer.id'), primary_key=True)
    
)

class Question(Base):
    __tablename__ = "question"
    
    id = Column(Integer, primary_key=True)
    subject = Column(String, nullable=False)
    content = Column(Text, nullable=False)
    create_date = Column(DateTime, nullable=False)
    user_id = Column(Integer, ForeignKey('user.id'), nullable=True)
    user = relationship("User", backref="question_users")
    modify_date = Column(DateTime, nullable=True)
    voter = relationship('User', secondary=question_voter, backref='question_voters')

class Answer(Base):
    __tablename__ = "answer"

    id = Column(Integer, primary_key=True)
    content = Column(Text, nullable=False)
    create_date = Column(DateTime, nullable=False)
    question_id = Column(Integer, ForeignKey("question.id"))
    question = relationship("Question", backref="answers")
    user_id = Column(Integer, ForeignKey('user.id'), nullable=True)
    user = relationship("User", backref="answer_users")
    modify_date = Column(DateTime, nullable=True)
    voter = relationship('User', secondary=answer_voter, backref='answer_voters')

class User(Base):
    __tablename__ = "user"
    
    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True, nullable=False)
    password = Column(String, nullable=False)
    email = Column(String, unique=True, nullable=False)

In [4]:
db = SessionLocal()

In [4]:
from datetime import datetime

In [16]:
q = Question(subject='pybo가 무엇인가요?', content='pybo에 대해서 알고 싶습니다.', create_date=datetime.now())

db.add(q)
db.commit()

In [None]:
q.id

In [17]:

q = Question(subject='FastAPI 모델 질문입니다.', content='id는 자동으로 생성되나요?', create_date=datetime.now())
db.add(q)
db.commit()


In [11]:
for q in db.query(Answer).all():
    db.delete(q)
    db.commit()

In [5]:
db.query(Answer).all()

[<__main__.Answer at 0x11c614e20>,
 <__main__.Answer at 0x11c614100>,
 <__main__.Answer at 0x11c614190>,
 <__main__.Answer at 0x11c6141f0>,
 <__main__.Answer at 0x11c614250>,
 <__main__.Answer at 0x11c6142b0>,
 <__main__.Answer at 0x11c614310>,
 <__main__.Answer at 0x11c614370>,
 <__main__.Answer at 0x11c6143d0>,
 <__main__.Answer at 0x11c614430>,
 <__main__.Answer at 0x11c6144c0>]

In [6]:
db.query(Question).all()

[<__main__.Question at 0x11c650370>,
 <__main__.Question at 0x11c674250>,
 <__main__.Question at 0x11c66ef40>,
 <__main__.Question at 0x11c66e730>,
 <__main__.Question at 0x11c6682b0>,
 <__main__.Question at 0x11c655dc0>,
 <__main__.Question at 0x11c640970>,
 <__main__.Question at 0x11c45d670>,
 <__main__.Question at 0x11c45d7f0>,
 <__main__.Question at 0x11c45d760>,
 <__main__.Question at 0x11c45da60>,
 <__main__.Question at 0x11c45d430>,
 <__main__.Question at 0x11c45d460>,
 <__main__.Question at 0x11c45dc10>,
 <__main__.Question at 0x11c45d7c0>,
 <__main__.Question at 0x11c44b490>,
 <__main__.Question at 0x1078fdfa0>,
 <__main__.Question at 0x1078f8550>,
 <__main__.Question at 0x1078f8e50>,
 <__main__.Question at 0x11c733700>,
 <__main__.Question at 0x11c71cf40>,
 <__main__.Question at 0x11c7fb820>,
 <__main__.Question at 0x11c7a6ca0>,
 <__main__.Question at 0x11c778340>,
 <__main__.Question at 0x11c77d7c0>,
 <__main__.Question at 0x11c78e9a0>,
 <__main__.Question at 0x11c590400>,
 

In [7]:
db.query(Question).filter(Question.id==1).all()

[]

In [8]:
db.query(Question).get(1)

In [None]:
db.query(Question).filter(Question.subject.like('%FastAPI%')).all()

In [9]:
q = db.query(Question).get(2)
q.content

'id는 자동으로 생성되나요?'

In [22]:
for ans in db.query(Answer).all():
    print(ans.question_id)

2
2
2
3
306
306
None
309
310
308
306


In [55]:
from sqlalchemy import desc

q = db.query(Answer).filter(Answer.question_id == 306).order_by(Answer.voter)


In [None]:
from sqlalchemy import desc
for i in db.query(Answer).filter(Answer.question_id == 306).order_by(desc(Answer.voter)):
    print(i.content)

In [97]:
answers = db.query(Answer).filter(Answer.question_id == 306).all()
print(answers)

[<__main__.Answer object at 0x11c614250>, <__main__.Answer object at 0x11c6142b0>, <__main__.Answer object at 0x11c6144c0>]


In [99]:
answers = db.query(Answer).filter(Answer.question_id == 306).distinct().count()
answers

3

In [69]:
answers = db.query(Question).get(306).answers
sorted_answers = sorted(answers, key = lambda x : [len(x.voter), x.create_date], reverse=True)

In [70]:
for ans in sorted_answers:
    print(len(ans.voter), ans.create_date)

2 2023-10-01 21:53:17.251011
0 2023-10-06 16:07:31.878335
0 2023-10-01 20:53:05.188373


In [78]:
from sqlalchemy import func, desc

# voter의 수를 계산하고, 이를 기준으로 내림차순 정렬하여 쿼리를 작성
query = (
    db.query(Answer, func.count(User.id).label('voter_count'))
    .join(User, Answer.voter)  # 'voters'는 Answer 모델의 관계 이름이어야 합니다.
    .filter(Answer.question_id == 306)
    .group_by(Answer.id)
    .order_by(desc('voter_count'))
)

# 결과 실행 및 출력
for answer, voter_count in query:
    print(f'Answer ID: {answer.id}, Voter Count: {voter_count}')


Answer ID: 6, Voter Count: 2


In [92]:
query = (
    db.query(Answer, func.count(User.id).label('voter_count'))
    .join(User, Answer.voter)  # 'voters'는 Answer 모델의 관계 이름이어야 합니다.
    .filter(Answer.question_id == 306)
    # .group_by(Answer.id)
    .order_by(desc('voter_count'))
)

In [93]:
# 결과 실행 및 출력
for answer, voter_count in query:
    print(f'Answer ID: {answer.id}, Voter Count: {voter_count}')

Answer ID: 6, Voter Count: 2


In [None]:
from sqlalchemy import func, desc

db.query(Question, func.count(User))

In [41]:
db.query(Answer).filter(Answer.question_id == 2).order_by(Answer.voter)

<sqlalchemy.orm.query.Query at 0x11d6aef40>

In [None]:
db.query(Answer).filter(Answer.question_id == 2).order_by(Answer.voter)

In [4]:
q.subject = "FastAPI Model Question"
db.commit()

In [18]:
q = db.query(Question).get(2)
q.subject

'FastAPI 모델 질문입니다.'

In [19]:
q = db.query(Question).get(1)
db.delete(q)
db.commit()

In [20]:
db.query(Question).all()

[<__main__.Question at 0x10bc29730>]

In [10]:
q = db.query(Question).get(2)
a = Answer(question=q, content="네 자동으로 생성됩니다.", create_date=datetime.now())
db.add(a)
db.commit()

In [11]:
a.question

<models.Question at 0x106e31ee0>

In [12]:
q.answer

[<models.Answer at 0x106e159a0>]

In [5]:
for i in range(300):
    q = Question(subject="test data : [%03d]" % i, content=f"content {i}", create_date=datetime.now())
    db.add(q)

In [6]:
db.commit()

In [19]:
db = SessionLocal()

In [3]:
db.query(Question).count()

NameError: name 'db' is not defined

In [21]:
db.query(Answer).count()

8

In [22]:
db.query(Question).join(Answer).count()

7

In [23]:
db.query(Question).outerjoin(Answer).count()

312

In [24]:
db.query(Question).outerjoin(Answer).distinct().count()

309

In [40]:
db.query(Question).outerjoin(Answer).filter(
    Question.content.ilike("%파이썬%") |
    Answer.content.ilike("%파이썬%")
).distinct().count()

1

In [46]:
db.query(Answer.question_id, Answer.content, User.username).outerjoin(User, Answer.user_id == User.id).subquery()

<sqlalchemy.sql.selectable.Subquery at 0x107fde640; anon_1>

In [47]:
sub_query = db.query(Answer.question_id, Answer.content, User.username) \
.outerjoin(User, Answer.user_id == User.id).subquery()

In [49]:
sub_query.c.question_id

Column('question_id', Integer(), ForeignKey('question.id'), table=<anon_1>)

In [51]:
db.query(Question).outerjoin(sub_query, sub_query.c.question_id == Question.id) \
    .filter(sub_query.c.content.ilike("%파이썬%") | 
            sub_query.c.username.ilike("%파이썬%")).distinct().count()

0

In [53]:
db.query(Question).outerjoin(sub_query, sub_query.c.question_id == Question.id) \
    .filter(sub_query.c.content.ilike('%파이썬%') |   # 답변내용
           sub_query.c.username.ilike('%파이썬%')    # 답변작성자
           ).distinct().count()

0