# DB Connector Showcase

This Jupyter notebook demonstrates some of the functionality of the **Database** class, which automatically connects to the remote server for the word_editor project.

Import the Database class from the db_connector file.

***Make sure that you have installed the following Python packages with pip before using the connector:***
- pymysql
- cryptography
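
Before importing the connector, it can help to confirm that both packages are importable in the current kernel. The snippet below is a minimal sketch using only the standard library; the helper name `missing_packages` is hypothetical and not part of db_connector.

```python
from importlib.util import find_spec

# Package names taken from the list above.
REQUIRED_PACKAGES = ["pymysql", "cryptography"]


def missing_packages(names):
    """Return the names that cannot be found in the current environment."""
    return [name for name in names if find_spec(name) is None]


missing = missing_packages(REQUIRED_PACKAGES)
if missing:
    print("Missing packages, install with: pip install " + " ".join(missing))
else:
    print("All required packages are installed.")
```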

In [1]:
from db_connector import Database

{'id': 29, 'username': 'smartin10', 'text': 'They be on the way', 'correction': 'They are on the way', 'tokens_used': 1, 'created_at': datetime.datetime(2025, 5, 12, 17, 0)}


The basic pattern for using the DB connector is shown below: open a connection, run your database calls, then close it.

In [2]:
# Instantiate an instance of the Database connector
db = Database()

# PERFORM DATABASE FUNCTIONS HERE

# Close the connection
db.close()
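
If a database call raises an exception between `Database()` and `db.close()`, the connection stays open. One way to guard against that, sketched here under assumptions, is to wrap the object in `contextlib.closing`, which calls `close()` on exit even when an error occurs. `FakeDatabase` is a hypothetical stand-in so the example runs without the remote server; the only thing assumed about the real `Database` class is that it has a `close()` method, as shown above.

```python
from contextlib import closing


class FakeDatabase:
    """Hypothetical stand-in for Database so this sketch runs offline."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


# closing() calls db.close() when the block exits, even on an exception.
with closing(FakeDatabase()) as db:
    # PERFORM DATABASE FUNCTIONS HERE
    pass

print(db.closed)  # True
```

With the real connector, the same pattern would read `with closing(Database()) as db:`.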

## Testing Users Table Functionality

In [3]:
# Instantiate the Database
db = Database()

# Test username and password
test_username = "test_user_123"
test_password = "secure_pass"

# 1. Check that the user doesn't exist
print("User exists (should be False):", db.username_exists(test_username))

# 2. Create the user
user_created = db.register_user(test_username, test_password)
print("User creation successful (should be True):", user_created)

# 3. Check that the user now exists
print("User exists (should be True):", db.username_exists(test_username))

# 4. Check if password is correct
print("Password correct (should be True):", db.check_password(test_username, test_password))

# 5. Check initial user level
print("Initial user type (should be 'free'):", db.get_user_type(test_username))

# 6. Check initial token count
print("Initial user tokens (should be 0):", db.get_tokens(test_username))

# 7. Change user level to 'paid'
db.alter_user_type(test_username, 'paid')

# 8. Check updated user level
print("Updated user type (should be 'paid'):", db.get_user_type(test_username))

# 9. Add 10 tokens
db.alter_tokens(test_username, 10)

# 10. Check token count (should be 10)
print("Updated user tokens (should be 10):", db.get_tokens(test_username))

# 11. Subtract 5 tokens
db.alter_tokens(test_username, -5)

# 12. Check token count again (should be 5)
print("Final user tokens (should be 5):", db.get_tokens(test_username))

# 13. Delete the user
user_deleted = db.delete_user(test_username)
print("User deletion successful (should be True):", user_deleted)

# 14. Confirm deletion
print("User exists after deletion (should be False):", db.username_exists(test_username))

# Close the database connection
db.close()


User exists (should be False): False
User creation successful (should be True): True
User exists (should be True): True
Password correct (should be True): True
Initial user type (should be 'free'): free
Initial user tokens (should be 0): 0
Updated user type (should be 'paid'): paid
Updated user tokens (should be 10): 10
Final user tokens (should be 5): 5
User deletion successful (should be True): True
User exists after deletion (should be False): False
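
The cell above relies on reading each printed value against its "should be" label. A small helper can make the same checks fail loudly instead. This is only a sketch: `check` is a hypothetical name, and the values passed to it are copied from the output above rather than fetched from the database.

```python
def check(label, actual, expected):
    """Raise AssertionError with a readable message if the values differ."""
    assert actual == expected, f"{label}: expected {expected!r}, got {actual!r}"
    print(f"OK: {label}")


# Literal values from the output above stand in for live database calls.
check("User exists before registration", False, False)
check("Initial user type", "free", "free")
check("Final user tokens", 5, 5)
```

In the notebook, the second argument would be the connector call itself, for example `db.get_tokens(test_username)`.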


## Testing Blacklist Table Functionality

In [4]:
# Instantiate the Database
db = Database()

# Test word
test_word = "inappropriate_word_123"

# 1. Check that the word doesn't exist in the blacklist
print("Blacklist word exists (should be False):", db.blacklist_word_exists(test_word))

# 2. Add the word to the blacklist
word_added = db.add_blacklist_word(test_word)
print("Word added to blacklist (should be True):", word_added)

# 3. Check that the word now exists
print("Blacklist word exists (should be True):", db.blacklist_word_exists(test_word))

# 4. Check if the word is in the list of unreviewed blacklist words
unreviewed_words = db.get_unreviewed_blacklist_words()
word_in_unreviewed = any(word['word'] == test_word for word in unreviewed_words)
print("Word is in unreviewed blacklist words (should be True):", word_in_unreviewed)

# 5. Mark the word as reviewed
db.blacklist_alter_reviewed(test_word, 1)

# 6. Verify the word is no longer in unreviewed blacklist words
unreviewed_words_updated = db.get_unreviewed_blacklist_words()
word_still_unreviewed = any(word['word'] == test_word for word in unreviewed_words_updated)
print("Word is in unreviewed blacklist words after review (should be False):", word_still_unreviewed)

# 7. Delete the word from the blacklist
word_deleted = db.delete_blacklist_word(test_word)
print("Word deletion successful (should be True):", word_deleted)

# 8. Confirm the word no longer exists
print("Blacklist word exists after deletion (should be False):", db.blacklist_word_exists(test_word))

# Close the database connection
db.close()


Blacklist word exists (should be False): False
Word added to blacklist (should be True): True
Blacklist word exists (should be True): True
Word is in unreviewed blacklist words (should be True): True
Word is in unreviewed blacklist words after review (should be False): False
Word deletion successful (should be True): True
Blacklist word exists after deletion (should be False): False


## Testing Complaint Table Functionality

In [5]:
# Instantiate the Database
db = Database()

# Test users
reporter = "complaint_tester_1"
reportee = "complaint_tester_2"
complaint_text = "This user submitted inappropriate edits."
dispute_text = "That is not true, my edits were valid."

# Ensure both users exist
if not db.username_exists(reporter):
    db.register_user(reporter, "test123")
if not db.username_exists(reportee):
    db.register_user(reportee, "test123")

# 1. Add a complaint
complaint_added = db.add_complaint(reporter, reportee, complaint_text)
print("Complaint successfully added (should be True):", complaint_added)

# 2. Retrieve undisputed complaints against reportee
undisputed_complaints = db.get_undisputed_complaints(reportee)
print(f"Undisputed complaints found for {reportee} (should be >= 1):", len(undisputed_complaints))

# Store ID of the first undisputed complaint (assumes it is the one we just added)
complaint_id = undisputed_complaints[0]['id']
print("Complaint ID:", complaint_id)

# 3. Verify complaint exists
print("Complaint exists (should be True):", db.complaint_exists(complaint_id))

# 4. Add a dispute to the complaint
dispute_added = db.alter_dispute(complaint_id, reportee, dispute_text)
print("Dispute successfully added (should be True):", dispute_added)

# 5. Mark the complaint as reviewed
reviewed = db.complaints_alter_reviewed(complaint_id, 1)
print("Complaint marked as reviewed (should be True):", reviewed)

# 6. Check for unreviewed complaints (should not include the above)
unreviewed_complaints = db.get_unreviewed_complaints()
is_still_unreviewed = any(c['id'] == complaint_id for c in unreviewed_complaints)
print("Complaint still in unreviewed list (should be False):", is_still_unreviewed)

# 7. Delete the complaint
deleted = db.delete_complaint(complaint_id)
print("Complaint deletion successful (should be True):", deleted)

# 8. Confirm deletion
print("Complaint exists after deletion (should be False):", db.complaint_exists(complaint_id))

# Clean up test users
db.delete_user(reporter)
db.delete_user(reportee)

# Close the database connection
db.close()


Complaint successfully added (should be True): True
Undisputed complaints found for complaint_tester_2 (should be >= 1): 1
Complaint ID: 140
Complaint exists (should be True): True
Dispute successfully added (should be True): True
Complaint marked as reviewed (should be True): True
Complaint still in unreviewed list (should be False): False
Complaint deletion successful (should be True): True
Complaint exists after deletion (should be False): False


## Testing Rejected Corrections Table Functionality

In [6]:
# Instantiate the Database
db = Database()

# Test user and correction data
username = "correction_tester"
correction_text = "They replaced colour with color."
reason_text = "Spelling doesn't match UK English style."

# Ensure user exists
if not db.username_exists(username):
    db.register_user(username, "test123")

# 1. Add a rejected correction
correction_added = db.add_correction(username, correction_text, reason_text)
print("Correction added (should be True):", correction_added)

# 2. Retrieve unreviewed corrections
unreviewed = db.get_unreviewed_corrections()
print("Unreviewed corrections found (should be >= 1):", len(unreviewed))

# Take the ID of the first unreviewed correction (assumes the one just added is returned first)
correction_id = unreviewed[0]['id']
print("Correction ID:", correction_id)

# 3. Check if the correction exists
print("Correction exists (should be True):", db.check_correction_exists(correction_id))

# 4. Mark correction as reviewed
reviewed = db.corrections_alter_reviewed(correction_id, 1)
print("Correction marked as reviewed (should be True):", reviewed)

# 5. Check that it no longer appears in unreviewed list
still_unreviewed = any(c['id'] == correction_id for c in db.get_unreviewed_corrections())
print("Correction still in unreviewed list (should be False):", still_unreviewed)

# 6. Delete the correction
deleted = db.delete_correction(correction_id)
print("Correction deletion successful (should be True):", deleted)

# 7. Confirm deletion
print("Correction exists after deletion (should be False):", db.check_correction_exists(correction_id))

# Clean up user
db.delete_user(username)

# Close the database connection
db.close()


Correction added (should be True): True
Unreviewed corrections found (should be >= 1): 18
Correction ID: 22
Correction exists (should be True): True
Correction marked as reviewed (should be True): True
Correction still in unreviewed list (should be False): False
Correction deletion successful (should be True): True
Correction exists after deletion (should be False): False
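
The cell above takes the first element of the unreviewed list as the correction it just inserted, yet the output shows 18 unreviewed rows, so the first one may belong to someone else. A safer way to pick the newest row, assuming `id` is an auto-incrementing column and no other client inserts at the same moment, is to select the largest `id`. The helper name and sample rows below are made up for illustration.

```python
def newest_record(records):
    """Return the record with the largest 'id', or None for an empty list."""
    return max(records, key=lambda r: r["id"], default=None)


# Made-up rows shaped like the dictionaries the connector appears to return.
sample = [{"id": 22}, {"id": 41}, {"id": 37}]
print(newest_record(sample)["id"])  # 41
```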


## Testing Approved Words Table Functionality

In [7]:
# Instantiate the Database
db = Database()

# Test data
username = "approved_word_tester"
word = "technobabble"

# Ensure the user exists
if not db.username_exists(username):
    db.register_user(username, "test123")

# 1. Add the approved word
word_added = db.add_approved_word(username, word)
print("Approved word added (should be True):", word_added)

# 2. Retrieve all approved words for the user
user_words = db.get_user_words(username)
print("Approved words for user:", user_words)

# 3. Get the ID of the word we just added (assuming most recent)
with db.connection.cursor() as cursor:
    cursor.execute("SELECT id FROM approved_words WHERE username = %s AND word = %s ORDER BY id DESC LIMIT 1", (username, word))
    result = cursor.fetchone()
    word_id = result['id'] if result else None

if word_id is None:
    print("❌ Error: Word ID not found.")
else:
    # 4. Check if word exists by ID
    exists = db.check_approved_word_exists(word_id)
    print("Approved word exists by ID (should be True):", exists)

    # 5. Delete the word
    deleted = db.delete_approved_word(word_id)
    print("Approved word deletion successful (should be True):", deleted)

    # 6. Confirm deletion
    still_exists = db.check_approved_word_exists(word_id)
    print("Approved word exists after deletion (should be False):", still_exists)

# Clean up test user
db.delete_user(username)

# Close connection
db.close()


Approved word added (should be True): True
Approved words for user: [{'word': 'technobabble'}]
Approved word exists by ID (should be True): True
Approved word deletion successful (should be True): True
Approved word exists after deletion (should be False): False
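
Step 3 above looks up the new row's ID with a parameterized `ORDER BY id DESC LIMIT 1` query. The sketch below reproduces that lookup against an in-memory SQLite database so it runs without the remote server. The table layout is an assumption based only on the column names in the query, and SQLite uses `?` placeholders where pymysql uses `%s`. It also shows `cursor.lastrowid`, an optional DB-API attribute, as an alternative way to get the ID straight after the insert.

```python
import sqlite3

# In-memory database as a stand-in for the remote MySQL server.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows can be indexed by column name

# Assumed schema: only the columns named in the notebook's query.
conn.execute(
    "CREATE TABLE approved_words ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT, word TEXT)"
)

params = ("approved_word_tester", "technobabble")
cur = conn.execute(
    "INSERT INTO approved_words (username, word) VALUES (?, ?)", params
)
inserted_id = cur.lastrowid  # ID reported by the insert itself

# Same lookup as in the cell above, with SQLite placeholders.
row = conn.execute(
    "SELECT id FROM approved_words WHERE username = ? AND word = ? "
    "ORDER BY id DESC LIMIT 1",
    params,
).fetchone()
word_id = row["id"] if row else None

print(word_id == inserted_id)  # True
conn.close()
```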


## Testing User History Table Functionality

In [9]:
# Regression Test: user_history table

db = Database()

# Test data
username = "jdoe1"
text = "Original text"
correction = "Corrected text"
tokens_used = 5

# Add user history
assert db.add_user_history(username, text, correction, tokens_used) == True, "Failed to add user history"

# Fetch all histories for the user
histories = db.get_user_history_by_user(username)
assert any(h['text'] == text and h['correction'] == correction for h in histories), "Inserted history not found"

# Get last inserted history ID (assuming auto-increment ID is used)
last_history = histories[-1]
history_id = last_history['id']

# Fetch by ID
fetched = db.get_user_history_by_id(history_id)
assert fetched['text'] == text, "Mismatch in fetched text"
assert fetched['correction'] == correction, "Mismatch in fetched correction"

# Update history
updated_text = "Updated text"
updated_correction = "Updated correction"
updated_tokens = 10
assert db.update_history_content(history_id, updated_text, updated_correction, updated_tokens) == True, "Failed to update history"

# Confirm update
updated = db.get_user_history_by_id(history_id)
assert updated['text'] == updated_text, "Text not updated correctly"
assert updated['correction'] == updated_correction, "Correction not updated correctly"
assert updated['tokens_used'] == updated_tokens, "Tokens not updated correctly"

# Delete history
assert db.delete_user_history(history_id) == True, "Failed to delete history"
assert db.get_user_history_by_id(history_id) is None, "Deleted history still accessible"

# Close the database connection
db.close()

print("✅ user_history regression test passed.")


✅ user_history regression test passed.


## Testing Collaborators Table Functionality

In [None]:
# Regression Test: collaborators table

db = Database()

# Test data
inviter = "test_inviter"
collaborator = "test_collaborator"
text = "Shared text"
correction = "Shared correction"
received = False

# Add collaboration
db.add_collaboration(inviter, collaborator, text, correction, received)

# Retrieve all to get last ID
collabs = db.query("SELECT * FROM collaborators WHERE inviter = %s ORDER BY id DESC", (inviter,))
assert collabs, "No collaboration record found after insert"
collab_id = collabs[0]['id']

# Fetch by ID
fetched = db.get_collaboration_by_id(collab_id)
assert fetched['text'] == text, "Text mismatch"
assert fetched['correction'] == correction, "Correction mismatch"
assert fetched['received'] == int(received), "Received status mismatch"

# Update collaboration
new_text = "New shared text"
new_correction = "New shared correction"
new_received = True
assert db.update_collaboration(collab_id, new_text, new_correction, new_received) == True, "Failed to update collaboration"

# Confirm update
updated = db.get_collaboration_by_id(collab_id)
assert updated['text'] == new_text, "Text not updated correctly"
assert updated['correction'] == new_correction, "Correction not updated correctly"
assert updated['received'] == int(new_received), "Received not updated correctly"

# Delete collaboration
assert db.delete_collaboration(collab_id) == True, "Failed to delete collaboration"
assert db.get_collaboration_by_id(collab_id) is None, "Deleted collaboration still accessible"

# Close the database connection
db.close()

print("✅ collaborators regression test passed.")


✅ collaborators regression test passed.
