In [386]:
import numpy as np
from database_connect import client # gets MongoDB client, which gives access to data

In [0]:
# constants
# default collection used
main_collection = "Section0"

In [197]:
# examples for visualizing jsons
# format of 'all_updates':
all_updates_example1 = {
    "overall_avg": 4.6,
    "communication": {
        "score": 4,
        "related_mistakes": ["could have been more elaboration and clarity in his explanation."]
    },
    "interpretation": {
        "score": 5,
        "related_mistakes": []
    },
    "computation": {
        "score": 5,
        "related_mistakes": []
    },
    "conceptual": {
        "score": 4,
        "related_mistakes": ["explanation could have included more conceptual details to further enhance his understanding"]
    },
    "time": {
        "score": 5,
        "seconds": 20
    }
}

all_updates_example_all_5 = {
    "overall_avg": 5,
    "communication": {
        "score": 5,
        "related_mistakes": ["could have been more elaboration and clarity in his explanation."]
    },
    "interpretation": {
        "score": 5,
        "related_mistakes": []
    },
    "computation": {
        "score": 5,
        "related_mistakes": []
    },
    "conceptual": {
        "score": 5,
        "related_mistakes": ["explanation could have included more conceptual details to further enhance his understanding"]
    },
    "time": {
        "score": 5,
        "seconds": 20
    }
}


In [4]:
# recursively create JSON
def obj_to_dict(obj):
    # recursive calls run depending on if the call's obj is a list, dictionary, object, or primitive ( int, string, float, etc)
    if isinstance(obj, list): # if list
        return [obj_to_dict(e) for e in obj]
    elif isinstance(obj, dict): # if dictionary or object
        return {key: obj_to_dict(value) for key, value in obj.items()}
    elif hasattr(obj, '__dict__'): # if object
        return {key: obj_to_dict(value) for key, value in obj.__dict__.items()}
    else: # if primitive
        return obj

# converts dictionary into student object
def dict_to_student(dict_data):
    # Initialize student Object
    student = Student(dict_data['name'], dict_data['grade'])

    # Initialize and fill subtopics
    for subtopic_data in dict_data['subtopics']:
        subtopic = Subtopic(subtopic_data['subtopic_name'], subtopic_data['grade'])
        subtopic.num_questions_answered = subtopic_data['num_questions_answered']

        metrics_data = subtopic_data['metrics']
        metrics = Metrics()

        for metric_name, metric_data in metrics_data.items():
            metric = Metric()
            metric.avg_score = metric_data['avg_score']
            metric.previous_scores = metric_data['previous_scores']

            if 'avg_time' in metric_data:
                metric.avg_time = metric_data['avg_time']
            if 'recent_times' in metric_data:
                metric.recent_times = metric_data['recent_times']
            if 'related_mistakes' in metric_data:
                metric.related_mistakes = metric_data['related_mistakes']

            setattr(metrics, metric_name, metric)

        subtopic.metrics = metrics
        student.subtopics.append(subtopic)

    return student

In [379]:
class StudentsCollection:
    def __init__(self, collection):
        database = client["Students"]
        self.name = collection
        self.collection = database[collection] # a collection ( ex: Section0)

    def add_student(self,student):
        # make student into dictionary format
        student_dict = student.in_dict_format()
        self.collection.insert_one(student_dict)
        print(f"{student.name} has been added to collection: {self.name}")

    # takes in student's name
    # returns wanted student from database or None (student is not in database)
    def get_student(self, student_name):
        query = {"name": student_name}
        # Fetch the student data as a list
        student_dict_list = list(self.collection.find(query))
        try:
            if not student_dict_list:  # Checks if the list is empty
                raise ValueError("No student found with the given name.")
            # Get the first student_dict from the list (assuming there should only be one)
            student_dict = student_dict_list[0]
            # Do something with student_dict
            student_obj = dict_to_student(student_dict)
            print("Student found:", student_obj.name)
            return student_obj

        except ValueError as e:
            print(e)

    def delete_student(self,student):
        query = {"name": student.name}
        self.collection.delete_one(query)
        print(f"{student.name} has been deleted from collection: {self.name}")

    # gets list of all students in collection
    def current_student_names(self):
        student_names = []
        student_data = list(self.collection.find())

        for datam in student_data:

            student_names.append(datam["name"])
        return student_names

class Student:
    def __init__(self,name, grade: float):
        self.name = name
        self.grade = grade
        self.subtopics = [] # array of Subtopic objects

    def in_dict_format(self):
        return obj_to_dict(self)
    
    def get_subtopic_level(self):
        pass

