In [1]:
import os
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import base64
import time
import csv

### Web scraping: download images, grouped by category, from a given news website

In [2]:
def get_news_categories(url, timeout=10):
    """Collect candidate news categories from a page's navigation bars.

    Every ``<a>`` nested inside any ``<nav>`` element that has both visible
    text and an ``href`` is treated as a category link.

    Args:
        url: Address of the page whose ``<nav>`` elements are scanned.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` may block indefinitely.

    Returns:
        A list of ``(link_text, absolute_url)`` tuples in document order, or
        an empty list when the page could not be fetched.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as e:
        # Best effort: report the failure and let the caller carry on.
        print(f"Error fetching the URL: {e}")
        return []

    soup = BeautifulSoup(response.content, 'html.parser')

    categories = []
    for nav in soup.find_all('nav'):
        for link in nav.find_all('a'):
            category = link.get_text(strip=True)
            category_url = link.get('href')
            if category and category_url:
                # hrefs are frequently relative; resolve them against the page URL
                categories.append((category, urljoin(url, category_url)))

    return categories

def create_directories(base_url, categories):
    """Create the output tree ``outputs/<site host>/<category>``.

    Args:
        base_url: Site URL; its network location (host) names the site folder.
        categories: Iterable of ``(category_name, category_url)`` pairs. Only
            the name is used here.

    Returns:
        The path of the site-level directory (``outputs/<site host>``).
    """
    base_dir = os.path.join("outputs", urlparse(base_url).netloc)
    os.makedirs(base_dir, exist_ok=True)

    for category, _ in categories:
        # A separator inside the name (e.g. "U.S./World") would otherwise
        # create nested folders; flatten it to a single directory name.
        safe_name = category.replace('/', '_').replace(os.sep, '_')
        os.makedirs(os.path.join(base_dir, safe_name), exist_ok=True)

    return base_dir

In [72]:
def download_image(img_url, save_dir, img_name):
    """Save one image, choosing the saver by the kind of URL.

    ``data:`` URIs go to ``save_data_uri_image``; everything else goes to
    ``save_url_image``. Any exception raised along the way is printed and
    suppressed, so one bad image does not abort the caller's loop.

    Args:
        img_url: Image address, either a ``data:`` URI or a fetchable URL.
        save_dir: Directory the image is written into.
        img_name: File name to use inside ``save_dir``.
    """
    try:
        is_inline = img_url.startswith('data:')
        saver = save_data_uri_image if is_inline else save_url_image
        saver(img_url, save_dir, img_name)
    except Exception as e:
        print(f'Error downloading image {img_url}: {e}')

def get_extension_from_header(header):
    """Map the image MIME type mentioned in ``header`` to a file extension.

    Matching is by case-insensitive substring, so the argument can be a bare
    MIME type or a longer string such as the ``data:image/png;base64`` prefix
    of a data URI.

    Args:
        header: Text containing an image MIME type.

    Returns:
        The extension including the leading dot (e.g. ``".png"``), or ``None``
        when no known image type is present.
    """
    known_types = (
        ("image/jpeg", ".jpg"),
        ("image/jpg", ".jpg"),
        ("image/png", ".png"),
        ("image/gif", ".gif"),
        ("image/webp", ".webp"),
        ("image/svg+xml", ".svg"),
        ("image/xml", ".xml"),  # kept for backward compatibility
    )
    lowered = header.lower()
    for mime_type, extension in known_types:
        if mime_type in lowered:
            return extension
    return None

def save_data_uri_image(img_url, save_dir, img_name):
    """Decode an inline ``data:`` URI image and write it to ``save_dir``.

    The file extension is taken from the media type in the URI header and
    replaces whatever extension ``img_name`` carries. URIs whose media type
    is not recognised by ``get_extension_from_header`` are skipped.

    Args:
        img_url: Full data URI (``data:<media type>[;base64],<payload>``).
        save_dir: Existing directory to write the image into.
        img_name: Desired file name; only its stem is kept.

    Raises:
        ValueError: If ``img_url`` has no ``,`` separating header and payload.
    """
    from urllib.parse import unquote_to_bytes

    header, separator, payload = img_url.partition(',')
    if not separator:
        raise ValueError('Malformed data URI: missing "," separator')

    ext = get_extension_from_header(header)
    if not ext:
        print(f'Skipping data URI with unsupported media type: {header}')
        return

    # Only payloads flagged ";base64" are base64; the rest are percent-encoded.
    is_base64 = header.rsplit(';', 1)[-1].strip().lower() == 'base64'
    data = base64.b64decode(payload) if is_base64 else unquote_to_bytes(payload)

    # splitext keeps dotted stems intact ("a.b.jpg" -> "a.b").
    img_path = os.path.join(save_dir, os.path.splitext(img_name)[0] + ext)
    with open(img_path, 'wb') as f:
        f.write(data)

def save_url_image(img_url, save_dir, img_name, timeout=10):
    """Download an image over HTTP(S) and write the raw bytes to disk.

    Args:
        img_url: Absolute URL of the image.
        save_dir: Existing directory to write the image into.
        img_name: File name to use inside ``save_dir``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the whole scrape.

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            non-success HTTP status (via ``raise_for_status``).
    """
    response = requests.get(img_url, timeout=timeout)
    response.raise_for_status()

    img_path = os.path.join(save_dir, img_name)
    with open(img_path, 'wb') as f:
        f.write(response.content)

In [101]:
def process_article(link_tup, base_url, category_url, save_dir, csv_writer, include_content=False):
    """Fetch one article, pick its images, and hand them off for download.

    Args:
        link_tup: ``(article_number, link)`` where ``link`` is the anchor tag
            pointing at the article.
        base_url: Site URL, recorded in the CSV.
        category_url: Page the link was found on; relative hrefs are resolved
            against it.
        save_dir: Directory the images are written into.
        csv_writer: Object with ``writerow`` receiving one row per image.
        include_content: When true, the article body text is extracted and
            passed along for the CSV.

    Raises:
        requests.RequestException: If the article page cannot be fetched.
    """
    article_number, link = link_tup
    article_url = urljoin(category_url, link.get('href'))
    article_response = requests.get(article_url, timeout=10)
    article_response.raise_for_status()
    article_soup = BeautifulSoup(article_response.content, 'html.parser')
    article_heading = extract_heading(link, article_soup)

    ############# SPECIFIC FOR WEBSITE #####################

    # A list (not a set) keeps document order, so image numbering is
    # reproducible between runs.
    images_to_download = []
    seen_alts = set()
    for img in article_soup.find_all('img'):
        in_media_wrapper = img.find_parent('picture') or img.find_parent('figure')
        in_article_body = img.find_parent('article', {'tabindex': '-1'})
        if not (in_media_wrapper and in_article_body):
            continue

        # Not every <img> carries an alt attribute; treat a missing one as ''.
        alt = img.get('alt', '')
        if alt in seen_alts:
            # Same alt text is taken to mean the same picture at another size.
            continue
        seen_alts.add(alt)
        images_to_download.append(img)

    content = ''
    if include_content:
        main_content = article_soup.find('div', class_="sp-cn ins_storybody")
        if main_content:
            content = ' '.join(p.text for p in main_content.find_all('p'))

    ############# SPECIFIC FOR WEBSITE #####################

    # Generic alternative to the site-specific selection above:
    # images_to_download = filter_images(article_soup.find_all('img'))
    download_and_record_images(article_number, images_to_download, base_url, article_url, save_dir, csv_writer, article_heading, content)

def extract_heading(link, soup):
    """Find the best available headline for an article.

    Sources are tried in order and the first non-empty one wins:
    the link's ``aria-label``, the page's first ``<h1>``, its first ``<h2>``,
    the ``og:title`` meta tag's ``content``, and finally ``<title>``.

    Args:
        link: Anchor tag (anything with ``.get``) that pointed at the article.
        soup: Parsed article page (anything with a BeautifulSoup-style
            ``.find``).

    Returns:
        The headline text, or ``'No Heading'`` when every source is missing
        or empty.
    """
    heading = link.get('aria-label')
    if heading:
        return heading

    for tag_name in ('h1', 'h2'):
        tag = soup.find(tag_name)
        # find() returns None when the element is absent.
        if tag is not None:
            text = tag.get_text(strip=True)
            if text:
                return text

    # <meta> has no text content; the title lives in its "content" attribute.
    meta_tag = soup.find('meta', attrs={'property': 'og:title'})
    if meta_tag is not None:
        meta_title = (meta_tag.get('content') or '').strip()
        if meta_title:
            return meta_title

    title_tag = soup.find('title')
    if title_tag is not None:
        text = title_tag.get_text(strip=True)
        if text:
            return text

    return 'No Heading'

def download_and_record_images(article_number, images_to_download, base_url, article_url, save_dir, csv_writer, article_heading, content=''):
    """Download each image of an article and append one CSV row per image.

    Args:
        article_number: Index of the article, used in file names and rows.
        images_to_download: Image tags (anything with ``.get``) to save.
        base_url: Site URL, written to the ``website`` column.
        article_url: Article address; relative image sources are resolved
            against it.
        save_dir: Target directory; its base name is written to the
            ``category`` column.
        csv_writer: Object with ``writerow``.
        article_heading: Headline recorded with every image.
        content: Optional article text; appended as an extra column only when
            non-empty.
    """
    count = 0
    for img in images_to_download:
        # Lazy-loaded images often keep the real address in data-src.
        src = img.get('src') or img.get('data-src')
        if not src:
            # urljoin() with an empty/None source returns the article URL,
            # which would save the HTML page as an "image". Skip instead.
            continue
        img_url = urljoin(article_url, src)

        # NOTE: the file name uses the 0-based index, while the CSV
        # "image number" column below receives the 1-based value.
        img_name = f'image_{article_number}_{count}.jpg'
        count += 1

        alt_text = img.get('alt', '')

        cols = [base_url, os.path.basename(save_dir), article_number, count, alt_text, article_heading, article_url]
        if content:
            cols.append(content)

        download_image(img_url, save_dir, img_name)
        csv_writer.writerow(cols)

        # tiny pause between requests
        time.sleep(0.001)

# hrefs already queued during this kernel session; shared by download_images
# and download_images_2 so an article is not scraped twice.
processed_articles = set()
def download_images(base_url, category_url, save_dir, csv_writer, max_articles=15):
    """Scrape a category page and process the articles it links to.

    Args:
        base_url: Site URL, forwarded for the CSV rows.
        category_url: Category listing page to scan for article links.
        save_dir: Directory the images are written into.
        csv_writer: Object with ``writerow`` receiving one row per image.
        max_articles: Upper bound on the number of articles processed per
            call (defaults to the previously hard-coded 15).

    Raises:
        requests.RequestException: If the category page or an article page
            cannot be fetched.
    """
    response = requests.get(category_url, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, 'html.parser')

    # A list keeps page order so article numbers are stable between runs.
    articles = []

    ############# SPECIFIC FOR WEBSITE #####################

    for link in soup.find_all('a', href=True):
        href = link['href']
        # 'rcna' is the site-specific marker used to recognise article links.
        if 'rcna' in href and href not in processed_articles:
            articles.append(link)
            # Every match is marked as seen, including those beyond
            # max_articles that are not processed in this call.
            processed_articles.add(href)

    ############# SPECIFIC FOR WEBSITE #####################

    # Articles are numbered from 1.
    for numbered_link in enumerate(articles[:max_articles], start=1):
        process_article(numbered_link, base_url, category_url, save_dir, csv_writer)

def filter_images(images):
    """Keep only the images whose declared ``width`` attribute looks large.

    An image qualifies when its width is a percentage above 60, or a plain
    integer string above 100. Images with no width, or a width in any other
    format (for instance ``"300px"``), are dropped.

    Args:
        images: Iterable of image tags (anything with ``.get``).

    Returns:
        List of the qualifying images, in input order.
    """
    def _is_wide(width):
        # Percentage widths: strip the sign and compare against 60.
        if '%' in width and float(width.replace('%', '')) > 60:
            return True
        # Otherwise only pure-digit widths are considered.
        return width.isdigit() and float(width) > 100

    selected = []
    for tag in images:
        declared = tag.get('width')
        if not declared:
            continue
        if _is_wide(declared):
            selected.append(tag)
    return selected

In [102]:
def setup_csv(csv_file_path, include_content=False):
    """Create the metadata CSV and write its header row.

    The file is opened for writing (truncating any existing file) and left
    open; the caller is responsible for closing the returned handle.

    Args:
        csv_file_path: Path of the CSV file to create.
        include_content: When true, a trailing ``content`` column is added.

    Returns:
        ``(file_handle, csv_writer)`` for appending further rows.
    """
    header = [
        'website',
        'category',
        'article_number',
        'image number',
        'alt',
        'article_heading',
        'article_url',
    ]
    if include_content:
        header = header + ['content']

    handle = open(csv_file_path, mode='w', newline='', encoding='utf-8')
    writer = csv.writer(handle)
    writer.writerow(header)

    return handle, writer

def filter_categories(categories, exclude_keywords):
    """Drop categories whose name contains an excluded word.

    The name is lower-cased and split on whitespace; a category is removed
    when any keyword equals one of the resulting words (whole-word match,
    not substring).

    Args:
        categories: Iterable of ``(category_name, category_url)`` pairs.
        exclude_keywords: Words that disqualify a category.

    Returns:
        List of the surviving ``(category_name, category_url)`` tuples, in
        input order.
    """
    def _is_allowed(name):
        words = name.lower().split()
        return all(keyword not in words for keyword in exclude_keywords)

    return [(name, url) for name, url in categories if _is_allowed(name)]

# Navigation entries containing any of these words are not news categories.
exclude_keywords = ['sign', 'login', 'subscribe', 'advertisement', 'privacy', 'terms', 'contact']

In [103]:
### Workaround for in-article scraping: only follow links whose resolved URL is listed in outputs/file.txt
def download_images_2(base_url, category_url, save_dir, csv_writer, include_content=True,
                      urls_file='outputs/file.txt', max_articles=15):
    """Scrape the articles on a page that appear in an allow-list file.

    A link is followed only when its URL, resolved against ``base_url``,
    matches a line of ``urls_file`` and its href was not already seen.

    Args:
        base_url: Site URL; link hrefs are resolved against it for matching.
        category_url: Listing page to scan for article links.
        save_dir: Directory the images are written into.
        csv_writer: Object with ``writerow`` receiving one row per image.
        include_content: Forwarded to ``process_article``.
        urls_file: Text file with one allowed article URL per line (defaults
            to the previously hard-coded path).
        max_articles: Upper bound on the number of articles processed per
            call (defaults to the previously hard-coded 15).

    Raises:
        requests.RequestException: If a page cannot be fetched.
        OSError: If ``urls_file`` cannot be read.
    """
    response = requests.get(category_url, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, 'html.parser')

    with open(urls_file, 'r') as file:
        # A set gives constant-time membership checks in the loop below.
        allowed_urls = {line.strip() for line in file}

    # A list keeps page order so article numbers are stable between runs.
    articles = []
    for link in soup.find_all('a', href=True):
        href = link['href']

        # urljoin resolves protocol-relative hrefs ("//host/path") itself;
        # stripping the leading slashes would turn them into relative paths.
        ### change back to category url
        if urljoin(base_url, href) in allowed_urls and href not in processed_articles:
            articles.append(link)
            processed_articles.add(href)

    # Articles are numbered from 1.
    for numbered_link in enumerate(articles[:max_articles], start=1):
        process_article(numbered_link, base_url, category_url, save_dir, csv_writer, include_content=include_content)

In [None]:
# for categories
# categories = [
#     ("2024_elections", "https://www.nbcnews.com/politics/2024-presidential-election"),
#     ("science_space", "https://www.nbcnews.com/science/space")
# ]
# 
# #base_url = "https://nbcnews.com"
# base_dir = create_directories(base_url, categories)
# 
# csv_file, csv_writer = setup_csv(base_dir)
# 
# for category, category_url in tqdm(categories, desc='Downloading images for each category'):
#     category_dir = os.path.join(base_dir, category.replace('/', '_'))
#     os.makedirs(category_dir, exist_ok=True)
#     download_images(base_url, category_url, category_dir, csv_writer)
#     
# csv_file.close()

In [None]:
# for in-article
base_url = "https://news.yahoo.com/"
category_url = "https://news.yahoo.com/tagged/climate-change/"

category_dir = "outputs/across_article/news.yahoo.com"
os.makedirs(category_dir, exist_ok=True)

csv_file, csv_writer = setup_csv(f'{category_dir}/image_data.csv', include_content=False)

# Close the CSV even if scraping raises, so rows written so far are flushed
# and the handle does not leak into the kernel session.
try:
    download_images_2(base_url, category_url, category_dir, csv_writer, include_content=False)
finally:
    csv_file.close()