In [2]:
!pip install chardet

Collecting chardet
  Downloading chardet-5.2.0-py3-none-any.whl.metadata (3.4 kB)
Downloading chardet-5.2.0-py3-none-any.whl (199 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.4/199.4 kB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m0m
[?25hInstalling collected packages: chardet
Successfully installed chardet-5.2.0


In [3]:
from pyspark.sql import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
import chardet
from collections import ChainMap
import os
import re
from pyspark.sql.window import *

In [7]:
# Create (or reuse) the Spark session for this notebook and register the
# PostgreSQL JDBC driver jar through the spark.jars setting.
spark = (
    SparkSession.builder
    .appName("Youtube_Analysis")
    .config("spark.jars", "postgresql-42.5.0.jar")
    .getOrCreate()
)

In [8]:
# Sanity check: show the jar path currently registered under spark.jars.
print(spark.conf.get("spark.jars"))

postgresql-42.5.0.jar


## Preparing the Data

In [4]:
def _column(name, dtype=None, nullable=True):
    """Build one StructField; a column is a nullable string unless told otherwise."""
    return StructField(name, StringType() if dtype is None else dtype, nullable)


# Column layout of the combined frame. Every column is a nullable string
# except row_id, which is a non-nullable long.
schema = StructType(
    [
        _column(name)
        for name in (
            "video_id",
            "trending_date",
            "title",
            "channel_title",
            "category_id",
            "publish_time",
            "tags",
            "views",
            "likes",
            "dislikes",
            "comment_count",
            "thumbnail_link",
            "comments_disabled",
            "ratings_disabled",
            "video_error_or_removed",
            "description",
        )
    ]
    + [_column("row_id", LongType(), nullable=False)]
    + [
        _column(name)
        for name in ("snippet_title", "id", "snippet_assignable", "region")
    ]
)

# Empty frame with that layout; the per-region frames are unioned onto it.
all_regions = spark.createDataFrame([], schema=schema)

In [173]:
def ensure_utf8(file_path):
    """Re-encode the file at ``file_path`` to UTF-8 in place when needed.

    The source encoding is guessed by ``chardet`` from the file's bytes.
    Nothing is written when the guess is UTF-8, when it is ASCII (every ASCII
    file is already valid UTF-8), or when chardet cannot make a guess.
    Otherwise the bytes are decoded with the guessed encoding (undecodable
    bytes become U+FFFD) and written back as UTF-8. Line endings are kept
    exactly as they were.

    Args:
        file_path: Path of the file to check and, if necessary, rewrite.

    Returns:
        None.

    Raises:
        OSError: If the file cannot be read or the replacement cannot be written.
        LookupError: If chardet reports an encoding Python has no codec for.
    """
    # Read the bytes once; they serve both the detection and the conversion.
    with open(file_path, 'rb') as f:
        raw = f.read()

    source_encoding = chardet.detect(raw)['encoding'] or 'utf-8'
    if source_encoding.lower() in ('utf-8', 'ascii'):
        return

    converted = raw.decode(source_encoding, errors='replace').encode('utf-8')

    # Write to a sibling file and swap it in, so a failure part-way through
    # never leaves the original truncated.
    tmp_path = f"{file_path}.utf8.tmp"
    try:
        with open(tmp_path, 'wb') as f:
            f.write(converted)
        os.replace(tmp_path, file_path)
    finally:
        # Only still present when the write or the swap failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

In [174]:
# Maps the two-letter prefix of a data file name to the label stored in the
# `region` column. Prefixes missing from this table are labelled "Unknown"
# by the loading loop.
region_mapping = {
    "CA": "Canada",
    "US": "United States of America",
    "DE": "Germany",
    "FR": "France",  # NOTE(review): assumes Data/ holds FR-prefixed files - confirm
    "RU": "Russia",
    "KR": "South Korea",
    "JP": "Japan",
    "MX": "Mexico",
    "IN": "India",
    "GB": "United Kingdom"
}

In [175]:
# Load every region's CSV, attach the category metadata from that region's
# JSON file, and stack the results into `all_regions`.
data_dir = "Data"
# Spark gets an absolute file:// URI derived from the same directory that is
# listed and re-encoded locally, so both always point at the same files.
data_uri = f"file://{os.path.abspath(data_dir)}"
data_files = sorted(os.listdir(data_dir))

# Start from an empty frame so re-running this cell does not append every
# region a second time.
all_regions = spark.createDataFrame([], schema=schema)

for file in data_files:

    if not file.endswith(".csv"):
        continue

    # The first two characters of the CSV name are the region code.
    region_code = file[:2]
    region_name = region_mapping.get(region_code, "Unknown")

    # NOTE(review): assumes the category JSON carries the same two-letter
    # prefix as the CSV - confirm against the contents of Data/.
    json_files = [name for name in data_files
                  if name.startswith(region_code) and name.endswith(".json")]
    if not json_files:
        raise FileNotFoundError(
            f"No category JSON starting with '{region_code}' found in '{data_dir}' for '{file}'"
        )

    # Re-encode the CSV that is read on the next line, not a look-alike.
    ensure_utf8(os.path.join(data_dir, file))

    region_df = spark.read.csv(f"{data_uri}/{file}", header=True, inferSchema=True)
    region_json = spark.read.option("multiline", "true").json(f"{data_uri}/{json_files[0]}")

    # Explode `items` once and take all three fields from the same element.
    # Joining separately exploded frames on monotonically_increasing_id is
    # unsafe: the generated ids depend on partitioning and need not line up.
    region_info = (
        region_json
        .select(F.explode("items").alias("item"))
        .select(
            F.monotonically_increasing_id().alias("row_id"),
            F.col("item.snippet.title").alias("snippet_title"),
            F.col("item.id").alias("id"),
            F.col("item.snippet.assignable").alias("snippet_assignable"),
        )
    )

    # Column order (CSV columns, row_id, snippet_title, id,
    # snippet_assignable, region) must match `schema`: union() is positional.
    region_join = (
        region_df
        .join(region_info, region_df["category_id"] == region_info["id"], "inner")
        .withColumn("comments_disabled", F.col("comments_disabled").cast("string"))
        .withColumn("ratings_disabled", F.col("ratings_disabled").cast("string"))
        .withColumn("video_error_or_removed", F.col("video_error_or_removed").cast("string"))
        .withColumn("snippet_assignable", F.col("snippet_assignable").cast("string"))
        .withColumn("region", F.lit(region_name))
    )

    all_regions = all_regions.union(region_join)

In [14]:
# Export the combined frame as one CSV part file with a header row,
# replacing any previous export in the same directory.
(
    all_regions
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("file:///home/jovyan/work/Projects/Youtube_Analysis/Data/all_regions")
)

In [9]:
# Read the export directory rather than a single part file: Spark gives the
# part file a new random name on every write, so a hard-coded part-file path
# stops working as soon as the export cell is run again.
all_regions = spark.read.csv(
    "file:///home/jovyan/work/Projects/Youtube_Analysis/Data/all_regions",
    header=True,
    inferSchema=True,
)

In [10]:
# Show the column types Spark inferred when re-reading the CSV export.
all_regions.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: boolean (nullable = true)
 |-- ratings_disabled: boolean (nullable = true)
 |-- video_error_or_removed: boolean (nullable = true)
 |-- description: string (nullable = true)
 |-- row_id: integer (nullable = true)
 |-- snippet_title: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- snippet_assignable: boolean (nullable = true)
 |-- region: string (nullable = true)



## Transformation

In [11]:
# trending_date is stored as yy.dd.MM (two-digit year, day, month), e.g. 17.14.11 is 2017-11-14

In [12]:
# Peek at the first few trending_date values.
all_regions.select("trending_date").show(5)

+-------------+
|trending_date|
+-------------+
|     17.14.11|
|     17.14.11|
|     17.14.11|
|     17.14.11|
|     17.14.11|
+-------------+
only showing top 5 rows



In [13]:
# trending_date holds "yy.dd.MM" strings. Split on the literal dot (raw
# string, so the regex escape is not an invalid Python escape sequence),
# rebuild the value as "20yy/dd/MM" and parse it into a date.
date_parts = F.split(all_regions["trending_date"], r"\.")
all_regions = all_regions.withColumn(
    "trending_date",
    F.to_date(
        F.concat(
            F.lit("20"), date_parts[0],
            F.lit("/"), date_parts[1],
            F.lit("/"), date_parts[2],
        ),
        "yyyy/dd/MM",
    ),
)

In [14]:
# Peek at the first few trending_date values.
all_regions.select("trending_date").show(5)

+-------------+
|trending_date|
+-------------+
|   2017-11-14|
|   2017-11-14|
|   2017-11-14|
|   2017-11-14|
|   2017-11-14|
+-------------+
only showing top 5 rows



In [15]:
# Split the tags string on the literal pipe character. The pattern is a regex,
# so the pipe is escaped; the raw string keeps that escape from being an
# invalid Python escape sequence.
all_regions = all_regions.withColumn("tag_list", F.split(all_regions["tags"], r"\|"))

In [16]:
def filter_tags(tags_list):
    """Clean a video's tags, or report that it has none.

    A tag is kept only if it contains at least one ASCII letter, digit, '.',
    ',' or '-'. Kept tags have every character removed that is not an ASCII
    letter, a digit, whitespace, an apostrophe, '.', ',' or '-'.

    Args:
        tags_list: Sequence of raw tag strings, or None.

    Returns:
        The list of cleaned tags, or the string "No tags" when ``tags_list``
        is None or empty, when no tag survives the filter, or when a tag
        cleans to exactly "none".
    """
    if not tags_list:
        return "No tags"

    cleaned_tags = [
        re.sub(r"[^\s\'\dA-Za-z\.,-]", "", tag)
        for tag in tags_list
        if re.search(r"[\dA-Za-z\.,-]", tag)
    ]

    # Compare whole cleaned tags: a substring test would also discard the tags
    # of any video that merely has a tag such as "nonetheless".
    if not cleaned_tags or any(tag.strip() == "none" for tag in cleaned_tags):
        return "No tags"
    return cleaned_tags

In [17]:
# Wrap filter_tags as a Spark UDF whose declared return type is string.
# NOTE(review): filter_tags can return a list; confirm the stringified form
# stored in the column is what downstream consumers expect.
filter_tags_func = F.udf(filter_tags, returnType=StringType())

In [18]:
# Overwrite tag_list with the output of the filter_tags UDF.
cleaned_tag_list = filter_tags_func(all_regions["tag_list"])
all_regions = all_regions.withColumn("tag_list", cleaned_tag_list)

In [19]:
# For every column, print how many rows are non-null (flag 0) and null (flag 1).
# isNotNull() replaces the comparison of a boolean column with the string "false".
for column in all_regions.columns:
    print(column)
    null_flag = F.when(all_regions[column].isNotNull(), 0).otherwise(1).alias("null?")
    all_regions.select(null_flag).groupBy("null?").count().show()

video_id
+-----+------+
|null?| count|
+-----+------+
|    0|373204|
+-----+------+

trending_date
+-----+------+
|null?| count|
+-----+------+
|    0|373204|
+-----+------+

title
+-----+------+
|null?| count|
+-----+------+
|    0|373204|
+-----+------+

channel_title
+-----+------+
|null?| count|
+-----+------+
|    0|373204|
+-----+------+

category_id
+-----+------+
|null?| count|
+-----+------+
|    0|373204|
+-----+------+

publish_time
+-----+------+
|null?| count|
+-----+------+
|    0|373204|
+-----+------+

tags
+-----+------+
|null?| count|
+-----+------+
|    0|373204|
+-----+------+

views
+-----+------+
|null?| count|
+-----+------+
|    0|373202|
|    1|     2|
+-----+------+

likes
+-----+------+
|null?| count|
+-----+------+
|    0|373202|
|    1|     2|
+-----+------+

dislikes
+-----+------+
|null?| count|
+-----+------+
|    0|373202|
|    1|     2|
+-----+------+

comment_count
+-----+------+
|null?| count|
+-----+------+
|    0|373202|
|    1|     2|
+-----+-----

In [20]:
# Count columns in which a missing value is replaced by 0.
c1 = ["views", "likes", "dislikes", "comment_count"]

for column in c1:
    # coalesce keeps the existing value and substitutes 0 only for nulls; it
    # replaces the comparison of a boolean column with the string "true".
    all_regions = all_regions.withColumn(column, F.coalesce(all_regions[column], F.lit(0)))


In [21]:
# Drop the free-text and link columns; tags has already been expanded into tag_list.
dropped_columns = ["description", "tags", "title", "channel_title", "thumbnail_link"]
all_regions = all_regions.drop(*dropped_columns)

In [22]:
# Remove every row that still has a null in any remaining column.
all_regions = all_regions.na.drop()

In [23]:
# Row count after the rows containing nulls were removed.
all_regions.count()

373202

In [24]:
# Add each engagement count as a percentage of views, rounded to 7 decimals.
ratio_sources = {
    "likes ratio": "likes",
    "dislikes ratio": "dislikes",
    "comments ratio": "comment_count",
}
for ratio_column, count_column in ratio_sources.items():
    percentage = (all_regions[count_column] / all_regions["views"]) * 100
    all_regions = all_regions.withColumn(ratio_column, F.round(percentage, 7))

In [25]:
# Window over the whole frame (no partitionBy), ordered by the existing row_id.
window_fun = Window.orderBy("row_id")

In [26]:
# Overwrite row_id with row_number() over the window, i.e. a consecutive
# sequence starting at 1 in the order of the previous row_id.
# NOTE(review): rows that share a previous row_id have no defined relative
# order here - confirm that is acceptable.
all_regions = all_regions.withColumn("row_id" ,F.row_number().over(window_fun))

In [28]:
# JDBC connection settings. They are read from the environment so that the
# password is not stored in the notebook; URL and user fall back to the local
# development defaults.
db_url = os.environ.get("YOUTUBE_DB_URL", "jdbc:postgresql://localhost:5432/Youtube")
user = os.environ.get("YOUTUBE_DB_USER", "postgres")
# Deliberately no real default: export YOUTUBE_DB_PASSWORD before starting the
# kernel, otherwise the database rejects the connection.
password = os.environ.get("YOUTUBE_DB_PASSWORD", "")

In [None]:
# Append every column of the prepared frame to the youtube_videos table
# through the PostgreSQL JDBC driver.
jdbc_options = {
    "driver": "org.postgresql.Driver",
    "url": db_url,
    "dbtable": "youtube_videos",
    "user": user,
    "password": password,
}
(
    all_regions
    .select("*")
    .write
    .format("jdbc")
    .options(**jdbc_options)
    .mode("append")
    .save()
)