
Commit 214e919

Liftingthedata committed Jun 28, 2023
1 parent 1e322c6
Showing 4 changed files with 70 additions and 77 deletions.
111 changes: 48 additions & 63 deletions airflow/dags/scrape_dag.py
@@ -71,24 +71,8 @@
dag_id="scrapers",
schedule_interval="0 0 1 * *",
default_args=default_args,
catchup=True,
tags=["scraping", "vgchartz", "twitter", "metacritic"],
tags=["scraping", "vgchartz", "metacritic"],
) as dag:
twitter_task = KubernetesJobOperator(
task_id="scrape-tweets",
body_filepath=POD_TEMPALTE,
command=["python", f"{BASE}/twitter/sentiment_analysis.py"],
jinja_job_args={
"image": f"eu.gcr.io/{GOOGLE_CLOUD_PROJECT}/scraper:latest",
"name": "scrape-tweets",
"gitsync": True,
"volumes": [COMMON_VOLUME_CONFIG],
},
envs={"start_date": "{{ ds }}", "local_path": LOCAL_PATH, "num_tweets": 10000},
)

# backfill_first = LatestOnlyOperator(task_id="ensure_backfill_complete")

# with TaskGroup(group_id="process-metacritic-data") as metacritic_tg:
# consoles = ["xbox360", "xbox-series-x", "xboxone", "xbox"]
# for console in consoles:
@@ -147,50 +131,51 @@
# envs={"console": console, "local_path": LOCAL_PATH},
# )
# t1 >> tg1
# with TaskGroup(group_id="process-vgchartz-data") as vgchartz_tg:
# v1 = KubernetesJobOperator(
# task_id="scrape-vgchartz-hw-sales",
# body_filepath=POD_TEMPALTE,
# command=["python", f"{BASE}/vgchartz/scrape_hardware_sales.py"],
# jinja_job_args={
# "image": f"eu.gcr.io/{GOOGLE_CLOUD_PROJECT}/scraper:latest",
# "name": "scrape-vg-hw-sales",
# "gitsync": True,
# "volumes": [COMMON_VOLUME_CONFIG],
# },
# envs={"local_path": LOCAL_PATH},
# )
with TaskGroup(group_id="process-vgchartz-data") as vgchartz_tg:
v1 = KubernetesJobOperator(
task_id="scrape-vgchartz-hw-sales",
body_filepath=POD_TEMPALTE,
command=["python", f"{BASE}/vgchartz/scrape_hardware_sales.py"],
jinja_job_args={
"image": f"eu.gcr.io/{GOOGLE_CLOUD_PROJECT}/scraper:latest",
"name": "scrape-vg-hw-sales",
"gitsync": True,
"volumes": [COMMON_VOLUME_CONFIG],
},
envs={"local_path": LOCAL_PATH},
)

v2 = KubernetesJobOperator(
task_id="scrape-vgchartz-game-sales",
body_filepath=POD_TEMPALTE,
command=["python", f"{BASE}/vgchartz/scrape_game_sales.py"],
jinja_job_args={
"image": f"eu.gcr.io/{GOOGLE_CLOUD_PROJECT}/scraper:latest",
"name": "scrape-vg-game-sales",
"gitsync": True,
"volumes": [COMMON_VOLUME_CONFIG],
},
envs={"local_path": LOCAL_PATH},
)
gcp_task = KubernetesJobOperator(
task_id="load_to_gcp",
body_filepath=POD_TEMPALTE,
command=["/bin/bash", "/git/repo/airflow/dags/scripts/gcp_script.sh"],
jinja_job_args={
"image": "google/cloud-sdk:alpine",
"name": "ingest-and-load-to-bq",
"gitsync": True,
"volumes": [COMMON_VOLUME_CONFIG],
},
envs={
"LOCAL_DIR": LOCAL_PATH,
"TWITTER_DATASET": os.getenv("TWITTER_DATASET"),
"VGCHARTZ_DATASET": os.getenv("VGCHARTZ_DATASET"),
"METACRITIC_DATASET": os.getenv("METACRITIC_DATASET"),
"DATA_BUCKET": os.getenv("DATA_BUCKET"),
"PROJECT": GOOGLE_CLOUD_PROJECT,
},
)
# [metacritic_tg, vgchartz_tg] >> gcp_task

# v2 = KubernetesJobOperator(
# task_id="scrape-vgchartz-game-sales",
# body_filepath=POD_TEMPALTE,
# command=["python", f"{BASE}/vgchartz/scrape_game_sales.py"],
# jinja_job_args={
# "image": f"eu.gcr.io/{GOOGLE_CLOUD_PROJECT}/scraper:latest",
# "name": "scrape-vg-game-sales",
# "gitsync": True,
# "volumes": [COMMON_VOLUME_CONFIG],
# },
# envs={"local_path": LOCAL_PATH},
# )
# gcp_task = KubernetesJobOperator(
# task_id="load_to_gcp",
# body_filepath=POD_TEMPALTE,
# command=["/bin/bash", "/git/repo/airflow/dags/scripts/gcp_script.sh"],
# jinja_job_args={
# "image": "google/cloud-sdk:alpine",
# "name": "ingest-and-load-to-bq",
# "gitsync": True,
# "volumes": [COMMON_VOLUME_CONFIG],
# },
# envs={
# "LOCAL_DIR": LOCAL_PATH,
# "TWITTER_DATASET": os.getenv("TWITTER_DATASET"),
# "VGCHARTZ_DATASET": os.getenv("VGCHARTZ_DATASET"),
# "METACRITIC_DATASET": os.getenv("METACRITIC_DATASET"),
# "DATA_BUCKET": os.getenv("DATA_BUCKET"),
# "PROJECT": GOOGLE_CLOUD_PROJECT,
# },
# )
# twitter_task >> backfill_first >> [metacritic_tg, vgchartz_tg] >> gcp_task
# backfill_first >> metacritic_tg >> gcp_task
vgchartz_tg >> gcp_task
11 changes: 2 additions & 9 deletions airflow/dags/tweet_scrape_dag.py
@@ -3,26 +3,19 @@
primary purpose of these jobs is to perform scraping tasks.
The DAG follows the sequence:
twitter_task >> backfill_first >> metacritic_tg >> vgchartz_tg >> gcp_task
twitter_task >> backfill_first >> gcp_task
Task 'twitter_task': This task involves scraping tweets from Twitter for the previous
month and performing sentiment analysis on them.
Task 'backfill_first': This task ensures that the Twitter data is backfilled before
scraping other sites that do not require backfilling.
Task group 'metacritic_tg': This task group consists of multiple tasks that scrape data
from Metacritic. It scrapes the data for each game as well as the user and critic reviews.
Task group 'vgchartz_tg': This task group consists of 2 tasks that scrape data
from Vgchartz.
Task 'gcp_task': This final task saves the scraped data to a Google Cloud Storage (GCS)
bucket and subsequently loads it into a BigQuery table.
The DAG is scheduled to run on a cron schedule, specifically on the first day of each
month. The Twitter data is appended during each run, while the other data is replaced with
the latest version.
month.
"""
# pylint: disable=pointless-statement
# pylint: disable=wrong-import-order
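For orientation, the task ordering the updated docstring describes can be sketched with placeholder operators. This is an illustrative sketch only, not the real DAG: EmptyOperator stands in for the KubernetesJobOperator jobs, the dag_id is invented, catchup is assumed, and only the schedule and task ids are taken from the diff.

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator

with DAG(
    dag_id="tweet_scraper_sketch",      # illustrative name, not from the repo
    schedule_interval="0 0 1 * *",      # first day of each month, as in the diff
    catchup=True,                       # assumed; needed for the backfill behaviour described
) as dag:
    # Placeholders for the real scrape/load jobs.
    twitter_task = EmptyOperator(task_id="scrape-tweets")
    gcp_task = EmptyOperator(task_id="load_to_gcp")
    # LatestOnlyOperator skips downstream tasks on backfill runs, so Twitter
    # history is appended on every run while only the latest scheduled run
    # continues on to the GCP load step.
    backfill_first = LatestOnlyOperator(task_id="ensure_backfill_complete")

    twitter_task >> backfill_first >> gcp_task
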
1 change: 0 additions & 1 deletion scrapers/twitter/sentiment_analysis.py
@@ -230,7 +230,6 @@ def scrape_tweets(
if tag.lower() not in [h.lower().replace("#", "") for h in hashtags]
],
}
print(tweet.user.username)
tweets_list.append(tweet_dict)

return pd.DataFrame(tweets_list)
24 changes: 20 additions & 4 deletions scrapers/vgchartz/scrape_game_sales.py
@@ -82,23 +82,39 @@ def build_url(genre: str, console_type: str, page_num: int) -> str:
return url


import pandas as pd


def clean_data(df: pd.DataFrame) -> pd.DataFrame:
"""
Clean the scraped data by converting sales columns to float format, replacing
'Series' in the 'Console' column with 'XS', dropping the 'Gamex' column, and
returning the cleaned DataFrame.
Clean the scraped data by filtering for total sales > 100,000, converting sales columns to float format,
replacing 'Series' in the 'Console' column with 'XS', dropping the 'Gamex' column,
converting 'Release_Date' and 'Last_Update' columns to date format, and adding a 'Release_Year' column.
Additionally, update the values in the 'Console' column.
Args:
df: A pandas DataFrame containing scraped video game sales data.
Returns:
A pandas DataFrame with cleaned data.
"""
# Filter for total sales > 100,000
df = df[df["Total_Sales"] > 100000]

for col in df.columns:
if "Sales" in col or "Units" in col:
df[col] = df[col].str.replace("m", "").astype(float)

df["Console"] = df["Console"].str.replace("Series", "XS")
df["Console"] = df["Console"].replace(
{"XS": "Xbox Series X", "XOne": "Xbox One", "X360": "Xbox 360", "XB": "Xbox"}
)

df["Release_Date"] = pd.to_datetime(df["Release_Date"], format="%dth %b %y")
df["Last_Update"] = pd.to_datetime(df["Last_Update"], format="%dth %b %y")
df["Release_Year"] = df["Release_Date"].dt.year

df = df.dropna(subset=["Release_Year"])

df = df.drop(["Gamex"], axis=1)
return df

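One note on the new date handling: pd.to_datetime with the fixed format "%dth %b %y" matches only day numbers followed by the literal "th" (e.g. "10th Nov 20"). Below is a small sketch of a more tolerant variant that strips any ordinal suffix before parsing; the helper name is illustrative and this is not part of the commit.

import pandas as pd

def parse_vgchartz_date(dates: pd.Series) -> pd.Series:
    """Parse VGChartz-style dates such as '10th Nov 20' or '1st Jan 21'."""
    # Drop the ordinal suffix (st/nd/rd/th) after the day number, then parse;
    # unparseable values become NaT instead of raising.
    stripped = dates.str.replace(r"(\d+)(st|nd|rd|th)", r"\1", regex=True)
    return pd.to_datetime(stripped, format="%d %b %y", errors="coerce")

# e.g. parse_vgchartz_date(pd.Series(["10th Nov 20", "1st Jan 21", "22nd Dec 20"]))
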
