In [None]:
import sys
sys.path.append('../ppda')

import pandas as pd
from datetime import datetime
import re

from db_service import DBService
import config
import os  # only needed for the credential lookup in this cell

print("Connecting to ",config.BASE_URL)
conn = DBService()
# Credentials may be supplied through the environment so they do not have to be
# committed with the notebook; the previous hard-coded values remain the defaults.
conn.login(
    os.environ.get('SIS_ADMIN_USER', 'administrator'),
    os.environ.get('SIS_ADMIN_PASSWORD', 'admin'),
)

In [None]:
from sis_school_year import SISSchoolYear

from sis_person import SISPerson
from sis_student import SISStudent
from sis_school_class import SISSchoolClass
from sis_school_grade_level import SISSchoolGradeLevel
from sis_family import SISFamily
from sis_family_guardian import SISFamilyGuardian
from sis_family_child import SISFamilyChild

from sis_staff import SISStaff
from sis_timetable import SISTimetable
from sis_timetable_column import SISTimetableColumn
from sis_timetable_day import SISTimetableDay
from sis_timetable_column_row import SISTimetableColumnRow

from sis_course import SISCourse
from sis_course_class import SISCourseClass
from sis_attendance_log_course_class import SISAttendanceLogCourseClass
from sis_attendance_log_school_class import SISAttendanceLogSchoolClass
from sis_class_feed import SISClassFeed
from pp_user import PPUser

def delete_all(cls):
    """Delete every record of the given document wrapper class, page by page.

    Args:
        cls: Wrapper class exposing ``doctype``, ``find(limit_page_length=...)``
            (returning a table with a ``name`` column) and ``delete_by_id(id)``.

    Records are fetched 500 at a time and deleted one by one until ``find``
    returns nothing. For the 'SIS Person' doctype a failed deletion is skipped
    (NOTE(review): presumably because the person is still referenced elsewhere
    - confirm); for every other doctype the error propagates to the caller.

    If a whole page yields no successful deletion the loop stops, because the
    next ``find`` would return the same undeletable records forever.
    """
    print('Deleting all', cls.doctype)
    while True:
        data = cls.find(limit_page_length=500)
        if len(data) == 0:
            break
        deleted = 0
        skipped = 0
        for record_id in data['name'].values:
            if cls.doctype == 'SIS Person':
                try:
                    cls.delete_by_id(record_id)
                except Exception:
                    # Best effort for persons only; keep going with the next record.
                    skipped += 1
                    continue
            else:
                cls.delete_by_id(record_id)
            deleted += 1
        print('Deleted', deleted)
        if deleted == 0:
            # No progress on this page: every remaining record failed to delete.
            print('Could not delete', skipped, cls.doctype, 'record(s); stopping')
            break
    print('-'*20)

def reset_db():
    """Wipe the imported SIS data by running delete_all on each doctype in turn.

    Order: attendance logs, class feed, course classes, courses, school
    classes, families, students, staff, and persons last.
    NOTE(review): presumably dependents are removed before the records they
    reference - confirm before reordering.
    """
    deletion_order = (
        SISAttendanceLogSchoolClass,
        SISAttendanceLogCourseClass,
        SISClassFeed,
        SISCourseClass,
        SISCourse,
        SISSchoolClass,
        SISFamily,
        SISStudent,
        SISStaff,
        SISPerson,
    )
    for doc_cls in deletion_order:
        delete_all(doc_cls)

# reset_db()

In [None]:
# Look up the school year whose status is "Current"; its id is attached to the
# events, school classes and timetable created further down.
current_school_year = SISSchoolYear.find_one(filters={"status": "Current"})
CUR_SCHOOL_YEAR_ID = current_school_year.name

### Academic Year Event

In [None]:
from sis_academic_year_event import SISAcademicYearEvent

# Read the academic year events sheet and save one event per row for the current school year
aca_event_df = pd.read_excel('../input_data/SIS Academic Year Events.xlsx', engine="openpyxl")
aca_event_df.head()

