# Storing IMDb Datasets in PostgreSQL

This notebook builds a pipeline that loads the IMDb data into a PostgreSQL database from Python. As a prerequisite, you need to install PostgreSQL on your local machine and create a database, as well as a user that will upload the datasets as tables. This notebook does not cover that setup.

Of course, the datasets can be imported directly into Postgres with the ***CREATE TABLE*** and ***COPY*** commands, or by clicking through the menus available in pgAdmin. The main idea here, however, is to split the datasets into chunks and manipulate them before loading them.

After creating the database and extracting the files downloaded from the IMDb data page, you can use this notebook to create the tables and load data into each of them separately. The main difficulty with the IMDb dataset is its volume, so I read some of the tables with pandas in chunks and the others with Dask, to show both options.
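
As a rough illustration of the chunked reading described above, the sketch below reads a tiny in-memory TSV in chunks and replaces the `\N` marker in each chunk before anything is loaded. The sample rows, the `iter_clean_chunks` helper and the chunk size of 2 are made up for this example and are not part of the notebook.

```python
import io

import pandas as pd

# Made-up sample in the shape of an IMDb TSV file; "\N" marks a missing value.
SAMPLE_TSV = (
    "tconst\ttitleType\tstartYear\n"
    "tt0000001\tshort\t1894\n"
    "tt0000002\tshort\t\\N\n"
    "tt0000003\tmovie\t1895\n"
)


def iter_clean_chunks(source, chunksize):
    """Yield cleaned DataFrames of at most `chunksize` rows each."""
    reader = pd.read_csv(
        source,
        sep="\t",
        dtype=str,              # keep every column as text
        keep_default_na=False,  # do not turn strings such as "NA" into NaN
        chunksize=chunksize,
    )
    for chunk in reader:
        # Replace the "\N" marker with an empty string, as the notebook does.
        yield chunk.replace("\\N", "")


chunks = list(iter_clean_chunks(io.StringIO(SAMPLE_TSV), chunksize=2))
chunk_sizes = [len(chunk) for chunk in chunks]
print(chunk_sizes)
```

Only one chunk has to be held in memory at a time when the chunks are consumed one by one; collecting them in a list here is only for inspection.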

I downloaded the publicly available IMDb dataset from this [link](https://datasets.imdbws.com/), which includes 7 different datasets as TSV files. Explanations of the data can be found on this [page](https://www.imdb.com/interfaces/).

***Please do not forget to replace the FILE DIRECTORY, DB NAME, USERNAME AND PASSWORD with your own.***
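
One possible way to keep these values out of the notebook is to read them from environment variables. The sketch below is only an assumption about how that could look: the variable names (`IMDB_DB_NAME` and so on), the defaults and the `build_connection_settings` helper are hypothetical. It returns keyword arguments that `psycopg2.connect` accepts and a URL string in the form used with `create_engine`.

```python
import os
from urllib.parse import quote


def build_connection_settings(env=None):
    """Build psycopg2 keyword arguments and a SQLAlchemy-style URL.

    The environment variable names are hypothetical; pick your own.
    """
    env = os.environ if env is None else env
    settings = {
        "dbname": env.get("IMDB_DB_NAME", "imdb"),
        "user": env.get("IMDB_DB_USER", "postgres"),
        "password": env.get("IMDB_DB_PASSWORD", ""),
        "host": env.get("IMDB_DB_HOST", "localhost"),
        "port": env.get("IMDB_DB_PORT", "5432"),
    }
    # Percent-encode user and password so characters like "@" or " "
    # cannot break the URL.
    url = "postgresql://{user}:{password}@{host}:{port}/{dbname}".format(
        user=quote(settings["user"], safe=""),
        password=quote(settings["password"], safe=""),
        host=settings["host"],
        port=settings["port"],
        dbname=settings["dbname"],
    )
    return settings, url


# Example with an explicit mapping instead of the real environment.
example_env = {"IMDB_DB_USER": "me", "IMDB_DB_PASSWORD": "p@ss word"}
settings, url = build_connection_settings(example_env)
print(url)
```

The results would then be used as `psycopg2.connect(**settings)` and `create_engine(url)`.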

In [1]:
import psycopg2
import pandas as pd
import csv, os, io
from io import StringIO
from sqlalchemy import create_engine
from dask import dataframe as dd


In [2]:
conn = psycopg2.connect("dbname= <your DB name> user=<your username>  password=<your password> ")

In [3]:
cur = conn.cursor()
        
# execute a statement
print('PostgreSQL database version:')
cur.execute('SELECT version()')

# display the PostgreSQL database server version
db_version = cur.fetchone()
print(db_version)

# close the communication with the PostgreSQL
cur.close()

PostgreSQL database version:
('PostgreSQL 12.8 (Ubuntu 12.8-0ubuntu0.20.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0, 64-bit',)


In [4]:
# This function comes from the pandas documentation. It speeds up loading large
# tables by sending the rows through COPY instead of individual INSERT statements.
# https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method


def psql_insert_copy(table, conn, keys, data_iter):
    """
    Execute SQL statement inserting data

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)
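
To make it easier to see what `psql_insert_copy` hands to PostgreSQL, the sketch below rebuilds the same CSV buffer and `COPY` statement without touching a database. The `build_copy_payload` helper and the sample rows are made up for illustration; the string handling mirrors the function above.

```python
import csv
from io import StringIO


def build_copy_payload(table_name, keys, data_iter, schema=None):
    """Return the COPY statement and the CSV text that would be streamed."""
    s_buf = StringIO()
    # Write all rows as CSV into an in-memory buffer.
    csv.writer(s_buf).writerows(data_iter)

    # Quote column names so mixed-case names such as "averageRating" survive.
    columns = ", ".join('"{}"'.format(k) for k in keys)
    qualified = "{}.{}".format(schema, table_name) if schema else table_name
    sql = "COPY {} ({}) FROM STDIN WITH CSV".format(qualified, columns)
    return sql, s_buf.getvalue()


# Made-up rows, only to show the shape of the payload.
rows = [("tt0000001", 5.7), ("tt0000002", 5.8)]
sql, payload = build_copy_payload("titleratings", ["tconst", "averageRating"], rows)
print(sql)
print(payload)
```

In the real function, the buffer is rewound with `seek(0)` and passed to `cur.copy_expert` together with this statement, so all rows of a batch travel in a single `COPY` call.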

In [6]:
data_dir = '/home/burcin/Downloads'  # change to your own file directory
skip = ['titleakas.tsv', 'titleprincipals.tsv']  # these two are loaded with Dask

# create the connection and the engine once instead of once per file
conn = psycopg2.connect("dbname= <your DB name> user=<your username>  password=<your password> ")
engine = create_engine('postgresql://<your username>:<your password>@localhost:5432/<your DB name>')

for datafile in [x for x in os.listdir(data_dir) if x.endswith('tsv') and x not in skip]:
    dataname = datafile.split('.')[0]
    # read data in chunks of 10,000 rows, then combine the chunks
    df = pd.read_csv(os.path.join(data_dir, datafile), sep='\t', chunksize=10000)
    pd_df = pd.concat(df)
    # replace the '\N' missing-value marker with an empty string
    pd_df = pd_df.replace('\\N', '')
    # load data to DB
    pd_df.to_sql(dataname, engine, method=psql_insert_copy)
    # control operation: count the rows that were loaded
    cur = conn.cursor()
    cur.execute('select count(*) from ' + dataname)
    datacount = cur.fetchone()
    print(dataname, 'has number of rows:', datacount)
    cur.close()

    # delete dataframe to lighten memory
    del pd_df

titlecrew has number of : (8285162,)
titleepisode has number of : (6079951,)
namebasics has number of : (11238430,)
titleratings has number of : (1187847,)
titlebasics has number of : (8285162,)
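
The loop above concatenates all chunks before loading, so the whole table still has to fit in memory. An alternative, sketched below under stated assumptions, is to append each chunk to the table as soon as it is read. To keep the example self-contained it swaps PostgreSQL for an in-memory SQLite database; the sample rows and the `load_in_chunks` helper are made up for illustration.

```python
import io
import sqlite3

import pandas as pd

# Made-up sample rows in the shape of a ratings table.
SAMPLE_TSV = (
    "tconst\taverageRating\tnumVotes\n"
    "tt0000001\t5.7\t100\n"
    "tt0000002\t5.8\t200\n"
    "tt0000003\t6.5\t\\N\n"
    "tt0000004\t5.5\t150\n"
    "tt0000005\t6.2\t300\n"
)


def load_in_chunks(source, table, con, chunksize):
    """Append the file to `table` chunk by chunk; return the rows written."""
    total = 0
    reader = pd.read_csv(source, sep="\t", dtype=str,
                         keep_default_na=False, chunksize=chunksize)
    for i, chunk in enumerate(reader):
        chunk = chunk.replace("\\N", "")
        # The first chunk (re)creates the table, later chunks are appended.
        mode = "replace" if i == 0 else "append"
        chunk.to_sql(table, con, if_exists=mode, index=False)
        total += len(chunk)
    return total


con = sqlite3.connect(":memory:")  # stand-in for the Postgres connection
loaded = load_in_chunks(io.StringIO(SAMPLE_TSV), "titleratings", con, chunksize=2)
stored = con.execute("SELECT COUNT(*) FROM titleratings").fetchone()[0]
print(loaded, stored)
```

With PostgreSQL, one would presumably pass the SQLAlchemy `engine` and `method=psql_insert_copy` to `to_sql` instead of the SQLite connection.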


In [38]:
data_dir = '/home/burcin/Downloads'  # change to your own file directory
engine = create_engine('postgresql://<your username>:<your password>@localhost:5432/<your DB name>')
err_tables = []  # partitions that failed to load, kept for later inspection

for datafile in [x for x in os.listdir(data_dir) if x in ['titleakas.tsv', 'titleprincipals.tsv']]:
    dataname = datafile.split('.')[0]
    path = os.path.join(data_dir, datafile)

    # read data lazily with Dask; isOriginalTitle in titleakas is read as text
    if datafile == 'titleakas.tsv':
        dask_df = dd.read_csv(path, sep='\t', dtype={'isOriginalTitle': 'object'})
    else:
        dask_df = dd.read_csv(path, sep='\t')
    # replace the '\N' missing-value marker with an empty string
    dask_df = dask_df.replace('\\N', '')

    # create empty table in DB
    pd.DataFrame(columns=dask_df.columns).to_sql(
        dataname,
        con=engine,
        if_exists='replace',
        index=False)

    # load data to DB, one Dask partition at a time
    cur = conn.cursor()
    for n in range(dask_df.npartitions):
        table_chunk = dask_df.get_partition(n).compute()
        output = io.StringIO()
        table_chunk.to_csv(output, sep='\t', header=False, index=False)
        output.seek(0)
        try:
            cur.copy_from(output, dataname, null='')
        except Exception:
            # keep the failed partition and undo the aborted transaction
            err_tables.append(table_chunk)
            conn.rollback()
            continue
        conn.commit()

    # check if data loaded to DB
    cur.execute('select count(*) from ' + dataname)
    datacount = cur.fetchone()
    print(dataname, 'has number of rows:', datacount)

    cur.close()

    # delete dataframe to lighten memory
    del dask_df

titleprincipals has number of rows: (46886580,)
titleakas has number of rows: (29186753,)
