# Download libraries

In [None]:
!pip install pytz
!pip install tweepy
!pip install statsmodels
!pip install plotly
!pip install wordcloud
!pip install textblob
!pip install nltk
!pip install requests_oauthlib
!pip install flask
!pip install ast
!pip install pyspark

# Importing libraries

In [1]:
import pandas as pd
import os
import csv
import datetime
import time
import sys
from pytz import timezone
import tweepy
import json
import statsmodels.api as sm
import plotly.graph_objects as go
from datetime import datetime, timedelta
from pandas.errors import EmptyDataError
import logging
import matplotlib.pyplot as plt
import seaborn as sns
from wordcloud import WordCloud
import nltk
from nltk.corpus import stopwords
from nltk.corpus import words
from nltk.sentiment import SentimentIntensityAnalyzer
from nltk import FreqDist
from nltk.tokenize import word_tokenize
from textblob import TextBlob
import socket
import requests_oauthlib
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import requests
from flask import Flask,jsonify,request
from flask import render_template
import ast
import re

# Loading the credentials

In [2]:
try:
    # Load the Twitter API credentials from the config file
    with open('config.json', 'r') as f:
        config = json.load(f)
        consumer_key = config['consumer_key']
        consumer_secret = config['consumer_secret']
        access_token = config['access_token']
        access_token_secret = config['access_token_secret']

    # Verify the Twitter API credentials
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    try:
        api = tweepy.API(auth, wait_on_rate_limit=True)
        user = api.verify_credentials()
        logging.info("Twitter API connection successful.")
    except tweepy.error.TweepError as e:
        logging.error("Error: Failed to verify Twitter API credentials.")
        logging.error(str(e))

except FileNotFoundError:
    logging.error("Error: The config file 'config.json' does not exist.")

except json.JSONDecodeError as e:
    logging.error("Error: Failed to load Twitter API credentials from 'config.json'.")
    logging.error(str(e))

except Exception as e:
    logging.error("An error occurred during a Twitter API connection.")
    logging.error(str(e))

# Disabling warnings

In [3]:
import warnings
warnings.filterwarnings("ignore")

# Logger

In [4]:
# Get today's date
today = datetime.now().date()

# Create a log file with today's date in the name
log_file = f"TwitterAPI_{today}.log"

# Configure the root logger
logging.basicConfig(level=logging.INFO)

# Create a FileHandler and set its formatter
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s','%Y-%m-%d %H:%M:%S'))

# Attach the FileHandler to the root logger
logging.getLogger().addHandler(file_handler)

# Downloading and save twitter data
The free tier of the twitter API holds the limitation of:</br>
<b>**10 Day tweet history limit </br>
**1500 tweet request limit per 900sec circ. </b>

In [None]:
# Define the topic and initial date range
topic = "(ios OR apple OR AAPL OR iphone OR ipad)"
start_date = today - timedelta(days=91)

