# Keep track of your pictures: how to automatically add keywords + how to build and maintain an image database

Author: Federica Lionetto  
Email: federica.lionetto@gmail.com  
Date: 30 April 2022  

This work is licensed under a <a rel="license" href="https://creativecommons.org/licenses/by-nc-sa/4.0/">Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License</a>.

## Objective

Let's consider the following scenario.
You love taking pictures and always have your camera with you to be ready to capture the best moments of life.
However, most of your pictures simply go from your memory card to your external hard drive and you would have no idea where to find what.
You considered several times the possibility to add keywords to your pictures to be able to easily find them later on.
Maybe you even started with that, but gave up after a few attempts.
Does this sound familiar to you?

Generating keywords is a time-consuming task, and is likely to be very boring as well.
Instead of generating keywords manually, in this short tutorial we will draw on the power of the cloud and let the `Vision AI` API on Google Cloud solve this repetitive task for us.
We will also go through a way to keep our pictures organized into a database on Google Cloud.

For each image in our collection, we will store the following information in the database:
- `file_name`, that is, the name of the image
- `creation_date_time`, that is, the date and time of the creation of the image
- `keywords`, that is, the keywords associated with the image 

It's time to get started!

## Cloud settings

To run the tutorial, we need the following resources on Google Cloud: 
- a `Cloud Storage` bucket, where we will upload the pictures
- a `BigQuery` dataset, where we will store the information about the pictures 
- a `Vertex AI` notebook, where we will write and run our Python code 

You can create the `Cloud Storage` bucket, the `BigQuery` dataset, and the `Vertex AI` notebook from the Console.

## Step 1: Import modules

In [None]:
import io
import os
import shutil

import datetime

import matplotlib.pyplot as plt
import seaborn as sns

from IPython.display import Image, display

# Imports the Google Cloud client library
from google.cloud import vision
from google.cloud import storage
from google.cloud import bigquery

# %load_ext google.cloud.bigquery

## Step 2: Configuration

Here we can configure some of the variables that are used throughout the tutorial.

In [None]:
verbose = True

region = "[REGION GOES HERE]" # Where to create the resources on Google Cloud
gcp_project_name = "[GCP PROJECT NAME GOES HERE]" # The name of the GCP project

bucket_name = "[BUCKET NAME GOES HERE]" # The name of the Cloud Storage bucket
folder_name_landing = "image/landing" # The name of the folder in the Cloud Storage bucket where new pictures are uploaded
folder_name_archive = "image/archive" # The name of the folder in the Cloud Storage bucket where processed pictures are archived

scale = 0.2 # Used to scale images for easier visualization

bq_dataset_name = '[DATASET NAME GOES HERE]' # The name of the BigQuery dataset
bq_table_name = 'image' # The name of the BigQuery table

sample_image_name = '[SAMPLE IMAGE NAME GOES HERE]' # A sample image to use as an example

In [None]:
verbose = True

region = "europe-west6" # Where to create the resources on Google Cloud
gcp_project_name = "personalproject-348318" # The name of the GCP project

bucket_name = "whitebloomingtulip-input-164" # The name of the Cloud Storage bucket
folder_name_landing = "image/landing" # The name of the folder in the Cloud Storage bucket where new pictures are uploaded
folder_name_archive = "image/archive" # The name of the folder in the Cloud Storage bucket where processed pictures are archived

scale = 0.2 # Scale images for easier visualization

bq_dataset_name = 'whitebloomingtulip_db' # The name of the BigQuery dataset
bq_table_name = 'image' # The name of the BigQuery table

sample_image_name = 'IMG_3579.jpeg' # A sample image to use as an example

## 3: Instantiate the clients

We need to instantiate three clients: one for `Cloud Storage`, one for `BigQuery`, and one for the `Vision AI API`.

In [None]:
# Instantiate the clients
storage_client = storage.Client()
vision_client = vision.ImageAnnotatorClient()
bq_client = bigquery.Client()

