# Problems:
* `Decimal` values are unsupported -> they are converted to `float`
* No constraints are available besides unique constraints -> all other checks must be performed on the backend

In [13]:
from neo4j import GraphDatabase, DirectDriver
from neo4j.exceptions import ClientError

# Credentials and Bolt endpoint of the target Neo4j instance.
# NOTE(review): the password is hard-coded in the notebook; consider reading
# it from the environment instead.
NEO_LOGIN = "neo4j"
NEO_PASSWORD = "bitnami"

uri = "bolt://localhost:7687"
# Shared driver object; the cells below open their Neo4j sessions from it.
driver = GraphDatabase.driver(uri, auth=(NEO_LOGIN, NEO_PASSWORD))




In [20]:
import psycopg2

# Credentials and database name of the source PostgreSQL instance.
# NOTE(review): the password is hard-coded in the notebook; consider reading
# it from the environment instead.
PG_LOGIN = "DMD2user"
PG_PASS = "DMD2pgPass"
PG_DB = "dvdrental"

con = psycopg2.connect(database=PG_DB, user=PG_LOGIN,
                       password=PG_PASS, host="localhost", port="5432")

# Single shared cursor; the transfer helpers below run their SQL through it.
cursor = con.cursor()

In [18]:
# Temporarily rename store.manager_staff_id to staff_id. The "Store" cell
# further down transfers the column under that name and renames it back.
cursor.execute("alter table store rename column manager_staff_id to staff_id")
con.commit()

In [15]:
from typing import List, Tuple, Optional
from decimal import Decimal

# Progress bookkeeping: total number of reported steps and the 1-based index
# of the step that will be reported next.
steps = 29
current_step = 1


def report(obj_name: str):
    """Print a ``"<obj_name> done: <current>/<total>"`` line and advance the step counter."""
    global current_step
    message = f"{obj_name} done: {current_step}/{steps}"
    print(message)
    current_step = current_step + 1

def make_label(s:str) -> str:
    """Return *s* as a node label: first character upper-cased, the rest lower-cased."""
    label = s.capitalize()
    return label
    
def make_var(s:str) -> str:
    """Return *s* lower-cased, for use as a variable name inside a Cypher statement."""
    variable = s.lower()
    return variable

def fix_arg_type(arg):
    """Convert a value read from SQL into one that can be sent as a query parameter.

    ``Decimal`` instances (including instances of ``Decimal`` subclasses) are
    converted to ``float``; every other value is returned unchanged.
    """
    # isinstance (rather than an exact type comparison) also catches Decimal
    # subclasses, which would otherwise slip through unconverted.
    if isinstance(arg, Decimal):
        return float(arg)
    return arg

