In [18]:
# Import login values from config
from configparser import *

In [19]:
# Import packages needed
from snowflake.snowpark import Session
from snowflake.snowpark.functions import *
from snowflake.snowpark.types import *
from snowflake.snowpark import Window
import pandas as pd
from urllib import request
from bs4 import BeautifulSoup
from urllib.request import urlopen
import re
import numpy as np

In [20]:
#we save credentials in a credentials_dict dictionary
import sys, json
#JSON is UTF-8 by specification; an explicit encoding keeps the load independent of the OS locale default
with open("connection_parameters_prod.json", encoding="utf-8") as jsonfile:
    credentials_dict = json.load(jsonfile)

In [21]:
#we connect to snowflake using credentials_dict and declare a session
def snowpark_session_create():
    """Build a Snowpark session from the module-level ``credentials_dict`` and return it."""
    return Session.builder.configs(credentials_dict).create()

In [22]:
#we create the session object that the following cells use for every Snowflake call
session = snowpark_session_create()

In [23]:
#we set the database and schema the session uses for unqualified names;
#individual queries can still name another schema explicitly (e.g. staging.kna1 in the select further down)
session.use_database("SNF_MDRAZ")
session.use_schema("bronze")

In [66]:
#we create the bronze table KNA1 that will receive the data coming from staging; collect() forces execution
#NOTE: this is "create or replace", so an existing KNA1 is rebuilt on every run (an IF NOT EXISTS variant would keep it)
session.sql("""
    create or replace TABLE SNF_MDRAZ.BRONZE.KNA1 (
	INDEX VARCHAR(16777216),
	KUNNR VARCHAR(16777216),
	GENDE VARCHAR(16777216),
	NAME1 VARCHAR(16777216),
	NAME2 VARCHAR(16777216),
	STRAS VARCHAR(16777216),
	ORT01 VARCHAR(16777216),
	PSTLZ VARCHAR(16777216),
	LAND1 VARCHAR(16777216),
	LAND2 VARCHAR(16777216),
	RSCON VARCHAR(16777216),
	TELF1 VARCHAR(16777216),
	REGIO VARCHAR(16777216),
	BIRTH DATE,
	LATI VARCHAR(16777216),
	LONG VARCHAR(16777216),
	LASTMODIFIED DATE
);
"""
).collect()                        

[Row(status='Table KNA1 successfully created.')]

In [13]:
#(disabled cell) an earlier variant executed the bronze creation query from a variable and used collect() to force execution
#session.sql(create_or_replace_table).collect()

[Row(status='KNA1 already exists, statement succeeded.')]

In [133]:
#we are selecting data from staging table kna1
#birth is parsed with to_date using the 'YYYY/MM/DD' format and LASTMODIFIED is stamped with current_date()
source_table = """select
    index,
    kunnr,
    gende,
    name1,
    name2,
    stras,
    ort01,
    pstlz,
    land1,
    land2,
    rscon,
    telf1,
    regio,
    to_date(birth,'YYYY/MM/DD') birth,
    lati,
    long,
    current_date() as LASTMODIFIED from staging.kna1;"""
#to_pandas() runs the query and loads the result into a pandas DataFrame
df_source = session.sql(source_table).to_pandas()

In [134]:
#placeholder values, keyed by column name, that replace NaNs and Nulls in the staging data
values = {
    'INDEX': -99,
    #free-text columns all share the same 'NA' marker
    **dict.fromkeys(
        ['KUNNR', 'GENDE', 'NAME1', 'NAME2', 'STRAS', 'ORT01', 'PSTLZ', 'LAND1', 'LAND2', 'RSCON', 'TELF1'],
        'NA',
    ),
    'REGIO': -99,
    'BIRTH': '1900/01/01',
    'LATI': 0,
    'LONG': 0,
    'LASTMODIFIED': '1900/01/01',
}
df_source = df_source.fillna(value=values)

In [135]:
#since couple of columns have wrong types, here we are changing these types to what we need
df_source['INDEX'] = df_source['INDEX'].astype('int64')
df_source['REGIO'] = df_source['REGIO'].astype('str')
df_source['GENDE'] = df_source['GENDE'].astype('str')
df_source['BIRTH'] = pd.to_datetime(df_source['BIRTH'],format='ISO8601').dt.strftime('%Y-%m-%d')
#the former ORT01 quote-doubling step was removed on purpose:
# - Series.replace("'","''") only matches cells whose whole value is "'", it is not a substring replace,
#   so it never escaped apostrophes inside city names
# - the loads in this notebook go through write_pandas / create_dataframe, which upload the frame as data
#   rather than as hand-built SQL text, so doubling quotes would only corrupt the stored names
#strip surrounding whitespace from every string cell, leaving non-string cells untouched
#(Series.map per column replaces the deprecated DataFrame.applymap)
df_source = df_source.apply(
    lambda column: column.map(lambda cell: cell.strip() if isinstance(cell, str) else cell)
)

