### Workflow

- A new version is released
- We get a new version of sql-export json file
- We check if we have all the source tables in the DWH
- we create the models
- we run the models

In [81]:
#imported libraries

import json 
import sqlparse
from sqlparse.sql import Identifier, IdentifierList, remove_quotes, Token, TokenList, Where
from sqlparse.tokens import Keyword, Name, Punctuation, String, Whitespace
from sqlparse.utils import imt
import pandas as pd
from flatten_dict import flatten
from pprint import pprint
import sql_metadata
import re
import os
import urllib.request as request

In [82]:
## Files imported

## counter queries (sql+redis): this is generated manually by the product intelligence team
## available here

json_file_path = '/Users/mathieupeychet/Downloads/usage_ping_sql_jan.json'


## foreign_key csv generated manually once by m_walker
sql_query_to_generate = """SELECT
    tc.table_schema, 
    tc.constraint_name, 
    tc.table_name, 
    kcu.column_name, 
    ccu.table_schema AS foreign_table_schema,
    ccu.table_name AS foreign_table_name,
    ccu.column_name AS foreign_column_name 
FROM 
    information_schema.table_constraints AS tc 
    JOIN information_schema.key_column_usage AS kcu
      ON tc.constraint_name = kcu.constraint_name
      AND tc.table_schema = kcu.table_schema
    JOIN information_schema.constraint_column_usage AS ccu
      ON ccu.constraint_name = tc.constraint_name
      AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY' --AND tc.table_name='mytable'
"""
foreign_key_df = pd.read_csv('/Users/mathieupeychet/Documents/foreign_keys.csv')

## To join an event to a ultimate_namespace_id, we have 3 potential standard tables to join
table_to_join = ['projects', 'namespaces', 'groups']

In [83]:
def sql_queries_dict(json_file):
    ''' 
    function that transforms the sql-export.json file into a Python dict with only SQL batch counters
    '''
    with open(json_file) as f:
        data = json.load(f)

    from flatten_dict.reducer import make_reducer
    full_payload_dict = flatten(data, reducer=make_reducer(delimiter='.'))

    sql_queries_dict  = {}

    for (key, value) in full_payload_dict.items():
       # Check if key is even then add pair to new dictionary
       if isinstance(value, str) and str.startswith(value, 'SELECT') is True:
           sql_queries_dict[key] = value
    
    return sql_queries_dict

sql_queries_dict = sql_queries_dict(json_file_path)

sql_queries_dict['active_user_count']

'SELECT COUNT("users"."id") FROM "users" WHERE ("users"."state" IN (\'active\')) AND ("users"."user_type" IS NULL OR "users"."user_type" IN (NULL, 6, 4))'

In [84]:
###
### Small section that extracts all the needed tables for usage ping generation 
###
needed_tables = []


for key, value in sql_queries_dict.items():
    sql_value = sqlparse.parse(value)[0]
    ## get the table which is queried in the FROM statement
    queried_tables_list = sql_metadata.get_query_tables(value)
    for table in queried_tables_list:
        if table not in needed_tables:
            needed_tables.append(table)

print(len(needed_tables))

99