def transfer_table(driver: DirectDriver, cursor, tablename: str, *args, 
                   relationships: Optional[List[Tuple[str,str,str]]] = None,
                   indices:Optional[Tuple[str]] = None):
    """Copy the rows of one SQL table into Neo4j as nodes and link them to existing nodes.

    Every row selected from ``tablename`` becomes one node labelled
    ``make_label(tablename)`` whose properties are the selected columns
    (values pass through ``fix_arg_type`` first). For each relationship triple
    a relationship is created between the nodes matched by their ``*_id``
    properties. Once all rows are written, the ``<table_to>_id`` property is
    removed from every node carrying the source label. Inserts, relationships
    and property removal run in one transaction; a uniqueness constraint on
    ``<tablename>_id`` is created after the commit.

    Table, column and relationship names are interpolated directly into the
    SQL and Cypher text, so they must come from trusted code, never from
    user input.

    Args:
        driver: Neo4j driver used to open the session.
        cursor: DB-API cursor on the source database.
        tablename: Source table; also the base for the node label and variable.
        *args: Column names to select; each becomes a node property and a
            query parameter of the same name.
        relationships: Optional ``(table_from, table_to, relation_name)``
            triples describing relationships to create for every row.
        indices: Accepted for interface compatibility; currently unused.
    """
    label = make_label(tablename)
    var = make_var(tablename)
    properties = ', '.join(f'{prop}: ${prop}' for prop in args)
    insert_statement = f"CREATE ({var}: {label} {{ {properties} }})"
    sql_query = f"select {', '.join(args)} from {tablename}"
    unique = f"CREATE CONSTRAINT ON ({var}: {label}) ASSERT {var}.{tablename.lower()}_id IS UNIQUE"

    relation = "MATCH ({var_from}:{label_from}), ({var_to}:{label_to}) WHERE " \
               "{var_from}.{table_from}_id = ${table_from}_id  AND {var_to}.{table_to}_id = ${table_to}_id " \
               "CREATE ({var_from}) - [r:{relation_name}] -> ({var_to})"

    remove_unused_id ="MATCH (n:{label_from})" \
                      "REMOVE n.{table_to}_id"

    # Materialise once so the triples can be walked several times below.
    relationships = list(relationships) if relationships is not None else []

    # The relationship statements depend only on the triples, not on the row,
    # so they are built once here instead of being re-formatted for every row.
    relation_statements = [
        relation.format(
            table_from=table_from, table_to=table_to,
            var_from=make_var(table_from), var_to=make_var(table_to),
            label_from=make_label(table_from), label_to=make_label(table_to),
            relation_name=relation_name)
        for table_from, table_to, relation_name in relationships
    ]

    cursor.execute(sql_query)
    with driver.session() as session:
        tx = session.begin_transaction()

        # NOTE(review): the uniqueness constraint for this table is only
        # created after the commit, so the MATCH on the freshly inserted
        # source node is presumably not index-backed during the load.
        # Confirm if large tables turn out to be slow.
        for row in cursor.fetchall():
            data = dict(zip(args, map(fix_arg_type, row)))
            tx.run(insert_statement, **data)
            for statement in relation_statements:
                tx.run(statement, **data)

        # The foreign-key values are now expressed as relationships, so the
        # corresponding id properties are dropped from the source nodes.
        for table_from, table_to, _ in relationships:
            tx.run(remove_unused_id.format(label_from=make_label(table_from),
                                           table_to=table_to))
        tx.sync()
        tx.commit()
        session.run(unique)

    report(tablename)
    for table_from, table_to, _ in relationships:
        report(f"{table_from} -> {table_to}")
    
def transfer_many2many_rel(driver: DirectDriver, cursor, table_name: str, table_from:str, table_to: str, 
                           relation_name:str, *args:str):
    """Turn the rows of a SQL link table into Neo4j relationships.

    For every row selected from ``table_name`` the nodes whose
    ``<table_from>_id`` / ``<table_to>_id`` properties equal the row's values
    are matched and a ``relation_name`` relationship is created between them.
    All selected columns are passed as query parameters; those whose name does
    not end in ``_id`` are also stored as properties on the relationship.
    Everything runs in one transaction, after which progress is reported.
    """
    var_from, var_to = make_var(table_from), make_var(table_to)
    label_from, label_to = make_label(table_from), make_label(table_to)
    rel_props = ", ".join(f"{column}:${column}" for column in args if not column.endswith("_id"))
    relation = (
        f"MATCH ({var_from}:{label_from}), ({var_to}:{label_to}) WHERE "
        f"{var_from}.{table_from}_id = ${table_from}_id  AND {var_to}.{table_to}_id = ${table_to}_id "
        f"CREATE ({var_from}) - [r:{relation_name} {{{rel_props}}} ] -> ({var_to})"
    )

    cursor.execute(f"select {', '.join(args)} from {table_name}")
    with driver.session() as session:
        tx = session.begin_transaction()
        for row in cursor.fetchall():
            parameters = dict(zip(args, map(fix_arg_type, row)))
            tx.run(relation, **parameters)
        tx.sync()
        tx.commit()
    report(f"{table_from} -> {table_to}")

def create_index(driver: DirectDriver, tablename:str, *fields:str):
    """Run a ``CREATE INDEX`` statement for the label derived from ``tablename`` over ``fields``."""
    node_label = make_label(tablename)
    indexed_fields = ', '.join(fields)
    statement = f"CREATE INDEX ON :{node_label} ({indexed_fields})"
    with driver.session() as session:
        session.run(statement)