date_format = "%Y-%m-%d"
for index, row in aca_event_df.iterrows():
    # A missing description is stored as an empty string
    description = "" if pd.isna(row['Description']) else row['Description']
    event_fields = {
        "start_date": datetime.strftime(row['Start Date'], date_format),
        "end_date": datetime.strftime(row['End Date'], date_format),
        "school_year": CUR_SCHOOL_YEAR_ID,
        "description": description,
        "title": row['Title'],
    }
    event = SISAcademicYearEvent(event_fields)
    # An event is looked up by title within the current school year
    event.save_if_not_exists(filters={"title": row['Title'], "school_year": CUR_SCHOOL_YEAR_ID})

### Student, Guardian, School Class, Family

In [None]:
# Load data student, parent, school class
import pandas as pd

df = pd.read_excel('../input_data/FINALIZED_ADMISSION_2324_DATA.xlsx', engine="openpyxl")
# Trim surrounding whitespace from the three full-name columns
for name_column in ('std_full_name', 'mother_full_name', 'father_full_name'):
    df[name_column] = df[name_column].str.strip()
# Class titles are handled as text from here on
df['std_2324_grade'] = df['std_2324_grade'].astype(str)

In [None]:
# List the column names of the admission sheet
df.columns

In [None]:
# Rows whose student name is a single word; non-string names count as 0 words and are not listed
name_word_counts = df['std_full_name'].apply(lambda x: len(x.split(' ')) if type(x) == str else 0)
df[name_word_counts == 1]

In [None]:
# Summarize column dtypes and non-null counts of the admission sheet
df.info()

In [None]:
# Distinct grade numbers parsed from the class titles: the text before the first '.',
# then its second space-separated word.
# NOTE(review): assumes every title has the shape "<word> <grade>.<class>"; a value
# without a space before the first '.' raises IndexError here - confirm the data is clean.
df['std_2324_grade'].apply(lambda x: x.split('.')[0].split(' ')[1]).unique()

In [None]:
# Validate data
# wssg_std_code must be unique: the import loop uses it as the only filter
# when checking whether an SIS Student already exists
assert df['wssg_std_code'].is_unique, "wssg_std_code is not unique"

# print duplicate std_full_name
# duplicate_std_full_name = df[df.duplicated('std_full_name')]
# if not duplicate_std_full_name.empty:
#     print("Duplicate std_full_name")
#     print(duplicate_std_full_name)

In [None]:
# Delete all family
# Reuses the delete_all helper defined earlier in this notebook instead of a single
# page capped at 3000 records, so no family is left behind on larger datasets and
# the builtin `id` is no longer shadowed at notebook level.
delete_all(SISFamily)

In [None]:


def split_name(full_name):
    """Split a full name written last-name-first into ``(first_name, last_name)``.

    The first whitespace-separated word is taken as the last name and the
    remainder as the first name. Any run of whitespace (spaces, tabs,
    non-breaking spaces) counts as a separator, and leading/trailing
    whitespace is ignored.

    Args:
        full_name: The full name; anything that is not a string is rejected.

    Returns:
        ``(first_name, last_name)``, or ``(None, None)`` after printing an
        error when the value is not a string or has fewer than two words.
    """
    if isinstance(full_name, str):
        # split(None, 1) drops leading whitespace and splits on the first whitespace run
        parts = full_name.split(None, 1)
        if len(parts) == 2:
            last_name, first_name = parts
            return first_name.strip(), last_name
    print(f"ERROR split_name: {full_name}")
    return None, None



# Make sure grade levels 1..12 exist; grade_levels[i] holds the object for grade i + 1
grade_levels = []
for grade in range(1, 13):
    grade_title = f"Grade {grade}"
    grade_level = SISSchoolGradeLevel({
        "title": grade_title,
        "short_title": f"G{grade}",
        "sequence_number": grade
    })
    # Looked up by title before saving
    grade_level.save_if_not_exists(filters={"title": grade_title})
    grade_levels.append(grade_level)