In [85]:
def create_join_mapping_df(sql_queries_dict):
    '''
        The functoin returns a dataframe with the following columns
        - counter: name of the counter, which is the item key in the dictionary passed
        - sql_query: query run to calculate the counter, item value in the dictionary passed as argument
        - table_name: name of the table in the FROM statement of the sql_query
        - foreingn_table_name: 
        - foreign_column_name:
    '''
    final_join_mapping_df = pd.DataFrame()
    for key, value in sql_queries_dict.items():
        sql_value = sqlparse.parse(value)[0]
        ## get the table which is queried in the FROM statement
        queried_tables_list = sql_metadata.get_query_tables(value)
        value = value.replace('"', "")
        
        value= re.sub('{:start=>[0-9]{1,}, :finish=>[0-9]{1,}}', 'id', value)
        # if projects is just do the join on projects
        if 'projects' in queried_tables_list:
            potential_joins = foreign_key_df[(foreign_key_df['table_name'] == 'projects') & (foreign_key_df['foreign_table_name'] == 'namespaces')]
            potential_joins = potential_joins.drop_duplicates()
            table_to_append = potential_joins[potential_joins.table_name == 'projects']
            table_to_append["counter"] = key
            table_to_append["sql_query"] = value
            final_join_mapping_df = final_join_mapping_df.append(table_to_append, ignore_index=True)   
        else:
            for index, queried_table in enumerate(queried_tables_list):

            ## 
                potential_joins = foreign_key_df[(foreign_key_df['table_name'] == queried_table) & (foreign_key_df['foreign_table_name'].isin(table_to_join))]
                potential_joins = potential_joins.drop_duplicates()
                
                if potential_joins[potential_joins.foreign_table_name == 'projects'].empty is False:
                    table_to_append = potential_joins[potential_joins.foreign_table_name == 'projects']
                    table_to_append["counter"] = key
                    table_to_append["sql_query"] = value
                    final_join_mapping_df = final_join_mapping_df.append(table_to_append, ignore_index=True)
                elif potential_joins[potential_joins.foreign_table_name == 'groups'].empty is False:
                    table_to_append = potential_joins[potential_joins.foreign_table_name == 'groups']
                    table_to_append["counter"] = key
                    table_to_append["sql_query"] = value
                    final_join_mapping_df = final_join_mapping_df.append(table_to_append, ignore_index=True)
                elif potential_joins[potential_joins.foreign_table_name == 'namespaces'].empty is False:
                    table_to_append = potential_joins[potential_joins.foreign_table_name == 'namespaces']
                    table_to_append["counter"] = key
                    table_to_append["sql_query"] = value
                    final_join_mapping_df = final_join_mapping_df.append(table_to_append, ignore_index=True)
                if table_to_append.empty is False:
                    break
                
        
    return final_join_mapping_df

fj_df = create_join_mapping_df(sql_queries_dict)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy


#### SQL Transformation

This part of the code transforms the SQL query extracted directly from usage pings into SQL queries runable in the DWH

The goal of these transformations is to get 2 different types of queries:

##### Instance-level query for version 1 of Usage Ping

Example:

```
SELECT
'count.issues'        AS counter_name,
COUNT(issues.id)      AS counter_value,
TO_DATE(CURRENT_DATE) AS run_day
FROM prep.gitlab_dotcom.gitlab_dotcom_issues_dedupe_source
```

In [86]:
def add_counter_name_as_column(counter_row):
    '''
    step needed to add the first 2 columns:
      - counter_name
      - run_day
    
    this needs a specific row of a specific dataframe, I think this could be changed to a SQL query for more convenience
    
    a query like that SELECT COUNT(issues.id) FROM issues will be changed to SELECT 'counts.issues', COUNT(issues.id), TO_DATE(CURRENT_DATE)
    
    needed for version 1 and 2
    '''
    
    sql_query_parsed = sqlparse.parse(counter_row['sql_query'])
    
    ### split the query in tokens
    token_list = sql_query_parsed[0].tokens
    from_index = 0
    
    for index, token in enumerate(token_list):
        #Token.Keyword.DML
        ### identify if it is a select statement
        if (token.is_keyword and str(token) == 'SELECT') is True:
            ### set the select_index
            select_index = index
            break
    for index, token in enumerate(token_list):
        if (token.is_keyword and str(token) == 'FROM') is True:
            from_index = index
            break
    token_list_with_counter_name = token_list[:]
    token_list_with_counter_name.insert(from_index - 1, " AS counter_value, TO_DATE(CURRENT_DATE) AS run_day  ")
    token_list_with_counter_name.insert(select_index + 1, " '" + counter_row['counter'] + "' AS counter_name, ")
    enhanced_query_list = [str(token) for token in token_list_with_counter_name]
    enhanced_query = ''.join(enhanced_query_list)
    
    return enhanced_query

