In [None]:
import json
import os
from urllib.parse import urlencode

import boto3
import requests
from boto3.dynamodb.conditions import Key, Attr
from dotenv import load_dotenv
# Shared DynamoDB resource; the table handles used later in the notebook are
# created from it. No region or credentials are passed here.
dynamodb = boto3.resource('dynamodb')

In [None]:
# Load variables from a .env file (if one is found) into the environment.
load_dotenv()
# YouTube Data API key appended to every request by get_query_string.
# os.getenv returns None when YOUTUBE_API_KEY is not set.
API_KEY = os.getenv('YOUTUBE_API_KEY')

In [None]:
def get_query_string(query):
  """Build a URL query string from `query` with the API key appended.

  Args:
    query: Mapping of parameter name to value. It is not modified.

  Returns:
    A string starting with "?" whose names and values are percent-encoded,
    so characters such as "&", "=" or spaces inside a value cannot corrupt
    the resulting URL.
  """
  # Copy so the caller's dict stays untouched; "key" is written last and
  # therefore overrides any "key" entry already present in `query`.
  params = { **query, "key": API_KEY }
  return f"?{urlencode(params)}"

In [None]:
def get_lists_all(base_url, query, timeout=10):
  """Fetch every page of a list endpoint and return each item's title and id.

  Pages are followed through the "nextPageToken" field of each response.

  Args:
    base_url: Endpoint URL without a query string.
    query: Dict of query parameters for the first page. It is not modified.
    timeout: Seconds to wait for each HTTP request before giving up.

  Returns:
    A list of {"title": ..., "id": ...} dicts in response order. When a
    request is not OK, its status code is printed and the items collected
    from the earlier pages are returned (an empty list if the first page
    fails).
  """
  lists = []
  page_query = query
  # Iterate instead of recursing: one stack frame per page would hit the
  # recursion limit on long listings and re-copy the result at every level.
  while True:
    url = f"{base_url}{get_query_string(page_query)}"
    # Without a timeout a stalled connection would block the kernel forever.
    res = requests.get(url, timeout=timeout)
    if not res.ok:
      print(res.status_code)
      break
    data = res.json()
    lists.extend(
      { "title": x['snippet']['title'], "id": x['id'] }
      for x in data.get('items', [])
    )
    if "nextPageToken" not in data:
      break
    page_query = { **query, 'pageToken': data["nextPageToken"] }
  return lists

In [None]:
def get_playlists(channel_id="UCU5LUUBNDzGOeAri5Wk8w5Q", title_prefix='BGM'):
  """Return a channel's playlists whose title starts with `title_prefix`.

  Args:
    channel_id: YouTube channel whose playlists are listed. Defaults to the
      channel this notebook was written for.
    title_prefix: Only playlists whose title starts with this string are kept.

  Returns:
    A list of {"title": ..., "id": ...} dicts sorted by title.
  """
  base_url = 'https://www.googleapis.com/youtube/v3/playlists'
  query = {
    "part": "snippet",
    "channelId": channel_id,
    "maxResults": 50
  }
  playlists = get_lists_all(base_url, query)
  matching = [p for p in playlists if p['title'].startswith(title_prefix)]
  return sorted(matching, key=lambda p: p['title'])

In [None]:
# playlists = get_playlists()

