In [2]:
import argparse
import csv
import pandas as pd
from pymongo import MongoClient

def parse_arguments():
    parser = argparse.ArgumentParser(description="Script to insert data from a CSV file into a MongoDB collection")
    parser.add_argument("--mongo_uri", default="mongodb://localhost:27017/", help="MongoDB URI (default: mongodb://localhost:27017/)")
    parser.add_argument("--database", default="your_database", help="Name of the MongoDB database (default: your_database)")
    parser.add_argument("--collection", help="Name of the MongoDB collection")
    parser.add_argument("--insert", help="Path to the CSV file to insert data from")
    return parser.parse_args()
def read_csv(csv_file):
    data = []
    with open(csv_file, 'r') as file:
        reader = csv.DictReader(file)
        for row in reader:
            data.append(row)
    return data

def insert_data_mongodb(data, mongo_uri, database, collection):
    client = MongoClient(mongo_uri)
    db = client[database]
    col = db[collection]
    col.insert_many(data)
    client.close()

def main():
    args = parse_arguments()
    mongo_uri = args.mongo_uri
    database = args.database
    collection = args.collection
    csv_file = args.insert

    if not collection:
        print("Error: Please provide the name of the collection.")
        return

    if not csv_file:
        print("Error: Please provide the path to the CSV file.")
        return

    data = read_csv(csv_file)
    insert_data_mongodb(data, mongo_uri, database, collection)
    print("Data inserted into MongoDB collection successfully.")

if __name__ == "__main__":
    main()

usage: ipykernel_launcher.py [-h] [--mongo_uri MONGO_URI] [--database DATABASE] [--collection COLLECTION]
                             [--insert INSERT]
ipykernel_launcher.py: error: unrecognized arguments: -f C:\Users\stuff\AppData\Roaming\jupyter\runtime\kernel-7dc33b0a-7125-42ae-8585-d62292faabaa.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [3]:
import argparse
import csv
import pandas as pd
from pymongo import MongoClient

def parse_arguments():
    parser = argparse.ArgumentParser(description="Script to insert data from a CSV/Excel file into a MongoDB collection and create reports")
    parser.add_argument("--mongo_uri", default="mongodb://localhost:27017/", help="MongoDB URI (default: mongodb://localhost:27017/)")
    parser.add_argument("--database", default="your_database", help="Name of the MongoDB database (default: your_database)")
    parser.add_argument("--collection", help="Name of the MongoDB collection")
    parser.add_argument("--insert", help="Path to the CSV/Excel file to insert data from")
    parser.add_argument("--create_report", action="store_true", help="Flag to indicate creating reports")
    return parser.parse_args()

def read_data(file_path):
    if file_path.endswith('.csv'):
        return pd.read_csv(file_path, encoding='utf-8')
    elif file_path.endswith('.xlsx'):
        return pd.read_excel(file_path)
    else:
        raise ValueError("Unsupported file format. Please provide a CSV or Excel file.")


def insert_data_mongodb(data, mongo_uri, database, collection):
    client = MongoClient(mongo_uri)
    db = client[database]
    col = db[collection]
    col.insert_many(data.to_dict('records'))
    client.close()

def create_report(data):
    # Example: Creating a report based on the provided data
    # This can be customized based on specific requirements
    report = data.groupby('employee')['score'].mean()
    return report

def main():
    args = parse_arguments()
    mongo_uri = args.mongo_uri
    database = args.database
    collection = args.collection
    file_path = args.insert
    create_report_flag = args.create_report

    if not collection:
        print("Error: Please provide the name of the collection.")
        return

    if not file_path:
        print("Error: Please provide the path to the CSV/Excel file.")
        return

    data = read_data(file_path)
    insert_data_mongodb(data, mongo_uri, database, collection)
    print("Data inserted into MongoDB collection successfully.")

    if create_report_flag:
        report = create_report(data)
        print("Report:")
        print(report)

if __name__ == "__main__":
    main()


usage: ipykernel_launcher.py [-h] [--mongo_uri MONGO_URI] [--database DATABASE] [--collection COLLECTION]
                             [--insert INSERT] [--create_report]
ipykernel_launcher.py: error: unrecognized arguments: -f C:\Users\stuff\AppData\Roaming\jupyter\runtime\kernel-7dc33b0a-7125-42ae-8585-d62292faabaa.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [9]:
import argparse
import os
import pandas as pd
from pymongo import MongoClient

