## Project Setup

In [1]:
#pip install --upgrade google-cloud-pubsub

In [2]:
import os
import ast
import json

In [3]:
import random
import time, datetime

In [4]:
#pip install essential_generators

In [5]:
from essential_generators import DocumentGenerator

In [6]:
gen = DocumentGenerator()

In [7]:
from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema
from google.cloud import pubsub_v1

In [8]:
publisher = pubsub_v1.PublisherClient()

In [9]:
project_id = "curious-skyline-360213"

In [10]:
from google.cloud import bigquery
client = bigquery.Client(location="us-west1", project=project_id)

import pandas as pd

## Pub/Sub Schemas

### List existing schemas

In [520]:
project_path = f"projects/{project_id}"
schema_client = SchemaServiceClient()

for schema in schema_client.list_schemas(request={"parent": project_path}):
    print(schema)

name: "projects/curious-skyline-360213/schemas/CX_schema"
type_: AVRO

Listed schemas.


https://cloud.google.com/pubsub/docs/schemas#python

### Create new schema

In [457]:
schema_id = "CX_schema"

schema_dict ={
 "type" : "record",
 "name" : "Avro",
    
 "fields" : [
   {
     "name" : "Market",
     "type" : "string"
   },
   {
     "name" : "Ministry",
     "type" : "string"
   },
   {
     "name" : "Department",
     "type" : "int"
   },
   {
     "name" : "Type_Of_Visit",
     "type" : "string"
   },
   {
     "name" : "Survey_Completion_Date",
     "type" : "string"
   },
   {
     "name" : "Survey_Type",
     "type" : "string"
   },
   {
     "name" : "Survey_Project",
     "type" : "int"
   },
   {
     "name" : "Survey_ID",
     "type" : "int"
   },
   {
     "name" : "Gender",
     "type" : "string"
   },
   {
     "name" : "Age_Group",
     "type" : "string"
   },
   {
     "name" : "LTR_Facility",
     "type" : "int"
   },
   {
     "name" : "LTR_Doctor",
     "type" : "int"
   },
   {
     "name" : "Anything_Outstanding",
     "type" : "string"
   },
   {
     "name" : "Improve_Stay",
     "type" : "string"
   }

 ]
}

In [205]:
project_path = f"projects/{project_id}"

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=json.dumps(schema_dict))

try:
    result = schema_client.create_schema(
        request={"parent": project_path, "schema": schema, "schema_id": schema_id}
    )
    print(f"Created a schema using an Avro schema file:\n{result}")
except AlreadyExists:
    print(f"{schema_id} already exists.")

CX_schema already exists.


### Existing schema details

In [519]:
schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

try:
    result = schema_client.get_schema(request={"name": schema_path})
    print(f"Got a schema:\n{result}")
except NotFound:
    print(f"{schema_id} not found.")

Got a schema:
name: "projects/curious-skyline-360213/schemas/CX_schema"
type_: AVRO
definition: "{\"type\": \"record\", \"name\": \"Avro\", \"fields\": [{\"name\": \"Market\", \"type\": \"string\"}, {\"name\": \"Ministry\", \"type\": \"string\"}, {\"name\": \"Department\", \"type\": \"int\"}, {\"name\": \"Type_Of_Visit\", \"type\": \"string\"}, {\"name\": \"Survey_Completion_Date\", \"type\": \"string\"}, {\"name\": \"Survey_Type\", \"type\": \"string\"}, {\"name\": \"Survey_Project\", \"type\": \"int\"}, {\"name\": \"Survey_ID\", \"type\": \"int\"}, {\"name\": \"Gender\", \"type\": \"string\"}, {\"name\": \"Age_Group\", \"type\": \"string\"}, {\"name\": \"LTR_Facility\", \"type\": \"int\"}, {\"name\": \"LTR_Doctor\", \"type\": \"int\"}, {\"name\": \"Anything_Outstanding\", \"type\": \"string\"}, {\"name\": \"Improve_Stay\", \"type\": \"string\"}]}"



