Skip to content

Commit

Permalink
Merge pull request #823 from sickelap/fix_motion_photo_processing
Browse files Browse the repository at this point in the history
do not process motion photos other than image/jpeg
  • Loading branch information
derneuere committed Apr 15, 2023
2 parents 7c62dbc + 6b00f57 commit 9369c5f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 12 deletions.
4 changes: 4 additions & 0 deletions api/models/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ def _locate_embedded_video_samsung(data):

def has_embedded_media(file: File) -> bool:
path = str(file.path)
mime = magic.Magic(mime=True)
mime_type = mime.from_file(path)
if mime_type != "image/jpeg":
return False
with open(path, "rb+") as image:
with mmap(image.fileno(), 0, access=ACCESS_READ) as mm:
return (
Expand Down
28 changes: 17 additions & 11 deletions api/tests/test_motion_photo.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def create_test_file(path: str, user: User, content: bytes):
return File.create(path, user)


JPEG = b"\xDE\xAD\xFA\xCE" + JPEG_EOI_MARKER
JPEG_MAGIC_NUMBER = b"\xFF\xD8\xFF"
JPEG = JPEG_MAGIC_NUMBER + b"\xDE\xAD\xFA\xCE" + JPEG_EOI_MARKER
MP4_DATA = b"\xCA\xFE\xFE\xED"
MP4_PREFIX = b"\x00\x00\x00\x18"
MP4 = MP4_PREFIX + b"ftypmp42" + MP4_DATA
Expand All @@ -30,32 +31,38 @@ def create_test_file(path: str, user: User, content: bytes):
@override_settings(MEDIA_ROOT="/tmp")
class MotionPhotoTest(TestCase):
def setUp(self):
self.test_file_path = "/tmp/test_image.jpeg"
self.test_image_path = "/tmp/test_file.jpeg"
self.test_video_path = "/tmp/test_file.mp4"
self.user = create_test_user()
self.client = APIClient()

def test_should_not_process_non_jpeg_files(self):
file = create_test_file(self.test_video_path, self.user, MP4)
actual = has_embedded_media(file)
self.assertFalse(actual)

def test_google_pixel_motion_photo_signatures(self):
for signature in GOOGLE_PIXEL_MOTION_PHOTO_MP4_SIGNATURES:
content = JPEG + MP4_PREFIX + signature + MP4_DATA
file = create_test_file(self.test_file_path, self.user, content)
file = create_test_file(self.test_image_path, self.user, content)
actual = has_embedded_media(file)
self.assertTrue(actual)

def test_samsung_motion_photo_signature(self):
content = JPEG + SAMSUNG_MOTION_PHOTO_MARKER + MP4_DATA
file = create_test_file(self.test_file_path, self.user, content)
file = create_test_file(self.test_image_path, self.user, content)
actual = has_embedded_media(file)
self.assertTrue(actual)

def test_other_content_should_not_report_as_having_embedded_media(self):
file = create_test_file(self.test_file_path, self.user, RANDOM_BYTES)
file = create_test_file(self.test_image_path, self.user, RANDOM_BYTES)
actual = has_embedded_media(file)
self.assertFalse(actual)

def test_extract_embedded_media_from_google_motion_photo(self):
for signature in GOOGLE_PIXEL_MOTION_PHOTO_MP4_SIGNATURES:
content = JPEG + MP4_PREFIX + signature + MP4_DATA
file = create_test_file(self.test_file_path, self.user, content)
file = create_test_file(self.test_image_path, self.user, content)
path = extract_embedded_media(file)
expected = f"{settings.MEDIA_ROOT}/embedded_media/{file.hash}_1.mp4"
self.assertEqual(path, expected)
Expand All @@ -65,7 +72,7 @@ def test_extract_embedded_media_from_google_motion_photo(self):

def test_extract_embedded_media_from_samsung_motion_photo(self):
content = JPEG + SAMSUNG_MOTION_PHOTO_MARKER + MP4
file = create_test_file(self.test_file_path, self.user, content)
file = create_test_file(self.test_image_path, self.user, content)
path = extract_embedded_media(file)
expected = f"{settings.MEDIA_ROOT}/embedded_media/{file.hash}_1.mp4"
self.assertEqual(expected, path)
Expand All @@ -75,16 +82,15 @@ def test_extract_embedded_media_from_samsung_motion_photo(self):

def test_fetch_embedded_media_as_owner(self):
self.client.force_authenticate(user=self.user)
embedded_media = create_test_file(self.test_file_path, self.user, MP4)
embedded_media = create_test_file(self.test_video_path, self.user, MP4)
photo = create_test_photo(owner=self.user)
photo.main_file.embedded_media.add(embedded_media)

response = self.client.get(f"/media/embedded_media/{photo.pk}")
self.assertEqual(response.status_code, 200)

def test_fetch_embedded_media_as_anonymous_when_photo_is_public(self):
self.client.force_authenticate(user=None)
embedded_media = create_test_file(self.test_file_path, self.user, MP4)
embedded_media = create_test_file(self.test_video_path, self.user, MP4)
photo = create_test_photo(owner=self.user, public=True)
photo.main_file.embedded_media.add(embedded_media)

Expand All @@ -93,7 +99,7 @@ def test_fetch_embedded_media_as_anonymous_when_photo_is_public(self):

def test_fetch_embedded_media_as_anonymous_when_photo_is_private(self):
self.client.force_authenticate(user=None)
embedded_media = create_test_file(self.test_file_path, self.user, MP4)
embedded_media = create_test_file(self.test_video_path, self.user, MP4)
photo = create_test_photo(owner=self.user, public=False)
photo.main_file.embedded_media.add(embedded_media)

Expand Down
6 changes: 5 additions & 1 deletion api/views/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,11 @@ def get(self, request, path, fname, format=None):
if path.lower() == "embedded_media":
jwt = request.COOKIES.get("jwt")
query = Q(public=True)
if jwt is not None:
if request.user.is_authenticated:
query = Q(owner=request.user)
if (
jwt is not None
): # pragma: no cover, currently it's difficult to test requests with jwt in cookies
try:
token = AccessToken(jwt)
user = User.objects.filter(id=token["user_id"]).only("id").first()
Expand Down

0 comments on commit 9369c5f

Please sign in to comment.