In [20]:
# techs used - duckdb for a local dummy database
# columnar database

import timeit

import duckdb
import pandas as pd
from pathlib import Path

In [24]:
def load_sql_from_file(sql_filepath):    
    if Path(sql_filepath).exists():
        with open(sql_filepath, "r") as file:
            sql_text = file.read()
            print(sql_text)
            return sql_text
    else:
        return None

In [28]:
### DDL to instantiate database ###

# create initial database connection (and instantiate database if it doesn't exist already) 

DB_FILE = '../data/dim-model.ddb'

Path(DB_FILE).unlink(missing_ok=True)

con = duckdb.connect(DB_FILE)


In [29]:
# create two separate schemas

con.sql("""
    CREATE SCHEMA snow;
    CREATE SCHEMA star;
""")

In [30]:
# create a table using the initial data - auto create columns based on information

con.sql(load_sql_from_file("../SQL/create_companies.sql"))

CREATE TABLE raw_companies AS
SELECT *
FROM
    read_csv_auto(
        '../data/Sorenson_Workstation_FIVE_Data Folder/ascii/SorensonworkfirmFIVEdata.csv'
    )



In [31]:
# display the raw_companies table

# con.table('raw_companies')

con.sql("SELECT * FROM raw_companies").df()


Unnamed: 0,YEAR,FIRMNAME,FIVEFIRMID,FIRMID,CUSIPHEADERID,CUSIPHeaderName,CUSIPHISTORYID,CUSIPHistoryName,FOUND,FATE,...,CEO,OWN,CPU,OS,APPS,COMM,MONITOR,DISK,MEMORY,BOARD
0,1987,Accent Systems Corp.,1,1,,,,,1985.0,Acquired by S.A.Y. Industries,...,Kelly T. Hickel,0.0,0,1,1,0,0,0,0,0
1,1982,"Adage, Inc.",2,2,,,,,1957.0,,...,Richard N. Spann,0.0,0,0,0,0,0,0,0,0
2,1983,"Adage, Inc.",2,2,,,,,1957.0,,...,Richard N. Spann,1.0,0,0,0,0,0,0,0,0
3,1984,"Adage, Inc.",2,2,,,,,1957.0,,...,Richard N. Spann,1.0,0,0,1,0,0,0,0,0
4,1985,"Adage, Inc.",2,2,,,,,1957.0,,...,Richard N. Spann,1.0,0,1,1,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
685,1983,"Florida Computer Graphics, Inc.",174,174,,,,,1981.0,,...,Michael R. Coffman,1.0,1,1,0,0,0,0,0,0
686,1984,"Florida Computer Graphics, Inc.",174,174,,,,,1981.0,Firm sold (unknown),...,Michael R. Coffman,1.0,1,1,0,0,0,0,0,0
687,1983,Orcatech,175,175,,,,,1981.0,,...,David Pearson,1.0,0,1,0,1,0,0,0,0
688,1984,Orcatech,175,175,,,,,1981.0,,...,David Pearson,1.0,0,1,0,1,0,0,0,0


In [32]:
# verify initial data types

sql = """
    DESCRIBE raw_companies;
"""

con.sql(sql)

┌──────────────────┬─────────────┬─────────┬─────────┬─────────┬─────────┐
│   column_name    │ column_type │  null   │   key   │ default │  extra  │
│     varchar      │   varchar   │ varchar │ varchar │ varchar │ varchar │
├──────────────────┼─────────────┼─────────┼─────────┼─────────┼─────────┤
│ YEAR             │ BIGINT      │ YES     │ NULL    │ NULL    │ NULL    │
│ FIRMNAME         │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ FIVEFIRMID       │ BIGINT      │ YES     │ NULL    │ NULL    │ NULL    │
│ FIRMID           │ BIGINT      │ YES     │ NULL    │ NULL    │ NULL    │
│ CUSIPHEADERID    │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ CUSIPHeaderName  │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ CUSIPHISTORYID   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ CUSIPHistoryName │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ FOUND            │ BIGINT      │ YES     │ NULL    │ NULL    │ NULL    │
│ FATE             │ VARC