In [87]:
def add_join_to_namespaces(counter_row):
    sql_query_parsed = sqlparse.parse(counter_row['sql_query'])
    token_list_with_counter_name = sql_query_parsed[0].tokens
    where_index = 0
    select_index = 0
    from_index = 0
    for index, token in enumerate(token_list_with_counter_name):        #Token.Keyword.DML
        if (token.is_keyword and str(token) == 'SELECT') is True:
            select_index = index
        if (token.is_keyword and str(token) == 'FROM') is True:
            from_index = index
        if isinstance(token, Where) is True:
            where_index = index
            break

    new_tok_list = token_list_with_counter_name[:]
    
    join_to_insert = ''
    if counter_row.table_name != 'users' and counter_row.table_name != 'namespaces':
        join_to_insert = ' LEFT JOIN ' + counter_row.foreign_table_name + ' ON ' + counter_row.foreign_table_name + '.' + counter_row.foreign_column_name + ' = ' + counter_row.table_name + '.' + counter_row.column_name + ' '
    if counter_row.foreign_table_name == 'projects'  and counter_row.table_name != 'namespaces':
        join_to_insert += ' LEFT JOIN namespaces ON projects.namespace_id = namespaces.id'
        
    join_to_insert  = join_to_insert + " LEFT JOIN {{ref('gitlab_dotcom_namespaces_xf')}} AS namespaces_xf ON namespaces.id = namespaces_xf.namespace_id "
    if where_index > 0:
        token_list_with_counter_name.insert(where_index + 1 , ' GROUP BY 1')
        token_list_with_counter_name.insert(where_index, join_to_insert)
    else:
        token_list_with_counter_name.append(join_to_insert)
        token_list_with_counter_name.append(' GROUP BY 1')

    token_list_with_counter_name.insert(from_index - 1, " AS counter_value ")
    if select_index >= 0:
        token_list_with_counter_name.insert(select_index + 1, ' namespaces_xf.namespace_id, TO_DATE(CURRENT_DATE) AS run_day, ')

    enhanced_query_list = [str(token) for token in token_list_with_counter_name]
    enhanced_query = ''.join(enhanced_query_list)
    
    return enhanced_query


In [88]:
def write_sql_model_files(counter_row, sql_query_column, counter_name_column, subdirectory, suffix=''):
    #subdirectory= 'models/workspaces/test_usage_pings/'
    try:
        os.makedirs(subdirectory)
    except Exception:
        pass
    model_name = counter_row[counter_name_column].replace('.',  '_') + suffix + '.sql'
    wr = open(os.path.join(subdirectory, model_name), 'w')
    wr.write(counter_row[sql_query_column])
    wr.close()

In [89]:
def rename_query_tables(query):
    '''
    function to rename the table based on a new regex
    '''
    
    ### comprehensive list of all the keywords that are followed by a table name
    keyword_to_look_at = [            
                'FROM',
                "JOIN",
                "INNER JOIN",
                "FULL JOIN",
                "FULL OUTER JOIN",
                "LEFT JOIN",
                "RIGHT JOIN",
                "LEFT OUTER JOIN",
                "RIGHT OUTER JOIN",
    ]

    
    ### start parsing the query and get the token_list
    parsed = sqlparse.parse(query)
    tokens = list(TokenList(parsed[0].tokens).flatten())
    ### setting up to -1 to start
    keyword_token_index = -1
    
    while keyword_token_index != 0:
        keyword_token_index = 0
        
        ### go through the tokens
        for index, token in enumerate(tokens):
            
            if str(token) in keyword_to_look_at:
                keyword_token_index = index
                i = 1
                while tokens[index + i].ttype is Whitespace:
                    i += 1
                next_token = tokens[index + i]
                if str(next_token).startswith('prep') is False:
                    tokens.insert(keyword_token_index + i, "prep.gitlab_dotcom.gitlab_dotcom_" + str(next_token) + "_dedupe_source AS " )
                    tokens = [str(token) for token in tokens]
                    token_query = ''.join(tokens)
                    parsed = sqlparse.parse(token_query)
                    tokens = list(TokenList(parsed[0].tokens).flatten())
                    break
                else:
                    keyword_token_index = 0
            if keyword_token_index > 0:
                break
    return token_query
        
        

