# Fast API 심화

## 데이터 베이스 연동

In [1]:
# pip install sqlalchemy

In [2]:
# pip install pymysql

In [3]:
# pip install cryptography

In [4]:
# pip install "PyMySQL[rsa]"


In [3]:
# 데이터베이스 연결
# sql 조회
import pymysql

connection = pymysql.connect(
    host = 'localhost',
    user = 'root',
    password = '00000000',
    database = 'orm_tutorial',
    charset = 'utf8mb4'
)

In [4]:
def get_users_sql():
    """순수 sql로 사용자 목록 조회"""
    with connection.cursor() as cursor:
        sql = "SELECT id, username, full_name, age FROM users WHERE is_active = %s"
        cursor.execute(sql,(True,))
        results = cursor.fetchall()
        return results
    
users = get_users_sql()
print("순수 SQL 결과\n", users)

순수 SQL 결과
 ((1, 'john_doe', '존 도', 25), (2, 'jane_smith', '제인 스미스', 30), (3, 'bob_wilson', '밥 윌슨', 35))


In [5]:
## ORM 조회
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# DB 연결 설정

DATABASE_URL = "mysql+pymysql://root:00000000@localhost:3306/orm_tutorial"

engine = create_engine(DATABASE_URL)

SessionLocal = sessionmaker(bind = engine)

Base = declarative_base()

  Base = declarative_base()


In [6]:
class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key = True)
    username = Column(String(50), unique = True, nullable = False)
    email = Column(String(50), unique = True, nullable = False)
    full_name =Column(String(100))
    age = Column(Integer)
    is_active = Column(Boolean, default= True)

In [7]:
def get_users_orm():
    session = SessionLocal()
    
    try:
        users = session.query(User)\
                        .filter(User.is_active == True)\
                        .all()
        return users
    
    finally:
        session.close()
        
        
users = get_users_orm()

In [8]:
#  SQL각 사용자별 게시글 수와 총 조회수를 구하기

def get_user_stats_sql():
    """
    순수 SQL로 사용자별 통계 조회 (JOIN + 집계함수 사용)
    """
    with connection.cursor() as cursor:
        # 복잡한 SQL 쿼리 작성
        sql = """
        SELECT
            u.id,                                      -- 사용자 ID
            u.username,                                -- 사용자명
            u.full_name,                               -- 실명
            COUNT(p.id) as post_count,                 -- 게시글 개수 (COUNT 집계)
            IFNULL(SUM(p.view_count), 0) as total_views -- 총 조회수 (SUM 집계, NULL시 0)
        FROM users u                                  -- 메인 테이블: users
        LEFT JOIN posts p ON u.id = p.user_id         -- 외부 조인: 게시글이 없는 사용자도 포함
        WHERE u.is_active = %s                        -- 활성 사용자만 필터링
        GROUP BY u.id, u.username, u.full_name        -- 사용자별로 그룹화
        ORDER BY total_views DESC                     -- 조회수 내림차순 정렬
        """
        # 여기에 실제 실행 >코드를 추가 (예시)
        cursor.execute(sql, (True,))  # True: 활성 사용자만
        result = cursor.fetchall()
        return result


In [9]:
## 실행 결과 출력
stats = get_user_stats_sql()

for stat in stats:
    print(f"사용자: {stat[2]}, 게시글: {stat[3]}개, 총 조회수: {stat[4]}")
    

사용자: 존 도, 게시글: 2개, 총 조회수: 35
사용자: 제인 스미스, 게시글: 1개, 총 조회수: 15
사용자: 밥 윌슨, 게시글: 1개, 총 조회수: 8


In [10]:
# ORM각 사용자별 게시글 수와 총 조회수를 구하기
from sqlalchemy.orm import relationship
from sqlalchemy import ForeignKey, func
from sqlalchemy import create_engine, Column, Integer,String, Boolean,DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "mysql+pymysql://root:00000000@localhost:3306/orm_tutorial"

engine = create_engine(DATABASE_URL)

SessionLocal = sessionmaker(bind = engine)

Base = declarative_base()

  Base = declarative_base()


In [None]:
class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key = True)
    username = Column(String(50), unique = True, nullable = False)
    email = Column(String(50), unique = True, nullable = False)
    full_name =Column(String(100))
    age = Column(Integer)
    is_active = Column(Boolean, default= True)
    
class Post(Base):
    __tablename__ = "posts"
    
    id = Column(Integer, primary_key = True)
    title = Column(String(200), nullable = False)
    content = Column(String(1000))
    user_id =Column(Integer, ForeignKey('users.id'))
    view_count = Column(Integer, default= 0)
    
    # 관계 설정
    # post -> user: 다대일 관계 (여러 게시글이 한 사용자에게 속함)
    user = relationship("User", back_populates= "posts")
    
User.posts = relationship("Post",back_populates="user")


    

In [12]:
def get_user_stats_orm():
    """SQLAlchemy ORM으로 사용자별 통계 조회 (조인 + 집계)"""
    session = SessionLocal()
    
    try:
        stats = session.query(
            User.id,
            User.username,
            User.full_name,
            func.count(Post.id).label('post_count'),
            func.coalesce(
                func.sum(Post.view_count),0
            ).label('total_views')
        )\
        .outerjoin(Post, User.id == Post.user_id)\
        .filter(User.is_active == True)\
        .group_by(User.id, User.username, User.full_name)\
        .order_by(
            func.coalesce(func.sum(Post.view_count),0).desc()
        )\
        .all()
        
        return stats
        
    finally:
        session.close()
        
