## reference repos
https://github.com/databricks/LearningSparkV2

https://github.com/RodrigoLima82/spark-certification



In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = (
    SparkSession.builder
    .appName("lean-spark-cap5")    
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
from pyspark import SparkContext
#sc= SparkContext()
sc = SparkContext.getOrCreate();

User-Defined Functions

In [None]:
# In Python
from pyspark.sql.types import LongType
# Create cubed function
def power2(s):
    return s * s
# Register UDF
spark.udf.register("power2", power2, LongType())
# Generate temporary view
spark.range(1, 9).createOrReplaceTempView("udf_test")

In [None]:
# In Scala/Python
# Query the cubed UDF
spark.sql("SELECT id, power2(id) AS id_power2 FROM udf_test").show()


Evaluation order and null checking in Spark SQL

Spark SQL (this includes SQL, the DataFrame API, and the Dataset API) does not
guarantee the order of evaluation of subexpressions

1. Make the UDF itself null-aware and do null checking inside the UDF.
2. Use IF or CASE WHEN expressions to do the null check and invoke the UDF in a
conditional branch.

Speeding up and distributing PySpark UDFs with Pandas UDFs

In [None]:
# In Python
# Import pandas
import pandas as pd
# Import various pyspark SQL functions including pandas_udf
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the cubed function
def cubed(a: pd.Series) -> pd.Series:
    return a * a * a
# Create the pandas UDF for the cubed function
cubed_udf = pandas_udf(cubed, returnType=LongType())


In [None]:
# Create a Pandas Series
x = pd.Series([1, 2, 3])
# The function for a pandas_udf executed with local Pandas data
print(cubed(x))

In [None]:
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.range(1, 4)
# Execute function as a Spark vectorized UDF
df.select("id", cubed_udf(col("id"))).show()

Querying with the Spark SQL Shell, Beeline, and Tableau

Using the Spark SQL Shell

To start the Spark SQL CLI, execute the following command in the $SPARK_HOME
folder:

./bin/spark-sql

spark-sql> CREATE TABLE people (name STRING, age int);

Insert data into the table

INSERT INTO people SELECT name, age FROM ...


Running a Spark SQL query

Now that you have data in your table, you can run Spark SQL queries against it. Let’s
start by viewing what tables exist in our metastore:

spark-sql> SHOW TABLES;

default people false
Time taken: 0.016 seconds, Fetched 1 row(s)

Next, let’s find out how many people in our table are younger than 20 years of age:

spark-sql> SELECT * FROM people WHERE age < 20;

Samantha 19

Time taken: 0.593 seconds, Fetched 1 row(s)

As well, let’s see who the individuals are who did not specify their age:


spark-sql> SELECT name FROM people WHERE age IS NULL;

Michael
Time taken: 0.272 seconds, Fetched 1 row(s)

Working with Beeline

Working with Tableau

./sbin/start-thriftserver.sh

External Data Sources



JDBC and SQL Databases

PostgreSQL

In [None]:
from pyspark.sql import SparkSession
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = (
    SparkSession.builder
    .appName("learn-spark-cap5")
    .config("spark.jars", "C:/Spark/jars/postgresql-42.5.4.jar") 
    .getOrCreate()
)

In [None]:
# In Python
# Read Option 1: Loading data from a JDBC source using load method

jdbcDF1 = (spark
           .read
           .format("jdbc")
           .option("url", "jdbc:postgresql://postgres")
           .option("dbtable", "public.aluno")
           .option("user", "master")
           .option("password", "388020")
           .load())

In [None]:
# Read Option 2: Loading data from a JDBC source using jdbc method
jdbcDF2 = (spark
           .read
           .jdbc("jdbc:postgresql://[DBSERVER]", "[SCHEMA].[TABLENAME]",
                 properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))

In [None]:
# Write Option 1: Saving data to a JDBC source using save method
(jdbcDF1
 .write
 .format("jdbc")
 .option("url", "jdbc:postgresql://[DBSERVER]")
 .option("dbtable", "[SCHEMA].[TABLENAME]")
 .option("user", "[USERNAME]")
 .option("password", "[PASSWORD]")
 .save())

In [None]:
# Write Option 2: Saving data to a JDBC source using jdbc method
(jdbcDF2
 .write
 .jdbc("jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]",
       properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))

MySQL

In [None]:
# In Python
# Loading data from a JDBC source using load
jdbcDF = (spark
          .read
          .format("jdbc")
          .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
          .option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", "[TABLENAME]")
          .option("user", "[USERNAME]")
          .option("password", "[PASSWORD]")
          .load())
# Saving data to a JDBC source using save
(jdbcDF
 .write
 .format("jdbc")
 .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
 .option("driver", "com.mysql.jdbc.Driver")
 .option("dbtable", "[TABLENAME]")
 .option("user", "[USERNAME]")
 .option("password", "[PASSWORD]")
 .save())

In [None]:
from pyspark.sql import SparkSession



###### SparkSession ######
def session_spark():
    spark = (
        SparkSession.builder
            .master("local[*]")
            .appName("appDesafio")
            .config('spark.sql.debug.maxToStringFields', 500)
            .config('spark.debug.maxToStringFields', 500)
            .config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .config("delta.autoOptimize.optimizeWrite", "true")
            .config("delta.autoOptimize.autoCompact", "true")
            .config("spark.jars", "C:/Spark/jars/postgresql-42.5.4.jar")
            .getOrCreate()
    )
    return spark

spark = session_spark()


# set log level
spark.sparkContext.setLogLevel("ERROR")


## Tratamento de Shuffle Spark
spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism)



