# Fill DB with generated data

In [1]:
import hashlib
import random
from datetime import date, datetime, timedelta
from enum import Enum
from typing import Any, Union, Optional

import numpy as np
from faker import Faker
from pydantic import BaseModel

from database import SessionLocal, Base
from database.models import *  # Wildcard import is bad practice, but there are 19 model classes to bring in.

### Utils

In [2]:
# Shared Faker instance used by the generation cells below.
fake = Faker()

In [3]:
class SessionFactory:
    """Context manager that opens a ``SessionLocal`` session and closes it on exit."""

    def __enter__(self):
        # Keep the session on the instance so that __exit__ can close it.
        session = SessionLocal()
        self.db = session
        return session

    def __exit__(self, exception_type, exception_value, exception_traceback):
        # Nothing is returned, so an exception raised inside the block propagates.
        self.db.close()

In [4]:
def create(obj: Base) -> Base:
    """Persist ``obj`` in its own short-lived session and return it.

    The object is added, committed and then refreshed, so attributes
    populated by the database are loaded before the session is closed.
    """
    with SessionFactory() as session:
        session.add(obj)
        session.commit()
        session.refresh(obj)
        return obj

In [5]:
def min_dt(*arr: Optional[Union[datetime, date]]) -> Optional[datetime]:
    """Return the earliest of the given dates/datetimes as a ``datetime``.

    Falsy arguments (e.g. ``None``) are ignored. Plain ``date`` values are
    promoted to midnight of that day so they can be compared with datetimes.

    Returns:
        The smallest value, or ``None`` when no usable argument was given.
    """
    candidates = []
    for dt in arr:
        if not dt:
            continue
        # ``datetime`` is a subclass of ``date``: promote only pure dates,
        # otherwise the time of day of real datetimes would be discarded.
        if isinstance(dt, date) and not isinstance(dt, datetime):
            dt = datetime.combine(dt, datetime.min.time())
        candidates.append(dt)
    return min(candidates) if candidates else None


def max_dt(*arr: Optional[Union[datetime, date]]) -> Optional[datetime]:
    """Return the latest of the given dates/datetimes as a ``datetime``.

    Falsy arguments (e.g. ``None``) are ignored. Plain ``date`` values are
    promoted to midnight of that day so they can be compared with datetimes.

    Returns:
        The largest value, or ``None`` when no usable argument was given.
    """
    candidates = []
    for dt in arr:
        if not dt:
            continue
        # ``datetime`` is a subclass of ``date``: promote only pure dates,
        # otherwise the time of day of real datetimes would be discarded.
        if isinstance(dt, date) and not isinstance(dt, datetime):
            dt = datetime.combine(dt, datetime.min.time())
        candidates.append(dt)
    return max(candidates) if candidates else None

### Generation config

In [6]:
class TextLengthConfig(BaseModel):
    """Number of fake words joined together for each text-size category."""

    # Used for category names, course/section titles and option contents.
    short: int = 3
    # Used for step titles.
    medium: int = 10
    # Used for descriptions, step content and open-ended answers.
    long: int = 30


class DateTimeConfig(BaseModel):
    """Reference points in time that bound the generated dates."""

    # Lower bound for employees' date_of_employment.
    company_foundation: datetime = datetime(2017, 8, 24)
    # Lower bound for user registration dates and course start times.
    lms_launch: datetime = datetime(2021, 4, 16)
    # Reference "now": upper bound for registrations and submission times.
    today: datetime = datetime(2024, 5, 10)


class UserConfig(BaseModel):
    """Settings for user generation."""

    # Number of users to generate.
    count: int = 500


class EmployeeConfig(BaseModel):
    """Settings for employee generation."""

    # Probability that a generated user also gets an Employee record.
    user_fraction: float = 0.95
    # The three values below control the range from which the integer
    # position / team / department of an employee is drawn.
    num_positions: int = 30
    num_teams: int = 100
    num_departments: int = 20


class CategoryConfig(BaseModel):
    """Settings for the generated category tree."""

    # Total number of categories to create.
    count: int = 20
    # The chance of nesting one level deeper is 1 - depth / max_depth,
    # so it drops to zero once this depth is reached.
    max_depth: int = 3


