### OCI Data Science - Useful Tips
<details>
<summary><font size="2">Check for Public Internet Access</font></summary>

```python
import requests
response = requests.get("https://oracle.com")
assert response.status_code==200, "Internet connection failed"
```
</details>
<details>
<summary><font size="2">Helpful Documentation </font></summary>
<ul><li><a href="https://docs.cloud.oracle.com/en-us/iaas/data-science/using/data-science.htm">Data Science Service Documentation</a></li>
<li><a href="https://docs.cloud.oracle.com/iaas/tools/ads-sdk/latest/index.html">ADS documentation</a></li>
</ul>
</details>
<details>
<summary><font size="2">Typical Cell Imports and Settings for ADS</font></summary>

```python
%load_ext autoreload
%autoreload 2
%matplotlib inline

import warnings
warnings.filterwarnings('ignore')

import logging
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.ERROR)

import ads
from ads.dataset.factory import DatasetFactory
from ads.automl.provider import OracleAutoMLProvider
from ads.automl.driver import AutoML
from ads.evaluations.evaluator import ADSEvaluator
from ads.common.data import ADSData
from ads.explanations.explainer import ADSExplainer
from ads.explanations.mlx_global_explainer import MLXGlobalExplainer
from ads.explanations.mlx_local_explainer import MLXLocalExplainer
from ads.catalog.model import ModelCatalog
from ads.common.model_artifact import ModelArtifact
```
</details>
<details>
<summary><font size="2">Useful Environment Variables</font></summary>

```python
import os
print(os.environ["NB_SESSION_COMPARTMENT_OCID"])
print(os.environ["PROJECT_OCID"])
print(os.environ["USER_OCID"])
print(os.environ["TENANCY_OCID"])
print(os.environ["NB_REGION"])
```
</details>

In [1]:
#!/usr/bin/env python3
from __future__ import print_function

import datetime
import json
import logging
import os
import sys
import time
import unicodedata

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType

In [2]:
# Connection settings for the OCI Streaming pool, consumed through its Kafka-compatible endpoint.
STREAM_POOL_OCID = "ocid1.streampool.oc1.sa-saopaulo-1.amaaaaaatsbrckqa2xlk7c626amtb2rqdxe34t2n5fpppgbo6mqaqvmwqqlq"
KAFKA_BOOTSTRAP_SERVER = "cell-1.streaming.sa-saopaulo-1.oci.oraclecloud.com:9092"
STREAM_OCID = "ocid1.stream.oc1.sa-saopaulo-1.amaaaaaatsbrckqae7gofhykpznkqhcts4yg7a34sjhmm7qjxhzryatpspla"
TENANCY = "brztechcloud01"
SUBSCRIBE = "FAKE_DATA_1"  # Kafka topic (stream name) the reader subscribes to
USERNAME = "oracleidentitycloudservice/douglas.s.silva@oracle.com"

# The auth token is a secret and must never be written into the notebook. Provide it via the
# OCI_AUTH_TOKEN environment variable of the notebook session.
# NOTE(review): the token that used to be hardcoded here was committed in plain text and
# should be treated as leaked — revoke it and issue a new one.
AUTHTOKEN = os.environ.get("OCI_AUTH_TOKEN", "")
if not AUTHTOKEN:
    logging.warning("OCI_AUTH_TOKEN is not set; the Kafka SASL/PLAIN login will fail without it.")

# SASL/PLAIN credentials: username is "<tenancy>/<user>/<stream pool OCID>", password is the token.
UserName = TENANCY + "/" + USERNAME + "/" + STREAM_POOL_OCID
PassWord = AUTHTOKEN

In [3]:
print("Spark Session")

# Maven coordinates passed to spark.jars.packages. Spark reads this key as ONE comma-separated
# list, and calling .config() twice with the same key keeps only the last value. This cell
# used to do that, so the OCI SASL add-on listed first was never actually loaded. Only the
# Kafka connector (the package that was effectively in use) is kept, because the reader
# authenticates with SASL/PLAIN + auth token. If the add-on is really needed (presumably for
# OCI-specific SASL such as resource principals — confirm), append
# "com.oracle.oci.sdk:oci-java-sdk-addons-sasl:2.20.0" to this list.
SPARK_JAR_PACKAGES = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0",
])

