In [1]:
import math
import json

import numpy
import pandas as pd

# Load the track catalogue. The encoding is stated explicitly (JSON is UTF-8)
# so the result does not depend on the platform's default locale encoding;
# this also matches how questions.json is opened further down.
with open("data.json", "r", encoding="utf-8") as read_file:
    raw_json_data_tracks = json.load(read_file)

# 'music' holds a list of single-key dicts nested as
# genre -> subgenre -> performer -> list of track dicts (as walked below).
genres = raw_json_data_tracks['music']

# Flat list of every track dict found in the tree.
tracks = []

for genre in genres:
    genre_key = list(genre.keys())[0]
    for subgenre in genre[genre_key]:
        subgenre_key = list(subgenre.keys())[0]
        for performer in subgenre[subgenre_key]:
            performer_key = list(performer.keys())[0]
            for track in performer[performer_key]:
                # Extend the track's own genre list with the combined
                # "<subgenre> <genre>" label of its position in the tree.
                # This mutates the dict that also lives inside `genres`.
                track['mixedGenres'].append('{} {}'.format(subgenre_key, genre_key))
                tracks.append(track)

# Independent deep copy of the flattened tracks, made through a JSON round-trip
# before the indicator keys are added to the track dicts.
original_tracks = json.loads(json.dumps(tracks))

# Distinct values of every categorical field across the catalogue.
genres_set = {entry['genre'] for entry in tracks}

mixed_genres_set = {label for entry in tracks for label in entry['mixedGenres']}

tags_set = {label for entry in tracks for label in entry['tags']}

performers_set = {entry['performer'] for entry in tracks}

# The placeholder album name 'single' is deliberately left out.
albums_set = {entry['album'] for entry in tracks if entry['album'] != 'single'}
# Add a 0/1 indicator key to every track dict for each known mixed genre,
# genre, tag, performer and album.
# NOTE: all indicator keys share the track dict's own namespace, so a value
# that occurs in more than one of the sets is written more than once and the
# later loop wins.
for track in tracks:
    for genre in mixed_genres_set:
        track[genre] = 1 if genre in track['mixedGenres'] else 0

    for genre in genres_set:
        track[genre] = 1 if genre == track['genre'] else 0

    for tag in tags_set:
        track[tag] = 1 if tag in track['tags'] else 0

    for performer in performers_set:
        track[performer] = 1 if performer == track['performer'] else 0

    for album in albums_set:
        track[album] = 1 if album == track['album'] else 0


def weigh_columns(frame: pd.DataFrame):
    """Scale the indicator columns of `frame` by their group weight.

    Columns named after mixed genres are multiplied by GENRE_WEIGHT, columns
    named after genres by SUBGENRE_WEIGHT and tag columns by TAG_WEIGHT.
    The frame is modified in place and also returned.
    """
    weighted_groups = (
        (mixed_genres_set, GENRE_WEIGHT),
        (genres_set, SUBGENRE_WEIGHT),
        (tags_set, TAG_WEIGHT),
    )

    for column_names, weight in weighted_groups:
        for column in list(column_names):
            frame[column] = frame[column] * weight

    return frame

# Number of flattened tracks; used as an upper bound when picking recommendations.
TRACKS_LENGTH = len(tracks)
# Multiplier applied by weigh_columns to the tag indicator columns.
TAG_WEIGHT = 0.4
# Multiplier applied by weigh_columns to the columns named after mixed_genres_set entries.
GENRE_WEIGHT = 1
# Multiplier applied by weigh_columns to the columns named after genres_set entries.
SUBGENRE_WEIGHT = 0.8
# Multiplier applied to every column of the scaled frame after weigh_columns has run.
OTHER_WEIGHT = 0.8
# Method passed to DataFrame.corrwith; also the key under which that score is stored.
CORRWITH_METHOD = 'spearman'

from sklearn import preprocessing

# Non-numeric track fields: dropped by get_data_frame and re-attached to the
# normalised frame at the end of this cell.
string_cols = ['tags', 'mixedGenres', 'genre', 'subgenre', 'name', 'album', 'performer']

# Min-max scaler; fitted on the numeric track frame at the end of this cell.
scaler = preprocessing.MinMaxScaler()

def get_data_frame(tracks) -> pd.DataFrame:
    """Build a DataFrame from track records and drop the columns listed in `string_cols`."""
    frame = pd.DataFrame(tracks)
    return frame.drop(columns=string_cols)

