In [1]:
"""Real-time analysis project - tweeter sentiment analysis""";

In [2]:
import json
import requests

In [76]:
# =============================================================
# CONFIGURATION
# =============================================================
# Load the project settings from config.json. Later cells read the
# "bearer_token" and "database" entries of the resulting dict.
# Note: the file handle name `conf` is rebound to a SparkConf further down.
with open("config.json", "r", encoding='utf-8') as conf:
    config = json.load(conf)

In [4]:
# =============================================================
# TWEET SCRAPING
# =============================================================

In [5]:
class ApiConnector:
    """Client for the Twitter API v2 recent-search endpoint.

    Builds a search query matching any of the configured hashtags and fetches
    up to ``max_results`` tweets per request, reusing one ``requests.Session``
    for every call.

    Args:
        hashtags: hashtag names without the leading ``#``.
        max_results: value sent as the ``max_results`` query parameter.
        bearer_token: token placed in the ``Authorization`` header.
    """

    # Endpoint template; the placeholders are the percent-encoded query and
    # the page size. ``created_at`` is requested for every tweet.
    url_base = "https://api.twitter.com/2/tweets/search/recent?query={}&max_results={}&tweet.fields=created_at"

    def __init__(self, hashtags: list, max_results: int, bearer_token: str):
        self.hashtags = hashtags
        self.max_results = max_results
        self.headers = {"Authorization": f"Bearer {bearer_token}"}
        self.session = requests.Session()

    @property
    def query(self) -> str:
        """Percent-encoded query of the form ``#tag1 OR #tag2 OR ...``.

        An empty hashtag list yields an empty string.
        """
        # "%23" encodes "#", "%20" encodes a space.
        return "%20OR%20".join("%23" + tag for tag in self.hashtags)

    def get_hashtags(self) -> list:
        """Return the hashtag list currently used to build the query."""
        return self.hashtags

    def set_hashtags(self, hashtags: list) -> None:
        """Replace the hashtag list used to build the query."""
        self.hashtags = hashtags

    def get_max_results(self) -> int:
        """Return the page size sent with each request."""
        return self.max_results

    def set_max_results(self, max_results: int) -> None:
        """Replace the page size sent with each request."""
        self.max_results = max_results

    @property
    def api_url(self) -> str:
        """Full request URL for the current query and page size."""
        return self.url_base.format(self.query, self.max_results)

    def get_tweets(self) -> list:
        """Fetch one page of tweets matching the current query.

        Returns:
            The list stored under the ``"data"`` key of the JSON response, or
            an empty list when the response carries no such key (presumably
            what the API sends when nothing matched - confirm against the
            API documentation).

        Raises:
            requests.HTTPError: when the API answers with an error status.
        """
        response = self.session.get(self.api_url, headers=self.headers)
        # Surface auth / rate-limit / bad-request errors as what they are
        # instead of a KeyError on the missing "data" key.
        response.raise_for_status()
        return response.json().get("data", [])

In [6]:
# Hashtag names to search for (without the leading '#'; ApiConnector adds it
# when building the query) and the page size requested from the API.
_HASHTAGS = ["polskilad", "polskiwal", "nowylad", "nowywal", "drozyznapis"]
_MAX_RESULTS = 10

In [7]:
# API client used by main(); the bearer token is read from the loaded config.
conn = ApiConnector(_HASHTAGS,
                   _MAX_RESULTS,
                   config["bearer_token"])


In [None]:
# =============================================================
# SENTIMENT ANALYSIS
# =============================================================

In [8]:
import re

import pandas as pd

from textblob import TextBlob

from pyspark.sql import SparkSession
from pyspark import SparkConf,SparkContext
from nltk.sentiment.vader import SentimentIntensityAnalyzer

In [16]:
"""Create spark session"""

# create spark configuration
conf = SparkConf()
conf.setAppName("TwitterAnalysisApp")
# create (or reuse) the spark context: constructing SparkContext directly
# raises when a context is already running, e.g. when this cell is run twice
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("ERROR")
# create spark session
spark = SparkSession(sc)

In [87]:
"""Define transformations"""

def get_score(text: str) -> dict:
    """Return the polarity scores computed for ``text``.

    A fresh ``SentimentIntensityAnalyzer`` is created on every call and its
    ``polarity_scores`` result is returned unchanged.
    """
    analyzer = SentimentIntensityAnalyzer()
    scores = analyzer.polarity_scores(text)
    return scores


