In [None]:
import os
from datetime import datetime
from dotenv import load_dotenv
#
# #--------Google Drive Integration--------#
# # from google.colab import drive, userdata
# # This gives Colab access to your files in Google Drive.
# # drive.mount('/content/drive')
# # 'GITHUB_USERNAME' and 'GITHUB_TOKEN' saved as secrets in Colab.
# GITHUB_USERNAME = userdata.get('GITHUB_USERNAME')
# GITHUB_TOKEN = userdata.get('GITHUB_TOKEN')
# REPOSITORY_NAME = 'PyNucleus-Model' # Your repository name
# NOTEBOOK_DRIVE_PATH = "/content/drive/MyDrive/PyNucleus Project/Capstone Project.ipynb"
#
#
# #--------Cursor Integration--------#
# # Load environment variables from .env file
load_dotenv()
#
# # Get GitHub credentials from environment variables
GITHUB_USERNAME = os.getenv('GITHUB_USERNAME')
GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')
#
# # Print to verify the variables are loaded (remove this in production)
print(f"Username: {GITHUB_USERNAME}")
print(f"Token: {GITHUB_TOKEN[:4]}...") # Only print first 4 chars of token for security
#
# Repository information
REPOSITORY_NAME = 'PyNucleus-Model'
NOTEBOOK_REPO_FILENAME = "Capstone Project.ipynb"
LOG_FILENAME = "update_log.txt"

# Pull latest changes from GitHub
print("Pulling latest changes from GitHub...")
!git pull https://{GITHUB_TOKEN}@github.com/{GITHUB_USERNAME}/{REPOSITORY_NAME}.git main

print("Repository is up to date!")

# Log start time
with open("update_log.txt", "a") as f:
    f.write(f" {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: Log Update\n")

# **Data Ingestion and Preprocessing for RAG**

In [None]:
#----- Data processing for all document types -----#
import os
from langchain_unstructured import UnstructuredLoader
from PyPDF2 import PdfReader

# --- Configuration ---
# Folder where you will place all your source files (PDFs, DOCX, TXT, etc.)
INPUT_DIR = 'source_documents'

# Folder where the processed .txt files will be saved
OUTPUT_DIR = 'processed_txt_files'


# --- Helpers ---
def _extract_text(input_path):
    """Return the plain text of a single source document.

    PDF files are read page by page with PyPDF2's PdfReader, and each page's
    text is followed by a blank line. Every other file type is loaded with
    UnstructuredLoader and its documents are joined with blank lines.
    """
    if input_path.lower().endswith('.pdf'):
        reader = PdfReader(input_path)
        # Defensive: a page that yields no text (None or "") contributes an
        # empty string instead of failing on `None + str`.
        return "".join((page.extract_text() or "") + "\n\n" for page in reader.pages)

    documents = UnstructuredLoader(input_path).load()
    return "\n\n".join(doc.page_content for doc in documents)


def process_documents(input_dir=INPUT_DIR, output_dir=OUTPUT_DIR):
    """Convert every visible file in ``input_dir`` to a ``.txt`` file in ``output_dir``.

    Uses early returns rather than ``exit()`` so that running this in a
    notebook cell never terminates the kernel.

    Behaviour:
      * If ``input_dir`` does not exist it is created, instructions are printed
        and nothing else happens (``output_dir`` is not created).
      * Hidden files (names starting with ".", e.g. .DS_Store) are ignored and
        are not included in the reported file count.
      * A failure on one file is reported and processing continues with the next.

    NOTE(review): the output name is the input name with its extension swapped
    for ".txt", so inputs that differ only by extension (a.pdf / a.docx) write
    to the same output file.

    Returns:
        list[str]: paths of the .txt files written successfully, in processing order.
    """
    # Create the input directory if it doesn't exist and give instructions
    if not os.path.exists(input_dir):
        print(f"📂 Creating directory: '{input_dir}'")
        os.makedirs(input_dir)
        print(f" Please place your files (PDF, DOCX, TXT, etc.) in the '{input_dir}' directory and run the script again.")
        return []

    # Create the output directory
    os.makedirs(output_dir, exist_ok=True)

    # Sorted so the processing order does not depend on os.listdir() ordering.
    files_to_process = sorted(
        name
        for name in os.listdir(input_dir)
        if not name.startswith('.') and os.path.isfile(os.path.join(input_dir, name))
    )

    if not files_to_process:
        print(f"ℹ The '{input_dir}' directory is empty. Nothing to process.")
        return []

    print(f"--- 📄 Starting processing for {len(files_to_process)} file(s) in '{input_dir}' ---")

    saved_paths = []
    for filename in files_to_process:
        input_path = os.path.join(input_dir, filename)
        output_filename = os.path.splitext(os.path.basename(filename))[0] + '.txt'
        output_path = os.path.join(output_dir, output_filename)

        print(f" ▶ Processing: {filename}")

        try:
            full_text = _extract_text(input_path)

            # Save the extracted text to a new .txt file
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(full_text)
        except Exception as e:
            # Deliberately best-effort: report the failure and keep going.
            print(f"   • Error processing {filename}: {e}")
        else:
            print(f"   • Success! Saved to: {output_path}")
            saved_paths.append(output_path)

    print("\n\n All files processed.")
    return saved_paths


