# Oracle Spark Notebook

In [None]:
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
import os
import random
import cx_Oracle
import numpy as np
import pandas as pd

## Spark
The following cells contain the Spark-related configuration. First, we select the Java libraries that should be uploaded to the Spark cluster. These libraries come from this JupyterLab instance and are baked into this Jupyter environment. The next command shows which JAR files are installed.


In [None]:
# Java libraries for Spark: list the JAR files present in /opt/spark/jars
# of this Jupyter environment (IPython `%system` magic runs the shell command).
%system ls /opt/spark/jars

In [None]:
# Assemble the spark-submit options and publish them through the
# PYSPARK_SUBMIT_ARGS environment variable.

# Optional Maven packages (currently disabled):
# PACKAGE_OPTIONS = '--packages %s ' % ','.join((
#    'org.apache.hadoop:hadoop-aws:3.2.0',
# ))

# Local JAR files passed via --jars; add more entries as needed.
jar_paths = [
    # 'file:///opt/spark/jars/postgresql-42.3.1.jar',
    'file:///opt/spark/jars/ojdbc11.jar',
]
JAR_OPTIONS = '--jars %s ' % ','.join(jar_paths)

# With packages enabled, use instead:
# os.environ['PYSPARK_SUBMIT_ARGS'] = JAR_OPTIONS + PACKAGE_OPTIONS + ' pyspark-shell'
os.environ.update({'PYSPARK_SUBMIT_ARGS': JAR_OPTIONS + ' pyspark-shell'})
# os.environ['SPARK_EXTRA_CLASSPATH'] = '/opt/spark/jars/*'

Next we define the Spark configuration, which creates a new Spark cluster inside Kubernetes. You can leave the defaults unless you know what you are doing. You can control the computational power of the Spark executors by adjusting the `spark.executor.*` options. You can also change the application name in `.setAppName()`.

In [None]:
# Both interpreters need to be provided explicitly through the environment;
# otherwise the workers run Python 2.7.
os.environ.update({
    'PYSPARK_PYTHON': 'python3',
    'PYSPARK_DRIVER_PYTHON': 'python3',
})

# Spark-on-Kubernetes configuration. Each setter mutates `conf` in place,
# so the settings are applied one statement at a time.
conf = SparkConf()
conf.setMaster("k8s://https://kubernetes.default")
conf.set("spark.kubernetes.container.image", "10.34.96.225/gemitec/spark-py:3.2.0-hadoop-3.2.0")

# Driver networking
conf.set("spark.driver.port", "2222")  # Needs to match svc
conf.set("spark.driver.blockManager.port", "7777")
conf.set("spark.driver.host", "driver-service.jupyterhub.svc.cluster.local")  # Needs to match svc
conf.set("spark.driver.bindAddress", "0.0.0.0")  # Otherwise tries to bind to svc IP, will fail
conf.set("spark.kubernetes.namespace", "jupyterhub")

# Jars which should be uploaded
conf.set("spark.jars", "file:///opt/spark/jars/ojdbc11.jar")

# Set these to control the Spark computational power
conf.set("spark.executor.instances", "2")
conf.set("spark.executor.memory", "2g")  # Right now do not enter more than 20g
conf.set("spark.executor.cores", "1")  # Right now do not enter more than 4

conf.setAppName('oracle')

`SparkSession` starts the cluster. **Never forget** to use `spark.stop()` when you are finished. Otherwise the cluster will live forever and take up computational resources.

In [None]:
# Create the session from `conf`, or reuse one that already exists.
# (Alternative, context only: sc = SparkContext(conf=conf))
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

### Oracle
We will connect to an Oracle database. In this code cell we configure the connection options.

In [None]:
# Oracle connection settings -- fill these in before running the cell.
username = ""
password = ""
host = ""
service = ""
# Default Oracle Net listener port; change it if your database listens
# on a different port. (Must be assigned, otherwise the cell is a SyntaxError.)
port = 1521

# Connect string in the form user/password@host:port/service_name.
connection_str = f'{username}/{password}@{host}:{port}/{service}'

In this spark environment the oracle python client is installed. We can use it to test some things out before we connect spark with it.

In [None]:
# Open a direct client connection to Oracle (no Spark involved) from the
# user/password@host:port/service string held in `connection_str`.
# NOTE(review): no connection.close() is visible in this notebook -- consider
# closing the connection once the checks are done.
connection = cx_Oracle.connect(connection_str)

