In [None]:
from dotenv import load_dotenv
import os
import mysql.connector
import psycopg2
import pyodbc
import pandas as pd
import openpyxl
## from pyspark.sql import SparkSession

In [None]:

# Load environment variables from a .env file. The default is the original
# author's local path; set the DOTENV_PATH environment variable to use another.
DOTENV_PATH = os.getenv(
    "DOTENV_PATH",
    r"C:\Users\aadia\OneDrive\Desktop\projects\practice\new\env.env",
)
if not load_dotenv(dotenv_path=DOTENV_PATH):
    print(f"⚠️ No variables found in {DOTENV_PATH}; relying on the existing environment.")

# Get DB config
db_type = os.getenv("DB_TYPE")
host = os.getenv("DB_HOST")
port = os.getenv("DB_PORT")
database = os.getenv("DB_NAME")
username = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")

# DB_TABLES is a comma-separated list such as "Customers, Accounts".
# Strip whitespace around each name and drop empty entries (e.g. a trailing comma).
tables_env = os.getenv("DB_TABLES")
if not tables_env:
    raise RuntimeError("DB_TABLES is not set; expected a comma-separated list of table names")
table_list = [name.strip() for name in tables_env.split(",") if name.strip()]


In [None]:

def _odbc_value(value):
    """Quote one ODBC connection-string value if it needs it.

    Values containing ';', '{' or '}' (or leading/trailing whitespace) must be
    wrapped in braces, with any '}' inside doubled; otherwise a password such
    as "p;w" would split the connection string.
    """
    text = str(value)
    if any(ch in text for ch in ";{}") or text != text.strip():
        return "{" + text.replace("}", "}}") + "}"
    return text


def connect_db(db_type, host, port, database, username, password):
    """Open a DB-API connection for the configured database backend.

    Parameters
    ----------
    db_type : str
        "mysql", "postgresql" (alias "postgres") or "sqlserver"; matched
        case-insensitively, surrounding whitespace ignored.
    host, database, username, password : str
        Connection settings.
    port : str, int or None
        Port number. A string (as read from the environment) is converted to
        int; None or "" leaves the driver's default port in place.

    Returns
    -------
    A connection object from mysql.connector, psycopg2 or pyodbc.

    Raises
    ------
    ValueError
        If ``db_type`` is not a supported backend, or ``port`` is not an integer.
    """
    kind = (db_type or "").strip().lower()
    if kind not in ("mysql", "postgresql", "postgres", "sqlserver"):
        raise ValueError(
            f"Unsupported db_type: {db_type!r} (expected 'mysql', 'postgresql' or 'sqlserver')"
        )

    # Environment variables are strings; drivers want an int port, and an
    # unset port should fall back to the driver default rather than "None".
    port_text = "" if port is None else str(port).strip()
    port_num = int(port_text) if port_text else None

    if kind == "mysql":
        params = {"host": host, "user": username, "password": password, "database": database}
        if port_num is not None:
            params["port"] = port_num
        return mysql.connector.connect(**params)

    if kind in ("postgresql", "postgres"):
        # psycopg2 ignores keyword arguments whose value is None.
        return psycopg2.connect(
            host=host, port=port_num, user=username, password=password, dbname=database
        )

    # SQL Server via ODBC: "host,port" selects a TCP port; user-supplied
    # values are brace-escaped so special characters cannot break the string.
    server = host if port_num is None else f"{host},{port_num}"
    parts = [
        "DRIVER={ODBC Driver 17 for SQL Server}",
        f"SERVER={_odbc_value(server)}",
        f"DATABASE={_odbc_value(database)}",
        f"UID={_odbc_value(username)}",
        f"PWD={_odbc_value(password)}",
    ]
    return pyodbc.connect(";".join(parts))

def print_tables(table_list, conn):
    """Load each listed table into a pandas DataFrame and display it.

    Parameters
    ----------
    table_list : iterable of str
        Table names. Surrounding whitespace is stripped and blank entries are
        skipped, so a raw split of "Customers, Accounts," works.
    conn :
        An open DB-API connection (anything providing ``cursor()``).

    Returns
    -------
    dict[str, pandas.DataFrame]
        Stripped table name -> full table contents.

    Notes
    -----
    Table names are interpolated into SQL unquoted, so they must come from
    trusted configuration. ``display`` is the IPython builtin available in
    Jupyter; outside a notebook it must be provided.
    """
    dataframes = {}
    for raw_name in table_list:
        table = raw_name.strip()
        if not table:
            continue
        print(f"📥 Loading table: {table}")
        cursor = conn.cursor()
        try:
            cursor.execute(f"SELECT * FROM {table}")
            rows = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
        finally:
            # Close the cursor even when the query fails so it does not leak.
            cursor.close()
        # Drivers return different row types (e.g. pyodbc.Row); plain tuples
        # keep DataFrame construction uniform across backends.
        df = pd.DataFrame([tuple(row) for row in rows], columns=columns)
        dataframes[table] = df
        display(df)
        print()
    print()
    return dataframes

In [None]:
def merge_tables(dfs, join, left="Customers", right="Accounts", on="customer_id"):
    """Join two already-loaded tables with pandas.

    Parameters
    ----------
    dfs : dict[str, pandas.DataFrame]
        Tables keyed by name, e.g. the dict returned by ``print_tables``.
    join : str
        One of "inner", "left", "right" or "outer"; passed to ``pd.merge(how=...)``.
    left, right : str
        Keys in ``dfs`` of the left and right tables (default Customers / Accounts).
    on : str or list of str
        Join column(s) present in both tables (default customer_id).

    Returns
    -------
    pandas.DataFrame
        The merged table.

    Raises
    ------
    ValueError
        If ``join`` is not one of the supported join types.
    KeyError
        If ``left`` or ``right`` is not a key of ``dfs``.
    """
    allowed = ("inner", "left", "right", "outer")
    if join not in allowed:
        raise ValueError(f"Invalid join: {join!r}; expected one of {allowed}")
    return pd.merge(dfs[left], dfs[right], on=on, how=join)


def SQL_join(conn, join, left="Customers", right="Accounts", on="customer_id"):
    """Join two tables inside the database and return the result as a DataFrame.

    SQL counterpart of ``merge_tables``.

    Parameters
    ----------
    conn :
        An open DB-API connection (anything providing ``cursor()``).
    join : str
        One of "inner", "left", "right" or "outer".
    left, right : str
        Table names (default Customers / Accounts). Interpolated into the SQL
        unquoted, so they must be trusted identifiers.
    on : str
        Join column present in both tables (default customer_id).

    Returns
    -------
    pandas.DataFrame
        All columns of ``left`` followed by all columns of ``right`` (so the
        join column appears twice).

    Raises
    ------
    ValueError
        If ``join`` is not one of the supported join types.
    """
    select = f"SELECT {left}.*,{right}.* FROM {left}"
    condition = f"{left}.{on} = {right}.{on}"
    if join == "inner":
        query = f"{select} INNER JOIN {right} ON {condition}"
    elif join == "left":
        query = f"{select} LEFT JOIN {right} ON {condition}"
    elif join == "right":
        query = f"{select} RIGHT JOIN {right} ON {condition}"
    elif join == "outer":
        # MySQL lacks FULL OUTER JOIN: take every LEFT JOIN row plus only the
        # RIGHT JOIN rows that found no match. UNION ALL (not UNION) keeps
        # legitimately duplicated rows, as a real FULL OUTER JOIN / pd.merge would.
        query = (
            f"{select} LEFT JOIN {right} ON {condition} "
            f"UNION ALL "
            f"{select} RIGHT JOIN {right} ON {condition} "
            f"WHERE {left}.{on} IS NULL"
        )
    else:
        # Previously this only printed and then failed on an unbound `query`.
        raise ValueError(f"Invalid join: {join!r}; expected 'inner', 'left', 'right' or 'outer'")

    cursor = conn.cursor()
    try:
        cursor.execute(query)
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
    finally:
        cursor.close()
    return pd.DataFrame([tuple(row) for row in rows], columns=columns)
    

In [None]:
def sql_aggregate_summary(conn, table_name, string_agg_template="GROUP_CONCAT({col})"):
    """Compute per-column SQL aggregates for one table in a single query.

    Column kinds are inferred by loading the table into pandas first:
      * numeric columns  -> COUNT, SUM, AVG, MIN, MAX
      * object (string)  -> COUNT, COUNT(DISTINCT), string aggregation
      * datetime columns -> COUNT, MIN, MAX
      * anything else    -> COUNT
    Note: an empty table yields object dtypes everywhere, so every column is
    then treated as a string column.

    Parameters
    ----------
    conn :
        An open DB-API connection (anything providing ``cursor()``).
    table_name : str
        Table to summarise. Interpolated into SQL unquoted, so it must be trusted.
    string_agg_template : str, optional
        SQL for the string aggregation with a ``{col}`` placeholder. The default
        GROUP_CONCAT works on MySQL/SQLite; PostgreSQL and SQL Server need
        something like ``"STRING_AGG({col}, ',')"``. The metric is reported as
        ``group_concat`` either way.

    Returns
    -------
    pandas.DataFrame
        Columns ``column_name``, ``metric_type`` (lowercase) and ``value``;
        one row per (column, metric) in table-column order.
    """
    metric_specs = {
        "numeric": [
            ("COUNT", "COUNT({col})"),
            ("SUM", "SUM({col})"),
            ("AVG", "AVG({col})"),
            ("MIN", "MIN({col})"),
            ("MAX", "MAX({col})"),
        ],
        "string": [
            ("COUNT", "COUNT({col})"),
            ("DISTINCT_COUNT", "COUNT(DISTINCT {col})"),
            ("GROUP_CONCAT", string_agg_template),
        ],
        "datetime": [
            ("COUNT", "COUNT({col})"),
            ("MIN", "MIN({col})"),
            ("MAX", "MAX({col})"),
        ],
        "other": [("COUNT", "COUNT({col})")],
    }

    cursor = conn.cursor()
    try:
        # Read the table only to let pandas infer column dtypes.
        cursor.execute(f"SELECT * FROM {table_name}")
        rows = cursor.fetchall()
        col_names = [desc[0] for desc in cursor.description]
        df_sample = pd.DataFrame([tuple(row) for row in rows], columns=col_names)

        numeric_cols = set(df_sample.select_dtypes(include=["number"]).columns)
        string_cols = set(df_sample.select_dtypes(include=["object"]).columns)
        datetime_cols = set(df_sample.select_dtypes(include=["datetime64", "datetime"]).columns)

        # (column, METRIC, SQL expression), in output order.
        plan = []
        for col in col_names:
            if col in numeric_cols:
                kind = "numeric"
            elif col in string_cols:
                kind = "string"
            elif col in datetime_cols:
                kind = "datetime"
            else:
                kind = "other"
            plan.extend((col, metric, template.format(col=col)) for metric, template in metric_specs[kind])

        final_sql = f"SELECT {', '.join(expr for _, _, expr in plan)} FROM {table_name}"
        cursor.execute(final_sql)
        data = cursor.fetchone()
    finally:
        # Release the cursor even if either query fails.
        cursor.close()

    summary_data = [
        {"column_name": col, "metric_type": metric.lower(), "value": value}
        for (col, metric, _), value in zip(plan, data)
    ]
    return pd.DataFrame(summary_data)


In [None]:
# Connect, load every configured table, compare the pandas merge with the SQL
# join, then export a per-table aggregate summary to CSV, Excel and Parquet.
conn = connect_db(db_type, host, port, database, username, password)
print('Connected')

# Table names may carry whitespace from the comma-separated DB_TABLES value.
tables = [name.strip() for name in table_list if name.strip()]
dfs = print_tables(tables, conn)

m_t = merge_tables(dfs, "outer")
display(m_t)
s_t = SQL_join(conn, "inner")
display(s_t)

# The context manager guarantees the workbook is written and closed even if
# one table's summary fails part-way through the loop.
with pd.ExcelWriter("aggregated_summary.xlsx", engine='openpyxl') as excel_writer:
    for table in tables:
        summary = sql_aggregate_summary(conn, table)
        print(f"\nAggregated summary for table: {table}")
        display(summary)

        summary.to_csv(f"{table}.csv", index=False)
        summary.to_excel(excel_writer, sheet_name=table, index=False)
        # 'value' mixes ints, floats and strings (GROUP_CONCAT), which one
        # parquet column cannot hold; stringify a copy for parquet only.
        summary.astype({"value": str}).to_parquet(f"{table}.parquet", index=False)



In [None]:
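# Task checklist:
# 1) MERGE — pandas merge (merge_tables) and SQL joins (SQL_join)
# 2) AGGREGATE FUNCTIONS (SUM, COUNT, ...) — per-column SQL aggregates (sql_aggregate_summary)
# 3) MATPLOTLIB — TODO: plotting is not implemented in this notebook yet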