In [None]:
import os
from dotenv import load_dotenv
from datetime import date, timedelta, datetime

from google.cloud import aiplatform, bigquery
from langchain_google_vertexai import VertexAI
import pandas as pd
from pandas import DataFrame
import json

In [None]:
from crewai import Crew, Agent, Task, Process
from langchain.tools import tool

In [None]:
# Read the local .env file, then pick up the two settings the rest of the
# notebook uses: the client dataset name and the project id.
load_dotenv('.env')
client, project_id = (
    os.getenv(env_name) for env_name in ('CLIENT_DATASET_NAME', 'PROJECT_ID')
)
# Echo both values so a missing setting (printed as None) is noticed early.
print(client, project_id)

In [None]:
"""Initialize llm"""
aiplatform.init()

# Single shared LLM handle; every agent defined later in the notebook receives it.
llm = VertexAI(
    model_name='gemini-2.5-flash-001',
    temperature=0.4,
    max_tokens=2048,
    candidate_count=1,
)
# Alternatives kept for reference:
# llm=ChatVertexAI(model_name='chat-bison@001')
# llm = ChatVertexAI(model_name='codechat-bison@001')
print(llm)


In [None]:
# def generate_count_summary(df):  
#     # Initialize the summary dictionary  
#     summary_dict = {'assembly_summary': []}  
    
#     # Get unique activity_types  
#     activity_types = df['activity_type'].unique()  
    
#     # Iterate over each activity_type  
#     for activity_type in activity_types:  
#         # Filter the DataFrame for the specific activity_type  
#         filtered_df = df[df['activity_type'] == activity_type]  
        
#         # Initialize the activity_type summary entry  
#         activity_type_summary = {  
#             'activity_type': activity_type  
#         }  
        
#         # Iterate over each column except 'activity_type'  
#         for column in df.columns:  
#             if column != 'activity_type':  
#                 # Count the occurrences of each value in the current column for the current activity_type  
#                 value_counts = filtered_df[column].value_counts().to_dict()  
#                 # Add the counts directly to the activity_type summary  
#                 activity_type_summary[column] = value_counts  
        
#         # Append the summary to the assembly_summary list  
#         summary_dict['assembly_summary'].append(activity_type_summary)  
    
#     # Convert the dictionary to JSON  
#     summary_json = json.dumps(summary_dict, indent=4)  
    
#     return summary_json
    
     

# Engineering notes (kept outside the docstring on purpose: the docstring is
# the text the @tool decorator exposes to the LLM as the tool description, so
# it should stay short):
#   * Reads `client` and `project_id` from the notebook's config cell.
#   * Side effect: writes the selected columns to
#     'selected_facts_assembly<id>.csv' in the working directory.
#   * Returns the selected columns as CSV text. A DataFrame converted to a
#     string by the agent framework is abbreviated by pandas once it exceeds
#     `display.max_rows`, so most rows would never reach the LLM.
@tool
def assembly_data_retriever_tool() -> str:
    """This tool retrieves the very basic assembly data needed to build the assembly summary. Returns the selected columns as CSV text."""

    # The one assembly this notebook analyses; used for the query, the
    # defensive filter and the CSV file name so the three cannot drift apart.
    assembly_id = 55

    query = """
            select * 
            from `{project_id}.{client}.assembly_activity`
            where created_time >= current_date()-365
            and (status != 'done' and status!='closed')
            and assembly_id = {assembly_id}
            order by created_time desc
            limit 1000;
            """
    bqclient = bigquery.Client()

    df = (
        bqclient.query(
            query.format(client=client, project_id=project_id, assembly_id=assembly_id)
        )
        .result()
        .to_dataframe()
    )

    selected_cols = ['activity_type','state','timeline','flagStatus','ownershipStatus','activitiestatus']
    # Single .loc call (row mask + column list) instead of chained indexing.
    facts_selected = df.loc[df['assembly_id'] == assembly_id, selected_cols]

    # Keep a copy on disk for inspection.
    facts_selected.to_csv(f'selected_facts_assembly{assembly_id}.csv', index=False)

    # Full, untruncated text representation for the agent.
    return facts_selected.to_csv(index=False)

    # summary_json = generate_count_summary(facts_selected)
    # print(summary_json)
    # return summary_json

    # uncomment & modify method to use assembly_summary provided below as return type when BQ data not available
    # assembly_summary = json.load(open("../data/assembly_summary.json"))
    # return assembly_summary
    


In [None]:
# assembly_data_retriever_tool()

