Author: KMonzella <br>
Date: 2021/11/06 <br>
Subject: Data Cleaning in Spark <br>
Overview: In our 02 program, we pulled our YouTube data from links generated in earlier programs. We are now going to perform preliminary data cleaning on the raw files and output clean analysis files with all constructs except some NLP-based constructs which will be generated separately.
    

## Step 1. Program setup

In [1]:
import pandas                as pd
import numpy                 as np
import matplotlib.pyplot     as plt
import seaborn               as sns
from   scipy                 import stats
from pyspark.sql.functions   import *
import pyspark.sql.functions as f
from pyspark.sql.types       import IntegerType,BooleanType,DateType
from pyspark.sql.functions   import rank, col,approxCountDistinct, countDistinct

#change configuration settings on Spark 
spark = SparkSession.builder.appName('SparkBasics').getOrCreate()

conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '200g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '8'), ('spark.cores.max', '8'), ('spark.driver.memory','30g')])

#print spark configuration settings
sc   = spark.sparkContext
spark

## Step 2: Read in CSV input file

In [2]:
%time df = spark.read.format("org.apache.spark.csv").option("multiline", "true").csv("/user/kmonzella/data/youtube_raw_20211120.csv", inferSchema=True, header=True, quote='\"', sep = ",", escape='"')
df.printSchema() 


CPU times: user 8.65 ms, sys: 3.04 ms, total: 11.7 ms
Wall time: 36.8 s
root
 |-- _c0: integer (nullable = true)
 |-- author: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- description: string (nullable = true)
 |-- publish_date: string (nullable = true)
 |-- watch_url: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- metadata: string (nullable = true)
 |-- stream_info: string (nullable = true)
 |-- age_restricted: string (nullable = true)
 |-- length: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- title: string (nullable = true)
 |-- views: string (nullable = true)
 |-- transcript: string (nullable = true)
 |-- language_detected: string (nullable = true)



In [3]:
# check what data look like
df.show(3)

+---+--------------------+--------------------+--------------------+------------+--------------------+--------------------+--------+--------------------+--------------+------+------+--------------------+-----+--------------------+-----------------+
|_c0|              author|             channel|         description|publish_date|           watch_url|            keywords|metadata|         stream_info|age_restricted|length|rating|               title|views|          transcript|language_detected|
+---+--------------------+--------------------+--------------------+------------+--------------------+--------------------+--------+--------------------+--------------+------+------+--------------------+-----+--------------------+-----------------+
|  0|What Does That Mean?|UCS3RXL1ICt42KV-E...|What does proleta...|    20150105|https://youtube.c...|proletarianism|de...|      []|[<Stream: itag="1...|         FALSE|    59|     1|What does proleta...|  174|What does proleta...|               en|
|  1

In [4]:
# total record count
df.count()

411829

## Step 3: Preliminary cleaning

### A. Check for duplicates

In [5]:
df = df.dropDuplicates(['watch_url'])

In [6]:
df.count()

376842

### B. Check for other missing data

In [7]:
# check missingness
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+------+-------+-----------+------------+---------+--------+--------+-----------+--------------+------+------+-----+-----+----------+-----------------+
|_c0|author|channel|description|publish_date|watch_url|keywords|metadata|stream_info|age_restricted|length|rating|title|views|transcript|language_detected|
+---+------+-------+-----------+------------+---------+--------+--------+-----------+--------------+------+------+-----+-----+----------+-----------------+
|  0|     6|      1|      33952|           1|        1|   93444|       6|          7|             8|     9|    10|   13|   12|        33|                0|
+---+------+-------+-----------+------------+---------+--------+--------+-----------+--------------+------+------+-----+-----+----------+-----------------+



In [None]:
# there are the overflow from the cut off transcripts. As long as this is low prevalence, drop
percent_nonmissing = df.filter(df.watch_url.startswith('https://youtube.com/watch?v=')).count() * 100 / df.count()
print(percent_nonmissing)

df1 = df.filter(df.watch_url.startswith('https://youtube.com/watch?v='))


In [9]:
# recheck missingness
df1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df1.columns]).show()

+---+------+-------+-----------+------------+---------+--------+--------+-----------+--------------+------+------+-----+-----+----------+-----------------+
|_c0|author|channel|description|publish_date|watch_url|keywords|metadata|stream_info|age_restricted|length|rating|title|views|transcript|language_detected|
+---+------+-------+-----------+------------+---------+--------+--------+-----------+--------------+------+------+-----+-----+----------+-----------------+
|  0|     5|      0|      33951|           0|        0|   93440|       1|          1|             1|     1|     1|    2|    1|        22|                0|
+---+------+-------+-----------+------------+---------+--------+--------+-----------+--------------+------+------+-----+-----+----------+-----------------+



