In [None]:
import os
import json
import jsonschema
from jsonschema import validate

# Define the directory containing .txt files (using a raw string or forward slashes to avoid escape issues)
directory = r"data/municipalities"  # Adjust path as needed

# Define the path to the JSON schema
schema_path = r"municipality_schema.json"  # Adjust path as needed

# Load the JSON schema
try:
    with open(schema_path, "r", encoding="utf-8") as schema_file:
        schema = json.load(schema_file)
except FileNotFoundError:
    print(f"❌ Schema file not found: {schema_path}")
    raise

def convert_txt_to_json(file_path):
    """
    Reads a text file and attempts to convert its content to a JSON object.
    Returns the JSON data if successful; otherwise, returns None.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as file:
            content = file.read().strip()  # Remove extra spaces/newlines
            if not content:
                print(f"❌ {file_path} is empty. Skipping...")
                return None
            try:
                return json.loads(content)  # Try parsing as JSON
            except json.JSONDecodeError:
                print(f"❌ {file_path} does not contain valid JSON. Skipping...")
                return None
    except Exception as e:
        print(f"❌ Error reading {file_path}: {e}")
        return None

def validate_municipality(file_path):
    """
    Converts a .txt file to JSON and validates it against the schema.
    Returns the JSON data if valid; otherwise, returns None.
    """
    data = convert_txt_to_json(file_path)
    if data is None:
        print('txt Data not valid. ')
        return None  # Skip invalid JSON files

    try:
        validate(instance=data, schema=schema)
        print(f"✅ {file_path} is valid")
        return data  # Return the parsed JSON if valid
    except jsonschema.exceptions.ValidationError as e:
        print(f"❌ {file_path} does not match the schema: {e.message}")
        return None

# Process all .txt files in the directory
municipality_data = []

if os.path.isdir(directory):
    for filename in os.listdir(directory):
        if filename.endswith(".txt"):
            file_path = os.path.join(directory, filename)
            data = validate_municipality(file_path)
            if data:
                municipality_data.append(data)
else:
    print(f"❌ Directory not found: {directory}")

# Save the combined valid JSON data into a single file if any valid data exists
if municipality_data:
    output_file = "validated_municipalities.json"
    try:
        with open(output_file, "w", encoding="utf-8") as outfile:
            json.dump(municipality_data, outfile, indent=2)
        print(f"✅ All valid municipalities saved to '{output_file}'")
    except Exception as e:
        print(f"❌ Error writing output file: {e}")
else:
    print("No valid municipality data found.")


In [9]:
muni_txt = r'data\municipalities\Adjuntas.txt'



json_muni_file = convert_txt_to_json(muni_txt)

❌ data\municipalities\Adjuntas.txt does not contain valid JSON. Skipping...


In [10]:
from bs4 import BeautifulSoup
import json

# Load  file
html_file_path = r'data\municipalities\Adjuntas.txt'

with open(html_file_path, "r", encoding="utf-8") as file:
    html_content = file.read()

# Parse the HTML using BeautifulSoup
soup = BeautifulSoup(html_content, "html.parser")

# Extracting key information
data = {}

# Extracting title (municipality name)
title = soup.find("title")
data["name"] = title.text.replace(" - Wikipedia", "") if title else "Unknown"

# Extracting population (from the infobox)
population_tag = soup.find(text="• Total")
if population_tag:
    population_value = population_tag.find_next("td").text.strip()
    data["population"] = population_value

# Extracting geographical coordinates
coordinates_tag = soup.find("span", class_="geo-dec")
if coordinates_tag:
    coords = coordinates_tag.text.strip().split(" ")
    data["latitude"] = coords[0]
    data["longitude"] = coords[1]

# Extracting mayor (from infobox)
mayor_tag = soup.find(text="• Mayor")
if mayor_tag:
    mayor_value = mayor_tag.find_next("td").text.strip()
    data["mayor"] = mayor_value

# Extracting nicknames
nicknames_tag = soup.find(text="Nicknames:")
if nicknames_tag:
    nicknames_value = nicknames_tag.find_next("td").text.strip()
    data["nicknames"] = nicknames_value.split(", ")

# Extracting Barrios
barrios_section = soup.find("h3", text="Barrios")
if barrios_section:
    barrios_list = []
    for li in barrios_section.find_next("ul").find_all("li"):
        barrios_list.append(li.text.strip())
    data["barrios"] = barrios_list

# Save the extracted data as JSON
json_output_path = "adjuntas.json"
with open(json_output_path, "w", encoding="utf-8") as json_file:
    json.dump(data, json_file, indent=2, ensure_ascii=False)

print(f"✅ Data extracted and saved to {json_output_path}")



✅ Data extracted and saved to adjuntas.json


  population_tag = soup.find(text="• Total")
  mayor_tag = soup.find(text="• Mayor")
  nicknames_tag = soup.find(text="Nicknames:")
  barrios_section = soup.find("h3", text="Barrios")


In [13]:
import pandas as pd
from bs4 import BeautifulSoup

# sections to extract.
target_sections = [
    "Etymology and nicknames",
    "History",
    "Geography",
    "Demographics",
    "Special Communities",
    "Economy",
    "Human resources",
    "Culture",
    "Transportation",
    "Government",
    "Symbols"
]

# Load  file
html_file_path = r'data\municipalities\Adjuntas.txt'

with open(html_file_path, "r", encoding="utf-8") as file:
    html_content = file.read()

# # Open and read the HTML file
# with open("adjuntas.html", "r", encoding="utf-8") as f:
#     html_content = f.read()

soup = BeautifulSoup(html_content, "html.parser")

# Locate the main content container.
content_div = soup.find("div", class_="mw-parser-output")

# Dictionary to hold section text
sections_data = {}
current_section = None

# Iterate over direct children of the content container
for element in content_div.children:
    # When we hit an h2, check if its headline is one of the target sections.
    if element.name == "h2":
        # Many h2 tags in Wikipedia have a structure like:
        #   <h2><span class="mw-headline" id="...">Section Title</span><span class="mw-editsection">[ edit ]</span></h2>
        # We extract the text and remove the "[ edit ]" part.
        headline = element.get_text(separator=" ", strip=True).split("[")[0].strip()
        # Look for a matching target section (case-insensitive substring match)
        match = None
        for sec in target_sections:
            if sec.lower() in headline.lower():
                match = sec
                break
        current_section = match  # may be None if not a target section
        if current_section:
            sections_data[current_section] = []  # initialize list of content
    # If we are currently in a target section, accumulate text from the element
    elif current_section:
        # Some elements (like navigation, tables, etc.) might produce unwanted text.
        # You can add additional filtering if needed.
        text = element.get_text(separator=" ", strip=True)
        if text:
            sections_data[current_section].append(text)

# Build a list of dictionaries for each section.
results = []
for section, texts in sections_data.items():
    full_text = "\n".join(texts)
    results.append({"Section": section, "Content": full_text})

# Create a DataFrame from the results
df = pd.DataFrame(results)
print(df)

Empty DataFrame
Columns: []
Index: []


In [2]:
import openai
import os
import time
import logging
import pandas as pd
from dotenv import load_dotenv
from tabulate import tabulate
from json import JSONDecodeError

# Configure logging
logging.basicConfig(
    filename="deepseek_extraction.log",
    level=logging.DEBUG,  # Change to logging.INFO or logging.ERROR if needed
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)

# Load environment variables from .env file
load_dotenv()

# Load HTML file
html_file_path = r'data\municipalities\Adjuntas.txt'
try:
    with open(html_file_path, "r", encoding="utf-8") as file:
        html_content = file.read()
    logging.info(f"Successfully loaded HTML content from {html_file_path}")
except Exception as e:
    logging.error(f"Error loading HTML file: {e}")
    raise

# Get DeepSeek API key
deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
if not deepseek_api_key:
    logging.error("DeepSeek API key is missing! Check your .env file.")
    raise ValueError("DeepSeek API key not found.")

def chatbot_deepseek(prompt):
    """
    Interacts with DeepSeek's API with medium creativity.
    """
    logging.info("Calling DeepSeek API...")
    try:
        client = openai.OpenAI(
            api_key=deepseek_api_key,
            base_url="https://api.deepseek.com/v1"  # DeepSeek endpoint
        )

        response = client.chat.completions.create(
            model="deepseek-reasoner",
            messages=[
                {"role": "system", "content": "You are an intelligent chatbot."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            max_tokens=2000,
            top_p=0.8
        )
        
        result = response.choices[0].message.content
        logging.info("DeepSeek API call successful.")
        return result

    except JSONDecodeError as e:
        logging.error(f"JSON Decode Error in DeepSeek response: {e}")
        return ""
    except Exception as e:
        logging.error(f"Unexpected error calling DeepSeek API: {e}")
        return ""

# Define the base prompt with instructions
base_prompt = '''
Read this HTML. The body of the article is divided into multiple sections, each corresponding to a part of the topic.
Extract these target sections and return as a string:

"Etymology and nicknames",
"History",
"Geography",
"Demographics",
"Special Communities",
"Economy",
"Human resources",
"Culture",
"Transportation",
"Government",
"Symbols"
'''

# Set a batch size (number of characters per batch) and overlap to avoid cutting sections.
batch_size = 30000
overlap = 1000  # Characters of overlap

# Split the HTML into overlapping chunks
chunks = []
start = 0
while start < len(html_content):
    end = start + batch_size
    chunk = html_content[start:end]

    # Add overlap if not at the end
    if end < len(html_content):
        chunk += html_content[end:end+overlap]
    
    chunks.append(chunk)
    start += batch_size

logging.info(f"HTML split into {len(chunks)} batches.")

results = []

# Process each chunk individually
for idx, chunk in enumerate(chunks):
    prompt_with_chunk = f"{base_prompt}\n\nBatch {idx+1}:\n{chunk}"
    logging.info(f"Processing batch {idx+1} of {len(chunks)}...")
    print(f"Processing batch {idx+1} of {len(chunks)}...")

    try:
        response = chatbot_deepseek(prompt_with_chunk)
        results.append(response)
        logging.info(f"Batch {idx+1} processed successfully.")
        print(f"Batch {idx+1} processed successfully.")
    except Exception as e:
        logging.error(f"Error processing batch {idx+1}: {e}")
        print(f"Error processing batch {idx+1}: {e}")
        results.append("")
    
    # Sleep a bit to avoid rate limits
    time.sleep(1)

# Combine all extracted sections
final_extraction = "\n".join(results)
logging.info("All batches processed. Extraction complete.")

# Store results in a DataFrame
df = pd.DataFrame({
    "Batch": list(range(1, len(results)+1)),
    "Extraction": results
})

# Save to file
df.to_csv("deepseek_extracted_sections.csv", index=False, encoding="utf-8")
logging.info("Extraction results saved to deepseek_extracted_sections.csv.")

# Print results
print(tabulate(df, headers="keys", tablefmt="psql"))
print("\nCombined Extraction:\n")
print(final_extraction)


Processing batch 1 of 13...
Batch 1 processed successfully.
Processing batch 2 of 13...
Batch 2 processed successfully.
Processing batch 3 of 13...
Batch 3 processed successfully.
Processing batch 4 of 13...
Batch 4 processed successfully.
Processing batch 5 of 13...
Batch 5 processed successfully.
Processing batch 6 of 13...
Batch 6 processed successfully.
Processing batch 7 of 13...
Batch 7 processed successfully.
Processing batch 8 of 13...
Batch 8 processed successfully.
Processing batch 9 of 13...
Batch 9 processed successfully.
Processing batch 10 of 13...
Batch 10 processed successfully.
Processing batch 11 of 13...
Batch 11 processed successfully.
Processing batch 12 of 13...
Batch 12 processed successfully.
Processing batch 13 of 13...
Batch 13 processed successfully.
+----+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------