In [33]:
initial_cols_dict = {col[0]:col[1] for col in con.sql(sql).fetchall()}

In [34]:
# print the initial columns dictionary

initial_cols_dict

{'YEAR': 'BIGINT',
 'FIRMNAME': 'VARCHAR',
 'FIVEFIRMID': 'BIGINT',
 'FIRMID': 'BIGINT',
 'CUSIPHEADERID': 'VARCHAR',
 'CUSIPHeaderName': 'VARCHAR',
 'CUSIPHISTORYID': 'VARCHAR',
 'CUSIPHistoryName': 'VARCHAR',
 'FOUND': 'BIGINT',
 'FATE': 'VARCHAR',
 'ESTATE': 'BIGINT',
 'SALES': 'DOUBLE',
 'EMPLOY': 'DOUBLE',
 'ZIP': 'BIGINT',
 'LAT': 'DOUBLE',
 'LON': 'DOUBLE',
 'RANDD': 'BIGINT',
 'PRODUCTS': 'BIGINT',
 'PTYPES': 'BIGINT',
 'SYSTEM': 'BIGINT',
 'GRAPHIC': 'BIGINT',
 'CEO': 'VARCHAR',
 'OWN': 'BIGINT',
 'CPU': 'BIGINT',
 'OS': 'BIGINT',
 'APPS': 'BIGINT',
 'COMM': 'BIGINT',
 'MONITOR': 'BIGINT',
 'DISK': 'BIGINT',
 'MEMORY': 'BIGINT',
 'BOARD': 'BIGINT'}

## A snowflake data model

Steps of Kimball dimensional modeling:

1. Select the business process
2. Declare the grain
3. Identify the dimensions
4. Identify the facts

![](datacamp-workspace/snow.png)

There's a step before this:

0. Know thy data

In [35]:
# check lat/long/zip data columns

con.sql("SELECT DISTINCT lat, lon, zip FROM raw_companies")

┌────────────────────┬─────────────────────┬───────┐
│        LAT         │         LON         │  ZIP  │
│       double       │       double        │ int64 │
├────────────────────┼─────────────────────┼───────┤
│ 0.7440609931945801 │  -1.244994044303894 │  1851 │
│  0.574072003364563 │ -2.0455870628356934 │ 92121 │
│ 0.5764449834823608 │ -2.0435290336608887 │ 92127 │
│ 0.6526600122451782 │  -2.128498077392578 │ 95054 │
│  0.587319016456604 │ -2.0563690662384033 │ 92715 │
│ 0.6525009870529175 │ -2.1308369636535645 │ 94039 │
│ 0.7179589867591858 │ -1.2863240242004395 │ 10504 │
│ 0.5933229923248291 │ -2.0664548873901367 │ 90233 │
│ 0.7125849723815918 │  -1.284695029258728 │ 11545 │
│ 0.7947999835014343 │ -2.1461191177368164 │ 97124 │
│          ·         │          ·          │    ·  │
│          ·         │          ·          │    ·  │
│          ·         │          ·          │    ·  │
│ 0.7129719853401184 │ -1.2910979986190796 │  7024 │
│ 0.7103580236434937 │  -1.396888017654419 │ 1

In [40]:
### INITIAL DATA CLEANSING ###

# change column zip to type varchar
# sql = "ALTER TABLE raw_companies ALTER COLUMN zip SET DATA TYPE VARCHAR;"

# ensure leading 0's present in zip
# sql += "UPDATE raw_companies SET zip = RIGHT('000' || zip, 5);" 

# convert lat/long variables to degrees from radians
# sql += "UPDATE raw_companies SET lat = DEGREES(lat), lon = DEGREES(lon);"

con.sql(load_sql_from_file("../SQL/initial_data_cleanse.sql"))

-- change column zip to type varchar
-- ensure leading 0's present in zip
-- convert lat/long variables to degrees from radians

ALTER TABLE raw_companies ALTER COLUMN zip SET DATA TYPE VARCHAR;
UPDATE raw_companies SET zip = RIGHT('000' || zip, 5);
UPDATE raw_companies SET lat = DEGREES(lat), lon = DEGREES(lon);



