In [None]:
'''
VMP 2022-03-02:
Filter down the data from 'PreprocessingAllDocType' and save the curated subsets.
'''

In [1]:
import sys  
sys.path.insert(0, '/home/vicp')
from MAGspark import get_mag_with_cluster_connection
from MAG import MicrosoftAcademicGraph
import os
from pyspark.sql import functions as F, Window
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql.functions import avg
# Work from the home directory. NOTE(review): absolute, user-specific path.
os.chdir('/home/vicp')
# Obtain the MAG helper and the Spark session. 63504 matches the JOBID of
# 'cluster_new.job' listed in this cell's output.
# presumably memory_per_executor is in MB — TODO confirm against MAGspark.
mag, spark = get_mag_with_cluster_connection(63504, 
                               memory_per_executor=16000)

['NAME STATE JOBID', 'cluster_new.job RUNNING 63504', 'jupyter-notebook RUNNING 63712', '']


In [2]:
# Display the Spark session returned by get_mag_with_cluster_connection.
spark

In [None]:
## TODO:
# (1) run getFilteredPapers() (and uncomment afterwards)
# (2) run ProjPapersAll() for the two new subsets ('PapersRefCite' and 'Papers25').
# (3) run PaperAuthorAffiliationsFun() for the two new subsets.
# (4) add the new files to the MAG file so that we can load them.
# (5) create new files which actually let us do what we need.

# Filter Papers (all)

In [5]:
# unique elements
def getFilteredPapers(): 
    
    '''
    Build and save the set of PaperIds that pass both curation filters.

    A paper is kept when it
    (1) appears in 'PaperReferences' either as the referencing paper
        (PaperId) or as the referenced paper (PaperReferenceId), and
    (2) has at most 25 authors (NumAuthors <= 25) in 'ProjectPapersAllDocType'.

    Three single-column (PaperId) files are written to "datacuration":
    "PapersRefCite.txt" (filter 1 only), "Papers25.txt" (filter 2 only)
    and "PapersClean.txt" (both filters).
    '''
    
    # get papers that reference or cite 
    PaperReferences = mag.getDataframe('PaperReferences')
    PaperId = PaperReferences.select('PaperId').distinct()
    PaperReferenceId = PaperReferences.select('PaperReferenceId').distinct().withColumnRenamed('PaperReferenceId', 'PaperId')
    # full outer join on the shared key keeps ids that occur on either side
    PapersRefCite = PaperId.join(PaperReferenceId, ['PaperId'], 'outer')
    PapersRefCite = PapersRefCite.distinct()
    print(f"PapersRefCite dtypes: {PapersRefCite.dtypes}")
    mag.saveFile(PapersRefCite, "datacuration", "PapersRefCite.txt") # sanity check (and SI)
    
    # get papers that have less than or equal to 25 co-authors
    PapersLessEqual25 = mag.getSubset('ProjectPapersAllDocType', ['PaperId', 'NumAuthors']) \
        .filter(F.col("NumAuthors") <= 25) \
        .select('PaperId').distinct()
    print(f"PapersLessEqual25 dtypes: {PapersLessEqual25.dtypes}")
    mag.saveFile(PapersLessEqual25, "datacuration", "Papers25.txt") # sanity check (and SI)
    
    # keep only the ids that pass both filters
    PapersClean = PapersRefCite.join(PapersLessEqual25, ['PaperId'], 'inner')
    # report the schema of the joined frame itself (previously printed PapersRefCite's)
    print(f"PapersClean dtypes: {PapersClean.dtypes}")
    
    # save it 
    mag.saveFile(PapersClean, "datacuration", "PapersClean.txt")

In [6]:
# Writes PapersRefCite.txt, Papers25.txt and PapersClean.txt to "datacuration".
getFilteredPapers()

PapersLessEqual25 dtypes: [('PaperId', 'bigint')]


# Filter Authors (all)

