## A Dial Electric Meter Reader based on YOLO

Author: SU, Jia Cheng (Boston University).

Update: July 23, 2024.

This project is a part for our research on Demand Responce facing residential communities. We are focusing on fomulating technical solutions to societal necessities. Utility companies are not willing to monitor and store user's meter information in real-time, even though they are permitted, capable of, and are currently collecting the data on daily basis. Nonetheless, we the users are not able to either frequently collect or analyze the numbers we are producing. Therefore, we come up with this tool to help monitor our own meters with only a webcam. 


This project is based on YOLO. We collected images of dial meters from public areas and formulate a computer vision model using YOLO. The model is for recognizing the dials from pictures for further processes. After cropping the dials, we apply morphological methods to rule out the shape of the needle hand, and optimizes its read by minimizing the projection area. We finally take algebraic methods to calculate the overall read of the meter in the scene.


Limits of this project includes:

### External packages

In [1]:
import numpy as np
import math
from ultralytics import YOLO
import cv2
import time
import numpy as np
from scipy.stats import norm
from sklearn.linear_model import LinearRegression
import os
import shutil


### Predict using YOLO

Model is established using codes in another file. Program is deployed on SCC On Demand at Boston University. Results of prediction is saved for future usages.

In [2]:
model = YOLO('images/0717/best.pt')
source = "sample"

results = model.predict(source, 
                        save=True, 
                        save_txt=True, 
                        save_conf=True, 
                        save_crop=True, 
                        show_labels=False, 
                        conf=0.85,)


