In [1]:
import json
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import StructType

In [2]:
# Criando SparkSession
spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/") \
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')\
        .getOrCreate()

21/11/16 19:55:47 WARN Utils: Your hostname, alex-Inspiron-5566 resolves to a loopback address: 127.0.1.1; using 192.168.0.7 instead (on interface wlp2s0)
21/11/16 19:55:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/alex/Dev/pmd/venv/lib/python3.9/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/alex/.ivy2/cache
The jars for the packages stored in: /home/alex/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f526d84a-caa4-4a3f-95dc-cd42212e94a6;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 313ms :: artifacts dl 9ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |


In [4]:
lista_paises = ['CA', 'DE', 'FR', 'GB', 'IN', 'JP', 'KR', 'MX', 'RU', 'US']
base_name_csv = 'videos.csv'
base_name_json = '_category_id.json'
path = 'archive/'
videos = {}
categorias = {}
todos = None

In [5]:
colunas = ['video_id', 'trending_date', 'title', 'category_id', 'publish_time', 'views', 'likes', 'dislikes', \
           'comment_count']

In [6]:
# Separando os items do JSON
for pais in lista_paises:
    with open('original/'+ pais +'_category_id.json') as categories:
        obj = json.load(categories)
        items = obj['items']

        with open(path+ pais +'_category_id.json', 'w') as file:
           json.dump(items, file)

## Lendo os arquivos de vídeos

In [7]:
for pais in lista_paises:
    categorias[pais] = spark.read.format('org.apache.spark.sql.json').option("multiline","true") \
        .load(path+pais+base_name_json)\
        .select("id", f.col("snippet.title").alias('category_title'))
    
    videos[pais] = spark.read.format("csv").option("header", "true").load('original/'+pais+base_name_csv) \
        .select(colunas) \
        .join(categorias[pais], f.col('category_id') == f.col('id')) \
        .drop('category_id') \
        .drop('id')
    

CA


[Stage 0:>                                                          (0 + 1) / 1]                                                                                

DE
FR
GB
IN
JP
KR
MX
RU
US


## Resultados Leitura

In [8]:
## Mostra o schema do Dataframe
videos['US'].printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- category_title: string (nullable = true)



In [9]:
categorias['US'].printSchema()

root
 |-- id: string (nullable = true)
 |-- category_title: string (nullable = true)



In [10]:
categorias['US'].select('*').show()

+---+--------------------+
| id|      category_title|
+---+--------------------+
|  1|    Film & Animation|
|  2|    Autos & Vehicles|
| 10|               Music|
| 15|      Pets & Animals|
| 17|              Sports|
| 18|        Short Movies|
| 19|     Travel & Events|
| 20|              Gaming|
| 21|       Videoblogging|
| 22|      People & Blogs|
| 23|              Comedy|
| 24|       Entertainment|
| 25|     News & Politics|
| 26|       Howto & Style|
| 27|           Education|
| 28|Science & Technology|
| 29|Nonprofits & Acti...|
| 30|              Movies|
| 31|     Anime/Animation|
| 32|    Action/Adventure|
+---+--------------------+
only showing top 20 rows



### 3. Unindo Dataframes

In [11]:
schema = videos['US'].schema
df_videos = spark.createDataFrame([], schema)

In [12]:
df_videos.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- category_title: string (nullable = true)



In [13]:
for pais in lista_paises:
    df_videos = df_videos.union(videos[pais])

In [14]:
df_videos = df_videos.withColumn('views', f.col('views').cast('int')) \
                     .withColumn('likes', f.col('likes').cast('int')) \
                     .withColumn('dislikes', f.col('dislikes').cast('int')) \
                     .withColumn('comment_count', f.col('comment_count').cast('int')) \
                     .withColumn('trending_date', f.to_timestamp(f.col('trending_date'), 'yy.dd.MM')) \
                     .withColumn('publish_time', f.regexp_replace('publish_time', 'T', ' ')) \
                     .withColumn('publish_time', f.regexp_replace('publish_time', 'Z', '')) \
                     .withColumn('publish_time', f.to_timestamp(f.col('publish_time'), 'yyyy-MM-dd HH:mm:ss.SSS'))\
                     .withColumn("trending_date", f.col("trending_date") + f.expr("INTERVAL 1 days"))
    

