In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# =============================================================================
# Created By:     Kai Metzger
# Created School: Franz-Oberthuer-Schule Wuerzburg
# Created Email:  metzgerkai@franz-oberthuer-schule.de
# Created Date:   Fri February 23 07:31:00 UTC 2024
# Version:        1.0
# =============================================================================
"""The Module has been build for creating a dataset with images + ground truth
   on a Raspberry Pi 4 with a standard USB camera. An image with a resolution 
   of 640px x 480px is recorded and you can control image recording plus ground 
   truth creation via pressing the following keys on the keyboard:
   - ESC:       Quit
   - SPACE:     Take picture (*without pressing BACKSPACE before) and save it
                into the folders data/ (image) and gt/ (ground truth).
                Increment counter by + 1.
   - BACKSPACE  Search folders and start image/gt file counters with highest 
                count (i. e. already taken 100 images --> images names 0 - 99, 
                next image with <100.png> and gt with 100.txt).
   - 0          Use label 0 for ground truth and write it to .txt file.
   - ...        "
   - 3          "

   You can change the script to fit your needs (i. e. create more classes,
   choose different keys, etc.)

   Camera window has to be active for user input (add new data, abort/exit, 
   etc.)
   """

# =============================================================================
# Imports
# =============================================================================
import numpy as np
import cv2
import glob
from PIL import Image

# =============================================================================
# Imports and other stuff you could remove
# =============================================================================
import warnings
warnings.filterwarnings("ignore")

# =============================================================================
# Config
# =============================================================================
# Root folder of the dataset. The main loop below stores images in
# <dataset_path>/data and ground truth files in <dataset_path>/gt, so create
# these folders (plus chpt for checkpoints, if needed) before recording.
dataset_path = "/home/pi/ki-project/home/pi/images/hergenroether"

# =============================================================================
# Camera setup
# =============================================================================
# Settings for image recording
# Open the first video device and request a 640 px x 480 px stream
cam = cv2.VideoCapture(0)
cam.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
# Window that shows the live preview and receives the key presses
cv2.namedWindow("camera")

# =============================================================================
# Variables
# =============================================================================
# Index used for the next image / ground truth file name
# (<img_counter>.png and <img_counter>.txt). Pressing BACKSPACE in the camera
# window replaces it with the number of files found in the data folder.
img_counter = 4

# =============================================================================
# Main loop to record new images, abort with CTRL+C
# =============================================================================
# Keys accepted as class label: ASCII codes of '0' ... '3' -> label 0 ... 3
# (caution: Num-Pad does not work here!)
label_keys = {ord(str(label)): label for label in range(4)}

try:
    while True:
        # Grab the next frame and show it in the preview window
        ret, frame = cam.read()
        if not ret:
            print("failed to grab frame")
            break
        cv2.imshow("camera", frame)

        # Poll the keyboard once per frame. Only the low byte is compared
        # because some GUI backends set additional high bits in the key code.
        k = cv2.waitKey(1) & 0xFF
        if k == 27:
            # ESC pressed
            print("Escape hit, closing...")
            break
        if k == 8:
            # BACKSPACE pressed: continue numbering after the files that
            # already exist in the data / gt folders
            img_counter = len(glob.glob(dataset_path + "/data/*"))
            gt_counter = len(glob.glob(dataset_path + "/gt/*"))

            # Same number of images and ground truth files?
            if img_counter != gt_counter:
                print("Images and annotated data are not equal!")

            print("Continue with :" + str(img_counter))
        elif k == 32:
            # SPACE pressed: label and store the frame that is currently shown
            img_name = dataset_path + "/data/{}.png".format(img_counter)
            gt_name = dataset_path + "/gt/{}.txt".format(img_counter)

            print("Enter class label for current image, press ...\n \
                  0 = cross \n \
                  1 = circle \n \
                  2 = square \n \
                  3 = triangle")

            # Ask until a valid label key is pressed, so that never "None"
            # ends up in a ground truth file. ESC discards the frame.
            class_label = None
            while class_label is None:
                c = cv2.waitKey(-1)
                if c == -1 or (c & 0xFF) == 27:
                    break
                class_label = label_keys.get(c & 0xFF)
                if class_label is None:
                    print("Invalid key, press 0, 1, 2 or 3 "
                          "(ESC discards the image)")
            if class_label is None:
                print("No class label entered, image discarded")
                continue
            print("Class = ", class_label)

            # Image and ground truth are only written once the label is
            # known, so both files always exist as a pair.
            # cv2.imwrite reports failure via its return value.
            if not cv2.imwrite(img_name, frame):
                print("Could not write {}".format(img_name))
                continue
            print("{} written!".format(img_name))

            with open(gt_name, "w") as text_file:
                text_file.write(str(class_label))
            print("{} written!".format(gt_name))

            # Increment by 1 for image and ground truth
            img_counter += 1

