### **Data Flow Studio + Delta Table + Metastore**

##### Setup Data Flow Studio Session

In [None]:
import ads
# Configure ADS to authenticate with the "resource_principal" mode,
# so no API keys or config files are referenced in this notebook.
ads.set_auth("resource_principal")

In [None]:
import os

# OCID of the OCI compartment in use, read from the environment
# (None when NB_SESSION_COMPARTMENT_OCID is not set).
compartment_id = os.getenv("NB_SESSION_COMPARTMENT_OCID")

# Object Storage bucket URI used to store the logs.
logs_bucket_uri = "oci://bucket_name@namespace"

In [None]:
import json

def prepare_command(command: dict) -> str:
    """Serialize a command dictionary to JSON and wrap it in single quotes.

    Args:
        command: JSON-serializable dictionary describing the command.

    Returns:
        The JSON text of ``command`` enclosed in a pair of single quotes.
        Single quotes inside the JSON text are not escaped.
    """
    serialized = json.dumps(command)
    return "'" + serialized + "'"

In [None]:
# Load the dataflow.magics IPython extension into this kernel; the
# session and %%spark magics used in the cells below are presumably
# registered by it.
%load_ext dataflow.magics

In [None]:
# Display the help output for the available magic commands.
# NOTE(review): presumably this %help is provided by the dataflow.magics
# extension loaded earlier in the notebook - confirm.
%help

##### Create a new Data Flow Session

documentation: https://docs.oracle.com/en-us/iaas/data-flow/using/data-flow-studio.htm

In [None]:
%create_session -l python -c '{\
        "compartmentId": "ocid1.compartment.oc1........a",\
        "displayName": "DataFlow Studio Name",\
        "language": "PYTHON",\
        "sparkVersion": "3.2.1",\
        "numExecutors": 1,\
        "driverShape": "VM.Standard.E4.Flex",\
        "executorShape": "VM.Standard.E4.Flex",\
        "driverShapeConfig": {"ocpus": 1, "memoryInGBs": 16},\
        "executorShapeConfig": {"ocpus": 1, "memoryInGBs": 16},\
        "logsBucketUri": "oci://bucket_name@namespace",\
        "configuration":{\
          "spark.archives": "bucket_name@namespace/conda_environments_path#conda",\
          "metastoreId":"ocid1.datacatalogmetastore..........3xa",\
          "spark.oracle.datasource.enabled":"true"\
          "privateEndpointId":"ocid1.dataflowprivateendpoint..........aifq"}}'

##### Use an existing Data Flow Session

In [None]:
# Attach to an already existing session, identified by the OCID passed to -s,
# instead of creating a new one.
# NOTE(review): the purpose of the -f flag and of the braces around the quoted
# OCID is not shown in this notebook - confirm against the %help output.
%use_session -s {'ocid1.dataflowapplication........5vq'} -f

In [None]:
%%spark
# Sanity check that the session is usable: print the value of sc.version.
print(f'A versão do Spark em execução no cluster do Data Flow Studio é: {sc.version}')

### **Work with files in OCI Object Storage - Buckets**

#### Read .csv files from OCI Object Storage

In [None]:
%%spark
# Path to csv file
csv_file = "oci://bucket_name@namespace/file_name.csv"

# Read csv file to dataframe
df_vendas = spark.read.csv(csv_file, header=True, inferSchema=True)
df_vendas.show(5)

#### Create Temporary View using a dataframe

In [None]:
%%spark
df.createOrReplaceTempView("view_name")

In [None]:
%%spark -c sql
-- Show the columns and data types of the temporary view
DESCRIBE view_name

#### Query the Temp View with SQL

In [None]:
%%spark -c sql
-- Preview the first 10 rows of the temporary view
SELECT * FROM view_name LIMIT 10;

In [None]:
%%spark -c sql
-- NOTE(review): this query is identical to the one in the previous cell;
-- consider removing one of the two cells.
SELECT * FROM view_name LIMIT 10;

#### Write .csv files to OCI Object Storage

In [None]:
%%spark
# Path to csv file
csv_file_path = "bucket_name@namespace" 

# Escrever no bucket em CSV
df_transformed.write.csv(csv_file_path, mode="overwrite", header=True)

### **Work with Metastore Database**

In [None]:
%%spark -c sql
-- Create the database; IF NOT EXISTS keeps this cell re-runnable
CREATE DATABASE IF NOT EXISTS <DATABASE_NAME>

In [None]:
%%spark -c sql
-- Create a table in the database from the rows of the temporary view.
-- view_name is the temp view registered earlier; table_name is the table
-- queried in the "Using SQL with Metastore data" section.
CREATE TABLE <DATABASE_NAME>.table_name AS SELECT * FROM view_name

In [None]:
%%spark -c sql
-- Make <DATABASE_NAME> the current database for the statements that follow
USE <DATABASE_NAME>

In [None]:
%%spark -c sql
-- List the tables visible in the current database
SHOW TABLES

In [None]:
%%spark -c sql
-- List the available databases
SHOW DATABASES

#### Using SQL with Metastore data

In [None]:
%%spark -c sql
-- Preview the first 5 rows of the table, addressed by its database-qualified name
SELECT * FROM <DATABASE_NAME>.table_name LIMIT 5

#### Read CSV files from OCI Object Storage and write them to a Delta table

In [None]:
%%spark
# Path to csv file
csv_file = "oci://bucket_name@namespace/file_name.csv"

# Read csv file to dataframe
df_file = spark.read.csv(csv_file, header=True, inferSchema=True)

# Write data from a dataframe to a bucket in Delta Table
df_file.write.format("delta").mode("overwrite").save("oci://bucket_name@namespace/destination_name")

#### Read JSON files from OCI Object Storage

In [None]:
%%spark
# Read JSON file from OCI Object Storage Bucket
json_file = "oci://bucket_name@namespace/file_name.json"
df_json = spark.read.option("multiline", "true").json(json_file)

# Check JSON
df_json.printSchema()

#### Read PARQUET files from OCI Object Storage and write them to a Delta table

In [None]:
%%spark
# Path to parquet file
parquet_file = "oci://bucket_name@namespace/file_name"

# Read parquet file to dataframe
df_clientes = spark.read.parquet(parquet_file)

# Write data from a dataframe to a bucket in Delta Table
df_clientes.write.format("delta").mode("overwrite").save("oci://bucket_name@namespace/destination_name")