In [None]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import re
import psycopg2
from psycopg2 import sql
import os
from dotenv import load_dotenv

In [231]:
def _find_infobox_header(infobox, labels):
    """Return the first infobox <th> whose text equals one of ``labels``, else None."""
    for label in labels:
        header = infobox.find('th', string=label)
        if header:
            return header
    return None


def _infobox_cell_values(cell):
    """
    Return the cleaned text values stored in an infobox <td> cell.

    Values are read from <li> items when the cell holds a list, otherwise the
    cell text is split on <br> tags. Footnote markers are dropped.

    Note: the cell is modified in place (<sup> removed, <br> replaced), which
    is fine here because the parse tree is local to one lookup.
    """
    # Footnote references live in <sup> tags; drop them before reading text.
    for sup in cell.find_all('sup'):
        sup.decompose()

    if cell.find('ul'):
        raw_values = [li.text for li in cell.find_all('li')]
    else:
        # Turn <br> separators into newlines so the text can be split on them.
        for br in cell.find_all('br'):
            br.replace_with('\n')
        raw_values = cell.get_text().split('\n')

    cleaned = (re.sub(r'<.*?>|\[.*?\]', '', value).strip() for value in raw_values)
    return [value for value in cleaned if value]


def get_director_and_country(link):
    """
    Retrieves the director(s) and country(ies) of origin of a film from the
    infobox of its Wikipedia page.

    Args:
        link (str): Full URL of the film's Wikipedia page.

    Returns:
        tuple: ``(directors, countries)``, each a list of strings (empty when
               the corresponding infobox row is missing).
               Returns ``(None, None)`` if the page has no film infobox.

    Raises:
        requests.HTTPError: if the page responds with a 4xx or 5xx status.
        requests.RequestException: on connection problems or timeout.
    """
    # A timeout keeps one stalled request from hanging the whole scrape.
    response = requests.get(link, timeout=30)
    response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
    soup = BeautifulSoup(response.content, 'html.parser')
    infobox = soup.find('table', {'class': 'infobox vevent'})

    if infobox is None:
        print("No infobox found on this page.")
        # Always return a 2-tuple so callers can unpack the result safely.
        return None, None

    director = []
    countries = []

    # Find director(s)
    director_row = _find_infobox_header(infobox, ('Directed by', 'director'))
    if director_row:
        director_cell = director_row.find_next('td')
        if director_cell:
            director = _infobox_cell_values(director_cell)

    # Find country(ies)
    country_row = _find_infobox_header(
        infobox, ('Country', 'Country of origin', 'Countries')
    )
    if country_row:
        country_cell = country_row.find_next('td')
        if country_cell:
            countries = _infobox_cell_values(country_cell)

    return director, countries

In [232]:
def get_film_data():
    """
    Scrapes Wikipedia's list of highest-grossing films into a DataFrame.

    The first ``wikitable`` on the page is read row by row. For every cell the
    text is stored, immediately followed by the absolute URL of the cell's
    first link when that link carries both ``href`` and ``title`` attributes.
    Each film page is then fetched through ``get_director_and_country`` to add
    the ``Director`` and ``Country`` columns.

    Side effects:
        Writes the resulting table to ``output.txt`` (pipe-separated).

    Returns:
        pandas.DataFrame | None: columns ``Rank``, ``Title``,
        ``Worldwide gross``, ``Year``, ``Director``, ``Country``; ``None`` when
        the list page cannot be fetched or does not have the expected table.
    """
    list_url = "https://en.wikipedia.org/wiki/List_of_highest-grossing_films"
    response = requests.get(list_url, timeout=30)

    if response.status_code != 200:
        print(f"Error during the GET request: {response.status_code}")
        return None

    soup = BeautifulSoup(response.content, 'html.parser')
    table = soup.find('table', class_='wikitable')
    if table is None:
        print("Error: no wikitable found on the list page.")
        return None

    data = []
    for row in table.find_all('tr'):
        row_data = []
        # Include th for header and row headers
        for cell in row.find_all(['td', 'th']):
            row_data.append(cell.get_text(strip=True))
            # If the cell links to an article, keep the link right after its text
            anchor = cell.find('a')
            if anchor and 'href' in anchor.attrs and 'title' in anchor.attrs:
                row_data.append(f"https://en.wikipedia.org{anchor['href']}")
        data.append(row_data)

    df = pd.DataFrame(data).iloc[1:]  # Skip the original header row
    df = df.dropna(axis=1, how='all')

    # The positional rename below needs seven columns:
    # Rank, Peak, Title, Link, Worldwide gross, Year, Ref.
    if df.shape[1] < 7:
        print(f"Error: expected at least 7 columns, found {df.shape[1]}.")
        return None

    df = df.rename(columns={df.columns[0]: 'Rank',
                            df.columns[1]: 'Peak',
                            df.columns[2]: 'Title',
                            df.columns[3]: 'Link',
                            df.columns[4]: 'Worldwide gross',
                            df.columns[5]: 'Year',
                            df.columns[6]: 'Ref'})

    # change to appropriate types
    try:
        df['Title'] = df['Title'].astype(str).replace('†', '', regex=True)
        # Raw string: '\$' in a normal literal is an invalid escape sequence.
        df['Worldwide gross'] = (
            df['Worldwide gross']
            .replace({r'.*\$': '', ',': ''}, regex=True)
            .astype(int)
        )
        df['Year'] = df['Year'].astype(int)
        df['Rank'] = df['Rank'].astype(int)
    except KeyError as e:
        print(f"Error: Column '{e}' not found. Please check your column names.")
    except ValueError as e:
        print(f"Error: Could not convert data type.  Check the contents of the columns. {e}")

    # add columns for directors and countries
    directors = []
    countries = []
    for film_link in df['Link']:
        # Rows without a usable link cannot be looked up.
        has_link = isinstance(film_link, str) and film_link
        details = get_director_and_country(film_link) if has_link else None
        # Tolerate a lookup that yields nothing instead of a 2-tuple.
        director, country = details if details is not None else (None, None)
        directors.append(director)
        countries.append(country)

    df['Director'] = directors
    df['Country'] = countries

    df = df.drop(['Peak', 'Ref', 'Link'], axis=1)
    df.to_csv('output.txt', sep='|', index=False)
    return df


  df['Worldwide gross'] = df['Worldwide gross'].replace({'.*\$': '', ',': ''}, regex=True).astype(int)


