## Data Collection

#### Project Description
In this notebook, we collect using crawling and clean data for building a chatbot. The data will be retrieved from multiple web sources, cleaned, and prepared for further use in training a language model.


In [3]:
# Magic commands
%load_ext autoreload
%autoreload 2
%load_ext dotenv
%dotenv

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


#### Importing Libraries

In [47]:
import os
import requests
import time
from typing import Optional
from typing import Tuple
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from markdownify import markdownify as md
import pandas as pd
import json
import csv

#### Configuration and Setup for Data Collection

This block of code sets up the basic configuration needed for the data collection process. Additionally, it ensures that the directory for storing data exists, creating it if necessary.


In [37]:
host = 'https://www.familysearch.org'
base_dir = 'data/raw'
bs_parser = 'html.parser'
delay_seconds = 5
site = 'source-linker-learning-center'
site2 = 'article/unfinished-attachments-how-to-add-others-on-record-to-family-tree#attaching-individuals-from-the-record-to-a-new-profile-in-family-tree'

if not os.path.exists(base_dir):
    os.makedirs(base_dir)

Get the article page from URL and return status code and response text as tuple of strings

In [6]:
def get_page(
    url: str,
    delay_seconds: int = 30,
    headers: Optional[dict[str, str]] = None,
    encoding: str = "utf-8",
    timeout: int = 30,
) -> Tuple[str, str]:
    if headers is None:
        headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "en-US,en;q=0.9",
        "Cache-Control": "no-cache",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
    }
    # Get Response from URL and return status code and response text
    response = requests.get(url, headers=headers, timeout=timeout)
    time.sleep(delay_seconds)
    if encoding:
        response.encoding = encoding
    return response.status_code, response.text


# Get Response from URL with raise exception if status code is not 200
def get_response_from_url(url: str, delay_seconds: int = 30) -> str:
    status_code, response = get_page(url, delay_seconds)
    if status_code != 200:
        raise Exception(f"Failed to get response from {url}")
    return response

In [7]:
def _is_article(url):
    """A talk URL has 6 components (first component is empty) and last component does not end in -session.
    """  
    path_components = urlparse(url).path.split('/')
    return len(path_components) == 6 and not path_components[-2].endswith('fieldops')

In [40]:
def get_article_with_urls(base_url):
    """
    Find and return article URLs from the base URL, recursively following links found in each page.
    
    Parameters:
        - base_url: The URL to fetch and scan for articles.
        - visited: A set of URLs that have already been visited to avoid repetition.
        - max_depth: Maximum recursion depth to prevent infinite loops.
        - depth: Current recursion depth.
        
    Returns:
        A set of all found URLs.
    """
    # Set to store all unique URLs
    article_urls = set()
    dir_html = get_response_from_url(base_url, delay_seconds)
    soup = BeautifulSoup(dir_html, bs_parser)
    
    # Step 1: Collect URLs from the main page
    for a in soup.find_all('a', href=True):
        url = urljoin(base_url, a['href'])
        if _is_article(url):
            article_urls.add(url)
    
    # Step 2: For each article URL, fetch and collect the links inside those pages
    for article_url in list(article_urls):  # Create a list to avoid modifying the set while iterating
        inner_html = get_response_from_url(article_url, delay_seconds)
        if not inner_html:
            continue  # Skip if the inner page couldn't be fetched
        
        inner_soup = BeautifulSoup(inner_html, bs_parser)
        for a in inner_soup.find_all('a', href=True):
            # Check if the 'href' attribute exists and is not empty
            if 'href' in a.attrs and a['href']:
                inner_url = urljoin(article_url, a['href'])
                if _is_article(inner_url):
                    article_urls.add(inner_url)
    
    return article_urls

In [41]:
def get_article_main_urls(base_url):
    """Find and return article URLs from the base URL."""
    dir_html = get_response_from_url(base_url, delay_seconds)
    soup = BeautifulSoup(dir_html, bs_parser)
    return set(urljoin(base_url, a['href']) for a in soup.find_all('a',href=True)\
        if _is_article(urljoin(base_url, a['href'])))

Get URLs from the host page and save them to an array

