<a href="https://colab.research.google.com/github/Aliahmadjangohar/Aliahmadjangohar/blob/main/posedetection%7C_converted%2Bpytorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# -*- coding: utf-8 -*-
"""Untitled61.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1DE0mUH1__IJ70ga-9CgFD1TzSL7B9t5W
"""

#!/usr/bin/env python3

"""
A script that:

1) Downloads a TensorFlow 1.x checkpoint (ResNet-101 + Pose model),
2) Loads it via TensorFlow,
3) Converts weights to a PyTorch ResNet-101 backbone,
4) Defines a simple Pose Refinement head,
5) Demonstrates how to generate Gaussian heatmaps,
6) And finally does a forward pass to produce refined coordinates.

Author: <Your Name>
Date:   <Date>
"""

import os
import json
import numpy as np
import cv2  # If you want to use OpenCV-based generation (optional)
import torch
import torch.nn as nn
import torch.nn.functional as F
import tensorflow as tf  # Must be TF 1.x style for load_checkpoint
from typing import List, Tuple
import matplotlib.pyplot as plt
# -----------------------------
# 0) Download the TF1.x checkpoint
# -----------------------------
# Install gdown if it's not already installed.
# NOTE: every line starting with `!` is an IPython/Colab shell escape, so this
# cell only runs inside a notebook, not as a plain Python script.
!pip install --upgrade gdown --quiet

# Create a model directory (if not exists); `-p` makes this a no-op on re-runs.
!mkdir -p model

# Download the snapshot-515000.zip from Google Drive by file ID
# (ID: 1BEDB_zX8-0XrpdBimYAN4GwaAVAUrrBJ)
# NOTE(review): newer gdown releases reportedly deprecate the `--id` flag in
# favour of passing the file ID positionally -- confirm against the installed
# version, since the line above always upgrades gdown.
!gdown --id 1BEDB_zX8-0XrpdBimYAN4GwaAVAUrrBJ -O snapshot-515000.zip

# Unzip into the 'model' folder (`-q` suppresses the per-file listing).
!unzip -q snapshot-515000.zip -d model/

# Remove the ZIP to clean up
!rm snapshot-515000.zip

# Printed unconditionally: the shell escapes above do not stop the cell when a
# command fails, so this message alone does not prove the download succeeded.
print("Done! The weights are now in the 'model' folder.")



Downloading...
From (original): https://drive.google.com/uc?id=1BEDB_zX8-0XrpdBimYAN4GwaAVAUrrBJ
From (redirected): https://drive.google.com/uc?id=1BEDB_zX8-0XrpdBimYAN4GwaAVAUrrBJ&confirm=t&uuid=0a91a5fb-e2e8-4eb4-a670-b1625c9fff53
To: /content/snapshot-515000.zip
100% 162M/162M [00:01<00:00, 95.9MB/s]
Done! The weights are now in the 'model' folder.


In [None]:

# -----------------------------
# 1) Show checkpoint contents
# -----------------------------
def show_checkpoint_contents(checkpoint_path="model/snapshot-515000"):
    """
    Print the name and shape of every variable stored in a TensorFlow checkpoint.

    Args:
        checkpoint_path: Path handed to ``tf.train.load_checkpoint``. Defaults
            to the snapshot this script expects under ``model/``.
    """
    reader = tf.train.load_checkpoint(checkpoint_path)
    var_map = reader.get_variable_to_shape_map()
    print("[INFO] Variables found in checkpoint:")
    # Sorted so the listing is stable between runs and grouped by scope.
    for vname in sorted(var_map):
        print(f"  {vname}: {var_map[vname]}")

