# OpenSearch: индексация музыкального датасета CEDS

Этот ноутбук:
- читает CSV файл с музыкальными треками (`tcc_ceds_music.csv`),
- создаёт индекс `music_ceds_semantic` с полями для поиска по артистам, трекам, жанрам и текстам,
- индексирует все треки в индекс с поддержкой текстового поиска.

**ВАЖНО:** Перед запуском заполните все настройки в ячейке конфигурации (ячейка 1):
- `OPENSEARCH_URL`, `OPENSEARCH_USER`, `OPENSEARCH_PASSWORD`
- `YANDEX_API_KEY`, `YANDEX_FOLDER_ID`

Индекс использует:
- Текстовые поля с ngram для поиска по частичным совпадениям
- BM25 для ранжирования результатов
- Метаданные треков (артист, название, жанр, год, характеристики)


In [None]:

# ============================================================================
# КОНФИГУРАЦИЯ
# ============================================================================

# OpenSearch настройки
OPENSEARCH_URL = "https://localhost:9200"  # Укажите ваш OpenSearch URL
OPENSEARCH_USER = ""  # Укажите ваше имя пользователя OpenSearch
OPENSEARCH_PASSWORD = ""  # Укажите ваш пароль OpenSearch

# Yandex API настройки
YANDEX_API_KEY = ""  # Укажите ваш Yandex API ключ
YANDEX_FOLDER_ID = ""  # Укажите ваш Yandex Folder ID

# Опциональные настройки
YANDEX_EMBED_MODEL = "text-search-doc"
YANDEX_EMBEDDINGS_URL = "https://llm.api.cloud.yandex.net/foundationModels/v1/textEmbedding"
SEMANTIC_SIM_THRESHOLD = 0.8
MAX_SENT_PER_CHUNK = 8

# Путь к CSV файлу
CSV_FILE = "/Users/admin/music_rag/tcc_ceds_music.csv"

# Имя индекса
INDEX_NAME = "music_ceds_semantic"

print("✓ Конфигурация загружена")
print(f"OpenSearch URL: {OPENSEARCH_URL}")
print(f"OpenSearch User: {OPENSEARCH_USER}")
print(f"OpenSearch Password: {'*' * len(OPENSEARCH_PASSWORD)}")
print(f"Yandex Folder ID: {YANDEX_FOLDER_ID}")
print(f"Index name: {INDEX_NAME}")


✓ Конфигурация загружена
OpenSearch URL: https://localhost:9200
OpenSearch User: admin
OpenSearch Password: ************
Yandex Folder ID: b1gql4st0j9joerfcttt
Index name: music_ceds_semantic


In [None]:
import os
import re
import json
from typing import List, Dict, Any
import pandas as pd

import numpy as np
from tqdm import tqdm
import requests
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests.auth import HTTPBasicAuth

# Проверяем, что все обязательные значения установлены
errors = []
if not OPENSEARCH_URL or OPENSEARCH_URL == "https://your-opensearch-host:9200":
    errors.append("OPENSEARCH_URL")
if not OPENSEARCH_USER:
    errors.append("OPENSEARCH_USER")
if not OPENSEARCH_PASSWORD:
    errors.append("OPENSEARCH_PASSWORD")
if not YANDEX_API_KEY:
    errors.append("YANDEX_API_KEY")
if not YANDEX_FOLDER_ID:
    errors.append("YANDEX_FOLDER_ID")

if errors:
    error_msg = ", ".join(errors)
    raise ValueError(
        f"Пожалуйста, укажите следующие настройки в ячейке конфигурации выше: {error_msg}\n"
        "Все значения должны быть заполнены перед запуском."
    )

print("✓ Все настройки проверены")
print(f"OpenSearch: {OPENSEARCH_URL}, index: {INDEX_NAME}")
print(f"CSV_FILE: {CSV_FILE}")
print(f"Yandex folder: {YANDEX_FOLDER_ID}")
print(f"Yandex embed model: {YANDEX_EMBED_MODEL}")


✓ Все настройки проверены
OpenSearch: https://localhost:9200, index: music_ceds_semantic
CSV_FILE: /Users/admin/music_rag/tcc_ceds_music.csv
Yandex folder: b1gql4st0j9joerfcttt
Yandex embed model: text-search-doc


In [3]:
def create_client(url: str, user: str, password: str) -> OpenSearch:
    auth = HTTPBasicAuth(user, password) if user and password else None
    client = OpenSearch(
        hosts=[url],
        http_compress=True,
        http_auth=auth,
        use_ssl=url.startswith("https://"),
        verify_certs=False,
        connection_class=RequestsHttpConnection,
        timeout=60,
        max_retries=3,
        retry_on_timeout=True,
    )
    return client

