# Database Operations

### Importing necessary libraries

##### Install psycopg2-binary

* 'psycopg2-binary' helps you avoid the need to compile 'psycopg2' from source.
*  Run: `pip install psycopg2-binary` in your terminal/command prompt.


In [1]:
pip install psycopg2-binary

Note: you may need to restart the kernel to use updated packages.


In [2]:
# pandas: data manipulation and analysis
import pandas as pd

# psycopg2: PostgreSQL driver used to connect to and query the database
import psycopg2

# psycopg2.sql: composes dynamic SQL (table and column names) safely, which guards against SQL injection
from psycopg2 import sql

### Input File paths

In [3]:
leads_cleaned_data_path = '../Inputs/leads_cleaned_data.xlsx'
leads_in_review_data_path = '../Inputs/leads_in_review_data.xlsx'

### Clean DataFrame column names for SQL compatibility

In [4]:
# Function to clean column names (rename for SQL compatibility)
def clean_column_names(df):
    df.columns = df.columns.str.replace(' ', '_')  # Replacing spaces with underscores
    return df

<span style = "color:blue;font-size:14px" > [Note] Some column names, such as 'country code' and 'cleaned phone number', contain spaces. The function replaces each space with an underscore (for example, 'country code' becomes 'country_code') so that the names match the SQL column names. </span>
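The function above only handles spaces. Because `insert_data` quotes column names with `sql.Identifier`, the DataFrame labels have to match the lowercase table columns exactly. A minimal sketch of a stricter clean-up, assuming snake_case lowercase names are wanted; `to_sql_identifier` is a hypothetical helper, not part of this notebook:

```python
import re

def to_sql_identifier(name):
    """Hypothetical helper: normalise one column label to lowercase snake_case."""
    # Collapse every run of non-word characters (spaces, dashes, dots) into one underscore
    cleaned = re.sub(r"\W+", "_", str(name).strip())
    # Drop underscores left at the edges and lowercase the result
    return cleaned.strip("_").lower()

labels = ["country code", "cleaned phone number", " Firma "]
cleaned_labels = [to_sql_identifier(label) for label in labels]
print(cleaned_labels)
```

With pandas this could be applied as `df.columns = [to_sql_identifier(c) for c in df.columns]`.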

### Function to load data and insert it into PostgreSQL

In [5]:
# Function to load data and insert it into PostgreSQL
def insert_data(cursor, conn, df, table_name):
    # The unique_id column is already present in the DataFrame
    columns = df.columns.tolist()  # Get the column names from the DataFrame

    # Create SQL insert query dynamically based on the table name and column names
    insert_query = sql.SQL('INSERT INTO {} ({}) VALUES ({}) ON CONFLICT (unique_id) DO NOTHING').format(
        sql.Identifier(table_name),
        sql.SQL(', ').join(map(sql.Identifier, columns)),
        sql.SQL(', ').join(sql.Placeholder() * len(columns))
    )

    # Insert the DataFrame rows; psycopg2 accepts the composed query object directly
    cursor.executemany(insert_query, df.values.tolist())
    conn.commit()
    print(f"Data inserted into {table_name}")
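Empty Excel cells arrive from pandas as float NaN, while psycopg2 sends Python `None` as SQL NULL. If NULL is what the tables should hold, the rows can be converted before `executemany` is called. This is a sketch under that assumption; `nan_to_none` is a hypothetical helper and the sample rows are made up:

```python
import math

def nan_to_none(rows):
    """Hypothetical helper: replace float NaN with None in a list of row lists."""
    return [
        [None if isinstance(value, float) and math.isnan(value) else value for value in row]
        for row in rows
    ]

# Made-up rows shaped like the output of df.values.tolist()
sample_rows = [["Firma A", float("nan"), 11.0], ["Firma B", "Berlin", float("nan")]]
cleaned_rows = nan_to_none(sample_rows)
print(cleaned_rows)
```

Inside `insert_data`, this would mean passing `nan_to_none(df.values.tolist())` as the parameter list.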

### Function for tests

In [6]:
# Function to test PostgreSQL connection, create tables, and insert data
def test_postgresql_connection(host, port, database, user, password):
    conn = None
    try:
        # Establish the connection
        conn = psycopg2.connect(
            host=host,
            port=port,
            dbname=database,
            user=user,
            password=password
        )
        cursor = conn.cursor()

        # Execute a simple query
        cursor.execute("SELECT version();")
        version = cursor.fetchone()
        print(f"Connected to PostgreSQL. Server version: {version[0]}")

        # Get additional connection information
        cursor.execute("SELECT current_database();")
        db_name = cursor.fetchone()
        cursor.execute("SELECT current_user;")
        user_name = cursor.fetchone()
        print(f"Successfully connected to {db_name[0]}")
        print(f"Database: {db_name[0]}")
        print(f"User: {user_name[0]}")

        # Create the 'leads_cleaned' table
        create_cleaned_table_query = """
        CREATE TABLE IF NOT EXISTS leads_cleaned (
            id SERIAL PRIMARY KEY,
            firma VARCHAR(500),  
            street VARCHAR(500), 
            plz VARCHAR(20),     
            city VARCHAR(500),   
            telefon VARCHAR(100),  
            country VARCHAR(50),
            country_code VARCHAR(20), 
            cleaned_phone_number VARCHAR(100),  
            flag VARCHAR(50),
            salutation VARCHAR(100),
            first_name VARCHAR(100),
            surname VARCHAR(100),
            digit_length DOUBLE PRECISION,   
            firma_length DOUBLE PRECISION,
            unique_id VARCHAR(100) UNIQUE  -- Add unique_id column with UNIQUE constraint
        );
        """
        cursor.execute(create_cleaned_table_query)
        print("Table 'leads_cleaned' created or already exists.")

        # Create the 'leads_in_review' table
        create_review_table_query = """
        CREATE TABLE IF NOT EXISTS leads_in_review (
            id SERIAL PRIMARY KEY,
            firma VARCHAR(500),
            street VARCHAR(500),
            plz VARCHAR(20),
            city VARCHAR(500),
            telefon VARCHAR(100),
            country VARCHAR(50),
            country_code VARCHAR(20),
            cleaned_phone_number VARCHAR(100),
            flag VARCHAR(50),
            salutation VARCHAR(100),
            first_name VARCHAR(100),
            surname VARCHAR(100),
            digit_length DOUBLE PRECISION,   
            firma_length DOUBLE PRECISION,
            unique_id VARCHAR(100) UNIQUE  -- Add unique_id column with UNIQUE constraint
        );
        """
        cursor.execute(create_review_table_query)
        print("Table 'leads_in_review' created or already exists.")

        # Commit the transaction to save changes
        conn.commit()

        # Load both Excel files and clean their column names
        leads_cleaned_data = clean_column_names(pd.read_excel(leads_cleaned_data_path))
        leads_in_review_data = clean_column_names(pd.read_excel(leads_in_review_data_path))

        # Insert the data into both tables
        insert_data(cursor, conn, leads_cleaned_data, 'leads_cleaned')
        insert_data(cursor, conn, leads_in_review_data, 'leads_in_review')

    except Exception as e:
        print(f"Error during PostgreSQL connection, table creation, or insert: {e}")

    finally:
        if conn:
            cursor.close()
            conn.close()
            print("PostgreSQL connection is closed")


# Database credentials
admin_host = "data-mgmt-dev-movido-c44b.l.aivencloud.com"
admin_port = 25680
admin_database = "bc56a305-2e83-465b-915e-1a243ff67b41"
target_user = "baghirli_exam"
target_user_password = "AVNS_6BV84RAeLbUKqUO7llr"

# Test the connection, create tables, and insert data
test_postgresql_connection(admin_host, admin_port, admin_database, target_user, target_user_password)

Connected to PostgreSQL. Server version: PostgreSQL 16.4 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 13.3.1 20240522 (Red Hat 13.3.1-1), 64-bit
Successfully connected to bc56a305-2e83-465b-915e-1a243ff67b41
Database: bc56a305-2e83-465b-915e-1a243ff67b41
User: baghirli_exam
Table 'leads_cleaned' created or already exists.
Table 'leads_in_review' created or already exists.
Data inserted into leads_cleaned
Data inserted into leads_in_review
PostgreSQL connection is closed
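The credentials above are hard-coded in the notebook. Below is a minimal sketch of reading them at runtime instead, assuming the standard libpq environment variable names (`PGHOST`, `PGPORT`, `PGDATABASE`, `PGUSER`, `PGPASSWORD`) are set outside the notebook. `load_db_config` is a hypothetical helper, and the fallback values and the demo host are placeholders:

```python
import os

def load_db_config(env=None):
    """Hypothetical helper: build connection settings from environment variables."""
    env = os.environ if env is None else env
    return {
        "host": env.get("PGHOST", "localhost"),
        "port": int(env.get("PGPORT", "5432")),  # environment values are strings
        "database": env.get("PGDATABASE", "postgres"),
        "user": env.get("PGUSER", "postgres"),
        "password": env.get("PGPASSWORD", ""),
    }

# Demonstration with an explicit mapping instead of the real environment
demo_config = load_db_config({"PGHOST": "db.example.com", "PGPORT": "25680"})
print(demo_config["host"], demo_config["port"])
```

Because the dictionary keys match the function's parameter names, the call could become `test_postgresql_connection(**load_db_config())`.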


In [7]:
def run_query_with_pandas(host, port, database, user, password, query):
    conn = None  # lets the finally block run safely if connect() fails
    try:
        # Connect to PostgreSQL
        conn = psycopg2.connect(
            host=host,
            port=port,
            dbname=database,
            user=user,
            password=password
        )
        cursor = conn.cursor()

        # Execute the query
        cursor.execute(query)

        # Fetch all rows
        results = cursor.fetchall()

        # Get column names
        colnames = [desc[0] for desc in cursor.description]

        # Create a DataFrame to display the results
        df = pd.DataFrame(results, columns=colnames)
        print(df)

    except Exception as e:
        print(f"Error while running the query: {e}")

    finally:
        if conn:
            cursor.close()
            conn.close()
            print("PostgreSQL connection is closed")
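The column names come from `cursor.description`, which the Python DB-API defines for every driver. The sketch below shows the same pattern using the standard-library `sqlite3` module as a stand-in for psycopg2, an assumption made so the example runs without a server. It uses `contextlib.closing` so the cursor and connection are closed even if the query fails:

```python
import sqlite3
from contextlib import closing

# closing() calls .close() on exit, whether or not an exception was raised
with closing(sqlite3.connect(":memory:")) as conn:
    with closing(conn.cursor()) as cursor:
        cursor.execute("SELECT 1 AS id, 'Firma A' AS firma")
        # The first element of each description entry is the column name
        colnames = [desc[0] for desc in cursor.description]
        records = [dict(zip(colnames, row)) for row in cursor.fetchall()]

print(records)
```

Passing `records` to `pd.DataFrame` would give the same kind of table that `run_query_with_pandas` prints.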

### Some Queries

In [8]:
# Queries for leads_cleaned

# Query 1: Select the first 10 rows from leads_cleaned
query_1_cleaned = "SELECT * FROM leads_cleaned LIMIT 10;"  # Limiting to 10 for display

# Query 2: Count the number of rows in leads_cleaned
query_2_cleaned = "SELECT COUNT(*) AS total_rows FROM leads_cleaned;"


# Queries for leads_in_review

# Query 1: Select the first 10 rows from leads_in_review
query_1_review = "SELECT * FROM leads_in_review LIMIT 10;"  # Limiting to 10 for display

# Query 2: Count the number of rows in leads_in_review
query_2_review = "SELECT COUNT(*) AS total_rows FROM leads_in_review;"



# Run queries for leads_cleaned
print("Running queries for leads_cleaned table:\n")
run_query_with_pandas(admin_host, admin_port, admin_database, target_user, target_user_password, query_1_cleaned)
run_query_with_pandas(admin_host, admin_port, admin_database, target_user, target_user_password, query_2_cleaned)


# Run queries for leads_in_review
print("\nRunning queries for leads_in_review table:\n")
run_query_with_pandas(admin_host, admin_port, admin_database, target_user, target_user_password, query_1_review)
run_query_with_pandas(admin_host, admin_port, admin_database, target_user, target_user_password, query_2_review)


Running queries for leads_cleaned table:

   id                                              firma  \
0   1                            Abschleppdienst Arnolds   
1   2                                      AAS-Fink GmbH   
2   3                          Allfolia Deutschland GmbH   
3   4                              Autohaus Schmohl GmbH   
4   5           Autohaus Stürmeyer, Inh. Juri Düsterhoft   
5   6       Autohaus Thomas Thies, Inhaber: Thomas Thies   
6   7                              Autohaus Musberg GmbH   
7   8                        Klingler Fahrzeugtechnik AG   
8   9  König Wilhelm Autoteile - Einbrodt & Schubert GbR   
9  10                             Autoverwertung Kabashi   

                  street    plz                     city  \
0  Völlesbruchstrasse 19  52152                Simmerath   
1            Morsbach 39  42857                Remscheid   
2            Morsbach 39  42857                Remscheid   
3     Potsdamer Str. 175  14469                  Potsdam 

To integrate my two datasets (cleaned_data and bad_data) into a PostgreSQL database, I followed a structured approach. First, I made sure both datasets were cleaned and consistently structured. I then added a unique_id column to each dataset so that every row can be identified during insertion, which prevents conflicts and duplication.

The main challenge was that rows were duplicated every time I reran the insertion code. Using an auto-incrementing id as the primary key did not prevent this: a new id was generated for every inserted row, even when the rest of the data was identical, so reruns produced redundant rows. I also ran into errors caused by values that exceeded the column length, so I increased the VARCHAR length for the affected columns (such as firma and street).

To resolve these issues, I dropped the tables and recreated them with longer VARCHAR columns and a unique_id column (created in the Data Cleaning file). The table definition applies a UNIQUE constraint to unique_id, and the insertion queries use ON CONFLICT (unique_id) DO NOTHING, so a row whose unique_id already exists is skipped instead of being inserted again.

Finally, I inserted both datasets (cleaned_data into the leads_cleaned table and bad_data into the leads_in_review table) without duplicates or conflicts. Because of the UNIQUE constraint, the insertion code can be rerun without adding rows that already exist.
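The effect of the UNIQUE constraint combined with ON CONFLICT can be reproduced without a PostgreSQL server. The sketch below uses the standard-library `sqlite3` module as a stand-in, which is an assumption: SQLite accepts the same `ON CONFLICT (...) DO NOTHING` clause from version 3.24, but it is not the database used in this notebook. The table name `leads_demo` and the rows are made up for illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE leads_demo ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, firma TEXT, unique_id TEXT UNIQUE)"
)

rows = [("Firma A", "a-1"), ("Firma B", "b-2")]  # made-up sample rows
insert_sql = (
    "INSERT INTO leads_demo (firma, unique_id) VALUES (?, ?) "
    "ON CONFLICT (unique_id) DO NOTHING"
)

# Run the same insert twice, as happens when the notebook cell is rerun
for _ in range(2):
    conn.executemany(insert_sql, rows)
    conn.commit()

row_count = conn.execute("SELECT COUNT(*) FROM leads_demo").fetchone()[0]
print(row_count)  # 2: the second run is skipped row by row
conn.close()
```

Without the UNIQUE constraint and the ON CONFLICT clause, the second run would add two more rows with new id values, which is the duplication described above.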