Connected to db
# Start transfer
## Category

In [20]:
# Copy every row of `category` into Neo4j as a Category node (no relationships).
transfer_table(driver, cursor, "category", "category_id", "name", "last_update")

category done: 1/29


## Language

In [21]:
# Copy every row of `language` into Neo4j as a Language node (no relationships).
transfer_table(driver, cursor, "language", "language_id", "name", "last_update")

language done: 2/29


## Film + Film-language connection

In [22]:
# Copy films as Film nodes and link each one to its Language node via
# IN_LANGUAGE; transfer_table then removes language_id from the Film nodes.
transfer_table(driver, cursor, "film", "film_id", "title", "description", "release_year", "rental_duration", "rental_rate", "length", 
"replacement_cost", "rating", "last_update", "special_features", "fulltext", "language_id",
               relationships=[("film", "language", "IN_LANGUAGE")])

film done: 3/29
film -> language done: 4/29


## Category-Film

In [23]:
# Turn each `film_category` row into a Film -[IN_CATEGORY]-> Category
# relationship; last_update is stored on the relationship.
transfer_many2many_rel(driver, cursor, "film_category", "film", "category", "IN_CATEGORY", 
                       "film_id", "category_id", "last_update")

film -> category done: 5/29


## Actor

In [24]:
# Copy every row of `actor` into Neo4j as an Actor node (no relationships).
transfer_table(driver, cursor, "actor", 
               "actor_id", "first_name", "last_name", "last_update")

actor done: 6/29


## Actor->Film connection

In [25]:
# Turn each `film_actor` row into an Actor -[FILMED_IN]-> Film relationship;
# last_update is stored on the relationship.
transfer_many2many_rel(driver, cursor, "film_actor", "actor", "film", "FILMED_IN", 
                       "actor_id", "film_id", "last_update")

actor -> film done: 7/29


## Inventory + film<-inventory

In [26]:
# Copy inventory items as Inventory nodes linked to their Film via RENTS_FILM.
# No relationship is requested for store_id, so it stays a plain property.
transfer_table(driver, cursor, "inventory", 
               "inventory_id", "film_id", "store_id", "last_update",
               relationships=[("inventory", "film", "RENTS_FILM")])

inventory done: 8/29
inventory -> film done: 9/29


## Country

In [27]:
# Copy every row of `country` into Neo4j as a Country node (no relationships).
transfer_table(driver, cursor, "country", "country_id", "country", "last_update")


country done: 10/29


## City

In [28]:
# Copy cities as City nodes and link each one to its Country via SITUATED_IN.
transfer_table(driver, cursor, "city",  
               "city_id", "city", "country_id", "last_update",
               relationships=[("city", "country", "SITUATED_IN")])

city done: 11/29
city -> country done: 12/29


## Address

In [29]:
# Copy addresses as Address nodes and link each one to its City via SITUATED_IN.
transfer_table(driver, cursor, "address",
               "address_id", "address", "address2", "district", "city_id", "postal_code", "phone", "last_update",
               relationships=[("address", "city", "SITUATED_IN")])

address done: 13/29
address -> city done: 14/29


## Customer

In [30]:
# Copy customers as Customer nodes and link each one to its Address via LIVES_AT.
# No relationship is requested for store_id, so it stays a plain property.
transfer_table(driver, cursor, "customer",
               "customer_id", "store_id", "first_name", "last_name", "email", "address_id", "activebool",
               "create_date", "last_update", "active",
               relationships=[("customer", "address", "LIVES_AT")])

customer done: 15/29
customer -> address done: 16/29


## Staff

In [31]:
# Copy staff members as Staff nodes and link each one to its Address via LIVES_AT.
# No relationship is requested for store_id, so it stays a plain property.
transfer_table(driver, cursor, "staff",
               "staff_id", "first_name", "last_name", "address_id", "email", "store_id", "active",
               "username", "password", "last_update",
               relationships=[("staff", "address", "LIVES_AT")])

