In [96]:
import sys
sys.path.append('..')

import config
from pymongo import MongoClient
import pandas as pd
import json             # for adding topics
#import numpy as np
import matplotlib.pyplot as plt
from utilities import selected_venues as sv  # for reading the venues selected for this study

import multiprocessing as mp    # for multithreading
import inspect          #for fit
import os               # for reading directories
import gc               # for garbage collector

import time              #for time of execution
import datetime          #for time of execution
import queue             #for multithreading
import threading         #for multithreading

# Open the MongoDB connection described in the project configuration.
client = MongoClient(config.__host, config.__port)

# Source collection holding the full paper records.
source_database = client[config.__db_name]
papers_collection = source_database[config.__collection_name]

# Name of the database that receives the derived collections built below.
__db_name_new = "{0}_2".format(config.__db_name)

# Sanity check: the source collection is expected to contain at least one document.
has_documents = papers_collection.count_documents({}) != 0
print('Is connection estabilished? {0}'.format(has_documents))

Is connection estabilished? True


## Create and save new collection

Create a new collection with basic data on each paper and on the papers that cite it.

The basic data consist of the paper's acm_id, title, authors, year, venue and the list of papers that cite it; each entry of that list holds the citing paper's acm_id, authors and year.

In [23]:
#Create a new collection with basic data on each paper and on the papers that cite it.
#The basic data consist of acm_id, title, authors, year, venue and the list of papers that cite the paper.
#Each entry of the citation list holds the citing paper's acm_id, authors and year.

#return only papers cited by at least one other paper!
def create_collection_cite(year, venues=None, local_collection=None, insert_id=False):
    """Aggregate, for one publication year, the papers that received at least one citation.

    Every selected paper is unwound on its ``acm_cited_by`` list, each entry is
    resolved through a ``$lookup`` on ``config.__collection_name`` and the rows are
    grouped back, so that each paper carries one ``acm_cited_by`` list with the
    acm_id, authors and year of every citing paper the lookup found. Papers whose
    citations cannot be resolved are dropped by the second ``$unwind``.

    Parameters
    ----------
    year
        Publication year of the cited papers.
    venues
        Optional list of ``cleaned_venue`` values. When given, both the cited
        paper and the citing papers must belong to one of these venues.
    local_collection
        Collection to aggregate on; ``papers_collection`` is used when omitted.
        The ``$lookup`` always targets ``config.__collection_name``.
    insert_id
        When true the paper's acm_id is also emitted as ``_id``; otherwise
        ``_id`` is suppressed.

    Returns
    -------
    The cursor returned by ``aggregate``. Documents expose acm_id, title, authors,
    year, venue and acm_cited_by.
    """
    paper_fields = {'acm_id': '$acm_id', 'title': '$title', 'authors': '$authors', 'year': '$year'}
    selection = {'year': year, 'acm_id': {'$exists': True}}

    if venues is None:
        # No venue restriction: a plain equality join is enough.
        lookup = {'from': config.__collection_name, 'localField': 'acm_cited_by',
                  'foreignField': 'acm_id', 'as': 'acm_cited_by'}
    else:
        # Venue restriction: filter the cited papers here and the citing papers in the lookup.
        selection['cleaned_venue'] = {'$in': venues}
        lookup = {
            'from': config.__collection_name,
            'let': {'acm_cited_by_foreign': '$acm_cited_by'},
            'pipeline': [
                {'$match': {'$expr': {'$and': [
                    {'$eq': ['$acm_id', '$$acm_cited_by_foreign']},
                    {'$in': ['$cleaned_venue', venues]},
                ]}}},
            ],
            'as': 'acm_cited_by',
        }

    citing_fields = {'acm_id': '$acm_cited_by.acm_id', 'authors': '$acm_cited_by.authors', 'year': '$acm_cited_by.year'}
    # After the first $project the venue lives in the 'venue' field.
    # The group key keeps 'title' so that the final projection can read '$_id.title'.
    grouped_fields = dict(paper_fields, venue='$venue')

    pipeline = [
        {'$match': selection},
        {'$project': dict(paper_fields, venue='$cleaned_venue', acm_cited_by='$acm_cited_by')},
        {'$unwind': '$acm_cited_by'},
        {'$lookup': lookup},
        {'$unwind': '$acm_cited_by'},
        {'$project': dict(grouped_fields, acm_cited_by=citing_fields)},
        {'$group': {'_id': grouped_fields, 'acm_cited_by': {'$push': '$acm_cited_by'}}},
        {'$project': {'_id': '$_id.acm_id' if insert_id else 0,
                      'acm_id': '$_id.acm_id', 'title': '$_id.title', 'authors': '$_id.authors',
                      'year': '$_id.year', 'venue': '$_id.venue', 'acm_cited_by': '$acm_cited_by'}},
    ]

    collection = papers_collection if local_collection is None else local_collection
    return collection.aggregate(pipeline, allowDiskUse=True)

