# Persist()

In [1]:
# Load the raw CSV into a DataFrame. The first row supplies the column names;
# no inferSchema option is set, so every column arrives as a string.
df = (
    spark.read
    .option('header', 'true')
    .format('csv')
    .load('abfss://optimization@projectsynapsestorage.dfs.core.windows.net/3. Cache Persist/cache.csv')
)

In [2]:
# Add the unemployment rate as a percentage of the labour force.
# The source columns were read as strings, so the division relies on Spark's
# implicit string-to-numeric cast.
print('Using withColumn to add a column')
from pyspark.sql.functions import col
df_transform = df.withColumn(
    'UnEmployed Rate Percentage',
    (col('Unemployed') / col('Labor Force')) * 100,
)
display(df_transform)

In [3]:
# Drop the CSV's own 'Unemployment Rate' column; presumably superseded by the
# 'UnEmployed Rate Percentage' column derived in the previous cell — confirm.
# Note: DataFrame.drop is a silent no-op if the named column does not exist.
df_dropped = df_transform.drop('Unemployment Rate')

In [4]:
# Print df_dropped's column names and types as a tree.
df_dropped.printSchema()

In [5]:
from pyspark.sql import functions as F

# 'Employed' was read from the CSV as a string; replace it with an integer
# column so the sum/avg aggregations later in the notebook work on numbers.
df_converted = df_dropped.withColumn(
    'Employed',
    F.col('Employed').cast('Integer'),
)

In [15]:
import time
from datetime import timedelta

from pyspark.sql import functions as F

# Time three representative actions on df_converted: a row filter, a full sort
# and an aggregation.
# NOTE(review): the saved execution counts show this cell ran (In [15]) after the
# MEMORY_ONLY persist cell (In [14]), so its printed timing was taken with
# df_converted persisted — confirm which state this baseline is meant to measure.

# perf_counter() is a monotonic clock, unlike datetime.now(), so system clock
# adjustments cannot distort the measured interval.
start = time.perf_counter()

# Keep only the rows whose 'Education Level' mentions a degree
df_converted.filter(F.col('Education Level').contains('degree')).show(n=5)

# Sort the same DataFrame by 'Date Inserted', newest first
df_converted.orderBy(F.col('Date Inserted').desc()).show(n=5)

# Total and average of 'Employed' per gender
df_converted.groupBy('Gender').agg(F.sum('Employed'), F.avg('Employed')).show(n=5)

end = time.perf_counter()

duration = timedelta(seconds=end - start)

# Only the three actions above are timed, not the whole notebook.
print('Actions executed in ' + str(duration))


# ==================== PERSIST - MEMORY ONLY -========================

# PySpark Code : MEMORY_ONLY 

In [7]:
from pyspark import StorageLevel

# Assign MEMORY_ONLY to df_converted. persist() is lazy: it only records the
# storage level. count() scans every partition so the cache is actually filled
# (otherwise an immediate unpersist() would have nothing to release).
df_converted.persist(StorageLevel.MEMORY_ONLY)
df_converted.count()

##### Unpersist()

In [9]:
# Release df_converted's cached blocks; blocking=False (the default) returns
# without waiting for the removal to finish.
df_converted.unpersist(blocking=False)

# Scala Code
# Persist MEMORY_ONLY

In [10]:
%%spark
// Read the same CSV for the Scala examples. The header row supplies column names and,
// with no inferSchema option, every column is a string.
// Declared with `val`: no cell reassigns the reference, so it need not be mutable.
val df_Scala = spark.read.format("csv").option("header","true").load("abfss://optimization@projectsynapsestorage.dfs.core.windows.net/3. Cache Persist/cache.csv")

In [11]:
 %%spark
// Assign MEMORY_ONLY to df_Scala; nothing is cached until an action scans it.
import org.apache.spark.storage.StorageLevel._
df_Scala.persist(MEMORY_ONLY)

In [12]:
 %%spark

import org.apache.spark.sql.functions._

// employed + unemployed per row (column names resolve case-insensitively by default)
df_Scala.select(col("employed"), col("unemployed"), (col("employed") + col("unemployed")).as("Total")).show(5)

// Rows whose Education Level mentions a degree
df_Scala.where(col("Education Level").contains("degree")).show(5)

// Newest Date Inserted first
df_Scala.orderBy(desc("Date Inserted")).show(5)

In [13]:
%%spark

// Free df_Scala's cached blocks; blocking = false matches the no-argument
// unpersist() and returns without waiting for removal to finish.
df_Scala.unpersist(blocking = false)

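## Using the StorageLevel constructor - MEMORY_ONLY (PySpark code)

### StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
In PySpark, `StorageLevel(False, True, False, False, 1)` is identical to the predefined `StorageLevel.MEMORY_ONLY`, because PySpark always caches data in serialized form.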

In [14]:
# Same level as StorageLevel.MEMORY_ONLY, built explicitly from its flags.
# Imported here so the cell does not depend on another cell's import.
from pyspark import StorageLevel

df_converted.persist(
    StorageLevel(useDisk=False, useMemory=True, useOffHeap=False, deserialized=False, replication=1)
)

# persist() is lazy; count() scans every partition so the cache is actually populated.
df_converted.count()

# ==================== PERSIST - MEMORY AND DISK -========================

# PySpark Code <br>
# Persist - MEMORY AND DISK

In [16]:
# Unpersist the previous cache / persisted StorageLevel first: calling persist()
# on a DataFrame that is already cached does not change its level (Spark only
# logs "Asked to cache already cached data.").

df_converted.unpersist()

# ------------------------------------ Storage Level -------------------------
# Applying the StorageLevel - MEMORY AND DISK (partitions that do not fit in
# memory are stored on disk and read from there when needed)

from pyspark import StorageLevel

df_converted.persist(StorageLevel.MEMORY_AND_DISK)

# persist() is lazy. count() scans every partition, so the cache is fully
# populated here and the timing below measures reads from this storage level
# rather than the cost of building it. (Without it, the first show(n=5) may
# compute only some partitions, leaving the cache partly filled.)
df_converted.count()

# ----------------------------- ACTIONS -------------------------------

import time
from datetime import timedelta

from pyspark.sql import functions as F

# perf_counter() is a monotonic clock, unlike datetime.now(), so system clock
# adjustments cannot distort the measured interval.
start = time.perf_counter()

# Keep only the rows whose 'Education Level' mentions a degree
df_converted.filter(F.col('Education Level').contains('degree')).show(n=5)

# Sort the same DataFrame by 'Date Inserted', newest first
df_converted.orderBy(F.col('Date Inserted').desc()).show(n=5)

# Total and average of 'Employed' per gender
df_converted.groupBy('Gender').agg(F.sum('Employed'), F.avg('Employed')).show(n=5)

end = time.perf_counter()

duration = timedelta(seconds=end - start)

# Only the three actions above are timed, not the whole notebook.
print('Actions executed in ' + str(duration))

In [18]:
# Release the MEMORY_AND_DISK cache; blocking=False (the default) returns
# without waiting for the blocks to be removed.
df_converted.unpersist(blocking=False)

# Scala Code <br>
# Persist - MEMORY AND DISK

In [17]:
%%spark

// Reset: remove any storage level assigned by an earlier cell so a new one can be set.
df_Scala.unpersist(blocking = false)

// ---- Storage level: MEMORY_AND_DISK (partitions that do not fit in memory go to disk) ----
import org.apache.spark.storage.StorageLevel._
df_Scala.persist(MEMORY_AND_DISK)

// ---- Actions: the first scan of each partition also fills the cache ----
import org.apache.spark.sql.functions._

df_Scala.select(col("employed"), col("unemployed"), (col("employed") + col("unemployed")).as("Total")).show(5)

df_Scala.where(col("Education Level").contains("degree")).show(5)

df_Scala.orderBy(desc("Date Inserted")).show(5)

# ==================== PERSIST - MEMORY ONLY SER -========================

# Scala Code <br>
# Persist - MEMORY_ONLY_SER

In [19]:
%%spark

// Reset: remove any storage level assigned by an earlier cell so a new one can be set.
df_Scala.unpersist(blocking = false)

// ---- Storage level: MEMORY_ONLY_SER (serialized objects, one byte array per partition) ----
import org.apache.spark.storage.StorageLevel._
df_Scala.persist(MEMORY_ONLY_SER)

// ---- Actions: the first scan of each partition also fills the cache ----
import org.apache.spark.sql.functions._

df_Scala.select(col("employed"), col("unemployed"), (col("employed") + col("unemployed")).as("Total")).show(5)

df_Scala.where(col("Education Level").contains("degree")).show(5)

df_Scala.orderBy(desc("Date Inserted")).show(5)

# ======================= PERSIST - MEMORY AND DISK SER -=================

# Scala Code
# Persist - MEMORY AND DISK SER

In [20]:
%%spark

// Reset: remove any storage level assigned by an earlier cell so a new one can be set.
df_Scala.unpersist(blocking = false)

// ---- Storage level: MEMORY_AND_DISK_SER (serialized; partitions that do not fit spill to disk) ----
import org.apache.spark.storage.StorageLevel._
df_Scala.persist(MEMORY_AND_DISK_SER)

// ---- Actions: the first scan of each partition also fills the cache ----
import org.apache.spark.sql.functions._

df_Scala.select(col("employed"), col("unemployed"), (col("employed") + col("unemployed")).as("Total")).show(5)

df_Scala.where(col("Education Level").contains("degree")).show(5)

df_Scala.orderBy(desc("Date Inserted")).show(5)

In [23]:
%%spark

// Free df_Scala's MEMORY_AND_DISK_SER blocks; blocking = false matches the
// no-argument unpersist() and does not wait for removal to finish.
df_Scala.unpersist(blocking = false)

# ======================= PERSIST - DISK ONLY -=================

# PySpark Code <br>
# Persist - DISK ONLY

In [24]:
# Unpersist the previous cache / persisted StorageLevel first: calling persist()
# on a DataFrame that is already cached does not change its level (Spark only
# logs "Asked to cache already cached data.").

df_converted.unpersist()

# ------------------------------------ Storage Level -------------------------
# Applying the StorageLevel - DISK ONLY (partitions are stored only on disk)

from pyspark import StorageLevel

df_converted.persist(StorageLevel.DISK_ONLY)

# persist() is lazy. count() scans every partition, so the cache is fully
# populated here and the timing below measures reads from this storage level
# rather than the cost of building it. (Without it, the first show(n=5) may
# compute only some partitions, leaving the cache partly filled.)
df_converted.count()

# ----------------------------- ACTIONS -------------------------------

import time
from datetime import timedelta

from pyspark.sql import functions as F

# perf_counter() is a monotonic clock, unlike datetime.now(), so system clock
# adjustments cannot distort the measured interval.
start = time.perf_counter()

# Keep only the rows whose 'Education Level' mentions a degree
df_converted.filter(F.col('Education Level').contains('degree')).show(n=5)

# Sort the same DataFrame by 'Date Inserted', newest first
df_converted.orderBy(F.col('Date Inserted').desc()).show(n=5)

# Total and average of 'Employed' per gender
df_converted.groupBy('Gender').agg(F.sum('Employed'), F.avg('Employed')).show(n=5)

end = time.perf_counter()

duration = timedelta(seconds=end - start)

# Only the three actions above are timed, not the whole notebook.
print('Actions executed in ' + str(duration))

In [26]:
# Release the DISK_ONLY cache; blocking=False (the default) returns without
# waiting for the blocks to be removed.
df_converted.unpersist(blocking=False)


# Scala Code <br>
# Persist - DISK ONLY

In [25]:
%%spark

// Reset: remove any storage level assigned by an earlier cell so a new one can be set.
df_Scala.unpersist(blocking = false)

// ---- Storage level: DISK_ONLY (partitions are stored only on disk) ----
import org.apache.spark.storage.StorageLevel._
df_Scala.persist(DISK_ONLY)

// ---- Actions: the first scan of each partition also fills the cache ----
import org.apache.spark.sql.functions._

df_Scala.select(col("employed"), col("unemployed"), (col("employed") + col("unemployed")).as("Total")).show(5)

df_Scala.where(col("Education Level").contains("degree")).show(5)

df_Scala.orderBy(desc("Date Inserted")).show(5)

# Scala Code <br>
# Persist - OFF HEAP

In [27]:
%%spark

df_Scala.unpersist()

// ---------------------------------- Storage Level --------------------

// OFF_HEAP blocks go to Spark's off-heap storage pool, whose size comes from
// spark.memory.offHeap.size (default 0; Spark requires it to be positive when
// spark.memory.offHeap.enabled=true). With no off-heap memory configured the
// persist below has nowhere to store blocks, so warn instead of failing quietly.
import org.apache.spark.storage.StorageLevel._ 
val offHeapBytes = spark.sparkContext.getConf.getSizeAsBytes("spark.memory.offHeap.size", "0")
if (offHeapBytes <= 0) println("WARNING: spark.memory.offHeap.size is 0 - OFF_HEAP persist has no off-heap memory to cache into")
df_Scala.persist(org.apache.spark.storage.StorageLevel.OFF_HEAP)

// -------------------------------- Actions -------------------

import org.apache.spark.sql.functions._

df_Scala.select($"employed", $"unemployed", ($"employed" + $"unemployed").alias("Total")).show(5)

df_Scala.filter(col("Education Level").contains("degree")).show(5)

df_Scala.orderBy(col("Date Inserted").desc).show(5)

In [28]:
%%spark

// Free df_Scala's OFF_HEAP blocks; blocking = false matches the no-argument
// unpersist() and does not wait for removal to finish.
df_Scala.unpersist(blocking = false)

# PySpark Code <br>
# Persist - MEMORY_ONLY_2

In [1]:
# NOTE(review): this cell was last executed as In [1], i.e. in a fresh session
# where df_converted does not exist yet — run the loading/transform cells first.

# Unpersist the previous cache / persisted StorageLevel first: calling persist()
# on a DataFrame that is already cached does not change its level (Spark only
# logs "Asked to cache already cached data.").

df_converted.unpersist()

# ------------------------------------ Storage Level -------------------------
# Applying the StorageLevel - MEMORY ONLY 2 (like MEMORY_ONLY, but each
# partition is replicated on two cluster nodes)

from pyspark import StorageLevel

df_converted.persist(StorageLevel.MEMORY_ONLY_2)

# persist() is lazy. count() scans every partition, so the cache is fully
# populated here and the timing below measures reads from this storage level
# rather than the cost of building it. (Without it, the first show(n=5) may
# compute only some partitions, leaving the cache partly filled.)
df_converted.count()

# ----------------------------- ACTIONS -------------------------------

import time
from datetime import timedelta

from pyspark.sql import functions as F

# perf_counter() is a monotonic clock, unlike datetime.now(), so system clock
# adjustments cannot distort the measured interval.
start = time.perf_counter()

# Keep only the rows whose 'Education Level' mentions a degree
df_converted.filter(F.col('Education Level').contains('degree')).show(n=5)

# Sort the same DataFrame by 'Date Inserted', newest first
df_converted.orderBy(F.col('Date Inserted').desc()).show(n=5)

# Total and average of 'Employed' per gender
df_converted.groupBy('Gender').agg(F.sum('Employed'), F.avg('Employed')).show(n=5)

end = time.perf_counter()

duration = timedelta(seconds=end - start)

# Only the three actions above are timed, not the whole notebook.
print('Actions executed in ' + str(duration))