In [2]:

import json
import pathlib
import urllib.parse

import geopandas as gpd
import matplotlib.pyplot as plt
import pandas as pd
import requests
from shapely.geometry import shape

import sqlalchemy as db
import sqlalchemy.dialects.postgresql as pg
import hashlib

from sqlalchemy.orm import declarative_base
from geoalchemy2.types import Geometry
from geoalchemy2.shape import from_shape

from typing import Any

from shapely.wkb import loads as wkb_loads
import geoplot as gplt
from shapely.geometry import Point

import warnings

import os
# data prepare
# Where data files will be read from/written to - this should already exist
DATA_DIR = pathlib.Path("data")

# ZIPCODE_DATA_FILE = DATA_DIR / "zipcodes" / "ZIP_CODE_040114.shp"
ZIPCODE_DATA_FILE = DATA_DIR / "nyc_zipcodes.shp"

ZILLOW_DATA_FILE = DATA_DIR / "zillow_rent_data.csv"

# SECURITY: credentials should come from the environment. The literal fallbacks
# are kept only so existing runs keep working; rotate them and drop the
# fallbacks before sharing or committing this notebook.
NYC_DATA_APP_TOKEN = os.environ.get(
    "NYC_DATA_APP_TOKEN", "EqSebYPKeoZPWmssyC2rvIPN8")
BASE_NYC_DATA_URL = "https://data.cityofnewyork.us/"
NYC_DATA_311 = "erm2-nwe9.geojson"
NYC_DATA_TREES = "5rq2-4hqu.geojson"

DB_NAME = "apartment"
DB_USER = "postgres"
DB_URL = f"postgresql+psycopg2://{DB_USER}@localhost/{DB_NAME}"
DB_SCHEMA_FILE = "schema.sql"
# directory where DB queries for Part 3 will be saved
QUERY_DIR = pathlib.Path("queries")

# HTTP basic-auth pair sent with every download in get_with_cache()
BASIC_USER = os.environ.get(
    "NYC_DATA_BASIC_USER", 'bo8yv64rbrt1cas4iyua598vp')
BASIC_PASS = os.environ.get(
    "NYC_DATA_BASIC_PASS", '5vwh31bomglif6wi66lb1py390txqu57vkgv8319f2kg1hxkuk')


# When FLAG_DEBUG == True, record size will be limited to 100,000
FLAG_DEBUG = False

# Make sure the QUERY_DIRECTORY exists (safe to re-run; creates parents too)
QUERY_DIR.mkdir(parents=True, exist_ok=True)

## Part 1: Data Preprocessing

In [3]:
def avoid_nan(value):
    """Map a missing cell value to None so it can be stored in the database.

    Anything ``pd.isna`` reports as missing becomes ``None``; every other
    value is handed back untouched.
    """
    return None if pd.isna(value) else value


def debug_warp(count: int):
    """Cap a record count while debugging.

    Args:
        count: Number of records reported by the data source.

    Returns:
        ``count`` unchanged when ``FLAG_DEBUG`` is false. When ``FLAG_DEBUG``
        is true, ``count`` capped at 100,000.
    """
    debug_limit = 100000
    if FLAG_DEBUG:
        # Use min() so a source with fewer rows than the cap is not
        # over-reported; the paging loops would otherwise wait for rows
        # that do not exist.
        return min(count, debug_limit)
    return count


def build_query_url(base_url: str, queries: dict = {}, flag_use_token: bool = True):
    """Append a query string to ``base_url``.

    The app token (when ``flag_use_token`` is true) comes first, followed by
    each ``key=value`` pair of ``queries`` in dict order. Values are inserted
    verbatim, without URL-encoding. With nothing to append, ``base_url`` is
    returned as-is.
    """
    pairs = []
    if flag_use_token:
        pairs.append(f"$$app_token={NYC_DATA_APP_TOKEN}")
    pairs.extend(f"{name}={value}" for name, value in queries.items())

    if not pairs:
        return base_url
    return base_url + "?" + "&".join(pairs)


def get_md5(content: str):
    """Return the hexadecimal MD5 digest of ``content``.

    A ``str`` is encoded with the default encoding first; any other value is
    passed to the hash unchanged.
    """
    payload = content.encode() if isinstance(content, str) else content
    return hashlib.md5(payload).hexdigest()


