
Checking the Java version

In [None]:
!java --version

openjdk 17.0.16 2025-07-15
OpenJDK Runtime Environment (build 17.0.16+8-Ubuntu-0ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 17.0.16+8-Ubuntu-0ubuntu122.04.1, mixed mode, sharing)


Installing Apache Spark

In [None]:
!wget https://dlcdn.apache.org/spark/spark-4.0.1/spark-4.0.1-bin-hadoop3.tgz

--2025-12-05 18:53:48--  https://dlcdn.apache.org/spark/spark-4.0.1/spark-4.0.1-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 548955321 (524M) [application/x-gzip]
Saving to: ‘spark-4.0.1-bin-hadoop3.tgz’


2025-12-05 18:54:19 (68.1 MB/s) - ‘spark-4.0.1-bin-hadoop3.tgz’ saved [548955321/548955321]



In [None]:
!tar -xf spark-4.0.1-bin-hadoop3.tgz

Installing the Python libraries for working with Spark

In [None]:
!pip install -q findspark

In [None]:
!pip install -q pyspark

Downloading the dataset

In [None]:
!curl -L https://www.kaggle.com/api/v1/datasets/download/beta3logic/3m-academic-papers-titles-and-abstracts -o dataset.zip

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 1409M  100 1409M    0     0  67.3M      0  0:00:20  0:00:20 --:--:-- 74.9M


In [None]:
!unzip dataset.zip

Archive:  dataset.zip
  inflating: cleaned_papers.csv      


Setting the environment variables

In [None]:
import os

java_home = "/usr/lib/jvm/java-17-openjdk-amd64"
spark_home = "/content/spark-4.0.1-bin-hadoop3"

os.environ["JAVA_HOME"] = java_home
os.environ["JRE_HOME"] = f"{java_home}/jre"
os.environ["SPARK_HOME"] = spark_home

# Python does not expand "$VAR" inside strings, so the PATH entries are built explicitly
os.environ["PATH"] += f":{java_home}/bin:{java_home}/jre/bin:{spark_home}/bin:{spark_home}/sbin"
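Re-running a notebook cell that appends to `PATH` adds the same directories again on every run. The sketch below is one possible way to keep the update idempotent; `extend_path` is a hypothetical helper, not part of the original notebook, and the starting value `/usr/bin` is only an illustration.

```python
import os


def extend_path(current_path, directories):
    """Append directories to a PATH-style string, skipping ones already present."""
    parts = [p for p in current_path.split(os.pathsep) if p]
    for directory in directories:
        if directory not in parts:
            parts.append(directory)
    return os.pathsep.join(parts)


# Illustrative values only; in the notebook these would come from os.environ
spark_home = "/content/spark-4.0.1-bin-hadoop3"
new_path = extend_path(
    "/usr/bin",
    [os.path.join(spark_home, "bin"), os.path.join(spark_home, "sbin")],
)
print(new_path)
```

In the notebook, the result could be assigned back with `os.environ["PATH"] = extend_path(os.environ["PATH"], [...])`.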

Now we can start working with Spark

In [None]:
import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("popular_topics").getOrCreate()

In [None]:
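# Reading the CSV: fields are double-quoted, quotes inside a field are escaped by doubling, and a field may span several lines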

df = spark.read.csv("/content/cleaned_papers.csv", header=True, sep=',', quote='"', escape='"', multiLine=True)

df.show()

+--------------------+--------------------+
|               title|            abstract|
+--------------------+--------------------+
|Dynamic Backtracking|Because of their ...|
|A Market-Oriented...|Market price syst...|
|An Empirical Anal...|We describe an ex...|
|The Difficulties ...|As real logic pro...|
|Software Agents: ...|To support the go...|
|Decidable Reasoni...|Terminological kn...|
|Teleo-Reactive Pr...|A formalism is pr...|
|Learning the Past...|Learning the past...|
|Substructure Disc...|The ability to id...|
|Bias-Driven Revis...|The theory revisi...|
|Exploring the Dec...|We report on a se...|
|A Semantics and C...|This paper analyz...|
|Applying GSAT to ...|In this paper we ...|
|Random Worlds and...|Given a knowledge...|
|Pattern Matching ...|Information extra...|
|A System for Indu...|This article desc...|
|On Planning while...|This paper introd...|
|Wrap-Up: a Traina...|The vast amounts ...|
|Operations for Le...|This paper is a m...|
|Total-Order and P...|For many y

In [None]:
# Adding a column that unifies title and abstract into one

df = df.withColumn("entry", F.concat_ws(". ", "title", "abstract"))
df = df.withColumn("entry", F.lower("entry"))  # lowercase text is easier to work with
df = df.withColumn("entry", F.regexp_replace(F.col("entry"), r"\s*\n\s*", ' '))  # remove extra line breaks
df = df.drop("title", "abstract")  # they are no longer needed

df.show()

+--------------------+
|               entry|
+--------------------+
|dynamic backtrack...|
|a market-oriented...|
|an empirical anal...|
|the difficulties ...|
|software agents: ...|
|decidable reasoni...|
|teleo-reactive pr...|
|learning the past...|
|substructure disc...|
|bias-driven revis...|
|exploring the dec...|
|a semantics and c...|
|applying gsat to ...|
|random worlds and...|
|pattern matching ...|
|a system for indu...|
|on planning while...|
|wrap-up: a traina...|
|operations for le...|
|total-order and p...|
+--------------------+
only showing top 20 rows
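To see what the normalization step does to a single row, here is a minimal plain-Python sketch of the same three operations, using the standard `re` module instead of Spark. The function name `normalize_entry` and the sample strings are made up for illustration and are not taken from the dataset.

```python
import re


def normalize_entry(title, abstract):
    """Join title and abstract, lowercase the result, and flatten line breaks."""
    # Like concat_ws, join only the parts that are not null
    parts = [p for p in (title, abstract) if p is not None]
    entry = ". ".join(parts).lower()
    # Replace each line break, together with the whitespace around it, by one space
    return re.sub(r"\s*\n\s*", " ", entry)


print(normalize_entry("A Sample Title", "First line\n   second line"))
# a sample title. first line second line
```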


In [None]:
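# Now we use a regexp to split each row into word combinations:
# the separators are punctuation and other non-letter characters, stray quotes and hyphens, and stop words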

regexp = r"([^a-z^\s^'^-])|(?:^|[^a-z])['-]|['-](?:^|[^a-z])|'*(?<![a-z-])(?:a|an|the|and|or|as|of|in|on|yet|our|than|then|however|at|but|was|were|which|there|this|that|thus|we|to|for|is|are|where|have|has|been|since|with|such|another|also|by|often|can|could|so|from|its|via|will|hence|should|would|shall|what|although|these|those|do|does|did|under|above|else|if|while|when|who|based|way|very|many|much|due|because|onto|into|out|finally|their|they|may|might|up|down|either|neither|nor|within|according|others|about|therefore|no|not|towards|beyond|behind|over|how|both|without|other|another|more|most|moreover|be|furthermore|why|paper|focuses|well|must|consider|using|used|commonly|some|given|among|able|present|his|her|he|she|obtained|makes|give|make|further|use|introduce|employ|uses|show|allows|gives|introduces|considers|through|take|takes|enable|enables|allow|every|each|called|provide|provides|cannot|allowing|even|though|after|around|upon|you|new)(?![a-z-])'*"

df_entry = df.select(F.explode(F.split(F.col("entry"), regexp)).alias("entry"))
df_entry = df_entry.withColumn("entry", F.trim(F.col("entry")))  # trim extra whitespace
df_entry = df_entry.filter(F.size(F.split(F.col("entry"), r"\s+")) >= 2)  # treat combinations of 2 or more words as topics

df_entry.show()

+--------------------+
|               entry|
+--------------------+
|dynamic backtracking|
|     occasional need|
|      shallow points|
|         search tree|
|existing backtrac...|
|sometimes erase m...|
|      search problem|
|    backtrack points|
|        moved deeper|
|        search space|
|    thereby avoiding|
| technique developed|
|dependency-direct...|
|only polynomial s...|
|still providing u...|
|completeness guar...|
|  earlier approaches|
|market-oriented p...|
|distributed multi...|
|market price syst...|
+--------------------+
only showing top 20 rows
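The splitting idea can be tried on a single string without Spark. The sketch below is a simplified illustration, not the notebook's exact pattern: it assumes a deliberately shortened stop-word list, uses Python's `re` rather than Spark's Java regex engine, and omits the handling of stray quotes and hyphens. `STOP_WORDS`, `SEPARATOR`, and `candidate_phrases` are hypothetical names.

```python
import re

# Shortened, illustrative stop-word list (the notebook uses a much longer one)
STOP_WORDS = ["a", "an", "the", "of", "in", "we", "is", "for"]

SEPARATOR = re.compile(
    # Anything that is not a lowercase letter, whitespace, apostrophe, or hyphen
    r"[^a-z\s'-]"
    # A stop word that is not part of a longer or hyphenated word
    r"|(?<![a-z-])(?:" + "|".join(STOP_WORDS) + r")(?![a-z-])"
)


def candidate_phrases(entry):
    """Split a lowercase entry on separators and keep fragments of 2+ words."""
    fragments = (fragment.strip() for fragment in SEPARATOR.split(entry))
    return [f for f in fragments if len(f.split()) >= 2]


print(candidate_phrases("we present a search tree for the dynamic backtracking problem."))
# ['search tree', 'dynamic backtracking problem']
```

The lookbehind and lookahead keep stop words from matching inside other words, so the "a" in "search" or the "in" in "backtracking" does not split the phrase.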


In [None]:
# Map: pair every entry with the value 1

df_mapped = df_entry.withColumn("value", F.lit(1))

df_mapped.show()

+--------------------+-----+
|               entry|value|
+--------------------+-----+
|dynamic backtracking|    1|
|     occasional need|    1|
|      shallow points|    1|
|         search tree|    1|
|existing backtrac...|    1|
|sometimes erase m...|    1|
|      search problem|    1|
|    backtrack points|    1|
|        moved deeper|    1|
|        search space|    1|
|    thereby avoiding|    1|
| technique developed|    1|
|dependency-direct...|    1|
|only polynomial s...|    1|
|still providing u...|    1|
|completeness guar...|    1|
|  earlier approaches|    1|
|market-oriented p...|    1|
|distributed multi...|    1|
|market price syst...|    1|
+--------------------+-----+
only showing top 20 rows


In [None]:
# Reduce: sum the values per entry and sort by the total in descending order

df_result = df_mapped.groupBy("entry").agg(F.sum("value").alias("total")).orderBy("total", ascending=False)

df_result.show()

+--------------------+-----+
|               entry|total|
+--------------------+-----+
|experimental results|49397|
|          wide range|49310|
|     proposed method|43782|
|    machine learning|41577|
|large language mo...|40533|
|          first time|40390|
|      magnetic field|34954|
|       deep learning|33439|
|        recent years|33121|
|extensive experim...|29208|
|     results suggest|29061|
|        large number|28488|
|      standard model|27719|
|      important role|26117|
| confidence interval|25524|
|         dark matter|24879|
|     neural networks|24831|
|numerical simulat...|24760|
|    results indicate|22983|
|reinforcement lea...|22299|
+--------------------+-----+
only showing top 20 rows
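The map and reduce steps above amount to a word-count over phrases. As a rough single-machine illustration, the same logic can be sketched with `collections.Counter`; the function `count_phrases` and the sample phrases are assumptions made for this example.

```python
from collections import Counter


def count_phrases(phrases):
    """Pair each phrase with 1 (map), sum per phrase (reduce), sort by total."""
    pairs = ((phrase, 1) for phrase in phrases)  # map step
    totals = Counter()
    for phrase, value in pairs:  # reduce step
        totals[phrase] += value
    # most_common() returns (phrase, total) pairs in descending order of total
    return totals.most_common()


print(count_phrases(["search tree", "dark matter", "search tree"]))
# [('search tree', 2), ('dark matter', 1)]
```

In PySpark itself, `df_entry.groupBy("entry").count()` should produce the same totals without the intermediate `value` column, in a column named `count` rather than `total`.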
