In [69]:
from collections.abc import Callable
from tabulate import tabulate
import sqlite3 as sql
import traceback
import hashlib
import random
import string
import csv

## Two Circles Case Study - SQLite3 Views

#### Helper Functions

In [70]:
def print_schemas(path: str) -> None:
    '''
    Prints a detailed view of every table schema within the
    SQLite database provided by path.

    For each table listed in sqlite_master, the columns reported by
    PRAGMA table_info are printed as a grid showing the column name,
    declared type, nullability, primary-key flag and default value.

    Args:
        path: Database path handed to sqlite3.connect.

    Returns:
        None. All output is written to stdout.
    '''
    conn = sql.connect(path)
    try:
        cur = conn.cursor()

        cur.execute('''
            SELECT
                name
            FROM
                sqlite_master
            WHERE
                type = 'table'
        ''')
        tables = cur.fetchall()

        if not tables:
            print("No tables found in database.")
            return

        headers = ["Column Name", "Data Type", "Nullable", "Primary Key", "Default"]

        for (name,) in tables:
            print(f"\nTable: {name}\n")

            # Identifiers cannot be bound as parameters, so quote the name
            # (doubling embedded quotes) to cope with spaces or keywords.
            quoted = '"' + name.replace('"', '""') + '"'
            cur.execute(f"PRAGMA table_info({quoted})")
            cols = cur.fetchall()

            # table_info row layout: (cid, name, type, notnull, dflt_value, pk)
            rows = [
                [col[1], col[2], "NO" if col[3] else "YES", "YES" if col[5] else "NO", col[4]]
                for col in cols
            ]

            print(tabulate(rows, headers = headers, tablefmt = "grid"))
            print("\n" + "-" * 50 + "\n")
    finally:
        # Always release the connection, including on the early return above
        # and when a query raises.
        conn.close()

In [71]:
def print_sample(cursor: sql.Cursor, table_name: str, n_rows: int = 5) -> None:
    '''
    Prints randomly chosen sample rows from a table in a tabulated form.

    Args:
        cursor: Open sqlite3 cursor used to run the query.
        table_name: Name of the table or view to sample. It is interpolated
            into the SQL text (identifiers cannot be bound as parameters),
            so only pass trusted names.
        n_rows: Maximum number of rows to fetch.

    Returns:
        None. The sample is written to stdout.
    '''
    # The row limit is bound as a parameter rather than formatted into the SQL.
    cursor.execute(
        f"""
        SELECT *
        FROM {table_name}
        ORDER BY RANDOM()
        LIMIT ?
        """,
        (n_rows,),
    )

    rows = cursor.fetchall()
    # The first element of each description entry is the column name.
    columns = [desc[0] for desc in cursor.description]

    print(f"\nRandom {n_rows} rows from '{table_name}':")
    print(tabulate(rows, headers = columns, tablefmt = 'psql'))


In [72]:
def gen_hash(*args) -> str:
    '''
    Using the given seed argumements generates
    a truncated hash
    '''
    try:
        seed = ''.join(
            str(s) if s is not None else '.'
            for s in args
            ).encode('utf-8')
        return hashlib.sha256(seed).hexdigest()[:56]
    except Exception as e:
        print(f"Error in hash_columns: {e}")
        traceback.print_exc()

In [73]:
def standardize_name(name: str) -> str:
    '''Removes partner name variations'''
    match name:
        case 'Chevy':
            return 'chevrolet'
        case 'Draftkings' | 'Draft kings' | 'Draft Kings':
            return 'draft_kings'
        case 'Citizens Financial' | 'Citizens Bank' | 'Citizens':
            return 'citizens_bank'
        case 'Coke' | 'Diet Coke' | 'Coke Zero' | 'Coca Cola' | 'Coca-cola':
            return 'coca_cola'
        case 'Toyota Motors':
            return 'toyota'
        case 'Verizon Wireless':
            return 'verizon'
        case 'Chase Freedom' | 'JP Morgan':
            return 'chase'
        case 'Motorola Mobility':
            return 'motorola'
        case 'Miller' | 'Miller Lite':
            return 'miller_lite'
        case 'United Air Lines' | 'United Airlines':
            return 'united_airlines'
        case 'Vizzy Seltzer':
            return 'vizzy'
        case 'Detla Air Lines' | 'Delta Airlines' | 'Delta':
            return 'delta_airlines'
        case 'Fan Duel':
            return 'fan_duel'
        case _:
            return name.lower()