def get_with_cache(url: str, update: bool = False):
    """Download ``url`` into ``DATA_DIR`` unless a cached copy already exists.

    The cache file is named after the MD5 of the URL, so any change to the URL
    (including its query string) results in a separate cache entry.

    Args:
        url: Full URL to fetch.
        update: When true, download even if a cached file is present.

    Returns:
        Path of the cache file. If every attempt fails the path is still
        returned; it then points to the previous copy (if any) or to a file
        that does not exist.
    """
    url_md5 = get_md5(url)
    storage_path = DATA_DIR / url_md5

    need_download = update or (not storage_path.exists())
    print('update or (not storage_path.exists()):', need_download)

    if not need_download:
        return storage_path

    print(f"Downloading {url} ...")

    # session.headers.update({"X-App-Token": NYC_DATA_APP_TOKEN})
    basic = requests.auth.HTTPBasicAuth(BASIC_USER, BASIC_PASS)

    # One initial try plus five retries, matching the original budget.
    max_attempts = 6

    # The context manager closes the session's pooled connections on exit.
    with requests.Session() as session:
        for attempt in range(1, max_attempts + 1):
            print(f'Download try: {attempt}')
            try:
                response = session.get(url, auth=basic)
            except Exception:
                print('Network error, retry')
                continue

            if response:
                storage_path.parent.mkdir(parents=True, exist_ok=True)
                with open(storage_path, "wb") as file_handle:
                    file_handle.write(response.content)
                print(f"Done downloading {url}.")
                break

            # An HTTP error status also consumes an attempt; previously this
            # branch retried forever because the counter was never decremented.
            print(
                f"Download {url} fail, reason:",
                response.status_code,
                "message:",
                response.content.decode(),
            )
        else:
            print(f"Giving up on {url} after {max_attempts} attempts.")

    return storage_path


def download_nyc_geojson_data(url: str, force: bool = False):
    """Download ``url`` to a file under ``DATA_DIR`` named after the URL path.

    Deprecated: the file name is derived from the URL path only, so URLs that
    differ only in their query string map to the same file.

    Args:
        url: URL to download.
        force: When true, download even if the target file already exists.

    Returns:
        Path of the local file, or ``None`` when the download failed.
    """
    parsed_url = urllib.parse.urlparse(url)
    url_path = parsed_url.path.strip("/")

    filename = DATA_DIR / url_path

    if force or not filename.exists():
        print(f"Downloading {url} to {filename}...")

        response = requests.get(url)
        if response:
            # The URL path may contain sub-directories that do not exist yet.
            filename.parent.mkdir(parents=True, exist_ok=True)
            # response.content is bytes, so the file must be opened in binary
            # mode; text mode raised TypeError on write.
            with open(filename, "wb") as file_handle:
                file_handle.write(response.content)
            print(f"Done downloading {url}.")
        else:
            print(f"Download {url} fail.")
            return None
    else:
        print(f"Reading from {filename}...")

    return filename

In [4]:
def load_and_clean_zipcodes(zipcode_datafile):
    """Read the zip code shapefile and reduce it to the columns used later.

    The data is labelled as EPSG:2263, reprojected to EPSG:4326, trimmed to
    the zip code / post-office name / state / county / geometry columns, and
    those columns are renamed to ``zipcode``, ``neighborhood``, ``state``,
    ``county`` and ``geometry``.
    """
    source_columns = ['ZIPCODE', 'PO_NAME', 'STATE', 'COUNTY', 'geometry']
    target_columns = ['zipcode', 'neighborhood', 'state', 'county', 'geometry']
    renames = dict(zip(source_columns, target_columns))

    frame = gpd.read_file(zipcode_datafile)
    frame.crs = 'epsg:2263'
    frame = frame.to_crs('epsg:4326')

    # Everything that is not one of the source columns gets dropped.
    unwanted = [name for name in frame.columns if name not in source_columns]
    trimmed = frame.drop(unwanted, axis=1)

    return trimmed.rename(columns=renames)

In [5]:

def download_and_clean_311_data():
    """Download (with caching) and clean the 311 complaints created since 2022-01-01.

    The row count is fetched first, then the rows are paged through in chunks
    of 150,000 via ``get_with_cache``. The request URLs are the cache keys, so
    the query parameters and their order must not change.

    Returns:
        A frame with ``unique_key``, ``created_date``, ``complaint_type``,
        ``incident_zip`` (missing values replaced by -1) and a point
        ``geometry`` column built from longitude/latitude.
    """
    # https://data.cityofnewyork.us/resource/erm2-nwe9.json
    endpoint = 'https://data.cityofnewyork.us/resource/erm2-nwe9.json'

    params = {
        '$select': 'count(unique_key)',
        # '$where': 'complaint_type LIKE "%25Noise%25" AND created_date >= "2022-01-01T00:00:00"::floating_timestamp',
        '$where': 'created_date >= "2022-01-01T00:00:00"::floating_timestamp',
    }

    query_url = build_query_url(endpoint, params, False)

    print('Will call get_with_cache() ... 1')
    dataset_path = get_with_cache(query_url, update=False)
    print('     Call get_with_cache() ... 1 ... end')

    with open(dataset_path, 'rb') as file_handle:
        count_payload = json.loads(file_handle.read().decode())

    row_count_since_20220101 = debug_warp(
        int(count_payload[0]['count_unique_key']))

    collected_row_count = 0

    # Pages are gathered in a list and concatenated once at the end;
    # concatenating inside the loop re-copies all earlier rows on every page.
    pages = []

    print(f'Report {collected_row_count} / {row_count_since_20220101} ...')

    while collected_row_count < row_count_since_20220101:

        print(
            f'Collect {collected_row_count} / {row_count_since_20220101} ...')

        # NOTE(review): there is no '$order', so page boundaries rely on the
        # API returning rows in a stable order. Adding one would change the
        # URLs and therefore invalidate the existing cache files.
        params = {
            '$select': 'unique_key, created_date, complaint_type, incident_zip, latitude, longitude',
            # '$where': 'complaint_type LIKE "%25Noise%25" AND created_date >= "2022-01-01T00:00:00"::floating_timestamp',
            '$where': 'created_date >= "2022-01-01T00:00:00"::floating_timestamp',
            '$limit': '150000',
            '$offset': collected_row_count
        }

        query_url = build_query_url(endpoint, params, False)

        print('Will call get_with_cache() ... 2')
        dataset_path = get_with_cache(query_url)

        with open(dataset_path, 'rb') as file_handle:
            records = json.loads(file_handle.read().decode())

        if not records:
            # An empty page would never advance the offset, so the loop
            # would otherwise spin forever.
            print('Received an empty page; stopping early.')
            break

        pages.append(pd.DataFrame.from_records(records))

        collected_row_count += len(records)

    if pages:
        result = pd.concat(pages, ignore_index=True)
    else:
        result = pd.DataFrame(columns=[
            'unique_key', 'created_date', 'complaint_type',
            'incident_zip', 'latitude', 'longitude'])

    # Assign instead of fillna(inplace=True) on a column selection, which is
    # not guaranteed to write back to the frame.
    result['incident_zip'] = result['incident_zip'].fillna(value=-1)
    result['geometry'] = gpd.points_from_xy(
        result['longitude'], result['latitude'])
    result = result.drop(['latitude', 'longitude'], axis=1)

    return result

In [6]:
def download_and_clean_tree_data():
    """Download (with caching) and clean the tree dataset.

    The row count is fetched first, then the rows are paged through in chunks
    of 150,000 via ``get_with_cache``. The request URLs are the cache keys, so
    the query parameters and their order must not change.

    Returns:
        A frame with ``tree_id``, ``spc_common``, ``zipcode``, ``status``,
        ``health`` and ``geometry``; each geometry value is the shapely object
        built from the record's ``the_geom`` mapping.
    """
    # https://data.cityofnewyork.us/resource/5rq2-4hqu.json
    endpoint = "https://data.cityofnewyork.us/resource/5rq2-4hqu.json"

    # ["tree_id", "spc_common", "zipcode", "status", "the_geom"]

    params = {
        "$select": "count(tree_id)"
    }

    query_url = build_query_url(endpoint, params, False)

    dataset_path = get_with_cache(query_url, update=False)

    with open(dataset_path, "rb") as file_handle:
        count_payload = json.loads(file_handle.read().decode())

    row_count = debug_warp(int(count_payload[0]["count_tree_id"]))

    collected_row_count = 0

    # Pages are gathered in a list and concatenated once at the end;
    # concatenating inside the loop re-copies all earlier rows on every page.
    pages = []

    while collected_row_count < row_count:

        print(f"Collect {collected_row_count} / {row_count} ...")

        params = {
            "$select": "tree_id, spc_common, zipcode, status, health, the_geom",
            "$limit": "150000",
            "$offset": collected_row_count,
        }

        query_url = build_query_url(endpoint, params, False)

        dataset_path = get_with_cache(query_url)

        with open(dataset_path, "rb") as file_handle:
            records = json.loads(file_handle.read().decode())

        if not records:
            # An empty page would never advance the offset, so the loop
            # would otherwise spin forever.
            print("Received an empty page; stopping early.")
            break

        pages.append(pd.DataFrame.from_records(records))

        collected_row_count += len(records)

    if pages:
        result = pd.concat(pages, ignore_index=True)
    else:
        result = pd.DataFrame(columns=[
            "tree_id", "spc_common", "zipcode", "status", "health", "the_geom"])

    result = result.rename(columns={"the_geom": "geometry"})

    # Convert each GeoJSON-like mapping into a shapely geometry.
    result['geometry'] = result['geometry'].apply(shape)

    return result

