## **PYSPARK MEMORY MANAGEMENT**

In [1]:
import pandas as pd

APP_NAME = 'pyspark_python'
MASTER = 'local[*]'
from pyspark import SparkConf
from pyspark.sql import SparkSession


conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster(MASTER)
spark = SparkSession.builder.config(conf = conf).getOrCreate()
sc = spark.sparkContext

In [2]:
import datetime

# load data
modelDataFile = "../data/bank-transactions/trans.asc"

CV_data = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .option("delimiter", ";") \
  .load(modelDataFile)

CV_data.count()

1056320

In [3]:
CV_data.show(5)

+--------+----------+------+------+---------+------+-------+--------+----+-------+
|trans_id|account_id|  date|  type|operation|amount|balance|k_symbol|bank|account|
+--------+----------+------+------+---------+------+-------+--------+----+-------+
|  695247|      2378|930101|PRIJEM|    VKLAD| 700.0|  700.0|    null|null|   null|
|  171812|       576|930101|PRIJEM|    VKLAD| 900.0|  900.0|    null|null|   null|
|  207264|       704|930101|PRIJEM|    VKLAD|1000.0| 1000.0|    null|null|   null|
| 1117247|      3818|930101|PRIJEM|    VKLAD| 600.0|  600.0|    null|null|   null|
|  579373|      1972|930102|PRIJEM|    VKLAD| 400.0|  400.0|    null|null|   null|
+--------+----------+------+------+---------+------+-------+--------+----+-------+
only showing top 5 rows



In [4]:
# change types of the data
from pyspark.sql.functions import col , column
from pyspark.sql.functions import unix_timestamp, from_unixtime

CV_data = CV_data.withColumn("date", col("date").cast("string"))
#transform string to date
from pyspark.sql.functions import to_date
CV_data = CV_data.withColumn("date_time", to_date(from_unixtime(unix_timestamp('date', 'yymmdd'))))

In [5]:
#-----------------------
#only year
#-----------------------
#import pyspark.sql.functions as f
#CV_data = CV_data.withColumn('year',f.year(f.to_timestamp('date', 'dd/MM/yyyy')))
#-----------------------
# last day of the month
# ----------------------
from pyspark.sql.functions import last_day
CV_data = CV_data.withColumn('partition_id', last_day(CV_data.date_time))
CV_data.show(3)

+--------+----------+------+------+---------+------+-------+--------+----+-------+----------+------------+
|trans_id|account_id|  date|  type|operation|amount|balance|k_symbol|bank|account| date_time|partition_id|
+--------+----------+------+------+---------+------+-------+--------+----+-------+----------+------------+
|  695247|      2378|930101|PRIJEM|    VKLAD| 700.0|  700.0|    null|null|   null|1993-01-01|  1993-01-31|
|  171812|       576|930101|PRIJEM|    VKLAD| 900.0|  900.0|    null|null|   null|1993-01-01|  1993-01-31|
|  207264|       704|930101|PRIJEM|    VKLAD|1000.0| 1000.0|    null|null|   null|1993-01-01|  1993-01-31|
+--------+----------+------+------+---------+------+-------+--------+----+-------+----------+------------+
only showing top 3 rows



We'll partitionated this data to year-month to be saved and loaded. Clearly, we have incompleted data (just transactions for january), but this will be useful anyway.

In [6]:
CV_data.groupBy('partition_id').count().sort('partition_id').show(40)

+------------+------+
|partition_id| count|
+------------+------+
|  1993-01-31| 28205|
|  1994-01-31| 91628|
|  1995-01-31|133022|
|  1996-01-31|196779|
|  1997-01-31|284409|
|  1998-01-31|322277|
+------------+------+



## **Save and load partitioned tables**

Sometimes the memory use of some calculations would be excesive, so it would be necessary to write intermediate tables, that allow us to execute the process taking care of the memory. For this we can use *save parquet*, with the option *partitionBy* to save time when we are interested only in specific parts of the tables.

In [7]:
## save to parquet  
CV_data.write\
.mode("overwrite").partitionBy("partition_id").parquet('data/partitioned_data')

Note that we partitionate the data using the last day of the month to represent an entire moth. In this way we can read and modify specific partitions without to load all the table.

![alt text](fig/partition_by_date.png)

## **Load tables**

In this way we can load the entire table, that would be inefficient in many cases.

