In [2]:
# techs used - duckdb for a local dummy database
# columnar database
import duckdb
import timeit
import pandas as pd 

In [3]:
### DDL to instantiate database ###
con = duckdb.connect('dim-model.db')
# create initial database connection (and instantiate database if it doesn't exist already) 
con.sql("""
CREATE SCHEMA snow;
CREATE SCHEMA star;

""")



In [None]:
# create two separate schemas


In [3]:
# create a table using the initial data - auto create columns based on information


In [4]:
# display the raw_companies table


In [5]:
# verify initial data types


In [6]:
# print the initial columns dictionary


## A snowflake data model

Steps of Kimball dimensional modeling:

1. Select the business process
2. Declare the grain
3. Identify the dimensions
4. Identify the facts

![](snow.png)

There's a step before this:

0. Know thy data

In [7]:
# check lat/long/zip data columns


In [9]:
### INITIAL DATA CLEANSING ###

# change column zip to type varchar


# ensure leading 0's present in zip


# convert lat/long variables to degrees from radians




In [10]:
# display the results of cleansing


In [12]:
# set schema
# templated sql - DRY


In [13]:
# create compustat dimension
# MD5 hash as keys vs ordered int
# surrogate vs. natural keys
# primary keys (collisions)
sql = f"""


CREATE OR REPLACE TABLE {schema}.compustat (
    cusip_id VARCHAR,
    cusip_name VARCHAR,
    cusip_historical_id VARCHAR,
    cusip_historical_name VARCHAR,
    PRIMARY KEY (cusip_id)
);

"""

sql += f"""
INSERT INTO {schema}.compustat (cusip_id, cusip_name, cusip_historical_id, cusip_historical_name)
WITH cid AS (
    SELECT DISTINCT
        CUSIPHEADERID AS cusip_id,
        CUSIPHEADERNAME AS cusip_name,
        CUSIPHISTORYID AS cusip_historical_id,
        CUSIPHISTORYNAME AS cusip_historical_name
    FROM
        raw_companies
)
SELECT
    MD5(COALESCE(cusip_id || cusip_name || cusip_historical_id || cusip_historical_name, '-1')) AS cusip_id, --1 added for nulls
    cusip_name,
    cusip_historical_id,
    cusip_historical_name
FROM
    cid;
"""

con.sql(sql)



In [14]:
# display the head of the snow.compustat table


In [15]:
# create ceo dimension
# naming conventions
# joining on strings vs. others
sql = f"""

CREATE OR REPLACE TABLE {schema}.ceos (
    ceo_id VARCHAR,
    ceo_name VARCHAR,
    PRIMARY KEY (ceo_id)
);

"""

sql += f"""

INSERT INTO {schema}.ceos (ceo_id, ceo_name)

SELECT DISTINCT
    MD5(COALESCE(ceo, '-1')) AS ceo_id,
    ceo AS ceo_name
FROM
    raw_companies;

"""

con.sql(sql)

In [16]:
# display the snow.ceos table
con.table('snow.ceos')

In [17]:
# create locations dimension
# type 2 dimensions - change slow
# insert only
# primary keys
sql = f"""

CREATE OR REPLACE TABLE {schema}.locations (
    location_id VARCHAR,
    zipcode VARCHAR,
    latitude DOUBLE,
    longitude DOUBLE,
    PRIMARY KEY (location_id)
);

"""

sql += f"""
INSERT INTO {schema}.locations (location_id, zipcode, latitude, longitude)

SELECT DISTINCT
    MD5(COALESCE(zip || CAST(lat AS VARCHAR) || CAST(lon AS VARCHAR), '-1')),
    zip,
    lat,
    lon
FROM
    raw_companies;

"""

con.sql(sql)

In [18]:
# create companies dimension
# normalization benefits

sql = f"""

CREATE OR REPLACE TABLE {schema}.companies (
    company_id VARCHAR,
    company_name VARCHAR,
    cusip_id VARCHAR,
    ceo_id VARCHAR,
    founding_year INTEGER,
    ownership_type VARCHAR,
    exit_type VARCHAR,
    exit_comment VARCHAR,
    location_id VARCHAR,
    PRIMARY KEY (company_id)
    
);

"""

