<h1>Distribute functions across a BigQuery dataset using Spark</h1>

<h3>Problem: As a PM, I give lots of public presentations and I want to make sure I use images that have an open license</h3>

<img src="https://storage.googleapis.com/kaggle-datasets-images/21870/28108/ef05bb21ad2ece2c6af034ceced30239/dataset-cover.jpeg?t=2018-04-16-23-00-50" height=400 width=400>    
<h4> BigQuery Public Datasets - Open Images: 9 million URLs of open images (with labels across 6,000 categories)</h4>

<h3>For smaller datasets, you can use BigQuery magics and Python</h3>

In [1]:
%reload_ext google.cloud.bigquery

In [2]:
%%bigquery  pd_results --use_bqstorage_api

SELECT original_url, title 
FROM `bigquery-public-data.open_images.images` 
WHERE license = 'https://creativecommons.org/licenses/by/2.0/' 
LIMIT 10

Query complete after 0.01s: 100%|██████████| 1/1 [00:00<00:00, 1199.06query/s]
Downloading: 100%|██████████| 10/10 [00:01<00:00,  7.46rows/s]


In [3]:
#review what our image database contains. 
import pandas as pd
pd.set_option('display.max_colwidth', None)
pd_results.head()

Unnamed: 0,original_url,title
0,https://farm4.staticflickr.com/7162/6707198107_1993d73034_o.jpg,Men’s Pique Polo (NA)
1,https://c8.staticflickr.com/4/3676/11012143656_40a5269d42_o.jpg,Ladybug on green leaf
2,https://c4.staticflickr.com/9/8157/7599640374_f278fa5bd7_o.jpg,n53_w1150
3,https://c7.staticflickr.com/5/4101/4861005456_71b3b0be00_o.jpg,DSC_4918
4,https://farm5.staticflickr.com/5443/17832643436_904497178e_o.jpg,Central Accord 2015


Looks like a great set of images, but how do I find what I need? What's a DSC_4918?

In [4]:
# Function that makes it easy to extract the high-confidence labels for an image.
from google.cloud import vision
def AnnotateHighConfidenceLabelsFromImage(image_uri):
    client = vision.ImageAnnotatorClient()
    request = {
            'image': {
            'source': {'image_uri': image_uri},
             },
         }
    response = client.annotate_image(request)

    high_confidence_labels = []
    for la in response.label_annotations:
        if float(la.score * 100) > 90.0:
            high_confidence_labels.append(la.description)

    if len(high_confidence_labels) < 1:
        high_confidence_labels.append("No labels detected")
        
    return str(high_confidence_labels)
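
The confidence filter inside the function above can be pulled out into a small helper so it can be checked without calling the Vision API. This is a minimal sketch, not part of the original notebook: `filter_high_confidence` and its `threshold` parameter are hypothetical names, and the fake annotations only mimic the `description` and `score` fields the notebook reads. Vision scores are on a 0 to 1 scale, so comparing against 0.9 corresponds to the `score * 100 > 90.0` test above, up to floating-point rounding.

```python
from types import SimpleNamespace

def filter_high_confidence(annotations, threshold=0.9):
    """Return descriptions of annotations whose score exceeds threshold.

    Falls back to a single placeholder entry when nothing qualifies,
    mirroring the behavior of the notebook function.
    """
    labels = [a.description for a in annotations if a.score > threshold]
    return labels or ["No labels detected"]

# Stand-ins for Vision label annotations (illustrative values only).
fake_annotations = [
    SimpleNamespace(description="Bird", score=0.97),
    SimpleNamespace(description="Sky", score=0.62),
]
print(filter_high_confidence(fake_annotations))
print(filter_high_confidence([]))
```

Keeping the threshold as a parameter makes it easy to experiment with looser or stricter cutoffs before running against many images.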

In [5]:
# For 10 images, it's no problem to simply loop through them to get the labels.
for image in pd_results['original_url']:
    labels = AnnotateHighConfidenceLabelsFromImage(image)
    print(labels)

['No labels detected']
['No labels detected']
['Branch', 'Organism', 'Plant community', 'Monochrome photography', 'Monochrome', 'Adaptation']
['Arm', 'Finger', 'People', 'Comfort', 'Hand', 'Child']
['Electronic device', 'Furniture', 'Technology', 'Table', 'Laptop', 'Computer accessory', 'Computer']
['Branch', 'Twig', 'Adaptation', 'Woody plant']
['Organism', 'Bird']
['Atmosphere', 'Cloud', 'Atmospheric phenomenon']
['Dog breed', 'Dog', 'Carnivore', 'Mammal']
['Text', 'White', 'Line', 'Font', 'Colorfulness']
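
The first two results above are 'No labels detected', even though one of those images is titled "Ladybug on green leaf". One possible cause (an assumption, not verified here) is that the Vision API could not fetch the remote image, in which case the response carries an error instead of labels. The sketch below shows one way to keep the two cases apart. `labels_or_error` is a hypothetical helper; it assumes the response exposes `error.message` and `label_annotations`, and the fake responses are illustrative only.

```python
from types import SimpleNamespace

def labels_or_error(response, threshold=0.9):
    """Separate 'request failed' from 'no label cleared the threshold'."""
    if response.error.message:
        return "Error: " + response.error.message
    labels = [la.description for la in response.label_annotations
              if la.score > threshold]
    return str(labels) if labels else "No labels detected"

# Fake responses standing in for Vision API results (illustrative only).
failed = SimpleNamespace(
    error=SimpleNamespace(message="could not fetch image"),
    label_annotations=[],
)
ok = SimpleNamespace(
    error=SimpleNamespace(message=""),
    label_annotations=[SimpleNamespace(description="Ladybug", score=0.95)],
)
print(labels_or_error(failed))
print(labels_or_error(ok))
```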


<h3>Expanding to the full corpus of images will require scaling with Spark</h3>

In [6]:
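# But what happens when I need to run that label extractor against the full dataset of images?
# Note: despite its name, this query keeps LIMIT 100 for the demo; drop the LIMIT to process every matching image.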
no_limit_query = "SELECT original_url, title FROM `bigquery-public-data.open_images.images` WHERE license = 'https://creativecommons.org/licenses/by/2.0/' LIMIT 100"

In [7]:
# Use Spark to load the full dataset into a Spark DataFrame. Set up the Spark session with the BigQuery Storage connector.
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.18.0") \
                                    .enableHiveSupport() \
                                    .getOrCreate()

#Use this function to push the processing of the query back to BQ but still use BQ Storage Connector to
#pull back data in parallel and directly into a Spark DF that can handle the size. 
from google.cloud import bigquery
from pyspark import StorageLevel

def bq2df(QUERY):
    bq = bigquery.Client()
    query_job = bq.query(QUERY)
    query_job.result()

    df = spark.read.format('bigquery') \
        .option('dataset', query_job.destination.dataset_id) \
        .load(query_job.destination.table_id) \
        .persist(StorageLevel.MEMORY_AND_DISK)

    return df


In [8]:
df = bq2df(no_limit_query)
print(df.count())
df.printSchema()


100
root
 |-- original_url: string (nullable = true)
 |-- title: string (nullable = true)



In [9]:
# I'm now going to Spark-ify my Python function. The body is unchanged; only the @udf decorator is added and the import moves inside the function.

from pyspark.sql.functions import udf

@udf("string")
def AnnotateHighConfidenceLabelsFromImage_UDF(image_uri):
    from google.cloud import vision
    client = vision.ImageAnnotatorClient()
    request = {
            'image': {
            'source': {'image_uri': image_uri},
             },
         }
    response = client.annotate_image(request)

    high_confidence_labels = []
    for la in response.label_annotations:
        if float(la.score * 100) > 90.0:
            high_confidence_labels.append(la.description)

    if len(high_confidence_labels) < 1:
        high_confidence_labels.append("No labels detected")
        
    return str(high_confidence_labels)

In [10]:
df_results = df.select("original_url", "title",\
            AnnotateHighConfidenceLabelsFromImage_UDF("original_url").alias("labels"))\
            .cache()
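
The UDF above builds a new `ImageAnnotatorClient` for every row. One possible refinement, sketched here under assumptions, is to process a whole partition with a single client. `annotate_partition` and `make_client` are hypothetical names, and the fake client below only imitates the `annotate_image` call used in the notebook so the logic can be run without credentials.

```python
from types import SimpleNamespace

def annotate_partition(rows, make_client, threshold=0.9):
    """Yield (original_url, title, labels) for each row, reusing one client."""
    client = make_client()  # created once per partition, not once per row
    for row in rows:
        request = {"image": {"source": {"image_uri": row["original_url"]}}}
        response = client.annotate_image(request)
        labels = [la.description for la in response.label_annotations
                  if la.score > threshold]
        yield (row["original_url"], row["title"],
               str(labels or ["No labels detected"]))

class FakeClient:
    """Stand-in for vision.ImageAnnotatorClient; counts how often it is built."""
    created = 0

    def __init__(self):
        FakeClient.created += 1

    def annotate_image(self, request):
        return SimpleNamespace(
            label_annotations=[SimpleNamespace(description="Bird", score=0.95)]
        )

rows = [
    {"original_url": "https://example.com/a.jpg", "title": "a"},
    {"original_url": "https://example.com/b.jpg", "title": "b"},
]
results = list(annotate_partition(rows, FakeClient))
print(results, FakeClient.created)
```

With Spark, this could be applied as `df.rdd.mapPartitions(lambda rows: annotate_partition(rows, vision.ImageAnnotatorClient)).toDF(["original_url", "title", "labels"])`. Whether it is actually faster than the UDF depends on the cluster and on Vision API quotas.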

In [11]:
# At this point, it might make sense to save this table to my Hive metastore to avoid re-processing all the images.
#df_results.write.saveAsTable("HighConfidenceLabelsAndImages")

In [12]:
df_results.show(10, truncate=False)

+----------------------------------------------------------------+--------------------------------------+-----------------------------------------------------------------------------------------------------+
|original_url                                                    |title                                 |labels                                                                                               |
+----------------------------------------------------------------+--------------------------------------+-----------------------------------------------------------------------------------------------------+
|https://farm4.staticflickr.com/7162/6707198107_1993d73034_o.jpg |Men’s Pique Polo (NA)                 |['No labels detected']                                                                               |
|https://c8.staticflickr.com/4/3676/11012143656_40a5269d42_o.jpg |Ladybug on green leaf                 |['No labels detected']                                         

In [13]:
from pyspark.sql.functions import col
df_results.where(col("labels").contains("Bird")).show(truncate=False)

+----------------------------------------------------------------+-------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------+
|original_url                                                    |title                                      |labels                                                                                                                                |
+----------------------------------------------------------------+-------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------+
|https://farm4.staticflickr.com/1152/532400494_d65f8b7970_o.jpg  |DSC_0451                                   |['Organism', 'Bird']                                                                                                                  |
|https://farm5.s
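
`contains("Bird")` is a substring match on the stringified list, so it would also match a label such as 'Bird of prey' (a hypothetical label, used here only for illustration). When an exact label match is needed, the string can be parsed back into a list first. A minimal sketch, assuming the labels column holds the `str(list)` output produced above; `has_label` is a hypothetical helper.

```python
import ast

def has_label(labels_str, wanted):
    """Return True if wanted is exactly one of the labels in labels_str."""
    labels = ast.literal_eval(labels_str)  # safely parses "['a', 'b']"
    return wanted in labels

print(has_label("['Organism', 'Bird']", "Bird"))   # exact match on a list element
print(has_label("['Bird of prey']", "Bird"))       # no element equals "Bird"
print("Bird" in "['Bird of prey']")                # plain substring test, for comparison
```

In Spark this could be wrapped in a boolean UDF. An alternative is to have the labeling UDF return an array of strings and filter with `array_contains`.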

I could write the results back to BigQuery if I want them in the warehouse, or keep them in my Hive data mart.
Many customers use this technique to scale code they don't want to touch.
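
Writing the labeled results back to BigQuery could look like the sketch below. It assumes the spark-bigquery connector's indirect write path, which stages data in a Cloud Storage bucket named by the `temporaryGcsBucket` option. The table and bucket names are placeholders, and `write_labels_to_bigquery` is a hypothetical helper. A small recording stub stands in for the Spark DataFrame so the call chain can be exercised without a cluster.

```python
def write_labels_to_bigquery(df, table, temp_bucket):
    """Write df to a BigQuery table via the spark-bigquery connector."""
    (df.write.format("bigquery")
        .option("temporaryGcsBucket", temp_bucket)  # staging bucket for the load
        .mode("overwrite")                          # replace the table if it exists
        .save(table))

class RecordingWriter:
    """Stub that records the writer calls instead of touching BigQuery."""
    def __init__(self):
        self.calls = []

    def format(self, source):
        self.calls.append(("format", source))
        return self

    def option(self, key, value):
        self.calls.append(("option", key, value))
        return self

    def mode(self, save_mode):
        self.calls.append(("mode", save_mode))
        return self

    def save(self, path):
        self.calls.append(("save", path))

class FakeDataFrame:
    """Minimal object exposing a .write attribute like a Spark DataFrame."""
    def __init__(self):
        self.write = RecordingWriter()

fake_df = FakeDataFrame()
write_labels_to_bigquery(fake_df, "my_dataset.image_labels", "my-temp-bucket")
print(fake_df.write.calls)
```

With the real `df_results`, the same helper would be called as `write_labels_to_bigquery(df_results, "my_dataset.image_labels", "my-temp-bucket")`, substituting a dataset and bucket that exist in your project.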