# -----------------------------
# 2) Extract some selected TF weights for demonstration
# -----------------------------
def extract_tf_weights(checkpoint_path="model/snapshot-515000"):
    """
    Fetch the stem conv kernel and batch-norm tensors from a TF checkpoint.

    Args:
        checkpoint_path: Path handed to ``tf.train.load_checkpoint``. Defaults
            to the snapshot this script expects under ``model/``.

    Returns:
        Dict with keys ``conv1_weights``, ``bn_gamma``, ``bn_beta``, ``bn_mean``
        and ``bn_var``, each holding the tensor returned by the checkpoint
        reader for the corresponding variable.
    """
    reader = tf.train.load_checkpoint(checkpoint_path)

    # Output key -> variable name inside the checkpoint.
    # NOTE(review): the stem batch-norm is read from "resnet_v1_101/BatchNorm/*"
    # here, while load_resnet101_from_tf reads "resnet_v1_101/conv1/BatchNorm/*".
    # Confirm which scope the checkpoint actually contains.
    variable_names = {
        "conv1_weights": "resnet_v1_101/conv1/weights",
        "bn_gamma": "resnet_v1_101/BatchNorm/gamma",
        "bn_beta": "resnet_v1_101/BatchNorm/beta",
        "bn_mean": "resnet_v1_101/BatchNorm/moving_mean",
        "bn_var": "resnet_v1_101/BatchNorm/moving_variance",
    }
    tf_weights = {key: reader.get_tensor(name) for key, name in variable_names.items()}

    print("[INFO] Extracted these TF weights:")
    for k, v in tf_weights.items():
        print(f"  {k}: {v.shape}")
    return tf_weights

