In [1]:
# Download the three course data files into the current working directory
# (-q suppresses wget's progress output).
!wget -q 'https://assets.datacamp.com/production/repositories/3937/datasets/a367f6f461f670a364ab2a59afc25bc2e3fab157/trainsched.txt'
!wget -q 'https://assets.datacamp.com/production/repositories/3937/datasets/213ca262bf6af12428d42842848464565f3d5504/sherlock.txt'
!wget -q 'https://assets.datacamp.com/production/repositories/3937/datasets/de0a90e8f132c1f2846e70d7b5eec250923318d5/sherlock.parquet'
# List the directory to confirm that the files arrived.
!ls

wget: /opt/conda/lib/libuuid.so.1: no version information available (required by wget)
wget: /opt/conda/lib/libuuid.so.1: no version information available (required by wget)
wget: /opt/conda/lib/libuuid.so.1: no version information available (required by wget)
__notebook__.ipynb  sherlock.parquet  sherlock.txt  trainsched.txt


In [2]:
!pip install -q pyspark

In [3]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
# Directories that are searched for data files.
data_dirs = ['/kaggle/working', '/kaggle/input']

# Map each file name found under those directories to its full path,
# echoing every path as it is recorded.
data_paths = {}
for root in data_dirs:
    for folder, _subdirs, names in os.walk(root):
        for name in names:
            full_path = os.path.join(folder, name)
            data_paths[name] = full_path
            print(full_path)

/kaggle/working/trainsched.txt
/kaggle/working/sherlock.parquet
/kaggle/working/sherlock.txt
/kaggle/working/__notebook__.ipynb


In [4]:
from pyspark import SparkContext, SparkConf
# Reuse the context that is already running when this cell is executed again;
# constructing a second SparkContext in the same process is rejected by Spark.
# The 'local' master is only applied when a new context has to be created.
sc = SparkContext.getOrCreate(SparkConf().setMaster('local'))
print(sc.version)

2.4.5


In [5]:
from pyspark.sql import SparkSession
# Obtain the SparkSession used for all DataFrame and SQL work below
# (getOrCreate returns an existing session when there is one).
spark = SparkSession.builder.getOrCreate()
# Print the session object
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f96d9bbe5c0>


In [6]:
# Load trainsched.txt; header=True takes the column names from the first line.
# No schema is given, and DESCRIBE reports every column as a string.
df = spark.read.csv(data_paths["trainsched.txt"], header=True)

# Register the DataFrame as a temporary view named 'schedule' for SQL queries
df.createOrReplaceTempView('schedule')

In [7]:
# NOTE: later cells rely on this wildcard import for lead, size, when and
# monotonically_increasing_id, which are never imported by name.
from pyspark.sql.functions import *
# Inspect the columns of the 'schedule' view (registered from df)
spark.sql("DESCRIBE schedule").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|train_id|   string|   null|
| station|   string|   null|
|    time|   string|   null|
+--------+---------+-------+



In [8]:
# Count the rows of the schedule view.
# (A running total over diff_min cannot be queried at this point: the view
# only has the columns train_id, station and time.)
query = "select count(*) from schedule"

# Run the query and display the result
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|      14|
+--------+



In [9]:
# Number the stops of each train and pair every stop with the time of the next
# stop of the same train (null for a train's last stop).
# Both window functions order by `time`, which DESCRIBE reports as a string
# column, so the ordering is a string comparison.
# NOTE(review): that agrees with chronological order for the rows in this file;
# confirm it still holds before reusing the query on times such as 10:05a.
query = """
SELECT 
ROW_NUMBER() OVER (PARTITION BY train_id ORDER BY time) AS row,
train_id, 
station, 
time, 
LEAD(time,1) OVER (PARTITION BY train_id ORDER BY time) AS time_next 
FROM schedule
"""
spark.sql(query).show()

