In [41]:
pip install -q python-dotenv

Note: you may need to restart the kernel to use updated packages.


In [42]:
pip install -q requests

Note: you may need to restart the kernel to use updated packages.


In [3]:
from dotenv import load_dotenv
import os
load_dotenv()
client_id = os.getenv("CLIENT_ID")
client_secret = os.getenv("CLIENT_SECRET")

print(client_id, client_secret, sep='\n')

61eefcc3a0204fd1ac3b0287bc57364b
bbc1cad0f3f74ddaa355b21abed2b343


In [4]:
import base64
from requests import post
import json
from requests import get
import requests

def get_token():
    auth_string = client_id + ':' + client_secret
    auth_bytes = auth_string.encode("utf-8")
    auth_base64 = str(base64.urlsafe_b64encode(auth_bytes), "utf-8")
    
    url = "https://accounts.spotify.com/api/token"
    headers = {
    "Authorization": "Basic " + auth_base64,
    "Content-Type": "application/x-www-form-urlencoded"
    }
    data = {
    "grant_type": "client_credentials",
    "scope": "user-read-private user-read-email"  # Додайте потрібні області дії
    }
    result = post(url, headers=headers, data=data)
    json_result = json.loads(result.content)
    token = json_result["access_token"]
    return token;

def get_auth_header(token):
    return {"Authorization" : "Bearer " + token}

In [5]:
artists_ids = []
with open("../txt/artistsIds.txt", "r", encoding="utf-8") as file:
    for line in file:
        parts = line.strip().split(":")
        artists_ids.append(parts[1].strip())

In [6]:
def get_artist_info(token, artist_id):
    url = f"https://api.spotify.com/v1/artists/{artist_id}"
    headers = get_auth_header(token)

    result = get(url, headers=headers)
    json_result = json.loads(result.content)
    
    if 'error' in json_result:
        return False
            
    artist_name = json_result['name']
    artist_popularity = json_result['popularity']
    artist_image = json_result['images'][1]['url'][24:]
    artist_genres = json_result['genres']
    artist_genres = [genre.replace('ukrainian ', '') for genre in artist_genres]
    artist_genres = [genre.replace('russian ', '') for genre in artist_genres]
    return {
        'artist_id' : artist_id,
        'artist_name' : artist_name,
        'artist_popularity' : artist_popularity,
        'artist_image' : artist_image,
        'artist_genres' : artist_genres
    }

In [7]:
def insert_artist(cursor, artist_data):
    artist_id = artist_data['artist_id']
    name = artist_data['artist_name']
    popularity = artist_data['artist_popularity']
    photo = artist_data['artist_image']

    # Вставка даних в таблицю artists
    insert_artist_query = "INSERT INTO artists (id, name, popularity, photo) VALUES (%s, %s, %s, %s)"
    values = (artist_id, name, popularity, photo)
    cursor.execute(insert_artist_query, values)

def insert_genres(cursor, genres):
    # Перевірка існуючих жанрів у таблиці genres
    select_genres_query = "SELECT name FROM genres"
    cursor.execute(select_genres_query)
    existing_genres = [row[0] for row in cursor.fetchall()]

    # Вставка нових жанрів у таблицю genres
    for genre in genres:
        if genre not in existing_genres:
            insert_genre_query = "INSERT INTO genres (name) VALUES (%s)"
            cursor.execute(insert_genre_query, (genre,))

def insert_artist_genres(cursor, artist_id, genres):
    # Отримання id жанрів з таблиці genres
    select_genre_ids_query = "SELECT id, name FROM genres"
    cursor.execute(select_genre_ids_query)
    genre_ids = {row[1]: row[0] for row in cursor.fetchall()}

    # Вставка зв'язків між артистом і жанрами в таблицю artist_genres
    for genre in genres:
        if genre in genre_ids:
            insert_artist_genre_query = "INSERT INTO artist_genres (id_artist, id_genre) VALUES (%s, %s)"
            values = (artist_id, genre_ids[genre])
            cursor.execute(insert_artist_genre_query, values)

In [8]:
import mysql.connector
from mysql.connector import Error