In [7]:
def load_and_clean_zillow_data():
    """Load the Zillow rent CSV and reshape it from wide to long form.

    The descriptive region columns are dropped, ``RegionName`` becomes
    ``zipcode``, and every remaining date column is unpivoted so that each
    output row is one (zipcode, state, date, average_price) observation.
    Missing prices are kept as NaN.

    Returns:
        DataFrame with columns ``zipcode``, ``state``, ``date`` and
        ``average_price``, ordered by source row and then by date column.
    """
    df = pd.read_csv(ZILLOW_DATA_FILE)

    df = df.drop(['RegionID', 'SizeRank', 'RegionType', 'StateName',
                 'City', 'Metro', 'CountyName'], axis=1)

    df = df.rename(columns={'RegionName': 'zipcode'})

    # After the drop the first two columns are zipcode and State; everything
    # after them is one column per month.
    date_columns = list(df.columns[2:])

    # melt() replaces the row-by-row Python loop. Keeping the source index
    # and sorting on it with a stable sort restores the original ordering
    # (all dates of row 0, then all dates of row 1, ...).
    long_form = df.melt(
        id_vars=['zipcode', 'State'],
        value_vars=date_columns,
        var_name='date',
        value_name='average_price',
        ignore_index=False,
    )
    long_form = long_form.sort_index(kind='mergesort').reset_index(drop=True)

    return long_form.rename(columns={'State': 'state'})

In [8]:
def load_all_data():
    """Run every loader and return the four cleaned datasets.

    Returns:
        Tuple of (zip codes, 311 complaints, trees, Zillow rents), produced in
        that order.
    """
    zipcodes = load_and_clean_zipcodes(ZIPCODE_DATA_FILE)
    complaints = download_and_clean_311_data()
    trees = download_and_clean_tree_data()
    rents = load_and_clean_zillow_data()

    return (zipcodes, complaints, trees, rents)

In [9]:
# Run the whole Part 1 pipeline once; later cells reuse these four frames.
geodf_zipcode_data, geodf_311_data, geodf_tree_data, df_zillow_data = load_all_data()

Will call get_with_cache() ... 1
update or (not storage_path.exists()): False
     Call get_with_cache() ... 1 ... end
Report 0 / 6199520 ...
Collect 0 / 6199520 ...
Will call get_with_cache() ... 2
update or (not storage_path.exists()): False
Collect 150000 / 6199520 ...
Will call get_with_cache() ... 2
update or (not storage_path.exists()): False
Collect 300000 / 6199520 ...
Will call get_with_cache() ... 2
update or (not storage_path.exists()): False
Collect 450000 / 6199520 ...
Will call get_with_cache() ... 2
update or (not storage_path.exists()): False
Collect 600000 / 6199520 ...
Will call get_with_cache() ... 2
update or (not storage_path.exists()): False
Collect 750000 / 6199520 ...
Will call get_with_cache() ... 2
update or (not storage_path.exists()): False
Collect 900000 / 6199520 ...
Will call get_with_cache() ... 2
update or (not storage_path.exists()): False
Collect 1050000 / 6199520 ...
Will call get_with_cache() ... 2
update or (not storage_path.exists()): False
Collec

In [10]:
# Sanity check of the cleaned zip code frame: column summary, then the first rows.
geodf_zipcode_data.info()
geodf_zipcode_data.head()

