In [1]:
# Ask whether a tweets dataset already exists. The scraping parameters are only
# needed when it does not, i.e. when the user answers "n".
inp = input("Do you already have a tweets dataset to upload (y/n)? If no, let's start scrapping!")
# Normalise the answer so "N" or " n " are treated the same as "n".
if inp.strip().lower() == 'n':
    # No dataset yet: collect the parameters for the scraping run.
    keyword = input('What is the topic that you are interested in?')
    language = input('What language should the tweets be in?')
    before_time = input('What is the time window of your tweets (before yyyy-mm-dd)?')
    after_time = input('What is the time window of your tweets (after yyyy-mm-dd)?')
    # NOTE: input() returns a string; convert with int() where a count is needed.
    num_tweets = input('How many tweets do you want?')

In [3]:
import boto3
import json
import dataset

# Low-level service clients used by the cells below.
sqs = boto3.client('sqs')
aws_lambda = boto3.client('lambda')
rds = boto3.client('rds')
s3 = boto3.client('s3')
dynamo_client = boto3.client('dynamodb')
iam_client = boto3.client('iam')

# Resource-style handles for S3 and DynamoDB.
s3_resource = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')

# IAM role looked up by name; its ARN is passed to Lambda's create_function later.
role = iam_client.get_role(RoleName='LabRole')

In [4]:
# Bucket that holds the raw tweet JSON.
s3.create_bucket(
    Bucket='raw-tweet-bucket',
)

{'ResponseMetadata': {'RequestId': '6JAQD1QFFBNB1BVS',
  'HostId': 'rdpZxFbEtovSI4CDD3jFjFx6GoXbLn98mgrfnJQr9Aprogt64926NCAdt/w6JMmHy5aN37J2SbE=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'rdpZxFbEtovSI4CDD3jFjFx6GoXbLn98mgrfnJQr9Aprogt64926NCAdt/w6JMmHy5aN37J2SbE=',
   'x-amz-request-id': '6JAQD1QFFBNB1BVS',
   'date': 'Fri, 03 Jun 2022 19:15:27 GMT',
   'location': '/raw-tweet-bucket',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'Location': '/raw-tweet-bucket'}

In [35]:
# Create (or reuse) the MySQL RDS instance that backs the tweets database,
# wait for it to come up, and record its endpoint for later cells.
DB_INSTANCE_ID = 'relational-db'

# NOTE(review): the master username/password are hardcoded placeholders that
# the connection cell repeats verbatim; move both to environment variables or
# getpass before using this against a real account.
try:
    response = rds.create_db_instance(
        DBInstanceIdentifier=DB_INSTANCE_ID,
        DBName='twitter_sentiment',
        MasterUsername='username',
        MasterUserPassword='password',
        DBInstanceClass='db.t2.micro',
        Engine='MySQL',
        AllocatedStorage=5
    )
except rds.exceptions.ClientError as e:
    # Re-running the cell is expected, so an existing instance is fine.
    # Anything else (quota, permissions, bad parameters) must surface here
    # instead of showing up later as a confusing waiter failure.
    if e.response["Error"]["Code"] == 'DBInstanceAlreadyExists':
        print("DB instance already exists.")
    else:
        raise

# Wait until DB is available to continue
rds.get_waiter('db_instance_available').wait(DBInstanceIdentifier=DB_INSTANCE_ID)

# Describe where DB is available and on what port. Filter by identifier so the
# right instance is picked even when the account has several.
db = rds.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_ID)['DBInstances'][0]
ENDPOINT = db['Endpoint']['Address']
PORT = db['Endpoint']['Port']
DBID = db['DBInstanceIdentifier']

print(DBID,
      "is available at", ENDPOINT,
      "on Port", PORT,
     )

relational-db is available at relational-db.ccps3ediik0q.us-east-1.rds.amazonaws.com on Port 3306


In [36]:
# ID of the first VPC security group attached to the DB instance
SGNAME = db['VpcSecurityGroups'][0]['VpcSecurityGroupId']

# Create the client outside the try block: the except clause below refers to
# `ec2.exceptions`, so `ec2` must exist even if the API call fails.
ec2 = boto3.client('ec2')