# Create a loop to run for 91 days
for _ in range(91):
    # Calculate the end date for the current iteration
    end_date = start_date + timedelta(days=1)
    
    # Format the dates as strings
    start_date_str = start_date.strftime("%Y-%m-%d")
    end_date_str = end_date.strftime("%Y-%m-%d")
    
    # Define the search query with the current date range
    query = f"{topic} until:{end_date_str} since:{start_date_str}"
    
    # Fetch tweets on the specified topic
    try:
        tweets = []
        for tweet in tweepy.Cursor(api.search_tweets, q=query, lang='en', tweet_mode='extended').items(3000):
            tweets.append({
                'Date': tweet.created_at.date(),
                'Tweet': tweet.full_text
            })
        
        if len(tweets) > 0:
            msg = "Tweets downloaded successfully for the date range: {} to {}"
            logging.info(msg.format(start_date_str, end_date_str))
            
            # Convert the tweets list into a DataFrame
            df_new = pd.DataFrame(tweets)
            
            # Check if the CSV file already exists
            if os.path.isfile('tweets.csv'):
                # Read the existing data from the CSV file
                try:
                    df_existing = pd.read_csv('tweets.csv')
                    
                    # Check if the existing DataFrame has any columns
                    if df_existing.columns.empty:
                        # Handle the case when the CSV file is empty
                        df_existing = pd.DataFrame()
                        
                except EmptyDataError:
                    # Handle the case when the CSV file is empty
                    df_existing = pd.DataFrame()
                
                # Check if the existing DataFrame is empty
                if df_existing.empty:
                    # Save the new DataFrame to a new CSV file
                    df_new.to_csv('tweets.csv', index=False)
                    logging.info("New CSV file created with the downloaded tweets.")
                else:
                    # Concatenate the existing and new data
                    df_combined = pd.concat([df_existing, df_new], ignore_index=True)
                    
                    # Save the combined DataFrame to the CSV file
                    df_combined.to_csv('tweets.csv', index=False)
                    logging.info("Tweets appended to the existing CSV file.")
            else:
                # Save the new DataFrame to a new CSV file
                df_new.to_csv('tweets.csv', index=False)
                logging.info("New CSV file created with the downloaded tweets.")
        else:
            msg = "No tweets found for the date range: {} to {}"
            logging.info(msg.format(start_date_str, end_date_str))
            
    except tweepy.TweepyException as e:
        if e.api_code == 88:
            # Rate limit reached, wait for the specified duration
            wait_time = int(e.response.headers['Retry-After'])
            msg = "Rate limit reached. Sleeping for: {} seconds."
            logging.info(msg.format(wait_time))
            time.sleep(wait_time)
        logging.error("Error: Failed to download tweets.")
        logging.error(e)
    
    # Update the start date for the next iteration
    start_date = end_date

# Log data

In [5]:
# Read the log file into a DataFrame
log_df = pd.read_csv(log_file, sep=":", names=["Timestamp","Type","Message","Sleep"])

# Define the CSV file name for saving the log DataFrame
csv_file = f"TwitterAPI_{today}.csv"

# Check if the CSV file already exists
if os.path.isfile(csv_file):
    # Append the log DataFrame to the existing CSV file
    log_df.to_csv(csv_file, mode='a', header=False, index=False)
else:
    # Save the log DataFrame to a new CSV file
    log_df.to_csv(csv_file, index=False)

# Display the log DataFrame
log_df

Unnamed: 0,Timestamp,Type,Message,Sleep
0,2023-05-24 19,45,01 - WARNING - Rate limit reached. Sleeping for,484.0
1,2023-05-24 19,53,55 - WARNING - Rate limit reached. Sleeping for,851.0
2,2023-05-24 20,09,06 - WARNING - Rate limit reached. Sleeping for,841.0
3,2023-05-24 20,24,09 - WARNING - Rate limit reached. Sleeping for,839.0
4,2023-05-24 20,39,10 - WARNING - Rate limit reached. Sleeping for,839.0
5,2023-05-24 20,54,11 - WARNING - Rate limit reached. Sleeping for,839.0
6,2023-05-24 21,09,12 - WARNING - Rate limit reached. Sleeping for,839.0
7,2023-05-24 21,24,13 - WARNING - Rate limit reached. Sleeping for,839.0
8,2023-05-24 21,39,15 - WARNING - Rate limit reached. Sleeping for,838.0
9,2023-05-24 21,54,09 - ERROR - Error occurred during tweet clean...,


# Cleaning tweets

In [None]:
try:
    # Load the tweets data from the CSV file
    df_tweets = pd.read_csv('tweets.csv')

    # Compile the regular expressions
    remove_chars_regex = re.compile(r'[^\w\s]')
    remove_links_regex = re.compile(r'http\S+|www\S+')
    remove_usernames_regex = re.compile(r'@[^\s]+')

    # Function to clean a single tweet
    def clean_tweet(tweet):
        # Remove unnecessary characters and links
        cleaned_tweet = remove_chars_regex.sub('', tweet)
        cleaned_tweet = remove_links_regex.sub('', cleaned_tweet)

        # Remove Twitter usernames
        cleaned_tweet = remove_usernames_regex.sub('', cleaned_tweet)

        # Remove non-English words
        cleaned_words = []
        english_words = set(words.words())
        for word in cleaned_tweet.split():
            if word.lower() in english_words:
                cleaned_words.append(word)
        cleaned_tweet = ' '.join(cleaned_words)

        return cleaned_tweet

    # Clean the tweets column using vectorization
    df_tweets['Tweet'] = df_tweets['Tweet'].str.replace(remove_chars_regex, '')
    df_tweets['Tweet'] = df_tweets['Tweet'].str.replace(remove_links_regex, '')
    df_tweets['Tweet'] = df_tweets['Tweet'].str.replace(remove_usernames_regex, '')
    df_tweets['Tweet'] = df_tweets['Tweet'].apply(clean_tweet)

    # Remove rows with empty tweet values
    df_tweets = df_tweets.dropna(subset=['Tweet'])

    # Save the cleaned tweets back to the CSV file
    df_tweets.to_csv('tweets.csv', index=False)

    # Log the execution of the tweet cleaning process
    logging.info("Tweet cleaning process completed.")

