## 2.2 HTML to text conversion

NOTE: it is gzcat on MacOS to view the files…

In [1]:
from resiliparse.parse.encoding import detect_encoding
from resiliparse.extract.html2text import extract_plain_text

def run_extract_text_from_html_bytes(html_bytes: bytes) -> str | None:

    # Detect encoding of the byte string
    enc = detect_encoding(html_bytes)
    print(enc)

    # Decode the byte string into a Unicode string
    html = html_bytes.decode('utf-8')
    print(html)

    # If the encoding is not UTF-8, try to decode it using the detected encoding
    if enc != 'utf-8':
        try:
            html = html_bytes.decode(enc)
        except UnicodeDecodeError:
            return None

    # Extract text from the HTML string
    text = extract_plain_text(html)

    return text

In [2]:
# run_extract_text_from_html_bytes(b'\xff\xfeH\x00e\x00l\x00l\x00o\x00 \x00W\x00o\x00r\x00l\x00d\x00')
run_extract_text_from_html_bytes(b'<html><head><title>Test</title></head><body><h1>Hello World</h1></body></html>')

cp1252
<html><head><title>Test</title></head><body><h1>Hello World</h1></body></html>


'Hello World'

In [7]:
from fastwarc import ArchiveIterator, WarcRecordType
import gzip

from resiliparse.parse.encoding import detect_encoding
from resiliparse.extract.html2text import extract_plain_text

# Function to process a WARC file
def process_warc_file(warc_path):
    with gzip.open(warc_path, 'rb') as warc_gz:
        for record in ArchiveIterator(warc_gz):
            if record.record_type == WarcRecordType.response:
                html_bytes = record.reader.read()
                text = run_extract_text_from_html_bytes(html_bytes)
                if text:
                    print(text[:500])  # Print first 500 characters as a preview

process_warc_file("/data/CC-MAIN-20210722174000-20210722194000-00111.warc.gz")

FileNotFoundError: [Errno 2] No such file or directory: '/data/CC-MAIN-20210722174000-20210722194000-00111.warc.gz'

# 2.3

In [None]:
import fasttext

def run_identify_language(text: str) -> tuple[str, float]:
    
    model = fasttext.load_model('lid.176.bin')

    # Predict the language of the text
    text = text.replace('\n', ' ') # Remove newlines
    predictions = model.predict(text, k=1) # k=1 means we only want the top prediction

    return (predictions[0][0], predictions[1][0]) # Return the language code and the confidence score

# 2.4

In [34]:
# mask emails function

def run_mask_emails(text: str) -> tuple[str, int]:
    import re

    # Define the regular expression pattern for email addresses
    email_pattern = r'[\w\.-]+@[\w\.-]+'

    # Find all email addresses in the text
    emails = re.findall(email_pattern, text)

    # Replace each email address with a placeholder
    masked_text = re.sub(email_pattern, '|||EMAIL_ADDRESS|||', text)

    return (masked_text, len(emails)) # returns the masked string and the number of emails found that were masked

In [35]:
test_string = "I emailed cadekane@hawaii.edu and returned from that guy at chaserp@gmail.com and he sent it to airmed10@yahoo.com"

test_output = run_mask_emails(test_string)

In [36]:
test_output

('I emailed |||EMAIL_ADDRESS||| and returned from that guy at |||EMAIL_ADDRESS||| and he sent it to |||EMAIL_ADDRESS|||',
 3)

In [40]:
# Write a function to mask out PHONE NUMBERS

def run_mask_phone_numbers(text: str) -> tuple[str, int]:
    import re

    # Define the regular expression pattern for phone numbers
    # phone_pattern = r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'
    phone_pattern = r'(\+\d{1,2}\s)?\(?\d{3}\)?[\s.-]\d{3}[\s.-]\d{4}'
    phone_pattern = r'(\+\d{1,2}\s)?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}'

    # Find all phone numbers in the text
    phones = re.findall(phone_pattern, text)

    # Replace each phone number with a placeholder
    masked_text = re.sub(phone_pattern, '|||PHONE_NUMBER|||', text)

    return (masked_text, len(phones)) # returns the masked string and the number of phone numbers found that were masked

In [41]:
test_phone = """Hello there here are the 2831823829, 123-456-7890, (123) 456-7890, 123 456 7890, 123.456.7890, +91 (123) 456-7890 phone numbers"""

