# task 3

In [2]:
import pandas as pd
import pyspark
import numpy as np
from pyspark.sql import SparkSession
from os.path import join

In [3]:
# Keep only the first 38,000 data rows of the raw CSV and write them back out
# (without the pandas index) as a separate file.
# NOTE(review): 'GBvideos_fixed.csv' is not read anywhere in the visible
# notebook; the Spark load uses 'GBvideos.csv' -- confirm which is intended.
source_csv, trimmed_csv = 'GBvideos.csv', 'GBvideos_fixed.csv'
d = pd.read_csv(source_csv, nrows=38000)
d.to_csv(trimmed_csv, index=False)

In [4]:
# Import SparkSession
from pyspark.sql import SparkSession 

# Build (or reuse) a SparkSession that runs locally on all available cores.
spark = (
    SparkSession.builder
    .appName("my spark app")
    .master("local[*]")
    .getOrCreate()
)

In [5]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DateType, TimestampType, LongType


# Explicit column layout for the trending-videos CSV, in file column order.
# Each .add(name, type, nullable) appends one field; only video_id is
# declared nullable.
schema = (
    StructType()
    .add("video_id", StringType(), True)
    .add("trending_date", StringType(), False)
    .add("title", StringType(), False)
    .add("channel_title", StringType(), False)
    .add("category_id", IntegerType(), False)
    .add("publish_time", TimestampType(), False)
    .add("tags", StringType(), False)
    .add("views", LongType(), False)
    .add("likes", IntegerType(), False)
    .add("dislikes", IntegerType(), False)
    .add("comment_count", IntegerType(), False)
    .add("thumbnail_link", StringType(), False)
    .add("comments_disabled", BooleanType(), False)
    .add("ratings_disabled", BooleanType(), False)
    .add("video_error_or_removed", BooleanType(), False)
    .add("description", StringType(), False)
)

# Read the CSV (first line is the header) using the explicit schema, then
# register the result as the temporary view "videos" for the SQL cells.
# spark.read.csv(...) would work equally well here.
df = (
    spark.read.format("csv")
    .option("header", True)
    .schema(schema)
    .load("GBvideos.csv")
)
df.createOrReplaceTempView("videos")

In [6]:
# question 1
# Per video_id: total likes minus total dislikes over all rows of that video,
# largest difference first; the top 10 are displayed.
likes_gap_query = """ 
        SELECT video_id,  SUM(likes) - SUM(dislikes) as dis_likes_difference from videos
        GROUP BY video_id
        ORDER BY dis_likes_difference DESC;          
          """
spark.sql(likes_gap_query).show(10)

+-----------+--------------------+
|   video_id|dis_likes_difference|
+-----------+--------------------+
|VYOjWnS4cMY|           138849717|
|xpVfcZ0ZcFM|           115703926|
|kLpH1nSLJSs|           101196276|
|ffxKSjUwKdU|            93591008|
|7C2z4GqqS5E|            69445285|
|_I_D_8Z4sJE|            57760713|
|kX0vO4vlJuU|            52999142|
|tCXGJQYZ9JA|            52839411|
|8O_MwlZ2dEg|            52579739|
|9jI-z9QN6g8|            50715687|
+-----------+--------------------+
only showing top 10 rows



In [7]:
# question 2
# Average likes per channel, highest average first; the top 10 are displayed.
avg_likes_query = """ 
SELECT channel_title, AVG(likes) as avg_likes
FROM videos
GROUP BY channel_title
ORDER BY avg_likes DESC;
          """
spark.sql(avg_likes_query).show(10)

+-------------------+------------------+
|      channel_title|         avg_likes|
+-------------------+------------------+
|ChildishGambinoVEVO| 4122446.027777778|
|          Bad Bunny| 2957022.972222222|
|   ArianaGrandeVevo| 2786633.285714286|
|      LuisFonsiVEVO|         2686169.0|
|          DrakeVEVO|         2389258.0|
|            ibighit| 2359758.066666667|
|  YouTube Spotlight| 1935720.037037037|
|      Flow La Movie|1621740.7272727273|
|         BeckyGVEVO| 1439571.619047619|
|         Ed Sheeran|     1423959.96875|
+-------------------+------------------+
only showing top 10 rows



In [8]:
# Channels whose summed views exceed 10^6, ordered by total views descending;
# the top 10 are displayed.
total_views_query = """
SELECT channel_title, SUM(views) as total_views_count
FROM videos
group by channel_title
having SUM(views) > POWER(10, 6)
ORDER BY  total_views_count DESC;          
          """
