# kalman

In [None]:
import yfinance as yf
import numpy as np
import matplotlib.pyplot as plt

# Fetch one year of daily price history for Apple Inc. (AAPL)
ticker = "AAPL"
data = yf.download(ticker, start="2022-01-01", end="2023-01-01")
# yfinance labels its price columns in title case, so the closing price lives
# under 'Close'; a lowercase 'close' lookup raises KeyError.
# ravel() guarantees a 1-D array even when the download comes back as a
# single-column frame (some yfinance versions key columns by (field, ticker)),
# which the scalar indexing in the Kalman filter cells relies on.
closing_prices = data['Close'].to_numpy().ravel()

In [None]:
# Kalman filter setup: model matrices and pre-allocated storage
dt = 1.0  # time step between consecutive observations
n_timesteps = len(closing_prices)

# Transition model for the state [price, drift]: price advances by drift * dt
A = np.array([
    [1, dt],
    [0, 1],
])
# Observation model: only the price component of the state is measured
H = np.array([[1, 0]])

# Noise covariances
Q = np.eye(2, dtype=int)  # process noise covariance (2x2 identity)
R = np.array([[10]])  # measurement noise covariance

# One slot per time step, indexed along the last axis
x = np.zeros((2, n_timesteps))  # state vector [price, drift]
P = np.zeros((2, 2, n_timesteps))  # error covariance
filtered_price = np.zeros(n_timesteps)  # filtered prices

In [None]:
# Initial estimates
x[:, 0] = [closing_prices[0], 0]  # start at the first observed price with zero drift
P[:, :, 0] = np.eye(2) * 1000  # large initial covariance: low confidence in the initial state

# The loop below starts at t = 1, so seed the first output slot explicitly;
# otherwise it keeps the 0.0 it was allocated with and the plotted filtered
# series would begin at zero instead of at the first price.
filtered_price[0] = x[0, 0]

for t in range(1, n_timesteps):
    # Prediction: propagate the previous state and covariance through the model
    x[:, t] = A @ x[:, t-1]
    P[:, :, t] = A @ P[:, :, t-1] @ A.T + Q

    # Update: Kalman gain from the predicted covariance, then correct the
    # prediction with the innovation (observed price minus predicted price)
    K = P[:, :, t] @ H.T @ np.linalg.inv(H @ P[:, :, t] @ H.T + R)
    x[:, t] = x[:, t] + K @ (closing_prices[t] - H @ x[:, t])
    P[:, :, t] = (np.eye(2) - K @ H) @ P[:, :, t]

    # Store the filtered price (first component of the corrected state)
    filtered_price[t] = x[0, t]

# Draw the raw closes and the Kalman-filtered series on a single set of axes
fig, ax = plt.subplots(figsize=(14, 7))
ax.plot(closing_prices, label='Observed Prices')
ax.plot(filtered_price, label='Filtered Prices', linestyle='dashed')
ax.set_title(f'{ticker} Stock Prices: Observed vs. Filtered')
ax.set_xlabel('Time')
ax.set_ylabel('Price')
ax.legend()
plt.show()

# smc

Python is well suited to implementing Smart Money Concepts (SMC) trading strategies: it is a versatile programming language with a rich ecosystem of libraries and tools for data analysis, financial modeling, and algorithmic trading. The outline below shows how to apply SMC trading strategies in Python:

### Key Libraries and Tools

1. **Pandas**: For data manipulation and analysis.
2. **NumPy**: For numerical operations.
3. **Matplotlib/Plotly**: For data visualization.
4. **TA-Lib**: For technical analysis indicators.
5. **ccxt**: For accessing cryptocurrency exchange APIs.
6. **yfinance**: For accessing historical stock market data.
7. **Scikit-learn**: For machine learning (if needed).
8. **Backtrader**: For backtesting trading strategies.

### Workflow for Implementing SMC Trading in Python

1. **Data Collection**:
   - Use `ccxt` or `yfinance` to fetch historical market data.

