# Environment Data, Querying and Visualization of DB

Note: This demo/portfolio utilizes notebooks as a way to better demostrate what I'm doing, and what each of my actions is doing. In reality, Notebooks would only be used for data visualization, whereas all Querying, Data Mutation, or Data Cleaning would not utilize this method.


In [44]:
import os
import datetime

import pandas as pd
import panel as pn
import hvplot.pandas
import param

import mysql.connector
import mysql.connector.errorcode as errorcode
from typing import Dict, List, Union
from mysql.connector.types import RowType, RowItemType

# Database configuration
from DB_Constants import USR_CONFIG, DATABASE_NAMES, TABLE_NAMES, ENVIRON_TABLE_LOCATION_MAPPING, TABLE_TEMPLATES

pn.extension(comms='ipywidgets')

In [45]:
# Init and Maintain Connection 
class SqlConnection(param.Parameterized):
    
    def __init__(self, user_config):
        try: 
            self.cnx = mysql.connector.connect(**user_config)
            self.cursor = self.cnx.cursor()
        except mysql.connector.Error as err:
            print(f"Error: {err}")
            self.Error= err
        self.Error = None
        
    def close(self):
        self.cursor.close()
        self.cnx.close()
        
    def execute_query(self, query: str) -> bool:
        """executes a query on MySQL DB. 

        Args:
            query (str): an string literal of the SQL command

        Returns:
            bool: True if query suceeded, False if it failed. 
        """
        try: 
            self.cursor.execute(query)
        except mysql.connector.Error as err:
            self.error = err
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("SqlConnection.execute_query: Access denied. Check your username or password.")
                return False
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("SqlConnection.execute_query: Database does not exist.")
                return False
            else:
                print(f"SqlConnection.execute_query: {err}")
                return False
        return True 
    
    def get_all_query_results(self) -> List[Union[RowType, Dict[str, RowItemType]]]: 
        return self.cursor.fetchall()
    
    def set_db(self, db_name) -> bool:
        return self.execute_query(f"USE {db_name};")

    def get_last_error(self) -> mysql.connector.Error:
        return self.error        

In [46]:
def create_table(db_connection, table_name, table_schema):
    print(f"Creating table {table_name}: ", end='')
    print(table_schema.format(tb_name=table_name))
    db_connection.execute_query(table_schema.format(tb_name=table_name))
    print("OK")


def create_database(db_connection, db_name):
    try:
        print(f"Attempting to create DB named {db_name}")
        db_connection.cursor.execute(f"CREATE DATABASE {db_name} DEFAULT CHARACTER SET 'utf8mb4'")
        print(f"Created DB named {db_name}")
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_DB_CREATE_EXISTS:
            print(f"Database {db_name} already exists.")
        else:
            print(f"Failed creating database: {err}")
            exit(1)


def create_environ_db(db_connection):
    db_create_query = f"SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA;"
    
    try:
        print("Attempting to create new DB in MySQL") 
        db_connection.execute_query(db_create_query)
        results = db_connection.get_all_query_results()
        if DATABASE_NAMES['environ'] not in results:
            create_database(db_connection, db_name=DATABASE_NAMES['environ'])
        print("Attempt to create new DB in MySQL suceeded")
    except mysql.connector.Error as err: 
        print(err)

    db_connection.set_db(DATABASE_NAMES['environ'])

    for t_name, t_schema in TABLE_TEMPLATES.items():
        create_table(db_connection, table_name=t_name, table_schema=t_schema)

    print(f"Database {DATABASE_NAMES['environ']} is ready!")


def verify_environ_db_tables(db_connection):
    # MySQL connection configuration
    try:
        # List all tables
        db_connection.execute_query("SHOW TABLES;")
        table_names = db_connection.get_all_query_results()
        print(f"\nTables in `{DATABASE_NAMES['environ']}`:")
        for name in table_names:
            print(f"Table Name: {name}")

        # Verify structure of the specific table
        print(f"\nStructure of `{TABLE_NAMES[0]}`:")
        db_connection.execute_query(f"DESCRIBE `{TABLE_NAMES[0]}`;")
        rows = db_connection.get_all_query_results
        for row in rows:
            print(row)

    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Access denied. Check your username or password.")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist.")
        else:
            print(err)

