<a href="https://colab.research.google.com/github/harenlin/PySpark-Learning/blob/main/MDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('MDD').getOrCreate()
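# Note: this counts the entries in the executor memory status map, i.e. executors
# (only the driver in local mode), not CPU cores.
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")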
spark

You are working with 1 core(s)


# Spark's Immutability

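Before we get started, let's take a moment to discuss Spark's immutability. Spark DataFrames are immutable: a transformation never changes a DataFrame in place, it returns a new one. What does that mean in practice? Let's look at an example and watch the id of the underlying RDD before and after adding a column.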

In [None]:
names = spark.createDataFrame([('Haren', 'Lin'), ('Watson', 'Wang')], ['first_name', 'last_name'])
names.show()  # show() prints the table itself and returns None, so no print() wrapper is needed
print(names.rdd.id())

+----------+---------+
|first_name|last_name|
+----------+---------+
|     Haren|      Lin|
|    Watson|     Wang|
+----------+---------+

167


In [None]:
# Add a column. select() returns a new DataFrame; reassigning `names` only rebinds the variable.
from pyspark.sql.functions import concat_ws
names = names.select(names.first_name, names.last_name, concat_ws(' ', names.first_name, names.last_name).alias('full_name'))
names.show()
print(names.rdd.id())  # a different id than before: the original RDD was not modified

+----------+---------+-----------+
|first_name|last_name|  full_name|
+----------+---------+-----------+
|     Haren|      Lin|  Haren Lin|
|    Watson|     Wang|Watson Wang|
+----------+---------+-----------+

173
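
The two RDD ids printed above differ (167, then 173) because the second `names` is a new object. The same idea can be sketched in plain Python without Spark. `Table` and `with_full_name` below are hypothetical names invented for this illustration; they are not part of PySpark.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Table:
    """Hypothetical stand-in for an immutable DataFrame (not a Spark class)."""
    columns: Tuple[str, ...]
    rows: Tuple[Tuple[str, ...], ...]

    def with_full_name(self, sep: str = " ") -> "Table":
        # Build and return a new Table; self is left untouched.
        new_rows = tuple(row + (sep.join(row),) for row in self.rows)
        return Table(self.columns + ("full_name",), new_rows)


names = Table(("first_name", "last_name"), (("Haren", "Lin"), ("Watson", "Wang")))
extended = names.with_full_name()

print(names.columns)      # the original still has two columns
print(extended.columns)   # the new object carries the extra column
print(extended is names)  # False: two distinct objects
```

Writing `names = names.with_full_name()` would only rebind the variable name, which is what the notebook cell does with `names = names.select(...)`.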


In [None]:
from google.colab import drive
drive.mount('/content/drive')

path = '/content/drive/My Drive/PySpark/Datasets/'
videos = spark.read.csv(path + 'youtubevideos.csv', inferSchema=True, header=True)
# data source: https://www.kaggle.com/datasnaek/youtube-new#USvideos.csv

Mounted at /content/drive


In [None]:
videos.printSchema()  # prints the schema directly and returns None, so print() is not needed

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)


In [None]:
videos.show(5)

+-----------+-------------+--------------------+--------------------+-----------+--------------------+--------------------+-------+------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------------+
|   video_id|trending_date|               title|       channel_title|category_id|        publish_time|                tags|  views| likes|dislikes|comment_count|      thumbnail_link|comments_disabled|ratings_disabled|video_error_or_removed|         description|
+-----------+-------------+--------------------+--------------------+-----------+--------------------+--------------------+-------+------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------------+
|2kyS6SvSYSE|     17.14.11|WE WANT TO TALK A...|        CaseyNeistat|         22|2017-11-13T17:13:...|     SHANtell martin| 748374| 57527|    2966|        15954|https://i.ytimg.c...|            False|           Fal

# Manipulate Data Types


In [None]:
# Notice that all columns above were read in as strings.
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Type casting. Parentheses let the chain span several lines without backslashes.
# Note: the 'dd.mm.yy' pattern runs without error but yields dates in 2011 (see the output below).
df = (videos.withColumn("views", videos["views"].cast(IntegerType()))
            .withColumn("likes", videos["likes"].cast(IntegerType()))
            .withColumn("dislikes", videos["dislikes"].cast(IntegerType()))
            .withColumn("trending_date", to_date(videos.trending_date, 'dd.mm.yy')))
# Attempted publish_time conversion, left disabled because it does not work on the raw strings:
# .withColumn("publish_time", to_timestamp(videos.publish_time, 'yyyy-MM-dd HH:mm:ss:ms'))
df.printSchema()  # prints the schema directly; print() would only add a trailing "None"
df.limit(4).toPandas()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: date (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)


Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,2011-01-17,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,2011-01-17,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."
2,5qpjK5DgCt4,2011-01-17,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12T19:05:24.000Z,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...
3,puqaWrEC7tY,2011-01-17,Nickelback Lyrics: Real or Fake?,Good Mythical Morning,24,2017-11-13T11:00:04.000Z,"""rhett and link""|""gmm""|""good mythical morning""...",343168,10172,666,2146,https://i.ytimg.com/vi/puqaWrEC7tY/default.jpg,False,False,False,Today we find out if Link is a Nickelback amat...


In [None]:
df.show(4)

+-----------+-------------+--------------------+--------------------+-----------+--------------------+--------------------+-------+------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------------+
|   video_id|trending_date|               title|       channel_title|category_id|        publish_time|                tags|  views| likes|dislikes|comment_count|      thumbnail_link|comments_disabled|ratings_disabled|video_error_or_removed|         description|
+-----------+-------------+--------------------+--------------------+-----------+--------------------+--------------------+-------+------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------------+
|2kyS6SvSYSE|   2011-01-17|WE WANT TO TALK A...|        CaseyNeistat|         22|2017-11-13T17:13:...|     SHANtell martin| 748374| 57527|    2966|        15954|https://i.ytimg.c...|            False|           Fal

In [None]:
# Simple Rename
renamed = df.withColumnRenamed('channel_title', 'channel_title_new')
renamed.show(5)

+-----------+-------------+--------------------+--------------------+-----------+--------------------+--------------------+-------+------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------------+
|   video_id|trending_date|               title|   channel_title_new|category_id|        publish_time|                tags|  views| likes|dislikes|comment_count|      thumbnail_link|comments_disabled|ratings_disabled|video_error_or_removed|         description|
+-----------+-------------+--------------------+--------------------+-----------+--------------------+--------------------+-------+------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------------+
|2kyS6SvSYSE|   2011-01-17|WE WANT TO TALK A...|        CaseyNeistat|         22|2017-11-13T17:13:...|     SHANtell martin| 748374| 57527|    2966|        15954|https://i.ytimg.c...|            False|           Fal

**Clean Data**

Alright, so we see that the publish_time variable could not be converted to a timestamp because it has a "T" between the date and the time and a trailing "Z". We essentially need to replace the "T" with a space and the "Z" with nothing. There are a couple of ways to do this; the first is regex, which is short for regular expressions.

**Regex**

The regexp_replace function replaces every substring of the given string column that matches a regular expression with a replacement string.

The syntax here is: regexp_replace(*str, pattern, replacement*)

Regex is NOT super intuitive, so if you need a refresher on regex syntax, visit:
 - https://www.whoishostingthis.com/resources/regex/
 - https://docs.oracle.com/cd/B19306_01/server.102/b14200/ap_posix001.htm#BABJDBHB

In [None]:
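from pyspark.sql.functions import regexp_replace, to_timestamp  # regexp_extract is also available

# Replace the "T" separator with a space, then drop the trailing "Z"
df = df.withColumn('publish_time_2', regexp_replace(df.publish_time, 'T', ' '))
df = df.withColumn('publish_time_2', regexp_replace(df.publish_time_2, 'Z', ''))
# The cleaned string now matches a pattern that to_timestamp can parse
df = df.withColumn("publish_time_3", to_timestamp(df.publish_time_2, 'yyyy-MM-dd HH:mm:ss.SSS'))
# df.printSchema()
df.select("publish_time", "publish_time_2", "publish_time_3").show(5)
# publish_time_2 is still a string that keeps the trailing .000 (cut off in the truncated display),
# whereas publish_time_3 is a timestamp and is shown without fractional seconds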

+--------------------+--------------------+-------------------+
|        publish_time|      publish_time_2|     publish_time_3|
+--------------------+--------------------+-------------------+
|2017-11-13T17:13:...|2017-11-13 17:13:...|2017-11-13 17:13:01|
|2017-11-13T07:30:...|2017-11-13 07:30:...|2017-11-13 07:30:00|
|2017-11-12T19:05:...|2017-11-12 19:05:...|2017-11-12 19:05:24|
|2017-11-13T11:00:...|2017-11-13 11:00:...|2017-11-13 11:00:04|
|2017-11-12T18:01:...|2017-11-12 18:01:...|2017-11-12 18:01:41|
+--------------------+--------------------+-------------------+
only showing top 5 rows
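
To see what the two substitutions do to a single value, here is a plain-Python analogue using the standard `re` and `datetime` modules instead of Spark. It is only a sketch: Spark uses Java regular expressions and its own datetime pattern letters, so the `%`-style format below is Python's equivalent, not the Spark pattern.

```python
import re
from datetime import datetime

raw = "2017-11-13T17:13:01.000Z"  # sample value from the publish_time column

# Same two substitutions as the regexp_replace calls: "T" -> " ", then "Z" -> "".
cleaned = re.sub("Z", "", re.sub("T", " ", raw))

# %f accepts one to six fractional digits, so ".000" parses as 0 microseconds.
parsed = datetime.strptime(cleaned, "%Y-%m-%d %H:%M:%S.%f")

print(cleaned)
print(parsed)
```

Both patterns are single literal characters, so they behave the same in Python and Java regex. Note that every "T" or "Z" in the string is replaced, which is fine for this fixed timestamp layout.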



**Translate Function**

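You could also use the translate function here. It works character by character: each character in the second argument is replaced by the character at the same position in the third argument, and characters with no counterpart there are removed. In the call below, "T" becomes a space and "Z" is dropped.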

In [None]:
import pyspark.sql.functions as F
df.select("publish_time", F.translate(F.col("publish_time"), "TZ", " ").alias("translate_func_time")).show(5,False)

+------------------------+-----------------------+
|publish_time            |translate_func_time    |
+------------------------+-----------------------+
|2017-11-13T17:13:01.000Z|2017-11-13 17:13:01.000|
|2017-11-13T07:30:00.000Z|2017-11-13 07:30:00.000|
|2017-11-12T19:05:24.000Z|2017-11-12 19:05:24.000|
|2017-11-13T11:00:04.000Z|2017-11-13 11:00:04.000|
|2017-11-12T18:01:41.000Z|2017-11-12 18:01:41.000|
+------------------------+-----------------------+
only showing top 5 rows
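
The positional mapping can be sketched in plain Python with `str.translate`. The helper name `translate_chars` is hypothetical and exists only for this illustration; it mimics the behaviour seen in the output above, not Spark's implementation.

```python
def translate_chars(text: str, matching: str, replace: str) -> str:
    """Map matching[i] to replace[i]; drop characters that have no counterpart."""
    table = {}
    for i, ch in enumerate(matching):
        # None tells str.translate to delete the character.
        table[ord(ch)] = replace[i] if i < len(replace) else None
    return text.translate(table)


result = translate_chars("2017-11-13T17:13:01.000Z", "TZ", " ")
print(repr(result))
```

Here "T" is at position 0 and maps to the space at position 0 of the replacement, while "Z" at position 1 has no partner and is removed.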



**Trim**

One common function you've probably seen in almost any data processing tool, including Excel, is "trim", which removes leading and trailing whitespace from a value. PySpark offers trim for both ends, plus ltrim and rtrim for the left or right end only. Let's go ahead and apply it to the title field.

In [None]:
# Trim
# pyspark.sql.functions.trim(col) - Trim the spaces from both ends for the specified string column.
from pyspark.sql.functions import *

df = df.withColumn('title', trim(df.title)) # or rtrim/ltrim
df.select("title").show(5)

+--------------------+
|               title|
+--------------------+
|WE WANT TO TALK A...|
|The Trump Preside...|
|Racist Superman |...|
|Nickelback Lyrics...|
|I Dare You: GOING...|
+--------------------+
only showing top 5 rows



In [None]:
trim_ex = spark.createDataFrame([(' 2015-04-08 ',' 2015-05-10 ')], ['d1', 'd2']) # create a dataframe - notice the extra whitespaces in the date strings
trim_ex.show()
print("left trim")
trim_ex.select('d1', ltrim(trim_ex.d1)).show()
print("right trim")
trim_ex.select('d1', rtrim(trim_ex.d1)).show()
print("trim")
trim_ex.select('d1', trim(trim_ex.d1)).show()

+------------+------------+
|          d1|          d2|
+------------+------------+
| 2015-04-08 | 2015-05-10 |
+------------+------------+

left trim
+------------+-----------+
|          d1|  ltrim(d1)|
+------------+-----------+
| 2015-04-08 |2015-04-08 |
+------------+-----------+

right trim
+------------+-----------+
|          d1|  rtrim(d1)|
+------------+-----------+
| 2015-04-08 | 2015-04-08|
+------------+-----------+

trim
+------------+----------+
|          d1|  trim(d1)|
+------------+----------+
| 2015-04-08 |2015-04-08|
+------------+----------+
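
For comparison, the same three operations exist on Python strings. This sketch passes `" "` explicitly so that only space characters are stripped, matching the "Trim the spaces from both ends" description quoted in the cell above; Python's argument-free `strip()` would also remove tabs and newlines.

```python
d1 = " 2015-04-08 "  # same value as in the trim_ex DataFrame

left = d1.lstrip(" ")   # like ltrim: leading spaces removed
right = d1.rstrip(" ")  # like rtrim: trailing spaces removed
both = d1.strip(" ")    # like trim: both ends

print(repr(left), repr(right), repr(both))
```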



In [None]:
# lower
df = df.withColumn('title', lower(df.title))
df.select('title').show(5)

+--------------------+
|               title|
+--------------------+
|we want to talk a...|
|the trump preside...|
|racist superman |...|
|nickelback lyrics...|
|i dare you: going...|
+--------------------+
only showing top 5 rows



# Case When

We can also use the classic SQL "case when" clause to recode values. Let's say we wanted to create a categorical variable that tells whether a video has more likes than dislikes or vice versa.

In [None]:
print("Option#1: select or withColumn() using when-otherwise")
from pyspark.sql.functions import when
df.select("likes", "dislikes", (when(df.likes > df.dislikes, 'Good').when(df.likes < df.dislikes, 'Bad').otherwise('Undetermined')).alias("Favorability")).show(3)

Option#1: select or withColumn() using when-otherwise
+------+--------+------------+
| likes|dislikes|Favorability|
+------+--------+------------+
| 57527|    2966|        Good|
| 97185|    6146|        Good|
|146033|    5339|        Good|
+------+--------+------------+
only showing top 3 rows



In [None]:
print("Option#2: select or withColumn() using expr function")
from pyspark.sql.functions import expr 
df.select("likes", "dislikes",
          expr("CASE WHEN likes > dislikes THEN 'Good' WHEN likes < dislikes THEN 'Bad' ELSE 'Undetermined' END AS Favorability")).show(3)

Option#2: select or withColumn() using expr function
+------+--------+------------+
| likes|dislikes|Favorability|
+------+--------+------------+
| 57527|    2966|        Good|
| 97185|    6146|        Good|
|146033|    5339|        Good|
+------+--------+------------+
only showing top 3 rows



In [None]:
print("Option#3: selectExpr() using SQL equivalent CASE expression")
df.selectExpr("likes", "dislikes", "CASE WHEN likes > dislikes THEN  'Good' WHEN likes < dislikes THEN 'Bad' ELSE 'Undetermined' END AS Favorability").show(3)

Option#3: selectExpr() using SQL equivalent CASE expression
+------+--------+------------+
| likes|dislikes|Favorability|
+------+--------+------------+
| 57527|    2966|        Good|
| 97185|    6146|        Good|
|146033|    5339|        Good|
+------+--------+------------+
only showing top 3 rows
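
All three options encode the same branching logic. A plain-Python sketch of that logic makes the fall-through cases explicit. The function name `favorability` is hypothetical; the `None` branch mirrors SQL, where a comparison involving NULL is not true, so such rows end up in the ELSE branch.

```python
from typing import Optional


def favorability(likes: Optional[int], dislikes: Optional[int]) -> str:
    # A missing value can satisfy neither comparison, so it falls through to ELSE.
    if likes is None or dislikes is None:
        return "Undetermined"
    if likes > dislikes:
        return "Good"
    if likes < dislikes:
        return "Bad"
    return "Undetermined"  # equal counts


print(favorability(57527, 2966))
print(favorability(10, 10))
print(favorability(None, 5))
```

The "Undetermined" label therefore covers two different situations: ties and missing counts.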



In [None]:
print("Option#1: select or withColumn() using when-otherwise")
from pyspark.sql.functions import when
df.select("likes", "dislikes", (when(df.likes > df.dislikes, 'Good').when(df.likes < df.dislikes, 'Bad').otherwise('Undetermined')).alias("Favorability")).show(5)

print("Option#2: select or withColumn() using expr function")
from pyspark.sql.functions import expr 
df.select("likes", "dislikes", expr("CASE WHEN likes > dislikes THEN 'Good' WHEN likes < dislikes THEN 'Bad' ELSE 'Undetermined' END AS Favorability")).show(5)

print("Option#3: selectExpr() using SQL equivalent CASE expression")
df.selectExpr("likes", "dislikes", "CASE WHEN likes > dislikes THEN  'Good' WHEN likes < dislikes THEN 'Bad' ELSE 'Undetermined' END AS Favorability").show(5)

Option#1: select or withColumn() using when-otherwise
+------+--------+------------+
| likes|dislikes|Favorability|
+------+--------+------------+
| 57527|    2966|        Good|
| 97185|    6146|        Good|
|146033|    5339|        Good|
| 10172|     666|        Good|
|132235|    1989|        Good|
+------+--------+------------+
only showing top 5 rows

Option#2: select or withColumn() using expr function
+------+--------+------------+
| likes|dislikes|Favorability|
+------+--------+------------+
| 57527|    2966|        Good|
| 97185|    6146|        Good|
|146033|    5339|        Good|
| 10172|     666|        Good|
|132235|    1989|        Good|
+------+--------+------------+
only showing top 5 rows

Option#3: selectExpr() using SQL equivalent CASE expression
+------+--------+------------+
| likes|dislikes|Favorability|
+------+--------+------------+
| 57527|    2966|        Good|
| 97185|    6146|        Good|
|146033|    5339|        Good|
| 10172|     666|        Good|
|132235|

**Concatenate**

If you want to combine two or more columns into one (with a separator between them), you can use the concat_ws function. Let's say we wanted to combine all the text description variables of the videos for an NLP exercise of some sort and needed all the text in one column. The syntax is:

    concat_ws(sep, *cols)

In [None]:
df.select(df.title, df.channel_title, df.tags).show(5)  # tags included to match the three columns shown below

+--------------------+--------------------+--------------------+
|               title|       channel_title|                tags|
+--------------------+--------------------+--------------------+
|we want to talk a...|        CaseyNeistat|     SHANtell martin|
|the trump preside...|     LastWeekTonight|"last week tonigh...|
|racist superman |...|        Rudy Mancuso|"racist superman"...|
|nickelback lyrics...|Good Mythical Mor...|"rhett and link"|...|
|i dare you: going...|            nigahiga|"ryan"|"higa"|"hi...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import concat_ws # concat with separator
df.select( concat_ws(' || ', df.title, df.channel_title).alias('text') ).show(5,False)

+---------------------------------------------------------------------------------+
|text                                                                             |
+---------------------------------------------------------------------------------+
|we want to talk about our marriage || CaseyNeistat                               |
|the trump presidency: last week tonight with john oliver (hbo) || LastWeekTonight|
|racist superman | rudy mancuso, king bach & lele pons || Rudy Mancuso            |
|nickelback lyrics: real or fake? || Good Mythical Morning                        |
|i dare you: going bald!? || nigahiga                                             |
+---------------------------------------------------------------------------------+
only showing top 5 rows
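
One detail worth knowing is that `concat_ws` skips null values instead of turning the whole result into null. A plain-Python sketch of that behaviour, using a hypothetical helper named `concat_with_sep`:

```python
from typing import Optional, Sequence


def concat_with_sep(sep: str, values: Sequence[Optional[str]]) -> str:
    # Join only the non-null values, so a missing field leaves no dangling separator.
    return sep.join(v for v in values if v is not None)


text = concat_with_sep(" || ", ["we want to talk about our marriage", "CaseyNeistat"])
with_gap = concat_with_sep(" || ", ["title only", None])

print(text)
print(with_gap)
```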



**Extracting data from Date and Timestamp variables**

If you need to extract, say, the year or month from a date field, you can use PySpark's SQL function library like this.

Note that this analysis surfaces a date conversion discrepancy: the trending_date values shown below come out as 2011-01-17, even though the videos were published in November 2017. I'll leave fixing that as a homework problem!

In [None]:
from pyspark.sql.functions import year, month
# Other options: dayofmonth, dayofweek, dayofyear, weekofyear
df.select("trending_date", year("trending_date"), month("trending_date")).show(5)

+-------------+-------------------+--------------------+
|trending_date|year(trending_date)|month(trending_date)|
+-------------+-------------------+--------------------+
|   2011-01-17|               2011|                   1|
|   2011-01-17|               2011|                   1|
|   2011-01-17|               2011|                   1|
|   2011-01-17|               2011|                   1|
|   2011-01-17|               2011|                   1|
+-------------+-------------------+--------------------+
only showing top 5 rows
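
As a hint for that homework, it can help to inspect one raw value outside Spark. The sketch below assumes the raw `trending_date` string `17.14.11` (seen in the first `videos.show(5)` output) is laid out as year.day.month. That layout is an assumption about the dataset, not something the notebook states. The format codes are Python's `strptime` codes, which differ from Spark's pattern letters.

```python
from datetime import datetime

raw = "17.14.11"  # raw trending_date string from the first rows

# Assumed layout: two-digit year, day of month, month.
parsed = datetime.strptime(raw, "%y.%d.%m").date()
print(parsed)

# Reading the same string as day.month.year fails: there is no month 14.
try:
    datetime.strptime(raw, "%d.%m.%y")
    day_month_year_ok = True
except ValueError:
    day_month_year_ok = False
print(day_month_year_ok)
```

Under that assumption the video trended one day after its 2017-11-13 publish time, which is at least plausible.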



**Calculate the Difference between two dates**

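If you want to calculate the difference between two dates, you can use PySpark's datediff function, which returns the number of days from start to end. Dividing by 365, as below, gives an approximate difference in years; the values are negative here because of the trending_date discrepancy noted above.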

    datediff(end, start)

In [None]:
from pyspark.sql.functions import datediff
df.select("trending_date", "publish_time_3", (datediff(df.trending_date,df.publish_time_3)/365).alias('diff')).show(5)

+-------------+-------------------+-------------------+
|trending_date|     publish_time_3|               diff|
+-------------+-------------------+-------------------+
|   2011-01-17|2017-11-13 17:13:01|-6.8273972602739725|
|   2011-01-17|2017-11-13 07:30:00|-6.8273972602739725|
|   2011-01-17|2017-11-12 19:05:24| -6.824657534246575|
|   2011-01-17|2017-11-13 11:00:04|-6.8273972602739725|
|   2011-01-17|2017-11-12 18:01:41| -6.824657534246575|
+-------------+-------------------+-------------------+
only showing top 5 rows
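
The first row of that output can be reproduced by hand with Python's `datetime.date`, which shows where the -6.827 comes from. Only the date part of `publish_time_3` is used, consistent with the output above, where rows published at different times on the same day get the same value.

```python
from datetime import date

trending = date(2011, 1, 17)    # trending_date in the first row
published = date(2017, 11, 13)  # date part of publish_time_3 in the first row

days = (trending - published).days  # end minus start, like datediff(end, start)
years = days / 365                  # rough conversion; ignores leap years

print(days, years)
```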



In [None]:
# Split a string around pattern (pattern is a regular expression).
from pyspark.sql.functions import split

df.select("title").show(5)
df.select(split(df.title, ' ').alias('new')).show(5)

+--------------------+
|               title|
+--------------------+
|we want to talk a...|
|the trump preside...|
|racist superman |...|
|nickelback lyrics...|
|i dare you: going...|
+--------------------+
only showing top 5 rows

+--------------------+
|                 new|
+--------------------+
|[we, want, to, ta...|
|[the, trump, pres...|
|[racist, superman...|
|[nickelback, lyri...|
|[i, dare, you:, g...|
+--------------------+
only showing top 5 rows
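
Because the pattern passed to `split` is a regular expression, the closest plain-Python analogue is `re.split`, not `str.split`. The sketch below uses one lower-cased title from the dataset and one made-up string with repeated spaces to show why that matters.

```python
import re

title = "we want to talk about our marriage"
words = re.split(" ", title)  # single-space pattern, as in the cell above

# A made-up title with repeated spaces: a single-space pattern yields empty strings,
# while the regex \s+ treats each run of whitespace as one separator.
messy = "i dare  you:   going bald!?"
naive = re.split(" ", messy)
robust = re.split(r"\s+", messy)

print(words)
print(naive)
print(robust)
```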



In [None]:
from pyspark.sql.functions import *
array_df = df.select("title", split(df.title, ' ').alias('title_array'))
array_df.select("title", array_contains(array_df.title_array, "marriage")).show(5)

# remove duplicate values
array_df.select(array_distinct(array_df.title_array)).show(5)

# remove certain values
array_df.select(array_remove(array_df.title_array, "we")).show(5)

+--------------------+-------------------------------------+
|               title|array_contains(title_array, marriage)|
+--------------------+-------------------------------------+
|we want to talk a...|                                 true|
|the trump preside...|                                false|
|racist superman |...|                                false|
|nickelback lyrics...|                                false|
|i dare you: going...|                                false|
+--------------------+-------------------------------------+
only showing top 5 rows

+---------------------------+
|array_distinct(title_array)|
+---------------------------+
|       [we, want, to, ta...|
|       [the, trump, pres...|
|       [racist, superman...|
|       [nickelback, lyri...|
|       [i, dare, you:, g...|
+---------------------------+
only showing top 5 rows

+-----------------------------+
|array_remove(title_array, we)|
+-----------------------------+
|         [want, to, talk, ...|
|  
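
For intuition, the three array functions used above have simple list equivalents in plain Python. This is a sketch on one title plus a made-up list with a repeated word; keeping first-occurrence order in the distinct step is a choice of this sketch.

```python
title_array = "we want to talk about our marriage".split(" ")

# array_contains: membership test
has_marriage = "marriage" in title_array

# array_distinct: drop repeats (dict.fromkeys keeps the first occurrence of each word)
distinct = list(dict.fromkeys(["we", "want", "we", "talk", "want"]))

# array_remove: drop every element equal to the given value
without_we = [word for word in title_array if word != "we"]

print(has_marriage)
print(distinct)
print(without_we)
```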

## Creating Functions

Functions as you know them in Python work a bit differently in PySpark because the data lives on a cluster. A plain Python function cannot simply be called on the values of a DataFrame column; it first has to be wrapped so that Spark can ship it to the executors and apply it row by row. Keep in mind that such functions generally run slower than the built-in functions in pyspark.sql.functions, so prefer a built-in when one exists.

To convert a Python function into what's called a user-defined function (UDF) in PySpark, this is what you do.

*Note: a UDF receives None for null values, so a function that does not handle None will fail on a column with nulls.

In [None]:
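from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def square(x):
    # Spark passes None for null values, so guard against it
    if x is None:
        return None
    return int(x**2)

# Wrap the function directly; the declared return type must match what it returns
square_udf = udf(square, IntegerType())

df.where(col('dislikes').isNotNull()).select('dislikes', square_udf('dislikes').alias('dislikes_sq')).show(5)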

+--------+-----------+
|dislikes|dislikes_sq|
+--------+-----------+
|    2966|    8797156|
|    6146|   37773316|
|    5339|   28504921|
|     666|     443556|
|    1989|    3956121|
+--------+-----------+
only showing top 5 rows
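
The function itself is ordinary Python, so it can be checked without Spark before wrapping it in `udf`. The sketch below applies a null-safe `square` to a few values, the way a UDF is applied row by row, and checks which results fit in a 32-bit signed integer, since `IntegerType` is a 32-bit type. The helper `fits_int32` and the sample values other than 2966 are made up for this illustration.

```python
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1


def square(x):
    # Return None for missing input, as a null-safe UDF body would.
    if x is None:
        return None
    return int(x**2)


def fits_int32(value):
    # None stands for null, which any column type can hold.
    return value is None or INT32_MIN <= value <= INT32_MAX


values = [2966, None, 46340, 46341]
squares = [square(v) for v in values]
fits = [fits_int32(s) for s in squares]

print(squares)
print(fits)
```

Squares of values above 46340 no longer fit in 32 bits, so for columns with larger counts a 64-bit return type such as `LongType` is one option to consider.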

