In [None]:
import json
import pyarrow.parquet as pq
import pandas as pd
import pyarrow as pa
import numpy as np

Get venues

In [None]:
# Collect the unique venue IDs from mag_venues.txt (one JSON record per line),
# save them sorted to mag_venue_id.parquet, and keep them as a set in `venues`
# for the membership assertion inside `parse_paper_file`.
# `with` closes the file; iterating the file object streams lines instead of
# loading the whole file via readlines().
with open('mag_venues.txt', 'r') as f:
    venues = {json.loads(line)['id'] for line in f}
print(f'There are {len(venues)} venues')
# A separately named sorted copy keeps `venues` a set throughout, so re-running
# this cell always leaves the same state.
sorted_venues = sorted(venues)
df = pd.DataFrame({'id': sorted_venues})
table = pa.Table.from_pandas(df)
pq.write_table(table, 'mag_venue_id.parquet')
print("venue data type:", type(sorted_venues[0]))

Get affiliations

In [None]:
# Read mag_affiliations.txt (one JSON record per line). Save the affiliation IDs
# with their normalized names to mag_affs.parquet, and the IDs alone to
# mag_aff_id.parquet.
affs = []
aff_names = []
# `with` closes the file; iterating the file object streams lines.
with open('mag_affiliations.txt', 'r') as f:
    for line in f:
        aff = json.loads(line)
        affs.append(aff['id'])
        aff_names.append(aff['NormalizedName'])
print(f'There are {len(affs)} affiliations')
df = pd.DataFrame({'id': affs, 'name': aff_names})
table = pa.Table.from_pandas(df)
pq.write_table(table, 'mag_affs.parquet')

df = pd.DataFrame({'id': affs})
table = pa.Table.from_pandas(df)
pq.write_table(table, 'mag_aff_id.parquet')
print('Affiliation data type:', type(affs[0]))
# From here on `affs` is the set of affiliation IDs (for membership checks).
affs = set(affs)

Get the last known affiliation of each author and save the author–affiliation pairs to Parquet files. Not all authors have a known affiliation.

In [None]:
# Author records are split across mag_authors_0.txt .. mag_authors_{N-1}.txt.
NUM_AUTHOR_FILES = 5

# Every author ID across all files (written to mag_author_id.parquet below).
full_authors = []
for i in range(NUM_AUTHOR_FILES):
    # Per-file (author, affiliation) pairs, for authors that have a
    # last_known_aff_id. The affiliation list is deliberately NOT called
    # `affs`, so the global set of affiliation IDs from the previous cell
    # survives this cell.
    authors = []
    author_affs = []
    num_authors = 0
    # `with` closes each file; iterating the file object streams lines.
    with open(f'mag_authors_{i}.txt', 'r') as f:
        for line in f:
            num_authors += 1
            author = json.loads(line)
            full_authors.append(author['id'])
            if 'last_known_aff_id' in author:
                authors.append(author['id'])
                author_affs.append(int(author['last_known_aff_id']))
    # NOTE(review): earlier commented-out checks validated last_known_aff_id
    # against the affiliation-ID set and author-ID uniqueness. Re-enable them
    # once the ID types (str vs int) are confirmed to match.
    df = pd.DataFrame({'author': authors, 'affiliation': author_affs})
    table = pa.Table.from_pandas(df)
    pq.write_table(table, f'mag_author2aff_{i}.parquet')
    print(f'There are {num_authors} authors and {len(authors)} of them have affiliations')

df = pd.DataFrame({'id': full_authors})
print('author ID data type:', type(full_authors[0]))
table = pa.Table.from_pandas(df)
pq.write_table(table, 'mag_author_id.parquet')

In [None]:
def _write_parquet(columns, path):
    """Write `columns` (a dict of column name -> equal-length list) to a Parquet file at `path`."""
    table = pa.Table.from_pandas(pd.DataFrame(columns))
    pq.write_table(table, path)


