# Data Manipulating 
- Change data types when they are interpretted
- Clean your data
- Rename columns
- Extract or Clean new values

In [1]:
import pyspark 
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataManipulate").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "cores")
spark

You are working with 1 cores


In [2]:
# Immutability
names = spark.createDataFrame([('Abraham', 'Lincoln')], ['first_name','last_name'])
print(names.show())
print(names.rdd.id())

+----------+---------+
|first_name|last_name|
+----------+---------+
|   Abraham|  Lincoln|
+----------+---------+

None
11


In [3]:
from pyspark.sql.functions import lit
names = names.withColumn('random', lit('test'))
print(names.show())
print(names.rdd.id())

+----------+---------+------+
|first_name|last_name|random|
+----------+---------+------+
|   Abraham|  Lincoln|  test|
+----------+---------+------+

None
19


In [4]:
from pyspark.sql.functions import *
names = names.select(names.first_name, names.last_name, names.random, concat_ws(' ', names.first_name, names.last_name).alias('full_name'))

In [5]:
print(names.show())
print(names.rdd.id())

+----------+---------+------+---------------+
|first_name|last_name|random|      full_name|
+----------+---------+------+---------------+
|   Abraham|  Lincoln|  test|Abraham Lincoln|
+----------+---------+------+---------------+

None
27


## Datasets 
**Source:** https://www.kaggle.com/datasnaek/youtube-new#USvideos.csv

In [6]:
path = '/user/harishmohan/Datasets/'
videos = spark.read.csv(path + 'youtubevideos.csv', inferSchema=True, header=True)
videos.limit(4).toPandas()

Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,17.14.11,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,17.14.11,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."
2,5qpjK5DgCt4,17.14.11,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12T19:05:24.000Z,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...
3,puqaWrEC7tY,17.14.11,Nickelback Lyrics: Real or Fake?,Good Mythical Morning,24,2017-11-13T11:00:04.000Z,"""rhett and link""|""gmm""|""good mythical morning""...",343168,10172,666,2146,https://i.ytimg.com/vi/puqaWrEC7tY/default.jpg,False,False,False,Today we find out if Link is a Nickelback amat...


In [7]:
print(videos.printSchema())

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)

None


## Changing data types after reading
Available types: 

DataType, NullType, StringType, BinaryType, BooleanType, DateType, TimeStampType, DecimalType, DoubleType, FloatType, ByteType, IntegerType, LongType, ShortType, ArrayType, MapType, StructField, StructType

In [8]:
from pyspark.sql.functions import *
from pyspark.sql.types import * 

df = videos.withColumn("views", videos["views"].cast(IntegerType())) \
            .withColumn("likes", videos["likes"].cast(IntegerType())) \
            .withColumn("dislikes", videos["dislikes"].cast(IntegerType())) \
            .withColumn("trending_date", to_date(videos.trending_date, 'dd.mm.yy'))
print(df.printSchema())
df.limit(4).toPandas()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: date (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)

None


Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,2011-01-17,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,2011-01-17,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."
2,5qpjK5DgCt4,2011-01-17,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12T19:05:24.000Z,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...
3,puqaWrEC7tY,2011-01-17,Nickelback Lyrics: Real or Fake?,Good Mythical Morning,24,2017-11-13T11:00:04.000Z,"""rhett and link""|""gmm""|""good mythical morning""...",343168,10172,666,2146,https://i.ytimg.com/vi/puqaWrEC7tY/default.jpg,False,False,False,Today we find out if Link is a Nickelback amat...


In [9]:
renamed = df.withColumnRenamed('channel_title', 'channel_title_new')
renamed.limit(4).toPandas()