def parse_arguments():
    parser = argparse.ArgumentParser(description="Script to insert data from an Excel file into a MongoDB collection and create reports")
    parser.add_argument("--mongo_uri", default="mongodb://localhost:27017/", help="MongoDB URI (default: mongodb://localhost:27017/)")
    parser.add_argument("--database", default="your_database", help="Name of the MongoDB database (default: your_database)")
    parser.add_argument("--collection", help="Name of the MongoDB collection")
    parser.add_argument("--excel_file", help="Path to the Excel file to insert data from")
    parser.add_argument("--create_report", action="store_true", help="Flag to indicate creating reports")
    return parser.parse_args()

def read_excel_file(file_path):
    return pd.read_excel(file_path)

def insert_data_mongodb(data, mongo_uri, database, collection):
    client = MongoClient(mongo_uri)
    db = client[database]
    col = db[collection]
    col.insert_many(data.to_dict('records'))
    client.close()

def create_report(data):
    # Example: Creating a report based on the provided data
    # This can be customized based on specific requirements
    report = data.groupby('employee')['score'].mean()
    return report

def main():
    args = parse_arguments()
    mongo_uri = args.mongo_uri
    database = args.database
    collection = args.collection
    excel_file = args.excel_file
    create_report_flag = args.create_report

    if not collection:
        print("Error: Please provide the name of the collection.")
        return

    if not excel_file:
        print("Error: Please provide the path to the Excel file.")
        return

    if not os.path.isfile(excel_file):
        print("Error: The specified Excel file does not exist.")
        return

    data = read_excel_file(excel_file)
    insert_data_mongodb(data, mongo_uri, database, collection)
    print("Data inserted into MongoDB collection successfully.")

    if create_report_flag:
        report = create_report(data)
        print("Report:")
        print(report)

if __name__ == "__main__":
    main()


usage: ipykernel_launcher.py [-h] [--mongo_uri MONGO_URI] [--database DATABASE] [--collection COLLECTION]
                             [--excel_file EXCEL_FILE] [--create_report]
ipykernel_launcher.py: error: unrecognized arguments: -f C:\Users\stuff\AppData\Roaming\jupyter\runtime\kernel-7dc33b0a-7125-42ae-8585-d62292faabaa.json


SystemExit: 2

In [8]:
import pandas as pd
from pymongo import MongoClient

def clean_collections(collection1, collection2):
    unique_keys = set()
    bad_data_ids = set()

    # Retrieve data from collection1 and populate the hash set
    for document in collection1.find():
        unique_keys.add(document["_id"])

        # Validate and filter data
        if is_bad_data(document):
            bad_data_ids.add(document["_id"])

    # Retrieve data from collection2 and populate the hash set
    for document in collection2.find():
        unique_keys.add(document["_id"])

        # Validate and filter data
        if is_bad_data(document):
            bad_data_ids.add(document["_id"])

    # Deduplicate data and remove bad data
    for document in collection1.find():
        if document["_id"] in unique_keys:
            unique_keys.remove(document["_id"])
        else:
            # If the document is a duplicate or bad data, remove it from collection1
            collection1.delete_one({"_id": document["_id"]})

    for document in collection2.find():
        if document["_id"] in unique_keys:
            unique_keys.remove(document["_id"])
        else:
            # If the document is a duplicate or bad data, remove it from collection2
            collection2.delete_one({"_id": document["_id"]})

    # Remove bad data documents
    for bad_id in bad_data_ids:
        collection1.delete_one({"_id": bad_id})
        collection2.delete_one({"_id": bad_id})

# Function to validate and filter data (example implementation)
def is_bad_data(document):
    # Define a list of required fields
    required_fields = ["Test #", "Build #", "Category", "Test Case", "Expected Result", "Actual Result", "Repeatable?", "Blocker?", "Test Owner"]

    # Check if any required field is missing
    for field in required_fields:
        if field not in document:
            return True  # Document is bad if any required field is missing
    return False  # Document is not bad if all required fields are present


def main():
    # Connect to MongoDB
    client = MongoClient("mongodb://localhost:27017/")
    db = client["your_database"]
    collection1 = db["Collection1"]
    collection2 = db["Collection2"]
    
    clean_collections(collection1, collection2)
    # Read the Excel file
    excel_file = "EG4-DBDump.xlsx"
    data = pd.read_excel(excel_file)

    # Insert data into MongoDB collection
    collection2.insert_many(data.to_dict('records'))
    print("Data inserted into MongoDB collection successfully.")

    # Close MongoDB connection
    client.close()

