# First protocol
_Code was written for and runs on Python 3.11.0. Modify environment variables and queries as needed._  
_Please use venv_

## Protocol
· Start with keywords:

- Smartchain

- Nft

- Airdrop

- Crypto

- …etc.

1. Sample up to 10k tweets containing at least one keyword from each of 100 random hours in the past year (so up to 1M tweets in total)

2. Determine the most engaged-with (top) users from this combined sample (100 or 1000)

3. Pull up to 1000 comments for each top user

4. Determine top users whose comments mention at least three users other than the top user

5. Expand top user sample if we don’t have at least 100 airdrop seeders

6. Time series chart plots:

7. Top user activity

8. Airdrop seeder activity

9. Negative reaction activity? (based on sentiment analysis of replies to airdrop messages)

10. External crypto value signals (from where?)



# Dependencies
Run the following commands in the terminal to install the required packages

$pip install requests  
  
$pip install pandas  
  
$pip install datetime (not required: `datetime` is part of the Python standard library)  
  
$pip install python-dateutil
  

--------------------  

Crypto packages/APIs:
https://medium.com/codex/10-best-resources-to-fetch-cryptocurrency-data-in-python-8400cf0d0136

https://www.alphavantage.co/documentation/


Financial tools/packages:
https://twitter.com/pyquantnews/status/1568029967052640256?t=EthvrNWmYhAFDVOhRoDxrQ&s=03

https://pmorissette.github.io/ffn/quick.html#data-retrieval


# Authentication step
In the code cell below, replace the empty string with your bearer token. Run the cell, then delete your bearer token from the cell.
This stores the token in an environment variable named TOKEN. The token can then be removed from the notebook so that others do not have access to it when the code is shared via GitHub. I plan to switch this to dotenv and a .gitignore file later; I just haven't done that yet.

In [2]:
import os
# Paste your bearer token between the quotes, run this cell, then clear the value again before sharing the notebook.
os.environ['TOKEN'] = ''

In [38]:
import json
import pandas as pd
import csv
from typing import Optional
import time
import datetime
import random
import requests


"""This code creates functions to be used for authentication as well as creating endpoints."""
def auth():
    """Return the bearer token stored in the ``TOKEN`` environment variable, or ``None`` if it is not set."""
    token = os.environ.get('TOKEN')
    return token

def create_headers(bearer_token):
    """Build the ``Authorization`` header dict used to authenticate an API request with a bearer token."""
    return {"Authorization": f"Bearer {bearer_token}"}


def create_full_search_url(keyword: str, start_date: str, end_date: str, max_results: int = 100):
    """Build the URL and query parameters for a full-archive tweet search.

    Args:
        keyword: The search query string (placed under the 'query' parameter).
        start_date: Start of the time window, passed through as 'start_time'.
            Callers in this file pass a single timestamp string, not a list.
        end_date: End of the time window, passed through as 'end_time'.
        max_results: Page size requested from the endpoint.

    Returns:
        A ``(search_url, query_params)`` tuple. ``query_params['next_token']`` is a
        ``None`` placeholder that the caller overwrites when paginating.
    """
    search_url: str = "https://api.twitter.com/2/tweets/search/all"

    # change params to desired params
    query_params: dict = {
        'query': keyword,
        'start_time': start_date,
        'end_time': end_date,
        'max_results': max_results,
        'expansions': 'referenced_tweets.id.author_id',
        'tweet.fields': 'id,author_id,conversation_id,created_at,in_reply_to_user_id,lang,public_metrics,referenced_tweets,source,text',
        #'user.fields': 'id,name,public_metrics,username,verified',
        #'place.fields': 'country',
        # None (rather than an empty dict) is the "no token yet" placeholder;
        # requests leaves None-valued parameters out of the query string.
        'next_token': None,
    }
    return (search_url, query_params)

def search_tweet_author_id_url(id: str):
    """Build the URL and query parameters for looking up the author id of the tweet with the given id.

    Returns a ``(search_url, query_params)`` tuple.
    """
    # change the requested fields here if more than the author id is needed
    fields_to_request: dict = {'tweet.fields': 'author_id'}
    return (f"https://api.twitter.com/2/tweets/{id}", fields_to_request)