Unnamed: 0,video_id,trending_date,title,channel_title_new,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,2011-01-17,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,2011-01-17,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."
2,5qpjK5DgCt4,2011-01-17,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12T19:05:24.000Z,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...
3,puqaWrEC7tY,2011-01-17,Nickelback Lyrics: Real or Fake?,Good Mythical Morning,24,2017-11-13T11:00:04.000Z,"""rhett and link""|""gmm""|""good mythical morning""...",343168,10172,666,2146,https://i.ytimg.com/vi/puqaWrEC7tY/default.jpg,False,False,False,Today we find out if Link is a Nickelback amat...


In [10]:
from pyspark.sql.functions import regexp_replace, regexp_extract

df = df.withColumn('publish_time_2', regexp_replace(df.publish_time, 'T', ' ')) 
df = df.withColumn('publish_time_2', regexp_replace(df.publish_time_2, 'Z', ''))
df = df.withColumn('publish_time_3', to_timestamp(df.publish_time_2, 'yyyy-MM-dd HH:mm:ss.SSS'))
print(df.printSchema())
df.select("publish_time", "publish_time_2", "publish_time_3").show(5,True)

root
 |-- video_id: string (nullable = true)
 |-- trending_date: date (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)
 |-- publish_time_2: string (nullable = true)
 |-- publish_time_3: timestamp (nullable = true)

None
+--------------------+--------------------+-------------------+
|        publish_time|      publish_time_2|     publish_time_3|
+--------------------+--------------------+-------------------+
|2017-11-13T

In [11]:
import pyspark.sql.functions as f
df.select("publish_time", f.translate(f.col("publish_time"), "TZ", " ").alias("translate_func")).show(5, False)

+------------------------+-----------------------+
|publish_time            |translate_func         |
+------------------------+-----------------------+
|2017-11-13T17:13:01.000Z|2017-11-13 17:13:01.000|
|2017-11-13T07:30:00.000Z|2017-11-13 07:30:00.000|
|2017-11-12T19:05:24.000Z|2017-11-12 19:05:24.000|
|2017-11-13T11:00:04.000Z|2017-11-13 11:00:04.000|
|2017-11-12T18:01:41.000Z|2017-11-12 18:01:41.000|
+------------------------+-----------------------+
only showing top 5 rows



In [12]:
from pyspark.sql.functions import *
df = df.withColumn('title', trim(df.title))
df.select("title").show(5, False)

+--------------------------------------------------------------+
|title                                                         |
+--------------------------------------------------------------+
|WE WANT TO TALK ABOUT OUR MARRIAGE                            |
|The Trump Presidency: Last Week Tonight with John Oliver (HBO)|
|Racist Superman | Rudy Mancuso, King Bach & Lele Pons         |
|Nickelback Lyrics: Real or Fake?                              |
|I Dare You: GOING BALD!?                                      |
+--------------------------------------------------------------+
only showing top 5 rows



In [13]:
df = df.withColumn('title', lower(df.title))
df.select("title").show(5, False)

+--------------------------------------------------------------+
|title                                                         |
+--------------------------------------------------------------+
|we want to talk about our marriage                            |
|the trump presidency: last week tonight with john oliver (hbo)|
|racist superman | rudy mancuso, king bach & lele pons         |
|nickelback lyrics: real or fake?                              |
|i dare you: going bald!?                                      |
+--------------------------------------------------------------+
only showing top 5 rows



In [14]:
from pyspark.sql.functions import when
df.select("likes","dislikes",(when(df.likes > df.dislikes, 'Good').when(df.likes < df.dislikes, 'Bad').otherwise('Undetermined')).alias("Favorability")).show(3)

+------+--------+------------+
| likes|dislikes|Favorability|
+------+--------+------------+
| 57527|    2966|        Good|
| 97185|    6146|        Good|
|146033|    5339|        Good|
+------+--------+------------+
only showing top 3 rows



In [15]:
from pyspark.sql.functions import expr
df.select("likes","dislikes",
          expr(
              "CASE \
                  WHEN likes > dislikes THEN  'Good' \
                  WHEN likes < dislikes THEN 'Bad' \
              ELSE 'Undetermined' END AS Favorability" \
              )).show(3)

+------+--------+------------+
| likes|dislikes|Favorability|
+------+--------+------------+
| 57527|    2966|        Good|
| 97185|    6146|        Good|
|146033|    5339|        Good|
+------+--------+------------+
only showing top 3 rows



In [16]:
df.selectExpr("likes", "dislikes", "CASE WHEN likes > dislikes THEN 'Good' WHEN likes < dislikes THEN 'Bad' ELSE 'Undetermined' END AS Favorability").show(3)

+------+--------+------------+
| likes|dislikes|Favorability|
+------+--------+------------+
| 57527|    2966|        Good|
| 97185|    6146|        Good|
|146033|    5339|        Good|
+------+--------+------------+
only showing top 3 rows



In [17]:
# Concatenate
from pyspark.sql.functions import concat_ws

df.select(df.title, df.channel_title, df.tags).show(2)
df.select(concat_ws(' ', df.title, df.channel_title, df.tags).alias('text')).show(2, False)

+--------------------+---------------+--------------------+
|               title|  channel_title|                tags|
+--------------------+---------------+--------------------+
|we want to talk a...|   CaseyNeistat|     SHANtell martin|
|the trump preside...|LastWeekTonight|"last week tonigh...|
+--------------------+---------------+--------------------+
only showing top 2 rows

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                                                                   |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|we want to talk about our marriage CaseyNeistat SHANtell

In [18]:
# Extract DateTime
from pyspark.sql.functions import year, month
df.select("trending_date", year("trending_date"), month("trending_date")).show(5)

+-------------+-------------------+--------------------+
|trending_date|year(trending_date)|month(trending_date)|
+-------------+-------------------+--------------------+
|   2011-01-17|               2011|                   1|
|   2011-01-17|               2011|                   1|
|   2011-01-17|               2011|                   1|
|   2011-01-17|               2011|                   1|
|   2011-01-17|               2011|                   1|
+-------------+-------------------+--------------------+
only showing top 5 rows



In [19]:
# DateDiff
from pyspark.sql.functions import datediff
df.select("trending_date", "publish_time_3", (datediff(df.trending_date, df.publish_time_3)/365).alias('diff')).show(5)

+-------------+-------------------+-------------------+
|trending_date|     publish_time_3|               diff|
+-------------+-------------------+-------------------+
|   2011-01-17|2017-11-13 17:13:01|-6.8273972602739725|
|   2011-01-17|2017-11-13 07:30:00|-6.8273972602739725|
|   2011-01-17|2017-11-12 19:05:24| -6.824657534246575|
|   2011-01-17|2017-11-13 11:00:04|-6.8273972602739725|
|   2011-01-17|2017-11-12 18:01:41| -6.824657534246575|
+-------------+-------------------+-------------------+
only showing top 5 rows



In [20]:
# delimiter split
from pyspark.sql.functions import split

df.select(df.title).show(1, False)
df.select(split(df.title, ' ').alias('new')).show(1, False)

+----------------------------------+
|title                             |
+----------------------------------+
|we want to talk about our marriage|
+----------------------------------+
only showing top 1 row

+------------------------------------------+
|new                                       |
+------------------------------------------+
|[we, want, to, talk, about, our, marriage]|
+------------------------------------------+
only showing top 1 row



In [21]:
# Arrays
from pyspark.sql.functions import *

array_df = df.select("title", split(df.title, ' ').alias('title_array'))
array_df.show(1)

# Array contains
array_df.select("title", array_contains(array_df.title_array, "marriage")).show(1, False)


+--------------------+--------------------+
|               title|         title_array|
+--------------------+--------------------+
|we want to talk a...|[we, want, to, ta...|
+--------------------+--------------------+
only showing top 1 row

+----------------------------------+-------------------------------------+
|title                             |array_contains(title_array, marriage)|
+----------------------------------+-------------------------------------+
|we want to talk about our marriage|true                                 |
+----------------------------------+-------------------------------------+
only showing top 1 row



In [26]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def square(x):
    return int(x**2)


square_udf = udf(lambda z: square(z), IntegerType())

df.select('dislikes', square_udf('dislikes').alias('likes_sql')).where(col('dislikes').isNotNull()).show()

+--------+---------+
|dislikes|likes_sql|
+--------+---------+
|    2966|  8797156|
|    6146| 37773316|
|    5339| 28504921|
|     666|   443556|
|    1989|  3956121|
|     511|   261121|
|    2445|  5978025|
|     778|   605284|
|     119|    14161|
|    1363|  1857769|
|      25|      625|
|     303|    91809|
|    1333|  1776889|
|    1171|  1371241|
|     246|    60516|
|      52|     2704|
|     638|   407044|
|      53|     2809|
|      36|     1296|
|     191|    36481|
+--------+---------+
only showing top 20 rows



# DataSet
**Source:** https://www.kaggle.com/kapastor/democratvsrepublicantweets#ExtractedTweets.csv

In [29]:
tweets = spark.read.csv(path + 'Rep_vs_Dem_tweets.csv', inferSchema=True, header=True)
tweets.show(5)

+--------------------+-------------+--------------------+
|               Party|       Handle|               Tweet|
+--------------------+-------------+--------------------+
|            Democrat|RepDarrenSoto|Today, Senate Dem...|
|            Democrat|RepDarrenSoto|RT @WinterHavenSu...|
|            Democrat|RepDarrenSoto|RT @NBCLatino: .@...|
|Congress has allo...|         null|                null|
|            Democrat|RepDarrenSoto|RT @NALCABPolicy:...|
+--------------------+-------------+--------------------+
only showing top 5 rows



In [30]:
tweets.limit(4).toPandas()

Unnamed: 0,Party,Handle,Tweet
0,Democrat,RepDarrenSoto,"Today, Senate Dems vote to #SaveTheInternet. P..."
1,Democrat,RepDarrenSoto,RT @WinterHavenSun: Winter Haven resident / Al...
2,Democrat,RepDarrenSoto,RT @NBCLatino: .@RepDarrenSoto noted that Hurr...
3,"Congress has allocated about $18…""",,


