In [None]:
#creating table for csv data from mongodb to postgres

import pymongo
import psycopg2

# Source side: MongoDB client, then the database and collection handles.
mongo_client = pymongo.MongoClient("mongodb://localhost:27023/")
mongo_db = mongo_client["Motor_collisions"]
mongo_collection = mongo_db["MC_Crashes"]

# Target side: PostgreSQL settings.
# NOTE(review): credentials are hardcoded in the notebook; consider reading
# them from the environment before sharing this file.
postgres_host, postgres_port = "localhost", "5432"
postgres_user, postgres_password = "dap", "dap"
postgres_dbname = "postgres"

# Open the PostgreSQL session and a cursor used by the statements that follow.
postgres_connect_kwargs = {
    "host": postgres_host,
    "port": postgres_port,
    "user": postgres_user,
    "password": postgres_password,
    "dbname": postgres_dbname,
}
postgres_connection = psycopg2.connect(**postgres_connect_kwargs)
postgres_cursor = postgres_connection.cursor()

# DDL for the target table. IF NOT EXISTS makes this a no-op when a table
# named `crashes` is already present (its existing column types are kept).
# NOTE(review): several columns that look textual by name ("BOROUGH",
# "LOCATION", "OFF STREET NAME", "CONTRIBUTING FACTOR VEHICLE 3-5",
# "VEHICLE TYPE CODE 3-5") are declared FLOAT -- confirm against the real data.
create_crashes_table_sql = """
    CREATE TABLE IF NOT EXISTS crashes (
        _id TEXT,
        "CRASH DATE" TEXT,
        "CRASH TIME" TEXT,
        "BOROUGH" FLOAT,
        "ZIP CODE" FLOAT,
        "LATITUDE" FLOAT,
        "LONGITUDE" FLOAT,
        "LOCATION" FLOAT,
        "ON STREET NAME" TEXT,
        "CROSS STREET NAME" TEXT,
        "OFF STREET NAME" FLOAT,
        "NUMBER OF PERSONS INJURED" INT,
        "NUMBER OF PERSONS KILLED" INT,
        "NUMBER OF PEDESTRIANS INJURED" INT,
        "NUMBER OF PEDESTRIANS KILLED" INT,
        "NUMBER OF CYCLIST INJURED" INT,
        "NUMBER OF CYCLIST KILLED" INT,
        "NUMBER OF MOTORIST INJURED" INT,
        "NUMBER OF MOTORIST KILLED" INT,
        "CONTRIBUTING FACTOR VEHICLE 1" TEXT,
        "CONTRIBUTING FACTOR VEHICLE 2" TEXT,
        "CONTRIBUTING FACTOR VEHICLE 3" FLOAT,
        "CONTRIBUTING FACTOR VEHICLE 4" FLOAT,
        "CONTRIBUTING FACTOR VEHICLE 5" FLOAT,
        "COLLISION_ID" INT,
        "VEHICLE TYPE CODE 1" TEXT,
        "VEHICLE TYPE CODE 2" TEXT,
        "VEHICLE TYPE CODE 3" FLOAT,
        "VEHICLE TYPE CODE 4" FLOAT,
        "VEHICLE TYPE CODE 5" FLOAT
    )
"""

# Run the DDL and make it durable before any rows are inserted.
postgres_cursor.execute(create_crashes_table_sql)
postgres_connection.commit()

# Insert data from MongoDB into PostgreSQL
batch_size = 1000
total_documents = mongo_collection.count_documents({})

