In [None]:
# Assignment2: data cleaning
# Given the information collected in the previous assignment, address the problem
# related to the missing data (if any) and integrate the additional data (if any).

In [2]:
import csv
import json
import os

In [3]:
# Load JSON data from a file
with open('tracks.json', 'r', encoding='utf-8') as json_file:
    data = json.load(json_file)

# A single JSON object is treated as a one-record dataset
if isinstance(data, dict):
    data = [data]

# Header = ordered union of the keys of every record. Using only
# data[0].keys() raises IndexError on an empty list and makes DictWriter
# raise ValueError as soon as a later record carries an extra key.
track_fieldnames = list(dict.fromkeys(key for record in data for key in record))

# Write to CSV; records lacking a key get an empty cell (restval='')
with open('output.csv', 'w', newline='', encoding='utf-8') as csv_file:
    writer = csv.DictWriter(csv_file, fieldnames=track_fieldnames, restval='')
    writer.writeheader()
    writer.writerows(data)

print("Conversion complete. CSV file saved as 'output.csv'.")

Conversion complete. CSV file saved as 'output.csv'.


In [4]:
import xml.etree.ElementTree as ET
import csv

# Load and parse the XML file
tree = ET.parse('artists.xml')
root = tree.getroot()

# Extract all rows
rows = root.findall('row')

# Header = ordered union of the child tags of every row. Reading the tags
# from rows[0] only raises IndexError when there are no rows and silently
# drops any column that first appears in a later row.
headers = list(dict.fromkeys(elem.tag for row in rows for elem in row))

