In [None]:
# Download the gzipped IMDb TSV dumps into the working directory; the `files`
# mapping further down refers to them by these file names.
!wget https://datasets.imdbws.com/name.basics.tsv.gz
!wget https://datasets.imdbws.com/title.akas.tsv.gz
!wget https://datasets.imdbws.com/title.basics.tsv.gz
!wget https://datasets.imdbws.com/title.crew.tsv.gz
!wget https://datasets.imdbws.com/title.episode.tsv.gz
!wget https://datasets.imdbws.com/title.principals.tsv.gz
!wget https://datasets.imdbws.com/title.ratings.tsv.gz

In [None]:
import pandas as pd
import gzip

## helper functions

In [None]:
from typing import Generator, List, Tuple, Dict, Set

def iterate_compressed_file(file_handle: str, file_operation: str = "rt") -> Generator[str, None, None]:
  """Lazily yield the lines of a gzip-compressed file, one at a time.

  Args:
    file_handle: path of the ``.gz`` file to read.
    file_operation: mode handed to ``gzip.open``; the default ``"rt"``
      yields decoded text lines (newline included).

  The file stays open until the generator is exhausted or closed.
  """
  with gzip.open(file_handle, file_operation) as compressed:
    current = compressed.readline()
    # readline() returns an empty (falsy) string/bytes only at end of file.
    while current:
      yield current
      current = compressed.readline()

def preprocess_line(line: str) -> List[str]:
  """Split one raw TSV line from an IMDb dump into its fields.

  Newline characters are removed, the line is split on tabs, and every field
  that is a null marker (IMDb's ``\\N``, or ``None``) is normalised to the
  string ``"null"``. Only whole fields are replaced: text that merely
  contains these markers (e.g. the title "Nonetheless") is kept intact.
  Replacing substrings instead would silently corrupt titles and names.
  """
  return [
      "null" if field in ("\\N", "None") else field
      for field in line.replace("\n", "").split("\t")
  ]

def transform_id(id: str) -> int:
  return int(id.replace("nm", "").replace("tt", ""))

def extract_header(gen: Generator[str, None, None]) -> Tuple[List[str], Generator[str, None, None]]:
  """Split a stream of raw TSV lines into its parsed header and the remaining lines.

  Args:
    gen: iterable of raw TSV lines, e.g. from ``iterate_compressed_file``.

  Returns:
    ``(header, rest)``: ``header`` is the first line parsed with
    ``preprocess_line``; ``rest`` lazily yields the remaining raw lines.

  Raises:
    ValueError: if ``gen`` yields no lines at all.
  """
  # iter() so that a re-iterable input (e.g. a list) does not replay the header in `rest`.
  lines = iter(gen)
  first_line = next(lines, None)
  if first_line is None:
    raise ValueError("cannot extract a header from an empty input")
  return (preprocess_line(first_line), (line for line in lines))

def preprocess_joined_line(line: str) -> List[str]:
  """Split several concatenated raw TSV lines into one flat list of fields.

  Newlines between the joined lines are treated as field separators, leading
  and trailing tabs are stripped, and fields that are null markers (``\\N``
  or ``None``) become ``"null"``. Only whole fields are replaced, so text that
  merely contains a marker (e.g. "Nonesuch") is preserved.
  """
  fields = line.replace("\n", "\t").strip("\t").split("\t")
  return ["null" if field in ("\\N", "None") else field for field in fields]

def zipped_generator(generators: List[Generator[str,None,None]]) -> Generator[List[str],None,None]:
  """
  Iterate several line generators in lock-step and yield their parsed fields side by side.

  Each yielded row is the concatenation of ``preprocess_line`` applied to the
  next line of every generator. Once a generator is exhausted it contributes
  ``""`` placeholders, as many as the number of fields it produced on the
  first row (zero if it was empty from the start), so rows keep a stable
  width. Iteration stops as soon as every generator is exhausted, and no
  all-placeholder row is emitted at the end.

  Args:
    generators: iterables of raw TSV lines (anything accepted by ``iter``).

  Yields:
    One list of fields per step.
  """
  iterators = [iter(generator) for generator in generators]
  # Field count each source produced on the first row; used for padding once it runs dry.
  expected_output_lengths = [0] * len(iterators)
  is_first_row = True

  while True:
    fields: List[str] = []
    any_generator_active = False
    for generator_index, iterator in enumerate(iterators):
      partial_line = next(iterator, None)
      if partial_line is None:
        fields.extend([""] * expected_output_lengths[generator_index])
        continue
      any_generator_active = True
      processed_line = preprocess_line(partial_line)
      if is_first_row:
        expected_output_lengths[generator_index] = len(processed_line)
      fields.extend(processed_line)

    # Stop only when *all* sources are exhausted (also covers an empty list of generators).
    if not any_generator_active:
      return
    is_first_row = False
    yield fields

def synched_iterator(generators: Tuple[Generator[str, None, None], Generator[str, None, None]], field_indices_to_sync: Tuple[int, int], fill_value: object = "null") -> Generator[List[object], None, None]:
  """Left-join two streams of raw TSV lines on one field each.

  Every line of the first (primary) stream is yielded exactly once, parsed
  with ``preprocess_line``. If the *current* line of the second stream has
  the same value in its sync field, that line's fields are appended and the
  second stream advances. Otherwise the row is padded with ``fill_value``,
  once per field of the second stream. The second stream is never rewound,
  so rows only pair up when both streams list their keys in the same order
  and every key of the second stream also occurs in the first (presumably
  true for title.basics / title.ratings; TODO confirm).

  Args:
    generators: ``(a, b)``, the primary and secondary line iterables.
    field_indices_to_sync: ``(a_i, b_i)``, the index of the join field in each stream.
    fill_value: placeholder for missing secondary fields; defaults to the
      ``"null"`` marker used for missing values elsewhere in this notebook.

  Yields:
    Lists of primary fields followed by secondary fields (or padding).
  """
  a, b = generators
  a_i, b_i = field_indices_to_sync
  secondary = iter(b)

  pending = None          # parsed secondary row still waiting for its match
  secondary_done = False
  padding_width = 0       # field count of the last secondary row seen

  for x in a:
    x_fields = preprocess_line(x)
    if pending is None and not secondary_done:
      # next(..., None) rather than a bare next(): a StopIteration escaping a
      # generator is turned into RuntimeError (PEP 479) once `b` runs out.
      y = next(secondary, None)
      if y is None:
        secondary_done = True
      else:
        pending = preprocess_line(y)
        padding_width = len(pending)
    if pending is not None and pending[b_i] == x_fields[a_i]:
      yield [*x_fields, *pending]
      pending = None
    else:
      yield [*x_fields, *([fill_value] * padding_width)]

from typing import Iterable


def flatten(items):
    """Recursively yield the leaf elements of an arbitrarily nested iterable.

    Strings and bytes count as leaves (they are not split into characters);
    every other ``Iterable`` is descended into depth-first, in order.
    """
    for element in items:
        is_nested = isinstance(element, Iterable) and not isinstance(element, (str, bytes))
        if not is_nested:
            yield element
            continue
        yield from flatten(element)

def set_to_dict(s: Set) -> Dict[str, int]:
  """Number the elements of ``s`` in iteration order, returning ``{element: position}``.

  Any iterable works. If an element occurs more than once (possible when a
  list is passed), it keeps the position of its last occurrence.
  """
  numbering = {}
  for position, value in enumerate(s):
    numbering[value] = position
  return numbering



## table preprocessing

In [None]:
def persist_to_tsv(file_name: str, data: Iterable[Iterable[object]], directory: str = "tables") -> None:
  """Write column-oriented data to the TSV file ``<directory>/<file_name>.tsv``.

  Args:
    file_name: base name of the output file, without extension.
    data: one iterable per column (lists, dict views, ...). Row ``k`` of the
      file holds the ``k``-th value of every column, each converted with
      ``str``. As with ``zip``, output stops at the shortest column.
    directory: existing output directory; defaults to ``"tables"``.
  """
  with open(f"{directory}/{file_name}.tsv", "w", encoding="utf8") as f:
    for fields in zip(*data):
      f.write("\t".join(str(field) for field in fields) + "\n")

def extract_n_columns():
  """Collect the distinct values of every 1-n / n-m column and assign integer ids.

  Reads the title, person, localization and cast dumps named in the global
  ``files`` mapping and gathers the distinct values of each lookup column.
  Empty strings and the ``"null"`` marker are dropped, and the ``\\x02``
  character is replaced by a space. The remaining values are numbered from 0,
  and each lookup table is written to ``tables/<column>.tsv`` as
  ``id<TAB>value``.

  Values are sorted before numbering so the ids are reproducible. Plain set
  iteration order depends on per-process string hash randomisation, so the
  ids would otherwise change between kernel restarts.

  Returns:
    Mapping ``column name -> {value: id}``.
  """
  columns = {
      "years": set(),
      "formats": set(),
      "genres": set(),
      "title_names": set(),
      "professions": set(),
      "names": set(),
      "regions": set(),
      "languages": set(),
      "types": set(),
      "attributes": set(),
      "characters": set(),
  }

  # Each dump is opened right before it is consumed, so only one file is open at a time.
  _, title_iterator = extract_header(iterate_compressed_file(files["title"]))
  for line in title_iterator:
    _, title_format, title_name, _, _, start_year, _, _, genre = preprocess_line(line)
    columns["formats"].add(title_format)
    columns["genres"].update(genre.split(","))
    columns["years"].add(start_year)
    columns["title_names"].add(title_name)

  _, person_iterator = extract_header(iterate_compressed_file(files["person"]))
  for line in person_iterator:
    _, name, birth_year, death_year, profession, _ = preprocess_line(line)
    columns["professions"].update(profession.split(","))
    columns["years"].update([birth_year, death_year])
    columns["names"].add(name)

  _, localization_iterator = extract_header(iterate_compressed_file(files["localization"]))
  for line in localization_iterator:
    _, _, localized_title, region, language, localization_type, localization_attribute, _ = preprocess_line(line)
    columns["title_names"].add(localized_title)
    columns["regions"].add(region)
    columns["languages"].add(language)
    columns["attributes"].add(localization_attribute)
    columns["types"].add(localization_type)

  _, cast_iterator = extract_header(iterate_compressed_file(files["cast"]))
  for line in cast_iterator:
    _, _, _, job_category, _, character = preprocess_line(line)
    columns["professions"].add(job_category)
    columns["characters"].update(flatten(character.split(",")))

  column_dicts = {}
  for column, values in columns.items():
    # Replacing \x02 can merge two raw values into one; deduplicating again keeps ids dense.
    # NOTE(review): lookups via id_of_value use the raw value, so a raw value that
    # contains \x02 will not be found in the cleaned table; confirm whether this matters.
    cleaned_values = sorted({
        value.replace("\x02", " ")
        for value in values
        if value != "" and value != "null"
    })
    column_dict = set_to_dict(cleaned_values)
    column_dicts[column] = column_dict
    persist_to_tsv(column, [column_dict.values(), column_dict.keys()])

  return column_dicts

def id_of_value(column: str, value: any) -> int:
  """Look up the integer id assigned to ``value`` in the lookup table ``column``.

  Reads the module-level ``column_dicts`` (built by ``extract_n_columns``).
  When ``value`` is not in the table the string ``"null"`` is returned
  instead of an int; callers write the result directly into TSV lines.
  """
  return column_dicts[column].get(value, "null")

def extract_episode_ids() -> Set[int]:
  """Return the numeric ids of all TV episodes and of their parent series.

  Reads the episode dump named by ``files["episode"]``. Column 0 is the
  episode id and column 1 its parent series id, and both are added to the
  returned set. A parent id equal to the ``"null"`` marker is skipped
  instead of crashing ``transform_id``.
  """
  _, episode_iterator = extract_header(iterate_compressed_file(files["episode"]))

  titles_to_ignore = set()
  for line in episode_iterator:
    episode_id, parent_id, *_ = preprocess_line(line)
    titles_to_ignore.add(transform_id(episode_id))
    if parent_id != "null":
      titles_to_ignore.add(transform_id(parent_id))

  return titles_to_ignore


def process_titles() -> Set[int]:
  """Write ``tables/title.tsv`` and ``tables/title_genre.tsv`` for all kept titles.

  Rows of the title dump are left-joined with the rating dump via
  ``synched_iterator``. Titles whose id is in the global ``titles_to_ignore``
  are skipped. Categorical and text columns are replaced by their lookup ids
  via ``id_of_value``.

  title.tsv columns: title id, format_id, title_name_id, release_year_id,
  is_adult, runtime, average_rating, num_votes.
  title_genre.tsv columns: title id, genre_id.

  Returns:
    The numeric ids of every title written, used to filter the later tables.
  """
  _, title_iterator = extract_header(iterate_compressed_file(files["title"]))
  _, rating_iterator = extract_header(iterate_compressed_file(files["rating"]))

  title_ids_to_keep = set()

  with open("tables/title.tsv", "w", encoding="utf8") as title_f:
    with open("tables/title_genre.tsv", "w", encoding="utf8") as title_genre_f:
      for fields in synched_iterator([title_iterator, rating_iterator], [0, 0]):
        (title_id, title_format, primary_title, _, is_adult, start_year, _,
         runtime, genre, _, average_rating, num_votes) = fields
        numeric_title_id = transform_id(title_id)

        if numeric_title_id in titles_to_ignore:
          continue
        title_ids_to_keep.add(numeric_title_id)

        # Unrated titles may be padded with None by the join; write the file's
        # "null" marker instead of the literal text "None".
        if average_rating is None:
          average_rating = "null"
        if num_votes is None:
          num_votes = "null"

        format_id = id_of_value("formats", title_format)
        title_name_id = id_of_value("title_names", primary_title)
        release_year_id = id_of_value("years", start_year)
        title_f.write(
            f"{numeric_title_id}\t{format_id}\t{title_name_id}\t{release_year_id}\t"
            f"{is_adult}\t{runtime}\t{average_rating}\t{num_votes}\n"
        )

        for g in genre.split(","):
          if g != "null":
            genre_id = id_of_value("genres", g)
            title_genre_f.write(f"{numeric_title_id}\t{genre_id}\n")

  return title_ids_to_keep


def _write_crew_rows(person_title_f, crew_iterator, persons_to_keep: Set[int]) -> None:
  """Write director/writer credits of kept titles (crew dump) and record the people credited."""
  director_profession_id = column_dicts["professions"]["director"]
  writer_profession_id = column_dicts["professions"]["writer"]

  for line in crew_iterator:
    title_id, director_ids, writer_ids = preprocess_line(line)
    numeric_title_id = transform_id(title_id)
    if numeric_title_id not in title_ids_to_keep:
      continue

    # Each crew column is a comma-separated id list tagged with its own profession.
    for id_list, profession_id in ((director_ids, director_profession_id),
                                   (writer_ids, writer_profession_id)):
      if id_list == "null":
        continue
      for person_id in id_list.split(","):
        numeric_person_id = transform_id(person_id)
        persons_to_keep.add(numeric_person_id)
        person_title_f.write(f"{numeric_person_id}\t{numeric_title_id}\t{profession_id}\n")


def _write_cast_rows(person_title_f, cast_iterator, persons_to_keep: Set[int]) -> None:
  """Write cast credits (category column of the cast dump) of kept titles and record the people credited."""
  for line in cast_iterator:
    title_id, _, person_id, category, _, _ = preprocess_line(line)
    numeric_title_id = transform_id(title_id)
    if numeric_title_id not in title_ids_to_keep:
      continue

    numeric_person_id = transform_id(person_id)
    persons_to_keep.add(numeric_person_id)

    profession_id = id_of_value("professions", category)
    if profession_id != "null":
      person_title_f.write(f"{numeric_person_id}\t{numeric_title_id}\t{profession_id}\n")


def _write_person_rows(person_iterator, persons_to_keep: Set[int]) -> None:
  """Write person.tsv and person_profession.tsv rows for every person in ``persons_to_keep``."""
  with open("tables/person.tsv", "w", encoding="utf8") as person_f:
    with open("tables/person_profession.tsv", "w", encoding="utf8") as person_profession_f:
      for line in person_iterator:
        person_id, name, birth_year, death_year, profession, _ = preprocess_line(line)
        numeric_person_id = transform_id(person_id)
        if numeric_person_id not in persons_to_keep:
          continue

        name_id = id_of_value("names", name)
        birth_year_id = id_of_value("years", birth_year)
        death_year_id = id_of_value("years", death_year)
        person_f.write(f"{numeric_person_id}\t{name_id}\t{birth_year_id}\t{death_year_id}\n")

        for p in profession.split(","):
          profession_id = id_of_value("professions", p)
          if profession_id != "null":
            person_profession_f.write(f"{numeric_person_id}\t{profession_id}\n")


def process_persons() -> None:
  """Write person_title.tsv, person.tsv and person_profession.tsv for people credited on kept titles.

  Crew (director/writer) and cast credits of titles in the global
  ``title_ids_to_keep`` go to ``tables/person_title.tsv`` as
  ``person id, title id, profession id``. Every person credited there is
  then written from the person dump to ``tables/person.tsv``
  (``id, name_id, birth_year_id, death_year_id``) and
  ``tables/person_profession.tsv`` (``person id, profession_id``).

  NOTE(review): a person credited in both the crew and the cast dump with the
  same profession is presumably written to person_title.tsv twice. Credited
  ids that are absent from the person dump get person_title rows but no
  person row.
  """
  _, person_iterator = extract_header(iterate_compressed_file(files["person"]))
  _, crew_iterator = extract_header(iterate_compressed_file(files["crew"]))
  _, cast_iterator = extract_header(iterate_compressed_file(files["cast"]))

  persons_to_keep: Set[int] = set()

  with open("tables/person_title.tsv", "w", encoding="utf8") as person_title_f:
    _write_crew_rows(person_title_f, crew_iterator, persons_to_keep)
    _write_cast_rows(person_title_f, cast_iterator, persons_to_keep)

  _write_person_rows(person_iterator, persons_to_keep)


def process_localization() -> None:
  """Write ``tables/localization.tsv`` with the non-original titles of kept titles.

  Reads the localization dump. Only rows whose isOriginalTitle field is
  ``"0"`` and whose title id is in the global ``title_ids_to_keep`` are
  written. Columns: localization_id (sequential from 0), title id,
  title_name_id, region_id, language_id.
  """
  _, localization_iterator = extract_header(iterate_compressed_file(files["localization"]))

  with open("tables/localization.tsv", "w", encoding="utf8") as localization_f:
    localization_id = 0
    for line in localization_iterator:
      title_id, _, title_name, region, language, _, _, is_original_title = preprocess_line(line)
      if is_original_title != "0":
        continue

      numeric_title_id = transform_id(title_id)
      if numeric_title_id not in title_ids_to_keep:
        continue

      # Look up the localized title text itself (not the region code).
      title_name_id = id_of_value("title_names", title_name)
      region_id = id_of_value("regions", region)
      language_id = id_of_value("languages", language)
      localization_f.write(
          f"{localization_id}\t{numeric_title_id}\t{title_name_id}\t{region_id}\t{language_id}\n"
      )
      localization_id += 1

## imdb_schema table overview

In [None]:
# Logical source table -> gzipped IMDb dump (downloaded in the first cell).
files = dict(
    person="name.basics.tsv.gz",
    localization="title.akas.tsv.gz",
    title="title.basics.tsv.gz",
    crew="title.crew.tsv.gz",
    cast="title.principals.tsv.gz",
    rating="title.ratings.tsv.gz",
    episode="title.episode.tsv.gz",
)

In [None]:
import os

# Output directory for the generated TSV tables (the processing functions write to "tables/").
path = "tables"
# exist_ok=True makes the cell idempotent and avoids the exists-then-create race.
os.makedirs(path, exist_ok=True)

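Process columns with 1-n or n-m cardinalities: collect their distinct values, assign each value an integer id, and write one lookup table per column to `tables/<column>.tsv`.

(
      years,
      formats,
      genres,
      title_names,
      professions,
      names,
      regions,
      languages,
      types,
      attributes,
      characters
)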

In [None]:
# Build the value -> id lookup tables (also written to tables/<column>.tsv).
# Later cells read the global `column_dicts` through id_of_value.
column_dicts = extract_n_columns()

Generate a lookup set of title ids to skip: every TV episode and its parent title (e.g. the TV series), taken from `title.episode`.

In [None]:
# Ids of all episodes and their parent titles; process_titles skips these via the global.
titles_to_ignore = extract_episode_ids()
len(titles_to_ignore)

7945657

filter and process title and title_genre tables

In [None]:
# Writes title.tsv / title_genre.tsv. The returned ids are read as the global
# `title_ids_to_keep` by process_persons and process_localization.
title_ids_to_keep = process_titles()

filter and process person, person_title and person_profession tables

In [None]:
# Writes person_title.tsv, person.tsv and person_profession.tsv, restricted to title_ids_to_keep.
process_persons()

filter and process localization table

In [None]:
# Writes localization.tsv (non-original titles of kept titles only).
process_localization()

In [None]:
# Fix null values that were written as the text "None" in the title table:
# copy tables/title.tsv to tables/titles.tsv, replacing every "None" with "null".
with open("tables/title.tsv", "r", encoding="utf8") as source_f, \
     open("tables/titles.tsv", "w", encoding="utf8") as target_f:
  target_f.writelines(line.replace("None", "null") for line in source_f)

In [None]:
!zip -r ./tables.zip ./tables

  adding: tables/ (stored 0%)
  adding: tables/regions.tsv (deflated 43%)
  adding: tables/attributes.tsv (deflated 66%)
  adding: tables/person_profession.tsv (deflated 75%)
  adding: tables/formats.tsv (deflated 21%)
  adding: tables/title_names.tsv (deflated 47%)
  adding: tables/characters.tsv (deflated 56%)
  adding: tables/types.tsv (deflated 57%)
  adding: tables/title_genre.tsv (deflated 75%)
  adding: tables/.ipynb_checkpoints/ (stored 0%)
  adding: tables/professions.tsv (deflated 53%)
  adding: tables/localization.tsv (deflated 80%)
  adding: tables/person_title.tsv (deflated 70%)
  adding: tables/titles.tsv (deflated 72%)
  adding: tables/person.tsv (deflated 72%)
  adding: tables/names.tsv (deflated 52%)
  adding: tables/title.tsv (deflated 72%)
  adding: tables/years.tsv (deflated 55%)
  adding: tables/genres.tsv (deflated 26%)
  adding: tables/languages.tsv (deflated 38%)


In [None]:
# Sanity check: every row of tables/person.tsv should have exactly 4 columns
# (person id, name_id, birth_year_id, death_year_id); print any row that does not.
with open("tables/person.tsv", "r", encoding="utf8") as f:
  for line in f:
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 4:
      print(line)

In [None]:
# Sanity check of tables/person_title.tsv: every row read must have exactly 3
# columns (person id, title id, profession id); the split raises ValueError
# otherwise. A small window of rows after FIRST_ROW_TO_SHOW is printed for
# manual inspection.
# TODO: person_title.tsv can reference person ids that are absent from
# person.tsv; filter those rows out (an earlier draft of this cell attempted it).
FIRST_ROW_TO_SHOW = 5_500_000
ROWS_TO_SHOW = 20

with open("tables/person_title.tsv", "r", encoding="utf8") as person_title_f:
  for row_number, line in enumerate(person_title_f, start=1):
    p_id, t_id, pr_id = line.split("\t")
    if row_number > FIRST_ROW_TO_SHOW + ROWS_TO_SHOW:
      break
    if row_number > FIRST_ROW_TO_SHOW:
      print(line)

