# SparkX Analysis

This notebook performs a full-scale analysis of the ArXiv citation network on a distributed Apache Spark cluster.

Unlike the initial version of this analysis, this workflow is optimized for Big Data: it reads raw data directly from HDFS, processes citations using distributed functions, and persists intermediate results in Parquet format to overcome memory issues. The analysis utilizes GraphFrames to build the graph and execute complex algorithms at scale.

In [None]:
# !pip install pyspark==3.5.0
# # note: use Python 3.10 or 3.11; this setup does not work with other versions

# !pip install pandas
# !pip install numpy
# !pip install matplotlib
# !pip install seaborn
# !pip install tqdm
# !pip install graphframes

### Import libraries

In [1]:
import json
import re
import pandas as pd
import numpy as np
from collections import Counter, defaultdict
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns

# pyspark configuration
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col, explode, split, size, array, lit
import pyspark.sql.functions as F

# GraphX has no Python API, so we use GraphFrames for the graph algorithms
from graphframes import GraphFrame

### Spark initialization

In [None]:
# create spark session with optimized configuration
try:
    spark.stop()
    print("Previous session closed")
except NameError:
    # `spark` is not defined yet, so there is nothing to stop
    print("No previous session")


spark = SparkSession.builder \
    .appName("Analisis Arxiv - Driver Potente") \
    .master("spark://master:7077") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "1") \
    .config("spark.cores.max", "4") \
    .config("spark.sql.shuffle.partitions", "50") \
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12") \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("WARN")

print(f"Spark session created: {spark.version}")
print("Spark context initialized")

### Citation extraction functions

In this cell we define the main functions that transform the raw JSON data into a network structure. There are two of them: `extract_arxiv_ids_from_text`, a helper function that uses regular expressions to find ArXiv ids embedded in free text such as abstracts, and `build_citation_graph`, which reads the file directly into Spark DataFrames, extracts the paper details to create the **nodes**, and scans the text for references to create the **edges** (citations). The final results are saved directly to HDFS to ensure scalability.
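As a quick illustration of the pattern matching described above, here is a minimal standalone sketch. The name `find_arxiv_ids` and the sample strings are hypothetical; the two patterns mirror the ones used in the notebook's helper, applied case-insensitively. The examples also show two limitations worth keeping in mind: any bare number shaped like `1234.5678` is treated as an id, and old-style ids such as `hep-th/9901001` are not detected.

```python
import re

# Hypothetical stand-in for the notebook's helper, using the same regex ideas.
ARXIV_PATTERNS = [
    r'arxiv:(\d{4}\.\d{4,5})(v\d+)?',          # prefixed id, optional version
    r'(?<![.\d])(\d{4}\.\d{4,5})(?![.\d])',    # bare new-style id
]

def find_arxiv_ids(text):
    """Return the sorted, de-duplicated arXiv ids found in `text`."""
    if not text:
        return []
    ids = set()
    for pattern in ARXIV_PATTERNS:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            ids.add(match.group(1))  # group 1 is the id without the version suffix
    return sorted(ids)

sample = "We extend arXiv:0804.0079v2 and 0704.0001; see also ARXIV:1201.12345 for details."
print(find_arxiv_ids(sample))                      # three ids, version suffix dropped
print(find_arxiv_ids("measured at 1234.5678 Hz"))  # false positive: a plain number
print(find_arxiv_ids("as in hep-th/9901001"))      # old-style id: nothing found
```

Because the bare-number pattern can produce false positives, the later filtering step that keeps only edges pointing to known paper ids acts as a useful safeguard.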

In [18]:
from pyspark.sql.types import ArrayType, StringType, StructType, StructField
from pyspark.sql.functions import udf, col, explode, struct

def extract_arxiv_ids_from_text(text):
    """
    extracts arxiv ids from text using regex
    common formats: arXiv:1234.5678, arXiv:1234.5678v1, 1234.5678
    """
    if not text:
        return []

    import re

    # patterns to detect arxiv ids (matched case-insensitively below,
    # so a single prefixed pattern covers both "arXiv:" and "arxiv:")
    patterns = [
        r'arxiv:(\d{4}\.\d{4,5})(v\d+)?',  # arXiv:1234.5678, arXiv:1234.5678v1
        r'(?<![.\d])(\d{4}\.\d{4,5})(?![.\d])',  # bare 1234.5678
    ]
    
    ids = set()
    for pattern in patterns:
        matches = re.finditer(pattern, str(text), re.IGNORECASE)
        for match in matches:
            arxiv_id = match.group(1)
            ids.add(arxiv_id)
    
    return list(ids)


def build_citation_graph(spark, data_path, output_path, sample_size=None):
    """
    builds the citation graph from the arxiv json file using distributed processing.
    saves results directly to HDFS without collecting to driver.
    
    arguments:
        spark: active spark session
        data_path: path to the arxiv metadata json file (HDFS)
        output_path: base path to save results in HDFS
        sample_size: if specified only processes the first N papers
    
    returns:
        nodes_count: number of nodes processed
        edges_count: number of edges found
    """
    print(f"\nReading papers from {data_path}...")
    
    # read JSON directly with Spark (distributed)
    df = spark.read.json(data_path)
    
    if sample_size:
        df = df.limit(sample_size)
    
    total_papers = df.count()
    print(f"Papers to process: {total_papers:,}")
    
    # extract node information (distributed operation)
    print("\nExtracting node information...")
    nodes_spark = df.select(
        col("id"),
        col("title"),
        col("categories"),
        col("authors"),
        col("abstract"),
        F.when(
            F.size(col("versions")) > 0,
            F.substring(col("versions").getItem(0).getField("created"), 1, 4)
        ).otherwise("unknown").alias("year")
    )
    
    # register UDF for citation extraction (runs in parallel on executors)
    extract_citations_udf = udf(extract_arxiv_ids_from_text, ArrayType(StringType()))
    
    # extract citations 
    print("Extracting citations (distributed)...")
    df_with_citations = df.select(
        col("id").alias("src"),
        col("abstract"),
        col("comments")
    ).withColumn(
        "combined_text",
        F.concat_ws(" ", F.coalesce(col("abstract"), F.lit("")), F.coalesce(col("comments"), F.lit("")))
    ).withColumn(
        "cited_ids",
        extract_citations_udf(col("combined_text"))
    )
    
    # create edges by exploding cited_ids
    print("Creating edges (distributed)...")
    edges_spark = df_with_citations.select(
        col("src"),
        explode(col("cited_ids")).alias("dst")
    ).filter(
        col("src") != col("dst")  # avoid self-citations
    ).withColumn("citing_paper", col("src")) \
     .withColumn("cited_paper", col("dst")) \
     .distinct()
    
    # cache for counting
    edges_spark.cache()
    nodes_spark.cache()
    
    # count results
    num_edges = edges_spark.count()
    num_nodes = nodes_spark.count()
    
    print(f"\nPapers processed: {num_nodes:,}")
    print(f"Citations found: {num_edges:,}")
    
    # save to hdfs to avoid issues with the cluster memory
    print(f"\nSaving results to HDFS at {output_path}...")
    
    nodes_spark.write.mode("overwrite").parquet(f"{output_path}/nodes")
    print(f"  - Nodes saved to {output_path}/nodes")
    
    edges_spark.write.mode("overwrite").parquet(f"{output_path}/edges")
    print(f"  - Edges saved to {output_path}/edges")
    
    print("\nResults saved successfully in HDFS")
    
    return num_nodes, num_edges



In this cell we execute the data extraction on the full dataset. It calls the previously defined functions to read the json metadata, identify citation links within the text, and save the results to HDFS.

Also, it prints basic statistics to provide an overview of the network's volume and connectivity.

In [19]:
# the json file must be uploaded to hdfs before executing this cell
# to upload it, run the following in your cluster terminal:
# 1. hdfs dfs -mkdir -p /data # create hdfs directory
# 2. hdfs dfs -put arxiv-metadata-oai-snapshot.json /data/ # upload the document (from wherever you have it)
# 3. hdfs dfs -ls -h /data/ # to verify it uploaded correctly

print("\n" + "="*70)
print("STEP 1: REFERENCE AND CITATION EXTRACTION")
print("="*70)

SAMPLE_SIZE = None  # process all dataset

num_nodes, num_edges = build_citation_graph(
    spark, 
    'hdfs:///data/arxiv-metadata-oai-snapshot.json',
    'hdfs:///output/citation_graph',  # output path in HDFS
    SAMPLE_SIZE
)

print("\nBasic statistics:")
print(f"  - Nodes (articles): {num_nodes:,}")
print(f"  - Edges (citations): {num_edges:,}")
print(f"  - Average density: {num_edges/num_nodes:.2f} citations/article")


STEP 1: REFERENCE AND CITATION EXTRACTION

Reading papers from hdfs:///data/arxiv-metadata-oai-snapshot.json...

Papers to process: 973,085

Extracting node information...
Extracting citations (distributed)...
Creating edges (distributed)...

26/01/11 16:45:14 WARN CacheManager: Asked to cache already cached data.

Papers processed: 973,085
Citations found: 40,358

Saving results to HDFS at hdfs:///output/citation_graph...

  - Nodes saved to hdfs:///output/citation_graph/nodes
  - Edges saved to hdfs:///output/citation_graph/edges

Results saved successfully in HDFS

Basic statistics:
  - Nodes (articles): 973,085
  - Edges (citations): 40,358
  - Average density: 0.04 citations/article




The output shows very low connectivity: an average of 0.04 citations per article. This is expected, because the extraction is limited to abstracts and comments rather than full bibliographies. Consequently, the resulting graph will probably consist mostly of isolated nodes and small clusters rather than a dense network.
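The figure reported above can be reproduced with simple arithmetic. The sketch below uses the node and edge counts printed by the run, and also computes the textbook density of a directed graph (edges divided by the number of possible ordered pairs), which is a different and much smaller quantity than the citations-per-article ratio. The variable names are chosen for illustration only.

```python
num_nodes = 973_085   # papers reported by the extraction step
num_edges = 40_358    # citations reported by the extraction step

# Value printed by the notebook: edges per node, i.e. the mean out-degree.
citations_per_article = num_edges / num_nodes

# Textbook density of a directed graph without self-loops:
# edges present divided by the n * (n - 1) possible ordered pairs.
graph_density = num_edges / (num_nodes * (num_nodes - 1))

print(f"{citations_per_article:.2f} citations/article")
print(f"graph density = {graph_density:.2e}")
```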

### Graph cleaning and filtering

In this section, we clean the data to keep only valid connections. We filter out citations that point to papers missing from our sample dataset and remove the papers that do not have any links.
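The two filtering rules described above can be sketched on a toy example with plain Python sets. The ids below are made up for illustration; in the notebook the same logic is expressed with distributed joins on Spark DataFrames.

```python
# Toy data: three known papers and three extracted citations (made-up ids).
nodes = {"0801.0001", "0801.0002", "0801.0003"}
edges = [
    ("0801.0001", "0801.0002"),   # both endpoints known: kept
    ("0801.0001", "9999.0001"),   # cited paper missing from the dataset: dropped
    ("9999.0002", "0801.0002"),   # citing paper missing from the dataset: dropped
]

# Rule 1: keep only edges whose source and destination both exist.
valid_edges = [(src, dst) for src, dst in edges if src in nodes and dst in nodes]

# Rule 2: keep only papers that appear in at least one valid edge.
connected = {paper for edge in valid_edges for paper in edge}
isolated = nodes - connected

print(valid_edges)
print(sorted(connected))
print(sorted(isolated))
```

In Spark, a `left_semi` join would be one alternative way to express the first rule without the helper columns, although the inner joins used in the notebook give the same result when ids are unique.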

In [21]:
print("\n" + "="*70)
print("STEP 2: GRAPH CLEANING AND FILTERING")
print("="*70)

# load from hdfs
print("\nLoading data from HDFS...")
nodes_spark = spark.read.parquet("hdfs:///output/citation_graph/nodes")
edges_spark = spark.read.parquet("hdfs:///output/citation_graph/edges")

initial_nodes = nodes_spark.count()
initial_edges = edges_spark.count()

print(f"Loaded {initial_nodes:,} nodes and {initial_edges:,} edges")

# filter edges where both papers exist in our dataset 
print("\nFiltering edges where both papers exist...")
valid_ids = nodes_spark.select("id")

edges_filtered = edges_spark.join(
    valid_ids.withColumnRenamed("id", "src_check"),
    edges_spark.src == col("src_check"),
    "inner"
).drop("src_check").join(
    valid_ids.withColumnRenamed("id", "dst_check"),
    edges_spark.dst == col("dst_check"),
    "inner"
).drop("dst_check")

num_edges_filtered = edges_filtered.count()

print(f"\nEdges after filtering: {num_edges_filtered:,}")
print(f"Reduction: {(1 - num_edges_filtered/initial_edges)*100:.1f}%")

# identify papers with at least one citation 
print("\nIdentifying connected papers...")
connected_src = edges_filtered.select(col("src").alias("id"))
connected_dst = edges_filtered.select(col("dst").alias("id"))
connected_ids = connected_src.union(connected_dst).distinct()

nodes_filtered = nodes_spark.join(connected_ids, "id", "inner")

num_nodes_filtered = nodes_filtered.count()

print(f"\nConnected nodes: {num_nodes_filtered:,}")
print(f"Main component coverage: {num_nodes_filtered/initial_nodes*100:.1f}% of total")

# cache filtered results for next steps
edges_filtered.cache()
nodes_filtered.cache()

# save processed data to HDFS
print("\nSaving filtered data to HDFS...")
nodes_filtered.write.mode("overwrite").parquet("hdfs:///output/citation_graph/nodes_filtered")
edges_filtered.write.mode("overwrite").parquet("hdfs:///output/citation_graph/edges_filtered")

print("Filtered data saved to HDFS")


STEP 2: GRAPH CLEANING AND FILTERING

Loading data from HDFS...

Loaded 973,085 nodes and 40,358 edges

Filtering edges where both papers exist...

Edges after filtering: 39,867
Reduction: 1.2%

Identifying connected papers...

Connected nodes: 60,068
Main component coverage: 6.2% of total

Saving filtered data to HDFS...




Filtered data saved to HDFS




The filtering process has removed 1.2% of the edges, leaving 39,867 valid citations. More importantly, the active subgraph consists of 60,068 papers (approx. 6.2% of the total dataset), which confirms that the vast majority of articles are isolated and do not form part of the citation network. For the following steps of this analysis, we will focus only on this active sub-group of articles.

### Graph construction in Spark

In this step, we load the filtered citation data directly from HDFS, where it was previously stored in distributed Parquet format. The vertices (papers) and edges (citations) are already Spark DataFrames spread across the cluster, so we simply combine them into a `GraphFrame` object. This object is the format required to run the parallel graph algorithms in the next steps.

In [None]:
print("\n" + "="*70)
print("STEP 3: GRAPH CONSTRUCTION IN SPARK")
print("="*70)

# load filtered data from hdfs (already spark dataframes)
print("\nLoading filtered data from HDFS...")
vertices = spark.read.parquet("hdfs:///output/citation_graph/nodes_filtered")
edges = spark.read.parquet("hdfs:///output/citation_graph/edges_filtered")

# create graphframe (interface for graphx in python)
graph = GraphFrame(vertices, edges)

print(f"\nGraph created in Spark")
print(f"  - Vertices: {graph.vertices.count():,}")
print(f"  - Edges: {graph.edges.count():,}")

# basic graph information
print("\nVertex sample:")
graph.vertices.select('id', 'title', 'categories').show(5, truncate=50)

print("\nEdge sample:")
graph.edges.select('src', 'dst').show(5)

### Basic metrics (degree)

In this section, we use Spark's distributed processing to calculate two fundamental graph metrics: **In-Degree** and **Out-Degree**. In-Degree measures popularity by counting how many times a paper is cited by others, while Out-Degree measures activity by counting how many references a paper contains. We sort the results to identify the top 10 papers for each metric.
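To make the two metrics concrete, here is a minimal sketch on a made-up edge list using `collections.Counter`. Each edge is a `(citing, cited)` pair, matching the `src`/`dst` convention of the notebook.

```python
from collections import Counter

# Toy citation list: (citing paper, cited paper). Ids are made up.
edges = [("A", "C"), ("B", "C"), ("D", "C"), ("A", "B")]

in_degree = Counter(dst for _, dst in edges)    # times each paper is cited
out_degree = Counter(src for src, _ in edges)   # references each paper makes

print(in_degree.most_common(1))    # most cited paper
print(out_degree.most_common(1))   # paper making the most references
print(in_degree["D"])              # a paper that is never cited has in-degree 0
```

Note that papers with a degree of zero simply do not appear in the counter; GraphFrames' `inDegrees` and `outDegrees` behave similarly and omit such vertices from their results.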

In [None]:
print("\n" + "="*70)
print("STEP 4: BASIC METRICS CALCULATION")
print("="*70)

# in-degree: how many times a paper is cited
print("\nCalculating in-degree (most cited papers)...")
in_degrees = graph.inDegrees
top_cited = in_degrees.orderBy(col("inDegree").desc())

# save to hdfs
in_degrees.write.mode("overwrite").parquet("hdfs:///output/citation_graph/in_degrees")
print("In-degrees saved to HDFS")

print("\nTop 10 most cited papers:")
top_cited_sample = top_cited.limit(10)

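# join with vertex information for display
# (re-sort afterwards: a join does not preserve the previous ordering)
top_cited_with_info = top_cited_sample.join(
    graph.vertices.select("id", "title", "categories"),
    "id"
).orderBy(col("inDegree").desc())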

# collect only top 10 for display
for idx, row in enumerate(top_cited_with_info.collect(), 1):
    print(f"{idx}. [{row['id']}] {row['title'][:60]}...")
    print(f"   Citations: {row['inDegree']}, Category: {row['categories']}")

# out-degree: how many references a paper makes
print("\nCalculating out-degree (papers citing the most)...")
out_degrees = graph.outDegrees
top_citing = out_degrees.orderBy(col("outDegree").desc())

# save to hdfs
out_degrees.write.mode("overwrite").parquet("hdfs:///output/citation_graph/out_degrees")
print("Out-degrees saved to HDFS")

print("\nTop 10 papers making the most references:")
top_citing_sample = top_citing.limit(10)

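# join with vertex information for display
# (re-sort afterwards: a join does not preserve the previous ordering)
top_citing_with_info = top_citing_sample.join(
    graph.vertices.select("id", "title", "categories"),
    "id"
).orderBy(col("outDegree").desc())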

# collect only top 10 for display
for idx, row in enumerate(top_citing_with_info.collect(), 1):
    print(f"{idx}. [{row['id']}] {row['title'][:60]}...")
    print(f"   References: {row['outDegree']}, Category: {row['categories']}")

These results reinforce the observation that the dataset is highly sparse. The most cited article has only 9 citations, and the article making the most references also has only 9. This confirms that we are observing small, isolated groups of papers rather than a complete citation history.

Both lists are dominated by Statistics (stat) and Mathematics (math) papers, which suggests that authors in these fields may be more likely to reference other works in their abstracts compared to other disciplines. The presence of the same paper (`0804.0079`) at the top of both lists indicates a tight, self-contained cluster of discussion.

### PageRank algorithm

In this cell, we apply the **PageRank** algorithm to measure the importance of each paper within the network. Unlike simple citation counts, PageRank determines influence recursively: a paper becomes important if it is cited by other important papers. We execute the algorithm using Spark's distributed GraphX system, extract the top 20 most influential papers, and save the results to HDFS in parquet format.
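The recursive idea behind PageRank can be sketched with a few lines of plain Python. This is a simplified illustration under stated assumptions, not the GraphFrames implementation: the function name `pagerank` and its parameters are hypothetical, ranks start at 1.0 and are not normalized, and papers without outgoing citations simply pass no rank onward.

```python
def pagerank(edges, reset_probability=0.15, max_iter=10):
    """Simplified, unnormalized PageRank by repeated rank propagation."""
    nodes = sorted({node for edge in edges for node in edge})
    out_degree = {node: 0 for node in nodes}
    for src, _ in edges:
        out_degree[src] += 1

    rank = {node: 1.0 for node in nodes}
    for _ in range(max_iter):
        # Each paper splits its current rank evenly among the papers it cites.
        incoming = {node: 0.0 for node in nodes}
        for src, dst in edges:
            incoming[dst] += rank[src] / out_degree[src]
        rank = {
            node: reset_probability + (1 - reset_probability) * incoming[node]
            for node in nodes
        }
    return rank

# A and B both cite C, and C cites D.
edges = [("A", "C"), ("B", "C"), ("C", "D")]
ranks = pagerank(edges)
for paper, score in sorted(ranks.items(), key=lambda item: -item[1]):
    print(paper, round(score, 5))
```

In this toy graph, D ends up with the highest score even though it is cited only once, because its single citation comes from the well-cited paper C. A and B occupy symmetric positions and therefore receive identical scores.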

In [None]:
print("\n" + "="*70)
print("STEP 5: PAGERANK - MOST INFLUENTIAL PAPERS")
print("="*70)

print("\nRunning PageRank...")
pagerank_results = graph.pageRank(resetProbability=0.15, maxIter=10)

# get top papers by pagerank
top_pagerank = pagerank_results.vertices.select("id", "pagerank") \
    .orderBy(col("pagerank").desc())

# save complete PageRank results to HDFS
pagerank_results.vertices.write.mode("overwrite").parquet("hdfs:///output/citation_graph/pagerank_results")
print("PageRank results saved to HDFS")

print("\nTop 20 most influential papers by PageRank:")
top_pr_sample = top_pagerank.limit(20)

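# join with vertex information for display
# (re-sort afterwards: a join does not preserve the previous ordering)
top_pr_with_info = top_pr_sample.join(
    graph.vertices.select("id", "title", "categories", "year"),
    "id"
).orderBy(col("pagerank").desc())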

# collect only top 20 for display
for idx, row in enumerate(top_pr_with_info.collect(), 1):
    print(f"\n{idx}. PageRank: {row['pagerank']:.4f}")
    print(f"   ID: {row['id']}")
    print(f"   Title: {row['title'][:70]}...")
    print(f"   Categories: {row['categories']}")
    print(f"   Year: {row['year']}")

# save top results with metadata to HDFS
top_pagerank_full = top_pagerank.join(
    graph.vertices.select("id", "title", "categories", "year"),
    "id"
)
top_pagerank_full.write.mode("overwrite").parquet("hdfs:///output/citation_graph/top_pagerank_with_metadata")
print("\nTop PageRank papers with metadata saved to HDFS")

The top paper (`0804.0079`) is the same as the most cited paper found in the previous step, confirming it as the central node of this subgraph. 

The articles ranked 2nd, 3rd, and 4th share exactly the same PageRank score (10.7929). This typically indicates a symmetric structure (such as a closed loop where the papers cite each other) or a scenario where they are all cited by the same set of sources.

The dominance of Statistics categories confirms that the most active "islands" in this sample are concentrated in that field.

### Connected components

In this step, we run the **connected components** algorithm to decompose the network into independent clusters in which every article can be reached from every other one by following citation links in either direction. The objective is to determine whether there is a large central community ("Giant Component") connecting most of the papers or whether the graph is fragmented into many isolated groups.
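For intuition, connected components can be computed on a small graph with a union-find structure. The sketch below is a single-machine illustration with hypothetical names, not the distributed algorithm that GraphFrames runs; like that algorithm, it ignores the direction of the edges.

```python
def connected_components(nodes, edges):
    """Group nodes into components using union-find (edge direction ignored)."""
    parent = {node: node for node in nodes}

    def find(node):
        # Follow parent pointers to the root, shortening the path on the way.
        while parent[node] != node:
            parent[node] = parent[parent[node]]
            node = parent[node]
        return node

    for src, dst in edges:
        root_src, root_dst = find(src), find(dst)
        if root_src != root_dst:
            parent[root_src] = root_dst   # merge the two groups

    components = {}
    for node in nodes:
        components.setdefault(find(node), set()).add(node)
    return sorted(components.values(), key=len, reverse=True)

nodes = ["A", "B", "C", "D", "E"]
edges = [("A", "B"), ("C", "B"), ("D", "E")]
components = connected_components(nodes, edges)
print(components)
print(f"largest component: {len(components[0]) / len(nodes) * 100:.1f}% of nodes")
```

Here A and C end up in the same component because both cite B, even though neither cites the other.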

In [None]:
print("\n" + "="*70)
print("STEP 6: CONNECTED COMPONENTS - COMMUNITY DETECTION")
print("="*70)

print("\nCalculating connected components...")

# configure checkpoint directory in HDFS
sc.setCheckpointDir("hdfs:///tmp/spark_checkpoint")
print(f"Checkpoint directory: hdfs:///tmp/spark_checkpoint")

try:
    print("Attempting graphx algorithm...")
    components = graph.connectedComponents(
        algorithm="graphx",
        checkpointInterval=5
    )
    
    # force execution
    component_count = components.select("component").distinct().count()
    print(f"Connected components calculated: {component_count} components found")
    
    # save components to HDFS
    components.write.mode("overwrite").parquet("hdfs:///output/citation_graph/connected_components")
    print("Connected components saved to HDFS")
    
    # analyze component sizes
    component_sizes = components.groupBy("component") \
        .count() \
        .orderBy(col("count").desc())
    
    # get total nodes count
    total_nodes = components.count()
    
    # collect only top 10 for display
    comp_sizes_top = component_sizes.limit(10).collect()
    
    print("\nComponent distribution:")
    print(f"  - Total components: {component_count}")
    print(f"  - Largest component: {comp_sizes_top[0]['count']:,} papers")
    print(f"  - % in main component: {comp_sizes_top[0]['count']/total_nodes*100:.1f}%")

    print("\nTop 10 largest components:")
    for row in comp_sizes_top:
        print(f"{row['component']:20} {row['count']:10,}")

    # analyze papers in the main component
    main_component_id = comp_sizes_top[0]['component']
    main_component_papers = components.filter(col("component") == main_component_id)

    print(f"\nAnalyzing main component ({main_component_id})...")
    main_comp_categories = main_component_papers.groupBy("categories") \
        .count() \
        .orderBy(col("count").desc())

    print("\nMost common categories in main component:")
    for row in main_comp_categories.limit(15).collect():
        print(f"{row['categories']:40} {row['count']:10,}")
    
except Exception as e:
    print(f"\nError calculating connected components: {e}")
    print("This is common with small or highly disconnected graphs")
    print("Continuing with rest of analysis...")
    components = None

The results show extreme fragmentation. We found 1,979 distinct components, with the largest one containing only 13 articles. There is no giant component: the network is made up of many small, disconnected groups. The largest of them is focused on Representation Theory.

### Label propagation (LPA)

In this step, we apply the **label propagation algorithm** (LPA) to detect communities based on the density of the connections. Unlike connected components, which simply finds linked islands, LPA allows us to identify tight-knit clusters where papers reference each other frequently.
Afterwards, we analyze the categories of the largest communities to check whether these algorithmically detected clusters correspond to actual research topics.
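The mechanism can be sketched in plain Python as follows. This is a simplified synchronous variant with hypothetical names: every paper starts with its own label and, in each round, adopts the label most common among its neighbours. Ties are broken by taking the smallest label, which is an assumption made here to keep the example deterministic and is not necessarily what GraphFrames does.

```python
from collections import Counter, defaultdict

def label_propagation(edges, max_iter=5):
    """Synchronous label propagation on an undirected view of the graph."""
    neighbours = defaultdict(set)
    for src, dst in edges:
        neighbours[src].add(dst)
        neighbours[dst].add(src)

    labels = {node: node for node in neighbours}   # each node starts alone
    for _ in range(max_iter):
        new_labels = {}
        for node, adjacent in neighbours.items():
            counts = Counter(labels[other] for other in adjacent)
            best = max(counts.values())
            # Tie-break on the smallest label (assumption for determinism).
            new_labels[node] = min(l for l, c in counts.items() if c == best)
        labels = new_labels
    return labels

# Two triangles (A-B-C and D-E-F) joined by a single citation C -> D.
edges = [("A", "B"), ("B", "C"), ("A", "C"),
         ("D", "E"), ("E", "F"), ("D", "F"),
         ("C", "D")]
labels = label_propagation(edges)

communities = defaultdict(set)
for node, label in labels.items():
    communities[label].add(node)
print(sorted(sorted(members) for members in communities.values()))
```

Although all six papers belong to one connected component, the two densely linked triangles are separated into two communities, which is the distinction between the two algorithms described above.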

In [None]:
print("\n" + "="*70)
print("STEP 7: LABEL PROPAGATION - SUB-COMMUNITY DETECTION")
print("="*70)

print("\nRunning Label Propagation Algorithm...")
lpa_result = graph.labelPropagation(maxIter=5)

# save communities to HDFS
lpa_result.write.mode("overwrite").parquet("hdfs:///output/citation_graph/label_propagation")
print("Label propagation results saved to HDFS")

# analyze detected communities
community_sizes = lpa_result.groupBy("label") \
    .count() \
    .orderBy(col("count").desc())

# get total community count
total_communities = community_sizes.count()

# collect top 15 for display
comm_sizes_top = community_sizes.limit(15).collect()

print("\nDetected communities:")
print(f"  - Total communities: {total_communities}")
print(f"  - Largest community: {comm_sizes_top[0]['count']:,} papers")

print("\nTop 15 largest communities:")
for row in comm_sizes_top:
    print(f"{row['label']:20} {row['count']:10,}")

# analyze categories by community
print("\nAnalyzing composition of the 5 largest communities...")
for i in range(min(5, len(comm_sizes_top))):
    comm_id = comm_sizes_top[i]['label']
    comm_size = comm_sizes_top[i]['count']
    
    print(f"\n--- Community {i+1} (ID: {comm_id}, Size: {comm_size}) ---")
    
    comm_papers = lpa_result.filter(col("label") == comm_id)
    comm_cats = comm_papers.groupBy("categories") \
        .count() \
        .orderBy(col("count").desc())
    
    print("Main categories:")
    for row in comm_cats.limit(5).collect():
        print(f"  - {row['categories']}: {row['count']} papers ({row['count']/comm_size*100:.1f}%)")

# save community results with metadata to HDFS
lpa_with_metadata = lpa_result.join(
    graph.vertices.select("id", "title", "categories"),
    "id"
)
lpa_with_metadata.write.mode("overwrite").parquet("hdfs:///output/citation_graph/communities_with_metadata")
print("\nCommunities with metadata saved to HDFS")

The algorithm identified 3,926 distinct communities, a number very close to the total number of nodes (4,643), confirming that the network is not only disconnected but also lacks internal density. The largest community contains only 9 papers.

Nevertheless, the results suggest that the algorithm produces meaningful groups. For example, Community number 5 contains only papers about Finance and Statistics, which indicates that the algorithm grouped papers that actually discuss the same topic.

### Triangle count and clustering

In this step, we count the number of triangles in the graph. A triangle is formed when three papers are all connected to each other (for example, A cites B, B cites C, and C cites A). The triangle count in GraphFrames treats edges as undirected, so the direction of each citation does not matter. This metric helps us identify groups of articles that are strongly connected to each other, and not just linked in a simple chain.
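A small single-machine sketch of per-vertex triangle counting is shown below, treating citations as undirected links. The function name and toy edges are hypothetical. It also illustrates a detail that matters when totalling the results: every triangle is counted once at each of its three vertices, so the number of distinct triangles is the sum of the per-vertex counts divided by three.

```python
from itertools import combinations

def triangle_counts(edges):
    """Count, for each node, the triangles it belongs to (direction ignored)."""
    neighbours = {}
    for src, dst in edges:
        if src == dst:
            continue   # self-loops cannot form triangles
        neighbours.setdefault(src, set()).add(dst)
        neighbours.setdefault(dst, set()).add(src)

    counts = {node: 0 for node in neighbours}
    for node, adjacent in neighbours.items():
        # A triangle exists whenever two neighbours of `node` are also linked.
        for first, second in combinations(sorted(adjacent), 2):
            if second in neighbours[first]:
                counts[node] += 1
    return counts

# A, B and C form a triangle; D hangs off C.
edges = [("A", "B"), ("B", "C"), ("C", "A"), ("C", "D")]
counts = triangle_counts(edges)
distinct_triangles = sum(counts.values()) // 3

print(counts)
print(distinct_triangles)
```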

In [None]:
print("\n" + "="*70)
print("STEP 8: TRIANGLE ANALYSIS")
print("="*70)

print("\nCounting triangles in the graph...")
try:
    triangle_counts = graph.triangleCount()
    
    # save triangle counts to HDFS
    triangle_counts.write.mode("overwrite").parquet("hdfs:///output/citation_graph/triangle_counts")
    print("Triangle counts saved to HDFS")
    
    # papers with most triangles
    top_triangles = triangle_counts.orderBy(col("count").desc())
    
    print("\nTop 10 papers with most triangles:")
    top_tri_sample = top_triangles.limit(10)
    
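    # join with vertex information for display
    # (re-sort afterwards: a join does not preserve the previous ordering)
    top_tri_with_info = top_tri_sample.join(
        graph.vertices.select("id", "title"),
        "id"
    ).orderBy(col("count").desc())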
    
    # collect only top 10 for display
    for idx, row in enumerate(top_tri_with_info.collect(), 1):
        print(f"{idx}. Triangles: {row['count']}")
        print(f"   ID: {row['id']}")
        print(f"   Title: {row['title'][:60]}...")
    
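    # calculate total triangles: each triangle is counted once at each of its
    # three vertices, so the sum of per-vertex counts is divided by 3
    total_triangles = (triangle_counts.select(F.sum("count")).collect()[0][0] or 0) // 3
    print(f"\nTotal triangles: {total_triangles:,}")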
    
except Exception as e:
    print(f"Error calculating triangles: {e}")
    print("   (This may fail on very large graphs)")

These results show that the articles with the most triangles are part of related discussions. We can observe this in the titles, which include words like "Reply", "Response" or "Comment". This indicates that the triangles in this graph are formed by authors citing each other back and forth while discussing specific results.

### Visualizations and statistical analysis

This cell generates six visualizations to summarize the graph's structure. It uses histograms to visualize how citations are distributed, bar charts to show the size of the communities and the top papers, and a scatter plot to show the correlation between popularity (citation count) and influence (PageRank).

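**Two versions of the cell are provided: v1 plots the whole dataset, and v2 samples the data first, for cases where the full results are too large to collect on the driver.**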
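One point worth noting about the degree histograms is that they only need a frequency table (how many papers have each degree), not one row per paper. The sketch below illustrates the idea in plain Python on made-up values. On the cluster, a comparable table could presumably be obtained with a `groupBy("inDegree").count()` before calling `toPandas()`, which would be an alternative to sampling; that variant is a suggestion and is not part of the original notebook.

```python
from collections import Counter

# Made-up in-degree values, one entry per paper.
in_degrees = [1, 1, 1, 2, 1, 3, 2, 1, 9]

# The frequency table is all a histogram needs, and it has one row
# per distinct degree value instead of one row per paper.
frequency = Counter(in_degrees)
table = sorted(frequency.items())

for degree, papers in table:
    print(f"in-degree {degree}: {papers} paper(s)")
```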

In [None]:
# v1: without sampling, visualizing the whole dataset

print("\n" + "="*70)
print("STEP 9: VISUALIZATIONS")
print("="*70)

# load data from HDFS for visualization
print("\nLoading data from HDFS for visualization...")
in_degrees_viz = spark.read.parquet("hdfs:///output/citation_graph/in_degrees").toPandas()
out_degrees_viz = spark.read.parquet("hdfs:///output/citation_graph/out_degrees").toPandas()

print("Data loaded successfully")

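# get top 15 pagerank (sort explicitly: row order is not guaranteed when reading parquet)
top_pr_viz = spark.read.parquet("hdfs:///output/citation_graph/top_pagerank_with_metadata") \
    .orderBy(col("pagerank").desc()) \
    .limit(15).toPandas()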

# get top 20 component sizes
comp_sizes_viz = spark.read.parquet("hdfs:///output/citation_graph/connected_components") \
    .groupBy("component").count().orderBy(col("count").desc()) \
    .limit(20).toPandas()

# get top 20 community sizes
comm_sizes_viz = spark.read.parquet("hdfs:///output/citation_graph/label_propagation") \
    .groupBy("label").count().orderBy(col("count").desc()) \
    .limit(20).toPandas()

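# pagerank vs in-degree (join and collect all)
# the join needs a Spark DataFrame, so the in-degrees are read again here
# instead of reusing the pandas DataFrame in_degrees_viz
pr_indeg_viz = spark.read.parquet("hdfs:///output/citation_graph/pagerank_results") \
    .join(spark.read.parquet("hdfs:///output/citation_graph/in_degrees"), "id") \
    .toPandas()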

# configure matplotlib
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (14, 10)

fig, axes = plt.subplots(2, 3, figsize=(18, 12))

# in-degree distribution
axes[0, 0].hist(in_degrees_viz['inDegree'], bins=50, edgecolor='black', color='steelblue')
axes[0, 0].set_xlabel('In-Degree (citations received)')
axes[0, 0].set_ylabel('Frequency')
axes[0, 0].set_title('In-Degree Distribution')
axes[0, 0].set_yscale('log')

# out-degree distribution
axes[0, 1].hist(out_degrees_viz['outDegree'], bins=50, edgecolor='black', color='coral')
axes[0, 1].set_xlabel('Out-Degree (references made)')
axes[0, 1].set_ylabel('Frequency')
axes[0, 1].set_title('Out-Degree Distribution')
axes[0, 1].set_yscale('log')

# top 15 papers by pagerank
axes[0, 2].barh(range(len(top_pr_viz)), top_pr_viz['pagerank'], color='darkgreen')
axes[0, 2].set_yticks(range(len(top_pr_viz)))
axes[0, 2].set_yticklabels([f"{row['id'][:10]}..." for _, row in top_pr_viz.iterrows()])
axes[0, 2].set_xlabel('PageRank Score')
axes[0, 2].set_title('Top 15 Papers by PageRank')
axes[0, 2].invert_yaxis()

# component sizes
axes[1, 0].bar(range(len(comp_sizes_viz)), 
               comp_sizes_viz['count'], 
               color='mediumpurple', edgecolor='black')
axes[1, 0].set_xlabel('Component ID')
axes[1, 0].set_ylabel('Size')
axes[1, 0].set_title('Size of Top 20 Connected Components')
axes[1, 0].set_yscale('log')

# community sizes (lpa)
axes[1, 1].bar(range(len(comm_sizes_viz)), 
               comm_sizes_viz['count'], 
               color='orange', edgecolor='black')
axes[1, 1].set_xlabel('Community ID')
axes[1, 1].set_ylabel('Size')
axes[1, 1].set_title('Size of Top 20 Communities')
axes[1, 1].set_yscale('log')

# pagerank vs in-degree
axes[1, 2].scatter(pr_indeg_viz['inDegree'], pr_indeg_viz['pagerank'], 
                   alpha=0.5, s=20, color='crimson')
axes[1, 2].set_xlabel('In-Degree')
axes[1, 2].set_ylabel('PageRank')
axes[1, 2].set_title('PageRank vs Citations Correlation')
axes[1, 2].set_xscale('log')

plt.tight_layout()
plt.savefig('arxiv_graph_analysis.png', dpi=300, bbox_inches='tight')
print("\nVisualizations saved to 'arxiv_graph_analysis.png'")
plt.show()

In [None]:
# v2: with sampling, for cases where the dataset is too large to collect in full
print("\n" + "="*70)
print("STEP 9: VISUALIZATIONS")
print("="*70)

# load data from HDFS for visualization
print("\nLoading data from HDFS for visualization...")
in_degrees_viz = spark.read.parquet("hdfs:///output/citation_graph/in_degrees")
out_degrees_viz = spark.read.parquet("hdfs:///output/citation_graph/out_degrees")

# sample data for visualization (avoid collecting everything)
print("Sampling data for efficient visualization...")
in_deg_sample = in_degrees_viz.sample(fraction=0.5, seed=42).toPandas()
out_deg_sample = out_degrees_viz.sample(fraction=0.5, seed=42).toPandas()

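# get top 15 pagerank (sort explicitly: row order is not guaranteed when reading parquet)
top_pr_viz = spark.read.parquet("hdfs:///output/citation_graph/top_pagerank_with_metadata") \
    .orderBy(col("pagerank").desc()) \
    .limit(15).toPandas()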

# get top 20 component sizes
comp_sizes_viz = spark.read.parquet("hdfs:///output/citation_graph/connected_components") \
    .groupBy("component").count().orderBy(col("count").desc()) \
    .limit(20).toPandas()

# get top 20 community sizes
comm_sizes_viz = spark.read.parquet("hdfs:///output/citation_graph/label_propagation") \
    .groupBy("label").count().orderBy(col("count").desc()) \
    .limit(20).toPandas()

# pagerank vs in-degree (sample)
pr_indeg_viz = spark.read.parquet("hdfs:///output/citation_graph/pagerank_results") \
    .join(in_degrees_viz, "id") \
    .sample(fraction=0.2, seed=42) \
    .toPandas()

print("Data loaded and sampled successfully")

# configure matplotlib
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (14, 10)

fig, axes = plt.subplots(2, 3, figsize=(18, 12))

# in-degree distribution
axes[0, 0].hist(in_deg_sample['inDegree'], bins=50, edgecolor='black', color='steelblue')
axes[0, 0].set_xlabel('In-Degree (citations received)')
axes[0, 0].set_ylabel('Frequency')
axes[0, 0].set_title('In-Degree Distribution')
axes[0, 0].set_yscale('log')

# out-degree distribution
axes[0, 1].hist(out_deg_sample['outDegree'], bins=50, edgecolor='black', color='coral')
axes[0, 1].set_xlabel('Out-Degree (references made)')
axes[0, 1].set_ylabel('Frequency')
axes[0, 1].set_title('Out-Degree Distribution')
axes[0, 1].set_yscale('log')

# top 15 papers by pagerank
axes[0, 2].barh(range(len(top_pr_viz)), top_pr_viz['pagerank'], color='darkgreen')
axes[0, 2].set_yticks(range(len(top_pr_viz)))
axes[0, 2].set_yticklabels([f"{row['id'][:10]}..." for _, row in top_pr_viz.iterrows()])
axes[0, 2].set_xlabel('PageRank Score')
axes[0, 2].set_title('Top 15 Papers by PageRank')
axes[0, 2].invert_yaxis()

# component sizes
axes[1, 0].bar(range(len(comp_sizes_viz)), 
               comp_sizes_viz['count'], 
               color='mediumpurple', edgecolor='black')
axes[1, 0].set_xlabel('Component rank (by size)')
axes[1, 0].set_ylabel('Size')
axes[1, 0].set_title('Size of Top 20 Connected Components')
axes[1, 0].set_yscale('log')

# community sizes (lpa)
axes[1, 1].bar(range(len(comm_sizes_viz)), 
               comm_sizes_viz['count'], 
               color='orange', edgecolor='black')
axes[1, 1].set_xlabel('Community rank (by size)')
axes[1, 1].set_ylabel('Size')
axes[1, 1].set_title('Size of Top 20 Communities')
axes[1, 1].set_yscale('log')

# pagerank vs in-degree
axes[1, 2].scatter(pr_indeg_viz['inDegree'], pr_indeg_viz['pagerank'], 
                   alpha=0.5, s=20, color='crimson')
axes[1, 2].set_xlabel('In-Degree')
axes[1, 2].set_ylabel('PageRank')
axes[1, 2].set_title('PageRank vs Citations Correlation')
axes[1, 2].set_xscale('log')

plt.tight_layout()
plt.savefig('arxiv_graph_analysis.png', dpi=300, bbox_inches='tight')
print("\nVisualizations saved to 'arxiv_graph_analysis.png'")
plt.show()

The visualizations further confirm the network's highly fragmented structure:
1. **Sparsity:** the 'In-Degree' and 'Out-Degree' histograms drop off steeply. Most papers have only 1 or 2 connections, and the maximum is 9.
2. **Top influencers:** the paper `0804.0079` stands out as the clear leader. The other top articles have similar scores, which suggests they are probably the leaders of their own separate communities.
3. **Small communities:** the 'Component' and 'Community' size charts reveal that the largest groups contain fewer than 15 papers. This visually confirms that the network consists of small, isolated communities rather than one large, cohesive web.
4. **Influence:** the scatter plot shows a positive correlation between the number of citations and the PageRank score. However, the spread of the points indicates that PageRank captures a structural importance that goes beyond mere popularity.
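
The visual impression of a positive correlation can be backed by a number. The following is a minimal sketch, not part of the original pipeline, that computes a Spearman rank correlation with pandas on a DataFrame shaped like `pr_indeg_viz` (columns `inDegree` and `pagerank`). The helper name `spearman_rank_correlation` and the toy values are hypothetical and serve only as an illustration.

```python
import pandas as pd

def spearman_rank_correlation(df: pd.DataFrame, x: str, y: str) -> float:
    """Spearman correlation, computed as the Pearson correlation of the ranks."""
    # average ranks handle ties, which are common for integer in-degrees
    x_ranks = df[x].rank(method="average")
    y_ranks = df[y].rank(method="average")
    return x_ranks.corr(y_ranks)  # Series.corr defaults to Pearson

# toy data standing in for pr_indeg_viz (illustrative values, not results)
toy = pd.DataFrame({
    "inDegree": [1, 1, 2, 3, 5, 9],
    "pagerank": [0.2, 0.9, 0.5, 1.1, 1.0, 3.0],
})
rho = spearman_rank_correlation(toy, "inDegree", "pagerank")
print(f"Spearman rho (toy data): {rho:.3f}")
```

A rank-based measure is a reasonable choice here because both variables are heavily skewed. On the real data the call would be `spearman_rank_correlation(pr_indeg_viz, "inDegree", "pagerank")`.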

### Analysis by category

In this final step, we aggregate the results by field in order to understand which areas carry the most weight. We extract the primary category from each paper and calculate the average PageRank score for that group. This allows us to see which specific areas tend to produce the most influential articles in this network.

In [None]:
print("\n" + "="*70)
print("STEP 10: ANALYSIS BY SCIENTIFIC CATEGORIES")
print("="*70)

# load vertices and pagerank from HDFS
vertices_cat = spark.read.parquet("hdfs:///output/citation_graph/nodes_filtered")
pagerank_cat = spark.read.parquet("hdfs:///output/citation_graph/pagerank_results")

# extract primary category (distributed operation)
# split and col are already imported at the top of the notebook
vertices_with_primary = vertices_cat.withColumn(
    'primary_category',
    split(col('categories'), ' ').getItem(0)
)

# merge with pagerank (distributed join)
pagerank_with_cat = pagerank_cat.join(
    vertices_with_primary.select('id', 'primary_category'),
    'id'
)

# calculate average pagerank by category (distributed aggregation)
category_pagerank = pagerank_with_cat.groupBy('primary_category').agg(
    F.mean('pagerank').alias('mean'),
    F.count('pagerank').alias('count')
).orderBy(col('mean').desc())

# get top 20 categories
top_categories = category_pagerank.limit(20).toPandas()

print("\nTop 20 categories by average PageRank:")
print(top_categories.to_string(index=False))

# visualization
plt.figure(figsize=(12, 6))
plt.barh(range(len(top_categories)), top_categories['mean'], color='teal')
plt.yticks(range(len(top_categories)), top_categories['primary_category'])
plt.xlabel('Average PageRank')
plt.ylabel('Category')
plt.title('Average Importance by Scientific Category')
plt.gca().invert_yaxis()
plt.tight_layout()
plt.savefig('category_importance.png', dpi=300, bbox_inches='tight')
print("\nGraph saved to 'category_importance.png'")
plt.show()

The bar chart shows that the Computer Science and Statistics fields have the highest average PageRank scores.

However, the text output reveals an important detail: the top categories contain very few papers, so their high scores rest on just a couple of articles. Statistical Methodology, by contrast, maintains a very high average score (3.14) across 87 articles. This confirms once again that Statistics is the central and most interconnected topic in this dataset.
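
One way to keep tiny categories from dominating the ranking is to require a minimum number of papers before comparing averages. The sketch below assumes a pandas table with the same columns as `top_categories` (`primary_category`, `mean`, `count`). The helper `filter_by_min_count`, the threshold of 30, and the toy values are hypothetical choices for illustration, not part of the original analysis.

```python
import pandas as pd

def filter_by_min_count(stats: pd.DataFrame, min_count: int) -> pd.DataFrame:
    """Keep categories with at least `min_count` papers, ranked by mean PageRank."""
    kept = stats[stats["count"] >= min_count]
    return kept.sort_values("mean", ascending=False).reset_index(drop=True)

# toy table with the same columns as top_categories (illustrative values only)
toy_stats = pd.DataFrame({
    "primary_category": ["cat.A", "cat.B", "cat.C", "cat.D"],
    "mean": [5.0, 4.2, 3.1, 1.0],
    "count": [2, 1, 87, 40],
})
robust = filter_by_min_count(toy_stats, min_count=30)  # hypothetical threshold
print(robust.to_string(index=False))
```

In the Spark pipeline, the equivalent step would probably be a `.filter(col('count') >= 30)` on `category_pagerank` before the `.limit(20)`, so that the threshold is applied to all categories rather than only to the current top 20.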

### Final summary

In [None]:
print("\n" + "="*70)
print("FINAL ANALYSIS SUMMARY")
print("="*70)

# load summary statistics from HDFS
num_nodes_filtered = spark.read.parquet("hdfs:///output/citation_graph/nodes_filtered").count()
num_edges_filtered = spark.read.parquet("hdfs:///output/citation_graph/edges_filtered").count()

# component statistics
comp_stats = spark.read.parquet("hdfs:///output/citation_graph/connected_components") \
    .groupBy("component").count().orderBy(col("count").desc()).limit(1).collect()
total_components = spark.read.parquet("hdfs:///output/citation_graph/connected_components") \
    .select("component").distinct().count()

# community statistics
comm_stats = spark.read.parquet("hdfs:///output/citation_graph/label_propagation") \
    .groupBy("label").count().orderBy(col("count").desc()).limit(1).collect()
total_communities = spark.read.parquet("hdfs:///output/citation_graph/label_propagation") \
    .select("label").distinct().count()

# top cited paper
top_cited_stats = spark.read.parquet("hdfs:///output/citation_graph/in_degrees") \
    .orderBy(col("inDegree").desc()).limit(1).collect()

# top pagerank
top_pr_stats = spark.read.parquet("hdfs:///output/citation_graph/pagerank_results") \
    .orderBy(col("pagerank").desc()).limit(1).collect()

print(f"""
GRAPH STATISTICS:
  - Papers analyzed: {num_nodes_filtered:,}
  - Total citations: {num_edges_filtered:,}
  - Average citations per paper: {num_edges_filtered/num_nodes_filtered:.2f}
  
COMPONENTS AND COMMUNITIES:
  - Connected components: {total_components}
  - Size of main component: {comp_stats[0]['count']:,} papers
  - Detected communities (LPA): {total_communities}
  
MOST IMPORTANT PAPERS:
  - Most cited paper: {top_cited_stats[0]['inDegree']} citations
  - Max PageRank: {top_pr_stats[0]['pagerank']:.4f}
  
GENERATED FILES IN HDFS:
  - hdfs:///output/citation_graph/nodes_filtered - Filtered nodes
  - hdfs:///output/citation_graph/edges_filtered - Filtered edges
  - hdfs:///output/citation_graph/pagerank_results - PageRank results
  - hdfs:///output/citation_graph/connected_components - Components
  - hdfs:///output/citation_graph/label_propagation - Communities
  - hdfs:///output/citation_graph/in_degrees - Citation counts
  - hdfs:///output/citation_graph/out_degrees - Reference counts
  
LOCAL VISUALIZATIONS:
  - arxiv_graph_analysis.png - Main visualizations
  - category_importance.png - Category analysis
""")

### Close Spark session

In [None]:
# close spark session
print("\nClosing Spark session...")
spark.stop()
print("Session closed successfully")