In [1]:
import gzip
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Union

import googleapiclient
import googleapiclient.discovery
from dotenv import load_dotenv
from google.oauth2 import service_account

load_dotenv()

True

In [None]:
def create_service_account_credentials(api_config: dict) -> service_account.Credentials:
    """Load service-account credentials from the key file named in the config.

    Args:
        api_config: Mapping that must contain ``"key_file_path"`` (path handed
            to ``from_service_account_file``) and ``"scopes"`` (scopes
            requested for the credentials).

    Returns:
        The credentials object produced by
        ``service_account.Credentials.from_service_account_file``.

    Raises:
        KeyError: If either required key is missing from ``api_config``.
    """
    key_file = api_config["key_file_path"]
    requested_scopes = api_config["scopes"]
    return service_account.Credentials.from_service_account_file(
        filename=key_file,
        scopes=requested_scopes,
    )

def create_youtube_api_resource(
    api_config: dict,
    credentials: service_account.Credentials,
) -> googleapiclient.discovery.Resource:
    """Build an API client resource for the service described in the config.

    ``googleapiclient.discovery`` is a submodule: it is only reachable as an
    attribute of ``googleapiclient`` after ``import googleapiclient.discovery``
    has run (a bare ``import googleapiclient`` does not load it). The
    notebook's import cell performs that explicit import so that both the
    return annotation and the ``build`` call below resolve on a fresh kernel.

    Args:
        api_config: Mapping that must contain ``"api_service_name"`` and
            ``"api_version"``.
        credentials: Credentials passed through to ``discovery.build``.

    Returns:
        The resource object returned by ``googleapiclient.discovery.build``.

    Raises:
        KeyError: If either required key is missing from ``api_config``.
    """
    service_name = api_config["api_service_name"]
    api_version = api_config["api_version"]
    return googleapiclient.discovery.build(
        serviceName=service_name,
        version=api_version,
        credentials=credentials,
    )


def get_channel_info(
    youtube_resource: googleapiclient.discovery.Resource = None,
    id: str = None,
) -> dict:
    """Fetch the ``channels().list`` response for one channel id.

    The request asks for the ``snippet``, ``contentDetails``, ``statistics``,
    ``topicDetails`` and ``status`` parts.

    Args:
        youtube_resource: API resource exposing ``channels()``. Although the
            default is ``None``, the body calls ``.channels()`` on it
            unconditionally, so a real resource must be supplied.
        id: Channel id forwarded as the ``id`` filter of the request.

    Returns:
        Whatever ``request.execute()`` returns for the list call.
    """
    channels_api = youtube_resource.channels()
    list_call = channels_api.list(
        part="snippet,contentDetails,statistics,topicDetails,status",
        id=id,
    )
    return list_call.execute()


def parse_channel_uploads_playlist_id(response_channel: dict) -> str:
    """Extract the uploads playlist id of the first channel in a response.

    Args:
        response_channel: Response mapping with an ``"items"`` list whose first
            entry carries ``contentDetails.relatedPlaylists.uploads``.

    Returns:
        The value stored under ``"uploads"`` for the first item.

    Raises:
        KeyError: If any of the expected keys is absent.
        IndexError: If ``"items"`` is an empty list.
    """
    first_channel = response_channel["items"][0]
    related_playlists = first_channel["contentDetails"]["relatedPlaylists"]
    return related_playlists["uploads"]


def paginate_all_channel_uploads(
    youtube_resource: googleapiclient.discovery.Resource,
    uploads_playlist_id: str,
) -> List[dict]:
    """Collect every item of a playlist by following ``nextPageToken``.

    Each request asks ``playlistItems().list`` for the ``snippet``,
    ``contentDetails`` and ``status`` parts with ``maxResults=50``. The first
    request is sent with ``pageToken=None``; paging stops as soon as a response
    has no (or an empty) ``nextPageToken``.

    Args:
        youtube_resource: API resource exposing ``playlistItems()``.
        uploads_playlist_id: Playlist id to enumerate.

    Returns:
        The concatenation of the ``"items"`` lists of all fetched pages, in
        the order the pages were returned.
    """
    collected_items = []
    page_token = None

    while True:
        page = youtube_resource.playlistItems().list(
            part="snippet,contentDetails,status",
            maxResults=50,
            playlistId=uploads_playlist_id,
            pageToken=page_token,
        ).execute()
        collected_items.extend(page["items"])

        # A missing or falsy token marks the last page.
        page_token = page.get("nextPageToken")
        if not page_token:
            break

    return collected_items


