In [None]:
# Settings for this run; presumably read by the header snippet loaded in the next cell
# (which loads the header and the path to ANI).
# Project name, as received via email.
project_name = "hbcu"
# Data access level: "default" or "fulldata", depending on which cluster you are on.
permissions = "fulldata"

In [None]:
%run /Snippets/header_008

PROJECT DETAILS:
Project identifier: hbcu
Cluster data access level: fulldata


EXECUTION DETAILS:
Spark version: 11.3.x-scala2.12, 
Cluster Node Type: i3.xlarge, 
Driver Node Type: c4.2xlarge, 
Worker Node Type: i3.xlarge


DATA SNAPSHOT DETAILS:
This notebook uses the latest snapshot date for this version: "v008.20231003094404/", (03 October 2023)

 The following are the Scopus snapshot dates available for datasets at v008:
['v008.20220314092239/', 'v008.20220906063204/', 'v008.20221021151538/', 'v008.20230509104116/', 'v008.20230821105000/', 'v008.20231003094404/']
(If you want to use a different snapshot, in the first cell of your notebook, set the variable `custom_snapshot_date` to one of the options listed above (string data type))



DATA SAMPLE DETAILS:
Setting paths to run on full datasets (100% Scopus publication volume)


In [None]:
import pyspark.sql.functions as f
from pyspark.sql import Window
import pandas as pd
import numpy as np
import re
from scipy import stats
from pyspark.sql.window import Window



In [None]:
import os

# Inspect the project folder: input tables plus any previously exported results.
path = "/dbfs/path/hbcu"
dir_list = [entry for entry in os.listdir(path)]
print(dir_list)

['full_hbcu.csv', 'hbcu_au_eid.csv', 'hbcu_au_eid_match.csv', 'hbcu_cd_pct.csv', 'hbcu_cd_pct_match.csv', 'hbcu_citepct.csv', 'hbcu_citepct_match.csv', 'hbcu_collab_af.csv', 'hbcu_collab_afnum.csv', 'hbcu_collab_fresh.csv', 'hbcu_collab_senior.csv', 'hbcu_doi.csv', 'hbcu_eid.csv', 'hbcu_eid_match.csv', 'hbcu_match_doi.csv', 'hbcu_match_name.csv', 'hbcu_multiinst_info.csv', 'hbcu_novel.csv', 'hbcu_novel_match.csv', 'hbcu_person_name.csv', 'multi_inst_disambiguate.csv']


In [None]:
#dbutils.fs.rm("dbfs:/path/hbcu/hbcu_match_name.csv",True)


In [None]:
# Scopus ANI table (basePath / tablename_ani presumably come from the header snippet)
# and the list of DOIs to match.
df_ani = spark.read.parquet(basePath + tablename_ani)
df_doi = spark.read.csv(
    "dbfs:/path/hbcu/hbcu_match_doi.csv",  # alternative input: hbcu_doi.csv
    header=True,
)

In [None]:
# Attach the Scopus eid (and publication year) to every DOI. DOIs on both sides are
# lower-cased and trimmed so case/whitespace differences do not block a match.
df_doi_eid = (
    df_doi.withColumn("doi", f.trim(f.lower(f.col("doi"))))
    .join(
        df_ani.select("doi", "eid", "year").withColumn("doi", f.trim(f.lower(f.col("doi")))),
        on=["doi"],
        how="inner",
    )
    .dropDuplicates()
)
# display(df_doi_eid)

# Get auid for every person

In [None]:
# First/last names per PersonId_control for the matched sample
# (alternative input: hbcu_person_name.csv).
df_name = spark.read.csv("dbfs:/path/hbcu/hbcu_match_name.csv", header=True)

# display(df_name)

In [None]:
# Explode the ANI author array: one row per (eid, author) with name, position and affiliation info.
df_ani_au = (
    df_ani
    # position of the final author on each paper (largest Authorseq in the Au array)
    .withColumn("last_author_seq", f.array_max(f.col("Au.Authorseq")))
    .withColumn("Au", f.explode("Au"))
    .select(
        "eid",
        "year",
        "Au.auid",
        "Au.initials",
        "Au.surname",
        "Au.Authorseq",
        "last_author_seq",
        "publication_type",
        "Au_Af",
        "Af.affiliation_ids",
    )
)

