In [1]:
import json
import sys
import os
import gzip
import internetarchive
import time
import shutil
import numpy as np

import pandas as pd
from io import StringIO

import threading

In [2]:
# Query the Internet Archive for every item in the WikiHist_html collection.
search = internetarchive.search_items('collection:WikiHist_html')

# Collect the identifier of each hit, then drop the first one.
# NOTE(review): the reason the first identifier is skipped is not visible
# in this notebook -- confirm it is intentional.
itemids = [hit['identifier'] for hit in search]
itemids = itemids[1:]

In [3]:
# Scratch directory: raw archive files are downloaded here (one sub-directory
# per item) and the failure log is appended here by save_data.
DATA_DIR_TMP = 'wiki_data_tmp_download'
# Final output directory: one "<itemid>.csv" per processed item (see get_csv).
DATA_DIR = 'wiki_data'
# Prefix used to rebuild the URL of an already-downloaded file so it can be
# compared against the URLs listed by the dry-run download.
URL_BASE = 'https://archive.org/download/'
# Name of the text file listing "<itemid>/<filename>" entries that failed to load.
FAILURES_FILE = 'failures.txt'

class Capturing(list):
    """Context manager that collects everything printed to stdout.

    While the ``with`` block runs, ``sys.stdout`` is replaced by an
    in-memory buffer. On exit the buffered text is split into lines and
    appended to this list, and the previous ``sys.stdout`` is put back.
    """

    def __enter__(self):
        # Remember the current stream so it can be restored on exit.
        self._stdout = sys.stdout
        self._stringio = StringIO()
        sys.stdout = self._stringio
        return self

    def __exit__(self, *args):
        captured_text = self._stringio.getvalue()
        self.extend(captured_text.splitlines())
        # Drop the buffer so its contents can be garbage collected.
        del self._stringio
        sys.stdout = self._stdout

# Columns kept from each loaded JSON record when building the per-item
# DataFrame in save_data; the order here is the column order of the frame.
COLS = [
    'parentid', 'id',
    'cont_username', 'cont_id',
    'timestamp', 'format',
    'page_id', 'title',
]

def get_csv(itemid):
    """Return the path of the CSV file for ``itemid`` inside ``DATA_DIR``."""
    base_path = os.path.join(DATA_DIR, itemid)
    return base_path + ".csv"

def get_data(itemid):
    """Download one archive item and convert it to a CSV in the background.

    The raw files are fetched synchronously with ``download_data``. Saving
    them with ``save_data`` and removing the raw files then happens on a
    separate thread that is started but not joined, so this function returns
    as soon as the download has finished.

    Nothing is downloaded when the item's CSV already exists.

    Args:
        itemid: Identifier of the archive item to fetch.

    Returns:
        None.
    """
    print(f'======= Downloading data for {itemid} =======')
    t_init = time.time()

    # exist_ok=True removes the check-then-create race of isdir() + mkdir().
    os.makedirs(DATA_DIR_TMP, exist_ok=True)
    os.makedirs(DATA_DIR, exist_ok=True)

    # Use the shared helper so the "already done" check and the writer in
    # save_data always agree on the CSV location.
    if os.path.isfile(get_csv(itemid)):
        print('Already downloaded.')
        return

    t = time.time()

    download_data(itemid)

    t_new = time.time()
    print(f"======= Downloaded Data in {round(t_new - t, 2)} seconds ========")

    def save_and_delete_data_thread(itemid):
        # save data locally as pandas data frame
        success = save_data(itemid)

        # Delete only this item's raw files. This thread is not joined, so a
        # later get_data() call may already be downloading another item into
        # DATA_DIR_TMP; wiping the whole directory would destroy those files.
        if success:
            shutil.rmtree(os.path.join(DATA_DIR_TMP, itemid), ignore_errors=True)

        print(f"======= Completed download of {itemid} in {round(time.time() - t_init, 2)} seconds ========")

    thread = threading.Thread(target=save_and_delete_data_thread, args=(itemid,))
    thread.start()
    
def download_data(itemid, n_threads=10):
    """Download the files of ``itemid`` into ``DATA_DIR_TMP`` using threads.

    A dry-run download is used to list the item's files; anything already
    present in the item's temp directory is skipped. The remaining file names
    are divided into ``n_threads`` chunks and each chunk is downloaded on its
    own thread. The function returns once every thread has finished.

    Args:
        itemid: Identifier of the archive item to fetch.
        n_threads: Number of chunks / download threads.
    """
    item = internetarchive.get_item(itemid)

    # The dry run prints its listing to stdout; capture those lines.
    # NOTE(review): Capturing swaps the process-wide sys.stdout, so anything
    # printed by another thread during the dry run ends up in this list too.
    with Capturing() as all_files:
        item.download(dry_run=True)

    item_dir = os.path.join(DATA_DIR_TMP, itemid)
    if not os.path.isdir(item_dir):
        to_download = all_files
    else:
        # Rebuild the URL form of each local file so it can be compared with
        # the captured listing.
        downloaded = [f'{URL_BASE}{itemid}/{fn}' for fn in os.listdir(item_dir)]
        to_download = list(set(all_files).difference(downloaded))

    print(f'Downloading {len(to_download)} files')

    # Keep only the last path component of every entry.
    file_names = [entry.rsplit('/', 1)[-1] for entry in to_download]

    chunks = np.array_split(file_names, n_threads)

    def fetch_chunk(chunk):
        # Empty chunks occur when there are fewer files than threads.
        if len(chunk) > 0:
            item.download(files=list(chunk), destdir=DATA_DIR_TMP)

    workers = []
    for chunk in chunks:
        worker = threading.Thread(target=fetch_chunk, args=(chunk,))
        workers.append(worker)
        worker.start()

    for worker in workers:
        worker.join()
    