In [41]:
# display the results of cleansing

con.sql("SELECT DISTINCT lat, lon, zip FROM raw_companies")

┌────────────────────┬─────────────────────┬─────────┐
│        LAT         │         LON         │   ZIP   │
│       double       │       double        │ varchar │
├────────────────────┼─────────────────────┼─────────┤
│  112374.7501528708 │  -390805.9628003247 │ 93010   │
│  132956.0320216693 │ -244573.15885139877 │ 08873   │
│  141265.8968346589 │ -293808.96604802064 │ 53719   │
│ 147999.73965976422 │ -305742.03228118893 │ 55126   │
│ 122729.52975712848 │ -400791.14626243553 │ 94039   │
│ 138781.96680602228 │ -235762.78594941087 │ 01609   │
│ 138456.38541966357 │ -237441.31097147023 │ 01069   │
│ 128277.46303886319 │ -278056.16791353095 │ 41408   │
│ 131626.04870686884 │ -247437.41403932165 │ 19406   │
│  114001.5471863396 │  -284183.7932425247 │ 35894   │
│          ·         │           ·         │   ·     │
│          ·         │           ·         │   ·     │
│          ·         │           ·         │   ·     │
│ 111019.93426444048 │  -387498.1967262633 │ 90630   │
│ 112197.7

In [12]:
# set schema
# templated sql - DRY
schema = 'snow'

In [13]:
# create compustat dimension
# MD5 hash as keys vs ordered int
# surrogate vs. natural keys
# primary keys (collisions)
sql = f"""


CREATE OR REPLACE TABLE {schema}.compustat (
    cusip_id VARCHAR,
    cusip_name VARCHAR,
    cusip_historical_id VARCHAR,
    cusip_historical_name VARCHAR,
    PRIMARY KEY (cusip_id)
);

"""

sql += f"""
INSERT INTO {schema}.compustat (cusip_id, cusip_name, cusip_historical_id, cusip_historical_name)
WITH cid AS (
    SELECT DISTINCT
        CUSIPHEADERID AS cusip_id,
        CUSIPHEADERNAME AS cusip_name,
        CUSIPHISTORYID AS cusip_historical_id,
        CUSIPHISTORYNAME AS cusip_historical_name
    FROM
        raw_companies
)
SELECT
    MD5(COALESCE(cusip_id || cusip_name || cusip_historical_id || cusip_historical_name, '-1')) AS cusip_id,  --1 added for nulls
    cusip_name,
    cusip_historical_id,
    cusip_historical_name
FROM
    cid;
"""

# con.sql(sql)



In [14]:
# display the head of the snow.compustat table

con.sql(f"SELECT * FROM {schema}.compustat LIMIT 5")

┌──────────────────────────────────┬─────────────────────────────┬─────────────────────┬───────────────────────────┐
│             cusip_id             │         cusip_name          │ cusip_historical_id │   cusip_historical_name   │
│             varchar              │           varchar           │       varchar       │          varchar          │
├──────────────────────────────────┼─────────────────────────────┼─────────────────────┼───────────────────────────┤
│ ddda16d2d6741556439f68b2b5ec7617 │ AMERIQUEST TECHNOLOGIES INC │ 12589740            │ C M S ENHANCEMENTS INC    │
│ 975e660bf8ed33c337101272a4bd891c │ SYMBOLICS INC               │ 87151220            │ SYMBOLICS INC             │
│ 1163121a0747a209504d91a30a1b0450 │ INFORMATION INTERNATIONAL   │ 45674010            │ INFORMATION INTERNATIONAL │
│ a5e921c80b615e9fbd6222cdf64414e4 │ SCAILEX CORP LTD - ORD      │ 80909010            │ SCITEX CORP LTD  -ORD     │
│ 2419f64acf4053c654b6dd1c165833d3 │ AUDRE RECOGNITION SYS INC  

In [15]:
# create ceo dimension
# naming conventions
# joining on strings vs. others
sql = f"""

CREATE OR REPLACE TABLE {schema}.ceos (
    ceo_id VARCHAR,
    ceo_name VARCHAR,
    PRIMARY KEY (ceo_id)
);

"""

