# INITIALISATION 

## Premium Absorption Signal: 
A sharp buy volume spike (on your middle chart) occurs precisely as the price hits the cyan line (upper bound) of the SELL volume profile. If the price fails to break through, this is your highest-probability short signal. (The inverse is true for a long).

## Imminent V-Reversal: 
The yellow and cyan lines on a profile overlap or cross. This signals the structure has changed from a "zone" to a single, critical "Point of Control". Mark this price; it's a magnet and a powerful pivot for sharp reversals.

## Trend Continuation Setup:
The profile is split/inverted (e.g., yellow line is below the cyan line). Do not look for a reversal. Instead, watch for the price to pull back to the edge of the nearest value area and look for a continuation entry in the direction of the initial trend.

## Bullish Structure Confirmation: 
The primary High-Volume Node (HVN, the zone between the yellow and cyan lines) of the Buy Profile is at a lower price than the HVN of the Sell Profile. Favor long entries on pullbacks to the Buy Profile's value area.

## Bearish Structure Warning (Bull Trap): 
The primary HVN of the Buy Profile is above the HVN of the Sell Profile. Be very cautious with longs. This is a setup for a potential failure, making short entries on failed rallies more attractive.

## Acceptance in a "Red Zone" (LVN):
 Price normally moves quickly through red zones (Low-Volume Nodes). If the price stalls and begins to build volume within a red zone, it's a major change. The market is now accepting prices it previously rejected, often preceding a new trend.

## High-Volume Stall:
 You see a massive spike on your middle "DIFF" chart, but the price itself stops moving. This indicates a fierce battle. Don't trade during the stall; trade in the direction of the winner once price breaks out of this tight range.

## Diminishing Volume on Repeated Tests: 
Price attacks a key boundary (e.g., a cyan line) multiple times, but each corresponding volume spike is weaker than the last. This shows buyer/seller exhaustion and increases the probability that the next test will fail and reverse.

## The "Ledge" Test: 
The profile isn't smooth but has a sharp drop-off, creating a high-volume "ledge" above a low-volume area. These ledges are often structurally weak. A price break below the ledge is often a high-momentum trade targeting the next high-volume area below.

## The Gap Jump: 
After a profile splits, if the price jumps from the top of the lower value area to the bottom of the higher value area, watch the boundaries of that gap. A future re-test of the edge it broke away from is a classic trend-following entry point.

In [None]:
import pyqtgraph as pg
import pandas as pd
import numpy as np

# ======== INITIALISATION ============
# Obtain the Qt application object; app.exec() at the end of this cell runs its event loop.
app = pg.mkQApp()

#smadiff = calc_diff(instance.ltpDf)

# Single window that hosts the three plots created below.
win_buy = pg.GraphicsLayoutWidget(title="Buy Heatmap & Trends")
win_buy.setBackground("white")
# CREATE PLOTS
# The buy plot is added first without an explicit position; DIFF and SELL are
# placed explicitly in rows 1 and 2 of column 0.
plot_buy = win_buy.addPlot(title="Buy Side")
plot_diff = win_buy.addPlot(title="DIFF",row=1, col=0)
plot_sell = win_buy.addPlot(title="SELL",row=2, col=0)
# Link the x axes in a chain: buy -> diff -> sell.
plot_buy.setXLink(plot_diff)
plot_diff.setXLink(plot_sell)
win_buy.show()
# ======== HELPER FUNCTIONS ========
def addHeatMap(plot,data):
    """Show ``data`` on ``plot`` as an image coloured by a fixed 4-entry lookup table.

    Args:
        plot: object exposing ``addItem`` (the callers pass pyqtgraph plots).
        data: array handed unchanged to ``pg.ImageItem``.

    Returns:
        The ``pg.ImageItem`` that was added, so callers can later call
        ``setImage`` / ``setRect`` on it.
    """
    # One RGBA row per cell value; the alpha channel (200) supplies the
    # transparency, so no separate setOpacity() call is made.
    rgba_rows = (
        (254, 79, 92, 200),    # 0: red (0,0)
        (198, 158, 81, 200),   # 1: brown   (0,1)
        (146, 194, 79, 200),   # 2: green (1,0)
        (7, 235, 59, 200),     # 3: neon green  (1,1)
    )
    palette = np.array(rgba_rows, dtype=np.ubyte)

    heatmap = pg.ImageItem(data)
    heatmap.setLookupTable(palette)
    plot.addItem(heatmap)
    return heatmap
