In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, schema_of_json, concat_ws
import os

# Start Spark session
spark = SparkSession.builder.appName("FullDatasetLoadingWithProgress").getOrCreate()

# Directory containing saved batches
data_dir = "C:/Users/andre/Downloads/saved_data"

# List all .txt batch files
batch_files = [f for f in os.listdir(data_dir) if f.endswith(".txt")]
print(f"Total batch files found: {len(batch_files)}")

Total batch files found: 1365


In [2]:
# Uninstall conflicting packages first
!pip uninstall -y transformers numpy scikit-learn scipy peft datasets -q

# Install stable and compatible versions in order
!pip install numpy==1.25.2 --force-reinstall -q
!pip install scipy==1.11.4 -q
!pip install scikit-learn==1.3.0 -q
!pip install transformers==4.40.1 -q
!pip install peft==0.11.1 -q
!pip install datasets -q

# Confirm versions
!pip show numpy scipy scikit-learn transformers datasets peft

You can safely remove it manually.


Name: numpy
Version: 1.25.2
Summary: Fundamental package for array computing in Python
Home-page: https://www.numpy.org
Author: Travis E. Oliphant et al.
Author-email: 
License: BSD-3-Clause
Location: C:\Users\andre\Downloads\spark\spark\.pixi\envs\default\Lib\site-packages
Requires: 
Required-by: accelerate, datasets, pandas, peft, scikit-learn, scipy, torchvision, transformers
---
Name: scipy
Version: 1.11.4
Summary: Fundamental algorithms for scientific computing in Python
Home-page: https://scipy.org/
Author: 
Author-email: 
License: Copyright (c) 2001-2002 Enthought, Inc. 2003-2023, SciPy Developers.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:

1. Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above
   copyright notice, this list

In [4]:
# Use the first batch to infer schema
sample_path = os.path.join(data_dir, batch_files[0])
df_sample = spark.read.text(sample_path)
sample_json = df_sample.select("value").first()["value"]
schema = schema_of_json(sample_json)
print("Schema inferred successfully.")

Schema inferred successfully.


In [5]:
# Initialize an empty DataFrame
df_all = None

# Loop through each batch and append
for i, file in enumerate(batch_files):
    file_path = os.path.join(data_dir, file)
    df_raw = spark.read.text(file_path)
    df_parsed = df_raw.select(from_json("value", schema).alias("data")).select("data.*")
    
    if df_all is None:
        df_all = df_parsed
    else:
        df_all = df_all.union(df_parsed)
    
    print(f"Loaded and parsed batch {i+1}/{len(batch_files)}: {file}")

Loaded and parsed batch 1/1365: astro-ph.CO_batch_0.txt
Loaded and parsed batch 2/1365: astro-ph.CO_batch_1.txt
Loaded and parsed batch 3/1365: astro-ph.CO_batch_2.txt
Loaded and parsed batch 4/1365: astro-ph.CO_batch_3.txt
Loaded and parsed batch 5/1365: astro-ph.CO_batch_4.txt
Loaded and parsed batch 6/1365: astro-ph.CO_batch_5.txt
Loaded and parsed batch 7/1365: astro-ph.CO_batch_6.txt
Loaded and parsed batch 8/1365: astro-ph.CO_batch_7.txt
Loaded and parsed batch 9/1365: astro-ph.CO_batch_8.txt
Loaded and parsed batch 10/1365: astro-ph.CO_batch_9.txt
Loaded and parsed batch 11/1365: astro-ph.EP_batch_0.txt
Loaded and parsed batch 12/1365: astro-ph.EP_batch_1.txt
Loaded and parsed batch 13/1365: astro-ph.EP_batch_2.txt
Loaded and parsed batch 14/1365: astro-ph.EP_batch_3.txt
Loaded and parsed batch 15/1365: astro-ph.EP_batch_4.txt
Loaded and parsed batch 16/1365: astro-ph.EP_batch_5.txt
Loaded and parsed batch 17/1365: astro-ph.EP_batch_6.txt
Loaded and parsed batch 18/1365: astro-p

In [6]:
# Drop nulls and combine title + summary
df_all = df_all.dropna(subset=["title", "summary", "main_category"])
df_all = df_all.withColumn("text", concat_ws(" ", col("title"), col("summary")))

print("Data cleaned and ready.")
df_all.select("main_category").groupBy("main_category").count().show(truncate=False)

Data cleaned and ready.
+------------------+-----+
|main_category     |count|
+------------------+-----+
|astro-ph.CO       |1000 |
|astro-ph.EP       |1000 |
|astro-ph.GA       |1000 |
|astro-ph.HE       |1000 |
|astro-ph.IM       |1000 |
|astro-ph.SR       |400  |
|cond-mat.dis-nn   |900  |
|cond-mat.mes-hall |1000 |
|cond-mat.mtrl-sci |700  |
|cond-mat.other    |1000 |
|cond-mat.quant-gas|1000 |
|cond-mat.soft     |1000 |
|cond-mat.stat-mech|1000 |
|cond-mat.str-el   |1000 |
|cond-mat.supr-con |1000 |
|cs.AI             |700  |
|cs.AR             |1000 |
|cs.CC             |1000 |
|cs.CE             |1000 |
|cs.CG             |1000 |
+------------------+-----+
only showing top 20 rows