class Subtopic:
    def __init__(self, subtopic_name, level):
        # how many questions you answered for each of the 5 levels of a topic
        self.subtopic_name = subtopic_name
        self.level = level # range: 1 - 5
        self.num_questions_answered = [0,0,0,0,0] # on slot per level (5 total levels)
        # self.num_questions_answered = [5,5,5,5,5]
        self.metrics = Metrics()

    # resets metrics
    def clear_metrics(self):
        self.metrics = Metrics()

# TODO need to be tested
    # updates the level if the metrics have a perfect score for the last 5 questions
    def update_level(self):
        #check teh amount of questions answered for the current level
        ques_answered = self.num_questions_answered
        num_questions_current_level = ques_answered[self.level-1] # -1 b/c of 0 based indexing
        # if a student has been asked 5 questions, AND the average score for the metrics is 5...... increment the level
        average_score_current_level = self.metrics.overall_avg.avg_score
        # print(num_questions_current_level )
        if num_questions_current_level == 5 and average_score_current_level == 5:
            self.level += 1
            print(f"Subtopic '{self.subtopic_name}' has been upgraded to level {self.level}")
        else:
            print("level has not been updated")

    # metric_updates: string or json or dictionary: updates that need to be done for metrics 
    def update_subtopic(self, all_updates):
        # find level
        # print("current level ", self.level)
        level = self.level
        # update the metrics
        self.metrics.update(all_updates)
        # update the amount of quesitons
        # add 1 to the index that corresponds to the level ( "index -1" because the array is 0 based)
        self.num_questions_answered[level-1] +=1
        # update the level if needed
        self.update_level()

    # returns and prints subtopic data in a json
    def to_json(self):
        subtopic_json = {
            "subtopic_name": self.subtopic_name,
            "level": self.level,
            "num_questions_answered": self.num_questions_answered,
            "metrics": self.metrics.to_json()  # Convert metrics to JSON
        }

        # subtopic_json = json.dumps(subtopic_json, indent=1)
        print(subtopic_json)
        return subtopic_json  # Return the JSON string with indentation

In [380]:


class Metrics:

    def __init__(self):
        self.overall_avg = Metric("overall_avg")
        self.communication = Metric()
        self.interpretation = Metric()
        self.computation = Metric()
        self.conceptual = Metric()
        self.time = Metric("time")

    # returns dictionary of mistakes
    # key = metric type, value: the actual mistakes
    def get_all_mistakes(self):
        mistakes = {}
        print("function starts")
        mistakes["communication"] = self.communication.related_mistakes
        mistakes["interpretation"] = self.interpretation.related_mistakes
        mistakes["computation"] = self.computation.related_mistakes
        mistakes["conceptual"] = self.conceptual.related_mistakes
        return str(mistakes) # convert dict to string

    # update metric given update metrics in json/dict
    def update(self, all_updates):
        # attributes each metric has
        # if the json does not have a value for an update, the value in the Metric object will remain unchanged
        avg_comm = self.communication.update(all_updates["communication"])
        avg_interp = self.interpretation.update(all_updates["interpretation"])
        avg_comp = self.computation.update(all_updates["computation"])
        avg_conc = self.conceptual.update(all_updates["conceptual"])
        avg_time = self.time.update(all_updates["time"])
        # update overall_avg
        # get average score of the newly calculated metrics
        new_average = np.mean([avg_comm,avg_interp,avg_comp,avg_conc,avg_time])
        # print(f"\n calculating overall avg")

        prev_scores = self.overall_avg.previous_scores
        prev_scores.append(new_average)
        if len(prev_scores) == 6:
            prev_scores.pop(0)

        # us the average of average scores to get the new "overall avg" score
        self.overall_avg.avg_score = np.mean(prev_scores)

        # print("score avg", self.overall_avg.avg_score)
        # print("previous scores",self.overall_avg.previous_scores)
    def to_json(self):
        metrics_json = {
            "overall_avg": self.overall_avg.to_json(),
            "communication": self.communication.to_json(),
            "interpretation": self.interpretation.to_json(),
            "computation": self.computation.to_json(),
            "conceptual": self.conceptual.to_json(),
            "time": self.time.to_json()
        }
        return metrics_json


In [381]:

class Metric:
    # special types: time, overall_avg

    # order of recent_times and related mistakes (oldest..... newest)
    def __init__(self, metric_type = None):
        self.avg_score = 0
        self.previous_scores = []
        if metric_type == "time":
            self.avg_time = None
            self.recent_times = []
        elif metric_type != "overall_avg" and metric_type == "time" :
            self.related_mistakes = []

    # updates metrics given json of new data
    # NOT USED to update overall_avg ( can only be updated in "Metrics" object
    # update: JSON
    # returns average score for metric
    def update(self, update):
        # get metrics previous scores
        prev_scores = self.previous_scores
        # add new score,
        prev_scores.append(update["score"])
        # remove oldest score
        if len(prev_scores) == 6:
            prev_scores.pop(0)
        # get the average
        self.avg_score = np.mean(prev_scores)
        # print("score avg", self.avg_score)
        # print("previous scores",self.previous_scores)

        # replace related mistakes
        if hasattr(self, 'related_mistakes'):
            self.related_mistakes = update["related_mistakes"]
            # print("related mistakes",self.related_mistakes)

        # if there is  time attribute, update the time data:
        if hasattr(self, 'recent_times'):
            recent_times = self.recent_times
            recent_times.append(update["seconds"])
            if len(recent_times) == 6:
                recent_times.pop(0)
            self.avg_time = np.mean(recent_times)
            # print("time average",self.avg_time)
            # print("recent times",self.recent_times)

        return self.avg_score

    def to_json(self):
        metric_json = {
            "avg_score": self.avg_score,
            "previous_scores": self.previous_scores
        }
        if hasattr(self, "avg_time"):
            metric_json["avg_time"] = self.avg_time
            metric_json["recent_times"] = self.recent_times
        elif hasattr(self, "related_mistakes"):
            metric_json["related_mistakes"] = self.related_mistakes

        return metric_json

In [382]:
# testing
sub1 = Subtopic(" basic addition", 1)
sub1.metrics = Metrics()
sub1.update_subtopic(all_updates_example_all_5)



level has not been updated


In [385]:
# testing to_json
a = sub1.to_json()
a

{'subtopic_name': ' basic addition', 'level': 1, 'num_questions_answered': [1, 0, 0, 0, 0], 'metrics': {'overall_avg': {'avg_score': 5.0, 'previous_scores': [5.0]}, 'communication': {'avg_score': 5.0, 'previous_scores': [5]}, 'interpretation': {'avg_score': 5.0, 'previous_scores': [5]}, 'computation': {'avg_score': 5.0, 'previous_scores': [5]}, 'conceptual': {'avg_score': 5.0, 'previous_scores': [5]}, 'time': {'avg_score': 5.0, 'previous_scores': [5], 'avg_time': 20.0, 'recent_times': [20]}}}


{'subtopic_name': ' basic addition',
 'level': 1,
 'num_questions_answered': [1, 0, 0, 0, 0],
 'metrics': {'overall_avg': {'avg_score': 5.0, 'previous_scores': [5.0]},
  'communication': {'avg_score': 5.0, 'previous_scores': [5]},
  'interpretation': {'avg_score': 5.0, 'previous_scores': [5]},
  'computation': {'avg_score': 5.0, 'previous_scores': [5]},
  'conceptual': {'avg_score': 5.0, 'previous_scores': [5]},
  'time': {'avg_score': 5.0,
   'previous_scores': [5],
   'avg_time': 20.0,
   'recent_times': [20]}}}

In [None]:
#TODO methods that have been redone

# checks to see if the student has done well enough at a subtopic to move up a level
def is_level_update_needed(overall_avg_stats):
    bool = False
    avg_score, recent_scores =  overall_avg_stats["avg_score"],overall_avg_stats["recent_scores"]
    # if the avg score is 5, and we have 3 scores that make up the average, we need a level update
    if avg_score == 5 and len(recent_scores) == 3:
        bool = True
    return bool
def update_student_stats(name, sub_topic, metric_updates):
    # Get data from students.json
    data = get_ext_data(student_data_path)

    # Check if the student is in the database
    students = data["students"]
    for student in students:
        if name in student:
            # Check if the student has a section for the given subtopic
            sections = student[name]
            for section in sections:
                if section["sub_topic"] == sub_topic:
                    # Update the student's metrics
                    section["proficiency_metrics"], section["level"],section["questions_answered"], = update_data(section,metric_updates)
                    print(f"{name}'s data metrics for '{sub_topic} 'has been updated ")
                    break
            else:
                # update stats with cleared metric data
                new_section = create_sub_topic_section(sub_topic,metric_updates)
                # If the student does not have data for that subtopic, add metric_updates
                sections.append(new_section)
                print(f"{name}'s data metrics for '{sub_topic}' has been added ")

            break
    else:
        # If the student is not found in the database, create a new entry
        # update stats with cleared metric data
        new_section = create_sub_topic_section(sub_topic,metric_updates)
        students.append({
            name: [new_section]})
        print(f"{name}'s data metrics for '{sub_topic}'has been added ")
    # Write the updated data back to students.json
    post_ext_data(data, student_data_path)