+---+--------+-------------+-----+---------+
|row|train_id|      station| time|time_next|
+---+--------+-------------+-----+---------+
|  1|     217|       Gilroy|6:06a|    6:15a|
|  2|     217|   San Martin|6:15a|    6:21a|
|  3|     217|  Morgan Hill|6:21a|    6:36a|
|  4|     217| Blossom Hill|6:36a|    6:42a|
|  5|     217|      Capitol|6:42a|    6:50a|
|  6|     217|       Tamien|6:50a|    6:59a|
|  7|     217|     San Jose|6:59a|     null|
|  1|     324|San Francisco|7:59a|    8:03a|
|  2|     324|  22nd Street|8:03a|    8:16a|
|  3|     324|     Millbrae|8:16a|    8:24a|
|  4|     324|    Hillsdale|8:24a|    8:31a|
|  5|     324| Redwood City|8:31a|    8:37a|
|  6|     324|    Palo Alto|8:37a|    9:05a|
|  7|     324|     San Jose|9:05a|     null|
+---+--------+-------------+-----+---------+



### Aggregation, step by step
Whether to use dot notation or SQL is a matter of personal preference. However, as demonstrated in the video exercise, there are cases where SQL is simpler. As the video lesson also demonstrated, there are cases where dot notation gives a counterintuitive result, such as when a second aggregation on a column clobbers a prior aggregation on that column. As mentioned in the video, the basic (dictionary) syntax of `agg` in PySpark can perform only a single aggregation on each column at a time.

The following exercises calculate the time of the first departure for each train line.

The first two queries match. However, the second two do not. 

In [10]:
# Give the identical result in each command:
# first departure per train, once in SQL and once in dot notation.
spark.sql('SELECT train_id, MIN(time) AS start FROM schedule GROUP BY train_id').show()
df.groupBy('train_id').agg({'time':'min'}).withColumnRenamed('min(time)', 'start').show()

# Print the second column of the result
spark.sql('SELECT train_id, MIN(time), MAX(time) FROM schedule GROUP BY train_id').show()
# A dict literal keeps only the last value given for a repeated key, so agg()
# receives {'time': 'max'} and the min aggregation is silently dropped. This is
# why the dot-notation result has no min(time) column.
result = df.groupBy('train_id').agg({'time':'min', 'time':'max'})
result.show()
print(result.columns[1])

+--------+-----+
|train_id|start|
+--------+-----+
|     217|6:06a|
|     324|7:59a|
+--------+-----+

+--------+-----+
|train_id|start|
+--------+-----+
|     217|6:06a|
|     324|7:59a|
+--------+-----+

+--------+---------+---------+
|train_id|min(time)|max(time)|
+--------+---------+---------+
|     217|    6:06a|    6:59a|
|     324|    7:59a|    9:05a|
+--------+---------+---------+

+--------+---------+
|train_id|max(time)|
+--------+---------+
|     217|    6:59a|
|     324|    9:05a|
+--------+---------+

max(time)


In [11]:
# Earliest and latest time per train in one SQL query, with both aggregates
# given explicit aliases (start, end).
query = "SELECT train_id, MIN(time) AS start, MAX(time) AS end FROM schedule GROUP BY train_id"
sql_df = spark.sql(query)
sql_df.show()

# from pyspark.sql.functions import min, max, col
# expr = [min(col("time")).alias('start'), max(col("time")).alias('end')]
# dot_df = df.groupBy("train_id").agg(*expr)
# dot_df.show()

+--------+-----+-----+
|train_id|start|  end|
+--------+-----+-----+
|     217|6:06a|6:59a|
|     324|7:59a|9:05a|
+--------+-----+-----+



In [12]:
from pyspark.sql.window import Window

# Dot-notation counterpart of
#   LEAD(time, 1) OVER (PARTITION BY train_id ORDER BY time) AS time_next
next_stop_window = Window.partitionBy('train_id').orderBy('time')
dot_df = df.withColumn('time_next', lead('time', 1).over(next_stop_window))