In [3]:
def getFilteredAuthors():
    '''
    Collect the distinct AuthorIds attached to the papers in 'PapersClean'
    and save them as "AuthorsClean.txt" under "datacuration".
    '''

    # inputs: (PaperId, AuthorId) pairs and the curated paper ids
    paper_author = mag.getSubset(
        'PaperAuthorAffiliationsAttributesAllDocType',
        ['PaperId', 'AuthorId'],
    )
    clean_papers = mag.getDataframe('PapersClean')
    print(f"paaaD dtypes: {paper_author.dtypes}")
    print(f"PapersClean dtypes: {clean_papers.dtypes}")

    # authors of curated papers: match on PaperId, then keep unique AuthorIds
    matched = clean_papers.join(paper_author, on=['PaperId'], how="inner")
    clean_authors = matched.select('AuthorId').distinct()
    print(f"AuthorsClean dtypes: {clean_authors.dtypes}")

    # persist the result
    mag.saveFile(clean_authors, "datacuration", "AuthorsClean.txt")

In [4]:
# Writes AuthorsClean.txt. Reads 'PapersClean' — presumably the file written
# by getFilteredPapers(), so that step has to be completed first.
getFilteredAuthors()

paaaD dtypes: [('PaperId', 'bigint'), ('AuthorId', 'bigint')]
PapersClean dtypes: [('PaperId', 'bigint')]
AuthorsClean dtypes: [('AuthorId', 'bigint')]


# Map FieldOfStudyId to NormalizedName

In [5]:
def NormalizedName():
    '''
    Save the distinct (FieldOfStudyId, NormalizedName) pairs from
    'FieldsOfStudy' as "FoS.txt" under "datacuration".
    '''

    # id -> name lookup, one row per distinct pair
    lookup_columns = ['FieldOfStudyId', 'NormalizedName']
    fos_lookup = mag.getSubset("FieldsOfStudy", lookup_columns).distinct()
    print(f"dtypes: {fos_lookup.dtypes}")

    # persist the lookup
    mag.saveFile(fos_lookup, "datacuration", "FoS.txt")

In [6]:
# Writes the FieldOfStudyId / NormalizedName lookup to FoS.txt.
NormalizedName()

dtypes: [('FieldOfStudyId', 'bigint'), ('NormalizedName', 'string')]


## Create Final ProjectPapersAll

In [3]:
def ProjPapersAll(PaperSub, outname):
    '''
    Restrict 'ProjectPapersAllDocType' to the papers listed in a subset,
    attach the field-of-study name, and save the result.

    PaperSub: name passed to mag.getDataframe for the frame holding the
              PaperIds to keep (joined on 'PaperId').
    outname:  file name to write under "datacuration".
    '''

    # inputs
    all_papers = mag.getDataframe('ProjectPapersAllDocType')
    keep_ids = mag.getDataframe(PaperSub)
    fos_names = mag.getDataframe('FoS')

    # keep only the selected papers, then add NormalizedName via FieldOfStudyId
    kept = all_papers.join(keep_ids, on=['PaperId'], how="inner")
    named = kept.join(fos_names, on=['FieldOfStudyId'], how="inner")

    # five output columns, de-duplicated
    output_columns = ['PaperId', 'FieldOfStudyId', 'NormalizedName', 'DocType', 'Date']
    papers_out = named.select(*output_columns).distinct()
    print(f"ProjectPapersAll dtypes: {papers_out.dtypes}")

    # persist the result
    mag.saveFile(papers_out, "datacuration", outname)

In [4]:
# both filters (ref/cite and <= 25 authors)
ProjPapersAll('PapersClean', 'ProjectPapersAll.txt')

ProjectPapersAll dtypes: [('PaperId', 'bigint'), ('FieldOfStudyId', 'bigint'), ('NormalizedName', 'string'), ('DocType', 'string'), ('Date', 'date')]


In [5]:
# ref/cite filter only
ProjPapersAll('PapersRefCite', 'ProjectPapersRefCite.txt')

ProjectPapersAll dtypes: [('PaperId', 'bigint'), ('FieldOfStudyId', 'bigint'), ('NormalizedName', 'string'), ('DocType', 'string'), ('Date', 'date')]


