<a href="https://colab.research.google.com/github/Zanskar-Geothermal/GRC_2022_Workshop_Zanskar/blob/main/BowersDataSnowflakePipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import packages, functions, and Google Authentication

In [None]:
# Version suffix that is interpolated into the names of the parquet files this
# notebook writes and reads back.
# Modify this before you write any files anywhere!
SHEET_VERSION = "_v1"

In [None]:
import pandas as pd
import numpy as np

In [None]:
from google.colab import auth
import gspread
from google.auth import default
# Authenticate this Colab session with Google, then build the gspread client
# (`gc`) from the resulting default credentials.
auth.authenticate_user()
creds, _ = default()
gc = gspread.authorize(creds)

In [None]:

import numpy as np
def get_sheet(workbook_url, worksheet):
  '''
      Retrieves one worksheet of a Google Sheets workbook as a DataFrame.

      The first row of the worksheet becomes the column header and every
      remaining row becomes a data row. Cell values are kept exactly as
      returned by `get_all_values()`; no type conversion happens here.

      Inputs:
        workbook_url : str
          url of the google sheet
        worksheet : str
          Title of the worksheet you wish to import

      Returns:
        pd.DataFrame
          Row labels start at 1 (the header row had label 0 and is dropped).
          An empty DataFrame is returned when the worksheet yields no rows.

      Raises:
        ValueError
          If the workbook has no worksheet with the given title.
  '''
  workbook = gc.open_by_url(workbook_url)

  # Pick the worksheet out of the listing we already have rather than
  # looking it up by title a second time.
  target = next((ws for ws in workbook.worksheets() if ws.title == worksheet), None)
  if target is None:
    raise ValueError(f"Worksheet {worksheet} was not found inside the {workbook_url}")

  # Get all rows:
  rows = target.get_all_values()

  # Without any rows there is no header row to promote; `df.iloc[0]` below
  # would raise an IndexError.
  if not rows:
    return pd.DataFrame()

  # Convert to a DataFrame and promote the first row to the header.
  df = pd.DataFrame(rows)
  df.columns = df.iloc[0]
  df = df.iloc[1:]

  return df

def get_and_concat_sheets(workbook_url, worksheets):
  '''
    Reads several worksheets of one workbook, tagging each row with its sheet.

    Despite the name, the frames are NOT concatenated here; the caller gets
    one DataFrame per worksheet and combines them itself. No check is made
    that the sheets share the same columns.

    Inputs:
      workbook_url : str
        Workbook url

      worksheets : list
        Titles of the worksheets to read, in the order they should be returned

    Returns:
      (list of pd.DataFrame, list of list)
        The per-sheet frames (each with an added 'Sheet_Name' column) and,
        in the same order, each frame's column labels.
  '''
  frames = []
  column_lists = []

  for sheet_name in worksheets:
    frame = get_sheet(workbook_url, sheet_name)
    # Record the sheet each row came from so it survives concatenation.
    frame['Sheet_Name'] = sheet_name

    frames.append(frame)
    column_lists.append(frame.columns.to_list())

    row_count = frame.shape[0]
    print(f"Read sheet {sheet_name} with {row_count} rows.")

  return frames, column_lists

def concat_dfs(dfs):
  '''
    Concatenates data frames after aligning them to a common set of columns.

    The output columns are the union of all input columns, in order of first
    appearance. A column missing from a frame is filled with NaN for that
    frame's rows. The input frames are not modified.

    Inputs:
      dfs : iterable of pd.DataFrame
        Frames to stack row-wise.

    Returns:
      pd.DataFrame
        The stacked frame (original row labels are kept). An empty DataFrame
        is returned when `dfs` is empty.
  '''
  dfs = list(dfs)
  if not dfs:
    return pd.DataFrame()

  # Build the column union while preserving first-seen order. The previous
  # `cols.union(...)` call discarded its result, leaving the set empty.
  cols = []
  seen = set()
  for df in dfs:
    for col in df.columns:
      if col not in seen:
        seen.add(col)
        cols.append(col)

  # reindex returns a new frame, adding any missing column as NaN.
  return pd.concat([df.reindex(columns=cols) for df in dfs])

