In [9]:
import os
print(os.getcwd())

/home/monarch/workspace/ssc-assistant/app/api


In [10]:
from azure.identity import DefaultAzureCredential
import dotenv

# from src.repository.conversation_repository import ConversationRepository
from src.repository.conversation_repository import ConversationRepository

dotenv.load_dotenv()

True

In [11]:
from datetime import datetime
from typing import override
import os

from azure.data.tables import TableServiceClient

import json
from src.dao.chat_table_dao import ChatTableDaoImpl
from src.entity.table_row_entity import (
    ChatTableRow,
)

# get dir of this file

dir_path = os.getcwd()


class DiskCachingChatTableDaoImpl(ChatTableDaoImpl):
    """
    This class is a ChatTableDaoImpl that caches the results of the `all` method to disk
    in a JSON file. This is useful for development purposes, as it allows us to avoid
    making repeated requests to the Azure Table Storage service.
    """

    def __init__(
        self,
        table_service_client: TableServiceClient,
        # but from dir of this file
        cache_filepath: str = os.path.join(
            dir_path, "./.cache/chat_table_cache.global.json"
        ),
    ):
        super().__init__(table_service_client)
        self.cache: list[ChatTableRow] = []
        self.cache_filepath = cache_filepath

    @override
    def all(self) -> list[ChatTableRow]:
        # try and get pickle
        if len(self.cache) == 0:
            try:
                # create path if not exist
                os.makedirs(os.path.dirname(self.cache_filepath), exist_ok=True)
                with open(self.cache_filepath, "r") as f:
                    json_content = json.load(f)
                    # convert all timestamps to datetime
                    for row in json_content:
                        timestamp_str = row["metadata"]["timestamp"]
                        if timestamp_str is not None:
                            row["metadata"]["timestamp"] = datetime.fromisoformat(
                                timestamp_str
                            )
                    self.cache = json_content
            except FileNotFoundError:
                self.cache = []

        # if cache is still empty, get from super
        if len(self.cache) == 0:
            self.cache = super().all()
            # also save to disk.
            with open(self.cache_filepath, "w") as f:
                # json.dump(self.cache, f)
                json.dump(
                    self.cache,
                    f,
                    default=lambda o: o.isoformat()
                    if isinstance(o, datetime)
                    else o.__dict__,
                )

        return self.cache


In [12]:

# Get conversations
from src.service.stats_report_service import StatsReportService


credential = DefaultAzureCredential()
table_service_client = TableServiceClient(endpoint=os.getenv("DATABASE_ENDPOINT") or "", credential=credential)
chat_table_dao = DiskCachingChatTableDaoImpl(table_service_client)
conversation_repo = ConversationRepository(chat_table_dao)
conversations = conversation_repo.list_conversations()
stats_reporting_service = StatsReportService(conversation_repo)

In [13]:
report = stats_reporting_service.get_statistics_by_month_of_year()
print(report)

headers = ["Month", "Active users", "Total Questions Asked", "Average questions asked per day", "Average questions per user"]
header_format = "{:<15} {:<15} {:<25} {:<35} {:<25}"
row_format = "{:<15} {:<15} {:<25} {:<35} {:<25}"

print(header_format.format(*headers))

for stats in report:
    month = stats["month_label"]
    print(row_format.format(
        month, 
        stats["active_users"],
        stats["total_questions_asked"],
        stats["average_questions_asked_per_day"],
        stats["average_questions_per_user"])
    )