# `failOnDataLoss` is an option of the Kafka reader (set on readStream), not a session
# setting, so it is not configured here.
# NOTE(review): maintenanceInterval is given without a unit; if 300 seconds was intended,
# an explicit '300s' is safer — confirm how Spark interprets the bare number.
# NOTE(review): builder configs such as spark.jars.packages only take effect when
# getOrCreate() actually creates a new session.
spark = (
    SparkSession.builder
      .appName('App_Fake_Data')
      .config('spark.sql.streaming.minBatchesToRetain', '10')
      .config('spark.sql.shuffle.partitions', '1')
      .config('spark.sql.streaming.stateStore.maintenanceInterval', '300')
      .config('spark.jars.packages', SPARK_JAR_PACKAGES)
      .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")

Spark Session


In [4]:
# Runtime settings applied to the live session. The log level is switched to WARN here,
# overriding the level set when the session was created.
spark.sparkContext.setLogLevel("WARN")

RUNTIME_SQL_CONF = {
    "spark.debug.maxToStringFields": "10000",
    "spark.sql.sources.partitionOverwriteMode": "dynamic",
    "spark.sql.streaming.stateStore.stateSchemaCheck": "false",
    "spark.sql.streaming.forceDeleteTempCheckpointLocation": "true",
}
for conf_key, conf_value in RUNTIME_SQL_CONF.items():
    spark.conf.set(conf_key, conf_value)

In [5]:
def _jaas_quote(value):
    """Escape ``value`` for use inside a double-quoted JAAS option value.

    Kafka parses ``sasl.jaas.config`` with Java's StreamTokenizer, which treats a backslash
    as an escape character inside quoted strings. Backslashes and double quotes in the
    credentials are therefore escaped so they survive parsing unchanged.
    """
    return value.replace("\\", "\\\\").replace('"', '\\"')


# JAAS login line built from the credentials defined in the config cell, so the secret is
# never repeated in plain text in this notebook.
KAFKA_JAAS_CONFIG = (
    'org.apache.kafka.common.security.plain.PlainLoginModule required '
    'username="{}" password="{}";'.format(_jaas_quote(UserName), _jaas_quote(PassWord))
)

# Streaming DataFrame over the OCI stream's Kafka endpoint, starting from the newest offsets.
df = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
    .option("includeHeaders", "false")
    .option("failOnDataLoss", "false")
    .option("kafka.security.protocol", "SASL_SSL")
    # No "group.id" option here: Spark forwards only "kafka."-prefixed options to the
    # consumer and manages the consumer group itself per query. The previous
    # .option("group.id", "STREAM_OCID") (a literal string, not the variable) was ignored.
    .option("kafka.sasl.jaas.config", KAFKA_JAAS_CONFIG)
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("startingOffsets", "latest")
    .option("subscribe", SUBSCRIBE)
    .load())

In [6]:
# Inspect the raw Kafka source schema; key and value arrive as binary columns.
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [35]:
# Keep only the message payload (decoded from bytes to text, still named "value") and the
# Kafka record timestamp.
df_adj = df.select(F.col("value").cast("string").alias("value"), F.col("timestamp"))

In [None]:
# Optional, disabled: print each micro-batch of `df_cust` to the driver console every second.
# NOTE: `df_cust` is only defined in a later cell, so if re-enabled this must run after it.
# print("Write Stream")
# query = df_cust.writeStream \
# .trigger(processingTime='1 seconds') \
# .outputMode("append") \
# .format("console") \
# .start()

# query.awaitTermination()

In [None]:
# Optional, disabled: republish the raw Kafka records as JSON to the "Fake-Data-2" topic.
# SECURITY: the auth token that was embedded in the JAAS string below has been redacted.
# A token ever committed in a notebook should be treated as leaked and revoked. If this
# cell is re-enabled, build the JAAS string from UserName/PassWord instead of hardcoding it.
# NOTE(review): Spark forwards only "kafka."-prefixed options to the Kafka client, so
# "group.id" below has no effect. The Kafka producer setting is "batch.size", not
# "batch-size" — confirm both before re-enabling.
# query = (df.select(to_json(struct("*")).alias("value"))
#     .writeStream
#     .format("kafka")
#     .option("topic","Fake-Data-2")
#     .outputMode("update")
#     .option("kafka.bootstrap.servers", "cell-1.streaming.sa-saopaulo-1.oci.oraclecloud.com:9092")
#     .option("kafka.sasl.mechanism", "PLAIN")
#     .option("kafka.security.protocol", "SASL_SSL")
#     .option("group.id", "stream_demo_deltalake_dois")
#     .option("kafka.sasl.jaas.config", 'org.apache.kafka.common.security.plain.PlainLoginModule required username="brztechcloud01/oracleidentitycloudservice/douglas.s.silva@oracle.com/ocid1.streampool.oc1.sa-saopaulo-1.amaaaaaatsbrckqakwxyfcbzghgqw3e53dil2qwqq76jbk6unkycdwnahb5q" password="<REDACTED>";')
#     .option("kafka.request.timeout.ms", 999999)
#     .option("kafka.batch-size", 100)
#     .option("checkpointLocation", "/tmp/kafka_events/_checkpoints/")
#     .start()
# )
# query.awaitTermination()

In [46]:
# Envelope of each incoming JSON message. Every field is read as a string; the row payload in
# "after" is itself JSON text and is parsed later with its own schema.
ENVELOPE_FIELDS = ("table", "op_type", "op_ts", "current_ts", "pos", "after")

sample_schema = StructType()
for envelope_field in ENVELOPE_FIELDS:
    sample_schema.add(envelope_field, StringType())

