# Spark Session 20/04/2023: SQL try at home

## 1.1 Creating Spark session

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .getOrCreate()

# in general...

# spark = SparkSession \
# .builder \
# .master("local[*]") \
# .appName("Python Spark SQL basic example").getOrCreate()
# .config("spark.some.config.option", "some-value") \
# .getOrCreate()

## 1.2 Creating DataFrame

In [2]:
# spark is an existing SparkSession
df = spark.read.json("./data/people.json")
# Displays the content of the DataFrame to stdout
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



## 1.3 Data Frame Operations

In [3]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [4]:
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [5]:
df.select(df['name']).show() # prefered

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [6]:
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [7]:
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [8]:
df.groupBy("age").count().show() # sql equivalent- SELECT age,count(name) as count FROM people group by age

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



There are many places you have to refer to a column, and there are three different ways to do it. These are equivalent:

In [10]:
df.groupby(df['name']).count().show()  # as a getitem on the DF
df.groupby('name').count().show()        # by column name only
df.groupby(df.name).count().show()     # as a property on the DF

+-------+-----+
|   name|count|
+-------+-----+
|Michael|    1|
|   Andy|    1|
| Justin|    1|
+-------+-----+

+-------+-----+
|   name|count|
+-------+-----+
|Michael|    1|
|   Andy|    1|
| Justin|    1|
+-------+-----+

+-------+-----+
|   name|count|
+-------+-----+
|Michael|    1|
|   Andy|    1|
| Justin|    1|
+-------+-----+



Mixing these can be confusing. Suggestion: stick to data['lname'] style: it always works an is unambiguous.

The various representations fail weirdly:

In [12]:
df.where(df['age'] < 25).show() # works
df.where(df.age < 25).show()    # works
df.where('age' < 25).show()      # fails: TypeError

+---+------+
|age|  name|
+---+------+
| 19|Justin|
+---+------+

+---+------+
|age|  name|
+---+------+
| 19|Justin|
+---+------+



TypeError: '<' not supported between instances of 'str' and 'int'

… because 'age' < 25 is a bool, not a column expression.

Similarly as shown below:

In [13]:
maxage = df.groupby(df['name']).max()
maxage.show()
maxage.select(maxage['max(age)']).show() # works
maxage.select('max(age)').show()         # works
maxage.select(maxage.max(age)).show()    # fails: AttributeError

+-------+--------+
|   name|max(age)|
+-------+--------+
|Michael|    null|
|   Andy|      30|
| Justin|      19|
+-------+--------+

+--------+
|max(age)|
+--------+
|    null|
|      30|
|      19|
+--------+

+--------+
|max(age)|
+--------+
|    null|
|      30|
|      19|
+--------+



AttributeError: 'DataFrame' object has no attribute 'max'

because maxage doesn't have a max attribute, and if it did, maxage.max(age) is a Python function call, not what you expect.

# 2.0 Running SQL Queries Programmatically

## 2.1 Inferring the Schema Using Reflection

In [15]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT age,name FROM people where age > 21") # Same query as in line 8 
sqlDF.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [16]:
sqlGroup = spark.sql("SELECT age,count(name) as count FROM people group by age") 
sqlGroup.show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



## 2.2 Programmatically Specifying the Schema

### Let's see how to create an Spark dataframe from a .txt file programmatically specifying the Schema, and from the rows of the file

`Schema` is nothing else than the structure which Spark will find of the file's rows

In [18]:
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("./data/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



# 3.0 User Defined Functions

### Create and Register the function as a UDF

In [16]:
def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

<function __main__.squared(s)>

### Optionally, you can also explicitly set the return type of your UDF.

In [17]:
from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared, LongType())

<function __main__.squared(s)>

### Call the UDF in Spark SQL

In [18]:
spark.range(1, 20).registerTempTable("test")
testDF = spark.sql("select id, squaredWithPython(id) as id_squared from test")
testDF.show()

+---+----------+
| id|id_squared|
+---+----------+
|  1|         1|
|  2|         4|
|  3|         9|
|  4|        16|
|  5|        25|
|  6|        36|
|  7|        49|
|  8|        64|
|  9|        81|
| 10|       100|
| 11|       121|
| 12|       144|
| 13|       169|
| 14|       196|
| 15|       225|
| 16|       256|
| 17|       289|
| 18|       324|
| 19|       361|
+---+----------+



### Use UDF with DataFrames

In [19]:
from pyspark.sql.functions import udf
squared_udf = udf(squared, LongType())
df = spark.table("test")
df.select("id", squared_udf("id").alias("id_squared")).show()

+---+----------+
| id|id_squared|
+---+----------+
|  1|         1|
|  2|         4|
|  3|         9|
|  4|        16|
|  5|        25|
|  6|        36|
|  7|        49|
|  8|        64|
|  9|        81|
| 10|       100|
| 11|       121|
| 12|       144|
| 13|       169|
| 14|       196|
| 15|       225|
| 16|       256|
| 17|       289|
| 18|       324|
| 19|       361|
+---+----------+



# 4.0 Data Sources

### Generic Load/Save Functions

In [30]:
df = spark.read.load("users.parquet")
df.show()
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



### Manually Specifying Options

In [32]:
df = spark.read.load("people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")

### Run SQL on files directly

In [33]:
df = spark.sql("SELECT * FROM parquet.`users.parquet`")
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



### Save Modes

In [24]:
df = spark.read.load("users.parquet")
df.show()
df.select("name", "favorite_color").write.mode('overwrite').save("namesAndFavColors.parquet")

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



### Saving to Persistent Tables

In [25]:
df.write.saveAsTable("tempt") # store in spark default location i.e workhouse directory

In [26]:
df.write.option("path", "temptables/permanentt").saveAsTable("permanentt")

### Partitioning

In [34]:
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

# JSON Datasets

In [35]:
sc = spark.sparkContext # get the spark Context

#### A JSON dataset is pointed to by path.
#### The path can be either a single text file or a directory storing text files

In [36]:
path = "people.json"
peopleDF = spark.read.json(path)

#### The inferred schema can be visualized using the printSchema() method

In [37]:
peopleDF.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [38]:
# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()

+------+
|  name|
+------+
|Justin|
+------+

