### UDFs

In [31]:
import findspark
findspark.init()
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

In [32]:
# Create cubed functions
def cubed(s):
    return s*s*s

In [33]:
spark = SparkSession.builder\
        .master("local[*]")\
        .appName('Capitulo5')\
        .getOrCreate()

In [4]:
 #Register UDF
spark.udf.register('cubed', cubed, LongType())

# Generte temprary view
spark.range(1,9).createOrReplaceTempView('udf_test')

**Spark SQL** para ejecutar cubed()

In [5]:
spark.sql('SELECT id, cubed(id) AS id_cubed FROM udf_test').show()

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+



#### Evaluation order cheking in Spark Sql

* Spark SQL no garantiza el orden de la evaluación de las subexpresiones. En este ejemplo no garantiza que la clausula `IS NOT NULL` se ejecute antes que `strlen(s) > 1` 

In [None]:
spark.sql ('SELECT s FROM test1 WHERE s IS NOT NULL AND strlen(s) > 1')

#### UDF Pandas

In [6]:
# !pip install PyArrow

In [7]:
import pandas as pd

# Import various pysparkSQL functions including pandas_udf
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# declared the cubed function
def cubed(a: pd.Series) -> pd.Series:
    return a * a * a

# Create the pandas UDF for the cubed function

cubed_udf = pandas_udf(cubed, returnType=LongType())

In [8]:
x = pd.Series([1, 2, 3]) # Create a pd Series

# The function for a pandas_udf executed with local Pandas data
print(cubed(x))

0     1
1     8
2    27
dtype: int64


Cambiar a DF de Spark

In [9]:
# Create a Spark DataFRame, 'spark' is an existing SparkSession
df = spark.range(1, 4)

# Execute functions as a Spark vectorized UDF
df.select('id', cubed_udf(col('id'))).show()

+---+---------+
| id|cubed(id)|
+---+---------+
|  1|        1|
|  2|        8|
|  3|       27|
+---+---------+



### PostgreSQL

In [None]:


# Read option 1: Loading data from a JDBC source using load method
jdbcDF1 = (spark.read.format("jdbc")
          .option('url', 'jdbc:postgresql://[DBSERVER]')
          .option('dbtable', '[SCHEMA].[TABLENAME]')
          .option('user', '[USERNAME]')
          .option('password', '[PASSWORD]')
          .load())

# Read Option 2: Loading data from JDBC source using jdbc method
jdbcDF2 = (spark.read.jdbc("jdbc:postgresql://[DBSERVER]", "[SCHEMA].[TABLENAME]",
                          properties={"user":"[USERNAME]", "password": "[PASSWORD]"}))

# Write Option 1: Saving data to a JDBC source using save method
(jdbcDF1.write
    .format('jdbc')
    .option('url', 'jdbc:postgresql://[DBSERVER]')
    .option('dbtable', '[SCHEMA].[TABLENAME]')
    .option('user', '[USERNAME]')
    .option('password', '[PASSWORD]')
    .save())

# Write Option 2: Saving data to a JDBC source using jdbc method
(jdbcDF2.write.jdbc('jdbc:postgresql:[DEBESERVER]', '[SCHEMA].[TABLENAME]',
                   properties={'user': '{USERNAME}', 'password': '[PASSWORD]'}))

### MySQL

In [5]:
passKey = '************'

spark = SparkSession.builder.appName('Capitulo05').getOrCreate()
employee_df = (spark.read.format('jdbc')
            .option('url', 'jdbc:mysql://localhost:3306/employees')
            .option('driver', 'com.mysql.cj.jdbc.Driver')
            .option('dbtable', 'employees')
            .option('user', 'root')
            .option('password', passKey)
            .load())

employee_df.show(10)

