# Manipulating DataFrames

In [2]:
#import findspark
#findspark.init()

import pyspark 
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataManipulation").getOrCreate()
spark

## Spark's Immutability

In [3]:
names = spark.createDataFrame([("Abraham", "Lincon")], ["first_name", "last_name"])
names.show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|   Abraham|   Lincon|
+----------+---------+



In [4]:
names.rdd.id()

10

In [6]:
from pyspark.sql.functions import concat_ws

In [7]:
# Add a column
names = names.select([names.first_name, names.last_name, concat_ws(' ', names.first_name, names.last_name)])
names.show()

+----------+---------+-----------------------------------+
|first_name|last_name|concat_ws( , first_name, last_name)|
+----------+---------+-----------------------------------+
|   Abraham|   Lincon|                     Abraham Lincon|
+----------+---------+-----------------------------------+



In [8]:
names.rdd.id()

16

## Manipulate Data

In [18]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import to_date, to_timestamp

In [9]:
videos = spark.read.csv("youtubevideos.csv", inferSchema=True, header=True)
videos.limit(5).toPandas()

Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,17.14.11,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,17.14.11,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."
2,5qpjK5DgCt4,17.14.11,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12T19:05:24.000Z,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...
3,puqaWrEC7tY,17.14.11,Nickelback Lyrics: Real or Fake?,Good Mythical Morning,24,2017-11-13T11:00:04.000Z,"""rhett and link""|""gmm""|""good mythical morning""...",343168,10172,666,2146,https://i.ytimg.com/vi/puqaWrEC7tY/default.jpg,False,False,False,Today we find out if Link is a Nickelback amat...
4,d380meD0W0M,17.14.11,I Dare You: GOING BALD!?,nigahiga,24,2017-11-12T18:01:41.000Z,"""ryan""|""higa""|""higatv""|""nigahiga""|""i dare you""...",2095731,132235,1989,17518,https://i.ytimg.com/vi/d380meD0W0M/default.jpg,False,False,False,I know it's been a while since we did this sho...


In [10]:
videos.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)



### Change columns type

In [19]:
df = videos.withColumn("views", videos["views"].cast(IntegerType())) \
    .withColumn("likes", videos["likes"].cast(IntegerType())) \
    .withColumn("dislikes", videos["dislikes"].cast(IntegerType())) \
    .withColumn("trending_date", to_date(videos.trending_date, 'yy.dd.mm')) \
    #.withColumn("publish_time", to_date(videos.publish_time, 'yyyy-MM-dd HH:mm:ss'))

In [20]:
df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: date (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)



In [21]:
df.limit(5).toPandas()

Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,2017-01-14,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,2017-01-14,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."
2,5qpjK5DgCt4,2017-01-14,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12T19:05:24.000Z,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...
3,puqaWrEC7tY,2017-01-14,Nickelback Lyrics: Real or Fake?,Good Mythical Morning,24,2017-11-13T11:00:04.000Z,"""rhett and link""|""gmm""|""good mythical morning""...",343168,10172,666,2146,https://i.ytimg.com/vi/puqaWrEC7tY/default.jpg,False,False,False,Today we find out if Link is a Nickelback amat...
4,d380meD0W0M,2017-01-14,I Dare You: GOING BALD!?,nigahiga,24,2017-11-12T18:01:41.000Z,"""ryan""|""higa""|""higatv""|""nigahiga""|""i dare you""...",2095731,132235,1989,17518,https://i.ytimg.com/vi/d380meD0W0M/default.jpg,False,False,False,I know it's been a while since we did this sho...


### Column Rename

In [22]:
renamed = df.withColumnRenamed('channel_title', 'channel_title_new')
renamed.limit(5).toPandas()

Unnamed: 0,video_id,trending_date,title,channel_title_new,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,2017-01-14,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,2017-01-14,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."
2,5qpjK5DgCt4,2017-01-14,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12T19:05:24.000Z,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...
3,puqaWrEC7tY,2017-01-14,Nickelback Lyrics: Real or Fake?,Good Mythical Morning,24,2017-11-13T11:00:04.000Z,"""rhett and link""|""gmm""|""good mythical morning""...",343168,10172,666,2146,https://i.ytimg.com/vi/puqaWrEC7tY/default.jpg,False,False,False,Today we find out if Link is a Nickelback amat...
4,d380meD0W0M,2017-01-14,I Dare You: GOING BALD!?,nigahiga,24,2017-11-12T18:01:41.000Z,"""ryan""|""higa""|""higatv""|""nigahiga""|""i dare you""...",2095731,132235,1989,17518,https://i.ytimg.com/vi/d380meD0W0M/default.jpg,False,False,False,I know it's been a while since we did this sho...


