### Classification Approach

In [None]:
# Import necessary libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Step 1: Build a synthetic binary-classification dataset (356 samples x 8 features)
from sklearn.datasets import make_classification
X, y = make_classification(
    n_samples=356,
    n_features=8,
    n_informative=6,
    n_redundant=2,
    n_classes=2,
    random_state=42,
)

# Wrap the arrays in a DataFrame so the data is easy to inspect
df = pd.DataFrame(X, columns=[f'Feature_{idx}' for idx in range(1, 9)])
df['Target'] = y

print("Dataset shape:", df.shape)
print("\nFirst 5 rows of the dataset:")
print(df.head())

# Step 2: Separate the label column from the eight feature columns
y = df['Target']
X = df.drop(columns='Target')

# Step 3: Stratified 80/20 train/test split
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# Step 4: Standardize the features (KNN is distance-based).
# The scaler is fitted on the training split only and then applied to both splits.
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

In [None]:
# Step 5: Fit a k-nearest-neighbours classifier on the standardized training data
from sklearn.neighbors import KNeighborsClassifier
k = 5  # neighbourhood size
knn = KNeighborsClassifier(n_neighbors=k).fit(X_train_scaled, y_train)

# Permutation importance, measured here on the training split (10 shuffles per feature)
from sklearn.inspection import permutation_importance
perm_importance = permutation_importance(
    knn, X_train_scaled, y_train, n_repeats=10, random_state=42
)
feature_importance = pd.DataFrame(
    {
        'Feature': X.columns,
        'Importance': perm_importance.importances_mean,
        'Std': perm_importance.importances_std,
    }
)

# Most important features first
significant_features = feature_importance.sort_values('Importance', ascending=False)

print("\nFeature Importance (Permutation Importance):")
print(significant_features)

# Step 6: Predict labels for the held-out test split
y_pred = knn.predict(X_test_scaled)

In [None]:
# Step 7: Evaluate the model on the test split
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    roc_auc_score,
    roc_curve,
)
accuracy = accuracy_score(y_test, y_pred)
print(f"\nAccuracy: {accuracy:.4f}")
print("\nConfusion Matrix:")
print(confusion_matrix(y_test, y_pred))
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

# Confusion matrix on the training split, drawn as an annotated heatmap
y_train_pred = knn.predict(X_train_scaled)
cm_train = confusion_matrix(y_train, y_train_pred)
plt.figure(figsize=(6, 5))
sns.heatmap(
    cm_train,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=['No', 'Yes'],
    yticklabels=['No', 'Yes'],
).set(xlabel='Predicted', ylabel='Actual', title='Confusion Matrix (Training)')
plt.show()

# Training accuracy and AUC (AUC is computed from the second predict_proba column)
train_accuracy = accuracy_score(y_train, y_train_pred)
y_train_pred_prob = knn.predict_proba(X_train_scaled)[:, 1]
train_auc = roc_auc_score(y_train, y_train_pred_prob)
print(f"\nTraining Accuracy: {train_accuracy:.4f}")
print(f"Training AUC: {train_auc:.4f}")

# The same two metrics on the test split
y_test_pred = knn.predict(X_test_scaled)
y_test_pred_prob = knn.predict_proba(X_test_scaled)[:, 1]
test_accuracy = accuracy_score(y_test, y_test_pred)
test_auc = roc_auc_score(y_test, y_test_pred_prob)
print(f"\nTest Accuracy: {test_accuracy:.4f}")
print(f"Test AUC: {test_auc:.4f}")

In [None]:
# ROC curves for the training and test splits, with the chance diagonal for reference
fpr_train, tpr_train, _ = roc_curve(y_train, y_train_pred_prob)
fpr_test, tpr_test, _ = roc_curve(y_test, y_test_pred_prob)

