In [1]:
import pandas as pd, numpy as np
from matplotlib import pyplot as plt
pd.set_option('display.max_rows', 200)
pd.set_option('display.max_columns', 100)

import json, pickle as pkl
from unidecode import unidecode
from glob import glob
from collections import Counter, defaultdict
from itertools import product, combinations
from copy import deepcopy

from functools import partial
from tqdm import tqdm, trange
# Register the pandas integration (adds .progress_map / .progress_apply) first:
# this must run while `tqdm` is still the original class, because the partial
# objects created on the next line have no `.pandas` attribute.
tqdm.pandas(ncols=100, mininterval=1)
# Rebind tqdm/trange so every later progress bar uses the same width and refresh interval.
tqdm, trange = partial(tqdm, ncols=100, mininterval=1), partial(trange, ncols=100, mininterval=1)

# Spark settings as ordered (key, value) pairs, grouped by concern and
# concatenated into the single list handed to SparkConf.

# Application, local-mode resources and driver behaviour.
_core_settings = [
    ('spark.app.name', 'Shaoerzhuo@TeamScience'),
    ('spark.local.dir', '/tmp/spark'),
    ('spark.rdd.compress', 'true'),
    ('spark.driver.memory', '1024g'),
    ('spark.driver.maxResultSize', '200g'),
    ('spark.serializer.objectStreamReset', '100'),
    ('spark.master', 'local[48]'),
    ('spark.submit.deployMode', 'client'),
    ('spark.ui.showConsoleProgress', 'false'),
]

# JVM flag passed to both driver and executors.
# NOTE(review): presumably needed for newer JDKs where the security manager is off by default - confirm.
_jvm_flag = '-Djava.security.manager=allow'
_jvm_settings = [
    ('spark.driver.extraJavaOptions', _jvm_flag),
    ('spark.executor.extraJavaOptions', _jvm_flag),
]

# Adaptive query execution switches; the skew threshold is 256 MiB expressed in bytes.
_skew_threshold_bytes = 256 * 1024 * 1024
_adaptive_settings = [
    ('spark.sql.adaptive.enabled', 'true'),
    ('spark.sql.adaptive.coalescePartitions.enabled', 'true'),
    ('spark.sql.adaptive.localShuffleReader.enabled', 'true'),
    ('spark.sql.adaptive.skewJoin.enabled', 'true'),
    ('spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes', str(_skew_threshold_bytes)),
]

conf_list = _core_settings + _jvm_settings + _adaptive_settings

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, explode_outer, udf, size, lower
from pyspark.sql import functions as F
from pyspark.sql import types as T
if 'sc' not in locals():
    # No SparkContext bound in this namespace yet: build one from conf_list
    # and attach a SparkSession that uses the same configuration.
    conf = SparkConf()
    conf.setAll(conf_list)
    sc = SparkContext(conf=conf)
    sc.setLogLevel('ERROR')
    session_builder = SparkSession.builder.config(conf=conf)
    spark = session_builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/08 04:38:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/08 04:38:54 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in standalone/kubernetes and LOCAL_DIRS in YARN).


In [None]:
@udf(returnType=T.LongType())
def paper_id_to_int(paper_id):
    """Spark UDF: drop the first four characters of an id string and parse the rest as an integer.

    The return type is declared as LongType so the column is numeric straight
    out of the UDF (a bare ``@udf`` defaults to a string column). Existing
    ``.cast(T.LongType())`` calls on the result remain valid and become no-ops.

    Parameters
    ----------
    paper_id : str or None
        Identifier with a four-character prefix (e.g. ``"pub."``) followed by digits.

    Returns
    -------
    int or None
        The numeric part of the id; ``None`` (SQL NULL) when the input is null,
        instead of failing the Spark task with a TypeError.
    """
    if paper_id is None:
        return None
    return int(paper_id[4:])

# Load the citing/cited pairs with their publication years.
citation_by_time = spark.read.parquet("../parquet/processed/pub_citing_cited_years.parquet")

# Id columns go through the paper_id_to_int UDF, year columns are cast directly;
# all four end up as LongType and keep their original names and order.
id_columns = [
    paper_id_to_int(name).cast(T.LongType()).alias(name)
    for name in ("citing_paperid", "cited_paperid")
]
year_columns = [
    F.col(name).cast(T.LongType()).alias(name)
    for name in ("citing_year", "cited_year")
]
citation_by_time = citation_by_time.select(*id_columns, *year_columns)

In [None]:
# Build the three lookup tables with Spark DataFrame operations and write them
# to parquet instead of collecting them into Python dicts.
# The "_sample" name is kept as a hook for testing on a subset; no row limit is
# applied here, so the full citation table is processed.
citation_by_time_sample = citation_by_time

# cited paper -> list of (citing paper, citing year) structs
citing_entry = F.struct("citing_paperid", "citing_year")
dict_cited_to_citing = (
    citation_by_time_sample
    .groupBy("cited_paperid")
    .agg(F.collect_list(citing_entry).alias("citing_list"))
)

# citing paper -> list of (cited paper, cited year) structs
cited_entry = F.struct("cited_paperid", "cited_year")
dict_citing_to_cited = (
    citation_by_time_sample
    .groupBy("citing_paperid")
    .agg(F.collect_list(cited_entry).alias("cited_list"))
)

