# **Dataset Preprocessing**

#### 1. Automated Unzipping of Files

In [None]:
import os
import zipfile

In [None]:
# Root folder that holds the .zip archives; they are extracted in place here.
dataset_dir = "/path/to/youtube_dataset"

In [None]:
# Bare names (not full paths) of the .zip archives directly inside dataset_dir.
zip_files = list(filter(lambda name: name.endswith('.zip'), os.listdir(dataset_dir)))

In [None]:
# Extract every archive into dataset_dir itself, next to the archives.
for archive_name in zip_files:
    archive_path = os.path.join(dataset_dir, archive_name)
    with zipfile.ZipFile(archive_path, 'r') as archive:
        archive.extractall(dataset_dir)

In [None]:
def _is_fully_extracted(zip_path, target_dir):
    """Return True if every entry of the archive at *zip_path* exists under *target_dir*.

    Directory entries must exist as directories; file entries must exist as
    regular files whose size equals the uncompressed size stored in the archive.
    Entries whose names extractall() would have sanitized (absolute paths, "..")
    will not be found, so such archives are conservatively reported as not
    extracted.
    """
    with zipfile.ZipFile(zip_path, 'r') as archive:
        for info in archive.infolist():
            target = os.path.join(target_dir, info.filename)
            if info.is_dir():
                if not os.path.isdir(target):
                    return False
            elif not (os.path.isfile(target) and os.path.getsize(target) == info.file_size):
                return False
    return True


# Delete the archives only when every one of them is verifiably on disk.
# Counting sub-directories against archives is unreliable: an archive may hold
# several (or no) top-level folders, and unrelated folders may already exist
# in dataset_dir.
if all(_is_fully_extracted(os.path.join(dataset_dir, name), dataset_dir) for name in zip_files):
    for zip_file in zip_files:
        os.remove(os.path.join(dataset_dir, zip_file))
    print("All zip files extracted and deleted successfully.")
else:
    print("Extraction verification failed. Zip files not deleted.")

#### 2. Collecting Relevant Files, Renaming Them, and Storing Them in a New Folder

In [None]:
import os
import shutil

In [None]:
# Dataset root (same value as in section 1) and the staging folder that will
# receive the renamed .txt copies; the staging folder is created if missing.
dataset_dir = "/path/to/youtube_dataset"
temp_dir = "/path/to/youtube_dataset_temp"
os.makedirs(name=temp_dir, exist_ok=True)

In [None]:
# Copy every .txt file (except log.txt) from each sub-folder of dataset_dir
# into temp_dir, prefixing it with its folder name so that equally named files
# from different folders do not overwrite each other.
for entry in os.listdir(dataset_dir):
    source_folder = os.path.join(dataset_dir, entry)
    if not os.path.isdir(source_folder):
        continue  # only directories are scanned

    for txt_name in os.listdir(source_folder):
        if not txt_name.endswith('.txt') or txt_name == "log.txt":
            continue  # non-.txt files and log.txt are deliberately skipped

        renamed = f"{entry}_{txt_name}"
        shutil.copy(os.path.join(source_folder, txt_name), os.path.join(temp_dir, renamed))
        print(f"Successfully stored {renamed} from folder: {entry}")

print("All files processed and stored in youtube_dataset_temp.")

# **Dataset Cleaning and Transformation**

#### 1. Automated Script That Produces formatted_data.csv

In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import split, col, array, expr
from pyspark.sql import functions as F
import glob

In [None]:
# Create (or reuse) the Spark session used to merge, validate and store the data.
spark = (
    SparkSession.builder
    .appName("Merge, Validate, and Store YouTube Dataset")
    .getOrCreate()
)

In [None]:
# One nullable string column named "line", so each input record is kept as a
# single raw string and split into fields later.
line_field = StructField("line", StringType(), True)
schema = StructType([line_field])

In [None]:
# Load and merge all txt files from youtube_dataset_temp.
# This must be the staging folder filled in preprocessing step 2
# (the path previously read "/oath/...", so no files were found).
temp_dir = "/path/to/youtube_dataset_temp"
# glob's result order depends on the file system; sort for a reproducible read order.
txt_files = sorted(glob.glob(os.path.join(temp_dir, "*.txt")))
combined_df = None

In [None]:
# Read every .txt file in one pass instead of unioning one DataFrame per file.
# spark.read.text keeps each physical line whole in a single column. The CSV
# reader (default delimiter ",") with a one-column schema would keep only the
# text before the first comma and silently drop the rest of the line.
# Blank lines are filtered out (the CSV reader skips them as well).
if not txt_files:
    raise FileNotFoundError(f"No .txt files found in {temp_dir}")

