## Подсистема мониторинга рекламных объявлений на уличных щитах с применением технологии программного анализа изображений в автоматизированной системе мониторинга выполнения обязательств поставщиков рекламных услуг

### Задача - создать сервис для рекламного агенства, который бы позволял отбирать наиболее подходящие изображения для отчета.

In [2]:
import os

In [3]:
import json
import requests

In [4]:
import moviepy.editor as mpy
import cv2
import numpy as np
import glob

In [5]:
def set_video(video_path: str):
        '''
        Получаем капчу по указанному пути
        
        Arguments:
        video_path, str - Путь к видео файлу
        
        Returns:
        capture, cv2.VideoCapture
        '''
        
        # Получаем клип и его fps
        video_clip = mpy.VideoFileClip(video_path)
        # Запоминаем fps
        fps = video_clip.fps
        # Запоминаем длительность видео в секундах
        duration = video_clip.duration
        
        # Запоминаем длину и ширину кадра в видео
        frame_size = video_clip.size

        # Получаем капчу клипа
        cap = cv2.VideoCapture(video_path)
        
        return cap

In [6]:
def viewImage(image, name_of_window = "window"):
    '''Вывод изображения на экран(для отладки)
    
    Arguments:
    image, numpy.ndarray - Изображение, полученное через cap.read()
    '''
    
    ##cv2.namedWindow(name_of_window, cv2.WINDOW_NORMAL)
    cv2.imshow(name_of_window, image)
    cv2.waitKey(0)
    
    cv2.destroyAllWindows()

In [7]:
def get_frame(cap):
    '''
    Получаем кадр из видео
    
    Arguments:
    cap, cv2.VideoCapture - Капча видеофайла
    
    Returns:
    frame, numpy.ndarray - кадр оригинального видеоролика
    '''
    #cap = set_video('to_diplom\Originals\Wrapped2021_1440x720_05_Manizha.mp4')

    counter_of_frames = 0
    
    # Запускаем капчу
    while(cap.isOpened()):
        
        # Временный отлмдачный механизм
        if counter_of_frames > 3:
            break
            
        ret, frame = cap.read() #ret - сообщ об отсуствии ошибок, frame - сам кадр
        
        # Инкрементируем счетчик кадров
        counter_of_frames+=1
            
        #?
        # Блок останваливает обработку в конце файла или при нажатии клавиши "q"
        if cv2.waitKey(1) & 0xFF == ord('q') or ret == False:
            last_frame = frame
            break
        
    # "Освобождаем" капчу
    cap.release()
    cv2.destroyAllWindows()
    
    return frame

In [8]:
#Функция вычисления хэша
def CalcImageHash(image: np.ndarray):
    '''
    Вычисляем хеш для картинки
    
    Arguments:
    image, numpy.ndarray - Капча видеофайла
    
    Returns:
    _hash, str - хеш картинки
    '''
    #image = cv2.imread(FileName) #Прочитаем картинку
    resized = cv2.resize(image, (8,8), interpolation = cv2.INTER_AREA) #Уменьшим картинку
    gray_image = cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY) #Переведем в формат градаций серего
    
    avg = gray_image.mean() #Среднее значение пикселя
    ret, threshold_image = cv2.threshold(gray_image, avg, 255, 0) #Бинаризация по порогу
    
    #Рассчитаем хэш
    _hash=""
    for x in range(8):
        for y in range(8):
            val=threshold_image[x,y]
            if val==255:
                _hash=_hash+"1"
            else:
                _hash=_hash+"0"
            
    return _hash
 
def CompareHash(hash1: str, hash2: str):
    '''
    Сравнение 2х хешей
    
    Arguments:
    hash1, str - хеш первого изображения
    hash2, str - хеш второго изображения
    
    Returns:
    count, int - мера различия хешей; 0 - хеши идентичны
    '''
    #l=len(hash1)
    #i=0
    count=0
    for i in range(len(hash1)):
        if hash1[i]!=hash2[i]:
            count += 1
    #while i<l:
    #    if hash1[i]!=hash2[i]:
    #        count += 1
    #    i += 1
    return count
      
#    return CompareHash(CalcImageHash(crop_img_1),CalcImageHash(crop_img_2))

In [9]:
def read_images(file_path: str):
    '''
    Получает все картинки из папки
    
    Arguments:
    file_path, str - относительный путь к папке с картинками
    
    returns:
    array of images, (image: np.ndarray)
    '''
    
    # Собираем из файла все названия картинок
    names = [name for name in glob.glob( file_path + "/*.jpg")]
    
    images = []
    
    # Такая сложность из-за проблемы чтения русских символов библиотекой cv2
    
    # Для каждого пути получаем картинку
    for name in names:
        f = open(name, "rb")
        chunk = f.read()
        chunk_arr = np.frombuffer(chunk, dtype=np.uint8)
        img = cv2.imdecode(chunk_arr, cv2.IMREAD_COLOR)
        
        images.append(img)
        
    return images

