In [102]:
import json
from configparser import ConfigParser
from os import environ

import pyspark
from delta import configure_spark_with_delta_pip
from pyspark.sql.functions import col,from_json,lit,round
from pyspark.sql.types import StructType,StructField,BooleanType,MapType,IntegerType
from pyspark.sql.utils import AnalysisException

In [2]:
# initialize config
config = ConfigParser()
# ConfigParser.read() skips files it cannot open and returns the list of files
# it actually parsed, so an empty list means the settings were never loaded.
# Fail here rather than with an unrelated KeyError in a later cell.
loaded_config_files = config.read('config/python/config.ini')
if not loaded_config_files:
    raise FileNotFoundError("config file not found or unreadable: config/python/config.ini")
loaded_config_files


['config/python/config.ini']

In [3]:
# get value from config file, one section at a time
spark_section = config["spark"]
gcs_jars = spark_section["gcs-connector"]
delta_contrib_jars = spark_section["delta-contrib"]
apps_name = spark_section["apps-name"]

datasource_section = config["datasource"]
data_sources = datasource_section["csv1"]
json_data_sources = datasource_section["json1"]

bucket_section = config["bucket"]
gcs_data_sources = bucket_section["gcs-data-source"]
gcs_data_sources_ids = bucket_section["gcs-data-source-id"]
gcs_data_destination = bucket_section["gcs-data-destination"]

In [4]:
# set up environment variables
# GOOGLE_APPLICATION_CREDENTIALS is taken from the "json" entry of [gcpconfig]
environ.update(GOOGLE_APPLICATION_CREDENTIALS=config["gcpconfig"]["json"])

In [5]:

# spark initialization
# "spark.jars" holds a single comma-separated list. Setting the key twice keeps
# only the last value, which would drop the GCS connector jar, so both jar
# paths are joined into one entry.
extra_jars = ",".join([gcs_jars, delta_contrib_jars])

builder = (
    pyspark.sql.SparkSession.builder.appName(apps_name)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.jars", extra_jars)
    .config("spark.delta.logStore.gs.impl","io.delta.storage.GCSLogStore")
)

# configure_spark_with_delta_pip is applied to the builder before the session is created
spark = configure_spark_with_delta_pip(builder).getOrCreate()

Ivy Default Cache set to: /Users/kotekaman/.ivy2/cache
The jars for the packages stored in: /Users/kotekaman/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-26d387f1-2cec-4d91-9039-2f0d0536e771;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/kotekaman/Documents/private/belajar/apache_spark/3.1.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 178ms :: artifacts dl 8ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;1.0.0 from central in [default]
	org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
	org.antlr#ST4;4.0.8 from central in [default]
	org.antlr#antlr-runtime;3.5.2 from central in [default]
	org.antlr#antlr4;4.7 from central in [default]
	org.antlr#antlr4-runtime;4.7 from central in [default]
	org.glassfish#javax.json;1.0.4 from central in [default]
	---------------------------------------------------------------------
	|             

In [6]:
# CSV source: first row is the header, a double quote is both the quote and the
# escape character, and multiLine parsing is enabled
csv_reader = spark.read.options(header=True, quote="\"", escape="\"", multiLine=True)
data = csv_reader.csv(data_sources)

# JSON source, schema inferred by the reader
json_datas = spark.read.json(json_data_sources)

# placeholders; both are assigned when the Delta tables are loaded
datas = None
datas_json_for_data_lake = None

In [115]:
# load table into delta lake format
def save_once_then_load(frame, path, already_saved_message):
    """Write `frame` as a Delta table at `path`, then read the table back.

    Parameters:
        frame: DataFrame to persist.
        path: destination of the Delta table.
        already_saved_message: text printed when the write is rejected.

    Returns:
        The DataFrame loaded from the Delta table at `path`.

    Only AnalysisException is treated as "table already there"; any other
    failure (credentials, network, interrupt) propagates instead of being
    reported as an existing table.
    """
    try:
        frame.write.format("delta").save(path)
    except AnalysisException:
        # No save mode is set, so the writer refuses to write over an existing
        # table; that refusal is presumably the AnalysisException seen here.
        print(already_saved_message)
    return spark.read.format("delta").load(path)


datas = save_once_then_load(data, gcs_data_sources, "table already save in gcs")
datas_json_for_data_lake = save_once_then_load(
    json_datas, gcs_data_sources_ids, "json id table already save in gcs"
)


                                                                                

table already save in gcs


                                                                                

json id table already save in gcs


In [53]:
# print the schema of the JSON frame first, then the CSV frame
for source_frame in (json_datas, data):
    source_frame.printSchema()

root
 |-- etag: string (nullable = true)
 |-- id: string (nullable = true)
 |-- kind: string (nullable = true)
 |-- snippet: struct (nullable = true)
 |    |-- assignable: boolean (nullable = true)
 |    |-- channelId: string (nullable = true)
 |    |-- title: string (nullable = true)

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)



In [116]:
# ET Process
# keep only the columns needed from each Delta table
videos_selected_field = datas.select("title","category_id","views","likes","dislikes")
id_selected_field = (
    datas_json_for_data_lake
    .select("id","snippet.*")
    .withColumnRenamed("title","category")
)