In [31]:
tweets.select("tweet").show(3, False)

+--------------------------------------------------------------------------------------------------------------------------------------------+
|tweet                                                                                                                                       |
+--------------------------------------------------------------------------------------------------------------------------------------------+
|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House… https://t.co/n3tggDLU1L |
|RT @WinterHavenSun: Winter Haven resident / Alta Vista teacher is one of several recognized by @RepDarrenSoto for National Teacher Apprecia…|
|RT @NBCLatino: .@RepDarrenSoto noted that Hurricane Maria has left approximately $90 billion in damages.                                    |
+--------------------------------------------------------------------------------------------------------------------------------------------+

In [33]:
print(tweets.printSchema())

root
 |-- Party: string (nullable = true)
 |-- Handle: string (nullable = true)
 |-- Tweet: string (nullable = true)

None


In [36]:
from pyspark.sql.functions import *
latino = tweets.withColumn('Latino_Mentions', regexp_extract(tweets.Tweet, '(.)(@LatinoLeader)(.)',2))
latino.limit(6).toPandas()

Unnamed: 0,Party,Handle,Tweet,Latino_Mentions
0,Democrat,RepDarrenSoto,"Today, Senate Dems vote to #SaveTheInternet. P...",
1,Democrat,RepDarrenSoto,RT @WinterHavenSun: Winter Haven resident / Al...,
2,Democrat,RepDarrenSoto,RT @NBCLatino: .@RepDarrenSoto noted that Hurr...,
3,"Congress has allocated about $18…""",,,
4,Democrat,RepDarrenSoto,RT @NALCABPolicy: Meeting with @RepDarrenSoto ...,@LatinoLeader
5,Democrat,RepDarrenSoto,RT @Vegalteno: Hurricane season starts on June...,