client = create_client(OPENSEARCH_URL, OPENSEARCH_USER, OPENSEARCH_PASSWORD)
print("OpenSearch client ready")


OpenSearch client ready




In [4]:
MODEL_URI = f"emb://{YANDEX_FOLDER_ID}/{YANDEX_EMBED_MODEL}/latest"
print(f"Yandex Model URI: {MODEL_URI}")

def yandex_embed_one(text: str) -> List[float]:
    body = {"modelUri": MODEL_URI, "text": text}
    headers = {
        "Authorization": f"Api-Key {YANDEX_API_KEY}",
        "x-folder-id": YANDEX_FOLDER_ID,
        "Content-Type": "application/json",
    }
    resp = requests.post(YANDEX_EMBEDDINGS_URL, headers=headers, json=body, timeout=60)
    resp.raise_for_status()
    data = resp.json()
    emb = data.get("embedding") or (data.get("result") or {}).get("embedding")
    if emb is None:
        raise RuntimeError(f"Bad embedding response: {data}")
    return emb


Yandex Model URI: emb://b1gql4st0j9joerfcttt/text-search-doc/latest


In [5]:
INDEX_BODY: Dict[str, Any] = {
    "settings": {
        "index": {
            "number_of_shards": 1,
            "number_of_replicas": 0,
            "max_ngram_diff": 8,  # Разрешаем разницу между max_gram (10) и min_gram (2)
            "similarity": {
                "custom_similarity": {
                    "type": "BM25",
                    "k1": 1.2,
                    "b": 0.75,
                    "discount_overlaps": "true",
                }
            },
            "analysis": {
                "filter": {
                    "russian_stemmer": {"type": "stemmer", "language": "russian"},
                    "unique_pos": {"type": "unique", "only_on_same_position": False},
                    "my_multiplexer": {
                        "type": "multiplexer",
                        "filters": [
                            "keyword_repeat",
                            "russian_stemmer",
                            "remove_duplicates",
                        ],
                    },
                    "ngram_filter": {
                        "type": "ngram",
                        "min_gram": 2,
                        "max_gram": 10,
                        "token_chars": ["letter", "digit"],
                    },
                },
                "analyzer": {
                    "search_text_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": ["lowercase", "my_multiplexer", "unique_pos"],
                        "char_filter": ["e_mapping"],
                    },
                    "ru_international_translit_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": [
                            "lowercase",
                            "russian_stemmer",
                        ],
                        "char_filter": ["transliteration_filter", "e_mapping"],
                    },
                    "text_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": [
                            "lowercase",
                            "russian_stemmer",
                        ],
                        "char_filter": ["e_mapping"],
                    },
                    "exact_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": ["lowercase"],
                        "char_filter": ["e_mapping"],
                    },
                    "ngram_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": ["lowercase", "ngram_filter"],
                    },
                    "ngram_search_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": ["lowercase", "ngram_filter"],
                    },
                    "text_standard": {"type": "standard"},
                    "text_whitespace": {"type": "whitespace"},
                    "text_lowercase": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": ["lowercase"],
                    },
                },
                "char_filter": {
                    "transliteration_filter": {
                        "type": "mapping",
                        "mappings": [
                            "a => а",
                            "b => б",
                            "v => в",
                            "g => г",
                            "d => д",
                            "e => е",
                            "ye => ё",
                            "zh => ж",
                            "z => з",
                            "i => и",
                            "j => й",
                            "k => к",
                            "l => л",
                            "m => м",
                            "n => н",
                            "o => о",
                            "p => п",
                        ],
                    },
                    "e_mapping": {"type": "mapping", "mappings": ["e => ё"]},
                },
            },
        }
    },
    "mappings": {
        "properties": {
            "artist_name": {
                "type": "text",
                "analyzer": "text_analyzer",
                "fields": {
                    "keyword": {"type": "keyword"},
                    "ngram": {
                        "type": "text",
                        "analyzer": "ngram_analyzer",
                        "search_analyzer": "ngram_search_analyzer",
                    },
                },
            },
            "track_name": {
                "type": "text",
                "analyzer": "text_analyzer",
                "fields": {
                    "keyword": {"type": "keyword"},
                    "ngram": {
                        "type": "text",
                        "analyzer": "ngram_analyzer",
                        "search_analyzer": "ngram_search_analyzer",
                    },
                },
            },
            "lyrics": {
                "type": "text",
                "analyzer": "text_analyzer",
                "similarity": "BM25",
                "fields": {
                    "ngram": {
                        "type": "text",
                        "analyzer": "ngram_analyzer",
                        "search_analyzer": "ngram_search_analyzer",
                    },
                },
            },
            "genre": {"type": "keyword"},
            "release_date": {"type": "integer"},
            "topic": {"type": "keyword"},
            "age": {"type": "float"},
            "danceability": {"type": "float"},
            "energy": {"type": "float"},
            "valence": {"type": "float"},
            "acousticness": {"type": "float"},
            "instrumentalness": {"type": "float"},
            "loudness": {"type": "float"},
            "len": {"type": "integer"},
            "dating": {"type": "float"},
            "violence": {"type": "float"},
            "world_life": {"type": "float"},
            "night_time": {"type": "float"},
            "shake_the_audience": {"type": "float"},
            "family_gospel": {"type": "float"},
            "romantic": {"type": "float"},
            "communication": {"type": "float"},
            "obscene": {"type": "float"},
            "music": {"type": "float"},
            "movement_places": {"type": "float"},
            "light_visual_perceptions": {"type": "float"},
            "family_spiritual": {"type": "float"},
            "like_girls": {"type": "float"},
            "sadness": {"type": "float"},
            "feelings": {"type": "float"},
            "track_id": {"type": "keyword"},
        }
    },
}