<class 'geopandas.geodataframe.GeoDataFrame'>
RangeIndex: 263 entries, 0 to 262
Data columns (total 5 columns):
 #   Column        Non-Null Count  Dtype   
---  ------        --------------  -----   
 0   zipcode       263 non-null    object  
 1   neighborhood  263 non-null    object  
 2   state         263 non-null    object  
 3   county        263 non-null    object  
 4   geometry      263 non-null    geometry
dtypes: geometry(1), object(4)
memory usage: 10.4+ KB


Unnamed: 0,zipcode,neighborhood,state,county,geometry
0,11436,Jamaica,NY,Queens,"POLYGON ((-73.80585 40.68291, -73.80569 40.682..."
1,11213,Brooklyn,NY,Kings,"POLYGON ((-73.93740 40.67973, -73.93487 40.679..."
2,11212,Brooklyn,NY,Kings,"POLYGON ((-73.90294 40.67084, -73.90223 40.668..."
3,11225,Brooklyn,NY,Kings,"POLYGON ((-73.95797 40.67066, -73.95576 40.670..."
4,11218,Brooklyn,NY,Kings,"POLYGON ((-73.97208 40.65060, -73.97192 40.650..."


In [11]:
# Sanity check of the cleaned 311 frame: column summary, then the first rows.
geodf_311_data.info()
geodf_311_data.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6199520 entries, 0 to 6199519
Data columns (total 5 columns):
 #   Column          Dtype   
---  ------          -----   
 0   unique_key      object  
 1   created_date    object  
 2   complaint_type  object  
 3   incident_zip    object  
 4   geometry        geometry
dtypes: geometry(1), object(4)
memory usage: 236.5+ MB


Unnamed: 0,unique_key,created_date,complaint_type,incident_zip,geometry
0,59681385,2023-12-09T12:00:00.000,Derelict Vehicles,11222,POINT (-73.94549 40.71914)
1,59682706,2023-12-09T12:00:00.000,Derelict Vehicles,11412,POINT (-73.75719 40.69898)
2,59683999,2023-12-09T12:00:00.000,Derelict Vehicles,11357,POINT (-73.82518 40.77956)
3,59681790,2023-12-09T02:41:46.000,Graffiti,10032,POINT (-73.94337 40.83670)
4,59684401,2023-12-09T02:06:35.000,Graffiti,11211,POINT (-73.95151 40.71341)


In [12]:
# Sanity check of the cleaned tree frame: column summary (including null counts), then the first rows.
geodf_tree_data.info()
geodf_tree_data.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 683788 entries, 0 to 683787
Data columns (total 6 columns):
 #   Column      Non-Null Count   Dtype 
---  ------      --------------   ----- 
 0   tree_id     683788 non-null  object
 1   spc_common  652169 non-null  object
 2   zipcode     683788 non-null  object
 3   status      683788 non-null  object
 4   health      652172 non-null  object
 5   geometry    683788 non-null  object
dtypes: object(6)
memory usage: 31.3+ MB


Unnamed: 0,tree_id,spc_common,zipcode,status,health,geometry
0,180683,red maple,11375,Alive,Fair,POINT (-73.84421521958048 40.723091773924274)
1,200540,pin oak,11357,Alive,Fair,POINT (-73.81867945834878 40.79411066708779)
2,204026,honeylocust,11211,Alive,Good,POINT (-73.93660770459083 40.717580740099116)
3,204337,honeylocust,11211,Alive,Good,POINT (-73.93445615919741 40.713537494833226)
4,189565,American linden,11215,Alive,Good,POINT (-73.97597938483258 40.66677775537875)


In [13]:
# Sanity check of the long-form rent frame: column summary, then the first rows.
df_zillow_data.info()
df_zillow_data.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 705810 entries, 0 to 705809
Data columns (total 4 columns):
 #   Column         Non-Null Count   Dtype  
---  ------         --------------   -----  
 0   zipcode        705810 non-null  int64  
 1   state          705810 non-null  object 
 2   date           705810 non-null  object 
 3   average_price  250167 non-null  float64
dtypes: float64(1), int64(1), object(2)
memory usage: 21.5+ MB


Unnamed: 0,zipcode,state,date,average_price
0,77494,TX,2015-01-31,1606.206406
1,77494,TX,2015-02-28,1612.779844
2,77494,TX,2015-03-31,1622.201575
3,77494,TX,2015-04-30,1630.392427
4,77494,TX,2015-05-31,1632.4115