sql += f"""

INSERT INTO {schema}.ceos (ceo_id, ceo_name)

SELECT DISTINCT
    MD5(COALESCE(ceo, '-1')) AS ceo_id,
    ceo AS ceo_name
FROM
    raw_companies;
"""

con.sql(sql)

In [16]:
# display the snow.ceos table
con.table('snow.ceos')

┌──────────────────────────────────┬───────────────────────┐
│              ceo_id              │       ceo_name        │
│             varchar              │        varchar        │
├──────────────────────────────────┼───────────────────────┤
│ f058b6eb7f7202e7fa1ee269a287a67c │ William T. Mason      │
│ c8099fe7c4782d51be8495e08542d938 │ Charles P. Spector    │
│ 0bd712a991f0356000fb0d2f482c5e71 │ Thomas A. Vanderslice │
│ 47600976a8997349959fa69b00d27297 │ Lawrence T. Kou       │
│ 1a2acf2eefd35496a20d5a0ce8781481 │ Michael Rosenberg     │
│ 1798d480209139ae573e78cbfe62cb69 │ William F. Fenley     │
│ 49f40f4b3f783a2daca0ca36a5b3ff37 │ Ron Thompson          │
│ c0f53a44a73cdd13bf7214ded2ef93fa │ Chris Haudenschild    │
│ 32030b0a22aadde0a9d21991579cbaf1 │ Harold Copperman      │
│ 7a7288a494da60521b52e48bd11b14ca │ Aryeh Finegold        │
│                ·                 │     ·                 │
│                ·                 │     ·                 │
│                ·      

In [17]:
# create locations dimension
# type 2 dimensions - change slow
# insert only
# primary keys

sql = f"""

CREATE OR REPLACE TABLE {schema}.locations (
    location_id VARCHAR,
    zipcode VARCHAR,
    latitude DOUBLE,
    longitude DOUBLE,
    PRIMARY KEY (location_id)
);

"""

sql += f"""
INSERT INTO {schema}.locations (location_id, zipcode, latitude, longitude)

SELECT DISTINCT
    MD5(COALESCE(zip || CAST(lat AS VARCHAR) || CAST(lon AS VARCHAR), '-1')),
    zip,
    lat,
    lon
FROM
    raw_companies;

"""

con.sql(sql)

In [18]:
con.table('snow.locations')

┌──────────────────────────────────┬─────────┬────────────────────┬─────────────────────┐
│           location_id            │ zipcode │      latitude      │      longitude      │
│             varchar              │ varchar │       double       │       double        │
├──────────────────────────────────┼─────────┼────────────────────┼─────────────────────┤
│ 5d721601c5043800ef292bf582767b8d │ 92121   │ 32.891902929409454 │ -117.20350532704757 │
│ 806a7a68a63485d6d11e9cd29b2003bd │ 80233   │  39.90123771606788 │ -104.95824656040033 │
│ 12f6502395cb447546a31f12e27a3233 │ 55440   │  45.00984696992766 │  -93.42277422211164 │
│ bda418e93a7fd2e4dce67fdb92317165 │ 97420   │  43.36276392705201 │ -124.23312236044634 │
│ 83d8f091ae490085ffbebee2522e8b73 │ 06074   │  41.83405489268185 │  -72.55759815685487 │
│ 18545ec359ba0906ce910210e77aff29 │ 91320   │  34.17739234253004 │  -118.9358462062379 │
│ 9dc48c958248cda651926977c5f19ce3 │ 55425   │  44.84317327870642 │  -93.24939670022003 │
│ b830ca53

In [19]:
# create companies dimension
# normalization benefits

sql = f"""

CREATE OR REPLACE TABLE {schema}.companies (
    company_id VARCHAR,
    company_name VARCHAR,
    cusip_id VARCHAR,
    ceo_id VARCHAR,
    founding_year INTEGER,
    ownership_type VARCHAR,
    exit_type VARCHAR,
    exit_comment VARCHAR,
    location_id VARCHAR,
    PRIMARY KEY (company_id)
);

"""

