In [1]:
from concurrent.futures import ProcessPoolExecutor
from itertools import repeat
import math
from pathlib import Path
import re
from statistics import mean
import time
from typing import Iterator

from blist import sortedlist, blist
import numpy as np

from config import get_sessionmaker
from models import Page, PageLink, PageTalk, PageQuality

In [2]:
DATABASE_URI = "postgresql://postgres:postgres@localhost:5432/complete_wikipedia"
Session = get_sessionmaker(db_uri=DATABASE_URI)
s = Session()

In [3]:
results = s.query(
         Page, PageTalk, PageQuality,
    ).filter(
         PageQuality.page_id == PageTalk.page_id,
    ).filter(
         PageTalk.page_title == Page.page_title,
    ).all()

(<Page: (
	page_id=21345243,
	page_title='Marquis_de_Sade',
	)>, <PageTalk: (
	page_id=21345106,
	page_title='Marquis_de_Sade',
	)>, <PageQuality: (
	page_id=21345106,
	page_quality='B',
	)>)

In [15]:
for result in results:
    print(result)
    break

(<Page: (
	page_id=47419,
	page_title='Convenience_store',
	)>, <PageTalk: (
	page_id=47455,
	page_title='Convenience_store',
	)>, <PageQuality: (
	page_id=47455,
	page_quality='C',
	)>)


In [20]:
result_ids = []
for result in results:
    result_ids.append(result[0].page_id)
result_ids = np.array(result_ids)
np.save("data/high_quality_ids", result_ids)

In [3]:
result_ids = np.load("data/high_quality_ids.npy")
sl = sortedlist(result_ids)
del result_ids

In [5]:
%%timeit
47419 in sl

8.09 µs ± 78.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


In [6]:
%%timeit
47419 in result_ids

138 µs ± 102 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [30]:
s.query(
         Page, PageTalk, PageQuality,
    ).filter(
         PageQuality.page_id == PageTalk.page_id,
    ).filter(
         PageTalk.page_title == Page.page_title,
    ).filter(
        Page.page_id == 25202,
    ).first()

(<Page: (
	page_id=25202,
	page_title='Quantum_mechanics',
	)>, <PageTalk: (
	page_id=19594240,
	page_title='Quantum_mechanics',
	)>, <PageQuality: (
	page_id=19594240,
	page_quality='GA',
	)>)

In [24]:
s.query(
         Page, PageTalk, PageQuality,
    ).filter(
         PageQuality.page_id == PageTalk.page_id,
    ).filter(
         PageTalk.page_title == Page.page_title,
    ).filter(
        Page.title_insensitive=="QuAnTum_mEchAnIcs",
    ).first()

(<Page: (
	page_id=25202,
	page_title='Quantum_mechanics',
	)>, <PageTalk: (
	page_id=19594240,
	page_title='Quantum_mechanics',
	)>, <PageQuality: (
	page_id=19594240,
	page_quality='GA',
	)>)

- 53008 lines
- 1,420,256,990 links (all ns)
- 626,327,844 links (ns 0 to all)
- 561,899,045 links (ns 0 to ns 0)
- at 6,301,566 ns 0 pages, 99.4 links/page (to all pages) & 89.1 links/page (to other ns 0 pages)


In [4]:
line_start = re.compile(b"INSERT INTO (.*?) VALUES ")
item_ex = re.compile(b"\((.*?),(.*?),(.*?),(.*?)\)")

In [5]:
def count_lines(line):
    return 1

def file_generator(file):
    for line in file:
        if line_start.search(line) is not None:
            yield line

def get_line_iterator(line) -> Iterator:
    if line_start.search(line) is not None:
        item_iter = item_ex.finditer(line)
        return item_iter
    else:
        return None


def get_items_in_line(line_iter):
    # sess_maker = get_sessionmaker(db_uri=DATABASE_URI)
    # sess = sess_maker()
    item_count = 0
    skip_count = 0
    links = []
    for entry in line_iter:
        if entry.group(4) == b"0" and entry.group(2) == b"0":
            pl_from = int(entry.group(1))
            pl_title = entry.group(3)
            if pl_from not in sl:
                skip_count += 1
                continue
            links.append([pl_from, pl_title])
            item_count += 1
    return [item_count, skip_count, links]


def process_file(path):
    with open(path, "rb") as f:
        line_iter = get_line_iterator(f.read())
        if line_iter is None:
            return None
        items = get_items_in_line(line_iter)
        return items
    

In [6]:

chunk_directory = Path("data/chunks")
item_counts = blist()
skip_counts = blist()
items = blist()
lookup = blist()
chunk_size = 7500
with ProcessPoolExecutor() as executor:
    for i, result in enumerate(executor.map(process_file, chunk_directory.iterdir())):
        if result is None:
            print("Error in regex for lines")
        item_counts.append(result[0])
        skip_counts.append(result[1])
        for item in result[2]:
            items.append(item)
        del result
        if i % chunk_size == 0:
            print(f"Counted {i} items:")
            avg_items = mean(item_counts[-chunk_size:])
            avg_skips = mean(skip_counts[-chunk_size:])
            print(f"chunk avg {avg_items} items")
            print(f"chunk avg {avg_skips} skips")
            start_time = time.time()
            print("starting sort")
            items.sort()
            delta = time.time() - start_time
            print(f"Finished sort in {delta/60} min")
        