staff done: 17/29
staff -> address done: 18/29


## Store

In [32]:
# Copy stores as Store nodes, linked to their managing Staff node (MANAGED_BY)
# and to their Address (LOCATED_AT). The manager column is read as staff_id,
# the name it was given by the rename near the top of the notebook.
transfer_table(driver, cursor, "store", 
               "store_id", "staff_id", "address_id", "last_update",
               relationships=[("store", "staff", "MANAGED_BY"), ("store", "address", "LOCATED_AT")])

# Restore the original column name now that the store table has been transferred.
cursor.execute("alter table store rename column staff_id to manager_staff_id")
con.commit()

store done: 19/29
store -> staff done: 20/29
store -> address done: 21/29


## Rental
This is a very long-running step.

In [33]:
# Copy rentals as Rental nodes linked to the rented Inventory item, the
# Customer and the Staff member who handled the rental.
# NOTE(review): the recorded run of this cell ended with ServiceUnavailable
# ("Failed to read from defunct connection"), so it may not have completed.
print("Please, be patient, following steps might take up to 10 minutes in total")
transfer_table(driver, cursor, "rental",
               "rental_id", "rental_date", "inventory_id", "customer_id", "return_date", "staff_id", "last_update",
               relationships=[("rental", "inventory", "RENTS"), 
                              ("rental", "customer", "RENTED_TO"),
                              ("rental", "staff", "RENTED_BY")])

Please, be patient, following steps might take up to 10 minutes in total


ServiceUnavailable: Failed to read from defunct connection Address(host='localhost', port=7687) (Address(host='127.0.0.1', port=7687))

## Payment
This is also a long-running step.

In [None]:
# Copy payments as Payment nodes linked to the paying Customer, the accepting
# Staff member and the Rental that was paid for.
# NOTE(review): this cell has no execution count or output recorded;
# presumably it was never run.
transfer_table(driver, cursor, "payment",
               "payment_id", "customer_id", "staff_id", "rental_id", "amount", "payment_date",
               relationships=[("payment", "customer", "PAID_BY"), 
                              ("payment", "staff", "ACCEPTED_BY"),
                              ("payment", "rental", "PAID_FOR")])

In [22]:
def proper_type(column_name:str, column_type:str) -> str:
    """Return the Cypher expression that reads ``row.<column_name>`` with a type conversion.

    The conversion is chosen from the SQL ``column_type``: integer types use
    ``toInteger``, numeric types ``toFloat``, booleans ``toBoolean``, dates and
    timestamps ``apoc.date.parse``. Any other type yields the bare
    ``row.<column_name>`` expression.
    """
    field = f"row.{column_name}"
    if column_type in ("integer", "int", "smallint", "bigint"):
        return f"toInteger({field})"
    if column_type in ("numeric", "float", "decimal"):
        return f"toFloat({field})"
    if column_type == "boolean":
        return f"toBoolean({field})"
    if column_type == "date":
        return f"apoc.date.parse({field}, 's', \"yyyy-MM-dd\")"
    if column_type.startswith("timestamp"):
        return f"apoc.date.parse({field})"
    return field

