## Creating a Spark session


In [None]:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession


  Logger.getLogger("org").setLevel(Level.ERROR)

  val spark = SparkSession.builder
    .appName("SparkSessionExample")
    .config("spark.master", "local")
    .config("spark.sql.shuffle.partitions", "5")
    .config("spark.executor.memory", "1g")
    .config("spark.driver.memory", "1g")
    .config("spark.executor.cores", "2")
    .config("spark.default.parallelism", "5")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .enableHiveSupport() // Optional: enables Hive support, which also sets spark.sql.catalogImplementation to "hive"
    .getOrCreate()

  println(s"Hello world! $spark")
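
To check which settings are actually active, they can be read back from the session. The following is a minimal sketch, assuming a local session is acceptable; the app name ConfigCheck is a placeholder. Keep in mind that getOrCreate() returns an already running session if one exists, which may be why a notebook kernel can report a different master than the one requested in the builder.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session is fine here; "ConfigCheck" is a placeholder name
val spark = SparkSession.builder
  .appName("ConfigCheck")
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "5")
  .getOrCreate()

// Runtime SQL settings can be read back through spark.conf
val shufflePartitions = spark.conf.get("spark.sql.shuffle.partitions")
println(s"spark.sql.shuffle.partitions = $shufflePartitions")

// The master actually in use is reported by the SparkContext
println(s"master = ${spark.sparkContext.master}")
```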

## Creating a DataFrame
In the following, we create a small DataFrame of made-up trades: assets that were bought and sold, with their buy and sell prices.

#### First approach: creating a DataFrame from tuples with .toDF()
The first approach relies on importing spark.implicits._, which makes the .toDF() method available on local Scala collections such as Seq.

Each row of the DataFrame corresponds to one tuple, and the column names are passed to .toDF(). Let us create a DataFrame with a few rows using the following code snippet:

In [1]:
import spark.implicits._
  
val data = Seq(
  ("Stock", "Plus500", 0.5938, "Amazon", "01/11/2021", 3368.000, 4000.0),
  ("Stock", "Plus500", 0.6, "Facebook", "01/11/2021", 160.0, 200.0),
  ("Stock", "Plus500", 0.68, "Amazon", "01/11/2021", 160.0, 200.0),
  ("Stock", "eToro", 1.5, "Facebook", "12/11/2021", 180.0, 250.0),
  ("Stock", "Plus500", 0.065, "LinkedIn", "12/11/2021", 80.0, 140.0),
  ("Stock", "eToro", 1.3, "Pfeizer", "01/12/2021", 34.0, 85.5),
  ("Stock", "Plus500", 0.01, "Bitcoin", "01/11/2021", 45000.0, 48000.0),
  ("Stock", "Plus500", 0.08, "Sand", "29/11/2021", 5.4, 8.9)
)

val firstApproachDF = data.toDF("Asset", "Platform", "Unit", "Trade Name", "Buy Date", "Buy Price", "Sell Price")

firstApproachDF.show()

firstApproachDF.printSchema 

Intitializing Scala interpreter ...

Spark Web UI available at http://host.docker.internal:4040
SparkContext available as 'sc' (version = 3.3.0, master = local[*], app id = local-1677920153771)
SparkSession available as 'spark'


+-----+--------+------+----------+----------+---------+----------+
|Asset|Platform|  Unit|Trade Name|  Buy Date|Buy Price|Sell Price|
+-----+--------+------+----------+----------+---------+----------+
|Stock| Plus500|0.5938|    Amazon|01/11/2021|   3368.0|    4000.0|
|Stock| Plus500|   0.6|  Facebook|01/11/2021|    160.0|     200.0|
|Stock| Plus500|  0.68|    Amazon|01/11/2021|    160.0|     200.0|
|Stock|   eToro|   1.5|  Facebook|12/11/2021|    180.0|     250.0|
|Stock| Plus500| 0.065|  LinkedIn|12/11/2021|     80.0|     140.0|
|Stock|   eToro|   1.3|   Pfeizer|01/12/2021|     34.0|      85.5|
|Stock| Plus500|  0.01|   Bitcoin|01/11/2021|  45000.0|   48000.0|
|Stock| Plus500|  0.08|      Sand|29/11/2021|      5.4|       8.9|
+-----+--------+------+----------+----------+---------+----------+

root
 |-- Asset: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Unit: double (nullable = false)
 |-- Trade Name: string (nullable = true)
 |-- Buy Date: string (nullable = true)
 |-- Buy Price: double (nullable = false)
 |-- Sell Price: double (nullable = false)

import spark.implicits._
data: Seq[(String, String, Double, String, String, Double, Double)] = List((Stock,Plus500,0.5938,Amazon,01/11/2021,3368.0,4000.0), (Stock,Plus500,0.6,Facebook,01/11/2021,160.0,200.0), (Stock,Plus500,0.68,Amazon,01/11/2021,160.0,200.0), (Stock,eToro,1.5,Facebook,12/11/2021,180.0,250.0), (Stock,Plus500,0.065,LinkedIn,12/11/2021,80.0,140.0), (Stock,eToro,1.3,Pfeizer,01/12/2021,34.0,85.5), (Stock,Plus500,0.01,Bitcoin,01/11/2021,45000.0,48000.0), (Stock,Plus500,0.08,Sand,29/11/2021,5.4,8.9))
firstApproachDF: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 5 more fields]


#### Second approach: creating a DataFrame with createDataFrame()
In the second approach, we pass the same sequence of tuples to spark.createDataFrame(). Since no column names are given, the columns get the default tuple field names _1 to _7, as the output below shows:

In [2]:
val data = Seq(
  ("Stock", "Plus500", 0.5938, "Amazon", "01/11/2021", 3368.000, 4000.0),
  ("Stock", "Plus500", 0.6, "Facebook", "01/11/2021", 160.0, 200.0),
  ("Stock", "Plus500", 0.68, "Amazon", "01/11/2021", 160.0, 200.0),
  ("Stock", "eToro", 1.5, "Facebook", "12/11/2021", 180.0, 250.0),
  ("Stock", "Plus500", 0.065, "LinkedIn", "12/11/2021", 80.0, 140.0),
  ("Stock", "eToro", 1.3, "Pfeizer", "01/12/2021", 34.0, 85.5),
  ("Stock", "Plus500", 0.01, "Bitcoin", "01/11/2021", 45000.0, 48000.0),
  ("Stock", "Plus500", 0.08, "Sand", "29/11/2021", 5.4, 8.9)
)

val secondApproachDF = spark.createDataFrame(data)

secondApproachDF.show()
secondApproachDF.printSchema

+-----+-------+------+--------+----------+-------+-------+
|   _1|     _2|    _3|      _4|        _5|     _6|     _7|
+-----+-------+------+--------+----------+-------+-------+
|Stock|Plus500|0.5938|  Amazon|01/11/2021| 3368.0| 4000.0|
|Stock|Plus500|   0.6|Facebook|01/11/2021|  160.0|  200.0|
|Stock|Plus500|  0.68|  Amazon|01/11/2021|  160.0|  200.0|
|Stock|  eToro|   1.5|Facebook|12/11/2021|  180.0|  250.0|
|Stock|Plus500| 0.065|LinkedIn|12/11/2021|   80.0|  140.0|
|Stock|  eToro|   1.3| Pfeizer|01/12/2021|   34.0|   85.5|
|Stock|Plus500|  0.01| Bitcoin|01/11/2021|45000.0|48000.0|
|Stock|Plus500|  0.08|    Sand|29/11/2021|    5.4|    8.9|
+-----+-------+------+--------+----------+-------+-------+

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: double (nullable = false)
 |-- _4: string (nullable = true)
 |-- _5: string (nullable = true)
 |-- _6: double (nullable = false)
 |-- _7: double (nullable = false)



data: Seq[(String, String, Double, String, String, Double, Double)] = List((Stock,Plus500,0.5938,Amazon,01/11/2021,3368.0,4000.0), (Stock,Plus500,0.6,Facebook,01/11/2021,160.0,200.0), (Stock,Plus500,0.68,Amazon,01/11/2021,160.0,200.0), (Stock,eToro,1.5,Facebook,12/11/2021,180.0,250.0), (Stock,Plus500,0.065,LinkedIn,12/11/2021,80.0,140.0), (Stock,eToro,1.3,Pfeizer,01/12/2021,34.0,85.5), (Stock,Plus500,0.01,Bitcoin,01/11/2021,45000.0,48000.0), (Stock,Plus500,0.08,Sand,29/11/2021,5.4,8.9))
secondApproachDF: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 5 more fields]
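
The default names _1, _2, ... are rarely what you want. One possible way to name the columns, sketched below on a shortened, made-up sample, is to chain .toDF() with the desired names after createDataFrame(). The session setup and the app name are assumptions for a local run.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local session; the app name is a placeholder
val spark = SparkSession.builder
  .appName("CreateDataFrameNamed")
  .master("local[*]")
  .getOrCreate()

// Shortened sample with four fields per tuple
val sample = Seq(
  ("Stock", "Plus500", 0.6, "Facebook"),
  ("Stock", "eToro", 1.5, "Facebook")
)

// createDataFrame names the tuple fields _1, _2, ...; toDF replaces them in order
val namedDF = spark.createDataFrame(sample)
  .toDF("Asset", "Platform", "Unit", "Trade Name")

namedDF.printSchema()
```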


#### Schema and columns of a DataFrame
The following methods show the schema and columns of a DataFrame in Spark. Let us consider the firstApproachDF defined in the previous subsection:

Schema of DataFrame

In [3]:
firstApproachDF.schema


res2: org.apache.spark.sql.types.StructType = StructType(StructField(Asset,StringType,true),StructField(Platform,StringType,true),StructField(Unit,DoubleType,false),StructField(Trade Name,StringType,true),StructField(Buy Date,StringType,true),StructField(Buy Price,DoubleType,false),StructField(Sell Price,DoubleType,false))


#### Printing schema of the DataFrame

In [5]:
firstApproachDF.printSchema


root
 |-- Asset: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Unit: double (nullable = false)
 |-- Trade Name: string (nullable = true)
 |-- Buy Date: string (nullable = true)
 |-- Buy Price: double (nullable = false)
 |-- Sell Price: double (nullable = false)



#### Columns of a DataFrame

In [6]:
firstApproachDF.columns


res5: Array[String] = Array(Asset, Platform, Unit, Trade Name, Buy Date, Buy Price, Sell Price)


#### A single column


In [7]:
firstApproachDF.col("Asset")


res6: org.apache.spark.sql.Column = Asset


#### Iterating over columns of a DataFrame using Map function and appending values
Sometimes you need to go through every column name of a DataFrame and append a suffix. This can be done with the map() function as shown below. Note that the result is an array of new names; the DataFrame itself is not changed:

In [8]:
firstApproachDF.columns.map(col => col + "_suffix")


res7: Array[String] = Array(Asset_suffix, Platform_suffix, Unit_suffix, Trade Name_suffix, Buy Date_suffix, Buy Price_suffix, Sell Price_suffix)
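
The map() call above only returns the new names. To actually rename the columns, one option is to pass the mapped names to .toDF() as varargs. This is a minimal sketch on a made-up three-column DataFrame; the session setup is an assumption for a local run.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local session; the app name is a placeholder
val spark = SparkSession.builder
  .appName("SuffixColumns")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val smallDF = Seq(("Stock", "Plus500", 0.6)).toDF("Asset", "Platform", "Unit")

// Build the new names, then expand them into toDF to rename every column at once
val suffixedNames = smallDF.columns.map(name => name + "_suffix")
val suffixedDF = smallDF.toDF(suffixedNames: _*)

println(suffixedDF.columns.mkString(", "))
```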


#### Various approaches for selecting columns of a DataFrame
There are various ways to select columns of a DataFrame. Knowing them is useful for two reasons:

1. You will come across all of these forms in other people's code, so it helps to recognize them.
2. You may need to select columns dynamically in your code, for example when extending a DataFrame.

#### Approach 1. using select and column names

In [14]:
val DF = firstApproachDF

DF: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 5 more fields]


In [16]:
DF.select("Asset","Platform").show(5)


+-----+--------+
|Asset|Platform|
+-----+--------+
|Stock| Plus500|
|Stock| Plus500|
|Stock| Plus500|
|Stock|   eToro|
|Stock| Plus500|
+-----+--------+
only showing top 5 rows



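#### Approach 2 and 3. using select with $ sign or col()
The second and third approaches are shown together because they can be mixed in a single select call. The $"..." syntax comes from spark.implicits._, and col() comes from org.apache.spark.sql.functions: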

In [17]:
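import org.apache.spark.sql.functions._ // col, expr, count, sum, ...; spark-shell imports this automatically

DF.select($"Asset", $"Platform", $"Unit", col("Trade Name"), col("Buy Date")).show(5)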

+-----+--------+------+----------+----------+
|Asset|Platform|  Unit|Trade Name|  Buy Date|
+-----+--------+------+----------+----------+
|Stock| Plus500|0.5938|    Amazon|01/11/2021|
|Stock| Plus500|   0.6|  Facebook|01/11/2021|
|Stock| Plus500|  0.68|    Amazon|01/11/2021|
|Stock|   eToro|   1.5|  Facebook|12/11/2021|
|Stock| Plus500| 0.065|  LinkedIn|12/11/2021|
+-----+--------+------+----------+----------+
only showing top 5 rows



#### Approach 4. using expressions and spark implicits

In [18]:
DF.select(expr("Asset"), $"Trade Name").show(5)

+-----+----------+
|Asset|Trade Name|
+-----+----------+
|Stock|    Amazon|
|Stock|  Facebook|
|Stock|    Amazon|
|Stock|  Facebook|
|Stock|  LinkedIn|
+-----+----------+
only showing top 5 rows



It is also possible to apply calculations using expr. Let us consider the following:

In [19]:
DF.select($"Unit",expr("Unit * 2").as("Unit Multiplied by 2"), 
$"Trade Name").show(5)

+------+--------------------+----------+
|  Unit|Unit Multiplied by 2|Trade Name|
+------+--------------------+----------+
|0.5938|              1.1876|    Amazon|
|   0.6|                 1.2|  Facebook|
|  0.68|                1.36|    Amazon|
|   1.5|                 3.0|  Facebook|
| 0.065|                0.13|  LinkedIn|
+------+--------------------+----------+
only showing top 5 rows



#### Approach 5. using selectExpr
Let us calculate the profit as the difference between Sell Price and Buy Price using the selectExpr function. Note that column names containing spaces must be wrapped in backticks (`):

In [20]:
DF.selectExpr("Asset", "Platform", """`Trade Name`""", """`Buy Date`""", 
"Unit", """ `Sell Price` - `Buy Price` as profit""" ).show(5)

+-----+--------+----------+----------+------+------+
|Asset|Platform|Trade Name|  Buy Date|  Unit|profit|
+-----+--------+----------+----------+------+------+
|Stock| Plus500|    Amazon|01/11/2021|0.5938| 632.0|
|Stock| Plus500|  Facebook|01/11/2021|   0.6|  40.0|
|Stock| Plus500|    Amazon|01/11/2021|  0.68|  40.0|
|Stock|   eToro|  Facebook|12/11/2021|   1.5|  70.0|
|Stock| Plus500|  LinkedIn|12/11/2021| 0.065|  60.0|
+-----+--------+----------+----------+------+------+
only showing top 5 rows




#### Renaming a column and creating a new column
A column can be renamed with the .withColumnRenamed() function. Here we replace the space with an underscore (_) in three of the column names; note that “Buy Date” is not renamed in this example:

In [21]:
val renamedColumns = DF.withColumnRenamed("Trade Name","Trade_Name")
.withColumnRenamed("Buy Price","Buy_Price")
.withColumnRenamed("Sell Price","Sell_Price")

renamedColumns.show()

+-----+--------+------+----------+----------+---------+----------+
|Asset|Platform|  Unit|Trade_Name|  Buy Date|Buy_Price|Sell_Price|
+-----+--------+------+----------+----------+---------+----------+
|Stock| Plus500|0.5938|    Amazon|01/11/2021|   3368.0|    4000.0|
|Stock| Plus500|   0.6|  Facebook|01/11/2021|    160.0|     200.0|
|Stock| Plus500|  0.68|    Amazon|01/11/2021|    160.0|     200.0|
|Stock|   eToro|   1.5|  Facebook|12/11/2021|    180.0|     250.0|
|Stock| Plus500| 0.065|  LinkedIn|12/11/2021|     80.0|     140.0|
|Stock|   eToro|   1.3|   Pfeizer|01/12/2021|     34.0|      85.5|
|Stock| Plus500|  0.01|   Bitcoin|01/11/2021|  45000.0|   48000.0|
|Stock| Plus500|  0.08|      Sand|29/11/2021|      5.4|       8.9|
+-----+--------+------+----------+----------+---------+----------+



renamedColumns: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 5 more fields]


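To create a new column, we can use the .withColumn() function, which takes the new column name and the expression that computes it. In the following, we create two columns named “Profit” and “Profit Percentage”. Note that “Profit Percentage” is computed here as Sell Price divided by Buy Price, so it is a ratio (1.25 means a 25% gain) rather than a percentage: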

In [22]:
val profitDF = DF.withColumn("Profit", col("Sell Price") - col("Buy Price"))
.withColumn("Profit Percentage", col("Sell Price") / col("Buy Price"))

profitDF.show()

+-----+--------+------+----------+----------+---------+----------+------+------------------+
|Asset|Platform|  Unit|Trade Name|  Buy Date|Buy Price|Sell Price|Profit| Profit Percentage|
+-----+--------+------+----------+----------+---------+----------+------+------------------+
|Stock| Plus500|0.5938|    Amazon|01/11/2021|   3368.0|    4000.0| 632.0| 1.187648456057007|
|Stock| Plus500|   0.6|  Facebook|01/11/2021|    160.0|     200.0|  40.0|              1.25|
|Stock| Plus500|  0.68|    Amazon|01/11/2021|    160.0|     200.0|  40.0|              1.25|
|Stock|   eToro|   1.5|  Facebook|12/11/2021|    180.0|     250.0|  70.0|1.3888888888888888|
|Stock| Plus500| 0.065|  LinkedIn|12/11/2021|     80.0|     140.0|  60.0|              1.75|
|Stock|   eToro|   1.3|   Pfeizer|01/12/2021|     34.0|      85.5|  51.5| 2.514705882352941|
|Stock| Plus500|  0.01|   Bitcoin|01/11/2021|  45000.0|   48000.0|3000.0|1.0666666666666667|
|Stock| Plus500|  0.08|      Sand|29/11/2021|      5.4|       8.9|   3

profitDF: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 7 more fields]
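
The “Profit Percentage” column above holds the ratio of sell price to buy price. If a percentage gain relative to the buy price is wanted instead, it could be computed as in the sketch below. The column name “Profit Pct”, the two sample rows, and the local session setup are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: local session; the app name is a placeholder
val spark = SparkSession.builder
  .appName("ProfitPercentage")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val pricesDF = Seq(("Facebook", 160.0, 200.0), ("LinkedIn", 80.0, 140.0))
  .toDF("Trade Name", "Buy Price", "Sell Price")

// Percentage gain relative to the buy price: (sell - buy) / buy * 100
val pctDF = pricesDF.withColumn(
  "Profit Pct",
  (col("Sell Price") - col("Buy Price")) / col("Buy Price") * 100
)

pctDF.show()
```

With these two rows, the new column holds 25.0 for Facebook and 75.0 for LinkedIn.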


#### Filtering rows in Spark
It is often necessary to keep only the rows of a DataFrame that meet certain criteria. The filter function does this; the following examples show some common conditions.

Example 1. using equality (===)

In [23]:
DF.filter(col("Platform") === "Plus500").show()

+-----+--------+------+----------+----------+---------+----------+
|Asset|Platform|  Unit|Trade Name|  Buy Date|Buy Price|Sell Price|
+-----+--------+------+----------+----------+---------+----------+
|Stock| Plus500|0.5938|    Amazon|01/11/2021|   3368.0|    4000.0|
|Stock| Plus500|   0.6|  Facebook|01/11/2021|    160.0|     200.0|
|Stock| Plus500|  0.68|    Amazon|01/11/2021|    160.0|     200.0|
|Stock| Plus500| 0.065|  LinkedIn|12/11/2021|     80.0|     140.0|
|Stock| Plus500|  0.01|   Bitcoin|01/11/2021|  45000.0|   48000.0|
|Stock| Plus500|  0.08|      Sand|29/11/2021|      5.4|       8.9|
+-----+--------+------+----------+----------+---------+----------+



Example 2. using OR operator (||)

In [25]:
val orExampleDF = DF.filter(col("Trade Name") === "Amazon" || col("Trade Name") ==="Facebook")

orExampleDF.show()

+-----+--------+------+----------+----------+---------+----------+
|Asset|Platform|  Unit|Trade Name|  Buy Date|Buy Price|Sell Price|
+-----+--------+------+----------+----------+---------+----------+
|Stock| Plus500|0.5938|    Amazon|01/11/2021|   3368.0|    4000.0|
|Stock| Plus500|   0.6|  Facebook|01/11/2021|    160.0|     200.0|
|Stock| Plus500|  0.68|    Amazon|01/11/2021|    160.0|     200.0|
|Stock|   eToro|   1.5|  Facebook|12/11/2021|    180.0|     250.0|
+-----+--------+------+----------+----------+---------+----------+



orExampleDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Asset: string, Platform: string ... 5 more fields]


Example 3. using AND operator (&&)
Keeping only Facebook for the Trade Name and Plus500 as the platform:

In [26]:
val andExampleDF = DF.filter(col("Trade Name") === "Facebook" && col("Platform") ==="Plus500")

andExampleDF.show()

+-----+--------+----+----------+----------+---------+----------+
|Asset|Platform|Unit|Trade Name|  Buy Date|Buy Price|Sell Price|
+-----+--------+----+----------+----------+---------+----------+
|Stock| Plus500| 0.6|  Facebook|01/11/2021|    160.0|     200.0|
+-----+--------+----+----------+----------+---------+----------+



andExampleDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Asset: string, Platform: string ... 5 more fields]


Example 4. Not equal (=!=) and greater than or equal (>=)
Let us select all trades of at least 0.5 units in which the trade name is not Amazon:

In [27]:
val demoDF = DF.filter(col("Unit") >= 0.5 && col("Trade Name") =!= "Amazon")

demoDF.show()

+-----+--------+----+----------+----------+---------+----------+
|Asset|Platform|Unit|Trade Name|  Buy Date|Buy Price|Sell Price|
+-----+--------+----+----------+----------+---------+----------+
|Stock| Plus500| 0.6|  Facebook|01/11/2021|    160.0|     200.0|
|Stock|   eToro| 1.5|  Facebook|12/11/2021|    180.0|     250.0|
|Stock|   eToro| 1.3|   Pfeizer|01/12/2021|     34.0|      85.5|
+-----+--------+----+----------+----------+---------+----------+



demoDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Asset: string, Platform: string ... 5 more fields]
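
Two other forms of filter are worth recognizing. The sketch below, on a made-up three-row sample with a local session assumed, uses isin() in place of a chain of === comparisons joined with ||, and a SQL expression string in place of Column expressions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: local session; the app name is a placeholder
val spark = SparkSession.builder
  .appName("FilterVariants")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val tradesDF = Seq(
  ("Plus500", "Amazon", 0.5938),
  ("Plus500", "Facebook", 0.6),
  ("Plus500", "LinkedIn", 0.065)
).toDF("Platform", "Trade Name", "Unit")

// isin keeps the rows whose value matches any of the listed values
val isinDF = tradesDF.filter(col("Trade Name").isin("Amazon", "Facebook"))

// filter also accepts a SQL expression string; names with spaces need backticks
val sqlDF = tradesDF.filter("`Trade Name` != 'Amazon' AND Unit >= 0.5")

isinDF.show()
sqlDF.show()
```

Here isinDF keeps the Amazon and Facebook rows, and sqlDF keeps only the Facebook row.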


#### Spark grouping and aggregation functions
It is very common to calculate statistics over groups of data, and Spark has rich support for such calculations. Using the DF from the previous sections, let us calculate several statistics:

#### Example of count()
Here, we group by Asset and Platform and count the number of trades.

In [28]:
// Calculating number of trades per platform

val numberOfTradesPerPlatform = DF.groupBy("Asset", "Platform")
.agg(count("*").as("NumberOfTradesPerAssetPPlatForm"))

numberOfTradesPerPlatform.show()

+-----+--------+-------------------------------+
|Asset|Platform|NumberOfTradesPerAssetPPlatForm|
+-----+--------+-------------------------------+
|Stock| Plus500|                              6|
|Stock|   eToro|                              2|
+-----+--------+-------------------------------+



numberOfTradesPerPlatform: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 1 more field]


#### Example of countDistinct()
Here, we group by Asset and Platform and count the distinct traded items:

In [29]:
// Count distinct of number of traded items
val numberOfAssets = DF.groupBy("Asset","Platform")
.agg(countDistinct("Trade Name").as("Number of Assets"))

numberOfAssets.show()

+-----+--------+----------------+
|Asset|Platform|Number of Assets|
+-----+--------+----------------+
|Stock|   eToro|               2|
|Stock| Plus500|               5|
+-----+--------+----------------+



numberOfAssets: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 1 more field]


#### Example of Sum, AVG, Median, Min, Max
In the following, we calculate the sum, average, minimum, maximum and median (P50) of the buy and sell prices per Asset, Platform and Trade Name:

In [31]:
val agg_df = DF.groupBy("Asset", "Platform", "Trade Name").agg(
    
    
    sum("Buy Price").as("buy_price_sum"),
    sum("Sell Price").as("sell_price_Sum"),
    
    min("Buy Price").as("buy_price_min"),
    min("Sell Price").as("sell_price_min"),
    
    max("Buy Price").as("buy_price_max"),
    max("Sell Price").as("sell_price_max"),
    
    avg("Buy Price").as("buy_price_avg"),
    avg("Sell Price").as("sell_price_avg"),
    
    expr("percentile(`Sell Price`, 0.5)").as("sell_price_P50"),
    expr("percentile(`Buy Price`, 0.5)").as("buy_price_P50")
    )
    
 agg_df.show()  

+-----+--------+----------+-------------+--------------+-------------+--------------+-------------+--------------+-------------+--------------+--------------+-------------+
|Asset|Platform|Trade Name|buy_price_sum|sell_price_Sum|buy_price_min|sell_price_min|buy_price_max|sell_price_max|buy_price_avg|sell_price_avg|sell_price_P50|buy_price_P50|
+-----+--------+----------+-------------+--------------+-------------+--------------+-------------+--------------+-------------+--------------+--------------+-------------+
|Stock| Plus500|    Amazon|       3528.0|        4200.0|        160.0|         200.0|       3368.0|        4000.0|       1764.0|        2100.0|        2100.0|       1764.0|
|Stock| Plus500|  Facebook|        160.0|         200.0|        160.0|         200.0|        160.0|         200.0|        160.0|         200.0|         200.0|        160.0|
|Stock|   eToro|  Facebook|        180.0|         250.0|        180.0|         250.0|        180.0|         250.0|        180.0|       

agg_df: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 11 more fields]


#### Spark dynamic groupBy and aggregation
Often you need to aggregate many columns, say between 10 and 100, and apply several aggregation functions to each of them. Writing every expression by hand is impractical, so it is more efficient to generate the expressions with the map function. I have explained Spark dynamic groupBy aggregation in detail in another tutorial; here I illustrate it with a short example.

Let us now perform a groupBy and aggregation similar to the previous section as follows:

In [32]:
val intended_columns = List("Sell Price","Buy Price", "Unit")

val sumExpr = intended_columns.map(col => sum(col).as(col + "_sum"))
val avgExpr = intended_columns.map(col => avg(col).as(col + "_avg"))
val minExpr = intended_columns.map(col => min(col).as(col + "_min"))
val maxExpr = intended_columns.map(col => max(col).as(col + "_max"))

val aggExpression = sumExpr ++ avgExpr ++ minExpr ++ maxExpr

val agg_df_v2 = DF.groupBy("Asset", "Platform", "Trade Name")
.agg(aggExpression.head, aggExpression.tail: _*)


agg_df_v2.printSchema

root
 |-- Asset: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Trade Name: string (nullable = true)
 |-- Sell Price_sum: double (nullable = true)
 |-- Buy Price_sum: double (nullable = true)
 |-- Unit_sum: double (nullable = true)
 |-- Sell Price_avg: double (nullable = true)
 |-- Buy Price_avg: double (nullable = true)
 |-- Unit_avg: double (nullable = true)
 |-- Sell Price_min: double (nullable = true)
 |-- Buy Price_min: double (nullable = true)
 |-- Unit_min: double (nullable = true)
 |-- Sell Price_max: double (nullable = true)
 |-- Buy Price_max: double (nullable = true)
 |-- Unit_max: double (nullable = true)



intended_columns: List[String] = List(Sell Price, Buy Price, Unit)
sumExpr: List[org.apache.spark.sql.Column] = List(sum(Sell Price) AS `Sell Price_sum`, sum(Buy Price) AS `Buy Price_sum`, sum(Unit) AS Unit_sum)
avgExpr: List[org.apache.spark.sql.Column] = List(avg(Sell Price) AS `Sell Price_avg`, avg(Buy Price) AS `Buy Price_avg`, avg(Unit) AS Unit_avg)
minExpr: List[org.apache.spark.sql.Column] = List(min(Sell Price) AS `Sell Price_min`, min(Buy Price) AS `Buy Price_min`, min(Unit) AS Unit_min)
maxExpr: List[org.apache.spark.sql.Column] = List(max(Sell Price) AS `Sell Price_max`, max(Buy Price) AS `Buy Price_max`, max(Unit) AS Unit_max)
aggExpression: List[org.apache.spark.sql.Column] = List(sum(Sell Price) AS `Sell Price_sum`, sum(Buy Price) AS `Buy Price_sum`, sum(Unit) AS Unit...


We first define the list of columns we want to aggregate, i.e., Sell Price, Buy Price, and Unit. Then we define the expressions. For example, sumExpr iterates over all the columns in intended_columns, sums each column, and renames the result. Afterward, we build aggExpression as the concatenation of all the expression lists. Because agg() takes one Column followed by a variable number of further Columns, we pass aggExpression.head first and then expand aggExpression.tail with “: _*”.

The full output is too wide to print here, so only the schema is shown above.



So, you can imagine if we have a lot of columns, how much flexibility this approach can give us. We can also add the same flexibility to the list of column names in the groupBy expression.
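
The same idea applied to the grouping columns can be sketched as follows. The lists groupColumns and valueColumns are hypothetical names, the three sample rows are made up, and a local session is assumed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum}

// Assumption: local session; the app name is a placeholder
val spark = SparkSession.builder
  .appName("DynamicGroupBy")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val tradesDF = Seq(
  ("Stock", "Plus500", 160.0),
  ("Stock", "Plus500", 3368.0),
  ("Stock", "eToro", 180.0)
).toDF("Asset", "Platform", "Buy Price")

// Hypothetical lists; in practice they could come from configuration
val groupColumns = List("Asset", "Platform")
val valueColumns = List("Buy Price")

val sums = valueColumns.map(name => sum(name).as(name + "_sum"))

// groupBy accepts Column varargs, so the names are mapped to columns and expanded
val groupedDF = tradesDF
  .groupBy(groupColumns.map(col): _*)
  .agg(sums.head, sums.tail: _*)

groupedDF.show()
```

Changing either list changes the grouping or the aggregated columns without touching the rest of the code.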

## Spark Join
You can perform joins in two ways, each shown below with an example.

We saw these two DataFrames in previous sections:

In [33]:
val numberOfTradesPerPlatform = DF.groupBy("Asset", "Platform")
.agg(count("*").as("NumberOfTradesPerAssetPPlatForm"))



val numberOfAssets = DF.groupBy("Asset","Platform")
.agg(countDistinct("Trade Name").as("Number of Assets"))

numberOfTradesPerPlatform: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 1 more field]
numberOfAssets: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 1 more field]


First approach for join using Seq()

In [34]:
// First approach for join

val joinedDF = numberOfTradesPerPlatform
.join(numberOfAssets, Seq("Asset","Platform"))

joinedDF.show()

+-----+--------+-------------------------------+----------------+
|Asset|Platform|NumberOfTradesPerAssetPPlatForm|Number of Assets|
+-----+--------+-------------------------------+----------------+
|Stock|   eToro|                              2|               2|
|Stock| Plus500|                              6|               5|
+-----+--------+-------------------------------+----------------+



joinedDF: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 2 more fields]
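
The Seq-based join above is an inner join. The same method also takes a third argument that selects the join type. The sketch below shows a left join on two small, made-up DataFrames; the Fee column and the platformFees table are hypothetical, and a local session is assumed.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local session; the app name is a placeholder
val spark = SparkSession.builder
  .appName("JoinTypes")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val tradeCounts = Seq(("Stock", "Plus500", 6L), ("Stock", "eToro", 2L))
  .toDF("Asset", "Platform", "Trades")

// Hypothetical lookup table that has no row for eToro
val platformFees = Seq(("Stock", "Plus500", 0.5))
  .toDF("Asset", "Platform", "Fee")

// The third argument selects the join type; without it the join is an inner join
val leftJoined = tradeCounts.join(platformFees, Seq("Asset", "Platform"), "left")

leftJoined.show()
```

With a left join, the eToro row is kept and its Fee is null; an inner join would drop it.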


Second Approach for join

In [35]:
// Second approach for join

val joinedDF2 = numberOfTradesPerPlatform.join(numberOfAssets, 
numberOfTradesPerPlatform.col("Asset") === numberOfAssets.col("Asset") 
&& numberOfTradesPerPlatform.col("Platform") === numberOfAssets.col("Platform"))
joinedDF2.show()

+-----+--------+-------------------------------+-----+--------+----------------+
|Asset|Platform|NumberOfTradesPerAssetPPlatForm|Asset|Platform|Number of Assets|
+-----+--------+-------------------------------+-----+--------+----------------+
|Stock|   eToro|                              2|Stock|   eToro|               2|
|Stock| Plus500|                              6|Stock| Plus500|               5|
+-----+--------+-------------------------------+-----+--------+----------------+



joinedDF2: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 4 more fields]


In the second approach, the join columns appear twice in the result, once from each side. One way to avoid this is to rename them on one side before joining and drop the renamed copies afterwards:

In [None]:
// First renaming the columns in join and creating a new DF

val numberOfAssets_renamed = numberOfAssets
.withColumnRenamed("Asset","AssetV2")
.withColumnRenamed("Platform","PlatformV2")

// performing the join with the new DF and dropping the renamed columns

val joinedDF3 = numberOfTradesPerPlatform
.join(numberOfAssets_renamed, 
numberOfTradesPerPlatform.col("Asset") === numberOfAssets_renamed.col("AssetV2") 
&& numberOfTradesPerPlatform.col("Platform") === numberOfAssets_renamed.col("PlatformV2"))
.drop("AssetV2","PlatformV2")

joinedDF3.show()

In the code above, the columns “Asset” and “Platform” are first renamed to “AssetV2” and “PlatformV2”. The renamed DataFrame is then used for the join, and finally the renamed columns are dropped, so the result no longer contains duplicate columns.

#### Window function: an alternative to groupBy()
A window function performs a calculation over a partition of the data and adds the result to every row of the DataFrame. In other words, instead of doing a groupBy/aggregation and joining the result back to the original DataFrame, you can get the same result in one step with a window function. Let us look at an example:

In [36]:
import org.apache.spark.sql.expressions.Window

val custom_partition = Window.partitionBy($"Asset", $"Platform", $"Trade Name")

val resulting_df = DF.withColumn("sum_buy_price", 
sum("Buy Price").over(custom_partition))
.withColumn("sum_sell_price", sum("Sell Price").over(custom_partition))
.withColumn("min_sell_price", min("Sell Price").over(custom_partition))
.withColumn("min_buy_price", min("Buy Price").over(custom_partition))
.withColumn("avg_sell_price", avg("Sell Price").over(custom_partition))
.withColumn("avg_buy_price", avg("Buy Price").over(custom_partition))


resulting_df.show()

+-----+--------+------+----------+----------+---------+----------+-------------+--------------+--------------+-------------+--------------+-------------+
|Asset|Platform|  Unit|Trade Name|  Buy Date|Buy Price|Sell Price|sum_buy_price|sum_sell_price|min_sell_price|min_buy_price|avg_sell_price|avg_buy_price|
+-----+--------+------+----------+----------+---------+----------+-------------+--------------+--------------+-------------+--------------+-------------+
|Stock| Plus500|0.5938|    Amazon|01/11/2021|   3368.0|    4000.0|       3528.0|        4200.0|         200.0|        160.0|        2100.0|       1764.0|
|Stock| Plus500|  0.68|    Amazon|01/11/2021|    160.0|     200.0|       3528.0|        4200.0|         200.0|        160.0|        2100.0|       1764.0|
|Stock| Plus500|  0.01|   Bitcoin|01/11/2021|  45000.0|   48000.0|      45000.0|       48000.0|       48000.0|      45000.0|       48000.0|      45000.0|
|Stock| Plus500|   0.6|  Facebook|01/11/2021|    160.0|     200.0|        16

import org.apache.spark.sql.expressions.Window
custom_partition: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@4c2132a5
resulting_df: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 11 more fields]


The output above looks like the result of a groupBy/aggregation joined back to the original DataFrame. Let us check this by performing the groupBy/aggregation and the join explicitly and comparing the results:

In [37]:

val groupBy_aggregation_df = DF.groupBy($"Asset", $"Platform", $"Trade Name")
.agg(sum("Buy Price").as("sum_buy_price"), 
sum("Sell Price").as("sum_sell_price"), 
min("Buy Price").as("min_buy_price"), 
min("Sell Price").as("min_sell_price"), 
avg("Sell Price").as("avg_sell_price"),
avg("Buy Price").as("avg_buy_price"))


val joined_back_df = DF.join(groupBy_aggregation_df, 
Seq("Asset", "Platform","Trade Name"))

joined_back_df.show()

+-----+--------+----------+------+----------+---------+----------+-------------+--------------+-------------+--------------+--------------+-------------+
|Asset|Platform|Trade Name|  Unit|  Buy Date|Buy Price|Sell Price|sum_buy_price|sum_sell_price|min_buy_price|min_sell_price|avg_sell_price|avg_buy_price|
+-----+--------+----------+------+----------+---------+----------+-------------+--------------+-------------+--------------+--------------+-------------+
|Stock| Plus500|    Amazon|  0.68|01/11/2021|    160.0|     200.0|       3528.0|        4200.0|        160.0|         200.0|        2100.0|       1764.0|
|Stock| Plus500|    Amazon|0.5938|01/11/2021|   3368.0|    4000.0|       3528.0|        4200.0|        160.0|         200.0|        2100.0|       1764.0|
|Stock| Plus500|  Facebook|   0.6|01/11/2021|    160.0|     200.0|        160.0|         200.0|        160.0|         200.0|         200.0|        160.0|
|Stock|   eToro|  Facebook|   1.5|12/11/2021|    180.0|     250.0|        18

groupBy_aggregation_df: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 7 more fields]
joined_back_df: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 11 more fields]


#### Spark orderBy()
After a groupBy and aggregation, you often want to see the result ordered by a specific column. Sorting is expensive because it has to shuffle the data across partitions, so it works best when the DataFrame resulting from the groupBy is not too large. It is also possible to apply orderBy() to the original DataFrame, but for the same reason this is not recommended when that DataFrame is huge. Let us apply orderBy() on the DF:

In [None]:
DF.orderBy(col("Buy Price").desc).show()

It is also possible to order by multiple columns, and to use $ instead of col(), like the following:

In [38]:
DF.orderBy(col("Buy Price").desc, $"Sell Price".desc).show()

+-----+--------+------+----------+----------+---------+----------+
|Asset|Platform|  Unit|Trade Name|  Buy Date|Buy Price|Sell Price|
+-----+--------+------+----------+----------+---------+----------+
|Stock| Plus500|  0.01|   Bitcoin|01/11/2021|  45000.0|   48000.0|
|Stock| Plus500|0.5938|    Amazon|01/11/2021|   3368.0|    4000.0|
|Stock|   eToro|   1.5|  Facebook|12/11/2021|    180.0|     250.0|
|Stock| Plus500|   0.6|  Facebook|01/11/2021|    160.0|     200.0|
|Stock| Plus500|  0.68|    Amazon|01/11/2021|    160.0|     200.0|
|Stock| Plus500| 0.065|  LinkedIn|12/11/2021|     80.0|     140.0|
|Stock|   eToro|   1.3|   Pfeizer|01/12/2021|     34.0|      85.5|
|Stock| Plus500|  0.08|      Sand|29/11/2021|      5.4|       8.9|
+-----+--------+------+----------+----------+---------+----------+



#### Rank function
A ranking function combined with a Window lets us rank the rows within each partition by a specific column. Let us define a window with partitionBy and orderBy, and number the rows in each partition with row_number(), like the following:

In [39]:
import org.apache.spark.sql.expressions.Window

val custom_partition_rank = Window.partitionBy($"Asset", $"Platform")
                            .orderBy($"Sell Price".desc)

val resulting_df = DF.withColumn("rank_col", 
                      row_number().over(custom_partition_rank))


resulting_df.show()

+-----+--------+------+----------+----------+---------+----------+--------+
|Asset|Platform|  Unit|Trade Name|  Buy Date|Buy Price|Sell Price|rank_col|
+-----+--------+------+----------+----------+---------+----------+--------+
|Stock| Plus500|  0.01|   Bitcoin|01/11/2021|  45000.0|   48000.0|       1|
|Stock| Plus500|0.5938|    Amazon|01/11/2021|   3368.0|    4000.0|       2|
|Stock| Plus500|   0.6|  Facebook|01/11/2021|    160.0|     200.0|       3|
|Stock| Plus500|  0.68|    Amazon|01/11/2021|    160.0|     200.0|       4|
|Stock| Plus500| 0.065|  LinkedIn|12/11/2021|     80.0|     140.0|       5|
|Stock| Plus500|  0.08|      Sand|29/11/2021|      5.4|       8.9|       6|
|Stock|   eToro|   1.5|  Facebook|12/11/2021|    180.0|     250.0|       1|
|Stock|   eToro|   1.3|   Pfeizer|01/12/2021|     34.0|      85.5|       2|
+-----+--------+------+----------+----------+---------+----------+--------+



import org.apache.spark.sql.expressions.Window
custom_partition_rank: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7dbd5764
resulting_df: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 6 more fields]


In the output above, the rows are ranked by Sell Price in descending order within each Asset and Platform partition.

You can keep only the rows with the ranks you are interested in. Below, we keep the rows with rank 1 or 2 using .filter(col("rank_col") <= 2):

In [40]:
import org.apache.spark.sql.expressions.Window

val custom_partition_rank = Window.partitionBy($"Asset", $"Platform")
                            .orderBy($"Sell Price".desc)

val resulting_df = DF.withColumn("rank_col", 
                  row_number().over(custom_partition_rank))
                  .filter(col("rank_col")<=2)


resulting_df.show()


+-----+--------+------+----------+----------+---------+----------+--------+
|Asset|Platform|  Unit|Trade Name|  Buy Date|Buy Price|Sell Price|rank_col|
+-----+--------+------+----------+----------+---------+----------+--------+
|Stock| Plus500|  0.01|   Bitcoin|01/11/2021|  45000.0|   48000.0|       1|
|Stock| Plus500|0.5938|    Amazon|01/11/2021|   3368.0|    4000.0|       2|
|Stock|   eToro|   1.5|  Facebook|12/11/2021|    180.0|     250.0|       1|
|Stock|   eToro|   1.3|   Pfeizer|01/12/2021|     34.0|      85.5|       2|
+-----+--------+------+----------+----------+---------+----------+--------+



import org.apache.spark.sql.expressions.Window
custom_partition_rank: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@5e05b35d
resulting_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Asset: string, Platform: string ... 6 more fields]
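
The examples above use row_number(), which always assigns consecutive numbers, even when two rows tie on the ordering column (as the two 200.0 Sell Price rows on Plus500 do). The following is a minimal sketch comparing it with rank() and dense_rank(). The mini DataFrame `trades` and the window name `byPlatform` are assumptions made for this illustration and are not part of the original DF:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{dense_rank, rank, row_number}

// Reuses the existing session in a notebook, or creates a local one
val spark = SparkSession.builder.master("local[1]").appName("RankSketch").getOrCreate()
import spark.implicits._

// Hypothetical mini data set with a tie on "Sell Price" (two rows at 200.0)
val trades = Seq(
  ("Plus500", "Amazon", 4000.0),
  ("Plus500", "Facebook", 200.0),
  ("Plus500", "Amazon", 200.0),
  ("Plus500", "LinkedIn", 140.0)
).toDF("Platform", "Trade Name", "Sell Price")

val byPlatform = Window.partitionBy($"Platform").orderBy($"Sell Price".desc)

val ranked = trades
  .withColumn("row_number", row_number().over(byPlatform)) // consecutive numbers, ties broken arbitrarily
  .withColumn("rank", rank().over(byPlatform))             // ties share a rank, the next rank is skipped
  .withColumn("dense_rank", dense_rank().over(byPlatform)) // ties share a rank, no gap afterwards

ranked.show()
```

If ties should be treated equally when filtering for the top rows, rank() or dense_rank() is usually the safer choice, since row_number() picks one of the tied rows arbitrarily.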


#### Pivoting in Spark
Pivoting is a very useful operation in Spark, and you will probably need to pivot your data at some point. I have written a dedicated post regarding pivoting in Spark. Below, I briefly show an example of pivoting on our example DF:

In [41]:
val dfPrivot = DF.groupBy("Asset", "Trade Name").pivot("Platform",List("Plus500", "eToro"))
.agg(sum("Buy Price").as("sum_buy_price"), sum("Sell Price").as("sum_sell_price"))

dfPrivot.show()

+-----+----------+---------------------+----------------------+-------------------+--------------------+
|Asset|Trade Name|Plus500_sum_buy_price|Plus500_sum_sell_price|eToro_sum_buy_price|eToro_sum_sell_price|
+-----+----------+---------------------+----------------------+-------------------+--------------------+
|Stock|   Pfeizer|                 null|                  null|               34.0|                85.5|
|Stock|  Facebook|                160.0|                 200.0|              180.0|               250.0|
|Stock|   Bitcoin|              45000.0|               48000.0|               null|                null|
|Stock|  LinkedIn|                 80.0|                 140.0|               null|                null|
|Stock|    Amazon|               3528.0|                4200.0|               null|                null|
|Stock|      Sand|                  5.4|                   8.9|               null|                null|
+-----+----------+---------------------+----------------------+-------------------+--------------------+

dfPrivot: org.apache.spark.sql.DataFrame = [Asset: string, Trade Name: string ... 4 more fields]


In the above, pivot() is called after groupBy() and before agg(). Its inputs are the name of the column to pivot, i.e., Platform, and the list of values in that column that should become new columns, here Plus500 and eToro.
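
The list of pivot values is optional. As a minimal sketch, assuming a small hypothetical DataFrame `trades` rather than the original DF, pivot() can also be called with only the column name, in which case Spark first has to compute the distinct values of that column itself:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

// Reuses the existing session in a notebook, or creates a local one
val spark = SparkSession.builder.master("local[1]").appName("PivotSketch").getOrCreate()
import spark.implicits._

// Hypothetical mini data set, for illustration only
val trades = Seq(
  ("Facebook", "Plus500", 160.0),
  ("Facebook", "eToro", 180.0),
  ("Amazon", "Plus500", 3368.0),
  ("Amazon", "Plus500", 160.0)
).toDF("Trade Name", "Platform", "Buy Price")

// No value list: one output column per distinct value found in "Platform"
val pivoted = trades
  .groupBy("Trade Name")
  .pivot("Platform")
  .agg(sum("Buy Price"))

pivoted.show()
```

Passing the values explicitly, as in the cell above, avoids that extra pass over the data and also fixes the order of the resulting columns.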

#### UDF in Spark
UDFs (user-defined functions) let us apply a custom function to the columns of a Spark DataFrame. Note that Spark treats a UDF as a black box that its optimiser cannot look into, so built-in functions are preferable whenever they can express the same logic. To define a UDF, we need to a. import org.apache.spark.sql.functions.udf, b. define our own function, c. wrap our function inside udf(_), and d. apply the UDF to create a new column. Let us see how to use a UDF in Spark with the following simple example:

In [42]:
import org.apache.spark.sql.functions._

// Defining a simple multiplyByTwo function

def multiplyByTwo(x: Float) : Float =  x*2

// Wrap the function inside the udf( function_name _)

val multiplyByTwoUDF = udf(multiplyByTwo _)

// Apply the UDF to our intended column names
val dfDoubled = DF.withColumn("buy_price_doubled", 
                  multiplyByTwoUDF(col("Buy Price")) )
                  .withColumn("sell_price_doubled", 
                    multiplyByTwoUDF(col("Sell Price")))
dfDoubled.show()

+-----+--------+------+----------+----------+---------+----------+-----------------+------------------+
|Asset|Platform|  Unit|Trade Name|  Buy Date|Buy Price|Sell Price|buy_price_doubled|sell_price_doubled|
+-----+--------+------+----------+----------+---------+----------+-----------------+------------------+
|Stock| Plus500|0.5938|    Amazon|01/11/2021|   3368.0|    4000.0|           6736.0|            8000.0|
|Stock| Plus500|   0.6|  Facebook|01/11/2021|    160.0|     200.0|            320.0|             400.0|
|Stock| Plus500|  0.68|    Amazon|01/11/2021|    160.0|     200.0|            320.0|             400.0|
|Stock|   eToro|   1.5|  Facebook|12/11/2021|    180.0|     250.0|            360.0|             500.0|
|Stock| Plus500| 0.065|  LinkedIn|12/11/2021|     80.0|     140.0|            160.0|             280.0|
|Stock|   eToro|   1.3|   Pfeizer|01/12/2021|     34.0|      85.5|             68.0|             171.0|
|Stock| Plus500|  0.01|   Bitcoin|01/11/2021|  45000.0|   48000.

import org.apache.spark.sql.functions._
multiplyByTwo: (x: Float)Float
multiplyByTwoUDF: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$5632/1813232473@4befa70f,FloatType,List(Some(class[value[0]: float])),Some(class[value[0]: float]),None,false,true)
dfDoubled: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 7 more fields]


Let us look at a more complicated example that takes multiple inputs, uses a slightly more complex function, and refers to the columns with the $ sign:

In [43]:
def custom_function(unit: Float, buy: Float, sell: Float) : Float = {
    
    val diff = (sell - buy ) 
    
    val res =  unit * diff
    
    res
}

val custom_udf = udf(custom_function _)

val dfUDFExample = DF.withColumn("profit", 
custom_udf($"Unit", $"Buy Price", $"Sell Price"))

dfUDFExample.show()

+-----+--------+------+----------+----------+---------+----------+----------+
|Asset|Platform|  Unit|Trade Name|  Buy Date|Buy Price|Sell Price|    profit|
+-----+--------+------+----------+----------+---------+----------+----------+
|Stock| Plus500|0.5938|    Amazon|01/11/2021|   3368.0|    4000.0| 375.28162|
|Stock| Plus500|   0.6|  Facebook|01/11/2021|    160.0|     200.0|      24.0|
|Stock| Plus500|  0.68|    Amazon|01/11/2021|    160.0|     200.0|      27.2|
|Stock|   eToro|   1.5|  Facebook|12/11/2021|    180.0|     250.0|     105.0|
|Stock| Plus500| 0.065|  LinkedIn|12/11/2021|     80.0|     140.0| 3.8999999|
|Stock|   eToro|   1.3|   Pfeizer|01/12/2021|     34.0|      85.5|     66.95|
|Stock| Plus500|  0.01|   Bitcoin|01/11/2021|  45000.0|   48000.0|      30.0|
|Stock| Plus500|  0.08|      Sand|29/11/2021|      5.4|       8.9|0.27999994|
+-----+--------+------+----------+----------+---------+----------+----------+



custom_function: (unit: Float, buy: Float, sell: Float)Float
custom_udf: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$5667/2045681611@1b8e7893,FloatType,List(Some(class[value[0]: float]), Some(class[value[0]: float]), Some(class[value[0]: float])),Some(class[value[0]: float]),None,false,true)
dfUDFExample: org.apache.spark.sql.DataFrame = [Asset: string, Platform: string ... 6 more fields]


This function has three inputs, so the corresponding UDF also accepts three columns. The resulting profit column is shown in the output above.
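
For simple arithmetic like the profit calculation, a UDF is not strictly needed. The following is a minimal sketch of the same formula written with built-in column expressions. The mini DataFrame `trades` with Double columns is an assumption made for this illustration and is not the original DF:

```scala
import org.apache.spark.sql.SparkSession

// Reuses the existing session in a notebook, or creates a local one
val spark = SparkSession.builder.master("local[1]").appName("ColumnExprSketch").getOrCreate()
import spark.implicits._

// Hypothetical mini data set, for illustration only
val trades = Seq(
  (1.5, 180.0, 250.0),
  (0.5, 100.0, 120.0)
).toDF("Unit", "Buy Price", "Sell Price")

// Same formula as custom_function: unit * (sell - buy), expressed on Columns
val withProfit = trades.withColumn("profit",
  $"Unit" * ($"Sell Price" - $"Buy Price"))

withProfit.show()
```

Because the expression is built from Spark's own column operators, it stays visible to the query optimiser, whereas the body of a UDF does not.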



#### Spark SQL
It is possible to write SQL queries and still take advantage of Spark's distributed computing capability. To do so, we create a temporary view from the DataFrame and run the SQL query against that view. Let us show the usage of Spark SQL with our DataFrame DF:

In [49]:
DF.createOrReplaceTempView("DF_View")
val resulting_df = spark.sql(""" SELECT `Platform`, 
`Trade Name`, sum(`Buy Price`) AS sum_buy_price, 
sum(`Sell Price`) AS sum_sell_price FROM DF_View 
group by Platform, `Trade Name` """)

resulting_df.show()

+--------+----------+-------------+--------------+
|Platform|Trade Name|sum_buy_price|sum_sell_price|
+--------+----------+-------------+--------------+
| Plus500|    Amazon|       3528.0|        4200.0|
| Plus500|  Facebook|        160.0|         200.0|
|   eToro|  Facebook|        180.0|         250.0|
| Plus500|  LinkedIn|         80.0|         140.0|
|   eToro|   Pfeizer|         34.0|          85.5|
| Plus500|   Bitcoin|      45000.0|       48000.0|
| Plus500|      Sand|          5.4|           8.9|
+--------+----------+-------------+--------------+



resulting_df: org.apache.spark.sql.DataFrame = [Platform: string, Trade Name: string ... 2 more fields]


#### Spark Input and Output
Here is the code for reading and writing Parquet and CSV files in Spark.

Various approaches for reading Parquet files

In [50]:
// file_path and path_list are placeholders; define them before running this cell

// Reading a single Parquet path
val df_ = spark.read.parquet(file_path)

// Reading a list of paths of Parquet files with the same schema
val df = spark.read.parquet(path_list:_*)

<console>: 34: error: not found: value file_path

Various approaches for reading CSV files

In [None]:
// file_path, folder_path, path_list and schema are placeholders; define them before running this cell

// Reading a CSV file using a map of options
val df_csv_v1 = spark.read.options(Map("inferSchema" -> "true", "delimiter" -> ","))
  .csv(file_path)

// Reading all CSV files in a single folder
val df_csv_v2 = spark.read.options(Map("inferSchema" -> "true", "delimiter" -> ","))
  .csv(folder_path)

// Reading a list of file paths with the same schema
val df_csv_v3 = spark.read.options(Map("inferSchema" -> "true", "delimiter" -> ","))
  .csv(path_list: _*)

// An example with a single option
val df_csv_v4 = spark.read.option("delimiter", ",").csv(file_path)

// Using option() sequentially
val df_csv_v5 = spark.read.option("delimiter", ",").option("inferSchema", "true")
  .csv(file_path)

// Reading a CSV file with a specific schema (a StructType)
val df_csv_v6 = spark.read.format("csv").option("header", "true")
  .schema(schema)
  .load(file_path)
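
The last variant above expects a schema value that has to be defined beforehand. The following is a minimal sketch of how such a schema could look. The column names, the types and the temporary CSV file are assumptions chosen for this illustration so that the snippet runs on its own:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Reuses the existing session in a notebook, or creates a local one
val spark = SparkSession.builder.master("local[1]").appName("CsvSchemaSketch").getOrCreate()

// Hypothetical schema: one StructField per CSV column, in file order
val schema = StructType(Seq(
  StructField("Platform", StringType, nullable = true),
  StructField("Trade Name", StringType, nullable = true),
  StructField("Buy Price", DoubleType, nullable = true),
  StructField("Sell Price", DoubleType, nullable = true)
))

// Write a tiny CSV file to a temporary location so the sketch is runnable as-is
val csvPath = Files.createTempFile("trades", ".csv")
Files.write(csvPath,
  "Platform,Trade Name,Buy Price,Sell Price\neToro,Facebook,180.0,250.0\n".getBytes("UTF-8"))

// With an explicit schema Spark does not need to infer the column types
val typedDf = spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .load(csvPath.toString)

typedDf.printSchema()
typedDf.show()
```

Supplying the schema avoids the extra pass over the file that inferSchema needs and guarantees the column types you expect.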

#### Writing Parquet and CSV Files


In [None]:
// folder_path, num_partitions and col_name are placeholders; define them before running this cell

// Writes with the current number of partitions of DF
DF.write.parquet(folder_path)

// Setting the save mode and reducing the number of partitions using coalesce()
DF.coalesce(num_partitions).write.mode("append").parquet(folder_path)

// Using partitionBy
DF.write.mode("append").partitionBy("col_name").parquet(folder_path)

// Writing CSV files
DF.write.csv(folder_path)
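
To see the write and read calls working together, here is a minimal round-trip sketch. The mini DataFrame `trades`, its underscore column names and the temporary output folder are assumptions made for this illustration. The underscore names are deliberate, since some Spark versions reject Parquet column names that contain spaces:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Reuses the existing session in a notebook, or creates a local one
val spark = SparkSession.builder.master("local[1]").appName("ParquetRoundTripSketch").getOrCreate()
import spark.implicits._

// Hypothetical mini data set, for illustration only
val trades = Seq(
  ("Plus500", "Amazon", 4000.0),
  ("Plus500", "Facebook", 200.0),
  ("eToro", "Facebook", 250.0)
).toDF("platform", "trade_name", "sell_price")

// Temporary output folder so the sketch does not touch real data
val outDir = Files.createTempDirectory("trades_parquet").resolve("out").toString

// One sub-folder per distinct value of "platform" (platform=Plus500, platform=eToro)
trades.write.mode("overwrite").partitionBy("platform").parquet(outDir)

// Reading the folder back restores the partition column from the folder names
val readBack = spark.read.parquet(outDir)
readBack.show()
```

After reading back, the partition column typically appears as the last column, so select the columns explicitly if their order matters.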