In [1]:
import pymongo
import pprint
from pprint import PrettyPrinter
import pandas as pd
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import os
import json
import time
from datetime import datetime
import re
import psycopg2
from bson import ObjectId

In [2]:
def handle_non_utf8(value):
    if isinstance(value, ObjectId):
        return str(value)
    elif isinstance(value, bool):
        return str(value)
    elif isinstance(value, (int, float)):  # Handling int and float types
        return str(value)
    elif isinstance(value, str):
        try:
            return value.encode('utf-8').decode('utf-8')
        except UnicodeDecodeError:
            return value.encode('utf-8', 'ignore').decode('utf-8')
    else:
        # For other types, you might choose to return them as they are or handle them differently
        return value

In [3]:
connection_string = "mongodb+srv://amrutha:root2000@cluster0.mrubvba.mongodb.net/"
client = MongoClient(connection_string)
db = client["twitter"]
c1 = db["trailtweets"]
c2 = db["trailretweets"]

In [4]:
# Connect to PostgreSQL
conn = psycopg2.connect(
    dbname="postgres",
    user="postgres",
    password="Call@11pm",
    host="localhost",
    port="5432"
)
cur = conn.cursor()

In [5]:
def time_filter(data, stime, etime, fetch_time):
    if stime==None and etime==None:
        return {'data': data, 'fetch_time': fetch_time }
    
    if stime==None:
        res = []
        etime = int(etime)/1000
        for tweet in data:
            if tweet['created_at'] <= etime:
                res.append(tweet)
        return {'data': res, 'fetch_time': fetch_time }

    if etime==None:
        res = []
        stime = int(stime)/1000
        for tweet in data:
            if tweet['created_at'] >= stime:
                res.append(tweet)
        return {'data': res, 'fetch_time': fetch_time }
    
    res = []
    stime, etime = int(stime)/1000, int(etime)/1000
    for tweet in data:
        if stime <= tweet['created_at'] <= etime:
            res.append(tweet)
    return {'data': res, 'fetch_time': fetch_time }

### search_by_hashtag

In [6]:
def search_by_hashtag(hashtags, fromDate=None, toDate=None):
    try :
        start=time.time()
        hashtags_list = [re.escape(tag.lower()) for tag in hashtags.split('#') if tag.strip()]
        pipeline = [
            {
                "$match": {
                    "hashtags": {"$in": hashtags_list}
                }
            },
            {
                "$project": {
                    "_id": 1,
                    "retweet_count": 1,
                    "text": 1,
                    "created_at":1,
                    "user_id_str":1
                }
            },
            {
                "$sort": {"retweet_count": -1}
            }
        ]
        db_final = pd.DataFrame(list(c1.aggregate(pipeline)))
        
        # Fetch user information and add to DataFrame
        for index, row in db_final.iterrows():
            user_id = row['user_id_str']
            cur.execute(f"""SELECT screen_name FROM users1 WHERE id_str = '{user_id}'""")
            user_data = cur.fetchone()
            if user_data:
                user_info = list(user_data)
                #print("user_info: ",user_info)
                db_final.loc[index, 'screen_name'] = user_info[0]
      

        db_final_encoded = db_final.map(handle_non_utf8)
        output = json.loads(db_final_encoded.to_json(orient='records', date_format='iso'))

        end=time.time()
        time_taken = time.time() - start

        output1 = time_filter(output,fromDate,toDate,time_taken)
        return output1
    except Exception as e:
        print(f"Retrieval of Tweet from hashtags failed : {e}")

In [7]:

fromDate= "2020-01-01"
toDate="2020-12-31"
hashtags="#COVID2019#CORONA"
a=search_by_hashtag(hashtags)
a

