In [13]:
from dotenv import load_dotenv
import polars as pl
import psycopg2
import os

In [27]:
# Location of the PostgreSQL credentials file, relative to the current working
# directory of the kernel.
# Alternative location kept from an earlier version of this cell (presumably for
# launching the notebook from the parent directory — confirm before use):
#   "./Deeplearning_Pipeline/keys/postgresql.env"
env_path = "./keys/postgresql.env"
current_directory = os.getcwd()
full_path = os.path.join(current_directory, env_path)
normalized_path = os.path.normpath(full_path)

# override=True lets values from the file replace variables already present in
# the process environment. load_dotenv returns False when it set no variable
# (for example because the file does not exist), so report that instead of
# silently continuing with an empty configuration.
if not load_dotenv(normalized_path, override=True):
    print(f"Warning: no environment variables were loaded from {normalized_path}")

# Connection settings for psycopg2.connect; every value is a string taken from
# the environment, or None when the variable is not set.
postgresql_config = {
    "host": os.getenv("POSTGRESQL_HOST"),
    "port": os.getenv("POSTGRESQL_PORT"),
    "dbname": os.getenv("POSTGRESQL_DB"),
    "user": os.getenv("POSTGRESQL_USER"),
    "password": os.getenv("POSTGRESQL_PASSWORD"),
}

# Name the settings that are missing (never print the values themselves, one of
# them is the password).
_missing_postgresql_settings = [
    setting for setting, value in postgresql_config.items() if value is None
]
if _missing_postgresql_settings:
    print(
        "Warning: missing PostgreSQL settings: "
        + ", ".join(_missing_postgresql_settings)
    )

In [28]:
def _table_layout(fields):
    """Turn (column name, dtype name) pairs into the per-table metadata dict.

    Fresh lists are built on every call, so no two tables share list objects.
    """
    return {
        "columns_names": [column for column, _ in fields],
        "columns_data_types_list": [dtype for _, dtype in fields],
    }


# Columns of the stock price tables, in table order.
_STOCK_PRICE_FIELDS = [
    ("stock_symbol", "String"),
    ("current_price", "Float64"),
    ("change", "Float64"),
    ("percent_change", "Float64"),
    ("high_price_of_the_day", "Float64"),
    ("low_price_of_the_day", "Float64"),
    ("open_price_of_the_day", "Float64"),
    ("previous_close_price", "Float64"),
    ("Datetime", "String"),
]

# Columns of the company information tables, in table order.
_COMPANY_INFO_FIELDS = [
    ("symbol", "String"),
    ("price", "Float64"),
    ("beta", "Float64"),
    ("volAvg", "Int64"),
    ("mktCap", "Int64"),
    ("lastDiv", "Float64"),
    ("range", "String"),
    ("changes", "Float64"),
    ("companyName", "String"),
    ("currency", "String"),
    ("cik", "String"),
    ("isin", "String"),
    ("cusip", "String"),
    ("exchange", "String"),
    ("exchangeShortName", "String"),
    ("industry", "String"),
    ("website", "String"),
    ("description", "String"),
    ("ceo", "String"),
    ("sector", "String"),
    ("country", "String"),
    ("fullTimeEmployees", "String"),
    ("phone", "String"),
    ("address", "String"),
    ("city", "String"),
    ("state", "String"),
    ("zip", "String"),
    ("dcfDiff", "Float64"),
    ("dcf", "Float64"),
    ("image", "String"),
    ("ipoDate", "String"),
    ("defaultImage", "Boolean"),
    ("isEtf", "Boolean"),
    ("isActivelyTrading", "Boolean"),
    ("isAdr", "Boolean"),
    ("isFund", "Boolean"),
    ("Datetime", "String"),
]

# Date/time component columns appended to the silver and gold tables.
_DATE_PART_FIELDS = [
    ("Date", "Date"),
    ("Day", "Int8"),
    ("Month", "Int8"),
    ("Year", "Int32"),
    ("Hour", "Int8"),
    ("Min", "Int8"),
    ("Sec", "Int8"),
]

# Columns of gold_table that precede the date/time components.
_GOLD_TABLE_FIELDS = [
    ("stock_symbol", "String"),
    ("companyName", "String"),
    ("current_price", "Float64"),
    ("percent_change", "Float64"),
    ("mktCap", "Int64"),
    ("sector", "String"),
    ("industry", "String"),
    ("country", "String"),
]

# Database layout: schema name -> table name -> metadata, where the metadata
# holds two parallel lists ("columns_names" and "columns_data_types_list") with
# one entry per column, in column order.
DATABASE_STRUCT_DICT = {
    "bronze_schema": {
        "stock_prices_fact_table": _table_layout(_STOCK_PRICE_FIELDS),
        "company_information_dim_table": _table_layout(_COMPANY_INFO_FIELDS),
    },
    "silver_schema": {
        "silver_stock_prices_fact": _table_layout(_STOCK_PRICE_FIELDS + _DATE_PART_FIELDS),
        "silver_company_info_dim": _table_layout(_COMPANY_INFO_FIELDS + _DATE_PART_FIELDS),
    },
    "gold_schema": {
        "gold_table": _table_layout(_GOLD_TABLE_FIELDS + _DATE_PART_FIELDS),
        "gold_company_information_dim_table": _table_layout(_COMPANY_INFO_FIELDS + _DATE_PART_FIELDS),
    },
}