def getXY(data: pd.Series):
    """Build plottable ``(x, y)`` arrays for ``data`` against the LTP frame's index.

    ``x`` comes positionally from ``instance.ltpDf.index`` (not from
    ``data``'s own index); both arrays are cut to the shorter length and the
    positions where ``y`` is NaN are removed.

    Args:
        data: a Series, or a DataFrame whose first column is used. ``None``
            or an empty object yields two empty lists.

    Returns:
        ``(x, y)`` as numpy arrays, or ``([], [])`` when there is no data.
    """
    if data is None or data.empty:
        return [], []

    # A DataFrame is reduced to its first column.
    series = data.iloc[:, 0] if isinstance(data, pd.DataFrame) else data
    xs = instance.ltpDf.index.to_numpy()

    # float64 / int64 data is used as-is; anything else is coerced to numbers,
    # with unparseable entries becoming NaN.
    if series.dtypes in [np.float64, np.int64]:
        ys = series.to_numpy()
    else:
        ys = pd.to_numeric(series, errors='coerce').to_numpy()

    # Align both arrays on the shorter one, then drop the NaN positions.
    common = min(len(xs), len(ys))
    xs, ys = xs[:common], ys[:common]
    keep = ~np.isnan(ys)
    return xs[keep], ys[keep]
def addlinePlot(plot,linewidth,data:pd.Series,name,color=None):
    """Draw ``data`` as a line on ``plot`` and return the created curve.

    Args:
        plot: object exposing ``plot(x, y, ...)`` (the callers pass pyqtgraph plots).
        linewidth: pen width.
        data: values to draw; converted to arrays by ``getXY``.
        name: name given to the curve.
        color: pen colour; when ``None`` the pen is built without a colour
            argument, as before.

    Returns:
        Whatever ``plot.plot`` returns; callers later call ``setData`` on it.
    """
    x, y = getXY(data)
    pen_options = {"width": linewidth}
    if color is not None:
        pen_options["color"] = color
    # The name belongs to the curve, not to the pen: it used to be passed to
    # pg.mkPen, so the curve itself was never named.
    return plot.plot(x, y, pen=pg.mkPen(**pen_options), name=name)
def addScatterPlot(plot,size,data,color):
    """Mark points on ``plot`` with 'x' symbols at the LTP of each given index.

    Args:
        plot: object exposing ``addItem`` (the callers pass pyqtgraph plots).
        size: symbol size.
        data: iterable of sequences whose first element is a label of
            ``instance.ltpDf``; the remaining elements are ignored.
        color: outline (pen) colour of the symbols.

    Returns:
        The created ``pg.ScatterPlotItem``. Previously the result of
        ``plot.addItem(...)`` was returned instead of the item, so callers
        never received a handle they could update.
    """
    labels = [point[0] for point in data]
    # y is looked up from the LTP column, so every label must exist in it.
    prices = instance.ltpDf['ltp'].loc[labels]
    scatter = pg.ScatterPlotItem(x=labels, y=prices, pen=pg.mkPen(color=color), symbol='x',
                                 brush=pg.mkBrush(0, 0, 255, 120), size=size)
    plot.addItem(scatter)
    return scatter