def fill_artists_info(artists_ids):
    # Параметри підключення до бази даних
    servername = "localhost"
    username = "root"
    password = ""
    database = "muzua"

    try:
        # Підключення до бази даних
        conn = mysql.connector.connect(
            host=servername,
            user=username,
            password=password,
            database=database,
            charset='utf8mb4'
        )

        if conn.is_connected():
            print("Connected successfully")

            # Створення курсора для виконання запиту
            cursor = conn.cursor()

            for artist_id in artists_ids:
                result = get_artist_info(token, artist_id)
                if result == False:
                    token = get_token()
                    result = get_artist_info(token, artist_id)

                # Вставка даних про артиста
                insert_artist(cursor, result)

                # Вставка жанрів
                insert_genres(cursor, result['artist_genres'])

                # Вставка зв'язків між артистом і жанрами
                insert_artist_genres(cursor, result['artist_id'], result['artist_genres'])

            # Підтвердження змін у базі даних
            conn.commit()

            # Закриття курсора
            cursor.close()

    except Error as e:
        print("Error:", e)
    finally:
        if conn.is_connected():
            conn.close()
            print("Connection closed")

In [10]:
from datetime import datetime

def convert_into_date(date):
    try:
        # Спроба перетворити рядок у формат 'YYYY-MM-DD'
        converted_date = datetime.strptime(date, '%Y-%m-%d').date()
        return converted_date.strftime('%Y-%m-%d')
    except ValueError:
        try:
            # Спроба перетворити рядок у формат 'YYYY-MM'
            converted_date = datetime.strptime(date, '%Y-%m').date()
            return converted_date.strftime('%Y-%m-%d')
        except ValueError:
            try:
                # Спроба перетворити рядок у формат 'YYYY'
                converted_date = datetime.strptime(date, '%Y').date()
                return converted_date.strftime('%Y-01-01')
            except ValueError:
                # Встановлення значення None для некоректної дати
                return None

In [11]:
def get_tracks_by_album(token, album_id):
    url = f"https://api.spotify.com/v1/albums/{album_id}/tracks"
    headers = get_auth_header(token)
    params = {
        "limit": 50,
    }

    result = get(url, headers=headers, params=params)
    json_result = json.loads(result.content)

    if 'error' in json_result:
        return get_tracks_by_album(get_token(), album_id)

    tracks_info = []
    for item in json_result['items']:
        track_id = item['id']
        track_name = item['name']
        track_duration_ms = item['duration_ms']
        track_number = item['track_number']
        if item["preview_url"]: track_preview_url = item["preview_url"][30:70]
        else: track_preview_url = None
        track_artists = [artist['id'] for artist in item['artists']]

        tracks_info.append({
            'track_id': track_id,
            'track_name': track_name,
            'track_duration_ms': track_duration_ms,
            'track_number': track_number,
            'track_preview_url': track_preview_url,
            'track_artists': track_artists,
        })

    return tracks_info
#print(len("2719559ab4e852d2e25492980984ad39984c8404"))
#print(len("https://p.scdn.co/mp3-preview/"))
#display(get_tracks_by_album(get_token(), "0PNL8s3ghRN7hj8J3uGJPh"))

In [12]:
def get_albums_tracks_full_info(token, artist_id, group='album,single'):
    url = f"https://api.spotify.com/v1/artists/{artist_id}/albums"
    headers = get_auth_header(token)
    params = {
        "limit": 50,
        "include_groups": group,
    }
    result = get(url, headers=headers, params=params)
    json_result = json.loads(result.content)
    
    if 'error' in json_result:
        return get_albums_tracks_full_info(get_token(), artist_id)
    if 'items' not in json_result or not json_result['items']:
        return None
    
    albums_info = []
    for item in json_result['items']:
        
        album_id = item['id']
        album_name = item['name']
        album_release_date = convert_into_date(item['release_date'])
        album_total_tracks = item['total_tracks']
        album_cover = item['images'][1]['url'][24:]
        album_type = item['album_type']
        
        tracks_info = get_tracks_by_album(token, album_id)
        
        albums_info.append(
            {
                'album_id' : album_id,
                'album_name' : album_name,
                'album_release_date' : album_release_date,
                'album_total_tracks' : album_total_tracks,
                'album_cover' : album_cover,
                'album_type' : album_type, 
                'tracks_info' : tracks_info
            }
        )
    return albums_info

In [13]:
import mysql.connector
from mysql.connector import Error
from datetime import datetime

import mysql.connector
from mysql.connector import Error, IntegrityError

