# Spark

## Instead of separate SparkContext, HiveContext, and SQLContext objects, everything is now encapsulated in a single SparkSession (Spark 2.0+).

https://medium.com/@achilleus/spark-session-10d0d66d1d24

https://annefou.github.io/pyspark/04-pyspark_sql/

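In [1]:
from pyspark.sql import SparkSession

In [2]:
# SparkSession is the single entry point; it wraps the SparkContext and SQLContext functionality
spark = SparkSession.builder.master("local").appName("Spark SQL").getOrCreate()
sc = spark.sparkContext  # the underlying SparkContext is still available if needed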

## Read a JSON file

In [3]:
df = spark.read.format("json").load("/Users/manikhossain/Downloads/2015-summary.json")
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
...

## Read a CSV file

In [5]:
df = spark.read.format("csv").option("header", "true").load("/Users/manikhossain/Downloads/Practice_BigData/Spark-The-Definitive-Guide-master/data/retail-data/by-day/2010-12-01.csv")
df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
...

In [6]:
from pyspark.sql.functions import instr, col

In [7]:
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1 

df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      null|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+



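## Equivalent SQL for the PySpark code above:

This assumes df has been registered as a temporary view with df.createOrReplaceTempView("dfTable").

SELECT * FROM dfTable WHERE StockCode IN ("DOT") AND (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)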

In [8]:
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1 

df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive = true")\
.select("unitPrice", "isExpensive").show()

+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+



## Equivalent SQL for the PySpark code above:

SELECT UnitPrice,
       (StockCode = 'DOT' AND (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) AS isExpensive
FROM dfTable
WHERE (StockCode = 'DOT' AND (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))
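To make the combined predicate concrete, here is a plain-Python model of it (not Spark code). It assumes that instr returns a 1-based position, or 0 when the substring is absent, which is why the filters test ">= 1". The rows are hypothetical and are shaped after the outputs above.

```python
def instr(s: str, sub: str) -> int:
    """Model of Spark's instr: 1-based position of sub in s, or 0 if absent."""
    return s.find(sub) + 1


def is_expensive(stock_code: str, unit_price: float, description: str) -> bool:
    """Mirror of DOTCodeFilter & (priceFilter | descripFilter)."""
    dot_code = stock_code == "DOT"
    price_ok = unit_price > 600
    has_postage = instr(description, "POSTAGE") >= 1
    return dot_code and (price_ok or has_postage)


# hypothetical rows shaped like the data above: (StockCode, UnitPrice, Description)
rows = [
    ("DOT", 569.77, "DOTCOM POSTAGE"),
    ("DOT", 607.49, "DOTCOM POSTAGE"),
    ("85123A", 2.55, "WHITE HANGING HEART T-LIGHT HOLDER"),
]
flagged = [row for row in rows if is_expensive(*row)]
print(flagged)
```

The first DOT row is kept only because of the POSTAGE match, since 569.77 is below 600.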

In [9]:
from pyspark.sql.functions import expr, pow

In [10]:
df.withColumn("isExpensive", expr("NOT UnitPrice <= 250"))\
.where("isExpensive")\
.select("Description", "UnitPrice").show(5)

+--------------+---------+
|   Description|UnitPrice|
+--------------+---------+
|DOTCOM POSTAGE|   569.77|
|DOTCOM POSTAGE|   607.49|
+--------------+---------+



In [11]:
from pyspark.sql.functions import col, expr, pow, round  # note: pow and round shadow the Python built-ins
calcPow = pow(col("Quantity") * col("UnitPrice"), 2) + 5  # already a Column, so no lit() is needed
df.select(expr("CustomerId"), round(calcPow).alias("roundedQuantity")).show(3)

df.selectExpr("CustomerId", "(pow(Quantity * UnitPrice, 2) + 5) as realQuantity").show(3)

+----------+---------------+
|CustomerId|roundedQuantity|
+----------+---------------+
|   17850.0|          239.0|
|   17850.0|          419.0|
|   17850.0|          489.0|
+----------+---------------+
only showing top 3 rows

+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
|   17850.0|             489.0|
+----------+------------------+
only showing top 3 rows



## Equivalent SQL for the PySpark code above:
SELECT CustomerId, (POWER((Quantity * UnitPrice), 2.0) + 5) AS realQuantity FROM dfTable

In [12]:
from pyspark.sql.functions import lit, round, bround, count, mean, stddev_pop, min, max
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
|          3.0|           2.0|
+-------------+--------------+
only showing top 2 rows
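round uses HALF_UP rounding, while bround uses HALF_EVEN (banker's) rounding. That is why 2.5 becomes 3.0 with round and 2.0 with bround. The sketch below shows the two modes with Python's decimal module. It is plain Python, and the function names are hypothetical.

```python
from decimal import Decimal, ROUND_HALF_EVEN, ROUND_HALF_UP


def spark_round(value: str, scale: int = 0) -> Decimal:
    """HALF_UP rounding (ties away from zero), the mode Spark's round() uses."""
    return Decimal(value).quantize(Decimal(1).scaleb(-scale), rounding=ROUND_HALF_UP)


def spark_bround(value: str, scale: int = 0) -> Decimal:
    """HALF_EVEN rounding (ties to the even neighbour), the mode Spark's bround() uses."""
    return Decimal(value).quantize(Decimal(1).scaleb(-scale), rounding=ROUND_HALF_EVEN)


for v in ["2.5", "3.5", "-2.5"]:
    print(v, spark_round(v), spark_bround(v))
```

Strings are passed in rather than floats so that the tie, for example 2.5, is represented exactly.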



In [13]:
# freqItems returns frequently occurring values for each column (approximate, may include false positives; default support is 1%)
df.stat.freqItems(["StockCode", "Quantity"]).show()

+--------------------+--------------------+
| StockCode_freqItems|  Quantity_freqItems|
+--------------------+--------------------+
|[90214E, 20728, 2...|[-4, 600, 30, 2, ...|
+--------------------+--------------------+



In [14]:
df.describe().show()


+-------+-----------------+------------------+--------------------+------------------+-------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|        InvoiceDate|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+-------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|               3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                null| 8.627413127413128|               null| 4.151946589446603|15661.388719512195|          null|
| stddev|72.89447869788873|17407.897548583845|                null|26.371821677029203|               null|15.638659854603892|1854.4496996893627|          null|
|    min|           536365|...


In [16]:
from pyspark.sql.functions import monotonically_increasing_id 

df.select(monotonically_increasing_id()).show(2)

+-----------------------------+
|monotonically_increasing_id()|
+-----------------------------+
|                            0|
|                            1|
+-----------------------------+
only showing top 2 rows
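These IDs are increasing and unique, but they are not guaranteed to be consecutive. The PySpark documentation describes the current layout: the partition ID goes in the upper 31 bits and the record number within the partition goes in the lower 33 bits. Below is a plain-Python sketch of that layout. The helper name is hypothetical.

```python
def monotonic_id(partition_id: int, row_in_partition: int) -> int:
    """Model of the documented layout: partition ID in the upper 31 bits,
    record number within the partition in the lower 33 bits."""
    if not 0 <= row_in_partition < 2 ** 33:
        raise ValueError("record number must fit in 33 bits")
    return (partition_id << 33) + row_in_partition


print(monotonic_id(0, 0), monotonic_id(0, 1))  # first two rows of partition 0
print(monotonic_id(1, 0))  # first row of partition 1 jumps to 2**33
```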



In [17]:
from pyspark.sql.functions import initcap, lower, upper

df.select(initcap(col("Description"))).show(2)

df.select(col("Description"), lower(col("Description")), upper(col("Description")),\
initcap(col("Description"))).show(2)

# SELECT initcap(Description) FROM dfTable

+--------------------+
|initcap(Description)|
+--------------------+
|White Hanging Hea...|
| White Metal Lantern|
+--------------------+
only showing top 2 rows

+--------------------+--------------------+--------------------+--------------------+
|         Description|  lower(Description)|  upper(Description)|initcap(Description)|
+--------------------+--------------------+--------------------+--------------------+
|WHITE HANGING HEA...|white hanging hea...|WHITE HANGING HEA...|White Hanging Hea...|
| WHITE METAL LANTERN| white metal lantern| WHITE METAL LANTERN| White Metal Lantern|
+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows
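initcap lowercases the string and then capitalizes the first letter of each space-separated word. Below is a plain-Python approximation; the helper name is hypothetical. Python's built-in str.title() is not equivalent, because it also capitalizes letters that follow apostrophes or hyphens.

```python
def initcap(s: str) -> str:
    """Approximate Spark's initcap: lowercase everything, then uppercase
    the first character after each space (and at the start)."""
    return " ".join(word[:1].upper() + word[1:] for word in s.lower().split(" "))


print(initcap("WHITE HANGING HEART T-LIGHT HOLDER"))
print(initcap("POPPY'S PLAYHOUSE"), "vs", "POPPY'S PLAYHOUSE".title())
```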



In [18]:
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim

value = "      Hello    "

df.select(ltrim(lit(value)).alias("Ltrim"), rtrim(lit(value)).alias("Rtrim"), trim(lit(value)).alias("trim")).show(2)

+---------+-----------+-----+
|    Ltrim|      Rtrim| trim|
+---------+-----------+-----+
|Hello    |      Hello|Hello|
|Hello    |      Hello|Hello|
+---------+-----------+-----+
only showing top 2 rows



In [19]:
val = "HELLO"

df.select(lpad(lit(val), 10, " ").alias("lp"), rpad(lit(val), 3, " ").alias("rp")).show(2)

+----------+---+
|        lp| rp|
+----------+---+
|     HELLO|HEL|
|     HELLO|HEL|
+----------+---+
only showing top 2 rows
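The rp column shows that lpad and rpad also truncate. When the target length is shorter than the string, the result is cut to that many leading characters. Below is a plain-Python model of both functions; the helper names are hypothetical. It assumes a non-empty pad string and a non-negative length.

```python
def lpad(s: str, length: int, pad: str = " ") -> str:
    """Left-pad s to length with pad; truncate to length if s is longer."""
    if len(s) >= length:
        return s[:length]
    missing = length - len(s)
    return (pad * missing)[:missing] + s


def rpad(s: str, length: int, pad: str = " ") -> str:
    """Right-pad s to length with pad; truncate to length if s is longer."""
    if len(s) >= length:
        return s[:length]
    missing = length - len(s)
    return s + (pad * missing)[:missing]


print(repr(lpad("HELLO", 10)), repr(rpad("HELLO", 3)))
```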



In [20]:
from pyspark.sql.functions import regexp_replace, regexp_extract

regex_string = "BLACK|WHITE|RED|GREEN|BLUE|METAL" 

df.select(regexp_replace(col("Description"), regex_string, "COLOR").alias("color_clean"), col("Description"))\
.show(2)

#SELECT regexp_replace(Description, 'BLACK|WHITE|RED|GREEN|BLUE|METAL', 'COLOR') as color_clean, Description FROM dfTable

extract_str = "(BLACK|WHITE|RED|GREEN|BLUE)"
df.select(regexp_extract(col("Description"), extract_str, 1).alias("color_clean"), col("Description")).show(2)


+--------------------+--------------------+
|         color_clean|         Description|
+--------------------+--------------------+
|COLOR HANGING HEA...|WHITE HANGING HEA...|
| COLOR COLOR LANTERN| WHITE METAL LANTERN|
+--------------------+--------------------+
only showing top 2 rows

+-----------+--------------------+
|color_clean|         Description|
+-----------+--------------------+
|      WHITE|WHITE HANGING HEA...|
|      WHITE| WHITE METAL LANTERN|
+-----------+--------------------+
only showing top 2 rows
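Spark's regex functions use Java regular expressions. For simple alternations like these, Python's re module matches the same way, so both calls can be modelled in plain Python. This is a sketch, not Spark code. Replacement-string syntax differs between the two: Java uses $1 for group references and Python uses \1. Also, regexp_extract returns an empty string rather than null when nothing matches.

```python
import re

regex_string = "BLACK|WHITE|RED|GREEN|BLUE|METAL"
extract_str = "(BLACK|WHITE|RED|GREEN|BLUE)"


def regexp_replace(s: str, pattern: str, replacement: str) -> str:
    """Replace every match of pattern, like Spark's regexp_replace."""
    return re.sub(pattern, replacement, s)


def regexp_extract(s: str, pattern: str, idx: int) -> str:
    """Return group idx of the first match, or '' when nothing matches."""
    match = re.search(pattern, s)
    if match is None or match.group(idx) is None:
        return ""
    return match.group(idx)


print(regexp_replace("WHITE METAL LANTERN", regex_string, "COLOR"))
print(regexp_extract("WHITE METAL LANTERN", extract_str, 1))
```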



In [21]:
from pyspark.sql.functions import instr

containsBlack = instr(col("Description"), "BLACK") >= 1 
containsWhite = instr(col("Description"), "WHITE") >= 1

df.withColumn("hasSimpleColor", containsBlack | containsWhite)\
.where("hasSimpleColor")\
.select("Description", "hasSimpleColor").show(3, False)

# SELECT Description FROM dfTable WHERE instr(Description, 'BLACK') >= 1 OR instr(Description, 'WHITE') >= 1

+----------------------------------+--------------+
|Description                       |hasSimpleColor|
+----------------------------------+--------------+
|WHITE HANGING HEART T-LIGHT HOLDER|true          |
|WHITE METAL LANTERN               |true          |
|RED WOOLLY HOTTIE WHITE HEART.    |true          |
+----------------------------------+--------------+
only showing top 3 rows



# varargs

When we convert a list of values into a set of arguments and pass them to a function, we use a language feature called varargs. With it, we can unpack an array of arbitrary length and pass its elements to a function as separate arguments. Combined with select, this lets us create an arbitrary number of columns dynamically:

In [22]:
from pyspark.sql.functions import expr, locate 

simpleColors = ["black", "white", "red", "green", "blue"] 

def color_locator(column, color_string):
    # locate returns the 1-based position of the substring (0 if absent); casting to boolean maps 0 to false
    return locate(color_string.upper(), column).cast("boolean").alias("is_" + color_string)

selectedColumns = [color_locator(df.Description, c) for c in simpleColors]
selectedColumns.append(expr("*"))  # add all the original columns back as well

df.select(*selectedColumns).where(expr("is_white OR is_red")).select("Description", "is_white", "is_red").show(3, False)

+----------------------------------+--------+------+
|Description                       |is_white|is_red|
+----------------------------------+--------+------+
|WHITE HANGING HEART T-LIGHT HOLDER|true    |false |
|WHITE METAL LANTERN               |true    |false |
|RED WOOLLY HOTTIE WHITE HEART.    |true    |true  |
+----------------------------------+--------+------+
only showing top 3 rows
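The * in df.select(*selectedColumns) is ordinary Python argument unpacking. Below is a minimal illustration that uses a hypothetical select stand-in, not the DataFrame method. PySpark's select also happens to accept a single list, but unpacking is the general varargs pattern.

```python
def select(*columns):
    """Hypothetical stand-in for DataFrame.select: returns the columns it received."""
    return list(columns)


simpleColors = ["black", "white", "red", "green", "blue"]
# one entry per color, built dynamically like color_locator's aliases
selectedColumns = ["is_" + c for c in simpleColors]
selectedColumns.append("*")

unpacked = select(*selectedColumns)  # six separate arguments
packed = select(selectedColumns)     # a single argument: the whole list
print(len(unpacked), len(packed))
```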



# Dates and Timestamps

At the end of the day, Spark works with Java dates and timestamps and therefore conforms to those standards. Let’s begin with the basics and get the current date and the current timestamp:

In [23]:
from pyspark.sql.functions import current_date, current_timestamp 

dateDF = spark.range(10)\
.withColumn("today", current_date())\
.withColumn("now", current_timestamp()) 

dateDF.createOrReplaceTempView("dateTable")
dateDF.show(2)
dateDF.printSchema()


+---+----------+--------------------+
| id|     today|                 now|
+---+----------+--------------------+
|  0|2021-01-02|2021-01-02 17:43:...|
|  1|2021-01-02|2021-01-02 17:43:...|
+---+----------+--------------------+
only showing top 2 rows

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [24]:
from pyspark.sql.functions import date_add, date_sub 


dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)

# SELECT date_sub(today, 5), date_add(today, 5) FROM dateTable

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2020-12-28|        2021-01-07|
+------------------+------------------+
only showing top 1 row
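date_sub and date_add shift a date by whole days. Below is the same arithmetic with Python's datetime module. It is plain Python, and it assumes today is 2021-01-02, the value current_date() had in the run above.

```python
from datetime import date, timedelta

today = date(2021, 1, 2)  # value of current_date() in the output above


def date_add(d: date, days: int) -> date:
    """Shift d forward by whole days, like Spark's date_add."""
    return d + timedelta(days=days)


def date_sub(d: date, days: int) -> date:
    """Shift d backward by whole days, like Spark's date_sub."""
    return d - timedelta(days=days)


print(date_sub(today, 5), date_add(today, 5))
```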



In [25]:
from pyspark.sql.functions import datediff, months_between, to_date 

dateDF.withColumn("week_ago", date_sub(col("today"), 7))\
.select(datediff(col("week_ago"), col("today")).alias("DateDiff")).show(1)


dateDF.select(to_date(lit("2016-01-01")).alias("start"), to_date(lit("2017-05-22")).alias("end"))\
.select(months_between(col("start"), col("end"))).show(1)

# SELECT to_date('2016-01-01'), months_between('2016-01-01', '2017-01-01'), datediff('2016-01-01', '2017-01-01')
# FROM dateTable

+--------+
|DateDiff|
+--------+
|      -7|
+--------+
only showing top 1 row

+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
|                    -16.67741935|
+--------------------------------+
only showing top 1 row
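datediff(end, start) counts the days from start to end. Comparing the week-ago date against today therefore gives -7.

months_between is fractional:
- If both dates fall on the same day of the month, or both on the last day of their months, the result is a whole number.
- Otherwise the day difference is divided by 31 and added to the month difference, rounded to 8 decimal places by default.

Below is a simplified plain-Python model for dates without a time component; the helper names are hypothetical. Python's round is used here, so tie-breaking may differ from Spark's in edge cases.

```python
import calendar
from datetime import date


def datediff(end: date, start: date) -> int:
    """Days from start to end, like Spark's datediff(end, start)."""
    return (end - start).days


def _is_last_day(d: date) -> bool:
    return d.day == calendar.monthrange(d.year, d.month)[1]


def months_between(d1: date, d2: date, round_off: bool = True) -> float:
    """Simplified months_between for plain dates (no time of day)."""
    month_diff = (d1.year - d2.year) * 12 + (d1.month - d2.month)
    if d1.day == d2.day or (_is_last_day(d1) and _is_last_day(d2)):
        return float(month_diff)
    diff = month_diff + (d1.day - d2.day) / 31
    return round(diff, 8) if round_off else diff


print(datediff(date(2020, 12, 26), date(2021, 1, 2)))
print(months_between(date(2016, 1, 1), date(2017, 5, 22)))
```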



# to_date

The to_date function converts a string to a date, optionally using a specified format.

In [26]:
from pyspark.sql.functions import to_date, lit, to_timestamp

spark.range(5).withColumn("Date", lit("2017-01-01")).select(to_date(col("Date"))).show(1)  # show() returns None, so there is nothing to assign

# "2016-20-12" is in year-day-month order rather than the default year-month-day;
# Spark cannot parse it and silently returns null instead of raising an error
dateDF.select(to_date(lit("2016-20-12")),to_date(lit("2017-12-11"))).show(1)

dateFormat = "yyyy-dd-MM" # specify input date format
dateDF.select(to_date(lit("2016-20-12"), dateFormat).alias("date"),\
to_timestamp(lit("2017-12-11"), dateFormat).alias("to_timestamp")).show(1)

# SELECT cast(to_date("2017-01-01", "yyyy-dd-MM") as timestamp)

+---------------+
|to_date(`Date`)|
+---------------+
|     2017-01-01|
+---------------+
only showing top 1 row

+---------------------+---------------------+
|to_date('2016-20-12')|to_date('2017-12-11')|
+---------------------+---------------------+
|                 null|           2017-12-11|
+---------------------+---------------------+
only showing top 1 row

+----------+-------------------+
|      date|       to_timestamp|
+----------+-------------------+
|2016-12-20|2017-11-12 00:00:00|
+----------+-------------------+
only showing top 1 row
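The null in the first output shows that Spark returns null rather than raising an error when it cannot parse a string. Below is a plain-Python model of that "null on failure" behavior; the helper name is hypothetical. Python's strptime codes (%Y-%d-%m) are not the same as Spark's datetime pattern letters (yyyy-dd-MM).

```python
from datetime import date, datetime
from typing import Optional


def to_date_or_none(value: str, fmt: str = "%Y-%m-%d") -> Optional[date]:
    """Parse value with fmt; return None (Spark: null) instead of raising on failure."""
    try:
        return datetime.strptime(value, fmt).date()
    except ValueError:
        return None


print(to_date_or_none("2016-20-12"))              # month 20 does not exist
print(to_date_or_none("2016-20-12", "%Y-%d-%m"))  # parsed as year-day-month
```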



In [47]:
dateFormat = "yyyy-dd-MM"

cleanDateDF = spark.range(1).select(
to_date(lit("2017-12-11"), dateFormat).alias("date"),
to_date(lit("2017-20-12"), dateFormat).alias("date2")) 
cleanDateDF.createOrReplaceTempView("dateTable2")

spark.table("dateTable2").count()  # count is a method; without () it only returns the bound method

1

In [38]:
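# date2 is 2017-12-20, which is not before 2017-12-12, so no rows match
cleanDateDF.filter(col("date2") < lit("2017-12-12")).show()
# the embedded single quotes make "'2017-12-12'" unparseable as a date, so the comparison is
# against null and no rows are returned; lit("2017-12-12") compares against the actual date
cleanDateDF.filter(col("date2") > "'2017-12-12'").show()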

+----+-----+
|date|date2|
+----+-----+
+----+-----+

+----+-----+
|date|date2|
+----+-----+
+----+-----+



In [32]:
from pyspark.sql.functions import coalesce


# df.show(1000)
df.select(coalesce(col("Description"), col("CustomerId"))).show(2)

+---------------------------------+
|coalesce(Description, CustomerId)|
+---------------------------------+
|             WHITE HANGING HEA...|
|              WHITE METAL LANTERN|
+---------------------------------+
only showing top 2 rows



In [34]:
df.na.drop("any")  # drop rows that contain a null in any column
df.na.drop("all", subset=["StockCode", "InvoiceNo"])  # drop rows where both of these columns are null

df.na.fill("all", subset=["StockCode", "InvoiceNo"])  # replace nulls in these two columns with the string "all"


DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: string, InvoiceDate: string, UnitPrice: string, CustomerID: string, Country: string]
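The difference between "any" and "all" is easiest to see on a few rows. Below is a plain-Python model of na.drop over rows stored as dicts, with None standing in for null. The helper name and the rows are hypothetical.

```python
from typing import Dict, List, Optional, Sequence

Row = Dict[str, Optional[str]]


def na_drop(rows: List[Row], how: str = "any", subset: Optional[Sequence[str]] = None) -> List[Row]:
    """Keep rows according to na.drop-style semantics."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        nulls = [row[c] is None for c in cols]
        drop = any(nulls) if how == "any" else all(nulls)
        if not drop:
            kept.append(row)
    return kept


# hypothetical rows
rows: List[Row] = [
    {"InvoiceNo": "536365", "StockCode": "85123A", "CustomerID": None},
    {"InvoiceNo": None, "StockCode": None, "CustomerID": "17850.0"},
    {"InvoiceNo": "536366", "StockCode": "22633", "CustomerID": "17850.0"},
]
print(len(na_drop(rows, "any")), len(na_drop(rows, "all", subset=["StockCode", "InvoiceNo"])))
```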

### We can also do this with a Python dict (a Map in Scala), where the key is the column name and the value is the value we would like to use to fill null values.

In [35]:
fill_cols_vals = {"StockCode": 5, "Description": "No Value"}
df.na.fill(fill_cols_vals)  # fill nulls column by column, using the dict's key-value pairs

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: string, InvoiceDate: string, UnitPrice: string, CustomerID: string, Country: string]

# Structs

You can think of structs as DataFrames within DataFrames.

In [86]:
from pyspark.sql.functions import struct


complexDF = df.select(struct("Description", "InvoiceNo").alias("complex")) 
complexDF.createOrReplaceTempView("complexDF")

complexDF.select(col("complex").getField("Description"), "complex.InvoiceNo", "complex.*").show(1)

spark.sql("SELECT complex.* FROM complexDF").show()

+--------------------+---------+--------------------+---------+
| complex.Description|InvoiceNo|         Description|InvoiceNo|
+--------------------+---------+--------------------+---------+
|WHITE HANGING HEA...|   536365|WHITE HANGING HEA...|   536365|
+--------------------+---------+--------------------+---------+
only showing top 1 row

+--------------------+---------+
|         Description|InvoiceNo|
+--------------------+---------+
|WHITE HANGING HEA...|   536365|
| WHITE METAL LANTERN|   536365|
|CREAM CUPID HEART...|   536365|
|KNITTED UNION FLA...|   536365|
|RED WOOLLY HOTTIE...|   536365|
|SET 7 BABUSHKA NE...|   536365|
|GLASS STAR FROSTE...|   536365|
|HAND WARMER UNION...|   536366|
|HAND WARMER RED P...|   536366|
|ASSORTED COLOUR B...|   536367|
|POPPY'S PLAYHOUSE...|   536367|
|POPPY'S PLAYHOUSE...|   536367|
|FELTCRAFT PRINCES...|   536367|
|IVORY KNITTED MUG...|   536367|
|BOX OF 6 ASSORTED...|   536367|
|BOX OF VINTAGE JI...|   536367|
...

We now have a DataFrame with a column named complex. We can query it just as we would any other DataFrame. The only difference is that we access its fields with dot syntax (complex.InvoiceNo) or with the column method getField (col("complex").getField("Description")).
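Conceptually, each value in complex is a nested record. Below is a plain-Python model using dicts; the helpers are hypothetical, not Spark API. It covers dotted field access and complex.*, which expands the struct's fields into top-level columns.

```python
from typing import Any, Dict

# one row of complexDF, modelled as a dict holding a nested dict (the struct)
row: Dict[str, Any] = {
    "complex": {"Description": "WHITE HANGING HEART T-LIGHT HOLDER", "InvoiceNo": "536365"}
}


def get_field(row: Dict[str, Any], path: str) -> Any:
    """Resolve a dotted path such as 'complex.InvoiceNo'."""
    value: Any = row
    for part in path.split("."):
        value = value[part]
    return value


def star(row: Dict[str, Any], struct_col: str) -> Dict[str, Any]:
    """Model of 'struct_col.*': the struct's fields become top-level columns."""
    return dict(row[struct_col])


print(get_field(row, "complex.InvoiceNo"), star(row, "complex"))
```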

In [60]:
from pyspark.sql.functions import split, size

df.select(split(col("Description"), " "), size(split(col("Description"), " "))).show(2)

# SELECT split(Description, ' ') FROM dfTable

+-------------------------+-------------------------------+
|split(Description,  , -1)|size(split(Description,  , -1))|
+-------------------------+-------------------------------+
|     [WHITE, HANGING, ...|                              5|
|     [WHITE, METAL, LA...|                              3|
+-------------------------+-------------------------------+
only showing top 2 rows



In [62]:
df.select(split(col("Description"), " ").alias("array_col"))\
    .selectExpr("array_col[0]", "array_col[1]").show(2)

# SELECT split(Description, ' ')[0] FROM dfTable

+------------+------------+
|array_col[0]|array_col[1]|
+------------+------------+
|       WHITE|     HANGING|
|       WHITE|       METAL|
+------------+------------+
only showing top 2 rows



In [None]:
from pyspark.sql.functions import array_contains

df.select( array_contains(split(col("Description"), " "), "WHITE"), size(split(col("Description"), " "))).show(2)

# SELECT array_contains(split(Description, ' '), 'WHITE') FROM dfTable

# explode

The explode function takes a column that consists of arrays and creates one row (with the rest of the values duplicated) per value in the array.

In [79]:
from pyspark.sql.functions import split, explode

df.withColumn("Split", split(col("Description"), " "))\
    .withColumn("Explode", explode(col("Split")))\
    .select("Description", "InvoiceNo", "Explode").show(3)

# df.select("Description", "InvoiceNo").show(36)

+--------------------+---------+-------+
|         Description|InvoiceNo|Explode|
+--------------------+---------+-------+
|WHITE HANGING HEA...|   536365|  WHITE|
|WHITE HANGING HEA...|   536365|HANGING|
|WHITE HANGING HEA...|   536365|  HEART|
+--------------------+---------+-------+
only showing top 3 rows
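Below is a plain-Python model of explode; the helper name is hypothetical. Each array element becomes its own row, and every other column is copied. Like Spark's explode, it emits nothing for a row whose array is null or empty; explode_outer would keep such a row with a null. The second input row is hypothetical.

```python
from typing import Any, Dict, Iterator, List, Optional


def explode(rows: List[Dict[str, Any]], array_col: str, out_col: str) -> Iterator[Dict[str, Any]]:
    """Yield one row per array element, copying every other column of the input row."""
    for row in rows:
        values: Optional[List[Any]] = row.get(array_col)
        for value in values or []:  # a null or empty array produces no output rows
            new_row = dict(row)
            new_row[out_col] = value
            yield new_row


rows: List[Dict[str, Any]] = [
    {"Description": "WHITE METAL LANTERN", "InvoiceNo": "536365"},
    {"Description": None, "InvoiceNo": "536366"},  # hypothetical row with a null Description
]
for r in rows:
    # like split(col("Description"), " "); splitting a null yields null
    r["Split"] = r["Description"].split(" ") if r["Description"] is not None else None

exploded = list(explode(rows, "Split", "Explode"))
for r in exploded:
    print(r["Description"], r["InvoiceNo"], r["Explode"])
```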



# Maps
Maps are created with the create_map function (map in SQL) from key-value pairs of columns. You can then select from them just as you would from an array, using a key instead of an index:

In [80]:
from pyspark.sql.functions import create_map

df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_Map"))\
    .selectExpr("complex_Map['WHITE METAL LANTERN']").show(3)

+--------------------------------+
|complex_Map[WHITE METAL LANTERN]|
+--------------------------------+
|                            null|
|                          536365|
|                            null|
+--------------------------------+
only showing top 3 rows



In [84]:
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_Map"))\
    .selectExpr("explode(complex_Map)").show(3)

+--------------------+------+
|                 key| value|
+--------------------+------+
|WHITE HANGING HEA...|536365|
| WHITE METAL LANTERN|536365|
|CREAM CUPID HEART...|536365|
+--------------------+------+
only showing top 3 rows
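Each row's map here holds exactly one entry, keyed by that row's own Description. Looking up 'WHITE METAL LANTERN' therefore succeeds only on the matching row; a missing key yields null. Exploding a map produces one key/value row per entry. Below is a plain-Python model of both operations using three descriptions that appear in the outputs above.

```python
from typing import Dict, List, Optional, Tuple

rows = [
    ("WHITE HANGING HEART T-LIGHT HOLDER", "536365"),
    ("WHITE METAL LANTERN", "536365"),
    ("RED WOOLLY HOTTIE WHITE HEART.", "536365"),
]

# create_map(Description, InvoiceNo): one single-entry map per row
maps: List[Dict[str, str]] = [{desc: inv} for desc, inv in rows]

# complex_Map['WHITE METAL LANTERN']: dict.get returns None (null) for a missing key
lookups: List[Optional[str]] = [m.get("WHITE METAL LANTERN") for m in maps]

# explode(complex_Map): one (key, value) row per map entry
exploded: List[Tuple[str, str]] = [(k, v) for m in maps for k, v in m.items()]
print(lookups)
```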



# SQL in Spark


In [87]:
df.createOrReplaceTempView("SQLTable")

In [92]:
spark.sql("SELECT map(Description, InvoiceNo) as complex_map FROM SQLTable WHERE Description IS NOT NULL").show(3)

+--------------------+
|         complex_map|
+--------------------+
|[WHITE HANGING HE...|
|[WHITE METAL LANT...|
|[CREAM CUPID HEAR...|
+--------------------+
only showing top 3 rows



# User-Defined Functions (UDF)

One of the most powerful things you can do in Spark is define your own functions. These user-defined functions (UDFs) let you write custom transformations in Python or Scala and even use external libraries. A UDF takes one or more columns as input and returns a column. Spark UDFs are powerful because you can write them in several programming languages; you do not need to write them in an esoteric format or domain-specific language. They’re just functions that operate on the data, record by record. By default, these functions are registered as temporary functions to be used in that specific SparkSession or Context.
Although you can write UDFs in Scala, Python, or Java, there are performance considerations you should be aware of. To illustrate this, we’re going to walk through exactly what happens when you create a UDF, pass it to Spark, and then execute code using that UDF.
The first step is the actual function. We’ll create a simple one for this example: a power3 function that takes a number and raises it to the power of three:

In [129]:
def calcPower3(int_value):
    return int_value ** 3 

calcPower3(3.0)


27.0

# NOTE:
If you specify a return type that doesn’t match the type the function actually returns, Spark will not throw an error. It will just return null to designate a failure. You can see this by switching the return type in the spark.udf.register call below from IntegerType to DoubleType. Note also that udf() without an explicit return type defaults to StringType.

In [130]:
from pyspark.sql.functions import udf, col

udfExampleDF = spark.range(5).toDF("numbers")

# wrap the Python function as a UDF so it can be used in DataFrame expressions
# (no return type is given, so it defaults to StringType)
calcPower3UDF = udf(calcPower3)
udfExampleDF.select(calcPower3UDF(col("numbers"))).show(5)


+-------------------+
|calcPower3(numbers)|
+-------------------+
|                  0|
|                  1|
|                  8|
|                 27|
|                 64|
+-------------------+



In [133]:
from pyspark.sql.types import IntegerType, DoubleType 

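# register the function for use in SQL expressions; with DoubleType instead of IntegerType,
# the Python ints it returns would not match the declared type and every value would be null
spark.udf.register("power3py", calcPower3, IntegerType())
udfExampleDF.selectExpr("power3py(numbers)").show(5)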

+-----------------+
|power3py(numbers)|
+-----------------+
|                0|
|                1|
|                8|
|               27|
|               64|
+-----------------+
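Below is a simplified plain-Python model of the behavior described in the note above. Each result is checked against the declared return type, and a mismatch becomes None (null) instead of raising an error. This is an assumption-level sketch with a hypothetical apply_udf helper. Spark's real conversion rules are more involved than this exact type check.

```python
from typing import Any, Callable, Iterable, List, Optional


def calcPower3(int_value):
    return int_value ** 3


def apply_udf(func: Callable[[Any], Any], values: Iterable[Any], return_type: type) -> List[Optional[Any]]:
    """Apply func row by row; a result of the wrong type becomes None (null)."""
    results: List[Optional[Any]] = []
    for value in values:
        out = func(value)
        results.append(out if type(out) is return_type else None)
    return results


numbers = range(5)  # like spark.range(5)
as_ints = apply_udf(calcPower3, numbers, int)       # declared type matches, like IntegerType
as_doubles = apply_udf(calcPower3, numbers, float)  # declared type mismatches, like DoubleType
print(as_ints, as_doubles)
```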



# Chapter 7. Aggregations