# ===== CREATE ELEMENTS ========
#BUY
# Buy panel: heatmap from combineddf[0], two trend lines and the last traded price (LTP).
instance.hmap_buy = addHeatMap(plot_buy, instance.combineddf[0].to_numpy().T)
instance.line_upper_1_buy = addlinePlot(plot_buy, linewidth=4, data=instance.lowHighMaxes[0], name='Buy Uptrend', color='#097969')
instance.line_lower_1_buy = addlinePlot(plot_buy, linewidth=4, data=instance.HighlowMaxes[0], name='Buy Downtrend', color='#fbd604')
instance.line_ltp_buy = addlinePlot(plot_buy, linewidth=2, data=instance.ltpDf['ltp'], name='LTP', color=(0, 0, 255))
#SELL
# Sell panel: same layout built from index 1 of each structure. The trend lines
# are stored under their own *_sell attributes; they used to overwrite the
# buy-side handles and carried "Buy ..." names.
instance.hmap_sell = addHeatMap(plot_sell, instance.combineddf[1].to_numpy().T)
instance.line_upper_1_sell = addlinePlot(plot_sell, linewidth=4, data=instance.lowHighMaxes[1], name='Sell Uptrend', color='#097969')
instance.line_lower_1_sell = addlinePlot(plot_sell, linewidth=4, data=instance.HighlowMaxes[1], name='Sell Downtrend', color='#fbd604')
instance.line_ltp_sell = addlinePlot(plot_sell, linewidth=2, data=instance.ltpDf['ltp'], name='LTP', color=(0, 0, 255))
#DIFF
#instance.vol_diff_50 = addlinePlot(plot_diff, linewidth=4, data=instance.voldiff_50, name='VolDiff_buy', color='#fdd750')
#instance.vol_diff_20 = addlinePlot(plot_diff, linewidth=4, data=instance.voldiff_20, name='VolDiff_sell', color='#097969')
#instance.scatter_buy = addScatterPlot(plot_buy,1,findPeaks(instance.voldiff_buy),color="#0BF05BFF")
#instance.scatter_buy = addScatterPlot(plot_sell,10,findPeaks(instance.voldiff_buy),color="#F00B0BFF")
# ======== UPDATE VALUES ========
#heatmap
# Push the current matrices again without auto-levelling, then place each image
# at x = 0 .. len(ltpDf) and y = min LTP .. min LTP + len(aggDf).
instance.hmap_buy.setImage(instance.combineddf[0].to_numpy().T, autoLevels=False)
instance.hmap_sell.setImage(instance.combineddf[1].to_numpy().T, autoLevels=False)
min_ltp = instance.ltpDf['ltp'].min()
n = len(instance.ltpDf)
m = len(instance.aggDf)
instance.hmap_buy.setRect(0, min_ltp, n, m)
instance.hmap_sell.setRect(0, min_ltp, n, m)

#ltp
# The same LTP curve is drawn on both the buy and the sell panel.
x,y_ltp = getXY(instance.ltpDf['ltp'])
instance.line_ltp_buy.setData(x,y_ltp)
instance.line_ltp_sell.setData(x,y_ltp)
#voldiff buy
#x,y = getXY(instance.voldiff_50[0])
#instance.vol_diff_50.setData(x, y)
#voldiff sell
#x,y = getXY(instance.voldiff_20[0])
#instance.vol_diff_20.setData(x, y)

# ======= SETUP ==========
plot_diff.showGrid(x=True, y=True)
plot_buy.showGrid(x=True,y=True)
plot_sell.showGrid(x=True,y=True)
# Runs the Qt event loop; the cell does not finish until it exits.
app.exec()


KeyboardInterrupt: 

# Premium Absorption