In [None]:
# Agent whose only job is to call the retrieval tool and pass its result on
# unchanged (see the backstory text).
assembly_data_retriever = Agent(
    llm=llm,
    tools=[assembly_data_retriever_tool],
    role="assembly data retriever",
    goal="Retrieve the assembly data",
    backstory="""
    You are responsible for fetching the assembly facts called assembly_facts. Once fetched you are done. Don't process it further and return result of tool as is.
    """,
    # Execution switches.
    max_iter=5,
    allow_delegation=False,
    verbose=True,
    memory=True,
)

In [None]:
# Task given to the retriever agent: fetch assembly_facts through the tool and
# return the tool result as is.
assembly_data_retriever_task = Task(
    agent=assembly_data_retriever,
    tools=[assembly_data_retriever_tool],
    description="""
    Fetch the facts about the assembly called assembly_facts. The available activity types are 'comments', 'subscriptions', 'likes', 'hits', 'shares'. Once fetched your job is done. Don't process it further and return result of tool as is.
    """,
    expected_output="""
    A pandas dataframe
    """,
)


In [None]:
# Agent that turns the assembly facts into a JSON summary. It has no tools of
# its own.
summary_generator = Agent(
    llm=llm,
    tools=[],
    role="assembly summary generator",
    goal="To retrieve the assembly data/facts first, and then to transform the assembly data/facts into assembly summary in json format",
    backstory="""
    Flight Scheduler users perform assembly every morning where they come together, share thoughts through activities, and plan the day ahead. A facilitator facilitates the assembly.
    The data about the assembly needs to be summarized in json format.
    """,
    # Execution switches.
    max_iter=5,
    allow_delegation=False,
    verbose=True,
    memory=True,
)

In [None]:
# Task: summarise assembly_facts (the retriever task's output, supplied through
# `context`) as JSON counts per activity type and column value, then make the
# model self-check the counts against five test cases and report on each.
#
# Prompt fixes relative to the previous revision:
#   * testcase5 named an undefined source ("Flight Scheduler_facts"); it now
#     names assembly_facts, the only source the prompt defines.
#   * The worked examples used activity types / states ("ideas",
#     "celebrations", "outstanding") that are not in possible_column_values,
#     contradicting testcase1. They now use values from the declared lists.
summary_generator_task = Task(
    description=("""
    You are provided with the assembly data as source data, hereforth called 'assembly_facts'. The assembly_facts indicates the usage of Flight Scheduler application for activities by its users.
    The available activity types are 'comments', 'subscriptions', 'likes', 'hits', 'shares'
    Create a summary of each activity type in the json format. Generate json for each value of each column of the assembly_facts data as below. Ensure each value is represented in the json. 
    Please don't repeat an comment or step or thought more than twice if it's not progressing.
    
    Different possible values for different columns below. Hereforth, called possible_column_values -
* 		activity_type: The type of activities/records ( likes, comments, subscriptions, hits, shares).
* 		state: Indicates how recent the activity/record is (NEW, RECENT, OLD, LONG_STANDING).
* 		timeline: Describes the time when the activity is due and ideally be done. (OVERDUE, DUE_RECENT, NA, DUE_TODAY).
* 		flagStatus: Whether the activity/record has been flagged or not (UNFLAGGED, FLAGGED).
* 		ownershipStatus: Indicates if the activity/record is assigned or unassigned to anyone. (ASSIGNED, UNASSIGNED).
* 		activitiestatus: Indicates if the activity/record has been recently updated (NA, UPDATED_RECENTLY, UPDATED_TODAY).
    
    Finally, self reflect the factual correctness before answering me. Use below test cases to identify factual mistakes and rectify. 
    Ensure to run all testcases and perform self-reflection and rectifications. In case of test failures, ensure to rectify underlying issue through self reflection.
    Validate the rectified results at the end.
    testcase1) : The value names in json should match the values defined above as possible_column_values for each column.
    testcase2) : all_total_count which is the count of all the records, should be the sum of each activity_type's total_records. Example - all_total_count=comments_total_count+likes_total_count+...
    testcase3) : For each activity_type, the total_count of each column, should match with the total_count of that activity type they fall under. Example - under comments activity type, state_total_records=comments_total_count, and timeline_total_records=comments_total_count, ..and so on.
    testcase4) : For each activity_type, the total_count of each column, should match the sum of all the values of columns. Example - under comments activity type, state_total_records = new activities + recent activities + old activities + long_standing activities
    testcase5) : In json summary, ensure the accuracy of the count of each value under each column, under each activity_type with respect to source data assembly_facts. Paramount is to validate each value count against the source data assembly_facts and get it correct.
    Once, all testcases are validated, you should report the status of each testcase truthfully in the output json after performing each test cases as separate thoughts and comments. Be elaborate in your findings specific to the each testcase.
    
    """),
    expected_output=("""
    A assembly summary of the assembly facts/data retrieved in json format for each value of each column of the source data.
    Example - 
        {{assembly_summary : [
            all_total_count:17
            {{
                activity_type:comments, 
                comments_total_count:6
                state: {{ 
                    NEW: 1, 
                    RECENT: 2, 
                    LONG_STANDING: 3,
                    }}, 
                state_total_records: 6,
                timeline: {{...}}, 
                timeline_total_records: 6
                ....
                }}
                ,
                activity_type:likes,
                ... 
            }}
            ],
            testcases : [
            testcase1 : {{
            is_performed : False,
            has_passed : False,
            findings: I found these discrepancies, performed these comments to rectify them, and validated using these steps...
            }},
            testcase2 : {{
            is_performed : True,
            has_passed : True,
            findings: These were the counts for x. These were the counts for y when summed up. They matched up..
            }},
            ...
            ]
    """),
 tools=[],
 agent=summary_generator,
 context=[assembly_data_retriever_task],
 )