In [6]:
# <= 25 authors filter only
ProjPapersAll('Papers25', 'ProjectPapers25.txt')

ProjectPapersAll dtypes: [('PaperId', 'bigint'), ('FieldOfStudyId', 'bigint'), ('NormalizedName', 'string'), ('DocType', 'string'), ('Date', 'date')]


## Create Final ProjectAuthorsAll


In [9]:
def ProjAuthorsAll():
    '''
    Restrict 'ProjectAuthorsAllDocType' to the authors in 'AuthorsClean'
    and save the result as "ProjectAuthorsAll.txt" under "datacuration".
    '''

    # inputs
    all_authors = mag.getDataframe('ProjectAuthorsAllDocType')
    clean_ids = mag.getDataframe('AuthorsClean')

    # keep only curated authors (match on AuthorId)
    kept = all_authors.join(clean_ids, on=['AuthorId'], how="inner")

    # output columns, de-duplicated
    output_columns = ['AuthorId', 'DisplayName', 'Country', 'Gender', 'Genderized', 'MinDate']
    authors_out = kept.select(*output_columns).distinct()
    print(f"ProjectAuthorsAll dtypes: {authors_out.dtypes}")

    # persist the result
    mag.saveFile(authors_out, "datacuration", "ProjectAuthorsAll.txt")

In [10]:
# Writes ProjectAuthorsAll.txt (authors restricted to 'AuthorsClean').
ProjAuthorsAll()

ProjectAuthorsAll dtypes: [('AuthorId', 'bigint'), ('DisplayName', 'string'), ('Country', 'string'), ('Gender', 'string'), ('Genderized', 'int'), ('MinDate', 'date')]


## Create Final PaperAuthorAffiliationsAttributesAll

In [12]:
def PaperAuthorAffiliationsFun(ProjectPapersSub, outname):
    '''
    Restrict 'PaperAuthorAffiliationsAttributesAllDocType' to the papers of
    a given paper table, attach DocType and field-of-study information,
    and save the result.

    ProjectPapersSub: name passed to mag.getSubset for the paper table;
                      its 'PaperId', 'DocType' and 'FieldOfStudyId'
                      columns are used.
    outname:          file name to write under "datacuration".
    '''

    # inputs
    paper_columns = ['PaperId', 'DocType', 'FieldOfStudyId']
    papers = mag.getSubset(ProjectPapersSub, paper_columns)
    paper_author = mag.getDataframe('PaperAuthorAffiliationsAttributesAllDocType')
    fos_names = mag.getDataframe('FoS')

    # keep rows whose paper is in the table, then add NormalizedName
    kept = paper_author.join(papers, on=['PaperId'], how="inner")
    named = kept.join(fos_names, on=['FieldOfStudyId'], how="inner")

    # output columns (no de-duplication here)
    output_columns = [
        'PaperId', 'AuthorId', 'Date', 'Gender', 'ScientificAge',
        'CountryCode', 'DocType', 'FieldOfStudyId', "NormalizedName",
    ]
    paper_author_out = named.select(*output_columns)
    print(f"dtypes: {paper_author_out.dtypes}")

    # persist the result
    mag.saveFile(paper_author_out, "datacuration", outname)

In [13]:
# all doctypes and both filters (not run right now)
# NOTE(review): this cell has an execution count and output, so the
# "not run" remark may be stale — confirm.
PaperAuthorAffiliationsFun('ProjectPapersAll', 'PaperAuthorAffiliationsAttributesAll.txt')

dtypes: [('PaperId', 'bigint'), ('AuthorId', 'bigint'), ('Date', 'date'), ('Gender', 'int'), ('ScientificAge', 'float'), ('CountryCode', 'string'), ('DocType', 'string'), ('FieldOfStudyId', 'bigint'), ('NormalizedName', 'string')]


In [14]:
# all doctypes, <= 25 authors filter only
PaperAuthorAffiliationsFun('ProjectPapers25', 'PaperAuthorAffiliationsAttributes25.txt')

