# Parsing

Since wikipedia doesn't have dynamic elements, BeautifulSoup can be used for parsing. Moreover it has no bot-check so I can send 50 requests (amount of films) in 2 minutes.

### Parser class
The parser contains the main method parse() that finds the information about each film on the main page and films' pages. As a resu;t of this method, the class collects information and stores it in its variable 'films' - list of dictionaries. See the methods descriptions below.

# Database design
I chose the PostgreSQL database for storing my data. Firstly, I created the database 'wiki' from the console:
```
sudo -iu postgres
createuser --interactive --pwprompt # superuser 12345678
createdb wiki
psql -l
psql -U superuser -d wiki 
```

I connect to it via psycopg2, created the 'films' table and insertes rows. I checked intermediate results via console using the following commands:
```
\dt
SELECT * FROM films;
```
The fields are the same as in the provided Assignment.pdf file, but additionally I have the 'poster' field with a url to the poster image (TEXT).


Since some films have 2 or 3 directors and 2 countries, I changed the 'director' and 'country' fields type from TEXT to JSONB. I also added the 'poster' field for posters' urls.

After the table creation I inserted the data from the ParserWikiPage variable parser one by one. Then I checked the correctness of insertions.

In [1]:
import requests
from bs4 import BeautifulSoup
import bs4
import warnings
warnings.filterwarnings('ignore')
import re

URL = "https://en.wikipedia.org/wiki/List_of_highest-grossing_films"
HEADERS = {'User-Agent': 'Mozilla/5.0'}

def get_soup(url=None):
    """ Return a parsed tag tree of our Nobel prize page """
    if not url:
        response = requests.get(URL, headers=HEADERS)
    else:
        response = requests.get(url, headers=HEADERS)
    if response.status_code != 200:
        print(response.status_code)
    return BeautifulSoup(response.content, "lxml")

# soup = get_soup()