# --- Main Logic ---
if __name__ == "__main__":
    process_documents(INPUT_DIR, OUTPUT_DIR)

In [13]:
import os
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse


# ── CONFIGURATION ─────────────────────────────────────────────────────────────

# 1) The page to scrape; passed to fetch_and_save_text() by the entry point of this cell.
TARGET_URL = "https://en.wikipedia.org/wiki/Modular_design"

# 2) Directory that receives the plain-text output (fetch_and_save_text() creates it on demand).
# NOTE(review): this is a different folder from 'source_documents' / 'processed_txt_files'
# used by the document-processing cell — confirm the downstream step actually reads it.
DATA_DIR = "data_sources"


# ── HELPER TO TURN A URL INTO A SAFE FILENAME ────────────────────────────────────

def make_filename_from_url(url: str) -> str:
    """
    Convert a URL into a filesystem-safe ``.txt`` filename.

    The host (netloc) and path are concatenated. Every "/" and ".", plus the
    characters that are reserved in filenames on common filesystems
    (``\\ : * ? " < > |``), is replaced with "_". Leading and trailing
    underscores are then stripped. The scheme, query string and fragment are
    not part of the name.

    Examples:
        "https://example.com/foo/bar.html"              -> "example_com_foo_bar_html.txt"
        "https://en.wikipedia.org/wiki/Category:Design" -> "en_wikipedia_org_wiki_Category_Design.txt"

    Returns "untitled.txt" when the URL has neither host nor path, so the
    result is never the nameless file ".txt".
    """
    parsed = urlparse(url)
    raw_name = parsed.netloc + parsed.path
    # One translation table covers the path separators, dots and reserved characters.
    unsafe_chars = str.maketrans({ch: "_" for ch in '/.\\:*?"<>|'})
    safe_name = raw_name.translate(unsafe_chars).strip("_")
    return (safe_name or "untitled") + ".txt"


# ── MAIN SCRAPER FUNCTION ──────────────────────────────────────────────────────

def fetch_and_save_text(url: str, output_dir: str):
    """
    Download ``url`` and write its text content to a ``.txt`` file in ``output_dir``.

    The output directory is created if needed and the file name is derived
    from the URL via make_filename_from_url(). The page is requested with a
    browser-like User-Agent and a 15-second timeout. On any
    ``requests.RequestException`` (including a non-2xx status) an error line
    is printed and the function returns without writing a file.

    Returns None in every case; progress is reported via print().
    """
    # Resolve the destination up front: <output_dir>/<name derived from url>
    os.makedirs(output_dir, exist_ok=True)
    out_path = os.path.join(output_dir, make_filename_from_url(url))

    print(f"▶️  Fetching: {url}")

    browser_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/91.0.4472.124 Safari/537.36"
    }
    try:
        response = requests.get(url, headers=browser_headers, timeout=15)
        response.raise_for_status()
    except requests.RequestException as fetch_error:
        print(f"❌  Error fetching page: {fetch_error}")
        return

    # Parse the raw bytes and flatten the document to space-separated text.
    parsed_html = BeautifulSoup(response.content, "html.parser")
    page_text = parsed_html.get_text(separator=" ").strip()

    with open(out_path, "w", encoding="utf-8") as out_file:
        out_file.write(page_text)

    print(f"✅  Saved plain text to: {out_path}")


# ── ENTRY POINT ────────────────────────────────────────────────────────────────

# Scrape TARGET_URL into DATA_DIR whenever this cell is executed
# (the saved cell output shows the guard is satisfied in this notebook).
if __name__ == "__main__":
    fetch_and_save_text(TARGET_URL, DATA_DIR)


