In [1]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME") and not os.getenv("VIRTUAL_ENV")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

! pip3 install --upgrade google-cloud-aiplatform $USER_FLAG -q

In [None]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [82]:
import os

PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  qwiklabs-gcp-01-df00ef1cf655


In [83]:
REGION = "[your-region]"  # @param {type:"string"}
if REGION == "[your-region]":
    REGION = "us-central1"

In [9]:
#!pip install google_cloud_pipeline_components

In [5]:
dataset_path = 'gs://cloud-ai-platform-60f6fd86-0e8f-40ce-bf1b-6c73847561e2/creditcard.csv'

In [74]:
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2 import dsl
from kfp.v2 import compiler
from kfp.components import OutputPath,InputPath
from kfp.v2.dsl import component
from kfp.v2.dsl import (
    Output,
    Input,
    Artifact,
    Model,
)

In [27]:
BUCKET_NAME = "gs://vaibucket"
PIPELINE_ROOT = "{}/pipeline_root".format(BUCKET_NAME)

In [28]:
PIPELINE_ROOT

'gs://vaibucket/pipeline_root'

In [118]:
#define all components here

@component
def get_input_data() -> str:
    # getting data from API, save to Cloud Storage
    # return GS URI
    gcs_batch_input_path = 'gs://cloud-ai-platform-60f6fd86-0e8f-40ce-bf1b-6c73847561e2/creditcard.csv'
    return gcs_batch_input_path



@component(
    base_image="python:3.9",
    packages_to_install=['pandas']
)
def feature_engineering(inputpath:str) -> str:
    '''take input file and perform feature engineering'''
    updatedinputpath = inputpath
    return updatedinputpath



@component(
    base_image="python:3.9",
    packages_to_install=['google-cloud-aiplatform==1.8.0']
)
def batch_prediction(inputgcsuri:str, outputbquri:str):
    import google.cloud.aiplatform as aip

    model_id = '75325342595678208'
    model = aip.Model(model_name=model_id, project='qwiklabs-gcp-01-df00ef1cf655', location='us-central1')
    model.batch_predict(
        job_display_name="prediction-123",
        gcs_source=inputgcsuri,
        instances_format="csv",
        bigquery_destination_prefix=outputbquri
    )
    


In [121]:
@dsl.pipeline(
    name="my-pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
        project_id: str = "qwiklabs-gcp-01-df00ef1cf655" ,
        region: str = 'us-central1',
        bucket: str = "gs://vaibucket",
        big_query_op: str = "bq://qwiklabs-gcp-01-df00ef1cf655.credit"
):
    import datetime
    ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    
    batch_input_data_op = get_input_data()  # this loads input data to GCS path
    feature_input = feature_engineering(batch_input_data_op.output)
    batch_prediction(feature_input.output, big_query_op)

    



In [122]:
pipeline_export_filepath = 'test-pipeline.json'
compiler.Compiler().compile(pipeline_func=pipeline,
                                package_path=pipeline_export_filepath)

In [123]:
#Preparining the pipeline to submit to VertexAI
import google.cloud.aiplatform as aip

job = aip.PipelineJob(
        display_name="scoring_pipeline_3",
        template_path=pipeline_export_filepath,
        pipeline_root=PIPELINE_ROOT,
        enable_caching = False
    )


In [124]:
#Submitting the Pipeline to VertexAI
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/367561912346/locations/us-central1/pipelineJobs/my-pipeline-20220926144613
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/367561912346/locations/us-central1/pipelineJobs/my-pipeline-20220926144613')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/my-pipeline-20220926144613?project=367561912346
PipelineJob projects/367561912346/locations/us-central1/pipelineJobs/my-pipeline-20220926144613 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/367561912346/locations/us-central1/pipelineJobs/my-pipeline-20220926144613 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/367561912346/locations/us-central1/pipelineJobs/my-pipeline-20220926144613 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/367561912346/locations/us-central1/pipelineJobs/my-pipeline-20220926144613

In [123]:
import google.cloud.aiplatform as aip

model_id = '75325342595678208'
model = aip.Model(model_name=model_id, project=PROJECT_ID, location='us-central1')

In [17]:
#batch_input_data_op = get_input_data()

In [124]:
response = model.batch_predict(
        job_display_name="prediction-123",
        bigquery_source="bq://qwiklabs-gcp-01-df00ef1cf655.credit.credit_data",
        instances_format="bigquery",
        bigquery_destination_prefix="bq://qwiklabs-gcp-01-df00ef1cf655.credit",
        machine_type = "n1-standard-2",
        sync = True
    )