spark.sql(total_views_query).show(10)



+--------------------+-----------------+
|       channel_title|total_views_count|
+--------------------+-----------------+
|          NickyJamTV|       8516190092|
|               Ozuna|       8305198063|
|           Bad Bunny|       6891280759|
|           DrakeVEVO|       6581834413|
| ChildishGambinoVEVO|       6101309613|
|       Flow La Movie|       5151438858|
|    ArianaGrandeVevo|       4107436350|
|Marvel Entertainment|       3993421220|
|    jypentertainment|       3624070589|
|          Ed Sheeran|       3589056783|
+--------------------+-----------------+
only showing top 10 rows



In [9]:
from pyspark.sql.functions import split, size
# Number of '|'-separated pieces in each row's tags string.
tags_per_video = size(split(df.tags, r'\|', -1)).alias('tag_count')

# Mean tag count per channel, highest first; the top 10 are displayed.
(
    df.select('channel_title', tags_per_video)
    .groupBy("channel_title")
    .mean()
    .orderBy('avg(tag_count)', ascending=False)
    .show(10)
)

+------------------------+-----------------+
|           channel_title|   avg(tag_count)|
+------------------------+-----------------+
|          특이한동물채널|             78.0|
|                CCTV春晚|             67.0|
|                theCHIVE|             63.0|
|                 Sanadsk|             61.0|
|             jessiepaege|             59.0|
|영국남자 Korean Engli...|             58.0|
|               Jake Paul|56.27272727272727|
|         Rosanna Pansino|             56.0|
|                    KARD|             56.0|
|           Daniel Howell|             53.0|
+------------------------+-----------------+
only showing top 10 rows



In [10]:
from pyspark.sql.functions import explode
# Temporary frame whose single column holds the tags string split on '|'
# into an array.
df_temp = df.select(split(df.tags, r'\|', -1).alias('tags_list'))

# One row per individual tag, counted per tag value; the '[none]' placeholder
# is dropped and the 10 most frequent tags are displayed.
(
    df_temp.select(explode('tags_list').alias('individual_tag'))
    .groupBy('individual_tag')
    .count()
    .where("individual_tag != '[none]'")
    .orderBy('count', ascending=False)
    .show(10)
)

+--------------+-----+
|individual_tag|count|
+--------------+-----+
|       "funny"| 2629|
|      "comedy"| 2322|
|       "music"| 1972|
|         "Pop"| 1556|
|        "2018"| 1383|
|       "video"| 1189|
| "music video"| 1159|
|     "Records"| 1137|
|   "interview"| 1124|
|    "official"| 1106|
+--------------+-----+
only showing top 10 rows



In [11]:
# Largest single-row comment_count seen for each channel, highest first;
# the top 10 are displayed.
max_comments_query = """ 

SELECT channel_title, MAX(comment_count) as max_comments_count
FROM videos
GROUP BY channel_title
ORDER BY max_comments_count DESC

          """
spark.sql(max_comments_query).show(10)

+--------------------+------------------+
|       channel_title|max_comments_count|
+--------------------+------------------+
|    Logan Paul Vlogs|           1626501|
|             ibighit|           1228655|
|   YouTube Spotlight|            845233|
| ChildishGambinoVEVO|            553371|
|Marvel Entertainment|            368739|
|           DrakeVEVO|            301756|
|         Lucas Lucco|            275795|
|    jypentertainment|            274087|
|    ArianaGrandeVevo|            259613|
|          HowToBasic|            225618|
+--------------------+------------------+
only showing top 10 rows



# task 1

In [1]:
import pyspark 

import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from os.path import join

In [3]:
# Import SparkSession
from pyspark.sql import SparkSession 

# Build (or reuse) a SparkSession that runs locally on all available cores.
spark = (
    SparkSession.builder
    .appName("my spark app")
    .master("local[*]")
    .getOrCreate()
)

# Handle to the underlying SparkContext, needed for the RDD API.
sc = spark.sparkContext


In [None]:
# Build a {category_id (int): title (str)} lookup from the category JSON file.
import re

json_object = sc.textFile("GB_category_id.json")
# Re-join the lines into a single string so the regexes scan the whole document.
json_reduced = json_object.reduce(lambda x, y: "\n".join([x,y]))

# Capture the quoted value that directly follows each key. The title pattern
# accepts any characters up to the closing quote: the earlier `"(\w+)"` form
# could not match titles containing spaces or '&', which left fewer titles
# than ids and made zip() pair ids with the wrong titles.
a = re.findall(r'"id"\s*:\s*"(\w+)"', json_reduced)
b = re.findall(r'"title"\s*:\s*"([^"]*)"', json_reduced)

