## Document

## Install necessary dependencies

In [1]:
!pip install bs4
!pip install pymongo

Collecting bs4
  Downloading bs4-0.0.2-py2.py3-none-any.whl.metadata (411 bytes)
Downloading bs4-0.0.2-py2.py3-none-any.whl (1.2 kB)
Installing collected packages: bs4
Successfully installed bs4-0.0.2
Collecting pymongo
  Downloading pymongo-4.11.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Downloading pymongo-4.11.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m16.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pymongo
Successfully installed pymongo-4.11.1


## Import modules

In [88]:
import getpass
import json
import os
import re
from typing import Tuple, List, TypedDict, Optional, Any, NamedTuple
from urllib.error import HTTPError, URLError
from urllib.parse import quote_plus
from urllib.request import urlopen, urlparse

from bs4 import BeautifulSoup
from pymongo import MongoClient

In [170]:
# Page that holds the ranking table to scrape.
TARGET_URL = "https://en.wikipedia.org/wiki/List_of_highest-grossing_films"
# "<scheme>://<host>" of the target page; relative film links are appended to it.
BASE_URL = "{0}://{1}".format(*urlparse(TARGET_URL)[:2])

def extract_html(url: str) -> Any:
    """Open `url` and hand back the response object returned by `urlopen`.

    Failures are reported on stdout instead of being raised:
    an `HTTPError` prints its string form, any other `URLError` prints a
    fixed "server could not be found" message.

    Args:
        url: Address to open.

    Returns:
        The object returned by `urlopen`, or None if one of the two handled
        errors occurred. Other exceptions propagate to the caller.
    """
    response = None
    try:
        response = urlopen(url)
    except HTTPError as http_error:
        # HTTPError must be handled before URLError, which is its base class.
        print(str(http_error))
    except URLError:
        print("The server could not be found")
    return response

def exclude_refs(name: str) -> str:
    """Cut `name` off at its first '[' character.

    The text is returned unchanged when it contains no '[' or when the
    '[' is the very first character.

    Args:
        name: Text that may carry a trailing "[...]" marker.

    Returns:
        The part of `name` before the first '[', or `name` itself.
    """
    bracket_at = name.find('[')
    if bracket_at <= 0:
        # -1: no bracket at all; 0: bracket opens the string - keep everything.
        return name
    return name[:bracket_at]

In [4]:
# Download and parse the list page. Stop right here when the download failed,
# so that later cells never run against a missing or stale `main_page`.
html = extract_html(TARGET_URL)
if html is None:
    raise RuntimeError(f"Could not download {TARGET_URL}")
main_page = BeautifulSoup(html, 'html.parser')

In [5]:
# Look the ranking table up by its class attribute.
highest_grossing_films = main_page.find(
    'table',
    class_='wikitable plainrowheaders sticky-header col4right col5center col6center',
)
# Fail fast when the table is missing or has no rows at all.
assert highest_grossing_films is not None
assert highest_grossing_films.find_all('tr')

In [108]:
# Schema of one scraped film, as appended to `films` and stored in MongoDB.
FilmRecord = TypedDict(
    'FilmRecord',
    {
        # Text of the film link in the list table.
        'title': str,
        # Year parsed from the list table; None when it is not a valid integer.
        'release_year': Optional[int],
        # ';'-separated director names from the film page; None when not found.
        'director': Optional[str],
        # Box office revenue from the film page; None when not found.
        'box_office': Optional[float],
        # ';'-separated countries from the film page; None when not found.
        'country': Optional[str],
    },
)

In [224]:
# Exploratory check of the director extraction on a single film page.
soup = BeautifulSoup(extract_html("https://en.wikipedia.org/wiki/Captain_Marvel_(film)"), 'html.parser')
table = soup.find('table', {'class': 'infobox vevent'})
# First tag whose text starts with the "Directed by" label (raw string: \s is a regex escape).
directors_row = table.find(lambda tag: re.match(r'^\s*Directed by', tag.text) is not None)
# The second descendant presumably holds the names (the first being the label) - verify on other pages.
annotated_text = directors_row.find_all()[1].get_text(strip=True, separator='\n')
# Split on "[...]" markers and line breaks; drop the empty pieces the split leaves behind.
directors_list = [name for name in re.split(r"\[[^\]]*\]|\n", annotated_text) if name]
directors = ';'.join(directors_list)
print(directors)

Anna Boden;Ryan Fleck