# Open the DB port on that security group so the notebook can connect.
# If the rule already exists, just report it.
# NOTE(review): 0.0.0.0/0 exposes the database port to the whole internet;
# restrict the CIDR to known addresses outside of a throwaway lab account.
try:
    data = ec2.authorize_security_group_ingress(
            GroupId=SGNAME,
            IpPermissions=[
                {'IpProtocol': 'tcp',
                 'FromPort': PORT,
                 'ToPort': PORT,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}
            ]
    )
except ec2.exceptions.ClientError as e:
    if e.response["Error"]["Code"] == 'InvalidPermission.Duplicate':
        print("Permissions already adjusted.")
    else:
        print(e)

Permissions already adjusted.


In [37]:
# Build the connection URL for the RDS MySQL instance and open it with `dataset`.
# Note: this rebinds `db`, which earlier held the describe_db_instances record.
db_url = 'mysql+mysqlconnector://{}:{}@{}:{}/twitter_sentiment'.format(
    'username', 'password', ENDPOINT, PORT
)
db = dataset.connect(db_url)

In [9]:
# Read the deployment package into memory as raw bytes.
with open('twitter_sentiment_deployment_package.zip', 'rb') as f:
    lambda_zip = f.read()

# Create the function on first run; on later runs only push the new code.
try:
    response = aws_lambda.create_function(
        FunctionName='twitter_sentiment',
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        Handler='lambda_function.lambda_handler',
        Code={'ZipFile': lambda_zip},
        Timeout=100,
    )
except aws_lambda.exceptions.ResourceConflictException:
    # A function with this name already exists: replace its code with the zip contents.
    response = aws_lambda.update_function_code(
        FunctionName='twitter_sentiment',
        ZipFile=lambda_zip,
    )

# Both API responses carry the function ARN under the same key.
lambda_arn = response['FunctionArn']

In [10]:
# Create step function
# Runs the sibling script sfn_setup.py in a subshell via the `!` shell escape.
# NOTE(review): presumably this creates the 'twitter_sm' state machine that
# send_request looks up by name — confirm against sfn_setup.py.
!python sfn_setup.py

In [31]:
# Step Functions client used by send_request to start the state machine.
sfn = boto3.client('stepfunctions')
# Additional Lambda client handle (a Lambda client also exists as `aws_lambda`).
lambda_client = boto3.client('lambda')

def _to_tweet_record(r):
    """Map one raw scraped tweet dict onto the field names used downstream."""
    return {
        'tweet_id': r['id'],
        'datestamp': r['date'],
        'timezone': r['timezone'],
        'user_id': r['user_id'],
        'num_retweets': r['nretweets'],
        'num_likes': r['nlikes'],
        'in_reply_to': r['user_rt_id'],
        'text': r['tweet']
    }


def _split_into_batches(tweets, num_batches=10):
    """Split ``tweets`` into ``num_batches`` contiguous, near-equal batches."""
    base_size, extra = divmod(len(tweets), num_batches)
    batches = []
    start = 0
    for index in range(num_batches):
        # The first `extra` batches absorb the remainder, one tweet each.
        size = base_size + (1 if index < extra else 0)
        batches.append({'batch': tweets[start:start + size]})
        start += size
    return batches


def send_request(tweet_data, sqs_url):
    """Batch the tweets and start the 'twitter_sm' state machine on them.

    Parameters
    ----------
    tweet_data : sequence of dict
        Raw tweets; each must provide the keys 'id', 'date', 'timezone',
        'user_id', 'nretweets', 'nlikes', 'user_rt_id' and 'tweet'.
    sqs_url : str
        Unused; kept so existing callers keep working.

    Returns
    -------
    int
        HTTP status code of the ``start_execution`` response.

    Raises
    ------
    IndexError
        If no state machine named 'twitter_sm' is listed.
    KeyError
        If a tweet is missing one of the required keys.
    """
    # Function-scope import keeps this cell self-contained.
    import uuid

    tweets = [_to_tweet_record(r) for r in tweet_data]
    # Ten batches: the step function fans out to 10 lambda workers.
    tweet_batches = _split_into_batches(tweets, num_batches=10)

    machines = sfn.list_state_machines()['stateMachines']
    state_machine_arn = next(
        (sm['stateMachineArn'] for sm in machines if sm['name'] == 'twitter_sm'),
        None,
    )
    if state_machine_arn is None:
        # Same exception type the original list-indexing lookup raised.
        raise IndexError("state machine 'twitter_sm' not found")

    response = sfn.start_execution(
        stateMachineArn=state_machine_arn,
        # A reused execution name with different input is rejected, so make
        # every run's name unique while keeping the 'sentiment' prefix.
        name='sentiment-' + uuid.uuid4().hex,
        input=json.dumps(tweet_batches)
    )

    return response['ResponseMetadata']['HTTPStatusCode']