def clean_text(text) -> str:
    """Strip tweet-specific noise from ``text`` and drop non-ASCII characters.

    Removes @mentions, '#' signs (the hashtag word itself is kept), "RT"
    markers, http(s) links and line breaks, then a single leading ':' if one
    is left, and finally every character with a code point of 128 or above.

    Args:
        text: raw tweet text.

    Returns:
        The cleaned text; an empty string when nothing is left.
    """
    text = re.sub(r'@[A-Za-z0-9!#$%^&*_]+', '', text)  # @mentions
    text = re.sub(r'#', '', text)  # hashtag sign only
    text = re.sub(r'RT[\s]+', '', text)  # retweet marker
    text = re.sub(r'https?:\/\/\S+', '', text)  # links
    text = re.sub(r'\n', '', text)  # line breaks

    # "RT @user: ..." leaves a leading colon once the marker and the mention
    # are gone. startswith() is safe on an empty string, whereas indexing
    # text[0] raised IndexError for tweets made only of mentions/links.
    if text.startswith(":"):
        text = text[1:]

    return "".join(char for char in text if ord(char) < 128)

    
def leave_char(letter):
    """Filter predicate: keep alphabetic characters and the space character."""
    if str.isalpha(letter):
        return True
    return letter == " "


def prepare_text(text):
    """Clean a raw tweet and translate it from Polish to English.

    The text goes through ``clean_text``, is reduced to letters and spaces
    via ``leave_char`` and is then translated with ``TextBlob.translate``.

    Args:
        text: raw tweet text.

    Returns:
        The translated text as ``str``. An empty string when only whitespace
        is left after cleaning. The cleaned, untranslated text when TextBlob
        raises ``NotTranslated``.
    """
    # Imported locally: only this function needs the exception type.
    from textblob.exceptions import NotTranslated

    text_cleaned = clean_text(text)
    text_raw = ''.join(filter(leave_char, text_cleaned))

    # Nothing to translate: skip the translation call instead of letting it
    # fail on blank input.
    if not text_raw.strip():
        return ""

    try:
        # NOTE(review): TextBlob documents 'en' as the default target code;
        # confirm that 'eng' is accepted by the translator.
        return str(TextBlob(text_raw).translate(from_lang='pl', to='eng'))
    except NotTranslated:
        # Raised when the text is unchanged after translation; score the
        # cleaned text as-is rather than failing the whole batch.
        return text_raw


def count_hashtags(text: str) -> dict:
    """Count the space-separated tokens of ``text`` that contain a '#'.

    Line breaks are treated as spaces. Each matching token is stripped of
    surrounding whitespace and used as a key; the value is the number of
    times that token occurs.
    """
    counts = {}
    tokens = text.replace("\n", " ").split(" ")
    for token in tokens:
        if "#" not in token:
            continue
        key = token.strip()
        counts[key] = counts.get(key, 0) + 1
    return counts

def score_raw_text(raw_text: str) -> dict:
    """Prepare ``raw_text`` with ``prepare_text`` and score it with ``get_score``."""
    prepared = prepare_text(raw_text)
    return get_score(prepared)

In [None]:
"""Define operations"""

def transform_tweets(spark: "SparkSession", tweets: list) -> pd.DataFrame:
    """Score one batch of tweets with Spark and return it as a pandas frame.

    Args:
        spark: Spark session used to build the batch DataFrame.
        tweets: tweet objects; each is read through its ``"id"``,
            ``"created_at"`` and ``"text"`` keys.

    Returns:
        A pandas DataFrame with the columns ``timestamp``, ``id``, ``score``
        (result of ``score_raw_text``), ``hashtags`` (result of
        ``count_hashtags``) and ``hashtag_count`` (number of entries in
        ``hashtags``).
    """
    # Build explicit tuples so every column label is bound to the intended
    # tweet field, instead of depending on how Spark orders the keys of dict
    # rows when it infers a schema (and on which extra fields the API adds).
    rows = [(tweet["id"], tweet["created_at"], tweet["text"]) for tweet in tweets]
    sDf = spark.createDataFrame(rows, ["id", "timestamp", "text"])

    def _score_row(row):
        # One pass per tweet: sentiment score plus hashtag statistics.
        hashtags = count_hashtags(row["text"])
        return (row["timestamp"], row["id"], score_raw_text(row["text"]), hashtags, len(hashtags))

    final_sDf = sDf.rdd.map(_score_row).toDF(["timestamp", "id", "score", "hashtags", "hashtag_count"])
    final_sDf.show()  # display the current batch df
    return final_sDf.toPandas()


# Backward-compatible alias for the original, misspelled name.
tranform_tweets = transform_tweets

