# Image annotation script

This notebook serves as a prototype for a script that annotates all the images of the Terzani collection.

## Installing the client library

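If the Google Cloud Vision library is not already installed, install it. The code in this notebook uses the `types` and `enums` modules, which were removed in version 2.0 of the library, so a pre-2.0 release is needed to run it unchanged.

In a pip-based Python environment, use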

```shell
pip install --upgrade google-cloud-vision
```

In a conda environment, use

```shell
conda install -c conda-forge google-cloud-vision
```

## Installing other libraries

Install `python-dotenv` to load the environment variables.

In a pip-based Python environment, use

```shell
pip install python-dotenv
```

In a conda environment, use

```shell
conda install -c conda-forge python-dotenv
```

## Import the libraries

In [37]:
# Import the standard libraries
import os, io, pickle, random, json
# Import the Vision API libraries
from google.cloud import vision
from google.cloud.vision import types
# Import dotenv to load environment variables
from dotenv import load_dotenv
# Import urllib to read images
import urllib.request as ur
# Import pymongo to insert data into MongoDB
import pymongo
# Import tqdm to show a progress bar
from tqdm import tqdm
# Import NLTK and string utilities for text cleaning
from nltk.tokenize import word_tokenize
from nltk.stem.porter import PorterStemmer
import nltk
import string
nltk.download('punkt')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\Maxime\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

## Setup the service account credentials to use the API

In [38]:
load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = GOOGLE_APPLICATION_CREDENTIALS

MANGO_CLIENT_URI = os.getenv('MONGO_URI')
os.environ['MANGO_CLIENT_URI'] = MANGO_CLIENT_URI
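
`os.getenv` returns `None` for a missing variable, and assigning `None` into `os.environ` raises a `TypeError`, so it can help to fail early with a clearer message. A minimal sketch, assuming a hypothetical helper named `require_env` that is not part of the original notebook:

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise a clear error.

    `require_env` is a hypothetical helper, not part of the original notebook.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name!r} is not set; check your .env file")
    return value
```

It could stand in for the bare `os.getenv` calls, for example `MANGO_CLIENT_URI = require_env('MONGO_URI')`.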

## Creation of Client to access the API

In [15]:
client = vision.ImageAnnotatorClient()

## Image selection

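As this is a prototyping script, we annotate only a small random sample of images rather than the whole collection. The original plan was to draw 10 images each from the color and monochrome photos (using the already created pickle files); the cell below currently draws `nu_of_images_per_type` images (20) from the combined pickle file instead.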

Generic class to store an image and its IIIF representation

In [16]:
class Terzani_Photo(object):
    def __init__(self, iiif, country):
        self.iiif = iiif
        self.country = country
        
    def get_photo_link(self):
        return self.iiif["images"][0]["resource"]["@id"]
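
To illustrate what `get_photo_link` expects, the sketch below builds a photo from a trimmed-down IIIF canvas. The class is repeated so the example runs on its own, with the country stored as `country` (the attribute the annotation loop reads). The canvas keeps only the keys the notebook uses, and the `country` value is a placeholder.

```python
class Terzani_Photo(object):
    """Copy of the notebook's class, storing the country as `country`."""

    def __init__(self, iiif, country):
        self.iiif = iiif
        self.country = country

    def get_photo_link(self):
        return self.iiif["images"][0]["resource"]["@id"]


# Trimmed-down canvas; only the keys read by the notebook are kept.
sample_canvas = {
    "label": "F26_148_recto",
    "width": 3824,
    "height": 2911,
    "images": [
        {"resource": {"@id": "http://dl.cini.it/files/original/f55c28dfc6bbd6fb03d8569a50397b54.jpg"}}
    ],
}

photo = Terzani_Photo(sample_canvas, country=None)  # country is a placeholder value
print(photo.get_photo_link())
```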

In [27]:
nu_of_images_per_type = 20

'''
# loading the color photos
color_photos = pickle.load(open("terzani_recto_iiif_color.pickle", "rb" ))
# randomly selecting 10 images
color_photos = random.sample(color_photos, nu_of_images_per_type)

# loading the monochrome photos
bw_photos = pickle.load(open("terzani_recto_iiif_color.pickle", "rb" ))
# randomly selecting 10 images
bw_photos = random.sample(bw_photos, nu_of_images_per_type)

all_photos = color_photos + bw_photos

'''

all_photos = pickle.load(open("terzani_recto_iiif.pickle", "rb"))
all_photos = random.sample(all_photos, nu_of_images_per_type)

## Function to clean text

In [28]:
def clean_text(text: str, lower: bool=True, rmv_punc: bool = True, stem: bool = True, norm: bool = True):
    """
    This function accepts a string and performs preprocessing steps on it. 
    
    :param text (str): The string or text on which the preprocessing has to be performed.
    :param lower (bool): Default=True, indicates if the text has to be made into lower case.
    :param rmv_punc (bool): Default=True, indicates if the punctuation should be removed in the text.
    :param stem (bool): Default=True, indicates if the stemming should be performed on the words in the text.
    :param norm (bool): Default=True, indicates if the words in the text have to be normalised. Currently unused in the function body.
    :return cleaned_text (list): The unique cleaned tokens, returned as a list (order is not preserved).
    """
    
    # split into words
    tokens = word_tokenize(text)
    if lower:
        # convert to lower case
        tokens = [w.lower() for w in tokens]
    if rmv_punc:
        # remove punctuation from each word
        table = str.maketrans('', '', string.punctuation)
        tokens = [w.translate(table) for w in tokens if w.translate(table) != '']
    if stem:
        # stemming of words
        porter = PorterStemmer()
        tokens = [porter.stem(word) for word in tokens]
    cleaned_text = list(set(tokens))
    return cleaned_text
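
To make the effect of the cleaning steps concrete, here is a dependency-free approximation of the lower-casing, punctuation-removal and de-duplication steps. It deliberately swaps `word_tokenize` for `str.split` and skips stemming, so its output will differ from `clean_text` on real input; `simple_clean` is a hypothetical name used only for this sketch.

```python
import string


def simple_clean(text: str) -> list:
    """Lower-case, strip punctuation and de-duplicate whitespace-separated tokens."""
    table = str.maketrans('', '', string.punctuation)
    tokens = [w.lower().translate(table) for w in text.split()]
    # Drop tokens that were pure punctuation, then de-duplicate and sort.
    return sorted({t for t in tokens if t})


print(simple_clean("Black-and-white photograph, Stock photograph!"))
# ['blackandwhite', 'photograph', 'stock']
```

With stemming enabled, as in `clean_text`, tokens are further reduced to their stems, which is why keys such as `blackandwhit` appear in the notebook's output.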

## Calling the Vision API

`tagged_images` will have the following structure:
```json
{'tag1': [label_1],
 'tag2': [label_2, label_4],
 'tag3': [label_3],
 'tag4': [label_3]
}
```
 
`annotated_images` will have the following structure:
```json
{'label_1':
    {
    'iiif': IIIF ANNOTATION,
    'obj_boxes':
    {
    frozenset({'tag1'}): [[x1, y1, w1, h1]],
    frozenset({'tag2'}): [[x4, y4, w4, h4], [x2, y2, w2, h2], [x3, y3, w3, h3]],
    frozenset({'tag3', 'tag4'}): [[x5, y5, w5, h5], [x6, y6, w6, h6], [x7, y7, w7, h7]]
    },
    'landmark_info':
    {
    'landmark1_name':{"latitude":la1, "longitude":lt1},
    'landmark2_name':{"latitude":la2, "longitude":lt2}
    },
    'country': None
    },
'label_2':
    {
    'iiif': IIIF ANNOTATION,
    'obj_boxes':
    {
    frozenset({'tag4'}): [[x1, y1, w1, h1]],
    frozenset({'tag5'}): [[x4, y4, w4, h4],],
    frozenset({'tag6'}): [[x5, y5, w5, h5], [x6, y6, w6, h6]]
    },
    'landmark_info':
    {
    'landmark3_name':{"latitude":la3, "longitude":lt3},
    'landmark4_name':{"latitude":la4, "longitude":lt4}
    },
    'country': None
    }
}
```
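
`tagged_images` is effectively an inverted index from tag to image labels. The update pattern used to fill it could be factored into a small helper; a sketch, where `add_tags` is a hypothetical name and `defaultdict` stands in for the explicit "key not in dictionary" checks:

```python
from collections import defaultdict


def add_tags(index, tags, img_lbl):
    """Record that `img_lbl` carries each tag in `tags`, without duplicates."""
    for tag in tags:
        if img_lbl not in index[tag]:
            index[tag].append(img_lbl)


tagged = defaultdict(list)
add_tags(tagged, ["room", "stock"], "F26_148_recto")
add_tags(tagged, ["stock"], "T43_7_recto")
add_tags(tagged, ["stock"], "T43_7_recto")  # repeated call adds nothing
print(dict(tagged))
# {'room': ['F26_148_recto'], 'stock': ['F26_148_recto', 'T43_7_recto']}
```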

In [29]:
tagged_images = dict() # The keys are the tags, entities and objects found in the annotations; the values are lists of image labels.
annotated_images = dict() # The keys are the image labels; the values hold the IIIF annotation, the country, landmark coordinates (if any) and the object boxes.
failed_images = dict() # Stores information about images that the Google API failed to annotate.

for img in tqdm(all_photos):

    # process the image only if it is not already in the annotated or failed dictionaries
    if img.iiif["label"] not in annotated_images and img.iiif["label"] not in failed_images:

        # get the image label
        img_lbl = img.iiif["label"]
        img_country = img.country
                
        # reading the image
        image_data = ur.urlopen(img.get_photo_link()).read()
        image = types.Image(content=image_data)
        
        # call the Google Vision API to get the annotations of various types
        response = client.annotate_image({
            'image': image,
            'features': [{'type': vision.enums.Feature.Type.LANDMARK_DETECTION}, 
                         {'type': vision.enums.Feature.Type.LOGO_DETECTION},
                         {'type': vision.enums.Feature.Type.LABEL_DETECTION},
                         {'type': vision.enums.Feature.Type.TEXT_DETECTION},
                         {'type': vision.enums.Feature.Type.OBJECT_LOCALIZATION},
                         {'type': vision.enums.Feature.Type.WEB_DETECTION}],})
        
        # check if there is any error returned by the api
        if response.error.code != 0:
            failed_images[img_lbl] = {}
            failed_images[img_lbl]["error"] = [response.error.code, response.error.message]
        else:
            # if the API call is successful
            annotated_images[img_lbl] = {}
        
            # store the iiif description
            annotated_images[img_lbl]["iiif"] = img.iiif

            # get the list of labels
            labels = list()
            for lbl in response.label_annotations:
                labels.extend(clean_text(lbl.description))
            labels = list(set(labels))
            
            # Add the label and image label to the dictionary to perform search
            for label in labels:
                if label not in tagged_images:
                    tagged_images[label] = []
                if img_lbl not in tagged_images[label]:
                    tagged_images[label].append(img_lbl)
            
            # get the list of web entities
            webent = list()
            for weben in response.web_detection.web_entities:
                webent.extend(clean_text(weben.description))
            webent = list(set(webent))
            
            # Add the web entity and image label to the dictionary to perform search
            for web_entity in webent:
                if web_entity not in tagged_images:
                    tagged_images[web_entity] = []
                if img_lbl not in tagged_images[web_entity]:
                    tagged_images[web_entity].append(img_lbl)
            

            obj_boxes = {} # this dictionary stores the annotations along with their bounding boxes.
            # The key is the name identifying the annotation and the value is a list of lists, each containing the top-left x coordinate,
            # top-left y coordinate, width and height of a bounding box.
            # It is a list of lists so that several boxes can be stored for the same tag.

            # storing the landmarks
            landmark_info = dict() # this dictionary will store the information of landmarks which are name, latitude, longitude.
            for lndmk in response.landmark_annotations:
                
                # if any landmarks are identified, we store them in a separate field for easy access.
                landmark_name = lndmk.description.lower()
                landmark_info[landmark_name] = {"latitude":lndmk.locations[0].lat_lng.latitude, "longitude":lndmk.locations[0].lat_lng.longitude}
                
                
                lndmks = clean_text(lndmk.description)
                # we add the landmarks and image label to the dictionary to perform search
                for land_mark in lndmks:
                    if land_mark not in tagged_images:
                        tagged_images[land_mark] = []
                    if img_lbl not in tagged_images[land_mark]:
                        tagged_images[land_mark].append(img_lbl)
                
                # storing the landmarks with bounding boxes 
                lndmk_desc = frozenset(lndmks)
                if lndmk_desc not in obj_boxes:
                    obj_boxes[lndmk_desc] = list()
                ulx, uly, box_width, box_height = None, None, None, None
                ulx, uly = lndmk.bounding_poly.vertices[0].x, lndmk.bounding_poly.vertices[0].y
                box_width = abs(lndmk.bounding_poly.vertices[1].x - lndmk.bounding_poly.vertices[0].x)
                box_height = abs(lndmk.bounding_poly.vertices[3].y - lndmk.bounding_poly.vertices[0].y)
                if all(v is not None for v in (ulx, uly, box_width, box_height)):
                    vert = [ulx, uly, box_width, box_height] 
                    obj_boxes[lndmk_desc].append(vert)    

            for lgo in response.logo_annotations:
                
                logos = clean_text(lgo.description)
                # we add the logo names and image label to the dictionary to perform search
                for lgo_name in logos:
                    if lgo_name not in tagged_images:
                        tagged_images[lgo_name] = []
                    if img_lbl not in tagged_images[lgo_name]:
                        tagged_images[lgo_name].append(img_lbl)
                        
                lgo_desc = frozenset(logos)
                if lgo_desc not in obj_boxes:
                    obj_boxes[lgo_desc] = list()
                ulx, uly, box_width, box_height = None, None, None, None
                ulx, uly = lgo.bounding_poly.vertices[0].x, lgo.bounding_poly.vertices[0].y
                box_width = abs(lgo.bounding_poly.vertices[1].x - lgo.bounding_poly.vertices[0].x)
                box_height = abs(lgo.bounding_poly.vertices[3].y - lgo.bounding_poly.vertices[0].y)
                if all(v is not None for v in (ulx, uly, box_width, box_height)):
                    vert = [ulx, uly, box_width, box_height]
                    obj_boxes[lgo_desc].append(vert)

            if len(response.localized_object_annotations) > 0:
                img_width, img_height = img.iiif["width"], img.iiif["height"]
            for lobj in response.localized_object_annotations:
                
                objects = clean_text(lobj.name)  
                # we add the object names and image label to the dictionary to perform search
                for obj_name in objects:
                    if obj_name not in tagged_images:
                        tagged_images[obj_name] = []
                    if img_lbl not in tagged_images[obj_name]:
                        tagged_images[obj_name].append(img_lbl)
                
                lobj_name = frozenset(objects)
                if lobj_name not in obj_boxes:
                    obj_boxes[lobj_name] = list()
                ulx, uly, box_width, box_height = None, None, None, None
                ulx, uly = lobj.bounding_poly.normalized_vertices[0].x * img_width, lobj.bounding_poly.normalized_vertices[0].y * img_height
                box_width = (lobj.bounding_poly.normalized_vertices[1].x - lobj.bounding_poly.normalized_vertices[0].x) * img_width
                box_height = (lobj.bounding_poly.normalized_vertices[3].y - lobj.bounding_poly.normalized_vertices[0].y) * img_height
                if all(v is not None for v in (ulx, uly, box_width, box_height)):
                    vert = [ulx, uly, box_width, box_height]
                    obj_boxes[lobj_name].append(vert)

            for txt in response.text_annotations:
                modified_text = txt.description.replace(".", "_").lower()
                # we add the text and image label to the dictionary to perform search
                if modified_text not in tagged_images:
                    tagged_images[modified_text] = []
                if img_lbl not in tagged_images[modified_text]:
                    tagged_images[modified_text].append(img_lbl)
                
                # the text identified on the images is not cleaned, to preserve the original information.
                if modified_text not in obj_boxes:
                    obj_boxes[modified_text] = list()
                ulx, uly, box_width, box_height = None, None, None, None
                ulx, uly = txt.bounding_poly.vertices[0].x, txt.bounding_poly.vertices[0].y
                box_width = abs(txt.bounding_poly.vertices[1].x - txt.bounding_poly.vertices[0].x)
                box_height = abs(txt.bounding_poly.vertices[3].y - txt.bounding_poly.vertices[0].y)
                if all(v is not None for v in (ulx, uly, box_width, box_height)):
                    vert = [ulx, uly, box_width, box_height]
                    obj_boxes[modified_text].append(vert)

            # store the generated object boxes into the dictionary.
            annotated_images[img_lbl]["obj_boxes"] = obj_boxes
            
            # store the generated landmark information into the dictionary.
            annotated_images[img_lbl]["landmark_info"] = landmark_info
            
            # store the country of the image into the dictionary.
            annotated_images[img_lbl]["country"] = img_country

100%|██████████████████████████████████████████████████████████████████████████████████| 20/20 [01:36<00:00,  4.81s/it]
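
The loop computes `[x, y, width, height]` from corner vertices four times with nearly identical code. A possible consolidation is sketched below; `vertices_to_box` and the `Vertex` stand-in are hypothetical names, and the vertex order (top-left, top-right, bottom-right, bottom-left) is the same assumption the loop already makes.

```python
from collections import namedtuple

# Stand-in for the vertex objects returned by the API (assumption for this sketch).
Vertex = namedtuple("Vertex", ["x", "y"])


def vertices_to_box(vertices, scale_x=1, scale_y=1):
    """Convert four corner vertices into [x, y, width, height].

    Assumes the order top-left, top-right, bottom-right, bottom-left.
    For normalized vertices, pass the image width and height as scale factors.
    """
    ulx = vertices[0].x * scale_x
    uly = vertices[0].y * scale_y
    box_width = abs(vertices[1].x - vertices[0].x) * scale_x
    box_height = abs(vertices[3].y - vertices[0].y) * scale_y
    return [ulx, uly, box_width, box_height]


pixel_box = vertices_to_box([Vertex(10, 20), Vertex(110, 20), Vertex(110, 70), Vertex(10, 70)])
print(pixel_box)  # [10, 20, 100, 50]
```

For localized objects, whose vertices are normalized to the range 0 to 1, the call would take the form `vertices_to_box(lobj.bounding_poly.normalized_vertices, img_width, img_height)`.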


## Saving the dictionaries to JSON files

In [30]:
tagged_images

{'room': ['F26_148_recto',
  'T6_155_recto',
  'T6_163_recto',
  'T10_79_recto',
  'F26_105_recto',
  'T35_49_recto'],
 'stock': ['F26_148_recto',
  'T43_7_recto',
  'T4_157_recto',
  'F25_164_recto',
  'F25_155_recto',
  'T6_155_recto',
  'T42_130_recto',
  'Cina5_193_recto',
  'F27_180_recto',
  'T42_203_recto',
  'T6_163_recto',
  'Cina7_83_recto',
  'T10_79_recto',
  'T36_12_recto',
  'T55_55_recto',
  'F26_105_recto'],
 'photograph': ['F26_148_recto',
  'T43_7_recto',
  'T4_157_recto',
  'T4_122_recto',
  'F25_155_recto',
  'T6_155_recto',
  'Cina5_193_recto',
  'F27_180_recto',
  'T6_163_recto',
  'T10_79_recto',
  'T62_20_recto',
  'T21_71_recto',
  'T36_12_recto',
  'T55_55_recto',
  'F26_105_recto',
  'T35_49_recto'],
 'blackandwhit': ['F26_148_recto',
  'T43_7_recto',
  'T4_157_recto',
  'T4_122_recto',
  'F25_164_recto',
  'F25_155_recto',
  'T6_155_recto',
  'T42_130_recto',
  'Cina5_193_recto',
  'F27_180_recto',
  'T42_203_recto',
  'T6_163_recto',
  'Cina7_83_recto',
  '

In [32]:
annotated_images

{'F26_148_recto': {'iiif': {'@id': 'http://dl.cini.it/oa/items/65938/canvas.json',
   'label': 'F26_148_recto',
   '@type': 'sc:Canvas',
   'width': 3824,
   'height': 2911,
   'images': [{'@id': 'http://dl.cini.it/oa/files/64898/anno.json',
     'motivation': 'sc:painting',
     '@type': 'oa:Annotation',
     'resource': {'@id': 'http://dl.cini.it/files/original/f55c28dfc6bbd6fb03d8569a50397b54.jpg',
      '@type': 'dctypes:Image',
      'format': 'image/jpeg',
      'width': 3824,
      'height': 2911,
      'service': {'@id': 'http://dl.cini.it:8080/digilib/Scaler/IIIF/f55c28dfc6bbd6fb03d8569a50397b54.jpg',
       '@context': 'http://iiif.io/api/image/2/context.json',
       'profile': 'http://iiif.io/api/image/2/level2.json'}},
     'on': 'http://dl.cini.it/oa/items/65938/canvas.json'}],
   'metadata': [{'label': 'Identifier',
     'value': '535aaa5b-2db5-4897-a585-f5d400c4cd31'},
    {'label': 'notes', 'value': 'F26_148_recto'},
    {'label': 'originalFileName', 'value': 'F26_148_

In [33]:
with open('tagged_images.json', 'w') as fp:
    json.dump(tagged_images, fp, indent=4)
    
with open('annotated_images.json', 'w') as fp:
    json.dump(annotated_images, fp, indent=4)

if len(failed_images) > 0:
    print("There are failed images")
    with open('failed_images.json', 'w') as fp:
        json.dump(failed_images, fp, indent=4)

TypeError: keys must be str, int, float, bool or None, not frozenset
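
The `TypeError` comes from the `frozenset` keys inside each `obj_boxes` dictionary, which JSON cannot represent. One possible workaround is to join each set into a string before dumping. This is a sketch under assumptions: `stringify_box_keys` is a hypothetical helper and the space separator is an arbitrary choice.

```python
import json


def stringify_box_keys(obj_boxes, sep=" "):
    """Return a copy of `obj_boxes` whose keys are plain strings.

    frozenset keys are joined in sorted order so the result is deterministic;
    string keys (used for detected text) are kept unchanged.
    """
    result = {}
    for key, boxes in obj_boxes.items():
        if isinstance(key, frozenset):
            key = sep.join(sorted(key))
        # Two keys may collapse to the same string, so merge rather than overwrite.
        result.setdefault(key, []).extend(boxes)
    return result


sample = {frozenset({"tag3", "tag4"}): [[5, 5, 10, 10]], "some text": [[1, 2, 3, 4]]}
print(json.dumps(stringify_box_keys(sample)))
# {"tag3 tag4": [[5, 5, 10, 10]], "some text": [[1, 2, 3, 4]]}
```

Applying it to each `annotated_images[label]["obj_boxes"]` before calling `json.dump` should let the file be written, at the cost of losing the set type of the keys.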

## Inserting the data into MongoDB

# TODO: Insert the data into the database.

In [34]:
# creating a client to work with MongoDB
mangoclient = pymongo.MongoClient(MANGO_CLIENT_URI)
# selecting the <terzani_photos> database
mango_db = mangoclient["terzani_photos"]

### Storing the Image Tags

In [35]:
# creating a new collection named <sample_taggings>
mango_tag_collection = mango_db["sample_taggings"]
# inserting the dictionary into the db
for label, annotations in tagged_images.items():
    mango_tag_collection.insert_one(annotations)

TypeError: document must be an instance of dict, bson.son.SON, bson.raw_bson.RawBSONDocument, or a type that inherits from collections.MutableMapping
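
The error arises because each value of `tagged_images` is a list, while `insert_one` expects a dictionary. One way to shape the data is to wrap every tag and its image labels in a small document; in the sketch below the helper name `tag_documents` and the field names `tag` and `images` are assumptions, not an established schema.

```python
def tag_documents(tagged_images):
    """Turn {tag: [image labels]} into a list of MongoDB-ready dicts."""
    # Field names "tag" and "images" are assumptions made for this sketch.
    return [{"tag": tag, "images": list(labels)} for tag, labels in tagged_images.items()]


docs = tag_documents({"room": ["F26_148_recto", "T6_155_recto"]})
print(docs)
# [{'tag': 'room', 'images': ['F26_148_recto', 'T6_155_recto']}]
```

The resulting list could then be passed to `mango_tag_collection.insert_many(docs)`, provided it is not empty.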

### Storing the Image information

In [36]:
# creating a new collection named <sample_annotations>
mango_box_collection = mango_db["sample_annotations"]
# inserting the dictionary into the db
for label, annotations in annotated_images.items():
    mango_box_collection.insert_one(annotations)

InvalidDocument: documents must have only string keys, key was frozenset({'person'})
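
MongoDB documents accept only string keys, so the `frozenset` keys in `obj_boxes` need to be reshaped before insertion. A sketch of one option, which stores each entry as a sub-document with an explicit list of tags; `annotation_document` and the field names `tags` and `boxes` are hypothetical:

```python
def annotation_document(annotation):
    """Return a copy of one `annotated_images` value with string-only keys."""
    boxes = []
    for key, coords in annotation["obj_boxes"].items():
        # frozenset keys become a sorted list; text keys become a one-item list.
        tags = sorted(key) if isinstance(key, frozenset) else [key]
        boxes.append({"tags": tags, "boxes": coords})
    doc = dict(annotation)  # shallow copy so the original is left untouched
    doc["obj_boxes"] = boxes
    return doc


sample_annotation = {
    "iiif": {},
    "obj_boxes": {frozenset({"person"}): [[1, 2, 3, 4]]},
    "landmark_info": {},
    "country": None,
}
print(annotation_document(sample_annotation)["obj_boxes"])
# [{'tags': ['person'], 'boxes': [[1, 2, 3, 4]]}]
```

Keeping the tags as a list preserves the set semantics of the original keys better than joining them into a single string, and each document could then be passed to `insert_one`.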