# zip() truncates silently, so refuse to build a misaligned mapping.
if len(a) != len(b):
    raise ValueError(
        "found %d ids but %d titles in GB_category_id.json" % (len(a), len(b))
    )
categories = dict(zip(map(int, a), b))

print(categories)

In [None]:
# Load the French dataset; only the header option is set (no schema given).
df = spark.read.option("header", True).csv("FRvideos.csv")
rdd = df.rdd

# Reduce every Row to a (title, views, channel_title) tuple.
rdd0 = rdd.map(lambda row: (row['title'], row['views'], row['channel_title']))

# Group those tuples by channel_title (tuple position 2).
rdd1 = rdd0.groupBy(lambda record: record[2])


# task 2

In [1]:
# Import SparkSession
from pyspark.sql import SparkSession 

# Build (or reuse) a SparkSession that runs locally on all available cores.
spark = (
    SparkSession.builder
    .appName("my spark app")
    .master("local[*]")
    .getOrCreate()
)

# Banner separating this question's output.
print("\n\nQUESTION 1")
print("#" * 100)


from pyspark.sql.types import *
# (column name, Spark type) pairs in CSV column order; every field is nullable.
_video_columns = [
  ('video_id', StringType()),
  ('trending_date', DateType()),
  ('title', StringType()),
  ('channel_title', StringType()),
  ('category_id', IntegerType()),
  ('publish_time', TimestampType()),
  ('tags', StringType()),
  ('views', LongType()),
  ('likes', LongType()),
  ('dislikes', LongType()),
  ('comment_count', LongType()),
  ('thumbnail_link', StringType()),
  ('comments_disabled', BooleanType()),
  ('ratings_disabled', BooleanType()),
  ('video_error_or_removed', BooleanType()),
  ('description', StringType()),
]
schema = StructType([StructField(name, dtype, True) for name, dtype in _video_columns])

from pyspark.sql.functions import col, split

# Read the French dataset with the explicit schema; the first line is the header.
videos_raw = spark.read.schema(schema).option("header", True).csv("FRvideos.csv")

# Replace the '|'-delimited tags string with an array of tag strings.
df = videos_raw.withColumn('tags', split(col('tags'), r'\|'))
df.printSchema()




