In [1]:
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import *
from pyspark.sql.functions import split
import time

start a new spark session and generate spark context

In [2]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

read csv and generate spark DataFrame

In [3]:
df = spark.read.format('csv').option("escape","\"").option('header',True).load("s3://msbx5420-2020/yongbo/news.csv")

In [4]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[index: string, authors: string, title: string, publish_date: string, description: string, text: string, url: string]>

In [5]:
df.collect()[0]

Row(index='0', authors="['Cbc News']", title="Coronavirus a 'wake-up call' for Canada's prescription drug supply", publish_date='2020-03-27 08:00:00', description='Canadian pharmacies are limiting how much medication can be dispensed to try to prevent shortages, recognizing that most active ingredients for drugs come from India and China and medical supply chains have been disrupted by the spread of COVID-19.', text='Canadian pharmacies are limiting how much medication can be dispensed to try to prevent shortages, recognizing that most active ingredients for drugs come from India and China and medical supply chains have been disrupted by the spread of COVID-19. Provincial regulatory colleges are complying with the Canadian Pharmacists Association call to limit the amount of medications given to patients to 30-day\xa0supplies. The goal is to stop people from refilling prescriptions early and to ensure life-saving drugs don\'t run short when supply chains are\xa0vulnerable. Mina Tadrous 

use regex to get rid of some non-alphanumeric characters with text column

In [6]:
newdf = df.withColumn('text', regexp_replace('text', '\xa0', ''))
newdf = newdf.withColumn('text', regexp_replace('text', '\\\'', ''))

In [7]:
type(newdf)

pyspark.sql.dataframe.DataFrame

In [8]:
newdf.take(1)

[Row(index='0', authors="['Cbc News']", title="Coronavirus a 'wake-up call' for Canada's prescription drug supply", publish_date='2020-03-27 08:00:00', description='Canadian pharmacies are limiting how much medication can be dispensed to try to prevent shortages, recognizing that most active ingredients for drugs come from India and China and medical supply chains have been disrupted by the spread of COVID-19.', text='Canadian pharmacies are limiting how much medication can be dispensed to try to prevent shortages, recognizing that most active ingredients for drugs come from India and China and medical supply chains have been disrupted by the spread of COVID-19. Provincial regulatory colleges are complying with the Canadian Pharmacists Association call to limit the amount of medications given to patients to 30-daysupplies. The goal is to stop people from refilling prescriptions early and to ensure life-saving drugs dont run short when supply chains arevulnerable. Mina Tadrous is a phar

"\xa0" and "\'" are removed

Next we perform word count with rdd

In [9]:
rdd_text = newdf.select('text').rdd

In [10]:
counts_rdd = rdd_text.flatMap(lambda x: x[0].split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)

In [11]:
type(counts_rdd)

pyspark.rdd.PipelinedRDD

convert the results from rdd to DataFrame

In [12]:
counts_rdd = counts_rdd.toDF()

In [13]:
counts_rdd.show()

+-----------+-----+
|         _1|   _2|
+-----------+-----+
|        Dr.| 2408|
|         of|60065|
|coronavirus| 5887|
|       have|16525|
|       four|  841|
|      virus| 3126|
|         to|82495|
|    Control|  256|
|         is|24348|
|   stable,"|    2|
|       said|18059|
| Henry."All|    1|
|         at|14274|
|     home,"|  112|
|      week,|  654|
|        two| 3196|
|      their| 7952|
|        30s|   90|
|        her| 2860|
|        was| 8222|
+-----------+-----+
only showing top 20 rows



add header to DataFrame

In [14]:
counts_rdd = counts_rdd.toDF('word', 'count')

In [15]:
counts_rdd.show()

+-----------+-----+
|       word|count|
+-----------+-----+
|        Dr.| 2408|
|         of|60065|
|coronavirus| 5887|
|       have|16525|
|       four|  841|
|      virus| 3126|
|         to|82495|
|    Control|  256|
|         is|24348|
|   stable,"|    2|
|       said|18059|
| Henry."All|    1|
|         at|14274|
|     home,"|  112|
|      week,|  654|
|        two| 3196|
|      their| 7952|
|        30s|   90|
|        her| 2860|
|        was| 8222|
+-----------+-----+
only showing top 20 rows



In [16]:
type(counts_rdd)

pyspark.sql.dataframe.DataFrame

print DataFrame size

In [17]:
print((counts_rdd.count(), len(counts_rdd.columns)))

(110744, 2)


get descending order by "count" column

In [18]:
counts_rdd.orderBy(desc("count")).show()

+----+------+
|word| count|
+----+------+
| the|113649|
|  to| 82495|
|  of| 60065|
| and| 55072|
|  in| 48456|
|   a| 42250|
| for| 25529|
|  is| 24348|
|that| 21667|
|  on| 21124|
| are| 20494|
|said| 18059|
|have| 16525|
|  be| 15575|
|with| 15557|
| The| 14716|
|  at| 14274|
|  as| 13050|
|from| 12857|
|will| 12065|
+----+------+
only showing top 20 rows



word count directly from rdd

In [19]:
rdd_text.flatMap(lambda x: x[0].split(" ")).map(lambda x: (x, 1))\
    .reduceByKey(lambda x,y: x+y).sortBy(lambda a: a[1], False).take(20)

[('the', 113649),
 ('to', 82495),
 ('of', 60065),
 ('and', 55072),
 ('in', 48456),
 ('a', 42250),
 ('for', 25529),
 ('is', 24348),
 ('that', 21667),
 ('on', 21124),
 ('are', 20494),
 ('said', 18059),
 ('have', 16525),
 ('be', 15575),
 ('with', 15557),
 ('The', 14716),
 ('at', 14274),
 ('as', 13050),
 ('from', 12857),
 ('will', 12065)]

word count with Dataframe

In [20]:
import pyspark.sql.functions as x

In [21]:
newdf.withColumn('word', x.explode(x.split(x.col('text'), ' '))).groupBy('word').count()\
    .sort('count', ascending=False).show()

+----+------+
|word| count|
+----+------+
| the|113649|
|  to| 82495|
|  of| 60065|
| and| 55072|
|  in| 48456|
|   a| 42250|
| for| 25529|
|  is| 24348|
|that| 21667|
|  on| 21124|
| are| 20494|
|said| 18059|
|have| 16525|
|  be| 15575|
|with| 15557|
| The| 14716|
|  at| 14274|
|  as| 13050|
|from| 12857|
|will| 12065|
+----+------+
only showing top 20 rows



test running time

In [29]:
start_time1 = time.time()
rdd_text.flatMap(lambda x: x[0].split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)
print("--- rdd running time (without sorting) is %s seconds ---" % (time.time() - start_time1))

start_time2 = time.time()
newdf.withColumn('word', x.explode(x.split(x.col('text'), ' '))).groupBy('word').count()
print("--- DataFrame running time (without sorting) is %s seconds ---" % (time.time() - start_time2))

start_time3 = time.time()
rdd_timing = rdd_text.flatMap(lambda x: x[0].split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)\
    .sortBy(lambda a: a[1], False)
print("--- rdd running time (with sorting) is %s seconds ---" % (time.time() - start_time3))

start_time4 = time.time()
newdf_timing = newdf.withColumn('word', x.explode(x.split(x.col('text'), ' '))).groupBy('word').count()\
    .sort('count', ascending=False)
print("--- DataFrame running time (with sorting) is %s seconds ---" % (time.time() - start_time4))

--- rdd running time (without sorting) is 0.030278921127319336 seconds ---
--- DataFrame running time (without sorting) is 0.18393468856811523 seconds ---
--- rdd running time (with sorting) is 8.592974662780762 seconds ---
--- DataFrame running time (with sorting) is 0.2873249053955078 seconds ---


It's obvious that creating a new column in DataFrame seems slower than a corresponding map to transformation in RDD.
But if we add (slightly) more aggregations and grouping, DataFrame is definitely the faster one compare to rdd.

In [30]:
start_time1 = time.time()
rdd_timing.collect()
print("--- rdd running time is %s seconds ---" % (time.time() - start_time1))

start_time2 = time.time()
newdf_timing.collect()
print("--- DataFrame running time is %s seconds ---" % (time.time() - start_time2))

newdf_timing_cache = newdf_timing.cache()
start_time3 = time.time()
newdf_timing_cache.collect()
print("--- DataFrame running time is %s seconds ---" % (time.time() - start_time3))

--- rdd running time is 0.7412478923797607 seconds ---
--- DataFrame running time is 8.040996551513672 seconds ---
--- DataFrame running time is 1.1875431537628174 seconds ---


sometimes collecting on rdd is very slow but sometimes are fast. might because network traffic and heavy disk IO operations

but caching on DataFrame first and then collecting definitely boost collect performance

Interesting experiment for fun, check number of words in each news (or you can say "each row")

In [31]:
nnewdf = newdf.withColumn('wordCount', x.size(x.split(x.col('text'), ' ')))
nnewdf.show()

+-----+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+---------+
|index|             authors|               title|       publish_date|         description|                text|                 url|wordCount|
+-----+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+---------+
|    0|        ['Cbc News']|Coronavirus a 'wa...|2020-03-27 08:00:00|Canadian pharmaci...|Canadian pharmaci...|https://www.cbc.c...|      395|
|    1|        ['Cbc News']|Yukon gov't names...|2020-03-27 01:45:00|The Yukon governm...|The Yukon governm...|https://www.cbc.c...|      303|
|    2|['The Associated ...|U.S. Senate passe...|2020-03-26 05:13:00|The Senate has pa...|The Senate late W...|https://www.cbc.c...|      974|
|    3|        ['Cbc News']|Coronavirus: The ...|2020-03-27 00:36:00|Scientists around...|Scientists around...|https://www.cbc.c...|      721|

In [33]:
nnewdf.select("authors","title","publish_date","text","url","wordCount").show()

+--------------------+--------------------+-------------------+--------------------+--------------------+---------+
|             authors|               title|       publish_date|                text|                 url|wordCount|
+--------------------+--------------------+-------------------+--------------------+--------------------+---------+
|        ['Cbc News']|Coronavirus a 'wa...|2020-03-27 08:00:00|Canadian pharmaci...|https://www.cbc.c...|      395|
|        ['Cbc News']|Yukon gov't names...|2020-03-27 01:45:00|The Yukon governm...|https://www.cbc.c...|      303|
|['The Associated ...|U.S. Senate passe...|2020-03-26 05:13:00|The Senate late W...|https://www.cbc.c...|      974|
|        ['Cbc News']|Coronavirus: The ...|2020-03-27 00:36:00|Scientists around...|https://www.cbc.c...|      721|
|        ['Cbc News']|The latest on the...|2020-03-26 20:57:00|   Trudeau says r...|https://www.cbc.c...|     1388|
|['Mark Gollom Is ...|'Worse' pandemic ...|2020-03-27 08:00:00|The conti

In [None]:
counts_rdd.write.parquet('s3://msbx5420-2020/Team-Torreys-Peak/counts.parquet')

In [None]:
nnewdf.write.parquet('s3://msbx5420-2020/Team-Torreys-Peak/news_countnews.parquet')