# PYSPARK SQL

## Mount Google Drive

In [None]:
# Mount Google Drive at /content/drive so files stored under "My Drive"
# can be read by path from later cells.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Install PySpark

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 46 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 67.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=75481b4b9d2a7a78a8edf42e53459d77c98b0b8e97b32d46c6167c87c6c8bdb0
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


## Import SparkSession from pyspark.sql

In [None]:
from pyspark.sql import SparkSession
# getOrCreate() hands back the session that is already running, if any,
# and only starts a new one otherwise, so this cell is safe to re-run.
spark = SparkSession.builder.getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f01bb9e3250>


## Create a SQL table from a dataframe
A dataframe can be used to create a temporary table. A temporary table is one that will not exist after the session ends.

### Load trainsched.txt

In [None]:
# Location of the schedule file on the mounted Drive.
filename = '/content/drive/My Drive/trainsched.txt'
# header=True: take the column names from the first line of the file.
df = spark.read.option('header', True).csv(filename)

### Create temporary table called schedule

In [None]:
# Register the DataFrame under the SQL name "schedule" so that spark.sql()
# queries can refer to it; re-running replaces any existing view of that name.
df.createOrReplaceTempView("schedule")

### Determine the column names of a table

Inspect table schema

In [24]:
# Three equivalent ways to look at the columns of the view:
#   1. SHOW COLUMNS lists the names only,
#   2. a zero-row SELECT prints just the header,
#   3. DESCRIBE adds the data type of each column.
for statement in (
    "SHOW COLUMNS FROM schedule",
    "SELECT * FROM SCHEDULE LIMIT 0",
    "DESCRIBE schedule",
):
    spark.sql(statement).show()

+--------+
|col_name|
+--------+
|train_id|
| station|
|    time|
+--------+

+--------+-------+----+
|train_id|station|time|
+--------+-------+----+
+--------+-------+----+

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|train_id|   string|   null|
| station|   string|   null|
|    time|   string|   null|
+--------+---------+-------+



## Running sums using window function SQL

### Add col time_next that gets time to next stop

In [25]:
# Build the query text only; it is executed in the next cell.
#  - ROW_NUMBER() numbers all rows globally in `time` order.
#  - LEAD(time,1) fetches the time of the following stop of the same train
#    (NULL for the last stop of each train).
# NOTE(review): `time` is a string column (see the DESCRIBE output), so
# ORDER BY compares text, not clock time -- this presumably only works while
# every hour has a single digit and the same am/pm suffix; confirm before
# reusing on other schedules.
query = """
SELECT 
ROW_NUMBER() OVER (ORDER BY time) AS row,
train_id, 
station, 
time, 
LEAD(time,1) OVER (PARTITION BY train_id ORDER BY time) AS time_next 
FROM schedule
"""



### Run the query and display the result

In [26]:
# Execute the query text held in `query` and print the resulting rows.
spark.sql(query).show()

+---+--------+-------------+-----+---------+
|row|train_id|      station| time|time_next|
+---+--------+-------------+-----+---------+
|  1|     217|       Gilroy|6:06a|    6:15a|
|  2|     217|   San Martin|6:15a|    6:21a|
|  3|     217|  Morgan Hill|6:21a|    6:36a|
|  4|     217| Blossom Hill|6:36a|    6:42a|
|  5|     217|      Capitol|6:42a|    6:50a|
|  6|     217|       Tamien|6:50a|    6:59a|
|  7|     217|     San Jose|6:59a|     null|
|  8|     324|San Francisco|7:59a|    8:03a|
|  9|     324|  22nd Street|8:03a|    8:16a|
| 10|     324|     Millbrae|8:16a|    8:24a|
| 11|     324|    Hillsdale|8:24a|    8:31a|
| 12|     324| Redwood City|8:31a|    8:37a|
| 13|     324|    Palo Alto|8:37a|    9:05a|
| 14|     324|     San Jose|9:05a|     null|
+---+--------+-------------+-----+---------+



### Three ways to select two columns

In [34]:
# Way 1: pass the column names as plain strings.
df.select('train_id', 'station').show()

