In [18]:
from datetime import datetime, timedelta, timezone





'2022-07-25 22:25:30+0000'

In [None]:
import boto3
from dateutil import parser
from datetime import datetime, timedelta, timezone
import psycopg2
from psycopg2.extras import RealDictCursor, execute_values
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
from envars import envars
from tqdm import tqdm
import json

class ParseTweet():
    """Parse one tweet dictionary produced by the 'snscrape' library.

        - Renames the dictionary keys so they are compatible with the database tables.
        - Splits the nested user dictionaries (author, replied-to user, mentioned
          users) out of the tweet so they can be inserted into the 'users' table.

    NOTE: the tweet dictionary and the user dictionaries nested inside it are
    modified in place.
    """

    # (database column, scrape key) pairs. Each key is popped and re-inserted in
    # this order, so every parsed record ends up with the same key order.
    _TWEET_COLUMNS = (
        ("url", "url"),
        ("tweet_date", "date"),
        ("tweet_content", "content"),
        ("tweet_rendered_content", "renderedContent"),
        ("id", "id"),
        ("user_id", "user"),
        ("reply_count", "replyCount"),
        ("retweet_count", "retweetCount"),
        ("like_count", "likeCount"),
        ("quote_count", "quoteCount"),
        ("conversation_id", "conversationId"),
        ("lang", "lang"),
        ("tweet_source", "source"),
        ("source_url", "sourceUrl"),
        ("source_label", "sourceLabel"),
        ("outlinks", "outlinks"),
        ("tco_outlinks", "tcooutlinks"),
        ("media", "media"),
        ("retweeted_tweet", "retweetedTweet"),
        ("quoted_tweet", "quotedTweet"),
        ("in_reply_to_tweet_id", "inReplyToTweetId"),
        ("in_reply_to_user", "inReplyToUser"),
        ("mentioned_users", "mentionedUsers"),
        ("coordinates", "coordinates"),
        ("place", "place"),
        ("hashtags", "hashtags"),
        ("cashtags", "cashtags"),
    )

    _USER_COLUMNS = (
        ("username", "username"),
        ("id", "id"),
        ("display_name", "displayname"),
        ("description", "description"),
        ("raw_description", "rawDescription"),
        ("description_urls", "descriptionUrls"),
        ("verified", "verified"),
        ("created", "created"),
        ("followers_count", "followersCount"),
        ("friends_count", "friendsCount"),
        ("statuses_count", "statusesCount"),
        ("favourites_count", "favouritesCount"),
        ("listed_count", "listedCount"),
        ("media_count", "mediaCount"),
        ("location", "location"),
        ("protected", "protected"),
        ("link_url", "linkUrl"),
        ("link_t_courl", "linkTcourl"),
        ("profile_image_url", "profileImageUrl"),
        ("profile_banner_url", "profileBannerUrl"),
        ("label", "label"),
        ("user_type", "user_type"),
        ("snapshot_date", "snapshot_date"),
    )

    def __init__(self, tweet, query_id, snapshot_date):
        self.tweet_dict = tweet
        self.query_id = query_id
        self.snapshot_date = snapshot_date

    def rename_keys_for_tweet_data(self):
        """Rename keys to match 'twitterdb' 'tweets' table.

        Keys missing from the scrape are created with the value None.
        Returns the same (mutated) tweet dictionary, with 'query_id' added.
        """
        for column, scrape_key in self._TWEET_COLUMNS:
            self.tweet_dict[column] = self.tweet_dict.pop(scrape_key, None)
        self.tweet_dict["query_id"] = self.query_id
        return self.tweet_dict

    def rename_keys_for_user_data(self, users_list):
        """Rename keys to match 'twitterdb' 'users' table.

        Despite its name, 'users_list' is a single user dictionary. Keys missing
        from the scrape are created with the value None (same policy as the
        tweet keys) instead of raising KeyError.
        Returns the same (mutated) user dictionary.
        """
        for column, scrape_key in self._USER_COLUMNS:
            users_list[column] = users_list.pop(scrape_key, None)
        return users_list

    def split_tweet_data_and_user_data(self) -> "tuple[dict, list]":
        """Separate the tweet row from the user rows it embeds.

        Returns:
            (tweet_dict, users_list): the tweet with user references replaced by
            user ids, and a list of user dictionaries tagged with 'user_type'
            ("author", "reply_to" or "mentioned"), 'tweet_id', 'query_id' and
            'snapshot_date'.
        """
        tweet_dict = self.rename_keys_for_tweet_data()
        users_list = []

        # Author and replied-to user: keep only the id on the tweet row.
        for column, user_type in (("user_id", "author"), ("in_reply_to_user", "reply_to")):
            user = tweet_dict.pop(column)
            if user:
                user["user_type"] = user_type
                users_list.append(user)
                tweet_dict[column] = user["id"]
            else:
                tweet_dict[column] = None

        mentioned_list = tweet_dict.pop("mentioned_users")
        if mentioned_list:
            tweet_dict["mentioned_users"] = [mentioned["id"] for mentioned in mentioned_list]
            for mentioned in mentioned_list:
                mentioned["user_type"] = "mentioned"
                users_list.append(mentioned)
        else:
            tweet_dict["mentioned_users"] = None

        # Add these keys/values to all items in 'users_list'
        for user in users_list:
            user["tweet_id"] = tweet_dict["id"]
            user["query_id"] = self.query_id
            user["snapshot_date"] = self.snapshot_date
            self.rename_keys_for_user_data(user)
        return tweet_dict, users_list


