In [1]:
# The star import stays first so that the explicit imports below take
# precedence over any same-named objects it may export.
from utils.commit_utils import *

import datetime
import json
import os
import subprocess
import time
from multiprocessing import Pool

import mwxml
import numpy as np
from duckduckgo_search import ddg
from tqdm import tqdm

In [2]:
# Output / working directories (relative to the notebook's working directory).
DOCS_DIR = 'data/data_uploading/documents'
PAGES_DIR = 'data/data_uploading/revision'
DUMPS_DIR = 'data/data_uploading/dumps'

# exist_ok=True makes the cell safely re-runnable without a separate
# exists() check (and without the race between the check and the creation).
os.makedirs(DOCS_DIR, exist_ok=True)
os.makedirs(PAGES_DIR, exist_ok=True)
os.makedirs(DUMPS_DIR, exist_ok=True)

In [3]:
# Count the entries that already exist in the two output directories.
down_docs, down_page = os.listdir(DOCS_DIR), os.listdir(PAGES_DIR)
(len(down_docs), len(down_page))

(4, 4)

In [4]:
# Download URLs of the dump archives to process.
# Every URL is the shared prefix + "<history part>.xml-<page range>" + ".7z".
_DUMP_URL_PREFIX = "https://dumps.wikimedia.org/enwiki/20221001/enwiki-20221001-pages-meta-"
_DUMP_PARTS = (
    # Disabled entries:
    # "history2.xml-p60834p62692",
    # "history2.xml-p64379p66077",
    # "history2.xml-p66078p67792",
    "history2.xml-p67793p69852",
    "history2.xml-p69853p72172",
    "history25.xml-p60275287p60527634",
    "history25.xml-p60527635p60799673",
    "history25.xml-p60799674p61049885",
    "history25.xml-p61049886p61308529",
    "history25.xml-p61308530p61507683",
    "history25.xml-p61507684p61774102",
)
DUMPS = [f"{_DUMP_URL_PREFIX}{part}.7z" for part in _DUMP_PARTS]

## Downloading documents

In [5]:
# Record and show the wall-clock time at which this run started.
# (`datetime` and `time` are imported in the notebook's import cell.)
start_time = datetime.datetime.now()
print(start_time)

2023-01-14 14:48:52.275307


In [6]:
def link2names(link):
    """Derive the names used for a dump from its download URL.

    Args:
        link: URL of a ``.7z`` dump archive; the archive file name is taken
            to be the last ``/``-separated segment of the URL.

    Returns:
        A ``(dump_name, dump_7z_name, dump_unzip_name)`` tuple where
        ``dump_7z_name`` is the archive file name, ``dump_unzip_name`` is that
        name up to the first ``.7z``, and ``dump_name`` is the text after the
        last ``-`` of the link up to the following ``.`` (for the URLs in this
        notebook, the page range such as ``p67793p69852``).
    """
    # Use the last path segment rather than splitting on a hard-coded dump
    # date, so links from other dump dates work too.
    dump_7z_name = link.rsplit('/', 1)[-1]
    dump_unzip_name = dump_7z_name.split('.7z')[0]
    dump_name = link.split('-')[-1].split('.')[0]
    return dump_name, dump_7z_name, dump_unzip_name

def download_dump(dump_link):
    """Download a dump archive by running ``wget`` on its URL.

    Args:
        dump_link: URL passed to ``wget`` as its single argument.

    Returns:
        The exit status of ``wget`` (0 on success), or 127 when the ``wget``
        executable cannot be found.
    """
    try:
        # An argument list (no shell) passes the URL through verbatim, so
        # characters such as '&', '?' or spaces are not interpreted by a shell.
        return subprocess.call(['wget', dump_link])
    except FileNotFoundError:
        # A shell reports a missing command as exit status 127; keep that
        # convention so callers' `!= 0` checks behave as before.
        return 127