In [50]:
# Bucket and object key of the batch file to inspect.
bucketname = 'abortion-bucket'
file_to_read = 'abortion_batch_0.json'

# Fetch the object, then pull its full body into memory as bytes.
fileobj = s3.get_object(Bucket=bucketname, Key=file_to_read)
filedata = fileobj['Body'].read()

# Decode the JSON payload and show the id of the first tweet in the batch.
json.loads(
    filedata.decode('utf-8')
)['batch'][0]['tweet_id']

'1532439327422173191'

In [14]:
# Scratch check of the in-memory size of one tweet record.
# NOTE(review): `tweet_data` is not defined in any visible cell, so this relies
# on leftover kernel state. sys.getsizeof counts only the object itself, not
# the objects it references, so this understates the size of a nested record.
import sys
sys.getsizeof(tweet_data[0])

1176

In [38]:
# Hand-written sample payload followed by a call to send_request.
# NOTE(review): `param` and `data` are built here but never used in this cell.
# The call passes `tweet_data` and `queue_url`, neither of which is defined in
# any visible cell (leftover kernel state). The keys in `data` also differ from
# the keys send_request reads ('id', 'date', 'nretweets', ...), so `data` could
# not be passed as-is — confirm which input was intended.
param = {
    "keyword": "abortion",
}
data = [{'tweet_id': '0001',
        'TimeStamp': '2022-06-02 18:49:35 UTC', # timestamp written as 'YYYY-MM-DD HH:MM:SS UTC'
        'Twitter account': '0001',
        'Num of comments/retweets': 10,
        'Likes': 10,
        'Reply_to': '0002',
        'text': 'abortion'}, {
        'tweet_id': '0002',
        'TimeStamp': '2022-06-02 18:49:35 UTC', # timestamp written as 'YYYY-MM-DD HH:MM:SS UTC'
        'Twitter account': '0002',
        'Num of comments/retweets': 10,
        'Likes': 10,
        'Reply_to': '0003', 
        'text': 'abortion'}]
send_request(tweet_data, sqs_url=queue_url)

200

In [15]:
# Split the results held in `response` into 10 contiguous, near-equal batches.
# NOTE(review): `response` is not defined in any visible cell; the code below
# assumes it is a sized sequence whose items expose a `_json` dict.
batch_size, remaining = divmod(len(response), 10)

# Flatten each result into the fields of interest.
tweet_dicts = [
    {
        'id': r._json['id'],
        'TimeStamp': r._json['created_at'],
        'Twitter account': r._json['user']['id_str'],
        'Num of comments/retweets': r._json['retweet_count'],
        'Likes': r._json['favorite_count'],
        'Reply_to': r._json['in_reply_to_user_id']
    }
    for r in response
]

# The first `remaining` batches take one extra tweet so every tweet is placed
# and no index beyond the 10 batches is ever touched.
tweet_batches = []
start = 0
for batch_num in range(10):
    size = batch_size + (1 if batch_num < remaining else 0)
    tweet_batches.append({'batch': tweet_dicts[start:start + size]})
    start += size

tweet_batches

