# Spark SQL in Python

## Pyspark SQL

- Spark provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.
- The dataframe is a fundamental data abstraction in Spark.
- A Spark DataFrame is a distributed collection of data organized into named columns and is conceptually equivalent to a table in a relational database, also called, simply, “tabular” data.
- Distributed means Spark can split this dataset into parts then store each part on a
different server. 
- Spark SQL table allows us to take the data that is in a dataframe, namely, a distributed collection of rows having named columns, and treat it as a single table, and fetch data from it using an SQL query.
- We often use an instance of a SparkSession object. By convention this is provided in a variable called "spark". Some implementations of Spark, such as Pyspark Shell, automatically provide an instance of a SparkSession.


In [None]:
# General Syntax
df = spark.read.csv(filename)

# General Synatx when first line is column
df = spark.read.csv(filename, header=True)

# Spark table from dataframe
df.createOrReplaceTempView("schedule")

# Run a sql
spark.sql("SELECT * FROM schedule WHERE station = 'San Jose'").show()

# Inspect column names from the table
result = spark.sql("SHOW COLUMNS FROM tablename")

# Second method to inspect table
result = spark.sql("SELECT * FROM tablename LIMIT 0")

# anther way to inspect table
result = spark.sql("DESCRIBE tablename")

# to see column names from the result
result.show()

# another way to find columns from the result
print(result.columns)


# Another example of creating the table and inspecting the columns
# Load trainsched.txt
df = spark.read.csv("trainsched.txt", header=True)

# Create temporary table called schedule
df.createOrReplaceTempView("schedule")

# Inspect the columns in the table df
spark.sql("DESCRIBE schedule").show()


## Window function Spark SQL

- A Window function operates on a set of rows and returns a value for each row in the set – but now this value can depend on other rows in the set.
-  The term window describes the set of rows on which the function operates. The value returned for each row can be a value from one of the rows in the “window”, or, a value from a “window function” that uses values from the rows in the window to calculate its value

In [None]:
query = """SELECT train_id, station, time, LEAD(time, 1) OVER (ORDER BY time) AS time_next FROM sched WHERE train_id=324 """
spark.sql(query).show()

# Remove the train limitation
query = """SELECT train_id, station, time, LEAD(time, 1) OVER (PARTITION BY train_id ORDER BY time) AS time_next FROM sched """

# Example 2
# Add col running_total that sums diff_min col in each group
query = """
SELECT train_id, station, time, diff_min,
SUM(diff_min) OVER (PARTITION BY train_id ORDER BY time) AS running_total
FROM schedule
"""

# Run the query and display the result
spark.sql(query).show()

# Example 3
query = """
SELECT 
ROW_NUMBER() OVER (ORDER BY time) AS row,
train_id, 
station, 
time, 
LEAD(time,1) OVER (ORDER BY time) AS time_next 
FROM schedule
"""
spark.sql(query).show()

# Give the number of the bad row as an integer
bad_row = 7

# Provide the missing clause, SQL keywords in upper case
clause = 'PARTITION BY train_id'


## Dot notation and SQL

- Most Spark sql queries can be done in either dot notation or sql notation
- Querying dataframes using dataframe dot notation.
    1. Select Columns
    2. Rename Columns
    3. Window Function
- Examples
   1. ROW_NUMBER in SQL : `pyspark.sql.functions.row_number`
   2. The inside of the OVER clause : `pyspark.sql.Window`
   3. PARTITIONBY : `pyspark.sql.Window.partitionBy `
   4. ORDERBY : p`yspark.sql.Window.orderBy`

- A "WindowSpec" is defined using the "Window" class, and then used subsequently as an argument to the over() function in a window function query.

  - The over function in SparkSQL corresponds to a OVER clause in SQL.
  - The class `pyspark.sql.window.Window `represents the inside of an OVER 
  - type(window) is `pyspark.sql.window.WindowSpec`


