## Create sqlalchemy session

In [1]:
from functools import cached_property, lru_cache
from typing import Any

from pydantic import (
    BaseModel,
    Field,
    FieldValidationInfo,
    PostgresDsn,
)
from pydantic.functional_validators import field_validator
from typing_extensions import Annotated


class PostgresSettings(BaseModel):
    HOST: str = "localhost"
    PORT: int = 5432
    USER: str = "postgres_user"
    PASSWORD: str = "postgres_password"
    DB: str = "app"
    DATABASE_URI: Annotated[PostgresDsn | None, Field(validate_default=True)] = None

    @field_validator("DATABASE_URI", mode="after")
    @classmethod
    def assemble_db_connection(  # pylint: disable=no-self-argument
        cls, v: str | None, info: FieldValidationInfo
    ) -> Any:
        if isinstance(v, str):
            return v

        return PostgresDsn.build(
            scheme="postgresql",
            username=info.data["USER"],
            password=info.data["PASSWORD"],
            host=info.data["HOST"],  # type: ignore
            port=info.data["PORT"],
            path=str(info.data["DB"]) or "",
        )

    @cached_property
    def ASYNC_DATABASE_URI(self) -> str:
        if not self.DATABASE_URI:
            raise RuntimeError("DATABASE_URI must be not None")

        return (
            str(self.DATABASE_URI).replace("postgresql://", "postgresql+asyncpg://")
            if self.DATABASE_URI
            else str(self.DATABASE_URI)
        )


postgresql_settings = PostgresSettings()
postgresql_settings

PostgresSettings(HOST='localhost', PORT=5432, USER='postgres_user', PASSWORD='postgres_password', DB='app', DATABASE_URI=MultiHostUrl('postgresql://postgres_user:postgres_password@localhost:5432/app'))

In [2]:
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker


def sync_create_engine(
    pool_size: int = 5, max_overflow: int = 10, pool_timeout: int = 30
) -> Engine:
    return create_engine(
        str(postgresql_settings.DATABASE_URI),
        echo=True,
        pool_pre_ping=True,
        future=True,
        pool_size=pool_size,
        max_overflow=max_overflow,
        pool_timeout=pool_timeout,
    )


def sync_create_session(engine: Engine) -> sessionmaker:
    return sessionmaker(engine, autocommit=False, autoflush=False)  # type: ignore

In [3]:
db_engine = sync_create_engine(
    pool_size=20,
    max_overflow=10,
    pool_timeout=60,
    )
db_session = sync_create_session(db_engine)

In [4]:
from contextlib import contextmanager


@contextmanager
def get_db():
    """
    Dependency function that yields db sessions
    """

    if not db_session:
        raise RuntimeError("call init_db_engine before")

    with db_session() as session:
        try:
            yield session
        finally:
            session.commit()

In [None]:
from typing import List
from typing import Optional
from datetime import date, datetime, timedelta, timezone

from sqlalchemy import ForeignKey
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass

In [None]:
from typing import Any, Generic, Sequence, Type, TypeVar

from sqlalchemy import delete, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import Session
from sqlalchemy.sql import Select


def create(db: AsyncSession, *, db_obj):
    db.add(db_obj)
    db.commit()
    db.refresh(db_obj)
    return db_obj

In [None]:
from datetime import datetime, timezone
from typing import Any

import pytz
from sqlalchemy import DateTime, TypeDecorator
from sqlalchemy.engine import Dialect


TIMEZONE = "Asia/Ho_Chi_Minh"


class TZDateTime(TypeDecorator):
    impl = DateTime
    cache_ok = True

    def process_bind_param(self, value: datetime | None, dialect: Dialect) -> Any:
        if value is not None:
            if not value.tzinfo:
                raise TypeError("tzinfo is required")
            value = value.astimezone(timezone.utc).replace(tzinfo=None)
        return value

    def process_result_value(self, value: datetime | None, dialect: Dialect) -> Any:
        if value is not None:
            value = value.replace(tzinfo=timezone.utc).astimezone(
                pytz.timezone(TIMEZONE)
            )
        return value

## IIP

### Create table

