# **NeuroTimber: Estimating Stacked Timber Volume Using AI Detection and Diameter Distribution Models**

## Install necessary packages

In [None]:
!pip install numpy opencv-python-headless torch supervision IPython pandas scipy matplotlib

## Clone the YOLOv9 repository to access the models

In [None]:
!git clone https://github.com/SkalskiP/yolov9.git
%cd yolov9
!pip3 install -r requirements.txt -q

In [None]:
import numpy as np
from base64 import b64encode
import cv2
import torch
import supervision as sv
from models.common import DetectMultiBackend, AutoShape
from utils.torch_utils import select_device
from utils.general import set_logging
from supervision import Detections as BaseDetections
from supervision.config import CLASS_NAME_DATA_FIELD
from IPython.display import HTML
import pandas as pd
from scipy import stats
import warnings
import matplotlib.pyplot as plt

warnings.filterwarnings('ignore')

## Define paths

In [None]:
weights = '/content/yolov9-c-wooddetection-converted.pt'
data = '/content/wooddetection.yaml'
SOURCE_VIDEO_PATH = '/content/Test_Video.mp4'
TARGET_VIDEO_PATH = '/content/Demo_Video.mp4'
data_path = '/content/dados.xlsx'

## AI Log Detection

In [None]:
# Extending Supervision's `Detections` to Handle YOLOv9 Results
class ExtendedDetections(BaseDetections):
    @classmethod
    def from_yolov9(cls, yolov9_results) -> 'ExtendedDetections':
        xyxy, confidences, class_ids = [], [], []

        for det in yolov9_results.pred:
            for *xyxy_coords, conf, cls_id in reversed(det):
                xyxy.append(torch.stack(xyxy_coords).cpu().numpy())
                confidences.append(float(conf))
                class_ids.append(int(cls_id))

        class_names = np.array([yolov9_results.names[i] for i in class_ids])

        if not xyxy:
            return cls.empty()

        return cls(
            xyxy=np.vstack(xyxy),
            confidence=np.array(confidences),
            class_id=np.array(class_ids),
            data={CLASS_NAME_DATA_FIELD: class_names},
        )

# Loading the Model
set_logging(verbose=False)
device = select_device('cuda:0')  # Change 'cpu' to 'cuda' to use GPU
model = DetectMultiBackend(weights= weights, device=device, data= data, fuse=True)
model = AutoShape(model)

# Function to Set YOLOv9 Post-processing Parameters
def prepare_yolov9(model, conf=0.2, iou=0.7, classes=None, agnostic_nms=False, max_det=1000):
    model.conf = conf
    model.iou = iou
    model.classes = classes
    model.agnostic = agnostic_nms
    model.max_det = max_det
    return model

# Function to Play Videos
def play(filename, width=500):
    html = ''
    video = open(filename, 'rb').read()
    src = 'data:video/mp4;base64,' + b64encode(video).decode()
    html += fr'' % src
    return HTML(html)

# Constants
SOURCE_VIDEO_PATH = SOURCE_VIDEO_PATH
TARGET_VIDEO_PATH = TARGET_VIDEO_PATH

# Simple Object Detection with YOLOv9 and Supervision
def prepare_model_and_video_info(model, config, source_path):
    model = prepare_yolov9(model, **config)
    video_info = sv.VideoInfo.from_video_path(source_path)
    return model, video_info

def setup_annotator():
    return sv.BoundingBoxAnnotator(thickness=2)

def simple_annotate_frame(frame, model, annotator):
    frame_rgb = frame[..., ::-1]
    results = model(frame_rgb, size=640, augment=False)
    detections = ExtendedDetections.from_yolov9(results)

    # Annotate the frame with detections
    annotated_frame = annotator.annotate(scene=frame.copy(), detections=detections)

    return annotated_frame