def save_data(itemid):
    """Load an item's downloaded JSON files and write them out as one CSV.

    Every file in the item's temp directory whose name contains ``.json`` is
    opened with gzip and parsed as one JSON document per line. The ``COLS``
    columns of each file are collected and concatenated into a single
    DataFrame, which is written to ``get_csv(itemid)``.

    Files that cannot be loaded are reported on stdout and appended to the
    failure log (``FAILURES_FILE`` inside ``DATA_DIR_TMP``) as
    ``<itemid>/<filename>``; they do not stop processing.

    Args:
        itemid: Identifier of the archive item whose files should be saved.

    Returns:
        True when the CSV was written, False when no file could be loaded
        (in which case nothing is written).
    """
    item_dir = os.path.join(DATA_DIR_TMP, itemid)
    dfs = []
    fns = [fn for fn in os.listdir(item_dir) if('.json' in fn)]

    t = time.time()
    print(f"======= Loading {len(fns)} files ========")
    for i, fn in enumerate(fns):
        try:
            with gzip.open(os.path.join(item_dir, fn), "rb") as f:
                data = [json.loads(line) for line in f]
            dfs.append(pd.DataFrame(data)[COLS])
        except Exception as exc:
            # Catch Exception rather than everything so Ctrl-C / SystemExit
            # still propagate, and report the cause instead of hiding it.
            print(f'Error on {fn}: {exc!r}')
            with open(os.path.join(DATA_DIR_TMP, FAILURES_FILE), 'a') as f_failures:
                f_failures.write(f'{itemid}/{fn}\n')

        if (i % 50 == 0 and i > 0):
            t_new = time.time()
            print(f"Loaded {i} files in {round(t_new - t, 2)} seconds")
            t = t_new

    # pd.concat raises on an empty list; report the situation instead and
    # signal the caller not to delete the raw files.
    if not dfs:
        print(f"======= No data loaded for {itemid}; nothing saved =======")
        return False

    t = time.time()
    df = pd.concat(dfs)

    # Write to a sibling file first and rename it into place: get_data treats
    # the mere existence of the CSV as "already downloaded", so a partially
    # written file must never appear under the final name.
    csv_path = get_csv(itemid)
    partial_path = csv_path + '.part'
    df.to_csv(partial_path)
    os.replace(partial_path, csv_path)
    print(f"======= Saved data in {round(time.time() - t, 2)} seconds =======")

    return True

def redownload_failures():
    """Print the entries recorded in the failure log.

    Reads the log that ``save_data`` appends to (``FAILURES_FILE`` inside
    ``DATA_DIR_TMP``) and prints the list of ``<itemid>/<filename>`` entries.

    NOTE(review): despite its name, this function does not trigger any
    download yet; it only lists what failed.

    Raises:
        FileNotFoundError: If no failure log exists.
    """
    # The log lives in the temp download directory, not the working directory.
    with open(os.path.join(DATA_DIR_TMP, FAILURES_FILE), 'r') as f:
        files = f.read().splitlines()
    # Print the entries that were read, not the (closed) file object.
    print(files)

In [4]:
# Process the first ten collection items, one after another.
for position in range(10):
    get_data(itemids[position])

Already downloaded.
Already downloaded.
Already downloaded.
Already downloaded.
Already downloaded.
Already downloaded.
Already downloaded.
Already downloaded.
Already downloaded.
Downloading 1332 files
Loaded 50 files in 71.96 seconds
Loaded 100 files in 83.91 seconds
Loaded 150 files in 59.35 seconds
Loaded 200 files in 65.06 seconds
Error on 1202000.json.gz
Loaded 250 files in 65.08 seconds
Error on 743000.json.gz
Loaded 300 files in 63.39 seconds
Error on 1271000.json.gz
Loaded 350 files in 77.43 seconds
Error on 1300000.json.gz
Loaded 400 files in 56.57 seconds
Loaded 450 files in 82.91 seconds
Loaded 500 files in 85.22 seconds
Loaded 550 files in 54.79 seconds
Loaded 600 files in 66.55 seconds
Loaded 650 files in 74.85 seconds
Error on 88000.json.gz
Loaded 700 files in 71.69 seconds
Loaded 750 files in 75.13 seconds
Loaded 800 files in 72.91 seconds
Error on 1239000.json.gz
Loaded 850 files in 74.43 seconds
Loaded 900 files in 69.52 seconds
Error on 1406000.json.gz
Loaded 950 fil