In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 64kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 49.5MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=f155ed292c5391de5329ac4a30b5f27fd4003fdec9f25007848efacba899a06f
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
sc = SparkContext()
spark = SparkSession.builder.master('local[*]').appName('first_spark_application').getOrCreate()

# Thanks to Mike Metzger, a good instructor in DC
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [6]:
df = spark.read.csv("trainsched.txt", header=True, inferSchema=True, sep='\t')
df.createOrReplaceTempView("schedule")
spark.sql("SELECT * FROM schedule WHERE station= 'San Jose'").show()

+--------+--------+-----+--------+
|train_id| station| time|diff_min|
+--------+--------+-----+--------+
|     324|San Jose|9:05a|    null|
|     217|San Jose|6:59a|    null|
+--------+--------+-----+--------+



In [7]:
# Inspect Schema
result1 = spark.sql("SHOW COLUMNS FROM schedule")
result2 = spark.sql("SELECT * FROM schedule LIMIT 0")
result3 = spark.sql("DESCRIBE schedule")
for i in [result1, result2, result3]:
    i.show()

+--------+
|col_name|
+--------+
|train_id|
| station|
|    time|
|diff_min|
+--------+

+--------+-------+----+--------+
|train_id|station|time|diff_min|
+--------+-------+----+--------+
+--------+-------+----+--------+

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|train_id|      int|   null|
| station|   string|   null|
|    time|   string|   null|
|diff_min|   string|   null|
+--------+---------+-------+



In [8]:
#OVER(ORDER BY)
query = """SELECT *, 
                  LEAD(time, 1) OVER(ORDER BY time) AS time_next
           FROM schedule 
           WHERE train_id = 324"""
spark.sql(query).show()

query = """SELECT *,
                  LEAD(time, 1) OVER(PARTITION BY train_id ORDER BY time) AS time_next
                  FROM schedule
                  ORDER BY train_id, time"""
spark.sql(query).show() 
                 

+--------+-------------+-----+--------+---------+
|train_id|      station| time|diff_min|time_next|
+--------+-------------+-----+--------+---------+
|     324|San Francisco|7:59a|       4|    8:03a|
|     324|  22nd Street|8:03a|      13|    8:16a|
|     324|     Millbrae|8:16a|       8|    8:24a|
|     324|    Hillsdale|8:24a|       7|    8:31a|
|     324| Redwood City|8:31a|       6|    8:37a|
|     324|    Palo Alto|8:37a|      28|    9:05a|
|     324|     San Jose|9:05a|    null|     null|
+--------+-------------+-----+--------+---------+

+--------+-------------+-----+--------+---------+
|train_id|      station| time|diff_min|time_next|
+--------+-------------+-----+--------+---------+
|     217|       Gilroy|6:06a|       9|    6:15a|
|     217|   San Martin|6:15a|       6|    6:21a|
|     217|  Morgan Hill|6:21a|      15|    6:36a|
|     217| Blossom Hill|6:36a|       6|    6:42a|
|     217|      Capitol|6:42a|       8|    6:50a|
|     217|       Tamien|6:50a|       9|    6:59a|

In [19]:
df = spark.read.csv("trainsched.txt", header=True, inferSchema=True, sep='\t')
df.createOrReplaceTempView("schedule")
query = """SELECT *,
                  LEAD(time, 1) OVER(PARTITION BY train_id ORDER BY time) AS time_next
                  FROM schedule
                  ORDER BY train_id, time"""

df = spark.sql(query)
df = df[["train_id", "station", "time", "time_next", "diff_min"]]
df.show()

# Running sums using window function SQL
query = """
SELECT *, 
SUM(diff_min) OVER (PARTITION BY train_id ORDER BY time) AS running_total
FROM schedule
ORDER BY train_id, time
"""
spark.sql(query).show()

# PARTITION BY is important
query = """
SELECT 
ROW_NUMBER() OVER (PARTITION BY train_id ORDER BY time) AS row,
train_id, 
station, 
time, 
LEAD(time,1) OVER (PARTITION BY train_id ORDER BY time) AS time_next 
FROM schedule
ORDER BY train_id, time
"""
spark.sql(query).show()