Counted 0 items:
chunk avg 0 items
chunk avg 0 skips
starting sort
Finished sort in 4.251797993977865e-07 min
Counted 7500 items:
chunk avg 1713.2358666666667 items
chunk avg 8678.794666666667 skips
starting sort
Finished sort in 0.594546898206075 min
Counted 15000 items:
chunk avg 1773.4085333333333 items
chunk avg 8821.331333333334 skips
starting sort
Finished sort in 0.9705104509989421 min
Counted 22500 items:
chunk avg 1859.1426666666666 items
chunk avg 9045.441066666666 skips
starting sort
Finished sort in 1.3678282936414083 min
Counted 30000 items:
chunk avg 1784.5790666666667 items
chunk avg 8883.1452 skips
starting sort
Finished sort in 1.6144380529721578 min
Counted 37500 items:
chunk avg 1797.3246666666666 items
chunk avg 8785.241066666667 skips
starting sort
Finished sort in 1.7807731628417969 min
Counted 45000 items:
chunk avg 1773.3138666666666 items
chunk avg 8756.650666666666 skips
starting sort
Finished sort in 2.248443885644277 min
Counted 52500 items:
chunk avg 1752.7

In [7]:
start_time = time.time()
print("starting sort")
items.sort()
delta = time.time() - start_time
print(f"Finished sort in {delta/60} min")

starting sort
Finished sort in 1.8410709818204245 min


In [8]:
%%time
item = items.pop()
current_idx = item[0]
current_page = [current_idx, [item[1].decode("utf-8")]]
pl_added = 0
while True:
    try:
        item = items.pop()
        if item[0] == current_idx:
            current_page[1].append(item[1].decode("utf-8"))
        else:
            link = PageLink(
                pl_from=int(current_page[0]),
                pl_titles=current_page[1]
            )
            s.add(link)
            pl_added += 1
            if pl_added % 100 == 0:
                s.commit()
                print(f"added {pl_added} pages' links")
            current_idx = item[0]
            current_page = [current_idx, [item[1].decode("utf-8")]]
    except IndexError:
        link = PageLink(
            pl_from=current_page[0],
            pl_titles=current_page[1]
        )
        s.add(link)
        s.commit()
        break

links
added 445300 pages' links
added 445400 pages' links
added 445500 pages' links
added 445600 pages' links
added 445700 pages' links
added 445800 pages' links
added 445900 pages' links
added 446000 pages' links
added 446100 pages' links
added 446200 pages' links
added 446300 pages' links
added 446400 pages' links
added 446500 pages' links
added 446600 pages' links
added 446700 pages' links
added 446800 pages' links
added 446900 pages' links
added 447000 pages' links
added 447100 pages' links
added 447200 pages' links
added 447300 pages' links
added 447400 pages' links
added 447500 pages' links
added 447600 pages' links
added 447700 pages' links
added 447800 pages' links
added 447900 pages' links
added 448000 pages' links
added 448100 pages' links
added 448200 pages' links
added 448300 pages' links
added 448400 pages' links
added 448500 pages' links
added 448600 pages' links
added 448700 pages' links
added 448800 pages' links
added 448900 pages' links
added 449000 pages' links
added 

In [None]:
# chunk_directory = Path("data/chunks")
# item_counts = blist()
# skip_counts = blist()
# items = blist()
# lookup = blist()
# chunk_size = 2000
# with ProcessPoolExecutor() as executor:
#     for i, result in enumerate(executor.map(process_file, chunk_directory.iterdir())):
#         if result is None:
#             print("Error in regex for lines")
#         item_counts.append(result[0])
#         skip_counts.append(result[1])
#         for item in result[2]:
#             try:
#                 idx = lookup.index(item[1])
#                 items.append([item[0], idx])
#             except ValueError:
#                 lookup.append(item[1])
#                 idx = lookup.index(item[1])
#                 items.append([item[0], idx])
#         if i % chunk_size == 0:
#             print(f"Counted {i} items:")
#             avg_items = mean(item_counts[-chunk_size:])
#             avg_skips = mean(skip_counts[-chunk_size:])
#             print(f"chunk avg {avg_items} items")
#             print(f"chunk avg {avg_skips} skips")
#         del result

In [9]:
len(items)

1574945

To run through whole link set:
CPU times: user 17 s, sys: 1.73 s, total: 18.7 s
Wall time: 14min 22s

Running through and getting links:
CPU times: user 1min 36s, sys: 8.03 s, total: 1min 44s
Wall time: 15min 47s

In [None]:
whilte True:
    try:
        row = 

In [33]:
items = sortedlist()

In [41]:
items.add(blist([4, 8]))

In [42]:
items.index()

sortedlist([blist([4, 1]), blist([4, 4]), blist([4, 5]), blist([4, 8])])

In [31]:
type(items[0])

list

In [18]:
items[:5]

