In [1]:
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *
from datetime import datetime

import configparser
import os

In [2]:
def get_snowpark_session(config_path: str = None, section: str = 'connections.dev'):
    """Return a Snowpark session built from a SnowSQL-style INI config file.

    Params:
    config_path (string) [optional]: path to the config file. Defaults to
        C:\\Users\\Esli\\.snowsql\\config (the previously hard-coded location).
    section (string) [optional]: INI section holding the connection settings.
        Defaults to 'connections.dev'.

    The section must provide the keys accountname, username, password,
    rolename, warehousename and dbname.

    Raises:
    KeyError: if the file could not be read, the section is missing, or one of
        the required keys is absent.
    """
    if config_path is None:
        config_path = os.path.join('C:\\Users\\Esli\\.snowsql', 'config')

    config = configparser.ConfigParser()
    # ConfigParser.read silently skips unreadable/missing files and returns the
    # list of files it actually parsed, so check it to give a useful error.
    parsed_files = config.read(config_path)
    if not parsed_files or not config.has_section(section):
        raise KeyError(f"section '{section}' not found in config file {config_path!r}")
    conf = config[section]

    connection_parameters = {
        "account": conf['accountname'],
        "user": conf['username'],
        "password": conf['password'],
        "database": conf['dbname'],
        "warehouse": conf['warehousename'],
        "role": conf['rolename'],
    }

    return Session.builder.configs(connection_parameters).create()


In [7]:
def set_schema(schema_name: str):
    """Switch the current schema of the module-level ``session``.

    Runs ``USE SCHEMA <schema_name>`` and returns the collected result rows.
    Note: schema_name is interpolated directly into the SQL text.
    """
    use_statement = f'USE SCHEMA {schema_name}'
    result_rows = session.sql(use_statement).collect()
    return result_rows

In [18]:
def read_csv_from_stage(stage_name: str, csv_schema: StructType, csv_pattern: str=None):
    """
    Read the CSVs from the given stage and return them as a single DF; if csv_pattern is not given, read all CSVs on the stage.

    Each file is read with the given schema, skipping one header line and
    accepting double-quoted fields. Two columns are added per file:
    `path` (the stage file name) and `load_at` (local time when that file was read).

    Params:
    stage_name (string): The name of the Stage where the files are on snowflake
    csv_schema (StructType): The schema of the file that will be read
    csv_pattern (String) [optional]: The regex expression that will match the files on Stage

    Raises:
    ValueError: if no file on the stage matches.

    code snippet:
    df = read_csv_from_stage(stage_name = 'RAW_DATA_STAGE', csv_schema=csv_schema, csv_pattern='.*yds_data*[.]csv')
    """
    list_sql = f"LIST @{stage_name}"
    if csv_pattern is not None:
        list_sql += f" PATTERN='{csv_pattern}'"
    stage_files = session.sql(list_sql).collect()

    df_all = None
    for row in stage_files:
        file_name = row["name"]
        df_csv = (
            session.read.schema(csv_schema)
            .option("skip_header", 1)
            .option("field_optionally_enclosed_by", '"')
            .csv('@' + file_name)
        )
        df_csv = df_csv.withColumn('path', F.lit(file_name))
        df_csv = df_csv.withColumn('load_at', F.lit(datetime.now()))
        # Accumulate every file. Unioning a frame with itself would duplicate
        # rows and keep only the last file.
        df_all = df_csv if df_all is None else df_all.unionByName(df_csv)

    if df_all is None:
        raise ValueError(f"No files found on stage '{stage_name}'"
                         + (f" matching pattern '{csv_pattern}'" if csv_pattern is not None else ""))

    return df_all

In [None]:
def _hz_table_exists(fq_table_name: str) -> bool:
    """Return True if a `select * ... limit 1` on the fully qualified table succeeds."""
    # Only the probe is guarded: any failure here is treated as "table missing".
    # (snowflake.snowpark.exceptions.SnowparkSQLException would be a tighter catch.)
    try:
        session.sql(f"select * from {fq_table_name} limit 1").collect()
    except Exception:
        return False
    return True


