# Injecting pre-processed data into the PostgreSQL database

In [None]:
# pip install psycopg2


In [25]:
import os

import psycopg2

# Open the shared connection used by every later cell.
#
# Each setting can be overridden through the standard libpq environment
# variables (PGHOST, PGDATABASE, PGUSER, PGPASSWORD, PGPORT) so that real
# credentials never have to be written into the notebook.  The fallbacks are
# the original local-development values, so behaviour is unchanged when none
# of the variables are set.
conn = psycopg2.connect(
    host=os.environ.get("PGHOST", "localhost"),
    database=os.environ.get("PGDATABASE", "mflix"),
    user=os.environ.get("PGUSER", "postgres"),
    password=os.environ.get("PGPASSWORD", "12345678"),  # dev-only fallback
    port=os.environ.get("PGPORT", "5432"),
)

# Module-level cursor reused by the table-creation and movies-insert cells.
cursor = conn.cursor()


In [None]:
import pandas as pd

# Load each pre-processed CSV export into its own DataFrame.  The files are
# read in the order listed, and the names on the left line up one-to-one with
# the paths on the right.
(
    movies_df,
    writers_df,
    directors_df,
    cast_df,
    genres_df,
    languages_df,
    countries_df,
    tomatoes_df,
    imdb_df,
    awards_df,
    users_df,
    comments_df,
    theaters_df,
) = (
    pd.read_csv(csv_path)
    for csv_path in (
        'movies_df.csv',
        'writers_df.csv',
        'directors_df.csv',
        'cast_df.csv',
        'genres_df.csv',
        'languages_df.csv',
        'countries_df.csv',
        'tomatoes_df.csv',
        'imdb_df.csv',
        'awards_df.csv',
        'users_df.csv',
        'comments_df.csv',
        'theaters_df.csv',
    )
)
    