+--------+-------------+-----+---------+--------+
|train_id|      station| time|time_next|diff_min|
+--------+-------------+-----+---------+--------+
|     217|       Gilroy|6:06a|    6:15a|       9|
|     217|   San Martin|6:15a|    6:21a|       6|
|     217|  Morgan Hill|6:21a|    6:36a|      15|
|     217| Blossom Hill|6:36a|    6:42a|       6|
|     217|      Capitol|6:42a|    6:50a|       8|
|     217|       Tamien|6:50a|    6:59a|       9|
|     217|     San Jose|6:59a|     null|    null|
|     324|San Francisco|7:59a|    8:03a|       4|
|     324|  22nd Street|8:03a|    8:16a|      13|
|     324|     Millbrae|8:16a|    8:24a|       8|
|     324|    Hillsdale|8:24a|    8:31a|       7|
|     324| Redwood City|8:31a|    8:37a|       6|
|     324|    Palo Alto|8:37a|    9:05a|      28|
|     324|     San Jose|9:05a|     null|    null|
+--------+-------------+-----+---------+--------+

+--------+-------------+-----+--------+-------------+
|train_id|      station| time|diff_min|runnin

In [39]:
# Select column using col
from pyspark.sql.functions import col
df = spark.read.csv("trainsched.txt", header=True, inferSchema=True, sep='\t')
df.createOrReplaceTempView("schedule")
df.select(col("train_id"), col("station")).show(3)
df[[col("train_id"), col("time")]].show(3)

# Rename column using col
df[[col("train_id").alias("train"), "station"]].show(3)

# Using SQL
spark.sql("SELECT train_id AS train, station FROM schedule ORDER BY train_id LIMIT 3").show()

+--------+-------------+
|train_id|      station|
+--------+-------------+
|     324|San Francisco|
|     324|  22nd Street|
|     324|     Millbrae|
+--------+-------------+
only showing top 3 rows

+--------+-----+
|train_id| time|
+--------+-----+
|     324|7:59a|
|     324|8:03a|
|     324|8:16a|
+--------+-----+
only showing top 3 rows

+-----+-------------+
|train|      station|
+-----+-------------+
|  324|San Francisco|
|  324|  22nd Street|
|  324|     Millbrae|
+-----+-------------+
only showing top 3 rows

+-----+-----------+
|train|    station|
+-----+-----------+
|  217|     Gilroy|
|  217|Morgan Hill|
|  217| San Martin|
+-----+-----------+



In [35]:
# .agg in pyspark is only able to do a single aggregation on each column at a time.

from pyspark.sql.functions import col
df = spark.read.csv("trainsched.txt", header=True, inferSchema=True, sep='\t')
df.createOrReplaceTempView("schedule")
spark.sql('SELECT train_id, MIN(time), MAX(time) FROM schedule GROUP BY train_id').show()
df.groupBy('train_id').agg({'time':'min', 'time':'max'}).show()


# Aggregating the same column twice
from pyspark.sql.functions import min, max, col
expr = [min(col("time")).alias('start'), max(col("time")).alias('end')]
dot_df = df.groupBy("train_id").agg(*expr)
dot_df.show()



+--------+---------+---------+
|train_id|min(time)|max(time)|
+--------+---------+---------+
|     324|    7:59a|    9:05a|
|     217|    6:06a|    6:59a|
+--------+---------+---------+

+--------+---------+
|train_id|max(time)|
+--------+---------+
|     324|    9:05a|
|     217|    6:59a|
+--------+---------+

+--------+-----+-----+
|train_id|start|  end|
+--------+-----+-----+
|     324|7:59a|9:05a|
|     217|6:06a|6:59a|
+--------+-----+-----+



In [2]:
# Window functions using dot notation
df = spark.read.csv("trainsched.txt", header=True, inferSchema=True, sep='\t')
df.createOrReplaceTempView("schedule")

from pyspark.sql import Window
from pyspark.sql.functions import row_number, lead
df.withColumn("id", row_number().over(Window.partitionBy("train_id").orderBy("time"))).show()
df.withColumn("next", lead("time", 1).over(Window.partitionBy("train_id").orderBy("time"))).show()