[{'month_label': 'Jan 2025', 'month_start_iso_date': '2025-01-01T00:00:00Z', 'month_end_iso_date': '2025-01-16T23:59:59Z', 'active_users': 5, 'total_questions_asked': 118, 'average_questions_asked_per_day': 7.38, 'average_questions_per_user': 23.6}, {'month_label': 'Dec 2024', 'month_start_iso_date': '2024-12-01T00:00:00Z', 'month_end_iso_date': '2024-12-31T23:59:59Z', 'active_users': 3, 'total_questions_asked': 67, 'average_questions_asked_per_day': 2.16, 'average_questions_per_user': 22.33}, {'month_label': 'Nov 2024', 'month_start_iso_date': '2024-11-01T00:00:00Z', 'month_end_iso_date': '2024-11-30T23:59:59Z', 'active_users': 3, 'total_questions_asked': 91, 'average_questions_asked_per_day': 3.03, 'average_questions_per_user': 30.33}, {'month_label': 'Oct 2024', 'month_start_iso_date': '2024-10-01T00:00:00Z', 'month_end_iso_date': '2024-10-31T23:59:59Z', 'active_users': 2, 'total_questions_asked': 187, 'average_questions_asked_per_day': 6.03, 'average_questions_per_user': 93.5}, {'m

In [14]:
# Statistics by day of week
headers = ["Day of week", "Total Questions Asked", "Average questions asked per day", "Average questions per user"]
header_format = "{:<15} {:<25} {:<35} {:<25}"
row_format = "{:<15} {:<25} {:<35} {:<25}"
days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

active_users: set[str] = set()
for conversation in conversations:
    for message in conversation["messages"]:
        owner_id = message["owner_id"]
        if owner_id is not None:
            active_users.add(owner_id)

active_users_count = len(active_users)

print(header_format.format(*headers))

for day in days:
    # Total questions asked
    total_questions_asked = 0
    for conversation in conversations:
        for message in conversation["messages"]:
            if message["sender"] == "user" and datetime.fromisoformat(message["created_at"]).strftime("%A") == day:
                total_questions_asked += 1

    # Average questions asked per day
    total_questions_asked = 0
    for conversation in conversations:
        for message in conversation["messages"]:
            if message["sender"] == "user" and datetime.fromisoformat(message["created_at"]).strftime("%A") == day:
                total_questions_asked += 1

    # Average questions per user
    total_questions_asked = 0
    for conversation in conversations:
        for message in conversation["messages"]:
            if message["sender"] == "user" and datetime.fromisoformat(message["created_at"]).strftime("%A") == day:
                total_questions_asked += 1
    
    average_questions_per_day = total_questions_asked / len(date_ranges)
    average_questions_per_user = total_questions_asked / active_users_count if active_users_count > 0 else 0

    print(row_format.format(
        day,
        total_questions_asked,
        f"{average_questions_per_day:.2f}",
        f"{average_questions_per_user:.2f}"
    ))


Day of week     Total Questions Asked     Average questions asked per day     Average questions per user


NameError: name 'date_ranges' is not defined

In [None]:
# Statistics by day of month

headers = ["Day of month", "Total Questions Asked", "Average questions asked per day", "Average questions per user"]
header_format = "{:<15} {:<25} {:<35} {:<25}"
row_format = "{:<15} {:<25} {:<35} {:<25}"

print(header_format.format(*headers))

for day in range(1, 32):
    # Total questions asked
    total_questions_asked = 0
    for conversation in conversations:
        for message in conversation["messages"]:
            if message["sender"] == "user" and datetime.fromisoformat(message["created_at"]).day == day:
                total_questions_asked += 1

    # Average questions asked per day
    total_questions_asked = 0
    for conversation in conversations:
        for message in conversation["messages"]:
            if message["sender"] == "user" and datetime.fromisoformat(message["created_at"]).day == day:
                total_questions_asked += 1

    # Average questions per user
    total_questions_asked = 0
    for conversation in conversations:
        for message in conversation["messages"]:
            if message["sender"] == "user" and datetime.fromisoformat(message["created_at"]).day == day:
                total_questions_asked += 1
    
    average_questions_per_day = total_questions_asked / len(date_ranges)
    average_questions_per_user = total_questions_asked / active_users_count if active_users_count > 0 else 0

    print(row_format.format(
        day,
        total_questions_asked,
        f"{average_questions_per_day:.2f}",
        f"{average_questions_per_user:.2f}"
    ))