def list_objects_in_bucket(bucket_name) -> list:
    """List every object in an S3 bucket.

    Args:
        bucket_name: Name of the bucket to list.

    Returns:
        A list with one metadata dictionary per object (the entries of the
        "Contents" field of the list_objects_v2 response). Empty if the bucket
        holds no objects.
    """
    s3_client = boto3.client("s3")
    # A single list_objects_v2 call returns a limited page of keys, so walk all pages.
    paginator = s3_client.get_paginator("list_objects_v2")
    bucket_files = []
    for page in paginator.paginate(Bucket=bucket_name):
        # "Contents" is absent from the response when there is nothing to list.
        bucket_files.extend(page.get("Contents", []))
    return bucket_files


def read_from_s3(bucket_name:str, key_name:str) -> dict:
    """Download the object at bucket/key and return its content parsed as JSON.

    The body is decoded as UTF-8 before parsing.
    """
    body = boto3.resource('s3').Object(bucket_name, key_name).get()["Body"]
    return json.loads(body.read().decode("utf-8"))


def batch_insert_into_database(table_name: str, record_list: [dict]) -> None:
    """BULK INSERT INTO database (1000 rows at a time).

    Args:
        table_name: Target table. It is interpolated into the SQL text, so it
            must come from trusted code, never from user input.
        record_list: Rows to insert, as dictionaries. The keys of the first
            record define the column list; every record must provide them.

    Returns:
        None. An empty 'record_list' is a no-op.

    Raises:
        KeyError: If a record lacks one of the columns of the first record.
    """
    if not record_list:
        # Nothing to insert; avoids indexing record_list[0] on an empty list.
        return
    columns = list(record_list[0].keys())
    col_names = ", ".join(columns)
    # Read values by column name so a record with a different key order cannot
    # silently put values into the wrong columns.
    insert_values = [tuple(record[col] for col in columns) for record in record_list]
    conn = psycopg2.connect(
        dbname="twitterdb",
        user=envars.get("user"),
        password=envars.get("password"),
        host=envars.get("host"),
        port=envars.get("port"))
    try:
        # 'with conn' commits (or rolls back on error) but does not close the connection.
        with conn:
            with conn.cursor(cursor_factory=RealDictCursor) as curs:
                sql = f"INSERT INTO {table_name} ({col_names}) VALUES %s"
                psycopg2.extras.execute_values(curs, sql, insert_values, page_size=1000)
    finally:
        conn.close()


bucket_name = "twitter-scrape-results"
bucket_files = list_objects_in_bucket(bucket_name)

# Load every scrape-result file in the bucket into the 'tweets' and 'users' tables.
for file in tqdm(bucket_files):
    # The query id is read from the third "_"-separated field of the key.
    # Objects whose key does not follow that pattern (e.g. ad-hoc test uploads)
    # are skipped instead of aborting the whole load.
    try:
        query_id = int(file["Key"].split("_")[2])
    except (IndexError, ValueError):
        print(f"Skipping '{file['Key']}': no query id found in the key name.")
        continue
    snapshot_date = file["LastModified"]
    dict_obj = read_from_s3(bucket_name, file["Key"])
    tweets_updates = []
    users_updates = []
    for tweet in dict_obj:
        tweets_dict, users_dict = ParseTweet(tweet, query_id, snapshot_date).split_tweet_data_and_user_data()
        tweets_updates.append(tweets_dict)
        users_updates.extend(users_dict)
    # Only insert non-empty batches: a file without tweets has nothing to store.
    if tweets_updates:
        batch_insert_into_database("tweets", tweets_updates)
    if users_updates:
        batch_insert_into_database("users", users_updates)



