In [1]:
import os
from warcio.archiveiterator import ArchiveIterator
from bs4 import BeautifulSoup
from datetime import datetime
from transformers import pipeline
import sqlite3
import re

2024-08-19 07:31:14.050468: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Build the zero-shot classification pipeline once; it is reused for every record.
classifier = pipeline(
    "zero-shot-classification",
    model="facebook/bart-large-mnli",
)

# Label set for the first classification pass (sentiment of the headline text).
candidate_labels_1 = [
    "positive news",
    "negative news",
]

# Label set for the second classification pass (topic category of the headline text).
candidate_labels_2 = [
    'Politics',
    'Business',
    'Technology',
    'Science',
    'Health',
    'Sports',
    'Entertainment',
    'Lifestyle',
    'Education',
    'Environment',
    'Crime',
    'Weather',
    'Economy',
    'Real Estate',
    'Automotive',
    'Travel',
]



In [3]:
# Frequent English words used by the fallback heuristic in is_english_page.
# All entries are lower-case because the page text is lower-cased before matching.
_COMMON_ENGLISH_WORDS = frozenset([
    "the", "and", "is", "in", "it", "you", "that", "he", "was", "for",
    "on", "are", "with", "as", "i", "his", "they", "be", "at", "one",
    "have", "this", "from", "or", "had", "by", "not", "word", "but",
    "what", "some", "we", "can", "out", "other", "were", "all", "there",
    "when", "up", "use", "your", "how", "said", "an", "each", "she",
])


def _is_english_lang_code(value):
    """Return True when a language tag such as 'en', 'en-US' or 'EN_gb' denotes English."""
    if not isinstance(value, str):
        return False
    # Compare only the primary subtag so regional variants are accepted too.
    primary_subtag = re.split(r'[-_]', value.strip().lower(), maxsplit=1)[0]
    return primary_subtag == 'en'


def is_english_page(content, min_common_ratio=0.5):
    """
    Determines if a page is in English based on its content.

    The checks run in order and the first positive one wins:
    1. the ``lang`` attribute of the ``<html>`` tag,
    2. a ``<meta http-equiv="Content-Language">`` tag,
    3. the share of words in the page text that are common English words.

    Parameters:
    - content: HTML markup of the page (anything BeautifulSoup accepts).
    - min_common_ratio: Ratio of common English words that the page text must
      exceed for the fallback heuristic to report English (default 0.5).

    Returns:
    - True if the page is considered English, False otherwise. Any error
      raised while inspecting the page is printed and reported as False.
    """
    try:
        soup = BeautifulSoup(content, 'html.parser')

        html_tag = soup.find('html')
        if html_tag and _is_english_lang_code(html_tag.get('lang')):
            return True

        meta_tag = soup.find('meta', {'http-equiv': 'Content-Language'})
        if meta_tag and _is_english_lang_code(meta_tag.get('content')):
            return True

        # Fallback: no usable language declaration, so look at the words themselves.
        words = re.findall(r'\b\w+\b', soup.get_text().lower())
        if not words:
            return False
        common_count = sum(1 for word in words if word in _COMMON_ENGLISH_WORDS)
        return common_count / len(words) > min_common_ratio
    except Exception as e:
        print(f"An error occurred while determining page language: {e}")
        return False


In [4]:
def insert_record(cur, record, conn):
    """
    Write one classified article into ``news_articles`` and commit right away.

    Parameters:
    - cur: SQLite cursor used to run the INSERT.
    - record: Mapping with the keys 'url', 'timestamp', 'category',
      'predicted' and 'score'; other keys are ignored.
    - conn: SQLite connection that owns ``cur``; committed after the insert.

    SQLite errors are printed instead of being raised.
    """
    # Record keys in the order of the placeholders in the statement below.
    field_order = ('url', 'timestamp', 'category', 'predicted', 'score')
    try:
        values = tuple(record[field] for field in field_order)
        statement = '''
            INSERT INTO news_articles (url, url_Timestamp, category, sentiment, score)
            VALUES (?, ?, ?, ?, ?)
        '''
        cur.execute(statement, values)

        # Persist every row as soon as it is written.
        conn.commit()
    except sqlite3.Error as db_error:
        print(f"Error inserting record: {db_error}")


