In [0]:
%pylab inline

# About

<p><b>Author:</b> Nhan Tran
<p><b>Updated:</b> 12/18/2023
<p><b>Description:</b> Python code example for connecting to various data sources.

# <a name='toc'>Table of Contents</a>

<ul style='font-size:12pt;line-height:1.8em'>
    <li><a href='#1_0'>1. Extract Credential Using Dataiku API</a></li>
    <li><a href='#2_0'>2. FTP/SFTP Connection</a></li>
    <ul>
        <li><a href='#2_a'>2.a Option 1: Dataiku API</a></li>
        <ul>
            <li><a href='#2_a1a'>2.a1 Read data from FTP folder</a></li>
            <li><a href='#2_a2'>2.a2 Write df to ftp</a></li>
            <li><a href='#2_a3'>2.a3 Read/write pptx to and from FTP folder</a></li>
            <li><a href='#2_a4'>2.a4 Read/write PDF to FTP folder</a></li>
        </ul>
        <li><a href='#2_b'>2.b Option 2: Python ftp</a></li>
        <ul>
            <li><a href='#2_b1'>2.b1 Establish FTP connection</a></li>
            <li><a href='#2_b2'>2.b2 Read data from ftp</a></li>
            <li><a href='#2_b3'>2.b3 Write df to ftp</a></li>
            <li><a href='#2_b4'>2.b4 Move data between ftp folder</a></li>
        </ul>
    </ul>
    <li><a href='#3_0'>3. Snowflake Connection</a></li>
    <ul>
        <li><a href='#3_a'>3.a Option 1: Dataiku API</a></li>
        <ul>
            <li><a href='#3_a1'>3.a1 Read from Snowflake</a></li>
            <li><a href='#3_a2'>3.a2 Write to Snowflake</a></li>
        </ul>
        <li><a href='#3_b'>3.b Option 2: Python Snowflake Connector</a></li>
    </ul>
    <li><a href='#4_0'>4. SQL Connection</a></li>
    <ul>
        <li><a href='#4_a'>4.a MySQL Connection</a></li>
        <li><a href='#4_b'>4.b Oracle Connection</a></li>
        <li><a href='#4_c'>4.c Connection</a></li>
    </ul>
</ul>


<div align='right'><a href='#toc' style='text-decoration:none;font-weight:bold;color:#0877ff;'>&#11014;&#65039; Back to the Top</a></div>

# <a name='1_0'>1. Extract Credential Using Dataiku API</a>


In [0]:
import dataiku
from dataiku import pandasutils as pdu
import pandas as pd, numpy as np

In [0]:
# set the default user for dataiku api lookup
# NOTE(review): the user id is hard-coded; consider reading it from a project variable.
eid = 'E1724299'  

# create dataiku api object
# https://developer.dataiku.com/latest/concepts-and-examples/authinfo.html
client = dataiku.api_client()  # entry point to the Dataiku API; reused by later cells
user = client.get_user(eid)  # handle on the user identified by `eid`
client_usr = user.get_client_as()  # API client acting as that user
ai = client_usr.get_auth_info(with_secrets=True)  # auth info including secrets; later cells read ai["secrets"] as entries with 'key' and 'value'

# get existing connection info
# later cells index this by connection name, e.g. dcon['vsftp']['params']
dcon = client.list_connections()