In [39]:
from pyspark.sql.functions import * 
counts = tweets.groupBy("Party").count()
counts.orderBy(desc("count")).show(5)

+--------------------+-----+
|               Party|count|
+--------------------+-----+
|          Republican|44392|
|            Democrat|42068|
|            That’s…"|   28|
|https://t.co/oc6J...|   22|
|                 Now|   17|
+--------------------+-----+
only showing top 5 rows



In [41]:
from pyspark.sql.functions import when
clean = tweets.withColumn('Party', when(tweets.Party == 'Democrat', 'Democrat').when(tweets.Party == 'Republican', 'Republican').otherwise('Other'))
counts = clean.groupBy("Party").count()
counts.orderBy(desc("count")).show(5)

+----------+-----+
|     Party|count|
+----------+-----+
|Republican|44392|
|  Democrat|42068|
|     Other| 6029|
+----------+-----+



In [44]:
tweets.select("tweet").show(1, False)

+-------------------------------------------------------------------------------------------------------------------------------------------+
|tweet                                                                                                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------+
|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House… https://t.co/n3tggDLU1L|
+-------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row



### regexp tester
https://regex101.com/

In [46]:
print("Cleaned Tweet")
tweets.withColumn('cleaned', regexp_replace('Tweet', '(http|ftp|https)://([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-])?', '')).select("cleaned").show(1, False)