# Write a SQL query giving a result identical to dot_df
query = "SELECT train_id, MIN(time) AS start, MAX(time) AS end FROM schedule GROUP BY train_id"
sql_df = spark.sql(query)
sql_df.show()

# window function query uses dot notation
from pyspark.sql.functions import unix_timestamp
window = Window.partitionBy('train_id').orderBy('time')
dot_df = df.withColumn('diff_min_cal', (unix_timestamp(lead('time', 1).over(window),'H:mm') - unix_timestamp('time', 'H:mm'))/60)
dot_df.show()

# Using SQL
query = """
SELECT *, 
(UNIX_TIMESTAMP(LEAD(time, 1) OVER(PARTITION BY train_id ORDER BY time),'H:m')-UNIX_TIMESTAMP(time, 'H:m'))/60 AS diff_min_cal 
FROM schedule 
"""
spark.sql(query).show()

+--------+-------------+-----+--------+---+
|train_id|      station| time|diff_min| id|
+--------+-------------+-----+--------+---+
|     324|San Francisco|7:59a|       4|  1|
|     324|  22nd Street|8:03a|      13|  2|
|     324|     Millbrae|8:16a|       8|  3|
|     324|    Hillsdale|8:24a|       7|  4|
|     324| Redwood City|8:31a|       6|  5|
|     324|    Palo Alto|8:37a|      28|  6|
|     324|     San Jose|9:05a|    null|  7|
|     217|       Gilroy|6:06a|       9|  1|
|     217|   San Martin|6:15a|       6|  2|
|     217|  Morgan Hill|6:21a|      15|  3|
|     217| Blossom Hill|6:36a|       6|  4|
|     217|      Capitol|6:42a|       8|  5|
|     217|       Tamien|6:50a|       9|  6|
|     217|     San Jose|6:59a|    null|  7|
+--------+-------------+-----+--------+---+

+--------+-------------+-----+--------+-----+
|train_id|      station| time|diff_min| next|
+--------+-------------+-----+--------+-----+
|     324|San Francisco|7:59a|       4|8:03a|
|     324|  22nd Street

In [4]:
### The issue of time in this data set is the letter 'a'
### Here is how to fix it without settng spark config.  
### Note specifically the format string change of H:ma. The a represents either am or pm.
### If you set config in Spark 3 then it doesn't matter whether it is H:ma or H:m.

from pyspark.sql.functions import concat, lit
df = spark.read.csv("trainsched.txt", header=True, inferSchema=True, sep='\t')
df.withColumn('fixed_time', concat(df["time"], lit('m'))).createOrReplaceTempView("schedule")

query = """
SELECT *, (UNIX_TIMESTAMP(LEAD(fixed_time, 1) OVER (PARTITION BY train_id ORDER BY fixed_time), 'H:ma') - UNIX_TIMESTAMP(fixed_time, 'H:ma'))/60 AS diff_min
FROM schedule
"""
spark.sql(query).show()


+--------+-------------+-----+--------+----------+--------+
|train_id|      station| time|diff_min|fixed_time|diff_min|
+--------+-------------+-----+--------+----------+--------+
|     324|San Francisco|7:59a|       4|    7:59am|     4.0|
|     324|  22nd Street|8:03a|      13|    8:03am|    13.0|
|     324|     Millbrae|8:16a|       8|    8:16am|     8.0|
|     324|    Hillsdale|8:24a|       7|    8:24am|     7.0|
|     324| Redwood City|8:31a|       6|    8:31am|     6.0|
|     324|    Palo Alto|8:37a|      28|    8:37am|    28.0|
|     324|     San Jose|9:05a|    null|    9:05am|    null|
|     217|       Gilroy|6:06a|       9|    6:06am|     9.0|
|     217|   San Martin|6:15a|       6|    6:15am|     6.0|
|     217|  Morgan Hill|6:21a|      15|    6:21am|    15.0|
|     217| Blossom Hill|6:36a|       6|    6:36am|     6.0|
|     217|      Capitol|6:42a|       8|    6:42am|     8.0|
|     217|       Tamien|6:50a|       9|    6:50am|     9.0|
|     217|     San Jose|6:59a|    null| 