▶️  Fetching: https://en.wikipedia.org/wiki/Modular_design
✅  Saved plain text to: data_sources/en_wikipedia_org_wiki_Modular_design.txt


In [None]:
# --- 4. Inspect the Results ---
import numpy as np

# NOTE(review): `chunked_documents` and `CHUNK_SIZE` are not defined in this
# cell; they must come from a chunking cell that was executed earlier.

# Standard deviation (in characters) above which chunk sizes are flagged as inconsistent.
STD_DEV_WARNING_THRESHOLD = 150
# A chunk shorter than this fraction of CHUNK_SIZE is counted as "small".
SMALL_CHUNK_FRACTION = 0.20
# Number of chunks printed for manual inspection.
SAMPLE_PREVIEW_COUNT = 3

# --- Statistical Analysis of Chunks ---
print("\n--- Statistical Analysis & Quality Check ---")

# Calculate the lengths of all chunks
chunk_lengths = [len(chunk.page_content) for chunk in chunked_documents]

# np.min / np.max raise an unrelated-looking "zero-size array" ValueError on
# empty input; raise the same exception type with a message that names the cause.
if not chunk_lengths:
    raise ValueError("'chunked_documents' is empty; there are no chunks to analyse.")

# Calculate and print key statistics
total_chunks = len(chunk_lengths)
min_size = np.min(chunk_lengths)
max_size = np.max(chunk_lengths)
avg_size = np.mean(chunk_lengths)
std_dev = np.std(chunk_lengths)

print(f"Total Chunks: {total_chunks}")
print(f"Minimum Chunk Size: {min_size} characters")
print(f"Maximum Chunk Size: {max_size} characters")
print(f"Average Chunk Size: {avg_size:.2f} characters")
print(f"Standard Deviation of Chunk Size: {std_dev:.2f}")

# --- Automated Quality Feedback ---

# 1. Check for high variation in chunk size
# A high standard deviation suggests inconsistent chunking.
if std_dev > STD_DEV_WARNING_THRESHOLD:
    print(f"\n[WARNING] High chunk size variation detected (Std Dev: {std_dev:.2f}).")
    print("  > This suggests documents may have irregular structures (e.g., many short lines or lists).")
    print("  > Resulting chunks may have inconsistent levels of context.")

# 2. Check for and count potentially "orphaned" or very small chunks
small_chunk_threshold = CHUNK_SIZE * SMALL_CHUNK_FRACTION # Chunks smaller than 20% of the target size
small_chunk_count = sum(1 for length in chunk_lengths if length < small_chunk_threshold)

if small_chunk_count > 0:
    # This check is more specific than just looking at the absolute minimum.
    print(f"\n[ADVISORY] Found {small_chunk_count} chunks smaller than {small_chunk_threshold} characters.")
    print(f"  > The smallest chunk is {min_size} characters.")
    print("  > These small chunks might lack sufficient context and could clutter search results.")
    print("  > Consider cleaning the source documents or adjusting the chunking separators.")

# Add a success message if no issues are flagged
if std_dev <= STD_DEV_WARNING_THRESHOLD and small_chunk_count == 0:
    print("\n[INFO] Chunking statistics appear healthy. Sizes are consistent.")


# --- Manual Inspection of Sample Chunks ---
print("\n--- Sample Chunk Preview ---")
# Print the first few chunks to get a feel for their content and structure
for i, chunk in enumerate(chunked_documents[:SAMPLE_PREVIEW_COUNT]):
    chunk_source = os.path.basename(chunk.metadata.get('source', 'N/A'))
    print(f"\n--- Chunk {i+1} (Source: {chunk_source}, Length: {len(chunk.page_content)} chars) ---")
    print(chunk.page_content)


print("\n\nData ingestion and preprocessing is complete. The 'chunked_documents' are ready for the next stage (embedding).")

In [None]:
# Log end time
# Uses the LOG_FILENAME constant from the setup cell so the start and end
# entries always go to the same file. `datetime` also comes from that cell.
# This entry is written before update_github() is called in this cell.
with open(LOG_FILENAME, "a") as f:
    f.write(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} changes made and pushed to origin main\n")

# Simple GitHub update function
def update_github():
    !git add .
    !git commit -m "Update: Adding all files to repository"
    !git push origin main
    print("All files pushed to GitHub successfully!")

# To use it, just run:
update_github()