**Load** the required packages

In [None]:
!pip install --upgrade numexpr --quiet

!pip install pandas --quiet
!pip install datetime --quiet
!pip install feedparser --quiet
!pip install textblob --quiet
!pip install yfinance --quiet
!pip install requests --quiet

**Define a function to get the pricing data**

In [None]:
import pandas as pd
import yfinance as yf
from typing import List, Tuple

def fetch_price(ticker: str,
               period: str, 
               interval: str) -> pd.DataFrame:
    df = yf.download(ticker, period=period, interval=interval, auto_adjust=True, progress=False)
    df.columns = df.columns.get_level_values(0)
    df.reset_index(inplace=True) 
    df.columns.name = None
    df['ticker'] = ticker
    return df

**Define a function to compute the technical indicators**
1. Compute the MACD
2. Compute the RSI
3. Compute the ATR

In [None]:
import pandas as pd

def compute_macd(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a new DataFrame with EMA12, EMA26, MACD, Signal, and MACD_hist added.
    """
    df = df.copy()
    df['EMA12'] = df['Close'].ewm(span=12, adjust=False).mean()
    df['EMA26'] = df['Close'].ewm(span=26, adjust=False).mean()
    df['MACD']  = df['EMA12'] - df['EMA26']
    df['Signal']   = df['MACD'].ewm(span=9, adjust=False).mean()
    df['MACD_hist'] = df['MACD'] - df['Signal']
    return df

def compute_rsi(df: pd.DataFrame, window: int = 14) -> pd.DataFrame:
    """
    Return a new DataFrame with a 14-period RSI column added.
    """
    df = df.copy()
    delta = df['Close'].diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.ewm(alpha=1/window, adjust=False).mean()
    avg_loss = loss.ewm(alpha=1/window, adjust=False).mean()
    rs = avg_gain / avg_loss
    df['RSI'] = 100 - (100 / (1 + rs))
    return df

def compute_atr(df: pd.DataFrame, window: int = 14) -> pd.DataFrame:
    """
    Return a new DataFrame with a 14-period ATR column added.
    """
    df = df.copy()
    prev_close = df['Close'].shift(1)
    tr1 = df['High'] - df['Low']
    tr2 = (df['High'] - prev_close).abs()
    tr3 = (df['Low']  - prev_close).abs()
    df['ATR'] = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1).rolling(window=window).mean()
    return df


**Define a function to load the strike prices**

In [None]:
import pandas as pd

def load_strikes(
    path: str,
    strike_col: str = "strike",
    decimals: int = 4
) -> pd.DataFrame:
    """
    1. Reads a CSV from `path`
    2. Normalizes column names to lowercase & stripped
    3. Ensures there's a numeric `strike_col`, stripping whitespace, converting to float, and rounding
    
    Returns the cleaned DataFrame.
    """
    return (
        pd.read_csv(path)
          # normalize headers: strip whitespace, lowercase
          .rename(columns=lambda c: c.strip().lower())
          # strip & convert the strike column to numeric, then round
          .assign(**{
              strike_col: lambda df: (
                  pd.to_numeric(
                      df[strike_col]
                        .astype(str)
                        .str.strip(),
                      errors="raise"
                  )
                  .round(decimals)
              )
          })
    )


**Define a function to determine the trading signal**
1. Determine the Trend
2. Check the Momentum
3. Determine the Trigger
4. Check the Volatility
5. Generate the signal

In [None]:
import pandas as pd

def determine_trend(row):
    """
    Returns (trend_str, ema_diff) where ema_diff = EMA12 − EMA26.
    """
    diff = row.EMA12 - row.EMA26
    if (row.EMA12 > row.EMA26) and (row.Close > row.EMA12) and (row.Close > row.EMA26):
        trend = 'up'
    elif (row.EMA12 < row.EMA26) and (row.Close < row.EMA12) and (row.Close < row.EMA26):
        trend = 'down'
    else:
        trend = 'sideways'
    return trend, diff

def momentum_check(row, trend):
    """
    Returns (ok, rsi) where ok is True/False and rsi is the raw RSI value.
    """
    rsi = row.RSI
    if trend == 'up':
        ok = (rsi > 50) and ((row.MACD > row.Signal) or (row.MACD_hist > 0))
    elif trend == 'down':
        ok = (rsi < 50) and ((row.MACD < row.Signal) or (row.MACD_hist < 0))
    else:
        ok = False
    return ok, rsi

def signal_trigger(row, trend):
    """
    Returns (ok, macd_diff) where macd_diff = MACD − Signal.
    """
    macd_diff = row.MACD - row.Signal
    if trend == 'up':
        ok = (macd_diff > 0) and (row.RSI > 50)
    elif trend == 'down':
        ok = (macd_diff < 0) and (row.RSI < 50)
    else:
        ok = False
    return ok, macd_diff

def volatility_check(row, strike_diff):
    """
    Returns (ok, strike_diff) so you have the raw distance too.
    """
    ok = strike_diff <= 0.5 * row.ATR
    return ok, strike_diff

def signal_detail_for_row(row, per_ticker, expiry="EOD"):
    df = per_ticker[row.ticker]
    last = df.iloc[-1]

    trend, trend_val = determine_trend(last)
    momentum_ok, momentum_val = momentum_check(last, trend)
    sig_ok, signal_val = signal_trigger(last, trend)
    strike_diff = abs(row.strike - last.Close)
    vol_ok, vol_val = volatility_check(last, strike_diff)

    # build recommendation + contract price
    if (trend == 'sideways'
        or not momentum_ok
        or not sig_ok
        or not vol_ok
    ):
        rec   = "No trade"
        price = pd.NA
    else:
        direction = "Buy" if trend == "up" else "Sell"
        price     = 10 * (0.5 - (strike_diff / (2 * last.ATR)))
        rec       = direction

    return pd.Series({
        "Date":           pd.Timestamp.now().strftime("%d-%b-%y"),
        "Ticker":         row.ticker,
        "Strike":         row.strike,
        "EMA12":          last.EMA12,
        "EMA26":          last.EMA26,
        "MACD":           last.MACD,
        "RSI":            last.RSI,
        "ATR":            last.ATR,
        "Recommendation": rec,
        "ContractPrice":  price,
        "Trend":          trend_val,
        "Momentum":       momentum_val,
        "Signal":         signal_val,
        "Volatility":     vol_val
    })

def generate_detailed_signals(per_ticker: dict[str,pd.DataFrame],
                              strikes_df: pd.DataFrame) -> pd.DataFrame:
    """
    Applies signal_detail_for_row to every strike, returning a DataFrame
    with separate columns for both the boolean/string and the raw measure.
    """
    return strikes_df.apply(
        lambda r: signal_detail_for_row(r, per_ticker),
        axis=1
    )    

def generate_trade_signal(df: pd.DataFrame, asset_symbol: str, strike_price: float, expiry="EOD") -> str:
    """
    Look only at the *last* row of df and decide on a trade recommendation.
    """
    last = df.iloc[-1]
    trend = determine_trend(last)
    if trend == 'sideways':
        return "No trade: Trend unclear."
    if trend == 'down':
        # optional debug print
        print(f"Down trend for {asset_symbol} at {strike_price:.4f}")

    if not momentum_check(last, trend):
        return "No trade: Momentum not aligned."
    if not signal_trigger(last, trend):
        return "No trade: No signal trigger."

    strike_diff = abs(strike_price - last.Close)
    if not volatility_check(last, strike_diff):
        return "No trade: Strike too far based on ATR."

    # crude price proxy
    price_est = 10 * (0.5 - (strike_diff / (2 * last.ATR)))
    direction = "Buy" if trend == 'up' else "Sell"
    return (
        f"{direction} {asset_symbol} @ {strike_price:.4f} (EOD) ≈ ${price_est:.1f}. "
        f"{trend.title()} trend, momentum/signal OK, volatility OK."
    )

def generate_all_signals(
    per_ticker: dict[str, pd.DataFrame], 
    strikes_df: pd.DataFrame
) -> pd.DataFrame:
    """
    For each row in strikes_df, look up its ticker-DF in per_ticker and
    call generate_trade_signal.  Returns a DataFrame of recommendations.
    """
    out = (
        strikes_df
        .assign(
            Date=pd.Timestamp.now().strftime('%d-%b-%y'),
            Signal=lambda d: d.apply(
                lambda r: generate_trade_signal(
                    per_ticker[r.ticker],
                    r.ticker,
                    r.strike
                ),
                axis=1
            )
        )
        .rename(columns={'ticker':'Ticker', 'strike':'Strike'})
        [['Date','Ticker','Strike','Signal']]
    )
    return out

**Define a function to upload the recommendations to S3**

In [None]:
import io
import boto3
import pandas as pd
from botocore.exceptions import ClientError
from botocore import UNSIGNED
from botocore.config import Config

from typing import Iterable, List, Dict

def create_s3_clients(
    profile: str = "default", region: str = "us-east-1"
) -> Dict[str, boto3.client]:
    session = boto3.Session(profile_name=profile, region_name=region)
    return {
        "public": session.client(
            "s3",
            config=Config(signature_version=UNSIGNED),
            region_name=region,
        ),
        "private": session.client("s3"),
        "resource": session.resource("s3"),
    }

def get_bucket(resource: boto3.resource, name: str):
    return resource.Bucket(name)

def upload_df_to_s3(
    df: pd.DataFrame,
    bucket: str,
    key: str,
    region: str = None) -> None:
    """
    Uploads a DataFrame to S3 as CSV. Verifies bucket existence first.
    
    Parameters
    ----------
    df : pd.DataFrame
    bucket : str
        Name of the S3 bucket (no leading/trailing spaces).
    key : str
        S3 object key, e.g. "csv/2025-06-28-results.csv"
    region : str, optional
        AWS region where the bucket resides.
    """
    bucket = bucket.strip()
    s3 = boto3.client('s3', region_name=region)

    try:
        s3.head_bucket(Bucket=bucket)
    except ClientError as e:
        code = e.response['Error']['Code']
        msg = e.response['Error']['Message']
        raise RuntimeError(
            f"Could not access bucket '{bucket}' (region={region}): {msg} (code {code})"
        ) from e

    buffer = io.StringIO()
    df.to_csv(buffer, index=False)
    buffer.seek(0)

    try:
        s3.put_object(Bucket=bucket, Key=key, Body=buffer.getvalue())
        print(f"✅ Uploaded to s3://{bucket}/{key}")
    except ClientError as e:
        raise RuntimeError(
            f"Failed to upload CSV to s3://{bucket}/{key}: "
            f"{e.response['Error']['Message']}"
        ) from e

**Define a function to run the Pipeline**
1. Collect pricing data
2. Compute the technical indicators
3. Compute the trading signals
4. Compare with the day's Strike prices
5. Upload the analysis

In [None]:
import pandas as pd
from datetime import date, datetime

from typing import List

def run_recommendation_pipeline(tickers: List,
                           bucket_name: str,
                           period: str,
                           interval: str) -> pd.DataFrame:
    """
    Fetch price, compute indicators, load strikes,
    and return a DataFrame of trade signals.
    """
    clients = create_s3_clients()
    public_s3 = clients["public"]
    private_s3 = clients["private"]
    s3_resource = clients["resource"]
    buckets = {
        "daily":  get_bucket(s3_resource, bucket_name),
    }

    ticker_price_data = [
        (ticker, fetch_price(ticker, period, interval))
        for ticker in tickers
    ]

    processed = {
        ticker: (
            df
              .pipe(compute_macd)
              .pipe(compute_rsi)
              .pipe(compute_atr)
        )
        for ticker, df in ticker_price_data
    }
    strikes_df = load_strikes("contracts.csv")
    signals_df = generate_detailed_signals(processed, strikes_df)

    today_str = date.today().strftime('%Y%m%d')

    s3_key = f"recommendations/{today_str}.csv"
    upload_df_to_s3(
        signals_df,
        bucket_name,
        s3_key
    )
    return signals_df

**Define a function to show interesting trades**

In [None]:
from tabulate import tabulate

def show_interesting_trades(df: pd.DataFrame) -> None:
    interesting_trades_df = df[
        ~df['Recommendation']
            .str.contains("No trade", case=False, na=False)
    ]
    if not interesting_trades_df.empty:
        print(
            tabulate(
                interesting_trades_df[["Date","Ticker","Recommendation","Strike","ContractPrice"]],
                headers='keys',
                tablefmt='fancy_grid',
                showindex=False,        
                maxcolwidths=200  
            )
        )
    else:
        print("No trades recommended today")

**Run recommendation pipeine**

In [None]:
TICKERS = {
    'CL=F',
    'ES=F',
    'GC=F',
    'NQ=F',
    'RTY=F',
    'YM=F',
    'NG=F',
    'AUDUSD=X',
    'EURJPY=X',
    'EURUSD=X',
    'GBPJPY=X',
    'GBPUSD=X',
    'USDCAD=X',
    'USDCHF=X',
    'USDJPY=X'
}

show_interesting_trades(
    run_recommendation_pipeline(tickers=TICKERS,
                                period="60d",
                                interval="1h",
                                bucket_name="nadex-daily-results",
))