def unzip_dump(dump_7z_name):
    """Extract a ``.7z`` archive by running ``7za x`` on it.

    Args:
        dump_7z_name: Path of the archive to extract.

    Returns:
        The exit status of ``7za`` (0 on success), or 127 when the ``7za``
        executable cannot be found.
    """
    try:
        # An argument list (no shell) keeps file names with spaces or shell
        # metacharacters intact.
        return subprocess.call(['7za', 'x', dump_7z_name])
    except FileNotFoundError:
        # A shell reports a missing command as exit status 127; keep that
        # convention so callers' `!= 0` checks behave as before.
        return 127

def delete_7z_dump(dump_7z_name):
    """Delete a file, treating an already-missing file as success.

    Args:
        dump_7z_name: Path of the file to remove.

    Returns:
        0 when the file was removed or did not exist (matching ``rm -f``),
        1 when the removal failed for any other reason.
    """
    try:
        # os.remove takes the path verbatim, so names containing spaces or
        # shell metacharacters are handled correctly.
        os.remove(dump_7z_name)
    except FileNotFoundError:
        return 0
    except OSError:
        return 1
    return 0

def filter_duplicate8vandal_revisions(page):
    """Collapse reverted stretches of a page history into distinct revisions.

    The history is walked from the newest revision towards the oldest. For
    each revision still under consideration, the earliest revision with
    exactly the same text is looked up; that earliest revision supplies the
    comment, id and user of the emitted entry, and every revision between the
    two is skipped (their net effect on the text was undone).

    Args:
        page: Iterable of revision objects that also exposes ``title``. Each
            revision is read for ``text``, ``comment``, ``id`` and
            ``user.text``; a missing/empty text is treated as ``''``.

    Returns:
        ``(good_revisions, num_revisions)`` where ``good_revisions`` is a list
        of dicts with keys ``text``, ``comment``, ``id``, ``page_name`` and
        ``user_name`` (lower-cased, ``''`` when unavailable) in chronological
        order, and ``num_revisions`` is its length. ``([], 0)`` is returned
        for pages with fewer than two revisions.
    """
    revisions = list(page)
    if len(revisions) < 2:
        return [], 0

    texts = [rev.text or '' for rev in revisions]

    # Earliest index at which each distinct text appears. Looking this up
    # replaces a backward scan over all earlier revisions for every revision.
    first_index_by_text = {}
    for idx, text in enumerate(texts):
        first_index_by_text.setdefault(text, idx)

    good_revisions = []
    last_added = len(revisions)
    # The walk stops at index 1: revision 0 is only emitted when a later
    # revision has the same text as it.
    for cur_rev_id in range(len(revisions) - 1, 0, -1):
        if cur_rev_id >= last_added:
            # Lies inside a stretch that was already collapsed.
            continue

        cur_rev_text = texts[cur_rev_id]
        last_added = first_index_by_text[cur_rev_text]
        add_rev = revisions[last_added]

        user = ''
        if add_rev.user and add_rev.user.text:
            user = add_rev.user.text.lower()

        good_revisions.append({
            'text': cur_rev_text,
            'comment': add_rev.comment,
            'id': add_rev.id,
            'page_name': page.title,
            'user_name': user
        })

    # Entries were collected newest-first; return them oldest-first.
    good_revisions.reverse()
    return good_revisions, len(good_revisions)

In [None]:
# Process one dump whose extracted XML file is already in the working
# directory: for every pair of consecutive filtered revisions that passes the
# comment/size filters and differs in exactly one section, run web searches
# built from the changed text and store the revision pair (JSON) together with
# the returned result snippets (text).
dump_link = 'https://dumps.wikimedia.org/enwiki/20221001/enwiki-20221001-pages-meta-history2.xml-p67793p69852.7z'
dump_name, dump_7z_name, dump_unzip_name = link2names(dump_link)
counter, total_pair_rev_counter, total_revisions_counter = 0, 0, 0
EMPTY_DOCS = 0  # only used by the (currently disabled) empty-result back-off