In [74]:
def standardize_assets(asset: str) -> str:
    '''
    Collapses asset naming variations into one canonical snake_case label.

    Known spellings are looked up in an alias table. Unknown strings are
    lower-cased and non-string values are handed back untouched.
    '''
    alias_table = (
        (('Player Tunnel',), 'player_tunnel'),
        (('Pole Pad', 'Pole'), 'pole_pad'),
        (('Basket Stanchion', 'Stanchion'), 'basket_stanchion'),
        (('On-Court', 'On Court Signage'), 'on_court_signage'),
        (('LED', 'LED Courtside'), 'led_courtside'),
        (('Seatbacks', 'Player Seatbacks'), 'player_seatbacks'),
        (('Upper Bowl LED',), 'upper_bowl_led'),
        (('Lower Bowl LED',), 'lower_bowl_led'),
        (('Exit Tunnel',), 'exit_tunnel'),
        (('Virtual Branding',), 'virtual_branding'),
        (('Penalty Box',), 'penalty_box'),
        (('Text Mention',), 'text_mention'),
    )

    for spellings, canonical in alias_table:
        if asset in spellings:
            return canonical

    # No alias matched: only real strings are lower-cased.
    if type(asset) != str:
        return asset
    return asset.lower()

In [75]:
def standardize_camp(c: str) -> str:
    '''
    Normalises campaign names to a single snake_case spelling.

    Recognised campaign names map to fixed labels, other strings are
    lower-cased, and non-string values are returned as they came in.
    '''
    # Non-text values (e.g. NULL campaigns) pass straight through.
    if type(c) != str:
        return c

    known_campaigns = {
        'Top Play': 'top_play',
        'Goal Update': 'goal_update',
        'Community Night': 'community_night',
        'Player Arrival': 'player_arrival',
        'Starting Line Up': 'starting_line_up',
        'Starting Lineup': 'starting_line_up',
        'Tune In': 'tune_in',
        'In-Game Highlight': 'game_highlight',
    }
    return known_campaigns.get(c, c.lower())

In [76]:
def snake_case(s: str) -> str:
    '''
    Lower-cases and trims a string, then swaps every space for an underscore.

    Falsy values and non-string values are returned unchanged.
    '''
    if not s:
        return s
    if type(s) != str:
        return s
    trimmed = s.lower().strip()
    return trimmed.replace(' ', '_')

In [77]:
def gen_id() -> str:
    '''
    Builds a random identifier: 56 characters drawn from digits and
    lowercase letters are joined and passed through gen_hash.
    '''
    alphabet = string.digits + string.ascii_lowercase
    picks = []
    for _ in range(56):
        picks.append(random.choice(alphabet))
    return gen_hash(''.join(picks))

#### Data Insertion

In [78]:
# Open the SQLite database file 'two_cirlces.db' (sqlite3 creates it if missing).
# NOTE(review): the filename is spelled "cirlces"; the same literal is passed to
# print_schemas later in the notebook, so change both together if renaming.
conn = sql.connect('two_cirlces.db')

# Cursor used to execute commands
cur = conn.cursor()

In [79]:
# Drop table if social_dataset already exists so the cell can be re-run
cur.execute('DROP TABLE IF EXISTS social_dataset;')

# Create empty social_dataset.
# NOT NULL ON CONFLICT IGNORE: a row carrying NULL in such a column is
# skipped silently instead of aborting the whole insert.
cur.execute('''
    CREATE TABLE IF NOT EXISTS social_dataset (
        exposure_id           TEXT            NOT NULL ON CONFLICT IGNORE,
        partner               TEXT            NOT NULL ON CONFLICT IGNORE,
        channel               TEXT            NOT NULL ON CONFLICT IGNORE,
        post_id               TEXT            NOT NULL ON CONFLICT IGNORE,
        partner_exposure_date TEXT            NOT NULL ON CONFLICT IGNORE,
        content_type          TEXT            NOT NULL ON CONFLICT IGNORE,
        impressions           INTEGER,
        projected_impressions INTEGER,
        video_views           INTEGER,
        brand_exposure_value  NUMERIC (10, 8) NOT NULL ON CONFLICT IGNORE,
        logo_impressions      INTEGER,
        engagement            INTEGER,
        asset                 TEXT,
        post_value            NUMERIC (10, 8) NOT NULL ON CONFLICT IGNORE,
        feed_name             TEXT            NOT NULL ON CONFLICT IGNORE,
        campaign              TEXT,
        snapshot_date         TEXT            NOT NULL ON CONFLICT IGNORE
    );
''')

