Просмотр фильмов на оригинальном языке - это популярный и действенный метод прокачаться при изучении иностранных языков. Важно выбрать фильм, который подходит студенту по уровню сложности, т.е. студент понимал 50-70 % диалогов. Чтобы выполнить это условие, преподаватель должен посмотреть фильм и решить, какому уровню он соответствует. Однако это требует больших временных затрат.

Требуется разработать ML решение для автоматического определения уровня сложности англоязычных фильмов.

In [None]:
import sys
!{sys.executable} -m pip install pandas pysrt nltk pyprind joblib

import nltk
nltk.download('stopwords')
nltk.download('punkt')

In [None]:
import glob
import os
import pandas as pd
import string
import pysrt
import re
from time import time
from joblib import dump
import pyprind

from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn import metrics
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils.extmath import density

# обучим модели
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.linear_model import SGDClassifier
from sklearn.naive_bayes import ComplementNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neighbors import NearestCentroid
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import RidgeClassifier

In [None]:
# пусть к файлам, где хранятся субтитры
PATH='learning/subtitles/*/*.srt'

# путь к дополнительным данным для фильмов
MOVIES_LABEL_PATH='learning/data/movies_labels.xlsx'

# путь к файлу со словами
WORDS_PATH='learning/data/words.xlsx'

# каталог для сохранение результата
OUTPUT_PATH='learning/models/'

## Подготовка данных

Считываем субтитры, которые хранятся в каталоге <b>subtitles</b>, если требуется добавить новые данные, то можно создать дополнительный каталог, куда можно поместить дополнительные файлы.

<b>Примечание</b>: нужно также дополнительно в файле <b>movies_labels.xlsx</b> указать уровень знания английского языка

In [None]:
# считываем файлы субтитров (файлы были предварительно обработаны: корректность имени)
files = []
movies = []

for file in glob.glob(PATH):
    full_path = file.replace('\\', '/')
    movies.append(os.path.basename(full_path).replace('.srt', ''))
    files.append([full_path])

In [None]:
# создаём DataFrame
df_movies = pd.DataFrame(files, columns=['Path'], index=movies)    
print(f'Было найдено {df_movies.shape[0]} фильмов')

df_movies.head()

In [None]:
# Заранее был обработан файл movies_labels.xlsx, где были удалены дубликаты 
df_movies_labels = pd.read_excel(MOVIES_LABEL_PATH, index_col=1)
df_movies_labels.drop(['id'], axis=1, inplace=True)
print(f'Было найдено {df_movies_labels.shape[0]} фильмов')

df_movies_labels.head()

In [None]:
df_movies = df_movies.merge(df_movies_labels, how='left', left_index=True, right_index=True)

# были найдены фильмы, по которым неизвестна категория
df_movies = df_movies[~df_movies['Max_Level'].isna()]

print(f'Итоговое количество фильмов для обработки {df_movies.shape[0]}')

In [None]:
df_movies.head()

<b>Примечание</b>: у некоторых фильмах было указано несколько категорий, в этом случаи я брать максимальное значение (см. колонку Max_Level).

In [None]:
# используем вспомогательные методы

HTML = r'<.*?>' # html тэги меняем на пробел
TAG = r'{.*?}' # тэги меняем на пробел
COMMENTS = r'[\(\[][A-Za-z ]+[\)\]]' # комменты в скобках меняем на пробел
UPPER = r'[[A-Za-z ]+[\:\]]' # указания на того кто говорит (BOBBY:)
LETTERS = r'[^a-zA-Z\'.,!? ]' # все что не буквы меняем на пробел 
SPACES = r'([ ])\1+' # повторяющиеся пробелы меняем на один пробел
DOTS = r'[\.]+' # многоточие меняем на точку
SYMB = r"[^\w\d'\s]" # знаки препинания кроме апострофа

