In [None]:
import time
import pymongo
import psycopg2
import pandas as pd
import json
from datetime import datetime
import os
import sys

# Make the project-local `utils` package importable: it is expected to live
# in the directory above the one this notebook is executed from.
cwd = os.getcwd()
parent = os.path.abspath(os.path.join(cwd, os.pardir))

# Prepend (not append) so this directory is searched before every other
# entry already on the module search path.
sys.path[:0] = [parent]

# Import the SearchCache class
from utils.cache import SearchCache
import certifi
# Filesystem path of certifi's CA bundle; SearchEngine.__init__ passes it to
# pymongo.MongoClient as `tlsCAFile`.
ca = certifi.where()

In [None]:
class SearchEngine:
    def __init__(self, cache_size=100, cache_ttl=3600):
        """
        Initializes a SearchEngine object with a specified database type and cache settings.

        Args:
        - cache_size (int): Maximum number of items to store in cache
        - cache_ttl (int): Time-to-live (in seconds) for cached items
        """
        # initialize a cache object for the search engine using the SearchCache class
        self.cache = SearchCache(cache_size, cache_ttl)
        self.db_client = pymongo.MongoClient('mongodb+srv://twitter_user:dbms@cluster0.wkyhu.mongodb.net/?retryWrites=true&w=majority',tlsCAFile=ca)
        self.tweets_collection = self.db_client['twitter_db']['tweets_data']
        self.db_conn = psycopg2.connect(database="postgres", user="postgres", password="kueen", host="localhost")
        self.users_cursor = self.db_conn.cursor()
        self.user_table = 'twitter_users_partitioned'
        self.pipeline = [
            {
                '$project': {
                    '_id': 0,
                    'tweet_id': 1,
                    'user': 1,
                    'name': 1,
                    'date': 1,
                    'text': 1,
                    'retweet': {
                        '$cond': {
                            'if': { '$eq': ['$is_retweet', True] },
                            'then': '$retweet',
                            'else': None
                        }
                    },
                    'quote': {
                        '$cond': {
                            'if': { '$eq': ['$is_quote', True] },
                            'then': '$quote',
                            'else': None
                        }
                    },
                    'retweet_count': {
                        '$sum': {
                            '$cond': [
                                { '$eq': ['$quote', None] },
                                {
                                    '$cond': [
                                        { '$eq': ['$retweet', None] },
                                        '$retweet_count',
                                        '$retweet.retweet_count'
                                    ]
                                },
                                '$quote.retweet_count'
                            ]
                        }
                    },
                    'reply_count': {
                        '$sum': {
                            '$cond': [
                                { '$eq': ['$quote', None] },
                                {
                                    '$cond': [
                                        { '$eq': ['$retweet', None] },
                                        '$reply_count',
                                        '$retweet.reply_count'
                                    ]
                                },
                                '$quote.reply_count'
                            ]
                        }
                    },
                    'favorite_count': {
                        '$sum': {
                            '$cond': [
                                { '$eq': ['$quote', None] },
                                {
                                    '$cond': [
                                        { '$eq': ['$retweet', None] },
                                        '$favorite_count',
                                        '$retweet.favorite_count'
                                    ]
                                },
                                '$quote.favorite_count'
                            ]
                        }
                    },
                    'quote_count': {
                        '$sum': {
                            '$cond': [
                                { '$eq': ['$quote', None] },
                                {
                                    '$cond': [
                                        { '$eq': ['$retweet', None] },
                                        '$quote_count',
                                        '$retweet.quote_count'
                                    ]
                                },
                                '$quote.quote_count'
                            ]
                        }
                    },
                }
            },
            {
                '$addFields': {
                    'engagement': {
                        '$add': [
                            {'$multiply': ['$retweet_count', 0.2]},
                            {'$multiply': ['$favorite_count', 0.2]},
                            {'$multiply': ['$reply_count', 0.3]},
                            {'$multiply': ['$quote_count', 0.3]}
                        ]
                    }
                }
            },
            {
                '$sort': {
                    'engagement': pymongo.DESCENDING,
                    'date': pymongo.DESCENDING
                }
            }
        ]

    
    def most_popular_users(self, n=10):
        """
        Returns the n most popular Twitter users along with their tweets.

        Args:
        - n (int): Number of users to return.

        Returns:
        - list: List of the top n Twitter users, each represented as a dictionary with a 'username' key and a 'tweets' key.
        """
        start_time = time.time()
        
        if 'most_popular_users' in self.cache:
            print("Retrieving 'most popular users' from cache!")
            end_time = time.time()
            print(f"Query took {end_time - start_time:.4f} seconds\n")
            return self.cache['most_popular_users']
        else:
            print(f"New entry, retrieving 'most popular users' from database!")

        query = f"""
            SELECT * FROM 
                (SELECT distinct user_id, name, twitter_join_date, location, 
            verified, followers_count, friends_count, favourites_count,
                    dense_rank () over (partition by user_id order by followers_count desc) rnk 
                FROM 
                (SELECT * FROM {self.user_table} order by followers_count desc) A
                ) B where rnk = 1 
                order by followers_count desc limit {n}"""
        
        self.users_cursor.execute(query)
        results = self.users_cursor.fetchall()

        users = []
        for row in results:
            user = {
                'user_id': row[0],
                'name': row[1],
                'twitter_join_date': row[2],
                'location': row[3],
                'verified': row[4],
                'followers_count': row[5],
                'friends_count': row[6],
                'favourites_count': row[7],
            }
            users.append(user)

        users = pd.DataFrame(users)
        self.cache['most_popular_users'] = users.to_json(orient='records')
        self.cache.save_checkpoint()
        end_time = time.time()
        print(f"Query took {end_time - start_time:.4f} seconds\n")
        
        return users
    
    
    def most_engaging_tweets(self, n=10):
        """
        Returns the most engaging n tweets in the database, where engagement is defined as the sum of retweet
        count, reply count, quote count, and favorite count.

        Args:
        - n (int): Number of tweets to return.

        Returns:
        - list: List of the top n tweets, each represented as a dictionary.
        """
        start_time = time.time()
        
        if 'most_engaging_tweets' in self.cache:
            print("Retrieving 'most engaging tweets' from cache!")
            end_time = time.time()
            print(f"Query took {end_time - start_time:.4f} seconds\n")            
            return self.cache['most_engaging_tweets']
        else:
            print(f"New entry, retrieving 'most engaging tweets' from database!")

        tweets = list(self.tweets_collection.aggregate(self.pipeline + [{'$limit': n}]))
        self.cache['most_engaging_tweets'] = tweets
        
        end_time = time.time()
        print(f"Query took {end_time - start_time:.4f} seconds\n")
        
        return tweets

    
    def most_popular_hashtags(self, n=10):
        """
        Returns the n most popular hashtags in the database.

        Args:
        - n (int): Number of hashtags to return.

        Returns:
        - list: List of the top n hashtags, each represented as a dictionary.
        """
        start_time = time.time()
        
        if 'most_popular_hashtags' in self.cache:
            print("Retrieving 'most popular hashtags' from cache!")
            end_time = time.time()
            print(f"Query took {end_time - start_time:.4f} seconds\n")
            return self.cache['most_popular_hashtags']
        else:
            print(f"New entry, retrieving 'most popular hashtags' from database!")

        pipeline = [
            {
                '$match': {
                    '$or': [
                        {'media.hashtags': {'$exists': True}},
                        {'retweet.media.hashtags': {'$exists': True}},
                        {'quote.media.hashtags': {'$exists': True}},
                    ]
                }
            },
            {
                '$project': {
                    '_id': 0,
                    'hashtags': {
                        '$concatArrays': [
                            {'$ifNull': ['$media.hashtags', []]},
                            {'$ifNull': ['$retweet.media.hashtags', []]},
                            {'$ifNull': ['$quote.media.hashtags', []]},
                        ]
                    }
                }
            },
            {
                '$unwind': '$hashtags'
            },
            {
                '$group': {
                    '_id': '$hashtags',
                    'count': {'$sum': 1}
                }
            },
            {
                '$sort': {
                    'count': pymongo.DESCENDING,
                    'date': pymongo.DESCENDING
                }
            },
            {
                '$limit': n
            }
        ]

        
        hashtags = list(self.tweets_collection.aggregate(pipeline))
        hashtags = [{x["_id"]: x["count"]} for x in hashtags]
        self.cache['most_popular_hashtags'] = hashtags
        self.cache.save_checkpoint()
        end_time = time.time()
        print(f"Query took {end_time - start_time:.4f} seconds\n")
        
        return hashtags

    
    def search_by_date_range(self, start_date_str, end_date_str):
        """
        Returns the top n tweets in the database that were posted within the specified date range.

        Args:
        - start_date_str (str): Start date of the range in the format 'Fri Apr 24 10:06:09 +0000 2020'
        - end_date_str (str): End date of the range in the format 'Fri Apr 24 10:06:09 +0000 2020'
        - n (int): Number of tweets to return.

        Returns:
        - list: List of the top n tweets, each represented as a dictionary.
        """
        start_date = datetime.strptime(start_date_str, '%a %b %d %H:%M:%S %z %Y')
        end_date = datetime.strptime(end_date_str, '%a %b %d %H:%M:%S %z %Y')

        query = {
            'date': {
                '$gte': start_date,
                '$lte': end_date
            }
        }
        
        tweets = self.tweets_collection.find(query)
        return tweets


    def search_by_username(self, username, n=10):
        """
        Returns the top n users in the database matching the given username.

        Args:
        - username (str): The username to search for.
        - n (int): Number of users to return.

        Returns:
        - list: List of the top n users, each represented as a dictionary.
        """
        start_time = time.time()
        
        if username in self.cache:
            print(f"Retrieving '{username}' from cache!")
            end_time = time.time()
            print(f"Query took {end_time - start_time:.4f} seconds\n")

            return self.cache[username]
        else:
            print(f"New entry, retrieving '{username}' from database!")

        query = f"""
            SELECT user_id, name, twitter_join_date, location, 
            verified, followers_count, friends_count, favourites_count
            FROM {self.user_table}
            WHERE name LIKE '%{username}%'
            AND (name, date, followers_count) IN (
                SELECT name, MAX(date), MAX(followers_count)
                FROM {self.user_table}
                WHERE name LIKE '%{username}%'
                GROUP BY name, user_id
            )
            ORDER BY followers_count DESC, verified DESC
            LIMIT {n}
        """

        self.users_cursor.execute(query)
        results = self.users_cursor.fetchall()

        users = []
        for row in results:
            user = {
                'user_id': row[0],
                'name': row[1],
                'twitter_join_date': row[2],
                'location': row[3],
                'verified': row[4],
                'followers_count': row[5],
                'friends_count': row[6],
                'favourites_count': row[7],
            }
            users.append(user)

        users = pd.DataFrame(users)
        self.cache[username] = users.to_json(orient='records')
        self.cache.save_checkpoint()
        end_time = time.time()
        print(f"Query took {end_time - start_time:.4f} seconds\n")
        
        return users
    
    
    def search_by_keyword(self, keyword, start_time, end_time, n=10):
        """
        Returns the top n tweets in the database that contain the given word.

        Args:
        - keyword (str): The word to search for.
        - n (int): Number of tweets to return.

        Returns:
        - list: List of the top n tweets, each represented as a dictionary.
        """
        st = time.time()
        
        if keyword in self.cache:
            print(f"Retrieving '{keyword}' from cache!")
            et = time.time()
            print(f"Query took {et - st:.4f} seconds\n")

            return self.cache[keyword]
        else:
            print(f"New entry, retrieving '{keyword}' from database!")
            
        start_date = datetime.strptime(start_time, '%a %b %d %H:%M:%S %z %Y')
        end_date = datetime.strptime(end_time, '%a %b %d %H:%M:%S %z %Y')

        pipeline = [
            {
                '$match': {
                    '$text': {
                        '$search': keyword
                    }
                }
            },
            {
                '$limit': n
            }
        ]
        
        if start_time and end_time:
            tweets = list(self.search_by_date_range(start_time, end_time))
        else:
            tweets = list(self.tweets_collection.aggregate(pipeline + self.pipeline))
        self.cache[keyword] = tweets
        self.cache.save_checkpoint()
        et = time.time()
        print(f"Query took {et - st:.4f} seconds\n")
        
        return tweets
    
    
    def search_by_hashtag(self, hashtag, n=10):
        """
        Returns the top n tweets in the database that contain the given hashtag.

        Args:
        - hashtag (str): The hashtag to search for.
        - n (int): Number of tweets to return.

        Returns:
        - list: List of the top n tweets, each represented as a dictionary.
        """
        
        start_time = time.time()
        
        if hashtag in self.cache:
            print(f"Retrieving '#{hashtag}' from cache!")
            end_time = time.time()
            print(f"Query took {end_time - start_time:.4f} seconds\n")
            return self.cache[hashtag]
        else:
            print(f"New entry, retrieving '#{hashtag}' from database!")

        pipeline = [
            {
                '$match': {
                    '$or': [
                        {'media.hashtags': {'$in': [hashtag]}},
                        {'retweet.media.hashtags': {'$in': [hashtag]}},
                        {'quote.media.hashtags': {'$in': [hashtag]}},
                    ]
                }
            },
            {
                '$project': {
                    '_id': 0,
                    'tweet_id': 1,
                    'user': 1,
                    'name': 1,
                    'date': 1,
                    'text': 1,
                    'retweet': {
                        '$cond': {
                            'if': { '$eq': ['$is_retweet', True] },
                            'then': '$retweet',
                            'else': None
                        }
                    },
                    'quote': {
                        '$cond': {
                            'if': { '$eq': ['$is_quote', True] },
                            'then': '$quote',
                            'else': None
                        }
                    },
                    'hashtags': '$media.hashtags',
                    'retweet_count': {
                        '$sum': {
                            '$cond': [
                                { '$eq': ['$retweet', None] },
                                '$retweet_count',
                                '$retweet.retweet_count'
                            ]
                        }
                    },
                    'reply_count': {
                        '$sum': {
                            '$cond': [
                                { '$eq': ['$quote', None] },
                                {
                                    '$cond': [
                                        { '$eq': ['$retweet', None] },
                                        '$reply_count',
                                        '$retweet.reply_count'
                                    ]
                                },
                                '$quote.reply_count'
                            ]
                        }
                    },
                    'favorite_count': {
                        '$sum': {
                            '$cond': [
                                { '$eq': ['$retweet', None] },
                                '$favorite_count',
                                '$retweet.favorite_count'
                            ]
                        }
                    },
                    'quote_count': {
                        '$max': {
                            '$cond': [
                                { '$eq': ['$quote', None] },
                                '$quote_count',
                                '$quote.quote_count'
                            ]
                        }
                    },
                },
            },
            {
                '$unwind': '$hashtags'
            },
            {
                '$addFields': {
                    'engagement': {
                        '$toInt': {
                            '$add': [
                                {'$multiply': ['$retweet_count', 0.2]},
                                {'$multiply': ['$favorite_count', 0.2]},
                                {'$multiply': ['$reply_count', 0.3]},
                                {'$multiply': ['$quote_count', 0.3]}
                            ]
                        }
                    },
                }
            },

            {
                '$sort': {
                    'engagement': pymongo.DESCENDING,
                    'date': pymongo.DESCENDING
                }
            },
            {
                '$limit': n
            }
        ]

        tweets = list(self.tweets_collection.aggregate(pipeline))
        self.cache['#'+hashtag] = tweets
        self.cache.save_checkpoint()
        end_time = time.time()
        print(f"Query took {end_time - start_time:.4f} seconds\n")
        
        return tweets
    

