In [3]:
import pyspark
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql import SparkSession

spark = SparkSession.builder \
         .master("local[16]") \
         .appName("Exp") \
         .getOrCreate()

spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.driver.memory", "8g")
spark.conf.set("spark.driver.maxResultSize", "4g")
spark.conf.set("spark.sql.broadcastTimeout", "900")

schema = StructType([StructField("value", IntegerType(), True), StructField("name", StringType(), True)])
my_list = [[1, "ciao"],
           [2, "pippo"],
           [3, "topolino"],
           [4, "paperino"]]

df = spark.createDataFrame(my_list, schema)

In [13]:
import pyarrow.parquet as pq

dataset = pq.ParquetDataset("people/", use_legacy_dataset=True)

dataset.read().to_pandas()

Unnamed: 0,value,name
0,1,ciao
1,2,pippo
2,3,topolino
3,4,paperino


In [14]:
import json

mumu_dir = '../mumu/MuMu_dataset/'

db_review = []
with open(mumu_dir + '/amazon_reviews_MuMu.json') as f:
    for row in f.readlines():
        db_review.append(json.loads(row))

len(db_review)

447583

In [15]:
import csv

msd_id_map = {}
count = 0


with open(mumu_dir + '/MuMu_dataset_multi-label.csv') as csv_file:
    db_metadata = csv.reader(csv_file, delimiter=',')

    # Skip header row
    for row in db_metadata:
        msd_id_map[row[0]] = msd_id_map.get(row[0], []) + [row[2]]
        count += 1

# msd_id_map maps the amazon_ids to MSD_track_ids, to retrieve the actual information from the MSD dataset
len(msd_id_map.keys()), count

(31472, 147296)

In [16]:
import tqdm.notebook as tqdm

with open(mumu_dir + '/MuMu_dataset_multi-label.csv') as csv_file:
    db_metadata = csv.reader(csv_file, delimiter=',')
    
    msd_ids = set()
    
    for row in tqdm.tqdm(db_metadata):
        msd_ids.add(row[2])
        
len(msd_ids)

HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))




147296

In [4]:
from pyspark.sql import functions as sf

mumu_dir = '../mumu/MuMu_dataset/'

reviews = spark.read.json(mumu_dir + '/amazon_reviews_MuMu.json')
reviews = reviews.drop("helpful", "overall", "reviewTime", "unixReviewTime", "reviewerName")

msd_map = spark.read.csv(mumu_dir + '/MuMu_dataset_multi-label.csv', header='true')
msd_map = msd_map.drop("recording_mbid")
msd_map = msd_map.withColumn("MSD_track_id", sf.substring(sf.col("MSD_track_id"), 0, 17))
msd_map = msd_map.withColumn("CLS_match", sf.lit(1))

path_map = spark.read.csv("../msd/mp3/MSD/dataset_annotation.csv", header="true")
path_map = path_map.withColumnRenamed("id", "MSD_track_id")

base_path = "/nfs/msd/mp3/MSD/audio/"
path_map = path_map.withColumn("path", sf.concat(sf.lit(base_path), 
                                                 sf.lit(sf.col('filename').substr(1,1)),
                                                 sf.lit("/"),
                                                 sf.lit(sf.col('filename').substr(2,1)),
                                                 sf.lit("/"),
                                                 sf.col('filename')))


msd_map = msd_map.repartition(80)
path_map = path_map.repartition(80)
reviews = reviews.sample(0.01).repartition(80)

print(msd_map.count(), path_map.count(), reviews.count())

147295 993176 4444


In [23]:
filtered_path_map = path_map.join(msd_map, "MSD_track_id", how="leftsemi")

In [6]:
from pyspark.sql.functions import udf, array
from pyspark.sql.types import StringType
from random import sample

def get_negative_sample(amazon_id):
    track_list = msd_id_map[amazon_id]
    sampled_track_id = sample(msd_ids, 1)[0]
    while sampled_track_id in track_list or sampled_track_id == "MSD_track_id":
        sampled_track_id = sample(msd_ids, 1)[0]
    return sampled_track_id[:-1]