sql += f"""

INSERT INTO {schema}.companies (company_id, company_name, cusip_id, ceo_id, founding_year, ownership_type, exit_type, exit_comment, location_id)

SELECT DISTINCT
    MD5(COALESCE(   COALESCE(firmname, '-1') ||
                    COALESCE(CAST(CUSIPHEADERID AS VARCHAR),'-1')||
                    COALESCE(CAST(firmid AS VARCHAR),'-1')||
                    COALESCE(CAST(found AS VARCHAR),'-1') ||
                    COALESCE(CAST(own AS VARCHAR),'-1') ||
                    COALESCE(CAST(fate AS VARCHAR),'-1') ||
                    COALESCE(CAST(estate AS VARCHAR),'-1') ||
                    COALESCE(ceo,'-1') ||
                    COALESCE(zip,'-1')
                    , '-1')) as id,
    firmname,
    MD5(COALESCE(CUSIPHEADERID || CUSIPHEADERNAME || CUSIPHISTORYID || CUSIPHISTORYNAME, '-1')),
    MD5(COALESCE(ceo, '-1')),
    found,
    CASE own
        WHEN 0 THEN 'Private'
        WHEN 1 THEN 'Public'
        WHEN 2 THEN 'Subsidiary'
        ELSE NULL END,
    CASE estate
        WHEN 0 THEN 'Censored'
        WHEN 1 THEN 'Exited market'
        WHEN 2 THEN 'Acquired'
        WHEN 3 THEN 'Spun off'
        WHEN 4 THEN 'Changed Name'
        ELSE NULL END,
    fate,
     MD5(COALESCE(zip || CAST(lat AS VARCHAR) || CAST(lon AS VARCHAR), '-1'))
    
FROM
    raw_companies;
"""

con.sql(sql)



ConstraintException: Constraint Error: PRIMARY KEY or UNIQUE constraint violated: duplicate key "f99b7b727ee5c2b600eb40a770327799"

In [None]:
# create product_offerings dimension
# what is a fact vs. a dimension
# CTE
sql = f"""

CREATE OR REPLACE TABLE {schema}.product_offerings (
    product_offering_id VARCHAR,
    product_types_offered VARCHAR,
    PRIMARY KEY (product_offering_id)
);

"""

sql += f"""

INSERT INTO {schema}.product_offerings (product_offering_id, product_types_offered)

WITH po AS (
     SELECT
        CASE
            WHEN system = 1 AND graphic = 0 THEN 'Desktop systems'
            WHEN system = 0 AND graphic = 1 THEN 'Graphics systems'
            WHEN system = 1 AND graphic = 1 THEN 'Desktop + Graphics systems'
            ELSE NULL END as offering

    from raw_companies
)
SELECT DISTINCT
    MD5(COALESCE(offering, '-1')),
    offering    
FROM
    po;
"""

con.sql(sql)

In [None]:
# create integration dimension
# type 4 dimension - change fast
sql = f"""

CREATE OR REPLACE TABLE {schema}.integrations (
   integration_id VARCHAR,
    cpu_source VARCHAR,
    os_source VARCHAR,
    application_source VARCHAR,
    communications_hardware_source VARCHAR,
    disk_source VARCHAR,
    ram_source VARCHAR,
    motherboard_source VARCHAR,
    PRIMARY KEY (integration_id)
);

"""

sql += f"""
INSERT INTO {schema}.integrations
(
    integration_id, 
    cpu_source, 
    os_source, 
    application_source, 
    communications_hardware_source, 
    disk_source, 
    ram_source, 
    motherboard_source
)

SELECT DISTINCT
    MD5(
        CASE WHEN cpu = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN os = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN apps = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN comm = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN disk = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN memory = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN board = 0 THEN 'Bought' ELSE 'Produced' END
    ), 
    CASE WHEN cpu = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN os = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN apps = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN comm = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN disk = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN memory = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN board = 0 THEN 'Bought' ELSE 'Produced' END
FROM
    raw_companies;


"""

con.sql(sql)

In [None]:
### create years table by dynamically assigning values ###

# integer values as key which helps with partitioning.
# date table's grain goes, other date information
# CTAS

