In [5]:
%pip install watchdog sqlalchemy psycopg2


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip3 install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [6]:
import pandas as pd
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
import os
from sqlalchemy import create_engine

# Set display options
pd.options.display.max_columns = 50

# Database connection details
username = 'postgres'
password = 'root'
host = 'localhost'
port = '5434'
database = 'final_project'
connection_string = f'postgresql://{username}:{password}@{host}:{port}/{database}'
engine = create_engine(connection_string)

# Function to prepare data and generate reports
def prepare_and_generate_reports():
    # Load the CSV files into DataFrames
    users_df = pd.read_csv('../prep/PREP_Users.csv')
    courses_df = pd.read_csv('../prep/PREP_Courses.csv')
    training_performances_df = pd.read_csv('../prep/PREP_TrainingPerformances.csv')
    feedbacks_df = pd.read_csv('../prep/PREP_Feedbacks.csv')

    # Join users and training performances on employeeID
    user_training_df = pd.merge(users_df, training_performances_df, left_on='id', right_on='employeeID')

    # Join the result with courses on courseID
    user_training_courses_df = pd.merge(user_training_df, courses_df, left_on='courseID', right_on='id')

    # Join users and feedbacks on employeeID
    user_feedback_df = pd.merge(users_df, feedbacks_df, left_on='employeeID', right_on='employeeID')
    df = user_training_df.copy()
    df['row'] = df.sort_values(by='attempt', ascending=False) \
                  .groupby(['employeeID_x', 'courseID']) \
                  .cumcount() + 1

    result_df = df[df['row'] == 1]

    user_training_final_df = result_df.drop(columns=['row'])

    user_training_courses_final_df = pd.merge(user_training_final_df, courses_df, left_on='courseID', right_on='id')
    
    # Fact table for datascience
    datascience_table = pd.merge(user_training_courses_final_df, user_feedback_df[['employeeID', 'aggregatedScore']], left_on='employeeID_x', right_on='employeeID', how='outer').fillna(0)

    datascience_table['employee_id'] = datascience_table.apply(
        lambda row: row['employeeID'] if row['employeeID_x'] == 0 else row['employeeID_x'], axis=1
    )

    aggregated_data = datascience_table.groupby(['employee_id', 'designation_x']).agg(
        avg_training_score=('score', 'mean'),
        no_of_trainings=('course', 'count'),
        feedback_score=('aggregatedScore', 'mean')
    ).reset_index()

    aggregated_data.rename(columns={'designation_x': 'designation'}, inplace=True)

    final_df = aggregated_data[['employee_id', 'designation', 'no_of_trainings', 'avg_training_score', 'feedback_score']]
    final_df.to_csv('DS_TABLE.csv', index=False)

    # Insert final_df back into the database
    final_df.to_sql('data_science_table', engine, if_exists='replace', index=False)

    # Generating other reports...
    avg_score_per_designation = user_training_courses_final_df.groupby('designation_x')['score'].mean()
    avg_score_per_designation.to_csv("REPORT_avg_score_per_designation.csv")
    
    course_completion_count = user_training_courses_df.groupby('employeeID_x')['courseID'].nunique()
    course_completion_count.to_csv('REPORT_course_completion_count.csv')

    top_courses_by_score = user_training_courses_final_df.groupby('title')['score'].mean().sort_values(ascending=False)
    top_courses_by_score.to_csv('REPORT_top_courses_by_score.csv')

    avg_time_spent = user_training_courses_df.groupby('designation_x')['time'].mean()
    avg_time_spent.to_csv('REPORT_avg_time_spent.csv')

    top_performing_employees = (
        user_training_courses_final_df.groupby(['employeeID_x', 'name_x', 'designation_x'])
        .agg({'score': 'mean', 'time': 'mean'})
        .reset_index()
    )
    top_performing_employees = top_performing_employees.sort_values(by='score', ascending=False)
    top_performing_employees.to_csv('REPORT_top_performing_employees.csv')

    avg_attempts_per_employee = user_training_courses_df.groupby('employeeID_x')['attempt'].mean()
    avg_attempts_per_employee.to_csv('REPORT_avg_attempts_per_employee.csv')

    avg_feedback_by_manager = user_feedback_df.groupby('managerId')['aggregatedScore'].mean()
    avg_feedback_by_manager.to_csv('REPORT_avg_feedback_by_manager.csv')

    feedback_score_distribution = user_feedback_df['aggregatedScore'].value_counts()
    feedback_score_distribution.to_csv('REPORT_feedback_score_distribution.csv')

    employee_count_by_designation = users_df['designation'].value_counts()
    employee_count_by_designation.to_csv('REPORT_employee_count_by_designation.csv')

    courses_per_designation = courses_df['designation'].value_counts()
    courses_per_designation.to_csv('REPORT_courses_per_designation.csv')

    avg_feedback_score_by_designation = user_feedback_df.groupby('designation_x')['aggregatedScore'].mean()
    avg_feedback_score_by_designation.to_csv('REPORT_avg_feedback_score_by_designation.csv')

    popular_courses = user_training_courses_final_df['title'].value_counts()
    popular_courses.to_csv('REPORT_popular_courses.csv')

    avg_time_per_course = user_training_courses_df.groupby('title')['time'].mean()
    avg_time_per_course.to_csv('REPORT_avg_time_per_course.csv')

    feedback_count_by_manager = user_feedback_df['managerId'].value_counts()
    feedback_count_by_manager.to_csv('REPORT_feedback_count_by_manager.csv')

    user_training_courses_final_df1 = user_training_courses_final_df.groupby('employeeID_x')['score'].mean()
    merged_scores_df = pd.merge(user_training_courses_final_df1, user_feedback_df[['employeeID', 'aggregatedScore']], left_on='employeeID_x', right_on='employeeID', how='outer').fillna(0)
    merged_scores_df['Final_score'] = (merged_scores_df['score'] + merged_scores_df['aggregatedScore']) / 2
    merged_scores_df['Retained'] = np.where(merged_scores_df['Final_score'] > 50, 'Retained', 'Not Retained')
    merged_scores_df.to_csv('REPORT_retained_table.csv')

    # Insert merged_scores_df back into the database
    merged_scores_df.to_sql('merged_scores', engine, if_exists='replace', index=False)

class CSVHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if event.src_path.endswith('.csv'):
            print(f'{event.src_path} has been modified. Preparing data and generating reports...')
            prepare_and_generate_reports()

if __name__ == "__main__":
    path = '../prep'  # Path to the folder you want to monitor
    event_handler = CSVHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=False)

    print("Monitoring folder for changes...")
    observer.start()
    
    try:
        while True:
            time.sleep(1)  # Keep the script running
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


Monitoring folder for changes...


Exception in thread Thread-9:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1052, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/watchdog/observers/api.py", line 213, in run
    self.dispatch_events(self.event_queue)
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/watchdog/observers/api.py", line 391, in dispatch_events
    handler.dispatch(event)
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/watchdog/events.py", line 217, in dispatch
    getattr(self, f"on_{event.event_type}")(event)
  File "/var/folders/vl/_n6dyrzx4c5cdgbwmdsqbb3m0000gn/T/ipykernel_50392/2165251398.py", line 129, in on_modified
  File "/var/folders/vl/_n6dyrzx4c5cdgbwmdsqbb3m0000gn/T/ipykernel_50392/2165251398.py", line 119, in prepare_and_generate_reports
NameError: name 'np' is 

/Users/aswath/Desktop/Jskill2-main/dataEngineering/prep/PREP_TrainingPerformances.csv has been modified. Preparing data and generating reports...
