In [None]:
"""
__author__ = "Nate Cutler"
__credits__ = ["Data sourced from Bundesamt fur Strahlenschutz"]
__version__ = "1.0"
__maintainer__ = "Nate Cutler"
__email__ = "ncutler211@gmail.com"
__status__ = "Prototype"
"""

In [None]:
# Imports
import pandas as pd
import os
import numpy as np
import matplotlib.pyplot as plt
import psycopg2
from sqlalchemy import create_engine

In [None]:
def read_data_into_dataframe(directory):
    """
    Read every .dat file in a directory into a single Pandas DataFrame.

    Each file is parsed as '|'-delimited, latin1-encoded text; the first line
    of each file is treated as the header row (pandas default). The columns
    are then renamed positionally to the standard names
    "Postal code", "city", "start_date" and "reading".

    Parameters:
        directory (str): The path to the directory containing the .dat files.

    Returns:
        pd.DataFrame: The rows of all .dat files, concatenated in filename
        order with a fresh integer index. If the directory contains no .dat
        files, an empty DataFrame with the standard columns is returned.

    Raises:
        FileNotFoundError: If ``directory`` does not exist (from os.listdir).

    Example:
        directory = "F:/UofA/ISTA_498_Capstone/Data ETL/Germany/Extract/data/Uncompressed Original"
        combined_data = read_data_into_dataframe(directory)
    """
    column_names = ["Postal code", "city", "start_date", "reading"]

    # os.listdir returns entries in arbitrary order; sort so the row order of
    # the combined frame is reproducible across machines and runs.
    dat_files = sorted(
        name for name in os.listdir(directory) if name.endswith(".dat")
    )

    frames = []
    for filename in dat_files:
        file_path = os.path.join(directory, filename)

        # Read the .dat file, specifying delimiter and encoding
        frame = pd.read_csv(file_path, delimiter='|', encoding='latin1')

        # Rename positionally: the i-th existing column gets the i-th standard
        # name; any extra columns keep their original names.
        frame = frame.rename(columns=dict(zip(frame.columns, column_names)))
        frames.append(frame)

    # pd.concat raises "No objects to concatenate" on an empty list, so return
    # an empty frame with the expected schema instead.
    if not frames:
        return pd.DataFrame(columns=column_names)

    return pd.concat(frames, ignore_index=True)


In [None]:
# Location of the uncompressed .dat extracts (a machine-specific Windows path; adjust as needed).
directory = r"F:\UofA\ISTA_498_Capstone\Data ETL\Germany\Extract\data\Uncompressed Original"
# Load every .dat file in that directory into one combined DataFrame.
combined_data = read_data_into_dataframe(directory)

In [None]:
def transformations(combined_data):
    """
    Reshape the raw combined readings into the standard output layout.

    The input frame is not modified; all changes are made on a copy.

    Steps performed:
        * drop the "Postal code" column,
        * parse "start_date" with ``pd.to_datetime``,
        * add "end_date" equal to "start_date" (the data is daily),
        * add constant columns "unit" ('μSv/h') and "cid" ('04'),
        * add placeholder columns "state" and "comment" (empty strings) and
          "lat" / "long" (NaN),
        * reorder the columns.

    Parameters:
        combined_data (pd.DataFrame): Frame with at least the columns
            "Postal code", "city", "start_date" and "reading".

    Returns:
        pd.DataFrame: A new frame with columns, in order: start_date,
        end_date, reading, unit, city, state, cid, lat, long, comment.

    Raises:
        KeyError: If "Postal code" or any other required column is missing.
    """
    # drop() returns a new DataFrame, so every assignment below leaves the
    # caller's frame untouched.
    transformed = combined_data.drop("Postal code", axis=1)

    # Unit column: microsievert per hour
    transformed["unit"] = 'μSv/h'
    # Convert the date column to datetime
    transformed['start_date'] = pd.to_datetime(transformed['start_date'])
    # All data is daily data, so the end date equals the start date
    transformed['end_date'] = transformed['start_date']
    # Constant CID value for this dataset (presumably a country id -- confirm)
    transformed['cid'] = '04'
    # Placeholder columns, initially empty / NaN
    transformed['state'] = ""
    transformed['lat'] = np.nan
    transformed['long'] = np.nan
    transformed['comment'] = ""

    # Reorganize columns to the standard order
    column_order = [
        "start_date", "end_date", "reading", "unit", "city",
        "state", "cid", "lat", "long", "comment",
    ]
    return transformed[column_order]