In [0]:
def check_folder(fol_nm, connection='VSFTP_Write', path='/_PHI_DataScienceShare/'):
    """Make sure a managed folder named ``fol_nm`` exists, creating it if needed.

    The folder is probed with ``dataiku.Folder(fol_nm).get_info()``. If that
    raises, a managed folder is created in the default project of the
    notebook-level ``client`` and pointed at ``connection`` / ``path``.

    Parameters
    ----------
    fol_nm : str
        Name of the managed folder to look up or create.
    connection : str, optional
        Connection the new folder is attached to (default: 'VSFTP_Write').
    path : str, optional
        Directory on that connection used by the new folder
        (default: '/_PHI_DataScienceShare/').

    Returns
    -------
    None
        Progress is reported with ``print`` only.
    """
    try:  #checking if a given folder existed
        dataiku.Folder(fol_nm).get_info()
    except Exception:
        # The lookup failed, so treat the folder as missing. `Exception` (rather
        # than a bare except) lets KeyboardInterrupt/SystemExit propagate.
        # NOTE(review): the exact exception raised for a missing folder is not
        # visible here; narrow this once confirmed.
        project = client.get_default_project()

        #create managed folder
        mfol = project.create_managed_folder(fol_nm)

        #change the folder connection to FTP and set the directory path
        settings = mfol.get_settings()
        settings.set_connection_and_path(connection=connection, path=path)
        settings.save()
        print('New folder created:', fol_nm)
    else:
        print('Folder Exist:', fol_nm)
    print('Folder check completed!')



In [0]:
# project = client.get_default_project()
# folder = dataiku.Folder('PHI_DataScienceShare')


In [0]:
# folderx = project.get_managed_folder('PHI_DataScienceShare')


<div align='right'><a href='#toc' style='text-decoration:none;font-weight:bold;color:#0877ff;'>&#11014;&#65039; Back to the Top</a></div>

# <a name='2_0'>2. FTP/SFTP Connection</a>
## <a name='2_a'>2.a Option 1: Dataiku API</a>
### <a name='2_a1a'>2.a1a Read CSV data from FTP folder</a>

In [0]:
# https://doc.dataiku.com/dss/11/python-api/managed_folders.html
# https://doc.dataiku.com/dss/latest/connecting/managed_folders.html

folder_name = 'PHI_DataScienceShare'
# check if folder exist, if not create new
# check_folder(folder_name)

#create a folder object in flow then write this next part
folder = dataiku.Folder(folder_name)

# first path in the folder whose name contains the target file name;
# fail with an explicit message instead of a bare IndexError when it is absent
matches = [x for x in folder.list_paths_in_partition() if 'test_file.csv' in x]
if not matches:
    raise FileNotFoundError("'test_file.csv' not found in folder " + folder_name)
lis = matches[0]

print('Reading file: ',lis)
with folder.get_download_stream(lis) as stream:  # close the remote stream once it has been parsed
    df = pd.read_csv(stream)

# dapi = dataiku.Dataset('tablename_inFlow')  #retrieve sf dataset obj
# dapi.write_with_schema(df)  #write BMA data to sf

### <a name='2_a1b'>2.a1b Read Excel data from FTP folder</a>

In [0]:
import io  # imported here because the notebook's other `import io` only runs in a later cell

# first path in the folder whose name contains the target workbook name
lis = [x for x in folder.list_paths_in_partition() if 'test_file.xlsx' in x][0]

# Buffer the download in memory and hand pandas a file-like object: passing raw
# bytes to read_excel is deprecated in recent pandas. The `with` block also
# closes the remote stream.
with folder.get_download_stream(lis) as stream:
    df = pd.read_excel(io.BytesIO(stream.read()))

### <a name='2_a2'>2.a2 Write df to FTP folder</a>

In [0]:
import io

#write csv files
# `df` and `folder` come from the read cells above.
buffer = io.StringIO()  #in-memory text buffer
df.to_csv(buffer, index=False)  #serialize df as CSV text, without the index column
with io.BytesIO(str.encode(buffer.getvalue())) as f:  #encode the CSV text and wrap the bytes in a file-like object
    folder.upload_stream("test_filex.csv", f)  #upload to the managed folder under this name

In [0]:

#write xlsx files
buffer = io.BytesIO()  #in-memory bytes buffer (binary, not a text wrapper)
df.to_excel(buffer, index=False)  #serialize df as a workbook into the buffer, without the index column
with io.BytesIO(buffer.getvalue()) as f:  #copy the workbook bytes into a fresh file-like object positioned at the start
    folder.upload_stream("test_filex2.xlsx", f)  #upload to the managed folder under this name