# Target columns, in the order the values appear in each CSV row
social_columns = [
    'exposure_id',
    'partner',
    'channel',
    'post_id',
    'partner_exposure_date',
    'content_type',
    'impressions',
    'projected_impressions',
    'video_views',
    'brand_exposure_value',
    'logo_impressions',
    'engagement',
    'asset',
    'post_value',
    'feed_name',
    'campaign',
    'snapshot_date',
]
social_column_list = ", ".join(social_columns)

# One NULLIF(?, 'NULL') per column: the literal text 'NULL' in the CSV is
# stored as a real SQL NULL. The count follows the column list.
social_placeholders = ", ".join(["NULLIF(?, 'NULL')"] * len(social_columns))

# Open social_data_set.csv and read data.
# newline='' is what the csv module expects so that quoted fields containing
# line breaks are parsed correctly.
with open('Data/social_data_set.csv', 'r', newline='') as file:
    csv_reader = csv.reader(file)

    # Skip header (tolerating a completely empty file)
    next(csv_reader, None)

    # Insert data into the table
    cur.executemany(f'''
        INSERT INTO social_dataset ({social_column_list})
        VALUES ({social_placeholders})
        ''',
        csv_reader
    )

conn.commit()

In [80]:
# Drop table if broadcast_dataset already exists so the cell can be re-run
cur.execute('DROP TABLE IF EXISTS broadcast_dataset;')

# Create empty broadcast_dataset.
# NOT NULL ON CONFLICT IGNORE: a row carrying NULL in such a column is
# skipped silently instead of aborting the whole insert.
cur.execute('''
    CREATE TABLE IF NOT EXISTS broadcast_dataset (
        season                     TEXT            NOT NULL ON CONFLICT IGNORE,
        date                       TEXT            NOT NULL ON CONFLICT IGNORE,
        game                       TEXT            NOT NULL ON CONFLICT IGNORE,
        brand                      TEXT            NOT NULL ON CONFLICT IGNORE,
        asset                      TEXT,
        exposure                   INTEGER,
        duration                   INTEGER,
        qimv                       INTEGER,
        exposure_impressions       INTEGER,
        sponsorship_impressions    INTEGER,
        qi_sponsorship_impressions INTEGER
    );
''')

# Target columns, in the order the values appear in each CSV row
broadcast_columns = [
    'season',
    'date',
    'game',
    'brand',
    'asset',
    'exposure',
    'duration',
    'qimv',
    'exposure_impressions',
    'sponsorship_impressions',
    'qi_sponsorship_impressions',
]
broadcast_column_list = ", ".join(broadcast_columns)

# One NULLIF(?, 'NULL') per column: the literal text 'NULL' in the CSV is
# stored as a real SQL NULL. The count follows the column list.
broadcast_placeholders = ", ".join(["NULLIF(?, 'NULL')"] * len(broadcast_columns))

# Open broadcast_data_set.csv and read data.
# newline='' is what the csv module expects so that quoted fields containing
# line breaks are parsed correctly.
with open('Data/broadcast_data_set.csv', 'r', newline='') as file:
    csv_reader = csv.reader(file)

    # Skip header (tolerating a completely empty file)
    next(csv_reader, None)

    # Insert data into the table
    cur.executemany(f'''
        INSERT INTO broadcast_dataset ({broadcast_column_list})
        VALUES ({broadcast_placeholders})
        ''',
        csv_reader
    )

conn.commit()

In [97]:
# Verify schemas with pre-defined helper function.
# print_schemas opens (and closes) its own connection to this path, separate
# from `conn`, so it reports what is visible in the database file.
print_schemas('two_cirlces.db')


Table: social_dataset