def simple_process_video(model, config=dict(conf=0.1, iou=0.45, classes=None,), source_path=SOURCE_VIDEO_PATH, target_path=TARGET_VIDEO_PATH):
    model, _ = prepare_model_and_video_info(model, config, source_path)
    annotator = setup_annotator()

    def callback(frame: np.ndarray, index: int) -> np.ndarray:
        return simple_annotate_frame(frame, model, annotator)

    sv.process_video(source_path=source_path, target_path=target_path, callback=callback)

# Advanced Detection, Tracking, and Counting with YOLOv9 and Supervision

class GlobalCounter:
    def __init__(self):
        self.total_count = 0
        self.tracker_ids = set()

    def update(self, detections):
        for tracker_id in detections.tracker_id:
            if tracker_id not in self.tracker_ids:
                self.tracker_ids.add(tracker_id)
                self.total_count += 1

def setup_model_and_video_info(model, config, source_path):
    model = prepare_yolov9(model, **config)
    video_info = sv.VideoInfo.from_video_path(source_path)
    return model, video_info

def create_byte_tracker(video_info):
    return sv.ByteTrack(track_activation_threshold=0.25, lost_track_buffer=250, minimum_matching_threshold=0.95, frame_rate=video_info.fps)

def setup_annotators():
    round_box_annotator = sv.RoundBoxAnnotator(thickness=2, color_lookup=sv.ColorLookup.TRACK)
    label_annotator = sv.LabelAnnotator(text_scale=0.5, color_lookup=sv.ColorLookup.TRACK)
    return round_box_annotator, label_annotator

def annotate_frame(frame, index, video_info, detections, byte_tracker, round_box_annotator, label_annotator, show_labels, model, global_counter):
    detections = byte_tracker.update_with_detections(detections)
    annotated_frame = frame.copy()

    # Draw the bounding boxes
    annotated_frame = round_box_annotator.annotate(scene=annotated_frame, detections=detections)

    # Add labels to the detections
    if show_labels:
        annotated_frame = add_labels_to_frame(label_annotator, annotated_frame, detections, model)

    # Update the global counter and draw it in the top-right corner
    global_counter.update(detections)
    annotated_frame = draw_counter(annotated_frame, global_counter.total_count)

    return annotated_frame

def add_labels_to_frame(annotator, frame, detections, model):
    labels = [f"#{tracker_id} {model.model.names[class_id]} {confidence:0.2f}" for confidence, class_id, tracker_id in zip(detections.confidence, detections.class_id, detections.tracker_id)]
    return annotator.annotate(scene=frame, detections=detections, labels=labels)

def draw_counter(frame, count):
    text = f"Total Count: {count}"
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1
    color = (255, 255, 255)
    thickness = 2
    size = cv2.getTextSize(text, font, font_scale, thickness)[0]
    x = frame.shape[1] - size[0] - 10
    y = size[1] + 10
    cv2.putText(frame, text, (x, y), font, font_scale, color, thickness)
    return frame

def process_video(model, config=dict(conf=0.1, iou=0.45, classes=True,), show_labels=True, source_path=SOURCE_VIDEO_PATH, target_path=TARGET_VIDEO_PATH):
    model, video_info = setup_model_and_video_info(model, config, source_path)
    byte_tracker = create_byte_tracker(video_info)
    round_box_annotator, label_annotator = setup_annotators()
    global_counter = GlobalCounter()

    def callback(frame: np.ndarray, index: int) -> np.ndarray:
        frame_rgb = frame[..., ::-1]
        results = model(frame_rgb, size=608, augment=False)
        detections = ExtendedDetections.from_yolov9(results)

        # Annotate the frame with detections
        annotated_frame = annotate_frame(frame, index, video_info, detections, byte_tracker, round_box_annotator, label_annotator, show_labels, model, global_counter)

        return annotated_frame

    sv.process_video(source_path=source_path, target_path=target_path, callback=callback)

    # Display the total count of logs
    print(f"Total logs counted: {global_counter.total_count}")

    return global_counter.total_count