+--------+-------------+
|train_id|      station|
+--------+-------------+
|     324|San Francisco|
|     324|  22nd Street|
|     324|     Millbrae|
|     324|    Hillsdale|
|     324| Redwood City|
|     324|    Palo Alto|
|     324|     San Jose|
|     217|       Gilroy|
|     217|   San Martin|
|     217|  Morgan Hill|
|     217| Blossom Hill|
|     217|      Capitol|
|     217|       Tamien|
|     217|     San Jose|
+--------+-------------+



In [36]:
# Way 2: pass Column objects obtained as attributes of the DataFrame.
df.select(df.train_id, df.station).show()

+--------+-------------+
|train_id|      station|
+--------+-------------+
|     324|San Francisco|
|     324|  22nd Street|
|     324|     Millbrae|
|     324|    Hillsdale|
|     324| Redwood City|
|     324|    Palo Alto|
|     324|     San Jose|
|     217|       Gilroy|
|     217|   San Martin|
|     217|  Morgan Hill|
|     217| Blossom Hill|
|     217|      Capitol|
|     217|       Tamien|
|     217|     San Jose|
+--------+-------------+



In [38]:
# Way 3: build Column objects with col(), which needs only the column name
# and no reference to a particular DataFrame.
from pyspark.sql.functions import col
df.select(col('train_id'), col('station')).show()

+--------+-------------+
|train_id|      station|
+--------+-------------+
|     324|San Francisco|
|     324|  22nd Street|
|     324|     Millbrae|
|     324|    Hillsdale|
|     324| Redwood City|
|     324|    Palo Alto|
|     324|     San Jose|
|     217|       Gilroy|
|     217|   San Martin|
|     217|  Morgan Hill|
|     217| Blossom Hill|
|     217|      Capitol|
|     217|       Tamien|
|     217|     San Jose|
+--------+-------------+



### Two ways to rename columns

In [40]:
# Way 1: select first, then rename with withColumnRenamed(old_name, new_name).
df.select('train_id', 'station').withColumnRenamed('train_id', 'train').show(5)


+-----+-------------+
|train|      station|
+-----+-------------+
|  324|San Francisco|
|  324|  22nd Street|
|  324|     Millbrae|
|  324|    Hillsdale|
|  324| Redwood City|
+-----+-------------+
only showing top 5 rows



In [43]:
# Way 2: rename while selecting, by aliasing the Column object.
df.select(col('train_id').alias('train'), 'station').show()

+-----+-------------+
|train|      station|
+-----+-------------+
|  324|San Francisco|
|  324|  22nd Street|
|  324|     Millbrae|
|  324|    Hillsdale|
|  324| Redwood City|
|  324|    Palo Alto|
|  324|     San Jose|
|  217|       Gilroy|
|  217|   San Martin|
|  217|  Morgan Hill|
|  217| Blossom Hill|
|  217|      Capitol|
|  217|       Tamien|
|  217|     San Jose|
+-----+-------------+



## Window functions
* Using SQL

In [54]:
# Number the stops of each train in time order; because of PARTITION BY the
# numbering restarts at 1 for every train_id.
query = """
SELECT *,
ROW_NUMBER() OVER (PARTITION BY train_id ORDER BY time) AS id
FROM schedule
"""
spark.sql(query).show(11)

+--------+-------------+-----+---+
|train_id|      station| time| id|
+--------+-------------+-----+---+
|     217|       Gilroy|6:06a|  1|
|     217|   San Martin|6:15a|  2|
|     217|  Morgan Hill|6:21a|  3|
|     217| Blossom Hill|6:36a|  4|
|     217|      Capitol|6:42a|  5|
|     217|       Tamien|6:50a|  6|
|     217|     San Jose|6:59a|  7|
|     324|San Francisco|7:59a|  1|
|     324|  22nd Street|8:03a|  2|
|     324|     Millbrae|8:16a|  3|
|     324|    Hillsdale|8:24a|  4|
+--------+-------------+-----+---+
only showing top 11 rows



* Using dot notation

In [59]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# DataFrame equivalent of
#   ROW_NUMBER() OVER (PARTITION BY train_id ORDER BY time) AS id
df.withColumn(
    "id",
    row_number().over(Window.partitionBy('train_id').orderBy('time')),
).show(11)

