<a href="https://colab.research.google.com/github/cn23070/cn23070.github.io/blob/main/Review_Module4EoMAssignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

1. Import Required Libraries

In [None]:
import sqlite3  # For SQLite database operations
from hashlib import sha256  # For generating checksums and password hashing
from datetime import datetime  # For handling timestamps
from cryptography.fernet import Fernet  # For encryption and decryption
import os  # For secure key storage
import getpass  # For securely getting the password input

2. Define Classes

In [None]:
#UserRole Enum
from enum import Enum

class UserRole(Enum):
    ADMIN = "admin"
    REGULAR_USER = "regular_user"

3. User Class

In [None]:
class User:
    def __init__(self, user_id, username, password_hash, role):
        self.user_id = user_id
        self.username = username
        self.password_hash = password_hash
        self.role = role

    def set_password(self, password):
        """Hashes the password and stores it securely."""
        self.password_hash = sha256(password.encode()).hexdigest()

    def check_password(self, password):
        """Verifies the entered password against the stored hash."""
        return self.password_hash == sha256(password.encode()).hexdigest()

    def get_role(self):
        """Returns the role of the user."""
        return self.role

    def get_user_id(self):
        """Returns the user ID."""
        return self.user_id

    def get_username(self):
        """Returns the username."""
        return self.username

    def get_password_hash(self):
        """Returns the password hash."""
        return self.password_hash

    def upload_artefact(self, artefact, db_manager):
        """Uploads an artefact to the database."""
        db_manager.store_artefact(artefact)




4. Administrator Class

In [None]:
class Administrator(User):
    def create_artefact(self, artefact, db_manager):
        """Allows the admin to create and store an artefact."""
        db_manager.store_artefact(artefact)

    def delete_artefact(self, artefact_id, db_manager):
        """Allows the admin to delete an artefact."""
        db_manager.delete_artefact(artefact_id)

    def view_artefact(self, artefact_id, db_manager):
        """Allows the admin to view an artefact."""
        return db_manager.retrieve_artefact(artefact_id)


5. RegularUser Class

In [None]:
class RegularUser(User):
    def upload_artefact(self, artefact, db_manager):
        """Allows the user to upload their own artefact."""
        if artefact.owner_id == self.user_id:
            db_manager.store_artefact(artefact)

    def modify_artefact(self, artefact, db_manager):
        """Allows the user to modify their own artefact."""
        existing_artefact = db_manager.retrieve_artefact(artefact.artefact_id)
        if existing_artefact.owner_id == self.user_id:
            db_manager.update_artefact(artefact)

    def view_artefact(self, artefact_id, db_manager):
        """Allows the user to view an artefact."""
        artefact = db_manager.retrieve_artefact(artefact_id)
        if artefact.owner_id == self.user_id or self.role == UserRole.ADMIN:
            return artefact


6. Artifact Class

In [None]:
class Artefact:
    def __init__(self, artefact_id, title, content, owner_id):
        self.artefact_id = artefact_id
        self.title = title
        self.content = content
        self.checksum = self.generate_checksum(content)
        self.encrypted_content = None
        self.timestamps = self.create_timestamp()
        self.owner_id = owner_id

    def generate_checksum(self, data):
        """Generates a checksum to verify data integrity."""
        return sha256(data).hexdigest()

    def encrypt_content(self, encryption_manager):
        """Encrypts the content using the EncryptionManager."""
        self.encrypted_content = encryption_manager.encrypt(self.content)

    def decrypt_content(self, encryption_manager):
        """Decrypts the content using the EncryptionManager."""
        return encryption_manager.decrypt(self.encrypted_content)

    def create_timestamp(self):
        """Creates a timestamp for the artefact."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


7. EncryptionManager Class

In [None]:
class EncryptionManager:
    def __init__(self):
        """Initializes the encryption manager with a secure key."""
        self.key = Fernet.generate_key()
        self.cipher = Fernet(self.key)

    def encrypt(self, data):
        """Encrypts the provided data."""
        return self.cipher.encrypt(data)

    def decrypt(self, data):
        """Decrypts the provided data."""
        return self.cipher.decrypt(data)


8. ChecksumManager Class

In [None]:
class ChecksumManager:
    @staticmethod
    def generate_checksum(data):
        """Generates a checksum for the provided data."""
        return sha256(data).hexdigest()

    @staticmethod
    def verify_checksum(data, checksum):
        """Verifies the checksum of the provided data."""
        return sha256(data).hexdigest() == checksum


9. TimestampManager Class

In [None]:
class TimestampManager:
    @staticmethod
    def create_timestamp():
        """Creates a current timestamp."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def update_timestamp():
        """Updates the timestamp."""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


10. DatabaseManager Class (with SQLite Integration)