# Import the admission sheet row by row:
# student person -> student -> school class + enrollment -> guardians -> family.
# The loop stops at the first row whose school class cannot be imported.
for index, row in df.iterrows():
    print("Executing row ", index)
    # Create SIS Person for student, if not exists
    first_name, last_name = split_name(row['std_full_name'])
    std_dob = datetime.strftime(row["std_dob"], "%Y-%m-%d")
    std_person = SISPerson({
        "first_name": first_name,
        "last_name": last_name,
        "email": None,
        "phone_number": None,
        "gender": row["std_gender"],
        "date_of_birth": std_dob,
        "primary_role": "Student",
    })
    std_person_exists = std_person.save_if_not_exists(filters={
        "first_name": first_name,
        "last_name": last_name,
        "date_of_birth": std_dob,
    })

    # Create SIS Student with person id
    sis_student = SISStudent({
        "person": std_person.name,
        "wellspring_student_code": row["wssg_std_code"],
    })
    sis_student.save_if_not_exists(filters={
        "wellspring_student_code": row["wssg_std_code"]
    })

    # Create SIS School Class
    class_title = row['std_2324_grade']
    # Reset per row so the error report below never reads an unbound name
    # or a stale value left over from a previous row.
    grade_level_id = None
    class_short_title = None
    try:
        class_short_title = class_title.split(' ')[1]
        grade_level_index = class_title.split('.')[0].split(' ')[1]
        grade_number = int(grade_level_index)
        # Without this check grade 0 would silently index grade_levels[-1]
        if not 1 <= grade_number <= len(grade_levels):
            raise ValueError(f"grade {grade_number} is outside 1..{len(grade_levels)}")
        grade_level_id = grade_levels[grade_number - 1].name
        sis_school_class = SISSchoolClass({
            "school_year": CUR_SCHOOL_YEAR_ID,
            "school_grade_level": grade_level_id,
            "title": class_title,
            "short_title": class_short_title,
        })
        sis_school_class_exists = sis_school_class.save_if_not_exists(filters={
            "title": class_title,
            "school_year": CUR_SCHOOL_YEAR_ID,
        })
        if sis_school_class_exists:
            # Reload the stored class so its existing participants are available below
            sis_school_class = SISSchoolClass.find_by_id(sis_school_class_exists)
    except Exception as e:
        print("ERROR importing School Class:")
        print({
            "school_year": CUR_SCHOOL_YEAR_ID,
            "school_grade_level": grade_level_id,
            "title": class_title,
            "short_title": class_short_title,
        })
        print(repr(e))
        break

    # Enroll student to school class
    if not hasattr(sis_school_class, "participants"):
        setattr(sis_school_class, "participants", [])
        print("------------Create participants")
    std_already_enrolled = False
    for participant in sis_school_class.participants:
        if participant["person"] == std_person.name:
            std_already_enrolled = True
            break
    if not std_already_enrolled:
        sis_school_class.participants.append({
            "person": std_person.name,
            "role": "Student",
        })
        sis_school_class.save()
        print(f"----------Enroll student to school class {sis_school_class.title} - {std_person.last_name} {std_person.first_name}")

    # Create SIS Person for mother, if not exists
    has_mother = pd.notna(row['mother_full_name'])
    if has_mother:
        mother_first_name, mother_last_name = split_name(row['mother_full_name'])
        mother_person = SISPerson({
            "first_name": mother_first_name,
            "last_name": mother_last_name,
            "email": row["mother_email"] if pd.notna(row["mother_email"]) else None,
            "phone_number": row["mother_mobile_nr_1"] if pd.notna(row["mother_mobile_nr_1"]) else None,
            "gender": "Female",
            "date_of_birth": None,
            "primary_role": "Guardian",
        })

        mother_exists = mother_person.save_if_not_exists(filters={
            "first_name": mother_first_name,
            "last_name": mother_last_name,
            "phone_number": row["mother_mobile_nr_1"] if pd.notna(row["mother_mobile_nr_1"]) else None,
        })

    # Create SIS Person for father, if not exists
    has_father = pd.notna(row['father_full_name'])
    if has_father:
        father_first_name, father_last_name = split_name(row['father_full_name'])
        father_person = SISPerson({
            "first_name": father_first_name,
            "last_name": father_last_name,
            "email": row["father_email"] if pd.notna(row["father_email"]) else None,
            "phone_number": row["father_mobile_nr_1"] if pd.notna(row["father_mobile_nr_1"]) else None,
            "gender": "Male",
            "date_of_birth": None,
            "primary_role": "Guardian",
        })
        father_exists = father_person.save_if_not_exists(filters={
            "first_name": father_first_name,
            "last_name": father_last_name,
            "phone_number": row["father_mobile_nr_1"] if pd.notna(row["father_mobile_nr_1"]) else None,
        })

    # Find the family of this student among the families registered at the same address
    families = SISFamily.find(filters={"home_address": row["address"]}, output="Object")
    if len(families) > 1:
        print("Address Duplicate")

    family_found = False
    for family in families:
        sis_family = SISFamily.find_by_id(family.name)
        # mother_exists / father_exists are only read when the matching has_* flag is
        # set for this row, so values left over from earlier rows are never used.
        # NOTE(review): they are the return values of save_if_not_exists and are passed
        # here as person ids - presumably only guardians that already existed before
        # this row can match; confirm against save_if_not_exists.
        if (has_mother and sis_family.guardian_exists(mother_exists)) or (has_father and sis_family.guardian_exists(father_exists)):
            if not sis_family.child_exists(std_person.name):
                sis_family.children.append({"person": std_person.name})
                sis_family.save()
                print("add student to family", sis_family.home_address)
            family_found = True
            break

    if not family_found:
        # No family at this address shares a guardian with the student: create a new one
        guardians = []
        if has_mother:
            guardians.append({"person": mother_person.name, "relationship_with_student": "Mother"})
        if has_father:
            guardians.append({"person": father_person.name, "relationship_with_student": "Father"})
        sis_family = SISFamily({
            "home_address": row["address"],
            "children": [{"person": std_person.name}],
            "guardians": guardians
        })
        sis_family.save()


    # if len(families) > 1 and not family_found:
    #     for family in families:
    #         sis_family = SISFamily.find_by_id(family.name)
    #         print({
    #             "name": sis_family.name,
    #             "home_address": sis_family.home_address,
    #             "children": [SISPerson.find_by_id(child['person']).full_name for child in sis_family.children],
    #             "guardians": [SISPerson.find_by_id(guardian['person']).full_name for guardian in sis_family.guardians]
    #         })
    #         print("-------------------------------------------------")
    #     print(mother_person.to_json(), std_person.to_json())
    #     break


    # if family_id:
    #     sis_family = SISFamily.find_by_id(family_id)
    #     if (has_mother and ( not sis_family.guardian_exists(mother_person.name)) or (has_father and (not sis_family.guardian_exists(father_person.name)))):
    #         # Case 1: different family with the same address
    #         guardians = []
    #         if has_mother:
    #             guardians.append({"person": mother_person.name, "relationship_with_student": "Mother"})
    #         if has_father:
    #             guardians.append({"person": father_person.name, "relationship_with_student": "Father"})
            
    #         # create a hash for family address
    #         family_hash = abs(hash(row["address"])) % (10 ** 8)

    #         sis_family = SISFamily({
    #             "home_address": row["address"] + f" ({family_hash})",
    #             "children": [{"person": std_person.name}],
    #             "guardians": guardians
    #         })
    #         sis_family.save()
    #         print("--------------Family with same address", row["address"])
    #         break
    #     else:
    #         # Case 2: same family -> add student to children list, add mother and father to guardians list if not exists
    #         if has_mother and (not sis_family.guardian_exists(mother_person.name)):
    #             sis_family.guardians.append({"person": mother_person.name, "relationship_with_student": "Mother"})
    #         if has_father and (not sis_family.guardian_exists(father_person.name)):
    #             sis_family.guardians.append({"person": father_person.name, "relationship_with_student": "Father"})
    #         if not sis_family.child_exists(std_person.name):
    #             sis_family.children.append({"person": std_person.name})
    #         sis_family.save()
        
    # else:
        # guardians = []
        # if has_mother:
        #     guardians.append({"person": mother_person.name, "relationship_with_student": "Mother"})
        # if has_father:
        #     guardians.append({"person": father_person.name, "relationship_with_student": "Father"})
        # sis_family = SISFamily({
        #     "home_address": row["address"],
        #     "children": [{"person": std_person.name}],
        #     "guardians": guardians
        # })
        # sis_family.save()

