
Commit

Added cyclical learning rate scheduler
adnaniazi committed Jul 15, 2024
1 parent 11564fb commit ed68a21
Showing 7 changed files with 474 additions and 11 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- Added Cyclical learning rate scheduler

### Removed
- Removed reduce learning rate on plateau

## [0.2.1] - 2024-07-11
### Added
@@ -69,4 +74,3 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
[0.1.2]: https://github.com/adnaniazi/capfinder/compare/0.1.1...0.1.2
[0.1.1]: https://pypi.org/manage/project/capfinder/release/0.1.1/
[0.1.0]: https://pypi.org/manage/project/capfinder/release/0.1.0/

2 changes: 1 addition & 1 deletion src/capfinder/cli.py
@@ -61,7 +61,7 @@ def extract_cap_signal(
- 4 represents TMG Cap \n
- 5 represents NAD Cap \n
- 6 represents FAD Cap \n
- -99 represents and unknown cap(s). \n
- -99 represents an unknown cap(s). \n
""",
),
] = -99,
215 changes: 215 additions & 0 deletions src/capfinder/cyclic_learing_rate.py
@@ -0,0 +1,215 @@
from typing import Any, Callable, Dict, Optional, Union

import numpy as np
from comet_ml import Experiment
from numpy.typing import NDArray

from capfinder.ml_libs import Callback, keras


class CometLRLogger(Callback):
"""
A callback to log the learning rate to Comet.ml during training.
This callback logs the learning rate at the beginning of each epoch
and at the end of each batch to a Comet.ml experiment.
Attributes:
experiment (Experiment): The Comet.ml experiment to log to.
"""

def __init__(self, experiment: Experiment) -> None:
"""
Initialize the CometLRLogger.
Args:
experiment (Experiment): The Comet.ml experiment to log to.
"""
super().__init__()
self.experiment: Experiment = experiment

def on_epoch_begin(self, epoch: int, logs: Optional[Dict[str, Any]] = None) -> None:
"""
Log the learning rate at the beginning of each epoch.
Args:
epoch (int): The current epoch number.
logs (Optional[Dict[str, Any]]): The logs dictionary.
"""
lr: Union[float, np.ndarray] = self.model.optimizer.learning_rate
if hasattr(lr, "numpy"):
lr = lr.numpy()
self.experiment.log_metric("learning_rate", lr, step=epoch)

def on_batch_end(self, batch: int, logs: Optional[Dict[str, Any]] = None) -> None:
"""
Log the learning rate at the end of each batch.
Args:
batch (int): The current batch number.
logs (Optional[Dict[str, Any]]): The logs dictionary.
"""
lr: Union[float, np.ndarray] = self.model.optimizer.learning_rate
if hasattr(lr, "numpy"):
lr = lr.numpy()
self.experiment.log_metric(
"learning_rate", lr, step=self.model.optimizer.iterations.numpy()
)


class CustomProgressCallback(keras.callbacks.Callback):
"""
A custom callback to print the learning rate at the end of each epoch.
This callback prints the current learning rate after Keras' built-in
progress bar for each epoch.
"""

def __init__(self) -> None:
"""Initialize the CustomProgressCallback."""
super().__init__()

def on_epoch_end(self, epoch: int, logs: Optional[Dict[str, Any]] = None) -> None:
"""
Print the learning rate at the end of each epoch.
Args:
epoch (int): The current epoch number.
logs (Optional[Dict[str, Any]]): The logs dictionary.
"""
lr: Union[float, np.ndarray] = self.model.optimizer.learning_rate
if hasattr(lr, "numpy"):
lr = lr.numpy()
print(f"\nLearning rate: {lr:.6f}")


class CyclicLR(Callback):
"""
    This callback implements a cyclical learning rate policy (CLR), as
    described in Leslie Smith's "Cyclical Learning Rates for Training
    Neural Networks". The method cycles the learning rate between two
    boundaries at a constant frequency.
    Args:
        base_lr (float): Initial learning rate, which is the lower
            boundary of the cycle.
        max_lr (float): Upper boundary of the cycle. Functionally, it
            defines the cycle amplitude (max_lr - base_lr). The learning
            rate at any point is base_lr plus some scaling of the
            amplitude, so max_lr may not actually be reached, depending
            on the scaling function.
        step_size (float): Number of training iterations per half cycle.
            The authors suggest setting step_size to 2-8 times the number
            of training iterations per epoch.
        mode (str): One of {'triangular', 'triangular2', 'exp_range'}.
            Selects one of the built-in scaling policies. Default is
            'triangular'. If scale_fn is not None, this argument is
            ignored.
        gamma (float): Constant used in the 'exp_range' scaling function:
            gamma ** (cycle iterations).
        scale_fn (Optional[Callable[[float], float]]): Custom scaling
            policy defined by a single-argument lambda function, where
            0 <= scale_fn(x) <= 1 for all x >= 0. If provided, the mode
            parameter is ignored.
        scale_mode (str): One of {'cycle', 'iterations'}. Defines whether
            scale_fn is evaluated on the cycle number or on cycle
            iterations (training iterations since the start of the
            cycle). Default is 'cycle'.
    """

def __init__(
self,
base_lr: float = 0.001,
max_lr: float = 0.006,
step_size: float = 2000.0,
mode: str = "triangular",
gamma: float = 1.0,
scale_fn: Optional[Callable[[float], float]] = None,
scale_mode: str = "cycle",
) -> None:
super().__init__()

self.base_lr: float = base_lr
self.max_lr: float = max_lr
self.step_size: float = step_size
self.mode: str = mode
self.gamma: float = gamma

if scale_fn is None:
if self.mode == "triangular":
self.scale_fn = lambda x: 1.0
self.scale_mode = "cycle"
elif self.mode == "triangular2":
self.scale_fn = lambda x: 1 / (2.0 ** (x - 1))
self.scale_mode = "cycle"
elif self.mode == "exp_range":
self.scale_fn = lambda x: gamma**x
self.scale_mode = "iterations"
else:
self.scale_fn = scale_fn
self.scale_mode = scale_mode

self.clr_iterations: float = 0.0
self.trn_iterations: float = 0.0
self.history: Dict[str, list] = {}

self._reset()

def _reset(
self,
new_base_lr: Optional[float] = None,
new_max_lr: Optional[float] = None,
new_step_size: Optional[float] = None,
) -> None:
"""Resets cycle iterations.
Optional boundary/step size adjustment.
"""
if new_base_lr is not None:
self.base_lr = new_base_lr
if new_max_lr is not None:
self.max_lr = new_max_lr
if new_step_size is not None:
self.step_size = new_step_size
self.clr_iterations = 0.0

    def clr(self) -> Union[float, NDArray[np.float64]]:
        cycle: float = np.floor(1 + self.clr_iterations / (2 * self.step_size))
        x: float = np.abs(self.clr_iterations / self.step_size - 2 * cycle + 1)
        # Evaluate the scaling function on the cycle number or on the raw
        # iteration count, but always apply base_lr plus the scaled amplitude.
        scale_arg: float = (
            cycle if self.scale_mode == "cycle" else self.clr_iterations
        )
        clr_value = self.base_lr + (self.max_lr - self.base_lr) * np.maximum(
            0, (1 - x)
        ) * self.scale_fn(scale_arg)
        return (
            float(clr_value)
            if isinstance(clr_value, float)
            else np.array(clr_value, dtype=np.float64)
        )

def on_train_begin(self, logs: Optional[Dict[str, Any]] = None) -> None:
"""Initialize the learning rate to the base learning rate."""
logs = logs or {}

if self.clr_iterations == 0:
self.model.optimizer.learning_rate.assign(self.base_lr)
else:
self.model.optimizer.learning_rate.assign(self.clr())

def on_batch_end(self, batch: int, logs: Optional[Dict[str, Any]] = None) -> None:
"""Record previous batch statistics and update the learning rate."""
logs = logs or {}
self.trn_iterations += 1
self.clr_iterations += 1

self.history.setdefault("lr", []).append(
self.model.optimizer.learning_rate.numpy()
)
self.history.setdefault("iterations", []).append(self.trn_iterations)

for k, v in logs.items():
self.history.setdefault(k, []).append(v)

self.model.optimizer.learning_rate.assign(self.clr())
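
For context, here is a minimal sketch of how the new callbacks might be attached to a Keras model during training; it is not part of this commit. The toy data, the toy model, and the Comet project name are assumptions made for illustration, and the Experiment object needs valid Comet credentials to actually log.

from comet_ml import Experiment
import numpy as np

from capfinder.cyclic_learing_rate import (
    CometLRLogger,
    CustomProgressCallback,
    CyclicLR,
)
from capfinder.ml_libs import keras

# Toy stand-in data and model, just to exercise the callbacks.
x = np.random.rand(512, 500, 1).astype("float32")
y = np.random.randint(0, 2, size=(512,))
model = keras.Sequential(
    [
        keras.Input(shape=(500, 1)),
        keras.layers.Flatten(),
        keras.layers.Dense(2, activation="softmax"),
    ]
)
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy")

# Hypothetical project name; a real Comet API key is required for logging.
experiment = Experiment(project_name="capfinder")

# One half cycle = two epochs' worth of batches (512 samples / 64 per batch = 8).
clr = CyclicLR(base_lr=1e-4, max_lr=1e-3, step_size=2 * 8, mode="triangular2")

model.fit(
    x,
    y,
    batch_size=64,
    epochs=4,
    callbacks=[clr, CometLRLogger(experiment), CustomProgressCallback()],
)

Because CyclicLR advances once per batch, the learning-rate trace logged by CometLRLogger should show triangular cycles whose peaks halve from one cycle to the next.
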
100 changes: 100 additions & 0 deletions src/capfinder/inference.py
@@ -0,0 +1,100 @@
# from capfinder.train_etl import (
# list_csv_files,
# load_csv,
# concatenate_dataframes,
# make_x_y_read_id_sets,
# )
# from capfinder.inference_data_loader import load_inference_dataset
# from typing import Any, List, Optional, Tuple, Type, Union
# import os
# import numpy as np
# import polars as pl
# from loguru import logger
# from polars import DataFrame
# from prefect import flow, task
# from prefect.engine import TaskRunContext
# from prefect.tasks import task_input_hash
# from typing_extensions import Literal

# from capfinder.utils import get_dtype

# # Define custom types
# TrainData = Tuple[
# np.ndarray, # x_train
# np.ndarray, # y_train
# pl.Series, # read_id_train
# ]
# DtypeLiteral = Literal["float16", "float32", "float64"]
# DtypeNumpy = Union[np.float16, np.float32, np.float64]


# def save_to_file(x, y, read_id, output_dir):
# if not os.path.exists(output_dir):
# os.makedirs(output_dir, exist_ok=True)

# dfp_x = pl.from_numpy(data=x.reshape(x.shape[:2]), orient="row")
# dfp_y = pl.from_numpy(data=y.reshape(y.shape[:2]), orient="row")
#     dfp_id = pl.DataFrame(read_id)

# x_path = os.path.join(output_dir, "x.csv")
# dfp_x.write_csv(file=x_path)
# y_path = os.path.join(output_dir, "y.csv")
# dfp_y.write_csv(file=y_path)
# id_path = os.path.join(output_dir, "read_id.csv")
# dfp_id.write_csv(file=id_path)


# # Top level task because Prefect does not allow flow level
# # results to be cached to memory. We create a flow as a task
# # to save the result to memory.
# # https://github.com/PrefectHQ/prefect/issues/7288
# @flow(name="prepare-inference-data")
# def prepare_inference_data(
# data_dir: str, output_dir: str, target_length: int, dtype_n: Type[np.floating]
# ) -> TrainData:
# """Pipeline task to load and concatenate CSV files.

# Parameters
# ----------
# data_dir: str
# Path to the directory containing raw CSV files.

# output_dir: str
# Path to the directory where the processed data will be saved.

# target_length: int
# The desired length of each time series.

# dtype_n: Type[np.floating]
# The data type to use for the features.
# """

# csv_files = list_csv_files(data_dir)
# loaded_dataframes = [load_csv(file_path) for file_path in csv_files]
# concatenated_df = concatenate_dataframes(loaded_dataframes)
# x, y, read_id = make_x_y_read_id_sets(concatenated_df, target_length, dtype_n)
# save_to_file(x, y, read_id, output_dir)


# def main(
# cap_signal_dir: str,
# output_dir: str,
# dtype: DtypeLiteral,
# target_length: int = 500,
# batch_size: int = 1024,
# model_version: str = "latest",
# ):

# prepare_inference_data(
# data_dir=cap_signal_dir,
# output_dir=output_dir,
# target_length=target_length,
#         dtype_n=get_dtype(dtype),
# )
# load_inference_dataset(
# x_path=os.path.join(output_dir, "x.csv"),
# y_path=os.path.join(output_dir, "y.csv"),
# read_id_path=os.path.join(output_dir, "read_id.csv"),
# batch_size=batch_size,
# num_timesteps=target_length,
# )