Creating BatchPredictionJob
BatchPredictionJob created. Resource name: projects/367561912346/locations/us-central1/batchPredictionJobs/6921052687530196992
To use this BatchPredictionJob in another session:
bpj = aiplatform.BatchPredictionJob('projects/367561912346/locations/us-central1/batchPredictionJobs/6921052687530196992')
View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/6921052687530196992?project=367561912346
BatchPredictionJob projects/367561912346/locations/us-central1/batchPredictionJobs/6921052687530196992 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/367561912346/locations/us-central1/batchPredictionJobs/6921052687530196992 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/367561912346/locations/us-central1/batchPredictionJobs/6921052687530196992 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/367561912346/locations/us-central1/batchPredictionJobs/

RuntimeError: Job failed with:
code: 1
message: "CANCELED"


In [35]:
response.output_info["bigquery_output_dataset"]

bigquery_output_dataset: "bq://qwiklabs-gcp-01-df00ef1cf655.credit"
bigquery_output_table: "predictions_2022_09_29T04_23_10_521Z_274"

In [115]:
f = "bq://qwiklabs-gcp-01-df00ef1cf655.credit"

In [118]:
str(f[5:])

'qwiklabs-gcp-01-df00ef1cf655.credit'

In [107]:
response.output_info.bigquery_output_table

'predictions_2022_09_29T04_23_10_521Z_274'

In [None]:
qwiklabs-gcp-01-df00ef1cf655.credit.predictions_2022_09_26T06_44_33_719Z_504

In [37]:
from google_cloud_pipeline_components import aiplatform as gcc_aip

In [None]:
model.

In [41]:
from google.cloud import bigquery

In [42]:
client = bigquery.Client(location="us-central1", project=PROJECT_ID)
table_id = "qwiklabs-gcp-01-df00ef1cf655.credit.credit_data"

In [125]:
#data = client.query(query= "SELECT uid, class FROM qwiklabs-gcp-01-df00ef1cf655.credit.credit_data LIMIT 100")
data = client.query(query= "SELECT * FROM qwiklabs-gcp-01-df00ef1cf655.credit.predictions_2022_09_26T06_44_33_719Z_504 limit 100")


In [72]:
import json
records = [dict(row) for row in data]
json_obj = json.dumps(str(records))

ValueError: dictionary update sequence element #0 has length 1; 2 is required

In [126]:
t = data.to_dataframe()

In [127]:
t

Unnamed: 0,amount,class,predicted_class,uid,v1,v10,v11,v12,v13,v14,...,v26,v27,v28,v3,v4,v5,v6,v7,v8,v9
0,0,0,"{'classes': ['0', '1'], 'scores': [0.999757230...",A177669,-1.005771658,0.047577915,-0.861037283,-0.203845076,0.619057513,-0.247185755,...,2.470206274,-0.156485698,0.027599494,1.519595528,0.383493159,0.763643416,-0.120690716,0.80466248,-0.872072893,-1.700160777
1,0,0,"{'classes': ['0', '1'], 'scores': [0.999792456...",A173907,2.179643887,0.300387691,-1.275377159,-0.598664868,-1.229645119,1.029418718,...,0.709802537,-0.135570405,-0.091907826,-2.652050663,0.355470952,1.002058838,-1.265588764,0.895277546,-0.508510292,0.016497432
2,0,0,"{'classes': ['0', '1'], 'scores': [0.999568104...",A131262,-0.268992308,0.068239822,-1.594267805,0.202640185,1.105694241,-0.498228909,...,-0.171325885,0.042321875,0.025788406,1.694042213,1.88908876,0.595605415,0.40926808,0.629258239,0.069031047,-0.946909822
3,0,0,"{'classes': ['0', '1'], 'scores': [0.999524950...",A103391,0.243426752,0.184182984,-0.371880355,0.55608226,0.491201813,0.301633834,...,0.839887561,0.198165317,0.233434354,0.715695084,1.231665609,-0.187426818,-0.461335095,-0.457976376,-1.836685661,-0.286415
4,0,0,"{'classes': ['0', '1'], 'scores': [0.999669492...",A133537,-0.929699988,0.40138532,0.200009805,0.615586146,-0.217176607,-0.106266113,...,-0.634668702,0.062523387,0.343740508,1.549226878,1.693430329,1.038639374,-0.214544514,-0.032842794,0.248008535,-0.598264605
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,0,0,"{'classes': ['0', '1'], 'scores': [0.999491930...",A8387,-0.319472963,0.513002902,1.997996025,-1.773975124,2.755055753,1.699638236,...,-0.229380397,0.159103894,0.176801885,1.92644332,2.907521904,0.668914523,0.241443829,0.126259374,0.054475708,-0.495370519
96,0,0,"{'classes': ['0', '1'], 'scores': [0.998918592...",A191797,1.882226593,1.314816255,-1.823095547,-0.508250731,-1.138742965,-0.123171693,...,0.141150974,-0.005872978,-0.049902437,-0.165758336,4.056814331,-0.087563473,0.23657199,-0.255124921,0.096423964,-0.097107001
97,0,0,"{'classes': ['0', '1'], 'scores': [0.999618649...",A161316,-2.956092749,1.39514984,-2.015018564,0.154281451,0.115434202,-0.344799675,...,-0.238724867,0.084017203,0.960471226,0.599051953,4.228797707,4.343250389,-2.446533672,-2.009515971,0.205327923,-0.226057836
98,0,0,"{'classes': ['0', '1'], 'scores': [0.999858975...",A202978,2.079117597,0.034943081,0.676726673,1.464482311,0.802265884,0.42774779,...,-0.070882688,-0.048387205,-0.076541699,-1.564499903,-0.080450641,0.378175508,-0.805268615,0.323178799,-0.295062712,0.311881447