except KeyboardInterrupt:
    print("Program aborted!")
finally:
    # =========================================================================
    # Clean exit
    # =========================================================================
    cam.release()
    cv2.destroyAllWindows()

ModuleNotFoundError: No module named 'numpy'

In [6]:
from PIL import Image

# Open one raw sample image and display it via PIL's show()
sample_path = "/home/pi/ki-project/ki-project/raw_data/Sechseck/KIBild59.jpg"
im = Image.open(sample_path)
im.show()







In [9]:

# =============================================================================
# Imports
# =============================================================================
import numpy as np
import cv2
import glob
from PIL import Image
import os
from tensorflow.keras import layers, models, utils, callbacks
import matplotlib.pyplot as plt

# Fixed random seed so that NumPy-based random operations give reproducible
# results between runs with the same training setting.
# NOTE(review): only NumPy is seeded here; TensorFlow/Keras keep their own
# random state - confirm whether that needs seeding as well.
SEED = 42
np.random.seed(SEED)

# =============================================================================
# Imports and other stuff you could remove
# =============================================================================
import warnings
warnings.filterwarnings("ignore")

# =============================================================================
# Declare variables
# =============================================================================
# Image size in pixels used for the dataset array, and pixels per image
img_size_x = 28
img_size_y = 28
img_dim = img_size_x * img_size_y
# Folders with the raw images and their labels (currently the same folder)
img_dir = '/home/pi/ki-project/ki-project/raw_data/Sechseck'
gt_dir = '/home/pi/ki-project/ki-project/raw_data/Sechseck'
# NOTE(review): presumably the location for model checkpoints - it is not
# used anywhere in this cell, confirm.
checkpoint_filepath = '/home/pi/ki-project/home/pre_made_datasets/dataset2/Checkpoint'

# Destination file paths for image preparation
# NOTE(review): presumably one dataset for the ball count (c_nr) and one for
# the shape - confirm.
cnt_path = '/home/pi/ki-project/ki-project/c_nr'
shape_path = '/home/pi/ki-project/ki-project/shape'
# Sub-folder names for ground truth, images and checkpoints.
# NOTE(review): `im` is also the name of the PIL image opened in an earlier
# cell; it is overwritten with a string here.
gt = '/gt'
im = '/data'
chpt = '/chpt'

# =============================================================================
# Get number of data (image/label) 
# =============================================================================
# Number of samples: entries in the image folder minus one
anz_data = len(os.listdir(img_dir)) - 1
# Pre-allocated containers for the images and their integer labels
dataset = np.zeros((anz_data, img_size_x, img_size_y), dtype=float)
ground_truth = np.zeros(anz_data, dtype=int)
print("dataset size:", anz_data)

# =============================================================================
# Main loop to record new images, abort with CTRL+C
# =============================================================================
# NOTE(review): the original cell could not run (mis-indented `elif`, calls of
# .read()/.imshow()/.waitKey() on the string `im`, undefined `dest_path`, both
# labels written to the same file). This version assumes that frames come from
# the USB camera, as in the recording cell, and that every sample is stored
# once below `shape_path` (shape label) and once below `cnt_path` (ball
# count) - confirm that this is the intended layout.

# Key codes (as returned by cv2.waitKey) for the two labels of every image
shape_keys = {ord("a"): 0, ord("s"): 1, ord("d"): 2, ord("f"): 3}
count_keys = {ord(str(digit)): digit for digit in range(10)}


def _count_samples(root):
    """Return (number of images, number of ground truth files) below root."""
    return len(glob.glob(root + im + "/*")), len(glob.glob(root + gt + "/*"))


def _ask_label(key_map, prompt):
    """Print prompt and wait until a key listed in key_map is pressed.

    Returns the label that belongs to the pressed key, or None if ESC was
    pressed or cv2.waitKey reported that no key was read (-1).
    """
    print(prompt)
    while True:
        key = cv2.waitKey(-1)
        if key == -1 or (key & 0xFF) == 27:
            return None
        label = key_map.get(key & 0xFF)
        if label is not None:
            return label
        print("Invalid key, please try again (ESC discards the image)")