In [47]:
def alter_table_with_location_id(db_connection):
    # Alter the table to add a location_id column
    alter_query = f"""
        ALTER TABLE {TABLE_NAMES[0]}
        ADD COLUMN location_id VARCHAR(50) AFTER dew_point_F;
        """

    find_all_column_names_query = f"""
        SELECT column_name FROM information_schema.columns
        WHERE table_schema = '{DATABASE_NAMES['environ']}'
        AND table_name = '{TABLE_NAMES[0]}';
    """
    db_connection.execute_query(find_all_column_names_query)
    col_names = db_connection.get_all_query_results()
    location_col_added = False

    for (name) in col_names:
        if "location_id" in name:
            location_col_added = True

    if location_col_added == False:
        db_connection.execute_query(alter_query)
        print("Table altered successfully to include 'location_id' column.")
    else:
        print("Table already altered to include 'location_id' column.")


def extract_location_id_from_filename(filename):
    # Extract location ID from the filename (assuming it's a numeric fragment)
    for i, location_id in enumerate(ENVIRON_TABLE_LOCATION_MAPPING):
        if location_id in filename:
            return location_id
    return None  # Return None if no match is found


def load_csv_to_mysql(db_connection, directory):
    # Loop through all CSV files in the given directory
    for file_name in os.listdir(directory):
        if file_name.endswith('.csv'):
            file_path = os.path.join(directory, file_name)

            # Read the CSV file into a Pandas DataFrame
            print(f"Reading {file_name}: To add to MySQL DB")
            df = pd.read_csv(file_path)

            # Extract location ID from the filename
            location_id = extract_location_id_from_filename(file_name)
            if location_id is None:
                print(f"Skipping {file_name}: No matching location ID found.")
                continue

            # Insert data into the MySQL table
            for _, row in df.iterrows():
                insert_query = f"""
                    INSERT INTO `{TABLE_NAMES[0]}` (entry_no, entry_datetime, temp_F, rh_percent, dew_point_F, location_id)
                    VALUES (%s, %s, %s, %s, %s, %s)
                    """
                # ['#', 'Date-Time (CDT)', 'Temperature (°F) ', 'RH (%) ', 'Dew Point (°F) '] "%Y-%m-%d %H:%M:%S"
                # Parse and format the date-time field
                formatted_date = datetime.datetime.strptime(
                    row['Date-Time (CDT)'], "%m/%d/%Y %H:%M:%S")

                # Prepare the data tuple
                data = (
                    row['#'],                                       # Order of entry into the orginal .csv file
                    formatted_date.strftime('%Y-%m-%d %H:%M:%S'),   # Ensure correct format for MySQL DATETIME
                    row['Temperature (°F) '],                       # Temperature
                    row['RH (%) '],                                 # Relative Humidity
                    row['Dew Point (°F) '],                         # Dew Point
                    location_id                                     # Location ID
                )
                db_connection.cursor.execute(insert_query, data)

            # Commit the transaction
            db_connection.cnx.commit()
            print(f"Data from {file_name} loaded successfully.")

In [48]:
try:
    sql_connection = SqlConnection(USR_CONFIG)

    create_environ_db(sql_connection)
    #verify_environ_db_tables(sql_connection)
    
    sql_connection.set_db(DATABASE_NAMES['environ'])

    alter_table_with_location_id(db_connection=sql_connection)

    # Specify the directory containing the CSV files
    DIRECTORY_PATH = 'c:\\repos\\CSV_Visualizer\\data'  # Update to your directory
    load_csv_to_mysql(db_connection=sql_connection, directory=DIRECTORY_PATH)
except mysql.connector.Error as err:
    print(f"Error: {err}")
finally:
    sql_connection.close()