negatives_udf = udf(get_negative_sample, StringType())

neg_msd = msd_map.withColumn("MSD_track_id", negatives_udf(msd_map.amazon_id))\
                 .withColumn("CLS_match", sf.lit(0))

full_map = msd_map.unionByName(neg_msd)

In [7]:
msd_map.rdd.getNumPartitions(), neg_msd.rdd.getNumPartitions(), full_map.rdd.getNumPartitions()

(80, 80, 160)

In [8]:
from pyspark.sql.functions import monotonically_increasing_id

full = full_map.join(reviews, "amazon_id").join(path_map, "MSD_track_id")
full = full.withColumn("id", monotonically_increasing_id())

full.columns

['MSD_track_id',
 'amazon_id',
 'genres',
 'CLS_match',
 'reviewText',
 'reviewerID',
 'summary',
 'filename',
 'label',
 'path',
 'id']

In [11]:
import librosa
import numpy as np
from source.io.pb_item import pb_Item
import lmdb
from pyspark.sql.functions import udf, array, struct

def write_item(row):
    
    path = row["path"]
    CLS = str(row["CLS_match"])
    review = row["reviewText"]
    id = str(row["id"])
    #try:
    x, sr = librosa.load(path, sr=None)
    #except Exception as e:
    #    return str(e)
    
    melspectrogram = librosa.feature.melspectrogram(x, sr)
    log_mel = librosa.power_to_db(melspectrogram, ref=np.max)
    features = np.transpose(log_mel)

    pb_item = pb_Item(features=features,
                      review=review,
                      seq_class=CLS)

    env = lmdb.open('data/MSD/spark.lmdb', subdir=False,
            map_size=1e12 )

    with env.begin(write=True) as txn:
        txn.put(id.encode(), pb_item.get_pb_obj().SerializeToString())
    
    return 1
        
write_udf = udf(write_item, returnType=StringType())

In [None]:
full = full.withColumn("written", write_udf(struct("*")))

env = lmdb.open('data/MSD/spark.lmdb', subdir=False,
                map_size=1e12 )

full.collect()

print("done")

In [10]:
env.stat()

{'psize': 4096,
 'depth': 2,
 'branch_pages': 1,
 'leaf_pages': 6,
 'overflow_pages': 359675,
 'entries': 473}

In [11]:
full.sample(0.01).show()

+-----------------+----------+--------------------+---------+--------------------+--------------+--------------------+----------------+------------------+--------------------+-------------+-------+
|     MSD_track_id| amazon_id|              genres|CLS_match|          reviewText|    reviewerID|             summary|        filename|             label|                path|           id|written|
+-----------------+----------+--------------------+---------+--------------------+--------------+--------------------+----------------+------------------+--------------------+-------------+-------+
|TRGMCEI128F42B1B3|B00000I0VW|Alternative Rock,...|        1|WOW, this thing i...|A3MPOBQFYIMYG5|This album is lig...|3486416.clip.mp3|ARNUPIM11F4C83BD4D|/nfs/msd/mp3/MSD/...|  34359738371|      1|
|TRWCKWW128F92D9D5|B0001M4DQO|Adult Contemporar...|        1|My first impressi...|A3OKT9G7MDJ0FX| don't throw it out!|3644324.clip.mp3|AR1PT6R1187B9A6D9A|/nfs/msd/mp3/MSD/...| 274877906945|      1|
|TRNHSAC12

In [35]:
x, sr = librosa.load('/nfs/msd/mp3/MSD/audio/2/9/2937068.clip.mp3')



In [36]:
from source.sound_transforms import log_mel_spectrogram

log_mel_spectrogram(x,sr).shape

(5993, 64)

In [6]:
from source.sound_transforms import log_mel_spectrogram
from pyspark.sql.functions import udf, array, struct
from pyspark.sql.types import *
import librosa
import lmdb


