# rec_track_checker
- **Objectives**:
    - Loop through ALL MLHD files.
    - Check recording_mbid column for presence of any track_gid or track_gid_redirect
    - Re-write recently read chunk in CSV+ZSTD with compression lvl 10

In [1]:
import os
import pandas as pd
import numpy as np
import lib.mb as mb
import lib.load as load
import json
from tqdm import tqdm
from time import monotonic

### Loading Files

In [2]:
# Fetching file locations
MLHD_ROOT = "/data/mlhd"
WRITE_ROOT = '/home/snaek/MLHD/unk_ids/test_output/'

mlhd_files = []
MLHD_FILES = mlhd_files

for root, dirs, files in os.walk(MLHD_ROOT):
    for file in files:
        if file.endswith(".gz"):
            mlhd_files.append(os.path.join(root, file))
    
total_files = len(mlhd_files)
files_processed = 0

In [3]:
%%time
# Fetching relevant tables

MB_track = mb.get_tracks()
print("MB_track loaded")
MB_track.head()



MB_track loaded
CPU times: user 9.98 s, sys: 3.81 s, total: 13.8 s
Wall time: 26.6 s


Unnamed: 0,gid
0,9b02977e-a03b-4a6b-a9a9-06e722bdcd7a
1,43da7544-6283-3159-84f9-537fe823a1a7
2,0b6b6283-a5a8-4560-9fa8-f68a430d86ea
3,fa124f9a-d8ea-36a3-bed3-c817fdbe13e2
4,e56c6d3c-09cf-33a0-81c5-ceade77c35dc


In [4]:
%%time
# Fetching relevant tables

MB_track_redir = mb.get_track_redirects_old()
print("MB_track_redirects_old loaded")
MB_track_redir.head()



MB_track_redirects_old loaded
CPU times: user 146 ms, sys: 27.3 ms, total: 173 ms
Wall time: 345 ms


Unnamed: 0,gid
0,d8abbf14-5945-3639-a0c9-3e9c70b1c0a4
1,446b27ef-92fd-3ea1-ad0a-fe1055196406
2,403b9e19-a135-3acc-ae69-58fb0d735036
3,b61dde50-de25-3802-b706-dd0d490879ae
4,13bee970-0129-320d-94c8-dfbc7748c692


In [5]:
%%time
# sets

MB_track_set = set(MB_track.gid)
MB_track_redir_set = set(MB_track_redir.gid)

CPU times: user 9.01 s, sys: 656 ms, total: 9.67 s
Wall time: 9.58 s


In [6]:
def load_path(file_path):
    """Function to load a file and return a dataframe"""

    df = pd.read_csv(
        file_path, sep='\t',
        header=None,
        names = ['timestamp', 'artist_MBID', 'release_MBID', 'recording_MBID'],
        dtype={'artist_MBID': str, 'release_MBID': str, 'recording_MBID': str}
        )
    
    # df.drop(df[df['recording_MBID'].isna()].index, inplace=True)

    return df

In [7]:
df_large = load.read_files('random_file_paths.txt')

In [8]:
%%time
df_test = load_path(mlhd_files[50])
df_test

CPU times: user 72.3 ms, sys: 3.93 ms, total: 76.2 ms
Wall time: 74.5 ms


Unnamed: 0,timestamp,artist_MBID,release_MBID,recording_MBID
0,1169940836,18870405-17e8-42a7-8a4c-7c79b432019c,df0f8865-3770-4b65-b3f8-007c4d624e8c,c010d736-58c4-4860-be74-b2a231ce4830
1,1169941232,3132dae9-9eff-4805-ae2e-fada1af87b76,755f05f7-b994-4d15-8aa7-46c2461fcae8,250762b8-9037-41e4-8371-15002da794f1
2,1169941456,0a3dda11-e89e-42f1-8356-b7a6e5f68424,b0dd105a-e38f-366a-91e2-9b9b776d8c13,772e9d19-dd5c-4594-9c65-5588fe40223a
3,1169941664,20883363-1ea4-4d72-ad72-c0e767038f3e,a6c76014-bf81-3b15-adf5-c4175ea42c0b,2611cee8-7ebb-4bb4-8771-5172e88fa87e
4,1169942224,20883363-1ea4-4d72-ad72-c0e767038f3e,a6c76014-bf81-3b15-adf5-c4175ea42c0b,0af519f0-8326-490c-a57d-96114ed7a0fc
...,...,...,...,...
66120,1367622214,cd2c42c1-85de-4c54-87c6-d76f051e422f,,
66121,1367622427,16f7e748-59b4-4705-959c-9276be0ebf6a,,
66122,1367622661,429f9fbf-0a0b-4a9e-a88c-94d48aa466ed,c8207f15-25b4-4b6e-b4b6-2912725a1fb1,2c33a77a-86df-4f24-b9de-a4b127edd375
66123,1367622789,b24b6ae8-333e-4351-b3b7-6be843195a16,d12e0765-9f78-4aad-8d34-9df5f76034dc,