In [10]:
def read_named_images(file_path: str):
    '''
    Получает все картинки из папки
    
    Arguments:
    file_path, str - относительный путь к папке с картинками
    
    returns:
    array of images dict
    '''
    
    # Собираем из файла все названия картинок
    names = [name for name in glob.glob( file_path + "/*.jpg")]
    
    images = []
    
    # Такая сложность из-за проблемы чтения русских символов библиотекой cv2
    
    # Для каждого пути получаем картинку
    for name in names:
        f = open(name, "rb")
        chunk = f.read()
        chunk_arr = np.frombuffer(chunk, dtype=np.uint8)
        img = cv2.imdecode(chunk_arr, cv2.IMREAD_COLOR)
        
        images.append({'img': img, 'id': name.split('.')[0]})
        
    return images

In [11]:
def get_most_similar_image(frame, images):
    orig_hash = CalcImageHash(frame)
    min_differences = CompareHash(orig_hash, CalcImageHash(images[0]))
    most_similar_image = images[0]
    
    for x in images:
        difference = CompareHash(orig_hash, CalcImageHash(x))
        if difference < min_differences:
            min_differences = difference
            most_similar_image = x
            print('.', end= '')
            
    return most_similar_image
#viewImage(most_similar_image)

In [12]:
def get_most_similar_named_image(frame, images):
    """
    Поиск наиболее подходящего изображения
    
    Arguments:
    frame, np.ndarray - оригинальное изображение
    images, list - Список из словарей, где в каждом словаре хрянится id и само изображение в формате np.ndarray
    
    returns:
    Словарь, где хранится наиболее похожее на оригинал изобраение и его id
    """
    orig_hash = CalcImageHash(frame)
    min_differences = CompareHash(orig_hash, CalcImageHash(images[0]['img']))
    
    most_similar_image = {'img': images[0]['img'], 'id': images[0]['id']}
    
    for x in images:
        difference = CompareHash(orig_hash, CalcImageHash(x['img']))
        if difference < min_differences:
            min_differences = difference
            most_similar_image = {'img': x['img'], 'id': x['id'].split('tmp\\')[1]}
            print('.', end= '')
            
    return most_similar_image

# Запускаем:

%%time

#capture = set_video('to_diplom\Originals\Wrapped2021_1440x720_05_Manizha.mp4')
# Запуск
capture = set_video('to_diplom\Originals\Wrapped2021_1440x720_05_Джарахов_Маркул.mp4')
orig_image = get_frame(capture)
images = read_images('to_diplom/Photo_reports/test1')
result_image = get_most_similar_image(orig_image, images)

#viewImage(result_image)

1. Считываем json, вычленяя оригиналы и фотографии (Стоит ли скачивать фотки??? Мб и нет)
2. Бегаем по каждому оригиналу
3. Для каждого оригинала ищем самую похожую фотку
4. Создаем ответ: Отправляем json в котором массив Id фоток

In [13]:
def get_json_data(path: str):
    """Возвращает данные из json объекта"""
    
    with open(path, 'r') as j:
        json_data = json.load(j)
        
    return json_data

In [14]:
def load_photos_to_directory(named_photos, directory_path = 'to_diplom\Photo_reports\\tmp\\'):
    """Функция загрузки фотографий в папку из словаря с id и url фотографий
    
    Arguments:
    named_photos, list - список фотографий, содержащий url и id каждой фотографии
    directory_path, str - Путь к папке, куда необходимо скачать фотографии
    """
    # Скачиваем весь массив фотографий в локальную папку
    for img_dict in named_photos:
        
        p = requests.get(img_dict['url'])
        output_filepath = directory_path + img_dict['pdf_id'] + '.jpg'
        out = open(output_filepath, "wb")
        out.write(p.content)
        out.close()

In [15]:
def remove_images(path = 'to_diplom/Photo_reports/tmp'):
    """Функция удаления содержимого папки"""
    path = path + "/*"
    files = glob.glob(path)
    for f in files:
        os.remove(f)

### Запускаем

In [21]:
# Получаем из json данные
json_data = get_json_data('to_diplom\jsons\Spotify 1325894.json')

# Запоминаем id отчета, пути к оригинальным видео, словарь фотографий и callback адрес
report_id = json_data['pd_id']
original_paths = json_data['originals']
photos_dict = json_data['photos']
result_url = json_data['callback']

In [22]:
%%time
load_photos_to_directory(photos_dict)

Wall time: 52.7 s


In [23]:
%%time

# Перебираем каждый видеоролик оригианла
result_images = []
for capture in original_paths:
    
    # Осуществляем поиск наиболее похожего изображения
    orig_image = get_frame(set_video(capture))
    images = read_named_images('to_diplom/Photo_reports/tmp')
    result_image = get_most_similar_named_image(orig_image, images)
    
    result_images.append(result_image)
    
# Удаляем фотографии из папки
remove_images()

# Отправляем результат
result_id = [ x['id'] for x in result_images]
response = requests.post(result_url, json={'pd_id': report_id, 'photos': result_id})
print("Код ответа: " + str(response))
print({'pd_id': report_id, 'photos': result_id})


.....Код ответа: <Response [200]>
{'pd_id': '1325894', 'photos': ['1528828', '1534736']}
Wall time: 9.85 s


In [31]:
print(images[0]['img'].shape)

(360, 640, 3)


In [26]:
for res in result_images:
    viewImage(res['img'])
    print(res['id'])

1528828
1534736
