Skip to content
Permalink
Browse files
Possible metadata method - needs to be tested with concurrent video download
  • Loading branch information
Javinator9889 committed Sep 22, 2019
1 parent 919fe62 commit 07ce5352e78343f8b9d3d35ce0ca1f2a1b1afc34
@@ -102,3 +102,6 @@ venv.bak/

# mypy
.mypy_cache/

# keys folder
keys/
@@ -18,6 +18,7 @@ cache:

before_script:
- python -V # Print out python version for debugging
# Refresh the package index first, then install without prompting: CI jobs
# have no terminal to answer apt's confirmation question.
- apt-get update
- apt-get install -y libchromaprint-tools

test:pylint:
script:
@@ -13,4 +13,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..audio.audio_utils import AudioUtils
from ..audio.ffmpeg import FFmpegOpener
from ..audio.ffmpeg import ffmpeg_available
from ..audio.fpcalc import FPCalc
@@ -0,0 +1,49 @@
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from io import BytesIO
from subprocess import PIPE
from subprocess import Popen


def ffmpeg_available() -> bool:
    """Report whether the ``ffmpeg`` executable can be launched.

    Runs ``ffmpeg -version`` and inspects its exit status.

    :return: ``True`` when the command starts and exits with status 0;
        ``False`` when it cannot be started or exits with another status.
    """
    try:
        proc = Popen(["ffmpeg", "-version"],
                     stdout=PIPE,
                     stderr=PIPE)
    except OSError:
        # The executable is missing or cannot be executed.
        return False
    # communicate() drains and closes both pipes. A bare wait() may block
    # forever once the child fills a pipe that nobody is reading.
    proc.communicate()
    return proc.returncode == 0


class FFmpegOpener(object):
    """Pipe in-memory audio through ``ffmpeg`` and capture what it writes.

    ffmpeg is invoked as ``ffmpeg -i - -f s16le -``: it reads the audio from
    its standard input and writes the ``s16le`` output to its standard
    output. Call ``open()`` to run the conversion, then ``get_output()`` and
    ``get_extra()`` to read the captured streams.
    """

    def __init__(self, data: bytes):
        """Remember the audio to convert; nothing is executed yet.

        :param data: encoded audio bytes that will be sent to ffmpeg's
            standard input by ``open()``.
        """
        self.__data = data
        self.__ffmpeg_proc = None
        self.__out = None
        self.__err = None

    def open(self) -> int:
        """Run ffmpeg over the stored audio and capture stdout and stderr.

        :return: the exit status of the ffmpeg process.
        :raises OSError: if the ``ffmpeg`` executable cannot be started.
        """
        # stdin has to be a real pipe: Popen needs a file descriptor, which
        # an in-memory BytesIO object cannot provide. The bytes are handed
        # over through communicate(), which also avoids pipe deadlocks.
        self.__ffmpeg_proc = Popen(["ffmpeg", "-i", "-", "-f", "s16le", "-"],
                                   stdout=PIPE, stderr=PIPE, stdin=PIPE)
        self.__out, self.__err = self.__ffmpeg_proc.communicate(self.__data)
        return self.__ffmpeg_proc.returncode

    def get_output(self) -> bytes:
        """Return ffmpeg's captured standard output (``None`` before ``open()``)."""
        return self.__out

    def get_extra(self) -> bytes:
        """Return ffmpeg's captured standard error (``None`` before ``open()``)."""
        return self.__err
@@ -0,0 +1,50 @@
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
from subprocess import PIPE
from subprocess import Popen

from ..constants import FPCALC


def is_fpcalc_available() -> bool:
    """Report whether the ``fpcalc`` executable can be launched.

    Runs ``fpcalc -v`` and inspects its exit status.

    :return: ``True`` when the command starts and exits with status 0;
        ``False`` when it cannot be started or exits with another status.
    """
    try:
        proc = Popen(["fpcalc", "-v"], stdout=PIPE, stderr=PIPE)
    except OSError:
        # The executable is missing or cannot be executed.
        return False
    # communicate() drains and closes both pipes, unlike a bare wait().
    proc.communicate()
    # An explicit result is required here: falling off the end would return
    # None, which callers would read as "not available".
    return proc.returncode == 0