In [None]:
def write_tables_parquet(df, name):
  """
  Writes a dataframe to a gzip-compressed parquet file.

  Inputs:
    df : pd.DataFrame
      Frame to write.
    name
      Base name of the output file; '.gzip' is appended to its string form.
  """
  target_path = f"{name!s}.gzip"
  df.to_parquet(target_path, compression='gzip')

# Import data and concatenate

In [None]:
# Source Google Sheets workbooks and the worksheet titles to read from each.

# Temperature data: workbook URL and the worksheets to read from it.
TEMPERATURE_WORKBOOK = 'https://docs.google.com/spreadsheets/d/17--KA6cvTn5Y-_T5W6p5nhrAySx7zin4AolpbHmShsg/edit#gid=1296186900'
TEMPERATURE_SHEETS = ['TG_Gina', 'TG_Kayla']

# Geochemistry data: workbook URL and the worksheets to read from it.
GEOCHEM_WORKBOOK = 'https://docs.google.com/spreadsheets/d/1u6sAjhFGPt944-lu7qO2i3P6MB61v_Ila2c_PW93z8A/edit#gid=0'
GEOCHEM_SHEETS = ['Sheet1']

In [None]:
# Read each temperature worksheet into its own DataFrame (`temperature` is a
# list of frames, `columns` the matching list of column labels) ...
temperature, columns = get_and_concat_sheets(TEMPERATURE_WORKBOOK, TEMPERATURE_SHEETS)

# ... then stack them into one frame with a fresh 0..n-1 row index.
temperature_DF = pd.concat(temperature, ignore_index=True)

Read sheet TG_Gina with 27006 rows.
Read sheet TG_Kayla with 24462 rows.


In [None]:
# Read each geochem worksheet into its own DataFrame (`geochem` is a list of
# frames, `gc_columns` the matching list of column labels) ...
geochem, gc_columns = get_and_concat_sheets(GEOCHEM_WORKBOOK, GEOCHEM_SHEETS)

# ... then stack them into one frame. ignore_index=True gives a fresh 0..n-1
# row index, the same way the temperature sheets are combined, so adding more
# worksheets to GEOCHEM_SHEETS cannot produce duplicate row labels.
geochem_DF = pd.concat(geochem, ignore_index=True)

Read sheet Sheet1 with 25 rows.


In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Change the working directory to the project folder on the mounted drive, so
# files opened with relative names are resolved inside it.
%cd '/content/drive/Shareddrives/Zanskar_Geoscience/Projects/Bowers_Data'

/content/drive/Shareddrives/Zanskar_Geoscience/Projects/Bowers_Data


# Geochem prep

In [None]:
# Curate columns: force 'Number' to text, parse 'pH' as a number, and turn
# blank cells into nulls in every listed column.

geochem_DF['Number'] = geochem_DF['Number'].astype(str)

# Vectorised parse of the whole column; values that are not numbers become NaN.
geochem_DF['pH'] = pd.to_numeric(geochem_DF['pH'], errors='coerce')