def get_student_subtopic_level(student, sub_topic):
    # Read the JSON file
    data = get_ext_data(student_data_path)

    # if we don't find the student, or the subtopic in the database, we will use the lowest level by default
    default_level = 1

    # Access the student's data from the database
    student_data = None
    for student_entry in data["students"]:
        if student in student_entry:
            student_data = student_entry[student]
            break

    if student_data is not None:
        # Find the sub-topic information for the student
        sub_topic_data = None
        for entry in student_data:
            if entry["sub_topic"] == sub_topic:
                sub_topic_data = entry
                break

    # Check if student exists in the database
    if student_data is None:
        print("student is not in database. They will be start at Level 1, Proficiency 1")
        return default_level
    # Check if sub-topic exists for the student
    if sub_topic_data is None:
        print(f"student has no data for '{sub_topic}' in this database. They will be start at Level 1.")
        return default_level
    else:
        # Retrieve the level and proficiency scores
        level = sub_topic_data["level"]

    # Return the level and proficiency scores
    return level

def get_all_student_related_mistakes(student, sub_topic):
    # Assuming you have the data dictionary defined
    data = get_ext_data(student_data_path)
    # Find the student in the data dictionary
    student_data = None
    for student_entry in data['students']:
        if student in student_entry:
            student_data = student_entry[student]
            break

    if student_data is None:
        return "Student not found"

    # Find the sub-topic in the student's data
    sub_topic_data = None
    for topic in student_data:
        if topic['sub_topic'] == sub_topic:
            sub_topic_data = topic
            break

    if sub_topic_data is None:
        return "Sub-topic not found for this student"

    metrics = sub_topic_data['proficiency_metrics']
    # Retrieve the related mistakes from the sub-topic data
    related_mistakes = get_all_metric_mistakes(metrics)
    # return metrics
    print(f"{student}'s previous mistakes with the sub_topic : {sub_topic}: {related_mistakes}\n")
    return related_mistakes

# temporary helper functions
def manual_level_reset(name, sub_topic, level):
    # Get data from students.json
    data = get_ext_data(student_data_path)

    # Check if the student is in the database
    students = data['students']
    for student in students:
        if name in student:
            # Check if the student has a section for the given subtopic
            sections = student[name]
            for section in sections:
                if section['sub_topic'] == sub_topic:
                    # Reset the student's metrics
                    section['proficiency_metrics'] = clear_metrics()
                    section['level'] =  level
                    section['questions_answered'] = [0, 0, 0, 0, 0]
                    print(f"{name}'s data metrics for '{sub_topic}' has been reset.")
                    break
    print("level changed, database stats reset")
    # Write the updated data back to students.json
    post_ext_data(data, student_data_path)

def get_specific_metric_mistakes(specific_metric,metrics_data):
    related_mistakes = metrics_data[specific_metric]['related_mistakes']

    return related_mistakes
def get_all_metric_mistakes(metrics_data): # TODO
    metric_types = ["communication","interpretation", "computation","conceptual"]

    related_mistakes = []
    for metric_type in metric_types:
        # print(metric_type)
        specific_mistakes = get_specific_metric_mistakes(metric_type, metrics_data)
        related_mistakes.append(specific_mistakes)

    related_mistakes_str = f"{related_mistakes}"
    return related_mistakes_str

# creates subtopic section with metric updates
def create_sub_topic_section(sub_topic, metric_updates):
    # make a new section
    new_section = {
        "sub_topic": sub_topic,
        "level": 1,
        "questions_answered": [0,0,0,0,0],
        "proficiency_metrics": clear_metrics()
    }
    # update the section with the metric updates
    new_section["proficiency_metrics"],new_section["level"],new_section["questions_answered"] = update_data(new_section,metric_updates)

    return new_section

# deletes a subtopic's metric data for user
def clear_metrics():
    metrics = {
        "overall_avg": {
            "avg_score": 0,
            "recent_scores": [
            ]
        },
        "communication": {
            "avg_score": 0,
            "related_mistakes": [
            ],
            "recent_scores": [
            ]
        },
        "interpretation": {
            "avg_score": 0,
            "related_mistakes": [],
            "recent_scores": []
        },
        "computation": {
            "avg_score": 5.0,
            "related_mistakes": [],
            "recent_scores": []
        },
        "conceptual": {
            "avg_score": 0,
            "related_mistakes": [],
            "recent_scores": []
        },
        "time": {
            "avg_score": 0,
            "avg_times": None,
            "recent_times": [
            ],
            "recent_scores": [
            ]
        }}
    return metrics



