# Imports

In [4]:
from google.colab import auth
from google.cloud import bigquery
from google.colab import data_table
from datetime import datetime, timedelta
from google.oauth2 import service_account
from google.colab import userdata

import pandas_gbq
import pandas as pd
import json

# Authenticate & Prepare

In [6]:
#Authenticate
# Parse the service-account key JSON straight into credentials so the raw secret (which
# contains a private key) is not left lying around as a notebook global.
credentials = service_account.Credentials.from_service_account_info(
    json.loads(userdata.get('SA_SECRET_KEY')))
# Make pandas_gbq (used for DataFrame.to_gbq below) use the same identity as the BigQuery client.
pandas_gbq.context.credentials = credentials

In [7]:
# GCP project and BigQuery location for every query issued through `client`.
project_id = 'bigtimestudios'
location = 'US'
client = bigquery.Client(credentials=credentials, project=project_id, location=location)

# Render DataFrames with Colab's interactive table formatter.
data_table.enable_dataframe_formatter()

In [8]:
#BIG_QUERY_DATASETS AND TABLES
# Dataset that holds the views config, the ACK keys table and the generated views.
DATASET = 'design_events_views'

VIEWS_CONFIG_TABLE = 'views_config'
VIEWS_CONFIG_REF = '.'.join((DATASET, VIEWS_CONFIG_TABLE))

ACK_KEYS_TABLE = 'design_events_ack_keys'
ACK_KEYS_TABLE_REF = '.'.join((DATASET, ACK_KEYS_TABLE))

# Source table (dataset.table) scanned for custom_fields keys and selected from by every view.
DESIGN_EVENTS_TABLE_REF = 'game_analytics.design_events'


# Daily ACK event keys
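Collect, for each `event_id`, the distinct `custom_fields` keys found in one day's partition of the `design_events` table, and merge them into the ACK keys table.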

In [9]:
def generate_keys_query(date=None, table_ref=None):
  """Build the SQL listing, per event_id, the distinct custom_fields keys seen on one day.

  Args:
    date: day whose ingestion partition (_PARTITIONTIME) is scanned. Accepts a
      'YYYY-MM-DD' string or any date/datetime-like object with strftime.
      Defaults to today (local clock).
    table_ref: 'dataset.table' to scan; defaults to DESIGN_EVENTS_TABLE_REF.

  Returns:
    A BigQuery standard-SQL script: a temporary JS UDF extracting the JSON object keys
    of custom_fields, then a SELECT returning one row per event_id with an ARRAY of
    its distinct keys (`keys`).

  Raises:
    ValueError: if `date` is a string that is not a valid YYYY-MM-DD date.
  """
  if date is None:
    date = datetime.now()
  if hasattr(date, 'strftime'):
    date_str = date.strftime('%Y-%m-%d')
  else:
    # Round-trip through strptime so only a well-formed, normalized date is spliced
    # into the SQL text (rejects anything that could alter the query).
    date_str = datetime.strptime(date, '%Y-%m-%d').strftime('%Y-%m-%d')
  if table_ref is None:
    table_ref = DESIGN_EVENTS_TABLE_REF
  return ('''
  CREATE TEMP FUNCTION json_keys(custom_fields STRING)
  RETURNS ARRAY<STRING>
  LANGUAGE js AS """
  return Object.keys(JSON.parse(custom_fields))
  """;

  with all_keys_per_event_id as (
    SELECT event_id, json_keys(custom_fields) fk FROM {0}
    WHERE TIMESTAMP_TRUNC(_PARTITIONTIME, DAY) = TIMESTAMP("{1}")
    and custom_fields is not null
  ),

  unnested_and_grouped_event_id_keys as (
    SELECT event_id, key FROM all_keys_per_event_id, unnest(fk) key
    group by event_id, key
  )

  SELECT event_id, ARRAY_AGG(key) keys FROM unnested_and_grouped_event_id_keys group by event_id
  ''').format(table_ref, date_str)


In [16]:
#UPDATE THE ACK KEYS
# Day whose design_events partition is scanned and stamped as last_found_date below.
processing_date = datetime.strftime(datetime.now(), '%Y-%m-%d')

def compare_date_str(date_str_1, date_str_2, format= '%Y-%m-%d'):
  """Three-way compare two date strings, both parsed with `format`.

  Returns:
    1 if date_str_1 is later than date_str_2, -1 if it is earlier, 0 if they are equal.

  Raises:
    ValueError: if either string does not match `format`.
  """
  date_1 = datetime.strptime(date_str_1, format)
  date_2 = datetime.strptime(date_str_2, format)
  if date_1 == date_2:
    return 0
  if date_1 > date_2:
    return 1
  return -1

#current_config shape:
# "key": {last_found_date:""}
def merge_with_current_config(current_config,new_keys, date):
  """Merge the keys found for one event on `date` into its stored key config.

  Args:
    current_config: existing mapping {key: {'last_found_date': 'YYYY-MM-DD'}}, or None/empty.
    new_keys: iterable of custom_fields keys seen for the event on `date`.
    date: 'YYYY-MM-DD' day being processed.

  Returns:
    A new dict holding every key of current_config plus every key of new_keys. Each key in
    new_keys gets last_found_date = the later of its stored date and `date`, so reprocessing
    an older day never moves a key's last-seen date backwards. current_config is not modified.
  """
  merged = dict(current_config) if current_config else {}
  for key in new_keys:
    last_found_date = date
    stored_date = (merged.get(key) or {}).get('last_found_date')
    # Keep the stored date when it is strictly later than the day being processed.
    if stored_date and compare_date_str(stored_date, date) > 0:
      last_found_date = stored_date
    merged[key] = {'last_found_date': last_found_date}
  return merged

#Get data from ack keys table
# Load the persisted per-event configs: event_id -> {key: {'last_found_date': 'YYYY-MM-DD'}}.
dataset_ref = bigquery.DatasetReference(project_id, DATASET)
table_ref = dataset_ref.table(ACK_KEYS_TABLE)
table = client.get_table(table_ref)
ack_keys_df = client.list_rows(table).to_dataframe()

##TODO:run keys expiration here
ack_keys_dic = {}
for _, stored_row in ack_keys_df.iterrows():
  ack_keys_dic[stored_row['event_id']] = json.loads(stored_row['keys'])

# Fold the keys seen in today's partition into each event's config;
# events not seen today keep their stored config unchanged.
keys_query = generate_keys_query(processing_date)
today_ack_keys_df = client.query(keys_query).to_dataframe().reset_index()
for _, today_row in today_ack_keys_df.iterrows():
  today_event_id = today_row['event_id']
  ack_keys_dic[today_event_id] = merge_with_current_config(
      ack_keys_dic.get(today_event_id, {}), today_row['keys'], processing_date)

# Serialize every config back to a JSON string and overwrite the ACK keys table.
df = pd.DataFrame({
    'event_id': list(ack_keys_dic.keys()),
    'keys': [json.dumps(config) for config in ack_keys_dic.values()],
})
df.to_gbq(ACK_KEYS_TABLE_REF, project_id=project_id, if_exists='replace')

100%|██████████| 1/1 [00:00<00:00, 988.52it/s]


# Views creation

In [27]:
IDS_CONCAT_TOKEN = ':'

#concat ids helpers
def concat_ids(base_id,candidates):
  """Join base_id and the leading concrete segments of `candidates` with IDS_CONCAT_TOKEN.

  Segments are consumed in order. Consumption stops at the first segment that is either
  the '*' wildcard or missing (None, NaN/pd.NA as a NULL config column arrives from a
  DataFrame, or ''). That segment and all following ones are ignored.

  Returns:
    base_id unchanged when no segment is consumed, otherwise
    'base_id<token>seg1<token>seg2...'.
  """
  segments = []
  for candidate in candidates:
    # Test for missing values first: `pd.NA == '*'` yields pd.NA, whose truth value raises.
    if pd.isna(candidate) or candidate in ('*', ''):
      break
    segments.append(candidate)
  if not segments:
    return base_id
  return IDS_CONCAT_TOKEN.join([base_id] + segments)

In [28]:
#CREATE AND GET VIEWS HELPERS

def get_view_name(specified,id):
  """Return the destination view name for a concatenated event id.

  Args:
    specified: configured view name; None, NaN/pd.NA or a blank string mean "not set".
    id: concatenated event id (segments joined with ':'). The name `id` is kept for
      interface compatibility even though it shadows the builtin inside this function.

  Returns:
    `specified` unchanged when it holds a non-blank name, otherwise
    'design_events_<id>' with every ':' replaced by '_'.
  """
  # A NULL config column can arrive as None or NaN; a blank one as ''/whitespace.
  # None of these can be used as a BigQuery table ID, so fall back to the derived name.
  if specified is None or pd.isna(specified) or not str(specified).strip():
    return 'design_events_' + id.replace(':', '_')
  return specified

def build_view_query(keys_to_extract,event_id_like, table_ref=None):
  """Build the SELECT statement for a view over the design events table.

  The view keeps every column except custom_fields and adds one JSON_EXTRACT column
  per key in `keys_to_extract`, aliased to the key name. Rows are filtered with
  `event_id like "<event_id_like>%"`.

  Args:
    keys_to_extract: iterable of custom_fields key names (typically a set).
    event_id_like: event_id prefix used in the LIKE filter.
    table_ref: 'dataset.table' to select from; defaults to DESIGN_EVENTS_TABLE_REF.

  Returns:
    The SQL text, terminated by ';'.
  """
  if table_ref is None:
    table_ref = DESIGN_EVENTS_TABLE_REF
  properties = ['* EXCEPT(custom_fields)']
  # Sort so the view's column order is stable across runs: callers pass a set, and the
  # iteration order of a set of str changes between interpreter sessions (hash randomization).
  for key in sorted(keys_to_extract):
    # Backtick-quote the alias so reserved words (e.g. `order`, `from`) and other
    # non-bare identifiers are accepted; escape characters special inside backticks.
    alias = key.replace('\\', '\\\\').replace('`', '\\`')
    properties.append("JSON_EXTRACT(custom_fields,'$.{0}') as `{1}`".format(key, alias))
  return 'SELECT {0} FROM {1} WHERE event_id like "{2}%";'.format(", ".join(properties), table_ref, event_id_like)

def create_view(client, dataset_name, view_name, view_query, exists_replace=True):
    """Create the standard-SQL view `dataset_name.view_name` in the client's project.

    Args:
      client: bigquery.Client; its default project owns the dataset.
      dataset_name: dataset that will hold the view.
      view_name: table ID of the view.
      view_query: SQL text of the view.
      exists_replace: when True, any existing table/view with that name is deleted first
        so the new query takes effect. When False, an existing one is left untouched
        (create_table is called with exists_ok=True).

    Raises:
      Re-raises any exception from the BigQuery client after printing it.
    """
    try:
        # Client.dataset() is deprecated in google-cloud-bigquery; build the reference
        # explicitly, the same way the ACK keys cell does.
        dataset_ref = bigquery.DatasetReference(client.project, dataset_name)
        view_ref = dataset_ref.table(view_name)
        if exists_replace:
            client.delete_table(table=view_ref, not_found_ok=True)
        view = bigquery.Table(view_ref)
        view.view_query = view_query
        view.view_use_legacy_sql = False
        client.create_table(view, exists_ok=True)
    except Exception as e:
        print('ERROR (create_view): ' + str(e))
        raise

In [None]:
# RUN VIEWS CREATION
# Query templates: the whole views config, and the ACK-key rows whose event_id starts with a prefix.
sql_query = ('''SELECT * FROM {0};''')
get_ack_keys_query = ('''SELECT * FROM {0} WHERE event_id like "{1}%"''')

df_views_config = client.query(sql_query.format(VIEWS_CONFIG_REF)).to_dataframe().reset_index()  # make sure indexes pair with number of rows

# Map each concatenated event id (event_id_01[:event_id_02...]) to its destination view name.
event_ids = {}
for _, row in df_views_config.iterrows():
  event_id_01 = row['event_id_01']
  # Check for NULL first: comparing pd.NA with '*' would yield an ambiguous truth value.
  if pd.isna(event_id_01) or event_id_01 == '':
    print('Wrong config: event_id_01 is empty')
    print('Discarding view creation...')
    continue
  if event_id_01 == '*':
    print('Wrong config: * is not allowed in the event_id_01 field')
    print('Discarding view creation...')
    continue
  full_event_id = concat_ids(event_id_01, [row['event_id_02'], row['event_id_03'], row['event_id_04'], row['event_id_05']])
  event_ids[full_event_id] = get_view_name(row['destination_view_name'], full_event_id)

# One view per event id, exposing every key stored in the ACK keys table for matching event ids.
for event_id, view_name in event_ids.items():
  ack_df = client.query(get_ack_keys_query.format(ACK_KEYS_TABLE_REF, event_id)).to_dataframe()
  keys_set = set()
  for _, ack_row in ack_df.iterrows():
    stored_keys = ack_row['keys']
    if pd.isna(stored_keys):
      continue
    # Each stored value is a JSON object keyed by custom-field name; collect its keys.
    keys_set.update(json.loads(stored_keys))
  create_view(client, DATASET, view_name, build_view_query(keys_set, event_id), exists_replace=True)
  print('Created view {0} for event_id prefix {1} ({2} keys)'.format(view_name, event_id, len(keys_set)))