In [225]:
def parse_revenue(revenue: str) -> Optional[float]:
    """Convert a revenue string such as "$2.923 billion" into a plain number.

    Everything from the first '[' on is cut off with `exclude_refs`. The first
    whitespace-separated token supplies the amount and the second one the
    order of magnitude ("million" or "billion"); further tokens are ignored.
    Thousands separators (",") inside the amount are dropped, an amount
    without a decimal point is accepted, and when the amount token holds
    several numbers only the first one is used.

    Args:
        revenue: Raw revenue text.

    Returns:
        The amount as a float (e.g. 2923000000.0), or None - after printing a
        message - when the text has fewer than two tokens, contains no number
        or uses an unknown order of magnitude.
    """
    revenue = exclude_refs(revenue)
    tokens = revenue.split()
    if len(tokens) < 2:
        print(f"Unresolved revenue: {revenue}")
        return None
    quantity, order = tokens[:2]

    exponents = {'million': 6, 'billion': 9}
    if order not in exponents:
        print(f"Unresolved order: {order}")
        return None

    number = re.search(r"\d[\d,]*(?:\.\d+)?|\.\d+", quantity)
    if number is None:
        print(f"Unresolved revenue: {revenue}")
        return None
    whole, _, fraction = number.group().replace(',', '').partition('.')

    # Shift the decimal point by moving digits instead of multiplying floats,
    # so that e.g. "2.923 billion" becomes exactly 2923000000.0.
    exponent = exponents[order]
    digits = whole + fraction[:exponent].ljust(exponent, '0')
    if len(fraction) > exponent:
        # More decimals than the shift consumes: the rest stays fractional.
        digits += '.' + fraction[exponent:]
    return float(digits)


def _find_infobox_row(infobox, label_pattern: str):
    """Return the first tag inside `infobox` whose text matches `label_pattern` at its start, or None."""
    label = re.compile(label_pattern)
    return infobox.find(lambda tag: label.match(tag.text) is not None)


def _join_row_values(row) -> Optional[str]:
    """Collapse the values of an infobox row into one ';'-separated string (None when nothing is found)."""
    plainlists = row.find_all('div', {'class': 'plainlist'})
    if plainlists:
        # The row holds a list: take the items of its last "plainlist" block.
        values = [exclude_refs(item.text) for item in plainlists[-1].find_all('li')]
    else:
        descendants = row.find_all()
        if len(descendants) < 2:
            return None
        # The second descendant presumably holds the value (the first being the label) - verify.
        annotated_text = descendants[1].get_text(strip=True, separator='\n')
        values = re.split(r"\[[^\]]*\]|\n", annotated_text)
    # Drop the empty pieces left by the split so the result has no ";;" runs.
    values = [value.strip() for value in values if value.strip()]
    return ';'.join(values) if values else None


def parse_film_page(film_url: str) -> Optional[Tuple[Optional[str], Optional[float], Optional[str]]]:
    """Scrape directors, box office revenue and countries from a film page.

    Args:
        film_url: Absolute URL of the film page.

    Returns:
        None when the page could not be downloaded. Otherwise a tuple
        `(directors, box_office_revenue, countries)` where directors and
        countries are ';'-separated strings and the revenue is the value
        returned by `parse_revenue`. Any element that could not be found or
        parsed is None, and a message naming the film URL is printed for it.
    """
    if (html := extract_html(film_url)) is None:
        return None
    film_information = BeautifulSoup(html, 'html.parser').find('table', {'class': 'infobox vevent'})
    if film_information is None:
        # Without an infobox there is nothing to search in.
        print(f'Film (url={film_url}): infobox is not found')
        return (None, None, None)

    # Directors
    directors_row_element = _find_infobox_row(film_information, r'^\s*[Dd]irected\s+by')
    directors = _join_row_values(directors_row_element) if directors_row_element is not None else None
    if directors is None:
        print(f'Film (url={film_url}): directors list is not found')

    # Box office revenue
    box_office_revenue = None
    box_office_revenue_row_element = _find_infobox_row(film_information, r'^\s*[Bb]ox\s+office')
    box_office_revenue_element = (
        box_office_revenue_row_element.find('td', {'class': 'infobox-data'})
        if box_office_revenue_row_element is not None
        else None
    )
    if box_office_revenue_element is None:
        print(f'Film (url={film_url}): box revenue is not found')
    else:
        try:
            box_office_revenue = parse_revenue(box_office_revenue_element.text.strip())
        except (ValueError, IndexError, AttributeError) as error:
            # An unexpected revenue format must not abort the whole scrape.
            print(f'Film (url={film_url}): box revenue could not be parsed: {error}')

    # Countries ("Country" or "Countries", both allowed after leading whitespace)
    countries_row_element = _find_infobox_row(film_information, r'^\s*[Cc]ountr(?:y|ies)')
    countries = _join_row_values(countries_row_element) if countries_row_element is not None else None
    if countries is None:
        print(f'Film (url={film_url}): countries list is not found')

    return (directors, box_office_revenue, countries)