class CourseConfig(BaseModel):
    """Settings for course generation."""

    # Number of courses to generate.
    count: int = 200
    # Probability that a course gets a start and end time.
    time_limit_proba: float = 0.5
    # Per-user probability of being enrolled in a course as a student.
    student_proba: float = 0.15
    # Per-user probability of being assigned to a course as a teacher.
    teacher_proba: float = 0.01
    # Each course gets between 1 and max_sections sections.
    max_sections: int = 5


class SectionConfig(BaseModel):
    """Settings for section generation."""

    # Probability that a section of a time-limited course gets its own
    # start and end time.
    time_limit_proba: float = 0.5
    # Each section gets between 1 and max_steps steps.
    max_steps: int = 10


class StepConfig(BaseModel):
    """Settings for step generation and the mix of task types."""

    # A step's max_score is drawn from 1..max_score.
    max_score: int = 5
    # Probability that a step allows unlimited attempts (max_attempts=None).
    # Annotated as float: it is a probability, and an int-typed field would
    # reject or truncate a fractional value passed explicitly.
    unlimited_attempts_proba: float = 0.5
    # Otherwise the attempt limit is drawn from 1..max_attempts.
    max_attempts: int = 3

    # Probabilities of each task type being attached to a step; they are
    # consumed cumulatively in this order.
    test_task_proba: float = 0.4
    text_task_proba: float = 0.4
    sorting_task_proba: float = 0.1
    open_ended_task_proba: float = 0.1


class TestTaskConfig(BaseModel):
    """Settings for test (choice) tasks and their simulated attempts."""

    # Probability that a task allows several correct options.
    multiple_choice_proba: float = 0.5
    # Probability used when deciding whether partial scoring is enabled.
    partial_score_proba: float = 0.5
    # Each task gets between 2 and max_options options.
    max_options: int = 5
    # Per-option probability of being marked correct in multiple-choice tasks.
    correct_options_frac: float = 0.4
    # Per-option probability that a simulated attempt selects it.
    option_selection_proba: float = 0.4


class SortingTaskConfig(BaseModel):
    """Settings for sorting tasks."""

    # Probability that partial scoring is enabled for a task.
    partial_score_proba: float = 0.5
    # Each task gets between 2 and max_options options.
    max_options: int = 5


class TextTaskConfig(BaseModel):
    """Settings for simulated attempts on text tasks."""

    # Probability that a student makes no attempt at all.
    skip_proba: float = 0.4
    # Probability that an attempt submits exactly the correct answer.
    full_match_proba: float = 0.4


class OpenEndedTaskConfig(BaseModel):
    """Settings for open-ended tasks and their reviews."""

    # Probability that a student makes no attempt at all.
    skip_proba: float = 0.5
    # A task requires between 1 and max_reviews reviews.
    max_reviews: int = 3
    # Probability that an attempt receives all required reviews.
    all_reviews_proba: float = 0.7


class Config(BaseModel):
    """Aggregate of all generation settings, one sub-config per area."""

    dt: DateTimeConfig = DateTimeConfig()
    user: UserConfig = UserConfig()
    employee: EmployeeConfig = EmployeeConfig()
    category: CategoryConfig = CategoryConfig()
    course: CourseConfig = CourseConfig()
    section: SectionConfig = SectionConfig()
    step: StepConfig = StepConfig()
    test_task: TestTaskConfig = TestTaskConfig()
    sorting_task: SortingTaskConfig = SortingTaskConfig()
    text_task: TextTaskConfig = TextTaskConfig()
    open_ended_task: OpenEndedTaskConfig = OpenEndedTaskConfig()
    text_length: TextLengthConfig = TextLengthConfig()

In [7]:
# Global configuration (all defaults) read by every generation cell below.
cfg = Config()

### Add users

In [8]:
# Generate cfg.user.count users, each with a login not used earlier in this run.
logins = set()
users = []
for _ in range(cfg.user.count):
    # Keep drawing until the login has not been used yet.
    while (login := fake.user_name()) in logins:
        pass
    logins.add(login)

    # Only a SHA-256 hex digest of a throwaway fake password is stored.
    password_hash = hashlib.sha256(fake.password().encode()).hexdigest()
    first_name, last_name = fake.first_name(), fake.last_name()
    registration_date = fake.date_between(cfg.dt.lms_launch, cfg.dt.today)

    user = create(
        User(
            login=login,
            password_hash=password_hash,
            first_name=first_name,
            last_name=last_name,
            registration_date=registration_date,
        )
    )
    users.append(user)