def connect_to_endpoint(url, headers, params, next_token = None):
    """Send a GET request to ``url`` and return the decoded JSON response.

    Args:
        url: Endpoint URL, as produced by one of the url creation functions.
        headers: Request headers (including the bearer-token authorization).
        params: Query parameters from the same url creation function. This
            dict is NOT modified; a copy carrying the pagination token is sent.
        next_token: Optional pagination token; sent as the 'next_token' parameter.

    Returns:
        The JSON body of the response, decoded by ``response.json()``.

    Raises:
        Exception: With ``(status_code, response_text)`` when the status code is not 200.
    """
    # Build a fresh dict instead of writing into the caller's params, so a token
    # from one call can never linger in a dict that is reused for another call.
    request_params = {**params, 'next_token': next_token}
    response = requests.get(url, headers=headers, params=request_params)
    print("\nEndpoint Response Code: " + str(response.status_code)) # prints the enpoint response code for debugging help
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()


"""This code cell contains two functions (is_leap_year and random_date) which help generate a random one hour date range when random_date() is called"""
def is_leap_year(year: int):
    """Return True when ``year`` (four digit form, e.g. 2023) is a leap year, otherwise False.

    A year is a leap year when it is divisible by 4, except for century years,
    which must also be divisible by 400.
    """
    divisible_by_4 = year % 4 == 0
    is_century = year % 100 == 0
    return divisible_by_4 and (not is_century or year % 400 == 0)

def sort_timestamps(timestamps: list):
    """Return the given ISO-format timestamps in chronological order.

    Each entry is parsed with ``datetime.fromisoformat``, the parsed values are
    sorted ascending, and every value is rendered back as ``YYYY-MM-DDTHH:MM:SSZ``.
    """
    chronological = sorted(datetime.datetime.fromisoformat(stamp) for stamp in timestamps)
    return [moment.strftime("%Y-%m-%dT%H:%M:%SZ") for moment in chronological]

def random_date():
    """Generate a random one-hour range within the last year, formatted for the twitter API.

    The start is an hour boundary chosen uniformly from the 365 days before the
    current UTC hour, and the end is exactly one hour later. The current time is
    read once, in UTC, so the trailing ``Z`` on the timestamps is accurate, and
    the range always ends at least one hour before the current hour (it can
    never reach into the future).

    Returns:
        A ``(start_timestamp, end_timestamp)`` tuple of strings in
        ``YYYY-MM-DDTHH:MM:SSZ`` form.
    """
    hours_in_year = 365 * 24
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    current_hour = now_utc.replace(minute=0, second=0, microsecond=0)

    # Starting at least two hours back keeps the end of the range at or before
    # the previous hour boundary, i.e. strictly in the past.
    hours_back = random.randint(2, hours_in_year)
    start_time = current_hour - datetime.timedelta(hours=hours_back)
    end_time = start_time + datetime.timedelta(hours=1)

    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    return start_time.strftime(timestamp_format), end_time.strftime(timestamp_format)

def return_n_random_hour_ranges_sorted(n: int) -> tuple[list, list]:
    """Return ``n`` distinct random one-hour ranges as two SORTED lists.

    The first list holds the start times and the second the end times, both in
    chronological order, so the end time at an index is one hour after the
    start time at the same index.

    NOTE: ranges are redrawn from ``random_date`` until an unused start time is
    found, so ``n`` must not exceed the number of distinct hours ``random_date``
    can produce, otherwise this loops forever.
    """
    seen_starts: set = set()  # set membership keeps the duplicate check O(1) per draw
    start_time1_list: list = []
    end_time1_list: list = []
    for _ in range(n):
        s1, s2 = random_date()
        while s1 in seen_starts:
            s1, s2 = random_date()
        seen_starts.add(s1)
        start_time1_list.append(s1)
        end_time1_list.append(s2)

    # Every end is exactly start + 1 hour, so sorting the two lists
    # independently keeps them aligned index by index.
    sorted_start = sort_timestamps(start_time1_list)
    sorted_end = sort_timestamps(end_time1_list)
    return (sorted_start, sorted_end)