[{'batch': [{'id': 1531674365242359808,
    'TimeStamp': 'Tue May 31 16:30:00 +0000 2022',
    'Twitter account': '815733290955112448',
    'Num of comments/retweets': 5326,
    'Likes': 27356,
    'Reply_to': None},
   {'id': 1531723932767617024,
    'TimeStamp': 'Tue May 31 19:46:58 +0000 2022',
    'Twitter account': '878284831',
    'Num of comments/retweets': 3905,
    'Likes': 13427,
    'Reply_to': None}]},
 {'batch': [{'id': 1531621877726208010,
    'TimeStamp': 'Tue May 31 13:01:26 +0000 2022',
    'Twitter account': '19706851',
    'Num of comments/retweets': 1023,
    'Likes': 2077,
    'Reply_to': None},
   {'id': 1532088007917981696,
    'TimeStamp': 'Wed Jun 01 19:53:40 +0000 2022',
    'Twitter account': '1311079340168404997',
    'Num of comments/retweets': 8,
    'Likes': 0,
    'Reply_to': None}]},
 {'batch': [{'id': 1532088005065945091,
    'TimeStamp': 'Wed Jun 01 19:53:39 +0000 2022',
    'Twitter account': '1224132582717251585',
    'Num of comments/retweets': 532

In [14]:
# Display the per-batch base size computed in the batching cell.
batch_size

1

In [74]:
# Show the tables visible through the `dataset` connection.
db.tables

[]

In [33]:
# Show the column names of tweets_table.
db['tweets_table'].columns

['id',
 'tweet_id',
 'sentiment',
 'sentiment_score',
 'timestamp',
 'user_id',
 'num_retweets',
 'num_likes',
 'in_reply_to',
 'text']

In [14]:
# WARNING: destructive — drops tweets_table and its rows.
# NOTE(review): this cell sits above the cells that query and export the
# table, so a top-to-bottom run drops the table before they read it.
db['tweets_table'].drop()

In [41]:
# Spot-check: fetch the row(s) of tweets_table whose id is 500.
list(db['tweets_table'].find(id=500))

[OrderedDict([('id', 500),
              ('tweet_id', '1532435952370602018'),
              ('sentiment', 'NEUTRAL'),
              ('sentiment_score',
               {'Mixed': 0.003288233419880271,
                'Neutral': 0.5341841578483582,
                'Negative': 0.4396229386329651,
                'Positive': 0.02290474809706211}),
              ('timestamp', datetime.datetime(2022, 6, 2, 18, 56, 16)),
              ('user_id', 1526449622834716672),
              ('num_retweets', 0),
              ('num_likes', 1),
              ('in_reply_to', ''),
              ('text',
               "@definitely_down Why do they get pregnant in the first place while condoms are free, and if they like it raw they should gt morning after or th monthly injection....all thse choices are there to prevent pregnancy, u can't ignore these steps and start demanding legalisation of abortion,...")])]

In [42]:
# Materialise every row of tweets_table into a list.
all_data = list(db['tweets_table'].all())

# Write the rows to rds.json; default=str stringifies values json cannot
# encode natively (e.g. datetime timestamps).
with open('rds.json', "w") as outfile:
    json.dump(all_data, outfile, default=str)

In [43]:
# Read the export back to check it. The context manager closes the file
# handle, which a bare open() inside json.load(...) never did.
with open('rds.json') as infile:
    rds_records = json.load(infile)

# Last expression, so the notebook still displays the loaded records.
rds_records

[{'id': 1,
  'tweet_id': '1532437300147126272',
  'sentiment': 'NEGATIVE',
  'sentiment_score': {'Mixed': 0.007484623696655035,
   'Neutral': 0.10701057314872742,
   'Negative': 0.8621703386306763,
   'Positive': 0.023334486410021785},
  'timestamp': '2022-06-02 19:01:38',
  'user_id': 34756550,
  'num_retweets': 12,
  'num_likes': 39,
  'in_reply_to': '',
  'text': 'It’s Gun Control season already??? I feel like I just put up my Abortion decorations and put away my Ukraine stuff.'},
 {'id': 2,
  'tweet_id': '1532437286666784790',
  'sentiment': 'NEGATIVE',
  'sentiment_score': {'Mixed': 0.0009462723974138498,
   'Neutral': 0.47555848956108093,
   'Negative': 0.5212928652763367,
   'Positive': 0.0022024207282811403},
  'timestamp': '2022-06-02 19:01:35',
  'user_id': 1443394080906547202,
  'num_retweets': 0,
  'num_likes': 0,
  'in_reply_to': '',
  'text': '@antifaoperative @Logically_JC The states with the most stringent anti- abortion laws  are certainly not the most Pro-Life States 