sql += f"""

INSERT INTO {schema}.companies (company_id, company_name, cusip_id, ceo_id, founding_year, ownership_type, exit_type, exit_comment, location_id)

SELECT DISTINCT
    MD5(COALESCE(   COALESCE(firmname, '-1') ||
                    COALESCE(CAST(CUSIPHEADERID AS VARCHAR),'-1')||
                    COALESCE(CAST(firmid AS VARCHAR),'-1')||
                    COALESCE(CAST(found AS VARCHAR),'-1') ||
                    COALESCE(CAST(own AS VARCHAR),'-1') ||
                    COALESCE(CAST(fate AS VARCHAR),'-1') ||
                    COALESCE(CAST(estate AS VARCHAR),'-1') ||
                    COALESCE(ceo,'-1') ||
                    COALESCE(zip,'-1')
                    , '-1')) as id,
    firmname,
    MD5(COALESCE(CUSIPHEADERID || CUSIPHEADERNAME || CUSIPHISTORYID || CUSIPHISTORYNAME, '-1')),
    MD5(COALESCE(ceo, '-1')),
    found,
    CASE own
        WHEN 0 THEN 'Private'
        WHEN 1 THEN 'Public'
        WHEN 2 THEN 'Subsidiary'
        ELSE NULL END,
    CASE estate
        WHEN 0 THEN 'Censored'
        WHEN 1 THEN 'Exited market'
        WHEN 2 THEN 'Acquired'
        WHEN 3 THEN 'Spun off'
        WHEN 4 THEN 'Changed Name'
        ELSE NULL END,
    fate,
     MD5(COALESCE(zip || CAST(lat AS VARCHAR) || CAST(lon AS VARCHAR), '-1'))
    
FROM
    raw_companies

    ;
"""

con.sql(sql)



In [19]:
# create product_offerings dimension
# what is a fact vs. a dimension
# CTE
sql = f"""

CREATE OR REPLACE TABLE {schema}.product_offerings (
    product_offering_id VARCHAR,
    product_types_offered VARCHAR,
    PRIMARY KEY (product_offering_id)
);

"""

sql += f"""

INSERT INTO {schema}.product_offerings (product_offering_id, product_types_offered)

WITH po AS (
     SELECT
        CASE
            WHEN system = 1 AND graphic = 0 THEN 'Desktop systems'
            WHEN system = 0 AND graphic = 1 THEN 'Graphics systems'
            WHEN system = 1 AND graphic = 1 THEN 'Desktop + Graphics systems'
            ELSE NULL END as offering

    from raw_companies
)
SELECT DISTINCT
    MD5(COALESCE(offering, '-1')),
    offering    
FROM
    po;
"""

con.sql(sql)

In [20]:
# create integration dimension
# type 4 dimension - change fast
sql = f"""

CREATE OR REPLACE TABLE {schema}.integrations (
   integration_id VARCHAR,
    cpu_source VARCHAR,
    os_source VARCHAR,
    application_source VARCHAR,
    communications_hardware_source VARCHAR,
    disk_source VARCHAR,
    ram_source VARCHAR,
    motherboard_source VARCHAR,
    PRIMARY KEY (integration_id)
);

"""

sql += f"""
INSERT INTO {schema}.integrations
(
    integration_id, 
    cpu_source, 
    os_source, 
    application_source, 
    communications_hardware_source, 
    disk_source, 
    ram_source, 
    motherboard_source
)

SELECT DISTINCT
    MD5(
        CASE WHEN cpu = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN os = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN apps = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN comm = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN disk = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN memory = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN board = 0 THEN 'Bought' ELSE 'Produced' END
    ), 
    CASE WHEN cpu = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN os = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN apps = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN comm = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN disk = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN memory = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN board = 0 THEN 'Bought' ELSE 'Produced' END
FROM
    raw_companies;


"""

con.sql(sql)

In [25]:
### create years table by dynamically assigning values ###

# integer values as key which helps with partitioning.
# date table's grain goes, other date information
# CTAS