### Cleaning Data

In [25]:
from pyspark.sql.functions import regexp_replace, regexp_extract
from pyspark.sql.functions import to_timestamp

In [26]:
df = df.withColumn('publish_time_2', regexp_replace(df.publish_time, 'T', ' '))
df = df.withColumn('publish_time_2', regexp_replace(df.publish_time_2, 'Z', ''))
df = df.withColumn("publish_time_3", to_timestamp(df.publish_time_2, 'yyyy-MM-dd HH:mm:ss.SSS'))

In [27]:
df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: date (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)
 |-- publish_time_2: string (nullable = true)
 |-- publish_time_3: timestamp (nullable = true)



In [29]:
df.select("publish_time", "publish_time_2", "publish_time_3").show(5, False)

+------------------------+-----------------------+-------------------+
|publish_time            |publish_time_2         |publish_time_3     |
+------------------------+-----------------------+-------------------+
|2017-11-13T17:13:01.000Z|2017-11-13 17:13:01.000|2017-11-13 17:13:01|
|2017-11-13T07:30:00.000Z|2017-11-13 07:30:00.000|2017-11-13 07:30:00|
|2017-11-12T19:05:24.000Z|2017-11-12 19:05:24.000|2017-11-12 19:05:24|
|2017-11-13T11:00:04.000Z|2017-11-13 11:00:04.000|2017-11-13 11:00:04|
|2017-11-12T18:01:41.000Z|2017-11-12 18:01:41.000|2017-11-12 18:01:41|
+------------------------+-----------------------+-------------------+
only showing top 5 rows



### Translate function

In [30]:
import pyspark.sql.functions as f

In [33]:
df.select("publish_time", f.translate(f.col("publish_time"), "TZ", " ").alias("translate_func")).show(5,False)

+------------------------+-----------------------+
|publish_time            |translate_func         |
+------------------------+-----------------------+
|2017-11-13T17:13:01.000Z|2017-11-13 17:13:01.000|
|2017-11-13T07:30:00.000Z|2017-11-13 07:30:00.000|
|2017-11-12T19:05:24.000Z|2017-11-12 19:05:24.000|
|2017-11-13T11:00:04.000Z|2017-11-13 11:00:04.000|
|2017-11-12T18:01:41.000Z|2017-11-12 18:01:41.000|
+------------------------+-----------------------+
only showing top 5 rows



### Trim

In [38]:
from pyspark.sql.functions import trim, ltrim, rtrim

In [35]:
df = df.withColumn('title',trim(df.title)) # or rtrim/ltrim
df.select("title").show(5,False)

+--------------------------------------------------------------+
|title                                                         |
+--------------------------------------------------------------+
|WE WANT TO TALK ABOUT OUR MARRIAGE                            |
|The Trump Presidency: Last Week Tonight with John Oliver (HBO)|
|Racist Superman | Rudy Mancuso, King Bach & Lele Pons         |
|Nickelback Lyrics: Real or Fake?                              |
|I Dare You: GOING BALD!?                                      |
+--------------------------------------------------------------+
only showing top 5 rows



In [45]:
trim_ex = spark.createDataFrame([(' 2015-04-08 ', ' 2015-05-10 ')], ['d1', 'd2']) # create a dataframe - notice the extra whitespaces in the date strings
trim_ex.show()

+------------+------------+
|          d1|          d2|
+------------+------------+
| 2015-04-08 | 2015-05-10 |
+------------+------------+



In [41]:
trim_ex.select('d1', ltrim(trim_ex.d1).alias("d1-ltrim")).show()

+------------+-----------+
|          d1|   d1-ltrim|
+------------+-----------+
| 2015-04-08 |2015-04-08 |
+------------+-----------+



In [43]:
trim_ex.select('d1', rtrim(trim_ex.d1).alias("d1-rtrim")).show()

+------------+-----------+
|          d1|   d1-rtrim|
+------------+-----------+
| 2015-04-08 | 2015-04-08|
+------------+-----------+



In [44]:
trim_ex.select('d1', trim(trim_ex.d1).alias("d1-trim")).show()

