<a href="https://colab.research.google.com/github/Kumarvels/GenAIProjects/blob/main/N8N_Workflow_to_Scrap_X.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title Complete Workflow: Twitter Scraping with MCP and Agno Integration
##
# What
## Overview
# We are building a workflow that automates the process of scraping Twitter (X)
# for top-performing posts based on a keyword, integrates with an MCP (Message
# Control Protocol) server for decentralized communication, leverages Agno's
# multi-agent framework for task coordination, and organizes the scraped data
# into Google Sheets for further analysis and repurposing.

###########################################################################
## Components
###########################################################################
#
# 1. Twitter Scraping: Retrieve tweets based on a keyword, capturing engagement metrics and content.
# 2. MCP Server Integration: Use MCP for message passing between different components of the system.
# 3. Agno Multi-Agent Framework: Coordinate multiple agents to handle scraping, processing, and analysis tasks.
# 4. Google Sheets Integration: Store the organized data in Google Sheets for easy access and analysis.
# 5. Data Analysis and Visualization: Analyze the scraped data to identify trends and visualize engagement metrics.
#
###########################################################################

# Visualization for What Section
import matplotlib.pyplot as plt

# Title card: an empty 10x6 canvas carrying one centred caption.
fig, ax = plt.subplots(figsize=(10, 6))
ax.text(0.5, 0.5, 'Workflow Components', ha='center', va='center', fontsize=14)
ax.set_axis_off()  # hide ticks and frame so only the caption and title remain
ax.set_title('Overview of Workflow Components')
plt.show()

# Why ?
## Efficiency
# Manually scrolling through social media for content ideas is inefficient and
# time-consuming. This workflow automates the process, reducing it to minutes.

## Decentralized Communication
# MCP allows for decentralized message passing, ensuring that different
# components of the system can communicate effectively without a central point
# of failure.

## Multi-Agent Coordination
# Agno's framework enables the coordination of multiple agents, each specialized
# in a specific task, improving the overall efficiency and reliability of the
# workflow.

## Scalability and Availability
# The workflow is designed to handle high-scale data processing and maintain
# availability through error handling, rate limiting, and parallel processing.

## Data-Driven Insights
# By analyzing engagement data, we can identify what content resonates with the
# audience, informing better content strategies and decision-making.

# Visualization for Why Section
reasons = ['Efficiency', 'Decentralized Communication', 'Multi-Agent Coordination', 'Scalability and Availability', 'Data-Driven Insights']
importance = [8, 7, 8, 9, 9]  # Hypothetical importance scores

# One distinctly coloured bar per reason, drawn through the Axes API.
bar_colors = ['blue', 'green', 'orange', 'purple', 'red']
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(reasons, importance, color=bar_colors)
ax.set_xlabel('Reasons')
ax.set_ylabel('Importance (1-10)')
ax.set_title('Importance of Workflow Features')
# Tilt the long category labels so they do not overlap each other.
plt.setp(ax.get_xticklabels(), rotation=45)
plt.show()

# How
## Environment Setup
# We will use Google Colab to set up the environment, installing necessary
# libraries like `tweepy`, `google-api-python-client`, `pandas`, `numpy`,
# `matplotlib`, `agno`, and `mcp`.

## Authentication
# Authenticate with the Twitter API using `tweepy`, and with Google using
# `google-auth` and `google-api-python-client`.

## MCP Server Integration
# Set up an MCP client to connect to the MCP server and define endpoints for
# scraping requests and data processing.

## Agno Multi-Agent Framework
# Define agents for scraping and processing tasks, create an agent team, and set
# up a workflow to coordinate these tasks.

## Twitter Scraping
# Implement a function to scrape tweets based on a keyword, capturing engagement
# metrics and content.

## Data Processing and Analysis
# Process the scraped data into a pandas DataFrame, sort it by engagement
# metrics, and visualize the results.

## Google Sheets Integration
#Write the processed data to Google Sheets using the Google Sheets API.