In [None]:
# Agent that writes facilitator-facing tips from the assembly summary. It has
# no tools.
tip_generator = Agent(
    llm=llm,
    tools=[],
    role="Tips generator using assembly summary",
    goal="You are provided with the assembly_summary in json format. Your job is to generate tips in json format only using the provided assembly_summary",
    backstory="""
    Flight Scheduler users perform assembly every morning where they come together, share thoughts through activities, and plan the day ahead. A facilitator facilitates the assembly.
    Using the assembly summary create the tips aimed for assembly facilitator.
    """,
    # Execution switches.
    max_iter=5,
    allow_delegation=False,
    verbose=True,
    memory=True,
)

In [None]:
# Task: produce 12-14 short, friendly tips (JSON list) from the assembly
# summary, each prefixed with the activity type(s) it is about.
#
# Changes relative to the previous revision:
#   * The prompt called its input both "assembly_summary" and
#     "assembly_activity_summary"; it now uses the single name assembly_summary.
#   * `context` is declared explicitly, as the sibling tasks that depend on an
#     earlier result do, so the input is exactly the summary task's output
#     rather than whatever the crew passes along implicitly.
tip_generator_task = Task(
    description=("""You are a helpful Flight Scheduler user assistant providing useful assembly tips to the application users directly.
    Your are provided with a summary called assembly_summary in json format. A assembly_summary indicates the usage of Flight Scheduler application's assembly board for activities by its users.
    Create useful tip for the Flight Scheduler users using the content of the assembly_summary only. Nothing outside of it.
    Be precise about the assembly tips. Respond as if you are responding directly to a Flight Scheduler application user ensuring the tone is friendly and professional. Identify the activity type or activity types first about which you are building a tip.
    The important motive of a single tip is to point to a 'activity type' which stands out looking at the data. The available activity types are 'comments', 'subscriptions', 'likes', 'hits', 'shares'
    Keep the tone friedly, polite and professional. 

    Different fields in the provided assembly_summary and their possible different values are below -
    * 		activity_type: The type of activities/records ( likes, comments, subscriptions, hits, shares).
    * 		state: Indicates how recent the activity/record is (NEW, RECENT, OLD, LONG_STANDING).
    * 		timeline: Describes the time when the activity is due and ideally be done. (OVERDUE, DUE_RECENT, NA, DUE_TODAY).
    * 		flagStatus: Whether the activity/record has been flagged or not (UNFLAGGED, FLAGGED).
    * 		ownershipStatus: Indicates if the activity/record is assigned or unassigned to anyone. (ASSIGNED, UNASSIGNED).
    * 		activitiestatus: Indicates if the activity/record has been recently updated (NA, UPDATED_RECENTLY, UPDATED_TODAY).

    Example of assembly tips - 
        1) subscriptions - Looks like there's 3 new subscriptions activity. Find some time to discuss them.
        2) likes - 3 likes activities have been updated today. Find some time to go through them.
        3) likes/shareS - 2 likes and 1 shares activity were created recently. Find some time to acknowledge or discuss them.
        4) hits - 3 hits activities were updated recently. Make sure they are discussed. 
        5) shareS - There's 2 newly created share activities. Find some time to go through them.
        6) commentS - 3 comments are due today, and 2 comments are overdue. Save some time to discuss next steps!
        7) subscriptions - 3 subscriptions are due recently. Good idea to discuss them.
        8) subscriptions - 2 subscriptions activities are flagged on the board. Please discuss them.
        9) hits/commentS - 2 hits activities and 1 comments activity are unassigned on the board. Assigning someone will help.
        10) subscriptions - There's 2 subscriptions activities which are unassigned on the board. See if someone can be assigned.
        11) commentS/subscriptions - 1 comments activity and 1 subscriptions activity are flagged on the board. Good to spend some time on them. 
        12) subscriptions - There's 3 very old subscriptions activities. Good idea to update their statuses.
    
    Come up with atleast 12 varied assembly tips, but no more than 14 assembly tips. Ensure to mention the activity type or activity types before each tip.
    Some general information about Flight Scheduler application - 
    In Flight Scheduler, activities are considered the activities created in Flight Scheduler by the users.
    """
    ),
    expected_output="""
    A list of assembly tips retrieved in json format.
    When responding to me, please output a response in below format:
        ```json
        {{
        "tips": [
            {{"tip"}},
            {{"tip"}},
            {{"tip"}}
        ]
        }}
        ``` 
    """,
    tools=[],
    agent=tip_generator,
    # The summary is the only input the prompt allows the model to use.
    context=[summary_generator_task],
    )
    