In [43]:
### Loading text data
from pyspark.sql.functions import col, lower, regexp_replace, split, explode, length, monotonically_increasing_id, when
df2 = spark.read.text("sherlock.txt")
df3 = spark.read.load("sherlock.parquet")
df2.createOrReplaceTempView("scherlock")

# Get the first row
print(df2.first())

# Count the number of rows
print(df2.count())

# Show the first 15 rows
df2.show(15, truncate=False)

# Lower case operation
display(df2.select(lower(col("value"))).first())

print("\nText file is stored as dataframe in one column of the following name:")
display(df2.columns)

# Using regexp_replace: 1st arg is column name, 2nd arg is the pattern to be replaced, 3rd is what to replace.
df2[[regexp_replace("value", "eBook\.", "eBook").alias("Renamed_column")]].show(15, truncate=False)

# Tokenizing text
df2[[split("value", '[ ]').alias('words')]].show(truncate=False)
# df2[[split("value", '').alias('words')]].show(truncate=False) ก็ได้
punctuation = "_|.\?\!\",\'\[\]\*()#"
df3 = df2[[split("value", '[ %s]'%punctuation).alias('words without punctuation')]]
df3.show(truncate=False)

# Explode 
df4 = df3[[explode("words without punctuation").alias("word")]]
df4.show(truncate=False)

# Removing empty rows
nonblank_df = df4.where(length("word")>0)
nonblank_df.show()

# Adding a row id column
nonblank_df = nonblank_df[["word", monotonically_increasing_id().alias("id")]]
nonblank_df.show()

# Partitioning the data
nonblank_df = nonblank_df.withColumn("title", when(nonblank_df["id"]<25000, "Preface").when(nonblank_df["id"]<50000, "Chapter 1").when(nonblank_df["id"]<75000, "Chapter 2").otherwise("Chapter 3"))
nonblank_df.show()

# Repartion on a column
print("\n", f"Number of partitions before repartitioning = {nonblank_df.rdd.getNumPartitions()}.")
nonblank_df = nonblank_df.repartition(4, "title")
print("\n", f"Number of partitions after repartitioning = {nonblank_df.rdd.getNumPartitions()}.")