In [37]:
def read_table_to_polars_dataframe(postgresql_config, schema="silver_schema", table="silver_stock_prices_fact"):
    """
    Read every row of one PostgreSQL table into a Polars DataFrame.

    Column names and dtypes come from ``DATABASE_STRUCT_DICT[schema][table]``,
    so only tables described there can be read.

    Args:
        postgresql_config (dict): Connection settings with the keys "host",
            "port", "dbname", "user" and "password".
        schema (str): Schema name, a top-level key of ``DATABASE_STRUCT_DICT``.
            Defaults to "silver_schema".
        table (str): Table name, a key of ``DATABASE_STRUCT_DICT[schema]``.
            Defaults to "silver_stock_prices_fact".

    Returns:
        polars.DataFrame | None: The table data, or None when anything fails
        (unknown schema/table, unsupported dtype name, connection or query
        error). The error is printed, not raised.
    """
    # Dtype names used in DATABASE_STRUCT_DICT -> Polars dtypes.
    polars_type_map = {
        "String": pl.Utf8,
        "Float64": pl.Float64,
        "Int8": pl.Int8,
        "Int32": pl.Int32,
        "Int64": pl.Int64,
        "Date": pl.Date,
        "Boolean": pl.Boolean,
    }

    # Bound before the try block so the finally clause can tell whether a
    # connection was ever opened.
    connection = None
    try:
        table_metadata = DATABASE_STRUCT_DICT[schema][table]
        columns = table_metadata["columns_names"]
        column_types = table_metadata["columns_data_types_list"]

        schema_types = {col: polars_type_map[dtype] for col, dtype in zip(columns, column_types)}

        # The identifiers are interpolated as-is. This point is only reached
        # after schema and table were found as keys of DATABASE_STRUCT_DICT,
        # and the column names come from that same dict.
        query = f"SELECT {', '.join(columns)} FROM {schema}.{table}"

        connection = psycopg2.connect(
            host=postgresql_config["host"],
            dbname=postgresql_config["dbname"],
            user=postgresql_config["user"],
            password=postgresql_config["password"],
            port=postgresql_config["port"],
        )

        # The cursor context manager closes the cursor once the rows are fetched.
        with connection.cursor() as cursor:
            cursor.execute(query)
            data = cursor.fetchall()

        return pl.DataFrame(data, schema=schema_types, orient="row")
    except Exception as e:
        print(f"Error: {e}")
        return None
    finally:
        # No return statement here: returning from finally would replace the
        # DataFrame produced above.
        if connection is not None:
            try:
                connection.close()
            except Exception as e:
                print(f"Error closing connection: {e}")
In [38]:
# Read the silver stock-price fact table (the table the function targets when it
# is called with only the connection settings). The call is the last expression
# of the cell, so its return value is rendered as the cell output.
read_table_to_polars_dataframe(postgresql_config)

stock_symbol,current_price,change,percent_change,high_price_of_the_day,low_price_of_the_day,open_price_of_the_day,previous_close_price,Datetime,Date,Day,Month,Year,Hour,Min,Sec
str,f64,f64,f64,f64,f64,f64,f64,str,date,i8,i8,i32,i8,i8,i8
"""AAPL""",228.02,3.02,1.3422,229.74,225.17,225.25,225.0,"""2024-11-18T17:53:36.747774""",2024-11-18,18,11,2024,17,53,36
"""MSFT""",415.76,0.76,0.1831,418.4037,412.1,414.87,415.0,"""2024-11-18T17:53:36.747774""",2024-11-18,18,11,2024,17,53,36
"""GOOGL""",175.3,2.81,1.6291,175.438,172.9,173.42,172.49,"""2024-11-18T17:53:36.747774""",2024-11-18,18,11,2024,17,53,36
"""AMZN""",201.7,-0.91,-0.4491,204.67,200.95,204.15,202.61,"""2024-11-18T17:53:36.747774""",2024-11-18,18,11,2024,17,53,36
"""TSLA""",338.74,18.02,5.6186,348.5499,330.01,340.73,320.72,"""2024-11-18T17:53:36.747774""",2024-11-18,18,11,2024,17,53,36
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""AAPL""",226.74,-1.28,-0.5614,227.74,226.73,227.74,228.02,"""2024-11-19T09:30:57.204364""",2024-11-19,19,11,2024,9,30,57
"""MSFT""",413.41,-2.35,-0.5652,414.64,412.89,414.64,415.76,"""2024-11-19T09:30:57.204364""",2024-11-19,19,11,2024,9,30,57
"""GOOGL""",173.5844,-1.7156,-0.9787,174.06,173.564,174.06,175.3,"""2024-11-19T09:30:57.204364""",2024-11-19,19,11,2024,9,30,57
"""AMZN""",199.07,-2.63,-1.3039,201.25,199.03,201.25,201.7,"""2024-11-19T09:30:57.204364""",2024-11-19,19,11,2024,9,30,57
