In [1]:
import cv2
import numpy as np
from pdf2image import convert_from_path
import easyocr
import os
import copy
import pandas as pd
import matplotlib.pyplot as plt
import os
import json

In [2]:
# Input PDFs: one annotated plan sheet and one sheet containing only the wall.
# NOTE(review): these are absolute, machine-specific Windows paths — adjust before running elsewhere.
annotation_pdf_path = "C:\\Users\\jimmy\\Desktop\\sinotech2\\data\\pdf\\SheetPile\\plan_WordAndWall2.pdf"
wall_pdf_path = "C:\\Users\\jimmy\\Desktop\\sinotech2\\data\\pdf\\SheetPile\\plan_OnlyWall2.pdf"

# Pre-processed raster copies of the two PDFs are written to, and re-read from, these paths.
annotation_img_path = "./plan_WordAndWall2.jpg"
wall_img_path = "./plan_OnlyWall2.jpg"

### PDF 解法
* Input 一張標註圖，一張鋼板樁圖 
##### 流程
1. Annotation image preprocessing
2. Arrow detection
3. Line detection
4. Arrow -> Line pairing
5. Keyword extraction and pairing with each (arrow, line) pair
6. wall image preprocessing
7. Line detection
8. Line grouping according to the y axis
9. Segment the main wall line according to the annotation detection result
10. other

#### 1. Annotation image preprocessing

In [16]:
# pdf to image: rasterise the annotation PDF at 210 dpi; only the first page is used below
annotation_images = convert_from_path(annotation_pdf_path, dpi=210)

# PIL page (RGB) -> OpenCV BGR -> grayscale
cv2_annotation_image = cv2.cvtColor(np.asarray(annotation_images[0]), cv2.COLOR_RGB2BGR)
gray_annotation_img = cv2.cvtColor(cv2_annotation_image, cv2.COLOR_BGR2GRAY)
# Threshold type 0 is cv2.THRESH_BINARY: pixels above 240 become 255, everything else 0
_, th_annotation_img = cv2.threshold (gray_annotation_img, 240, 255, 0)
GaussianBlur_annotation_img = cv2.GaussianBlur(th_annotation_img, (5, 5), 0)

# Persist the pre-processed page; this cell and later cells re-read it from annotation_img_path
cv2.imwrite(annotation_img_path, GaussianBlur_annotation_img)

# ocr: English-only EasyOCR reader
reader = easyocr.Reader(['en'])

raw_annotation_img = cv2.imread(annotation_img_path)

# Later cells read bounds[i][0] as the box corner points and bounds[i][1] as the recognised text
bounds = reader.readtext(raw_annotation_img, detail=1)

Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


In [26]:
# image preprocessing
# Goal: keep only the thick blobs (arrow heads) of the annotation sheet in
# `dilate_annotation_img`, and keep the thin strokes in `bhat_annotation_img`.
raw_annotation_img = cv2.imread(annotation_img_path)

gray_annotation_img = cv2.cvtColor(raw_annotation_img, cv2.COLOR_BGR2GRAY)

ret, th_annotation_img = cv2.threshold(gray_annotation_img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

filterSize = (2,2) # (4,4)for dpi = 300, (2,2) for dpi = 200 (6,6) for filter dpi=300
bhat_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, filterSize)

# Black-hat picks out dark details narrower than the kernel (thin line work)
bhat_annotation_img = cv2.morphologyEx(th_annotation_img, cv2.MORPH_BLACKHAT, bhat_kernel)

# Remove the thin strokes from the inverted image. cv2.subtract saturates at 0
# instead of wrapping around like uint8 `-`, and matches the wall pipeline.
invers_annotation_img = cv2.bitwise_not(th_annotation_img)
sub_annotation_img = cv2.subtract(invers_annotation_img, bhat_annotation_img)

erode_kernel = np.ones((4, 4))
erode_annotation_img = cv2.erode(sub_annotation_img, erode_kernel, iterations=1)

