/
encoding-hevc-helper.py
160 lines (134 loc) · 6.22 KB
/
encoding-hevc-helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import asyncio
from datetime import timedelta
from dotenv import load_dotenv
from azure.identity.aio import DefaultAzureCredential
from azure.mgmt.media.aio import AzureMediaServices
from azure.mgmt.media.models import (
Transform,
TransformOutput,
StandardEncoderPreset,
H265Layer,
AacAudio,
H265Video,
H265Complexity,
PngImage,
Mp4Format,
PngLayer,
PngFormat,
AacAudioProfile,
OnErrorType,
Priority
)
import os, random
# Import Job Helpers
from importlib.machinery import SourceFileLoader
mymodule = SourceFileLoader('encoding_job_helpers', 'Common/encoding_job_helpers.py').load_module()
# Get environment variables
load_dotenv()
default_credential = DefaultAzureCredential(exclude_shared_token_cache_credential=True)
# Get the environment variables
subscription_id = os.getenv('AZURE_SUBSCRIPTION_ID')
resource_group = os.getenv('AZURE_RESOURCE_GROUP')
account_name = os.getenv('AZURE_MEDIA_SERVICES_ACCOUNT_NAME')
# The AMS Client
print("Creating AMS Client")
client = AzureMediaServices(default_credential, subscription_id)
# Send envs to helper function
mymodule.set_account_name(account_name)
mymodule.set_resource_group(resource_group)
mymodule.set_subscription_id(subscription_id)
mymodule.create_default_azure_credential(default_credential)
mymodule.create_azure_media_services(client)
# The file you want to upload. For this example, the file is placed under Media folder.
# The file ignite.mp4 has been provided for you.
source_file = "ignite.mp4"
name_prefix = "encodeHEVC"
output_folder = "Output/"
# This is a random string that will be added to the naming of things so that you don't have to keep doing this during testing
uniqueness = str(random.randint(0,9999))
transform_name = 'HEVCEncoding'
async def main():
async with client:
# Create a new Standard encoding Transform for H264
print(f"Creating Standard Encoding transform named: {transform_name}")
# For this snippet, we are using 'StandardEncoderPreset'
transform_output = TransformOutput(
preset = StandardEncoderPreset(
codecs= [AacAudio(channels=2, sampling_rate=48000, bitrate=128000, profile=AacAudioProfile.AAC_LC),
H265Video(key_frame_interval=timedelta(seconds=2), complexity=H265Complexity.BALANCED, layers = [H265Layer(bitrate=1800000, max_bitrate=1800000, width="1280", height="720", b_frames=4, label= "HD-1800kbps"),
H265Layer(bitrate=800000, max_bitrate=800000, width="960", height="540", b_frames=4, label="SD-800kbps"),
H265Layer(bitrate=300000, max_bitrate=300000, width="640", height="480", b_frames=4, label="SD-300kbps")],
),
PngImage(
# Also generate a set of PNG thumbnails
start="25%",
step="25%",
range="80%",
layers= [
PngLayer(
width="50%",
height="50%"
)
]
)],
# Specify the format for the output files - one for video + audio, and another for the thumbnails
formats=[
# Mux the H265 video and AAC audio into MP4 files, using basename, label, bitrate and extension macros
# Note that since you have multiple H265 Layers defined above, you have to use a macro that produces unique names per H264Layer
# Either {Label} or {Bitrate} should suffice
Mp4Format(filename_pattern="Video-{Basename}-{Label}-{Bitrate}{Extension}"),
PngFormat(filename_pattern="Thumbnail-{Basename}-{Index}{Extension}")
]
),
# What should we do with the job if there is an error?
on_error=OnErrorType.STOP_PROCESSING_JOB,
# What is the relative priority of this job to others? Normal, high or low?
relative_priority=Priority.NORMAL
)
print("Creating encoding transform...")
# Adding transform details
my_transform = Transform()
my_transform.description="A simple custom H264 encoding transform with 3 MP4 bitrates"
my_transform.outputs = [transform_output]
print(f"Creating transform {transform_name}")
try:
await client.transforms.create_or_update(
resource_group_name=resource_group,
account_name=account_name,
transform_name=transform_name,
parameters=my_transform)
print(f"{transform_name} created (or updated if it existed already). ")
print()
except:
print("There was an error creating the transform.")
input = await mymodule.get_job_input_type(source_file, {}, name_prefix, uniqueness)
output_asset_name = f"{name_prefix}-output-{uniqueness}"
job_name = f"{name_prefix}-job-{uniqueness}"
print(f"Creating the output asset (container) to encode the content into...")
output_asset = await client.assets.create_or_update(resource_group, account_name, output_asset_name, {})
if output_asset:
print("Output Asset created.")
else:
print("There was a problem creating an output asset.")
print()
print(f"Submitting the encoding job to the {transform_name} job queue...")
job = await mymodule.submit_job(transform_name, job_name, input, output_asset_name)
print(f"Waiting for encoding job - {job.name} - to finish")
job = await mymodule.wait_for_job_to_finish(transform_name, job_name)
# Uncomment the following lines to download the resulting files.
"""
if job.state == 'Finished':
await mymodule.download_results(output_asset_name, output_folder)
print("Downloaded results to local folder. Please review the outputs from the encoding job.")
"""
# closing media client
print('Closing media client')
await client.close()
# closing credential client
print('Closing credential client')
await default_credential.close()
if __name__ == "__main__":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())