In [None]:
from envars import envars
import psycopg2
from psycopg2.extras import RealDictCursor, execute_values
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
from datetime import datetime, timedelta, timezone
import pandas as pd


def select_from_database(sql):
    """Run a query against 'twitterdb' and return all rows as plain dictionaries.

    Args:
        sql: Complete SQL statement to execute (no parameters are bound).

    Returns:
        A list with one dict (column name -> value) per result row.
    """
    conn = psycopg2.connect(
        dbname="twitterdb",
        user=envars.get("user"),
        password=envars.get("password"),
        host=envars.get("host"),
        port=envars.get("port"))
    try:
        # 'with conn' only ends the transaction; the connection is closed below.
        with conn:
            with conn.cursor(cursor_factory=RealDictCursor) as curs:
                curs.execute(sql)
                return [dict(row) for row in curs.fetchall()]
    finally:
        conn.close()


sql = "SELECT * FROM queries WHERE active is true"
query_tasks = select_from_database(sql)

sql = """
SELECT query_id, CAST(tweet_date as DATE), COUNT(*)
FROM tweets
GROUP BY query_id, CAST(tweet_date as DATE)
ORDER BY tweet_date ASC
"""
days_scraped = select_from_database(sql)

# Group the already-scraped days by query id once, instead of rescanning the
# whole result for every query task.
scraped_days_by_query = {}
for row in days_scraped:
    day = row["tweet_date"].strftime("%Y-%m-%d")
    scraped_days_by_query.setdefault(row["query_id"], set()).add(day)

# Days newer than this cut-off (now minus two days, UTC) are never scheduled.
# NOTE(review): the earlier comment said "> 1 day old" while the code uses 2 days — confirm.
max_scrape_date = datetime.now(timezone.utc) - timedelta(days=2)
for task in query_tasks:
    db_scrape_data = scraped_days_by_query.get(task["id"], set())

    scrape_from = task["scrape_from_date"]
    scrape_to = min(task["scrape_to_date"], max_scrape_date)
    date_range_dt = pd.date_range(start=scrape_from, end=scrape_to)
    date_range_str = list(date_range_dt.strftime("%Y-%m-%d"))

    # Days inside the task's range that have no tweets stored yet.
    task["to_scrape"] = [date for date in date_range_str if date not in db_scrape_data]
    del task["scrape_from_date"]
    del task["scrape_to_date"]

len(query_tasks)


In [None]:
import pandas as pd
from datetime import datetime, timedelta

def query_factory(query_template, date_list):
    """Expand a query template into one query per day.

    For every date in 'date_list' the placeholders $STARTDATE and $ENDDATE are
    replaced with that day and the following day (both as YYYY-MM-DD).

    Returns:
        A list of {"start_date": ..., "query": ...} dictionaries, in input order.
    """
    day_format = "%Y-%m-%d"
    one_day = timedelta(days=1)

    def build(day):
        since = day.strftime(day_format)
        until = (day + one_day).strftime(day_format)
        filled = query_template.replace("$STARTDATE", since).replace("$ENDDATE", until)
        return {"start_date": since, "query": filled}

    return [build(day) for day in date_list]

# Demo: build one query per day for 2022-01-01 through 2022-02-01.
query_template = "Example Query start=$STARTDATE end=$ENDDATE"
# query_factory calls .strftime() on each item, so the dates are passed as
# pandas timestamps rather than pre-formatted strings.
date_list = pd.date_range(start="2022-01-01", end="2022-02-01")
query_list = query_factory(query_template, date_list)
query_list

In [None]:
import requests


# Demo: POST a JSON payload and show the request body echoed back by the server.
url = "https://httpbin.org/post"
data = {
    "name": "aaron"
}
# Without a timeout a stalled connection would block the notebook indefinitely.
r = requests.post(url, json=data, timeout=10)
# Fail loudly on an HTTP error instead of trying to parse an error page.
r.raise_for_status()
r.json()["data"]

# SQL Queries
### GET Latest Record for each Query
```
SELECT c.*, p1.*
FROM queries c
JOIN tweets p1 ON (c.id = p1.queryid)
LEFT OUTER JOIN tweets p2 ON (c.id = p2.queryid AND 
    (p1.tweetdate < p2.tweetdate OR (p1.tweetdate = p2.tweetdate AND p1.queryid < p2.queryid)))
WHERE p2.queryid IS NULL;
```

# Database Connection with psycopg2

## SELECT

In [None]:
from envars import envars
import psycopg2
from psycopg2.extras import RealDictCursor, execute_values
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)