# df = spark.sql("""
# SELECT *, 
# LEAD(time,1) OVER(PARTITION BY train_id ORDER BY time) AS time_next 
# FROM schedule
# """)

In [13]:
# Create a SQL query that adds diff_min: the difference between each stop's time
# and the next stop's time for the same train, divided by 60 (the recorded
# output shows whole minutes; null for a train's last stop).
# NOTE(review): the pattern 'H:m' does not mention the trailing 'a' in values
# such as '6:06a'; the parse presumably depends on lenient handling in the Spark
# version used here (2.4.5) - confirm before upgrading or adding p.m. times.
query = """
SELECT *, 
(unix_timestamp(LEAD(time, 1) OVER (PARTITION BY train_id ORDER BY time),'H:m') 
 - unix_timestamp(time, 'H:m'))/60 AS diff_min 
FROM schedule 
"""
sql_df = spark.sql(query)
sql_df.show()

# window = Window.partitionBy('train_id').orderBy('time')
# dot_df = df.withColumn('diff_min', 
#                     (unix_timestamp(lead('time', 1).over(window),'H:m') 
#                      - unix_timestamp('time', 'H:m'))/60)

+--------+-------------+-----+--------+
|train_id|      station| time|diff_min|
+--------+-------------+-----+--------+
|     217|       Gilroy|6:06a|     9.0|
|     217|   San Martin|6:15a|     6.0|
|     217|  Morgan Hill|6:21a|    15.0|
|     217| Blossom Hill|6:36a|     6.0|
|     217|      Capitol|6:42a|     8.0|
|     217|       Tamien|6:50a|     9.0|
|     217|     San Jose|6:59a|    null|
|     324|San Francisco|7:59a|     4.0|
|     324|  22nd Street|8:03a|    13.0|
|     324|     Millbrae|8:16a|     8.0|
|     324|    Hillsdale|8:24a|     7.0|
|     324| Redwood City|8:31a|     6.0|
|     324|    Palo Alto|8:37a|    28.0|
|     324|     San Jose|9:05a|    null|
+--------+-------------+-----+--------+



### Text Analysis using SparkSQL

In [14]:
from pyspark.sql.functions import lower, col, split, explode, length

# Read the text file into a single-column DataFrame (column 'value', one row per line).
# This rebinds `df`, which held the train schedule until now.
df = spark.read.text(data_paths['sherlock.txt'])

# Peek at the first row and report the number of lines.
first_row = df.first()
print(first_row)
line_count = df.count()
print('no.of lines:', line_count)

df.show(5, truncate=False)

