DO NOT RUN THIS SCRIPT
============
Takes 15-25 minutes to run if the database has not been created.

Dataset Summary
=====================

The original Netflix Prize dataset consisted of a small item file (movies, TV shows, etc.) with one line per item, and several test files for validating the models that researchers had developed for the competition. The main training data was located in a folder that contained one file per item in the item file. The item file had 17770 entries, and the training data folder had 17770 files, each containing an arbitrary number of ratings.

Datafile Formats
=====================

Item File Format
----------------------
Each line has the format:

MovieID,YearOfRelease,Title

- MovieIDs do not correspond to actual Netflix movie ids or IMDB movie ids.
- YearOfRelease can range from 1890 to 2005 and may correspond to the release of
  the corresponding DVD, not necessarily its theatrical release.
- Title is the Netflix movie title and may not correspond to 
  titles used on other sites.  Titles are in English.
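
Because titles can themselves contain commas, only the first two commas on a line separate fields. A minimal sketch of parsing one line under that assumption follows; the function name `parse_item_line` and the sample lines are hypothetical, and the `NULL` year marker is the one the import code further down checks for.

```python
def parse_item_line(line):
    """Split one item-file line into (movie_id, release_year_or_None, title)."""
    # maxsplit=2 keeps any commas inside the title intact.
    movie_id, year, title = line.strip().split(",", 2)
    # A missing release year appears as the literal string NULL.
    release_year = None if year == "NULL" else int(year)
    return int(movie_id), release_year, title


# Hypothetical sample lines, made up for illustration.
print(parse_item_line("12,1999,Some Title, With a Comma\n"))
print(parse_item_line("13,NULL,Untitled Example\n"))
```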

Rating File Format
----------------------
Each file starts with a single line that contains the MovieID followed by a colon. After that, each line has the format:

CustomerID,Rating,Date

- MovieIDs range from 1 to 17770 sequentially.
- CustomerIDs range from 1 to 2649429, with gaps. There are 480189 users.
- Ratings are on a five star (integral) scale from 1 to 5.
- Dates have the format YYYY-MM-DD.
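
The layout above (one `MovieID:` header line, then one rating per line) could be read with plain Python roughly as follows. This is only a sketch: `parse_rating_file` is a hypothetical helper, the sample values are made up, and the actual import below uses pandas instead.

```python
import io
from datetime import date


def parse_rating_file(lines):
    """Return (movie_id, [(customer_id, rating, date), ...]) from an iterable of lines."""
    lines = iter(lines)
    # The first line holds the MovieID followed by a colon, e.g. "42:".
    movie_id = int(next(lines).strip().rstrip(":"))
    ratings = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        customer_id, rating, day = line.split(",")
        ratings.append((int(customer_id), int(rating), date.fromisoformat(day)))
    return movie_id, ratings


# Hypothetical file contents, made up for illustration.
sample_file = io.StringIO("42:\n1001,4,2004-03-15\n2002,5,2005-01-02\n")
movie_id, ratings = parse_rating_file(sample_file)
print(movie_id, ratings)
```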

Import/Tidy
============

The general method for import was:
 1. Open connection to sqlite3 file
 2. Delete schemas if present
 3. Create schemas
 4. Parse item file
 5. Commit to database
 6. For each rating file parse rating file and commit to database

Tidying happened as a side effect of loading the data into the database. The following items were cleaned:
 - Dates were parsed from YYYY-MM-DD strings and stored in ISO format, since sqlite sorts ISO date strings correctly
 - Textual data was stripped of leading and trailing whitespace
 - Numeric data was converted to the appropriate type, int or real
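
The point about ISO dates can be checked on a throwaway in-memory database. The sketch below uses a hypothetical one-column `demo` table rather than the real schema; it stores three dates as ISO strings and lets sqlite order them as plain text.

```python
import sqlite3
from datetime import datetime

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (date TEXT NOT NULL)")

# Parse each raw string and store it back as an ISO-formatted date string.
raw_dates = ["2005-12-31", "1999-11-11", "2003-07-04"]
iso_rows = [(datetime.strptime(d, "%Y-%m-%d").date().isoformat(),) for d in raw_dates]
conn.executemany("INSERT INTO demo (date) VALUES (?)", iso_rows)

# A plain text ORDER BY is also chronological order for ISO dates.
ordered = [row[0] for row in conn.execute("SELECT date FROM demo ORDER BY date")]
print(ordered)
conn.close()
```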

In [1]:
from datetime import datetime
import pathlib as plib
import sqlite3

import pandas as pd
import numpy as np
from ipywidgets import IntProgress
from IPython.display import display

In [2]:
DB_DEFINITION = """
CREATE TABLE item
(
    item_id INTEGER PRIMARY KEY NOT NULL,
    release_year INTEGER,
    name TEXT NOT NULL,
    avg_rating REAL DEFAULT 0 NOT NULL,
    rating_count INT DEFAULT 0 NOT NULL,
    mean_day REAL DEFAULT 0 NOT NULL
);
CREATE TABLE rating
(
    rating_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    item_id INTEGER NOT NULL,
    customer_id INTEGER NOT NULL,
    value REAL NOT NULL,
    baseline_value REAL DEFAULT 0.0 NOT NULL,
    date TEXT NOT NULL,
    day INTEGER NOT NULL,
    model_group_id INTEGER NOT NULL
);
CREATE TABLE customer
(
    customer_id INT PRIMARY KEY NOT NULL,
    avg_rating REAL NOT NULL,
    rating_count INT NOT NULL,
    mean_day REAL NOT NULL
);
CREATE TABLE customer_time_freq
(
    customer_id INT NOT NULL,
    day INT NOT NULL,
    freq INT NOT NULL
)
"""

CLEAR_DB = """
DROP TABLE IF EXISTS item;
DROP TABLE IF EXISTS rating;
DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS customer_time_freq;
"""

INSERT_ITEM = """
INSERT INTO
    item (item_id, release_year, name)
VALUES
    (?, ?, ?);
"""
INSERT_RATING = """
INSERT INTO
    rating (item_id, customer_id, value, date, day, model_group_id)
VALUES
    (?, ?, ?, ?, ?, ?);
"""
INSERT_CUSTOMERS = """
INSERT INTO
    customer (customer_id, avg_rating, rating_count, mean_day)
SELECT
    customer_id, avg(value), count(value), avg(day)
FROM rating
GROUP BY customer_id;
"""
INSERT_CUSTOMER_DAY_FREQS = """
INSERT INTO 
    customer_time_freq (customer_id, day, freq)
SELECT
    customer_id, day, count(*) as freq
FROM rating
GROUP BY customer_id, day;
"""
CREATE_TEMP_ITEM_STATS = """
CREATE TEMP TABLE item_stats
(
    item_id INT PRIMARY KEY NOT NULL,
    avg_rating REAL NOT NULL,
    rating_count INT NOT NULL,
    mean_day REAL NOT NULL
);
"""
FILL_TEMP_ITEM_STATS = """
INSERT INTO
    item_stats (item_id, avg_rating, rating_count, mean_day)
SELECT
    item_id, avg(value), count(value), avg(day)
FROM rating
GROUP BY item_id;
"""
CREATE_TEMP_ITEM_STATS_INDEX = """
CREATE INDEX item_id_temp_stats_index ON item_stats (item_id);
"""
INSERT_ITEM_STATS = """
UPDATE item
SET
    avg_rating = (
        SELECT
            item_stats.avg_rating
        FROM item_stats
        WHERE item_stats.item_id = item.item_id
    ),
    rating_count = (
        SELECT
            item_stats.rating_count
        FROM item_stats
        WHERE item_stats.item_id = item.item_id
    ),
    mean_day = (
        SELECT
            item_stats.mean_day
        FROM item_stats
        WHERE item_stats.item_id = item.item_id
    );
"""

SHIFT_ITEM_ID_DOWN = """
UPDATE item
SET item_id = item_id - 1;
UPDATE rating
SET item_id = item_id - 1;
"""

In [3]:
PROJECT_DATA_HOME = plib.Path('/') / 'data' / 'declanvk'
RAW_DATA_FOLDER = PROJECT_DATA_HOME / 'netflix_prize'
TRAINING_DATA_FOLDER = RAW_DATA_FOLDER / 'training_set'
ITEM_FILE = RAW_DATA_FOLDER / 'movie_titles.txt'
DB_FILE = PROJECT_DATA_HOME / 'netflix_prize.sqlite'

MAX_DATE = datetime.strptime("2005-12-31", "%Y-%m-%d")
MIN_DATE = datetime.strptime("1999-11-11", "%Y-%m-%d")
DATE_RANGE = MAX_DATE - MIN_DATE
SUB_GROUPS = 1000000

In [4]:
def main_import():
    """Main function"""

    with sqlite3.connect(str(DB_FILE)) as conn:
        curs = conn.cursor()
        try:
            curs.executescript(CLEAR_DB)
            curs.executescript(DB_DEFINITION)
            conn.commit()
            print("Cleared database and loaded table definitions")

            import_items(curs)
            conn.commit()
            print("Imported all items")

            import_all_ratings(curs, conn)
            conn.commit()
            print("Imported all ratings")

            curs.execute(INSERT_CUSTOMERS)
            conn.commit()
            print("Collected customer data from ratings")
            
            curs.execute(INSERT_CUSTOMER_DAY_FREQS)
            conn.commit()
            print("Inserted customer day frequencies")

            curs.execute(CREATE_TEMP_ITEM_STATS)
            curs.execute(FILL_TEMP_ITEM_STATS)
            curs.execute(CREATE_TEMP_ITEM_STATS_INDEX)
            curs.execute(INSERT_ITEM_STATS)
            conn.commit()
            print("Collected per item statistics")

            curs.executescript(SHIFT_ITEM_ID_DOWN)
            conn.commit()
            print("Shifted item_id down to account for later tensor indexing use")
        except Exception as err:
            conn.rollback()
            raise err
        finally:
            curs.close()

def import_items(cursor, item_file_path=ITEM_FILE):
    """Import items from dataset"""
    with item_file_path.open('r', encoding='latin1') as item_file:
        items = map(parse_item_row, item_file)
        cursor.executemany(INSERT_ITEM, items)

def import_all_ratings(cursor, connection, training_data_path=TRAINING_DATA_FOLDER):
    """Import all rating files in training_data_path and remaps customer ids"""
    training_files = list(training_data_path.iterdir())
    customer_id_mapping = dict()
    current_id = 0
    for training_file in training_files:
        current_id = import_rating(cursor, training_file, customer_id_mapping, current_id)
        connection.commit()

def import_rating(cursor, training_file, id_mapping, current_id):
    """Import single rating file"""
    ratings_df = pd.read_csv(str(training_file), skiprows=1, header=None)
    item_id = int(training_file.name[3:10])
    ratings_df[2] = pd.to_datetime(ratings_df[2], format="%Y-%m-%d")
    ratings_df[3] = (ratings_df[2] - MIN_DATE).dt.days
    ratings_df[4] = np.random.randint(SUB_GROUPS, size=ratings_df.shape[0])

    def create_tuple(df_row):
        """Process rows of rating file"""
        nonlocal current_id
        nonlocal item_id
        if df_row[0] not in id_mapping:
            id_mapping[df_row[0]] = current_id
            current_id = current_id + 1

        new_id = id_mapping[df_row[0]]
        return item_id, new_id, float(df_row[1]),\
                str(df_row[2].date()), int(df_row[3]), int(df_row[4])

    ratings = map(create_tuple, ratings_df.itertuples(index=False, name=None))

    cursor.executemany(INSERT_RATING, ratings)

    return current_id

def parse_item_row(item_row_string):
    """Read items from file contents"""
    item_row = item_row_string.strip().split(",")
    item_id = int(item_row[0])
    release_year = datetime.strptime(item_row[1], "%Y").date().year \
                    if item_row[1] != 'NULL' else None
    name = ",".join(item_row[2:])
    return item_id, release_year, name

In [5]:
if not DB_FILE.exists():
    main_import()

Adding Data & Precomputing Statistics
================================

Certain commonly used pieces of data were precomputed, such as the average rating for each item and customer, and the number of ratings per item and per customer. Another added field is a random number that makes sampling the database slightly easier. Instead of performing a very expensive random selection based on RatingID, I can select on the ModelGroupID field, a randomly assigned integer from 0 to 999,999 (1,000,000 possible groups).
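
One way the ModelGroupID field could be used for sampling is a simple range filter, sketched below on a small in-memory table with made-up ratings. The helper `sample_ratings` and the cut-off approach are assumptions for illustration, not necessarily how the later notebooks query the data.

```python
import random
import sqlite3

SUB_GROUPS = 1000000  # same number of groups as in the import code

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE rating ("
    "rating_id INTEGER PRIMARY KEY, value REAL NOT NULL, model_group_id INTEGER NOT NULL)"
)

# Made-up ratings, each tagged with a random group id in [0, SUB_GROUPS).
rng = random.Random(0)
rows = [(float(rng.randint(1, 5)), rng.randrange(SUB_GROUPS)) for _ in range(10000)]
conn.executemany("INSERT INTO rating (value, model_group_id) VALUES (?, ?)", rows)


def sample_ratings(connection, fraction):
    """Select roughly `fraction` of all ratings with a range filter on model_group_id."""
    cutoff = int(fraction * SUB_GROUPS)
    query = "SELECT rating_id, value FROM rating WHERE model_group_id < ?"
    return connection.execute(query, (cutoff,)).fetchall()


sample = sample_ratings(conn, 0.1)
print(len(sample), "of", len(rows), "ratings selected")
```

Because the group ids are fixed at import time, the same cut-off always returns the same rows. A different sample requires a different range of group ids.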

Add Indices
=========

I specifically kept indices out of the schema while I imported the data. After the database was completed on the server I added an index to the following fields:
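 - Item
   - ItemID aka MovieID
   - Name
 - Rating
   - RatingID
   - ItemID aka MovieID
   - CustomerID
   - Value
   - Date
   - Day
   - CustomerID and Day (composite)
   - ItemID aka MovieID and Day (composite)
   - ModelGroupID
 - Customer
   - CustomerID
 - CustomerTimeFreq
   - CustomerID
   - Day
   - Freq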
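
The load-first, index-later pattern can be tried on a small in-memory table, as in the sketch below. The table is a cut-down, hypothetical version of `rating` with made-up rows; the index name matches one created in the next cell. `EXPLAIN QUERY PLAN` is then used to confirm that a lookup by `item_id` goes through the index. The exact wording of the plan differs between sqlite versions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE rating ("
    "rating_id INTEGER PRIMARY KEY, item_id INTEGER NOT NULL, value REAL NOT NULL)"
)

# Bulk load first, with no secondary index to maintain during the inserts.
rows = [(i % 50, float(i % 5 + 1)) for i in range(5000)]
conn.executemany("INSERT INTO rating (item_id, value) VALUES (?, ?)", rows)

# Add the index once the data is in place.
conn.execute("CREATE INDEX IF NOT EXISTS item_id_rating_index ON rating (item_id)")

# The last column of each plan row is the human-readable detail string.
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT value FROM rating WHERE item_id = ?", (7,)
).fetchall()
plan_text = " ".join(row[-1] for row in plan)
print(plan_text)
```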

In [10]:
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS item_id_index ON item (item_id);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS name_item_index ON item (name);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS rating_id_index ON rating (rating_id);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS item_id_rating_index ON rating (item_id);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS custome_id_rating_index ON rating (customer_id);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS value_rating_index ON rating (value);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS day_customer_rating_index ON rating (customer_id, day);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS day_item_rating_index ON rating (item_id, day);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS date_rating_index ON rating (date);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS day_rating_index ON rating (day);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS model_group_id_rating_index ON rating (model_group_id);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS customer_id_customer_index ON customer (customer_id);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS customer_id_customer_time_freq_index ON customer_time_freq (customer_id);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS day_customer_time_freq_index ON customer_time_freq (day);"
!sqlite3 /data/declanvk/netflix_prize.sqlite "CREATE INDEX IF NOT EXISTS freq_customer_time_freq_index ON customer_time_freq (freq);"

In [9]:
!sqlite3 /data/declanvk/netflix_prize.sqlite "ANALYZE;"
!sqlite3 /data/declanvk/netflix_prize.sqlite "VACUUM;"

Data Size
==============

The training data alone is roughly 2.4 gigabytes on my computer. Imported into the sqlite db without indices it is roughly 2.8 gigabytes, and after adding the additional tables and precomputing counts and averages it is 4.1 gigabytes. On the server, with indices added, the sqlite file is roughly 16 gigabytes.