# Test Database with SQLAlchemy

## Define Service Class for Database Queries using SQLAlchemy

In [2]:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import or_
from db.movie_app_db import Movie, MovieGenre, Genre, MovieKeyword, Keyword,\
   MovieCredit, Person, MovieStudio, Studio, MovieDetails
from sqlalchemy import create_engine
import pandas as pd

database_path = 'sqlite:///movie_app.db'

class MovieServiceLocal:
    def __init__(self):
        # Connect to the SQLite database
        engine = create_engine(database_path)
        Session = sessionmaker(bind=engine)
        self.session = Session()
    
    def to_dataframe(self, object_list: list) -> pd.DataFrame: 
          # Convert each object into a dictionary of its attributes
        data = [obj.__dict__ for obj in object_list]

        # Create a pandas DataFrame from the list of dictionaries
        return pd.DataFrame(data)

    # 1. Query by title
    def query_by_title(self, search_string: str, exact_match: bool = False):
        if exact_match:
            # Exact title match
            return self.to_dataframe(self.session.query(Movie).filter(Movie.movie_name == search_string).all())
        else:
            # Wildcard match (case insensitive search)
            search_pattern = f"%{search_string}%"
            return self.to_dataframe(self.session.query(Movie).filter(Movie.movie_name.ilike(search_pattern)).all())
            

    # 2. Query by genre
    def query_by_genre(self, genre_name: str)-> pd.DataFrame:
        # Join with Genre table through MovieGenre
        return self.to_dataframe((
            self.session.query(Movie)
            .join(MovieGenre)
            .join(Genre)
            .filter(Genre.genre_name == genre_name)
            .all()
        ))

    # 3. Query by release date
    def query_by_release_date(self, start_date=None, end_date=None):
        query = self.session.query(Movie)
        if start_date:
            query = query.filter(Movie.movie_release_date >= start_date)
        if end_date:
            query = query.filter(Movie.movie_release_date <= end_date)
        return query.all()

    # 4. Query by keyword
    def query_by_keyword(self, keyword_name: str):
        # Join with Keyword table through MovieKeyword
        return (
            self.session.query(Movie)
            .join(MovieKeyword)
            .join(Keyword)
            .filter(Keyword.keyword_name == keyword_name)
            .all()
        )

    # 5. Query by person (actor, director, etc.)
    def query_by_person(self, person_name: str):
        # Join with Person table through MovieCredit
        return (
            self.session.query(Movie)
            .join(MovieCredit)
            .join(Person)
            .filter(Person.name == person_name)
            .all()
        )

    # 6. Query by studio
    def query_by_studio(self, studio_name: str):
        # Join with Studio table through MovieStudio
        return (
            self.session.query(Movie)
            .join(MovieStudio)
            .join(Studio)
            .filter(Studio.studio_name == studio_name)
            .all()
        )
        
        
    def get_movie_details(self,result_df: pd.DataFrame) -> pd.DataFrame:
        # Convert the movie_id column of result_df to a list
        movie_ids = result_df['movie_id'].tolist()

        # Query MovieDetails for all movies with movie_id in movie_ids
        movies = self.session.query(MovieDetails).filter(MovieDetails.movie_id.in_(movie_ids)).all()

        # Convert the result to a pandas DataFrame
        movies_df = pd.DataFrame([movie.__dict__ for movie in movies])

        # Remove the SQLAlchemy internal attributes (like _sa_instance_state) if needed
        movies_df = movies_df.drop(columns=['_sa_instance_state'], errors='ignore')
        
        return movies_df


## Create Unittest for Search by Name

In [3]:
import unittest
import datetime



# given: a movie title

# when: we query the database for the movie title

# then: it should return the data for this movie

# Test case for the MovieService class
class TestMovieService(unittest.TestCase):
    def setUp(self):
        # Create a MovieService instance with a mocked db_url
        self.movie_service = MovieService()
    
    def test_get_movie_by_title(self):
        # Call the method with the expected title
        result = self.movie_service.query_by_title(search_string = "Star Wars: Episode I - The Phantom Menace", exact_match = True)
        
        # Assertions to check if the result is as expected
        self.assertIsNotNone(result)
        self.assertEqual(result.iloc[0].movie_name, "Star Wars: Episode I - The Phantom Menace")
        self.assertEqual(result.iloc[0].movie_release_date, datetime.date(1999, 5, 19))