image 1/7 /Users/hans/Boston University/2023C/sample/1.JPG: 448x640 5 subplates, 81.6ms
image 2/7 /Users/hans/Boston University/2023C/sample/2.JPG: 448x640 5 subplates, 87.7ms
image 3/7 /Users/hans/Boston University/2023C/sample/3.JPG: 448x640 5 subplates, 81.6ms
image 4/7 /Users/hans/Boston University/2023C/sample/4.JPG: 448x640 5 subplates, 82.0ms
image 5/7 /Users/hans/Boston University/2023C/sample/5.jpeg: 640x480 5 subplates, 96.3ms
image 6/7 /Users/hans/Boston University/2023C/sample/6.jpeg: 448x640 10 subplates, 90.7ms
image 7/7 /Users/hans/Boston University/2023C/sample/7.jpeg: 448x640 5 subplates, 85.4ms
Speed: 2.6ms preprocess, 86.5ms inference, 0.4ms postprocess per image at shape (1, 3, 448, 640)
Results saved to [1mruns/detect/predict14[0m
7 labels saved to runs/detect/predict14/labels


### Helper Methods for Morphorlogical Processes

In [3]:
def get_angle(xy1, xy2) -> float:
    x1, y1 = xy1
    x2, y2 = xy2
    m = (y2 - y1) / (x2 - x1)
    radian = - np.arctan(m)
    angle = np.rad2deg(radian)
    return angle
            
def get_cropped_xyxy(img, xyxy):
    x1f, y1f, x2f, y2f = tuple(xyxy)
    x1 = round(x1f)
    y1 = round(y1f)
    x2 = round(x2f)
    y2 = round(y2f)
    cropped_img = img[y1:y2, x1:x2]
    # _name = time.time()
    # cv2.imwrite(f'sample/cropped/{_name}.jpg', cropped_img)
    return cropped_img

def get_cropped_xywh(img, xywh):
    xf, yf, wf, hf = tuple(xywh)
    x = round(xf)
    y = round(yf)
    w = round(15 / 32 * wf)
    h = round(15 / 32 * hf)
    cropped_img = img[y-w:y+w, x-w:x+w]
    # _name = time.time()
    # cv2.imwrite(f'sample/cropped/{_name}.jpg', cropped_img)
    return cropped_img

def make_square(img):
    h, w = img.shape[:2]
    new_size = w * 2, w * 2
    # square_img = np.zeros((new_size, new_size, 3), dtype=np.uint8)
    # x_offset = ()
    stretched_image = cv2.resize(img, new_size, interpolation=cv2.INTER_LINEAR)
    # _name = time.time()
    # cv2.imwrite(f'sample/squ/{_name}.jpg', stretched_image)
    return stretched_image

def make_binary(img):
    gray_image = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    binary_image_adaptive = cv2.adaptiveThreshold(gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2)
    _, binary_image_otsus = cv2.threshold(gray_image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    ## Mode Selector ##
    adopted = 255 - binary_image_otsus
    # _name = time.time()
    # cv2.imwrite(f'sample/bin/{_name}.jpg', adopted)
    return adopted

def make_circled(img):
    size, _ = img.shape
    mask = np.zeros((size, size), dtype=np.uint8)
    # print(f'{size} & {channels}')
    center = (size // 2, size // 2)
    radius = size // 2
    cv2.circle(mask, center, radius, 0, thickness=-1)
    result_image = np.where(mask == 255, 0, img)
    # eroded_img = make_eroded(result_image)
    # _name = time.time()
    # cv2.imwrite(f'sample/cir/{_name}.jpg', eroded_img)
    return result_image

def make_eroded(img):
    median_blurred_image = cv2.medianBlur(img, 15)
    # 创建结构元素（3x3矩形）
    kernel = np.ones((21, 21), np.uint8)
    # 执行腐蚀操作
    eroded_image = cv2.erode(median_blurred_image, kernel, iterations=1)
    # _name = time.time()
    # cv2.imwrite(f'sample/cir/{_name}.jpg', eroded_image)
    return eroded_image


### Helper Methods for Inference of Needle Hand Read

In [4]:
def get_dir_mean_angle(dx, dy):
    radii = np.sqrt(dx**2 + dy**2)
    angles = np.arctan2(dy, dx)
    angles_degrees = np.degrees(angles)

    weights = radii
    weighted_avg_angle = np.average(angles_degrees, weights=weights)
    return weighted_avg_angle

def get_dir_regression(dx, dy):
    y_indices, x_indices = dx, dy
    X = x_indices.reshape(-1, 1)  
    y = y_indices 
    model = LinearRegression()
    model.fit(X, y)

    slope = model.coef_[0]
    print(f'{slope:.4f}')
    angle_radians = np.arctan(slope)
    angle_degrees = np.degrees(angle_radians)
    return angle_degrees
    
def get_dir_ratio(dx, dy):
    slope = np.mean(dy) / np.mean(dx)
    print(f'{slope:.4f}')
    angle_radians = np.arctan(slope)
    angle_degrees = np.degrees(angle_radians)
    return angle_degrees

def calculate_projection_area(binary_image, angle):
    center = (binary_image.shape[1] // 2, binary_image.shape[0] // 2)
    rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
    rotated_image = cv2.warpAffine(binary_image, rotation_matrix, (binary_image.shape[1], binary_image.shape[0]))

    projection = np.sum(rotated_image, axis=0)  
    projection_area = np.sum(projection > 0)  

    return projection_area

def find_min_projection_angle(binary_image):
    angles = np.arange(0, 179, 2)  
    min_area = float('inf')
    best_angle = None

    for angle in angles:
        area = calculate_projection_area(binary_image, angle)
        if area < min_area:
            min_area = area
            best_angle = angle

    return best_angle, min_area

def count_white_pixels(binary_image) -> int:
    return np.sum(binary_image == 255)

def is_lower_half_more_white(binary_image) -> bool:
    (h, w) = binary_image.shape[:2]
    mid = h // 2
    upper_half = binary_image[:mid, :]
    lower_half = binary_image[mid:, :]

    upper_white_count = count_white_pixels(upper_half)
    lower_white_count = count_white_pixels(lower_half)
    if lower_white_count > upper_white_count:
        return True
    return False

def is_left_half_more_white(binary_image) -> bool:
    (h, w) = binary_image.shape[:2]
    mid = w // 2
    left_half = binary_image[:, :mid]
    right_half = binary_image[:, mid:]

    left_white_count = count_white_pixels(left_half)
    right_white_count = count_white_pixels(right_half)
    if left_white_count > right_white_count:
        return True
    return False

def make_circular_mask(img):
    size, _ = img.shape[:2]
    center = (size // 2, size // 2)
    radius = size // 4  
    mask = np.ones_like(img, dtype=np.uint8) * 255  
    cv2.circle(mask, center, radius, (0, 0, 0), -1)  
    masked_image = cv2.bitwise_and(img, mask)
    return masked_image

def get_dir_projection(img):
    best_angle, min_area = find_min_projection_angle(img)
    masked_img = make_circular_mask(img)
    if is_left_half_more_white(masked_img): best_angle+= 180
    if is_lower_half_more_white(masked_img):
        if best_angle < 70:
            best_angle += 180
        if best_angle > 280:
            best_angle -= 180
    if not is_lower_half_more_white(masked_img) and 100 < best_angle < 260:
        best_angle -= 180
    return best_angle

### Mode Selector of Inference of Needle Hands

In [5]:
def get_dir(img) -> float:
    length, _ = img.shape
    center_x, center_y = length // 2, length // 2
    y_indices, x_indices = np.where(img == 255)
    dx = x_indices - center_x
    dy = center_y - y_indices

    ##------ Mode Selector ------##
    # angle = get_dir_mean_angle(dx, dy)
    # angle = get_dir_regression(dx, dy)
    # angle = get_dir_ratio(dx, dy)
    angle = get_dir_projection(img)
    # -------------------------------- #
    return angle
    

### Other Helper Methods

In [6]:
def clear_directory(directory_path):
    # 检查目录是否存在
    if not os.path.exists(directory_path):
        # print(f"目录 {directory_path} 不存在。")
        return

    # 遍历目录中的所有文件
    for filename in os.listdir(directory_path):
        file_path = os.path.join(directory_path, filename)
        
        # 如果是文件，删除文件
        if os.path.isfile(file_path):
            os.remove(file_path)
            # print(f"删除文件: {file_path}")
        # 如果是目录，删除目录及其内容
        elif os.path.isdir(file_path):
            shutil.rmtree(file_path)
            # print(f"删除目录及其内容: {file_path}")

# clear_directory(f'sample/marked')

In [7]:
"""
for i in range(1,7):
    # 读取图像并转换为灰度图像
    image_test = cv2.imread(f'sample/test/{i}.jpeg', cv2.IMREAD_GRAYSCALE)

    # 将图像二值化，如果已经是二值化图像可以跳过这一步
    _, binary_image = cv2.threshold(image_test, 240, 255, cv2.THRESH_BINARY)
    image_test_rslt = get_dir(binary_image)
    image_test_rslt2 = draw_txt(binary_image, f'{image_test_rslt:.0f}', channel=1)
    cv2.imwrite(f'sample/test/{time.time()}.jpg', image_test_rslt2)
"""

"\nfor i in range(1,7):\n    # 读取图像并转换为灰度图像\n    image_test = cv2.imread(f'sample/test/{i}.jpeg', cv2.IMREAD_GRAYSCALE)\n\n    # 将图像二值化，如果已经是二值化图像可以跳过这一步\n    _, binary_image = cv2.threshold(image_test, 240, 255, cv2.THRESH_BINARY)\n    image_test_rslt = get_dir(binary_image)\n    image_test_rslt2 = draw_txt(binary_image, f'{image_test_rslt:.0f}', channel=1)\n    cv2.imwrite(f'sample/test/{time.time()}.jpg', image_test_rslt2)\n"

In [8]:
def draw_txt(img, text, channel=3, font_scale=1):
    font = cv2.FONT_HERSHEY_SIMPLEX  
    # font_scale = 1 
    font_color = (0, 0, 255) if channel == 3 else 255
    thickness = 2 
    text_size, _ = cv2.getTextSize(text, font, font_scale, thickness)
    text_width, text_height = text_size
    height = img.shape[0]
    x = 10  
    y = height - 10 
    rslt = cv2.putText(img, text, (x, y), font, font_scale, font_color, thickness, lineType=cv2.LINE_AA)
    return rslt


### Class Dial and Meter

In [9]:
class Dial:
    def __init__(self, result, i) -> None:
        self.conf = result.numpy().boxes.conf[i]
        self.xyxy = result.numpy().boxes.xyxy[i]
        self.xywh = result.numpy().boxes.xywh[i]
        self.cropped = get_cropped_xywh(result.orig_img, self.xywh)
        self.serial = 5 - i
        self.bearing = self.get_bearing()
        self.real_bearing = None

    def get_bearing(self) -> float:
        square_img = make_square(self.cropped)
        binary_img = make_binary(square_img)
        circled_img = make_circled(binary_img)
        eroded_img = make_eroded(circled_img)

        direction = get_dir(eroded_img)
        # 预留操作接口
        # direction = set_direction(direaction)
        # 结束
        return direction
    
    def set_direction(orig_direction) -> float:
        pass
    
    def set_real_bearing(self, real_bearing) -> None:
        self.real_bearing = real_bearing
        return

    def offset_bearing(self, offset) -> None:
        self.real_bearing = self.bearing + offset
        return

    def read(self) -> float:
        return (self.real_bearing % 360) / 36


In [10]:
class MeterShapeError(Exception):
    def __init__(self, *args: object) -> None:
        super().__init__(*args)
        self.message = args

def is_good_meter(result) -> None:
    """
    Desides whether the result is legal or not.
    """
    # TODO: improve
    if result.numpy().boxes.conf.size != 5: raise MeterShapeError(f'Meter has {result.numpy().boxes.conf.size} dials')
    return 


In [11]:
class Meter:
    def __init__(self, result) -> None:
        # TODO 分组/判断
        try:
            is_good_meter(result)
        except MeterShapeError as e:
            print(f'{result.path} is not a good Meter: {e}')
            return
        self.result = result
        """
        Attributes:
            orig_img -> ndarray
            orig_shape -> tuple
            boxes -> Boxes
        """
        dials_l = list()
        # self.path = result.path
        for i in range(0,5):
            dial = Dial(result, i)
            dials_l.append(dial)
        sorted_dials = sorted(dials_l, key=lambda dial: dial.xywh[0])
        self.dials = {i+1: array for i, array in enumerate(sorted_dials)}
        self.horizon = self.get_horizon()
        self.init_single_reads()
        self.draw_reads()

    def init_single_reads(self):
        for dial in self.dials.values():
            dial.offset_bearing(self.horizon)
        self.single_reads = dict()
        for i, dial in self.dials.items():
            if i % 2 == 1:
                self.single_reads[i] = dial.read()
            else:
                self.single_reads[i] = 10 - dial.read()

    def draw_reads(self):
        txt = ''
        # for i in range(1,6):
        #     txt += f'{self.single_reads[i]:.1f} '
        txt += f'{self.read():05} kWh'
        print(txt) ## @@
        rslt_img = draw_txt(self.result.orig_img, txt, font_scale=5)
        _name = time.time()
        # TODO: path regulator
        cv2.imwrite(f'sample/{FOLDER_NAME}/{_name}.jpg', rslt_img)
        del rslt_img
        return
    
    def read(self) -> int:
        """Wrapper method"""
        # r = self.simple_read()
        r = self.more_robust_read()
        return r

    def simple_read(self) -> int:
        rslt = 0
        for i, read in self.single_reads.items():
            sig = 5 - i
            rslt += np.floor(read) * 10 ** sig
        # print(rslt)
        return int(rslt)
    
    def more_robust_read(self) -> int:
        rslt = 0
        for i, read in self.single_reads.items():
            sig = 5 - i
            lower = self.single_reads.get(i+1, -1)
            if lower <= 2:
                rslt += np.round(read) * 10 ** sig
            else:
                rslt += np.floor(read) * 10 ** sig
        return int(rslt)

    def get_horizon(self):
        xys = dict()
        for i in [1,5,2,4,3]:
            xys[i] = tuple(self.dials[i].xywh[0:2])
        horizon_ab = dict()
        horizon_ab = {'a': get_angle(xys[1], xys[5]),
                      'b': get_angle(xys[2], xys[4])}
        horizon = np.mean(list(horizon_ab.values()))
        return horizon
        


### Instances

In [13]:
FOLDER_NAME = 'final'
clear_directory(f'sample/{FOLDER_NAME}')
meters = []
for result in results:
    meters.append(Meter(result))
# meter1 = Meter(results[0])
# meter2 = Meter(results[1])

0.5 5.6 4.7 5.7 6.7 05457 kWh
0.3 5.7 4.5 5.4 6.7 05457 kWh
0.7 5.5 4.4 5.8 6.7 05457 kWh
0.6 5.4 4.6 5.5 6.7 05457 kWh
9.5 8.2 9.1 8.3 1.0 98981 kWh
/Users/hans/Boston University/2023C/sample/6.jpeg is not a good Meter: Meter has 10 dials
0.1 1.0 8.9 0.3 1.9 01902 kWh