+--------+-------------+-----+---+
|train_id|      station| time| id|
+--------+-------------+-----+---+
|     217|       Gilroy|6:06a|  1|
|     217|   San Martin|6:15a|  2|
|     217|  Morgan Hill|6:21a|  3|
|     217| Blossom Hill|6:36a|  4|
|     217|      Capitol|6:42a|  5|
|     217|       Tamien|6:50a|  6|
|     217|     San Jose|6:59a|  7|
|     324|San Francisco|7:59a|  1|
|     324|  22nd Street|8:03a|  2|
|     324|     Millbrae|8:16a|  3|
|     324|    Hillsdale|8:24a|  4|
+--------+-------------+-----+---+
only showing top 11 rows



## Using a WindowSpec
* The **over** function in Spark SQL corresponds to a **OVER** clause in SQL
* The class **pyspark.sql.window.Window** represents the inside of an **OVER** clause

In [63]:
from pyspark.sql import Window
from pyspark.sql.functions import lead  

# The window spec plays the role of the inside of an OVER (...) clause:
# rows are grouped per train and ordered by time within each group.
window = Window.partitionBy('train_id').orderBy('time')
# lead('time', 1) reads `time` from the next row of the same window,
# which is null on the last stop of each train.
dfx = df.withColumn('next', lead('time', 1).over(window))
dfx.show()

+--------+-------------+-----+-----+
|train_id|      station| time| next|
+--------+-------------+-----+-----+
|     217|       Gilroy|6:06a|6:15a|
|     217|   San Martin|6:15a|6:21a|
|     217|  Morgan Hill|6:21a|6:36a|
|     217| Blossom Hill|6:36a|6:42a|
|     217|      Capitol|6:42a|6:50a|
|     217|       Tamien|6:50a|6:59a|
|     217|     San Jose|6:59a| null|
|     324|San Francisco|7:59a|8:03a|
|     324|  22nd Street|8:03a|8:16a|
|     324|     Millbrae|8:16a|8:24a|
|     324|    Hillsdale|8:24a|8:31a|
|     324| Redwood City|8:31a|8:37a|
|     324|    Palo Alto|8:37a|9:05a|
|     324|     San Jose|9:05a| null|
+--------+-------------+-----+-----+



## SQL queries using dot notation
### Exercise 1
* Using SQL

In [45]:
# SQL form: alias train_id as "train" and keep the first five rows.
spark.sql('SELECT train_id AS train, station FROM schedule LIMIT 5').show()

+-----+-------------+
|train|      station|
+-----+-------------+
|  324|San Francisco|
|  324|  22nd Street|
|  324|     Millbrae|
|  324|    Hillsdale|
|  324| Redwood City|
+-----+-------------+



* Using dot notation

In [48]:
# DataFrame form of the same query: alias() stands in for AS, limit() for LIMIT.
df.select(col('train_id').alias('train'), 'station').limit(5).show()

+-----+-------------+
|train|      station|
+-----+-------------+
|  324|San Francisco|
|  324|  22nd Street|
|  324|     Millbrae|
|  324|    Hillsdale|
|  324| Redwood City|
+-----+-------------+



### Exercise 2
#### Aggregations

Both commands below give an identical result
* Using SQL

In [27]:

# Earliest time per train, exposed under the column name "start".
spark.sql('SELECT train_id, MIN(time) AS start FROM schedule GROUP BY train_id').show()

+--------+-----+
|train_id|start|
+--------+-----+
|     217|6:06a|
|     324|7:59a|
+--------+-----+



* Using dot notation

In [28]:
# With a dict, agg() names the result column "min(time)", so it is renamed
# afterwards to match the "start" alias used in the SQL version.
df.groupBy('train_id').agg({'time':'min'}).withColumnRenamed('min(time)', 'start').show()

+--------+-----+
|train_id|start|
+--------+-----+
|     217|6:06a|
|     324|7:59a|
+--------+-----+



Print the second column of the result
* Using SQL