+-----------------------+-----------------+------------+---------------+-----------+
| Column Name           | Data Type       | Nullable   | Primary Key   | Default   |
| exposure_id           | TEXT            | NO         | NO            |           |
+-----------------------+-----------------+------------+---------------+-----------+
| partner               | TEXT            | NO         | NO            |           |
+-----------------------+-----------------+------------+---------------+-----------+
| channel               | TEXT            | NO         | NO            |           |
+-----------------------+-----------------+------------+---------------+-----------+
| post_id               | TEXT            | NO         | NO            |           |
+-----------------------+-----------------+------------+---------------+-----------+
| partner_exposure_date | TEXT            | NO         | NO            |           |
+-----------------------+----------------

In [98]:
# Verify rows were inserted by sampling some random ones
# NOTE(review): `query` is only assigned in this cell and is never passed to
# cur.execute here; the sampling below is done by print_sample, which builds
# its own SELECT ... ORDER BY RANDOM() statement. Consider removing `query`
# if nothing later in the notebook relies on it.
query = '''
    SELECT
        * 
    FROM 
        social_dataset 
    WHERE
        rowid IN (
            SELECT
                rowid
            FROM
                social_dataset
            ORDER BY
                RANDOM()
            LIMIT
                3
        )
'''

# Print three random rows from each freshly loaded table
print_sample(cur, 'social_dataset', 3)
print_sample(cur, 'broadcast_dataset', 3)


Random 3 rows from 'social_dataset':
+----------------------------------------------------------+-----------+-----------+----------------------------------------------------------+-------------------------+----------------+---------------+-------------------------+---------------+------------------------+--------------------+--------------+------------------+--------------+-------------+------------+---------------------+
| exposure_id                                              | partner   | channel   | post_id                                                  | partner_exposure_date   | content_type   |   impressions |   projected_impressions |   video_views |   brand_exposure_value |   logo_impressions |   engagement | asset            |   post_value | feed_name   | campaign   | snapshot_date       |
|----------------------------------------------------------+-----------+-----------+----------------------------------------------------------+-------------------------+---------------

#### Standardization & Normalization - Social Dataset

**SN1:** The only issue with this method is that, since asset is non-exhaustive, there could be two separate assets by the same company featured in the same post that are both encoded as NULL, so we cannot differentiate them. The only way to tell them apart is by the differences in their analytics data, which is unreliable at best.

**SN2, SN3, SN6, SN7:** In the absence of User-Defined Procedures (like in MySQL or PostgreSQL), User-Defined Functions (UDFs) have been used. These operate like procedures but are added to the SQLite containerized database and can be created in a variety of languages.

In [83]:
# SN1: Drop rows with duplicate [exposure_id, post_id, asset, snapshot_date]
# For every group of identical keys only the row with the highest rowid is
# kept. GROUP BY places NULL assets in the same group, so rows that differ
# only by an unknown (NULL) asset are treated as duplicates of each other.
cur.execute('''
    DELETE FROM
        social_dataset
    WHERE rowid NOT IN (
        SELECT 
            MAX(rowid)
        FROM 
            social_dataset
        GROUP BY 
            exposure_id, post_id, asset, snapshot_date
    );
''')

conn.commit()

In [84]:
# SN2: Modify exposure_id so that it takes into account different assets
# If the same brand appears in the same video but in different forms, we need to differentiate

# SN3: Standardize partner naming conventions
# SN6: Standardize asset naming conventions
# SN7: Standardize campaign naming conventions
# SN8: Lowercase all remaining textual fields for consistency

# Register the Python helpers as SQL functions on this connection.
# hash_id is registered with -1 arguments, i.e. it accepts any number of arguments.
conn.create_function('standardize_assets', 1, standardize_assets)
conn.create_function('standardize_camp', 1, standardize_camp)
conn.create_function('standardize_name', 1, standardize_name)
conn.create_function('hash_id', -1, gen_hash)

# NOTE(review): SQLite evaluates all SET expressions against the row's
# pre-update values, so hash_id(...) hashes the original asset text rather
# than the output of standardize_assets - confirm this is intended.
# NOTE(review): exposure_id is re-hashed from its current value, so running
# this cell twice without reloading the table changes the ids again.
cur.execute('''
    UPDATE
        social_dataset
    SET
        asset = standardize_assets(asset),
        campaign = standardize_camp(campaign),
        partner = standardize_name(partner),
        exposure_id = hash_id(
            exposure_id,
            post_id,
            asset
        ),
        channel = LOWER(channel),
        content_type = LOWER(content_type),
        feed_name = LOWER(feed_name)
''')