def identify_project_root_dir() -> Path:
    """Return the directory three levels above the directory holding this code.

    When the code runs from a ``.py`` file, the anchor directory is the parent
    of the resolved ``__file__``. A Jupyter/IPython kernel does not define
    ``__file__`` in the notebook namespace, so reading it raises ``NameError``;
    in that case the resolved current working directory is used as the anchor
    instead of failing.

    NOTE(review): the fallback assumes the kernel's working directory is the
    directory that contains the notebook — confirm for your launcher.

    Returns:
        Absolute ``Path`` three ``.parent`` steps above the anchor directory.
    """
    try:
        anchor_dir = Path(__file__).resolve().parent
    except NameError:
        # No __file__ (interactive / notebook execution): anchor on the cwd.
        anchor_dir = Path.cwd().resolve()
    return anchor_dir.parent.parent.parent


def create_abs_file_path(channel_folder_name: str, data_source: str) -> str:
    """Build the dated ``.json.gz`` path for one channel and data source.

    The layout is
    ``<project root>/datasets/<channel>/youtube_api/raw/<source>/<YYYY-MM-DD>.json.gz``
    where the project root comes from ``identify_project_root_dir()`` and the
    date is today's date from ``datetime.now()`` (no timezone is passed).

    Args:
        channel_folder_name: Folder name used for the channel segment.
        data_source: Folder name used for the data-source segment.

    Returns:
        The assembled path as a string with ``/`` between the segments.
    """
    root_dir = identify_project_root_dir()
    date_stamp = datetime.now().strftime('%Y-%m-%d')
    return "{}/datasets/{}/youtube_api/raw/{}/{}.json.gz".format(
        root_dir,
        channel_folder_name,
        data_source,
        date_stamp,
    )
    

def create_file_path_if_doesnt_exist(file_path: str) -> None:
    """Ensure the parent directory of ``file_path`` exists.

    Missing intermediate directories are created; directories that already
    exist are left untouched. The file itself is not created.

    A bare file name such as ``"logs.json.gz"`` has an empty directory
    component. ``os.makedirs("")`` raises ``FileNotFoundError``, so that case
    is skipped: the file lives in the current working directory and nothing
    needs to be created.

    Args:
        file_path: Path of the file that is about to be written.
    """
    parent_dir = os.path.dirname(file_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)


def write_object_to_json_gzip_file(object: Union[dict, List[dict]], file_path: str) -> None:
    """Serialize ``object`` as indented JSON into a gzip-compressed text file.

    Parent directories are prepared through
    ``create_file_path_if_doesnt_exist`` before the file is opened. The file is
    opened in ``"wt"`` mode with UTF-8 encoding, so an existing file at
    ``file_path`` is overwritten.

    Args:
        object: JSON-serializable dict or list of dicts to store.
        file_path: Destination path of the ``.json.gz`` file.
    """
    # The target directory has to exist before gzip can open the file.
    create_file_path_if_doesnt_exist(file_path)

    gz_stream = gzip.open(file_path, "wt", encoding="utf-8")
    with gz_stream:
        json.dump(object, gz_stream, indent=4)

In [None]:
# Settings consumed by create_service_account_credentials ("key_file_path",
# "scopes") and create_youtube_api_resource ("api_service_name", "api_version").
# NOTE(review): "key_file_path" is a relative path, so it presumably resolves
# against the kernel's working directory — confirm. The import cell calls
# load_dotenv(), but no environment variable is read in this cell.
api_config = {
    "scopes": [
        "https://www.googleapis.com/auth/youtube.readonly"
    ],
    "api_service_name": "youtube",
    "api_version": "v3",
    "key_file_path": "key_youtube-stats-459404-eefde03eff46.json"
}