def select_from_database(db_name, sql):
    """Run a query against the given database and return all rows as dictionaries.

    Args:
        db_name: Name of the database to connect to.
        sql: Complete SQL statement to execute (no parameters are bound).

    Returns:
        A list with one dict (column name -> value) per result row.
    """
    conn = psycopg2.connect(
        dbname=db_name,  # previously hard-coded, which silently ignored the argument
        user=envars.get("user"),
        password=envars.get("password"),
        host=envars.get("host"),
        port=envars.get("port"))
    try:
        # 'with conn' only ends the transaction; the connection is closed below.
        with conn:
            with conn.cursor(cursor_factory=RealDictCursor) as curs:
                curs.execute(sql)
                return [dict(row) for row in curs.fetchall()]
    finally:
        conn.close()


db_name = "twitterdb"
# Rows of 'tweets' are numbered within each calendar day, newest tweet_date
# first; only row number 1 of each day is kept, returning its query_id and date.
# NOTE(review): the partition is by day only, not by query_id, so at most one
# row per day is returned across all queries — confirm that this is intended.
sql = """
SELECT
  query_id,
  CAST(tweet_date AS date)
FROM (SELECT
  *,
  ROW_NUMBER() OVER (PARTITION BY CAST(tweet_date AS date)
  ORDER BY tweet_date DESC) AS SN
FROM tweets) A
WHERE sn = 1
"""

response = select_from_database(db_name, sql)

# Display the result in the notebook.
response

# Research Terms
```
"drag queen" until:$ENDDATE since:$STARTDATE
- [] drag (drag until:2022-06-02 since:2022-06-01)
- [] drag show*
- [] drag shows
- [] drag queen
- [] drag story hour
- [] drag queen story hour
- [] drag queen story time

(groom OR gr00m) until:$ENDDATE since:$STARTDATE
- [] Groom*
- [] gr00m*

(ped0 OR pedo) until:$ENDDATE since:$STARTDATE
- [] ped0
- [] pedo

(pedophile OR pedophilia) until:$ENDDATE since:$STARTDATE
- [] pedophile
- [] pedophilia

```
### snscrape docs
- https://github.com/JustAnotherArchivist/snscrape
- https://github.com/JustAnotherArchivist/snscrape/blob/master/snscrape/modules/twitter.py
- Twitter Advanced Search: https://twitter.com/search-advanced
- https://github.com/igorbrigadir/twitter-advanced-search

### Development Version
- pip3 install git+https://github.com/JustAnotherArchivist/snscrape.git

# Twitter Scraper (Local Testing)

In [None]:
from airflow import DAG
# from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python_operator import PythonOperator

from datetime import datetime, timedelta, timezone
from dateutil import parser
# import copy
import json
import time
import requests
from requests.exceptions import HTTPError
import pandas as pd
import psycopg2
from psycopg2.extras import RealDictCursor, execute_values
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)


# Arguments handed to the DAG below as its 'default_args'.
default_args = {
    "owner": "Aaron Guzman",
    # Airflow spells this argument "depends_on_past"; the previous key
    # "depend_on_past" does not match any operator argument.
    "depends_on_past": False,
    "start_date": datetime(2020, 1, 1),
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
}


