# Phase 3: Big Data Analysis with Apache Spark
## Scientific Articles Analysis - Advanced Indicators

In [None]:
import json

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import (
    CountVectorizer,
    HashingTF,
    IDF,
    RegexTokenizer,
    StopWordsRemover,
    Tokenizer,
)
from pyspark.ml.clustering import LDA

# Create (or reuse) the Spark session for this notebook.
# NOTE(review): getOrCreate() returns an already-running session unchanged, so
# the driver-memory setting only takes effect on a fresh session.
spark = (
    SparkSession.builder
    .appName("ScientificArticlesAnalysis")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

print(f"Spark version: {spark.version}")

## 1. Load Data from HDFS

In [None]:
# Load the article corpus: prefer the HDFS copy and fall back to the local
# export when the HDFS read fails (namenode down, path missing, ...).
HDFS_PATH = "hdfs://localhost:9000/bigdata/scientific_articles/all_articles.json"
LOCAL_PATH = "/root/bigdata-bi-project/scientific_scraper/hdfs_data/all_articles.json"

try:
    df = spark.read.json(HDFS_PATH)
except Exception as err:
    # `except Exception` rather than a bare `except:` so KeyboardInterrupt /
    # SystemExit still propagate; report the failure instead of hiding which
    # source ended up being used.
    print(f"HDFS read failed ({type(err).__name__}: {err}); using local copy {LOCAL_PATH}")
    df = spark.read.json(LOCAL_PATH)

# Every later cell re-scans `df`; cache it so the JSON is parsed only once.
df = df.cache()

df.printSchema()
print(f"Total articles: {df.count()}")

## 2. Analysis 1: Publications Evolution by Year

In [1]:
# Number of articles per publication year, oldest year first.
year_counts = df.groupBy("year").agg(count("*").alias("count"))
publications_by_year = year_counts.orderBy("year")

publications_by_year.show()
# Spark writes a directory of part files at this path, not a single CSV file.
(
    publications_by_year.write
    .mode("overwrite")
    .csv("/root/bigdata-bi-project/phase3_spark/output/publications_by_year.csv", header=True)
)

NameError: name 'df' is not defined

## 3. Analysis 2: Top Authors by Productivity

In [None]:
# Flatten the `authors` array: one row per (author, article) entry.
authors_df = df.select(explode("authors").alias("author"), "article_id")

# Rank authors by how many exploded rows they appear in; keep the top 50.
top_authors = (
    authors_df
    .groupBy("author")
    .agg(count("*").alias("publications"))
    .orderBy(desc("publications"))
    .limit(50)
)

top_authors.show(20)
top_authors.write.mode("overwrite").csv(
    "/root/bigdata-bi-project/phase3_spark/output/top_authors.csv",
    header=True,
)

## 4. Analysis 3: Co-Author Collaboration Network

In [None]:
# Co-authorship network: every unordered pair of distinct authors who share an
# article, weighted by the number of distinct articles they share.
authors_exploded = df.select("article_id", explode("authors").alias("author"))

# Self-join on article_id; `a1.author < a2.author` keeps each unordered pair
# exactly once and drops self-pairs.
coauthors = authors_exploded.alias("a1").join(
    authors_exploded.alias("a2"),
    (col("a1.article_id") == col("a2.article_id")) & (col("a1.author") < col("a2.author")),
).select(
    col("a1.article_id").alias("article_id"),
    col("a1.author").alias("author1"),
    col("a2.author").alias("author2"),
)

# countDistinct on article_id (not count("*")): if an author is listed twice on
# one article, the self-join emits the pair twice, which would otherwise count
# a single shared article as two collaborations.
coauthor_network = (
    coauthors.groupBy("author1", "author2")
    .agg(countDistinct("article_id").alias("collaborations"))
    .orderBy(desc("collaborations"))
    .limit(100)
)

coauthor_network.show(10)
coauthor_network.write.mode("overwrite").csv("/root/bigdata-bi-project/phase3_spark/output/coauthor_network.csv", header=True)

## 5. Analysis 4: Distribution by Affiliation (University/Lab) and by Source

In [None]:
# --- Top 30 affiliations by number of occurrences in the `affiliations` arrays ---
affiliations_df = df.select(explode("affiliations").alias("affiliation"))
affiliation_counts = affiliations_df.groupBy("affiliation").agg(count("*").alias("count"))
top_affiliations = affiliation_counts.orderBy(desc("count")).limit(30)

top_affiliations.show()
top_affiliations.write.mode("overwrite").csv(
    "/root/bigdata-bi-project/phase3_spark/output/top_affiliations.csv",
    header=True,
)

# --- Number of articles per `source`, most frequent first ---
source_counts = df.groupBy("source").count()
by_source = source_counts.orderBy(desc("count"))

by_source.show()
by_source.write.mode("overwrite").csv(
    "/root/bigdata-bi-project/phase3_spark/output/by_source.csv",
    header=True,
)

## 6. Analysis 5: Distribution by Quartile

In [None]:
# Number of articles for each `quartile` value, in ascending quartile order.
quartile_dist = (
    df.groupBy("quartile")
    .agg(count("*").alias("count"))
    .orderBy("quartile")
)

quartile_dist.show()
quartile_dist.write.mode("overwrite").csv(
    "/root/bigdata-bi-project/phase3_spark/output/quartile_distribution.csv",
    header=True,
)

## 7. Analysis 6: Emerging Trends - Keyword Frequency Over Time

In [None]:
# Keyword frequency per (year, keyword) pair, most frequent pairs first.
year_keyword_pairs = df.select("year", explode("keywords").alias("keyword"))
non_null_pairs = year_keyword_pairs.where(col("keyword").isNotNull())
keywords_by_year = (
    non_null_pairs
    .groupBy("year", "keyword")
    .agg(count("*").alias("frequency"))
    .orderBy(desc("frequency"))
)

keywords_by_year.show(20)
keywords_by_year.write.mode("overwrite").csv(
    "/root/bigdata-bi-project/phase3_spark/output/keywords_by_year.csv",
    header=True,
)

## 8. TF-IDF Analysis on Abstracts

In [None]:
# Prepare text data: keep only articles that have an abstract.
text_df = df.select("article_id", "abstract").filter(col("abstract").isNotNull())

# Tokenize: lowercase (RegexTokenizer default) and split on runs of anything
# that is not a Unicode letter or digit. The whitespace-only Tokenizer left
# punctuation attached ("learning," vs "learning"), splitting one term into
# several. \p{L} keeps accented letters intact. Single-char tokens are dropped.
tokenizer = RegexTokenizer(
    inputCol="abstract",
    outputCol="words",
    pattern=r"[^\p{L}\p{N}]+",
    gaps=True,
    minTokenLength=2,
)
words_df = tokenizer.transform(text_df)

# Remove stop words using StopWordsRemover's default list.
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_df = remover.transform(words_df)

# TF: hash terms into a sparse vector. With only 1000 buckets, unrelated terms
# from a scientific vocabulary collide constantly; 2**18 is HashingTF's default.
NUM_TF_FEATURES = 1 << 18
hashingTF = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=NUM_TF_FEATURES)
tf_df = hashingTF.transform(filtered_df)