## Scalability and Availability
# Implement rate limiting and parallel processing to ensure the workflow can
# handle high-scale data and maintain availability.

# Visualization for How Section
steps = ['Environment Setup', 'Authentication', 'MCP Integration', 'Agno Framework', 'Twitter Scraping', 'Data Processing', 'Google Sheets', 'Scalability']
duration = [2, 1, 3, 4, 5, 3, 2, 4]  # Hypothetical durations in minutes

# Horizontal bars keep the step names readable without rotating them.
fig, ax = plt.subplots(figsize=(12, 6))
ax.barh(steps, duration, color='skyblue')
ax.set_xlabel('Duration (minutes)')
ax.set_title('Estimated Duration for Each Step')
plt.show()

# Outcomes
## Prototype Completion
# A fully functional prototype in Google Colab that demonstrates the entire
# workflow from scraping to analysis and storage.

## Insights
# Detailed insights into trending content and engagement patterns, identified
# through data analysis and visualization.

## Scalability Blueprint
# A clear blueprint for scaling the workflow to production, including
# considerations for error handling, rate limiting, and parallel processing.

## Production Readiness
# The workflow is designed with production deployment in mind, ensuring high
# availability and reliability when scaled.

## Reusable Code and Documentation
# Comprehensive code and documentation that can be easily adapted for production
# implementation, facilitating a smooth transition from prototype to production.

# Visualization for Outcomes Section
outcomes = ['Prototype Completion', 'Insights', 'Scalability Blueprint', 'Production Readiness', 'Reusable Code']
impact = [9, 8, 7, 8, 9]  # Hypothetical impact scores

# Each wedge is one outcome's share of the summed impact scores.
fig, ax = plt.subplots(figsize=(10, 6))
ax.pie(impact, labels=outcomes, autopct='%1.1f%%', startangle=140)
ax.set_title('Impact of Outcomes')
plt.show()

##########################################################################

# Step 1: Set Up the Environment
!pip install tweepy google-auth google-auth-oauthlib google-auth-httplib2 google-api-python-client pandas numpy matplotlib pyautogen pip-check flake8 isort

import tweepy
from google.colab import auth
from google.colab import userdata
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import autogen
from google.auth import default # Import default

# Step 2: Authenticate with Twitter and Google APIs
# Google goes first: the Colab sign-in has to complete before default() is
# asked for credentials, which are then used to build the Sheets v4 client.
print("Authenticating with Google...")
auth.authenticate_user()
creds, _ = default()
service = build(serviceName='sheets', version='v4', credentials=creds)
print("Google authentication successful.")

print("\nAuthenticating with Twitter API...")
# Twitter API authentication via interactive prompts (kept for demonstration).
# getpass() is used instead of input() so the secrets are not echoed into the
# cell output, where they would be saved and shared along with the notebook.
# NOTE: Colab's Secrets manager remains the recommended way to supply these.
import getpass

print("🔑 Enter your Twitter API credentials below")
# strip() drops whitespace/newlines that tend to sneak in when pasting keys.
twitter_api_key = getpass.getpass("API Key: ").strip()
twitter_api_secret = getpass.getpass("API Secret: ").strip()
twitter_access_token = getpass.getpass("Access Token: ").strip()
twitter_access_token_secret = getpass.getpass("Access Token Secret: ").strip()


if not all([twitter_api_key, twitter_api_secret, twitter_access_token, twitter_access_token_secret]):
    print("⚠️ Warning: Twitter API credentials not provided.")
    print("Please enter your credentials when prompted.")
else:
    try:
        # Build a v2 client and verify the credentials with one cheap call.
        client_v2 = tweepy.Client(
            consumer_key=twitter_api_key,
            consumer_secret=twitter_api_secret,
            access_token=twitter_access_token,
            access_token_secret=twitter_access_token_secret
        )
        client_v2.get_me()  # raises if the credentials are rejected
        print("✅ Twitter API authentication successful.")
    except tweepy.TweepyException as e:
        print(f"❌ Twitter API authentication failed: {e}")
        print("Please check your Twitter API credentials.")
    except Exception as e:
        # Top-level notebook boundary: report anything unexpected instead of
        # aborting the whole cell.
        print(f"An unexpected error occurred during Twitter authentication: {e}")