# turn the pixel to 0 if it is in the bounds
# Use the axis-aligned extent of all box corners, cast to int and clamped to the
# image: a negative or float corner would otherwise blank the wrong region
# (negative index counts from the end) or raise on slicing.
img_h, img_w = erode_annotation_img.shape[:2]
for bound in bounds:
    corner_xs = [int(pt[0]) for pt in bound[0]]
    corner_ys = [int(pt[1]) for pt in bound[0]]
    x1, x2 = max(min(corner_xs), 0), min(max(corner_xs), img_w)
    y1, y2 = max(min(corner_ys), 0), min(max(corner_ys), img_h)
    erode_annotation_img[y1:y2, x1:x2] = 0

dilate_kernel = np.ones((5, 5))
dilate_annotation_img = cv2.dilate(erode_annotation_img, dilate_kernel, iterations=2)

cv2.imwrite("dilate.jpg", dilate_annotation_img)

True

#### 2. Arrow detection

In [27]:
arrow_contours, _ = cv2.findContours(dilate_annotation_img, cv2.RETR_LIST , cv2.CHAIN_APPROX_NONE)

# Separate the two types of arrow by bounding-box shape:
# wider than tall -> horizontal, otherwise -> vertical (indices into arrow_contours)
hori_arrow = []
vert_arrow = []
for contour_idx, contour in enumerate(arrow_contours):
    box_x, box_y, box_w, box_h = cv2.boundingRect(contour)
    bucket = hori_arrow if box_w > box_h else vert_arrow
    bucket.append(contour_idx)

In [28]:
# draw img to check if the arrow is successfully seprated into hor and ver
# Debug overlay: outline horizontal arrows in red and vertical arrows in green
# (BGR colours). Reads the pre-processed annotation image through
# annotation_img_path so this cell follows the configured input instead of a
# duplicated file name.
img = cv2.imread(annotation_img_path)
for arrow_indices, box_color in ((hori_arrow, (0, 0, 255)), (vert_arrow, (0, 255, 0))):
    for i in arrow_indices:
        x, y, w, h = cv2.boundingRect(arrow_contours[i])
        cv2.rectangle(img, (x, y), (x + w, y + h), box_color, 1)

cv2.imwrite("arrow.jpg", img)

True

#### 3. Line detection

In [29]:
# helper function
def FLD(image,drawimage, draw=False):
    """Detect line segments in `image` with OpenCV's Fast Line Detector.

    Parameters
    ----------
    image : single-channel image handed to `detect`.
    drawimage : image the segments are drawn on; only touched when `draw` is True.
    draw : when True, draw the detected segments on `drawimage` and save the
        result as 'line_on_image.png' in the working directory.

    Returns
    -------
    Whatever `FastLineDetector.detect` returns; callers in this notebook index
    each entry as `line[0, k]` for x1, y1, x2, y2.
    NOTE(review): `detect` may return None when nothing is found — confirm and
    guard at the call sites if that can happen for your inputs.
    """
    # Create default Fast Line Detector class
    fld = cv2.ximgproc.createFastLineDetector()
    # Get line vectors from the image
    lines = fld.detect(image)

    # Only pay for drawing (and only touch `drawimage`) when the caller asked
    # for the debug image and there is something to draw.
    if draw and lines is not None:
        line_on_image = fld.drawSegments(drawimage, lines)
        cv2.imwrite('line_on_image.png', line_on_image)

    return lines

def for_hori(bl, tr, p, thr) :
    """Return True when point `p` lies inside the box `bl`..`tr` widened by `thr` along x.

    `bl` and `tr` are the two opposite box corners (smaller and larger
    coordinates). All comparisons are strict; the y range is not widened.
    """
    px, py = p[0], p[1]
    within_x = bl[0] - thr < px < tr[0] + thr
    within_y = bl[1] < py < tr[1]
    return bool(within_x and within_y)