def take_nearest_tracks(input_track, performers, performer, result, n):
    """Top up `result` with tracks of one performer, never exceeding `n` items.

    Args:
        input_track: the seed track; a candidate with the same name AND
            performer is skipped.
        performers: list of single-key dicts ``{performer_name: [track, ...]}``.
        performer: name of the performer whose tracks should be taken.
        result: list collected so far; extended in place and returned.
        n: maximum total length `result` may reach.

    Returns:
        The same `result` list. It is returned unchanged when it is already
        full (or over-full) or when `performer` is not present in `performers`.
    """
    free_slots = n - len(result)
    if free_slots <= 0:
        # Nothing may be added. The previous pop-based truncation raised
        # IndexError here once `result` was longer than `n`.
        return result

    entry = next((pf for pf in performers if next(iter(pf), None) == performer), None)
    if entry is None:
        return result

    candidates = [
        tr for tr in entry[performer]
        if not (tr['name'] == input_track['name'] and tr['performer'] == input_track['performer'])
        and tr not in result
    ]

    # Keep the earliest candidates, exactly what popping from the end used to leave.
    result.extend(candidates[:free_slots])

    return result

def take_nearest_performers(input_track, performers, result, n):
    """Run take_nearest_tracks for every performer entry, in list order, and return the collected list."""
    for entry in performers:
        performer_name = list(entry.keys())[0]
        result = take_nearest_tracks(input_track, performers, performer_name, result, n)

    return result

def take_nearest_subgenres(input_track, subgenres, result, n):
    """Run take_nearest_performers for every subgenre entry, in list order, and return the collected list."""
    for entry in subgenres:
        subgenre_name = list(entry.keys())[0]
        # The name is looked up again in the list, so if two entries share a
        # name the first one supplies the performers.
        owner = next(sgn for sgn in subgenres if list(sgn.keys())[0] == subgenre_name)

        result = take_nearest_performers(input_track, owner[subgenre_name], result, n)

    return result

def search_tree(input_track, track_tree, n):
    """Collect up to `n` tracks close to `input_track` by widening the search scope.

    The search starts with the input track's own performer, then the other
    performers of its subgenre, then the subgenres of its genre and finally
    every genre of `track_tree`; it stops as soon as `n` tracks are collected.
    """
    def children_of(nodes, key):
        # Value stored under `key` in the first single-key dict carrying it.
        return next(node for node in nodes if list(node.keys())[0] == key)[key]

    own_genre = input_track['genre']
    own_subgenre = input_track['subgenre']
    own_performer = input_track['performer']

    sibling_subgenres = children_of(track_tree, own_genre)
    sibling_performers = children_of(sibling_subgenres, own_subgenre)

    found = take_nearest_tracks(input_track, sibling_performers, own_performer, [], n)
    if len(found) >= n:
        return found

    found = take_nearest_performers(input_track, sibling_performers, found, n)
    if len(found) >= n:
        return found

    found = take_nearest_subgenres(input_track, sibling_subgenres, found, n)
    if len(found) >= n:
        return found

    for genre_entry in track_tree:
        genre_name = list(genre_entry.keys())[0]
        found = take_nearest_subgenres(input_track, children_of(track_tree, genre_name), found, n)

    return found

def search_tree_multiple(tracks, track_tree, disliked, n):
    """Recommend up to `n` tracks for several seed tracks using the genre tree.

    Every seed is searched with search_tree; the per-seed results are merged
    in seed order, dropping seed tracks, disliked names and repeated names.
    The number requested per seed grows by one per round until enough tracks
    are found.

    Args:
        tracks: seed track dicts (need 'name', 'genre', 'subgenre', 'performer').
        track_tree: the genre tree passed on to search_tree.
        disliked: collection of track names that must not be recommended.
        n: number of recommendations wanted.

    Returns:
        A list of at most `n` track dicts. It may be shorter when the tree does
        not hold enough eligible tracks; previously that case looped forever.
        An empty seed list yields [] instead of a ZeroDivisionError.
    """
    if not tracks:
        return []

    # Same bound as before: n, capped by the tracks left besides the seeds.
    target = min(n, TRACKS_LENGTH - len(tracks))
    excluded_names = {t['name'] for t in tracks} | set(disliked)

    per_track = math.ceil(n / len(tracks))
    result = []

    while len(result) < target:
        per_track += 1
        merged = []
        seen_names = set()
        tree_exhausted = True

        for track in tracks:
            found = search_tree(track, track_tree, per_track)
            if len(found) >= per_track:
                # This search was cut off by its limit, so a larger request
                # can still return more tracks.
                tree_exhausted = False

            for candidate in found:
                name = candidate['name']
                if name in excluded_names or name in seen_names:
                    continue
                seen_names.add(name)
                merged.append(candidate)

        result = merged[:n]

        if tree_exhausted:
            # Every search already returned all it can reach; asking for more
            # cannot grow the result, so stop instead of spinning.
            break

    return result

