In [1]:
import requests
import os
import json
import pandas as pd
from datetime import datetime, timedelta
import csv
import time
import numpy as np
from sklearn.model_selection import train_test_split
import nltk
from nltk.tokenize.punkt import PunktSentenceTokenizer, PunktTrainer
from nltk.tokenize import TweetTokenizer
from nltk.lm.preprocessing import padded_everygram_pipeline
import re
from nltk.lm import MLE
import demoji 
from nltk.lm import Vocabulary
import functools

In [2]:
# src: https://towardsdatascience.com/an-extensive-guide-to-collecting-tweets-from-twitter-api-v2-for-academic-research-using-python-3-518fcb71df2a
def auth():
    return os.getenv('TOKEN')

def create_headers(bearer_token):
    headers = {"Authorization": f"Bearer {bearer_token}"}
    return headers


def create_url(query, start_time, end_time, max_results= 100):
    url = "https://api.twitter.com/2/tweets/search/recent"
    query_params = {"query": f"{query} lang:en", "expansions": "geo.place_id",
                    "max_results": max_results, 
                    "place.fields": "country_code,full_name,geo",
                    "tweet.fields": "author_id",
                   "next_token": {}}
    return (url, query_params)
    

def connect_to_endpoint(url, headers, params, next_token = None):
    params['next_token'] = next_token   #params object received from create_url function
    response = requests.request("GET", url, headers = headers, params = params)
    print("Endpoint Response Code: " + str(response.status_code) + "\n")
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()



In [269]:
"""keyword = "nft"
max_results = 100
bearer_token = auth()
headers = create_headers(bearer_token)
url, query_params = create_url(keyword, start_time, end_time, max_results)"""

In [270]:
"""csvFile = open("nfts.csv", "a", newline="", encoding='utf-8')
csvWriter = csv.writer(csvFile)
csvWriter.writerow(['author id', 'tweet id', 'text', 'geo', 'country_code', 'full_name', 'bbox'])"""

57

In [271]:
"""count = 0
json_response = connect_to_endpoint(url, headers, query_params)
amnt = 0

while(count < 10000):
    count += amnt
    for step, tweet in enumerate(json_response['data']):
        author_id = tweet['author_id']
        tweet_id = tweet['id'] 
        text = tweet['text'] 
        if ('geo' in tweet):   
            geo = tweet['geo']['place_id']
        else:
            geo = " "
        try: 
            country_code = json_response['includes']['places'][step]['country_code']
        except:
            country_code = " "
        try: 
            full_name = json_response['includes']['places'][step]['full_name']
        except:
            full_name = " "
        try: 
            bbox = json_response['includes']['places'][step]['geo']
        except:
            bbox = " "

        res = [author_id, tweet_id, text, geo, country_code, full_name, bbox]
        csvWriter.writerow(res)
    
    amnt = json_response['meta']['result_count']
        
    json_response = connect_to_endpoint(url, headers, query_params, 
                                        next_token = json_response['meta']['next_token'])
    

            """

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Response Code: 200

Endpoint Respo

In [55]:
#https://stackoverflow.com/questions/25363498/is-there-a-standard-approach-to-returning-values-from-coroutine-endpoints
class FINISH_PROCESSING_SIGNAL(Exception): pass
tknzr = TweetTokenizer(strip_handles=False, reduce_len=True)

def tokenize_remove_emoticons(elem):
    tokenized_list = tknzr.tokenize(elem)
    # replacing emojis with their description, 
    # after tokenizer so as not to make them seperate words
    return list(map(lambda x: demoji.replace_with_desc(x), tokenized_list))
def tokenize_sentences(array_elem):
    return list(map(lambda x: tokenize_remove_emoticons(x), array_elem))
vf = np.vectorize(tokenize_sentences)

def coroutine(func):
    @functools.wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    return start