class ParseTweet():
    """Parse one tweet dictionary produced by the 'snscrape' library.

        - Renames the dictionary keys so they are compatible with the database tables.
        - Splits the nested user dictionaries (owner, replied-to user, mentioned
          users) out of the tweet so they can be inserted into the 'users' table.

    NOTE: the tweet dictionary and the user dictionaries nested inside it are
    modified in place.
    """

    # (database column, scrape key) pairs. Each key is popped and re-inserted in
    # this order, so every parsed record ends up with the same key order.
    _TWEET_COLUMNS = (
        ("url", "url"),
        ("tweet_date", "date"),
        ("tweet_content", "content"),
        ("tweet_rendered_content", "renderedContent"),
        ("id", "id"),
        ("user_id", "user"),
        ("reply_count", "replyCount"),
        ("retweet_count", "retweetCount"),
        ("like_count", "likeCount"),
        ("quote_count", "quoteCount"),
        ("conversation_id", "conversationId"),
        ("lang", "lang"),
        ("tweet_source", "source"),
        ("source_url", "sourceUrl"),
        ("source_label", "sourceLabel"),
        ("outlinks", "outlinks"),
        ("tco_outlinks", "tcooutlinks"),
        ("media", "media"),
        ("retweeted_tweet", "retweetedTweet"),
        ("quoted_tweet", "quotedTweet"),
        ("in_reply_to_tweet_id", "inReplyToTweetId"),
        ("in_reply_to_user", "inReplyToUser"),
        ("mentioned_users", "mentionedUsers"),
        ("coordinates", "coordinates"),
        ("place", "place"),
        ("hashtags", "hashtags"),
        ("cashtags", "cashtags"),
    )

    _USER_COLUMNS = (
        ("username", "username"),
        ("id", "id"),
        ("display_name", "displayname"),
        ("description", "description"),
        ("raw_description", "rawDescription"),
        ("description_urls", "descriptionUrls"),
        ("verified", "verified"),
        ("created", "created"),
        ("followers_count", "followersCount"),
        ("friends_count", "friendsCount"),
        ("statuses_count", "statusesCount"),
        ("favourites_count", "favouritesCount"),
        ("listed_count", "listedCount"),
        ("media_count", "mediaCount"),
        ("location", "location"),
        ("protected", "protected"),
        ("link_url", "linkUrl"),
        ("link_t_courl", "linkTcourl"),
        ("profile_image_url", "profileImageUrl"),
        ("profile_banner_url", "profileBannerUrl"),
        ("label", "label"),
        ("user_type", "user_type"),
        ("snapshot_date", "snapshot_date"),
    )

    def __init__(self, tweet, query_id):
        self.tweet_dict = tweet
        self.query_id = query_id

    def rename_keys_for_tweet_data(self):
        """Rename keys to match 'twitterdb' 'tweets' table.

        Keys missing from the scrape are created with the value None.
        Returns the same (mutated) tweet dictionary, with 'query_id' added.
        """
        for column, scrape_key in self._TWEET_COLUMNS:
            self.tweet_dict[column] = self.tweet_dict.pop(scrape_key, None)
        self.tweet_dict["query_id"] = self.query_id
        return self.tweet_dict

    def rename_keys_for_user_data(self, users_list):
        """Rename keys to match 'twitterdb' 'users' table.

        Despite its name, 'users_list' is a single user dictionary. Keys missing
        from the scrape are created with the value None (same policy as the
        tweet keys) instead of raising KeyError.
        Returns the same (mutated) user dictionary.
        """
        for column, scrape_key in self._USER_COLUMNS:
            users_list[column] = users_list.pop(scrape_key, None)
        return users_list

    def split_tweet_data_and_user_data(self) -> "tuple[dict, list]":
        """Separate the tweet row from the user rows it embeds.

        Returns:
            (tweet_dict, users_list): the tweet with user references replaced by
            user ids, and a list of user dictionaries tagged with 'user_type'
            ("owner", "reply_to" or "mentioned") and a 'snapshot_date' equal to
            the tweet's date.
        """
        tweet_dict = self.rename_keys_for_tweet_data()
        users_list = []

        # Owner and replied-to user: keep only the id on the tweet row.
        for column, user_type in (("user_id", "owner"), ("in_reply_to_user", "reply_to")):
            user = tweet_dict.pop(column)
            if user:
                user["user_type"] = user_type
                users_list.append(user)
                tweet_dict[column] = user["id"]
            else:
                tweet_dict[column] = None

        mentioned_list = tweet_dict.pop("mentioned_users")
        if mentioned_list:
            tweet_dict["mentioned_users"] = [mentioned["id"] for mentioned in mentioned_list]
            for mentioned in mentioned_list:
                mentioned["user_type"] = "mentioned"
                users_list.append(mentioned)
        else:
            tweet_dict["mentioned_users"] = None

        # Add these keys/values to all items in 'users_list'
        for user in users_list:
            user["snapshot_date"] = tweet_dict["tweet_date"]
            self.rename_keys_for_user_data(user)
        return tweet_dict, users_list


# HELPER FUNCTION
def select_from_database(sql) -> [dict]:
    """SELECT database and return results in dictionary format.

    Args:
        sql: Complete SQL statement to execute (no parameters are bound).

    Returns:
        A list with one dict (column name -> value) per result row.
    """
    conn = PostgresHook(postgres_conn_id="aws_twitterdb").get_conn()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as curs:
            curs.execute(sql)
            return [dict(row) for row in curs.fetchall()]
    finally:
        # Closing the cursor does not release the connection; do it explicitly.
        conn.close()