# -----------------------------
# 3) A small PyTorch demonstration module
# -----------------------------
class ResNetFirstLayer(nn.Module):
    """
    Demo stem: a 7x7 stride-2 convolution (3 -> 64 channels, no bias),
    followed by batch normalisation and ReLU.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels=3,
            out_channels=64,
            kernel_size=7,
            stride=2,
            padding=3,
            bias=False,
        )
        self.bn = nn.BatchNorm2d(num_features=64)

    def forward(self, x):
        # conv -> BN -> ReLU, written as a single expression.
        return F.relu(self.bn(self.conv1(x)))

# -----------------------------
# 4) Copy TF weights to PyTorch
# -----------------------------
def copy_weights_into_pytorch(tf_weights):
    """
    Build a ResNetFirstLayer and fill it from the dict produced by
    extract_tf_weights (keys: conv1_weights, bn_gamma, bn_beta, bn_mean, bn_var).

    Returns the populated module. It is left in its default (training) mode.
    """
    model = ResNetFirstLayer()

    # Destination tensor on the PyTorch side -> key in tf_weights.
    bn_pairs = (
        (model.bn.weight, "bn_gamma"),
        (model.bn.bias, "bn_beta"),
        (model.bn.running_mean, "bn_mean"),
        (model.bn.running_var, "bn_var"),
    )

    with torch.no_grad():
        # Kernel axes are permuted (3, 2, 0, 1):
        # TF [7, 7, 3, 64] -> PyTorch [64, 3, 7, 7].
        kernel = tf_weights["conv1_weights"].transpose(3, 2, 0, 1)
        model.conv1.weight.copy_(torch.from_numpy(kernel))

        for destination, key in bn_pairs:
            destination.copy_(torch.from_numpy(tf_weights[key]))

    return model

# -----------------------------
# 5) Full ResNet-101 definition in PyTorch
# -----------------------------
class Bottleneck(nn.Module):
    """
    Three-convolution residual block (1x1 -> 3x3 -> 1x1) as used by
    ResNet-50/101/152. The output has ``planes * expansion`` channels and the
    optional ``stride`` is applied by the 3x3 convolution.
    """

    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super().__init__()
        widened = planes * self.expansion

        # 1x1 reduce
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        # 3x3 (carries the stride)
        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(planes)
        # 1x1 expand
        self.conv3 = nn.Conv2d(planes, widened, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(widened)

        # Optional projection applied to the skip connection.
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        residual = F.relu(self.bn1(self.conv1(x)))
        residual = F.relu(self.bn2(self.conv2(residual)))
        residual = self.bn3(self.conv3(residual))

        # Use the projected input on the skip path when a projection was given.
        shortcut = x if self.downsample is None else self.downsample(x)

        return F.relu(residual + shortcut)

class ResNet101(nn.Module):
    """
    ResNet-101 feature extractor (no final FC): a 7x7 stem followed by four
    stages of 3 / 4 / 23 / 3 Bottleneck blocks.

    NOTE(review): this layout strides in the first block of layer2-4. TF-Slim's
    resnet_v1 is believed to place the stride elsewhere within each block --
    confirm numerical parity before relying on converted weights.
    """

    def __init__(self):
        super().__init__()
        # Channel count fed to the next stage; updated by _make_layer.
        self.inplanes = 64

        # Stem. ReLU and max-pool are applied functionally in forward().
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        self.layer1 = self._make_layer(64, 3, stride=1)    # block1: 3 bottlenecks
        self.layer2 = self._make_layer(128, 4, stride=2)   # block2: 4 bottlenecks
        self.layer3 = self._make_layer(256, 23, stride=2)  # block3: 23 bottlenecks
        self.layer4 = self._make_layer(512, 3, stride=2)   # block4: 3 bottlenecks

    def _make_layer(self, planes, blocks, stride=1):
        """Build one stage of ``blocks`` bottlenecks; only the first may stride."""
        out_channels = planes * Bottleneck.expansion

        # The skip path needs a 1x1 projection whenever the first block changes
        # the resolution or the channel count.
        needs_projection = stride != 1 or self.inplanes != out_channels
        projection = None
        if needs_projection:
            projection = nn.Sequential(
                nn.Conv2d(self.inplanes, out_channels,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

        stage = [Bottleneck(self.inplanes, planes, stride=stride, downsample=projection)]
        self.inplanes = out_channels
        stage.extend(Bottleneck(self.inplanes, planes) for _ in range(blocks - 1))
        return nn.Sequential(*stage)

    def forward(self, x):
        # Shapes below are for a [B, 3, 64, 64] input.
        # Stem conv (stride 2): 64x64 -> 32x32.
        x = F.relu(self.bn1(self.conv1(x)))
        # 3x3 max-pool (stride 2): 32x32 -> 16x16.
        x = F.max_pool2d(x, kernel_size=3, stride=2, padding=1)

        # layer1 -> [B,  256, 16, 16]
        # layer2 -> [B,  512,  8,  8]
        # layer3 -> [B, 1024,  4,  4]
        # layer4 -> [B, 2048,  2,  2]   (overall reduction is 32x)
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        return x

# -----------------------------
# 6) Pose Refinement Head that outputs 15×64×64
# -----------------------------
class PoseRefineHead(nn.Module):
    """
    Two 3x3 conv/BN/ReLU layers followed by a single transposed convolution
    that upsamples by a factor of 16 and emits ``num_joints`` channels.

    A [B, in_channels, 4, 4] input therefore yields [B, num_joints, 64, 64].

    NOTE(review): ResNet101 in this file reduces a 64x64 image by 32x (to 2x2),
    which this head turns into 32x32 rather than 64x64 -- confirm the intended
    input resolution.
    """

    def __init__(self, in_channels=2048, num_joints=15):
        super().__init__()
        hidden = 256

        # Reduce in_channels -> 256.
        self.conv1 = nn.Conv2d(in_channels, hidden, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(hidden)
        self.relu1 = nn.ReLU(inplace=True)

        self.conv2 = nn.Conv2d(hidden, hidden, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(hidden)
        self.relu2 = nn.ReLU(inplace=True)

        # One-step x16 upsampling. Output size = (H - 1) * 16 - 2 * 8 + 32 = 16 * H,
        # so H = 4 gives 64.
        self.deconv = nn.ConvTranspose2d(
            hidden,
            num_joints,
            kernel_size=32,
            stride=16,
            padding=8,
            output_padding=0,
            bias=False,
        )

    def forward(self, x):
        x = self.relu1(self.bn1(self.conv1(x)))
        x = self.relu2(self.bn2(self.conv2(x)))
        # Spatial size grows 16x; channels become num_joints.
        return self.deconv(x)


class FullModel(nn.Module):
    """
    Chains a feature backbone and a refinement head, returning heatmaps at the
    same spatial resolution as the input image.

    Args:
        backbone: Module mapping the input image batch to a feature map.
        refine_head: Module mapping that feature map to per-joint heatmaps.
    """

    def __init__(self, backbone, refine_head):
        super().__init__()
        self.backbone = backbone
        self.refine_head = refine_head

    def forward(self, x):
        """
        Args:
            x: Image batch, e.g. [B, 3, 64, 64].

        Returns:
            Heatmaps with the head's channel count and, for 4-D tensors, the
            same height and width as ``x`` (e.g. [B, 15, 64, 64]).
        """
        feats = self.backbone(x)
        out = self.refine_head(feats)

        # The backbone's total stride and the head's upsampling factor need not
        # cancel out (a 32x reduction followed by a 16x upsampling gives half
        # the input size). Resize so the output can be compared element-wise
        # with heatmaps built at the input resolution.
        if x.dim() == 4 and out.dim() == 4 and out.shape[-2:] != x.shape[-2:]:
            out = F.interpolate(
                out, size=tuple(x.shape[-2:]), mode="bilinear", align_corners=False
            )
        return out

# -----------------------------
# 7) TF -> PyTorch checkpoint loading
# -----------------------------
def load_conv_and_bn(reader, var_map, tf_scope_name, conv_module, bn_module):
    """
    Copy one TF conv kernel and its BatchNorm statistics into PyTorch modules.

    Args:
        reader: Checkpoint reader exposing ``get_tensor(name)``.
        var_map: Container of the variable names present in the checkpoint
            (only membership tests are performed on it).
        tf_scope_name: TF scope of the convolution; the kernel is looked up at
            ``<scope>/weights`` and the batch-norm tensors under
            ``<scope>/BatchNorm/``.
        conv_module: Destination convolution; its ``weight`` is overwritten.
        bn_module: Destination batch-norm; weight, bias, running_mean and
            running_var are overwritten when the matching variable exists.

    Returns:
        True when the kernel and all four BatchNorm tensors were copied, False
        when the kernel is missing (nothing is modified) or when at least one
        BatchNorm tensor is missing (that tensor keeps its current value).
    """
    w_name = f"{tf_scope_name}/weights"
    if w_name not in var_map:
        print(f"[WARN] {w_name} not found.")
        return False

    # TF variable suffix -> destination tensor on the PyTorch side.
    bn_targets = (
        ("gamma", bn_module.weight),
        ("beta", bn_module.bias),
        ("moving_mean", bn_module.running_mean),
        ("moving_variance", bn_module.running_var),
    )

    complete = True
    with torch.no_grad():
        # Kernel axes are permuted (3, 2, 0, 1) to go from the TF layout
        # [kH, kW, in, out] to the PyTorch layout [out, in, kH, kW].
        pt_w = torch.from_numpy(reader.get_tensor(w_name).transpose(3, 2, 0, 1))
        conv_module.weight.copy_(pt_w)

        for suffix, destination in bn_targets:
            name = f"{tf_scope_name}/BatchNorm/{suffix}"
            if name not in var_map:
                # Previously skipped silently, leaving default-initialised
                # statistics in an otherwise converted layer.
                print(f"[WARN] {name} not found.")
                complete = False
                continue
            destination.copy_(torch.from_numpy(reader.get_tensor(name)))

    print(f"[OK] loaded {tf_scope_name} -> conv({pt_w.shape}), bn({bn_module.num_features})")
    return complete

def load_bottleneck(reader, var_map, tf_scope, bottleneck_module):
    """
    Load the three conv/BN pairs of one Bottleneck from ``tf_scope`` and, when
    the block has a projection on its skip path, the ``shortcut`` pair as well.
    """
    for index in (1, 2, 3):
        load_conv_and_bn(
            reader,
            var_map,
            f"{tf_scope}/conv{index}",
            getattr(bottleneck_module, f"conv{index}"),
            getattr(bottleneck_module, f"bn{index}"),
        )

    projection = bottleneck_module.downsample
    if projection is None:
        return

    # The projection is indexed as (conv, batch-norm).
    load_conv_and_bn(reader, var_map, f"{tf_scope}/shortcut", projection[0], projection[1])

def load_resnet101_from_tf(checkpoint_path="model/snapshot-515000"):
    """
    Build a ResNet101 in eval mode and fill it from a TF ``resnet_v1_101``
    checkpoint.

    Args:
        checkpoint_path: Path handed to ``tf.train.load_checkpoint``. Defaults
            to the snapshot this script expects under ``model/``.

    Returns:
        The populated ResNet101 module.
    """
    net = ResNet101()
    net.eval()

    reader = tf.train.load_checkpoint(checkpoint_path)
    var_map = reader.get_variable_to_shape_map()
    print(f"[INFO] TF checkpoint has {len(var_map)} variables.")

    # Stem convolution + batch-norm.
    load_conv_and_bn(reader, var_map, "resnet_v1_101/conv1", net.conv1, net.bn1)

    # layerN in PyTorch corresponds to blockN in the checkpoint; both the block
    # and unit indices in the TF scope names are 1-based. Unit counts follow the
    # lengths of the PyTorch stages (3 / 4 / 23 / 3).
    stages = (net.layer1, net.layer2, net.layer3, net.layer4)
    for block_idx, stage in enumerate(stages, start=1):
        for unit_idx, bottleneck in enumerate(stage, start=1):
            tf_scope = f"resnet_v1_101/block{block_idx}/unit_{unit_idx}/bottleneck_v1"
            load_bottleneck(reader, var_map, tf_scope, bottleneck)

    print("[INFO] Done transferring all ResNet101 weights from TF to PyTorch.")
    return net

# -----------------------------
# 8) Generate 15-channel heatmaps
# -----------------------------
def generate_heatmaps(
    joints: List[Tuple[float, float, float]],
    heatmap_size: Tuple[int, int],
    sigma: float = 2.0,
    original_size: Tuple[int, int] = (160, 160),
) -> torch.Tensor:
    """
    Render one Gaussian heatmap per joint.

    Args:
        joints: Sequence of ``(x, y, v)`` in ``original_size`` pixel
            coordinates; ``v`` is a visibility score.
        heatmap_size: ``(width, height)`` of the heatmaps to produce.
        sigma: Standard deviation of the Gaussian, in heatmap pixels.
        original_size: ``(width, height)`` of the coordinate space the joints
            are expressed in. Defaults to 160x160.

    Returns:
        float32 tensor of shape ``[len(joints), height, width]``. A channel is
        left all-zero when its joint has ``v < 0.5`` or falls outside the
        heatmap after rescaling.

    Raises:
        ValueError: if ``sigma`` or either ``original_size`` entry is not
            positive.
    """
    if sigma <= 0:
        raise ValueError(f"sigma must be positive, got {sigma}")
    original_w, original_h = original_size
    if original_w <= 0 or original_h <= 0:
        raise ValueError(f"original_size must be positive, got {original_size}")
    target_w, target_h = heatmap_size

    heatmaps = torch.zeros((len(joints), target_h, target_w), dtype=torch.float32)

    # Column / row index grids, shaped to broadcast to [H, W].
    xx = torch.arange(target_w, dtype=torch.float32).unsqueeze(0)
    yy = torch.arange(target_h, dtype=torch.float32).unsqueeze(1)
    denom = 2.0 * sigma ** 2

    for i, (x, y, v) in enumerate(joints):
        if v < 0.5:
            continue
        # Rescale from the source coordinate space to heatmap pixels.
        x_scaled = (x / original_w) * target_w
        y_scaled = (y / original_h) * target_h
        if x_scaled < 0 or y_scaled < 0 or x_scaled >= target_w or y_scaled >= target_h:
            continue
        # The exponent is never positive, so values already lie in [0, 1].
        heatmaps[i] = torch.exp(-((xx - x_scaled) ** 2 + (yy - y_scaled) ** 2) / denom)
    return heatmaps



In [None]:
# -----------------------------
# 9) Convert heatmaps to coordinates
# -----------------------------
def heatmaps_to_coordinates(heatmaps: torch.Tensor) -> List[Tuple[float, float]]:
    """
    Locate the peak of every heatmap channel.

    heatmaps: tensor of shape [num_joints, H, W].
    returns: one ``(x, y)`` pair per channel, where x is the column and y the
             row of that channel's maximum, both as floats.
    """
    _, _, width = heatmaps.shape

    peaks = []
    for channel in heatmaps:
        # argmax indexes the flattened channel; split it back into row/column.
        row, col = divmod(int(torch.argmax(channel)), width)
        peaks.append((float(col), float(row)))
    return peaks

# -----------------------------
# 10) JSON loading/saving
# -----------------------------
def load_joint_coordinates(json_file: str):
    """
    Read keypoint annotations from a JSON file with ``images`` and
    ``annotations`` lists.

    Each image entry must carry ``file_name`` and an identifier stored under
    either ``image_id`` or ``id``. Each annotation carries ``image_id`` and a
    flat ``keypoints`` list ``[x1, y1, v1, x2, y2, v2, ...]``.

    Args:
        json_file: Path of the JSON file to read.

    Returns:
        Dict mapping file name -> list of ``(x, y, v)`` tuples. Annotations
        whose ``image_id`` matches no image are skipped; when several
        annotations refer to the same image the last one wins.

    Raises:
        KeyError: if an image entry has neither ``image_id`` nor ``id``, or a
            required key is missing.
    """
    with open(json_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    image_id_to_filename = {}
    for im in data["images"]:
        # Image entries may be keyed by "id" rather than "image_id"; requiring
        # "image_id" alone raised KeyError on such files.
        if "image_id" in im:
            img_id = im["image_id"]
        elif "id" in im:
            img_id = im["id"]
        else:
            raise KeyError("image entry has neither 'image_id' nor 'id'")
        image_id_to_filename[img_id] = im["file_name"]

    result = {}
    for ann in data["annotations"]:
        file_name = image_id_to_filename.get(ann["image_id"])
        if file_name is None:
            continue
        kpts = ann["keypoints"]  # [x1, y1, v1, x2, y2, v2, ...]
        # Stop before a trailing incomplete triple instead of raising IndexError.
        result[file_name] = [
            (kpts[i], kpts[i + 1], kpts[i + 2]) for i in range(0, len(kpts) - 2, 3)
        ]
    return result

def save_coordinates_to_json(coordinates: List[Tuple[float, float]], output_file: str):
    """
    Write ``coordinates`` to ``output_file`` as
    ``{"joints": [{"x": ..., "y": ...}, ...]}`` (4-space indented JSON) and
    print a confirmation line.
    """
    # Build the whole payload first so a malformed pair fails before the file
    # is opened for writing.
    payload = {"joints": [{"x": x, "y": y} for (x, y) in coordinates]}
    with open(output_file, "w") as f:
        json.dump(payload, f, indent=4)
    print(f"[INFO] Saved refined coords to {output_file}")

# -----------------------------
# 11) A comparison "layer"
# -----------------------------
class CompareLayer(nn.Module):
    """
    Measures how far two equally shaped heatmap batches (e.g. [B, 15, 64, 64])
    are apart, using the mean absolute difference, and reports whether that
    value exceeds ``threshold``.
    """

    def __init__(self, threshold=0.5):
        super().__init__()
        self.threshold = threshold

    def forward(self, heat_in, heat_out):
        """
        heat_in, heat_out: tensors of matching shape, e.g. [B, 15, 64, 64].
        Prints a verdict relative to the threshold and returns the mean
        absolute difference as a 0-dim tensor.
        """
        diff = (heat_in - heat_out).abs().mean()
        value = diff.item()

        if value > self.threshold:
            verdict = "This is not normal. Diff ="
        else:
            verdict = "Difference is within normal range. Diff ="
        print(verdict, value)

        return diff


In [None]:

# -----------------------------
# Main
# -----------------------------
def main():
    """
    End-to-end demo: inspect the TF checkpoint, convert ResNet-101 to PyTorch,
    attach a pose head, then run every annotated frame of a JSON file through
    the model and compare the output with the input heatmaps.
    """
    # download_checkpoint()  # If needed

    # --- Checkpoint inspection and the small conv1 + BN demo -----------------
    show_checkpoint_contents()
    copy_weights_into_pytorch(extract_tf_weights())
    print("[INFO] Copied conv1 + BN from TF to PyTorch (demo).")

    # --- Full backbone conversion --------------------------------------------
    backbone = load_resnet101_from_tf()
    print("[INFO] Completed ResNet101 conversion. Testing forward pass...")

    # Smoke test on random data. ResNet101 reduces the resolution 32x, so a
    # 64x64 input gives [1, 2048, 2, 2].
    probe = torch.randn(1, 3, 64, 64)
    with torch.no_grad():
        probe_features = backbone(probe)
    print("ResNet-101 output shape:", probe_features.shape)

    # --- Backbone + pose head -------------------------------------------------
    model = FullModel(
        backbone=backbone,
        refine_head=PoseRefineHead(in_channels=2048, num_joints=15),
    )
    model.eval()

    # --- Annotations ------------------------------------------------------------
    json_file_path = "/content/frame_000_transformed.json"
    joint_dict = load_joint_coordinates(json_file_path)
    print("Loaded joint_dict keys:", joint_dict.keys())

    compare_layer = CompareLayer(threshold=0.1)

    for file_name, joints in joint_dict.items():
        print("Processing:", file_name)

        # One heatmap channel per joint, at 64x64.
        heatmaps_in = generate_heatmaps(joints, (64, 64))
        print("heatmaps_in shape:", heatmaps_in.shape)

        # The backbone takes 3 channels, so joints are folded into pseudo-RGB:
        # R = joints 0-4, G = joints 5-9, B = joints 10-14.
        rgb_image = torch.stack(
            [torch.sum(heatmaps_in[start:start + 5], dim=0) for start in (0, 5, 10)]
        )

        # Scale to a peak of 1 unless the image is empty.
        peak = rgb_image.max()
        if peak > 0:
            rgb_image = rgb_image / peak

        # [3, 64, 64] -> [1, 3, 64, 64]
        with torch.no_grad():
            out_heatmaps = model(rgb_image.unsqueeze(0))
        print("Refined heatmaps shape:", out_heatmaps.shape)

        # CompareLayer subtracts element-wise, so the model output has to match
        # the batched input heatmaps ([1, num_joints, 64, 64]) in shape.
        compare_layer(heatmaps_in.unsqueeze(0), out_heatmaps)

        # Peak location of every output channel.
        refined_coords = heatmaps_to_coordinates(out_heatmaps[0])
        print("Refined coords:", refined_coords)
        # Save or do further steps if needed.

        print("-" * 40)

    print("[INFO] Done. Customize further as needed.")


if __name__ == "__main__":
    main()


[INFO] Variables found in checkpoint:
  resnet_v1_101/block4/unit_3/bottleneck_v1/conv3/BatchNorm/gamma: [2048]
  resnet_v1_101/block4/unit_3/bottleneck_v1/conv2/BatchNorm/moving_variance: [512]
  resnet_v1_101/block4/unit_3/bottleneck_v1/conv1/BatchNorm/moving_mean: [512]
  resnet_v1_101/block4/unit_2/bottleneck_v1/conv3/weights: [1, 1, 512, 2048]
  resnet_v1_101/block4/unit_2/bottleneck_v1/conv3/BatchNorm/moving_mean: [2048]
  resnet_v1_101/block4/unit_2/bottleneck_v1/conv2/weights: [3, 3, 512, 512]
  resnet_v1_101/block4/unit_2/bottleneck_v1/conv1/weights: [1, 1, 2048, 512]
  resnet_v1_101/block4/unit_2/bottleneck_v1/conv1/BatchNorm/gamma: [512]
  resnet_v1_101/block4/unit_1/bottleneck_v1/shortcut/BatchNorm/moving_variance: [2048]
  resnet_v1_101/block4/unit_1/bottleneck_v1/shortcut/BatchNorm/gamma: [2048]
  resnet_v1_101/block4/unit_1/bottleneck_v1/conv3/BatchNorm/moving_mean: [2048]
  resnet_v1_101/block4/unit_1/bottleneck_v1/conv2/weights: [3, 3, 512, 512]
  resnet_v1_101/block4/

KeyError: 'image_id'