def print_transfer(table_name:str):
    """Export ``table_name`` to CSV and return the Cypher statement that imports it as nodes.

    Despite its name the function prints nothing. It runs a ``copy ... to``
    export through the module-level ``cursor`` (written to
    ``/export/<table_name>.csv``; presumably that directory is also Neo4j's
    import directory, to be confirmed against the deployment). It then looks
    up the table's columns in ``INFORMATION_SCHEMA.COLUMNS`` and builds a
    ``LOAD CSV`` statement that MERGEs one node per row keyed on
    ``<table>_id``. Every column whose name does not end in ``_id`` is set as
    a property on creation, converted via ``proper_type``.

    ``table_name`` is interpolated directly into SQL and Cypher text, so it
    must come from trusted code.

    Returns:
        The Cypher import statement as a string; the caller is expected to run it.
    """
    export = f"copy {table_name} to '/export/{table_name}.csv' DELIMITER ',' CSV HEADER;"
    cursor.execute(export)

    # One name for the node variable, used by both the MERGE pattern and the
    # SET clauses so they cannot drift apart for mixed-case table names.
    var = table_name.lower()
    merge_clause = "USING PERIODIC COMMIT\n" \
                   f"LOAD CSV WITH HEADERS FROM 'file:/{table_name}.csv' AS row\n" \
                   f"MERGE ({var}: {table_name.capitalize()} {{{var}_id:toInteger(row.{var}_id)}})"

    cursor.execute("select column_name, data_type from INFORMATION_SCHEMA.COLUMNS where table_name = %s ;", (table_name,))

    # Id columns are skipped: the table's own id is the MERGE key.
    props = [
        f"{var}.{column_name} = {proper_type(column_name, column_type)}"
        for column_name, column_type in cursor.fetchall()
        if not column_name.endswith("_id")
    ]

    # Without any property to set, an "ON CREATE SET" clause with an empty
    # body would be invalid Cypher, so the clause is left out entirely.
    if not props:
        return merge_clause + ";"

    return merge_clause + "\n\tON CREATE SET\n\t\t{data};".format(data=",\n\t\t".join(props))

# Tables whose rows are imported as nodes through the CSV export / LOAD CSV path.
tables = ["category", "film", "language", "actor", "staff",
          "payment", "rental", "inventory",
          "customer", "address", "city", "country", "store"]

from time import time
start = time()
with driver.session() as session:
    for table in tables:
        # print_transfer exports the table to CSV and returns the Cypher
        # import statement, which is executed here.
        session.run(print_transfer(table))
        # session.run(f"CREATE INDEX ON :{table.capitalize()} ({table.lower()}_id)")
# Elapsed wall-clock seconds for the whole node import.
print(time()-start)

# print("\n\n\n"
#       "\n".join([print_transfer(x) for x in tables]))

1.210146188735962


In [23]:
def many2many(table_name:str, from_table:str, to_table:str, rel_name:str):
    """Export a link table to CSV and print the Cypher statement that imports it as relationships.

    The export goes through the module-level ``cursor`` to
    ``/export/<table_name>.csv``. The printed ``LOAD CSV`` statement matches
    the ``from_table`` and ``to_table`` nodes by their ``*_id`` properties and
    MERGEs an upper-cased ``rel_name`` relationship between them. The
    relationship carries every column of the link table whose name does not
    end in ``_id``. The statement is only printed, not executed.
    """
    cursor.execute(f"copy {table_name} to '/export/{table_name}.csv' DELIMITER ',' CSV HEADER;")
    cursor.execute("select column_name, data_type from INFORMATION_SCHEMA.COLUMNS where table_name = %s ;", (table_name,))

    rel_props = ','.join(
        f"{name}: {proper_type(name, kind)}"
        for name, kind in cursor.fetchall()
        if not name.endswith("_id")
    )

    from_key = f"{from_table}_id"
    to_key = f"{to_table}_id"
    statement_lines = [
        "USING PERIODIC COMMIT",
        f"LOAD CSV WITH HEADERS FROM 'file:/{table_name}.csv' AS row",
        f"MATCH ( {from_table} :{from_table.capitalize()} {{ {from_key}: {proper_type(from_key, 'int')} }})",
        f"MATCH ( {to_table}   :{to_table.capitalize()} {{ {to_key}: {proper_type(to_key, 'int')} }})",
        f"MERGE ( {from_table}) - [:{rel_name.upper()}{{ {rel_props} }}] -> ({to_table});",
    ]
    print("\n".join(statement_lines))

# Export both link tables and print their relationship-import statements,
# separated by an empty line.
many2many("film_category", "film", "category", "IN_CATEGORY")
print("")
many2many("film_actor", "actor", "film", "FILMED_IN")

USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM 'file:/film_category.csv' AS row
MATCH ( film :Film { film_id: toInteger(row.film_id) })
MATCH ( category   :Category { category_id: toInteger(row.category_id) })
MERGE ( film) - [:IN_CATEGORY{ last_update: apoc.date.parse(row.last_update) }] -> (category);