## 4: Copy images from GCS to the machine

In [None]:
# Create 2 folders (if they do not exist), one for images and one for keywords
if not os.path.exists('images'):
    os.mkdir('images')
if not os.path.exists('keywords'):
    os.mkdir('keywords')

# Create an empty list for the images to be copied
file_names = []

# Access the GCS bucket containing the images to be copied
bucket = storage_client.get_bucket(bucket_name)

# If the images are in a subfolder of the GCS bucket, specify the subfolder structure
prefix_landing = f"{folder_name_landing}/" 
blobs_landing = bucket.list_blobs(prefix = prefix_landing, delimiter = '/')

for blob in blobs_landing:
    if(blob.name != prefix_landing): # Ignore the subfolder itself 
        file_name = blob.name.replace(prefix_landing, "")
        blob.download_to_filename('images/'+file_name) # Download the file to the machine
        file_names.append('images/'+file_name)

print("Images:")
print(file_names)
print('')

## 5: Generate keywords and display their score

The `Vision AI` API allows to annotate an image with keywords that describe the contents of that image.
Each keyword has an associated score, where a higher score means that the algorithm has a higher confidence that the keyword describes something in the image.

You can try out the `Vision AI` API interactively here: https://cloud.google.com/vision

In [None]:
label_annotation_desc_dict = {} 
label_annotation_score_dict = {} 
# file_names_out = []

for file_name in file_names:
    # Get the two parts of the file name
    file_name_without_extension = file_name.rsplit('.', 1)[0]
    file_name_extension = file_name.rsplit('.', 1)[1]
    if verbose:
        print('File name without extension:', file_name_without_extension)
        print('File name extension:', file_name_extension)
        print('')
    
    # Display selected image
    display(Image(filename=file_name, width=500))
    
    # Annotate selected image
    file_name = os.path.abspath(file_name)

    # Load the image into memory
    with io.open(file_name, 'rb') as image_file:
        content = image_file.read()

    image = vision.Image(content=content)

    # Perform label detection on the image
    response = vision_client.label_detection(image=image)
    if verbose:
        print('Response:')
        print(response)
        print('')
        print('Label annotations:')
        print(response.label_annotations)
        print('')
        print('First element of label annotations:')
        print(response.label_annotations[0])
        print('')
        print('Description of first element of label annotations:')
        print(response.label_annotations[0].description)
        print('')
    
    labels = response.label_annotations

    print('Keywords:')
    for label in labels:
        print(label.description)
    print('')
        
    # Create lists of description and score for selected image
    n_label_annotations = len(response.label_annotations)

    label_annotation_desc = []
    label_annotation_score = []

    for i in range(n_label_annotations):
        label_annotation_desc.append(response.label_annotations[i].description)
        label_annotation_score.append(response.label_annotations[i].score)
    print('List of keywords:')
    print(label_annotation_desc)
    print('')
    print('List of scores:')
    print(label_annotation_score)
    print('')

    label_annotation_desc_dict[file_name] = label_annotation_desc
    label_annotation_score_dict[file_name] = label_annotation_score
    
    # Display label annotations (description and score) for selected image
    plt.figure()
    sns.barplot(x=label_annotation_score, y=label_annotation_desc, color='red')
    plt.savefig(file_name_without_extension.replace('images/', 'keywords/')+'_keywords', format='png')
    plt.show()
    
    # Create filename based on label annotations
    # file_name_out = "_".join(label_annotation_desc)
    # file_name_out = file_name_out.replace(" ", "-")
    # file_name_out = file_name_out+".jpeg"
    # print("Input file name:", file_name)
    # print('')
    # print("Output file name:", file_name_out)
    # print('')
    # file_names_out.append(file_name_out)

In [None]:
print('Dictionary of list of keywords:')
print(label_annotation_desc_dict)
print('')
print('Dictionary of list of scores:')
print(label_annotation_score_dict)
print('')

## 6: Create database with keywords