conn.commit()

In [85]:
# SN4: Some entries for logo_impressions do not conform to (projected_impressions / 5.7), so need to fix
# This trend is only seen for a subset of Twitter photos
# Only rows where logo_impressions equals projected_impressions are rewritten;
# the quotient is cast to an integer before being stored.
cur.execute('''
    UPDATE
        social_dataset
    SET
        logo_impressions = CAST(projected_impressions / 5.7 AS Integer)
    WHERE
        logo_impressions = projected_impressions
''')

conn.commit()

In [86]:
# SN5: When video_views exceeds impressions, set impressions to video_views
# Rows where either value is NULL do not satisfy the comparison and are left as-is.
cur.execute('''
    UPDATE
        social_dataset
    SET
        impressions = video_views
    WHERE
        video_views > impressions
''')

conn.commit()

#### Standardization & Normalization - Broadcast Dataset

In [87]:
# SN1: Drop irrelevant columns
# (ALTER TABLE ... DROP COLUMN requires SQLite 3.35.0 or newer.)
# NOTE(review): this cell alters the table in place; re-running it without
# first re-creating broadcast_dataset presumably fails because `season` is gone.
cur.execute('''
    ALTER TABLE broadcast_dataset 
    DROP COLUMN season;
''')

# SN2: Rename specific columns
# Each entry is the "<old> TO <new>" tail of an ALTER TABLE ... RENAME COLUMN
# statement, aligning broadcast column names with social_dataset.
renaming = [
    'exposure_impressions TO impressions',
    'sponsorship_impressions TO video_views',
    'brand TO partner'
]
for r in renaming:
    cur.execute(f'''
        ALTER TABLE broadcast_dataset 
        RENAME COLUMN {r};
    ''')

# SN3: Add columns
# These TEXT columns exist in social_dataset but not in the broadcast CSV;
# they are created empty (NULL) here.
columns = [
    'partner_exposure_date',
    'snapshot_date',
    'exposure_id',
    'post_id',
    'channel',
    'content_type',
    'feed_name'
]
for c in columns:
    cur.execute(f'''
        ALTER TABLE broadcast_dataset ADD COLUMN {c} TEXT;
    ''')

conn.commit()

In [88]:
# SN4: Standardized naming and fill in static columns
# snake_case and gen_id are registered here; standardize_assets,
# standardize_name and hash_id must already be registered on `conn`
# (done in the social-dataset standardization cell).
conn.create_function('snake_case', 1, snake_case)
conn.create_function('gen_id', 0, gen_id)

# channel / content_type / feed_name receive constant values, both date
# columns copy `date`, post_id is a hash of the game and exposure_id is a
# random id from gen_id().
# NOTE(review): SQLite evaluates SET expressions against pre-update values, so
# hash_id(game) sees the original game text, not the snake_case result - confirm.
cur.execute('''
    UPDATE
        broadcast_dataset
    SET
        asset = standardize_assets(asset),
        partner = standardize_name(partner),
        game = snake_case(game),
        channel = 'broadcast',
        content_type = 'video',
        feed_name = 'earned',
        partner_exposure_date = date,
        snapshot_date = date,
        post_id = hash_id(game),
        exposure_id = gen_id();
''')

# `date` has been copied into partner_exposure_date and snapshot_date above,
# so the original column is removed.
cur.execute('''
    ALTER TABLE
        broadcast_dataset 
    DROP COLUMN 
        date;
''')

conn.commit()

In [89]:
# SN5: Drop duplicates
# Rows sharing the same (game, partner, asset) are duplicates; only the row
# with the highest rowid in each group is kept.
cur.execute('''
    DELETE FROM
        broadcast_dataset
    WHERE rowid NOT IN (
        SELECT 
            MAX(rowid)
        FROM 
            broadcast_dataset
        GROUP BY 
            game, partner, asset
    );
''')

conn.commit()

In [90]:
# SN6: NHL games are ~2.5 hours, so add 2.5 hours to snapshot_date
# NOTE(review): the shift is applied on every run, so re-executing this cell
# without rebuilding the table adds another 2.5 hours each time.