In [None]:
# show all columns
df.columns['train_id', 'station', 'time']

# show first 5 rows
df.show(5)

# show only two columns
df.select('train_id','station').show(5)

# second method to do same thing
df.select(df.train_id, df.station)

# third method to do same thing
from pyspark.sql.functions import col
df.select(col('train_id'), col('station'))

# Rename columns
df.select('train_id','station').withColumnRenamed('train_id','train').show(5)

# Another method to rename
df.select(col('train_id').alias('train'), 'station')

# Don't do this i.e. trying to use all the three ways/conventions of refering the columns at the same time without good reason.
df.select('train_id', df.station, col('time'))

# Sample Query using sql notation
spark.sql('SELECT train_id AS train, station FROM schedule LIMIT 5').show()

# same query using dot notation
df.select(col('train_id').alias('train'), 'station')
  .limit(5)
  .show()

query = """
SELECT *,
 ROW_NUMBER() OVER(PARTITION BY train_id ORDER BY time) AS id
  FROM schedule"""
spark.sql(query)
     .show(11)

# Dot notaion equivalent
from pyspark.sql import Window,
from pyspark.sql.functions import row_number
 df.withColumn("id", row_number()
                      .over(
                               Window.partitionBy('train_id')
                                     .orderBy('time')
                           ) 
             )

# WindowSpec example
clause.window = Window.partitionBy('train_id').orderBy('time')
dfx = df.withColumn('next', lead('time',1).over(window))

In [None]:
# Example 1
# Give the identical result in each command
spark.sql('SELECT train_id, MIN(time) AS start FROM schedule GROUP BY train_id').show()
df.groupBy('train_id').agg({'time':'min'}).withColumnRenamed('min(time)', 'start').show()

# Print the second column of the result
spark.sql('SELECT train_id, MIN(time), MAX(time) FROM schedule GROUP BY train_id').show()
result = df.groupBy('train_id').agg({'time':'min', 'time':'max'})
result.show()
print(result.columns[1])


# Example 2
from pyspark.sql.functions import min, max, col
expr = [min(col("time")).alias('start'), max(col("time")).alias('end')]
dot_df = df.groupBy("train_id").agg(*expr)
dot_df.show()

# Write a SQL query giving a result identical to dot_df
query = "SELECT train_id, MIN(time) AS start, MAX(time) AS end FROM schedule GROUP BY train_id"
sql_df = spark.sql(query)
sql_df.show()

# Example 3
df = spark.sql("""
SELECT *, 
LEAD(time,1) OVER(PARTITION BY train_id ORDER BY time) AS time_next 
FROM schedule
""")

# Obtain the identical result using dot notation 
dot_df = df.withColumn('time_next', lead('time', 1)
        .over(Window.partitionBy('train_id')
        .orderBy('time')))

# Example 4
window = Window.partitionBy('train_id').orderBy('time')
dot_df = df.withColumn('diff_min', 
                    (unix_timestamp(lead('time', 1).over(window),'H:m') 
                     - unix_timestamp('time', 'H:m'))/60)


# Create a SQL query to obtain an identical result to dot_df
query = """
SELECT *, 
(UNIX_TIMESTAMP(LEAD(time, 1) OVER (PARTITION BY train_id ORDER BY time),'H:m') 
 - UNIX_TIMESTAMP(time, 'H:m'))/60 AS diff_min 
FROM schedule 
"""
sql_df = spark.sql(query)
sql_df.show()

## Natural Language Processing using Spark SQL

- Load Natural Language text into a dataframe while discarding all the unwanted data.


In [None]:
# Load text file
df = spark.read.text('sherlock.txt')

# Gets the first row
print(df.first())

# Counts the rows
print(df.count())

# Supports reading from multiple file formats
df1 = spark.read.load('sherlock.parquet')

# Prints first 15 lines
# Truncate - FALSE allows to print long rows and does not truncate them
df1.show(15, truncate=False)

