# OCI InLAB - Data & AI Team

## Primeiros passos com Data Flow Studio

Este notebook apresenta os primeiros passos para iniciar a utilização do **_Data Flow Studio_** diretamente do **_OCI Data Science_** e, deste modo, poder construir e executar aplicativos **_Apache Spark_** de forma interativa

O Oracle desenvolveu uma poderosa biblioteca chamada **_ADS SDK_** (Accelerated Data Science). Esta biblioteca, que é pública, nos permite trabalhar diferentes frentes de acesso, visualizalização, higienização e construção de modelos. Além disso, o ADS SDK nos permite interagir com outros serviços presentes na nuvem Oracle e é a biblioteca que vamos utilizar para iniciar a jornada de trabalho com o Data Flow Studio.

Conheça mais sobre o ADS SDK acessando o endereço web: https://docs.oracle.com/en-us/iaas/tools/ads-sdk/latest/index.html

**Para obter sucesso na execução deste laboratório é preciso definir políticas de acesso dentro da OCI disponíveis em** https://docs.oracle.com/en-us/iaas/data-flow/using/dfs_getting_started.htm#policies-data-flow-studio

In [2]:
# importando a biblioteca ADS e realizando a autenticação
import ads

ads.set_auth("resource_principal")

O OCI Data Science Notebook conta com algumas variáveis de ambiente que podem ser acessadas utilizando a biblioteca **_os_**. Além disso, vamos definir um local (bucket) para armazenar os logs gerados pelas sessões do Data Flow Studio

In [None]:
import os

compartment_id = os.environ.get("NB_SESSION_COMPARTMENT_OCID") #identificando o compartimento da OCI em utilização
logs_bucket_uri = "oci://nome-do-seu-bucket@namespace-da-tenancy" #definindo o bucket para armazenamento de logs

Para a criação da sessão do Data Flow Studio vamos definir uma curta função para manipular os argumentos e parâmetros da sessão

In [3]:
import json


def prepare_command(command: dict) -> str:
    """Converts dictionary command to the string formatted commands."""
    return f"'{json.dumps(command)}'"

Após a execução das etapas acima, carregamos o **_SparkMagic_**. O SparkMagic utiliza o **_Apache Livy_** para execução de sessões interativas de cargas de trabalho Apache Spark.

In [4]:
%load_ext dataflow.magics

Após a carga do SparkMagic podemos executar comandos "mágicos" diretamente das células do notebook. Vamos explorar as opções disponíveis executando **_%help_**. Será exibida uma lista de comandos suportados pelo SparkMagic e exemplos de utilização.

In [5]:
%help