In [7]:
import os
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, schema_of_json, concat_ws, col

# Initialize Spark
spark = SparkSession.builder.appName("ChunkedCategorySplitter").getOrCreate()

# Set paths
data_dir = r"C:/Users/andre/Downloads/saved_data"
train_dir = os.path.join(data_dir, "../train_data")
test_dir = os.path.join(data_dir, "../test_data")
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

# List all batch files
all_files = sorted([f for f in os.listdir(data_dir) if f.endswith(".txt")])
print(f"Total batch files: {len(all_files)}")

# Infer schema from one file
sample_path = os.path.join(data_dir, all_files[0])
sample_json = spark.read.text(sample_path).select("value").first()["value"]
schema = schema_of_json(sample_json)
print("Schema inferred.")

# Split and save function
def process_chunk(chunk_files, chunk_index):
    print(f"\nProcessing chunk {chunk_index + 1} with {len(chunk_files)} files...")
    paths = [os.path.join(data_dir, f) for f in chunk_files]
    df_raw = spark.read.text(paths)
    df = df_raw.select(from_json("value", schema).alias("data")).select("data.*")
    df = df.dropna(subset=["summary", "title", "main_category"])
    df = df.withColumn("text", concat_ws(" ", col("title"), col("summary")))

    categories = [row["main_category"] for row in df.select("main_category").distinct().collect()]
    
    for cat in categories:
        df_cat = df.filter(df.main_category == cat)
        train_split, test_split = df_cat.randomSplit([0.6, 0.4], seed=chunk_index)

        train_records = [json.dumps(row.asDict()) for row in train_split.collect()]
        test_records = [json.dumps(row.asDict()) for row in test_split.collect()]

        train_path = os.path.join(train_dir, f"{cat}_train.txt")
        test_path = os.path.join(test_dir, f"{cat}_test.txt")

        with open(train_path, "a", encoding="utf-8") as f:
            f.write("\n".join(train_records) + "\n")

        with open(test_path, "a", encoding="utf-8") as f:
            f.write("\n".join(test_records) + "\n")

        print(f"  {cat}: +{len(train_records)} train / +{len(test_records)} test")

# Chunk files
chunk_size = 200
chunks = [all_files[i:i + chunk_size] for i in range(0, len(all_files), chunk_size)]

# Process each chunk
for idx, chunk in enumerate(chunks):
    process_chunk(chunk, idx)

print("\n All batches processed and split into train/test sets.")

Total batch files: 1365
Schema inferred.

Processing chunk 1 with 200 files...
  astro-ph.GA: +598 train / +402 test
  cs.AR: +617 train / +383 test
  astro-ph.EP: +594 train / +406 test
  astro-ph.CO: +592 train / +408 test
  cs.CE: +602 train / +398 test
  astro-ph.HE: +580 train / +420 test
  astro-ph.IM: +612 train / +388 test
  astro-ph.SR: +223 train / +177 test
  cs.CG: +597 train / +403 test
  cs.CC: +609 train / +391 test
  cs.AI: +436 train / +264 test
  cond-mat.quant-gas: +605 train / +395 test
  cs.CL: +602 train / +398 test
  cond-mat.supr-con: +611 train / +389 test
  cond-mat.mtrl-sci: +433 train / +267 test
  cond-mat.str-el: +625 train / +375 test
  cond-mat.other: +606 train / +394 test
  cond-mat.soft: +600 train / +400 test
  cond-mat.stat-mech: +592 train / +408 test
  cond-mat.dis-nn: +513 train / +387 test
  cond-mat.mes-hall: +581 train / +419 test
  cs.CR: +179 train / +121 test

Processing chunk 2 with 200 files...
  cs.MM: +590 train / +410 test
  cs.CY: +60

In [8]:
import sys
print(sys.executable)

C:\Users\andre\Downloads\spark\spark\.pixi\envs\default\python.exe


In [9]:
import os
import json
from sklearn.preprocessing import LabelEncoder

train_dir = r"C:/Users/andre/Downloads/train_data"
test_dir = r"C:/Users/andre/Downloads/test_data"

# === Mapping ===
main_category_mapping = {
    "cs": "Computer Science",
    "econ": "Economics",
    "eess": "Electrical Engineering and Systems Science",
    "math": "Mathematics",
    "astro-ph": "Physics", "cond-mat": "Physics", "gr-qc": "Physics",
    "hep-ex": "Physics", "hep-lat": "Physics", "hep-ph": "Physics",
    "hep-th": "Physics", "math-ph": "Physics", "nlin": "Physics",
    "nucl-ex": "Physics", "nucl-th": "Physics", "physics": "Physics",
    "quant-ph": "Physics",
    "q-bio": "Quantitative Biology",
    "q-fin": "Quantitative Finance",
    "stat": "Statistics"
}