In [29]:
# Without aliases the aggregate columns get the default names
# min(time) and max(time).
spark.sql('SELECT train_id, MIN(time), MAX(time) FROM schedule GROUP BY train_id').show()

+--------+---------+---------+
|train_id|min(time)|max(time)|
+--------+---------+---------+
|     217|    6:06a|    6:59a|
|     324|    7:59a|    9:05a|
+--------+---------+---------+



* Using dot notation

In [33]:
# Pitfall: a Python dict cannot hold the key 'time' twice, so the later
# 'time':'max' entry replaces 'time':'min' before agg() ever sees it.
# Only max(time) is computed, which is why the second column printed below
# is "max(time)" and not "min(time)" as in the SQL version.
result = df.groupBy('train_id').agg({'time':'min', 'time':'max'})
result.show()
print(result.columns[1])

+--------+---------+
|train_id|max(time)|
+--------+---------+
|     217|    6:59a|
|     324|    9:05a|
+--------+---------+

max(time)


### Exercise 3
#### Aggregating the same column twice
* Using dot notation


In [64]:
# Import the functions module under an alias rather than importing min and
# max by name: `from pyspark.sql.functions import min, max` would replace
# Python's built-in min()/max() for every later cell of the notebook.
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# One aliased Column expression per aggregate, so both the minimum and the
# maximum of `time` are kept (a dict passed to agg() can carry only one
# entry per column name).
expr = [F.min(col("time")).alias('start'), F.max(col("time")).alias('end')]
dot_df = df.groupBy("train_id").agg(*expr)
dot_df.show()

+--------+-----+-----+
|train_id|start|  end|
+--------+-----+-----+
|     217|6:06a|6:59a|
|     324|7:59a|9:05a|
+--------+-----+-----+



* Using SQL

In [65]:
# SQL version: both aggregates of the same column, each with its own alias.
query = "SELECT train_id, MIN(time) as start, MAX(time) as end FROM schedule GROUP BY train_id"
sql_df = spark.sql(query)
sql_df.show()

+--------+-----+-----+
|train_id|start|  end|
+--------+-----+-----+
|     217|6:06a|6:59a|
|     324|7:59a|9:05a|
+--------+-----+-----+



### Exercise 4
* Using SQL

In [66]:
# Keep the query result and its display separate: show() only prints and
# returns None, so assigning its return value to `df` would overwrite the
# schedule DataFrame with None.
sql_df = spark.sql("""
SELECT *, 
LEAD(time,1) OVER(PARTITION BY train_id ORDER BY time) AS time_next 
FROM schedule
""")
sql_df.show()

+--------+-------------+-----+---------+
|train_id|      station| time|time_next|
+--------+-------------+-----+---------+
|     217|       Gilroy|6:06a|    6:15a|
|     217|   San Martin|6:15a|    6:21a|
|     217|  Morgan Hill|6:21a|    6:36a|
|     217| Blossom Hill|6:36a|    6:42a|
|     217|      Capitol|6:42a|    6:50a|
|     217|       Tamien|6:50a|    6:59a|
|     217|     San Jose|6:59a|     null|
|     324|San Francisco|7:59a|    8:03a|
|     324|  22nd Street|8:03a|    8:16a|
|     324|     Millbrae|8:16a|    8:24a|
|     324|    Hillsdale|8:24a|    8:31a|
|     324| Redwood City|8:31a|    8:37a|
|     324|    Palo Alto|8:37a|    9:05a|
|     324|     San Jose|9:05a|     null|
+--------+-------------+-----+---------+



* Using dot notation

In [70]:
# Re-read the schedule so this cell does not depend on whatever earlier
# cells left bound to `df`.
filename = '/content/drive/My Drive/trainsched.txt'
df = spark.read.csv(filename, header=True)

# Build the DataFrame first and display it in a second step; chaining
# .show() onto the assignment would store None in dot_df, because show()
# prints the rows and returns nothing.
dot_df = df.withColumn(
    'time_next',
    lead('time', 1).over(Window.partitionBy('train_id').orderBy('time')),
)
dot_df.show()