In [None]:
class DatabaseManager:
    def __init__(self, db_name="secure_enclave.db"):
        """Initializes the database manager and connects to the SQLite database."""
        self.db_name = db_name
        self.connect_to_db()

    def connect_to_db(self):
        """Connects to the SQLite database."""
        self.db_connection = sqlite3.connect(self.db_name)
        self.db_cursor = self.db_connection.cursor()
        self.create_tables()

    def create_tables(self):
        """Creates the necessary tables if they don't exist."""
        self.db_cursor.execute('''CREATE TABLE IF NOT EXISTS Users (
                                    user_id INTEGER PRIMARY KEY AUTOINCREMENT,
                                    username TEXT UNIQUE NOT NULL,
                                    password_hash TEXT NOT NULL,
                                    role TEXT NOT NULL)''')

        self.db_cursor.execute('''CREATE TABLE IF NOT EXISTS Artefacts (
                                    artefact_id INTEGER PRIMARY KEY AUTOINCREMENT,
                                    title TEXT NOT NULL,
                                    content BLOB NOT NULL,
                                    checksum TEXT NOT NULL,
                                    encrypted_content BLOB NOT NULL,
                                    timestamps TEXT NOT NULL,
                                    owner_id INTEGER NOT NULL,
                                    FOREIGN KEY(owner_id) REFERENCES Users(user_id))''')

        self.db_cursor.execute('''CREATE TABLE IF NOT EXISTS Logs (
                                    log_id INTEGER PRIMARY KEY AUTOINCREMENT,
                                    action TEXT NOT NULL,
                                    user_id INTEGER,
                                    artefact_id INTEGER,
                                    timestamp TEXT NOT NULL,
                                    FOREIGN KEY(user_id) REFERENCES Users(user_id),
                                    FOREIGN KEY(artefact_id) REFERENCES Artefacts(artefact_id))''')

        self.db_connection.commit()

    def store_artefact(self, artefact):
        """Stores an artefact in the database."""
        self.db_cursor.execute('''INSERT INTO Artefacts (title, content, checksum, encrypted_content, timestamps, owner_id)
                                  VALUES (?, ?, ?, ?, ?, ?)''',
                               (artefact.title, artefact.content, artefact.checksum,
                                artefact.encrypted_content, artefact.timestamps, artefact.owner_id))
        self.db_connection.commit()

    def retrieve_artefact(self, artefact_id):
        """Retrieves an artefact by its ID."""
        self.db_cursor.execute('SELECT * FROM Artefacts WHERE artefact_id = ?', (artefact_id,))
        row = self.db_cursor.fetchone()
        if row:
            return Artefact(row[0], row[1], row[2], row[6])
        return None

    def update_artefact(self, artefact):
        """Updates an existing artefact."""
        self.db_cursor.execute('''UPDATE Artefacts
                                  SET title = ?, content = ?, checksum = ?, encrypted_content = ?, timestamps = ?
                                  WHERE artefact_id = ?''',
                               (artefact.title, artefact.content, artefact.checksum,
                                artefact.encrypted_content, artefact.timestamps, artefact.artefact_id))
        self.db_connection.commit()

    def delete_artefact(self, artefact_id):
        """Deletes an artefact by its ID."""
        self.db_cursor.execute('DELETE FROM Artefacts WHERE artefact_id = ?', (artefact_id,))
        self.db_connection.commit()

    def get_user(self, user_id):
        """Retrieves user details by user ID."""
        self.db_cursor.execute('SELECT * FROM Users WHERE user_id = ?', (user_id,))
        row = self.db_cursor.fetchone()
        if row:
            return User(row[0], row[1], row[2], UserRole(row[3]))
        return None


11. Logger Class

In [None]:
class Logger:
    def __init__(self, db_manager):
        """Initializes the logger with a reference to the database manager."""
        self.db_manager = db_manager

    def log_event(self, event, user_id=None, artefact_id=None):
        """Logs a system event."""
        timestamp = TimestampManager.create_timestamp()
        self.db_manager.db_cursor.execute('''INSERT INTO Logs (action, user_id, artefact_id, timestamp)
                                             VALUES (?, ?, ?, ?)''',
                                          (event, user_id, artefact_id, timestamp))
        self.db_manager.db_connection.commit()

    def log_error(self, error, user_id=None):
        """Logs an error message."""
        timestamp = TimestampManager.create_timestamp()
        self.db_manager.db_cursor.execute('''INSERT INTO Logs (action, user_id, timestamp)
                                             VALUES (?, ?, ?)''',
                                          (f"ERROR: {error}", user_id, timestamp))
        self.db_manager.db_connection.commit()


12. Example Use

12.1 Initialise the database and encryption manager

In [None]:
# Initialize the database manager and encryption manager
db_manager = DatabaseManager()
encryption_manager = EncryptionManager()

12.2 Create admin user

In [None]:
admin = Administrator(user_id=1, username="admin", password_hash="", role=UserRole.ADMIN)
admin.set_password("adminpassword")