### <a name='2_a3'>2.a3 Read/write pptx to and from FTP folder</a>

In [0]:
from pptx import Presentation

folder = dataiku.Folder('PHI_DataScienceShare')  #input folder

# lis = [x for x in folder.list_paths_in_partition() if 'QA_MOR_DQC_template.pptx' in x][0]
path = '/script_scheduled/QA_pptx_auto/'
file = 'QA_MOR_DQC_template.pptx'

# Buffer the whole template in memory before handing it to python-pptx;
# the `with` block closes the remote stream afterwards.
with folder.get_download_stream(path+file) as stream:
    root = Presentation(io.BytesIO(stream.read()))

# slide_layout = root.slide_layouts[1]
# root.slides.add_slide(slide_layout)

#write pptx files
buffer = io.BytesIO()  #in-memory bytes buffer
root.save(buffer)  #serialize the presentation into the buffer

filen = path + "QA_MOR_DQC_test234.pptx"
with io.BytesIO(buffer.getvalue()) as f:  #fresh file-like object over the saved bytes, positioned at the start
    folder.upload_stream(filen, f)

# Report the file written by this cell. The previous `print(lis)` printed a
# path left over from an earlier, unrelated cell.
print(filen)

### <a name='2_a4'>2.a4 Read/write PDF to FTP folder</a>

In [0]:
import os, shutil, tempfile
from tabula import read_pdf
import dataiku

#this method is a workaround for folder.get_download_stream() not being seekable.
def readPDF(folder, filename):
    """Download ``filename`` from a managed folder and parse one table out of it.

    The remote file is copied into a temporary directory (removed when the
    function returns) so tabula can open it from a local path.

    Parameters
    ----------
    folder : dataiku.Folder
        Managed folder that holds the PDF.
    filename : str
        Path of the PDF inside the folder; only its base name is used locally.

    Returns
    -------
    pandas.DataFrame
        The first table tabula returns for the requested area.

    Raises
    ------
    IndexError
        If tabula finds no table in the requested area.
    """
    with tempfile.TemporaryDirectory() as tmpdirname:
        local_file_path = os.path.join(tmpdirname, os.path.basename(filename))
        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
        with folder.get_download_stream(filename) as f_remote, open(local_file_path, 'wb') as f_local:
            shutil.copyfileobj(f_remote, f_local)
        # NOTE(review): with relative_area=True the area is presumably
        # (top, left, bottom, right) in percent of the page; confirm in the tabula docs.
        df = pd.DataFrame(read_pdf(local_file_path, stream = True, area=(24, 30, 65, 80), relative_area=True)[0])

    return df


# TODO(review): placeholder. `Folder_ID` was never defined in this notebook;
# set it to the name/id of the managed folder that actually holds the PDFs.
Folder_ID = 'PHI_DataScienceShare'

folder = dataiku.Folder(Folder_ID)

# Parse every PDF in the folder and keep the result per path. The loop used to
# pass the undefined name `filename` instead of its own variable `file_name`
# and threw the parsed frames away.
pdf_tables = {}
for file_name in folder.list_paths_in_partition():
    if file_name.lower().endswith('.pdf'):  # skip non-PDF files, which tabula cannot parse
        pdf_tables[file_name] = readPDF(folder, file_name)

<div align='right'><a href='#toc' style='text-decoration:none;font-weight:bold;color:#0877ff;'>&#11014;&#65039; Back to the Top</a></div>


## <a name='2_b'>2.b Option 2: Python ftp library</a>
### <a name='2_b1'>2.b1 Establish FTP connection</a>

In [0]:
from ftplib import FTP
import os, io