def create_hz_dim(table_name: str, sql_query_new: str, sql_query_append: str, schema: str = 'hz_clear_strategy'):
    """
    Create a new dimension on HZ, or append data if it already exists.

    Rows get a surrogate key `id_<table_name>` drawn from the sequence
    `<schema>.seq_<table_name>` (created on first run).

    params:
    table_name: the name of the new dimension
    sql_query_new: the query that will be used to read from pz table and create new table
    sql_query_append: the query that will read pz to append on hz
    schema [optional]: schema holding the dimension and its sequence (default 'hz_clear_strategy')

    Errors raised while appending are propagated; they no longer fall through
    to the create branch, which used to overwrite the existing table.
    """
    fq_table = f"{schema}.{table_name}"
    fq_seq = f"{schema}.seq_{table_name}"

    if _hz_table_exists(fq_table):
        print(f'Table: {table_name} alread exists')

        query_dim = f"""
                with w_new_values as ({sql_query_append})

                select
                    {fq_seq}.NEXTVAL as id_{table_name},
                    w.*
                from w_new_values w
            """

        session.sql(query_dim).write.mode("append").saveAsTable(fq_table)
        print(f'Execution finished, {table_name} appended')
        return None

    print(f'Table: {table_name} will be created')
    session.sql(f'CREATE SEQUENCE IF NOT EXISTS {fq_seq} START 1 INCREMENT 1;').show()

    query_dim = f"""
        select {fq_seq}.NEXTVAL as id_{table_name},
        t.*
    from ({sql_query_new}) t
    """

    session.sql(query_dim).write.mode("overwrite").saveAsTable(fq_table)
    print(f'Execution finished, {table_name} created')
    return None

In [None]:
def get_field_list(df, control_fields: list):
    """
    Build a comma-separated string of the dataframe's column names, leaving out the control fields.

    Params:
    df (DataFrame): any object exposing a `columns` sequence of names
    control_fields (list): names to exclude; matching is exact (case-sensitive)

    Returns:
    str: the remaining names joined with ', ' in their original order ('' if none remain)
    """
    kept_columns = (col_name for col_name in df.columns if col_name not in control_fields)
    return ', '.join(kept_columns)

In [None]:
def _build_most_recent_value_sql(zone: str, table_name: str, str_group_fields: str) -> str:
    """Build the window query that keeps, per group, the rows with the max load_at."""
    # Group fields must be emitted as SQL identifiers. Wrapping them in quotes
    # turns them into one constant string literal, so every row lands in a single
    # partition. An empty group list means one global window: `over ()`.
    fields = str_group_fields.strip() if str_group_fields else ''
    over_clause = f"partition by {fields}" if fields else ""
    return f"""
        select * from (
            select
                d.*,
                max(d.load_at) over ({over_clause}) as max_load_at
            from {zone}.{table_name} d
        ) t
        where t.max_load_at = t.load_at
    """


def get_most_recent_value(zone: str, table_name: str, str_group_fields: str):
    """
    Return the most recent rows of a table, per group, based on the "load_at" field.

    Params:
    zone (string): The name of the zone (schema) of the table, e.g. hz_clear_strategy
    table_name: the name of the table
    str_group_fields: a string with the cols used to group when finding the max(load_at),
        e.g. 'match_id, shot_id'; empty means the max over the whole table

    Returns:
    A DataFrame with the table's columns (the helper max_load_at column is dropped).
    Ties on the max load_at within a group are all kept.
    """
    sql_query = _build_most_recent_value_sql(zone, table_name, str_group_fields)
    df = session.sql(sql_query).drop('max_load_at')
    return df


In [2]:
# Print a quick reference of the imports and helper functions this notebook defines.
print('''####################################################
Available packages:
    from snowflake.snowpark.session import Session
    from snowflake.snowpark import functions as F
    from snowflake.snowpark.types import *
    from datetime import datetime

    import configparser
    import os

####################################################

Available functions, for details use help(<function_name>):
    get_snowpark_session
    set_schema
    read_csv_from_stage
    create_hz_dim
    get_field_list
    get_most_recent_value
''')

####################################################
Available packages:
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *
from snowflake.snowpark.functions import udtf

import configparser
import os

####################################################

Available functions:
get_snowpark_session