2. **Market Structure Analysis**:
   - Identify swing highs and lows to understand market phases.

3. **Order Block Identification**:
   - Identify potential order blocks by finding significant price levels where large moves originated.

4. **Liquidity Pool Detection**:
   - Identify areas of liquidity by finding clusters of stop-loss orders.

5. **Break of Structure (BOS) and Fair Value Gap (FVG) Detection**:
   - Detect breaks in market structure and gaps in price movements.

6. **Visualization**:
   - Visualize the data to identify patterns and confirm signals.

7. **Backtesting**:
   - Use `Backtrader` to backtest the strategy.

In [None]:
import pandas as pd

# Load the local 'eth.csv' file, using its 'timestamp' column as the row index.
# This rebinds `data`; the later cells read 'open', 'high', 'low' and 'close'
# columns from it.
data = pd.read_csv('eth.csv', index_col='timestamp')
# Bare expression: as the last line of the cell it displays the frame
data

In [None]:
def identify_swings(data, window=1):
    """Mark swing highs and swing lows on an OHLC frame.

    A bar is a swing high when its 'high' is strictly greater than the high
    of every bar within ``window`` positions on both sides, and a swing low
    when its 'low' is strictly less than the low of every such bar.  Bars
    closer than ``window`` positions to either end of the frame have
    incomplete neighbourhoods and are never marked.

    Previously only the two bars exactly ``window`` positions away were
    compared, so for ``window > 1`` a bar could be flagged even though a
    nearer neighbour was more extreme.  Results for the default
    ``window=1`` are unchanged.

    Args:
        data: DataFrame with 'high' and 'low' columns.  Modified in place.
        window: Number of bars to compare on each side; must be >= 1.

    Returns:
        The same ``data`` object with two added columns, 'Swing_high' and
        'Swing_low', holding the bar's high/low at swing points and NaN
        everywhere else.

    Raises:
        ValueError: If ``window`` is less than 1.
    """
    if window < 1:
        raise ValueError("window must be at least 1")

    high = data['high']
    low = data['low']

    # Start from "every bar with a value qualifies" and knock bars out as
    # each neighbour offset is checked.  Comparisons against the NaN that
    # shift() introduces at the edges are False, which excludes edge bars.
    is_swing_high = high.notna()
    is_swing_low = low.notna()
    for offset in range(1, window + 1):
        is_swing_high = (
            is_swing_high
            & (high.shift(offset) < high)
            & (high.shift(-offset) < high)
        )
        is_swing_low = (
            is_swing_low
            & (low.shift(offset) > low)
            & (low.shift(-offset) > low)
        )

    # where() keeps the price at swing points and fills NaN elsewhere
    data['Swing_high'] = high.where(is_swing_high)
    data['Swing_low'] = low.where(is_swing_low)
    return data

# Adds the 'Swing_high' / 'Swing_low' columns to `data` in place; the returned
# frame is the cell's displayed output.
identify_swings(data)

In [None]:
def find_order_blocks(data, threshold=0.01):
    """Flag bars whose close moved by more than ``threshold`` versus the prior bar.

    Args:
        data: DataFrame with a 'close' column.  Modified in place.
        threshold: Minimum absolute fractional close-to-close change
            (0.01 means 1%) for a bar to be flagged; the comparison is strict.

    Returns:
        The same ``data`` object with a boolean 'Order_Block' column added.
        The first bar has no prior close to compare against and is False.
    """
    close_change = data['close'].pct_change()
    move_size = close_change.abs()
    data['Order_Block'] = move_size.gt(threshold)
    return data

# Adds the boolean 'Order_Block' column to `data` in place (default threshold
# of 0.01); the returned frame is the cell's displayed output.
find_order_blocks(data)