In [42]:
# Website 1: Get URLs with internal links
dir_url = f"{host}/en/help/helpcenter/{site}"
urls_with_links = get_article_with_urls(dir_url)
# Website 2: Get URLs without following internal links
dir_url2 = f"{host}/en/help/helpcenter/{site2}"
urls_no_links = get_article_main_urls(dir_url2)
# Combine both sets of URLs
urls = urls_with_links.union(urls_no_links)
print(dir_url, len(urls))

https://www.familysearch.org/en/help/helpcenter/source-linker-learning-center 40


In [45]:
# Set the display options for Pandas
pd.set_option('display.width', 1000)  # Adjust the overall width
pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.max_colwidth', None)  # Disable column width limitation
pd.set_option('display.expand_frame_repr', False)  # Prevent line wrapping of the DataFrame
# Create a DataFrame to hold the URLs
urls = list(urls)
df = pd.DataFrame(urls, columns=["URL"])
# Print the DataFrame
df

Unnamed: 0,URL
0,https://www.familysearch.org/en/help/helpcenter/article/unfinished-attachments-how-to-add-others-on-record-to-family-tree#remove-the-unfinished-attachments-notification
1,https://www.familysearch.org/en/help/helpcenter/article/how-do-i-attach-the-other-people-in-a-record-hint-in-family-tree
2,https://www.familysearch.org/en/help/helpcenter/article/viewing-the-source-image-and-person-side-panel
3,https://www.familysearch.org/en/help/helpcenter/article/what-is-a-source-box
4,https://www.familysearch.org/en/help/helpcenter/article/unfinished-attachments-how-to-add-others-on-record-to-family-tree
5,https://www.familysearch.org/en/help/helpcenter/article/what-are-record-hints-in-family-tree
6,https://www.familysearch.org/en/help/helpcenter/article/creating-new-people-in-family-tree
7,https://www.familysearch.org/en/help/helpcenter/article/understanding-the-focus-person-in-source-linker
8,https://www.familysearch.org/en/help/helpcenter/article/how-do-i-attach-record-hints-in-family-tree
9,https://www.familysearch.org/en/help/helpcenter/article/detaching-sources-that-should-not-be-attached


In [55]:

# Save the DataFrame to a CSV file
processed_dir = '../data/processed'
# Create the directory if it doesn't exist
if not os.path.exists(processed_dir):
    os.makedirs(processed_dir)

# Convert URLs into a list of dictionaries
urls_list = [{"url": url} for url in urls]  # Each URL as a dictionary

# Step 1: Save URLs to a JSON file
def save_urls_to_json(url_list, filename):
    with open(filename, "w", encoding="utf-8") as json_file:
        json.dump(url_list, json_file, ensure_ascii=False, indent=4)

json_file_path = os.path.join(processed_dir, 'downloaded_urls.json')
save_urls_to_json(urls_list, json_file_path)
print(f"URLs saved to {json_file_path} as JSON.")

# Step 2: Convert JSON to CSV
def load_urls_json(filename):
    with open(filename, "r", encoding="utf-8") as json_file:
        return json.load(json_file)

def convert_to_csv(data, filename, fieldnames):
    with open(filename, "w", newline="", encoding="utf-8") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        for item in data:
            writer.writerow(item)

# Load the JSON file and convert to CSV
urls_from_json = load_urls_json(json_file_path)

# Debugging: Print the type and structure of the loaded JSON
print("Loaded URLs from JSON:")
print(type(urls_from_json))  # Should be a list
print(urls_from_json)         # Inspect the data structure

# Ensure urls_from_json is a list of dictionaries
if isinstance(urls_from_json, list) and all(isinstance(item, dict) for item in urls_from_json):
    csv_file_path = os.path.join(processed_dir, 'downloaded_urls.csv')
    fieldnames = ["url"]
    
    convert_to_csv(urls_from_json, csv_file_path, fieldnames)
    print(f"URLs converted to CSV and saved to {csv_file_path}")
else:
    print("Error: The loaded JSON data is not in the expected format.")
    print("Data structure:", urls_from_json)  # Print the structure for debugging

