# Assignment: Data Wrangling, Visualization, and Web Presentation

First, let's import all the necessary libraries: requests and BeautifulSoup for fetching and parsing the data, sqlite3 for interacting with the local SQLite database, and json and re for serialization and text cleanup.

In [1]:
from typing import List, Optional, Dict
from bs4 import BeautifulSoup
import requests
import sqlite3
import json
import re

## Data Extraction

In [4]:
# Wikipedia page holding the table of highest-grossing films that gets scraped.
MAIN_URL = "https://en.wikipedia.org/wiki/List_of_highest-grossing_films"
# Site root; relative hrefs taken from the table are appended to it.
WIKIPEDIA_MAIN_URL = "https://en.wikipedia.org"
# Value sent as the User-Agent header with every HTTP request made below.
USER_AGENT = "Mozilla/5.0"

class Film:
    """Plain record describing one film scraped from Wikipedia.

    Only ``title`` is required; every other attribute defaults to ``None``
    and may be filled in later.
    """

    def __init__(
        self,
        title: str,
        release_year: Optional[int] = None,
        director: Optional[str] = None,
        box_office: Optional[str] = None,
        country: Optional[str] = None,
    ):
        self.title, self.release_year = title, release_year
        self.director, self.box_office = director, box_office
        self.country = country

    def __repr__(self):
        # One attribute per line; continuation lines are indented by 12 spaces.
        continuation = ",\n" + " " * 12
        parts = (
            f"title={self.title}",
            f"release_year={self.release_year}",
            f"director={self.director}",
            f"box_office={self.box_office}",
            f"country={self.country}",
        )
        return "Film(" + continuation.join(parts) + ")"

    def to_dict(self):
        """Return the film's attributes as a dict, keyed by attribute name."""
        field_names = ("title", "release_year", "director", "box_office", "country")
        return {name: getattr(self, name) for name in field_names}

In [25]:
def get_processed_text(raw_text: str) -> str:
    """Clean scraped infobox text into a comma-separated string of names.

    Runs of two or more letters, dots and spaces are kept as fragments;
    everything else (digits, brackets, newlines, commas, ...) acts as a
    delimiter and is dropped, which removes footnote markers such as ``[1]``.

    Args:
        raw_text: Text extracted from an infobox cell.

    Returns:
        The fragments joined with ``", "``, or an empty string when nothing
        qualifies.
    """
    # ``[^\W\d_]`` matches any Unicode letter, so accented names such as
    # "Cuarón" are not cut off at the first non-ASCII character.
    pattern = r'(?:[^\W\d_]|[. ]){2,}'
    fragments = (fragment.strip() for fragment in re.findall(pattern, raw_text))
    # Whitespace-only fragments would otherwise show up as empty entries.
    return ', '.join(fragment for fragment in fragments if fragment)

def parse_film_details(film: Film, film_wikipedia_url: str):
    """Fill ``film.director`` and ``film.country`` from a film's Wikipedia page.

    The page is downloaded and its ``table.infobox.vevent`` element is
    scanned for the "Directed by" and "Country"/"Countries" rows. ``film`` is
    modified in place; attributes whose row is missing are left untouched.

    Args:
        film: The film object to update.
        film_wikipedia_url: Absolute URL of the film's Wikipedia article.
    """
    film_page = requests.get(
        film_wikipedia_url,
        headers={'User-Agent': USER_AGENT},
        timeout=30,  # never hang forever on an unresponsive server
    )
    film_page_soup = BeautifulSoup(film_page.content, 'html.parser')
    film_table = film_page_soup.select_one('table.infobox.vevent')
    if film_table is None:
        # No film infobox on this page: nothing to extract.
        return

    for row in film_table.find_all('tr'):
        header = row.select_one('th')
        cell = row.select_one('td')
        if header is None or cell is None:
            continue
        label = header.get_text().strip()
        # A separator keeps adjacent entries (e.g. split by <br>) from being
        # fused into one word before the text is cleaned.
        if label == "Directed by":
            film.director = get_processed_text(cell.get_text(separator=',').strip())
        elif label in ('Country', 'Countries'):
            film.country = get_processed_text(cell.get_text(separator=',').strip())


In [26]:
def parse_grossing_films():
    """Scrape the highest-grossing films table and each film's detail page.

    Downloads ``MAIN_URL``, reads the first ``table.wikitable.sortable`` and,
    for every data row, builds a ``Film`` with title, release year and gross,
    then completes it through ``parse_film_details`` (one extra HTTP request
    per film).

    Returns:
        A list of ``Film`` objects in table order.

    Raises:
        ValueError: If the table cannot be found on the page.
    """
    main_page = requests.get(
        MAIN_URL,
        headers={'User-Agent': USER_AGENT},
        timeout=30,  # never hang forever on an unresponsive server
    )
    main_page_soup = BeautifulSoup(main_page.content, 'html.parser')
    table = main_page_soup.select_one('table.wikitable.sortable')

    if table is None:
        raise ValueError("Main table not found")

    gross_pattern = re.compile(r'[0-9,]{2,}')
    year_pattern = re.compile(r'\d{4}')

    films = []
    # The first row is the table header.
    for row in table.select('tbody tr')[1:]:
        attributes = row.find_all('td')
        title_link = row.select_one('th a')
        # Skip rows that do not have the expected layout instead of crashing.
        if len(attributes) < 4 or title_link is None or not title_link.get('href'):
            continue

        # get cleaned gross information
        gross_match = gross_pattern.search(attributes[2].get_text())
        if gross_match is None:
            continue
        gross = '$' + gross_match.group()

        # Tolerate annotations around the year (e.g. footnote markers).
        year_match = year_pattern.search(attributes[3].get_text())
        release_year = int(year_match.group()) if year_match else None

        title = title_link.get_text().strip()
        wikipedia_url = WIKIPEDIA_MAIN_URL + title_link['href']
        film = Film(title, release_year=release_year, box_office=gross)
        parse_film_details(film, wikipedia_url)
        films.append(film)

    return films
    