In [136]:
#another way of inserting data into snowflake: bulk-load the cleaned frame into bronze.kna1 with write_pandas
#session.sql() only builds a lazy query; without collect() the BEGIN / COMMIT / ROLLBACK statements
#were never sent to Snowflake
#NOTE(review): write_pandas may issue DDL internally (stage / table creation), which Snowflake
#auto-commits - confirm whether this explicit transaction really makes the load atomic
try:
    session.sql("BEGIN TRANSACTION").collect()
    spDF = session.write_pandas(
        df = df_source,
        table_name="kna1",
        database="snf_mdraz",
        schema="bronze",
        quote_identifiers=False,
        overwrite=True
    )
    session.sql("COMMIT").collect()
except Exception as e:
    #undo whatever is still part of the open transaction, then report the original error
    session.sql("Rollback").collect()
    print(f"Error encountered: {e}. All inserts have been rolled back.")

In [36]:
#we are creating a helper table temp_kna1_cleaned that holds the cleaned pandas data (df_source)
#no temporary table type is requested here; the table is removed by an explicit DROP after the bulk insert
temp_tbl = "temp_kna1_cleaned"
df_temp = session.create_dataframe(df_source)
df_temp.write.mode("overwrite").saveAsTable(temp_tbl)

In [37]:
#we reload bronze.kna1 from the helper table
#INSERT OVERWRITE empties the target and inserts the new rows as one statement inside the transaction;
#the previous separate truncate ran before the transaction, so a failed insert left bronze.kna1 empty
#even though the message below reports a rollback
bulk_insert_sql = f"""
                    insert overwrite into snf_mdraz.bronze.kna1 (INDEX,KUNNR,GENDE,NAME1,NAME2,STRAS,ORT01,PSTLZ,LAND1,LAND2,RSCON,TELF1,REGIO,BIRTH,LATI,LONG,LASTMODIFIED) 
                    select 
                        index,
                        kunnr,    
                        gende,    
                        name1,    
                        name2,    
                        stras,    
                        ort01,    
                        pstlz,    
                        land1,    
                        land2,    
                        rscon,    
                        telf1,    
                        regio,    
                        birth,    
                        lati,    
                        long,
                        LASTMODIFIED
                    from {temp_tbl}
                    """
try:
    #session.sql() is lazy; collect() is what actually sends each statement to Snowflake
    session.sql("BEGIN TRANSACTION").collect()
    session.sql(bulk_insert_sql).collect()
    session.sql("COMMIT").collect()
except Exception as e:
    session.sql("Rollback").collect()
    print(f"Error encountered: {e}. All inserts have been rolled back.")
#the helper table is dropped whether or not the insert succeeded
session.sql(f"DROP TABLE IF EXISTS {temp_tbl}").collect()

[Row(status='TEMP_KNA1_CLEANED successfully dropped.')]

In [126]:
#quick check of the load: print a preview of bronze.kna1
tablica = session.table("bronze.kna1")
tablica.show()

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"INDEX"  |"KUNNR"  |"GENDE"  |"NAME1"     |"NAME2"    |"STRAS"                      |"ORT01"            |"PSTLZ"   |"LAND1"  |"LAND2"         |"RSCON"                        |"TELF1"        |"REGIO"  |"BIRTH"     |"LATI"     |"LONG"     |"LASTMODIFIED"  |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|486557   |C586557  |Female   |Lihue       |Brito      |Cercas Bajas 40              |Parets del Vallès  |08150     |ES       |Spain           |LihueBritoSantillan@rhyta.com  |672 370 879    |34.0     |1980-08-27  |41.574389  |2.