# `with` closes both the dump file and the progress bar, even if the long
# running loop is interrupted.
with open(dump_unzip_name, encoding="utf-8") as dump_file, tqdm(position=0, leave=True) as pbar:
    dump = mwxml.Dump.from_file(dump_file)

    # Created only after the dump was opened successfully: the cell that loops
    # over DUMPS treats existing output directories as "already processed".
    os.makedirs(f"{DOCS_DIR}/{dump_name}", exist_ok=True)
    os.makedirs(f"{PAGES_DIR}/{dump_name}", exist_ok=True)

    for page in dump:
        if not filter_page(page.title):
            continue

        good_revisions, num_revisions = filter_duplicate8vandal_revisions(page)
        total_revisions_counter += num_revisions

        for prev_rev, new_rev in zip(good_revisions, good_revisions[1:]):
            total_pair_rev_counter += 1
            pbar.update(1)

            comment = new_rev['comment']
            if not comment or len(comment.strip()) < MIN_COMMENT_LEN:
                continue
            if not filter_comment(comment, new_rev['user_name']):
                continue

            # Keep only edits whose size change is within the configured
            # bounds, on pages that are not too long.
            change_len = abs(len(new_rev['text']) - len(prev_rev['text']))
            if change_len < MIN_CHANGE_SYMB_LEN or change_len > MAX_CHANGE_SYMB_LEN:
                continue
            if len(new_rev['text']) > MAX_PAGE_SYMB_LEN:
                continue

            prev_section_titles, prev_section_texts = extract_important_sections(clean_text(prev_rev['text']))
            new_section_titles, new_section_texts = extract_important_sections(clean_text(new_rev['text']))

            # Require exactly one differing entry that carries a 'newline'.
            r_idx, r = get_diff_num(prev_section_texts, new_section_texts)
            if len(r) != 1 or 'newline' not in r[0]:
                continue

            # Best-effort lookup of the changed section's title; narrowed from
            # a bare `except:` so Ctrl-C / kernel interrupts are not swallowed.
            section_name = ''
            try:
                section_name_t = new_section_titles[r_idx[0]]
                if section_name_t:
                    section_name = section_name_t
            except Exception:
                pass

            query_prefix = (new_rev['page_name'] + ' ' + section_name).strip()
            all_changes_r, all_changes_sents_r = get_changes(r)
            if len(all_changes_sents_r) == 0 or len(all_changes_sents_r[0]) == 0:
                continue

            final_page_path = f"{PAGES_DIR}/{dump_name}/{counter}.json"
            final_docs_path = f"{DOCS_DIR}/{dump_name}/{counter}.txt"
            if os.path.exists(final_page_path) and os.path.exists(final_docs_path):
                # Already produced by an earlier run: skip it but keep the
                # numbering aligned.
                counter += 1
                continue

            downloaded_docs = []
            search_queries_list = []
            q2docs_num = []

            # Queries come from up to 10 entries of all_changes_r[0][0],
            # followed by up to 5 entries of all_changes_sents_r[0].
            # NOTE(review): the cell that loops over DUMPS applies
            # ' '.join(...) to each all_changes_sents_r[0] entry before
            # building the query, while this cell concatenates it directly;
            # confirm which form matches what get_changes returns.
            for fragments in (all_changes_r[0][0][:10], all_changes_sents_r[0][:5]):
                for ch_text in fragments:
                    fq = (query_prefix + ' ' + ch_text)[:300]
                    search_queries_list.append(fq)
                    search_result = ddg(fq)
                    counter_found_docs = 0
                    if search_result is not None:
                        for search_result_obj in search_result:
                            downloaded_docs.append(search_result_obj['body'])
                            counter_found_docs += 1
                    q2docs_num.append(counter_found_docs)

            json_obj = {
                "old_text": r[0]['line'],
                "new_text": r[0]['newline'],
                "title": new_rev['page_name'],
                "comment": comment,
                "section_name": section_name,
                "search_queries": search_queries_list,
                "counter_found_docs": q2docs_num,
                "change_texts": all_changes_r
            }

            with open(final_page_path, 'w', encoding='utf-8') as f:
                json.dump(json_obj, f)

            with open(final_docs_path, 'w', encoding='utf-8') as f:
                f.write("\n\nDOC_DELIMITER_TOKEN\n\n".join(downloaded_docs))

            # Advance to the next output slot. Without this increment the next
            # qualifying pair finds the files just written, takes the
            # "already exists" branch above and is silently skipped.
            counter += 1

            # Disabled back-off for runs of empty search results:
            # if len(downloaded_docs) < 2:
            #    EMPTY_DOCS += 1
            #    if EMPTY_DOCS > 40:
            #        time.sleep(120)
            # else:
            #    EMPTY_DOCS = 0