In [128]:
d_publish = str(t.T.to_json())

In [100]:
#d_publish

In [13]:
new_df = t.drop(columns=['uid', 'class'])

In [52]:
ip_data = new_df.head(10)

In [136]:
#client.load_table_from_dataframe(dataframe= df, destination=table_id)

LoadJob<project=qwiklabs-gcp-01-df00ef1cf655, location=us-central1, id=46b86cc0-4b61-4bb4-b01d-074487a9c519>

In [139]:
#ta = client.list_tables("qwiklabs-gcp-01-df00ef1cf655.credit")

In [None]:
    # gcc_aip.ModelBatchPredictOp(
    #                             project = project_id,
    #                             job_display_name = "scoring_test_model_" + ts,
    #                             location = 'us-central1',
    #                             model = loaded_model.output,
    #                             instances_format="csv",
    #                             gcs_source_uris=dataset_path,
    #                             bigquery_destination_output_uri = big_query_op,
    #                             ).after(batch_input_data_op)



In [5]:
endpoint = aip.Endpoint(endpoint_name = '3010660748943687680', project=PROJECT_ID, location='us-central1')

In [53]:
ip_data = ip_data.astype(str)

In [54]:
d = ip_data.T.to_dict().values()

In [56]:
res = endpoint.predict(d)

In [57]:
for prediction_ in res.predictions:
        print(prediction_)

{'scores': [0.9995893836021423, 0.0004106451524421573], 'classes': ['0', '1']}
{'classes': ['0', '1'], 'scores': [0.9997668862342834, 0.0002330496645299718]}
{'scores': [0.9997137188911438, 0.0002863074187189341], 'classes': ['0', '1']}
{'classes': ['0', '1'], 'scores': [0.9996871948242188, 0.0003127176314592361]}
{'scores': [0.9994317293167114, 0.0005681620095856488], 'classes': ['0', '1']}
{'scores': [0.9998094439506531, 0.0001905228564282879], 'classes': ['0', '1']}
{'scores': [0.9996927380561829, 0.0003072416293434799], 'classes': ['0', '1']}
{'scores': [0.9992913603782654, 0.0007085721590556204], 'classes': ['0', '1']}
{'classes': ['0', '1'], 'scores': [0.9993150234222412, 0.000685076869558543]}
{'classes': ['0', '1'], 'scores': [0.9992673397064209, 0.0007326480117626488]}


In [1]:
from google_cloud_pipeline_components.experimental.dataproc import \
        DataprocPySparkBatchOp

In [None]:
DataprocPySparkBatchOp()

In [129]:
from google.cloud import pubsub_v1

# TODO(developer)
project_id = PROJECT_ID
topic_id = "credit"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    #data_str = f"Message number {n}"
    data_str = d_publish
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # Add two attributes, origin and username, to the message
    future = publisher.publish(
        topic_path, data, origin="python-sample", username="gcp"
    )
    print(future.result())

print(f"Published messages with custom attributes to {topic_path}.")

5783237892996646
5783213236451640
5783237347397717
5783212452112260
5783248283712547
5783207552133286
5783235593549220
5783213791725602
5783192622782321
Published messages with custom attributes to projects/qwiklabs-gcp-01-df00ef1cf655/topics/credit.