In [1]:
import StockAnalyser
class PremiumAbsorption(StockAnalyser.Cumulative_Support):
    """Record short candidates when a buy-volume spike in the resistance zone is absorbed.

    ``parse`` feeds every message to ``Check``, which works in two phases:

    1. While not monitoring, wait for the latest price to be in the
       resistance zone (``inResistance``) at the moment a new buy-volume
       spike starts (``peak``).
    2. While monitoring, collect the next ``trades_to_collect`` trades; if
       sell volume dominates them (``Analyse``), append ``(index, ltp)`` to
       ``shorts``. Monitoring then stops either way.
    """

    def __init__(self):
        super().__init__()
        self.monitoring = False
        self.trades_to_collect = 10
        # (vol_type, vol) pairs; Analyse() sums the 's' and 'b' tagged ones.
        self.collected_trades = []
        # implement timestamp based exit too
        self.shorts = []
        # True from the tick a spike starts until the value drops back to or
        # below the threshold, so one spike is reported once.
        self.riding_peak = False
        self.mean_range = 50
        self.multiplier = 3
        self.dominance_ratio = 2

    def Check(self, vol_type, vol, index, ltp):
        """Advance the detector by one trade.

        Args:
            vol_type: trade side tag; ``Analyse`` counts ``'s'`` and ``'b'``.
            vol: volume of the trade.
            index: label stored with a signal.
            ltp: last traded price for this trade.
        """
        # Evaluated on every trade so the riding_peak flag stays current,
        # including while monitoring.
        peak = self.peak()

        if self.monitoring:
            print("we're monitoring")
            self.collected_trades.append((vol_type, vol))
            if len(self.collected_trades) >= self.trades_to_collect:
                if self.Analyse():
                    print(f'we got a rejection boys {index}:{ltp}')
                    self.shorts.append((index, ltp))
                else:
                    print(f'no rejection at {index}:{ltp}')
                self.reset()
            return

        in_resistance = self.inResistance(ltp)
        if in_resistance:
            print(f"we're in resistance,{index}:{ltp}")

        if in_resistance and peak:
            print("peak and spike detected")
            self.monitoring = True
            # The trade that triggered monitoring is not part of the sample.
            self.collected_trades.clear()
            print("spike detected, looking for premium absorption")

    def peak(self):
        """Return True only on the tick where a new buy-volume spike starts.

        A spike is the latest ``voldiff_buy`` value exceeding
        ``mean + multiplier * std`` of the last ``mean_range`` rows. Always
        returns a bool (the original fell through to ``None`` on some paths).
        """
        if len(self.voldiff_buy) < self.mean_range:
            return False
        window = self.voldiff_buy.iloc[-self.mean_range:]
        # NOTE(review): the [0] lookups assume voldiff_buy has a column labelled 0 — confirm.
        threshold = window.mean()[0] + window.std()[0] * self.multiplier
        latest = self.voldiff_buy.iloc[-1][0]
        if latest > threshold and not self.riding_peak:
            self.riding_peak = True
            return True
        if latest <= threshold and self.riding_peak:
            self.riding_peak = False
        return False

    def reset(self):
        """Leave monitoring mode and drop the collected sample.

        ``trades_to_collect`` is intentionally left alone: it used to be set
        to 9 here, which silently shortened every sample after the first.
        """
        self.monitoring = False
        self.collected_trades = []

    def Analyse(self):
        """Return True when sellers dominate the collected trades.

        Dominance means ``total_sell / total_buy > dominance_ratio``. With no
        buy volume, sellers dominate only if some sell volume was seen; an
        all-zero sample is no longer reported as a rejection.
        """
        total_sell = sum(vol for side, vol in self.collected_trades if side == 's')
        total_buy = sum(vol for side, vol in self.collected_trades if side == 'b')
        if total_buy == 0:
            return total_sell > 0
        return total_sell / total_buy > self.dominance_ratio

    def inResistance(self, current_ltp):
        """Return whether the last column of ``combineddf[1]`` equals 2 at row ``current_ltp``.

        NOTE(review): presumably 2 marks the zone of interest in the
        sell-side matrix — confirm against the code that fills combineddf.
        The lookup raises ``KeyError`` if ``current_ltp`` is not a row label.
        """
        return self.combineddf[1].iloc[:, -1].loc[current_ltp] == 2

    def parse(self, message):
        """Let the base class ingest ``message``, then run ``Check`` on the newest row."""
        super().parse(message)
        # Use the label of the row being read. len(self.ltpDf) is one past that
        # row, so a stored signal could not be looked up again in ltpDf when
        # it fired on the final tick.
        latest_index = self.ltpDf.index[-1]
        self.Check(self.ltp_type, self.delta, latest_index, self.ltpDf.iloc[-1]['ltp'])
        