In [47]:
# Parse the JSON text in "value" with the envelope schema and flatten its fields into columns.
parsed_envelope = F.from_json(F.col("value"), sample_schema)
df_cust = df_adj.select(parsed_envelope.alias("jsonData")).select("jsonData.*")

In [49]:
# Schema after parsing the JSON envelope: every field is a string, including the "after" payload.
df_cust.printSchema()

root
 |-- table: string (nullable = true)
 |-- op_type: string (nullable = true)
 |-- op_ts: string (nullable = true)
 |-- current_ts: string (nullable = true)
 |-- pos: string (nullable = true)
 |-- after: string (nullable = true)



In [54]:
# Customer record carried as JSON text in the envelope's "after" field; every column is a string.
# NOTE(review): COLUMN_1 is empty in the sample output of this notebook — confirm the source emits it.
AFTER_FIELDS = (
    "COLUMN_1",
    "CUST_NAME",
    "CUST_ADD",
    "CUST_CITY",
    "CUST_STATE",
    "CUST_JOB",
    "CUST_PHONE",
    "CUST_DOC",
)

sample_schema_after = StructType()
for after_field in AFTER_FIELDS:
    sample_schema_after.add(after_field, StringType())

In [56]:
# Parse the "after" JSON payload into the customer columns and keep only those columns.
parsed_after = F.from_json(F.col("after"), sample_schema_after)
df_result = df_cust.select(parsed_after.alias("jsonData")).select("jsonData.*")

In [57]:
# Flattened customer-record schema parsed from the "after" payload.
df_result.printSchema()

root
 |-- COLUMN_1: string (nullable = true)
 |-- CUST_NAME: string (nullable = true)
 |-- CUST_ADD: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_JOB: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_DOC: string (nullable = true)



In [58]:
# Continuously append parsed customer rows to an in-memory table named "qraw_df_result",
# which later cells query with spark.sql while the stream runs.
# NOTE(review): per the Spark guide the memory sink keeps all output on the driver, so it is
# only suitable for small debugging volumes.
query = (
    df_result.writeStream
    .format("memory")
    .queryName("qraw_df_result")
    .start()
)

In [None]:
from IPython.display import display, clear_output
from time import sleep

# Live preview of the in-memory table fed by `query`. The previous `while True` never
# returned, so the cell blocked the kernel and made "Run All" hang. The loop is now bounded
# and also ends as soon as the streaming query is no longer active.
POLL_INTERVAL_SECONDS = 10
MAX_POLLS = 30  # ~5 minutes at the interval above; set to None to poll until the query stops

poll_count = 0
try:
    while query.isActive and (MAX_POLLS is None or poll_count < MAX_POLLS):
        clear_output(wait=True)
        display(query.status)
        # NOTE: this pulls the whole accumulated table to the driver on every refresh.
        display(spark.sql('SELECT * FROM qraw_df_result').toPandas())
        poll_count += 1
        sleep(POLL_INTERVAL_SECONDS)
except KeyboardInterrupt:
    # Interrupting the kernel is the expected way to end the preview early. This cell does
    # not stop `query`.
    pass

In [63]:
# One-off snapshot of every row collected so far in the in-memory sink table.
snapshot_pdf = spark.table('qraw_df_result').toPandas()
display(snapshot_pdf)

Unnamed: 0,COLUMN_1,CUST_NAME,CUST_ADD,CUST_CITY,CUST_STATE,CUST_JOB,CUST_PHONE,CUST_DOC
0,,Vicente Barros,"Rodovia Moraes, 79",Viana de Mendes,MA,Meeiro,+55 (47) 9 1176 2276,790.483.215-14
1,,Augusto Gomes,"Largo Agatha Pires, 365",Almeida,AC,Engenheiro de materiais,+55 (012) 95404 7101,897.613.450-84
2,,Luiz Felipe Gomes,Estação de Ferreira,Ribeiro,RN,Patologista,+55 69 9 6634 3468,152.940.683-89
3,,Stella Almeida,"Núcleo Natália Moura, 1",Silveira de Goiás,BA,Pintor,+55 43 9 6366-9512,651.027.348-90
4,,Nicole Sales,Esplanada Yago Rezende,Ramos Verde,RN,Regente,+55 (63) 9 9234 1193,846.017.239-22
...,...,...,...,...,...,...,...,...
7002,,Sra. Isis Duarte,"Via Farias, 991",Moura Alegre,RN,Agente penitenciário,+55 (045) 90455-1684,789.542.036-47
7003,,Julia das Neves,"Viaduto Maria Cecília Peixoto, 7",Duarte de Nunes,RJ,Laboratorista,+55 (50) 90551-1717,546.371.028-71
7004,,Juliana Moura,"Estrada de Souza, 49",da Luz Verde,PR,Perfusionista,+55 (57) 9 6916 5590,729.031.865-02
7005,,Pedro Henrique Costela,"Colônia de Teixeira, 8",Araújo,MT,Investigador particular,+55 (04) 9 3721-8456,790.836.241-96