plt.figure(figsize=(10, 8))
plt.plot(fpr_train, tpr_train, label=f'Training (AUC = {train_auc:.4f})')
plt.plot(fpr_test, tpr_test, label=f'Test (AUC = {test_auc:.4f})')
plt.plot([0, 1], [0, 1], color='red', linestyle='--', label='Random Guess')
plt.gca().set(
    xlabel='False Positive Rate',
    ylabel='True Positive Rate',
    title='ROC Curve',
)
plt.grid(True)
plt.legend()
plt.show()

In [None]:
# Optional: sweep k to see how the neighbourhood size affects accuracy.
# Loop-local names (k_candidate, knn_k) are used on purpose: reusing `k` and `knn`
# here would silently replace the fitted k=5 model, so re-running the evaluation
# cells afterwards would report results for a different model.
# NOTE: scoring on the test split is fine as an illustration, but choosing k from
# these numbers leaks test information; use cross-validation on the training
# split for real model selection.
print("\n--- Testing different values of k ---")
k_range = range(1, 11)
scores = {}
for k_candidate in k_range:
    knn_k = KNeighborsClassifier(n_neighbors=k_candidate)
    knn_k.fit(X_train_scaled, y_train)
    y_pred_k = knn_k.predict(X_test_scaled)
    scores[k_candidate] = accuracy_score(y_test, y_pred_k)
    print(f"k={k_candidate}, Accuracy={scores[k_candidate]:.4f}")

### Regression Approach

In [None]:
# Simple regression
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

# Create a sample single-feature dataset; random_state makes every run identical
from sklearn.datasets import make_regression
X, Y = make_regression(n_features=1, noise=5, n_samples=5000, random_state=42)

# Plot the raw data on its own figure
plt.scatter(X, Y, s=5)
plt.xlabel('Feature - X')
plt.ylabel('Target - Y')
plt.title('Synthetic regression data')
plt.show()  # finish this figure so the comparison plot below starts on fresh axes

# Build the model
from sklearn.linear_model import LinearRegression
linear_model = LinearRegression()
linear_model.fit(X, Y)

# Only the last expression of a cell is displayed, so print the fitted parameters
print("Coefficient:", linear_model.coef_)
print("Intercept:", linear_model.intercept_)

# Prediction on the training data, overlaid on the observations
pred = linear_model.predict(X)
plt.scatter(X, Y, s=25, label='training')
plt.scatter(X, pred, s=25, label='prediction')
plt.xlabel('Feature - X')
plt.ylabel('Target - Y')
plt.legend()
plt.show()

### Clustering - K-means Approach

In [None]:
# Import necessary libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Seed NumPy's global random generator for reproducibility
np.random.seed(42)
# -----------------------------------------------
# Step 1: Generate the clustering dataset (5600 rows, 7 columns)
# -----------------------------------------------
from sklearn.datasets import make_blobs
X, _ = make_blobs(
    n_samples=5600,
    n_features=7,
    centers=5,
    cluster_std=2.0,
    random_state=42,
)

# Wrap the array in a new DataFrame (this rebinds `df`, replacing any earlier one)
df = pd.DataFrame(X, columns=[f'Feature_{col}' for col in range(1, 8)])

print("Dataset shape:", df.shape)
print("\nFirst 5 rows:")
print(df.head())

# -----------------------------------------------
# Step 2: Standardize the data
# -----------------------------------------------
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler().fit(df)
X_scaled = scaler.transform(df)

print("\nData has been standardized.")

In [None]:
# -----------------------------------------------
# Step 3: Find the best k using Elbow Method and Silhouette Analysis
# -----------------------------------------------
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

# Candidate cluster counts. The silhouette score needs at least 2 clusters,
# so the range starts at 2 and no special case for k=1 is required.
k_range = range(2, 11)  # Test k from 2 to 10

# One entry per candidate, in the same order as k_range
inertias = []
silhouette_scores = []

print("\nEvaluating K-Means for k from 2 to 10...")

