In [None]:
import pandas as pd
import numpy as np
import google.auth
from google.auth import compute_engine
from google.cloud import bigquery
from google.colab import auth

# Log this Colab session in to Google Cloud (you are prompted the first time),
# then build a BigQuery client scoped to the project under analysis.
auth.authenticate_user()
project_id = 'andromeda-data-nonprod'
client = bigquery.Client(project=project_id)

In [None]:
# List every dataset in the client's project, fetch the full metadata of each
# table/view it contains, and report per-dataset and overall counts.
# Result: `views` holds one full Table object per readable table/view.
print("Collecting data on all views in " + project_id + ". This may take some time...")
views = []
dataset_count = 0
for dataset in client.list_datasets():
  print("\nCollecting views for: " + dataset.dataset_id)
  # list_tables only yields partial listings; get_table below fetches the
  # full resource (e.g. view_query, which later cells read).
  dataset_views = list(client.list_tables(dataset=dataset.reference))
  dataset_count += 1
  for listed_table in dataset_views:
    try:
      views.append(client.get_table(listed_table.reference))
    except Exception as err:
      # Best effort: report objects whose metadata cannot be read and move on.
      print(err)

  print(dataset.dataset_id + " contains {:d} views".format(len(dataset_views)))

print("\n" + project_id + " contains {:d} dataset".format(dataset_count))
print("\nDone!")

In [None]:
# Show one collected object as an example of what the analysis works with.
# Guarded so an empty project does not stop a Run-All with an IndexError.
if views:
  example_view = views[0]
  print("Example table/view and query:")
  print("\nView: {}".format(example_view.full_table_id))
  print("\nView Query:\n{}".format(example_view.view_query))
  print("\nType: {}".format(example_view.table_type))
else:
  print("No tables/views were collected, so there is no example to show.")

Example table/view and query:

View: data-engineering-prod:landing_andromeda.energy_contracts_consumption_charge_generated_v2

View Query:
#standardSQL
SELECT
  `kafkaData`,
  `unionRecord`,
  `_PARTITIONTIME` `PARTITIONTIME`
FROM
  `data-engineering-prod.auto_capture_v2.energy_contracts_consumption_charge_generated_v2`

Type: VIEW


In [None]:
import datetime
import pytz
import google.cloud.bigquery.job as job
from collections import Counter
try:
  from __builtin__ import any as b_any  # Python 2
except ImportError:
  # Python 3 renamed __builtin__ to builtins.
  from builtins import any as b_any

### JOBS ANALYSIS ###
# Performs analysis on which objects are being used, how often and by whom.
#
# For each collected table/view, search the exported BigQuery data-access
# audit logs for completed, non-dry-run query jobs whose SQL text contains
# "<dataset_id>.<table_id>", and build:
#   jobs_data  -> one row per view:
#                 [full_table_id, leading token of the dataset id, job count,
#                  most common user, any ovoenergy.com user?, latest createTime]
#   error_data -> one row per job that reported an error message or whose
#                 query contains ORDER BY:
#                 [full_table_id, location, error message, query, has ORDER BY]

project_id = 'andromeda-data-nonprod' # this will need to be updated to 'data-engineering-prod'
print("Starting jobs processing...")

# The view name is bound as the @view_name query parameter instead of being
# formatted into the SQL, and matched with STRPOS (a literal substring search),
# so the "." between dataset and table is not treated as a regex wildcard.
# NOTE(review): substring matching means "ds.t" also matches "ds.t_v2".
# NOTE(review): the error message is read from jobInsertResponse, like the
# DONE-state filter; the request's jobStatus is not expected to carry the
# job's outcome - confirm against the log export.
JOBS_FOR_VIEW_SQL = """
    SELECT
      protopayload_auditlog.authenticationInfo.principalEmail AS email,
      protopayload_auditlog.servicedata_v1_bigquery.jobInsertResponse.resource.jobStatistics.createTime AS time,
      protopayload_auditlog.servicedata_v1_bigquery.jobInsertRequest.resource.jobName.location AS location,
      protopayload_auditlog.servicedata_v1_bigquery.jobInsertRequest.resource.jobConfiguration.query.query AS query,
      protopayload_auditlog.servicedata_v1_bigquery.jobInsertResponse.resource.jobStatus.error.message AS error_message
    FROM `andromeda-data-nonprod.test_log_export.cloudaudit_googleapis_com_data_access`
    WHERE STRPOS(protopayload_auditlog.servicedata_v1_bigquery.jobInsertRequest.resource.jobConfiguration.query.query, @view_name) > 0
      AND protopayload_auditlog.servicedata_v1_bigquery.jobInsertRequest.resource.jobConfiguration.dryRun IS NULL
      AND protopayload_auditlog.servicedata_v1_bigquery.jobInsertResponse.resource.jobStatus.state = "DONE"
"""


def _fetch_view_jobs(view):
  """Return the audit-log rows for query jobs whose SQL mentions `view`.

  Each row exposes the keys email, time, location, query and error_message.
  The result is materialised into a list once; iterating a QueryJob again
  would go back to the API for its results.
  """
  job_config = bigquery.QueryJobConfig(query_parameters=[
      bigquery.ScalarQueryParameter(
          "view_name", "STRING", "{0}.{1}".format(view.dataset_id, view.table_id)),
  ])
  return list(client.query(JOBS_FOR_VIEW_SQL, job_config=job_config).result())


