# Data Catalog Python Script
* Repository located here: https://github.com/emilyporter920/Data_Cataloging

## Import Libaries

In [None]:
# Import dependencies
from snowflake.snowpark.session import Session
import json
import pandas as pd
from platform import python_version
from datetime import datetime
import openpyxl
from config import account, user, authenticator, warehouse1, role1, warehouse2, role2 

# Shows Python version (SnowPark uses anything below 3.8.x)
print(python_version())

# GVR PROD Database

In [None]:
# Create Snowflake Session object (GVR_PROD)
connection_parameters = {
    "account": account,
    "user": user,
    "authenticator": authenticator,
    "warehouse": warehouse1,
    "role": role1
}

session = Session.builder.configs(connection_parameters).create()

## Primary Keys

In [None]:
# Selecting the UAT data from the Snowflake table
primary_keys = session.sql("SHOW PRIMARY KEYS IN DATABASE GVR_PROD").collect()

primary_keys = pd.DataFrame(list(primary_keys))

In [None]:
# Get rid of columns you don't need in PRIMARY_KEYS table
primary_keys = primary_keys.drop(['created_on', 'constraint_name', 'rely'], axis=1)
primary_keys.head()

In [None]:
# Rename columns in PRIMARY_KEYS table to match COLUMN_TABLE columns
primary_keys = primary_keys.rename(columns= {'database_name': 'DATABASE', 'schema_name': 'SCHEMA', 
                                             'table_name': 'TABLE_NAME', 'column_name': 'COLUMN_NAME', 'key_sequence': 'PK'})

primary_keys.head()

## Information Schema

In [None]:
# Information_Schema data
column_table = session.sql("SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, NUMERIC_PRECISION, NUMERIC_SCALE, IS_NULLABLE FROM GVR_PROD.INFORMATION_SCHEMA.COLUMNS").to_pandas()

column_table.head()

In [None]:
# Rename columns in COLUMN_TABLE table
column_table = column_table.rename(columns= {'TABLE_CATALOG': 'DATABASE', 'TABLE_SCHEMA': 'SCHEMA', 'CHARACTER_MAXIMUM_LENGTH': 'LENGTH', 
                                             'NUMERIC_PRECISION': 'PRECISION', 'NUMERIC_SCALE': 'SCALE', 'IS_NULLABLE': 'NULLABLE'})

column_table.head()

In [None]:
# Merge PRIMARY_KEYS and COLUMN_TABLE tables
merged_tables = pd.merge(column_table, primary_keys, how='left', on=['DATABASE', 'SCHEMA', 'TABLE_NAME', 'COLUMN_NAME'])

merged_tables.head()

In [None]:
# Get rid of schemas: Information_Schema, Admin, and Stage
merged_tables=merged_tables[~merged_tables['SCHEMA'].isin(['INFORMATION_SCHEMA'])]

merged_tables.head()

# IS360 Database

In [None]:
# Create Snowflake Session object (IS360)
connection_parameters = {
    "account": account,
    "user": user,
    "authenticator": authenticator,
    "warehouse": warehouse2,
    "role": role2
}

session = Session.builder.configs(connection_parameters).create()

## Primary Keys

In [None]:
# Selecting the UAT data from the Snowflake table
primary_keys2 = session.sql("SHOW PRIMARY KEYS IN DATABASE GVR_IS360_DEV_DB").collect()

primary_keys2 = pd.DataFrame(list(primary_keys2))

In [None]:
# Get rid of columns you don't need in PRIMARY_KEYS table
primary_keys2 = primary_keys2.drop(['created_on', 'constraint_name', 'rely'], axis=1)

primary_keys2.head()

In [None]:
# Rename columns in PRIMARY_KEYS table to match COLUMN_TABLE columns
primary_keys2 = primary_keys2.rename(columns= {'database_name': 'DATABASE', 'schema_name': 'SCHEMA', 
                                             'table_name': 'TABLE_NAME', 'column_name': 'COLUMN_NAME', 'key_sequence': 'PK'})

primary_keys2.head()

## Information Schema

In [None]:
# Information_Schema data
column_table2 = session.sql("SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH, NUMERIC_PRECISION, NUMERIC_SCALE, IS_NULLABLE FROM GVR_IS360_DEV_DB.INFORMATION_SCHEMA.COLUMNS").to_pandas()

column_table2.head()

In [None]:
# Rename columns in COLUMN_TABLE table
column_table2 = column_table2.rename(columns= {'TABLE_CATALOG': 'DATABASE', 'TABLE_SCHEMA': 'SCHEMA', 'CHARACTER_MAXIMUM_LENGTH': 'LENGTH', 
                                             'NUMERIC_PRECISION': 'PRECISION', 'NUMERIC_SCALE': 'SCALE', 'IS_NULLABLE': 'NULLABLE'})

column_table2.head()

In [None]:
# Merge PRIMARY_KEYS and COLUMN_TABLE tables
merged_tables2 = pd.merge(column_table2, primary_keys2, how='left', on=['DATABASE', 'SCHEMA', 'TABLE_NAME', 'COLUMN_NAME'])

merged_tables2.head()

In [None]:
# Get rid of schemas: Information_Schema, Admin, and Stage
merged_tables3 = merged_tables2[~merged_tables2['SCHEMA'].isin(['INFORMATION_SCHEMA', 'STAGE', 'ADMIN'])]

merged_tables3.head()

# Merge GVR PROD with IS360 Database

In [None]:
# Appending IS360 to GVR_PROD dataframe
merged_tables3 = merged_tables.append(merged_tables2, ignore_index=True)