Magic,Example,Explanation
create_session,"%create_session -l python -c '{""compartmentId"":""Data Flow Run resource compartment OCID"",""displayName"":""SessionApp"",""sparkVersion"":""3.2.1"",""driverShape"":""VM.Standard2.1"",""executorShape"":""VM.Standard2.1"",""numExecutors"":1,""archiveUri"":""Object Storage URL for Data Flow zip archive."",""metastoreId"":""optional metastore OCID"",""configuration"":{ ""spark.archives"":""oci://bucket@namespace/path/to/conda/pack"", #optional property to use Dataflow 'Run' resource to access OCI resources.  ""dataflow.auth"":""resource_principal"" }}'","Creates new session by providing session details. Example command for Flex shapes :  %create_session -l python -c '{""compartmentId"":""Data Flow Run resource compartment OCID"",""displayName"":""SessionApp"",""sparkVersion"":""3.2.1"",""driverShape"":""VM.Standard.E4.Flex"",""executorShape"":""VM.Standard.E4.Flex"",""numExecutors"":1,""driverShapeConfig"":{""ocpus"":1,""memoryInGBs"":16},""executorShapeConfig"":{""ocpus"":1,""memoryInGBs"":16}}'  Example command for Spark dynamic allocation :  %create_session -l python -c '{""compartmentId"":""Data Flow Run resource compartment OCID"",""displayName"":""SessionApp"",""sparkVersion"":""3.2.1"",""driverShape"":""VM.Standard2.1"",""executorShape"":""VM.Standard2.1"",""numExecutors"":1,""configuration"":{ ""spark.dynamicAllocation.enabled"":""true"", ""spark.dynamicAllocation.shuffleTracking.enabled"":""true"", ""spark.dynamicAllocation.minExecutors"":""1"", ""spark.dynamicAllocation.maxExecutors"":""4"", ""spark.dynamicAllocation.executorIdleTimeout"":""60"", ""spark.dynamicAllocation.schedulerBacklogTimeout"":""60"", ""spark.dataflow.dynamicAllocation.quotaPolicy"":""min"" }}'"
activate_session,"%activate_session -l python -c '{""compartmentId"":""Data Flow Run resource compartment OCID"",""displayName"":""SessionApp"",""applicationId"":""Existing sessionId to activate.""}'",Activate session by providing existing sessionId.
use_session,%use_session -s {sessionId},To use already existing active session.
status,%status,Outputs current session status.
update_session,"%update_session -i '{""maxDurationInMinutes"": 4896,""idleTimeoutInMinutes"": 4888}'",Updates current active session[not session config] for max duration or idle time out.
stop_session,%stop_session,Stops current active session. One active session should be associated with current notebook to stop.
config,%config,Outputs current session configuration.
configure_session,"%configure_session -i '{""driverShape"": ""VM.Standard2.1"", ""executorShape"": ""VM.Standard2.1"", ""numExecutors"": 1}'","Configures the session creation parameters. The force flag -f is mandatory for immediate effect of the config change, in that case session will be dropped and recreated."
spark,%%spark -o df df = spark.read.parquet('...,"Executes spark commands.  Parameters:  -o VAR_NAME: The Spark dataframe of name VAR_NAME will be available in the %%local Python context as a  Pandas dataframe with the same name.  -m METHOD: Sample method, either take or sample.  -n MAXROWS: The maximum number of rows of a dataframe that will be pulled from Livy to Jupyter.  If this number is negative, then the number of rows will be unlimited.  -r FRACTION: Fraction used for sampling."


A seguir vamos criar um sessão do Data Flow Studio

In [None]:
command = prepare_command(
    {
        "compartmentId": compartment_id,
        "displayName": "DataFlow Studio Primeira Sessão",
        "language": "PYTHON",
        "sparkVersion": "3.2.1",
        "numExecutors": 1,
        "driverShape": "VM.Standard.E4.Flex",
        "executorShape": "VM.Standard.E4.Flex",
        "driverShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
        "executorShapeConfig": {"ocpus": 1, "memoryInGBs": 16},
        "logsBucketUri": logs_bucket_uri,
        "type": "SESSION",
        "configuration": {
            "spark.archives": custom_conda_environment_uri,
            "spark.jars.ivy": "/opt/spark/work-dir/conda/.ivy2",
            "spark.jars": "oci://nome-do-seu-bucket@namespace-da-tenancy/seu-jar.jar",
        },
        "privateEndpointId":"privateEndpointId",
        "metastoreId":"metastoreId",
    }
) 
%create_session -l python -c $command

Você pode explorar outros exemplos de criação de sessão e explicação sobre cada parâmetros acesso o site https://docs.oracle.com/en-us/iaas/tools/ads-sdk/latest/user_guide/apachespark/dataflow-spark-magic.html

Neste notebook é utilizada uma sessão já existente

In [6]:
%use_session -s {'sua-sessão-existente-id'}

Using Active Session .. ocid1.dataflowrun.oc1.sa-saopaulo-1.antxeljrtsbrckqausxu55q7cdh4aprkp7pgfzcgaqlbyp7e4kfosbtmsp3q


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Cluster is ready..
Starting Spark application..


Session ID,Kind,State,Current session
ocid1.dataflowapplication.oc1.sa-saopaulo-1.antxeljrtsbrckqadnztrjhiprmtnccq6ggl5giu2c4hd6ymyza2xangogka,pyspark,IN_PROGRESS,Dataflow Run


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.
SparkContext available as 'sc'.


Após a criação ou ativação de uma sessão já existe, são disponibilizados dois novos _magic commands_. Estes são: **_%%spark_** e também o **_%%sc_**.

In [8]:
%%spark
print(f'A versão do Spark em execução no cluster do Data Flow Studio é: {sc.version}')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

A versão do Spark em execução no cluster do Data Flow Studio é: 3.2.1

A partir daí podemos executar operações spark, sc e sql diretamente no cluster do Data Flow Studio

In [9]:
%%spark
numbers = sc.parallelize([4, 3, 2, 1])
print(f"First element of numbers is {numbers.first()}")
print(f"The RDD, numbers, has the following description\n{numbers.toDebugString()}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

First element of numbers is 4
The RDD, numbers, has the following description
b'(2) ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274 []'

Podemos criar um dataframe spark e realizar operações sobre ele. Neste exemplo é utilizado o [NYC Taxi and Limousine Commission (TLC) Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page).
Trata-se de um dataset de aproximadamente 488Mb com dados do ano de 2022 do mês de janeiro até outubro.

In [11]:
%%spark
df_nyc_tlc = spark.read.parquet("oci://arquivos@id3kyspkytmr/nyc_tlc/", header=False, inferSchema=True)
df_nyc_tlc.show()
df_nyc_tlc.createOrReplaceTempView("nyc_tlc")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2022-10-01 00:03:41|  2022-10-01 00:18:39|            1.0|          1.7|       1.0|                 N|         249|         107|           1|        9.5|  3.0|    0.5|      2.6

Após a criação da tempView podemos rodar consultas baseadas em SQL e armazenar a saída num objeto dataframe usando o parâmetro **_-o_**.

In [13]:
%%spark -c sql -o df_nyc_tlc
SELECT vendorID, passenger_count, trip_distance, payment_type FROM nyc_tlc LIMIT 3;

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Unnamed: 0,vendorID,passenger_count,trip_distance,payment_type
0,1,1.0,1.7,1
1,2,2.0,0.72,2
2,2,1.0,1.74,1


Além disso, também é possivel executar comandos **_Delta Lake_** para interagir sobre os dados

In [14]:
%%spark
df_nyc_tlc.write.format("delta").save("oci://arquivos@id3kyspkytmr/nyc_tlc/delta")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Por fim, também há suporte ao **_Hive Metastore_**. Na nuvem Oracle a OCI disponibiliza o **_OCI Data Catalog_** e o mesmo conta com um Metastore que pode ser utilizado. Acesse https://docs.oracle.com/en-us/iaas/data-flow/using/hive-metastore.htm para detalhes completos.

In [16]:
%%spark -c sql
CREATE DATABASE studio;

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
%%spark -c sql
USE studio;

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
%%spark -c sql
CREATE TABLE nyc_tlc_table AS (SELECT * FROM nyc_tlc);

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
%%spark -c sql
SHOW TABLES;

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Unnamed: 0,namespace,tableName,isTemporary
0,studio,nyc_tlc_table,False
1,,nyc_tlc,True


In [21]:
%%spark -c sql
SELECT COUNT(1) FROM nyc_tlc_table;

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Unnamed: 0,count(1)
0,33003832