def find_correlations(input_tracks, all_tracks):
    """Score every track of `all_tracks` against every input track.

    Returns one dict per input track with two keys: 'track' (name, genre,
    subgenre, performer of the input track) and 'correlation', a list with
    one dict per compared track holding its 'track' identity, the
    CORRWITH_METHOD correlation and the chebyshev, manhattan, euclidean and
    cosine distances.
    """
    identity_keys = ['name', 'genre', 'subgenre', 'performer']
    distance_metrics = ('chebyshev', 'manhattan', 'euclidean', 'cosine')

    report = []
    for source in input_tracks:
        source_identity = {key: source[key] for key in identity_keys}
        source_frame = get_data_frame([source])

        comparisons = []
        for other in all_tracks:
            scores = {'track': {key: other[key] for key in identity_keys}}
            other_frame = get_data_frame([other])

            scores[CORRWITH_METHOD] = source_frame.corrwith(
                other_frame, axis=1, method=CORRWITH_METHOD
            ).to_list()[0]
            for metric in distance_metrics:
                scores[metric] = pairwise_distances(source_frame, other_frame, metric=metric)[0][0]

            comparisons.append(scores)

        report.append({'track': source_identity, 'correlation': comparisons})

    return report

def choose_tracks(input_tracks, n, method, disliked):
    """Pick the names of up to `n` tracks that best match `input_tracks`.

    Args:
        input_tracks: seed track dicts.
        n: number of names wanted.
        method: 'tree' for the genre-tree search; otherwise a score key
            produced by find_correlations (CORRWITH_METHOD, 'chebyshev',
            'manhattan', 'euclidean' or 'cosine').
        disliked: track dicts the user rejected.

    Returns:
        A list of track names, best match first. For CORRWITH_METHOD a higher
        averaged score is better; for the distance keys a lower one is.
        Disliked tracks are ranked after every other candidate. The list is
        shorter than requested when there are not enough candidates.
    """
    disliked_names = [t['name'] for t in disliked]

    if method == 'tree':
        return [t['name'] for t in search_tree_multiple(input_tracks, genres, disliked_names, n)]

    input_track_names = {t['name'] for t in input_tracks}
    disliked_lookup = set(disliked_names)

    higher_is_better = method == CORRWITH_METHOD
    # Sentinel that sorts last whatever the range of the real scores is.
    # The former constants (0 for correlation, 1 for distances) could beat
    # real scores such as negative correlations or distances above 1.
    worst_score = float('-inf') if higher_is_better else float('inf')

    # Sum each candidate's score over all seed tracks, in first-seen order.
    totals = {}
    for correlation in find_correlations(input_tracks, normalized_tracks):
        for corr_result in correlation['correlation']:
            name = corr_result['track']['name']
            if name in input_track_names:
                continue
            totals[name] = totals.get(name, 0) + corr_result[method]

    scores = {}
    for name, total in totals.items():
        score = worst_score if name in disliked_lookup else total / len(input_tracks)
        if math.isnan(score):
            # NaN cannot be ordered; treat an undefined score as the worst one.
            score = worst_score
        scores[name] = score

    nres = max(min(n, TRACKS_LENGTH - len(input_tracks)), 0)

    # The sort is stable, so ties keep first-seen order, as the repeated
    # max()/min() selection did. Slicing also copes with fewer candidates
    # than requested, where max()/min() on an empty sequence used to raise.
    ranked = sorted(scores, key=scores.get, reverse=higher_is_better)
    return ranked[:nres]

# Build the normalised records used for similarity scoring:
# scale the numeric columns to [0, 1], apply the group weights, scale every
# column by OTHER_WEIGHT and finally put the string columns back.
tracks_str_cols = pd.DataFrame(tracks, columns=string_cols)