sql = """
CREATE OR REPLACE TABLE years AS

WITH RECURSIVE years AS (
    SELECT 1970 as year
    UNION ALL
    SELECT year + 1
    FROM years
    WHERE year < 2000
)
SELECT 
    year,
    CASE 
        WHEN MOD(year - 1970, 12) = 0 THEN 'Dog'
        WHEN MOD(year - 1970, 12) = 1 THEN 'Pig'
        WHEN MOD(year - 1970, 12) = 2 THEN 'Rat'
        WHEN MOD(year - 1970, 12) = 3 THEN 'Ox'
        WHEN MOD(year - 1970, 12) = 4 THEN 'Tiger'
        WHEN MOD(year - 1970, 12) = 5 THEN 'Rabbit'
        WHEN MOD(year - 1970, 12) = 6 THEN 'Dragon'
        WHEN MOD(year - 1970, 12) = 7 THEN 'Snake'
        WHEN MOD(year - 1970, 12) = 8 THEN 'Horse'
        WHEN MOD(year - 1970, 12) = 9 THEN 'Sheep'
        WHEN MOD(year - 1970, 12) = 10 THEN 'Monkey'
        WHEN MOD(year - 1970, 12) = 11 THEN 'Rooster'
    END as zodiac_type
FROM years;
"""

con.sql(sql)

In [None]:
# display the snow.years table


### making the fact table

In [27]:
# create workstation_sales table
# additive facts vs. non-additive

sql = f"""

CREATE OR REPLACE TABLE {schema}.workstation_sales (
   year_id INTEGER,
    company_id VARCHAR,
    integration_id VARCHAR,
    product_offering_id VARCHAR,
    sales DOUBLE,
    research_budget DOUBLE,
    employee_count DOUBLE,
    product_offering_count INTEGER,
    product_category_count INTEGER,
);

"""

con.sql(sql)

sql += f"""

INSERT INTO {schema}.workstation_sales 
   (year_id,
    company_id ,
    integration_id ,
    product_offering_id ,
    sales ,
    research_budget ,
    employee_count ,
    product_offering_count ,
    product_category_count
    )

SELECT
    year,
    MD5(COALESCE(   COALESCE(firmname, '-1') ||
                    COALESCE(CAST(CUSIPHEADERID AS VARCHAR),'-1')||
                    COALESCE(CAST(firmid AS VARCHAR),'-1')||
                    COALESCE(CAST(found AS VARCHAR),'-1') ||
                    COALESCE(CAST(own AS VARCHAR),'-1') ||
                    COALESCE(CAST(fate AS VARCHAR),'-1') ||
                    COALESCE(CAST(estate AS VARCHAR),'-1') ||
                    COALESCE(ceo,'-1') ||
                    COALESCE(zip,'-1')
                    , '-1')) as id,

    MD5(
        CASE WHEN cpu = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN os = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN apps = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN comm = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN disk = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN memory = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN board = 0 THEN 'Bought' ELSE 'Produced' END
    ),
    
    MD5(COALESCE(
        CASE
            WHEN system = 1 AND graphic = 0 THEN 'Desktop systems'
            WHEN system = 0 AND graphic = 1 THEN 'Graphics systems'
            WHEN system = 1 AND graphic = 1 THEN 'Desktop + Graphics systems'
            ELSE NULL END,
        '-1'
    )),
    
    sales,
    randd,
    employ,
    products,
    ptypes
    
    
FROM
    raw_companies
    ;

"""

con.sql(sql)

In [30]:
# con.sql('select count(*) from workstation_snowflake.workstation_sales a join workstation_snowflake.companies b on a.company_id = b.company_id')

# con.sql('select count(*) from workstation_snowflake.workstation_sales a join workstation_snowflake.integrations b on a.integration_id = b.integration_id')

# con.sql('select count(*) from workstation_snowflake.companies') # 388

# con.sql("select 'locations', count(*) from workstation_snowflake.companies c join workstation_snowflake.locations l on c.location_id = l.location_id")

con.sql("select 'locations', count(*) from snow.companies c join snow.compustat s on c.cusip_id = s.cusip_id")