In [None]:
# DDL statements for the whole schema, executed in list order.  `movies` must
# come first because most of the other tables declare a foreign key to
# movies(_id).  Every statement uses IF NOT EXISTS, so running the list
# against an existing schema leaves already-present tables as they are.
table_creation_queries = [

    # Parent table: one row per movie, keyed by the text `_id`.
    """
    CREATE TABLE IF NOT EXISTS movies (
        _id TEXT PRIMARY KEY,
        plot TEXT,
        runtime FLOAT,
        poster TEXT,
        title TEXT,
        fullplot TEXT,
        released DATE,
        rated TEXT,
        lastupdated TIMESTAMP,
        year INT,
        type TEXT,
        num_mflix_comments INT,
        metacritic FLOAT,
        year_raw TEXT
    );
    """,

    # Child tables with a surrogate SERIAL key and a foreign key to movies:
    # each row pairs a movie_id with a single value (writer, director, ...).
    """
    CREATE TABLE IF NOT EXISTS writers (
        id SERIAL PRIMARY KEY,
        movie_id TEXT REFERENCES movies(_id),
        writer TEXT
    );
    """,

    """
    CREATE TABLE IF NOT EXISTS directors (
        id SERIAL PRIMARY KEY,
        movie_id TEXT REFERENCES movies(_id),
        director TEXT
    );
    """,

    # The table name is double-quoted because CAST is a reserved keyword in
    # PostgreSQL.
    """
    CREATE TABLE IF NOT EXISTS "cast" (
        id SERIAL PRIMARY KEY,
        movie_id TEXT REFERENCES movies(_id),
        cast_member TEXT
    );
    """,

    """
    CREATE TABLE IF NOT EXISTS genres (
        id SERIAL PRIMARY KEY,
        movie_id TEXT REFERENCES movies(_id),
        genre TEXT
    );
    """,

    """
    CREATE TABLE IF NOT EXISTS languages (
        id SERIAL PRIMARY KEY,
        movie_id TEXT REFERENCES movies(_id),
        language TEXT
    );
    """,

    """
    CREATE TABLE IF NOT EXISTS countries (
        id SERIAL PRIMARY KEY,
        movie_id TEXT REFERENCES movies(_id),
        country TEXT
    );
    """,

    # Rotten Tomatoes data per movie.  `boxOffice` and `lastUpdated` are NOT
    # quoted here, so PostgreSQL folds them to lowercase (`boxoffice`,
    # `lastupdated`).
    # NOTE(review): insert_dataframe_to_table quotes DataFrame column names,
    # which makes them case-sensitive -- verify the tomatoes CSV headers match
    # the lowercase names.
    """
    CREATE TABLE IF NOT EXISTS tomatoes (
        id SERIAL PRIMARY KEY,
        movie_id TEXT REFERENCES movies(_id),
        viewer_rating FLOAT,
        viewer_numreviews INT,
        viewer_meter INT,
        critic_rating FLOAT,
        critic_numreviews INT,
        critic_meter INT,
        boxOffice TEXT,
        consensus TEXT,
        fresh INT,
        rotten INT,
        lastUpdated TIMESTAMP,
        production TEXT,
        website TEXT,
        dvd_release DATE
    );
    """,

    # movie_id is both primary key and foreign key in the next two tables, so
    # each movie can have at most one imdb row and one awards row.
    """
    CREATE TABLE IF NOT EXISTS imdb (
        movie_id TEXT PRIMARY KEY REFERENCES movies(_id),
        imdb_rating FLOAT,
        imdb_votes INT,
        imdb_id TEXT
    );
    """,

    """
    CREATE TABLE IF NOT EXISTS awards (
        movie_id TEXT PRIMARY KEY REFERENCES movies(_id),
        award_wins INT,
        award_nominations INT,
        award_text TEXT
    );
    """,

    # Standalone table (no reference to movies).  The mixed-case column names
    # are double-quoted, so their case is preserved and they must be quoted
    # the same way in every later query.
    """
    CREATE TABLE IF NOT EXISTS theaters (
        _id TEXT PRIMARY KEY,
        "theaterId" TEXT UNIQUE NOT NULL, 
        street1 TEXT,
        street2 TEXT, -- Allow null as per sample
        state TEXT,
        city TEXT,
        zipcode TEXT,
        type TEXT, -- e.g., 'Point'
        "Latitude" DOUBLE PRECISION,
        "Longitude" DOUBLE PRECISION
    );
    """,

    # Standalone table; email carries a UNIQUE NOT NULL constraint.
    """
    CREATE TABLE IF NOT EXISTS users (
        _id TEXT PRIMARY KEY,
        name TEXT,
        email TEXT UNIQUE NOT NULL, -- Email should be unique
        password TEXT -- Passwords (hashed) can be long
    );
    """,

    # movie_id is NOT NULL but, unlike the tables above, has no REFERENCES
    # clause, so the database does not check it against movies(_id).
    # NOTE(review): confirm the missing foreign key is intentional.
    """
    CREATE TABLE IF NOT EXISTS comments (
        _id TEXT PRIMARY KEY,
        name TEXT,
        email TEXT,
        movie_id TEXT NOT NULL, 
        text TEXT,
        date TIMESTAMP
    );
    """

]

# Run every CREATE TABLE statement, in list order, inside one transaction.
# If any statement fails, roll back so the shared connection is not left in
# an aborted-transaction state (every later statement on it would be rejected
# until a rollback), then re-raise the original error.
try:
    for query in table_creation_queries:
        cursor.execute(query)
    conn.commit()
except Exception:
    conn.rollback()
    raise

print("✅ All tables created successfully.")


In [None]:
# inserting the movies data into movies table
from datetime import datetime
import pandas as pd


# Load the movies CSV (the same file an earlier cell already reads into
# movies_df).
movies_df = pd.read_csv("movies_df.csv") 

# Parameterised INSERT for the movies table.  The 14 %s placeholders are
# filled positionally, so the tuples produced by convert_row must follow the
# column order listed here.  ON CONFLICT (_id) DO NOTHING skips rows whose
# _id already exists instead of raising a unique-violation error.
insert_query = """
INSERT INTO movies (
    _id, plot, runtime, poster, title, fullplot,
    released, rated, lastupdated, year,
    type, num_mflix_comments, metacritic, year_raw
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (_id) DO NOTHING;
"""