tracks_frame = get_data_frame(tracks)
tracks_frame = pd.DataFrame(scaler.fit_transform(tracks_frame), columns=tracks_frame.columns)
tracks_frame = weigh_columns(tracks_frame) * OTHER_WEIGHT
tracks_frame = pd.concat([tracks_frame, tracks_str_cols], axis=1)

normalized_tracks = tracks_frame.to_dict('records')

In [2]:
import re
import random
from functools import reduce

import nltk
import pymorphy2
import numpy

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import pairwise_distances

from nltk.corpus import stopwords
# nltk.download('stopwords')

# Morphological analyser; get_normal_forms asks it for the normal forms of each question word.
morph = pymorphy2.MorphAnalyzer()
# TF-IDF vectoriser; fitted on the lemmatised sample questions in a later cell.
tfidf = TfidfVectorizer()
# Russian stop-word list; currently referenced only by the disabled filter in filter_stopwords.
stop = stopwords.words('russian')

In [3]:
# Load the sample questions used to recognise what the user is asking.
with open("questions.json", "r", encoding='utf-8') as read_file:
    raw_json_data = json.load(read_file)

# Later cells read column 0 as the question text and column 1 as the answer
# template, so each entry is presumably a [question, template] pair - TODO confirm.
sample_questions = list(raw_json_data)

In [4]:
def clean_text(s):
    """Lower-case `s` and replace every unsupported character with a space.

    Kept characters: Russian letters (including 'ё', which lies outside the
    'а-я' range and was previously turned into a space, splitting words),
    Latin letters, digits and the angle brackets used by template
    placeholders such as '<жанр>'.
    """
    return re.sub(r'[^а-яёa-z0-9<>]', ' ', s.lower())

def get_normal_forms(question):
    """Return, for every word of the cleaned question, the list of its normal forms."""
    words = clean_text(question).split()
    return list(map(morph.normal_forms, words))

def filter_stopwords(question_words):
    """Return `question_words` unchanged.

    Stop-word filtering is currently switched off; the disabled
    implementation is kept in the trailing comment.
    """
    return question_words # list(filter(lambda qp: any(w not in stop for w in qp), question_words))

def get_lemmatized_text(question):
    """Turn a question into one string: the normal forms of each word joined by ',', words joined by ' '."""
    kept_words = filter_stopwords(get_normal_forms(question))
    return ' '.join(','.join(forms) for forms in kept_words)

In [5]:
# s = 'Какой жанр у трека Deep Dive?'
#
# cleaned = clean_text(s)
#
# print(cleaned)
#
# words = cleaned.split()
#
# for word in words:
#     print(morph.normal_forms(word))

# Column 0 of sample_df is lemmatised below; column 1 is read later as the answer template.
sample_df = pd.DataFrame(sample_questions)

# Lemmatise every sample question, then fit the shared TF-IDF vectoriser on them.
# calculate_cosine_dist relies on both `tfidf` (fitted here) and `df_tfidf`.
sample_df['lemmatized_questions'] = sample_df[0].apply(get_lemmatized_text)
x_tfidf = tfidf.fit_transform(sample_df['lemmatized_questions']).toarray()
# One row per sample question, one column per vocabulary term.
df_tfidf = pd.DataFrame(x_tfidf, columns=tfidf.get_feature_names_out())
print(df_tfidf)

    альбом  альбоме  без    больше   большой      быть      весь  видеоигра  \
0      0.0      0.0  0.0  0.000000  0.000000  0.000000  0.000000        0.0   
1      0.0      0.0  0.0  0.000000  0.000000  0.000000  0.000000        0.0   
2      0.0      0.0  0.0  0.000000  0.000000  0.374644  0.000000        0.0   
3      0.0      0.0  0.0  0.000000  0.000000  0.000000  0.444155        0.0   
4      0.0      0.0  0.0  0.000000  0.000000  0.435433  0.000000        0.0   
..     ...      ...  ...       ...       ...       ...       ...        ...   
66     0.0      0.0  0.0  0.000000  0.000000  0.000000  0.000000        0.0   
67     0.0      0.0  0.0  0.000000  0.000000  0.000000  0.000000        0.0   
68     0.0      0.0  0.0  0.000000  0.000000  0.000000  0.000000        0.0   
69     0.0      0.0  0.0  0.000000  0.000000  0.000000  0.326347        0.0   
70     0.0      0.0  0.0  0.392421  0.392421  0.000000  0.342127        0.0   

    включить     всего  ...  хороший  хотеть    час

