# BigQuery Table Data Audit

---



### Before you begin


1.   Use the [Cloud Resource Manager](https://console.cloud.google.com/cloud-resource-manager) to check if you have access to the Cloud Platform project that has access to the table you want to audit.
2.   Make sure you have query access to the table you want to audit [Cloud Console](https://console.cloud.google.com).
3.   [Enable BigQuery](https://console.cloud.google.com/flows/enableapi?apiid=bigquery) APIs for the project.
4. Provide the variables to apply:

In [0]:
# BigQuery table to audit:
project_id = '<your_project_id>'
dataset_id = '<your_dataset_id>'
table_id = '<your_table_id>'

# Use exact count (uses more computational resources)
exact_count = True

# Calculate field top-n (bigger n uses more computational resources)
n=3

table_ref = '{}.{}.{}'.format(project_id, dataset_id, table_id)
print('Table reference to audit: \'{}.{}.{}\''.format(project_id, dataset_id, table_id))

### Provide your credentials to the runtime

In [0]:
from google.colab import auth
auth.authenticate_user()
print('Authenticated')

### Retrieve BQ table schema

In [0]:
from google.cloud import bigquery
import pandas as pd

client = bigquery.Client(project=project_id)

def get_table_schema(project_id, dataset_id, table_id):
  print('Retrieving table schema for BQ table: \'{}.{}.{}\''.format(project_id, dataset_id, table_id))
  table_schema = client.query('''
    SELECT column_name, data_type
    FROM `{}`.{}.INFORMATION_SCHEMA.COLUMNS
    WHERE table_name="{}"'''.format(project_id, dataset_id, table_id)).to_dataframe()
  return table_schema

def parse_field(field_name, data_type, nesting=[]):
  if data_type.startswith('ARRAY<') or data_type.startswith('STRUCT<'):
    if not nesting:
      nesting = [field_name]
    else:
      nesting[-1] += '.' + field_name
    if data_type.startswith('ARRAY<STRUCT<'):
      nesting.append(field_name)
      return parse_struct(data_type[13:-2], nesting)
    elif data_type.startswith('STRUCT<'):    
      return parse_struct(data_type[7:-1], nesting)
    else: # data_type.startswith('ARRAY<'):
      return [{'field_name': field_name, 'data_type': data_type[6:-1], 'nesting': nesting}]
  else: # primitive_type
    if nesting:
      field_name = nesting.pop() + '.' + field_name
    return [{'field_name': field_name, 'data_type': data_type, 'nesting': nesting}]
      

def parse_struct(struct, nesting):
  if struct.strip():
    field_name = struct.strip().split(' ')[0]
    struct = struct.strip()[len(field_name)+1:]
    data_type = ''
    depth = 0
    for char in iter(struct):
      if char == '<': 
        depth += 1
      elif char == '>': 
        depth -= 1
      if depth > 0: 
        pass
      elif char == ',': 
        break
      data_type += char
    remaining_struct = struct[len(data_type)+2:]
    return parse_field(field_name, data_type, nesting.copy()) + parse_struct(remaining_struct, nesting)
  else:
    return []

def get_unnest_statement(nesting):
  result = []
  if nesting:
    for nest in nesting:
      result.append(', UNNEST({}) {}'.format(nest, nest.split('.')[-1]))
  return ''.join(result)

def count_rows(table_ref, nesting):
  query = 'SELECT count(*) FROM `{}`'.format(table_ref) + get_unnest_statement(nesting)
  query_job = client.query(query)
  return list(query_job.result())[0].values()[0]


table_schema = get_table_schema(project_id, dataset_id, table_id)

fields = []
for i, col in table_schema.iterrows(): 
  print('- checking column: \'{}\' [{}/{}]'.format(col['column_name'], i+1, len(table_schema)))
  fields += parse_field(col['column_name'], col['data_type'])

row_counts = {}
print('\nCounting rows per nesting:')
for field in fields:
  key = get_unnest_statement(field['nesting'])
  if key not in row_counts:
    nest = [] if not field['nesting'] else field['nesting'][-1]
    print('- counting rows for nesting: \'{}\' '.format(nest), end = '')
    count = count_rows(table_ref, field['nesting'])
    row_counts[key] = count
    print(count)
  else:
    count = row_counts[key]
  field['row_count'] = count
  field['table_ref'] = table_ref

df = pd.DataFrame(fields)
df

### Perform data audit on table schema fields

In [0]:
def get_cardinality(field):
  return get_approx_cardinality(field) if not exact_count and field['data_type'] in ('INT64', 'NUMERIC', 'STRING', 'BYTES') else get_exact_cardinality(field)

def get_approx_cardinality(field):
  print('.', end = '')
  query = 'SELECT HLL_COUNT.MERGE(hll_count) approx FROM ( SELECT HLL_COUNT.INIT({}) hll_count FROM `{}`{})'.format(field['field_name'], field['table_ref'], get_unnest_statement(field['nesting']))
  query_job = client.query(query)
  return list(query_job.result())[0].values()[0]

def get_exact_cardinality(field):
  print('.', end = '')
  query = 'SELECT COUNT(DISTINCT({})) exact FROM `{}`{}'''.format(field['field_name'], field['table_ref'], get_unnest_statement(field['nesting']))
  query_job = client.query(query)
  return list(query_job.result())[0].values()[0]

def count_missing_values(field):
  print('.', end = '')
  where_clause = '{} IS NULL'.format(field['field_name']) + (' OR TRIM({}) = ""'.format(field['field_name']) if field['data_type'] in ('STRING') else '')
  query = 'SELECT SUM(1) num_missing FROM `{}`{} WHERE {}'.format(field['table_ref'], get_unnest_statement(field['nesting']), where_clause)
  query_job = client.query(query)
  result = list(query_job.result())[0].values()[0]
  return result if result else 0

def get_statistics(field):
  print('.', end = '')
  field_name = field['field_name']
  avg_expr = 'AVG({})'.format(field_name) if field['data_type'] in ('INT64', 'NUMERIC', 'FLOAT64') else 'CAST(\'NaN\' AS FLOAT64)'
  std_expr = 'STDDEV({})'.format(field_name) if field['data_type'] in ('FLOAT64') else 'CAST(\'NaN\' AS FLOAT64)'
  query = 'SELECT MIN({}) min, MAX({}) max, {} avg, {} std FROM `{}`{}'''.format(field_name, field_name, avg_expr, std_expr, field['table_ref'], get_unnest_statement(field['nesting']))
  query_job = client.query(query)
  return list(query_job.result())[0].values()

def get_top_n(field, n):
  print('.', end = '')
  where_clause = '{} IS NOT NULL'.format(field['field_name']) + (' AND TRIM({}) != ""'.format(field['field_name']) if field['data_type'] in ('STRING') else '')
  query = '''
    SELECT ARRAY_AGG(CONCAT("'", CAST(val AS STRING), "' [", CAST(cnt AS STRING), "]") ORDER BY cnt DESC LIMIT {}) AS top_{}
    FROM (SELECT {} AS val, count(*) AS cnt FROM `{}`{}
    WHERE {} GROUP BY 1)'''.format(n, n, field['field_name'], field['table_ref'], get_unnest_statement(field['nesting']), where_clause)
  query_job = client.query(query)
  return ', '.join(list(query_job.result())[0].values()[0])


print('Performing data audit on BQ table: \'{}\''.format(table_ref))
for i, field in enumerate(fields):
  print('- auditing field: \'{}\' [{}/{}] '.format(field['field_name'], i+1, len(fields)), end = '')
  field['cardinality'] = get_cardinality(field)
  field['num_missing_values'] = count_missing_values(field)
  min_, max_, avg, stddev = get_statistics(field)
  field['min'] = min_
  field['max'] = max_
  field['avg'] = avg
  field['stddev'] = stddev
  field['top-{}'.format(n)] = get_top_n(field, n)
  print('')
 
print('\ndone\n')
df = pd.DataFrame(fields)
del df['nesting']
df

### Prepare final data audit DataFrame
Here you can add some more derived metrics, like precentages etc.

In [0]:
data_audit_df = pd.DataFrame(fields)
# remove unneeded dataframe columns
del data_audit_df['nesting']
# add derived metrics
data_audit_df['perc_missing'] = 100*data_audit_df['num_missing_values']/data_audit_df['row_count']

### Export to Google Spreadseet

In [0]:
from oauth2client.client import GoogleCredentials
import gspread
import gspread_dataframe as gd
import datetime

gc = gspread.authorize(GoogleCredentials.get_application_default())

now = datetime.datetime.now()
date = now.strftime("%Y%m%d")
spreadhseet_name = 'Data_Audit_{}_{}'.format(table_id, date)

# create new spreadsheet
spreadhseet = gc.create(spreadhseet_name)

# open sheet1 of new spreadsheet and add data audit data
worksheet = gc.open(spreadhseet_name).sheet1
gd.set_with_dataframe(worksheet, data_audit_df)

print('Created \'{}\' in your Google Drive home folder.\n\n{}'.format(spreadhseet_name, spreadhseet.fetch_sheet_metadata()['spreadsheetUrl']))