def _summarise_view_jobs(view, rows):
  """Build the jobs_data row for `view` from its audit-log `rows`."""
  users = [row["email"] for row in rows]
  user_counts = Counter(users)
  most_common_user = user_counts.most_common(1)[0][0] if users else "No user data"
  create_times = [row["time"] for row in rows if row["time"] is not None]
  return [view.full_table_id,
          view.dataset_id.split('_')[0],
          len(rows),
          most_common_user,
          # True if any caller's email contains "ovoenergy.com". Null emails
          # are skipped rather than raising TypeError on the `in` check.
          b_any("ovoenergy.com" in user for user in user_counts if user),
          max(create_times) if create_times else None]


def _find_problem_jobs(view, rows):
  """Return error_data rows for jobs that errored or whose query uses ORDER BY.

  ORDER BY is flagged because it requires a single node. The query text is
  upper-cased so lower-case "order by" is caught too. The job location is
  recorded as well (multi-region EU/US applies to nearly every query).
  """
  problems = []
  for row in rows:
    query_text = row["query"]
    error_message = row["error_message"]
    # Explicit boolean operators: `|`/`&` bind tighter than `!=`/`in`.
    has_order_by = query_text is not None and "ORDER BY" in query_text.upper()
    if error_message is not None or has_order_by:
      problems.append([view.full_table_id,
                       row["location"],
                       error_message,
                       query_text,
                       has_order_by])
  return problems


jobs_data = []
error_data = []
cost_data = []  # NOTE(review): not populated anywhere in this notebook

for view in views:
  rows = _fetch_view_jobs(view)
  jobs_data.append(_summarise_view_jobs(view, rows))
  for error in _find_problem_jobs(view, rows):
    error_data.append(error)
    print(error)

print("Jobs processing finished.")

In [None]:
# Write jobs_data as newline-delimited JSON (one object per line), the format
# BigQuery load jobs require, then display the frame.
json_file = "data_engineering_prod_bq_jobs_analysis.json"
columns = ["full_table_id","layer","queries","most_common_user","service_account","last_query"]
df = pd.DataFrame(jobs_data, columns=columns)
# orient="records" + lines=True emits one JSON object per row; the `with`
# block closes the file even if serialisation fails.
with open(json_file, "w") as out_file:
  df.to_json(out_file, orient="records", lines=True)
df

In [None]:
from google.cloud import storage

# Upload the JSON file written above to the analysis bucket so BigQuery can
# load it from GCS. A separate name keeps the BigQuery `client` intact.
storage_client = storage.Client(project=project_id)
bucket = storage_client.get_bucket("data-engineering-prod-bq-analysis")
blob = bucket.blob(json_file)
# Upload the file the blob is named after, rather than a second hard-coded
# copy of the file name that could drift out of sync.
blob.upload_from_filename(json_file)

In [None]:
# write data to data-integration-prod
# The load job replaces the table's existing contents (see write_disposition).
project_id = 'data-integration-prod'
dataset_id = 'data_engineering_prod_bq_analysis'
table_id = 'data_engineering_prod_bq_jobs_analysis'
full_table_id = project_id + '.' + dataset_id + '.' + table_id
client = bigquery.Client(project=project_id)

job_config = bigquery.LoadJobConfig(
    autodetect=True,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    # Overwrite within the load job instead of deleting the table first, so
    # a failed load leaves the previous table in place.
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)
uri = "gs://data-engineering-prod-bq-analysis/data_engineering_prod_bq_jobs_analysis.json"
load_job = client.load_table_from_uri(
    uri, full_table_id, job_config=job_config)  # Make an API request.
load_job.result()  # blocks until the load finishes; raises if it failed

# check the number of rows loaded into the table matches the rows written
destination_table = client.get_table(full_table_id)
print("Loaded {} rows.".format(destination_table.num_rows))
assert len(jobs_data) == destination_table.num_rows

In [None]:
# Write error_data as newline-delimited JSON (one object per line), the format
# BigQuery load jobs require, then display the frame.
json_file = "data_engineering_prod_bq_jobs_cost_analysis.json"
columns = ["table_id","location","error_message","query","order_by"]
df = pd.DataFrame(error_data, columns=columns)
# orient="records" + lines=True emits one JSON object per row; the `with`
# block closes the file even if serialisation fails.
with open(json_file, "w") as out_file:
  df.to_json(out_file, orient="records", lines=True)
df

In [None]:
from google.cloud import storage

# Upload the errors JSON file written above to the analysis bucket so
# BigQuery can load it from GCS. A separate name keeps the BigQuery `client`
# intact.
storage_client = storage.Client(project=project_id)
bucket = storage_client.get_bucket("data-engineering-prod-bq-analysis")
blob = bucket.blob(json_file)
# Upload the file the blob is named after, rather than a second hard-coded
# copy of the file name that could drift out of sync.
blob.upload_from_filename(json_file)

In [None]:
# write data to data-integration-prod
# The load job replaces the table's existing contents (see write_disposition).
project_id = 'data-integration-prod'
dataset_id = 'data_engineering_prod_bq_analysis'
table_id = 'data_engineering_prod_bq_jobs_cost_analysis'
full_table_id = project_id + '.' + dataset_id + '.' + table_id
client = bigquery.Client(project=project_id)

job_config = bigquery.LoadJobConfig(
    autodetect=True,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    # Overwrite within the load job instead of deleting the table first, so
    # a failed load leaves the previous table in place.
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)
uri = "gs://data-engineering-prod-bq-analysis/data_engineering_prod_bq_jobs_cost_analysis.json"
load_job = client.load_table_from_uri(
    uri, full_table_id, job_config=job_config)  # Make an API request.
load_job.result()  # blocks until the load finishes; raises if it failed

# check the number of rows loaded into the table matches the rows written;
# error_data is the list the cost-analysis JSON file was built from.
destination_table = client.get_table(full_table_id)
print("Loaded {} rows.".format(destination_table.num_rows))
assert len(error_data) == destination_table.num_rows