In [6]:
def calculate_cosine_dist(question):
    """Return 1 minus the cosine distance between `question` and every sample question.

    The result has one row per sample question and a single column; larger
    values mean the question is closer to that sample.
    """
    lemmatized = get_lemmatized_text(question)
    question_vector = tfidf.transform([lemmatized]).toarray()
    distances = pairwise_distances(df_tfidf, question_vector, metric='cosine')

    return 1 - distances

In [7]:
import ipywidgets as widgets
from ipywidgets import interact, interact_manual
from IPython.display import display

In [8]:
def construct_answer_from_tracks(question, prop, ans_template):
    """Answer "which tracks have property X" for the value of `prop` named in `question`.

    Args:
        question: text searched for property values (substring match).
        prop: track key to match; 'mixedGenres' and 'tags' are treated as
            lists of values, any other key as a single value compared via str().
        ans_template: format string; {0} receives the matched value and {1}
            the comma-separated names of the tracks that carry it.

    Returns:
        The formatted answer, or None when no track value occurs in `question`.

    When several values occur in the question (e.g. 'pop' inside
    'alternative pop'), the longest one is used, ties going to the first in
    track order, and only tracks carrying exactly that value are listed.
    Previously the value came from the intersection of all matched tracks'
    values, which raised IndexError when they had nothing in common and
    otherwise depended on set ordering.
    """
    multi_valued = prop == 'mixedGenres' or prop == 'tags'

    def values_of(track):
        return track[prop] if multi_valued else [track[prop]]

    mentioned = [
        value
        for track in tracks
        for value in values_of(track)
        if str(value) in question
    ]

    if len(mentioned) == 0:
        return None

    prop_val = max(mentioned, key=lambda value: len(str(value)))
    filtered = [track for track in tracks if prop_val in values_of(track)]

    answer = ans_template.format(prop_val, ", ".join(t["name"] for t in filtered))

    return answer

def _first_track_named_in(question):
    """Return the first track whose name occurs in `question`, or None."""
    return next((t for t in tracks if t["name"] in question), None)


def _tracks_by_mentioned_performer(question):
    """Return every track whose performer name occurs in `question`."""
    return [t for t in tracks if str(t['performer']) in question]


def _tracks_by_mentioned_genre(question):
    """Return (genre, tracks) for the longest mixed genre named in `question`, or (None, [])."""
    mentioned = [g for t in tracks for g in t['mixedGenres'] if g in question]
    if not mentioned:
        return None, []
    genre = max(mentioned, key=len)
    return genre, [t for t in tracks if genre in t['mixedGenres']]


def _numbers_in(question):
    """Return the whitespace-separated decimal numbers of `question` as ints."""
    # isdecimal() only accepts tokens int() can parse; isdigit() also accepts
    # characters such as superscript digits, on which int() raises ValueError.
    return [int(token) for token in question.split() if token.isdecimal()]


def _unique_in_order(values):
    """Drop repeated values, keeping first-seen order so the output is deterministic."""
    return list(dict.fromkeys(values))


def _track_names(track_list):
    """Join the names of `track_list` with ', '."""
    return ", ".join(t["name"] for t in track_list)