if client.indices.exists(index=INDEX_NAME):
    print(f"Index {INDEX_NAME} exists. Deleting...")
    client.indices.delete(index=INDEX_NAME)

print(f"Creating index {INDEX_NAME}...")
client.indices.create(index=INDEX_NAME, body=INDEX_BODY)
print("Index created")


Creating index music_ceds_semantic...
Index created




In [6]:
def load_music_data(csv_path: str) -> List[Dict[str, Any]]:
    """Загружает данные из CSV и создаёт документы для индексации."""
    df = pd.read_csv(csv_path)
    
    docs: List[Dict[str, Any]] = []
    
    print(f"Loading {len(df)} tracks from CSV...")
    
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="Processing tracks"):
        # Создаём эмбеддинг для текста песни
        lyrics_text = str(row.get('lyrics', '')).strip()
        if not lyrics_text or lyrics_text == 'nan':
            lyrics_text = ""
        
        # Подготавливаем документ
        doc = {
            "track_id": str(row.get('Unnamed: 0', idx)),
            "artist_name": str(row.get('artist_name', '')),
            "track_name": str(row.get('track_name', '')),
            "lyrics": lyrics_text,
            "genre": str(row.get('genre', '')),
            "release_date": int(row.get('release_date', 0)) if pd.notna(row.get('release_date')) else None,
            "topic": str(row.get('topic', '')),
            "age": float(row.get('age', 0)) if pd.notna(row.get('age')) else None,
            "danceability": float(row.get('danceability', 0)) if pd.notna(row.get('danceability')) else None,
            "energy": float(row.get('energy', 0)) if pd.notna(row.get('energy')) else None,
            "valence": float(row.get('valence', 0)) if pd.notna(row.get('valence')) else None,
            "acousticness": float(row.get('acousticness', 0)) if pd.notna(row.get('acousticness')) else None,
            "instrumentalness": float(row.get('instrumentalness', 0)) if pd.notna(row.get('instrumentalness')) else None,
            "loudness": float(row.get('loudness', 0)) if pd.notna(row.get('loudness')) else None,
            "len": int(row.get('len', 0)) if pd.notna(row.get('len')) else None,
            "dating": float(row.get('dating', 0)) if pd.notna(row.get('dating')) else None,
            "violence": float(row.get('violence', 0)) if pd.notna(row.get('violence')) else None,
            "world_life": float(row.get('world/life', 0)) if pd.notna(row.get('world/life')) else None,
            "night_time": float(row.get('night/time', 0)) if pd.notna(row.get('night/time')) else None,
            "shake_the_audience": float(row.get('shake the audience', 0)) if pd.notna(row.get('shake the audience')) else None,
            "family_gospel": float(row.get('family/gospel', 0)) if pd.notna(row.get('family/gospel')) else None,
            "romantic": float(row.get('romantic', 0)) if pd.notna(row.get('romantic')) else None,
            "communication": float(row.get('communication', 0)) if pd.notna(row.get('communication')) else None,
            "obscene": float(row.get('obscene', 0)) if pd.notna(row.get('obscene')) else None,
            "music": float(row.get('music', 0)) if pd.notna(row.get('music')) else None,
            "movement_places": float(row.get('movement/places', 0)) if pd.notna(row.get('movement/places')) else None,
            "light_visual_perceptions": float(row.get('light/visual perceptions', 0)) if pd.notna(row.get('light/visual perceptions')) else None,
            "family_spiritual": float(row.get('family/spiritual', 0)) if pd.notna(row.get('family/spiritual')) else None,
            "like_girls": float(row.get('like/girls', 0)) if pd.notna(row.get('like/girls')) else None,
            "sadness": float(row.get('sadness', 0)) if pd.notna(row.get('sadness')) else None,
            "feelings": float(row.get('feelings', 0)) if pd.notna(row.get('feelings')) else None,
        }
        
        docs.append(doc)
    
    return docs