Row(value='The Project Gutenberg EBook of The Adventures of Sherlock Holmes')
no.of lines: 128457
+--------------------------------------------------------------------+
|value                                                               |
+--------------------------------------------------------------------+
|The Project Gutenberg EBook of The Adventures of Sherlock Holmes    |
|by Sir Arthur Conan Doyle                                           |
|(#15 in our series by Sir Arthur Conan Doyle)                       |
|                                                                    |
|Copyright laws are changing all over the world. Be sure to check the|
+--------------------------------------------------------------------+
only showing top 5 rows



In [15]:
# Lower-case every line, keeping the column name 'value'.
lowered_value = lower(col('value')).alias('value')
df = df.select(lowered_value)
print(df.first())
print('columns:', df.columns)

Row(value='the project gutenberg ebook of the adventures of sherlock holmes')
columns: ['value']


In [16]:
# Characters (in addition to a space) on which each line is split into words.
# A raw string is used because the backslashes are meant for the regular
# expression, not for Python: sequences such as \? or \[ are invalid string
# escapes in a normal literal. The resulting text is unchanged.
punctuation = r"""_|.\?\!",'\[\]\*()"""
# Split on any single delimiter character; adjacent delimiters therefore yield
# empty strings in the array.
df = df.select(split('value', '[ %s]' % punctuation).alias('words'))
df.show(5, truncate=False)

+-----------------------------------------------------------------------------------+
|words                                                                              |
+-----------------------------------------------------------------------------------+
|[the, project, gutenberg, ebook, of, the, adventures, of, sherlock, holmes]        |
|[by, sir, arthur, conan, doyle]                                                    |
|[, #15, in, our, series, by, sir, arthur, conan, doyle, ]                          |
|[]                                                                                 |
|[copyright, laws, are, changing, all, over, the, world, , be, sure, to, check, the]|
+-----------------------------------------------------------------------------------+
only showing top 5 rows



In [17]:
# Show the number of elements in each row's word array
# (rows displayed as [] report a length of 1 in the output).
df.withColumn("length", size(col("words"))).show(10)

+--------------------+------+
|               words|length|
+--------------------+------+
|[the, project, gu...|    10|
|[by, sir, arthur,...|     5|
|[, #15, in, our, ...|    11|
|                  []|     1|
|[copyright, laws,...|    14|
|[copyright, laws,...|     9|
|[this, or, any, o...|     8|
|                  []|     1|
|[this, header, sh...|    12|
|[gutenberg, file,...|    17|
+--------------------+------+
only showing top 10 rows



In [18]:
from pyspark.sql.types import *
# Keep only rows whose word array has more than one element. This removes the
# rows shown as [] with length 1 in the previous output.
# NOTE(review): a line consisting of one token and no delimiter also has size 1
# and is dropped by this filter - confirm that is intended.
df = df.where(size(col("words")) > 1)
df.show(10)
# user defined function 
# size_ = udf(lambda xs: len(xs), IntegerType())
# df.where(size_(df.words) > 1).show(5)


+--------------------+
|               words|
+--------------------+
|[the, project, gu...|
|[by, sir, arthur,...|
|[, #15, in, our, ...|
|[copyright, laws,...|
|[copyright, laws,...|
|[this, or, any, o...|
|[this, header, sh...|
|[gutenberg, file,...|
|[header, without,...|
|[please, read, th...|
+--------------------+
only showing top 10 rows



In [19]:
# Turn each array element into its own row, in a column named 'word'.
word_column = explode('words').alias('word')
df = df.select(word_column)
print(df.count())
df.show(10)

1331684
+----------+
|      word|
+----------+
|       the|
|   project|
| gutenberg|
|     ebook|
|        of|
|       the|
|adventures|
|        of|
|  sherlock|
|    holmes|
+----------+
only showing top 10 rows



In [20]:
# Discard the zero-length strings left behind by the split.
has_text = length('word') > 0
nonblank_df = df.filter(has_text)
nonblank_df.count()

1106370

In [21]:
# Attach an id to every word; later cells use it for thresholds and for
# ORDER BY in window queries.
# NOTE(review): monotonically_increasing_id() is documented as increasing and
# unique but not necessarily consecutive. The 0, 1, 2, ... sequence in the
# output presumably depends on the data sitting in a single partition - confirm
# before treating ids as word positions.
df2 = nonblank_df.select('word', monotonically_increasing_id().alias('id'))
df2.show(10)

+----------+---+
|      word| id|
+----------+---+
|       the|  0|
|   project|  1|
| gutenberg|  2|
|     ebook|  3|
|        of|  4|
|       the|  5|
|adventures|  6|
|        of|  7|
|  sherlock|  8|
|    holmes|  9|
+----------+---+
only showing top 10 rows



In [22]:
# Keep rows whose id exceeds 70 and display the first 5 of them untruncated
df2.filter('id > 70').show(5, truncate=False)

+------+---+
|word  |id |
+------+---+
|it    |71 |
|do    |72 |
|not   |73 |
|change|74 |
|or    |75 |
+------+---+
only showing top 5 rows



In [23]:
# Exclusive upper bound on `id` for each part, in ascending order. Ids at or
# above the last bound fall into the final part.
part_bounds = [25000, 50000, 75000, 100000, 150000, 200000, 250000]
# One title per part: index 0 is the preface, indices 1-7 are the chapters.
part_titles = ['Preface'] + ['Chapter %d' % n for n in range(1, 8)]


def bucket_by_id(id_col, bounds, values):
    """Build a when/otherwise column that maps `id_col` to one of `values`.

    Args:
        id_col: Column holding the id to classify.
        bounds: Ascending exclusive upper bounds; must not be empty.
        values: One value per bound plus a final value used when `id_col`
            is not below any bound, so len(values) == len(bounds) + 1.

    Returns:
        A Column yielding values[i] for the first i with id_col < bounds[i],
        and values[-1] when no bound matches.

    Raises:
        ValueError: If `bounds` is empty or the lengths do not line up.
    """
    if not bounds or len(values) != len(bounds) + 1:
        raise ValueError('values must hold exactly one more entry than bounds')
    branches = list(zip(bounds, values))
    first_bound, first_value = branches[0]
    expr = when(id_col < first_bound, first_value)
    for bound, value in branches[1:]:
        expr = expr.when(id_col < bound, value)
    return expr.otherwise(values[-1])


# Derive both columns from the same bounds so that titles and part numbers
# cannot drift apart.
df2 = (df2
       .withColumn('title', bucket_by_id(col('id'), part_bounds, part_titles))
       .withColumn('part', bucket_by_id(col('id'), part_bounds,
                                        list(range(len(part_titles))))))
df2.show(10)

+----------+---+-------+----+
|      word| id|  title|part|
+----------+---+-------+----+
|       the|  0|Preface|   0|
|   project|  1|Preface|   0|
| gutenberg|  2|Preface|   0|
|     ebook|  3|Preface|   0|
|        of|  4|Preface|   0|
|       the|  5|Preface|   0|
|adventures|  6|Preface|   0|
|        of|  7|Preface|   0|
|  sherlock|  8|Preface|   0|
|    holmes|  9|Preface|   0|
+----------+---+-------+----+
only showing top 10 rows



In [24]:
# Redistribute the rows into 8 partitions keyed on the 'part' column, then
# print the resulting partition count as a check.
df2 = df2.repartition(8,'part')
print(df2.rdd.getNumPartitions())

8


In [25]:
!rm -rf "sherlock_parts"
df2.write.partitionBy("part").format("csv").save("sherlock_parts")
!ls sherlock_parts

# df_parts = spark.read.csv('sherlock_parts')
# df_parts.count()

_SUCCESS  part=0  part=1  part=2  part=3  part=4  part=5  part=6  part=7


In [26]:
# Expose df2 to SQL under the view name "df".
df2.createOrReplaceTempView("df")

# Word for each row, previous two and subsequent two words.
# LAG/LEAD look backwards/forwards by id within the same part, so rows at the
# start of a part have nulls where no earlier word exists (see w1/w2 in the
# first output rows).
query = """SELECT part,
LAG(word, 2) OVER(PARTITION BY part ORDER BY id) AS w1,
LAG(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
word AS w3,
LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w4,
LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w5
FROM df
"""
# Only the rows of part 7 are displayed.
spark.sql(query).where("part = 7").show(10)

+----+-------+-------+-------+-------+--------+
|part|     w1|     w2|     w3|     w4|      w5|
+----+-------+-------+-------+-------+--------+
|   7|   null|   null|   land|    and|     the|
|   7|   null|   land|    and|    the|     new|
|   7|   land|    and|    the|    new|    york|
|   7|    and|    the|    new|   york| bankers|
|   7|    the|    new|   york|bankers|    were|
|   7|    new|   york|bankers|   were| sipping|
|   7|   york|bankers|   were|sipping|   their|
|   7|bankers|   were|sipping|  their|     tea|
|   7|   were|sipping|  their|    tea|      in|
|   7|sipping|  their|    tea|     in|absolute|
+----+-------+-------+-------+-------+--------+
only showing top 10 rows



In [27]:
# Find the top 10 sequences of five words.
# The inner query builds, for every word, the five-word sequence that starts at
# it (following words taken by id within the same part); the outer query counts
# identical sequences and keeps the 10 most frequent.
query = """
SELECT w1, w2, w3, w4, w5, COUNT(*) AS count FROM (
   SELECT word AS w1,
   LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
   LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w3,
   LEAD(word, 3) OVER(PARTITION BY part ORDER BY id) AS w4,
   LEAD(word, 4) OVER(PARTITION BY part ORDER BY id) AS w5
   FROM df
)
GROUP BY w1, w2, w3, w4, w5
ORDER BY count DESC
LIMIT 10 """
df5 = spark.sql(query)
df5.show()

+-------+---------+---------+--------+----------+-----+
|     w1|       w2|       w3|      w4|        w5|count|
+-------+---------+---------+--------+----------+-----+
|history|       of|      the|  united|    states|   60|
|     in|      the|   region|      of|       the|   40|
|     in|      the|   middle|      of|       the|   38|
|project|gutenberg| literary| archive|foundation|   34|
|     of|      the|   united|  states|        pp|   31|
|    the|  project|gutenberg|literary|   archive|   30|
|     in|      the|     case|      of|       the|   28|
|     at|      the|      end|      of|       the|   26|
|    the|    other|     side|      of|       the|   25|
|     on|      the|     same|   lines|        as|   25|
+-------+---------+---------+--------+----------+-----+



In [28]:
# Unique 5-tuples sorted in descending order.
# Same five-word sequences as in the counting query, but de-duplicated with
# DISTINCT and ordered by w1..w5 descending; only the first 10 are shown.
spark.sql("""
SELECT DISTINCT w1, w2, w3, w4, w5 FROM (
   SELECT word AS w1,
   LEAD(word,1) OVER(PARTITION BY part ORDER BY id ) AS w2,
   LEAD(word,2) OVER(PARTITION BY part ORDER BY id ) AS w3,
   LEAD(word,3) OVER(PARTITION BY part ORDER BY id ) AS w4,
   LEAD(word,4) OVER(PARTITION BY part ORDER BY id ) AS w5
   FROM df
   )
   ORDER BY w1 DESC, w2 DESC, w3 DESC, w4 DESC, w5 DESC 
   LIMIT 10""").show()

+---------+--------+---------+---------+----------+
|       w1|      w2|       w3|       w4|        w5|
+---------+--------+---------+---------+----------+
|        ~|asterisk|      and|underline|characters|
|zygomatic|     and|  frontal|    bones|       the|
|   zygoma|      in|    front|       of|       the|
|    zweck|     ist|      nur|      den|     feind|
|      zum|  henker|    diese|   russen|  muttered|
|  zueblin|american|municipal| progress|         w|
| zubovski| rampart|   pierre|      and|  thirteen|
| zubovski| rampart|      but|     rose|   through|
|   zubova|    with|    false|    curls|       and|
|   zubova|     and|     this|     same|     laugh|
+---------+--------+---------+---------+----------+



In [29]:
# # Most frequent 3-tuple per chapter
# query = """
# SELECT chapter, w1, w2, w3, count FROM
# (
#   SELECT
#   chapter,
#   ROW_NUMBER() OVER (PARTITION BY chapter ORDER BY count DESC) AS row,
#   w1, w2, w3, count
#   FROM ( %s )
# )
# WHERE row = 1
# ORDER BY chapter ASC
# """ % subquery

# spark.sql(query).show()

In [30]:
# # Load the dataframe
# par_df = spark.read.load(data_paths['sherlock.parquet'])