<font size=6><b>
    
ACS Data download and transfer CARTO - Postgres

</b></font> 

<font size=3>

What's in this Notebook?

<p>

</p>

- **ACS Demographics data download and transfer**: from CARTO's Data Observatory to a CARTO account, and from there to PostgreSQL with the CARTO Python SDK
<p>

</p>

- **NYC boundaries data download and transfer**: from `data.cityofnewyork.us` through GeoPandas
    
<p>

</p>

- **Join `do_label` information**: take `do_label` from `block_group_label` and add it to the ACS Sociodemographics dataset through SQL

<font size=3>

>**NOTE:** The data in the provided bucket wasn't public, so a different data source with (apparently) the same information has been used.

Datasets used: 

- https://carto.com/spatial-data-catalog/browser/dataset/acs_sociodemogr_2396d534/ 
    
<p>

</p>

- https://carto.com/spatial-data-catalog/browser/geography/cdb_blockgroup_7753dd51/
    
<p>

</p>

- https://data.cityofnewyork.us/api/geospatial/tqmj-j8zm?method=export&format=Shapefile

<font size=3>
    
><b>NOTE:</b> The purpose of this notebook is to show the process followed to load the datasets listed above into the recreated database.

# Data download and transfer process

In [1]:
# Imports
import os
import json
import math
import pandas as pd
import geopandas as gpd
import cartoframes
import psycopg2

from sqlalchemy import create_engine
from carto.auth import APIKeyAuthClient
from carto.sql import SQLClient, CopySQLClient
from cartoframes.auth import Credentials
from cartoframes.utils import decode_geometry
from shapely.geometry.base import BaseGeometry

In [2]:
# Use Context Manager to open the json file containing the CARTO account credentials

with open("carto_creds.json") as config:
    config = json.load(config)
    
username = config.get('username')
api_key = config.get('api_key')
schema = 'mmoncada'
if_exists = 'replace'

table_list = ['block_group_label', 'do_sync_usa_acs_demographics']

In [3]:
# Define database connection parameters

param_dict = {
    "host": "postgres",
    "database": "postgres",
    "user": "postgres",
    "password": "postgres"
}

In [4]:
def download_carto_dataset(username, api_key, table_name): 
    """
    Function to download a CARTO dataset as a CSV file using the COPY TO command through CARTO's SQL API
    
    Returns the name of the downloaded file (file_name, str)
    args:
        username: CARTO account username (str)
        api_key: CARTO API key with access to the dataset (str)
        table_name: target CARTO dataset (str)
    """
    
    file_name = f"{table_name}.csv"
    
    print(f"Downloading dataset {table_name}")

    base_url = f"https://{username}.carto.com"

    copy_client = CopySQLClient(APIKeyAuthClient(base_url, api_key))

    to_query = f"COPY {table_name} TO stdout WITH (FORMAT csv, HEADER true)"

    copy_client.copyto_file_path(to_query, file_name)

    print(f"Dataset {table_name} downloaded")
        
    return file_name
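
The `COPY` statement in `download_carto_dataset` places `table_name` directly into the SQL text. As a minimal sketch, assuming table names might one day come from less trusted input, the helpers below wrap the name in double quotes before building the query. `quote_identifier` and `build_copy_to_query` are hypothetical names for illustration only; they are not part of the notebook or of the CARTO SDK.

```python
def quote_identifier(name: str) -> str:
    """Return name as a double-quoted SQL identifier (hypothetical helper)."""
    if not name or "\x00" in name:
        raise ValueError("identifier must be a non-empty string without NUL bytes")
    # An embedded double quote is escaped by doubling it
    return '"' + name.replace('"', '""') + '"'


def build_copy_to_query(table_name: str) -> str:
    """Build the COPY ... TO stdout statement used for the CSV download."""
    return f"COPY {quote_identifier(table_name)} TO stdout WITH (FORMAT csv, HEADER true)"


print(build_copy_to_query("block_group_label"))
# COPY "block_group_label" TO stdout WITH (FORMAT csv, HEADER true)
```

Quoted identifiers are case-sensitive in PostgreSQL, so this approach only fits when the name is passed exactly as it was created.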

In [5]:
def connect_database(host, database, user, password):
    """
    Function to connect to a PostgreSQL database 
    
    Returns SQLAlchemy and psycopg2 connection objects
    args:
        host: database server corresponding host (str)
        database: database name (str)
        user: database target user (str)
        password: user's corresponding password (str)
    """
    
    # SQLAlchemy URL format: dialect+driver://user:password@host/database
    engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}/{database}")
    
    con = psycopg2.connect(host=host, database=database, user=user, password=password)
    
    return engine, con
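
SQLAlchemy database URLs follow the pattern `dialect+driver://user:password@host:port/database`. The sketch below shows one way to assemble such a URL so that special characters in the credentials do not break it. `build_postgres_url` is a hypothetical helper, and the default port `5432` is an assumption (it is PostgreSQL's usual default, but the notebook does not state a port).

```python
from urllib.parse import quote_plus


def build_postgres_url(host, database, user, password, port=5432):
    """Assemble a SQLAlchemy-style PostgreSQL URL (hypothetical helper)."""
    # Credentials come first, then the host: user:password@host:port/database.
    # quote_plus escapes characters such as '@' or '/' in the credentials.
    return (
        f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


print(build_postgres_url("postgres", "postgres", "postgres", "p@ss/word"))
# postgresql+psycopg2://postgres:p%40ss%2Fword@postgres:5432/postgres
```

The resulting string could then be passed to `create_engine`.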

In [6]:
def check_table_name_length(table_name):
    """
    Function to check if a table name fits within PostgreSQL's 63-byte identifier limit
    
    Returns a table name within this limit, truncating the original name if necessary (table_name, str)
    args:
        table_name: table name to check (str)
    """
    
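    # PostgreSQL accepts identifiers of up to 63 bytes; len() counts characters,
    # which equals bytes for ASCII-only names
    if len(table_name) > 63:
        
        table_name = table_name[:63]
        
        print(f"Table name too long, truncating to {table_name}")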
        
    return table_name
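
PostgreSQL's identifier limit is measured in bytes, while `len()` on a Python string counts characters. For ASCII-only names the two agree, but a name with accented or other multi-byte characters can exceed the limit with fewer characters. A minimal sketch of a byte-aware variant follows; `truncate_identifier` is a hypothetical helper and assumes the database uses UTF-8 encoding.

```python
def truncate_identifier(name: str, max_bytes: int = 63) -> str:
    """Truncate name so its UTF-8 encoding fits in max_bytes (hypothetical helper)."""
    encoded = name.encode("utf-8")
    if len(encoded) <= max_bytes:
        return name
    # Cut at the byte limit, then drop any partial multi-byte character at the end
    return encoded[:max_bytes].decode("utf-8", errors="ignore")


print(len(truncate_identifier("a" * 70)))   # 63
print(len(truncate_identifier("ñ" * 40)))   # 31 characters, 62 bytes
```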

In [7]:
def create_table_postgis(file_name, table_name, schema, engine, con, if_exists='replace'):
    """
    Function that given a CSV file path, creates a table inside the Postgres database 
    with the correct data structure
    
    Depends on function check_table_name_length
    
    This function reads the CSV file, creates a GeoDataFrame, formats the data to upload to PostgreSQL
    and creates a table with the desired data types and column order.
    
    Returns value to indicate if COPY process should happen (proceed_copy, bool) 
    and psycopg2 cursor object (cursor)
    args:
        file_name: path to csv file (str)
        table_name: desired database table name (str)
        schema: target database schema (str)
        engine: SQLAlchemy engine used to create the table (sqlalchemy Engine)
        con: psycopg2 connection used to open the cursor (psycopg2 connection)
        if_exists: defines how to behave if the table already exists {'fail', 'replace'}, default 'replace'
                   - fail: Raise a ValueError
                   - replace: Drop the table before inserting new values
    """
    
    COLLISION_STRATEGIES = ['fail', 'replace']
    
    if if_exists not in COLLISION_STRATEGIES:
        raise ValueError("if_exists was not in available options, please try 'fail' or 'replace'")
    
    proceed_copy = True
    
    cursor = con.cursor()
    
    value = BaseGeometry()
    
    table_name = check_table_name_length(table_name)

    df = pd.read_csv(file_name, nrows=10)
    
    columns_ordered = [col if (col != 'the_geom') else 'geometry' for col in df.columns.values]

    gdf = gpd.GeoDataFrame(df, crs='EPSG:4326', geometry=decode_geometry(df['the_geom']))

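    # Assign the result back instead of using inplace=True on a column selection,
    # which may operate on a copy and leave gdf unchanged
    gdf['geometry'] = gdf['geometry'].fillna(value)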

    gdf.drop('the_geom', axis=1, inplace=True)

    gdf = gdf.reindex(columns=columns_ordered)
    
    try:

        gdf.astype(object).to_postgis(name=table_name, schema=schema, con=engine, if_exists=if_exists)
        
        cursor.execute(f"truncate table {schema}.{table_name};")

        cursor.execute(f"alter table {schema}.{table_name} rename column geometry to the_geom;")

    except Exception as e:
        proceed_copy = False
        print(f"Some error occurred creating table {e}")
    
    return proceed_copy, cursor

In [8]:
def dataset_to_postgis(file_name, table_name, schema, host, database, user, password, if_exists='replace'):
    """
    Function that uploads a dataset (CSV file) to a Postgres database.
    
    Depends on function create_table_postgis
    Once the table is created, it uses the PostgreSQL COPY FROM command to upload the data
    args:
        file_name: path to csv file (str)
        table_name: desired database table name (str)
        schema: target database schema (str)
        host: connection host (str)
        database: target database (str)
        user: connection user (str)
        password: password for user (str)
        if_exists: how to behave if the table already exists {'fail', 'replace'}, default 'replace'
    """
    
    engine, con = connect_database(host=host, database=database, 
                                   user=user, password=password)
    
    proceed_copy, cursor = create_table_postgis(file_name=file_name, table_name=table_name, 
                                                schema=schema, engine=engine, con=con, if_exists=if_exists)
    
    if proceed_copy:
        
        copy_sql = f"""
               COPY {schema}.{table_name} FROM stdin WITH CSV HEADER
               DELIMITER as ','
               """
        print(f"Copying dataset {table_name} to postgres")
    
        with open(file_name, 'r') as f:

            try:
                cursor.copy_expert(sql=copy_sql, file=f)
                con.commit()
                print(f"Dataset {table_name} copied to postgres")
            except (Exception, psycopg2.DatabaseError) as error:
                print(f"Some error occurred copying dataset {error}")
                con.rollback()
            cursor.close()   
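
The commit, rollback and cursor cleanup in `dataset_to_postgis` can also be delegated to psycopg2's context managers. The sketch below is one possible arrangement, not the notebook's method: `copy_csv_to_table` is a hypothetical helper that expects an already open psycopg2 connection and a file object, and it assumes the target table already exists.

```python
def copy_csv_to_table(con, csv_file, schema, table_name):
    """Stream a CSV file object into schema.table_name with COPY (hypothetical helper)."""
    copy_sql = f"COPY {schema}.{table_name} FROM stdin WITH (FORMAT csv, HEADER true)"
    # `with con` commits on success and rolls back if an exception is raised;
    # it does not close the connection.
    # `with con.cursor()` closes the cursor when the block exits.
    with con:
        with con.cursor() as cursor:
            cursor.copy_expert(sql=copy_sql, file=csv_file)
    return copy_sql


# Example usage (assumes a reachable database and an existing CSV file):
# con = psycopg2.connect(**param_dict)
# with open("block_group_label.csv") as f:
#     copy_csv_to_table(con, f, "mmoncada", "block_group_label")
# con.close()
```

With this layout an exception still propagates to the caller, so any error message would be printed by the calling code rather than inside the helper.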

In [9]:
def carto_to_postgis(username, table_name, api_key, schema, host, database, user, password, if_exists='replace'):
    """
    Function that downloads a CARTO dataset and uploads it to a PostgreSQL database.
    
    Depends on download_carto_dataset and dataset_to_postgis.
    args:
        username: CARTO account username (str)
        api_key: CARTO API key with access to the dataset (str)
        table_name: desired database table name (str)
        schema: target database schema (str)
        host: connection host (str)
        database: target database (str)
        user: connection user (str)
        password: password for user (str)
    """
    
    file_name = download_carto_dataset(username=username, api_key=api_key, 
                                       table_name=table_name)
    
    dataset_to_postgis(file_name=file_name, table_name=table_name, schema=schema, host=host, 
                       database=database, user=user, password=password, if_exists=if_exists)
    
    os.remove(file_name)

In [11]:
for table_name in table_list:

    carto_to_postgis(username=username, table_name=table_name, api_key=api_key, 
                     schema=schema, **param_dict, if_exists='replace')

Downloading dataset block_group_label
Dataset block_group_label downloaded
Copying dataset block_group_label to postgres
Dataset block_group_label copied to postgres
Downloading dataset do_sync_usa_acs_demographics
Dataset do_sync_usa_acs_demographics downloaded
Copying dataset do_sync_usa_acs_demographics to postgres
Dataset do_sync_usa_acs_demographics copied to postgres


In [14]:
# Fetch NYC boundaries and transfer the data to PostgreSQL + PostGIS database

engine, _ = connect_database(**param_dict)

nyc_gdf = gpd.read_file(
    'https://data.cityofnewyork.us/api/geospatial/tqmj-j8zm?method=export&format=Shapefile')

nyc_gdf.to_postgis(name='nyc_boundaries', con=engine, schema=schema, if_exists='replace')

# Merge information from ACS tables  

<font size=3>

A new column, `do_label`, has been added to the `ACS Sociodemographics` dataset and filled from the `do_label` column of `block_group_label`, matching rows on the `geoid` column of both datasets.

```sql
ALTER TABLE do_sync_usa_acs_demographics_sociodemographics_usa_blockgroup_2 ADD COLUMN do_label VARCHAR;
```

```sql
UPDATE do_sync_usa_acs_demographics_sociodemographics_usa_blockgroup_2 AS census
SET do_label = labels.do_label
FROM block_group_label AS labels
WHERE census.geoid = labels.geoid;
```
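
To see what the two statements do on a small scale, the sketch below reproduces the same add-column-then-fill pattern with Python's built-in `sqlite3` module. Everything in it is illustrative: the table names `census` and `labels` are shortened stand-ins, the rows are invented, and a correlated subquery replaces `UPDATE ... FROM` so that it runs on older SQLite versions.

```python
import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()

# Shortened, hypothetical table names and invented sample rows
cur.execute("CREATE TABLE census (geoid TEXT, total_pop INTEGER)")
cur.execute("CREATE TABLE labels (geoid TEXT, do_label TEXT)")
cur.executemany(
    "INSERT INTO census VALUES (?, ?)",
    [("360610001001", 1200), ("360610002001", 800)],
)
cur.executemany(
    "INSERT INTO labels VALUES (?, ?)",
    [("360610001001", "Block Group 1")],
)

# Step 1: add the empty column
cur.execute("ALTER TABLE census ADD COLUMN do_label TEXT")

# Step 2: fill it from the labels table wherever geoid matches
cur.execute(
    """
    UPDATE census
    SET do_label = (SELECT labels.do_label FROM labels WHERE labels.geoid = census.geoid)
    """
)
con.commit()

rows = cur.execute("SELECT geoid, do_label FROM census ORDER BY geoid").fetchall()
print(rows)
# [('360610001001', 'Block Group 1'), ('360610002001', None)]
```

Rows without a matching `geoid` keep an empty `do_label`, which is also what happens with the PostgreSQL statements, since the new column starts out as `NULL`.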

<font size=4><b>


Access the next Notebook, [`2 - Data cleaning and data transfer`](2%20-%20Data%20cleaning%20and%20data%20transfer.ipynb)

</b></font>