# Target columns in INSERT order. Each row is built by looking these keys up
# in the MongoDB document, so a document with missing, extra or reordered
# fields can no longer shift values into the wrong column (missing -> NULL).
crash_columns = [
    "_id",
    "CRASH DATE",
    "CRASH TIME",
    "BOROUGH",
    "ZIP CODE",
    "LATITUDE",
    "LONGITUDE",
    "LOCATION",
    "ON STREET NAME",
    "CROSS STREET NAME",
    "OFF STREET NAME",
    "NUMBER OF PERSONS INJURED",
    "NUMBER OF PERSONS KILLED",
    "NUMBER OF PEDESTRIANS INJURED",
    "NUMBER OF PEDESTRIANS KILLED",
    "NUMBER OF CYCLIST INJURED",
    "NUMBER OF CYCLIST KILLED",
    "NUMBER OF MOTORIST INJURED",
    "NUMBER OF MOTORIST KILLED",
    "CONTRIBUTING FACTOR VEHICLE 1",
    "CONTRIBUTING FACTOR VEHICLE 2",
    "CONTRIBUTING FACTOR VEHICLE 3",
    "CONTRIBUTING FACTOR VEHICLE 4",
    "CONTRIBUTING FACTOR VEHICLE 5",
    "COLLISION_ID",
    "VEHICLE TYPE CODE 1",
    "VEHICLE TYPE CODE 2",
    "VEHICLE TYPE CODE 3",
    "VEHICLE TYPE CODE 4",
    "VEHICLE TYPE CODE 5",
]

# Built once from crash_columns so the column list and the number of
# placeholders always agree. Column names are fixed constants, not user input.
insert_query = "INSERT INTO crashes ({}) VALUES ({})".format(
    ", ".join('"{}"'.format(name) for name in crash_columns),
    ",".join(["%s"] * len(crash_columns)),
)


def _insert_crash_rows(rows, processed_count):
    """Insert one batch of row tuples and commit it.

    On failure the error is printed and the transaction is rolled back;
    without the rollback PostgreSQL keeps the transaction in the aborted state
    and rejects every later batch as well.
    """
    try:
        postgres_cursor.executemany(insert_query, rows)
        postgres_connection.commit()
        print(f"Processed {processed_count} out of {total_documents} documents.")
    except Exception as e:
        postgres_connection.rollback()
        print(f"Error inserting documents into PostgreSQL table 'crashes': {e}")


try:
    processed = 0
    batch_values = []
    # One server-side cursor is streamed in chunks instead of re-querying with
    # skip()/limit(): skip() rescans from the start for every page and, with no
    # sort, pages are not guaranteed to be disjoint.
    for doc in mongo_collection.find().batch_size(batch_size):
        doc['_id'] = str(doc['_id'])  # Convert ObjectId to string
        batch_values.append(tuple(doc.get(name) for name in crash_columns))
        processed += 1
        if len(batch_values) == batch_size:
            _insert_crash_rows(batch_values, processed)
            batch_values = []
    # Flush the final partial batch; an empty collection inserts nothing.
    if batch_values:
        _insert_crash_rows(batch_values, processed)
finally:
    # Close connections even when the load is interrupted by an error.
    mongo_client.close()
    postgres_cursor.close()
    postgres_connection.close()


In [None]:
# Loading JSON data from MongoDB into PostgreSQL

import pymongo
import psycopg2
import json

# Where to read from: MongoDB server, database and collection.
mongo_host, mongo_port = "localhost", 27023
mongo_dbname = "Motor_collisions"
mongo_collection_name = "MC_People"  # Change this to your actual collection name

# Where to write to: PostgreSQL server and login.
# NOTE(review): credentials are hardcoded in the notebook; consider reading
# them from the environment before sharing this file.
postgres_host, postgres_port = "localhost", "5432"
postgres_user, postgres_password = "dap", "dap"
postgres_dbname = "postgres"

# Open the MongoDB client and resolve the collection handle.
mongo_client = pymongo.MongoClient(host=mongo_host, port=mongo_port)
mongo_db = mongo_client[mongo_dbname]
mongo_collection = mongo_db[mongo_collection_name]

# Open the PostgreSQL session and a cursor used by the load that follows.
postgres_connect_kwargs = {
    "host": postgres_host,
    "port": postgres_port,
    "user": postgres_user,
    "password": postgres_password,
    "dbname": postgres_dbname,
}
postgres_connection = psycopg2.connect(**postgres_connect_kwargs)
postgres_cursor = postgres_connection.cursor()