In [None]:
# Apply the standard column layout; the later cells operate on transformed_data.
transformed_data = transformations(combined_data)

In [None]:
# Display the transformed frame for a quick visual check.
print(transformed_data)

In [None]:
# Check for NaNs: count the missing values in the "reading" column.
nan_check = transformed_data["reading"].isna()  # boolean mask, True where the reading is missing
nan_count = nan_check.sum()  # True counts as 1, so the sum is the number of missing readings
print(nan_count)

In [None]:
# Remove rows with a missing reading. Due to German copyright laws the author
# chose not to fill gaps by averaging values for missing days.
# Note: inplace=True modifies transformed_data itself rather than returning a new frame.
transformed_data.dropna(subset=['reading'], inplace=True)

In [None]:
# Resample the data to a monthly average.
# Only the numeric "reading" column is aggregated: the frame also holds string
# columns (unit, city, state, cid, comment), and calling .mean() on those
# raises a TypeError on pandas >= 2.0.
# NOTE(review): the 'M' alias is deprecated in pandas >= 2.2 in favour of 'ME';
# it is kept here for compatibility with older pandas versions.
resampled_data = transformed_data.resample('M', on='start_date')[['reading']].mean()

# Create the area plot of the monthly mean reading over time
plt.figure(figsize=(10, 6))  # Adjust the figure size as needed
plt.fill_between(resampled_data.index, resampled_data['reading'], alpha=0.5)
plt.xlabel('Date')
plt.ylabel('Daily Average Reading')
plt.title('Daily Average Reading Over Time')
plt.grid(True)
plt.show()

In [None]:
# Resample the data to a monthly average.
# Only the numeric "reading" column is aggregated: the frame also holds string
# columns (unit, city, state, cid, comment), and calling .mean() on those
# raises a TypeError on pandas >= 2.0.
# NOTE(review): the 'M' alias is deprecated in pandas >= 2.2 in favour of 'ME';
# it is kept here for compatibility with older pandas versions.
resampled_data = transformed_data.resample('M', on='start_date')[['reading']].mean()

# Create the line plot of the monthly mean reading over time
plt.figure(figsize=(10, 6))  # Adjust the figure size as needed
plt.plot(resampled_data.index, resampled_data['reading'], marker='o', linestyle='-')  # Modify the linestyle here
plt.xlabel('Date')
plt.ylabel('Daily Average Reading')
plt.title('Daily Average Reading Over Time')
plt.grid(True)
plt.show()


In [None]:
# Approximate dataset size requirement: total in-memory size of the frame.
# The summed memory_usage(deep=True) is divided by 1e9 to report gigabytes.
print(f"Total: {transformed_data.memory_usage(deep=True).sum()/1e+9} Gigabyte(s)")

In [None]:
# Smoke-test the database connection; prints the server version if successful.
# NOTE(review): credentials are hard-coded in plain text here; consider loading
# them from environment variables or a secrets store.

# Pre-bind both names so the cleanup in `finally` is safe even when
# psycopg2.connect() itself fails (otherwise `conn`/`cur` would be undefined).
conn = None
cur = None
try:
    conn = psycopg2.connect(
        host="radiance-db-instance.cdolgqfaeaoj.us-east-2.rds.amazonaws.com",
        database="Radiance_db",
        user="radianceUN",
        password="radianceP",
        port='5432'
    )

    # Create a cursor
    cur = conn.cursor()

    # Execute a test query
    cur.execute("SELECT version();")

    # Fetch and print the result
    db_version = cur.fetchone()
    print("PostgreSQL database version:")
    print(db_version)