In [9]:
# Turn a random fraction of the users into employees.
employees = []
for user in users:
    if random.random() >= cfg.employee.user_fraction:
        continue
    employee = Employee(
        # random_int is inclusive on both ends, so the upper bound is n - 1
        # to get exactly n distinct values (0..n-1) for each num_* setting.
        position=fake.random_int(0, cfg.employee.num_positions - 1),
        team=fake.random_int(0, cfg.employee.num_teams - 1),
        department=fake.random_int(0, cfg.employee.num_departments - 1),
        # Employment starts no later than the LMS registration date.
        date_of_employment=fake.date_between(
            cfg.dt.company_foundation,
            user.registration_date,
        ),
        user_id=user.id,
    )
    employee = create(employee)
    employees.append(employee)

### Add categories

In [10]:
# Build a category tree with a random walk: at each iteration either add a
# child under the current parent or step back up one level.
category_by_id = {}
parent_id = None
depth = 0
while len(category_by_id) < cfg.category.count:
    if random.random() >= 1 - depth / cfg.category.max_depth:
        # Step back up to the parent of the current parent.
        parent_id = category_by_id[parent_id].parent_id
        depth -= 1
        continue

    # Add a child and descend into it.
    category = create(
        Category(
            name=' '.join(fake.words(cfg.text_length.short)),
            parent_id=parent_id,
        )
    )
    category_by_id[category.id] = category
    parent_id, depth = category.id, depth + 1

In [11]:
# Flat list of all created categories, used to pick one at random per course.
categories = [*category_by_id.values()]

### Add courses

In [12]:
def create_course(users: list[User]) -> None:
    """Create one course with enrolments and sections.

    The course optionally gets a time window, is assigned a random category,
    and receives random subsets of ``users`` as students and teachers. Between
    1 and ``cfg.course.max_sections`` sections are then generated for it.
    """
    has_time_limit = random.random() < cfg.course.time_limit_proba
    if has_time_limit:
        # Start at least 30 days before "today"; run for at least 30 days.
        start_time = fake.date_time_between(
            cfg.dt.lms_launch,
            cfg.dt.today - timedelta(days=30),
        )
        end_time = fake.date_time_between(
            start_time + timedelta(days=30),
            cfg.dt.today + timedelta(days=90),
        )
    else:
        start_time = end_time = None

    course = create(
        Course(
            title=' '.join(fake.words(cfg.text_length.short)),
            description=' '.join(fake.words(cfg.text_length.long)),
            start_time=start_time,
            end_time=end_time,
            category_id=random.choice(categories).id,
        )
    )

    # Each user independently becomes a student and/or a teacher.
    students = [person for person in users if random.random() < cfg.course.student_proba]
    teachers = [person for person in users if random.random() < cfg.course.teacher_proba]

    for student in students:
        create(UserStudyingCourse(user_id=student.id, course_id=course.id))
    for teacher in teachers:
        create(UserTeachingCourse(user_id=teacher.id, course_id=course.id))

    for _ in range(random.randint(1, cfg.course.max_sections)):
        create_section(course, students, teachers, has_time_limit)

In [13]:
def create_section(
    course: Course,
    students: list[User],
    teachers: list[User],
    has_time_limit: bool,
) -> None:
    """Create one section of ``course`` together with its steps.

    Args:
        course: Persisted course the section belongs to.
        students: Users studying the course.
        teachers: Users teaching the course.
        has_time_limit: Whether the course itself has a time window; only
            then may the section get its own window inside it.
    """
    has_time_limit = has_time_limit and random.random() < cfg.section.time_limit_proba

    start_time = end_time = None
    if has_time_limit:
        # Leave room for a section of at least one day inside the course
        # window; otherwise the end-time range below could be inverted and
        # the section would end after its course.
        min_length = timedelta(days=1)
        start_time = fake.date_time_between(
            course.start_time,
            course.end_time - min_length,
        )
        end_time = fake.date_time_between(
            start_time + min_length,
            course.end_time,
        )
        # Defensive clamp in case the generator overshoots a degenerate range.
        end_time = min(end_time, course.end_time)

    section = Section(
        title=' '.join(fake.words(cfg.text_length.short)),
        description=' '.join(fake.words(cfg.text_length.long)),
        start_time=start_time,
        end_time=end_time,
        course_id=course.id,
    )
    section = create(section)

    # Effective submission window: the tighter of the course and section bounds.
    time_limit = (
        max_dt(course.start_time, section.start_time),
        min_dt(course.end_time, section.end_time),
    )
    num_steps = random.randint(1, cfg.section.max_steps)
    for _ in range(num_steps):
        create_step(section, students, teachers, time_limit)