#return only papers that are not cited!
def not_cited(year, venues=None, local_collection=None, insert_id=False):
    """Return the papers of one year whose ``acm_cited_by`` list is empty.

    year: publication year to select.
    venues: optional list of ``cleaned_venue`` values used to restrict the selection.
    local_collection: collection to aggregate on; ``papers_collection`` when omitted.
    insert_id: when true the acm_id is also emitted as ``_id``, otherwise ``_id`` is suppressed.

    The cursor returned by ``aggregate`` is handed back unchanged.
    """
    # Selection criteria; the venue restriction is optional.
    criteria = {'year': year, 'acm_id': {'$exists': True}}
    if venues is not None:
        criteria['cleaned_venue'] = {'$in': venues}
    criteria['acm_cited_by'] = {'$size': 0}

    # Output shape; only the '_id' entry depends on insert_id.
    projection = {'_id': '$acm_id' if insert_id else 0}
    projection.update({
        'acm_id': '$acm_id',
        'title': '$title',
        'authors': '$authors',
        'year': '$year',
        'venue': '$cleaned_venue',
        'acm_cited_by': '$acm_cited_by',
    })

    stages = [{'$match': criteria}, {'$project': projection}]
    target = papers_collection if local_collection is None else local_collection
    return target.aggregate(stages, allowDiskUse=True)

#number of papers by year
def papers_by_year(venues=None, local_collection=None):
    """Count the papers of each year, sorted by ascending year.

    venues: optional list of ``cleaned_venue`` values; when given only those papers are counted.
    local_collection: collection to aggregate on; ``papers_collection`` when omitted.

    Returns the ``aggregate`` cursor; each document has the fields ``year`` and ``count``.
    """
    stages = []
    if venues is not None:
        # The venue filter has to run before the projection drops the venue field.
        stages.append({'$match': {'cleaned_venue': {'$in': venues}}})
    stages.extend([
        {'$project': {'year': '$year'}},
        {'$group': {'_id': '$year', 'count': {'$sum': 1}}},
        {'$project': {'_id': 0, 'year': '$_id', 'count': '$count'}},
        {'$sort': {'year': 1}},
    ])

    target = papers_collection if local_collection is None else local_collection
    return target.aggregate(stages)


In [25]:
#Insert documents in a new collection
new_collection_name = 'scipub_minimal'
minimal_collection = client[__db_name_new][new_collection_name]

# Start from an empty collection so that re-running the cell does not duplicate documents.
minimal_collection.drop()
print("Documents in collection", new_collection_name+":", minimal_collection.count_documents({}))
for item in list(papers_by_year()):
    # Cited and never-cited papers of the year are computed separately and stored together.
    res_1 = list(create_collection_cite(item['year'], insert_id=True))
    res_2 = list(not_cited(item['year'], insert_id=True))
    print(item['year'], ": ", item['count'], "papers ->", (len(res_1)+len(res_2)), "(diff ", (item['count'] - (len(res_1)+len(res_2))), "), where cited ", len(res_1))
    # insert_many() rejects an empty document list, so skip a group that is empty for this year.
    if res_1:
        minimal_collection.insert_many(res_1)
    if res_2:
        minimal_collection.insert_many(res_2)