# MODIFIED: Renamed to a general-purpose class
class AbsorptionSignalFinder(StockAnalyser.Cumulative_Support):
    """Detect absorption setups in both directions.

    * SHORT: a buy-volume spike while the price is in the zone reported by
      ``inResistance``, followed by seller dominance.
    * LONG: a sell-volume spike while the price is in the zone reported by
      ``inSupport``, followed by buyer dominance.

    ``parse`` feeds every message to ``Check``. Confirmed signals are stored
    as ``(index, ltp)`` tuples in ``shorts`` / ``longs``.
    """

    def __init__(self):
        super().__init__()
        # One of "IDLE", "MONITORING_SHORT", "MONITORING_LONG".
        self.monitoring_state = "IDLE"
        self.trades_to_collect = 10
        # (vol_type, vol) pairs gathered while monitoring.
        self.collected_trades = []

        self.shorts = []
        self.longs = []

        # Set when a spike has triggered; cleared by Check() once the price
        # is seen outside the corresponding zone while idle.
        self.riding_buy_peak = False
        self.riding_sell_peak = False

        self.mean_range = 50
        self.multiplier = 3
        self.dominance_ratio = 2

    def Check(self, vol_type, vol, index, ltp):
        """Advance the state machine by one trade.

        Args:
            vol_type: trade side tag; ``Analyse`` counts ``'s'`` and ``'b'``.
            vol: volume of the trade.
            index: label stored with a confirmed signal.
            ltp: last traded price for this trade.
        """
        # --- PART 1: an active monitoring window takes priority ---
        if self.monitoring_state != "IDLE":
            self.collected_trades.append((vol_type, vol))

            if len(self.collected_trades) >= self.trades_to_collect:
                direction = "SHORT" if self.monitoring_state == "MONITORING_SHORT" else "LONG"
                if self.Analyse(direction):
                    print(f'ABSORPTION CONFIRMED ({direction}) at {index}:{ltp}')
                    signals = self.shorts if direction == "SHORT" else self.longs
                    signals.append((index, ltp))
                else:
                    print(f'No absorption confirmed at {index}:{ltp}')
                self.reset()
            return  # nothing else happens while monitoring

        # --- PART 2: idle, so look for a new setup ---
        in_res = self.inResistance(ltp)
        if in_res and self.buy_peak():
            print("Buy spike in resistance zone! Starting SHORT monitoring...")
            self.monitoring_state = "MONITORING_SHORT"
            self.collected_trades.clear()
            return

        in_sup = self.inSupport(ltp)
        if in_sup and self.sell_peak():
            print("Sell spike in support zone! Starting LONG monitoring...")
            self.monitoring_state = "MONITORING_LONG"
            self.collected_trades.clear()
            return

        # Outside a zone the matching flag is cleared so the next visit can
        # trigger again.
        if not in_res:
            self.riding_buy_peak = False
        if not in_sup:
            self.riding_sell_peak = False

    def _is_spike(self, voldiff):
        """Return True when the latest value is at least mean + multiplier * std of the window."""
        if len(voldiff) < self.mean_range:
            return False
        window = voldiff.iloc[-self.mean_range:]
        # NOTE(review): the [0] lookups assume the frame has a column labelled 0 — confirm.
        mean = window.mean()[0]
        std = window.std()[0]
        # A flat window (std == 0) or an undefined std would make ">=" fire on
        # a value equal to the mean, which is not a spike.
        if not std > 0:
            return False
        return bool(voldiff.iloc[-1][0] >= mean + std * self.multiplier)

    def buy_peak(self):
        """Return True once per buy-volume spike; sets ``riding_buy_peak``."""
        if self.riding_buy_peak or not self._is_spike(self.voldiff_buy):
            return False
        self.riding_buy_peak = True
        return True

    def sell_peak(self):
        """Return True once per sell-volume spike; sets ``riding_sell_peak``."""
        if self.riding_sell_peak or not self._is_spike(self.voldiff_sell):
            return False
        self.riding_sell_peak = True
        return True

    def reset(self):
        """Return to the idle state and drop the collected sample."""
        self.monitoring_state = "IDLE"
        self.collected_trades = []

    def Analyse(self, direction):
        """Return True when the collected trades confirm ``direction``.

        Args:
            direction: ``"SHORT"`` requires sell/buy volume to reach
                ``dominance_ratio``; ``"LONG"`` requires buy/sell to reach
                it. Any other value returns False.

        When the opposing side has no volume, the signal is confirmed only if
        the dominant side has some.
        """
        total_sell = sum(vol for side, vol in self.collected_trades if side == 's')
        total_buy = sum(vol for side, vol in self.collected_trades if side == 'b')

        if direction == "SHORT":
            if total_buy == 0:
                return total_sell > 0
            return total_sell / total_buy >= self.dominance_ratio

        if direction == "LONG":
            if total_sell == 0:
                return total_buy > 0
            return total_buy / total_sell >= self.dominance_ratio

        return False

    def inResistance(self, current_ltp):
        """Return whether the last column of ``combineddf[1]`` equals 2 at row ``current_ltp``.

        NOTE(review): presumably 2 marks the zone of interest in the
        sell-side matrix — confirm. Raises ``KeyError`` if ``current_ltp`` is
        not a row label.
        """
        return self.combineddf[1].iloc[:, -1].loc[current_ltp] == 2

    def inSupport(self, current_ltp):
        """Return whether the last column of ``combineddf[0]`` equals 2 at row ``current_ltp``.

        NOTE(review): presumably 2 marks the zone of interest in the buy-side
        matrix — confirm. Raises ``KeyError`` if ``current_ltp`` is not a row
        label.
        """
        return self.combineddf[0].iloc[:, -1].loc[current_ltp] == 2

    def parse(self, message):
        """Let the base class ingest ``message``, then run ``Check`` on the newest row."""
        super().parse(message)
        # Use the label of the row being read. len(self.ltpDf) is one past that
        # row, so a stored signal could not be looked up again in ltpDf when
        # it fired on the final tick.
        latest_index = self.ltpDf.index[-1]
        self.Check(self.ltp_type, self.delta, latest_index, self.ltpDf.iloc[-1]['ltp'])