# Step 3: Set Up Autogen Agents (MCP removed)
# Autogen Agents - This is a placeholder and will be adapted in the next steps
# to integrate with the Twitter scraping functions.
# user_proxy = autogen.UserProxyAgent(name="user_proxy", code_execution_config={"last_n_messages": 2, "work_dir": "tasks"})
# assistant = autogen.AssistantAgent(name="assistant")


# Step 4: Scrape and Process Tweets
def scrape_tweets(keyword, count=100):
    """Search recent tweets matching *keyword* and return them as plain dicts.

    A fresh ``tweepy.Client`` is built from the module-level Twitter
    credentials on every call, and results are paged with
    ``tweepy.Paginator`` until ``count`` tweets have been collected.

    Args:
        keyword: Query string passed to the recent-search endpoint.
        count: Maximum number of tweets to return. Values <= 0 return an
            empty list without contacting the API.

    Returns:
        A list of dicts with the keys ``text``, ``id``, ``url``, ``likes``,
        ``retweets``, ``replies``, ``views`` and ``date``. If the API raises
        part-way through, the error is printed and the tweets gathered so far
        are returned.
    """
    if count <= 0:
        return []

    tweets = []
    # The recent-search endpoint accepts page sizes of 10-100. Asking only for
    # what is needed avoids pulling (and being billed for) a full page of 100
    # when the caller wants fewer tweets.
    page_size = max(10, min(100, count))
    try:
        client = tweepy.Client(
            consumer_key=twitter_api_key,
            consumer_secret=twitter_api_secret,
            access_token=twitter_access_token,
            access_token_secret=twitter_access_token_secret
        )
        paginator = tweepy.Paginator(
            client.search_recent_tweets,
            query=keyword,
            tweet_fields=['public_metrics', 'created_at'],
            max_results=page_size,
        )
        # flatten() yields individual tweets across pages, stopping at `count`.
        for tweet in paginator.flatten(limit=count):
            # Metrics may be absent; fall back to an empty mapping so every
            # counter defaults to 0.
            metrics = getattr(tweet, 'public_metrics', None) or {}

            tweets.append({
                'text': tweet.text,
                'id': tweet.id,
                # Author data is not requested, so the URL uses the
                # placeholder "user" path segment together with the tweet id.
                'url': f"https://twitter.com/user/status/{tweet.id}",
                'likes': metrics.get('like_count', 0),
                'retweets': metrics.get('retweet_count', 0),
                'replies': metrics.get('reply_count', 0),
                'views': metrics.get('impression_count', 0),
                'date': tweet.created_at
            })
    except tweepy.TweepyException as e:
        # Best effort: report the failure and return what was collected.
        print(f"Error: {e}")
    return tweets

def process_data(data):
    """Convert scraped tweet records into a pandas DataFrame.

    Args:
        data: List of per-tweet dicts (one dict per row). ``None`` or an
            empty list is treated as "nothing scraped".

    Returns:
        A DataFrame with one row per record, in input order. When ``data``
        is empty a notice is printed and an empty DataFrame is returned.
    """
    if not data:
        print("No data to process. Returning empty DataFrame.")
        return pd.DataFrame()

    # Ranking is left to the caller. The previous implementation sorted a
    # top-10 by likes here and then discarded it, which did nothing except
    # raise KeyError for records that lacked a 'likes' field.
    return pd.DataFrame(data)

# Step 5: Integrate with Google Sheets
SPREADSHEET_ID = 'YOUR_SPREADSHEET_ID'
RANGE_NAME = 'Sheet1!A1'

tweets_data = scrape_tweets('AI automation', count=100)
df = process_data(tweets_data)

