# Data Preprocessing from MongoDB Database 1 to MongoDB Database 2

In [1]:
import os

from pyspark.sql import SparkSession

# Connection strings carry the MongoDB username/password, so they are read from
# the environment instead of being stored in the notebook. Expected shapes:
#   MONGO_INPUT_URI  = mongodb+srv://<user>:<password>@cluster0.3ygtx.mongodb.net/Database0.US_videos
#   MONGO_OUTPUT_URI = mongodb+srv://<user>:<password>@cluster0.qfnff.mongodb.net/Database0.US_pre
try:
    mongo_input_uri = os.environ["MONGO_INPUT_URI"]
    mongo_output_uri = os.environ["MONGO_OUTPUT_URI"]
except KeyError as missing:
    raise RuntimeError(
        "Set the %s environment variable before running this notebook" % missing
    ) from missing

# Stop the SparkSession that the notebook environment created automatically:
# once it is running, most of its configuration can no longer be changed.
# [Ref] https://www.edureka.co/community/5268/how-to-change-the-spark-session-configuration-in-pyspark
# On a kernel that does not pre-create `spark` there is nothing to stop.
try:
    spark.sparkContext.stop()
except NameError:
    pass

# Create a new SparkSession wired to the MongoDB input and output collections.
spark = (
    SparkSession.builder
    .appName("YouTube Trending Videos Analysis and Prediction")
    .config("spark.mongodb.input.uri", mongo_input_uri)
    .config("spark.mongodb.output.uri", mongo_output_uri)
    .getOrCreate()
)

spark  # display the session to confirm it was created successfully

In [2]:
# Read the collection addressed by the session's spark.mongodb.input.uri setting
# into a DataFrame (no per-read options are given, so the session config is used).
df = spark.read.format("mongo").load()

