In [130]:
from datetime import date
import json
import time
import requests
from requests.exceptions import HTTPError

import pandas as pd
from google.cloud import storage
from google.api_core.page_iterator import HTTPIterator

# Common

In [None]:
# Definition values shared by the cells below
BUCKET_NAME = "shanari-datalake"  # bucket name passed to every GCS helper call
DATASOURCE_NAME = "pokeapi"  # first segment of every object path
RAW_LAYER = "raw"  # layer used for the JSON files uploaded by the Extract cells
TRANSLATED_LAYER = "translated"  # layer used for the CSV files uploaded by the Translated cells
# Date used for the <YYYY>/<MM>/<DD> part of object paths; evaluated once, when this cell runs
TARGET_DATE = date.today()
print(f"The target_date is {TARGET_DATE}")

In [24]:
# Upload a dict to GCS as a JSON file
def upload_json_to_gcs(
        bucket_name: str,
        datasource_name: str,
        layer_name: str,
        target_date: date,
        object_name: str,
        json_object: dict
        ) -> bool:
    """Serialize ``json_object`` and upload it as a date-partitioned JSON blob.

    The blob is written to
    ``<datasource_name>/<layer_name>/<YYYY>/<MM>/<DD>/<object_name>.json``
    in ``bucket_name``.

    Returns:
        True as soon as one upload attempt succeeds; False when the first
        attempt and all three retries raised.
    """
    # Destination path, partitioned by the year/month/day of target_date
    date_partition = target_date.strftime("%Y/%m/%d")
    blob_path = f"{datasource_name}/{layer_name}/{date_partition}/{object_name}.json"

    # Prepare the client and the destination blob
    target_blob = storage.Client().bucket(bucket_name).blob(blob_path)

    # Dump to a JSON string
    payload = json.dumps(json_object, indent=4)

    # A failed upload is retried up to three times
    failed_attempts = 0
    while True:
        try:
            target_blob.upload_from_string(payload)
            print(f"Success to uploaded file: {blob_path}")
            return True
        except Exception as exc:
            print(f"Error uploading file: {exc}")
            if failed_attempts >= 3:
                return False
            failed_attempts += 1
            # Wait (number of failures so far) squared seconds: 1, 4, 9
            time.sleep(failed_attempts ** 2)



In [175]:
# Upload a DataFrame to GCS as a CSV file
def upload_df_to_gcs(
        bucket_name: str,
        datasource_name: str,
        layer_name: str,
        target_date: date,
        object_name: str,
        df_object: pd.DataFrame
        ) -> bool:
    """Serialize ``df_object`` to CSV and upload it as a date-partitioned blob.

    The blob is written to
    ``<datasource_name>/<layer_name>/<YYYY>/<MM>/<DD>/<object_name>.csv``
    in ``bucket_name``. The DataFrame index is not written.

    Returns:
        True as soon as one upload attempt succeeds; False when the first
        attempt and all three retries raised.
    """
    # Destination path, partitioned by the year/month/day of target_date
    date_partition = target_date.strftime("%Y/%m/%d")
    blob_path = f"{datasource_name}/{layer_name}/{date_partition}/{object_name}.csv"

    # Prepare the client and the destination blob
    target_blob = storage.Client().bucket(bucket_name).blob(blob_path)

    # Dump to a CSV string
    payload = df_object.to_csv(index=False)

    # A failed upload is retried up to three times
    failed_attempts = 0
    while True:
        try:
            target_blob.upload_from_string(payload)
            print(f"Success to uploaded file: {blob_path}")
            return True
        except Exception as exc:
            print(f"Error uploading file: {exc}")
            if failed_attempts >= 3:
                return False
            failed_attempts += 1
            # Wait (number of failures so far) squared seconds: 1, 4, 9
            time.sleep(failed_attempts ** 2)


# Extract