In [90]:
def add_last_join_to_namespace_xf(sql_query):
    sql_query_parsed = sqlparse.parse(sql_query)
    token_list_with_counter_name = sql_query_parsed[0].tokens
    where_index = 0
    select_index = 0
    from_index = 0
    for index, token in enumerate(token_list_with_counter_name):        #Token.Keyword.DML
        if isinstance(token, Where) is True:
            where_index = index
            break

In [92]:
final_dict = {}

for index, row in fj_df.iterrows():
    fj_df.loc[index,'global_counter_query'] = add_counter_name_as_column(row)
    fj_df.loc[index,'counter_per_namespace_query'] = add_join_to_namespaces(row)
    tables = sql_metadata.get_query_tables(fj_df.loc[index,'global_counter_query'])
    are_all_tables_in_analytics = True
    for table in tables:
        file_name = 'gitlab_dotcom_{}_dedupe_source.sql'.format(table)
        result = []
        dirname = os.getcwd()
        model_path = os.path.join(dirname, 'models')
        for root, dirs, files in os.walk(model_path):
            if file_name in files:
                result.append(os.path.join(root, file_name))
                are_all_tables_in_analytics = True
                break
            else:
                are_all_tables_in_analytics = False
        if are_all_tables_in_analytics == False:
            break
    fj_df.loc[index,'global_counter_query'] = rename_query_tables(row['global_counter_query'])
    #fj_df.loc[index,'counter_per_namespace_query'] = rename_query_tables(row['counter_per_namespace_query'])

    final_dict[fj_df.loc[index,'counter']] = fj_df.loc[index,'global_counter_query']

In [None]:
with open('query_json.json', 'w') as f:
    json.dump(final_dict, f)

In [None]:
for index, row in fj_df.iterrows():
    if fj_df.loc[index,'are_all_tables_in_analytics'] == 'True':
        write_sql_model_files(row, 'global_counter_query', 'counter', 'models/workspaces/test_usage_pings/')
        write_sql_model_files(row, 'counter_per_namespace_query', 'counter', 'models/workspaces/workspace_usage_pings_namespaces/',suffix='_namespaces')

# FINISHED, BELOW IS SCRATCH PAD

In [99]:
tables_to_dedup = ['merge_requests', 'issues']

table_format = """
  SELECT *
  FROM {{{{ source('gitlab_dotcom', '{}') }}}}
  QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) = 1
"""
for dedup in tables_to_dedup:
    formatted = table_format.format(dedup)
    subdirectory= 'models/sources/gitlab_dotcom/dedupe'
    try:
        os.makedirs(subdirectory)
    except Exception:
        pass
    model_name = 'gitlab_dotcom_{}_dedupe_source.sql'.format(dedup)
    wr = open(os.path.join(subdirectory, model_name), 'w')
    wr.write(formatted)
    wr.close()

In [43]:
for index, token in enumerate(tok_list):
    #Token.Keyword.DML
    if (token.is_keyword and str(token) == 'SELECT') is True:
        select_index = index
    if isinstance(token, Where) is True:
        where_index = index

new_tok_list = tok_list[:]
new_tok_list.insert(where_index + 1 , ' GROUP BY 1')
new_tok_list.insert(where_index, ' LEFT JOIN projects ON services.id = projects.project_id ')
new_tok_list.insert(select_index + 1, ' namespace_id,')

tok_list_str = [str(token) for token in new_tok_list]
query_stringed = ''.join(tok_list_str)
query_parsed = sqlparse.parse(query_stringed)
tok_list_transformed = query_parsed[0].tokens

query_stringed
test_formatting = sqlparse.format(query_stringed)
test_formatting



NameError: name 'tok_list' is not defined