# con.sql("select 'locations', count(*) from workstation_snowflake.companies c join workstation_snowflake.ceos s on c.ceo_id = s.ceo_id")

# con.sql('select company_name, c.location_id, l.location_id from workstation_snowflake.companies c left join workstation_snowflake.locations l on c.location_id = l.location_id')

# con.sql('select count(*) from raw_companies')

## A star data model

Steps of Kimball dimensional modeling:

1. Select the business process
2. Declare the grain
3. Identify the dimensions
4. Identify the facts

![](star.png)

In [31]:
# update schema


In [32]:
# make new companies attribute which contains
# pros/cons of denormalizing to star
sql = f"""

CREATE OR REPLACE TABLE {schema}.companies (
       company_id VARCHAR,
    company_name VARCHAR,
    cusip_id VARCHAR,
    cusip_name VARCHAR,
    cusip_historical_id VARCHAR,
    cusip_historical_name VARCHAR,
    ceo_name VARCHAR,
    founding_year INTEGER,
    ownership_type VARCHAR,
    exit_type VARCHAR,
    exit_comment VARCHAR,
    zipcode VARCHAR,
    latitude DOUBLE,
    longitude DOUBLE,
    PRIMARY KEY (company_id),
);

"""

con.sql(sql)

sql += f"""

INSERT INTO {schema}.companies (
    company_id ,
    company_name ,
    cusip_id ,
    cusip_name ,
    cusip_historical_id ,
    cusip_historical_name ,
    ceo_name ,
    founding_year ,
    ownership_type ,
    exit_type ,
    exit_comment ,
    zipcode ,
    latitude ,
    longitude )

SELECT DISTINCT
    MD5(COALESCE(   COALESCE(firmname, '-1') ||
                    COALESCE(CAST(CUSIPHEADERID AS VARCHAR),'-1')||
                    COALESCE(CAST(firmid AS VARCHAR),'-1')||
                    COALESCE(CAST(found AS VARCHAR),'-1') ||
                    COALESCE(CAST(own AS VARCHAR),'-1') ||
                    COALESCE(CAST(fate AS VARCHAR),'-1') ||
                    COALESCE(CAST(estate AS VARCHAR),'-1') ||
                    COALESCE(ceo,'-1') ||
                    COALESCE(zip,'-1')
                    , '-1')) as id,
    firmname,
    CUSIPHEADERID,
    CUSIPHeaderName,
    CUSIPHISTORYID,
    CUSIPHistoryName,
    ceo,
    found,
    CASE own
        WHEN 0 THEN 'Private'
        WHEN 1 THEN 'Public'
        WHEN 2 THEN 'Subsidiary'
        ELSE NULL END,
    CASE estate
        WHEN 0 THEN 'Censored'
        WHEN 1 THEN 'Exited market'
        WHEN 2 THEN 'Acquired'
        WHEN 3 THEN 'Spun off'
        WHEN 4 THEN 'Changed Name'
        ELSE NULL END,
    fate,
    ZIP,
    lat,
    lon
    
FROM
    raw_companies;
"""

con.sql(sql)

In [None]:
# inner join the count of workstation_snowflake.workstation_sales to star.companies by company ID
con.sql("""
select count(*) 
    from workstation_snowflake.workstation_sales a 
    join star.companies b 
        on a.company_id = b.company_id
""")

### Some query testing

In [33]:
# Select the zipcode and sum of sales 
# from snow.workstation_sales
# inner join to snow.companies on company ID
# inner join to snow.locations on location ID
# group by 1st column
# order by 2nd column in descending order


In [34]:
# Select the zipcode and sum of sales 
# from snow.workstation_sales
# inner join to snow.companies on company ID
# group by 1st column
# order by 2nd column in descending order


In [35]:
%%timeit -n 10

con.sql("""
select zipcode, sum(sales) 
from snow.workstation_sales a 
join snow.companies c on a.company_id=c.company_id
join snow.locations l on c.location_id=l.location_id
group by 1 order by 2 desc""")

In [37]:
%%timeit -n 10

con.sql("""
select zipcode, sum(sales) 
from snow.workstation_sales a 
join star.companies c on a.company_id=c.company_id
group by 1 order by 1 desc""")