# Run the tests in the notebook
if __name__ == '__main__':
    unittest.main(argv=[''], exit=False)


E
ERROR: test_get_movie_by_title (__main__.TestMovieService.test_get_movie_by_title)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "C:\Users\pogawal\AppData\Local\Temp\ipykernel_9612\1460586355.py", line 16, in setUp
    self.movie_service = MovieService()
                         ^^^^^^^^^^^^
NameError: name 'MovieService' is not defined

----------------------------------------------------------------------
Ran 1 test in 0.001s

FAILED (errors=1)


## Wildcard search title


In [3]:
import unittest

# given: a movie title

# when: we query the database for the movie title

# then: it should return the data for this movie

# Test case for the MovieService class
class TestMovieService(unittest.TestCase):
    def setUp(self):
        # Create a MovieService instance with a mocked db_url
        self.movie_service = MovieService()
    
    def test_get_movie_by_title(self):
        # Call the method with the expected title
        result_df = self.movie_service.query_by_title(search_string = "Kung Fu Panda")
        
        # Assertions to check if the result is as expected
        self.assertIsNotNone(result_df)
        expected_movie_names = ["Kung Fu Panda", "Kung Fu Panda: Secrets of the Furious Five", \
        "Kung Fu Panda 2", "Kung Fu Panda Holiday", "Kung Fu Panda: Secrets of the Masters", \
        "Kung Fu Panda 3", "Kung Fu Panda - The Midnight Stranger Vol.4", "Kung Fu Panda: Legends of Awesomeness 1 : The Scorpion Sting", \
        "Kung Fu Panda: Legends of Awesomeness (Good Croc, Bad Croc)", "Kung Fu Panda: Secrets of the Scroll", "Kung Fu Panda:  The Ultimate Secrets Collection", \
        "Kung Fu Panda: Unstoppable Awesomeness", "Kung Fu Panda: The Emperor's Quest"]

        actual_movie_names = result_df['movie_name']

        # Convert to set for easier comparison
        expected_set = set(expected_movie_names)
        actual_set = set(actual_movie_names)

        # Assert that all expected movies are in the actual DataFrame
        assert expected_set.issubset(actual_set), f"Missing movies in DataFrame: {expected_set - actual_set}"

        # Assert that the actual DataFrame contains no extra movies other than those expected
        assert actual_set.issubset(expected_set), f"Unexpected movies in DataFrame: {actual_set - expected_set}"


# Run the tests in the notebook
if __name__ == '__main__':
    unittest.main(argv=[''], exit=False)


.
----------------------------------------------------------------------
Ran 1 test in 0.719s

OK


In [5]:
import unittest
from app.services.movie import MovieService

class TestMovieService(unittest.TestCase):
    def setUp(self):
        self.movie_service = MovieService()
    
    def test_get_movie_by_genre(self):
        # given
        search_genre = 'Music'
        
        # when
        result_df = self.movie_service.query_by_genre(genre_name = search_genre)
        
        # then
        self.assertIsNotNone(result_df)
                
        movies_df = self.movie_service.get_movie_details(result_df)
        
        assert movies_df['genres'].apply(lambda x: search_genre in x).all(), f"Not all rows contain the genre {search_genre}"

# Run the tests in the notebook
if __name__ == '__main__':
    unittest.main(argv=[''], exit=False)

.
----------------------------------------------------------------------
Ran 1 test in 15.274s

OK


In [None]:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from db.movie_app_db import MovieDetails

# Create the engine
engine = create_engine(database_path)  # Replace with your actual database URI

# Create a session
Session = sessionmaker(bind=engine)
session = Session()

# Query the view for movies with "Kung Fu Panda" in their name
search_string = "Kill Bill"
movies = session.query(Movie).filter(Movie.movie_name.like(f"%{search_string}%")).all()
# Assuming each object in the list is an instance of a class with a movie_id attribute
movie_ids = [movie.movie_id for movie in movies]
movies_details = session.query(MovieDetails).filter(MovieDetails.movie_id.in_(movie_ids)).all()

# Print the matching movies
# for movie in movies:
for details in movies_details:
    print(details.keywords)

# Optional: Closing session
session.close()


In [5]:
# import sqlite3

# # Connect to the SQLite database (or create it if it doesn't exist)
# conn = sqlite3.connect('movie_app.db')

# # Create a cursor object
# cursor = conn.cursor()

