The current implementation stores only IDs in the UserActivity table (`user_activities`) and keeps the names in separate reference tables (Component, Action, Target), which prevents **data inconsistency**. If a name changes (e.g., a Component is renamed), it only needs to be updated once, in the reference table; every related row in `user_activities` then resolves to the new name through its foreign key, because those rows hold the ID rather than a copy of the name. This eliminates the need to update multiple rows in `user_activities`, avoids potential mismatches, and keeps the data consistent across the database. It also simplifies maintenance and reduces the risk of errors in large datasets.


In [2]:
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base, relationship

# Define the base for SQLAlchemy: every model class in this file subclasses
# Base, and Base.metadata is what create_all() uses to create their tables.
Base = declarative_base()

# Define normalized table schemas
class User(Base):
    """Reference table of users, one row per distinct ``user_id`` value."""

    __tablename__ = "users"

    # Surrogate primary key; this is what user_activities.user_id points at.
    id = Column(Integer, primary_key=True)
    # Identifier of the user as supplied by the caller; must be unique.
    user_id = Column(Integer, nullable=False, unique=True)

    # user.activities -> all UserActivity records for a given User
    activities = relationship("UserActivity", back_populates="user")

class Component(Base):
    """Reference table of component names, one row per distinct name."""

    __tablename__ = "components"

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False, unique=True)

    # component.activities -> all UserActivity records that reference this row
    activities = relationship("UserActivity", back_populates="component")

class Action(Base):
    """Reference table of action names, one row per distinct name."""

    __tablename__ = "actions"

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False, unique=True)

    # action.activities -> all UserActivity records that reference this row
    activities = relationship("UserActivity", back_populates="action")

class Target(Base):
    """Reference table of target names, one row per distinct name."""

    __tablename__ = "targets"

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False, unique=True)

    # target.activities -> all UserActivity records that reference this row
    activities = relationship("UserActivity", back_populates="target")

class UserActivity(Base):
    """A row of ``user_activities``: a timestamp plus foreign keys to the reference tables.

    The user, component and action references are mandatory; ``target_id`` is
    the only nullable reference.
    """

    __tablename__ = "user_activities"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    datetime = Column(DateTime, nullable=False)
    component_id = Column(Integer, ForeignKey("components.id"), nullable=False)
    action_id = Column(Integer, ForeignKey("actions.id"), nullable=False)
    target_id = Column(Integer, ForeignKey("targets.id"), nullable=True)

    # Many-to-one side of each relationship; every one is paired with the
    # `activities` collection on the referenced model.
    # user_activity.user -> the User record for a given UserActivity
    user = relationship("User", back_populates="activities")
    component = relationship("Component", back_populates="activities")
    action = relationship("Action", back_populates="activities")
    target = relationship("Target", back_populates="activities")

########################################################################################################################################################

from tqdm import tqdm
import pandas as pd
import os, datetime
import threading, queue
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.orm import scoped_session, sessionmaker