In [44]:
def _update_table_names(
    tables: List[str], tokens: List[sqlparse.sql.Token], index: int, last_keyword: str
) -> List[str]:
    """
    Return new table names matching database.table or database.schema.table notation
    :type tables list[str]
    :type tokens list[sqlparse.sql.Token]
    :type index int
    :type last_keyword str
    :rtype: list[str]
    """

    token = tokens[index]
    last_token = tokens[index - 1].value.upper() if index > 0 else None
    next_token = tokens[index + 1].value.upper() if index + 1 < len(tokens) else None

    if (
        last_keyword
        in [
            "FROM",
            "JOIN",
            "INNER JOIN",
            "FULL JOIN",
            "FULL OUTER JOIN",
            "LEFT JOIN",
            "RIGHT JOIN",
            "LEFT OUTER JOIN",
            "RIGHT OUTER JOIN",
            "INTO",
            "UPDATE",
            "TABLE",
        ]
        and last_token not in ["AS"]
        and token.value not in ["AS", "SELECT"]
    ):
        if last_token == "." and next_token != ".":
            # we have database.table notation example
            table_name = "{}.{}".format(tokens[index - 2], tokens[index])
            if len(tables) > 0:
                tables[-1] = table_name
            else:
                tables.append(table_name)

        schema_notation_match = (Name, ".", Name, ".", Name)
        schema_notation_tokens = (
            (
                tokens[index - 4].ttype,
                tokens[index - 3].value,
                tokens[index - 2].ttype,
                tokens[index - 1].value,
                tokens[index].ttype,
            )
            if len(tokens) > 4
            else None
        )
        if schema_notation_tokens == schema_notation_match:
            # we have database.schema.table notation example
            table_name = "{}.{}.{}".format(
                tokens[index - 4], tokens[index - 2], tokens[index]
            )
            if len(tables) > 0:
                tables[-1] = table_name
            else:
                tables.append(table_name)
        elif tokens[index - 1].value.upper() not in [",", last_keyword]:
            # it's not a list of tables, e.g. SELECT * FROM foo, bar
            # hence, it can be the case of alias without AS, e.g. SELECT * FROM foo bar
            pass
        else:
            table_name = str(token.value.strip("`"))
            tables.append(table_name)

    return tables


NameError: name 'List' is not defined

In [None]:
sql_test = 'SELECT test FROM "services" LEFT JOIN "users" ON users.id = services.id WHERE users.id = 5'

sql_test = sql_test.replace('"', "")
#print(sql_test)
parsed = sqlparse.parse(sql_test)
tokens = TokenList(parsed[0].tokens).flatten()
# print([(token.value, token.ttype) for token in tokens])

test = [token for token in tokens if token.ttype is not Whitespace]

table_syntax_keywords = [
    # SELECT queries
    "FROM",
    "WHERE",
    "JOIN",
    "INNER JOIN",
    "FULL JOIN",
    "FULL OUTER JOIN",
    "LEFT OUTER JOIN",
    "RIGHT OUTER JOIN",
    "LEFT JOIN",
    "RIGHT JOIN",
    "ON",
    # INSERT queries
    "INTO",
    "VALUES",
    # UPDATE queries
    "UPDATE",
    "SET",
    # Hive queries
    "TABLE",  # INSERT TABLE
]

tables = []
last_keyword = None
last_token=None

print(test)
for index, token in enumerate(test):
    print(token.ttype is Name)
    #print([token, token.ttype, last_token, last_keyword, token.is_keyword, index])
    if token.is_keyword and token.value.upper() == 'WHERE':
        # keep the name of the last keyword, the next one can be a table name
        where_index = index
        print(where_index)
    elif (
        token.is_keyword
        and str(token).upper() == "SELECT"
    ):
        # reset the last_keyword for "INSERT INTO SELECT" and "INSERT TABLE SELECT" queries
        last_keyword = None
        select_index = index
        print(2)
    elif token.ttype is Name or token.is_keyword:
        tables.append(str(token))
        print(3)
        
print(tables)
tables[0] = 'replace'
tables

In [None]:
for index, row in final_join.iterrows():
   print(row['table_name'])

In [None]:
for index, row in final_join_mapping_df.iterrows():
    if row.table_name != 'users':
        join_to_insert = 'LEFT JOIN ' + row.foreign_table_name + ' ON ' + row.foreign_table_name + '.' + row.foreign_column_name + ' = ' + row.table_name + '.' + row.column_name
        if row.foreign_table_name == 'projects':
            join_to_insert += ' LEFT JOIN namespaces ON projects.namespace_id = namespaces.id '
        print(join_to_insert)
    parsed_sql_query = sqlparse.parse(row.sql_query)[0]
    parsed_join_to_insert = sqlparse.parse(join_to_insert)
    parsed_sql_query.insert_before(-1, parsed_join_to_insert[0])
    print(parsed_sql_query)
    print(row.counter)
        


