-
Notifications
You must be signed in to change notification settings - Fork 176
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
798 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
engine_version: 1 | ||
pipelines: | ||
chess: | ||
is_dirty: false | ||
last_commit_sha: 6792c713c2a0343e468f89792a58850195bf9528 | ||
last_commit_timestamp: '2023-06-05T14:09:46+02:00' | ||
files: | ||
chess/__init__.py: | ||
commit_sha: 6792c713c2a0343e468f89792a58850195bf9528 | ||
git_sha: 66ccbbdf9e59e8bcb0b932a80eb033a298c3e9f5 | ||
sha3_256: fe7d5923ae62f7bba3b6aa0447af9f7c187f719636e62410d3d0c0c193abed26 | ||
chess/README.md: | ||
commit_sha: 6792c713c2a0343e468f89792a58850195bf9528 | ||
git_sha: 4d2c6988018890d8834b32874887f59d3c4ef4a1 | ||
sha3_256: 7c0de2dd5788615f1a18f5b1018ad0013070b82204a780becb8706869447a1c8 | ||
chess/settings.py: | ||
commit_sha: 6792c713c2a0343e468f89792a58850195bf9528 | ||
git_sha: e7c335449972e4e4de5ee1035114571f135fd9cf | ||
sha3_256: 01728e594272c541af519f58ebb3ef1b13bcd9253fafc8cfdea532449f96d017 | ||
chess/helpers.py: | ||
commit_sha: 6792c713c2a0343e468f89792a58850195bf9528 | ||
git_sha: c607c5172afbf52578ec1563cc5eb45f4c209cfd | ||
sha3_256: ed62520d9d09960ce88719306d5f080876435b6425808c5587a6078eaa093538 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
# put your configuration values here | ||
|
||
[runtime] | ||
log_level="WARNING" # the system log level of dlt | ||
# use the dlthub_telemetry setting to enable/disable anonymous usage data reporting, see https://dlthub.com/docs/telemetry | ||
dlthub_telemetry = true | ||
|
||
[sources.chess] | ||
config_int = 0 # please set me up! |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
--- | ||
title: Chess.com | ||
description: dlt pipeline for Chess.com API | ||
keywords: [chess.com api, chess.com pipeline, chess.com] | ||
--- | ||
|
||
# Chess.com | ||
|
||
This pipeline can be used to load player data from the [Chess.com API](https://www.chess.com/news/view/published-data-api) into a [destination](../general-usage/glossary.md#destination) of your choice. | ||
|
||
## Initialize the pipeline | ||
|
||
Initialize the pipeline with the following command: | ||
``` | ||
dlt init chess bigquery | ||
``` | ||
Here, we chose BigQuery as the destination. To choose a different destination, replace `bigquery` with your choice of destination. | ||
|
||
Running this command will create a directory with the following structure: | ||
```bash | ||
├── .dlt | ||
│ ├── .pipelines | ||
│ ├── config.toml | ||
│ └── secrets.toml | ||
├── chess | ||
│ ├── __pycache__ | ||
│ └── __init__.py | ||
├── .gitignore | ||
├── chess_pipeline.py | ||
└── requirements.txt | ||
``` | ||
|
||
## Add credentials | ||
|
||
Before running the pipeline you may need to add credentials in the `.dlt/secrets.toml` file for your chosen destination. For instructions on how to do this, follow the steps detailed under the desired destination in the [destinations](https://dlthub.com/docs/destinations) page. | ||
|
||
## Run the pipeline | ||
|
||
1. Install the necessary dependencies by running the following command: | ||
``` | ||
pip install -r requirements.txt | ||
``` | ||
2. Now the pipeline can be run by using the command: | ||
``` | ||
python3 chess_pipeline.py | ||
``` | ||
3. To make sure that everything is loaded as expected, use the command: | ||
``` | ||
dlt pipeline chess_pipeline show | ||
``` | ||
|
||
## Customize parameters | ||
|
||
Without any modifications, the chess pipeline will load data for a default list of players over a default period of time. You can change these values in the `chess_pipeline.py` script. | ||
|
||
For example, if you wish to load player games for a specific set of players, add the player list to the function `load_players_games_example` as below. | ||
```python | ||
def load_players_games_example(start_month: str, end_month: str): | ||
|
||
pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination='bigquery', dataset_name="chess_players_games_data") | ||
|
||
data = chess( | ||
[], # Specify your list of players here | ||
start_month=start_month, | ||
end_month=end_month | ||
) | ||
|
||
info = pipeline.run(data.with_resources("players_games", "players_profiles")) | ||
print(info) | ||
``` | ||
To specify the time period, pass the starting and ending months as parameters when calling the function in the `__main__` block: | ||
```python | ||
if __name__ == "__main__" : | ||
load_players_games_example("2022/11", "2022/12") # Replace the strings "2022/11" and "2022/12" with different months in the "YYYY/MM" format | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
"""A pipeline loading player profiles and games from chess.com api""" | ||
|
||
from typing import Callable, Iterator, List, Sequence, Dict, Any | ||
|
||
import dlt | ||
from dlt.common import pendulum | ||
from dlt.common.typing import TDataItem, StrAny | ||
from dlt.extract.source import DltResource | ||
from dlt.sources.helpers import requests | ||
from .helpers import get_url_with_retry, get_path_with_retry, validate_month_string | ||
|
||
from .settings import UNOFFICIAL_CHESS_API_URL | ||
|
||
|
||
@dlt.source(name="chess")
def source(
    players: List[str], start_month: str = None, end_month: str = None
) -> Sequence[DltResource]:
    """
    Builds the dlt source for the chess.com api. The source bundles one resource per piece of data loaded here:
    player profiles, game archive urls, the games themselves and the players' online status.
    Args:
        players (List[str]): Usernames of the players to get the data for.
        start_month (str, optional): Forwarded to `players_games`; matches before `start_month` are left out. Defaults to None.
        end_month (str, optional): Forwarded to `players_games`; matches after `end_month` are left out. Defaults to None.
    Returns:
        Sequence[DltResource]: The resources, in this order: players_profiles, players_archives,
        players_games, players_online_status
    """
    # every resource receives the same list of players; only the games resource takes the month range
    profiles = players_profiles(players)
    archives = players_archives(players)
    games = players_games(players, start_month=start_month, end_month=end_month)
    online_status = players_online_status(players)
    return (profiles, archives, games, online_status)
|
||
|
||
@dlt.resource(write_disposition="replace")
def players_profiles(players: List[str]) -> Iterator[TDataItem]:
    """
    Yields the chess.com profile of every player in `players`.
    Args:
        players (List[str]): Usernames of the players whose profiles should be retrieved.
    Yields:
        Iterator[TDataItem]: One deferred profile request per username.
    """

    # the http request is decorated with defer so that the profiles are fetched in parallel
    @dlt.defer
    def _fetch_profile(player_name: str) -> TDataItem:
        profile_path = f"player/{player_name}"
        return get_path_with_retry(profile_path)

    # one deferred item per player, in the order the usernames were given
    yield from map(_fetch_profile, players)
|
||
|
||
@dlt.resource(write_disposition="replace", selected=False)
def players_archives(players: List[str]) -> Iterator[List[TDataItem]]:
    """
    Yields the game archive urls of the given players, one list per player.
    Args:
        players (List[str]): Usernames of the players whose archives should be retrieved.
    Yields:
        Iterator[List[TDataItem]]: For each player, the content of the "archives" key of the api response.
    """
    for player_name in players:
        archives_path = f"player/{player_name}/games/archives"
        response = get_path_with_retry(archives_path)
        # a response without the "archives" key results in an empty list for that player
        yield response.get("archives", [])
|
||
|
||
@dlt.resource(write_disposition="append")
def players_games(
    players: List[str], start_month: str = None, end_month: str = None
) -> Iterator[Callable[[], List[TDataItem]]]:
    """
    Yields the games of `players` that were played between `start_month` and `end_month`.
    Archives whose url is already recorded in the resource state are not requested again.
    Args:
        players (List[str]): Usernames of the players whose games should be retrieved.
        start_month (str, optional): First month to load, in the format "YYYY/MM". Defaults to None.
        end_month (str, optional): Last month to load, in the format "YYYY/MM". Defaults to None.
    Yields:
        Iterator[Callable[[], List[TDataItem]]]: Deferred downloads, each returning the list of games of one archive.
    """
    # a simple check of the month format that catches common mistakes before any request is made
    validate_month_string(start_month)
    validate_month_string(end_month)

    # urls of the archives that were already processed; the list is kept in the resource state,
    # which has the same content the next time this function is called
    seen_archives = dlt.current.resource_state().setdefault("archives", [])

    # the http request is decorated with defer so that the archives are downloaded in parallel
    @dlt.defer
    def _download_archive(url: str) -> List[TDataItem]:
        print(f"Getting archive from {url}")
        try:
            archive = get_url_with_retry(url)
        except requests.HTTPError as http_err:
            # any error other than "not found" is passed on to the caller
            if http_err.response.status_code != 404:
                raise
            # sometimes archives are not available and the error seems to be permanent
            return []
        return archive.get("games", [])  # type: ignore

    # the archives resource is called like any other function and iterated like a list
    for archive_url in players_archives(players):
        # the url format is https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM},
        # so its last seven characters are the month of the archive
        archive_month = archive_url[-7:]
        before_range = start_month and archive_month < start_month
        after_range = end_month and archive_month > end_month
        # skip archives outside of the requested range and archives that were downloaded before
        if before_range or after_range or archive_url in seen_archives:
            continue
        seen_archives.append(archive_url)
        yield _download_archive(archive_url)
|
||
|
||
@dlt.resource(write_disposition="append")
def players_online_status(players: List[str]) -> Iterator[TDataItem]:
    """
    Yields the current online status of every player in `players`.
    Args:
        players (List[str]): Usernames of the players whose online status should be checked.
    Yields:
        Iterator[TDataItem]: One record per player with the username, online status, last login date and check time.
    """
    # the unofficial endpoint is used here, the official one seems to be removed
    for player in players:
        status_url = "%suser/popup/%s" % (UNOFFICIAL_CHESS_API_URL, player)
        response = get_url_with_retry(status_url)
        # only the relevant fields of the response are kept
        online_status = response["onlineStatus"]
        last_login_date = response["lastLoginDate"]
        yield {
            "username": player,
            "onlineStatus": online_status,
            "lastLoginDate": last_login_date,
            "check_time": pendulum.now(),  # dlt can deal with native python dates
        }
|
||
|
||
@dlt.source
def chess_dlt_config_example(
    secret_str: str = dlt.secrets.value,
    secret_dict: Dict[str, Any] = dlt.secrets.value,
    config_int: int = dlt.config.value,
) -> DltResource:
    """
    Example source showing how secrets and config values are provided through dlt.
    Args:
        secret_str (str, optional): A secret string. Defaults to dlt.secrets.value.
        secret_dict (Dict[str, Any], optional): A secret dictionary. Defaults to dlt.secrets.value.
        config_int (int, optional): A config integer. Defaults to dlt.config.value.
    Returns:
        DltResource: A resource named "config_values" that yields the three received values.
    """
    configured_values = [secret_str, secret_dict, config_int]

    # NOTE: the secret values are written to stdout as well - this source is just a test
    for value in configured_values:
        print(value)

    return dlt.resource(configured_values, name="config_values")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
"""Chess pipeline helpers""" | ||
|
||
from dlt.common.typing import StrAny | ||
from dlt.sources.helpers import requests | ||
from .settings import OFFICIAL_CHESS_API_URL | ||
|
||
|
||
def get_url_with_retry(url: str) -> StrAny:
    """Requests `url` with the `requests` helper of dlt and returns the decoded json body.

    NOTE(review): no retry logic is visible here; retries are presumably handled by the dlt
    `requests` helper - confirm.
    """
    return requests.get(url).json()  # type: ignore
|
||
|
||
def get_path_with_retry(path: str) -> StrAny:
    """Requests `path` relative to the official chess.com api url and returns the decoded json body."""
    full_url = f"{OFFICIAL_CHESS_API_URL}{path}"
    return get_url_with_retry(full_url)
|
||
|
||
def validate_month_string(string: str) -> None:
    """Validates that the string is in YYYY/MM format

    Empty values (`None` or an empty string) mean "no limit" and are accepted without any check.
    Args:
        string (str): The month to validate, e.g. "2022/11".
    Raises:
        ValueError: If `string` is not four digits, a slash and a two digit month between 01 and 12.
    """
    import re  # local import, so the helper does not depend on the import block of the module

    if not string:
        return
    # the whole value is checked, not just the slash: a short value like "2022" must not end in an
    # IndexError, and values like "2022/1" or "2022/13" are not months in YYYY/MM format either
    if re.fullmatch(r"[0-9]{4}/(0[1-9]|1[0-2])", string) is None:
        raise ValueError(f"Expected a month in YYYY/MM format, got {string!r}")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
"""Chess pipeline settings and constants""" | ||
|
||
OFFICIAL_CHESS_API_URL = "https://api.chess.com/pub/" | ||
UNOFFICIAL_CHESS_API_URL = "https://www.chess.com/callback/" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import dlt | ||
from chess import source | ||
import os | ||
|
||
def load_table_counts(p: "dlt.Pipeline", *table_names: str):
    """Returns row counts for `table_names` as dict

    Args:
        p (dlt.Pipeline): The pipeline whose sql client is used to run the query.
        *table_names (str): Names of the tables to count the rows of.
    Returns:
        A dict mapping each table name to its row count; empty when no table names are given.
    """
    # without tables there is nothing to count, and an empty string is not a query worth sending
    if not table_names:
        return {}

    # try sql, could be other destination though
    # one SELECT per table, glued together so that all counts come back from a single query
    query = "\nUNION ALL\n".join(
        f"SELECT '{name}' as name, COUNT(1) as c FROM {name}" for name in table_names
    )
    with p.sql_client() as client, client.execute_query(query) as cursor:
        # every row is (table name, row count)
        return {row[0]: row[1] for row in cursor.fetchall()}
|
||
|
||
def load_players_games_example(start_month: str, end_month: str) -> None:
    """Builds a pipeline that loads the chess games of a fixed set of players for a range of months.

    The load info and the row counts of all data tables are printed after the run.
    Args:
        start_month (str): First month to load, in YYYY/MM format.
        end_month (str): Last month to load, in YYYY/MM format.
    """
    # NOTE(review): presumably makes the normalize step raise on schema changes - confirm
    os.environ['NORMALIZE__SCHEMA_UPDATE_MODE'] = "freeze-and-raise"

    # pipeline configuration: the destination and the dataset name to which the data should go
    pipeline_settings = dict(
        import_schema_path="schemas/import",
        export_schema_path="schemas/export",
        pipeline_name="chess_pipeline",
        destination='duckdb',
        dataset_name="chess_players_games_data",
        full_refresh=True,
    )
    pipeline = dlt.pipeline(**pipeline_settings)

    # the data source gets a list of players and the start/end month in YYYY/MM format
    player_names = ["magnuscarlsen", "vincentkeymer", "dommarajugukesh", "rpragchess"]
    data = source(player_names, start_month=start_month, end_month=end_month)

    # out of all the possible resources only "players_games" and "players_profiles" are loaded
    selected_data = data.with_resources("players_games", "players_profiles")
    info = pipeline.run(selected_data)
    print(info)

    # count the rows of every data table of the default schema
    table_names = [table["name"] for table in pipeline.default_schema.data_tables()]
    table_counts = load_table_counts(pipeline, *table_names)
    print(table_counts)
|
||
|
||
def load_players_games_incrementally() -> None:
    """Runs the example twice with overlapping months; the same game archive should not be loaded twice.

    NOTE(review): `load_players_games_example` creates its pipeline with `full_refresh=True`, which
    presumably starts every run with a fresh state - confirm that the second run really skips 11.2022.
    """
    month_ranges = (
        ("2022/11", "2022/11"),  # first load: games for 11.2022
        ("2022/11", "2022/12"),  # second load: meant to skip 11.2022 and load 12.2022
    )
    for first_month, last_month in month_ranges:
        load_players_games_example(first_month, last_month)
|
||
|
||
if __name__ == "__main__":
    # script entry point: load the games played in 11.2022 and 12.2022
    load_players_games_example("2022/11", "2022/12")
Oops, something went wrong.