cur.execute('''
    UPDATE
        broadcast_dataset
    SET 
        snapshot_date = datetime(
            snapshot_date,
            '+2.5 hours'
        )
    WHERE 
        snapshot_date IS NOT NULL;
''')

conn.commit()


#### Data Consolidation

In [91]:
# Merging tables by a union all
# Both SELECTs list the same 22 columns in the same order; a column that one
# source does not have is filled with NULL. UNION ALL keeps every row from
# both tables without de-duplicating.
cur.execute('DROP TABLE IF EXISTS merged_table;')  

cur.execute('''
CREATE TABLE merged_table AS
    SELECT 
        exposure_id,
        partner,
        channel,
        post_id,
        partner_exposure_date,
        content_type,
        impressions,
        projected_impressions,
        video_views,
        brand_exposure_value,
        logo_impressions,
        engagement,
        asset,
        post_value,
        feed_name,
        campaign,
        snapshot_date,
        NULL AS game,
        NULL AS exposure,
        NULL AS duration,
        NULL AS qimv,
        NULL AS qi_sponsorship_impressions
    FROM 
        social_dataset

UNION ALL

    SELECT 
        exposure_id,
        partner,
        channel,
        post_id,
        partner_exposure_date,
        content_type,
        impressions,
        NULL AS projected_impressions,
        video_views,
        NULL AS brand_exposure_value,
        NULL AS logo_impressions,
        NULL AS engagement,
        asset,
        NULL AS post_value,
        feed_name,
        NULL AS campaign,
        snapshot_date,
        game,
        exposure,
        duration,
        qimv,
        qi_sponsorship_impressions
    FROM 
        broadcast_dataset;
''')

<sqlite3.Cursor at 0x26d241fca40>

In [99]:
# Spot-check the merge: print 10 random rows of merged_table
print_sample(cur, 'merged_table', 10)


Random 10 rows from 'merged_table':
+----------------------------------------------------------+---------------+-----------+----------------------------------------------------------+-------------------------+----------------+---------------+-------------------------+---------------+------------------------+--------------------+--------------+------------------+--------------+-------------+------------+---------------------+--------+------------+------------+--------+------------------------------+
| exposure_id                                              | partner       | channel   | post_id                                                  | partner_exposure_date   | content_type   |   impressions |   projected_impressions |   video_views |   brand_exposure_value |   logo_impressions |   engagement | asset            |   post_value | feed_name   | campaign   | snapshot_date       | game   | exposure   | duration   | qimv   | qi_sponsorship_impressions   |
|--------------------------

#### Data Model Definition

In [100]:
# Entity: Content_Channel
# One row per distinct (content_type, channel) pair found in merged_table,
# keyed by hash_id(content_type, channel).
# NOTE(review): the view calls hash_id, a function registered on `conn` with
# create_function; a connection without that registration presumably cannot
# query this view - verify before sharing the database file.
cur.execute('DROP VIEW IF EXISTS content_channel_view;')
cur.execute('''
    CREATE VIEW content_channel_view AS
        SELECT
            hash_id(t.content_type, t.channel) AS content_channel_id,
            t.content_type,
            t.channel
        FROM (
            SELECT DISTINCT
                content_type,
                channel
            FROM
                merged_table
        ) AS t
''')
print_sample(cur, 'content_channel_view', 3)


Random 3 rows from 'content_channel_view':
+----------------------------------------------------------+----------------+-----------+
| content_channel_id                                       | content_type   | channel   |
|----------------------------------------------------------+----------------+-----------|
| 5e2f614bf69248d26c5f0f9d10f307d880dde60ed5000944c6353a05 | text           | twitter   |
| 3ace2bd75220e7c3688532b954617ca41b01da4ce2356740857d995c | video          | youtube   |
| 7f8662fe280159fc11fedb525130a8d67b3ec15eae984a074edf0cfd | photo          | facebook  |
+----------------------------------------------------------+----------------+-----------+


