In [None]:
#Create venv for project
python3 -m venv .tsheltervenv  # only once

#Activate venv
source .tsheltervenv/bin/activate

#Freeze requirement
pip3 freeze > requirements.txt  # only once

#Install requirements
pip3 freeze > requirements.txt

In [2]:
import requests
import os
import csv
import re
from datetime import datetime
import hashlib

In [3]:

# ------FETCH DATA FROM API---------




# Toronto Open Data is stored in a CKAN instance. Its APIs are documented here:
# https://docs.ckan.org/en/latest/api/

# Hitting Toronto's open data API
base_url = "https://ckan0.cf.opendata.inter.prod-toronto.ca"

# Datasets are called "packages". Each package can contain many "resources"
# To retrieve the metadata for this package and its resources, use the package name in this page's URL:
url = base_url + "/api/3/action/package_show"
params = {"id": "daily-shelter-overnight-service-occupancy-capacity"}
package = requests.get(url, params=params).json()

# I created a directory to store the CSV files
output_dir = os.getcwd()  # This is to get my current working directory
os.makedirs(output_dir, exist_ok=True)

# Since Data were in different files, I had to create a single directory to store all the files
output_file_path = os.path.join(output_dir, "toronto_shelter_occupancy.csv")

# This to determine count of total data and new data
total_data_count = 0
new_data_count = 0

# Logic to determine if header has been appended
header_appended = False
current_header = None


"""# Function to clean data
def clean_data(row):
    cleaned_row = re.sub(r'"(.*?)"', lambda match: match.group(1).replace(",", ""), row)
    return cleaned_row"""
    
# Function to clean data
def clean_data(row):
    # Remove quotes and any newline characters within the field
    cleaned_row = re.sub(r'"(.*?)"', lambda match: match.group(1).replace(",", ""), row).strip()
    return cleaned_row

# Create a set to store existing idempotent keys
existing_idempotent_keys = set()

# Check if the output file already exists
if os.path.exists(output_file_path):
    # Read existing idempotent keys from the previously processed CSV file
    with open(output_file_path, "r", newline="", encoding="utf-8") as existing_file:
        existing_csv_reader = csv.reader(existing_file)
        first_row = next(existing_csv_reader, None)  # Attempt to read the first row (header)
        if first_row:
            header_appended = True
            current_header = first_row[1:]  # Skip the first column (IDEMPOTENT_KEY)
            for row in existing_csv_reader:
                existing_idempotent_keys.add(row[0])  # Assuming idempotent_key is the first column

# Open the output file for writing, with proper newline handling
with open(output_file_path, "a", newline="", encoding="utf-8") as output_file:
    csv_writer = csv.writer(output_file)

    # To get resource data:
    for idx, resource in enumerate(package["result"]["resources"]):

        # for datastore_active resources:
        if resource["datastore_active"]:
            # To get all records in CSV format:
            url = base_url + "/datastore/dump/" + resource["id"]
            resource_dump_data = requests.get(url).text

            # Split data into rows
            rows = resource_dump_data.split("\n")
            
            for row in rows:
                if row.strip():  # Check if row is not empty
                    if not header_appended:
                        current_header = clean_data(row).split(",")
                        csv_writer.writerow(["IDEMPOTENT_KEY"] + current_header)  # This adds the idempotent_key to the header
                        header_appended = True
                    else:
                        data_fields = clean_data(row).split(",")
                        # This logic gives a uniform OCCUPANCY_DATE to yyyy-mm-dd format
                        date_index = current_header.index("OCCUPANCY_DATE")
                        original_date = data_fields[date_index]
                        formatted_date = None
                        try:
                            formatted_date = datetime.strptime(original_date, "%Y-%m-%d").strftime("%Y-%m-%d")
                        except ValueError:
                            try:
                                formatted_date = datetime.strptime(original_date, "%y-%m-%d").strftime("%Y-%m-%d")
                            except ValueError:
                                pass  # If I have missed any other date formats, so it doesn't throw an error

                        if formatted_date:
                            data_fields[date_index] = formatted_date

                        # Create an idempotent key by combining _id and OCCUPANCY_DATE
                        id_index = current_header.index("_id")
                        id_value = data_fields[id_index]
                        
                        # Exclude header rows from idempotent key creation
                        if id_value != "_id":
                            idempotent_key = f"{id_value}_{formatted_date}"

                            # Hash the idempotent key for uniqueness
                            hashed_key = hashlib.sha3_224(idempotent_key.encode()).hexdigest()[:20]

                            # Check if the hashed_key already exists
                            if hashed_key not in existing_idempotent_keys:
                                csv_writer.writerow([hashed_key] + data_fields)
                                new_data_count += 1

                    total_data_count += 1

