# Marites Analyse

## Overview
Contains the logic for the analyse function

In [1]:
# Imports
import os
from dotenv import load_dotenv
import requests
from datetime import datetime
import pandas as pd
import re
import boto3
from io import StringIO, BytesIO
from uuid import uuid4
import tarfile
from tempfile import TemporaryDirectory

load_dotenv()
print("Import complete.")

Import complete.


In [2]:
max_twitter_posts = 100
max_following = 250
token = os.environ.get("BEARER_TOKEN")
test_username = 'JoseRizal619'

region = 'ap-southeast-2'
language_code = 'en'
input_bucket = 'marites-comprehend-input'
output_bucket = 'marites-comprehend-output'
data_access_role_arn = os.environ.get("DATA_ACCESS_ROLE")
input_doc_format = 'ONE_DOC_PER_LINE'

tg_input_folder = 'tigergraph' 
comprehend_input_folder = 'comprehend'

session_id = uuid4()
print(session_id)

2232610a-3a1e-4d49-b79e-1c7c3aba0ec2


In [3]:
# Twitter Functions

search_url = "https://api.twitter.com/2/tweets/search/recent"
following_url = "https://api.twitter.com/2/users/{}/following"
lookup_username_url = "https://api.twitter.com/2/users/by/username/{}"


def bearer_oauth(r):
    """
    Method required by bearer token authentication.
    """
    r.headers["Authorization"] = f"Bearer {token}"
    return r


def fetch_user_by_username(username):
    url = lookup_username_url.format(username)
    response = requests.get(url, auth=bearer_oauth)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    json_res = response.json()
    return json_res['data']

def map_tweets_to_post(raw_data):
    if 'data' not in raw_data:
        return []

    tweets = raw_data['data']
    username = raw_data['includes']['users'][0]['username']
    ref_tweets = { tweet['id']: tweet['text'] for tweet in raw_data['includes']['tweets'] } if 'includes' in raw_data and 'tweets' in raw_data['includes'] else {}
    
    results = []
    for t in tweets:
        post = { 
            'tweet_id': t['id'],
            'username': username,
            'created_at': t['created_at']
        }
        if 'referenced_tweets' in t:
            combined_text = []
            for rt in t['referenced_tweets']:
                rt_id = rt['id']
                if rt_id in ref_tweets:
                    rt_text = ref_tweets[rt_id]
                    combined_text.append(rt_text)
            post['text'] = ' '.join(combined_text)
        else:
            post['text'] = t['text']

        results.append(post)
    
    return results

def fetch_tweets_by_username(username):
    params = {
        "query": "from:{} -is:reply".format(username),
        "max_results": max_twitter_posts,
        "expansions": "referenced_tweets.id,author_id",
        "tweet.fields": "created_at",
        "user.fields": "username"
    }
    response = requests.get(search_url, auth=bearer_oauth, params=params)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    data = response.json()
    return map_tweets_to_post(data)

def fetch_following(user_id):
    url = following_url.format(user_id)
    params = {
        'max_results': max_following
    }
    response = requests.get(url, auth=bearer_oauth, params=params)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    json_res = response.json()
    return json_res['data']

In [13]:
# Twitter data extraction

def get_user_tweets(users_to_search):
    processed = 0
    all_tweets = []
    for user in users_to_search:
        user_tweets = fetch_tweets_by_username(user)
        processed += 1
        all_tweets.extend(user_tweets)
        progress = round((processed / len(users_to_search)) * 100, 2)
        print("Processed {}/{} users ({}%)".format(processed, len(users_to_search), progress))
    user_tweets = pd.DataFrame(all_tweets)
    return user_tweets

def get_user_following_map(user, following):
    date = datetime.now().strftime("%m-%d-%y")
    username = user['username']
    follow_names = list(map(lambda x: x['username'], following))
    
    return pd.DataFrame({
        'user': [username] * len(following),
        'following': follow_names,
        'date': [date] * len(following)
    })


def clean_posts(data):
    user_tweets = data
    
    if user_tweets.empty:
        return []
    
    # Clean up the links from the text (they're useless to us)
    user_tweets['text'] = user_tweets['text'].apply(lambda x: re.split('https:\/\/.*', str(x))[0])

    # Remove all emojis
    user_tweets = user_tweets.astype(str).apply(lambda x: x.str.encode('ascii', 'ignore').str.decode('ascii'))

    # Remove blank tweets
    user_tweets = user_tweets[user_tweets.text.str.strip().str.len() != 0]

    # Ensure that all text is in a single line
    user_tweets.text = user_tweets.text.str.replace('\n', ' ');
    user_tweets.text = user_tweets.text.str.replace('\r', ' ');
    
    return user_tweets