Here we just print the available tables in the database.

In [None]:
# Print every table name returned by the DBA_TABLES view, one row per line.
# The cursor is used as a context manager so it is closed as soon as the
# loop finishes instead of staying open for the rest of the session.
# NOTE(review): DBA_TABLES presumably requires elevated privileges; if the
# query is rejected, ALL_TABLES or USER_TABLES may be the right view -- confirm.
with connection.cursor() as cursor:
    cursor.execute("SELECT table_name  FROM dba_tables")
    for row in cursor:
        print(row)

OK, now that we know everything works, let's use Spark to connect to Oracle and load a table. We get back a DataFrame-like object.

In [None]:
# Name of the Oracle table to load -- fill in before running.
db_table_name = ""

# Read the whole table through Spark's JDBC data source using the
# Oracle driver; the result is bound to `df_table`.
df_table = (
    spark.read
    .format("jdbc")
    .option("url", f"jdbc:oracle:thin:{connection_str}")
    .option("dbtable", db_table_name)
    .option("user", username)
    .option("password", password)
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    .load()
)

In [None]:
df_table.printSchema()

We can also run SQL queries through Spark. The result of the query is returned as a DataFrame.

In [None]:
# Load the result of an arbitrary SQL statement as a DataFrame.
#
# The statement is passed through the JDBC source's "query" option rather
# than "dbtable": "dbtable" is spliced into a FROM clause, so it only accepts
# a table name or a parenthesised subquery, not a bare SELECT. The trailing
# semicolon is omitted because Spark wraps the statement in its own query.
sql_query = "SELECT * FROM YOUR_TABLE_NAME"
df_test = (
    spark.read
    .format("jdbc")
    .option("url", f"jdbc:oracle:thin:{connection_str}")
    .option("query", sql_query)
    .option("user", username)
    .option("password", password)
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    .load()
)

However you can also use the dataframe to perform queries on Spark: 

In [None]:
# Evaluate the SQL expression count(*) over the table and print the result.
(
    df_table
    .select(expr('count(*)'))
    .show()
)

In [None]:
# We can also compute summary statistics for a single column (see the next cell).

In [None]:
# Print the describe() summary for the MOTOR_SPEED column.
(
    df_table
    .select("MOTOR_SPEED")
    .describe()
    .show()
)

In [None]:
# Import only what is used. A wildcard import of pyspark.sql.functions
# would shadow Python builtins such as sum, min, max, round and filter
# for every following cell of the notebook.
from pyspark.sql.functions import col

# MOTOR_SPEED / KILOMETRAGE * 100 as a stand-alone single-column DataFrame.
dfm = df_table.select(((col('MOTOR_SPEED')) / (col('KILOMETRAGE')))*100)

# The same ratio appended to the table as a new column named 'dfm'.
df = df_table.withColumn('dfm',(col('MOTOR_SPEED')/(col('KILOMETRAGE')) *100))

# Fetch the first 10 rows to the driver for inspection.
df.take(10)

# df.toPandas()
# A PySpark DataFrame can also be converted back to a pandas DataFrame to
# leverage the pandas API. Note that toPandas() collects all data into the
# driver, which can easily cause an out-of-memory error when the data is too
# large to fit there.

# Other examples
Here we calculate the constant Pi using parallel processing in the Spark cluster.

In [None]:
# Number of random points drawn for the Monte Carlo estimate of Pi.
NUM_SAMPLES = 1_000_000
def inside(p):
    """Draw a random point in the unit square; return True if it lies inside the unit circle.

    The argument ``p`` is ignored; it exists so the function can be used
    as a per-element predicate.
    """
    x = random.random()
    y = random.random()
    squared_distance = x * x + y * y
    return squared_distance < 1

# Distribute NUM_SAMPLES trials over the cluster and count how many of the
# random points pass the `inside` predicate.
count = (
    spark.sparkContext
    .parallelize(range(NUM_SAMPLES))
    .filter(inside)
    .count()
)
# The hit ratio times 4.0 is printed as the estimate of Pi.
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

At the end of your calculations, **ALWAYS** use this command to shut down the Spark cluster.

In [None]:
# Stop the Spark session so the cluster does not keep running and
# occupying computational resources.
spark.stop()