In [5]:
import pandas as pd
import psycopg2
from psycopg2 import sql, OperationalError

class PostgresDB:
#Initialization (__init__ method)
#This method sets up the initial state of the database connection parameters:
    def __init__(self, host, database, user, password, port=5432):
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.connection = None

#Connecting to the Database (connect method)
#This method attempts to connect to the PostgreSQL 
#database using the provided credentials:
    def connect(self):
        try:
            self.connection = psycopg2.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password,
                port=self.port
            )
            print("Connection to PostgreSQL DB successful")
        except OperationalError as e:
            print(f"The error '{e}' occurred")
#Executing Queries (execute_query method)
#This method executes a given SQL query on the connected database:
    def execute_query(self, query):
        if self.connection is None:
            print("Connection not established. Call connect() first.")
            return

        cursor = self.connection.cursor()
        try:
            cursor.execute(query)
            self.connection.commit()
            print("Query executed successfully")
        except Exception as e:
            print(f"The error '{e}' occurred")
        finally:
            cursor.close()
#Closing the Connection (close_connection method)
#This method closes the database connection if it exists:
    def close_connection(self):
        if self.connection is not None:
            self.connection.close()
            print("Connection closed")



In [6]:
#function maps pandas data types to their equivalent SQL data types,
# ensuring that data type conversions are handled correctly when 
# migrating data between pandas and SQL databases.

def map_dtype_to_sql(dtype):
    if pd.api.types.is_integer_dtype(dtype):
        return "INTEGER"
    elif pd.api.types.is_float_dtype(dtype):
        return "FLOAT"
    elif pd.api.types.is_bool_dtype(dtype):
        return "BOOLEAN"
    else:
        return "TEXT"

In [7]:
# Initialize the database connection
db = PostgresDB(
    host="localhost",
    database="postgres",
    user="postgres",
    password="pass1234"
)
db.connect()

# Read the CSV file into a DataFrame
df_parquet = pd.read_parquet(r"D:\DATA_Analysis\Arkon\data\data2 (1).parquet")

df_data1 = pd.read_csv(r"D:\DATA_Analysis\Arkon\data\Data1.csv")

df_union = pd.concat([df_parquet, df_data1])
df_union.index = range(df_union.shape[0])
#df_union = df_union.dropna().drop_duplicates()

# Generate SQL for creating the table
table_name = "data_union"
schema_name = "arkon_data"
columns = []
for column_name, dtype in df_union.dtypes.items():
    sql_type = map_dtype_to_sql(dtype)
    columns.append(f"{column_name} {sql_type}")

create_table_query = f"""
CREATE SCHEMA IF NOT EXISTS {schema_name};
CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
    id SERIAL PRIMARY KEY,
    {', '.join(columns)}
);
"""
db.execute_query(create_table_query)

# Insert data into the table
insert_query_template = f"""
INSERT INTO {schema_name}.{table_name} ({', '.join(df_union.columns)}) VALUES ({', '.join(['%s'] * len(df_union.columns))});
"""
cursor = db.connection.cursor()
for index, row in df_union.iterrows():
    cursor.execute(insert_query_template, tuple(row))
db.connection.commit()
cursor.close()

# Close the database connection
db.close_connection()

Connection to PostgreSQL DB successful
Query executed successfully
Connection closed