## Testing

In [None]:
# Create the SearchEngine used by every cell below: cache size 50, cache TTL
# 30 seconds. Constructing it also sets up the MongoDB client and opens the
# PostgreSQL connection (see SearchEngine.__init__).
search_engine = SearchEngine(cache_size=50, cache_ttl=30)

In [None]:
# Date window passed to search_by_keyword further down. The strings use the
# '%a %b %d %H:%M:%S %z %Y' layout that the search methods parse with strptime.
start_time='Sat Apr 25 14:19:11 +0000 2020'
end_time='Sat Apr 25 14:30:00 +0000 2020'

In [None]:
# First request for the most-followed users (default n=10); the method prints
# whether the answer came from the cache or the database, plus the time taken.
search_engine.most_popular_users()

In [None]:
# Same request repeated to compare timings; presumably served from the cache
# when run within the 30-second TTL -- check the printed message.
search_engine.most_popular_users()

In [None]:
# Top tweets by engagement score (default n=10).
search_engine.most_engaging_tweets()

In [None]:
# Inspect the cache after the calls above (presumably lists the keys it
# currently holds; get_keys is defined in utils.cache).
search_engine.cache.get_keys()

In [None]:
# First request for the most frequent hashtags (default n=10).
search_engine.most_popular_hashtags()

In [None]:
# Same request repeated to compare timings against the call above.
search_engine.most_popular_hashtags()

In [None]:
# Look up users whose name contains "Sözcü" (non-ASCII input on purpose).
search_engine.search_by_username("Sözcü")

In [None]:
# Same lookup repeated to compare timings against the call above.
search_engine.search_by_username("Sözcü")

In [None]:
# Call search_by_keyword with the date window defined earlier and keep the
# result; the bare name on the last line displays it as the cell output.
keyword = search_engine.search_by_keyword("Modiji", start_time, end_time)
keyword

In [None]:
# Top tweets for the hashtag "corona" (passed without the leading '#').
search_engine.search_by_hashtag("corona")

In [None]:
# Third "Sözcü" lookup, after other queries have run; whether it is still
# cached depends on how much of the 30-second TTL has elapsed.
search_engine.search_by_username("Sözcü")

In [None]:
# Final look at the cache (presumably its stored key/value pairs; get_items is
# defined in utils.cache). NOTE(review): this output can be large and may
# contain user data -- clear it before sharing the notebook.
search_engine.cache.get_items()