In [14]:
def create_step(
    section: Section,
    students: list[User],
    teachers: list[User],
    time_limit: tuple[datetime, datetime],
) -> None:
    """Create one step in ``section`` and attach at most one task to it.

    The task type is picked by a single random roll walked through the
    configured probabilities in order (test, text, sorting, open-ended).
    If the roll falls past all four, the step gets no task.
    """
    step = create(
        Step(
            title=' '.join(fake.words(cfg.text_length.medium)),
            content=' '.join(fake.words(cfg.text_length.long)),
            max_score=random.randint(1, cfg.step.max_score),
            max_attempts=(
                None if random.random() < cfg.step.unlimited_attempts_proba
                else random.randint(1, cfg.step.max_attempts)
            ),
            section_id=section.id,
        )
    )

    # (probability, task builder) pairs, checked in this order.
    task_builders = (
        (cfg.step.test_task_proba, lambda: create_test_task(step, students, time_limit)),
        (cfg.step.text_task_proba, lambda: create_text_task(step, students, time_limit)),
        (cfg.step.sorting_task_proba, lambda: create_sorting_task(step, students, time_limit)),
        (cfg.step.open_ended_task_proba, lambda: create_open_ended_task(step, students, teachers, time_limit)),
    )
    roll = random.random()
    for probability, build_task in task_builders:
        if roll < probability:
            build_task()
            return
        roll -= probability

In [15]:
def create_attempt(
    step: Step,
    user: User,
    score: int,
    time_limit: tuple[datetime, datetime],
) -> Attempt:
    """Persist one attempt of ``user`` on ``step`` and return it.

    The submission time is drawn between the later of the window start and
    the user's registration date, and the earlier of the window end and
    ``cfg.dt.today``.
    """
    earliest = max_dt(time_limit[0], user.registration_date)
    latest = min_dt(time_limit[1], cfg.dt.today)
    submitted_at = fake.date_time_between(earliest, latest)
    return create(
        Attempt(
            score=score,
            submission_time=submitted_at,
            step_id=step.id,
            user_id=user.id,
        )
    )

In [16]:
def create_test_task(
    step: Step,
    students: list[User],
    time_limit: tuple[datetime, datetime],
) -> None:
    """Attach a test (choice) task to ``step`` and simulate student attempts.

    Creates the ``TestTask``, between 2 and ``cfg.test_task.max_options``
    options, and for every eligible student a random number of attempts whose
    selected options are stored as ``AttemptTestOption`` rows.

    Args:
        step: Persisted step the task belongs to.
        students: Users studying the course.
        time_limit: ``(start, end)`` submission window; a bound may be ``None``.
    """
    multiple_choice = (random.random() < cfg.test_task.multiple_choice_proba)
    task = TestTask(
        multiple_choice=multiple_choice,
        # NOTE(review): partial scoring is only ever enabled for single-choice
        # tasks here, which looks inverted — confirm the intended rule.
        partial_score=(not multiple_choice and random.random() < cfg.test_task.partial_score_proba),
        step_id=step.id,
    )
    task = create(task)

    options: list[TestOption] = []
    num_options = random.randint(2, cfg.test_task.max_options)
    for i in range(num_options):
        option = TestOption(
            content=' '.join(fake.words(cfg.text_length.short)),
            # Single-choice tasks have exactly one correct option: the first.
            is_correct=(
                random.random() < cfg.test_task.correct_options_frac if task.multiple_choice
                else i == 0
            ),
            task_id=task.id,
        )
        option = create(option)
        options.append(option)

    for student in students:
        # Students who registered after the window closed cannot have attempts.
        if time_limit[1] is not None and student.registration_date >= time_limit[1].date():
            continue
        num_attempts = random.randint(0, step.max_attempts or cfg.step.max_attempts)
        for _ in range(num_attempts):
            selected_options = [
                option for option in options
                if random.random() < cfg.test_task.option_selection_proba
            ]
            if task.partial_score:
                # Share of options whose selected state matches correctness.
                score = np.mean(
                    [
                        option.is_correct == (option in selected_options)
                        for option in options
                    ]
                ) * step.max_score
            else:
                # All-or-nothing: every correct option must be selected.
                score = int(all(
                    option in selected_options
                    for option in options
                    if option.is_correct
                )) * step.max_score
            # The attempt belongs to the student being simulated, not to
            # whatever module-level ``user`` happens to be bound.
            attempt = create_attempt(step, student, score, time_limit)
            for option in selected_options:
                ato = AttemptTestOption(
                    attempt_id=attempt.id,
                    option_id=option.id,
                )
                create(ato)