+------------+----------+
|          d1|   d1-trim|
+------------+----------+
| 2015-04-08 |2015-04-08|
+------------+----------+



### Lower

In [47]:
from pyspark.sql.functions import lower

In [52]:
df = df.withColumn('title', lower(df.title)) # or rtrim/ltrim
df.select("title").show(5, False)

+--------------------------------------------------------------+
|title                                                         |
+--------------------------------------------------------------+
|we want to talk about our marriage                            |
|the trump presidency: last week tonight with john oliver (hbo)|
|racist superman | rudy mancuso, king bach & lele pons         |
|nickelback lyrics: real or fake?                              |
|i dare you: going bald!?                                      |
+--------------------------------------------------------------+
only showing top 5 rows



### Case When

In [53]:
from pyspark.sql.functions import when 

In [55]:
# Option 1: select or withColumn() using when-otherwise
df.select("likes", "dislikes",(when(df.likes > df.dislikes, "Good").when(df.likes < df.dislikes, "Bad").otherwise("Undetermined")).alias("Favorability")).show(5)

+------+--------+------------+
| likes|dislikes|Favorability|
+------+--------+------------+
| 57527|    2966|        Good|
| 97185|    6146|        Good|
|146033|    5339|        Good|
| 10172|     666|        Good|
|132235|    1989|        Good|
+------+--------+------------+
only showing top 5 rows



In [56]:
from pyspark.sql.functions import expr 

In [57]:
# Option 2: select or withColumn() using expr function
df.select("likes","dislikes",expr("CASE WHEN likes > dislikes THEN  'Good' WHEN likes < dislikes THEN 'Bad' ELSE 'Undetermined' END AS Favorability")).show(5)

+------+--------+------------+
| likes|dislikes|Favorability|
+------+--------+------------+
| 57527|    2966|        Good|
| 97185|    6146|        Good|
|146033|    5339|        Good|
| 10172|     666|        Good|
|132235|    1989|        Good|
+------+--------+------------+
only showing top 5 rows



In [60]:
# Option 3: selectExpr() using SQL equivalent CASE expression
df.selectExpr("likes","dislikes","CASE WHEN likes > dislikes THEN  'Good' WHEN likes < dislikes THEN 'Bad' ELSE 'Undetermined' END AS Favorability").show(5)

+------+--------+------------+
| likes|dislikes|Favorability|
+------+--------+------------+
| 57527|    2966|        Good|
| 97185|    6146|        Good|
|146033|    5339|        Good|
| 10172|     666|        Good|
|132235|    1989|        Good|
+------+--------+------------+
only showing top 5 rows



### Concatenate

In [61]:
from pyspark.sql.functions import concat_ws

In [67]:
df.select(concat_ws(' | ', df.title, df.channel_title, df.tags).alias('Text')).show(1, False)

+-------------------------------------------------------------------+
|Text                                                               |
+-------------------------------------------------------------------+
|we want to talk about our marriage | CaseyNeistat | SHANtell martin|
+-------------------------------------------------------------------+
only showing top 1 row



### Extracting data from Date and Timestamp variables

In [66]:
from pyspark.sql.functions import year, month

In [72]:
# concat_ws(sep, *cols)
df.select("trending_date", year("trending_date"), month("trending_date")).show(5)

+-------------+-------------------+--------------------+
|trending_date|year(trending_date)|month(trending_date)|
+-------------+-------------------+--------------------+
|   2017-01-14|               2017|                   1|
|   2017-01-14|               2017|                   1|
|   2017-01-14|               2017|                   1|
|   2017-01-14|               2017|                   1|
|   2017-01-14|               2017|                   1|
+-------------+-------------------+--------------------+
only showing top 5 rows



### Calculate the Difference between two dates

In [69]:
from pyspark.sql.functions import datediff

In [71]:
# datediff(end, start)
df.select("trending_date", "publish_time_3", (datediff(df.trending_date, df.publish_time_3)/365).alias('diff')).show(5)

+-------------+-------------------+-------------------+
|trending_date|     publish_time_3|               diff|
+-------------+-------------------+-------------------+
|   2017-01-14|2017-11-13 17:13:01|-0.8301369863013699|
|   2017-01-14|2017-11-13 07:30:00|-0.8301369863013699|
|   2017-01-14|2017-11-12 19:05:24|-0.8273972602739726|
|   2017-01-14|2017-11-13 11:00:04|-0.8301369863013699|
|   2017-01-14|2017-11-12 18:01:41|-0.8273972602739726|
+-------------+-------------------+-------------------+
only showing top 5 rows



