### Notebook - Running Spark on OCI Data Flow using Scala and Spark SQL
<details>
<summary><font size="2">Check the OCI Data Flow documentation for the required policies:</font></summary>
<ul><li><a href="https://docs.public.oneportal.content.oci.oraclecloud.com/en-us/iaas/data-flow/using/policies-data-flow-studio.htm#policies-data-flow-studio">Policies Required to Integrate Data Flow and Data Science</a></li></ul>
</details><details><summary><font size="2">Check that the Data Science notebook session has privileges on the OCI Data Flow family and any dependencies, for example through a dynamic group with this matching rule:</font></summary>

```ALL {resource.type = 'datasciencenotebooksession', resource.compartment.id = '<compartment_id>'}```

</details><details><summary><font size="2">Policy for Data Science to manage OCI Data Flow</font></summary>

```allow dynamic-group <ds-dynamic-group> to manage dataflow-family in compartment '<your-compartment-name>'```

</details><details><summary><font size="2">Policy statements for logging</font></summary>

```allow dynamic-group <ds-dynamic-group> to manage log-groups in compartment '<your-compartment-name>'```
```allow dynamic-group <ds-dynamic-group> to manage log-content in compartment '<your-compartment-name>'```
</details>
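
The policy statements above differ only in the dynamic group and compartment names. As a minimal sketch, they could be filled in with plain string formatting before being pasted into an OCI policy. `render_policies` is a hypothetical helper, not part of any OCI SDK, and the names `ds-dynamic-group` and `analytics` are illustrative values only.

```python
# Hypothetical helper: fill in the policy templates shown above.
# The statement text is copied from this notebook; only the names vary.
POLICY_TEMPLATES = [
    "allow dynamic-group {group} to manage dataflow-family in compartment '{compartment}'",
    "allow dynamic-group {group} to manage log-groups in compartment '{compartment}'",
    "allow dynamic-group {group} to manage log-content in compartment '{compartment}'",
]


def render_policies(group, compartment):
    """Return the policy statements with both names substituted."""
    return [t.format(group=group, compartment=compartment) for t in POLICY_TEMPLATES]


# Illustrative names; replace with your own dynamic group and compartment.
statements = render_policies("ds-dynamic-group", "analytics")
for statement in statements:
    print(statement)
```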

In [1]:
import ads
ads.set_auth("resource_principal") # Supported values: resource_principal, api_key

In [2]:
# Replace the placeholders with your values:
#   compartmentId  - OCID of the compartment in which the OCI Data Flow compute/application is created
#   bucket         - Object Storage bucket for writing OCI Data Flow logs
#   namespace      - your OCI Object Storage namespace
#   driver_shape   - e.g. VM.Standard.E4.Flex
#   executor_shape - e.g. VM.Standard.E4.Flex
#
# Non-autoscaling create-session command for new compute. Running it with
# %create_session prints a session ID that you can reconnect to later.
import json
command = {
    "compartmentId": "<your_compartment_id>",
    "displayName": "Data Analysis in Scala",
    "language": "SCALA",
    "sparkVersion": "3.2.1",
    "driverShape": "<driver_shape>",
    "executorShape": "<executor_shape>",
    "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
    "numExecutors": 1,
    "type": "SESSION",
    "logsBucketUri": "oci://<bucket>@<namespace>/",
    "configuration": {"spark.oracle.datasource.enabled":"true"}
}
command = f'\'{json.dumps(command)}\''
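
Before the dictionary is serialized, it can help to confirm that no `<...>` placeholder was left unfilled. The following is a minimal sketch under that assumption; `find_unfilled` is a hypothetical helper (it is not part of `dataflow.magics`), and `session_config` is a shortened, illustrative copy of the dictionary above.

```python
import json
import re

# Matches an unfilled template placeholder such as "<bucket>".
PLACEHOLDER = re.compile(r"<[^<>]+>")


def find_unfilled(config, prefix=""):
    """Return dotted key paths whose string values still contain a placeholder."""
    unfilled = []
    for key, value in config.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict):
            # Recurse into nested sections such as "configuration".
            unfilled.extend(find_unfilled(value, prefix=f"{path}."))
        elif isinstance(value, str) and PLACEHOLDER.search(value):
            unfilled.append(path)
    return unfilled


# Shortened, illustrative copy of the session dictionary.
session_config = {
    "compartmentId": "<your_compartment_id>",
    "driverShape": "VM.Standard.E4.Flex",
    "logsBucketUri": "oci://<bucket>@<namespace>/",
    "configuration": {"spark.oracle.datasource.enabled": "true"},
}

unfilled_keys = find_unfilled(session_config)
print(unfilled_keys)

# Same quoting as the cell above: the JSON text wrapped in single quotes.
quoted = f"'{json.dumps(session_config)}'"
# Stripping the quotes again recovers the original dictionary.
round_trip = json.loads(quoted[1:-1])
```

An empty list from `find_unfilled` would indicate that every placeholder has been replaced. The last two lines only show that the quoted string still decodes to the same dictionary.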

In [3]:
%load_ext dataflow.magics

In [5]:
%help

Magic,Example,Explanation
create_session,"%create_session -l python -c '{""compartmentId"":""Data Flow Run resource compartment OCID"",""displayName"":""SessionApp"",""sparkVersion"":""3.2.1"",""driverShape"":""VM.Standard2.1"",""executorShape"":""VM.Standard2.1"",""numExecutors"":1,""archiveUri"":""Object Storage URL for Data Flow zip archive."",""metastoreId"":""optional metastore OCID"",""configuration"":{ ""spark.archives"":""oci://bucket@namespace/path/to/conda/pack"", #optional property to use Dataflow 'Run' resource to access OCI resources.  ""dataflow.auth"":""resource_principal"" }}'","Creates new session by providing session details. Example command for Flex shapes :  %create_session -l python -c '{""compartmentId"":""Data Flow Run resource compartment OCID"",""displayName"":""SessionApp"",""sparkVersion"":""3.2.1"",""driverShape"":""VM.Standard.E4.Flex"",""executorShape"":""VM.Standard.E4.Flex"",""numExecutors"":1,""driverShapeConfig"":{""ocpus"":1,""memoryInGBs"":16},""executorShapeConfig"":{""ocpus"":1,""memoryInGBs"":16}}'  Example command for Spark dynamic allocation :  %create_session -l python -c '{""compartmentId"":""Data Flow Run resource compartment OCID"",""displayName"":""SessionApp"",""sparkVersion"":""3.2.1"",""driverShape"":""VM.Standard2.1"",""executorShape"":""VM.Standard2.1"",""numExecutors"":1,""configuration"":{ ""spark.dynamicAllocation.enabled"":""true"", ""spark.dynamicAllocation.shuffleTracking.enabled"":""true"", ""spark.dynamicAllocation.minExecutors"":""1"", ""spark.dynamicAllocation.maxExecutors"":""4"", ""spark.dynamicAllocation.executorIdleTimeout"":""60"", ""spark.dynamicAllocation.schedulerBacklogTimeout"":""60"", ""spark.dataflow.dynamicAllocation.quotaPolicy"":""min"" }}'"
activate_session,"%activate_session -l python -c '{""compartmentId"":""Data Flow Run resource compartment OCID"",""displayName"":""SessionApp"",""applicationId"":""Existing sessionId to activate.""}'",Activate session by providing existing sessionId.
use_session,%use_session -s {sessionId},To use already existing active session.
status,%status,Outputs current session status.
update_session,"%update_session -i '{""maxDurationInMinutes"": 4896,""idleTimeoutInMinutes"": 4888}'",Updates current active session[not session config] for max duration or idle time out.
stop_session,%stop_session,Stops current active session. One active session should be associated with current notebook to stop.
config,%config,Outputs current session configuration.
configure_session,"%configure_session -i '{""driverShape"": ""VM.Standard2.1"", ""executorShape"": ""VM.Standard2.1"", ""numExecutors"": 1}'","Configures the session creation parameters. The force flag -f is mandatory for immediate effect of the config change, in that case session will be dropped and recreated."
spark,%%spark -o df df = spark.read.parquet('...,"Executes spark commands.  Parameters:  -o VAR_NAME: The Spark dataframe of name VAR_NAME will be available in the %%local Python context as a  Pandas dataframe with the same name.  -m METHOD: Sample method, either take or sample.  -n MAXROWS: The maximum number of rows of a dataframe that will be pulled from Livy to Jupyter.  If this number is negative, then the number of rows will be unlimited.  -r FRACTION: Fraction used for sampling."
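
The help text above lists the properties used for Spark dynamic allocation. As a sketch, a non-autoscaling session dictionary could be extended with those properties before it is serialized for `%create_session`. `with_dynamic_allocation` is a hypothetical helper, `base` is a shortened, illustrative dictionary, and the values mirror the help example rather than tuned recommendations.

```python
import copy
import json


def with_dynamic_allocation(config, min_executors=1, max_executors=4):
    """Return a copy of config with the dynamic allocation properties
    from the %help example merged into its "configuration" section."""
    updated = copy.deepcopy(config)  # leave the caller's dictionary untouched
    spark_conf = updated.setdefault("configuration", {})
    spark_conf.update({
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.shuffleTracking.enabled": "true",
        "spark.dynamicAllocation.minExecutors": str(min_executors),
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
        "spark.dynamicAllocation.executorIdleTimeout": "60",
        "spark.dynamicAllocation.schedulerBacklogTimeout": "60",
        "spark.dataflow.dynamicAllocation.quotaPolicy": "min",
    })
    return updated


# Shortened, illustrative session dictionary.
base = {
    "displayName": "Data Analysis in Scala",
    "numExecutors": 1,
    "configuration": {"spark.oracle.datasource.enabled": "true"},
}

autoscaling = with_dynamic_allocation(base)
# Same quoting convention as the non-autoscaling command.
autoscaling_command = f"'{json.dumps(autoscaling)}'"
```

Working on a deep copy keeps the original dictionary available, so both the fixed-size and the autoscaling variants can be kept side by side in the notebook.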


In [11]:
%create_session -l scala -c $command

Setting up the Cluster..


Cluster is ready..
Starting Spark application..


Session ID,Kind,State,Current session
ocid1.dataflowapplication.oc1.iad.anuwcljtnif7xwiaipyjes7et2si3aobmngiwjqlddhbzyupk3nyhi2p2cda,spark,IN_PROGRESS,Dataflow Run


SparkSession available as 'spark'.
SparkContext available as 'sc'.


In [12]:
%%spark
case class Employee(id: Int, name: String)
val df = Seq(Employee(1, "Elia"), Employee(2, "Teo"), Employee(3, "Fang")).toDF

defined class Employee
df: org.apache.spark.sql.DataFrame = [id: int, name: string]


In [13]:
%%spark
df.show()

+---+----+
| id|name|
+---+----+
|  1|Elia|
|  2| Teo|
|  3|Fang|
+---+----+



In [16]:
%%spark
val schemaName = "silverzone"
// IF NOT EXISTS keeps the cell re-runnable if the schema was already created
spark.sql(s"CREATE SCHEMA IF NOT EXISTS $schemaName")
spark.sql(s"USE $schemaName")

schemaName: String = silverzone
res3: org.apache.spark.sql.DataFrame = []
res4: org.apache.spark.sql.DataFrame = []


In [17]:
%%spark
df.createOrReplaceTempView("df_view")

In [18]:
%%spark -c sql -o rslt_view
select  *
from    df_view; 

Unnamed: 0,id,name
0,1,Elia
1,2,Teo
2,3,Fang


In [19]:
from autovizwidget.widget.utils import display_dataframe
display_dataframe(rslt_view)

In [20]:
%stop_session

Session has been stopped successfully.
