### OCI Data Science - Useful Tips
<details>
<summary><font size="2">Check for Public Internet Access</font></summary>

```python
import requests
response = requests.get("https://oracle.com")
assert response.status_code==200, "Internet connection failed"
```
</details>
<details>
<summary><font size="2">Helpful Documentation </font></summary>
<ul><li><a href="https://docs.cloud.oracle.com/en-us/iaas/data-science/using/data-science.htm">Data Science Service Documentation</a></li>
<li><a href="https://docs.cloud.oracle.com/iaas/tools/ads-sdk/latest/index.html">ADS documentation</a></li>
</ul>
</details>
<details>
<summary><font size="2">Typical Cell Imports and Settings for ADS</font></summary>

```python
%load_ext autoreload
%autoreload 2
%matplotlib inline

import warnings
warnings.filterwarnings('ignore')

import logging
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.ERROR)

import ads
from ads.dataset.factory import DatasetFactory
from ads.automl.provider import OracleAutoMLProvider
from ads.automl.driver import AutoML
from ads.evaluations.evaluator import ADSEvaluator
from ads.common.data import ADSData
from ads.explanations.explainer import ADSExplainer
from ads.explanations.mlx_global_explainer import MLXGlobalExplainer
from ads.explanations.mlx_local_explainer import MLXLocalExplainer
from ads.catalog.model import ModelCatalog
from ads.common.model_artifact import ModelArtifact
```
</details>
<details>
<summary><font size="2">Useful Environment Variables</font></summary>

```python
import os
print(os.environ["NB_SESSION_COMPARTMENT_OCID"])
print(os.environ["PROJECT_OCID"])
print(os.environ["USER_OCID"])
print(os.environ["TENANCY_OCID"])
print(os.environ["NB_REGION"])
```
</details>

In [None]:
# Upgrade Oracle ADS to pick up latest features and maintain compatibility with Oracle Cloud Infrastructure.
!pip install -U oracle-ads

## Preparando Instância Data Flow Studio

In [2]:
# importando a biblioteca ADS e realizando a autenticação
import ads

ads.set_auth("resource_principal")

In [3]:
import os

compartment_id = os.environ.get("NB_SESSION_COMPARTMENT_OCID")
logs_bucket_uri = "oci://bucket-logs@id3kyspkytmr"
archive_uri = "oci://bucket-library@id3kyspkytmr/archive3.zip"


In [4]:
import json

def prepare_command(command: dict) -> str:
    """Converts dictionary command to the string formatted commands."""
    return f"'{json.dumps(command)}'"

In [5]:
%load_ext dataflow.magics

In [None]:
%help

In [None]:
command = prepare_command(
    {
        "compartmentId": compartment_id,
        "displayName": "App_Demo_DataFlowStudio",
        "language": "PYTHON",
        "sparkVersion": "3.2.1",
        "numExecutors": 4,
        "archiveUri": archive_uri,
        "driverShape": "VM.Standard.E4.Flex",
        "executorShape": "VM.Standard.E4.Flex",
        "driverShapeConfig": {"ocpus": 1, "memoryInGBs": 8},
        "executorShapeConfig": {"ocpus": 1, "memoryInGBs": 8},
        "logsBucketUri": logs_bucket_uri,
        "type": "SESSION",
        "logsBucketUri": logs_bucket_uri,
        }
) 
%create_session -l python -c $command

In [None]:
%use_session -s ocid1.dataflowapplication.oc1.iad.anuwcljstsbrckqayo5gcntzzwjlbub4a7gwusy3jriboxjx3ox7mqifg4xa

In [7]:
%status

Session,State,Max Duration In Minutes,Total Execution Time In Minutes,Remaining Duration In Minutes,Current Session
ocid1.dataflowapplication.oc1.iad.anuwcljstsbrckqayo5gcntzzwjlbub4a7gwusy3jriboxjx3ox7mqifg4xa,IN_PROGRESS,1440,33,1407,Dataflow Run


In [None]:
%stop_session

In [6]:
command = prepare_command(
    {
        "compartmentId": compartment_id,
        "displayName": "App_Demo_DataFlowStudio",
        "applicationId": "ocid1.dataflowapplication.oc1.iad.anuwcljstsbrckqayo5gcntzzwjlbub4a7gwusy3jriboxjx3ox7mqifg4xa",
    }
)

%activate_session -l python -c $command

Setting up the Cluster..


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Cluster setup is still in progress.
Cluster setup is still in progress.
Cluster setup is still in progress.
Cluster setup is still in progress.
Cluster setup is still in progress.
Cluster setup is still in progress.
Cluster setup is still in progress.
Cluster setup is still in progress.
Cluster is ready..
Starting Spark application..