In [None]:
format = "jdbc"
url = "jdbc:postgresql://localhost:5432/postgres"
dbtable = "aluno"
username = "postgres"
password = "388020"
driver = "org.postgresql.Driver"


Azure Cosmos DB

In [None]:
# In Python
# Loading data from Azure Cosmos DB
# Read configuration
query = "SELECT c.colA, c.coln FROM c WHERE c.origin = 'SEA'"
readConfig = {
"Endpoint" : "https://[ACCOUNT].documents.azure.com:443/",
"Masterkey" : "[MASTER KEY]",
"Database" : "[DATABASE]",
"preferredRegions" : "Central US;East US2","Collection" : "[COLLECTION]",
"SamplingRatio" : "1.0",
"schema_samplesize" : "1000",
"query_pagesize" : "2147483647",
"query_custom" : query
}
# Connect via azure-cosmosdb-spark to create Spark DataFrame
df = (spark
.read
.format("com.microsoft.azure.cosmosdb.spark")
.options(**readConfig)
.load())
# Count the number of flights
df.count()
# Saving data to Azure Cosmos DB
# Write configuration
writeConfig = {
"Endpoint" : "https://[ACCOUNT].documents.azure.com:443/",
"Masterkey" : "[MASTER KEY]",
"Database" : "[DATABASE]",
"Collection" : "[COLLECTION]",
"Upsert" : "true"
}
# Upsert the DataFrame to Azure Cosmos DB
(df.write
.format("com.microsoft.azure.cosmosdb.spark")
.options(**writeConfig)
.save())

MS SQL Server

In [None]:
# In Python
# Configure jdbcUrl
jdbcUrl = "jdbc:sqlserver://[DBSERVER]:1433;database=[DATABASE]"
# Loading data from a JDBC source
jdbcDF = (spark
.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.load())
# Saving data to a JDBC source
(jdbcDF
.write
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "[TABLENAME]")
.option("user", "[USERNAME]")
.option("password", "[PASSWORD]")
.save())

Other External Sources

There are just some of the many external data sources Apache Spark can connect to;
other popular data sources include:

• Apache Cassandra

• Snowflake

• MongoDB

Higher-Order Functions in DataFrames and Spark SQL

Option 1: Explode and Collect

In [None]:
arrayData = [
        ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{})]

df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties'])
df.printSchema()
df.show()

In [None]:
from pyspark.sql.functions import explode
df2 = df.select(df.name,explode(df.knownLanguages))
df2.printSchema()
df2.show()

In [None]:
#-- In SQL
from pyspark.sql.functions import explode,collect_list

spark.sql('''
SELECT id, collect_list(value + 1) AS values
FROM (SELECT id, EXPLODE(values) AS value
FROM dfFromData2) x
GROUP BY id'''
)

While collect_list() returns a list of objects with duplicates, the GROUP BY statement
requires shuffle operations, meaning the order of the re-collected array isn’t
necessarily the same as that of the original array. As values could be any number of
dimensions (a really wide and/or really long array) and we’re doing a GROUP BY, this
approach could be very expensive.

Option 2: User-Defined Function

In [None]:
#-- In SQL
spark.sql('''
SELECT id, collect_list(value + 1) AS values
FROM (SELECT id, EXPLODE(values) AS value
FROM table) x
GROUP BY id
''').show()

In [None]:
spark.sql('''SELECT array_distinct(array(1,2, 3, null, 3));''').show()

In [None]:
spark.sql('''SELECT array_intersect(array(1, 2, 3), array(1,3, 5));''').show()

In [None]:
spark.sql('''SELECT array_union(array(1, 2,3), array(1, 3, 5));''').show()

In [None]:
spark.sql('''SELECT array_except(array(1,2, 3), array(1, 3, 5));''').show()

In [None]:
spark.sql('''SELECT array_join(array('hello','world'), ' ');''').show()

