In [1]:
from pyspark.sql import SparkSession

# Obtain a Spark session: getOrCreate() returns the already-active session
# when there is one and otherwise builds a new one with default settings.
spark = SparkSession.builder.getOrCreate()
# Smoke test: reading the version proves the session object is usable.
print("Spark version:", spark.version)
print("PySpark is working!")

Spark version: 4.0.1
PySpark is working!


In [1]:
import pandas as pd

# Load the raw corpus and display its first five rows as a quick sanity
# check of the available columns.
preview = pd.read_csv("author_identification_dataset_final.csv")

preview.head(n=5)

Unnamed: 0,author,title,text,text_type,word_count
0,Abraham Lincoln,Lincoln Letters,LINCOLN LETTERS By Abraham Lincoln Published b...,letters,1065
1,Abraham Lincoln,Lincoln's First Inaugural Address,"Lincoln's First Inaugural Address March 4, 186...",letters,3626
2,Abraham Lincoln,"Lincoln's Gettysburg Address, given November 1...","Lincoln's Gettysburg Address, given November 1...",letters,299
3,Abraham Lincoln,"Lincoln's Inaugurals, Addresses and Letters (S...",Longman's English Classics LINCOLN'S INAUGURAL...,letters,43649
4,Abraham Lincoln,Lincoln's Second Inaugural Address,"Lincoln's Second Inaugural Address March 4, 18...",letters,703


In [1]:
import pandas as pd
import numpy as np
import os
from sklearn.model_selection import train_test_split

# -------------------------
# 1. Read CSV (pandas)
# -------------------------
# Load the corpus, keep only the three columns used below, and expose the
# title under the name 'book_id' so it can serve as the grouping key that
# keeps all chunks of one book together (avoids leakage).
df = (
    pd.read_csv("author_identification_dataset_final.csv")
    .loc[:, ['author', 'title', 'text']]
    .rename(columns={'title': 'book_id'})
)

# -------------------------
# 2. Parameters
# -------------------------
MIN_LEN = 1000                 # chunks shorter than this many characters are dropped
CHUNK_SIZE = 2000              # number of characters per chunk
MAX_CHUNKS_PER_AUTHOR = 1000   # cap on the number of chunks kept per author