In [9]:
# df_test_positive = pd.DataFrame({
#     'timestamp': [None, None, None, None, None],
#     'artist_MBID': [None, None, None, None, None],
#     'release_MBID': [None, None, None, None, None], 
#     'recording_MBID': [None, None, None, None, None]})

# Taking a slice from original dataframe
df_test_positive = df_test.dropna().iloc[:6, :]
print("Before:\n", df_test_positive.recording_MBID.values)

df_test_positive.iloc[:4, 3] = pd.concat([MB_track.gid.iloc[:2 ], MB_track_redir.gid.iloc[:2]])

# In this data:
# First 2 rows are from MB_track
# Next 2 rows are from MB_track_redirects_old
# Last 2 rows are from MB_track

df_test_positive

Before:
 ['c010d736-58c4-4860-be74-b2a231ce4830'
 '250762b8-9037-41e4-8371-15002da794f1'
 '772e9d19-dd5c-4594-9c65-5588fe40223a'
 '2611cee8-7ebb-4bb4-8771-5172e88fa87e'
 '0af519f0-8326-490c-a57d-96114ed7a0fc'
 '42b91da5-1508-4d64-a40a-83597177c7e2']


Unnamed: 0,timestamp,artist_MBID,release_MBID,recording_MBID
0,1169940836,18870405-17e8-42a7-8a4c-7c79b432019c,df0f8865-3770-4b65-b3f8-007c4d624e8c,9b02977e-a03b-4a6b-a9a9-06e722bdcd7a
1,1169941232,3132dae9-9eff-4805-ae2e-fada1af87b76,755f05f7-b994-4d15-8aa7-46c2461fcae8,43da7544-6283-3159-84f9-537fe823a1a7
2,1169941456,0a3dda11-e89e-42f1-8356-b7a6e5f68424,b0dd105a-e38f-366a-91e2-9b9b776d8c13,d8abbf14-5945-3639-a0c9-3e9c70b1c0a4
3,1169941664,20883363-1ea4-4d72-ad72-c0e767038f3e,a6c76014-bf81-3b15-adf5-c4175ea42c0b,446b27ef-92fd-3ea1-ad0a-fe1055196406
4,1169942224,20883363-1ea4-4d72-ad72-c0e767038f3e,a6c76014-bf81-3b15-adf5-c4175ea42c0b,0af519f0-8326-490c-a57d-96114ed7a0fc
5,1169942482,20883363-1ea4-4d72-ad72-c0e767038f3e,a6c76014-bf81-3b15-adf5-c4175ea42c0b,42b91da5-1508-4d64-a40a-83597177c7e2


## Testing Files

In [10]:
def query_in(series_to_query, series_to_query_in):
    # Queries all mbids in the recording table. Returns mbids that are present in recording table.
    mbids_in_series = series_to_query_in[series_to_query_in.isin(series_to_query)]
    
    # Makes a boolean map for all mbids in series_of_mbids
    bool_map = series_to_query.isin(mbids_in_series)
    
    return bool_map

In [12]:
def check_rec(df):
    """
    Function to check if a track is in the recording table
    INPUT: dataframe with recording_MBID column. 
    OUTPUT: Returns a series of "recording-MBIDs" that are in recording table.
    """    
    # inp_series = pd.Series(df['recording_MBID'].unique())
    # in_track = query_in(inp_series, MB_track.gid)
    # in_redir = query_in(inp_series, MB_track_redir.gid)
    
    # return inp_series[in_track | in_redir]
    ret = [mbid for mbid in df.recording_MBID.tolist() if (mbid in MB_track_set) or (mbid in MB_track_redir_set)]
    if ret == []:
        return None
    else: 
        return ret


In [13]:
%%time

test_output_positive = check_rec(df_test_positive)
print(test_output_positive, "\n")

test_output_negative = check_rec(df_test)
print(test_output_negative, "\n")