Cleaned Tweet
+--------------------------------------------------------------------------------------------------------------------+
|cleaned                                                                                                             |
+--------------------------------------------------------------------------------------------------------------------+
|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House… |
+--------------------------------------------------------------------------------------------------------------------+
only showing top 1 row



In [47]:
from pyspark.sql.functions import *
tweets.select("Tweet").show(5, False)
tweets.select("Tweet", trim(tweets.Tweet)).show(5, False)

+--------------------------------------------------------------------------------------------------------------------------------------------+
|Tweet                                                                                                                                       |
+--------------------------------------------------------------------------------------------------------------------------------------------+
|Today, Senate Dems vote to #SaveTheInternet. Proud to support similar #NetNeutrality legislation here in the House… https://t.co/n3tggDLU1L |
|RT @WinterHavenSun: Winter Haven resident / Alta Vista teacher is one of several recognized by @RepDarrenSoto for National Teacher Apprecia…|
|RT @NBCLatino: .@RepDarrenSoto noted that Hurricane Maria has left approximately $90 billion in damages.                                    |
|null                                                                                                                                        |

In [48]:
renamed = tweets.withColumnRenamed('Party', 'Dem_Rep')
renamed.limit(4).toPandas()

Unnamed: 0,Dem_Rep,Handle,Tweet
0,Democrat,RepDarrenSoto,"Today, Senate Dems vote to #SaveTheInternet. P..."
1,Democrat,RepDarrenSoto,RT @WinterHavenSun: Winter Haven resident / Al...
2,Democrat,RepDarrenSoto,RT @NBCLatino: .@RepDarrenSoto noted that Hurr...
3,"Congress has allocated about $18…""",,


In [54]:
from pyspark.sql.functions import *
tweets.select(tweets.Party, tweets.Handle, concat_ws(' ', tweets.Party, tweets.Handle).alias('Concatenated')).show(2, False)

+--------+-------------+----------------------+
|Party   |Handle       |Concatenated          |
+--------+-------------+----------------------+
|Democrat|RepDarrenSoto|Democrat RepDarrenSoto|
|Democrat|RepDarrenSoto|Democrat RepDarrenSoto|
+--------+-------------+----------------------+
only showing top 2 rows