def _save_sample(root, index, frame, label):
    """Write frame as <index>.png and label as <index>.txt below root.

    Returns True if both files were written, False if the image could not
    be saved (in that case no ground truth file is created).
    """
    os.makedirs(root + im, exist_ok=True)
    os.makedirs(root + gt, exist_ok=True)
    img_name = root + im + "/{}.png".format(index)
    gt_name = root + gt + "/{}.txt".format(index)

    # cv2.imwrite reports failure via its return value
    if not cv2.imwrite(img_name, frame):
        print("Could not write {}".format(img_name))
        return False
    print("{} written!".format(img_name))

    with open(gt_name, "w") as text_file:
        text_file.write(str(label))
    print("{} written!".format(gt_name))
    return True


cam = cv2.VideoCapture(0)
cv2.namedWindow("bild")
# Continue numbering after the samples that already exist
img_counter = _count_samples(shape_path)[0]

try:
    while True:
        # Grab the next frame and show it in the preview window
        ret, frame = cam.read()
        if not ret:
            print("failed to grab frame")
            break
        cv2.imshow("bild", frame)

        # Only the low byte of the key code is compared because some GUI
        # backends set additional high bits.
        k = cv2.waitKey(1) & 0xFF
        if k == 27:
            # ESC pressed
            print("Escape hit, closing...")
            break

        if k == 8:
            # BACKSPACE pressed
            # Search folder for already recorded images & ground truth data
            img_counter, gt_counter = _count_samples(shape_path)

            # Same number of images and ground truth files?
            if img_counter != gt_counter:
                print("Images and annotated data are not equal!")

            print("Continue with :" + str(img_counter))
        elif k == 32:
            # SPACE pressed: ask for both labels (two separate key presses)
            class_label_shape = _ask_label(
                shape_keys,
                "Enter shape of current image, press ...\n"
                "  a = circle\n"
                "  d = square\n"
                "  f = hexagon\n"
                "  s = octagon")
            if class_label_shape is None:
                print("No shape entered, image discarded")
                continue

            class_label_no = _ask_label(
                count_keys,
                "Enter number of balls for current image, press 0 ... 9")
            if class_label_no is None:
                print("No number of balls entered, image discarded")
                continue

            # Each label goes into the ground truth folder of its own dataset
            # so that the second label no longer overwrites the first one.
            saved_shape = _save_sample(shape_path, img_counter, frame,
                                       class_label_shape)
            saved_count = _save_sample(cnt_path, img_counter, frame,
                                       class_label_no)

            # Increment by 1 for image and ground truth
            if saved_shape and saved_count:
                img_counter += 1

except KeyboardInterrupt:
    print("Program aborted!")
finally:
    # Clean exit: free the camera and close the preview window
    cam.release()
    cv2.destroyAllWindows()


dataset size: 53


In [None]:
# ===========================================
# GPT Train
# ===========================================

import tensorflow as tf
from tensorflow.keras import layers, models

def create_form_cnn():
    """Build and compile the CNN that classifies the form (shape) of an image.

    The network consists of three Conv2D/MaxPooling2D stages (32, 64 and 128
    filters), a dense layer with 128 units and a softmax output with one unit
    per entry of ``class_names_form``. Input images are expected as
    ``(IMG_SIZE, IMG_SIZE, 3)``.

    Returns:
        The compiled model (adam optimizer, categorical cross-entropy loss,
        accuracy metric).
    """
    input_shape = (IMG_SIZE, IMG_SIZE, 3)
    n_forms = len(class_names_form)

    # Convolutional feature extractor
    stack = [
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
        layers.MaxPooling2D((2, 2)),
    ]
    for n_filters in (64, 128):
        stack.append(layers.Conv2D(n_filters, (3, 3), activation='relu'))
        stack.append(layers.MaxPooling2D((2, 2)))

    # Classifier head: one output unit per form
    stack += [
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dense(n_forms, activation='softmax'),
    ]

    classifier = models.Sequential(stack)
    classifier.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return classifier

# Create the model
model_form = create_form_cnn()

# Start training (10 epochs, 20 % of the samples held back for validation)
# NOTE(review): x_train_form / y_train_form are not defined in the visible
# part of the notebook; the categorical cross-entropy loss presumably needs
# one-hot encoded labels - confirm.
model_form.fit(x_train_form, y_train_form, epochs=10, validation_split=0.2)


In [None]:
# ================================
# GPT Kugelanzahl erkennung
# ================================