print("Documents in collection", new_collection_name+":", minimal_collection.count_documents({}))


Documents in collection scipub_minimal: 0
1954 :  2 papers -> 2 (diff  0 ), where cited  1
1957 :  5 papers -> 5 (diff  0 ), where cited  1
1958 :  10 papers -> 10 (diff  0 ), where cited  4
1959 :  28 papers -> 28 (diff  0 ), where cited  21
1960 :  69 papers -> 69 (diff  0 ), where cited  23
1961 :  92 papers -> 92 (diff  0 ), where cited  43
1962 :  68 papers -> 68 (diff  0 ), where cited  28
1963 :  187 papers -> 187 (diff  0 ), where cited  62
1964 :  259 papers -> 259 (diff  0 ), where cited  92
1965 :  262 papers -> 262 (diff  0 ), where cited  78
1966 :  369 papers -> 369 (diff  0 ), where cited  165
1967 :  471 papers -> 471 (diff  0 ), where cited  213
1968 :  720 papers -> 720 (diff  0 ), where cited  339
1969 :  749 papers -> 749 (diff  0 ), where cited  371
1970 :  750 papers -> 750 (diff  0 ), where cited  320
1971 :  901 papers -> 901 (diff  0 ), where cited  414
1972 :  967 papers -> 967 (diff  0 ), where cited  465
1973 :  1008 papers -> 1008 (diff  0 ), where cited  4

### Create collections with only papers in selected venues

In [76]:
def basic_in_venues(year, venues=None, local_collection=papers_collection):
    """Return the basic record of every paper published in ``year``.

    year: publication year to select.
    venues: optional list of ``cleaned_venue`` values used to restrict the selection.
    local_collection: collection to aggregate on.

    Returns the ``aggregate`` cursor; each document uses the acm_id as ``_id`` and
    carries acm_id, title, authors, year and venue (taken from ``cleaned_venue``).
    """
    selection = {'year': year, 'acm_id': {'$exists': True}}
    if venues is not None:
        selection['cleaned_venue'] = {'$in': venues}

    fields = {
        '_id': '$acm_id',
        'acm_id': '$acm_id',
        'title': '$title',
        'authors': '$authors',
        'year': '$year',
        'venue': '$cleaned_venue',
    }
    return local_collection.aggregate([{'$match': selection}, {'$project': fields}])

#return only papers cited by at least one other paper!
def create_collection_cite(year, local_collection=papers_collection, lookup_collection=config.__collection_name, insert_id=False):
    """Return, for one year, the papers with at least one citation resolvable in ``lookup_collection``.

    NOTE: this definition replaces the earlier ``create_collection_cite`` of this
    notebook, which accepted a ``venues`` argument and also returned title and venue.

    year: publication year of the cited papers.
    local_collection: collection to aggregate on.
    lookup_collection: name of the collection in which the citing papers are looked up.
    insert_id: when true the acm_id is also emitted as ``_id``, otherwise ``_id`` is suppressed.

    Returns the ``aggregate`` cursor; each document has acm_id, authors, year and
    ``acm_cited_by``, a list with the acm_id, authors and year of every citing paper found.
    """
    paper_key = {'acm_id': '$acm_id', 'authors': '$authors', 'year': '$year'}
    citing_paper = {
        'acm_id': '$acm_cited_by.acm_id',
        'authors': '$acm_cited_by.authors',
        'year': '$acm_cited_by.year',
    }
    join = {'from': lookup_collection, 'localField': 'acm_cited_by', 'foreignField': 'acm_id', 'as': 'acm_cited_by'}

    # Shape of the emitted documents; only the '_id' entry depends on insert_id.
    output = {'_id': '$_id.acm_id' if insert_id else 0}
    output.update({
        'acm_id': '$_id.acm_id',
        'authors': '$_id.authors',
        'year': '$_id.year',
        'acm_cited_by': '$acm_cited_by',
    })

    stages = [
        {'$match': {'year': year, 'acm_id': {'$exists': True}}},
        {'$project': dict(paper_key, acm_cited_by='$acm_cited_by')},
        {'$unwind': '$acm_cited_by'},   # one row per citation id
        {'$lookup': join},
        {'$unwind': '$acm_cited_by'},   # drops citations that the lookup did not resolve
        {'$project': dict(paper_key, acm_cited_by=citing_paper)},
        {'$group': {'_id': paper_key, 'acm_cited_by': {'$push': '$acm_cited_by'}}},
        {'$project': output},
    ]
    return local_collection.aggregate(stages, allowDiskUse=True)

