# Database Creation

In [None]:
import psycopg2 # PostgreSQL database adapter for the Python programming language
from psycopg2 import sql # SQL module for psycopg2 that supports advanced functionality and datatypes
import pandas as pd
import os
import glob
import getpass # Import the getpass module to hide the password input

### Connect to PostgreSQL

In [None]:
# Directory that holds the Parquet files to be loaded later in the notebook.
# An empty string makes the "*.parquet" glob pattern relative to the current working directory.
directory = r""

# Collect the PostgreSQL connection details interactively.
# getpass is used for every field so none of the typed values are echoed into the cell output.
db_config = {
    "dbname": getpass.getpass("Enter your database name: "),
    "user": getpass.getpass("Enter your PostgreSQL username: "),
    "password": getpass.getpass("Enter your PostgreSQL password: "),
    "host": getpass.getpass("Enter your PostgreSQL host name: "),
    "port": getpass.getpass("Enter your PostgreSQL port number: "),
}

# Establish a connection.
# `conn` is set to None on failure so that later cells can guard on `if conn:`.
try:
    conn = psycopg2.connect(**db_config)  # **db_config unpacks the dictionary into keyword arguments
    print("Connected to PostgreSQL successfully!")
except Exception as e:
    print(f"Error connecting to PostgreSQL: {e}")
    conn = None

### Drop the existing tables and create a new schema

In [None]:
# Drop & Create Tables
# Rebuilds the schema from scratch inside a single transaction:
#   * bridge tables and the fact table are dropped before the dimension tables they reference;
#   * tables are created dimensions-first so every REFERENCES clause points at an existing table.
# If anything fails, the transaction is rolled back so the connection stays usable.

# Tables to drop, dependents first. CASCADE also drops all objects that depend on each table.
tables_to_drop = (
    'bridge_articles_search_terms',
    'bridge_articles_authors',
    'fact_articles',
    'dim_authors',
    'dim_search_terms',
    'dim_journals',
    'dim_titles',
)

# CREATE statements, in an order that satisfies the foreign-key references.
create_statements = (
    '''
    CREATE TABLE dim_titles (
        title_id SERIAL PRIMARY KEY,
        title TEXT UNIQUE NOT NULL
    );
    ''',
    '''
    CREATE TABLE dim_journals (
        journal_id SERIAL PRIMARY KEY,
        journal_name TEXT UNIQUE NOT NULL
    );
    ''',
    '''
    CREATE TABLE dim_search_terms (
        search_term_id SERIAL PRIMARY KEY,
        search_term TEXT UNIQUE NOT NULL
    );
    ''',
    '''
    CREATE TABLE dim_authors (
        author_id SERIAL PRIMARY KEY,
        author_name TEXT UNIQUE NOT NULL
    );
    ''',
    '''
    CREATE TABLE fact_articles (
        article_id SERIAL PRIMARY KEY,
        pmid TEXT UNIQUE NOT NULL,
        publication_date DATE,
        title_id INTEGER REFERENCES dim_titles(title_id),
        journal_id INTEGER REFERENCES dim_journals(journal_id)
    );
    ''',
    '''
    CREATE TABLE bridge_articles_authors (
        article_id INTEGER REFERENCES fact_articles(article_id) ON DELETE CASCADE,
        author_id INTEGER REFERENCES dim_authors(author_id) ON DELETE CASCADE,
        PRIMARY KEY (article_id, author_id)
    );
    ''',
    '''
    CREATE TABLE bridge_articles_search_terms (
        article_id INTEGER REFERENCES fact_articles(article_id) ON DELETE CASCADE,
        search_term_id INTEGER REFERENCES dim_search_terms(search_term_id) ON DELETE CASCADE,
        PRIMARY KEY (article_id, search_term_id)
    );
    ''',
)

if conn:
    try:
        with conn.cursor() as cursor:  # The cursor executes the statements; it is closed when the block exits.
            print("Dropping existing tables...")
            for table_name in tables_to_drop:
                # sql.Identifier quotes the table name safely instead of formatting it into the string by hand.
                cursor.execute(
                    sql.SQL('DROP TABLE IF EXISTS {} CASCADE;').format(sql.Identifier(table_name))
                )

            print("Creating tables...")
            for statement in create_statements:
                cursor.execute(statement)

            conn.commit()  # Make the new schema permanent.
            print("Database schema updated successfully!")

    except Exception as e:
        print(f"Error creating database schema: {e}")
        # Without a rollback the connection stays in an aborted transaction and
        # every later statement on it would fail. Skip it if the connection is already closed.
        if not conn.closed:
            conn.rollback()

Dropping existing tables...
Creating tables...
Database schema updated successfully!


### Load a Parquet file and insert its data into PostgreSQL

In [None]:
# Locate the newest Parquet file, load it into a DataFrame, and insert its rows into PostgreSQL.

# Bind both results up front so the later `if ...` guards work even when an earlier step fails
# (otherwise a missing file would leave `df_input` undefined and raise NameError).
newest_file = None
df_input = None

# Find the newest Parquet file in the directory
try:
    # *.parquet is a wildcard that matches all files with the .parquet extension.
    files = glob.glob(os.path.join(directory, "*.parquet"))

    if not files:
        raise FileNotFoundError("No valid Parquet files found in the directory.")

    newest_file = max(files, key=os.path.getmtime)  # Newest file based on last modified time
    print(f"Newest Parquet file detected: {newest_file}")