In [8]:
CV_data2 = spark.read.parquet('data/partitioned_data')
CV_data2.show(5)

+--------+----------+------+------+---------+------+-------+--------+----+-------+----------+------------+
|trans_id|account_id|  date|  type|operation|amount|balance|k_symbol|bank|account| date_time|partition_id|
+--------+----------+------+------+---------+------+-------+--------+----+-------+----------+------------+
| 3626680|      2907|980930|PRIJEM|     null|  85.1|22113.0|    UROK|null|   null|1998-01-30|  1998-01-31|
| 3627646|      2936|980930|PRIJEM|     null| 191.3|46995.0|    UROK|null|   null|1998-01-30|  1998-01-31|
| 3626619|      2906|980930|PRIJEM|     null|  98.5|20621.5|    UROK|null|   null|1998-01-30|  1998-01-31|
| 3627367|      2929|980930|PRIJEM|     null| 335.5|63704.6|    UROK|null|   null|1998-01-30|  1998-01-31|
| 3627279|      2927|980930|PRIJEM|     null| 420.0|56245.5|    UROK|null|   null|1998-01-30|  1998-01-31|
+--------+----------+------+------+---------+------+-------+--------+----+-------+----------+------------+
only showing top 5 rows



Or we can load an specific partition to load the data. In this case the colunm *partition_id* is not loaded.

In [9]:
CV_data_01_98 = spark.read.parquet('data/partitioned_data/' + 'partition_id=1998-01-31')
CV_data_01_98.show(5)

+--------+----------+------+------+---------+------+-------+--------+----+-------+----------+
|trans_id|account_id|  date|  type|operation|amount|balance|k_symbol|bank|account| date_time|
+--------+----------+------+------+---------+------+-------+--------+----+-------+----------+
| 3626680|      2907|980930|PRIJEM|     null|  85.1|22113.0|    UROK|null|   null|1998-01-30|
| 3627646|      2936|980930|PRIJEM|     null| 191.3|46995.0|    UROK|null|   null|1998-01-30|
| 3626619|      2906|980930|PRIJEM|     null|  98.5|20621.5|    UROK|null|   null|1998-01-30|
| 3627367|      2929|980930|PRIJEM|     null| 335.5|63704.6|    UROK|null|   null|1998-01-30|
| 3627279|      2927|980930|PRIJEM|     null| 420.0|56245.5|    UROK|null|   null|1998-01-30|
+--------+----------+------+------+---------+------+-------+--------+----+-------+----------+
only showing top 5 rows



**Other recomentations:**

* When we do transformations is better to select (using the select command) only those columns necessary for the operations, in order to reduce the size of the table.
* In the time to proceeed to joint columns, it is not necessary to use *sort()*, spark can handle the joint operation without this. So, we can save time of processing avoiding the transformation *sort()* .

## **Avoid collect command**

To see the data, sometimes is better to save it in parquet, as before you can partition the data, over variables or only the number of the rows. Alternatives to partitionate the data are *partition* or *coalesce*. Here, we partitionated in two and three parts to compare the time spent. The load it is not equal than the case of the command *partitionBy* explained above.

In [10]:
#time of execution 
timestart= datetime.datetime.now()

## Inefficient code ----------------------------------------------------------

result = CV_data.collect() #will cause driver to collect the results

## -----------------------------------------------------------------------------

# Calculation of the time
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print("Time require to run the model: " + str(timedelta) + " segundos")

Time require to run the model: 9.38 segundos


In [11]:
#time of execution for one partition
timestart= datetime.datetime.now()
#spark.conf.set("spark.sql.shuffle.partitions", "1")

## better code -------------------------------------------------
CV_data.repartition(2).write.mode("overwrite").csv("data/test_2.csv") 
## will assign 2 executors
## to collect the result. Assuming executor are better provisioned
## -----------------------------------------------------------------------

# Calculation of the time
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print("Time require to run the model: " + str(timedelta) + " segundos")

Time require to run the model: 3.31 segundos


![alt text](fig/two_partitions.png)

In [12]:
#time of execution for one partition
timestart= datetime.datetime.now()
#spark.conf.set("spark.sql.shuffle.partitions", "1")

## better code -------------------------------------------------
CV_data.coalesce(3).write.mode("overwrite").csv("data/test_3.csv") 
## will assign x executors
## to collect the result. Assuming executor are better provisioned
## -----------------------------------------------------------------------