def create_ball_count_cnn():
    """Build and compile the CNN that estimates the number of balls.

    Same convolutional structure as the form classifier (three
    Conv2D/MaxPooling2D stages with 32, 64 and 128 filters, followed by a
    dense layer with 128 units), but with a single linear output unit so the
    ball count is predicted as a regression value. Input images are expected
    as ``(IMG_SIZE, IMG_SIZE, 3)``.

    Returns:
        The compiled model (adam optimizer, MSE loss, MAE metric).
    """
    def conv_stage(n_filters, **conv_kwargs):
        # One convolution followed by 2x2 max pooling
        return [
            layers.Conv2D(n_filters, (3, 3), activation='relu', **conv_kwargs),
            layers.MaxPooling2D((2, 2)),
        ]

    stack = conv_stage(32, input_shape=(IMG_SIZE, IMG_SIZE, 3))
    stack.extend(conv_stage(64))
    stack.extend(conv_stage(128))
    stack.append(layers.Flatten())
    stack.append(layers.Dense(128, activation='relu'))
    # Regression output for the number of balls
    stack.append(layers.Dense(1, activation='linear'))

    regressor = models.Sequential(stack)
    # MSE because the ball count is treated as a regression target
    regressor.compile(optimizer='adam', loss='mse', metrics=['mae'])
    return regressor

# Create the model
model_balls = create_ball_count_cnn()

# Start training (10 epochs, 20 % of the samples held back for validation)
# NOTE(review): x_train_balls / y_train_balls are not defined in the visible
# part of the notebook - confirm where they are created.
model_balls.fit(x_train_balls, y_train_balls, epochs=10, validation_split=0.2)


In [None]:
# ========================================
# GPT Optimieren
# ========================================

def convert_to_tflite(model, filename):
    """Convert a Keras model to TensorFlow Lite and store it on disk.

    Args:
        model: Keras model handed to ``TFLiteConverter.from_keras_model``.
        filename: Path of the file that receives the converted model; an
            existing file is overwritten.
    """
    flatbuffer = tf.lite.TFLiteConverter.from_keras_model(model).convert()
    with open(filename, "wb") as out_file:
        out_file.write(flatbuffer)

# Export both trained models as TFLite files into the current working
# directory; the inference cells load them under these file names.
convert_to_tflite(model_form, "form_model.tflite")
convert_to_tflite(model_balls, "balls_model.tflite")


In [None]:
# ==================================
# GPT Echtzeit-Erkennung
# ==================================

import cv2
import numpy as np
import tensorflow.lite as tflite

def _load_interpreter(model_path):
    """Create a TFLite interpreter for model_path with its tensors allocated."""
    interpreter = tflite.Interpreter(model_path=model_path)
    interpreter.allocate_tensors()
    return interpreter


# Load the models
interpreter_form = _load_interpreter("form_model.tflite")
interpreter_balls = _load_interpreter("balls_model.tflite")

# Start the camera
cap = cv2.VideoCapture(0)