stats = get_user_stats_orm()
for stat in stats:
    print(f"사용자: {stat[2]}, 게시글: {stat[3]}개, 총 조회수: {stat[4]}")
    

사용자: 존 도, 게시글: 2개, 총 조회수: 35
사용자: 제인 스미스, 게시글: 1개, 총 조회수: 15
사용자: 밥 윌슨, 게시글: 1개, 총 조회수: 8


----

### SQL vs ORM (데이터 삽입, 삭제)

#### 데이터 삽입(INSERT)

In [13]:
# SQL insert

def create_user_sql(username, email, full_name, age):
    with connection.cursor() as cursor:
        
        #1. 중복 체크
        check_sql = "SELECT COUNT(*) FROM users WHERE username = %s OR email = %s"
        cursor.execute(check_sql, (username, email))
        
        count = cursor.fetchone()[0]
        if count > 0:
            raise ValueError("사용자명 또는 이메일이 이미 존재합니다.")
        
        #2. 데이터 삽입
        insert_sql = """
        INSERT INTO users (username, email, full_name, age, is_active)
        VALUES (%s,%s,%s,%s,%s)
        """
        
        cursor.execute(insert_sql, (username, email, full_name, age, True))
        
        connection.commit()
        
        return cursor.lastrowid
    
# 실행
try:
    user_id = create_user_sql("alice", "alice@examplr.com","앨리스",28)
    print(f"새 사용자 생성 성공! ID: {user_id}")
except ValueError as e:
    print("중복 오류: {e}")
except Exception as e:
    print("DB 오류: {e}")

새 사용자 생성 성공! ID: 4


In [14]:
# ORM INSERT

def create_user_orm(username, email, full_name, age):
    session = SessionLocal()
    
    try:
        #1. 데이터가 있는지 없는지 중복 체크
        existing_user = session.query(User).filter(
            (User.username == username) | (User.email == email )
        ).first()
        if existing_user:
            raise ValueError("사용자명 또는 이메일이 이미 존재합니다.")
        
        # 2. 새 객체 생성
        new_user = User(
            username = username,
            email = email,
            full_name = full_name,
            age = age,
            is_active = True
        )
        
        #3. DB에 저장
        session.add(new_user)
        session.commit()
        session.refresh(new_user)
        
        return new_user.id
    
    except Exception as e:
        #오류 발생 시 변경 사항 취소 (트랜잭션 롤백
        session.rollback()
        raise e
    
    finally: 
        session.close()
        
        
try:
    user_id = create_user_orm("alice2", "alice2@example.com", "앨리스2", "28")
    print(f"새 사용자 생성 성공! ID : {user_id}")
except ValueError as e:
    print(f"중복 오류: {e}")
except Exception as e:
    print(f"DB 오류: {e}")
        

새 사용자 생성 성공! ID : 5


#### 데이터 수정(UPDATE)

In [17]:
# SQL UPDATE

def update_user_sql(user_ud, **kwargs):
    if not kwargs:
        return False
    
    set_clauses = []
    values = [] 
    
    allowed_columns = ['username','email','full_name','age', 'is_active']
    
    for key,value in kwargs.items():
        if key in allowed_columns:
            set_clauses.append(f"{key} = %s")
            values.append(value)
    
    if not set_clauses:
        return False
    
    values.append(user_id)
    
    sql = f'UPDATE users SET {','.join(set_clauses)} WHERE id = %s'
    
    with connection.cursor() as cursor:
        cursor.execute(sql, values)
        connection.commit()
        
        return cursor.rowcount > 0
    
    

In [18]:


#1. 실행 테스트
success = update_user_sql(1, full_name = "john do (update)", age = 26)
print(f"update success: {success}")

# 여러 필드 동시 수정
success2 = update_user_sql(2, username = "jane_updated", age = 30, is_active = False)
print(f"다중 필드 수정 성공: {success2}")

update success: True
다중 필드 수정 성공: True


In [22]:
# ORM UPDATE

def update_user_orm(user_id, **kwargs):
    session = SessionLocal()
    
    try:
        user = session.query(User)\
                    .filter(User.id == user_id)\
                        .first()
        if not user:
            return False
        
        for key, value in kwargs.items():
            if hasattr(user, key):
                
                setattr(user, key, value)
                
        session.commit()
        return True

    except Exception as e:
        session.rollback()
        print(f"업데이트 오류: {e}")
        raise e

    finally:
        session.close()
        
success = update_user_orm(1, full_name = "존 도 (ORM 업데이트)", age = 26)
print(f"수정 성공: {success}")

success2 = update_user_orm(2, 
                           username = "jane_orm_updated",
                           age = 30,
                           is_active = False,
                           full_name = "제인 스미스 (수정됨)"
                           )
print(f"다중 필드 수정 성공: {success2}")

success3 = update_user_orm(999, username = 'nonexistent')
print(f"존재하지 않는 사용자 : {success3}")


수정 성공: True
다중 필드 수정 성공: True
존재하지 않는 사용자 : False