In [226]:
# All table rows except the first one (presumably the header row - confirm against the page).
film_rows = highest_grossing_films.find_all('tr')[1:]
# Accumulator for the scraped records.
films: List[FilmRecord] = list()

In [227]:
# Turn every table row into a FilmRecord, visiting the film's own page for details.
for i, row in enumerate(film_rows):
    print(f"Row {i}: Started processing")
    elements = row.find_all(recursive=False)
    assert len(elements) == 6

    # Title collection
    title_element = elements[2].find('a')
    if title_element is None:
        print(f"Row {i}: title element was not found, the row is excluded")
        continue
    title_link = title_element.attrs['href']
    title = title_element.text.strip()

    # Year collection: report an empty cell once instead of also flagging it as invalid.
    year_text = elements[4].text.strip()
    if not year_text:
        release_year = None
        print(f"Row {i}: release year is missing")
    else:
        try:
            release_year = int(year_text)
        except ValueError:
            release_year = None
            print(f"Row {i}: invalid year format: {year_text}")

    # Moving to film page
    film_url = BASE_URL + title_link
    film_details = parse_film_page(film_url)
    if film_details is None:
        # parse_film_page returns None when the page could not be downloaded;
        # keep the row with empty details instead of failing on the unpacking.
        print(f"Row {i}: film page could not be loaded, details are left empty")
        film_details = (None, None, None)
    director, box_office, country = film_details
    print(f"Row {i}: parsed a film")
    films.append(FilmRecord(title=title, release_year=release_year, director=director, box_office=box_office, country=country))


Row 0: Started processing
Row 0: parsed a film
Row 1: Started processing
Row 1: parsed a film
Row 2: Started processing
Row 2: parsed a film
Row 3: Started processing
Row 3: parsed a film
Row 4: Started processing
Row 4: parsed a film
Row 5: Started processing
Row 5: parsed a film
Row 6: Started processing
Row 6: parsed a film
Row 7: Started processing
Row 7: parsed a film
Row 8: Started processing
Row 8: parsed a film
Row 9: Started processing
Row 9: parsed a film
Row 10: Started processing
Row 10: parsed a film
Row 11: Started processing
Row 11: parsed a film
Row 12: Started processing
Row 12: parsed a film
Row 13: Started processing
Row 13: parsed a film
Row 14: Started processing
Row 14: parsed a film
Row 15: Started processing
Row 15: parsed a film
Row 16: Started processing
Row 16: parsed a film
Row 17: Started processing
Row 17: parsed a film
Row 18: Started processing
Row 18: parsed a film
Row 19: Started processing
Row 19: parsed a film
Row 20: Started processing
Row 20: parse

## Database creation

In [73]:
# Credentials are read from the environment (the password falls back to an
# interactive prompt) so that no secret is stored in the notebook itself.
username = os.environ.get("MONGODB_USERNAME", "pyclient")
password = os.environ.get("MONGODB_PASSWORD") or getpass.getpass("MongoDB password: ")
URL = "@mycluster.hszuy.mongodb.net/?retryWrites=true&w=majority&appName=MyCluster"

# Construct the MongoDB URI with authentication details. User name and password
# are percent-escaped so that reserved characters (':', '@', '/', ...) cannot break the URI.
mongo_uri = f"mongodb+srv://{quote_plus(username)}:{quote_plus(password)}{URL}"

# Create a MongoClient object with the URI
client = MongoClient(mongo_uri)

In [230]:
db = client["wikipedia"]
collection = db["highest_grossing"]
# Populate the collection only when it does not exist yet. To refresh stale
# data, drop it first (collection.drop()) and re-run this cell.
if "highest_grossing" not in db.list_collection_names():
    # insert_many rejects an empty list, hence the guard. Copies are inserted
    # because PyMongo adds an "_id" key to every document it is given, which
    # would otherwise leak into the `films` records.
    if films:
        collection.insert_many([dict(film) for film in films])

## Exporting to JSON

In [231]:
# Read every stored document back from the collection.
cursor = collection.find()

# Materialise the cursor and remove the "_id" key from each document
# before the documents are serialised to JSON.
list_cur = [*cursor]
for film in list_cur:
    del film['_id']
json_data = json.dumps(list_cur, indent=4, ensure_ascii=False)

# Write the JSON text to data.json as UTF-8.
with open('data.json', mode='w', encoding='utf-8') as f:
    f.write(json_data)