def parse_paper_file(i):
    """Parse ``mag_papers_{i}.txt`` and write its tables to Parquet files.

    Each input line is one JSON paper record. A record is skipped if it lacks
    any of ``id``/``title``/``year``/``authors`` or has an empty author list.
    For the kept papers this writes:

    * ``mag_papers_{i}.parquet``       -- paper, title, year
    * ``mag_paper2author_{i}.parquet`` -- paper, author, order (index in the author list)
    * ``mag_paper2venue_{i}.parquet``  -- paper, venue (only for venues that have an ``id``)
    * ``mag_paper2fos_{i}.parquet``    -- paper, fos (each fos entry's ``name``), w
    * ``mag_paper2paper_{i}.parquet``  -- src_paper, dst_paper (one row per reference)

    Every venue ID is asserted to be in the module-level ``venues`` set, so
    the venues cell must run first.

    Parameters
    ----------
    i : int
        Index of the paper shard to process.

    Returns
    -------
    tuple
        ``(num_paper_venue, venue_ids, fos_set)``: the number of kept papers
        that have a ``venue`` entry, the set of venue IDs seen, and the set of
        fos names seen.
    """
    num_paper_venue = 0
    venue_ids = set()
    fos_set = set()

    paper_ids = []
    titles = []
    years = []
    paper2author = ([], [], [])
    paper2venue = ([], [])
    paper2fos = ([], [], [])
    paper2paper = ([], [])
    required_keys = ('id', 'title', 'year', 'authors')
    # `with` closes the shard even if parsing or the venue assertion fails.
    # Iterating the file object streams lines instead of holding the whole
    # shard in memory as readlines() did.
    with open(f'mag_papers_{i}.txt', 'r') as f:
        for line in f:
            paper = json.loads(line)
            if any(key not in paper for key in required_keys) or len(paper['authors']) == 0:
                continue
            paper_id = paper['id']
            paper_ids.append(paper_id)
            titles.append(paper['title'])
            years.append(paper['year'])
            for order, author in enumerate(paper['authors']):
                paper2author[0].append(paper_id)
                paper2author[1].append(author['id'])
                paper2author[2].append(order)
            if 'venue' in paper:
                num_paper_venue += 1
                venue = paper['venue']
                if 'id' in venue:
                    venue_ids.add(venue['id'])
                    paper2venue[0].append(paper_id)
                    paper2venue[1].append(venue['id'])
                    assert venue['id'] in venues
            for fos in paper.get('fos', ()):
                fos_set.add(fos['name'])
                paper2fos[0].append(paper_id)
                paper2fos[1].append(fos['name'])
                paper2fos[2].append(fos['w'])
            for ref in paper.get('references', ()):
                paper2paper[0].append(paper_id)
                paper2paper[1].append(ref)

    _write_parquet({'paper': paper_ids, 'title': titles, 'year': years},
                   f'mag_papers_{i}.parquet')
    print(f'There are {len(paper_ids)} papers in file {i}', flush=True)

    _write_parquet({'paper': paper2author[0], 'author': paper2author[1], 'order': paper2author[2]},
                   f'mag_paper2author_{i}.parquet')
    print(f'There are {len(paper2author[0])} paper-author pairs', flush=True)

    _write_parquet({'paper': paper2venue[0], 'venue': paper2venue[1]},
                   f'mag_paper2venue_{i}.parquet')
    print(f'There are {len(paper2venue[0])} paper-venue pairs', flush=True)

    _write_parquet({'paper': paper2fos[0], 'fos': paper2fos[1], 'w': paper2fos[2]},
                   f'mag_paper2fos_{i}.parquet')
    print(f'There are {len(paper2fos[0])} paper-fos pairs', flush=True)

    _write_parquet({'src_paper': paper2paper[0], 'dst_paper': paper2paper[1]},
                   f'mag_paper2paper_{i}.parquet')
    print(f'There are {len(paper2paper[0])} paper citations', flush=True)

    return num_paper_venue, venue_ids, fos_set

In [None]:
import multiprocessing
from multiprocessing import Process
import queue
import gc

def worker_fn(task_queue, res_queue, user_parser):
    """Run inside a pool worker: drain `task_queue`, parsing each task.

    Each task is an ``(index, in_file)`` pair. The worker calls
    ``user_parser(in_file)`` and forwards ``(index, result)`` on `res_queue`, so
    the master can match results to inputs. The worker returns as soon as a
    non-blocking read of `task_queue` raises ``queue.Empty``.

    Parameters
    ----------
    task_queue : Queue
        Source of ``(index, in_file)`` tasks.
    res_queue : Queue
        Destination for ``(index, parsed_data)`` results. This is the channel
        between the worker processes and the master process.
    user_parser : callable
        User-defined function that reads and processes one input.
    """
    # Unique sentinel: no task ever compares equal to it, so the loop below
    # only ends when get_nowait() raises queue.Empty.
    never_returned = object()
    try:
        for file_idx, in_file in iter(task_queue.get_nowait, never_returned):
            parsed = user_parser(in_file)
            res_queue.put((file_idx, parsed))
            # Release the parsed data's memory before taking the next task.
            gc.collect()
    except queue.Empty:
        # No tasks left: this worker is done.
        pass
    
