### <h3 style="color:yellow;">Authentication for GBQ</h3>

Defines `config()` to return the service account key path for a given user (`reese` or `ben`); raises an error for others. Used for BigQuery authentication.

In [None]:
gbq_proj_id = 'stock-chipper-87578'


def config(username=None):
    """
    Return the full path of the service-account key file for a known user.

    Args:
        username (str, optional): Either ``'reese'`` or ``'ben'``.

    Returns:
        str: The user's key directory concatenated with the key file name.

    Raises:
        ValueError: If `username` is missing or is not one of the known users.
    """
    # One row per known user: (username, key directory, key file name).
    known_users = (
        (
            'reese',
            "C:/Users/rsmcd/OneDrive/Desktop/Trade Review/stock-chipper-research/",
            'stock-chipper-87578-ec8b427fca6a.json',
        ),
        (
            'ben',
            "C:/Users/benwo/Documents/repos/stock-chipper-app/creds/",
            "stock-chipper-87578-b17ad3f7e6e1.json",
        ),
    )

    for known_name, key_dir, key_file in known_users:
        if username == known_name:
            return key_dir + key_file

    raise ValueError(f"Unrecognized or missing username: {username}")

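Selects user credentials based on context: uses the first CLI argument if run as `__main__`, and defaults to `"reese"` otherwise. Falls back to `"reese"` on any error (for example, when no argument is given or the argument is not a recognized username). The result (`private_key`) is the key-file path used for BigQuery authentication.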

In [None]:
# Pick the service-account key file.
#   - Run as a script: the first CLI argument names the user.
#   - Imported as a module: the default user "reese" is used.
#   - Any failure (no argument, unknown username, ...) falls back to "reese".
# NOTE(review): the notebook output shows the fallback path being taken, presumably
# because __name__ is "__main__" in a notebook and sys.argv[1] is a kernel launcher
# flag rather than a username — confirm.
try:
    if __name__ == "__main__":
        if len(sys.argv) <= 1:
            raise ValueError("No username provided via CLI args")
        private_key = config(sys.argv[1])
    else:
        private_key = config("reese")  # Default when not run as __main__
except Exception as e:
    # Deliberately best-effort: never fail here, but say why the fallback happened
    # so that a real problem (e.g. a typo in the username) is not silently hidden.
    print("Falling back to default username (reese)")
    print(f"  Reason: {type(e).__name__}: {e}")
    private_key = config("reese")

Falling back to default username (reese)


### <h3 style="color:yellow;">Getting data from GBQ</h3>

Configuration and Directory Setup

Defines key flags controlling the behavior of the script, such as whether to rebuild results, use local or remote data, and how many tickers to analyze. It also sets up directory paths for saving data, and ensures the necessary folders exist before proceeding.


In [None]:
# Config Flags
# True: query BigQuery for the ticker list and cache it to MINUTE_TICKERS_FILE.
# False: read the previously cached tab-separated file instead.
get_ticker_list_from_gbq = True
# Forwarded to get_ticker_data(local=...) when candles are fetched.
use_local_data_gbq = False
# Gates the cell that defines the walk-forward dates and periods.
build_results_gbq = True
# True: delete existing result files before running.
# False: skip tickers that already have a result file.
fresh_start_gbq = True
# If > 0: keep tickers whose random_index <= this value (plus big_ticker_list).
# If <= 0: use every ticker. Alternative value: schwab_ticker_num - 1
num_to_test_gbq = 1
# Prefix of each per-ticker result file name ("<stub><ticker>.txt").
output_stub_gbq = "20240101_"

In [None]:
# Path setup
DATA_DIR = Path("strategies/swing-trading/data")
WALK_FORWARD_DIR_SCHWAB = DATA_DIR / "walk-forward-schwab"  # TODO: consider adding later (or sooner, together with the data dirs)
WALK_FORWARD_DIR_GBQ = DATA_DIR / "walk-forward-gbq"
MINUTE_TICKERS_FILE = DATA_DIR / "tickers_random_index_list.txt"

# Wipe DATA_DIR only when a fresh start is requested. Deleting it unconditionally
# removed the previous result files and the cached ticker list on every run, which
# made the resume path (fresh_start_gbq = False) and the local ticker-file path
# (get_ticker_list_from_gbq = False) impossible to use.
# NOTE: a wipe also removes MINUTE_TICKERS_FILE, so a fresh start still requires
# get_ticker_list_from_gbq = True to rebuild the ticker list.
if not DATA_DIR.is_dir():
    print(f"Directory does not exist: {DATA_DIR}")
elif fresh_start_gbq:
    shutil.rmtree(DATA_DIR)
    print(f"Deleted: {DATA_DIR}")
else:
    print(f"Keeping existing directory: {DATA_DIR}")

# (Re)create the directory tree; a no-op for directories that already exist.
for directory in (DATA_DIR, WALK_FORWARD_DIR_SCHWAB, WALK_FORWARD_DIR_GBQ):
    directory.mkdir(parents=True, exist_ok=True)

#MARKER# Make data storage directories here and move this block upstream
# (it must stay below the config flags cell, because it reads fresh_start_gbq)

Deleted: strategies\swing-trading\data


Load Ticker List

Retrieves the list of tickers to analyze. If `get_ticker_list_from_gbq` is `True`, it queries BigQuery for tickers used in a recent backtesting window and adds a randomized index for sampling. Otherwise, it loads the list from a previously saved local file.


In [None]:
# Pull tickers from DB or local file
if not get_ticker_list_from_gbq:
    # Reuse the ticker list (and its random ordering) cached by an earlier run.
    all_tickers_gbq = pd.read_csv(MINUTE_TICKERS_FILE, sep="\t")
else:
    # Distinct tickers returned by the backtesting table function for the window below.
    sql = '''
        SELECT DISTINCT ticker 
        FROM main.tf_stocks_for_backtesting("2023-08-01","2024-01-01")
    '''
    all_tickers_gbq = run_sql_query(
        sql,
        project_id=gbq_proj_id,
        credentials_path=private_key,
    )
    # Give every ticker a unique position in a random permutation of 0..n-1;
    # the filter cell uses it to draw a random subset.
    all_tickers_gbq["random_index"] = random.sample(
        range(len(all_tickers_gbq)),
        len(all_tickers_gbq),
    )
    # Cache the list so later runs can skip the query.
    all_tickers_gbq.to_csv(MINUTE_TICKERS_FILE, sep="\t", index=False)

In [None]:
# Show the ticker table for a quick visual check.
display(all_tickers_gbq)

Unnamed: 0,ticker,random_index
0,GLBZ,2501
1,TIPT,464
2,IGIC,1909
3,ESCA,1603
4,QCRH,1224
...,...,...
4482,TBI,2685
4483,DNB,2145
4484,MP,1627
4485,DQ,3240


Filter Tickers to Test

Determines which tickers to include in the analysis. If `num_to_test_gbq` is greater than 0, it selects the tickers whose `random_index` is less than or equal to that threshold and appends the tickers in `big_ticker_list`. Otherwise, it includes all available tickers.

In [None]:
# Filter tickers
if num_to_test_gbq > 0:
    # Random subset: tickers whose random_index is <= the threshold (inclusive),
    # followed by the always-included big_ticker_list.
    # dict.fromkeys drops duplicates while keeping first-seen order, so a sampled
    # ticker that is also in big_ticker_list is not fetched and analysed twice.
    tickers_to_test_gbq = list(dict.fromkeys(
        all_tickers_gbq[all_tickers_gbq["random_index"] <= num_to_test_gbq]["ticker"].tolist()
        + big_ticker_list
    ))
else:
    # Non-positive threshold: use every available ticker.
    tickers_to_test_gbq = all_tickers_gbq["ticker"].tolist()

In [None]:
# Show the final list of tickers that will be fetched and analysed.
display(tickers_to_test_gbq)

['BJRI', 'LSEA', 'AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA']

In [None]:
def get_most_recent_trading_day(current_date=None):
    """
    Return the most recent NYSE trading day strictly before the given date.

    The NYSE calendar is queried for the 30 calendar days up to `current_date`,
    and the 20 calendar days before `current_date` are then checked one by one,
    newest first. `current_date` itself is never returned, even when it is a
    trading day.

    Args:
        current_date (datetime.date or datetime.datetime, optional): The reference
            date. Defaults to today. A datetime is reduced to its calendar date.

    Returns:
        datetime.date: The latest valid trading day before `current_date`.

    Raises:
        ValueError: If no trading day is found in the 20 calendar days before
            `current_date`.
    """
    if current_date is None:
        current_date = date.today()
    elif isinstance(current_date, datetime):
        # The calendar days below are plain dates; a datetime never compares equal
        # to a date, so without this step every lookup would miss.
        current_date = current_date.date()

    nyse = mcal.get_calendar('NYSE')
    schedule = nyse.valid_days(
        start_date=(current_date - timedelta(days=30)).isoformat(),
        end_date=current_date.isoformat(),
    )
    valid_days = {d.date() for d in schedule}  # set: constant-time membership test

    # Step back one calendar day at a time, starting with yesterday.
    for i in range(1, 21):
        candidate = current_date - timedelta(days=i)
        if candidate in valid_days:
            return candidate
    raise ValueError("Could not find recent trading day.")

Sets up the walk-forward analysis parameters for GBQ data: the number of periods, the analysis and evaluation window lengths, and the overall date range. It also handles fresh vs. resumed runs by either deleting existing result files or removing tickers that already have results from the test list.


In [None]:
# fresh_start logic
if build_results_gbq:
    # Fixed end date of the whole study; alternative: get_most_recent_trading_day()
    overall_finish_gbq = datetime.strptime('2024-03-15', '%Y-%m-%d')
    print("GBQ Overall finish date:", overall_finish_gbq)

    # Number of rolling analysis windows per ticker.
    num_periods_gbq = 3
    print("GBQ Number of periods:", num_periods_gbq)

    # Length of each analysis window (6 weeks).
    analysis_period_gbq = timedelta(days=6*7)
    print("GBQ Analysis period:", analysis_period_gbq)

    # Length of each evaluation window (4 weeks).
    evaluation_period_gbq = timedelta(days=4*7)
    print("GBQ Evaluation period:", evaluation_period_gbq)

    # The last analysis window ends one evaluation period before the overall finish.
    end_of_last_analysis_period_gbq = overall_finish_gbq - evaluation_period_gbq
    print("GBQ End of last analysis period:", end_of_last_analysis_period_gbq)

    # Go back num_periods analysis windows from there to find where data must start.
    overall_start_gbq = end_of_last_analysis_period_gbq - num_periods_gbq * analysis_period_gbq
    print("GBQ Overall start date:", overall_start_gbq)

    # Passed to run_analysis_loop as the minimum-days setting.
    # NOTE(review): presumably 6 weeks x at least 4 trading days per week — confirm.
    expected_min_analysis_days_gbq = 6 * 4
    print("GBQ Expected min analysis days:", expected_min_analysis_days_gbq)

    if fresh_start_gbq:
        # Full reset: clear previous result files. Only files (and links) are
        # removed, because unlink() raises on a directory.
        for file in WALK_FORWARD_DIR_GBQ.glob("*"):
            if file.is_file() or file.is_symlink():
                file.unlink()
    else:
        # Resume: tickers that already have a "<stub><ticker>.txt" result file are
        # skipped. Only names that start with the stub count, and only that leading
        # stub is stripped to recover the ticker.
        existing_files = {
            f.stem[len(output_stub_gbq):]
            for f in WALK_FORWARD_DIR_GBQ.glob("*.txt")
            if f.stem.startswith(output_stub_gbq)
        }
        tickers_to_test_gbq = [t for t in tickers_to_test_gbq if t not in existing_files]

#MARKER# Transfer this to Schwab like logic

GBQ Overall finish date: 2024-03-15 00:00:00
GBQ Number of periods: 3
GBQ Analysis period: 42 days, 0:00:00
GBQ Evaluation period: 28 days, 0:00:00
GBQ End of last analysis period: 2024-02-16 00:00:00
GBQ Overall start date: 2023-10-13 00:00:00
GBQ Expected min analysis days: 24


Fetch daily data for each ticker from GBQ.

In [None]:
# Log the start time so the duration of the download can be read off the output.
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# Options shared by every fetch; only the ticker changes from call to call.
_gbq_fetch_kwargs = {
    "local": use_local_data_gbq,
    "credentials_path": private_key,
    "project_id": gbq_proj_id,
}

num_tickers = len(tickers_to_test_gbq)
candles_dict_gbq = {}

# One fetch per ticker, starting at the overall start date; results are keyed by ticker.
for i, ticker in enumerate(tickers_to_test_gbq, start=1):
    print(f"({i}/{num_tickers}) Fetching data for {ticker}")
    candles_dict_gbq[ticker] = get_ticker_data(ticker, overall_start_gbq, **_gbq_fetch_kwargs)

2025-09-09 00:45:56
(1/7) Fetching data for BJRI
(2/7) Fetching data for LSEA
(3/7) Fetching data for AAPL
(4/7) Fetching data for MSFT
(5/7) Fetching data for GOOGL
(6/7) Fetching data for AMZN
(7/7) Fetching data for TSLA


In [None]:
# Show the fetched data for every ticker (dict keyed by ticker symbol).
display(candles_dict_gbq)

{'BJRI':     ticker   date_time    open    high     low   close       volume
 0     BJRI  2023-10-16 22.5200 22.8300 22.2900 22.4600 313,796.0000
 1     BJRI  2023-10-17 22.2600 23.9600 22.2600 23.9350 441,152.0000
 2     BJRI  2023-10-18 23.7500 24.1500 23.5975 24.0800 294,400.0000
 3     BJRI  2023-10-19 24.1100 24.7500 23.7100 24.0400 400,820.0000
 4     BJRI  2023-10-20 24.3000 24.7450 23.8500 24.0400 266,268.0000
 ..     ...         ...     ...     ...     ...     ...          ...
 100   BJRI  2024-03-11 36.9200 36.9200 35.8700 35.9800 197,697.0000
 101   BJRI  2024-03-12 36.0500 36.8100 35.8200 36.4100  98,583.0000
 102   BJRI  2024-03-13 36.2200 37.5050 36.2200 36.3620 274,622.0000
 103   BJRI  2024-03-14 36.0600 36.2450 34.6500 35.0800 203,878.0000
 104   BJRI  2024-03-15 34.8200 35.3400 34.3100 35.0800 189,804.0000
 
 [105 rows x 7 columns],
 'LSEA':     ticker   date_time    open    high     low   close       volume
 0     LSEA  2023-10-16  7.5100  7.5800  7.2950  7.3300 149,

### <h3 style="color:yellow;">Running analysis on GBQ data</h3>

In [None]:
def prepare_analysis_structure_gbq(ticker, end_of_last_analysis_period, analysis_period, evaluation_period, num_periods):
    """
    Build the table of rolling analysis and evaluation windows for one ticker.

    Rows are ordered from the most recent window to the oldest: row 0 is the
    analysis window that ends at `end_of_last_analysis_period`, and each following
    row lies one `analysis_period` further back.

    Args:
        ticker (str): Ticker symbol written into every row.
        end_of_last_analysis_period (datetime): End of the most recent analysis window.
        analysis_period (timedelta): Length of each analysis window.
        evaluation_period (timedelta): Length of each evaluation window.
        num_periods (int): Number of windows (rows) to generate.

    Returns:
        pd.DataFrame: `num_periods` rows with the window boundary columns filled in
        and every result column zero-initialised (`evaluation_data_good` is False).
    """
    # Window starts, stepping back one analysis period at a time.
    analysis_starts = pd.to_datetime([
        end_of_last_analysis_period - analysis_period * periods_back
        for periods_back in range(1, num_periods + 1)
    ])
    analysis_ends = analysis_starts + analysis_period
    evaluation_starts = analysis_ends + timedelta(days=1)   # evaluation begins the day after analysis ends
    evaluation_ends = evaluation_starts + evaluation_period

    # Scalars are broadcast to one value per window; key order fixes the column order.
    return pd.DataFrame({
        "ticker": ticker,
        "analysis_period_start": analysis_starts,
        "analysis_period_end": analysis_ends,
        "analysis_buy": 0.0,             # buy threshold, filled in later
        "analysis_sell": 0.0,            # sell threshold, filled in later
        "analysis_return": 0.0,          # return over the analysis window
        "analysis_trades": 0,            # trade count in the analysis window
        "analysis_eval_metric": 0.0,     # evaluation metric for the analysis window
        "evaluation_period_start": evaluation_starts,
        "evaluation_period_end": evaluation_ends,
        "evaluation_return": 0.0,        # return over the evaluation window
        "evaluation_trades": 0,          # trade count in the evaluation window
        "evaluation_data_good": False,   # set once evaluation data is confirmed
    })

Builds the base results structure for each GBQ ticker by generating rolling analysis and evaluation windows using `prepare_analysis_structure_gbq()`.

In [None]:
# 1. Build the base results structure
# The window settings are the same for every ticker, so bundle them once.
_gbq_window_args = (
    end_of_last_analysis_period_gbq,
    analysis_period_gbq,
    evaluation_period_gbq,
    num_periods_gbq,
)

num_tickers_gbq = len(tickers_to_test_gbq)
results_structure_dict_gbq = {}

# One empty period table per ticker, keyed by ticker symbol.
for i, ticker in enumerate(tickers_to_test_gbq, start=1):
    print(f"({i}/{num_tickers_gbq}) Preparing analysis structure for {ticker}")
    results_structure_dict_gbq[ticker] = prepare_analysis_structure_gbq(ticker, *_gbq_window_args)

In [None]:
# Show the per-ticker period tables built by prepare_analysis_structure_gbq.
display(results_structure_dict_gbq)

In [None]:
# NOTE(review): this loop only prints one progress line per Schwab ticker; nothing
# is built or stored. It looks like an unfinished counterpart of the GBQ
# structure-building cell — confirm whether the Schwab version is still planned.
num_tickers_schwab = len(tickers_to_test_schwab)
for i, ticker in enumerate(tickers_to_test_schwab, start=1):
    print(f"({i}/{num_tickers_schwab}) Preparing analysis structure for {ticker}")

Runs the walk-forward optimization and evaluation loop for each GBQ ticker and stores the updated results in `final_results_dict_gbq`.

In [None]:
# Log the start time of the analysis run.
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

num_tickers_gbq = len(tickers_to_test_gbq)
final_results_dict_gbq = {}

# For every ticker, hand its period table and its candles to run_analysis_loop
# and keep whatever it returns under the ticker symbol.
for i, ticker in enumerate(tickers_to_test_gbq, start=1):
    print(f"({i}/{num_tickers_gbq}) Running analysis loop for {ticker}")
    _structure, _candles = results_structure_dict_gbq[ticker], candles_dict_gbq[ticker]
    final_results_dict_gbq[ticker] = run_analysis_loop(_structure, _candles, expected_min_analysis_days_gbq)

Taking a look at GBQ analysis results.

In [None]:
# Convert dict items to a list to enable index-based access
final_results_list = list(final_results_dict_gbq.items())

# Example: display the DataFrame at index 0.
# Guarded because the results can be empty (e.g. a resumed run in which every
# ticker already had a result file), and indexing [0] would then raise IndexError.
if final_results_list:
    ticker, df = final_results_list[0]
    print(f"Ticker: {ticker}")
    display(df)
else:
    print("No GBQ results to display.")

# Full dict of results, keyed by ticker.
display(final_results_dict_gbq)

Save GBQ results to disk and print performance.

In [None]:
# Write one tab-separated result file per ticker, then evaluate its recent performance.
for i, ticker in enumerate(tickers_to_test_gbq, start=1):
    output_file = f"{output_stub_gbq}{ticker}.txt"       # "<stub><ticker>.txt"
    output_path = WALK_FORWARD_DIR_GBQ / output_file
    _ticker_results = final_results_dict_gbq[ticker]
    _ticker_results.to_csv(output_path, sep="\t", index=False)

    print(f"({i}/{num_tickers_gbq}) Completed {ticker}")
    # Only the last ticker's value remains in recent_eval after the loop.
    recent_eval = evaluate_recent_performance(_ticker_results)

In [None]:
# Extra metrics and one combined table are only produced for a single-period run.
# NOTE(review): analysis_ratio_gbq is not defined in any cell visible here; it is
# only evaluated when num_periods_gbq == 1 — confirm it is set before such a run.
if num_periods_gbq == 1 and analysis_ratio_gbq == 1:
    # Add the analysis metric columns to each ticker's results.
    final_results_dict_gbq = add_analysis_metrics(final_results_dict_gbq, candles_dict_gbq)
    display(final_results_dict_gbq)

    # Stack all tickers into one frame and save it next to the per-ticker files.
    final_results_df_gbq = pd.concat(final_results_dict_gbq.values(), ignore_index=True)
    print("Concatenated results for single period analysis.")
    final_results_df_gbq.to_csv(WALK_FORWARD_DIR_GBQ / "final_results_df_gbq.csv", sep="\t", index=False)
    display(final_results_df_gbq)
else:
    print("Multiple periods detected, skipping concatenation.")

#MARKER# CONTINUE WORKING ON THIS

### <h3 style="color:yellow;">Monolithic Schwab Functions</h3>

Old monolithic, complex `add_analysis_metrics`.

In [None]:
# def add_analysis_metrics(results_dict, price_data_dict):
#     """
#     Adds analytics columns to each DataFrame in results_dict using the associated OHLCV data.

#     Parameters:
#     - results_dict (dict): Maps tickers to DataFrames of walk-forward analysis results.
#     - price_data_dict (dict): Maps tickers to their full OHLCV DataFrames with 'date_time', 'high', 'low', 'close'.

#     Returns:
#     - dict: Updated results_dict with new columns added.
#     """
#     updated_results = {}

#     for ticker, df in results_dict.items():
#         prices = price_data_dict[ticker].copy()
#         prices["date_time"] = pd.to_datetime(prices["date_time"])
#         last_close = prices["close"].iloc[-1]

#         df = df.copy()
#         df["current_price_below_lb"] = df["analysis_buy"].apply(
#             lambda lb: last_close < lb if pd.notnull(lb) else np.nan
#         )
#         df["percent_below_lb"] = df["analysis_buy"].apply(
#             lambda lb: (lb - last_close) / lb if pd.notnull(lb) else np.nan
#         )
#         df["current_price_below_ub"] = df["analysis_sell"].apply(
#             lambda ub: last_close < ub if pd.notnull(ub) else np.nan
#         )
#         df["percent_below_ub"] = df["analysis_sell"].apply(
#             lambda ub: (ub - last_close) / ub if pd.notnull(ub) else np.nan
#         )
#         df["current_price_between_bounds"] = df.apply(
#             lambda row: (
#                 row["analysis_buy"] < last_close < row["analysis_sell"]
#                 if pd.notnull(row["analysis_buy"]) and pd.notnull(row["analysis_sell"])
#                 else np.nan
#             ),
#             axis=1
#         )

#         num_days_lb_list = []
#         num_days_ub_list = []
#         trend_slope_list = []
#         norm_trend_slope_list = []

#         for _, row in df.iterrows():
#             lb = row["analysis_buy"]
#             ub = row["analysis_sell"]
#             start_date = pd.to_datetime(row["analysis_period_start"])
#             end_date = pd.to_datetime(row["analysis_period_end"])

#             analysis_period = prices[
#                 (prices["date_time"] >= start_date) &
#                 (prices["date_time"] <= end_date)
#             ].copy()

#             analysis_period = analysis_period.sort_values("date_time", ascending=False)

#             # Days below lb
#             if pd.notnull(lb):
#                 count_lb = 0
#                 for _, candle in analysis_period.iterrows():
#                     if candle["high"] < lb and candle["low"] < lb:
#                         count_lb += 1
#                     else:
#                         break
#                 num_days_lb_list.append(count_lb)
#             else:
#                 num_days_lb_list.append(np.nan)

#             # Days below ub
#             if pd.notnull(ub):
#                 count_ub = 0
#                 for _, candle in analysis_period.iterrows():
#                     if candle["high"] < ub and candle["low"] < ub:
#                         count_ub += 1
#                     else:
#                         break
#                 num_days_ub_list.append(count_ub)
#             else:
#                 num_days_ub_list.append(np.nan)

#             # Trend slope (raw and normalized) using real time
#             if len(analysis_period) >= 2:
#                 closes = analysis_period["close"].values
#                 x = mdates.date2num(analysis_period["date_time"])  # use real time axis
#                 slope, _, _, _, _ = linregress(x, closes)
#                 trend_slope_list.append(slope)
#                 norm_slope = slope / closes.mean() if closes.mean() != 0 else 0
#                 norm_trend_slope_list.append(norm_slope)
#             else:
#                 trend_slope_list.append(np.nan)
#                 norm_trend_slope_list.append(np.nan)

#         df["num_days_below_lb"] = num_days_lb_list
#         df["num_days_below_ub"] = num_days_ub_list
#         df["trend_slope"] = trend_slope_list
#         df["norm_trend_slope"] = norm_trend_slope_list  # For scoring

#         # Cyclicality measure
#         full_prices = prices.set_index("date_time")
#         full_span = full_prices["high"].max() - full_prices["low"].min()
#         avg_day_range = (full_prices["high"] - full_prices["low"]).mean()
#         cyclicality_ratio = avg_day_range / full_span if full_span != 0 else np.nan
#         df["cyclicality"] = cyclicality_ratio

#         updated_results[ticker] = df

#     return updated_results

# if num_periods_schwab == 1 and analysis_ratio_schwab == 1:
#     final_results_dict_schwab = add_analysis_metrics(final_results_dict_schwab, candles_dict_schwab)
#     display(final_results_dict_schwab)

Old monolithic complex version of `score_profit_probability`.

In [None]:
# def score_profit_probability(results_dict, weights=None, bound_reference="upper"):
#     """
#     Adds a profit probability score, ranking, and swing probability category to each DataFrame in a results dictionary.
#     Also records the weight values used into each DataFrame as columns.

#     Parameters:
#     - results_dict (dict): Dictionary of {ticker: pd.DataFrame} with analysis results.
#     - weights (dict, optional): Weights for each scoring component.
#     - bound_reference (str): Either "lower" or "upper" to indicate which bound to use for scoring.

#     Returns:
#     - dict: Updated results_dict with 'profit_score', 'profit_rank', 'swing_probability', and weight columns.
#     """
#     if bound_reference not in {"lower", "upper"}:
#         raise ValueError("bound_reference must be either 'lower' or 'upper'")

#     suffix = "lb" if bound_reference == "lower" else "ub"

#     if weights is None:
#         weights = {
#             f"current_price_below_{suffix}": 1.0,
#             f"percent_below_{suffix}": 0.25,
#             f"num_days_below_{suffix}": 0.0,
#             "cyclicality": 0.75,
#             "norm_trend_slope": 1.75  # ✅ New metric
#         }

#     combined_df = pd.concat(results_dict.values(), ignore_index=True)
#     combined_df[f"current_price_below_{suffix}"] = combined_df[f"current_price_below_{suffix}"].astype(float)

#     # Rank components
#     combined_df["_rank_percent"] = combined_df[f"percent_below_{suffix}"].rank(pct=True)
#     combined_df["_rank_days"] = combined_df[f"num_days_below_{suffix}"].rank(pct=True)
#     combined_df["_rank_cyclicality"] = combined_df["cyclicality"].rank(pct=True)
#     combined_df["_rank_trend"] = combined_df["norm_trend_slope"].rank(pct=True)

#     current_mask = combined_df[f"current_price_below_{suffix}"]

#     # Final score
#     combined_df["profit_score"] = current_mask * (
#         weights.get(f"current_price_below_{suffix}", 0) +
#         combined_df["_rank_percent"] * weights.get(f"percent_below_{suffix}", 0) +
#         combined_df["_rank_days"] * weights.get(f"num_days_below_{suffix}", 0) +
#         combined_df["_rank_cyclicality"] * weights.get("cyclicality", 0) +
#         combined_df["_rank_trend"] * weights.get("norm_trend_slope", 0)
#     )

#     combined_df["profit_rank"] = combined_df["profit_score"].rank(ascending=False)

#     # Swing category
#     swing_probs = pd.Series(index=combined_df.index, dtype="object")
#     nonzero_scores = combined_df[combined_df["profit_score"] > 0]
#     percentiles = nonzero_scores["profit_score"].rank(pct=True)

#     for idx, p in percentiles.items():
#         if p <= 0.2:
#             swing_probs[idx] = "very_low"
#         elif p <= 0.4:
#             swing_probs[idx] = "low"
#         elif p <= 0.6:
#             swing_probs[idx] = "medium"
#         elif p <= 0.8:
#             swing_probs[idx] = "high"
#         else:
#             swing_probs[idx] = "very_high"

#     swing_probs[combined_df["profit_score"] == 0] = "zero"
#     combined_df["swing_probability"] = swing_probs

#     # Clean up
#     combined_df.drop(columns=["_rank_percent", "_rank_days", "_rank_cyclicality", "_rank_trend"], inplace=True)

#     # Return results
#     row_counter = 0
#     for ticker, df in results_dict.items():
#         num_rows = len(df)
#         updated_chunk = combined_df.iloc[row_counter:row_counter + num_rows]

#         # Assign core results
#         df["profit_score"] = updated_chunk["profit_score"].values
#         df["profit_rank"] = updated_chunk["profit_rank"].values
#         df["swing_probability"] = updated_chunk["swing_probability"].values

#         # Assign weights as constant columns
#         for key, val in weights.items():
#             df[key + "_weight"] = val

#         row_counter += num_rows

#     return results_dict

# bound_reference_schwab = "lower"  # Choose either "lower" or "upper" bound for scoring

# if num_periods_schwab == 1 and analysis_ratio_schwab == 1:
#     final_results_dict_schwab = score_profit_probability(final_results_dict_schwab, weights=None, bound_reference=bound_reference_schwab)

In [None]:
# def get_trades(data, upper_bound, lower_bound, time_start):
#     """
#     Simulates a basic swing trading strategy using high/low breakouts.

#     Iterates over a DataFrame of daily OHLC data to identify buy and sell trades:
#     - A **buy** occurs when the low of the day falls below or equals the `lower_bound`.
#     - A **sell** occurs when the high of the day rises above or equals the `upper_bound`.
#     - Only one position can be held at a time.
#     - If the end of the data is reached while a position is open, it is force-closed 
#       at the midpoint of the final day's high and low.

#     Args:
#         data (pd.DataFrame): Daily OHLC data with columns ['date_time', 'high', 'low'].
#         upper_bound (float): The price level that triggers a sell.
#         lower_bound (float): The price level that triggers a buy.
#         time_start (datetime-like): Trades are only evaluated for rows on or after this timestamp.

#     Returns:
#         pd.DataFrame: A DataFrame of executed trades with columns:
#             ['date', 'type', 'daily_high', 'daily_low', 'trade_price']
#     """
#     time_start = pd.to_datetime(time_start)  # Ensure compatible type for comparison
#     data = data.copy()
#     data["date_time"] = pd.to_datetime(data["date_time"])    
#     state = 0                                                                                     # 0 = not in position, 1 = in position
#     trades = []                                                                                   # List to store executed trades

#     for i, row in data.iterrows():                                                                # Iterate over each row in the data
#         if row["date_time"] < time_start:                                                         # Skip rows before the time_start threshold
#             continue                                                                              # Move to the next row

#         if state == 0 and row["low"] <= lower_bound:                                              # Entry condition: not in position and price hits or drops below lower bound
#             trades.append({                                                                       # Record a buy trade
#                 "date": row["date_time"],                                                         # Trade date
#                 "type": "buy",                                                                    # Trade type
#                 "daily_high": row["high"],                                                        # High of the day
#                 "daily_low": row["low"],                                                          # Low of the day
#                 "trade_price": lower_bound                                                        # Executed price at lower bound
#             })
#             state = 1                                                                             # Update state to indicate we are now in a position
#         elif state == 1 and row["high"] >= upper_bound:                                           # Exit condition: in position and price rises above upper bound
#             trades.append({                                                                       # Record a sell trade
#                 "date": row["date_time"],                                                         # Trade date
#                 "type": "sell",                                                                   # Trade type
#                 "daily_high": row["high"],                                                        # High of the day
#                 "daily_low": row["low"],                                                          # Low of the day
#                 "trade_price": upper_bound                                                        # Executed price at upper bound
#             })
#             state = 0                                                                             # Update state to indicate we're out of position

#     if state == 1:                                                                                # If still in position at the end, force close
#         last = data.iloc[-1]                                                                      # Get the last row in the data
#         trades.append({                                                                           # Record a forced sell trade
#             "date": last["date_time"],                                                            # Trade date
#             "type": "sell",                                                                       # Trade type
#             "daily_high": last["high"],                                                           # High of the day
#             "daily_low": last["low"],                                                             # Low of the day
#             "trade_price": 0.5 * (last["high"] + last["low"])                                     # Forced close price is midpoint between high and low
#         })

#     return pd.DataFrame(trades)                                                                   # Convert list of trades to a DataFrame and return

In [None]:
# def get_returns(data, upper_bound, lower_bound, time_start, starting_cash=10000):
#     """
#     Simulates a swing trading strategy and calculates the annualized return.

#     Executes trades based on breakout conditions using `get_trades()`:
#     - Buys when price hits `lower_bound`
#     - Sells when price hits `upper_bound`
#     - Assumes full portfolio allocation on each trade (no partial positions)
#     - Closes the final position at the end of the data if still open

#     Computes:
#     - The total number of buy-side trades
#     - The simple total return over the trading period
#     - The annualized return over the trading period

#     Args:
#         data (pd.DataFrame): Daily OHLC data with columns ['date_time', 'high', 'low'].
#         upper_bound (float): Price level that triggers a sell.
#         lower_bound (float): Price level that triggers a buy.
#         time_start (datetime-like): The start date for evaluating trades.
#         starting_cash (float): Initial portfolio cash. Defaults to 10,000.

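#     Returns:
#         dict: {
#             "total_return" (float): Simple return as a fraction (final cash / starting cash - 1).
#                 Only present when trades were executed and the time span is non-zero.
#             "annualized_return" (float or None): Annualized return as a fraction (0.10 = 10%),
#                 or None if no trades were executed or time span is 0.
#             "num_trades" (int): Number of buy trades.
#         }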
#     """
#     trades = get_trades(data, upper_bound, lower_bound, time_start)                             # Run the trade simulation using breakout rules

#     if trades.empty:                                                                            # If no trades occurred, return early
#         return {"annualized_return": None, "num_trades": 0}                                     # Return None and 0 trades if no signals

#     shares = 0                                                                                  # Initialize position size
#     cash = starting_cash                                                                        # Start with the full cash amount

#     for _, trade in trades.iterrows():                                                          # Loop through the trades chronologically
#         if trade["type"] == "buy":                                                              # If it's a buy trade
#             shares = cash / trade["trade_price"]                                                # Allocate entire portfolio into shares
#             cash = 0                                                                            # Cash is now fully deployed
#         elif trade["type"] == "sell":                                                           # If it's a sell trade
#             cash = shares * trade["trade_price"]                                                # Liquidate shares to get cash
#             shares = 0                                                                          # No position remains

#     last_day = pd.to_datetime(data["date_time"].max())                                          # Last available date in the data
#     time_start = pd.to_datetime(time_start)                                                     # Normalize the start date to a pandas Timestamp
#     period_years = (last_day - time_start).days / 365.25                                        # Duration of trading period in years

#     if period_years == 0:                                                                       # Edge case: zero-length period (last data date is less than a day after time_start)
#         return {"annualized_return": None, "num_trades": trades['type'].eq("buy").sum()}        # Avoid divide-by-zero; return None safely

#     total_return = cash / starting_cash - 1                                                     # Compute simple return (final / initial - 1)
#     annualized = (cash / starting_cash) ** (1 / period_years) - 1                               # Convert total return to annualized return

#     return {"total_return": total_return, "annualized_return": annualized, "num_trades": trades['type'].eq("buy").sum()}      # Return final results as a dictionary

In [None]:
# def run_analysis_loop(ticker_results, daily_data, expected_min_analysis_days):                          # Perform optimization and evaluation for each analysis/evaluation period
#     """
#     Performs optimization and evaluation for each row in the walk-forward results DataFrame.

#     Args:
#         ticker_results (pd.DataFrame): Table with period metadata and signal slots for a single ticker.
#         daily_data (pd.DataFrame): OHLCV data with a 'date_time' column.
#         expected_min_analysis_days (int): Minimum number of days required to consider an analysis window valid.

#     Returns:
#         pd.DataFrame: Updated ticker_results with optimized thresholds and evaluation performance filled in.
#     """
#     for idx, row in ticker_results.iterrows():                                                           # Iterate through each row (period) of the ticker results table
#         analysis_data = daily_data[                                                                      # Slice the data for the current analysis window
#             (daily_data["date_time"] >= row["analysis_period_start"]) &                                  # Include data on or after the analysis start
#             (daily_data["date_time"] <= row["analysis_period_end"])                                      # And on or before the analysis end
#         ]

#         if len(analysis_data) < expected_min_analysis_days:                                              # Skip this window if not enough trading days
#             continue

#         evaluation_data = daily_data[                                                                    # Slice the data for the evaluation window
#             (daily_data["date_time"] >= row["evaluation_period_start"]) &                                # Include data on or after the evaluation start
#             (daily_data["date_time"] <= row["evaluation_period_end"])                                    # And on or before the evaluation end
#         ]

#         if not analysis_data.empty:                                                                       # Proceed if there's valid analysis data
#             period_results = analyze_ticker_data(analysis_data)                                           # Run optimization for buy/sell thresholds

# ################################################### NEW CODE BLOCK ##################################################
#             analysis_trades = get_trades(analysis_data, period_results["ub"][0], period_results["lb"][0], row["analysis_period_start"])
#             buy_sell_pairs = analysis_trades[analysis_trades["type"].isin(["buy", "sell"])]
#             if len(buy_sell_pairs) >= 2:
#                 durations = buy_sell_pairs["date"].diff().dropna().dt.days
#                 ticker_results.at[idx, "analysis_avg_trade_duration"] = durations[::2].mean()
#             else:
#                 ticker_results.at[idx, "analysis_avg_trade_duration"] = np.nan
# ################################################### NEW CODE BLOCK ##################################################

#             ticker_results.at[idx, "analysis_buy"] = period_results["lb"][0]                              # Save optimized lower bound (buy threshold)
#             ticker_results.at[idx, "analysis_sell"] = period_results["ub"][0]                             # Save optimized upper bound (sell threshold)
#             ticker_results.at[idx, "analysis_return"] = period_results["return"][0]                       # Save annualized return for this config
#             ticker_results.at[idx, "analysis_trades"] = period_results["trades"][0]                       # Save number of trades
#             ticker_results.at[idx, "analysis_eval_metric"] = period_results["return_lb"][0]              # Save penalized return metric
#         else:
#             ticker_results.at[idx, "analysis_return"] = np.nan                                            # If no data, store NaN as placeholder

#         if not evaluation_data.empty:                                                                     # Proceed if there's evaluation data
#             ticker_results.at[idx, "evaluation_data_good"] = True                                         # Mark the data as usable

#             eval_results = get_returns(
#                 evaluation_data,
#                 upper_bound=ticker_results.at[idx, "analysis_sell"],
#                 lower_bound=ticker_results.at[idx, "analysis_buy"],
#                 time_start=ticker_results.at[idx, "evaluation_period_start"]
#             )

# ################################################## NEW CODE BLOCK ##################################################
#             eval_trades = get_trades(evaluation_data, ticker_results.at[idx, "analysis_sell"], ticker_results.at[idx, "analysis_buy"], row["evaluation_period_start"])
#             buy_sell_pairs = eval_trades[eval_trades["type"].isin(["buy", "sell"])]
#             if len(buy_sell_pairs) >= 2:
#                 durations = buy_sell_pairs["date"].diff().dropna().dt.days
#                 ticker_results.at[idx, "evaluation_avg_trade_duration"] = durations[::2].mean()
#             else:
#                 ticker_results.at[idx, "evaluation_avg_trade_duration"] = np.nan
# ################################################## NEW CODE BLOCK ##################################################

#             ticker_results.at[idx, "evaluation_trades"] = eval_results["num_trades"]                      # Store number of trades during evaluation
#             ticker_results.at[idx, "evaluation_return"] = eval_results["annualized_return"]               # Store annualized return during evaluation

#     return ticker_results                                                                                 # Return the updated DataFrame

Pull all data and include exchange info from Schwab.

In [None]:
# print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # Revert to this working block 1111

# all_candles_combined = []
# num_tickers = len(test_tickers_df)
# print(num_tickers)

# for i, (_, row) in enumerate(test_tickers_df.iterrows(), start=1):
#     ticker = row['ticker']
#     exchange = row['exchange']

#     # print(f"[{i}/{num_tickers}] Fetching {ticker} ({exchange})...")
    
#     try:
#         candles = get_schwab_data_for_last_year(ticker, exchange, years=2)  # Fetch 2 years of data
#         all_candles_combined.extend(candles)
#         print(f"[{i}/{num_tickers}] {ticker}: {exchange}: Retrieved {len(candles)} candles")
#     except Exception as e:
#         print(f"Failed to retrieve {ticker}: {e}")

Plot the trade analyses for a few randomly selected tickers from the GBQ results.

In [None]:
# plot_random_ticker_trades(final_results_dict_gbq, num_tickers_gbq, local_folder="local-ticker-data-gbq", data_source='gbq', num_tickers=3)