In [1]:
import requests
import time
import m3u8
from urllib.parse import urljoin, urlparse

In [2]:
# URL of the live M3U8 playlist used throughout this notebook.
m3u8_url='http://123.54.220.216:9901/tsfile/live/1045_1.m3u8?key=txiptv&playlive=1&authid=0'

In [3]:
# Load and parse the playlist found at m3u8_url.
playlist = m3u8.load(m3u8_url)

In [4]:
type(playlist)

m3u8.model.M3U8

In [5]:
playlist

<m3u8.model.M3U8 at 0x1a1559cbed0>

In [6]:
# Segment entries parsed from the playlist.
segments = playlist.segments

In [7]:
segments

[<m3u8.model.Segment at 0x1a155a86c90>,
 <m3u8.model.Segment at 0x1a155a58610>,
 <m3u8.model.Segment at 0x1a155a586d0>,
 <m3u8.model.Segment at 0x1a155d68850>,
 <m3u8.model.Segment at 0x1a155d688d0>]

In [8]:
# Split the playlist URL into its components (scheme, netloc, path, query, ...).
parsed_uri = urlparse(m3u8_url)

In [9]:
parsed_uri

ParseResult(scheme='http', netloc='123.54.220.216:9901', path='/tsfile/live/1045_1.m3u8', params='', query='key=txiptv&playlive=1&authid=0', fragment='')

In [10]:
# Origin of the playlist (scheme://host:port); prefix for segment URIs that start with '/'.
base_uri = f"{parsed_uri.scheme}://{parsed_uri.netloc}"

In [11]:
base_uri

'http://123.54.220.216:9901'

In [12]:
len(segments)

5

In [13]:
# Pick the segment at index 1; the modulo keeps the index valid when the playlist is shorter.
segment = segments[1 % len(segments)]

In [14]:
segment

<m3u8.model.Segment at 0x1a155a58610>

In [15]:
segment

<m3u8.model.Segment at 0x1a155a58610>

In [16]:
# URI of the segment as written in the playlist; it may be relative.
segment_uri = segment.uri

In [17]:
segment_uri

'live_1045_1_40085613.ts?key=txiptv&key2=40085613'

In [18]:
segment_uri.startswith('http')

False

In [19]:
segment_uri.startswith('/')

False

In [20]:
# Directory of the playlist. urljoin with '.' drops the file name and the query
# string, so a '/' inside the query cannot be mistaken for a path separator.
base_url = urljoin(m3u8_url, '.')

In [21]:
base_url

'http://123.54.220.216:9901/tsfile/live/'

In [22]:
# Resolve the segment URI against the playlist directory to get an absolute URL.
segment_url = urljoin(base_url, segment_uri)

In [23]:
segment_url

'http://123.54.220.216:9901/tsfile/live/live_1045_1_40085613.ts?key=txiptv&key2=40085613'

In [24]:
# stream=True defers downloading the body until it is read (e.g. via response.content).
response = requests.get(segment_url, stream=True, timeout=5)

In [25]:
# Raise an exception if the server answered with a 4xx/5xx status.
response.raise_for_status()

In [26]:
response

<Response [200]>

In [34]:
print(response.headers)

{'Content-Length': '5421920', 'Accept-Ranges': 'bytes', 'Connection': 'keep-alive', 'Content-Type': 'application/octet-stream;charset=UTF-8', 'Date': 'Tue, 26 Aug 2025 06:31:18 GMT', 'Keep-Alive': 'timeout=60', 'Last-Modified': 'Tue, 26 Aug 2025 06:31:04 GMT', 'Proxy-Connection': 'keep-alive'}


In [28]:
# Size advertised by the server in the Content-Length header; 0 when the header is absent.
content_length = int(response.headers.get('content-length', 0))

In [29]:
content_length

5421920

In [31]:
# Number of bytes actually received for the body.
# NOTE(review): in the recorded run this is smaller than the advertised
# Content-Length - presumably the download ended early; confirm the cause.
len(response.content)

3727140

In [35]:
# Character count after requests decodes the body as text. The payload is
# presumably binary MPEG-TS data, so this is not a reliable size - use
# len(response.content) for byte counts.
len(response.text)

3543525

In [32]:
'content-encoding' in response.headers

False

In [36]:
def test_m3u8_speed(m3u8_url, test_count=3):
    """
    测试M3U8直播源的速度
    
    参数:
    m3u8_url: M3U8直播源的URL
    test_count: 测试次数，默认为3次
    
    返回:
    平均速度(KB/s)，如果测试失败则返回None
    """
    speeds = []
    
    try:
        # 加载M3U8文件
        playlist = m3u8.load(m3u8_url)
        segments = playlist.segments
        
        if not segments:
            print("错误: M3U8文件中没有找到视频段")
            return None
        
        # 构建基础URL（处理相对路径）
        parsed_uri = urlparse(m3u8_url)
        base_uri = f"{parsed_uri.scheme}://{parsed_uri.netloc}"
        
        for i in range(test_count):
            # 选择一个视频段（循环使用可用段）
            segment = segments[i % len(segments)]
            segment_uri = segment.uri
            
            # 处理相对URL
            if segment_uri.startswith('http'):
                segment_url = segment_uri
            elif segment_uri.startswith('/'):
                segment_url = base_uri + segment_uri
            else:
                # 对于相对路径，使用M3U8文件所在目录作为基础
                base_url = m3u8_url.rsplit('/', 1)[0] + '/'
                segment_url = urljoin(base_url, segment_uri)
            
            # 测试下载速度
            try:
                start_time = time.time()
                response = requests.get(segment_url, stream=True, timeout=5)
                # response.raise_for_status()
                if response.status_code != 200:
                    return None
                # 获取内容长度
                content_length = len(response.content)
                if content_length
                
                # 计算下载速度
                download_time = time.time() - start_time
                speed = content_length / download_time / 1024  # KB/s
                speeds.append(speed)
                
                print(f"测试 {i+1}: 速度 {speed:.2f} KB/s")
                
            except Exception as e:
                print(f"测试 {i+1} 失败: {str(e)}")
                continue
        
        if not speeds:
            print("所有测试都失败了")
            return None
        
        # 计算平均速度
        avg_speed = sum(speeds) / len(speeds)
        print(f"平均速度: {avg_speed:.2f} KB/s")
        return avg_speed
        
    except Exception as e:
        print(f"测试过程中发生错误: {str(e)}")
        return None

In [37]:
# Run the speed test with the default test_count of 3.
test_m3u8_speed(m3u8_url)

测试 1: 速度 500.31 KB/s
测试 2: 速度 453.54 KB/s
测试 3: 速度 467.73 KB/s
平均速度: 473.86 KB/s


473.86066434443387

In [38]:
segment.duration

5.0