for n_clusters in k_range:
    kmeans = KMeans(n_clusters=n_clusters, init='k-means++', n_init=10, random_state=42, max_iter=300)
    kmeans.fit(X_scaled)

    inertias.append(kmeans.inertia_)  # WCSS (within-cluster sum of squares)

    # The full silhouette computation is quadratic in the number of rows, so it
    # is evaluated on a 1000-row subsample. Passing a fixed random_state keeps the
    # result independent of the global NumPy RNG state (re-running this cell gives
    # the same scores) and scores every k on the same subsample.
    silhouette_scores.append(
        silhouette_score(X_scaled, kmeans.labels_, sample_size=1000, random_state=42)
    )

In [None]:
# -----------------------------------------------
# Step 4: Plot Elbow and Silhouette
# -----------------------------------------------
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# Left panel: inertia against k (look for the "elbow")
ax1.plot(k_range, inertias, 'bo-', linewidth=2, markersize=6)
ax1.set(
    title='Elbow Method for Optimal k',
    xlabel='Number of Clusters (k)',
    ylabel='Within-cluster Sum of Squares (WCSS)',
)
ax1.grid(True)

# Right panel: silhouette score against k (higher is better)
ax2.plot(k_range, silhouette_scores, 'ro-', linewidth=2, markersize=6)
ax2.set(
    title='Silhouette Score vs k',
    xlabel='Number of Clusters (k)',
    ylabel='Silhouette Score',
)
ax2.grid(True)

plt.tight_layout()
plt.show()

# -----------------------------------------------
# Step 5: Choose best k
# -----------------------------------------------
# The candidate whose silhouette score is largest
best_k_silhouette = k_range[int(np.argmax(silhouette_scores))]
print(f"\nBest k based on Silhouette Score: {best_k_silhouette}")

# The elbow "knee" could also be picked manually or with the kneed library;
# this example relies on the silhouette score.

In [None]:
# -----------------------------------------------
# Step 6: Apply K-Means with best k and add labels to df
# -----------------------------------------------
final_k = best_k_silhouette  # or set to 5 if you know it from data

kmeans_final = KMeans(
    n_clusters=final_k,
    init='k-means++',
    n_init=10,
    random_state=42,
)
cluster_labels_kmeans = kmeans_final.fit_predict(X_scaled)

# Store the cluster assignment of every row next to its features
df['KMeans_Cluster'] = cluster_labels_kmeans

print(f"\nK-Means clustering completed with k = {final_k}")
print("K-Means Cluster distribution:")
print(df['KMeans_Cluster'].value_counts().sort_index())

# Preview: label column first, followed by the seven feature columns
print("\nFirst 10 rows with K-Means cluster labels:")
print(df[['KMeans_Cluster'] + [f'Feature_{col}' for col in range(1, 8)]].head(10))

# -----------------------------------------------
# Step 7: (Optional) Visualize clusters using PCA
# -----------------------------------------------
from sklearn.decomposition import PCA

# Project the standardized features onto their first two principal components
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)

plt.figure(figsize=(8, 6))
sns.scatterplot(
    x=X_pca[:, 0],
    y=X_pca[:, 1],
    hue=df['KMeans_Cluster'],
    palette='Set1',
    s=50,
    alpha=0.8,
)
plt.gca().set(
    title=f'K-Means Clustering Results (k={final_k}) - PCA Projection',
    xlabel='PCA Component 1',
    ylabel='PCA Component 2',
)
plt.legend(title='Cluster')
plt.tight_layout()
plt.show()

# -----------------------------------------------
# Final Output
# -----------------------------------------------
print(f"\n Final DataFrame shape: {df.shape}")
print("Each row now has a K-Means cluster label in the 'KMeans_Cluster' column.")
print("Access labels: df['KMeans_Cluster']")

## Additional Information
- Review the backtesting
- Position Sizing Management