print("Data processing complete.")
print("Total data count:", total_data_count)
print("New data count:", new_data_count)


Data processing complete.
Total data count: 198559
New data count: 11451


In [None]:
"""!pip install pandas sqlalchemy openpyxl
!pip install psycopg2-binary
!pip install python-dotenv"""

In [5]:

# ------LOAD FETCHED DATA INTO DATABASE---------



import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, Column, inspect
from sqlalchemy.dialects.postgresql import VARCHAR, TIMESTAMP
from sqlalchemy.sql import text
from urllib.parse import quote_plus
from dotenv import load_dotenv
import os

#Load environment variables
load_dotenv()

#Path to your CSV file
file_path = 'toronto_shelter_occupancy.csv'

#Read the CSV data into a pandas DataFrame
df = pd.read_csv(file_path)

"""#Define a function to convert columns to datetime with the correct timezone
def convert_to_datetime_with_timezone(df, col_name):
    df[col_name] = pd.to_datetime(df[col_name], utc=True, errors='coerce')
    df[col_name] = df[col_name].dt.tz_convert('US/Eastern')

#Convert 'Created At' and 'Updated At' columns to datetime with EST timezone
convert_to_datetime_with_timezone(df, 'Time')
convert_to_datetime_with_timezone(df, 'Updated At')  # Uncomment if column exists
"""
#Simplify and consolidate column name formatting
df.columns = df.columns.str.replace(' ', '_', regex=True).str.replace('[().?]', '', regex=True).str.lower()

#Database connection details
database_username = os.getenv('database_username')
database_password = os.getenv('database_password')
database_name = 'analytics_db'
database_host = 'localhost'
database_port = '5432'
database_schema = 'toronto'

database_connection_string = f'postgresql://{database_username}:{database_password}@{database_host}:{database_port}/{database_name}'
engine = create_engine(database_connection_string)

#Initialize MetaData object
metadata = MetaData()

#Define columns for the table
columns = []
for column_name, dtype in df.dtypes.items():
    if "datetime64[ns]" in str(dtype):
        columns.append(Column(column_name, TIMESTAMP(timezone=True)))
    else:
        columns.append(Column(column_name, VARCHAR))

#Name of the table in PostgreSQL
table_name = 'toronto_shelther_occupancy'

#Create a table with the defined columns within the specified schema
table = Table(table_name, metadata, *columns, schema=database_schema, extend_existing=True)

#Check if table exists
with engine.connect() as connection:
    inspector = inspect(engine)
    table_exists = inspector.has_table(table_name, schema=database_schema)

    if not table_exists:
        # Create table
        metadata.create_all(engine)
        print(f"The table '{table_name}' has been created.")
        
        # Initial insertion of data
        df.to_sql(table_name, engine, if_exists='append', index=False, schema=database_schema)
        print(f"Data inserted into '{table_name}' for the first time.")
    else:
        print(f"The table '{table_name}' already exists. Appending new data while ignoring duplicates.")
        
        # Dynamically retrieve the column names from the table
        columns = inspector.get_columns(table_name, schema=database_schema)
        idempotent_key = [col['name'] for col in columns if col['name'] == 'idempotent_key'][0]
        
        # Create a temporary CSV file
        temp_csv_path = 'temp.csv'
        df.to_csv(temp_csv_path, index=False)
        
        # Read existing ids from the database table
        existing_ids_query = text(f"SELECT {idempotent_key} FROM {database_schema}.{table_name}")
        existing_ids = pd.read_sql(existing_ids_query, engine)[idempotent_key].tolist()
        
        # Filter the temporary CSV file to exclude existing ids
        temp_df = pd.read_csv(temp_csv_path)
        new_data = temp_df[~temp_df[idempotent_key].isin(existing_ids)]
        
        # Insert new data into the database table
        if not new_data.empty:
            new_data.to_sql(table_name, engine, if_exists='append', index=False, schema=database_schema)
            print(f"New data appended to '{table_name}'.")
        else:
            print(f"No new data found.")
            
        # Remove the temporary file after the data is inserted
        if os.path.exists(temp_csv_path):
            os.remove(temp_csv_path)  # Delete the temporary CSV file
            print(f"Temporary file '{temp_csv_path}' has been deleted.")

## Get and print the row counts after loading data
with engine.connect() as connection:
    current_count = connection.execute(text(f"SELECT COUNT(*) FROM {database_schema}.{table_name}")).scalar()
    print(f"The table '{table_name}' now has {current_count} rows after the update.")

The table 'toronto_shelther_occupancy' already exists. Appending new data while ignoring duplicates.
No new data found.
Temporary file 'temp.csv' has been deleted.
The table 'toronto_shelther_occupancy' now has 198554 rows after the update.