combined_df = (
    spark.read.text([f"file://{os.path.abspath(path)}" for path in txt_files])
    .withColumnRenamed("value", "line")  # downstream cells expect a "line" column
    .where("trim(line) != ''")
)

In [None]:
# Sanity check: show the first five raw lines, untruncated, before parsing.
print("Displaying raw data content:")
combined_df.show(n=5, truncate=False)

In [None]:
# Fields are tab-separated; split() interprets the pattern as a regex.
split_cols = split(col("line"), pattern="\t")

In [None]:
# Positional layout of the leading fields of each tab-separated line:
# (output column, target type, or None to keep the raw string).
leading_fields = [
    ("video_id", None),
    ("uploader", None),
    ("age", IntegerType()),
    ("category", None),
    ("length", IntegerType()),
    ("views", IntegerType()),
    ("rate", DoubleType()),
    ("ratings", IntegerType()),
    ("comments", IntegerType()),
]
for position, (column_name, target_type) in enumerate(leading_fields):
    raw_value = split_cols.getItem(position)
    combined_df = combined_df.withColumn(
        column_name, raw_value if target_type is None else raw_value.cast(target_type)
    )

# Positions 9..28 (at most 20 values) form the related-video array.
# Presumably positions past the end of a shorter line come back as null
# (non-ANSI Spark behaviour) — verify if ANSI mode is enabled.
combined_df = combined_df.withColumn(
    "related_ids", array([split_cols.getItem(i) for i in range(9, 29)])
)

In [None]:
# Keep every parsed column and discard the raw input line.
combined_df = combined_df.select([name for name in combined_df.columns if name != "line"])

In [None]:
# Flatten the related-video array into one comma-separated string; null entries are skipped.
combined_df = combined_df.withColumn("related_ids", F.concat_ws(",", "related_ids"))

In [None]:
# Map the internal snake_case names to the human-readable headers used in the output files.
display_names = {
    "video_id": "Video ID",
    "uploader": "Uploader",
    "age": "Age",
    "category": "Category",
    "length": "Length",
    "views": "Views",
    "rate": "Rating",
    "ratings": "Ratings Count",
    "comments": "Comments Count",
    "related_ids": "Related Videos",
}
for internal_name, header in display_names.items():
    combined_df = combined_df.withColumnRenamed(internal_name, header)

In [None]:
# Perform validation checks
def validate_data(df):
    """Check that *df* has exactly the expected columns with the expected types.

    A message describing the first problem found (or success) is printed.

    Args:
        df: Spark DataFrame produced by the parsing/renaming cells above.

    Returns:
        True if the column count, the column names and each column's simple
        type string all match the expected schema; otherwise False.
    """
    expected_types = {
        "Video ID": "string",
        "Uploader": "string",
        "Age": "int",
        "Category": "string",
        "Length": "int",
        "Views": "int",
        "Rating": "double",
        "Ratings Count": "int",
        "Comments Count": "int",
        "Related Videos": "string"
    }

    # Check if column count matches expected schema
    if len(df.columns) != len(expected_types):
        print("Validation failed: Incorrect number of columns.")
        return False

    # Index the actual schema by name, so a missing or renamed column is
    # reported as a validation failure instead of raising KeyError.
    actual_types = {field.name: field.dataType.simpleString() for field in df.schema.fields}

    for col_name, expected_type in expected_types.items():
        actual_type = actual_types.get(col_name)
        if actual_type is None:
            print(f"Validation failed: Column '{col_name}' is missing.")
            return False
        if actual_type != expected_type:
            print(f"Validation failed: Column '{col_name}' expected type '{expected_type}', found '{actual_type}'.")
            return False

    # Validation passed
    print("Validation successful: Dataset format is correct.")
    return True