In [15]:
df_videos.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: timestamp (nullable = true)
 |-- title: string (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- category_title: string (nullable = true)



### 4. Lidando com valores nulos

In [16]:
df_videos = df_videos.na.fill(value=0,subset=['comment_count', 'likes', 'dislikes'])

### 4.2 Deixando valores únicos

In [17]:
df_interations = df_videos.groupBy('video_id')\
            .agg(f.max('views').alias('max_views'),
                 f.max('likes').alias('max_likes'),
                 f.max('dislikes').alias('max_dislikes'),
                 f.max('comment_count').alias('max_comment_count'),
                 f.min('publish_time').alias('min_publish_time'),
                 f.min('trending_date').alias('min_trending_date'))\
            .withColumnRenamed('video_id', 'video_id_temp')




In [18]:
df_videos = df_videos.join(df_interations, f.col('video_id') == f.col('video_id_temp'))\
            .dropDuplicates(['video_id']) \
            .drop("views", "likes", "dislikes", "comment_count", "trending_date", 'publish_time', 'video_id_temp') \
            .withColumnRenamed('max_views', 'views') \
            .withColumnRenamed('max_dislikes', 'dislikes') \
            .withColumnRenamed('max_comment_count', 'comment_count') \
            .withColumnRenamed('min_trending_date', 'trending_date') \
            .withColumnRenamed('min_publish_time', 'publish_time') \
            .withColumnRenamed('max_likes', 'likes')

In [19]:
df_videos.count()

                                                                                

182317

### 5. Criando coluna interations

In [20]:
df_videos = df_videos.withColumn('interation', f.col('likes')+f.col('dislikes')+f.col('comment_count'))

In [21]:
df_videos.select('video_id', 'comment_count', 'likes', 'dislikes', 'interation').show(truncate=False)



+-----------+-------------+------+--------+----------+
|video_id   |comment_count|likes |dislikes|interation|
+-----------+-------------+------+--------+----------+
|-H90GPnH1q8|4907         |0     |0       |4907      |
|-mCPxSHIrPc|700          |7246  |840     |8786      |
|02QdxSLdVQc|2840         |16337 |240     |19417     |
|08-n4j46okM|215          |8030  |64      |8309      |
|08URtcZ8em0|461          |2962  |45      |3468      |
|0BbDgMYIiEU|438          |9501  |899     |10838     |
|0Bl7xVD7Xtg|159          |698   |25      |882       |
|0C6b6U9fz68|0            |0     |0       |0         |
|0IVFegsnfwI|92           |367   |46      |505       |
|0MhAoY5piug|137          |491   |104     |732       |
|0UTg35xNT1s|126          |101   |42      |269       |
|0YyNLjWjJ_c|19           |741   |23      |783       |
|0ZBz_1PCpOw|313          |1523  |52      |1888      |
|0bXCbVGb04A|47646        |200284|6154    |254084    |
|0vxqPjiOHnQ|10           |225   |1       |236       |
|11K013qpR



### 6. Criando coluna time_to_trends

In [22]:
df_videos = df_videos.withColumn('time_to_trends', f.col('trending_date').cast('long') - f.col('publish_time').cast('long'))

In [23]:
df_videos.select('video_id', 'trending_date', 'views', 'category_title', 'publish_time', 'time_to_trends').show(truncate=False)



+-----------+-------------------+-------+---------------+-------------------+--------------+
|video_id   |trending_date      |views  |category_title |publish_time       |time_to_trends|
+-----------+-------------------+-------+---------------+-------------------+--------------+
|-H90GPnH1q8|2017-12-12 00:00:00|968586 |Entertainment  |2017-12-11 03:40:15|73185         |
|-mCPxSHIrPc|2017-12-13 00:00:00|119872 |Entertainment  |2017-12-11 14:30:58|120542        |
|02QdxSLdVQc|2018-01-25 00:00:00|222149 |People & Blogs |2018-01-23 22:10:38|92962         |
|08-n4j46okM|2018-02-09 00:00:00|63894  |People & Blogs |2018-02-07 16:42:46|112634        |
|08URtcZ8em0|2018-05-15 00:00:00|28982  |Entertainment  |2018-05-13 14:56:59|118981        |
|0BbDgMYIiEU|2017-12-27 00:00:00|162866 |Entertainment  |2017-12-25 15:48:41|115879        |
|0Bl7xVD7Xtg|2018-01-25 00:00:00|8276   |Entertainment  |2018-01-24 01:00:02|82798         |
|0C6b6U9fz68|2018-01-06 00:00:00|26350  |Education      |2018-01-04 04

                                                                                

In [24]:
df_videos.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- category_title: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- trending_date: timestamp (nullable = true)
 |-- interation: integer (nullable = true)
 |-- time_to_trends: long (nullable = true)



### 7. Criando Dataframe de Categoria

In [25]:
categorias_text = df_videos.select('category_title').distinct().collect()
ids = [(idx+1, row.category_title) for idx, row in enumerate(categorias_text)]
ids

                                                                                

[(1, 'Shows'),
 (2, 'Education'),
 (3, 'Gaming'),
 (4, 'Entertainment'),
 (5, 'Travel & Events'),
 (6, 'Science & Technology'),
 (7, 'Sports'),
 (8, 'Howto & Style'),
 (9, 'Nonprofits & Activism'),
 (10, 'Film & Animation'),
 (11, 'People & Blogs'),
 (12, 'News & Politics'),
 (13, 'Pets & Animals'),
 (14, 'Movies'),
 (15, 'Autos & Vehicles'),
 (16, 'Music'),
 (17, 'Comedy'),
 (18, 'Trailers')]

In [26]:
df_categorias = spark.createDataFrame(ids).toDF('id', 'category_name')


In [27]:
df_categorias.show()

+---+--------------------+
| id|       category_name|
+---+--------------------+
|  1|               Shows|
|  2|           Education|
|  3|              Gaming|
|  4|       Entertainment|
|  5|     Travel & Events|
|  6|Science & Technology|
|  7|              Sports|
|  8|       Howto & Style|
|  9|Nonprofits & Acti...|
| 10|    Film & Animation|
| 11|      People & Blogs|
| 12|     News & Politics|
| 13|      Pets & Animals|
| 14|              Movies|
| 15|    Autos & Vehicles|
| 16|               Music|
| 17|              Comedy|
| 18|            Trailers|
+---+--------------------+



### 8. Atualizando ID das categorias no DataFrame de Videos

In [28]:
df_videos = df_videos.join(df_categorias, f.col('category_title') == f.col('category_name')) \
                    .withColumnRenamed('id', 'category_id') \
                    .drop('category_title')

In [29]:
df_videos.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- trending_date: timestamp (nullable = true)
 |-- interation: integer (nullable = true)
 |-- time_to_trends: long (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_name: string (nullable = true)



### 9. Criar coluna qtd_view no DataFrame de Categorias

In [30]:
df_categorias = df_categorias.join( \
                        df_videos.groupBy('category_id').agg(f.sum('views').alias('qtd_views')), \
                        f.col('id') == f.col('category_id')) \
                    .drop('category_id')         

In [None]:
df_categorias.show()

[Stage 117:>(17 + 4) / 44][Stage 118:> (0 + 0) / 44][Stage 120:>  (0 + 0) / 4]

In [None]:
df_categorias.printSchema()

## Salvando dados no MongoDB

In [None]:
df_categorias.write.format("mongo").mode("overwrite").option("database","youtube_trending")\
            .option("collection", "categorias").save()

In [None]:
df_videos.write.format("mongo").mode("overwrite").option("database","youtube_trending") \
            .option("collection", "videos").save()