# Write to CSV
with open('output2.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerow(headers)  # write header

    for row in rows:
        # findtext() yields '' both for an absent child (the default) and for
        # a child without text, which is what the CSV cell should contain.
        writer.writerow([row.findtext(tag, default='') for tag in headers])

print("XML data has been successfully converted to output2.csv.")

XML data has been successfully converted to output2.csv.


In [5]:
# Remove the rows where 'lyrics' is missing

# CSV written by the JSON -> CSV conversion cell
input_file = 'output.csv'
# Destination for the rows that still have lyrics
output_file = 'tracks.csv'
# Column whose missing values cause a row to be dropped
columnToCheck = 'lyrics'


def cleanMissingData(input_file, output_file, columnToCheck):
    """
    Copy a CSV file, dropping the rows whose value in one column is missing.

    A value counts as missing when it is absent from the row (a line shorter
    than the header), empty or whitespace-only, or any casing of "nan".

    Args:
        input_file: Path of the CSV file to read (UTF-8, with a header row).
        output_file: Path of the CSV file to write; it gets the same header.
        columnToCheck: Name of the column whose missing values drop the row.

    Prints the output path and the number of removed rows.
    """
    removed_rows = 0
    # Markers are compared after strip() + lower(), so 'NaN'/'nan'/'NAN' all match.
    missingID = {'nan', ''}

    with open(input_file, mode='r', newline='', encoding='utf-8') as infile:
        # DictReader exposes every row as a dictionary keyed by column name
        reader = csv.DictReader(infile)
        # fieldnames is None for an empty input file; fall back to no columns
        campi = reader.fieldnames or []

        # Open output file to write cleaned data
        with open(output_file, mode='w', newline='', encoding='utf-8') as outfile:
            writer = csv.DictWriter(outfile, fieldnames=campi)
            writer.writeheader()  # write the header to the new file

            # Iterate over every row of the original file
            for riga in reader:
                # DictReader fills the cells of a short line with None, so
                # fall back to '' before calling str methods on the value.
                lyricsValue = riga.get(columnToCheck) or ''

                # CLEANING STEP: keep the row only when the value is present
                if lyricsValue.strip().lower() not in missingID:
                    writer.writerow(riga)
                else:
                    removed_rows += 1

    print(f" Cleaning completed. Data saved to: {output_file}")
    print(f" Rows removed with missing '{columnToCheck}': {removed_rows}")
 

# Run the cleaning step on the files configured above in this cell
cleanMissingData(input_file, output_file, columnToCheck)

 Cleaning completed. Data saved to: tracks.csv
 Rows removed with missing 'lyrics': 3


In [7]:
import re # Used for advanced splitting and cleaning
# --- Configuration ---
input_file1 = 'tracks.csv' # File from the previous cleaning step
output_file1 = 'audioCleanData.csv'  # Destination of the imputed table
# Metric columns that fill_missing_metrics checks for missing values
columnsToFill = ['n_sentences', 'n_tokens', 'char_per_tok', 'avg_token_per_clause']

# --- Core Logic Functions ---

def calculate_metrics(lyrics: str) -> dict:
    """
    Compute simple linguistic metrics from song lyrics (standard library only).

    Tokens are maximal runs of letters and digits in any script, so accented
    words such as "perché" count as one token instead of being cut at the
    accented letter. Every non-blank line counts as one sentence/clause.

    Args:
        lyrics: The string containing the full song lyrics.

    Returns:
        A dictionary with the keys 'n_sentences', 'n_tokens', 'char_per_tok'
        and 'avg_token_per_clause'; the two averages are rounded to 4
        decimals. All values are zero when the lyrics are empty or blank.
    """
    if not lyrics or lyrics.strip() == '':
        return {
            'n_sentences': 0,
            'n_tokens': 0,
            'char_per_tok': 0.0,
            'avg_token_per_clause': 0.0
        }

    # 1. Tokenization --> 'n_tokens'
    # [^\W_]+ matches runs of Unicode letters/digits; the underscore is
    # excluded so it keeps acting as a separator, like all other punctuation.
    words = re.findall(r'[^\W_]+', lyrics)
    n_tokens = len(words)

    # 2. Character count (characters that belong to tokens only)
    total_chars = sum(len(word) for word in words)

    # 3. Sentence count: one sentence per non-blank line. This is a heuristic
    # and may over- or under-estimate the true sentence count.
    n_sentences = sum(1 for line in lyrics.split('\n') if line.strip() != '')

    # 4. Derived metrics, guarded against division by zero
    char_per_tok = total_chars / n_tokens if n_tokens > 0 else 0.0
    avg_token_per_clause = n_tokens / n_sentences if n_sentences > 0 else 0.0

    return {
        'n_sentences': n_sentences,
        'n_tokens': n_tokens,
        'char_per_tok': round(char_per_tok, 4),  # Rounding for clean output
        'avg_token_per_clause': round(avg_token_per_clause, 4)
    }

# --- Main Processing ---

def fill_missing_metrics(input_file: str, output_file: str):
    """
    Read a CSV, fill the missing linguistic metrics from the lyrics, write a new CSV.

    For every row that has lyrics, each column listed in `columnsToFill` whose
    value is missing (None, blank, or any casing of "nan") is replaced by the
    value computed by `calculate_metrics`. Metric values that are already
    present are left untouched, and columns that are not in the file header
    are ignored so the output keeps exactly the input columns.

    Args:
        input_file: Path of the CSV file to read.
        output_file: Path of the CSV file to write.

    Prints an error and returns without writing when the input file is absent.
    """
    missing_markers = {'', 'nan'}  # compared after strip() + lower()

    def _is_missing(value) -> bool:
        return value is None or str(value).strip().lower() in missing_markers

    try:
        with open(input_file, mode='r', newline='', encoding='utf-8') as infile:
            reader = csv.DictReader(infile)
            fieldnames = reader.fieldnames or []  # None for an empty file
            # The whole table is held in memory (assumes a moderately sized file)
            data = list(reader)

    except FileNotFoundError:
        print(f"ERROR: Input file not found at {input_file}")
        return

    # Writing a key that is not in the header would make DictWriter raise
    target_columns = [col for col in columnsToFill if col in fieldnames]
    updated_rows_count = 0

    for row in data:
        missing_columns = [col for col in target_columns if _is_missing(row.get(col))]

        # Nothing to fill, or nothing to compute the metrics from
        if not missing_columns or _is_missing(row.get('lyrics')):
            continue

        new_metrics = calculate_metrics(row['lyrics'])
        for col in missing_columns:
            row[col] = str(new_metrics[col])
        updated_rows_count += 1

    # Write the updated data to the new file
    with open(output_file, mode='w', newline='', encoding='utf-8') as outfile:
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)

    print(" Data imputation completed.")
    print(f" Total rows updated with calculated metrics: {updated_rows_count}")
    print(f" Cleaned and imputed data saved to: {output_file}")

# Run the metric imputation: reads input_file1 and writes output_file1
fill_missing_metrics(input_file1, output_file1)

 Data imputation completed.
 Total rows updated with calculated metrics: 73
 Cleaned and imputed data saved to: audioCleanData.csv


In [19]:
import urllib.request
import urllib.parse
import base64
import time

# --- Configuration and Constants ---

INPUT_FILE = 'audioCleanData.csv'
OUTPUT_FILE = 'DataPostSpotifyAPI.csv'
MISSING_VALUES = ['', 'NaN', 'nan']

# Spotify credentials are read from the environment so that they never end up
# in the notebook or in version control. Set SPOTIFY_CLIENT_ID and
# SPOTIFY_CLIENT_SECRET before starting the kernel.
# SECURITY: the key pair that used to be hardcoded here has been exposed and
# should be revoked/rotated in the Spotify developer dashboard.
SPOTIFY_CLIENT_ID = os.environ.get("SPOTIFY_CLIENT_ID", "")
SPOTIFY_CLIENT_SECRET = os.environ.get("SPOTIFY_CLIENT_SECRET", "")

# Spotify API Endpoints
SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"
SPOTIFY_SEARCH_URL = "https://api.spotify.com/v1/search"
SPOTIFY_FEATURES_URL = "https://api.spotify.com/v1/audio-features"

