<a href="https://colab.research.google.com/github/YeaAyuni/mze5/blob/main/ZEE5_Downloader.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Initializing

In [None]:
#@title connect drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#@title Install Requirements
# Show the ffmpeg build that is currently on the runtime (before the install below).
!ffmpeg -version

# ffmpeg v6.0
# NOTE(review): this downloads a script from a third-party repository's master
# branch and pipes it straight into bash — the content is not pinned or verified.
!bash <(curl -s https://raw.githubusercontent.com/XniceCraft/ffmpeg-colab/master/install)

# m3u8downloader package
# NOTE(review): both installs are unpinned, so the versions depend on when the
# cell is run. `m3u8` is imported by the utility cell; `m3u8downloader`
# presumably provides the `downloadm3u8` command used later — confirm.
!pip install m3u8downloader
!pip install m3u8

In [None]:
#@title Logger
from IPython.display import clear_output

class Logging():
  """Accumulating logger that colours messages with ANSI escape codes.

  Every message is stored, already wrapped in its escape sequence, in
  ``self.logs``. By default each new message clears the cell output and
  re-prints the whole history.
  """

  # Numeric codes placed inside the "\033[...m" escape sequence.
  COLOR_CODES = {
      "black": 30,
      "red": 31,
      "green": 32,
      "yellow": 33,
      "blue": 34,
      "purple": 35,
      "cyan": 36,
      "white": 37,
  }
  STYLE_CODES = {
      "no-effect": 0,
      "bold": 1,
  }
  BACKGROUND_CODES = {
      "black": 40,
      "red": 41,
      "green": 42,
      "yellow": 43,
      "blue": 44,
      "purple": 45,
      "cyan": 46,
      "white": 47,
  }

  # Fallback names used when a style entry is missing or unknown; they mirror
  # the keyword defaults of log().
  DEFAULT_COLOR = "white"
  DEFAULT_WEIGHT = "no-effect"
  DEFAULT_BACKGROUND = "black"

  def __init__(self):
    # Formatted messages, oldest first.
    self.logs = []

  @staticmethod
  def _lookup(table, name, fallback):
    """Return ``table[name]``, or ``table[fallback]`` when ``name`` is unknown or unhashable."""
    try:
      return table[name]
    except (KeyError, TypeError):
      return table[fallback]

  def format_log(self, style):
    """Build the ANSI escape prefix for a style description.

    Args:
      style: Mapping with optional ``"color"``, ``"weight"`` and ``"bg"`` keys
        holding names from the code tables above.

    Returns:
      A string of the form ``"\\033[<weight>;<color>;<background>m"``. Missing
      or unrecognised names fall back to white / no-effect / black instead of
      raising, so a bad style can never abort the code that is logging.
    """
    colorCode = self._lookup(self.COLOR_CODES, style.get("color"), self.DEFAULT_COLOR)
    styleCode = self._lookup(self.STYLE_CODES, style.get("weight"), self.DEFAULT_WEIGHT)
    backgroundCode = self._lookup(self.BACKGROUND_CODES, style.get("bg"), self.DEFAULT_BACKGROUND)

    return f"\033[{styleCode};{colorCode};{backgroundCode}m"

  def log(self, message, color="white", weight="no-effect", background_color="black", re_print=True):
    """Store ``message`` with the given style and optionally redraw all logs.

    Args:
      message: Anything that can be formatted into a string.
      color: Foreground colour name.
      weight: ``"no-effect"`` or ``"bold"``.
      background_color: Background colour name.
      re_print: When true, clear the cell output and print every stored log.
    """
    style = {"color": color, "weight": weight, "bg": background_color}

    prefix = self.format_log(style=style)
    # "\033[0m" resets the terminal style after the message.
    self.logs.append(f"{prefix}{message}\033[0m")

    if re_print:
      clear_output()
      self.printLogs()

  def warn(self, message, color="yellow", weight="no-effect", background_color="black", re_print=True):
    """Same as log(), with yellow as the default colour."""
    self.log(message, color=color, weight=weight, background_color=background_color, re_print=re_print)

  def error(self, message, color="red", weight="no-effect", background_color="black", re_print=True):
    """Same as log(), with red as the default colour."""
    self.log(message, color=color, weight=weight, background_color=background_color, re_print=re_print)

  def printLogs(self):
    """Print every stored log line in insertion order."""
    for log in self.logs:
      print(log)

In [None]:
#@title Initialize Utility Functions
import os
import re
import m3u8
import shutil
import requests
import math
import subprocess

from pathlib import Path
from threading import Thread

USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"


def do_wget(url, output, *args):
    """Download ``url`` to ``output`` with the ``wget`` command-line tool.

    Args:
        url: Address to fetch.
        output: Target file path, passed to ``wget -O``; relative paths are
            resolved against the current working directory.
        *args: Extra command-line arguments inserted before ``-O``.

    Returns:
        The absolute path of the downloaded file.

    Raises:
        Exception: If wget exits with a non-zero status or the target file
            does not exist afterwards.
    """
    command = ["wget", *args, "-O", output, url]
    completed = subprocess.run(command)

    result_output_path = os.path.join(os.getcwd(), output)

    # Checking only for the file is not enough: a file that already existed at
    # the target path (or an empty one left behind by a failed transfer) would
    # be reported as a successful download. The exit status is authoritative.
    if completed.returncode == 0 and os.path.exists(result_output_path):
        return result_output_path

    raise Exception("Failed To Download File")


def match_string(string, pattern, catch_err=True):
    """Return ``string`` itself when ``pattern`` is found anywhere inside it.

    Args:
        string: Text to search.
        pattern: Regular expression handed to ``re.search``.
        catch_err: When true (the default) a failed search raises; when false
            the function returns ``None`` instead.

    Raises:
        Exception: If nothing matches and ``catch_err`` is true.
    """
    if re.search(pattern, string) is not None:
        return string

    if not catch_err:
        return None

    raise Exception("No Pattern Match with string: ", string)


def write_from_response_content(url, output_path="/content", filename="__response_temp.txt"):
  """
  Download ``url`` into ``output_path``/``filename`` and return that file path.

  The download is delegated to do_wget with the module-level USER_AGENT.
  Side effect: the process working directory is changed to ``output_path``.
  """
  os.chdir(output_path)

  destination = os.path.join(output_path, filename)
  do_wget(url, destination, "--user-agent", USER_AGENT)

  return destination




### Initialize ZEE5

In [None]:
#@title #### ZEE5 API
import requests

class Zee5():
  """Client for the ZEE5 content API that lists the episode page URLs of one TV show.

  Args:
    details_page_url: URL of the show's details page; its last path segment
      is used as the show id.
    input_page: Page number requested from the episode listing.
    input_limit: Number of episodes per page.
    options: Dict with ``"headers"`` and ``"payload"`` entries that are sent
      with every request.
    logger: Object exposing ``log(message)`` and ``error(message)``.

  Note: constructing an instance performs a network request (get_metadata).
  """

  def __init__(self, details_page_url, input_page, input_limit, options, logger):
    self.logger = logger

    def get_id():
      # Drop any query string / fragment and trailing slash first, otherwise
      # the "last path segment" would be empty or "?utm=..." instead of the id.
      path = details_page_url.split("?")[0].split("#")[0].rstrip("/")
      return path.split("/")[-1]

    self.page_count = input_page
    self.limit = input_limit
    self.options = options
    self.host = "https://www.zee5.com/global"
    self.id = get_id()
    self.metadata = self.get_metadata()

  def is_valid_page_count(self):
    """Return True when the requested page exists for every season.

    Logs the maximum page count of each season; on the first season for
    which ``self.page_count`` is too large, logs two error lines and returns
    False.
    """
    seasons = self.metadata["seasons"]
    for season in seasons:
      max_page_count = self.get_pages(total_episode=season['total_episodes'], limit=self.limit)

      self.logger.log(f"Max Page Count: {max_page_count}")

      if self.page_count > max_page_count:
        self.logger.error("Too Much Page, decrease page.")
        self.logger.error("Maximum Page Count: %d" % max_page_count)

        return False
    return True

  def get_pages(self, total_episode, limit):
    """Return how many pages of ``limit`` items are needed for ``total_episode`` items.

    Raises:
      ValueError: If ``limit`` is not positive (previously a ZeroDivisionError
        for 0 and a meaningless result for negative values).
    """
    if limit <= 0:
      raise ValueError("limit must be a positive number")

    # Ceiling division without going through floats.
    return -(-total_episode // limit)

  def get_metadata(self):
    """Fetch the show record and return ``{"seasons": [...]}``.

    Each season entry keeps the ``id``, ``title`` and ``total_episodes``
    fields of the API response.

    Raises:
      requests.HTTPError: If the API answers with an error status.
    """
    params = {
        "country": "id",
        "asset_subtype": "tvshow",
        "translation": "en",
        "page": "1",
        "limit": "1"
    }
    url = f"https://gwapi.zee5.com/content/tvshow/{self.id}"

    response = requests.request(
        method="GET",
        url=url,
        params=params,
        headers=self.options['headers'],
        data=self.options['payload']
    )

    self.logger.log(f"Metadata API_URL: {response.url}")
    # Surface HTTP failures (for example a rejected access token) as such,
    # instead of an opaque JSON decoding error or KeyError further down.
    response.raise_for_status()
    json_data = response.json()

    seasons = [
        {
            "id": season["id"],
            "title": season["title"],
            "total_episodes": season["total_episodes"],
        }
        for season in json_data['seasons']
    ]

    return {
        "seasons": seasons
    }

  def get_episodes(self):
    """Return the web URLs of the episodes on the configured page, for every season.

    Returns:
      A list of ``"<host>/<web_url>"`` strings in API order. The list is
      empty when the requested page does not exist; callers reverse and
      iterate the result, so ``None`` must never be returned here.

    Raises:
      requests.HTTPError: If the API answers with an error status.
    """
    if not self.is_valid_page_count():
      return []

    return_data = []

    url = "https://gwapi.zee5.com/content/tvshow/"

    for season in self.metadata["seasons"]:
      self.logger.log(f"=================== \nProcessing {season['title']}")

      params = {
          "season_id": season["id"],
          "page": self.page_count,
          "limit": self.limit,
          "on_air": False,
          "country": "id",
          "asset_subtype": "tvshow",
          "translation": "en"
      }

      response = requests.request(
          method="GET",
          url=url,
          params=params,
          headers=self.options['headers'],
          data=self.options['payload']
      )

      self.logger.log(f"Episodes API_URL: {response.url}")
      response.raise_for_status()

      json_data = response.json()

      for episode in json_data['episode']:
        return_data.append(f"{self.host}/{episode['web_url']}")

    return return_data



In [None]:
#@title ### ZEE5 WORKER

class Zee5_M3U8_Worker():
    """Resolve the video (and optional audio) stream URLs from a ZEE5 master playlist."""

    def __init__(self, url):
        """
        URL: Zee5 Stream URL
        """
        # Tried in order: everything up to the last ".mp4" before the query
        # string, otherwise everything up to the last "/" before it.
        self.zee5_patterns = [r"https://[^?]+\.mp4", r"https://[^?]+/"]
        self.zee5_basepath = self.get_zee5_basepath(url)
        self.url = url

    def get_zee5_basepath(self, url):
        """
        Return Basepath, only for Zee5

        Raises:
            Exception: If none of ``self.zee5_patterns`` matches ``url``.
        """
        for pattern in self.zee5_patterns:
            found = re.search(pattern, url)

            if found:
                return found.group()

        raise Exception("No URL matched with zee5_patterns")

    def get_playlist(self):
        """Download ``self.url`` to a temporary file and parse it with ``m3u8.load``.

        The temporary file is removed even when parsing fails.
        """
        temp_filename = "__temp_playlist_media.txt"
        temp_m3u8_file_path = write_from_response_content(url=self.url, filename=temp_filename)

        try:
            return m3u8.load(temp_m3u8_file_path)
        finally:
            os.remove(temp_m3u8_file_path)

    def get_media(self, playlist, resolution=720):
        """Pick the variant whose vertical resolution equals ``resolution``.

        Args:
            playlist: Parsed master playlist exposing ``playlists`` (variants
                with ``stream_info.resolution`` and ``uri``) and ``media``.
            resolution: Wanted height in pixels.

        Returns:
            Dict with ``"video"``, ``"resolution"`` and ``"is_separated_media"``;
            when the playlist has media entries, ``"audio"`` is also set from
            the first media URI and ``"is_separated_media"`` is True.

        Raises:
            Exception: If no variant has the requested resolution.
        """
        media = {}
        media['is_separated_media'] = True

        # video url must be always exist
        for variant in playlist.playlists:
            variant_resolution = variant.stream_info.resolution

            # Variants without a RESOLUTION attribute (e.g. audio-only
            # renditions) carry None here; skip them instead of crashing.
            if not variant_resolution:
                continue

            playlist_resolution = variant_resolution[1]

            if playlist_resolution == resolution:
                media["resolution"] = playlist_resolution
                media['video'] = f"{self.zee5_basepath}/{variant.uri}"
                break

        if "video" not in media:
            raise Exception("No Video Found on media")

        # for audio, if not exist just return
        if not playlist.media:
            media['is_separated_media'] = False
            return media

        audio = playlist.media.uri[0]
        media['audio'] = f"{self.zee5_basepath}/{audio}"

        return media

    def get_media_from_playlist(self, resolution=720):
        """Fetch the playlist for ``self.url`` and return get_media()'s result for it."""
        playlist = self.get_playlist()
        media = self.get_media(playlist=playlist, resolution=resolution)

        return media

## Batch Mode

### Getting Data

In [None]:
#@title Find Page for Episode You Want
import math
def get_the_episode():
  """Print the listing page that contains a given episode.

  The three values below are Colab form fields. The page is computed as
  ceil((max_page - eps_you_want + 1) / limit), i.e. pages are counted from
  the highest episode number downwards. Nothing is returned; the result is
  printed.
  """
  eps_you_want  = 187 #@param {'type':'number'}
  # NOTE(review): used as the highest episode number in the formula below —
  # presumably the max page count reported when limit is 1; confirm.
  max_page  = 266 #@param {'type':'number'}
  limit  = 10 #@param {'type':'number'}

  # A cleared form field is assumed to arrive as None — TODO confirm against Colab.
  if None in [eps_you_want, max_page, limit]:
    print("fill all input!")
    return

  page = math.ceil((max_page - eps_you_want + 1) / limit)

  print("The Page Is:", page)

get_the_episode()

The Page Is: 8


In [None]:
#@title Get Episode Page Lists
#@markdown Page counting starts from the last episode. For example:
#@markdown > - episode 123 with limit 1 and max page 266 is on page 144
#@markdown > - episode 1 with limit 1 and max page 266 is on page 266

#@markdown > The formula for the page is ```(max_page - eps_you_want + 1) / limit```, rounded up (for example, a result of 18.1 becomes 19).
#@markdown > <br> To find max_page, run this cell once with page = 1.


episode_Logs = Logging()

def options():
    """Build the request payload and headers used for the ZEE5 API calls.

    The access token and cookie can be supplied through the environment
    variables ``ZEE5_ACCESS_TOKEN`` and ``ZEE5_COOKIE``; when they are unset
    or empty, the values embedded below are used. This lets a refreshed
    token be supplied without editing the notebook.

    Returns:
        ``{"payload": {}, "headers": {...}}`` where the headers contain
        ``x-access-token``, ``User-Agent`` and ``Cookie``.
    """
    import os  # local import so this cell does not depend on another cell's imports

    # NOTE(review): credentials committed to a shared notebook; prefer the
    # environment variables above and rotate these values.
    default_token = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwbGF0Zm9ybV9jb2RlIjoiV2ViQCQhdDM4NzEyIiwiaXNzdWVkQXQiOiIyMDIzLTA4LTA0VDIyOjU3OjQxLjU5MloiLCJwcm9kdWN0X2NvZGUiOiJ6ZWU1QDk3NSIsInR0bCI6ODY0MDAwMDAsImlhdCI6MTY5MTE4OTg2MX0.R93CAXbM2AhKRbqgNyScnrYhF_B5gcdA1xuUkUMRdm4'
    default_cookie = 'AWSALB=7/qAx2L0iz6mrxWThQJ/J7YFyBxlQV8LGYoZPv/SII+HwCm6CncSEyxWOf4Lw2dadaRMtPR1RpCFVpjKYADht/f4E9aV5SrKvnFmOHTOunZd8N54KHJrwVVGI9ck; AWSALBCORS=7/qAx2L0iz6mrxWThQJ/J7YFyBxlQV8LGYoZPv/SII+HwCm6CncSEyxWOf4Lw2dadaRMtPR1RpCFVpjKYADht/f4E9aV5SrKvnFmOHTOunZd8N54KHJrwVVGI9ck'

    payload = {}

    headers = {
    'x-access-token': os.environ.get("ZEE5_ACCESS_TOKEN") or default_token,
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
    'Cookie': os.environ.get("ZEE5_COOKIE") or default_cookie
    }

    return {
        "payload": payload,
        "headers": headers
    }


def zee5_episode_list():
    """Return the episode page URLs for the show configured in the form fields.

    Builds a Zee5 client from the form values and options(), logging to the
    module-level episode_Logs, and returns whatever Zee5.get_episodes()
    returns for the selected page.
    """
    show_url = "https://www.zee5.com/global/tv-shows/details/jodha-akbar/0-6-1516" #@param {type:"string"}

    limit = 10 # @param {type:"slider", min:1, max:100, step:1}
    page = 7 #@param {type:"number"}

    # Form field only; the value is not read anywhere in this function.
    show_type = "tv-show" # @param ["tv-show"]

    option = options()
    # Constructing Zee5 already performs the metadata request.
    zee5_data = Zee5(details_page_url=show_url,input_page=page, input_limit=limit, options=option, logger=episode_Logs)
    episodes = zee5_data.get_episodes()

    return episodes

# Preview: fetch the episode URLs for the configured page and print them.
zee5_episodes = zee5_episode_list()
# The list is reversed in place before printing; start_main() applies the same
# reversal before downloading.
# NOTE(review): this line assumes zee5_episode_list() returned a list.
zee5_episodes.reverse()

episode_Logs.log("Episode Page List:")
for episode in zee5_episodes:
  episode_Logs.log(f" -> {episode}", color="green")


In [None]:
#@title Start Grab Video From Zee5

#@markdown ---
# Directory name used under /content/M3U8/ for this show's downloads.
folder_name = "Jodha_Akbar" #@param {type:'string'}
# Prefix of the final file names: "<output_codename>_<episode id>.mp4".
output_codename = "JA" #@param {type:'string'}

# Base URL queried by get_zee5_media_url() to resolve an episode page into a
# stream URL. NOTE(review): this is a temporary-looking tunnel address and
# presumably has to be replaced on every session — confirm.
selenium_server = "https://26f7-114-5-111-242.ngrok-free.app" #@param {type:"string"}
# Sent to selenium_server as the "timeout" query parameter.
timeout = 30 # @param {type:"slider", min:25, max:50, step:1}
# Vertical resolution that download_m3u8() asks the playlist worker for.
resolution = 576 # @param ["144", "240", "360", "480", "576", "720", "1080"] {type:"raw"}


# Logger shared by the batch download functions below.
main_logs = Logging()


def get_m3u8(output, url):
  """Run the ``downloadm3u8`` command to save the stream at ``url`` to ``output``.

  This is an IPython shell magic: ``$USER_AGENT``, ``$output`` and ``$url`` are
  expanded from Python variables before the line is handed to the shell.
  """
  # NOTE(review): the values are interpolated into a shell command line; a URL
  # containing shell metacharacters would be interpreted by the shell.
  !downloadm3u8 --user-agent "$USER_AGENT" -o "$output" "$url"


def combine_media(audio, video, output="output.mp4"):
  """Mux ``video`` and ``audio`` into ``output`` with ffmpeg, copying the streams (``-c copy``).

  The same function is defined a second time in the "Single Mode" section of
  this notebook; whichever cell ran last wins.
  """
  # NOTE(review): no overwrite flag is passed, so ffmpeg's behaviour when
  # ``output`` already exists depends on its interactive prompt — confirm.
  !ffmpeg -i "$video" -i "$audio" -c copy "$output"


def zee5_m3u8_download_thread(id, media, output_dir):
    """Download the streams described by ``media`` into ``output_dir``.

    When the media has no separate audio track, the video stream is written
    straight to ``<output_codename>_<id>.mp4``. Otherwise audio and video are
    fetched concurrently into ``audio.mp3`` and ``video.mp4`` and both
    downloads are awaited.

    Returns:
        The ``media`` dict that was passed in, unchanged.
    """
    result_output_path = f"{output_dir}/{output_codename}_{id}.mp4"

    separated = media['is_separated_media']
    video_url = media['video']

    # Single muxed stream: nothing to combine later, download to the final name.
    if not separated:
      get_m3u8(output=result_output_path, url=video_url)
      return media

    video_job = {"output": f"{output_dir}/video.mp4", "url": video_url}
    audio_job = {"output": f"{output_dir}/audio.mp3", "url": media['audio']}

    # Audio is started (and joined) before video, one thread per stream.
    workers = [
        Thread(target=get_m3u8, kwargs=audio_job),
        Thread(target=get_m3u8, kwargs=video_job),
    ]

    for worker in workers:
      worker.start()

    for worker in workers:
      worker.join()

    return media


def download_m3u8(index_url, id, output_dir, name):
  """Resolve the streams behind ``index_url`` and download them into ``output_dir``.

  The wanted resolution comes from the module-level ``resolution`` setting.
  ``name`` is accepted but not used.

  Returns:
    The media dict returned by zee5_m3u8_download_thread().

  Raises:
    Exception: If the resolved media has no ``"video"`` entry.
  """
  worker = Zee5_M3U8_Worker(url=index_url)
  media = worker.get_media_from_playlist(resolution=resolution)

  if "video" not in media:
    raise Exception("No Video Found in Media")

  return zee5_m3u8_download_thread(id, media, output_dir)


def verify_path(path, chdir=True):
  """Make ``path`` a fresh, empty directory and optionally switch into it.

  Anything that already exists at ``path`` is deleted recursively first.
  Missing parent directories are created. With ``chdir`` true (the default)
  the process working directory is changed to ``path``.
  """
  target = Path(path)

  already_present = target.exists()
  if already_present:
    shutil.rmtree(target)

  target.mkdir(parents=True)

  if not chdir:
    return

  os.chdir(target)


def validate_page(url):
  """Request ``url`` with the notebook's USER_AGENT and fail on an error status.

  Raises:
    Exception: If the response status code is 400 or higher.
  """
  response = requests.get(url, headers={'User-agent': USER_AGENT})

  request_failed = response.status_code >= 400
  if request_failed:
    raise Exception("Failed to validate page, the url might expired")


def rename_file(dir):
  """Normalise downloaded file names in ``dir`` and locate the video and audio files.

  Entries whose name contains more than one dot lose their last suffix
  (``video.mp4.mp4`` becomes ``video.mp4``). Afterwards the directory is
  scanned for one ``.mp4`` and one ``.mp3`` file.

  Args:
    dir: Directory containing the downloaded media.

  Returns:
    ``(video, audio)`` as ``Path`` objects.

  Raises:
    FileNotFoundError: If no ``.mp4`` or no ``.mp3`` file is present
      (previously this surfaced as an UnboundLocalError).
  """
  folder = Path(dir)

  # Take a snapshot of the listing: entries are renamed inside the loop and
  # the live directory iterator must not be mutated while it is consumed.
  for media in list(folder.iterdir()):
    if media.name.count('.') > 1:
      media.rename(f'{media.parent}/{media.stem}')

  video = None
  audio = None
  for media in folder.iterdir():
    if media.is_dir():
      continue
    if media.suffix == ".mp3":
      audio = media
    if media.suffix == ".mp4":
      video = media

  missing = [label for label, found in (("video (.mp4)", video), ("audio (.mp3)", audio)) if found is None]
  if missing:
    raise FileNotFoundError(f"No {' and no '.join(missing)} file found in {folder}")

  return video, audio


def validate_downloaded_file(media, dir, eps_id):
  """Combine separately downloaded audio and video into the final episode file.

  Does nothing when ``media['is_separated_media']`` is false. Otherwise the
  files in ``dir`` are normalised with rename_file() and muxed into
  ``<dir>/<output_codename>_<eps_id>.mp4`` via combine_media().
  """
  if not media['is_separated_media']:
    return

  video, audio = rename_file(dir=dir)

  result_output_path = os.path.join(dir, f"{output_codename}_{eps_id}.mp4")

  combine_media(audio=audio, video=video, output=result_output_path)


def get_zee5_media_url(episode_web_url):
    """Ask the configured ``selenium_server`` for the stream URL of an episode page.

    The episode URL and the module-level ``timeout`` are sent as the query
    parameters ``zee5_url`` and ``timeout``.

    Returns:
        The first element of the ``"data"`` list in the JSON response, with
        surrounding whitespace stripped.
    """
    main_logs.log("Fetching MediaUrl...") # logs

    response = requests.get(
        selenium_server,
        params={"zee5_url": episode_web_url, "timeout": timeout},
    )

    first_entry = response.json()['data'][0]

    return first_entry.strip()


def start_main():
  """Download every episode of the configured page, one after another.

  For each episode URL: derive the name and id, skip it when its output
  directory already contains files, otherwise recreate the directory, resolve
  the stream URL, download the media and combine audio/video when needed.
  Failures of a single episode are logged and do not stop the loop.
  """
  zee5_episodes = zee5_episode_list()   # Get Episodes
  # Process the list in the opposite order to the one the API returned.
  zee5_episodes.reverse()

  for episode_web_url in zee5_episodes:

    # NOTE(review): relies on the episode URL having its name as the 9th
    # "/"-separated segment and the id after the name's last "-" — confirm
    # against the web_url values returned by the API.
    name = episode_web_url.split("/")[8]
    id = name.split("-")[-1]
    output_dir = Path(f'/content/M3U8/{folder_name}/episodes/{id}')

    main_logs.log(f"""======================================
Proccessing Media: {name}
- ID: {id}
- Directory: {output_dir}
))>> Logs: \033[32m""") # logs

    # verify folder_path
    # Any non-empty directory counts as "already downloaded", including one
    # left behind by a partially failed earlier run.
    if os.path.exists(output_dir) and os.listdir(output_dir):
      main_logs.warn(f" -> already exists | {name}")
      continue

    # Recreates the directory from scratch and changes the working directory into it.
    verify_path(output_dir)

    try:
        # Get URL
        zee5_media_url = get_zee5_media_url(episode_web_url)
        validate_page(zee5_media_url)

        # Download from URL
        media = download_m3u8(index_url=zee5_media_url, id=id, output_dir=output_dir, name=name)
        validate_downloaded_file(media=media, dir=output_dir, eps_id=id)

        main_logs.log(f"Finished Proccesing :{id} \n") # logs

    except Exception as err:
      # Best effort: record the failure and continue with the next episode.
      main_logs.error(err)


if __name__ == "__main__":
    start_main()


In [None]:
from google.colab import files
# Send finished episodes to the browser via google.colab `files.download`,
# but only those with an id above this threshold.
last_downloaded_video = 196
for eps in os.listdir('/content/M3U8/Jodha_Akbar/episodes'):
  for media in os.listdir(f'/content/M3U8/Jodha_Akbar/episodes/{eps}'):
     # Only final files ("JA_<id>.<ext>") are considered; the int() below
     # assumes the part after "JA_" is purely numeric.
     if media.startswith("JA_"):
        eps_id  = int(Path(media).stem.split("JA_")[1])
        if eps_id > last_downloaded_video:
          print(eps_id)
          files.download(f"/content/M3U8/Jodha_Akbar/episodes/{eps}/{media}")

### Handle Drive Folder

In [None]:
#@title show folder size
folder_path = "/content/drive/MyDrive/Pictures" #@param {type:"string"}
!du -h -m "$folder_path"

In [None]:
def process_file_in_drive(path_files_in_drive = "/content/drive/MyDrive/Telegram/Jodha_Akbar/1080p"):
  """Rename the ``JA*`` files found in ``path_files_in_drive`` and move them.

  Every non-directory entry whose stem starts with ``"JA"`` gets ``"JA_"``
  replaced by ``"JA-Episode-"`` in its stem and is moved to the fixed folder
  ``/content/drive/MyDrive/Telegram/Jodha_Akbar/``. Each new path is printed.
  """
  for entry in Path(path_files_in_drive).iterdir():
    wanted = (not entry.is_dir()) and entry.stem.startswith("JA")
    if not wanted:
      continue

    new_stem = entry.stem.replace("JA_", "JA-Episode-")

    # The destination folder is fixed and does not depend on the argument.
    outname = f"/content/drive/MyDrive/Telegram/Jodha_Akbar/{new_stem}{entry.suffix}"
    entry.rename(outname)
    print(outname)

process_file_in_drive()



In [None]:
#@title move from processing dir to target
# Move every file whose name starts with "JA" from the per-episode working
# directories to the Drive folder, printing each moved file name.
# NOTE(review): the prefix also matches intermediate files if any are named
# "JA..."; the batch downloader names only the final file that way.
for eps in os.listdir('/content/M3U8/Jodha_Akbar/episodes'):
  for media in os.listdir(f'/content/M3U8/Jodha_Akbar/episodes/{eps}'):
    if media.startswith("JA"):
      source = f'/content/M3U8/Jodha_Akbar/episodes/{eps}/{media}'
      destination = f"/content/drive/MyDrive/Telegram/Jodha_Akbar/{media}"
      shutil.move(src=source, dst=destination)
      print(media)

In [None]:
# Same move as the previous cell, but for files sitting directly in
# /content/media instead of the per-episode directories.
for media in os.listdir(f'/content/media'):
  if media.startswith("JA"):
    source = f'/content/media/{media}'
    destination = f"/content/drive/MyDrive/Telegram/Jodha_Akbar/{media}"
    shutil.move(src=source, dst=destination)
    print(media)

## Single Mode

In [None]:
output_name = "JA_185.mp4"

def combine_media(audio, video, output="output.mp4"):
  """Mux ``video`` and ``audio`` into ``output`` with ffmpeg, copying the streams (``-c copy``).

  This repeats the definition from the batch-mode cell so that the
  "Single Mode" section can be run on its own; running this cell rebinds the
  name for the whole notebook.
  """
  !ffmpeg -i "$video" -i "$audio" -c copy "$output"

combine_media(audio="/content/audio.mp3", video="/content/video.mp4", output=output_name)

In [None]:
video_url = "https://zee5vodnd.akamaized.net/hls1/GLOBAL_CONTENT/PROGRAMS/LIBRARY/ZEE_BIOSKOPE/JODHA_AKBAR_BAHASA_EP_181_TO_210_19112019/JODHA_AKBAR_INDONESIAN_BAHASA_EP185_id_7180564a.mp4/media-4/hdntl=exp=1691570705~acl=%2f*~data=hdntl~hmac=1fe674f26f3f0207fb3a4224268e17e88818c125c9fdd345067ede1485dd916b/stream.m3u8?aka_me_session_id=AAAAAAAAAAARUtNkAAAAAPgU0ke+B7nN2C9nKvKsqGKYH6hx2QNY7Hzdr2ab9A%2fzxIjOmKgprYiSGfnoCp3aiVUIqVe1XPmY&aka_media_format_type=hls"
!downloadm3u8 --user-agent "$USER_AGENT" -o "video.mp4" "$video_url"

In [None]:
audio_url = "https://zee5vodnd.akamaized.net/hls1/GLOBAL_CONTENT/PROGRAMS/LIBRARY/ZEE_BIOSKOPE/JODHA_AKBAR_BAHASA_EP_181_TO_210_19112019/JODHA_AKBAR_INDONESIAN_BAHASA_EP185_id_7180564a.mp4/audio/aac/id/hdntl=exp=1691570705~acl=%2f*~data=hdntl~hmac=1fe674f26f3f0207fb3a4224268e17e88818c125c9fdd345067ede1485dd916b/stream.m3u8?aka_me_session_id=AAAAAAAAAAARUtNkAAAAAPgU0ke+B7nN2C9nKvKsqGKYH6hx2QNY7Hzdr2ab9A%2fzxIjOmKgprYiSGfnoCp3aiVUIqVe1XPmY&aka_media_format_type=hls"
!downloadm3u8 --user-agent "$USER_AGENT" -o "audio.mp3" "$audio_url"