# Quotes Pipeline

<img src="./Quote-Sentiment-Pipeline.jpg">

## Building the Quote Fetcher Cloud Function

Steps are adapted from the GCP docs tutorial [Using Pub/Sub to trigger a Cloud Function](https://cloud.google.com/scheduler/docs/tut-pub-sub) and the Quick Start example in the [Functions Framework GitHub README](https://github.com/GoogleCloudPlatform/functions-framework-python).

Quick view of the steps:

1) Create a Pub/Sub topic for the Quote Fetcher Cloud Function to write quotes to

2) Create the Quote Fetcher Cloud Function

3) Create a Pub/Sub topic to trigger the Quote Fetcher Cloud Function

4) Create a Cloud Scheduler job to publish messages to the trigger topic


__Step 1:__ Create a Pub/Sub topic for the Quote Fetcher Cloud Function to publish quotes to.

In [48]:
! gcloud pubsub topics create quotes

Created topic [projects/qwiklabs-gcp-04-2ad6a04dc593/topics/quotes].
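The output above shows that Pub/Sub identifies a topic by its full resource path, not just its short name. As a minimal sketch of that naming scheme, the hypothetical helper `build_topic_path` below assembles the same string (the function later in this document uses the client library's `topic_path` for this). The project name `my-project` is a placeholder.

```python
def build_topic_path(project_id: str, topic_id: str) -> str:
    """Return the fully qualified Pub/Sub topic name for a project and topic."""
    return f"projects/{project_id}/topics/{topic_id}"


# "my-project" is a placeholder; substitute your own project ID.
print(build_topic_path("my-project", "quotes"))
```

The full path is what the Dataflow pipeline later expects for its `--pubsubtopic` argument.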


__Step 2:__ Create the Quote Fetcher Cloud Function

First, create a directory to hold the source code.

In [11]:
%%bash

if [ ! -d quote_fetcher ]
then
  echo "creating quote_fetcher directory"
  mkdir quote_fetcher
fi

if [ ! -d pubsub_schd ]
then
  echo "creating pubsub_schd directory"
  mkdir pubsub_schd
fi

creating pubsub_schd directory


Write requirements.txt with the Python dependencies of the Quote Fetcher Cloud Function

In [35]:
%%writefile ./quote_fetcher/requirements.txt
google-cloud-pubsub==2.7.0
requests>=2.26.0,<2.27.0
beautifulsoup4>=4.9.3,<4.10.0
pydantic>=1.8.2,<1.9.0
# google-cloud-language>=2.2.2,<2.3.0

Overwriting ./quote_fetcher/requirements.txt


Write Quote Fetcher Cloud Function source code

In [36]:
%%writefile ./quote_fetcher/main.py
"""
Cloud Function to fetch quotes from quotes.toscrape.com/random 
and publish them to PubSub
"""

import json
import os
import typing

import requests

from bs4 import BeautifulSoup
from google.cloud import pubsub_v1

from pydantic import BaseModel

PROJECT_ID = os.environ['PROJECT_ID']
TOPIC_ID = os.environ['TOPIC_ID']


class Quote(BaseModel):
    text : str
    author : str
    tags : typing.Sequence[str]
    sentiment : typing.Optional[float]
    magnitude : typing.Optional[float]
        

def fetch_quote(events, context):
    quote_url = 'https://quotes.toscrape.com/random'

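    # Fail fast on a slow or failed request instead of parsing an error page.
    response = requests.get(quote_url, timeout=10)
    response.raise_for_status()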

    soup = BeautifulSoup(response.content, 'html.parser')

    quote_el = soup.find('div', class_='quote')

    quote = Quote(
        text=quote_el.find('span', class_='text').get_text(),
        author=quote_el.find('small', class_='author').get_text(),
        tags=[el.get_text() for el in quote_el.find_all('a', class_='tag')]
    )
    
    quote_data = quote.dict()
    print("PROJECT_ID " + PROJECT_ID)
    print("TOPIC_ID " + TOPIC_ID)
    print(quote_data)
    
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)
    # Wait for the publish to complete before the function returns.
    future = publisher.publish(topic_path, json.dumps(quote_data).encode('utf-8'))
    future.result()
    
    return quote_data

Overwriting ./quote_fetcher/main.py
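To see what the selectors in `fetch_quote` pick out of the page, here is a dependency-free sketch. It swaps in the standard library's `html.parser` for BeautifulSoup so it runs anywhere. The `SAMPLE_HTML` markup and the `QuoteParser` class are assumptions for illustration, modeled only on the tag and class names used in main.py, and are not a copy of the real page.

```python
from html.parser import HTMLParser

# Hypothetical markup shaped after the selectors used in fetch_quote.
SAMPLE_HTML = """
<div class="quote">
  <span class="text">“An example quote.”</span>
  <span>by <small class="author">Example Author</small></span>
  <div class="tags">
    <a class="tag" href="/tag/example/">example</a>
    <a class="tag" href="/tag/sample/">sample</a>
  </div>
</div>
"""


class QuoteParser(HTMLParser):
    """Collect text, author, and tags from quote markup (sketch only)."""

    # (tag, class) pairs mirroring the selectors in fetch_quote.
    TARGETS = {
        ("span", "text"): "text",
        ("small", "author"): "author",
        ("a", "tag"): "tags",
    }

    def __init__(self):
        super().__init__()
        self.quote = {"text": "", "author": "", "tags": []}
        self._field = None  # field currently being captured, if any

    def handle_starttag(self, tag, attrs):
        css_class = dict(attrs).get("class")
        self._field = self.TARGETS.get((tag, css_class))

    def handle_endtag(self, tag):
        # Simplification: assumes target elements contain no nested tags.
        self._field = None

    def handle_data(self, data):
        if self._field == "tags":
            self.quote["tags"].append(data.strip())
        elif self._field:
            self.quote[self._field] += data.strip()


parser = QuoteParser()
parser.feed(SAMPLE_HTML)
parser.close()
print(parser.quote)
```

The resulting dictionary has the same `text`, `author`, and `tags` keys that the Cloud Function publishes. BeautifulSoup remains the more robust choice for real pages.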


__Step 3:__ Write a helper deployment shell script. Deploying with it also creates the quote-fetcher-topic Pub/Sub topic that triggers the function.

In [37]:
%%writefile ./quote_fetcher/deploy-cloud-function.sh
#!/bin/bash

if [ -d quote_fetcher ]
then
  cd quote_fetcher
fi

set -ex

PROJECT_ID=$(gcloud config get-value project)
TOPIC_ID=quotes

gcloud functions deploy quote_fetcher \
  --set-env-vars PROJECT_ID=$PROJECT_ID,TOPIC_ID=$TOPIC_ID \
  --entry-point fetch_quote \
  --runtime python37 \
  --trigger-topic quote-fetcher-topic

Overwriting ./quote_fetcher/deploy-cloud-function.sh


Deploy the Quote Fetcher Cloud Function 

In [38]:
%%bash

chmod +x quote_fetcher/deploy-cloud-function.sh

./quote_fetcher/deploy-cloud-function.sh

availableMemoryMb: 256
buildId: a88769e2-93ed-4f9b-a282-acba26951dfb
buildName: projects/774131484409/locations/us-central1/builds/a88769e2-93ed-4f9b-a282-acba26951dfb
entryPoint: fetch_quote
environmentVariables:
  PROJECT_ID: qwiklabs-gcp-04-2ad6a04dc593
  TOPIC_ID: quotes
eventTrigger:
  eventType: google.pubsub.topic.publish
  failurePolicy: {}
  resource: projects/qwiklabs-gcp-04-2ad6a04dc593/topics/quote-fetcher-topic
  service: pubsub.googleapis.com
ingressSettings: ALLOW_ALL
labels:
  deployment-tool: cli-gcloud
name: projects/qwiklabs-gcp-04-2ad6a04dc593/locations/us-central1/functions/quote_fetcher
runtime: python37
serviceAccountEmail: qwiklabs-gcp-04-2ad6a04dc593@appspot.gserviceaccount.com
sourceUploadUrl: https://storage.googleapis.com/gcf-upload-us-central1-0590737b-324c-4a57-b58b-fd974ee68e4f/a53ef61d-b729-4a15-9e0d-093a03a1c592.zip
status: ACTIVE
timeout: 60s
updateTime: '2021-08-20T21:28:40.961Z'
versionId: '16'


+++ gcloud config get-value project
++ PROJECT_ID=qwiklabs-gcp-04-2ad6a04dc593
++ TOPIC_ID=quotes
++ gcloud functions deploy quote_fetcher --set-env-vars PROJECT_ID=qwiklabs-gcp-04-2ad6a04dc593,TOPIC_ID=quotes --entry-point fetch_quote --runtime python37 --trigger-topic quote-fetcher-topic
Deploying function (may take a while - up to 2 minutes)...
.
For Cloud Build Logs, visit: https://console.cloud.google.com/cloud-build/builds;region=us-central1/969ee653-b8ff-41c2-b17f-bd6d5178faf5?project=774131484409
...................................................done.


Publish some data to the quote-fetcher-topic Pub/Sub topic

In [50]:
! gcloud pubsub topics publish quote-fetcher-topic --message "this is a test message"

messageIds:
- '2905842177728919'
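The `fetch_quote` function ignores its event argument, so the message text does not matter here. If a function did need the payload, a Pub/Sub-triggered background function receives it base64-encoded under the event's `data` key. The sketch below decodes such an event. The helper name `decode_pubsub_event` and the simulated event are illustrative assumptions.

```python
import base64


def decode_pubsub_event(event: dict) -> str:
    """Return the UTF-8 message body carried by a Pub/Sub background event."""
    data = event.get("data")
    if not data:
        return ""  # messages may carry only attributes and no body
    return base64.b64decode(data).decode("utf-8")


# Simulated event, shaped like the one delivered for the test message above.
simulated_event = {
    "data": base64.b64encode(b"this is a test message").decode("ascii")
}
print(decode_pubsub_event(simulated_event))
```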


Check the logs to confirm that the Quote Fetcher Cloud Function is being triggered by Pub/Sub events

In [40]:
! gcloud functions logs read quote_fetcher --limit 12

LEVEL  NAME           EXECUTION_ID  TIME_UTC                 LOG
D      quote_fetcher  c7nw33jjkj6f  2021-08-20 21:28:01.243  Function execution took 316 ms, finished with status: 'ok'
I      quote_fetcher  c7nw33jjkj6f  2021-08-20 21:28:01.102  {'text': "“The question isn't who is going to let me; it's who is going to stop me.”", 'author': 'Ayn Rand', 'tags': [], 'sentiment': None, 'magnitude': None}
I      quote_fetcher  c7nw33jjkj6f  2021-08-20 21:28:01.102  TOPIC_ID quotes
I      quote_fetcher  c7nw33jjkj6f  2021-08-20 21:28:01.102  PROJECT_ID qwiklabs-gcp-04-2ad6a04dc593
D      quote_fetcher  c7nw33jjkj6f  2021-08-20 21:28:00.927  Function execution started
D      quote_fetcher  c7nwxn6201f2  2021-08-20 21:26:01.352  Function execution took 404 ms, finished with status: 'ok'
I      quote_fetcher  c7nwxn6201f2  2021-08-20 21:26:01.232  {'text': '“There is nothing I would not do for those who are really my friends. I have no notion of loving people by halves, it is not my nature.”',

__Step 4:__ Create a Cloud Scheduler job to push messages to Pub/Sub, which in turn invokes the Cloud Function

In Cloud Shell, run the following.

```sh
gcloud services enable cloudscheduler.googleapis.com

export PROJECT_ID=$(gcloud config get-value project)
gcloud app create --project $PROJECT_ID --region us-central

gcloud scheduler jobs create pubsub quotefetcher \
  --schedule "*/2 * * * *" \
  --topic quote-fetcher-topic \
  --message-body "fetch quote"
```
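The schedule `*/2 * * * *` uses unix-cron syntax: the first field is the minute, and `*/2` means every second minute, starting at minute 0. The sketch below expands a minute field to the minutes it matches. The helper `expand_minute_field` is hypothetical and handles only `*`, `*/N`, and a single number, not the full cron grammar.

```python
from typing import List


def expand_minute_field(field: str) -> List[int]:
    """Expand a cron minute field ('*', '*/N', or a single number) to minutes."""
    if field == "*":
        return list(range(60))
    if field.startswith("*/"):
        step = int(field[2:])
        if step <= 0:
            raise ValueError("step must be a positive integer")
        return list(range(0, 60, step))
    minute = int(field)
    if not 0 <= minute <= 59:
        raise ValueError("minute must be between 0 and 59")
    return [minute]


schedule = "*/2 * * * *"
minute_field = schedule.split()[0]
print(expand_minute_field(minute_field)[:5])
```

This matches the function logs shown earlier, where executions are two minutes apart.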

## Create a Dataflow Pipeline

Create a directory to hold Beam Pipeline code

In [41]:
! mkdir quote_pipeline

mkdir: cannot create directory ‘quote_pipeline’: File exists


Create a requirements.txt file for Beam Python library dependencies

In [36]:
%%writefile ./quote_pipeline/requirements.txt

google-cloud-language==2.2.2

Overwriting ./quote_pipeline/requirements.txt


The pipeline code below consumes data from the Pub/Sub quotes topic, whose messages are JSON in the following format. The sentiment and magnitude fields arrive as null, as the function logs above show, and the pipeline fills them in.

```json
{
  "text": "Contents of a quote",
  "author": "The Person Attributed with the Quote",
  "tags": ["A", "list", "of", "tags", "associated", "with", "quote"],
  "sentiment": null,
  "magnitude": null
}
```
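Before the pipeline itself, here is a small sketch of how a consumer might decode and sanity-check one of these messages. The names `QuoteMessage` and `parse_quote_message` are hypothetical and are not part of the pipeline. The sketch treats `text`, `author`, and `tags` as required, and `sentiment` and `magnitude` as optional, which is an assumption based on the messages the Quote Fetcher function publishes.

```python
import json
from typing import NamedTuple, Optional, Sequence


class QuoteMessage(NamedTuple):
    text: str
    author: str
    tags: Sequence[str]
    sentiment: Optional[float] = None
    magnitude: Optional[float] = None


def parse_quote_message(payload: bytes) -> QuoteMessage:
    """Decode a Pub/Sub payload and check that the required fields exist."""
    row = json.loads(payload.decode("utf-8"))
    missing = [key for key in ("text", "author", "tags") if key not in row]
    if missing:
        raise ValueError(f"message is missing fields: {missing}")
    return QuoteMessage(
        text=row["text"],
        author=row["author"],
        tags=list(row["tags"]),
        sentiment=row.get("sentiment"),
        magnitude=row.get("magnitude"),
    )


payload = json.dumps(
    {"text": "Contents of a quote", "author": "Someone", "tags": ["a", "b"],
     "sentiment": None, "magnitude": None}
).encode("utf-8")
print(parse_quote_message(payload))
```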

In [47]:
%%writefile ./quote_pipeline/pipeline.py

import argparse
import typing

import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions
from apache_beam.runners import DataflowRunner

import google.auth
from google.cloud import language

import time

import json


class Quote(typing.NamedTuple):
    text : str
    author : str
    tags : typing.Sequence[str]
    sentiment : float
    magnitude : float

beam.coders.registry.register_coder(Quote, beam.coders.RowCoder)


def analyze_quote(element):
    row = json.loads(element.decode('utf-8'))
    
    client = language.LanguageServiceClient()
    
    doc = language.Document(content=row['text'],
                            type_=language.Document.Type.PLAIN_TEXT)
    
    response = client.analyze_sentiment(document=doc)

    row.update(
      sentiment=response.document_sentiment.score,
      magnitude=response.document_sentiment.magnitude
    )
    
    return row


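def main(args):
    # beam_args holds the flags left unparsed by parse_known_args() below.
    options = PipelineOptions(beam_args,
                              runner=args.runner,
                              streaming=True,
                              project=args.project,
                              region=args.region,
                              job_name='{}{}'.format('quotes-pipeline-', time.time_ns()),
                              staging_location=args.staginglocation,
                              temp_location=args.templocation,
                              # argparse consumes --requirements_file, so pass it on explicitly.
                              requirements_file=args.requirements_file,
                              save_main_session=True)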
    
    table_spec = bigquery.TableReference(projectId=args.project,
                                         datasetId=args.bqdataset,
                                         tableId=args.bqtable)
    QUOTES_TABLE_SCHEMA = {
        "fields": [
            {
                "name": "text",
                "type": "STRING"
            },
            {
                "name": "author",
                "type": "STRING"
            },
            {
                "name": "tags",
                "type": "STRING",
                "mode": "REPEATED"
            },
            {
                "name": "sentiment",
                "type": "FLOAT"
            },
            {
                "name": "magnitude",
                "type": "FLOAT"
            }
        ]
    }
    
    with beam.Pipeline(options=options) as p:
        (p  | "ReadPubSub" >> beam.io.ReadFromPubSub(args.pubsubtopic)
            | "AnalyzeQuote" >> beam.Map(analyze_quote)
            | "SaveToBigQuery" >> beam.io.WriteToBigQuery(
                                          table_spec,
                                          schema=QUOTES_TABLE_SCHEMA,
                                          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

        
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--runner', default='DataflowRunner')
    parser.add_argument('--project')
    parser.add_argument('--region')
    parser.add_argument('--bqdataset')
    parser.add_argument('--bqtable')
    parser.add_argument('--staginglocation')
    parser.add_argument('--templocation')
    parser.add_argument('--pubsubtopic')
    parser.add_argument('--requirements_file')
    
    args, beam_args = parser.parse_known_args()
    
    main(args)

Overwriting ./quote_pipeline/pipeline.py
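The pipeline stores two numbers per quote. In the Natural Language API, the sentiment score ranges from -1.0 (negative) to 1.0 (positive), and the magnitude is a non-negative measure of overall emotional strength. The sketch below shows one way to read the pair once the rows are in BigQuery. The function `label_sentiment` and both thresholds are arbitrary assumptions for illustration, not values defined by the API.

```python
def label_sentiment(score: float, magnitude: float,
                    score_threshold: float = 0.25,
                    mixed_magnitude: float = 1.0) -> str:
    """Map a (score, magnitude) pair to a coarse label using assumed thresholds."""
    if score >= score_threshold:
        return "positive"
    if score <= -score_threshold:
        return "negative"
    # A near-zero score with high magnitude suggests opposing emotions cancel out.
    return "mixed" if magnitude >= mixed_magnitude else "neutral"


print(label_sentiment(0.8, 0.9))
print(label_sentiment(0.0, 2.5))
```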


Create a Cloud Storage bucket for the Dataflow staging and temp locations

In [32]:
! gsutil mb -l US gs://quotes-pipeline-qwiklabs-gcp-04-2ad6a04dc593

Creating gs://quotes-pipeline-qwiklabs-gcp-04-2ad6a04dc593/...


In [53]:
! python quote_pipeline/pipeline.py \
   --project qwiklabs-gcp-04-2ad6a04dc593 \
   --region us-central1 \
   --bqdataset quotesds \
   --bqtable quotes \
   --staginglocation gs://quotes-pipeline-qwiklabs-gcp-04-2ad6a04dc593/staging \
   --templocation gs://quotes-pipeline-qwiklabs-gcp-04-2ad6a04dc593/temp \
   --pubsubtopic projects/qwiklabs-gcp-04-2ad6a04dc593/topics/quotes \
   --requirements_file quote_pipeline/requirements.txt 

  experiments = p.options.view_as(DebugOptions).experiments or []
Traceback (most recent call last):
  File "quote_pipeline/pipeline.py", line 117, in <module>
    main(args)
  File "quote_pipeline/pipeline.py", line 100, in main
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/pipeline.py", line 581, in __exit__
    self.result.wait_until_finish()
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1674, in wait_until_finish
    'Job did not reach to a terminal state after waiting indefinitely.')
AssertionError: Job did not reach to a terminal state after waiting indefinitely.