Attempting to create new DB in MySQL
Attempting to create DB named EnvironmentalConditions
Database EnvironmentalConditions already exists.
Attempt to create new DB in MySQL suceeded
Creating table EnvironData: CREATE TABLE `EnvironData` (
         `key` INT(10) NOT NULL AUTO_INCREMENT,
         `entry_no` INT(5) NOT NULL,
         `entry_datetime` DATETIME NOT NULL,
         `temp_F` FLOAT(10),
         `rh_percent` FLOAT(10),
         `dew_point_F` FLOAT(10),
         PRIMARY KEY (`key`), UNIQUE KEY (`key`)
        );
SqlConnection.execute_query: 1050 (42S01): Table 'environdata' already exists
OK
Creating table LocationMap: CREATE TABLE `LocationMap` (
         `location_id` int(5) NOT NULL, 
         `location_description` text(100), 
         PRIMARY KEY (`location_id`), 
        );
SqlConnection.execute_query: 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ')' at line 5
OK
Databas

In [49]:

import sqlalchemy
import pymysql
from sqlalchemy import create_engine

import holoviews as hv

con_string = f"mysql+pymysql://{USR_CONFIG['user']}:{USR_CONFIG['password']}@{USR_CONFIG['host']}/{DATABASE_NAMES['environ']}"
sqlalchemy_engine = create_engine(con_string)

sql_connection = SqlConnection(USR_CONFIG)


In [50]:

def query_mysql(db, sqlengine, query, params=None):
    """Execute a SQL query and return the result as a Pandas DataFrame."""
    try:
        db.cursor.execute(f"USE {DATABASE_NAMES['environ']};")
        df = pd.read_sql(query, sqlengine, params=params)
        return df
    except mysql.connector.Error as err:
        print(f"Error: {err}")
        return pd.DataFrame()  # Return an empty DataFrame on error
      

# Interactive widgets
start_date_widget = pn.widgets.DatePicker(
    name="Start Date Picker", value=datetime.datetime(2024, 5, 2))
end_date_widget = pn.widgets.DatePicker(
    name="End Date Picker", value=datetime.datetime(2024, 7, 26))

location_key_list = list(ENVIRON_TABLE_LOCATION_MAPPING.keys())
location_select_widget = pn.widgets.MultiChoice(
    name="Location MultiChoice", value=[location_key_list[0]], options=location_key_list)


def location_select_handler(location_select, query, params):
    if location_select:  # Check if the list is not empty
        placeholders = ", ".join(["%s"] * len(location_select))
        query += f" AND location_id IN ({placeholders})"
        params += tuple(location_select)
    return (location_select, query, params)
        
# Graphs
def time_series_plot(start_date, end_date, location_select):
    """Fetch data for the time-series plot and generate the plot."""
    query = f"""
        SELECT entry_datetime, temp_F, location_id
        FROM {TABLE_NAMES[0]}
        WHERE entry_datetime BETWEEN %s AND %s
    """
    params = (start_date, end_date)
    _, query, params = location_select_handler(location_select, query, params)

    df = query_mysql(sql_connection, sqlalchemy_engine, query, params)
    
    if df.empty:
        return "No data available for the selected filters."
    
    return df.hvplot.line(
        x="entry_datetime", 
        y="temp_F", 
        by="location_id", 
        title="Temperature Over Time", 
        ylabel="Temperature (°F)", 
        xlabel="Date",
        legend="top_right"
    )


def scatter_plot(start_date, end_date, location_select):
    """Fetch data for the scatter plot and generate the plot."""
    query = f"""
        SELECT temp_F, rh_percent, location_id
        FROM {TABLE_NAMES[0]}
        WHERE entry_datetime BETWEEN %s AND %s
    """
    params = (start_date, end_date)
    _, query, params = location_select_handler(location_select, query, params)

    df = query_mysql(sql_connection, sqlalchemy_engine, query, params)
    
    if df.empty:
        return "No data available for the selected filters."
    
    return df.hvplot.scatter(
        x="temp_F", 
        y="rh_percent", 
        color="location_id", 
        title="Temperature vs Relative Humidity",
        xlabel="Temperature (°F)", 
        ylabel="RH (%)", 
        alpha=0.2, 
        legend="top_right"
    )