In [None]:
## verify it is a where clause

parsed = sqlparse.parse(value)
where = parsed[0][-1]
print(where)
where_clause = False
for i in where.tokens:
    if str(i).find('WHERE') >= 0:
        where_clause = True
    
for i in parsed[0].tokens:
    try:
        for j in i.tokens:

            if str(j) == '"services"':
                print(j)
    except:
        pass
    
idx, _ = parsed[0].token_next_by(m=(Identifier, 'WHERE'))
print(idx)

In [None]:
import json 
import sqlparse
from sqlparse.sql import Identifier, IdentifierList, remove_quotes, Token, TokenList, Where
from sqlparse.tokens import Keyword, Name, Punctuation, String, Whitespace
from sqlparse.utils import imt
import pandas as pd

sql_first = sql_queries_dict["counts.issues"]

parsed = sqlparse.parse(sql_first)[0]
test_list = []

print(parsed.get_name())

parsed._pprint_tree()
where_statment = parsed[-1]
select_statement = parsed

for x in select_statement:
    test_list.append(str(x))
    
test_list

left_join = ' LEFT JOIN projects ON test.project_id = projects.id'
test_list.append(left_join)
test_list

In [None]:

table_to_look = 'issues'
table_to_join = ['projects', 'namespaces', 'groups']
foreign_key_df.head(20)

potential_joins = foreign_key_df[(foreign_key_df['table_name'] == table_to_look) & (foreign_key_df['foreign_table_name'].isin(table_to_join))]

for index, row in final_join.iterrows():
    if row.table_name != 'users':
        print('LEFT JOIN ' + row.foreign_table_name + ' ON ' + row.foreign_table_name + '.' + row.foreign_column_name + ' = ' + row.table_name + '.' + row.column_name)
        if row.foreign_table_name == 'projects':
            print('LEFT JOIN namespaces ON projects.namespace_id = namespaces.id')
    print(row.table_name)
    print(row.counter)

    
group_by_statement = 'GROUP BY 1'
print(group_by_statement)


In [None]:
isinstance(where_statment, Where)

In [None]:
idx, _ = parsed.token_next_by(m=(Keyword, "BETWEEN"))
print(idx)
if idx is not None:
    _, token = parsed.token_next(idx=idx)
    if token:
        if isinstance(token, IdentifierList):
            # In case of "LIMIT <offset>, <limit>", find comma and extract
            # first succeeding non-whitespace token
            idx, _ = token.token_next_by(m=(sqlparse.tokens.Punctuation, ","))
            _, token = token.token_next(idx=idx)
        if token and token.ttype == sqlparse.tokens.Literal.Number.Integer:
            print(int(token.value))

where = next(token for token in parsed.tokens if isinstance(token, Where))


In [None]:
value = 'SELECT test FROM "services" LEFT JOIN "users" ON users.id = services.id WHERE users.id = 5'

elements = sqlparse.parse(value)
tok_list = elements[0].tokens

isinstance(tok_list[-1], Where)

tok_list

## add group by at the end
## add joins (what if there is a namespace, group or project table already)
## add namespace_id


In [None]:
for index, token in enumerate(tok_list):
    #Token.Keyword.DML
    if (token.is_keyword and str(token) == 'SELECT') is True:
        select_index = index
    if isinstance(token, Where) is True:
        where_index = index

new_tok_list = tok_list[:]
new_tok_list.insert(where_index + 1 , ' GROUP BY 1')
new_tok_list.insert(where_index, ' LEFT JOIN projects ON services.id = projects.project_id ')
new_tok_list.insert(select_index + 1, ' namespace_id,')

tok_list_str = [str(token) for token in new_tok_list]
query_stringed = ''.join(tok_list_str)
query_parsed = sqlparse.parse(query_stringed)
tok_list_transformed = query_parsed[0].tokens

query_stringed
test_formatting = sqlparse.format(query_stringed)
test_formatting

wr = open('test.sql', 'w')
wr.write(test_formatting)
wr.close()