THIS IS THE MAIN FUNCTION INCLUDES: EXTRACTING STREAMING DATA FROM TWITTER, PRE-PROCESSING AND STORES INTO MYSQL.

In [6]:
#SETTING UP
import credentials              #IMPORTING CREDENTIALS.PY - IMPORTS API/ACCESS_TOKEN KEYS
import settings                 #IMPORT SETTINGS
import re
import tweepy
import pandas as pd
from textblob import TextBlob
import mysql.connector
import demoji
from unidecode import unidecode

In [7]:
# STORE DATA IN MYSQL, CONNECTING TO SQLDATABASE
mydb = mysql.connector.connect(
    host="localhost",
    user="root",
    passwd=credentials.MYSQLPASSWORD,
    database="twitterdb",
    auth_plugin='mysql_native_password',
    charset = 'utf8'
)

if mydb.is_connected():
#CHECK TO SEE IF TABLE EXISTS. IF NOT, CREATES ONE.
    mycursor = mydb.cursor()
    mycursor.execute("""
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_name = '{0}'
        """.format(settings.TABLE_NAME))
    if mycursor.fetchone()[0] != 1:
        mycursor.execute("CREATE TABLE {} ({})".format(settings.TABLE_NAME, settings.TABLE_ATTRIBUTES))
        mydb.commit()
    mycursor.close()

In [8]:
#FUNCTIONS

#PRE-PROCESSING
remove_newline=re.compile(r"\n") #REMOVE "\n" IN TWEETS, \n SEPERATES LINES VIA NEW LINE CHARACTER, REMOVES CLUTER
remove_links=re.compile(r"https\S+|www\.\S+") #REMOVE LINKS
remove_mentions=re.compile(r"@\S+\s?") #REMOVE MENTIONS
remove_id=re.compile(r"ID\S+\s?") #REMOVE ID/REFERALS IN TWEETS
remove_eth=re.compile(r"0x\S+\s?|Ox\S+\s?") #REMOVE ETH WALLET ADDRESSES IN TWEETS

def clean_tweet(tweet):
    post=tweet.text
    try:
        post=remove_newline.sub("",post) 
        post=remove_links.sub("",post)
        post=remove_mentions.sub("",post)
        post=demoji.replace(post, '') #REMOVE EMOJIS
        post=remove_id.sub("",post)
        post=remove_eth.sub("",post)        
        post = unidecode(post)  # CONVERT LATIN CHARACTERS TO ASCII EQUIVALENT
        post = ''.join([i if ord(i) < 128 else '' for i in post])  # REMOVE NON-ASCII CHARACTERS
        return post.strip()[:255] #TWITTER NEW TEXT LIMITS FOR TWITTER BLUE USERS INCLUDES TWEET LENGTH OF UPTO 4,000 CHARACTERS. WILL STORE TWEETS UPTO 255 CHRACTERS.
    except AttributeError:
        print("Error while cleaning tweet:", tweet)
    return ""

    
#SQL PUSH TO TABLES, FUNCTION INSERTS DATA INTO TABLE USING DATA IN THE 'STRUCT' DICTIONARY
def push_results_to_tables(table_name, struct, conn):
    cursor = conn.cursor()
    insert_SQL = f"""INSERT INTO {table_name}
                ({', '.join(map(str, struct))})
                VALUES('{"','".join(map(str,struct.values()))}');
                """
    cursor.execute(insert_SQL)
    cursor.commit()

In [9]:
class MyStream(tweepy.StreamingClient):

    # DISPLAYS "CONNECTED" ONCE STREAM IS CONNECTED
    def on_connect(self):        
        print("Connected") 

    def on_tweet(self,tweet):   
        
    #EXTRACTING ATTRIBUTES FROM TWEETS
        id_str = tweet.id
        created_at = tweet.created_at
        text = clean_tweet(tweet) # PRE-PROCESSING
        user_id = tweet.author_id
        ref_id=tweet.referenced_tweets
        lang=tweet.lang
        sentiment = TextBlob(text).sentiment
        polarity = sentiment.polarity
        subjectivity = sentiment.subjectivity
        struct={'id_str':id_str,
                'created_at':tweet.created_at,
                'text':text,
                'polarity':polarity,
                'subjectivity':subjectivity}
        print(struct)

        if mydb.is_connected():
            mycursor = mydb.cursor()
            sql = f"INSERT INTO {settings.TABLE_NAME} (id_str, created_at, text, polarity, subjectivity) VALUES (%s, %s, %s, %s, %s)"
            val = (id_str, created_at, text, polarity, subjectivity, )
            mycursor.execute(sql, val)
            mydb.commit()
            mycursor.close()
        else:
            print("Error: Database is not connected.")

