Commit

Liftingthedata committed Jun 28, 2023
1 parent bb5e5d9 commit 1e322c6
Showing 8 changed files with 540 additions and 314 deletions.
2 changes: 1 addition & 1 deletion airflow/dags/scrape_dag.py
@@ -84,7 +84,7 @@
"gitsync": True,
"volumes": [COMMON_VOLUME_CONFIG],
},
envs={"start_date": "{{ ds }}", "local_path": LOCAL_PATH, "num_tweets": 100},
envs={"start_date": "{{ ds }}", "local_path": LOCAL_PATH, "num_tweets": 10000},
)

# backfill_first = LatestOnlyOperator(task_id="ensure_backfill_complete")
8 changes: 3 additions & 5 deletions airflow/dags/scripts/gcp_script.sh
@@ -1,10 +1,8 @@
#!/bin/bash
# The script performs the following tasks:
# 1. Copies Parquet files for Twitter, VGChartz, and Metacritic to their respective
# directories in GCS
# 2. Loads data from Twitter Parquet files into BigQuery
# 3. Loads data from VGChartz and Metacritic Parquet files into their respective BigQuery
# datasets.
# 1. Copies Parquet files for VGChartz and Metacritic to their respective directories in GCS
# 2. Loads data from VGChartz and Metacritic Parquet files into their respective BigQuery datasets.


LOCAL_DIR=$LOCAL_DIR
echo "$LOCAL_DIR - $DATA_BUCKET"
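The rest of gcp_script.sh is collapsed in this view. Going by the header comments above and the Twitter variant added below, the copy-and-load steps presumably follow this rough shape (a sketch only; the bucket prefixes, dataset variables, and --replace behaviour are assumptions, not taken from this commit):

# Sketch, not the actual file contents: copy VGChartz/Metacritic Parquet files to GCS,
# then load each one into its BigQuery dataset, replacing the previous version.
gsutil -m cp $LOCAL_DIR/vgchartz*.parquet gs://${DATA_BUCKET}/vgchartz/
gsutil -m cp $LOCAL_DIR/metacritic*.parquet gs://${DATA_BUCKET}/metacritic/

for file in $(find $LOCAL_DIR -type f -name 'vgchartz*.parquet'); do
table=$(basename $file .parquet)
bq load --replace --autodetect --source_format=PARQUET $VGCHARTZ_DATASET.$table $file
done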
35 changes: 35 additions & 0 deletions airflow/dags/scripts/twitter_gcp_script.sh
@@ -0,0 +1,35 @@
#!/bin/bash
# The script performs the following tasks:
# 1. Copies the Twitter Parquet data files to GCS.
# 2. Loads the data from the Twitter Parquet files into BigQuery.


LOCAL_DIR=$LOCAL_DIR
echo "$LOCAL_DIR - $DATA_BUCKET"
echo "uploading files to GCP bucket..."
gsutil -m cp $LOCAL_DIR/tweets-*.parquet gs://${DATA_BUCKET}/twitter/



echo "Loading twitter data"
for file in $(find $LOCAL_DIR -type f -name 'tweets-*.parquet'); do
TWITTER_DATASET=${TWITTER_DATASET}
# Extract the table name from the filename
table=$(basename $file .parquet)
# Check if the table exists in BigQuery
exists=$(bq query --use_legacy_sql=false \
--format=json \
--max_rows=1 \
"SELECT COUNT(*) as table_exists \
FROM \`$TWITTER_DATASET.INFORMATION_SCHEMA.TABLES\` \
WHERE table_name = '$table' \
AND table_type IN ('TABLE', 'BASE TABLE')" | sed -n 's/.*"table_exists":"\([^"]*\)".*/\1/p')
echo "$exists"
if [ "$exists" -eq 0 ]; then
# Table does not exist yet; create it by loading the Parquet file
bq load --autodetect --source_format=PARQUET $TWITTER_DATASET.$table $file >/dev/null 2>&1
else
echo "Table $table already exists, skipping"
fi
done
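For context on the existence check above: with --format=json the bq CLI prints the result row as JSON, with the count rendered as a string, which is what the sed expression pulls out. Roughly (illustrative output, not captured from a real run):

$ bq query --use_legacy_sql=false --format=json --max_rows=1 \
    "SELECT COUNT(*) as table_exists FROM \`$TWITTER_DATASET.INFORMATION_SCHEMA.TABLES\` WHERE table_name = 'tweets-2023-06-01'"
[{"table_exists":"0"}]
# sed -n 's/.*"table_exists":"\([^"]*\)".*/\1/p'  extracts -> 0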

108 changes: 108 additions & 0 deletions airflow/dags/tweet_scrape_dag.py
@@ -0,0 +1,108 @@
"""
This DAG utilizes the KubernetesJobOperator to execute scripts as Kubernetes jobs. The
primary purpose of these jobs is to perform scraping tasks.
The DAG follows the sequence:
twitter_task >> backfill_first >> metacritic_tg >> vgchartz_tg >> gcp_task
Task 'twitter_task': This task involves scraping tweets from Twitter for the previous
month and performing sentiment analysis on them.
Task 'backfill_first': This task ensures that the Twitter data is backfilled before
scraping other sites that do not require backfilling.
Task group 'metacritic_tg': This task group consists of multiple tasks that scrape data
from Metacritic. It scrapes the data for each game as well as the user and critic reviews.
Task group 'vgchartz_tg': This task group consists of 2 tasks that scrape data
from Vgchartz.
Task 'gcp_task': This final task saves the scraped data to a Google Cloud Storage (GCS)
bucket and subsequently loads it into a BigQuery table.
The DAG is scheduled to run on a cron schedule, specifically on the first day of each
month. The Twitter data is appended during each run, while the other data is replaced with
the latest version.
"""
# pylint: disable=pointless-statement
# pylint: disable=wrong-import-order

import os
import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.latest_only import LatestOnlyOperator
from airflow_kubernetes_job_operator.kubernetes_job_operator import (
KubernetesJobOperator,
)

sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

default_args = {
"owner": "airflow",
"start_date": datetime(2022, 12, 1),
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(seconds=60),
"concurrency": 0,
# "max_active_runs": 1,
"in_cluster": True,
"random_name_postfix_length": 3,
"name_prefix": "",
# "max_active_tasks_per_dag": 4,
}


today = datetime.today().strftime("%Y-%m-%d")
POD_TEMPLATE = os.path.join(os.path.dirname(__file__), "templates", "pod_template.yaml")
BASE = "/git/repo/scrapers"
GOOGLE_CLOUD_PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
COMMON_VOLUME_CONFIG = {
"name": "persistent-volume",
"type": "persistentVolumeClaim",
"reference": "data-pv-claim",
"mountPath": "/etc/scraped_data/",
}
LOCAL_PATH = "/etc/scraped_data/"

with DAG(
dag_id="twitter_scraper",
schedule_interval="0 0 1 * *",
default_args=default_args,
catchup=True,
tags=["scraping", "twitter"],
) as dag:
twitter_task = KubernetesJobOperator(
task_id="scrape-tweets",
body_filepath=POD_TEMPLATE,
command=["python", f"{BASE}/twitter/sentiment_analysis.py"],
jinja_job_args={
"image": f"eu.gcr.io/{GOOGLE_CLOUD_PROJECT}/scraper:latest",
"name": "scrape-tweets",
"gitsync": True,
"volumes": [COMMON_VOLUME_CONFIG],
},
envs={"start_date": "{{ ds }}", "local_path": LOCAL_PATH, "num_tweets": 10000},
)

backfill_first = LatestOnlyOperator(task_id="ensure_backfill_complete")

gcp_task = KubernetesJobOperator(
task_id="load_to_gcp",
body_filepath=POD_TEMPLATE,
command=["/bin/bash", "/git/repo/airflow/dags/scripts/twitter_gcp_script.sh"],
jinja_job_args={
"image": "google/cloud-sdk:alpine",
"name": "ingest-and-load-to-bq",
"gitsync": True,
"volumes": [COMMON_VOLUME_CONFIG],
},
envs={
"LOCAL_DIR": LOCAL_PATH,
"TWITTER_DATASET": os.getenv("TWITTER_DATASET"),
"DATA_BUCKET": os.getenv("DATA_BUCKET"),
"PROJECT": GOOGLE_CLOUD_PROJECT,
},
)
twitter_task >> backfill_first >> gcp_task
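Because catchup=True and start_date is 2022-12-01, deploying this DAG schedules one run per month since then, with only the newest run passing the LatestOnlyOperator gate into the GCP load. Should older months ever need to be re-run by hand, a backfill along these lines would work (illustrative command; dates are examples):

# Illustrative: re-run the monthly scrape for Dec 2022 through May 2023.
airflow dags backfill twitter_scraper --start-date 2022-12-01 --end-date 2023-06-01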
159 changes: 42 additions & 117 deletions scrapers/twitter/sentiment_analysis.py
@@ -236,90 +236,55 @@ def scrape_tweets(
return pd.DataFrame(tweets_list)


def scrape_tweets_distributed(
def main(
hashtags: list[str],
since_date: str,
until_date: str,
lang: str,
exclude_keywords: list[str],
num_tweets: int,
hashtag_operator: str = "OR",
) -> pd.DataFrame:
dates = pd.date_range(start=since_date, end=until_date)
tweets_per_day = num_tweets // len(dates)
remaining_tweets = num_tweets % len(dates)
"""
main function that utilizes scrape_tweets, clean_tweets and get_sentiment_scores
to get a dataframe of tweets with desired data.
"""
tweets_df = scrape_tweets(
hashtags, since_date, until_date, lang, exclude_keywords, num_tweets
)

tweets_list = []
logger.info(f"Processing tweets from {since_date} until {until_date}.")

for date in dates:
date_str = date.strftime("%Y-%m-%d")
query = (
f" {hashtag_operator} ".join(hashtags)
+ f" lang:{lang}"
+ "".join([f" -{kw}" for kw in exclude_keywords])
+ f" since:{date_str} until:{date_str}"
)
tweet_count = 0

for tweet in sntwitter.TwitterSearchScraper(query).get_items():
if tweet_count >= tweets_per_day + (remaining_tweets > 0):
break
tweet_dict = {
"Datetime": tweet.date,
"Tweet Id": tweet.id,
"Original Text": tweet.rawContent,
"Username": tweet.user.username,
"Likes": tweet.likeCount,
"Views": int(tweet.viewCount) if tweet.viewCount is not None else 0,
"Replies": tweet.replyCount,
"Retweets": tweet.retweetCount,
"Followers": tweet.user.followersCount,
"Extra Hashtags": [
tag.lower()
for tag in re.findall(r"#(\w+)", tweet.rawContent)
if tag.lower() not in [h.lower().replace("#", "") for h in hashtags]
],
}
tweets_list.append(tweet_dict)
logger.info(len(tweets_list))
tweet_count += 1
if tweet_count == tweets_per_day:
remaining_tweets -= 1
# Clean text and add column to DataFrame
if not tweets_df.empty:
tweets_df["Cleaned Text"] = tweets_df["Original Text"].apply(clean_tweet)

return pd.DataFrame(tweets_list)
# Get sentiment scores and add columns to DataFrame
sentiment_scores = tweets_df["Cleaned Text"].apply(get_sentiment_scores)
tweets_df = pd.concat([tweets_df, sentiment_scores.apply(pd.Series)], axis=1)

# Select and order the final output columns
tweets_df = tweets_df[
[
"Datetime",
"Tweet Id",
"Original Text",
"Cleaned Text",
"Polarity",
"Subjectivity",
"Sentiment",
"Negative Score",
"Neutral Score",
"Positive Score",
"Compound Score",
"Username",
"Likes",
"Views",
"Replies",
"Retweets",
"Followers",
"Extra Hashtags",
]
]

def select_scrape_mode(
hashtags: list[str],
since_date: str,
until_date: str,
lang: str,
exclude_keywords: list[str],
num_tweets: int,
hashtag_operator: str = "OR",
distribute_tweets: bool = False,
) -> pd.DataFrame:
if distribute_tweets:
return scrape_tweets_distributed(
hashtags,
since_date,
until_date,
lang,
exclude_keywords,
num_tweets,
hashtag_operator,
)
else:
return scrape_tweets(
hashtags,
since_date,
until_date,
lang,
exclude_keywords,
num_tweets,
hashtag_operator,
)
return tweets_df


if __name__ == "__main__":
@@ -350,49 +350,9 @@ def select_scrape_mode(
"shopify",
]

tweets_df = select_scrape_mode(
hashtags,
start_date_str,
end_date_str,
lang,
exclude_keywords,
num_tweets,
distribute_tweets=True,
df = main(
hashtags, start_date_str, end_date_str, lang, exclude_keywords, num_tweets
)

# Clean text and add column to DataFrame
if not tweets_df.empty:
tweets_df["Cleaned Text"] = tweets_df["Original Text"].apply(clean_tweet)

# Get sentiment scores and add columns to DataFrame
sentiment_scores = tweets_df["Cleaned Text"].apply(get_sentiment_scores)
tweets_df = pd.concat([tweets_df, sentiment_scores.apply(pd.Series)], axis=1)

# Add additional columns
tweets_df = tweets_df[
[
"Datetime",
"Tweet Id",
"Original Text",
"Cleaned Text",
"Polarity",
"Subjectivity",
"Sentiment",
"Negative Score",
"Neutral Score",
"Positive Score",
"Compound Score",
"Username",
"Likes",
"Views",
"Replies",
"Retweets",
"Followers",
"Extra Hashtags",
]
]

data_vol = os.getenv("local_path")
tweets_df.to_parquet(f"{data_vol}tweets-{start_date_str}.parquet")
print(tweets_df.head())
logger.info(f"Saved data to file tweets-{start_date_str}.parquet")
df.to_parquet(f"{data_vol}tweets-{start_date_str}.parquet")
logger.info(f"saved data to file tweets-{start_date_str}.parquet")
8 changes: 8 additions & 0 deletions sql/all_tweets.sql
@@ -0,0 +1,8 @@
CREATE OR REPLACE TABLE \`stellarismusv5.twitter_data.bq_tweets\`
AS
WITH TweetData AS (
SELECT *
FROM \`stellarismusv5.twitter_data.tweets-*\`
)
SELECT *
FROM TweetData;
11 changes: 11 additions & 0 deletions sql/genre_data.sql
@@ -0,0 +1,11 @@
"CREATE OR REPLACE TABLE \`${PROJECT}.${METACRITIC_DATASET}.bq_metacritic_genre_data\` AS
WITH GenreData AS (
SELECT TRIM(genre) AS genre,
AVG(meta_score) AS average_meta_score,
AVG(user_score) AS average_user_score,
COUNT(*) AS game_count
FROM \`${PROJECT}.${METACRITIC_DATASET}.bq_metacritic_gamedata\`, UNNEST(SPLIT(genre, ',')) AS genre
GROUP BY genre
)
SELECT *
FROM GenreData;"
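These .sql files use shell-style ${VAR} placeholders and escaped backticks, which suggests they are expanded by the shell before being handed to BigQuery; the invocation itself is not shown in the files above. One way that could look (a sketch; envsubst, the dataset name, and the exact invocation are assumptions):

# Sketch only: render the template from the environment and submit it to BigQuery.
export PROJECT="$GOOGLE_CLOUD_PROJECT"
export METACRITIC_DATASET="metacritic_data"   # assumed dataset name
envsubst < sql/genre_data.sql | bq query --use_legacy_sql=false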