def insert_albums_tracks_full_info(cursor, artist_id, albums_data):
    # ALBUMS
    for album in albums_data:
        """ ------------------ ALBUM DATA ------------------"""
        album_id = album['album_id']
        album_name = album['album_name']
        album_release_date = album['album_release_date']
        album_total_tracks = album['album_total_tracks']
        album_cover = album['album_cover']
        album_type = album['album_type']
        
        """ ------------------ CHECK / INSERT ALBUM DATA ------------------"""
        insert_album_query = """
            INSERT INTO albums (id, name, release_date, album_total_tracks, background, type)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        check_query = """
            SELECT COUNT(*) 
            FROM albums
            WHERE id = %s
            """
        values = (album_id, album_name, album_release_date, album_total_tracks, album_cover, album_type)
        
        cursor.execute(check_query, (album_id,))
        if cursor.fetchone()[0] == 0:
            cursor.execute(insert_album_query, values)
        """ ------------------ CHECK / INSERT ARTISTS ALBUMS DATA ------------------"""
        insert_artist_albums = """
            INSERT INTO artist_albums (id_artist, id_album)
            VALUES (%s, %s)
        """
        check_query = """
            SELECT COUNT(*) 
            FROM artist_albums
            WHERE id_artist = %s AND id_album = %s
            """
        values = (artist_id, album_id)
        cursor.execute(check_query, values)
        if cursor.fetchone()[0] == 0:
            cursor.execute(insert_artist_albums, values)
        
        # SONGS
        for song in album['tracks_info']:
            """ ------------------ SONG DATA ------------------"""
            track_id = song['track_id']
            track_name = song['track_name']
            track_duration_ms = song['track_duration_ms']
            track_number = song['track_number']
            track_artists = song['track_artists']
            track_preview_url = song['track_preview_url']
            
            """ ------------------ CHECK / INSERT SONG DATA ------------------"""
            insert_song_query = """
                INSERT INTO songs (id, name, duration_ms, track_number, id_album, preview_url_id)
                VALUES (%s, %s, %s, %s, %s, %s)
            """
            check_query = """
                SELECT COUNT(*) 
                FROM songs
                WHERE id = %s
                """
            values = (track_id, track_name, track_duration_ms, track_number, album_id, track_preview_url)

            cursor.execute(check_query, (track_id,))
            if cursor.fetchone()[0] == 0:
                cursor.execute(insert_song_query, values)
            
            """ ------------------ CHECK / INSERT ARTISTS SONGS DATA ------------------"""
            for artist in track_artists:
                insert_artist_songs = """
                    INSERT INTO artist_songs (id_artist, id_song)
                    VALUES (%s, %s)
                """
                check_query = """
                    SELECT COUNT(*) 
                    FROM artist_songs
                    WHERE id_artist = %s AND id_song = %s
                    """
                values = (artist_id, track_id)

                cursor.execute(check_query, values)
                if cursor.fetchone()[0] == 0:
                    cursor.execute(insert_artist_songs, values)

            

In [None]:
def db_insert_albums_tracks_full_info(artists_ids, servername = "localhost", username = "root", password = "", database = "muzua"):
    token = get_token()
    try:
        conn = mysql.connector.connect(
            host=servername,
            user=username,
            password=password,
            database=database,
            charset='utf8mb4'
        )

        if conn.is_connected():
            print("Connected successfully")
            cursor = conn.cursor()
            i=0
            for artist_id in artists_ids:
                result = get_albums_tracks_full_info(token, artist_id)
                if result == False:
                    token = get_token()
                    result = get_albums_tracks_full_info(token, artist_id)
                if result == None or result == False: 
                    print("continue")
                    continue
                insert_albums_tracks_full_info(cursor, artist_id, result)
                conn.commit()
                i+=1
                print(i)
            cursor.close()

    except Error as e:
        print("Error:", e)
    finally:
        if conn.is_connected():
            conn.close()
            print("Connection closed")
            
db_insert_albums_tracks_full_info(artists_ids)

In [26]:
import spotipy
from spotipy import SpotifyOAuth

CLIENT_ID = client_id
CLIENT_SECRET = client_secret

# Авторизація та отримання токену доступу
sp_oauth = SpotifyOAuth(client_id=CLIENT_ID, client_secret=CLIENT_SECRET, redirect_uri="http://localhost:8888/callback")
token = sp_oauth.get_access_token()

# Створення об'єкта Spotify
sp = spotipy.Spotify(auth_manager=sp_oauth)

# Відтворення пісні
track_uri = "spotify:track:7nJgAGJcnQyqowaCHDVBlm"  # Ідентифікатор пісні на Spotify
sp.start_playback(uris=[track_uri])

  token = sp_oauth.get_access_token()


PermissionError: [WinError 10013] Сделана попытка доступа к сокету методом, запрещенным правами доступа