In [5]:
def process_warc_records(warc_file_path, cur, conn):
    """
    Process the response records of one WARC file and store the results in SQLite.

    For every English page the title and h1-h3 headings are joined, the first
    15 words are classified for sentiment and the first 40 words for topic,
    and one row is written through ``insert_record``.

    Parameters:
    - warc_file_path: Path to the WARC file to read.
    - cur: SQLite cursor passed on to ``insert_record``.
    - conn: SQLite connection passed on to ``insert_record``.

    Records are skipped when they are not HTTP responses, have no or an
    unparsable WARC-Date, are not English, or contain no title/heading text.
    A missing file or any other error is printed; nothing is raised.
    """
    try:
        with open(warc_file_path, 'rb') as stream:
            for record in ArchiveIterator(stream):
                # Request, metadata and warcinfo records carry no page HTML.
                if record.rec_type != 'response':
                    continue

                warc_date = record.rec_headers.get_header("WARC-Date")
                if not warc_date:
                    continue

                # Parse the date first: a bad date would discard the record anyway,
                # so there is no point in running the classifier on it.
                try:
                    dt = datetime.fromisoformat(warc_date.replace('Z', '+00:00'))
                except ValueError:
                    print(f"Error parsing date: {warc_date}")
                    continue
                formatted_dt = dt.strftime('%Y-%m-%d %H:%M:%S')

                payload = record.content_stream()
                content = payload.read().decode(errors='ignore') if payload else ''
                if not is_english_page(content):
                    continue

                soup = BeautifulSoup(content, 'html.parser')
                # get_text() never returns None, unlike .string on a title with
                # nested markup, so the literal text "None" cannot leak in.
                title = soup.title.get_text() if soup.title else ''
                headers = ' '.join(h.get_text() for h in soup.find_all(['h1', 'h2', 'h3']))
                words = f"{title} {headers}".split()
                if not words:
                    # Nothing to classify; do not feed the model an empty string.
                    continue
                first_15_words = ' '.join(words[:15])
                first_40_words = ' '.join(words[:40])

                url = record.rec_headers.get_header("WARC-Target-URI")

                # Sentiment from the short excerpt, topic from the longer one.
                result_1 = classifier(first_15_words, candidate_labels_1)
                label_1 = result_1['labels'][0]
                score_1 = result_1['scores'][0]
                result_2 = classifier(first_40_words, candidate_labels_2)
                label_2 = result_2['labels'][0]

                # Sentiment is stored as 1 (positive) or 0 (anything else).
                label_number = 1 if label_1 == 'positive news' else 0

                record_to_insert = {
                    'url': url,
                    'timestamp': formatted_dt,
                    'content': first_15_words,
                    'predicted': label_number,
                    'score': score_1,
                    'category': label_2
                }
                insert_record(cur, record_to_insert, conn)
    except FileNotFoundError:
        print(f"File not found: {warc_file_path}")
    except Exception as e:
        print(f"An error occurred: {e}")


In [6]:
def process_directory(base_dir, conn):
    """
    Walk a directory tree and run every ``.warc.gz`` file through the pipeline.

    Parameters:
    - base_dir: Path to the base directory containing WARC files.
    - conn: SQLite database connection object.

    A single cursor is created from ``conn`` and shared across all files.
    """
    cursor = conn.cursor()

    for current_dir, _subdirs, filenames in os.walk(base_dir):
        # Keep only WARC archives; everything else in the tree is ignored.
        warc_names = (name for name in filenames if name.endswith(".warc.gz"))
        for name in warc_names:
            path = os.path.join(current_dir, name)
            print(f"Processing {path}...")
            process_warc_records(path, cursor, conn)


In [7]:
# Connect to SQLite
try:
    conn = sqlite3.connect('newswarc.db')
except sqlite3.Error as e:
    print(f"Error connecting to SQLite: {e}")
    # Same effect as sys.exit(1); `sys` is not imported in this notebook.
    raise SystemExit(1)

# Example usage
base_dir = '/lfs01/datasets/commoncrawl/2023-2024/data.commoncrawl.org/crawl-data/CC-NEWS/2023'
try:
    # insert_record writes into news_articles, so make sure the table exists
    # before the long-running classification starts. Column names follow the
    # INSERT statement; the types mirror the values built in process_warc_records.
    # NOTE(review): this is a no-op when the table already exists - confirm the
    # intended schema if the table is normally created elsewhere.
    conn.execute('''
        CREATE TABLE IF NOT EXISTS news_articles (
            url TEXT,
            url_Timestamp TEXT,
            category TEXT,
            sentiment INTEGER,
            score REAL
        )
    ''')
    conn.commit()

    process_directory(base_dir, conn)
finally:
    # Close the connection even when the run is interrupted or fails.
    conn.close()

Processing /lfs01/datasets/commoncrawl/2023-2024/data.commoncrawl.org/crawl-data/CC-NEWS/2023/01/CC-NEWS-20230103233758-03409.warc.gz...


KeyboardInterrupt: 