## **text2music (Mubert & MiniLM-L6-v2) 🎶🎻🎧🥁🎹**

<br>

## The **text2music** notebook lets you generate the following from a text prompt:
* ## **fixed-length audio clips (in mp3 format)**
* ## **royalty-free audio streams**

<br>

## This is made possible with:
* ## [Mubert](https://mubert.com), an awesome AI music service
* ## The [MiniLM-L6-v2](https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2) sentence-transformers model

<br>

## This notebook was originally forked from https://github.com/MubertAI/Mubert-Text-to-Music. Updates to the original so far:
* ## Playing audio streams from a text prompt was added

<br>

## TODO LIST (made by [@3chain](http://twitter.com/web3chain))
* ### Add more algorithms for picking streams.
  * #### At present it bundles all the Category, Group and Channel names into one list and uses the language model to pick the top X streams that best fit your prompt. I also want to pick hierarchically: first pick a Category this way, then pick a Group from only that Category, then pick a Channel from that Group. This may give different stream results 🤓
* ### Add a user-friendly way to browse all of Mubert's Categories/Groups/Channels and pick one.
  * #### At present you can print a large table of everything, scroll through it, select a playlist ID and fill it in manually, but this could be smoother 😊
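
The hierarchical picking idea from the first TODO item could look roughly like the sketch below. It is not part of this notebook: `toy_embed`, `pick_best`, `pick_hierarchically` and `SAMPLE_TREE` are hypothetical names, the toy bag-of-words embedding merely stands in for the MiniLM model, and apart from 'Moods' (0), 'Energizing' (0.1) and 'Pumped' (0.1.0) the names and playlist IDs in the sample tree are made up for illustration.

```python
import numpy as np

# Hypothetical sample shaped like the nested categories/groups/channels data used in this notebook.
# Only 'Moods', 'Energizing' and 'Pumped' carry IDs mentioned in the notebook; the rest is invented.
SAMPLE_TREE = [
    {"name": "Moods", "playlist": "0", "groups": [
        {"name": "Energizing", "playlist": "0.1", "channels": [
            {"name": "Pumped", "playlist": "0.1.0"}]}]},
    {"name": "Genres", "playlist": "6", "groups": [
        {"name": "Techno", "playlist": "6.5", "channels": [
            {"name": "Dark Techno", "playlist": "6.5.0"},
            {"name": "Melodic Techno", "playlist": "6.5.1"}]}]},
]

def toy_embed(texts):
    """Toy stand-in for a sentence embedding: word counts over the words in `texts`."""
    vocab = sorted({word for text in texts for word in text.lower().split()})
    index = {word: i for i, word in enumerate(vocab)}
    vectors = np.zeros((len(texts), len(vocab)))
    for row, text in enumerate(texts):
        for word in text.lower().split():
            vectors[row, index[word]] += 1
    return vectors

def pick_best(prompt, items, embed=toy_embed):
    """Return the item whose 'name' has the highest cosine similarity to the prompt."""
    vectors = embed([prompt] + [item["name"] for item in items])
    query, candidates = vectors[0], vectors[1:]
    norms = np.linalg.norm(candidates, axis=1) * np.linalg.norm(query)
    similarity = candidates @ query / np.where(norms == 0, 1, norms)
    return items[int(np.argmax(similarity))]

def pick_hierarchically(prompt, categories, embed=toy_embed):
    """Pick a Category first, then a Group inside it, then a Channel inside that Group."""
    category = pick_best(prompt, categories, embed)
    group = pick_best(prompt, category["groups"], embed)
    channel = pick_best(prompt, group["channels"], embed)
    return category, group, channel

category, group, channel = pick_hierarchically("dark techno genres", SAMPLE_TREE)
print(category["name"], ">", group["name"], ">", channel["name"], channel["playlist"])
```

Assuming the embedding function accepts a list of strings and returns one vector per string, the notebook's own model could be swapped in with `embed=minilm.encode` once section 1.3 has run.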

# 1. SETUP

In [None]:
#@title ## 1.0 Setup logging 
#@markdown 'WARNING' is the default logging level.
import logging
logging_level = "WARNING" #@param ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
logger = logging.getLogger()          # Gets the root logger
logger.setLevel(level=logging_level)  # Set logging level via logger (can be set directly in logging, but then getting logging level name is messier)
print(f'Logging level is: {logging.getLevelName(logger.level)} \n')





In [None]:
#@title ## 1.1 Setup Environment
import subprocess, time
print("Setting up environment...")
start_time = time.time()
all_process = [
    ['pip', 'install', 'torch==1.12.1+cu113', 'torchvision==0.13.1+cu113', '--extra-index-url', 'https://download.pytorch.org/whl/cu113'],
    ['pip', 'install', '-U', 'sentence-transformers'],
    ['pip', 'install', 'httpx'],
    ['pip', 'install', 'simple_term_menu']
]
for process in all_process:
    running = subprocess.run(process,stdout=subprocess.PIPE).stdout.decode('utf-8')

end_time = time.time()
print(f"Environment set up in {end_time-start_time:.0f} seconds")

Setting up environment...
Environment set up in 34 seconds


In [None]:
#@title ## 1.2 Setup custom functions

'''
print_html() displays text as styled HTML in the cell output.
text - text to print
font_family - Any Google font family, e.g. Montserrat (see https://fonts.google.com/)
paragraph_type - HTML header style, e.g. h1, h3 (default: h1)
font_color - CSS color string (default: white)
'''
def print_html(text,font_family='Montserrat',paragraph_type='h1', font_color='white'):
  from IPython.display import HTML
  html_text = f"<html><head><style>@import url(https://fonts.googleapis.com/css?family={font_family});{paragraph_type}" + '{' + f"font-family:'{font_family}',serif;color:{font_color};" + '}' + f"</style></head><body><{paragraph_type}>{text}</{paragraph_type}></body></html>"
  #print(f'text = {text} \nfont_family={font_family}, paragraph_type={paragraph_type}, font_color={font_color} \nhtml_text string is: \n{html_text}')
  html = HTML(data=html_text)
  display(html)

'''
input_html() function is just a wrapper for print_html() but adds an input() statement..
'''
def input_html(text,font_family='Montserrat',paragraph_type='h1', font_color='white'):
  print_html( **{key: value for key, value in locals().items() if key != 'self'} )  # this great trick, a passthrough for all arguments, can be found here: https://stackoverflow.com/a/53997999/1881896
  return input('\n')

# TODO: TRYING TO DISPLAY JSON AS A DIAGRAM - CURRENTLY NOT WORKING
def display_json_as_diagram(json_data):
  edges = []
  def get_edges(treedict, parent=None):
      name = next(iter(treedict.keys()))
      if parent is not None:
          edges.append((parent, name))
      for item in treedict[name]["children"]:
          if isinstance(item, dict):
              get_edges(item, parent=name)
          else:
              edges.append((name, item))
  get_edges(json_data)
  # Dump edge list in Graphviz DOT format
  print('strict digraph tree {')
  for row in edges:
      print('    {0} -> {1};'.format(*row))
  print('}')


In [None]:
#@title ## 1.3 Setup Mubert API

import numpy as np
from sentence_transformers import SentenceTransformer
from IPython.display import Audio, display
import httpx
import json

# MiniLM is a family of small pre-trained language models created by Microsoft. See https://github.com/microsoft/unilm/tree/master/minilm 
# The model used in this Notebook is https://huggingface.co/sentence-transformers/all-MiniLM-L6-v2 
minilm = SentenceTransformer('all-MiniLM-L6-v2') 

# Endpoints for API calls are specified in the Mubert documentation: https://mubert2.docs.apiary.io/
# GLOBAL VARIABLES (constants)
RECORD_TRACK_ENDPOINT = 'https://api-b2b.mubert.com/v2/RecordTrackTTM'  # make sure there's no '/' at the end..
RECORD_TRACK_METHOD = 'RecordTrackTTM'
GET_CHANNELS_ENDPOINT = 'https://api-b2b.mubert.com/v2/GetPlayMusic'
GET_CHANNELS_METHOD = 'GetPlayMusic'

# The following tags string can also be retrieved via the Mubert endpoint https://api-b2b.mubert.com/v2/GetPlayMusic
MUBERT_CUSTOM_TAGS_STRING = 'tribal,action,kids,neo-classic,run 130,pumped,jazz / funk,ethnic,dubtechno,reggae,acid jazz,liquidfunk,funk,witch house,tech house,underground,artists,mystical,disco,sensorium,r&b,agender,psychedelic trance / psytrance,peaceful,run 140,piano,run 160,setting,meditation,christmas,ambient,horror,cinematic,electro house,idm,bass,minimal,underscore,drums,glitchy,beautiful,technology,tribal house,country pop,jazz & funk,documentary,space,classical,valentines,chillstep,experimental,trap,new jack swing,drama,post-rock,tense,corporate,neutral,happy,analog,funky,spiritual,sberzvuk special,chill hop,dramatic,catchy,holidays,fitness 90,optimistic,orchestra,acid techno,energizing,romantic,minimal house,breaks,hyper pop,warm up,dreamy,dark,urban,microfunk,dub,nu disco,vogue,keys,hardcore,aggressive,indie,electro funk,beauty,relaxing,trance,pop,hiphop,soft,acoustic,chillrave / ethno-house,deep techno,angry,dance,fun,dubstep,tropical,latin pop,heroic,world music,inspirational,uplifting,atmosphere,art,epic,advertising,chillout,scary,spooky,slow ballad,saxophone,summer,erotic,jazzy,energy 100,kara mar,xmas,atmospheric,indie pop,hip-hop,yoga,reggaeton,lounge,travel,running,folk,chillrave & ethno-house,detective,darkambient,chill,fantasy,minimal techno,special,night,tropical house,downtempo,lullaby,meditative,upbeat,glitch hop,fitness,neurofunk,sexual,indie rock,future pop,jazz,cyberpunk,melancholic,happy hardcore,family / kids,synths,electric guitar,comedy,psychedelic trance & psytrance,edm,psychedelic rock,calm,zen,bells,podcast,melodic house,ethnic percussion,nature,heavy,bassline,indie dance,techno,drumnbass,synth pop,vaporwave,sad,8-bit,chillgressive,deep,orchestral,futuristic,hardtechno,nostalgic,big room,sci-fi,tutorial,joyful,pads,minimal 170,drill,ethnic 108,amusing,sleepy ambient,psychill,italo disco,lofi,house,acoustic guitar,bassline house,rock,k-pop,synthwave,deep house,electronica,gabber,nightlife,sport & fitness,road trip,celebration,electro,disco house,electronic'

def play_audio_from_url(url, maximum_iterations=20, autoplay=True):
  # Polls the URL about once per second until the track is ready (HTTP 200), then displays
  # (and optionally autoplays) it. Gives up after 'maximum_iterations' attempts, which defaults to 20.
  for i in range(maximum_iterations):
    r = httpx.get(url)                           # retrieve the mp3 ***NOTE: this isn't necessary to get the actual data, it's just to check we have a 200 code before we create an 'Audio' object
    if r.status_code == 200:
        display(Audio(url, autoplay=autoplay))   # play the mp3
        break
    time.sleep(1)
    print('.', end='')

# This function uses the RecordTrackTTM endpoint/method, which oddly is not specified in the API
# documentation linked above. Only an endpoint/method called "RecordTrack" is specified there.
# 'RecordTrackTTM' does work fine, however!
def get_track_by_tags(tags, pat, duration, maxit=20, autoplay=False, loop=False):
  global RECORD_TRACK_ENDPOINT
  global RECORD_TRACK_METHOD
  if loop:
    mode = "loop"
  else:
    mode = "track"
  # NOTE: In the original version of this Notebook, this is the only API call used!
  #       There is another HTTP-GET later on, but that's just to retrieve the mp3 with 
  #       the unique URL returned by the RecordTrack(TTM) method 
  r = httpx.post(f'{RECORD_TRACK_ENDPOINT}',
      json={
          "method": RECORD_TRACK_METHOD,
          "params": {
              "pat": pat, 
              "duration": duration,
              "tags": tags,
              "mode": mode
          }
      })

  rdata = json.loads(r.text)
  #breakpoint() #DEBUG
  #display_json_as_diagram(rdata) #TODO for displaying channels as a tree diagram - currently not working 
  assert rdata['status'] == 1, rdata['error']['text']
  trackurl = rdata['data']['tasks'][0]['download_link']  # get the download link URL
  print('Generating track ', end='')
  print(trackurl) # DEBUG
  play_audio_from_url(trackurl, autoplay = autoplay, maximum_iterations = maxit)                             # play audio from url
  

''' FUNCTION: find_similar
    Compares the embedding of your prompt ('em') with the embedding of every Mubert tag
    ('embeddings') and returns the distance scores plus the tag indices sorted from best
    to worst match (a lower score means a better match).
    You CANNOT decide how many tags you want HERE, but you can do that in the
    'get_tags_for_prompts' function below by changing its 'top_n' parameter. '''
def find_similar(em, embeddings, method='cosine'):
  scores = []
  for ref in embeddings:
      if method == 'cosine': 
          scores.append(1 - np.dot(ref, em)/(np.linalg.norm(ref)*np.linalg.norm(em)))
      if method == 'norm': 
          scores.append(np.linalg.norm(ref - em))
  return np.array(scores), np.argsort(scores)

def get_tags_for_prompts(prompts, top_n=3, debug=True, tags='MUBERT_CUSTOM_TAGS_STRING', channels_list=[]):
  print(f'length of channels_list provided: {len(channels_list)}') #DEBUG
  global MUBERT_CUSTOM_TAGS_STRING 
  tags_list = []
  if tags == 'MUBERT_CUSTOM_TAGS_STRING':
    tags_list = MUBERT_CUSTOM_TAGS_STRING.split(',')
  elif tags == 'MUBERT_CHANNELS_LIST':
    tags_list = channels_list
  else:
    print(f'Not sure what tags list to use.. Defaulting to MUBERT_CUSTOM_TAGS_STRING')
    tags_list = MUBERT_CUSTOM_TAGS_STRING.split(',')
  mubert_tags = np.array(tags_list)                     # Put tags in Numpy array
  mubert_tags_embeddings = minilm.encode(mubert_tags)   # Encode Mubert tags with the MiniLM language model 
  prompts_embeddings = minilm.encode(prompts)           # Encode prompts  with the MiniLM language model 
  top_tags_list = []
  for i, pe in enumerate(prompts_embeddings):
      scores, idxs = find_similar(pe, mubert_tags_embeddings) # compares Mubert tags to the prompts you provided and scores them based on similarity (see function above)
      top_tags = mubert_tags[idxs[:top_n]]                    # selects the top X tags depending on how many specified as an argument for parameter 'top_n' in this function
      top_prob = 1 - scores[idxs[:top_n]]                     # converts the distances back to cosine similarities for each top X tag (just for displaying - see below)
      if debug:
          print(f"Prompt: {prompts[i]}\nTags: {', '.join(top_tags)}\nScores: {top_prob}\n\n\n")   # tells the user what the top X tags are and their similarity scores
      top_tags_list.append((prompts[i], list(top_tags)))
  return top_tags_list
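
The ranking done by `find_similar` and `get_tags_for_prompts` can be traced by hand on tiny vectors. The sketch below is a vectorized variant written for illustration only: `cosine_distances`, the three tags and their 2-D "embeddings" are made up, and the real embeddings produced by the MiniLM model have many more dimensions.

```python
import numpy as np

def cosine_distances(query, references):
    """Return 1 - cosine similarity between `query` and every row of `references`."""
    query = np.asarray(query, dtype=float)
    references = np.asarray(references, dtype=float)
    similarity = references @ query / (np.linalg.norm(references, axis=1) * np.linalg.norm(query))
    return 1 - similarity

# Made-up tags and 2-D vectors; real embeddings come from the language model.
tags = np.array(["techno", "ambient", "jazz"])
tag_vectors = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
prompt_vector = [2.0, 0.1]

distances = cosine_distances(prompt_vector, tag_vectors)
order = np.argsort(distances)          # smallest distance (best match) first
top_tags = tags[order[:2]]             # same idea as top_n=2
top_similarities = 1 - distances[order[:2]]

print(list(top_tags))
print(top_similarities.round(3))
```

The prompt vector points almost exactly along the "techno" axis, so "techno" ranks first and "jazz", which shares half of that direction, ranks second.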


# 2. SETUP - USER INPUT NEEDED

In [None]:
#@title ## 2.1 Enter **either** your Mubert registered email **or** PAT access token
##@markdown This section receives a PAT access token then gets the channels   

from IPython.display import clear_output 

MUBERT_REGISTERED_EMAIL = "" #@param {type:"string"}
user_pat_token = '' #@param{type:'string'}
verify_pat_token_is_working = True #@param{type:'boolean'}

#@markdown <br>
#@markdown
#@markdown ## 👆 **Notes about these parameters** 👆
#@markdown * <code><font size="5">MUBERT_REGISTERED_EMAIL</font></code><font size=4> - Email used to register on Mubert 
#@markdown
#@markdown * <code><font size="5">user_pat_token</font></code><font size="4"> - If you already have a PAT token you can just enter it here. <br><font size=3>*NOTE: this is not secure, but anyone who knows the email address can also retrieve this token.*
#@markdown
#@markdown * <code><font size="5">verify_pat_token_is_working</font></code><font size="4"> - When checked, the PAT token entered above is verified with a call to the Mubert API before it is used.

def api_get_service_access(email):
  response = httpx.post('https://api-b2b.mubert.com/v2/GetServiceAccess', 
      json={
          "method":"GetServiceAccess",
          "params": {
              "email": email,
              "license":"ttmmubertlicense#f0acYBenRcfeFpNT4wpYGaTQIyDI4mJGv5MfIhBFz97NXDwDNFHmMRsBSzmGsJwbTpP1A6i07AXcIeAHo5",
              "token":"4951f6428e83172a4f39de05d5b3ab10d58560b8",
              "mode": "loop"
              }})
  response_as_json = json.loads(response.text)
  if response_as_json['status'] == 1:
    pat = response_as_json['data']['pat']
    return pat
  else:
    print_html('Failed to get PAT token, most likely the email is incorrect/ not registered with Mubert.', paragraph_type='h3')
    return ""

def get_user_pat_token():
  import httpx
  import json
  global user_pat_token
  global MUBERT_REGISTERED_EMAIL
  pat_token_provided = user_pat_token
  pat_token_verified_ok = False
  
  if pat_token_provided != "":
    if verify_pat_token_is_working:
      logger.info('Found a PAT token, verifying it works.. \n')
      response = httpx.post('https://api-b2b.mubert.com/v2/GetPlayMusic',
                    json={
                        "method":"GetPlayMusic",
                        "params":{
                            "pat":pat_token_provided
                            }
                          })
      response_as_json = json.loads(response.text)
      from pprint import pprint
      #pprint(response_as_json)
      if response.status_code == 200 and response_as_json['status'] == 1:
        pat_token_verified_ok = True
        logger.info('OK, PAT token is valid.. \n')
      else:
        print_html('The PAT token provided doesn\'t seem to work. Let\'s get you a new one!..', paragraph_type='h3')
        pat_token_provided = ""

  if pat_token_provided != "" and (pat_token_verified_ok or not verify_pat_token_is_working):
    print_html(f'Here\'s the Mubert PAT token you entered 👇 \n')
    print_html(f'{pat_token_provided}', paragraph_type='h3')
    return pat_token_provided

  user_email = MUBERT_REGISTERED_EMAIL
  if user_email == "":
    user_input = input_html(f'Please enter the email address that has been registered with Mubert (or <a href="https://mubert.com/render/sign-up">you can sign up for Mubert here</a>): \n', paragraph_type='h3')
    user_email = user_input

  # Get a (new) PAT token
  pat_retrieved_from_mubert = api_get_service_access(user_email)

  if pat_retrieved_from_mubert == "":
    print_html(f'Attempted to get you a PAT token, but failed! Please check the email provided is correct and that it\'s the one used to register your Mubert account! 👇 \n{user_email}', paragraph_type='h3')
    print_html(f'Unfortunately this means that we can\'t make music files. It <b>is</b> possible to get STREAMS however, as this doesn\'t require a personal PAT token!', paragraph_type='h3')
    return ""

  else:
    clear_output()
    print_html(f'Got your personal Mubert PAT token! 👇 \n')
    print_html(f'{pat_retrieved_from_mubert}', paragraph_type='h3')
    print_html(f'\n👆 You can save this token for next time 👆')
    user_pat_token = pat_retrieved_from_mubert
    return pat_retrieved_from_mubert

# Now run it!
user_pat_token = get_user_pat_token()

In [None]:
#@title ## 2.2 Get a PAT token for streaming from the Mubert streams page
user_streaming_token = "bXViZXJ0Zm9yc3RyZWFtZXJzLjE3MzY0NTQwLjAzZTgxNTg5NzJmMWMzM2IxM2Y2ZDVlOWQ2ZWI3MTdkYTNkOTM3NTcuMS4z.10c5849e0e4f405cb163fdbefe43db9d61da5b2b6ecbae63168b34b9f836cd23" #@param{type:'string'}
get_new_streaming_pat_token = True #@param{type:'boolean'}
#@markdown ----
#@markdown ### 👆 **Notes about parameters** 👆

#@markdown * ### `user_streaming_token` - You CANNOT use the personal Mubert PAT token for streaming. No problem, just check the `get_new_streaming_pat_token` checkbox and we'll help you get a streaming token! Once you've got it, you can enter it here for next time!
#@markdown * ### `get_new_streaming_pat_token` - Check to get a (new) streaming token

def get_pat_from_mubert_streamers_page():
  from IPython.display import IFrame
  from IPython.display import clear_output 
  from urllib.parse import urlparse
  from urllib.parse import parse_qs
  
  display(IFrame('https://streamers.mubert.com/', width=600, height=440))
  copy_mubert_link_instructions = '👆 Wait until the Mubert web page has loaded in the frame above.. 👆 then.. <br><ol><li>press "Copy URL" next to the first playlist you see<br><li>copy the link with the copy button <br> <li>paste the link into the field below and hit enter'
  text_with_link_to_streams_page = "(btw if you want to check out the streams page here's the link: <a href='https://streamers.mubert.com/'>https://streamers.mubert.com</a>). But this Notebook can access many more streams!.."
  print_html(copy_mubert_link_instructions)
  print_html(text_with_link_to_streams_page, paragraph_type='h2')
  print_html('👇 Paste link you copied here and hit enter 👇')
  usr_input = input('\n')
  # clear outputs first..
  clear_output()
  #mubert_stream_url = "https://stream.mubert.com/b2b/v2?playlist=6.5&intensity=medium&pat=bXViZXJ0Zm9yc3RyZWFtZXJzLjE3MzY0NTQwLjAzZTgxNTg5NzJmMWMzM2IxM2Y2ZDVlOWQ2ZWI3MTdkYTNkOTM3NTcuMS4z.10c5849e0e4f405cb163fdbefe43db9d61da5b2b6ecbae63168b34b9f836cd23" #@param{type:'string'}
  mubert_stream_url = usr_input
  parsed_url = urlparse(mubert_stream_url)
  pat = parse_qs(parsed_url.query)['pat'][0]
  print_html(f'PAT token extracted from the link')
  print_html(f'{pat}', paragraph_type='h3')
  #print(f'PAT token extracted from the link is: \n{pat}')
  return pat

# Now run it!
if user_streaming_token == "" or get_new_streaming_pat_token:
  user_streaming_token = get_pat_from_mubert_streamers_page()
else:
  print_html(f'The streaming PAT token provided will be used')
  print_html(f'{user_streaming_token}', paragraph_type='h3')
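
Extracting the token from the pasted link raises a `KeyError` when the link has no `pat` parameter. A more forgiving variant might look like the sketch below; `extract_pat` and `EXAMPLE_TOKEN` are hypothetical, and the URL shape is taken from the commented-out example in the cell above.

```python
from urllib.parse import urlparse, parse_qs

def extract_pat(stream_url):
    """Return the 'pat' query parameter of a stream URL, or None if it is missing."""
    query = parse_qs(urlparse(stream_url.strip()).query)
    values = query.get("pat")
    return values[0] if values else None

# EXAMPLE_TOKEN is a placeholder, not a real token.
example_url = "https://stream.mubert.com/b2b/v2?playlist=6.5&intensity=medium&pat=EXAMPLE_TOKEN"
print(extract_pat(example_url))
print(extract_pat("https://streamers.mubert.com/"))
```

Returning `None` lets the caller show a friendly message and ask for the link again instead of stopping with a traceback.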



In [None]:
#@title ## 2.3 Load Mubert channels and *optionally* display them in a table

how_do_you_want_your_channels = "In a large table" #@param ['In a large table', 'Let me search', 'Let me browse channels', 'Give me a menu', 'Load channels only (no display)'] {type:'string'}

#@markdown ----
#@markdown ## **LFG! - If `how_do_you_want_your_channels` is set to `In a large table`, pick a playlist ID from the table displayed below and enter it in section 4.2 to play that stream. Some more info:** 

#@markdown * #### In the table, the ID in square brackets next to each item is its Mubert playlist ID.
#@markdown * #### For example the 'Pumped' Channel can be played with ID: **0.1.0** <br>
#@markdown * #### **NOT ONLY Channels can be played. Groups and Categories can also be played. <br>When you use a Category or Group ID, then the AI uses ALL the underlying channels to compose the stream! So for example:**
#@markdown * #### the Category 'Moods' can be played with ID: **0**
#@markdown * #### the Group 'Energizing' can be played with ID: **0.1**
#@markdown <br>

#@markdown #### **After choosing a playlist you can remove the table by clicking the little symbol next to the 'Category' column header.**

list_of_mubert_categories = []
list_of_mubert_groups = []
list_of_mubert_channels = []
dict_of_cats_grps_chns = dict()

def ui_display_mubert_channels():
  exit_now = False
  while exit_now == False:
    choice = input(
      f"MENU: how would you like your channels displayed \n"
      "\t1.  In a large table \n"
      "\t2.  Let me search \n"
      "\t3.  Let me browse channels \n"
      "\t9.  exit \n")
    if choice == '1':
      load_all_mubert_channels(user_pat_token, display=True)  # the PAT token is a required argument
    if choice in ['q','exit','bye','-1','9']: 
      exit_now = True
  

def load_all_mubert_channels(pat, display=False):
  # These are lists of TUPLES, where each Tuple is made up of (name,playlist)
  global list_of_mubert_categories
  global list_of_mubert_groups
  global list_of_mubert_channels
  global dict_of_cats_grps_chns
  r = httpx.post(
      f'https://api-b2b.mubert.com/v2/GetPlayMusic',
      json={
          "method": 'GetPlayMusic',
          "params": {
              "pat": pat}})
  #print(r.text)
  get_play_music_api_json_response = json.loads(r.text)
  #print(channels_dict)
  #print(json.dumps(get_play_music_api_json_response, indent=4))  # DEBUG **CAREFUL** LARGE OUTPUT: PRETTY PRINT JSON RESPONSE :)
  mubert_channels = get_play_music_api_json_response['data']['categories']

  # Create lists of categories, groups and channels and print if required
  if display:
    table_format = '{:<25} {:<25} {:<25}'
    print(table_format.format(f'Category [playlist ID]', 'Group [playlist ID]', 'Channel [playlist ID]'))
  for category in mubert_channels:
    list_of_mubert_categories.append(category["name"])
    dict_of_cats_grps_chns[category['name']] = category["playlist"]
    for group in category["groups"]:
      list_of_mubert_groups.append(group["name"])
      dict_of_cats_grps_chns[group['name']] = group["playlist"]
      for channel in group["channels"]:
        list_of_mubert_channels.append(channel["name"])
        dict_of_cats_grps_chns[channel['name']] = channel["playlist"]
        if display:
          print(table_format.format(
              f'[{category["playlist"]}] {category["name"]}', 
              f'[{group["playlist"]}] {group["name"]}', 
              f'[{channel["playlist"]}] {channel["name"]}'
              )
          )
  return mubert_channels


# Now load the channels!
if not user_pat_token:
  print_html('We don\'t have a PAT token. You need to run section 2.1 first (or maybe you need to do a "Run All") to load a PAT token \n')
elif how_do_you_want_your_channels == "In a large table":
  mubert_channels = load_all_mubert_channels(user_pat_token, display = True)
elif how_do_you_want_your_channels == "Give me a menu":
  ui_display_mubert_channels()
elif how_do_you_want_your_channels == "Load channels only (no display)":
  mubert_channels = load_all_mubert_channels(user_pat_token, display = False)
else:
  print_html(f'Sorry, <code>&nbsp;{how_do_you_want_your_channels}&nbsp;</code> hasn\'t been implemented yet for the <code>&nbsp;how_do_you_want_your_channels&nbsp;</code> parameter!', paragraph_type='h3')
  mubert_channels = load_all_mubert_channels(user_pat_token, display = False)
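
The `Let me search` option is not implemented yet. One possible shape for it is sketched below, assuming the nested categories/groups/channels structure that `load_all_mubert_channels` walks through. `flatten_channels`, `search_playlists` and the `sample` data are hypothetical; the three playlist IDs are the ones quoted in the notes of this section.

```python
def flatten_channels(categories):
    """Map every Category, Group and Channel name to its playlist ID."""
    mapping = {}
    for category in categories:
        mapping[category["name"]] = category["playlist"]
        for group in category["groups"]:
            mapping[group["name"]] = group["playlist"]
            for channel in group["channels"]:
                mapping[channel["name"]] = channel["playlist"]
    return mapping

def search_playlists(mapping, term):
    """Case-insensitive substring search; returns (name, playlist ID) pairs sorted by name."""
    term = term.lower()
    return sorted((name, pid) for name, pid in mapping.items() if term in name.lower())

# Tiny hand-written sample in the same shape as the API data used above.
sample = [{"name": "Moods", "playlist": "0", "groups": [
    {"name": "Energizing", "playlist": "0.1", "channels": [
        {"name": "Pumped", "playlist": "0.1.0"}]}]}]

mapping = flatten_channels(sample)
print(search_playlists(mapping, "pump"))
```

As in the notebook's own dictionary, a name that appears at more than one level keeps only the last playlist ID seen.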


# 4. PLAY STUFF!

In [None]:
#@title ## 4.1 🎵 Generate Music! (mp3 format) 🎵

skip_section = True #@param{type:'boolean'}

def generate_track_by_prompt(prompt, duration, loop=False):
  top_tags = get_tags_for_prompts([prompt,])[0][1]
  #print(f'top_tags: {top_tags}') #DEBUG
  try:
    get_track_by_tags(top_tags, user_pat_token, duration, autoplay=True, loop=loop)  # uses the PAT token from section 2.1
  except Exception as e:
    print(str(e))
  print('\n')

track_generation_prompt = ""
if not skip_section:
  prompt = 'the vikings are coming' #@param {type:"string"}
  track_generation_prompt = prompt
  duration = 30 #@param {type:"number"}
  loop = True #@param {type:"boolean"}
  generate_track_by_prompt(track_generation_prompt, duration, loop)

#@markdown ---
#@markdown ### 👆 **Notes about these parameters** 👆
#@markdown * `prompt` can have a maximum of 256 word pieces (tokens).
#@markdown \**This is a limitation of the all-MiniLM-L6-v2 'sentence transformers' model.
#@markdown Longer prompts get 'truncated' - i.e. everything after word piece nr 256 is ignored.*
  


In [None]:
#@title ## 4.2 🎵 Play Stream (can't be downloaded) 🎵 
#@markdown # What's so cool about this?!
#@markdown * ## Mainly - from [streamers.mubert.com](https://streamers.mubert.com) only a few streams can be played, but with this, any Mubert Category, Group, or Channel that you can find in the Channels list (when you run code block 2.3 above) can be played!

#@markdown # **NOTES:**
#@markdown * ## **Stream is different every time this block runs**
#@markdown * ## **The stream cannot be downloaded**
#@markdown * ## **You could record it with something like [OBS Studio](https://obsproject.com/download)**
#@markdown ----

from pprint import pprint

skip_section = False #@param{type:'boolean'}
recommend_stream_from_prompt = True #@param{type:'boolean'}
use_track_generation_prompt = False #@param{type:'boolean'}
stream_prompt = "dark melodic techno washing over a sea of storms" #@param{type:'string'}
number_of_streams_to_get = 3 #@param{type:'integer'}
playlist_id = "" #@param{type:'string'}
intensity = "high" #@param['high','medium','low'] {type:'string'}
#@markdown <br>
#@markdown
#@markdown ## 👆 **Notes about parameters** 👆
#@markdown * <code><font size="5">recommend_stream_from_prompt</font></code><font size=4> - When checked, `playlist_id` will be ignored
#@markdown * <code><font size="5">use_track_generation_prompt</font></code><font size=4> - When checked, `stream_prompt` will be ignored
#@markdown * <code><font size="5">playlist_id</font></code><font size=4> - The playlist_id will only be used when a prompt is NOT being used to recommend a stream
  
def play_mubert_stream(playlist_id, intensity, streaming_pat_token):
  from IPython.display import IFrame
  from IPython.display import Video
  stream_url = f'https://stream.mubert.com/b2b/v2?playlist={playlist_id}&intensity={intensity}&pat={streaming_pat_token}'
  print_html(f'<a href="{stream_url}">Here is the link to the stream!</a> (or just play below)',paragraph_type='h3')
  #display(IFrame(stream_url, width=300, height=150)) # works, but then discovered can play the audio via 'Video' object
  display(Video(data=f'{stream_url}',height=50,width=300))

def get_list_of_playlist_ids_for_prompt(prompt, number_of_playlists=1):
  global list_of_mubert_categories
  global list_of_mubert_groups
  global list_of_mubert_channels
  channels_list = list_of_mubert_categories + list_of_mubert_groups + list_of_mubert_channels
  #logger.debug(pprint(channels_list)) #DEBUG
  top_channels = get_tags_for_prompts(
      [prompt,],
      top_n=max(5, number_of_playlists),   # always fetch at least as many matches as playlists requested
      tags='MUBERT_CHANNELS_LIST',
      channels_list = list(set(channels_list))
      )
  top_channels_list = top_channels[0][1]
  top_playlist = dict_of_cats_grps_chns[top_channels_list[0]]
  logger.info(f'top_channels_list: {top_channels_list} \ntop_playlist: {top_playlist}') #DEBUG
  # Slicing (rather than indexing by range) avoids an IndexError when fewer matches exist than requested
  top_playlists_list = [dict_of_cats_grps_chns[name] for name in top_channels_list[:number_of_playlists]]
  logger.info(f'top_playlists_list: {top_playlists_list} \n') #INFO
  return top_playlists_list

# Get the stream(s)!
playlist_id_list = []
if not skip_section:
  if recommend_stream_from_prompt:
    if use_track_generation_prompt:
      if track_generation_prompt:
        stream_prompt = track_generation_prompt
      elif stream_prompt:
        print(f'Sorry can\'t find a track generation prompt. Maybe the track generation section above is disabled? If so the prompt won\'t be read. \nstream_prompt will be used instead.')
      else:
        # Ask once; an empty answer falls through to the playlist_id fallback below
        stream_prompt = input(f'Sorry can\'t find a prompt. Maybe the track generation section above is disabled? If so the prompt won\'t be read. \nSince there is also no stream_prompt, please enter a prompt now in the field below then hit enter:\n')
        if stream_prompt == "":
          print('No prompt available, playlist_id will be used, or if unavailable, playlist \'0\' will be used..\n')

    if stream_prompt != "":
      playlist_id_list = get_list_of_playlist_ids_for_prompt(stream_prompt, number_of_playlists=number_of_streams_to_get)

  # No prompt, or recommending from a prompt is switched off: fall back to playlist_id
  if not playlist_id_list:
    if playlist_id == "":
      print(f'No prompts or playlists provided. Stream with playlist \'0\' will be played.. \n')
      playlist_id_list = ["0"]
    else:
      playlist_id_list = [playlist_id]

  for playlist_id in playlist_id_list:
    print_html(f'Playlist ID used for streaming: {playlist_id} \n', paragraph_type='h3')
    play_mubert_stream(playlist_id, intensity, user_streaming_token)


length of channels_list provided: 141
Prompt: dark melodic techno washing over a sea of storms
Tags: Techno, Trance, Melodic House, Bollywood Slow Ballad, Genres
Scores: [0.51264268 0.49848834 0.47764349 0.45583463 0.45506334]
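
The stream link is built by pasting the playlist ID, intensity and token into a URL. A slightly more defensive way to assemble it is sketched below; `build_stream_url` and `EXAMPLE_TOKEN` are hypothetical, while the base URL and the three intensity values are the ones used in this section.

```python
from urllib.parse import urlencode

ALLOWED_INTENSITIES = ("low", "medium", "high")

def build_stream_url(playlist_id, intensity, streaming_pat_token):
    """Build a stream URL with properly encoded query parameters."""
    if intensity not in ALLOWED_INTENSITIES:
        raise ValueError(f"intensity must be one of {ALLOWED_INTENSITIES}, got {intensity!r}")
    query = urlencode({
        "playlist": playlist_id,
        "intensity": intensity,
        "pat": streaming_pat_token,
    })
    return f"https://stream.mubert.com/b2b/v2?{query}"

# EXAMPLE_TOKEN is a placeholder, not a real streaming token.
url = build_stream_url("0.1.0", "high", "EXAMPLE_TOKEN")
print(url)
```

Using `urlencode` keeps the link valid even if a value ever contains characters such as `&` or spaces.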





In [None]:
#@title ## 4.3 Batch Music (mp3) generation 🎶
#@markdown ## Use multiple prompts to get multiple downloads!

skip_on_run_all = True #@param{type:'boolean'}

if not skip_on_run_all:
  duration = 60 #@param{type:'integer'}

  prompts = [
      'kind beaver guards life tree, stan lee, epic',
      'astronaut riding a horse',
      'winnie the pooh cooking methamphetamine',
      'vladimir lenin smoking weed with bob marley',
      'soviet retrofuturism',
      'two wasted friends high on weed are trying to navigate their way to their hostel in a big city, night, trippy',
      'an elephant levitating on a gas balloon',
      'calm music',
      'a refrigerator floating in a pond'
  ]

  tags = get_tags_for_prompts(prompts)

  for i, tag in enumerate(tags):
    print(f'Prompt: {tag[0]}\nTags: {tag[1]}')
    try:
      get_track_by_tags(tag[1], user_pat_token, duration, autoplay=False)  # uses the PAT token from section 2.1
    except Exception as e:
      print(str(e))
    print('\n')

else:
  print(f'batch section skipped because \'skip_on_run_all\' = {skip_on_run_all}')

batch section skipped because 'skip_on_run_all' = True


In [None]:
#@title BONUS: Run a linux terminal (usually only available with Colab Pro subscription)
#@markdown Not related to the rest of this Notebook, not related to Mubert or music generation, but just a cool little bonus, something I recently discovered. 
skip_section = True #@param{type:'boolean'}
just_run_terminal = False #@param{type:'boolean'}
if not skip_section:
  if not just_run_terminal:
    # install terminal facility
    !pip install colab-xterm
    %load_ext colabxterm
  else:
    %xterm # run terminal