# STEP -1

## Import Libraries and Create connection

In [45]:
import os
import json
from datetime import timedelta, datetime
from bson import json_util
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, lit


In [46]:
load_dotenv(".env")
mongo_url = os.getenv("mongo_url")


In [24]:
spark = (
    SparkSession.builder
        .appName("MongoDBReadJoin")
        .master("local[*]")
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0")
        .getOrCreate()
)


# STEP - 2

## User pipeline

In [26]:
user_email = input("Enter user email: ").strip()
print(user_email)


nirmallya@klizos.com


In [27]:
pipeline_users = json.dumps([
    {"$match": {"email": user_email}},
    {"$project": {
        "_id": 1,
        "username": 1,
        "email": 1,
        "role": 1
    }}
])

pipeline_users


'[{"$match": {"email": "nirmallya@klizos.com"}}, {"$project": {"_id": 1, "username": 1, "email": 1, "role": 1}}]'

In [28]:
from pymongo import MongoClient
client = MongoClient("mongodb://54.216.237.221:27017", serverSelectionTimeoutMS=5000)
print(client.server_info())




In [29]:
df_users = (
    spark.read
        .format("mongodb")
        .option("connection.uri", mongo_url)
        .option("database", "monitoring")
        .option("collection", "users")
        .load()
        .filter(col("email") == user_email)   # <-- Spark filter
        .select("_id", "username", "email", "role")
)

user_data = df_users.limit(1).collect()
user_data


[Row(_id='6874c8bd378bbb7f8e9c07bd', username='nirmallya', email='nirmallya@klizos.com', role='employee')]

In [30]:
user_id = user_data[0]["_id"]
username = user_data[0]["username"]
role = user_data[0]["role"]

print(f"Found user: {username} ({role}), _id={user_id}")


Found user: nirmallya (employee), _id=6874c8bd378bbb7f8e9c07bd


## Agentic Activity pipeline

In [31]:
filter_date = input("Enter date (yyyy-MM-dd): ").strip()

start_date = datetime.strptime(filter_date, "%Y-%m-%d")
end_date = start_date + timedelta(days=1)

filter_date


'2025-08-20'

### Calculating Total Duration of the Date

In [34]:
pipeline_total_duration = [
    {
        "$match": {
            "userId": user_id,
            "timestamp": {
                "$gte": start_date,
                "$lt": end_date
            }
        }
    },
    {
        "$group": {
            "_id": {
                "date": {
                    "$dateToString": {
                        "format": "%Y-%m-%d",
                        "date": "$timestamp",
                        "timezone": "Asia/Kolkata"
                    }
                }
            },
            "min_timestamp": {"$min": "$timestamp"},
            "max_timestamp": {"$max": "$timestamp"}
        }
    },
    {
        "$project": {
            "date": "$_id.date",
            "min_timestamp": {
                "$dateToString": {
                    "format": "%Y-%m-%d %H:%M:%S",
                    "date": "$min_timestamp",
                    "timezone": "Asia/Kolkata"
                }
            },
            "max_timestamp": {
                "$dateToString": {
                    "format": "%Y-%m-%d %H:%M:%S",
                    "date": "$max_timestamp",
                    "timezone": "Asia/Kolkata"
                }
            }
        }
    }
]

pipeline_total_duration_json = json.dumps(pipeline_total_duration, default=json_util.default)

df_base = (
    spark.read
        .format("mongodb")
        .option("connection.uri", mongo_url)
        .option("database", "monitoring")
        .option("collection", "agentactivities")
        .option("aggregation.pipeline", pipeline_total_duration_json)
        .load()
)

# Now calculate durations in Spark
from pyspark.sql.functions import unix_timestamp, to_timestamp