# Check if username already exists
db_manager.db_cursor.execute("SELECT 1 FROM Users WHERE username = ?", (admin.username,))
existing_user = db_manager.db_cursor.fetchone()

if not existing_user:  # If username doesn't exist
    db_manager.db_cursor.execute('''INSERT INTO Users (username, password_hash, role) VALUES (?, ?, ?)''',
                                (admin.username, admin.password_hash, admin.role.value))
    db_manager.db_connection.commit()
    print("Admin user created successfully.")
else:
    print("A user with this username already exists.")

A user with this username already exists.


Create regular user

In [None]:
user = User(user_id=2, username="craig", password_hash="", role=UserRole.REGULAR_USER)
user.set_password("craigpassword")

# Check if username already exists
db_manager.db_cursor.execute("SELECT 1 FROM Users WHERE username = ?", (user.username,))
existing_user = db_manager.db_cursor.fetchone()

if not existing_user:  # If username doesn't exist
    db_manager.db_cursor.execute('''INSERT INTO Users (username, password_hash, role) VALUES (?, ?, ?)''',
                                (user.username, user.password_hash, user.role.value))
    db_manager.db_connection.commit()
    print("User Craig created successfully.")
else:
    print("A user with this username already exists.")

A user with this username already exists.


12.3 Login

In [None]:
def login(username, password, db_manager):
    """Log in a user by verifying username and password."""
    db_manager.db_cursor.execute('SELECT * FROM Users WHERE username = ?', (username,))
    user_data = db_manager.db_cursor.fetchone()

    if user_data:
        user_id, username, password_hash, role = user_data
        # Verify password
        if sha256(password.encode()).hexdigest() == password_hash:
            if role == UserRole.ADMIN.value:
                print(f"Admin {username} logged in successfully.")
                return Administrator(user_id, username, password_hash, UserRole.ADMIN)
            else:
                print(f"User {username} logged in successfully.")
                return RegularUser(user_id, username, password_hash, UserRole.REGULAR_USER)
        else:
            print("Incorrect password.")
    else:
        print("Username not found.")

    return None

# Example usage

# Initialize the database manager
db_manager = DatabaseManager()

# Get user input for username and password
input_username = input("Enter your username: ")
input_password = getpass.getpass("Enter your password: ")  # This hides the password input for security

# Attempt to log in
logged_in_user = login(input_username, input_password, db_manager)

# Check if login was successful
if logged_in_user:
    print(f"Welcome, {logged_in_user.username}.")
else:
    print("Login failed.")

Enter your username: craig
Enter your password: ··········
User craig logged in successfully.
Welcome, craig.


12.3 Show user details

In [None]:
def show_user_details(username, db_manager):
    """Retrieve and display user details from the Users table."""
    db_manager.db_cursor.execute('SELECT * FROM Users WHERE username = ?', (username,))
    user_details = db_manager.db_cursor.fetchone()

    if user_details:
        user_id, username, password_hash, role = user_details
        print(f"User ID: {user_id}")
        print(f"Username: {username}")
        print(f"Password Hash: {password_hash}")
        print(f"Role: {role}")
    else:
        print("User not found.")

# Example usage
db_manager = DatabaseManager()

# Show details of the admin user
show_user_details("craig", db_manager)


User ID: 2
Username: craig
Password Hash: da05bcf4b13d5d7ed40875479b69bf223355b62a056210fc6a6c57272437d3d8
Role: regular_user


12.4 Show Artifacts in database

In [None]:
def show_all_artefacts(db_manager):
    """Retrieve and display all artefacts from the Artefacts table."""
    db_manager.db_cursor.execute('SELECT * FROM Artefacts')
    artefacts = db_manager.db_cursor.fetchall()

    if artefacts:
        for artefact in artefacts:
            artefact_id, title, content, checksum, encrypted_content, timestamps, owner_id = artefact
            print(f"Artefact ID: {artefact_id}")
            print(f"Title: {title}")
            print(f"Checksum: {checksum}")
            print(f"Timestamps: {timestamps}")
            print(f"Owner ID: {owner_id}")
            print("-" * 40)
    else:
        print("No artefacts found.")

# Show details of all artefacts
show_all_artefacts(db_manager)

No artefacts found.


User uploads artifact

12. Example Use

In [None]:
# User uploads an artefact
artefact_content = b"Swift"
artefact = Artefact(artefact_id=None, title="Taylor Swift Song", content=artefact_content, owner_id=user.user_id)
artefact.encrypt_content(encryption_manager)
user.upload_artefact(artefact, db_manager)


show_all_artefacts(db_manager)

Artefact ID: 1
Title: Taylor Swift Song
Checksum: ae8ed2743925477eafad0dfdb0d0832702634da4dc6ec1aefa4f3a61953e93f0
Timestamps: 2024-08-23 11:04:43
Owner ID: 2
----------------------------------------