In [58]:
from pyspark.sql.types import *
md_office = [('Mohammed','Alfasy','1987-4-8','2016-1-7','2017-2-3','2018-3-2') \
            ,('Marcy','Wellmaker','1986-4-8','2015-1-7','2017-1-3','2018-1-2') \
            ,('Ginny','Ginger','1986-7-10','2014-8-7','2015-2-3','2016-3-2') \
            ,('Vijay','Doberson','1988-5-2','2016-1-7','2018-2-3','2018-3-2') \
            ,('Orhan','Gelicek','1987-5-11','2016-5-7','2017-1-3','2018-9-2') \
            ,('Sarah','Jones','1956-7-6','2016-4-7','2017-8-3','2018-10-2') \
            ,('John','Johnson','2017-10-12','2018-1-2','2018-10-3','2018-3-2') ]

df = spark.createDataFrame(md_office, ['First_name', 'last_name', 'dob', 'visit1', 'visit2', 'visit3'])
        
df = df.withColumn("dob", df["dob"].cast(DateType())) \
    .withColumn("visit1", df["visit1"].cast(DateType())) \
    .withColumn("visit2", df["visit2"].cast(DateType())) \
    .withColumn("visit3", df["visit3"].cast(DateType()))

df.show()
print(df.printSchema())

+----------+---------+----------+----------+----------+----------+
|First_name|last_name|       dob|    visit1|    visit2|    visit3|
+----------+---------+----------+----------+----------+----------+
|  Mohammed|   Alfasy|1987-04-08|2016-01-07|2017-02-03|2018-03-02|
|     Marcy|Wellmaker|1986-04-08|2015-01-07|2017-01-03|2018-01-02|
|     Ginny|   Ginger|1986-07-10|2014-08-07|2015-02-03|2016-03-02|
|     Vijay| Doberson|1988-05-02|2016-01-07|2018-02-03|2018-03-02|
|     Orhan|  Gelicek|1987-05-11|2016-05-07|2017-01-03|2018-09-02|
|     Sarah|    Jones|1956-07-06|2016-04-07|2017-08-03|2018-10-02|
|      John|  Johnson|2017-10-12|2018-01-02|2018-10-03|2018-03-02|
+----------+---------+----------+----------+----------+----------+

root
 |-- First_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- visit1: date (nullable = true)
 |-- visit2: date (nullable = true)
 |-- visit3: date (nullable = true)

None


In [61]:
from pyspark.sql.functions import *
diff1 = df.select(datediff(df.visit2, df.visit1).alias("diff"))
diff2 = df.select(datediff(df.visit3, df.visit2).alias("diff"))

diff_combo = diff1.union(diff2)
diff_combo = diff2.show(5)

+----+
|diff|
+----+
| 392|
| 364|
| 393|
|  27|
| 607|
+----+
only showing top 5 rows



In [65]:
ages = df.select(format_number(datediff(df.visit1, df.dob)/365, 0).alias('age'))
ages.show()

+---+
|age|
+---+
| 29|
| 29|
| 28|
| 28|
| 29|
| 60|
|  0|
+---+



In [66]:
month1 = df.select(month(df['visit1']).alias("month"))
month1.show(3)

+-----+
|month|
+-----+
|    1|
|    1|
|    8|
+-----+
only showing top 3 rows



## DataSet
**Source:** https://www.kaggle.com/aungpyaeap/supermarket-sales

In [68]:
sales = spark.read.csv(path + 'supermarket_sales.csv', inferSchema=True, header=True)

In [69]:
sales.limit(6).toPandas()

