# Final Project Part I

In this final project, you will work through the Database Design, ETL and Analytics phases using a multi-year Chicago crime data source. This final project is divided into two parts.  Part I will focus on the database design and ETL.  Part II will focus on the analytics.

In this Part I you will conduct the following tasks:

1. Reverse engineer an existing source RDBMS using metadata SQL queries to identify the table and attribute details necessary for creating tables and an entity-relationship diagram depicting the database logical structure. The source data is an SQLite database.
2. Implement a set of tables using DDL in your SSO dsa_student database schema on the postgres server that replicates the source database structure.
3. Create an Entity Relationship Diagram.
4. Establish connections to the source and destination databases.
5. Extract the source data from tables, Transform values as required and Load into the destination tables.
6. Validate the ETL process by confirming row counts in both source and destination tables.


Specific resources and steps are listed below:

## Source SQLite Database

* Dataset URL: **/dsa/data/DSA-7030/cc0122dbv2.sqlite.db**
* Data Dictionary: [pdf](./ChicagoData-Description.pdf)
* [Chicago Crimes 2001-Present Dashboard](https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present-Dashboard/5cd6-ry5g)

This SQLite database consists of a set of normalized relations populated with publicly available Chicago crime data for the years 2001 to 2022.  

## Database exploration

The cells below provide SQL DML statements for examining the underlying metadata in the SQLite database that describes the table, column, and relationship details.  An initial connection and subsequent SQL statements are provided for acquiring the information necessary for reconstructing the table and relational structure in your postgres SSO database.

In [None]:
# Load extension and connect to database
%load_ext sql
%sql sqlite:////dsa/data/DSA-7030/cc0122dbv2.sqlite.db

## Explore the SQLite Tables List

This query simply lists the names of the database tables.

In [None]:
%%sql
SELECT distinct m.type, m.tbl_name --m.sql
FROM sqlite_master AS m,
     pragma_table_info(m.name) AS t
WHERE m.type = 'table'
order by m.name, t.pk DESC

## Explore Column Details

The query below provides the complete list of tables and their columns with important details.

* **tbl_name** = Name of the table
* **name** = column name
* **type** = declared data type
* **notnull** = indicates column declared as NOT NULL
* **pk** = indicates column is the primary key

In [None]:
%%sql 
SELECT m.tbl_name, t.* --m.sql
 FROM pragma_table_info(m.tbl_name) t, sqlite_master m WHERE m.type='table';
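
The column details above map fairly directly onto a `CREATE TABLE` skeleton. The following is a minimal sketch, not part of the assignment, that reads `PRAGMA table_info` with Python's standard `sqlite3` module and drafts the DDL. The helper name `draft_create_table`, the table `demo_codes`, and the schema `my_sso` are hypothetical, and the declared SQLite types are copied as-is, so they would still need to be reviewed and mapped to postgres types by hand.

```python
import sqlite3

def draft_create_table(conn, table, schema):
    """Build a draft CREATE TABLE statement from PRAGMA table_info output."""
    # Each row is (cid, name, type, notnull, dflt_value, pk).
    rows = conn.execute(f'PRAGMA table_info("{table}")').fetchall()
    col_lines = []
    for _cid, name, decl_type, notnull, _default, _pk in rows:
        line = f"    {name} {decl_type}"
        if notnull:
            line += " NOT NULL"
        col_lines.append(line)
    # pk is 0 for non-key columns, otherwise the 1-based position in the key.
    pk_cols = [r[1] for r in sorted(rows, key=lambda r: r[5]) if r[5] > 0]
    if pk_cols:
        col_lines.append(f"    PRIMARY KEY ({', '.join(pk_cols)})")
    return f"CREATE TABLE {schema}.{table} (\n" + ",\n".join(col_lines) + "\n);"

# Demo on a throwaway in-memory table (hypothetical, not the course data).
demo = sqlite3.connect(":memory:")
demo.execute(
    "CREATE TABLE demo_codes (code TEXT NOT NULL, label TEXT, PRIMARY KEY (code))"
)
ddl = draft_create_table(demo, "demo_codes", "my_sso")
print(ddl)
```

Pointing the same helper at the course database would only require opening the SQLite file instead of `:memory:`; foreign keys are not covered by this sketch.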

## The query below provides the list of columns that are declared "unique" for referential integrity enforcement.

<u>Query Output Descriptions</u>
* **name** = the table name, beginning at "cc_" (for example, cc_case_location is a table name).
* **unique** = indicates the column is declared "unique"
* **origin** = indicates how the index was created (`pk` marks a column declared as primary key)
* **name_1** = column name

In [None]:
%%sql 
select il.*,ii.* --,m.sql
    from sqlite_master m, 
    pragma_index_list( m.name ) as il,
    pragma_index_info(il.name) as ii

## Explore Relationship Details (get foreign key references)

The query below extracts the details describing the foreign key references between tables.

* **from_table** = the name of the referencing table (typically the many side) that holds the foreign key
* **from_column** = the name of the foreign key column in the referencing table
* **to_table** = the name of the referenced table (typically the one side)
* **to_column** = the name of the referenced column in the referenced table

These metadata can be translated to the necessary SQL statement to establish a relationship between tables:

```SQL
FOREIGN KEY (<from_column>) REFERENCES <to_table>(<to_column>)
```

In [None]:
%%sql
SELECT 
    m.name as from_table, f.'from' as from_column, f.'table' as to_table, f.'to' as to_column --, m.sql
FROM
    sqlite_master m
    JOIN pragma_foreign_key_list(m.name
                                ) f ON m.name != f."table"
WHERE m.type = 'table'
ORDER BY m.name
;
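
The translation from foreign key metadata to a `FOREIGN KEY` clause can also be scripted. The sketch below is one possible approach using the standard `sqlite3` module. The helper `foreign_key_clauses`, the tables `parent` and `child`, and the schema `my_sso` are hypothetical, and the helper assumes single-column foreign keys (a composite key would appear as several rows sharing one `id`).

```python
import sqlite3

def foreign_key_clauses(conn, table, schema):
    """Turn PRAGMA foreign_key_list rows into FOREIGN KEY clause strings."""
    clauses = []
    # Each row is (id, seq, table, from, to, on_update, on_delete, match).
    for _id, _seq, to_table, from_col, to_col, *_rest in conn.execute(
        f'PRAGMA foreign_key_list("{table}")'
    ):
        # "to" is NULL when the source DDL omitted the referenced column.
        target = f"{schema}.{to_table}" + (f"({to_col})" if to_col else "")
        clauses.append(f"FOREIGN KEY ({from_col}) REFERENCES {target}")
    return clauses

# Demo on throwaway in-memory tables (hypothetical, not the course data).
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE parent (id TEXT PRIMARY KEY)")
demo.execute(
    "CREATE TABLE child ("
    " id TEXT PRIMARY KEY,"
    " parent_id TEXT,"
    " FOREIGN KEY (parent_id) REFERENCES parent(id))"
)
clauses = foreign_key_clauses(demo, "child", "my_sso")
print(clauses)
```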

## Using the metadata from above:

## Implement the required CREATE TABLE statements for establishing the Chicago Crime Database in your SSO dsa_student database.  

The SQL statement takes this form:

```SQL
CREATE TABLE SSO.tbl_name (
 column_name_1 data_type <unique, not null>,
 column_name_N data_type <unique, not null>,
 PRIMARY KEY (<column_name>),
 <FOREIGN KEY (from_column_name) REFERENCES SSO.to_table_name(to_column_name)>
 );
```

**The database tables and column names created in your SSO postgres server dsa_student database should be named exactly as they appear in the ```cc0122dbv2.sqlite.db``` SQLite database.**

Use as many cells as desired.

In [2]:
#sql create table statements

import psycopg2
import getpass

In [3]:
database = input('Type database name and hit enter :: ')
user = input('Type username and hit enter :: ')
password = getpass.getpass('Type Password and hit enter :: ')

Type database name and hit enter :: dsa_student
Type username and hit enter :: twm7kt
Type Password and hit enter :: ········


In [41]:
connection = psycopg2.connect(database = database, 
                              user = user, 
                              host = 'pgsql.dsa.lan',
                              password = password)

In [4]:
with connection, connection.cursor() as cursor:
    cursor.execute(
"""CREATE TABLE twm7kt.cc_case_location (
    case_number varchar(20) UNIQUE,
    block varchar(100),
    location_description varchar(100),
    community_area integer,
    ward integer,
    district integer,
    beat integer, 
    latitude real,
    longitude real,
    PRIMARY KEY (case_number)
 );"""
    )

In [10]:

with connection, connection.cursor() as cursor:
    cursor.execute(
        """CREATE TABLE twm7kt.cc_nibrs_offenses_crimes_aginst (
    nibrs_crime_against varchar(20),
    nibrs_offense_code varchar(10) NOT NULL,
    PRIMARY KEY (nibrs_crime_against, nibrs_offense_code),
    FOREIGN KEY (nibrs_crime_against) REFERENCES twm7kt.cc_nibrs_crimes_against(nibrs_crime_against),
    FOREIGN KEY (nibrs_offense_code) REFERENCES twm7kt.cc_nibrs_fbicode_offenses(nibrs_offense_code)
 );"""
    )

In [44]:
with connection, connection.cursor() as cursor:
    cursor.execute(
"""CREATE TABLE twm7kt.cc_iucr_codes_primary_descriptions (
    iucr_code varchar(10) UNIQUE,
    iucr_primary_desc varchar(100),
    PRIMARY KEY (iucr_code),
    FOREIGN KEY (iucr_code) REFERENCES twm7kt.cc_iucr_codes(iucr_code)
 );"""
    )

In [43]:
with connection, connection.cursor() as cursor:
    cursor.execute(
"""CREATE TABLE twm7kt.cc_iucr_codes_secondary_descriptions (
    iucr_code varchar(10) UNIQUE,
    iucr_secondary_desc varchar(100),
    PRIMARY KEY (iucr_code),
    FOREIGN KEY (iucr_code) REFERENCES twm7kt.cc_iucr_codes(iucr_code)
 );"""
    )

In [9]:
with connection, connection.cursor() as cursor:
    cursor.execute(
"""CREATE TABLE twm7kt.cc_nibrs_crimes_against (
    nibrs_crime_against varchar(20) NOT NULL,
    PRIMARY KEY (nibrs_crime_against)
 );"""
    )

In [6]:
with connection, connection.cursor() as cursor:
    cursor.execute(
"""CREATE TABLE twm7kt.cc_nibrs_fbicode_offenses (
    nibrs_offense_code varchar(10) UNIQUE NOT NULL,
    nibrs_offense_name varchar(100) NOT NULL,
    PRIMARY KEY (nibrs_offense_code)
 );"""
    )

In [7]:
with connection, connection.cursor() as cursor:
    cursor.execute(
"""CREATE TABLE twm7kt.cc_nibrs_categories (
    nibrs_offense_code varchar(10) UNIQUE NOT NULL,
    nibrs_offense_category_name varchar(50),
    PRIMARY KEY (nibrs_offense_code),
    FOREIGN KEY (nibrs_offense_code) REFERENCES twm7kt.cc_nibrs_fbicode_offenses(nibrs_offense_code)
 );"""
    )

In [42]:
with connection, connection.cursor() as cursor:
    cursor.execute(
"""CREATE TABLE twm7kt.cc_iucr_codes (
    iucr_code varchar(10) UNIQUE,
    iucr_index_code char,
    PRIMARY KEY (iucr_code)
 );"""
    )

In [45]:
with connection, connection.cursor() as cursor:
    cursor.execute(

"""CREATE TABLE twm7kt.cc_cases (
    case_number varchar(20) UNIQUE,
    incident_date timestamp,
    iucr_code varchar(10),
    nibrs_fbi_offense_code varchar(10),
    arrest boolean,
    domestic boolean,
    updated_on timestamp,
    PRIMARY KEY (case_number),
    FOREIGN KEY (case_number) REFERENCES twm7kt.cc_case_location(case_number),
    FOREIGN KEY (iucr_code) REFERENCES twm7kt.cc_iucr_codes(iucr_code),
    FOREIGN KEY (nibrs_fbi_offense_code) REFERENCES twm7kt.cc_nibrs_fbicode_offenses(nibrs_offense_code)
 );"""
    )

In [None]:
connection.commit()
cursor.close()      # close the cursor before the connection it belongs to
connection.close()
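
Because a `FOREIGN KEY` can only reference a table that already exists, the `CREATE TABLE` cells have to run with referenced tables first. The sketch below derives one valid creation order from the references declared in the statements above, using `graphlib` from the standard library (Python 3.9 or later is assumed). The dictionary is transcribed by hand from those statements; the same order is also a safe order for loading data.

```python
from graphlib import TopologicalSorter

# Map each table to the tables it references (taken from the FOREIGN KEY
# clauses in the CREATE TABLE statements above).
references = {
    "cc_case_location": set(),
    "cc_iucr_codes": set(),
    "cc_nibrs_crimes_against": set(),
    "cc_nibrs_fbicode_offenses": set(),
    "cc_iucr_codes_primary_descriptions": {"cc_iucr_codes"},
    "cc_iucr_codes_secondary_descriptions": {"cc_iucr_codes"},
    "cc_nibrs_categories": {"cc_nibrs_fbicode_offenses"},
    "cc_nibrs_offenses_crimes_aginst": {
        "cc_nibrs_crimes_against",
        "cc_nibrs_fbicode_offenses",
    },
    "cc_cases": {"cc_case_location", "cc_iucr_codes", "cc_nibrs_fbicode_offenses"},
}

# static_order() yields every table after all of the tables it depends on.
create_order = list(TopologicalSorter(references).static_order())
print(create_order)
```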

# Connect to your SSO database using sqlAlchemy connection and implement your database structure

In [None]:
#implement tables in SSO database


# All tables were created in the section above with psycopg2.

## Construct and embed your Entity Relationship Diagram

Upload your ERD image to the "final_project" folder and update the markdown below to display it here:

![ERD-HERE](FinalERD.png)


# Perform the ETL of the source data to your SSO dsa_student Chicago Crime Database

* Establish a connection to the SQLite source database using sqlAlchemy
* Establish a connection to your SSO dsa_student postgres server destination database using sqlAlchemy
* Perform ETL of the source data tables to the destination data tables incrementally.
  * You may want to consider using pandas as the medium between the two databases 
     * it can easily read sql table data
     * hold data in a data frame
     * make any necessary transformations to data values
     * write to sql table data
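
The pandas-as-medium pattern described in the list above can be sketched as a small helper that reads a table in chunks, applies an optional transformation, and appends each chunk to the destination. This is an illustration under stated assumptions rather than the required solution: `copy_table` and `demo_cases` are hypothetical names, and two in-memory SQLite connections stand in for the real source and destination databases.

```python
import sqlite3
import pandas as pd

def copy_table(src_conn, dst_conn, table, transform=None, chunksize=10000):
    """Copy one table in chunks, optionally transforming each chunk."""
    total = 0
    query = f"SELECT * FROM {table}"
    for chunk in pd.read_sql_query(query, src_conn, chunksize=chunksize):
        if transform is not None:
            chunk = transform(chunk)
        # Append to the existing destination table; skip the DataFrame index.
        chunk.to_sql(table, dst_conn, if_exists="append", index=False)
        total += len(chunk)
    return total

# Stand-in databases: two in-memory SQLite connections (demo assumption).
src = sqlite3.connect(":memory:")
dst = sqlite3.connect(":memory:")
src.execute("CREATE TABLE demo_cases (case_number TEXT, arrest INTEGER)")
src.executemany(
    "INSERT INTO demo_cases VALUES (?, ?)", [("A1", 0), ("A2", 1), ("A3", 0)]
)
src.commit()
dst.execute("CREATE TABLE demo_cases (case_number TEXT, arrest BOOLEAN)")

loaded = copy_table(
    src, dst, "demo_cases",
    transform=lambda df: df.astype({"arrest": bool}),
    chunksize=2,
)
print(loaded)
```

Reading in chunks keeps memory use bounded for the large tables, at the cost of one extra loop.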
    

In [None]:
# ETL Here


%load_ext sql
%sql sqlite:////dsa/data/DSA-7030/cc0122dbv2.sqlite.db

In [3]:
import getpass  # needed for the password prompt below

import pandas as pd
import sqlalchemy

In [4]:
database = 'dsa_student'

In [5]:
username = 'twm7kt'

In [6]:
password = getpass.getpass("Type Password and hit enter:: ")

Type Password and hit enter:: ········


In [7]:
connectionstring = "sqlite:////dsa/data/DSA-7030/cc0122dbv2.sqlite.db"

In [8]:
connectionstring_p = 'postgresql://'+username+':'+password+'@pgsql.dsa.lan/'+database

In [9]:
lite_engine = sqlalchemy.create_engine(connectionstring)
p_engine = sqlalchemy.create_engine(connectionstring_p)
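
A connection string built by plain concatenation can break when the password contains characters such as `@`, `/`, or `:`. One hedged alternative is to percent-encode the credentials with `urllib.parse.quote_plus` before assembling the URL. The helper `postgres_url` and the sample credentials below are hypothetical.

```python
from urllib.parse import quote_plus

def postgres_url(username, password, host, database):
    """Assemble a postgres URL, percent-encoding the credentials."""
    return (
        f"postgresql://{quote_plus(username)}:{quote_plus(password)}"
        f"@{host}/{database}"
    )

# Made-up credentials that contain URL-special characters.
url = postgres_url("my_sso", "p@ss/w:rd", "pgsql.dsa.lan", "dsa_student")
print(url)
```

The resulting string could be passed to `sqlalchemy.create_engine` in place of `connectionstring_p`.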



In [48]:
# cc_iucr_codes_primary_descriptions

with lite_engine.connect() as connection:
    primary = pd.read_sql_query("SELECT * FROM cc_iucr_codes_primary_descriptions", connection)

    
primary.to_sql('cc_iucr_codes_primary_descriptions',  # destination table
               p_engine,            # postgres engine created above
               schema=username,     # schema that holds the table (our pawprint)
               if_exists='append',  # add rows to the existing table rather than replacing it
               index=False,         # do not write the DataFrame row index as a column
               chunksize=50)        # insert 50 rows per batch


In [26]:
# cc_nibrs_crimes_against

with lite_engine.connect() as connection:
    nca = pd.read_sql_query("SELECT * FROM cc_nibrs_crimes_against", connection)

    
nca.to_sql('cc_nibrs_crimes_against',  # destination table
           p_engine,            # postgres engine created above
           schema=username,     # schema that holds the table (our pawprint)
           if_exists='append',  # add rows to the existing table rather than replacing it
           index=False,         # do not write the DataFrame row index as a column
           chunksize=50)        # insert 50 rows per batch


In [28]:
# cc_nibrs_fbicode_offenses

with lite_engine.connect() as connection:
    nfo = pd.read_sql_query("SELECT * FROM cc_nibrs_fbicode_offenses", connection)

    
nfo.to_sql('cc_nibrs_fbicode_offenses',  # destination table
           p_engine,            # postgres engine created above
           schema=username,     # schema that holds the table (our pawprint)
           if_exists='append',  # add rows to the existing table rather than replacing it
           index=False,         # do not write the DataFrame row index as a column
           chunksize=50)        # insert 50 rows per batch


In [47]:
# cc_iucr_codes_secondary_descriptions

with lite_engine.connect() as connection:
    csd = pd.read_sql_query("SELECT * FROM cc_iucr_codes_secondary_descriptions", connection)

    
csd.to_sql('cc_iucr_codes_secondary_descriptions',  # destination table
           p_engine,            # postgres engine created above
           schema=username,     # schema that holds the table (our pawprint)
           if_exists='append',  # add rows to the existing table rather than replacing it
           index=False,         # do not write the DataFrame row index as a column
           chunksize=50)        # insert 50 rows per batch


In [33]:
# cc_nibrs_categories

with lite_engine.connect() as connection:
    nc = pd.read_sql_query("SELECT * FROM cc_nibrs_categories", connection)

    
nc.to_sql('cc_nibrs_categories',  # destination table
          p_engine,            # postgres engine created above
          schema=username,     # schema that holds the table (our pawprint)
          if_exists='append',  # add rows to the existing table rather than replacing it
          index=False,         # do not write the DataFrame row index as a column
          chunksize=50)        # insert 50 rows per batch


In [11]:
# cc_nibrs_offenses_crimes_aginst

with lite_engine.connect() as connection:
    nc = pd.read_sql_query("SELECT * FROM cc_nibrs_offenses_crimes_aginst", connection)

    
nc.to_sql('cc_nibrs_offenses_crimes_aginst',  # destination table
          p_engine,            # postgres engine created above
          schema=username,     # schema that holds the table (our pawprint)
          if_exists='append',  # add rows to the existing table rather than replacing it
          index=False,         # do not write the DataFrame row index as a column
          chunksize=50)        # insert 50 rows per batch

IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "cc_nibrs_offenses_crimes_aginst_pkey"
DETAIL:  Key (nibrs_crime_against, nibrs_offense_code)=(Property, 23*) already exists.

[SQL: INSERT INTO twm7kt.cc_nibrs_offenses_crimes_aginst (nibrs_crime_against, nibrs_offense_code) VALUES (%(nibrs_crime_against)s, %(nibrs_offense_code)s)]
[parameters: ({'nibrs_crime_against': 'Property', 'nibrs_offense_code': '23*'}, {'nibrs_crime_against': 'Not a Crime', 'nibrs_offense_code': '09C'}, {'nibrs_crime_against': 'Property', 'nibrs_offense_code': '26A'}, {'nibrs_crime_against': 'Person', 'nibrs_offense_code': '36B'}, {'nibrs_crime_against': 'Person', 'nibrs_offense_code': '11C'}, {'nibrs_crime_against': 'Property', 'nibrs_offense_code': '290'}, {'nibrs_crime_against': 'Person', 'nibrs_offense_code': '90F'}, {'nibrs_crime_against': 'Property', 'nibrs_offense_code': '23G'}  ... displaying 10 of 64 total bound parameter sets ...  {'nibrs_crime_against': 'Property', 'nibrs_offense_code': '26G'}, {'nibrs_crime_against': 'Society', 'nibrs_offense_code': '720'})]
(Background on this error at: http://sqlalche.me/e/gkpj)
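
The `UniqueViolation` above is consistent with an `if_exists='append'` cell being run a second time after its rows were already loaded, since the very first key in the batch is reported as existing. Assuming that is the cause, one way to make a load cell safe to re-run is to append only when the destination table is still empty. The sketch below uses a hypothetical helper `append_if_empty` and an in-memory SQLite table `demo_pairs` as a stand-in for the postgres destination.

```python
import sqlite3
import pandas as pd

def append_if_empty(df, table, dst_conn, schema=None):
    """Append df to table only when the destination table has no rows yet."""
    qualified = f"{schema}.{table}" if schema else table
    count_sql = f"SELECT COUNT(*) AS n FROM {qualified}"
    existing = int(pd.read_sql_query(count_sql, dst_conn)["n"].iloc[0])
    if existing > 0:
        return 0  # already loaded; skip to avoid duplicate-key errors
    df.to_sql(table, dst_conn, schema=schema, if_exists="append", index=False)
    return len(df)

# Stand-in destination with a composite primary key (demo assumption).
dst = sqlite3.connect(":memory:")
dst.execute("CREATE TABLE demo_pairs (a TEXT, b TEXT, PRIMARY KEY (a, b))")
pairs = pd.DataFrame({"a": ["Property", "Person"], "b": ["23*", "36B"]})

first = append_if_empty(pairs, "demo_pairs", dst)   # loads the rows
second = append_if_empty(pairs, "demo_pairs", dst)  # skipped on the re-run
print(first, second)
```

This guard only distinguishes empty from non-empty tables; a partially loaded table would still need to be cleared before a retry.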

In [46]:
# cc_iucr_codes

with lite_engine.connect() as connection:
    nc = pd.read_sql_query("SELECT * FROM cc_iucr_codes", connection)

    
nc.to_sql('cc_iucr_codes',  # destination table
          p_engine,            # postgres engine created above
          schema=username,     # schema that holds the table (our pawprint)
          if_exists='append',  # add rows to the existing table rather than replacing it
          index=False,         # do not write the DataFrame row index as a column
          chunksize=50)        # insert 50 rows per batch


In [12]:
# cc_case_location

with lite_engine.connect() as connection:
    for cl_chunk in pd.read_sql_query("SELECT * FROM cc_case_location", connection, chunksize = 10000):
        cl_chunk.to_sql('cc_case_location',  # destination table
                        p_engine,            # postgres engine created above
                        schema=username,     # schema that holds the table (our pawprint)
                        if_exists='append',  # add rows to the existing table rather than replacing it
                        index=False,         # do not write the DataFrame row index as a column
                        chunksize=1000)      # insert 1000 rows per batch

In [10]:
with lite_engine.connect() as connection:
    test = pd.read_sql_query("SELECT * FROM cc_cases LIMIT 100;", connection)

test.head()

Unnamed: 0,case_number,incident_date,iucr_code,nibrs_fbi_offense_code,arrest,domestic,updated_on
0,HY411648,09/05/2015 01:30:00 PM,486,08B,0,1,02/10/2018 03:50:01 PM
1,HY411615,09/04/2015 11:30:00 AM,870,06,0,0,02/10/2018 03:50:01 PM
2,JC213529,09/01/2018 12:01:00 AM,810,06,0,1,04/06/2019 04:04:43 PM
3,HY411595,09/05/2015 12:45:00 PM,2023,18,1,0,02/10/2018 03:50:01 PM
4,HY411610,09/05/2015 01:00:00 PM,560,08A,0,1,02/10/2018 03:50:01 PM


In [12]:
print(test.dtypes)
test = test.astype({'arrest': bool, 'domestic': bool})
print(test.dtypes)

case_number               object
incident_date             object
iucr_code                 object
nibrs_fbi_offense_code    object
arrest                     int64
domestic                   int64
updated_on                object
dtype: object
case_number               object
incident_date             object
iucr_code                 object
nibrs_fbi_offense_code    object
arrest                      bool
domestic                    bool
updated_on                object
dtype: object
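
The dtype listing shows that `incident_date` and `updated_on` remain strings after the boolean conversion, so the load presumably relies on postgres to interpret values such as `09/05/2015 01:30:00 PM`. An explicit parse is a possible alternative that removes that dependency. The sketch below assumes every value follows the month/day/year 12-hour format seen in the sample rows; `DATE_FORMAT` and `parse_case_dates` are hypothetical names.

```python
import pandas as pd

DATE_FORMAT = "%m/%d/%Y %I:%M:%S %p"  # e.g. 09/05/2015 01:30:00 PM

def parse_case_dates(df):
    """Return a copy with the two date columns parsed into datetime64."""
    out = df.copy()
    for col in ("incident_date", "updated_on"):
        out[col] = pd.to_datetime(out[col], format=DATE_FORMAT)
    return out

# One row copied from the sample output above.
sample = pd.DataFrame({
    "incident_date": ["09/05/2015 01:30:00 PM"],
    "updated_on": ["02/10/2018 03:50:01 PM"],
})
parsed = parse_case_dates(sample)
print(parsed.dtypes)
```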


In [13]:
# cc_cases

with lite_engine.connect() as connection:
    for c_chunk in pd.read_sql_query("SELECT * FROM cc_cases", connection, chunksize = 10000):
        c_chunk = c_chunk.astype({'arrest': bool, 'domestic': bool})
        c_chunk.to_sql('cc_cases',          # destination table
                       p_engine,            # postgres engine created above
                       schema=username,     # schema that holds the table (our pawprint)
                       if_exists='append',  # add rows to the existing table rather than replacing it
                       index=False,         # do not write the DataFrame row index as a column
                       chunksize=1000)      # insert 1000 rows per batch

# Execute SQL DML commands to confirm the table record counts for the destination database tables are consistent with the source database table record counts

In [None]:
# Confirm counts here

In [49]:
# cc_iucr_codes_primary_descriptions

with lite_engine.connect() as connection:
    p1 = pd.read_sql_query("SELECT COUNT(*) FROM cc_iucr_codes_primary_descriptions", connection)
print('sqlite count: ',p1)

with p_engine.connect() as connection:
    p2 = pd.read_sql_query("SELECT COUNT(*) FROM cc_iucr_codes_primary_descriptions", connection)
    
print('postgres count: ',p2)

sqlite count:     COUNT(*)
0       401
postgres count:     count
0    401


In [50]:
# cc_nibrs_crimes_against

with lite_engine.connect() as connection:
    p1 = pd.read_sql_query("SELECT COUNT(*) FROM cc_nibrs_crimes_against", connection)
print('sqlite count: ',p1)

with p_engine.connect() as connection:
    p2 = pd.read_sql_query("SELECT COUNT(*) FROM cc_nibrs_crimes_against", connection)
    
print('postgres count: ',p2)

sqlite count:     COUNT(*)
0         4
postgres count:     count
0      4


In [51]:
# cc_nibrs_fbicode_offenses

with lite_engine.connect() as connection:
    p1 = pd.read_sql_query("SELECT COUNT(*) FROM cc_nibrs_fbicode_offenses", connection)
print('sqlite count: ',p1)

with p_engine.connect() as connection:
    p2 = pd.read_sql_query("SELECT COUNT(*) FROM cc_nibrs_fbicode_offenses", connection)
    
print('postgres count: ',p2)

sqlite count:     COUNT(*)
0        90
postgres count:     count
0     90


In [52]:
# cc_iucr_codes_secondary_descriptions

with lite_engine.connect() as connection:
    p1 = pd.read_sql_query("SELECT COUNT(*) FROM cc_iucr_codes_secondary_descriptions", connection)
print('sqlite count: ',p1)

with p_engine.connect() as connection:
    p2 = pd.read_sql_query("SELECT COUNT(*) FROM cc_iucr_codes_secondary_descriptions", connection)
    
print('postgres count: ',p2)

sqlite count:     COUNT(*)
0       401
postgres count:     count
0    401


In [53]:
# cc_nibrs_categories

with lite_engine.connect() as connection:
    p1 = pd.read_sql_query("SELECT COUNT(*) FROM cc_nibrs_categories", connection)
print('sqlite count: ',p1)

with p_engine.connect() as connection:
    p2 = pd.read_sql_query("SELECT COUNT(*) FROM cc_nibrs_categories", connection)
    
print('postgres count: ',p2)

sqlite count:     COUNT(*)
0        90
postgres count:     count
0     90


In [54]:
# cc_nibrs_offenses_crimes_aginst

with lite_engine.connect() as connection:
    p1 = pd.read_sql_query("SELECT COUNT(*) FROM cc_nibrs_offenses_crimes_aginst", connection)
print('sqlite count: ',p1)

with p_engine.connect() as connection:
    p2 = pd.read_sql_query("SELECT COUNT(*) FROM cc_nibrs_offenses_crimes_aginst", connection)
    
print('postgres count: ',p2)

sqlite count:     COUNT(*)
0        64
postgres count:     count
0     64


In [56]:
# cc_iucr_codes

with lite_engine.connect() as connection:
    p1 = pd.read_sql_query("SELECT COUNT(*) FROM cc_iucr_codes", connection)
print('sqlite count: ',p1)

with p_engine.connect() as connection:
    p2 = pd.read_sql_query("SELECT COUNT(*) FROM cc_iucr_codes", connection)
    
print('postgres count: ',p2)

sqlite count:     COUNT(*)
0       520
postgres count:     count
0    520


In [13]:
# cc_case_location

with lite_engine.connect() as connection:
    p1 = pd.read_sql_query("SELECT COUNT(*) FROM cc_case_location", connection)
print('sqlite count: ',p1)

with p_engine.connect() as connection:
    p2 = pd.read_sql_query("SELECT COUNT(*) FROM cc_case_location", connection)
    
print('postgres count: ',p2)

sqlite count:     COUNT(*)
0   7676541
postgres count:       count
0  7676541


In [14]:
# cc_cases

with lite_engine.connect() as connection:
    p1 = pd.read_sql_query("SELECT COUNT(*) FROM cc_cases", connection)
print('sqlite count: ',p1)

with p_engine.connect() as connection:
    p2 = pd.read_sql_query("SELECT COUNT(*) FROM cc_cases", connection)

print('postgres count: ',p2)

sqlite count:     COUNT(*)
0   7676541
postgres count:       count
0  7676541
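
The per-table checks above all follow the same pattern, so they could be folded into one loop. The sketch below is a possible consolidation that works with any pair of DB-API connections (for example `sqlite3` and `psycopg2`). The helper `compare_counts` and the table `demo_codes` are hypothetical, and two in-memory SQLite databases stand in for the real source and destination.

```python
import sqlite3

def compare_counts(src_conn, dst_conn, tables, dst_schema=None):
    """Return {table: (source_count, destination_count)}."""
    results = {}
    for table in tables:
        dst_name = f"{dst_schema}.{table}" if dst_schema else table
        src_cur = src_conn.cursor()
        src_cur.execute(f"SELECT COUNT(*) FROM {table}")
        dst_cur = dst_conn.cursor()
        dst_cur.execute(f"SELECT COUNT(*) FROM {dst_name}")
        results[table] = (src_cur.fetchone()[0], dst_cur.fetchone()[0])
    return results

# Stand-in databases with identical contents (demo assumption).
src = sqlite3.connect(":memory:")
dst = sqlite3.connect(":memory:")
for conn in (src, dst):
    conn.execute("CREATE TABLE demo_codes (code TEXT)")
    conn.executemany("INSERT INTO demo_codes VALUES (?)", [("486",), ("870",)])

counts = compare_counts(src, dst, ["demo_codes"])
mismatched = [t for t, (s, d) in counts.items() if s != d]
print(counts, mismatched)
```

An empty `mismatched` list indicates that every listed table has the same row count on both sides.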


## This is the end of Part 1 of the Final Project 
### Part 2 will be deployed in Module 8.

# Save your notebook, then `File > Close and Halt`