In [None]:
if validate_data(combined_df):
    # Display first 5 records
    print("Displaying the first 5 records:")
    combined_df.show(5, truncate=False)

    # "Related Videos" is already a comma-separated string (flattened by the
    # concat_ws step and checked as "string" by validate_data), so no further
    # conversion is needed before writing.

    # Prompt user for confirmation
    confirm = input("Do you want to overwrite the files in HDFS? (y/n): ")

    if confirm.strip().lower() == 'y':
        # The same DataFrame is written twice; cache it so the source files are
        # read and parsed only once.
        combined_df.cache()
        try:
            # Save as a tab-delimited text file on HDFS
            combined_df.coalesce(1).write.mode("overwrite").option("delimiter", "\t").csv(
                "hdfs://localhost:9000/path/to/merged_data_txt", header=True
            )

            # Save as a standard CSV file on HDFS
            combined_df.coalesce(1).write.mode("overwrite").csv(
                "hdfs://localhost:9000/path/to/merged_data_csv", header=True
            )
        finally:
            combined_df.unpersist()

        print("Files successfully saved to HDFS.")
    else:
        print("Operation canceled. Files were not saved.")
else:
    print("Dataset validation failed. Files will not be saved.")

In [None]:
# Shut down the Spark session used by this section.
spark.stop()

In [None]:
import pandas as pd
from io import StringIO
import subprocess

In [None]:
# Spark writes the CSV as a directory of part files (plus a _SUCCESS marker),
# and `hdfs dfs -cat` on the directory itself fails, so read the part file(s).
# The glob is single-quoted so the local shell passes it through for HDFS to expand.
# NOTE(review): the merge step wrote to hdfs://localhost:9000/path/to/merged_data_csv —
# confirm this path points at the same location.
hdfs_path = "/hdfs/path/to/merged_data_csv/"
cmd = f"hdfs dfs -cat '{hdfs_path.rstrip('/')}/part-*.csv'"

In [None]:
# Run the HDFS command and capture its output as text. cmd is a fixed string
# built in this notebook (not external input), so shell=True is acceptable here.
# If the command fails, stdout is empty and pandas would later raise a confusing
# "No columns to parse from file" error, so fail here with HDFS's own message.
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
    raise RuntimeError(
        f"HDFS command failed (exit code {result.returncode}): {cmd}\n{result.stderr}"
    )

In [None]:
# Parse the captured CSV text (first row is the header) into a pandas DataFrame.
df = pd.read_csv(StringIO(result.stdout))

In [None]:
# Turn each comma-separated "Related Videos" string back into a Python list;
# missing or empty values become an empty list.
df['Related Videos'] = df['Related Videos'].fillna("").map(lambda ids: ids.split(',') if ids else [])

In [None]:
# Cast all numeric columns to float in one vectorized call. This also avoids a
# loop variable named `col`, which would shadow pyspark's `col` imported earlier
# and break a re-run of the Spark cells.
numeric_columns = ["Age", "Length", "Views", "Rating", "Ratings Count", "Comments Count"]
df[numeric_columns] = df[numeric_columns].astype(float)

In [None]:
# Preview the first five formatted rows.
print(df.head(5))

In [None]:
# Save the formatted data locally without the pandas index; this is the
# formatted_data.csv that the next section uploads to HDFS.
# (The path previously read "/loacl/...".)
df.to_csv("/local/path/to/formatted_data.csv", index=False)

#### 2. Script for formatting and transforming formatted_data.csv
##### Upload to HDFS using the following command: `hdfs dfs -copyFromLocal /path/to/formatted_data.csv /hdfs_directory/path`

In [None]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, explode, split, trim

In [None]:
# Local CSV loaded with pandas for a quick structural inspection.
# NOTE(review): this section's heading refers to formatted_data.csv — confirm
# that youtube.csv is the intended file name.
file_path = '/local/path/to/youtube.csv'

In [None]:
# Load the local CSV into pandas for inspection.
df = pd.read_csv(filepath_or_buffer=file_path)

In [None]:
# Print column names, non-null counts and dtypes of the loaded CSV.
df.info()

In [None]:
# Create (or reuse) the Spark session for the cleaning/flattening step.
spark = (
    SparkSession.builder
    .appName("YouTube Data Cleaning")
    .getOrCreate()
)

In [None]:
# HDFS location of formatted_data.csv after the copyFromLocal upload.
file_path = '/hdfs/path/formatted_data.csv'

In [None]:
# Read the uploaded CSV; the first row supplies the column names and every
# column is read as a string (no schema inference).
df = spark.read.csv(file_path, header=True)

In [None]:
# Strip the Python list syntax that pandas' to_csv wrote for the list column
# ("['a', 'b']" -> "a, b") by removing square brackets and single quotes.
df_cleaned = df.withColumn("Related Videos", regexp_replace(df["Related Videos"], r"[\[\]']", ""))