In [None]:
def identify_liquidity_pools(data, window=20):
    """Add rolling high/low extremes and their spread as liquidity-pool columns.

    Simple method: the lowest low and the highest high of the trailing
    ``window`` bars (current bar included) are treated as the liquidity
    levels.

    Args:
        data: DataFrame with 'high' and 'low' columns.  Modified in place.
        window: Number of trailing bars in each rolling window.  Defaults to
            20, the lookback that was previously hard-coded.

    Returns:
        The same ``data`` object with three added columns:
        'Liquidity_Pool_low' (rolling minimum of 'low'),
        'Liquidity_Pool_high' (rolling maximum of 'high') and
        'Liquidity_Pool' (high level minus low level).  The first
        ``window - 1`` rows are NaN because their window is incomplete.
    """
    # Compute each rolling extreme once and reuse it for the spread
    pool_low = data['low'].rolling(window=window).min()
    pool_high = data['high'].rolling(window=window).max()

    data['Liquidity_Pool_low'] = pool_low
    data['Liquidity_Pool_high'] = pool_high
    data['Liquidity_Pool'] = pool_high - pool_low
    return data

# Adds the 'Liquidity_Pool_low', 'Liquidity_Pool_high' and 'Liquidity_Pool'
# columns to `data` in place; the returned frame is the cell's displayed output.
identify_liquidity_pools(data)

In [None]:
def detect_bos(data):
    """Flag bars whose close breaks outside the previous bar's range.

    Args:
        data: DataFrame with 'high', 'low' and 'close' columns.  Modified in
            place.

    Returns:
        The same ``data`` object with a boolean 'BOS' column added: True when
        the close is above the previous bar's high or below the previous
        bar's low.  The first bar has no predecessor and is False.
    """
    prev_high = data['high'].shift(1)
    prev_low = data['low'].shift(1)

    broke_above = data['close'].gt(prev_high)
    broke_below = data['close'].lt(prev_low)

    data['BOS'] = broke_above | broke_below
    return data

def detect_fvg(data):
    """Flag bars that complete a three-candle fair value gap (FVG).

    A gap exists when the first and third candles of a three-candle run do
    not overlap:

    * bullish gap: the current low is above the high from two bars earlier;
    * bearish gap: the current high is below the low from two bars earlier.

    The previous condition, ``high.shift(1) - low > 0``, was true whenever
    the current low sat below the prior high, i.e. whenever consecutive bars
    overlapped, so it flagged the absence of a gap rather than a gap.

    Args:
        data: DataFrame with 'high' and 'low' columns.  Modified in place.

    Returns:
        The same ``data`` object with a boolean 'FVG' column added, True on
        the third candle of each gap.  The first two bars cannot complete a
        three-candle pattern and are always False.
    """
    high = data['high']
    low = data['low']

    # shift(2) lines each bar up with the candle two positions earlier; the
    # NaN it introduces on the first two rows compares as False.
    bullish_gap = low > high.shift(2)
    bearish_gap = high < low.shift(2)

    data['FVG'] = bullish_gap | bearish_gap
    return data

# Both helpers add their column ('BOS', then 'FVG') to `data` in place.
# Only the last expression of a cell is displayed, so the frame returned by
# detect_bos is discarded here and the one from detect_fvg is shown.
detect_bos(data)

detect_fvg(data)


In [None]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Single-panel figure that will hold the candles and the swing markers
fig = make_subplots(rows=1, cols=1)

# OHLC candles, indexed by the frame's row index
candles = go.Candlestick(
    x=data.index,
    open=data['open'],
    high=data['high'],
    low=data['low'],
    close=data['close'],
    name='Candlestick',
)
fig.add_trace(candles)

# Swing markers, added in order (highs, then lows).  Rows without a swing
# hold NaN in these columns.
swing_markers = (
    # (column, marker colour, marker symbol, legend name)
    ('Swing_high', 'red', 'triangle-up', 'Swing high'),
    ('Swing_low', 'green', 'triangle-down', 'Swing low'),
)
for column, colour, symbol, legend_name in swing_markers:
    fig.add_trace(
        go.Scatter(
            x=data.index,
            y=data[column],
            mode='markers',
            marker=dict(color=colour, symbol=symbol, size=10),
            name=legend_name,
        )
    )