# Converts a column to lower case
df = df1.select(lower(col('value')))
print(df.first())

# Result column name "lower(value)"
df.columns

# Alias operation allows us to give new name to a new column
df = df1.select(lower(col('value')).alias('v'))

# The operation regexp_replace replaces values that match a pattern.
# The first argument is the column name. The second argument is the pattern to be replaced. It replaces every occurrence of the second argument with the third argument.
# To prevent the period from being interpreted as a special character in the second argument, we put a backslash in front of it. This is called "escaping" it.
# We must also escape other special characters such as a single quote.
df = df1.select(regexp_replace('value', 'Mr\.', 'Mr').alias('v'))
df = df1.select(regexp_replace('value', 'don\'t', 'do not').alias('v'))


# The split operation separates a string into individual tokens.
# The second argument gives the list of characters on which to split. Here it is a space.
# Retuns an array of strings
df = df2.select(split('v', '[ ]').alias('words'))

# Handle punctuation marks..in addition to the space it discards unwanted symbols
punctuation = "_|.\?\!\",\'\[\]\*()"
df3 = df2.select(split('v', '[ %s]' % punctuation).alias('words'))

# explode() takes an array of things, and puts each thing on its own row, preserving the order.
df4 = df3.select(explode('words').alias('word'))
print(df4.count())
nonblank_df = df.where(length('word') > 0) # Remove blank rows
print(nonblank_df.count())

# The monotonically_increasing_id() operation efficiently creates a column of integers that are always increasing.
# Here we are using it to create a column of unique IDs for each row.
df2 = df.select('word', monotonically_increasing_id().alias('id'))


### Partitioning Data

- Partitioning allows Spark to parallelize operations. We will organize the data allow window functions to use the partition clause. 

- The `when/otherwise` operation is a case statement. The first argument gives the condition. The second argument gives the desired value for the column. You can chain multiple when() operations. 
- The last `when()` operation is followed by an `otherwise()` clause that gives the column value used if none of the previous conditions applies. When combined with the `withColumn` operation when/otherwise groups the data into chapters. Repeating this adds a part id column.

In [None]:
# Adding title/chapter column
df2 = df.withColumn('title', when(df.id < 25000, 'Preface')                             
                             .when(df.id < 50000, 'Chapter 1')
                             .when(df.id < 75000, 'Chapter 2')      
                             .otherwise('Chapter 3'))
                             
# Adding aprtid column                             
df2 = df2.withColumn('part', when(df2.id < 25000, 0)
                            .when(df2.id < 50000, 1)
                            .when(df2.id < 75000, 2)
                            .otherwise(3))            
                            .show()

# Repartition a column
# First argument gives the desried number of partitions
# The second argument is saying put rows having the same part column value into the same partition.
df2 = df.repartition(4, 'part')
print(df2.rdd.getNumPartitions())

# Reading Pre-partitioned text files
# spark.read.text() tells Spark to load all of the text files in the folder into a dataframe.
# If available parallelism is more than one and the folder contains more than one file, this reads the files in parallel 
# and distributes the files over multiple partitions.
df_parts = spark.read.text('sherlock_parts')


In [None]:
# Load the dataframe
df = spark.read.load('sherlock_sentences.parquet')

# Filter and show the first 5 rows
df.where('id > 70').show(5, truncate=False)

# Split the clause column into a column called words 
split_df = clauses_df.select(split('clause', ' ').alias('words'))
split_df.show(5, truncate=False)

# Explode the words column into a column called word 
exploded_df = split_df.select(explode('words').alias('word'))
exploded_df.show(10)

# Count the resulting number of rows in exploded_df
print("\nNumber of rows: ", exploded_df.count())

## Moving Window Analysis

- This dataset has already been processed to remove unwanted characters and put one word per row. An id column was added to identify the position of each word in the document.
    
