# Bulk insert datafiles from disk to MySQL Server
Using:
* [mysql.connector](https://dev.mysql.com/doc/connector-python/en/) as driver 
* pandas for chunkwise reading of datafiles

## Setup

In [1]:
from bulk_insert import insert_table, insert_multiple_tables

In [2]:
import mysql.connector
import pandas as pd
import os

In [3]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

## create and inspect demo tables
larger example with 1 million rows per table

In [4]:
script = "demo_data/generate_demo_data.py" # script generating 3 demo tables (2 csv, 1 json)
data_path = os.getcwd() + "/demo_data/" # path to store demo tables
n = int(1e6) # number of rows for each demo table

In [5]:
%run $script {data_path} {n}

In [6]:
for table_name in ["users.csv", "products.csv", "reviews.json"]:
    if table_name.endswith(".csv"): 
        reader = pd.read_csv(data_path + table_name, sep=",", chunksize=200)
    else: 
        reader = pd.read_json(data_path + table_name, lines=True, chunksize=200)
    top_chunk = next(reader)
    top_chunk.head(3)

Unnamed: 0,user_id,username
0,1,yFnzHFPDkudEiyDq
1,2,FJNCkWZFXOpWDb
2,3,lWjADglkMatACqslNDxq


Unnamed: 0,product_id,category,price,weight
0,1,H,135.191586,6.78801
1,2,P,124.647725,1.468372
2,3,W,133.377321,5.147944


Unnamed: 0,review_id,user_id,product_id,review_text
0,1,334723,908214,text text text text text text text text text t...
1,2,995158,481647,text text text text text text text text text t...
2,3,581728,91421,text text text text text text text text text t...


## connect to mysql server and create demo database

In [7]:
db_credentials = {
"user" : "root",
"password" : "############",
"host": "localhost"
}

db = mysql.connector.connect(**db_credentials)
cursor = db.cursor()

In [8]:
cursor.execute("CREATE DATABASE demo")
cursor.execute("USE demo")

In [9]:
## helper function: execute sql code and fetch results in one step:
def exefetch(sql_code):
    cursor.execute(sql_code)
    return cursor.fetchall()

## for each table: select columns, datatypes and foreign keys

* create one dictionary per table with the most important table specifications (key = column name, value = datatype)
* column constraints like NOT NULL or PRIMARY KEY can be specified next to the datatype
* only specified columns will be inserted, remaining columns are ignored

In [10]:
users = {
    "user_id": "INT PRIMARY KEY",
    "username": "VARCHAR(20)"
}

* if multiple columns share the same datatype, you can combine their keys into one tuple<br>(like "price" and "weight" in below example)

In [11]:
products = {
    "product_id": "INT PRIMARY KEY",
    "category": "CHAR(1)",
    ("price", "weight"): "FLOAT"
}

* multiple foreign key constraints can be specified through nested dictionaries
    * the first foreign key "fk1" in the example below is fully specified
    * the second foreign key "fk2" showcases the minimal required specifications
* keys to the nested dicts ("fk1", "fk2") can be named freely, <br>
while the keys inside of the nested dicts must follow below naming conventions

In [12]:
reviews = {
    "review_id": "INT PRIMARY KEY",
    "user_id": "INT",
    "product_id": "INT",
    "review_text": "TEXT",
    "fk1": {
        "name": "fk_user_id", #name of the table constraint --> OPTIONAL
        "col": "user_id", #column serving as foreign key
        "ref": "users(user_id)", #parent table and reference column
        "upd": "RESTRICT", #behaviour on update --> OPTIONAL
        "del": "RESTRICT" #behaviour on delete --> OPTIONAL
    },
    "fk2": {
        "col": "product_id",
        "ref": "products(product_id)",
    } 
}

## insert a single table: users

In [13]:
## users
reader = pd.read_csv("demo_data/users.csv", sep=",", chunksize = 200)
insert_table(cursor, reader, "users", users)

5000it [01:20, 62.50it/s]


## insert the remaining two tables (or n tables) in one go
* the order only matters when foreign keys are involved <br>(all referenced parent tables have to be inserted first)
* foreign keys are added after table creation + insertion! <br>(make sure the referenced columns are indexed, usually 
as Primary Keys of their parent table)
* fast_fk_check = True uses a customized integrity check (Default is False)
    * Useful for VERY large tables like in this example, as regular FK checking may take a few hours
    * HOWEVER, a few conditions have to be met (else, assertion errors are raised)
    * Further explanations can be found in the docstring of "insert_table"

In [14]:
## define an adequate reader function for each filetype
def reader_csv(fpath): return pd.read_csv(fpath, sep=",", chunksize=200)
def reader_json(fpath): return pd.read_json(fpath, lines=True, chunksize=200)

## define insert_instructions for iteration
insert_instructions = {
        "products": (data_path + "products.csv", reader_csv, products),
        "reviews": (data_path + "reviews.json", reader_json, reviews)
    }

## carry out insert of multiple tables
insert_multiple_tables(cursor, insert_instructions, fast_fk_check=True)

5000it [01:58, 42.28it/s]


No foreign key definitions to check


5000it [02:39, 31.40it/s]


Fast integrity check of FK user_id in table reviews...
Fast integrity check of FK product_id in table reviews...
FK integrity check completed
adding foreign keys...
Add foreign key at: 18:45:39
Foreign key added at: 18:45:51
Add foreign key at: 18:45:51
Foreign key added at: 18:46:02
Foreign key constraints are now in place


## check if all columns have been specified as desired

In [15]:
## Info on tables
sql_statements = [
    "DESC users",
    "DESC products",
    "DESC reviews"
]

for statement in sql_statements:
    result = exefetch(statement)
    for i in result: print(i)
    print()

('user_id', 'int', 'NO', 'PRI', None, '')
('username', 'varchar(20)', 'YES', '', None, '')

('product_id', 'int', 'NO', 'PRI', None, '')
('category', 'char(1)', 'YES', '', None, '')
('price', 'float', 'YES', '', None, '')
('weight', 'float', 'YES', '', None, '')

('review_id', 'int', 'NO', 'PRI', None, '')
('user_id', 'int', 'YES', 'MUL', None, '')
('product_id', 'int', 'YES', 'MUL', None, '')
('review_text', 'text', 'YES', '', None, '')



## check if foreign keys have been specified as desired

In [16]:
fk_test = """SELECT
                TABLE_NAME, COLUMN_NAME, CONSTRAINT_NAME, REFERENCED_TABLE_NAME, REFERENCED_COLUMN_NAME
            FROM
                INFORMATION_SCHEMA.KEY_COLUMN_USAGE
            WHERE
                REFERENCED_TABLE_SCHEMA = "{0}" AND
                REFERENCED_TABLE_NAME = "{1}";"""

In [17]:
exefetch(fk_test.format("demo", "users"))[0]

exefetch(fk_test.format("demo", "products"))[0] #no constraint name was specified, hence a default name appears

('reviews', 'user_id', 'fk_user_id', 'users', 'user_id')

('reviews', 'product_id', 'reviews_ibfk_1', 'products', 'product_id')

## select and view example data

In [18]:
table = "reviews"
colnames = [row[0] for row in exefetch("SHOW COLUMNS FROM {}".format(table))]
df = pd.DataFrame(data = exefetch("SELECT * FROM {} LIMIT 1000".format(table)), columns = colnames)
df

Unnamed: 0,review_id,user_id,product_id,review_text
0,1,334723,908214,text text text text text text text text text t...
1,2,995158,481647,text text text text text text text text text t...
2,3,581728,91421,text text text text text text text text text t...
3,4,449306,331771,text text text text text text text text text t...
4,5,726816,264543,text text text text text text text text text t...
...,...,...,...,...
995,996,656380,662989,text text text text text text text text text t...
996,997,760015,762662,text text text text text text text text text t...
997,998,422908,766213,text text text text text text text text text t...
998,999,260150,439521,text text text text text text text text text t...


## commit and close connection

In [19]:
db.commit()
cursor.close()

True