# Columns to be Imputed
AUDIO_COLUMNS = ['bpm', 'loudness', 'pitch', 'flux', 'rms', 'rolloff', 'flatness', 'spectral_complexity']
ALBUM_COLUMNS = ['album_name', 'album_release_date', 'album_type', 'disc_number', 'track_number', 'duration_ms', 'explicit', 'popularity', 'id_album']
LANGUAGE_COLUMN = 'language'
ALL_EXTERNAL_COLUMNS = AUDIO_COLUMNS + ALBUM_COLUMNS + [LANGUAGE_COLUMN]

# --- 1. Spotify Authentication ---

def get_spotify_token(client_id: str, client_secret: str) -> str | None:
    """
    Obtains an Access Token from Spotify using the Client Credentials Flow.
    This token is required for all subsequent API requests.
    """
    try:
        # Base64 encode the Client ID and Secret for authentication header
        auth_string = f"{client_id}:{client_secret}"
        encoded_auth = base64.b64encode(auth_string.encode('utf-8')).decode('utf-8')

        headers = {
            "Authorization": f"Basic {encoded_auth}",
            "Content-Type": "application/x-www-form-urlencoded"
        }
        data = urllib.parse.urlencode({'grant_type': 'client_credentials'}).encode('utf-8')

        req = urllib.request.Request(SPOTIFY_TOKEN_URL, data=data, headers=headers, method='POST')
        
        with urllib.request.urlopen(req) as response:
            token_info = json.loads(response.read().decode('utf-8'))
            return token_info.get('access_token')
            
    except urllib.error.HTTPError as e:
        print(f"Error getting Spotify token: HTTP {e.code} - {e.reason}")
        return None
    except Exception as e:
        print(f"Error getting Spotify token: {e}")
        return None

# --- 2. Spotify Data Fetching ---

def fetch_spotify_data(title: str, artist: str, token: str) -> dict:
    """
    Search Spotify for a track, then collect its metadata and audio features.

    Args:
        title: Track title used in the search query.
        artist: Artist name used in the search query.
        token: Bearer access token for the Spotify Web API.

    Returns:
        A dictionary keyed by CSV column name. It holds the album/track
        metadata of the first search hit and, when the audio-features request
        succeeds, 'bpm', 'loudness' and 'pitch'. Values the API did not
        provide are None. Returns {} when the search fails or finds nothing.
    """
    headers = {"Authorization": f"Bearer {token}"}

    def _request_json(url: str) -> dict:
        # The timeout keeps a stalled connection from blocking the whole run
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=10) as response:
            return json.loads(response.read().decode('utf-8'))

    def _get_json(url: str) -> dict:
        """GET `url`; on HTTP 429 wait and retry once instead of losing the track."""
        try:
            return _request_json(url)
        except urllib.error.HTTPError as e:
            if e.code != 429:  # only "Too Many Requests" is worth retrying
                raise
            retry_after = e.headers.get('Retry-After') if e.headers else None
            # Honour the server hint, capped so one track cannot stall the run
            delay = min(int(retry_after), 30) if retry_after and retry_after.isdigit() else 5
            print(f"WARNING: Spotify Rate Limit hit. Pausing for {delay} seconds...")
            time.sleep(delay)
            return _request_json(url)

    # --- PHASE A: Search for Track ID ---
    try:
        search_query = f"track:{title} artist:{artist}"
        encoded_query = urllib.parse.quote(search_query)
        search_url = f"{SPOTIFY_SEARCH_URL}?q={encoded_query}&type=track&limit=1"

        search_results = _get_json(search_url)
        items = search_results.get('tracks', {}).get('items', [])
        if not items:
            return {}  # Track not found

        track_info = items[0]
        track_id = track_info['id']
        album_info = track_info.get('album') or {}
        explicit = track_info.get('explicit')

        data = {
            'duration_ms': track_info.get('duration_ms'),
            # Keep None when absent: str(None) would store the text 'None'
            'explicit': None if explicit is None else str(explicit),
            'popularity': track_info.get('popularity'),
            'album_name': album_info.get('name'),
            'album_release_date': album_info.get('release_date'),
            'album_type': album_info.get('album_type'),
            'disc_number': track_info.get('disc_number'),
            'track_number': track_info.get('track_number'),
            'id_album': album_info.get('id'),
            # Language is not provided by the search endpoint
        }

    except urllib.error.HTTPError as e:
        print(f"DEBUG: Search failed for '{title}': HTTP {e.code}")
        return {}
    except Exception:
        return {}  # Parsing or network error: treated as "not found"

    # --- PHASE B: Fetch Audio Features using Track ID ---
    try:
        audio_features = _get_json(f"{SPOTIFY_FEATURES_URL}/{track_id}")

        # Map Spotify names to CSV column names. Spotify has no equivalent of
        # 'flux', 'rms', 'rolloff', 'flatness' or 'spectral_complexity', so
        # those columns are never returned here.
        data['bpm'] = audio_features.get('tempo')
        data['loudness'] = audio_features.get('loudness')
        data['pitch'] = audio_features.get('key')  # pitch class (0-11)

    except Exception:
        # Best effort: the metadata gathered in phase A is still returned
        pass

    return data