In [None]:
# Read settings from a local .env file so credentials stay out of the notebook.
load_dotenv()

# PostgreSQL connection settings; each one is None when the variable is unset.
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")

# Name of the table that receives the scraped films.
TABLE_NAME = "HighestGrossingFilms"

def connect_to_db(db_name, db_user, db_password, db_host, db_port):
    """
    Open a connection to the PostgreSQL database.

    Returns:
        The psycopg2 connection, or None (after printing the error) when the
        connection attempt raises ``psycopg2.Error``.
    """
    credentials = {
        "dbname": db_name,
        "user": db_user,
        "password": db_password,
        "host": db_host,
        "port": db_port,
    }
    try:
        connection = psycopg2.connect(**credentials)
    except psycopg2.Error as e:
        print(f"Error connecting to database: {e}")
        return None
    return connection

def create_table(conn, table_name):
    """
    Creates the films table if it doesn't exist.

    Args:
        conn: Open psycopg2 connection.
        table_name (str): Table name; it is quoted as an SQL identifier, so
            its letter case is preserved.

    On ``psycopg2.Error`` the error is printed and the transaction is rolled
    back; nothing is raised to the caller.
    """
    create_table_sql = sql.SQL("""
            CREATE TABLE IF NOT EXISTS {} (
                id INTEGER PRIMARY KEY,
                title TEXT NOT NULL,
                worldwide_gross BIGINT, 
                release_year INTEGER,
                director TEXT[],  
                country TEXT[] 
            );
        """).format(sql.Identifier(table_name))

    try:
        # The context manager closes the cursor even when execute() fails.
        with conn.cursor() as cur:
            cur.execute(create_table_sql)
        conn.commit()
    except psycopg2.Error as e:
        print(f"Error creating table: {e}")
        conn.rollback()  # Rollback in case of error

def insert_data(conn, df, table_name):
    """
    Inserts the DataFrame data into the PostgreSQL table.

    Each row is inserted inside its own savepoint, so a failing row (for
    example a duplicate primary key) is reported and skipped without undoing
    the rows inserted before it. All surviving rows are committed together.

    Args:
        conn: Open psycopg2 connection.
        df (pandas.DataFrame | None): Must provide the columns ``Rank``,
            ``Title``, ``Worldwide gross``, ``Year``, ``Director`` and
            ``Country``. ``None`` or an empty frame inserts nothing.
        table_name (str): Target table; quoted as an SQL identifier.
    """
    if df is None or df.empty:
        print("No data to insert.")
        return

    def _as_text_array(value):
        """Wrap a cell value into a list for a TEXT[] column (None if falsy)."""
        if isinstance(value, (list, tuple, pd.Series)):
            return list(value)
        return [value] if value else None

    # Parameterized query: values are passed separately to prevent SQL injection
    insert_sql = sql.SQL("""
            INSERT INTO {} (id, title, worldwide_gross, release_year, director, country)
            VALUES (%s, %s, %s, %s, %s, %s);
        """).format(sql.Identifier(table_name))

    try:
        # The context manager closes the cursor on success and on error.
        with conn.cursor() as cur:
            for index, row in df.iterrows():
                params = (
                    row['Rank'],
                    row['Title'],
                    row['Worldwide gross'],
                    row['Year'],
                    _as_text_array(row['Director']),
                    _as_text_array(row['Country']),
                )

                # A savepoint confines a failure to this row; a plain
                # conn.rollback() here would discard every earlier insert.
                cur.execute("SAVEPOINT film_row")
                try:
                    cur.execute(insert_sql, params)
                except psycopg2.Error as e:
                    print(f"Error inserting row {index}: {e}")
                    cur.execute("ROLLBACK TO SAVEPOINT film_row")
                else:
                    cur.execute("RELEASE SAVEPOINT film_row")

        conn.commit()

    except psycopg2.Error as e:
        print(f"Error inserting data: {e}")
        conn.rollback()



In [None]:
if __name__ == "__main__":
    # Scrape first, then load the result into PostgreSQL.
    df = get_film_data()
    conn = connect_to_db(DB_NAME, DB_USER, DB_PASSWORD, DB_HOST, DB_PORT)

    if conn:
        try:
            create_table(conn, TABLE_NAME)
            # get_film_data() returns None when the list page could not be read.
            if df is not None:
                insert_data(conn, df, TABLE_NAME)
            else:
                print("No film data retrieved; nothing to insert.")
        finally:
            # Release the connection even if one of the steps above raises.
            conn.close()
    else:
        print("Failed to establish database connection.")

Table 'HighestGrossingFilms' created successfully (if it didn't exist).