In [None]:
# Scratch check of the 8-digit hash used for addresses in the commented-out family code.
# NOTE: Python salts str hashes per interpreter process unless PYTHONHASHSEED is fixed,
# so this value is not stable across kernel restarts.
abs(hash('123')) % (10 ** 8)

### Course, Course Class, Course Class Person, Timetable

```
Subject Columns:  ['id', 'title', 'short_title']
School Class Columns:  ['id', 'title', 'short_title']
Course Class Columns:  ['grade_level', 'school_class', 'subject', 'course', 'course_class', 'teacher', 'email', 'wssg_id']
Timetable Columns:  ['Thứ', 'Thời gian', 'Tiết', 'Lớp 1.1\n (Ms.Dương)', 'Lớp 1.2\n (Ms. Trâm)', 'Lớp 1.3\n (Ms. Nguyệt)', 'Lớp 1.4\n (Ms. Duyên)', 'Lớp 1.5\n (Ms. Tuyền)', 'Lớp 1.6\n (Ms. Trang)', 'Lớp 1.7\n (Ms. Hà)']
```

TODO:
- [x] Get current school year id
- From timetable_id extract course_class and timetable_day_row_class:
    - [ ] Validation: subject must be in subject_df if the third column is a number
    - [ ] If the third column is not a number, it's a special course class (break, lunch, etc.)
    - [ ] Course Class Title = subject + grade_level + school_class
    - [ ] Get timetable_day_row_class (periods) from timetable_day_row_class