## Part 2: Storing Data

Create the database and enable the PostGIS extension from the command line.

In [14]:
os.environ['PGPASSWORD'] = 'postgres'
!createdb -U postgres apartment
!psql -U postgres --dbname apartment -c "CREATE EXTENSION postgis;"

CREATE EXTENSION


### Creating Tables

In [15]:
# Shared SQLAlchemy engine for DB_URL; used by create_all(), the ORM session
# and the to_postgis()/to_sql() calls below.
engine = db.create_engine(DB_URL)

Define the table models using SQLAlchemy.

In [16]:
# Declarative base class that the ORM models in this cell inherit from.
Base = declarative_base()


class Tree(Base):
    """ORM model for the ``tree`` table (one row per surveyed tree)."""

    __tablename__ = "tree"

    # Identifier supplied by the dataset, never generated by the database.
    tree_id = db.Column(pg.INTEGER, autoincrement=False, primary_key=True)
    # Nullable: the loaded data has rows without a species name
    # (652,169 of 683,788 rows are non-null in the frame summary).
    spc_common = db.Column(pg.TEXT, nullable=True)
    zipcode = db.Column(pg.INTEGER, index=True, nullable=False)
    status = db.Column(pg.TEXT, nullable=False)
    # Nullable for the same reason as spc_common (652,172 non-null rows).
    health = db.Column(pg.TEXT, nullable=True)
    # Integer SRID, consistent with the from_shape(..., srid=4326) call used
    # when inserting zip codes.
    geometry = db.Column(Geometry(geometry_type='POINT',
                         srid=4326), index=True, nullable=False)


class Complaint(Base):
    """ORM model for the ``complaint`` table (311 complaint records)."""

    __tablename__ = 'complaint'

    # Identifier supplied by the dataset, never generated by the database.
    unique_key = db.Column(
        pg.INTEGER,
        primary_key=True,
        autoincrement=False,
        index=True,
    )
    created_date = db.Column(
        pg.TIMESTAMP,
        nullable=False,
        index=True,
    )
    complaint_type = db.Column(
        pg.TEXT,
        nullable=False,
        index=True,
    )
    zipcode = db.Column(
        pg.INTEGER,
        nullable=True,
        index=True,
    )
    # Point location of the complaint.
    geometry = db.Column(
        Geometry(geometry_type='POINT', srid='4326'),
        nullable=False,
        index=True,
    )


class Zipcode(Base):
    """ORM model for the ``zipcode`` table (one row per zip code polygon)."""

    __tablename__ = 'zipcode'

    # Surrogate key generated by the database.
    unique_key = db.Column(
        pg.INTEGER,
        primary_key=True,
        autoincrement=True,
    )
    zipcode = db.Column(
        pg.INTEGER,
        index=True,
    )
    neighborhood = db.Column(
        pg.TEXT,
        nullable=False,
    )
    state = db.Column(
        pg.TEXT,
        nullable=False,
    )
    county = db.Column(
        pg.TEXT,
        nullable=False,
    )
    # Boundary polygon; no index is requested for this column.
    geometry = db.Column(
        Geometry(geometry_type='POLYGON', srid='4326'),
        nullable=False,
        index=False,
    )


class Rent(Base):
    """ORM model for the ``rent`` table (average rent per zip code and date)."""

    __tablename__ = 'rent'

    # Surrogate key generated by the database.
    unique_key = db.Column(
        pg.INTEGER,
        primary_key=True,
        autoincrement=True,
    )
    zipcode = db.Column(
        pg.INTEGER,
        nullable=False,
    )
    state = db.Column(
        pg.TEXT,
        nullable=False,
    )
    date = db.Column(
        pg.DATE,
        nullable=False,
    )
    # May be NULL: not every zip code has a price for every date.
    average_price = db.Column(
        pg.FLOAT,
        nullable=True,
    )

Create tables

In [17]:
# Create the tables declared on Base (tree, complaint, zipcode, rent).
# NOTE(review): tree, complaint and rent are later written with
# if_exists='replace', so only zipcode keeps the schema defined by its model.
Base.metadata.create_all(engine)

### Add Data to Database
#### Option 2: SQLAlchemy

Set up the database session