def for_vert(bl, tr, p, thr) :
    """Return True when point `p` lies inside the box `bl`..`tr` widened by `thr` along y.

    `bl` and `tr` are the two opposite box corners (smaller and larger
    coordinates). All comparisons are strict; the x range is not widened.
    """
    px, py = p[0], p[1]
    within_x = bl[0] < px < tr[0]
    within_y = bl[1] - thr < py < tr[1] + thr
    return bool(within_x and within_y)

In [30]:
# line thinning
thinned_annotation_img = cv2.ximgproc.thinning(bhat_annotation_img)

# get the lines
annotation_img_lines = FLD(thinned_annotation_img, copy.deepcopy(raw_annotation_img), draw=False)

annotation_df = pd.DataFrame(columns = ['left_arrow', 'right_arrow'],index = range(len(annotation_img_lines)))

# Flat float view of the segments, one row [x1, y1, x2, y2] per detected line,
# so the geometry below does not have to repeat float(line[0, k]) everywhere.
line_coords = np.asarray(annotation_img_lines, dtype=float).reshape(-1, 4)

# Make every line start at its smaller coordinate: when two segments trace the
# same stroke in opposite directions (head of one near the tail of the other),
# record the one that runs "backwards" in `check` so it is ignored below.
check = []
for i in range(len(line_coords)):
    x1, y1, x2, y2 = line_coords[i]
    # line1 is "correct" when it already runs from the smaller to the larger coordinate
    line1_is_ordered = x1 < x2 or y1 < y2
    for j in range(i + 1, len(line_coords)):
        ox1, oy1, ox2, oy2 = line_coords[j]
        head_to_tail = ((x1 - ox2)**2 + (y1 - oy2)**2)**(1/2)
        tail_to_head = ((x2 - ox1)**2 + (y2 - oy1)**2)**(1/2)
        if head_to_tail < 10 and tail_to_head < 3:
            check.append(j if line1_is_ordered else i)

# separate two type of line: keep segments longer than 30 px that are not
# flagged duplicates, and classify each by its dominant direction
duplicate_indices = set(check)
hori_line = []
vert_line = []
for k, (x1, y1, x2, y2) in enumerate(line_coords):
    if k in duplicate_indices:
        continue
    dx = x1 - x2
    dy = y1 - y2
    if (dx**2 + dy**2)**(1/2) > 30:
        if dx**2 > dy**2:
            hori_line.append(k)
        else:
            vert_line.append(k)

# Drop horizontal lines whose head x is larger than their tail x. Build a new
# list: calling remove() on the list being iterated skips the element that
# follows every removal.
hori_line = [k for k in hori_line if not line_coords[k][0] > line_coords[k][2]]

In [31]:
# Sanity check: indices (into annotation_img_lines) of the horizontal lines kept by the previous cell
print(hori_line)

[248, 249, 250, 251, 252, 253, 937, 938, 939]


#### 4. Arrow -> Line pairing

In [32]:
# helper function
def _pair_lines_with_arrows(line_ids, arrow_ids, contains, left_thr, right_thr,
                            arrow_contours, annotation_img_lines, df, raw_img):
    """Record, for each line in `line_ids`, which arrows sit on its two end points.

    `contains(bl, tr, p, thr)` is the point-in-box test (`for_hori` or `for_vert`).
    The line's first end point is tested with `left_thr`; only when that fails is
    the second end point tested with `right_thr`. Matches are written to the
    'left_arrow' / 'right_arrow' columns of `df` and drawn on `raw_img`.
    """
    for i in line_ids:
        line = annotation_img_lines[i]
        p1 = (int(line[0,0]), int(line[0,1]))
        p2 = (int(line[0,2]), int(line[0,3]))
        for j in arrow_ids:
            x, y, w, h = cv2.boundingRect(arrow_contours[j])
            bl = (x, y)
            tr = (x + w, y + h)
            if contains(bl, tr, p1, left_thr):
                column, box_color = 'left_arrow', (0, 255, 0)
            elif contains(bl, tr, p2, right_thr):
                column, box_color = 'right_arrow', (255, 0, 0)
            else:
                continue
            # Label-based assignment for both orientations, so the result does
            # not depend on the positional layout of `df`.
            df.loc[i, column] = j
            cv2.line(raw_img, p1, p2, (0, 0, 255), 1)
            cv2.rectangle(raw_img, (x, y), (x + w, y + h), box_color, 1)