def clean_subs(subs):
    """
    Очистка субтитров

    Параметры:
    ----------
    subs: SubRipFile - объект с информацией о субтитрах

    Результат:
    ----------
    Отформатированная строка
    """
    subs = subs[1:] # удаляем первый рекламный субтитр
    txt = re.sub(HTML, ' ', subs.text) # html тэги меняем на пробел
    txt = re.sub(COMMENTS, ' ', txt) # комменты в скобках меняем на пробел
    txt = re.sub(UPPER, ' ', txt) # указания на того кто говорит (BOBBY:)
    txt = re.sub(LETTERS, ' ', txt) # все что не буквы меняем на пробел
    txt = re.sub(DOTS, r'.', txt) # многоточие меняем на точку
    txt = re.sub(SPACES, r'\1', txt) # повторяющиеся пробелы меняем на один пробел
    txt = re.sub(SYMB, '', txt) # знаки препинания кроме апострофа на пустую строку
    txt = re.sub('www', '', txt) # кое-где остаётся www, то же меняем на пустую строку
    txt = txt.lstrip() # обрезка пробелов слева
    txt = txt.encode('ascii', 'ignore').decode() # удаляем все что не ascii символы   
    txt = txt.lower() # текст в нижний регистр
    return txt

In [None]:
# общая информация о фильмах
subtitles = [] 

# дополнительная информация о фильмах с новыми features
subtitles_info = [] 

In [None]:
pbar = pyprind.ProgBar(df_movies.shape[0])

def read_subs(row):
    """
    Пофайловое чтение субтитров
    
    Параметры:
    ----------
    row - объект DataFrame'а
    
    Результат:
    ----------
    row - объект DataFrame'а
    """
    
    movie = row.name
    level = row['Max_Level']
    path = row['Path']
    
    subs = None
    
    encodings = ['utf-8', 'ansi', 'utf-16-le']
    
    # специально пробегаем по всем кодировкам, можно добавить своё
    for encoding in encodings:
        try:
            subs = pysrt.open(path, encoding=encoding)
            if len(subs) > 0:
                break
        except:
            pass
    
    try:
        subtitles.append([movie, level, clean_subs(subs)])
        
        for i in range(len(subs)):
            if i == 0:
                # пропускаем первый рекламный субтитр
                continue
            len_ms = subs[i].end - subs[i].start
            if len_ms.seconds > 0:
                len_ms = (len_ms.seconds * 1000) + len_ms.milliseconds
            else:
                len_ms = len_ms.milliseconds
                
            subtitles_info.append([movie, level, subs[i].start, subs[i].end, subs[i].end - subs[i].start, len_ms])
        
    except Exception as e:
        # обычно это связано с кодировкой
        print(f'Ошибка чтения файла {path}: {e}')
    
    pbar.update()
    
    return row

In [None]:
df_movies = df_movies.apply(read_subs, axis=1)

In [None]:
# создаём DataFrame
df_subtitles_info = pd.DataFrame(subtitles_info, columns=['movie', 'level', 'start', 'end', 'length', 'milliseconds'])

df_subtitles_info.head()

Описание полей таблицы:
* start - время старта субтитра
* end - время завершения субтитра
* length - время показа субтитра
* milliseconds - время показа в миллисекундах

In [None]:
df_subtitles = pd.DataFrame(subtitles, columns=['movie', 'level', 'text'])
df_subtitles.set_index('movie', inplace=True)

df_subtitles.head()

Описание полей таблицы:
* level - уровень знаний английского языка
* text - прочитанные субтитры

In [None]:
# сгуппируем данные для генерации новых features для фильма
df_subtitles_info = df_subtitles_info.groupby(['movie']).aggregate({'start': 'min', 'end': 'max', 'milliseconds': 'mean'})

df_subtitles_info.head()

In [None]:
def time2ms(time):
    return (time.to_time().hour * 60 * 60) + (time.to_time().minute * 60) + (time.to_time().second)

# специально сделал преобразование до секунд
df_subtitles_info['len_sec'] = df_subtitles_info['end'].apply(time2ms)

In [None]:
# дополним фильмы новыми features
df_subtitles = df_subtitles.merge(df_subtitles_info, how='left', left_index=True, right_index=True)
df_subtitles.drop(columns=['start', 'end'], inplace=True)

df_subtitles.head()

In [None]:
# считываем словарь и создаём дополнительный объект
df_words = pd.read_excel(WORDS_PATH)
df_words = df_words.groupby(['level', 'word'])['file'].count().reset_index()