# --- 3. Main Imputation Logic ---

def impute_external_data_spotify(input_file: str, output_file: str):
    """
    Conservatively fill missing track columns with data from the Spotify API.

    For every row that has a title, a primary artist and at least one missing
    column among `ALL_EXTERNAL_COLUMNS`, the track is looked up on Spotify. A
    missing cell is filled only when Spotify returned a non-None value for
    it; existing values are never overwritten. Columns that are not in the
    file header are ignored, so the output keeps exactly the input columns.

    Args:
        input_file: Path of the CSV file to read.
        output_file: Path of the CSV file to write.

    Prints an error and returns without writing when no token can be obtained
    or the input file does not exist.
    """
    print("--- Starting Spotify API Imputation ---")

    # 1. Get Spotify Access Token
    spotify_token = get_spotify_token(SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET)
    if not spotify_token:
        print("FATAL: Could not retrieve Spotify token. Cannot proceed with API calls.")
        return

    def _is_missing(value) -> bool:
        # None appears for cells of lines shorter than the header
        return value is None or str(value).strip().lower() in MISSING_VALUES

    try:
        with open(input_file, mode='r', newline='', encoding='utf-8') as infile:
            reader = csv.DictReader(infile)
            fieldnames = reader.fieldnames or []  # None for an empty file
            data = list(reader)

    except FileNotFoundError:
        print(f"ERROR: Input file not found at {input_file}.")
        return

    # Writing a key that is not in the header would make DictWriter raise
    # after all the API calls have already been spent.
    target_columns = [col for col in ALL_EXTERNAL_COLUMNS if col in fieldnames]
    imputed_count = 0

    # 2. Iterate and Impute (rows are updated in place)
    for i, row in enumerate(data):
        missing_columns = [col for col in target_columns if _is_missing(row.get(col))]

        if missing_columns and row.get('title') and row.get('primary_artist'):
            # {} when the track is not found or the request fails
            external_data = fetch_spotify_data(row['title'], row['primary_artist'], spotify_token)
            updated = False

            for col in missing_columns:
                value = external_data.get(col)
                if value is not None:
                    row[col] = str(value)
                    updated = True

            if updated:
                imputed_count += 1

        # Short pause every 10 rows to stay within Spotify's rate limits
        if i % 10 == 0 and i > 0:
            time.sleep(0.1)

    # 3. Write the updated data to the new file
    with open(output_file, mode='w', newline='', encoding='utf-8') as outfile:
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)

    print("\n Spotify Imputation Complete.")
    print(f" Total rows successfully imputed: {imputed_count}")
    print(f" Data saved to: {output_file}")

# --- Execution ---
# Reads INPUT_FILE and writes OUTPUT_FILE as configured at the top of this cell.

impute_external_data_spotify(INPUT_FILE, OUTPUT_FILE)

--- Starting Spotify API Imputation ---

 Spotify Imputation Complete.
 Total rows successfully imputed: 71
 Data saved to: DataPostSpotifyAPI.csv


In [24]:
import csv
from datetime import datetime

# --- Configuration ---
# NOTE(review): INPUT_FILE, OUTPUT_FILE and MISSING_VALUES rebind names that the
# Spotify cell also sets. impute_external_data_spotify reads MISSING_VALUES at
# call time, so re-running it after this cell would use the list below.
INPUT_FILE = 'DataPostSpotifyAPI.csv' 
OUTPUT_FILE = 'DataWithDateImputation.csv'

# Define all possible representations of missing data
MISSING_VALUES = ['', 'NaN', 'nan', None] 

# --- Main Imputation Function ---