In [None]:
spark.sql('''SELECT array_max(array(1, 20,null, 3));''').show()

In [None]:
spark.sql('''SELECT array_min(array(1, 20,null, 3));''').show()

In [None]:
spark.sql('''SELECT array_position(array(3,2, 1), 1);''').show()

In [None]:
spark.sql('''SELECT array_remove(array(1,2, 3, null, 3), 3);''').show()

In [None]:
spark.sql('''SELECT arrays_overlap(array(1,2, 3), array(3, 4, 5));''').show()

In [None]:
spark.sql('''SELECT array_sort(array('b','d', null, 'c', 'a'));''').show()

In [None]:
spark.sql('''SELECT concat(array(1, 2, 3),array(4, 5), array(6));''').show()

In [None]:
spark.sql('''SELECT flatten(array(array(1,2), array(3, 4)));''').show()

In [None]:
spark.sql('''SELECT array_repeat('123', 3);''').show()

In [None]:
spark.sql('''SELECT reverse(array(2, 1, 4,3));''').show()

In [None]:
spark.sql('''SELECT sequence(to_date('2018-01-01'),to_date('2018-05-01'), interval 1 month);''').show(truncate=False)

In [None]:
spark.sql('''SELECT shuffle(array(1, 20,null, 3));''').show()

In [None]:
spark.sql('''SELECT slice(array(1, 2, 3,4), -3, 3);''').show()

In [None]:
spark.sql('''SELECT arrays_zip(array(1, 2),array(2, 3), array(3, 4));''').show(truncate=False)

In [None]:
spark.sql('''SELECT element_at(array(1, 2,3), 2);''').show()

In [None]:
spark.sql('''SELECT cardinality(array('x','b','d', 'c', 'a'));''').show()

In [None]:
spark.sql('''SELECT map_from_arrays(array(1.0,3.0,5.0), array('2', '4','5'));''').show(truncate=False)

In [None]:
spark.sql('''SELECT map_from_entries(array(struct(1,'a'), struct(2, 'b')));''').show(truncate=False)

In [None]:
spark.sql('''SELECT element_at(map(1, 'a',2, 'b'), 2);''').show()

In [None]:
spark.sql('''SELECT cardinality(map(1, 'a',2, 'b',3,'z'));''').show()

## HIGH ORDER FUNCTIONS

transform()

In [None]:
#In Python
from pyspark.sql.types import *
schema = StructType([StructField("celsius", ArrayType(IntegerType()))])
t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]
t_c = spark.createDataFrame(t_list, schema)
t_c.createOrReplaceTempView("tC")
# Show the DataFrame
t_c.show()


In [None]:
# In Scala/Python
# Calculate Fahrenheit from Celsius for an array of temperatures
spark.sql("""
SELECT celsius,
transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit
FROM tC
""").show()

filter()

In [None]:
#Filter temperatures > 38C for array of temperatures
spark.sql("""
SELECT celsius,
filter(celsius, t -> t > 38) as high
FROM tC
""").show(truncate=False)

exists()

In [None]:
#Is there a temperature of 38C in the array of temperatures
spark.sql("""
SELECT celsius,
exists(celsius, t -> t = 38) as threshold
FROM tC
""").show(truncate=False)

reduce()

In [None]:
#Calculate average temperature and convert to F
spark.sql("""
SELECT celsius,
reduce(
celsius,
0,
(t, acc) -> t + acc,
acc -> (acc div size(celsius) * 9 div 5) + 32
) as avgFahrenheit
FROM tC
""").show()

## Common DataFrames and Spark SQL Operations

Part of the power of Spark SQL comes from the wide range of DataFrame operations
(also known as untyped Dataset operations) it supports. The list of operations is quite
extensive and includes:
• Aggregate functions
• Collection functions
• Datetime functions
• Math functions
• Miscellaneous functions
• Non-aggregate functions
• Sorting functions
• String functions
• UDF functions
• Window functions

In [None]:
# Set file paths
from pyspark.sql.functions import expr
tripdelaysFilePath = "C:/Lenzi/Spark/spark developer preparation/data/flights/departuredelays.csv"
airportsnaFilePath = "C:/Lenzi/Spark/spark developer preparation/data/flights/airport-codes-na.txt"
# Obtain airports data set
airportsna = (spark.read
.format("csv")
.options(header="true", inferSchema="true", sep="\t")
.load(airportsnaFilePath))
airportsna.createOrReplaceTempView("airports_na")
# Obtain departure delays data set
departureDelays = (spark.read
.format("csv")
.options(header="true")
.load(tripdelaysFilePath))
departureDelays = (departureDelays
.withColumn("delay", expr("CAST(delay as INT) as delay"))
.withColumn("distance", expr("CAST(distance as INT) as distance")))
departureDelays.createOrReplaceTempView("departureDelays")
# Create temporary small table
foo = (departureDelays
.filter(expr("""origin == 'SEA' and destination == 'SFO' and
date like '01010%' and delay > 0""")))
foo.createOrReplaceTempView("foo")