sql = """
CREATE OR REPLACE TABLE years AS

WITH RECURSIVE years AS (
    SELECT 1970 as year
    UNION ALL
    SELECT year + 1
    FROM years
    WHERE year < 2000
)
SELECT 
    year,
    CASE 
        WHEN MOD(year - 1970, 12) = 0 THEN 'Dog'
        WHEN MOD(year - 1970, 12) = 1 THEN 'Pig'
        WHEN MOD(year - 1970, 12) = 2 THEN 'Rat'
        WHEN MOD(year - 1970, 12) = 3 THEN 'Ox'
        WHEN MOD(year - 1970, 12) = 4 THEN 'Tiger'
        WHEN MOD(year - 1970, 12) = 5 THEN 'Rabbit'
        WHEN MOD(year - 1970, 12) = 6 THEN 'Dragon'
        WHEN MOD(year - 1970, 12) = 7 THEN 'Snake'
        WHEN MOD(year - 1970, 12) = 8 THEN 'Horse'
        WHEN MOD(year - 1970, 12) = 9 THEN 'Sheep'
        WHEN MOD(year - 1970, 12) = 10 THEN 'Monkey'
        WHEN MOD(year - 1970, 12) = 11 THEN 'Rooster'
    END as zodiac_type
FROM years;
"""

con.sql(sql)

In [None]:
# display the snow.years table


### making the fact table

In [None]:
# create workstation_sales table
# additive facts vs. non-additive

sql = f"""

CREATE OR REPLACE TABLE {schema}.workstation_sales (
   year_id INTEGER,
    company_id VARCHAR,
    integration_id VARCHAR,
    product_offering_id VARCHAR,
    sales DOUBLE,
    research_budget DOUBLE,
    employee_count DOUBLE,
    product_offering_count INTEGER,
    product_category_count INTEGER,
);

"""

con.sql(sql)

sql += f"""

INSERT INTO {schema}.workstation_sales 
   (year_id,
    company_id ,
    integration_id ,
    product_offering_id ,
    sales ,
    research_budget ,
    employee_count ,
    product_offering_count ,
    product_category_count
    )

SELECT
    year,
    MD5(COALESCE(   COALESCE(firmname, '-1') ||
                    COALESCE(CAST(CUSIPHEADERID AS VARCHAR),'-1')||
                    COALESCE(CAST(firmid AS VARCHAR),'-1')||
                    COALESCE(CAST(found AS VARCHAR),'-1') ||
                    COALESCE(CAST(own AS VARCHAR),'-1') ||
                    COALESCE(CAST(fate AS VARCHAR),'-1') ||
                    COALESCE(CAST(estate AS VARCHAR),'-1') ||
                    COALESCE(ceo,'-1') ||
                    COALESCE(zip,'-1')
                    , '-1')) as id,

    MD5(
        CASE WHEN cpu = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN os = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN apps = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN comm = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN disk = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN memory = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN board = 0 THEN 'Bought' ELSE 'Produced' END
    ),
    
    MD5(COALESCE(
        CASE
            WHEN system = 1 AND graphic = 0 THEN 'Desktop systems'
            WHEN system = 0 AND graphic = 1 THEN 'Graphics systems'
            WHEN system = 1 AND graphic = 1 THEN 'Desktop + Graphics systems'
            ELSE NULL END,
        '-1'
    )),
    
    sales,
    randd,
    employ,
    products,
    ptypes
    
    
FROM
    raw_companies
    ;

"""

con.sql(sql)

In [None]:
# con.sql('select count(*) from workstation_snowflake.workstation_sales a join workstation_snowflake.companies b on a.company_id = b.company_id')

# con.sql('select count(*) from workstation_snowflake.workstation_sales a join workstation_snowflake.integrations b on a.integration_id = b.integration_id')

# con.sql('select count(*) from workstation_snowflake.companies') # 388

# con.sql("select 'locations', count(*) from workstation_snowflake.companies c join workstation_snowflake.locations l on c.location_id = l.location_id")

con.sql("select 'locations', count(*) from snow.companies c join snow.compustat s on c.cusip_id = s.cusip_id")

# con.sql("select 'locations', count(*) from workstation_snowflake.companies c join workstation_snowflake.ceos s on c.ceo_id = s.ceo_id")

