# 🛒 Dagster Pipeline with Amazon Product Reviews (Colab-ready)

**Purpose:** Students will gain hands-on experience in data collection, markup, cleaning, and active learning; learn how to automate the process using Dagster; and understand how to improve data using entropy and active learning metrics.

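Run the cells from top to bottom. This notebook uses the `amazon_polarity` Hugging Face dataset as a real-world example of product reviews. Each pipeline step is written as a Dagster `@op`. To keep the teaching flow simple, the ops are invoked directly as Python functions rather than wired into a Dagster graph or job.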

In [None]:
# Install dependencies (run this cell only once)
!pip install --upgrade pip --quiet
!pip install dagster datasets scikit-learn pandas numpy matplotlib scikit-plot --quiet


In [None]:
# Imports
from dagster import graph, op
import pandas as pd
import numpy as np
from datasets import load_dataset
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import math
import warnings
warnings.filterwarnings('ignore')


## 1) Data collection — why this dataset?

- We use **`amazon_polarity`** because it contains real product reviews and binary sentiment labels (fast to run in Colab).
- In practice, replace this with scraped news, API data, or images from OpenImages / Kaggle.
- This notebook takes a **small subset** (2k rows) so it runs quickly for teaching.

In [None]:
@op
def collect_data_op(n_samples: int = 2000):
    """Load the first ``n_samples`` rows of the amazon_polarity train split.

    The review body column ``content`` is renamed to ``text``; ``label`` is
    cast to int and ``text`` to str. A Parquet snapshot is written to
    ``amazon_polarity_subset.parquet`` in the current working directory.

    Returns:
        The subset as a pandas DataFrame.
    """
    split_spec = f'train[:{n_samples}]'
    frame = load_dataset('amazon_polarity', split=split_spec).to_pandas()
    # Only 'content' changes name; 'label' already has the name we want.
    frame = frame.rename(columns={'content': 'text'})
    frame = frame.assign(
        label=frame['label'].astype(int),
        text=frame['text'].astype(str),
    )
    frame.to_parquet('amazon_polarity_subset.parquet', index=False)
    return frame

# Quick run to collect and show data
# Quick run: collect the subset and preview its first rows
df_raw = collect_data_op(2000)
print(f'Loaded rows: {len(df_raw)}')
df_raw.head(5)


### Quick EDA: label balance and text length


In [None]:
def show_eda(df):
    """Plot the label balance and the text-length histogram of ``df``.

    Expects ``label`` and ``text`` columns. ``df`` is NOT modified: text
    lengths are computed into a local Series rather than a ``text_len``
    column. Such a column would travel through the later pipeline steps and
    go stale once cleaning rewrites the texts.
    """
    print('Shape:', df.shape)
    plt.figure(figsize=(6,4))
    df['label'].value_counts().sort_index().plot(kind='bar')
    plt.title('Label distribution (0=neg,1=pos)')
    plt.xlabel('Label')
    plt.ylabel('Count')
    plt.show()

    text_len = df['text'].str.len()
    plt.figure(figsize=(6,4))
    plt.hist(text_len, bins=40)
    plt.title('Text length distribution')
    plt.xlabel('Characters')
    plt.ylabel('Count')
    plt.show()

show_eda(df=df_raw)  # EDA plots for the raw subset


## 2) Data markup — automatic labeling and entropy

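We simulate automatic markup by copying the dataset's label into `auto_label` (`positive` / `negative`). The `entropy` column is the binary entropy of each sample's label probability. Because the labels here are hard 0/1 values, this entropy is (numerically) zero for every sample, so it is only a placeholder. In a real pipeline you would compute it from a model's predicted probabilities, where high entropy flags uncertain samples worth re-annotating.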

In [None]:
@op
def auto_label_op(df: pd.DataFrame):
    """Attach a string ``auto_label`` column and a per-sample ``entropy`` column.

    ``auto_label`` is 'positive' where ``label == 1`` and 'negative' otherwise.
    ``entropy`` is the binary entropy H(p) = -p*log2(p) - (1-p)*log2(1-p),
    with p = ``label`` cast to float and the convention 0*log2(0) = 0. For the
    hard 0/1 labels used here it is exactly 0 on every row. The same code
    yields meaningful values if p is a soft probability in [0, 1].

    Returns a modified copy of ``df``; the input is left untouched.
    """
    df = df.copy()
    df['auto_label'] = df['label'].apply(lambda x: 'positive' if x==1 else 'negative')
    p = df['label'].astype(float).to_numpy()
    q = 1.0 - p
    # np.where evaluates both branches, so log2(0) = -inf and 0 * -inf = nan
    # are computed and then discarded; silence those warnings explicitly.
    # Using the 0*log2(0) = 0 convention (instead of an epsilon inside the log)
    # keeps the result exact and never slightly negative.
    with np.errstate(divide='ignore', invalid='ignore'):
        h = np.where(p > 0, -p * np.log2(p), 0.0) + np.where(q > 0, -q * np.log2(q), 0.0)
    df['entropy'] = h
    return df

# Apply the simulated automatic markup and preview the added columns
df_marked = auto_label_op(df_raw)
df_marked.head(5)


In [None]:
# Histogram of the per-sample entropy values produced by auto_label_op
fig, ax = plt.subplots(figsize=(6,4))
ax.hist(df_marked['entropy'], bins=30)
ax.set_title('Entropy distribution after auto-label')
plt.show()


## 3) Data cleanup

Remove duplicate texts, normalize whitespace, drop very short texts (10 characters or fewer), and filter out extremely long outliers (above the 99.9th length percentile).

In [None]:
@op
def clean_data_op(df: pd.DataFrame):
    """Normalize, de-duplicate and length-filter review texts.

    Steps, in order:
      1. Collapse every whitespace run in ``text`` to one space and strip ends.
      2. Drop duplicate texts, keeping the first occurrence. This runs after
         step 1 so reviews that differ only in whitespace count as duplicates.
      3. Keep only texts longer than 10 characters.
      4. Drop texts longer than the 99.9th percentile of the remaining lengths.

    Returns a new DataFrame with a fresh 0..n-1 index; the input is not
    modified. If no rows survive step 3, an empty DataFrame is returned
    instead of raising.
    """
    df = df.copy()
    df['text'] = df['text'].str.replace(r'\s+', ' ', regex=True).str.strip()
    df = df.drop_duplicates(subset='text')
    df = df[df['text'].str.len() > 10]
    if df.empty:
        # quantile() of an empty Series is NaN, and math.ceil(NaN) raises.
        return df.reset_index(drop=True)
    lengths = df['text'].str.len()
    max_len = math.ceil(lengths.quantile(0.999))
    return df[lengths <= max_len].reset_index(drop=True)

# Clean the auto-labelled frame and preview the result
df_clean = clean_data_op(df_marked)
print(f'After cleaning rows: {len(df_clean)}')
df_clean.head(5)


## 4) Active learning (uncertainty sampling) — sklearn-based

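We vectorize text with TF-IDF and train a RandomForest on a small seed set. We then iteratively query the most uncertain sample from the pool, add it (with its true label) to the training set, and retrain. This simulates a human-annotator-in-the-loop workflow. After each query, accuracy is measured against the true labels of the whole dataset, including the samples already used for training.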

In [None]:
@op
def active_learning_op(df: pd.DataFrame, n_queries: int = 30):
    """Simulate pool-based active learning with least-confidence sampling.

    Texts are TF-IDF vectorized (uni- and bigrams, at most 6000 features).
    A stratified 5% seed set trains a RandomForest. Then, for up to
    ``n_queries`` rounds, the pool sample with the highest
    ``1 - max(predict_proba)`` is "annotated" with its true label (a simulated
    oracle) and moved into the training set, and the model is refit.

    Args:
        df: cleaned reviews with ``text`` and integer ``label`` columns.
        n_queries: maximum number of queries; capped at (pool size - 1).

    Returns:
        A dict with the following keys:

        - ``df``: a copy of the input plus ``active_label`` (final model
          predictions) and a constant ``active_accuracy`` column.
        - ``accuracies``: accuracy against the true labels of the WHOLE
          dataset after each query. Note that this includes the rows already
          used for training.
        - ``pool_sizes``: the pool size after each query.
        - ``vectorizer`` and ``model``.
    """
    df = df.copy()
    texts = df['text'].tolist()
    labels = df['label'].values

    vec = TfidfVectorizer(max_features=6000, ngram_range=(1,2))
    # Keep the matrix sparse (CSR). RandomForest accepts sparse input, and a
    # dense copy would be rows x 6000 float64 values.
    X_all = vec.fit_transform(texts).tocsr()

    # Split row indices rather than the matrix itself. The labeled set and the
    # pool then grow and shrink by index, without copying feature rows.
    train_idx, pool_idx = train_test_split(
        np.arange(X_all.shape[0]), test_size=0.95, random_state=42, stratify=labels
    )

    model = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
    model.fit(X_all[train_idx], labels[train_idx])

    accuracies = []
    pool_sizes = []

    # Always leave at least one sample in the pool.
    queries = min(n_queries, len(pool_idx) - 1)
    for _ in range(queries):
        probs = model.predict_proba(X_all[pool_idx])
        # Least-confidence uncertainty; argmax picks the first sample on ties.
        uncertainties = 1 - probs.max(axis=1)
        query_pos = int(np.argmax(uncertainties))

        # Simulated oracle: reveal the true label of the queried row.
        # np.delete preserves the order of the remaining pool.
        train_idx = np.append(train_idx, pool_idx[query_pos])
        pool_idx = np.delete(pool_idx, query_pos)

        model.fit(X_all[train_idx], labels[train_idx])

        accuracies.append(accuracy_score(labels, model.predict(X_all)))
        pool_sizes.append(len(pool_idx))

    final_pred = model.predict(X_all)
    df['active_label'] = final_pred
    df['active_accuracy'] = accuracies[-1] if accuracies else accuracy_score(labels, final_pred)
    return {"df": df, "accuracies": accuracies, "pool_sizes": pool_sizes, "vectorizer": vec, "model": model}

active_result = active_learning_op(df_clean, n_queries=25)
# 'active_accuracy' is a constant column: print the scalar, not the whole Series.
print('Active learning finished; last accuracy:', active_result['df']['active_accuracy'].iloc[0])


### Plot active learning accuracy curve

In [None]:
# Accuracy after each active-learning query; iterations are numbered from 1
accuracies = active_result['accuracies']
iterations = list(range(1, len(accuracies) + 1))
fig, ax = plt.subplots(figsize=(6,4))
ax.plot(iterations, accuracies, marker='o')
ax.set_title('Accuracy over active learning iterations')
ax.set_xlabel('Iteration')
ax.set_ylabel('Accuracy')
ax.grid(True)
plt.show()


## 5) Final training and evaluation

Train a final model on the labels predicted by the active-learning model (`active_label`), then show its confusion matrix on a held-out split.

In [None]:
@op
def train_model_op(active_result: dict):
    """Train a larger RandomForest on the active-learning model's labels.

    The model is fit on ``active_label`` (the predictions of the active-learning
    model). It is evaluated against the ground-truth ``label`` column on a
    held-out 20% split. ``final_accuracy`` therefore reports real accuracy,
    not how closely the new model imitates the previous one.

    Args:
        active_result: dict returned by ``active_learning_op``. Uses ``df``
            (needs ``text``, ``label`` and ``active_label`` columns) and
            ``vectorizer``.

    Returns:
        A dict with the following keys:

        - ``df``: adds ``final_predictions`` and a constant ``final_accuracy``
          column.
        - ``final_model``.
        - ``y_test``: true labels of the held-out rows.
        - ``y_pred``: the final model's predictions for those rows.
        - ``agreement_with_active``: held-out accuracy measured against
          ``active_label`` instead of ``label``.
    """
    df = active_result['df'].copy()
    vec = active_result['vectorizer']

    # TF-IDF stays sparse: RandomForest accepts CSR input directly.
    X = vec.transform(df['text'].tolist())
    y_active = df['active_label'].to_numpy()
    y_true = df['label'].to_numpy()

    # Split row indices so the same held-out rows index both label vectors.
    train_rows, test_rows = train_test_split(
        np.arange(len(df)), test_size=0.2, random_state=42, stratify=y_true
    )

    final_model = RandomForestClassifier(n_estimators=200, random_state=42, n_jobs=-1)
    final_model.fit(X[train_rows], y_active[train_rows])
    y_pred = final_model.predict(X[test_rows])
    y_test = y_true[test_rows]
    final_acc = accuracy_score(y_test, y_pred)

    df['final_predictions'] = final_model.predict(X)
    df['final_accuracy'] = final_acc
    return {
        "df": df,
        "final_model": final_model,
        "y_test": y_test,
        "y_pred": y_pred,
        "agreement_with_active": accuracy_score(y_active[test_rows], y_pred),
    }

final_result = train_model_op(active_result)
# train_model_op always writes 'final_accuracy' as a constant column, so read
# the scalar directly. No fallback that prints an unrelated array shape.
print(f"Final test accuracy: {final_result['df']['final_accuracy'].iloc[0]:.3f}")


In [None]:
# Confusion matrix of the final model on the held-out split
y_test = final_result['y_test']
y_pred = final_result['y_pred']
# Draw into an explicitly sized Axes. Without `ax`, from_predictions() opens
# its own figure, which would leave a separate, empty 6x6 figure behind.
fig, ax = plt.subplots(figsize=(6,6))
ConfusionMatrixDisplay.from_predictions(y_test, y_pred, ax=ax)
ax.set_title(f"Final model confusion matrix (acc={final_result['df']['final_accuracy'].iloc[0]:.3f})")
plt.show()


## Save outputs

Save the processed dataset, including the model predictions, to a Parquet file for later use.

In [None]:
# Persist the final frame (texts, labels and model predictions) to Parquet
final_df = final_result['df']
output_path = 'amazon_pipeline_result.parquet'
final_df.to_parquet(output_path, index=False)
print(f'Saved {output_path}')


### Notes for instructors

- The active learning loop uses the true labels from the pool as a simulated oracle. In real setups, you would push queried samples to an annotation UI (Label Studio) and ingest labels back.
- To adapt for images, swap TF-IDF + RF for CNN features and use YOLO/Detectron for object markup.
- Encourage students to swap the dataset in `collect_data_op` with scraped news, a Kaggle CSV, or an S3/DB source.