{'data': [{'_id': '6625460bfe189a37000b8efa',
   'created_at': '2020-03-27 17:23:16',
   'text': 'आदरणीय प्रधानमंत्री जी ने आज रेडियो के साथियों से बात की,कैसे सब मिलकर #Corona से लड़ सकते हैं @RedFMIndia परिवार… https://t.co/CCGc6ZBCcE',
   'retweet_count': '5983',
   'user_id_str': '97367270'},
  {'_id': '6625444ffe189a37000b42c7',
   'created_at': '2020-04-11 17:38:05',
   'text': 'मेरे पति सचिन जी का जुर्म बस इतना था कि वो #Corona महामारी के कारण #Lockdown21 में फंसे लोगों को राशन उपलब्ध करा र… https://t.co/G68yDo85vG',
   'retweet_count': '4315',
   'user_id_str': '1082896194731495424'},
  {'_id': '662544f6fe189a37000b5f8d',
   'created_at': '2020-03-27 10:05:11',
   'text': '#Corona தொற்றால் ஏழை எளிய மக்கள் எதிர்கொண்டுள்ள மிகப்பெரிய இன்னல்களிலிருந்து அவர்களை விடுவிக்கவும், தடுப்பு நடவடிக்… https://t.co/HYjyfcIGgH',
   'retweet_count': '1949',
   'user_id_str': '844890761409777664'},
  {'_id': '66254489fe189a37000b4cda',
   'created_at': '2020-04-04 13:45:38',
   'text': 'Those mu

### search_by_user

In [8]:
def search_by_user(user_id=None,fromDate=None,toDate=None):

    try:
        if user_id:
            start=time.time()
            pipeline = [
                {
                    "$match": {
                        "user_id_str":user_id
                    }
                },
                {
                    "$project": {
                        "_id":0,
                        "id_str": 1,
                        "retweet_count": 1,
                        "text": 1,
                        'created_at':1
                    }
                },
            ]
            db_final1 = pd.DataFrame(list(c1.aggregate(pipeline)))
            db_final2 = pd.DataFrame(list(c2.aggregate(pipeline)))
            merged_df = pd.concat([db_final1, db_final2], ignore_index=True)
            cur.execute(f"""SELECT screen_name FROM user_data WHERE id_str = '{user_id}'""")
            user_data = cur.fetchone()
            user_info = list(user_data)
            merged_df['screen_name']=user_info[0]
            #print(merged_df)
            db_final_encoded = merged_df.map(handle_non_utf8)
            # Convert the DataFrame to JSON
            output = json.loads(db_final_encoded.to_json(orient='records', date_format='iso'))

            end=time.time()
            time_taken = time.time() - start

            output1 = time_filter(output,fromDate,toDate,time_taken)
            return output1

    except Exception as e:
        print(f"Retrieval of Tweet from hashtags failed : {e}")

In [9]:
user_id="1131227186"
a=search_by_user(user_id)
a

{'data': [{'created_at': '2020-03-10 17:52:14',
   'id_str': '1237436114887041024',
   'text': 'THIS MAN IS A GENIUS he figured out the Corona virus problem 😮 https://t.co/EZP7IqTtxV',
   'retweet_count': '179479',
   'screen_name': '_AyeeCarlos_'}],
 'fetch_time': 0.057047128677368164}

### search_by_tweet

In [10]:
def search_by_tweet(tweet_str=None, fromDate=None, toDate=None):
    try:
        if tweet_str:
            start= time.time()
            pipeline = [
                {
                    "$match": {
                        "$text": {"$search": tweet_str}
                    }
                },
                {
                    "$project": {
                        "id_str":1,
                        "relevance_score": {"$meta": "textScore"}, 
                        "text": 1, 
                        "retweet_count":1,
                        "user_id_str": 1,
                        "created_at":1,
                        "favorite_count":1
                    }
                }
            ]
            db_final = pd.DataFrame(list(c1.aggregate(pipeline,allowDiskUse=True)))
            
            # Fetch user information and add to DataFrame
            for index, row in db_final.iterrows():
                user_id = row['user_id_str']
                cur.execute(f"""SELECT screen_name FROM users1 WHERE id_str = '{user_id}'""")
                user_data = cur.fetchone()
                if user_data:
                    user_info = list(user_data)
                    #print("user_info: ",user_info)
                    db_final.loc[index, 'screen_name'] = user_info[0]
            # Drop unnecessary columns
            db_final = db_final.drop(['_id'], axis=1).sort_values(by='relevance_score',ascending=False)
            
             # Apply the function to the DataFrame
            db_final_encoded = db_final.map(handle_non_utf8)
            # Convert the DataFrame to JSON
            output = json.loads(db_final_encoded.to_json(orient='records', date_format='iso'))
            
            end=time.time()
            time_taken = time.time() - start
            output1 = time_filter(output,fromDate,toDate,time_taken)
            return output1

    except Exception as e:
        print(f"Retrieval of Tweet from hashtags failed : {e}")

In [11]:
fromDate= "2020-01-01"
toDate="2020-12-31"
tweet_str="corona"
a=search_by_tweet(tweet_str)
a

{'data': [{'created_at': '2020-04-12 18:32:19',
   'id_str': '1249405001346187264',
   'text': '@simplicio10 CORONA CORONA CORONA CORONA',
   'retweet_count': '0',
   'favorite_count': '0',
   'user_id_str': '457944387',
   'relevance_score': '1.6875'},
  {'created_at': '2020-04-12 18:39:30',
   'id_str': '1249406807652655108',
   'text': 'corona virus corona virus corona-So I misspelled corona and many other words.  I haven\'t misspelled this, "Halo cor… https://t.co/461k6jeyfY',
   'retweet_count': '0',
   'favorite_count': '0',
   'user_id_str': '752240689',
   'relevance_score': '1.171875'},
  {'created_at': '2020-04-12 14:20:14',
   'id_str': '1249341560514174976',
   'text': 'corona oh corona wkwkw',
   'retweet_count': '1',
   'favorite_count': '0',
   'user_id_str': '1248098315620610053',
   'relevance_score': '1.125'},
  {'created_at': '2020-04-12 18:37:54',
   'id_str': '1249406404013764608',
   'text': 'Corona',
   'retweet_count': '0',
   'favorite_count': '0',
   'user_id_

### top_10

In [12]:
def get_top_data(data_type):
    try:
        if data_type == "users":
            cur.execute(f"""SELECT name,id_str, screen_name, followers_count, friends_count, verified, description, location FROM user_data ORDER BY followers_count DESC LIMIT 10""")
            user_data = cur.fetchall()
            db_final = pd.DataFrame(user_data, columns=['name', 'id_str','screen_name', 'followers_count', 'friends_count', 'verified', 'description', 'location']) 
            output = json.loads(db_final.to_json(orient='records', date_format='iso'))
            return output

        elif data_type == "tweets":
            tweets = c1.find({}, {"text": 1, "user_id_str": 1, '_id': 0}).sort("retweet_count", -1).limit(10)
            db_final = pd.DataFrame(list(tweets))
            for index, row in db_final.iterrows():
                user_id = row['user_id_str'].strip()
                cur.execute(f"""SELECT screen_name FROM user_data WHERE id_str = '{user_id}'""")
                user_data = cur.fetchone()

                if user_data:
                    user_info = list(user_data)
                    db_final.loc[index, 'screen_name'] = user_info[0]

            #db_final.drop('user_id_str',axis=1,inplace=True)
                    
             # Apply the function to the DataFrame
            db_final_encoded = db_final.map(handle_non_utf8)

            # Convert the DataFrame to JSON
            output = json.loads(db_final_encoded.to_json(orient='records', date_format='iso'))
            return output

        else:
            print("Invalid data type")
            return None
    except Exception as e:
        print(f"Retrieval of data failed: {e}")

In [13]:
get_top_data('tweets')

[{'text': 'ALERT‼️‼️‼️\nThe corona virus can be spread through money. If you have any money at home, put on some gloves, put al… https://t.co/juJjDpFN3I',
  'user_id_str': '2863558530',
  'screen_name': 'nan'},
 {'text': 'THIS MAN IS A GENIUS he figured out the Corona virus problem 😮 https://t.co/EZP7IqTtxV',
  'user_id_str': '1131227186',
  'screen_name': '_AyeeCarlos_'},
 {'text': 'Watch this. It shows why we should all do the right thing and stay home to the fullest extent possible. All of us c… https://t.co/GOODRTNI2e',
  'user_id_str': '813286',
  'screen_name': 'BarackObama'},
 {'text': 'If I gave you 100 skittles and told you 3 of them could kill you.... I’m sure you would avoid the fucking skittles',
  'user_id_str': '29942414',
  'screen_name': 'nan'},
 {'text': 'It wasn’t no corona till y’all started balancing brooms in the house, y’all let the devil in',
  'user_id_str': '219582851',
  'screen_name': 'nan'},
 {'text': 'Corona virus....its coming',
  'user_id_str': '124257578

In [14]:
get_top_data('users')

[{'name': 'Barack Obama',
  'id_str': '813286',
  'screen_name': 'BarackObama',
  'followers_count': 115603427,
  'friends_count': 607612,
  'verified': True,
  'description': 'Dad, husband, President, citizen.',
  'location': 'Washington, DC'},
 {'name': 'Narendra Modi',
  'id_str': '18839785',
  'screen_name': 'narendramodi',
  'followers_count': 55406011,
  'friends_count': 2368,
  'verified': True,
  'description': 'Prime Minister of India',
  'location': 'India'},
 {'name': 'Virender Sehwag',
  'id_str': '92724677',
  'screen_name': 'virendersehwag',
  'followers_count': 20571543,
  'friends_count': 143,
  'verified': True,
  'description': 'Proud Indian | For commercial queries, call my manager @AmritanshuGupta on +91 9873690935 | For @SehwagSchool , call +91 9711188700',
  'location': 'India'},
 {'name': 'detikcom',
  'id_str': '69183155',
  'screen_name': 'detikcom',
  'followers_count': 15884915,
  'friends_count': 28,
  'verified': True,
  'description': 'Official Twitter of 

### search_retweets

In [15]:
def search_retweets(tweet_id):
    try:
        if tweet_id:
            pipeline = [
                {
                    "$match": {
                        "org_tweet_id": tweet_id
                    }
                },
                {
                    "$project": {
                        "id_str":1,
                        "user_id_str": 1,
                        "created_at":1,
                        "text":1
                    }
                }
            ]
            db_final = pd.DataFrame(list(c2.aggregate(pipeline)))
            print(db_final)
            # Fetch user information and add to DataFrame
            for index, row in db_final.iterrows():
                user_id = row['user_id_str']
                cur.execute(f"""SELECT screen_name FROM user_data WHERE id_str = '{user_id}'""")
                user_data = cur.fetchone()
                if user_data:
                    user_info = list(user_data)
                    db_final.loc[index, 'screen_name'] = user_info[0]
                    
            # Drop unnecessary columns
            db_final = db_final.drop(['_id', 'user_id_str'], axis=1)
            db_final_encoded = db_final.map(handle_non_utf8)
            # Convert the DataFrame to JSON
            output = json.loads(db_final_encoded.to_json(orient='records', date_format='iso'))
            return output

    except Exception as e:
        print(f"Retrieval of Tweet from hashtags failed : {e}")

In [16]:
tweet_id="1239696517008482300"
df=search_retweets(tweet_id)
df

Empty DataFrame
Columns: []
Index: []
Retrieval of Tweet from hashtags failed : "['_id', 'user_id_str'] not found in axis"


### search_by_username

In [17]:
def search_by_user_name(user_name=None):
    try:
        cur.execute(f"""SELECT id_str,name,screen_name,verified,followers_count,friends_count FROM user_data WHERE (name LIKE '%{user_name}%') OR (screen_name LIKE '%{user_name}%')""")
        user_data=cur.fetchall()
        columns = [desc[0] for desc in cur.description]
        df = pd.DataFrame(user_data, columns=columns)
        db_final_encoded = df.map(handle_non_utf8)
            
            # Convert the DataFrame to JSON
        output = json.loads(db_final_encoded.to_json(orient='records', date_format='iso'))
        return output
    except Exception as e:
        print(f"Retrieval of Tweet from username failed : {e}")

In [18]:
search_by_user_name('Ben')

[{'id_str': '579373345',
  'name': 'Jacob Bennett',
  'screen_name': 'Red4Jacob',
  'verified': 'False',
  'followers_count': '142',
  'friends_count': '333'},
 {'id_str': '502796617',
  'name': 'Bengt Höjer',
  'screen_name': 'BengtHojer',
  'verified': 'False',
  'followers_count': '8061',
  'friends_count': '5472'},
 {'id_str': '2925375086',
  'name': 'Benin',
  'screen_name': 'Benin_offl',
  'verified': 'False',
  'followers_count': '1166',
  'friends_count': '346'},
 {'id_str': '3106609956',
  'name': 'News18Bangla',
  'screen_name': 'News18Bengali',
  'verified': 'True',
  'followers_count': '24666',
  'friends_count': '298'},
 {'id_str': '1210135033752756224',
  'name': 'Hasan Muhammad',
  'screen_name': 'HasanBenMuhammd',
  'verified': 'False',
  'followers_count': '590',
  'friends_count': '1161'},
 {'id_str': '1072867156046503936',
  'name': 'TNIE Karnataka',
  'screen_name': 'XpressBengaluru',
  'verified': 'True',
  'followers_count': '7536',
  'friends_count': '85'},
 {'id