In [7]:
import xml.etree.ElementTree as ET

def view_partial_xml_structure(xml_file, max_elements=20):
    """
    Print the tags of the first elements of an XML file without loading it whole.

    The file is parsed incrementally with ``ET.iterparse``. The root element
    is consumed but not printed; printing starts with the first element
    opened after it.

    Args:
        xml_file: Path to the XML file.
        max_elements: Maximum number of element tags to print. Zero or a
            negative value prints nothing.
    """
    from itertools import islice

    # Open the file ourselves so the handle is closed deterministically even
    # though parsing stops long before the end of the file.
    with open(xml_file, 'rb') as xml_handle:
        context = ET.iterparse(xml_handle, events=('start',))
        _, root = next(context)  # Get the root element

        try:
            # islice stops after exactly max_elements events, so a limit of 0
            # prints nothing instead of one tag.
            for _, elem in islice(context, max(0, max_elements)):
                print(elem.tag)
        finally:
            root.clear()  # Clear the root to release memory

# Example usage: print the tags of at most 100 elements from the dump file
xml_file_path = "data/viwiki-20240201-multistream.xml"
view_partial_xml_structure(xml_file_path, max_elements=100)

{http://www.mediawiki.org/xml/export-0.10/}siteinfo
{http://www.mediawiki.org/xml/export-0.10/}sitename
{http://www.mediawiki.org/xml/export-0.10/}dbname
{http://www.mediawiki.org/xml/export-0.10/}base
{http://www.mediawiki.org/xml/export-0.10/}generator
{http://www.mediawiki.org/xml/export-0.10/}case
{http://www.mediawiki.org/xml/export-0.10/}namespaces
{http://www.mediawiki.org/xml/export-0.10/}namespace
{http://www.mediawiki.org/xml/export-0.10/}namespace
{http://www.mediawiki.org/xml/export-0.10/}namespace
{http://www.mediawiki.org/xml/export-0.10/}namespace
{http://www.mediawiki.org/xml/export-0.10/}namespace
{http://www.mediawiki.org/xml/export-0.10/}namespace
{http://www.mediawiki.org/xml/export-0.10/}namespace
{http://www.mediawiki.org/xml/export-0.10/}namespace
{http://www.mediawiki.org/xml/export-0.10/}namespace
{http://www.mediawiki.org/xml/export-0.10/}namespace
{http://www.mediawiki.org/xml/export-0.10/}namespace
{http://www.mediawiki.org/xml/export-0.10/}namespace
{http:/

In [1]:
import xml.etree.ElementTree as ET
import codecs
import csv
import time
import os
import re

# Folder that holds the XML dump and receives the generated CSV files
PATH_WIKI_XML = 'data/'
# File name of the XML dump inside PATH_WIKI_XML
FILENAME_WIKI = 'viwiki-20240201-multistream.xml'
# File names of the three CSV outputs
FILENAME_ARTICLES = 'articles.csv'
FILENAME_REDIRECT = 'articles_redirect.csv'
FILENAME_TEMPLATE = 'articles_template.csv'
# Text encoding used when the CSV files are opened
ENCODING = "utf-8"


In [2]:
def hms_string(sec_elapsed):
    """Format a duration in seconds as ``H:MM:SS.ss``.

    The duration is rounded to hundredths of a second *before* it is split
    into hours, minutes and seconds, so a value such as 59.999 is rendered as
    ``0:01:00.00`` rather than the invalid ``0:00:60.00``.

    Args:
        sec_elapsed: Elapsed time in seconds (int or float).

    Returns:
        The formatted string, e.g. ``"0:06:50.82"``.
    """
    total_centis = round(sec_elapsed * 100)
    hours, remainder = divmod(total_centis, 60 * 60 * 100)
    minutes, centis = divmod(remainder, 60 * 100)
    return "{}:{:>02}:{:>05.2f}".format(hours, minutes, centis / 100)

def strip_tag_name(t):
    """Return an element tag without its ``{namespace}`` prefix.

    Args:
        t: Tag string, e.g. ``"{http://example.org/ns}page"``.

    Returns:
        The part after the last ``}``; the tag unchanged when it has no
        namespace prefix.
    """
    # Work on the argument itself; the previous version overwrote it with the
    # tag of a global ``elem`` and therefore only worked inside one loop.
    idx = t.rfind("}")
    if idx != -1:
        t = t[idx + 1:]
    return t

In [3]:
# Full paths built from the folder and file-name constants
pathWikiXML = os.path.join(PATH_WIKI_XML, FILENAME_WIKI)
pathArticles = os.path.join(PATH_WIKI_XML, FILENAME_ARTICLES)
pathArticlesRedirect = os.path.join(PATH_WIKI_XML, FILENAME_REDIRECT)
# NOTE: despite its name, this is the path of the template CSV (FILENAME_TEMPLATE)
pathTemplateRedirect = os.path.join(PATH_WIKI_XML, FILENAME_TEMPLATE)

In [4]:
# Make sure every output CSV exists: a missing file is created holding only its header row
for csv_path, header in (
    (pathArticles, ['id', 'title', 'redirect', 'text']),
    (pathArticlesRedirect, ['id', 'title', 'redirect', 'text']),
    (pathTemplateRedirect, ['id', 'title', 'text']),
):
    if os.path.exists(csv_path):
        continue  # never overwrite an existing file here
    with open(csv_path, 'w', newline='', encoding=ENCODING) as f:
        csv.writer(f).writerow(header)

In [24]:
def extract_text(text):
    """Extract the plain lead text of a wiki page, dropping tables and templates.

    When the text contains no ``{|`` or ``{{`` markup, everything before the
    first ``==`` is returned. Otherwise the text is scanned line by line:
    lines that open, continue or close a ``{| ... |}`` / ``{{ ... }}`` block
    are dropped, as are lines starting with ``|``; the remaining lines are
    joined with single spaces.

    Block nesting is tracked with a depth counter. This fixes two defects of
    a plain inside/outside flag: a template that opens and closes on the same
    line no longer swallows the text that follows it, and a nested ``}}`` no
    longer ends the enclosing block early.

    Args:
        text: Raw wiki markup, or a falsy value.

    Returns:
        The extracted text, or ``None`` when ``text`` is empty or ``None``.
    """
    if not text:
        return None

    if '{|' not in text and '{{' not in text:
        # If no baskets, extract until first '==' or empty text
        return text.split('==', 1)[0].strip()

    depth = 0  # number of currently open {| / {{ blocks
    lines = []
    for line in text.split('\n'):
        opens = line.count('{|') + line.count('{{')
        closes = line.count('|}') + line.count('}}')
        was_inside = depth > 0
        # Clamp at zero so a stray closer cannot make the depth negative
        depth = max(0, depth + opens - closes)

        if was_inside or opens:
            # Line belongs to a block or starts one
            continue
        if line.strip().startswith('|'):
            continue
        lines.append(line)
    return ' '.join(lines).strip()

# def extract_text(text):
#     if text is None:
#         return 'NONE'

#     # Pattern to match {{Infobox ... }} or {|{{Infobox ... }} |}
#     infobox_pattern = r'\{\{\s*Infobox\s.*?\}\}|\{\|\s*\{\{\s*Infobox\s.*?\}\}\s*\|\}'
#     # Pattern to match {| and |}
#     table_pattern = r'\{\|\s*|\s*\|\}'
    
#     end_pattern = r'=='
    
#     cleaned_text = re.sub(infobox_pattern, '', text, flags=re.DOTALL)
#     cleaned_text = re.sub(table_pattern, '', cleaned_text, flags=re.DOTALL)
    
#     end_pos = re.search(end_pattern, cleaned_text, flags=re.IGNORECASE)
#     if end_pos:
#         cleaned_text = cleaned_text[:end_pos.start()]
    
#     # Remove quotes
#     cleaned_text = cleaned_text.replace('"', '')
    
#     return cleaned_text.strip()


In [20]:
def extract_text(text):
    """Return the lead text of a wiki page with Infobox templates removed.

    Steps: delete ``{{Infobox ...}}`` templates (optionally wrapped in
    ``{| ... |}``), delete the remaining ``{|`` / ``|}`` table markers, cut
    the text at the first ``==`` and strip surrounding whitespace.

    Args:
        text: Raw wiki markup, or ``None``.

    Returns:
        The cleaned lead text, or the string ``'NONE'`` when ``text`` is ``None``.
    """
    if text is None:
        return 'NONE'

    # {{Infobox ... }} on its own, or wrapped as {| {{Infobox ... }} |}
    infobox_re = r'\{\{\s*Infobox\s[^{}]*\}\}|\{\|\s*\{\{\s*Infobox\s[^{}]*\}\}\s*\|\}'
    # Bare table delimiters {| and |}
    table_marker_re = r'\{\|\s*|\s*\|\}'

    without_infobox = re.sub(infobox_re, '', text, flags=re.DOTALL)
    without_tables = re.sub(table_marker_re, '', without_infobox, flags=re.DOTALL)

    # Keep only what precedes the first '==' (start of the first section heading)
    heading_at = without_tables.find('==')
    lead = without_tables if heading_at == -1 else without_tables[:heading_at]
    return lead.strip()

In [5]:
def extract_text(text):
    """Collect the lead lines of a wiki page, skipping infobox/table blocks.

    The text is processed line by line (each line stripped):

    * a line starting with one of the block openers enters "inside brackets";
    * a line ending with ``}}`` or ``|}`` leaves it;
    * outside a block, non-empty lines that do not start with ``==`` are kept;
    * inside a block, runs of lines starting with ``|``, ``!`` or ``#`` are
      consumed; the first line after such a run is consumed as well and is
      not examined any further;
    * any other non-empty line inside a block is kept, and scanning stops if
      that line ends with ``==``;
    * scanning also stops at a line starting with ``==`` while inside a block.

    Args:
        text: Raw wiki markup; ``None`` or an empty string is accepted.

    Returns:
        The kept lines joined with newlines (``''`` for empty input).
    """
    if not text:
        return ''

    block_openers = ('{{Infobox', '{|', '{{Thông tin', '{{Chú thích', '{{Hộp thông tin')
    row_markers = ('|', '!', '#')

    result = []
    lines_iter = iter(text.split('\n'))  # iterator so the inner loop can consume lines
    inside_brackets = False  # to track if we are inside brackets

    for line in lines_iter:
        line = line.strip()
        # str.startswith accepts a tuple; this replaces the chained calls, one
        # of which was misspelled ``startwith`` and raised AttributeError.
        if line.startswith(block_openers):
            inside_brackets = True
        elif line.endswith('}}') or line.endswith('|}'):
            inside_brackets = False
        elif not inside_brackets and line and not line.startswith('=='):
            result.append(line)
        elif inside_brackets and line.startswith(row_markers):
            # Skip lines starting with special characters within brackets
            while line.startswith(row_markers):
                line = next(lines_iter, None)
                if line is None:
                    break
                line = line.strip()
        elif line and not line.startswith('=='):
            result.append(line)
            # A line ending with '}}' never reaches this branch (handled above),
            # so only the '==' check is needed.
            if line.endswith('=='):
                break
        elif line.startswith('=='):
            break
    return '\n'.join(result)

In [6]:
totalCount = 0
articleCount = 0
redirectCount = 0
templateCount = 0
start_time = time.time()

# Per-page state, reset at every <page> start. Initialised here so the loop
# never reads an undefined name for elements that precede the first page.
title = None
id = -1
redirect = ''
text = ''
inrevision = False
ns = 0

# newline='' leaves line endings to the csv module, as its documentation recommends
with open(pathArticles, "w", encoding=ENCODING, newline='') as articlesFH, \
        open(pathArticlesRedirect, "w", encoding=ENCODING, newline='') as redirectFH, \
        open(pathTemplateRedirect, "w", encoding=ENCODING, newline='') as templateFH:
    articlesWriter = csv.writer(articlesFH, quoting=csv.QUOTE_MINIMAL)
    redirectWriter = csv.writer(redirectFH, quoting=csv.QUOTE_MINIMAL)
    templateWriter = csv.writer(templateFH, quoting=csv.QUOTE_MINIMAL)

    articlesWriter.writerow(['id', 'title', 'redirect','text'])
    redirectWriter.writerow(['id', 'title', 'redirect'])
    templateWriter.writerow(['id', 'title'])

    root = None
    for event, elem in ET.iterparse(pathWikiXML, events=('start', 'end')):
        if root is None:
            root = elem  # the first event is the start of the document root
        tname = strip_tag_name(elem.tag)

        if event == 'start':
            # Only structural bookkeeping happens on 'start': at that point
            # iterparse guarantees the attributes but not the element's text.
            if tname == 'page':
                title = ''
                id = -1
                redirect = ''
                text = ''
                inrevision = False
                ns = 0
            elif tname == 'revision':
                # Do not pick up on revision id's
                inrevision = True
            continue

        # 'end' event: the element's text is complete
        if tname == 'title':
            title = elem.text
        elif tname == 'id' and not inrevision and elem.text is not None:
            id = int(elem.text)
        elif tname == 'redirect':
            redirect = elem.get('title', '')
        elif tname == 'ns' and elem.text is not None:
            ns = int(elem.text)
        elif tname == 'text' and inrevision:
            text = elem.text
            if text is not None:
                text = text.strip()
                text = extract_text(text)
        elif tname == 'page':
            totalCount += 1

            if ns == 10:
                templateCount += 1
                templateWriter.writerow([id, title])
            elif len(redirect) > 0:
                redirectCount += 1
                redirectWriter.writerow([id, title, redirect])
            else:
                articleCount += 1
                articlesWriter.writerow([id, title, redirect, text])

            if totalCount > 1 and (totalCount % 100000) == 0:
                print("{:,}".format(totalCount))

            # Release the finished page and detach it from the root so memory
            # does not grow with the number of pages processed.
            elem.clear()
            root.clear()

time_took = time.time() - start_time
print(f"Total runtime: {hms_string(time_took)}")

100,000
200,000
300,000
400,000
500,000
600,000
700,000
800,000
900,000
1,000,000
1,100,000
1,200,000
1,300,000
1,400,000
1,500,000
1,600,000
1,700,000
1,800,000
1,900,000
2,000,000
2,100,000
Total runtime: 0:06:50.82


# Crawl from Wikipedia directly

In [None]:
import requests
from bs4 import BeautifulSoup
import csv

def get_soup(id):
    """Download a Vietnamese Wikipedia page by page id and parse its HTML.

    Args:
        id: Page id, inserted into the ``curid`` query parameter.

    Returns:
        A ``BeautifulSoup`` object for the page, or ``None`` when the request
        fails (network error, timeout, or a status code other than 200).
    """
    url = f'https://vi.wikipedia.org/wiki?curid={id}'
    try:
        # A timeout keeps one unresponsive request from stalling the whole crawl
        response = requests.get(url, timeout=30)
    except requests.RequestException as exc:
        print(f"Failed to retrieve data for ID {id}. Error: {exc}")
        return None
    if response.status_code != 200:
        print(f"Failed to retrieve data for ID {id}. Status code: {response.status_code}")
        return None
    return BeautifulSoup(response.text, 'html.parser')

def get_paragraphs(soup):
    """Return the text of the paragraphs that precede the first section.

    The end of the lead is marked by the ``mw:PageProp/toc`` meta tag when
    present, otherwise by the first ``<h2>``. When the page has neither
    marker, every paragraph of the page is returned instead of raising
    ``AttributeError``.

    Args:
        soup: Parsed page exposing ``find`` and ``find_all``.

    Returns:
        The stripped paragraph texts in document order, separated by blank
        lines (``''`` when there are no paragraphs).
    """
    anchor = soup.find('meta', {'property': 'mw:PageProp/toc'})
    if anchor is None:
        anchor = soup.find('h2')

    if anchor is not None:
        # find_all_previous yields the nearest paragraph first; restore page order
        paragraphs = list(reversed(anchor.find_all_previous('p')))
    else:
        paragraphs = list(soup.find_all('p'))

    return '\n\n'.join(paragraph.text.strip() for paragraph in paragraphs)

def check_person(soup, title):
    """Report whether the page contains a ``<table class="infobox">``.

    Args:
        soup: Parsed page exposing ``find``.
        title: Accepted for the caller's convenience; not used.

    Returns:
        ``True`` when an infobox table is found, ``False`` otherwise.
    """
    return soup.find('table', {'class': 'infobox'}) is not None

def main(start_id, end_id):
    """Crawl the pages listed in ``fix_text.txt`` between two ids and save their lead text.

    ``fix_text.txt`` is read as one ``id:title`` entry per line. The line is
    split on every ``:``, so the stored title is only the part between the
    first and second colon; for an entry like ``12:Wikipedia:Foo`` that is
    the namespace prefix, which the ``break_title`` filter matches. Blank
    lines and lines without a colon are ignored.

    For every entry from ``start_id`` to ``end_id`` (inclusive, in file
    order) whose page has an infobox, the lead paragraphs are written to
    ``extracted_data_2.csv``.

    Args:
        start_id: Id (string) of the first entry to process.
        end_id: Id (string) of the last entry to process.

    Raises:
        ValueError: If ``start_id`` or ``end_id`` is not listed in ``fix_text.txt``.
    """
    # Titles (namespace prefixes) that are never crawled
    break_title = {'Wikipedia', 'MediaWiki', 'Bản mẫu', 'Cổng thông tin', 'Thảo luận', 'Thành viên', 'Tập tin',
                   'Trợ giúp', 'Thể loại', 'TimedText', 'TimedText talk', 'Mô đun', 'Tiện ích', 'Định nghĩa tiện ích'}

    index = []
    title = []
    with open('fix_text.txt', 'r', encoding='utf-8') as file:
        for line in file:
            parts = line.strip().split(':')
            if len(parts) < 2:
                # Blank or malformed line: there is no title to work with
                continue
            index.append(parts[0])
            title.append(parts[1])

    # Resolve the range before the output file is opened, so an unknown id
    # does not truncate an existing CSV.
    start_index = index.index(start_id)
    end_index = index.index(end_id)

    # Write to CSV directly
    csv_file = 'extracted_data_2.csv'
    with open(csv_file, mode='w', encoding='utf-8-sig', newline='') as file:
        fieldnames = ['id', 'title', 'text']
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()

        for i in range(start_index, end_index + 1):  # +1 to include the end_id
            current_title = title[i]

            if "Thảo luận" in current_title or current_title in break_title:
                print(f"Skipping {current_title}...")
                continue

            try:
                soup = get_soup(index[i])
                if soup:
                    if check_person(soup, current_title):
                        text = get_paragraphs(soup)
                        if text:
                            writer.writerow({'id': index[i], 'title': current_title, 'text': text})
                            print(f"Extracted and wrote data for {current_title}")
                        else:
                            print(f"No content extracted for {current_title}")
                    else:
                        print(f"No infobox found for {current_title}")
                else:
                    print(f"Failed to retrieve data for {current_title}")
            except AttributeError:
                # Raised by the page helpers when an expected element is missing
                print(f"AttributeError: Skipping ID {index[i]}")
                continue

    print("CSV file saved.")

# Start and end extracting from specific IDs (both ends are included by main)
start_id = '844208'
end_id = '844210'
main(start_id, end_id)

Extract the IDs of rows with missing text from the CSV

In [7]:
import csv

def extract_ids_with_empty_text(csv_file, output_file):
    """Write the ids of all CSV rows whose ``text`` field is empty to a text file.

    Args:
        csv_file: Path of a CSV file with at least ``id`` and ``text`` columns.
            It is read as UTF-8; a leading byte-order mark is tolerated.
        output_file: Path of the text file to create; it receives one id per line.
    """
    # 'utf-8-sig' strips an optional BOM so the first column is really named 'id';
    # newline='' is what the csv module expects for files it reads.
    with open(csv_file, mode='r', encoding='utf-8-sig', newline='') as csvfile, \
            open(output_file, mode='w', encoding='utf-8') as fix_text_file:
        csv_reader = csv.DictReader(csvfile)
        for row in csv_reader:
            if not row['text']:  # Check if 'text' field is empty
                # Only the id is written, one per line
                fix_text_file.write(row['id'] + '\n')

    print("Extraction complete.")

# Usage: collect the ids of rows without text
# NOTE(review): the input is an absolute, machine-specific path — adjust before running elsewhere
csv_file = 'D:/scap_wiki/data/articles_with_image_test_3.csv'
output_file = 'tempo.txt'
extract_ids_with_empty_text(csv_file, output_file)

Extraction complete.


In [10]:
import re

# Load the whole log text
with open('temptemp.txt', 'r') as file:
    text = file.read()

# Every run of digits that follows "ID " is collected, in order of appearance
numbers = re.findall(r'ID (\d+)', text)

# Save the numbers, one per line
with open('extracted_numbers.txt', 'w') as file:
    file.writelines(number + '\n' for number in numbers)

print("Numbers extracted and saved to extracted_numbers.txt")

Numbers extracted and saved to extracted_numbers.txt


# Clean data

Remove articles that contain no text at all

In [3]:
import pandas as pd

df = pd.read_csv('data/articles_with_image_test_7.csv')
# Read IDs from a text file, one per line; blank lines are ignored
with open('compare.txt', 'r') as file:
    ids_to_remove = [int(line) for line in map(str.strip, file) if line]
# Filter out rows with specified IDs
filtered_df = df[~df['id'].isin(ids_to_remove)]
# Save the filtered DataFrame to a new CSV file
filtered_path = 'data/articles_with_image_test_9.csv'
filtered_df.to_csv(filtered_path, index=False)
# Report the path that was actually written (the old message named a different file)
print(f"Filtered data saved to '{filtered_path}'")


Filtered data saved to 'articles_with_image_test_8.csv'


In [6]:
#check length of id
import csv
import pandas as pd

df = pd.read_csv('data/articles_with_image_test_7.csv')

# Number of data rows, taken from the first dimension of the frame's shape
num_lines = df.shape[0]
print(num_lines)

374763


Remove wiki markup tokens: {{ }} [[ ]] ''' '''''

In [1]:
import csv

# Markup tokens to strip. The five-quote token is listed before the
# three-quote one because removing ''' first would leave '' behind.
chars_to_remove = ["{{", "}}", "[[", "]]", "'''''", "'''"]

# Function to remove characters
def remove_chars(text):
    """Delete every token listed in ``chars_to_remove`` from ``text``.

    Tokens are applied longest first, so a token that contains a shorter one
    is removed as a whole regardless of the order of the list.

    Args:
        text: String to clean. A non-string value (e.g. ``None`` for a
            missing CSV field) is returned unchanged.

    Returns:
        The text without the listed tokens.
    """
    if not isinstance(text, str):
        return text
    for token in sorted(chars_to_remove, key=len, reverse=True):
        text = text.replace(token, '')
    return text

# Input and Output file paths
input_file = 'ok_di.csv'
output_file = 'convert_file.csv'

# Read CSV file, remove characters, and write to new CSV file.
# The encoding is explicit so the result does not depend on the platform
# default; newline='' is what the csv module expects on both sides.
with open(input_file, mode='r', encoding='utf-8', newline='') as infile, \
        open(output_file, mode='w', encoding='utf-8', newline='') as outfile:
    reader = csv.DictReader(infile)
    fieldnames = reader.fieldnames

    # Write header
    writer = csv.DictWriter(outfile, fieldnames=fieldnames)
    writer.writeheader()

    for row in reader:
        # The text column is named 'text'
        text_column = row['text']
        if text_column:
            # Rows with an empty or missing text field are written unchanged
            row['text'] = remove_chars(text_column)

        # Write the row to the output CSV file
        writer.writerow(row)

print("Characters removed and saved to", output_file)


Characters removed and saved to convert_file.csv


Remove citation markers such as [1], [2], [3], ...

Replace non-breaking spaces (U+00A0) with regular spaces

In [2]:
import pandas as pd
import re

def clean_text(text):
    """Strip numeric citation markers and normalise non-breaking spaces.

    Args:
        text: Value of a text cell. Anything that is not a ``str`` is
            returned untouched.

    Returns:
        The text without ``[<digits>]`` markers and with every U+00A0
        replaced by a plain space.
    """
    if not isinstance(text, str):
        return text
    # Drop [1], [2], [3], ...
    without_markers = re.sub(r'\[\d+\]', '', text)
    # U+00A0 (no-break space) becomes an ordinary space
    return without_markers.replace('\u00A0', ' ')

# Source and destination CSV files
input_file = 'new3.csv'
output_file = 'new4.csv'

# Rows per chunk when streaming the large CSV
chunk_size = 10_000  # Adjust as needed

chunk_iterator = pd.read_csv(input_file, chunksize=chunk_size, encoding='utf-8')

# Clean chunk by chunk; the first chunk creates the file with its header,
# every later chunk is appended without one
for i, chunk in enumerate(chunk_iterator):
    chunk['text'] = chunk['text'].apply(clean_text)

    is_first = i == 0
    chunk.to_csv(output_file, mode='w' if is_first else 'a', index=False, header=is_first)

print("Cleaning complete. Cleaned data saved to:", output_file)

Cleaning complete. Cleaned data saved to: new4.csv


Remove duplicate IDs and sort

In [1]:

import pandas as pd

# Load the CSV and keep only the first row seen for each 'id'
df = pd.read_csv('data/articles_with_image.csv').drop_duplicates(subset=['id'], keep='first')

# Order the remaining rows by 'id'
df_sorted = df.sort_values('id')

# The result goes to a new file; the input file is left as it is
df_sorted.to_csv('data/articles_with_image_test_1.csv', index=False)

print("Duplicates removed and data sorted by 'id'.")

Duplicates removed and data sorted by 'id'.


# Reduce the CSV file to the articles whose image contains a face

In [2]:
import cv2
import os

# Load the pre-trained face detection model (Caffe network definition + weights)
# NOTE(review): both files are referenced by absolute, machine-specific paths
face_model = cv2.dnn.readNetFromCaffe(
    "D:/scap_wiki/deploy.prototxt.txt",
    "D:/scap_wiki/res10_300x300_ssd_iter_140000.caffemodel"
)

# Function to check if an image contains a human face
def is_human_face(image_path, min_confidence=0.5):
    """Run the face detector on an image file and report whether a face was found.

    Args:
        image_path: Path of the image file.
        min_confidence: A detection counts only when its confidence is
            strictly greater than this value.

    Returns:
        ``True`` when at least one detection exceeds ``min_confidence``;
        ``False`` otherwise, including when the file cannot be read as an image.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals an unreadable file by returning None, not by raising
        return False

    # Preprocess the image for face detection
    blob = cv2.dnn.blobFromImage(cv2.resize(image, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0))

    # Pass the blob through the network to detect faces
    face_model.setInput(blob)
    detections = face_model.forward()

    # NOTE(review): the indexing assumes an output shaped (1, 1, N, 7) with
    # the confidence at position 2 of each detection — confirm for this model
    return any(
        detections[0, 0, i, 2] > min_confidence
        for i in range(detections.shape[2])
    )

import shutil

# Path to the folder containing JPG images
folder_path = "data/face_wiki"

# Create a folder to save images with human faces
output_folder = "data/face_wiki_2"
os.makedirs(output_folder, exist_ok=True)

# Iterate through images in the folder
for filename in os.listdir(folder_path):
    if not filename.endswith(".jpg"):
        continue
    image_path = os.path.join(folder_path, filename)

    # Check if the image contains a human face
    if is_human_face(image_path):
        # Copy the file byte for byte; decoding and re-encoding it through
        # OpenCV would recompress the JPEG and lose quality.
        output_path = os.path.join(output_folder, filename)
        shutil.copyfile(image_path, output_path)


In [2]:
import os

# Folder with the face images
directory = 'data/face_wiki_2'

# Everything in the folder, then only the .jpg entries
files = os.listdir(directory)
jpg_files = list(filter(lambda entry: entry.endswith('.jpg'), files))

# Keep the part of each file name before its first dot
image_names = [entry.partition('.')[0] for entry in jpg_files]

# Text file that receives the names, one per line
txt_file = 'image_list.txt'
with open(txt_file, 'w') as f:
    f.writelines(name + '\n' for name in image_names)

print(f"List of image names saved to {txt_file}")


List of image names saved to image_list.txt


In [3]:
import csv

# Function to read data from CSV file and return it as a dictionary
def read_csv(filename):
    """Load a CSV with ``id``, ``title`` and ``text`` columns into a dictionary.

    Args:
        filename: Path of the CSV file. It is read as UTF-8; a leading
            byte-order mark is tolerated.

    Returns:
        A dict mapping each ``id`` (string) to ``{'title': ..., 'text': ...}``.
        When an id occurs more than once, the last row wins.
    """
    # 'utf-8-sig' strips an optional BOM so the first column is really named 'id'
    with open(filename, 'r', encoding='utf-8-sig', newline='') as file:
        return {
            row['id']: {'title': row['title'], 'text': row['text']}
            for row in csv.DictReader(file)
        }

# Function to match image names with data and write to a new CSV file
def match_and_write(image_list_file, data_csv_file, output_csv_file):
    """Write the CSV rows whose id appears in a list of image names.

    Args:
        image_list_file: Text file with one image name per line; the part
            before the first dot is used as the id.
        data_csv_file: CSV file loaded through ``read_csv``.
        output_csv_file: CSV file to create with ``id``, ``title`` and
            ``text`` columns; rows follow the order of the image list.
    """
    # Read data from CSV file
    data = read_csv(data_csv_file)

    # Explicit UTF-8 keeps the result independent of the platform default encoding
    with open(image_list_file, 'r', encoding='utf-8') as file:
        image_names = [name.strip() for name in file]  # Remove newline characters

    # Create a new CSV file and write header
    with open(output_csv_file, 'w', encoding='utf-8', newline='') as csvfile:
        fieldnames = ['id', 'title', 'text']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        # Match image names with data and write to CSV
        for image_name in image_names:
            image_id = image_name.split('.')[0]  # Assuming image names are in the format id.jpg
            record = data.get(image_id)
            if record is not None:
                writer.writerow({'id': image_id, 'title': record['title'], 'text': record['text']})

# Example usage: keep only the articles whose id is listed in image_list.txt
image_list_file = 'image_list.txt'
data_csv_file = 'data/articles.csv'
output_csv_file = 'data/matched_articles.csv'

match_and_write(image_list_file, data_csv_file, output_csv_file)


# Convert to a Parquet dataset

In [None]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import os

In [None]:
# Paths to CSV and image folders
# NOTE(review): these are absolute paths, unlike the relative 'data/...' paths used in earlier cells
data_folder = '/scap_wiki/data'
working_folder = '/scap_wiki/working'
images_folder = '/scap_wiki/data/face_wiki_2'

# Read the CSV file of matched articles from data_folder
csv_path = os.path.join(data_folder, 'matched_articles.csv')
df = pd.read_csv(csv_path)

In [None]:
# Load the raw bytes of an image file
def read_and_encode_image(image_path):
    """Read a file in binary mode and wrap its content in a dictionary.

    Args:
        image_path: Path of the file to read.

    Returns:
        ``{'bytes': <file content as bytes>}``.
    """
    with open(image_path, 'rb') as handle:
        return {'bytes': handle.read()}

# Batch size for processing images (rows handled per Parquet write)
batch_size = 100
# Total number of rows in the CSV; one image is expected per row
num_images = len(df)

# Path to the Parquet file
parquet_path = os.path.join(working_folder, 'face_wiki.parquet')

# Define a custom PyArrow type for 'image' column: a struct with one binary field named 'bytes'
custom_type = pa.struct([
    pa.field("bytes", pa.binary())
])

# Initialize a Parquet schema with 'image', 'title', and 'text' columns
schema = pa.schema([
    ('image', custom_type),
    ('title', pa.string()),
    ('text', pa.string())
])

# Wrap the bytes of one image file in a single-element struct array
def image_to_struct(image_path):
    """Build a one-row PyArrow array of ``custom_type`` from an image file.

    Args:
        image_path: Path of the image file to load.

    Returns:
        A ``pa.array`` with exactly one struct whose ``bytes`` field holds
        the file content.
    """
    payload = read_and_encode_image(image_path)
    single_row = [(payload['bytes'],)]
    return pa.array(single_row, type=custom_type)

# Function to save batch to Parquet
def save_batch_to_parquet(batch_df, parquet_writer):
    """Attach image bytes to a batch of rows and append the batch to the Parquet file.

    Args:
        batch_df: DataFrame slice with 'id', 'title' and 'text' columns; an
            'image' column is added to it in place.
        parquet_writer: Writer whose ``write_table`` receives the resulting table.
    """
    # One single-row struct array per row; the image file is looked up as <id>.jpg in images_folder
    image_data = [image_to_struct(os.path.join(images_folder, f'{x}.jpg')) for x in batch_df['id']]
    # NOTE(review): relies on pandas accepting a pyarrow ChunkedArray as column values — confirm with the installed versions
    batch_df['image'] = pa.chunked_array(image_data)
    # Only the three schema columns are selected; 'id' is not written
    table = pa.Table.from_pandas(batch_df[['image', 'title', 'text']], schema=schema)
    parquet_writer.write_table(table)

# Stream the DataFrame into the Parquet file, batch_size rows at a time
with pq.ParquetWriter(parquet_path, schema) as writer:
    for batch_start in range(0, num_images, batch_size):
        # Slicing past the end of the frame is safe: the last batch is simply shorter
        batch_df = df.iloc[batch_start:batch_start + batch_size].copy()
        save_batch_to_parquet(batch_df, writer)

print("Parquet file saved successfully.")