+--------+-------------+-----+---------+
|train_id|      station| time|time_next|
+--------+-------------+-----+---------+
|     217|       Gilroy|6:06a|    6:15a|
|     217|   San Martin|6:15a|    6:21a|
|     217|  Morgan Hill|6:21a|    6:36a|
|     217| Blossom Hill|6:36a|    6:42a|
|     217|      Capitol|6:42a|    6:50a|
|     217|       Tamien|6:50a|    6:59a|
|     217|     San Jose|6:59a|     null|
|     324|San Francisco|7:59a|    8:03a|
|     324|  22nd Street|8:03a|    8:16a|
|     324|     Millbrae|8:16a|    8:24a|
|     324|    Hillsdale|8:24a|    8:31a|
|     324| Redwood City|8:31a|    8:37a|
|     324|    Palo Alto|8:37a|    9:05a|
|     324|     San Jose|9:05a|     null|
+--------+-------------+-----+---------+



### Exercise 5
* Using dot notation


In [79]:
# lead is imported here as well so the cell does not rely on an earlier cell.
from pyspark.sql.functions import lead, unix_timestamp
from pyspark.sql import Window

# Switch to the legacy time parser before parsing with the 'H:m' pattern.
# NOTE(review): presumably required because values such as "6:06a" carry a
# trailing "a" that the pattern does not describe -- confirm.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
window = Window.partitionBy('train_id').orderBy('time')

# Minutes to the next stop: (next stop's time - this stop's time) in seconds,
# divided by 60. The last stop of each train has no next row, hence null.
# The DataFrame is assigned first and shown afterwards; chaining .show() onto
# the assignment would leave dot_df bound to None.
dot_df = df.withColumn(
    'diff_min',
    (unix_timestamp(lead('time', 1).over(window), 'H:m')
     - unix_timestamp('time', 'H:m')) / 60,
)
dot_df.show()


+--------+-------------+-----+--------+
|train_id|      station| time|diff_min|
+--------+-------------+-----+--------+
|     217|       Gilroy|6:06a|     9.0|
|     217|   San Martin|6:15a|     6.0|
|     217|  Morgan Hill|6:21a|    15.0|
|     217| Blossom Hill|6:36a|     6.0|
|     217|      Capitol|6:42a|     8.0|
|     217|       Tamien|6:50a|     9.0|
|     217|     San Jose|6:59a|    null|
|     324|San Francisco|7:59a|     4.0|
|     324|  22nd Street|8:03a|    13.0|
|     324|     Millbrae|8:16a|     8.0|
|     324|    Hillsdale|8:24a|     7.0|
|     324| Redwood City|8:31a|     6.0|
|     324|    Palo Alto|8:37a|    28.0|
|     324|     San Jose|9:05a|    null|
+--------+-------------+-----+--------+



* Using SQL

In [76]:
# The 'H:m' parsing in the query runs under the legacy time parser policy,
# so set it before the query is built and executed.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# Minutes between each stop and the next stop of the same train.
query = """
SELECT *, 
(UNIX_TIMESTAMP(LEAD(time, 1) OVER (PARTITION BY train_id ORDER BY time),'H:m') 
 - UNIX_TIMESTAMP(time, 'H:m'))/60 AS diff_min 
FROM schedule 
"""

sql_df = spark.sql(query)
sql_df.show()

+--------+-------------+-----+--------+
|train_id|      station| time|diff_min|
+--------+-------------+-----+--------+
|     217|       Gilroy|6:06a|     9.0|
|     217|   San Martin|6:15a|     6.0|
|     217|  Morgan Hill|6:21a|    15.0|
|     217| Blossom Hill|6:36a|     6.0|
|     217|      Capitol|6:42a|     8.0|
|     217|       Tamien|6:50a|     9.0|
|     217|     San Jose|6:59a|    null|
|     324|San Francisco|7:59a|     4.0|
|     324|  22nd Street|8:03a|    13.0|
|     324|     Millbrae|8:16a|     8.0|
|     324|    Hillsdale|8:24a|     7.0|
|     324| Redwood City|8:31a|     6.0|
|     324|    Palo Alto|8:37a|    28.0|
|     324|     San Jose|9:05a|    null|
+--------+-------------+-----+--------+

