In [51]:
import json

import twint_search

# Either load a previously saved tweets dataset from a local JSON file or
# scrape fresh tweets through twint_search. Both branches define
# `tweets_list` and `keyword`, which later cells pass to send_request().
# `json` is imported in this cell so it runs on a fresh kernel without
# depending on a later cell's imports.
inp = input("Do you already have a tweets dataset to upload (y/n)? If no, let's start scrapping!")
# Accept "Y", " y ", etc. instead of only the exact string 'y'.
if inp.strip().lower() == 'y':
    file_path = input('Please enter the path to your existing tweets data file (.json file at local)')
    keyword = input('What is your tweets data about? (Enter the keyword of the topic)')
    # The context manager closes the file handle once the JSON is parsed.
    with open(file_path) as tweets_file:
        tweets_by_key = json.load(tweets_file)
    # The file holds a JSON object; only its values (the tweets) are kept.
    tweets_list = list(tweets_by_key.values())
else:
    keyword = input('What is the topic that you are interested in?')
    before_time = input('What is the time window of your tweets (before yyyy-mm-dd)?')
    after_time = input('What is the time window of your tweets (after yyyy-mm-dd)?')
    # NOTE(review): input() returns a string, so 'limit' is passed as text —
    # confirm that twint_search.search_tweet accepts that.
    num_tweets = input('How many tweets do you want?')
    tweets_list, keyword = twint_search.search_tweet({'keyword': keyword, 'since': after_time, 'until': before_time, 'limit': num_tweets}, from_ui=False)


In [3]:
import boto3
import json
import dataset

# Low-level service clients shared by the cells below.
sqs, aws_lambda, iam_client, s3, rds = (
    boto3.client(service_name)
    for service_name in ('sqs', 'lambda', 'iam', 's3', 'rds')
)

# Resource-style handle for S3, alongside the plain client above.
s3_resource = boto3.resource('s3')

# IAM role description; its ARN is read when the Lambda function is created.
role = iam_client.get_role(RoleName='LabRole')

In [4]:
# Create S3 bucket to store raw JSON data.
# NOTE(review): send_request() uploads to f'{keyword}-bucket', which matches
# this bucket name only when keyword == 'raw-tweet' — confirm which bucket
# name is intended.
s3.create_bucket(Bucket='raw-tweet-bucket')

