In [31]:
import sys
sys.path.append('../../..')
from dev.utils import load_config
from projects.circle_to_intercom_migration.api import CircleAPI
import pandas as pd
from sqlalchemy import create_engine
from html_to_text import convert_html_to_text

In [32]:
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

class Snowflake:
    """Small convenience wrapper around one Snowflake connection.

    The connection is opened in ``__init__`` for the requested database, using
    the credentials stored under the ``"snowflake"`` key of ``load_config()``.
    It is closed by ``close_connection`` (which also runs when the instance is
    garbage-collected).

    Schema and table names are interpolated directly into the SQL text, so
    only pass trusted identifiers to these methods.
    """

    # Connection settings; read once, when the class body is executed.
    SNOWFLAKE_CONFIG = load_config()["snowflake"]

    def __init__(self, database: str):
        """Open a connection to ``database``.

        Args:
            database: Name of the Snowflake database to connect to.
        """
        # Defined before connecting so that __del__ / close_connection stay
        # safe even when connect() raises and the instance is only half-built.
        self.connector = None
        self.database = database
        self.connector = snowflake.connector.connect(
            database=database,
            user=self.SNOWFLAKE_CONFIG['username'],
            password=self.SNOWFLAKE_CONFIG['password'],
            account=self.SNOWFLAKE_CONFIG['account'],
            warehouse=self.SNOWFLAKE_CONFIG["warehouse"],
            role=self.SNOWFLAKE_CONFIG["role"],
        )

    def __del__(self):
        self.close_connection()

    def close_connection(self):
        """Close the underlying connection, if one was ever opened."""
        connector = getattr(self, "connector", None)
        if connector is not None:
            connector.close()

    def _execute(self, statement: str) -> None:
        """Run a single SQL statement and release its cursor afterwards."""
        with self.connector.cursor() as cursor:
            cursor.execute(statement)

    def create_schema(self, schema_name: str, verbose: bool = False):
        """Create ``schema_name`` unless it already exists.

        Errors are reported with ``print`` and not re-raised.

        Args:
            schema_name: Name of the schema to create.
            verbose: When true, print a confirmation message.
        """
        try:
            self._execute(f'CREATE SCHEMA IF NOT EXISTS {schema_name}')
            if verbose: print(f"Schema '{schema_name}' created successfully.")
        except Exception as e:
            print(f"An error occurred: {e}")

    def drop_schema(self, schema_name: str, verbose: bool = False, are_you_sure: bool=False):
        """Drop ``schema_name`` and everything it contains (``CASCADE``).

        Nothing is dropped unless ``are_you_sure`` is true. Errors are
        reported with ``print`` and not re-raised.

        Args:
            schema_name: Name of the schema to drop.
            verbose: When true, print a confirmation message.
            are_you_sure: Explicit confirmation for this destructive action.
        """
        if not are_you_sure:
            # Safety latch: the flag exists to prevent accidental drops.
            print(f"Schema '{schema_name}' was not dropped: pass are_you_sure=True to confirm.")
            return
        try:
            self._execute(f'DROP SCHEMA IF EXISTS {schema_name} CASCADE')
            if verbose: print(f"Schema '{schema_name}' has been dropped successfully.")
        except Exception as e:
            print(f"An error occurred: {e}")

    def create_table(self, schema_name: str, table_name: str, columns: list[dict], verbose: bool = False):
        """Create ``table_name`` in ``schema_name`` unless it already exists.

        The session is switched to ``schema_name`` first. Errors are reported
        with ``print`` and not re-raised.

        Args:
            schema_name: Schema that will hold the table.
            table_name: Name of the table to create.
            columns: One dict per column with ``'name'`` and ``'type'`` keys,
                e.g. ``[{'name': 'A', 'type': 'varchar'}]``.
            verbose: When true, print a confirmation message.
        """
        try:
            self._execute(f'use schema {schema_name}')

            # Assemble the DDL statement, one "<name> <type>" line per column.
            column_definitions = ',\n\t'.join(f"{c['name']} {c['type']}" for c in columns)
            create_table_query = (
                f"create table if not exists {table_name} (\n\t"
                f"{column_definitions}"
                "\n)"
            )

            self._execute(create_table_query)
            if verbose: print(f"Table '{table_name}' created successfully in schema '{schema_name}'.")
        except Exception as e:
            print(f"An error occurred: {e}")

    def load_table(self, df: pd.DataFrame, table_name: str, schema_name: str, verbose: bool = False):
        """Write ``df`` into ``table_name`` of ``schema_name`` with ``write_pandas``.

        The session is switched to ``schema_name`` first. Unlike the schema
        and table helpers, exceptions raised here propagate to the caller.

        Args:
            df: Data to load.
            table_name: Destination table.
            schema_name: Schema that holds the destination table.
            verbose: When true, print the number of inserted rows.
        """
        self._execute(f'USE SCHEMA {schema_name}')
        success, _, nrows, _ = write_pandas(self.connector, df, table_name)
        if not success:
            # Do not claim success when write_pandas reports otherwise.
            print(f"An error occurred: loading the DataFrame into '{table_name}' did not succeed.")
        elif verbose:
            print(f"DataFrame successfully loaded into '{table_name}'. Rows inserted: {nrows}")

    def drop_table(self, are_you_sure: bool=False):
        """Placeholder: dropping a table is not implemented; only prints a notice."""
        print('not implemented.')

