In [1]:
import findspark
findspark.init()

In [2]:
import time
from pyspark.sql import SparkSession

start_time = time.time()

spark = SparkSession.builder.appName('jupyter-spark') \
    .enableHiveSupport()\
    .getOrCreate()
sc = spark.sparkContext

end_time = time.time()
execution_time = end_time - start_time
print('Time taken to execute the code:', execution_time, 'seconds')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Time taken to execute the code: 17.348910331726074 seconds


In [4]:
start_time = time.time()
!hadoop fs -mkdir -p /tmp/BigDataproj/
end_time = time.time()
execution_time = end_time - start_time
print('Time taken to execute the code:', execution_time, 'seconds')

Time taken to execute the code: 1.7037265300750732 seconds


In [8]:
start_time = time.time()
import subprocess

files = ["CAvideos.csv", "DEvideos.csv", "FRvideos.csv", "GBvideos.csv", "INvideos.csv", "JPvideos.csv", "KRvideos.csv", "MXvideos.csv", "RUvideos.csv", "USvideos.csv"]
hdfs_dir = "/tmp/BigDataproj/"
file_paths = []

for file in files:
    local_path = f"/notebook/{file}"
    hdfs_path = f"{hdfs_dir}{file}"

    subprocess.run(["hdfs", "dfs", "-put", local_path, hdfs_path])
    file_paths.append(hdfs_path)

!hadoop fs -ls /tmp/BigDataproj
end_time = time.time()
execution_time = end_time - start_time
print('Time taken to execute the code:', execution_time, 'seconds')

Found 10 items
-rw-r--r--   2 root supergroup   64067991 2024-01-07 12:46 /tmp/BigDataproj/CAvideos.csv
-rw-r--r--   2 root supergroup   63040138 2024-01-07 12:46 /tmp/BigDataproj/DEvideos.csv
-rw-r--r--   2 root supergroup   51424708 2024-01-07 12:46 /tmp/BigDataproj/FRvideos.csv
-rw-r--r--   2 root supergroup   53213441 2024-01-07 12:46 /tmp/BigDataproj/GBvideos.csv
-rw-r--r--   2 root supergroup   59600439 2024-01-07 12:46 /tmp/BigDataproj/INvideos.csv
-rw-r--r--   2 root supergroup   28740747 2024-01-07 12:47 /tmp/BigDataproj/JPvideos.csv
-rw-r--r--   2 root supergroup   34835868 2024-01-07 12:47 /tmp/BigDataproj/KRvideos.csv
-rw-r--r--   2 root supergroup   45191541 2024-01-07 12:47 /tmp/BigDataproj/MXvideos.csv
-rw-r--r--   2 root supergroup   76268286 2024-01-07 12:47 /tmp/BigDataproj/RUvideos.csv
-rw-r--r--   2 root supergroup   62756152 2024-01-07 12:47 /tmp/BigDataproj/USvideos.csv
Time taken to execute the code: 47.46334481239319 seconds


In [34]:
start_time = time.time()
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

schema = StructType([
    StructField("video_id", StringType(), True),
    StructField("trending_date", StringType(), True),
    StructField("title", StringType(), True),
    StructField("channel_title", StringType(), True),
    StructField("category_id", IntegerType(), True),
    StructField("publish_time", StringType(), True),
    StructField("tags", StringType(), True),
    StructField("views", IntegerType(), True),
    StructField("likes", IntegerType(), True),
    StructField("dislikes", IntegerType(), True),
    StructField("comment_count", IntegerType(), True),
    StructField("thumbnail_link", StringType(), True),
    StructField("comments_disabled", BooleanType(), True),
    StructField("ratings_disabled", BooleanType(), True),
    StructField("video_error_or_removed", BooleanType(), True),
    StructField("description", StringType(), True)
])
region = ["CA", "DE", "FR", "GB", "IN", "JP", "KR", "MX", "RU", "US"]
hive_table_prefix = "yt_table"

