In [76]:
import boto3
import time
import json
import csv
import pandas as pd
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

from src.data_reader import DataReader, RetractionFinder
from src.to_gexf import to_gexf
from src.get_redacted import get_paper, load_redacted, get_doi, gen_retracted
import src.load_data as load_data

import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import dataframe
import graphframes

import requests


In [2]:
# Open a MongoDB connection with the driver's default settings and keep
# handles to the 'capstone' database and its 'papers' collection.
client = MongoClient()
db = client.capstone
papers = db['papers']
# S3 client; it is not referenced by any of the cells shown here.
s3 = boto3.client('s3')

In [3]:
# papers.find_one('883a188c2aad75e2e12769e5f48fe582223b3507')

In [47]:
def construct_component(collection, start_id=None,
                        max_depth=3, max_size=1e6,
                        component=None, depth=0, v=False):
    """
    Collect the ids of all documents reachable from ``start_id`` by following
    the 'inCitations' and 'outCitations' lists, breadth-first.
    ============
    Takes:
        collection: object exposing find_one(id) that returns a document
                    (a mapping with 'inCitations'/'outCitations' lists) or None
        start_id: id of the document to start from; None yields no traversal
        max_depth: deepest level (counted from ``depth``) that is still added
                   to the result; a value <= 0 disables the depth limit
        max_size: stop adding ids once the result holds this many entries;
                  None or a value <= 0 disables the size limit
        component: optional set of already-visited ids; it is extended in
                   place and returned. A fresh set is created when omitted.
        depth: depth assigned to ``start_id`` (0 for a normal top-level call)
        v: when True, print (component size, depth) after each level
    ============
    Returns: set of ids (ids that are cited but missing from the collection
             are still included, they are just not expanded)
    """
    # A fresh set per call: a mutable default would leak ids between calls.
    if component is None:
        component = set()
    if start_id is None or start_id in component:
        return component
    depth_limited = max_depth > 0
    if depth_limited and depth > max_depth:
        return component
    size_limited = max_size is not None and max_size > 0

    # Level-by-level traversal: every id is first seen on a shortest path,
    # so the depth limit cannot hide nodes that are actually close enough,
    # and no Python recursion limit applies when the depth is unlimited.
    frontier = [start_id]
    level = depth
    while frontier:
        upcoming = []
        for node_id in frontier:
            if node_id in component:
                continue
            if size_limited and len(component) >= max_size:
                return component
            component.add(node_id)
            # Ids on the last allowed level are recorded but not expanded,
            # which also saves the database lookup.
            if depth_limited and level >= max_depth:
                continue
            node = collection.find_one(node_id)
            if node is None:
                continue
            neighbours = (list(node.get('inCitations') or [])
                          + list(node.get('outCitations') or []))
            upcoming.extend(n for n in neighbours if n not in component)
        if v and component:
            print(len(component), level)
        frontier = upcoming
        level += 1
    return component

In [51]:
def write_component(collection, component, path='data/component'):
    """
    Dump the documents whose '_id' is in ``component`` to a text file,
    one JSON object per line.
    ============
    Takes:
        collection: object exposing find(query) that returns an iterable of
                    JSON-serialisable documents
        component: iterable of document ids to export
        path: string-location of the output file (overwritten if it exists)
    ============
    Returns: None
    """
    matches = collection.find({'_id': {'$in': list(component)}})
    with open(path, 'w') as f:
        for document in matches:
            # write(), not writelines(): each record is one string, and
            # writelines() would iterate over it character by character.
            f.write(json.dumps(document) + '\n')

In [48]:
# Gather the ids around one seed paper, limited to max_depth=2.
comp = construct_component(
    papers,
    start_id='1c2a234329c161bd187b61cb9794d68b03dd1296',
    max_depth=2,
    v=False,
)

In [52]:
# Export the collected documents to the function's default output path.
write_component(collection=papers, component=comp)

In [53]:
# Point the project's DataReader at the file written by write_component
# (same 'data/component' path as that function's default).
reader = DataReader('data/component')
# NOTE(review): presumably this writes the node/edge files read back later
# ('data/nodes', 'data/edges') -- confirm against src/data_reader.py.
reader.write(dynamic=True)

In [58]:
# NOTE(review): `spark` is not created in any cell shown here; presumably an
# active SparkSession already exists in the kernel -- confirm.