dict_words = {}

for level in df_words['level'].unique():
    # создаём новые колонки
    df_subtitles[level] = 0
    
    dict_words[level] = df_words.loc[df_words['level'] == level, 'word'].values
    
print(f'Были созданы колонки {", ".join(df_words["level"].unique())}')

In [None]:
pbar = pyprind.ProgBar(df_subtitles.shape[0])

def set_level_count(row):
    """
    Устанавливаем доли слов определённых категорий в фильме
    
    Параметры:
    ----------
    row - объект DataFrame'а
    
    Результат:
    ----------
    row - объект DataFrame'а
    """
    words = word_tokenize(row['text'])
    
    pbar.update()
    
    for level in df_words['level'].unique():
        row[level] = len([word for word in words if word.lower() in dict_words[level]]) / len(words)
    return row

In [None]:
# процесс обрабтки занимает некоторое время, всё зависит от количество фильмов
df_subtitles = df_subtitles.apply(set_level_count, axis=1)

In [None]:
pbar = pyprind.ProgBar(df_subtitles.shape[0])

all_stopwords = stopwords.words('english')
porter = PorterStemmer()

def tokenizer_porter(text):
    """
    Нормализация слов
    
    Параметры:
    ----------
    text: string - входная строка
    
    Результат:
    ----------
    string - преобразованная строка
    """
    pbar.update()
    
    if text == text:
        text = text.translate(str.maketrans('', '', string.punctuation))
        # чистим от стоп-слов
        words = word_tokenize(text)
        words = [word for word in words if word.casefold() not in all_stopwords]

        return " ".join([porter.stem(word) for word in words])
    else:
        return None

In [None]:
# выполняем нормализацию данных
df_subtitles['porter_text'] = df_subtitles['text'].apply(tokenizer_porter)

df_subtitles.head()

Итоговая талица содержит следующую информацию:
* level - уровень знания языка
* text - оригинальные субтитры
* milliseconds - среднее время показа субтитра
* len_sec - продолжительность фильма в секундах
* A1 - C1 - доля слов в каждой категории
* porter_text - преобразованные субтитры

## Обучение

In [None]:
features = df_subtitles[['porter_text', 'A1', 'A2', 'B1', 'B2', 'C1']]
target = df_subtitles['level']

features_train, features_valid, target_train, target_valid = train_test_split(features, target, test_size=0.25, random_state=12345, shuffle=True)

# разделил исходные данные
print(f'Обучающая выборка:', features_train.shape[0])
print(f'Валидационная выборка:', features_valid.shape[0])

In [None]:
numeric = ['A1', 'A2', 'B1', 'B2', 'C1']

features_train_ohe = features_train.copy()
features_valid_ohe = features_valid.copy()

# приводим числовые значения к диапазону от 0 и до 1 (одна из моделей работает только со значениями этого диапазона)
scaler = MinMaxScaler()
features_train_ohe[numeric] = scaler.fit_transform(features_train_ohe[numeric])
features_valid_ohe[numeric] = scaler.transform(features_valid_ohe[numeric])

In [None]:
# используем викторизацию
tfidf = TfidfVectorizer(lowercase=False)
column_transformer = ColumnTransformer([('vect1', tfidf, 'porter_text')], remainder='passthrough')

features_train_ohe = column_transformer.fit_transform(features_train_ohe)
features_valid_ohe = column_transformer.transform(features_valid_ohe)

In [None]:
best_score = 0
best_cls = None
best_params = None

