## JetBrains-YT-Test

Создаём конфигурацию, импортируем библиотеки Google API:

In [45]:
# -*- coding: utf-8 -*-

# Sample Python code for youtube.channels.list
# See instructions for running these code samples locally:
# https://developers.google.com/explorer-help/guides/code_samples#python

import os

import googleapiclient.discovery
import googleapiclient.errors

# Read-only YouTube scope (not referenced by the key-based client built below).
scopes = ["https://www.googleapis.com/auth/youtube.readonly"]
# Read the developer key through a context manager so the file handle is closed,
# and strip surrounding whitespace: a trailing newline in api_key.txt would
# otherwise become part of the key passed to the client.
with open('api_key.txt', "r") as api_key_file:
    api_key = api_key_file.read().strip()
api_service_name = "youtube"
api_version = "v3"

# Create an API client authenticated with the developer key
youtube = googleapiclient.discovery.build(
        api_service_name, api_version, developerKey=api_key)

Функции для получения данных с сервера:

In [46]:
def get_channel_info(channel_id):
    """Return the first channel entry the API lists for ``channel_id``.

    Uses the module-level ``youtube`` client to run ``channels().list`` with
    the ``snippet``, ``contentDetails`` and ``statistics`` parts.

    Args:
        channel_id: value passed as the ``id`` filter of the request.

    Returns:
        ``response['items'][0]`` of the executed request; the indexing fails
        if the response carries no first item.
    """
    channels_response = youtube.channels().list(
        part="snippet,contentDetails,statistics",
        id=channel_id,
    ).execute()
    matching_channels = channels_response['items']
    return matching_channels[0]



def get_playlist_info(playlist_id):
    """Fetch one page of items for ``playlist_id``.

    Runs ``playlistItems().list`` on the module-level ``youtube`` client,
    asking only for the ``contentDetails`` part and sending no page token.

    Args:
        playlist_id: value passed as the ``playlistId`` filter.

    Returns:
        Whatever ``execute()`` returns for the request, unmodified.
    """
    return youtube.playlistItems().list(
        playlistId=playlist_id,
        part="contentDetails",
    ).execute()

def get_playlist_iterating(playlist_id, page_token=None):
    """Fetch one page of playlist items, optionally continuing from a token.

    Runs ``playlistItems().list`` on the module-level ``youtube`` client with
    the ``contentDetails`` part.

    Args:
        playlist_id: value passed as the ``playlistId`` filter.
        page_token: when truthy, sent as ``pageToken``; when falsy (the
            default ``None``, or an empty string) the argument is omitted.

    Returns:
        Whatever ``execute()`` returns for the request, unmodified.
    """
    list_kwargs = {"part": "contentDetails", "playlistId": playlist_id}
    if page_token:
        list_kwargs["pageToken"] = page_token
    return youtube.playlistItems().list(**list_kwargs).execute()

def get_video_info(video_id_list):
    """Fetch the video resources for a collection of video ids.

    Runs ``videos().list`` on the module-level ``youtube`` client with the
    ``snippet``, ``contentDetails`` and ``statistics`` parts, passing the ids
    as one comma-separated ``id`` filter.

    Args:
        video_id_list: iterable of video id strings.

    Returns:
        ``response['items']`` of the executed request, or an empty list when
        no ids were supplied.
    """
    # Materialise once so any iterable (including a generator) can be both
    # tested for emptiness and joined.
    video_ids = list(video_id_list)
    if not video_ids:
        # Nothing to look up: skip the request instead of sending an empty
        # ``id`` filter to the API.
        return []

    request = youtube.videos().list(
        part="snippet,contentDetails,statistics",
        id=",".join(video_ids)
    )
    response = request.execute()

    return response['items']

Функции для записи в БД:

In [47]:
import logging
from datetime import datetime
from sqlite_utils import Database
from sqlite_utils.db import NotFoundError
from json.decoder import JSONDecodeError

# Database handle shared by all helper functions below (file for the rossiya-24 run).
db = Database("rossiya24.db")
# Alternative database file for the tvrain run: db = Database("tvrain.db")

def video_table_init():
    """Create ``video_table`` in the module-level ``db`` when it is missing.

    The table is keyed by ``video_id`` and has the columns ``title``,
    ``tags_list``, ``category_id``, ``length`` and ``publish_date``. A warning
    is logged when the table gets created; an existing table is left alone.
    """
    table = db.table("video_table")
    if table.exists():
        return

    columns = {
        "video_id": str,
        "title": str,
        "tags_list": str,
        "category_id": int,
        "length": str,
        "publish_date": str,
    }
    table.create(columns, pk="video_id")
    logging.warning("db->video_table was created")


def state_table_init():
    """Create (if missing) the statistics table named after the current UTC minute.

    The table name is the current UTC time formatted as ``%Y-%m-%dT%H-%M``,
    so calls within the same minute resolve to the same table. The table is
    keyed by ``video_id`` and has the integer columns ``views``, ``likes``,
    ``dislikes``, ``comments`` and ``popular_comment_likes``. A warning is
    logged when the table gets created.

    Returns:
        The table name as a string.
    """
    # Imported locally because the notebook's import cell only brings in `datetime`.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC timestamp
    # formats to exactly the same string with this pattern.
    table_name = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M")
    new_table = db.table(table_name)
    if not new_table.exists():
        new_table.create({
            "video_id": str,
            "views": int,
            "likes": int,
            "dislikes": int,
            "comments": int,
            "popular_comment_likes": int
        }, pk="video_id")
        logging.warning("db->%s table was created", table_name)
    return table_name

def db_insert_video(video_info):
    """Store the static metadata of one video in ``video_table``.

    Reads ``id``, ``snippet`` (``title``, ``publishedAt``, ``tags``,
    ``categoryId``) and ``contentDetails.duration`` from ``video_info`` and
    inserts them as one row into the module-level ``db``.

    The insert is best-effort: a row whose ``video_id`` already exists is
    skipped, and any other insert failure is logged as a warning instead of
    being raised.

    Args:
        video_info: mapping with at least the ``snippet`` and
            ``contentDetails`` keys.

    Raises:
        KeyError: if ``snippet`` or ``contentDetails`` is missing.
    """
    video_db_table = db.table("video_table")
    snippet = video_info["snippet"]

    video_id = video_info.get("id")
    raw_category = snippet.get("categoryId")
    record = {
        "video_id": video_id,
        "title": snippet.get("title"),
        # Tags are stored as one comma-separated string; missing tags give "".
        "tags_list": ",".join(snippet.get("tags", [])),
        # int(None) would raise TypeError, so a missing category is stored as NULL.
        "category_id": int(raw_category) if raw_category is not None else None,
        "publish_date": snippet.get("publishedAt"),
        "length": video_info["contentDetails"].get("duration"),
    }

    try:
        # ignore=True skips rows whose primary key is already present, which is
        # the expected case when the notebook is re-run over the same videos.
        video_db_table.insert(record, ignore=True)
    except Exception:
        # Keep the insert best-effort, but make unexpected failures visible
        # instead of silently discarding them.
        logging.warning("video error %s", video_id)

def db_set_state(table_time, video_info):
    """Store one video's current counters in the statistics table ``table_time``.

    Reads ``viewCount``, ``likeCount``, ``dislikeCount`` and ``commentCount``
    from ``video_info["statistics"]`` and inserts them, together with the
    video id, into the named table of the module-level ``db``. Counters that
    are absent are stored as ``None``. Values are passed on as received.

    The insert is best-effort: a failure is logged as a warning and not raised.

    Args:
        table_time: name of the statistics table to insert into.
        video_info: mapping with an ``id`` key and, optionally, ``statistics``.

    Raises:
        KeyError: if ``video_info`` has no ``id``.
    """
    state_table = db.table(table_time)
    video_id = video_info["id"]
    # `statistics` can be absent or None; fall back to an empty mapping so the
    # lookups below yield None instead of raising AttributeError.
    statistics = video_info.get("statistics") or {}

    record = {
        "video_id": video_id,
        "views": statistics.get("viewCount"),
        "likes": statistics.get("likeCount"),
        "dislikes": statistics.get("dislikeCount"),
        "comments": statistics.get("commentCount"),
    }
    try:
        state_table.insert(record)
    except Exception:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit still stop the run.
        logging.warning("video error %s", video_id)

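Test the functions: fetch the channel's uploads playlist and store each video's metadata and statistics in the database: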

In [48]:
# Look up the channel and read the id of its "uploads" playlist from the response.
channel_info = get_channel_info("UC_IEcnNeHc_bwd92Ber-lew") # rossiya-24
#channel_info = get_channel_info("UCdubelOloxR3wzwJG9x8YqQ") # tvrain
playlist_id = channel_info["contentDetails"]["relatedPlaylists"]["uploads"]

print("Videos count:", channel_info["statistics"]["videoCount"])
# First page of the uploads playlist (no page token is passed here).
upload_playlist = get_playlist_iterating(playlist_id)
# Name of the statistics table for this run; parse_from_playlist reads it as a global.
table_name = state_table_init()


def parse_from_playlist(upload_playlist):
    """Store every video of one playlist page in the database.

    Collects ``contentDetails.videoId`` from each entry of
    ``upload_playlist["items"]``, fetches the matching video resources with
    ``get_video_info``, makes sure ``video_table`` exists, and then records
    each video's metadata (``db_insert_video``) and counters
    (``db_set_state``, into the table named by the global ``table_name``).

    Args:
        upload_playlist: playlist page with an ``items`` list.
    """
    page_video_ids = [
        entry["contentDetails"]["videoId"]
        for entry in upload_playlist["items"]
    ]
    fetched_videos = get_video_info(page_video_ids)

    video_table_init()

    for video_info in fetched_videos:
        db_insert_video(video_info)
        db_set_state(table_name, video_info)

# Store the page that is currently held in upload_playlist.
parse_from_playlist(upload_playlist)

# Follow nextPageToken page by page. num_playlists counts the pages processed
# so far (starting at 1 for the page above); the loop stops when the response
# has no nextPageToken or once 50 pages in total have been processed.
num_playlists = 1
while "nextPageToken" in upload_playlist and num_playlists < 50:
    token = upload_playlist["nextPageToken"]
    upload_playlist = get_playlist_iterating(playlist_id, token)
    parse_from_playlist(upload_playlist)
    num_playlists += 1






Videos count: 219929
