# Tweets Streaming

In [1]:
# import libraries
import pandas as pd
import tweepy
import mysql.connector
import re
from Preprocessor import Preprocessor
from mysql.connector import Error
from scrappingLocation import scrap_latlon
import json
from keras.preprocessing.text import Tokenizer, tokenizer_from_json
import keras.preprocessing.text as kpt
from tensorflow.keras.preprocessing.sequence import pad_sequences
from keras.models import load_model
import time

In [6]:
def connect_dbs(host, user, password):
    '''
    Funtion to connect to the data base
    
    Inputs:
        host: the host where is the data base
        user: the username
        password: the password 
    '''
    try:
        conn = mysql.connector.connect(host=host, user=user,  
                            password=password)#give ur username, password
        if conn.is_connected():
            cursor = conn.cursor()
            cursor.execute("CREATE DATABASE databasecovid")
            print("Database is created")
    except Error as e:
        print("Error while connecting to MySQL", e)

In [3]:
def add_tweets_db():
    '''
    Funtion to add the tweets to the table from the data base
    '''
    mydb = mysql.connector.connect(
        host="localhost",
        user="root",
        passwd="",
        database="databasecovid"
    )
    if mydb.is_connected():
        '''
        Check if this table exits. If not, then create a new one.
        '''
        mycursor = mydb.cursor()
        mycursor.execute("""
            SELECT COUNT(*)
            FROM information_schema.tables
            WHERE table_name = '{0}'
            """.format(table_name))
        if mycursor.fetchone()[0] != 1:
            mycursor.execute("CREATE TABLE {} ({})".format(table_name, attributes))
            mydb.commit()
        mycursor.close()
        return mydb
    else:
        return mydb

In [14]:
class MyStreamListener(tweepy.StreamListener): #Class for streaming
    '''
    Class to get streaming tweets
    '''
    def __init__(self, no_tweets=100):
        super().__init__()
        self.no_tweets = no_tweets
    
    def on_status(self, status): #Extract tweets text
        preprocessor = Preprocessor()
        if status.retweeted:
            # Avoid retweeted info, and only original tweets will be received
            return True
        # Extract attributes from each tweet
        text = status.text
        text = preprocessor.cleaning(text)# Pre-processing the text
        if text == '': # pass the empty text
            return True
        
        # some preproccessing to the pure tweet 
        pure_text = status.text
        pure_text = preprocessor.clean_emojis(pure_text)
        pure_text = preprocessor.re_process(pure_text)
        
        # get the creation date
        created_at = status.created_at
        
        # get the location user
        user_location = preprocessor.clean_emojis(status.user.location)
        if user_location == None: # if the locations is none, pass
            return True
        
        # preproccessing to location 
        user_location_check = re.sub(r'[^A-Za-z\s]+', '', user_location).lower()
        
        # check if the location has the word "here"
        list_check = re.findall(r'\bhere\b', user_location_check)
        
        if list_check: # pass if the location has the word "here"
            return True
        
        # predict the word if it is toxic or no 
        split_word = [word.split() for word in [text]]
        split_word = [[word for word in split_word[0] if word in tokenizer.word_index]]
        tokenizer.fit_on_texts(split_word) 
        X_data = tokenizer.texts_to_sequences(split_word)
        X_data = pad_sequences(X_data, padding = 'pre', maxlen = 20)
        toxic = int((model.predict(X_data) > 0.5).astype(int)[0][0])
        
        
        # if there is a error in the code from scrappingLatLon
        try:
            lat_lon = scrap_latlon(user_location)
        except:
            return True
        if not lat_lon:
            return True
        # get latitude and longitude
        latitude = lat_lon[0]
        longitude = lat_lon[1]
        
        # Store all data in MySQL
        if mydb.is_connected():
            mycursor = mydb.cursor()
            sql = f"INSERT INTO {table_name} (created_at, pure_tweet, pro_tweet, toxic, user_location, longitude, latitude) VALUES (%s, %s, %s, %s, %s, %s, %s)"
            val = (created_at, pure_text, text, toxic, user_location, longitude, latitude)
            mycursor.execute(sql, val)
            mydb.commit()
            mycursor.close()
        
        # stop to avoid errors
        time.sleep(5)
        # stop when the number of tweets to search is 0
        if self.no_tweets == 0:
            return False
        else:
            self.no_tweets -= 1
    
    
    def on_error(self, status_code):
        #Since Twitter API has rate limits, stop srcraping data as it exceed to the thresold.
        
        if status_code == 420:
            # return False to disconnect the stream
            return False

In [2]:
# open and store the tokenizer 
with open('tokenizer.json') as f:
    data = json.load(f)
    tokenizer = tokenizer_from_json(data)
    
# open and store the model 
model = load_model('FinalModel.model', compile = True)

In [4]:
# words to search into twitter
search_words = ['Coronavirus','Covid19','covid','covid19','coronavirus','Covid-19','covid-19','Covid']
# name of the table where will be the stored
table_name = 'tweetsCovid'
# attributes to the columns 
attributes = "created_at DATETIME, pure_tweet VARCHAR(255), pro_tweet VARCHAR(255), toxic INT, user_location VARCHAR(255), longitude DOUBLE, latitude DOUBLE"

In [7]:
# connect to the database
connect_dbs('localhost', 'root', '')

Error while connecting to MySQL 1007 (HY000): Can't create database 'databasecovid'; database exists


In [8]:
# read the API kyas and access
login = pd.read_csv('login.csv')

In [9]:
#  store the keys and access
api_key = login['api_key'][0]
api_key_secret = login['api_key_secret'][0]
access_token = login['access_token'][0]
access_token_secret = login['access_token_secret'][0]

In [10]:
# creat the api
authenticate = tweepy.OAuthHandler(api_key, api_key_secret)
authenticate.set_access_token(access_token, access_token_secret)
api = tweepy.API(authenticate)

In [None]:
# get the tweets from twitter, filter: english and tweets with covid 
mydb = add_tweets_db()
n_times = 5
while n_times >= 1:
    try:
        myStreamListener = MyStreamListener(no_tweets=150)
        myStream = tweepy.Stream(auth = api.auth, listener = myStreamListener)
        myStream.filter(languages=["en"], track = search_words)
    except:
        n_times -= 1
        print('Lost connection')
    else:
        n_times -= 1
mydb.close()

In [2]:
# connect to the database
db_connection = mysql.connector.connect(
    host="localhost", 
    user="root",
    passwd="",
    database="databasecovid"
)

In [11]:
# store to DataFame
df = pd.read_sql('SELECT pure_tweet, toxic FROM {}'.format(table_name), con = db_connection)

In [14]:
#  get the no-toxic tweets
df = df[df['toxic'] == 0]

In [16]:
# save the DataFrame to model Generative
df.to_csv('NoToxicTweets.csv')