In [3]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

# Start (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [4]:
# Point the S3A connector at Yandex Object Storage.
# Credentials are read from the environment so that no secret is stored in the
# notebook; a missing variable raises KeyError here instead of failing later
# with an opaque S3 authentication error.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "storage.yandexcloud.net")
hadoop_conf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
hadoop_conf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])

In [5]:
# Explicit schema for the trending-videos CSV: (column name, Spark type) pairs
# listed in file order. Every column is declared nullable.
_trending_columns = [
    ('video_id', types.StringType()),
    ('title', types.StringType()),
    ('publishedAt', types.TimestampType()),
    ('channelId', types.StringType()),
    ('channelTitle', types.StringType()),
    ('categoryId', types.IntegerType()),
    ('trending_date', types.TimestampType()),
    ('tags', types.StringType()),
    ('view_count', types.IntegerType()),
    ('likes', types.IntegerType()),
    ('dislikes', types.IntegerType()),
    ('comment_count', types.IntegerType()),
    ('thumbnail_link', types.StringType()),
    ('comments_disabled', types.BooleanType()),
    ('ratings_disabled', types.BooleanType()),
    ('description', types.StringType()),
]
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in _trending_columns
])

In [78]:
# Load the trending-videos CSV from object storage using the explicit schema.
# - "quoteAll" was removed: it is a CSV *write* option and is ignored on read.
# - multiLine lets quoted fields contain embedded newlines without splitting
#   a record into several broken rows.
# - escape='"' makes the reader treat a doubled quote ("") inside a quoted
#   field as a literal quote instead of Spark's default backslash escape.
# Note: multiLine files are not split across tasks, so the read is sequential.
s3_df = (
    spark.read
    .option("header", True)
    .option("multiLine", True)
    .option("escape", '"')
    .schema(schema)
    .csv("s3a://kaggleyoutubedata/US_youtube_trending_data.csv")
)
s3_df.show(5)

+--------------------+--------------------+-------------------+--------------------+------------+----------+-------------------+--------------------+----------+------+--------+-------------+--------------------+-----------------+----------------+--------------------+
|            video_id|               title|        publishedAt|           channelId|channelTitle|categoryId|      trending_date|                tags|view_count| likes|dislikes|comment_count|      thumbnail_link|comments_disabled|ratings_disabled|         description|
+--------------------+--------------------+-------------------+--------------------+------------+----------+-------------------+--------------------+----------+------+--------+-------------+--------------------+-----------------+----------------+--------------------+
|         3C66w5Z0ixs|I ASKED HER TO BE...|2020-08-12 00:20:14|UCvtRTOMP2TqYqu51...|    Brawadis|        22|2020-08-12 05:00:00|brawadis|prank|ba...|   1514614|156908|    5855|        35313|https:

In [83]:
# Drop the free-text / link columns in a single call (the original dropped
# 'description' twice). `title` must go so that the category `title` joined in
# later is unambiguous. Dropping a column that is already absent is a no-op,
# so this cell is safe to re-run.
s3_df = s3_df.drop(
    'description',
    'thumbnail_link',
    'title',
    'channelTitle',
    'tags',
)

In [8]:
# Print the column names, types and nullability of s3_df as a tree.
s3_df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- channelId: string (nullable = true)
 |-- categoryId: integer (nullable = true)
 |-- trending_date: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- comments_disabled: boolean (nullable = true)
 |-- ratings_disabled: boolean (nullable = true)



In [84]:
# Write the trimmed trending data to PostgreSQL, replacing the table if it
# already exists. Database credentials come from the environment rather than
# being stored in the notebook; a missing variable raises KeyError up front.
mode = "overwrite"
url = "jdbc:postgresql://*****.yandexcloud.net:6432/kaggle_youtube_data?targetServerType=master&ssl=true&sslmode=verify-full"
properties = {
    "user": os.environ["PG_USER"],
    "password": os.environ["PG_PASSWORD"],
    "driver": "org.postgresql.Driver",
}
s3_df.write.jdbc(url=url, table="youtube_trending_data", mode=mode, properties=properties)

                                                                                