In [2]:
# Day to replay, written as YYYY-MM-DD. It names the tick CSV (local path and
# S3 key) in download_file() and is passed to get_all_tick_data().
SIMULATION_DATE = "2025-08-04"

from config import r,STOCKS,FILEPATH,s3,EXCHANGE
import os
def download_file():
    """Make sure the tick CSV for ``SIMULATION_DATE`` exists locally, downloading it if needed.

    The file is expected at
    ``{FILEPATH}/{EXCHANGE}/{STOCKS[0]}/{SIMULATION_DATE}.csv``. When it is
    missing it is fetched from bucket ``kite`` under the key
    ``{EXCHANGE}/{STOCKS[0]}/{SIMULATION_DATE}.csv``.

    Raises:
        Exception: when the download fails. The message is unchanged, and
            the underlying error is now chained as ``__cause__`` instead of
            being discarded.
    """
    key = f'{EXCHANGE}/{STOCKS[0]}/{SIMULATION_DATE}.csv'
    local_path = f'{FILEPATH}/{key}'
    if os.path.exists(local_path):
        return

    # NOTE(review): presumably `s3` is a boto3 client, which does not create
    # missing parent folders. Create them so a fresh checkout does not fail
    # with the misleading "file not found" message below.
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    try:
        s3.download_file(Bucket='kite', Key=key, Filename=local_path)
    except Exception as e:
        raise Exception("file not found,choose another date.") from e
    print(f"[MAIN] Downloaded {EXCHANGE}:{STOCKS[0]} for {SIMULATION_DATE}")
# Fetch the day's tick file if it is not on disk yet.
download_file()

from simulator import get_all_tick_data
df = get_all_tick_data(SIMULATION_DATE)

from StockAnalyser import Cumulative_Support
#instance = Cumulative_Support()
instance = AbsorptionSignalFinder()
import pandas as pd
# Replay the ticks in order, one row per parse() call. iterrows() replaces the
# former df.loc[:].rolling(1) loop, which built a one-row window per tick only
# to read its single row back out.
for _, tick in df.iterrows():
    instance.parse(tick.to_dict())

[INFO] loaded data into memory for simulation


  self.ltpDf = pd.concat([self.ltpDf, pd.DataFrame(new_record)], ignore_index=True)


# SIMULATE

In [None]:
import pyqtgraph as pg
import pandas as pd
import numpy as np

def calc_diff(ltpdf, window=10):
    """Return rolling sell volume minus rolling buy volume.

    Args:
        ltpdf: DataFrame with ``'sell-vol'`` and ``'buy-vol'`` columns.
        window: number of rows summed per position. Defaults to 10, the value
            that used to be hard-coded.

    Returns:
        A Series aligned with ``ltpdf``; the first ``window - 1`` entries are
        NaN because the rolling window is not yet full.
    """
    sell_total = ltpdf['sell-vol'].rolling(window).sum()
    buy_total = ltpdf['buy-vol'].rolling(window).sum()
    return sell_total - buy_total