def box_plot(start_date, end_date, location_select):
    """Fetch data for the box plot and generate the plot."""
    query = f"""
        SELECT temp_F, location_id
        FROM {TABLE_NAMES[0]}
        WHERE entry_datetime BETWEEN %s AND %s
    """
    params = (start_date, end_date)
    _, query, params = location_select_handler(location_select, query, params)

    df = query_mysql(sql_connection, sqlalchemy_engine, query, params)
    
    if df.empty:
        return "No data available for the selected filters."
    
    return df.hvplot.box(
        y="temp_F", 
        by="location_id", 
        color="location_id",
        title="Temperature Distribution by Location", 
        ylabel="Temperature (°F)", 
        xlabel="Location Where Data was Collected", 
        legend="top_right"
    )


main_block = pn.Column(pn.Row(start_date_widget, end_date_widget, location_select_widget),
                              pn.bind(time_series_plot, start_date_widget,
                                      end_date_widget, location_select_widget),
                              pn.bind(scatter_plot, start_date_widget,
                                      end_date_widget, location_select_widget),
                              pn.bind(box_plot, start_date_widget, 
                                      end_date_widget, location_select_widget)
)

# Layout the dashboard
dashboard = pn.Column(
    
    "## Environmental Data Dashboard",
    main_block,
).show()

Launching server at http://localhost:52695


In [51]:
find_all_column_names_query = f"""
    SELECT column_name FROM information_schema.columns
    WHERE table_schema = '{DATABASE_NAMES['environ']}'
    AND table_name = '{TABLE_NAMES[0]}';
"""

cols = query_mysql(sql_connection, sqlalchemy_engine, find_all_column_names_query)
cols.drop(index=0, axis=1, inplace=True)

cols_list = cols.iloc[:,0].tolist()

x_widget = pn.widgets.Select(name='X-Axis', options=cols_list)
y_widget = pn.widgets.Select(name='Y-Axis', options=cols_list)

wig = pn.Column(x_widget, y_widget, location_select_widget)
wig