In [None]:
def get_list_items_all(base_url, query, timeout=10):
  """Fetch every page of a playlist-items endpoint as flat song records.

  Pages are followed through the "nextPageToken" field of each response.

  Args:
    base_url: Endpoint URL without a query string.
    query: Dict of query parameters for the first page. It is not modified.
    timeout: Seconds to wait for each HTTP request before giving up.

  Returns:
    A list of dicts with the keys "title", "id" (the video id), "channel",
    "channel_id", "from" (always "list") and "active". "channel" and
    "channel_id" fall back to 'unavailable' when the snippet lacks the
    corresponding owner field; "active" is True only when the snippet has
    'videoOwnerChannelTitle'. When a request is not OK, its status code is
    printed and the items collected from the earlier pages are returned.
  """
  list_items = []
  page_query = query
  # Iterate instead of recursing: one stack frame per page would hit the
  # recursion limit on long playlists and re-copy the result at every level.
  while True:
    url = f"{base_url}{get_query_string(page_query)}"
    # Without a timeout a stalled connection would block the kernel forever.
    res = requests.get(url, timeout=timeout)
    if not res.ok:
      print(res.status_code)
      break
    data = res.json()
    for x in data.get('items', []):
      snippet = x['snippet']
      list_items.append({
        "title": snippet['title'],
        "id": snippet['resourceId']['videoId'],
        # Each owner field is looked up under its own key, so a snippet that
        # has the channel title but no channel id no longer raises KeyError.
        "channel": snippet.get('videoOwnerChannelTitle', 'unavailable'),
        "channel_id": snippet.get('videoOwnerChannelId', 'unavailable'),
        "from": "list",
        "active": 'videoOwnerChannelTitle' in snippet
      })
    if "nextPageToken" not in data:
      break
    page_query = { **query, 'pageToken': data["nextPageToken"] }
  return list_items

In [None]:
def get_playlist_items(playlist_id):
  """Return all items of the playlist `playlist_id`.

  Delegates to get_list_items_all with the playlistItems endpoint and a
  "snippet" query of 50 results per page.
  """
  return get_list_items_all(
    'https://www.googleapis.com/youtube/v3/playlistItems',
    { "part": "snippet", "playlistId": playlist_id, "maxResults": 50 },
  )

In [None]:
# lists_items = [get_playlist_items(x['id']) for x in playlists]
# flatten_items = sum(lists_items, [])
# total_items = [dict(t) for t in {tuple(d.items()) for d in flatten_items}]

In [None]:
# table_total = dynamodb.Table('total')
# with table_total.batch_writer() as batch:
#   for item in total_items:
#     batch.put_item(
#       Item=item
#     )

In [None]:
# Handles for the two DynamoDB tables written by the update helpers below.
# NOTE(review): judging by the commented-out load cells, 'total' holds one row
# per song and 'monthly' one row per playlist with its songs under 'items' —
# confirm against the actual table contents.
table_total = dynamodb.Table('total')
table_monthly = dynamodb.Table('monthly')

In [None]:
# with table_monthly.batch_writer() as batch:
#   for idx, items in enumerate(lists_items):  
#     row = {
#       **playlists[idx],
#       "items": items
#     }
#     batch.put_item(
#       Item=row
#     )

In [None]:
def get_item_index(id, items):
  """Return the position of the first entry in `items` whose 'id' equals `id`.

  Returns -1 when no entry matches. Callers must check for it before
  indexing, since -1 is itself a valid list index (the last element).
  """
  matching_positions = (
    position for position, entry in enumerate(items) if entry['id'] == id
  )
  return next(matching_positions, -1)

In [None]:
def get_update_params(body):
  """Build the UpdateExpression and value map for a DynamoDB update_item call.

  Args:
    body: Dict of attribute name to new value.

  Returns:
    A tuple (update_expression, expression_values). For example
    {"active": False} gives ("set active = :active", {":active": False}).

  Raises:
    ValueError: If `body` is empty; a "set" clause with no assignments is
      not a valid update expression.
  """
  if not body:
    raise ValueError("body must contain at least one attribute to update")

  # NOTE(review): attribute names are interpolated as-is. A name that is a
  # DynamoDB reserved word would need an ExpressionAttributeNames mapping,
  # which this two-value return cannot carry — confirm the names in use.
  assignments = [f"{key} = :{key}" for key in body]
  update_values = { f":{key}": val for key, val in body.items() }
  return "set " + ", ".join(assignments), update_values