'\n'

In [82]:
# Trial run of the venue-restricted build on a single year: only the first item yielded by
# papers_by_year() is processed.
# NOTE(review): `new_collection_tmp_name` is not assigned in this cell nor in any cell above it in
# the visible notebook; it is assigned in the following cell, which fills the temporary collection
# and drops it again when it finishes. This cell therefore depends on out-of-order execution.
new_collection_name = 'scipub_minimal_venus'
client[__db_name_new][new_collection_name].drop()
print("Documents in collection", new_collection_name+":", client[__db_name_new][new_collection_name].count_documents({}))
for item in [papers_by_year().next()]: #list(papers_by_year()): # 
    #insert all papers in venus, without citations received
    res = list(client[config.__db_name][new_collection_tmp_name].find({'year': item['year']}))
    number_of_documents = len(res)
    # insert only when the year has documents in the temporary collection
    if number_of_documents != 0:
        client[__db_name_new][new_collection_name].insert_many(res)
    #update all papers, adding the list of citations received
    # citing papers are looked up in the temporary collection; the aggregation itself runs on the
    # default local_collection of create_collection_cite
    res = list(create_collection_cite(item['year'], lookup_collection=new_collection_tmp_name, insert_id=True))
    print(item['year'], ": ", item['count'], "papers ->", (number_of_documents), "(diff ", (item['count'] - (number_of_documents)), "), where cited ", len(res))
    if res != []:
        for document in res:
            # update_one only modifies a paper that was inserted above; other acm_ids match nothing
            client[__db_name_new][new_collection_name].update_one({'acm_id': document['acm_id']}, {'$set': {'acm_cited_by': document['acm_cited_by'],} } )
    #for the papers without citations, add an empty list of citations received
#Add empty list of received citations to all papers with no citations
client[__db_name_new][new_collection_name].update_many({'acm_cited_by': {'$exists' : False}}, {'$set': {'acm_cited_by': []}})


Documents in collection scipub_minimal_venus: 0
1954 :  2 papers -> 2 (diff  0 ), where cited  1


<pymongo.results.UpdateResult at 0x7fc9f31df5c8>

In [83]:
#Insert documents in a new collection
new_collection_name = 'scipub_minimal_venus'
new_collection_tmp_name = new_collection_name+"_tmp"

venues_collection = client[__db_name_new][new_collection_name]
# The temp collection lives in the source database: create_collection_cite aggregates on
# papers_collection and reaches the temp collection by name through its $lookup stage.
venues_tmp_collection = client[config.__db_name][new_collection_tmp_name]

#create collection with only papers in venus
venues_collection.drop()
#create temp collection with only data on papers in venus
venues_tmp_collection.drop()
for item in list(papers_by_year()):
    res = list( basic_in_venues(item['year'], venues=sv.considered_venues, local_collection=papers_collection) )
    # insert_many() rejects an empty document list: skip the years without papers in the selected venues
    if res:
        venues_tmp_collection.insert_many(res)

print("Documents in collection", new_collection_name+":", venues_collection.count_documents({}))
for item in list(papers_by_year()):
    #insert all papers in venus, without citations received
    res = list(venues_tmp_collection.find({'year': item['year']}))
    number_of_documents = len(res)
    if number_of_documents != 0:
        venues_collection.insert_many(res)
    #update all papers, adding the list of citations received
    res = list(create_collection_cite(item['year'], lookup_collection=new_collection_tmp_name, insert_id=True))
    # item['count'] counts the papers of every venue, so "diff" is the number of papers outside the selection
    print(item['year'], ": ", item['count'], "papers ->", (number_of_documents), "(diff ", (item['count'] - (number_of_documents)), "), where cited ", len(res))
    for document in res:
        # papers that were not inserted above (outside the selected venues) simply match nothing
        venues_collection.update_one({'acm_id': document['acm_id']}, {'$set': {'acm_cited_by': document['acm_cited_by']}})