In [101]:
# Entity: Posts
# Distinct post-level attributes from merged_table. content_channel_id is
# computed with the same hash_id(content_type, channel) expression used by
# content_channel_view, so the two can be joined on it.
cur.execute('DROP VIEW IF EXISTS posts_view;')
cur.execute('''
    CREATE VIEW posts_view AS
        SELECT DISTINCT
            post_id,
            hash_id(content_type, channel) AS content_channel_id,
            partner_exposure_date,
            game,
            campaign,
            feed_name
        FROM
            merged_table
''')
print_sample(cur, 'posts_view', 3)


Random 3 rows from 'posts_view':
+----------------------------------------------------------+----------------------------------------------------------+-------------------------+--------+------------+-------------+
| post_id                                                  | content_channel_id                                       | partner_exposure_date   | game   | campaign   | feed_name   |
|----------------------------------------------------------+----------------------------------------------------------+-------------------------+--------+------------+-------------|
| 1d090d110e391fd980e87b91a5a7ef71e8e414e1bb00827b2326a7da | 0da1aa614f006e1b5b3c8515205d181e8518a26664a37461258a4270 | 2023-01-26 22:00:00     |        |            | owned       |
| 68f8439bd1895613980fcddcd3fcc4eb6540087954dc9fa754141115 | 0da1aa614f006e1b5b3c8515205d181e8518a26664a37461258a4270 | 2023-02-26 14:08:00     |        |            | earned      |
| cfe749b77a3e15f1a87c0c977aa3de7dac27ca2653d0bc78470693

In [102]:
# Entity: Exposures
# Distinct (exposure_id, post_id, partner, asset) combinations from
# merged_table; post_id links each exposure to a row in posts_view.
cur.execute('DROP VIEW IF EXISTS exposures_view;')
cur.execute('''
    CREATE VIEW exposures_view AS
        SELECT DISTINCT
            exposure_id,
            post_id,
            partner,
            asset
        FROM
            merged_table
''')
print_sample(cur, 'exposures_view', 3)


Random 3 rows from 'exposures_view':
+----------------------------------------------------------+----------------------------------------------------------+----------------+------------------+
| exposure_id                                              | post_id                                                  | partner        | asset            |
|----------------------------------------------------------+----------------------------------------------------------+----------------+------------------|
| 0379d45784da2a0d742b3cda0a72705e81ee888eb7495e71f596fb4b | 80f506f753240350e8f3161369cee0813f51c1a5ad20c010086d695d | delta_airlines | led_courtside    |
| dffe687d0e199f4c6a5a55cda9b6c061b7ee08ff07037f0339cb1065 | dd457c96caa39354e6ce16e01cc8d6428ad60ca5bf60676fc510882d | coca_cola      | basket_stanchion |
| 6da8e0f102d576e0b5d36df7fb341c532aaa32ad46804e6890b88e51 | 11e615e7052313ea3e44b0119bdf2464675cf160a80d56b6439ee954 | coca_cola      | led_courtside    |
+-------------------------

In [103]:
# Entity: Data_Snapshots
# Distinct metric readings per exposure. snapshot_id is
# hash_id(exposure_id, snapshot_date); exposure_id links back to exposures_view.
# The social-only and broadcast-only metric columns are both exposed, with
# NULL where merged_table has no value for that source.
cur.execute('DROP VIEW IF EXISTS data_snapshots_view;')
cur.execute('''
    CREATE VIEW data_snapshots_view AS
        SELECT DISTINCT
            hash_id(exposure_id, snapshot_date) AS snapshot_id,
            exposure_id,
            snapshot_date,
            impressions,
            projected_impressions,
            logo_impressions,
            video_views,
            engagement,
            brand_exposure_value,
            post_value,
            exposure,
            duration,
            qimv,
            qi_sponsorship_impressions
        FROM
            merged_table
''')
print_sample(cur, 'data_snapshots_view', 3)


Random 3 rows from 'data_snapshots_view':
+----------------------------------------------------------+----------------------------------------------------------+---------------------+---------------+-------------------------+--------------------+---------------+--------------+------------------------+--------------+------------+------------+--------+------------------------------+
| snapshot_id                                              | exposure_id                                              | snapshot_date       |   impressions | projected_impressions   |   logo_impressions |   video_views |   engagement |   brand_exposure_value |   post_value | exposure   | duration   | qimv   | qi_sponsorship_impressions   |
|----------------------------------------------------------+----------------------------------------------------------+---------------------+---------------+-------------------------+--------------------+---------------+--------------+------------------------+-------------