In [None]:
# Emit one row per related video. The cleaned text looks like "a, b, c", so
# split on each comma together with its surrounding whitespace. Trimming only
# the whole string would leave a leading space on every ID after the first.
# NOTE: explode() produces no row for a null value; an empty list reaches
# this point as "" and still yields a single row with an empty ID.
df_final = df_cleaned.withColumn(
    "Related Videos",
    explode(split(trim(df_cleaned["Related Videos"]), r"\s*,\s*")),
)

In [None]:
# Fix the output column order explicitly; it must match the Hive table definition.
output_columns = [
    "Video ID", "Uploader", "Age", "Category", "Length",
    "Views", "Rating", "Ratings Count", "Comments Count", "Related Videos",
]
df_final = df_final.select(*output_columns)

In [None]:
# Preview ten flattened rows without truncating long values.
df_final.show(n=10, truncate=False)

In [None]:
# Write the flattened, column-ordered rows (df_final) as headerless,
# tab-separated text into the folder the Hive external table reads from
# (LOCATION '/hdfs/path/cleaned_youtube_data/'). No DataFrame named
# df_no_header exists; header=False already omits the header row.
df_final.write.csv(
    "hdfs://localhost:9000/hdfs/path/cleaned_youtube_data",
    header=False,
    mode="overwrite",
    sep="\t",
)

In [None]:
# Shut down the Spark session used by the cleaning step.
spark.stop()

#### 3. Using Hive to Merge the Flattened Dataset

##### Run the following on Hive

-- External table over the headerless, tab-separated files written by the
-- Spark cleaning step (one row per video / related-video pair).
-- Numeric columns are DOUBLE, not FLOAT: FLOAT is single precision and cannot
-- represent integers above 16,777,216 exactly (e.g. large view counts).
-- They are not INT because the values are written with a decimal part (e.g. "123.0").
CREATE EXTERNAL TABLE youtube_data (
    video_id STRING,
    uploader STRING,
    age DOUBLE,
    category STRING,
    length DOUBLE,
    views DOUBLE,
    rating DOUBLE,
    ratings_count DOUBLE,
    comments_count DOUBLE,
    related_videos STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/hdfs/path/cleaned_youtube_data/';

In [None]:
-- Quick sanity check: preview five rows of the external table.
SELECT *
FROM youtube_data
LIMIT 5;

In [None]:
-- Managed table holding one row per video with its related-video IDs
-- re-joined into a comma-separated string.
-- Numeric columns are DOUBLE: FLOAT (single precision) loses exactness for
-- integers above 16,777,216, such as large view counts.
CREATE TABLE merged_youtube_data (
    video_id STRING,
    uploader STRING,
    age DOUBLE,
    category STRING,
    length DOUBLE,
    views DOUBLE,
    rating DOUBLE,
    ratings_count DOUBLE,
    comments_count DOUBLE,
    related_videos STRING
);

In [None]:
-- Re-assemble one row per video by collecting its exploded related-video IDs
-- back into a comma-separated string.
-- OVERWRITE (instead of INTO) makes this step safe to re-run: appending again
-- would duplicate every video in merged_youtube_data.
-- NOTE: COLLECT_LIST does not guarantee the original order of the IDs.
INSERT OVERWRITE TABLE merged_youtube_data
SELECT 
    video_id, 
    uploader, 
    age, 
    category, 
    length, 
    views, 
    rating, 
    ratings_count, 
    comments_count, 
    CONCAT_WS(',', COLLECT_LIST(related_videos)) AS related_videos
FROM 
    youtube_data
GROUP BY 
    video_id, 
    uploader, 
    age, 
    category, 
    length, 
    views, 
    rating, 
    ratings_count, 
    comments_count;

In [None]:
-- Export the merged table to HDFS as tab-separated, newline-terminated text.
-- Related-video IDs stay comma-separated inside the last field.
-- If merged_youtube_data already holds one row per group, this regrouping
-- leaves each row's related_videos value unchanged.
INSERT OVERWRITE DIRECTORY '/hdfs/path/Hive/merged_youtube_data'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
SELECT
    m.video_id,
    m.uploader,
    m.age,
    m.category,
    m.length,
    m.views,
    m.rating,
    m.ratings_count,
    m.comments_count,
    CONCAT_WS(',', COLLECT_LIST(m.related_videos)) AS related_videos
FROM merged_youtube_data m
GROUP BY
    m.video_id,
    m.uploader,
    m.age,
    m.category,
    m.length,
    m.views,
    m.rating,
    m.ratings_count,
    m.comments_count;