# Remove the extracted dump; delete_7z_dump deletes whatever path it is given.
delete_dump_result = delete_7z_dump(dump_unzip_name)
if delete_dump_result != 0:
    print(f"ERROR_DELETE_DUMP_{dump_unzip_name}")
print("SUCCESS", dump_link)

112553it [6:58:48,  2.95s/it]

In [None]:
# For every dump in DUMPS that has no output directories yet: download it,
# extract it, delete the archive, then walk its pages and store each
# qualifying revision pair (JSON) together with web-search result snippets
# for the changed text (text file). The extracted dump is deleted afterwards.
for dump_link in DUMPS:
    dump_name, dump_7z_name, dump_unzip_name = link2names(dump_link)

    # Existing output directories mark a dump as already handled.
    final_commits = f"{PAGES_DIR}/{dump_name}"
    final_docs = f"{DOCS_DIR}/{dump_name}"
    if os.path.exists(final_commits) and os.path.exists(final_docs):
        print("EXISTS", dump_link, '\t\t', dump_name)
        continue

    download_result = download_dump(dump_link)
    if download_result != 0:
        print(f"ERROR_DOWNLOAD_{dump_link}")
        continue

    unzip_result = unzip_dump(dump_7z_name)
    if unzip_result != 0:
        print(f"ERROR_UNZIP_{dump_7z_name}")
        continue

    delete_7z_result = delete_7z_dump(dump_7z_name)
    if delete_7z_result != 0:
        print(f"ERROR_DELETE_{dump_7z_name}")
        continue

    counter, total_pair_rev_counter, total_revisions_counter = 0, 0, 0
    EMPTY_DOCS = 0  # only used by the (currently disabled) empty-result back-off

    # `with` closes the dump file and this dump's progress bar, even if the
    # long running loop is interrupted.
    with open(dump_unzip_name, encoding="utf-8") as dump_file, tqdm(position=0, leave=True) as pbar:
        dump = mwxml.Dump.from_file(dump_file)

        # Created only after the dump was opened successfully, because the
        # existence of these directories is what marks a dump as handled.
        os.makedirs(f"{DOCS_DIR}/{dump_name}", exist_ok=True)
        os.makedirs(f"{PAGES_DIR}/{dump_name}", exist_ok=True)

        for page in dump:
            if not filter_page(page.title):
                continue

            good_revisions, num_revisions = filter_duplicate8vandal_revisions(page)
            total_revisions_counter += num_revisions

            for prev_rev, new_rev in zip(good_revisions, good_revisions[1:]):
                total_pair_rev_counter += 1
                pbar.update(1)

                comment = new_rev['comment']
                if not comment or len(comment.strip()) < MIN_COMMENT_LEN:
                    continue
                if not filter_comment(comment, new_rev['user_name']):
                    continue

                # Keep only edits whose size change is within the configured
                # bounds, on pages that are not too long.
                change_len = abs(len(new_rev['text']) - len(prev_rev['text']))
                if change_len < MIN_CHANGE_SYMB_LEN or change_len > MAX_CHANGE_SYMB_LEN:
                    continue
                if len(new_rev['text']) > MAX_PAGE_SYMB_LEN:
                    continue

                prev_section_titles, prev_section_texts = extract_important_sections(clean_text(prev_rev['text']))
                new_section_titles, new_section_texts = extract_important_sections(clean_text(new_rev['text']))

                # Require exactly one differing entry that carries a 'newline'.
                r_idx, r = get_diff_num(prev_section_texts, new_section_texts)
                if len(r) != 1 or 'newline' not in r[0]:
                    continue

                # Best-effort lookup of the changed section's title; narrowed
                # from a bare `except:` so Ctrl-C / kernel interrupts are not
                # swallowed.
                section_name = ''
                try:
                    section_name_t = new_section_titles[r_idx[0]]
                    if section_name_t:
                        section_name = section_name_t
                except Exception:
                    pass

                query_prefix = (new_rev['page_name'] + ' ' + section_name).strip()
                all_changes_r, all_changes_sents_r = get_changes(r)
                if len(all_changes_sents_r) == 0 or len(all_changes_sents_r[0]) == 0:
                    continue

                final_page_path = f"{PAGES_DIR}/{dump_name}/{counter}.json"
                final_docs_path = f"{DOCS_DIR}/{dump_name}/{counter}.txt"
                if os.path.exists(final_page_path) and os.path.exists(final_docs_path):
                    # Already produced by an earlier run: skip it but keep the
                    # numbering aligned.
                    counter += 1
                    continue

                downloaded_docs = []
                search_queries_list = []
                q2docs_num = []

                # One query per entry of all_changes_sents_r[0]; each entry is
                # joined with spaces first. (Queries built from
                # all_changes_r[0][0] are not issued in this cell.)
                # NOTE(review): the single-dump cell concatenates these
                # entries without ' '.join(...) and truncates queries to 300
                # characters; confirm which form matches what get_changes
                # returns.
                for ch_text in all_changes_sents_r[0]:
                    fq = query_prefix + ' ' + ' '.join(ch_text)
                    search_queries_list.append(fq)
                    search_result = ddg(fq)
                    counter_found_docs = 0
                    if search_result is not None:
                        for search_result_obj in search_result:
                            downloaded_docs.append(search_result_obj['body'])
                            counter_found_docs += 1
                    q2docs_num.append(counter_found_docs)

                json_obj = {
                    "old_text": r[0]['line'],
                    "new_text": r[0]['newline'],
                    "title": new_rev['page_name'],
                    "comment": comment,
                    "section_name": section_name,
                    "search_queries": search_queries_list,
                    "counter_found_docs": q2docs_num,
                    "change_texts": all_changes_r
                }

                with open(final_page_path, 'w', encoding='utf-8') as f:
                    json.dump(json_obj, f)

                with open(final_docs_path, 'w', encoding='utf-8') as f:
                    f.write("\n\nDOC_DELIMITER_TOKEN\n\n".join(downloaded_docs))

                # Advance to the next output slot. Without this increment the
                # next qualifying pair finds the files just written, takes the
                # "already exists" branch above and is silently skipped.
                counter += 1

                # Disabled back-off for runs of empty search results:
                # if len(downloaded_docs) < 2:
                #    EMPTY_DOCS += 1
                #    if EMPTY_DOCS > 40:
                #        time.sleep(120)
                # else:
                #    EMPTY_DOCS = 0

    # Remove the extracted dump; delete_7z_dump deletes whatever path it is given.
    delete_dump_result = delete_7z_dump(dump_unzip_name)
    if delete_dump_result != 0:
        print(f"ERROR_DELETE_DUMP_{dump_unzip_name}")
    print("SUCCESS", dump_link)

In [None]:
# Display the dump name and link currently bound in the kernel, i.e. the
# values most recently assigned by the processing cells above.
dump_name, dump_link