In [1]:
from pyspark.ml.feature import SQLTransformer
from pyspark.sql import SparkSession

Having imported the relevant classes from PySpark, we can now get or create a SparkSession with the application name "SQLTransformerExample", which we will use to run Spark jobs.

In [2]:
spark = SparkSession\
        .builder\
        .appName("SQLTransformerExample")\
        .getOrCreate()

21/09/28 17:58:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
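The "get or create" part of `getOrCreate()` means that if a session is already active in the process, the builder returns that session instead of starting a second one. The plain-Python sketch below illustrates only that pattern. The `Session` and `Builder` classes are hypothetical stand-ins, not Spark's implementation, and they ignore configuration handling entirely.

```python
class Session:
    """Hypothetical stand-in for a session object (not Spark's class)."""

    _active = None  # class-level slot holding the single active session

    def __init__(self, app_name):
        self.app_name = app_name
        self.stopped = False

    def stop(self):
        self.stopped = True
        # Clear the active slot so the next get_or_create builds a new session
        if Session._active is self:
            Session._active = None


class Builder:
    """Hypothetical builder mirroring the chained-call style of SparkSession.builder."""

    def __init__(self):
        self._app_name = "default"

    def app_name(self, name):
        self._app_name = name
        return self  # returning self allows method chaining

    def get_or_create(self):
        # Reuse the active session if there is one; otherwise create it
        if Session._active is None:
            Session._active = Session(self._app_name)
        return Session._active


first = Builder().app_name("SQLTransformerExample").get_or_create()
second = Builder().app_name("SQLTransformerExample").get_or_create()
print(first is second)  # True: the second call reuses the active session
first.stop()
third = Builder().app_name("SQLTransformerExample").get_or_create()
print(third is first)  # False: stop() cleared the slot, so a new session is made
```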


Using that SparkSession, we create a _Spark_ dataframe and populate it with two rows of numbers. Note that the columns of that dataframe are named `id`, `v1` and `v2`.

In [3]:
sparkDf = spark.createDataFrame([
        (0, 1.0, 3.0),
        (2, 2.0, 5.0)
    ], ["id", "v1", "v2"])
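Spark infers the column types from the Python values, so `sparkDf.printSchema()` would report `id` as `long` and `v1`, `v2` as `double`. The sketch below mimics that mapping for a few common Python types. `infer_schema` and `_TYPE_NAMES` are hypothetical names; the sketch only inspects the first row, whereas Spark's own inference handles many more types and cases.

```python
import datetime

# Simplified, hypothetical mapping from Python value types to the Spark type
# names that printSchema() displays. Order matters: bool is a subclass of int,
# and datetime.datetime is a subclass of datetime.date.
_TYPE_NAMES = [
    (bool, "boolean"),
    (int, "long"),
    (float, "double"),
    (str, "string"),
    (datetime.datetime, "timestamp"),
    (datetime.date, "date"),
]


def infer_schema(rows, names):
    """Return (column_name, type_name) pairs inferred from the first row only."""
    schema = []
    for name, value in zip(names, rows[0]):
        for py_type, type_name in _TYPE_NAMES:
            if isinstance(value, py_type):
                schema.append((name, type_name))
                break
        else:
            raise TypeError(f"no mapping for {type(value).__name__}")
    return schema


print(infer_schema([(0, 1.0, 3.0), (2, 2.0, 5.0)], ["id", "v1", "v2"]))
# [('id', 'long'), ('v1', 'double'), ('v2', 'double')]
```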

We inspect the resulting `sparkDf` dataframe to make sure it looks as expected.

In [4]:
sparkDf.show()

+---+---+---+
| id| v1| v2|
+---+---+---+
|  0|1.0|3.0|
|  2|2.0|5.0|
+---+---+---+



Now we define a simple transformation, specified as a SQL statement, that adds two computed columns: `v3` (the sum `v1 + v2`) and `v4` (the product `v1 * v2`).

In [5]:
sqlTrans = SQLTransformer(
        statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

Now we can apply this SQL transformation to `sparkDf`. When `transform` runs, the placeholder `__THIS__` in the statement stands for the input dataframe, here `sparkDf`.

In [6]:
sparkDf2 = sqlTrans.transform(sparkDf)
sparkDf2.show()

+---+---+---+---+----+
| id| v1| v2| v3|  v4|
+---+---+---+---+----+
|  0|1.0|3.0|4.0| 3.0|
|  2|2.0|5.0|7.0|10.0|
+---+---+---+---+----+
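To make the placeholder mechanism concrete, here is a stdlib-only sketch that uses `sqlite3` as a stand-in for Spark SQL. As we understand it, Spark's `SQLTransformer` registers the input dataframe as a temporary view under a generated name and substitutes that name for `__THIS__`; the hypothetical `sql_transform` helper below does the same textual substitution against an sqlite table instead.

```python
import sqlite3


def sql_transform(conn, table, statement):
    """Hypothetical stand-in for SQLTransformer.transform.

    Replaces every __THIS__ in `statement` with `table`, runs the query
    with sqlite3, and returns (column_names, rows).
    """
    query = statement.replace("__THIS__", table)
    cursor = conn.execute(query)
    columns = [desc[0] for desc in cursor.description]
    return columns, cursor.fetchall()


# In-memory table holding the same two rows as sparkDf
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sparkDf (id INTEGER, v1 REAL, v2 REAL)")
conn.executemany("INSERT INTO sparkDf VALUES (?, ?, ?)",
                 [(0, 1.0, 3.0), (2, 2.0, 5.0)])

columns, rows = sql_transform(
    conn, "sparkDf",
    "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
print(columns)  # ['id', 'v1', 'v2', 'v3', 'v4']
for row in sorted(rows):  # sort because SQL gives no row order without ORDER BY
    print(row)
```

The printed rows match the `sparkDf2.show()` output above, which is the point: the transformer is just a SQL query over whatever dataframe is passed in.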



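We can convert the Spark dataframe, which is distributed across Spark's executors rather than held in local memory, into an in-memory pandas dataframe with `toPandas()`. This collects every row to the driver, so it is only suitable for results small enough to fit in the driver's memory: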

In [7]:
# toPandas() collects all rows of sparkDf2 to the driver as a pandas DataFrame
pandasDf = sparkDf2.toPandas()
pandasDf

   id   v1   v2   v3    v4
0   0  1.0  3.0  4.0   3.0
1   2  2.0  5.0  7.0  10.0


Now we can modify that pandas dataframe with ordinary pandas operations, here by adding a new column `v5` equal to `v4 - v3`. Displaying the dataframe confirms that `v5` has been added.

In [8]:
import pandas as pd
pandasDf['v5'] = pandasDf['v4'] - pandasDf['v3']
pandasDf

   id   v1   v2   v3    v4    v5
0   0  1.0  3.0  4.0   3.0  -1.0
1   2  2.0  5.0  7.0  10.0   3.0
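If the pandas round trip exists only to add a derived column, `v5` could instead be computed in the SQL statement itself, so the data never has to be collected to the driver. The sketch below again uses `sqlite3` as a stand-in for Spark SQL. The `STATEMENT` string is the kind of text one could pass as `statement=` to `SQLTransformer`; it repeats the `v4` and `v3` expressions because many SQL engines, sqlite included, do not let a SELECT list refer to its own aliases.

```python
import sqlite3

# Statement one could pass as SQLTransformer(statement=...); v5 repeats the
# v4 and v3 expressions instead of referring to those aliases.
STATEMENT = (
    "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4, "
    "(v1 * v2) - (v1 + v2) AS v5 FROM __THIS__"
)

# In-memory sqlite table with the same rows as sparkDf (stand-in for Spark)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER, v1 REAL, v2 REAL)")
conn.executemany("INSERT INTO t VALUES (?, ?, ?)", [(0, 1.0, 3.0), (2, 2.0, 5.0)])

# Substitute the table name for __THIS__, then order rows for stable output
query = STATEMENT.replace("__THIS__", "t") + " ORDER BY id"
for row in conn.execute(query):
    print(row)
# (0, 1.0, 3.0, 4.0, 3.0, -1.0)
# (2, 2.0, 5.0, 7.0, 10.0, 3.0)
```

The trade-off: pandas is convenient for exploratory work on small data, while keeping the computation in SQL lets Spark run it across the cluster.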


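We can convert `pandasDf` back into a Spark dataframe with `spark.createDataFrame`, which infers the Spark schema from the pandas columns. This step does not write anything to HDFS; persisting the result would require an explicit write through `sparkDf3.write`.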

In [9]:
sparkDf3 = spark.createDataFrame(pandasDf)
sparkDf3.show()

+---+---+---+---+----+----+
| id| v1| v2| v3|  v4|  v5|
+---+---+---+---+----+----+
|  0|1.0|3.0|4.0| 3.0|-1.0|
|  2|2.0|5.0|7.0|10.0| 3.0|
+---+---+---+---+----+----+




Finally, we stop the SparkSession, which also stops its underlying SparkContext and releases the resources it holds.

In [10]:
spark.stop()