# -------------------------
# 3. Chunking function
# -------------------------
def chunk_text(text, chunk_size=CHUNK_SIZE, min_len=MIN_LEN):
    """Split ``text`` into consecutive, non-overlapping character chunks.

    Args:
        text: Value to chunk. Non-string values are converted with ``str``.
            ``None`` and float NaN are treated as "no text" instead of being
            turned into the literal strings "None" / "nan".
        chunk_size: Number of characters per chunk. Must be positive.
        min_len: Minimum number of characters a chunk needs in order to be
            kept. With ``min_len <= chunk_size`` only the final, shorter
            chunk can be discarded.

    Returns:
        list[str]: The kept chunks, in their original order. Empty when the
        text is missing, empty, or shorter than ``min_len``.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size!r}")
    # NaN is the only float that does not compare equal to itself.
    if text is None or (isinstance(text, float) and text != text):
        return []
    text = str(text)
    pieces = (text[start:start + chunk_size] for start in range(0, len(text), chunk_size))
    return [piece for piece in pieces if len(piece) >= min_len]

# -------------------------
# 4. Apply chunking across dataframe
# -------------------------
# One record per kept chunk, carrying the author and book of its source row.
# Columns are walked in lockstep, so every chunk inherits the labels of the
# row its text came from.
rows = [
    {'author': author, 'book_id': book_id, 'chunk': piece}
    for author, book_id, text in zip(df['author'], df['book_id'], df['text'])
    for piece in chunk_text(text)
]

df_chunks = pd.DataFrame(rows)
print("Total chunks before balance:", len(df_chunks))

# -------------------------
# 5. Cap per author to avoid imbalance
# -------------------------
# 0-based position of each chunk within its author, in current row order.
# Computed as a standalone Series so no helper column is written into
# df_chunks (the unbalanced frame stays exactly as it was built).
chunk_rank = df_chunks.groupby('author').cumcount()
# Keep at most MAX_CHUNKS_PER_AUTHOR chunks per author. The explicit copy
# makes df_balanced an independent frame that is safe to add columns to.
# NOTE(review): this keeps the *first* N chunks per author in row order, so
# a capped author is represented only by the earliest-listed text(s) --
# confirm that is intended rather than a seeded random sample.
df_balanced = df_chunks[chunk_rank < MAX_CHUNKS_PER_AUTHOR].copy()
print("After capping per author:", len(df_balanced))

# -------------------------
# 6. Train/Dev/Test split by book_id (no leakage between splits)
# -------------------------
# Split at the book level: 30% of the unique books are held out, and that
# holdout is then halved into dev and test. Every chunk of a book therefore
# lands in exactly one split.
unique_books = df_balanced['book_id'].unique()
train_books, holdout_books = train_test_split(unique_books, test_size=0.3, random_state=42)
dev_books, test_books = train_test_split(holdout_books, test_size=0.5, random_state=42)

# `x in ndarray` compares x against every element on each call; with one call
# per chunk row that is needlessly slow. Sets give constant-time lookups.
train_book_set = set(train_books)
dev_book_set = set(dev_books)

def assign_split(book_id):
    """Return the split name ('train', 'dev' or 'test') for ``book_id``.

    Any book_id that is in neither the train nor the dev set is reported as
    'test'.
    """
    if book_id in train_book_set:
        return 'train'
    if book_id in dev_book_set:
        return 'dev'
    return 'test'

df_balanced['split'] = df_balanced['book_id'].map(assign_split)
print(df_balanced['split'].value_counts())

# -------------------------
# 7. Save as Parquet (or CSV)
# -------------------------
# Make sure the output folder exists (no error if it is already there).
os.makedirs("author_chunks_dataset", exist_ok=True)

# Write one Parquet file per split; the row index is not stored.
for split in ('train', 'dev', 'test'):
    out = f"author_chunks_dataset/{split}.parquet"
    split_rows = df_balanced.loc[df_balanced['split'] == split]
    split_rows.to_parquet(out, index=False)
    print(f"Saved {split} to {out}")

Total chunks before balance: 1526225
After capping per author: 770655
split
train    545285
dev      113592
test     111778
Name: count, dtype: int64
Saved train to author_chunks_dataset/train.parquet
Saved dev to author_chunks_dataset/dev.parquet
Saved test to author_chunks_dataset/test.parquet


In [2]:
import pandas as pd
import numpy as np
import os
from sklearn.model_selection import train_test_split

# -------------------------
# 1. Read CSV (pandas)
# -------------------------
# Load the corpus, keep only the three columns used below, and expose the
# title under the name 'book_id' so it can serve as the grouping key that
# keeps all chunks of one book together (avoids leakage).
df = (
    pd.read_csv("author_identification_dataset_final.csv")
    .loc[:, ['author', 'title', 'text']]
    .rename(columns={'title': 'book_id'})
)

# -------------------------
# 2. Parameters
# -------------------------
MIN_LEN = 1000                 # chunks shorter than this many characters are dropped
CHUNK_SIZE = 2000              # number of characters per chunk
MAX_CHUNKS_PER_AUTHOR = 1000   # cap on the number of chunks kept per author

# -------------------------
# 3. Chunking function
# -------------------------
def chunk_text(text, chunk_size=CHUNK_SIZE, min_len=MIN_LEN):
    """Split ``text`` into consecutive, non-overlapping character chunks.

    Args:
        text: Value to chunk. Non-string values are converted with ``str``.
            ``None`` and float NaN are treated as "no text" instead of being
            turned into the literal strings "None" / "nan".
        chunk_size: Number of characters per chunk. Must be positive.
        min_len: Minimum number of characters a chunk needs in order to be
            kept. With ``min_len <= chunk_size`` only the final, shorter
            chunk can be discarded.

    Returns:
        list[str]: The kept chunks, in their original order. Empty when the
        text is missing, empty, or shorter than ``min_len``.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size!r}")
    # NaN is the only float that does not compare equal to itself.
    if text is None or (isinstance(text, float) and text != text):
        return []
    text = str(text)
    pieces = (text[start:start + chunk_size] for start in range(0, len(text), chunk_size))
    return [piece for piece in pieces if len(piece) >= min_len]