# Calculation of the time
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print("Time require to run the model: " + str(timedelta) + " segundos")

Time require to run the model: 3.56 segundos


![alt text](fig/three_partitions.png)

## **Note**

You can use repartition along with selected variables that serves to group the results:

```
red_mov_with_customer.write\
.repartition(100,_PKEY_C2C_INFO_)
.parquet('red_mov')
```

where _PKEY_C2C_INFO_ is a list of variables

## **Avoid additional transformations**
Like in the case of *sort*, we have to avoid to use additional operations that delay the results. Thera are functions that spark give us that simplify many processes that we need.
For example row_number() over a window can help us to enumerate some cases.

In [13]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

CV_data2 = CV_data2.withColumn("row_number", F.row_number()\
                    .over(Window.partitionBy("account_id", "partition_id")\
                          .orderBy("partition_id")))
# we have to avoid pandas, but to see the results in this case is useful to see
#all the columns
CV_data2.toPandas() 

Unnamed: 0,trans_id,account_id,date,type,operation,amount,balance,k_symbol,bank,account,date_time,partition_id,row_number
0,3530735,10,970531,PRIJEM,,196.4,42470.4,UROK,,,1997-01-31,1997-01-31,1
1,2678,10,970608,VYDAJ,VYBER,23840.0,18615.8,,,,1997-01-08,1997-01-31,2
2,2622,10,970612,VYDAJ,PREVOD NA UCET,7033.0,11582.8,SIPO,UV,18686104.0,1997-01-12,1997-01-31,3
3,2586,10,970613,PRIJEM,VKLAD,26529.0,38111.8,,,,1997-01-13,1997-01-31,4
4,2766,10,970624,PRIJEM,VKLAD,700.0,38811.8,,,,1997-01-24,1997-01-31,5
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1056315,3410801,11320,950131,VYDAJ,VYBER,14.6,30698.8,SLUZBY,,,1995-01-31,1995-01-31,62
1056316,3529384,11320,950131,PRIJEM,,150.9,30713.4,UROK,,,1995-01-31,1995-01-31,63
1056317,3410689,11320,950207,PRIJEM,VKLAD,17279.0,47977.8,,,,1995-01-07,1995-01-31,64
1056318,3410868,11320,950215,VYDAJ,VYBER,10500.0,37477.8,,,,1995-01-15,1995-01-31,65


## **Persistence**

Spark DataFrames can be *saved* or *cached* in Spark memory with the persist(). The persist() commnad allows saving the DataFrame using different storage levels:

-------------------------------------------------------------------------
* MEMORY_ONLY: stores objects in the Spark memory
* MEMORY_AND_DISK: stores serialized objects in the Spark memory
* DISK_ONLY: stores the data on the local disk
--------------------------------------------------------------------------
Ex.

.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)

This is useful when you will use the results of an specific data.frame to get different data.frames, so in order to not repeat the same operations several times you cache or persist the original data.frame.

**For example:**
We want to keep only the last transaction of each day for each account, to later apply other transfomation to the data.frame. 

In [14]:
CV_data_0 = spark.read.parquet('data/partitioned_data/')
CV_transf = CV_data_0.select(['date_time', 'trans_id', 'account_id', 'balance', 'partition_id'])\
.sort("account_id", "date_time", "trans_id")\
.drop_duplicates(subset=['date_time', 'account_id'])\
.sort("account_id", "date_time", "trans_id")

CV_transf.show(5)

+----------+--------+----------+-------+------------+
| date_time|trans_id|account_id|balance|partition_id|
+----------+--------+----------+-------+------------+
|1995-01-05|      58|         1|19035.3|  1995-01-31|
|1995-01-13|       5|         1| 4679.0|  1995-01-31|
|1995-01-19|     206|         1|19821.1|  1995-01-31|
|1995-01-20|     204|         1|22014.3|  1995-01-31|
|1995-01-21|     203|         1|21402.7|  1995-01-31|
+----------+--------+----------+-------+------------+
only showing top 5 rows



In [15]:
CV_transf.explain()