# helper functions for update_student_stats
# returns updated single metric array with the average value of the array
# recent_single_metric_score
# new_score: a score for one metric
def update_scores_and_average(recent_single_metric_scores, new_score):

    # the database stores the 3 most recent scores, so we will have to add our new_score and get rid of the old one

    # add new score
    # print(recent_single_metric_score)
    recent_single_metric_scores += [new_score]
    # remove the oldest score if there are more than 3 numbers in the list
    if len(recent_single_metric_scores) > 3:
        recent_single_metric_score = recent_single_metric_scores.pop(0)
        # recent_single_metric_score = recent_single_metric_scores[1:]
        # print(recent_single_metric_score)

    # get the new avg score and round to the last 2 decimals
    new_avg_score = np.mean(recent_single_metric_scores)
    new_avg_score = round(new_avg_score, 2)
    return recent_single_metric_scores,new_avg_score



# updata_data takes in a subtopic's data and metric_updates that need to be implemented
# returns the updated data, specifically the metrics, the level and the questions the student has answered
def update_data(data,metrics_updates):

    metrics, level,questions_answered  =  data["proficiency_metrics"], data["level"],data["questions_answered"]

    # update metrics
    metrics = update_metrics(metrics,metrics_updates)
    # update level and questions_answered

    # each index of the array corresponds to the amount of questions answered a a certain level of difficulty
    questions_answered[level-1] += 1

    # if we have to upgrade to the next level, we get rid of our previous level's stats
    if is_level_update_needed(metrics["overall_avg"]):
        # print(level)
        if level >= 5:
            topic = data["sub_topic"]
            print(f"Congratulations, you have mastered the topic: {topic} the highest level available. Please pick another topic to learn. ")
            # THe database will not reset and you will just keep all of your scores
        else:
            level += 1
            print(f"Congratulations, you have moved up to Level {level}")
            metrics = clear_metrics() # remove previous level's data


    return metrics,level,questions_answered

# updates all of the metrics for any subtopic
# takes in metrics and the updated from a recent evaluation
# returns updated metrics
def update_metrics(metrics, metric_updates):
    # print(f"\n\n metrics:{metrics} \n\n")
    # print(f"\n\n metrics updates:{metric_updates} \n\n")
    metric_types = ['overall_avg', 'communication', 'interpretation', 'computation', 'conceptual', 'time']

    # Iterate and update each metric
    for metric_type in metric_types:
        metric, metric_update = metrics[metric_type], metric_updates[metric_type]

        if metric_type == "overall_avg":
            new_score = metric_update

        else:
            # print(metric_update)
            new_score = metric_update["avg_score"]
            if 'related_mistakes' in metric_update:
                metric['related_mistakes'] = metric_update['related_mistakes']
            if 'seconds' in metric_update:
                new_time = metric_update['seconds']
                metric['recent_times'], metric['avg_times'] = update_scores_and_average(metric['recent_times'], new_time)

        # print(f"\n{metric}\n")
        # print(metric_type)
        metric['recent_scores'], metric['avg_score'] = update_scores_and_average(metric['recent_scores'], new_score)

    return metrics
# updates the student metrics in the database


# updates the student metrics in the database
def update_student_stats(name, sub_topic, metric_updates):
    # Get data from students.json
    data = get_ext_data(student_data_path)

    # Check if the student is in the database
    students = data["students"]
    for student in students:
        if name in student:
            # Check if the student has a section for the given subtopic
            sections = student[name]
            for section in sections:
                if section["sub_topic"] == sub_topic:
                    # Update the student's metrics
                    section["proficiency_metrics"], section["level"],section["questions_answered"], = update_data(section,metric_updates)
                    print(f"{name}'s data metrics for '{sub_topic} 'has been updated ")
                    break
            else:
                # update stats with cleared metric data
                new_section = create_sub_topic_section(sub_topic,metric_updates)
                # If the student does not have data for that subtopic, add metric_updates
                sections.append(new_section)
                print(f"{name}'s data metrics for '{sub_topic}' has been added ")

            break
    else:
        # If the student is not found in the database, create a new entry
        # update stats with cleared metric data
        new_section = create_sub_topic_section(sub_topic,metric_updates)
        students.append({
            name: [new_section]})
        print(f"{name}'s data metrics for '{sub_topic}'has been added ")
    # Write the updated data back to students.json
    post_ext_data(data, student_data_path)