# Candidate pairs between PersonId_control and auid: every Scopus author of every paper
# whose DOI is listed for the person.
t = (
    df_doi_eid.join(df_name, ["PersonId_control"])
    .withColumn("first_intial", f.col("firstname").substr(1, 1))
    .select("PersonId_control", "lastname", "firstname", "first_intial", "eid")
    .join(
        df_ani_au.select("eid", "auid", "initials", "surname", "Authorseq", "last_author_seq"),
        ["eid"],
        "inner",
    )
    .drop_duplicates()
)

# Pass 1: exact match on the lower-cased, letters-only last name plus the first initial.
# Many Scopus authors have no given name, so the initial is checked against Scopus `initials`.
df_auid = (
    t
    .withColumn("first_intial", f.trim(f.lower(f.col("first_intial"))))
    .withColumn(
        "lastname",
        f.trim(f.lower(f.regexp_replace(f.col("lastname"), "[^a-zA-Z]", ""))),
    )
    .withColumn("scopus_intial", f.trim(f.lower(f.col("initials"))))
    .withColumn(
        "scopus_lastname",
        f.trim(f.lower(f.regexp_replace(f.col("surname"), "[^a-zA-Z]", ""))),
    )
    .filter(
        (f.col("lastname") == f.col("scopus_lastname"))
        # blank values would match trivially ("" == "" / "x".contains(""))
        & (f.length("lastname") > 0)
        & (f.length("first_intial") > 0)
        & (f.col("scopus_intial").contains(f.col("first_intial")))
    )
    .select("PersonId_control", "auid")
    .drop_duplicates()
)

# Pass 2, only for persons without a pass-1 match. Typical misses are multi-word or accented
# names, e.g. TAKACS HAYNES vs Haynes, MARTINEZ-LOPEZ vs Martínez-López: fold accents, split
# last names into words, and accept any shared word plus a matching first initial.

# Accented letters and their ASCII replacements, applied in a single translate().
_ACCENTED = (
    "áàâäãåąÁÀÂÄÃÅĄ"
    "éèêëęÉÈÊËĘ"
    "íìîïÍÌÎÏ"
    "óòôöõøÓÒÔÖÕØ"
    "úùûüÚÙÛÜ"
    "ćçğłńñřșşśšțýźžĆÇĞŁÑŃŘȘŞŚŠȚÝŹŽ"
)
_UNACCENTED = "a" * 14 + "e" * 10 + "i" * 8 + "o" * 12 + "u" * 8 + "ccglnnrsssstyzzCCGLNNRSSSSTYZZ"
assert len(_ACCENTED) == len(_UNACCENTED), "accent map must be one-to-one"


def _name_words(col_name):
    """Return column `col_name` as an array of lower-cased, accent-folded words.

    Hyphens are treated as word separators. Leading/trailing/repeated whitespace yields no
    empty words, so two names that both contain an empty token cannot "share" the word "".
    """
    folded = f.translate(f.lower(f.regexp_replace(col_name, "-", " ")), _ACCENTED, _UNACCENTED)
    return f.array_remove(f.split(f.trim(folded), r"\s+"), "")


df_auid_plus = (
    t.join(
        df_auid.select("PersonId_control").withColumn("ind", f.lit(1)).drop_duplicates(),
        ["PersonId_control"],
        "left",
    )
    # keep only persons that pass 1 did not match
    .filter("ind is null")
    .withColumn("first_intial", f.trim(f.lower(f.col("first_intial"))))
    .withColumn("scopus_intial", f.trim(f.lower(f.col("initials"))))
    .withColumn("lastname_list", _name_words("lastname"))
    .withColumn("scopus_lastname_list", _name_words("surname"))
    # words present in both last names
    .withColumn("common_words", f.array_intersect("lastname_list", "scopus_lastname_list"))
    .filter(
        (f.size("common_words") >= 1)
        & (f.length("first_intial") > 0)
        & (f.col("scopus_intial").contains(f.col("first_intial")))
    )
    .select("PersonId_control", "auid")
    .drop_duplicates()
)

df_auid = df_auid.unionByName(df_auid_plus)

# Reconstruct their pub list
This is an author-level feature table, i.e. a mapping from each matched author (PersonId_control / auid) to their publications (eid).