Unnamed: 0,Invoice ID,Branch,City,Customer type,Gender,Product line,Unit price,Quantity,Tax 5%,Total,Date,Time,Payment,cogs,gross margin percentage,gross income,Rating
0,750-67-8428,A,Yangon,Member,Female,Health and beauty,74.69,7,26.1415,548.9715,1/5/2019,13:08,Ewallet,522.83,4.761905,26.1415,9.1
1,226-31-3081,C,Naypyitaw,Normal,Female,Electronic accessories,15.28,5,3.82,80.22,3/8/2019,10:29,Cash,76.4,4.761905,3.82,9.6
2,631-41-3108,A,Yangon,Normal,Male,Home and lifestyle,46.33,7,16.2155,340.5255,3/3/2019,13:23,Credit card,324.31,4.761905,16.2155,7.4
3,123-19-1176,A,Yangon,Member,Male,Health and beauty,58.22,8,23.288,489.048,1/27/2019,20:33,Ewallet,465.76,4.761905,23.288,8.4
4,373-73-7910,A,Yangon,Normal,Male,Sports and travel,86.31,7,30.2085,634.3785,2/8/2019,10:37,Ewallet,604.17,4.761905,30.2085,5.3
5,699-14-3026,C,Naypyitaw,Normal,Male,Electronic accessories,85.39,7,29.8865,627.6165,3/25/2019,18:30,Ewallet,597.73,4.761905,29.8865,4.1


In [70]:
print(sales.printSchema())

root
 |-- Invoice ID: string (nullable = true)
 |-- Branch: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Customer type: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Product line: string (nullable = true)
 |-- Unit price: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Tax 5%: double (nullable = true)
 |-- Total: double (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Payment: string (nullable = true)
 |-- cogs: double (nullable = true)
 |-- gross margin percentage: double (nullable = true)
 |-- gross income: double (nullable = true)
 |-- Rating: double (nullable = true)

None


In [75]:
from pyspark.sql.types import *

# Note that this method gives all null values-- This doesn't work
df = sales.withColumn("Formatted Date", sales["Date"].cast(DateType()))
df = df.select("Date", "Formatted Date")
print(df.limit(6).toPandas())

# This result gives the wrong results (Data error)
sales.select('Date', to_date(sales.Date, 'mm/dd/yyyy').alias('Dateformatted'), month(to_date(sales.Date, 'mm/dd/yyyy')).alias('Month'),).show(5)

# But if we capitialize the mm part in the format, we get the correct results! 
sales.select('Date', to_date(sales.Date, 'MM/dd/yyyy').alias('Dateformatted'), month(to_date(sales.Date, 'MM/dd/yyyy')).alias('Month'),).show(5)

        Date Formatted Date
0   1/5/2019           None
1   3/8/2019           None
2   3/3/2019           None
3  1/27/2019           None
4   2/8/2019           None
5  3/25/2019           None
+---------+-------------+-----+
|     Date|Dateformatted|Month|
+---------+-------------+-----+
| 1/5/2019|   2019-01-05|    1|
| 3/8/2019|   2019-01-08|    1|
| 3/3/2019|   2019-01-03|    1|
|1/27/2019|   2019-01-27|    1|
| 2/8/2019|   2019-01-08|    1|
+---------+-------------+-----+
only showing top 5 rows

+---------+-------------+-----+
|     Date|Dateformatted|Month|
+---------+-------------+-----+
| 1/5/2019|   2019-01-05|    1|
| 3/8/2019|   2019-03-08|    3|
| 3/3/2019|   2019-03-03|    3|
|1/27/2019|   2019-01-27|    1|
| 2/8/2019|   2019-02-08|    2|
+---------+-------------+-----+
only showing top 5 rows



In [76]:
df = sales.select('Date', split(sales.Date, '/')[0].alias('Month'),'Total')

print("Verify")
df.show(5)
print(df.printSchema())

Verify
+---------+-----+--------+
|     Date|Month|   Total|
+---------+-----+--------+
| 1/5/2019|    1|548.9715|
| 3/8/2019|    3|   80.22|
| 3/3/2019|    3|340.5255|
|1/27/2019|    1| 489.048|
| 2/8/2019|    2|634.3785|
+---------+-----+--------+
only showing top 5 rows

root
 |-- Date: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Total: double (nullable = true)

None