- The data is partitioned into 12 parts, corresponding to chapters, each having a unique "part" and "title" column. The `distinct()` operation eliminates duplicates, fetching unique records. This will allow us to easily parallelize our work across up to 12 machines in a cluster, or up to 12 cores in a CPU on a single machine.

In [None]:

df.select('part', 'title').distinct().sort('part').show(truncate=False)

# LEAD Window Function - Generates a sliding window/3-tuple
query = """
SELECT id, word AS w1,
LEAD(word,1) OVER(PARTITION BY part ORDER BY id ) AS w2,
LEAD(word,2) OVER(PARTITION BY part ORDER BY id ) AS w3   
FROM df
  """
spark.sql(query).sort('id').show()

# LAG Window Function
lag_query = """ 
SELECT    id,
LAG(word,2) OVER(PARTITION BY part ORDER BY id ) AS w1,
LAG(word,1) OVER(PARTITION BY part ORDER BY id ) AS w2,
word AS w3
FROM df
ORDER BY id"""
spark.sql(lag_query).show()

# Word for each row, previous two and subsequent two words
query = """
SELECT
part,
LAG(word, 2) OVER(PARTITION BY part ORDER BY id) AS w1,
LAG(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
word AS w3,
LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w4,
LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w5
FROM text
"""
spark.sql(query).where("part = 12").show(10)

# Repartition text_df into 12 partitions on 'chapter' column
repart_df = text_df.repartition(12,'chapter')

# Prove that repart_df has 12 partitions
repart_df.rdd.getNumPartitions()


## Common Word Sequences

- Most frequent word sequences in a natural language text document.
- Application is in Sequence Prediction/Endword Prediction


In [None]:
query3agg = """
SELECT w1,
w2,
w3,
COUNT(*) as count FROM (
    SELECT    word AS w1,
    LEAD(word,1) OVER(PARTITION BY part ORDER BY id ) AS w2,
    LEAD(word,2) OVER(PARTITION BY part ORDER BY id ) AS w3 FROM df
    )
GROUP BY w1, w2, w3
ORDER BY count DESC"""

spark.sql(query3agg).show()

query3agg = """
SELECT w1,
w2,
w3,
length(w1)+length(w2)+length(w3) as length
FROM (
    SELECT    word AS w1,
    LEAD(word,1) OVER(PARTITION BY part ORDER BY id ) AS w2,
    LEAD(word,2) OVER(PARTITION BY part ORDER BY id ) AS w3 
    FROM df
    WHERE part <> 0 and part <> 13
    )
GROUP BY w1, w2, w3
ORDER BY length DESC
"""
spark.sql(query3agg).show(truncate=False)

# Find the top 10 sequences of five words
query = """
SELECT w1, w2, w3, w4, w5, COUNT(*) AS count FROM (
   SELECT word AS w1,
   LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
   LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w3,
   LEAD(word, 3) OVER(PARTITION BY part ORDER BY id) AS w4,
   LEAD(word, 4) OVER(PARTITION BY part ORDER BY id) AS w5
   FROM text
)
GROUP BY w1, w2, w3, w4, w5
ORDER BY count DESC
LIMIT 10 """
df = spark.sql(query)
df.show()

# Unique 5-tuples sorted in descending order
query = """
SELECT Distinct w1, w2, w3, w4, w5 FROM (
   SELECT word AS w1,
   LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
   LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w3,
   LEAD(word, 3) OVER(PARTITION BY part ORDER BY id) AS w4,
   LEAD(word, 4) OVER(PARTITION BY part ORDER BY id) AS w5
   FROM text
)
ORDER BY w1 DESC, w2 DESC, w3 DESC, w4 DESC, w5 DESC 
LIMIT 10
"""
df = spark.sql(query)
df.show()