# Build credentials first; the API resource is constructed from them.
credentials = create_service_account_credentials(api_config)
youtube_resource = create_youtube_api_resource(api_config, credentials)

# identify channel
lirik_plays_channel_id = "UCebh6Np0l-DT9LXHrXbmopg"
# Executes the channels().list request for that id and keeps the raw response.
response_channel = get_channel_info(youtube_resource, lirik_plays_channel_id)

# # identify uploads playlist, pull all videos
# uploads_playlist_id = parse_channel_uploads_playlist_id(response_channel)
# all_video_metadata = paginate_all_channel_uploads(youtube_resource, uploads_playlist_id)

In [None]:
# write raw channel & video data as json to code repo

# # channel
# write_object_to_json_gzip_file(response_channel, create_abs_file_path("lirik_plays", "channel"))

# # video
# write_object_to_json_gzip_file(all_video_metadata, create_abs_file_path("lirik_plays", "video"))


In [None]:
from pathlib import Path


def get_workflow_runs(file_path: str) -> Union[None, List[dict]]:
    """Read the workflow-run log stored as gzip-compressed JSON.

    Args:
        file_path: Location of the ``.json.gz`` log file.

    Returns:
        The decoded JSON content, or ``None`` when ``file_path`` does not point
        to an existing regular file (missing path or a directory).
    """
    if Path(file_path).is_file():
        with gzip.open(file_path, "rt", encoding="utf-8") as gz_stream:
            return json.load(gz_stream)
    return None
    

def create_workflow_run_log(
    channel_id: str,
    channel_folder_name: str,
    key_file_path: str,
    created_at: str,
):
    """Create a workflow-run record with the placeholder id ``0``.

    The id is always ``0`` here; ``increment_log_id`` overwrites it when the
    record is appended to a non-empty log.

    NOTE: a function with the same name but an extra leading ``id`` parameter
    is defined in a later cell of this notebook and replaces this one once that
    cell has run.

    Args:
        channel_id: Channel id stored under ``"channel_id"``.
        channel_folder_name: Value stored under ``"channel_folder_name"``.
        key_file_path: Value stored under ``"key_file_path"``.
        created_at: Timestamp string stored under ``"created_at"``.

    Returns:
        A new dict with the keys ``id``, ``channel_id``,
        ``channel_folder_name``, ``key_file_path`` and ``created_at``.
    """
    return dict(
        id=0,
        channel_id=channel_id,
        channel_folder_name=channel_folder_name,
        key_file_path=key_file_path,
        created_at=created_at,
    )
    

def increment_log_id(
    new_workflow_run: dict,
    workflow_runs: List[dict]
) -> dict:
    """Give ``new_workflow_run`` the id that follows the last logged run.

    The record is modified in place and the same object is returned.

    Args:
        new_workflow_run: Record whose ``"id"`` is overwritten.
        workflow_runs: Existing log; only its last element's ``"id"`` is read.

    Returns:
        ``new_workflow_run`` itself, with ``"id"`` set to the last id plus one.

    Raises:
        IndexError: If ``workflow_runs`` is empty.
    """
    latest_id = workflow_runs[-1]["id"]
    new_workflow_run.update(id=latest_id + 1)
    return new_workflow_run
    

def update_workflow_runs(
    new_workflow_run: dict,
    workflow_runs: Union[None, List[dict]]
) -> List[dict]:
    """Return the run log with ``new_workflow_run`` appended.

    Args:
        new_workflow_run: Record to add.
        workflow_runs: Existing log, or ``None``/empty when there is none yet.

    Returns:
        A new list. For a missing or empty log it holds only
        ``new_workflow_run`` with its id untouched; otherwise it is the
        existing runs followed by the record after ``increment_log_id`` has
        assigned it the next id.
    """
    if workflow_runs:
        numbered_run = increment_log_id(new_workflow_run, workflow_runs)
        return workflow_runs + [numbered_run]

    # No prior log: the record starts the list as-is.
    return [new_workflow_run]

In [306]:
import uuid
# Scratch check: builds a random UUID string; the value is discarded because
# this is not the last expression of the cell.
str(uuid.uuid4())