URLs saved to ../data/processed\downloaded_urls.json as JSON.
Loaded URLs from JSON:
<class 'list'>
[{'url': 'https://www.familysearch.org/en/help/helpcenter/article/unfinished-attachments-how-to-add-others-on-record-to-family-tree#remove-the-unfinished-attachments-notification'}, {'url': 'https://www.familysearch.org/en/help/helpcenter/article/how-do-i-attach-the-other-people-in-a-record-hint-in-family-tree'}, {'url': 'https://www.familysearch.org/en/help/helpcenter/article/viewing-the-source-image-and-person-side-panel'}, {'url': 'https://www.familysearch.org/en/help/helpcenter/article/what-is-a-source-box'}, {'url': 'https://www.familysearch.org/en/help/helpcenter/article/unfinished-attachments-how-to-add-others-on-record-to-family-tree'}, {'url': 'https://www.familysearch.org/en/help/helpcenter/article/what-are-record-hints-in-family-tree'}, {'url': 'https://www.familysearch.org/en/help/helpcenter/article/creating-new-people-in-family-tree'}, {'url': 'https://www.familysearch.org/e

In [11]:
def clean_html(soup):
    """
    Clean HTML from unwanted elements.
    """  
    for img in soup.find_all("img"):
        img.decompose()
    for span in soup.find_all("div", class_="Enhancement"):
        span.decompose()
    for div in soup.find_all("div", class_="ArticlePage-byline"):
        div.decompose()

    #     # Limpiar espacios en blanco adicionales en el HTML
    # for tag in soup.find_all(True):  # True encuentra todas las etiquetas
    #     tag.string = tag.get_text(strip=True)
    
    return soup

In [None]:
def extract_information(url):
    """
    Extract information from the URL.
    """  
    try:
        response = requests.get(url)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, bs_parser)
        content = soup.find("div", class_="ArticlePage-wrapper")
        if content:
            cleaned_content = clean_html(content)
            # Eliminar líneas vacías adicionales
            cleaned_content = str(cleaned_content)
            cleaned_content = "\n".join(line.strip() for line in cleaned_content.splitlines() if line.strip())
            return md(cleaned_content, heading_style="ATX")
        return "No content found"
    except requests.exceptions.RequestException as e:
        return f"Error: {e}"

In [None]:
def faq_main_page(base_url):
    try:
        dir_html = get_response_from_url(base_url, delay_seconds)
        soup = BeautifulSoup(dir_html, bs_parser)
        content = soup.find_all("div", class_="PromoFAQ-content")
        if content:
            # Convertir el contenido a texto antes de pasarlo a `md`
            content_str = "\n".join([str(item) for item in content])
            return md(str(content_str), heading_style="ATX")
        return "No content found"
    except requests.exceptions.RequestException as e:
        return f"Error: {e}"

In [None]:
def get_article_path(i):
    """Return the file path for saving the talk."""
    path_components = urlparse(urls[i]).path.split('/')
    title = [word for word in path_components[-1].split('-')]
    if len(title) > 3:
        return f"{title[0]}-{title[1]}-{title[2]}-{title[3]}.md"
    return f"{title[0]}-{title[1]}-{title[2]}.md"
    # year, month, title = path_components[3:6]
    #path_components[3:6] = ['2024', '04', '11oaks']
    # return os.path.join(base_dir, f"{year}-{month}-{title}.json")
    # os.path.join(base_dir) = 'data/raw'

In [None]:
def one_file_content(content, filename = "data.md"):
    file_path = os.path.join(base_dir, filename)
    with open(file_path, "w", encoding="utf-8") as file:
        for idx, info in enumerate(content):
            file.write(f"### Section {idx+1}\n")
            file.write(f"{info}")

In [None]:
def create_readme(full_content, filename="data.md"):
    one_file_content(full_content)
    for idx, content in enumerate(full_content):
        filename = get_article_path(idx)
        if os.path.exists(filename):
            continue
        print("    ", filename)
        file_path = os.path.join(base_dir, filename)
        with open(file_path, "w", encoding="utf-8") as file:  
            file.write(f"{content}")

Extract information form the URLs

In [None]:
all_content = [extract_information(url) for url in urls]
all_content.append(faq_main_page(dir_url)) 
urls.append(dir_url)
# Create the markdown files and  the list of urls to be used in the next step.
create_readme(all_content)