# Column labels exactly as they appear in the sheet header (including the
# sheet's own spelling of the last entry).
geochem_col = [
    "Number", "Identification", "Basin No.", "Temperature [C]", "pH", "Conductivity [µmhos/cm]",
    "SiO2 [ppm]", "Ag [ppm]", "Al [ppm]", "As [ppm]", "Au [ppm]", "B [ppm]",
    "Ba [ppm]", "Be [ppm]", "Bi [ppm]", "Br [ppm]", "Ca [ppm]", "Cd [ppm]", "Ce [ppm]",
    "Cl [ppm]", "Co [ppm]", "Cs [ppm]", "Cr [ppm]", "Cu [ppm]", "CaCO3 [ppm]",
    "CO3 [ppm]", "OH [ppm]", "HCO3 [ppm]", "CO3 & HCO3 [ppm]", "F [ppm]", "Fe [ppm]", "Hg [ppm]",
    "I [ppm]", "K [ppm]", "La [ppm]", "Li [ppm]", "Mg [ppm]", "Mn [ppm]", "Mo [ppm]",
    "Na [ppm]", "Ni [ppm]", "NH3 [ppm]", "NH4 [ppm]", "NO2 [ppm]", "NO3 [ppm]", "NO3 & NO2 [ppm]",
    "P [ppm]", "Pb [ppm]", "PO4 [ppm]", "Rb [ppm]", "S [ppm]", "Sb [ppm]", "Se [ppm]", "Sn [ppm]",
    "Sr [ppm]", "SO4 [ppm]", "Te [ppm]", "Ti [ppm]", "Th [ppm]", "U [ppm]", "V [ppm]", "W [ppm]",
    "Zn [ppm]", "Zr [ppm]", "Total Major Species [ppm]", "Cations Sum", "Anions Sum",
    "Ratio Cations/Anions", "TDS [ppm]",
    "Total Alkalinity", "Sodium Abosorption",
]
for column in geochem_col:
  is_blank = geochem_DF[column] == ""
  geochem_DF.loc[is_blank, column] = np.nan #makes blank cells null


In [None]:
# Remove the "<" symbol from results and mark it in a separate column.
#
# For each listed column, `<column>_lessthan` holds "<" where the original cell
# contained a "<" and NaN elsewhere, and the column itself is converted to
# numbers (anything that still is not a number becomes NaN).

ppm_wstring = ['B [ppm]', 'CO3 [ppm]', 'Hg [ppm]', 'Mn [ppm]', 'P [ppm]']

for col in ppm_wstring:
  as_text = geochem_DF[col].astype(str)
  filter_v = as_text.str.contains("<", na=False, regex=False)

  # Flag column: "<" for below-limit results, null otherwise. Kept as object
  # dtype so it stays a text column even when no cell is flagged.
  geochem_DF[col + '_lessthan'] = filter_v.map({True: "<", False: np.nan}).astype(object)

  # Strip the "<" from the VALUE column before parsing. Previously the strip
  # was applied to the flag column, so "<0.01" was coerced to NaN and the
  # reported limit (0.01) was lost.
  stripped = as_text.str.replace("<", "", regex=False).str.strip()
  geochem_DF[col] = pd.to_numeric(stripped, errors='coerce')
  

In [None]:
# Save the curated geochem frame as a versioned parquet file in the current
# working directory ('.gzip' is appended to the name by the helper).
write_tables_parquet(geochem_DF, f'Bowers_GeochemData_Sheets_{SHEET_VERSION}')

# Temperature prep

In [None]:
# Convert the numeric temperature-log columns from sheet text to float64.
# Blank cells and anything that cannot be parsed as a number become NaN;
# commas (thousands separators such as "1,234") are removed before parsing.
float_columns = ["Elevation [ft]", "Latitude", "Longitude", "Depth [m]", "Temp [C]", "Depth [ft]", "Temp [F]", "Run_Number"]
for column in float_columns:
  print(column)
  # Whole-column conversion replaces the old per-cell loop, which relied on
  # chained assignment (`df[col][row] = ...`). That form is not guaranteed to
  # write back into the frame and hid every error behind a bare `except`.
  as_text = (
      temperature_DF[column]
      .astype(str)
      .str.replace(",", "", regex=False)
      .str.strip()
  )
  temperature_DF[column] = pd.to_numeric(as_text, errors="coerce").astype("float64")

Elevation [ft]
Latitude
Longitude
Depth [m]
Temp [C]
Depth [ft]
Temp [F]
Run_Number


In [None]:
# Convert the Log_Date column to pandas timestamps. No explicit format is
# given, so pandas infers it from the values.
temperature_DF["Log_Date"] = pd.to_datetime(temperature_DF["Log_Date"])