In [None]:
# Import necessary libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yfinance as yf
# -------------------------------
# 1. Download AAPL Data
# -------------------------------
ticker = "AAPL"
# NOTE(review): end_date is far in the future, so presumably the download simply
# returns everything available up to the latest bar -- confirm, and consider
# pinning a past date if the backtest numbers must be reproducible.
start_date, end_date = "2020-01-01", "2030-12-31"
data = yf.download(
    ticker,
    start=start_date,
    end=end_date,
    progress=False,
)

In [None]:
# -------------------------------
# 2. Calculate technical indicators
# -------------------------------
# Simple moving averages of the close over 20 and 50 bars
data['MA_20'] = data['Close'].rolling(window=20).mean()
data['MA_50'] = data['Close'].rolling(window=50).mean()

# RSI over 14 bars, using plain rolling means of gains and losses
delta = data['Close'].diff()
gain = delta.clip(lower=0)    # positive moves, zero otherwise
loss = -delta.clip(upper=0)   # magnitude of negative moves, zero otherwise
avg_gain = gain.rolling(window=14).mean()
avg_loss = loss.rolling(window=14).mean()
# Divide directly instead of masking zero losses with NaN: a window with gains
# and no losses gives an infinite ratio, and 100 - 100 / (1 + inf) evaluates to
# 100, which is the correct saturated RSI. Masking the zeros reported those bars
# as missing. A completely flat window is 0/0 and stays NaN (RSI undefined).
rs = avg_gain / avg_loss
data['RSI'] = 100 - (100 / (1 + rs))

In [None]:
# -------------------------------
# 3. Define trading strategy (MA Crossover + RSI Filter)
# -------------------------------
# Upward cross: MA_20 is above MA_50 on this bar but was not above it on the previous bar
crossed_up = (data['MA_20'] > data['MA_50']) & (data['MA_20'].shift(1) <= data['MA_50'].shift(1))
# Downward cross: the mirror image
crossed_down = (data['MA_20'] < data['MA_50']) & (data['MA_20'].shift(1) >= data['MA_50'].shift(1))

# Buy on an upward cross while RSI is below 60; sell on any downward cross.
# fillna(False) guarantees plain booleans in both signal columns.
data['Buy_Signal'] = (crossed_up & (data['RSI'] < 60)).fillna(False)
data['Sell_Signal'] = crossed_down.fillna(False)
data.tail()

In [None]:
# -------------------------------
# 4. Backtest strategy
# -------------------------------
# All-in / all-out: a buy signal spends all available cash, a sell signal
# liquidates the whole position. Signals from the previous bar are executed at
# the current bar's close.
initial_capital = 100_000
cash = initial_capital
shares = 0
position = 0  # 0 = no position, 1 = holding
portfolio_value = []

for i in range(1, len(data)):
    date = data.index[i]
    prev_date = data.index[i - 1]
    # This is the Close of the current bar (it was previously stored under the
    # misleading name `open_price`); .item() unwraps it to a plain scalar.
    close_price = data.loc[date, 'Close'].item()
    if data.loc[prev_date, 'Buy_Signal'].item() and position == 0:
        shares = cash // close_price  # whole shares only
        cash -= shares * close_price
        position = 1
    elif data.loc[prev_date, 'Sell_Signal'].item() and position == 1:
        cash += shares * close_price
        shares = 0
        position = 0
    # Mark to market with the scalar already extracted above, rather than
    # calling float() on a one-element Series.
    portfolio_value.append(float(cash + shares * close_price))

portfolio_df = pd.DataFrame(portfolio_value, index=data.index[1:], columns=['value'])

### The backtest above goes all-in on every buy signal (no cash left) and sells everything on every sell signal (all cash). Is that sensible? This is where position sizing comes in.

### Position Sizing Management

**Position sizing** means deciding *how many shares (or contracts) to buy* when entering a trade. It's not about *whether* to trade, but *how big* the trade should be. Good position sizing helps manage risk — so you don’t lose too much on a single bad trade.