#Add empty list of received citations to all papers with no citations
venues_collection.update_many({'acm_cited_by': {'$exists' : False}}, {'$set': {'acm_cited_by': []}})

#delete temp collection
venues_tmp_collection.drop()
print("Documents in collection", new_collection_tmp_name+":", venues_tmp_collection.count_documents({}))

print("Documents in collection", new_collection_name+":", venues_collection.count_documents({}))


Documents in collection scipub_minimal_venus: 0
1954 :  2 papers -> 2 (diff  0 ), where cited  1
1957 :  5 papers -> 5 (diff  0 ), where cited  1
1958 :  10 papers -> 10 (diff  0 ), where cited  4
1959 :  28 papers -> 26 (diff  2 ), where cited  14
1960 :  69 papers -> 60 (diff  9 ), where cited  20
1961 :  92 papers -> 69 (diff  23 ), where cited  34
1962 :  68 papers -> 48 (diff  20 ), where cited  23
1963 :  187 papers -> 146 (diff  41 ), where cited  50
1964 :  259 papers -> 208 (diff  51 ), where cited  74
1965 :  262 papers -> 172 (diff  90 ), where cited  53
1966 :  369 papers -> 220 (diff  149 ), where cited  108
1967 :  471 papers -> 262 (diff  209 ), where cited  112
1968 :  720 papers -> 463 (diff  257 ), where cited  226
1969 :  749 papers -> 461 (diff  288 ), where cited  249
1970 :  750 papers -> 526 (diff  224 ), where cited  239
1971 :  901 papers -> 624 (diff  277 ), where cited  306
1972 :  967 papers -> 594 (diff  373 ), where cited  294
1973 :  1008 papers -> 599 (d

#### Add topics

In [88]:
# Load the topic assignments and shorten the topic column names to "id" and "weight".
topics_csv = (
    pd.read_csv("./data/topics-assignments-2000-2004-153-50-2000-2014.csv", sep=',')
    .rename(columns={"topic_id": "id", "topic_weight": "weight"})
)
# Preview the first three rows.
topics_csv.head(3)


Unnamed: 0,acm_id,id,weight,year
0,5390877920f70186a0d2e403,12,0.233969,2006
1,5390877920f70186a0d2e403,18,0.08508,2006
2,5390877920f70186a0d2e403,24,0.14027,2006


In [108]:
def update_topics(__db_name, __collection_name, topics_csv, first_year, last_year, number_of_threads=10):
    """Store on every paper of a collection the topics listed for it in ``topics_csv``.

    The years between ``first_year`` and ``last_year`` (inclusive) that
    ``papers_by_year`` reports for the collection are queued and processed by a
    pool of worker threads, one year at a time per thread. For each paper of a
    year the matching rows of ``topics_csv`` are written to the paper's
    ``topics`` field as a list of ``{"id": ..., "weight": ...}`` records; papers
    without any row are reported with an ``ERROR`` line and left untouched.

    Parameters
    ----------
    __db_name, __collection_name
        Database and collection (reached through the global ``client``) to update.
    topics_csv
        DataFrame with the columns ``acm_id``, ``id`` and ``weight``.
    first_year, last_year
        Inclusive range of publication years to process.
    number_of_threads
        Number of worker threads to start (10 by default).

    Returns
    -------
    None
    """
    target_collection = client[__db_name][__collection_name]
    print_lock = threading.Lock()

    # Index the topic rows by paper once, instead of scanning the whole frame for every document.
    rows_by_paper = topics_csv.groupby("acm_id").indices   # acm_id -> positional row numbers
    topic_values = topics_csv[["id", "weight"]]

    def exec_and_insert(thread_id, data):
        """Write the topics of every paper published in ``data["year"]``."""
        started = time.time()
        with print_lock:
            print("Year:", data["year"], "th:", str(thread_id), "- start @", time.ctime(), "- documents:", data["count"])
        for document in target_collection.find({'year': data["year"]}, {'_id': 0, 'acm_id': 1}):
            positions = rows_by_paper.get(document["acm_id"])
            if positions is None:
                with print_lock:
                    print("ERROR: no topics for document", document['acm_id'])
                continue
            # The JSON round trip turns the pandas values into plain Python objects for MongoDB.
            topics_json = json.loads(topic_values.iloc[positions].to_json(orient="records"))
            target_collection.update_one({'acm_id': document['acm_id']}, {'$set': {'topics': topics_json}})
        with print_lock:
            print("Year:", data["year"], "th:", str(thread_id), "- end @", time.ctime(), "- documents:", data["count"], "- total time:", datetime.timedelta(seconds=round(time.time()-started)))

    # Fill the queue before any worker starts. It is unbounded, so put() can never block.
    work_queue = queue.Queue()
    for item in papers_by_year(local_collection=target_collection):
        year = item["year"]
        if year is not None and first_year <= year <= last_year:
            work_queue.put(item)

    def process_data(thread_id):
        """Take years from the queue until it is empty."""
        while True:
            try:
                # Queue is thread-safe; get_nowait() checks and takes in one step.
                data = work_queue.get_nowait()
            except queue.Empty:
                return
            exec_and_insert(thread_id, data)

    init_time = time.time()
    # Create and start the worker threads
    threads = [threading.Thread(target=process_data, args=(thread_id,)) for thread_id in range(number_of_threads)]
    for thread in threads:
        thread.start()

    # Wait for all threads to complete
    for thread in threads:
        thread.join()
    print("Exiting Main Thread")

    print("\n ### END of procedure! - Total Execution:", datetime.timedelta(seconds=round(time.time()-init_time)))
    return


In [109]:
# Process every year covered by the topic assignments.
first_topic_year = topics_csv["year"].min()
last_topic_year = topics_csv["year"].max()
update_topics(__db_name_new, new_collection_name, topics_csv, first_topic_year, last_topic_year)


Year: 2000 th: 0 - start @ Tue Feb  5 18:27:32 2019 - documents: 3915
Year: 2001 th: 1 - start @ Tue Feb  5 18:27:32 2019 - documents: 3784
Year: 2002 th: 2 - start @ Tue Feb  5 18:27:32 2019 - documents: 5925
Year: 2003 th: 3 - start @ Tue Feb  5 18:27:32 2019 - documents: 6505
Year: 2004 th: 4 - start @ Tue Feb  5 18:27:32 2019 - documents: 9048
Year: 2005 th: 5 - start @ Tue Feb  5 18:27:32 2019 - documents: 10896
Year: 2006 th: 6 - start @ Tue Feb  5 18:27:32 2019 - documents: 12394
Year: 2007 th: 7 - start @ Tue Feb  5 18:27:32 2019 - documents: 11126
Year: 2008 th: 8 - start @ Tue Feb  5 18:27:32 2019 - documents: 12043
Year: 2009 th: 9 - start @ Tue Feb  5 18:27:32 2019 - documents: 13421
Year: 2001 th: 1 - end @ Tue Feb  5 19:25:32 2019 - documents: 3784 - total time: 0:58:00
Year: 2010 th: 1 - start @ Tue Feb  5 19:25:32 2019 - documents: 14027
Year: 2000 th: 0 - end @ Tue Feb  5 19:26:55 2019 - documents: 3915 - total time: 0:59:23
Year: 2011 th: 0 - start @ Tue Feb  5 19:26:

In [112]:
# Count the papers in the topic year range that still have no 'topics' field.
topic_year_range = {'$gte': int(topics_csv["year"].min()), '$lte': int(topics_csv["year"].max())}
papers_without_topics = client[__db_name_new][new_collection_name].count_documents(
    {'year': topic_year_range, 'topics': {'$exists': False}}
)
if papers_without_topics != 0:
    print("ERROR:", str(papers_without_topics), "documents without topics!")
else:
    print("All topics inserted!")

All topics inserted!