# con.sql('select company_name, c.location_id, l.location_id from workstation_snowflake.companies c left join workstation_snowflake.locations l on c.location_id = l.location_id')

# con.sql('select count(*) from raw_companies')

## A star data model

Steps of Kimball dimensional modeling:

1. Select the business process
2. Declare the grain
3. Identify the dimensions
4. Identify the facts

![](star.png)

In [None]:
# update schema


In [None]:
# make new companies attribute which contains
# pros/cons of denormalizing to star
sql = f"""

CREATE OR REPLACE TABLE {schema}.companies (
       company_id VARCHAR,
    company_name VARCHAR,
    cusip_id VARCHAR,
    cusip_name VARCHAR,
    cusip_historical_id VARCHAR,
    cusip_historical_name VARCHAR,
    ceo_name VARCHAR,
    founding_year INTEGER,
    ownership_type VARCHAR,
    exit_type VARCHAR,
    exit_comment VARCHAR,
    zipcode VARCHAR,
    latitude DOUBLE,
    longitude DOUBLE,
    PRIMARY KEY (company_id),
);

"""

con.sql(sql)

sql += f"""

INSERT INTO {schema}.companies (
    company_id ,
    company_name ,
    cusip_id ,
    cusip_name ,
    cusip_historical_id ,
    cusip_historical_name ,
    ceo_name ,
    founding_year ,
    ownership_type ,
    exit_type ,
    exit_comment ,
    zipcode ,
    latitude ,
    longitude )

SELECT DISTINCT
    MD5(COALESCE(   COALESCE(firmname, '-1') ||
                    COALESCE(CAST(CUSIPHEADERID AS VARCHAR),'-1')||
                    COALESCE(CAST(firmid AS VARCHAR),'-1')||
                    COALESCE(CAST(found AS VARCHAR),'-1') ||
                    COALESCE(CAST(own AS VARCHAR),'-1') ||
                    COALESCE(CAST(fate AS VARCHAR),'-1') ||
                    COALESCE(CAST(estate AS VARCHAR),'-1') ||
                    COALESCE(ceo,'-1') ||
                    COALESCE(zip,'-1')
                    , '-1')) as id,
    firmname,
    CUSIPHEADERID,
    CUSIPHeaderName,
    CUSIPHISTORYID,
    CUSIPHistoryName,
    ceo,
    found,
    CASE own
        WHEN 0 THEN 'Private'
        WHEN 1 THEN 'Public'
        WHEN 2 THEN 'Subsidiary'
        ELSE NULL END,
    CASE estate
        WHEN 0 THEN 'Censored'
        WHEN 1 THEN 'Exited market'
        WHEN 2 THEN 'Acquired'
        WHEN 3 THEN 'Spun off'
        WHEN 4 THEN 'Changed Name'
        ELSE NULL END,
    fate,
    ZIP,
    lat,
    lon
    
FROM
    raw_companies;
"""

con.sql(sql)

In [None]:
# inner join the count of workstation_snowflake.workstation_sales to star.companies by company ID
con.sql("""
select count(*) 
    from workstation_snowflake.workstation_sales a 
    join star.companies b 
        on a.company_id = b.company_id
""")

### Some query testing

In [None]:
# Select the zipcode and sum of sales 
# from snow.workstation_sales
# inner join to snow.companies on company ID
# inner join to snow.locations on location ID
# group by 1st column
# order by 2nd column in descending order


In [None]:
# Select the zipcode and sum of sales 
# from snow.workstation_sales
# inner join to snow.companies on company ID
# group by 1st column
# order by 2nd column in descending order


In [None]:
%%timeit -n 10

con.sql("""
select zipcode, sum(sales) 
from snow.workstation_sales a 
join snow.companies c on a.company_id=c.company_id
join snow.locations l on c.location_id=l.location_id
group by 1 order by 2 desc""")

In [None]:
%%timeit -n 10

con.sql("""
select zipcode, sum(sales) 
from snow.workstation_sales a 
join star.companies c on a.company_id=c.company_id
group by 1 order by 1 desc""")