In [1]:
import csv
import functools
import os
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import psycopg2
import seaborn as sns
from dotenv import load_dotenv
from psycopg2 import errors
from sklearn.preprocessing import StandardScaler


load_dotenv()

# Database connection settings are read from environment variables
# (load_dotenv() above populates them from a local .env file).
# NOTE(review): load_dotenv() does not override variables that are already set,
# and many shells export USER as the OS login name — confirm the intended
# database user is the value actually picked up here.
HOST, PORT, DATABASE, USER, PASSWORD = (
    os.getenv(env_key)
    for env_key in ('HOST', 'PORT', 'DATABASE', 'USER', 'PASSWORD')
)


###  Section TASK 5.1

##### Setup DB connection



The first step is to set up a connection to the PostgreSQL database using the psycopg2 library.

In [2]:
def connect_to_database():
    """
    Open a psycopg2 connection using the module-level HOST, PORT, DATABASE,
    USER and PASSWORD settings.

    Returns:
    conn_psycopg2: The open connection object when psycopg2.connect succeeds.
                   None when psycopg2.connect raises; the error is printed.
    """
    connection_kwargs = dict(
        host=HOST,
        port=PORT,
        database=DATABASE,
        user=USER,
        password=PASSWORD,
    )

    try:
        conn_psycopg2 = psycopg2.connect(**connection_kwargs)
    except Exception as exc:
        # Report the failure and signal it to the caller with None.
        print('Connection was not successful!')
        print(exc)
        return None

    print('Connection was successful!')
    return conn_psycopg2


In [3]:
def create_cursor(connection):
    """
    Open a new cursor on an existing database connection.

    Parameters:
    connection: A connection object exposing a ``cursor()`` method.

    Returns:
    cursor: Whatever ``connection.cursor()`` returns.
    """
    new_cursor = connection.cursor()
    return new_cursor

def close_cursor(cursor):
    """
    Release a cursor once it is no longer needed.

    Parameters:
    cursor: The cursor object whose ``close()`` method is called.

    Returns:
    None
    """
    release = cursor.close
    release()

def close_connection(connection):
    """
    Shut down a database connection.

    Parameters:
    connection: The connection object whose ``close()`` method is called.

    Returns:
    None
    """
    shutdown = connection.close
    shutdown()


`init_database_connection` is a decorator that opens a connection and a cursor around each call, which simplifies running various queries against the database.

