Connessione a spotipy

In [None]:
import spotipy
from spotipy.oauth2 import SpotifyOAuth
import json
import os 

# Configura le tue credenziali Spotify
client_id = os.getenv('YOUR_CLIENT_ID')
client_secret = os.getenv('YOUR_CLIENT_SECRET')
redirect_uri = os.getenv('YOUR_REDIRECT_URI')

scope = "user-library-read playlist-read-private"

sp = spotipy.Spotify(auth_manager=SpotifyOAuth(client_id=client_id,
                                               client_secret=client_secret,
                                               redirect_uri=redirect_uri,
                                               scope=scope))

In [None]:
def get_audio_features_and_analysis(track_id):
    features = sp.audio_features(track_id)[0]
    analysis = sp.audio_analysis(track_id)
    
    return {
        'danceability': features['danceability'],
        'energy': features['energy'],
        'key': features['key'],
        'loudness': features['loudness'],
        'mode': features['mode'],
        'speechiness': features['speechiness'],
        'acousticness': features['acousticness'],
        'instrumentalness': features['instrumentalness'],
        'liveness': features['liveness'],
        'valence': features['valence'],
        'tempo': features['tempo'],
        'duration_ms': features['duration_ms'],
        'time_signature': features['time_signature'],
        'segments': len(analysis['segments']),
        'bars': len(analysis['bars']),
        'beats': len(analysis['beats']),
        'sections': len(analysis['sections']),
        'tatums': len(analysis['tatums'])
    }

In [None]:
# Create a directory to store the JSON files if it doesn't exist
if not os.path.exists('temp_data'):
    os.makedirs('temp_data')

# Save each batch of songs to a separate JSON file
def save_batch(batch, batch_number):
    filename = f'temp_data/songs_batch_{batch_number}.json'
    with open(filename, 'w') as f:
        json.dump(batch, f)
    print(f"Saved batch {batch_number} to {filename}")

# In your main loop where you process songs:
batch_size = 20
current_batch = []
batch_number = 1

for item in results['items']:
    # Process the track and get audio features/analysis as before
    track = item['track']
    track_id = track['id']
    audio_info = get_audio_features_and_analysis(track_id)
    
    song_data = {
        "uri": track['uri'],
        "name": track['name'],
        "artist": track['artists'][0]['name'],
        "album": track['album']['name'],
        "popularity": track['popularity'],
        "added_at": item['added_at'],
        **audio_info
    }
    
    current_batch.append(song_data)
    
    if len(current_batch) == batch_size:
        save_batch(current_batch, batch_number)
        current_batch = []
        batch_number += 1

# Save any remaining songs in the last batch
if current_batch:
    save_batch(current_batch, batch_number)

In [None]:
from pymongo import MongoClient
from pymongo.server_api import ServerApi
import os

mongo_uri = os.getenv('YOUR_MONGO_URI')
# Connect to MongoDB
client = MongoClient(mongo_uri, server_api=ServerApi('1'))
db = client['portfolio-db']
collection = db['songs']
# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

In [None]:
# Function to upload a batch of songs
def upload_batch(filename):
    with open(filename, 'r') as f:
        songs = json.load(f)
    
    result = collection.insert_many(songs)
    print(f"Uploaded {len(result.inserted_ids)} songs from {filename}")

# Upload all batches
for filename in os.listdir('temp_data'):
    if filename.endswith('.json'):
        upload_batch(os.path.join('temp_data', filename))

print("All data uploaded to MongoDB")

Le altre cose da importare oltre ai saved tracks

In [None]:
def get_liked_songs(sp):
    results = sp.current_user_saved_tracks()
    liked_songs = []
    while results:
        for item in results['items']:
            track = item['track']
            liked_songs.append(track)
        if results['next']:
            results = sp.next(results)
        else:
            results = None
    return liked_songs

def get_liked_albums(sp):
    results = sp.current_user_saved_albums()
    liked_albums = []
    while results:
        for item in results['items']:
            album = item['album']
            for track in album['tracks']['items']:
                liked_albums.append(track)
        if results['next']:
            results = sp.next(results)
        else:
            results = None
    return liked_albums

def get_playlist_tracks(sp):
    results = sp.current_user_playlists()
    playlist_tracks = []
    while results:
        for playlist in results['items']:
            tracks = sp.playlist_tracks(playlist['id'])
            for item in tracks['items']:
                track = item['track']
                playlist_tracks.append(track)
        if results['next']:
            results = sp.next(results)
        else:
            results = None
    return playlist_tracks

liked_songs = get_liked_songs(sp)
liked_albums = get_liked_albums(sp)
playlist_tracks = get_playlist_tracks(sp)

all_tracks = liked_songs + liked_albums + playlist_tracks

# Rimuovi duplicati basati sull'ID della canzone
unique_tracks = {track['id']: track for track in all_tracks}.values()

# Stampa i titoli delle canzoni
for track in unique_tracks:
    print(track['name'], '-', track['artists'][0]['name'])