In [None]:
# bq_dataset = bq_client.dataset(bq_dataset_name)

### Create empty table

In [None]:
query_create = f"""
CREATE TABLE IF NOT EXISTS `{gcp_project_name}.{bq_dataset_name}.{bq_table_name}`
(
file_name STRING OPTIONS(description="The name of the file"),
creation_date_time DATETIME OPTIONS(description="The date and time when the file was uploaded to the cloud"),
keywords ARRAY<STRING> OPTIONS(description="The keywords associated to the image")
)
PARTITION BY DATETIME_TRUNC(creation_date_time, DAY)
OPTIONS(
description="Image database"
)
"""

print('Query for creating empty table:')
print(query_create)
print('')

In [None]:
# Execute the query
query_job_create = bq_client.query(query_create, location=region)

### Insert one row in the table

In [None]:
creation_date_time = datetime.datetime.fromtimestamp(os.path.getmtime('/home/jupyter/images/'+sample_image_name))
print('Creation date time:')
print(creation_date_time)
print('')

# Insert one row
query_insert_one = f"""
INSERT `{gcp_project_name}.{bq_dataset_name}.{bq_table_name}` (file_name, creation_date_time, keywords) 
VALUES('/home/jupyter/images/{sample_image_name}', '{creation_date_time}', {label_annotation_desc_dict[f'/home/jupyter/images/{sample_image_name}']})
"""

print('Query for inserting one row:')
print(query_insert_one)
print('')

In [None]:
# Execute the query
query_job_insert_one = bq_client.query(query_insert_one, location=region)

### Insert multiple rows in the table

In [None]:
values_str = ''
for key in label_annotation_desc_dict.keys():
    creation_date_time_temp = datetime.datetime.fromtimestamp(os.path.getmtime(key))
    keywords_temp = label_annotation_desc_dict[key]
    if values_str == '':
        values_str = values_str+f"VALUES('{key}', '{creation_date_time_temp}', {keywords_temp})"
    else:
        values_str = values_str+f",('{key}', '{creation_date_time_temp}', {keywords_temp})"
    if verbose:
        print(values_str)
        print('')

print('String for inserting multiple rows:')
print(values_str)    
print('')

# Insert multiple rows
query_insert_many = f"""
INSERT `{gcp_project_name}.{bq_dataset_name}.{bq_table_name}` (file_name, creation_date_time, keywords) 
{values_str}
"""

print('Query for inserting multiple rows:')
print(query_insert_many)
print('')

In [None]:
# Execute the query
query_job_insert_many = bq_client.query(query_insert_many, location=region)

### Drop duplicates

In [None]:
# Drop duplicated rows
query_drop_duplicates = f"""
CREATE OR REPLACE TABLE `{gcp_project_name}.{bq_dataset_name}.{bq_table_name}` 
PARTITION BY DATETIME_TRUNC(creation_date_time, DAY) AS (
  SELECT 
    * EXCEPT(row_number) 
  FROM (
    SELECT
      *,
        ROW_NUMBER() OVER (PARTITION BY file_name) row_number
    FROM 
        `{gcp_project_name}.{bq_dataset_name}.{bq_table_name}`)
WHERE row_number = 1
)
"""

print('Query for dropping duplicates:')
print(query_drop_duplicates)
print('')

In [None]:
# Execute the query
query_job_drop_duplicates = bq_client.query(query_drop_duplicates, location=region)

### Read the contents of the table

This is used to cross-check the contents of the table by importing the data in a `Pandas` dataframe.

In [None]:
# Retrieve rows from table
query_read = f"""
SELECT *
FROM `{gcp_project_name}.{bq_dataset_name}.{bq_table_name}`
"""

print('Query for reading the contents of the table:')
print(query_read)
print('')

# Execute the query
query_job_read = bq_client.query(query_read, location=region)

df = query_job_read.result().to_dataframe()

In [None]:
df.head(50)

## 7: Archive processed images

