In [2]:
# Importing the necessary Libraries 

import boto3
import json
import os
import tweepy
from collections import Counter
from datetime import datetime



In [5]:
# AWS service clients, created at module level so lambda_handler can reuse them
# across calls instead of building new ones per invocation:
#   firehose   -> delivers each enriched tweet record to the delivery stream
#   comprehend -> sentiment and key-phrase detection on tweet text
firehose, comprehend = (boto3.client(service_name) for service_name in ('firehose', 'comprehend'))



In [None]:
# Twitter API credentials, read from the process environment.
# A variable that is not set yields None here rather than raising.
(
    TWITTER_API_KEY,
    TWITTER_API_SECRET_KEY,
    TWITTER_ACCESS_TOKEN,
    TWITTER_ACCESS_TOKEN_SECRET,
    TWITTER_BEARER_TOKEN,
) = (
    os.environ.get(env_var)
    for env_var in (
        'TWITTER_API_KEY',
        'TWITTER_API_SECRET_KEY',
        'TWITTER_ACCESS_TOKEN',
        'TWITTER_ACCESS_TOKEN_SECRET',
        'TWITTER_BEARER_TOKEN',
    )
)



In [None]:
# Fail fast if any Twitter credential is missing, naming exactly which ones.
# NOTE(review): only the bearer token is used by tweepy.Client below; the other
# four are presumably required for use elsewhere — confirm before relaxing.
_REQUIRED_CREDENTIALS = {
    'TWITTER_API_KEY': TWITTER_API_KEY,
    'TWITTER_API_SECRET_KEY': TWITTER_API_SECRET_KEY,
    'TWITTER_ACCESS_TOKEN': TWITTER_ACCESS_TOKEN,
    'TWITTER_ACCESS_TOKEN_SECRET': TWITTER_ACCESS_TOKEN_SECRET,
    'TWITTER_BEARER_TOKEN': TWITTER_BEARER_TOKEN,
}
_missing_credentials = [name for name, value in _REQUIRED_CREDENTIALS.items() if not value]
if _missing_credentials:
    raise EnvironmentError(
        "One or more Twitter API credentials are not set in environment variables: "
        + ", ".join(_missing_credentials)
    )

# Tweepy (Twitter API v2) client authenticated with the app bearer token.
# wait_on_rate_limit=True makes tweepy sleep on rate limits instead of raising.
client = tweepy.Client(bearer_token=TWITTER_BEARER_TOKEN, wait_on_rate_limit=True)

# Firehose delivery stream that receives one JSON line per tweet. It can be
# overridden with the DELIVERY_STREAM_NAME environment variable; otherwise it
# defaults to the stream this notebook was written against.
DELIVERY_STREAM_NAME = os.getenv('DELIVERY_STREAM_NAME', 'PUT-S3-v23mZ')



In [None]:
# Lambda entry point and helpers: search recent Apple-related tweets, enrich
# each with Amazon Comprehend, stream it to Firehose, and return a summary.

# Twitter API v2 applies implicit AND (whitespace) before OR. The keyword
# alternatives are therefore parenthesised; without the parentheses, the
# trailing `-is:retweet -from:Apple` filters only constrained the last OR term.
# `lang:en` keeps the tweets consistent with COMPREHEND_LANGUAGE below.
SEARCH_QUERY = (
    '("Apple Inc" OR "Apple Watch" OR Apple OR #WWDC24 OR iPhone OR MacBook '
    'OR #Apple OR WWDC24) -is:retweet -from:Apple lang:en'
)

# Recent search returns only the default tweet fields unless others are
# requested. Every optional field read by _structure_tweet must be listed
# here, or it comes back as None.
TWEET_FIELDS = ['created_at', 'author_id', 'lang', 'possibly_sensitive', 'source']

# Tweets requested per invocation (a single page of search results).
MAX_RESULTS = 100

# Language code passed to Comprehend; must agree with the `lang:` filter in SEARCH_QUERY.
COMPREHEND_LANGUAGE = 'en'