In [None]:
# from typing import List
# from typing import Optional
# from datetime import date, datetime, timedelta, timezone

# from sqlalchemy import ForeignKey
# from sqlalchemy import String
# from sqlalchemy.orm import DeclarativeBase
# from sqlalchemy.orm import Mapped
# from sqlalchemy.orm import mapped_column
# from sqlalchemy.orm import relationship

# class IIP(Base):
#     __tablename__ = "iip"

#     id: Mapped[int] = mapped_column(primary_key=True)
#     industry: Mapped[str | None] = mapped_column(String(255))
#     mom: Mapped[float | None] = mapped_column(nullable=True)
#     yoy: Mapped[float | None] = mapped_column(nullable=True)
#     ytd_yoy: Mapped[float | None] = mapped_column(nullable=True)
#     date: Mapped[datetime] = mapped_column(TZDateTime, default=None, nullable=True)

In [None]:
# Base.metadata.create_all(db_engine)

In [None]:
import pandas as pd

iip_df = pd.read_csv('csv/iip.csv')
iip_df

In [None]:
import sqlalchemy

iip_df.to_sql(
    "IIP",
    con=db_engine,
    if_exists="replace",
    index=False,
    dtype={
        "Date": sqlalchemy.DateTime,
    },
)

### CPI

In [None]:
import pandas as pd

cpi_df = pd.read_csv('csv/cpi.csv')
cpi_df

In [None]:
import sqlalchemy

cpi_df.to_sql(
    "CPI",
    con=db_engine,
    if_exists="replace",
    index=False,
    dtype={
        "Date": sqlalchemy.DateTime,
    },
)

### GDP

In [None]:
import pandas as pd

gdp_df = pd.read_csv('csv/gdp_ss.csv')
gdp_df

In [None]:
import sqlalchemy

gdp_df.to_sql(
    "GDP",
    con=db_engine,
    if_exists="replace",
    index=False,
    dtype={
        "Date": sqlalchemy.DateTime,
    },
)

### SPCN

In [None]:
import pandas as pd

spcn_df = pd.read_csv('csv/spcn.csv')
spcn_df

In [None]:
import sqlalchemy

spcn_df.to_sql(
    "SPCN",
    con=db_engine,
    if_exists="replace",
    index=False,
    dtype={
        "Date": sqlalchemy.DateTime,
    },
)

### Tổng mức bán lẻ

In [None]:
import pandas as pd

tongmuc_df = pd.read_csv('csv/tongmuc.csv')
tongmuc_df

In [None]:
import sqlalchemy

tongmuc_df.to_sql(
    "TongMuc",
    con=db_engine,
    if_exists="replace",
    index=False,
    dtype={
        "Date": sqlalchemy.DateTime,
    },
)

### XK

In [None]:
import sqlalchemy
import pandas as pd

xk_df = pd.read_csv('csv/xk.csv')

xk_df.to_sql(
    "XK",
    con=db_engine,
    if_exists="replace",
    index=False,
    dtype={
        "Date": sqlalchemy.DateTime,
    },
)

### NK

In [None]:
import sqlalchemy
import pandas as pd

nk_df = pd.read_csv('csv/nk.csv')

nk_df.to_sql(
    "NK",
    con=db_engine,
    if_exists="replace",
    index=False,
    dtype={
        "Date": sqlalchemy.DateTime,
    },
)

### FDI

In [5]:
import sqlalchemy
import pandas as pd

fdi_df = pd.read_csv('csv/fdi.csv')

fdi_df.to_sql(
    "FDI",
    con=db_engine,
    if_exists="replace",
    index=False,
    dtype={
        "Date": sqlalchemy.DateTime,
    },
)

2024-03-31 11:51:21,130 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2024-03-31 11:51:21,131 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-03-31 11:51:21,132 INFO sqlalchemy.engine.Engine select current_schema()
2024-03-31 11:51:21,133 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-03-31 11:51:21,134 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2024-03-31 11:51:21,134 INFO sqlalchemy.engine.Engine [raw sql] {}
2024-03-31 11:51:21,135 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-03-31 11:51:21,140 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname

534