## <a id='toc1_1_'></a>[TSV to Postgres DB](#toc0_)
Features:

- Unzipping gzip (gz) files using chunks and temporary files to prevent memory errors.
- Determining column types for the tables.
- Providing an option to convert list-type columns to the PostgreSQL array format before they are uploaded to the database.
- Creating Pandas DataFrames in chunks for efficient memory usage.
- Uploading DataFrames as tables to a PostgreSQL database in chunks.
- Offering an option to alter the column types of the PostgreSQL tables.
- Running an IMDb-specific script that makes useful modifications to the IMDb database, including:
    - Creating lookup tables (if they were not created during the database upload).
    - Refining column data types.
    - Removing rows from the lookup tables whose unique identifiers are no longer present in the corresponding main tables.
    - Adding primary and foreign keys to tables.
    - Dropping the title_crew column and moving its contents to the lookup tables.
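The lookup tables hold one row per element of a comma-separated column, keyed by the row's identifier. Below is a minimal sketch of that reshaping, assuming a `tconst` id column and a `genres` list column with toy values. It uses pandas `DataFrame.explode`, whereas the upload cell uses `str.split(expand=True).stack()`; `build_lookup` is a hypothetical name, and unlike the notebook (which keeps `\N` entries as `None`) it simply drops rows whose list is `\N`.

```python
import pandas as pd

# Toy rows in the IMDb layout: an id column and a comma-separated list column
toy_basics = pd.DataFrame({
    'tconst': ['tt01', 'tt02', 'tt03'],
    'genres': ['Drama,Romance', 'Comedy', '\\N'],
})

def build_lookup(df, id_col, list_col):
    """Hypothetical helper: one (id, element) row per list element; '\\N' rows are dropped."""
    lookup = df.loc[df[list_col] != '\\N', [id_col, list_col]].copy()
    lookup[list_col] = lookup[list_col].str.split(',')
    # explode() turns every list into separate rows and repeats the id for each element
    return lookup.explode(list_col, ignore_index=True)

toy_genres_lookup = build_lookup(toy_basics, 'tconst', 'genres')
print(toy_genres_lookup)
```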

**Table of contents**<a id='toc0_'></a>    
- [TSV to Postgres DB](#toc1_1_)    
      - [Initialization](#toc1_1_1_1_)    
      - [Connect or create and connect to the Postgres DB](#toc1_1_1_2_)    
      - [Unzip the gz file and read it in chunks to avoid memory errors](#toc1_1_1_3_)    
      - [Temporarily save the unzipped file to avoid memory errors](#toc1_1_1_4_)    
      - [Create df with the tsv header only](#toc1_1_1_5_)    
      - [Read random lines from the tsv_temp](#toc1_1_1_6_)    
        - [Create random sample list with line numbers](#toc1_1_1_6_1_)    
        - [Create df using the random_lines sample list](#toc1_1_1_6_2_)    
      - [Copy df so the original is not affected (for experimental purposes only)](#toc1_1_1_7_)    
      - [Change the columns with the `list` type to a proper Postgres list format](#toc1_1_1_8_)    
      - [Check datatypes function](#toc1_1_1_9_)    
      - [Create df with the cell types](#toc1_1_1_10_)    
      - [Summarize the results into one data type per column](#toc1_1_1_11_)    
      - [Map out a df with cell lengths](#toc1_1_1_12_)    
      - [Create 2 rows in the temp_df to indicate the length of the cells](#toc1_1_1_13_)    
      - [Final temp_df](#toc1_1_1_14_)    
      - [Alter table query creator code](#toc1_1_1_15_)    
      - [Change the Postgres DB table with the alter_query](#toc1_1_1_16_)    
      - [So far, reading the df in one go was OK](#toc1_1_1_17_)    
      - [Insert df as chunks to db table to avoid memory error](#toc1_1_1_18_)    
      - [Progress bar samples](#toc1_1_1_19_)    

<!-- vscode-jupyter-toc-config
	numbering=false
	anchor=true
	flat=false
	minLevel=1
	maxLevel=6
	/vscode-jupyter-toc-config -->
<!-- THIS CELL WILL BE REPLACED ON TOC UPDATE. DO NOT WRITE YOUR TEXT IN THIS CELL -->

#### <a id='toc1_1_1_1_'></a>[Initialization](#toc0_)

In [48]:
# For Jupyter notebooks in VS Code (ipywidgets renders the tqdm notebook progress bars)
# %pip install --upgrade jupyter ipywidgets
# !jupyter nbextension enable --py widgetsnbextension

In [1]:
import os
import gzip
import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, text
from sqlalchemy.exc import OperationalError
from sqlalchemy_utils import create_database
# from tqdm import tqdm
# from tqdm.notebook import tqdm
from tqdm.auto import tqdm
import random

tsv_directory = 'tsv'
gz_directory = 'gz'

# Stop here if the 'tsv' directory does not exist (quit() would shut down the notebook kernel)
if not os.path.exists(tsv_directory):
    raise FileNotFoundError(f"Directory '{tsv_directory}' does not exist.")

# username = input('Postgres username: ')
# password = input('Postgres password: ')
# host = input('Localhost: ')
# port = input('Port: ')
# dbase = input('Database: ')

username = 'postgres'
password = '123456'
host = 'localhost'
port = '5433'
dbase = 'imdb_db'

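file_chunk_length = 1024*1024*25  # size of each read when counting/extracting lines, in characters (alternative: 1024*1024*500)
sample_size = 50  # number of random rows used to guess the column types
chunk_size = 10000  # rows per DataFrame chunk uploaded to the database (alternative: 200000)

rectify_data_types = 1  # 1: ALTER the column types after upload, based on the sampled rows
pgres_list_rectif = 0  # 1: convert list columns to PostgreSQL array literals (VARCHAR[])
drop_tables = True  # True: drop an existing table with the same name first; False: append to it
create_lookup = 1  # 1: create *_lookup tables for list columns; works only if rectify_data_types = 1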


#### <a id='toc1_1_1_2_'></a>[Connect or create and connect to the Postgres DB](#toc0_)

In [2]:
url = f'postgresql://{username}:{password}@{host}:{port}/{dbase}'
# create_engine() is lazy: the first real connection is made by metadata.reflect()
engine = create_engine(url)
metadata = MetaData()
try:
    # Load the definitions of the existing tables
    metadata.reflect(bind=engine)
    print(f"Connected to database: '{dbase}'.")
except OperationalError:
    # Assumes a missing database (a wrong password or a stopped server also raise OperationalError)
    print(f"Database '{dbase}' does not exist. Creating...")
    create_database(engine.url)
    metadata.reflect(bind=engine)
    print(f"Connected to the created database: '{dbase}'.")

Connected to database: 'imdb_db'.


#### <a id='toc1_1_1_6_'></a>[Read random lines from the tsv_temp](#toc0_)

##### <a id='toc1_1_1_6_1_'></a>[Create random sample list with line numbers](#toc0_)

In [3]:
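def random_sampling(total_lines, sample_size):
    """Return line 0 (the header) plus up to sample_size distinct random data-line numbers."""
    # random.sample() raises ValueError when asked for more lines than exist
    sample_size = min(sample_size, max(total_lines - 1, 0))
    random_lines = random.sample(range(1, total_lines), sample_size)
    return [0] + random_lines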

##### <a id='toc1_1_1_6_2_'></a>[Create df using the random_lines sample list](#toc0_)

In [4]:
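def sample_df(tsv_path, random_lines):
    """Read only the header and the line numbers listed in random_lines."""
    wanted = set(random_lines)  # set membership keeps the per-line check fast
    return pd.read_csv(tsv_path, sep='\t', skiprows=lambda x: x not in wanted, header=0)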
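The two helpers above rely on `pd.read_csv` calling a `skiprows` function with every 0-based line number and skipping the lines for which it returns `True`. A self-contained sketch of the same sampling on a toy in-memory TSV; `pick_lines` and the `toy_*` names are hypothetical, and a seeded `random.Random` is used only so the sample is reproducible.

```python
import io
import random

import pandas as pd

# Toy in-memory TSV standing in for tsv_temp: a header line plus 100 data lines
toy_tsv = 'id\tvalue\n' + ''.join(f'{i}\tv{i}\n' for i in range(1, 101))

def pick_lines(total_lines, sample_size, seed=None):
    """Hypothetical helper: header line 0 plus up to sample_size random data lines, as a set."""
    rng = random.Random(seed)
    sample_size = min(sample_size, max(total_lines - 1, 0))  # never ask for more lines than exist
    return {0} | set(rng.sample(range(1, total_lines), sample_size))

toy_total_lines = toy_tsv.count('\n')  # 101 = header + 100 data lines
toy_keep = pick_lines(toy_total_lines, 10, seed=42)
# skiprows receives each 0-based line number; returning True skips that line
toy_sample = pd.read_csv(io.StringIO(toy_tsv), sep='\t', skiprows=lambda i: i not in toy_keep)
print(toy_sample)
```

Because line `i` of the toy file holds `id == i`, the sampled ids are exactly the chosen line numbers, which makes the result easy to check.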

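#### <a id='toc1_1_1_8_'></a>[Change the columns with the `list` type to the PostgreSQL array format](#toc0_)
`pgres_list_gen` writes the list columns as PostgreSQL array literals; `change_str_to_list_in_df` converts them to Python lists instead and also measures the time it takes.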

In [5]:
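import ast
import time

def pgres_list_gen(df):
    """Rewrite the 'list' / 'list[]' columns (per the global df_result) as PostgreSQL
    array literals, e.g. 'a,b' -> '{"a", "b"}' and '["x"]' -> '{"x"}'."""
    def quote(item):
        # Backslashes and double quotes must be escaped inside a quoted array element
        return '"' + item.replace('\\', '\\\\').replace('"', '\\"') + '"'

    for col in df_result.columns:
        if df_result[col][0] == 'list':
            df[col] = df[col].apply(lambda x: '{' + ', '.join(quote(i) for i in x.split(',')) + '}' if isinstance(x, str) else None)
        elif df_result[col][0] == 'list[]':
            df[col] = df[col].apply(lambda x: '{' + x[1:-1] + '}' if isinstance(x, str) else None)
    return df

def change_str_to_list_in_df(df):
    """Alternative to pgres_list_gen(): turn the list columns into Python lists and time it."""
    def parse_bracket_list(x):
        # '["Self", "Host"]' -> ['Self', 'Host']; fall back to stripping the brackets
        try:
            return list(ast.literal_eval(x))
        except (ValueError, SyntaxError):
            return [x[2:-2]]

    df = df.map(lambda x: None if x == '\\N' or x == 'NaN' else x)
    start_t = time.perf_counter()
    for col in df_result.columns:
        if df_result[col][0] == 'list':
            df[col] = df[col].map(lambda x: x.split(',') if isinstance(x, str) else None)
        elif df_result[col][0] == 'list[]':
            df[col] = df[col].map(lambda x: parse_bracket_list(x) if isinstance(x, str) else None)
    end_t = time.perf_counter()
    print(f"Execution time: {round(end_t - start_t, 4)} seconds")
    return df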
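A PostgreSQL array literal wraps the elements in `{}`; a double-quoted element needs its backslashes and double quotes escaped with a backslash, and quoting also keeps a value such as `NULL` from being read as SQL NULL. A standalone sketch of that conversion for one cell, in the spirit of `pgres_list_gen`; `to_pg_array_literal` is a hypothetical name, and it treats `\N`, `None` and `NaN` as missing.

```python
def to_pg_array_literal(value, sep=','):
    """Hypothetical helper: 'a,b' -> '{"a","b"}'; missing values ('\\N', None, NaN) -> None."""
    if not isinstance(value, str) or value == '\\N':
        return None
    # Inside a double-quoted array element, backslashes and double quotes are backslash-escaped
    quoted = ('"' + item.replace('\\', '\\\\').replace('"', '\\"') + '"' for item in value.split(sep))
    return '{' + ','.join(quoted) + '}'

print(to_pg_array_literal('actor,producer'))  # {"actor","producer"}
print(to_pg_array_literal('say "hi"'))        # {"say \"hi\""}
```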

#### <a id='toc1_1_1_9_'></a>[Check datatypes function](#toc0_)
Checks the data type of a single cell using the following logic:
* if the cell is empty, `\N` or `NaN`, it returns `None`
* if the cell is a `string`:
    - if it contains `,` but no `, `, it is treated as a comma-separated `list`
    - if it starts with `[` and ends with `]`, it is treated as a bracketed list, `list[]`
    - if it is a `-` followed only by digits, it returns `integer-` (negative integer); otherwise, if it contains only digits, it returns `integer`
    - if it can be parsed as a `float`, it returns `float<n>`, where `<n>` is the number of decimal places
    - if none of the above match, it returns `string`
* if the cell is an `integer`, it returns `integer`
* if the cell is a `float`, it returns `float<n>` as in the string check
* if the cell is a `list`, it returns `list<n>`, where `<n>` is the number of elements
* otherwise it converts the cell to a `string` and runs the checks again
* if that fails too, it returns `nan`

In [6]:
def check_datatype(cell):
    """Return a type label for one cell (see the list above); None means 'no information'."""
    # Missing values carry no type information (0 and 0.0 are valid numbers, not missing)
    if cell is None or (isinstance(cell, float) and pd.isna(cell)):
        return None
    if isinstance(cell, str):
        if cell in ('', '\\N', 'NaN'):
            return None
        # Comma-separated values without spaces, e.g. "actor,producer"
        if ',' in cell and ', ' not in cell:
            return 'list'
        # Bracketed lists, e.g. '["Self"]'
        if cell[0] == '[' and cell[-1] == ']':
            return 'list[]'
        if cell[0] == '-' and cell[1:].isdigit():
            return 'integer-'
        if cell.isdigit():
            return 'integer'
        try:
            float(cell)
            # float<n>, where n is the number of decimal places; values without a '.'
            # (e.g. 'inf', '1e5') raise IndexError and are treated as strings
            return f"float{len(cell.split('.')[1])}"
        except (ValueError, IndexError):
            return 'string'
    if isinstance(cell, int):
        return 'integer'
    if isinstance(cell, float):
        return f"float{len(str(cell).partition('.')[2])}"
    if isinstance(cell, list):
        return f'list{len(cell)}'
    try:
        # Anything else (e.g. numpy integers) is checked again as a string
        return check_datatype(str(cell))
    except Exception:
        return 'nan'  # Handle other data types


#### <a id='toc1_1_1_10_'></a>[Create df with the cell types](#toc0_)

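#### <a id='toc1_1_1_11_'></a>[Summarize the results into one data type per column](#toc0_)
Logic:
* `None` entries (empty cells) are ignored; a column with only empty cells becomes `string`
* if a column has several types, the one ranked highest in the order `list[]` > `list` > `float` > `integer-` > `integer` > `string` wins; e.g. `integer` together with `integer-` gives `integer-`, which means that the column probably contains signed integers
* a one-character column whose sampled values are only `0` and `1` is marked as `bool`

The goal is a single data type for every column.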
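Read from the if/else chain in `df_analysis()`, the reduction behaves like taking the maximum over a fixed precedence. A sketch of that reading, assuming only the labels listed in `PRECEDENCE` occur; `PRECEDENCE`, `rank` and `summarize` are hypothetical names, and `float<n>` labels all share the `float` rank, with the first one seen winning.

```python
# Assumed precedence, lowest to highest: later entries win when a column mixes types
PRECEDENCE = ['string', 'integer', 'integer-', 'float', 'list', 'list[]']

def rank(label):
    """Position of a check_datatype() label in PRECEDENCE; float<n> counts as 'float'."""
    return PRECEDENCE.index('float') if label.startswith('float') else PRECEDENCE.index(label)

def summarize(labels):
    """Collapse the sampled labels of one column into a single type label."""
    known = [t for t in labels if t is not None]
    if not known:
        return 'string'  # a column with only empty cells defaults to string
    return max(known, key=rank)  # max() keeps the first of equally ranked labels

print(summarize([None, 'integer', 'integer-']))  # integer-
print(summarize(['integer', 'float2', 'string']))  # float2
print(summarize([None, None]))  # string
```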

#### <a id='toc1_1_1_12_'></a>[Map out a df with cell lengths](#toc0_)

#### <a id='toc1_1_1_13_'></a>[Create 2 rows in the temp_df to indicate the length of the cells](#toc0_)
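* The 1st row indicates whether all non-empty cells in the column have the same length (`equal`), different lengths (`max`), or whether every sampled cell was empty (`NaN`)
* The 2nd row holds the cell length; if the row above is `max`, it is the maximum length (for `NaN` columns a default length of 10 is used)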

#### <a id='toc1_1_1_14_'></a>[df_analysis](#toc0_)
The column names are the same as in the original df; the rows are:
* 1st row: the column data type
* 2nd row: whether the cell lengths in the column are `equal`, differ (`max`) or are all empty (`NaN`)
* 3rd row: the length, or the maximum length

In [7]:
# Precedence used when a column's sample contains several types (higher rank wins)
TYPE_RANK = {'string': 0, 'integer': 1, 'integer-': 2, 'float': 3, 'list': 4, 'list[]': 5}

def type_rank(label):
    """Rank of a check_datatype() label; float<n> counts as 'float', unknown labels give None."""
    return TYPE_RANK['float'] if 'float' in label else TYPE_RANK.get(label)

def df_analysis():
    """Summarize the globals df_types, df_lengths and df_sample into a 3-row df_result."""
    df_result = pd.DataFrame(columns=df_lengths.columns).astype(str)
    df_result.loc[len(df_result)] = ['a'] * df_result.shape[1]  # placeholder type row

    # Row 0: one data type per column
    for col in df_types.columns:
        unique_values = list(df_types[col].unique())
        if len(unique_values) == 1:
            # Only empty cells were sampled: default to string
            df_result.loc[0, col] = 'string' if unique_values[0] is None else unique_values[0]
            continue
        ranked = [t for t in unique_values if t is not None and type_rank(t) is not None]
        if ranked:
            # max() keeps the first of equally ranked labels (e.g. the first float<n> seen)
            df_result.loc[0, col] = max(ranked, key=type_rank)

    # Rows 1 and 2: length indicator and (max) length
    df_result.loc[len(df_result)] = ['equal'] * df_result.shape[1]
    df_result.loc[len(df_result)] = [0] * df_result.shape[1]

    for column in df_lengths.columns:
        max_length = df_lengths[column].max()
        # Smallest non-zero length (0 marks an empty cell)
        min_length = df_lengths[df_lengths[column] != 0][column].min()

        if not max_length:
            # Every sampled cell was empty
            df_result.loc[1, column] = 'NaN'
            df_result.loc[2, column] = '10'
            continue
        if max_length == min_length:
            df_result.loc[2, column] = str(max_length)
        else:
            df_result.loc[1, column] = 'max'
            df_result.loc[2, column] = str(max_length)
        if max_length == 1:
            # A one-character column holding only 0 and 1 is treated as boolean
            list_unique = df_sample[column].unique()
            if len(list_unique) == 2 and 0 in list_unique and 1 in list_unique:
                df_result.loc[0, column] = 'bool'
    return df_result

#### <a id='toc1_1_1_15_'></a>[Alter table query creator code](#toc0_)
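Builds the `ALTER TABLE` query that changes the types of the uploaded table's columns based on the contents of `df_result`.
Logic:
* `string`: `VARCHAR(length + 2)` if the lengths are `equal`, `VARCHAR(length)` (the default 10) if every sampled cell was empty (`NaN`), otherwise `VARCHAR(max length + 100)`
* `integer`: `SMALLINT` if the (max) length is less than 3 characters, otherwise `INT`
* `list` and `list[]`: `VARCHAR[]`, but only when `pgres_list_rectif` is enabled
* `float` with any number of decimal places: `DOUBLE PRECISION`
* `bool`: `BOOLEAN`, via a temporary column that replaces the original one
* anything else (including `integer-`, and lists when `pgres_list_rectif` is off): `TEXT`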
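A condensed sketch of these mapping rules as a pure function, useful for checking a single column in isolation; `pg_column_type`, `alter_clause` and `demo_query` are hypothetical names, and the sketch maps `bool` straight to `BOOLEAN`, while the notebook reaches it through a temporary column.

```python
def pg_column_type(type_label, length_kind, length, lists_as_arrays=False):
    """Hypothetical helper: PostgreSQL type for one df_result column
    (type label, 'equal'/'max'/'NaN', length)."""
    if type_label == 'string':
        if length_kind == 'equal':
            return f'VARCHAR({length + 2})'
        if length_kind == 'NaN':
            return f'VARCHAR({length})'
        return f'VARCHAR({length + 100})'
    if type_label == 'integer':
        return 'SMALLINT' if length < 3 else 'INT'
    if type_label.startswith('list') and lists_as_arrays:
        return 'VARCHAR[]'
    if type_label.startswith('float'):
        return 'DOUBLE PRECISION'
    if type_label == 'bool':
        return 'BOOLEAN'
    return 'TEXT'  # e.g. 'integer-', or lists when lists_as_arrays is False


def alter_clause(col, sql_type):
    # USING tells PostgreSQL how to convert the existing values to the new type
    return f'ALTER COLUMN "{col}" TYPE {sql_type} USING "{col}"::{sql_type}'


demo_query = 'ALTER TABLE title_basics ' + ', '.join([
    alter_clause('startYear', pg_column_type('integer', 'equal', 4)),
    alter_clause('tconst', pg_column_type('string', 'equal', 9)),
]) + ';'
print(demo_query)
```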

In [8]:
def db_query_gen_alter_col_types(df_result, table_name):
    """Build the SQL that retypes the columns of table_name according to df_result
    (row 0 = type, row 1 = 'equal'/'max'/'NaN', row 2 = length)."""
    clauses = []  # ALTER COLUMN clauses, joined into one ALTER TABLE statement
    bool_queries = ''  # extra statements for 0/1 columns converted to BOOLEAN
    for col in df_result.columns:
        type_label, length_kind, length = df_result[col][0], df_result[col][1], int(df_result[col][2])
        if type_label == 'string':
            if length_kind == 'equal':
                size = length + 2
            elif length_kind == 'NaN':
                size = length
            else:
                size = length + 100
            col_type = f'VARCHAR({size}) USING "{col}"::VARCHAR({size})'
        elif type_label == 'integer':
            if length < 3:
                col_type = f'SMALLINT USING "{col}"::smallint'
            else:
                col_type = f'INT USING "{col}"::integer'
        elif 'list' in type_label and pgres_list_rectif:
            col_type = f'VARCHAR[] USING "{col}"::varchar[]'
        elif 'float' in type_label:
            col_type = f'DOUBLE PRECISION USING "{col}"::double precision'
        elif type_label == 'bool':
            # Add a BOOLEAN column, fill it (NULL -> false), then swap it in for the old column
            bool_queries += (
                f'ALTER TABLE {table_name} ADD COLUMN new_boolean_column BOOLEAN;\n'
                f'ALTER TABLE {table_name} ALTER COLUMN "{col}" TYPE INT USING "{col}"::integer;\n'
                f'UPDATE {table_name} SET new_boolean_column = ("{col}" <> 0 AND "{col}" IS NOT NULL);\n'
                f'ALTER TABLE {table_name} DROP COLUMN "{col}";\n'
                f'ALTER TABLE {table_name} RENAME COLUMN new_boolean_column TO "{col}";\n'
            )
            continue
        else:
            col_type = 'TEXT'
        clauses.append(f'ALTER COLUMN "{col}" TYPE {col_type}')

    alter_query = (f'ALTER TABLE {table_name} ' + ', '.join(clauses) + ';\n') if clauses else ''
    return alter_query + bool_queries

### Main cell to upload the tables to the database

In [None]:
# Use the extracted .tsv files if there are any, otherwise the .gz archives
files = ([f for f in os.listdir(tsv_directory) if f.endswith('.tsv')]
         or [f for f in os.listdir(tsv_directory) if f.endswith('.gz')])
if not files:
    raise FileNotFoundError(f"No .tsv or .gz files found in '{tsv_directory}'.")

# Line counts cached by a previous run ('app_data' holds one "path: lines" pair per line)
file_lines = {}
if files[0].endswith('.tsv'):
    try:
        with open('app_data', 'r', encoding='utf-8') as file:
            for line in file.read().splitlines():
                if line:
                    path, count = line.rsplit(': ', 1)
                    file_lines[path] = int(count)
    except FileNotFoundError:
        pass

progress = tqdm(files, ncols=800)
for file_name in progress:
    total_lines = 0
    tsv_path = os.path.join(tsv_directory, file_name)
    if file_name.endswith('.tsv'):
        tsv_temp = tsv_path
        total_lines = file_lines.get(tsv_temp, 0)
    else:
        # The archive is extracted next to itself, e.g. tsv/title.basics.tsv.gz -> tsv/title.basics.tsv
        tsv_temp = f"{tsv_directory}/{file_name.rsplit('.', 1)[0]}"
    # e.g. tsv/title.basics.tsv -> title_basics
    table_name = os.path.splitext(os.path.basename(tsv_temp))[0].replace('.', '_')
    progress.set_description(f"Processing {table_name}, total lines: {total_lines} lines.")
    if table_name in metadata.tables and drop_tables:
        metadata.tables[table_name].drop(engine)

    # Count the lines (and extract .gz files) chunk by chunk to avoid memory errors
    if not total_lines:
        if file_name.endswith('.tsv'):
            with open(tsv_path, 'r', encoding='utf-8') as file:
                while chunk := file.read(file_chunk_length):
                    total_lines += chunk.count('\n')
                    progress.set_description(f"Processing {table_name}, total lines: {total_lines} lines.")
        else:
            # Seeking to the end decompresses the stream once to find its uncompressed size
            with gzip.open(tsv_path, 'rb') as f:
                f.seek(0, os.SEEK_END)
                uncompressed_size = f.tell()
            total_nr_of_iter = -(-uncompressed_size // file_chunk_length)  # ceiling division
            with gzip.open(tsv_path, 'rt', encoding='utf-8') as tsv_in, open(tsv_temp, 'w', encoding='utf-8') as tsv_out:
                progress_1 = tqdm(range(total_nr_of_iter), ncols=800)
                for _ in progress_1:
                    content = tsv_in.read(file_chunk_length)
                    total_lines += content.count('\n')
                    progress_1.set_description(f"Creating {tsv_temp} temp file, total lines: {total_lines} lines.")
                    progress.set_description(f"Processing {table_name}, total lines: {total_lines} lines.")
                    tsv_out.write(content)

    # Sample random rows to determine the column types (df_analysis() reads these globals)
    if rectify_data_types:
        random_lines = random_sampling(total_lines, sample_size)
        df_sample = pd.read_csv(tsv_temp, sep='\t', skiprows=lambda x: x not in random_lines, header=0)
        df_types = df_sample.map(check_datatype)
        df_lengths = df_sample.map(lambda x: 0 if pd.isna(x) or x in (r'\N', 'NaN') else len(str(x)))
        df_result = df_analysis()

    # A TextFileReader that yields chunk_size rows at a time
    df_chunks = pd.read_csv(tsv_temp, sep='\t', chunksize=chunk_size)
    total_nr_of_iter = -(-total_lines // chunk_size)  # ceiling division
    progress_2 = tqdm(df_chunks, ncols=800, total=total_nr_of_iter)
    check_is_done = 0
    lookup_queries = {}
    for df_chunk in progress_2:
        progress_2.set_description(f"Creating and uploading table in chunks of {chunk_size} rows.")
        # Drop rows that repeat the header
        df_chunk = df_chunk.drop(df_chunk[df_chunk[df_chunk.columns[0]] == df_chunk.columns[0]].index)
        if create_lookup and rectify_data_types:
            # Lookup rows are keyed by tconst if the table has it, otherwise by nconst
            id_col = 'tconst' if 'tconst' in df_result.columns else 'nconst'
            lookups = {}
            for col in df_result.columns:
                if df_result[col][0] == 'list' and id_col in df_chunk.columns:
                    # One row per comma-separated element, keeping the original row index
                    df_raw = df_chunk[col].str.split(',', expand=True).stack().reset_index(level=1, drop=True).rename(col)
                    lookups[f'{col.lower()}_{table_name}_lookup'] = pd.merge(
                        df_chunk[[id_col]], df_raw, left_index=True, right_index=True
                    ).map(lambda x: None if x == '\\N' or x == 'NaN' else x)
            for key, value in lookups.items():
                # On the first chunk, replace a lookup table left over from an earlier run
                if_exists = 'replace' if drop_tables and not check_is_done else 'append'
                value.to_sql(key, con=engine, if_exists=if_exists, index=False)
                if not check_is_done:
                    # Derive the lookup table's column types from a sample of its own rows
                    random_lines = random_sampling(value.shape[0], sample_size)
                    df_sample = value.iloc[random_lines]
                    df_types = df_sample.map(check_datatype)
                    df_lengths = df_sample.map(lambda x: 0 if pd.isna(x) or x in (r'\N', 'NaN') else len(str(x)))
                    lookup_queries[key] = db_query_gen_alter_col_types(df_analysis(), key)
            check_is_done = 1
        df_chunk = df_chunk.map(lambda x: None if x == '\\N' or x == 'NaN' else x)
        if pgres_list_rectif:
            progress_2.set_description(f"Rectifying list type in {table_name}")
            df_chunk = pgres_list_gen(df_chunk)  # 5.74 it/s in a test run
            # df_chunk = change_str_to_list_in_df(df_chunk)  # 4.96 it/s in a test run
        # Insert the df chunk
        df_chunk.to_sql(table_name, con=engine, if_exists='append', index=False)

    if rectify_data_types:
        progress.set_description(f"Processing {table_name}, rectifying the data types of the columns.")
        for _ in tqdm(range(1), desc=f"Rectifying col data types of {table_name} in database", ncols=800):
            alter_query = db_query_gen_alter_col_types(df_result, table_name)
            # Special case for title_basics to avoid 'invalid input syntax for type integer: "Reality-TV"'
            if table_name == 'title_basics':
                alter_query = alter_query.replace(
                    'ALTER COLUMN "runtimeMinutes" TYPE INT USING "runtimeMinutes"::integer',
                    'ALTER COLUMN "runtimeMinutes" TYPE VARCHAR(10) USING "runtimeMinutes"::VARCHAR(10)')
            # engine.begin() commits when the block succeeds and rolls back on an error
            if alter_query:
                with engine.begin() as conn:
                    conn.execute(text(alter_query))
            if create_lookup:
                for lookup_query in lookup_queries.values():
                    with engine.begin() as conn:
                        conn.execute(text(lookup_query))
    file_lines[tsv_temp] = total_lines
    progress.set_description("Process done.")

# Cache the line counts so that the next run can skip counting
with open('app_data', 'w', encoding='utf-8') as file:
    for key, value in file_lines.items():
        file.write(f'{key}: {value}\n')

# Release the pooled connections (the engine opens new ones if it is used again)
engine.dispose()
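The cell above reads each file in pieces: fixed-size reads to count lines (and extract `.gz` archives), then `pd.read_csv(..., chunksize=...)` with `DataFrame.to_sql(..., if_exists='append')` for the upload. A self-contained miniature of the same flow, assuming a throwaway SQLite database in place of PostgreSQL and a toy file; `gz_tsv_to_sql` and the `demo_*` names are hypothetical, and the small sizes exist only to force several reads and chunks.

```python
import gzip
import os
import tempfile

import pandas as pd
from sqlalchemy import create_engine, text


def gz_tsv_to_sql(gz_path, table, engine, read_size=64, chunk_rows=10):
    """Hypothetical helper: extract gz_path to a .tsv in read_size pieces, then upload it
    in chunk_rows-row chunks. Returns (lines in the file incl. header, rows in the table)."""
    tsv_path = gz_path[:-3]  # 'x.tsv.gz' -> 'x.tsv'
    total_lines = 0
    with gzip.open(gz_path, 'rt', encoding='utf-8') as src, open(tsv_path, 'w', encoding='utf-8') as dst:
        while chunk := src.read(read_size):  # only read_size characters in memory at a time
            total_lines += chunk.count('\n')
            dst.write(chunk)
    for df_chunk in pd.read_csv(tsv_path, sep='\t', chunksize=chunk_rows):
        df_chunk.to_sql(table, con=engine, if_exists='append', index=False)
    with engine.connect() as conn:
        row_count = conn.execute(text(f'SELECT COUNT(*) FROM {table}')).scalar()
    return total_lines, row_count


# Demo with a tiny made-up file and a throwaway SQLite database instead of PostgreSQL
demo_dir = tempfile.mkdtemp()
demo_gz = os.path.join(demo_dir, 'title.sample.tsv.gz')
with gzip.open(demo_gz, 'wt', encoding='utf-8') as demo_file:
    demo_file.write('tconst\tstartYear\n' + ''.join(f'tt{i:07d}\t{1990 + i % 30}\n' for i in range(1, 26)))
demo_engine = create_engine(f"sqlite:///{os.path.join(demo_dir, 'demo.db')}")
demo_lines, demo_rows = gz_tsv_to_sql(demo_gz, 'title_sample', demo_engine)
demo_engine.dispose()
print(demo_lines, demo_rows)  # 26 25
```

Separate `demo_*` names are used so that running the sketch in the notebook does not overwrite the real `engine`.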

#### Run the `schema.sql` file against the database
<p style="margin-top: -1rem;">or any other SQL schema file.</p>

In [None]:
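# Read the SQL script file (schema.sql, for example)
with open('schema.sql', 'r', encoding='utf-8') as file:
    sql_script = file.read()

# Execute the SQL script; engine.begin() commits the transaction when the block exits
# (a plain engine.connect() block would roll the changes back on close in SQLAlchemy 2.0)
with engine.begin() as connection:
    connection.execute(text(sql_script))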

Sending a query to the database:

In [None]:
# Fetch only a few rows, since the full table can be very large
sql = "SELECT * FROM name_basics LIMIT 5"
with engine.connect() as conn:
    rows = conn.execute(text(sql)).fetchall()
rows