# Using Subquery
subquery = """
SELECT chapter, w1, w2, w3, COUNT(*) as count
FROM
(
    SELECT
    chapter,
    word AS w1,
    LEAD(word, 1) OVER(PARTITION BY chapter ORDER BY id ) AS w2,
    LEAD(word, 2) OVER(PARTITION BY chapter ORDER BY id ) AS w3
    FROM text
)
GROUP BY chapter, w1, w2, w3
ORDER BY chapter, count DESC
"""

#   Most frequent 3-tuple per chapter
query = """
SELECT chapter, w1, w2, w3, count FROM
(
  SELECT
  chapter,
  ROW_NUMBER() OVER (PARTITION BY chapter ORDER BY count DESC) AS row,
  w1, w2, w3, count
  FROM ( %s )
)
WHERE row = 1
ORDER BY chapter ASC
""" % subquery

spark.sql(query).show()

## Caching

What is caching?

- Keeping data in memory so that it does not have to be refetched or recalculated each time it is used.
- Spark tends to unload memory aggressively. It will err on the side of unloading data from memory if it is not used, even if it is going to be needed later
- Caching is a lazy operation. A dataframe won't appear in the cache until an action is performed on the dataframe. Don't overdo it.
-  Only cache if more than one operation is to be performed and it takes substantial time to create the dataframe. Unpersist unneeded objects. Caching incurs a cost. Caching everything generally slows things down.



## Eviction Policy

- Eviction Policy determines when and which data is removed from cache. The policy is LRU. Each worker manages its own cache, and eviction depends on the memory available to each worker.
- LeastRecentlyUsed(LRU) policy
- Eviction happens independently on each worker
- Depends on memory available to each worker

## Storage Level
In the `storagelevel` above the following hold

- useDisk = True (specifies whether to move some or all of the dataframe to disk if it needed to free up memory)
- useMemory = True (specifies whether to keep the data in memory)
- useOffHeap = False 
   - tells Spark to use off-heap storage instead of on-heap memory. 
   - The on-heap store refers to objects in an in-memory data structure that is fast to access. 
   - The off-heap store is also in memory, but is slightly slower than the on-heap store. However, off-heap storage is still faster than disk. 
   - Even though the best performance is obtained when operating solely in on-heap memory, Spark also makes it possible to use off-heap storage for certain operations. 
   - Off-heap storage is slightly slower than on-heap but still faster than disk. The downside is that the user has to manually deal with managing the allocated memory
- deserialized = True (deserialized True is faster but uses more memory. Serialized data is more space-efficient but slower to read. This option only applies to in-memory storage. Disk cache is always serialized.)
- replication = 1 (replication is used to tell Spark to replicate data on multiple nodes. This allows faster fault recovery when a node fails.)

## Persist

- The `persist()` command allows you to specify the desired storage level using the first argument.
-  If that argument is not provided, it uses a default setting. When memory is scarce, it is recommended to use MEMORY_AND_DISK caching strategy. 
- This will spill the dataframe to disk if memory runs low. Reading the dataframe from disk cache is slower than reading it from memory, but can still be faster than recreating from scratch. `cache()` is equivalent to using `persist()` with the default storageLevel.

In [None]:
#To cache dataframe
df.cache()

#To uncache dataframe
df.unpersist()

# Confirm whether dataframe is cached or uncached
df.is_cached

# Five Details about how dataframe is cached
df.storageLevel

# df.cache() is shorthand for df.persist() with the first argument set to its default value
df.persist()

# persist with storagelevel set to pyspark.StorageLevel.MEMORY_AND_DISK
df.persist(storageLevel=pyspark.StorageLevel.MEMORY_AND_DISK)

# List the tables
print("Tables:\n", spark.catalog.listTables())

# cache table and check if it is cached 
df.createOrReplaceTempView('df')
spark.catalog.cacheTable('df')
spark.catalog.isCached(tableName='df')

# uncache a table
spark.catalog.uncacheTable('df')

# removes all cached tables.
spark.catalog.clearCache()

# removes a table from the cache
spark.catalog.dropTempView('table1')