In [27]:
def connect_to_db(database_name: str):
    """Open a SQLite connection to ``database_name``.

    Returns the connection, or ``None`` after printing the error when
    SQLite cannot open the database.
    """
    try:
        return sqlite3.connect(database_name)
    except sqlite3.Error as error:
        print(f"Error with database connection: {error}")
    return None

def create_table(conn: sqlite3.Connection):
    """Create the ``films`` table if it does not exist yet and commit.

    SQLite errors are printed rather than raised.
    """
    schema_sql = '''
            CREATE TABLE IF NOT EXISTS films (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                release_year INTEGER,
                director TEXT NOT NULL,
                box_office TEXT,
                country TEXT
            )
            '''
    try:
        conn.cursor().execute(schema_sql)
        conn.commit()
    except sqlite3.Error as error:
        print(f"Failed to create table: {error}")

def insert_films_to_db(conn: sqlite3.Connection, films: List["Film"]):
    """Insert all ``films`` into the ``films`` table as a single transaction.

    Either every film is stored and committed, or, if SQLite reports an
    error, the whole batch is rolled back and the error is printed.

    Args:
        conn: Open SQLite connection whose database contains the ``films`` table.
        films: Objects exposing ``title``, ``release_year``, ``director``,
            ``box_office`` and ``country`` attributes.
    """
    rows = (
        (film.title, film.release_year, film.director, film.box_office, film.country)
        for film in films
    )
    try:
        # The connection context manager commits on success and rolls back
        # on an exception, so a failure never leaves half a batch pending.
        with conn:
            conn.executemany(
                "INSERT INTO films (title, release_year, director, box_office, country) VALUES (?, ?, ?, ?, ?)",
                rows,
            )
    except sqlite3.Error as error:
        print(f"Failed to insert values to database: {error}")

def get_films_from_db(conn: sqlite3.Connection) -> List[Film]:
    """Load every row of the ``films`` table as ``Film`` objects.

    Args:
        conn: Open SQLite connection whose database contains the ``films`` table.

    Returns:
        One ``Film`` per row, or an empty list (after printing the error)
        when the query fails.
    """
    try:
        cursor = conn.cursor()
        # Columns are named explicitly, in the order of Film's constructor
        # arguments, so the mapping does not depend on the table's physical
        # column layout.
        cursor.execute(
            "SELECT title, release_year, director, box_office, country FROM films"
        )
        return [Film(*row) for row in cursor.fetchall()]
    except sqlite3.Error as error:
        print(f"Failed to get all the films from database: {error}")
        return []

def delete_films_from_db(conn: sqlite3.Connection) -> None:
    """Delete every row from the ``films`` table and commit.

    SQLite errors are printed rather than raised.

    Args:
        conn: Open SQLite connection whose database contains the ``films`` table.
    """
    try:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM films")
        conn.commit()
    except sqlite3.Error as error:
        print(f"Failed to delete all films from the database: {error}")

def close_connection(conn: sqlite3.Connection):
    """Close ``conn``; a falsy value such as ``None`` is ignored."""
    if not conn:
        return
    conn.close()

In [32]:
def convert_films_into_json(json_path: str, films: List["Film"]):
    """Write ``films`` to ``json_path`` as a JSON array ranked by box office.

    Each film is converted with ``to_dict()``, the dicts are ordered from
    highest to lowest gross, and a 1-based ``rank`` key is added to each.

    Args:
        json_path: Destination file; it is overwritten if it exists.
        films: Objects providing a ``to_dict()`` method whose result has a
            ``box_office`` entry.
    """
    def gross_value(film_dict):
        # box_office is text such as "$2,923,706,026"; comparing the strings
        # would order them lexicographically, so compare the digits as an
        # integer. A missing gross counts as 0 and therefore ranks last.
        raw = film_dict['box_office']
        digits = re.sub(r'\D', '', '' if raw is None else str(raw))
        return int(digits) if digits else 0

    films_dict = sorted(
        (film.to_dict() for film in films), key=gross_value, reverse=True
    )
    for rank, film in enumerate(films_dict, start=1):
        film['rank'] = rank
    with open(json_path, "w", encoding='utf-8') as file:
        json.dump(films_dict, file, indent=4)

In [29]:
# Scrape the ranking table plus one detail page per film (network requests).
films = parse_grossing_films()

In [30]:
# Round-trip the scraped films through the local SQLite database file.
connection = connect_to_db("grossing_films.db")
create_table(connection)
# Empty the table first; presumably so that re-running this cell does not
# accumulate duplicate rows.
delete_films_from_db(connection)
insert_films_to_db(connection, films)
# From here on `films` holds the rows read back from the database.
films = get_films_from_db(connection)
close_connection(connection)

In [None]:
# Export the films to movies.json, with a `rank` key added to each entry.
convert_films_into_json("movies.json", films)