# paper -> year, taken from both sides of every citation and de-duplicated
cited_side = citation_by_time_sample.select(
    col("cited_paperid").alias("paperid"),
    col("cited_year").alias("year"),
)
citing_side = citation_by_time_sample.select(
    col("citing_paperid").alias("paperid"),
    col("citing_year").alias("year"),
)
dict_paper_id_to_year = cited_side.union(citing_side).distinct()

# Persist each table, replacing any previous output at the same path.
dict_cited_to_citing.write.mode("overwrite").parquet("intermediate/dict_cited_to_citing_id_and_year.parquet")
dict_citing_to_cited.write.mode("overwrite").parquet("intermediate/dict_citing_to_cited_id_and_year.parquet")
dict_paper_id_to_year.write.mode("overwrite").parquet("intermediate/dict_paper_id_to_year.parquet")

In [None]:
l = glob('intermediate/dict_paper_id_to_year.parquet/*.parquet')
%time dict_paper_id_to_year = pd.concat([pd.read_parquet(f) for f in tqdm(l)])
%time dict_paper_id_to_year.to_feather('data/dict_paper_id_to_year.feather')

In [None]:
l = glob('intermediate/dict_citing_to_cited_id_and_year.parquet/*.parquet')
%time dict_citing_to_cited_id_and_year = pd.concat([pd.read_parquet(f) for f in tqdm(l)])
dict_citing_to_cited_id_and_year["cited_list"] = dict_citing_to_cited_id_and_year["cited_list"].progress_map(
    lambda x: [(i["cited_paperid"], i["cited_year"]) for i in x])
%time dict_citing_to_cited_id_and_year.to_feather('data/dict_citing_to_cited_id_and_year.feather')

In [None]:
l = glob('intermediate/dict_cited_to_citing_id_and_year.parquet/*.parquet')
%time dict_cited_to_citing_id_and_year = pd.concat([pd.read_parquet(f) for f in tqdm(l)])
dict_cited_to_citing_id_and_year["citing_list"] = dict_cited_to_citing_id_and_year["citing_list"].progress_map(
    lambda x: [(i["citing_paperid"], i["citing_year"]) for i in x])
%time dict_cited_to_citing_id_and_year.to_feather('data/dict_cited_to_citing_id_and_year.feather')

In [2]:
%%time

# Read the whole publications table into a pandas DataFrame (all columns).
# This is the slow step of the notebook: the recorded run took about 5.5 minutes wall time.
papers = pd.read_parquet('../parquet/processed/publications.parquet')

CPU times: user 6min 56s, sys: 1min 19s, total: 8min 15s
Wall time: 5min 25s


In [None]:
%time papers_2 = papers.dropna(subset=["year", "id"])
%time papers_2["year"] = papers_2["year"].astype(int)
%time papers_2["paper_id"] = papers_2["id"].progress_map(lambda x: int(x[4:]))
%time dict_year_to_paper_ids = papers_2.groupby("year")["paper_id"].apply(list).reset_index()

dict_year_to_paper_ids.head()

CPU times: user 26.6 s, sys: 10.6 s, total: 37.2 s
Wall time: 37 s


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


CPU times: user 421 ms, sys: 295 ms, total: 716 ms
Wall time: 716 ms


100%|████████████████████████████████████████████| 152327403/152327403 [02:06<00:00, 1199711.78it/s]
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


CPU times: user 2min 2s, sys: 5.22 s, total: 2min 7s
Wall time: 2min 7s
CPU times: user 13.5 s, sys: 2.38 s, total: 15.9 s
Wall time: 15.9 s


Unnamed: 0,year,paper_id
0,1665,"[1051816633, 1001261415, 1008254293, 102786731..."
1,1666,"[1046462870, 1034167481, 1011608840, 103042842..."
2,1667,"[1086930131, 1087186495, 1099352398, 109950846..."
3,1668,"[1040983623, 1038172664, 1035491401, 108718649..."
4,1669,"[1044301911, 1016722741, 1086858230, 102238151..."


In [23]:
dict_year_to_paper_ids.sort_values("year", inplace=True)
%time dict_year_to_paper_ids.to_feather('intermediate/dict_year_to_paper_ids.feather')

CPU times: user 11.8 s, sys: 570 ms, total: 12.4 s
Wall time: 12.3 s


In [None]:
%%time
papers_2 = papers.dropna(subset=["id", "journal_id"]).reset_index(drop=True)
papers_2["paper_id"] = papers_2["id"].str.replace("pub.", "").astype(int)
papers_2["journal_id"] = papers_2["journal_id"].str.replace("jour.", "").astype(int)
dict_paper_to_journal_id = papers_2[["paper_id", "journal_id"]]
dict_paper_to_journal_id.columns = ["paper_id", "journal_id"]
dict_paper_to_journal_id.to_feather('intermediate/dict_paper_to_journal_id.feather')

CPU times: user 44.7 s, sys: 12 s, total: 56.7 s
Wall time: 56.4 s
CPU times: user 28.8 s, sys: 4 s, total: 32.8 s
Wall time: 32.8 s
CPU times: user 28.2 s, sys: 2.68 s, total: 30.9 s
Wall time: 30.9 s
CPU times: user 583 ms, sys: 211 ms, total: 794 ms
Wall time: 793 ms
CPU times: user 17.5 s, sys: 1.29 s, total: 18.8 s
Wall time: 10.6 s
