In [None]:
#Import libraries
from snowflake.snowpark.session import Session
from config import connection_parameters
from ingesta import ingesta_setup,fileEntry
import os

In [None]:
# Open a session in Snowflake using the settings from config.connection_parameters.
# A failed connection is expected to surface as an exception from create()
# rather than as a None return value, so the error message is printed from
# the except branch and the exception is re-raised to stop the notebook here.
try:
    sesion = Session.builder.configs(connection_parameters).create()
except Exception:
    print("Connection error")
    raise

if sesion is not None:
    print("Connection established")
    # NOTE(review): the database name is hard-coded here, while later cells use
    # connection_parameters['database'] -- confirm both refer to the same database.
    sesion.use_database('inegi')
    # Show which warehouse, database and role the session is currently using.
    print(sesion.sql("select current_warehouse(), current_database(), current_role()").collect())
else:
    print("Connection error")

In [None]:
# Activate objects in Snowflake: switch to the configured role, database,
# schema and warehouse, grant that role the privileges it needs, and
# (re)create the raw landing table.
role = connection_parameters['role']
database = connection_parameters['database']
schema = connection_parameters['schema']
warehouse = connection_parameters['warehouse']

sesion.use_role(role)

# Database
sesion.sql(f"grant all privileges on database {database} to role {role};").collect()
sesion.use_database(database)
sesion.use_schema(schema)

# Warehouse
sesion.sql(f"grant usage on warehouse {warehouse} to role {role};").collect()
sesion.use_warehouse(warehouse)

# Schema
sesion.sql(f"grant all privileges on schema {schema} to role {role};").collect()
# Grant to the configured role (previously hard-coded as inegi_role) so this
# statement stays consistent with every other grant in this cell.
sesion.sql(f"grant create stage on schema {schema} to role {role};").collect()

# Create RAW table: a single VARIANT column that receives the JSON documents.
# "create or replace" drops any existing inegi_raw table and its rows.
sesion.sql("create or replace table inegi_raw (v VARIANT);").collect()
sesion.sql(f"grant select on all tables in schema {schema} to role {role};").collect()
sesion.sql(f"grant select on all views in schema {schema} to role {role};").collect()


In [None]:
# Create the internal stage named by the 'snowstage' entry of the ingestion
# settings (no-op when the stage already exists).
env = ingesta_setup()
snowstage = env['snowstage']
stage_target = '@' + snowstage
sesion.sql('CREATE STAGE IF NOT EXISTS '+ snowstage).collect()

# Upload every entry that fileEntry reports for the local JSON folder.
# presumably fileEntry yields file path strings -- verify against ingesta.py
working_dir = os.getcwd()
archivos_dir = os.path.join(working_dir, 'JSON')
archivos = fileEntry(archivos_dir)
for json_path in archivos:
    upload = sesion.file.put('file:///' + json_path, stage_target)
    # Only the status of the first returned result is reported.
    print(upload[0].status)

# Upload entidad.py to the same stage, uncompressed and replacing any
# previously staged copy.
entidad_path = os.path.join(working_dir, 'entidad.py')
upload = sesion.file.put(
    'file:///' + entidad_path,
    stage_target,
    auto_compress=False,
    overwrite=True,
)
print(upload[0].status)


In [None]:
# Load the staged JSON files into the inegi_raw table.
print("Transforming...")
# Named file format "json" that tells COPY to parse the staged files as JSON.
sesion.sql("create or replace file format json type = json;").collect()
# PATTERN is a regular expression, so the dots of the ".json.gz" suffix are
# written as [.] to match a literal dot only; a bare "." would match any
# character and could pick up unintended staged files.
sesion.sql("copy into inegi_raw from @" + env['snowstage'] + " file_format = json pattern = '.*inegi[1-8][.]json[.]gz';").collect()
print("Transformation completed!")

In [None]:
# Close the Snowflake session created earlier in the notebook and confirm it.
sesion.close()
print("Session closed")