# HELPER FUNCTION
def batch_insert_into_database(table_name: str, record_list: [dict]) -> list:
    """BULK INSERT INTO database (1000 rows at a time).

    Args:
        table_name: Target table. It is interpolated into the SQL text, so it
            must come from trusted code, never from user input.
        record_list: Rows to insert, as dictionaries. The keys of the first
            record define the column list; every record must provide them.

    Returns:
        The rows produced by "RETURNING id", one per inserted record. An empty
        'record_list' inserts nothing and returns an empty list.

    Raises:
        KeyError: If a record lacks one of the columns of the first record.
    """
    if not record_list:
        # Nothing to insert; avoids indexing record_list[0] on an empty list.
        return []
    columns = list(record_list[0].keys())
    col_names = ", ".join(columns)
    # Read values by column name so a record with a different key order cannot
    # silently put values into the wrong columns.
    insert_values = [tuple(record[col] for col in columns) for record in record_list]
    conn = PostgresHook(postgres_conn_id="aws_twitterdb").get_conn()
    try:
        # 'with conn' commits (or rolls back on error) but does not close the connection.
        with conn:
            with conn.cursor() as curs:
                sql = f"INSERT INTO {table_name} ({col_names}) VALUES %s RETURNING id"
                insert_result = psycopg2.extras.execute_values(curs, sql, insert_values, page_size=1000, fetch=True)
    finally:
        conn.close()
    return insert_result


# TASK 1
def get_query_tasks_from_database(**context) -> [dict]:
    """Work out, for every active query, which days still need to be scraped.

    1. Use the query start and end date range to create the list of days the query covers
       (the end is capped at the current UTC time).
    2. Query the database for the days that already have tweets stored for that query.
    The days in list 1 that are missing from list 2 are stored under 'to_scrape'.

    The resulting task list is pushed to XCom under the key "query_tasks";
    nothing is returned. The two date fields of each task are converted to
    YYYY-MM-DD strings before pushing.
    """
    sql = "SELECT * FROM queries WHERE active is true"
    query_tasks = select_from_database(sql)
    for task in query_tasks:
        # Days that have at least 1 tweet entry in the tweets table for THIS
        # query. (The earlier join picked the earliest tweet of each day across
        # all queries and only then filtered by query id, so a day was missed
        # whenever that earliest tweet belonged to another query.)
        sql = f"""
            SELECT DISTINCT DATE(TWEET_DATE) AS TWEET_DATE
            FROM TWEETS
            WHERE QUERY_ID = {int(task["id"])}
            """
        response = select_from_database(sql)
        scrape_processed_days = {row["tweet_date"].strftime("%Y-%m-%d") for row in response}

        scrape_from = task["scrape_from_date"]
        scrape_to = task["scrape_to_date"]
        now_utc = datetime.now(timezone.utc)
        if scrape_to > now_utc:
            # Never plan days that lie in the future.
            scrape_to = now_utc
        date_list = pd.date_range(start=scrape_from, end=scrape_to).strftime("%Y-%m-%d")

        task["to_scrape"] = [day for day in date_list if day not in scrape_processed_days]

        task["scrape_from_date"] = scrape_from.strftime("%Y-%m-%d")
        task["scrape_to_date"] = scrape_to.strftime("%Y-%m-%d")
    context["ti"].xcom_push(key="query_tasks", value=query_tasks)

# TASK 2
def scrape_tweets_for_query(**context):
    """Get scrape results for each day for each query.

    The query tasks are pulled from XCom (key "query_tasks" of the task
    "get_query_tasks_from_database"). For every day in a task's 'to_scrape'
    list, the query template is filled in and POSTed to the scrape endpoint,
    which returns a list of scrape results. The results are then INSERTED INTO
    the database 'tweets' and 'users' tables.

    Days whose end date is today (UTC) or later are skipped, so that only
    complete days are scraped.

    Raises:
        requests.exceptions.HTTPError: If the endpoint answers with an error status.
    """
    query_tasks = context["ti"].xcom_pull(task_ids="get_query_tasks_from_database", key="query_tasks")
    for query in query_tasks:
        date_list = query["to_scrape"]
        if not date_list:
            print("No updates found at this time.")
            continue

        print(f"Scraping {len(date_list)} days worth of updates for query ID: {query['id']}: {date_list}")
        query_template = query["query_string"]
        query_id = query["id"]
        for query_date in date_list:
            start_date_dt = datetime.strptime(query_date, "%Y-%m-%d")
            start_date_str = start_date_dt.strftime('%Y-%m-%d')
            end_date_dt = (start_date_dt + timedelta(days=1)).date()
            end_date_str = end_date_dt.strftime('%Y-%m-%d')
            if end_date_dt >= datetime.now(timezone.utc).date():
                # Only scrape if all of results are from the previous day
                continue

            query_string = query_template.replace("$STARTDATE", start_date_str).replace("$ENDDATE", end_date_str)
            data = {"query": query_string}
            # the lambda endpoint would be converted to an environment variable in production
            r = requests.post("https://66s4jhi0ma.execute-api.us-west-2.amazonaws.com/api/query", json=data)
            time.sleep(5)
            try:
                r.raise_for_status()
            except HTTPError:
                print(f"Request failed with status code {r.status_code} while working on date {start_date_str} for query: {query_string}")
                # Print the raw body: an error response is not guaranteed to be
                # JSON, and a parse failure here would hide the HTTP error.
                print(r.text)
                time.sleep(15)
                raise

            query_response = r.json()
            print(f"Response for {query_string} on {start_date_str}contains {len(query_response)} tweets for query: {query_string}")

            tweets_table_updates = []
            users_table_updates = []
            for tweet in query_response:
                tweets_dict, users_dict = ParseTweet(tweet, query_id).split_tweet_data_and_user_data()
                tweets_table_updates.append(tweets_dict)
                users_table_updates.extend(users_dict)

            # A day without results yields empty lists; there is nothing to insert then.
            if tweets_table_updates:
                insert_result = batch_insert_into_database("tweets", tweets_table_updates)
                print(f"Added {len(insert_result)} new tweets for query: {query_string}")
            if users_table_updates:
                insert_result = batch_insert_into_database("users", users_table_updates)
                print(f"Added {len(insert_result)} new users for query: {query_string}")