# ======== INITIALISATION ============
# Obtain the Qt application object; app.exec() at the end of this cell runs its event loop.
app = pg.mkQApp()
# Rolling sell-minus-buy volume, drawn later on the DIFF plot.
sma_diff = calc_diff(instance.ltpDf)
# Single window that hosts the three plots created below.
win_buy = pg.GraphicsLayoutWidget(title="Buy Heatmap & Trends")
win_buy.setBackground("white")
# CREATE PLOTS
# The buy plot is added first without an explicit position; DIFF and SELL are
# placed explicitly in rows 1 and 2 of column 0.
plot_buy = win_buy.addPlot(title="Buy Side")
plot_diff = win_buy.addPlot(title="DIFF",row=1, col=0)
plot_sell = win_buy.addPlot(title="SELL",row=2, col=0)
# Link the x axes in a chain (buy -> diff -> sell); the sell plot's y axis is
# additionally linked to the buy plot's.
plot_buy.setXLink(plot_diff)
plot_diff.setXLink(plot_sell)
plot_sell.setYLink(plot_buy)
win_buy.show()
# ======== HELPER FUNCTIONS ========
def addHeatMap(plot,data):
    """Show ``data`` on ``plot`` as an image coloured by a fixed 4-entry lookup table.

    Args:
        plot: object exposing ``addItem`` (the callers pass pyqtgraph plots).
        data: array handed unchanged to ``pg.ImageItem``.

    Returns:
        The ``pg.ImageItem`` that was added, so callers can later call
        ``setImage`` / ``setRect`` on it.
    """
    # One RGBA row per cell value; the alpha channel (200) supplies the
    # transparency, so no separate setOpacity() call is made.
    rgba_rows = (
        (254, 79, 92, 200),    # 0: red (0,0)
        (198, 158, 81, 200),   # 1: brown   (0,1)
        (146, 194, 79, 200),   # 2: green (1,0)
        (7, 235, 59, 200),     # 3: neon green  (1,1)
    )
    palette = np.array(rgba_rows, dtype=np.ubyte)

    heatmap = pg.ImageItem(data)
    heatmap.setLookupTable(palette)
    plot.addItem(heatmap)
    return heatmap
def getXY(data: pd.Series):
    """Build plottable ``(x, y)`` arrays for ``data`` against the LTP frame's index.

    ``x`` comes positionally from ``instance.ltpDf.index`` (not from
    ``data``'s own index); both arrays are cut to the shorter length and the
    positions where ``y`` is NaN are removed.

    Args:
        data: a Series, or a DataFrame whose first column is used. ``None``
            or an empty object yields two empty lists.

    Returns:
        ``(x, y)`` as numpy arrays, or ``([], [])`` when there is no data.
    """
    if data is None or data.empty:
        return [], []

    # A DataFrame is reduced to its first column.
    series = data.iloc[:, 0] if isinstance(data, pd.DataFrame) else data
    xs = instance.ltpDf.index.to_numpy()

    # float64 / int64 data is used as-is; anything else is coerced to numbers,
    # with unparseable entries becoming NaN.
    if series.dtypes in [np.float64, np.int64]:
        ys = series.to_numpy()
    else:
        ys = pd.to_numeric(series, errors='coerce').to_numpy()

    # Align both arrays on the shorter one, then drop the NaN positions.
    common = min(len(xs), len(ys))
    xs, ys = xs[:common], ys[:common]
    keep = ~np.isnan(ys)
    return xs[keep], ys[keep]
def addlinePlot(plot,linewidth,data:pd.Series,name,color=None):
    """Draw ``data`` as a line on ``plot`` and return the created curve.

    Args:
        plot: object exposing ``plot(x, y, ...)`` (the callers pass pyqtgraph plots).
        linewidth: pen width.
        data: values to draw; converted to arrays by ``getXY``.
        name: name given to the curve.
        color: pen colour; when ``None`` the pen is built without a colour
            argument, as before.

    Returns:
        Whatever ``plot.plot`` returns; callers later call ``setData`` on it.
    """
    x, y = getXY(data)
    pen_options = {"width": linewidth}
    if color is not None:
        pen_options["color"] = color
    # The name belongs to the curve, not to the pen: it used to be passed to
    # pg.mkPen, so the curve itself was never named.
    return plot.plot(x, y, pen=pg.mkPen(**pen_options), name=name)
def addScatterPlot(plot,size,data,color):
    """Mark points on ``plot`` with 'x' symbols at the LTP of each given index.

    Args:
        plot: object exposing ``addItem`` (the callers pass pyqtgraph plots).
        size: symbol size.
        data: iterable of sequences whose first element is a label of
            ``instance.ltpDf``; the remaining elements are ignored.
        color: outline (pen) colour of the symbols.

    Returns:
        The created ``pg.ScatterPlotItem``. Previously the result of
        ``plot.addItem(...)`` was returned instead of the item, so callers
        never received a handle they could update.
    """
    labels = [point[0] for point in data]
    # y is looked up from the LTP column, so every label must exist in it.
    prices = instance.ltpDf['ltp'].loc[labels]
    scatter = pg.ScatterPlotItem(x=labels, y=prices, pen=pg.mkPen(color=color), symbol='x',
                                 brush=pg.mkBrush(0, 0, 255, 120), size=size)
    plot.addItem(scatter)
    return scatter