In [2]:
class ParserWikiPage():
    def __init__(self, main_url: str=None):
        """ Init method.
        Args:
            main_url (str, optional): The default wiki paage with a table and links inside. Defaults to None.
        """
        # during the main page's table parsing keep the links to films, which contain more information about movies
        self.films_urls = []
        if main_url:
            self.URL: str = main_url
        else:
            self.URL: str = URL
        # list of dictionaries, each contains film data (title, revenue, etc.)
        self.films = None

    def parse(self):
        """ Main method. Demosntrates the logic: parse the main page first,
            extract films' urls from it, and parse each film separately."""
        self.parse_main_page()
        self.parse_films()
        

    def find_table(self, url: str=None, table_class: str='wikitable') -> bs4.element.Tag:
        """ Method for finding desired tables on the page
        Args:
            url (str, optional): url of a page containing a table. Defaults to None.
            table_class (str, optional): the 'name' of a searched table. Defaults to None.
        Returns:
            bs4.element.Tag: element <table> is returned.
        """
        # if no url and table class => use defauld settings:
        # url = URL — base page with table of movies;
        # table_class='wikitable' — table on the page
        if not url:
            soup = get_soup(self.URL)
        else:
            soup = get_soup(url)

        # find and return the desired table
        table = soup.find('table', {'class':table_class})
        return table
    
    
    def parse_main_page(self, ) -> None:
        """ Method for parsing the main page.
            1) find the table
            2) split the table by rows (extract all <tr> elements), each row = a film
            3) parse each row (except the heading) - extract values: 
            3.1) find all table data <td> elements (table cells)
            3.2) each <td> contains <th> - cell with film name (and film link but it is parsed separately)
            3.3) assign values rank, peak, etc. (separate <td> elements)
            4) parse values separately:
            4.1) extract film title and get rid of some extra symbols
            4.2) parse peak: get rid of <sup> elements using regex (because it was extracted as text from a cell)
            4.3) compile the film url
            4.4) turn revenue (box_office) into a number
            5) form the dictionary
        """
        # 1) and 2) steps
        films_rank_table = self.find_table(self.URL, 'wikitable')
        rows = films_rank_table.find_all('tr')[1:]
        films = []
        # 3)
        for row in rows:
            tds = row.find_all('td')  # table data
            film_field = row.select_one('th')
            rank, peak, box_office, year = tds[0].text.strip(), tds[1].text.strip(), tds[2].text.strip(), tds[3].text.strip() 
            
            # 4)
            name = film_field.text.strip()
            acceptable_smbols = '012345678 .:&–'
            name = ''.join([symbol for symbol in name if symbol.isalpha() or symbol in acceptable_smbols])

            peak = re.sub(r'[A-Z]+\d?', '', peak)

            film_url = "https://en.wikipedia.org/" + row.find('i').find('a').get('href')
            self.films_urls.append(film_url)
            
            
            box_office = re.findall('\$.{0,20}', box_office)[0]
            box_office = int(re.sub(',', '', box_office[1:]))
            
            # 5)
            films.append({
                'name':name,
                'year':int(year),
                'rank':int(rank),
                'peak':int(peak),
                'revenue':box_office,
                'wiki_page': film_url,
            })
        self.films = films
    

    def parse_films(self) -> None:
        """ Method for parsing each film separately.
            self.films - list of dictionaries
            Use of self.parse_film_page()
        """  
        for film in self.films:
            film['director'] = []
            film['country'] = []
            film['poster'] = None
            url = film['wiki_page']
            self.parse_film_page(url, film)


    def parse_film_page(self, url:str, film: dict) -> None:
        """ Method for parsing a film page.
        Args:
            url (str): film url
            film (dict): dictionary of already collected data for a film (from the main page)
        """
        # find the table 'infobox vevent' on the page (a column on the right side of a page)
        # and split it into rows (get rid of the 1st one containing film name)
        film_info = self.find_table(url, 'infobox vevent')
        rows = film_info.find_all('tr')[1:]

        # iterate over each row
        for i,row in enumerate(rows):
            if i == 0:
                # case for the film's poster, will be parsed later
                continue

            # find heading - row's heading or the name of a section
            heading = row.find('th')
            if heading is None:
                # the problem solution for the Ne Zha 2 film
                # several rows contain empty <th> and they are empty itself
                continue
            
            # get text and row cells data (<td>)
            # the table has 2 columns, thus there is only 1 <td>
            heading = heading.text
            data = row.find('td')

            # parse the director
            if heading == 'Directed by':
                # several films are produced by several directors
                # they are listed in <ul>
                li = data.find('li')
                if li is None:
                    # if there is no list => doesn't mean there is only 1 director
                    # sometimes they are "listed" with <br> tag
                    # parse hard cases with </br>
                    if data.find('br'):
                        directors = list(data.stripped_strings)                         # find text and keep formatting
                        directors = [self.clean_str(string) for string in directors]    # go through each string and clean it
                        film['director'] = directors                                    # assign the 'director' field
                    else:
                    # if no <br> tags just extract the text in the <a> tag
                        film['director'] = [self.clean_str(data.find('a').text)]
                else:
                    # if there is a list in the data column:
                    film['director'] = self.find_directors_in_lists(data)
            
            # parse the country
            if heading in ['Country', 'Countries']:
                # extract text from this field
                countries = list(data.stripped_strings)
                # there is a problem with coutries filed:
                # it has a footnote next to countries names
                # and this footnote is also a text
                while '[' in countries:
                    # extract countries' names and get rid of footnotes
                    if countries.index('[') != 0:
                        film['country'].append(countries[:countries.index('[')][0])
                    countries = countries[countries.index('[') + 3:]
                # countirs list becomes empty during the while loop
                # if it is not, then there were no footnotes: assign this list to the 'country' field
                if countries:
                    film['country'] = countries
        # parse the poster url
        film['poster'] = self.get_image_url(rows[0])


    def find_directors_in_lists(self, data: bs4.element.Tag) -> list:
        """ Method for finding the directors in lists.
        Args:
            data (bs4.element.Tag): <td> elemement in a table
        Returns:
            list: list of directors
        The problem with lists appeared when some film's <td> contains
        a list <ul> containing one more list <ul> => nested lists.
        I had to extract the INNER lists and their texts.
        """

        directors = []
        # while a list contains lists inside: 
        # reassign directors list (make it empty), 
        # extract text, add it to 'directors', and
        # make data=ul
        # if new data doesn't contain <ul> anymore, then the last extracted text is correct => return it
        while data.find('ul') is not None:
            ul = data.find('ul')
            directors = []
            li = ul.find_all('li')
            for l in li:
                # remove sup tag in the end of strings
                text = self.clean_str(l.text)
                directors.append(text)
            data = ul
        return directors
    
    
    def clean_str(self, text: str) -> str:
        """ Method for deleting footnotes [i]
        Args:
            text (str): text from which [i] must be deleted
        Returns:
            str: cleaned and stripped text
        """
        return re.sub(r'\[\d\]', '', text.strip())
    
    def get_image_url(self, row: bs4.element.Tag) -> str:
        """ Method for finding the poster's url
        Args:
            row (bs4.element.Tag): row of a table, which contains poster url
        Returns:
            str: url of a poster
        """
        return 'https:' + row.find('td').find('a').find('img').get('src')

In [3]:
# p = ParserWikiPage()
# p.parse_film_page('https://en.wikipedia.org/wiki/Avatar_(2009_film)', {'director':[], 'country':[]})
# p.parse_film_page('https://en.wikipedia.org/wiki/Captain_America:_Civil_War', {'director':[], 'country':[]})
# p.parse_film_page('https://en.wikipedia.org/wiki/Moana_2', {'director':[], 'country':[]})
# p.parse_film_page('https://en.wikipedia.org/wiki/Furious_7', {'director':[], 'country':[]})
# p.parse_film_page('https://en.wikipedia.org/wiki/Barbie_(film)', {'director':[], 'country':[]})
# p.parse_film_page('https://en.wikipedia.org/wiki/Captain_Marvel_(film)', {'director':[], 'country':[]})

