# Example connections from Python and PySpark to Oracle, S3, and Snowflake.

## Python only connections

### Oracle to Python 
Requires the Oracle client libraries to be installed (here under /usr/local/lib) and the `cx_Oracle` Python package.

VALIDATED:  

In [None]:
import cx_Oracle
import pandas as pd

In [None]:
# Query the Oracle data dictionary for per-table column counts and row
# statistics, then join the two result sets into one pandas DataFrame (`df`).

# Establish the oracle connection
dsn_tns = cx_Oracle.makedsn('brsoda1-scan.corp.espn.pvt', 8685, service_name='EDWTEST')
conn = cx_Oracle.connect(user=r'load_master', password='****', dsn=dsn_tns)
try:
    c = conn.cursor()

    # Number of columns per table, from all_tab_columns
    sql_query = "select owner,table_name,count(*) from sys.all_tab_columns group by owner,table_name"
    c.execute(sql_query)
    results = c.fetchall()
    tmp0_df = pd.DataFrame(data=results, columns=["owner", "table_name", "num_columns"])

    # The row counts are available in all_tables
    sql_query = "select owner,table_name,num_rows,avg_row_len from all_tables"
    c.execute(sql_query)
    results = c.fetchall()
    tmp1_df = pd.DataFrame(data=results, columns=["owner", "table_name", "num_rows", "avg_row_len"])
finally:
    # Good citizens close their Oracle connections -- even when a query fails
    conn.close()

# Inner join on (owner, table_name): only tables present in both views are kept
df = pd.merge(left=tmp0_df, right=tmp1_df, on=["owner", "table_name"])
fields = ["owner", "table_name", "num_columns", "num_rows", "avg_row_len"]
df = df.loc[:, fields]

In [None]:
# Clean the table statistics in `df`: remove system schemas, replace missing
# statistics with 0, and add an estimated table size in megabytes.

# Drop system schema tables.
# (.copy() gives an independent frame so the column assignments below do not
# trigger SettingWithCopyWarning.)
system_owners = ["SYS", "SYSTEM", "XDB", "MDSYS"]
df = df.loc[~df.owner.isin(system_owners), :].copy()

# Tables with no rows are in all_tables but num_rows is null (-->0).
# fillna returns a new frame; inplace=True on a .loc selection operates on a
# temporary object and is not guaranteed to update df.
df = df.fillna({"num_rows": 0, "avg_row_len": 0})

# enforce integer values for num_rows (but not avg_row_len).
# Assigning via df[col] replaces the column, so the integer dtype sticks.
df["num_rows"] = df["num_rows"].astype(int)

# Estimated size: rows * average row length (bytes) expressed in MB
df["Sizes_MB"] = df["num_rows"] * df["avg_row_len"] / (1000000.00)

In [None]:
# Preview the first 10 rows of the table statistics frame
df.head(10)

### Python to S3

TODO: no example has been written for this section yet.

## Pyspark connections

### Pyspark to Oracle

NB: you need to download the `ojdbc7.jar` file (the Oracle JDBC driver) and make it available to Spark.

UNVALIDATED: returns a DataFrame with the expected schema, but the data is not readable.

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session with the Oracle JDBC driver jar on the classpath.
spark = (
    SparkSession.builder
    .appName("Operations")
    .config("spark.jars", "/usr/local/lib/ojdbc7.jar")
    .getOrCreate()
)
# NOTE(review): this S3A setting is not used by the JDBC read below -- confirm
# whether it belongs in this cell.
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "mykey")

edwtest_password = "****"
# Credentials are passed as reader options instead of being concatenated into
# the URL: a password containing '@' or '/' would corrupt the URL, and the URL
# can end up in logs.
url = "jdbc:oracle:thin:@//brsoda1-scan.corp.espn.pvt:8685/edwtest"
df = (
    spark.read
    .format("jdbc")
    .option("url", url)
    .option("user", "load_master")
    .option("password", edwtest_password)
    .option("dbtable", "LANDING_SDRMX.DATABASECHANGELOG")
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    .load()
)


In [3]:
# Check what kind of object the JDBC read returned
type(df)