QUESTION 1
####################################################################################################
root
 |-- video_id: string (nullable = true)
 |-- trending_date: date (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- views: long (nullable = true)
 |-- likes: long (nullable = true)
 |-- dislikes: long (nullable = true)
 |-- comment_count: long (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: boolean (nullable = true)
 |-- ratings_disabled: boolean (nullable = true)
 |-- video_error_or_removed: boolean (nullable = true)
 |-- description: string (nullable = true)



In [None]:
print("\n\nQUESTION 2")
print("#" * 100)

# NOTE(review): this wildcard import supplies avg/corr/size/explode/desc/udf
# for the cells that follow, but it also rebinds names such as sum, max and
# min to their Spark column functions -- consider explicit imports.
from pyspark.sql.functions import *

# Likes minus dislikes for videos published before 2010-01-01, largest
# difference first; the top 10 are displayed.
likes_minus_dislikes = (df.likes - df.dislikes).alias('delta')
(
    df.select('video_id', 'publish_time', likes_minus_dislikes)
    .where(df.publish_time < '2010-01-01')
    .orderBy('delta', ascending=False)
    .show(10)
)


# NOTE(review): this banner repeats "QUESTION 2" -- confirm the intended label.
print("\n\nQUESTION 2")
print("#" * 100)


# Average likes per channel, highest first; the top 5 are displayed.
(
    df.groupby('channel_title')
    .agg(avg('likes').alias('average_likes'))
    .orderBy('average_likes', ascending=False)
    .show(5)
)


In [None]:
print("\n\nQUESTION 3")
print("#" * 100)

# Channels whose summed views exceed 1,000,000; 15 rows are displayed, with
# no ordering applied.
(
    df.select('channel_title', 'views')
    .groupby('channel_title')
    .sum()
    .filter('sum(views) > 1000000')
    .show(15)
)

In [8]:
# Correlation between the likes and views columns over the whole frame,
# shown as a single-row table.
likes_views_corr = corr('likes', 'views').alias('correlation')
df.agg(likes_views_corr).show()

+------------------+
|       correlation|
+------------------+
|0.8125691299632112|
+------------------+



In [15]:
# Mean length of the tags array per channel, highest first; the top 10 are
# displayed.
(
    df.select('channel_title', size('tags').alias('tag_count'))
    .groupBy("channel_title")
    .mean()
    .orderBy('avg(tag_count)', ascending=False)
    .show(10)
)

+--------------------+-----------------+
|       channel_title|   avg(tag_count)|
+--------------------+-----------------+
|        Yoann Leroux|             78.0|
|     Official Hyolyn|             76.5|
|       Ficko Primera|             75.0|
|        CocoCam Jump|72.33333333333333|
|         RallyPasion|             72.0|
|               TraKz|             71.5|
|Joshua - Réalisat...|             68.0|
|       Zvezde Granda|             67.0|
|            Voltor63|             67.0|
|   Checkpoint Rallye|             65.0|
+--------------------+-----------------+
only showing top 10 rows



In [23]:
# One row per individual tag, counted per tag value; the '[none]' placeholder
# is dropped and the 10 most frequent tags are shown under 'frequency'.
(
    df.select(explode('tags').alias('individual_tag'))
    .groupBy('individual_tag')
    .count()
    .where("individual_tag != '[none]'")
    .orderBy('count', ascending=False)
    .withColumnRenamed('count', 'frequency')
    .show(10)
)

+--------------+---------+
|individual_tag|frequency|
+--------------+---------+
|    ""humour""|     1513|
|      ""2018""|      963|
|  ""football""|      791|
|    ""france""|      599|
|      ""2017""|      529|
|       ""rap""|      481|
|     ""video""|      463|
| ""freestyle""|      456|
|   ""musique""|      444|
|      ""live""|      436|
+--------------+---------+
only showing top 10 rows



In [16]:
# Largest single-row comment_count for each channel, highest first;
# the top 10 are displayed.
(
    df.groupBy("channel_title")
    .max("comment_count")
    .orderBy(desc("max(comment_count)"))
    .show(10)
)

+--------------------+------------------+
|       channel_title|max(comment_count)|
+--------------------+------------------+
|             ibighit|           1040912|
|   YouTube Spotlight|            827755|
|    Logan Paul Vlogs|            611327|
|           AuronPlay|            315801|
|        David Dobrik|            264405|
| ChildishGambinoVEVO|            232723|
|Marvel Entertainment|            177598|
|    ArianaGrandeVevo|            176926|
|            Lil pump|            161259|
|         AsapSCIENCE|            141646|
+--------------------+------------------+
only showing top 10 rows



In [18]:
import re
def match(description):
    """Extract URLs and @-mentions from a description string.

    Args:
        description: The text to scan, or None.

    Returns:
        A two-element list ``[links, mentions]``. ``links`` holds every
        http/https URL found and ``mentions`` every ``@name`` token that is
        not directly preceded by a word character. Both lists are empty when
        ``description`` is None.
    """
    if description is None:
        return [[], []]

    links = re.findall(
        r"https?:\/\/(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b(?:[-a-zA-Z0-9()@:%_\+.~#?&//=]*)",
        description,
    )
    # Raw string: the pattern is unchanged, but the non-raw form relied on
    # invalid escape sequences that newer Python versions warn about.
    mentions = re.findall(r"\B\@\w+", description)
    return [links, mentions]

# Wrap `match` as a Spark UDF whose declared return type is an array of
# string arrays.
regex_udf = udf(f=match, returnType=ArrayType(ArrayType(StringType())))

In [None]:
# Apply the UDF to each description; its output column is referenced below
# under the name "match(description)". Rows missing a title or description
# are dropped, the two inner arrays become 'links' and 'mentions', and the
# result is shown ordered by title without the helper columns.
(
    df.select("title", "description", regex_udf(col("description")))
    .filter(col("title").isNotNull() & col("description").isNotNull())
    .withColumn("links", col("match(description)").getItem(0))
    .withColumn("mentions", col("match(description)").getItem(1))
    .orderBy("title")
    .drop("description")
    .drop("match(description)")
    .show()
)

In [None]:
# Same extraction, written with a SQL filter string and a final projection:
# keep rows with both title and description, split the UDF output into
# 'links' and 'mentions', order by title, and show title/mentions/links.
(
    df.select("title", "description", regex_udf(col("description")))
    .where('title is not null and description is not null')
    .withColumn("links", col("match(description)").getItem(0))
    .withColumn("mentions", col("match(description)").getItem(1))
    .orderBy("title")
    .select('title', 'mentions', 'links')
    .show()
)