# In [None]:
import os
from datetime import datetime, timedelta

import pandas as pd
import tweepy
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


# Twitter API credentials.
# Read from the environment so real secrets never have to be written into this
# file; each value falls back to "" (the original placeholder) when unset.
consumer_key = os.environ.get("TWITTER_CONSUMER_KEY", "")
consumer_secret = os.environ.get("TWITTER_CONSUMER_SECRET", "")
access_token = os.environ.get("TWITTER_ACCESS_TOKEN", "")
access_secret = os.environ.get("TWITTER_ACCESS_SECRET", "")

# Authentication and API object (shared by the task callable below):
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
api = tweepy.API(auth)

# Define screen name for Netflix:
screen_name = "netflix"

# Define search query for the movie "Beef" on Netflix:
search_query = "Beef"

def get_tweets():
    """Fetch recent tweets matching ``search_query`` and write them to S3 as CSV.

    Uses the module-level ``api`` client and ``search_query``. For every tweet
    returned, the author's screen name, full text, favorite count, retweet
    count and creation time are collected into a DataFrame, which is then
    written to ``s3://abhishek_airflow_bucket/refined_tweets.csv``.
    """
    max_tweets = 500  # total number of tweets wanted across all result pages
    # NOTE(review): the standard search endpoint caps ``count`` at 100 per
    # request, so a single call can never return 500 -- confirm for the API
    # tier in use.
    page_size = 100
    columns = ['user', 'text', 'favorite_count', 'retweet_count', 'created_at']

    # tweepy 4.x renamed ``API.search`` to ``API.search_tweets``; use whichever
    # the installed version provides.
    search = getattr(api, "search_tweets", None) or api.search

    # tweet_mode="extended" makes the API return the untruncated ``full_text``
    # field; without it only ``text`` is present and reading ``full_text``
    # fails. Cursor follows the pagination until ``max_tweets`` items were
    # yielded or the results run out.
    tweets = tweepy.Cursor(
        search,
        q=search_query,
        count=page_size,
        tweet_mode="extended",
    ).items(max_tweets)

    # To get structured output in csv format:
    rows = [
        {
            'user': tweet.user.screen_name,
            'text': tweet.full_text,
            'favorite_count': tweet.favorite_count,
            'retweet_count': tweet.retweet_count,
            'created_at': tweet.created_at,
        }
        for tweet in tweets
    ]

    # Passing ``columns`` keeps the CSV header stable even when no tweet matched.
    df = pd.DataFrame(rows, columns=columns)
    df.to_csv('s3://abhishek_airflow_bucket/refined_tweets.csv') # sending the file directly to Amazon S3 bucket

# Airflow DAG configuration.
# default_args are handed to the tasks (operators) of the DAG, so only
# task-level settings belong here.
default_args = {
    'owner': 'Abhishek',
    'depends_on_past': False,
    'start_date': datetime(2023, 4, 10),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    'get_tweets',
    default_args=default_args,
    schedule_interval=timedelta(days=1),  # set DAG schedule to run every day
    catchup=False,
    max_active_runs=1,
    # dagrun_timeout is an argument of DAG itself, not of the operators, so it
    # has to be passed here rather than through default_args to take effect.
    dagrun_timeout=timedelta(days=7),  # set DAG run timeout to 7 days
)

# The DAG's only task: a PythonOperator that invokes get_tweets() on each run.
run_etl = PythonOperator(
    dag=dag,
    python_callable=get_tweets,
    task_id='get_tweet_task',
)

# There is just one task, so no upstream/downstream ordering is declared;
# the bare reference is kept as-is.
run_etl
