# Example connections between Python and PySpark to Oracle and S3.  

## Pyspark connections

### Pyspark to Oracle

NB:  need to download the ojbc7.jar file.  
 

In [4]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import json

# credentials cannot be left in git.  
# this is a work around
d = None
with open('../../../.credentials/credentials.json') as f:
  d = json.load(f)

if d is not None:
    d = d['QABI_EDWQA1']
    user = d['user'] 
    password = d['password']
    hostname = d['hostname']
    port = d['port']
    service_name = d['service_name'] 
    # "jdbc:oracle:thin:USER/PASSWORD@//hostname:port/service_name"
    url = 'jdbc:oracle:thin:{}/{}@//{}:{}/{}'.format(user, password, hostname,port,service_name)
    print('url: \n{}'.format(url))
    
    spark = SparkSession.builder.appName("Operations").config("spark.jars", "/usr/local/lib/ojdbc7.jar").getOrCreate()
    spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "mykey")
    df = spark.read \
        .format("jdbc") \
        .option("url",url) \
        .option("dbtable","LANDING_SDRMX.DATABASECHANGELOG") \
        .option("driver","oracle.jdbc.driver.OracleDriver") \
        .load()
    
    status_type = str( type(df) )
    
    print('Success!  type was {}'.format(status_type))
    
else:
    print('ERROR: Need credentials')

    
df.printSchema()

url: 
jdbc:oracle:thin:QABI_USER/EDWQA@//brscc02-scan1.us1.ocm.s7358785.oraclecloudatcustomer.com:8685/EDWQA
Success!  type was <class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- ID: string (nullable = true)
 |-- AUTHOR: string (nullable = true)
 |-- FILENAME: string (nullable = true)
 |-- DATEEXECUTED: timestamp (nullable = true)
 |-- ORDEREXECUTED: decimal(38,0) (nullable = true)
 |-- EXECTYPE: string (nullable = true)
 |-- MD5SUM: string (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- COMMENTS: string (nullable = true)
 |-- TAG: string (nullable = true)
 |-- LIQUIBASE: string (nullable = true)



### The snowflake connector/cursor method.

In [1]:
# REQUIRED:  
# pip install -r https://raw.githubusercontent.com/snowflakedb/snowflake-connector-python/v2.4.1/tested_requirements/requirements_36.reqs
# pip install snowflake-connector-python==2.4.1

import snowflake.connector
import boto3

ssm = boto3.client('ssm', region_name='us-east-1')
token = "/dtci-linear/snowflake/dev/password"
password = ssm.get_parameter(Name=token, WithDecryption=True )['Parameter']['Value']

user = "SVC_LINEAR_REPORTING_QUBOLE_NONPROD"
account = "dtcipoc.us-east-1"
warehouse = "LINEAR_REPORTING_NONPROD"
database = "LINEAR_ADSALES_SBX"
schema = "CONFORMED"
# role = SVC_LINEAR_REPORTING_NONPROD


conn = snowflake.connector.connect(
                user=user,
                password=password,
                account=account,
                warehouse=warehouse,
                database=database,
                schema=schema
                )
                
cur = conn.cursor()
cur.execute(" select * from CONFORMED.CRNCY")
for token in cur:
    print(token)


(63, 'NZD', 'NEW ZEALAND DOLLAR', 'US-English', '999,999,999,999', '.99', '$', datetime.datetime(2020, 11, 18, 5, 0, tzinfo=pytz.FixedOffset(-480)), datetime.datetime(2020, 11, 18, 5, 0, tzinfo=pytz.FixedOffset(-480)), 102, 'conformance_common', None, None, None, 'SK_102__PK_63', datetime.datetime(2020, 11, 18, 5, 0, tzinfo=pytz.FixedOffset(-480)), 'N', datetime.datetime(2021, 3, 3, 18, 26, 30, 332000, tzinfo=pytz.FixedOffset(-480)), 'SK_100__PK_ET', datetime.date(2021, 3, 3))
(51, 'MYR', 'MALAYSIAN RINGGIT', 'US-English', '999,999,999', '.99', 'MYR', datetime.datetime(2020, 11, 18, 5, 0, tzinfo=pytz.FixedOffset(-480)), datetime.datetime(2020, 11, 18, 5, 0, tzinfo=pytz.FixedOffset(-480)), 102, 'conformance_common', None, None, None, 'SK_102__PK_51', datetime.datetime(2020, 11, 18, 5, 0, tzinfo=pytz.FixedOffset(-480)), 'N', datetime.datetime(2021, 3, 3, 18, 26, 30, 332000, tzinfo=pytz.FixedOffset(-480)), 'SK_100__PK_ET', datetime.date(2021, 3, 3))
(88, 'IDR', 'INDONESIAN RUPIAH', 'Indon

### PySpark to Snowflake

In [7]:
import json

# credentials cannot be left in git.  
# this is a work around
d = None
with open('../../../.credentials/credentials.json') as f:
  d = json.load(f)


def materialize_sf(tablename, viewname):
    ''' Reads from a specific Snowflake DB (see spark.read for particulars) and returns a dataframe and view for use elsewhere
    :param tablename - the table in SF
    :param viewname - the view that is created
    :return sf_df - a Spark dataframe paired to the specified view.  
    '''
    warehouse = 'LINEAR_REPORTING_NONPROD'
    sqlquery = "select * from CONFORMED.{};".format(tablename)
    sf_df = spark.read.\
            option("sfWarehouse", warehouse).\
            option("sfDatabase","LINEAR_ADSALES_SBX").\
            option("sfSchema", "CONFORMED").\
            option("sfRole", "SVC_LINEAR_REPORTING_NONPROD").\
            snowflake("Linear-adsales-sandbox", warehouse, sqlquery)
    sf_df.createOrReplaceTempView(viewname)
    return sf_df

kms_key = d['kms_key_conformance']

spark._jsc.hadoopConfiguration().set('fs.s3a.server-side-encryption.key', kms_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.server-side-encryption-algorithm","SSE-KMS");
spark._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "true");
spark._jsc.hadoopConfiguration().set("fs.s3a.impl.disable.cache", "true");

tablename, viewname = 'OUTLT','viewname'
df = materialize_sf(tablename, viewname)

df.head(5)


AttributeError: 'DataFrameReader' object has no attribute 'snowflake'

### PySpark to S3

UNVALIDATED Need AWS particulars

In [None]:
spark = SparkSession.builder \
            .appName("my_app") \
            .config('spark.sql.codegen.wholeStage', False) \
            .getOrCreate()

spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "mykey")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "mysecret")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "eu-west-3.amazonaws.com")