In [21]:
def extract_json_data(url: str, timeout: float = 30.0) -> dict:
    """Call a REST API endpoint with GET and return its decoded JSON body.

    Args:
        url: Endpoint to request.
        timeout: Seconds to wait for the server before giving up, so that a
            stalled connection cannot hang the notebook indefinitely.

    Returns:
        The response body parsed as JSON.

    Raises:
        HTTPError: If the response status code is not 200 OK.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != requests.codes.ok:
        # Fail loudly: an error response must never be parsed and stored as data
        raise HTTPError(f"Extract Error, URL: {url}")
    return response.json()

In [None]:
# Pokemon

# Fetch the list of all Pokemon
pokemon_list = extract_json_data(url="https://pokeapi.co/api/v2/pokemon/?offset=0&limit=10000")

# Fetch the JSON of each Pokemon and upload it to GCS
for entry in pokemon_list["results"]:
    detail_url = entry["url"]
    pokemon = extract_json_data(url=detail_url)
    # The second-to-last "/"-separated piece of the URL is used as the id,
    # so the object becomes "pokemon_<id>.json"
    resource_id = detail_url.split("/")[-2]
    is_uploaded = upload_json_to_gcs(
        bucket_name=BUCKET_NAME,
        datasource_name=DATASOURCE_NAME,
        layer_name=RAW_LAYER,
        target_date=TARGET_DATE,
        object_name=f"pokemon_{resource_id}",
        json_object=pokemon,
    )

    # Stop at the first upload that reports failure
    if not is_uploaded:
        break

    # Wait 0.5 seconds
    time.sleep(0.5)


In [None]:
# Ability

# Fetch the list of all abilities
ability_list = extract_json_data(url="https://pokeapi.co/api/v2/ability/?offset=0&limit=10000")

# Fetch the JSON of each ability and upload it to GCS
for entry in ability_list["results"]:
    detail_url = entry["url"]
    ability = extract_json_data(url=detail_url)
    # The second-to-last "/"-separated piece of the URL is used as the id,
    # so the object becomes "ability_<id>.json"
    resource_id = detail_url.split("/")[-2]
    is_uploaded = upload_json_to_gcs(
        bucket_name=BUCKET_NAME,
        datasource_name=DATASOURCE_NAME,
        layer_name=RAW_LAYER,
        target_date=TARGET_DATE,
        object_name=f"ability_{resource_id}",
        json_object=ability,
    )

    # Stop at the first upload that reports failure
    if not is_uploaded:
        break

    # Wait 0.5 seconds
    time.sleep(0.5)

In [None]:
# Nature

# Fetch the list of all natures
nature_list = extract_json_data(url="https://pokeapi.co/api/v2/nature/?offset=0&limit=10000")

# Fetch the JSON of each nature and upload it to GCS
for entry in nature_list["results"]:
    detail_url = entry["url"]
    nature = extract_json_data(url=detail_url)
    # The second-to-last "/"-separated piece of the URL is used as the id,
    # so the object becomes "nature_<id>.json"
    resource_id = detail_url.split("/")[-2]
    is_uploaded = upload_json_to_gcs(
        bucket_name=BUCKET_NAME,
        datasource_name=DATASOURCE_NAME,
        layer_name=RAW_LAYER,
        target_date=TARGET_DATE,
        object_name=f"nature_{resource_id}",
        json_object=nature,
    )

    # Stop at the first upload that reports failure
    if not is_uploaded:
        break

    # Wait 0.5 seconds
    time.sleep(0.5)

In [None]:
# Pokemon Form

# Fetch the list of all Pokemon forms
pokemon_form_list = extract_json_data(url="https://pokeapi.co/api/v2/pokemon-form/?offset=0&limit=10000")

# Fetch the JSON of each form and upload it to GCS
for entry in pokemon_form_list["results"]:
    detail_url = entry["url"]
    pokemon_form = extract_json_data(url=detail_url)
    # The second-to-last "/"-separated piece of the URL is used as the id,
    # so the object becomes "pokemon-form_<id>.json"
    resource_id = detail_url.split("/")[-2]
    is_uploaded = upload_json_to_gcs(
        bucket_name=BUCKET_NAME,
        datasource_name=DATASOURCE_NAME,
        layer_name=RAW_LAYER,
        target_date=TARGET_DATE,
        object_name=f"pokemon-form_{resource_id}",
        json_object=pokemon_form,
    )

    # Stop at the first upload that reports failure
    if not is_uploaded:
        break

    # Wait 0.5 seconds
    time.sleep(0.5)

In [None]:
# Pokemon Species

# Fetch the list of all Pokemon species
pokemon_species_list = extract_json_data(url="https://pokeapi.co/api/v2/pokemon-species/?offset=0&limit=10000")

# Fetch the JSON of each species and upload it to GCS
for entry in pokemon_species_list["results"]:
    detail_url = entry["url"]
    pokemon_species = extract_json_data(url=detail_url)
    # The second-to-last "/"-separated piece of the URL is used as the id,
    # so the object becomes "pokemon-species_<id>.json"
    resource_id = detail_url.split("/")[-2]
    is_uploaded = upload_json_to_gcs(
        bucket_name=BUCKET_NAME,
        datasource_name=DATASOURCE_NAME,
        layer_name=RAW_LAYER,
        target_date=TARGET_DATE,
        object_name=f"pokemon-species_{resource_id}",
        json_object=pokemon_species,
    )

    # Stop at the first upload that reports failure
    if not is_uploaded:
        break

    # Wait 0.5 seconds
    time.sleep(0.5)

In [None]:
# Moves

# Fetch the list of all moves
move_list = extract_json_data(url="https://pokeapi.co/api/v2/move/?offset=0&limit=10000")

# Fetch the JSON of each move and upload it to GCS
for entry in move_list["results"]:
    detail_url = entry["url"]
    move = extract_json_data(url=detail_url)
    # The second-to-last "/"-separated piece of the URL is used as the id,
    # so the object becomes "move_<id>.json"
    resource_id = detail_url.split("/")[-2]
    is_uploaded = upload_json_to_gcs(
        bucket_name=BUCKET_NAME,
        datasource_name=DATASOURCE_NAME,
        layer_name=RAW_LAYER,
        target_date=TARGET_DATE,
        object_name=f"move_{resource_id}",
        json_object=move,
    )

    # Stop at the first upload that reports failure
    if not is_uploaded:
        break

    # Wait 0.5 seconds
    time.sleep(0.5)

# Translated

In [39]:
def get_file_path(
        bucket_name: str,
        datasource_name: str,
        layer_name: str,
        target_date: date,
        object_name: str,
        ) -> "HTTPIterator":
    """List the GCS blobs of one object type stored for ``target_date``.

    Blobs are selected by the name prefix
    ``<datasource_name>/<layer_name>/<YYYY>/<MM>/<DD>/<object_name>_``.
    The trailing underscore keeps ``object_name="pokemon"`` from also
    matching names that start with ``pokemon-form_`` or ``pokemon-species_``.

    Returns:
        Whatever ``bucket.list_blobs`` returns, passed through unchanged. It
        is an iterator over blobs, not a list; the cells that call this
        function loop over it and read ``.name`` from each item.
    """
    # Date-partitioned prefix shared by every object of this type
    prefix = f"{datasource_name}/{layer_name}/{target_date.strftime('%Y/%m/%d')}/{object_name}_"

    # Shared logic for listing the files on GCS
    bucket = storage.Client().bucket(bucket_name)
    return bucket.list_blobs(prefix=prefix)

In [45]:
def read_json_from_gcs(
        bucket_name: str,
        prefix: str
) -> dict:
    """Read one JSON blob from GCS and return its parsed content.

    Args:
        bucket_name: Bucket that holds the blob.
        prefix: Name of the blob to read (passed to ``bucket.blob`` as-is).

    Returns:
        The parsed JSON, or an empty dict when the first attempt and all
        three retries raised.
    """
    # Shared logic for fetching a JSON file stored on GCS
    source_blob = storage.Client().bucket(bucket_name).blob(prefix)

    # A failed read is retried up to three times
    failed_attempts = 0
    while True:
        try:
            with source_blob.open('rb') as stream:
                return json.load(stream)
        except Exception as exc:
            print(f"Error reading file: {exc}")
            if failed_attempts >= 3:
                return {}
            failed_attempts += 1
            # Wait (number of failures so far) squared seconds: 1, 4, 9
            time.sleep(failed_attempts ** 2)


In [None]:
# Pokemon
# Builds three CSV files in the translated layer from the raw Pokemon JSON:
#   pokemon          - one row per Pokemon (master data)
#   pokemon_moves    - one row per Pokemon / move / version-group detail
#   pokemon_sprites  - one row per Pokemon with its sprite URLs
pokemon_blobs = get_file_path(
    bucket_name=BUCKET_NAME,
    datasource_name=DATASOURCE_NAME,
    layer_name=RAW_LAYER,
    target_date=TARGET_DATE,
    object_name="pokemon"
    )

# Date parts of the raw-layer partition, used to locate the matching species file
str_year = date.strftime(TARGET_DATE, "%Y")
str_month = date.strftime(TARGET_DATE, "%m")
str_day = date.strftime(TARGET_DATE, "%d")

# Rows are collected as plain dicts and turned into DataFrames once at the end,
# instead of creating and concatenating one single-row DataFrame per row.
pokemon_records = []
pokemon_move_records = []
pokemon_sprite_records = []
for blob in pokemon_blobs:
    print(f"Name of Blob: {blob.name}")
    pokemon_data = read_json_from_gcs(bucket_name=BUCKET_NAME, prefix=blob.name)

    species_id = pokemon_data["species"]["url"].split("/")[-2]
    species_prefix = f"{DATASOURCE_NAME}/{RAW_LAYER}/{str_year}/{str_month}/{str_day}/pokemon-species_{species_id}.json"
    species_data = read_json_from_gcs(bucket_name=BUCKET_NAME, prefix=species_prefix)

    # Record of Pokemon (Master)
    type_by_slot = {entry["slot"]: entry["type"]["name"] for entry in pokemon_data["types"]}
    base_stats = {entry["stat"]["name"]: entry["base_stat"] for entry in pokemon_data["stats"]}
    # read_json_from_gcs returns {} when the species file could not be read, and a
    # species may have no "ja" entry; both cases give None instead of raising.
    pokemon_name_jp = next(
        (name["name"] for name in species_data.get("names", []) if name["language"]["name"] == "ja"),
        None,
    )
    pokemon_records.append({
        "pokemon_id": pokemon_data["id"],
        "pokemon_name_en": pokemon_data["name"],
        "pokemon_name_jp": pokemon_name_jp,
        "height": pokemon_data["height"],
        "weight": pokemon_data["weight"],
        "type_1": type_by_slot[1],
        "type_2": type_by_slot.get(2),  # None for single-type Pokemon
        "stat_hp": base_stats["hp"],
        "stat_attack": base_stats["attack"],
        "stat_defense": base_stats["defense"],
        "stat_special_attack": base_stats["special-attack"],
        "stat_special_defense": base_stats["special-defense"],
        "stat_speed": base_stats["speed"],
    })

    # Records of Pokemon - Moves (1:N); a Pokemon without moves adds no rows
    for pokemon_move in pokemon_data["moves"]:
        for version_group_detail in pokemon_move["version_group_details"]:
            pokemon_move_records.append({
                "pokemon_id": pokemon_data["id"],
                "move_name": pokemon_move["move"]["name"],
                "move_learn_method": version_group_detail["move_learn_method"]["name"],
                "level_learned_at": version_group_detail["level_learned_at"],
                "version_group": version_group_detail["version_group"]["name"],
            })

    # Record of Pokemon - Sprites
    sprites = pokemon_data["sprites"]
    pokemon_sprite_records.append({
        "pokemon_id": pokemon_data["id"],
        "front_default_url": sprites["front_default"],
        "front_female_url": sprites["front_female"],
        "front_shiny_url": sprites["front_shiny"],
        "front_shiny_female_url": sprites["front_shiny_female"],
        "back_default_url": sprites["back_default"],
        "back_female_url": sprites["back_female"],
        "back_shiny_url": sprites["back_shiny"],
        "back_shiny_female_url": sprites["back_shiny_female"],
    })

# Stop here rather than upload empty CSV files when no raw file was listed
if not pokemon_records:
    raise ValueError("No raw pokemon files were found for TARGET_DATE")

df_pokemon = pd.DataFrame(pokemon_records)
df_pokemon_moves = pd.DataFrame(pokemon_move_records)
df_pokemon_sprites = pd.DataFrame(pokemon_sprite_records)

upload_df_to_gcs(
    bucket_name=BUCKET_NAME,
    datasource_name=DATASOURCE_NAME,
    layer_name=TRANSLATED_LAYER,
    target_date=TARGET_DATE,
    object_name="pokemon",
    df_object=df_pokemon
)
upload_df_to_gcs(
    bucket_name=BUCKET_NAME,
    datasource_name=DATASOURCE_NAME,
    layer_name=TRANSLATED_LAYER,
    target_date=TARGET_DATE,
    object_name="pokemon_moves",
    df_object=df_pokemon_moves
)
upload_df_to_gcs(
    bucket_name=BUCKET_NAME,
    datasource_name=DATASOURCE_NAME,
    layer_name=TRANSLATED_LAYER,
    target_date=TARGET_DATE,
    object_name="pokemon_sprites",
    df_object=df_pokemon_sprites
)

In [None]:
# Ability
# Builds the "ability" CSV of the translated layer: one row per raw ability file.
ability_blobs = get_file_path(
    bucket_name=BUCKET_NAME,
    datasource_name=DATASOURCE_NAME,
    layer_name=RAW_LAYER,
    target_date=TARGET_DATE,
    object_name="ability"
    )

# Rows are collected as plain dicts and turned into a DataFrame once at the end
ability_records = []
for blob in ability_blobs:
    print(f"Name of Blob: {blob.name}")
    ability_data = read_json_from_gcs(bucket_name=BUCKET_NAME, prefix=blob.name)

    # Japanese (ja-Hrkt) name; None when the ability has no such entry, the same
    # fallback the Pokemon Form and Moves cells use, instead of an IndexError.
    ability_name_jp = None
    if len(ability_data["names"]) > 1:
        ability_name_jp = next(
            (entry["name"] for entry in ability_data["names"] if entry["language"]["name"] == "ja-Hrkt"),
            None,
        )
    ability_records.append({
        "ability_id": ability_data["id"],
        "ability_name_en": ability_data["name"],
        "ability_name_jp": ability_name_jp,
    })

# Stop here rather than upload an empty CSV file when no raw file was listed
if not ability_records:
    raise ValueError("No raw ability files were found for TARGET_DATE")

df_ability = pd.DataFrame(ability_records)

upload_df_to_gcs(
    bucket_name=BUCKET_NAME,
    datasource_name=DATASOURCE_NAME,
    layer_name=TRANSLATED_LAYER,
    target_date=TARGET_DATE,
    object_name="ability",
    df_object=df_ability
)

In [None]:
# Nature
# Builds the "nature" CSV of the translated layer: one row per raw nature file.
nature_blobs = get_file_path(
    bucket_name=BUCKET_NAME,
    datasource_name=DATASOURCE_NAME,
    layer_name=RAW_LAYER,
    target_date=TARGET_DATE,
    object_name="nature"
)

# Rows are collected as plain dicts and turned into a DataFrame once at the end
nature_records = []
for blob in nature_blobs:
    print(f"Name of Blob: {blob.name}")
    nature_data = read_json_from_gcs(bucket_name=BUCKET_NAME, prefix=blob.name)

    # Japanese (ja-Hrkt) name; None when the nature has no such entry, the same
    # fallback the Pokemon Form and Moves cells use, instead of an IndexError.
    nature_name_jp = None
    if len(nature_data["names"]) > 1:
        nature_name_jp = next(
            (entry["name"] for entry in nature_data["names"] if entry["language"]["name"] == "ja-Hrkt"),
            None,
        )

    # Either stat may be null in the raw JSON; it is then kept as None
    increased = nature_data["increased_stat"]
    decreased = nature_data["decreased_stat"]
    nature_records.append({
        "nature_id": nature_data["id"],
        "nature_name_en": nature_data["name"],
        "nature_name_jp": nature_name_jp,
        "increased_stat": increased["name"] if increased is not None else None,
        "decreased_stat": decreased["name"] if decreased is not None else None,
    })

# Stop here rather than upload an empty CSV file when no raw file was listed
if not nature_records:
    raise ValueError("No raw nature files were found for TARGET_DATE")

df_nature = pd.DataFrame(nature_records)

upload_df_to_gcs(
    bucket_name=BUCKET_NAME,
    datasource_name=DATASOURCE_NAME,
    layer_name=TRANSLATED_LAYER,
    target_date=TARGET_DATE,
    object_name="nature",
    df_object=df_nature
)

In [None]:
# Pokemon Form
# Builds the "pokemon-form" CSV of the translated layer from the raw form files.
form_blobs = get_file_path(
    bucket_name=BUCKET_NAME,
    datasource_name=DATASOURCE_NAME,
    layer_name=RAW_LAYER,
    target_date=TARGET_DATE,
    object_name="pokemon-form"
)
dfs_form = []
for blob in form_blobs:
    print(f"Name of Blob: {blob.name}")
    form_data = read_json_from_gcs(bucket_name=BUCKET_NAME, prefix=blob.name)

    # Forms whose form_name is empty are left out of the output
    if form_data["form_name"] == "":
        continue

    # Japanese (ja-Hrkt) form name; looked up only when there is more than one
    # name entry, and None when no ja-Hrkt entry exists
    form_name_jp = None
    if len(form_data["form_names"]) > 1:
        form_name_jp = next(
            (entry["name"] for entry in form_data["form_names"] if entry["language"]["name"] == "ja-Hrkt"),
            None,
        )

    form_row = pd.DataFrame({
        "form_id": [form_data["id"]],
        "form_name_en": [form_data["name"]],
        "form_name_jp": [form_name_jp]
    })
    dfs_form.append(form_row)

df_form = pd.concat(dfs_form)

upload_df_to_gcs(
    bucket_name=BUCKET_NAME,
    datasource_name=DATASOURCE_NAME,
    layer_name=TRANSLATED_LAYER,
    target_date=TARGET_DATE,
    object_name="pokemon-form",
    df_object=df_form
)
    

In [None]:
# Moves
# Builds two CSV files in the translated layer from the raw move JSON:
#   moves             - one row per move
#   move_stat_change  - one row per move / stat change
move_blobs = get_file_path(
    bucket_name=BUCKET_NAME,
    datasource_name=DATASOURCE_NAME,
    layer_name=RAW_LAYER,
    target_date=TARGET_DATE,
    object_name="move"
)

# Keys of "meta" that are copied as-is, in the order of the output columns
plain_meta_keys = (
    "crit_rate",
    "drain",
    "flinch_chance",
    "healing",
    "max_hits",
    "min_hits",
    "max_turns",
    "min_turns",
    "stat_chance",
)

dfs_moves = []
dfs_move_stat_change = []
for blob in move_blobs:
    print(f"Name of Blob: {blob.name}")
    move_data = read_json_from_gcs(bucket_name=BUCKET_NAME,prefix=blob.name)

    # Japanese (ja-Hrkt) move name; looked up only when there is more than one
    # name entry, and None when no ja-Hrkt entry exists
    move_name_jp = None
    if len(move_data["names"]) > 1:
        move_name_jp = next(
            (entry["name"] for entry in move_data["names"] if entry["language"]["name"] == "ja-Hrkt"),
            None,
        )

    # Effect text of the first effect entry, or None when there is none
    effect_entries = move_data["effect_entries"][0]["effect"] if len(move_data["effect_entries"]) > 0 else None

    # "meta" may be null; every meta-derived column is then None
    meta = move_data["meta"]
    if meta is None:
        ailment = None
        move_category = None
        meta_values = dict.fromkeys(plain_meta_keys)
    else:
        ailment = meta["ailment"]["name"]
        move_category = meta["category"]["name"]
        meta_values = {key: meta[key] for key in plain_meta_keys}

    move_row = pd.DataFrame(
        {
            "move_id": [move_data["id"]],
            "move_name_en": [move_data["name"]],
            "move_name_jp": [move_name_jp],
            "type": [move_data["type"]["name"]],
            "power": [move_data["power"]],
            "pp": [move_data["pp"]],
            "priority": [move_data["priority"]],
            "accuracy": [move_data["accuracy"]],
            "damage_class": [move_data["damage_class"]["name"]],
            "effect_chance": [move_data["effect_chance"]],
            "effect_entries": [effect_entries],
            "ailment": [ailment],
            "move_category": [move_category],
            **{key: [value] for key, value in meta_values.items()},
            "target": [move_data["target"]["name"]]
        }
    )
    dfs_moves.append(move_row)

    # One row per stat change; moves without stat changes add nothing
    stat_changes = move_data["stat_changes"]
    if len(stat_changes) > 0:
        dfs_move_stat_change.append(
            pd.concat([
                pd.DataFrame({
                    "move_id": [move_data["id"]],
                    "stat_change_num": [stat["change"]],
                    "stat_change_name": [stat["stat"]["name"]]
                })
                for stat in stat_changes
            ])
        )

df_moves = pd.concat(dfs_moves)
df_move_stat_change = pd.concat(dfs_move_stat_change)

upload_df_to_gcs(
    bucket_name=BUCKET_NAME,
    datasource_name=DATASOURCE_NAME,
    layer_name=TRANSLATED_LAYER,
    target_date=TARGET_DATE,
    object_name="moves",
    df_object=df_moves
)
upload_df_to_gcs(
    bucket_name=BUCKET_NAME,
    datasource_name=DATASOURCE_NAME,
    layer_name=TRANSLATED_LAYER,
    target_date=TARGET_DATE,
    object_name="move_stat_change",
    df_object=df_move_stat_change
)