Row(value='The Project Gutenberg EBook of The Adventures of Sherlock Holmes')
128457
+--------------------------------------------------------------------+
|value                                                               |
+--------------------------------------------------------------------+
|The Project Gutenberg EBook of The Adventures of Sherlock Holmes    |
|by Sir Arthur Conan Doyle                                           |
|(#15 in our series by Sir Arthur Conan Doyle)                       |
|                                                                    |
|Copyright laws are changing all over the world. Be sure to check the|
|copyright laws for your country before downloading or redistributing|
|this or any other Project Gutenberg eBook.                          |
|                                                                    |
|This header should be the first thing seen when viewing this Project|
|Gutenberg file.  Please do not remove it.  Do not change or ed

Row(lower(value)='the project gutenberg ebook of the adventures of sherlock holmes')


Text file is stored as dataframe in one column of the following name:


['value']

+--------------------------------------------------------------------+
|Renamed_column                                                      |
+--------------------------------------------------------------------+
|The Project Gutenberg EBook of The Adventures of Sherlock Holmes    |
|by Sir Arthur Conan Doyle                                           |
|(#15 in our series by Sir Arthur Conan Doyle)                       |
|                                                                    |
|Copyright laws are changing all over the world. Be sure to check the|
|copyright laws for your country before downloading or redistributing|
|this or any other Project Gutenberg eBook                           |
|                                                                    |
|This header should be the first thing seen when viewing this Project|
|Gutenberg file.  Please do not remove it.  Do not change or edit the|
|header without written permission.                                  |
|     

In [62]:
# A moving window query
### Loading text data
from pyspark.sql.functions import col, lower, regexp_replace, split, explode, length, monotonically_increasing_id, when
df2 = spark.read.text("sherlock.txt")
df2.createOrReplaceTempView("scherlock")
punctuation = "_|.\?\!\",\'\[\]\*()#"
df3 = df2[[split("value", '[ %s]'%punctuation).alias('words')]]
df4 = df3[[explode("words").alias("word")]]
df = df4.where(length("word")>0)
df = df[["word", monotonically_increasing_id().alias("id")]]
df = df.withColumn("part", when(df["id"]<92210, 1).when(df["id"]<2*92210, 2).when(df["id"]<3*92210, 3).when(df["id"]<4*92210, 4).when(df["id"]<5*92210, 5).when(df["id"]<6*92210, 6).when(df["id"]<7*92210, 7).when(df["id"]<8*92210, 8).when(df["id"]<9*92210, 9).when(df["id"]<10*92210, 10).when(df["id"]<11*92210, 11).otherwise(12))
print("\n", f"Number of partitions before repartitioning = {df.rdd.getNumPartitions()}.")
df = df.repartition(12, "part")
print("\n", f"Number of partitions after repartitioning = {df.rdd.getNumPartitions()}.")
df.show(3)
df.createOrReplaceTempView("df")
query = """SELECT id, 
                  word AS w1,
                  LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
                  LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w3
           FROM df"""
spark.sql(query).sort("id").show(3)

print("Using lag query")
lag_query = """SELECT id, 
                      LAG(word, 2) OVER(PARTITION BY part ORDER BY id) AS w1,
                      LAG(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
                      word AS w3
                      FROM df
                      ORDER BY id"""
spark.sql(lag_query).show(3)

# Suppose we want to look at part 2
lag_query = """SELECT id, 
                      LAG(word, 2) OVER(PARTITION BY part ORDER BY id) AS w1,
                      LAG(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
                      word AS w3
                      FROM df
                      WHERE part = 2
                      ORDER BY id"""
spark.sql(lag_query).show(3)


 Number of partitions before repartitioning = 2.

 Number of partitions after repartitioning = 12.
+---------+------+----+
|     word|    id|part|
+---------+------+----+
|flattened|461050|   6|
|   nodule|461051|   6|
|       in|461052|   6|
+---------+------+----+
only showing top 3 rows

+---+---------+---------+---------+
| id|       w1|       w2|       w3|
+---+---------+---------+---------+
|  0|      The|  Project|Gutenberg|
|  1|  Project|Gutenberg|    EBook|
|  2|Gutenberg|    EBook|       of|
+---+---------+---------+---------+
only showing top 3 rows

Using lag query
+---+----+-------+---------+
| id|  w1|     w2|       w3|
+---+----+-------+---------+
|  0|null|   null|      The|
|  1|null|    The|  Project|
|  2| The|Project|Gutenberg|
+---+----+-------+---------+
only showing top 3 rows

+-----+------+-------+-------+
|   id|    w1|     w2|     w3|
+-----+------+-------+-------+
|92210|  null|   null| helped|
|92211|  null| helped|himself|
|92212|helped|himself|     to|


In [74]:
from pyspark.sql.functions import col, lower, regexp_replace, split, explode, length, monotonically_increasing_id, when
df2 = spark.read.text("sherlock.txt")
df2.createOrReplaceTempView("scherlock")
punctuation = "_|.\?\!\",\'\[\]\*()#"
df3 = df2[[split("value", '[ %s]'%punctuation).alias('words')]]
df4 = df3[[explode("words").alias("word")]]
df = df4.where(length("word")>0)
df = df[["word", monotonically_increasing_id().alias("id")]]
df = df.withColumn("part", when(df["id"]<92210, 1).when(df["id"]<2*92210, 2).when(df["id"]<3*92210, 3).when(df["id"]<4*92210, 4).when(df["id"]<5*92210, 5).when(df["id"]<6*92210, 6).when(df["id"]<7*92210, 7).when(df["id"]<8*92210, 8).when(df["id"]<9*92210, 9).when(df["id"]<10*92210, 10).when(df["id"]<11*92210, 11).otherwise(12))
print("\n", f"Number of partitions before repartitioning = {df.rdd.getNumPartitions()}.")
df = df.repartition(12, "part")
print("\n", f"Number of partitions after repartitioning = {df.rdd.getNumPartitions()}.")
df.show(3)
df.createOrReplaceTempView("df")

query = """SELECT w1, w2, w3, COUNT(*) as count
           FROM (SELECT word AS w1,
                        LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
                        LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w3
                 FROM df)
           GROUP BY w1, w2, w3
           ORDER BY count DESC"""

spark.sql(query).show()

query = """SELECT w1, w2, w3, length(w1)+length(w2)+length(w3) as length
           FROM (SELECT word AS w1,
                        LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
                        LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w3
                 FROM df
                 WHERE part <> 1 AND part <> 2 AND part <> 12) 
           GROUP BY w1, w2, w3
           ORDER BY length DESC"""

spark.sql(query).show()


 Number of partitions before repartitioning = 2.

 Number of partitions after repartitioning = 12.
+---------+------+----+
|     word|    id|part|
+---------+------+----+
|flattened|461050|   6|
|   nodule|461051|   6|
|       in|461052|   6|
+---------+------+----+
only showing top 3 rows

+---------+---------+------+-----+
|       w1|       w2|    w3|count|
+---------+---------+------+-----+
|      the|   United|States|  397|
|      one|       of|   the|  335|
|      out|       of|   the|  245|
|       of|      the|United|  235|
|        I|      don|     t|  217|
|     that|       he|   was|  194|
|     that|       it|   was|  181|
|      and|       in|   the|  180|
|      met|     with|    in|  174|
|     part|       of|   the|  160|
|       he|      did|   not|  159|
|       up|       to|   the|  159|
|      the|formation|    of|  152|
|       in|    front|    of|  145|
|commander|       in| chief|  144|
|       as|     well|    as|  144|
|      the|      end|    of|  144|
|      

In [85]:
query = """SELECT w1, w2, w3, w4, w5, COUNT(*) AS count 
           FROM (SELECT word AS w1,
                        LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
                        LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w3,
                        LEAD(word, 3) OVER(PARTITION BY part ORDER BY id) AS w4,
                        LEAD(word, 4) OVER(PARTITION BY part ORDER BY id) AS w5
                 FROM df)
GROUP BY w1, w2, w3, w4, w5
ORDER BY count DESC
LIMIT 10
"""
spark.sql(query).show()

# Unique 5-tuples sorted in descending order
query = """
SELECT DISTINCT w1, w2, w3, w4, w5 FROM (
   SELECT word AS w1,
   LEAD(word, 1) OVER(PARTITION BY part ORDER BY id ) AS w2,
   LEAD(word, 2) OVER(PARTITION BY part ORDER BY id ) AS w3,
   LEAD(word, 3) OVER(PARTITION BY part ORDER BY id ) AS w4,
   LEAD(word, 4) OVER(PARTITION BY part ORDER BY id ) AS w5
   FROM df
)
ORDER BY w1 DESC, w2 DESC, w3 DESC, w4 DESC, w5 DESC 
LIMIT 10
"""
spark.sql(query).show()



+-------+---------+--------+-------+----------+-----+
|     w1|       w2|      w3|     w4|        w5|count|
+-------+---------+--------+-------+----------+-----+
|History|       of|     the| United|    States|   57|
|     in|      the|  region|     of|       the|   36|
|Project|Gutenberg|Literary|Archive|Foundation|   35|
|     of|      the|  United| States|        pp|   31|
|     in|      the|  middle|     of|       the|   27|
|    the|    other|    side|     of|       the|   25|
|     on|      the|    same|  lines|        as|   25|
|     in|      the|    case|     of|       the|   25|
|     up|      and|    down|    the|      room|   23|
|    and|       at|     the|   same|      time|   23|
+-------+---------+--------+-------+----------+-----+

+---------+---------+-------+----------+------------+
|       w1|       w2|     w3|        w4|          w5|
+---------+---------+-------+----------+------------+
|        ~| asterisk|    and| underline|  characters|
|zygomatic|      and|fronta

In [95]:
#   Most frequent 3-tuple per chapter
subquery = """SELECT part, w1, w2, w3, COUNT(*) as count 
              FROM(SELECT part, 
                          word AS w1,
                          LEAD(word, 1) OVER(PARTITION BY part ORDER BY id ) AS w2,
                          LEAD(word, 2) OVER(PARTITION BY part ORDER BY id ) AS w3
                          FROM df)
              GROUP BY part, w1, w2, w3
              ORDER BY part, count DESC"""
spark.sql(subquery).show()

query = """SELECT part, w1, w2, w3, count 
           FROM (SELECT part,
                        ROW_NUMBER() OVER (PARTITION BY part ORDER BY count DESC) AS row,
                        w1, w2, w3, count
                 FROM ( %s ))
           WHERE row = 1
           ORDER BY part ASC""" %subquery
spark.sql(query).show()

+----+-----+-----+-----+-----+
|part|   w1|   w2|   w3|count|
+----+-----+-----+-----+-----+
|   1|    I|think| that|   42|
|   1|  one|   of|  the|   38|
|   1|  out|   of|  the|   33|
|   1| that|   it|  was|   31|
|   1| that|    I| have|   29|
|   1| Lord|   St|Simon|   28|
|   1|   It|  was|    a|   28|
|   1| that|   it|   is|   27|
|   1|    I|   do|  not|   27|
|   1| that|   he|  was|   26|
|   1|   It|   is|    a|   24|
|   1|think| that|    I|   23|
|   1| that|   he|  had|   23|
|   1|    I| have| been|   21|
|   1| that|    I|   am|   21|
|   1|    I| have|   no|   21|
|   1| that|    I|  had|   21|
|   1|    I|could|  not|   21|
|   1|    a|  man|  who|   20|
|   1| that|    I|  was|   20|
+----+-----+-----+-----+-----+
only showing top 20 rows

+----+---------+------+------+-----+
|part|       w1|    w2|    w3|count|
+----+---------+------+------+-----+
|   1|        I| think|  that|   42|
|   2|      the|United|States|   88|
|   3|      the|United|States|  169|
|   4|  

In [117]:
### Caching
df = spark.read.csv("trainsched.txt", header=True, inferSchema=True, sep='\t')
# from pyspark.sql.functions import col, lower, regexp_replace, split, explode, length, monotonically_increasing_id, when
# df2 = spark.read.text("sherlock.txt")
# df2.createOrReplaceTempView("scherlock")
# punctuation = "_|.\?\!\",\'\[\]\*()#"
# df3 = df2[[split("value", '[ %s]'%punctuation).alias('words')]]
# df4 = df3[[explode("words").alias("word")]]
# df = df4.where(length("word")>0)
# df = df[["word", monotonically_increasing_id().alias("id")]]
# df = df.withColumn("part", when(df["id"]<92210, 1).when(df["id"]<2*92210, 2).when(df["id"]<3*92210, 3).when(df["id"]<4*92210, 4).when(df["id"]<5*92210, 5).when(df["id"]<6*92210, 6).when(df["id"]<7*92210, 7).when(df["id"]<8*92210, 8).when(df["id"]<9*92210, 9).when(df["id"]<10*92210, 10).when(df["id"]<11*92210, 11).otherwise(12))
# print(f"\nNumber of partitions before repartitioning = {df.rdd.getNumPartitions()}.")
# df = df.repartition(12, "part")
# print(f"\nNumber of partitions after repartitioning = {df.rdd.getNumPartitions()}.")
print(f"Is df cached? {df.is_cached}.")
display(df.storageLevel)
df.cache()
print(f"Is df cached? {df.is_cached}.")
display(df.storageLevel)
df.unpersist()
print(f"Is df cached? {df.is_cached}.")
display(df.storageLevel)

# When memory is scarce, use this
import pyspark
print("\nWhen memory is scarce, use this")
df.persist(storageLevel=pyspark.StorageLevel.MEMORY_AND_DISK)
display(df.storageLevel)
print(f"Is df cached? {df.is_cached}.")

# Caching table in spark session
df.createOrReplaceTempView("df")
display(spark.catalog.isCached(tableName='df'))
spark.catalog.cacheTable('df')
display(spark.catalog.isCached(tableName='df'))
spark.catalog.uncacheTable('df')
display(spark.catalog.isCached(tableName='df'))

# Remove all cached tables
spark.catalog.clearCache()
print("Tables:\n", spark.catalog.listTables())

Is df cached? False.


StorageLevel(False, False, False, False, 1)

Is df cached? True.


StorageLevel(True, True, False, True, 1)

Is df cached? False.


StorageLevel(False, False, False, False, 1)


When memory is scarce, use this


StorageLevel(True, True, False, False, 1)

Is df cached? True.


False

True

False

Tables:
 [Table(name='df', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='schedule', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='scherlock', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [6]:
df = spark.read.csv("trainsched.txt", header=True, inferSchema=True, sep='\t')
df.createOrReplaceTempView("schedule")
display(spark.catalog.listTables())
spark.catalog.dropTempView("schedule")
display(spark.catalog.listTables())


[Table(name='schedule', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

[]

In [10]:
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Hello %s", "world")
logging.debug("Hello, take %d", 2)



2021-01-31 14:14:39,891 - INFO - Hello world
2021-01-31 14:14:39,894 - INFO - Hello world


In [14]:
# ทำไม บรรทัดที่สองไม่ออก ?

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Hello %s", "world")
logging.debug("Hello, take %d", 2)

2021-01-31 14:15:57,768 - INFO - Hello world


In [44]:
df = spark.read.csv("trainsched.txt", header=True, inferSchema=True, sep='\t')
df.createOrReplaceTempView("schedule")
display(spark.sql("EXPLAIN SELECT * FROM schedule").first())
df.explain()



Row(plan='== Physical Plan ==\nFileScan csv [train_id#664,station#665,time#666,diff_min#667] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/trainsched.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<train_id:int,station:string,time:string,diff_min:string>\n\n')

== Physical Plan ==
FileScan csv [train_id#664,station#665,time#666,diff_min#667] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/content/trainsched.txt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<train_id:int,station:string,time:string,diff_min:string>




In [87]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType, StringType, IntegerType, FloatType, ArrayType
from pyspark.sql.functions import col, lower, regexp_replace, split, explode, length, monotonically_increasing_id, when, size
df = spark.read.text("sherlock.txt")
df.createOrReplaceTempView("scherlock")
short_udf = udf(lambda x: True if not x or len(x)<10 else False, BooleanType())
df[[short_udf("value").alias("short_udf")]].show(5)
df.show(5, truncate=False)

# Try to replicate the data as close as that in the exercise.  Not 100% match.
punctuation = "_|.\?\!\",\'\[\]\*()#"
df = df[[split("value", '[ %s]'%punctuation).alias('words')]]
df.show(5, truncate=False)

# Creating an array udf
def remove(x):
    for i in x:
        if i == "":
            x.remove(i)
    return x
remove_udf = udf(remove, ArrayType(StringType()))
# in_udf = udf(lambda x: x[0:len(x)-1] if x and len(x)>1 and x[-1] != "" else [], ArrayType(StringType()))
# df = df.withColumn("words", in_udf("words"))
df = df.withColumn("words", remove_udf("words"))
df = df.where(size(df["words"]) > 0)
df[["words", size(df["words"])]].show(truncate=False)



+---------+
|short_udf|
+---------+
|    false|
|    false|
|    false|
|     true|
|    false|
+---------+
only showing top 5 rows

+--------------------------------------------------------------------+
|value                                                               |
+--------------------------------------------------------------------+
|The Project Gutenberg EBook of The Adventures of Sherlock Holmes    |
|by Sir Arthur Conan Doyle                                           |
|(#15 in our series by Sir Arthur Conan Doyle)                       |
|                                                                    |
|Copyright laws are changing all over the world. Be sure to check the|
+--------------------------------------------------------------------+
only showing top 5 rows

+-----------------------------------------------------------------------------------+
|words                                                                              |
+------------------------------

In [81]:
list(set(1, 2, 3, 3, 4, 5, 3, 1, 2) - )


Help on function length in module pyspark.sql.functions:

length(col)
    Computes the character length of string data or number of bytes of binary data.
    The length of character data includes the trailing spaces. The length of binary data
    includes binary zeros.
    
    >>> spark.createDataFrame([('ABC ',)], ['a']).select(length('a').alias('length')).collect()
    [Row(length=4)]
    
    .. versionadded:: 1.5