def construct_answer(question: str, answer_template: str):
    """Build the answer text for `question` according to `answer_template`.

    The first word of the template selects what is asked for ('информация',
    'треки', 'жанр', 'исполнители', 'альбомы', 'лайки/прослушивания',
    'лайки', 'прослушивания'); the remaining words select the criterion.

    Returns:
        The answer string, or None when the template is empty or unknown or
        when nothing in the catalogue matches the question.
    """
    template_parts = answer_template.split()
    if not template_parts:
        return None

    result_entities_lit = template_parts[0]
    criteria_lit = ' '.join(template_parts[1:])

    if result_entities_lit == 'информация':
        track = _first_track_named_in(question)

        if track is None:
            return None

        answer = f'Полная информация о треке {track["name"]}: Исполнитель {track["performer"]}, жанры {", ".join(track["mixedGenres"])}, альбом {track["album"]}, длина {track["length"]} секунд, {track["listens"]} прослушиваний, {track["likes"]} лайков, теги {", ".join(track["tags"])}. В треке {"присутствует" if track["vocals"] else "отсутствует"} текст. В треке {"присутствует" if track["explicit"] else "отсутствует"} ненормативная лексика.'

        return answer

    if result_entities_lit == 'треки':
        if criteria_lit == '<исполнитель>':
            return construct_answer_from_tracks(question, 'performer', '{0} является исполнителем следующих треков: {1}')

        if criteria_lit == '<жанр>':
            return construct_answer_from_tracks(question, 'mixedGenres', 'Жанр {0} включает в себя следующие композиции: {1}')

        if criteria_lit == '<альбом>':
            return construct_answer_from_tracks(question, 'album', 'Альбом {0} состоит из композиций: {1}')

        # The next four criteria ignore the user's wording and search for a fixed value.
        if criteria_lit == 'без мата':
            return construct_answer_from_tracks('False', 'explicit', 'Ненормативная лексика отсутствует в треках: {1}')

        if criteria_lit == 'без слов':
            return construct_answer_from_tracks('False', 'vocals', 'Эти треки не содержат вокала: {1}')

        if criteria_lit == 'видеоигры':
            return construct_answer_from_tracks('videogame', 'tags', 'Треки, имеющие отношение к видеоиграм: {1}')

        if criteria_lit == 'каверы':
            return construct_answer_from_tracks('cover', 'tags', 'К каверам относятся треки: {1}')

        if 'длиной' in criteria_lit:
            tr_lengths = _numbers_in(question)
            if len(tr_lengths) > 0:
                # Track lengths are in seconds; convert when the question speaks of minutes.
                tr_length = tr_lengths[0] * (60 if 'минут' in question else 1)
            else:
                tr_length = 240

            if ' > ' in criteria_lit:
                filtered = [t for t in tracks if t['length'] >= tr_length]

                if len(filtered) == 0:
                    return None

                return f'Ваша подборка длинных треков: {_track_names(filtered)}'

            if ' < ' in criteria_lit:
                filtered = [t for t in tracks if t['length'] <= tr_length]

                if len(filtered) == 0:
                    return None

                return f'Получилась такая подборка коротких треков: {_track_names(filtered)}'

        if 'в количестве' in criteria_lit:
            tr_qtys = _numbers_in(question)
            tr_qty = tr_qtys[0] if len(tr_qtys) > 0 else 5

            # random.sample raises ValueError when asked for more items than
            # exist, so cap the request at the catalogue size.
            rnd_tracks = random.sample(tracks, min(tr_qty, len(tracks)))

            return f'Вот ваша случайная подборка треков: {_track_names(rnd_tracks)}'

        if 'как' in criteria_lit:
            track = _first_track_named_in(question)

            if track is None:
                return None

            similar_tracks = choose_tracks((track,), 5, CORRWITH_METHOD, [])

            return f'Вот подборка треков, похожих на {track["name"]}: {", ".join(similar_tracks)}'

        if 'лучшие' in criteria_lit:
            if 'лайкам' in criteria_lit:
                best_tracks = sorted(tracks, key=lambda t: t['likes'], reverse=True)[:5]

                return f'Вот самые любимые треки: {_track_names(best_tracks)}'

            if 'прослушиваниям' in criteria_lit:
                best_tracks = sorted(tracks, key=lambda t: t['listens'], reverse=True)[:5]

                return f'Удалось найти, что чаще всего слушают эти треки: {_track_names(best_tracks)}'

    if result_entities_lit == 'жанр':
        if criteria_lit == '<название>':
            track = _first_track_named_in(question)

            if track is None:
                return None

            tr_genres = track['mixedGenres']

            return f'Композиция {track["name"]} принадлежит к жанрам: {", ".join(tr_genres)}'

        if criteria_lit == '<исполнитель>':
            filtered = _tracks_by_mentioned_performer(question)

            if len(filtered) == 0:
                return None

            f_genres = _unique_in_order(g for ft in filtered for g in ft['mixedGenres'])
            prop_val = filtered[0]['performer']

            return f'Исполнитель {prop_val} играет в жанрах: {", ".join(f_genres)}'

        if criteria_lit == '<жанр>':
            filtered = [t for t in tracks if str(t['genre']) in question]

            if len(filtered) == 0:
                return None

            prop_val = filtered[0]['genre']
            subs = _unique_in_order(ft['subgenre'] for ft in filtered)

            return f'У жанра {prop_val} существуют такие поджанры: {", ".join(subs)}'

    if result_entities_lit == 'исполнители':
        if criteria_lit == '<название>':
            track = _first_track_named_in(question)

            if track is None:
                return None

            return f'Композиция {track["name"]} принадлежит к творчеству {track["performer"]}'

        if criteria_lit == '<жанр>':
            prop_val, filtered = _tracks_by_mentioned_genre(question)

            if len(filtered) == 0:
                return None

            perf = _unique_in_order(ft['performer'] for ft in filtered)

            return f'В жанре {prop_val} играют исполнители: {", ".join(perf)}'

    if result_entities_lit == 'альбомы':
        if criteria_lit == '<название>':
            track = _first_track_named_in(question)

            if track is None:
                return None

            tr_alb = track['album']

            return f'Композиция {track["name"]} относится к альбому {tr_alb}'

        if criteria_lit == '<исполнитель>':
            filtered = _tracks_by_mentioned_performer(question)

            if len(filtered) == 0:
                return None

            f_albs = _unique_in_order(ft['album'] for ft in filtered)
            prop_val = filtered[0]['performer']

            return f'Исполнителю {prop_val} принадлежат такие альбомы, как: {", ".join(f_albs)}'

    if result_entities_lit == 'лайки/прослушивания':
        if criteria_lit == '<название>':
            track = _first_track_named_in(question)

            if track is None:
                return None

            if not track['likes']:
                # Avoid dividing by zero for a track nobody has liked yet.
                return f'Композиции {track["name"]} пока никто не поставил лайк'

            listens_per_like = track['listens'] / track['likes']

            return f'В среднем 1 из {round(listens_per_like)} пользователей ставит композиции {track["name"]} лайк'

    if result_entities_lit == 'лайки':
        if criteria_lit == '<жанр>':
            favorite_genre = ''
            favorite_likes = 0
            for gnr in mixed_genres_set:
                likes_qty = sum(t['likes'] for t in tracks if gnr in t['mixedGenres'])

                if favorite_likes < likes_qty:
                    favorite_likes = likes_qty
                    favorite_genre = gnr

            return f'Самый любимый жанр - {favorite_genre}. Композициям в этом жанре поставили {favorite_likes} лайков'

    if result_entities_lit == 'прослушивания':
        if criteria_lit == '<исполнитель>':
            filtered = _tracks_by_mentioned_performer(question)

            if len(filtered) == 0:
                return None

            prop_val = filtered[0]['performer']
            listens_qty = sum(ft['listens'] for ft in filtered)

            return f'Треки исполнителя {prop_val} были прослушаны в общей сложности {listens_qty} раз'

        if criteria_lit == '<жанр>':
            prop_val, filtered = _tracks_by_mentioned_genre(question)

            if len(filtered) == 0:
                return None

            listens_qty = sum(ft['listens'] for ft in filtered)

            return f'У треков жанра {prop_val} суммарно набралось {listens_qty} прослушиваний'

    # Unknown template or criterion.
    return None



