## Setup connection to database

In [48]:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
SQLALCHEMY_DATABASE_URL = "postgresql://phong02468:12481632@postgresserver/db"
SQLALCHEMY_DATABASE_URL = "postgresql+psycopg2://postgres:123456aA@:5432/MSE19"


engine = create_engine(
    SQLALCHEMY_DATABASE_URL
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


Base = declarative_base()

## Models

In [49]:
from sqlalchemy import Column, Integer, String, Float, Boolean, ForeignKey, LargeBinary, Table, Date, DateTime,Text
from sqlalchemy.orm import relationship
import uuid
from sqlalchemy.dialects.postgresql import UUID as pg_UUID


class User(Base):
    # Tên bảng trong DB
    __tablename__ = "User"

    user_id = Column(pg_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    username = Column(String(50), nullable=False)
    email = Column(String(100), nullable=False)
    password = Column(String(255), nullable=False)
    mobile = Column(String(15))
    fullname = Column(String(100))
    role = Column(String(20), nullable=False)


# Định nghĩa model Question
class Question(Base):
    # Tên bảng trong DB
    __tablename__ = "Question"

    question_id = Column(pg_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    question_number = Column(String(15), nullable=False)
    exam_subject = Column(String(100), nullable=False)
    exam_maker = Column(pg_UUID(as_uuid=True), ForeignKey('User.user_id'), nullable=False)  # khóa ngoại PK tham chiếu đến User
    question_date = Column(Date, nullable=False)
    question_content = Column(Text, nullable=False)
    question_image = Column(Text)
    option_a = Column(Text, nullable=False)
    option_b = Column(Text, nullable=False)
    option_c = Column(Text, nullable=False)
    option_d = Column(Text, nullable=False)
    correct_answer = Column(String(1), nullable=False)
    question_mark = Column(Integer, nullable=False)
    question_unit = Column(String(50), nullable=False)
    question_mixchoices = Column(Boolean, default=False)

    # Định nghĩa mối quan hệ với bảng User
    exam_maker_user = relationship("User", backref="questions")


class Subject(Base):
    __tablename__ = "subjects"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True, index=True)
    lecturer = Column(String)
    date = Column(Date)

    questions = relationship("Question", back_populates="subject")
    quiz_sets = relationship("QuizSet", back_populates="subject")

quiz_set_questions = Table('quiz_set_questions', Base.metadata,
    Column('quiz_set_id', Integer, ForeignKey('quiz_sets.id')),
    Column('question_id', Integer, ForeignKey('questions.id'))
)


class QuizSet(Base):
    __tablename__ = "quiz_sets"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    subject_id = Column(Integer, ForeignKey("subjects.id"))
    
    subject = relationship("Subject", back_populates="quiz_sets")
    questions = relationship("Question", secondary=quiz_set_questions, back_populates="quiz_sets")
    exams = relationship("Exam", back_populates="quiz_set")



class Exam(Base):
    __tablename__ = "exams"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    quiz_set_id = Column(Integer, ForeignKey("quiz_sets.id"))
    user_id = Column(Integer, ForeignKey("users.id"))
    start_time = Column(DateTime)
    end_time = Column(DateTime)
    duration = Column(Integer)  # in minutes

    quiz_set = relationship("QuizSet", back_populates="exams")
    user = relationship("User", back_populates="exams")

## Schema

In [50]:
# Import các thư viện cần thiết từ Pydantic
from pydantic import BaseModel, EmailStr

from uuid import UUID
from typing import Optional
from datetime import date, time

# Định nghĩa schema cơ bản cho User
class UserBase(BaseModel):
    username: str  
    email: Optional[str] = None
    mobile: Optional[str] = None
    fullname: Optional[str] = None
    role: str  # Vai trò của người dùng

# Schema cho việc tạo User mới, kế thừa từ UserBase
class UserCreate(UserBase):
    password: str  # Mật khẩu người dùng

# Schema cho việc cập nhật User, kế thừa từ UserBase
class UserUpdate(UserBase):
    password: Optional[str] = None  

# Schema cho phản hồi User, kế thừa từ UserBase
class UserOut(UserBase):
    user_id: UUID  # ID người dùng

# Định nghĩa schema cơ bản cho Question
class QuestionBase(BaseModel):
    question_number: str  # Số câu hỏi
    exam_subject: str  # Môn thi
    exam_maker: UUID  # ID người tạo câu hỏi
    question_date: date  # Ngày tạo câu hỏi
    question_content: str  # Nội dung câu hỏi
    question_image: Optional[str] = None  # Hình ảnh câu hỏi (base64)
    option_a: str  # Lựa chọn A
    option_b: str  # Lựa chọn B
    option_c: str  # Lựa chọn C
    option_d: str  # Lựa chọn D
    correct_answer: str  # Đáp án đúng (phải kiểm tra hợp lệ trong CRUD)
    question_mark: int  # Điểm của câu hỏi
    question_unit: str  # Đơn vị câu hỏi
    question_mixchoices: Optional[bool] = False  # Trộn lựa chọn (mặc định là False)

# Schema cho việc tạo Question mới, kế thừa từ QuestionBase
class QuestionCreate(QuestionBase):
    question_number: str
    # exam_maker: UUID 
    exam_subject: str 
    question_date: date 
    question_content: str 
    question_image: Optional[str] = None 
    option_a: str
    option_b: str
    option_c: str
    option_d: str 
    correct_answer: str 
    question_mark: int  
    question_unit: str 
    question_mixchoices: Optional[bool] = False  

# Schema cho việc cập nhật Question, kế thừa từ QuestionBase
class QuestionUpdate(QuestionBase):
    pass

# Schema cho phản hồi Question, kế thừa từ QuestionBase
class QuestionOut(QuestionBase):
    question_id: UUID  # ID câu hỏi


# Schema cơ bản cho Exam
class ExamBase(BaseModel):
    exam_subject: str  # Môn thi
    exam_code: str  # Mã đề thi
    duration: int  # Thời lượng thi
    number_of_questions: int  # Số lượng câu hỏi

# Schema cho việc tạo Exam
class ExamCreate(ExamBase):
    pass

# Schema cho việc cập nhật Exam
class ExamUpdate(ExamBase):
    pass

# Schema để trả về thông tin Exam
class ExamOut(ExamBase):
    exam_id: UUID  # ID đề thi

    class Config:
        from_attributes = True  # Cho phép Pydantic tương thích với ORM


# Schema cho Schedule
class ScheduleBase(BaseModel):
    exam_id: UUID
    schedule_date: date
    start_time: time
    end_time: time

class ScheduleCreate(ScheduleBase):
    pass

class ScheduleUpdate(ScheduleBase):
    pass

class ScheduleOut(ScheduleBase):
    schedule_id: UUID

class IntroducerResponse(BaseModel):
    status: str 
    message: str
    exam_subject: str
    questions_created: int
    warnings: list[str]

## <b>CRUD</b>

In [51]:
# Import các thư viện cần thiết từ SQLAlchemy và các module khác
from sqlalchemy.orm import Session
import uuid

# Hàm tạo Question mới

def create_question(db: Session, question: QuestionCreate):
    db_question = Question(
        question_number=question.question_number,
        exam_subject=question.exam_subject,
        exam_maker=question.exam_maker,
        question_date=question.question_date,
        question_content=question.question_content,
        question_image=question.question_image,
        option_a=question.option_a,
        option_b=question.option_b,
        option_c=question.option_c,
        option_d=question.option_d,
        correct_answer=question.correct_answer,
        question_mark=question.question_mark,
        question_unit=question.question_unit,
        question_mixchoices=question.question_mixchoices
    )
    db.add(db_question)  # Thêm Question vào session
    db.commit()  # Lưu các thay đổi vào cơ sở dữ liệu
    db.refresh(db_question)  # Làm mới đối tượng Question từ cơ sở dữ liệu
    return db_question  # Trả về đối tượng Question mới tạo
from sqlalchemy.orm import Session

def create_question(db: Session, question: QuestionCreate, subject_id: int):
    db_question = Question(
        subject_id=subject_id,
        text=question.text,
        image=question.image,
        choices=",".join(question.choices),
        correct_answer=question.correct_answer,
        mark=question.mark,
        unit=question.unit,
        mix_choices=question.mix_choices
    )
    db.add(db_question)
    db.commit()
    db.refresh(db_question)
    return db_question

# Hàm lấy thông tin Question theo question_id
def get_question(db: Session, question_id: uuid.UUID):
    return db.query(Question).filter(Question.question_id == question_id).first()

# Hàm lấy danh sách các Question với phân trang
def get_questions(db: Session, skip: int = 0, limit: int = 10):
    return db.query(Question).offset(skip).limit(limit).all()

# Hàm cập nhật thông tin Question
def update_question(db: Session, question_id: uuid.UUID, question_data: QuestionUpdate):
    question = db.query(Question).filter(Question.question_id == question_id).first()
    if question:
        update_data = question_data.dict(exclude_unset=True)  # Chỉ cập nhật các trường được cung cấp
        for key, value in update_data.items():
            setattr(question, key, value)  # Cập nhật giá trị mới vào question
        db.commit()  # Lưu các thay đổi vào cơ sở dữ liệu
        db.refresh(question)  # Làm mới đối tượng Question từ cơ sở dữ liệu
        return question
    return None  # Trả về None nếu không tìm thấy question

# Hàm xóa Question theo question_id
def delete_question(db: Session, question_id: uuid.UUID):
    question = db.query(Question).filter(Question.question_id == question_id).first()
    if question:
        db.delete(question)  # Xóa Question khỏi session
        db.commit()  # Lưu các thay đổi vào cơ sở dữ liệu
        return question
    return None  # Trả về None nếu không tìm thấy question


# CRUD cho Exam
def get_exam(db: Session, exam_id: uuid.UUID):
    return db.query(Exam).filter(Exam.exam_id == exam_id).first()  # Lấy đề thi theo exam_id

def create_exam(db: Session, exam: ExamCreate):
    db_exam = Exam(**exam.dict())  # Tạo đối tượng Exam từ schema ExamCreate
    db.add(db_exam)  # Thêm đối tượng Exam vào session
    db.commit()  # Lưu thay đổi vào cơ sở dữ liệu
    db.refresh(db_exam)  # Làm mới đối tượng Exam
    return db_exam  # Trả về đối tượng Exam đã tạo

def get_exams(db: Session, skip: int = 0, limit: int = 10):
    return db.query(Exam).offset(skip).limit(limit).all()  # Lấy danh sách đề thi với phân trang

def update_exam(db: Session, exam_id: uuid.UUID, exam_data: ExamUpdate):
    db_exam = get_exam(db, exam_id)  # Lấy đề thi từ cơ sở dữ liệu
    if db_exam:
        for key, value in exam_data.dict().items():  # Cập nhật các trường của đề thi
            setattr(db_exam, key, value)
        db.commit()  # Lưu thay đổi vào cơ sở dữ liệu
        db.refresh(db_exam)  # Làm mới đối tượng Exam
    return db_exam  # Trả về đối tượng Exam đã cập nhật

def delete_exam(db: Session, exam_id: uuid.UUID):
    db_exam = get_exam(db, exam_id)  # Lấy đề thi từ cơ sở dữ liệu
    if db_exam:
        db.delete(db_exam)  # Xóa đối tượng Exam
        db.commit()  # Lưu thay đổi vào cơ sở dữ liệu
    return db_exam  # Trả về đối tượng Exam đã xóa


## <b>Services, Utils<b>

In [52]:
import docx
import base64
from datetime import datetime
from docx.oxml.shape import CT_Picture
import re


def parse_docx(file_path):
    doc = docx.Document(file_path)
    info = {}
    questions = []
    warnings = []

    # Process metadata
    for para in doc.paragraphs:
        text = para.text.strip()
        if text.startswith("Subject:"):
            try:
                info['subject'] = text.split(":", 1)[1].strip()
            except IndexError:
                warnings.append("Subject format is incorrect.")
        elif text.startswith("Number of Quiz:"):
            try:
                info['num_quiz'] = int(text.split(":", 1)[1].strip())
            except (ValueError, IndexError):
                warnings.append("Number of Quiz format is incorrect.")
        elif text.startswith("Lecturer:"):
            info['lecturer'] = text.split(":", 1)[1].strip()
        elif text.startswith("Date:"):
            try:
                date_str = text.split(":", 1)[1].strip()
                info['date'] = datetime.strptime(date_str, "%d-%m-%Y").date()
            except (ValueError, IndexError):
                warnings.append("Date format is incorrect.")
    if len(info) != 4:
        warnings.append("Missing header information.")

    expected_order = ['qn', 'a.', 'b.', 'c.', 'd.', 'answer:', 'mark:', 'unit:', 'mix choices:']
    table_index = 0

    for table in doc.tables:
        question = {}
        order = []

        for row in table.rows:
            if len(row.cells) != 2:
                warnings.append(f"Invalid table format in table {table_index + 1}")
                continue

            key = row.cells[0].text.strip().lower()
            value = row.cells[1].text.strip()
            order.append(key)
            
            if value is None:
                warnings.append(f"Emty cell value was found in table {table_index + 1}")
                continue
            
            

            if key.strip().lower().startswith('qn'):
                qn = re.findall("^qn=(\d+)", key.strip())
                if qn[0] is None:
                    warnings.append(f"Question number not found in table {table_index + 1}")
                    continue
                question['question_number'] = qn[0]

                question['text'] = value
                # Handle image
                for paragraph in row.cells[1].paragraphs:
                    for run in paragraph.runs:
                        image_parts = []
                        for element in run._element.iter():
                            if isinstance(element, CT_Picture):
                                rId = element.xpath('.//a:blip/@r:embed')[0]
                                image_part = doc.part.related_parts[rId]
                                image_parts.append(image_part)
                    
                        if image_parts:
                            image_bytes = image_parts[0].blob
                            question['image'] = base64.b64encode(image_bytes).decode('utf-8')
                            # print('question image: ', question['image'])
                question['choices'] = [value]
            elif key.strip().lower() in ['b.', 'c.', 'd.']:
                if 'choices' in question:
                    question['choices'].append(value)
                else:
                    warnings.append(f"Choice {key} found before 'a.' in table {table_index + 1}")
            elif key.strip().lower() == 'answer:':
                question['correct_answer'] = value
            elif key.strip().lower() == 'mark:':
                try:
                    question['mark'] = float(value)
                except ValueError:
                    warnings.append(f"Invalid mark value in table {table_index + 1}")
            elif key.strip().lower() == 'unit:':
                question['unit'] = value
            elif key.strip().lower() == 'mix choices:':
                question['mix_choices'] = value.lower() == 'yes'

        # Ensure keys are in the expected order
        order = list(map(lambda x: x.strip().lower(), order))
        order[0] = order[0][:2]
        
        if order != expected_order:
            warnings.append(f"Row names are not following the expected order in table {table_index + 1}, {order}")

        if 'text' not in question:
            warnings.append(f"Question text is missing in table {table_index + 1}")
        if 'choices' not in question or len(question.get('choices', [])) < 2:
            warnings.append(f"Insufficient choices in table {table_index + 1}")
        if 'correct_answer' not in question:
            warnings.append(f"Correct answer is missing in table {table_index + 1}")

        if 'text' in question and 'choices' in question and 'correct_answer' in question:
            questions.append(question)
        
        table_index += 1

    
    # for question in enumerate(questions,1):
    #     print(question[0], '-----------------------------------------------')
    #     for k,v in question[1].items():
    #         if k == 'image':
    #             print(k, v[:50])
    #         else:
    #             print(k,v)

    return info, questions, warnings


async def process_uploaded_file(user_id: uuid.UUID, file_path: str, db: Session) -> IntroducerResponse:
    info, questions, warnings = parse_docx(file_path)
    
    # user = crud.get_user(db, user_id)
    # if not user:
    #     raise HTTPException(status_code=404, detail="User not found")
    if warnings:
        # If there are warnings, don't add questions to the database
        return IntroducerResponse(
            status="fail",
            message="Warnings found during processing. Questions not added to database.",
            exam_subject=info.get('subject', ''),
            questions_created=0,
            warnings=warnings
        )

    questions_created = []
    for q in questions:
        db_question = create_question(db, QuestionCreate(
            question_number=f"{len(questions_created) + 1}",
            exam_subject=info['subject'],
            # exam_maker=user_id,
            question_date=date.today(),
            question_content=q['text'],
            question_image=q.get('image'),
            option_a=q['choices'][0],
            option_b=q['choices'][1],
            option_c=q['choices'][2],
            option_d=q['choices'][3],
            correct_answer=q['correct_answer'],
            question_mark=int(q['mark']),
            question_unit=q.get('unit', ''),
            question_mixchoices=q.get('mix_choices', False)
        ))
        questions_created.append(db_question)
    
    return IntroducerResponse(
        status="success",
        message="File processed successfully. Questions added to database.",
        exam_subject=info['subject'],
        questions_created=len(questions_created),
        warnings=[]
        )

## Testing service, util functions

In [53]:
info, questions, warnings = parse_docx("F:\FSB\Python for Engineer\Week 2 - 1\Template 2.docx")
for question in enumerate(questions,1):
    print(question[0], '-----------------------------------------------')
    for k,v in question[1].items():
        if k == 'image':
            print(k, v[:50])
        else:
            print(k,v)

1 -----------------------------------------------
question_number 1
text See the figure and choose the right type of B2B E-Commerce
[file:8435.jpg]
image iVBORw0KGgoAAAANSUhEUgAAATQAAAC7CAYAAAGcF1KwAAAAAX
choices ['See the figure and choose the right type of B2B E-Commerce\n[file:8435.jpg]', 'Electronic Exchange', 'Buy-side B2B', 'Supply Chain Improvements and Collaborative Commerce']
correct_answer B
mark 0.5
unit Chapter1
mix_choices True
2 -----------------------------------------------
question_number 2
text See the figure and choose the right type of B2B E-Commerce
choices ['See the figure and choose the right type of B2B E-Commerce', 'Electronic', 'B2B', 'Supply Commerce']
correct_answer B
mark 0.5
unit Chapt1
mix_choices True
3 -----------------------------------------------
question_number 3
text See the figure and choose the right type of B2B E-Commerce
image iVBORw0KGgoAAAANSUhEUgAAATQAAAC7CAYAAAGcF1KwAAAAAX
choices ['See the figure and choose the right type of B2B E-Commerce

## <b>Main (controller)</b>

In [54]:
# Import các thư viện cần thiết từ FastAPI, SQLAlchemy và các module khác
from fastapi import FastAPI, File, UploadFile, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
import tempfile
import os
from datetime import date
import uuid

app = FastAPI()


# Endpoint tạo Question mới
@app.post("/questions/", response_model=QuestionOut)
def create_question(question: QuestionCreate, db: Session = Depends(get_db)):
    return create_question(db=db, question=question)  # Gọi hàm tạo câu hỏi trong CRUD

# Endpoint lấy thông tin Question theo question_id
@app.get("/questions/{question_id}", response_model=QuestionOut)
def read_question(question_id: uuid.UUID, db: Session = Depends(get_db)):
    question = get_question(db=db, question_id=question_id)  # Lấy câu hỏi từ cơ sở dữ liệu
    if question is None:
        raise HTTPException(status_code=404, detail="Question not found")  # Nếu không tìm thấy câu hỏi, trả về lỗi 404
    return question  # Trả về câu hỏi tìm được

# Endpoint lấy danh sách các Question
@app.get("/questions/", response_model=list[QuestionOut])
def read_questions(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
    questions = get_questions(db=db, skip=skip, limit=limit)  # Lấy danh sách câu hỏi từ cơ sở dữ liệu với phân trang
    return questions  # Trả về danh sách các câu hỏi

# Endpoint cập nhật Question theo question_id
@app.put("/questions/{question_id}", response_model=QuestionOut)
def update_question(question_id: uuid.UUID, question_data: QuestionUpdate, db: Session = Depends(get_db)):
    question = update_question(db=db, question_id=question_id, question_data=question_data)  # Cập nhật câu hỏi trong cơ sở dữ liệu
    if question is None:
        raise HTTPException(status_code=404, detail="Question not found")  # Nếu không tìm thấy câu hỏi, trả về lỗi 404
    return question  # Trả về câu hỏi đã được cập nhật

# Endpoint xóa Question theo question_id
@app.delete("/questions/{question_id}", response_model=QuestionOut)
def delete_question(question_id: uuid.UUID, db: Session = Depends(get_db)):
    question = delete_question(db=db, question_id=question_id)  # Xóa câu hỏi khỏi cơ sở dữ liệu
    if question is None:
        raise HTTPException(status_code=404, detail="Question not found")  # Nếu không tìm thấy câu hỏi, trả về lỗi 404
    return question  # Trả về câu hỏi đã bị xóa


# Endpoint tạo mới Exam
@app.post("/exams/", response_model=ExamOut)
def create_exam(exam: ExamCreate, db: Session = Depends(get_db)):
    return create_exam(db=db, exam=exam)

# Endpoint lấy thông tin Exam theo exam_id
@app.get("/exams/{exam_id}", response_model=ExamOut)
def read_exam(exam_id: uuid.UUID, db: Session = Depends(get_db)):
    exam = get_exam(db=db, exam_id=exam_id)
    if exam is None:
        raise HTTPException(status_code=404, detail="Exam not found")
    return exam

# Endpoint lấy danh sách các Exam
@app.get("/exams/", response_model=list[ExamOut])
def read_exams(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
    exams = get_exams(db=db, skip=skip, limit=limit)
    return exams

# Endpoint cập nhật Exam theo exam_id
@app.put("/exams/{exam_id}", response_model=ExamOut)
def update_exam(exam_id: uuid.UUID, exam_data: ExamUpdate, db: Session = Depends(get_db)):
    exam = update_exam(db=db, exam_id=exam_id, exam_data=exam_data)
    if exam is None:
        raise HTTPException(status_code=404, detail="Exam not found")
    return exam

# Endpoint xóa Exam theo exam_id
@app.delete("/exams/{exam_id}", response_model=ExamOut)
def delete_exam(exam_id: uuid.UUID, db: Session = Depends(get_db)):
    exam = delete_exam(db=db, exam_id=exam_id)
    if exam is None:
        raise HTTPException(status_code=404, detail="Exam not found")
    return exam

@app.post("/upload-and-process/", response_model=IntroducerResponse)
async def upload_and_process(
    user_id: uuid.UUID,
    file: UploadFile = File(...),
    db: Session = Depends(get_db)
):
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as temp_file:
            contents = await file.read()
            temp_file.write(contents)
            temp_file_path = temp_file.name
        
        response = await process_uploaded_file(user_id, temp_file_path, db)
        
        os.unlink(temp_file_path)
        return response
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

In [55]:
file_path = "path_to_your_test_file.docx"
user_id = uuid.uuid4()
upload_and_process(user_id,file_path)

<coroutine object upload_and_process at 0x0000028EA54A3760>