def get_main_category(raw_cat):
    prefix = raw_cat.split(".")[0].lower()
    if prefix in ["astro-ph", "cond-mat", "hep-ex", "hep-lat", "hep-ph", "hep-th",
                  "math-ph", "nucl-ex", "nucl-th", "quant-ph"]:
        full_prefix = ".".join(raw_cat.lower().split(".")[:2])
        return main_category_mapping.get(full_prefix, main_category_mapping.get(prefix, "Unknown"))
    return main_category_mapping.get(prefix, "Unknown")

# === Load, map, and filter data ===
def load_jsonlines_from_dir(directory):
    records = []
    for file in os.listdir(directory):
        if file.endswith(".txt"):
            with open(os.path.join(directory, file), encoding="utf-8") as f:
                for line in f:
                    try:
                        record = json.loads(line)
                        record["main_category"] = get_main_category(record.get("categories", ""))
                        records.append(record)
                    except json.JSONDecodeError:
                        continue
    return records

# Load and map
train_records = load_jsonlines_from_dir(train_dir)
test_records = load_jsonlines_from_dir(test_dir)

# Filter records with unrecognized categories
train_records = [r for r in train_records if r["main_category"] != "Unknown"]
test_records = [r for r in test_records if r["main_category"] != "Unknown"]

print(f"Filtered train size: {len(train_records)}")
print(f"Filtered test size: {len(test_records)}")

Filtered train size: 484062
Filtered test size: 317508


In [10]:
# Encode string labels into integers
le = LabelEncoder()
all_labels = [r["main_category"] for r in train_records + test_records]
le.fit(all_labels)

# Apply label transformation
for r in train_records:
    r["label"] = int(le.transform([r["main_category"]])[0])
for r in test_records:
    r["label"] = int(le.transform([r["main_category"]])[0])

# Save label mapping for later use
with open("label_mapping.json", "w") as f:
    json.dump({i: label for i, label in enumerate(le.classes_)}, f)

print(f"Encoded {len(le.classes_)} categories.")

Encoded 8 categories.


In [11]:
import sys
!{sys.executable} -m pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

Looking in indexes: https://download.pytorch.org/whl/cu121


In [12]:
import torch
print("CUDA available:", torch.cuda.is_available())
print("GPU name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "None")

CUDA available: True
GPU name: NVIDIA GeForce RTX 3050 Ti Laptop GPU


In [13]:
from datasets import Dataset, DatasetDict
from transformers import DistilBertTokenizerFast

# Convert to HF dataset
train_ds = Dataset.from_list(train_records)
test_ds = Dataset.from_list(test_records)
dataset = DatasetDict({"train": train_ds, "test": test_ds})

# Load tokenizer
tokenizer = DistilBertTokenizerFast.from_pretrained("distilbert-base-uncased")

# Tokenization function
def tokenize(batch):
    return tokenizer(batch["text"], truncation=True, padding="max_length")

dataset = dataset.map(tokenize, batched=True)
dataset = dataset.remove_columns(["text", "main_category", "aid", "title", "summary", "published", "categories"])
dataset.set_format("torch")

print("Tokenization complete.")

The cache for model files in Transformers v4.22.0 has been updated. Migrating your old cache. This is a one-time only operation. You can interrupt this and resume the migration later on by calling `transformers.utils.move_cache()`.


0it [00:00, ?it/s]



Map:   0%|          | 0/484062 [00:00<?, ? examples/s]

Map:   0%|          | 0/317508 [00:00<?, ? examples/s]

Tokenization complete.


In [14]:
from transformers import DistilBertForSequenceClassification, Trainer, TrainingArguments

model = DistilBertForSequenceClassification.from_pretrained(
    "distilbert-base-uncased",
    num_labels=len(le.classes_)
)

training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir='./logs',
    load_best_model_at_end=True,
    logging_steps=20
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    tokenizer=tokenizer
)

trainer.train()

Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch,Training Loss,Validation Loss
1,0.0505,0.940307
2,0.0498,1.287209
3,0.0,1.486158


TrainOutput(global_step=181524, training_loss=0.0961589629548997, metrics={'train_runtime': 133785.9098, 'train_samples_per_second': 10.855, 'train_steps_per_second': 1.357, 'total_flos': 1.923878853710807e+17, 'train_loss': 0.0961589629548997, 'epoch': 3.0})

In [1]:
model.save_pretrained("fine_tuned_distilbert")
tokenizer.save_pretrained("fine_tuned_distilbert")
print("Model saved to 'fine_tuned_distilbert'")

NameError: name 'model' is not defined

In [16]:
from sklearn.metrics import accuracy_score
from transformers import Trainer

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = logits.argmax(axis=-1)
    acc = accuracy_score(labels, predictions)
    return {"accuracy": acc}

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

results = trainer.evaluate()
print(f"Accuracy: {results['eval_accuracy'] * 100:.2f}%")

Accuracy: 87.33%


In [17]:
# Evaluate accuracy on the training set
train_results = trainer.evaluate(eval_dataset=dataset["train"])
print(f"Training Accuracy: {train_results['eval_accuracy'] * 100:.2f}%")

Training Accuracy: 98.91%