def extract_twitter_data(username):
    users_list = []
    user = fetch_user_by_username(username)
    user_following = fetch_following(user['id'])

    users_list.append(user)
    users_list.extend(user_following)
    
    users_to_search = list(map(lambda x: x['username'], users_list))
    
    posts_df = get_user_tweets(users_to_search)
    following_df = get_user_following_map(user, user_following) # user -> following edges
    users_df = pd.DataFrame(users_list) # users vertex
    
    print(posts_df)
    
    return {
        'posts': clean_posts(posts_df),
        'following': following_df,
        'users': users_df
    }


In [14]:
# Upload to S3 functions

def upload_text_to_s3(data, bucket_name, file_name):
    text_buffer = StringIO()
    data.text.to_csv(text_buffer, sep=' ', index=False, header=False)
    s3_resource = boto3.resource('s3')
    return s3_resource.Object(bucket_name, '{}.txt'.format(file_name)).put(Body=text_buffer.getvalue())

def upload_frames_to_s3(tar_filename, bucket_name, frame_dict):
    tar_buffer = BytesIO()
    
    # Create a tarfile into which frames can be added
    with tarfile.open(fileobj=tar_buffer, mode='w:gz') as tfo:
        
        # Loop over all dataframes to be saved
        for file_name, df in frame_dict.items():
            
            # Compute the full path of the output file within the archive
            archive_name = os.path.join('output', file_name)
            
            # Create a temporary directory for packaging into a tar_file
            with TemporaryDirectory(prefix='rev_processing__') as temp_dir:
                
                # Write a csv dump of the dataframe to a temporary file
                temp_file_name = os.path.join(temp_dir, archive_name)
                os.makedirs(os.path.dirname(temp_file_name), exist_ok=True)
                df.to_csv(temp_file_name, index=False)
                
                # Add the temp file to the tarfile
                tfo.add(temp_file_name, arcname=archive_name)
    
    # Upload to S3
    s3_resource = boto3.resource('s3')
    return s3_resource.Object(bucket_name, f'{tar_filename}.tar.gz').put(Body=tar_buffer.getvalue())
    

def upload_csv_to_s3(data, bucket_name, file_name):
    buffer = StringIO()
    data.to_csv(buffer, index=False)
    s3_resource = boto3.resource('s3')
    return s3_resource.Object(bucket_name, '{}.csv'.format(file_name)).put(Body=buffer.getvalue())


In [15]:
test_data = extract_twitter_data(test_username)

Processed 1/26 users (3.85%)
Processed 2/26 users (7.69%)
Processed 3/26 users (11.54%)
Processed 4/26 users (15.38%)
Processed 5/26 users (19.23%)
Processed 6/26 users (23.08%)
Processed 7/26 users (26.92%)
Processed 8/26 users (30.77%)
Processed 9/26 users (34.62%)
Processed 10/26 users (38.46%)
Processed 11/26 users (42.31%)
Processed 12/26 users (46.15%)
Processed 13/26 users (50.0%)
Processed 14/26 users (53.85%)
Processed 15/26 users (57.69%)
Processed 16/26 users (61.54%)
Processed 17/26 users (65.38%)
Processed 18/26 users (69.23%)
Processed 19/26 users (73.08%)
Processed 20/26 users (76.92%)
Processed 21/26 users (80.77%)
Processed 22/26 users (84.62%)
Processed 23/26 users (88.46%)
Processed 24/26 users (92.31%)
Processed 25/26 users (96.15%)
Processed 26/26 users (100.0%)
Empty DataFrame
Columns: []
Index: []


In [18]:
test_data