if __name__ == "__main__":
    main()


Data inserted into MongoDB collection successfully.


In [9]:
import pandas as pd
from pymongo import MongoClient

def clean_collections(collection1, collection2):
    bad_data_ids = set()

    # Clean collection1
    for document in collection1.find():
        if is_bad_data(document):
            bad_data_ids.add(document["_id"])

    # Clean collection2
    for document in collection2.find():
        if is_bad_data(document):
            bad_data_ids.add(document["_id"])

    # Remove bad data documents
    for bad_id in bad_data_ids:
        collection1.delete_one({"_id": bad_id})
        collection2.delete_one({"_id": bad_id})

# Function to validate and filter data (example implementation)
def is_bad_data(document):
    # Define a list of required fields
    required_fields = ["Test #", "Build #", "Category", "Test Case", "Expected Result", "Actual Result", "Repeatable?", "Blocker?", "Test Owner"]

    # Check if any required field is missing
    for field in required_fields:
        if field not in document:
            return True  # Document is bad if any required field is missing
    return False  # Document is not bad if all required fields are present


def main():
    # Connect to MongoDB
    client = MongoClient("mongodb://localhost:27017/")
    db = client["your_database"]
    collection1 = db["Collection1"]  # Adjust collection names as needed
    collection2 = db["Collection2"]

    # Clean collections
    clean_collections(collection1, collection2)

    # Read the Excel file and dropna to remove empty rows
    excel_file = "EG4-DBDump.xlsx"
    data = pd.read_excel(excel_file).dropna()

    # Insert data into MongoDB collection
    collection2.insert_many(data.to_dict('records'))
    print("Data inserted into MongoDB collection successfully.")

    # Close MongoDB connection
    client.close()

if __name__ == "__main__":
    main()


Data inserted into MongoDB collection successfully.


In [51]:
from datetime import datetime
import pandas as pd
from pymongo import MongoClient

def clean_collections(collection1, collection2):
    bad_data_ids = set()

    # Clean collection1
    for document in collection1.find():
        if is_bad_data(document):
            bad_data_ids.add(document["_id"])

    # Clean collection2
    for document in collection2.find():
        if is_bad_data(document):
            bad_data_ids.add(document["_id"])

    # Remove bad data documents
    for bad_id in bad_data_ids:
        collection1.delete_one({"_id": bad_id})
        collection2.delete_one({"_id": bad_id})

# Function to validate and filter data (example implementation)
def is_bad_data(document):
    # Define a list of required fields
    required_fields = ["Test #", "Build #", "Category", "Test Case", "Expected Result", "Actual Result", "Repeatable?", "Blocker?", "Test Owner"]

    # Check if any required field is missing
    for field in required_fields:
        if field not in document:
            return True  # Document is bad if any required field is missing
    return False  # Document is not bad if all required fields are present

def list_entries_by_user(db, user_id):
    collection1 = db["Collection1"]
    collection2 = db["Collection2"]

    # Set to store unique entries
    unique_entries = set()

    # List entries for the specific user from both collections
    print(f"Unique entries for user {user_id}:")
    for collection_name, collection in [("Collection1", collection1), ("Collection2", collection2)]:
        print(f"Entries from {collection_name}:")
        for document in collection.find({"Test Owner": user_id}):
            entry_str = ", ".join([f"{key}: {value}" for key, value in document.items()])
            if entry_str not in unique_entries:
                unique_entries.add(entry_str)
                print(entry_str)
                print("-------------------------")  # Separator between documents

    # Count the number of unique entries
    count = len(unique_entries)
    
    return count



def count_blocker_and_repeater_bugs(db):
    collection1 = db["Collection1"]
    collection2 = db["Collection2"]

    # Count the number of blocker and repeater bugs
    blocker_count = 0
    repeater_count = 0
    for collection in [collection1, collection2]:
        for document in collection.find({}, {"_id": 0, "Blocker?": 1, "Repeatable?": 1}):
            if document.get("Blocker?") == "Yes":
                blocker_count += 1
            if document.get("Repeatable?") == "Yes":
                repeater_count += 1

    return blocker_count, repeater_count

