In [None]:
#default_exp snowflake.copyinto

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#hide
# Run nbdev's notebook2script; the output recorded below lists the notebooks it converted.
from nbdev.export import notebook2script
notebook2script()

Converted 00_core.ipynb.
Converted 01_azure.ipynb.
Converted 02_utils_parseyaml.ipynb.
Converted 03_utils_dataframes.ipynb.
Converted 04_dstools_preparedata.ipynb.
Converted 05_snowflake_query.ipynb.
Converted 06_snowflake_copyinto.ipynb.
Converted index.ipynb.


# ``CopyInto``

In [None]:
#export
from easymigration.imports import *
from easymigration.snowflake.query import *
from easymigration.utils.parseyaml import *
from easymigration.azure.filehandling import *
from easymigration import files

In [None]:
#export
class CopyInto(SnowflakeTool):
    """Copies files from Azure blob storage into snowflake tables using a templated copy statement."""

    def __init__(self,
                 sfAccount: str = None,
                 sfUser: str = None,
                 sfPswd: str = None,
                 sfWarehouse: str = None,
                 sfDatabase: str = None,
                 sfSchema: str = None,
                 sfRole: str = None,
                 logger=None,
                 ):
        """
        Instantiation of snowflake mover class inheriting the SnowflakeTool
        class.
        Args:
            sfAccount (str, optional): snowflake credential passed as string
            sfUser (str, optional): snowflake credential passed as string
            sfPswd (str, optional): snowflake credential passed as string
            sfWarehouse (str, optional): snowflake credential passed as string
            sfDatabase (str, optional): snowflake credential passed as string
            sfSchema (str, optional): snowflake credential passed as string
            sfRole (str, optional): snowflake credential passed as string
            logger (logging.Logger, optional): pass custom logger as many libraries are set to Warning.
                Defaults to None, in which case ``logging.getLogger(__name__)`` is used.
        """
        super().__init__(sfAccount,
                         sfUser,
                         sfPswd,
                         sfWarehouse,
                         sfDatabase,
                         sfSchema,
                         sfRole)
        # `logger` is an explicit (optional, trailing) argument so existing
        # positional and keyword callers keep working unchanged.
        self._logger = logger if logger is not None else logging.getLogger(__name__)

    @staticmethod
    def _fill_template(template, substitutions):
        """Replaces each placeholder key in ``template`` with its value, in dict order."""
        for placeholder, value in substitutions.items():
            template = template.replace(placeholder, value)
        return template

    def insert_csv(self,
                   blob_name: str = None,
                   blob_path: str = None,
                   storage_account: str = None,
                   container_name: str = None,
                   table_name: str = None,
                   sas_token: str = None,
                   fail_on_no_insert: bool = False,
                   delimiter: str = ',',
                   ):
        """
        Copies a csv file into a given snowflake table from
        blob

        :param blob_name: name of file in blob
        :param blob_path: path to file in blob; when given it is prefixed to
            ``blob_name`` with a ``/`` separator
        :param storage_account: blob storage account name
        :param container_name: Azure container ID
        :param table_name: name of the snowflake table inserted into the copy statement
        :param sas_token: shared access signature token for azure blob
        :param fail_on_no_insert: raise an error when snowflake reports that
            0 files were processed
        :param delimiter: csv delimiter type

        :raises TypeError: if a required argument is None
        :raises RuntimeError: if no file was processed and ``fail_on_no_insert`` is set
        :raises AssertionError: if the parsed and loaded row counts differ

        :return response: snowflake response from the copy into statement
        """

        # Every value below is substituted into the sql template with
        # str.replace, which rejects None; fail early with a readable message
        # instead of deep inside the substitution loop.
        required = {'blob_name': blob_name,
                    'storage_account': storage_account,
                    'container_name': container_name,
                    'table_name': table_name,
                    'sas_token': sas_token,
                    'delimiter': delimiter}
        missing = [name for name, value in required.items() if value is None]
        if missing:
            raise TypeError('insert_csv requires a value for: ' + ', '.join(missing))

        # make blob name here if a path is given
        if blob_path:
            blob_name = blob_path + '/' + blob_name

        # read the sql file from the library
        sql_file = os.path.join(os.path.abspath(files.__path__[0]), 'import_data_csv.sql')
        with open(sql_file) as file:
            template = ' '.join(file.readlines())

        # NOTE(review): values are spliced into the statement as plain text;
        # only pass trusted table/container/account names.
        substitutions = {'INSERT_AZURE_STORAGE_ACCOUNT_NAME_HERE': storage_account,
                         'INSERT_TABLE_NAME_HERE': table_name,
                         'INSERT_CONTAINER_NAME_HERE': container_name,
                         'INSERT_FILE_NAME_HERE': blob_name,
                         'INSERT_AZURE_SAS_TOKEN_HERE': sas_token,
                         'INSERT_DELIMITER_HERE': delimiter}
        query = self._fill_template(template, substitutions)

        # log the statement with the SAS token masked so the secret never
        # reaches the log output
        redacted = dict(substitutions, INSERT_AZURE_SAS_TOKEN_HERE='<redacted>')
        self._logger.info(self._fill_template(template, redacted))

        # execute the snowflake command
        response = self.run_str_query(query)

        # output query results
        self._logger.info('snowflake insertion output:')
        self._logger.info(response)

        if response.loc[0].status == 'Copy executed with 0 files processed.':
            self._logger.info('No files uploaded to snowflake')
            if fail_on_no_insert:
                # raising a bare string is itself a TypeError; raise a real exception
                raise RuntimeError('fail_on_no_insert was equal TRUE, so program raised error')
        else:
            # check if the load was executed correctly; raised explicitly so
            # the check is not stripped when python runs with -O
            if response['rows_parsed'][0] != response['rows_loaded'][0]:
                raise AssertionError("Rows loaded and parsed are not equal")

        return response