The three approaches we'll implement are:

1. **"Position sizing: Allocate 50% of current portfolio value"** – a simple fixed-fraction rule; it is not risk-based.

- It’s a way to decide **how much** to invest in a single trade. Instead of risking a fixed dollar amount or using volatility (like ATR), you're simply committing **half of your portfolio** to this trade.

- Example: your **portfolio is worth \$100,000** (all in cash for now), you get a buy signal, and the stock price is \$50 per share. Allocating 50% means investing \$50,000, so you buy **1,000 shares**.

In [None]:
###### Backtest with fixed-fraction position sizing
initial_capital = 100_000
cash = initial_capital
shares = 0
position = 0  # 0 = no position, 1 = holding
portfolio_value = []
position_fraction = 0.5  # 50% of portfolio per trade

for i in range(1, len(data)):
    date = data.index[i]
    prev_date = data.index[i-1]
    close_price = data.loc[date, 'Close'].item()
    # Portfolio value at this bar's close: cash plus any open position
    current_value = cash + (shares * close_price) if position == 1 else cash
    if data.loc[prev_date, 'Buy_Signal'].item():
        if position == 0:
            # Position sizing: allocate position_fraction of current portfolio value
            target_value = current_value * position_fraction
            candidate_shares = int(target_value / close_price)  # whole shares only
            # Commit to `shares` only when the order is actually filled. Writing the
            # candidate straight into `shares` would leave phantom shares in the
            # valuation below whenever the affordability check fails (possible
            # once position_fraction is raised above 1).
            if candidate_shares > 0 and cash >= candidate_shares * close_price:
                shares = candidate_shares
                cash -= shares * close_price
                position = 1
    elif data.loc[prev_date, 'Sell_Signal'].item() and position == 1:
        cash += shares * close_price
        shares = 0
        position = 0
    portfolio_value.append(float(cash + shares * close_price))

portfolio_df = pd.DataFrame(portfolio_value, index=data.index[1:], columns=['value'])

2. **Risk-Based Position Sizing**: risk 1% of the portfolio per trade, using ATR as the risk per share.

   -  What does **"Risk 1% of portfolio"** mean?

For example, if your portfolio is worth \$100,000, then 1% = \$1,000. You size the trade so that if it fails (i.e., the price moves against you), you lose **at most \$1,000**. This is called the **percent risk model** and is widely used by professional traders.

- Risk 1% of the portfolio per trade, adjusting the number of shares based on the stock’s volatility or stop-loss distance.

-  What is **ATR**? It is our measure of risk.

**ATR = Average True Range**, a technical indicator that measures market volatility. Here, ATR is used as a **proxy for risk per share**. So if ATR = \$5, it means the stock typically moves around \$5 per day — so you might assume your stop-loss could be about that far. 

What does **"Risk 1% of portfolio, using ATR as risk per share"** mean?

We want to buy enough shares so that if the price drops by **one ATR**, we lose **exactly 1% of our portfolio**.

- $$
  \text{Position Size(Shares)}=\frac{\text{Portfolio Value}\times \text{Risk per Trade}}{\text{Risk per Share}}
  $$

  where Risk per Share is often the difference between the entry price and the stop-loss, or a volatility measure. We first estimate **how much each share could lose** (≈ ATR), then decide **how much total money we are willing to risk** (1% of the portfolio).

Example: the current portfolio value is \$100,000 and you risk 1% → \$1,000. With ATR = \$10, you assume the price might move \$10 against you (your "risk per share"), so shares = 1,000 / 10 = 100. If the price drops by \$10 (1 ATR), you lose 100 shares × \$10 = \$1,000 → exactly 1% of the portfolio.

In [None]:
### ATR serves as the proxy for risk per share
# True Range approximated as |Close - Previous Close|
data['TR'] = data['Close'].diff().abs()
data['ATR'] = data['TR'].rolling(window=14).mean()  # 14-day moving average