test_output = run_mask_phone_numbers(test_phone)

In [42]:
test_output

('Hello there here are the |||PHONE_NUMBER|||, |||PHONE_NUMBER|||, |||PHONE_NUMBER|||, |||PHONE_NUMBER|||, |||PHONE_NUMBER|||, |||PHONE_NUMBER||| phone numbers',
 6)

In [None]:
# Function to mask out IP addresses
# Only have to focus on IPv4 addresses

def run_mask_ips(text: str) -> tuple[str, int]:
    import re

    # Define the regular expression pattern for IP addresses
    ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b' # for IPv4 addresses

    ips = re.findall(ip_pattern, text)

    masked_test = re.sub(ip_pattern, '|||IP_ADDRESS|||', text)

    return (masked_test, len(ips))

## 2.5 Harmful content

In [None]:
# NSFW classifier

def run_classify_nsfw(text: str) -> tuple[Any, float]:
    
    model = fasttext.load_model('jigsaw_fasttext_bigrams_nsfw_final.bin')

    # Predict the probability of the text being NSFW
    text = text.replace('\n', ' ') # Remove newlines

    predictions = model.predict(text, k=1) # k=1 means we only want the top prediction

    predicted_nsfw = predictions[0][0].replace('__label__', '') # Remove the '__label__' prefix
    confidence_score = predictions[1][0]

    return (predicted_nsfw, confidence_score) # Return the language code and the confidence score

In [None]:
# Toxic speech classifier

def run_classify_toxic_speech(text: str) -> tuple[Any, float]:
    
    model = fasttext.load_model('jigsaw_fasttext_bigrams_hatespeech_final.bin')

    # Predict the probability of the text being NSFW
    text = text.replace('\n', ' ') # Remove newlines

    predictions = model.predict(text, k=1) # k=1 means we only want the top prediction

    predicted_speech = predictions[0][0].replace('__label__', '') # Remove the '__label__' prefix
    confidence_score = predictions[1][0]

    return (predicted_speech, confidence_score) # Return the language code and the confidence score

## 2.6 Quality filter (golpher)

In [48]:
# Filter out documents that
# • Contain less than 50 or more than 100,000 words. 
# • Have a mean word length outside the range of 3 to 10 characters. 
# • Have more than 30% of lines ending with an ellipsis (”...”). 
# • Contain less than 80% of words with at least one alphabetic character.

# use the NLTK package nltk.word_tokenize for tokenizing text into words. Then, you can count them, calculate the mean word length, etc.

import nltk
nltk.download('punkt')  # Ensure tokenizer is available

def run_gopher_quality_filter(text: str) -> bool:

    # Tokenize the text into words
    words = nltk.word_tokenize(text)

    # Filter out documents that contain less than 50 or more than 100,000 words
    if len(words) < 50 or len(words) > 100000:
        return False

    # Calculate the mean word length and filter out if it is outside the range of 3 to 10 characters
    avg_word_length = sum(len(word) for word in words) / len(words) if len(words) > 0 else 0
    if avg_word_length < 3 or avg_word_length > 10:
        return False

    # Filter out documents that have more than 30% of lines ending with an ellipsis
    lines = text.split('\n')
    ellipsis_count = sum(line.endswith('...') for line in lines)
    ellipsis_ratio = ellipsis_count / len(lines) if len(lines) > 0 else 0
    if ellipsis_ratio > 0.3:
        return False

    # Filter out documents that contain less than 80% of words with at least one alphabetic character
    alpha_word_count = sum(any(c.isalpha() for c in word) for word in words)
    alpha_ratio = alpha_word_count / len(words) if len(words) > 0 else 0
    if alpha_ratio < 0.8:
        return False

    return True  # Passes all filters

[nltk_data] Downloading package punkt to /Users/cadekane/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [54]:
nltk.word_tokenize("Hello, world!", preserve_line=True)

['Hello', ',', 'world', '!']

In [49]:
example_text = """This is a test document with 100 words. It has a mean word length of 4.5 characters.
It has 3 lines, 1 of which ends with an ellipsis. It contains 90% of words with at least one alphabetic character..."""

run_gopher_quality_filter(example_text)