merged_tables3.head()

# Zones

In [None]:
# Generates values for the ZONE column
def zone(column):
    if column['SCHEMA'] in ['SMS', 'SALESFORCE', 'AX', 'MAC-PAC', 'PROTHEUS_AR', 'PROTHEUS_BR', 'PROTHEUS_CH', 'QAD', 'HFM', 'INSITE360_TELEMETRY', 'AVA_DEMO', 'IOT_CORE', 'AVA_LEGACY', 
                            'AVA_DEV', 'AVA_CORE_DEV', 'AVA_UAT', 'AVA_QA', 'AVA_CORE_UAT', 'AVA', 'ARCHIVE', 'AVA_CORE_QA', 'AVA_CORE_DEMO', 'PUSH_SALE_EVENT', 'CENSUS']:
        val = 'DATALAKE'
    elif column['SCHEMA'] in ['DATA_MART_FIN_NA', 'DATA_MART_AMO_NA', 'DATA_MART_FIN_GLOBAL', 'DATA_MART_CUSTOMER']:
        val = 'DATAMART'
    elif column['SCHEMA'] in ['DW']:
        val = 'DATAWAREHOUSE'
    elif column['SCHEMA'] in ['RPT']:
        val = 'CONSUMPTION'
    else:
        val = ' '
    return val

In [None]:
# Apply the zone function to the merged_tables3 dataframe
merged_tables3['ZONE'] = merged_tables3.apply(zone, axis=1)

merged_tables3.head()

# Historical Retention

In [None]:
# Add the historical retention data (hard coded for now, will be available in the future when this historical retention changes)
merged_tables3['HISTORICAL_RETENTION'] = '32 DAYS'

merged_tables3.head()

# Load Times

## GVR PROD

In [None]:
# Load_table created to show when the table was last loaded
load_table1 = session.sql("SELECT SCHEMA_NAME, TABLE_NAME, LAST_LOAD_TIME FROM GVR_PROD.INFORMATION_SCHEMA.LOAD_HISTORY").to_pandas()

load_table1.head()

In [None]:
# Change the last_load_time to only be yyyy/mm/dd
load_table1['LAST_LOAD_TIME'] = pd.to_datetime(load_table1['LAST_LOAD_TIME']).dt.date

load_table1.head()

In [None]:
# Rename columns in LOAD_TABLE table to match MERGED_TABLES table
load_table1 = load_table1.rename(columns= {'SCHEMA_NAME': 'SCHEMA'})

load_table1.head()

In [None]:
# Sort values to show the most recent load times
most_recent_times1 = load_table1.sort_values('LAST_LOAD_TIME', ascending=False)

most_recent_times1.head()

In [None]:
# Select only the most recent load time
most_recent_times1 = most_recent_times1.drop_duplicates(subset='TABLE_NAME', keep='first')

most_recent_times1 = most_recent_times1.reset_index(drop=True)

most_recent_times1.head()

## IS360

In [None]:
# Load_table created to show when the table was last loaded
load_table2 = session.sql("SELECT SCHEMA_NAME, TABLE_NAME, LAST_LOAD_TIME FROM GVR_IS360_DEV_DB.INFORMATION_SCHEMA.LOAD_HISTORY").to_pandas()

load_table2.head()

In [None]:
# Change the last_load_time to only be yyyy/mm/dd
load_table2['LAST_LOAD_TIME'] = pd.to_datetime(load_table2['LAST_LOAD_TIME']).dt.date

load_table2.head()

In [None]:
# Rename columns in LOAD_TABLE table to match MERGED_TABLES table
load_table2 = load_table2.rename(columns= {'SCHEMA_NAME': 'SCHEMA'})

load_table2.head()

In [None]:
# Sort values to show the most recent load times
most_recent_times2 = load_table2.sort_values('LAST_LOAD_TIME', ascending=False)

most_recent_times2.head()

In [None]:
# Select only the most recent load time 
most_recent_times2 = most_recent_times2.drop_duplicates(subset='TABLE_NAME', keep='first')

most_recent_times2 = most_recent_times2.reset_index(drop=True)

most_recent_times2.head()

## Appending GVR PROD & IS360 Load Times

In [None]:
# Append the two most recent times tables together
most_recent_times = most_recent_times1.append(most_recent_times2, ignore_index=True)

most_recent_times.head()

## Merge Load Times To Merged_Tables

In [None]:
# Merge PRIMARY_KEYS and COLUMN_TABLE tables
merged_tables3 = pd.merge(merged_tables3, most_recent_times, how='left', on=['SCHEMA', 'TABLE_NAME'])

merged_tables3.head()

## Table Type

In [None]:
# Hard code that all table types are Type1 (History Available)
merged_tables3['TABLE_TYPE'] = 'History Available'

merged_tables3.head()

## Dropping Schemas

In [None]:
# Get rid of schemas: Information_Schema, Admin, and Stage
merged_tables3 = merged_tables3[~merged_tables3['SCHEMA'].isin(['INFORMATION_SCHEMA', 'STAGE', 'ADMIN'])]

merged_tables3.head()

In [None]:
# Verify the columns are all capitalized
merged_tables3.columns = merged_tables3.columns.str.upper()

merged_tables3.head()

## Rearrange Columns

In [None]:
# Rearrange columns - to be added later

# Write to Excel

In [None]:
with pd.ExcelWriter('Catalog/Data_Catalog.xlsx', mode='a', engine='openpyxl', if_sheet_exists="replace",) as writer:
    merged_tables3.to_excel(writer, sheet_name='Catalog', index=False)