for i, file_path in enumerate(file_paths):
    dataframe = spark.read.option("header", "true").schema(schema).csv(file_path)
    dataframe.write.mode("overwrite").parquet(f"{hdfs_dir}/df_{i+1}.parquet")
    hive_table_name = f"{hive_table_prefix}_{region[i]}"
    spark.sql(f"CREATE TABLE IF NOT EXISTS {hive_table_name} USING PARQUET OPTIONS (PATH '{hdfs_dir}/df_{i+1}.parquet')")
    spark.sql(f"INSERT INTO TABLE {hive_table_name} SELECT * FROM parquet.`{hdfs_dir}/df_{i+1}.parquet`")
    
end_time = time.time()
execution_time = end_time - start_time
print('Time taken to execute the code:', execution_time, 'seconds')



Time taken to execute the code: 26.581573247909546 seconds


                                                                                

In [35]:
start_time = time.time()
tables = spark.sql("SHOW TABLES")
tables.show()
end_time = time.time()
execution_time = end_time - start_time
print('Time taken to execute the code:', execution_time, 'seconds')

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|  default|yt_table_ca|      false|
|  default|yt_table_de|      false|
|  default|yt_table_fr|      false|
|  default|yt_table_gb|      false|
|  default|yt_table_in|      false|
|  default|yt_table_jp|      false|
|  default|yt_table_kr|      false|
|  default|yt_table_mx|      false|
|  default|yt_table_ru|      false|
|  default|yt_table_us|      false|
+---------+-----------+-----------+

Time taken to execute the code: 0.08134651184082031 seconds


In [36]:
start_time = time.time()
for i in range(len(region)):
    df = spark.sql(f"DESCRIBE FORMATTED yt_table_{region[i]}")
    df.show(truncate=False)

end_time = time.time()
execution_time = end_time - start_time
print('Time taken to execute the code:', execution_time, 'seconds')

+----------------------------+-----------+-------+
|col_name                    |data_type  |comment|
+----------------------------+-----------+-------+
|video_id                    |string     |null   |
|trending_date               |string     |null   |
|title                       |string     |null   |
|channel_title               |string     |null   |
|category_id                 |int        |null   |
|publish_time                |string     |null   |
|tags                        |string     |null   |
|views                       |int        |null   |
|likes                       |int        |null   |
|dislikes                    |int        |null   |
|comment_count               |int        |null   |
|thumbnail_link              |string     |null   |
|comments_disabled           |boolean    |null   |
|ratings_disabled            |boolean    |null   |
|video_error_or_removed      |boolean    |null   |
|description                 |string     |null   |
|                            | 

In [37]:
start_time = time.time()
from pyspark.sql.functions import col, when
tables = ["yt_table_ca", "yt_table_de", "yt_table_fr", "yt_table_gb", "yt_table_in",
          "yt_table_jp", "yt_table_kr", "yt_table_mx", "yt_table_ru", "yt_table_us"]

pre_tables = []

for table_name in tables:
    new_name = f"preprocessed_table_{table_name}"
    original_df = spark.sql(f"SELECT * FROM {table_name}")

    preprocessed_df = original_df.withColumn("title", when(col("title").isNotNull(), col("title")).otherwise("Unknown")) \
        .withColumn("tags", when(col("tags").isNotNull(), col("tags")).otherwise("Unknown")) \
        .withColumn("likes", when(col("likes").isNotNull(), col("likes")).otherwise(0)) \
        .withColumn("dislikes", when(col("dislikes").isNotNull(), col("dislikes")).otherwise(0)) \
        .withColumn("comment_count", when(col("comment_count").isNotNull(), col("comment_count")).otherwise(0)) \
        .withColumn("description", when(col("description").isNotNull(), col("description")).otherwise("No description")) \
        .withColumn("publish_year", col("publish_time").substr(1, 4).cast("int")) \
        .withColumn("publish_month", col("publish_time").substr(6, 2).cast("int")) \
        .withColumn("publish_day", col("publish_time").substr(9, 2).cast("int"))

    preprocessed_with_group_df = preprocessed_df.withColumn("group_column",
        when(col("views") > 1000000, "HighViews") \
        .when(col("likes") > 50000, "HighLikes") \
        .when(col("dislikes") > 50000, "HighDislikes") \
        .when(col("comment_count") > 50000, "HighComments") \
        .otherwise("Other"))

    preprocessed_with_group_df.write.mode("overwrite").saveAsTable(new_name)
    pre_tables.append(new_name)