- For each row in course_class_df:
    - [ ] Create SIS Course, if not exists
    - [ ] Create SIS Course Class
    - [ ] Create SIS Course Class Person for teachers
    - [ ] Get student list from SIS School Class and add to SIS Course Class
    - [ ] Find timetable_day_row_class for each course_class
- [ ] Get student from all grade levels and add to special course classes

In [None]:
import unicodedata

# Functions to normalize data
def normalize_unicode(data, form='NFC'):
    """Return ``data`` in the given Unicode normalization form.

    Args:
        data: A single cell value.
        form: Normalization form passed to ``unicodedata.normalize``.

    Returns:
        The normalized string for string input, ``None`` for a missing value
        (None/NaN/NaT), and any other value unchanged.
    """
    if isinstance(data, str):
        return unicodedata.normalize(form, data)
    if pd.api.types.is_scalar(data) and pd.isna(data):
        return None
    # Numbers, dates, ... carry no text to normalize; pass them through untouched.
    return data


def _clean_cell(value):
    """Strip surrounding whitespace from a string cell, then normalize its Unicode form."""
    if isinstance(value, str):
        value = value.strip()
    return normalize_unicode(value)


def normalize_df(df):
    """Strip and Unicode-normalize every text cell of ``df`` in place.

    Only object/string-typed columns are touched. Cells are cleaned one by one
    so that non-string values in a mixed column (e.g. numeric period labels next
    to text labels) are preserved; a column-wide ``.str.strip()`` would replace
    them with NaN. Missing values become ``None``.

    Args:
        df: The DataFrame to clean; it is modified in place.

    Returns:
        The same DataFrame object, for convenient reassignment.
    """
    for col in df.columns:
        col_dtype = df[col].dtype
        if pd.api.types.is_object_dtype(col_dtype) or pd.api.types.is_string_dtype(col_dtype):
            df[col] = df[col].apply(_clean_cell)
    return df

def extract_school_class_from_text(text):
    """
    Find the first school class code in a piece of text.

    A school class code has the form [Grade].[Class order], e.g. 1.1 or 6.10.
    The text may span several lines; line breaks are treated as spaces.

    Returns the first code found, or None when the text contains none.
    """
    single_line = ' '.join(text.split('\n'))
    found = re.search(r'\b\d+\.\d+\b', single_line)
    return found.group() if found else None

In [None]:
# Subjects and school classes are stripped / Unicode-normalized as they are loaded;
# the course class sheet is kept exactly as read.
subject_df = normalize_df(
    pd.read_excel('../input_data/SIS Subject Updated.xlsx', engine="openpyxl")
)

school_class_df = normalize_df(
    pd.read_excel('../input_data/SIS School Class.xlsx', engine="openpyxl")
)

course_class_df = pd.read_excel('../input_data/SIS Course Class Updated.xlsx', engine="openpyxl")

In [None]:
# Replaces the Excel-loaded school_class_df with the school classes returned by SIS
# (page length 1000); the timetable validation matches column headers against its
# short_title column.
school_class_df = SISSchoolClass.find(limit_page_length=1000)

In [None]:
# Extract data from timetable
# Step 1: Validation: subject must be in subject_df if the third column is a number

# Workbook being validated. Swap in '../input_data/TKB_2324_Tieuhoc.xlsx' to check
# the other timetable instead.
TIMETABLE_EXCEL_FILENAME = '../input_data/TIMETABLE_2324_Trunghoc.xlsx'