except Exception as e:
    # Log any exceptions or errors that occur
    logging.error("Error occurred during tweet cleaning process.")
    logging.error(str(e))

# Tweet EDA

In [None]:
# Read the tweets data from the CSV file
df_tweets = pd.read_csv('tweets.csv')
# Perform basic exploratory data analysis (EDA)
print("Number of tweets:", len(df_tweets))
print("Columns:", df_tweets.columns)
print("Sample tweets:")
print(df_tweets.head())

In [None]:
try:
    # Perform sentiment analysis using Vader SentimentIntensityAnalyzer
    nltk.download('vader_lexicon')
    sia = SentimentIntensityAnalyzer()
    df_tweets['Sentiment'] = df_tweets['Tweet'].apply(lambda x: sia.polarity_scores(x)['compound'])

    # Visualize sentiment distribution
    plt.figure(figsize=(8, 6))
    sns.histplot(df_tweets['Sentiment'], bins=30, kde=True)
    plt.title('Sentiment Distribution')
    plt.xlabel('Sentiment Score')
    plt.ylabel('Frequency')
    plt.show()

    # Log the successful execution of sentiment analysis and visualization
    logging.info("Sentiment analysis and visualization completed.")

except Exception as e:
    # Log any exceptions or errors that occur
    logging.error("Error occurred during sentiment analysis and visualization.")
    logging.error(str(e))

# Most frequent words wordcloud

In [None]:
# Generate word cloud of most frequent words
stop_words = set(stopwords.words('english'))
wordcloud = WordCloud(stopwords=stop_words, background_color='white').generate(' '.join(df_tweets['Tweet']))

plt.figure(figsize=(10, 8))
plt.imshow(wordcloud, interpolation='bilinear')
plt.title('Most Frequent Words in Tweets')
plt.axis('off')
plt.show()

# Time series forecast of the sentiment

In [None]:
# Load the tweet sentiment data from the CSV file
df = pd.read_csv('tweets_sentiment.csv', parse_dates=['Date'])

# Set the 'Date' column as the index
df.set_index('Date', inplace=True)

try:
    # Fit an ARIMA model to the sentiment data
    model = sm.tsa.ARIMA(df['sentiment'], order=(1, 0, 1), trend='c').fit()

    # Generate predictions for the next 1 week, 1 month, and 3 months
    forecast_1w = model.predict(start=len(df), end=len(df) + 6, dynamic=False)
    forecast_1m = model.predict(start=len(df), end=len(df) + 30, dynamic=False)
    forecast_3m = model.predict(start=len(df), end=len(df) + 90, dynamic=False)

    # Create Plotly figure
    fig = go.Figure()

    # Add actual sentiment data
    fig.add_trace(go.Scatter(x=df.index, y=df['sentiment'], name='Actual'))

    # Add forecasted sentiment data
    forecast_dates_1w = pd.date_range(start=df.index[-1], periods=7)[1:]
    forecast_dates_1m = pd.date_range(start=df.index[-1], periods=31)[1:]
    forecast_dates_3m = pd.date_range(start=df.index[-1], periods=91)[1:]
    fig.add_trace(go.Scatter(x=forecast_dates_1w, y=forecast_1w, name='1 Week Forecast'))
    fig.add_trace(go.Scatter(x=forecast_dates_1m, y=forecast_1m, name='1 Month Forecast'))
    fig.add_trace(go.Scatter(x=forecast_dates_3m, y=forecast_3m, name='3 Months Forecast'))

    # Update layout
    fig.update_layout(
        title='Time Series Forecast of Sentiment',
        xaxis_title='Date',
        yaxis_title='Sentiment',
        legend_title='Forecast',
        hovermode='x unified'
    )

    # Show the interactive Plotly graph
    fig.show()