In [7]:
if not os.path.isfile(CSV_FILE):
    raise FileNotFoundError(f"CSV file not found: {CSV_FILE}")

docs = load_music_data(CSV_FILE)
print(f"Total tracks loaded: {len(docs)}")


Loading 28372 tracks from CSV...


Processing tracks: 100%|██████████| 28372/28372 [00:02<00:00, 9478.08it/s]

Total tracks loaded: 28372





In [8]:
# Просмотр примера загруженных данных
if docs:
    print(f"Пример первого трека:")
    print(json.dumps(docs[0], indent=2, ensure_ascii=False))

Пример первого трека:
{
  "track_id": "0",
  "artist_name": "mukesh",
  "track_name": "mohabbat bhi jhoothi",
  "lyrics": "hold time feel break feel untrue convince speak voice tear try hold hurt try forgive okay play break string feel heart want feel tell real truth hurt lie worse anymore little turn dust play house ruin run leave save like chase train late late tear try hold hurt try forgive okay play break string feel heart want feel tell real truth hurt lie worse anymore little run leave save like chase train know late late play break string feel heart want feel tell real truth hurt lie worse anymore little know little hold time feel",
  "genre": "pop",
  "release_date": 1950,
  "topic": "sadness",
  "age": 1.0,
  "danceability": 0.3577385465179248,
  "energy": 0.1371101880258922,
  "valence": 0.3394476504534212,
  "acousticness": 0.9979919658553876,
  "instrumentalness": 0.901821862348178,
  "loudness": 0.4541189139296976,
  "len": 95,
  "dating": 0.0005980861262889,
  "violence":

In [9]:
BULK_ENDPOINT = f"/{INDEX_NAME}/_bulk"
lines: List[str] = []

print(f"Indexing {len(docs)} tracks...")
for i, d in enumerate(tqdm(docs, desc="Preparing bulk")):
    doc_id = d.get("track_id") or f"track-{i}"
    meta = {"index": {"_index": INDEX_NAME, "_id": doc_id}}
    # Удаляем None значения
    src = {k: v for k, v in d.items() if v is not None}
    lines.append(json.dumps(meta, ensure_ascii=False))
    lines.append(json.dumps(src, ensure_ascii=False))

payload = "\n".join(lines) + "\n"
print("Sending bulk request...")
resp = client.transport.perform_request("POST", BULK_ENDPOINT, body=payload)
if isinstance(resp, dict) and resp.get("errors"):
    errs = sum(1 for it in resp.get("items", []) if (it.get("index") or {}).get("error"))
    print("Bulk completed with errors:", errs)
else:
    print(f"Bulk indexed {len(docs)} tracks into '{INDEX_NAME}'")
    
# Проверяем количество документов в индексе
count_resp = client.count(index=INDEX_NAME)
print(f"Total documents in index: {count_resp.get('count', 0)}")


Indexing 28372 tracks...


Preparing bulk: 100%|██████████| 28372/28372 [00:00<00:00, 45280.45it/s]


Sending bulk request...




Bulk indexed 28372 tracks into 'music_ceds_semantic'
Total documents in index: 22320




In [10]:
def search_music(query: str, size: int = 10, search_fields: List[str] = None) -> Dict[str, Any]:
    """Поиск по музыкальному индексу."""
    if search_fields is None:
        search_fields = ["artist_name", "track_name", "lyrics"]
    
    # Текстовый поиск с ngram
    text_query = {
        "multi_match": {
            "query": query,
            "fields": [f"{field}^2" for field in search_fields] + [f"{field}.ngram" for field in search_fields],
            "type": "best_fields",
            "fuzziness": "AUTO",
        }
    }
    
    # Семантический поиск по lyrics (если есть эмбеддинги)
    # Для семантического поиска нужен knn, но пока используем только текстовый
    
    search_body = {
        "query": {
            "bool": {
                "should": [
                    text_query,
                    {
                        "match": {
                            "lyrics": {
                                "query": query,
                                "boost": 3.0,
                            }
                        }
                    },
                    {
                        "match": {
                            "artist_name": {
                                "query": query,
                                "boost": 2.0,
                            }
                        }
                    },
                    {
                        "match": {
                            "track_name": {
                                "query": query,
                                "boost": 2.0,
                            }
                        }
                    },
                ],
                "minimum_should_match": 1,
            }
        },
        "size": size,
        "_source": ["track_id", "artist_name", "track_name", "genre", "release_date", "lyrics", "topic"],
    }
    
    response = client.search(index=INDEX_NAME, body=search_body)
    return response


# Тестовый поиск
test_query = "love"
print(f"\nSearching for: '{test_query}'")
results = search_music(test_query, size=5)

print(f"\nFound {results['hits']['total']['value']} results")
print("\nTop results:")
for i, hit in enumerate(results['hits']['hits'][:5], 1):
    source = hit['_source']
    print(f"\n{i}. Score: {hit['_score']:.2f}")
    print(f"   Artist: {source.get('artist_name', 'N/A')}")
    print(f"   Track: {source.get('track_name', 'N/A')}")
    print(f"   Genre: {source.get('genre', 'N/A')}")
    print(f"   Year: {source.get('release_date', 'N/A')}")
    lyrics_preview = source.get('lyrics', '')[:100]
    if lyrics_preview:
        print(f"   Lyrics: {lyrics_preview}...")



Searching for: 'love'

Found 10000 results

Top results:

1. Score: 22.51
   Artist: love
   Track: hummingbirds
   Genre: country
   Year: 1967
   Lyrics: little devil ½who think foolingâ concentrate singer sunday choir mama love love get knees hug love l...

2. Score: 15.57
   Artist: love and theft
   Track: if you ever get lonely
   Genre: country
   Year: 2012
   Lyrics: thank call good hear voice break static noise listen cause choice come sound guess things work okay ...

3. Score: 15.17
   Artist: love affair
   Track: everlasting love
   Genre: pop
   Year: 1968
   Lyrics: hearts go astray leave hurt go away need regret come beggin forget welcome know open eye realize sta...

4. Score: 15.05
   Artist: love
   Track: signed d.c.
   Genre: pop
   Year: 1966
   Lyrics: feel lonely comedown scar face pierce skin lord care soul belong dealer keep mind play leecher care ...

5. Score: 15.05
   Artist: love
   Track: a message to pretty
   Genre: pop
   Year: 1966
   Lyrics: people



In [11]:
# Дополнительные примеры поиска

# Поиск по артисту
print("\n" + "="*60)
print("Поиск по артисту: 'frankie laine'")
results = search_music("frankie laine", size=3)
for i, hit in enumerate(results['hits']['hits'][:3], 1):
    source = hit['_source']
    print(f"{i}. {source.get('artist_name')} - {source.get('track_name')} ({source.get('release_date')})")

# Поиск по жанру и тексту
print("\n" + "="*60)
print("Поиск по тексту: 'believe'")
results = search_music("believe", size=3)
for i, hit in enumerate(results['hits']['hits'][:3], 1):
    source = hit['_source']
    print(f"{i}. {source.get('artist_name')} - {source.get('track_name')} (Score: {hit['_score']:.2f})")

# Поиск по теме
print("\n" + "="*60)
print("Поиск по теме: 'romantic'")
results = search_music("romantic", size=3)
for i, hit in enumerate(results['hits']['hits'][:3], 1):
    source = hit['_source']
    print(f"{i}. {source.get('artist_name')} - {source.get('track_name')} (Topic: {source.get('topic')})")




Поиск по артисту: 'frankie laine'




1. frankie laine - i believe (1950)
2. frankie laine - necessary evil (1951)
3. frankie laine - after you've gone (1954)

Поиск по тексту: 'believe'
1. josh groban - believe (Score: 20.44)
2. yellowcard - believe (Score: 19.69)
3. jonas brothers - i believe (Score: 18.68)

Поиск по теме: 'romantic'
1. death from above 1979 - romantic rights (Topic: obscene)
2. khalid - hopeless (Topic: music)
3. ben e. king - perfidia (Topic: sadness)