In [3]:
# Inspect the column names and types of the loaded collection.
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- description: string (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- title: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- video_id: string (nullable = true)
 |-- views: integer (nullable = true)



In [4]:
# Keep only these ten fields; every other column of the loaded collection
# (ids, description, thumbnail link, the *_disabled / error flags) is dropped.
selected_fields = [
    'trending_date', 'title', 'channel_title',
    'category_id', 'publish_time', 'tags',
    'views', 'likes', 'dislikes', 'comment_count',
]
df = df.select(*selected_fields)

In [8]:
# Preview the raw publish_time strings; truncate=False prints each value in full.
df.select('publish_time').show(truncate=False)

+------------------------+
|publish_time            |
+------------------------+
|2017-11-13T07:30:00.000Z|
|2017-11-12T18:01:41.000Z|
|2017-11-13T17:13:01.000Z|
|2017-11-12T05:37:17.000Z|
|2017-11-13T14:00:23.000Z|
|2017-11-13T13:45:16.000Z|
|2017-11-13T02:05:26.000Z|
|2017-11-13T03:00:00.000Z|
|2017-11-13T17:00:00.000Z|
|2017-11-12T14:00:00.000Z|
|2017-11-12T18:30:01.000Z|
|2017-11-13T20:09:58.000Z|
|2017-11-12T17:00:05.000Z|
|2017-11-13T19:07:23.000Z|
|2017-11-13T16:00:07.000Z|
|2017-11-13T11:00:04.000Z|
|2017-11-13T15:30:17.000Z|
|2017-11-12T22:00:01.000Z|
|2017-11-13T14:00:03.000Z|
|2017-11-12T15:00:01.000Z|
+------------------------+
only showing top 20 rows



In [10]:
from pyspark.sql.functions import col, column

# Force the id/count columns to integer type.
cols = ['category_id', 'views', 'likes', 'dislikes', 'comment_count']
# The loop variable is deliberately not named `col`: that would rebind the
# `col` function imported above to a plain string for the rest of the kernel.
for name in cols:
    df = df.withColumn(name, df[name].cast('int'))

df  # display the resulting column names and types

DataFrame[trending_date: string, title: string, channel_title: string, category_id: int, publish_time: string, tags: string, views: int, likes: int, dislikes: int, comment_count: int]

In [11]:
# Summary statistics (count, mean, stddev, min, max) for the engagement counters.
df.select('views','likes','dislikes','comment_count').describe().show()

+-------+------------------+-----------------+------------------+------------------+
|summary|             views|            likes|          dislikes|     comment_count|
+-------+------------------+-----------------+------------------+------------------+
|  count|             40949|            40949|             40949|             40949|
|   mean|2360784.6382573447| 74266.7024347359| 3711.400888910596| 8446.803682629612|
| stddev| 7394113.759703929|228885.3382094995|29029.705945001806|37430.486994379804|
|    min|               549|                0|                 0|                 0|
|    max|         225211923|          5613827|           1674420|           1361580|
+-------+------------------+-----------------+------------------+------------------+



In [12]:
# Same summary for the remaining columns; for string columns the useful rows
# are count, min and max.
df.select('trending_date','title','channel_title','category_id', 'publish_time','tags').describe().show()

+-------+-------------+--------------------+------------------------+-----------------+--------------------+------------------------+
|summary|trending_date|               title|           channel_title|      category_id|        publish_time|                    tags|
+-------+-------------+--------------------+------------------------+-----------------+--------------------+------------------------+
|  count|        40949|               40949|                   40949|            40949|               40949|                   40949|
|   mean|         null|               435.0|                    null|19.97242911914821|                null|                    null|
| stddev|         null|                 0.0|                    null|7.568326828280466|                null|                    null|
|    min|     17.01.12|#184 Making a PCB...|                 12 News|                1|2006-07-23T08:24:...|    #MeToo|"Grammys 2...|
|    max|     18.31.05|😱 $1,145 iPhone ...|영국남자 Korean Engli..

### Remove nulls

In [13]:
from pyspark.sql.functions import count, isnull, when

# Count, for every column, the number of rows in which that column is null.
cols = ['trending_date', 'title', 'channel_title',
        'category_id', 'publish_time', 'tags', 'views',
        'likes', 'dislikes', 'comment_count']

# A single aggregation computes all counts in one Spark job instead of running
# one filter + count job per column. `when(...)` without `otherwise` yields null
# for non-matching rows, and `count` ignores nulls, so each aggregate is the
# number of rows where the column is null.
null_counts = df.select(
    [count(when(isnull(name), 1)).alias(name) for name in cols]
).first()

# Print one count per line, in the same order as `cols`.
# (The loop variable is not named `col`, to avoid shadowing
# pyspark.sql.functions.col in the kernel namespace.)
for name in cols:
    print(null_counts[name])

0
0
0
0
0
0
0
0
0
0


In [14]:
# thresh=10 keeps only rows with at least 10 non-null values. df has exactly
# 10 columns at this point, so this counts the rows that have no null at all.
df.dropna(thresh=10).count()

40949

In [15]:
# df1=df.dropna(thresh=10)
# df1.count()

### Remove duplicates

In [17]:
# 1、删去完全重复的行
df2 = df.dropDuplicates() 
df2.count()

40901

In [18]:
# 2.删除某些关键字段值完全相同的记录，subset参数定义这些字段
df2 = df2.dropDuplicates(subset = 
                         [c for c in df2.columns if c in ['channel_title','likes', 'dislikes','views','comment_count']])
df2.count()

40898

### Outlier bounds (IQR rule)

In [19]:
# Compute per-column outlier fences with the 1.5 * IQR rule:
#   lower = Q1 - 1.5 * IQR,  upper = Q3 + 1.5 * IQR
cols = ['views', 'likes', 'dislikes', 'comment_count']

# Passing the whole list yields one [Q1, Q3] pair per column from a single call
# instead of one approxQuantile pass per column. The third argument (0.05) is
# the relative error of the approximation, so the quartiles are not exact.
quartiles = df2.approxQuantile(cols, [0.25, 0.75], 0.05)

bounds = {}
# The loop variable is not named `col`, to avoid shadowing
# pyspark.sql.functions.col in the kernel namespace.
for name, (q1, q3) in zip(cols, quartiles):
    iqr = q3 - q1
    bounds[name] = [
        q1 - 1.5 * iqr,
        q3 + 1.5 * iqr,
    ]

bounds  # display {column: [lower, upper]}

{'views': [-1737579.5, 3485976.5],
 'likes': [-56319.0, 107129.0],
 'dislikes': [-1931.5, 3712.5],
 'comment_count': [-5664.0, 10944.0]}

### Category titles (JSON category mapping)

In [22]:
# Preview the numeric category ids that will be mapped to category titles below.
df2.select('category_id','channel_title').show(truncate=False)

+-----------+---------------------+
|category_id|channel_title        |
+-----------+---------------------+
|25         |CBS New York         |
|24         |Looper               |
|26         |First We Feast       |
|23         |Smosh                |
|10         |carrieunderwoodVEVO  |
|24         |RM Videos            |
|23         |jacksfilms           |
|22         |FOX Soccer           |
|26         |Clevver Style        |
|24         |TheEllenShow         |
|15         |AntsCanada           |
|10         |Alan Walker          |
|10         |DopeBoyTroy          |
|28         |MinuteEarth          |
|15         |camelsandfriends     |
|24         |KTVU                 |
|24         |Warner Bros. Pictures|
|24         |Gibi ASMR            |
|17         |Philadelphia 76ers   |
|10         |R3HAB                |
+-----------+---------------------+
only showing top 20 rows



In [38]:
# Load the category lookup collection. The connection (cluster and credentials)
# comes from the session's spark.mongodb.input.uri setting; only the database
# and collection are overridden here, so no connection string is repeated.
j = (
    spark.read.format("mongo")
    .option("database", "Database0")
    .option("collection", "US_videos_cat")
    .load()
)

In [39]:
# j = spark.read.json('./US_category_id.json')

In [40]:
from pyspark.sql import functions as F

# Alternative source kept for reference (local file instead of MongoDB):
# j = spark.read.option("multiLine","true").json('./US_category_id.json')

# Turn the `items` array into one row per element (column `Exp_RESULTS`),
# then discard the original array column.
exploded_items = F.explode(F.col('items'))
explodej = j.withColumn('Exp_RESULTS', exploded_items).drop('items')

# From each exploded element keep the category title and the category id.
dfj = explodej.select(
    F.col('Exp_RESULTS.snippet.title'),
    F.col('Exp_RESULTS.id'),
)
dfj.show(truncate=False)

+---------------------+---+
|title                |id |
+---------------------+---+
|Film & Animation     |1  |
|Autos & Vehicles     |2  |
|Music                |10 |
|Pets & Animals       |15 |
|Sports               |17 |
|Short Movies         |18 |
|Travel & Events      |19 |
|Gaming               |20 |
|Videoblogging        |21 |
|People & Blogs       |22 |
|Comedy               |23 |
|Entertainment        |24 |
|News & Politics      |25 |
|Howto & Style        |26 |
|Education            |27 |
|Science & Technology |28 |
|Nonprofits & Activism|29 |
|Movies               |30 |
|Anime/Animation      |31 |
|Action/Adventure     |32 |
+---------------------+---+
only showing top 20 rows



In [41]:
# Rename the lookup columns: `title` becomes `category_title` and `id` becomes
# `category_id`.
dfj = (
    dfj
    .withColumnRenamed('title', 'category_title')
    .withColumnRenamed('id', 'category_id')
)

In [42]:
# Check the renamed lookup table (category_title, category_id).
dfj.show()

+--------------------+-----------+
|      category_title|category_id|
+--------------------+-----------+
|    Film & Animation|          1|
|    Autos & Vehicles|          2|
|               Music|         10|
|      Pets & Animals|         15|
|              Sports|         17|
|        Short Movies|         18|
|     Travel & Events|         19|
|              Gaming|         20|
|       Videoblogging|         21|
|      People & Blogs|         22|
|              Comedy|         23|
|       Entertainment|         24|
|     News & Politics|         25|
|       Howto & Style|         26|
|           Education|         27|
|Science & Technology|         28|
|Nonprofits & Acti...|         29|
|              Movies|         30|
|     Anime/Animation|         31|
|    Action/Adventure|         32|
+--------------------+-----------+
only showing top 20 rows



In [43]:
# Attach category_title to every video row. A left outer join keeps all rows of
# df2; rows whose category_id has no match in dfj get a null category_title.
df3 = df2.join(dfj, on='category_id', how='left_outer')

In [44]:
# Spot-check that the join filled in category_title next to category_id.
df3.select('category_id','channel_title','title','category_title').show()

+-----------+--------------------+--------------------+--------------------+
|category_id|       channel_title|               title|      category_title|
+-----------+--------------------+--------------------+--------------------+
|         28|      Dennis Kapatos|01/24/2018 - Falc...|Science & Technology|
|         28|     Because Science|The Most Toxic Ki...|Science & Technology|
|         28|Hydraulic Press C...|Crushing and Slic...|Science & Technology|
|         28|         MinuteEarth|Why You Shouldn't...|Science & Technology|
|         28|            PBS Eons|How the Squid Los...|Science & Technology|
|         28|         AsapSCIENCE|What If Your Airp...|Science & Technology|
|         28|  CrazyRussianHacker|6 Cheese Gadgets ...|Science & Technology|
|         28|       the Hacksmith|Make it Real: HUL...|Science & Technology|
|         28|           The Verge|Android P first look|Science & Technology|
|         28|         MinuteEarth|Milk Is Just Filt...|Science & Technology|

In [45]:
# Final schema of the preprocessed frame before it is written out.
df3.printSchema()

root
 |-- category_id: integer (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- category_title: string (nullable = true)



In [46]:
# Write the preprocessed frame to collection US_pre in database Database0.
# The write uses save mode "overwrite".
(
    df3.write
    .format("mongo")
    .mode("overwrite")
    .option("database", "Database0")
    .option("collection", "US_pre")
    .save()
)