## load Circle articles

In [33]:
# Fetch the posts through the project's CircleAPI helper.
# NOTE(review): the exact effect of render=False is defined in
# CircleAPI.api_get_posts, which is not visible in this notebook — confirm.
posts = CircleAPI.api_get_posts(render=False)

## format data

In [39]:
# One flat record per post, keeping only the fields that are exported.
data_dict = [
    dict(
        title=post['name'],
        url=post['url'],
        body_html=post['body']['body'],
        slug=post['slug'],
        folder_name=post['space_name'],
        folder_slug=post['space_slug'],
        created_at=post['created_at'],
    )
    for post in posts
]

df = pd.DataFrame(data_dict)

# Strip surrounding whitespace from every column.
df = df.assign(**{name: df[name].str.strip() for name in df.columns})
# Add the converted (non-HTML) version of the body next to the HTML one.
df['body_markdown'] = df['body_html'].apply(convert_html_to_text)
# Put the columns in their final order and upper-case the column names.
df = df[
    ['title', 'url', 'slug', 'body_html', 'body_markdown', 'folder_name', 'folder_slug', 'created_at']
].rename(columns=str.upper)

## create the Snowflake table and load the DataFrame into it

In [40]:
# Destination of the export inside Snowflake.
database = "RAW"
schema_name = 'CIRCLE_CUSTOM'
table_name = "ARTICLES"
# Every DataFrame column becomes a varchar column of the same (upper-cased) name.
columns = [dict(name=column.upper(), type="varchar") for column in df.columns]

snow = Snowflake(database)

# Make sure the destination schema exists.
snow.create_schema(schema_name)

In [41]:
## create the destination table if it does not exist yet (one varchar column per DataFrame column)
## note: create_table prints errors instead of raising, so check the cell output before loading
snow.create_table(schema_name=schema_name, table_name=table_name, columns=columns)

In [42]:
## load the DataFrame rows into the Snowflake table (uses write_pandas under the hood)

snow.load_table(df=df, table_name=table_name, schema_name=schema_name)

In [29]:
# Keep a local CSV copy of the exported articles.
# NOTE(review): this cell's execution count (29) is lower than the cells above,
# so it was last run out of order — re-run top to bottom to confirm the file
# matches the current DataFrame.
# NOTE(review): the path points at the author's Desktop and is machine-specific;
# to_csv also writes the DataFrame index unless index=False is passed — confirm
# both are intended.
df.to_csv('~/Desktop/2024-02-20-circle-articles.csv')