# create ftp connection
ftp = FTP()
ftp_cd = dcon['vsftp']['params']  #parameters of the 'vsftp' entry in the Dataiku connection list
ftp.connect('vsftp', ftp_cd['port'])  #host name 'vsftp' is hard-coded; only the port comes from the connection parameters

# usr = 'svc.dscidatiku'  #username for password lookup; read-only access
usr = 'svc.datasciencetm'  #username for password lookup; read-write access

# password = value of the user secret whose key equals the username
# (raises IndexError when no such secret exists)
pss = [k['value'] for k in ai["secrets"] if k['key'] == usr][0]
ftp.login(usr, pss)

# ftp.cwd('/_Phi_Datascienceshare/script_scheduled/WCSL_pop_auto')
ftp.dir()  #show default directory contents

### <a name='2_b2'>2.b2 Read data from ftp</a>

In [0]:
#read csv file
ftp.cwd('/')  #BestPractice: set the current working directory
filename = 'VSFTP - Virtual Directories.csv'  #filename

download_file = io.BytesIO()  #in-memory bytes buffer that receives the download
ftp.retrbinary("RETR {}".format(filename), download_file.write)  #retrieve file from ftp; each received chunk is written to the buffer
download_file.seek(0)  #rewind the buffer so the next read starts at the beginning

ref = pd.read_csv(download_file)  #parse the buffered file into a DataFrame
ref

In [0]:
#read excel file
ftp.cwd('/_Phi_DataScienceShare')  #BestPractice: set the current working directory
filename = 'DataIku_app/dataIku_srv_access.xlsx'  #filename

download_file = io.BytesIO()  #in-memory bytes buffer that receives the download
ftp.retrbinary("RETR {}".format(filename), download_file.write)  #retrieve file from ftp
download_file.seek(0)  #rewind the buffer so the next read starts at the beginning

# Pass the rewound buffer itself: handing raw bytes (download_file.read()) to
# read_excel is deprecated in recent pandas.
ref = pd.read_excel(download_file)
ref

In [0]:
def ftp_read(f):
    """Download remote file ``f`` over the notebook-level ``ftp`` session.

    Returns an ``io.BytesIO`` holding the file's bytes, rewound to the start
    so it can be passed straight to a reader.
    """
    buf = io.BytesIO()
    retr_cmd = "RETR {}".format(f)
    ftp.retrbinary(retr_cmd, buf.write)  # every received chunk is written to buf
    buf.seek(0)  # rewind after the download
    return buf

# ftp_read('DataIku_app/dataIku_srv_access.xlsx').read()

### <a name='2_b3'>2.b3 Write df to ftp</a>

In [0]:
# ftp.cwd('/_Phi_Datascienceshare')  #BestPractice: set the working directory to write to
# ftp.dir()

df = ref.copy()  #work on a copy of the frame read in 2.b2

# write data to ftp
buffer = io.StringIO()  #in-memory text buffer
df.to_csv(buffer)  #serialize df as CSV text (index=False is not passed here, unlike the 2.a2 example)
bio = io.BytesIO(str.encode(buffer.getvalue()))  #encode the CSV text and wrap the bytes in a file-like object

filename = 'test_file.csv'
ftp.storbinary(f'STOR /_Phi_Datascienceshare/{filename}', bio)  #write data to ftp


### <a name='2_b4'>2.b4 Move data between ftp folder</a>

In [0]:
from datetime import datetime
from ftplib import error_perm

now = datetime.now()

# new subfolder name, formatted as YYYY-MM
fld = '{0}-{1}'.format(now.year, str(now.month).zfill(2))
WellHealth = dataiku.Folder("4gHKVHeJ")  #exiting folder in the flow 
# paths (leading character stripped) that contain the current YYYY-MM tag
files = [x[1:] for x in WellHealth.list_paths_in_partition() if fld in x]  #getting list of specific files for transfer