def find_reports_on_build(db, build_date):
    collection1 = db["Collection1"]
    collection2 = db["Collection2"]

    # Convert input date to a datetime object
    formatted_build_date = datetime.strptime(build_date, "%m/%d/%Y").date()

    # Print formatted build date
    print("Formatted build date:", formatted_build_date)

    # Retrieve all reports on the specified build date from both collections (no duplicates)
    print(f"Entries on build date {build_date}:")
    for collection_name, collection in [("Collection1", collection1), ("Collection2", collection2)]:
        print(f"Entries from {collection_name}:")
        for document in collection.find():
            # Check if the "Build #" field is a datetime object and compare the date part
            if isinstance(document.get("Build #"), datetime) and document.get("Build #").date() == formatted_build_date:
                # Print the document
                print(document)
                print("-------------------------")  # Separator between documents

def get_first_middle_last_test_cases(db):
    collection2 = db["Collection2"]

    # Retrieve all test cases from collection 2
    test_cases = []
    for document in collection2.find({}, {"_id": 0, "Test #": 1}):
        test_cases.append(document["Test #"])

    # Find the first, middle, and last test cases
    first_test_case = test_cases[0]
    middle_test_case = test_cases[len(test_cases) // 2]  # Determine the middle test case
    last_test_case = test_cases[-1]

    return first_test_case, middle_test_case, last_test_case

def main():
    # Connect to MongoDB
    client = MongoClient("mongodb://localhost:27017/")
    db = client["your_database"]
    collection1 = db["Collection1"]  # Adjust collection names as needed
    collection2 = db["Collection2"]

    # Clean collections
    clean_collections(collection1, collection2)

    # Read the Excel file and dropna to remove empty rows
    excel_file = "EG4-DBDump.xlsx"
    data = pd.read_excel(excel_file).dropna()

    # Insert data into MongoDB collection
    collection2.insert_many(data.to_dict('records'))
    print("Data inserted into MongoDB collection successfully.")
    
    userId = "Kevin Chaja"
    user_entries = list_entries_by_user(db,userId)
    print("Entries for user", userId, ":", user_entries)

    blocker_count, repeater_count = count_blocker_and_repeater_bugs(db)
    print("Number of blocker bugs:", blocker_count)
    print("Number of repeater bugs:", repeater_count)

    reports_on_build = find_reports_on_build(db, "3/19/2024")
    print("Reports on build date: ", reports_on_build)

    first_test_case, middle_test_case, last_test_case = get_first_middle_last_test_cases(db)
    print("First test case: ", first_test_case)
    print("Middle test case: ", middle_test_case)
    print("Last test case: ", last_test_case)

    # Close MongoDB connection
    client.close()

if __name__ == "__main__":
    main()


Data inserted into MongoDB collection successfully.
Unique entries for user Kevin Chaja:
Entries from Collection1:
Entries from Collection2:
_id: 660ccd3ac0d474b140be2629, Test #: 1, Build #: 2024-03-18 00:00:00, Category: UI, Test Case: Bug: Settings, Expected Result: Close settings tab when hitting the x button, Actual Result: Would not close, Repeatable?: Yes, Blocker?: No, Test Owner: Kevin Chaja
-------------------------
_id: 660ccd3ac0d474b140be262a, Test #: 2, Build #: 2024-03-18 00:00:00, Category: UI, Test Case: Bug: Adventure mode, Expected Result: Play adventure mode, Actual Result: Still does not atleast let you leave the mode to go back to main menu even though it does not work currently, Repeatable?: Yes, Blocker?: Yes, Test Owner: Kevin Chaja
-------------------------
_id: 660ccd3ac0d474b140be262b, Test #: 3, Build #: 2024-03-18 00:00:00, Category: UI, Test Case: Bug: Tutorial coins, Expected Result: Show that you receive the coins/ indicator of the amount of coins you h

In [52]:
from datetime import datetime
import pandas as pd
from pymongo import MongoClient

def clean_collections(collection1, collection2):
    bad_data_ids = set()

    # Clean collection1
    for document in collection1.find():
        if is_bad_data(document):
            bad_data_ids.add(document["_id"])

    # Clean collection2
    for document in collection2.find():
        if is_bad_data(document):
            bad_data_ids.add(document["_id"])

    # Remove bad data documents
    for bad_id in bad_data_ids:
        collection1.delete_one({"_id": bad_id})
        collection2.delete_one({"_id": bad_id})

# Function to validate and filter data (example implementation)
def is_bad_data(document):
    # Define a list of required fields
    required_fields = ["Test #", "Build #", "Category", "Test Case", "Expected Result", "Actual Result", "Repeatable?", "Blocker?", "Test Owner"]

    # Check if any required field is missing
    for field in required_fields:
        if field not in document:
            return True  # Document is bad if any required field is missing
    return False  # Document is not bad if all required fields are present

def list_entries_by_user(db, user_id):
    collection1 = db["Collection1"]
    collection2 = db["Collection2"]

    # Set to store unique entries
    unique_entries = set()

    # List entries for the specific user from both collections
    output = []
    for collection_name, collection in [("Collection1", collection1), ("Collection2", collection2)]:
        for document in collection.find({"Test Owner": user_id}):
            entry_str = ", ".join([f"{key}: {value}" for key, value in document.items()])
            if entry_str not in unique_entries:
                unique_entries.add(entry_str)
                output.append(entry_str + "\n-------------------------\n")  # Separator between documents

    # Count the number of unique entries
    count = len(unique_entries)
    
    return count, output


def count_blocker_and_repeater_bugs(db):
    collection1 = db["Collection1"]
    collection2 = db["Collection2"]

    # Count the number of blocker and repeater bugs
    blocker_count = 0
    repeater_count = 0
    for collection in [collection1, collection2]:
        for document in collection.find({}, {"_id": 0, "Blocker?": 1, "Repeatable?": 1}):
            if document.get("Blocker?") == "Yes":
                blocker_count += 1
            if document.get("Repeatable?") == "Yes":
                repeater_count += 1

    return blocker_count, repeater_count

def find_reports_on_build(db, build_date):
    collection1 = db["Collection1"]
    collection2 = db["Collection2"]

    # Convert input date to a datetime object
    formatted_build_date = datetime.strptime(build_date, "%m/%d/%Y").date()

    # Retrieve all reports on the specified build date from both collections (no duplicates)
    output = []
    for collection_name, collection in [("Collection1", collection1), ("Collection2", collection2)]:
        for document in collection.find():
            # Check if the "Build #" field is a datetime object and compare the date part
            if isinstance(document.get("Build #"), datetime) and document.get("Build #").date() == formatted_build_date:
                # Add the document to the output list
                output.append(str(document) + "\n-------------------------\n")  # Separator between documents

    return output

def get_first_middle_last_test_cases(db):
    collection2 = db["Collection2"]

    # Retrieve all test cases from collection 2
    test_cases = []
    for document in collection2.find({}, {"_id": 0, "Test #": 1}):
        test_cases.append(document["Test #"])

    # Find the first, middle, and last test cases
    first_test_case = test_cases[0]
    middle_test_case = test_cases[len(test_cases) // 2]  # Determine the middle test case
    last_test_case = test_cases[-1]

    return first_test_case, middle_test_case, last_test_case

def write_output_to_file(output, file_name):
    with open(file_name, "w") as file:
        file.writelines(output)

def main():
    # Connect to MongoDB
    client = MongoClient("mongodb://localhost:27017/")
    db = client["your_database"]
    collection1 = db["Collection1"]  # Adjust collection names as needed
    collection2 = db["Collection2"]

    # Clean collections
    clean_collections(collection1, collection2)

    # Read the Excel file and dropna to remove empty rows
    excel_file = "EG4-DBDump.xlsx"
    data = pd.read_excel(excel_file).dropna()

    # Insert data into MongoDB collection
    collection2.insert_many(data.to_dict('records'))
    print("Data inserted into MongoDB collection successfully.")
    
    userId = "Kevin Chaja"
    user_entries_count, user_entries_output = list_entries_by_user(db, userId)
    write_output_to_file(user_entries_output, f"{userId}_entries.txt")
    print("Entries for user", userId, ":", user_entries_count)

    blocker_count, repeater_count = count_blocker_and_repeater_bugs(db)
    with open("blockers.txt", "w") as file:
        file.write("Number of blocker bugs: " + str(blocker_count))
    print("Number of blocker bugs:", blocker_count)
    with open("repeaters.txt", "w") as file:
        file.write("Number of repeater bugs: " + str(repeater_count))
    print("Number of repeater bugs:", repeater_count)

    reports_on_build_output = find_reports_on_build(db, "3/19/2024")
    write_output_to_file(reports_on_build_output, "reports_on_build.txt")
    print("Reports on build date: ", "3/19/2024")

    first_test_case, middle_test_case, last_test_case = get_first_middle_last_test_cases(db)
    with open("test_cases.txt", "w") as file:
        file.write("First test case: " + str(first_test_case) + "\n")
        file.write("Middle test case: " + str(middle_test_case) + "\n")
        file.write("Last test case: " + str(last_test_case) + "\n")
    print("First test case: ", first_test_case)
    print("Middle test case: ", middle_test_case)
    print("Last test case: ", last_test_case)

    # Close MongoDB connection
    client.close()

if __name__ == "__main__":
    main()


Data inserted into MongoDB collection successfully.
Entries for user Kevin Chaja : 44
Number of blocker bugs: 342
Number of repeater bugs: 592
Reports on build date:  3/19/2024
First test case:  1
Middle test case:  1
Last test case:  3


In [68]:
from datetime import datetime
import pandas as pd
from pymongo import MongoClient

def clean_collections(collection1, collection2):
    bad_data_ids = set()

    # Clean collection1
    for document in collection1.find():
        if is_bad_data(document):
            bad_data_ids.add(document["_id"])

    # Clean collection2
    for document in collection2.find():
        if is_bad_data(document):
            bad_data_ids.add(document["_id"])

    # Remove bad data documents
    for bad_id in bad_data_ids:
        collection1.delete_one({"_id": bad_id})
        collection2.delete_one({"_id": bad_id})

# Function to validate and filter data (example implementation)
def is_bad_data(document):
    # Define a list of required fields
    required_fields = ["Test #", "Build #", "Category", "Test Case", "Expected Result", "Actual Result", "Repeatable?", "Blocker?", "Test Owner"]

    # Check if any required field is missing
    for field in required_fields:
        if field not in document:
            return True  # Document is bad if any required field is missing
    return False  # Document is not bad if all required fields are present

def list_entries_by_user(db, user_id, csv_filename):
    collection1 = db["Collection1"]
    collection2 = db["Collection2"]

    # List entries for the specific user from both collections
    entries = []
    for collection_name, collection in [("Collection1", collection1), ("Collection2", collection2)]:
        for document in collection.find({"Test Owner": user_id}):
            entries.append(document)

    # Create a DataFrame from the entries
    df = pd.DataFrame(entries)

    # Export DataFrame to CSV
    df.to_csv(csv_filename, index=False)

    return len(entries)

def count_blocker_and_repeater_bugs(db, blocker_filename, repeater_filename):
    collection1 = db["Collection1"]
    collection2 = db["Collection2"]

    # Lists to store blocker and repeater documents
    blocker_documents = []
    repeater_documents = []

    # Find blocker and repeater bugs and store their documents
    for collection in [collection1, collection2]:
        for document in collection.find({}, {"_id": 0, "Blocker?": 1, "Repeatable?": 1}):
            if document.get("Blocker?") == "Yes":
                blocker_documents.append(document)
            if document.get("Repeatable?") == "Yes":
                repeater_documents.append(document)

    # Export blocker documents to a file
    with open(blocker_filename, "w") as f:
        for doc in blocker_documents:
            f.write(str(doc) + "\n")

    # Export repeater documents to a file
    with open(repeater_filename, "w") as f:
        for doc in repeater_documents:
            f.write(str(doc) + "\n")

    return len(blocker_documents), len(repeater_documents)


def find_reports_on_build(db, build_date):
    collection1 = db["Collection1"]
    collection2 = db["Collection2"]

    # Convert input date to a datetime object
    formatted_build_date = datetime.strptime(build_date, "%m/%d/%Y").date()

    # Retrieve all reports on the specified build date from both collections (no duplicates)
    output = []
    for collection_name, collection in [("Collection1", collection1), ("Collection2", collection2)]:
        for document in collection.find():
            # Check if the "Build #" field is a datetime object and compare the date part
            if isinstance(document.get("Build #"), datetime) and document.get("Build #").date() == formatted_build_date:
                # Add the document to the output list
                output.append(str(document) + "\n-------------------------\n")  # Separator between documents

    return output

def get_first_middle_last_test_cases(db):
    collection2 = db["Collection2"]

    # Retrieve all test cases from collection 2
    test_cases = []
    for document in collection2.find({}, {"_id": 0, "Test #": 1}):
        test_cases.append(document["Test #"])

    # Find the first, middle, and last test cases
    first_test_case = test_cases[0]
    middle_test_case = test_cases[len(test_cases) // 2]  # Determine the middle test case
    last_test_case = test_cases[-1]
    

    return first_test_case, middle_test_case, last_test_case

def write_output_to_file(output, file_name):
    with open(file_name, "w") as file:
        file.writelines(output)

def main():
    # Connect to MongoDB
    client = MongoClient("mongodb://localhost:27017/")
    db = client["your_database"]
    collection1 = db["Collection1"]  # Adjust collection names as needed
    collection2 = db["Collection2"]

    # Clean collections
    clean_collections(collection1, collection2)

    # Read the Excel file and dropna to remove empty rows
    excel_file = "EG4-DBDump.xlsx"
    data = pd.read_excel(excel_file).dropna()

    # Insert data into MongoDB collection
    collection2.insert_many(data.to_dict('records'))
    print("Data inserted into MongoDB collection successfully.")
    
    userId = "Omar Monge"
    csv_filename = userId + ".csv"
    user_entries_count = list_entries_by_user(db, userId, csv_filename)
    print(f"Entries for user {userId} exported to {csv_filename}. Total entries: {user_entries_count}")

    blocker_filename = "blocker_bugs.txt"
    repeater_filename = "repeater_bugs.txt"
    blocker_count, repeater_count = count_blocker_and_repeater_bugs(db, blocker_filename, repeater_filename)
    print("Number of blocker bugs:", blocker_count)
    print("Number of repeater bugs:", repeater_count)
    print(f"Blocker bugs exported to {blocker_filename}")
    print(f"Repeater bugs exported to {repeater_filename}")

    

    reports_on_build_output = find_reports_on_build(db, "3/19/2024")
    write_output_to_file(reports_on_build_output, "reports_on_build.txt")
    print("Reports on build date: ", "3/19/2024")

    first_test_case, middle_test_case, last_test_case = get_first_middle_last_test_cases(db)
    with open("test_cases.txt", "w") as file:
        file.write("First test case: " + str(first_test_case) + "\n")
        file.write("Middle test case: " + str(middle_test_case) + "\n")
        file.write("Last test case: " + str(last_test_case) + "\n")
    print("First test case: ", first_test_case)
    print("Middle test case: ", middle_test_case)
    print("Last test case: ", last_test_case)

    # Close MongoDB connection
    client.close()

if __name__ == "__main__":
    main()


Data inserted into MongoDB collection successfully.
Entries for user Omar Monge exported to Omar Monge.csv. Total entries: 15
Number of blocker bugs: 342
Number of repeater bugs: 592
Blocker bugs exported to blocker_bugs.txt
Repeater bugs exported to repeater_bugs.txt
Reports on build date:  3/19/2024


NameError: name 'first_filename' is not defined

In [75]:
from datetime import datetime
import pandas as pd
from pymongo import MongoClient

def clean_collections(collection1, collection2):
    bad_data_ids = set()

    # Clean collection1
    for document in collection1.find():
        if is_bad_data(document):
            bad_data_ids.add(document["_id"])

    # Clean collection2
    for document in collection2.find():
        if is_bad_data(document):
            bad_data_ids.add(document["_id"])

    # Remove bad data documents
    for bad_id in bad_data_ids:
        collection1.delete_one({"_id": bad_id})
        collection2.delete_one({"_id": bad_id})

# Function to validate and filter data (example implementation)
def is_bad_data(document):
    # Define a list of required fields
    required_fields = ["Test #", "Build #", "Category", "Test Case", "Expected Result", "Actual Result", "Repeatable?", "Blocker?", "Test Owner"]

    # Check if any required field is missing
    for field in required_fields:
        if field not in document:
            return True  # Document is bad if any required field is missing
    return False  # Document is not bad if all required fields are present

def list_entries_by_user(db, user_id, csv_filename):
    collection1 = db["Collection1"]
    collection2 = db["Collection2"]

    # List entries for the specific user from both collections
    entries = []
    for collection_name, collection in [("Collection1", collection1), ("Collection2", collection2)]:
        for document in collection.find({"Test Owner": user_id}):
            entries.append(document)

    # Create a DataFrame from the entries
    df = pd.DataFrame(entries)

    # Export DataFrame to CSV
    df.to_csv(csv_filename, index=False)

    return len(entries)

def count_blocker_and_repeater_bugs(db, blocker_filename, repeater_filename):
    collection1 = db["Collection1"]
    collection2 = db["Collection2"]

    # Initialize counters
    blocker_count = 0
    repeater_count = 0

    # Open files for writing documents
    with open(blocker_filename, "w") as blocker_file, open(repeater_filename, "w") as repeater_file:
        # Find and write documents for blocker bugs
        blocker_file.write("Blocker Bugs:\n\n")
        for collection in [collection1, collection2]:
            for document in collection.find({"Blocker?": "Yes"}):
                blocker_count += 1
                blocker_file.write(str(document) + "\n\n")

        # Find and write documents for repeater bugs
        repeater_file.write("Repeater Bugs:\n\n")
        for collection in [collection1, collection2]:
            for document in collection.find({"Repeatable?": "Yes"}):
                repeater_count += 1
                repeater_file.write(str(document) + "\n\n")

    return blocker_count, repeater_count


def find_reports_on_build(db, build_date):
    collection1 = db["Collection1"]
    collection2 = db["Collection2"]

    # Convert input date to a datetime object
    formatted_build_date = datetime.strptime(build_date, "%m/%d/%Y").date()

    # Retrieve all reports on the specified build date from both collections (no duplicates)
    output = []
    for collection_name, collection in [("Collection1", collection1), ("Collection2", collection2)]:
        for document in collection.find():
            # Check if the "Build #" field is a datetime object and compare the date part
            if isinstance(document.get("Build #"), datetime) and document.get("Build #").date() == formatted_build_date:
                # Add the document to the output list
                output.append(str(document) + "\n-------------------------\n")  # Separator between documents

    return output

def get_documents_for_test_cases(db):
    collection2 = db["Collection2"]

    # Retrieve all documents from collection 2
    documents = list(collection2.find())

    # Get indices for first, middle, and last documents
    first_index = 0
    middle_index = len(documents) // 2
    last_index = len(documents) - 1

    # Extract first, middle, and last documents
    first_document = documents[first_index]
    middle_document = documents[middle_index]
    last_document = documents[last_index]

    return first_document, middle_document, last_document

def write_output_to_file(output, file_name):
    with open(file_name, "w") as file:
        file.writelines(output)

def main():
    # Connect to MongoDB
    client = MongoClient("mongodb://localhost:27017/")
    db = client["your_database"]
    collection1 = db["Collection1"]  # Adjust collection names as needed
    collection2 = db["Collection2"]

    # Clean collections
    clean_collections(collection1, collection2)

    # Read the Excel file and dropna to remove empty rows
    excel_file = "EG4-DBDump.xlsx"
    data = pd.read_excel(excel_file).dropna()

    # Insert data into MongoDB collection
    collection2.insert_many(data.to_dict('records'))
    print("Data inserted into MongoDB collection successfully.")
    
    userId = "Omar Monge"
    csv_filename = userId + ".csv"
    user_entries_count = list_entries_by_user(db, userId, csv_filename)
    print(f"Entries for user {userId} exported to {csv_filename}. Total entries: {user_entries_count}")

    blocker_filename = "blocker_bugs.txt"
    repeater_filename = "repeater_bugs.txt"
    test_cases_filename = "firstMiddleLast.txt"
    
    blocker_count, repeater_count = count_blocker_and_repeater_bugs(db, blocker_filename, repeater_filename)
    print("Number of blocker bugs:", blocker_count)
    print("Number of repeater bugs:", repeater_count)
    print(f"Blocker bugs exported to {blocker_filename}")
    print(f"Repeater bugs exported to {repeater_filename}")

    

    reports_on_build_output = find_reports_on_build(db, "3/19/2024")
    write_output_to_file(reports_on_build_output, "reports_on_build.txt")
    print("Reports on build date: ", "3/19/2024")

    first_document, middle_document, last_document = get_documents_for_test_cases(db)

    # Write documents to file
    with open("test_cases.txt", "w") as f:
        f.write("First test case:\n")
        f.write(str(first_document) + "\n\n")
        f.write("Middle test case:\n")
        f.write(str(middle_document) + "\n\n")
        f.write("Last test case:\n")
        f.write(str(last_document) + "\n\n")

    # Close MongoDB connection
    client.close()

if __name__ == "__main__":
    main()


Data inserted into MongoDB collection successfully.
Entries for user Omar Monge exported to Omar Monge.csv. Total entries: 17
Number of blocker bugs: 171
Number of repeater bugs: 296
Blocker bugs exported to blocker_bugs.txt
Repeater bugs exported to repeater_bugs.txt
Reports on build date:  3/19/2024