# Titles and theme
fig.update_layout(
    title='Market Data with Swing highs and lows',
    xaxis_title='Date',
    yaxis_title='Price',
    template='plotly_dark',
)

# Render the chart
fig.show()

# w3

In [None]:
import requests

# Your Etherscan API key
# NOTE(security): this key is committed in plain text.  Anyone with access to
# the file can use it; consider rotating it and loading it from an environment
# variable or a secrets store instead.
etherscan_api_key = 'BWV52ES9ZBH4DCHCME95Q4C3K1UCI5E36R'
wallet_address = '0x9671CD3E1304B0aA66AD87C188bb475A60D7C748'

# API endpoint to get transaction list
url = f'https://api.etherscan.io/api?module=account&action=txlist&address={wallet_address}&startblock=0&endblock=99999999&sort=asc&apikey={etherscan_api_key}'

# requests waits indefinitely by default; bound the wait so a stalled
# connection cannot hang the notebook.
response = requests.get(url, timeout=30)
data = response.json()

# Bind the list up front so the summary line below also works on the error
# path (it previously raised NameError when status was not '1').
transactions = []

if data['status'] == '1':
    transactions = data['result']
    for tx in transactions:
        print(f"Transaction Hash: {tx['hash']}")
        print(f"From: {tx['from']}")
        print(f"To: {tx['to']}")
        # 'value' is divided by 10**18 to express it in Ether
        print(f"Value: {int(tx['value']) / 10**18} Ether")
        print(f"Block Number: {tx['blockNumber']}")
        print(f"Gas: {tx['gas']}")
        # 'gasPrice' is divided by 10**9 to express it in Gwei
        print(f"Gas Price: {int(tx['gasPrice']) / 10**9} Gwei")
        print("-" * 60)
else:
    print("Error fetching data from Etherscan")
    # Surface the API's own explanation when the response carries one
    print(f"Etherscan message: {data.get('message')}")

# Total transactions count
print(f"Total transactions found: {len(transactions)}")


In [None]:
# Display the raw 'result' field of the Etherscan response (the value bound to
# `transactions` above when status is '1').
data['result']

In [None]:
# !wget -nc https://lazyprogrammer.me/course_files/SPY.csv

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import itertools

In [None]:
# Load the local 'SPY.csv' file, indexed by its 'Date' column with the index
# parsed as dates.
df = pd.read_csv('SPY.csv', index_col='Date', parse_dates=True)

In [None]:
# Simple moving averages of the close over 16 and 33 rows.  Rows that do not
# yet have a full window are NaN.
df['FastSMA'] = df['Close'].rolling(16).mean()
df['SlowSMA'] = df['Close'].rolling(33).mean()


In [None]:
# Row-over-row log return: log(Close[t]) - log(Close[t-1]).  The first row has
# no predecessor and is NaN.
df['LogReturn'] = np.log(df['Close']).diff()

In [None]:
# Hold out the last Ntest rows as the test set; everything before them is the
# training set.  .copy() gives each split its own data, independent of df.
Ntest = 1000
train = df.iloc[:-Ntest].copy()
test = df.iloc[-Ntest:].copy()