In [None]:
# If the images must be archived in a subfolder of the GCS bucket, specify the subfolder structure
prefix_archive = f"{folder_name_archive}/" 
blobs_landing = bucket.list_blobs(prefix = prefix_landing, delimiter = '/')

for blob in blobs_landing:
    if(blob.name != prefix_landing): # Ignore the subfolder itself 
        bucket.rename_blob(blob, new_name=blob.name.replace(prefix_landing, prefix_archive))
        print(f'{blob.name} renamed to {blob.name.replace(prefix_landing, prefix_archive)}')

## 8: Clean up

### Clean up the machine

In [None]:
# Delete all files in the machine
shutil.rmtree('images')
shutil.rmtree('keywords')

### Clean up 

In [None]:
# Move the images back from archive to landing
blobs_archive = bucket.list_blobs(prefix = prefix_archive, delimiter = '/')

for blob in blobs_archive:
    if(blob.name != prefix_archive): # Ignore the subfolder itself 
        bucket.rename_blob(blob, new_name=blob.name.replace(prefix_archive, prefix_landing))
        print(f'{blob.name} renamed to {blob.name.replace(prefix_archive, prefix_landing)}')

### Clean up BigQuery

Run this only if you want to delete the database.

In [None]:
# Delete all rows in the table
query_del = f"""
DROP TABLE `{gcp_project_name}.{bq_dataset_name}.{bq_table_name}`
"""

print('Query for deleting all rows in the table:')
print(query_del)
print('')

# Execute the query
query_job_del = bq_client.query(query_del, location=region)

## Possible extensions

So far we assumed that, after uploading new pictures to Google Cloud, we manually execute the code in the `Vertex AI` notebook to generate the keywords and update the database.
One step further could be to trigger the execution of the code when one or more files are uploaded to the relevant `Cloud Storage` bucket in Google Cloud. We can do that in a couple of different ways. 
One way is to use a `Cloud Function`. `Cloud Functions` are a function-as-a-service (FaaS) product that allows you to execute your code without having to worry about infrastructure (no servers and no containers to manage) and to pay only for the execution time of the code. `Cloud Functions` are event-driven and can be triggered by several events related to `Cloud Storage`, in particular:
- object creation
- object deletion
- object archiving
- metadata updates

Another way to achieve the same result is to use `Dataflow`, a product that allows to deal with batch and stream data processing in a serverless way. 
`Dataflow` is built around `Apache Beam`, an open-source model for defining batch and stream data processing pipelines.

Instead of having our code triggered by an event, we could decide to execute it according to a certain schedule, for example once a day or once a week.
This can be done using `Composer`, a workflow orchestration service built on `Apache Airflow`. 

## References

- `Cloud Storage` on Google Cloud: https://cloud.google.com/storage
- `BigQuery` on Google Cloud: https://cloud.google.com/bigquery
- `Vision AI` API on Google Cloud: https://cloud.google.com/vision
- `Cloud Functions` on Google Cloud: https://cloud.google.com/functions
- `Dataflow` on Google Cloud: https://cloud.google.com/dataflow
- `Composer` on Google Cloud: https://cloud.google.com/composer
- How to use the `BigQuery` API: https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries
- How to specify a schema in `BigQuery`: https://cloud.google.com/bigquery/docs/schemas
- How to create a partitioned table in `BigQuery`: https://cloud.google.com/bigquery/docs/creating-partitioned-tables
- How to write the results of a query in `BigQuery`: https://cloud.google.com/bigquery/docs/writing-results
- How to download `BigQuery` data to a `Pandas` dataframe: https://cloud.google.com/bigquery/docs/bigquery-storage-python-pandas
- How to trigger a `Cloud Function` from a `Cloud Storage` event: https://cloud.google.com/functions/docs/calling/storage
- `Apache Beam` programming guide, including triggers: https://beam.apache.org/documentation/programming-guide/ 
- `Apache Airflow`: https://airflow.apache.org/