class FPCalc(object):
    """Fingerprint in-memory audio by piping it through ``fpcalc``.

    The command line comes from the ``FPCALC`` constant; the audio is sent
    to the process' standard input and its textual output is parsed for the
    duration and the fingerprint.
    """

    def __init__(self, audio: bytes):
        """Run fpcalc over ``audio`` and store the parsed result.

        :param audio: encoded audio bytes sent to fpcalc's standard input.
        :raises OSError: if the fpcalc executable cannot be started.
        :raises RuntimeError: if the output holds no duration or fingerprint.
        """
        fpcalc = Popen(FPCALC, stdout=PIPE, stdin=PIPE)
        out, _ = fpcalc.communicate(audio)
        duration, fingerprint = self._parse_output(out.decode("utf-8"))

        self.__duration: int = duration
        self.__fp: str = fingerprint

    @staticmethod
    def _parse_output(res: str):
        """Extract ``(duration, fingerprint)`` from fpcalc's ``KEY=value`` output.

        :param res: decoded standard output of fpcalc.
        :return: tuple of the duration as ``int`` and the fingerprint as
            ``str``, without any surrounding whitespace.
        :raises RuntimeError: if either field is missing.
        """
        # Anchoring on the key names accepts single-digit durations and keeps
        # the trailing newline out of the fingerprint.
        duration = re.search(r"^DURATION=(\d+)", res, re.MULTILINE)
        fingerprint = re.search(r"^FINGERPRINT=(\S+)", res, re.MULTILINE)
        if duration is None or fingerprint is None:
            raise RuntimeError("fpcalc returned no duration/fingerprint - "
                               "output: " + res)
        return int(duration.group(1)), fingerprint.group(1)

    def duration(self) -> int:
        """Return the audio duration reported by fpcalc."""
        return self.__duration

    def fingerprint(self) -> str:
        """Return the fingerprint string reported by fpcalc."""
        return self.__fp
@@ -13,4 +13,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..constants.app_constants import ydl_cli_options
from ..constants.app_constants import ACOUSTID_KEY
from ..constants.app_constants import FPCALC
from ..constants.app_constants import YDL_CLI_OPTIONS
@@ -13,5 +13,9 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
ydl_cli_options = ["youtube-dl", "--format", "bestaudio[ext=m4a]", "--quiet", "--output",
import os

# Base youtube-dl command line; the final "-" is the value given to "--output".
YDL_CLI_OPTIONS = ["youtube-dl", "--format", "bestaudio[ext=m4a]", "--quiet", "--output",
"-"]
# Command line used to launch fpcalc, with "-" as its input argument.
FPCALC = ["fpcalc", "-"]
# AcoustID API key, read from the environment when this module is imported;
# a missing ACOUSTID_KEY variable raises KeyError at import time.
ACOUSTID_KEY = os.environ["ACOUSTID_KEY"]
@@ -16,13 +16,13 @@
from io import BytesIO
from typing import Tuple

from ..constants.app_constants import ydl_cli_options
from ..constants.app_constants import YDL_CLI_OPTIONS


class YouTubeDownloader(object):
# Store the video URL and build the command line used to download it.
def __init__(self, url: str):
self.__url: str = url
# NOTE(review): this value is overwritten by the next line; it looks like a
# leftover from the rename to YDL_CLI_OPTIONS - confirm and drop.
self.__options: list = ydl_cli_options.copy()
# Work on a copy so appending the URL does not mutate the shared constant.
self.__options: list = YDL_CLI_OPTIONS.copy()
self.__options.append(self.__url)

def download(self) -> Tuple[BytesIO, bytes]:
@@ -37,7 +37,7 @@ def download(self) -> Tuple[BytesIO, bytes]:
return BytesIO(stdout), stdout
else:
raise RuntimeError("youtube-dl downloader exception - more info: " +
str(stderr))
str(stderr.decode("utf-8")))

# Return the URL this downloader was created with.
def get_url(self) -> str:
return self.__url
@@ -13,26 +13,3 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from io import BytesIO

import soundfile


class AudioUtils(object):
    """Read-only view over the properties of a ``soundfile.SoundFile``."""

    def __init__(self, audio: BytesIO):
        """Open ``audio`` with ``soundfile``.

        :param audio: in-memory buffer holding the audio data.
        """
        self.__audio = soundfile.SoundFile(audio)

    def get_audio_samplerate(self) -> int:
        """Return the ``samplerate`` attribute of the opened file."""
        rate = self.__audio.samplerate
        return rate

    def get_audio_channels(self) -> int:
        """Return the ``channels`` attribute of the opened file."""
        channels = self.__audio.channels
        return channels

    def get_audio_duration(self) -> float:
        """Return the frame count divided by the sample rate."""
        frame_count = self.__audio.frames
        rate = self.get_audio_samplerate()
        return frame_count / rate

    def get_audio_name(self) -> str:
        """Return the ``name`` attribute of the opened file."""
        name = self.__audio.name
        return name

    def get_audio_format(self) -> str:
        """Return the ``format`` attribute of the opened file."""
        audio_format = self.__audio.format
        return audio_format
@@ -13,24 +13,73 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from io import BytesIO

import acoustid
import musicbrainzngs

try:
import ujson as json
except ImportError:
import json

from .. import AudioUtils
from ..audio import FPCalc
from ..constants import ACOUSTID_KEY