# Edge table: three string columns plus a generated 'id' column.
edges = spark.createDataFrame(
    reader.read_edges(),
    schema=StructType(
        [StructField(column, StringType()) for column in ('src', 'dst', 'year')]
    ),
).withColumn('id', F.monotonically_increasing_id())


# Node table: three string columns.
nodes = spark.createDataFrame(
    reader.read_nodes(),
    schema=StructType(
        [StructField(column, StringType()) for column in ('id', 'year', 'authors')]
    ),
)

In [66]:
# Render every vertex as a GEXF <node> line and expose the result as a local
# iterator instead of collecting everything at once.
# NOTE: this relies on `g`, which the notebook only builds in cells that
# appear further down, so it fails on a fresh top-to-bottom run.
iter_nodes = (
    g.vertices.rdd
    .map(lambda v: f'      <node id="{v["id"]}"  start="{v["year"]}" end="2018" />\n')
    .toLocalIterator()
)
iter_nodes

<itertools.chain at 0x1887ca278>

In [74]:
# Peek at the first node row serialised as JSON.
# NOTE(review): the recorded output of this cell is a KeyboardInterrupt, i.e.
# the run captured here was aborted before it returned.
nodes.toJSON().first()

KeyboardInterrupt: 

In [57]:
# Combine the Spark node and edge frames into a GraphFrame and export it
# with the project's to_gexf helper.
g = graphframes.GraphFrame(v=nodes, e=edges)
to_gexf(g, 'data/component.gexf', dynamic=True)

KeyboardInterrupt: 

In [4]:
# Look up the seed paper by its '_id' to inspect the stored document.
papers.find_one({'_id': '1c2a234329c161bd187b61cb9794d68b03dd1296'})