# # SQL command to create an index on movie_name
# create_index_sql = "CREATE INDEX IF NOT EXISTS idx_movie_name ON movie (movie_name);"

# # Execute the SQL command
# cursor.execute(create_index_sql)

# # Commit the changes
# conn.commit()

# # Close the connection
# conn.close()


In [1]:
import sqlite3

def drop_and_recreate_watch_history_table():
    # Connect to the SQLite database
    conn = sqlite3.connect('movie_app.db')
    
    try:
        # Create a cursor object
        cursor = conn.cursor()

        # SQL script to drop and recreate the watch_history table
        sql_script = """
        DROP TABLE IF EXISTS watch_history;

        CREATE TABLE watch_history (
            user_id INTEGER NOT NULL,
            movie_id INTEGER NOT NULL,
            watch_date DATE NOT NULL,
            rating FLOAT,
            is_favorite BOOLEAN DEFAULT 0,
            FOREIGN KEY (user_id) REFERENCES users (id),
            FOREIGN KEY (movie_id) REFERENCES movies (id)
        );
        """

        # Execute the SQL script
        cursor.executescript(sql_script)

        # Commit the changes
        conn.commit()

        print("Table watch_history has been dropped and recreated with the new schema.")
    
    except sqlite3.Error as e:
        print(f"An error occurred: {e}")
        # Rollback if there's an error
        conn.rollback()

    finally:
        # Close the connection to the database
        conn.close()

drop_and_recreate_watch_history_table()

Table watch_history has been dropped and recreated with the new schema.


In [3]:
# import sqlite3

# # Connect to the SQLite database (or create it if it doesn't exist)
# conn = sqlite3.connect('movie_app.db')

# # Create a cursor object
# cursor = conn.cursor()

# delete_command = "DELETE FROM user;"

# cursor.execute(delete_command)
# # Commit the changes
# conn.commit()

# # SQL command to create an index on movie_name
# create_index_sql = "CREATE UNIQUE INDEX IF NOT EXISTS uq_user_name ON user (user_name);"

# # Execute the SQL command
# cursor.execute(create_index_sql)

# create_index_sql = "CREATE UNIQUE INDEX IF NOT EXISTS uq_user_email ON user (e_mail);"

# cursor.execute(create_index_sql)
# # Commit the changes
# conn.commit()

# # Close the connection
# conn.close()

In [1]:
import unittest
from app.services.movie import MovieService

class TestMovieService(unittest.TestCase):
    def setUp(self):
        self.movie_service = MovieService()
    
    def test_get_movie_by_genre(self):        
        # given
        search_genre = 'Music'
        
        # when
        result_df = self.movie_service.query_by_genre(genre_name=search_genre)
        
        # then
        self.assertIsNotNone(result_df)
                
        movies_df = self.movie_service.get_movie_details(result_df)
        
        assert movies_df['genres'].apply(lambda x: search_genre in x).all(), f"Not all rows contain the genre {search_genre}"

    def test_query_by_keyword(self):
        # given
        search_keyword = 'vienna austria'
        
        # when
        result_df = self.movie_service.query_by_keyword(keyword_name=search_keyword)
        
        # then
        self.assertIsNotNone(result_df)
        
        movies_df = self.movie_service.get_movie_details(result_df)
        
        assert movies_df['keywords'].apply(lambda x: search_keyword in x).all(), f"Not all rows contain the keyword {search_keyword}"

    def test_query_by_person(self):
        # given
        search_person = 'Johnny Depp'
        
        # when
        result_df = self.movie_service.query_by_person(person_name=search_person)
        
        # then
        self.assertIsNotNone(result_df)
        
        movies_df = self.movie_service.get_movie_details(result_df)
        
        assert movies_df['credits'].apply(lambda x: search_person in x).all(), f"Not all rows contain the person {search_person}"

    def test_query_by_studio(self):
        # given
        search_studio = 'Warner Bros.'
        
        # when
        result_df = self.movie_service.query_by_studio(studio_name=search_studio)
        
        # then
        self.assertIsNotNone(result_df)
        
        movies_df = self.movie_service.get_movie_details(result_df)
        
        assert movies_df['studios'].apply(lambda x: search_studio in x).all(), f"Not all rows contain the studio {search_studio}"

# Run the tests in the notebook
if __name__ == '__main__':
    unittest.main(argv=[''], exit=False)


....
----------------------------------------------------------------------
Ran 4 tests in 28.115s

OK
