# Mount Google Drive

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Install Pyspark

In [3]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 42 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 37.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=ac0265eb259fdbc75987955a1b6312504074675f1c505ec3edebdc9c84d9012b
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


# Create Spark session

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# Load Text file

In [12]:
filename = '/content/drive/My Drive/sherlock.txt'
df = spark.read.text(filename)
print(df.first())

Row(value='The Project Gutenberg EBook of The Adventures of Sherlock Holmes')


In [13]:
print(df.count())

128457


# Load Parquet file

In [17]:
filename = '/content/drive/My Drive/sherlock.parquet'
df1 = spark.read.load(filename)

Loading first 10 rows

In [19]:
df1.show(10, truncate=False)

+----------+---+
|word      |id |
+----------+---+
|the       |0  |
|project   |1  |
|gutenberg |2  |
|ebook     |3  |
|of        |4  |
|the       |5  |
|adventures|6  |
|of        |7  |
|sherlock  |8  |
|holmes    |9  |
+----------+---+
only showing top 10 rows



Filter and show the first 5 rows

In [21]:
df1.where('id > 70').show(5, truncate=False)

+------+---+
|word  |id |
+------+---+
|it    |71 |
|do    |72 |
|not   |73 |
|change|74 |
|or    |75 |
+------+---+
only showing top 5 rows



Show 15 rows without truncate

In [22]:
df.show(15, truncate=False)