# Check if DataFrame is empty before proceeding with Google Sheets and Visualization
if not df.empty:
    # Build a Sheets-safe copy of the data; `df` itself is left untouched for
    # the analysis below.
    #  - 'date' holds timestamp objects, which are not JSON serializable, so
    #    the request would fail before it is ever sent.
    #  - 'id' holds 64-bit tweet ids; written as numbers they would be stored
    #    as floating point in the sheet and lose their last digits.
    sheet_df = df.copy()
    for column in ('id', 'date'):
        if column in sheet_df.columns:
            sheet_df[column] = sheet_df[column].astype(str)

    values = [list(sheet_df.columns)] + sheet_df.values.tolist()
    body = {'values': values}
    try:
        result = service.spreadsheets().values().update(
            spreadsheetId=SPREADSHEET_ID, range=RANGE_NAME,
            valueInputOption='RAW', body=body).execute()
        print(f"{result.get('updatedCells')} cells updated.")
    except HttpError as error:
        print(f"An error occurred: {error}")

    # Step 6: Analyze and Visualize Data
    top_tweets = df.sort_values(by='likes', ascending=False).head(10)
    print("Top 10 tweets by likes:")
    print(top_tweets[['text', 'likes', 'retweets', 'replies']])

    # Likes and retweets share one axis; alpha keeps overlapping bars visible.
    plt.figure(figsize=(10, 6))
    plt.bar(df['date'], df['likes'], color='blue', alpha=0.6, label='Likes')
    plt.bar(df['date'], df['retweets'], color='green', alpha=0.6, label='Retweets')
    plt.xlabel('Date')
    plt.ylabel('Count')
    plt.title('Engagement Over Time')
    plt.legend()
    plt.show()
else:
    print("No data to write to Google Sheets or visualize.")


# Step 7: Ensure Scalability and Availability
import time
from concurrent.futures import ThreadPoolExecutor

def rate_limited_scrape(keyword, count=100, delay=15):
    """Scrape up to *count* tweets for *keyword* in paced batches.

    The request is split into batches of at most 100 tweets, pausing
    ``delay`` seconds after each batch (and ``delay * 2`` seconds after a
    batch that raised) so consecutive API calls are spaced out.

    Each batch re-runs the same search through ``scrape_tweets``, so
    successive batches largely overlap. Tweets are therefore de-duplicated by
    their ``id``; the result can be shorter than ``count`` when the search
    keeps returning tweets that were already collected.

    Args:
        keyword: Query string forwarded to ``scrape_tweets``.
        count: Total number of tweets wanted. Values <= 0 yield ``[]``.
        delay: Seconds to sleep after each batch.

    Returns:
        A list of unique tweet dicts, in the order first seen.
    """
    collected = []
    seen_ids = set()
    for start in range(0, count, 100):
        # Only ask for what is still missing; the last batch may be partial.
        batch_size = min(100, count - start)
        try:
            batch = scrape_tweets(keyword, count=batch_size)
        except tweepy.TweepyException as e:
            print(f"Error: {e}")
            time.sleep(delay * 2)  # back off longer after a failure
            continue

        # Drop tweets already gathered by an earlier, overlapping batch.
        for tweet in batch:
            if tweet['id'] not in seen_ids:
                seen_ids.add(tweet['id'])
                collected.append(tweet)
        time.sleep(delay)
    return collected

def parallel_scrape(keywords, count_per_keyword=100):
    """Scrape several keywords concurrently and merge the results.

    Each keyword is handed to ``rate_limited_scrape`` on a worker thread.
    The per-keyword lists are concatenated in the order of ``keywords``.

    Args:
        keywords: Iterable of query strings.
        count_per_keyword: Tweet count requested for every keyword.

    Returns:
        One flat list containing the tweets of all keywords.
    """
    def _scrape_one(term):
        return rate_limited_scrape(term, count_per_keyword)

    merged = []
    with ThreadPoolExecutor() as pool:
        # map() yields results in input order regardless of completion order.
        for per_keyword in pool.map(_scrape_one, keywords):
            merged.extend(per_keyword)
    return merged