def arrow_line_pairing(hori_line, hori_arrow, vert_line, vert_arrow, arrow_contours, annotation_img_lines, df, raw_img, draw=False):
    """Pair detected lines with the arrow heads located at their end points.

    Parameters
    ----------
    hori_line, vert_line : indices into `annotation_img_lines` of horizontal / vertical lines.
    hori_arrow, vert_arrow : indices into `arrow_contours` of horizontal / vertical arrows.
    arrow_contours : contours whose bounding rectangles are used as arrow boxes.
    annotation_img_lines : detected segments, indexed as `line[0, k]` for x1, y1, x2, y2.
    df : DataFrame indexed by line index with 'left_arrow' and 'right_arrow'
        columns; it is updated in place.
    raw_img : image the matched lines and arrow boxes are drawn on (modified in place).
    draw : when True, save `raw_img` as "plan_pair.jpg".

    Returns
    -------
    The same `df` object, with the matched arrow indices filled in.
    """
    # Horizontal lines use a tolerance of 2 px at the first end and 3 px at the second
    _pair_lines_with_arrows(hori_line, hori_arrow, for_hori, 2, 3,
                            arrow_contours, annotation_img_lines, df, raw_img)
    # Vertical lines use 3 px at both ends
    _pair_lines_with_arrows(vert_line, vert_arrow, for_vert, 3, 3,
                            arrow_contours, annotation_img_lines, df, raw_img)
    if draw:
        cv2.imwrite("plan_pair.jpg", raw_img)

    return df


In [33]:
# Pair lines with arrows; a deep copy of the image is passed so the drawing does not touch raw_annotation_img
annotation_df = arrow_line_pairing(hori_line, hori_arrow, vert_line, vert_arrow, arrow_contours, annotation_img_lines, annotation_df, copy.deepcopy(raw_annotation_img), draw=False)
# Keep only the lines that got at least one arrow (rows where both columns are NaN are dropped)
annotation_df_all = annotation_df.dropna(axis='index', how='all')

In [34]:
# Inspect the pairing: row label = line index, values = arrow contour indices (NaN = no arrow at that end)
print(annotation_df_all)

    left_arrow right_arrow
248         11         NaN
250        NaN          10
251          9           8
252          7           6
937          5           4
938          3           2
939          1           0


#### 5. Keyword extraction and pairing with each (arrow, line) pair

In [48]:
# For every OCR text containing the key word, find the arrow-paired line whose
# midpoint is closest to the text box centre: pair_array rows are
# [index into bounds, index into annotation_img_lines].
pair_array = []
key_word = 'SHEET PILE'