nth'"],
  [605120, b"'Rajinikanth'"],
  [606490, b"'Rajinikanth'"],
  [620504, b"'Rajinikanth'"],
  [630081, b"'Rajinikanth'"],
  [648216, b"'Rajinikanth'"],
  [665443, b"'Rajinikanth'"],
  [693943, b"'Rajinikanth'"],
  [708039, b"'Rajinikanth'"],
  [711707, b"'Rajinikanth'"],
  [841432, b"'Rajinikanth'"],
  [857051, b"'Rajinikanth'"],
  [858980, b"'Rajinikanth'"],
  [859118, b"'Rajinikanth'"],
  [872828, b"'Rajinikanth'"],
  [882809, b"'Rajinikanth'"],
  [886858, b"'Rajinikanth'"],
  [935787, b"'Rajinikanth'"],
  [966831, b"'Rajinikanth'"],
  [1024408, b"'Rajinikanth'"],
  [1050838, b"'Rajinikanth'"],
  [1075725, b"'Rajinikanth'"],
  [1079155, b"'Rajinikanth'"],
  [1114328, b"'Rajinikanth'"],
  [1137487, b"'Rajinikanth'"],
  [1163998, b"'Rajinikanth'"],
  [1168455, b"'Rajinikanth'"],
  [1176977, b"'Rajinikanth'"],
  [1181499, b"'Rajinikanth'"],
  [1202209, b"'Rajinikanth'"],
  [1249907, b"'Rajinikanth'"],
  [1280630, b"'Rajinikanth'"],
  [1304177, b"'Rajinikanth'"],
  [1307870, b"'Raj

In [35]:
561899045/6301566 

89.16815994627368

In [34]:
with open("data/enwiki-20210520-pagelinks.sql", "rb") as file:
    for i, line in enumerate(file_generator(file)):
        f = open(f"data/chunks/{i}.sql", "wb")
        f.write(line)
        f.close()

KeyboardInterrupt: 

PosixPath('data/chunks/28951.sql')

In [5]:
line_count

53008

In [None]:


def test_func(a):
    return 1
test_list = [1,2,3,4,5,6,7]
with concurrent.futures.ProcessPoolExecutor() as executor:
    for result in executor.map(test_func, test_list)


In [45]:
%%time
num_qualities = s.query(PageQuality).count()
offset = 0
limit = 50000
offset_multipliers = range(math.ceil(num_qualities/limit))
retrieved = 0

def scan_db(offset_multiplier):
    with Session() as sess:
        offset = offset_multiplier * limit
        results = sess.query(PageQuality).offset(offset).limit(limit)
    return offset_multiplier #results.count()

# for i in offset_multipliers:
#     retrieved += scan_db(i, limit)
#     print(f"retrieved {retrieved} entries")
# args = ((multiplier, limit) for multiplier in offset_multipliers)
with concurrent.futures.ProcessPoolExecutor() as executor:
    for result_count in executor.map(scan_db, offset_multipliers):
        retrieved += result_count
        print(f"retrieved {retrieved} entries")

retrieved 0 entries
retrieved 1 entries
retrieved 3 entries
retrieved 6 entries
retrieved 10 entries
retrieved 15 entries
retrieved 21 entries
retrieved 28 entries
retrieved 36 entries
retrieved 45 entries
retrieved 55 entries
CPU times: user 13.4 ms, sys: 72.5 ms, total: 85.9 ms
Wall time: 116 ms


In [48]:
%%time
num_qualities = s.query(PageQuality).count()
offset = 0
limit = 50000
offset_multipliers = range(math.ceil(num_qualities/limit))
retrieved = 0

def scan_db(offset_multiplier):
    DATABASE_URI = "postgresql://postgres:postgres@localhost:5432/complete_wikipedia"
    sess_maker = get_sessionmaker(db_uri=DATABASE_URI)
    with sess_maker() as sess:
        offset = offset_multiplier * limit
        results = sess.query(PageQuality).offset(offset).limit(limit)
        for result in results:
            result.page_talk.page.page_title
    return results.count()

with concurrent.futures.ProcessPoolExecutor() as executor:
    for result_count in executor.map(scan_db, offset_multipliers):
        retrieved += result_count
        print(f"retrieved {retrieved} entries")

retrieved 50000 entries
retrieved 100000 entries
retrieved 150000 entries
retrieved 200000 entries
retrieved 250000 entries
retrieved 300000 entries
retrieved 350000 entries
retrieved 400000 entries
retrieved 450000 entries
retrieved 500000 entries
retrieved 522137 entries
CPU times: user 19.1 ms, sys: 71.9 ms, total: 91 ms
Wall time: 1min 7s


## Loop through db:
Series, 1 CPU:
- CPU times: user 4min 52s
- sys: 16.3 s
- total: 5min 8s
- Wall time: 6min 57s

Parallel, 12 CPUs:
- CPU times: user 19.1 ms
- sys: 71.9 ms
- total: 91 ms
- Wall time: 1min 7s

In [4]:
result.page_talk.page

<Page: (
	page_id=733981,
	page_title='Flecainide',
	)>