def tweets_per_range(keyword: str, start_times_list: list, end_times_list: list, results_per_range: int, next_token: Optional[str] = None) -> None:
    """Scrape tweets matching ``keyword`` for each time range and write one JSON file per range.

    For the range at index ``i`` the full-archive search endpoint is queried page
    by page until ``results_per_range`` tweets have been collected or no further
    page is available. The result is written to ``data_range_{i}.json`` in the
    working directory under the key ``time_range_{i}``; the 'includes' key of the
    response is dropped and a 'time' key holding ``[start, end]`` is added.

    Args:
        keyword: Search query passed to the endpoint.
        start_times_list: Start timestamps, one per range.
        end_times_list: End timestamps, aligned by index with ``start_times_list``.
        results_per_range: Maximum number of tweets to keep for each range.
        next_token: Optional pagination token used for the very first request only.
            It is not forwarded to the later ranges, which are different queries.

    Raises:
        ValueError: If the two lists have different lengths.
        Exception: Propagated from ``connect_to_endpoint`` on a non-200 response.
    """
    if len(start_times_list) != len(end_times_list):
        raise ValueError("start_times_list and end_times_list must have the same length")

    # AUTHENTICATION
    headers: dict[str, str] = create_headers(auth())
    # AUTHENTICATION

    max_results: int = 500 # this is the max results per request the twitter API allows and should be left at 500
    wanted: int = max(results_per_range, 0) # a negative limit would make the trimming slice below delete from the wrong end
    last_index: int = len(start_times_list) - 1
    total_tweets_for_func_call: int = 0

    # Loops through the time ranges
    for i, (start_time, end_time) in enumerate(zip(start_times_list, end_times_list)):
        print("top of FOR loop")
        range_key = f'time_range_{i}'

        # Creates url and connects to endpoint for the first page of this range
        url, params = create_full_search_url(keyword, start_time, end_time, max_results)
        first_token = next_token if i == 0 else None
        page = connect_to_endpoint(url, headers, params, first_token) # prints response code
        print(f"Outer for loop enpoint called for list index {i} / {last_index}") # for quality control
        page.pop('includes', None) # removes 'includes' key which is a negative externality of calling 'referenced_tweets.id.author_id' expansion
        page['time'] = [start_time, end_time] # lets the time range of all tweets be found under the 'time' key
        page.setdefault('data', []) # a range without matches has no 'data' key; normalise it to an empty list

        collected = page # the first page doubles as the container that later pages are merged into
        total_count = collected['meta']['result_count']
        print(f"endpoint called and data collected: {total_count} / {results_per_range} tweets in this range scraped")
        time.sleep(5) # time.sleep implementations are seen throughout the code to avoid hitting rate limits of twitter API

        # Paginates only while strictly fewer tweets than requested have been collected
        page_meta = page['meta']
        while total_count < wanted:
            print("top of WHILE loop")
            page_token = page_meta.get('next_token')
            if page_token is None:
                print("No more tweets to scrape, total tweets will be less than amount desired.") # quality control
                print(f"total results {total_count}") # quality control
                break # moves on to the next time range

            page = connect_to_endpoint(url, headers, params, page_token) # prints response code
            print(f"While loop enpoint called: index {i} / {last_index}") # for quality control

            if 'data' not in page:
                print("empty next token") # quality control
                print(f"max results scraped: {total_count} / {results_per_range} tweets in this range scraped") # quality control
                print(f"list index {i} / {last_index} scraping over. Total tweets will be less than desired")
                break

            collected['data'].extend(page['data'])
            total_count += page['meta']['result_count']
            page_meta = page['meta'] # the next token for the following page lives in the newest page's meta

            print(f"data key found and data appended: {total_count} / {results_per_range} tweets in this range scraped") # quality control
            print(f"list index {i} / {last_index}")

            time.sleep(5) # for rate limit

        # Trims any overshoot (including an oversized first page) and keeps the stored count truthful
        del collected['data'][wanted:]
        collected['meta']['result_count'] = len(collected['data'])

        total_tweets_for_func_call += len(collected['data'])
        print(f"{total_tweets_for_func_call} tweets scraped in entire function call")
        time.sleep(5) # for rate limit

        # Creates json file in directory of program and writes the serialized results to it
        with open(f"data_range_{i}.json", "w") as outfile:
            json.dump({range_key: collected}, outfile)
        

