In [3]:
import json
import pandas as pd
import numpy as np
import re
import hashlib
import duckdb

In [4]:
def canonicalize_value(x):
    if x is None or (isinstance(x, float) and pd.isna(x)):
        return ""
    if isinstance(x, (dict, list)):
        return json.dumps(x, sort_keys=True, separators=(',', ':'))
    return str(x)

def row_hash_from_row(row, fields=None):
    if fields is None:
        fields = ["track_metadata", "listened_at", "recording_msid", "user_name"]
    parts = [canonicalize_value(row.get(f)) for f in fields]
    return hashlib.sha1("|".join(parts).encode("utf-8")).hexdigest()

In [5]:
with open('dataset.parquet') as file:
    data = [json.loads(line) for line in file]

df = pd.DataFrame(data).drop(['recording_msid'], axis=1)

FileNotFoundError: [Errno 2] No such file or directory: 'dataset.parquet'

In [4]:
# creating primary key 'id' column
df["id"] = df.apply(row_hash_from_row, axis=1)

In [5]:
meta_df = pd.json_normalize(df['track_metadata'])

df_expanded = pd.concat([df.drop(columns=['track_metadata']), meta_df], axis=1)

df_expanded.columns = df_expanded.columns.str.replace('^additional_info\\.', '', regex=True)                       

In [6]:
def val_not_null(val):
    return val is not None

def val_ms(val):
    if pd.isna(val):            
        return True
    try:
        return float(val) <= 600000
    except Exception:
        return False

def val_hash(val):
    if pd.isna(val):
        return True
    try:
        return isinstance(val, str) and len(val) == 36 and not val.startswith('http')
    except Exception:
        return False

def val_date(val):
    if pd.isna(val):            
        return True
    try:
        pd.to_datetime(val)
        return True
    except Exception:
        return False

def full_date(val):
    if pd.isna(val):
        return True
    try:
        date = pd.to_datetime(val, errors='coerce')
        if isinstance(val, str):
            date_parts = [p for p in re.split(r'[-/ ,]', val) if p]
            return len(date_parts) >= 3 and not pd.isna(date)
        return False
    except Exception:
        return False

def val_int(val):
    if pd.isna(val):
        return True
    try:
        return isinstance(val, int) and val.is_integer()
    except Exception:
        return False
    
def val_float(val):
    if pd.isna(val):
        return True
    try:
        return isinstance(val, float)
    except Exception:
        return False

def val_string(val):
    if pd.isna(val):
        return True
    try:
        return isinstance(val, str)
    except Exception:
        return False

In [7]:
non_null_cols = ["listened_at","recording_msid","user_name","artist_name","track_name"]
list_str_cols = ["tags","artist_names","release_artist_names","spotify_album_artist_ids","spotify_artist_ids"]
list_hash_cols = ["work_mbids","artist_mbids"]
date_cols = ["date"]
int_cols = ["totaldiscs","totaltracks","tracknumber","discnumber","track_length","duration"]
float_cols = ["dedup_tag","choosen_by_user"]
hash_cols = ["recording_msid","release_msid","release_mbid","release_group_mbid","track_mbid","artist_mbid"]
ms_cols = ["duration_ms"]
string_cols = [c for c in df_expanded.columns if c not in set(non_null_cols + list_str_cols + list_hash_cols + date_cols + int_cols + float_cols + hash_cols + ms_cols)]

config = {}
for c in non_null_cols: config[c] = val_not_null
for c in date_cols: config[c] = val_date
for c in int_cols: config[c] = val_int
for c in float_cols: config[c] = val_float
for c in hash_cols: config[c] = val_hash
for c in ms_cols: config[c] = val_ms
for c in string_cols: config[c] = val_string

In [8]:
def row_is_valid(row):
    for col, validator in config.items():
        val = row.get(col)
        if isinstance(val, pd.Series):
            # take first non-null scalar
            val = next((x for x in val.tolist() if not pd.isna(x)), None)
        try:
            if not validator(val):
                return False
        except Exception as e:
            print(f"Row failed on column {col} with value {val!r}: {e!r}")
            return False
    return True

In [9]:
mask = df_expanded.apply(row_is_valid, axis=1)
df_valid = df_expanded[mask].reset_index(drop=True)
print(f"Valid rows: {len(df_valid)} / {len(df_expanded)}")

Valid rows: 332218 / 333034


In [10]:
df_valid['listened_at'] = pd.to_datetime(df_valid['listened_at'], unit='s')

In [11]:
df_clean = df_valid.drop(['artist_mbids', 'tags', 'work_mbids'], axis=1).dropna(axis=1, how='all')

In [12]:
con = duckdb.connect("database/sample.db")
con.execute(f"CREATE SCHEMA IF NOT EXISTS stg;")                                

<_duckdb.DuckDBPyConnection at 0x2821f9aaaf0>

In [13]:
con.execute(f"""
CREATE TABLE IF NOT EXISTS stg.raw (
    PRIMARY KEY (id),
    id VARCHAR NOT NULL,
    listened_at TIMESTAMP NOT NULL,
    recording_msid VARCHAR NOT NULL,
    user_name VARCHAR NOT NULL,
    artist_name VARCHAR NOT NULL,
    track_name VARCHAR NOT NULL,
    release_name VARCHAR,
    release_msid VARCHAR,
    release_mbid VARCHAR,
    recording_mbid VARCHAR,
    release_group_mbid VARCHAR,
    isrc VARCHAR,
    spotify_id VARCHAR,
    tracknumber INTEGER,
    track_mbid VARCHAR,
    artist_msid VARCHAR,
    dedup_tag DOUBLE,
    artist_names VARCHAR,
    discnumber INTEGER,
    duration_ms INTEGER,
    listening_from VARCHAR,
    release_artist_name VARCHAR,
    release_artist_names VARCHAR,
    spotify_album_artist_ids VARCHAR,
    spotify_album_id VARCHAR,
    spotify_artist_ids VARCHAR
);
""")

<_duckdb.DuckDBPyConnection at 0x2821f9aaaf0>

In [14]:
con.execute("""
            INSERT INTO 
            stg.raw BY NAME 
            SELECT * 
            FROM df_clean
            where NOT EXISTS (
                SELECT 1 FROM stg.raw r WHERE r.id = df_clean.id
            )
            """)

<_duckdb.DuckDBPyConnection at 0x2821f9aaaf0>

In [15]:
con.close()