# How to Use

In [None]:
# Render the nbdev documentation entry for the class; the recorded output follows.
show_doc(CopyInto)

<h2 id="CopyInto" class="doc_header"><code>class</code> <code>CopyInto</code><a href="" class="source_link" style="float:right">[source]</a></h2>

> <code>CopyInto</code>(**`sfAccount`**:`str`=*`None`*, **`sfUser`**:`str`=*`None`*, **`sfPswd`**:`str`=*`None`*, **`sfWarehouse`**:`str`=*`None`*, **`sfDatabase`**:`str`=*`None`*, **`sfSchema`**:`str`=*`None`*, **`sfRole`**:`str`=*`None`*) :: [`SnowflakeTool`](/easymigration/snowflake_query.html#SnowflakeTool)

Class that holds basic snowflake functionality including testing connection
and running queries.

In [None]:
# Every constructor keyword is read from the environment variable of the same name.
sf_load = CopyInto(**{
    credential: os.environ[credential]
    for credential in ('sfAccount', 'sfUser', 'sfPswd', 'sfWarehouse',
                       'sfDatabase', 'sfSchema', 'sfRole')
})

In [None]:
#hide
# Blob client and yaml settings used by the cells below.
# NOTE(review): get_yaml(['test']) presumably selects the 'test' section of the file - confirm.
fh = FileHandling(os.environ['connection_str'])
yaml = ParseYaml('./files/snowflake.yaml').get_yaml(['test'])

# Four-row sample frame, written locally and then uploaded to the blob container.
local_csv = './files/test_df.csv'
df = pd.DataFrame({'ecid': [150, 151, 160, 100],
                   'home': ['CA', 'LA', 'CO', 'LA'],
                   'avg_visits': [0.20, 10, 0.56, 2.0],
                   'LTR': [6, 2, 4, 3]})
df.to_csv(local_csv, index=False)
fh.upload(container_name=yaml.get('container_name'),
          file_path=local_csv,
          dest='snowflake_load_test/test_df.csv',
          overwrite=True)

# Run the create statement (result discarded), then display the check query result.
_ = sf_load.run_str_query(yaml.get('create_test_table'))
sf_load.run_str_query(yaml.get('check_test_table'))

In [None]:
# Render the nbdev documentation entry for insert_csv; the recorded output follows.
show_doc(CopyInto.insert_csv)

<h4 id="CopyInto.insert_csv" class="doc_header"><code>CopyInto.insert_csv</code><a href="__main__.py#L34" class="source_link" style="float:right">[source]</a></h4>

> <code>CopyInto.insert_csv</code>(**`blob_name`**:`str`=*`None`*, **`blob_path`**:`str`=*`None`*, **`storage_account`**:`str`=*`None`*, **`container_name`**:`str`=*`None`*, **`table_name`**:`str`=*`None`*, **`sas_token`**:`str`=*`None`*, **`fail_on_no_insert`**:`bool`=*`False`*, **`delimiter`**:`str`=*`','`*)

Copies a csv file into a given snowflake table from
blob

:param blob_name: name of file in blob
:param blob_path: path to file in blob
:param storage_account: blob storage account name
:param container_name: Azure container ID
:param sas_token: shared access signature token for azure blob
:param table_name: file to get the import data statement
:param delimiter: csv delimeter type

:return response: snowflake response from the copy into statement

In [None]:
# NOTE(review): the reason for overriding __file__ is not visible in this notebook - confirm it is still needed.
__file__ = './'

# insert_csv keyword -> value taken from the yaml settings loaded earlier.
blob_settings = {
    'blob_name': yaml.get('blob_name'),
    'blob_path': yaml.get('blob_path'),
    'storage_account': yaml.get('account_name'),
    'container_name': yaml.get('container_name'),
    'table_name': yaml.get('table_name'),
}
# The SAS token is read from the environment and passed straight through.
sf_load.insert_csv(sas_token=os.environ['snowflake_blob_SAS_token_SECRET'],
                   **blob_settings)

In [None]:
# Verify the load, then clean up. The table that is queried is also the one
# dropped, so the test table does not linger between notebook runs.
# NOTE(review): table name taken from the original SELECT - confirm it matches
# the yaml 'table_name' / 'create_test_table' settings.
test_table = 'DSDESNOWFLAKETEST'
df = sf_load.run_str_query(f'SELECT * FROM {test_table}')
assert df.shape == (4, 4), f'Expected 4 rows x 4 columns in {test_table}, got {df.shape}'
sf_load.run_str_query(f'DROP TABLE IF EXISTS {test_table}')

In [None]:
#hide
# Run nbdev's notebook2script again at the end; the output recorded below lists the notebooks it converted.
from nbdev.export import notebook2script
notebook2script()

Converted 00_core.ipynb.
Converted 01_azure.ipynb.
Converted 02_utils_parseyaml.ipynb.
Converted 03_utils_dataframes.ipynb.
Converted 04_dstools_preparedata.ipynb.
Converted 05_snowflake_query.ipynb.
Converted 06_snowflake_copyinto.ipynb.
Converted index.ipynb.
