In [1]:
pip install pandas

Collecting pandas
  Downloading pandas-2.2.3-cp39-cp39-macosx_11_0_arm64.whl.metadata (89 kB)
Collecting numpy>=1.22.4 (from pandas)
  Downloading numpy-2.0.2-cp39-cp39-macosx_14_0_arm64.whl.metadata (60 kB)
Collecting tzdata>=2022.7 (from pandas)
  Downloading tzdata-2024.2-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pandas-2.2.3-cp39-cp39-macosx_11_0_arm64.whl (11.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.3/11.3 MB[0m [31m12.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading numpy-2.0.2-cp39-cp39-macosx_14_0_arm64.whl (5.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.3/5.3 MB[0m [31m12.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading tzdata-2024.2-py2.py3-none-any.whl (346 kB)
Installing collected packages: tzdata, numpy, pandas
Successfully installed numpy-2.0.2 pandas-2.2.3 tzdata-2024.2
Note: you may need to restart the kernel to use updated packages.


In [3]:
import pandas as pd
import os
from pymongo import MongoClient

# Connection settings are taken from the environment so that real credentials
# do not have to be stored in the notebook. Each fallback is the value this
# notebook originally hardcoded, so behaviour is unchanged when the variables
# are not set.
MONGO_HOST = os.environ.get('MONGO_HOST', 'localhost')
MONGO_PORT = int(os.environ.get('MONGO_PORT', '27017'))
MONGO_DB = os.environ.get('MONGO_DB', 'admin')
MONGO_USERNAME = os.environ.get('MONGO_USERNAME', 'root')
MONGO_PASSWORD = os.environ.get('MONGO_PASSWORD', 'mongo_password')

def get_mongo_client():
    """Build a MongoClient from the module-level MONGO_* settings.

    The username and password are percent-escaped before being placed in the
    connection URI; characters such as '@', ':', '/' or '%' in either value
    would otherwise be read as URI delimiters and produce a malformed URI.

    Returns:
        The MongoClient created for ``MONGO_HOST:MONGO_PORT``.
    """
    # Local import: keeps this function self-contained without touching the
    # notebook's import cell.
    from urllib.parse import quote_plus

    username = quote_plus(MONGO_USERNAME)
    password = quote_plus(MONGO_PASSWORD)
    uri = f"mongodb://{username}:{password}@{MONGO_HOST}:{MONGO_PORT}/"
    return MongoClient(uri)

def get_database():
    """Return the ``MONGO_DB`` database handle, or None if the server is unusable.

    Returns:
        The database object ``client[MONGO_DB]`` when the server answers a
        ``ping``; ``None`` when creating the client or the ping fails. The
        outcome is also reported with ``print``.
    """
    client = None
    try:
        client = get_mongo_client()
        # MongoClient connects lazily, so constructing it succeeds even when
        # the server is down or the credentials are wrong. Force one round
        # trip so that "DB connection OK" is only printed when it is true.
        client.admin.command('ping')
        db = client[MONGO_DB]
        print("DB connection OK")
        return db
    except Exception as e:
        print("DB connection error:", e)
        # Do not leave a half-initialised client behind.
        if client is not None:
            client.close()
        return None


# Directory holding the raw CSV exports, relative to this notebook.
BASE_PATH = '../../data/'

# Full paths of the three input files, built in one pass from their file names.
activity_log_path, user_log_path, component_codes_path = (
    os.path.join(BASE_PATH, csv_name)
    for csv_name in ('ACTIVITY_LOG.csv', 'USER_LOG.csv', 'COMPONENT_CODES.csv')
)


def save_csv_to_mongodb(file_path, collection_name, rename_column=None, add_month_column=False, date_column=None):
    """Load a CSV file, optionally reshape it, and insert its rows into MongoDB.

    Args:
        file_path: path of the CSV file to read.
        collection_name: name of the MongoDB collection to insert into.
        rename_column: optional ``(old_name, new_name)`` pair; the column is
            renamed only when ``old_name`` is present in the file.
        add_month_column: when true, and ``date_column`` is given and present,
            ``date_column`` is parsed with the format ``%d/%m/%Y %H:%M`` into
            a ``Date`` column and a ``Month`` column is derived from it.
        date_column: name of the column that holds the date strings.

    Returns:
        None. Progress and failures are reported with ``print``; any exception
        is caught and reported rather than propagated.
    """
    try:
        data = pd.read_csv(file_path)
        print(f"Loaded data from {file_path}")

        # RENAME
        if rename_column:
            old_name, new_name = rename_column
            if old_name in data.columns:
                data = data.rename(columns={old_name: new_name})
                print(f"Renamed column '{old_name}' to '{new_name}'")

        # Add Month column for COUNT
        if add_month_column and date_column and date_column in data.columns:
            data['Date'] = pd.to_datetime(data[date_column], format='%d/%m/%Y %H:%M', errors='coerce')
            data['Month'] = data['Date'].dt.month
            print(f"Added 'Month' column based on '{date_column}' column")

        # errors='coerce' turns unparseable dates into NaT, which is not a
        # real datetime and cannot be stored as a BSON date; store those
        # cells as null instead.
        records = [
            {key: (None if value is pd.NaT else value) for key, value in row.items()}
            for row in data.to_dict('records')
        ]

        # insert_many rejects an empty list, so a header-only CSV is reported
        # as a no-op rather than as a processing error.
        if not records:
            print(f"No records found in {file_path}; nothing to insert into {collection_name}")
            return

        # Save to MongoDB for BACKUP
        db = get_database()
        # get_database() returns None on failure and has already printed the
        # reason; compare with `is None` because database handles do not
        # support truth-value testing.
        if db is None:
            print(f"Skipping {file_path}: no database connection available")
            return

        db[collection_name].insert_many(records)
        print(f"Saved {len(records)} records to MongoDB collection: {collection_name}")
    except Exception as e:
        print(f"Error processing file {file_path}: {e}")


# MAIN
# Activity log: only the anonymised-name column is renamed.
save_csv_to_mongodb(
    file_path=activity_log_path,
    collection_name='activity_log',
    rename_column=('User Full Name *Anonymized', 'User_ID'),
)

# User log: same rename, plus a Month column derived from the 'Date' column.
save_csv_to_mongodb(
    file_path=user_log_path,
    collection_name='user_log',
    rename_column=('User Full Name *Anonymized', 'User_ID'),
    add_month_column=True,
    date_column='Date',
)

# Component codes: stored unchanged.
save_csv_to_mongodb(file_path=component_codes_path, collection_name='component_codes')

Loaded data from ../../data/USER_LOG.csv
Renamed column 'User Full Name *Anonymized' to 'User_ID'
Added 'Month' column based on 'Date' column
DB connection OK
Saved 150835 records to MongoDB collection: user_log


In [None]:
# import pandas as pd
# import threading
# from queue import Queue
# import sys
# import os
# from pymongo import MongoClient

# MONGO_HOST = 'localhost'
# MONGO_PORT = 27017
# MONGO_DB = 'admin'
# MONGO_USERNAME = 'root'
# MONGO_PASSWORD = 'mongo_password'

# def get_mongo_client():
#     uri = f"mongodb://{MONGO_USERNAME}:{MONGO_PASSWORD}@{MONGO_HOST}:{MONGO_PORT}/"
#     client = MongoClient(uri)
#     return client

# def get_database():
#     try:
#         client = get_mongo_client()
#         db = client[MONGO_DB]
#         print("DB connection OK")
#         return db
#     except Exception as e:
#         print("DB connection error:", e)
#         return None


# BASE_PATH = '../../data/'
# activity_log_path = os.path.join(BASE_PATH, 'ACTIVITY_LOG.csv')
# user_log_path = os.path.join(BASE_PATH, 'USER_LOG.csv')

# def load_data(file_path, encoding='utf-8'):
#     try:
#         data = pd.read_csv(file_path, encoding=encoding)
#         print(f"Loaded data from {file_path}")
#         return data
#     except FileNotFoundError:
#         print(f"Error: The file {file_path} was not found.")
#     except pd.errors.EmptyDataError:
#         print(f"Error: The file {file_path} is empty.")
#     except Exception as e:
#         print(f"Error loading {file_path}: {e}")
#     return None

# def merge_data(activity_log, user_log):
#     if activity_log is not None and user_log is not None:
#         merged = pd.merge(activity_log, user_log, on='User Full Name *Anonymized', how='inner')
#         print("Data merged successfully.")
#         return merged
#     else:
#         print("Error: One or more data frames are None, cannot proceed with merge.")
#         return None

# def clean_data(data):
#     if data is not None:
#         # Rename
#         data.rename(columns={'User Full Name *Anonymized': 'User_ID'}, inplace=True)
#         # Remove
#         data = data[~data['Component'].isin(['System', 'Folder'])]
#         # Drop rows with any missing values
#         data.dropna(inplace=True)
#         print("Data cleaned successfully.")
#         return data
#     else:
#         print("Error: Merged data is None, cannot proceed with cleaning.")
#         return None


# def clean(activity_log_path, user_log_path):
#     activity_log = load_data(activity_log_path)
#     user_log = load_data(user_log_path)
#     merged_data = merge_data(activity_log, user_log)
#     cleaned_data = clean_data(merged_data)
#     return cleaned_data


# def save_chunk_to_mongodb(chunk, collection_name, queue):
#     try:
#         db = get_database()
#         collection = db[collection_name]
#         collection.insert_many(chunk)
#         queue.put("Chunk saved successfully")
#     except Exception as e:
#         queue.put(f"Error saving chunk to MongoDB: {e}")


# def save_to_mongodb(cleaned_data, collection_name):
#     if cleaned_data is not None:
#         records = cleaned_data.to_dict('records')
#         num_threads = 4  # Number of threads for insertion
#         chunk_size = len(records) // num_threads
#         threads = []
#         queue = Queue()

#         for i in range(num_threads):
#             start_index = i * chunk_size
#             end_index = (i + 1) * chunk_size if i != num_threads - 1 else len(records)
#             chunk = records[start_index:end_index]

#             thread = threading.Thread(target=save_chunk_to_mongodb, args=(chunk, collection_name, queue))
#             threads.append(thread)
#             thread.start()

#         for thread in threads:
#             thread.join()

#         while not queue.empty():
#             print(queue.get())

#         print(f"Data saved to MongoDB collection: {collection_name} using threading.")
#     else:
#         print("No data to save to MongoDB.")


# # Main
# cleaned_data = clean(activity_log_path, user_log_path)
# if cleaned_data is not None:
#     save_to_mongodb(cleaned_data, "user_activity_data")