### Delete schema

In [None]:
# schema_client = SchemaServiceClient()
# schema_path = schema_client.schema_path(project_id, schema_id)

# try:
#     schema_client.delete_schema(request={"name": schema_path})
#     print(f"Deleted a schema:\n{schema_path}")
# except NotFound:
#     print(f"{schema_id} not found.")

## BQ Tables

### Create BQ table from schema with Pub/Sub metadata

Create a BQ table following schema crated in previous section.
Table contains all additional Pub/Sub metadata columns.

In [532]:
#table_name_ = 'Stream_test_schema_attributes'
table_name_ = "CX_Stream_hour_partition"

In [533]:
def get_column_names_from_schema(schema_example):
    
    columns = [column['name'] for column in schema_example['fields']]
    return columns

def get_type_from_schema(schema_example):
    types = [column['type'] for column in schema_example['fields']]
    types = [type_.replace("int", "INTEGER") for type_ in types] 
    return types


In [458]:
# df_ = pd.DataFrame(columns=get_column_names_from_schema(schema_dict) + 
#                    ['subscription_name', 'message_id', 'publish_time', 'data', 'attributes'])

In [534]:
bq_schema = []
for col_, type_ in zip(get_column_names_from_schema(schema_dict), get_type_from_schema(schema_dict)):
    bq_schema.append(bigquery.SchemaField(col_, type_))
    
    
#https://cloud.google.com/pubsub/docs/bigquery    
bq_schema.append(bigquery.SchemaField('subscription_name', 'string'))
bq_schema.append(bigquery.SchemaField('message_id', 'string'))
bq_schema.append(bigquery.SchemaField('publish_time', 'timestamp'))
bq_schema.append(bigquery.SchemaField('data', 'string'))
bq_schema.append(bigquery.SchemaField('attributes', 'string'))

In [535]:
bq_schema