In [4]:
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def init_database_connection(func):
    """
    A decorator that runs ``func`` with a fresh database cursor.

    On every call the wrapper opens a connection with ``connect_to_database``,
    creates a cursor, calls ``func(cursor, *args, **kwargs)`` and finally closes
    the cursor and the connection. If the call's keyword arguments include
    ``commit=True``, the transaction is committed on success and rolled back on
    error. ``commit`` is only detected when passed as a keyword argument.

    Any exception raised while creating the cursor or running ``func`` is logged
    and the wrapper returns None. If no connection could be established,
    nothing is executed and None is returned.

    Parameters:
    func: The function to be decorated. It must take a cursor as its first parameter.

    Returns:
    wrapper: The wrapped function, with func's name and docstring preserved.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info("Establishing database connection...")
        connection = connect_to_database()
        if connection is None:
            # connect_to_database returns None on failure; without a connection
            # there is nothing to execute and nothing to clean up.
            logger.error("No database connection available; skipping %s", func.__name__)
            return None

        should_commit = kwargs.get('commit', False)
        cursor = None
        res = None
        try:
            logger.info("Creating cursor...")
            # Created inside the try so the connection is still closed if this fails.
            cursor = create_cursor(connection)

            logger.info("Executing the decorated function...")
            res = func(cursor, *args, **kwargs)

            if should_commit:
                logger.info("Committing the transaction...")
                connection.commit()
        except Exception as e:
            if should_commit:
                logger.error("An error occurred, rolling back the transaction...")
                connection.rollback()
            logger.error(f"An error occurred: {e}")
            res = None
        finally:
            logger.info("Closing cursor and connection...")
            if cursor is not None:
                close_cursor(cursor)
            close_connection(connection)

        return res

    return wrapper



###  END Section TASK 5.1

### Section TASK 5.2

`make_query` connects to the database through the decorator, executes a query and optionally fetches its results. Any errors raised while executing the query are caught with a `try`/`except` block.

In [5]:
@init_database_connection
def make_query(cursor, query, show_results=False, commit=False):
    """
    Execute a database query using the provided cursor.

    The cursor is supplied by the ``init_database_connection`` decorator, so
    callers invoke ``make_query(query, ...)`` without a cursor. ``commit`` is not
    used in this body; the decorator reads it from the keyword arguments to
    decide whether to commit the transaction.

    Parameters:
    cursor: The database cursor used to execute the query.
    query (str): The SQL query to be executed.
    show_results (bool): If True, fetch and return the query results. Default is False.
    commit (bool): If True, the decorator commits the transaction. Default is False.

    Returns:
    list, str or None: The fetched rows when show_results is True, otherwise
    'Query succeeded'. None when the query fails. If the relation already exists
    (DuplicateTable), returns 'Query succeeded', or None when show_results is
    True because there is no result set to fetch.
    """
    try:
        cursor.execute(query)
    except errors.DuplicateTable as e:
        # Handle the specific error for duplicate table/view creation
        print(e)
        print('The table already exists but since this is a View creation it is allowed')
        # The failed statement produced no result set, so there is nothing to fetch.
        return None if show_results else 'Query succeeded'
    except Exception as e:
        print(e)
        # Return here rather than from a `finally` clause, so a failed query is
        # never reported as 'Query succeeded' and fetchall() never runs on it.
        return None

    if show_results:
        return cursor.fetchall()
    return 'Query succeeded'


`get_column_names` queries `information_schema` for the column names of a table in the `group6_warehouse` schema, in their defined order.

In [6]:

def get_column_names(table_name, schema='group6_warehouse'):
    """
    Return the column names of a table or view, in their defined order.

    Queries ``information_schema.columns`` through ``make_query``.

    Parameters:
    table_name (str): Name of the table or view.
    schema (str): Schema that contains it. Default is 'group6_warehouse'.

    Returns:
    numpy.ndarray: 1-D array of column names ordered by ordinal_position.
    Empty when the lookup query fails (make_query returns None).
    """
    # make_query cannot bind query parameters, so embed the values as SQL string
    # literals, doubling any single quote so a name cannot break out of the literal.
    schema_literal = "'" + str(schema).replace("'", "''") + "'"
    table_literal = "'" + str(table_name).replace("'", "''") + "'"

    q = f'''
    SELECT COLUMN_NAME
    FROM information_schema.columns
    WHERE table_schema = {schema_literal}
    AND table_name = {table_literal}
    ORDER BY ordinal_position
    '''
    rows = make_query(q, show_results=True)
    if rows is None:
        return np.array([], dtype=object)
    # Each row is a 1-tuple (column_name,); flatten into a 1-D array of names.
    return np.array(rows).flatten()
    

`drop_views` drops the existing views so they can be recreated, which is necessary when the notebook is rerun after the data changes.


In [7]:
def drop_views(views=('safe_driving', 'wind', 'precipitation', 'temperature', 'accident_data_17_23')):
    """
    Drop the project's views from the group6_warehouse schema.

    Uses ``DROP VIEW IF EXISTS``, so the function also works on a fresh database
    or after a partial earlier run where some views were never created. Each
    view is dropped by a separate ``make_query`` call with ``commit=True``.

    Parameters:
    views (iterable of str): View names to drop, in order. Defaults to the five
        views this notebook creates: safe_driving, wind, precipitation,
        temperature and accident_data_17_23.

    Returns:
    None
    """
    for view_name in views:
        make_query(
            f'DROP VIEW IF EXISTS group6_warehouse.{view_name}',
            show_results=False,
            commit=True,
        )


In [8]:
# Remove any views left over from a previous run so the CREATE VIEW cells below can recreate them.
drop_views()

INFO: Establishing database connection...
INFO: Creating cursor...
INFO: Executing the decorated function...
INFO: Committing the transaction...
INFO: Closing cursor and connection...
INFO: Establishing database connection...
INFO: Creating cursor...
INFO: Executing the decorated function...
INFO: Committing the transaction...
INFO: Closing cursor and connection...
INFO: Establishing database connection...
INFO: Creating cursor...
INFO: Executing the decorated function...
INFO: Committing the transaction...
INFO: Closing cursor and connection...
INFO: Establishing database connection...
INFO: Creating cursor...
INFO: Executing the decorated function...
INFO: Committing the transaction...
INFO: Closing cursor and connection...
INFO: Establishing database connection...
INFO: Creating cursor...
INFO: Executing the decorated function...
INFO: Committing the transaction...
INFO: Closing cursor and connection...


Connection was successful!
Connection was successful!
Connection was successful!
Connection was successful!
Connection was successful!


The next step is to create views in the team's warehouse schema (`group6_warehouse`) that expose the data lake tables with only the columns required for the project.


In [9]:
# View over the full data_lake.safe_driving table.
safe_driving_view_sql = '''
CREATE VIEW group6_warehouse.safe_driving AS
SELECT *
FROM data_lake.safe_driving

'''
make_query(safe_driving_view_sql, show_results=False, commit=True)

Connection was successful!


'Query succeeded'

In [10]:
# View over the full data_lake.accident_data_17_23 table.
accident_view_sql = '''
CREATE VIEW group6_warehouse.accident_data_17_23 AS
SELECT *
FROM data_lake.accident_data_17_23

'''
make_query(accident_view_sql, show_results=False, commit=True)

Connection was successful!


'Query succeeded'

In [11]:
# Precipitation view: keeps only the DTG and RI_PWS_10 columns.
precipitation_view_sql = '''
CREATE VIEW group6_warehouse.precipitation AS
SELECT DTG,RI_PWS_10
FROM data_lake.precipitation

'''
make_query(precipitation_view_sql, show_results=False, commit=True)

Connection was successful!


'Query succeeded'

In [12]:
# Temperature view: keeps only the DTG and T_DRYB_10 columns.
temperature_view_sql = '''
CREATE VIEW group6_warehouse.temperature AS
SELECT DTG,T_DRYB_10
FROM data_lake.temperature;

'''
make_query(temperature_view_sql, show_results=False, commit=True)

Connection was successful!


'Query succeeded'

In [13]:
# Wind view: keeps only the DTG and FF_SENSOR_10 columns.
wind_view_sql = '''
CREATE VIEW group6_warehouse.wind AS
SELECT DTG,FF_SENSOR_10
FROM data_lake.wind;

'''
make_query(wind_view_sql, show_results=False, commit=True)

Connection was successful!


'Query succeeded'

In [14]:
def load_sql_to_df(table_name, data_dir='./data'):
    """
    Load a table or view from the group6_warehouse schema into a DataFrame and save it as CSV.

    This function performs the following steps:
    1. Retrieves the column names of the table via ``get_column_names``.
    2. Fetches all rows with ``SELECT *`` via ``make_query``.
    3. Builds a pandas DataFrame from the rows.
    4. Writes the DataFrame to ``<data_dir>/<table_name>.csv``, creating
       ``data_dir`` (and any missing parents) first.

    Parameters:
    table_name (str): Name of the table or view in the group6_warehouse schema.
        It is interpolated directly into the SQL, so it must be a trusted identifier.
    data_dir (str or Path): Directory the CSV file is written to. Default is './data'.

    Returns:
    DataFrame: A pandas DataFrame with the fetched data. It is empty when the
    query returns no rows or fails (make_query returns None on failure).
    """
    col_names = get_column_names(table_name)

    fetch_query = f'''
    SELECT * FROM group6_warehouse.{table_name};
    '''
    result = make_query(fetch_query, show_results=True)

    # If the column lookup came back empty, fall back to pandas' default labels
    # instead of failing on a column/row shape mismatch.
    columns = col_names.tolist() or None
    df = pd.DataFrame(columns=columns, data=result)

    # A single mkdir(exist_ok=True) avoids the race in a separate exists()/mkdir()
    # check and also creates missing parent directories.
    output_dir = Path(data_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    df.to_csv(output_dir / f'{table_name}.csv', index=False)

    return df


`load_sql_to_df` loads a SQL view, which exists only on the database server, into a pandas DataFrame and also saves a local copy as a CSV file.

In [15]:
load_sql_to_df(table_name='safe_driving')

Connection was successful!
Connection was successful!


Unnamed: 0,eventid,event_start,event_end,duration_seconds,latitude,longitude,speed_kmh,end_speed_kmh,maxwaarde,category,incident_severity,is_valid,road_segment_id,road_manager_type,road_number,road_name,place_name,municipality_name,road_manager_name
0,50517339,2020-07-20 13:14:16.000,2020-07-20 13:14:22.000,6.0,51.622100,4.734313,41.842945,41.842945,46.670975,SPEED,SP1,True,219207053,G,,Overkroetenlaan,Breda,Breda,Breda
1,50310008,2020-07-20 14:51:39.900,2020-07-20 14:51:42.700,2.8,51.601600,4.801472,49.889664,33.796223,1.055730,HARSH CORNERING,HC1,True,600893097,G,,Nieuwe Kadijk,Breda,Breda,Breda
2,51284900,2020-07-20 20:38:30.000,2020-07-20 20:38:54.000,24.0,51.566757,4.794105,61.740105,64.800100,87.120150,SPEED,SP1,True,600388711,G,,Allerheiligenweg,Breda,Breda,Breda
3,50905531,2020-07-20 12:25:30.000,2020-07-20 12:25:37.000,7.0,51.617340,4.771865,64.373760,64.373760,70.811134,SPEED,SP1,True,600919775,G,,Terheijdenseweg,Breda,Breda,Breda
4,51182014,2020-07-20 18:34:58.700,2020-07-20 18:34:59.900,1.2,51.592342,4.770381,33.336056,22.212038,0.680752,HARSH CORNERING,HC1,True,601074788,G,,Nieuwe Prinsenkade,Breda,Breda,Breda
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
964483,51230580,2020-07-20 09:21:05.600,2020-07-20 09:21:06.400,0.8,51.519180,4.748960,38.916065,22.680038,1.113666,BRAKING,HB1,True,221184013,G,,Rijsbergsebaan,Breda,Breda,Breda
964484,50457415,2020-07-20 11:47:02.000,2020-07-20 11:47:02.700,0.7,51.601640,4.789253,32.186880,11.265408,0.804672,BRAKING,HB1,True,600753178,G,,Nieuwe Kadijk,Breda,Breda,Breda
964485,50828404,2020-07-20 06:55:10.000,2020-07-20 06:55:16.000,6.0,51.580240,4.786616,41.842945,41.842945,45.061630,SPEED,SP1,True,226198063,G,,Generaal Maczekstraat,Breda,Breda,Breda
964486,51212972,2020-07-20 13:49:16.000,2020-07-20 13:49:22.000,6.0,51.592720,4.830570,62.406105,62.107304,64.270905,SPEED,SP1,True,232201079,G,,Tilburgseweg,Breda,Breda,Breda


In [16]:
load_sql_to_df(table_name='precipitation')

Connection was successful!
Connection was successful!


Unnamed: 0,dtg,ri_pws_10
0,2011-12-28 10:10:00,0.0
1,2011-12-28 10:20:00,0.0
2,2011-12-28 10:30:00,0.0
3,2011-12-28 10:40:00,0.0
4,2011-12-28 10:50:00,0.0
...,...,...
1098427,2011-12-28 09:20:00,0.0
1098428,2011-12-28 09:30:00,0.0
1098429,2011-12-28 09:40:00,0.0
1098430,2011-12-28 09:50:00,0.0


In [17]:
load_sql_to_df(table_name='wind')

Connection was successful!
Connection was successful!


Unnamed: 0,dtg,ff_sensor_10
0,2013-04-22 17:50:00,6.66
1,2013-04-22 18:00:00,6.99
2,2013-04-22 18:10:00,5.73
3,2013-04-22 18:20:00,5.44
4,2013-04-22 18:30:00,5.28
...,...,...
1098832,2013-04-22 17:00:00,6.70
1098833,2013-04-22 17:10:00,6.31
1098834,2013-04-22 17:20:00,6.66
1098835,2013-04-22 17:30:00,7.05


In [18]:
load_sql_to_df(table_name='temperature')

Connection was successful!
Connection was successful!


Unnamed: 0,dtg,t_dryb_10
0,2017-01-03 03:20:00,1.6
1,2017-01-03 03:30:00,1.5
2,2017-01-03 03:40:00,1.6
3,2017-01-03 03:50:00,1.8
4,2017-01-03 04:00:00,1.7
...,...,...
954283,2017-01-03 02:30:00,1.3
954284,2017-01-03 02:40:00,1.3
954285,2017-01-03 02:50:00,1.3
954286,2017-01-03 03:00:00,1.4


In [19]:
load_sql_to_df(table_name='accident_data_17_23')

Connection was successful!
Connection was successful!


Unnamed: 0,Year,Accident severity,municipality,town,First Mode of Transport,Second mode of Transport,Area Type,Light condition,Road Location,Road condition,Road surface,Road situation,Speed limit,street,weather,accidents
0,2017,Fatal,Breda,BREDA,Car,Pedestrian,Urban area,Darkness,Intersection,Wet/damp,Brick,Bend,30 km/h,Valkeniersplein,Rain,1
1,2017,Fatal,Breda,BREDA,Lorry,Other,Urban area,Daylight,Intersection,Wet/damp,Brick,Intersection - 4 arms,50 km/h,Markendaalseweg,Dry,1
2,2017,Fatal,Breda,BREDA,Lorry,Other,Urban area,Daylight,Road section,Dry,Asphalt (other),Straight road,50 km/h,Academiesingel,Dry,1
3,2017,Injured,Breda,BAVEL,Car,Lorry,Rural area,Darkness,Road section,Wet/damp,Asphalt (other),Bend,120 km/h,KP ST.ANNABOSCH,Dry,1
4,2017,Injured,Breda,BAVEL,Car,Other,Rural area,Darkness,Road section,Wet/damp,Porous asphalt,Straight road,130 km/h,RYKSWG,Rain,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6897,2023,Fatal,Breda,BREDA,Car,Moped,Urban area,Daylight,Road section,Dry,Asphalt (other),Straight road,50 km/h,Terheijdenseweg,Dry,1
6898,2023,Fatal,Breda,BREDA,Car,Car,Urban area,Darkness,Intersection,Dry,Asphalt (other),Intersection - 4 arms,70 km/h,Rijsbergseweg,Dry,1
6899,2023,Fatal,Breda,BREDA,Other,Other,Rural area,Daylight,Road section,Wet/damp,Porous asphalt,Straight road,100 km/h,RYKSWG,Dry,1
6900,2023,Fatal,Breda,PRINSENBEEK,Car,Car,Rural area,Darkness,Road section,Dry,Porous asphalt,Straight road,130 km/h,RYKSWG,Dry,1


###  END Section TASK 5.2

Each view has been exported to a CSV file in the `./data/` directory.