## TWITTER HAS RATE LIMITS, STOP DATA SCARPING AFTER THRESHOLD.
    def on_error(self,status_code):
        if status_code == 420:
            return False
        print(status_code)
         

In [11]:
stream = MyStream(bearer_token=credentials.BEARER_TOKEN,wait_on_rate_limit=True)

# CLEARS RULESET BEFORE STREAMING DATA
for rule in stream.get_rules().data:
        stream.delete_rules(rule.id)

# ADDING RULES TO RULESET TO STREAM SPECIFIC DATA - ONLY ORIGINAL ENGLISH TWEETS WITH NO ATACHMENTS AND FILTERED QUERY ARE STORED.
stream.add_rules(tweepy.StreamRule('(#ETH OR ethereum OR #ethereum OR #crypto) -is:retweet -is:quote -is:reply -has:media lang:en -#sportsbet -"Price Update" -"getting some" -signal -minted -#Giveaway -"transferred from" -"check out this item" -airdrop -"Public Sale"  -whitelist -"ETH Wallet" -"Eth Address" -link -gwei -faucet -excesscaps -bounty -token -"ICO" -"presale" -"pump and dump" -"technical analysis" -"staking rewards" -"staking pool"'))
#START STREAM
stream.filter(expansions=["author_id",],tweet_fields=["created_at","referenced_tweets","lang","attachments"]) 



Stream encountered HTTP error: 503
HTTP error response text: {"title":"Service Unavailable","detail":"Service Unavailable","type":"about:blank","status":503}


Connected
{'id_str': 1642983668674166786, 'created_at': datetime.datetime(2023, 4, 3, 20, 13, 26, tzinfo=datetime.timezone.utc), 'text': '$437.28k #ETH bought at $1789.00 (USDT) on Binance Spot.#ETH $ETH #crypto #whale #alert #coinscreenerFor real-time Whale insights', 'polarity': 0.0, 'subjectivity': 0.0}
{'id_str': 1642983669785833477, 'created_at': datetime.datetime(2023, 4, 3, 20, 13, 26, tzinfo=datetime.timezone.utc), 'text': '$402.23k #LTC bought at $91.46 (USDT) on Binance Spot.#LTC $LTC #crypto #whale #alert #coinscreenerFor real-time Whale insights', 'polarity': 0.0, 'subjectivity': 0.0}
{'id_str': 1642983670863593475, 'created_at': datetime.datetime(2023, 4, 3, 20, 13, 26, tzinfo=datetime.timezone.utc), 'text': '$355.43k #ETH bought at $1789.00 (USDT) on Binance Spot.#ETH $ETH #crypto #whale #alert #coinscreenerFor real-time Whale insights', 'polarity': 0.0, 'subjectivity': 0.0}
{'id_str': 1642983667755806720, 'created_at': datetime.datetime(2023, 4, 3, 20, 13, 26, tzinfo=dat

KeyboardInterrupt: 

In [12]:
stream.get_rules()
print(stream.get_rules())

Response(data=[StreamRule(value='#ETH OR ether OR ethereum OR #ethereum OR #crypto OR blockchain OR cryptocurrency OR decentralized OR "smart contract" -is:retweet -is:quote -is:reply -has:media lang:en -#sportsbet -"Price Update" -"getting some" -signal -minted -#Giveaway -"transferred from" -"check out this item" -airdrop -"Public Sale"  -whitelist -"ETH Wallet" -"Eth Address" -link -gwei -faucet -excesscaps -bounty -token -"ICO" -"presale" -"pump and dump" -"technical analysis" -"staking rewards" -"staking pool"', tag=None, id='1642620610630090757')], includes={}, errors=[], meta={'sent': '2023-04-02T20:11:15.952Z', 'result_count': 1})