Session ID,Kind,State,Current session
ocid1.dataflowapplication.oc1.iad.anuwcljstsbrckqayo5gcntzzwjlbub4a7gwusy3jriboxjx3ox7mqifg4xa,pyspark,IN_PROGRESS,Dataflow Run


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.
SparkContext available as 'sc'.


## Script Deltalake

In [8]:
%%spark
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from delta import *
from datetime import datetime

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
%%spark
builder = pyspark.sql.SparkSession.builder.appName("AppInLabDelta") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
%%spark
df_nyc_tlc = spark.read.parquet("oci://hosted-ds-datasets@bigdatadatasciencelarge/nyc_tlc/201[8,9]/**/data.parquet", header=False, inferSchema=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
%%spark
df_nyc_tlc.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- vendor_id: string (nullable = true)
 |-- pickup_at: timestamp (nullable = true)
 |-- dropoff_at: timestamp (nullable = true)
 |-- passenger_count: byte (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- rate_code_id: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- extra: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- total_amount: float (nullable = true)

In [12]:
%%spark
df_nyc_tlc.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+-------------------+-------------------+---------------+-------------+------------+------------------+------------------+-------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|vendor_id|          pickup_at|         dropoff_at|passenger_count|trip_distance|rate_code_id|store_and_fwd_flag|pickup_location_id|dropoff_location_id|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+---------+-------------------+-------------------+---------------+-------------+------------+------------------+------------------+-------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|        1|2018-01-01 00:21:05|2018-01-01 00:24:23|              1|          0.5|           1|                 N|                41|                 24|           2|        4.5|  0.5|    0.5|       0.0|         0.0|                  0.3|         5

In [13]:
%%spark
df_nyc_tlc.select("vendor_id", "pickup_at", "dropoff_at", "passenger_count").write.format("delta").save("oci://bucket-deltalake@id3kyspkytmr/deltatable")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
%%spark
read_delta = spark.read.format("delta").load("oci://bucket-deltalake@id3kyspkytmr/deltatable")
read_delta.printSchema()

In [None]:
%%spark
deltaTable = DeltaTable.forPath(spark, "oci://bucket-deltalake@id3kyspkytmr/deltatable")
deltaTable.toDF().show()

# Delta Lake Features

## Schema Management - Schema Enforcement

In [None]:
%%spark
deltaTable.toDF().show(2)

In [None]:
%%spark
read_delta.printSchema()

In [None]:
%%spark
nschema = read_delta.schema

new_schema_deltaTable = spark.createDataFrame([("2", datetime.strptime('2023-02-01 00:47:37', '%Y-%m-%d %H:%M:%S'), datetime.strptime('2023-02-01 01:22:26', '%Y-%m-%d %H:%M:%S'), 1)], nschema).withColumn("passenger_count",expr("cast(passenger_count as String)"))

new_schema_deltaTable.write.format("delta").mode("append").save("oci://bucket-deltalake@id3kyspkytmr/deltatable")

In [None]:
%%spark
new_schema_deltaTable.printSchema

In [None]:
%%spark
spark.read.format("delta").load("oci://bucket-deltalake@id3kyspkytmr/deltatable").printSchema()

## Schema Management - Schema Evolution

In [None]:
%%spark
nschema = read_delta.schema

se_deltaTable = spark.createDataFrame([("2", datetime.strptime('2023-02-01 00:47:37', '%Y-%m-%d %H:%M:%S'), datetime.strptime('2023-02-01 01:22:26', '%Y-%m-%d %H:%M:%S'), 1)], nschema).withColumn("pickup_location_id", lit("45"))

se_deltaTable.write.format("delta").option("mergeSchema", "true").mode("append").save("oci://bucket-deltalake@id3kyspkytmr/deltatable")

In [None]:
%%spark
spark.read.format("delta").load("oci://bucket-deltalake@id3kyspkytmr/deltatable").show()

In [None]:
%%spark
spark.read.format("delta").load("oci://bucket-deltalake@id3kyspkytmr/deltatable").where("pickup_location_id = 45").show()

# Delta Lake Features

## Update - Inserts - Merge - Deletes

In [None]:
%%spark
deltaTable = DeltaTable.forPath(spark, "oci://bucket-deltalake@id3kyspkytmr/deltatable")

#Update

deltaTable.update(
        condition = expr("vendor_id = 1"),
        set = {"pickup_location_id" : lit("30")}            
)

deltaTable.toDF().show()

In [None]:
%%spark
# Delete

deltaTable.delete(condition = expr("pickup_location_id = 45"))

deltaTable.toDF().where("pickup_location_id = 45").show()
#deltaTable.toDF().show()

# Delta Lake Features

## Time Travel

In [None]:
%%spark
deltaTable.history().show()

In [None]:
%%spark 
spark.read.format("delta").option("versionAsOf", "0").load("oci://bucket-deltalake@id3kyspkytmr/deltatable").show()