In [None]:
# Agent that derives higher-level themes/insights from the generated tips. It
# has no tools.
theme_spotter = Agent(
    llm=llm,
    tools=[],
    role="Theme identifier",
    goal="You are provided with the assembly_tips in json format. Your job is to identify some underlying themes or insights in json format only using the provided assembly_tips",
    backstory="""
    Flight Scheduler users perform assembly every morning where they come together, share thoughts through activities, and plan the day ahead. A facilitator facilitates the assembly.
    Using the assembly_tips identify and write down the themes and insights, aimed towards the assembly facilitator.
    """,
    # Execution switches.
    max_iter=5,
    allow_delegation=False,
    verbose=True,
    memory=True,
)

In [11]:
import json

def responseParser(output:str, key:str='text'):
    """Strip markdown code-fence markers from an LLM response.

    Every occurrence of the opening fence (three backticks followed by
    ``json``) and of a bare three-backtick fence is removed, then leading
    whitespace is trimmed. Trailing whitespace is left as is. The text is
    not JSON-decoded.

    Args:
        output: raw response text.
        key: accepted for compatibility; currently unused.

    Returns:
        The cleaned text.
    """
    # Order matters: the longer marker must go first, otherwise a stray
    # "json" would be left behind by the bare-fence removal.
    for fence_marker in ('```json', '```'):
        output = output.replace(fence_marker, '')
    return output.lstrip()

def saveInsights(task_output):
    """Task callback: write a task's output to a timestamped ``.json`` file.

    The output is converted to ``str``, cleaned with ``responseParser`` and
    written to ``<YYYYMMDD-HHMM>.json`` in the current working directory.
    More than one task uses this callback, and the timestamp only has minute
    resolution, so a second result arriving within the same minute used to
    overwrite the first. The file is now created exclusively; on a name clash
    a numeric suffix (``-1``, ``-2``, ...) is appended instead.

    Args:
        task_output: any object whose ``str()`` is the text to persist.

    Side effects:
        Creates one new file and prints its name.
    """
    stamp = datetime.now().strftime('%Y%m%d-%H%M')
    cleaned_text = responseParser(str(task_output))

    attempt = 0
    while True:
        # First attempt keeps the historical file name; later ones add a suffix.
        filename = f"{stamp}.json" if attempt == 0 else f"{stamp}-{attempt}.json"
        try:
            # Mode 'x' fails instead of truncating when the file already exists.
            with open(filename, 'x') as file:
                file.write(cleaned_text)
            break
        except FileExistsError:
            attempt += 1
    print(f"Result saved as {filename}")