In [None]:
class Env:
    """Row-by-row trading environment over a price DataFrame.

    Each row of the frame is one time step.  The observation is the row's
    values for the requested feature columns, and the reward is the row's
    'LogReturn' while a position is held, otherwise 0.
    """

    def __init__(self, df, feats):
        """Cache the feature and reward arrays and start at the first row.

        Args:
            df: DataFrame containing the ``feats`` columns and 'LogReturn'.
            feats: Column label(s) whose values make up each state.
        """
        self.df = df
        self.n = len(df)
        self.action_space = [0, 1, 2]  # BUY, SELL, HOLD

        # Pull the columns out once so stepping is plain array indexing
        self.states = self.df[feats].to_numpy()
        self.rewards = self.df['LogReturn'].to_numpy()

        self.current_idx = 0
        self.invested = 0
        self.total_buy_and_hold = 0

    def reset(self):
        """Rewind to the first row, clear the position and the running
        buy-and-hold total, and return the first state."""
        self.current_idx = self.invested = self.total_buy_and_hold = 0
        return self.states[self.current_idx]

    def step(self, action):
        """Advance one row and apply ``action``.

        Args:
            action: 0 to buy (hold a position), 1 to sell (hold none); any
                other value leaves the position unchanged.

        Returns:
            Tuple ``(next_state, reward, done)``.  ``reward`` is the new
            row's log return when invested, else 0.  ``done`` is True once
            the last row has been reached.

        Raises:
            Exception: If called after the last row has been reached.
        """
        # The index moves first, so the action is rewarded with the return
        # of the row being stepped into.
        self.current_idx += 1
        idx = self.current_idx
        if idx >= self.n:
            raise Exception("Episode already done")

        # BUY opens the position, SELL closes it, anything else keeps it
        if action in (0, 1):
            self.invested = 1 if action == 0 else 0

        row_return = self.rewards[idx]
        reward = row_return if self.invested else 0

        # Track what holding throughout would have earned, for comparison
        self.total_buy_and_hold += row_return

        is_last_row = idx >= self.n - 1
        return self.states[idx], reward, is_last_row


In [None]:
class Agent:
    """Moving-average crossover policy that tracks its own position."""

    def __init__(self):
        # Whether the agent believes it currently holds a position
        self.is_invested = False

    def act(self, state):
        """Choose an action for a ``(fast, slow)`` state.

        Returns 0 (buy) when out of the market and fast is above slow,
        1 (sell) when in the market and fast is below slow, and 2 (do
        nothing) in every other case.
        """
        assert len(state) == 2  # Ensure state has two elements (fast, slow)

        fast, slow = state[0], state[1]

        if self.is_invested:
            # Holding: exit only when fast has dropped below slow
            if fast < slow:
                self.is_invested = False
                return 1  # Sell
        elif fast > slow:
            # Flat: enter only when fast has risen above slow
            self.is_invested = True
            return 0  # Buy

        return 2  # Do nothing


In [None]:
def play_one_episode(agent, env):
    """Run ``agent`` through ``env`` from reset until the episode ends.

    The agent's ``is_invested`` flag is cleared at the start so that no
    position carries over from an earlier episode.

    Args:
        agent: Object with an ``is_invested`` attribute and an
            ``act(state)`` method.
        env: Object with ``reset()`` returning the first state and
            ``step(action)`` returning ``(next_state, reward, done)``.

    Returns:
        The sum of the rewards returned by every step of the episode.
    """
    observation = env.reset()
    agent.is_invested = False

    episode_return = 0
    while True:
        observation, reward, finished = env.step(agent.act(observation))
        episode_return += reward
        if finished:
            return episode_return


In [None]:
# One environment per split, each exposing (FastSMA, SlowSMA) as its state in
# the order the agent's act() expects; a single agent is used for both.
train_env = Env(train, ['FastSMA', 'SlowSMA'])
test_env = Env(test, ['FastSMA', 'SlowSMA'])
agent = Agent()

In [None]:
# Run the same agent over each split.  play_one_episode clears the agent's
# is_invested flag at the start of every episode, so no position carries over
# from the training run into the test run.
train_reward = play_one_episode(agent, train_env)

test_reward = play_one_episode(agent, test_env)

In [None]:
# Training split: the strategy's summed log return next to the summed log
# return of every step (buy and hold).
train_reward, train_env.total_buy_and_hold

In [None]:
# Test split: the strategy's summed log return next to the summed log return
# of every step (buy and hold).
test_reward, test_env.total_buy_and_hold