In [None]:
def update_song_in_monthly(monthly_id, song_id, update_values):
  """Merge `update_values` into one song entry of a row in the monthly table.

  The row whose 'id' equals `monthly_id` is looked up by scanning, the entry
  of its 'items' list whose 'id' equals `song_id` is updated, and the whole
  row is written back with put_item.

  Args:
    monthly_id: Value of the 'id' attribute of the monthly row.
    song_id: Value of the 'id' key of the song inside the row's 'items'.
    update_values: Dict of keys to set or overwrite on the song entry.

  Raises:
    LookupError: If no row has id `monthly_id`, or the row has no song with
      id `song_id`. Nothing is written in either case.
  """
  scan_kwargs = { 'FilterExpression': Attr('id').eq(monthly_id) }
  res_monthly = None
  # A scan returns the table page by page and applies the filter to each page
  # after reading it, so the matching row may not be in the first response.
  # Follow LastEvaluatedKey until the row is found or the table is exhausted.
  while res_monthly is None:
    page = table_monthly.scan(**scan_kwargs)
    found = page.get('Items', [])
    if found:
      res_monthly = found[0]
    elif 'LastEvaluatedKey' in page:
      scan_kwargs['ExclusiveStartKey'] = page['LastEvaluatedKey']
    else:
      raise LookupError(f"no monthly row with id {monthly_id!r}")

  items = res_monthly['items']
  item_index = get_item_index(song_id, items)
  if item_index == -1:
    # Without this guard the -1 would address the last song in the list and
    # silently update the wrong entry.
    raise LookupError(
      f"no song with id {song_id!r} in monthly row {monthly_id!r}"
    )
  items[item_index] = { **items[item_index], **update_values }
  table_monthly.put_item(
    Item={
      **res_monthly,
      'items': items
    }
  )

In [None]:
def update_song_in_total(song_id, update_values):
  """Apply `update_values` to the row of the total table keyed by `song_id`.

  The update expression and its value map come from get_update_params.
  """
  expression, expression_values = get_update_params(update_values)
  request = {
    'Key': { 'id': song_id },
    'UpdateExpression': expression,
    'ExpressionAttributeValues': dict(expression_values),
  }
  table_total.update_item(**request)

In [56]:
def update_song(monthly_id, song_id, update_values):
  """Apply `update_values` to a song in both tables.

  The song's entry inside the monthly row is updated first, then its row in
  the total table. These are two separate calls: if the second one raises,
  the first has already been applied.
  """
  update_song_in_monthly(
    monthly_id=monthly_id,
    song_id=song_id,
    update_values=update_values,
  )
  update_song_in_total(song_id=song_id, update_values=update_values)
In [60]:
# Set 'active' to False for song 'b12-WUgXAzg', both inside the monthly row
# with the given id and in the total table. This writes to DynamoDB.
update_song('PLj63tGIo1sSoJptRycAjrMxNVLatBejpp', 'b12-WUgXAzg', { 'active': False })

In [61]:
# Check the update above: re-read the monthly row by scanning for its id and
# display the song at index 3 of its 'items' list.
res_monthly = table_monthly.scan(
  FilterExpression=Attr('id').eq('PLj63tGIo1sSoJptRycAjrMxNVLatBejpp')
)['Items'][0]
items = res_monthly['items']
items[3]

{'channel': 'Author wind - Topic',
 'active': False,
 'from': 'list',
 'id': 'b12-WUgXAzg',
 'title': 'Fairy Tale',
 'channel_id': 'UCeieSudBwInJNTbW0ylHfGg'}

In [62]:
# Check the update above in the total table: scan for the song id and print
# the matching rows.
res_total = table_total.scan(
  FilterExpression=Attr('id').eq('b12-WUgXAzg')
)['Items']
print(res_total)

[{'active': False, 'from': 'list', 'channel_id': 'UCeieSudBwInJNTbW0ylHfGg', 'id': 'b12-WUgXAzg', 'channel': 'Author wind - Topic', 'title': 'Fairy Tale'}]


In [None]:
# NOTE(review): despite the name, this copies the full item dicts from
# res_total rather than their ids — confirm whether x['id'] was intended.
total_id_list = [x for x in res_total]