def movef(filez):
    """Move each file in ``filez`` from ``src`` to ``path`` on the FTP server.

    Uses the notebook-level ``ftp`` session and the module-level ``src`` and
    ``path`` values, which are read when the function is called. Prints a
    status line and returns None.
    """
    for f in filez:
        # FTP.rename issues RNFR followed by RNTO and validates both replies
        ftp.rename(f'{src}{f}', f'{path}{f}')
    return print('Status: Transfer complete.')


src = '/_FTP_Vendor_wellhealth/Referral Messages/'
des = '/_PHI_EnterpriseReferralManagement/zdata/'

path = f'{des}{fld}/'  #new path
print(path)

try: #create new subfolder if not existed
    ftp.mkd(path)
except error_perm:
    # The server refused MKD with a permanent (5xx) error, assumed to mean the
    # folder already exists. Connection and protocol errors are no longer
    # swallowed, and a real permission problem still surfaces on the rename.
    pass

if files:  #move file if it exist
    movef(files)
else:
    print('Status: No file available to transfer.')

<div align='right'><a href='#toc' style='text-decoration:none;font-weight:bold;color:#0877ff;'>&#11014;&#65039; Back to the Top</a></div>

# <a name='3_0'>3. Snowflake Connection</a>
## <a name='3_a'>3.a Option 1: Dataiku API</a>

### <a name='3_a1'>3.a1 Read from Snowflake</a>

In [0]:
# https://developer.dataiku.com/latest/api-reference/python/sql.html#dataiku.SQLExecutor2
from dataiku.core.sql import SQLExecutor2

exe = SQLExecutor2(connection="sf_write")  #executor bound to the Dataiku connection named "sf_write"

# sample query: at most 500 rows of the ENCOUNTER table
qr = """SELECT * FROM MEMORIALHERMANN_DB.MEMORIAL_HERMANN_PROD.ENCOUNTER limit 500"""

exe.query_to_df(qr)  #run the query; the result is displayed as the cell output

### <a name='3_a2'>3.a2 Write to Snowflake</a>

In [0]:
# Use the same name for the dataset handle on both lines: the second line used
# to call `dapi`, which is only ever defined in commented-out code.
dfobj = dataiku.Dataset('tablename_inFlow')  #retrieve sf dataset obj
dfobj.write_with_schema(df)  #write BMA data to sf

<div align='right'><a href='#toc' style='text-decoration:none;font-weight:bold;color:#0877ff;'>&#11014;&#65039; Back to the Top</a></div>

## <a name='3_b'>3.b Option 2: Python Snowflake Connector</a>
### 3.b1 Established connection

In [0]:
from snowflake.connector import connect
from snowflake.connector.pandas_tools import write_pandas
# https://stephenallwright.com/python-connector-write-pandas-snowflake/
# https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.Session.write_pandas.html

# create snowflake connection
# `dcon` (Dataiku connection list) and `ai` (auth info with secrets) come from section 1.
sf = dcon['sf_write']['params']  #retrieved connection info as py obj
# sf = dcon['DS_DATAIKU_SCHEMA']['params']
sf_usr = sf['user']  #retrieved connection's username
sf_rol = sf['role']  #retrieved connection's role
# password = value of the user secret whose key equals the connection's username (IndexError if absent)
sf_pss = [k['value'] for k in ai["secrets"] if k['key'] == sf_usr][0]   #retrieved connection's password
sf_wrh = sf['warehouse']  #retrieved connection's warehouse


# open the connection used by the 3.b cells below; the account name is hard-coded
# NOTE(review): nothing in the visible notebook closes `conn`.
conn = connect(user = sf_usr, password = sf_pss, warehouse = sf_wrh, role = sf_rol,
               account = 'cerner-healtheedw_memorialhermann',)


### 3.b2 Read from snowflake

