diff --git a/datasets/imdb/infra/imdb_dataset.tf b/datasets/imdb/infra/imdb_dataset.tf
index 2199300ce..d02391f55 100644
--- a/datasets/imdb/infra/imdb_dataset.tf
+++ b/datasets/imdb/infra/imdb_dataset.tf
@@ -18,7 +18,7 @@
 resource "google_bigquery_dataset" "imdb" {
   dataset_id  = "imdb"
   project     = var.project_id
-  description = "aclImdb_v1 dataset"
+  description = "Contains the IMDb reviews dataset along with all seven IMDb interface datasets."
 }
 
 output "bigquery_dataset-imdb-dataset_id" {
diff --git a/datasets/imdb/infra/interfaces_pipeline.tf b/datasets/imdb/infra/interfaces_pipeline.tf
new file mode 100644
index 000000000..066098307
--- /dev/null
+++ b/datasets/imdb/infra/interfaces_pipeline.tf
@@ -0,0 +1,142 @@
+/**
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+resource "google_bigquery_table" "imdb_name_basics" {
+  project     = var.project_id
+  dataset_id  = "imdb"
+  table_id    = "name_basics"
+  description = "Contains details about the unique identifier of each name/person."
+  depends_on = [
+    google_bigquery_dataset.imdb
+  ]
+}
+
+output "bigquery_table-imdb_name_basics-table_id" {
+  value = google_bigquery_table.imdb_name_basics.table_id
+}
+
+output "bigquery_table-imdb_name_basics-id" {
+  value = google_bigquery_table.imdb_name_basics.id
+}
+
+resource "google_bigquery_table" "imdb_title_akas" {
+  project     = var.project_id
+  dataset_id  = "imdb"
+  table_id    = "title_akas"
+  description = "Contains details about the unique identifier of each title (title_id)."
+  depends_on = [
+    google_bigquery_dataset.imdb
+  ]
+}
+
+output "bigquery_table-imdb_title_akas-table_id" {
+  value = google_bigquery_table.imdb_title_akas.table_id
+}
+
+output "bigquery_table-imdb_title_akas-id" {
+  value = google_bigquery_table.imdb_title_akas.id
+}
+
+resource "google_bigquery_table" "imdb_title_basics" {
+  project     = var.project_id
+  dataset_id  = "imdb"
+  table_id    = "title_basics"
+  description = "Contains additional details about each title (title_id)."
+  depends_on = [
+    google_bigquery_dataset.imdb
+  ]
+}
+
+output "bigquery_table-imdb_title_basics-table_id" {
+  value = google_bigquery_table.imdb_title_basics.table_id
+}
+
+output "bigquery_table-imdb_title_basics-id" {
+  value = google_bigquery_table.imdb_title_basics.id
+}
+
+resource "google_bigquery_table" "imdb_title_crew" {
+  project     = var.project_id
+  dataset_id  = "imdb"
+  table_id    = "title_crew"
+  description = "Contains the director and writer information for all the titles in IMDb."
+  depends_on = [
+    google_bigquery_dataset.imdb
+  ]
+}
+
+output "bigquery_table-imdb_title_crew-table_id" {
+  value = google_bigquery_table.imdb_title_crew.table_id
+}
+
+output "bigquery_table-imdb_title_crew-id" {
+  value = google_bigquery_table.imdb_title_crew.id
+}
+
+resource "google_bigquery_table" "imdb_title_episode" {
+  project     = var.project_id
+  dataset_id  = "imdb"
+  table_id    = "title_episode"
+  description = "Contains the TV episode information."
+  depends_on = [
+    google_bigquery_dataset.imdb
+  ]
+}
+
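+# NOTE: these table resources are declared without schemas; the column schemas
+# are supplied at load time by the interfaces DAG's GCSToBigQueryOperator tasks.
+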
+output "bigquery_table-imdb_title_episode-table_id" {
+  value = google_bigquery_table.imdb_title_episode.table_id
+}
+
+output "bigquery_table-imdb_title_episode-id" {
+  value = google_bigquery_table.imdb_title_episode.id
+}
+
+resource "google_bigquery_table" "imdb_title_principals" {
+  project     = var.project_id
+  dataset_id  = "imdb"
+  table_id    = "title_principals"
+  description = "Contains the principal cast/crew for titles."
+  depends_on = [
+    google_bigquery_dataset.imdb
+  ]
+}
+
+output "bigquery_table-imdb_title_principals-table_id" {
+  value = google_bigquery_table.imdb_title_principals.table_id
+}
+
+output "bigquery_table-imdb_title_principals-id" {
+  value = google_bigquery_table.imdb_title_principals.id
+}
+
+resource "google_bigquery_table" "imdb_title_ratings" {
+  project     = var.project_id
+  dataset_id  = "imdb"
+  table_id    = "title_ratings"
+  description = "Contains the IMDb rating and votes information for titles."
+  depends_on = [
+    google_bigquery_dataset.imdb
+  ]
+}
+
+output "bigquery_table-imdb_title_ratings-table_id" {
+  value = google_bigquery_table.imdb_title_ratings.table_id
+}
+
+output "bigquery_table-imdb_title_ratings-id" {
+  value = google_bigquery_table.imdb_title_ratings.id
+}
diff --git a/datasets/imdb/infra/reviews_pipeline.tf b/datasets/imdb/infra/reviews_pipeline.tf
index 9dd811533..79fc16812 100644
--- a/datasets/imdb/infra/reviews_pipeline.tf
+++ b/datasets/imdb/infra/reviews_pipeline.tf
@@ -19,7 +19,7 @@
 resource "google_bigquery_table" "imdb_reviews" {
   project    = var.project_id
   dataset_id = "imdb"
   table_id   = "reviews"
-  description = "Reviews table"
+  description = "Large Movie Review Dataset v1.0\n\nOverview\n\nThis dataset contains movie reviews along with their associated binary\nsentiment polarity labels. It is intended to serve as a benchmark for\nsentiment classification. This document outlines how the dataset was\ngathered, and how to use the files provided.\n\nDataset\n\nThe core dataset contains 50,000 reviews split evenly into 25k train\nand 25k test sets. The overall distribution of labels is balanced (25k\npos and 25k neg). We also include an additional 50,000 unlabeled\ndocuments for unsupervised learning.\n\nIn the entire collection, no more than 30 reviews are allowed for any\ngiven movie because reviews for the same movie tend to have correlated\nratings. Further, the train and test sets contain a disjoint set of\nmovies, so no significant performance is obtained by memorizing\nmovie-unique terms and their association with observed labels. In the\nlabeled train/test sets, a negative review has a score \u003c= 4 out of 10,\nand a positive review has a score \u003e= 7 out of 10. Thus reviews with\nmore neutral ratings are not included in the train/test sets. 
In the\nunsupervised set, reviews of any rating are included and there are an\neven number of reviews \u003e 5 and \u003c= 5.\n\nColumns\nsplit - it has test(25K) / train(75K) records.\nlabel - Negative(25K) --\u003e test(12.5K) and train (12.5K)\n Positive(25K) --\u003e test(12.5K) and train (12.5K)\n Unsupervised(50K) --\u003e train(50K)\n\nFor Unsupervised label, reviewer_rating is NaN.\n" depends_on = [ google_bigquery_dataset.imdb ] diff --git a/datasets/imdb/pipelines/_images/run_csv_transform_kub/csv_transform.py b/datasets/imdb/pipelines/_images/run_csv_transform_kub/csv_transform.py index 06053f4f9..095530c27 100644 --- a/datasets/imdb/pipelines/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/imdb/pipelines/_images/run_csv_transform_kub/csv_transform.py @@ -16,156 +16,456 @@ import glob import json import logging +import math import os import pathlib import tarfile import typing +import numpy as np import pandas as pd import requests from google.cloud import storage +TEST_TRAIN = ["test", "train"] +NEG_POS_UNSUP = ["neg", "pos", "unsup"] +REVIEW_COLS = ["review", "split", "label", "id_tag", "path", "reviewer_rating"] +LABEL_DICT = {"neg": "Negative", "pos": "Positive", "unsup": "Unsupervised"} +REPLACE_DICT = {"\\N": np.nan} +REPLACE_BINARY_DICT = {"0": 0, "1": 1, "0.0": 0, "1.0": 1, 0.0: 0, 1.0: 1} + def main( - source_url: str, - source_file: pathlib.Path, + source_gcs_bucket: str, + source_gcs_object: str, + source_url: dict, + source_file: dict, extract_here: pathlib.Path, - target_file: pathlib.Path, + target_csv_file: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str, headers: typing.List[str], rename_mappings: dict, pipeline_name: str, + table_name: str, + chunk_size: int, ) -> None: logging.info( f"IMDb Dataset {pipeline_name} pipeline process started at " + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) ) + logging.info(f"Creating './files' folder under {os.getcwd()}") + pathlib.Path("./files").mkdir(parents=True, exist_ok=True) - logging.info("Downloading tar file ...") - download_tarfile(source_url, source_file) - logging.info("Downloading Completed.") + if pipeline_name == "reviews": + df = get_reviews( + source_gcs_bucket, source_gcs_object, source_url, source_file, extract_here + ) + elif pipeline_name == "interfaces": + download_gzfile(source_url.get("url", ""), source_file.get("url_data", "")) - logging.info(f"Extracting tar file to {extract_here}.") - extract_tar(source_file, extract_here) - logging.info(f"Successfully extracted tar file to {extract_here}.") + if table_name == "name_basics": + df = get_name_basics(source_file) + elif table_name == "title_akas": + df = get_title_akas( + source_file, chunk_size, rename_mappings, target_csv_file, headers + ) + elif table_name == "title_basics": + df = get_title_basics(source_file) + elif table_name == "title_crew": + df = get_title_crew(source_file) + elif table_name == "title_episode": + df = get_title_episode(source_file) + elif table_name == "title_principals": + df = get_title_principals( + source_file, chunk_size, rename_mappings, target_csv_file, headers + ) + elif table_name == "title_ratings": + df = get_title_ratings(source_file) - logging.info("Started creating Dataframe.") - df = create_dataframe(extract_here, headers) - logging.info("Successfully Created Dataframe and assigned to variable df.") + if table_name not in ("title_akas", "title_principals"): + rename_headers(df, rename_mappings) + try: + save_to_newfile(df, target_csv_file, headers) - logging.info("Started 
cleaning html tags from the user review.") - clean_html_tags(df) - logging.info("Cleaning html tags completed.") + except Exception as e: + logging.error(f"Error saving output file: {e}.") + + upload_file_to_gcs(target_csv_file, target_gcs_bucket, target_gcs_path) logging.info( - 'Changing "label" column data from ["neg", "pos"] --> ["Negative", "Positive"].' + f"IMDb Dataset {pipeline_name} pipeline process completed at " + + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) ) - change_label(df) - logging.info('Successfully replaced "label" column data.') - logging.info("Renaming headers") - rename_headers(df, rename_mappings) - logging.info(f"Saving to output file... {target_file}") - try: - save_to_new_file(df, target_file) - logging.info("Successfully saved.") - except Exception as e: - logging.error(f"Error saving output file: {e}.") +def get_reviews( + source_gcs_bucket: str, + source_gcs_object: str, + source_url: dict, + source_file: dict, + extract_here: pathlib.Path, +) -> pd.DataFrame: + download_gzfile(source_url.get("title_link", ""), source_file.get("title_data", "")) + download_blob( + source_gcs_bucket, source_gcs_object, source_file.get("user_review_data", "") + ) + extract_tar(source_file.get("user_review_data", ""), extract_here) + df_reviews = create_dataframe(extract_here) + df = add_movie_title(df_reviews, source_file.get("title_data", "")) + clean_html_tags(df, "review") + coldata_replace(df, "label", LABEL_DICT) + return df - logging.info( - f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}" + +def get_name_basics(source_file: dict) -> pd.DataFrame: + logging.info(f'Reading data from {source_file.get("url_data", "")}') + df = pd.read_csv(source_file.get("url_data", ""), sep="\t", compression="gzip") + for col in df: + if col in ("birthYear", "deathYear"): + coldata_replace(df, col, REPLACE_DICT) + convert_int(df, col, "Int64") + continue + elif col in ("knownForTitles"): + coldata_replace(df, col, REPLACE_DICT) + return df + + +def get_title_akas( + source_file: dict, + chunk_size: int, + rename_mappings: dict, + target_csv_file: pathlib.Path, + headers: list, +) -> None: + logging.info(f'Reading data from {source_file.get("url_data", "")} in chunks') + df_chunk = pd.read_csv( + source_file.get("url_data", ""), + sep="\t", + compression="gzip", + chunksize=chunk_size, ) - upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - logging.info("Successfully uploaded file to gcs bucket.") + for idx, chunk in enumerate(df_chunk): + logging.info(f"\t\tStarted cleaning chunk {idx}.") + chunk_cleaned = chunk_clean_akas(chunk) + if idx == 0: + rename_headers(chunk_cleaned, rename_mappings) + logging.info(f"csv headers are {headers}") + logging.info(f"Writing data to {target_csv_file}.") + chunk_cleaned.to_csv(str(target_csv_file), index=False, columns=headers) + else: + logging.info(f"Appending data to {target_csv_file}.") + chunk_cleaned.to_csv( + str(target_csv_file), index=False, mode="a", header=False + ) + logging.info(f"{idx} chunk shape {chunk_cleaned.shape}") + logging.info(f"Successfully created {target_csv_file} file") - logging.info( - f"IMDb Dataset {pipeline_name} pipeline process completed at " - + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + +def get_title_basics(source_file: dict) -> pd.DataFrame: + logging.info(f'Reading data from {source_file.get("url_data", "")}') + df = pd.read_csv(source_file.get("url_data", ""), sep="\t", compression="gzip") + df = df[df.isAdult.isin([0, "0", "0.0", 0.0, 1, 
"1", "1.0", 1.0, "\\N"])] + for col in df: + if col in ("isAdult"): + coldata_replace(df, col, {**REPLACE_BINARY_DICT, **REPLACE_DICT}) + convert_int(df, col, "Int64") + continue + elif col in ("startYear", "endYear", "runtimeMinutes"): + convert_digit(df, col) + convert_int(df, col, "Int64") + continue + elif col in ("genres"): + coldata_replace(df, col, REPLACE_DICT) + + return df + + +def get_title_crew(source_file: dict) -> pd.DataFrame: + logging.info(f'Reading data from {source_file.get("url_data", "")}') + df = pd.read_csv(source_file.get("url_data", ""), sep="\t", compression="gzip") + for col in df: + coldata_replace(df, col, REPLACE_DICT) + return df + + +def get_title_episode(source_file: dict) -> pd.DataFrame: + logging.info(f'Reading data from {source_file.get("url_data", "")}') + df = pd.read_csv(source_file.get("url_data", ""), sep="\t", compression="gzip") + for col in df: + if col in ("seasonNumber", "episodeNumber"): + convert_digit(df, col) + convert_int(df, col, "Int64") + return df + + +def get_title_principals( + source_file: dict, + chunk_size: int, + rename_mappings: dict, + target_csv_file: pathlib.Path, + headers: list, +) -> None: + logging.info(f'Reading data from {source_file.get("url_data", "")} in chunks') + df_chunk = pd.read_csv( + source_file.get("url_data", ""), + sep="\t", + compression="gzip", + chunksize=chunk_size, ) + for idx, chunk in enumerate(df_chunk): + logging.info(f"\t\tStarted cleaning chunk {idx}.") + chunk_cleaned = chunk_clean_principals(chunk) + if idx == 0: + rename_headers(chunk_cleaned, rename_mappings) + logging.info(f"csv headers are {headers}") + logging.info(f"Writing data to {target_csv_file}.") + chunk_cleaned.to_csv(str(target_csv_file), index=False, columns=headers) + else: + logging.info(f"Appending data to {target_csv_file}.") + chunk_cleaned.to_csv( + str(target_csv_file), index=False, mode="a", header=False + ) + logging.info(f"{idx} chunk shape {chunk_cleaned.shape}") + logging.info(f"Successfully created {target_csv_file} file") -def download_tarfile(source_url: str, source_file: pathlib.Path) -> None: - logging.info(f"Creating 'files' folder under {os.getcwd()}") - pathlib.Path("./files").mkdir(parents=True, exist_ok=True) - logging.info(f"Downloading file from {source_url}...") - logging.info(f"Downloading {source_url} into {source_file}") +def get_title_ratings(source_file: dict) -> pd.DataFrame: + logging.info(f'Reading data from {source_file.get("url_data", "")}') + df = pd.read_csv(source_file.get("url_data", ""), sep="\t", compression="gzip") + return df + + +def download_gzfile(source_url: str, source_file: str): + logging.info(f"Downloading data from {source_url} to {source_file} .") res = requests.get(source_url, stream=True) if res.status_code == 200: with open(source_file, "wb") as fb: for chunk in res: fb.write(chunk) else: - logging.error(f"Couldn't download {source_url}: {res.text}") + logging.info(f"Couldn't download {source_url}: {res.text}") + logging.info(f"Downloaded data from {source_url} into {source_file}") + + +def download_blob(source_gcs_bucket: str, source_gcs_object: str, target_file: str): + """Downloads a blob from the bucket.""" + logging.info( + f"Downloading data from gs://{source_gcs_bucket}/{source_gcs_object} to {target_file} ..." 
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(source_gcs_bucket)
+    blob = bucket.blob(source_gcs_object)
+    blob.download_to_filename(target_file)
+    logging.info("Downloading Completed.")
 
 
-def extract_tar(source_file: pathlib.Path, extract_here: pathlib.Path):
-    with tarfile.open(str(source_file), "r") as tar_fb:
-        tar_fb.extractall(extract_here)
+def extract_tar(source_file: str, extract_here: pathlib.Path):
+    logging.info(f"Extracting tar.gz file to -> {extract_here}.")
+    if "tar.gz" in source_file:
+        with tarfile.open(str(source_file), "r") as tar_fb:
+            tar_fb.extractall(extract_here)
+        logging.info(f"Successfully extracted tar file to -> {extract_here}.")
 
 
-def create_dataframe(
-    extract_here: pathlib.Path, headers: typing.List[str]
-) -> pd.DataFrame:
-    df = pd.DataFrame(columns=headers)
-    for parent in ["train", "test"]:
-        for child in ["pos", "neg"]:
+def get_id_rating(file: str, index: int):
+    # Filenames are "<id>_<rating>.txt"; unsupervised reviews carry no rating.
+    if "unsup" in file and index == 1:
+        return math.nan
+    return int(file.split("/")[-1].split(".")[0].split("_")[index])
+
+
+def align_data(file: str):
+    # Read the review text and derive split/label/ids from the aclImdb file path.
+    with open(file) as fb:
+        review = fb.read()
+    split = str(file).split("/")[2]
+    label = file.split("/")[-2]
+    review_path = file
+    id_tag = get_id_rating(file, 0)
+    reviewer_rating = get_id_rating(file, 1)
+    return [review, split, label, id_tag, review_path, reviewer_rating]
+
+
+def add_movie_url(parent: str, child: str, df: pd.DataFrame, id_tag: str):
+    urls = []
+    try:
+        with open(f"./files/aclImdb/{parent}/urls_{child}.txt") as fb:
+            urls.extend(fb.read().splitlines())
+        urls = [url.replace("usercomments", "") for url in urls]
+        movie_title_dict = dict(enumerate(urls))
+        return df[id_tag].map(movie_title_dict, na_action=None)
+    except FileNotFoundError as e:
+        logging.info(f"\t\t{e}.")
+
+
+def add_movie_id(df: pd.DataFrame, column: str, index: int):
+    return df[column].apply(lambda row: row.split("/")[index])
+
+
+def create_dataframe(extract_here: pathlib.Path):
+    logging.info("Started creating Dataframe for reviews(data).")
+    df_reviews = pd.DataFrame(columns=REVIEW_COLS)
+    for parent in TEST_TRAIN:
+        for child in NEG_POS_UNSUP:
+            if parent == "test" and child == "unsup":
+                break
             path = f"{extract_here}/aclImdb/{parent}/{child}/"
             csv_files = list(glob.glob(path + "*.txt"))
+            csv_files.sort()
             logging.info(
-                f"\tCreating Dataframe from by reading fila from {parent}-->{child}."
+                f"\tCreating Dataframe by reading files from {parent}-->{child}."
             )
             df_child = pd.DataFrame(
-                [[open(file).read(), file.split("/")[-2]] for file in csv_files],
-                columns=headers,
+                [align_data(file) for file in csv_files], columns=REVIEW_COLS
             )
+            logging.info("\tAdding movie_url column")
+            df_child["movie_url"] = add_movie_url(parent, child, df_child, "id_tag")
+            logging.info("\tAdding movie_id column")
+            df_child["movie_id"] = add_movie_id(df_child, "movie_url", -2)
             logging.info(
-                f"\tSuccessfully created Dataframe(Child Dataframe) for {parent}-->{child}."
+                f"\tConcatenating main dataframe & child dataframe for {parent}-->{child} (folder)."
             )
-            logging.info(
-                f"\tTrying to concatenating main dataframe & child dataframe for {parent}-->{child}."
-            )
-            df = pd.concat([df, df_child], ignore_index=True)
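+            # Accumulate this split/label folder into the master reviews frame;
+            # ignore_index keeps the row index contiguous across folders.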
+            df_reviews = pd.concat([df_reviews, df_child], ignore_index=True)
             logging.info("\tChild Dataframe concatenated with main Dataframe df")
+    logging.info("Successfully created Dataframe and assigned it to variable df_reviews.")
+    return df_reviews
+
+
+def add_movie_title(df: pd.DataFrame, source_url_path: str):
+    logging.info("Started creating Dataframe for title_basics(data).")
+    logging.info(
+        f"\tCreating Dataframe(df_title_basics) for movie_id and title by reading ./files/{(source_url_path).split('/')[-1]}."
+    )
+    df_title_basics = pd.read_csv(
+        str(source_url_path),
+        sep="\t",
+        compression="gzip",
+        usecols=["tconst", "primaryTitle"],
+    )
+    logging.info(
+        "\tRenaming Dataframe(df_title_basics) columns from ['tconst', 'primaryTitle'] -> ['movie_id', 'title']."
+    )
+    rename_headers(df_title_basics, {"tconst": "movie_id", "primaryTitle": "title"})
+    logging.info(
+        "Merging the two Dataframes (df_reviews & df_title_basics) with a left join and assigning the result to df."
+    )
+    df = pd.merge(df, df_title_basics, how="left")
+    logging.info("Successfully created final Dataframe.")
     return df
 
 
-def clean_html_tags(df: pd.DataFrame) -> None:
-    df.review.replace(to_replace="<{1,}.{0,4}>", value="", regex=True, inplace=True)
+def clean_html_tags(df: pd.DataFrame, review: str) -> None:
+    logging.info("Started cleaning html tags from the user review.")
+    df[review].replace(to_replace="<{1,}.{0,4}>", value="", regex=True, inplace=True)
+    logging.info("Cleaning html tags completed.")
+
+
+def replace_unicode(df: pd.DataFrame, col: str, match: str, replace: str) -> None:
+    logging.info(
+        f"Replacing unicode char in '{col}' replacing '{match}' with '{replace}'."
+    )
+    df[col] = df[col].apply(lambda data: data.replace(match, replace))
+
+
+def coldata_replace(df: pd.DataFrame, col: str, replace_dict: dict) -> None:
+    logging.info(f"Replacing '{col}' column data with {replace_dict}.")
+    df[col].replace(replace_dict, inplace=True)
+    logging.info(f"Successfully replaced '{col}' column data.")
 
 
-def change_label(df: pd.DataFrame) -> None:
-    df.label.replace({"neg": "Negative", "pos": "Positive"}, inplace=True)
+
+def convert_int(df: pd.DataFrame, col: str, dtype: str) -> None:
+    logging.info(f"Converting data type to {dtype}")
+    df[col] = df[col].astype(dtype)
+
+
+def convert_digit(df: pd.DataFrame, col: str) -> None:
+    logging.info(f"Converting '{col}' data from string(value) to integer(value).")
+    df[col] = df[col].apply(lambda data: int(data) if str(data).isdigit() else np.nan)
 
 
 def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None:
+    logging.info("Renaming headers")
+    logging.info(f"\t {list(df.columns)} with {rename_mappings}")
     df.rename(columns=rename_mappings, inplace=True)
+    logging.info("Renaming headers completed.")
+
+
+def convert_str(chunk: pd.DataFrame, col: str, data_type: str) -> pd.DataFrame:
+    logging.info(f"Converting datatype of '{col}' column to {data_type}.")
+    chunk[col] = chunk[col].apply(data_type)
+    return chunk
 
 
-def save_to_new_file(df: pd.DataFrame, target_file: pathlib.Path) -> None:
-    df.to_csv(str(target_file), header=True, index=False)
+def clean_data(chunk: pd.DataFrame, col: str, match: str, replace: str) -> None:
+    logging.info(
+        f"Replacing unicode char in '{col}' replacing '{match}' with '{replace}'."
+    )
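+    # NaN-safe replace: str.replace on a missing value would raise, so nulls
+    # are passed through unchanged.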
+    chunk[col] = chunk[col].apply(
+        lambda x: x if pd.isnull(x) else x.replace(match, replace)
+    )
+
+
+def chunk_clean_akas(chunk: pd.DataFrame) -> pd.DataFrame:
+    for col in chunk:
+        if col in ("title", "region", "language"):
+            coldata_replace(chunk, col, {"\\N": None})
+        if col in ("types", "attributes"):
+            replace_unicode(chunk, col, "\x02", "&")
+            coldata_replace(chunk, col, {"\\N": None})
+        if col in ("isOriginalTitle",):
+            coldata_replace(
+                chunk, col, {"0": False, "1": True, "\\N": None, 0: False, 1: True}
+            )
+        if col in ("title",):
+            clean_data(chunk, col, "\n", "|")
+    logging.info(f"Dataframe chunk shape {chunk.shape}")
+    return chunk
+
+
+def chunk_clean_principals(chunk: pd.DataFrame) -> pd.DataFrame:
+    for col in chunk:
+        if col in ("characters",):
+            replace_unicode(chunk, col, "[", "")
+            replace_unicode(chunk, col, "]", "")
+            replace_unicode(chunk, col, '"', "")
+            coldata_replace(chunk, col, {"\\N": None})
+        elif col in ("job",):
+            coldata_replace(chunk, col, {"\\N": None})
+    logging.info(f"Dataframe chunk shape {chunk.shape}")
+    return chunk
+
+
+def save_to_newfile(
+    df: pd.DataFrame, target_csv_file: pathlib.Path, headers: typing.List[str]
+) -> None:
+    logging.info(f"Saving output file to {target_csv_file}")
+    df.to_csv(str(target_csv_file), header=True, index=False, columns=headers)
+    logging.info("Successfully saved.")
 
 
 def upload_file_to_gcs(
-    target_file: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str
+    target_csv_file: pathlib.Path, target_gcs_bucket: str, target_gcs_path: str
 ) -> None:
+    logging.info(f"Uploading output file to gs://{target_gcs_bucket}/{target_gcs_path}")
     storage_client = storage.Client()
     bucket = storage_client.bucket(target_gcs_bucket)
     blob = bucket.blob(target_gcs_path)
-    blob.upload_from_filename(target_file)
+    blob.upload_from_filename(target_csv_file)
+    logging.info("Successfully uploaded file to gcs bucket.")
 
 
 if __name__ == "__main__":
     logging.getLogger().setLevel(logging.INFO)
 
     main(
-        source_url=os.environ["SOURCE_URL"],
-        source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
-        extract_here=pathlib.Path(os.environ["EXTRACT_HERE"]).expanduser(),
-        target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
-        target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
-        target_gcs_path=os.environ["TARGET_GCS_PATH"],
-        headers=json.loads(os.environ["CSV_HEADERS"]),
-        rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]),
-        pipeline_name=os.environ["PIPELINE_NAME"],
+        source_gcs_bucket=os.environ.get("SOURCE_GCS_BUCKET", ""),
+        source_gcs_object=os.environ.get("SOURCE_GCS_OBJECT", ""),
+        source_url=json.loads(os.environ.get("SOURCE_URL", "{}")),
+        source_file=json.loads(os.environ.get("SOURCE_FILE", "{}")),
+        extract_here=pathlib.Path(os.environ.get("EXTRACT_HERE", "")).expanduser(),
+        target_csv_file=pathlib.Path(os.environ.get("TARGET_CSV_FILE", "")).expanduser(),
+        target_gcs_bucket=os.environ.get("TARGET_GCS_BUCKET", ""),
+        target_gcs_path=os.environ.get("TARGET_GCS_PATH", ""),
+        headers=json.loads(os.environ.get("CSV_HEADERS", "[]")),
+        rename_mappings=json.loads(os.environ.get("RENAME_MAPPINGS", "{}")),
+        pipeline_name=os.environ.get("PIPELINE_NAME", ""),
+        table_name=os.environ.get("TABLE_NAME", ""),
+        chunk_size=int(os.environ.get("CHUNK_SIZE", "1000000")),
     )
diff --git a/datasets/imdb/pipelines/dataset.yaml b/datasets/imdb/pipelines/dataset.yaml
index 91d6bf617..0c7e1b37c 100644
--- a/datasets/imdb/pipelines/dataset.yaml
+++ b/datasets/imdb/pipelines/dataset.yaml
@@ -21,4 +21,4 @@ dataset:
   resources:
     - type: bigquery_dataset
       dataset_id: imdb
-      description: "aclImdb_v1 dataset"
+      description: "Contains the IMDb reviews dataset along with all seven IMDb interface datasets."
diff --git a/datasets/imdb/pipelines/interfaces/interfaces_dag.py b/datasets/imdb/pipelines/interfaces/interfaces_dag.py
new file mode 100644
index 000000000..89afa9b8d
--- /dev/null
+++ b/datasets/imdb/pipelines/interfaces/interfaces_dag.py
@@ -0,0 +1,538 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from airflow import DAG
+from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+from airflow.providers.google.cloud.transfers import gcs_to_bigquery
+
+default_args = {
+    "owner": "Google",
+    "depends_on_past": False,
+    "start_date": "2021-03-01",
+}
+
+
+with DAG(
+    dag_id="imdb.interfaces",
+    default_args=default_args,
+    max_active_runs=1,
+    schedule_interval="@weekly",
+    catchup=False,
+    default_view="graph",
+) as dag:
+
+    # Run CSV transform within kubernetes pod
+    name_basics_transform_csv = kubernetes_pod.KubernetesPodOperator(
+        task_id="name_basics_transform_csv",
+        startup_timeout_seconds=600,
+        name="name_basics",
+        namespace="composer",
+        service_account_name="datasets",
+        image_pull_policy="Always",
+        image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "SOURCE_URL": '{"url": "https://datasets.imdbws.com/name.basics.tsv.gz"}',
+            "SOURCE_FILE": '{"url_data": "./files/name_basics.tsv.gz"}',
+            "TARGET_CSV_FILE": "./files/data_output.csv",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/imdb/interfaces/name_basics_data_output.csv",
+            "TABLE_NAME": "name_basics",
+            "PIPELINE_NAME": "interfaces",
+            "CSV_HEADERS": '["nconst", "primary_name", "birth_year", "death_year", "primary_profession", "known_for_titles"]',
+            "RENAME_MAPPINGS": '{"nconst": "nconst", "primaryName": "primary_name", "birthYear": "birth_year", "deathYear": "death_year",\n "primaryProfession": "primary_profession", "knownForTitles": "known_for_titles"}',
+        },
+        resources={"request_memory": "4G", "request_cpu": "1"},
+    )
+
+    # Task to load CSV data to a BigQuery table
+    load_name_basics_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
+        task_id="load_name_basics_to_bq",
+        bucket="{{ var.value.composer_bucket }}",
+        source_objects=["data/imdb/interfaces/name_basics_data_output.csv"],
+        source_format="CSV",
+        destination_project_dataset_table="imdb.name_basics",
+        skip_leading_rows=1,
+        write_disposition="WRITE_TRUNCATE",
+        schema_fields=[
+            {
+                "name": "nconst",
+                "type": "string",
+                "description": "Alphanumeric unique identifier of the name/person.",
+                "mode": "nullable",
+            },
+            {
+                "name": "primary_name",
+                "type": "string",
+                "description": "Name by which the person is most often credited.",
+                "mode": "nullable",
+            },
+            {
+                "name": "birth_year",
+                "type": "integer",
+                "description": "Birth year in YYYY format.",
+                "mode": "nullable",
+            },
+            {
+                "name": "death_year",
+                "type": "integer",
+                "description": "Death year in YYYY format if applicable.",
+                "mode": "nullable",
+ }, + { + "name": "primary_profession", + "type": "string", + "description": "The top-3 professions of the person.", + "mode": "nullable", + }, + { + "name": "known_for_titles", + "type": "string", + "description": "Titles the person is known for.", + "mode": "nullable", + }, + ], + ) + + # Run CSV transform within kubernetes pod + title_akas_transform_csv = kubernetes_pod.KubernetesPodOperator( + task_id="title_akas_transform_csv", + startup_timeout_seconds=900, + name="title_akas", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": '{"url": "https://datasets.imdbws.com/title.akas.tsv.gz"}', + "SOURCE_FILE": '{"url_data": "./files/title_akas.tsv.gz"}', + "CHUNK_SIZE": "300000", + "TARGET_CSV_FILE": "./files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/imdb/interfaces/title_akas_data_output.csv", + "TABLE_NAME": "title_akas", + "PIPELINE_NAME": "interfaces", + "CSV_HEADERS": '["title_id", "ordering", "title", "region", "language", "types", "attributes", "is_original_title"]', + "RENAME_MAPPINGS": '{"titleId": "title_id", "ordering": "ordering", "title": "title", "region": "region", "language": "language", "types": "types", "attributes": "attributes", "isOriginalTitle": "is_original_title"}', + }, + resources={ + "request_memory": "8G", + "request_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Task to load CSV data to a BigQuery table + load_title_akas_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_title_akas_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/imdb/interfaces/title_akas_data_output.csv"], + source_format="CSV", + destination_project_dataset_table="imdb.title_akas", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "title_id", + "type": "string", + "description": "A tconst, an alphanumeric unique identifier of the title.", + "mode": "nullable", + }, + { + "name": "ordering", + "type": "integer", + "description": "A number to uniquely identify rows for a given title_id.", + "mode": "nullable", + }, + { + "name": "title", + "type": "string", + "description": "The localized title.", + "mode": "nullable", + }, + { + "name": "region", + "type": "string", + "description": "The region for this version of the title.", + "mode": "nullable", + }, + { + "name": "language", + "type": "string", + "description": "The language of the title.", + "mode": "nullable", + }, + { + "name": "types", + "type": "string", + "description": "Enumerated set of attributes for this alternative title. One or more of the following: 'alternative', 'dvd', 'festival', 'tv', 'video', 'working', 'original', 'imdbDisplay'. 
New values may be added in the future without warning.", + "mode": "nullable", + }, + { + "name": "attributes", + "type": "string", + "description": "Additional terms to describe this alternative title, not enumerated", + "mode": "nullable", + }, + { + "name": "is_original_title", + "type": "boolean", + "description": "False: not original title; True: original title.", + "mode": "nullable", + }, + ], + ) + + # Run CSV transform within kubernetes pod + title_basics_transform_csv = kubernetes_pod.KubernetesPodOperator( + task_id="title_basics_transform_csv", + startup_timeout_seconds=600, + name="title_basics", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": '{"url": "https://datasets.imdbws.com/title.basics.tsv.gz"}', + "SOURCE_FILE": '{"url_data": "./files/title_basics.tsv.gz"}', + "TARGET_CSV_FILE": "./files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/imdb/interfaces/title_basics_data_output.csv", + "TABLE_NAME": "title_basics", + "PIPELINE_NAME": "interfaces", + "CSV_HEADERS": '["tconst", "title_type", "primary_title", "original_title", "is_adult", "start_year", "end_year", "runtime_minutes", "genres"]', + "RENAME_MAPPINGS": '{"tconst": "tconst", "titleType": "title_type", "primaryTitle": "primary_title", "originalTitle": "original_title",\n "isAdult": "is_adult", "startYear": "start_year", "endYear": "end_year", "runtimeMinutes": "runtime_minutes", "genres": "genres"}', + }, + resources={"request_memory": "4G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_title_basics_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_title_basics_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/imdb/interfaces/title_basics_data_output.csv"], + source_format="CSV", + destination_project_dataset_table="imdb.title_basics", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "tconst", + "type": "string", + "description": "Alphanumeric unique identifier of the title.", + "mode": "nullable", + }, + { + "name": "title_type", + "type": "string", + "description": "The type/format of the title (e.g. movie, short, tvseries, tvepisode, video, etc).", + "mode": "nullable", + }, + { + "name": "primary_title", + "type": "string", + "description": "The more popular title / the title used by the filmmakers on promotional materials at the point of release.", + "mode": "nullable", + }, + { + "name": "original_title", + "type": "string", + "description": "Original title, in the original language.", + "mode": "nullable", + }, + { + "name": "is_adult", + "type": "integer", + "description": "0: non-adult title; 1: adult title.", + "mode": "nullable", + }, + { + "name": "start_year", + "type": "integer", + "description": "Represents the release year of a title. 
In the case of TV Series, it is the series start year.",
+                "mode": "nullable",
+            },
+            {
+                "name": "end_year",
+                "type": "integer",
+                "description": "TV Series end year.",
+                "mode": "nullable",
+            },
+            {
+                "name": "runtime_minutes",
+                "type": "integer",
+                "description": "Primary runtime of the title, in minutes.",
+                "mode": "nullable",
+            },
+            {
+                "name": "genres",
+                "type": "string",
+                "description": "Includes up to three genres associated with the title.",
+                "mode": "nullable",
+            },
+        ],
+    )
+
+    # Run CSV transform within kubernetes pod
+    title_crew_transform_csv = kubernetes_pod.KubernetesPodOperator(
+        task_id="title_crew_transform_csv",
+        startup_timeout_seconds=600,
+        name="title_crew",
+        namespace="composer",
+        service_account_name="datasets",
+        image_pull_policy="Always",
+        image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "SOURCE_URL": '{"url": "https://datasets.imdbws.com/title.crew.tsv.gz"}',
+            "SOURCE_FILE": '{"url_data": "./files/title_crew.tsv.gz"}',
+            "TARGET_CSV_FILE": "./files/data_output.csv",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/imdb/interfaces/title_crew_data_output.csv",
+            "TABLE_NAME": "title_crew",
+            "PIPELINE_NAME": "interfaces",
+            "CSV_HEADERS": '["tconst", "directors", "writers"]',
+            "RENAME_MAPPINGS": '{"tconst": "tconst", "directors": "directors", "writers": "writers"}',
+        },
+        resources={"request_memory": "4G", "request_cpu": "1"},
+    )
+
+    # Task to load CSV data to a BigQuery table
+    load_title_crew_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
+        task_id="load_title_crew_to_bq",
+        bucket="{{ var.value.composer_bucket }}",
+        source_objects=["data/imdb/interfaces/title_crew_data_output.csv"],
+        source_format="CSV",
+        destination_project_dataset_table="imdb.title_crew",
+        skip_leading_rows=1,
+        write_disposition="WRITE_TRUNCATE",
+        schema_fields=[
+            {
+                "name": "tconst",
+                "type": "string",
+                "description": "Alphanumeric unique identifier of the title.",
+                "mode": "nullable",
+            },
+            {
+                "name": "directors",
+                "type": "string",
+                "description": "String of nconsts - director(s) of the given title.",
+                "mode": "nullable",
+            },
+            {
+                "name": "writers",
+                "type": "string",
+                "description": "String of nconsts - writer(s) of the given title.",
+                "mode": "nullable",
+            },
+        ],
+    )
+
+    # Run CSV transform within kubernetes pod
+    title_episode_transform_csv = kubernetes_pod.KubernetesPodOperator(
+        task_id="title_episode_transform_csv",
+        startup_timeout_seconds=600,
+        name="title_episode",
+        namespace="composer",
+        service_account_name="datasets",
+        image_pull_policy="Always",
+        image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "SOURCE_URL": '{"url": "https://datasets.imdbws.com/title.episode.tsv.gz"}',
+            "SOURCE_FILE": '{"url_data": "./files/title_episode.tsv.gz"}',
+            "TARGET_CSV_FILE": "./files/data_output.csv",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/imdb/interfaces/title_episode_data_output.csv",
+            "TABLE_NAME": "title_episode",
+            "PIPELINE_NAME": "interfaces",
+            "CSV_HEADERS": '["tconst", "parent_tconst", "season_number", "episode_number"]',
+            "RENAME_MAPPINGS": '{"tconst": "tconst", "parentTconst": "parent_tconst", "seasonNumber": "season_number", "episodeNumber": "episode_number"}',
+        },
+        resources={"request_memory": "4G", "request_cpu": "1"},
+    )
+
+    # Task to load CSV data to a BigQuery table
+    load_title_episode_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
+        task_id="load_title_episode_to_bq",
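+        # Loads the transformed CSV staged in the Composer bucket and replaces
+        # the table contents on each run (WRITE_TRUNCATE).
+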
bucket="{{ var.value.composer_bucket }}", + source_objects=["data/imdb/interfaces/title_episode_data_output.csv"], + source_format="CSV", + destination_project_dataset_table="imdb.title_episode", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "tconst", + "type": "string", + "description": "Alphanumeric identifier of episode.", + "mode": "nullable", + }, + { + "name": "parent_tconst", + "type": "string", + "description": "Alphanumeric identifier of the parent TV Series.", + "mode": "nullable", + }, + { + "name": "season_number", + "type": "integer", + "description": "Season number the episode belongs to.", + "mode": "nullable", + }, + { + "name": "episode_number", + "type": "integer", + "description": "Episode number of the tconst in the TV series.", + "mode": "nullable", + }, + ], + ) + + # Run CSV transform within kubernetes pod + title_principals_transform_csv = kubernetes_pod.KubernetesPodOperator( + task_id="title_principals_transform_csv", + startup_timeout_seconds=900, + name="title_principals", + namespace="composer", + service_account_name="datasets", + image_pull_policy="Always", + image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": '{"url": "https://datasets.imdbws.com/title.principals.tsv.gz"}', + "SOURCE_FILE": '{"url_data": "./files/title_principals.tsv.gz"}', + "CHUNK_SIZE": "300000", + "TARGET_CSV_FILE": "./files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/imdb/interfaces/title_principals_data_output.csv", + "TABLE_NAME": "title_principals", + "PIPELINE_NAME": "interfaces", + "CSV_HEADERS": '["tconst", "ordering", "nconst", "category", "job", "characters"]', + "RENAME_MAPPINGS": '{"tconst": "tconst", "ordering": "ordering", "nconst": "nconst", "category": "category",\n "job": "job", "characters": "characters"}', + }, + resources={ + "request_memory": "8G", + "request_cpu": "3", + "request_ephemeral_storage": "10G", + }, + ) + + # Task to load CSV data to a BigQuery table + load_title_principals_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_title_principals_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/imdb/interfaces/title_principals_data_output.csv"], + source_format="CSV", + destination_project_dataset_table="imdb.title_principals", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "tconst", + "type": "string", + "description": "Alphanumeric unique identifier of the title.", + "mode": "nullable", + }, + { + "name": "ordering", + "type": "integer", + "description": "a number to uniquely identify rows for a given title_id.", + "mode": "nullable", + }, + { + "name": "nconst", + "type": "string", + "description": "Alphanumeric unique identifier of the name/person.", + "mode": "nullable", + }, + { + "name": "category", + "type": "string", + "description": "The category of job that person was in.", + "mode": "nullable", + }, + { + "name": "job", + "type": "string", + "description": "The specific job title if applicable.", + "mode": "nullable", + }, + { + "name": "characters", + "type": "string", + "description": "The name of the character played if applicable.", + "mode": "nullable", + }, + ], + ) + + # Run CSV transform within kubernetes pod + title_ratings_transform_csv = kubernetes_pod.KubernetesPodOperator( + task_id="title_ratings_transform_csv", + startup_timeout_seconds=600, + name="title_ratings", + namespace="composer", + 
        service_account_name="datasets",
+        image_pull_policy="Always",
+        image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "SOURCE_URL": '{"url": "https://datasets.imdbws.com/title.ratings.tsv.gz"}',
+            "SOURCE_FILE": '{"url_data": "./files/title_ratings.tsv.gz"}',
+            "TARGET_CSV_FILE": "./files/data_output.csv",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/imdb/interfaces/title_ratings_data_output.csv",
+            "TABLE_NAME": "title_ratings",
+            "PIPELINE_NAME": "interfaces",
+            "CSV_HEADERS": '["tconst", "average_rating", "num_votes"]',
+            "RENAME_MAPPINGS": '{"tconst": "tconst", "averageRating": "average_rating", "numVotes": "num_votes"}',
+        },
+        resources={"request_memory": "4G", "request_cpu": "1"},
+    )
+
+    # Task to load CSV data to a BigQuery table
+    load_title_ratings_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
+        task_id="load_title_ratings_to_bq",
+        bucket="{{ var.value.composer_bucket }}",
+        source_objects=["data/imdb/interfaces/title_ratings_data_output.csv"],
+        source_format="CSV",
+        destination_project_dataset_table="imdb.title_ratings",
+        skip_leading_rows=1,
+        write_disposition="WRITE_TRUNCATE",
+        schema_fields=[
+            {
+                "name": "tconst",
+                "type": "string",
+                "description": "Alphanumeric unique identifier for title.",
+                "mode": "nullable",
+            },
+            {
+                "name": "average_rating",
+                "type": "float",
+                "description": "Weighted average of all the individual user ratings.",
+                "mode": "nullable",
+            },
+            {
+                "name": "num_votes",
+                "type": "integer",
+                "description": "Number of votes the title has received.",
+                "mode": "nullable",
+            },
+        ],
+    )
+
+    name_basics_transform_csv >> load_name_basics_to_bq
+    title_akas_transform_csv >> load_title_akas_to_bq
+    title_basics_transform_csv >> load_title_basics_to_bq
+    title_crew_transform_csv >> load_title_crew_to_bq
+    title_episode_transform_csv >> load_title_episode_to_bq
+    title_principals_transform_csv >> load_title_principals_to_bq
+    title_ratings_transform_csv >> load_title_ratings_to_bq
diff --git a/datasets/imdb/pipelines/interfaces/pipeline.yaml b/datasets/imdb/pipelines/interfaces/pipeline.yaml
new file mode 100644
index 000000000..9dee01c64
--- /dev/null
+++ b/datasets/imdb/pipelines/interfaces/pipeline.yaml
@@ -0,0 +1,512 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+resources:
+
+  - type: bigquery_table
+    table_id: name_basics
+    description: "Contains details about the unique identifier of each name/person."
+
+  - type: bigquery_table
+    table_id: title_akas
+    description: "Contains details about the unique identifier of each title (title_id)."
+
+  - type: bigquery_table
+    table_id: title_basics
+    description: "Contains additional details about each title (title_id)."
+
+  - type: bigquery_table
+    table_id: title_crew
+    description: "Contains the director and writer information for all the titles in IMDb."
+
+  - type: bigquery_table
+    table_id: title_episode
+    description: "Contains the TV episode information."
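+
+  # NOTE: keep these table descriptions in sync with the matching resources in
+  # datasets/imdb/infra/interfaces_pipeline.tf.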
+ + - type: bigquery_table + table_id: title_principals + description: "Contains the principal cast/crew for titles." + + - type: bigquery_table + table_id: title_ratings + description: "Contains the IMDb rating and votes information for titles." + + +dag: + airflow_version: 2 + initialize: + dag_id: interfaces + default_args: + owner: "Google" + depends_on_past: False + start_date: "2021-03-01" + max_active_runs: 1 + schedule_interval: "@weekly" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "name_basics_transform_csv" + startup_timeout_seconds: 600 + name: "name_basics" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: >- + {"url": "https://datasets.imdbws.com/name.basics.tsv.gz"} + SOURCE_FILE: >- + {"url_data": "./files/name_basics.tsv.gz"} + TARGET_CSV_FILE: "./files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/imdb/interfaces/name_basics_data_output.csv" + TABLE_NAME: "name_basics" + PIPELINE_NAME: "interfaces" + CSV_HEADERS: >- + ["nconst", "primary_name", "birth_year", "death_year", "primary_profession", "known_for_titles"] + RENAME_MAPPINGS: >- + {"nconst": "nconst", "primaryName": "primary_name", "birthYear": "birth_year", "deathYear": "death_year", + "primaryProfession": "primary_profession", "knownForTitles": "known_for_titles"} + resources: + request_memory: "4G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_name_basics_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/imdb/interfaces/name_basics_data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "imdb.name_basics" + skip_leading_rows: 1 + write_disposition: "WRITE_TRUNCATE" + schema_fields: + - name: "nconst" + type: "string" + description: "Alphanumeric unique identifier of the name/person." + mode: "nullable" + - name: "primary_name" + type: "string" + description: "Name by which the person is most often credited." + mode: "nullable" + - name: "birth_year" + type: "integer" + description: "Birth year in YYYY format." + mode: "nullable" + - name: "death_year" + type: "integer" + description: "Death year in YYYY format if applicable." + mode: "nullable" + - name: "primary_profession" + type: "string" + description: "The top-3 professions of the person." + mode: "nullable" + - name: "known_for_titles" + type: "string" + description: "Titles the person is known for." 
+ mode: "nullable" + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "title_akas_transform_csv" + startup_timeout_seconds: 900 + name: "title_akas" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: >- + {"url": "https://datasets.imdbws.com/title.akas.tsv.gz"} + SOURCE_FILE: >- + {"url_data": "./files/title_akas.tsv.gz"} + CHUNK_SIZE: "300000" + TARGET_CSV_FILE: "./files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/imdb/interfaces/title_akas_data_output.csv" + TABLE_NAME: "title_akas" + PIPELINE_NAME: "interfaces" + CSV_HEADERS: >- + ["title_id", "ordering", "title", "region", "language", "types", "attributes", "is_original_title"] + RENAME_MAPPINGS: >- + {"titleId": "title_id", "ordering": "ordering", "title": "title", "region": "region", + "language": "language", "types": "types", "attributes": "attributes", "isOriginalTitle": "is_original_title"} + resources: + request_memory: "8G" + request_cpu: "3" + request_ephemeral_storage: "10G" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_title_akas_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/imdb/interfaces/title_akas_data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "imdb.title_akas" + skip_leading_rows: 1 + write_disposition: "WRITE_TRUNCATE" + schema_fields: + - name: "title_id" + type: "string" + description: "A tconst, an alphanumeric unique identifier of the title." + mode: "nullable" + - name: "ordering" + type: "integer" + description: "A number to uniquely identify rows for a given title_id." + mode: "nullable" + - name: "title" + type: "string" + description: "The localized title." + mode: "nullable" + - name: "region" + type: "string" + description: "The region for this version of the title." + mode: "nullable" + - name: "language" + type: "string" + description: "The language of the title." + mode: "nullable" + - name: "types" + type: "string" + description: "Enumerated set of attributes for this alternative title. One or more of the following: 'alternative', 'dvd', 'festival', 'tv', 'video', 'working', 'original', 'imdbDisplay'. New values may be added in the future without warning." + mode: "nullable" + - name: "attributes" + type: "string" + description: "Additional terms to describe this alternative title, not enumerated" + mode: "nullable" + - name: "is_original_title" + type: "boolean" + description: "False: not original title; True: original title." 
+ mode: "nullable" + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "title_basics_transform_csv" + startup_timeout_seconds: 600 + name: "title_basics" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: >- + {"url": "https://datasets.imdbws.com/title.basics.tsv.gz"} + SOURCE_FILE: >- + {"url_data": "./files/title_basics.tsv.gz"} + TARGET_CSV_FILE: "./files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/imdb/interfaces/title_basics_data_output.csv" + TABLE_NAME: "title_basics" + PIPELINE_NAME: "interfaces" + CSV_HEADERS: >- + ["tconst", "title_type", "primary_title", "original_title", "is_adult", "start_year", "end_year", "runtime_minutes", "genres"] + RENAME_MAPPINGS: >- + {"tconst": "tconst", "titleType": "title_type", "primaryTitle": "primary_title", "originalTitle": "original_title", + "isAdult": "is_adult", "startYear": "start_year", "endYear": "end_year", "runtimeMinutes": "runtime_minutes", "genres": "genres"} + resources: + request_memory: "4G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_title_basics_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/imdb/interfaces/title_basics_data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "imdb.title_basics" + skip_leading_rows: 1 + write_disposition: "WRITE_TRUNCATE" + schema_fields: + - name: "tconst" + type: "string" + description: "Alphanumeric unique identifier of the title." + mode: "nullable" + - name: "title_type" + type: "string" + description: "The type/format of the title (e.g. movie, short, tvseries, tvepisode, video, etc)." + mode: "nullable" + - name: "primary_title" + type: "string" + description: "The more popular title / the title used by the filmmakers on promotional materials at the point of release." + mode: "nullable" + - name: "original_title" + type: "string" + description: "Original title, in the original language." + mode: "nullable" + - name: "is_adult" + type: "integer" + description: "0: non-adult title; 1: adult title." + mode: "nullable" + - name: "start_year" + type: "integer" + description: "Represents the release year of a title. In the case of TV Series, it is the series start year." + mode: "nullable" + - name: "end_year" + type: "integer" + description: "TV Series end year." + mode: "nullable" + - name: "runtime_minutes" + type: "integer" + description: "Primary runtime of the title, in minutes." + mode: "nullable" + - name: "genres" + type: "string" + description: "Includes up to three genres associated with the title." 
+ mode: "nullable" + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "title_crew_transform_csv" + startup_timeout_seconds: 600 + name: "title_crew" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: >- + {"url": "https://datasets.imdbws.com/title.crew.tsv.gz"} + SOURCE_FILE: >- + {"url_data": "./files/title_crew.tsv.gz"} + TARGET_CSV_FILE: "./files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/imdb/interfaces/title_crew_data_output.csv" + TABLE_NAME: "title_crew" + PIPELINE_NAME: "interfaces" + CSV_HEADERS: >- + ["tconst", "directors", "writers"] + RENAME_MAPPINGS: >- + {"tconst": "tconst", "directors": "directors", "writers": "writers"} + resources: + request_memory: "4G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_title_crew_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/imdb/interfaces/title_crew_data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "imdb.title_crew" + skip_leading_rows: 1 + write_disposition: "WRITE_TRUNCATE" + schema_fields: + - name: "tconst" + type: "string" + description: "Alphanumeric unique identifier of the title." + mode: "nullable" + - name: "directors" + type: "string" + description: "Strinng of nconsts - director(s) of the given title." + mode: "nullable" + - name: "writers" + type: "string" + description: "String of nconsts - writer(s) of the given title." + mode: "nullable" + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "title_episode_transform_csv" + startup_timeout_seconds: 600 + name: "title_episode" + namespace: "composer" + service_account_name: "datasets" + image_pull_policy: "Always" + image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: >- + {"url": "https://datasets.imdbws.com/title.episode.tsv.gz"} + SOURCE_FILE: >- + {"url_data": "./files/title_episode.tsv.gz"} + TARGET_CSV_FILE: "./files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/imdb/interfaces/title_episode_data_output.csv" + TABLE_NAME: "title_episode" + PIPELINE_NAME: "interfaces" + CSV_HEADERS: >- + ["tconst", "parent_tconst", "season_number", "episode_number"] + RENAME_MAPPINGS: >- + {"tconst": "tconst", "parentTconst": "parent_tconst", "seasonNumber": "season_number", "episodeNumber": "episode_number"} + resources: + request_memory: "4G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_title_episode_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/imdb/interfaces/title_episode_data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "imdb.title_episode" + skip_leading_rows: 1 + write_disposition: "WRITE_TRUNCATE" + schema_fields: + - name: "tconst" + type: "string" + description: "Alphanumeric identifier of episode." + mode: "nullable" + - name: "parent_tconst" + type: "string" + description: "Alphanumeric identifier of the parent TV Series." + mode: "nullable" + - name: "season_number" + type: "integer" + description: "Season number the episode belongs to." 
+
+    - operator: "KubernetesPodOperator"
+      description: "Run CSV transform within kubernetes pod"
+      args:
+        task_id: "title_principals_transform_csv"
+        startup_timeout_seconds: 900
+        name: "title_principals"
+        namespace: "composer"
+        service_account_name: "datasets"
+        image_pull_policy: "Always"
+        image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}"
+        env_vars:
+          SOURCE_URL: >-
+            {"url": "https://datasets.imdbws.com/title.principals.tsv.gz"}
+          SOURCE_FILE: >-
+            {"url_data": "./files/title_principals.tsv.gz"}
+          CHUNK_SIZE: "300000"
+          TARGET_CSV_FILE: "./files/data_output.csv"
+          TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
+          TARGET_GCS_PATH: "data/imdb/interfaces/title_principals_data_output.csv"
+          TABLE_NAME: "title_principals"
+          PIPELINE_NAME: "interfaces"
+          CSV_HEADERS: >-
+            ["tconst", "ordering", "nconst", "category", "job", "characters"]
+          RENAME_MAPPINGS: >-
+            {"tconst": "tconst", "ordering": "ordering", "nconst": "nconst", "category": "category",
+            "job": "job", "characters": "characters"}
+        resources:
+          request_memory: "8G"
+          request_cpu: "3"
+          request_ephemeral_storage: "10G"
+
+    - operator: "GoogleCloudStorageToBigQueryOperator"
+      description: "Task to load CSV data to a BigQuery table"
+      args:
+        task_id: "load_title_principals_to_bq"
+        bucket: "{{ var.value.composer_bucket }}"
+        source_objects: ["data/imdb/interfaces/title_principals_data_output.csv"]
+        source_format: "CSV"
+        destination_project_dataset_table: "imdb.title_principals"
+        skip_leading_rows: 1
+        write_disposition: "WRITE_TRUNCATE"
+        schema_fields:
+          - name: "tconst"
+            type: "string"
+            description: "Alphanumeric unique identifier of the title."
+            mode: "nullable"
+          - name: "ordering"
+            type: "integer"
+            description: "A number to uniquely identify rows for a given title_id."
+            mode: "nullable"
+          - name: "nconst"
+            type: "string"
+            description: "Alphanumeric unique identifier of the name/person."
+            mode: "nullable"
+          - name: "category"
+            type: "string"
+            description: "The category of job that the person was in."
+            mode: "nullable"
+          - name: "job"
+            type: "string"
+            description: "The specific job title, if applicable."
+            mode: "nullable"
+          - name: "characters"
+            type: "string"
+            description: "The name of the character played, if applicable."
+            mode: "nullable"
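title.principals.tsv.gz is by far the largest of the interface files, which is why this task alone sets CHUNK_SIZE and requests 8G of memory, 3 CPUs, and ephemeral storage. A chunked variant of the transform (again an illustrative sketch, not the container's actual code) keeps memory bounded regardless of file size:

import os

import pandas as pd


def transform_in_chunks(source_file: str, target_csv: str,
                        rename_mappings: dict, csv_headers: list) -> None:
    # CHUNK_SIZE ("300000" above) bounds how many rows sit in memory at once.
    chunk_size = int(os.environ.get("CHUNK_SIZE", "300000"))
    first = True
    for chunk in pd.read_csv(source_file, sep="\t", na_values=r"\N",
                             compression="gzip", chunksize=chunk_size):
        chunk = chunk.rename(columns=rename_mappings)
        # Write the header once, then append the remaining chunks.
        chunk[csv_headers].to_csv(target_csv, mode="w" if first else "a",
                                  header=first, index=False)
        first = False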
+
+    - operator: "KubernetesPodOperator"
+      description: "Run CSV transform within kubernetes pod"
+      args:
+        task_id: "title_ratings_transform_csv"
+        startup_timeout_seconds: 600
+        name: "title_ratings"
+        namespace: "composer"
+        service_account_name: "datasets"
+        image_pull_policy: "Always"
+        image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}"
+        env_vars:
+          SOURCE_URL: >-
+            {"url": "https://datasets.imdbws.com/title.ratings.tsv.gz"}
+          SOURCE_FILE: >-
+            {"url_data": "./files/title_ratings.tsv.gz"}
+          TARGET_CSV_FILE: "./files/data_output.csv"
+          TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
+          TARGET_GCS_PATH: "data/imdb/interfaces/title_ratings_data_output.csv"
+          TABLE_NAME: "title_ratings"
+          PIPELINE_NAME: "interfaces"
+          CSV_HEADERS: >-
+            ["tconst", "average_rating", "num_votes"]
+          RENAME_MAPPINGS: >-
+            {"tconst": "tconst", "averageRating": "average_rating", "numVotes": "num_votes"}
+        resources:
+          request_memory: "4G"
+          request_cpu: "1"
+
+    - operator: "GoogleCloudStorageToBigQueryOperator"
+      description: "Task to load CSV data to a BigQuery table"
+      args:
+        task_id: "load_title_ratings_to_bq"
+        bucket: "{{ var.value.composer_bucket }}"
+        source_objects: ["data/imdb/interfaces/title_ratings_data_output.csv"]
+        source_format: "CSV"
+        destination_project_dataset_table: "imdb.title_ratings"
+        skip_leading_rows: 1
+        write_disposition: "WRITE_TRUNCATE"
+        schema_fields:
+          - name: "tconst"
+            type: "string"
+            description: "Alphanumeric unique identifier of the title."
+            mode: "nullable"
+          - name: "average_rating"
+            type: "float"
+            description: "Weighted average of all the individual user ratings."
+            mode: "nullable"
+          - name: "num_votes"
+            type: "integer"
+            description: "Number of votes the title has received."
+            mode: "nullable"
+
+  graph_paths:
+    - "name_basics_transform_csv >> load_name_basics_to_bq"
+    - "title_akas_transform_csv >> load_title_akas_to_bq"
+    - "title_basics_transform_csv >> load_title_basics_to_bq"
+    - "title_crew_transform_csv >> load_title_crew_to_bq"
+    - "title_episode_transform_csv >> load_title_episode_to_bq"
+    - "title_principals_transform_csv >> load_title_principals_to_bq"
+    - "title_ratings_transform_csv >> load_title_ratings_to_bq"
diff --git a/datasets/imdb/pipelines/reviews/pipeline.yaml b/datasets/imdb/pipelines/reviews/pipeline.yaml
index 77f15fcf8..7d506dfcb 100644
--- a/datasets/imdb/pipelines/reviews/pipeline.yaml
+++ b/datasets/imdb/pipelines/reviews/pipeline.yaml
@@ -16,7 +16,41 @@ resources:
 
   - type: bigquery_table
     table_id: reviews
-    description: "Reviews table"
+    description: |
+      Large Movie Review Dataset v1.0
+
+      Overview
+
+      This dataset contains movie reviews along with their associated binary
+      sentiment polarity labels. It is intended to serve as a benchmark for
+      sentiment classification. This document outlines how the dataset was
+      gathered, and how to use the files provided.
+
+      Dataset
+
+      The core dataset contains 50,000 reviews split evenly into 25k train
+      and 25k test sets. The overall distribution of labels is balanced (25k
+      pos and 25k neg). We also include an additional 50,000 unlabeled
+      documents for unsupervised learning.
+
+      In the entire collection, no more than 30 reviews are allowed for any
+      given movie because reviews for the same movie tend to have correlated
+      ratings. Further, the train and test sets contain a disjoint set of
+      movies, so no significant performance is obtained by memorizing
+      movie-unique terms and their association with observed labels. In the
+      labeled train/test sets, a negative review has a score <= 4 out of 10,
+      and a positive review has a score >= 7 out of 10. Thus reviews with
+      more neutral ratings are not included in the train/test sets. In the
+      unsupervised set, reviews of any rating are included and there are an
+      even number of reviews > 5 and <= 5.
+
+      Columns
+        split - has test (25K) / train (75K) records.
+        label - Negative (25K) --> test (12.5K) and train (12.5K)
+                Positive (25K) --> test (12.5K) and train (12.5K)
+                Unsupervised (50K) --> train (50K)
+
+      For the Unsupervised label, reviewer_rating is NaN.
 
 dag:
   airflow_version: 2
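The rating-to-label rule stated in the description above (negative means a score <= 4, positive means >= 7, and scores of 5-6 are excluded from the labeled splits) is worth pinning down, since the reviewer_rating column added below preserves the raw score. A one-function restatement of the rule, as a sketch rather than pipeline code:

from typing import Optional


def label_for_rating(reviewer_rating: int) -> Optional[str]:
    """Map a 1-10 IMDb reviewer rating to the dataset's polarity label."""
    if reviewer_rating <= 4:
        return "Negative"
    if reviewer_rating >= 7:
        return "Positive"
    return None  # 5-6: excluded from the labeled train/test splits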
@@ -42,17 +76,23 @@
         image_pull_policy="Always",
         image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}"
         env_vars:
-          SOURCE_URL: "https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"
-          SOURCE_FILE: "./files/data.tar.gz"
+          SOURCE_GCS_BUCKET: "{{ var.value.composer_bucket }}"
+          SOURCE_GCS_OBJECT: "data/imdb/reviews/aclImdb_v1.tar.gz"
+          SOURCE_URL: >-
+            {"title_link": "https://datasets.imdbws.com/title.basics.tsv.gz"}
+          SOURCE_FILE: >-
+            {"user_review_data": "./files/aclImdb_v1.tar.gz",
+            "title_data": "./files/title_basics.tsv.gz"}
           EXTRACT_HERE: "./files"
-          TARGET_FILE: "./files/data_output.csv"
+          TARGET_CSV_FILE: "./files/data_output.csv"
           TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
           TARGET_GCS_PATH: "data/imdb/reviews/data_output.csv"
           PIPELINE_NAME: "reviews"
           CSV_HEADERS: >-
-            ["review", "label"]
+            ["review", "split", "label", "movie_id", "reviewer_rating", "movie_url", "title"]
           RENAME_MAPPINGS: >-
-            {"review": "review", "label": "label"}
+            {"review": "review", "split": "split", "label": "label", "movie_id": "movie_id",
+            "reviewer_rating": "reviewer_rating", "movie_url": "movie_url", "title": "title"}
         resources:
           request_memory: "4G"
           request_cpu: "1"
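The env_vars change above replaces the direct download from ai.stanford.edu with a copy staged in the Composer bucket (SOURCE_GCS_BUCKET / SOURCE_GCS_OBJECT), while title.basics.tsv.gz is still fetched over HTTP so that movie titles can be joined in. A staging step consistent with the new variables might look like the sketch below; the container code is not in this diff, and it assumes the google-cloud-storage client:

import json
import os
import tarfile

from google.cloud import storage


def stage_review_archive() -> None:
    # Pull aclImdb_v1.tar.gz from the Composer bucket instead of
    # downloading it from ai.stanford.edu on every run.
    bucket_name = os.environ["SOURCE_GCS_BUCKET"]
    object_name = os.environ["SOURCE_GCS_OBJECT"]
    archive_path = json.loads(os.environ["SOURCE_FILE"])["user_review_data"]
    extract_dir = os.environ["EXTRACT_HERE"]

    os.makedirs(os.path.dirname(archive_path), exist_ok=True)
    client = storage.Client()
    client.bucket(bucket_name).blob(object_name).download_to_filename(archive_path)
    with tarfile.open(archive_path, "r:gz") as tar:
        tar.extractall(path=extract_dir)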
@@ -71,11 +111,31 @@
         schema_fields:
           - name: "review"
             type: "STRING"
-            description: "User review's in IMDb"
+            description: "User reviews in IMDb."
+            mode: "NULLABLE"
+          - name: "split"
+            type: "STRING"
+            description: "It has two categories: test and train."
             mode: "NULLABLE"
           - name: "label"
             type: "STRING"
-            description: "Type of the review"
+            description: "It has three categories: Negative, Positive, and Unsupervised. All Unsupervised labels have split equal to train."
+            mode: "NULLABLE"
+          - name: "movie_id"
+            type: "STRING"
+            description: "Unique ID for the movie in IMDb."
+            mode: "NULLABLE"
+          - name: "reviewer_rating"
+            type: "INTEGER"
+            description: "Reviewer rating for a particular movie in IMDb. For train-unsupervised rows, reviewer_rating is NULL."
+            mode: "NULLABLE"
+          - name: "movie_url"
+            type: "STRING"
+            description: "Movie URL for the corresponding movie_id."
+            mode: "NULLABLE"
+          - name: "title"
+            type: "STRING"
+            description: "Title of the movie for the corresponding movie_id."
             mode: "NULLABLE"
 
     graph_paths:
diff --git a/datasets/imdb/pipelines/reviews/reviews_dag.py b/datasets/imdb/pipelines/reviews/reviews_dag.py
index 5f9d90701..9f53bced0 100644
--- a/datasets/imdb/pipelines/reviews/reviews_dag.py
+++ b/datasets/imdb/pipelines/reviews/reviews_dag.py
@@ -43,15 +43,17 @@
         image_pull_policy="Always",
         image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}",
         env_vars={
-            "SOURCE_URL": "https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz",
-            "SOURCE_FILE": "./files/data.tar.gz",
+            "SOURCE_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "SOURCE_GCS_OBJECT": "data/imdb/reviews/aclImdb_v1.tar.gz",
+            "SOURCE_URL": '{"title_link": "https://datasets.imdbws.com/title.basics.tsv.gz"}',
+            "SOURCE_FILE": '{"user_review_data": "./files/aclImdb_v1.tar.gz", "title_data": "./files/title_basics.tsv.gz"}',
             "EXTRACT_HERE": "./files",
-            "TARGET_FILE": "./files/data_output.csv",
+            "TARGET_CSV_FILE": "./files/data_output.csv",
             "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
             "TARGET_GCS_PATH": "data/imdb/reviews/data_output.csv",
             "PIPELINE_NAME": "reviews",
-            "CSV_HEADERS": '["review", "label"]',
-            "RENAME_MAPPINGS": '{"review": "review", "label": "label"}',
+            "CSV_HEADERS": '["review", "split", "label", "movie_id", "reviewer_rating", "movie_url", "title"]',
+            "RENAME_MAPPINGS": '{"review": "review", "split": "split", "label": "label", "movie_id": "movie_id", "reviewer_rating": "reviewer_rating", "movie_url": "movie_url", "title": "title"}',
         },
         resources={"request_memory": "4G", "request_cpu": "1"},
     )
@@ -70,13 +72,43 @@
             {
                 "name": "review",
                 "type": "STRING",
-                "description": "User review's in IMDb",
+                "description": "User reviews in IMDb.",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "split",
+                "type": "STRING",
+                "description": "It has two categories: test and train.",
                 "mode": "NULLABLE",
             },
             {
                 "name": "label",
                 "type": "STRING",
-                "description": "Type of the review",
+                "description": "It has three categories: Negative, Positive, and Unsupervised. All Unsupervised labels have split equal to train.",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "movie_id",
+                "type": "STRING",
+                "description": "Unique ID for the movie in IMDb.",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "reviewer_rating",
+                "type": "INTEGER",
+                "description": "Reviewer rating for a particular movie in IMDb. For train-unsupervised rows, reviewer_rating is NULL.",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "movie_url",
+                "type": "STRING",
+                "description": "Movie URL for the corresponding movie_id.",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "title",
+                "type": "STRING",
+                "description": "Title of the movie for the corresponding movie_id.",
                 "mode": "NULLABLE",
             },
         ],
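After a backfill, the split/label breakdown documented in the table description can be verified directly in BigQuery. A quick sanity check with the google-cloud-bigquery client; the unqualified imdb.reviews table name assumes the client's default project owns the dataset:

from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT split, label, COUNT(*) AS n
    FROM `imdb.reviews`
    GROUP BY split, label
    ORDER BY split, label
"""
for row in client.query(query).result():
    print(row.split, row.label, row.n)

# Expected per the description: Negative and Positive each contribute
# 12.5K test and 12.5K train rows; Unsupervised contributes 50K train rows.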