{'ResponseMetadata': {'RequestId': '6JAQD1QFFBNB1BVS',
  'HostId': 'rdpZxFbEtovSI4CDD3jFjFx6GoXbLn98mgrfnJQr9Aprogt64926NCAdt/w6JMmHy5aN37J2SbE=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'rdpZxFbEtovSI4CDD3jFjFx6GoXbLn98mgrfnJQr9Aprogt64926NCAdt/w6JMmHy5aN37J2SbE=',
   'x-amz-request-id': '6JAQD1QFFBNB1BVS',
   'date': 'Fri, 03 Jun 2022 19:15:27 GMT',
   'location': '/raw-tweet-bucket',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'Location': '/raw-tweet-bucket'}

In [35]:
# Create the MySQL instance on the first run. On re-runs the instance already
# exists, and only that specific error is ignored; any other failure
# (permissions, quota, invalid parameters) now surfaces instead of being
# silently swallowed.
# NOTE(review): the master username/password are hardcoded here and in the
# connection cell below — move them to environment variables or a secrets
# manager before sharing this notebook.
try:
    response = rds.create_db_instance(
        DBInstanceIdentifier='relational-db',
        DBName='twitter_sentiment',
        MasterUsername='username',
        MasterUserPassword='password',
        DBInstanceClass='db.t2.micro',
        Engine='MySQL',
        AllocatedStorage=5
    )
except rds.exceptions.DBInstanceAlreadyExistsFault:
    # Instance was created by an earlier run; reuse it.
    pass

# Wait until DB is available to continue
rds.get_waiter('db_instance_available').wait(DBInstanceIdentifier='relational-db')

# Describe where DB is available and on what port. The identifier is passed
# explicitly so the lookup returns this instance rather than whichever
# instance happens to be listed first in the account.
db = rds.describe_db_instances(DBInstanceIdentifier='relational-db')['DBInstances'][0]
ENDPOINT = db['Endpoint']['Address']
PORT = db['Endpoint']['Port']
DBID = db['DBInstanceIdentifier']

print(DBID,
      "is available at", ENDPOINT,
      "on Port", PORT,
     )

relational-db is available at relational-db.ccps3ediik0q.us-east-1.rds.amazonaws.com on Port 3306


In [36]:
# Get the ID of the first VPC security group attached to the DB instance.
# NOTE: `db` must still be the describe_db_instances() result here; a later
# cell rebinds `db` to the dataset connection, so re-running this cell after
# that one will fail.
SGNAME = db['VpcSecurityGroups'][0]['VpcSecurityGroupId']

# The client is created outside the try block so that the except clause below
# can always resolve `ec2.exceptions`, even if the API call fails immediately.
ec2 = boto3.client('ec2')

# Adjust permissions for that security group so that we can access it on PORT.
# If the security group is already adjusted, print this out.
# NOTE(review): '0.0.0.0/0' opens the database port to every IPv4 address —
# restrict the CIDR range for anything beyond a throwaway lab setup.
try:
    data = ec2.authorize_security_group_ingress(
            GroupId=SGNAME,
            IpPermissions=[
                {'IpProtocol': 'tcp',
                 'FromPort': PORT,
                 'ToPort': PORT,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}
            ]
    )
except ec2.exceptions.ClientError as e:
    if e.response["Error"]["Code"] == 'InvalidPermission.Duplicate':
        print("Permissions already adjusted.")
    else:
        # Any other client error is reported but does not stop the notebook.
        print(e)

Permissions already adjusted.


In [37]:
# Open a `dataset` connection to the twitter_sentiment database on the RDS
# endpoint discovered above. Note that this rebinds `db`, which previously
# held the describe_db_instances() result.
url_template = 'mysql+mysqlconnector://{}:{}@{}:{}/twitter_sentiment'
db_url = url_template.format('username', 'password', ENDPOINT, PORT)
db = dataset.connect(db_url)

In [9]:
# Read the deployment package into memory once; both the create and the
# update call below send the same bytes.
with open('twitter_sentiment_deployment_package.zip', 'rb') as package:
    lambda_zip = package.read()

# Settings that apply only when the function is created for the first time.
create_settings = {
    'FunctionName': 'twitter_sentiment',
    'Runtime': 'python3.9',
    'Role': role['Role']['Arn'],
    'Handler': 'lambda_function.lambda_handler',
    'Code': {'ZipFile': lambda_zip},
    'Timeout': 100,
}

try:
    # First run: the function does not exist yet, so create it.
    response = aws_lambda.create_function(**create_settings)
except aws_lambda.exceptions.ResourceConflictException:
    # Later runs: the function already exists, so only its code is replaced
    # with the contents of the zip file.
    response = aws_lambda.update_function_code(
        FunctionName='twitter_sentiment',
        ZipFile=lambda_zip,
    )

# Both API responses are read for the function's ARN.
lambda_arn = response['FunctionArn']

In [10]:
# Create step function by running the sfn_setup.py script in a shell.
# presumably this creates the 'twitter_sm' state machine that send_request()
# looks up by name — TODO confirm against sfn_setup.py
!python sfn_setup.py

In [31]:
# `sfn` is the Step Functions client that send_request() uses.
# `lambda_client` is a second Lambda client in addition to `aws_lambda`; it is
# not referenced by any other visible cell.
lambda_client, sfn = boto3.client('lambda'), boto3.client('stepfunctions')

def _split_evenly(items, num_batches):
    """Split `items` into `num_batches` contiguous chunks whose sizes differ by at most one."""
    base_size, extra = divmod(len(items), num_batches)
    chunks = []
    start = 0
    for index in range(num_batches):
        # The first `extra` chunks absorb the remainder, one item each.
        size = base_size + (1 if index < extra else 0)
        chunks.append(items[start:start + size])
        start += size
    return chunks


def send_request(tweet_data, keyword):
    """Upload tweets to S3 in 10 batches and run the 'twitter_sm' state machine on them.

    Each tweet is reduced to the fields the pipeline stores, the tweets are
    split into 10 near-equal batches, and every batch is written to
    ``/tmp/{keyword}_batch_{i}.json`` and uploaded to the S3 bucket
    ``{keyword}-bucket``. The state machine then receives a JSON list of
    ``{'batch': [bucket_name, file_name]}`` entries, one per batch.

    Args:
        tweet_data: Iterable of dicts with the keys 'id', 'date', 'timezone',
            'user_id', 'nretweets', 'nlikes', 'user_rt_id' and 'tweet'.
        keyword: Topic keyword; used to build the bucket and file names.

    Returns:
        The HTTP status code of the start_sync_execution API response.
        NOTE(review): this describes the API call itself — confirm whether the
        execution's own 'status' field should be checked as well.

    Raises:
        IndexError: If no state machine named 'twitter_sm' exists.
    """
    num_batches = 10  # the comment in the original code refers to 10 lambda workers
    # NOTE(review): the setup cell creates 'raw-tweet-bucket'; confirm that
    # f'{keyword}-bucket' exists before calling this function.
    raw_bucket_name = f'{keyword}-bucket'

    # Keep only the fields of interest, renamed to the pipeline's column names.
    tweets = [
        {
            'tweet_id': r['id'],
            'datestamp': r['date'],
            'timezone': r['timezone'],
            'user_id': r['user_id'],
            'num_retweets': r['nretweets'],
            'num_likes': r['nlikes'],
            'in_reply_to': r['user_rt_id'],
            'text': r['tweet']
        }
        for r in tweet_data
    ]

    # Batch sizes are derived from the tweets themselves. The previous code
    # read len(response) before `response` was assigned and stopped advancing
    # to the next batch once a batch had received its extra tweet.
    tweet_batches = [{'batch': chunk} for chunk in _split_evenly(tweets, num_batches)]

    data_files = []
    for batch_id, batch in enumerate(tweet_batches):
        raw_file_name = f"{keyword}_batch_{batch_id}.json"
        local_path = '/tmp/' + raw_file_name
        with open(local_path, "w") as outfile:
            json.dump(batch, outfile)
        s3.upload_file(local_path, raw_bucket_name, raw_file_name)
        data_files.append({'batch': [raw_bucket_name, raw_file_name]})

    # Step function for activating the lambda workers: look it up by name.
    state_machines = sfn.list_state_machines()['stateMachines']
    matching_arns = [sm['stateMachineArn']
                     for sm in state_machines
                     if sm['name'] == 'twitter_sm']
    if not matching_arns:
        # Same exception type the bare `[0]` lookup raised, with a usable message.
        raise IndexError("No state machine named 'twitter_sm' was found")
    state_machine_arn = matching_arns[0]

    response = sfn.start_sync_execution(
        stateMachineArn=state_machine_arn,
        name='sentiment',
        input=json.dumps(data_files)
    )

    return response['ResponseMetadata']['HTTPStatusCode']

In [38]:
# Upload the tweets in batches and trigger the state machine; the cell
# displays the HTTP status code that send_request() returns.
send_request(tweets_list, keyword)

200

In [33]:
# Show the column names of 'tweets_table' through the dataset connection.
# presumably the table is populated by the Lambda workers — TODO confirm
db['tweets_table'].columns

['id',
 'tweet_id',
 'sentiment',
 'sentiment_score',
 'timestamp',
 'user_id',
 'num_retweets',
 'num_likes',
 'in_reply_to',
 'text']

In [42]:
## Dump all rows of tweets_table from RDS to a local JSON file (disabled; uncomment to run)
# all_data = db['tweets_table'].all()
# all_data = [data for data in all_data]
# with open('rds.json', "w") as outfile:
#     json.dump(all_data, outfile, default=str)
# json.load(open('rds.json'))