"""This function takes all of the json files of tweet data and creates a sorted dictionary of the most appearing tweets."""
"""A post OR a retweet counts as ONE occurence of a tweet."""
def analyze_top_appearing_tweets_in_data(max_int_of_json: int) -> dict: # max_int_of_json is the max int of json files in your directory, this function will paginate through them
    """Count how often each tweet appears across ``data_range_0.json`` .. ``data_range_{max_int_of_json}.json``.

    A tweet whose first referenced tweet has type "retweeted" counts as one
    appearance of the ORIGINAL tweet; any other tweet counts as one appearance
    of itself. Files whose range has no 'data' key contribute nothing.

    Returns:
        A dict mapping tweet id to appearance count, ordered from most to least
        appearances (ties keep first-seen order).
    """
    tweet_metrics_dict: dict = {}
    for i in range(max_int_of_json + 1):
        # the with-block closes the file even if the JSON cannot be parsed
        with open(f'data_range_{i}.json') as f:
            data_file = json.load(f)

        for item in data_file[f'time_range_{i}'].get('data', []):
            references = item.get('referenced_tweets')
            if references and references[0]['type'] == "retweeted":
                counted_id = references[0]['id'] # credit the original tweet, not the retweet
            else:
                counted_id = item['id']
            tweet_metrics_dict[counted_id] = tweet_metrics_dict.get(counted_id, 0) + 1

    # sorted() is stable, so equal counts stay in first-seen order
    return dict(sorted(tweet_metrics_dict.items(), key=lambda pair: pair[1], reverse=True))


def return_stats_of_file(num_of_file: int) -> None:
    """Print the tweet count, the 'meta' entry and the 'time' entry stored in ``data_range_{num_of_file}.json``.

    A file whose range has no 'data' key is reported as containing 0 tweets.
    """
    # the with-block closes the file even if the JSON cannot be parsed
    with open(f'data_range_{num_of_file}.json') as f:
        data_file = json.load(f)

    range_entry = data_file[f'time_range_{num_of_file}']
    print(f"There are {len(range_entry.get('data', []))} tweets in this data file")
    print(range_entry['meta'])
    print(range_entry['time'])


def analyze_top_retweeted_tweets_in_data(min_int_of_json: int, max_int_of_json: int) -> dict: # max_int_of_json is the max int of json files in your directory, this function will paginate through them
    """Collect retweet counts for the tweets in ``data_range_{min}.json`` .. ``data_range_{max}.json``.

    A tweet whose first referenced tweet has type "retweeted" is recorded under
    the ORIGINAL tweet's id; any other tweet is recorded under its own id. The
    value stored is the 'retweet_count' from the 'public_metrics' of the first
    item seen for that id. Files whose range has no 'data' key contribute nothing.

    Returns:
        A dict mapping tweet id to retweet count, ordered from most to least
        retweets (ties keep first-seen order).
    """
    tweet_metrics_dict: dict = {}
    for i in range(min_int_of_json, max_int_of_json + 1):
        # the with-block closes the file even if the JSON cannot be parsed
        with open(f'data_range_{i}.json') as f:
            data_file = json.load(f)

        for item in data_file[f'time_range_{i}'].get('data', []):
            references = item.get('referenced_tweets')
            if references and references[0]['type'] == "retweeted":
                tweet_id = references[0]['id'] # file the count under the original tweet
            else:
                tweet_id = item['id']
            # only the first occurrence of an id sets its count
            if tweet_id not in tweet_metrics_dict:
                tweet_metrics_dict[tweet_id] = item['public_metrics']['retweet_count']

    # sorted() is stable, so equal counts stay in first-seen order
    return dict(sorted(tweet_metrics_dict.items(), key=lambda pair: pair[1], reverse=True))