def impute_date_parts(input_file: str, output_file: str):
    """
    Fill missing 'year', 'month' and 'day' cells from 'album_release_date'.

    The release date is tried as YYYY-MM-DD, then YYYY-MM, then YYYY; only
    the parts that the matching layout carries, and that are missing in the
    row, are filled. Rows without a usable release date are copied unchanged.

    Args:
        input_file: Path of the CSV file to read.
        output_file: Path of the CSV file to write.
    """
    date_columns = ('year', 'month', 'day')
    # Accepted layouts, most precise first, with the parts each one provides
    layouts = (
        ('%Y-%m-%d', ('year', 'month', 'day')),
        ('%Y-%m', ('year', 'month')),
        ('%Y', ('year',)),
    )

    try:
        with open(input_file, mode='r', newline='', encoding='utf-8') as infile:
            reader = csv.DictReader(infile)
            fieldnames = reader.fieldnames
            records = list(reader)
    except FileNotFoundError:
        print(f" ERROR: Input file not found at: {input_file}")
        return

    # The three target columns must already exist in the file
    if any(col not in fieldnames for col in date_columns):
        print(" ERROR: Required columns ('year', 'month', 'day') not found in the input file.")
        return

    imputed_rows = 0
    for record in records:
        source_date = record.get('album_release_date')

        # Without a source date there is nothing to impute from
        if source_date is None or str(source_date).strip().lower() in MISSING_VALUES:
            continue

        # Which of the three parts are currently missing in this row
        gaps = [
            part for part in date_columns
            if str(record.get(part, '')).strip().lower() in MISSING_VALUES
        ]
        if not gaps:
            continue

        for layout, provided_parts in layouts:
            try:
                parsed = datetime.strptime(source_date, layout)
            except ValueError:
                continue  # try the next, less precise layout

            fillable = [part for part in provided_parts if part in gaps]
            for part in fillable:
                record[part] = getattr(parsed, part)
            if fillable:
                imputed_rows += 1
            break  # the first layout that parses wins

    # --- Write Output ---
    with open(output_file, mode='w', newline='', encoding='utf-8') as outfile:
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(records)

    print("\n Date part imputation complete.")
    print(f" Total rows where at least one date part was imputed: {imputed_rows}")
    print(f" File saved to: {output_file}")

# Run the date imputation: reads INPUT_FILE and writes OUTPUT_FILE (set in this cell)
impute_date_parts(INPUT_FILE, OUTPUT_FILE)


 Date part imputation complete.
 Total rows where at least one date part was imputed: 1235
 File saved to: DataWithDateImputation.csv


In [None]:
# Sanity check only --> pandas cannot be used here, so this cell is kept commented out




# import pandas as pd
# import numpy as np
# dfffff = pd.read_csv('DataWithDateImputation.csv')

# missing_countsPostSpotify = dfffff.isnull().sum() 
# # print("The number of missing values is: \n",missing_counts[missing_counts > 0]) #28 columns out of 37
# missing_countsPostSpotify = missing_countsPostSpotify[missing_countsPostSpotify > 0].to_frame(name="missing_values")
# print(missing_countsPostSpotify)

In [None]:
# --- Configuration ---
INPUT_CSV_FILE = 'output2.csv'
OUTPUT_FILE = 'artistClean.csv'
# Sent with every request. NOTE(review): still a placeholder; replace it with
# a real application name and contact address before running.
USER_AGENT = "MyApp/1.0 (email@example.com)"
MISSING_VALUES = ['', 'NaN', 'nan', '0', 0] # Define all values that count as missing/zero

# --- Endpoints ---
# HTTPS, so queries are not sent in clear text or through an extra redirect.
MB_URL = "https://musicbrainz.org/ws/2/artist/"
NOMINATIM_URL = "https://nominatim.openstreetmap.org/search" # OpenStreetMap Geocoding

# --- Utility Functions (APIs using urllib) ---

def safe_http_get(url: str, params: dict, headers: dict) -> dict | None:
    """Performs a GET request using urllib and returns JSON data."""
    
    query_string = urllib.parse.urlencode(params)
    full_url = f"{url}?{query_string}"
    
    try:
        req = urllib.request.Request(full_url, headers=headers)
        with urllib.request.urlopen(req) as response:
            if response.getcode() == 200:
                data = response.read().decode('utf-8')
                return json.loads(data)
            else:
                print(f"HTTP Error: {response.getcode()} for URL: {url}")
                return None
    except urllib.error.HTTPError as e:
        # Catch specific HTTP errors
        return None
    except Exception as e:
        # Catch network or parsing errors
        return None

# ----------------------------
# Function to search an artist on MusicBrainz
# ----------------------------
def search_artist_musicbrainz(name: str) -> dict | None:
    """
    Search MusicBrainz for an artist by name and return data from the first hit.

    Args:
        name: Artist name, passed as the search query.

    Returns:
        A dictionary with the keys 'birth_place' (always None here),
        'country' (name of the artist's area), 'birth_date' and
        'active_start' (both the life-span begin value) and 'active_end'
        (the life-span end value), or None when the request fails or there
        is no hit.
    """
    params = {
        "query": name,
        "fmt": "json",
        "limit": 1
    }
    headers = {"User-Agent": USER_AGENT}

    data = safe_http_get(MB_URL, params, headers)
    time.sleep(1) # Respect API rate limits (1 request/second)

    # Anything other than a dict with a non-empty 'artists' list is "no result"
    artists = data.get("artists") if isinstance(data, dict) else None
    if not artists:
        return None

    artist_info = artists[0]
    # `or {}` also covers JSON null, which .get(key, {}) would pass through
    # as None and crash on the chained .get()
    life_span = artist_info.get("life-span") or {}
    area = artist_info.get("area") or {}

    return {
        "birth_place": None,
        "country": area.get("name"),
        "birth_date": life_span.get("begin"),
        "active_start": life_span.get("begin"),
        "active_end": life_span.get("end")
    }