except ValueError as e:
    print("Error: Failed to make time series forecast.")
    print(e)

# Sentiment analysis

In [None]:
try:
    # Load the tweets from the CSV file
    df = pd.read_csv('tweets.csv')

    # Perform sentiment analysis using TextBlob
    df['sentiment'] = df['Tweet'].apply(lambda x: TextBlob(x).sentiment.polarity)

    # Classify sentiment as positive, negative, or neutral
    df['sentiment_label'] = df['sentiment'].apply(lambda x: 'Positive' if x > 0 else 'Negative' if x < 0 else 'Neutral')

    # Save the updated DataFrame to CSV
    df.to_csv('tweets_sentiment.csv', index=False)

except pd.errors.EmptyDataError:
    # Handle the case when the CSV file is empty
    logging.error("Error: The CSV file is empty.")

except FileNotFoundError:
    # Handle the case when the CSV file is not found
    logging.error("Error: The CSV file 'tweets.csv' does not exist.")

except Exception as e:
    # Handle any other exceptions that may occur
    logging.error("An error occurred during sentiment analysis and classification.")
    logging.error(str(e))

In [None]:
# Convert forecast data to strings
forecast_1w_str = forecast_1w.to_string(header=False)
forecast_1m_str = forecast_1m.to_string(header=False)
forecast_3m_str = forecast_3m.to_string(header=False)
# Print the forecast data
print("1 Week Forecast:")
print(forecast_1w_str)
print("1 Month Forecast:")
print(forecast_1m_str)
print("3 Months Forecast:")
print(forecast_3m_str)

# Sentiment distribution by sentiment category

In [None]:
# Read the tweets data from the CSV file
df_tweets = pd.read_csv('tweets_sentiment.csv')

# Perform basic exploratory data analysis (EDA)
print("Number of tweets:", len(df_tweets))
print("Columns:", df_tweets.columns)
print("Sample tweets:")
print(df_tweets.head())

# Visualize sentiment distribution by sentiment category
plt.figure(figsize=(8, 6))
sns.countplot(x='Sentiment', hue='Category', data=df_tweets)
plt.title('Sentiment Distribution by Category')
plt.xlabel('Sentiment')
plt.ylabel('Count')
plt.legend(title='Category')
plt.show()

# Sentiment distribution 

In [None]:
# Visualize sentiment distribution
plt.figure(figsize=(8, 6))
sns.countplot(x='Sentiment', data=df_tweets)
plt.title('Sentiment Distribution')
plt.xlabel('Sentiment')
plt.ylabel('Count')
plt.show()

# Sentiment distribution over time

In [None]:
# Convert the 'Date' column to datetime
df_tweets['Date'] = pd.to_datetime(df_tweets['Date'])

# Group by date and sentiment to calculate counts
df_sentiment_counts = df_tweets.groupby(['Date', 'Sentiment']).size().reset_index(name='Count')

# Pivot the data to have sentiment types as columns
df_sentiment_pivot = df_sentiment_counts.pivot(index='Date', columns='Sentiment', values='Count')

# Plot the sentiment distribution over time
plt.figure(figsize=(12, 6))
sns.lineplot(data=df_sentiment_pivot, dashes=False)
plt.title('Sentiment Distribution Over Time')
plt.xlabel('Date')
plt.ylabel('Count')
plt.legend(title='Sentiment')
plt.xticks(rotation=45)
plt.show()

# Most frequent words by sentiment

In [None]:
# Tokenize the tweets into words
tokenized_words = [word.lower() for tweet in df_tweets['Tweet'] for word in word_tokenize(tweet)]

# Calculate the frequency distribution of words
freq_dist = FreqDist(tokenized_words)
most_common = freq_dist.most_common(20)

# Plot the most frequent words by sentiment
plt.figure(figsize=(10, 6))
sns.barplot(x='Count', y='Word', hue='Sentiment', data=pd.DataFrame(most_common, columns=['Word', 'Count']))
plt.title('Most Frequent Words by Sentiment')
plt.xlabel('Count')
plt.ylabel('Word')
plt.legend(title='Sentiment')
plt.show()