In [4]:
parser = ParserWikiPage()
parser.parse()

In [5]:
parser.films[:3]

[{'name': 'Avatar',
  'year': 2009,
  'rank': 1,
  'peak': 1,
  'revenue': 2923706026,
  'wiki_page': 'https://en.wikipedia.org//wiki/Avatar_(2009_film)',
  'director': ['James Cameron'],
  'country': ['United Kingdom', 'United States'],
  'poster': 'https://upload.wikimedia.org/wikipedia/en/thumb/d/d6/Avatar_%282009_film%29_poster.jpg/220px-Avatar_%282009_film%29_poster.jpg'},
 {'name': 'Avengers: Endgame',
  'year': 2019,
  'rank': 2,
  'peak': 1,
  'revenue': 2797501328,
  'wiki_page': 'https://en.wikipedia.org//wiki/Avengers:_Endgame',
  'director': ['Anthony Russo', 'Joe Russo'],
  'country': ['United States'],
  'poster': 'https://upload.wikimedia.org/wikipedia/en/0/0d/Avengers_Endgame_poster.jpg'},
 {'name': 'Avatar: The Way of Water',
  'year': 2022,
  'rank': 3,
  'peak': 3,
  'revenue': 2320250281,
  'wiki_page': 'https://en.wikipedia.org//wiki/Avatar:_The_Way_of_Water',
  'director': ['James Cameron'],
  'country': ['United States'],
  'poster': 'https://upload.wikimedia.org

### Database

In [6]:
import psycopg2

db_config = {
    'dbname': 'wiki',
    'user': 'superuser',
    'password': '12345678',
    'host': 'localhost',
    'port': 5432
}

# connect to the 'wiki' database and create a cursor - for commands execution
try:
    connection = psycopg2.connect(**db_config)
    print("connected")
except Exception as e:
    print(f"Error in connection: {e}")
    exit()

cursor = connection.cursor()

connected


In [7]:
# create the 'films' table
try:
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS films (
            id SERIAL PRIMARY KEY,
            title TEXT NOT NULL,
            year INTEGER,
            directors JSONB,
            revenue REAL,
            country JSONB,
            poster TEXT
        )
    ''')    
    connection.commit()
    print("Table 'films' created or already exists.")
except Exception as e:
    print(f"Error: {e}")

Table 'films' created or already exists.


In [8]:
import json

# insert data
def insert_film(cursor, title, year, director, revenue, country, poster):
    cursor.execute('''
        INSERT INTO films (title, year, directors, revenue, country, poster)
        VALUES (%s, %s, %s, %s, %s, %s)
    ''', (title, year, director, revenue/1_000_000, country, poster))
    connection.commit()


for film in parser.films:
    insert_film(cursor, 
                film['name'], 
                film['year'], 
                json.dumps(film['director']), 
                film['revenue'], 
                json.dumps(film['country']), 
                film['poster'])


# check the correctness
cursor.execute('SELECT * FROM films')
films = cursor.fetchall()
for film in films:
    print(film)

cursor.close()
connection.close()


(1, 'Avatar', 2009, ['James Cameron'], 2923.706, ['United Kingdom', 'United States'], 'https://upload.wikimedia.org/wikipedia/en/thumb/d/d6/Avatar_%282009_film%29_poster.jpg/220px-Avatar_%282009_film%29_poster.jpg')
(2, 'Avengers: Endgame', 2019, ['Anthony Russo', 'Joe Russo'], 2797.5012, ['United States'], 'https://upload.wikimedia.org/wikipedia/en/0/0d/Avengers_Endgame_poster.jpg')
(3, 'Avatar: The Way of Water', 2022, ['James Cameron'], 2320.2502, ['United States'], 'https://upload.wikimedia.org/wikipedia/en/thumb/5/54/Avatar_The_Way_of_Water_poster.jpg/220px-Avatar_The_Way_of_Water_poster.jpg')
(4, 'Titanic', 1997, ['James Cameron'], 2257.8445, ['United States'], 'https://upload.wikimedia.org/wikipedia/en/1/18/Titanic_%281997_film%29_poster.png')
(5, 'Star Wars: The Force Awakens', 2015, ['J. J. Abrams'], 2068.2236, ['United States'], 'https://upload.wikimedia.org/wikipedia/en/a/a2/Star_Wars_The_Force_Awakens_Theatrical_Poster.jpg')
(6, 'Avengers: Infinity War', 2018, ['Anthony Rus

In [9]:
# cursor.execute('''
#     DROP TABLE films
# ''')
# connection.commit()