pyspark.sql.dataframe.DataFrame

In [4]:
# Print the column names, types and nullability of the DataFrame
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- AUTHOR: string (nullable = true)
 |-- FILENAME: string (nullable = true)
 |-- DATEEXECUTED: timestamp (nullable = true)
 |-- ORDEREXECUTED: decimal(38,0) (nullable = true)
 |-- EXECTYPE: string (nullable = true)
 |-- MD5SUM: string (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- COMMENTS: string (nullable = true)
 |-- TAG: string (nullable = true)
 |-- LIQUIBASE: string (nullable = true)



### PySpark to S3

UNVALIDATED: needs the AWS particulars (access key, secret key, endpoint) filled in.

In [None]:
# Start (or reuse) a Spark session and configure the Hadoop S3A filesystem
# so that s3a:// paths can be read.
spark = (
    SparkSession.builder
    .appName("my_app")
    .config('spark.sql.codegen.wholeStage', False)
    .getOrCreate()
)

# Fetch the Hadoop configuration once and set every S3A option on it.
hadoop_conf = spark._jsc.hadoopConfiguration()
# Placeholder credentials -- replace with real values (preferably read from
# the environment rather than typed into the notebook).
hadoop_conf.set("fs.s3a.access.key", "mykey")
hadoop_conf.set("fs.s3a.secret.key", "mysecret")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# NOTE(review): enableV4 is normally a JVM system property; confirm that
# setting it on the Hadoop configuration has the intended effect.
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
# NOTE(review): the credentials provider class name depends on the Hadoop
# version on the classpath -- confirm this one exists in yours.
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
# Regional S3 endpoints carry the "s3." service prefix.
hadoop_conf.set("fs.s3a.endpoint", "s3.eu-west-3.amazonaws.com")

### PySpark to Snowflake
UNVALIDATED: needs the Snowflake particulars (URL, user, password, role) filled in.

In [None]:
# Imports and Spark session setup for the Snowflake example.
import hashlib
import logging
import os
import sys
import time
from datetime import datetime

import findspark

# The Snowflake connector package must be requested BEFORE the Spark JVM is
# launched; PYSPARK_SUBMIT_ARGS is only read when the first session is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages net.snowflake:spark-snowflake_2.11:2.4.13-spark_2.4 pyspark-shell'

findspark.init()

from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import StringType, DoubleType

# All settings go on the builder before getOrCreate(); calling
# spark.builder.config(...) afterwards does not change the returned session.
# If a Spark session is already running in this kernel, getOrCreate() returns
# it and neither the package list nor the memory/core settings take effect --
# restart the kernel (or stop the old session) first.
spark = (
    SparkSession.builder
    .appName("save mdm updates to bifrost mysql")
    .config('spark.driver.memory', '5G')
    .config('spark.executor.memory', '16G')
    .config("spark.executor.cores", "4")
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .config("spark.sql.broadcastTimeout", "300")
    .getOrCreate()
)


In [None]:
# Read the first rows of a Snowflake table into a Spark DataFrame
# using the Snowflake Spark connector.

# Data source name under which the Snowflake Spark connector is registered
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# snowflake connection params
env = "dev" # user input
sf_url = "" # user input
sf_user = "" # user input
sf_password = "" # user input
sf_role = "" # user input

table_schema_name = 'raw_stage'
sf_table_name = "{}.gam_api_data_impression".format(table_schema_name)

# Database and warehouse names are derived from the environment name
sfOptions = {
    "sfURL": sf_url,
    "sfUser": sf_user,
    "sfPassword": sf_password,
    "sfRole": sf_role,
    "sfDatabase": "dvdm_db_{}".format(env),
    "sfSchema": table_schema_name,
    "sfWarehouse": "dvdm_wh_{}_xl".format(env),
}

sf_sql_stmt = "select * from {} limit 10".format(sf_table_name)
# Pass the statement built above as the connector's "query" option
sql_results = (
    spark.read.format(SNOWFLAKE_SOURCE_NAME)
    .options(**sfOptions)
    .option("query", sf_sql_stmt)
    .load()
)