# ===== CREATE ELEMENTS ========
#BUY
# Buy panel: heatmap from combineddf[0], two trend lines and the last traded price (LTP).
instance.hmap_buy = addHeatMap(plot_buy, instance.combineddf[0].to_numpy().T)
instance.line_upper_1_buy = addlinePlot(plot_buy, linewidth=4, data=instance.lowHighMaxes[0], name='Buy Uptrend', color='#097969')
instance.line_lower_1_buy = addlinePlot(plot_buy, linewidth=4, data=instance.HighlowMaxes[0], name='Buy Downtrend', color='#fbd604')
instance.line_ltp_buy = addlinePlot(plot_buy, linewidth=2, data=instance.ltpDf['ltp'], name='LTP', color=(0, 0, 255))
#SELL
# Sell panel: same layout, built from index 1 of each structure.
instance.hmap_sell = addHeatMap(plot_sell, instance.combineddf[1].to_numpy().T)
instance.line_upper_1_sell = addlinePlot(plot_sell, linewidth=4, data=instance.lowHighMaxes[1], name='Sell Uptrend', color='#097969')
instance.line_lower_1_sell = addlinePlot(plot_sell, linewidth=4, data=instance.HighlowMaxes[1], name='Sell Downtrend', color='#fbd604')
instance.line_ltp_sell = addlinePlot(plot_sell, linewidth=2, data=instance.ltpDf['ltp'], name='LTP', color=(0, 0, 255))
#DIFF
# NOTE(review): the attribute names vol_diff_50 / vol_diff_20 no longer match
# their content; they hold the voldiff_buy and voldiff_sell curves.
instance.vol_diff_50 = addlinePlot(plot_diff, linewidth=4, data=instance.voldiff_buy, name='VolDiff_buy', color='#fdd750')
instance.vol_diff_20 = addlinePlot(plot_diff, linewidth=4, data=instance.voldiff_sell, name='VolDiff_sell', color='#097969')
instance.vol_diff_sma = addlinePlot(plot_diff, linewidth=2, data=sma_diff, name='VolDiff_sma', color=(0, 0, 255))
#SCATTER PLOTS
# Mark the recorded long signals on the buy panel and the short signals on the sell panel.
#instance.scatter_buy = addScatterPlot(plot_buy,15,findPeaks(instance.voldiff_buy),color="#0BF05BFF")
instance.scatter_buy = addScatterPlot(plot_buy,15,instance.longs,color="#0BF05BFF")
instance.scatter_sell = addScatterPlot(plot_sell,15,instance.shorts,color="#F00B0BFF")

# ======== UPDATE VALUES ========
#heatmap
# Push the current matrices again without auto-levelling, then place each image
# at x = 0 .. len(ltpDf) and y = min LTP .. min LTP + len(aggDf).
instance.hmap_buy.setImage(instance.combineddf[0].to_numpy().T, autoLevels=False)
instance.hmap_sell.setImage(instance.combineddf[1].to_numpy().T, autoLevels=False)
min_ltp = instance.ltpDf['ltp'].min()
n = len(instance.ltpDf)
m = len(instance.aggDf)
instance.hmap_buy.setRect(0, min_ltp, n, m)
instance.hmap_sell.setRect(0, min_ltp, n, m)

#ltp
# The same LTP curve is drawn on both the buy and the sell panel.
x,y_ltp = getXY(instance.ltpDf['ltp'])
instance.line_ltp_buy.setData(x,y_ltp)
instance.line_ltp_sell.setData(x,y_ltp)
#voldiff buy
# Column 0 of each voldiff frame is the series that gets plotted.
x,y = getXY(instance.voldiff_buy[0])
instance.vol_diff_50.setData(x, y)
#voldiff sell
x,y = getXY(instance.voldiff_sell[0])
instance.vol_diff_20.setData(x, y)
## mean diff:
x,y = getXY(sma_diff)
instance.vol_diff_sma.setData(x, y)


# ======= SETUP ==========
plot_diff.showGrid(x=True, y=True)
plot_buy.showGrid(x=True,y=True)
plot_sell.showGrid(x=True,y=True)
# Runs the Qt event loop; the cell does not finish until it exits.
app.exec()