In [None]:
# Author–eid mapping: ar/cp/re pubs since 2006 by the matched Scopus author ids. Pairs that
# came from the DOI list are flagged with isAApub = 1; the other Scopus pubs are kept too.
df_publist = (
    df_ani_au
    .filter(f.col("publication_type").isin("ar", "cp", "re"))
    .filter(f.col("year") >= 2006)
    .drop("initials", "surname")
    .join(df_auid, on=["auid"])
    .join(
        df_doi_eid.select("eid", "PersonId_control", "doi").withColumn("isAApub", f.lit(1)),
        on=["eid", "PersonId_control"],
        how="outer",
    )
    # Exclude hyper-authored papers (100+ authors). Rows that exist only on the DOI-list side
    # of the outer join have a null last_author_seq, so this filter removes them as well.
    .filter(f.col("last_author_seq") < 100)
    .drop_duplicates()
)

In [None]:
# Keep Scopus-appended pubs (isAApub null) only when the paper lists an affiliation that the
# person used on at least one DOI-list pub (isAApub = 1).
# NOTE(review): on the Scopus-only side all affiliations of the paper are considered, not only
# the person's own; confirm that is intended.
# t = df_publist.filter('isAApub = 1').select('eid', 'PersonId_control').join(df_ani.select('eid', 'Au_Af'), ['eid'], 'left').drop_duplicates()
# display(t.filter("Au_Af is null").count() / t.count())

# Affiliation ids the person is listed with on each pub (the pyspark functions module is `f`).
t = (
    df_publist.withColumn("auaf", f.explode("Au_Af"))
    # keep the Au_Af entry belonging to this person's author position
    .filter("Authorseq=auaf.Authorseq")
    # Spark SQL array subscripts are 0-based; the `-1` implies affiliation_seq is 1-based
    .withColumn("affiliation", f.expr("affiliation_ids[auaf.affiliation_seq-1]"))
    .withColumn("affiliation", f.explode("affiliation"))
)

# Scopus-only pubs sharing an affiliation with the person's DOI-list pubs
tScopus = (
    df_publist.filter("isAApub is null")
    .withColumn("affiliation", f.explode(f.flatten("affiliation_ids")))
    .join(
        t.filter("isAApub = 1").select(
            "PersonId_control",
            "affiliation",
        ),
        ["PersonId_control", "affiliation"],
    )
    .drop("Au_Af", "auaf", "affiliation", "affiliation_ids")
)

# DOI-list pubs plus the affiliation-confirmed Scopus pubs
df_publist = (
    df_publist.filter("isAApub = 1")
    .drop("Au_Af", "affiliation_ids")
    .unionByName(tScopus)
    .drop_duplicates()
)

In [None]:
# append Author gender by Scopus
#df_gender = spark.read.format("parquet").load(basePath + tablename_gender_inference)
# df_publist = df_publist.join(
#     df_gender.select("auid", f.col("Inferred_Probable_Gender").alias("scopus_gender")),
#     ["auid"],
#     "left",
# )

In [None]:
# Export the author–eid mapping to DBFS (pandas also writes its index as the first column).
df_export = df_publist.toPandas()
df_export.to_csv("/dbfs/path/hbcu/hbcu_au_eid_match.csv")

# citation

Calculate citation counts for all publications, rank each publication's citations against those of other publications from the same year and subfield, and convert the rank to a percentile.

In [None]:
# Citation links (citing -> cited) across the whole dataset, where the cited paper is an
# ar/cp/re published since 2006; then the same links with self-citations removed.

# Publication date of every paper, built from year + month.
df_ani_t = (
    df_ani
    .withColumn(
        "time",
        f.to_date(
            f.concat_ws(
                "",
                f.col("year"),
                # left-pad to two digits so "yyyyMM" also parses an unpadded month (e.g. 3);
                # no-op for values that are already two digits
                f.lpad(f.col("date_sort_month").cast("string"), 2, "0"),
            ),
            "yyyyMM",
        ),
    )
)

