In [1]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Retrieve environment variables
training_endpoint = os.getenv("VISION_TRAINING_ENDPOINT")
training_key = os.getenv("VISION_TRAINING_KEY")
prediction_key = os.getenv("VISION_PREDICTION_KEY")
prediction_endpoint = os.getenv("VISION_PREDICTION_ENDPOINT")
prediction_resource_id = os.getenv("VISION_PREDICTION_RESOURCE_ID")

In [2]:
from azure.cognitiveservices.vision.customvision.training import CustomVisionTrainingClient
from azure.cognitiveservices.vision.customvision.prediction import CustomVisionPredictionClient
from azure.cognitiveservices.vision.customvision.training.models import ImageFileCreateBatch, ImageFileCreateEntry, Region
from msrest.authentication import ApiKeyCredentials
import os, time, uuid

In [3]:
credentials = ApiKeyCredentials(in_headers={"Training-key": training_key})
trainer = CustomVisionTrainingClient(training_endpoint, credentials)


In [4]:
credentials

<msrest.authentication.ApiKeyCredentials at 0x7f5c9c290880>

In [5]:
publish_iteration_name = "classifyModel"

credentials = ApiKeyCredentials(in_headers={"Training-key": training_key})
trainer = CustomVisionTrainingClient(training_endpoint, credentials)

# Create a new project
print ("Creating project...")
project_name = uuid.uuid4()
project = trainer.create_project(project_name)

Creating project...


In [None]:

project = trainer.create_project(project_name)

In [4]:
project = trainer.get_projects()[0]

In [5]:
project.name

'10bbf92b-8094-4a81-81e7-37a3291af02f'

In [6]:
# Make two tags in the new project
receipt_tag = trainer.create_tag(project.id, "Receipt")
non_receipt_tag = trainer.create_tag(project.id, "Non Receipt")

In [7]:
base_image_location = os.path.dirname("dataset/")

In [8]:
base_image_location

'dataset'

In [9]:
os.listdir(base_image_location)

['receipt', 'non_receipt']

In [11]:
import os

# Directory where the images are stored
directory = "dataset/receipt"

# Iterate over the file numbers
for i in range(20):
    # Construct old and new filenames
    old_filename = os.path.join(directory, f"{i}.jpg")
    new_filename = os.path.join(directory, f"receipt_{i+1}.jpg")
    
    # Rename the file
    os.rename(old_filename, new_filename)

print("Files have been renamed.")

Files have been renamed.


In [12]:
import os

# Directory where the images are stored
directory = "dataset/non_receipt"

# Supported image extensions
extensions = {'.jpg', '.jpeg', '.png'}

# Get all image files with the supported extensions
image_files = [f for f in os.listdir(directory) if os.path.splitext(f)[1].lower() in extensions]

# Sort the files to maintain a consistent order (optional)
image_files.sort()

# Rename the files
for i, file_name in enumerate(image_files, start=1):
    # Extract the file extension
    _, ext = os.path.splitext(file_name)
    # Construct the new filename
    new_filename = f"non_receipt_{i}{ext}"
    # Rename the file
    os.rename(os.path.join(directory, file_name), os.path.join(directory, new_filename))

print("Files have been renamed.")

Files have been renamed.


In [10]:
print("Adding images...")

image_list = []

# Add receipt images
for image_num in range(1, 21):  # Assuming there are 20 receipt images
    file_name = f"receipt_{image_num}.jpg"  # Adjusted for the new naming convention
    with open(os.path.join(base_image_location, "receipt", file_name), "rb") as image_contents:
        image_list.append(ImageFileCreateEntry(name=file_name, contents=image_contents.read(), tag_ids=[receipt_tag.id]))

# Add non_receipt images
for image_num in range(1, 21):  # Assuming there are 20 non_receipt images
    # The extension needs to be determined dynamically as they can vary
    for ext in ['.jpg', '.jpeg', '.png']:
        file_name = f"non_receipt_{image_num}{ext}"
        file_path = os.path.join(base_image_location, "non_receipt", file_name)
        if os.path.exists(file_path):
            with open(file_path, "rb") as image_contents:
                image_list.append(ImageFileCreateEntry(name=file_name, contents=image_contents.read(), tag_ids=[non_receipt_tag.id]))
            break  # Move to the next image number once the current one is found and processed

# Upload the images
upload_result = trainer.create_images_from_files(project.id, ImageFileCreateBatch(images=image_list))
if not upload_result.is_batch_successful:
    print("Image batch upload failed.")
    for image in upload_result.images:
        print("Image status: ", image.status)
    exit(-1)

Adding images...
Image batch upload failed.
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OKDuplicate
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  ErrorImageFormat
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  OK
Image status:  ErrorImageFormat


# Train

In [11]:
print ("Training...")
iteration = trainer.train_project(project.id)
while (iteration.status != "Completed"):
    iteration = trainer.get_iteration(project.id, iteration.id)
    print ("Training status: " + iteration.status)
    print ("Waiting 10 seconds...")
    time.sleep(10)

Training...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Waiting 10 seconds...
Training status: Training
Wa

In [12]:
# The iteration is now trained. Publish it to the project endpoint
trainer.publish_iteration(project.id, iteration.id, publish_iteration_name, prediction_resource_id)
print ("Done!")

Done!


In [26]:
# # Assuming 'trainer' is your CustomVisionTrainingClient instance
# # and you have 'project.id' and 'iteration.id' from your project and iteration

# # Unpublish the iteration
# trainer.unpublish_iteration(project.id, iteration.id)
# print("Iteration unpublished successfully.")

Iteration unpublished successfully.


# Predict

In [13]:
# Now there is a trained endpoint that can be used to make a prediction
prediction_credentials = ApiKeyCredentials(in_headers={"Prediction-key": prediction_key})
predictor = CustomVisionPredictionClient(prediction_endpoint, prediction_credentials)

In [20]:
with open("receipt-data/aeon.jpg", "rb") as image_contents:
    results = predictor.classify_image(
        project.id, publish_iteration_name, image_contents.read())
    # print(results.predictions)
    # Display the results.
    for prediction in results.predictions:
        # print(prediction)
        print("\t" + prediction.tag_name +
              ": {0:.2f}%".format(prediction.probability * 100))

	Receipt: 99.98%
	Non Receipt: 0.02%