## Spark UI

- Spark UI is a web interface to inspect Spark execution
- `Spark Task` is a unit of execution that runs on a single cpu
- `Spark Stage` a group of tasks that perform the same computation in parallel,each task typically running on a different subset of the data
- `Spark Job` is a computation triggered by an action, sliced into one or more stages.

# Logging

- Logging and how to avoid stealth loss of cpu when logging spark actions
- If you are deploying Spark jobs to production then logging is important, In Pyspark this is especially very important if you are using log statements to inspect dataframes.
- The `level` argument of the `basicConfig()` function sets the logging level to info level, and has each log message print the time, the log level of the statement, and the message itself.
- In the below example the first statement printed, whereas the second one did not. This is because the second log statement is at debug level.
- Due to the combination of Spark's lazy evaluation and distributed computation, debugging a complex application can be challenging.
- This is because an erroneous action might not be triggered until well downstream of where the action is coded. One way to simplify the debugging of an application is to force actions to be triggered. However, a naive approach to doing this can cause stealth loss of CPU.
- Spark operations that trigger an action must be logged with care to avoid stealth loss of compute resource.Allow for dataframe actions that you need during development or during debugging, and  disable them in production, while doing so with confidence that they will not be silently executed.

In [None]:
# Logging with INFO Level
import logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Hello %s", "world")
logging.debug("Hello, take %d", 2)

# Logging with DEBUG Level
import logging
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, 
                                  format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Hello %s", "world")
logging.debug("Hello, take %d", 2)

# Even though logging Level is INFO the debug statement will be executed and not printed becaause Spark actions are executed
import logging
logging.basicConfig(level=logging.INFO,
                     format='%(asctime)s - %(levelname)s - %(message)s')
# < create dataframe df here >
t = timer()
logging.info("No action here.")
t.elapsed()
logging.debug("df has %d rows.", df.count())
t.elapsed()

# Disable Action to avoid Stealth Loss of CPU
ENABLED = False
t = timer()
logger.info("No action here.")
t.elapsed()
if ENABLED:
       logger.info("df has %d rows.", df.count())
t.elapsed()


# Log columns of text_df as debug message
logging.debug("text_df columns: %s", text_df.columns)

# Log whether table1 is cached as info message
logging.info("table1 is cached: %s", spark.catalog.isCached(tableName="table1"))

# Log first row of text_df as warning message
logging.warning("The first row of text_df:\n %s", text_df.first())

# Log selected columns of text_df as error message
logging.error("Selected columns: %s", text_df.select("id", "word"))

# Uncomment the 5 statements that do NOT trigger text_df
logging.debug("text_df columns: %s", text_df.columns)
logging.info("table1 is cached: %s", spark.catalog.isCached(tableName="table1"))
# logging.warning("The first row of text_df: %s", text_df.first())
logging.error("Selected columns: %s", text_df.select("id", "word"))
logging.info("Tables: %s", spark.sql("show tables").collect())
logging.debug("First row: %s", spark.sql("SELECT * FROM table1 limit 1"))
#logging.debug("Count: %s", spark.sql("SELECT COUNT(*) AS count FROM table1").collect())

## Query Plans

If you put the `Explain` keyword at the head of an sql query, running the query provides detailed plan information about the query without actually running it. Instead of the usual table of data, it returns a query execution plan, also called a query plan. A query plan is a string, representing a set of steps used to access the data.

- It tells us that it read the data from a parquet file, having 4 columns along with their names, located at /temp/df.parquet. It also tells us the schema of the table, including the column types. This allows us to determine how the data was obtained and from where.
- Query plan start from the bottom first.

In [None]:
df = sqlContext.read.load('/temp/df.parquet')
df.registerTempTable('df')

# explain on query result
spark.sql('EXPLAIN SELECT * FROM df').first()

# Expain on dataframe
df.explain()

# cache the dataframe and then explain
df.cache()
df.explain()