In [9]:
# Conversation log, newest entry first; shared across calls of answer_q.
dialog_rec = []

def answer_q(question):
    """Answer `question`, prepend question and reply to `dialog_rec`, and display the log.

    The question is compared with every sample question. Sample questions
    whose similarity exceeds 0.5 are tried from the most to the least similar
    until one of their answer templates yields an answer.
    """
    dialog_rec.insert(0, question)

    # One similarity per sample question, flattened to a 1-D array.
    similarities = numpy.asarray(calculate_cosine_dist(question)).ravel()

    if similarities.size == 0 or similarities.max() < 0.5:
        dialog_rec.insert(0, 'Вопрос не совсем понятен, попробуйте спросить по-другому')
        display(dialog_rec)
        return

    # Walk the row indices directly, best match first. The former lookup of
    # each score via isclose() always resolved tied scores to the same first
    # row, so other templates with an equal score were never tried.
    for ind in numpy.argsort(-similarities, kind='stable'):
        if not similarities[ind] > 0.5:
            break

        answer = construct_answer(question, sample_df[1].loc[ind])

        if answer:
            dialog_rec.insert(0, answer)
            display(dialog_rec)
            return

    dialog_rec.insert(0, 'К сожалению, ничего не удалось найти... Пожалуйста, задайте другой вопрос')
    display(dialog_rec)

In [10]:
# Render a text box with a run button (see the output below); answer_q is called with the entered question.
interact_manual(answer_q, question=widgets.Text(value='Кто играет в жанре alternative pop?'))

interactive(children=(Text(value='Кто играет в жанре alternative pop?', description='question'), Button(descri…

<function __main__.answer_q(question)>