def fix_date_cols(df, tz='UTC'):
    """
    Makes every timezone-naive datetime column of `df` timezone-aware, in place.

    Author's note on why this exists: adding a time zone to the timestamp
    makes Snowflake read the timestamp as a date.

    Inputs:
      df : pd.DataFrame
        Frame to modify. Columns that are already timezone-aware, and all
        non-datetime columns, are left untouched.
      tz : str
        Time zone the naive timestamps are declared to be in (no clock
        conversion is applied).

    Returns:
      None. The selected column labels are printed.
    """
    # The generic 'datetime64' selector matches naive datetime columns of any
    # resolution, not only nanosecond ones; tz-aware columns are not matched,
    # so calling this twice is harmless.
    cols = df.select_dtypes(include=['datetime64']).columns
    print(cols)
    for col in cols:
        df[col] = df[col].dt.tz_localize(tz)

# Localize the datetime columns of the temperature frame in place, using the
# function's default time zone (UTC).
fix_date_cols(temperature_DF)


Index(['Log_Date'], dtype='object', name=0)


In [None]:
# Save the prepared temperature frame as a versioned parquet file in the
# current working directory ('.gzip' is appended to the name by the helper).
write_tables_parquet(temperature_DF, f'Bowers_TempData_Sheets_{SHEET_VERSION}')

# Test read
Ensure that columns are of the proper data type for Snowflake ingestion.

In [None]:
# Read the temperature parquet file back from disk; this re-read frame is what
# gets inspected below.
Temperature_data_df = pd.read_parquet(f"Bowers_TempData_Sheets_{SHEET_VERSION}.gzip")


In [None]:
# Column names, non-null counts and dtypes of the re-read temperature frame.
Temperature_data_df.info() #displays condensed info on dataframe

In [None]:
# Read the geochem parquet file back from disk and display the frame for a
# visual check.
Geochem_data_df = pd.read_parquet(f"Bowers_GeochemData_Sheets_{SHEET_VERSION}.gzip")

Geochem_data_df

In [None]:
# Column names, non-null counts and dtypes of the re-read geochem frame.
Geochem_data_df.info() #displays condensed info on dataframe

# Ingest to Snowflake

In [None]:
# install snowflake connector

!pip install "snowflake-connector-python[secure-local-storage,pandas]"

In [None]:
#initiate connection
import os

import snowflake.connector


# Connection settings, reused by the upload cells further down.
# Account and user default to the original values; set the SNOWFLAKE_ACCOUNT /
# SNOWFLAKE_USER environment variables to run the notebook as someone else.
# The 'externalbrowser' authenticator logs in through a browser window, so no
# password is stored in the notebook.
credentials = {
    'account'    : os.environ.get("SNOWFLAKE_ACCOUNT", "ivvssic-ds27297")
    , 'user'     : os.environ.get("SNOWFLAKE_USER", "jacob@zanskar.us")
    , 'authenticator' : 'externalbrowser'
    }

# Smoke test: open a connection and ask Snowflake which user we are logged in as.
cnx = snowflake.connector.connect(**credentials)
try:
  cur = cnx.cursor()
  try:
    cur.execute('SELECT CURRENT_USER()')
    text = cur.fetchall()
  finally:
    cur.close()
finally:
  # Close the connection once done, even if the query above failed.
  cnx.close()