# -------------------------
# 4. Apply chunking across dataframe
# -------------------------
# One record per kept chunk, carrying the author and book of its source row.
# Columns are walked in lockstep, so every chunk inherits the labels of the
# row its text came from.
rows = [
    {'author': author, 'book_id': book_id, 'chunk': piece}
    for author, book_id, text in zip(df['author'], df['book_id'], df['text'])
    for piece in chunk_text(text)
]

df_chunks = pd.DataFrame(rows)
print("Total chunks before balance (raw dataset):", len(df_chunks))

# -------------------------
# Save the full raw 1.5M dataset
# -------------------------
# Make sure the output folder exists (no error if it is already there).
os.makedirs("author_chunks_dataset", exist_ok=True)

# Persist every chunk, before any per-author capping, in both formats.
full_parquet_path = "author_chunks_dataset/full_dataset.parquet"
full_csv_path = "author_chunks_dataset/full_dataset.csv"
df_chunks.to_parquet(full_parquet_path, index=False)
df_chunks.to_csv(full_csv_path, index=False)
# NOTE(review): the chunk count in this message is hard-coded; it will not
# follow the data if the input or the chunking parameters change.
print("Saved full raw dataset (1.5M chunks)")

# -------------------------
# 5. Cap per author to avoid imbalance
# -------------------------
# 0-based position of each chunk within its author, in current row order.
# Computed as a standalone Series so no helper column is written into
# df_chunks (the unbalanced frame stays exactly as it was built).
chunk_rank = df_chunks.groupby('author').cumcount()
# Keep at most MAX_CHUNKS_PER_AUTHOR chunks per author. The explicit copy
# makes df_balanced an independent frame that is safe to add columns to.
# NOTE(review): this keeps the *first* N chunks per author in row order, so
# a capped author is represented only by the earliest-listed text(s) --
# confirm that is intended rather than a seeded random sample.
df_balanced = df_chunks[chunk_rank < MAX_CHUNKS_PER_AUTHOR].copy()
print("After capping per author (balanced dataset):", len(df_balanced))

# -------------------------
# 6. Train/Dev/Test split by book_id (no leakage)
# -------------------------
# Split at the book level: 30% of the unique books are held out, and that
# holdout is then halved into dev and test. Every chunk of a book therefore
# lands in exactly one split.
unique_books = df_balanced['book_id'].unique()
train_books, holdout_books = train_test_split(unique_books, test_size=0.3, random_state=42)
dev_books, test_books = train_test_split(holdout_books, test_size=0.5, random_state=42)

# `x in ndarray` compares x against every element on each call; with one call
# per chunk row that is needlessly slow. Sets give constant-time lookups.
train_book_set = set(train_books)
dev_book_set = set(dev_books)

def assign_split(book_id):
    """Return the split name ('train', 'dev' or 'test') for ``book_id``.

    Any book_id that is in neither the train nor the dev set is reported as
    'test'.
    """
    if book_id in train_book_set:
        return 'train'
    if book_id in dev_book_set:
        return 'dev'
    return 'test'

df_balanced['split'] = df_balanced['book_id'].map(assign_split)
print("Balanced dataset split counts:\n", df_balanced['split'].value_counts())

# -------------------------
# 7. Save splits as Parquet AND CSV
# -------------------------
# Write each split twice: Parquet and CSV. The row index is not stored in
# either format.
for split in ('train', 'dev', 'test'):
    split_df = df_balanced.loc[df_balanced['split'] == split]

    # Destination paths for the two formats
    out_parquet = f"author_chunks_dataset/{split}.parquet"
    out_csv = f"author_chunks_dataset/{split}.csv"

    split_df.to_parquet(out_parquet, index=False)
    split_df.to_csv(out_csv, index=False)

    print(f"Saved {split}: {len(split_df)} rows to {out_parquet} and {out_csv}")

Total chunks before balance (raw dataset): 1526225
Saved full raw dataset (1.5M chunks)
After capping per author (balanced dataset): 770655
Balanced dataset split counts:
 split
train    545285
dev      113592
test     111778
Name: count, dtype: int64
Saved train: 545285 rows to author_chunks_dataset/train.parquet and author_chunks_dataset/train.csv
Saved dev: 113592 rows to author_chunks_dataset/dev.parquet and author_chunks_dataset/dev.csv
Saved test: 111778 rows to author_chunks_dataset/test.parquet and author_chunks_dataset/test.csv