class WorkerPool:
    """A fixed pool of processes that applies `user_parser` to every input.

    All inputs are queued up front as ``(index, in_file)``. Each worker process
    runs `worker_fn`, pulling tasks until the task queue is empty and pushing
    ``(index, result)`` onto a bounded result queue that `get_data` consumes.

    Parameters
    ----------
    name : str
        Label for the pool; used in error messages.
    in_files : sequence
        Inputs passed one at a time to `user_parser`.
    num_processes : int
        Number of worker processes to start.
    user_parser : callable
        Function applied to each input inside a worker process.
    """

    def __init__(self, name, in_files, num_processes, user_parser):
        self.name = name
        self.processes = []
        # Keep a reference to the manager so `close` can shut its server process down.
        self._manager = multiprocessing.Manager()
        self.task_queue = self._manager.Queue()
        # Bounded: workers block on put() until the master drains results.
        self.res_queue = self._manager.Queue(8)
        self.num_files = len(in_files)
        for i, in_file in enumerate(in_files):
            self.task_queue.put((i, in_file))
        for _ in range(num_processes):
            proc = Process(target=worker_fn, args=(self.task_queue, self.res_queue, user_parser))
            proc.start()
            self.processes.append(proc)

    def get_data(self, poll_interval=5.0):
        """Collect one processed result per input file.

        Parameters
        ----------
        poll_interval : float, optional
            Seconds to wait for a result before checking whether any worker is
            still alive. This only affects how quickly a dead pool is noticed.

        Returns
        -------
        a dict : key is the file index, the value is processed data.

        Raises
        ------
        RuntimeError
            If every worker process has exited but some results never arrived.
            This happens, for example, when `user_parser` raised in a worker.
            Previously that case blocked forever.
        """
        return_dict = {}
        while len(return_dict) < self.num_files:
            try:
                file_idx, vals = self.res_queue.get(timeout=poll_interval)
            except queue.Empty:
                if any(proc.is_alive() for proc in self.processes):
                    continue
                # Every worker has exited. Drain a result that may have been
                # put just after the timeout; otherwise the rest will never come.
                try:
                    file_idx, vals = self.res_queue.get_nowait()
                except queue.Empty:
                    missing = sorted(set(range(self.num_files)) - set(return_dict))
                    raise RuntimeError(
                        f'{self.name}: all worker processes exited before results for '
                        f'file indices {missing} arrived; check the workers\' stderr '
                        f'for a traceback'
                    ) from None
            return_dict[file_idx] = vals
            gc.collect()
        return return_dict

    def close(self):
        """Wait for all worker processes to exit, then shut down the queue manager."""
        for proc in self.processes:
            proc.join()
        self._manager.shutdown()

In [None]:
# The paper dump is split into mag_papers_0.txt .. mag_papers_{N-1}.txt.
NUM_PAPER_FILES = 51
NUM_WORKERS = 8

# NOTE(review): the workers run `parse_paper_file`, which is defined in this
# notebook and reads the global `venues`. That relies on the 'fork' start
# method. Under 'spawn'/'forkserver' the function would have to live in an
# importable module.
pool = WorkerPool("mag", list(range(NUM_PAPER_FILES)), num_processes=NUM_WORKERS,
                  user_parser=parse_paper_file)
data = pool.get_data()
pool.close()

# Combine the per-file (num_paper_venue, venue_ids, fos_set) summaries
# returned by parse_paper_file.
num_paper_venue = 0
venue_ids = set()
fos_set = set()
for file_num_paper_venue, file_venue_ids, file_fos in data.values():
    num_paper_venue += file_num_paper_venue
    venue_ids.update(file_venue_ids)
    fos_set.update(file_fos)

In [None]:
# Summary statistics aggregated over all paper files, one per line.
print(
    f'There are {num_paper_venue} papers that have venues.',
    f'There are {len(venue_ids)} venues with IDs',
    f'There are {len(fos_set)} topic fields',
    sep='\n',
)

In [None]:
# Save every fos name seen across the paper files to mag_fos.parquet.
# Sorted so the row order is reproducible: set iteration order is not stable
# across interpreter runs (string hashing is randomized per process). This also
# matches how mag_venue_id.parquet is written.
fos_names = sorted(fos_set)
df = pd.DataFrame({'id': fos_names})
table = pa.Table.from_pandas(df)
print('fos data type:', type(fos_names[0]))
pq.write_table(table, 'mag_fos.parquet')