In [None]:
# set params
P = ! gcloud config list --format 'value(core.project)'
PROJECT_ID = P[0]
REGION = "us-central1"
SERVICE_ACCOUNT = f"sa-vertex-pipelines@{PROJECT_ID}.iam.gserviceaccount.com"
NETWORK = "vpc-adam-default"

USE_CASE = "dataflow-demo"

# GCS source inputs
GCS_BUCKET_NAME = f"bkt-{REGION}-{USE_CASE}"
GCS_BUCKET_PATH = f"gs://{GCS_BUCKET_NAME}"
GCS_BUCKET_PATH_DATA = f"{GCS_BUCKET_PATH}/data/"
GCS_BUCKET_PATH_CONFIGS = f"{GCS_BUCKET_PATH}/configs/"
GCS_BUCKET_PATH_TMP = f"{GCS_BUCKET_PATH}/tmp"
GCS_BUCKET_PATH_STAGING = f"{GCS_BUCKET_PATH}/staging"

# dataflow job inputs
UDF_FILENAME = "user_defined_function.js"
BQ_SCHEMA_FILENAME = "bq_schema.json"

# BQ destination inputs
BQ_DATASET = f"ds_{REGION.replace('-','')}"
BQ_TABLE = f"tb_{USE_CASE.replace('-','')}"
BQ_DESTINATION = f"{BQ_DATASET}.{BQ_TABLE}"

In [None]:
# create bucket
! gsutil mb -p {PROJECT_ID} -c standard -l {REGION} {GCS_BUCKET_PATH}
! gsutil ls -L -b {GCS_BUCKET_PATH}

In [None]:
# create BQ table
! bq rm --dataset --recursive=true --force=true {PROJECT_ID}:{BQ_DATASET}
! bq mk --dataset --location={REGION} {PROJECT_ID}:{BQ_DATASET}
####  ensure the schema named here matches the file you create below
! bq mk --table {PROJECT_ID}:{BQ_DATASET}.{BQ_TABLE} ID:INTEGER,data:INTEGER

In [None]:
#####################################################################
#
# generate example JSON files
#
#####################################################################

In [None]:
# generate some JSON files
n_files =  10
n_rows_per_file = 10

# create and copy files to GCS
num_files = [i for i in range(1, n_files)]
file_pattern = f"file_*.json"
for num in num_files:
    myfile = file_pattern.replace("*",str(num))
    with open(myfile, 'w') as g:
        for idx, i in enumerate(range(1, n_rows_per_file)):
            k = {"ID" : idx, "data" : i}
            g.write(json.dumps(k)+"\n")
    
    # write file to GCS bucket            
    GCS_FILE = GCS_BUCKET_PATH_DATA + myfile
    ! gsutil cp {myfile} {GCS_FILE}

# view a file
! head file_1.json

In [None]:
#####################################################################
#
# write configs for dataflow job
#
#####################################################################

In [None]:
#-------------------
# bq_schema.json
#-------------------

In [None]:
%%writefile {BQ_SCHEMA_FILENAME}

{
    "BigQuery Schema": [
        {
            "name": "ID",
            "type": "INTEGER",
            "mode": "NULLABLE"
        },
        {
            "name": "data",
            "type": "INTEGER",
            "mode": "NULLABLE"
        }
    ]
}

In [None]:
# copy file to GCS
! gsutil cp {BQ_SCHEMA_FILENAME} {GCS_BUCKET_PATH_CONFIGS + BQ_SCHEMA_FILENAME}

In [None]:
#-------------------
# user_defined_function.js
#-------------------

In [None]:
%%writefile {UDF_FILENAME}

/**
 * JavaScript UDF for the Dataflow template (wired up via
 * javascriptTextTransformFunctionName=transform).
 * Takes one line of input text holding a JSON object and returns a JSON
 * string containing only its `ID` and `data` fields; other fields are dropped.
 */
function transform(line) {
    var record = JSON.parse(line);
    // object literal keeps the same key order as before: ID, then data
    return JSON.stringify({
        ID: record.ID,
        data: record.data
    });
}

In [None]:
# copy file to GCS
! gsutil cp {UDF_FILENAME} {GCS_BUCKET_PATH_CONFIGS + UDF_FILENAME}

In [None]:
#####################################################################
#
# create dataflow job
#
#####################################################################

In [None]:
# Dataflow template parameters as an ordered name -> value mapping.
template_params = {
    "javascriptTextTransformGcsPath": GCS_BUCKET_PATH_CONFIGS + UDF_FILENAME,
    "JSONPath": GCS_BUCKET_PATH_CONFIGS + BQ_SCHEMA_FILENAME,
    "javascriptTextTransformFunctionName": "transform",
    "outputTable": f"{PROJECT_ID}:{BQ_DATASET}.{BQ_TABLE}",
    "inputFilePattern": GCS_BUCKET_PATH_DATA + file_pattern,
    "bigQueryLoadingTemporaryDirectory": GCS_BUCKET_PATH_TMP,
}
params = [f"{name}={value}" for name, value in template_params.items()]

# the launch cell passes these to --parameters as one comma-separated string
p = ",".join(params)
params

In [None]:
! gcloud dataflow jobs run df-13 \
  --gcs-location "gs://dataflow-templates-us-central1/latest/GCS_Text_to_BigQuery" \
  --region "us-central1" \
  --service-account-email "sa-vertex-pipelines@ap-alto-ml-1000.iam.gserviceaccount.com" \
  --staging-location "gs://bkt-us-central1-dataflow-demo/staging" \
  --network "vpc-adam-default" \
  --parameters {p}

In [None]:
#####################################################################
#
# view data in BigQuery
#
#####################################################################

In [None]:
from google.cloud import bigquery

In [None]:
# Preview a few rows of the destination table to check that the Dataflow job loaded data.
# BQ_DESTINATION is "dataset.table"; the project is taken from the client's project.
client = bigquery.Client(project=PROJECT_ID)
query = "SELECT * FROM `" + BQ_DESTINATION + "` LIMIT 5"
query_job = client.query(query)
query_job.to_dataframe()