# Convert NaNs and date strings
def convert_row(row):
    """Convert one movies row into a parameter tuple for the movies INSERT.

    Missing values (NaN / None) in every nullable field are mapped to
    ``None`` so they reach the database as SQL NULL.  Previously only the
    date and integer fields were handled: text and float fields passed NaN
    straight through, and ``year_raw`` became the literal string ``'nan'``.

    Args:
        row: Row keyed by the movies column names (for example a pandas
            Series produced by iterating the movies DataFrame).

    Returns:
        A 14-item tuple ordered as ``_id, plot, runtime, poster, title,
        fullplot, released, rated, lastupdated, year, type,
        num_mflix_comments, metacritic, year_raw``.
    """
    def value_or_none(key):
        # Return the raw cell, or None when pandas considers it missing.
        value = row[key]
        return value if pd.notnull(value) else None

    def int_or(key, default=None):
        # Cast to a plain int, or fall back to `default` when missing.
        value = row[key]
        return int(value) if pd.notnull(value) else default

    released = value_or_none('released')
    lastupdated = value_or_none('lastupdated')
    year_raw = value_or_none('year_raw')

    return (
        str(row['_id']),
        value_or_none('plot'),
        value_or_none('runtime'),
        value_or_none('poster'),
        value_or_none('title'),
        value_or_none('fullplot'),
        # DATE column: keep only the calendar date.
        pd.to_datetime(released).date() if released is not None else None,
        value_or_none('rated'),
        # TIMESTAMP column: keep the full timestamp.
        pd.to_datetime(lastupdated) if lastupdated is not None else None,
        int_or('year'),
        value_or_none('type'),
        # A missing comment count is stored as 0 rather than NULL.
        int_or('num_mflix_comments', 0),
        int_or('metacritic'),
        str(year_raw) if year_raw is not None else None,
    )

# Apply conversion and insert
# Build one parameter tuple per movies row.  Iterating the rows explicitly
# (instead of DataFrame.apply(..., axis=1).tolist()) avoids apply's special
# handling of empty frames, so a CSV with no rows simply yields an empty list.
data_to_insert = [convert_row(row) for _, row in movies_df.iterrows()]

# Insert everything in one transaction.  On any failure, roll back so the
# shared connection stays usable for later cells, then re-raise the error.
try:
    cursor.executemany(insert_query, data_to_insert)
    conn.commit()
except Exception:
    conn.rollback()
    raise

print("✅ Data inserted into movies.")


In [15]:
# import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_values

def insert_dataframe_to_table(df, table_name, conn):
    """Bulk-insert every row of ``df`` into ``table_name``.

    The DataFrame's column names are used verbatim as the target column
    names; they are quoted as SQL identifiers, so they must match the table's
    column names exactly, including case.  Missing cells (NaN / None) are
    sent as SQL NULL.  Rows that violate a unique or primary-key constraint
    are skipped by ``ON CONFLICT DO NOTHING``.

    The insert runs in a single transaction: it is committed on success and
    rolled back (with the error re-raised) on failure, so the connection
    remains usable for subsequent calls.

    Args:
        df: pandas DataFrame holding the rows to insert.
        table_name: Name of the destination table.
        conn: Open psycopg2 connection.

    Returns:
        None.  A status line is printed; note that the reported count is
        ``len(df)``, i.e. rows submitted, which can exceed the rows actually
        stored when conflicts were skipped.
    """
    if df.empty:
        print(f"Skipped empty DataFrame: {table_name}")
        return

    cols = list(df.columns)

    # psycopg2 would transmit float NaN as the SQL value 'NaN', which is
    # rejected by integer/date columns and stored as text in TEXT columns.
    # Casting to object first lets None survive the replacement.
    cleaned = df.astype(object).where(df.notna(), None)
    values = cleaned.values.tolist()

    query = sql.SQL("""
        INSERT INTO {table} ({fields})
        VALUES %s
        ON CONFLICT DO NOTHING
    """).format(
        table=sql.Identifier(table_name),
        fields=sql.SQL(', ').join(map(sql.Identifier, cols))
    )

    try:
        with conn.cursor() as cur:
            execute_values(cur, query, values)
        conn.commit()
    except Exception:
        # Leave the connection in a clean state before propagating the error.
        conn.rollback()
        raise

    print(f"✅ Inserted {len(df)} rows into {table_name}")


In [None]:
# Push every remaining DataFrame into its table, one after another in the
# order listed, then close the shared connection.
load_plan = (
    (writers_df, 'writers'),
    (directors_df, 'directors'),
    (cast_df, 'cast'),
    (genres_df, 'genres'),
    (languages_df, 'languages'),
    (countries_df, 'countries'),
    (tomatoes_df, 'tomatoes'),
    (imdb_df, 'imdb'),
    (awards_df, 'awards'),
    (users_df, 'users'),
    (comments_df, 'comments'),
    (theaters_df, 'theaters'),
)

for frame, target_table in load_plan:
    insert_dataframe_to_table(frame, target_table, conn)

conn.close()
# 