In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, udf, struct
from pyspark.sql.types import BooleanType

# Initialize Spark Session
spark = SparkSession.builder.appName("YouTubeDataCleaning").getOrCreate()

# Directory containing the CSV files
data_dir = "./data"

# Get a list of all CSV files in the directory
csv_files = [f for f in os.listdir(data_dir) if f.endswith(".csv")]

# Function to check if a string is mostly English
def is_mostly_english(text):
    if not text:  # Handle empty strings
        return False
    try:
        text.encode('utf-8').decode('ascii')
        return True
    except UnicodeDecodeError:
        return False

# Register the UDF
is_mostly_english_udf = udf(is_mostly_english, BooleanType())

# Function to check for rows with mostly empty fields
def is_mostly_empty(row):
    non_empty_count = sum(1 for value in row if value)
    return non_empty_count < 3  # Adjust threshold as needed

# Initialize an empty DataFrame for `all_data`
all_data = None

# Loop through each CSV file
for csv_file in csv_files:
    file_path = os.path.join(data_dir, csv_file)

    # Read the CSV file into a DataFrame
    try:
        df = spark.read.csv(file_path, header=True, inferSchema=True)
    except Exception as e:
        print(f"Error reading file {csv_file}: {e}")
        continue  # Skip to the next file if there's an error

    # Remove non-English rows (Apply the UDF to relevant columns)
    if "tags" in df.columns:
        df = df.filter(is_mostly_english_udf(col("tags")))

    # Remove mostly empty rows
    row_values = [df[field] for field in df.columns]
    df = df.filter(~udf(is_mostly_empty, BooleanType())(struct(*row_values)))

    # Explode the tags column
    if "tags" in df.columns:
        df = df.withColumn("tag", explode(split(col("tags"), r"\|")))

    # Drop unnecessary columns
    columns_to_drop = ['video_id', 'thumbnail_link', 'description', 'title', 'channel_title', 'tags']
    df = df.drop(*[col_name for col_name in columns_to_drop if col_name in df.columns])

    # Merge into `all_data`
    if all_data is None:
        all_data = df
    else:
        all_data = all_data.union(df)  # Use DataFrame `union()` instead of SQL `UNION`

# If no data was processed, print a message and exit
if all_data is None:
    print("No CSV files were processed.")
else:
    all_data.show()
    # Save the final DataFrame to a single CSV file
    all_data.coalesce(1).write.mode("overwrite").option("header", "true").csv("cleaned_youtube_data.csv")
    print("Data saved to cleaned_youtube_data.csv")

# Stop the Spark session
spark.stop()
    

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
                                                                                

AnalysisException: [INCOMPATIBLE_COLUMN_TYPE] UNION can only be performed on tables with compatible column types. The 8th column of the second table is "STRING" type which is not compatible with "BOOLEAN" at the same column of the first table..;
'Union false, false
:- Project [trending_date#18, category_id#21, publish_time#22, views#24, likes#25, dislikes#26, cast(comment_count#27 as string) AS comment_count#164, comments_disabled#29, ratings_disabled#30, video_error_or_removed#31, tag#53]
:  +- Project [trending_date#18, category_id#21, publish_time#22, views#24, likes#25, dislikes#26, comment_count#27, comments_disabled#29, ratings_disabled#30, video_error_or_removed#31, tag#53]
:     +- Project [video_id#17, trending_date#18, title#19, channel_title#20, category_id#21, publish_time#22, tags#23, views#24, likes#25, dislikes#26, comment_count#27, thumbnail_link#28, comments_disabled#29, ratings_disabled#30, video_error_or_removed#31, description#32, tag#53]
:        +- Generate explode(split(tags#23, \|, -1)), false, [tag#53]
:           +- Filter NOT is_mostly_empty(struct(video_id, video_id#17, trending_date, trending_date#18, title, title#19, channel_title, channel_title#20, category_id, category_id#21, publish_time, publish_time#22, tags, tags#23, views, views#24, likes, likes#25, dislikes, dislikes#26, comment_count, comment_count#27, thumbnail_link, thumbnail_link#28, ... 8 more fields))#51
:              +- Filter is_mostly_english(tags#23)#49
:                 +- Relation [video_id#17,trending_date#18,title#19,channel_title#20,category_id#21,publish_time#22,tags#23,views#24,likes#25,dislikes#26,comment_count#27,thumbnail_link#28,comments_disabled#29,ratings_disabled#30,video_error_or_removed#31,description#32] csv
+- Project [trending_date#100, category_id#103, publish_time#104, views#106, likes#107, dislikes#108, comment_count#109, comments_disabled#111, ratings_disabled#112, video_error_or_removed#113, tag#135]
   +- Project [video_id#99, trending_date#100, title#101, channel_title#102, category_id#103, publish_time#104, tags#105, views#106, likes#107, dislikes#108, comment_count#109, thumbnail_link#110, comments_disabled#111, ratings_disabled#112, video_error_or_removed#113, description#114, tag#135]
      +- Generate explode(split(tags#105, \|, -1)), false, [tag#135]
         +- Filter NOT is_mostly_empty(struct(video_id, video_id#99, trending_date, trending_date#100, title, title#101, channel_title, channel_title#102, category_id, category_id#103, publish_time, publish_time#104, tags, tags#105, views, views#106, likes, likes#107, dislikes, dislikes#108, comment_count, comment_count#109, thumbnail_link, thumbnail_link#110, ... 8 more fields))#133
            +- Filter is_mostly_english(tags#105)#131
               +- Relation [video_id#99,trending_date#100,title#101,channel_title#102,category_id#103,publish_time#104,tags#105,views#106,likes#107,dislikes#108,comment_count#109,thumbnail_link#110,comments_disabled#111,ratings_disabled#112,video_error_or_removed#113,description#114] csv