### Split a string around a pattern

In [73]:
from pyspark.sql.functions import split

In [75]:
# df.select(split(str, pattern))
df.select("title").show(1, False)
df.select(split(df.title, ' ').alias('split title')).show(1, False)

+----------------------------------+
|title                             |
+----------------------------------+
|we want to talk about our marriage|
+----------------------------------+
only showing top 1 row

+------------------------------------------+
|split title                               |
+------------------------------------------+
|[we, want, to, talk, about, our, marriage]|
+------------------------------------------+
only showing top 1 row



### Working with Arrays

In [77]:
from pyspark.sql.functions import array_contains, array_distinct, array_remove

In [78]:
array_df = df.select("title", split(df.title, ' ').alias('title_array'))

array_df.select("title", array_contains(array_df.title_array, "marriage")).show(5, False)

+--------------------------------------------------------------+-------------------------------------+
|title                                                         |array_contains(title_array, marriage)|
+--------------------------------------------------------------+-------------------------------------+
|we want to talk about our marriage                            |true                                 |
|the trump presidency: last week tonight with john oliver (hbo)|false                                |
|racist superman | rudy mancuso, king bach & lele pons         |false                                |
|nickelback lyrics: real or fake?                              |false                                |
|i dare you: going bald!?                                      |false                                |
+--------------------------------------------------------------+-------------------------------------+
only showing top 5 rows



In [92]:
# Removes duplicate values from the array
array_df.select(array_df.title_array, array_distinct(array_df.title_array)).show(n=10, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------------------------------------------
 title_array                 | [we, want, to, talk, about, our, marriage]                                     
 array_distinct(title_array) | [we, want, to, talk, about, our, marriage]                                     
-RECORD 1-----------------------------------------------------------------------------------------------------
 title_array                 | [the, trump, presidency:, last, week, tonight, with, john, oliver, (hbo)]      
 array_distinct(title_array) | [the, trump, presidency:, last, week, tonight, with, john, oliver, (hbo)]      
-RECORD 2-----------------------------------------------------------------------------------------------------
 title_array                 | [racist, superman, |, rudy, mancuso,, king, bach, &, lele, pons]               
 array_distinct(title_array) | [racist, superman, |, rudy, mancuso,, king, bach, &, lele, pons]               
-

In [95]:
# Remove all elements that equal to element from the given array
array_df.select(array_remove(array_df.title_array, "we")).show(n=5, truncate=False)

+-------------------------------------------------------------------------+
|array_remove(title_array, we)                                            |
+-------------------------------------------------------------------------+
|[want, to, talk, about, our, marriage]                                   |
|[the, trump, presidency:, last, week, tonight, with, john, oliver, (hbo)]|
|[racist, superman, |, rudy, mancuso,, king, bach, &, lele, pons]         |
|[nickelback, lyrics:, real, or, fake?]                                   |
|[i, dare, you:, going, bald!?]                                           |
+-------------------------------------------------------------------------+
only showing top 5 rows



## Creating Functions

Functions as you know them in Python work a bit differently in Pyspark because it operates on a cluster. If you define a function the traditional Python way in PySpark, you will not recieve an error message but the call will not distribute on all nodes. So it will run slower. 

A Python function has to be called as a user defined function (UDF) in PySpark.

*Note: a function will not work on a column with null values

In [99]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

In [100]:
def square(x):
    return int(x**2)
square_udf = udf(lambda z: square(z), IntegerType())

In [101]:
df.select('dislikes',square_udf('dislikes').alias('likes_sq')).where(col('dislikes').isNotNull()).show()

+--------+--------+
|dislikes|likes_sq|
+--------+--------+
|    2966| 8797156|
|    6146|37773316|
|    5339|28504921|
|     666|  443556|
|    1989| 3956121|
|     511|  261121|
|    2445| 5978025|
|     778|  605284|
|     119|   14161|
|    1363| 1857769|
|      25|     625|
|     303|   91809|
|    1333| 1776889|
|    1171| 1371241|
|     246|   60516|
|      52|    2704|
|     638|  407044|
|      53|    2809|
|      36|    1296|
|     191|   36481|
+--------+--------+
only showing top 20 rows