{'_id': '1c2a234329c161bd187b61cb9794d68b03dd1296',
 'entities': ['Menopause',
  'Ovarian Follicle',
  'Ovariectomy',
  'Ovary',
  'Sampling - Surgical action',
  'ovarian'],
 'journalVolume': '23 3',
 'journalPages': '699-708',
 'pmid': '18192670v1',
 'year': 2008,
 'outCitations': ['9b1e57ec9106433f8232ee223c31d6582c7f6564',
  '60eeeffc4bd3771d15e7737aa9324954d0f5d81f',
  'afb10655a42480d5dc2468445631ea404e7626d9',
  'debddbed085de019220505df2b12fef19c4aed0c',
  'fe89c0cb096cf1534ac257165e2d254cb408f833',
  'f10dcaa5469fd912910fa4aed9144e511279f34e',
  '82f8f57851c0588a4c1d3aba1f10943208efd5ed',
  '84615196d075f3c729e9536f278aa4d92096341d',
  'd6ac761b8b2de5acdc7ac59b73205333af1b5f8f',
  '813b27308a05e9b5a081d62bcc8d2ec4ca490979',
  'f65edb631d31d03b2a087d49798117a8002dadcb',
  '52481f12a09167ebac6ec2ebf333ce61622a79f7',
  'e29a4f411799e857e8f2ad0278a6091a7734264f',
  '9ad37584ed807667a305e65bbc4b94fe25da23b6',
  'ce573ea2ba601cae295bc3f8dbd392eb4afc8109',
  'bbf55ad95c2c1d9cc4124d30

In [21]:
# def to_gexf(edges_path, node_path, path):
#     """
#     Export two files to a .gexf file for use in Gephi.
#     ============
#     Takes:
#         g: pyspark graphframe object-graph to export
#         path: string-location to save file
#     ============
#     Returns: None
#     """
    
#     header = f"""<?xml version="1.0" encoding="UTF-8"?>
#     <gexf xmlns="http://www.gexf.net/1.2draft" version="1.2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.w3.org/2001/XMLSchema-instance">
#       <graph mode="{'static' if not dynamic else 'dynamic'}" defaultedgetype="directed" name="">
#         <nodes>
#   """
#     with open(path, 'w') as f:
#         f.write(header)
#         with open(nodes_path, 'r') as nodes:
#             for line in nodes:
#                 v = json.loads(line)
#                 f.writelines(f'      <node id="{v["_id"]}"  start="{v["year"]}" end="2018" />\n')
#         f.writelines("""
#          </nodes>
#          <edges>
#   """)
#         with open(edges_path, 'r') as edges:
#             for line in edges:
#                 e = json.loads(line)
#                 f.writelines(f'      <edge id="{e["_id"]}" source="{e["src"]}" target="{e["dst"]}" start="{e["year"]}" end="2018" />\n')
#         f.writelines("""         </edges>
#        </graph>
#      </gexf>""")


In [104]:
def pd_to_gexf(edges, nodes, path, dynamic=False):
    """
    Export two pandas frames to a single .gexf file for use in Gephi.
    ============
    Takes:
        edges: DataFrame object containing edges to export; needs the columns
               'id', 'src' and 'dst', plus 'year' when dynamic is True
        nodes: DataFrame object containing nodes to export; needs the column
               'id', plus 'year' when dynamic is True
        path: string-location to save file
        dynamic: when True, write a dynamic graph whose elements carry
                 start/end/year attributes (end is fixed to "2018");
                 when False, write a static graph with ids and endpoints only
    ============
    Returns: None
    """
    # Local import keeps the function self-contained.
    from xml.sax.saxutils import escape

    def attr(value):
        # Make a value safe inside a double-quoted XML attribute.
        return escape(str(value), {'"': '&quot;'})

    # Both modes read from the pandas arguments. Plain iteration over the
    # needed columns avoids transposing the whole frame for a row-wise apply.
    if dynamic:
        node_xml = ''.join(
            f'      <node id="{attr(node_id)}"  start="{attr(year)}" end="2018" year="{attr(year)}" />\n'
            for node_id, year in zip(nodes['id'], nodes['year'])
        )
        edge_xml = ''.join(
            f'      <edge id="{attr(edge_id)}" source="{attr(src)}" target="{attr(dst)}" start="{attr(year)}" end="2018" year="{attr(year)}" />\n'
            for edge_id, src, dst, year in zip(edges['id'], edges['src'], edges['dst'], edges['year'])
        )
    else:
        node_xml = ''.join(
            f'      <node id="{attr(node_id)}" />\n'
            for node_id in nodes['id']
        )
        edge_xml = ''.join(
            f'      <edge id="{attr(edge_id)}" source="{attr(src)}" target="{attr(dst)}" />\n'
            for edge_id, src, dst in zip(edges['id'], edges['src'], edges['dst'])
        )
    mode = 'dynamic' if dynamic else 'static'
    string = f"""<?xml version="1.0" encoding="UTF-8"?>
    <gexf xmlns="http://www.gexf.net/1.2draft" version="1.2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.w3.org/2001/XMLSchema-instance">
      <graph mode="{mode}" defaultedgetype="directed" name="">
        <nodes>
  {node_xml}
        </nodes>
        <edges>
  {edge_xml}
        </edges>
      </graph>
    </gexf>"""
    # The XML declaration promises UTF-8, so write exactly that encoding.
    with open(path, 'w', encoding='utf-8') as f:
        f.write(string)

In [92]:
# Scratch check: obtain a local iterator over the node rows. The recorded
# output only shows the iterator object; nothing is consumed here.
nodes.rdd.toLocalIterator()
# Leftover experiment (kept commented out): format each edge as a GEXF line.
# map(lambda e: f'      <edge id="{e["id"]}" source="{e["src"]}" target="{e["dst"]}" start="{e["year"]}" end="2018" />\n')
# test.first()

<itertools.chain at 0x127ac0a90>

In [98]:
# Load the pipe-separated, header-less node and edge files into pandas.
# NOTE(review): 'data/nodes' and 'data/edges' are presumably produced by
# reader.write(...) in an earlier cell -- confirm.
nodes_df = pd.read_csv('data/nodes', sep='|', header=None)
# Assigning the labels raises if the file does not have exactly three columns.
nodes_df.columns = ['id','year','authors']
edges_df = pd.read_csv('data/edges', sep='|', header=None)
edges_df.columns = ['src','dst','year']
# Use the row index as the edge id.
edges_df['id'] = edges_df.index

In [105]:
# Write the pandas-based graph as a dynamic GEXF file.
pd_to_gexf(
    edges=edges_df,
    nodes=nodes_df,
    path='data/nak2.gexf',
    dynamic=True,
)

In [None]:
# Build the GraphFrame from the Spark frames and export it via to_gexf.
# NOTE(review): this repeats the earlier cell that creates `g` and writes
# 'data/component.gexf'; this copy has no execution count (never run).
g = graphframes.GraphFrame(e=edges, v=nodes)
to_gexf(g, 'data/component.gexf', dynamic=True)