# IDF: down-weight terms that occur in many abstracts.
idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(tf_df)
tfidf_df = idf_model.transform(tf_df)

print("TF-IDF computed successfully")
tfidf_df.select("article_id", "features").show(5, truncate=False)

## 9. LDA Topic Modeling - Emerging Themes

In [None]:
# Spark's LDA expects bag-of-words term *counts*, not TF-IDF weights. Also,
# HashingTF indices cannot be mapped back to words, so topics built on them are
# uninterpretable. Build an explicit vocabulary from the filtered tokens instead.
count_vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="term_counts")
cv_model = count_vectorizer.fit(tfidf_df)
counts_df = cv_model.transform(tfidf_df)
vocabulary = cv_model.vocabulary

# LDA with 10 topics; a fixed seed makes the topics reproducible across runs.
lda = LDA(k=10, maxIter=20, featuresCol="term_counts", seed=42)
lda_model = lda.fit(counts_df)

# Top 10 terms for each topic (describeTopics(10) limits terms, not topics).
topics = lda_model.describeTopics(10)
print("\nTop 10 terms per topic:")
topics.show(truncate=False)
# Only k rows, so collecting to the driver is cheap; resolve indices to words.
for topic_row in topics.collect():
    topic_terms = ", ".join(vocabulary[i] for i in topic_row["termIndices"])
    print(f"Topic {topic_row['topic']}: {topic_terms}")

# Per-document topic mixture.
lda_result = lda_model.transform(counts_df)
lda_result.select("article_id", "topicDistribution").show(5, truncate=False)

## 10. Weak Signal Detection - Recent Emerging Terms

In [None]:
# Weak-signal scan: for each emerging term, count recent articles (year >= 2023)
# whose title or abstract contains it. This is a case-insensitive plain
# substring match, so short terms like "gpt" or "llm" also match inside longer words.
emerging_terms = ["federated learning", "quantum ml", "quantum machine learning", 
                  "explainable ai", "edge computing", "neuromorphic", "gpt", "llm"]

# Recent years (2023+)
recent_df = df.filter(col("year") >= 2023)

# Lower-cased text columns, built once and reused for every term.
title_lc = lower(col("title"))
abstract_lc = lower(col("abstract"))

weak_signals = []
for term in emerging_terms:
    needle = term.lower()
    # Deliberately NOT named `count`: rebinding that name would shadow the
    # star-imported pyspark.sql.functions.count, and every later `count("*")`
    # in the session would fail with "'int' object is not callable".
    n_hits = recent_df.filter(title_lc.contains(needle) | abstract_lc.contains(needle)).count()
    weak_signals.append((term, n_hits))

weak_signals_df = spark.createDataFrame(weak_signals, ["term", "occurrences"]) \
    .orderBy(desc("occurrences"))

print("\nWeak Signals - Emerging Terms (2023+):")
weak_signals_df.show()
weak_signals_df.write.mode("overwrite").csv("/root/bigdata-bi-project/phase3_spark/output/weak_signals.csv", header=True)

## 11. Generate Final Aggregated DataFrame

In [None]:
# Comprehensive one-row summary of the corpus.
# Aggregates are referenced through the `F` namespace, not bare star-imported
# names, so this cell still works if `count`, `min` or `max` has been rebound
# in the notebook session (e.g. by an earlier `count = ...` assignment).
summary_stats = df.agg(
    F.count("*").alias("total_articles"),
    F.countDistinct("doi").alias("unique_dois"),
    F.min("year").alias("earliest_year"),
    F.max("year").alias("latest_year"),
    F.countDistinct("journal").alias("unique_journals"),
)

summary_stats.show()
summary_stats.write.mode("overwrite").csv("/root/bigdata-bi-project/phase3_spark/output/summary_stats.csv", header=True)

print("\n✓ All analyses complete!")
print("Output files saved to: /root/bigdata-bi-project/phase3_spark/output/")