def write_to_db(row):
    path = row["path"]
    msd_id = row["MSD_track_id"].encode()
    
    env = lmdb.open('data/MSD/MSD_ID_to_log_mel_spectrogram.lmdb', subdir=False,
            map_size=1e12 )
    
    with env.begin(write=False) as txn:
        data = txn.get(msd_id)
    
    if data is None:
        try:
            x, sr = librosa.load(path)
        except:
            return "error"
        features = log_mel_spectrogram(x,sr)

        with env.begin(write=True) as txn:
            txn.put(msd_id, features.tobytes())
            
        return 1
    
    return 0

to_db_udf = udf(write_to_db, returnType=ArrayType(FloatType()))

In [None]:
path_map_written = filtered_path_map.withColumn("written", to_db_udf(struct("*")))

path_map_written.collect()


env = lmdb.open('data/MSD/msdid_to_log_mel_spectrogram.lmdb', subdir=False,
                map_size=1e12 )

print("done")

env.stat()

In [5]:
deezer = spark.read.csv("data/deezer/msd_id_to_emotion.csv", header="true")
deezer = deezer.withColumnRenamed("msd_id", "MSD_track_id")
deezer = deezer.withColumn("MSD_track_id", sf.substring(sf.col("MSD_track_id"), 0, 17))

joined_deezer = deezer.join(path_map, "MSD_track_id")

joined_deezer.show(5)

+-----------------+---------------+----------------+----------------+------------------+--------------------+
|     MSD_track_id|        valence|        arousal |        filename|             label|                path|
+-----------------+---------------+----------------+----------------+------------------+--------------------+
|TRCUDHE128E0783F2|  1.07008192307|   1.29854404255|  18147.clip.mp3|ARY2Z6Y1187B9BA126|/nfs/msd/mp3/MSD/...|
|TRGDPXJ128F4269C6| -1.93524985679| -0.655809808621|1616387.clip.mp3|ARW5XFX1187B9AE42A|/nfs/msd/mp3/MSD/...|
|TRJXIXT128E079275| -1.28169934715|    1.3769025696| 164092.clip.mp3|ARK9TRQ1187B99C095|/nfs/msd/mp3/MSD/...|
|TRUCKBD128F428F54| -1.05657234488|  0.736206377828|1102335.clip.mp3|ARMZFC81187B9AC52C|/nfs/msd/mp3/MSD/...|
|TRWIGBF128F424CDB| -1.70057201199|   1.84244440679|1489051.clip.mp3|ARLSF8H1187B9A76B0|/nfs/msd/mp3/MSD/...|
+-----------------+---------------+----------------+----------------+------------------+--------------------+
only showi

In [7]:
joined_deezer_written = joined_deezer.withColumn("written", to_db_udf(struct("*")))

joined_deezer_written.collect()

env = lmdb.open('data/MSD/MSD_ID_to_log_mel_spectrogram.lmdb', subdir=False,
                map_size=1e12 )

env.stat()

{'psize': 4096,
 'depth': 3,
 'branch_pages': 20,
 'leaf_pages': 2216,
 'overflow_pages': 100739872,
 'entries': 173775}

In [22]:
mtat = spark.read.option("sep", "\t").csv('/nfs/subtasks/MagnaTagATune/annotations_final.csv', header="true")

#mtat.groupBy('no voice').count().show()
mtat.show(5)

+-------+--------+------+----+--------+---------+-----+------+-----------+--------------+--------+-----+------+------------+----------+------+--------+-----+-------+-----+---+--------+-----+----------+------+-------+--------+-----+---------+------+---------+---------+-----+------+----+------+-----+-------+-----+--------+----+-------+----+------------+----+-----+-------+-------+-----+----+----------+--------------+-------+----------+----+----+-----+----------+-----+-----+-------+----+----------+--------+----------+----+--------+--------+----+-------+------+-----+----------+-------------+--------+---------+--------+---------+-----+--------+-------+------+-----+-----+-------+----+---------------+---------------+-----------+-----------+----------------+-------+------+-------+------+------+----+----------+---------+-----+-----------+----+--------+------+---+---+------------+------+-----+-------+-----+-----------+-----------+------+-----+-----+---+----+---------+----------+----+----+--------