In [10]:
# address other missingness - transcripts

df1 = df1.withColumn("transcript", f.when(col("transcript")=='No transcript available', None).otherwise(col("transcript")))


In [11]:
df1.select("transcript").show()

+--------------------+
|          transcript|
+--------------------+
|                null|
|hey what's up guy...|
|Oh hey guys move ...|
|                null|
|[Music] [Music] [...|
|should I there gu...|
|                null|
|                null|
|[Music] welcome t...|
|                null|
|the last couple o...|
|hey everyone welc...|
|[Music] hello eve...|
|hey what's up you...|
|EMCEE: And now fo...|
|well we're going ...|
|                null|
|hey this is tom a...|
|or the table mate...|
|                null|
+--------------------+
only showing top 20 rows



### C. Check and change data types

In [12]:
# --- numeric

cols = ["length", "rating", "views"]
for col in cols:
    df1 = df1.withColumn(
            col,
            f.col(col).cast("double"))


In [13]:
# --- dates

df1 = df1.withColumn("publish_date_new", to_date("publish_date", "yyyyMMdd"))

print(df1.select(["publish_date", "publish_date_new"]).show(10)) 

+------------+----------------+
|publish_date|publish_date_new|
+------------+----------------+
|    20170207|      2017-02-07|
|    20210906|      2021-09-06|
|    20110916|      2011-09-16|
|    20211001|      2021-10-01|
|    20211109|      2021-11-09|
|    20141109|      2014-11-09|
|    20121211|      2012-12-11|
|    20170114|      2017-01-14|
|    20210319|      2021-03-19|
|    20180920|      2018-09-20|
+------------+----------------+
only showing top 10 rows

None


In [14]:
df1 = df1.drop("publish_date")

In [15]:
# --- booleans

df1 = df1.withColumn("age_restricted",  df1.age_restricted.cast(BooleanType()))

## Step 4: Create new variables

### A. Indicators of whether there is a song in the video

In [16]:
df1 = df1.withColumn("has_music", f.when(f.col("metadata").contains('Song'), 1.0).otherwise(0.0))

In [17]:
df1.filter("has_music==1").select(["has_music", "metadata"]).show(10)
df1.filter("has_music==0").select(["has_music", "metadata"]).show(10)

+---------+--------------------+
|has_music|            metadata|
+---------+--------------------+
|      1.0|[{"Song": "Broad ...|
|      1.0|[{"Song": "Dead H...|
|      1.0|[{"Song": "First ...|
|      1.0|[{"Song": "Africa...|
|      1.0|[{"Song": "Runnin...|
|      1.0|[{"Song": "19th F...|
|      1.0|[{"Song": "Chills...|
|      1.0|[{"Song": "Ankara...|
|      1.0|[{"Song": "My Swe...|
|      1.0|[{"Song": "Half E...|
+---------+--------------------+
only showing top 10 rows

+---------+--------+
|has_music|metadata|
+---------+--------+
|      0.0|      []|
|      0.0|      []|
|      0.0|      []|
|      0.0|      []|
|      0.0|      []|
|      0.0|      []|
|      0.0|      []|
|      0.0|      []|
|      0.0|      []|
|      0.0|      []|
+---------+--------+
only showing top 10 rows



In [18]:
# transcript has sound effects
df1 = df1.withColumn("has_sounds", f.when(f.col("transcript").contains('[Music]'), 1.0).otherwise(0.0))
df1.filter("has_sounds==1").select(["has_sounds", "transcript"]).show(10)
df1.filter("has_sounds==0").select(["has_sounds", "transcript"]).show(10)

+----------+--------------------+
|has_sounds|          transcript|
+----------+--------------------+
|       1.0|hey what's up guy...|
|       1.0|[Music] [Music] [...|
|       1.0|[Music] welcome t...|
|       1.0|the last couple o...|
|       1.0|hey everyone welc...|
|       1.0|[Music] hello eve...|
|       1.0|you go aiden you'...|
|       1.0|[Music] welcome b...|
|       1.0|[Music] do [Music...|
|       1.0|game one on the r...|
+----------+--------------------+
only showing top 10 rows

+----------+--------------------+
|has_sounds|          transcript|
+----------+--------------------+
|       0.0|                null|
|       0.0|Oh hey guys move ...|
|       0.0|                null|
|       0.0|should I there gu...|
|       0.0|                null|
|       0.0|                null|
|       0.0|                null|
|       0.0|hey what's up you...|
|       0.0|EMCEE: And now fo...|
|       0.0|well we're going ...|
+----------+--------------------+
only showing top 10 ro

### B. Indicators related to streaming quality

In [19]:
# not needed at this time

### C. Indicators related to duration - binary

In [20]:
df1 = df1.withColumn("duration_long",   f.when(f.col('length') >= 1800, 1).otherwise(0))
df1 = df1.withColumn("duration_med",    f.when((f.col('length') < 1800) & (f.col('length')>=600), 1).otherwise(0))
df1 = df1.withColumn("duration_short",  f.when(f.col('length') < 600,   1).otherwise(0))

In [21]:
df1.filter("duration_long==1").select(["length", "duration_long", "duration_med", "duration_short"]).show()
df1.filter("duration_long==0").select(["length", "duration_long", "duration_med", "duration_short"]).show()

+-------+-------------+------------+--------------+
| length|duration_long|duration_med|duration_short|
+-------+-------------+------------+--------------+
| 3245.0|            1|           0|             0|
| 7754.0|            1|           0|             0|
| 7211.0|            1|           0|             0|
| 4827.0|            1|           0|             0|
|19903.0|            1|           0|             0|
| 5708.0|            1|           0|             0|
| 5558.0|            1|           0|             0|
| 2108.0|            1|           0|             0|
| 6177.0|            1|           0|             0|
| 5550.0|            1|           0|             0|
| 3113.0|            1|           0|             0|
| 5182.0|            1|           0|             0|
| 3685.0|            1|           0|             0|
| 2664.0|            1|           0|             0|
| 2886.0|            1|           0|             0|
|68199.0|            1|           0|             0|
| 3539.0|   

### E. Keywords

In [22]:
# count number of keywords - perhaps more search terms helps you get more views
df1 = df1.withColumn('n_keywords', f.size(f.split(f.col("keywords"), r"\|")) + 1)

In [23]:
df1.select(["keywords", "n_keywords"]).show()

+--------------------+----------+
|            keywords|n_keywords|
+--------------------+----------+
|                null|         0|
|paranormal activi...|         9|
|undulation|belly ...|        45|
|Goodzik|Removal o...|        25|
|Mopar 1968 1969 1...|         2|
|                null|         0|
|Johnny Conga|Cong...|         9|
|What is Optics|ex...|        17|
|                null|         0|
|                null|         0|
|speed ramping tut...|        20|
|saul|saul lopez|s...|        24|
|how to pronounce ...|         9|
|AndyHafell|make m...|        19|
|lifetime|mylifeti...|        30|
|                null|         0|
|affair|movies|PRI...|        32|
|TSLA|TESLA|TESLA ...|        13|
|QVC|V36076|For th...|         6|
|#RHCHEMISTRY|#RH|...|        23|
+--------------------+----------+
only showing top 20 rows



### G. Dates

In [24]:
# --- post covid 

# create a covid inicator. Assume official start of COVID-19 pandemic as March 27, 2020 
# this is the date the CARES act was implemented
df1 = df1.withColumn('post_covid', f.when(f.col('publish_date_new') > "2020-03-26", 1).otherwise(0))
df1.select(["post_covid", "publish_date_new"]).show()

+----------+----------------+
|post_covid|publish_date_new|
+----------+----------------+
|         0|      2017-02-07|
|         1|      2021-09-06|
|         0|      2011-09-16|
|         1|      2021-10-01|
|         1|      2021-11-09|
|         0|      2014-11-09|
|         0|      2012-12-11|
|         0|      2017-01-14|
|         1|      2021-03-19|
|         0|      2018-09-20|
|         0|      2018-09-04|
|         1|      2020-07-21|
|         1|      2020-11-01|
|         1|      2021-10-27|
|         0|      2018-07-15|
|         0|      2018-01-08|
|         0|      2015-02-07|
|         1|      2021-10-12|
|         0|      2019-02-08|
|         1|      2021-11-03|
+----------+----------------+
only showing top 20 rows



### H. Author

In [25]:
from pyspark.sql import Window
w = Window.partitionBy('author')
df1 = df1.withColumn('n_posts_by_author', f.count('author').over(w))
df1.agg(f.min(f.col("n_posts_by_author")),  f.max(f.col("n_posts_by_author"))).show()

+----------------------+----------------------+
|min(n_posts_by_author)|max(n_posts_by_author)|
+----------------------+----------------------+
|                     0|                   630|
+----------------------+----------------------+



In [26]:
w = Window.partitionBy('channel')
df1 = df1.withColumn('n_posts_by_channel', f.count('channel').over(w))
df1.agg(f.min(f.col("n_posts_by_channel")),  f.max(f.col("n_posts_by_channel"))).show()

+-----------------------+-----------------------+
|min(n_posts_by_channel)|max(n_posts_by_channel)|
+-----------------------+-----------------------+
|                      1|                    627|
+-----------------------+-----------------------+



In [27]:
df1.sort("author").select(["author", "n_posts_by_author"]).show()

+--------------------+-----------------+
|              author|n_posts_by_author|
+--------------------+-----------------+
|                null|                0|
|                null|                0|
|                null|                0|
|                null|                0|
|                null|                0|
|       ! Hadookaan !|                1|
|"M��nnerabend - D...|                1|
|"The Voodoo King"...|                4|
|"The Voodoo King"...|                4|
|"The Voodoo King"...|                4|
|"The Voodoo King"...|                4|
|"You are First" b...|                1|
|      # Chen Jialing|                1|
|            # DUBBIC|                1|
| # Don���t Trust Bro|                1|
|# UMESH RAUT | VA...|                1|
|             # eRRoR|                1|
|    #1 Marmaduke Fan|                2|
|    #1 Marmaduke Fan|                2|
| #1 in ESL Practice!|                1|
+--------------------+-----------------+
only showing top

## Step 5: Transformations to outcome variable

In [28]:
df1 = df1.withColumn('views_high', f.when(f.col('views') > 10000, 1).otherwise(0))
df1.select(["views_high", "views"]).show()

+----------+---------+
|views_high|    views|
+----------+---------+
|         0|      1.0|
|         1|  70211.0|
|         1|  13210.0|
|         1| 117918.0|
|         0|    262.0|
|         0|     15.0|
|         0|   2369.0|
|         1|  60554.0|
|         0|     55.0|
|         0|     11.0|
|         0|    493.0|
|         0|   8467.0|
|         0|      3.0|
|         0|      4.0|
|         1|4741684.0|
|         0|   6001.0|
|         0|   1909.0|
|         1| 121606.0|
|         0|   1499.0|
|         1|  49239.0|
+----------+---------+
only showing top 20 rows



In [29]:
# categorical views - high medium low
df1 = df1.withColumn("views_cat", f.when(f.col('views') > 10000, 3).otherwise(
     f.when(f.col("views") > 1000, 2).otherwise(1)))
    
df1.filter("views_cat==3").select(["views_cat", "views"]).show(10)
df1.filter("views_cat==2").select(["views_cat", "views"]).show(10)
df1.filter("views_cat==1").select(["views_cat", "views"]).show(10)

+---------+---------+
|views_cat|    views|
+---------+---------+
|        3|  70211.0|
|        3|  13210.0|
|        3| 117918.0|
|        3|  60554.0|
|        3|4741684.0|
|        3| 121606.0|
|        3|  49239.0|
|        3|  22499.0|
|        3| 104235.0|
|        3| 333510.0|
+---------+---------+
only showing top 10 rows

+---------+------+
|views_cat| views|
+---------+------+
|        2|2369.0|
|        2|8467.0|
|        2|6001.0|
|        2|1909.0|
|        2|1499.0|
|        2|1671.0|
|        2|4063.0|
|        2|4696.0|
|        2|5005.0|
|        2|1885.0|
+---------+------+
only showing top 10 rows

+---------+-----+
|views_cat|views|
+---------+-----+
|        1|  1.0|
|        1|262.0|
|        1| 15.0|
|        1| 55.0|
|        1| 11.0|
|        1|493.0|
|        1|  3.0|
|        1|  4.0|
|        1|  7.0|
|        1| 73.0|
+---------+-----+
only showing top 10 rows



In [30]:
# check min, max, median, and average values for references
df1.agg(f.min(f.col("views")), f.max(f.col("views")), f.avg(f.col("views"))).show()

+----------+-------------+------------------+
|min(views)|   max(views)|        avg(views)|
+----------+-------------+------------------+
|       0.0|7.606261432E9|2938761.1831926527|
+----------+-------------+------------------+



## Step 6: Output df 

In [31]:
df1.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- author: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- description: string (nullable = true)
 |-- watch_url: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- metadata: string (nullable = true)
 |-- stream_info: string (nullable = true)
 |-- age_restricted: boolean (nullable = true)
 |-- length: double (nullable = true)
 |-- rating: double (nullable = true)
 |-- title: string (nullable = true)
 |-- views: double (nullable = true)
 |-- transcript: string (nullable = true)
 |-- language_detected: string (nullable = true)
 |-- publish_date_new: date (nullable = true)
 |-- has_music: double (nullable = false)
 |-- has_sounds: double (nullable = false)
 |-- duration_long: integer (nullable = false)
 |-- duration_med: integer (nullable = false)
 |-- duration_short: integer (nullable = false)
 |-- n_keywords: integer (nullable = false)
 |-- post_covid: integer (nullable = false)
 |-- n_posts_by_author: lon

In [32]:
%time df1.coalesce(1).write.mode('overwrite').option('header','true').csv('/user/kmonzella/data/youtube_clean.csv')

#hdfs dfs -copyToLocal /user/kmonzella/data/youtube_clean.csv ~/data/

CPU times: user 14.2 ms, sys: 5.94 ms, total: 20.1 ms
Wall time: 2min 20s