# Define the name for the PostgreSQL table
postgres_table_name = "person"  # Change this to your desired table name

# The statement is the same for every document, so it is built once.
# The table name is a notebook constant, not external input.
insert_query = f"INSERT INTO {postgres_table_name} (_id, data) VALUES (%s, %s)"

# Extract data from MongoDB collection and load into PostgreSQL table
num_documents = 0
try:
    for doc in mongo_collection.find():
        num_documents += 1

        # Extract specific fields from the MongoDB document
        _id = str(doc.get("_id"))  # Convert ObjectId to string

        try:
            # Serialise the nested "data" field to a JSON string. default=str
            # stringifies values json cannot encode natively (for example a
            # nested ObjectId or datetime) instead of raising TypeError.
            # Serialisation sits inside the try so one bad document is
            # reported and skipped rather than aborting the whole load.
            data = json.dumps(doc.get("data", {}), default=str)

            # Execute the INSERT statement; committing per document keeps a
            # failure from discarding rows that were already written.
            postgres_cursor.execute(insert_query, (_id, data))
            postgres_connection.commit()
            print(f"Data inserted into PostgreSQL table. Document {num_documents}")
        except Exception as e:
            postgres_connection.rollback()
            print(f"Error inserting data into PostgreSQL table: {e}")
finally:
    # Close connections even when the load is interrupted by an error.
    mongo_client.close()
    postgres_cursor.close()
    postgres_connection.close()

In [4]:
# checking data info on postgres
import psycopg2

# Settings read by get_database_info() when it opens its PostgreSQL connection.
# NOTE(review): credentials are hardcoded in the notebook; consider reading
# them from the environment before sharing this file.
postgres_host, postgres_port = "localhost", "5432"
postgres_user, postgres_password = "dap", "dap"
postgres_dbname = "postgres"

def get_database_info():
    """Print the server's databases and a row count for each public table.

    Connects using the module-level ``postgres_*`` settings, lists every
    non-template database, then lists the tables of the ``public`` schema of
    the connected database together with ``COUNT(*)`` for each.

    Returns:
        None. All results are printed. A ``psycopg2.Error`` is caught and
        printed rather than raised.
    """
    # Local import so the function only relies on `psycopg2` being imported.
    from psycopg2 import sql

    conn = None
    try:
        # Connect to PostgreSQL
        conn = psycopg2.connect(
            host=postgres_host,
            port=postgres_port,
            user=postgres_user,
            password=postgres_password,
            dbname=postgres_dbname
        )
        # The cursor context manager closes the cursor on exit, error or not.
        with conn.cursor() as cursor:
            # Get list of databases
            cursor.execute("SELECT datname FROM pg_database WHERE datistemplate = false;")
            databases = [row[0] for row in cursor.fetchall()]
            print("Databases:")
            for db in databases:
                print("-", db)

            # Get list of tables in 'postgres' database
            cursor.execute("""
                SELECT table_name 
                FROM information_schema.tables 
                WHERE table_schema = 'public' 
                ORDER BY table_name;
            """)
            tables = [row[0] for row in cursor.fetchall()]
            print("\nTables in 'postgres' database:")

            # Quote each name as an identifier: information_schema returns the
            # stored spelling, so mixed-case or otherwise special table names
            # would break (or be misread) if pasted into the SQL text as-is.
            count_query = sql.SQL("SELECT COUNT(*) FROM {};")
            for table in tables:
                cursor.execute(count_query.format(sql.Identifier(table)))
                row_count = cursor.fetchone()[0]
                print(f"- {table}: {row_count} rows")
    except psycopg2.Error as e:
        print(f"Error getting database info: {e}")
    finally:
        # Release the connection on every path, including after an error.
        if conn is not None:
            conn.close()

# Run the report once; the function prints its findings and returns nothing.
get_database_info()


Databases:
- postgres

Tables in 'postgres' database:
- crashes: 2040647 rows
- mc_people: 35 rows
- mc_vehicles: 0 rows
- people: 0 rows
- person: 10 rows
- vehicles: 0 rows