# column expressions, each rounded to 2 decimals
category_matches = videos_selected_field.category_id == id_selected_field.id
likes_share = round(col("likes") / col("views") * lit(100), 2)
dislikes_share = round(col("dislikes") / col("views") * lit(100), 2)
# built from the already-rounded likes/dislikes percentages
not_voted_share = round(lit(100) - (col("likes_percentage") + col("dislikes_percentage")), 2)

get_percentage = (
    videos_selected_field
    .join(id_selected_field, category_matches, "inner")
    .select(
        "title",
        "category",
        # the percentages above divide by the uncast "views"; only the output column is cast
        col("views").cast(IntegerType()).alias("views"),
        likes_share.alias("likes_percentage"),
        dislikes_share.alias("dislikes_percentage"),
    )
    .withColumn("not_voted_percentage", not_voted_share)
)

# every ranking below starts from the de-duplicated rows
unique_percentage = get_percentage.distinct()

# total views per (title, category, percentages) group, highest not-voted share first
most_not_voted_film = (
    unique_percentage
    .groupBy("title","category","likes_percentage","dislikes_percentage","not_voted_percentage")
    .sum('views')
    .withColumnRenamed("sum(views)","views")
    .orderBy(col("not_voted_percentage").desc(), col("views").desc())
)
most_not_voted_film.show()

# highest dislikes share first
most_dislikes_film = unique_percentage.orderBy(col("dislikes_percentage").desc())
most_dislikes_film.show()

# highest likes share first, ties broken by views
most_likes_film = unique_percentage.orderBy(col("likes_percentage").desc(), col("views").desc())
most_likes_film.show()


                                                                                

+--------------------+--------+-------+----------------+-------------------+--------------------+
|               title|category|  views|likes_percentage|dislikes_percentage|not_voted_percentage|
+--------------------+--------+-------+----------------+-------------------+--------------------+
|Bruno Mars - Fine...|   Music| 548621|           29.05|               0.43|               70.52|
|Luis Fonsi, Demi ...|   Music| 499946|           27.06|               0.71|               72.23|
|j-hope 'Airplane' MV|   Music|5275672|           26.57|               0.12|               73.31|
|dodie - Secret Fo...|   Music| 129130|           25.37|               0.08|               74.55|
|Louis Tomlinson -...|   Music| 985998|           24.51|               0.08|               75.41|
+--------------------+--------+-------+----------------+-------------------+--------------------+



                                                                                

+--------------------+--------------------+----------------+-------------------+--------------------+---------+
|               title|            category|likes_percentage|dislikes_percentage|not_voted_percentage|    views|
+--------------------+--------------------+----------------+-------------------+--------------------+---------+
|     To Our Daughter|      People & Blogs|             0.0|                0.0|               100.0|461064419|
|T-Mobile | #Littl...|       Entertainment|             0.0|                0.0|               100.0|115590182|
|Coachella 2018 LI...|               Music|             0.0|                0.0|               100.0| 38740070|
|The New Snapchat ...|               Music|             0.0|                0.0|               100.0| 21791461|
|KKW BEAUTY: Conce...|      People & Blogs|             0.0|                0.0|               100.0| 11962752|
|GET READY WITH ME...|       Howto & Style|             0.0|                0.0|               100.0|  9

                                                                                

+--------------------+---------------+-------+----------------+-------------------+--------------------+
|               title|       category|  views|likes_percentage|dislikes_percentage|not_voted_percentage|
+--------------------+---------------+-------+----------------+-------------------+--------------------+
|PSA from Chairman...| People & Blogs|1142585|             0.8|              19.15|               80.05|
|PSA from Chairman...| People & Blogs|1179072|            0.79|              19.01|                80.2|
|PSA from Chairman...| People & Blogs|1205682|            0.79|              18.95|               80.26|
|#ProudToCreate: P...|  Entertainment| 597669|            4.98|              11.98|               83.04|
|The FCC repeals i...|News & Politics| 985179|            0.49|              11.24|               88.27|
|The FCC repeals i...|News & Politics|1192501|            0.48|              10.62|                88.9|
|The FCC repeals i...|News & Politics|1242998|         



+-------------------------+--------------+-------+----------------+-------------------+--------------------+
|                    title|      category|  views|likes_percentage|dislikes_percentage|not_voted_percentage|
+-------------------------+--------------+-------+----------------+-------------------+--------------------+
|     Bruno Mars - Fine...|         Music| 548621|           29.05|               0.43|               70.52|
|     Luis Fonsi, Demi ...|         Music| 499946|           27.06|               0.71|               72.23|
|     j-hope 'Airplane' MV|         Music|5275672|           26.57|               0.12|               73.31|
|     dodie - Secret Fo...|         Music| 129130|           25.37|               0.08|               74.55|
|     Louis Tomlinson -...|         Music| 985998|           24.51|               0.08|               75.41|
|BTS (방탄소년단) 'FAKE...|         Music|5884233|           24.44|               0.11|               75.45|
|     5 Seconds Of Summ.

                                                                                

In [119]:
# write data into GCS
# each ranking keeps its first five rows and overwrites its own Delta folder
top_five_outputs = (
    (most_not_voted_film, "/five_most_not_voted_film"),
    (most_dislikes_film, "/five_most_dislikes_film"),
    (most_likes_film, "/five_most_likes_film"),
)
for ranked_frame, subfolder in top_five_outputs:
    ranked_frame.limit(5).write.mode("overwrite").format("delta").save(gcs_data_destination+subfolder)

                                                                                