# In [None]:
from sqlalchemy import create_engine, types
import pandas as pd
import json

def _parse_metadata(raw):
    """Parse one ``metadata`` cell into a dict.

    Non-string cells (e.g. the NaN that ``pd.read_csv`` produces for an empty
    field) and a JSON ``null`` both yield ``{}``. Such a row then gets missing
    values in every metadata column instead of aborting the whole chunk.
    """
    if not isinstance(raw, str):
        return {}
    parsed = json.loads(raw)
    return {} if parsed is None else parsed


def process_chunk(chunk):
    """Expand the JSON ``metadata`` column of *chunk* into separate columns.

    Each ``metadata`` cell is parsed with :func:`json.loads` and its keys
    become columns. A ``created_date`` value shaped like ``{"$date": ...}``
    is replaced by its inner value. Any remaining dict/list values are
    serialised back to JSON strings so they can be written to text columns.

    Args:
        chunk: DataFrame, typically one chunk yielded by ``pd.read_csv``.
            Its index is preserved in the result.

    Returns:
        A new DataFrame. The metadata columns come first, in order of first
        appearance (which starts with the first row's keys). They are followed
        by the remaining original columns in their original order. If *chunk*
        has no ``metadata`` column it is returned unchanged.
    """
    if 'metadata' not in chunk.columns:
        return chunk

    # Build the metadata frame from a list of dicts rather than with
    # ``.apply(pd.Series)``. Dtypes are then inferred per column, not per row:
    # a row holding only numbers would otherwise be upcast to float, turning
    # ints such as 5 into 5.0. This is also much faster on large chunks.
    # Reusing the chunk's index keeps rows aligned for the concat below.
    metadata_df = pd.DataFrame(
        [_parse_metadata(raw) for raw in chunk['metadata']],
        index=chunk.index,
    )

    if 'created_date' in metadata_df.columns:
        # Unwrap {"$date": ...} objects (presumably MongoDB extended JSON);
        # every other value passes through untouched.
        metadata_df['created_date'] = metadata_df['created_date'].apply(
            lambda x: x['$date'] if isinstance(x, dict) and '$date' in x else x
        )

    # Metadata columns first, then the original columns in their original
    # order. (``Index.difference`` used to sort the latter alphabetically.)
    final_df = pd.concat([metadata_df, chunk.drop(columns=['metadata'])], axis=1)

    # Nested values (dicts and lists) cannot be written as-is to the VARCHAR
    # columns the loader declares, so store them as JSON text.
    for col in final_df.columns:
        if final_df[col].dtype == 'object':
            final_df[col] = final_df[col].apply(
                lambda x: json.dumps(x) if isinstance(x, (dict, list)) else x
            )

    return final_df

def load_csv_to_postgresql(path, engine, chunksize=1000000, *,
                           table_name='host_states_cleaned', dtype=None):
    """Stream a CSV file into a PostgreSQL table chunk by chunk.

    Each chunk read by ``pd.read_csv`` is passed through ``process_chunk`` and
    appended to *table_name* with ``DataFrame.to_sql``. A progress line is
    printed after every chunk.

    Args:
        path: Path of the CSV file to read.
        engine: SQLAlchemy engine passed to ``to_sql`` as ``con``.
        chunksize: Number of CSV rows read per chunk.
        table_name: Destination table (default ``'host_states_cleaned'``).
        dtype: Mapping of column name to SQLAlchemy type, forwarded to
            ``to_sql``. Defaults to the host-state column types below.

    Returns:
        The total number of rows appended.
    """
    if dtype is None:
        # Column types for PostgreSQL. Built once per call, not once per chunk.
        dtype = {
            'timestamp': types.TIMESTAMP(),
            'state': types.INTEGER(),
            'address': types.VARCHAR(),
            'created_date': types.TIMESTAMP(),
            'downtime': types.BOOLEAN(),
            'duration': types.INTEGER(),
            'groups': types.VARCHAR(),  # JSON or TEXT depending on the structure
            'host_name': types.VARCHAR(),
            'id': types.VARCHAR(),
            'labels': types.VARCHAR(),  # JSON or TEXT depending on the structure
            'name': types.VARCHAR(),
        }

    loaded_rows = 0
    # Use the chunk reader as a context manager so the CSV file handle is
    # closed even if a chunk fails to process or insert.
    with pd.read_csv(path, chunksize=chunksize) as reader:
        for chunk_no, chunk in enumerate(reader, start=1):
            processed_chunk = process_chunk(chunk)

            # Append processed data to the table with the specified data types.
            processed_chunk.to_sql(name=table_name, con=engine, if_exists='append',
                                   index=False, dtype=dtype)

            loaded_rows += len(processed_chunk)
            print(f"Processed and loaded chunk #{chunk_no} ({loaded_rows} rows) to database.")

    return loaded_rows

if __name__ == "__main__":
    import os

    # PostgreSQL connection string. Read from DATABASE_URL when set, so
    # credentials need not be written into the script; the placeholder is
    # kept as the fallback.
    connection_string = os.environ.get(
        'DATABASE_URL',
        'postgresql://<username>:<password>@<host>:<port>/<dbname>',
    )
    engine = create_engine(connection_string)

    # Path to the CSV file (override with HOST_STATES_CSV).
    path = os.environ.get('HOST_STATES_CSV', 'D:/datasets/host_states.csv')

    try:
        # Load data from CSV to PostgreSQL.
        load_csv_to_postgresql(path, engine, chunksize=1000000)
    finally:
        # Release pooled connections even if the load fails part-way.
        engine.dispose()