# ----------------------------
# Function to search Wikipedia for artist and extract country (Simplified)
# ----------------------------
def search_artist_wikipedia(name: str) -> dict | None:
    """
    Fetch the Wikipedia page summary of an artist and guess the country.

    The country is derived from the summary's short description with a
    keyword heuristic: "italian" -> "Italy", otherwise "american" -> "USA".

    Args:
        name: Artist name; spaces are replaced by underscores to form the title.

    Returns:
        {"country": <str or None>} when the summary has a description, or
        None when the request fails or there is no description.
    """
    wiki_url = "https://en.wikipedia.org/api/rest_v1/page/summary/"
    # safe='' also escapes '/', which would otherwise split a title such as
    # "AC/DC" into two path segments.
    search_path = urllib.parse.quote(name.replace(' ', '_'), safe='')
    full_url = f"{wiki_url}{search_path}"
    headers = {"User-Agent": USER_AGENT}

    # The title is part of the path, so the URL is requested directly
    try:
        req = urllib.request.Request(full_url, headers=headers)
        with urllib.request.urlopen(req, timeout=10) as response:
            if response.getcode() != 200:
                return None
            data = json.loads(response.read().decode('utf-8'))
    except Exception:
        # Best effort. `Exception` instead of a bare except, so Ctrl-C
        # (KeyboardInterrupt) can still stop a long run.
        return None

    time.sleep(1) # Respect rate limits

    description = data.get("description") if isinstance(data, dict) else None
    if not isinstance(description, str):
        return None

    desc = description.lower()
    country = None
    # NOTE(review): substring match, so e.g. "south american" also maps to "USA"
    if "italian" in desc:
        country = "Italy"
    elif "american" in desc:
        country = "USA"
    return {"country": country}

# ----------------------------
# Function for geocoding a place (OpenStreetMap Nominatim)
# ----------------------------
def geocode_place(place_name: str, country_name: str | None = None) -> tuple[float or None, float or None]:
    """
    Get latitude and longitude of a place using OpenStreetMap Nominatim.
    """
    query = place_name
    if country_name:
        query += f", {country_name}"
        
    params = {"q": query, "format": "json", "limit": 1}
    # Nominatim requires a distinct User-Agent and is sensitive to rate limits
    headers = {"User-Agent": "MyDataPipeline/1.0 (contact@example.com)"} 

    # Geocoding logic using safe_http_get structure
    data = safe_http_get(NOMINATIM_URL, params, headers)
    time.sleep(1) # Respect Nominatim usage policy (1 request/second)
    
    if data is None or len(data) == 0:
        return None, None
    try:
        return float(data[0]["lat"]), float(data[0]["lon"])
    except (ValueError, KeyError):
        return None, None

In [None]:
# --- Configuration ---
INPUT_CSV_FILE = 'output2.csv'
OUTPUT_FILE = 'artistClean.csv'
# Sent with every request. NOTE(review): still a placeholder; replace it with
# a real application name and contact address before running.
USER_AGENT = "MyApp/1.0 (email@example.com)"
MISSING_VALUES = ['', 'NaN', 'nan', '0', 0] # Define all values that count as missing/zero

# --- Endpoints ---
# HTTPS, so queries are not sent in clear text or through an extra redirect.
MB_URL = "https://musicbrainz.org/ws/2/artist/"
NOMINATIM_URL = "https://nominatim.openstreetmap.org/search" # OpenStreetMap Geocoding

# --- Utility Functions (APIs using urllib) ---

def safe_http_get(url: str, params: dict, headers: dict) -> dict | None:
    """Performs a GET request using urllib and returns JSON data."""
    
    query_string = urllib.parse.urlencode(params)
    full_url = f"{url}?{query_string}"
    
    try:
        req = urllib.request.Request(full_url, headers=headers)
        with urllib.request.urlopen(req) as response:
            if response.getcode() == 200:
                data = response.read().decode('utf-8')
                return json.loads(data)
            else:
                print(f"HTTP Error: {response.getcode()} for URL: {url}")
                return None
    except urllib.error.HTTPError as e:
        # Catch specific HTTP errors
        return None
    except Exception as e:
        # Catch network or parsing errors
        return None

# ----------------------------
# Function to search an artist on MusicBrainz
# ----------------------------
def search_artist_musicbrainz(name: str) -> dict | None:
    """
    Search MusicBrainz for an artist by name and return data from the first hit.

    Args:
        name: Artist name, passed as the search query.

    Returns:
        A dictionary with the keys 'birth_place' (always None here),
        'country' (name of the artist's area), 'birth_date' and
        'active_start' (both the life-span begin value) and 'active_end'
        (the life-span end value), or None when the request fails or there
        is no hit.
    """
    params = {
        "query": name,
        "fmt": "json",
        "limit": 1
    }
    headers = {"User-Agent": USER_AGENT}

    data = safe_http_get(MB_URL, params, headers)
    time.sleep(1) # Respect API rate limits (1 request/second)

    # Anything other than a dict with a non-empty 'artists' list is "no result"
    artists = data.get("artists") if isinstance(data, dict) else None
    if not artists:
        return None

    artist_info = artists[0]
    # `or {}` also covers JSON null, which .get(key, {}) would pass through
    # as None and crash on the chained .get()
    life_span = artist_info.get("life-span") or {}
    area = artist_info.get("area") or {}

    return {
        "birth_place": None,
        "country": area.get("name"),
        "birth_date": life_span.get("begin"),
        "active_start": life_span.get("begin"),
        "active_end": life_span.get("end")
    }