[SchemaField('Market', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('Ministry', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('Department', 'INTEGER', 'NULLABLE', None, (), None),
 SchemaField('Type_Of_Visit', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('Survey_Completion_Date', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('Survey_Type', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('Survey_Project', 'INTEGER', 'NULLABLE', None, (), None),
 SchemaField('Survey_ID', 'INTEGER', 'NULLABLE', None, (), None),
 SchemaField('Gender', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('Age_Group', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('LTR_Facility', 'INTEGER', 'NULLABLE', None, (), None),
 SchemaField('LTR_Doctor', 'INTEGER', 'NULLABLE', None, (), None),
 SchemaField('Anything_Outstanding', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('Improve_Stay', 'STRING', 'NULLABLE', None, (), None),
 SchemaField('subscription_name', 'STRING', 'NULL

In [538]:
table_name = f'curious-skyline-360213.CX.{table_name_}'

table = bigquery.Table(table_name, schema=bq_schema)
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.HOUR,
    field="publish_time",  # name of column to use for partitioning
) 

table = client.create_table(table)

In [211]:
# table_name = f'curious-skyline-360213.CX.{table_name_}'

# # Configure the load job
# job_config = bigquery.LoadJobConfig(
#     schema=bq_schema
# )

# try:       
#     client.create_table(table_name, exists_ok=False)
    
#     client.load_table_from_dataframe(dataframe=df_, destination=table_name, job_config=job_config)
# except AlreadyExists:
#     print(f"{table_name} already exists.")

### Check dataset schema

In [541]:
query ="""
SELECT 
table_name, 
column_name, 
data_type

FROM `curious-skyline-360213.CX.INFORMATION_SCHEMA.COLUMNS`

WHERE table_name  = 'CX_data_conform_hourpartition'

"""
query_job = client.query(query)
df_project = query_job.to_dataframe()

In [542]:
df_project

Unnamed: 0,table_name,column_name,data_type
0,CX_data_conform_hourpartition,Market,STRING
1,CX_data_conform_hourpartition,Ministry,STRING
2,CX_data_conform_hourpartition,Department,INT64
3,CX_data_conform_hourpartition,Type_Of_Visit,STRING
4,CX_data_conform_hourpartition,Survey_Completion_Date,STRING
5,CX_data_conform_hourpartition,Survey_Type,STRING
6,CX_data_conform_hourpartition,Survey_Project,INT64
7,CX_data_conform_hourpartition,Survey_ID,INT64
8,CX_data_conform_hourpartition,Gender,STRING
9,CX_data_conform_hourpartition,Age_Group,STRING


## Pub/Sub BQ Subscription

## List all existing subscriptions

In [544]:
subscriber = pubsub_v1.SubscriberClient()
project_path = f"projects/{project_id}"

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    for subscription in subscriber.list_subscriptions(
        request={"project": project_path}
    ):
        print(subscription.name)

projects/curious-skyline-360213/subscriptions/CX_schema_json_BQ
projects/curious-skyline-360213/subscriptions/CX_schema_hourly


### List all subscriptions to given topic

In [547]:
topic_id = "CX_schema_json"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

response = publisher.list_topic_subscriptions(request={"topic": topic_path})
for subscription in response:
    print(subscription)

projects/curious-skyline-360213/subscriptions/CX_schema_hourly


### Create BQ Subscription with above schema and table

In [543]:
topic_id = "CX_schema_json"
subscription_id = "CX_schema_hourly"
bigquery_table_id = f"curious-skyline-360213.CX.{table_name_}"

subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

bigquery_config = pubsub_v1.types.BigQueryConfig(table=bigquery_table_id, write_metadata=True, use_topic_schema=True)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "bigquery_config": bigquery_config,
        }
    )

print(f"BigQuery subscription created: {subscription}.")
print(f"Table for subscription is: {bigquery_table_id}")

BigQuery subscription created: name: "projects/curious-skyline-360213/subscriptions/CX_schema_hourly"
topic: "projects/curious-skyline-360213/topics/CX_schema_json"
push_config {
}
ack_deadline_seconds: 10
message_retention_duration {
  seconds: 604800
}
expiration_policy {
  ttl {
    seconds: 2678400
  }
}
bigquery_config {
  table: "curious-skyline-360213.CX.CX_Stream_hour_partition"
  use_topic_schema: true
  write_metadata: true
  state: ACTIVE
}
state: ACTIVE
.
Table for subscription is: curious-skyline-360213.CX.CX_Stream_hour_partition


### Delete subscription

In [545]:
subscription_id = "CX_schema_json_BQ"


subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscriber.delete_subscription(request={"subscription": subscription_path})

print(f"Subscription deleted: {subscription_path}.")

Subscription deleted: projects/curious-skyline-360213/subscriptions/CX_schema_json_BQ.


## Fake data generator

In [445]:
df_project['column_name'][:-5].to_list()

['Market',
 'Ministry',
 'Department',
 'Type_Of_Visit',
 'Survey_Completion_Date',
 'Survey_Type',
 'Survey_Project',
 'Survey_ID',
 'Gender',
 'Age_Group',
 'LTR_Facility',
 'LTR_Doctor',
 'Anything_Outstanding',
 'Improve_Stay']

In [14]:
def generate_message():    
    #Enterprise Hierarchy
    def Market():
        return random.choice(['TN', 'TX', 'MI', 'FL'])
    def Ministry():
        return random.choice(['TNNAS', 'TXAUS', 'MIDET', 'FLJAC' ])
    def Department():
        return random.randint(10000, 30000)
    
    #Reporting hierarchy to be joined off separate source once data is processed

    #Tuchpoint metadata
    def Type_Of_Visit():
        return random.choice(['In Person', 'Virtual'])

    #Survey metadata
    def Survey_Completion_Date():
        return datetime.date.today().strftime('%m-%d-%Y')
    def Survey_Type(): 
        return random.choice(['OAS', 'AMG', 'HCAHPS', 'ED'])
    def Survey_Project(): 
        return random.randint(10000, 30000)
    def Survey_ID(): 
        return random.randint(10000, 30000)

    #Patient demographics
    def Gender():
        return random.choice(['Male', 'Female'])
    def Age_Group():
        return random.choice(['18-24', '25-29', '30-34', '35-39'])

    #Survey questions
    def Anything_Outstanding():
        return gen.gen_sentence()
    def Improve_Stay():
        return gen.gen_sentence()
    def LTR_Facility():
        return random.randint(1, 10)
    def LTR_Doctor():
        return random.randint(1, 10)

    return {"Market" : Market(),
            "Ministry": Ministry(),
            'Department': Department(),
            'Type_Of_Visit': Type_Of_Visit(),
             'Survey_Completion_Date': Survey_Completion_Date(),
             'Survey_Type': Survey_Type(),
             'Survey_Project': Survey_Project(),
             'Survey_ID': Survey_ID(),
             'Gender': Gender(),
             'Age_Group': Age_Group(),
             'LTR_Facility': LTR_Facility(),
             'LTR_Doctor': LTR_Doctor(),
             'Anything_Outstanding': Anything_Outstanding(),
             'Improve_Stay': Improve_Stay()            
           }

In [15]:
generate_message()

{'Market': 'FL',
 'Ministry': 'MIDET',
 'Department': 29648,
 'Type_Of_Visit': 'In Person',
 'Survey_Completion_Date': '08-26-2022',
 'Survey_Type': 'HCAHPS',
 'Survey_Project': 16157,
 'Survey_ID': 20226,
 'Gender': 'Male',
 'Age_Group': '30-34',
 'LTR_Facility': 10,
 'LTR_Doctor': 7,
 'Anything_Outstanding': "No restrictions Okinotorishima. Japan's",
 'Improve_Stay': 'Alexandrine and energy driving a crane motor. Lifting'}

## Publish single message to Pub/Sub - deprecated

Doing this in a round about way by means of DataFrame to display message in human readable format.  
Batch messaging skips the DataFrame preview step.

In [481]:
df_.loc[0] = generate_message()

In [482]:
df_

Unnamed: 0,Market,Ministry,Department,Type_Of_Visit,Survey_Completion_Date,Survey_Type,Survey_Project,Survey_ID,Gender,Age_Group,LTR_Facility,LTR_Doctor,Anything_Outstanding,Improve_Stay,subscription_name,message_id,publish_time,data,attributes
0,TN,TXAUS,17950,In Person,08-23-2022,HCAHPS,14714,24663,Male,30-34,5,9,Genre. DJs became unsustainable and he establi...,Burnings of Ertebølle middens,,,,,


In [483]:
payload = df_.iloc[0, :-5].to_json()
data = str.encode(payload)

In [484]:
topic_name = "CX_schema_json"
topic_path = f"projects/{project_id}/topics/{topic_name}"

publisher.publish(
    topic = topic_path,
    data = data)

<Future at 0x7f629f194650 state=pending>

## Publish lots of messages at random intervals

In [16]:
def simulate_stream(n_minutes, n_burst, topic_name):
    """
    Sends bursts of n_burst every 1-30 seconds over a period of specified n_minutes.
    """
    
    
    topic_path = f"projects/{project_id}/topics/{topic_name}"

    t_end = time.time() + 60 * n_minutes
    n_batches = 0
    while time.time() < t_end:
        #send a burst of n_burst messages
        for _ in range(n_burst):
            publisher.publish(
                topic = topic_path,
                data = str.encode(json.dumps(generate_message())))
        
        #wait for some time from 1 to 30 seconds
        time.sleep(random.randint(1, 30))
        n_batches += 1
    
    total_messages = n_batches*n_burst
    print(f'Published {total_messages} messages.')

In [17]:
%%time
simulate_stream(n_minutes=1, n_burst=20, topic_name="CX_schema_json")

Published 80 messages.
CPU times: user 68.7 ms, sys: 7.49 ms, total: 76.2 ms
Wall time: 1min 6s


### Query sink table subscription

In [508]:
query ="""
SELECT 
*

FROM `curious-skyline-360213.CX.Stream_test_schema_attributes`

"""
query_job = client.query(query,
    #location="us-west1",
)

df_data = query_job.to_dataframe()
df_data

Unnamed: 0,Market,Ministry,Department,Type_Of_Visit,Survey_Completion_Date,Survey_Type,Survey_Project,Survey_ID,Gender,Age_Group,LTR_Facility,LTR_Doctor,Anything_Outstanding,Improve_Stay,subscription_name,message_id,publish_time,data,attributes
0,TX,TNNAS,17218,In Person,08-23-2022,AMG,12996,14408,Male,25-29,9,10,"1688, noted address uniqueness. The size of th...","Only Marxist religious congregations. 100,960 ...",projects/348611359036/subscriptions/CX_schema_...,5425698921604647,2022-08-23 18:02:40.511000+00:00,,"{""googclient_schemaname"": ""projects/curious-sk..."
1,TX,TNNAS,17218,In Person,08-23-2022,AMG,12996,14408,Male,25-29,9,10,"1688, noted address uniqueness. The size of th...","Only Marxist religious congregations. 100,960 ...",projects/348611359036/subscriptions/CX_schema_...,5425732453511627,2022-08-23 18:03:08.073000+00:00,,"{""googclient_schemaencoding"": ""JSON"",""googclie..."
2,TN,TXAUS,17950,In Person,08-23-2022,HCAHPS,14714,24663,Male,30-34,5,9,Genre. DJs became unsustainable and he establi...,Burnings of Ertebølle middens,projects/348611359036/subscriptions/CX_schema_...,5426276393328167,2022-08-23 19:38:03.067000+00:00,,"{""googclient_schemaname"": ""projects/curious-sk..."
3,TN,TNNAS,15854,In Person,08-23-2022,HCAHPS,14125,26883,Female,25-29,1,3,By 55%. sustainable economic development,Social contexts. manpower contribution to the end,projects/348611359036/subscriptions/CX_schema_...,5426322517225864,2022-08-23 19:45:23.562000+00:00,,"{""googclient_schemaname"": ""projects/curious-sk..."
4,TX,TXAUS,23298,In Person,08-23-2022,OAS,13045,13832,Female,25-29,1,2,Delta provides production near,Million bits the 61 world's largest 10 km race...,projects/348611359036/subscriptions/CX_schema_...,5426359843136933,2022-08-23 19:55:27.578000+00:00,,"{""googclient_schemaencoding"": ""JSON"",""googclie..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
579,TX,MIDET,10264,Virtual,08-23-2022,HCAHPS,22440,28308,Male,30-34,7,6,By Europe tribes encountered,"(120.2 kg), This concept of",projects/348611359036/subscriptions/CX_schema_...,5426409448828666,2022-08-23 20:04:59.381000+00:00,,"{""googclient_schemaname"": ""projects/curious-sk..."
580,FL,MIDET,11315,In Person,08-23-2022,ED,17440,21678,Female,35-39,9,4,"Western France, short stay hotels are short da...",Democratic republic. middle-étage bases that,projects/348611359036/subscriptions/CX_schema_...,5426409448828668,2022-08-23 20:04:59.381000+00:00,,"{""googclient_schemaencoding"": ""JSON"",""googclie..."
581,MI,TNNAS,25706,Virtual,08-23-2022,OAS,28083,25199,Female,18-24,5,1,Itself. A exclusive private,Typically light altocumulus (Ac.,projects/348611359036/subscriptions/CX_schema_...,5426409448828669,2022-08-23 20:04:59.381000+00:00,,"{""googclient_schemaencoding"": ""JSON"",""googclie..."
582,TX,MIDET,23141,Virtual,08-23-2022,HCAHPS,25598,23165,Female,18-24,6,5,"Exporting, manufacturing, leisure trips!",All being such names as (translated from Frenc...,projects/348611359036/subscriptions/CX_schema_...,5426409448828667,2022-08-23 20:04:59.381000+00:00,,"{""googclient_schemaencoding"": ""JSON"",""googclie..."
