# UCI Adult Dataset or Census Income

This is a very popular ML task on tabular data. The objective is to predict whether a person's income exceeds $50K/yr based on census data.
It is also known as the "Census Income" dataset.

The data is old and biased in several ways, but it can still be used as-is for ML experimentation.



## Environment Set Up

Let's set up `go.mod` to use the local copy of GoMLX, so that the dataset code can be developed jointly with the model. That's often how data pre-processing and model code are developed together during experimentation.

If you are not changing the code, feel free to skip this cell. If you use a different directory for your projects, change it below.

Note that the directory `${HOME}/Projects/gomlx` is where the GoMLX code is copied by default in [its Docker](https://hub.docker.com/repository/docker/janpfeifer/gomlx_jupyterlab/general).

In [1]:
!*go mod edit -replace github.com/gomlx/gomlx="${HOME}/Projects/gomlx"

## Data Preparation

GoMLX provides [a simple `adult` library](https://pkg.go.dev/github.com/gomlx/gomlx/examples/adult) to facilitate downloading and preprocessing the data. The data is available in the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Adult).

After downloading the training and test data and validating their checksums, the library generates the quantiles for the continuous features and the vocabularies for the categorical features. It saves all of this in a binary file, so later runs start faster and can skip this step.
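
To make the two preprocessing products concrete, here is a minimal plain-Go sketch of how quantiles and a vocabulary could be derived from raw columns. The function names `uniqueQuantiles` and `buildVocabulary` are hypothetical and are not part of the `adult` library, whose actual implementation may differ (for instance in how it picks quantile positions or assigns ids).

```go
package main

import (
	"fmt"
	"sort"
)

// uniqueQuantiles (hypothetical helper) returns up to maxQuantiles values
// taken at evenly spaced positions of the sorted data, keeping only strictly
// increasing values. Features with little variability yield fewer quantiles.
func uniqueQuantiles(values []float64, maxQuantiles int) []float64 {
	if len(values) == 0 || maxQuantiles < 2 {
		return nil
	}
	sorted := append([]float64(nil), values...) // Copy, so the input is not reordered.
	sort.Float64s(sorted)
	var quantiles []float64
	for i := 0; i < maxQuantiles; i++ {
		idx := i * (len(sorted) - 1) / (maxQuantiles - 1)
		v := sorted[idx]
		if len(quantiles) == 0 || v > quantiles[len(quantiles)-1] {
			quantiles = append(quantiles, v)
		}
	}
	return quantiles
}

// buildVocabulary (hypothetical helper) assigns an integer id to each
// distinct category, in order of first appearance.
func buildVocabulary(values []string) map[string]int {
	vocab := make(map[string]int)
	for _, v := range values {
		if _, found := vocab[v]; !found {
			vocab[v] = len(vocab)
		}
	}
	return vocab
}

func main() {
	hoursPerWeek := []float64{40, 40, 40, 13, 20, 40, 50, 40}
	fmt.Println(uniqueQuantiles(hoursPerWeek, 5)) // prints [13 20 40 50]

	sex := []string{"Male", "Female", "Male"}
	fmt.Println(buildVocabulary(sex)) // prints map[Female:1 Male:0]
}
```

Note how asking for 5 quantiles only produced 4 here, because the value 40 is repeated so often in the toy column.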

The quantiles are used to calibrate the values with a piece-wise linear calibration, which works well for this kind of continuous feature. See the [`layers.PieceWiseLinearCalibration` documentation](https://pkg.go.dev/github.com/gomlx/gomlx@v0.1.0/ml/layers#PieceWiseLinearCalibration).
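
As a rough illustration of the idea, the plain-Go sketch below interpolates linearly between keypoints placed at the quantiles. It is not the GoMLX implementation: the helper names are hypothetical, and spacing the initial outputs evenly in `[0, 1]` is an assumption made for the example. In a trainable calibration, the `outputs` slice is the part that would be learned.

```go
package main

import (
	"fmt"
	"sort"
)

// uniformOutputs (hypothetical helper) returns n values evenly spaced in
// [0, 1], used here as the calibration output for each keypoint. Requires n >= 2.
func uniformOutputs(n int) []float64 {
	outputs := make([]float64, n)
	for i := range outputs {
		outputs[i] = float64(i) / float64(n-1)
	}
	return outputs
}

// calibrate maps x through the piece-wise linear function that passes through
// the points (keypoints[i], outputs[i]). The keypoints must be strictly
// increasing. Values outside the keypoints range are clamped.
func calibrate(x float64, keypoints, outputs []float64) float64 {
	last := len(keypoints) - 1
	if x <= keypoints[0] {
		return outputs[0]
	}
	if x >= keypoints[last] {
		return outputs[last]
	}
	// Find the first keypoint strictly greater than x.
	hi := sort.Search(len(keypoints), func(i int) bool { return keypoints[i] > x })
	lo := hi - 1
	t := (x - keypoints[lo]) / (keypoints[hi] - keypoints[lo])
	return outputs[lo] + t*(outputs[hi]-outputs[lo])
}

func main() {
	keypoints := []float64{13, 20, 40, 50} // For example, quantiles of hours-per-week.
	outputs := uniformOutputs(len(keypoints))
	for _, x := range []float64{5, 30, 99} {
		fmt.Printf("calibrate(%g) = %.3f\n", x, calibrate(x, keypoints, outputs))
	}
}
```

Because the keypoints follow the quantiles, dense regions of the feature get more resolution than sparse ones.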

We create a flag `--data` to define the directory where the intermediary files (downloaded and preprocessed datasets) are saved.
In this example we set it to `~/work/uci-adult`. Verbosity can be controlled with the `--verbosity` flag.

We set defaults in Go for these flags, but they can easily be overridden for a new run by providing them after the `%%` Jupyter kernel meta-command, which indicates that the subsequent lines should be put into a `func main`.
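
The defaults-plus-overrides behavior is the standard one from Go's `flag` package. The sketch below reproduces it outside the notebook with a separate `flag.FlagSet`, so it can be called several times. The `parseFlags` helper is hypothetical, and passing an explicit argument slice stands in for the arguments given after `%%`.

```go
package main

import (
	"flag"
	"fmt"
)

// parseFlags (hypothetical helper) declares two flags with their defaults and
// then applies whatever overrides are present in args.
func parseFlags(args []string) (dataDir string, verbosity int, err error) {
	fs := flag.NewFlagSet("cell", flag.ContinueOnError)
	dataDirFlag := fs.String("data", "~/work/uci-adult", "Directory for dataset files.")
	verbosityFlag := fs.Int("verbosity", 0, "Level of verbosity.")
	if err = fs.Parse(args); err != nil {
		return "", 0, err
	}
	return *dataDirFlag, *verbosityFlag, nil
}

func main() {
	// No arguments: the defaults are used.
	dir, verbosity, err := parseFlags(nil)
	fmt.Println(dir, verbosity, err) // prints ~/work/uci-adult 0 <nil>

	// Arguments override only the flags they mention.
	dir, verbosity, err = parseFlags([]string{"--verbosity=2"})
	fmt.Println(dir, verbosity, err) // prints ~/work/uci-adult 2 <nil>
}
```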


In [2]:
import (
    "flag"
    
    "github.com/gomlx/gomlx/examples/adult"
)

var (
    flagDataDir       = flag.String("data", "~/work/uci-adult", "Directory to save and load downloaded and generated dataset files.")
    flagVerbosity     = flag.Int("verbosity", 0, "Level of verbosity, the higher the more verbose.")
    flagForceDownload = flag.Bool("force_download", false, "Force re-download of Adult dataset files.")
    flagNumQuantiles  = flag.Int("quantiles", 100, "Max number of quantiles to use for numeric features, used during piece-wise linear calibration. It will only use unique values, so if there is less variability, fewer quantiles are used.")
)

%% --verbosity=2
adult.LoadAndPreprocessData(*flagDataDir, *flagNumQuantiles, *flagForceDownload, *flagVerbosity)


Sample Categorical: (24.08% positive ratio, 23.86% weighted positive ratio)
	Row 0:	[7 10 5 1 2 5 2 39]
	Row 1:	[6 10 3 4 1 5 2 39]
	Row 2:	[4 12 1 6 2 5 2 39]
	...
	Row 32558:	[4 12 7 1 5 5 1 39]
	Row 32559:	[4 12 5 1 4 5 2 39]
	Row 32560:	[5 12 3 4 6 5 1 39]

Sample Continuous:
	Row 0:	[39 13 2174 0 40]
	Row 1:	[50 13 0 0 13]
	Row 2:	[38 9 0 0 40]
	...
	Row 32558:	[58 9 0 0 40]
	Row 32559:	[22 9 0 0 20]
	Row 32560:	[52 9 15024 0 40]


In [3]:
!ls -lh ~/work/uci-adult

total 7.0M
-rw-r--r-- 1 janpf janpf 3.8M Mar 21 09:14 adult.data
-rw-r--r-- 1 janpf janpf 1.3M Mar 21 09:14 adult_data-100_quantiles.bin
-rw-r--r-- 1 janpf janpf 2.0M Mar 21 09:14 adult.test


### Creating Datasets

First we create GoMLX's `Manager`: the object that manages the underlying XLA
setup, connection and execution. It's needed to create tensors.

With that we create the samplers of data that we will use to train and evaluate. They implement 
GoMLX's `train.Dataset` interface, which is what is used by our training loop to draw batches to
train, or our eval loop to draw batches to evaluate.
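
The general pattern of such a dataset (yield batches until `io.EOF`, then reset) can be sketched in plain Go as below. The `epochDataset` type is a hypothetical, much simplified stand-in: the real `train.Dataset` interface works on tensors and has a different `Yield` signature, as the code cell further down shows.

```go
package main

import (
	"fmt"
	"io"
)

// epochDataset is a hypothetical in-memory dataset that yields fixed-size
// batches of labels and returns io.EOF after one pass over the data.
type epochDataset struct {
	labels    []float32
	batchSize int
	next      int // Index of the first example of the next batch.
}

// Reset restarts the dataset from the beginning.
func (ds *epochDataset) Reset() { ds.next = 0 }

// Yield returns the next batch, or io.EOF when the epoch is over.
// The last batch may be smaller than batchSize.
func (ds *epochDataset) Yield() ([]float32, error) {
	if ds.next >= len(ds.labels) {
		return nil, io.EOF
	}
	end := ds.next + ds.batchSize
	if end > len(ds.labels) {
		end = len(ds.labels)
	}
	batch := ds.labels[ds.next:end]
	ds.next = end
	return batch, nil
}

// positiveRatio loops over one epoch and returns the fraction of labels set to 1.
func positiveRatio(ds *epochDataset) float32 {
	ds.Reset()
	var sum, count float32
	for {
		batch, err := ds.Yield()
		if err == io.EOF {
			break
		}
		for _, label := range batch {
			sum += label
		}
		count += float32(len(batch))
	}
	if count == 0 {
		return 0
	}
	return sum / count
}

func main() {
	ds := &epochDataset{labels: []float32{0, 1, 0, 0, 1, 0, 0, 0}, batchSize: 3}
	fmt.Printf("%.2f%% positive\n", positiveRatio(ds)*100) // prints 25.00% positive
}
```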

The inputs are 3 tensors: *categorical values*, *continuous values* and *weights*.

In the cell below we define the `Manager` flags and `BuildDatasets`, and print out some samples.

In [4]:
import (
    "flag"
    "fmt"
    "io"
    "log"

    . "github.com/gomlx/gomlx/graph"
    "github.com/gomlx/gomlx/examples/adult"
    "github.com/gomlx/gomlx/ml/train"
    "github.com/gomlx/gomlx/types/tensor"
)

var (
    flagNumThreads     = flag.Int("num_threads", -1, "Number of threads for XLA. Leave as -1 to use as many as there are cores.")
    flagNumReplicas    = flag.Int("num_replicas", 1, "Number of replicas for XLA. Leave as 1 for now.")
    flagPlatform       = flag.String("platform", "", "Platform to use, if empty uses the default one. Usually 'Host' "+
                                     "(for CPU), 'CUDA' (for GPU) or 'TPU'")
    flagBatchSize      = flag.Int("batch", 128, "BatchSampler size for training")
)

// BuildDatasets returns 3 `train.Dataset`:
// * trainingDS is an endless random sampler used for training.
// * trainingEvalDS samples through exactly one epoch of the train dataset.
// * testEvalDS samples through exactly one epoch of the test dataset.
func BuildDatasets(manager *Manager) (trainingDS, trainingEvalDS, testEvalDS train.Dataset) {
    trainingDS = adult.NewDataset("batched train", adult.Data.Train, manager, *flagBatchSize)
    trainingEvalDS = adult.NewDataset("train", adult.Data.Train, manager, -1)
    testEvalDS = adult.NewDataset("test", adult.Data.Test, manager, -1)
    return
}

// AssertNoError checks that err is nil, otherwise it calls `log.Fatalf` with the error message.
func AssertNoError(err error) {
    if err != nil {
        log.Fatalf("Failed: %+v", err)
    }
}

// PositiveRatio finds out the ratio of positive labels in the
// given dataset (used for both the training and testing data).
//
// We could do this easily with the GoMLX computation model (just `ReduceAllSum`), but
// this example shows it's also ok to mix in Go computations.
func PositiveRatio(ds train.Dataset) float32 {
    ds.Reset()  // Start from beginning.
    var sum float32
    var count float32
    for {
        _, _, labels, err := ds.Yield()
        if err == io.EOF {
            break
        }
        AssertNoError(err)
        data := tensor.Data[float32](labels[0].Local())
        for _, value := range data {
            sum += value
        }
        count += float32(len(data))
    }
    return sum/count
}

%%
adult.LoadAndPreprocessData(*flagDataDir, *flagNumQuantiles, *flagForceDownload, *flagVerbosity)    
manager := BuildManager().NumThreads(*flagNumThreads).NumReplicas(*flagNumReplicas).Platform(*flagPlatform).MustDone()
trainingDS, trainingEvalDS, testEvalDS := BuildDatasets(manager)

// Take one batch.
_, inputs, labels, err := trainingDS.Yield()
AssertNoError(err)
fmt.Printf("Inputs of batch (size %d):\n", *flagBatchSize)
fmt.Printf("\tcategorical:\n\t\tFeatures=%v\n", adult.Data.VocabulariesFeatures)
fmt.Printf("\t\tValues: %s\n", inputs[0].Local().StringN(16))
fmt.Printf("\tcontinuous:\n\t\tFeatures=%v\n", adult.Data.QuantilesFeatures)
fmt.Printf("\t\tValues: %s\n", inputs[1].Local().StringN(10))
fmt.Printf("\tweights: %s\n", inputs[2].Local().StringN(5))
fmt.Printf("\nLabels of batch:\n\t%s\n", labels[0].Local().StringN(10))

fmt.Printf("\nLabels distributions:\n\tTrain:\t%.2f%% positive\n\tTest:\t%.2f%% positive\n",
           PositiveRatio(trainingEvalDS)*100.0, PositiveRatio(testEvalDS)*100.0)


Inputs of batch (size 128):
	categorical:
		Features=[workclass education marital-status occupation relationship race sex native-country]
		Values: (Int64)[128 8]: (... too large, 1024 values ..., first 16 values: [4 12 4 4 2 5 1 39 4 10 3 12 1 5 2 39])
	continuous:
		Features=[age education-num capital-gain capital-loss hours-per-week]
		Values: (Float32)[128 5]: (... too large, 640 values ..., first 10 values: [54 9 0 0 43 38 13 0 0 50])
	weights: (Float32)[128 1]: (... too large, 128 values ..., first 5 values: [278329 174717 54782 152328 254613])

Labels of batch:
	(Float32)[128 1]: (... too large, 128 values ..., first 10 values: [0 1 0 0 0 0 0 1 0 0])

Labels distributions:
	Train:	24.08% positive
	Test:	23.62% positive


## Model Definition

There are lots of hyperparameter flags, but otherwise this is a straightforward FNN (feed-forward neural network), using piece-wise linear calibration of the continuous features and embeddings for the categorical features.
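
The hidden layers in the model below are stacked with residual connections: each layer adds `Dense(Sigmoid(x))` back to its input `x`. As a minimal sketch of that arithmetic for a single example, here is a plain-Go version with hand-picked weights. The helper names and values are only for illustration; dropout is left out, and the real model runs on GoMLX graph nodes rather than Go slices.

```go
package main

import (
	"fmt"
	"math"
)

// sigmoid is the logistic function.
func sigmoid(x float64) float64 { return 1 / (1 + math.Exp(-x)) }

// dense computes y = W·x + b for one example, where w is shaped [out][in].
func dense(x []float64, w [][]float64, b []float64) []float64 {
	y := make([]float64, len(w))
	for j, row := range w {
		sum := b[j]
		for i, weight := range row {
			sum += weight * x[i]
		}
		y[j] = sum
	}
	return y
}

// residualBlock returns x + dense(sigmoid(x)). The dense layer must keep the
// dimension of x, so that the sum is well defined.
func residualBlock(x []float64, w [][]float64, b []float64) []float64 {
	activated := make([]float64, len(x))
	for i, v := range x {
		activated[i] = sigmoid(v)
	}
	update := dense(activated, w, b)
	out := make([]float64, len(x))
	for i := range x {
		out[i] = x[i] + update[i]
	}
	return out
}

func main() {
	x := []float64{0, 0}
	w := [][]float64{{1, 1}, {2, 0}}
	b := []float64{0.5, -1}
	fmt.Println(residualBlock(x, w, b)) // prints [1.5 0]
}
```

The requirement that input and output dimensions match is why the model first projects the concatenated features to `num_nodes` before entering the residual stack.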

> **Note**: building models involves constantly checking that shapes are compatible. It's a bit annoying, in particular because shapes are only known at runtime, so there is no compile-time check. GoMLX tries to help by providing a stack trace of where errors happen, so one can pinpoint issues quickly. But it often involves lots of experimentation (more than ordinary Go code).
>
> Developing with a notebook (see [GoNB](https://github.com/janpfeifer/gonb)) or simply writing a unit test for your `ModelGraph` function are quick and convenient ways to develop models before actually training them.

In [5]:
import (
    "flag"
    "fmt"
    "io"

    . "github.com/gomlx/gomlx/graph"

    "github.com/gomlx/gomlx/ml/context"
    "github.com/gomlx/gomlx/examples/adult"
    "github.com/gomlx/gomlx/ml/layers"
    "github.com/gomlx/gomlx/ml/train"
    "github.com/gomlx/gomlx/ml/train/optimizers"
    "github.com/gomlx/gomlx/types/shapes"
    "github.com/pkg/errors"
)

var (
    // ModelDType used for the model. Must match RawData Go types.
    ModelDType = shapes.Float32
    
    // Model hyperparameters.
    flagUseCategorical       = flag.Bool("use_categorical", true, "Use categorical features.")
    flagUseContinuous        = flag.Bool("use_continuous", true, "Use continuous features.")
    flagTrainableCalibration = flag.Bool("trainable_calibration", true, "Allow piece-wise linear calibration to adjust outputs.")
    flagEmbeddingDim    = flag.Int("embedding_dim", 8, "Default embedding dimension for categorical values.")
    flagNumHiddenLayers = flag.Int("hidden_layers", 8, "Number of hidden layers, stacked with residual connection.")
    flagNumNodes        = flag.Int("num_nodes", 32, "Number of nodes in hidden layers.")
    flagDropoutRate     = flag.Float64("dropout", 0, "Dropout rate")
    
    // Training parameter, referenced here.
    flagLearningRate    = flag.Float64("learning_rate", 0.001, "Initial learning rate.")
    flagNumSteps       = flag.Int("steps", 5000, "Number of gradient descent steps to perform")
)


// ModelGraph outputs the logits (not the probabilities). The parameter inputs should contain 3 tensors:
//
// - categorical inputs, shaped  `(int64)[batch_size, len(VocabulariesFeatures)]`
// - continuous inputs, shaped `(float32)[batch_size, len(Quantiles)]`
// - weights: not currently used, but shaped `(float32)[batch_size, 1]`.
func ModelGraph(ctx *context.Context, spec any, inputs []*Node) []*Node {
    _ = spec // Not used, since the dataset is always the same.
    g := inputs[0].Graph()
    
    // Use Cosine schedule of the learning rate.
    optimizers.CosineAnnealingSchedule(ctx, g, ModelDType).
        PeriodInSteps(*flagNumSteps/3).Done()
    
    categorical, continuous := inputs[0], inputs[1]
    batchSize := categorical.Shape().Dimensions[0]
    
    var allEmbeddings []*Node
    graph := categorical.Graph()

    if *flagUseCategorical {
        // Embedding of categorical values, each with its own vocabulary.
        numCategorical := categorical.Shape().Dimensions[1]
        for catIdx := 0; catIdx < numCategorical; catIdx++ {
            // Take one column at a time of the categorical values.
            split := Slice(categorical, AxisRange(), AxisRange(catIdx, catIdx+1))
            // Embed it accordingly.
            embedCtx := ctx.In(fmt.Sprintf("categorical_%d_%s", catIdx, adult.Data.VocabulariesFeatures[catIdx]))
            vocab := adult.Data.Vocabularies[catIdx]
            vocabSize := len(vocab)
            embedding := layers.Embedding(embedCtx, split, ModelDType, vocabSize, *flagEmbeddingDim)
            if !embedding.AssertDims(batchSize, *flagEmbeddingDim) {  // 2-dim tensor, with batch size as the leading dimension.
                return nil
            }
            allEmbeddings = append(allEmbeddings, embedding)
        }
    }

    if *flagUseContinuous {
        // Piecewise-linear calibration of the continuous values. Each feature has its own number of quantiles.
        numContinuous := continuous.Shape().Dimensions[1]
        for contIdx := 0; contIdx < numContinuous; contIdx++ {
            // Take one column at a time of the continuous values.
            split := Slice(continuous, AxisRange(), AxisRange(contIdx, contIdx+1))
            featureName := adult.Data.QuantilesFeatures[contIdx]
            calibrationCtx := ctx.In(fmt.Sprintf("continuous_%d_%s", contIdx, featureName))
            quantiles := adult.Data.Quantiles[contIdx]
            if err := layers.ValidateQuantilesForPWLCalibration(quantiles); err != nil {
                graph.SetError(errors.Wrapf(err, "quantile for features %q invalid", featureName))
                return nil
            }
            calibrated := layers.PieceWiseLinearCalibration(calibrationCtx, split, Const(graph, quantiles), *flagTrainableCalibration)
            if !calibrated.AssertDims(batchSize, 1) { // 2-dim tensor, with batch size as the leading dimension.
                return nil
            }
            allEmbeddings = append(allEmbeddings, calibrated)
        }
    }

    layer := Concatenate(allEmbeddings, -1)
    if !layer.AssertDims(batchSize, -1) { // 2-dim tensor, with batch size as the leading dimension.
        return nil
    }
    
    layer = layers.DenseWithBias(ctx.In(fmt.Sprintf("DenseLayer_%d", 0)), layer, *flagNumNodes)
    for ii := 1; ii < *flagNumHiddenLayers; ii++ {
        ctx := ctx.In(fmt.Sprintf("DenseLayer_%d", ii))
        // Add layer with residual connection.
        tmp := Sigmoid(layer)
        if *flagDropoutRate > 0 {
            tmp = layers.Dropout(ctx, tmp, Const(graph, shapes.CastAsDType(*flagDropoutRate, ModelDType)))
        }
        tmp = layers.DenseWithBias(ctx, tmp, *flagNumNodes)
        layer = Add(layer, tmp)  // Residual connections
    }
    layer = Sigmoid(layer)
    logits := layers.DenseWithBias(ctx.In("DenseFinal"), layer, 1)
    if !logits.AssertDims(batchSize, 1) { // 2-dim tensor, with batch size as the leading dimension.
        return nil
    }
    return []*Node{logits}
}

%% --platform=Host
adult.LoadAndPreprocessData(*flagDataDir, *flagNumQuantiles, *flagForceDownload, *flagVerbosity)    
manager := BuildManager().NumThreads(*flagNumThreads).NumReplicas(*flagNumReplicas).Platform(*flagPlatform).MustDone()

// Let's just check that we get the right shape from the model function, without any real data.
graph := manager.NewGraph("test")
ctx := context.NewContext(manager)
ctx.SetParam(optimizers.LearningRateKey, *flagLearningRate)

inputs := []*Node{
    // Categorical: shaped [batch_size, num_categorical]
    graph.Parameter("categorical", shapes.Make(shapes.Int64, *flagBatchSize, len(adult.Data.VocabulariesFeatures))),
    // Continuous: shaped [batch_size, num_continuous]
    graph.Parameter("continuous", shapes.Make(shapes.Float32, *flagBatchSize, len(adult.Data.QuantilesFeatures))),
    // Weights: shaped [batch_size, 1]
    graph.Parameter("weights", shapes.Make(shapes.Float32, *flagBatchSize, 1)),    
}
logits := ModelGraph(ctx, nil, inputs)
AssertNoError(graph.Error())
AssertNoError(ctx.Error())
fmt.Printf("Logits shape for batch_size=%d: %s\n", *flagBatchSize, logits[0].Shape())

Logits shape for batch_size=128: (Float32)[128 1]


## Training Loop

We can create a training loop with only a `Manager`, a `Context` (for the model variables) and the `ModelGraph` function.

To make it more interesting we also add the following:

* Accuracy metrics for training and testing.
* Checkpoints -- so trained model can be saved, and reloaded.
* A progress-bar that also shows training metrics.
* We dynamically plot how the loss and accuracy evolve.

First we define the corresponding flags and the `trainModel` function, and run it for very few steps to make sure
it is working.

In [6]:
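import (
    "flag"
    "fmt"
    "io"
    "log"
    "time"

    . "github.com/gomlx/gomlx/graph"

    "github.com/gomlx/gomlx/examples/adult"
    "github.com/gomlx/gomlx/examples/notebook/gonb/margaid"
    "github.com/gomlx/gomlx/ml/context"
    "github.com/gomlx/gomlx/ml/context/checkpoints"
    "github.com/gomlx/gomlx/ml/train"
    "github.com/gomlx/gomlx/ml/train/commandline"
    "github.com/gomlx/gomlx/ml/train/losses"
    "github.com/gomlx/gomlx/ml/train/metrics"
    "github.com/gomlx/gomlx/ml/train/optimizers"
    "github.com/gomlx/gomlx/types/shapes"
    "github.com/gomlx/gomlx/types/slices"
    "github.com/gomlx/gomlx/types/tensor"
    "github.com/janpfeifer/gonb/gonbui"
)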

var (
    flagOptimizer       = flag.String("optimizer", "adam", "Type of optimizer to use: 'sgd' or 'adam'")
    flagLearningRate    = flag.Float64("learning_rate", 0.001, "Initial learning rate.")
    flagCheckpoint     = flag.String("checkpoint", "", "Directory to save and load checkpoints from. If left empty, no checkpoints are created.")
    flagCheckpointKeep = flag.Int("checkpoint_keep", 10, "Number of checkpoints to keep, if --checkpoint is set.")
    flagNumPlotPoints  = flag.Int("plot_points", 10, "Number of points to plot using Chart.JS.")
)

func trainModel() {
    manager := BuildManager().NumThreads(*flagNumThreads).NumReplicas(*flagNumReplicas).Platform(*flagPlatform).MustDone()
    adult.LoadAndPreprocessData(*flagDataDir, *flagNumQuantiles, *flagForceDownload, *flagVerbosity)    
    trainingDS, trainingEvalDS, testEvalDS := BuildDatasets(manager)

    // Context holds the variables and optionally hyperparameters for the model.
    ctx := context.NewContext(manager)
    ctx.SetParam(optimizers.LearningRateKey, *flagLearningRate)

    // Metrics we are interested in.
    meanAccuracyMetric := metrics.NewMeanBinaryLogitsAccuracy("Mean Accuracy", "#acc")
    movingAccuracyMetric := metrics.NewMovingAverageBinaryLogitsAccuracy("Moving Average Accuracy", "~acc", 0.01)

    // Checkpoints saving.
    var checkpoint *checkpoints.Handler
    if *flagCheckpoint != "" {
        var err error
        checkpoint, err = checkpoints.Build(ctx).Dir(*flagCheckpoint).Keep(*flagCheckpointKeep).Done()
        AssertNoError(err)
    }

    // Pick a known optimizer.
    optimizerFn, found := optimizers.KnownOptimizers[*flagOptimizer]
    if !found {
        log.Fatalf("Unknown optimizer %q, please use one of %v",
            *flagOptimizer, slices.Keys(optimizers.KnownOptimizers))
    }

    // Create a train.Trainer: this object will orchestrate running the model, feeding
    // results to the optimizer, evaluating the metrics, etc. (all happens in trainer.TrainStep)
    trainer := train.NewTrainer(manager, ctx, ModelGraph, losses.BinaryCrossentropyLogits,
        optimizerFn(),
        []metrics.Interface{movingAccuracyMetric}, // trainMetrics
        []metrics.Interface{meanAccuracyMetric})   // evalMetrics
    AssertNoError(ctx.Error())

    // Use standard training loop.
    loop := train.NewLoop(trainer)
    commandline.AttachProgressBar(loop) // Attaches a progress bar to the loop.

    // Attach a checkpoint.
    if checkpoint != nil {
        train.NTimesDuringLoop(loop, *flagNumPlotPoints, "checkpointing", 100, func(_ *train.Loop, _ []tensor.Tensor) error {
            return checkpoint.Save()
        })
    }

    // Attach margaid plots.
    if *flagNumPlotPoints > 0 {
        margaid.New(1024, 400, testEvalDS).DynamicUpdates().Attach(loop, *flagNumPlotPoints)
    }

    // Run the given number of steps.
    _, err := loop.RunSteps(trainingDS, *flagNumSteps)
    AssertNoError(err)

    // Print a final evaluation on train and test datasets.
    fmt.Println()
    err = commandline.ReportEval(trainer, trainingEvalDS, testEvalDS)
    AssertNoError(err)
    fmt.Println()
}

// Notice that command line flags are passed in the %% notebook command. We set --plot_points=0 here to disable plotting,
// since this is only a quick test that our trainModel() function is working.
%% --platform Host --steps=500 --plot_points=0
trainModel()

Training (500 steps):  100% [========================================] (218 steps/s) [loss=0.347] [~loss=0.354] [~acc=84.40%]

Results on train:
	Mean Loss (#loss): 0.343
	Mean Accuracy (#acc): 84.46%
Results on test:
	Mean Loss (#loss): 0.341
	Mean Accuracy (#acc): 84.69%



## Final Run

With everything working, we can do our final run.

> **Note**: this is where one might want to tune hyperparameters, trying out different values.

In [7]:
%% --platform Host --steps=5000 --plot_points=50 
trainModel()

Training (5000 steps):  100% [========================================] (546 steps/s) [loss=0.291] [~loss=0.273] [~acc=87.49%]

Results on train:
	Mean Loss (#loss): 0.276
	Mean Accuracy (#acc): 87.12%
Results on test:
	Mean Loss (#loss): 0.283
	Mean Accuracy (#acc): 87.05%