In [0]:
# *** Read ***
def read_sn(sq, con):
    """Run query ``sq`` on connection ``con`` and return the result set.

    Parameters
    ----------
    sq : str
        SQL text to execute.
    con : connection object
        Anything exposing ``cursor()``; here the Snowflake connection.

    Returns
    -------
    Whatever the cursor's ``fetch_pandas_all()`` returns (a pandas DataFrame
    for the Snowflake connector).
    """
    # https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#fetch_pandas_all
    cur = con.cursor()
    try:
        cur.execute(sq)
        return cur.fetch_pandas_all()
    finally:
        cur.close()  # release the cursor even when execute/fetch raises

# sample query: at most 1000 rows of the ENCOUNTER table, read through the connector
sql = """SELECT * FROM MEMORIALHERMANN_DB.MEMORIAL_HERMANN_PROD.ENCOUNTER LIMIT 1000"""
read_sn(sql, conn)  # the result is displayed as the cell output

### 3.b3 Write to snowflake

In [0]:
# *** Write ***
# https://www.mobilize.net/blog/creating-a-table-in-snowflake-using-the-python-connector

def fix_pddate(df, tz='UTC'):
    """Return a copy of ``df`` whose ``datetime64[ns]`` columns are localized to ``tz``.

    Works around the datetime mismatch between pandas and Snowflake
    (https://stackoverflow.com/a/70834485). The input frame is left untouched
    and every other column is copied as-is.
    """
    localized = df.copy()
    # the column list is computed once, before any column is replaced
    for name in localized.select_dtypes(include=['datetime64[ns]']).columns:
        # attach the time zone to the naive timestamps
        localized[name] = localized[name].dt.tz_localize(tz)
    return localized

def sql_typ(dx, tbn, con=None):
    """Create (or replace) a Snowflake table shaped like ``dx`` and load ``dx`` into it.

    Parameters
    ----------
    dx : pandas.DataFrame
        Data to upload. Object columns are cast to ``str``; spaces in column
        names become underscores.
    tbn : str
        Name of the table to create in MHHS_DSCIENCE_DB.MHHS_DSCIENCE_WRITE.
    con : connection object, optional
        Snowflake connection to use. Defaults to the notebook-level ``conn``.

    Returns
    -------
    None
        The fully qualified table name is printed.
    """
    if con is None:
        con = conn  # fall back to the notebook-level Snowflake connection

    #standardize object type to str
    cols = dx.select_dtypes(include=['object']).columns
    df = dx.astype({x:'str' for x in cols})

    # Replace spaces in column names on the frame itself, so the names sent by
    # write_pandas match the columns declared in the DDL below. Previously only
    # the DDL was renamed.
    df.columns = df.columns.str.replace(' ','_')

    # https://www.mobilize.net/blog/creating-a-table-in-snowflake-using-the-python-connector
    #from pandas to snowflake datatype
    tp = pd.DataFrame(df.dtypes, columns=['type'])
    tc = {'object':'varchar(500)','int64':'int','float64':'float','datetime64[ns]':'datetime','bool':'boolean'}
    # dtypes missing from `tc` pass through unchanged and end up verbatim in the DDL
    tp['type'] = tp['type'].replace(tc)

    #create snowflake table schema
    tx = [f'{x} {y}' for x,y in tp.to_dict()['type'].items()]
    tx1 = ', '.join(tx)
    db, sh = 'MHHS_DSCIENCE_DB','MHHS_DSCIENCE_WRITE'

    sql = f"""create or replace table {db}.{sh}.{tbn} ({tx1})"""
    # `cur` used to be an undefined name here; open a cursor on the connection
    # and close it once the DDL has run.
    cur = con.cursor()
    try:
        cur.execute(sql)
    finally:
        cur.close()
    tb = fix_pddate(df)  #fix pandas-2-snowflake datetime issue

    #write data to newly created table
    # NOTE(review): write_pandas quotes identifiers by default, so table and
    # column names may need to be upper-case to match the unquoted DDL; confirm.
    write_pandas(conn=con, df=tb, table_name=tbn, database=db, schema=sh, overwrite=True)
    return print('Snowflake table:', f'{db}.{sh}.{tbn}')