class DatabaseHandler:
    """Load activity rows from a DataFrame into the normalized SQLite schema.

    Worker threads translate each input row into foreign-key IDs (creating
    rows in the reference tables on demand) and hand the resulting records to
    a single writer thread, which inserts them into ``user_activities``.
    """

    def __init__(self, db_path='user_activities.db'):
        """Initialize the database with Write-Ahead Logging (WAL) mode.

        Args:
            db_path: Path of the SQLite database file to open or create.

        Raises:
            Exception: Any error hit during setup is reported, the partially
                built handler is cleaned up, and the error is re-raised.
        """
        # Define everything close() touches before any step that can fail, so
        # that a failed setup can be cleaned up without an AttributeError
        # hiding the real cause.
        self.engine = None
        self.SessionFactory = None
        self.scoped_session = None

        # A shared global cache: value -> row ID for each reference table
        self.global_cache = {
            "user": {},
            "component": {},
            "action": {},
            "target": {}
        }
        self.lock = threading.Lock()  # A lock for thread-safe cache access

        try:
            self.engine = create_engine(
                f'sqlite:///{db_path}',  # Create an SQLAlchemy Engine representing the database connection
                connect_args={
                    "timeout": 30  # Specifies the duration (in seconds) the connection will wait
                                   # for a database lock to be released when another thread is using it.
                },
                pool_size=5,  # The number of persistent connections to maintain in a connection pool
                max_overflow=10  # Allow up to 10 additional connections when all connections are in use.
            )

            # create tables using SQLAlchemy metadata
            Base.metadata.create_all(self.engine)

            # Enable WAL mode with raw connection
            conn = self.engine.raw_connection()
            try:
                cursor = conn.cursor()
                cursor.execute('PRAGMA journal_mode=WAL')  # Enable WAL mode
                cursor.close()
            finally:
                conn.close()  # Release the connection even if the PRAGMA fails

            # Scoped session for thread-safe database access
            self.SessionFactory = sessionmaker(bind=self.engine)
            self.scoped_session = scoped_session(self.SessionFactory)

        except Exception as e:
            print(f"Error setting up the database: {e}")
            self.close()
            # A handler without an engine/session cannot do anything useful,
            # so let the caller see the original error.
            raise

    def close(self):
        """Dispose of the database engine.

        Safe to call on a handler whose setup failed part-way: only the
        resources that were actually created are released.
        """
        if self.scoped_session is not None:
            self.scoped_session.remove()  # Remove scoped sessions
        if self.engine is not None:
            self.engine.dispose()
        print("Database connections closed.")

    def _add_or_return_row(self, session, table, field, value, cache_key):
        """Retrieve or create an entry in the database with a shared cache and ON CONFLICT handling.

        Args:
            session: SQLAlchemy session of the calling thread.
            table: Mapped model class of the reference table.
            field: Name of the unique column that holds ``value``.
            value: Value to look up or insert.
            cache_key: Key of the sub-dictionary of ``self.global_cache`` to use.

        Returns:
            The ``id`` of the matching row, or ``None`` if the row could not
            be created or found (the error is printed).
        """
        try:
            with self.lock:
                # Check the global cache
                if value in self.global_cache[cache_key]:
                    return self.global_cache[cache_key][value]

            # Use raw SQL with ON CONFLICT to handle duplicates
            sql = text(f"""
            INSERT INTO {table.__tablename__} ({field})
            VALUES (:value)
            ON CONFLICT ({field}) DO NOTHING
            """)
            session.execute(sql, {'value': value})
            # Commit before the ID is shared. Other threads read the cache
            # without touching the database, so a cached ID must never belong
            # to a row that a later rollback could remove. Committing here
            # also releases SQLite's single write lock immediately instead of
            # holding it until the caller finishes its whole chunk.
            session.commit()

            # Retrieve the ID (either newly inserted or existing)
            entry = session.query(table).filter_by(**{field: value}).first()
            if entry is None:
                return None

            row_id = entry.id
            with self.lock:
                # Update global cache
                self.global_cache[cache_key][value] = row_id
            return row_id

        except Exception as e:
            session.rollback()
            print(f"Error in _add_or_return_row for model {table.__name__}, field {field}, value {value}: {e}")
            return None

    def worker_thread(self, data_chunk, write_queue, progress_bar):
        """Thread worker to process a chunk of data.

        Each row is converted into a dict of ``UserActivity`` column values.
        Rows that cannot be converted (a missing mandatory value, a failed
        reference-table lookup, or an unparseable ``DateTime``) are reported
        and skipped individually. The converted records are put on
        ``write_queue`` as one list.

        Args:
            data_chunk: DataFrame slice with the columns ``User_ID``,
                ``Component``, ``Action``, ``Target`` and ``DateTime``.
            write_queue: Queue consumed by ``writer_thread``.
            progress_bar: Progress bar advanced by one per converted row.
        """
        session = self.scoped_session()
        activities = []

        try:
            for _, row in data_chunk.iterrows():
                # Attempt to add or retrieve IDs for dimensional tables.
                # The last element says whether the matching user_activities
                # column is NOT NULL (only target_id is nullable).
                lookups = [
                    ('user_id', User, 'user_id', row['User_ID'], 'user', True),
                    ('component_id', Component, 'name', row['Component'], 'component', True),
                    ('action_id', Action, 'name', row['Action'], 'action', True),
                    ('target_id', Target, 'name', row['Target'], 'target', False),
                ]

                # UserActivity
                record = {}
                for field_name, table, column, value, cache_key, required in lookups:
                    if pd.isna(value):
                        if required:
                            # Without this check the record would violate a
                            # NOT NULL constraint and fail the whole batch.
                            print(f"Skipping row due to missing {field_name}: {row.to_dict()}")
                            break
                        continue

                    record[field_name] = self._add_or_return_row(session, table, column, value, cache_key)

                    if record[field_name] is None:
                        print(f"Skipping row due to error in {field_name}: {row.to_dict()}")
                        break

                else:
                    # Uses a 'for-else' which runs only if the loop above completes without a break
                    try:
                        timestamp = pd.to_datetime(row['DateTime'], format="%d/%m/%Y %H:%M")
                    except (ValueError, TypeError) as e:
                        print(f"Skipping row due to invalid DateTime ({e}): {row.to_dict()}")
                        continue
                    if pd.isna(timestamp):
                        print(f"Skipping row due to missing DateTime: {row.to_dict()}")
                        continue

                    record['datetime'] = timestamp
                    activities.append(record)
                    progress_bar.update(1)

            # Commit changes for dimensional tables
            session.commit()

            # Pass the activities to the writer thread
            write_queue.put(activities)

        except Exception as e:
            session.rollback()
            print(f"Error in worker thread: {e}")
        finally:
            # Close this thread's session and drop it from the registry.
            self.scoped_session.remove()

    def writer_thread(self, write_queue):
        """Thread to handle batch writes to the database.

        Takes lists of record dicts from ``write_queue`` until it receives
        ``None``. Each list is inserted and committed as one batch; a failing
        batch is rolled back and retried, and reported once the retries are
        used up.

        Args:
            write_queue: Queue of record lists, terminated by ``None``.
        """
        session = self.scoped_session()
        try:
            while True:
                batch = write_queue.get()
                if batch is None:  # Stop signal
                    break

                retry_count = 5  # Retry up to 5 times
                while retry_count > 0:
                    try:
                        session.bulk_save_objects([UserActivity(**activity) for activity in batch])
                        session.commit()
                        break
                    except Exception as e:
                        session.rollback()
                        retry_count -= 1
                        if retry_count == 0:
                            print(f"Error in writer thread after retries: {e}")
                        else:
                            print(f"Retrying writer thread due to: {e}")
        finally:
            # Release the session even if the loop exits through an error.
            self.scoped_session.remove()

    def populate_concurrently(self, dataframe, num_threads=4):
        """Insert every row of ``dataframe`` using worker threads and one writer thread.

        The frame is cut into equally sized chunks that are handed out to a
        pool of ``num_threads`` workers, so a thread that finishes early picks
        up the next chunk instead of one thread receiving all leftover rows.

        Args:
            dataframe: DataFrame with the columns read by ``worker_thread``.
            num_threads: Number of worker threads; must be at least 1.

        Raises:
            ValueError: If ``num_threads`` is smaller than 1.
        """
        # Local import keeps this method self-contained (standard library only).
        from concurrent.futures import ThreadPoolExecutor

        if num_threads < 1:
            raise ValueError("num_threads must be at least 1")

        total_rows = len(dataframe)

        # Determine the size of each chunk to balance workload and minimize thread contention.
        chunk_size = max(total_rows // (num_threads * 2), 1000)
        chunks = [
            dataframe.iloc[start:start + chunk_size]
            for start in range(0, total_rows, chunk_size)
        ]

        write_queue = queue.Queue()  # Queue for passing converted records to the writer thread

        # A shared progress bar to visually track the number of rows processed
        with tqdm(total=total_rows, desc="Inserting Rows", unit="row") as progress_bar:

            # A single writer thread manages batch writes to reduce contention
            writer = threading.Thread(target=self.writer_thread, args=(write_queue,))
            writer.start()

            try:
                # Each worker call processes one chunk and passes its results
                # to the write_queue.
                with ThreadPoolExecutor(max_workers=num_threads) as pool:
                    pending = [
                        pool.submit(self.worker_thread, chunk, write_queue, progress_bar)
                        for chunk in chunks
                    ]
                    # Wait for the workers to finish before continuing
                    for future in pending:
                        future.result()
            finally:
                # Notify writer_thread that all work is complete by sending
                # None. Done in `finally` so the writer is never left blocked
                # on an empty queue.
                write_queue.put(None)
                writer.join()

        print("Data inserted successfully using threads and batch writes.")

# Example usage
db_path = 'user_activities.db'

# Remove existing database. The handler switches the file to WAL mode, so an
# interrupted earlier run can leave '-wal' and '-shm' files next to it; they
# are removed as well so they are never paired with the freshly created file.
for stale_path in (db_path, f'{db_path}-wal', f'{db_path}-shm'):
    if os.path.exists(stale_path):
        os.remove(stale_path)

dataframes = pd.read_csv('dataframes.csv')
db = DatabaseHandler(db_path)
try:
    db.populate_concurrently(dataframes, num_threads=4)
finally:
    # Release the connections even if the import fails part-way.
    db.close()

print("Populated")


Inserting Rows: 100%|███████████████████████████████████████████████████████| 145261/145261 [00:22<00:00, 6332.29row/s]

Data inserted successfully using threads and batch writes.
Database connections closed.
Populated