USING PERIODIC COMMIT
LOAD CSV WITH HEADERS FROM 'file:/film_actor.csv' AS row
MATCH ( actor :Actor { actor_id: toInteger(row.actor_id) })
MATCH ( film   :Film { film_id: toInteger(row.film_id) })
MERGE ( actor) - [:FILMED_IN{ last_update: apoc.date.parse(row.last_update) }] -> (film);


In [26]:
from time import time
def transfer_relation(table_name:str, to_table:str, rel_name:str, last_update_to_rel: bool = True, fk:Optional[str] = None):
    """Create relationships in Neo4j from a foreign-key column of ``table_name``.

    The pairs ``(<table_name>_id, <fk>)`` are exported through the
    module-level ``cursor`` to ``/export/<table_name>_<to_table>_rel.csv``,
    with the foreign key aliased to ``<to_table>_id``. A ``LOAD CSV``
    statement then matches both nodes by id and MERGEs an upper-cased
    ``rel_name`` relationship from the ``table_name`` node to the
    ``to_table`` node.

    Args:
        table_name: Source table; its nodes are the start of the relationship.
        to_table: Referenced table; its nodes are the end of the relationship.
        rel_name: Relationship type (upper-cased in the statement).
        last_update_to_rel: When true, the source table's ``last_update``
            column is exported as well and stored on the relationship.
        fk: Name of the foreign-key column; defaults to ``<to_table>_id``.
    """
    fk_column = f"{to_table}_id" if fk is None else fk
    csv_name = f"{table_name}_{to_table}_rel.csv"
    extra_column = ", last_update" if last_update_to_rel else ""
    cursor.execute(
        f"copy (select {table_name}_id, {fk_column} as {to_table}_id {extra_column}"
        f" from {table_name}) to '/export/{csv_name}' DELIMITER ',' CSV HEADER;"
    )

    rel_props = f"last_update: {proper_type('last_update', 'timestamp')}" if last_update_to_rel else ""
    source_key = f"{table_name}_id"
    target_key = f"{to_table}_id"
    statement_lines = [
        "USING PERIODIC COMMIT",
        f"LOAD CSV WITH HEADERS FROM 'file:/{csv_name}' AS row",
        f"MATCH ( {table_name} :{table_name.capitalize()} {{ {source_key}: {proper_type(source_key, 'int')} }})",
        f"MATCH ( {to_table}   :{to_table.capitalize()} {{ {target_key}: {proper_type(target_key, 'int')} }})",
        f"MERGE ( {table_name}) - [:{rel_name.upper()}{{ {rel_props} }}] -> ({to_table});",
    ]
    import_statement = "\n".join(statement_lines)
    with driver.session() as session:
        session.run(import_statement)

# Create the foreign-key relationships between the imported nodes and time
# the whole batch.
start = time()
transfer_relation("film", "language", "IN_LANGUAGE")
transfer_relation("inventory", "film", "RENTS_FILM")
transfer_relation("rental", "inventory", "RENTS")
transfer_relation("rental", "customer", "RENTED_TO")
transfer_relation("rental", "staff", "RENTED_BY")
# Payment relationships are exported without a last_update column.
transfer_relation("payment", "rental", "PAID_FOR", last_update_to_rel=False)
transfer_relation("payment", "customer", "PAID_BY", last_update_to_rel=False)
# transfer_relation("payment", "staff", "ACCEPTED_BY", last_update_to_rel=False)
transfer_relation("customer", "address", "LIVES_AT")
transfer_relation("staff", "address", "LIVES_AT")
transfer_relation("store", "address", "LOCATED_AT")
transfer_relation("address", "city", "SITUATED_IN")
transfer_relation("city", "country", "SITUATED_IN")
# store references its manager through manager_staff_id instead of staff_id.
transfer_relation("store", "staff", "MANAGED_BY", fk="manager_staff_id")


# Elapsed wall-clock seconds for all relationship imports.
print(time()-start)

8.670267343521118
