In [1]:
import pandas as pd
import os
from sklearn.model_selection import train_test_split

# === Paths ===
data_dir = "E:/amazon-sentiment-analyzer/backend/data/"
full_dataset_path = os.path.join(data_dir, "train_all_3class.csv")  # Replace with your 23.4M-row CSV
train_path = os.path.join(data_dir, "train_full.parquet")
val_path = os.path.join(data_dir, "val_full.parquet")
test_path = os.path.join(data_dir, "test_full.parquet")
CHUNKSIZE = 100_000

# === Create output directory ===
os.makedirs(data_dir, exist_ok=True)

# === Read and inspect dataset ===
print("🔍 Inspecting full dataset...")
labels = []
reader = pd.read_csv(full_dataset_path, chunksize=CHUNKSIZE)
for chunk in reader:
    chunk['label'] = pd.to_numeric(chunk['label'], errors='coerce')
    chunk = chunk.dropna(subset=['label'])
    labels.extend(chunk['label'].astype(int).tolist())
label_counts = pd.Series(labels).value_counts(dropna=False)
print("Label counts:\n", label_counts)

# === Split dataset ===
print("🔄 Splitting dataset...")
train_dfs, val_dfs, test_dfs = [], [], []
reader = pd.read_csv(full_dataset_path, chunksize=CHUNKSIZE)
for chunk in reader:
    chunk['text'] = chunk['text'].fillna("").astype(str).str.lower()
    chunk['label'] = pd.to_numeric(chunk['label'], errors='coerce')
    chunk = chunk.dropna(subset=['label']).astype({'label': int})
    if len(chunk) == 0:
        continue
    # Stratified split per chunk
    train_val, test = train_test_split(chunk, test_size=0.1, stratify=chunk['label'], random_state=42)
    train, val = train_test_split(train_val, test_size=0.1111, stratify=train_val['label'], random_state=42)  # 0.1111 ~ 10% of 90%
    train_dfs.append(train)
    val_dfs.append(val)
    test_dfs.append(test)
    print(f"Processed chunk with {len(chunk):,} rows")

# === Save splits as Parquet ===
print("💾 Saving splits...")
pd.concat(train_dfs).to_parquet(train_path, index=False)
pd.concat(val_dfs).to_parquet(val_path, index=False)
pd.concat(test_dfs).to_parquet(test_path, index=False)
print(f"Saved train: {train_path}, val: {val_path}, test: {test_path}")

🔍 Inspecting full dataset...
Label counts:
 2    18079660
0     4115909
1     1715821
Name: count, dtype: int64
🔄 Splitting dataset...
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk with 100,000 rows
Processed chunk

In [3]:
import pandas as pd
import joblib
import os
import time
import sklearn
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.utils.class_weight import compute_class_weight
import psutil
import numpy as np
import pyarrow.parquet as pq
import pyarrow as pa

# === Check scikit-learn version ===
print(f"📦 scikit-learn version: {sklearn.__version__}")

# === Paths ===
data_dir = "E:/amazon-sentiment-analyzer/backend/data/"
train_path = os.path.join(data_dir, "train_full.parquet")
val_path = os.path.join(data_dir, "val_full.parquet")
test_path = os.path.join(data_dir, "test_full.parquet")
CHUNKSIZE = 50_000  # For 8GB RAM with TfidfVectorizer

# === Load val/test ===
print("📂 Loading val/test...")
try:
    val_df = pd.read_parquet(val_path)
    test_df = pd.read_parquet(test_path)
    val_df['text'] = val_df['text'].fillna("").astype(str).str.lower()
    test_df['text'] = test_df['text'].fillna("").astype(str).str.lower()
    print(f"Memory usage after loading: {psutil.Process().memory_info().rss / 1024**2:.2f} MB")
except FileNotFoundError as e:
    raise FileNotFoundError(f"Parquet file not found: {e}")

# === Compute Class Weights ===
print("🔍 Computing class weights...")
sample_size = 500_000
y_sample = []
parquet_file = pq.ParquetFile(train_path)
total_rows = 0
for batch in parquet_file.iter_batches(batch_size=CHUNKSIZE):
    chunk = batch.to_pandas()
    chunk['label'] = pd.to_numeric(chunk['label'], errors='coerce')
    chunk = chunk.dropna(subset=['label']).astype({'label': int})
    y_sample.extend(chunk['label'].tolist())
    total_rows += len(chunk)
    if total_rows >= sample_size:
        break
if not y_sample:
    raise ValueError("No valid labels found in sample.")

unique_labels = np.unique(y_sample)
print(f"Unique labels: {unique_labels}")
try:
    class_weights = compute_class_weight('balanced', classes=unique_labels, y=y_sample)
    class_weight_dict = {unique_labels[i]: class_weights[i] for i in range(len(unique_labels))}
    print(f"Class weights: {class_weight_dict}")
except ValueError as e:
    print(f"⚠️ Error computing class weights: {e}. Using equal weights.")
    class_weight_dict = {label: 1.0 for label in unique_labels}