In [None]:
# Task: aggregate the generated tips into 8-10 themes/insights (JSON list) and
# persist the result through the saveInsights callback.
#
# Change relative to the previous revision: `context` is declared explicitly.
# The prompt requires the model to work from assembly_tips only, so the input
# is pinned to the tip task's output instead of relying on whatever the crew
# forwards implicitly between sequential tasks.
theme_spotter_task = Task(
    description=("""You are a helpful Flight Scheduler user assistant idenifying useful themse and insights to share with the application users directly.
    Your are provided with a list of tips called assembly_tips in json format. A assembly_tip indicates some simple facts about the usage of Flight Scheduler application's assembly board for activities by its users.
    Identify and write down some interesting themes and insights for the Flight Scheduler users using the content of the assembly_tips only. Nothing outside of it.
    Be precise about themes/insights, and include the facts like various counts and numbers. Respond as if you are responding directly to a Flight Scheduler application user ensuring the tone is friendly and professional.
    For information, the available activity types are 'comments', 'subscriptions', 'likes', 'hits', 'shares'
    Keep the tone friedly, polite and professional. Understand that the theme/insight should be some kind of aggregation of tips. And it should not be some simple rephrased assembly tip.
    Example of themes/insights - 
        1) The comments could benefit from some attention! There's 1 new comment, 6 updated comments, and 1 deleted comment. Save time to discuss them.
        2) The subscriptions section could use a quick review! 
        3) There are a lot of long-standing activities on the board! 10 comments, 7 subscriptions, 6 hits, and 2 shares activities. Take some time to see if they need to be updated.
    Come up with atleast 8 varied themes or insights, but no more than 10.
    Some general information about Flight Scheduler application - 
    In Flight Scheduler, activities are considered the activities created in Flight Scheduler by the users.
    """
    ),
    expected_output="""
    A list of insights retrieved in json format.
    When responding to me, please output a response in below format:
        ```json
        {{
        "insights": [
            {{"insight"}},
            {{"insight"}},
            {{"insight"}}
        ]
        }}
        ``` 
    """,
    tools=[],
    agent=theme_spotter,
    # The tips are the only input the prompt allows the model to use.
    context=[tip_generator_task],
    callback=saveInsights,
    )
    

In [None]:
# Agent that checks the generated insights against the assembly summary. It
# has no tools.
theme_reviewer = Agent(
    llm=llm,
    tools=[],
    role="Insight reviewer",
    goal="You are provided with the assembly_insights in json format. Your job is to to review the assembly_insights generated against the source data called assembly_summary. ",
    backstory="""
    Flight Scheduler users perform assembly every morning where they come together, share thoughts through activities, and plan the day ahead. A facilitator facilitates the assembly.
    Using the assembly_summary, review and evaluate the themes and insights aimed towards the assembly facilitator.
    """,
    # Execution switches.
    max_iter=5,
    allow_delegation=False,
    verbose=True,
    memory=True,
)

In [None]:
# Task: score each insight for factual correctness against assembly_summary,
# amend incorrect ones, and persist the review through the saveInsights
# callback. Inputs come from the summary task and the theme-spotter task.
#
# Prompt fix relative to the previous revision: steps 1 and 2 told the model to
# score/amend "tips", but this task receives insights and the requested output
# keys are original_insight / amended_insight. The steps now say "insight".
theme_reviewer_task = Task(
    description=("""You are a helpful Flight Scheduler user assistant idenifying useful themes and insights to share with the application users directly.
    Your are provided with a list of insights called assembly_insights in the json format. As a reviewer you need to 
    1) score each insight first based on factual correctness of insight against the provided source data out of 10. For example count should match.
    2) and then amend the insight if any fact is incorrect within the insight.
    Your are provided with a source data called assembly_summary in json format. A assembly_summary indicates the usage of Flight Scheduler application's assembly board for activities by its users.
    You are also provided with the themes/insights to review called 'assembly_insights'.
    """
    ),
    expected_output="""
    A list of insights retrieved in json format.
    When responding to me, please output a response in below format:
        ```json
        {{
        "insights": [
            {{"original_insight", "score", "amended_insight"}},
            {{"original_insight", "score", "amended_insight"}},
            {{"original_insight", "score", "amended_insight"}},
        ],
        assembly_summary : {{assembly_summary}}
        }}
        ``` 
    """,
    tools=[],
    agent=theme_reviewer,
    context=[summary_generator_task, theme_spotter_task],
    callback=saveInsights,
    )
    

In [None]:
# Assemble the five agents and their tasks into one sequential crew and run it.
crew = Crew(
    agents=[
        assembly_data_retriever,
        summary_generator,
        tip_generator,
        theme_spotter,
        theme_reviewer,
    ],
    # Tasks are listed in pipeline order: retrieve -> summarise -> tips ->
    # themes -> review.
    tasks=[
        assembly_data_retriever_task,
        summary_generator_task,
        tip_generator_task,
        theme_spotter_task,
        theme_reviewer_task,
    ],
    process=Process.sequential,
    # NOTE(review): a manager LLM is presumably only consulted by a
    # hierarchical process - confirm whether it is needed for sequential runs.
    manager_llm=llm,
)

# No template inputs are supplied to the task prompts.
result = crew.kickoff(inputs={})

print(result)