In [None]:
def benchmark(clf, params, custom_name=False):
    """
    Специальный метод для определения наилучшей модели
    """
    print("_" * 80)
    print("Training: ")
    print(clf)
    t0 = time()
    
    gs_clf = GridSearchCV(clf, params,
                           scoring='f1_weighted',
                           cv=5,
                           verbose=False,
                           n_jobs=-1)
    
    gs_clf.fit(features_train_ohe, target_train)
    
    print('Best parameter set: %s ' % gs_clf.best_params_)
    print('CV F1: %.3f' % gs_clf.best_score_)
    
    train_time = time() - t0
    print(f"train time: {train_time:.3}s")

    t0 = time()
    
    best_clf = gs_clf.best_estimator_
    pred = best_clf.predict(features_valid_ohe)
    test_time = time() - t0
    print(f"test time:  {test_time:.3}s")

    score = metrics.f1_score(target_valid, pred, average='weighted')
    
    global best_cls
    global best_score
    global best_params
    
    if score > best_score:
        best_cls = best_clf
        best_score = score
        best_params = gs_clf.best_params_
        
    print(f"Valid F1:   {score:.3}")

    if hasattr(clf, "coef_"):
        print(f"dimensionality: {best_clf.coef_.shape[1]}")
        print(f"density: {density(best_clf.coef_)}")
        print()

    print()
    if custom_name:
        clf_descr = str(custom_name)
    else:
        clf_descr = best_clf.__class__.__name__
    return clf_descr, score, train_time, test_time

In [None]:
results = []
STATE=12345
JOB=-1
for clf, params, name in (
    (LogisticRegression(penalty='l2', max_iter=1000, class_weight='balanced', random_state=STATE, n_jobs=JOB), {'C': [1, 3, 5, 7], 'solver': ['lbfgs', 'liblinear', 'sag', 'saga']}, "Logistic Regression"),
    (RidgeClassifier(class_weight='balanced', random_state=STATE), {'alpha': [1, 3, 5, 7], 'solver': ['lsqr', 'sparse_cg', 'sag']}, "Ridge Classifier"),
    (KNeighborsClassifier(n_jobs=JOB), {'n_neighbors': [50, 100, 150], 'leaf_size': [20, 30, 40]}, "kNN"),
    (RandomForestClassifier(class_weight='balanced', random_state=STATE, n_jobs=JOB), {'n_estimators': [100, 200, 300], 'criterion': ['gini', 'entropy', 'log_loss'], 'max_depth': [5, 10, 15, 20]}, "Random Forest"),
    # L2 penalty Linear SVC
    (LinearSVC(class_weight='balanced', loss='squared_hinge', dual=False, random_state=STATE), {'C': [0.1, 0.5, 1.0]}, "Linear SVC"),
    # L2 penalty Linear SGD
    (SGDClassifier(alpha=1e-4, early_stopping=True, class_weight='balanced', random_state=STATE, n_jobs=JOB), {'n_iter_no_change': [3, 5, 7], 'loss': ['hinge', 'log_loss', 'log', 'modified_huber', 'squared_hinge', 'perceptron', 'squared_error', 'huber', 'epsilon_insensitive', 'squared_epsilon_insensitive']}, "SGD"),
    # NearestCentroid (aka Rocchio classifier)
    (NearestCentroid(), {'metric': ['euclidean']}, "NearestCentroid"),
    # Sparse naive Bayes classifier
    (ComplementNB(), {'alpha': [0.1, 0.3, 0.5, 0.7, 0.9], 'norm': [True, False]}, "Complement naive Bayes")
    ):
    print("=" * 80)
    print(name)
    print(params)
    results.append(benchmark(clf, params, name))

In [None]:
print(f'Модель {best_cls.__class__.__name__} показала наилучший результат метрики F1 = {best_score}')

<b>Вывод</b>: для работы с текстовыми данными лучшие показатели были при применении векторизации слов <b>TfidfVectorizer</b>. При этом на повышение метрики хорошо повлияли доли слов в каждой категории.

## Сохранение модели

In [None]:
features_final = features.copy()

scaler = MinMaxScaler()
features_final[numeric] = scaler.fit_transform(features_final[numeric])

tfidf = TfidfVectorizer(lowercase=False)
column_transformer = ColumnTransformer([('vect1', tfidf, 'porter_text')], remainder='passthrough')

features_final = column_transformer.fit_transform(features_final)

dump(scaler, OUTPUT_PATH + 'subtitle.scaler')
dump(column_transformer, OUTPUT_PATH + 'subtitle.transformer')

In [None]:
best_cls.fit(features_final, target)
dump(best_cls, OUTPUT_PATH + 'subtitle.model')