In [25]:
import json
from datetime import datetime
import time
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

In [30]:
def connect_to_mongo_database():
    try:
        uri = "mongodb+srv://sk2953:SxVbw2sAorNNdbHJ@twitter.3lvckfi.mongodb.net/?retryWrites=true&w=majority&appName=Twitter"
        # Create a new client and connect to the server
        client = MongoClient(uri, server_api=ServerApi('1'))

        print("Connected to MongoDB database")
        print(f"Databases available: {client.list_database_names()}")
        return client
    except Exception as e:
        print(f"Error occurred while connecting to MongoDB: {e}")

In [27]:
def create_database(client, db_name):
    try:
        db = client[db_name]
        print(f"Created the database {db_name} successfully")
        return db
    except Exception as e:
        print(f"Error occurred while creating database in mongo: {e}")

In [28]:
def create_collection(db, collection_name):
    try:
        collection = db[collection_name]
        print(f"Created the collection {collection_name} successfully")
        return collection
    except Exception as e:
        print(f"Error occurred while creating collection inside mongo database: {e}")
    

In [31]:
client = connect_to_mongo_database()

Connected to MongoDB database
Databases available: ['sample_mflix', 'twitter-database', 'admin', 'local']


In [5]:
db = create_database(client, "twitter-database")

Created the database twitter-database successfully


In [8]:
collection = create_collection(db, "tweets")

Created the collection tweets successfully


In [9]:
class Tweet(object):
    
    def __init__(self, tweet, retweet_count=1, source_tweet_id=0):
        self.tweet_id = tweet['id_str']
        self.text =  tweet['text']
        self.hashtag = list(map(lambda x: x["text"], tweet['entities']['hashtags']))
        self.user_id = tweet['user']['id_str']
        self.user_name = tweet['user']['name']
        self.user_screen_name = tweet['user']['screen_name']
        self.likes_count = tweet['favorite_count']
        self.retweet_count = retweet_count
        self.source_tweet_id = source_tweet_id
        self.tweet_score = 0
        self.created_at = self.get_created_date(tweet['created_at'])
        
    @staticmethod
    def get_created_date(created_at):
        created_at_date = datetime.strptime(created_at, "%a %b %d %H:%M:%S %z %Y")
        created_at_date = created_at_date.strftime("%Y-%m-%d %H:%M:%S")
        return created_at_date
    
    def get_tweet(self):
        return vars(self)

In [10]:
def insert_tweet(collection, tweet):
    try:
        collection.insert_one(tweet)
    except Exception as e:
        print(f"Error occurred while inserting tweet: {e}")

In [11]:
def update_tweet(collection, tweet_id):
    try:
        collection.update_one({'tweet_id': tweet_id}, {"$inc": {'retweet_count': 1}})
    except Exception as e:
        print(f"Error updating tweet {tweet_id}: {e}")

In [12]:
def tweet_exists(collection, tweet_id):
    tweet = collection.find_one({"tweet_id": tweet_id})
    return True if tweet else False

In [13]:
def get_tweets_count(collection):
    return collection.count_documents({})

In [14]:
def load_tweet_data_to_database(collection, file_path):
    
    start_time = time.time()
    
    with open(file_path, "r") as read_file:
        for line in read_file:
            try:
                data = json.loads(line)

                if tweet_exists(collection, data['id_str']):
                    continue

                if data['text'].startswith('RT'):
                    if data.get('retweeted_status'):
                        source_tweet_id = data.get('retweeted_status').get('id_str')
                        if tweet_exists(collection, source_tweet_id):
                            update_tweet(collection, source_tweet_id)
                        else:
                            tweet = data.get('retweeted_status')
                            tweet_object = Tweet(tweet)
                            insert_tweet(collection, tweet_object.get_tweet())
                else:
                    source_tweet_id = 0

                retweet_object = Tweet(data, 0, source_tweet_id) 
              
                insert_tweet(collection, retweet_object.get_tweet())

            except:
                continue
        
    print(f"Successfully inserted {get_tweets_count(collection)} tweets in {time.time() - start_time} seconds")

In [16]:
start_time = time.time()
load_tweet_data_to_database(collection, "../data/corona-out-2")
print(f"Successfully loaded collection in {time.time() - start_time} seconds")

Successfully inserted 22144 tweets in 1342.8402450084686 seconds
Successfully loaded collection in 1342.8467650413513 seconds


In [17]:
start_time = time.time()
load_tweet_data_to_database(collection, "../data/corona-out-3")
print(f"Successfully loaded collection in {time.time() - start_time} seconds")

Successfully inserted 134127 tweets in 79376.43351173401 seconds
Successfully loaded collection in 79376.4600019455 seconds


In [20]:
client.list_database_names()

['sample_mflix', 'twitter-database', 'admin', 'local']

In [32]:
collection.count_documents({})

134127

In [33]:
# Update all documents
collection.update_many(
    {},  # Filter to select all documents
    [
        {'$set': {'tweet_score': {'$add': [{'$multiply': [0.6, '$retweet_count']}, {'$multiply': [0.4, '$likes_count']}]} } }
    ]
)


UpdateResult({'n': 134127, 'electionId': ObjectId('7fffffff0000000000000189'), 'opTime': {'ts': Timestamp(1754415302, 38), 't': 393}, 'nModified': 0, 'ok': 1.0, '$clusterTime': {'clusterTime': Timestamp(1754415302, 38), 'signature': {'hash': b'\xc5\xfa\t\xb2D\x9f\xc1\xb85(\x86\xb2$\x19\xa9\xc5\xe1\xfe\xb4\xff', 'keyId': 7494699759513370664}}, 'operationTime': Timestamp(1754415302, 38), 'updatedExisting': True}, acknowledged=True)

In [34]:
collection.create_index([("text", "text")])

'text_text'

In [35]:
collection.create_index([("user_screen_name", 1), ("tweet_score", -1)])

'user_screen_name_1_tweet_score_-1'