# Introduction to Spark SQL with Python

In [1]:
import findspark
findspark.init()

## PySpark SQL

In this chapter you will learn how to create and query a SQL table in Spark. Spark SQL brings the expressiveness of SQL to Spark. You will also learn how to use SQL window functions in Spark. Window functions perform a calculation across rows that are related to the current row. They greatly simplify achieving results that are difficult to express using only joins and traditional aggregations. We'll use window functions to perform running sums, running differences, and other operations that are challenging to perform in basic SQL.

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

### Create a SQL table from a dataframe

A dataframe can be used to create a **temporary table**. A *temporary table* is one that will not exist after the session ends.

In [3]:
# Load trainsched.txt
df = spark.read.csv("../data/trainsched.txt", header=True)

# Create a temporary table called schedule
df.createOrReplaceTempView("schedule")

### Determine the column names of a table

After creating a temporary table from a DataFrame, you can query the data using SQL statements:
> spark.sql("SELECT * FROM schedule WHERE station = 'San Jose'").show()

Any of the following queries returns information about the columns of a table:
> result = spark.sql("SHOW COLUMNS FROM tablename")
<br>result = spark.sql("SELECT * FROM tablename LIMIT 0")
<br>result = spark.sql("DESCRIBE tablename")
<br>result.show()
<br>print(result.columns)

In [4]:
# Inspect the columns in the schedule table
spark.sql("DESCRIBE schedule").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|train_id|   string|   null|
| station|   string|   null|
|    time|   string|   null|
+--------+---------+-------+



### What is a window function in SQL?
- Expresses some operations more simply than dot notation or plain queries
- Each row uses the values of other rows to calculate its value
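
The second point can be illustrated without Spark. The sketch below is plain Python on a small hypothetical sample of the schedule (the `rows` list and variable names are assumptions for illustration). It contrasts an ordinary aggregation, which collapses each group to one row, with a window-style calculation, which keeps every row and attaches a value computed from the other rows in its group.

```python
from collections import Counter

# Hypothetical sample of (train_id, station) rows from the schedule.
rows = [
    ("217", "Gilroy"),
    ("217", "San Martin"),
    ("217", "Morgan Hill"),
    ("324", "San Francisco"),
    ("324", "22nd Street"),
]

# Count the stops per train once.
stops_per_train = Counter(train_id for train_id, _ in rows)

# Aggregation (like GROUP BY): one output row per train.
aggregated = sorted(stops_per_train.items())

# Window style (like COUNT(*) OVER (PARTITION BY train_id)):
# every input row is kept and annotated with its group's count.
windowed = [(train_id, station, stops_per_train[train_id])
            for train_id, station in rows]

print(aggregated)
for row in windowed:
    print(row)
```

The aggregated result has two rows, while the windowed result still has all five.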

### Dot notation and SQL

There is a dot notation equivalent for nearly every SQL clause, including window functions. For example:
> from pyspark.sql import Window
<br>from pyspark.sql.functions import row_number
<br>
<br>df.withColumn("id", row_number().over(Window.partitionBy('train_id').orderBy('time')))

is the same as
> query = """
<br>SELECT *,
<br>ROW_NUMBER() OVER(PARTITION BY train_id ORDER BY time) AS id
<br>FROM schedule
<br>"""
<br>
<br>spark.sql(query).show(11)
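
To try the shape of this query without a Spark session, here is a minimal sketch that uses Python's built-in `sqlite3` module as a stand-in for Spark. This is a substitution, not the notebook's method: it assumes a SQLite build with window function support (3.25 or later), uses a hypothetical four-row sample, and adds an `ORDER BY` only to make the output order deterministic. The two SQL dialects differ in other respects.

```python
import sqlite3

# In-memory SQLite database standing in for the Spark temporary table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE schedule (train_id TEXT, station TEXT, time TEXT)")
conn.executemany(
    "INSERT INTO schedule VALUES (?, ?, ?)",
    [
        ("217", "Gilroy", "6:06a"),
        ("217", "San Martin", "6:15a"),
        ("324", "San Francisco", "7:59a"),
        ("324", "22nd Street", "8:03a"),
    ],
)

# Note the comma after "SELECT *": the window expression is one more column.
query = """
SELECT *,
ROW_NUMBER() OVER (PARTITION BY train_id ORDER BY time) AS id
FROM schedule
ORDER BY train_id, id
"""
numbered = conn.execute(query).fetchall()
for row in numbered:
    print(row)
conn.close()
```

The numbering restarts at 1 for each `train_id`, which is what `PARTITION BY` specifies.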

### Aggregation, step by step

In [6]:
df.printSchema()

root
 |-- train_id: string (nullable = true)
 |-- station: string (nullable = true)
 |-- time: string (nullable = true)



In [7]:
# These two commands give identical results
spark.sql('SELECT train_id, MIN(time) AS start FROM schedule GROUP BY train_id').show()

df.groupBy(df.train_id).agg({'time':'min'}).withColumnRenamed('min(time)', 'start').show()

# Compare the SQL query with the dict form of agg(), then print the second column of the result
spark.sql('SELECT train_id, MIN(time), MAX(time) FROM schedule GROUP BY train_id').show()

# A dict cannot hold the key 'time' twice, so only the last entry ('max') survives
result = df.groupBy('train_id').agg({'time':'min','time':'max'})
result.show()
print(result.columns[1])

+--------+-----+
|train_id|start|
+--------+-----+
|     217|6:06a|
|     324|7:59a|
+--------+-----+

+--------+-----+
|train_id|start|
+--------+-----+
|     217|6:06a|
|     324|7:59a|
+--------+-----+

+--------+---------+---------+
|train_id|min(time)|max(time)|
+--------+---------+---------+
|     217|    6:06a|    6:59a|
|     324|    7:59a|    9:05a|
+--------+---------+---------+

+--------+---------+
|train_id|max(time)|
+--------+---------+
|     217|    6:59a|
|     324|    9:05a|
+--------+---------+

max(time)
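
The last table has only a `max(time)` column because of how Python dict literals work, not because of anything Spark does. A quick check in plain Python (the variable name `spec` is just for illustration):

```python
# A dict literal with a repeated key keeps only the last value for that key.
spec = {'time': 'min', 'time': 'max'}

print(spec)       # {'time': 'max'}
print(len(spec))  # 1
```

By the time `agg()` receives the dict, the `'min'` entry is already gone, so aggregating the same column twice requires a different form.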


### Aggregating the same column twice

In [8]:
from pyspark.sql.functions import min, max, col
expr = [min(col("time")).alias('start'), max(col("time")).alias('end')]
dot_df = df.groupBy("train_id").agg(*expr)
dot_df.show()

# Write a SQL query giving a result identical to dot_df
query = "SELECT train_id, MIN(time) AS start, MAX(time) AS end FROM schedule GROUP BY train_id"
sql_df = spark.sql(query)
sql_df.show()

+--------+-----+-----+
|train_id|start|  end|
+--------+-----+-----+
|     217|6:06a|6:59a|
|     324|7:59a|9:05a|
+--------+-----+-----+

+--------+-----+-----+
|train_id|start|  end|
+--------+-----+-----+
|     217|6:06a|6:59a|
|     324|7:59a|9:05a|
+--------+-----+-----+



### Aggregate dot SQL

In [9]:
df_time_next = spark.sql("""
SELECT *, 
LEAD(time,1) OVER(PARTITION BY train_id ORDER BY time) AS time_next 
FROM schedule
""")
df_time_next.show()

# obtain the identical result using dot notation
from pyspark.sql.functions import lead
from pyspark.sql import Window

dot_df = df.withColumn('time_next', lead('time', 1)
                       .over(Window.partitionBy('train_id')
                            .orderBy('time')))
dot_df.show()

+--------+-------------+-----+---------+
|train_id|      station| time|time_next|
+--------+-------------+-----+---------+
|     217|       Gilroy|6:06a|    6:15a|
|     217|   San Martin|6:15a|    6:21a|
|     217|  Morgan Hill|6:21a|    6:36a|
|     217| Blossom Hill|6:36a|    6:42a|
|     217|      Capitol|6:42a|    6:50a|
|     217|       Tamien|6:50a|    6:59a|
|     217|     San Jose|6:59a|     null|
|     324|San Francisco|7:59a|    8:03a|
|     324|  22nd Street|8:03a|    8:16a|
|     324|     Millbrae|8:16a|    8:24a|
|     324|    Hillsdale|8:24a|    8:31a|
|     324| Redwood City|8:31a|    8:37a|
|     324|    Palo Alto|8:37a|    9:05a|
|     324|     San Jose|9:05a|     null|
+--------+-------------+-----+---------+

+--------+-------------+-----+---------+
|train_id|      station| time|time_next|
+--------+-------------+-----+---------+
|     217|       Gilroy|6:06a|    6:15a|
|     217|   San Martin|6:15a|    6:21a|
|     217|  Morgan Hill|6:21a|    6:36a|
|     217| Blos
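
To make the semantics of `LEAD(time, 1) OVER (PARTITION BY train_id ORDER BY time)` concrete, here is a plain-Python sketch on a hypothetical five-row sample. The helper name `with_lead` is an assumption for illustration, and the code is an emulation of the idea, not how Spark implements it.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical sample of (train_id, station, time) rows.
rows = [
    ("217", "Gilroy", "6:06a"),
    ("217", "San Martin", "6:15a"),
    ("217", "Morgan Hill", "6:21a"),
    ("324", "San Francisco", "7:59a"),
    ("324", "22nd Street", "8:03a"),
]

def with_lead(rows):
    """Append the next row's time within each train_id, or None at the end."""
    # Sort by partition key (train_id), then by the ordering column (time).
    ordered = sorted(rows, key=itemgetter(0, 2))
    result = []
    for _, group in groupby(ordered, key=itemgetter(0)):
        group = list(group)
        # Shift the time column up by one; the last row has no successor.
        next_times = [row[2] for row in group[1:]] + [None]
        for row, time_next in zip(group, next_times):
            result.append(row + (time_next,))
    return result

lead_rows = with_lead(rows)
for row in lead_rows:
    print(row)
```

The last stop of each train gets `None`, matching the `null` values in the Spark output. Note that `time` is a string here, so the ordering is lexicographic; that happens to agree with chronological order for the times in this sample.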

### Convert window function from dot notation to SQL

In [12]:
from pyspark.sql.functions import unix_timestamp

window = Window.partitionBy('train_id').orderBy('time')
dot_df = df.withColumn('diff_min', 
                    (unix_timestamp(lead('time', 1).over(window),'H:m') 
                     - unix_timestamp('time', 'H:m'))/60)
dot_df.show()

# create a SQL query to obtain an identical result to dot_df
query = """
SELECT *,
(UNIX_TIMESTAMP(LEAD(time, 1) OVER (PARTITION BY train_id ORDER BY time), 'H:m')
- UNIX_TIMESTAMP(time, 'H:m'))/60 AS diff_min
FROM schedule
"""
sql_df = spark.sql(query)
sql_df.show()

sql_df.createOrReplaceTempView("schedule")

+--------+-------------+-----+--------+
|train_id|      station| time|diff_min|
+--------+-------------+-----+--------+
|     217|       Gilroy|6:06a|     9.0|
|     217|   San Martin|6:15a|     6.0|
|     217|  Morgan Hill|6:21a|    15.0|
|     217| Blossom Hill|6:36a|     6.0|
|     217|      Capitol|6:42a|     8.0|
|     217|       Tamien|6:50a|     9.0|
|     217|     San Jose|6:59a|    null|
|     324|San Francisco|7:59a|     4.0|
|     324|  22nd Street|8:03a|    13.0|
|     324|     Millbrae|8:16a|     8.0|
|     324|    Hillsdale|8:24a|     7.0|
|     324| Redwood City|8:31a|     6.0|
|     324|    Palo Alto|8:37a|    28.0|
|     324|     San Jose|9:05a|    null|
+--------+-------------+-----+--------+

+--------+-------------+-----+--------+
|train_id|      station| time|diff_min|
+--------+-------------+-----+--------+
|     217|       Gilroy|6:06a|     9.0|
|     217|   San Martin|6:15a|     6.0|
|     217|  Morgan Hill|6:21a|    15.0|
|     217| Blossom Hill|6:36a|     6.0|
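
The `diff_min` values can be cross-checked by hand. The sketch below recomputes them for train 217 in plain Python. The helper `minutes_between` is hypothetical, and it makes a simplifying assumption: the trailing `a` marker is dropped before parsing, so it is only meaningful when both times fall in the same half of the day, as they do in this schedule.

```python
from datetime import datetime

def minutes_between(start, end):
    """Minutes from start to end for strings such as '6:06a'.

    Assumption: the trailing am/pm letter is ignored.
    """
    fmt = "%H:%M"
    delta = datetime.strptime(end[:-1], fmt) - datetime.strptime(start[:-1], fmt)
    return delta.total_seconds() / 60

# Times for train 217, copied from the schedule output.
times_217 = ["6:06a", "6:15a", "6:21a", "6:36a", "6:42a", "6:50a", "6:59a"]

# Pair each time with the next one; the last stop has no next time.
diffs = [minutes_between(a, b) for a, b in zip(times_217, times_217[1:])] + [None]
print(diffs)
```

The values agree with the `diff_min` column for train 217 in the first table.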

### Running sums using window function SQL

A window function is like an aggregate function, except that it gives an output for every row in the dataset instead of a single row per group.

In [13]:
# Add col running_total that sums diff_min col in each group
query = """
SELECT train_id, station, time, diff_min,
SUM(diff_min) OVER (PARTITION BY train_id ORDER BY time) AS running_total
FROM schedule
"""

# Run the query and display the result
spark.sql(query).show()

+--------+-------------+-----+--------+-------------+
|train_id|      station| time|diff_min|running_total|
+--------+-------------+-----+--------+-------------+
|     217|       Gilroy|6:06a|     9.0|          9.0|
|     217|   San Martin|6:15a|     6.0|         15.0|
|     217|  Morgan Hill|6:21a|    15.0|         30.0|
|     217| Blossom Hill|6:36a|     6.0|         36.0|
|     217|      Capitol|6:42a|     8.0|         44.0|
|     217|       Tamien|6:50a|     9.0|         53.0|
|     217|     San Jose|6:59a|    null|         53.0|
|     324|San Francisco|7:59a|     4.0|          4.0|
|     324|  22nd Street|8:03a|    13.0|         17.0|
|     324|     Millbrae|8:16a|     8.0|         25.0|
|     324|    Hillsdale|8:24a|     7.0|         32.0|
|     324| Redwood City|8:31a|     6.0|         38.0|
|     324|    Palo Alto|8:37a|    28.0|         66.0|
|     324|     San Jose|9:05a|    null|         66.0|
+--------+-------------+-----+--------+-------------+
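
The running total, including the way the final `null` row repeats the previous total, can be reproduced with a short plain-Python sketch. The function name `running_totals` is an assumption for illustration. One case is deliberately not modeled: in SQL, `SUM` over only `NULL` values yields `NULL`, while this sketch would report `0.0`.

```python
from itertools import groupby
from operator import itemgetter

# (train_id, diff_min) pairs in time order, copied from the output above.
rows = [
    ("217", 9.0), ("217", 6.0), ("217", 15.0), ("217", 6.0),
    ("217", 8.0), ("217", 9.0), ("217", None),
    ("324", 4.0), ("324", 13.0), ("324", 8.0), ("324", 7.0),
    ("324", 6.0), ("324", 28.0), ("324", None),
]

def running_totals(rows):
    """Return (train_id, diff_min, running_total) for rows sorted by train and time."""
    totals = []
    for train_id, group in groupby(rows, key=itemgetter(0)):
        total = 0.0  # the sum restarts for each partition
        for _, diff_min in group:
            if diff_min is not None:  # like SQL SUM, skip missing values
                total += diff_min
            totals.append((train_id, diff_min, total))
    return totals

totals = running_totals(rows)
for row in totals:
    print(row)
```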



## Using window function SQL for natural language processing

### Loading natural language text

The dataset
- **The Project Gutenberg eBook of The Adventures of Sherlock Holmes** by Sir Arthur Conan Doyle
- Available from gutenberg.org

Loading text
- use `spark.read.text`

Loading parquet
- use `spark.read.load`
- Parquet is a file format from the **Hadoop** ecosystem for storing data structures

Loaded text
- `.show()`: the argument `truncate=False` allows it to print longer rows
- `lower(col('value'))` converts a column to lowercase
- `.alias()` method changes the column name
- `regexp_replace(colname, pattern to be replaced, replacement text)` replaces text
- `split()` tokenizes text
- `explode()` puts each word on its own row
- `monotonically_increasing_id()` adds a row id column
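
The steps listed above can be traced on a few lines of text in plain Python. This is only an emulation of what each Spark function does, under stated assumptions: the sample `lines`, the punctuation pattern `[:,]`, and all variable names are hypothetical.

```python
import re

# Hypothetical sample lines, including an empty one.
lines = [
    "Title: The Adventures of Sherlock Holmes",
    "",
    "Author: Sir Arthur Conan Doyle",
]

lowered = [line.lower() for line in lines]                   # like lower()
non_empty = [line for line in lowered if len(line) > 0]      # like filter(length(...) > 0)
cleaned = [re.sub(r"[:,]", "", line) for line in non_empty]  # like regexp_replace()
token_lists = [line.split(" ") for line in cleaned]          # like split()
words = [word for tokens in token_lists for word in tokens]  # like explode()
indexed = list(enumerate(words))                             # stands in for a row id

print(token_lists)
print(indexed[:3])
```

One difference worth noting: `enumerate` produces consecutive ids, whereas `monotonically_increasing_id()` only guarantees ids that are unique and increasing, not consecutive.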

Partitioning data
- can use `when()` and `otherwise()` as a case statement to partition the data
- example
    - > df2 = df.withColumn('title', when(df.id < 25000, 'Preface').when(df.id < 50000, 'Chapter 1').when(df.id < 75000, 'Chapter 2').otherwise('Chapter 3'))
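
The chained `when()` calls behave like an if/elif ladder: conditions are checked in order and the first one that matches decides the value. A plain-Python sketch of the same thresholds (the function name `title_for` is hypothetical):

```python
def title_for(row_id):
    """Map a row id to a section title using the thresholds from the example."""
    if row_id < 25000:
        return "Preface"
    elif row_id < 50000:
        return "Chapter 1"
    elif row_id < 75000:
        return "Chapter 2"
    else:  # corresponds to otherwise()
        return "Chapter 3"

for sample_id in (0, 24999, 25000, 60000, 75000):
    print(sample_id, title_for(sample_id))
```

Because the first match wins, the order of the conditions matters: putting `row_id < 75000` first would label almost every row as `Chapter 2`.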

### Loading a dataframe from a parquet file

In [16]:
# load the dataframe
df = spark.read.load("../data/sherlock.parquet")

# filter and show the first 5 rows
df.filter('id > 70').show(5, truncate=False)

+------+---+
|word  |id |
+------+---+
|it    |71 |
|do    |72 |
|not   |73 |
|change|74 |
|or    |75 |
+------+---+
only showing top 5 rows



### Split and explode a text column 

In [39]:
from pyspark.sql.functions import lower, length, monotonically_increasing_id, split, explode

clause_df = spark.read.text("../data/clause.txt")
clause_df = clause_df.select(lower(col('value')).alias('clause'))
clause_df = clause_df.filter(length('clause') > 0)
clause_df = clause_df.select("clause", monotonically_increasing_id().alias("id"))

In [40]:
# split the clause column into a column called words
split_df = clause_df.select(split('clause', ' ').alias('words'))
split_df.show(5, truncate=False)

# explode the words column into a column called word
exploded_df = split_df.select(explode('words').alias('word'))
exploded_df.show(10)

# count the resulting number of rows in exploded_df
print("\nNumber of rows: ", exploded_df.count())

+-------------------------------------------------+
|words                                            |
+-------------------------------------------------+
|[title:, the, adventures, of, sherlock, holmes]  |
|[author:, sir, arthur, conan, doyle]             |
|[release, date:, march,, 1999, , [ebook, #1661]] |
|[[most, recently, updated:, november, 29,, 2002]]|
|[edition:, 12]                                   |
+-------------------------------------------------+
only showing top 5 rows

+----------+
|      word|
+----------+
|    title:|
|       the|
|adventures|
|        of|
|  sherlock|
|    holmes|
|   author:|
|       sir|
|    arthur|
|     conan|
+----------+
only showing top 10 rows


Number of rows:  1260


### Moving window analysis

In [None]:
# close connection
spark.stop()