+------+----------+----------+---------+------+----------+
|emp_no|birth_date|first_name|last_name|gender| hire_date|
+------+----------+----------+---------+------+----------+
| 10001|1953-09-02|    Georgi|  Facello|     M|1986-06-26|
| 10002|1964-06-02|   Bezalel|   Simmel|     F|1985-11-21|
| 10003|1959-12-03|     Parto|  Bamford|     M|1986-08-28|
| 10004|1954-05-01| Chirstian|  Koblick|     M|1986-12-01|
| 10005|1955-01-21|   Kyoichi| Maliniak|     M|1989-09-12|
| 10006|1953-04-20|    Anneke|  Preusig|     F|1989-06-02|
| 10007|1957-05-23|   Tzvetan|Zielinski|     F|1989-02-10|
| 10008|1958-02-19|    Saniya| Kalloufi|     M|1994-09-15|
| 10009|1952-04-19|    Sumant|     Peac|     F|1985-02-18|
| 10010|1963-06-01| Duangkaew| Piveteau|     F|1989-08-24|
+------+----------+----------+---------+------+----------+
only showing top 10 rows



#### HIGHER-ORDER FUNCTIONS

In [6]:
from pyspark.sql.types import *
schema = StructType([StructField('celsius', ArrayType(IntegerType()))])

t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]

t_c = spark.createDataFrame(t_list, schema)
t_c.createOrReplaceTempView('tC')
# Show the DF
t_c.show()

+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



In [8]:
#Transform()
# Calculate Farenheit from Celsius for an array of temperatures
spark.sql("""SELECT celsius,
transform(celsius, t -> ((t*9) div 5) + 32) as Farenheit
FROM tC""").show()

+--------------------+--------------------+
|             celsius|           Farenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+



In [11]:
# Filter()--------- Filter temperatures > 38C for array temperatures
spark.sql("""
SELECT celsius, filter(celsius, t -> t > 38) as high
FROM tC""").show()

+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+



In [13]:
# exists()----- Is there a temperature of 38C in the array of temperature
spark.sql("""
SELECT celsius,
exists(celsius, t -> t = 38) as threshold
FROM tC""").show()

+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+



In [16]:
# reduce()---- Calculate average temperature and convert to F
spark.sql("""SELECT celsius,
reduce(celsius,
0,
(t, acc) -> t + acc,
acc -> (acc div size(celsius) * 9 div 5) + 32
) as avgFarenheit FROM tC""").show()

AnalysisException: Undefined function: 'reduce'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 2 pos 0

In [44]:
# Files path
tripdelaysFilePath = 'data/departuredelays.csv'
airportsnaFilePath = 'data/airportCodesNa.txt'

# Obtain airports data set
airportsna = (spark.read
                .format("csv")
                .options(header="true", inferSchema="true", sep="\t")
                .load(airportsnaFilePath))


airportsna.createOrReplaceTempView('airport_na')

# Obtain departure delays data set
departureDelays =(spark.read
                 .format('csv')
                 .options(header='true')
                 .load(tripdelaysFilePath))

departureDelays = (departureDelays.withColumn('delay', expr('CAST(delay as INT) as delay'))
                  .withColumn('distance', expr('CAST(distance as INT) as distance')))

departureDelays.createOrReplaceTempView('departureDelays')

# Create temporary small table
foo = (departureDelays
      .filter(expr("""origin == 'SEA' and destination =='SFO' and
      date like '01010%' and delay > 0""")))
foo.createOrReplaceTempView('foo')

In [46]:
spark.sql("SELECT * FROM airport_na LiMIT 10").show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



In [47]:
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



In [48]:
spark.sql("SELECT * FROM foo").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



### UNIONS

In [49]:
# Union two tables
bar = departureDelays.union(foo)
bar.createOrReplaceTempView('bar')

# Show the union filtering for SEA and SFO in a specific time range
bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
AND date LIKE '01010%' AND delay > 0""")).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



### JOINS

In [52]:
# Join departure delays data (foo) with airport info
foo.join(airportsna,
        airportsna.IATA == foo.origin
        ).select('City', 'State', 'date', 'delay', 'distance', 'destination').show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



### Windowing