In [97]:
#we create the silver CUSTOMER table: the bronze KNA1 columns under descriptive names, with numeric types
#for ID, PHONECOUNTRYCODE, LATITUDE and LONGITUDE
#NOTE: "create or replace" rebuilds the table, so rows merged earlier are discarded when this cell is re-run
session.sql("""
    create or replace TABLE SNF_MDRAZ.SILVER.CUSTOMER (
	ID NUMBER(38,0),
	CUSTOMERID VARCHAR(16777216),
	GENDER VARCHAR(16777216),
	FIRSTNAME VARCHAR(16777216),
	LASTNAME VARCHAR(16777216),
	STREETADDRESS VARCHAR(16777216),
	CITY VARCHAR(16777216),
	POSTALCODE VARCHAR(16777216),
	COUNTRYCODE VARCHAR(16777216),
	COUNTRYNAME VARCHAR(16777216),
	EMAILADDRESS VARCHAR(16777216),
	PHONENUMBER VARCHAR(16777216),
	PHONECOUNTRYCODE NUMBER(38,0),
	DATEOFBIRTH DATE,
	LATITUDE NUMBER(10,6),
	LONGITUDE NUMBER(10,6),
	LASTMODIFIED DATE
);
"""
).collect()

[Row(status='Table CUSTOMER successfully created.')]

In [138]:
#we are selecting data from bronze table kna1 (not staging) and loading it into a pandas DataFrame
#note: source_table is reused here and now holds the bronze query instead of the staging one
source_table = """select
    index,
    kunnr,
    gende,
    name1,
    name2,
    stras,
    ort01,
    pstlz,
    land1,
    land2,
    rscon,
    telf1,
    regio,
    birth,
    lati,
    long,
    LASTMODIFIED from bronze.kna1;"""
df_bronze = session.sql(source_table).to_pandas()

In [140]:
#display the bronze data that was pulled into pandas
df_bronze

Unnamed: 0,INDEX,KUNNR,GENDE,NAME1,NAME2,STRAS,ORT01,PSTLZ,LAND1,LAND2,RSCON,TELF1,REGIO,BIRTH,LATI,LONG,LASTMODIFIED
0,767195,C867195,Female,Naiquen,García,Herrería 74,Diezma,18180,ES,Spain,NaiquenGarciaDuenas@jourrapide.com,632 393 331,34.0,1982-10-03,37.306270,-3.236022,2023-09-12
1,767196,C867196,Male,Jamairo,Vaessen,Hunenborglaan 105,Oldenzaal,7576 XP,NL,Netherlands,JamairoVaessen@dayrep.com,06-12639151,31.0,1980-10-20,52.306041,6.923527,2023-09-12
2,767198,C867198,Female,Zuleika,Valdés,Via Licola Patria 81,San Vito Dei Lombardi,83054,IT,Italy,ZuleikaValdesArmenta@rhyta.com,0335 6290189,39.0,1986-09-01,41.028317,15.109596,2023-09-12
3,767200,C867200,Female,Julitta,Zielinska,ul. Królowej Jadwigi 115,Bedzin,42-500,PL,Poland,,79 743 62 34,48.0,1990-09-10,50.355541,0.000000,2023-09-12
4,767201,C867201,Male,Andrew,Hale,60 Spilman Street,GRANSMOOR,YO25 2GZ,GB,United Kingdom,AndrewHale@fleckens.hu,070 8858 9587,44.0,1975-12-17,53.381289,-0.757120,2023-09-12
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
429182,767183,C867183,Female,Ghislaine,Lamers,Beukensingel 28,Raalte,8102 JA,NL,Netherlands,GhislaineLamers@fleckens.hu,06-77490188,31.0,1989-11-01,52.473943,6.304586,2023-09-12
429183,767184,C867184,Male,Tuur,Atsma,Abdissenlaan 30,Landgraaf,6374 BM,NL,Netherlands,TuurAtsma@teleworm.us,06-37440313,31.0,1996-08-14,50.887646,6.008784,2023-09-12
429184,767185,C867185,Female,Seweryna,Kowalczyk,Rue du Cornet 275,Asse,1730,BE,Belgium,SewerynaKowalczyk@cuvox.de,0498 95 85 49,32.0,1991-07-10,50.980902,4.303276,2023-09-12
429185,767190,C867190,Male,Dmit?i,,Dominee Klinckhamerlaan 91,Appingedam,9901 TD,NL,Netherlands,DmitrivanMechelen@rhyta.com,06-76484124,31.0,1991-11-09,53.398800,6.948384,2023-09-12


In [141]:
#we keep only the newest row per (INDEX, KUNNR): order rows newest-first and number them within each group
df_bronze = df_bronze.sort_values(by=['LASTMODIFIED'], ascending=False)
df_bronze['rn'] = df_bronze.groupby(['INDEX','KUNNR']).cumcount()+1
#the filtered frame has to be assigned back; as a bare expression the filter dropped nothing
df_bronze = df_bronze[df_bronze.rn == 1]
#drop=True discards the old row labels instead of inserting them as an extra "index" column,
#which sat next to the real INDEX column and made this cell fail when it was run a second time
df_bronze = df_bronze.reset_index(drop=True)