In [16]:
# The category file is one JSON document spread over several lines, so it is
# read in multi-line mode rather than as JSON Lines.
s3_category_df = spark.read.json(
    "s3a://kaggleyoutubedata/US_category_id.json",
    multiLine=True,
)

In [17]:
# Print the inferred schema of the category JSON as a tree.
s3_category_df.printSchema()

root
 |-- etag: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- etag: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- kind: string (nullable = true)
 |    |    |-- snippet: struct (nullable = true)
 |    |    |    |-- assignable: boolean (nullable = true)
 |    |    |    |-- channelId: string (nullable = true)
 |    |    |    |-- title: string (nullable = true)
 |-- kind: string (nullable = true)



In [39]:
# Preview the category ids nested inside the `items` array (at most 5 rows).
category_ids_preview = s3_category_df.select("items.id")
category_ids_preview.show(n=5)

+--------------------+
|                  id|
+--------------------+
|[1, 2, 10, 15, 17...|
+--------------------+



In [38]:
from pyspark.sql import functions as F


In [41]:
# Look at the raw `items` array column before it is exploded into rows.
items_preview = s3_category_df.select("items")
items_preview.show(n=5)

+--------------------+
|               items|
+--------------------+
|[{IfWa37JGcqZs-jZ...|
+--------------------+



In [73]:
# Turn the single `items` array into one row per array element, keeping the
# column name `items`.
items_only_df = s3_category_df.select("items")
category_df = items_only_df.withColumn('items', F.explode('items'))

In [64]:
# Display the first 20 exploded rows (the defaults of show(), spelled out).
category_df.show(n=20, truncate=True)

+--------------------+
|               items|
+--------------------+
|{IfWa37JGcqZs-jZe...|
|{5XGylIs7zkjHh594...|
|{HCjFMARbBeWjpm6P...|
|{ra8H7xyAfmE2Fews...|
|{7mqChSJogdF3hSIL...|
|{0Z6uGkj97NgjD-X3...|
|{K_-7stg0kIU7eUBO...|
|{I3IL9xGIM3MsULlq...|
|{D1W6tq5mMMCV0wtN...|
|{QMEBz6mxVdklVaq8...|
|{v2n6q4JttoL4uUba...|
|{Qi1csjh-POReitZE...|
|{IbGXblQi8v_nOsXj...|
|{gYzt8dB8mlod-84i...|
|{hHUhloYhyMMqVkQ4...|
|{KEdEtUd4WGk_aACr...|
|{AioSVwhKNpZ2bhtw...|
|{tMfbFvkfuPP8YnUE...|
|{totPMF_82XTvVFy0...|
|{LNgRKNynxC50MeYV...|
+--------------------+
only showing top 20 rows



In [65]:
# Print the schema of category_df as a tree.
category_df.printSchema()

root
 |-- items: struct (nullable = true)
 |    |-- etag: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- kind: string (nullable = true)
 |    |-- snippet: struct (nullable = true)
 |    |    |-- assignable: boolean (nullable = true)
 |    |    |-- channelId: string (nullable = true)
 |    |    |-- title: string (nullable = true)



In [66]:
# Expand the `items` struct into its top-level fields and preview the result.
flattened_items_preview = category_df.select('items.*')
flattened_items_preview.show()

+--------------------+---+--------------------+--------------------+
|                etag| id|                kind|             snippet|
+--------------------+---+--------------------+--------------------+
|IfWa37JGcqZs-jZeA...|  1|youtube#videoCate...|{true, UCBR8-60-B...|
|5XGylIs7zkjHh5940...|  2|youtube#videoCate...|{true, UCBR8-60-B...|
|HCjFMARbBeWjpm6PD...| 10|youtube#videoCate...|{true, UCBR8-60-B...|
|ra8H7xyAfmE2FewsD...| 15|youtube#videoCate...|{true, UCBR8-60-B...|
|7mqChSJogdF3hSIL-...| 17|youtube#videoCate...|{true, UCBR8-60-B...|
|0Z6uGkj97NgjD-X3p...| 18|youtube#videoCate...|{false, UCBR8-60-...|
|K_-7stg0kIU7eUBOP...| 19|youtube#videoCate...|{true, UCBR8-60-B...|
|I3IL9xGIM3MsULlqR...| 20|youtube#videoCate...|{true, UCBR8-60-B...|
|D1W6tq5mMMCV0wtNx...| 21|youtube#videoCate...|{false, UCBR8-60-...|
|QMEBz6mxVdklVaq8J...| 22|youtube#videoCate...|{true, UCBR8-60-B...|
|v2n6q4JttoL4uUbaz...| 23|youtube#videoCate...|{true, UCBR8-60-B...|
|Qi1csjh-POReitZEb...| 24|youtube#

In [76]:
# Build the (id, title) category lookup directly from the raw JSON frame.
# The original `category_df = category_df.select(...)` overwrote its own input,
# so running the cell a second time failed because `items` no longer existed.
# Deriving it from s3_category_df makes the cell idempotent.
category_df = (
    s3_category_df
    .select('items')
    .withColumn('items', F.explode('items'))
    .select('items.id', 'items.snippet.title')
)

In [77]:
# Write the category lookup to PostgreSQL, replacing the table if it already
# exists. Credentials come from the environment (same variables as the other
# JDBC writes) instead of being hard-coded in the notebook.
mode = "overwrite"
url = "jdbc:postgresql://*****.mdb.yandexcloud.net:6432/kaggle_youtube_data?targetServerType=master&ssl=true&sslmode=verify-full"
properties = {
    "user": os.environ["PG_USER"],
    "password": os.environ["PG_PASSWORD"],
    "driver": "org.postgresql.Driver",
}
category_df.write.jdbc(url=url, table="video_category", mode=mode, properties=properties)

[Stage 20:>                                                         (0 + 1) / 1]                                                                                

In [85]:
# Attach the category row to every trending row and expose the result to Spark
# SQL as `yt_data`. createOrReplaceTempView replaces the deprecated
# registerTempTable and, like it, overwrites an existing view of the same name.
yt_data_df = s3_df.join(category_df, s3_df.categoryId == category_df.id, 'inner')
yt_data_df.createOrReplaceTempView('yt_data')



In [86]:
# Aggregate the joined `yt_data` view per category, publish month and trending
# month. Only the SQL comment and alias style changed from the original query
# (the old comment read "Reveneue grouping"); the result set is the same.
# NOTE(review): COUNT(categoryId) counts rows of yt_data, not distinct
# video_id values - confirm that this is the intended meaning of count_videos.
df_result = spark.sql("""
SELECT
    -- Grouping keys: category name, publish month, trending month
    title AS category_name,
    date_trunc('month', publishedAt) AS published_month,
    date_trunc('month', trending_date) AS trending_month,

    -- Aggregates per group
    COUNT(categoryId) AS count_videos,
    AVG(view_count) AS avg_view_count,
    AVG(likes) AS avg_likes
FROM
    yt_data
GROUP BY
    1, 2, 3
""")

In [87]:
# Write the monthly aggregates to PostgreSQL, replacing the table if it already
# exists. Credentials come from the environment instead of being hard-coded in
# the notebook; a missing variable raises KeyError before any write starts.
mode = "overwrite"
url = "jdbc:postgresql://*****.mdb.yandexcloud.net:6432/kaggle_youtube_data?targetServerType=master&ssl=true&sslmode=verify-full"
properties = {
    "user": os.environ["PG_USER"],
    "password": os.environ["PG_PASSWORD"],
    "driver": "org.postgresql.Driver",
}
df_result.write.jdbc(url=url, table="yt_data_transformed", mode=mode, properties=properties)

                                                                                