# PySpark tweet upload

In [None]:
# Replace the values below with yours
ACCESS_TOKEN = access_token
ACCESS_SECRET = access_token_secret
CONSUMER_KEY = consumer_key
CONSUMER_SECRET = consumer_secret
my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

def get_tweets():
    url = 'https://stream.twitter.com/1.1/statuses/filter.json'
    query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')]
    query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
    response = requests.get(query_url, auth=my_auth, stream=True)
    print(query_url, response)
    return response

def send_tweets_to_spark(http_resp, tcp_connection):
    for line in http_resp.iter_lines():
        try:
            full_tweet = json.loads(line)
            tweet_text = full_tweet['text']
            print("Tweet Text: " + tweet_text)
            print ("------------------------------------------")
            tcp_connection.send(tweet_text + '\n')
        except:
            e = sys.exc_info()[0]
            print("Error: %s" % e)
            
TCP_IP = "localhost"
TCP_PORT = 9009
conn = None
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)
print("Waiting for TCP connection...")
conn, addr = s.accept()
print("Connected... Starting getting tweets.")
resp = get_tweets()
send_tweets_to_spark(resp, conn)

# Configure Spark

In [None]:
# create spark configuration
conf = SparkConf()
conf.setAppName("TwitterStreamApp")
# create spark context with the above configuration
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# create the Streaming Context from the above spark context with interval size 2 seconds
ssc = StreamingContext(sc, 2)
# setting a checkpoint to allow RDD recovery
ssc.checkpoint("checkpoint_TwitterApp")
# read data from port 9009
dataStream = ssc.socketTextStream("localhost",9009)

# Filter Hastags and stream

In [None]:
# split each tweet into words
words = dataStream.flatMap(lambda line: line.split(" "))
# filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1)
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
# adding the count of each hashtag to its last count
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
# do processing for each RDD generated in each interval
tags_totals.foreachRDD(process_rdd)
# start the streaming computation
ssc.start()
# wait for the streaming to finish
ssc.awaitTermination()

def aggregate_tags_count(new_values, total_sum):
    return sum(new_values) + (total_sum or 0)

def get_sql_context_instance(spark_context):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']
def process_rdd(time, rdd):
    print("----------- %s -----------" % str(time))
    try:
        # Get spark sql singleton context from the current context
        sql_context = get_sql_context_instance(rdd.context)
        # convert the RDD to Row RDD
        row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
        # create a DF from the Row RDD
        hashtags_df = sql_context.createDataFrame(row_rdd)
        # Register the dataframe as table
        hashtags_df.registerTempTable("hashtags")
        # get the top 10 hashtags from the table using SQL and print them
        hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10")
        hashtag_counts_df.show()
        # call this method to prepare top 10 hashtags DF and send them
        send_df_to_dashboard(hashtag_counts_df)
    except:
        e = sys.exc_info()[0]
        print("Error: %s" % e)

# Stream to dashboard

In [None]:
def send_df_to_dashboard(df):
    # extract the hashtags from dataframe and convert them into array
    top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()]
    # extract the counts from dataframe and convert them into array
    tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()]
    # initialize and send the data through REST API
    url = 'http://localhost:5001/updateData'
    request_data = {'label': str(top_tags), 'data': str(tags_count)}
    response = requests.post(url, data=request_data)

# Building the Flask app

In [None]:
app = Flask(__name__)
labels = []
values = []
@app.route("/")
def get_chart_page():
    global labels,values
    labels = []
    values = []
    return render_template('chart.html', values=values, labels=labels)
@app.route('/refreshData')
def refresh_graph_data():
    global labels, values
    print("labels now: " + str(labels))
    print("data now: " + str(values))
    return jsonify(sLabel=labels, sData=values)
@app.route('/updateData', methods=['POST'])
def update_data():
    global labels, values
    if not request.form or 'data' not in request.form:
        return "error",400
    labels = ast.literal_eval(request.form['label'])
    values = ast.literal_eval(request.form['data'])
    print("labels received: " + str(labels))
    print("data received: " + str(values))
    return "success",201
if __name__ == "__main__":
    app.run(host='localhost', port=5001)