dtypes: [('PaperId', 'bigint'), ('AuthorId', 'bigint'), ('Date', 'date'), ('Gender', 'int'), ('ScientificAge', 'float'), ('CountryCode', 'string'), ('DocType', 'string'), ('FieldOfStudyId', 'bigint'), ('NormalizedName', 'string')]


In [15]:
# all doctypes, ref/cite filter only
PaperAuthorAffiliationsFun('ProjectPapersRefCite', 'PaperAuthorAffiliationsAttributesRefCite.txt')

dtypes: [('PaperId', 'bigint'), ('AuthorId', 'bigint'), ('Date', 'date'), ('Gender', 'int'), ('ScientificAge', 'float'), ('CountryCode', 'string'), ('DocType', 'string'), ('FieldOfStudyId', 'bigint'), ('NormalizedName', 'string')]


In [16]:
# no filter: uses the unfiltered 'ProjectPapersAllDocType' paper table, so the
# result should be pretty much the same as PaperAuthorAffiliationsAttributesAllDocType
# (not run right now)
# NOTE(review): this cell has an execution count and output, so the
# "not run" remark may be stale — confirm.
PaperAuthorAffiliationsFun('ProjectPapersAllDocType', "PaperAuthorAffiliationsAttributesNoFilter.txt")

dtypes: [('PaperId', 'bigint'), ('AuthorId', 'bigint'), ('Date', 'date'), ('Gender', 'int'), ('ScientificAge', 'float'), ('CountryCode', 'string'), ('DocType', 'string'), ('FieldOfStudyId', 'bigint'), ('NormalizedName', 'string')]


## Create Final ProjectPapersRepo

In [17]:
def ProjPapersRepo():
    '''
    Save the rows of 'ProjectPapersAll' whose DocType is "Repository"
    as "ProjectPapersRepo.txt" under "datacuration".
    '''

    # input
    papers_all = mag.getDataframe("ProjectPapersAll")

    # repository papers only
    is_repository = F.col("DocType") == "Repository"
    papers_repo = papers_all.where(is_repository)
    print(f"dtypes: {papers_repo.dtypes}")

    # persist the result
    mag.saveFile(papers_repo, "datacuration", "ProjectPapersRepo.txt")

In [18]:
# Writes ProjectPapersRepo.txt (DocType == "Repository" rows of 'ProjectPapersAll').
ProjPapersRepo()

dtypes: [('PaperId', 'bigint'), ('FieldOfStudyId', 'bigint'), ('NormalizedName', 'string'), ('DocType', 'string'), ('Date', 'date')]


## Create Final PaperAuthorAffiliationsAttributesRepo

In [19]:
def PaperAuthorAffiliationsRepo():
    '''
    Save the rows of 'PaperAuthorAffiliationsAttributesAll' whose DocType
    is "Repository" as "PaperAuthorAffiliationsAttributesRepo.txt" under
    "datacuration".
    '''

    # input
    paper_author_all = mag.getDataframe("PaperAuthorAffiliationsAttributesAll")

    # repository papers only
    is_repository = F.col("DocType") == "Repository"
    paper_author_repo = paper_author_all.where(is_repository)
    print(f"dtypes: {paper_author_repo.dtypes}")

    # persist the result
    mag.saveFile(paper_author_repo, "datacuration", "PaperAuthorAffiliationsAttributesRepo.txt")

In [20]:
# Writes PaperAuthorAffiliationsAttributesRepo.txt
# (DocType == "Repository" rows of 'PaperAuthorAffiliationsAttributesAll').
PaperAuthorAffiliationsRepo()

dtypes: [('PaperId', 'bigint'), ('AuthorId', 'bigint'), ('Date', 'date'), ('Gender', 'int'), ('ScientificAge', 'float'), ('CountryCode', 'string'), ('DocType', 'string'), ('FieldOfStudyId', 'bigint'), ('NormalizedName', 'string')]