# sql_typ(df, 'SF_table_name', conn)  #insert the table name you want to create/use

### 3.b4 Execute sql within snowflake only

In [0]:
def sql_exc(sql, con):
    """Execute ``sql`` on connection ``con`` and return the result of ``cursor.execute``.

    Intended for statements that run entirely inside Snowflake (DDL, CTAS,
    DROP, ...), where no result set has to be pulled into pandas.
    """
    # The cursor must come from the connection; the previous code called
    # `.cursor()` on the function object itself and raised AttributeError.
    cur = con.cursor()
    return cur.execute(sql)

# Example statement for sql_exc: (re)creates test_table as the UNION of the four
# WCSL_Pop_ref_* reference tables (Dx, PCS, CPT, DRG). Each branch exposes its
# code column as `code` and tags its rows with a literal `src_cdx` value.
sql1 = """CREATE OR replace TABLE MHHS_DSCIENCE_DB.MHHS_DSCIENCE_WRITE.test_table AS

SELECT DISTINCT d.icd10 code, d.obstetrics, d.perinatal, d.POSTPARTUM, d.gyn, d.pediatrics, d.NORMAL_NEWBORN, d.neonate, d.fetal, d.obpop, 'dx' src_cdx
FROM MHHS_DSCIENCE_DB.MHHS_DSCIENCE_WRITE.WCSL_Pop_ref_Dx d

UNION

SELECT DISTINCT p.icd10 code, p.obstetrics, p.perinatal, p.POSTPARTUM, p.gyn, p.pediatrics, p.NORMAL_NEWBORN, p.neonate, p.fetal, p.obpop, 'pc' src_cdx
FROM MHHS_DSCIENCE_DB.MHHS_DSCIENCE_WRITE.WCSL_Pop_ref_PCS p

UNION

SELECT DISTINCT c.cpt code, c.obstetrics, c.perinatal, c.POSTPARTUM, c.gyn, c.pediatrics, c.NORMAL_NEWBORN, c.neonate, c.fetal, c.obpop, 'cpt' src_cdx
FROM MHHS_DSCIENCE_DB.MHHS_DSCIENCE_WRITE.WCSL_Pop_ref_CPT c

UNION

SELECT DISTINCT r.MSDRG code, r.obstetrics, r.perinatal, r.POSTPARTUM, r.gyn, r.pediatrics, r.NORMAL_NEWBORN, r.neonate, r.fetal, r.obpop, 'msdrg' src_cdx
FROM MHHS_DSCIENCE_DB.MHHS_DSCIENCE_WRITE.WCSL_Pop_ref_DRG r
 """

# sql_exc(sql1, conn)

In [0]:
# Clean-up statement for the test_table example; the call is left commented out
# so the table is only dropped on purpose.
sql2 = """Drop table MHHS_DSCIENCE_DB.MHHS_DSCIENCE_WRITE.test_table"""
# sql_exc(sql2, conn)

<div align='right'><a href='#toc' style='text-decoration:none;font-weight:bold;color:#0877ff;'>&#11014;&#65039; Back to the Top</a></div>

# <a name='4_0'>4. SQL Connection (Dataiku API)</a>

## <a name='4_a'>4.a MySQL Connection</a>

In [0]:
# use existing sql connection
# SQLExecutor2 runs queries through a named Dataiku connection. The previous
# call used sql_exc, which takes (sql, con) and rejects a `connection` keyword.
from dataiku.core.sql import SQLExecutor2  # re-imported so this cell also runs on its own

exe1 = SQLExecutor2(connection="DSci_BMA_conn")

# NOTE(review): the [bracketed] identifier is SQL Server syntax, while this
# section is titled MySQL; confirm the engine behind "DSci_BMA_conn".
exe1.query_to_df("""select * from [bma_NursingStation]""")