except Exception as e:
    print(f"Error finding the newest file: {e}")
    newest_file = None

# Load the newest Parquet file into a Pandas DataFrame
if newest_file:
    try:
        df_input = pd.read_parquet(newest_file)
        print("Parquet file loaded successfully!")
    except Exception as e:
        print(f"Error loading Parquet file: {e}")
        df_input = None

# Ensure correct data types
if df_input is not None:
    df_input['PMID'] = df_input['PMID'].astype(str)  # Ensure PMID is string type
    # Fill missing values in every column except 'Publication Date'. That column is written to a
    # DATE column, where the placeholder text "Unknown" would be rejected; missing dates are
    # inserted as NULL instead (see the row loop below).
    fill_values = {column: "Unknown" for column in df_input.columns if column != 'Publication Date'}
    df_input = df_input.fillna(fill_values)

# Insert Data into PostgreSQL
if df_input is not None:
    # One INSERT per dimension table: (DataFrame column, statement).
    # ON CONFLICT ... DO NOTHING prevents duplicate entries; %s is the placeholder for the value.
    dimension_inserts = (
        ('Title', '''
            INSERT INTO dim_titles (title)
            VALUES (%s)
            ON CONFLICT (title) DO NOTHING;
        '''),
        ('Journal', '''
            INSERT INTO dim_journals (journal_name)
            VALUES (%s)
            ON CONFLICT (journal_name) DO NOTHING;
        '''),
        ('Search Term', '''
            INSERT INTO dim_search_terms (search_term)
            VALUES (%s)
            ON CONFLICT (search_term) DO NOTHING;
        '''),
        ('Author', '''
            INSERT INTO dim_authors (author_name)
            VALUES (%s)
            ON CONFLICT (author_name) DO NOTHING;
        '''),
    )

    try:
        # Close the connection left over from earlier cells explicitly, then reconnect so the
        # insert runs on a connection that is known to be open. `conn` is reset to None first so
        # the `finally` block never calls close() on a missing connection if connect() fails.
        if conn is not None:
            conn.close()
            conn = None
        conn = psycopg2.connect(**db_config)  # **db_config unpacks the dictionary into keyword arguments

        with conn.cursor() as cursor:  # Use the cursor object to interact with the database
            print("Inserting data into tables...")

            # Populate the dimension tables with the distinct values of each column.
            for column_name, insert_statement in dimension_inserts:
                for value in df_input[column_name].unique():
                    cursor.execute(insert_statement, (value,))

            # Insert one fact row per article and link it to its author and search term.
            # iterrows() yields (index, row); the index is not needed for the inserts.
            for _, row in df_input.iterrows():

                # Look up the surrogate keys of the journal and title inserted above.
                # fetchone() returns a tuple; [0] extracts the ID.
                cursor.execute('SELECT journal_id FROM dim_journals WHERE journal_name = %s;', (row['Journal'],))
                journal_id = cursor.fetchone()[0]

                cursor.execute('SELECT title_id FROM dim_titles WHERE title = %s;', (row['Title'],))
                title_id = cursor.fetchone()[0]

                # A missing publication date (NaN/NaT/None) is stored as SQL NULL.
                publication_date = row['Publication Date']
                if pd.isna(publication_date):
                    publication_date = None

                # ON CONFLICT (pmid) DO NOTHING keeps a single fact row per PMID.
                cursor.execute('''
                    INSERT INTO fact_articles (pmid, title_id, publication_date, journal_id)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (pmid) DO NOTHING;
                ''', (row['PMID'], title_id, publication_date, journal_id))

                cursor.execute('SELECT article_id FROM fact_articles WHERE pmid = %s;', (row['PMID'],))
                article_id = cursor.fetchone()[0]

                # Link the article to its author.
                cursor.execute('SELECT author_id FROM dim_authors WHERE author_name = %s;', (row['Author'],))
                author_id = cursor.fetchone()[0]

                cursor.execute('''
                    INSERT INTO bridge_articles_authors (article_id, author_id)
                    VALUES (%s, %s)
                    ON CONFLICT DO NOTHING;
                ''', (article_id, author_id))

                # Link the article to its search term.
                cursor.execute('SELECT search_term_id FROM dim_search_terms WHERE search_term = %s;', (row['Search Term'],))
                search_term_id = cursor.fetchone()[0]

                cursor.execute('''
                    INSERT INTO bridge_articles_search_terms (article_id, search_term_id)
                    VALUES (%s, %s)
                    ON CONFLICT DO NOTHING;
                ''', (article_id, search_term_id))

            conn.commit()  # Commit the transaction and save the changes to the database.
            print("Data inserted successfully into PostgreSQL!")

    except Exception as e:
        print(f"Error inserting data: {e}")

    finally:
        # Only close (and report closing) a connection that was actually opened.
        if conn is not None:
            conn.close()
            print("PostgreSQL connection closed.")


In [None]:
# Release the PostgreSQL connection at the end of the run.
# Nothing happens when `conn` is falsy (for example None after a failed connection attempt).
connection_exists = bool(conn)
if connection_exists:
    conn.close()
    print("PostgreSQL connection closed.")

PostgreSQL connection closed.


In [None]:
# Report the notebook's current working directory.
# NOTE(review): the label says "Database location", but the value printed is the process's
# working directory, not where the PostgreSQL server stores its data — confirm the intent.
import os

working_directory = os.getcwd()
print(f"Database location: {working_directory}")