def prepare_tables(pDf: pd.DataFrame) -> tuple:
    """Split a scored batch into a sentiment table and a hashtag table.

    Args:
        pDf: frame with the columns ``id``, ``timestamp``, ``score`` (dict
            with the keys ``neg``, ``neu``, ``pos``, ``compound``),
            ``hashtags`` (dict keyed by hashtag) and ``hashtag_count``.

    Returns:
        ``(score_df, hashtag_df)``: ``score_df`` has one row per tweet with the
        score keys as columns; ``hashtag_df`` has one row per (tweet, hashtag)
        pair with the columns ``id``, ``timestamp``, ``hashtag_name`` and
        ``hashtag_count``.
    """
    # expand the score dicts into one column per key, next to the input columns
    score_columns = pd.json_normalize(pDf["score"])
    merged = pd.concat([pDf, score_columns], axis=1)

    # create table for score
    score_df = merged.loc[:, ["id", "timestamp", "neg", "neu", "pos", "compound"]]

    # create table for hashtags: one row per hashtag found in a tweet
    hashtag_rows = []
    for tweet_id, timestamp, tag_counts, tag_total in merged[['id', 'timestamp', 'hashtags', 'hashtag_count']].values:
        for tag_name, _occurrences in tag_counts.items():
            hashtag_rows.append([tweet_id, timestamp, tag_name, tag_total])
    hashtag_df = pd.DataFrame(
        hashtag_rows,
        columns=['id', 'timestamp', 'hashtag_name', 'hashtag_count'],
    )

    return score_df, hashtag_df


In [None]:
# =============================================================
# DATABASE
# =============================================================

In [72]:
"""Connect to database"""
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

In [78]:
# Narrow `config` to its "database" section for the connection cell below.
# Guarded so that re-running this cell is a no-op: after the first run
# config["database"] is the database name rather than the settings dict, and
# re-assigning would replace the whole dict with that value.
# NOTE(review): cells that still need the top-level settings (e.g. the one
# reading config["bearer_token"]) cannot be re-run after this point.
_db_section = config["database"]
if isinstance(_db_section, dict):
    config = _db_section

In [79]:
# Build the Snowflake connection URL from the database settings held in
# `config` and create the SQLAlchemy engine for it.
engine = create_engine(URL(
    user=config["user"],
    password=config["password"],
    account=config["account"],
    warehouse=config["warehouse"],
    database=config["database"],
    schema =config["schema"]
))
 
# Connection handed to send_to_sql() and closed at the end of main().
db_connection = engine.connect()

In [96]:
"""Define operation"""

def send_to_sql(db_connection: "sqlalchemy.engine.base.Connection", score_df: pd.DataFrame, hashtag_df: pd.DataFrame) -> None:
    """Append the score and hashtag tables to their database tables.

    The connection annotation is a string on purpose: the name ``sqlalchemy``
    is not imported by this notebook (only ``create_engine`` is), so an
    evaluated annotation raises NameError when the function is defined.

    Args:
        db_connection: open database connection passed to ``to_sql``.
        score_df: rows appended to the ``analiza_sentymentu`` table.
        hashtag_df: rows appended to the ``hashtags`` table.

    Any exception raised while writing is printed and not re-raised; if the
    first write fails the second one is not attempted.
    """
    try:
        # save score to database
        score_df.to_sql("analiza_sentymentu", db_connection, if_exists='append', index=False)

        # save hashtags to database
        hashtag_df.to_sql("hashtags", db_connection, if_exists='append', index=False)
    except Exception as e:
        print(f"{type(e)}: {str(e)}")

In [None]:
# =============================================================
# MAIN
# =============================================================

In [None]:
import time

_TIMEOUT = 10  # time interval between batches


def main(n_batches: int = 100) -> int:
    """Run the fetch -> score -> store loop for a fixed number of batches.

    Each batch fetches tweets through ``conn``, scores them with
    ``transform_tweets``, splits the result with ``prepare_tables`` and stores
    it with ``send_to_sql``. A failing batch is reported and skipped. After
    every batch the loop sleeps ``_TIMEOUT`` seconds.

    Args:
        n_batches: number of batches to process; defaults to 100, the value
            that used to be hard-coded.

    Returns:
        0 when the loop ran to completion, -1 when an exception escaped it.
        In both cases the API session, Spark and the database connection
        are closed before returning.
    """
    try:
        for _ in range(n_batches):
            try:
                tweets = conn.get_tweets()
                if not tweets:
                    # nothing to score or store in this batch
                    continue
                pDf = transform_tweets(spark, tweets)
                score_df, hashtag_df = prepare_tables(pDf)
                send_to_sql(db_connection, score_df, hashtag_df)

            except Exception as e:
                print(f"[GLOBAL WHILE] {type(e)}: {str(e)}")

            finally:
                time.sleep(_TIMEOUT)  # no matter what happens, wait before proceeding

    except Exception as e:
        print(f"[GLOBAL] {type(e)}: {str(e)}")
        return -1

    finally:
        # Release every resource independently so that one failing close()
        # does not leave the remaining ones open.
        cleanup_steps = (
            conn.session.close,  # close Api connector session
            spark.stop,  # close spark session
            sc.stop,  # close spark
            db_connection.close,  # close database connection
        )
        for release in cleanup_steps:
            try:
                release()
            except Exception as e:
                print(f"[CLEANUP] {type(e)}: {str(e)}")

    return 0

In [None]:
# Run the pipeline when this code is executed as the main module.
# The return code of main() is not used here.
if __name__ == "__main__":
    main()

In [105]:
!pip3 install nbconvert

