![QuantConnect Logo](https://cdn.quantconnect.com/web/i/icon.png)
<hr>

In [1]:
#Random Sampling
from AlgorithmImports import *
import random
from datetime import datetime, timedelta

def fetch_random_slices(symbol, frontier_date, ending_date, interval=Resolution.Minute,
                        slice_length=timedelta(days=1), n_slices=5):
    """
    Sample independent, randomly placed slices of price movement.

    Each slice starts at a random instant between ``frontier_date`` and
    ``ending_date - slice_length`` (whole-second granularity) and ends
    ``slice_length`` later. Both ends are snapped forward to the first
    available bar at or after the requested instant. Slices may overlap.

    Parameters
    ----------
    symbol : Symbol or str
        Security identifier handed to ``qb.History`` (the example usage in
        this file passes the ``Symbol`` returned by ``qb.AddEquity``).
    frontier_date : datetime
        Earliest possible start date.
    ending_date : datetime
        Latest possible end date.
    interval : Resolution
        Data granularity (Minute, Hour, Daily).
    slice_length : timedelta
        How long each slice should be (e.g., 1 day, 3 hours).
    n_slices : int
        Number of random draws to attempt.

    Returns
    -------
    list of dicts { "start_date", "start_price", "end_date", "end_price" }
        May hold fewer than ``n_slices`` entries: a draw is dropped when no
        bar exists at or after its start or end instant. Empty when the
        date range cannot hold a single slice or no history is returned.

    Notes
    -----
    Relies on a module-level ``qb`` object providing ``History``.
    """
    # The sampling window does not change between draws, so work it out once,
    # and before the history request so an impossible range costs nothing.
    latest_start = ending_date - slice_length
    if frontier_date > latest_start:
        return []  # Not enough room for even one slice
    window_seconds = int((latest_start - frontier_date).total_seconds())

    # Load all history once
    history = qb.History([symbol], frontier_date, ending_date, interval)
    if history.empty:
        return []

    data = history["close"].reset_index()
    times = data["time"]

    results = []
    for _ in range(n_slices):
        rand_start = frontier_date + timedelta(seconds=random.randint(0, window_seconds))
        rand_end = rand_start + slice_length

        # First bar at or after each requested instant
        start_row = data.loc[times >= rand_start].head(1)
        end_row = data.loc[times >= rand_end].head(1)

        # Skip draws that fall beyond the data that was actually loaded
        if start_row.empty or end_row.empty:
            continue

        results.append({
            "start_date": start_row["time"].iloc[0],
            "start_price": float(start_row["close"].iloc[0]),
            "end_date": end_row["time"].iloc[0],
            "end_price": float(end_row["close"].iloc[0])
        })

    return results

In [None]:
from AlgorithmImports import *
from datetime import datetime, timedelta

def fetch_consecutive_slices(symbol, frontier_date, ending_date, interval=Resolution.Minute,
                             slice_length=timedelta(days=1), max_slices=None):
    """
    Fetch consecutive, back-to-back slices of price movement.

    The first window is frontier_date → frontier_date + slice_length; each
    following window starts where the previous one ended, until a window
    would run past ending_date. Both ends of a window are snapped forward
    to the first available bar at or after the requested instant.

    NOTE: a later definition with the same name in this file shadows this one.

    Parameters
    ----------
    symbol : Symbol or str
        Security identifier handed to ``qb.History`` (the example usage in
        this file passes the ``Symbol`` returned by ``qb.AddEquity``).
    frontier_date : datetime
        Earliest possible start date.
    ending_date : datetime
        Latest possible end date.
    interval : Resolution
        Data granularity (Minute, Hour, Daily).
    slice_length : timedelta
        Length of each slice (e.g., 1 day, 3 hours). Must be positive.
    max_slices : int, optional
        Maximum number of slices to return. Default = all possible slices.
        Zero or a negative value yields an empty list.

    Returns
    -------
    list of dicts { "start_date", "start_price", "end_date", "end_price" }
        Windows without a bar at or after their start or end instant are
        skipped and do not count toward ``max_slices``.

    Raises
    ------
    ValueError
        If ``slice_length`` is zero or negative.

    Notes
    -----
    Relies on a module-level ``qb`` object providing ``History``.
    """
    # A zero or negative step never moves the window forward, so the loop
    # below would never terminate.
    if slice_length <= timedelta(0):
        raise ValueError("slice_length must be a positive timedelta")

    # Nothing can be returned: skip the (expensive) history request entirely.
    if max_slices is not None and max_slices <= 0:
        return []
    if frontier_date + slice_length > ending_date:
        return []

    # Load history once
    history = qb.History([symbol], frontier_date, ending_date, interval)
    if history.empty:
        return []

    data = history["close"].reset_index()
    times = data["time"]

    results = []
    current_start = frontier_date

    while current_start + slice_length <= ending_date:
        current_end = current_start + slice_length

        # First bar at or after each window boundary
        start_row = data.loc[times >= current_start].head(1)
        end_row = data.loc[times >= current_end].head(1)

        if not start_row.empty and not end_row.empty:
            results.append({
                "start_date": start_row["time"].iloc[0],
                "start_price": float(start_row["close"].iloc[0]),
                "end_date": end_row["time"].iloc[0],
                "end_price": float(end_row["close"].iloc[0])
            })
            # Only a successful append can reach the cap
            if max_slices is not None and len(results) >= max_slices:
                break

        # Advance to next slice
        current_start = current_end

    return results

from AlgorithmImports import *
from datetime import datetime, timedelta

def fetch_consecutive_slices(symbol, frontier_date, ending_date, interval=Resolution.Minute,
                             slice_length=timedelta(days=1), max_slices=None):
    """
    Fetch consecutive, back-to-back slices of price movement.

    The first window is frontier_date → frontier_date + slice_length; each
    following window starts where the previous one ended, until a window
    would run past ending_date. Both ends of a window are snapped forward
    to the first available bar at or after the requested instant.

    NOTE: this definition shadows the earlier definition of the same name
    in this file, so this is the one callers actually get.

    Parameters
    ----------
    symbol : Symbol or str
        Security identifier handed to ``qb.History`` (the example usage in
        this file passes the ``Symbol`` returned by ``qb.AddEquity``).
    frontier_date : datetime
        Earliest possible start date.
    ending_date : datetime
        Latest possible end date.
    interval : Resolution
        Data granularity (Minute, Hour, Daily).
    slice_length : timedelta
        Length of each slice (e.g., 1 day, 3 hours). Must be positive.
    max_slices : int, optional
        Maximum number of slices to return. Default = all possible slices.
        Zero or a negative value yields an empty list.

    Returns
    -------
    list of dicts { "start_date", "start_price", "end_date", "end_price" }
        Windows without a bar at or after their start or end instant are
        skipped and do not count toward ``max_slices``.

    Raises
    ------
    ValueError
        If ``slice_length`` is zero or negative.

    Notes
    -----
    Relies on a module-level ``qb`` object providing ``History``.
    """
    # Guard against a step that would never advance the window (infinite loop).
    if slice_length <= timedelta(0):
        raise ValueError("slice_length must be a positive timedelta")

    # Bail out before requesting data when no slice can possibly be produced.
    cap_reached_already = max_slices is not None and max_slices <= 0
    if cap_reached_already or frontier_date + slice_length > ending_date:
        return []

    # Load history once
    history = qb.History([symbol], frontier_date, ending_date, interval)
    if history.empty:
        return []

    data = history["close"].reset_index()
    times = data["time"]

    results = []
    current_start = frontier_date

    while current_start + slice_length <= ending_date:
        current_end = current_start + slice_length

        # First bar at or after each window boundary
        start_row = data.loc[times >= current_start].head(1)
        end_row = data.loc[times >= current_end].head(1)

        if not start_row.empty and not end_row.empty:
            results.append({
                "start_date": start_row["time"].iloc[0],
                "start_price": float(start_row["close"].iloc[0]),
                "end_date": end_row["time"].iloc[0],
                "end_price": float(end_row["close"].iloc[0])
            })
            # Stop as soon as the requested number of slices is collected
            if max_slices is not None and len(results) >= max_slices:
                break

        # Advance to next slice
        current_start = current_end

    return results

In [None]:
import pandas as pd
import numpy as np

def slices_to_extended_dataframe(slices, time_format="%Y-%m-%d|%H:%M"):
    """
    Build a per-slice return table with running totals.

    For every slice the table carries a formatted "start : end" label, the
    absolute price change, the simple return and the log return (both in %).
    Three running columns follow: the compounded simple return, the summed
    log return, and the compounded return implied by that summed log return
    (all in %).

    Parameters
    ----------
    slices : list of dicts
        Output from fetch_random_slices or fetch_consecutive_slices.
    time_format : str
        strftime pattern applied to both timestamps of the range label.

    Returns
    -------
    pd.DataFrame
        Columns:
        [date_range, start_date, end_date, raw_change, pct_change,
         log_return, cum_return, cum_log_return, cum_return_from_logs]
        An empty frame with these columns is returned for empty input.
    """
    per_slice_columns = [
        "date_range", "start_date", "end_date",
        "raw_change", "pct_change", "log_return",
    ]
    running_columns = ["cum_return", "cum_log_return", "cum_return_from_logs"]

    if not slices:
        return pd.DataFrame(columns=per_slice_columns + running_columns)

    def summarize(entry):
        # One output row for a single slice dict.
        begins = entry["start_date"]
        finishes = entry["end_date"]
        opened = entry["start_price"]
        closed = entry["end_price"]

        label = f"{begins.strftime(time_format)} : {finishes.strftime(time_format)}"
        delta = closed - opened

        return {
            "date_range": label,
            "start_date": begins,
            "end_date": finishes,
            "raw_change": delta,
            "pct_change": (delta / opened) * 100,
            "log_return": np.log(closed / opened) * 100,  # expressed in %
        }

    frame = pd.DataFrame([summarize(entry) for entry in slices])

    # Compound the simple returns, then express the running total in %
    growth = (1 + frame["pct_change"] / 100).cumprod()
    frame["cum_return"] = (growth - 1) * 100

    # Log returns are additive, so a running sum is enough
    frame["cum_log_return"] = frame["log_return"].cumsum()

    # Map the summed log return back to an equivalent compounded return (%)
    frame["cum_return_from_logs"] = (np.exp(frame["cum_log_return"] / 100) - 1) * 100

    return frame

In [None]:
import pandas as pd

def slices_to_dataframe(slices):
    """
    Tabulate slices and append their simple percentage change.

    Parameters
    ----------
    slices : list of dicts
        Output from fetch_random_slices or fetch_consecutive_slices.

    Returns
    -------
    pd.DataFrame
        The slice fields plus a ``pct_change`` column holding
        (end_price - start_price) / start_price * 100. Empty input gives an
        empty frame with columns
        [start_date, end_date, start_price, end_price, pct_change].
    """
    if slices:
        table = pd.DataFrame(slices)
        opened = table["start_price"]
        closed = table["end_price"]
        table["pct_change"] = (closed - opened) / opened * 100
        return table

    # No slices: hand back just the expected header
    header = ["start_date", "end_date", "start_price", "end_price", "pct_change"]
    return pd.DataFrame(columns=header)

In [None]:
# Example usage
# Create the QuantBook instance; the fetch_* helpers read it as the global `qb`.
qb = QuantBook()
# Register AAPL and keep the resulting Symbol for the history request.
symbol = qb.AddEquity("AAPL").Symbol

# Date range handed to the slicer below.
frontier_date = datetime(2015, 1, 1)
ending_date   = datetime(2025, 9, 15)

# Back-to-back 3-day windows built from minute bars, capped at 45 slices.
consecutive_slices = fetch_consecutive_slices(
    symbol, 
    frontier_date, 
    ending_date,
    interval=Resolution.Minute,
    slice_length=timedelta(days=3),
    max_slices = 45
)

# Turn the slice dicts into the per-slice / cumulative return table and preview it.
consecutive_slices_df = slices_to_extended_dataframe(consecutive_slices)
print(consecutive_slices_df.head())