df_cite_ani = (
    df_ani_t.select(
        f.col("eid").alias("citing_eid"),
        f.col("time").alias("citing_time"),
        "references",
    )  # references holds the cited eids, e.g. [438974823, 437082432, 8943021, 30482]
    .withColumn("cited_eid", f.explode("references"))
    .drop("references")
    # cited side: only papers that fit the standard, with their publication date
    .join(
        df_ani_t
        .filter("publication_type in ('ar', 'cp', 're')")
        .filter("year >= 2006")
        .select(
            [
                "EID",
                f.col("time").alias("cited_time"),
            ]
        ).alias("t1"),
        f.col("t1.EID") == f.col("cited_eid"),
        "inner",
    )
    .drop("EID")
    # a paper cannot cite something published after it
    .filter((f.col("citing_time") >= f.col("cited_time")))
    .drop_duplicates()
)

# Exclude self-citations: a link is dropped when the citing and cited papers have the same
# first author, or the same last author.
# NOTE(review): first-vs-last author overlaps are not checked; confirm that is intended.
t1 = df_ani_au.filter("Authorseq = 1").select("eid", "auid").drop_duplicates()
tlast = (
    df_ani_au.filter("Authorseq = last_author_seq")
    .select("eid", "auid")
    .drop_duplicates()
)


def _differs_or_unknown(a, b):
    """Column that is true unless author ids `a` and `b` are both present and equal.

    A plain `a != b` is NULL when either side is missing after the left joins, and filter()
    drops NULL rows, which would silently discard those citations.
    """
    return f.col(a).isNull() | f.col(b).isNull() | (f.col(a) != f.col(b))


df_cite_ani_noself = (
    df_cite_ani
    # first authors of both papers
    .join(
        t1.select(f.col("eid").alias("cited_eid"), f.col("auid").alias("cited_1_auid")),
        ["cited_eid"],
        "left",
    )
    .join(
        t1.select(
            f.col("eid").alias("citing_eid"), f.col("auid").alias("citing_1_auid")
        ),
        ["citing_eid"],
        "left",
    )
    .filter(_differs_or_unknown("cited_1_auid", "citing_1_auid"))
    # last authors of both papers
    .join(
        tlast.select(
            f.col("eid").alias("cited_eid"), f.col("auid").alias("cited_last_auid")
        ),
        ["cited_eid"],
        "left",
    )
    .join(
        tlast.select(
            f.col("eid").alias("citing_eid"), f.col("auid").alias("citing_last_auid")
        ),
        ["citing_eid"],
        "left",
    )
    .filter(_differs_or_unknown("cited_last_auid", "citing_last_auid"))
    .select("citing_eid", "cited_eid", "citing_time", "cited_time")
)

# display(df_cite_ani_noself)

In [None]:
# SM subfield of every ANI paper (used for citation percentiles and as the ARC baseline):
# the paper-level hybrid subfield when available, otherwise the journal-level subfield.
df_smc_ani = spark.read.format("parquet").load(basePath + tablename_smc_complete)

# hybrid subfield first
df_ani_sm = df_ani.join(
    df_smc_ani.select("eid", "subfield_hybrid"), ["eid"], "left"
).select("eid", "subfield_hybrid")

# papers without a hybrid subfield fall back to the subfield of their source (srcid)
t_na = df_ani_sm.filter(f.col("subfield_hybrid").isNull())
# take(1) stops at the first row found; count() would scan the whole table only to test emptiness
if t_na.take(1):
    t2 = (
        t_na.join(df_ani.select("eid", "source.srcid"), ["eid"], "inner")
        .join(
            df_smc_ani.select("srcid", "subfield_journal").drop_duplicates(),
            ["srcid"],
            "inner",
        )
        .drop("subfield_hybrid")
        .withColumnRenamed("subfield_journal", "subfield_hybrid")
        .select("eid", "subfield_hybrid")
    )
    df_ani_sm = df_ani_sm.dropna().unionByName(t2).drop_duplicates()

In [None]:
# Per-paper citation counts received within 3 and 5 years of publication
# (self-citations excluded), as columns cite3 / cite5 keyed by eid.
df_cite_metric3, df_cite_metric5 = [
    df_cite_ani_noself
    .filter(f.col("citing_time") <= f.add_months(f.col("cited_time"), 12 * n_years))
    .groupby("cited_eid")
    .agg(f.countDistinct("citing_eid").alias(metric))
    .withColumnRenamed("cited_eid", "eid")
    for n_years, metric in ((3, "cite3"), (5, "cite5"))
]

# Rank every paper against the others of the same subfield and publication year.
w3 = Window.partitionBy("subfield_hybrid", "year").orderBy("cite3")
w5 = Window.partitionBy("subfield_hybrid", "year").orderBy("cite5")