['9b02977e-a03b-4a6b-a9a9-06e722bdcd7a', '43da7544-6283-3159-84f9-537fe823a1a7', 'd8abbf14-5945-3639-a0c9-3e9c70b1c0a4', '446b27ef-92fd-3ea1-ad0a-fe1055196406'] 

None 

CPU times: user 15 ms, sys: 0 ns, total: 15 ms
Wall time: 14.3 ms


In [67]:
def log_output(Series_to_log, path, time_taken, timestamp, master_log_dict):
    """
    Function to log output series and path
    """
    # If keys not in dict, generate empty list with dict
    key_set = {'path', 'logs', 'time_taken', 'timestamp'}
    
    for key in key_set:
        if key not in master_log_dict.keys():
            master_log_dict[key]=[] 
    
    master_log_dict['path'].append(path)
    master_log_dict['logs'].append(Series_to_log)
    master_log_dict['time_taken'].append(time_taken)
    master_log_dict['timestamp'].append(timestamp)
        
    return master_log_dict

In [68]:
output_log = {}

log_output(test_output_positive, "test_path", 0.55, monotonic(), output_log)
log_output(test_output_positive[:2], "test_path", 0.55, monotonic(), output_log)
log_output(test_output_negative, "test_path", 0.55, monotonic(), output_log)

print(output_log)

{'logs': [['9b02977e-a03b-4a6b-a9a9-06e722bdcd7a', '43da7544-6283-3159-84f9-537fe823a1a7', 'd8abbf14-5945-3639-a0c9-3e9c70b1c0a4', '446b27ef-92fd-3ea1-ad0a-fe1055196406'], ['9b02977e-a03b-4a6b-a9a9-06e722bdcd7a', '43da7544-6283-3159-84f9-537fe823a1a7'], None], 'time_taken': [0.55, 0.55, 0.55], 'path': ['test_path', 'test_path', 'test_path'], 'timestamp': [7533943.827020003, 7533943.827115547, 7533943.827191365]}


In [36]:
def write_log(log_dict, log_path):
    """Function to update log"""
    # Make directory inside WRITE_ROOT if it doesn't exist
    os.makedirs(os.path.dirname(log_path), exist_ok=True)

    with open(log_path, 'w') as f:
        json.dump(log_dict, f)
    return log_path

## Re-Writing

In [17]:
%%time
df_test.to_csv(
    'unk_ids/test.csv.zst',
    index=False, 
    sep='\t',
    header=False, 
    compression={'method': 'zstd', 'level': 10},
    )

CPU times: user 310 ms, sys: 7.89 ms, total: 318 ms
Wall time: 347 ms


In [18]:
load_path('unk_ids/test.csv.zst')

Unnamed: 0,timestamp,artist_MBID,release_MBID,recording_MBID
0,1169940836,18870405-17e8-42a7-8a4c-7c79b432019c,df0f8865-3770-4b65-b3f8-007c4d624e8c,c010d736-58c4-4860-be74-b2a231ce4830
1,1169941232,3132dae9-9eff-4805-ae2e-fada1af87b76,755f05f7-b994-4d15-8aa7-46c2461fcae8,250762b8-9037-41e4-8371-15002da794f1
2,1169941456,0a3dda11-e89e-42f1-8356-b7a6e5f68424,b0dd105a-e38f-366a-91e2-9b9b776d8c13,772e9d19-dd5c-4594-9c65-5588fe40223a
3,1169941664,20883363-1ea4-4d72-ad72-c0e767038f3e,a6c76014-bf81-3b15-adf5-c4175ea42c0b,2611cee8-7ebb-4bb4-8771-5172e88fa87e
4,1169942224,20883363-1ea4-4d72-ad72-c0e767038f3e,a6c76014-bf81-3b15-adf5-c4175ea42c0b,0af519f0-8326-490c-a57d-96114ed7a0fc
...,...,...,...,...
66120,1367622214,cd2c42c1-85de-4c54-87c6-d76f051e422f,,
66121,1367622427,16f7e748-59b4-4705-959c-9276be0ebf6a,,
66122,1367622661,429f9fbf-0a0b-4a9e-a88c-94d48aa466ed,c8207f15-25b4-4b6e-b4b6-2912725a1fb1,2c33a77a-86df-4f24-b9de-a4b127edd375
66123,1367622789,b24b6ae8-333e-4351-b3b7-6be843195a16,d12e0765-9f78-4aad-8d34-9df5f76034dc,