# Tensor indices and the expected input size are fixed per model, so they are
# looked up once instead of on every frame.
form_input = interpreter_form.get_input_details()[0]
form_output_index = interpreter_form.get_output_details()[0]['index']
balls_input_index = interpreter_balls.get_input_details()[0]['index']
balls_output_index = interpreter_balls.get_output_details()[0]['index']
# The models were built with input_shape=(IMG_SIZE, IMG_SIZE, 3), i.e. the
# input tensor is (batch, height, width, channels). Reading the size from the
# model keeps this cell independent of a separately defined IMG_SIZE.
_, input_height, input_width, _ = (int(dim) for dim in form_input['shape'])
form_labels = list(class_names_form.keys())

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Scale to the model input size (cv2.resize takes (width, height)),
        # normalise to 0..1 and add the batch dimension
        frame_resized = cv2.resize(frame, (input_width, input_height)) / 255.0
        input_data = np.expand_dims(frame_resized, axis=0).astype(np.float32)

        # Recognise the form
        interpreter_form.set_tensor(form_input['index'], input_data)
        interpreter_form.invoke()
        form_output = interpreter_form.get_tensor(form_output_index)
        form_class = int(np.argmax(form_output))

        # Recognise the number of balls
        interpreter_balls.set_tensor(balls_input_index, input_data)
        interpreter_balls.invoke()
        ball_output = interpreter_balls.get_tensor(balls_output_index)
        # The regression output is a float in a (1, 1) array: take the scalar
        # explicitly, round to the nearest count and never report less than 0
        ball_count = max(0, int(round(float(np.ravel(ball_output)[0]))))

        print(f"Erkannte Form: {form_labels[form_class]}")
        print(f"Kugelanzahl: {ball_count}")

        cv2.imshow("Frame", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Free the camera and close the window even if an error occurred
    cap.release()
    cv2.destroyAllWindows()


In [None]:
# =================================
# GPT ergebnisse an SPS senden
# =================================

import cv2
import numpy as np
import tensorflow.lite as tflite
from pymodbus.client import ModbusTcpClient

# PLC (SPS) configuration: address of the Modbus TCP server
SPS_IP = "192.168.1.100"
SPS_PORT = 502
client = ModbusTcpClient(SPS_IP, port=SPS_PORT)

# Load the models
interpreter_form = tflite.Interpreter(model_path="form_model.tflite")
interpreter_form.allocate_tensors()

interpreter_balls = tflite.Interpreter(model_path="balls_model.tflite")
interpreter_balls.allocate_tensors()

# Start the camera
cap = cv2.VideoCapture(0)

# Form mapping: class index predicted by the form model -> form name
# NOTE(review): the labelling cell above assigns a = circle -> 0,
# s = octagon -> 1, d = square -> 2, f = hexagon -> 3. This mapping swaps
# circle/octagon and has no entry for class 3 - confirm against the labels
# the model was trained with.
form_mapping = {0: "octagon", 1: "circle", 2: "square"}

def send_to_sps(form, ball_count):
    """Send the recognised values to the PLC (SPS) via Modbus TCP.

    Register 0 receives the numeric code of the form (key of
    ``form_mapping``), register 1 the number of balls.

    Args:
        form: Name of the recognised form; must be a value of
            ``form_mapping``. Unknown names are reported and not sent.
        ball_count: Number of balls. It is limited to 0 ... 65535 before it
            is written, because a negative or larger value does not fit an
            unsigned 16 bit register.

    Returns:
        None. Problems (unknown form, failed connection) are printed.
    """
    # Reverse lookup name -> code. A name without code (e.g. "unknown") is
    # skipped before a connection is opened instead of raising ValueError.
    form_value = next(
        (code for code, name in form_mapping.items() if name == form), None)
    if form_value is None:
        print(f"⚠️ Unbekannte Form '{form}' – nichts gesendet.")
        return

    ball_value = min(max(int(ball_count), 0), 0xFFFF)

    if not client.connect():
        print("❌ Verbindung zur SPS fehlgeschlagen!")
        return

    try:
        client.write_register(0, form_value)   # Register 0 = form
        client.write_register(1, ball_value)   # Register 1 = number of balls
        print(f"✅ Gesendet: Form = {form} ({form_value}), Kugeln = {ball_value}")
    finally:
        # Close the connection even if a write failed
        client.close()

# Tensor indices and the expected input size are fixed per model, so they are
# looked up once instead of on every frame.
form_input = interpreter_form.get_input_details()[0]
form_output_index = interpreter_form.get_output_details()[0]['index']
balls_input_index = interpreter_balls.get_input_details()[0]['index']
balls_output_index = interpreter_balls.get_output_details()[0]['index']
# Input tensor is (batch, height, width, channels); reading the size from the
# model replaces the hard-coded 128 x 128, which only works if the models
# were trained with exactly that size.
_, input_height, input_width, _ = (int(dim) for dim in form_input['shape'])

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Scale to the model input size (cv2.resize takes (width, height)),
        # normalise to 0..1 and add the batch dimension
        frame_resized = cv2.resize(frame, (input_width, input_height)) / 255.0
        input_data = np.expand_dims(frame_resized, axis=0).astype(np.float32)

        # Recognise the form
        interpreter_form.set_tensor(form_input['index'], input_data)
        interpreter_form.invoke()
        form_output = interpreter_form.get_tensor(form_output_index)
        form_class = int(np.argmax(form_output))
        form_name = form_mapping.get(form_class, "unknown")

        # Recognise the number of balls. The regression output is a float in
        # a (1, 1) array: take the scalar explicitly, round to the nearest
        # count and keep it inside the range of a 16 bit register.
        interpreter_balls.set_tensor(balls_input_index, input_data)
        interpreter_balls.invoke()
        ball_output = interpreter_balls.get_tensor(balls_output_index)
        ball_count = min(max(int(round(float(np.ravel(ball_output)[0]))), 0), 0xFFFF)

        print(f"📸 Erkannte Form: {form_name}, Kugeln: {ball_count}")

        # Send the results to the PLC; a class without entry in form_mapping
        # has no register code and is skipped
        if form_class in form_mapping:
            send_to_sps(form_name, ball_count)
        else:
            print(f"⚠️ Unbekannte Formklasse {form_class} – nichts gesendet.")

        cv2.imshow("Frame", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Free the camera and close the window even if an error occurred
    cap.release()
    cv2.destroyAllWindows()