# ----------------------------
# Function to search Wikipedia for artist and extract country (Simplified)
# ----------------------------
def search_artist_wikipedia(name: str) -> dict | None:
    """
    Fetch the Wikipedia page summary of an artist and guess the country.

    The country is derived from the summary's short description with a
    keyword heuristic: "italian" -> "Italy", otherwise "american" -> "USA".

    Args:
        name: Artist name; spaces are replaced by underscores to form the title.

    Returns:
        {"country": <str or None>} when the summary has a description, or
        None when the request fails or there is no description.
    """
    wiki_url = "https://en.wikipedia.org/api/rest_v1/page/summary/"
    # safe='' also escapes '/', which would otherwise split a title such as
    # "AC/DC" into two path segments.
    search_path = urllib.parse.quote(name.replace(' ', '_'), safe='')
    full_url = f"{wiki_url}{search_path}"
    headers = {"User-Agent": USER_AGENT}

    # The title is part of the path, so the URL is requested directly
    try:
        req = urllib.request.Request(full_url, headers=headers)
        with urllib.request.urlopen(req, timeout=10) as response:
            if response.getcode() != 200:
                return None
            data = json.loads(response.read().decode('utf-8'))
    except Exception:
        # Best effort. `Exception` instead of a bare except, so Ctrl-C
        # (KeyboardInterrupt) can still stop a long run.
        return None

    time.sleep(1) # Respect rate limits

    description = data.get("description") if isinstance(data, dict) else None
    if not isinstance(description, str):
        return None

    desc = description.lower()
    country = None
    # NOTE(review): substring match, so e.g. "south american" also maps to "USA"
    if "italian" in desc:
        country = "Italy"
    elif "american" in desc:
        country = "USA"
    return {"country": country}

# ----------------------------
# Function for geocoding a place (OpenStreetMap Nominatim)
# ----------------------------
def geocode_place(place_name: str, country_name: str | None = None) -> tuple[float or None, float or None]:
    """
    Get latitude and longitude of a place using OpenStreetMap Nominatim.
    """
    query = place_name
    if country_name:
        query += f", {country_name}"
        
    params = {"q": query, "format": "json", "limit": 1}
    # Nominatim requires a distinct User-Agent and is sensitive to rate limits
    headers = {"User-Agent": "MyDataPipeline/1.0 (contact@example.com)"} 

    # Geocoding logic using safe_http_get structure
    data = safe_http_get(NOMINATIM_URL, params, headers)
    time.sleep(1) # Respect Nominatim usage policy (1 request/second)
    
    if data is None or len(data) == 0:
        return None, None
    try:
        return float(data[0]["lat"]), float(data[0]["lon"])
    except (ValueError, KeyError):
        return None, None

In [None]:
import requests
import time

# ----------------------------
# Function to search an artist on MusicBrainz
# ----------------------------
def search_artist_musicbrainz(name):
    """
    Search for an artist on MusicBrainz using their name.

    Returns a dictionary with the keys birth_place (always None here),
    country (name of the artist's area), birth_date and active_start (both
    the life-span begin value) and active_end (the life-span end value), or
    None when the request fails or there is no hit.
    """
    url = "https://musicbrainz.org/ws/2/artist/"
    params = {
        "query": name,
        "fmt": "json",
        "limit": 1
    }
    headers = {"User-Agent": "MyApp/1.0 (email@example.com)"}

    try:
        # The timeout keeps a stalled connection from blocking the whole run
        r = requests.get(url, params=params, headers=headers, timeout=10)
        r.raise_for_status()
        data = r.json()
    except (requests.RequestException, ValueError):
        # Network/HTTP failure or a body that is not JSON. Not a bare except,
        # so Ctrl-C (KeyboardInterrupt) can still stop a long run.
        return None

    artists = data.get("artists") if isinstance(data, dict) else None
    if not artists:
        return None

    artist_info = artists[0]
    # `or {}` also covers JSON null, which .get(key, {}) would pass through
    life_span = artist_info.get("life-span") or {}
    area = artist_info.get("area") or {}

    return {
        "birth_place": None,
        "country": area.get("name"),
        "birth_date": life_span.get("begin"),
        "active_start": life_span.get("begin"),
        "active_end": life_span.get("end")
    }