In [19]:
def write_frame(df_input, original_path):
    """
    Function to write a dataframe to a csv file
    """
    
    # Replace MLHD_ROOT with path to new MLHD folder.
    write_path = original_path.replace(MLHD_ROOT+'/', WRITE_ROOT)
    write_path = write_path.replace('txt.gz', 'csv.zst')
    
    # print(write_path)
    # Make directory inside WRITE_ROOT if it doesn't exist
    os.makedirs(os.path.dirname(write_path), exist_ok=True)

    df_input.to_csv(
        write_path,
        index=False, 
        sep='\t',
        header=False, 
        compression={'method': 'zstd', 'level': 10},
        )

    return write_path

In [20]:
%%time

load_path(write_frame(df_test, MLHD_FILES[50]))

CPU times: user 347 ms, sys: 3.9 ms, total: 351 ms
Wall time: 349 ms


Unnamed: 0,timestamp,artist_MBID,release_MBID,recording_MBID
0,1169940836,18870405-17e8-42a7-8a4c-7c79b432019c,df0f8865-3770-4b65-b3f8-007c4d624e8c,c010d736-58c4-4860-be74-b2a231ce4830
1,1169941232,3132dae9-9eff-4805-ae2e-fada1af87b76,755f05f7-b994-4d15-8aa7-46c2461fcae8,250762b8-9037-41e4-8371-15002da794f1
2,1169941456,0a3dda11-e89e-42f1-8356-b7a6e5f68424,b0dd105a-e38f-366a-91e2-9b9b776d8c13,772e9d19-dd5c-4594-9c65-5588fe40223a
3,1169941664,20883363-1ea4-4d72-ad72-c0e767038f3e,a6c76014-bf81-3b15-adf5-c4175ea42c0b,2611cee8-7ebb-4bb4-8771-5172e88fa87e
4,1169942224,20883363-1ea4-4d72-ad72-c0e767038f3e,a6c76014-bf81-3b15-adf5-c4175ea42c0b,0af519f0-8326-490c-a57d-96114ed7a0fc
...,...,...,...,...
66120,1367622214,cd2c42c1-85de-4c54-87c6-d76f051e422f,,
66121,1367622427,16f7e748-59b4-4705-959c-9276be0ebf6a,,
66122,1367622661,429f9fbf-0a0b-4a9e-a88c-94d48aa466ed,c8207f15-25b4-4b6e-b4b6-2912725a1fb1,2c33a77a-86df-4f24-b9de-a4b127edd375
66123,1367622789,b24b6ae8-333e-4351-b3b7-6be843195a16,d12e0765-9f78-4aad-8d34-9df5f76034dc,


### Algorithm

```
- define output_log_dict
- define total_files
- define files_processed

for path in mlhd_files:
    
    1) Start time log
    2) Read file into dataframe df
    3) output = check_rec(df)
    4) Write original df to txt+zstd
    5) Stop time log
    6) log_output(output, path, start_time - stop_time, output_log_dict)
    7) files_processed += 1
```

### To-Do
- Pandas.Series.isin() scales really well. Try processing multiple files in a for loop instead of a single file, and benchmark the performance difference.
- Define Global variables for storage paths, etc. (config.json?)
- Make a progressbar

In [70]:
%%time
output_log = {
    'path': [], 
    'logs': [],
    'time_taken': [],
    }
LOG_WRITE_PATH = "/home/snaek/MLHD/rec_track_checker/MLHD/log.json"
LOG_EPOCH = 10

file_counter = 0

master_start = monotonic()
for path in tqdm(MLHD_FILES[:20], desc="Writing files", unit = 'Files'):
    start = monotonic()
    # Start Processing
    df_loop = load_path(path)

    output = check_rec(df_loop)
    
    _ = write_frame(df_loop, path)
    
    # End Processing
    end = monotonic()

    # Logging

    file_counter += 1
    
    log_output(output, path, round(end - start, 3), monotonic(), output_log)
    if file_counter%10 == 0:
        _ = write_log(output_log, LOG_WRITE_PATH)

master_end = monotonic()
print("Total Time Taken: ", round(master_end - master_start, 3))
# print(output_log)

Writing files: 100%|██████████| 20/20 [00:04<00:00,  4.98Files/s]

Total Time Taken:  4.023
CPU times: user 3.91 s, sys: 29.1 ms, total: 3.94 s
Wall time: 4.02 s





In [29]:
log_df = pd.DataFrame(output_log)
print("avg time taken per file: ", log_df.time_taken.mean())
print("File Not None: ", log_df.logs.value_counts())

avg time taken per file:  0.2104
File Not None:  Series([], Name: logs, dtype: int64)