# One row per eligible paper (ar/cp/re, 2006+, with a subfield); uncited papers get 0.
df_cite_metric = (
    df_ani_t
    .filter(f.col("publication_type").isin("ar", "cp", "re"))
    .filter(f.col("year") >= 2006)
    .join(df_ani_sm.select("eid", "subfield_hybrid"), on=["eid"], how="inner")
    .join(df_cite_metric3, on=["eid"], how="left")
    .join(df_cite_metric5, on=["eid"], how="left")
    .select("eid", "subfield_hybrid", "cite3", "cite5", "year")
    .fillna(0)
    .drop_duplicates()
    # percentile rank (0..1) of each paper within its subfield-year
    .withColumn("cite3_pct", f.percent_rank().over(w3))
    .withColumn("cite5_pct", f.percent_rank().over(w5))
)

# Attach the metrics to the author–eid mapping and export.
df_eid_citepct = (
    df_publist.select("eid", "year")
    .join(df_cite_metric, on=["eid", "year"], how="left")
    .drop_duplicates()
)

df_export = df_eid_citepct.toPandas()
df_export.to_csv("/dbfs/path/hbcu/hbcu_citepct_match.csv")

## Average relative citation

In [None]:
# Field baseline for ARC: mean number of citations received within 3 / 5 years by the papers
# of each (subfield, publication year).
# NOTE(review): the baseline is built from df_cite_ani (self-citations included) while the
# ARC numerators cite3/cite5 exclude self-citations; confirm this mix is intended.

# every citation link, tagged with the subfield of the cited paper
df_cited_field = df_cite_ani.join(
    df_ani_sm.select("eid", "subfield_hybrid"),
    f.col("eid") == f.col("cited_eid"),
    "inner",
)

# Total citations = number of distinct (citing, cited) links. Counting distinct citing_eid
# alone would count a paper that cites several papers of one subfield-year only once.
t3 = (
    df_cited_field.filter(f.col("citing_time") <= f.add_months(f.col("cited_time"), 36))
    .groupby("subfield_hybrid", f.year(f.col("cited_time")).alias("year"))
    .agg(f.countDistinct("citing_eid", "cited_eid").alias("sum_cite_3yr"))
)

t5 = (
    df_cited_field.filter(f.col("citing_time") <= f.add_months(f.col("cited_time"), 60))
    .groupby("subfield_hybrid", f.year(f.col("cited_time")).alias("year"))
    .agg(f.countDistinct("citing_eid", "cited_eid").alias("sum_cite_5yr"))
)

# Denominator: number of papers per subfield-year, including uncited ones. Restricted to the
# same document types and years as the cited side of df_cite_ani, so numerator and
# denominator describe the same population of papers.
denom = (
    df_ani.filter("publication_type in ('ar', 'cp', 're')")
    .filter("year >= 2006")
    .select("eid", "year")
    .join(df_ani_sm.select("eid", "subfield_hybrid"), ["eid"], "inner")
    .groupby("subfield_hybrid", "year")
    .agg(f.countDistinct("eid").alias("denom_eid_num"))
)

df_baseline_cite = (
    t3
    .join(t5, ["subfield_hybrid", "year"], "outer")
    .join(denom, ["subfield_hybrid", "year"], "left")
    .withColumn("mean_cite_3yr", f.col("sum_cite_3yr") / f.col("denom_eid_num"))
    .withColumn("mean_cite_5yr", f.col("sum_cite_5yr") / f.col("denom_eid_num"))
    .select(
        "subfield_hybrid", "year", "mean_cite_3yr", "mean_cite_5yr"
    )
)

# display(df_baseline_cite)

In [None]:
# Journal prestige: SJR / SNIP per source (srcid) and year, active sources only, 2006+.
df_sources = spark.read.format("parquet").load(basePath + tablename_sources)
df_sources = (
    df_sources.filter("isActive == 'true'")
    .withColumn("metrics_calculations", f.explode("metrics_calculations"))
    .select(
        [
            "srcid",
            "metrics_calculations.year",
            "metrics_calculations.SJR",
            "metrics_calculations.SNIP",
        ]
    )
    .filter("year >=2006")
)