def json_replies_to_tweet_from_conversation_id(conversation_id: str, num_of_results: int):
    """Fetch up to ``num_of_results`` replies from a conversation and print them as indented JSON.

    The full-archive search endpoint is queried for ``conversation_id:<id> is:reply``
    in pages of 100, following 'next_token' until enough replies are collected
    or no further page is available. Requests go through ``connect_to_endpoint``,
    so a non-200 response raises instead of failing later on a missing key.

    Args:
        conversation_id: Id of the conversation whose replies are wanted.
        num_of_results: Maximum number of replies to keep.

    Raises:
        Exception: Propagated from ``connect_to_endpoint`` on a non-200 response.
    """
    headers: dict[str, str] = create_headers(auth())
    search_url: str = "https://api.twitter.com/2/tweets/search/all"
    # Passing the query as params lets requests URL-encode the space in it
    query_params: dict = {
        'query': f"conversation_id:{conversation_id} is:reply",
        'max_results': 100,
        'tweet.fields': 'author_id,in_reply_to_user_id,created_at,conversation_id',
    }
    wanted: int = max(num_of_results, 0) # a negative limit would make the trimming slice below delete from the wrong end

    # The first page doubles as the container that later pages are merged into
    reply_tweets: dict = connect_to_endpoint(search_url, headers, query_params)
    reply_tweets.setdefault('data', []) # a conversation without replies has no 'data' key
    total_replies_scraped: int = reply_tweets['meta']['result_count']

    # Checks if more replies need to be scraped and follows the next token if so
    page_meta: dict = reply_tweets['meta']
    while total_replies_scraped < wanted:
        page_token = page_meta.get('next_token')
        if page_token is None:
            print("No more tweets to scrape, total tweets will be less than amount desired.") # quality control
            print(f"total results {total_replies_scraped}") # quality control
            break

        time.sleep(5) # for rate limit: pause before every follow-up request
        page = connect_to_endpoint(search_url, headers, query_params, page_token)
        if 'data' not in page:
            print("empty next token") # quality control
            print(f"max results scraped: {total_replies_scraped} / {num_of_results}") # quality control
            break

        reply_tweets['data'].extend(page['data'])
        total_replies_scraped += page['meta']['result_count']
        page_meta = page['meta'] # the next token for the following page lives in the newest page's meta

    # Trims any overshoot (including an oversized first page) and keeps the stored count truthful
    del reply_tweets['data'][wanted:]
    reply_tweets['meta']['result_count'] = len(reply_tweets['data'])

    # Print the tweets
    print(f"tweet replies scraped/replies desired: {total_replies_scraped} / {num_of_results}") # quality control
    print(json.dumps(reply_tweets, indent=5))


# Main Working Cell

The below cell is intended to be the final cell which ties together all functions into the 'solution' to the protocol

In [None]:
"""This is and example function use which retrieves 10,000 tweets per hour range for 100 random hour ranges from the past year."""
# Build 100 distinct one-hour windows (two index-aligned, chronologically sorted lists).
start_list, end_list = return_n_random_hour_ranges_sorted(100)
# Scrape up to 10,000 matching tweets per window; results are written to data_range_<i>.json files.
# NOTE: tweets_per_range is annotated "-> None", so json_final_data holds None, not the tweet data.
json_final_data = tweets_per_range("Smartchain OR Airdrop OR Crypto OR Nft", start_list, end_list, 10000)

In [None]:
"""This function use returns the most retweeted tweets between the hour ranges desired."""
# Reads data_range_10.json through data_range_13.json from the working directory; those files must already exist.
analyze_top_retweeted_tweets_in_data(10, 13)

In [None]:
# Two sample conversation ids (labelled "recent" and "old" by their variable names).
convo_id_recent: str = '1617202411923468288'
convo_id_old: str = '1499360230245015553'
# Each call prints up to 100 replies from the conversation as indented JSON.
json_replies_to_tweet_from_conversation_id(convo_id_old, 100)
json_replies_to_tweet_from_conversation_id(convo_id_recent, 100)