Unnamed: 0,INDEX,KUNNR,GENDE,NAME1,NAME2,STRAS,ORT01,PSTLZ,LAND1,LAND2,RSCON,TELF1,REGIO,BIRTH,LATI,LONG,LASTMODIFIED,rn
0,767195,C867195,Female,Naiquen,García,Herrería 74,Diezma,18180,ES,Spain,NaiquenGarciaDuenas@jourrapide.com,632 393 331,34.0,1982-10-03,37.306270,-3.236022,2023-09-12,1
286159,373024,C473024,Female,Irene,Milano,Via Goffredo Mameli 121,Misano Adriatico,47843,IT,Italy,IreneMilano@einrot.com,0312 5199990,39.0,1989-06-12,44.021131,12.579491,2023-09-12,1
286131,372937,C472937,Male,Nicholas,Nyman,Alsteråvägen 16,MOTALA,591 67,SE,Sweden,NicholasNyman@teleworm.us,0141-8520273,46.0,1985-04-29,58.522212,15.088656,2023-09-12,1
286130,372935,C472935,,,Lees,86 Nith Street,GLASAHOILE,FK8 9HU,GB,United Kingdom,JonathanLees@gustr.com,070 7947 8473,44.0,1900-01-01,55.906269,-3.889257,2023-09-12,1
286129,372934,C472934,Female,Pauline,Ström,,ERIKSMÅLA,361 04,SE,Sweden,PaulineStrom@superrito.com,0471-3323318,46.0,1979-12-17,56.783852,15.524003,2023-09-12,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
143059,-99,C1260238,Male,Ryan,Scott,97 South Western Terrace,MINFFORDD,LL48 4SG,GB,United Kingdom,RyanScott@cuvox.de,079 0467 9462,44.0,2000-09-15,52.872901,-4.016153,2023-09-12,1
143058,1160235,C1260235,Female,Emily,Bryant,,KILCHRENAN,PA35 5HY,GB,United Kingdom,,077 0934 3274,44.0,1900-01-01,56.260805,-5.346506,2023-09-12,1
143057,1160231,C1260231,Male,Konstantyn,Grabowski,ul. Generala Sulkowskiego Józefa 150,Szczecin,71-129,PL,Poland,KonstantynGrabowski@armyspy.com,67 190 48 65,48.0,1977-06-01,53.359614,14.605133,2023-09-12,1
143056,1160227,C1260227,Male,Cyprien,Urbina,C/ Angosto 74,Sabiote,23410,ES,Spain,CyprienUrbinaToro@teleworm.us,724 173 146,34.0,1983-05-19,38.035674,-3.392758,2023-09-12,1


In [143]:
#we upsert the bronze rows into SILVER.CUSTOMER: rows whose ID equals a bronze INDEX are updated, the rest inserted
target = session.table("SNF_MDRAZ.SILVER.CUSTOMER")
df_sp_bronze = session.create_dataframe(df_bronze)

#silver column name -> bronze column name; ID is the join key, so it is only written on insert
silver_to_bronze = {
    "ID": "INDEX",
    "CUSTOMERID": "KUNNR",
    "GENDER": "GENDE",
    "FIRSTNAME": "NAME1",
    "LASTNAME": "NAME2",
    "STREETADDRESS": "STRAS",
    "CITY": "ORT01",
    "POSTALCODE": "PSTLZ",
    "COUNTRYCODE": "LAND1",
    "COUNTRYNAME": "LAND2",
    "EMAILADDRESS": "RSCON",
    "PHONENUMBER": "TELF1",
    "PHONECOUNTRYCODE": "REGIO",
    "DATEOFBIRTH": "BIRTH",
    "LATITUDE": "LATI",
    "LONGITUDE": "LONG",
    "LASTMODIFIED": "LASTMODIFIED",
}
update_values = {
    silver_col: df_sp_bronze[bronze_col]
    for silver_col, bronze_col in silver_to_bronze.items()
    if silver_col != "ID"
}
insert_values = {
    silver_col: df_sp_bronze[bronze_col]
    for silver_col, bronze_col in silver_to_bronze.items()
}

merged = target.merge(
    source=df_sp_bronze,
    join_expr=(target['ID'] == df_sp_bronze['INDEX']),
    clauses=[
        when_matched().update(update_values),
        when_not_matched().insert(insert_values),
    ]
)

In [144]:
merged

MergeResult(rows_inserted=429187, rows_updated=0, rows_deleted=0)