# DAG "twitter_scrape": scheduled "@hourly" with catchup disabled; runs the two
# Python tasks defined above one after the other.
with DAG(
    dag_id="twitter_scrape",
    default_args=default_args,
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    # NOTE: each operator variable reuses the name of the function it wraps.
    # The right-hand side is evaluated first, so python_callable still receives
    # the function; afterwards the name refers to the operator.
    get_query_tasks_from_database = PythonOperator(
        task_id="get_query_tasks_from_database",
        python_callable=get_query_tasks_from_database,
        provide_context=True,
    )
    
    scrape_tweets_for_query = PythonOperator(
        task_id="scrape_tweets_for_query",
        python_callable=scrape_tweets_for_query,
        provide_context=True,
    )
    
    # Task order: build the task list first, then scrape.
    get_query_tasks_from_database >> scrape_tweets_for_query


# Function URL (Local Dev)

In [None]:
from snscrape.modules import twitter as sntwitter
from datetime import datetime, timedelta, timezone
import requests
import platform
import json
import time
import boto3

def lambda_handler(event, context=None):
    """Scrape tweets for the query described in the request body.

    Can be called as 'lambda_handler(event)' for local testing or as
    'lambda_handler(event, context)'.

    Args:
        event: Dict whose "body" entry is a JSON string containing
            "query_string", "bucket_name" and "key_name".
        context: Optional second argument. When omitted (local testing) the
            scraped tweets are returned directly; when provided they are
            uploaded to S3 and a status response is returned instead.

    Returns:
        - {"statusCode": 500, ...} if the event has no body,
        - {"statusCode": 422, ...} naming the first missing field,
        - the list of scraped tweet dictionaries if 'context' is None,
        - {"statusCode": 200, ...} after the upload otherwise.
    """
    limit = None  # set to an int to cap the number of scraped tweets
    body = event.get("body")
    if not body:
        return {'statusCode': 500, "body": "Request failed, see logs for details."}

    data = json.loads(body)
    # Report the first required field that is missing from the request.
    for field in ("query_string", "bucket_name", "key_name"):
        if data.get(field) is None:
            error_type = f"Missing '{field}'"
            print(error_type)
            return {"statusCode": 422, 'body': error_type}
    query_string = data["query_string"]
    bucket_name = data["bucket_name"]
    key_name = data["key_name"]

    twitter_dicts = []
    for tweet in sntwitter.TwitterSearchScraper(query_string).get_items():
        twitter_dicts.append(todict(tweet))
        if limit and len(twitter_dicts) == limit:
            print(f"Limitting to {limit} results.")
            break
    print(f"Found {len(twitter_dicts)} items for query: {query_string}")

    if context is None:
        # Local call: hand the raw results back to the caller.
        return twitter_dicts

    write_to_s3(bucket_name, key_name, twitter_dicts)
    return {"statusCode": 200, "body": f"Sucessfully processed {len(twitter_dicts)} items for query: '{query_string}'. File can be found in S3."}


def write_to_s3(bucket_name:str, key_name:str, upload_dict:dict) -> dict:
    """Serialize 'upload_dict' to UTF-8 encoded JSON and store it at bucket/key.

    Returns whatever the S3 object's put() call returns.
    """
    payload = json.dumps(upload_dict).encode('utf-8')
    target = boto3.resource('s3').Object(bucket_name, key_name)
    return target.put(Body=payload)


def todict(obj, classkey=None) -> dict:
    """Recursively convert object to dictionary.

    Conversion rules, checked in this order:
        - dict: values are converted recursively.
        - object with an '_ast' method: the result of '_ast()' is converted.
        - any other iterable except str: converted to a list.
        - object with a '__dict__': converted to a dict of its attributes,
          skipping callables and names starting with '_'.
        - datetime: formatted as "%Y-%m-%d %H:%M:%S%z".
        - anything else is returned unchanged.

    Args:
        obj: Value to convert.
        classkey: If given, every converted object gets this extra key holding
            its class name.

    Returns:
        The converted value; only a dict when 'obj' is a dict or an object.
    """
    if isinstance(obj, dict):
        return {key: todict(value, classkey) for key, value in obj.items()}
    if hasattr(obj, "_ast"):
        # Pass 'classkey' on, so nested objects are tagged here too.
        return todict(obj._ast(), classkey)
    if hasattr(obj, "__iter__") and not isinstance(obj, str):
        return [todict(item, classkey) for item in obj]
    if hasattr(obj, "__dict__"):
        data = {
            key: todict(value, classkey)
            for key, value in obj.__dict__.items()
            if not callable(value) and not key.startswith('_')
        }
        if classkey is not None:
            data[classkey] = obj.__class__.__name__
        return data
    if isinstance(obj, datetime):
        return obj.strftime("%Y-%m-%d %H:%M:%S%z")
    return obj


# Local test run of lambda_handler with a one-day search query.
query_string = '"drag queen" until:2022-03-18 since:2022-03-17'
bucket_name = "twitter-scrape-results"
key_name = "test1.json"

# This represents the POST request data that gets sent to Lambda function
data = {
    "query_string": query_string,
    "bucket_name": bucket_name,
    "key_name": key_name
}

# lambda_handler reads the request as a JSON string under "body".
event = {"body": json.dumps(data)} # Only used for local testing
resp = lambda_handler(event)
# Called with the event only, the handler returns the scraped tweets as a list;
# show how many were found.
len(resp)

In [None]:
# Inspect the first scraped tweet of the previous cell's result.
resp[0]

In [None]:
import pandas as pd


# Export the scraped tweets (one row per tweet dictionary) to an Excel file
# in the current working directory.
df = pd.DataFrame(resp)
df.to_excel("df.xlsx")

In [None]:
import boto3


def write_to_s3(bucket_name:str, key_name:str, upload_dict:dict) -> dict:
    """Upload a JSON-serializable Python object to S3.

    The object is dumped to JSON, encoded as UTF-8 and stored under
    'key_name' in 'bucket_name'.

    Returns:
        The value returned by the S3 object's put() call.
    """
    encoded = json.dumps(upload_dict).encode('utf-8')
    s3_resource = boto3.resource('s3')
    destination = s3_resource.Object(bucket_name, key_name)
    put_response = destination.put(Body=encoded)
    return put_response


def read_from_s3(bucket_name:str, key_name:str) -> dict:
    """Fetch the object stored at bucket/key and parse its UTF-8 body as JSON."""
    s3_object = boto3.resource('s3').Object(bucket_name, key_name)
    raw_bytes = s3_object.get()["Body"].read()
    return json.loads(raw_bytes.decode("utf-8"))


# Round-trip test: upload the scraped tweets to S3 and read them back.
bucket_name = "twitter-scrape-results"
key_name = "upload_test2.json"

# 'resp' holds the tweets returned by the local lambda_handler run; the name
# 'twitter_dicts' used here before only exists inside lambda_handler.
upload_dict = resp
response = write_to_s3(bucket_name, key_name, upload_dict)


download_dict1 = read_from_s3(bucket_name, key_name)
download_dict1

In [None]:
import boto3


def write_to_s3(bucket_name:str, key_name:str, upload_dict:dict) -> dict:
    """Store 'upload_dict' as a UTF-8 encoded JSON document in S3.

    Args:
        bucket_name: Destination bucket.
        key_name: Destination key inside the bucket.
        upload_dict: JSON-serializable object to upload.

    Returns:
        The value returned by the S3 object's put() call.
    """
    json_bytes = json.dumps(upload_dict).encode('utf-8')
    return boto3.resource('s3').Object(bucket_name, key_name).put(Body=json_bytes)

# Upload test: write 'upload_dict' to the results bucket under a test key.
# NOTE: 'upload_dict' is not assigned in this cell; it must already exist in
# the notebook session.
bucket_name = "twitter-scrape-results"
key_name = "test_json_file.json"

response = write_to_s3(bucket_name, key_name, upload_dict)