In [9]:
joined_deezer_written.groupBy('written').count().show()

+-------+-----+
|written|count|
+-------+-----+
|   null|18334|
+-------+-----+



In [37]:
import numpy as np

env = lmdb.open('data/MSD/msdid_to_log_mel_spectrogram.lmdb', subdir=False,
                map_size=1e12 )

with env.begin() as txn:
    for (key, value), i in zip(txn.cursor(), range(5)):
        print(key)
        print(np.reshape(np.frombuffer(value), (-1, 64)).shape)


b'TRAAAAV128F421A32'
(5993, 64)
b'TRAAACV128F423E09'
(2998, 64)
b'TRAAADT12903CCC33'
(5998, 64)
b'TRAAAED128E0783FA'
(5993, 64)
b'TRAAAEF128F427342'
(2998, 64)


In [44]:
import pandas as pd

mumu_dir = '../mumu/MuMu_dataset/'
df = pd.read_csv(mumu_dir + '/MuMu_dataset_multi-label.csv')

### REMEMBER TO SUBSTRING MSD_ID

df.head()

Unnamed: 0,amazon_id,album_mbid,MSD_track_id,recording_mbid,artist_mbid,genres
0,B00005YQOV,77944b8c-f753-4c7c-84ba-a48fbf518667,TRJIKJU128F930BF28,68c38213-65ba-4c1e-ac20-76883b512993,0a6f37da-2a2a-4308-896a-7c34b968b0b3,"Vocal Jazz,Jazz,Traditional Vocal Pop,Pop,Mode..."
1,B00005YQOV,77944b8c-f753-4c7c-84ba-a48fbf518667,TRLCVKT128F930BF18,f5c25488-4fbc-4ade-9cd4-431ae3fe3737,0a6f37da-2a2a-4308-896a-7c34b968b0b3,"Vocal Jazz,Jazz,Traditional Vocal Pop,Pop,Mode..."
2,B00005YQOV,77944b8c-f753-4c7c-84ba-a48fbf518667,TRBQSIG128F930BEFC,e76bbfcc-f94a-425d-bf13-d4e8d32df173,0a6f37da-2a2a-4308-896a-7c34b968b0b3,"Vocal Jazz,Jazz,Traditional Vocal Pop,Pop,Mode..."
3,B00005YQOV,77944b8c-f753-4c7c-84ba-a48fbf518667,TRGQLER128F930BF59,9def0715-4be3-4755-b546-f0335e03447b,0a6f37da-2a2a-4308-896a-7c34b968b0b3,"Vocal Jazz,Jazz,Traditional Vocal Pop,Pop,Mode..."
4,B00005YQOV,77944b8c-f753-4c7c-84ba-a48fbf518667,TRQPGRM128F930BEEA,9fe52229-a318-499e-af96-0903294366b1,0a6f37da-2a2a-4308-896a-7c34b968b0b3,"Vocal Jazz,Jazz,Traditional Vocal Pop,Pop,Mode..."


In [50]:
import csv
from source.utils import Timer
import random


db = []

with open(mumu_dir + '/MuMu_dataset_multi-label.csv') as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    for row in csv_reader:
        if row[0] == "amazon_id":
            continue
            
        db.append(row[2])


t1, t2 = 0, 0        
for _ in range(1000):
    
    index = random.randint(0, 140000)

    with Timer() as t:
        db[index]

    t1 += t()
    
    with Timer() as t:
        df.iloc[index]["MSD_track_id"]
        
    t2 += t()
    
t1,t2

(0.0009832382202148438, 0.06783390045166016)

In [58]:
import lmdb

song_env = lmdb.open("data/MSD/MSD_ID_to_log_mel_spectrogram.lmdb", readonly=True, lock=False, 
                                    max_spare_txns=2, subdir=False, readahead=False, meminit=False)

msd_id = db[55][:17]
with song_env.begin() as txn:
    data = txn.get(msd_id.encode())
    
msd_id

'TRDTYCI128F93015F'