In [17]:
def create_sorting_task(
    step: Step,
    students: list[User],
    time_limit: tuple[datetime, datetime],
) -> None:
    """Attach a sorting task to ``step`` and simulate student attempts.

    Creates the ``SortingTask``, between 2 and ``cfg.sorting_task.max_options``
    options whose correct position is their creation index, and for every
    eligible student a random number of attempts with a shuffled ordering
    stored as ``AttemptSortingOption`` rows.

    Args:
        step: Persisted step the task belongs to.
        students: Users studying the course.
        time_limit: ``(start, end)`` submission window; a bound may be ``None``.
    """
    task = SortingTask(
        partial_score=(random.random() < cfg.sorting_task.partial_score_proba),
        step_id=step.id,
    )
    task = create(task)

    options: list[SortingOption] = []
    num_options = random.randint(2, cfg.sorting_task.max_options)
    for i in range(num_options):
        option = SortingOption(
            content=' '.join(fake.words(cfg.text_length.short)),
            correct_position=i,
            task_id=task.id,
        )
        option = create(option)
        options.append(option)

    for student in students:
        # Students who registered after the window closed cannot have attempts.
        if time_limit[1] is not None and student.registration_date >= time_limit[1].date():
            continue
        num_attempts = random.randint(0, step.max_attempts or cfg.step.max_attempts)
        for _ in range(num_attempts):
            selected_options = options.copy()
            np.random.shuffle(selected_options)
            if task.partial_score:
                # Share of options placed at their correct position.
                score = np.mean(
                    [
                        i == option.correct_position
                        for i, option in enumerate(selected_options)
                    ]
                ) * step.max_score
            else:
                # All-or-nothing: the whole ordering must be correct.
                score = int(selected_options == options) * step.max_score
            # The attempt belongs to the student being simulated, not to
            # whatever module-level ``user`` happens to be bound.
            attempt = create_attempt(step, student, score, time_limit)
            for i, option in enumerate(selected_options):
                aso = AttemptSortingOption(
                    position=i,
                    attempt_id=attempt.id,
                    option_id=option.id,
                )
                create(aso)

In [18]:
class TextTaskAnswerType(Enum):
    """Kind of answer a text task expects; the integer value is what gets stored."""

    TEXT = 0
    NUMERIC = 1


class TextTaskCriterion(Enum):
    """Criterion stored on a text task; the integer value is what gets stored.

    NOTE(review): presumably this selects how a submitted answer is compared
    with the correct one — the matching logic is not part of this file.
    """

    FULL_MATCH = 0
    IGNORE_CASE = 1
    ROUND = 2