In [None]:
spark.sql("SELECT * FROM airports_na LIMIT 10").show()

In [None]:
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

In [None]:
spark.sql("SELECT * FROM foo").show()

Unions

In [None]:
# In Python
# Union two tables
bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")

In [None]:
# Show the union (filtering for SEA and SFO in a specific time range)
bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
AND date LIKE '01010%' AND delay > 0""")).show()

In [None]:
#now we have duplicates
spark.sql("""
SELECT *
FROM bar
WHERE origin = 'SEA'
AND destination = 'SFO'
AND date LIKE '01010%'
AND delay > 0
""").show()

Joins

In [None]:
foo.join(
airportsna,
airportsna.IATA == foo.origin
).select("City", "State", "date", "delay", "distance", "destination").show()

In [None]:
spark.sql("""
SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination
FROM foo f
JOIN airports_na a
ON a.IATA = f.origin
""").show()

Windowing

In [None]:
spark.sql('''
DROP TABLE IF EXISTS departureDelaysWindow;
''')

In [None]:
spark.conf.set("spark.sql.catalogImplementation", "hive")

In [None]:

df=spark.sql('''
SELECT origin, destination, SUM(delay) AS TotalDelays
FROM departureDelays
WHERE origin IN ('SEA', 'SFO', 'JFK')
AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
GROUP BY origin, destination; 
''')

In [None]:
df.write.mode("overwrite").saveAsTable("departureDelaysWindow")

In [None]:
spark.sql('''SELECT * FROM departureDelaysWindow''').show()

In [None]:
spark.sql('''SELECT origin, destination, SUM(TotalDelays) AS TotalDelays
FROM departureDelaysWindow
WHERE origin IN ('SEA', 'SFO', 'JFK')
GROUP BY origin, destination
ORDER BY SUM(TotalDelays) DESC
''').show()

In [71]:
spark.sql("""
SELECT origin, destination, TotalDelays, rank
FROM (
SELECT origin, destination, TotalDelays, dense_rank()
OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank
FROM departureDelaysWindow
) t
WHERE rank <= 3
""").show()

+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
|   JFK|        LAX|      35755|   1|
|   JFK|        SFO|      35619|   2|
|   JFK|        ATL|      12141|   3|
|   SEA|        SFO|      22293|   1|
|   SEA|        DEN|      13645|   2|
|   SEA|        ORD|      10041|   3|
|   SFO|        LAX|      40798|   1|
|   SFO|        ORD|      27412|   2|
|   SFO|        JFK|      24100|   3|
+------+-----------+-----------+----+



# Modifications

Adding new columns

In [75]:
from pyspark.sql.functions import expr
foo2 = (foo.withColumn(
"status",
expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")
))
foo2.show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+



Dropping columns

In [74]:
# In Python
foo3 = foo2.drop("delay")
foo3.show()

+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+



Renaming columns

In [77]:
# In Python
foo4 = foo2.withColumnRenamed("status", "flight_status")
foo4.show()

+--------+-----+--------+------+-----------+-------------+
|    date|delay|distance|origin|destination|flight_status|
+--------+-----+--------+------+-----------+-------------+
|01010710|   31|     590|   SEA|        SFO|      Delayed|
|01010955|  104|     590|   SEA|        SFO|      Delayed|
|01010730|    5|     590|   SEA|        SFO|      On-time|
+--------+-----+--------+------+-----------+-------------+



Pivoting

In [78]:
#-- In SQL
spark.sql('''SELECT * FROM (
SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
FROM departureDelays WHERE origin = 'SEA'
)
PIVOT (
CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay
FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination
''').show()

+-----------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
+-----------+------------+------------+------------+------------+
|        ABQ|       19.86|         316|       11.42|          69|
|        ANC|        4.44|         149|        7.90|         141|
|        ATL|       11.98|         397|        7.73|         145|
|        AUS|        3.48|          50|       -0.21|          18|
|        BOS|        7.84|         110|       14.58|         152|
|        BUR|       -2.03|          56|       -1.89|          78|
|        CLE|       16.00|          27|        null|        null|
|        CLT|        2.53|          41|       12.96|         228|
|        COS|        5.32|          82|       12.18|         203|
|        CVG|       -0.50|           4|        null|        null|
|        DCA|       -1.15|          50|        0.07|          34|
|        DEN|       13.13|         425|       12.95|         625|
|        D

Stop spark

In [79]:
spark.stop

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x00000292371E5760>>