{'posts': [],
 'following':             user        following      date
 0   JoseRizal619       JoseMaBasa  04-18-22
 1   JoseRizal619   LeonorRivera00  04-18-22
 2   JoseRizal619  lopezantonio606  04-18-22
 3   JoseRizal619       senorfabie  04-18-22
 4   JoseRizal619      SaturninaMH  04-18-22
 5   JoseRizal619    GovGenVWeyler  04-18-22
 6   JoseRizal619  SilvestreUbaldo  04-18-22
 7   JoseRizal619   LeonorRivera19  04-18-22
 8   JoseRizal619   PacianoMercad0  04-18-22
 9   JoseRizal619  WenceslaoRetana  04-18-22
 10  JoseRizal619    SuzanneJacoby  04-18-22
 11  JoseRizal619   AntonioLuna000  04-18-22
 12  JoseRizal619          _dandoy  04-18-22
 13  JoseRizal619  PlaridelMarcelo  04-18-22
 14  JoseRizal619  _JosePanganiban  04-18-22
 15  JoseRizal619    MarianoPonce5  04-18-22
 16  JoseRizal619      AuntIsabel2  04-18-22
 17  JoseRizal619  FrancisoMercado  04-18-22
 18  JoseRizal619     TenantsniDon  04-18-22
 19  JoseRizal619       platonnica  04-18-22
 20  JoseRizal619   EduardBo

In [6]:
# Comprehend analysis

def start_targeted_sentiment_job(input_s3_url, output_s3_url, job_tag):
    input_data_config = {
        'S3Uri': input_s3_url,
        'InputFormat': input_doc_format
    }

    output_data_config = {
        'S3Uri': output_s3_url
    }

    job_name = 'Targeted_Sentiment_Job_{}'.format(job_tag)
    
    comprehend = boto3.client('comprehend', region_name=region)
    return comprehend.start_targeted_sentiment_detection_job(InputDataConfig=input_data_config,
                                                             OutputDataConfig=output_data_config, 
                                                             DataAccessRoleArn=data_access_role_arn, 
                                                             LanguageCode=language_code,
                                                             JobName=job_name)

def analyse_tweets(username):
    date = datetime.now().strftime("%m-%d-%y")
    tag = "{}-{}".format(date, username)
    
    twitter_data = extract_twitter_data(username)

    posts = twitter_data['posts']
    posts['line_id'] = posts.index.map(lambda x: '{}-{}'.format(x, tag)) # used for mapping entities

    following = twitter_data['following']
    users = twitter_data['users']
    
    session_folder = '{}/{}'.format(session_id, username)
    tg_folder = '{}/{}'.format(tg_input_folder, session_folder) # Tigergraph files
    comp_folder = '{}/{}'.format(comprehend_input_folder, session_folder) # Comprehend files

    posts_filename = 'posts'
    following_filename = 'following'
    users_filename = 'users'
    
    # Upload data to Comprehend input folder
    print("Uploading comprehend input files...")
    upload_text_to_s3(posts, input_bucket, '{}/{}_{}'.format(comp_folder, posts_filename, tag))
    
    print("Uploading Tigergraph input files...")
    # Upload data to Tigergraph input folder
    uploaded_frames = {
        f'{users_filename}.csv': users,
        f'{following_filename}.csv': following,
        f'{posts_filename}.csv': posts
    }
    upload_frames_to_s3(tg_folder, input_bucket, uploaded_frames)
    
    print("Starting comprehend job...")
    # Start comprehend job
    input_s3_url = 's3://{}/{}'.format(input_bucket, comp_folder)
    output_s3_url = 's3://{}/{}'.format(output_bucket, session_folder)
    return start_targeted_sentiment_job(input_s3_url, output_s3_url, tag)


In [15]:
analyse_tweets(test_username)

Processed 1/115 users (0.87%)
Processed 2/115 users (1.74%)
Processed 3/115 users (2.61%)
Processed 4/115 users (3.48%)
Processed 5/115 users (4.35%)
Processed 6/115 users (5.22%)
Processed 7/115 users (6.09%)
Processed 8/115 users (6.96%)
Processed 9/115 users (7.83%)
Processed 10/115 users (8.7%)
Processed 11/115 users (9.57%)
Processed 12/115 users (10.43%)
Processed 13/115 users (11.3%)
Processed 14/115 users (12.17%)
Processed 15/115 users (13.04%)
Processed 16/115 users (13.91%)
Processed 17/115 users (14.78%)
Processed 18/115 users (15.65%)
Processed 19/115 users (16.52%)
Processed 20/115 users (17.39%)
Processed 21/115 users (18.26%)
Processed 22/115 users (19.13%)
Processed 23/115 users (20.0%)
Processed 24/115 users (20.87%)
Processed 25/115 users (21.74%)
Processed 26/115 users (22.61%)
Processed 27/115 users (23.48%)
Processed 28/115 users (24.35%)
Processed 29/115 users (25.22%)
Processed 30/115 users (26.09%)
Processed 31/115 users (26.96%)
Processed 32/115 users (27.83%)

{'JobId': '4bdbfbdf964c2e06aaf70392c50572e7',
 'JobArn': 'arn:aws:comprehend:ap-southeast-2:368767127050:targeted-sentiment-detection-job/4bdbfbdf964c2e06aaf70392c50572e7',
 'JobStatus': 'SUBMITTED',
 'ResponseMetadata': {'RequestId': '6b1ef4b3-b645-41ac-b05f-dc1cd034319a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '6b1ef4b3-b645-41ac-b05f-dc1cd034319a',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '192',
   'date': 'Sat, 16 Apr 2022 00:18:22 GMT'},
  'RetryAttempts': 0}}