class MetadataIdentifier(object):
    """Identify a song from its audio data through an AcoustID lookup.

    The audio is fingerprinted with ``FPCalc`` on construction.
    ``identify_audio`` then performs the lookup and stores the fields of the
    first match, which the ``get_*`` accessors expose. Until a match is
    stored, the accessors return the defaults set by ``__init__``.
    """

    def __init__(self, audio: bytes):
        """Fingerprint ``audio`` and reset every result field.

        :param audio: encoded audio bytes, handed to ``FPCalc``.
        """
        self.__fingerprint = FPCalc(audio)
        self.__result: dict = None
        self.__artist: str = ""
        self.__title: str = ""
        self.__release_id: str = ""
        self.__recording_id: str = ""
        self.__score: float = 0.0
        self.__cover: bytes = bytes(0)

    @staticmethod
    def _extract_match(data: dict):
        """Pull the first result's first recording out of a lookup response.

        :param data: decoded lookup response.
        :return: dict with the keys ``score``, ``title``, ``artist``,
            ``recording_id`` and ``release_id``, or ``None`` when the
            response is not ``ok`` or carries no result/recording.
            ``artist`` and ``title`` are ``None`` when absent and
            ``release_id`` is ``""`` when the recording lists no release.
        """
        if not data or data.get("status") != "ok":
            return None
        results = data.get("results")
        if not results:
            return None
        best = results[0]
        recordings = best.get("recordings")
        if not recordings:
            return None
        recording = recordings[0]

        artists = recording.get("artists")
        if artists:
            artist_name = "; ".join(artist["name"] for artist in artists)
        else:
            artist_name = None
        releases = recording.get("releases")
        release_id = releases[0].get("id", "") if releases else ""

        return {
            "score": best.get("score", 0.0),
            "title": recording.get("title"),
            "artist": artist_name,
            "recording_id": recording.get("id"),
            "release_id": release_id,
        }

    def identify_audio(self) -> dict:
        """Look the fingerprint up and store the best match, if any.

        Exceptions raised by the lookup or by the cover download are not
        caught here. The metadata fields are assigned before the cover is
        requested, so they stay available if that download fails.

        :return: the raw lookup response, also kept for ``get_results``.
        """
        data: dict = acoustid.lookup(apikey=ACOUSTID_KEY,
                                     fingerprint=self.__fingerprint.fingerprint(),
                                     duration=self.__fingerprint.duration(),
                                     meta="recordings releaseids")
        self.__result = data
        match = self._extract_match(data)
        if match is not None:
            self.__score = match["score"]
            self.__title = match["title"]
            self.__recording_id = match["recording_id"]
            self.__release_id = match["release_id"]
            self.__artist = match["artist"]
            # Without a release there is nothing to fetch a cover for.
            if match["release_id"]:
                self.__cover = musicbrainzngs.get_image_front(
                    match["release_id"])
        return data

    def get_title(self) -> str:
        """Return the title of the matched recording."""
        return self.__title

    def get_score(self) -> float:
        """Return the score of the matched result."""
        return self.__score

    def get_artist(self) -> str:
        """Return the artist names of the match joined with ``"; "``."""
        return self.__artist

    def get_recording_id(self) -> str:
        """Return the id of the matched recording."""
        return self.__recording_id

    def get_release_id(self) -> str:
        """Return the id of the first release of the matched recording."""
        return self.__release_id

    def get_cover(self) -> bytes:
        """Return the downloaded front cover, empty when none was fetched."""
        return self.__cover

    def get_results(self) -> dict:
        """Return the raw response of the last lookup (``None`` before it)."""
        return self.__result
@@ -13,3 +13,4 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..metadata.MetadataIdentifier import MetadataIdentifier
@@ -1,4 +1,5 @@
SoundFile
musicbrainzngs
ujson
youtube_dl
pyacoustid
python-telegram-bot
@@ -0,0 +1,31 @@
import unittest
from pprint import pprint

from YouTubeMDBot.downloader import YouTubeDownloader
from YouTubeMDBot.metadata import MetadataIdentifier


class IdentifierTest(unittest.TestCase):
    """End-to-end run: download one video's audio and identify it."""

    def test_identification(self):
        """Download, identify and print the metadata of a fixed video.

        Writes the downloaded audio to ``hello.m4a`` and the cover to
        ``cover.jpg`` in the current working directory.
        """
        video_url = "https://www.youtube.com/watch?v=YQHsXMglC9A"
        _stream, raw_audio = YouTubeDownloader(url=video_url).download()
        with open("hello.m4a", "wb") as audio_file:
            audio_file.write(raw_audio)

        identifier = MetadataIdentifier(audio=raw_audio)
        lookup_response = identifier.identify_audio()

        summary = ("{0} by {1} - score: {2} / 1\n"
                   "\thttps://musicbrainz.org/recording/{3}\n"
                   "\thttps://musicbrainz.org/release/{4}\n\n")
        title = identifier.get_title()
        artist = identifier.get_artist()
        score = identifier.get_score()
        recording_id = identifier.get_recording_id()
        release_id = identifier.get_release_id()
        print(summary.format(title, artist, score, recording_id, release_id))

        with open("cover.jpg", "wb") as cover_file:
            cover_file.write(identifier.get_cover())

        pprint(lookup_response)


# Run the tests in this module when the file is executed directly.
if __name__ == '__main__':
unittest.main()

0 comments on commit 07ce535

Please sign in to comment.