<a href="https://colab.research.google.com/github/Tiwari666/BigData/blob/main/big_data_pyspark_RDD_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Main document: https://spark.apache.org/docs/latest/rdd-programming-guide.html

Main Data source: https://www.kaggle.com/Cornell-University/arxiv/version/62

In [1]:
# Install Kaggle CLI
!pip install kaggle

# Install PySpark
!pip install pyspark



**Google Colab** is a cloud-based environment that does not have direct access to our Kaggle account or files. To download datasets from Kaggle into Colab, we need the Kaggle CLI, which acts as a bridge between our Kaggle account and Colab. The Kaggle CLI is a tool that allows us to:

--Authenticate our Kaggle account using an API key (kaggle.json).

--Search and download datasets from Kaggle directly into Colab.

--Perform tasks like submitting to competitions or listing available datasets.

Without the Kaggle CLI, we would need to manually download the dataset to our local machine and upload it to Colab, which is time-consuming for large files.

**Simple Example**

We want to order pizza from our favorite restaurant. We can either:

--Call the restaurant directly and place our order (CLI approach, fast and efficient).

--Walk to the restaurant, place our order, and wait for it (manual download/upload, slow and tedious).

The Kaggle CLI is like calling the restaurant: it lets us access Kaggle directly from Colab and quickly download our data.



**Step 1: Generate Kaggle API Token**

To create the kaggle.json file:

--Visit the Kaggle Account Settings page.

--Scroll down to the API section.

--Click on "Create New API Token".

This will download a file named kaggle.json to our computer.

**Step 2: Upload kaggle.json to Google Colab**

In [1]:
from google.colab import files
files.upload()  # This will open a dialog to upload files

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"narendratiwari1","key":"<redacted>"}'}

**Step 3: Configure Kaggle in Google Colab**

Move the kaggle.json file to the .kaggle directory and set proper permissions:

In [2]:
# Create the .kaggle directory
!mkdir -p ~/.kaggle

# Move the kaggle.json file to the .kaggle directory
!cp kaggle.json ~/.kaggle/

# Set the appropriate permissions
!chmod 600 ~/.kaggle/kaggle.json
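The three shell commands above can also be sketched in plain Python, which may be handy outside a notebook. The helper name install_kaggle_token and the throwaway directory are assumptions made for this illustration, and the token written here is a dummy, not a real key.

```python
import os
import shutil
import stat
import tempfile
from pathlib import Path


def install_kaggle_token(token_path, home_dir):
    """Copy a Kaggle API token into <home_dir>/.kaggle with owner-only permissions."""
    target_dir = Path(home_dir) / ".kaggle"
    target_dir.mkdir(parents=True, exist_ok=True)  # same as: mkdir -p ~/.kaggle
    target = target_dir / "kaggle.json"
    shutil.copy(token_path, target)                # same as: cp kaggle.json ~/.kaggle/
    os.chmod(target, 0o600)                        # same as: chmod 600 ~/.kaggle/kaggle.json
    return target


# Demonstration in a temporary directory so nothing in the real home folder changes.
with tempfile.TemporaryDirectory() as tmp:
    dummy_token = Path(tmp) / "kaggle.json"
    dummy_token.write_text('{"username": "example", "key": "dummy"}')
    installed = install_kaggle_token(dummy_token, Path(tmp) / "home")
    mode = stat.S_IMODE(installed.stat().st_mode)
    print(f"Installed {installed.name} with mode {oct(mode)}")
```

Restricting the file to mode 600 means only the owner can read or write it, which is why the Kaggle CLI setup asks for this permission.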

**Step 4: Verify Kaggle CLI**

Test the Kaggle CLI to ensure it's set up correctly:

In [3]:
!kaggle datasets list
#This will display a list of available datasets on Kaggle.

ref                                                         title                                             size  lastUpdated          downloadCount  voteCount  usabilityRating  
----------------------------------------------------------  -----------------------------------------------  -----  -------------------  -------------  ---------  ---------------  
bhadramohit/customer-shopping-latest-trends-dataset         Customer Shopping (Latest Trends) Dataset         76KB  2024-11-23 15:26:12          19153        377  1.0              
hopesb/student-depression-dataset                           Student Depression Dataset.                      454KB  2024-11-22 17:56:03          15676        222  1.0              
oktayrdeki/houses-in-london                                 Houses in London                                  21KB  2024-12-15 19:27:42           1416         27  1.0              
mhassansaboor/intel-stock-data-1980-2024                    Intel Stock Data (1980-2024)       

**Step 5: Download the Desired Dataset**

Run the following command to download the arXiv dataset:

In [4]:
!kaggle datasets download -d Cornell-University/arxiv

Dataset URL: https://www.kaggle.com/datasets/Cornell-University/arxiv
License(s): CC0-1.0
Downloading arxiv.zip to /content
 99% 1.37G/1.38G [00:10<00:00, 162MB/s]
100% 1.38G/1.38G [00:10<00:00, 137MB/s]


**Unzip the Dataset**

Extract the contents of the downloaded zip file:

In [5]:
!unzip arxiv.zip

Archive:  arxiv.zip
  inflating: arxiv-metadata-oai-snapshot.json  


**Set Up Spark**

Before the unzipped dataset can be loaded into a Spark RDD, import the required libraries and configure Java.

In [6]:
# Import required libraries
from pyspark.sql import SparkSession
import os

In [7]:
# Set JAVA_HOME if not set
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

**Install and Configure Java**

Install Java 8 in Google Colab and point JAVA_HOME and PATH at it.

In [10]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] += ":/usr/lib/jvm/java-8-openjdk-amd64/bin"

**Install PySpark**

Install PySpark in Colab. (It was already installed in the first cell, so re-running this command is harmless.)

In [11]:
!pip install pyspark



**Configure SparkSession**

In [12]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Arxiv Dataset Analysis") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

print("SparkSession started successfully!")

SparkSession started successfully!


**Load the Data**

In [13]:
# Load JSON file as RDD
rdd = spark.read.json("/content/arxiv-metadata-oai-snapshot.json").rdd
print(f"RDD successfully created with {rdd.count()} rows.")

RDD successfully created with 2635119 rows.


**Transformation Operations**

Transformations are operations on RDDs that return another RDD. They are lazy, meaning they are not executed until an action is called.

**Filter Operation**

Filter rows to find those that belong to a specific category (e.g., "physics"):

In [14]:
physics_rdd = rdd.filter(lambda row: "physics" in row["categories"])
print(f"Filtered RDD with rows related to physics created.")

Filtered RDD with rows related to physics created.
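The filter above uses a substring test on the space-separated categories string. A small plain-Python sketch of what that test matches is shown here; the helper names are hypothetical, and the combined string "hep-ph physics.atom-ph" is made up for illustration from category names that appear in this notebook.

```python
def is_physics_substring(categories):
    """The test used in the filter above: substring match on the raw string."""
    return "physics" in categories


def is_physics_archive(categories):
    """A stricter variant: at least one category is in the 'physics' archive."""
    return any(c == "physics" or c.startswith("physics.") for c in categories.split())


examples = [
    "physics.gen-ph",
    "astro-ph",
    "math.CO cs.CG",
    "hep-ph physics.atom-ph",  # hypothetical combination
]

for cats in examples:
    print(f"{cats!r}: substring={is_physics_substring(cats)}, archive={is_physics_archive(cats)}")
```

On these examples both tests agree. Neither one matches archives such as astro-ph or hep-ph, so "physics" here means the physics.* categories only.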


**Map Operation**

Extract specific fields (e.g., "title") from the dataset:




In [15]:
titles_rdd = rdd.map(lambda row: row["title"])
print(f"Mapped RDD created with only titles.")

Mapped RDD created with only titles.


**FlatMap Operation**

Split categories and flatten them into individual entries:

In [16]:
categories_rdd = rdd.flatMap(lambda row: row["categories"].split())
print(f"FlatMapped RDD created for categories.")

FlatMapped RDD created for categories.
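The difference between map and flatMap can be sketched with ordinary Python lists. This is only an analogy for the RDD operations, using two category strings taken from the dataset preview.

```python
from itertools import chain

rows = [{"categories": "math.CO cs.CG"}, {"categories": "hep-ph"}]

# map: one output element per input row (here, a list per row)
mapped = [row["categories"].split() for row in rows]

# flatMap: the per-row lists are flattened into a single sequence
flat_mapped = list(chain.from_iterable(row["categories"].split() for row in rows))

print(mapped)       # [['math.CO', 'cs.CG'], ['hep-ph']]
print(flat_mapped)  # ['math.CO', 'cs.CG', 'hep-ph']
```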


**Distinct Operation**

Get unique categories:

In [17]:
unique_categories_rdd = categories_rdd.distinct()
print(f"Distinct RDD created with unique categories.")

Distinct RDD created with unique categories.


**Sample Operation**

Sample 1% of the data for quick computation:

In [18]:
sampled_rdd = rdd.sample(False, 0.01, seed=42)
print(f"Sampled RDD with 1% of the data created.")

Sampled RDD with 1% of the data created.
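The fraction passed to sample is an expected proportion, not an exact size. The plain-Python sketch below assumes each row is kept independently with probability equal to the fraction, which is how sampling without replacement is commonly described. The function name bernoulli_sample is hypothetical, and Python's random number generator is not the one Spark uses.

```python
import random


def bernoulli_sample(items, fraction, seed):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]


population = range(100_000)
sample = bernoulli_sample(population, 0.01, seed=42)

# The size is close to, but usually not exactly, 1% of the population.
print(f"Expected about {int(len(population) * 0.01)}, got {len(sample)}")
```

A fixed seed makes the result reproducible, which is why seed=42 is passed in the cell above.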


**Action Operations**

Actions are operations that trigger computation. They return results to the driver or save results to storage.

**Count Operation**

Count the total number of rows in the RDD:

In [19]:
total_rows = rdd.count()
print(f"Total rows in the RDD: {total_rows}")

Total rows in the RDD: 2635119


**Take Operation**

Get the first 5 rows of the RDD:




In [20]:
sample_rows = rdd.take(5)
print(f"Sample rows: {sample_rows}")

Sample rows: [Row(abstract='  A fully differential calculation in perturbative quantum chromodynamics is\npresented for the production of massive photon pairs at hadron colliders. All\nnext-to-leading order perturbative contributions from quark-antiquark,\ngluon-(anti)quark, and gluon-gluon subprocesses are included, as well as\nall-orders resummation of initial-state gluon radiation valid at\nnext-to-next-to-leading logarithmic accuracy. The region of phase space is\nspecified in which the calculation is most reliable. Good agreement is\ndemonstrated with data from the Fermilab Tevatron, and predictions are made for\nmore detailed tests with CDF and DO data. Predictions are shown for\ndistributions of diphoton pairs produced at the energy of the Large Hadron\nCollider (LHC). Distributions of the diphoton pairs from the decay of a Higgs\nboson are contrasted with those produced from QCD processes at the LHC, showing\nthat enhanced sensitivity to the signal can be obtained with judiciou

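**Collect Operation**

Take the first 10 unique categories from a 1% sample. take(10) is used instead of collect() so that only a handful of values are returned to the driver: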

In [21]:
# Sample 1% of the RDD for safe computation
sampled_rdd = rdd.sample(False, 0.01, seed=42)

# Extract and get distinct categories from the sampled RDD
sampled_categories_rdd = sampled_rdd.flatMap(lambda row: row["categories"].split()).distinct()

# Collect and print the first 10 unique categories from the sampled data
unique_categories = sampled_categories_rdd.take(10)
print(f"Unique categories (from sample): {unique_categories}")

Unique categories (from sample): ['q-bio.QM', 'physics.atom-ph', 'physics.geo-ph', 'math.GM', 'cs.CL', 'q-fin.RM', 'astro-ph', 'math.QA', 'math.CA', 'math.MG']


**Count the Sampled RDD**

Count the number of rows in the sampled RDD:

In [22]:
# Count the total rows in the sampled RDD
sampled_count = sampled_rdd.count()
print(f"Total rows in the sampled RDD: {sampled_count}")

Total rows in the sampled RDD: 26128


**Count the Occurrences of Each Category**

This counts how many papers in the sampled data carry each category.

In [None]:
# Start from sampled_rdd: sampled_categories_rdd is already distinct, so every count would be 1.
# Split each paper's categories, map each to (category, 1), and sum per key.
category_counts_rdd = (
    sampled_rdd.flatMap(lambda row: row["categories"].split())
    .map(lambda category: (category, 1))
    .reduceByKey(lambda a, b: a + b)
)

# Fetch 10 (category, count) pairs for display
category_counts = category_counts_rdd.take(10)
print(f"Category counts (sampled): {category_counts}")
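The (category, 1) plus reduceByKey pattern can be sketched in plain Python with a Counter. The four category strings are illustrative. The sketch also shows why the counting has to happen before any de-duplication: a list that is already distinct gives a count of 1 for every key.

```python
from collections import Counter

papers = ["math.CO cs.CG", "math.CO", "hep-ph", "math.CO cs.CG"]

# flatMap + map: one (category, 1) pair per category occurrence
pairs = [(cat, 1) for cats in papers for cat in cats.split()]

# reduceByKey: sum the ones for each key
counts = Counter()
for cat, one in pairs:
    counts[cat] += one
print(dict(counts))

# Counting after distinct() loses the information: every key appears once.
counts_after_distinct = Counter({cat for cat, _ in pairs})
print(dict(counts_after_distinct))
```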

**Filter Papers Related to Specific Categories**

For example, filter papers with the "physics" category.

In [23]:
# Filter papers containing the "physics" category in their categories field
physics_papers_rdd = sampled_rdd.filter(lambda row: "physics" in row["categories"])

# Count and display a few sample rows of physics-related papers
physics_papers_count = physics_papers_rdd.count()
sample_physics_papers = physics_papers_rdd.take(5)
print(f"Total physics-related papers in the sample: {physics_papers_count}")
print(f"Sample physics-related papers: {sample_physics_papers}")

Total physics-related papers in the sample: 2650
Sample physics-related papers: [Row(abstract='  After the pioneered experimental works on superlubricity by Martin et al. on\nMoS2 [1], Hirano et al. on tungsten and silicon [2] and the further\nconfirmation by Dienwiebel et al. on graphite [3], many groups around the word\ninvestigated the occurrence of near frictionless sliding contacts. This large\nmobilization of tribologists, material sciences specialists and physicists has\nlead to emerging solutions involving new materials and coatings, the most\npromising being carbon based like graphite, diamond, carbon composites or\ndiamond-like-carbons. Some of them are currently used in practical\napplications. The situation is different especially in EHL: the highest\nfriction coefficients are close to 10% when traction fluids are involved, i.e.\nfluids that have especially designed to transmit the highest friction, and they\nvary within 3-6% for the rest of lubricants. The range of variati

**Group Papers by Year**

This groups papers by the year of their update_date field. That field holds the date of the latest update, which can be later than the original submission year.

In [24]:
# Extract year from the "update_date" field and group papers by year
year_papers_rdd = sampled_rdd.map(lambda row: (row["update_date"][:4], 1)).reduceByKey(lambda a, b: a + b)

# Collect and display the counts for a few years
year_paper_counts = year_papers_rdd.take(10)  # Fetch first 10 years for display
print(f"Papers grouped by year (sampled): {year_paper_counts}")

Papers grouped by year (sampled): [('2011', 607), ('2020', 1769), ('2007', 1436), ('2016', 1345), ('2017', 1256), ('2008', 657), ('2021', 1804), ('2015', 2050), ('2014', 845), ('2024', 3173)]
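The pairs come back in no particular order. Once they are on the driver, they can be sorted with ordinary Python, as in this small sketch that reuses the ten pairs printed above.

```python
year_paper_counts = [
    ("2011", 607), ("2020", 1769), ("2007", 1436), ("2016", 1345), ("2017", 1256),
    ("2008", 657), ("2021", 1804), ("2015", 2050), ("2014", 845), ("2024", 3173),
]

# Tuples sort by their first element, so this orders the pairs by year.
by_year = sorted(year_paper_counts)

# The pair with the largest count among the ten shown.
busiest = max(year_paper_counts, key=lambda pair: pair[1])

print(by_year)
print(busiest)
```

These are only the ten pairs returned by take(10), not every year in the sample.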


**Persist Intermediate Results**

Cache the sampled RDD to avoid recomputation during repeated operations.

Caching or persisting an RDD keeps its computed partitions in memory, so Spark does not repeat the same calculations every time we run an action on it. cache() is itself lazy: the data is stored the first time an action runs after the call.

**Why do it?**

--Faster Analysis: Avoids recomputing data for repeated actions.

--Lower Workload: Avoids re-reading and re-sampling the full dataset for every action, which matters for large datasets.

--Efficient Workflow: Saves time when performing multiple operations on the same data.

**Purpose:**

It speeds up our workflow when the same RDD is reused several times.




In [25]:
# Cache the sampled RDD
sampled_rdd.cache()
print("Sampled RDD cached for reuse.")

Sampled RDD cached for reuse.
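The effect of caching on repeated actions can be illustrated with a toy class. LazyDataset is a hypothetical stand-in written for this sketch; it models only the idea of lazy evaluation with optional caching and is not how Spark is implemented.

```python
class LazyDataset:
    """Toy stand-in for an RDD: remembers how to compute itself, computes on demand."""

    def __init__(self, compute):
        self._compute = compute
        self._cached = None
        self._use_cache = False
        self.compute_calls = 0  # how many times the data was actually computed

    def cache(self):
        self._use_cache = True  # only marks the dataset; nothing is computed yet
        return self

    def _materialize(self):
        if self._use_cache and self._cached is not None:
            return self._cached
        self.compute_calls += 1
        data = self._compute()
        if self._use_cache:
            self._cached = data
        return data

    def count(self):
        return len(self._materialize())

    def take(self, n):
        return self._materialize()[:n]


def expensive():
    return [x for x in range(1000) if x % 100 == 0]


uncached = LazyDataset(expensive)
uncached.count()
uncached.take(3)

cached = LazyDataset(expensive).cache()
calls_right_after_cache = cached.compute_calls
cached.count()
cached.take(3)

print(f"Without cache: computed {uncached.compute_calls} times")
print(f"With cache: computed {cached.compute_calls} time")
```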


**Check Number of Partitions**

In [26]:
num_partitions = rdd.getNumPartitions()
print(f"Number of partitions in the RDD: {num_partitions}")

Number of partitions in the RDD: 34


**ML Preparation**

In [27]:
df = rdd.toDF()
print(f"Converted RDD to DataFrame for ML analysis.")

Converted RDD to DataFrame for ML analysis.


**Check the Schema**

Inspect the structure of the DataFrame to understand its columns and data types.

In [28]:
# Display the schema of the DataFrame
df.printSchema()

root
 |-- abstract: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- authors_parsed: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- categories: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- id: string (nullable = true)
 |-- journal-ref: string (nullable = true)
 |-- license: string (nullable = true)
 |-- report-no: string (nullable = true)
 |-- submitter: string (nullable = true)
 |-- title: string (nullable = true)
 |-- update_date: string (nullable = true)
 |-- versions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |    |-- version: string (nullable = true)



**Explore the Data**

View a sample of the data to ensure it looks as expected.

In [29]:
# Show the first few rows of the DataFrame
df.show(5, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-

In [30]:
# List all column names
print(df.columns)

['abstract', 'authors', 'authors_parsed', 'categories', 'comments', 'doi', 'id', 'journal-ref', 'license', 'report-no', 'submitter', 'title', 'update_date', 'versions']


In [31]:
# Show the first 10 rows of the DataFrame
df.show(10, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [32]:
# Display the first 10 rows of the DataFrame with all columns
df.limit(10).toPandas()

Unnamed: 0,abstract,authors,authors_parsed,categories,comments,doi,id,journal-ref,license,report-no,submitter,title,update_date,versions
0,A fully differential calculation in perturba...,"C. Bal\'azs, E. L. Berger, P. M. Nadolsky, C.-...","[[Balázs, C., ], [Berger, E. L., ], [Nadolsky,...",hep-ph,"37 pages, 15 figures; published version",10.1103/PhysRevD.76.013009,704.0001,"Phys.Rev.D76:013009,2007",,ANL-HEP-PR-07-12,Pavel Nadolsky,Calculation of prompt diphoton production cros...,2008-11-26,"[(Mon, 2 Apr 2007 19:18:42 GMT, v1), (Tue, 24 ..."
1,"We describe a new algorithm, the $(k,\ell)$-...",Ileana Streinu and Louis Theran,"[[Streinu, Ileana, ], [Theran, Louis, ]]",math.CO cs.CG,To appear in Graphs and Combinatorics,,704.0002,,http://arxiv.org/licenses/nonexclusive-distrib...,,Louis Theran,Sparsity-certifying Graph Decompositions,2008-12-13,"[(Sat, 31 Mar 2007 02:26:18 GMT, v1), (Sat, 13..."
2,The evolution of Earth-Moon system is descri...,Hongjun Pan,"[[Pan, Hongjun, ]]",physics.gen-ph,"23 pages, 3 figures",,704.0003,,,,Hongjun Pan,The evolution of the Earth-Moon system based o...,2008-01-13,"[(Sun, 1 Apr 2007 20:46:54 GMT, v1), (Sat, 8 D..."
3,We show that a determinant of Stirling cycle...,David Callan,"[[Callan, David, ]]",math.CO,11 pages,,704.0004,,,,David Callan,A determinant of Stirling cycle numbers counts...,2007-05-23,"[(Sat, 31 Mar 2007 03:16:14 GMT, v1)]"
4,In this paper we show how to compute the $\L...,Wael Abu-Shammala and Alberto Torchinsky,"[[Abu-Shammala, Wael, ], [Torchinsky, Alberto, ]]",math.CA math.FA,,,704.0005,"Illinois J. Math. 52 (2008) no.2, 681-689",,,Alberto Torchinsky,From dyadic $\Lambda_{\alpha}$ to $\Lambda_{\a...,2013-10-15,"[(Mon, 2 Apr 2007 18:09:58 GMT, v1)]"
5,We study the two-particle wave function of p...,Y. H. Pong and C. K. Law,"[[Pong, Y. H., ], [Law, C. K., ]]",cond-mat.mes-hall,"6 pages, 4 figures, accepted by PRA",10.1103/PhysRevA.75.043613,704.0006,,,,Yue Hin Pong,Bosonic characters of atomic Cooper pairs acro...,2015-05-13,"[(Sat, 31 Mar 2007 04:24:59 GMT, v1)]"
6,A rather non-standard quantum representation...,"Alejandro Corichi, Tatjana Vukasinac and Jose ...","[[Corichi, Alejandro, ], [Vukasinac, Tatjana, ...",gr-qc,"16 pages, no figures. Typos corrected to match...",10.1103/PhysRevD.76.044016,704.0007,"Phys.Rev.D76:044016,2007",,IGPG-07/03-2,Alejandro Corichi,Polymer Quantum Mechanics and its Continuum Limit,2008-11-26,"[(Sat, 31 Mar 2007 04:27:22 GMT, v1), (Wed, 22..."
7,A general formulation was developed to repre...,Damian C. Swift,"[[Swift, Damian C., ]]",cond-mat.mtrl-sci,Minor corrections,10.1063/1.2975338,704.0008,"Journal of Applied Physics, vol 104, 073536 (2...",http://arxiv.org/licenses/nonexclusive-distrib...,"LA-UR-07-2051, LLNL-JRNL-410358",Damian Swift,Numerical solution of shock and ramp compressi...,2009-02-05,"[(Sat, 31 Mar 2007 04:47:20 GMT, v1), (Thu, 10..."
8,We discuss the results from the combined IRA...,"Paul Harvey, Bruno Merin, Tracy L. Huard, Luis...","[[Harvey, Paul, ], [Merin, Bruno, ], [Huard, T...",astro-ph,,10.1086/518646,704.0009,"Astrophys.J.663:1149-1173,2007",,,Paul Harvey,"The Spitzer c2d Survey of Large, Nearby, Inste...",2010-03-18,"[(Mon, 2 Apr 2007 19:41:34 GMT, v1)]"
9,Partial cubes are isometric subgraphs of hyp...,Sergei Ovchinnikov,"[[Ovchinnikov, Sergei, ]]",math.CO,"36 pages, 17 figures",,704.001,,,,Sergei Ovchinnikov,"Partial cubes: structures, characterizations, ...",2007-05-23,"[(Sat, 31 Mar 2007 05:10:16 GMT, v1)]"


**Check for Duplicate Rows**

In [33]:
# Count total rows and distinct rows
total_rows = df.count()
distinct_rows = df.distinct().count()

print(f"Total Rows: {total_rows}")
print(f"Distinct Rows: {distinct_rows}")

# Check for duplicates
duplicate_rows = total_rows - distinct_rows
print(f"Number of duplicate rows: {duplicate_rows}")

Total Rows: 2635119
Distinct Rows: 2635109
Number of duplicate rows: 10


**Check for Missing Values**

Check for missing values column by column:

In [34]:
# Count missing values for each column
# Alias Spark's sum so it does not shadow Python's built-in sum()
from pyspark.sql.functions import col, sum as spark_sum

missing_values = df.select(
    [spark_sum(col(column).isNull().cast("int")).alias(column) for column in df.columns]
)

missing_values.show()  # Show the count of missing values for each column

+--------+-------+--------------+----------+--------+-------+---+-----------+-------+---------+---------+-----+-----------+--------+
|abstract|authors|authors_parsed|categories|comments|    doi| id|journal-ref|license|report-no|submitter|title|update_date|versions|
+--------+-------+--------------+----------+--------+-------+---+-----------+-------+---------+---------+-----+-----------+--------+
|       0|      0|             0|         0|  672050|1419097|  0|    1761899| 452795|  2451733|    15189|    0|          0|       0|
+--------+-------+--------------+----------+--------+-------+---+-----------+-------+---------+---------+-----+-----------+--------+



**Drop Duplicates and Rows with Missing Values**

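Use PySpark to clean the data. Note that dropna() with no arguments removes every row that has a null in any column, so sparsely filled columns such as report-no and journal-ref eliminate most of the dataset: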

In [35]:
# Remove duplicate rows
df = df.dropDuplicates()

# Remove rows with any missing values
df = df.dropna()

# Check the new row count
print(f"DataFrame after cleaning: {df.count()} rows")

DataFrame after cleaning: 41198 rows
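A quick calculation with the numbers printed above shows how much data this cleaning step discards. All figures are copied from the outputs in this notebook; the variable names are only for this sketch.

```python
total_rows = 2_635_119
rows_after_cleaning = 41_198

# Null counts per column, from the missing-values table above (columns with 0 omitted)
missing = {
    "comments": 672_050,
    "doi": 1_419_097,
    "journal-ref": 1_761_899,
    "license": 452_795,
    "report-no": 2_451_733,
    "submitter": 15_189,
}

# The column with the most nulls alone caps how many rows can survive dropna().
worst_column = max(missing, key=missing.get)
upper_bound = total_rows - missing[worst_column]
retained_pct = 100 * rows_after_cleaning / total_rows

print(f"Column with most nulls: {worst_column}")
print(f"At most {upper_bound} rows could survive; {rows_after_cleaning} actually did")
print(f"Retained: {retained_pct:.2f}% of the original rows")
```

If only some columns matter for the analysis, the subset parameter of dropna can restrict the null check to those columns, which would keep far more rows.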


**Data Exploration:**

Before jumping into ML tasks, explore the data to understand its structure and distribution.

**Show the Schema:**

Check the data types of the columns.

In [37]:
df.printSchema()
df.show(5)

root
 |-- abstract: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- authors_parsed: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- categories: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- id: string (nullable = true)
 |-- journal-ref: string (nullable = true)
 |-- license: string (nullable = true)
 |-- report-no: string (nullable = true)
 |-- submitter: string (nullable = true)
 |-- title: string (nullable = true)
 |-- update_date: string (nullable = true)
 |-- versions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |    |-- version: string (nullable = true)

+--------------------+--------------------+--------------------+----------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+

**Summary Statistics:**

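Get a statistical summary of the columns. The columns summarized here are all strings, so count is meaningful, while mean is NULL for free-text columns such as abstract.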

In [38]:
df.describe().show()

+-------+--------------------+--------------------+---------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+-----------+
|summary|            abstract|             authors|     categories|            comments|                 doi|                id|         journal-ref|             license|           report-no|       submitter|               title|update_date|
+-------+--------------------+--------------------+---------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+-----------+
|  count|               41198|               41198|          41198|               41198|               41198|             41198|               41198|               41198|               41198|           41198|               41198|      41198|
|   mean|                NULL|  

**Count Unique Categories:**

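Explore the categories column to see how many unique values exist. Each value is a space-separated combination of categories, so this counts distinct combinations rather than individual categories.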

In [39]:
categories_count = df.select("categories").distinct().count()
print(f"Number of unique categories: {categories_count}")

Number of unique categories: 3528


**Top Categories:**

Show the most frequent categories.

In [40]:
from pyspark.sql.functions import col, count
df.groupBy("categories").agg(count("*").alias("count")).orderBy(col("count").desc()).show(10)

+---------------+-----+
|     categories|count|
+---------------+-----+
|         hep-ph| 5008|
|         hep-ex| 3961|
|         hep-th| 3451|
|  hep-ph hep-ex| 1981|
|math.ST stat.TH| 1107|
|   hep-th gr-qc|  931|
|        math.PR|  922|
|  hep-ph hep-th|  619|
|        stat.AP|  596|
|  hep-th hep-ph|  589|
+---------------+-----+
only showing top 10 rows
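The ten rows above are category combinations, not single categories. A short plain-Python sketch, using exactly those ten strings, shows how many individual categories they contain and that the same pair can appear in two different orders.

```python
top_combinations = [
    "hep-ph", "hep-ex", "hep-th", "hep-ph hep-ex", "math.ST stat.TH",
    "hep-th gr-qc", "math.PR", "hep-ph hep-th", "stat.AP", "hep-th hep-ph",
]

# Split every combination and collect the individual category names.
individual = sorted({cat for combo in top_combinations for cat in combo.split()})
print(f"{len(top_combinations)} combinations, {len(individual)} individual categories")
print(individual)

# "hep-ph hep-th" and "hep-th hep-ph" are different strings but the same set.
same_set = frozenset("hep-ph hep-th".split()) == frozenset("hep-th hep-ph".split())
print(same_set)
```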



**Explanation of the code:**

--from pyspark.sql.functions import col, count

--This line imports specific functions from the pyspark.sql.functions module. col is used to access a DataFrame's column, and count is used to count rows.

--df.groupBy("categories").agg(count("*").alias("count")).orderBy(col("count").desc()).show(10)


--df.groupBy("categories"): This groups the DataFrame df by the values in the "categories" column.

--.agg(count("*").alias("count")): This aggregates the grouped DataFrame by counting the rows in each group and naming this count column as "count".

--.orderBy(col("count").desc()): This orders the aggregated results in descending order based on the "count" column.

--.show(10): This displays the first 10 rows of the final result.


**Understanding the Dot (.)**

In Python, the dot (.) operator accesses a method or attribute of an object. Because each DataFrame method returns a new object, calls can be "chained" together to perform complex operations step by step. For instance:

--df.groupBy("categories") calls the groupBy method on the DataFrame df.

--.agg(count("*").alias("count")) then calls the agg method on the result of the previous groupBy method.

--And so on...

**When to Use the Dot**

Use the dot (.) whenever you need to call a method or access a property of an object. It's essential in object-oriented programming to invoke functions on instances of classes or to retrieve attributes of objects.
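Method chaining is not specific to Spark. The same idea can be shown with ordinary Python strings, where each method returns a new object that the next method is called on. The title is taken from the dataset preview, with spaces added around it for illustration.

```python
title = "  Sparsity-certifying Graph Decompositions  "

# Chained: each call operates on the result of the previous one.
words = title.strip().lower().split()

# The same steps written out one at a time.
stripped = title.strip()
lowered = stripped.lower()
words_stepwise = lowered.split()

print(words)
print(words == words_stepwise)
```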


**What's NEXT?**

In big datasets like this, if we try to process every column, it could consume a lot of memory and crash our system.

**Key Points to Consider:**

**Select Relevant Columns:**

For ML, we don't need to use all columns. Focus on columns that contain valuable information for our analysis.

--Example: title, abstract, and categories are often the most useful:

--title/abstract: Used to extract text-based features for predictive modeling.

--categories: Often serves as the target variable for classification tasks.

**Avoid Wasting Resources:**

--Columns like id, authors, license, or submitter may not contribute much to our analysis and can be ignored.

**Efficient Feature Selection:**

Instead of transforming all string columns, focus on one or two text-heavy columns (like abstract) to generate features.

This saves memory and avoids crashing.


**Step-by-Step Instructions for ML Analysis:**

**Select Only Useful Columns**

Filter the DataFrame to keep only abstract, title, and categories:

In [41]:
df = df.select("abstract", "title", "categories")
print(f"Columns selected: {df.columns}")

Columns selected: ['abstract', 'title', 'categories']


**Preprocess Text Columns**

Transform abstract or title into numeric features for ML analysis.

Tokenize Text: Split the text into words.

In [42]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="abstract", outputCol="tokens")
df = tokenizer.transform(df)
df.select("abstract", "tokens").show(5, truncate=50)

+--------------------------------------------------+--------------------------------------------------+
|                                          abstract|                                            tokens|
+--------------------------------------------------+--------------------------------------------------+
|  We present a universal Dirac operator for non...|[, , we, present, a, universal, dirac, operator...|
|  We solve the largest sample eigenvalue distri...|[, , we, solve, the, largest, sample, eigenvalu...|
|  A model has been proposed in which neutral sc...|[, , a, model, has, been, proposed, in, which, ...|
|  This paper is concerned with the Einstein equ...|[, , this, paper, is, concerned, with, the, ein...|
|  We discuss a new kind of astrophysical transp...|[, , we, discuss, a, new, kind, of, astrophysic...|
+--------------------------------------------------+--------------------------------------------------+
only showing top 5 rows
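The tokens column above starts with two empty strings for every abstract. That pattern is consistent with lowercasing the text and splitting on single whitespace characters, since each abstract begins with two spaces. The plain-Python sketch below mimics that behavior as an assumption about what is happening; it does not call Spark.

```python
import re

abstract = "  We present a universal Dirac operator"

# Splitting on every single whitespace character keeps empty strings
# for the two leading spaces.
single_ws_tokens = re.split(r"\s", abstract.lower())
print(single_ws_tokens)

# Splitting on runs of whitespace (str.split with no argument) drops them.
clean_tokens = abstract.lower().split()
print(clean_tokens)
```

Empty tokens all hash to the same feature later on and carry no information, so it may be worth trimming the abstracts or filtering empty tokens before feature extraction. PySpark's RegexTokenizer could be an alternative to look into.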



**Convert Words into Numeric Features:**

Use HashingTF (optionally followed by IDF for TF-IDF weighting) to convert words into numeric vectors.

In [43]:
from pyspark.ml.feature import HashingTF

hashingTF = HashingTF(inputCol="tokens", outputCol="features", numFeatures=1000)
df = hashingTF.transform(df)
df.select("abstract", "features").show(5, truncate=50)

+--------------------------------------------------+--------------------------------------------------+
|                                          abstract|                                          features|
+--------------------------------------------------+--------------------------------------------------+
|  We present a universal Dirac operator for non...|(1000,[17,30,39,93,121,130,143,165,173,195,199,...|
|  We solve the largest sample eigenvalue distri...|(1000,[17,40,46,99,102,162,179,209,214,249,280,...|
|  A model has been proposed in which neutral sc...|(1000,[17,29,58,66,69,78,80,100,102,106,133,157...|
|  This paper is concerned with the Einstein equ...|(1000,[7,17,19,47,48,78,115,116,133,135,137,157...|
|  We discuss a new kind of astrophysical transp...|(1000,[4,17,44,54,63,66,76,80,85,95,112,117,130...|
+--------------------------------------------------+--------------------------------------------------+
only showing top 5 rows



**Code Breakdown**

--from pyspark.ml.feature import HashingTF

--This line imports the HashingTF class from the pyspark.ml.feature module. HashingTF is a feature transformer that converts a collection of tokens into a fixed-length feature vector using the hashing trick.

--hashingTF = HashingTF(inputCol="tokens", outputCol="features", numFeatures=1000)

--This creates an instance of HashingTF with the following parameters:

--inputCol="tokens": The column name for the input tokens.

--outputCol="features": The column name for the output feature vector.

--numFeatures=1000: The number of features (dimensionality of the feature vector).

---df = hashingTF.transform(df)

--This transforms the DataFrame df by applying the HashingTF transformer to it. The transformation converts the "tokens" column into a "features" column containing the hashed feature vectors.

---df.select("abstract", "features").show(5, truncate=50)

--This selects the "abstract" and "features" columns from the DataFrame and displays the first 5 rows. The truncate=50 parameter truncates long strings in the output to 50 characters for easier readability.
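The hashing trick itself can be sketched in a few lines of plain Python. This is an illustration under stated assumptions: the function hashing_tf is hypothetical, and zlib.crc32 stands in for the hash function, so the indices will not match the ones Spark produced above.

```python
import zlib
from collections import Counter


def hashing_tf(tokens, num_features):
    """Map each token to a bucket index and count occurrences per bucket."""
    counts = Counter()
    for token in tokens:
        index = zlib.crc32(token.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(sorted(counts.items()))


tokens = ["we", "present", "a", "universal", "dirac", "operator", "a"]
vector = hashing_tf(tokens, num_features=1000)

# A sparse representation: only buckets with at least one token are stored.
print(vector)
```

Because the number of buckets is fixed, two different words can land in the same bucket. A larger numFeatures lowers the chance of such collisions at the cost of longer vectors.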



**Encode Target Column (categories)**

Convert categories into numeric labels for classification.

In [44]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="categories", outputCol="label")
df = indexer.fit(df).transform(df)
df.select("categories", "label").show(5)

+----------+-----+
|categories|label|
+----------+-----+
|    hep-th|  2.0|
|   math.PR|  6.0|
|    hep-ph|  0.0|
|     gr-qc| 11.0|
|  astro-ph| 47.0|
+----------+-----+
only showing top 5 rows



**Code Breakdown**

---from pyspark.ml.feature import StringIndexer

--This line imports the StringIndexer class from the pyspark.ml.feature module. StringIndexer is a feature transformer that encodes a string column of labels to a column of label indices.

---indexer = StringIndexer(inputCol="categories", outputCol="label")


--This creates an instance of StringIndexer with the following parameters:

--inputCol="categories": The column name for the input string values.

--outputCol="label": The column name for the output indexed values.

---df = indexer.fit(df).transform(df)

--This does two things:

--indexer.fit(df): Fits the StringIndexer model on the DataFrame df. During this fitting process, StringIndexer learns the mapping from string labels to integer indices.

--.transform(df): Transforms the DataFrame df using the fitted StringIndexer model, adding a new column "label" with the indexed values.

---df.select("categories", "label").show(5)


--This selects the "categories" and "label" columns from the DataFrame and displays the first 5 rows.
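The labels in the output are consistent with ordering the category strings by descending frequency, so the most frequent value gets 0.0. The plain-Python sketch below reproduces that ordering from the top-10 counts shown earlier in this notebook. The alphabetical tie-break is an assumption and does not come into play here because no two counts are equal.

```python
# Counts copied from the "Top Categories" output
category_counts = {
    "hep-ph": 5008, "hep-ex": 3961, "hep-th": 3451, "hep-ph hep-ex": 1981,
    "math.ST stat.TH": 1107, "hep-th gr-qc": 931, "math.PR": 922,
    "hep-ph hep-th": 619, "stat.AP": 596, "hep-th hep-ph": 589,
}

# Most frequent first; ties (none here) broken alphabetically.
ordered = sorted(category_counts, key=lambda c: (-category_counts[c], c))
label_of = {category: float(i) for i, category in enumerate(ordered)}

for category in ("hep-ph", "hep-th", "math.PR"):
    print(category, label_of[category])
```

These three values match the labels 0.0, 2.0, and 6.0 in the output above. gr-qc and astro-ph fall outside the top ten, so their labels cannot be reproduced from this table.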


**Split Data**

Split the data into training (80%) and testing (20%) datasets.

In [45]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
print(f"Training Data: {train_data.count()} rows")
print(f"Testing Data: {test_data.count()} rows")

Training Data: 33096 rows
Testing Data: 8102 rows


**Note: The following cells could not be run on the local machine because of the size of the data.**

**Suggestions:**

--Train on a smaller sample of the data (10%) first to verify that the process works as expected. Run these lines after lr has been defined in the next cell:

sample_train_data = train_data.sample(fraction=0.1, seed=42)  # Use 10% of the data

model = lr.fit(sample_train_data)




**Train a Simple Model**

Train a classification model (e.g., Logistic Regression) using the features and labels.

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(train_data)
print("Model training completed!")

**Evaluate the Model**

Use the test data to check how well the model performs.

In [None]:
predictions = model.transform(test_data)
predictions.select("label", "prediction", "probability").show(10)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy}")
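Since these cells were not run, there is no accuracy to report. When they are run, a useful reference point is the accuracy of always predicting the most frequent label. The sketch below computes that baseline from counts printed earlier in this notebook (on the cleaned data before the split) and shows the accuracy calculation on a tiny made-up list of labels and predictions. The accuracy helper is a plain-Python illustration, not the evaluator's implementation.

```python
def accuracy(labels, predictions):
    """Fraction of predictions that equal the true label."""
    matches = sum(1 for y, p in zip(labels, predictions) if y == p)
    return matches / len(labels)


# Tiny made-up example: 3 of 4 predictions are correct.
example_accuracy = accuracy([0.0, 2.0, 6.0, 0.0], [0.0, 0.0, 6.0, 0.0])
print(f"Example accuracy: {example_accuracy}")

# Majority-class baseline from the notebook's own numbers:
# 5008 rows have categories == "hep-ph" out of 41198 cleaned rows.
cleaned_rows = 41_198
most_common_count = 5_008
majority_baseline = most_common_count / cleaned_rows
print(f"Majority-class baseline: {majority_baseline:.3f}")
```

A trained model should be compared against this baseline of roughly 12%, keeping in mind that there are 3528 distinct label values.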