In [19]:
import pandas as pd
import snowflake.connector as sf
import json
import os

To create a database and its schemas following the default pattern, we'll drive the process from a JSON config file, plus an additional JSON credentials file.

In [2]:
def json_reader(path_to_json):
    """Load a JSON file and return the parsed object.

    INPUTS: path_to_json - path to a JSON file on disk.
    OUTPUTS: whatever json.load produces for the file's top-level value
    (a dict for the config and credentials files used in this notebook).
    COMMENTS: raises the usual open()/json.load errors (e.g. FileNotFoundError,
    json.JSONDecodeError) if the file is missing or malformed.
    """
    # Pin the encoding: without it open() falls back to the platform locale,
    # which can misread non-ASCII characters in the JSON on non-UTF-8 systems.
    with open(path_to_json, "r", encoding="utf-8") as read_file:
        return json.load(read_file)

In [3]:
def snowflake_connector(path_to_json_credentials):
    """Open a Snowflake connection from a JSON credentials file.

    INPUTS: path_to_json_credentials - path to a JSON file providing the keys
    "user", "password", "account_name", "warehouse", "database" and "schema".
    OUTPUTS: the connection object returned by sf.connect.
    COMMENTS: a missing key surfaces as a KeyError from the lookup below.
    """
    creds = json_reader(path_to_json_credentials)

    # Maps each sf.connect keyword to the credentials-file key that feeds it.
    credential_key_for = {
        "user": "user",
        "password": "password",
        "account": "account_name",
        "warehouse": "warehouse",
        "database": "database",
        "schema": "schema",
    }
    connect_kwargs = {
        keyword: creds[json_key]
        for keyword, json_key in credential_key_for.items()
    }

    return sf.connect(**connect_kwargs)

In [24]:
def sf_query(query_string, con):
    """Execute a single SQL statement on an open connection.

    INPUTS: query_string - the SQL text handed to the cursor's execute();
    con - a connection object exposing cursor() (here, the one returned by
    snowflake_connector).
    OUTPUTS: none
    COMMENTS: any exception raised by execute() propagates to the caller; the
    cursor is closed either way.
    """
    cursor = con.cursor()
    try:
        cursor.execute(query_string)
    finally:
        # Close explicitly so repeated calls do not leave one open cursor
        # behind per statement.
        cursor.close()

We want to:
1. Create a database
2. Use the database
3. Create schemas within

We'll pull these names from a config JSON, which assumes that there is one database with multiple schemas inside it.

In [5]:
# Load the database/schema settings; whole_query reads the "database" and
# "schemas" keys from this object.
config = json_reader("./db_creation_config.json")

We'll write Python helper functions that build the SQL `CREATE` statement for the database and for each schema. We'll also need a `USE DATABASE` statement right after creating the database, so that the schemas are created inside it.

In [27]:
def whole_query(config, con):
    """
    INPUTS: config - mapping with a "database" name and a "schemas" iterable of names;
    con - connection object passed through to sf_query.
    OUTPUTS: none
    COMMENTS: Handles exactly one database and any number of schemas. Statements run in
    order: create the database, switch to it, then create each schema. Every statement is
    printed before it is executed and followed by a "success" line once it returns.
    """

    def _announce_and_run(statement):
        # Echo the SQL, execute it, then confirm; an exception skips the confirmation.
        print(statement)
        sf_query(statement, con)
        print("success{}".format(os.linesep))

    target_database = config["database"]
    schema_names = config["schemas"]

    _announce_and_run(creation_query(target_database, entity_type="database"))
    _announce_and_run(use_query(target_database))

    for schema_name in schema_names:
        _announce_and_run(creation_query(schema_name, entity_type="schema"))

In [28]:
def creation_query(entity_name, entity_type = "schema"):
    """
    INPUTS: the name of the database or schema, and the type of the entity (defaults to "schema"). accepts "schema" or "database" ONLY.
    OUTPUTS: a "CREATE <TYPE> IF NOT EXISTS <NAME>;" statement, with type and name upper-cased, terminated by os.linesep.
    COMMENTS: raises ValueError naming the offending value if entity_type is anything else.
    entity_name is interpolated as-is (no quoting or validation), so it should only come from a trusted config.
    """
    allowed_types = ("schema", "database")
    if entity_type not in allowed_types:
        # Say what was wrong; a bare ValueError gives the caller nothing to act on.
        raise ValueError(
            "entity_type must be one of {}, got {!r}".format(allowed_types, entity_type)
        )

    return "CREATE {} IF NOT EXISTS {};{}".format(entity_type.upper(), entity_name.upper(), os.linesep)

def use_query(entity_name):
    """Build the statement that switches the session to a database.

    INPUTS: entity_name - the database name; it is upper-cased in the result.
    OUTPUTS: a "USE DATABASE <NAME>;" string terminated by os.linesep.
    """
    template = "USE DATABASE {};{}"
    upper_name = entity_name.upper()
    return template.format(upper_name, os.linesep)

In [23]:
# Open the Snowflake connection from the demo credentials file.
# NOTE(review): the file holds a plaintext password; presumably it is kept out
# of version control, so confirm.
con = snowflake_connector("./dv_demo_credentials.json")

In [30]:
# Create the database, switch to it, then create each configured schema.
# Each statement is echoed in the output, followed by "success".
whole_query(config, con)

CREATE DATABASE IF NOT EXISTS DV_TEST_BUILD_DB;

success

USE DATABASE DV_TEST_BUILD_DB;

success

CREATE SCHEMA IF NOT EXISTS SRC;

success

CREATE SCHEMA IF NOT EXISTS STG;

success

CREATE SCHEMA IF NOT EXISTS VLT;

success

CREATE SCHEMA IF NOT EXISTS MART;

success