print(text)

Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...
Going to open: https://accounts.google.com/o/saml2/idp?idpid=C01b7xnzd&SAMLRequest=nZJdb9sgFIb%2FisWuDRg3WoLiRFmirpbaNWrSadsdBeKyYPAA22l%2F%2FXA%2BpO6ivdgFEoL3nOec857p%2FFDrpJPOK2sKkEEMEmm4FcpUBXjcXqdjkPjAjGDaGlmAF%2BnBfDb1rNYNXbTh2TzIP630IYmJjKfDRwFaZ6hlXnlqWC09DZxuFne3lEBMmffShYgD5xDhVWQ9h9BQhPq%2Bh30OrasQwRgjPEFRNUg%2BgTeI5mNG42yw3OpLyCH29A4iQ%2FhqQERFJKzPgV%2BUOY3gI8rTSeTpzXa7Ttf3my1IFpfultb4tpZuI12nuHx8uD0V4IcKQp5PRlew9SmXJjimM1jxBnpj%2B51me8lt3bQh5obxhnZSIG0rFSdWrgrQ7JU4lIxY%2FnrXHdgN%2BdqUPxYdUXa3N6ufv%2Bs%2BM2M3yolch67bc5B8v%2FhLBn9L71tZmsHVEJ8wyVNMUjLeEkJxRkcE5njyCySr6KoyLBwjL6Uzzm1rgoeVtZWWx%2FosGiwhSIlmHo8SxRJnT58P5lWA06rQI9PN%2Fm8AU%2FQ2x3n3vkU7ytXaasVfkmvrahbedyuD2fFFiXR3lFJZM6UXQjjpfXRNa9svnWQhrnhwrQRodqL%2Bu%2BSzvw%3D%3D&RelayState=59607 to 

In [None]:
#writes the temperature dataframe to Snowflake

from snowflake.connector.pandas_tools import write_pandas

# Positional arguments after the frame: table name, database, schema.
# auto_create_table=True lets write_pandas create the table if it is missing.
cxn = snowflake.connector.connect(**credentials)
try:
  temperature_write_result = write_pandas(cxn, Temperature_data_df, "BowersTempData", "DB_SCRATCH", "RAW", auto_create_table=True)
finally:
  # Always release the connection, even when the upload fails.
  cxn.close()

# Display the upload summary returned by write_pandas.
temperature_write_result

Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...
Going to open: https://accounts.google.com/o/saml2/idp?idpid=C01b7xnzd&SAMLRequest=nZJNj9owEIb%2FSuSeYzsJCLAIiELZ0tKWLmH7cfM6Jlg4drAdAv31dfiQtofdQw%2BWLPudeWbmneH4VMrgyI0VWqUgghgEXDGdC1WkYJPNwz4IrKMqp1IrnoIzt2A8GlpayopMardTj%2FxQc%2BsCn0hZ0n6koDaKaGqFJYqW3BLHyHryZUliiAm1lhvnceAWklvhWTvnKoJQ0zSwSaA2BYoxxggPkFe1knfgBaJ6m1EZ7TTT8h5y8j29gogQ7rQIr%2FCE1S3wvVDXEbxFeb6KLPmYZatw9W2dgWBy726qla1LbtbcHAXjm8fltQDbVuCSZNDtwNqGjCtnqIxgwSpolW62ku4502VVO58b%2Bhva8hxJXQg%2FscUsBdVe5O5h%2BfNH1H3iJums5a%2FjTs22h80n8%2BEsdNLr7b9nyfRzMT887PcMBE93f%2BPW34W1NV%2Bo1lXnn3CchDgO434WxySKCR7Abr%2F%2FGwQz76pQ1F0i76VTxnStnIWF1oXkl%2Fo0ai2JkcirsT8iT6c4eu6d1J8cXFeFXJhm9H8DGKKXOW6799XbsZittBTsHMy1Kal73a0IRpcXkYfbi5Twkgo5yXPDrfWuSambqeHU%2BRV3puYAja7Uf5d89Bc%3D&RelayState=45025 to authenti

(True,
 1,
 51468,
 [('bacijgixcf/file0.txt',
   'LOADED',
   51468,
   51468,
   1,
   0,
   None,
   None,
   None,
   None)])

In [None]:
#writes the geochem dataframe to Snowflake

from snowflake.connector.pandas_tools import write_pandas

# Positional arguments after the frame: table name, database, schema.
# auto_create_table=True lets write_pandas create the table if it is missing.
cxn = snowflake.connector.connect(**credentials)
try:
  geochem_write_result = write_pandas(cxn, Geochem_data_df, "BowersGeochemData", "DB_SCRATCH", "RAW", auto_create_table=True)
finally:
  # Always release the connection, even when the upload fails.
  cxn.close()

# Display the upload summary returned by write_pandas.
geochem_write_result