# ----------------------------
# Function to search Wikipedia for artist and extract city/country
# ----------------------------
def search_artist_wikipedia(name):
    """
    Fetch the artist's Wikipedia page summary and try to extract the country.

    The country is guessed from the summary's short description:
    "italian" -> "Italy", otherwise "american" -> "USA".

    Returns {"country": <str or None>} when a description is available, or
    None when the request fails or there is no description.
    """
    from urllib.parse import quote

    # Escape the title, including '/', so names such as "AC/DC" or "Who?"
    # stay a single path segment instead of changing the URL structure.
    title = quote(name.replace(' ', '_'), safe='')
    url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{title}"
    headers = {"User-Agent": "MyApp/1.0 (email@example.com)"}

    try:
        # The timeout keeps a stalled connection from blocking the whole run
        r = requests.get(url, headers=headers, timeout=10)
        r.raise_for_status()
        data = r.json()
    except (requests.RequestException, ValueError):
        # Network/HTTP failure or a body that is not JSON. Not a bare except,
        # so Ctrl-C (KeyboardInterrupt) can still stop a long run.
        return None

    description = data.get("description") if isinstance(data, dict) else None
    if not isinstance(description, str):
        return None

    desc = description.lower()
    country = None
    # Basic heuristics based on nationality in the description.
    # NOTE(review): substring match, so "south american" also maps to "USA".
    if "italian" in desc:
        country = "Italy"
    elif "american" in desc:
        country = "USA"
    # add more cases if needed
    return {"country": country}

# ----------------------------
# Function for geocoding a place
# ----------------------------
def geocode_place(place_name, country_name=None):
    """
    Get latitude and longitude of a place using OpenStreetMap Nominatim.

    `country_name`, when given, is appended to the query as ", <country>".

    Returns (latitude, longitude) of the first result as floats, or
    (None, None) when the place name is empty, the request fails, there is
    no result, or the result cannot be read.
    """
    # An empty query cannot match anything; skip the request
    if not place_name:
        return None, None

    query = place_name
    if country_name:
        query += f", {country_name}"
    url = "https://nominatim.openstreetmap.org/search"
    params = {"q": query, "format": "json", "limit": 1}
    # NOTE(review): a generic browser User-Agent does not identify this
    # application; confirm against the Nominatim usage policy.
    headers = {"User-Agent": "Mozilla/5.0"}

    try:
        # The timeout keeps a stalled connection from blocking the whole run
        r = requests.get(url, params=params, headers=headers, timeout=10)
        r.raise_for_status()
        data = r.json()
    except (requests.RequestException, ValueError):
        # Network/HTTP failure or a body that is not JSON. Not a bare except,
        # so Ctrl-C (KeyboardInterrupt) can still stop a long run.
        return None, None

    if not data:
        return None, None
    try:
        return float(data[0]["lat"]), float(data[0]["lon"])
    except (ValueError, KeyError, TypeError, IndexError):
        # Unexpected payload shape or non-numeric coordinates
        return None, None

# ----------------------------
# Update artists without latitude/longitude
# ----------------------------
# NOTE(review): `df_artists` is not defined in any cell shown in this notebook;
# it must be an iterable of dict-like artist records created before this cell.

def _coordinate_missing(value) -> bool:
    """Return True when a coordinate is absent, blank, non-numeric, NaN or zero."""
    if value is None:
        return True
    try:
        number = float(value)
    except (TypeError, ValueError):
        return True  # '' or other non-numeric text
    # number != number is True only for NaN
    return number == 0 or number != number


for artist in df_artists:
    # Comparing with `== 0` only would miss '0', '', 'NaN' and None, which is
    # how missing coordinates look when the records come from a CSV file.
    if _coordinate_missing(artist["latitude"]) or _coordinate_missing(artist["longitude"]):
        print(f"Processing: {artist['name']}")

        # 1️⃣ Try MusicBrainz first; keep the existing value when it has none
        info = search_artist_musicbrainz(artist["name"])
        time.sleep(1)
        if info:
            for field in ("birth_place", "country", "birth_date", "active_start", "active_end"):
                artist[field] = info.get(field) or artist.get(field)

        # 2️⃣ If country still missing, try Wikipedia
        if not artist.get("country"):
            wiki_info = search_artist_wikipedia(artist["name"])
            time.sleep(1)
            if wiki_info and wiki_info.get("country"):
                artist["country"] = wiki_info["country"]

        # 3️⃣ Geocode if we have at least birth_place or country. The country
        # is passed as extra context only when the place is a birth place.
        place = artist.get("birth_place") or artist.get("country")
        country = artist.get("country") if artist.get("birth_place") else None
        if place:
            lat, lon = geocode_place(place, country)
            if lat is not None and lon is not None:
                artist["latitude"] = lat
                artist["longitude"] = lon
            time.sleep(1)

# ----------------------------
# Final check of artists data
# ----------------------------
for artist in df_artists:
    # .get() for the optional fields: records skipped above may lack them
    print(artist["name"], artist.get("birth_place"), artist.get("country"), artist["latitude"], artist["longitude"])