except psycopg2.Error as e:
    print("Error connecting to the database:", e)

finally:
    # Close only what was actually opened
    if cur is not None:
        cur.close()
    if conn is not None:
        conn.close()

In [None]:
# Functions for traversing host
# Connection settings used by the helper functions in this cell and passed to create_table().
# NOTE(review): the credentials are stored in plain text in the notebook;
# consider loading them from environment variables or a secrets store.
host="radiance-db-instance.cdolgqfaeaoj.us-east-2.rds.amazonaws.com"
database="Radiance_db"
user="radianceUN"
password="radianceP"
port='5432'

def get_conn_cur():
    """Open a new database connection and a cursor on it.

    Uses the module-level ``host``, ``database``, ``user``, ``password`` and
    ``port`` settings. The caller is responsible for closing both objects.

    Returns:
        tuple: ``(conn, cur)`` -- the psycopg2 connection and a cursor on it.
    """
    # Use the shared `port` setting instead of a second hard-coded '5432',
    # so changing the setting actually takes effect here.
    conn = psycopg2.connect(
        host=host,
        database=database,
        user=user,
        password=password,
        port=port,
    )

    cur = conn.cursor()

    return (conn, cur)

def get_table_names():
    """Return the names of all tables in the ``public`` schema.

    Returns:
        list: The rows returned by ``cursor.fetchall()``; each row holds one
        table name.
    """
    conn, cur = get_conn_cur()

    # query to get table names
    table_name_query = """SELECT table_name FROM information_schema.tables
       WHERE table_schema = 'public' """

    try:
        cur.execute(table_name_query)
        return cur.fetchall()
    finally:
        # Always release the cursor and connection, even if the query fails
        cur.close()
        conn.close()

def get_column_names(table_name):
    """Return the column names of a table.

    Args:
        table_name (str): Name of the table to look up in
            ``information_schema.columns``.

    Returns:
        list: The rows returned by ``cursor.fetchall()``; each row holds one
        column name.
    """
    conn, cur = get_conn_cur()

    # The table name is passed as a bound parameter rather than formatted into
    # the SQL text, so quotes in the value cannot alter the query.
    column_name_query = """SELECT column_name FROM information_schema.columns
       WHERE table_name = %s """

    try:
        cur.execute(column_name_query, (table_name,))
        return cur.fetchall()
    finally:
        # Always release the cursor and connection, even if the query fails
        cur.close()
        conn.close()

def run_query(query_string):
    """Execute a SQL query and return its column names and rows.

    The query string is executed as given, so only pass trusted SQL.
    Intended for statements that return rows, since the result is read with
    ``fetchall()`` and ``cursor.description``.

    Args:
        query_string (str): The SQL statement to execute.

    Returns:
        tuple: ``(colnames, my_data)`` where ``colnames`` is the list of
        result column names and ``my_data`` is the list of fetched rows.
    """
    conn, cur = get_conn_cur()

    try:
        cur.execute(query_string)

        my_data = cur.fetchall()

        # The 0th element of each item in cur.description is the column name
        colnames = [desc[0] for desc in cur.description]
    finally:
        # Always release the cursor and connection, even if the query fails
        cur.close()
        conn.close()

    return (colnames, my_data)