In [19]:
def create_text_task(
    step: Step,
    students: list[User],
    time_limit: tuple[datetime, datetime],
) -> None:
    """Attach a text-answer task to ``step`` and simulate student attempts.

    The task expects either a single fake word (criterion FULL_MATCH or
    IGNORE_CASE) or an integer in 0..100 (criterion ROUND). Each eligible
    student either skips the task or makes at least one attempt, and every
    attempt stores the submitted answer as a ``UserInput`` row.

    Args:
        step: Persisted step the task belongs to.
        students: Users studying the course.
        time_limit: ``(start, end)`` submission window; a bound may be ``None``.
    """
    # Keep the enum member for branching and store only its ``.value``;
    # comparing the stored integer with an enum member is always False.
    answer_type = random.choice([
        TextTaskAnswerType.TEXT,
        TextTaskAnswerType.NUMERIC,
    ])
    is_text = answer_type is TextTaskAnswerType.TEXT
    if is_text:
        criterion = random.choice([
            TextTaskCriterion.FULL_MATCH,
            TextTaskCriterion.IGNORE_CASE,
        ])
        # A single word; joining its characters would yield "w o r d".
        correct_answer = fake.word()
    else:
        criterion = TextTaskCriterion.ROUND
        correct_answer = fake.random_int(0, 100)

    task = TextTask(
        answer_type=answer_type.value,
        criterion=criterion.value,
        correct_answer=correct_answer,
        step_id=step.id,
    )
    task = create(task)

    for student in students:
        # Students who registered after the window closed cannot have attempts.
        if time_limit[1] is not None and student.registration_date >= time_limit[1].date():
            continue
        if random.random() < cfg.text_task.skip_proba:
            continue
        num_attempts = random.randint(1, step.max_attempts or cfg.step.max_attempts)
        for _ in range(num_attempts):
            if random.random() < cfg.text_task.full_match_proba:
                answer = correct_answer
            elif is_text:
                answer = fake.word()
            else:
                answer = fake.random_int(0, 100)
            # Compare textual forms so the result does not depend on how the
            # answer happens to be typed (int vs. str).
            # NOTE(review): scores here are 0.0/1.0 while test and sorting
            # tasks scale by step.max_score — confirm the intended scale.
            score = float(str(answer) == str(correct_answer))
            # The attempt belongs to the student being simulated, not to
            # whatever module-level ``user`` happens to be bound.
            attempt = create_attempt(step, student, score, time_limit)
            user_input = UserInput(
                content=str(answer),
                attempt_id=attempt.id,
            )
            create(user_input)

In [20]:
class ReviewType(Enum):
    """Who reviews open-ended answers; the integer value is what gets stored."""

    # Reviews are written by one of the course teachers.
    TEACHER = 0
    # Reviews are written by students of the course.
    CROSS = 1

In [21]:
def create_open_ended_task(
    step: Step,
    students: list[User],
    teachers: list[User],
    time_limit: tuple[datetime, datetime],
) -> None:
    """Attach an open-ended task to ``step`` and simulate attempts and reviews.

    Each eligible student who does not skip the task submits one answer. The
    answer receives either all required reviews, in which case its score is
    the mean review score, or fewer, in which case the score stays ``None``.

    Args:
        step: Persisted step the task belongs to.
        students: Users studying the course.
        teachers: Users teaching the course; reviewers for TEACHER tasks.
        time_limit: ``(start, end)`` submission window; a bound may be ``None``.
    """
    # Keep the enum member for branching and store only its ``.value``;
    # comparing the stored integer with an enum member is always False.
    review_type = random.choice([ReviewType.TEACHER, ReviewType.CROSS])
    by_teacher = review_type is ReviewType.TEACHER
    task = OpenEndedTask(
        review_type=review_type.value,
        num_reviews=random.randint(1, cfg.open_ended_task.max_reviews),
        step_id=step.id,
    )
    task = create(task)

    # A teacher-reviewed task has no possible reviewers without teachers,
    # so no attempts are generated for it.
    if by_teacher and not teachers:
        return

    for student in students:
        # Students who registered after the window closed cannot have attempts.
        if time_limit[1] is not None and student.registration_date >= time_limit[1].date():
            continue
        if random.random() < cfg.open_ended_task.skip_proba:
            continue

        num_reviews = (
            task.num_reviews
            if random.random() < cfg.open_ended_task.all_reviews_proba
            else random.randint(0, task.num_reviews - 1)
        )
        review_scores = [random.random() for _ in range(num_reviews)]
        # The attempt is scored only once every required review is in.
        score = np.mean(review_scores) if num_reviews == task.num_reviews else None

        # The attempt belongs to the student being simulated, not to
        # whatever module-level ``user`` happens to be bound.
        attempt = create_attempt(step, student, score, time_limit)
        user_input = UserInput(
            content=' '.join(fake.words(cfg.text_length.long)),
            attempt_id=attempt.id,
        )
        create(user_input)

        # NOTE(review): a cross reviewer is drawn from all students, so the
        # author may be picked to review their own answer — confirm this is OK.
        for review_score in review_scores:
            review = Review(
                score=review_score,
                submission_time=fake.date_time_between(
                    attempt.submission_time,
                    min_dt(time_limit[1], cfg.dt.today),
                ),
                user_id=(
                    random.choice(teachers).id if by_teacher
                    else random.choice(students).id
                ),
                attempt_id=attempt.id,
            )
            create(review)

In [None]:
# Generate every course together with its sections, steps, tasks and attempts.
for _ in range(cfg.course.count):
    create_course(users)