# Paper-level citation indicators for every (eid, person) row of df_publist.
df_cite_count = (
    df_publist
    # df_publist already has `year`, and df_cite_metric carries `year` and `subfield_hybrid`
    # per eid. Joining on eid alone (and re-adding the ANI year) leaves duplicate `year`
    # columns, which makes the later joins on "year" ambiguous.
    .join(df_cite_metric, ["eid", "year"], "left")
    # journal id for the SJR/SNIP lookup
    .join(df_ani.select("eid", "source.srcid"), ["eid"], "left")
    .join(df_baseline_cite, ["subfield_hybrid", "year"], "left")
    # average relative citation = own citations / subfield-year mean citations
    .withColumn("arc_3yr_noself", f.col("cite3") / f.col("mean_cite_3yr"))
    .withColumn("arc_5yr_noself", f.col("cite5") / f.col("mean_cite_5yr"))
    # Journal prestige (SJR, SNIP) in the publication year
    .join(df_sources.select("srcid", "year", "SJR", "SNIP"), ["srcid", "year"], "left")
    .drop("mean_cite_3yr", "mean_cite_5yr")
    .drop_duplicates()
)
# display(df_cite_count)

In [None]:
# Paper descriptors: publication date (year/month), document type, number of authors,
# number of references.
df_title = df_ani_t.select(
    f.col("eid"),
    f.col("time"),
    f.col("publication_type"),
    f.size(f.col("Au")).alias("n_au"),
    f.col("n_references"),
)

# display(df_title)

In [None]:
# Prior productivity of each pub's first and last authors: for each of them, the number of
# distinct ar/cp/re papers they published in strictly earlier years, averaged per pub.
# Authors without any earlier paper drop out of the (inner) join.
df_past_prod = (
    df_publist.select("eid")
    .join(
        df_ani_au
        .filter((f.col("Authorseq") == 1) | (f.col("Authorseq") == f.col("last_author_seq")))
        .select("eid", "auid", "year"),
        on=["eid"],
    )
    .join(
        df_ani_au
        .filter(f.col("publication_type").isin("ar", "cp", "re"))
        .select(
            f.col("eid").alias("past_eid"),
            f.col("auid").alias("auid2"),
            f.col("year").alias("past_year"),
        ),
        on=(f.col("auid") == f.col("auid2")) & (f.col("year") > f.col("past_year")),
    )
    .groupBy("eid", "auid")
    .agg(f.countDistinct("past_eid").alias("num_past_eid"))
    .groupBy("eid")
    .agg(f.avg("num_past_eid").alias("avgnum_past_eid"))
)


In [None]:
# eid-level table: one row per (eid, PersonId_control) with citation metrics, prior
# productivity of first/last authors, and paper descriptors.
df_eid_basic = (
    df_publist.select("eid", "PersonId_control")
    # df_cite_count is also keyed per person; joining on eid alone would pair every person on a
    # co-authored eid with every other person's row and duplicate PersonId_control
    .join(df_cite_count, ["eid", "PersonId_control"], "outer")
    .join(df_past_prod, ["eid"], "left")
    # publication_type already comes from df_publist; drop df_title's copy to avoid a
    # duplicated column in the export
    .join(df_title.drop("publication_type"), ["eid"], "left")
    .drop_duplicates()
)

# export 2
df_export = df_eid_basic.toPandas()
df_export.to_csv('/dbfs/path/hbcu/hbcu_eid_match.csv')

# Disruption metric (CD index)
https://link.springer.com/article/10.1007/s11192-023-04644-2

In [None]:
# Attach pre-computed CD5 disruption scores (one file per publication year) to the pubs of the
# author–eid mapping. A year whose file cannot be read or lacks the expected columns is
# reported and skipped.
df_eid = pd.read_csv(
    r"/dbfs/path/hbcu/hbcu_au_eid_match.csv"
)
# the mapping repeats an eid once per matched person; the lookup needs each eid only once
eid_keys = df_eid[["eid"]].drop_duplicates()
output = []

for year in range(2006, 2021):
    try:
        df = pd.read_csv(r"/dbfs/path/tenure_cd_{}.csv".format(year))
        output.append(eid_keys.merge(df[["eid", "cd5", "cd5_pct"]], on="eid", how="inner"))
        print(year, len(df))
    except Exception as err:
        # still best-effort per year, but report why it failed and let KeyboardInterrupt through
        print(year, "wrong", repr(err))