== Physical Plan ==
*(4) Sort [account_id#404 ASC NULLS FIRST, date_time#413 ASC NULLS FIRST, trans_id#403 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(account_id#404 ASC NULLS FIRST, date_time#413 ASC NULLS FIRST, trans_id#403 ASC NULLS FIRST, 200)
   +- *(3) HashAggregate(keys=[date_time#413, account_id#404], functions=[first(trans_id#403, false), first(balance#409, false), first(partition_id#414, false)])
      +- Exchange hashpartitioning(date_time#413, account_id#404, 200)
         +- *(2) HashAggregate(keys=[date_time#413, account_id#404], functions=[partial_first(trans_id#403, false), partial_first(balance#409, false), partial_first(partition_id#414, false)])
            +- *(2) Sort [account_id#404 ASC NULLS FIRST, date_time#413 ASC NULLS FIRST, trans_id#403 ASC NULLS FIRST], true, 0
               +- Exchange rangepartitioning(account_id#404 ASC NULLS FIRST, date_time#413 ASC NULLS FIRST, trans_id#403 ASC NULLS FIRST, 200)
                  +- *(1) Project [date_tim

If you want to save the result to a (1) database and later (2) apply some additional transformation, then each time you would repeat the operations above.

## **Ineficient code**

In [16]:
#time of execution 
timestart= datetime.datetime.now()

## Inefficient code (1)----------------------------------------------------------
CV_transf.write\
.mode("overwrite").partitionBy("partition_id").parquet('data/partitioned_data_2')
## -----------------------------------------------------------------------------

# Calculation of the time
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print("Time require to run the model: " + str(timedelta) + " segundos")

Time require to run the model: 20.31 segundos


In [17]:
#time of execution 
timestart= datetime.datetime.now()

## Inefficient code (2)----------------------------------------------------------
CV_transf_2 = CV_transf\
.groupby(['trans_id', 'account_id', 'balance', 'partition_id'])\
.avg('balance')
## -----------------------------------------------------------------------------

# Calculation of the time
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print("Time require to run the model: " + str(timedelta) + " segundos")

Time require to run the model: 0.02 segundos


## **Efficient code**

In [18]:
CV_transf = CV_transf.persist()

In [19]:
#time of execution 
timestart= datetime.datetime.now()

## Efficient code (1)----------------------------------------------------------
CV_transf.write\
.mode("overwrite").partitionBy("partition_id")\
.parquet('data/partitioned_data_2')
## -----------------------------------------------------------------------------

# Calculation of the time
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print("Time require to run the model: " + str(timedelta) + " segundos")

Time require to run the model: 20.48 segundos


In [20]:
#time of execution 
timestart= datetime.datetime.now()

## Efficient code (2)----------------------------------------------------------
CV_transf_2 = CV_transf\
.groupby(['trans_id', 'account_id', 'balance', 'partition_id'])\
.avg('balance')
## -----------------------------------------------------------------------------

# Calculation of the time
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print("Time require to run the model: " + str(timedelta) + " segundos")

Time require to run the model: 0.01 segundos


In [21]:
## do not forget
CV_transf.unpersist()

DataFrame[date_time: date, trans_id: int, account_id: int, balance: double, partition_id: date]

## **Plan of execution**

There is not difference between the order of filter or select looking the plan of execution of spark.

In [22]:
import pyspark.sql.functions as f
CV = spark.read.parquet('data/partitioned_data/')
df1 = CV.select('trans_id', 'account_id', 'amount', 'balance', 'partition_id')\
.filter(f.col('account_id')== '2907')
df2 = CV.filter(f.col('account_id')== '2907')\
.select('trans_id', 'account_id', 'amount', 'balance', 'partition_id')

In [23]:
df1.explain()

== Physical Plan ==
*(1) Project [trans_id#628, account_id#629, amount#633, balance#634, partition_id#639]
+- *(1) Filter (isnotnull(account_id#629) && (account_id#629 = 2907))
   +- *(1) FileScan parquet [trans_id#628,account_id#629,amount#633,balance#634,partition_id#639] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/erikapat/Dropbox/PRUEBAS_DATA_SCIENCE/SPARK/GIT_SPARK-PRACTICE-NOTE..., PartitionCount: 6, PartitionFilters: [], PushedFilters: [IsNotNull(account_id), EqualTo(account_id,2907)], ReadSchema: struct<trans_id:int,account_id:int,amount:double,balance:double>


In [24]:
df2.explain()

== Physical Plan ==
*(1) Project [trans_id#628, account_id#629, amount#633, balance#634, partition_id#639]
+- *(1) Filter (isnotnull(account_id#629) && (account_id#629 = 2907))
   +- *(1) FileScan parquet [trans_id#628,account_id#629,amount#633,balance#634,partition_id#639] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/erikapat/Dropbox/PRUEBAS_DATA_SCIENCE/SPARK/GIT_SPARK-PRACTICE-NOTE..., PartitionCount: 6, PartitionFilters: [], PushedFilters: [IsNotNull(account_id), EqualTo(account_id,2907)], ReadSchema: struct<trans_id:int,account_id:int,amount:double,balance:double>


In [25]:
#time of execution 
timestart= datetime.datetime.now()

## --------------------------------------------------------------
CV.select('trans_id', 'account_id', 'amount', 'balance', 'partition_id')\
.filter(f.col('account_id')== '2907').show(5)
## -----------------------------------------------------------------------------

# Calculation of the time
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print("Time require to run the model: " + str(timedelta) + " segundos")

+--------+----------+------+-------+------------+
|trans_id|account_id|amount|balance|partition_id|
+--------+----------+------+-------+------------+
| 3626680|      2907|  85.1|22113.0|  1998-01-31|
|  853005|      2907|6891.0|28989.4|  1998-01-31|
|  853077|      2907|3983.0|25006.4|  1998-01-31|
|  853233|      2907|  14.6|25089.4|  1998-01-31|
| 3626681|      2907|  97.6|25104.0|  1998-01-31|
+--------+----------+------+-------+------------+
only showing top 5 rows

Time require to run the model: 0.13 segundos


In [26]:
#time of execution 
timestart= datetime.datetime.now()

## --------------------------------------------------------------
CV.filter(f.col('account_id')== '2907')\
.select('trans_id', 'account_id', 'amount', 'balance', 'partition_id').show(5)
## -----------------------------------------------------------------------------

# Calculation of the time
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print("Time require to run the model: " + str(timedelta) + " segundos")

+--------+----------+------+-------+------------+
|trans_id|account_id|amount|balance|partition_id|
+--------+----------+------+-------+------------+
| 3626680|      2907|  85.1|22113.0|  1998-01-31|
|  853005|      2907|6891.0|28989.4|  1998-01-31|
|  853077|      2907|3983.0|25006.4|  1998-01-31|
|  853233|      2907|  14.6|25089.4|  1998-01-31|
| 3626681|      2907|  97.6|25104.0|  1998-01-31|
+--------+----------+------+-------+------------+
only showing top 5 rows

Time require to run the model: 0.08 segundos


## **join broadcast**

Spark broadcast joins are perfect for joining a large DataFrame with a small DataFrame. Broadcast joins cannot be used when joining two large DataFrames.

In [29]:
# load data
modelDataFile = "../data/bank-transactions/account.asc"

client_data = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .option("delimiter", ";") \
  .load(modelDataFile)

client_data = client_data.select('account_id', 'district_id')\
.drop_duplicates(subset=['account_id', 'district_id'])

In [31]:
CV = spark.read.parquet('data/partitioned_data/')

In [42]:
#time of execution 
timestart= datetime.datetime.now()

## --------------------------------------------------------------
from pyspark.sql.functions import broadcast

#You do the broadcast in the small dataset
data_joined = CV.join(broadcast(client_data), 
                 CV.account_id == client_data.account_id)
print(data_joined.count())
## --------------------------------------------------------------
# Calculation of the time
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print("Time require to run the model: " + str(timedelta) + " segundos")


1056320
Time require to run the model: 0.23 segundos


In [44]:
#time of execution 
timestart= datetime.datetime.now()

## --------------------------------------------------------------
# Perform join
data_joined = client_data.join(CV, client_data.account_id == CV.account_id)
print(data_joined.count())
## --------------------------------------------------------------
# Calculation of the time
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print("Time require to run the model: " + str(timedelta) + " segundos")



1056320
Time require to run the model: 0.22 segundos


## References

* WHY YOUR SPARK APPS ARE SLOW OR FAILING: PART I MEMORY MANAGEMENT. [Here]()
* WHY YOUR SPARK APPS ARE SLOW OR FAILING: PART II DATA SKEW AND GARBAGE COLLECTION [Here](https://unraveldata.com/common-failures-slowdowns-part-ii/)