tables = spark.sql("SHOW TABLES")
tables.show()
end_time = time.time()
execution_time = end_time - start_time
print('Time taken to execute the code:', execution_time, 'seconds')



+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|  default|preprocessed_tabl...|      false|
|  default|preprocessed_tabl...|      false|
|  default|preprocessed_tabl...|      false|
|  default|preprocessed_tabl...|      false|
|  default|preprocessed_tabl...|      false|
|  default|preprocessed_tabl...|      false|
|  default|preprocessed_tabl...|      false|
|  default|preprocessed_tabl...|      false|
|  default|preprocessed_tabl...|      false|
|  default|preprocessed_tabl...|      false|
|  default|         yt_table_ca|      false|
|  default|         yt_table_de|      false|
|  default|         yt_table_fr|      false|
|  default|         yt_table_gb|      false|
|  default|         yt_table_in|      false|
|  default|         yt_table_jp|      false|
|  default|         yt_table_kr|      false|
|  default|         yt_table_mx|      false|
|  default|         yt_table_ru|      false|
|  default

                                                                                

In [38]:
start_time = time.time()
output_dir = "/tmp/csv_output"
for table_name in pre_tables:
    df_spark = spark.table(table_name)
    df_spark.write.csv(f"{output_dir}/{table_name}", header=True, mode="overwrite")
    
!hadoop fs -ls /tmp/csv_output
end_time = time.time()
execution_time = end_time - start_time
print('Time taken to execute the code:', execution_time, 'seconds')

                                                                                

Found 10 items
drwxr-xr-x   - root supergroup          0 2024-01-07 12:53 /tmp/csv_output/preprocessed_table_yt_table_ca
drwxr-xr-x   - root supergroup          0 2024-01-07 12:53 /tmp/csv_output/preprocessed_table_yt_table_de
drwxr-xr-x   - root supergroup          0 2024-01-07 12:53 /tmp/csv_output/preprocessed_table_yt_table_fr
drwxr-xr-x   - root supergroup          0 2024-01-07 12:53 /tmp/csv_output/preprocessed_table_yt_table_gb
drwxr-xr-x   - root supergroup          0 2024-01-07 12:53 /tmp/csv_output/preprocessed_table_yt_table_in
drwxr-xr-x   - root supergroup          0 2024-01-07 12:53 /tmp/csv_output/preprocessed_table_yt_table_jp
drwxr-xr-x   - root supergroup          0 2024-01-07 12:53 /tmp/csv_output/preprocessed_table_yt_table_kr
drwxr-xr-x   - root supergroup          0 2024-01-07 12:53 /tmp/csv_output/preprocessed_table_yt_table_mx
drwxr-xr-x   - root supergroup          0 2024-01-07 12:53 /tmp/csv_output/preprocessed_table_yt_table_ru
drwxr-xr-x   - root supergroup 

In [39]:
start_time = time.time()
import subprocess
output_dir = "/tmp/csv_output"
local_output_dir = "/notebook"
subprocess.run(["hadoop", "fs", "-get", f"{output_dir}/*", local_output_dir])
end_time = time.time()
execution_time = end_time - start_time
print('Time taken to execute the code:', execution_time, 'seconds')

Time taken to execute the code: 37.85867238044739 seconds


In [40]:
start_time = time.time()
spark.stop()
end_time = time.time()
execution_time = end_time - start_time
print('Time taken to execute the code:', execution_time, 'seconds')

Time taken to execute the code: 1.1115782260894775 seconds