# pd.concat([]) raises, so write an empty table when no year could be read
df_output = (
    pd.concat(output).drop_duplicates()
    if output
    else pd.DataFrame(columns=["eid", "cd5", "cd5_pct"])
)
df_output.to_csv(r"/dbfs/path/hbcu/hbcu_cd_pct_match.csv")

2006 1280137
2007 1444434
2008 1601573
2009 1760327
2010 1880480
2011 2034645
2012 2146010
2013 2257428
2014 2349333
2015 2386661
2016 2494264
2017 2583831
2018 2749014
2019 2997639
2020 3154400


# Novelty/conventionality
References: https://doi.org/10.1073/pnas.2118046119; https://www.science.org/doi/10.1126/science.1240474
https://www.sciencedirect.com/science/article/pii/S0048733314001826

In [None]:
# Attach pre-computed novelty scores (one file per publication year) to the pubs of the
# author–eid mapping. A year whose file cannot be read or lacks the expected columns is
# reported and skipped.
df_eid = pd.read_csv(
    r"/dbfs/path/hbcu/hbcu_au_eid_match.csv"
)
# the mapping repeats an eid once per matched person; the lookup needs each eid only once
eid_keys = df_eid[["eid"]].drop_duplicates()
output = []

for year in range(2006, 2021):
    try:
        df = pd.read_csv(r"/dbfs/path/tenure_novel_pct_{}.csv".format(year))
        output.append(eid_keys.merge(df[["eid", "novelty", "novelty_pct"]], on="eid", how="inner"))
        print(year, len(df))
    except Exception as err:
        # still best-effort per year, but report why it failed and let KeyboardInterrupt through
        print(year, "wrong", repr(err))

# pd.concat([]) raises, so write an empty table when no year could be read
df_output = (
    pd.concat(output).drop_duplicates()
    if output
    else pd.DataFrame(columns=["eid", "novelty", "novelty_pct"])
)
df_output.to_csv(r"/dbfs/path/hbcu/hbcu_novel_match.csv")

2006 1383389
2007 1468531
2008 1554054
2009 1704200
2010 1801035
2011 1940442
2012 2035027
2013 2132071
2014 2262712
2015 2303219
2016 2337913
2017 2421731
2018 2574154
2019 2801095
2020 2937820


# Collaborator

In [None]:
# control version
df_ipr = spark.read.format("parquet").load(basePath + tablename_ipr)

# Affiliations (name, state, country looked up in df_ipr) listed on each multi-author pub
# of df_publist.
df_collab_af = (
    df_publist
    # remove single-author papers: those have last_author_seq == 1. Filtering on the person's
    # own Authorseq instead would also drop multi-author papers where they are first author.
    .filter("last_author_seq > 1")
    .select("eid")
    .join(df_ani.select("eid", "Af.afid"), ["eid"], "inner")
    .withColumn("afid", f.explode("afid"))
    .join(
        df_ipr.select(
            "afid", f.col("preferred_name").alias("afname"), "state", "country"
        ),
        ["afid"],
        "left",
    )
    .drop_duplicates()
)

df_collab_af.toPandas().to_csv('/dbfs/path/hbcu/hbcu_collab_af_match.csv')

In [None]:
# Per pub: number of distinct affiliation ids and of distinct countries listed on it
# (a value of 1 means a single-affiliation / single-country paper).
df_afnum = (
    df_collab_af
    .groupBy("eid")
    .agg(
        f.countDistinct(f.col("afid")).alias("af_num"),
        f.countDistinct(f.col("country")).alias("ctry_num"),
    )
)

# display(df_afnum.groupby('af_num').agg(f.countDistinct('eid')))
df_afnum.toPandas().to_csv("/dbfs/path/hbcu/hbcu_collab_afnum.csv")

In [None]:
# Collaborator seniority on each pub: for every co-author other than the matched author,
# seniority = pub year - year of the co-author's first ANI publication + 1.