df_final = (
    df_base
    .withColumn("username", lit(username))
    .withColumn("email", lit(user_email))
    .withColumn("role", lit(role))
    .withColumn("min_ts", to_timestamp("min_timestamp", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("max_ts", to_timestamp("max_timestamp", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("total_duration_minutes", (unix_timestamp("max_ts") - unix_timestamp("min_ts")) / 60)
    .withColumn("total_duration_hours", (unix_timestamp("max_ts") - unix_timestamp("min_ts")) / 3600)
    .select(
        "username", "email", "role",
        "date", "min_timestamp", "max_timestamp",
        "total_duration_minutes", "total_duration_hours"
    )
)

df_final.show(truncate=False)


+---------+--------------------+--------+----------+-------------------+-------------------+----------------------+--------------------+
|username |email               |role    |date      |min_timestamp      |max_timestamp      |total_duration_minutes|total_duration_hours|
+---------+--------------------+--------+----------+-------------------+-------------------+----------------------+--------------------+
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|2025-08-20 12:53:04|2025-08-20 22:32:04|579.0                 |9.65                |
+---------+--------------------+--------+----------+-------------------+-------------------+----------------------+--------------------+



## Calculate total duration for the 'type' column

In [35]:
# Aggregation pipeline to calculate min/max timestamps per user per type per day
pipeline_duration_by_type = [
    {
        "$match": {
            "userId": user_id,
            "timestamp": {
                "$gte": start_date,
                "$lt": end_date
            }
        }
    },
    {
        "$group": {
            "_id": {
                "date": {
                    "$dateToString": {
                        "format": "%Y-%m-%d",
                        "date": "$timestamp",
                        "timezone": "Asia/Kolkata"
                    }
                },
                "type": "$type"  # group by type
            },
            "min_timestamp": {"$min": "$timestamp"},
            "max_timestamp": {"$max": "$timestamp"}
        }
    },
    {
        "$project": {
            "date": "$_id.date",
            "type": "$_id.type",
            "min_timestamp": {
                "$dateToString": {
                    "format": "%Y-%m-%d %H:%M:%S",
                    "date": "$min_timestamp",
                    "timezone": "Asia/Kolkata"
                }
            },
            "max_timestamp": {
                "$dateToString": {
                    "format": "%Y-%m-%d %H:%M:%S",
                    "date": "$max_timestamp",
                    "timezone": "Asia/Kolkata"
                }
            }
        }
    }
]

pipeline_duration_by_type_json = json.dumps(pipeline_duration_by_type, default=json_util.default)

# Read from MongoDB with the updated pipeline
df_type = (
    spark.read
        .format("mongodb")
        .option("connection.uri", mongo_url)
        .option("database", "monitoring")
        .option("collection", "agentactivities")
        .option("aggregation.pipeline", pipeline_duration_by_type_json)
        .load()
)

# Calculate durations in Spark
from pyspark.sql.functions import round as spark_round

df_final_type = (
    df_type
    .withColumn("username", lit(username))
    .withColumn("email", lit(user_email))
    .withColumn("role", lit(role))
    .withColumn("min_ts", to_timestamp("min_timestamp", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("max_ts", to_timestamp("max_timestamp", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("total_duration_minutes", spark_round((unix_timestamp("max_ts") - unix_timestamp("min_ts")) / 60, 5))
    .withColumn("total_duration_hours", spark_round((unix_timestamp("max_ts") - unix_timestamp("min_ts")) / 3600, 5))
    .select(
        "username", "email", "role", "date", "type",
        "min_timestamp", "max_timestamp",
        "total_duration_minutes", "total_duration_hours"
    )
)

df_final_type.show(truncate=False)



+---------+--------------------+--------+----------+----------------+-------------------+-------------------+----------------------+--------------------+
|username |email               |role    |date      |type            |min_timestamp      |max_timestamp      |total_duration_minutes|total_duration_hours|
+---------+--------------------+--------+----------+----------------+-------------------+-------------------+----------------------+--------------------+
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|keystroke       |2025-08-20 12:53:04|2025-08-20 22:32:03|578.98333             |9.64972             |
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|window_focus    |2025-08-20 12:53:04|2025-08-20 22:31:59|578.91667             |9.64861             |
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|activity_summary|2025-08-20 12:53:26|2025-08-20 22:32:04|578.63333             |9.64389             |
+---------+--------------------+--------+----------+----------------+-------

## Calculate total duration for the 'application_name' column

In [36]:
# Aggregation pipeline to calculate min/max timestamps per user per application per day
pipeline_duration_by_app = [
    {
        "$match": {
            "userId": user_id,
            "timestamp": {
                "$gte": start_date,
                "$lt": end_date
            }
        }
    },
    {
        "$group": {
            "_id": {
                "date": {
                    "$dateToString": {
                        "format": "%Y-%m-%d",
                        "date": "$timestamp",
                        "timezone": "Asia/Kolkata"
                    }
                },
                "application_name": "$application.name"  # group by application.name
            },
            "min_timestamp": {"$min": "$timestamp"},
            "max_timestamp": {"$max": "$timestamp"}
        }
    },
    {
        "$project": {
            "date": "$_id.date",
            "application_name": "$_id.application_name",
            "min_timestamp": {
                "$dateToString": {
                    "format": "%Y-%m-%d %H:%M:%S",
                    "date": "$min_timestamp",
                    "timezone": "Asia/Kolkata"
                }
            },
            "max_timestamp": {
                "$dateToString": {
                    "format": "%Y-%m-%d %H:%M:%S",
                    "date": "$max_timestamp",
                    "timezone": "Asia/Kolkata"
                }
            }
        }
    }
]

pipeline_duration_by_app_json = json.dumps(pipeline_duration_by_app, default=json_util.default)

# Read from MongoDB with the updated pipeline
df_app = (
    spark.read
        .format("mongodb")
        .option("connection.uri", mongo_url)
        .option("database", "monitoring")
        .option("collection", "agentactivities")
        .option("aggregation.pipeline", pipeline_duration_by_app_json)
        .load()
)

# Calculate durations in Spark
from pyspark.sql.functions import round as spark_round

df_final_app = (
    df_app
    .withColumn("username", lit(username))
    .withColumn("email", lit(user_email))
    .withColumn("role", lit(role))
    .withColumn("min_ts", to_timestamp("min_timestamp", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("max_ts", to_timestamp("max_timestamp", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("total_duration_minutes", spark_round((unix_timestamp("max_ts") - unix_timestamp("min_ts")) / 60, 5))
    .withColumn("total_duration_hours", spark_round((unix_timestamp("max_ts") - unix_timestamp("min_ts")) / 3600, 5))
    .select(
        "username", "email", "role", "date", "application_name",
        "min_timestamp", "max_timestamp",
        "total_duration_minutes", "total_duration_hours"
    )
)

df_final_app.show(truncate=False)


+---------+--------------------+--------+----------+-----------------------------+-------------------+-------------------+----------------------+--------------------+
|username |email               |role    |date      |application_name             |min_timestamp      |max_timestamp      |total_duration_minutes|total_duration_hours|
+---------+--------------------+--------+----------+-----------------------------+-------------------+-------------------+----------------------+--------------------+
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|Store                        |2025-08-20 15:58:22|2025-08-20 16:09:49|11.45                 |0.19083             |
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|Slack                        |2025-08-20 12:53:36|2025-08-20 22:31:36|578.0                 |9.63333             |
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|Store Installer              |2025-08-20 15:47:10|2025-08-20 16:05:17|18.11667              |0.30194             

## Calculate total duration for the 'window_title' column

In [37]:
# Aggregation pipeline to calculate min/max timestamps per user per window.title per day
pipeline_duration_by_window = [
    {
        "$match": {
            "userId": user_id,
            "timestamp": {
                "$gte": start_date,
                "$lt": end_date
            }
        }
    },
    {
        "$group": {
            "_id": {
                "date": {
                    "$dateToString": {
                        "format": "%Y-%m-%d",
                        "date": "$timestamp",
                        "timezone": "Asia/Kolkata"
                    }
                },
                "window_title": "$window.title"  # group by window.title
            },
            "min_timestamp": {"$min": "$timestamp"},
            "max_timestamp": {"$max": "$timestamp"}
        }
    },
    {
        "$project": {
            "date": "$_id.date",
            "window_title": "$_id.window_title",
            "min_timestamp": {
                "$dateToString": {
                    "format": "%Y-%m-%d %H:%M:%S",
                    "date": "$min_timestamp",
                    "timezone": "Asia/Kolkata"
                }
            },
            "max_timestamp": {
                "$dateToString": {
                    "format": "%Y-%m-%d %H:%M:%S",
                    "date": "$max_timestamp",
                    "timezone": "Asia/Kolkata"
                }
            }
        }
    }
]

pipeline_duration_by_window_json = json.dumps(pipeline_duration_by_window, default=json_util.default)

# Read from MongoDB with the updated pipeline
df_window = (
    spark.read
        .format("mongodb")
        .option("connection.uri", mongo_url)
        .option("database", "monitoring")
        .option("collection", "agentactivities")
        .option("aggregation.pipeline", pipeline_duration_by_window_json)
        .load()
)

# Calculate durations in Spark
from pyspark.sql.functions import round as spark_round

df_final_window = (
    df_window
    .withColumn("username", lit(username))
    .withColumn("email", lit(user_email))
    .withColumn("role", lit(role))
    .withColumn("min_ts", to_timestamp("min_timestamp", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("max_ts", to_timestamp("max_timestamp", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("total_duration_minutes", spark_round((unix_timestamp("max_ts") - unix_timestamp("min_ts")) / 60, 5))
    .withColumn("total_duration_hours", spark_round((unix_timestamp("max_ts") - unix_timestamp("min_ts")) / 3600, 5))
    .select(
        "username", #"email", "role", 
        "date", 
        "window_title",
        "min_timestamp", "max_timestamp",
        "total_duration_minutes", "total_duration_hours"
    )
)

df_final_window.show(10)


+---------+----------+--------------------+-------------------+-------------------+----------------------+--------------------+
| username|      date|        window_title|      min_timestamp|      max_timestamp|total_duration_minutes|total_duration_hours|
+---------+----------+--------------------+-------------------+-------------------+----------------------+--------------------+
|nirmallya|2025-08-20|cursor/dashboard ...|2025-08-20 15:48:20|2025-08-20 15:48:20|                   0.0|                 0.0|
|nirmallya|2025-08-20|chatgpy desktop d...|2025-08-20 16:07:13|2025-08-20 16:07:13|                   0.0|                 0.0|
|nirmallya|2025-08-20|Untitled - Google...|2025-08-20 16:07:18|2025-08-20 17:47:28|             100.16667|             1.66944|
|nirmallya|2025-08-20|Cursor Settings -...|2025-08-20 13:51:18|2025-08-20 14:03:46|              12.46667|             0.20778|
|nirmallya|2025-08-20|  Transparent Window|2025-08-20 22:10:37|2025-08-20 22:31:36|              20.9833

### group by type, application_name and windows title - together

In [39]:
# Aggregation pipeline to calculate min/max timestamps per user, per type, per application, per window title per day
pipeline_duration_all = [
    {
        "$match": {
            "userId": user_id,
            "timestamp": {
                "$gte": start_date,
                "$lt": end_date
            }
        }
    },
    {
        "$group": {
            "_id": {
                "date": {
                    "$dateToString": {
                        "format": "%Y-%m-%d",
                        "date": "$timestamp",
                        "timezone": "Asia/Kolkata"
                    }
                },
                "type": "$type",
                "application_name": "$application.name",
                "window_title": "$window.title"
            },
            "min_timestamp": {"$min": "$timestamp"},
            "max_timestamp": {"$max": "$timestamp"}
        }
    },
    {
        "$project": {
            "date": "$_id.date",
            "type": "$_id.type",
            "application_name": "$_id.application_name",
            "window_title": "$_id.window_title",
            "min_timestamp": {
                "$dateToString": {
                    "format": "%Y-%m-%d %H:%M:%S",
                    "date": "$min_timestamp",
                    "timezone": "Asia/Kolkata"
                }
            },
            "max_timestamp": {
                "$dateToString": {
                    "format": "%Y-%m-%d %H:%M:%S",
                    "date": "$max_timestamp",
                    "timezone": "Asia/Kolkata"
                }
            }
        }
    }
]

pipeline_duration_all_json = json.dumps(pipeline_duration_all, default=json_util.default)


# Read MongoDB with the unified pipeline
df_all = (
    spark.read
        .format("mongodb")
        .option("connection.uri", mongo_url)
        .option("database", "monitoring")
        .option("collection", "agentactivities")
        .option("aggregation.pipeline", pipeline_duration_all_json)
        .load()
)

from pyspark.sql.functions import round as spark_round

# Calculate durations in Spark
df_final = (
    df_all
    .withColumn("username", lit(username))
    .withColumn("email", lit(user_email))
    .withColumn("role", lit(role))
    .withColumn("min_ts", to_timestamp("min_timestamp", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("max_ts", to_timestamp("max_timestamp", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("total_duration_minutes", spark_round((unix_timestamp("max_ts") - unix_timestamp("min_ts")) / 60, 5))
    .withColumn("total_duration_hours", spark_round((unix_timestamp("max_ts") - unix_timestamp("min_ts")) / 3600, 5))
    .select(
        "username", "email", "role", 
        "date",
        "type", "application_name", "window_title",
        "min_timestamp", "max_timestamp",
        "total_duration_minutes", "total_duration_hours"
    )
)

df_final.show()



+---------+--------------------+--------+----------+----------------+--------------------+--------------------+-------------------+-------------------+----------------------+--------------------+
| username|               email|    role|      date|            type|    application_name|        window_title|      min_timestamp|      max_timestamp|total_duration_minutes|total_duration_hours|
+---------+--------------------+--------+----------+----------------+--------------------+--------------------+-------------------+-------------------+----------------------+--------------------+
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|    window_focus|       Google Chrome|[IS-1379] Job-sco...|2025-08-20 15:23:23|2025-08-20 15:23:33|               0.16667|             0.00278|
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|activity_summary|              Cursor|              Cursor|2025-08-20 14:30:15|2025-08-20 18:27:24|                237.15|              3.9525|
|nirmallya|nirmallya

In [None]:
# Aggregation pipeline to calculate min/max timestamps per user, per type, per application, per window title per day
pipeline_duration_all = [
    {
        "$match": {
            "userId": user_id,
            "timestamp": {
                "$gte": start_date,
                "$lt": end_date
            }
        }
    },
    {
        "$group": {
            "_id": {
                "date": {
                    "$dateToString": {
                        "format": "%Y-%m-%d",
                        "date": "$timestamp",
                        "timezone": "Asia/Kolkata"
                    }
                },
                "type": "$type",
                "application_name": "$application.name",
                "window_title": "$window.title"
            },
            "min_timestamp": {"$min": "$timestamp"},
            "max_timestamp": {"$max": "$timestamp"}
        }
    },
    {
        "$project": {
            "date": "$_id.date",
            "type": "$_id.type",
            "application_name": "$_id.application_name",
            "window_title": "$_id.window_title",
            "min_timestamp": {
                "$dateToString": {
                    "format": "%Y-%m-%d %H:%M:%S",
                    "date": "$min_timestamp",
                    "timezone": "Asia/Kolkata"
                }
            },
            "max_timestamp": {
                "$dateToString": {
                    "format": "%Y-%m-%d %H:%M:%S",
                    "date": "$max_timestamp",
                    "timezone": "Asia/Kolkata"
                }
            }
        }
    }
]

pipeline_duration_all_json = json.dumps(pipeline_duration_all, default=json_util.default)


from pyspark.sql.functions import round as spark_round, unix_timestamp, to_timestamp, min as spark_min, max as spark_max

# Step 1: Compute per-row durations (your existing df_final base)
df_final = (
    df_all
    .withColumn("username", lit(username))
    .withColumn("email", lit(user_email))
    .withColumn("role", lit(role))
    .withColumn("min_ts", to_timestamp("min_timestamp", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("max_ts", to_timestamp("max_timestamp", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("total_duration_minutes", spark_round((unix_timestamp("max_ts") - unix_timestamp("min_ts")) / 60, 5))
    .withColumn("total_duration_hours", spark_round((unix_timestamp("max_ts") - unix_timestamp("min_ts")) / 3600, 5))
    .select(
        "username", "email", "role", 
        "date",
        "type", "application_name", "window_title",
        "min_timestamp", "max_timestamp",
        "total_duration_minutes", "total_duration_hours"
    )
)

# Step 2: Compute daily duration (max - min for the whole day)
df_daily = (
    df_final
    .groupBy("date")
    .agg(
        spark_min("min_timestamp").alias("day_min"),
        spark_max("max_timestamp").alias("day_max")
    )
    .withColumn("daily_duration_minutes", spark_round((unix_timestamp("day_max") - unix_timestamp("day_min")) / 60, 5))
    .withColumn("daily_duration_hours", spark_round((unix_timestamp("day_max") - unix_timestamp("day_min")) / 3600, 5))
    .select("date", "daily_duration_minutes", "daily_duration_hours")
)

# Step 3: Join back to original df_final
df_final_with_daily = (
    df_final
    .join(df_daily, on="date", how="left")
    .select(
        "username", #"email", "role", 
        "date",
        "type", "application_name", "window_title"
        # ,"min_timestamp", "max_timestamp",
        # "total_duration_minutes", "total_duration_hours",
        # "daily_duration_minutes", "daily_duration_hours"
    )
)

df_final_with_daily.show(100, truncate=False)



+---------+----------+----------------+-----------------------------+--------------------------------------------------------------------------------------------------------------+
|username |date      |type            |application_name             |window_title                                                                                                  |
+---------+----------+----------------+-----------------------------+--------------------------------------------------------------------------------------------------------------+
|nirmallya|2025-08-20|activity_summary|Google Chrome                |ChatGPT - Supriyo - Interview Screener - Google Chrome                                                        |
|nirmallya|2025-08-20|activity_summary|Google Chrome                |Meet - kqt-xfjp-xye - Google Chrome                                                                           |
|nirmallya|2025-08-20|activity_summary|Slack                        |interviewscreener-demo-upd

# STEP - 3

In [None]:
import json

activity_data = df_final_with_daily.toPandas().to_dict(orient="records")

activity_json = json.dumps(activity_data, indent=2, default=str)

with open("activity_log.json", "w", encoding="utf-8") as f:
    f.write(activity_json)

print("Activity log exported to activity_log.json")
print(f"Total records exported: {len(activity_data)}")


                                                                                

In [None]:
from langchain_groq import ChatGroq
from langchain.prompts import ChatPromptTemplate
from dotenv import load_dotenv
import json

# Load GROQ_API_KEY from .env
load_dotenv()

# Initialize LLM
llm = ChatGroq(model="llama3-70b-8192", temperature=0)

# Convert Spark DataFrame → JSON
activity_data = df_final_with_daily.toPandas().to_dict(orient="records")
activity_json = json.dumps(activity_data, indent=2, default=str)

# Create prompt
prompt_template = ChatPromptTemplate.from_messages([
    ("system", "You are a productivity analysis assistant."),
    ("user", """Analyze the following employee activity log and generate a structured productivity report.

The report should include:
- Employee name, report generation time
- Daily summary of activity patterns
- Top applications & websites used
- Any irrelevant/distracting activity
- Productivity rating (0–100) with reasoning
- Recommendations to improve
- A final JSON block with productivity_score, rating, and recommendations.

Employee: {username}
Date: {date}
Data:
{activity_json}
""")
])

# Format the prompt with values
prompt = prompt_template.format_messages(
    username=username,
    date=start_date.strftime('%Y-%m-%d'),
    activity_json=activity_json
)

# Call Groq
response = llm.invoke(prompt)

print(response.content)


In [50]:
from langchain_groq import ChatGroq
from langchain.prompts import ChatPromptTemplate
from dotenv import load_dotenv
import json
import math

# Load GROQ_API_KEY from .env
load_dotenv()

# Initialize LLM
llm = ChatGroq(model="llama3-70b-8192", temperature=0)

# Convert Spark DataFrame → JSON-like dict
activity_data = df_final_with_daily.toPandas().to_dict(orient="records")

# Chunk the activity logs to avoid exceeding token limits
CHUNK_SIZE = 50   # adjust depending on avg row size
chunks = [activity_data[i:i+CHUNK_SIZE] for i in range(0, len(activity_data), CHUNK_SIZE)]

# Define summarization prompt
chunk_prompt_template = ChatPromptTemplate.from_messages([
    ("system", "You are a productivity analysis assistant. Summarize concisely."),
    ("user", """Summarize the following activity logs into a compact form with:
- Key apps/websites used
- Irrelevant/distractions
- Time distribution summary

Data:
{activity_chunk}
""")
])

chunk_summaries = []
for idx, chunk in enumerate(chunks):
    activity_json_chunk = json.dumps(chunk, indent=2, default=str)
    chunk_prompt = chunk_prompt_template.format_messages(
        activity_chunk=activity_json_chunk
    )
    response = llm.invoke(chunk_prompt)
    chunk_summaries.append(response.content)

# Now combine summaries into the final structured report
final_prompt_template = ChatPromptTemplate.from_messages([
    ("system", "You are a productivity analysis assistant."),
    ("user", """Based on the following summarized activity data, generate a structured productivity report.

The report should include:
- Employee name, report generation time
- Daily summary of activity patterns
- Top applications & websites used
- Any irrelevant/distracting activity
- Productivity rating (0–100) with reasoning
- Recommendations to improve
- A final JSON block with productivity_score, rating, and recommendations.

Employee: {username}
Date: {date}
Summarized Data:
{summaries}
""")
])

final_prompt = final_prompt_template.format_messages(
    username=username,
    date=start_date.strftime('%Y-%m-%d'),
    summaries="\n\n".join(chunk_summaries)
)

final_response = llm.invoke(final_prompt)
print(final_response.content)


                                                                                

RateLimitError: Error code: 429 - {'error': {'message': 'Rate limit reached for model `llama3-70b-8192` in organization `org_01jngt063xfzybyt7kbae0mt6z` service tier `on_demand` on tokens per minute (TPM): Limit 6000, Used 3456, Requested 2555. Please try again in 101.999999ms. Need more tokens? Upgrade to Dev Tier today at https://console.groq.com/settings/billing', 'type': 'tokens', 'code': 'rate_limit_exceeded'}}

In [41]:
rows = df_final.count()
cols = len(df_final.columns)

print(f"df_final has {rows} rows and {cols} columns")


df_final has 263 rows and 9 columns


## Raw Data

In [46]:
start_date = datetime.strptime(filter_date, "%Y-%m-%d")
end_date = start_date + timedelta(days=1)

pipeline_activities = [
    {
        "$match": {
            "userId": user_id,
            "timestamp": {
                "$gte": start_date,
                "$lt": end_date
            }
        }
    },
    {
        "$project": {
            "type": 1,
            "application_name": "$application.name",
            "window_title": "$window.title",
            # Extract date and convert UTC → IST
            "date": {
                "$dateToString": {
                    "format": "%Y-%m-%d",
                    "date": "$timestamp",
                    "timezone": "Asia/Kolkata"
                }
            },
            "timestamp": {
                "$dateToString": {
                    "format": "%Y-%m-%d %H:%M:%S",
                    "date": "$timestamp",
                    "timezone": "Asia/Kolkata"
                }
            }
        }
    },
    {
        "$sort": {"timestamp": 1}
    }
]

pipeline_activities_json = json.dumps(pipeline_activities, default=json_util.default)

# Load into Spark
df_activities = (
    spark.read
        .format("mongodb")
        .option("connection.uri", mongo_url)
        .option("database", "monitoring")
        .option("collection", "agentactivities")
        .option("aggregation.pipeline", pipeline_activities_json)
        .load()
)

# Add user metadata
df_activities = (
    df_activities
    .withColumn("username", lit(username))
    .withColumn("email", lit(user_email))
    .withColumn("role", lit(role))
    .select(
        "username", "email", "role",
        "date",
        "type", "application_name", "window_title",
        "timestamp"
    )
)

print(f"Total raw records: {df_activities.count()}")
df_activities.show(10, truncate=False)


                                                                                

Total raw records: 10230
+---------+--------------------+--------+----------+----------------+----------------+------------+-------------------+
|username |email               |role    |date      |type            |application_name|window_title|timestamp          |
+---------+--------------------+--------+----------+----------------+----------------+------------+-------------------+
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|window_focus    |Electron        |Klizo Agent |2025-08-20 12:53:04|
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|keystroke       |Windows Explorer|NULL        |2025-08-20 12:53:04|
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|window_focus    |Windows Explorer|NULL        |2025-08-20 12:53:06|
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|window_focus    |Windows Explorer|NULL        |2025-08-20 12:53:11|
|nirmallya|nirmallya@klizos.com|employee|2025-08-20|window_focus    |Windows Explorer|NULL        |2025-08-20 12:53:16|
|nirmallya|nirm

In [None]:
# Show first 5 rows
print("===== HEAD (first 5 rows) =====")
df_activities.limit(5).show(truncate=False)


In [None]:
from pyspark.sql import Row

last_rows = df_final.tail(5)
df_tail = spark.createDataFrame(last_rows)

print("===== TAIL (last 5 rows) =====")
df_tail.show(truncate=False)
