<a href="https://colab.research.google.com/github/giovanigoltara/house-music-story/blob/main/DISCOGS_extraction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; the later cells read the Discogs dumps
# from, and write their CSV output to, /content/drive/MyDrive/DISCOGS.
# NOTE(review): `google.colab` is presumably only importable inside a Colab
# runtime, so this cell is expected to fail elsewhere - confirm before reuse.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Preview the first lines of the gzip-compressed XML dump to check its structure
import gzip
from itertools import islice

file_path = "/content/drive/MyDrive/DISCOGS/discogs_20250601_releases.xml.gz"

# Number of lines to show
preview_lines = 20

with gzip.open(file_path, 'rt', encoding='utf-8') as f:
    # islice stops at end-of-file, so a short file does not produce a tail of
    # empty lines the way repeated readline() calls would.
    for line in islice(f, preview_lines):
        # Each line already carries its own '\n'; strip it so print() does not
        # double-space the preview.
        print(line.rstrip('\n'))

In [None]:
# Extract House-music releases from the Discogs releases dump into a CSV file.
#
# The XML is streamed with ElementTree.iterparse so the whole dump is never
# held in memory.  A <release> is kept when its genres include 'Electronic'
# and at least one of its styles contains the substring 'House'; only the
# fields listed in `columns` are written out.
import csv
import gzip  # imported here as well so this cell runs without the preview cell
import xml.etree.ElementTree as ET

import pandas as pd  # not used by this cell; kept in case other cells rely on `pd`


# File paths
input_file = '/content/drive/MyDrive/DISCOGS/discogs_20250601_releases.xml.gz'
output_file = '/content/drive/MyDrive/DISCOGS/house_music_releases_02.csv'

# CSV columns, in output order
columns = ['id', 'title', 'year', 'country', 'styles', 'genres', 'label']

# Print a progress line (and flush the CSV) after this many <release> records
progress_every = 100_000

records_seen = 0

# The CSV is opened once for the whole run instead of being reopened in append
# mode for every matching row.
with gzip.open(input_file, 'rb') as f, \
        open(output_file, mode='w', newline='', encoding='utf-8') as out_f:
    writer = csv.DictWriter(out_f, fieldnames=columns)
    writer.writeheader()

    # 'start' events are requested only so the document root can be captured
    # from the very first event; the root is needed to drop finished records.
    context = ET.iterparse(f, events=('start', 'end'))
    _, root = next(context)

    for event, elem in context:
        if event != 'end' or elem.tag != 'release':
            continue

        try:
            # Empty <genre/> / <style/> tags have text None; drop them so the
            # joins below cannot raise TypeError and lose the record.
            genre_tags = [g.text for g in elem.findall('./genres/genre') if g.text]
            style_tags = [s.text for s in elem.findall('./styles/style') if s.text]

            if 'Electronic' in genre_tags and any('House' in s for s in style_tags):
                # Only the first <label> of a release is recorded.
                label_elem = elem.find('./labels/label')
                writer.writerow({
                    'id': elem.attrib.get('id'),
                    'title': elem.findtext('title'),
                    # NOTE(review): the 'year' column is filled from <released>,
                    # which may hold a full date rather than a year - confirm.
                    'year': elem.findtext('released'),
                    'country': elem.findtext('country'),
                    'styles': ', '.join(style_tags),
                    'genres': ', '.join(genre_tags),
                    'label': label_elem.attrib.get('name') if label_elem is not None else None,
                })
        except Exception as e:
            # Best effort: report the bad record and keep streaming.
            print(f"⚠️ Skipping record due to error: {e}")
        finally:
            # Clearing only the element would leave an empty child attached to
            # the root for every record processed; clearing the root releases
            # the finished record entirely so memory stays flat.
            root.clear()

        # Count actual <release> records rather than every XML element.
        records_seen += 1
        if records_seen % progress_every == 0:
            out_f.flush()  # keep the file on disk reasonably current
            print(f"Processed {records_seen} records...")

print("Extraction complete. File saved to Google Drive.")

In [None]:
# Extract House-music master releases from the Discogs masters dump into a CSV file.
#
# The XML is streamed with ElementTree.iterparse so the whole dump is never
# held in memory.  A <master> is kept when its genres include 'Electronic'
# and at least one of its styles contains the substring 'House'.

# Imported here so this cell does not depend on earlier cells having been run.
import csv
import gzip
import xml.etree.ElementTree as ET

# File paths
input_file = '/content/drive/MyDrive/DISCOGS/discogs_20250601_masters.xml.gz'
output_file = '/content/drive/MyDrive/DISCOGS/master_releases.csv'

# CSV columns, in output order
columns = ['id', 'title', 'year', 'genres', 'styles']

# Print a progress line (and flush the CSV) after this many <master> records
progress_every = 100_000

records_seen = 0

# The CSV is opened once for the whole run instead of being reopened in append
# mode for every matching row.
with gzip.open(input_file, 'rb') as f, \
        open(output_file, mode='w', newline='', encoding='utf-8') as out_f:
    writer = csv.DictWriter(out_f, fieldnames=columns)
    writer.writeheader()

    # 'start' events are requested only so the document root can be captured
    # from the very first event; the root is needed to drop finished records.
    context = ET.iterparse(f, events=('start', 'end'))
    _, root = next(context)

    for event, elem in context:
        if event != 'end' or elem.tag != 'master':
            continue

        try:
            # Empty <genre/> / <style/> tags have text None; drop them so the
            # joins below cannot raise TypeError and lose the record.
            genre_tags = [g.text for g in elem.findall('./genres/genre') if g.text]
            style_tags = [s.text for s in elem.findall('./styles/style') if s.text]

            # Filter for House music
            if 'Electronic' in genre_tags and any('House' in s for s in style_tags):
                writer.writerow({
                    'id': elem.attrib.get('id'),
                    'title': elem.findtext('title'),
                    'year': elem.findtext('year'),
                    'genres': ', '.join(genre_tags),
                    'styles': ', '.join(style_tags),
                })
        except Exception as e:
            # Best effort: report the bad record and keep streaming.
            print(f"⚠️ Skipping record due to error: {e}")
        finally:
            # Clearing only the element would leave an empty child attached to
            # the root for every record processed; clearing the root releases
            # the finished record entirely so memory stays flat.
            root.clear()

        # Count actual <master> records rather than every XML element.
        records_seen += 1
        if records_seen % progress_every == 0:
            out_f.flush()  # keep the file on disk reasonably current
            print(f"Processed {records_seen} records...")

print("Extraction complete. File saved to Google Drive.")