BokehModel(combine_events=True, render_bundle={'docs_json': {'c6f1d66c-6463-47b7-bb93-79e7a062ebc3': {'version…

In [52]:

# Assuming TABLE_NAMES and DATABASE_NAMES are predefined
def correlation_analysis(x_widget_value, y_widget_value, _locat):
    print("Selected Locations:", _locat)
    print("Type of location input:", type(_locat))
    
    # Validate inputs
    if not x_widget_value or not y_widget_value:
        raise ValueError("Both x_widget_value and y_widget_value must be provided.")
    if not isinstance(_locat, list):
        raise ValueError("`_locat` must be a list of location IDs.")
    
    # Construct query with column names dynamically
    correlation_query = f"""
        SELECT `{x_widget_value}`, `{y_widget_value}` FROM {TABLE_NAMES[0]}
    """
    
    # Add location filtering
    params = []
    if _locat:  # If specific locations are provided
        location_conditions = " OR ".join([f"location_id = %s" for _ in _locat])
        query = f"SELECT `{x_widget_value}`, `{y_widget_value}` FROM {TABLE_NAMES[0]} WHERE {location_conditions}"
        params.extend(_locat)
    else:  # No locations provided, fetch all rows
        print("No locations provided, fetching data for all locations.")
    
    print("Constructed Query:", correlation_query)
    print("Query Parameters:", param)
    
    try:
        # Execute query and load data into a DataFrame
        sql_connection.cursor.execute(f"USE {DATABASE_NAMES['environ']};")
        df = pd.read_sql(correlation_query.format(param), sqlalchemy_engine)
        
        if df.empty:
            return "No data available for the selected filters."
        
        # Create hvplot KDE visualization
        return df.hvplot.scatter(
            x=x_widget_value,
            y=y_widget_value,
            alpha=0.8,
            xlabel=x_widget_value,
            ylabel=y_widget_value,
            title=f"Correlation Analysis: {x_widget_value} vs {y_widget_value}"
        )
    
    except Exception as e:
        print(f"Error executing query: {e}")
        return pd.DataFrame()  # Return an empty DataFrame on error

    
display_pane1 = pn.Column(x_widget, y_widget, location_select_widget, pn.bind(correlation_analysis, x_widget,
                                      y_widget, location_select_widget))
#display_pane = pn.Column(widgets, correlation_analysis)
display_pane1

Selected Locations: ['21999191']
Type of location input: <class 'list'>
Constructed Query: 
        SELECT `entry_no`, `entry_no` FROM EnvironData
    
Query Parameters: <module 'param' from 'c:\\repos\\Data_Analyst_Portfolio\\.venv\\Lib\\site-packages\\param\\__init__.py'>
Error executing query: Dimensions may not reference duplicated DataFrame columns (found duplicate 'entry_no' columns). If you want to plot a column against itself simply declare two dimensions with the same name.

PandasInterface expects tabular data, for more information on supported datatypes see https://holoviews.org/user_guide/Tabular_Datasets.html


BokehModel(combine_events=True, render_bundle={'docs_json': {'345b5629-e166-4f51-ba78-d3f25b8717d2': {'version…

In [None]:
import mysql.connector
import pandas as pd
import matplotlib.pyplot as plt

# Database connection configuration
config = {
    'user': 'your_username',
    'password': 'your_password',
    'host': 'localhost',
    'database': 'EnvironmentDataDB',
}

def fetch_data():
    """Fetch relevant data from the MySQL database."""
    try:
        # Connect to the database
        cnx = mysql.connector.connect(**config)
        
        # SQL query to fetch the data
        query = """
        SELECT entry_datetime, temp_F, dew_point_F, rh_percent, location_id
        FROM environ_data_table
        WHERE rh_percent IS NOT NULL;  -- Ensure RH is available
        """
        
        # Fetch data into a Pandas DataFrame
        df = pd.read_sql(query, cnx)
        
        # Close the connection
        cnx.close()
        
        return df
    except mysql.connector.Error as err:
        print(f"Error: {err}")
        return pd.DataFrame()  # Return empty DataFrame on error

def bucket_and_plot(df):
    """Bucket relative humidity and plot distribution."""
    # Define RH buckets
    bins = [0, 20, 40, 60, 80, 100]
    labels = ["0-20%", "21-40%", "41-60%", "61-80%", "81-100%"]

    # Assign RH values to buckets
    df["rh_bucket"] = pd.cut(df["rh_percent"], bins=bins, labels=labels, right=True)

    # Count occurrences in each bucket
    bucket_counts = df["rh_bucket"].value_counts().sort_index()

    # Plot the distribution
    plt.figure(figsize=(8, 5))
    bucket_counts.plot(kind="bar", color="skyblue")
    plt.title("Relative Humidity Distribution")
    plt.xlabel("Relative Humidity Buckets")
    plt.ylabel("Frequency")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

def bucket_and_plot_by_location(df):
    """Bucket relative humidity and plot distribution by location."""
    # Define RH buckets
    bins = [0, 20, 40, 60, 80, 100]
    labels = ["0-20%", "21-40%", "41-60%", "61-80%", "81-100%"]

    # Assign RH values to buckets
    df["rh_bucket"] = pd.cut(df["rh_percent"], bins=bins, labels=labels, right=True)

    # Group by location and bucket
    grouped = df.groupby(["location_id", "rh_bucket"]).size().unstack(fill_value=0)

    # Plot for each location
    grouped.plot(kind="bar", figsize=(10, 6), stacked=True, colormap="viridis")
    plt.title("Relative Humidity Distribution by Location")
    plt.xlabel("Location ID")
    plt.ylabel("Frequency")
    plt.legend(title="RH Buckets")
    plt.tight_layout()
    plt.show()

if __name__ == "__main__":
    # Fetch data from MySQL
    data = fetch_data()

    if not data.empty:
        # Plot overall distribution
        bucket_and_plot(data)

        # Plot distribution by location
        bucket_and_plot_by_location(data)
    else:
        print("No data available to process.")