for j, bound in enumerate(bounds):
  if key_word not in bound[1].upper():
    continue
  # centre of the text box from its corners 0 and 2
  middle_point = [(bound[0][0][0] + bound[0][2][0]) // 2, (bound[0][0][1] + bound[0][2][1]) // 2]
  # Start from "no candidate": a finite sentinel distance with a default line
  # index of 0 would silently pair far-away text with line 0.
  dis = float('inf')
  check = None
  for line_index in annotation_df_all.index:
    line = annotation_img_lines[line_index]
    p_mean = [ (int(line[0,0]) + int(line[0,2])) //2 , (int(line[0,1]) + int(line[0,3])) //2 ]
    squared_distance = (middle_point[0]-p_mean[0])**2 + (middle_point[1]-p_mean[1])**2
    if squared_distance < dis:
      dis = squared_distance
      check = line_index
  if check is None:
    # no arrow-paired line exists, so there is nothing to attach this text to
    continue
  pair_array.append([j,check])

# Assemble into a DataFrame: one record per (text, line) pair
wall_type = [bounds[pair[0]][1] for pair in pair_array]

pair_records = []
for text_index, line_index in pair_array:
  line = annotation_img_lines[line_index]
  left_arrow = annotation_df_all.loc[line_index,'left_arrow']
  right_arrow = annotation_df_all.loc[line_index,'right_arrow']

  # if the line has a left arrow, use the left edge of that arrow's box as head x
  if not pd.isnull(left_arrow):
    x,y,w,h = cv2.boundingRect(arrow_contours[int(left_arrow)])
    head_x = x
  else:
    head_x = min(int(line[0,0]), int(line[0,2]))

  # if the line has a right arrow, use the right edge of that arrow's box as end x
  if not pd.isnull(right_arrow):
    x,y,w,h = cv2.boundingRect(arrow_contours[int(right_arrow)])
    end_x = x + w
  else:
    end_x = max(int(line[0,0]), int(line[0,2]))

  pair_records.append({
    'head_x': head_x,
    'head_y': min(int(line[0,1]), int(line[0,3])),
    'end_x': end_x,
    'end_y': max(int(line[0,3]), int(line[0,1])),
    'ocr': bounds[text_index][1],
  })

# Build the frame once from records so every column gets its natural dtype;
# writing strings into an integer-initialised column is avoided.
pair_df = pd.DataFrame(pair_records, columns=['head_x', 'head_y', 'end_x', 'end_y', 'ocr'])

In [36]:
# Inspect the annotated spans: x/y extent of each dimension line and the OCR text paired with it
print(pair_df)

   head_x  head_y  end_x  end_y                        ocr
0    1194     722   1410    722  SP-IlI SHEET PILE (L=l6m)
1    1415     722   3062    722   SP-IIlSHEET PILE (L=13m)
2    3069     722   4716    722   SP-III SHEET PILE (L=9m)
3     586    1658   1411   1658  SP-Ill SHEET PILE (L=l6m)
4    1412    1658   3065   1658  SP-Ili SHEET PILE (L=l3m)
5    3067    1658   4718   1658   SP-Iii SHEET PILE (L=9m)


#### 6. wall image preprocessing

In [37]:
# Same preprocessing as the annotation sheet, applied to the wall-only PDF (first page, 210 dpi)
wall_images = convert_from_path(wall_pdf_path, dpi=210)
cv2_wall_image = cv2.cvtColor(np.asarray(wall_images[0]), cv2.COLOR_RGB2BGR)
gray_wall_img = cv2.cvtColor(cv2_wall_image, cv2.COLOR_BGR2GRAY)
# Threshold type 0 is cv2.THRESH_BINARY: pixels above 240 become 255, everything else 0
_, th_wall_img = cv2.threshold (gray_wall_img, 240, 255, 0)
GaussianBlur_wall_img = cv2.GaussianBlur(th_wall_img, (5, 5), 0)

# Persist the pre-processed page, then continue from the re-read file
cv2.imwrite(wall_img_path, GaussianBlur_wall_img)

raw_wall_img = cv2.imread(wall_img_path)

gray_wall_img = cv2.cvtColor(raw_wall_img, cv2.COLOR_BGR2GRAY)

ret, th_wall_img = cv2.threshold(gray_wall_img , 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

# Black-hat with a 2x2 kernel; bhat_wall_img is the image thinned in the line-detection cell
bhat_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2,2))
bhat_wall_img = cv2.morphologyEx(th_wall_img, cv2.MORPH_BLACKHAT, bhat_kernel)

invers_wall_img = cv2.bitwise_not(th_wall_img)
sub_wall_img = cv2.subtract(invers_wall_img, bhat_wall_img)

# NOTE(review): a 1x1 structuring element should leave the image unchanged, and neither
# erode_wall_img nor dilate_wall_img is referenced by the line-detection cell that follows.
erode_kernel = np.ones((1, 1))
erode_wall_img = cv2.erode(sub_wall_img, erode_kernel, iterations=2)

dilate_kernel = np.ones((6, 6))
dilate_wall_img = cv2.dilate(erode_wall_img, dilate_kernel, iterations=2)

#### 7. Line detection

In [38]:
# line thinning
# line thinning of the black-hat image before segment detection
thinned_wall_img = cv2.ximgproc.thinning(bhat_wall_img)
# get all the lines in the image; a deep copy is passed as the drawing canvas so raw_wall_img stays untouched
wall_img_lines = FLD(thinned_wall_img, copy.deepcopy(raw_wall_img), draw=False)

#### 8. Line grouping according to the y axis

In [39]:
# helper function
def group_by_y_and_connect_extremes(lines, y_threshold=100):
    """Group lines by y-coordinate and connect the leftmost and rightmost line of each group.

    Parameters
    ----------
    lines : sequence of [x1, y1, x2, y2] segments. The input is not modified.
    y_threshold : a line starts a new group when its mid-y differs from the
        mid-y of the previous line (in mid-y order) by more than this value.

    Returns
    -------
    list of (x_start, y_start, x_end, y_end) tuples, one per group, ordered by
    increasing y. A single-line group is returned as that line unchanged;
    otherwise the tuple spans from the smallest x of the leftmost line to the
    largest x of the rightmost line, using each of those lines' mid-y.
    An empty input yields an empty list.
    """
    if len(lines) == 0:
        return []

    def mid_y(line):
        return (line[1] + line[3]) / 2

    # Sort a copy by the average y-coordinate so the caller's list keeps its order
    ordered = sorted(lines, key=mid_y)

    # Group lines by y-coordinate with a difference threshold; each line is
    # compared with the last line already placed in the current group
    grouped_lines = [[ordered[0]]]
    for line in ordered[1:]:
        if abs(mid_y(line) - mid_y(grouped_lines[-1][-1])) > y_threshold:
            grouped_lines.append([line])
        else:
            grouped_lines[-1].append(line)

    # For each group, find the leftmost and rightmost line and connect them
    connected_lines = []
    for group in grouped_lines:
        if len(group) == 1:
            only = group[0]
            connected_lines.append((only[0], only[1], only[2], only[3]))
            continue
        leftmost = min(group, key=lambda seg: min(seg[0], seg[2]))
        rightmost = max(group, key=lambda seg: max(seg[0], seg[2]))
        connected_lines.append((min(leftmost[0], leftmost[2]),
                                mid_y(leftmost),
                                max(rightmost[0], rightmost[2]),
                                mid_y(rightmost)))

    return connected_lines

In [40]:
# Flatten each detected wall segment (stored as line[0] = x1, y1, x2, y2) into a plain list
lines_list = [
    [x1, y1, x2, y2]
    for x1, y1, x2, y2 in (detected[0] for detected in wall_img_lines)
]

# Merge the segments into one connected line per y-level
connected_lines_sample = group_by_y_and_connect_extremes(lines_list)

#### 9. Segment the main wall line according to the annotation detection result

In [41]:
# helper function
def segment_main_lines_with_labels(connected_lines_by_y, ocr_data):
    """Split the two main wall lines into labelled segments using the annotation spans.

    Parameters
    ----------
    connected_lines_by_y : sequence of (x_start, y_start, x_end, y_end); entry 0
        is used as the upper main line and entry 1 as the lower one, so at
        least two entries are required.
    ocr_data : DataFrame with 'head_x', 'head_y', 'end_x', 'end_y' and 'ocr'
        columns, one row per annotated span. It is not modified.

    Returns
    -------
    dict with keys 'upper' and 'lower'; each value is a list of
    (start_x, end_x, label) tuples ordered by x. A main line that has no
    annotation assigned to it gets an empty list.
    """
    upper_line = connected_lines_by_y[0]
    lower_line = connected_lines_by_y[1]

    # Extract the x bounds for the upper and lower lines
    upper_bounds = (upper_line[0], upper_line[2])
    lower_bounds = (lower_line[0], lower_line[2])

    # Determine the mid y-coordinates of upper and lower lines
    upper_mid_y = (upper_line[1] + upper_line[3]) / 2
    lower_mid_y = (lower_line[1] + lower_line[3]) / 2

    def closest_main_line(y_coord):
        """Name the main line whose mid-y is nearest to `y_coord` (ties go to 'lower')."""
        if abs(y_coord - upper_mid_y) < abs(y_coord - lower_mid_y):
            return 'upper'
        return 'lower'

    # Assign every annotation to the upper or lower line. The labels go into a
    # copy (assign) so the caller's DataFrame does not grow a 'line_group'
    # column, and the list form also works when ocr_data has no rows.
    line_groups = [
        closest_main_line((head_y + end_y) / 2)
        for head_y, end_y in zip(ocr_data['head_y'], ocr_data['end_y'])
    ]
    labelled = ocr_data.assign(line_group=line_groups)

    wanted_columns = ['head_x', 'end_x', 'ocr']
    segments_upper = labelled.loc[labelled['line_group'] == 'upper', wanted_columns]
    segments_lower = labelled.loc[labelled['line_group'] == 'lower', wanted_columns]

    def correct_segment_main_line(bounds, segments):
        """Turn annotation spans into contiguous, non-overlapping pieces of one main line."""
        if not segments:
            # nothing annotated on this line: there is no label to assign
            return []

        line_segments = []
        start, end = bounds
        sorted_segments = sorted(segments, key=lambda seg: seg['head_x'])

        # Initialize the previous end to the start of the main line
        prev_end = start

        for segment in sorted_segments:
            # Start at the greater of previous end or current head_x (no overlap)
            current_start = max(prev_end, segment['head_x'])
            current_end = segment['end_x']

            if current_end < current_start:
                # Entirely covered by earlier segments: emitting it would give
                # an inverted segment and move prev_end backwards.
                continue

            # A gap before this segment is filled with this segment's own label
            if current_start > prev_end:
                line_segments.append((prev_end, current_start, segment['ocr']))

            line_segments.append((current_start, current_end, segment['ocr']))
            prev_end = current_end

        # Remaining portion of the main line after the last segment takes the last label
        if prev_end < end:
            line_segments.append((prev_end, end, sorted_segments[-1]['ocr']))

        return line_segments

    # Segmenting the upper and lower lines
    segmented_upper = correct_segment_main_line(upper_bounds, segments_upper.to_dict('records'))
    segmented_lower = correct_segment_main_line(lower_bounds, segments_lower.to_dict('records'))

    return {'upper': segmented_upper, 'lower': segmented_lower}

In [42]:
# Cut the two connected wall lines into labelled segments using the annotation spans in pair_df
segmented_data = segment_main_lines_with_labels(connected_lines_sample, pair_df)

In [43]:
# Print the (start_x, end_x, label) segments of the upper line ...
for i in segmented_data['upper']:
    print(i)

print("-----------------------------------")

# ... and of the lower line
for i in segmented_data['lower']:
    print(i)

(590.86334, 1194, 'SP-IlI SHEET PILE (L=l6m)')
(1194, 1410, 'SP-IlI SHEET PILE (L=l6m)')
(1410, 1415, 'SP-IIlSHEET PILE (L=13m)')
(1415, 3062, 'SP-IIlSHEET PILE (L=13m)')
(3062, 3069, 'SP-III SHEET PILE (L=9m)')
(3069, 4716, 'SP-III SHEET PILE (L=9m)')
-----------------------------------
(603.79846, 1411, 'SP-Ill SHEET PILE (L=l6m)')
(1411, 1412, 'SP-Ili SHEET PILE (L=l3m)')
(1412, 3065, 'SP-Ili SHEET PILE (L=l3m)')
(3065, 3067, 'SP-Iii SHEET PILE (L=9m)')
(3067, 4718, 'SP-Iii SHEET PILE (L=9m)')


In [44]:
import re

# Canvas for the final visualisation.
# NOTE(review): this hard-codes the same file that annotation_img_path points to — confirm
# whether the annotation image (and not the wall-only image) is the intended background.
plan_wall_img= cv2.imread("plan_WordAndWall2.jpg")

# Pattern for the pile length written in the OCR text, e.g. "L=16m" -> 16
length_pattern = re.compile(r"L=(\d+)m")

def extract_length(text):
    """Return the integer length found in `text`, or None when the pattern is absent.

    Every lowercase 'l' is first replaced with '1' to undo the common OCR
    confusion between the two characters.
    """
    normalised = text.replace('l', '1')
    found = length_pattern.search(normalised)
    if found is None:
        return None
    return int(found.group(1))

df_copy = pair_df.copy()

# Update the segments data to include length values
df_copy['length'] = df_copy['ocr'].apply(extract_length)
# Labels without a parseable length give missing values; they get no colour entry
unique_lengths = df_copy['length'].dropna().unique()
# Plotting segmented lines with their labels (BGR colours)
colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (0, 255, 255), (255, 0, 255)]
# Cycle through the palette so every length gets a colour even when there are
# more distinct lengths than palette entries (zip would silently drop the rest).
color_map = {length: colors[position % len(colors)] for position, length in enumerate(unique_lengths)}