In [18]:
# Session factory bound to the shared engine, plus the single session used by
# the zip code insert cell below.
Session = db.orm.sessionmaker(bind=engine)
session = Session()
# NOTE(review): nothing has been added to the session yet, so this commit
# looks like a no-op.
session.commit()

Insert each DataFrame's data into the database

In [19]:
# Write the tree frame to the 'tree' table.
print('Will insert to table: tree. Rows to insert:', len(geodf_tree_data.index))

# Wrap the plain frame in a GeoDataFrame tagged as EPSG:4326 so it can be
# written with to_postgis().
gis_geodf_tree_data = gpd.GeoDataFrame(geodf_tree_data, crs='epsg:4326')

# if_exists='replace' replaces the table created from the Tree model; only
# 'zipcode' gets an explicit column type here.
gis_geodf_tree_data.to_postgis(
    name='tree', if_exists='replace', con=engine, dtype={'zipcode': pg.INTEGER})

print('Done.')

Will insert to table: tree. Rows to insert: 683788
Done.


In [20]:
# Replace 'incident_zip' with a 'zipcode' column (appended as the last column)
# and keep only rows that have a value for it.
# The guard makes the cell safe to re-run: after the first run 'incident_zip'
# is gone and the frame is left untouched. Building the new frame in a single
# chain also avoids assigning a column on a filtered slice, which triggers
# pandas' SettingWithCopyWarning.
if 'incident_zip' in geodf_311_data.columns:
    geodf_311_data = (
        geodf_311_data[geodf_311_data['incident_zip'].notna()]
        .assign(zipcode=lambda frame: frame['incident_zip'])
        .drop('incident_zip', axis=1)
    )

print('Will insert to table: complaint. Rows to insert:', len(geodf_311_data.index))

Will insert to table: complaint. Rows to insert: 6199520


In [21]:
def check_int(value):
    """Return ``value`` unchanged when ``int(value)`` succeeds, otherwise -1.

    The value is only validated, not converted: a numeric string is returned
    as the same string.
    """
    try:
        int(value)
    except Exception:
        checked = -1
    else:
        checked = value
    return checked

# Replace every zipcode that int() cannot parse with the sentinel -1.
geodf_311_data['zipcode'] = geodf_311_data['zipcode'].apply(check_int)

In [23]:
# Wrap the 311 frame in a GeoDataFrame tagged as EPSG:4326 so it can be
# written with to_postgis().
gis = gpd.GeoDataFrame(geodf_311_data, crs='epsg:4326')

# if_exists='replace' replaces the table created from the Complaint model;
# the dtype mapping sets the column types for the key, timestamp and zip code.
gis.to_postgis(name='complaint', if_exists='replace', con=engine, dtype={
               'unique_key': pg.INTEGER, 'created_date': pg.TIMESTAMP, 'zipcode': pg.INTEGER})

print('Done.')

Done.


In [24]:
# Stage one Zipcode ORM object per cleaned zip code record, then commit them
# all in a single transaction.
# NOTE(review): unique_key is auto-generated, so re-running this cell
# presumably inserts the same zip codes again - confirm before re-running.
for index, row in geodf_zipcode_data.iterrows():
    zipcode = Zipcode(zipcode=row['zipcode'], neighborhood=row['neighborhood'], state=row['state'],
                      county=row['county'], geometry=from_shape(row['geometry'], srid=4326))
    session.add(zipcode)
try:
    session.commit()
except Exception as sql_exception:
    # Undo the whole batch so the session stays usable.
    session.rollback()
    print()
    print(sql_exception, ':')
    # NOTE(review): `row` is whatever the loop left behind (the last record),
    # not necessarily the record that caused the failure.
    print(row)
    print()

In [25]:
# Write the long-form rent frame to the 'rent' table.
print('Will insert to table: rent. Rows to insert:', len(df_zillow_data.index))

# if_exists='replace' replaces the table created from the Rent model.
# NOTE(review): the frame has no 'unique_key' column (its columns are zipcode,
# state, date, average_price), so that dtype entry appears to have no effect.
df_zillow_data.to_sql(name='rent', if_exists='replace', con=engine, dtype={
                      'unique_key': pg.INTEGER, 'zipcode': pg.INTEGER, 'date': pg.DATE, 'average_price': pg.FLOAT})

print('Done.')

Will insert to table: rent. Rows to insert: 705810
Done.


## Part 3: Understanding the Data