###### Backtest with ATR-based (percent-risk) position sizing
initial_capital = 100_000
cash = initial_capital
shares = 0
position = 0  # 0 = no position, 1 = holding
portfolio_value = []
risk_per_trade = 0.01  # Risk 1% of portfolio per trade

for i in range(1, len(data)):
    date = data.index[i]
    prev_date = data.index[i-1]
    close_price = data.loc[date, 'Close'].item()
    atr_value = data.loc[date, 'ATR'].item()  # Extract scalar ATR
    # Fall back to 1.0 while the rolling window is still filling up (ATR is NaN)
    atr = atr_value if not pd.isna(atr_value) else 1.0
    # Portfolio value at this bar's close: cash plus any open position
    current_value = cash + (shares * close_price) if position == 1 else cash
    if data.loc[prev_date, 'Buy_Signal'].item():
        if position == 0:
            # Position sizing: risk 1% of portfolio, using ATR as risk per share
            risk_amount = current_value * risk_per_trade
            candidate_shares = int(risk_amount / atr) if atr > 0 else 0
            # Commit to `shares` only when the order is actually filled. When ATR
            # is small relative to the price, the ATR-sized order can cost more
            # than the available cash; the trade is then skipped. Writing the
            # candidate straight into `shares` would leave phantom shares that
            # inflate the recorded portfolio value while the position is flat.
            if candidate_shares > 0 and cash >= candidate_shares * close_price:
                shares = candidate_shares
                cash -= shares * close_price
                position = 1
    elif data.loc[prev_date, 'Sell_Signal'].item() and position == 1:
        cash += shares * close_price
        shares = 0
        position = 0
    portfolio_value.append(float(cash + shares * close_price))

portfolio_df = pd.DataFrame(portfolio_value, index=data.index[1:], columns=['value'])

Note that the code approximates the true range by |Close − previous Close|. The full **True Range (TR)** is the maximum of:

- High price minus low price. 
- Absolute difference between high price and previous close.
- Absolute difference between low price and previous close.

3. **Kelly Criterion**

- The previous approach is the **fixed fractional risk model** (risk 1% of the portfolio per trade, using ATR to determine position size). Here we **replace or enhance** it with the **Kelly Criterion**.

- Optimizes position size to maximize long-term portfolio growth based on the expected win rate and reward-to-risk ratio.

  - $$
    \text{Kelly Fraction}=\text{Win Prob}-\frac{1-\text{Win Prob}}{\text{Reward to Risk Ratio}}
    $$

  - The Kelly fraction f is the percentage of capital to allocate. To reduce risk, a **fractional Kelly** (e.g., 0.5 * Kelly) is often used.

In [None]:
### Build the ATR columns (the Kelly sizing below does not read them)
# True Range approximated as |Close - Previous Close|
data['TR'] = data['Close'].diff().abs()
data['ATR'] = data['TR'].rolling(window=14).mean()  # 14-day moving average

### Backtest with Kelly-criterion position sizing
initial_capital = 100_000
cash = initial_capital
shares = 0
position = 0  # 0 = no position, 1 = holding
portfolio_value = []

# Kelly inputs — these should be estimated from backtesting!
win_probability = 0.55        # p: 55% of trades are winners
avg_win_to_risk_ratio = 2.0   # b: average win is 2x the amount risked

# Optional: use fractional Kelly to reduce risk
fractional_kelly = 0.5  # Use 50% of the Kelly recommendation

# The Kelly inputs are constants, so the fraction is computed once up front
# instead of on every buy signal: f* = (b*p - q) / b, equivalently p - q/b.
q = 1 - win_probability  # probability of loss
b = avg_win_to_risk_ratio
kelly_fraction = (b * win_probability - q) / b
position_fraction = fractional_kelly * kelly_fraction