# Visualization function updated to use length-based colors
def plot_segmented_lines_by_length(segments, connected_lines_sample):
    """Draw every labelled segment on `plan_wall_img` and save the result.

    Parameters
    ----------
    segments : dict mapping a line name to a list of (start_x, end_x, label)
        tuples. Segments under 'upper' are drawn at the y of
        connected_lines_sample[0]; every other key uses connected_lines_sample[1].
    connected_lines_sample : connected main lines as (x1, y1, x2, y2) tuples.

    Each segment is coloured through the module-level `color_map`, keyed by the
    length parsed from its label, and annotated with the last space-separated
    token of the label. The module-level image `plan_wall_img` is drawn on in
    place and written to 'plan_OnlyWall2_segmented3333.jpg'.
    """
    # Colour for labels whose length cannot be parsed or has no colour entry,
    # so one bad OCR string does not abort the whole drawing with a KeyError.
    fallback_color = (128, 128, 128)

    for keys in segments.keys():
        # y is the same for every segment of a line, so look it up once per line
        if keys == 'upper':
            y_level = int(connected_lines_sample[0][1])
        else:
            y_level = int(connected_lines_sample[1][1])

        for segment in segments[keys]:
            length = segment[2].split(' ')[-1]
            color = color_map.get(extract_length(segment[2]), fallback_color)
            cv2.line(plan_wall_img, (int(segment[0]), y_level), (int(segment[1]), y_level), color, 2)
            mid_x = int((segment[0] + segment[1]) // 2)
            cv2.putText(plan_wall_img, length, (mid_x, y_level), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    cv2.imwrite('plan_OnlyWall2_segmented3333.jpg', plan_wall_img)

# Render the labelled segments onto plan_wall_img and write the output image
plot_segmented_lines_by_length(segmented_data, connected_lines_sample)

In [47]:
# Manual check using pixel x-values copied from the segment printout above:
# 1410 -> 3062 on the upper line and 1411 -> 3065 on the lower line.
# NOTE(review): 15/620 looks like a pixel-to-real-length scale factor — confirm its source and units.
dis1 = (3062 - 1410)*15/620
dis2 = (3065 - 1411)*15/620
print(dis1+dis2)

79.98387096774194