df_senior = (
    df_publist
    .select("eid", 'year', f.col('auid').alias('self_auid'))
    # all authors of the pub, with their author position
    .join(df_ani.withColumn("Au", f.explode("Au")).select("eid", "Au.auid", 'Au.Authorseq'), ["eid"], "inner")
    .filter('self_auid != auid')
    # earliest publication year of each author id (any document type), aggregated once per
    # auid instead of joining every co-author row to all of that author's publications
    .join(
        df_ani.select(f.explode("Au.auid").alias("auid"), "year")
        .groupBy("auid")
        .agg(f.min("year").alias("first_year")),
        ["auid"],
        "inner",
    )
    # one row per (pub, co-author, position); collapses duplicates when several df_publist
    # rows share the same eid
    .groupBy("eid", 'auid', 'year', 'Authorseq').agg(f.min('first_year').alias('min_year'))
    .withColumn('seniority', f.col('year') - f.col('min_year') + 1)
)

df_senior.toPandas().to_csv('/dbfs/path/hbcu/hbcu_collab_senior.csv')
#display(df_senior)

In [None]:
# Number of new collaborators who did not appear in the past five years.
# Input: every ar/cp/re pub (any year) of the matched Scopus author ids.
df_publist_alltime = (
    df_ani_au
    .where(f.col("publication_type").isin("ar", "cp", "re"))
    .join(df_auid, on="auid")
    .select("eid", "auid", "year")
)

In [None]:
# Fresh collaborators: for each (person, pub), how many co-authors did NOT appear on any of the
# person's pubs from the previous 1-5 years, and what share of all co-authors they are.
# NOTE(review): this reads a tenure-project file (tenure_au_eid_1106.csv, keyed by `PersonId`)
# instead of this notebook's mapping (hbcu_au_eid_match.csv, keyed by `PersonId_control`);
# confirm this is the intended input.
df_au_eid = pd.read_csv(
    r"/dbfs/path/tenure_au_eid_1106.csv"
)
df_au_eid = df_au_eid[["auid", "eid", "PersonId", "year"]]
df_au_eid = spark.createDataFrame(df_au_eid)

# merge
# Past pubs of the same person (via PersonId) published 1-5 years before the current pub.
df_compare1 = df_au_eid.join(
    df_au_eid.select(
        f.col("PersonId"),
        f.col("eid").alias("past_eid"),
        f.col("year").alias("past_year"),
    ),
    ["PersonId"],
).filter("year - past_year <= 5 and year - past_year > 0 and eid != past_eid")
# Past pubs of the same Scopus author id, taken from df_publist_alltime (same 1-5 year window).
df_compare2 = df_au_eid.join(
    df_publist_alltime.select(
        f.col("auid"), f.col("eid").alias("past_eid"), f.col("year").alias("past_year")
    ),
    ["auid"],
).filter("year - past_year <= 5 and year - past_year > 0 and eid != past_eid")

# Pubs with no past pub in the window are absent from here on (the joins above are inner).
df_compare = (
    df_compare1.unionByName(df_compare2)
    .withColumnRenamed("auid", 'self_auid')
    # get all authors
    # auid = authors of the current pub
    .join(df_ani_au.select(f.col("auid"), f.col("eid")), ["eid"])
    # past_auid = authors of the past pub
    .join(
        df_ani_au.select(
            f.col("auid").alias("past_auid"), f.col("eid").alias("past_eid")
        ),
        ["past_eid"],
    )
    # remove self
    .filter("auid != self_auid and past_auid != self_auid")
    .drop("self_auid")
    .drop_duplicates()
    # is_same = 1 when the current co-author is also an author of the past pub
    .withColumn("is_same", f.when(f.col("auid") == f.col("past_auid"), 1).otherwise(0))
    # aggregate
    # past_freq counts the past-window pubs on which the co-author also appears (0 = new)
    .groupby("PersonId", 'eid', "auid")
    .agg(f.sum('is_same').alias('past_freq'))
)

# past freq 0 authors
df_0_past_freq = df_compare.filter("past_freq == 0").groupby("PersonId", 'eid').agg(f.countDistinct('auid').alias('fresh_collab_num'))
# all collaborator
df_all = df_compare.groupby("PersonId", 'eid').agg(f.countDistinct('auid').alias('collab_num'))
# fresh_perc = share of a pub's co-authors that are new (pubs with none get fresh_collab_num 0)
df_fresh_collab = df_all.join(df_0_past_freq, ["PersonId", 'eid'], 'left').fillna(0).withColumn('fresh_perc', f.col('fresh_collab_num') / f.col('collab_num'))

df_fresh_collab.toPandas().to_csv(f'/dbfs/path/hbcu/hbcu_collab_fresh.csv')