In [None]:
def create_table(host, database, user, password, table_name, table_definition):
    """
    Create a new table in a PostgreSQL database.

    Errors are reported by printing them; they are not re-raised.

    Args:
        host (str): The host or IP address of the database server.
        database (str): The name of the database.
        user (str): The database user.
        password (str): The user's password.
        table_name (str): The name of the new table to be created.
        table_definition (str): The table's column definitions and constraints,
            including the surrounding parentheses.

    Note:
        ``table_name`` and ``table_definition`` are interpolated directly into
        the SQL statement, so only pass trusted values.
    """
    # Pre-bind so the cleanup in `finally` is safe even if connect() fails.
    conn = None
    cur = None

    try:
        # Connect to the database
        conn = psycopg2.connect(
            host=host,
            database=database,
            user=user,
            password=password
        )

        # Create a cursor
        cur = conn.cursor()

        # Build the statement from the function's own argument (not from a
        # same-named notebook global).
        create_table_query = f"CREATE TABLE {table_name} {table_definition}"

        # Execute the SQL command
        cur.execute(create_table_query)

        # Commit the changes
        conn.commit()

        print(f"Table {table_name} created successfully.")

    except Exception as error:
        # Deliberately broad: this helper reports failures instead of raising.
        print(f"Error: {error}")

    finally:
        # Close only what was actually opened
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

In [None]:
# Creates Germany table
table_name = "germany"
# lat/long are DOUBLE PRECISION rather than INT: coordinates are fractional
# and the DataFrame stores these columns as floats (NaN placeholders).
table_def = "(start_date TIMESTAMP,end_date TIMESTAMP, reading FLOAT, unit VARCHAR(12), city VARCHAR(255), state VARCHAR(30), cid VARCHAR(3), lat DOUBLE PRECISION, long DOUBLE PRECISION, comment VARCHAR(255))"
create_table(host, database, user, password, table_name, table_def)

In [None]:
def drop_table(table_name):
    """Drop a table from the database if it exists.

    Args:
        table_name (str): Name of the table to drop. It is interpolated
            directly into the SQL statement, so only pass trusted names.
    """
    # Local import: `text` is needed to execute raw SQL strings on
    # SQLAlchemy 2.x, and is also available on 1.x.
    from sqlalchemy import text

    # Define your database URI
    # NOTE(review): credentials are embedded in plain text in this URI.
    database_uri = "postgresql://radianceUN:radianceP@radiance-db-instance.cdolgqfaeaoj.us-east-2.rds.amazonaws.com:5432/Radiance_db"
    engine = create_engine(database_uri)

    try:
        # engine.begin() commits on success, rolls back on error and always
        # closes the connection.
        with engine.begin() as connection:
            connection.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
    finally:
        # Release the pooled connections held by this throwaway engine
        engine.dispose()


In [None]:
# Drop the "germany" table, then list the tables in the public schema to check the result.
g = 'germany'
drop_table(g)
get_table_names()

In [None]:
# List the tables currently in the public schema.
get_table_names()

In [None]:
def sql_head(table_name):
    """Return the first five rows of a table as a DataFrame.

    Args:
        table_name (str): Name of the table to preview. It is interpolated
            directly into the SQL statement, so only pass trusted names.

    Returns:
        pd.DataFrame: Up to five rows from the table.
    """
    conn, cur = get_conn_cur()
    try:
        head_df = pd.read_sql(""" SELECT * FROM %s
          LIMIT 5;""" %table_name, conn)
    finally:
        # Always release the cursor and connection opened by get_conn_cur()
        cur.close()
        conn.close()
    return (head_df)

In [None]:
# pandas DF to SQL table & Upload
# NOTE(review): credentials are embedded in plain text in this URI.
database_uri = "postgresql://radianceUN:radianceP@radiance-db-instance.cdolgqfaeaoj.us-east-2.rds.amazonaws.com:5432/Radiance_db"
engine = create_engine(database_uri)
tn = "germany"  # target table name

transformed_data_subset = transformed_data.head(10)  # Limit to the first 10 rows
# if_exists='replace' drops any existing "germany" table and recreates it from
# the DataFrame; index=False keeps the DataFrame index out of the table.
transformed_data_subset.to_sql(tn, engine, if_exists='replace', index=False)