# Detection, Tracking, and Counting in Full Frame
yolov9_config=dict(conf=0.3, iou=0.45, classes=[0])
total_logs = process_video(model, config=yolov9_config, show_labels=False, target_path= TARGET_VIDEO_PATH)

cv2.destroyAllWindows()  # Close all OpenCV windows after processing

## Diameter Distribution Models

In [None]:
# Load the data for fitting distributions
data_path = data_path
data = pd.read_excel(data_path)

# Assuming the data is in the first column of the Excel sheet
data_column = data.columns[0]
data_values = data[data_column].dropna().values

# List of specific distributions to check
distributions = [
    'genextreme', 'powernorm', 'pearson3', 'genhyperbolic', 'johnsonsu', 'truncnorm',
    'skewnorm', 'nct', 'gennorm', 'exponnorm', 'norm', 'dweibull', 'dgamma', 't'
]

# Function to calculate the best fit distributions using AIC
def best_fit_distributions(data, distributions, top_n=5):
    distribution_scores = []

    for distribution in distributions:
        dist = getattr(stats, distribution)
        try:
            params = dist.fit(data)
            # Calculate log likelihood
            log_likelihood = np.sum(dist.logpdf(data, *params[:-2], loc=params[-2], scale=params[-1]))
            # Number of parameters
            k = len(params)
            # Calculate AIC
            aic = 2 * k - 2 * log_likelihood
            # Append distribution, parameters, and AIC to the list
            distribution_scores.append((dist, params, aic))
        except Exception:
            continue

    # Sort by AIC and select the top_n distributions
    distribution_scores.sort(key=lambda x: x[2])
    return distribution_scores[:top_n]

# Find the top 5 best fit distributions
top_distributions = best_fit_distributions(data_values, distributions, top_n=5)

# Print the best distributions and their parameters
for i, (dist, params, aic) in enumerate(top_distributions):
    print(f'Rank {i+1}: {dist.name} with AIC: {aic}')
    print(f'Parameters: {params}')

# Prepare to plot the top 5 best fit distributions
plt.figure(figsize=(10, 6))
plt.hist(data_values, bins=30, density=True, alpha=0.5, color='b', label='Data')
xmin, xmax = plt.xlim()
x = np.linspace(xmin, xmax, 100)

# x_values for calculating probabilities
x_values = np.arange(1, 36)

# DataFrame to store the probabilities of each distribution
results = pd.DataFrame(x_values, columns=['Value'])

for i, (dist, params, aic) in enumerate(top_distributions):
    p = dist.pdf(x_values, *params[:-2], loc=params[-2], scale=params[-1])
    results[f'Rank {i+1}: {dist.name}'] = p
    plt.plot(x_values, p, linewidth=2, label=f'Rank {i+1}: {dist.name}')

plt.title('Top 5 Best Fit Distributions')
plt.xlabel('Data values')
plt.ylabel('Density')
plt.legend()
plt.savefig('/content/top_5_best_fit_distributions.png')  # Save the plot
plt.close()  # Close the plot

# Save the probabilities to a CSV file
results.to_csv('/content/probabilities_top_5.csv', index=False)

# Display the results
print(results)

## Estimating Stacked Timber Volume

In [None]:
# Calculate the estimated volume for each class
log_length = 6  # Customize log length as needed
total_volume_estimated = 0

for index, row in results.iterrows():
    class_number = row['Value']
    probability = row['Rank 1: genextreme'] # Replace with your distribuition model selected
    log_count = total_logs  # Replace with the actual count of logs for this class if available. default: total_logs

    # Volume observed for each class
    volume_observed = (np.pi / 40000) * (class_number ** 2) * log_length

    # Estimated volume for each class
    volume_estimated = probability * log_count * volume_observed

    total_volume_estimated += volume_estimated

print(f'Total estimated volume: {total_volume_estimated} cubic meters')