def source(df, targets):
    for target in targets: 
        target.send( df)
        processed_data = target.throw(FINISH_PROCESSING_SIGNAL)
        return processed_data
    
"""
Note, these transformations are completely deterministic
irrespective of the dataset being used, thus we can apply 
this initial part on the entire datafram."""
@coroutine  
def clean_train_test_split(targets):
    try:
        while True: 
            df = (yield)
            df.text = df.text.apply(lambda x: re.sub(r'0x[a-fA-F0-9]{40}', 'ethe_addy', x)) 
            #lowercasing all letters
            df['text'] = df.text.str.lower()
            # will keep the handles for the sake of n-gram approximation, but specifics don't matter
            df.text = df.text.apply(lambda x: re.sub(r"@([a-zA-Z0-9_]{1,50})", 'randomhandle', x))
            # removing url links
            df.text = df.text.apply(lambda x: re.sub(r'https?:\/\/\S+', '', x))
            df.text = df.text.apply(lambda x: re.sub(r"www\.[a-z]?\.?(com)+|[a-z]+\.(com)", '', x))
            # removing embedded links, videos
            df.text = df.text.apply(lambda x: re.sub(r'{link}', '', x))
            df.text = df.text.apply(lambda x: re.sub(r"\[video\]", '', x))
            train, test = train_test_split(df, test_size=0.1, random_state=42)
            for target in targets: 
                target.send( (train, test) )
    except FINISH_PROCESSING_SIGNAL as err:
        yield target.throw(err)
        
@coroutine  
def train_tokenizer(targets):
    try:
        while True: 
            trainer = PunktTrainer()
            train, test = (yield)
            array_strs =train[['text']].values.flatten()
            array_strs_test =test[['text']].values.flatten()
            map(lambda x: trainer.train(x), array_strs)
            trainer.finalize_training()
            tokenizer = PunktSentenceTokenizer(trainer.get_params())
            for target in targets: 
                target.send( (tokenizer, array_strs, array_strs_test) )
    except FINISH_PROCESSING_SIGNAL as err:
        yield target.throw(err)
            
@coroutine  
def segment_sentences(targets):
    try:
        while True: 
            tokenizer, x, test = (yield)
            segment_sentences = np.array([tokenizer.tokenize(xi) for xi in x], dtype = object)
            segment_sentences_test = np.array([tokenizer.tokenize(xi) for xi in test], dtype = object)
            for target in targets: 
                target.send( (segment_sentences, segment_sentences_test))
    except FINISH_PROCESSING_SIGNAL as err:
        yield target.throw(err)

@coroutine   
def sentence_tokenizer(targets):
    try:
        while True: 
            x, test = (yield)
            x, test = vf(x), vf(test)
            flattened, flattened_test = [], []
            for xi in x:
                flattened.extend(xi)
            for xi in test:
                flattened_test.extend(xi)
            for target in targets: 
                target.send( (flattened, flattened_test) )
    except FINISH_PROCESSING_SIGNAL as err:
        yield target.throw(err)
            
@coroutine  
def train_model(targets):
    try:
        while True: 
            n = 3
            training_set, test = (yield)
            train, vocab = padded_everygram_pipeline(n, training_set)
            model = MLE(n)
            model.fit(train, vocab)
            for target in targets: 
                target.send((model, training_set, test))
    except FINISH_PROCESSING_SIGNAL as err:
        yield target.throw(err)
        
@coroutine
def sink():
    data = {}
    try:
        while True:
            data = (yield)
    except FINISH_PROCESSING_SIGNAL:
        yield data

In [57]:

a = sink()
b = train_model(targets = [a])
c = sentence_tokenizer(targets = [b])
d = segment_sentences(targets = [c])
e = train_tokenizer(targets = [d])
f = clean_train_test_split(targets = [e])

df = pd.read_csv("nfts.csv")

model, training_set, test_set = source(df, targets = [f])

  otypes = ''.join([asarray(outputs[_k]).dtype.char
