# Modules/APIs


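Ntscraper: Twitter profile scraping<br>
Pytrends: Google Trends data scraping<br>
TweetNLP: Topic classification<br>
Transformers (RoBERTa): Sentiment analysis<br>
FastAPI, Uvicorn, pyngrok: Serving the API to the frontend<br>
Flask, Flask-ngrok: Connecting with frontend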


In [1]:
!pip install ntscraper pytrends tweetnlp fastapi uvicorn nest_asyncio pyngrok flask flask-ngrok

Collecting ntscraper
  Downloading ntscraper-0.3.13-py3-none-any.whl (11 kB)
Collecting pytrends
  Downloading pytrends-4.9.2-py3-none-any.whl (15 kB)
Collecting tweetnlp
  Downloading tweetnlp-0.4.4.tar.gz (54 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.6/54.6 kB[0m [31m813.7 kB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting fastapi
  Downloading fastapi-0.110.1-py3-none-any.whl (91 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.9/91.9 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting uvicorn
  Downloading uvicorn-0.29.0-py3-none-any.whl (60 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.8/60.8 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
Collecting pyngrok
  Downloading pyngrok-7.1.6-py3-none-any.whl (22 kB)
Collecting flask-ngrok
  Downloading flask_ngrok-0.0.25-py3-none-any.whl (3.1 kB)
Collecting ray[tune] (from tweetnlp)
  Downloading ray-2.

# Twitter Profile Scraping Using ntscraper + Sentiment Analysis


Setting up ntscraper


In [2]:
import pandas as pd
import time
from ntscraper import Nitter

In [3]:
# Create the shared Nitter scraper object. The scraper.get_tweets and
# scraper.get_profile_info calls later in this notebook all go through it.
# The recorded output shows it testing Nitter instances when constructed.
scraper = Nitter()

Testing instances: 100%|██████████| 77/77 [01:17<00:00,  1.00s/it]


Setting up Sentiment Analysis tool : RoBERTa

In [4]:
#Imports
import requests
from transformers import AutoModelForSequenceClassification
from transformers import TFAutoModelForSequenceClassification
from transformers import AutoTokenizer, AutoConfig
import numpy as np
from scipy.special import softmax

In [5]:
# Preprocess text (username and link placeholders)
def preprocess(text):
    """Replace user mentions and links in a tweet with fixed placeholders.

    The text is split on single spaces. A token that starts with '@' and is
    longer than one character becomes '@user'; a token that starts with
    'http' becomes 'http'. All other tokens are kept unchanged, and the
    tokens are re-joined with single spaces.
    """
    def _mask(token):
        # Mentions are checked first; a bare '@' is left untouched.
        if len(token) > 1 and token.startswith('@'):
            return '@user'
        if token.startswith('http'):
            return 'http'
        return token

    return " ".join(_mask(token) for token in text.split(" "))


# Load the pretrained sentiment checkpoint. All three objects below come from
# the same checkpoint name and are used by give_sentiment_score:
#   tokenizer - turns text into model inputs
#   config    - provides id2label, the mapping from class index to label name
#   model     - the sequence-classification model itself
MODEL = f"cardiffnlp/twitter-roberta-base-sentiment-latest"
tokenizer = AutoTokenizer.from_pretrained(MODEL)
config = AutoConfig.from_pretrained(MODEL)
# PT: presumably "PyTorch" - this uses AutoModelForSequenceClassification,
# not the TFAutoModelForSequenceClassification class that is also imported.
model = AutoModelForSequenceClassification.from_pretrained(MODEL)

Downloading config.json:   0%|          | 0.00/929 [00:00<?, ?B/s]

Downloading vocab.json:   0%|          | 0.00/878k [00:00<?, ?B/s]

Downloading merges.txt:   0%|          | 0.00/446k [00:00<?, ?B/s]

Downloading special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/478M [00:00<?, ?B/s]

Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [36]:
#SENTIMENT ANALYSIS FUNCTION

#Gives Sentiment Score to an individual tweet

def give_sentiment_score(tweet_text):
  """Score the sentiment of a single tweet on a 0-10 scale.

  The text is normalised with ``preprocess``, classified by the RoBERTa
  sentiment model, and the softmax probability ``p`` of the winning label
  is mapped around a neutral midpoint of 5:

    * 'positive' -> 5 + 5 * p   (between 5 and 10)
    * 'negative' -> 5 - 5 * p   (between 0 and 5)
    * 'neutral'  -> 5

  Args:
    tweet_text: Raw tweet text.

  Returns:
    The sentiment score, or None if the model reports a label other than
    'positive', 'negative' or 'neutral'.
  """
  text = preprocess(tweet_text)

  # Truncate explicitly: RoBERTa-base accepts at most 512 tokens, and a
  # longer input (e.g. a long-form post) would otherwise make the forward
  # pass fail instead of producing a score.
  encoded_input = tokenizer(text, return_tensors='pt', truncation=True, max_length=512)
  output = model(**encoded_input)
  scores = softmax(output[0][0].detach().numpy())

  # Class indices ordered from most to least probable.
  ranking = np.argsort(scores)[::-1]
  label = config.id2label[ranking[0]]
  score = scores[ranking[0]]

  if label == 'positive':
    return 5 + (5 * score)
  if label == 'negative':
    return 5 - (5 * score)
  if label == 'neutral':
    return 5

  # Unknown label: keep the original fall-through result, but make it explicit.
  return None

Setting up the Topic Classification tool

In [7]:
#imports
import tweetnlp

In [8]:
# Load the tweetnlp topic classification model used by give_topic_classification.
# The recorded output shows the model files being downloaded when this cell runs.
topic_classification_model = tweetnlp.TopicClassification()

Downloading config.json:   0%|          | 0.00/1.96k [00:00<?, ?B/s]

Downloading tokenizer_config.json:   0%|          | 0.00/354 [00:00<?, ?B/s]

Downloading vocab.json:   0%|          | 0.00/780k [00:00<?, ?B/s]

Downloading merges.txt:   0%|          | 0.00/446k [00:00<?, ?B/s]

Downloading tokenizer.json:   0%|          | 0.00/1.29M [00:00<?, ?B/s]

Downloading special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/476M [00:00<?, ?B/s]

In [9]:
#TOPIC CLASSIFICATION FUNCTION

#Classifies/predicts the topic of an individual tweet

def give_topic_classification(tweet_text):
  """Predict the topic of a single tweet.

  Args:
    tweet_text: Raw tweet text passed to the tweetnlp topic model.

  Returns:
    The first label predicted by the model, or "" when the model predicts
    no label or the prediction fails (the failure is printed).
  """
  try:
    topic = topic_classification_model.predict(tweet_text)
    labels = topic['label']
    return labels[0] if labels else ""
  except Exception:
    # Catch Exception rather than using a bare except, so that
    # KeyboardInterrupt / SystemExit still stop a long scraping run.
    print("An error occured while getting the topic.")
    # Return the same "no topic" value as the empty-label case instead of an
    # implicit None, so callers always receive a string.
    return ""

Combining Scraper + Sentiment + Topic Classification

In [10]:
def get_tweets(user, mode, no_of_tweets):
  """Scrape tweets and return them as a DataFrame with sentiment and topic.

  The scraper is queried once; while it returns an empty tweet list it is
  queried again, up to 8 more times, waiting 5 seconds before each retry.
  Every tweet is then scored with ``give_sentiment_score`` and labelled with
  ``give_topic_classification``.

  Args:
    user: Passed to ``scraper.get_tweets`` as the search subject.
    mode: Passed to ``scraper.get_tweets`` as ``mode``.
    no_of_tweets: Passed to ``scraper.get_tweets`` as ``number``.

  Returns:
    On success, a DataFrame with the columns 'Tweet Link', 'Date/Time',
    'Text', 'Sentiment Score', 'Tweet Topic', 'Comments', 'Likes',
    'Retweets' and 'Quotes'.
    This function does not raise on failure: an IndexError yields a 1-tuple
    holding a "wrong username" message, and any other exception is returned
    as the exception object itself.
  """
  max_tries = 8
  columns = ['Tweet Link', 'Date/Time' , 'Text', "Sentiment Score", "Tweet Topic" , 'Comments' ,'Likes' ,  'Retweets' , 'Quotes']

  try:
    tweets = scraper.get_tweets(user , mode=mode , number=no_of_tweets)

    # Retry only while the scraper keeps returning nothing. The pause comes
    # before each retry, so a successful fetch is never followed by a sleep.
    tries = 0
    while tweets['tweets'] == [] and tries < max_tries:
      time.sleep(5)
      tweets = scraper.get_tweets(user , mode=mode , number=no_of_tweets)
      tries += 1

    final_tweets = []
    for tweet in tweets['tweets']:
      text = tweet['text']
      stats = tweet['stats']
      sentiment = give_sentiment_score(text)
      topic = give_topic_classification(text)
      final_tweets.append([
        tweet['link'], tweet['date'], text, sentiment, topic,
        stats['comments'], stats['likes'], stats['retweets'], stats['quotes'],
      ])

    return pd.DataFrame(final_tweets, columns=columns)

  # The most specific handler must come first: placed after
  # `except Exception` it could never run.
  except IndexError:
    return ("You may have entered the wrong Username. Please recheck and try again",)

  except Exception as e:
    return e


In [37]:
# Manual check: request 25 tweets for "Cristiano" in 'user' mode and display the result
get_tweets("Cristiano", 'user', 25)


Getting the Final Sentiment Score:

In [11]:
#returns mean of 'Sentiment Score' column in dataframe

def get_sentiment_score(df):
  """Return the mean of the 'Sentiment Score' column, rounded to one decimal."""
  column_mean = df['Sentiment Score'].mean()
  return np.round(column_mean, 1)

In [12]:
# Spot check of the sentiment scorer on a single sentence
give_sentiment_score('AI will destroy the world')

{0: 'negative', 1: 'neutral', 2: 'positive'}
[0.77728945 0.18404053 0.03867   ]
negative
0.77728945


1.113552749156952

Getting the Topic Classification list

In [13]:
#returns the 3 most frequently occurring topics of the 'Tweet Topic' column in the dataframe

def get_topic_classification(df):
  """Return up to three of the most frequent values in the 'Tweet Topic' column.

  The four most frequent topics are taken first. If the empty-string topic
  is among them it is dropped and the remainder is returned; otherwise the
  first three are returned.
  """
  ranked_topics = df['Tweet Topic'].value_counts().index[:4].tolist()

  if "" not in ranked_topics:
    return ranked_topics[:3]

  ranked_topics.remove("")
  return ranked_topics




# Engagement Analysis

Setting up the Engagement Score Function

In [14]:

#ENGAGEMENT SCORE FUNCTION

#Uses a simple mathematical model to calculate engagement score.
#Each feature has its own weightage in the equation to give a balanced score.

def get_engagement_score(df):
  """Compute an engagement score from the tweet statistics in ``df``.

  For each of the columns 'Likes', 'Comments', 'Retweets' and 'Quotes' the
  position of the column mean between the column minimum and maximum is
  mapped onto 1..10, multiplied by the feature weight (1, 3, 3 and 4) and
  capped at 10. The four capped values are summed, divided by 4 and rounded
  to one decimal.

  Args:
    df: DataFrame containing the four numeric columns named above.

  Returns:
    The rounded score. If the calculation raises (for example because a
    column is missing), a 1-tuple holding an error message is returned
    instead.
  """
  features = {'Likes' : 1 , 'Comments' : 3, 'Retweets' : 3 , 'Quotes' : 4 }

  try:
    score = 0

    for feature, weight in features.items():
      column = df[feature]
      feature_min = column.min()
      spread = column.max() - feature_min

      if spread == 0:
        # Every tweet has the same value (e.g. no quotes at all). The
        # min-max formula would divide 0 by 0 and turn the whole score into
        # NaN, so the mean is placed at the bottom of the range instead.
        position = 0.0
      else:
        position = (column.mean() - feature_min) / spread

      score += min(weight * ((position * 9) + 1), 10)

    return np.round(score/4 , 1)

  except Exception:
    return ("An error occured while calculating the engagement score. Please try again later.",)






# Popularity Trend Analysis

In [15]:
#Imports
from pytrends.request import TrendReq
import matplotlib.pyplot as plt

In [16]:
#Loading API tool
# pytrends = TrendReq(hl='en-US', tz=360)

Trend Score Function

In [17]:
#Takes slope of the average trend line to return a score

def get_trend_score(coefficients):
  """Return ten times the first coefficient (the linear slope).

  NOTE: a later cell in this notebook defines
  ``get_trend_score(search_text, timeframe)``; once that cell runs it
  replaces this definition.
  """
  return coefficients[0] * 10

Google Trends Plots Function

In [18]:
import matplotlib.pyplot as plt
import numpy as np
from pytrends.request import TrendReq
import base64
from PIL import Image
import io

In [35]:
def get_trend_score(search_text, timeframe):
  """Fetch Google Trends interest for a keyword, plot it and score its trend.

  Args:
    search_text: List of keywords handed to pytrends. Only the first keyword
      is plotted and scored. A bare string is treated as a single keyword.
    timeframe: pytrends timeframe string, e.g. "today 3-m".

  Returns:
    A dict with
      "trend_score": slope of the linear fit times 10, clamped to
        [-10, 10] and rounded to one decimal;
      "encoded_plot": the plot as a base64-encoded JPEG string.
    Returns None if anything fails; the error is printed.

  Side effects:
    Shows the plot and writes ``trend_plot{timeframe}.png`` to the current
    working directory.
  """
  # A bare string would otherwise be indexed character by character below.
  if isinstance(search_text, str):
    search_text = [search_text]

  fig = None
  try:
    pytrends = TrendReq(hl='en-US', tz=180)
    profile_name = search_text[0]

    pytrends.build_payload(search_text, cat=0, timeframe=str(timeframe), geo='', gprop='')
    chart_data = pytrends.interest_over_time()

    # Fit trendlines with numpy.polyfit over the sample index.
    x_values = np.arange(len(chart_data))
    y_values = chart_data[f"{profile_name}"].values
    degree_linear = 1  # Linear trendline
    degree_polynomial = 2  # Quadratic (degree-2 polynomial) trendline

    coefficients = np.polyfit(x_values, y_values, degree_linear)
    coefficients2 = np.polyfit(x_values, y_values, degree_polynomial)

    trendline = np.poly1d(coefficients)
    trendline2 = np.poly1d(coefficients2)

    # The figure is created only after the data has been fetched and fitted,
    # so a failed request does not leave an empty figure behind.
    fig, ax = plt.subplots(figsize=(6, 3))

    ax.plot(chart_data.index, chart_data[f"{profile_name}"], label=f"{profile_name} Data", marker='')
    ax.plot(chart_data.index, trendline(x_values), label='Trendline', linestyle='--', color='red')
    ax.plot(chart_data.index, trendline2(x_values), label='Trendline', linestyle='-.', color='green')

    # Trend score: linear slope scaled by 10 and clamped to [-10, 10].
    trend_score = max(min(coefficients[0] * 10, 10), -10)

    ax.set_xlabel('Date')
    ax.set_ylabel(f"{profile_name}")
    ax.set_title(f"Timeframe: {timeframe}")

    # Display the plot.
    plt.tight_layout()
    plt.show()

    # Save the plot as PNG, then re-encode it as a smaller JPEG for transport.
    filename = f"trend_plot{timeframe}"
    fig.savefig(f'{filename}.png', format='png')

    buffer = io.BytesIO()
    with Image.open(f'{filename}.png') as png_image:
      png_image.convert("RGB").save(buffer, format="JPEG", quality=85)  # Adjust quality for further compression

    encoded_string = base64.b64encode(buffer.getvalue()).decode('utf-8')

    return {
            "trend_score": np.round(trend_score , 1),
            "encoded_plot": encoded_string
        }

  except Exception as e:
    print(e)
    print("An error occurred while loading the charts. Please try again later.")
    return None

  finally:
    # Release the figure on success and on failure. This function is retried
    # and called per request, so unclosed figures would accumulate in memory.
    if fig is not None:
      plt.close(fig)


In [20]:
def _fetch_trend_with_retries(search_text, timeframe, max_tries=8, delay_seconds=5):
  """Call get_trend_score until it returns a result or max_tries attempts fail."""
  for attempt in range(max_tries):
    result = get_trend_score(search_text, timeframe)
    if result is not None:
      return result
    # Wait only between failed attempts; a successful fetch returns at once
    # and the last failed attempt is not followed by a pointless sleep.
    if attempt < max_tries - 1:
      time.sleep(delay_seconds)
  return None


def get_popularity_trend_score(search_text):
  """Collect the short-term and long-term Google Trends results for a keyword.

  ``get_trend_score`` is called for the "today 3-m" timeframe and then for
  the "today 12-m" timeframe. Each call is attempted up to 8 times, with a
  5 second pause between failed attempts.

  Args:
    search_text: Keyword list forwarded unchanged to ``get_trend_score``.

  Returns:
    A dict with the keys 'short_term_trend' and 'long_term_trend'. Each
    value is the dict returned by ``get_trend_score``, or None if every
    attempt for that timeframe failed.
  """
  timeframe_list = ["today 3-m", "today 12-m"]

  trend_data = {}
  trend_data['short_term_trend'] = _fetch_trend_with_retries(search_text, timeframe_list[0])
  trend_data['long_term_trend'] = _fetch_trend_with_retries(search_text, timeframe_list[1])
  return trend_data



In [21]:
# timeframe_list = ["today 3-m", "today 12-m"]


# test = get_popularity_trend_score(["Cristiano Ronaldo"], timeframe_list )


In [22]:
# print(test)
# print(test['long_term_trend']['trend_score'])

# Profile Name and Details

In [23]:
# Sample username; referenced by the commented-out run_app test call later in the notebook
test_username = "Cristiano"

Getting Profile Info

In [24]:
import time

def get_profile_info(username):
    """Fetch profile information for ``username`` through the Nitter scraper.

    The scraper is queried up to 8 times, with a 5 second pause between
    failed attempts, until it returns something other than None.

    Args:
        username: Profile name passed to ``scraper.get_profile_info``.

    Returns:
        Whatever the scraper returned on the first successful attempt.
        On failure a message string is returned instead of raising: one
        message when every attempt returned None, another when the scraper
        raised an exception.
    """
    max_tries = 8
    try:
        for attempt in range(max_tries):
            profile_info = scraper.get_profile_info(username)
            if profile_info is not None:
                return profile_info
            # Pause only before another attempt, never after a success or
            # after the final failure.
            if attempt < max_tries - 1:
                time.sleep(5)

        return "Failed to fetch profile info after max retries, recheck username or try again in some time"

    except Exception:
        return "Failed to fetch profile info, recheck username or try again in some time"







Getting Profile Class

In [25]:
def get_profile_class(followers):
  """Map a follower count to an influencer tier name.

  Each tier is defined by an exclusive upper bound on the follower count;
  counts of 1,000,000 and above (or anything that is below no bound) are
  classed as "Mega Influencer".
  """
  tiers = (
    (15000, "Micro Influencer"),
    (50000, "Regular Influencer"),
    (100000, "Rising Influencer"),
    (500000, "Major Influencer"),
    (1000000, "Macro Influencer"),
  )

  for upper_bound, tier_name in tiers:
    if followers < upper_bound:
      return tier_name

  return "Mega Influencer"

# Final App Function

In [26]:
import time

def run_app(username, mode, no_of_tweets):
  """Run the full profile analysis and collect the results in one dict.

  Steps: fetch the profile and its influencer class, scrape and score the
  tweets, compute the sentiment and engagement scores, pick the main topics,
  and fetch the Google Trends popularity data for the profile's display name.

  Args:
    username: Twitter username to analyse.
    mode: Accepted for interface compatibility. It is currently not used:
      tweets are always fetched in 'user' mode.
    no_of_tweets: Number of tweets to analyse; lowered to the profile's
      total tweet count when that is smaller.

  Returns:
    A dict with the keys 'profile_info', 'profile_class', 'sentiment_score',
    'engagement_score', 'topics_of_discussion' and 'popularity_trend_score'.
    Returns None if any step fails; the error is printed.
  """
  data = {}

  try:
    # Profile info and class. get_profile_info reports failure by returning
    # a message string, so that case is turned into a clear error here
    # instead of failing later with an indexing TypeError.
    profile_info = get_profile_info(username)
    if isinstance(profile_info, str):
      raise ValueError(profile_info)
    print("profile fetched!")

    data['profile_info'] = profile_info
    data['profile_class'] = get_profile_class(profile_info['stats']['followers'])

    # Never ask for more tweets than the profile has.
    if profile_info['stats']['tweets'] < no_of_tweets:
      no_of_tweets = profile_info['stats']['tweets']

    print(f"{username} , {no_of_tweets}")

    # Scrape the tweets, retrying when too few come back. get_tweets signals
    # failure by returning a non-DataFrame value; checking the type keeps
    # those failures inside the retry loop instead of aborting on `.shape`.
    df = None
    max_tries = 3
    for _ in range(max_tries):
      df = get_tweets(username, 'user', no_of_tweets)
      if isinstance(df, pd.DataFrame) and df.shape[0] >= no_of_tweets:
        break

    if not isinstance(df, pd.DataFrame):
      raise RuntimeError(f"Could not fetch tweets: {df}")

    print("Fetched tweets")

    # Sentiment and engagement scores
    data['sentiment_score'] = get_sentiment_score(df)
    data['engagement_score'] = get_engagement_score(df)

    # Topic classification
    data['topics_of_discussion'] = get_topic_classification(df)

    # Popularity trend plots and scores, searched by the profile's display name
    search_term = [profile_info['name']]
    data['popularity_trend_score'] = get_popularity_trend_score(search_term)

    return data

  except Exception as e:
    print(e)
    print("encountered error fetching data. Please recheck username or try again in 30 seconds.")
    return None



In [27]:
# data = run_app(test_username , 'user', 20)
# print(data)

In [28]:
# print(data)

# print(data['profile_info']['stats']['tweets'])

# print(data['popularity_trend_score']['long_term_trend']['trend_score'])

{'profile_info': {'image': 'https://pbs.twimg.com/profile_images/1594446880498401282/o4L2z8Ay_400x400.jpg', 'name': 'Cristiano Ronaldo', 'username': '@Cristiano', 'id': '155659213', 'bio': 'Welcome to the official Twitter / X page of Cristiano Ronaldo.', 'location': '', 'website': 'https://www.cristianoronaldo.com/', 'joined': '7:09 PM - 14 Jun 2010', 'stats': {'tweets': 4034, 'following': 69, 'followers': 110784639, 'likes': 14, 'media': 0}},

'profile_class': 'Mega Influencer',

'sentiment_score': 8.91,

'engagement_score': 9.02,

'topics_of_discussion': ['diaries_&daily_life', 'sports', 'news&_social_concern'],

'popularity_trend_score': {'short_term_trend': {'trend_score': 0.25915879352064186, 'figure': <Figure size 600x300 with 1 Axes>}, 'long_term_trend': {'trend_score': 0.3014601601181821, 'figure': <Figure size 600x300 with 1 Axes>}}}

In [None]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
import socket
import os
import logging

# Configure logging: root logger at INFO level (used by the endpoint error
# handlers and the port check below)
logging.basicConfig(level=logging.INFO)

# Define a Pydantic model for the request body
class RunAppRequest(BaseModel):
    """JSON request body of POST /run_app; the fields are forwarded to run_app."""
    username: str  # forwarded as run_app's `username`
    mode: str  # forwarded as run_app's `mode`
    no_of_tweets: int  # forwarded as run_app's `no_of_tweets`

class RunTrendsRequest(BaseModel):
    """JSON request body of POST /run_trends."""
    search_text: str  # the term to look up


# The FastAPI application; the @app.post routes below register on it and
# uvicorn serves it.
app = FastAPI()

# Add CORS middleware to allow requests from your React frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "http://localhost:3001"],  # Specify the allowed origin for your frontend
    allow_credentials=True,
    allow_methods=["POST"],  # Restrict to only POST method
    allow_headers=["Content-Type"],  # Only the Content-Type request header is allowed
)

@app.post("/run_app")
async def api_run_app(request: RunAppRequest):
    """POST /run_app: run the full profile analysis for the requested user.

    Prints a short description of the request, then returns whatever
    ``run_app`` returns. Any exception is logged and reported to the client
    as HTTP 500 "Internal server error".
    """
    try:
        banner = {"message": f"Running app for {request.username} in {request.mode} mode with {request.no_of_tweets} tweets."}
        print(banner)
        return run_app(request.username, request.mode, request.no_of_tweets)
    except Exception as exc:
        logging.error("Error in /run_app endpoint: " + str(exc))
        raise HTTPException(status_code=500, detail="Internal server error")


@app.post("/run_trends")
async def api_run_trends(request: RunTrendsRequest):
    """POST /run_trends: return the popularity trend data for a search term.

    Returns the dict produced by ``get_popularity_trend_score``. Any
    exception is logged and reported to the client as HTTP 500
    "Internal server error".
    """
    try:
        # The request model's field is `search_text`; reading a non-existent
        # `search_term` attribute made every call fail with a 500.
        # The trend functions expect a list of keywords, so the single term
        # is wrapped in a list.
        return get_popularity_trend_score([request.search_text])

    except Exception as e:
        logging.error(f"Error in /run_trends endpoint: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal server error")

# Function to check if a port is in use
def is_port_in_use(port):
    """Return True if a TCP connection to localhost:port succeeds."""
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex returns 0 on success and an error code otherwise.
        status = probe.connect_ex(('localhost', port))
    finally:
        probe.close()
    return status == 0

# Function to terminate the process using a specific port
def terminate_process_on_port(port):
    """Kill (``kill -9``) every process that ``lsof`` lists for the given port.

    Runs a shell pipeline through ``os.system``; its exit status is ignored.
    """
    os.system(f"lsof -t -i:{port} | xargs kill -9")

# Run the app using uvicorn in a way that's compatible with Jupyter
if __name__ == "__main__":
    import nest_asyncio
    from threading import Thread

    nest_asyncio.apply()  # Apply the fix for the asyncio event loop

    # Port uvicorn listens on; the ngrok cell below tunnels to the same port.
    PORT = 8000

    # Check if the port is in use and terminate the process if necessary
    if is_port_in_use(PORT):
        logging.warning(f"Port {PORT} is in use. Attempting to terminate the process using it.")
        terminate_process_on_port(PORT)

    def start_server():
        """Serve the FastAPI app with uvicorn on all interfaces at PORT."""
        uvicorn.run(app, host="0.0.0.0", port=PORT)

    # Start uvicorn on a separate thread; this cell only starts the thread
    # and does not wait for the server to finish.
    server_thread = Thread(target=start_server)
    server_thread.start()


In [None]:
from pyngrok import ngrok

# Read the ngrok auth token from the NGROK_AUTHTOKEN environment variable, or
# prompt for it, instead of hardcoding it: a token saved in a notebook is
# readable by everyone who can open the file or its history.
# NOTE(review): any token that was ever committed in this cell should be
# revoked in the ngrok dashboard.
import os
from getpass import getpass

ngrok_auth_token = os.environ.get("NGROK_AUTHTOKEN") or getpass("ngrok auth token: ")
ngrok.set_auth_token(ngrok_auth_token)

# NOTE(review): pyngrok documents disconnect() as taking a tunnel's public
# URL; confirm that passing the port number closes an existing tunnel.
ngrok.disconnect(8000)

# Open a tunnel to the local API server on port 8000
public_url = ngrok.connect(8000)

print(public_url)


INFO:pyngrok.process:Updating authtoken for default "config_path" of "ngrok_path": /usr/local/lib/python3.10/dist-packages/pyngrok/bin/ngrok
INFO:pyngrok.ngrok:Opening tunnel named: http-8000-e30cb2ec-19af-46ec-910c-1d652e0b7fbb
INFO:pyngrok.process.ngrok:t=2024-04-12T08:32:32+0000 lvl=info msg="no configuration paths supplied"
INFO:pyngrok.process.ngrok:t=2024-04-12T08:32:32+0000 lvl=info msg="using configuration at default config path" path=/root/.config/ngrok/ngrok.yml
INFO:pyngrok.process.ngrok:t=2024-04-12T08:32:32+0000 lvl=info msg="open config file" path=/root/.config/ngrok/ngrok.yml err=nil
INFO:pyngrok.process.ngrok:t=2024-04-12T08:32:32+0000 lvl=info msg="starting web service" obj=web addr=127.0.0.1:4040 allow_hosts=[]
INFO:pyngrok.process.ngrok:t=2024-04-12T08:32:33+0000 lvl=info msg="client session established" obj=tunnels.session
INFO:pyngrok.process.ngrok:t=2024-04-12T08:32:33+0000 lvl=info msg="tunnel session started" obj=tunnels.session
INFO:pyngrok.process.ngrok:t=2024

NgrokTunnel: "https://138d-34-80-82-37.ngrok-free.app" -> "http://localhost:8000"


INFO:pyngrok.process.ngrok:t=2024-04-12T08:32:33+0000 lvl=info msg="started tunnel" obj=tunnels name=http-8000-e30cb2ec-19af-46ec-910c-1d652e0b7fbb addr=http://localhost:8000 url=https://138d-34-80-82-37.ngrok-free.app
INFO:pyngrok.process.ngrok:t=2024-04-12T08:32:33+0000 lvl=info msg=end pg=/api/tunnels id=97afad3095740292 status=201 dur=184.646445ms