for i in range(1, len(data)):
    date = data.index[i]
    prev_date = data.index[i-1]
    close_price = data.loc[date, 'Close'].item()

    # Portfolio value at this bar's close: cash plus any open position
    current_value = cash + (shares * close_price) if position == 1 else cash

    if data.loc[prev_date, 'Buy_Signal'].item():
        if position == 0:
            if position_fraction <= 0:
                # A non-positive Kelly fraction means no edge: stay flat
                print(f"Kelly suggests no edge at {date}, skipping trade.")
            else:
                # Allocate the (fractional) Kelly share of the portfolio
                target_value = current_value * position_fraction
                candidate_shares = int(target_value / close_price)

                # Commit to `shares` only when at least one share is affordable
                if candidate_shares > 0 and cash >= candidate_shares * close_price:
                    shares = candidate_shares
                    cash -= shares * close_price
                    position = 1

    elif data.loc[prev_date, 'Sell_Signal'].item() and position == 1:
        # Exit position
        cash += shares * close_price
        shares = 0
        position = 0

    # Record portfolio value
    portfolio_value.append(float(cash + shares * close_price))

# Build portfolio value DataFrame
portfolio_df = pd.DataFrame(portfolio_value, index=data.index[1:], columns=['value'])

In [None]:
# -------------------------------
# 5. Calculate performance metrics
# -------------------------------
RISK_FREE_RATE = 0.03  # annual risk-free rate assumed for the Sharpe ratio

final_value = float(portfolio_value[-1])  # Ensure scalar
# Use the dates actually present in the data: the requested end_date may lie
# beyond the last downloaded bar, so printing it would overstate the period.
actual_start = data.index[0]
actual_end = data.index[-1]
days = (actual_end - actual_start).days
# Compound annual growth rate over the calendar span of the data
cagr = ((final_value / initial_capital) ** (365.25 / days) - 1) if days > 0 else 0
returns = portfolio_df['value'].pct_change().dropna()
volatility = returns.std() * np.sqrt(252)  # annualised from daily returns
sharpe_ratio = (cagr - RISK_FREE_RATE) / volatility if volatility != 0 else 0

# Print results
print(f"\n{ticker} Trading Strategy Results")
print(f"Period: {actual_start:%Y-%m-%d} to {actual_end:%Y-%m-%d}")
print(f"Initial Capital: ${initial_capital:,.0f}")
print(f"Final Value: ${final_value:,.0f}")
print(f"CAGR: {cagr:.2%}")
print(f"Annualized Volatility: {volatility:.2%}")
print(f"Sharpe Ratio: {sharpe_ratio:.2f}")

In [None]:
# -------------------------------
# 6. Visualization
# -------------------------------
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8), sharex=True)

# Top panel: close price with both moving averages
ax1.plot(data.index, data['Close'], color='black', label='Close Price')
ax1.plot(data.index, data['MA_20'], color='blue', label='MA 20')
ax1.plot(data.index, data['MA_50'], color='red', label='MA 50')

# Signal markers: green up-triangles for buys, red down-triangles for sells
# (original author's note: the Taiwan convention is the reverse -- red for up, green for down)
ax1.scatter(
    data.index[data['Buy_Signal']],
    data['Close'][data['Buy_Signal']],
    marker='^',
    color='green',
    label='Buy',
)
ax1.scatter(
    data.index[data['Sell_Signal']],
    data['Close'][data['Sell_Signal']],
    marker='v',
    color='red',
    label='Sell',
)

ax1.set(title=f'{ticker} Price and Signals', ylabel='Price ($)')
ax1.legend()
ax1.grid(True)

# Bottom panel: equity curve of the most recently run backtest
ax2.plot(portfolio_df.index, portfolio_df['value'], color='blue', label='Portfolio Value')
ax2.set(title='Portfolio Value', ylabel='Value ($)', xlabel='Date')
ax2.legend()
ax2.grid(True)

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()