def create_workflow_run_log(
    id: str,
    channel_id: str,
    channel_folder_name: str,
    key_file_path: str,
    created_at: str,
):
    """Create a workflow-run record with a caller-supplied id.

    NOTE: this redefines ``create_workflow_run_log`` from an earlier cell of
    this notebook (the variant without an ``id`` parameter that always stores
    ``0``); after this cell runs, only this signature is callable.

    Args:
        id: Identifier stored under ``"id"``.
        channel_id: Channel id stored under ``"channel_id"``.
        channel_folder_name: Value stored under ``"channel_folder_name"``.
        key_file_path: Value stored under ``"key_file_path"``.
        created_at: Timestamp string stored under ``"created_at"``.

    Returns:
        A new dict with the keys ``id``, ``channel_id``,
        ``channel_folder_name``, ``key_file_path`` and ``created_at``.
    """
    return dict(
        id=id,
        channel_id=channel_id,
        channel_folder_name=channel_folder_name,
        key_file_path=key_file_path,
        created_at=created_at,
    )

In [310]:
# Demo call with placeholder values; keyword arguments make it explicit that
# the variant taking an ``id`` is being exercised.
create_workflow_run_log(
    id=str(uuid.uuid4()),
    channel_id="abc",
    channel_folder_name="lirik_plays",
    key_file_path="asdsadsada.json",
    created_at=datetime.now(timezone.utc).strftime('%Y-%m-%d_%H-%M-%S_%z'),
)

{'id': '670cecf4-fe3a-4d1b-8da4-808b53c6de07',
 'channel_id': 'abc',
 'channel_folder_name': 'lirik_plays',
 'key_file_path': 'asdsadsada.json',
 'created_at': '2025-05-14_18-54-55_+0000'}

In [296]:
from datetime import datetime, timezone

In [297]:
# Scratch check: current time as a timezone-aware datetime in UTC.
datetime.now(timezone.utc)

datetime.datetime(2025, 5, 14, 18, 2, 56, 732067, tzinfo=datetime.timezone.utc)

In [304]:
# Scratch check of the timestamp format used for "created_at":
# date, time and UTC offset joined with underscores.
datetime.now(timezone.utc).strftime('%Y-%m-%d_%H-%M-%S_%z')

'2025-05-14_18-05-37_+0000'

In [299]:
# Scratch check: take the current UTC instant and convert it with
# astimezone() (no argument), then print the resulting aware datetime.
aware_local_now = datetime.now(timezone.utc).astimezone()
print(aware_local_now)

2025-05-14 12:03:20.442521-06:00


In [295]:
# Same statements as the cell directly above (leftover duplicate scratch cell).
aware_local_now = datetime.now(timezone.utc).astimezone()
print(aware_local_now)

2025-05-14 12:02:46.254871-06:00


In [309]:
# def refresh_workflow_runs(
#     file_path: str,
#     channel_id: str,
#     channel_folder_name: str,
#     key_file_path: str,
#     run_date: str,
# ):
#     logs = get_workflow_runs(file_path)
#     log = create_workflow_run(
#         channel_id=channel_id,
#         channel_folder_name=channel_folder_name,
#         key_file_path=key_file_path,
#         run_date=run_date
#     )
#     logs = update_workflow_runs(log, logs)

#     with gzip.open(file_path, "wt", encoding="utf-8") as f:
#         json.dump(logs, f, indent=4)

In [305]:
# logs = get_workflow_runs("logs.json.gz")
# log = create_workflow_run(
#     channel_id="UCebh6Np0l-DT9LXHrXbmopg",
#     channel_folder_name="lirik_plays",
#     key_file_path="key_youtube-stats-459404-eefde03eff46.json",
#     run_date=datetime.now().strftime('%Y-%m-%d ')
# )
# logs = update_workflow_runs(log, logs)

# with gzip.open("logs.json.gz", "wt", encoding="utf-8") as f:
#     json.dump(logs, f, indent=4)

# print(json.dumps(log, indent=4))
# print(json.dumps(logs, indent=4))