# === Initialize Vectorizer ===
print("🔠 Fitting TfidfVectorizer...")
train_texts = []
parquet_file = pq.ParquetFile(train_path)
total_rows = 0
for batch in parquet_file.iter_batches(batch_size=CHUNKSIZE):
    chunk = batch.to_pandas()
    chunk['text'] = chunk['text'].fillna("").astype(str).str.lower()
    train_texts.extend(chunk['text'].tolist())
    total_rows += len(chunk)
    if total_rows >= sample_size:
        break
vectorizer = TfidfVectorizer(max_features=20_000, stop_words='english')
vectorizer.fit(train_texts)
print("✅ Vectorizer fitted.")

# === Initialize Model ===
model = SGDClassifier(
    loss='log_loss',
    max_iter=50,
    learning_rate='adaptive',
    eta0=0.0005,
    alpha=0.00005,
    n_jobs=-1
)

# === Train Model Incrementally ===
print("🚀 Training SGDClassifier...")
start = time.time()
for epoch in range(4):  # Increased to 4 passes
    parquet_file = pq.ParquetFile(train_path)
    for i, batch in enumerate(parquet_file.iter_batches(batch_size=CHUNKSIZE)):
        chunk_start = time.time()
        chunk = batch.to_pandas()
        chunk['text'] = chunk['text'].fillna("").astype(str).str.lower()
        chunk['label'] = pd.to_numeric(chunk['label'], errors='coerce')
        chunk = chunk.dropna(subset=['label']).astype({'label': int})
        X_chunk = vectorizer.transform(chunk['text'])
        y_chunk = chunk['label']
        valid_idx = y_chunk.isin(class_weight_dict.keys())
        if not valid_idx.all():
            print(f"⚠️ Epoch {epoch+1}, Chunk {i+1} has invalid labels. Filtering...")
            X_chunk = X_chunk[valid_idx]
            y_chunk = y_chunk[valid_idx]
        if len(y_chunk) == 0:
            print(f"⚠️ Epoch {epoch+1}, Chunk {i+1} has no valid labels. Skipping.")
            continue
        model.partial_fit(X_chunk, y_chunk, classes=unique_labels, sample_weight=[class_weight_dict[y] for y in y_chunk])
        print(f"📦 Epoch {epoch+1}, Chunk {i+1} with {len(chunk):,} rows in {time.time() - chunk_start:.2f}s")
        print(f"Memory usage: {psutil.Process().memory_info().rss / 1024**2:.2f} MB")
train_time = time.time() - start
print(f"✅ Training complete in {train_time:.2f}s")

# === Evaluate ===
print("📊 Evaluating...")
X_val = vectorizer.transform(val_df['text'])
X_test = vectorizer.transform(test_df['text'])
y_val = val_df['label'].astype(int)
y_test = test_df['label'].astype(int)

print("Validation Accuracy:", accuracy_score(y_val, model.predict(X_val)))
print("Test Accuracy:", accuracy_score(y_test, model.predict(X_test)))
print("Test Classification Report:\n", classification_report(y_test, model.predict(X_test), zero_division=0))

# === Save Model ===
model_dir = "E:/amazon-sentiment-analyzer/backend/model/sgdclassifier_full"
os.makedirs(model_dir, exist_ok=True)
joblib.dump(model, os.path.join(model_dir, "model.pkl"), compress=3)
joblib.dump(vectorizer, os.path.join(model_dir, "vectorizer.pkl"), compress=3)
print(f"💾 Saved model to {model_dir}")

📦 scikit-learn version: 1.6.1
📂 Loading val/test...
Memory usage after loading: 2047.65 MB
🔍 Computing class weights...
Unique labels: [0 1 2]
Class weights: {np.int64(0): np.float64(2.638216143772227), np.int64(1): np.float64(3.9516007934814392), np.int64(2): np.float64(0.422316201654297)}
🔠 Fitting TfidfVectorizer...
✅ Vectorizer fitted.
🚀 Training SGDClassifier...
📦 Epoch 1, Chunk 1 with 50,000 rows in 2.60s
Memory usage: 1513.71 MB
📦 Epoch 1, Chunk 2 with 50,000 rows in 2.06s
Memory usage: 1517.05 MB
📦 Epoch 1, Chunk 3 with 50,000 rows in 1.84s
Memory usage: 1518.56 MB
📦 Epoch 1, Chunk 4 with 50,000 rows in 1.61s
Memory usage: 1518.34 MB
📦 Epoch 1, Chunk 5 with 50,000 rows in 1.45s
Memory usage: 1518.71 MB
📦 Epoch 1, Chunk 6 with 50,000 rows in 1.47s
Memory usage: 1520.61 MB
📦 Epoch 1, Chunk 7 with 50,000 rows in 1.37s
Memory usage: 1521.12 MB
📦 Epoch 1, Chunk 8 with 50,000 rows in 1.45s
Memory usage: 1521.60 MB
📦 Epoch 1, Chunk 9 with 50,000 rows in 1.39s
Memory usage: 1525.40 MB