# Parse the workbook once: sheet_name=None returns {sheet name: DataFrame} for every
# sheet, so the sheets do not have to be read a second time inside the loop.
timetable_sheets = pd.read_excel(TIMETABLE_EXCEL_FILENAME, sheet_name=None, engine="openpyxl")
sheet_names = timetable_sheets.keys()

not_found_subject = []  # subjects missing from subject_df (each reported once)
found_subject = []      # every distinct subject value seen, whether known or not
for sheet_name, timetable_df in timetable_sheets.items():
    print("Processing sheet ", sheet_name)
    timetable_df = normalize_df(timetable_df)

    # Columns from the fourth onwards are expected to be named after a school class
    for column in timetable_df.columns[3:]:
        school_class_short_title = extract_school_class_from_text(column)
        school_class_found = school_class_df[school_class_df["short_title"] == school_class_short_title].shape[0]
        if not school_class_found:
            print("ERROR: Invalid school class", column)

    for index, row in timetable_df.iterrows():
        # if pd.notna(row.iloc[2]) and (row.iloc[2] in ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']):
        # NOTE(review): only the first class column (position 3) is checked here;
        # the remaining class columns are not validated - confirm this is intended.
        subject_title = row.iloc[3]
        subject_found = subject_df[subject_df["short_title"] == subject_title].shape[0]
        if subject_found == 0 and subject_title not in not_found_subject:
            print(f"Subject |{subject_title}| not found in subject_df")
            not_found_subject.append(subject_title)
        if subject_title not in found_subject:
            found_subject.append(subject_title)
        # TODO: Validate school class


In [None]:
# Create Timetable TieuHoc

# Create timetable if not exists
primary_timetable_fields = {
    "grade_level_list": "1,2,3,4,5",
    "school_year": CUR_SCHOOL_YEAR_ID,
    "short_title": "TKB_TH",
    "status": "Active",
    "title": "Primary Timetable",
}
timetable = SISTimetable(primary_timetable_fields)
timetable.save_if_not_exists(filters={"title": timetable.title})

# Extract timetable column
TIMETABLE_EXCEL_FILENAME = '../input_data/TKB_2324_Tieuhoc.xlsx'
timetable_df = pd.read_excel(TIMETABLE_EXCEL_FILENAME, sheet_name=0, engine="openpyxl")
# Keep only the rows whose first column is 'Ba' and take their first three columns
# as a raw array. NOTE(review): 'Ba' presumably selects one weekday as the template
# for the column layout - confirm.
is_ba_row = timetable_df.iloc[:, 0] == 'Ba'
timetable_df = timetable_df[is_ba_row]
timetable_column_dict = timetable_df.iloc[:, :3].values

In [None]:
# Row type for each named (non-numeric) period label
type_dict = {
    "Registration": "Other",
    "Break": "Break",
    "Lunch": "Lunch",
    "Snack": "Snack",
    "Lesson": "Lesson"
}

# Numeric labels 0..10 denote numbered lesson periods
_PERIOD_NUMBERS = list(range(11))


def _timetable_column_row(item):
    """Build one timetable_column_row entry from a raw timetable row.

    ``item[1]`` is a "start-end" time range and ``item[2]`` the period label.
    """
    label = item[2]
    if label in _PERIOD_NUMBERS:
        title = f'Period {label}'
        short_title = f'P{label}'
    else:
        title = label
        short_title = label

    if label in type_dict.keys():
        row_type = type_dict[label]
    elif label in _PERIOD_NUMBERS:
        row_type = "Lesson"
    else:
        row_type = "Other"

    time_range = item[1].split('-')
    return {
        "title": title,
        "short_title": short_title,
        "type": row_type,
        "time_start": time_range[0].strip(),
        "time_end": time_range[1].strip(),
    }


timetable_column = SISTimetableColumn({
    "title": "TT Primary Weekday",
    "short_title": "TT Primary",
    "timetable_column_row": [_timetable_column_row(item) for item in timetable_column_dict],
})
timetable_column.save_if_not_exists(filters={"title": timetable_column.title})

timetable_column.to_json()

In [None]:
# Inspect the raw rows (first three sheet columns) used to build the timetable column
timetable_column_dict