# Analysing Twitter Data on Insomnia

This is my third-year Computer Science project at the University of Manchester. 

This notebook contains the code that fetches tweets about insomnia matching the chosen keywords through the **Twitter API**, using the tweepy library for Python. Tweets are appended to the existing files, or new files are created if none exist, in both csv and json formats. You can change the file names in the constants section if preferred.

The code was developed using the **Google Colab** platform.

Before running this notebook: 
1. Set the **BASE_PATH** which is the project directory to access the datasets and other files.
2. Obtain the **Bearer Token** to use Twitter API and add it to the .env file in the same directory.
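
The collector class later reads the token with `os.environ.get("BEARER_TOKEN")`, so the `.env` file needs a `BEARER_TOKEN=...` line. The sketch below shows one way to fail early when the variable is missing; `require_env` is a hypothetical helper, not part of the notebook, and the token value is a placeholder.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise a clear error."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"{name} is not set; add it to the .env file in BASE_PATH")
    return value


# Placeholder for demonstration only; in the notebook the value comes from .env
os.environ.setdefault("BEARER_TOKEN", "placeholder-token")

token = require_env("BEARER_TOKEN")
print("Bearer Token loaded:", bool(token))
```

Checking the token before any request is made avoids a less obvious authentication error later in the collection cell.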

Main resources used to build this code:
1. [Tweepy docs](https://docs.tweepy.org/en/stable/) 
2. [Twitter API](https://developer.twitter.com/en/docs/twitter-api) 

© 2023 Lukas Rimkus 

# Connect to the Google Drive

First, mount Google Drive so that the notebook can read and store tweets there. 

If another platform is used to run the notebook, comment this cell out. 

In [21]:
from google.colab import drive, files 

colab_path = '/content/drive'
drive.mount(colab_path)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Install and Import Required Libraries for Tweets Collection

In [22]:
#@title Install Libraries
!pip install python-dotenv
!pip install git+https://github.com/tweepy/tweepy.git  # install most recent version

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting git+https://github.com/tweepy/tweepy.git
  Cloning https://github.com/tweepy/tweepy.git to /tmp/pip-req-build-cdiz2nu9
  Running command git clone --filter=blob:none --quiet https://github.com/tweepy/tweepy.git /tmp/pip-req-build-cdiz2nu9
  Resolved https://github.com/tweepy/tweepy.git to commit 6fde20d61fd21b06408dedc38f542987cf91c1bc
  Preparing metadata (setup.py) ... done


## Import Essential Libraries

In [23]:
import os
import re
import json
import requests

from dotenv import load_dotenv
import tweepy

import pandas as pd
import numpy as np

# Define Constants and Configurations  

**Change BASE_PATH to your own location on Google Drive**.

In [24]:
BASE_PATH = "/content/drive/MyDrive/Third Year Project"

# Load environment variables where API keys are stored
env_path = f"{BASE_PATH}/.env"
load_dotenv(env_path)

NUMBER_OF_MAX_REQUESTS = 1000  # at most 450 requests are allowed per 15 minutes; with wait_on_rate_limit on, the client pauses until the limit resets
NUMBER_OF_TWEETS_IN_REQUEST = 100  # between 10 - 100 (default is 10)

# Words inside double quotes must occur together and in that order, e.g. if the keyword is "sleep pill", then 
# a tweet containing "pill sleep" will not be matched
keywords = '(insomnia OR "sleep deprivation" OR "sleep problem" OR "sleeping problem" OR cantsleep OR "sleep pill" OR "sleeping pill" OR "sleep issue" OR "can’t sleep" OR melatonin OR ambien OR zolpidem OR trazadone OR teamnosleep OR sleepless OR sleepdeprived)'

csv_file_name = "data.csv"
json_last_id_file_name = "start_id.json"
json_file_name = "data.json"
sample_file_name = "sample.json"

csv_path = f"{BASE_PATH}/{csv_file_name}"  # consists of all collected tweets in csv format
json_last_id_path = f"{BASE_PATH}/{json_last_id_file_name}"  # path to the id of the last saved tweet to know from which tweet to collect subsequent tweets
json_file_path = f"{BASE_PATH}/{json_file_name}"  # consists of all collected tweets in json format
sample_file_path = f"{BASE_PATH}/{sample_file_name}"  # consists of a random sample from the dataset

sample_size = 200 

pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', 1500)
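
As a rough worked example of what the two request constants imply, the sketch below estimates the upper bound on collected tweets and the minimum time spent waiting on the rate limit. It assumes every page is full and that the client waits one full 15-minute window each time the 450-request limit is reached; `estimate_collection` is a hypothetical helper, not part of the notebook.

```python
import math


def estimate_collection(max_requests: int, tweets_per_request: int,
                        requests_per_window: int = 450,
                        window_minutes: int = 15) -> tuple:
    """Return (maximum number of tweets, minimum minutes spent waiting)."""
    max_tweets = max_requests * tweets_per_request
    # Number of rate-limit windows needed to send all requests
    windows = math.ceil(max_requests / requests_per_window)
    # The first window needs no waiting; every further one costs a full window
    min_wait_minutes = (windows - 1) * window_minutes
    return max_tweets, min_wait_minutes


max_tweets, min_wait = estimate_collection(1000, 100)
print(max_tweets, min_wait)  # 100000 30
```

With the values used here, a single run can return at most 100,000 tweets and spends at least 30 minutes waiting if all 1000 requests are needed.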

# Data Collection Code 

## Data Collector Definition

This class defines the methods and workflow for fetching tweets that match the chosen keywords, starting from the last saved tweet id. 

In [25]:
class DataCollector:
    """
    This class introduces methods which are used in collecting data from Twitter using 
    its API. 
    """

    def __init__(self, keywords: str, json_last_id_path: str, csv_path: str, json_file_path: str) -> None:
        """
        Constructor to set required objects and variables like paths for data. 
        """
        self.query = self.contruct_query(keywords)  # Construct a query to be made to Twitter to collect data
        self.json_last_id_path = json_last_id_path
        self.csv_path = csv_path
        self.json_file_path = json_file_path
        self.json_last_id_file_exists = os.path.exists(self.json_last_id_path)
        self.csv_file_exists = os.path.exists(self.csv_path)
        self.json_file_exists = os.path.exists(self.json_file_path)

        # Create the tweepy Client object to make requests and set wait on rate limit to True 
        # to be able to fetch all tweets even though it may take longer
        self.client = tweepy.Client(os.environ.get("BEARER_TOKEN"), wait_on_rate_limit=True)   
        self.most_recent_tweet_id = 0

    def contruct_query(self, keywords: str) -> str:
        """
        Define a query to be made to the API.
        Fetch only English tweets and ignore retweets
        """
        query = f"{keywords} lang:en -is:retweet"
        return query 
    
    def update_keywords(self, keywords: str) -> None:
        """
        Set new keywords and reconstruct a query. 
        """
        self.query = self.contruct_query(keywords)

    def collect_data_without_saving(self, limit: int, max_results: int) -> pd.DataFrame:
        """
        Collect data with given parameters and return the dataframe containing fetched data without storing it anywhere locally.  
        """
        tweets, includes = self.fetch_data_from_twitter(limit=limit, max_results=max_results)
        tweets_df = self.construct_tweets_dataframe(tweets, includes)
        return tweets_df

    def collect_data(self, limit: int=400, max_results: int=100) -> pd.DataFrame:
        """
        Collect data with given parameters and return the dataframe containing fetched data.

        Firstly, the last tweet id is obtained to know from which tweet we should continue asking for data.
        Then the data is fetched from the API.
        After that, the data is cleaned and combined to make a dataframe. 
        Finally, the data is stored locally at a chosen location.   
        """
        start_id = self.get_start_id()
        tweets, includes = self.fetch_data_from_twitter(limit=limit, max_results=max_results, start_id=start_id)
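        tweets_df = self.construct_tweets_dataframe(tweets, includes)

        # construct_tweets_dataframe returns None when nothing was fetched,
        # so there is nothing to save in that case
        if tweets_df is not None:
            self.save_data(tweets_df)

        return tweets_df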

    def get_start_id(self) -> int:
        """
        Return the latest collected tweet id, or None if no id has been saved yet. 
        """
        # If the file with the last tweet id does not exist, then we should start collecting
        # data from the oldest tweet possible, which is one week old due to Twitter restrictions. 
        if not self.json_last_id_file_exists:
            return None
        
        with open(self.json_last_id_path, "r") as file:
            data = json.load(file)
        
        start_id = data["start_id"]
        return start_id

    def construct_paginator(self, limit: int=400, max_results: int=100, start_id: int=None) -> tweepy.Paginator:
        """
        This method constructs the Tweepy Paginator object which is responsible for making requests
        to the API to fetch data according to the given parameters. 
        It is iterated over page by page, as a maximum of 100 tweets can be fetched per request.  
        """
        # These fields are requested from the API, e.g. author id, text, publish time and location
        tweet_fields = ["author_id", "geo", "id", "created_at", "text"]
        place_fields = ["full_name", "geo", "id", "country", "country_code"]
        user_fields = ["name", "username", "id", "location"]
        expansions = ["geo.place_id", "author_id"]
    
        paginator = tweepy.Paginator(
            self.client.search_recent_tweets,
            self.query,
            expansions=expansions,
            place_fields=place_fields,
            tweet_fields=tweet_fields,
            user_fields=user_fields,
            max_results=max_results,
            since_id=start_id,
            limit=limit)

        return paginator

    def fetch_data_from_twitter(self, limit: int=400, max_results: int=100, start_id: int=None) -> tuple:
        """
        This method iterates through a Paginator object and returns the fetched data. 
        The includes list consists of extra information like user locations. 
        """
        
        tweets = list()
        includes = list()

        try:
            paginator = self.construct_paginator(limit=limit, max_results=max_results, start_id=start_id)

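            for response in paginator:
                # response.data is None when a page contains no tweets
                if response.data:
                    tweets.extend(response.data)
                # The "users" key is absent when no user data was returned
                includes.extend(response.includes.get("users", []))

                if response.errors:
                    print(f"The API reported errors: {response.errors}")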
        except Exception as e:
            print(f"ERROR! Message: {e}")
        
        return tweets, includes

    def construct_tweets_dataframe(self, tweets: list, includes: list) -> pd.DataFrame:
        """
        This method constructs a dataframe of cleaned tweets from the fetched tweets and the included user data. 
        Returns None if no tweets were fetched.
        """
        if not tweets:
            print("No tweets were found! Try again later!")
            return None

        tweets_df = pd.DataFrame(data=tweets, columns=['author_id', 'id', 'created_at', 'text'])
        includes_df = pd.DataFrame(data=includes)

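        # Handle the case when no location was found for any user
        if 'location' in includes_df.columns:
            # Keep only the columns of interest (copy to avoid modifying a view)
            includes_df = includes_df[['id', 'location']].copy()
        else:
            includes_df = includes_df[['id']].copy()
            includes_df["location"] = np.nan

        # The same user can be returned on several pages, so keep one row per user
        # to avoid duplicating tweets in the merge below
        includes_df = includes_df.drop_duplicates(subset=["id"])
        includes_df = includes_df.rename(columns={"id": "author_id"})

        # Includes dataframe may contain fewer rows than tweets dataframe, thus it is 
        # important to merge them on the author_id to correctly link the data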
        tweets_df = pd.merge(tweets_df, includes_df, on="author_id")

        # Remove the id of the user to comply with the ethics application
        tweets_df.drop(['author_id'], axis=1, inplace=True)

        # Tweets are returned starting from the most recent one, so sort them by id
        # in ascending order to store them from the oldest to the newest. 
        tweets_df.sort_values(by=["id"], inplace = True)
        
        # After sorting, the last row holds the most recent tweet id, which is the
        # starting point (since_id) for the next collection run
        self.most_recent_tweet_id = int(tweets_df["id"].iloc[-1])

        tweets_df.drop(['id'], axis=1, inplace=True)

        # Rename and reorder columns
        tweets_df.rename(columns={"created_at": "Publish Date", "text": "Tweet", "location": "Location"}, inplace=True)
        tweets_df['Location'] = tweets_df['Location'].fillna("")
        tweets_df.insert(1, 'Location', tweets_df.pop("Location"))

        # Preprocess tweets text slightly    
        tweets_df['Tweet'] = tweets_df['Tweet'].apply(lambda tweet: self.simple_preprocessing(tweet))
        
        # Remove tweets which have the same Tweet text which implies that it can 
        # be spam by one or more users
        tweets_df.drop_duplicates(subset=['Tweet'], inplace=True)
        
        return tweets_df

    def simple_preprocessing(self, text: str) -> str:
        """
        This method does slight preprocessing of tweets before storing them. 

        Hyperlinks can be regarded as noise, thus they are removed. However, their 
        position within a tweet can carry some semantic information which is necessary for 
        transformer models, so they are replaced by "url".
        Mentions of other users are removed due to ethics concerns. 
        Carriage returns are replaced by spaces as they caused problems when the data 
        was saved in .csv files. 
        """
        text = re.sub(r'http[s]?://\S+', 'url', text)
        text = re.sub(r'@\S+', '', text)
        text = text.replace('\r', ' ')

        return text

    def save_data(self, tweets_df: pd.DataFrame) -> None:
        """
        This method stores collected and preprocessed tweets in provided locations. 
        """
        # If the file does not yet exist, then it is just created.
        # Otherwise, the existing json file is read, concatenated with the collected tweets 
        # and finally saved to the same location.
        # File existence is checked at save time so that repeated calls on the same
        # object append to the files instead of overwriting them.
        if not os.path.exists(self.json_file_path):
            tweets_df.to_json(self.json_file_path, orient="records", indent=4)
        else:
            json_tweets_df = pd.read_json(self.json_file_path, orient="records")
            
            merged_json_tweets_df = pd.concat([json_tweets_df, tweets_df])
            merged_json_tweets_df.to_json(self.json_file_path, orient="records", indent=4)

        if not os.path.exists(self.csv_path):
            # Create a new file if it does not exist
            tweets_df.to_csv(self.csv_path, index = False, encoding='utf-8')
        else:
            # Append collected tweets to the existing csv file
            tweets_df.to_csv(self.csv_path, mode='a', index=False, header=False, encoding='utf-8')

        # Take the latest tweet id
        start_id_dict = {"start_id": self.most_recent_tweet_id}

        # Save the last Tweet ID to a separate file. 
        with open(self.json_last_id_path, "w") as outfile:
            json.dump(start_id_dict, outfile)

        print(f"{len(tweets_df)} new tweets have been saved")
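
To illustrate what `simple_preprocessing` does to a single tweet, here is a standalone copy of its three steps applied to a made-up example (the tweet text and URL are invented for illustration).

```python
import re


def simple_preprocessing(text: str) -> str:
    """Standalone copy of the three cleaning steps used by DataCollector."""
    text = re.sub(r'http[s]?://\S+', 'url', text)  # replace hyperlinks with "url"
    text = re.sub(r'@\S+', '', text)               # remove user mentions
    text = text.replace('\r', ' ')                 # replace carriage returns
    return text


raw = "@friend can't sleep again, see https://example.com/post\rugh"
cleaned = simple_preprocessing(raw)
print(repr(cleaned))  # " can't sleep again, see url ugh"
```

Note that removing a mention leaves the surrounding whitespace in place, so the cleaned text can start with a space.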

Create the data collector object and fetch new tweets with the configured limits.

In [26]:
data_collector = DataCollector(keywords, json_last_id_path, csv_path, json_file_path)
tweets_df = data_collector.collect_data(limit=NUMBER_OF_MAX_REQUESTS, max_results=NUMBER_OF_TWEETS_IN_REQUEST)

11 new tweets have been saved
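
Only a few new tweets are saved on a repeated run because collection resumes from the stored tweet id, which is passed to the API as `since_id`. The following is a minimal sketch of that resume mechanism using a temporary directory; `load_start_id` and `save_start_id` are hypothetical stand-ins for `get_start_id` and the last step of `save_data`, and the tweet id is made up.

```python
import json
import os
import tempfile
from typing import Optional


def load_start_id(path: str) -> Optional[int]:
    """Return the stored tweet id, or None on the very first run."""
    if not os.path.exists(path):
        return None
    with open(path, "r") as file:
        return json.load(file)["start_id"]


def save_start_id(path: str, tweet_id: int) -> None:
    """Store the id of the most recent collected tweet."""
    with open(path, "w") as outfile:
        json.dump({"start_id": tweet_id}, outfile)


with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "start_id.json")
    first_run = load_start_id(path)           # no file yet
    save_start_id(path, 1639200000000000000)  # made-up tweet id
    second_run = load_start_id(path)          # resumes from the stored id

print(first_run, second_run)
```

On the first run no id is available, so the search starts from the oldest tweet the API allows; later runs request only tweets newer than the stored id.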


In [27]:
tweets_df.head()

| | Publish Date | Location | Tweet |
|---|---|---|---|
| 10 | 2023-03-24 10:14:36+00:00 | Denver, CO | Another sleepless night 😣 |
| 9 | 2023-03-24 10:14:44+00:00 | | and when you can’t sleep at night, you hear my stolen lullabies |
| 8 | 2023-03-24 10:14:49+00:00 | Washington, USA | Can’t sleep, any pork chops looking to be put in their place? \n\nPaypig paysub findom findomme finsub fincuck walletrinse walletdrain |
| 7 | 2023-03-24 10:14:50+00:00 | | i can’t sleep without my baby |
| 6 | 2023-03-24 10:15:11+00:00 | 🏝️🇺🇸🇯🇲 | I’m so stressed I can’t sleep!! |


## Dataset Manual Correction

These functions help to fix anomalies and any other issues that emerge in the collected data. 

In [28]:
def read_csv_dataset(csv_path: str) -> tuple:
    """
    Read the collected tweets csv file and return a (success, dataframe) tuple. 
    """
    file_exists = os.path.exists(csv_path)
    if not file_exists:
        print(f"There is no file at: {csv_path}")
        return False, None

    # Read the dataset
    parse_dates = ["Publish Date"]
    tweets_df = pd.read_csv(csv_path, encoding='utf-8', parse_dates=parse_dates, on_bad_lines="skip") 

    return True, tweets_df


def read_json_dataset(json_path: str) -> tuple:
    """
    Read the collected tweets json file and return a (success, dataframe) tuple. 
    """
    file_exists = os.path.exists(json_path)
    if not file_exists:
        print(f"There is no file at: {json_path}")
        return False, None

    # Read the dataset
    tweets_df = pd.read_json(json_path, orient="records")

    return True, tweets_df


def preprocess_datasets_manually(csv_path: str, json_path: str) -> None:
    """
    Clean tweets in both .csv and .json files as given in parameters. 
    """
    preprocess_csv_dataset_manually(csv_path)
    preprocess_json_dataset_manually(json_path)


def preprocess_csv_dataset_manually(csv_path: str) -> None:
    """
    Preprocess the tweets .csv file from the given path by removing duplicate tweets and tweets with missing text. 
    The cleaned tweets are saved to the same location.
    """
    success, tweets_df = read_csv_dataset(csv_path)
    if not success:
        return 

    number_of_original_tweets = len(tweets_df)
    print(f"Pre-processing: Current Tweet number in the CSV dataset: {number_of_original_tweets}")

    # Remove duplicate tweets
    tweets_df.drop_duplicates(subset=['Tweet'], inplace=True)
    
    # Drop tweets with non-existing values
    tweets_df.dropna(subset=['Tweet'], inplace=True)

    # Save the results
    tweets_df.to_csv(csv_path, index = False, encoding='utf-8')
    number_of_preprocessed_tweets = len(tweets_df)
    print(f"Pre-processing: Tweets number after some CSV pre-processing: {number_of_preprocessed_tweets}")


def preprocess_json_dataset_manually(json_path: str) -> None:
    """
    Preprocess the tweets .json file from the given path by removing duplicate tweets and tweets with missing text. 
    The cleaned tweets are saved to the same location.
    """
    success, tweets_df = read_json_dataset(json_path)
    if not success:
        return 

    number_of_original_tweets = len(tweets_df)
    print(f"Pre-processing: Current Tweet number in the Json dataset: {number_of_original_tweets}")

    # Remove duplicate tweets
    tweets_df.drop_duplicates(subset=['Tweet'], inplace=True)
    
    # Drop tweets with non-existing values
    tweets_df.dropna(subset=['Tweet'], inplace=True)

    # Save the results
    tweets_df.to_json(json_path, orient="records", indent=4)
    number_of_preprocessed_tweets = len(tweets_df)
    print(f"Pre-processing: Tweets number after some Json pre-processing: {number_of_preprocessed_tweets}")


def get_duplicate_tweets(tweets_df: pd.DataFrame) -> pd.DataFrame:
    """
    Return duplicate (the same text) tweets in the given dataframe.
    """
    return tweets_df[tweets_df.duplicated(['Tweet'], keep="first")]


def convert_to_json(csv_path: str) -> None:
    """
    Convert a .csv file to a .json one.
    This was needed because I first stored the data in .csv format and later
    decided to switch to .json. 
    """

    success, tweets_df = read_csv_dataset(csv_path)
    if not success:
        print("ERROR!")
        return 

    json_file_path = f"{BASE_PATH}/data.json"
    tweets_df.to_json(json_file_path, orient="records", indent=4)


In [29]:
preprocess_datasets_manually(csv_path, json_file_path)

Pre-processing: Current Tweet number in the CSV dataset: 927469
Pre-processing: Tweets number after some CSV pre-processing: 927457
Pre-processing: Current Tweet number in the Json dataset: 927480
Pre-processing: Tweets number after some Json pre-processing: 927468
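
The cleaning applied above keeps the first occurrence of each tweet text and drops rows without text. As an illustration of that rule only, here is a plain-Python stand-in working on a list of records; it is not the pandas code the notebook uses, and `drop_duplicates_and_missing` is a hypothetical name.

```python
def drop_duplicates_and_missing(records: list) -> list:
    """Keep the first record for each tweet text and skip records without text."""
    seen = set()
    kept = []
    for record in records:
        text = record.get("Tweet")
        if text is None or text in seen:
            continue
        seen.add(text)
        kept.append(record)
    return kept


records = [
    {"Tweet": "can't sleep", "Location": "A"},
    {"Tweet": "can't sleep", "Location": "B"},  # duplicate text, dropped
    {"Tweet": None, "Location": "C"},           # missing text, dropped
    {"Tweet": "insomnia again", "Location": ""},
]
cleaned_records = drop_duplicates_and_missing(records)
print(len(cleaned_records))  # 2
```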


# Experimenting and Analysing Keywords

I removed the keywords that started with a hashtag (#) in favour of their plain forms, and the keywords that contained another keyword as a subset, such as ambien-cr, which contains ambien. 
I also removed these keywords, mostly because they returned few tweets and those usually contained many ads and other spam: "sleeping pill", lunesta, intermezzo, eszopiclone.
I found that some tweets about insomnia contained the keyword "sleepdeprived", so I added it to the keywords list after the revision. 


In [30]:
initial_keywords = '(insomnia OR "sleep deprivation" OR "sleep problem" OR #insomnia OR #cantsleep OR "sleep pill" OR "sleep issue" OR "can’t sleep" OR melatonin OR ambien OR ambien-cr OR zolpidem OR lunesta OR intermezzo OR trazadone OR eszopiclone OR #teamnosleep OR sleepless)'

revised_keywords = '(insomnia OR "sleep deprivation" OR "sleep problem" OR "sleeping problem" OR cantsleep OR "sleep pill" OR "sleeping pill" OR "sleep issue" OR "can’t sleep" OR melatonin OR ambien OR zolpidem OR trazadone OR teamnosleep OR sleepless OR sleepdeprived)'
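
One way to see exactly what changed between two keyword lists is to split each query on `OR` and compare the resulting sets. The sketch below does this for two shortened example queries; `parse_keywords` is a hypothetical helper, and it assumes the keywords are joined by ` OR ` inside a single pair of parentheses, as in the lists above.

```python
def parse_keywords(query: str) -> set:
    """Split a '(a OR "b c" OR d)' query into a set of plain keywords."""
    inner = query.strip()
    if inner.startswith("(") and inner.endswith(")"):
        inner = inner[1:-1]
    # Drop the surrounding quotes of multi-word phrases
    return {term.strip().strip('"') for term in inner.split(" OR ")}


# Shortened example queries, not the full lists from the notebook
initial = '(insomnia OR #insomnia OR "sleep pill" OR lunesta)'
revised = '(insomnia OR "sleep pill" OR sleepdeprived)'

removed = sorted(parse_keywords(initial) - parse_keywords(revised))
added = sorted(parse_keywords(revised) - parse_keywords(initial))
print(removed)  # ['#insomnia', 'lunesta']
print(added)    # ['sleepdeprived']
```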

In [31]:
def test_specific_keywords(keywords: str, limit: int=2, max_results: int=100):
    """
    Analyse tweets which contain a chosen keyword. 
    I used this to assess whether a keyword is worth adding to the final list of keywords. 
    Note that this changes the query of the global data_collector object.
    """

    # Search last week's tweets for the keyword
    data_collector.update_keywords(keywords)
    tweets_df = data_collector.collect_data_without_saving(limit=limit, max_results=max_results)
    # No dataframe is returned when no tweets were found
    number_of_tweets = 0 if tweets_df is None else len(tweets_df)
    print(f"Number of tweets: {number_of_tweets}")

    return tweets_df


In [32]:
tweets_df = test_specific_keywords(keywords = '("sleepdeprived")')
tweets_df.head()

Number of tweets: 76


| | Publish Date | Location | Tweet |
|---|---|---|---|
| 76 | 2023-03-17 10:53:31+00:00 | New Delhi | We have two moods: Sleep is for the weak. We need to sleep for a week.\n#WorldSleepDay #SleepGoals #SleepDeprived #SleepAwareness #Snooze #SleepMatters #SaffronTech url |
| 75 | 2023-03-17 12:19:47+00:00 | South Holland, The Netherlands | New comic: ’𝓛𝓘𝓚𝓔 𝓐 𝓛𝓞𝓖’ 🌳💤\n\nIf you like this, please hit ‘Like’ 👉💙 🙏\n\n#webcomic #custardfist #webcomics #comic #comics #comicstrip #funny #sleep #sleeping #sleepwell #sleepdeprived #SleepDeprivation #sleepdeprivation #beautysleep #ripvanwinkle #goodsleep url |
| 74 | 2023-03-17 16:01:35+00:00 | Texas, US | Alphacare offers round-the-clock access to their doctors for your health needs. With 24/7 availability, you can consult with Alphacare physicians whenever you need.\n\n#sleepdeprivation #sleepdeprived #sleeptips #sleepbetter #healthysleep #digitalhealth #telemedicine #virtualcare url |
| 73 | 2023-03-17 17:41:29+00:00 | | This sucks ass |
| 72 | 2023-03-17 17:41:59+00:00 | | I like this |


## Generate a Random Sample of Tweets

These tweets will be annotated for Sentiment Analysis.

In [33]:
def generate_sample(json_path: str, sample_size: int) -> pd.DataFrame:
    """
    Generate a random sample of tweets from the json dataset to be used
    for sentiment annotation.
    The resulting sample is saved to sample_file_path defined in the constants section.
    """
    success, tweets_df = read_json_dataset(json_path)
    if not success:
        return 
    
    sample_df = tweets_df.sample(n=sample_size)

    # Add a column with the default value for the sentiment which will
    # be changed during the annotation process
    sample_df["Sentiment"] = 0

    sample_df.to_json(sample_file_path, orient="records", indent=4)
    return sample_df
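
The sample drawn above differs on every run. If a repeatable sample is wanted, the random generator can be seeded; with pandas this is done through the `random_state` parameter of `sample`. The sketch below shows the same idea with the standard library on a list of records; `sample_records` and the seed value are assumptions for illustration, not part of the notebook.

```python
import random


def sample_records(records: list, sample_size: int, seed: int = 42) -> list:
    """Draw a repeatable random sample and add a default Sentiment of 0."""
    rng = random.Random(seed)  # a fixed seed makes the sample repeatable
    chosen = rng.sample(records, k=min(sample_size, len(records)))
    return [{**record, "Sentiment": 0} for record in chosen]


records = [{"Tweet": f"tweet {i}"} for i in range(1000)]
first = sample_records(records, 200)
second = sample_records(records, 200)
print(len(first), first == second)  # 200 True
```

A repeatable sample makes it possible to regenerate the annotation file later without changing which tweets were selected.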


Uncomment the code below to create a random sample of given size.

In [34]:
# sample_df = generate_sample(json_path=json_file_path, sample_size=sample_size)
# sample_df.head(10)