LookupError: 
**********************************************************************
  Resource [93mpunkt_tab[0m not found.
  Please use the NLTK Downloader to obtain the resource:

  [31m>>> import nltk
  >>> nltk.download('punkt_tab')
  [0m
  For more information see: https://www.nltk.org/data.html

  Attempted to load [93mtokenizers/punkt_tab/english/[0m

  Searched in:
    - '/Users/cadekane/nltk_data'
    - '/Users/cadekane/miniconda3/envs/cs336_data/nltk_data'
    - '/Users/cadekane/miniconda3/envs/cs336_data/share/nltk_data'
    - '/Users/cadekane/miniconda3/envs/cs336_data/lib/nltk_data'
    - '/usr/share/nltk_data'
    - '/usr/local/share/nltk_data'
    - '/usr/lib/nltk_data'
    - '/usr/local/lib/nltk_data'
**********************************************************************


In [None]:
import nltk
nltk.download('punkt') # punkt should be punkt_… something

# 3 Deduplication

## 3.1 Exact line deduplication

In [None]:
# a function that takes a list of paths to input files and performs exact line deduplication on them.

def run_exact_line_deduplication(
    input_files: list[os.PathLike], output_directory: os.PathLike
):
    # Ensure the output directory exists and make it if it doesn't
    os.makedirs(output_directory, exist_ok=True)

    # First pass: Count frequency of each line, using the hash of the line as the key
    line_counts = {}
    for path in input_paths:
        with open(path, 'r') as file:
            for line in file:
                # Compute the hash of the line
                line_hash = hash(line) # hash() function computes a hash value of the input
                line_counts[line_hash] = line_counts.get(line_hash, 0) + 1
    
    # Second pass: write each file's unique lines (those with FREQUENCY 1) to the output directory
    for path in input_paths:
        # Preserve the file name in the output directory
        output_path = os.path.join(output_directory, os.path.basename(path))
        with open(path, 'r') as file, open(output_path, 'w') as output_file:
            for line in file:
                line_hash = hash(line)
                # Only write the line if it occurs exactly once in the whole corpus.
                if line_counts[line_hash] == 1:
                    output_file.write(line)

## 3.2 MinHash + LSH document deduplication

In [None]:
def run_minhash_deduplication(
    input_files: list[os.PathLike],
    num_hashes: int,
    num_bands: int,
    ngrams: int,
    jaccard_threshold: float,
    output_directory: os.PathLike,
):
    # 1. Compute minhash signatures for each document
    minhash_signatures = {}

    for path in input_files:
        with open(path, 'r') as file:
            text = file.read()
            minhash_signatures[path] = compute_minhash_signature(text, num_hashes, ngrams) # finished but unsure if works

    # 2. Use LSH with the provided # of bands to identify candidate duplicates
    candidate_pairs = lsh(minhash_signatures, num_bands) # unfinished

    # 3. Compute Jaccard similarity for candidate pairs and cluster pairs with common documents, such as pair AB and BC into ABC
    clusters = cluster_candidates(candidate_pairs, minhash_signatures, jaccard_threshold) # unfinished

    # 4. Remove a random document from each duplicate cluster and write the remaining documents to the output directory
    for cluster in clusters:
        # Choose a random document to discard
        discard_doc = random.choice(cluster)
        # Write the remaining documents to the output directory
        for doc in cluster:
            if doc != discard_doc:
                output_path = os.path.join(output_directory, os.path.basename(doc))
                shutil.copy(doc, output_path)

In [None]:
import nltk

def compute_minhash_signature(text: str, num_hashes: int, ngrams: int) -> list[int]:
    # Tokenize the text into n-grams
    n_grams = set(nltk.ngrams(text, n=ngrams)) # n number of n-grams? 

    # Initialize the signature with infinity for each hash function
    signature = [float('inf')] * num_hashes

    # Generate hash functions
    hash_funcs = generate_hash_functions(num_hashes) # k hash functions

    # Is this correct? I'm not sure if hash func for loop should be outside the n_gram for loop
    # Update the signature for each n-gram
    for n_gram in n_grams:
        for i, hash_func in enumerate(hash_funcs):
            hash_val = hash_func(n_gram)
            signature[i] = min(signature[i], hash_val)
    
    return signature # size of num_hashes

def generate_hash_functions(num_hashes: int):
    # Generate a list of hash functions using random seeds
    hash_funcs = []
    for seed in range(num_hashes):
        hash_funcs.append(generate_hash_function(seed))
    return hash_funcs

def generate_hash_function(seed: int):
    # Generate a hash function using the given seed
    def hash_func(n_gram):
        return hash(n_gram) ^ seed
    return hash_func

In [None]:
from collections import defaultdict
from itertools import combinations

def lsh(minhash_signatures: dict, num_bands: int) -> set:
    """
    Given a dictionary mapping document IDs (or file names) to their MinHash signatures,
    this function splits each signature into `num_bands` and buckets documents that share the
    same band signature. If two documents share a band, they are added as a candidate pair.

    Args:
        minhash_signatures (dict): A mapping from document ID to its MinHash signature (list of ints).
        num_bands (int): The number of bands to split each signature into.

    Returns:
        set: A set of tuples, each tuple containing a pair of document IDs that are candidate duplicates.
    """
    buckets = defaultdict(list)
    candidate_pairs = set()
    
    # Assuming all signatures are of equal length.
    sample_signature = next(iter(minhash_signatures.values()))
    sig_length = len(sample_signature)
    band_size = sig_length // num_bands
    # Note: This assumes sig_length is exactly divisible by num_bands.
    
    # Process each document and its signature.
    for doc_id, signature in minhash_signatures.items():
        for band in range(num_bands):
            start = band * band_size
            end = start + band_size
            # Create a tuple that represents the signature for this band.
            band_signature = tuple(signature[start:end])
            # Use a combination of band index and band_signature as the bucket key.
            buckets[(band, band_signature)].append(doc_id) # doc_id is really just the path
    
    # For each bucket, any documents sharing the same band are candidate duplicates.
    for docs in buckets.values():
        if len(docs) > 1:
            # Generate all unique pairs from the documents in this bucket.
            for pair in combinations(sorted(docs), 2): # combinations returns all unique pairs
                candidate_pairs.add(pair)
    
    return candidate_pairs

In [None]:
from collections import defaultdict
from itertools import combinations

def jaccard_similarity(set1: set, set2: set) -> float: # how exactly to compute jaccard similarity between two documents?
    """Computes the Jaccard similarity between two sets."""
    intersection = set1 & set2
    union = set1 | set2
    return len(intersection) / len(union) if union else 0.0

class UnionFind:
    def __init__(self):
        self.parent = {}

    def find(self, a):
        # Initialize the parent of 'a' to itself if not present.
        if a not in self.parent:
            self.parent[a] = a
        # Path compression.
        if self.parent[a] != a:
            self.parent[a] = self.find(self.parent[a])
        return self.parent[a]

    def union(self, a, b):
        rootA = self.find(a)
        rootB = self.find(b)
        if rootA != rootB:
            self.parent[rootB] = rootA

def cluster_duplicates(candidate_pairs: set, doc_shingles: dict, threshold: float) -> list:
    """
    Clusters duplicate documents based on candidate pairs that exceed the Jaccard similarity threshold.
    
    Args:
        candidate_pairs (set of tuple): Set of candidate duplicate pairs, each represented as a tuple of document IDs.
        doc_shingles (dict): A mapping from document ID to its set of tokens (or shingles).
        threshold (float): Jaccard similarity threshold above which two documents are considered duplicates.
        
    Returns:
        list: A list of clusters, where each cluster is a set of document IDs.
    """
    uf = UnionFind()
    
    # Process each candidate pair and compute Jaccard similarity.
    for doc_a, doc_b in candidate_pairs:
        sim = jaccard_similarity(doc_shingles[doc_a], doc_shingles[doc_b])
        if sim >= threshold:
            uf.union(doc_a, doc_b)
    
    # Group documents by their root representative to form clusters.
    clusters = defaultdict(set)
    for doc in doc_shingles:
        root = uf.find(doc)
        clusters[root].add(doc)
    
    # Return clusters as a list of sets.
    return list(clusters.values())


In [None]:

# Example usage:

# Assume we have three documents with token sets (shingles)
doc_shingles = {
    "doc1": {"the", "cat", "sat"},
    "doc2": {"the", "cat", "sat", "on", "the", "mat"},
    "doc3": {"dog", "barked"},
    "doc4": {"the", "cat", "sat"},
}

# Suppose our LSH step produced the following candidate pairs:
candidate_pairs = {("doc1", "doc2"), ("doc1", "doc4"), ("doc2", "doc3")}

# Set a Jaccard similarity threshold.
threshold = 0.5

clusters = cluster_duplicates(candidate_pairs, doc_shingles, threshold)
print("Clusters:", clusters)