+--------------------------------------------------------------------+
|value                                                               |
+--------------------------------------------------------------------+
|The Project Gutenberg EBook of The Adventures of Sherlock Holmes    |
|by Sir Arthur Conan Doyle                                           |
|(#15 in our series by Sir Arthur Conan Doyle)                       |
|                                                                    |
|Copyright laws are changing all over the world. Be sure to check the|
|copyright laws for your country before downloading or redistributing|
|this or any other Project Gutenberg eBook.                          |
|                                                                    |
|This header should be the first thing seen when viewing this Project|
|Gutenberg file.  Please do not remove it.  Do not change or edit the|
|header without written permission.                                  |
|     

Show columns

In [27]:
df.columns

['value']

Lower case operation

In [25]:
from pyspark.sql.functions import col, lower
df_lower = df.select(lower(col('value')))
print(df_lower.first())

Row(lower(value)='the project gutenberg ebook of the adventures of sherlock holmes')


In [26]:
df_lower.columns

['lower(value)']

# Alias operation

In [28]:
df_lower = df.select(lower(col('value')).alias('v'))
df_lower.columns

['v']

# Replacing text
Mr.Holmes.==> Mr Holmes.

In [37]:
from pyspark.sql.functions import regexp_replace
df_replaced = df.select(regexp_replace('value', 'Mr\.', 'Mr').alias('v'))


don't know.==>do not know.


In [38]:
df_replaced = df_replaced.select(regexp_replace('v', 'don\'t', 'do not').alias('v'))

# Tokenizing text

In [40]:
from pyspark.sql.functions import split
df_replaced1 = df_replaced.select(split('v', '[ ]').alias('words'))
df_replaced1.show(truncate=False)

+--------------------------------------------------------------------------------------+
|words                                                                                 |
+--------------------------------------------------------------------------------------+
|[The, Project, Gutenberg, EBook, of, The, Adventures, of, Sherlock, Holmes]           |
|[by, Sir, Arthur, Conan, Doyle]                                                       |
|[(#15, in, our, series, by, Sir, Arthur, Conan, Doyle)]                               |
|[]                                                                                    |
|[Copyright, laws, are, changing, all, over, the, world., Be, sure, to, check, the]    |
|[copyright, laws, for, your, country, before, downloading, or, redistributing]        |
|[this, or, any, other, Project, Gutenberg, eBook.]                                    |
|[]                                                                                    |
|[This, header, shoul

# Split characters are discarded

In [44]:
punctuation = "_|.\?\!\",\'\[\]\*()"
df_replaced2 = df_replaced.select(split('v','[ %s]' % punctuation).alias('words'))
df_replaced2.show(truncate=False)

+---------------------------------------------------------------------------------------+
|words                                                                                  |
+---------------------------------------------------------------------------------------+
|[The, Project, Gutenberg, EBook, of, The, Adventures, of, Sherlock, Holmes]            |
|[by, Sir, Arthur, Conan, Doyle]                                                        |
|[, #15, in, our, series, by, Sir, Arthur, Conan, Doyle, ]                              |
|[]                                                                                     |
|[Copyright, laws, are, changing, all, over, the, world, , Be, sure, to, check, the]    |
|[copyright, laws, for, your, country, before, downloading, or, redistributing]         |
|[this, or, any, other, Project, Gutenberg, eBook, ]                                    |
|[]                                                                                     |
|[This, he

# Exploding an array

In [45]:
from pyspark.sql.functions import explode
df_replaced3 = df_replaced2.select(explode('words').alias('word'))
df_replaced3.show()

+----------+
|      word|
+----------+
|       The|
|   Project|
| Gutenberg|
|     EBook|
|        of|
|       The|
|Adventures|
|        of|
|  Sherlock|
|    Holmes|
|        by|
|       Sir|
|    Arthur|
|     Conan|
|     Doyle|
|          |
|       #15|
|        in|
|       our|
|    series|
+----------+
only showing top 20 rows



In [47]:
print(df_replaced2.count())

128457


Explode increases rowcount


In [48]:
print(df_replaced3.count())

1356337


# Removing empty rows

In [50]:
from pyspark.sql.functions import length
print(df_replaced3.count())
nonblank_df = df_replaced3.where(length('word') > 0)
print(nonblank_df.count())

1356337
1106521


# Adding a row id column

In [51]:
from pyspark.sql.functions import monotonically_increasing_id
df2 = nonblank_df.select('word', monotonically_increasing_id().alias('id'))
df2.show()

+----------+---+
|      word| id|
+----------+---+
|       The|  0|
|   Project|  1|
| Gutenberg|  2|
|     EBook|  3|
|        of|  4|
|       The|  5|
|Adventures|  6|
|        of|  7|
|  Sherlock|  8|
|    Holmes|  9|
|        by| 10|
|       Sir| 11|
|    Arthur| 12|
|     Conan| 13|
|     Doyle| 14|
|       #15| 15|
|        in| 16|
|       our| 17|
|    series| 18|
|        by| 19|
+----------+---+
only showing top 20 rows



# Partition the data

In [54]:
from pyspark.sql.functions import when
df3 = df2.withColumn('title', when(df2.id < 25000, 'Preface')
                                              .when(df2.id < 50000, 'Chapter 1')
                                              .when(df2.id < 75000, 'Chapter 2')
                                              .otherwise('Chapter 3')
)

df4 = df3.withColumn('part', when(df3.id < 25000, 0)
                                              .when(df3.id < 50000, 1)
                                              .when(df3.id < 75000, 2)
                                              .otherwise(3))
df4.show()

+----------+---+-------+----+
|      word| id|  title|part|
+----------+---+-------+----+
|       The|  0|Preface|   0|
|   Project|  1|Preface|   0|
| Gutenberg|  2|Preface|   0|
|     EBook|  3|Preface|   0|
|        of|  4|Preface|   0|
|       The|  5|Preface|   0|
|Adventures|  6|Preface|   0|
|        of|  7|Preface|   0|
|  Sherlock|  8|Preface|   0|
|    Holmes|  9|Preface|   0|
|        by| 10|Preface|   0|
|       Sir| 11|Preface|   0|
|    Arthur| 12|Preface|   0|
|     Conan| 13|Preface|   0|
|     Doyle| 14|Preface|   0|
|       #15| 15|Preface|   0|
|        in| 16|Preface|   0|
|       our| 17|Preface|   0|
|    series| 18|Preface|   0|
|        by| 19|Preface|   0|
+----------+---+-------+----+
only showing top 20 rows



# Repartitioning on a column

In [56]:
df5 = df4.repartition(4, 'part')
print(df5.rdd.getNumPartitions())

4


# Ejercicios

# Creating context window feature data

In [60]:
df5.createOrReplaceTempView("text")

## Word for each row, previous two and subsequent two words

In [62]:
query = """
SELECT
part,
LAG(word, 2) OVER(PARTITION BY part ORDER BY id) AS w1,
LAG(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
word AS w3,
LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w4,
LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w5
FROM text
"""
spark.sql(query).where("part = 2").show(10)

+----+-------+-------+-------+-------+-------+
|part|     w1|     w2|     w3|     w4|     w5|
+----+-------+-------+-------+-------+-------+
|   2|   null|   null|    You|  would|   have|
|   2|   null|    You|  would|   have|   done|
|   2|    You|  would|   have|   done| better|
|   2|  would|   have|   done| better|     to|
|   2|   have|   done| better|     to|   have|
|   2|   done| better|     to|   have|trusted|
|   2| better|     to|   have|trusted|    you|
|   2|     to|   have|trusted|    you|   wife|
|   2|   have|trusted|    you|   wife|     It|
|   2|trusted|    you|   wife|     It|    was|
+----+-------+-------+-------+-------+-------+
only showing top 10 rows



In [63]:
df5.show()

+--------+-----+---------+----+
|    word|   id|    title|part|
+--------+-----+---------+----+
|     You|50000|Chapter 2|   2|
|   would|50001|Chapter 2|   2|
|    have|50002|Chapter 2|   2|
|    done|50003|Chapter 2|   2|
|  better|50004|Chapter 2|   2|
|      to|50005|Chapter 2|   2|
|    have|50006|Chapter 2|   2|
| trusted|50007|Chapter 2|   2|
|     you|50008|Chapter 2|   2|
|    wife|50009|Chapter 2|   2|
|      It|50010|Chapter 2|   2|
|     was|50011|Chapter 2|   2|
|     not|50012|Chapter 2|   2|
|     the|50013|Chapter 2|   2|
|   wife;|50014|Chapter 2|   2|
|      it|50015|Chapter 2|   2|
|     was|50016|Chapter 2|   2|
|     the|50017|Chapter 2|   2|
|children|50018|Chapter 2|   2|
| groaned|50019|Chapter 2|   2|
+--------+-----+---------+----+
only showing top 20 rows



## Finding common word sequences

### Find the top 10 sequences of five words

In [64]:
query = """
SELECT w1, w2, w3, w4, w5, COUNT(*) AS count FROM (
   SELECT word AS w1,
   LEAD(word,1) OVER(PARTITION BY part ORDER BY id) AS w2,
   LEAD(word,2) OVER(PARTITION BY part ORDER BY id) AS w3,
   LEAD(word,3) OVER(PARTITION BY part ORDER BY id) AS w4,
   LEAD(word,4) OVER(PARTITION BY part ORDER BY id) AS w5
   FROM text
)
GROUP BY w1, w2, w3, w4, w5
ORDER BY count DESC
LIMIT 10
"""
df = spark.sql(query)
df.show()

+-------+---------+--------+-------+----------+-----+
|     w1|       w2|      w3|     w4|        w5|count|
+-------+---------+--------+-------+----------+-----+
|History|       of|     the| United|    States|   57|
|     in|      the|  region|     of|       the|   36|
|Project|Gutenberg|Literary|Archive|Foundation|   35|
|     of|      the|  United| States|        pp|   31|
|     in|      the|  middle|     of|       the|   27|
|    the|    other|    side|     of|       the|   25|
|     on|      the|    same|  lines|        as|   25|
|     in|      the|    case|     of|       the|   25|
|     up|      and|    down|    the|      room|   23|
|    and|       at|     the|   same|      time|   23|
+-------+---------+--------+-------+----------+-----+



### Unique 5-tuples sorted in descending order

In [65]:
query = """
SELECT DISTINCT w1, w2, w3, w4, w5 FROM (
   SELECT word AS w1,
   LEAD(word,1) OVER(PARTITION BY part ORDER BY id ) AS w2,
   LEAD(word,2) OVER(PARTITION BY part ORDER BY id ) AS w3,
   LEAD(word,3) OVER(PARTITION BY part ORDER BY id ) AS w4,
   LEAD(word,4) OVER(PARTITION BY part ORDER BY id ) AS w5
   FROM text
)
ORDER BY w1 DESC, w2 DESC, w3 DESC, w4 DESC, w5 DESC 
LIMIT 10
"""
df = spark.sql(query)
df.show()

+---------+---------+-------+----------+------------+
|       w1|       w2|     w3|        w4|          w5|
+---------+---------+-------+----------+------------+
|        ~| asterisk|    and| underline|  characters|
|zygomatic|      and|frontal|     bones|         the|
|   zygoma|       in|  front|        of|         the|
|       zu|     sein|   Vera|        at|         the|
|       zu|schwachen|     so|      kann|         man|
|  zoology|      was|    not|    merely|acknowledged|
|  zoology|      for|     in|         a|        frog|
|  zoology|      and|     so|        on|        just|
|zone--not|      the|    red|margin--an|  artificial|
|     zone|    which|   lies|     about|        half|
+---------+---------+-------+----------+------------+



###   Most frequent 3-tuple per chapter  

In [69]:
subquery = """
SELECT title, w1, w2, w3, COUNT(*) as count
FROM
(
    SELECT
    title,
    word AS w1,
    LEAD(word, 1) OVER(PARTITION BY title ORDER BY id ) AS w2,
    LEAD(word, 2) OVER(PARTITION BY title ORDER BY id ) AS w3
    FROM text
)
GROUP BY title, w1, w2, w3
ORDER BY title, count DESC
"""
spark.sql(subquery).show(5)

+---------+-------+---+-----+-----+
|    title|     w1| w2|   w3|count|
+---------+-------+---+-----+-----+
|Chapter 1|   that| he|  was|   16|
|Chapter 1|Neville| St|Clair|   14|
|Chapter 1|      I| do|  not|   13|
|Chapter 1|   that| he|  had|   11|
|Chapter 1|    one| of|  the|   10|
+---------+-------+---+-----+-----+
only showing top 5 rows



In [70]:
query = """
SELECT title, w1, w2, w3, count FROM
(
  SELECT
  title,
  ROW_NUMBER() OVER (PARTITION BY title ORDER BY count DESC) AS row,
  w1, w2, w3, count
  FROM ( %s )
)
WHERE row = 1
ORDER BY title ASC
""" % subquery

spark.sql(query).show()

+---------+----+------+------+-----+
|    title|  w1|    w2|    w3|count|
+---------+----+------+------+-----+
|Chapter 1|that|    he|   was|   16|
|Chapter 2|that|    it|   was|   14|
|Chapter 3| the|United|States|  396|
|  Preface| one|    of|   the|   14|
+---------+----+------+------+-----+