# How many of the most frequent key phrases to report in the response body.
TOP_KEY_PHRASES = 10


def _structure_tweet(tweet):
    """Flatten a tweepy Tweet into a JSON-serialisable dict (one Firehose/Glue record)."""
    return {
        'id': tweet.id,
        'text': tweet.text,
        'created_at': tweet.created_at.isoformat() if tweet.created_at else None,
        'author_id': tweet.author_id,
        'lang': tweet.lang,
        # Only report the flag when the API actually included it in the raw payload.
        'possibly_sensitive': tweet.possibly_sensitive if 'possibly_sensitive' in tweet.data else None,
        'source': tweet.source,
    }


def _summarize(sentiments, key_phrases):
    """Return (top_sentiment, top_sentiment_count, top_key_phrases).

    When no tweets were processed, returns (None, 0, []) instead of raising
    IndexError on an empty Counter.
    """
    most_common = Counter(sentiments).most_common(1)
    top_sentiment, top_count = most_common[0] if most_common else (None, 0)
    return top_sentiment, top_count, Counter(key_phrases).most_common(TOP_KEY_PHRASES)


def lambda_handler(event, context):
    """Fetch recent Apple-related tweets, analyse them, and stream them to Firehose.

    For each tweet returned by SEARCH_QUERY, the handler:
      * structures the tweet fields,
      * adds Comprehend sentiment and key phrases,
      * sends the result as one newline-terminated JSON record to
        DELIVERY_STREAM_NAME.

    Args:
        event: Lambda invocation event (not used).
        context: Lambda runtime context (not used).

    Returns:
        A dict with 'statusCode' and a JSON-encoded 'body'.
          * 200 on success, including when the search matches no tweets; the
            top sentiment is then None with count 0.
          * 401 when tweepy reports Unauthorized.
          * 500 for any other error.
    """
    try:
        response = client.search_recent_tweets(
            query=SEARCH_QUERY,
            max_results=MAX_RESULTS,
            tweet_fields=TWEET_FIELDS,
        )

        sentiments = []
        key_phrases = []

        # response.data is None when the search matched no tweets.
        for tweet in response.data or []:
            structured_tweet = _structure_tweet(tweet)

            # Sentiment of the tweet text.
            sentiment_response = comprehend.detect_sentiment(
                Text=tweet.text, LanguageCode=COMPREHEND_LANGUAGE
            )
            structured_tweet['sentiment'] = sentiment_response['Sentiment']
            structured_tweet['sentiment_score'] = sentiment_response['SentimentScore']
            sentiments.append(sentiment_response['Sentiment'])

            # Key phrases of the tweet text.
            key_phrases_response = comprehend.detect_key_phrases(
                Text=tweet.text, LanguageCode=COMPREHEND_LANGUAGE
            )
            tweet_key_phrases = [phrase['Text'] for phrase in key_phrases_response['KeyPhrases']]
            key_phrases.extend(tweet_key_phrases)
            structured_tweet['key_phrases'] = tweet_key_phrases

            # Firehose does not insert a delimiter between records. Newline-
            # terminating each one keeps the delivered output line-delimited JSON.
            firehose_response = firehose.put_record(
                DeliveryStreamName=DELIVERY_STREAM_NAME,
                Record={'Data': json.dumps(structured_tweet) + '\n'},
            )
            print("Firehose put_record response:", firehose_response)

        top_sentiment, top_sentiment_count, top_key_phrases = _summarize(sentiments, key_phrases)

        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Tweets processed successfully.',
                'top_sentiment': top_sentiment,
                'top_sentiment_count': top_sentiment_count,
                'top_key_phrases': top_key_phrases
            })
        }

    except tweepy.Unauthorized:
        return {
            'statusCode': 401,
            'body': json.dumps('Unauthorized access - please check your Twitter API credentials.')
        }
    except Exception as e:
        print(f'An error occurred: {str(e)}')
        return {
            'statusCode': 500,
            'body': json.dumps(f'An error occurred: {str(e)}')
        }
