# Course project for dataquest.io course "PostgreSQL For Data Engineering"

We are building a PostgreSQL database for the Boston crimes dataset.
Project schema: https://dq-content.s3.amazonaws.com/250/goal.png

We are using Python with psycopg2 module for the SQL connection

Database name: crime_db

Table: boston_crimes

Schema: crimes

User groups: readonly, readwrite

Users: data_analyst (readonly), data_scientist (readwrite)

In [None]:
# Creating the new database

import psycopg2
# Connecting to an existing db, because a new database can only be created
# from within a session on some other database
conn = psycopg2.connect(dbname="dq", user="dq")
try:
    # Setting autocommit to True as this is required for creating databases
    conn.autocommit = True
    cur = conn.cursor()

    cur.execute("CREATE DATABASE crime_db;")
finally:
    # Always release the connection, even when CREATE DATABASE fails
    # (for example when this cell is re-run and crime_db already exists)
    conn.close()

In [1]:
# Connecting to new crime_db
import psycopg2
# Opening the session on crime_db; the cells below reuse `conn` and `cur`
conn = psycopg2.connect(dbname="crime_db", user="dq")
cur = conn.cursor()

In [None]:
# Creating the crimes schema that will hold the boston_crimes table
cur.execute("CREATE SCHEMA crimes;")

In [2]:
# Opening and reading the dataset file

import csv
# newline='' leaves line-ending handling to the csv module, which it needs
# to parse quoted fields that contain line breaks correctly
with open('boston.csv', newline='') as file:
    reader = csv.reader(file)
    # First record: the column names
    col_headers = next(reader)
    # Second record: the first data row, kept as a sample of the value formats
    first_row = next(reader)

In [3]:
# Creating a function get_col_set to fetch a column and its unique values

def get_col_set(csv_file, col_index):
    """
    Collect the distinct values found in one column of a CSV file.

    The first record of the file is treated as the header and is not
    included in the result.

    Parameters:
        csv_file: string, csv filename
        col_index: int, index of the column wanted
    Return values:
        set, contains all distinct values from the column (as strings);
        empty when the file has no data rows
    Raises:
        IndexError, if a data row has no field at col_index
    """
    import csv
    # newline='' leaves line-ending handling to the csv module, which it
    # needs to parse quoted fields that contain line breaks.
    with open(csv_file, 'r', newline='') as f:
        reader = csv.reader(f)
        # Skip the header through the reader (not the raw file) so a header
        # spanning several physical lines is consumed as one record; the
        # default keeps an empty file from raising StopIteration.
        next(reader, None)
        return {row[col_index] for row in reader}

# Reporting every column name next to its number of distinct values
for position, header in enumerate(col_headers):
    distinct_values = get_col_set("boston.csv", position)
    print(header, len(distinct_values), sep='\t')

incident_number	298329
offense_code	219
description	239
date	1177
day_of_the_week	7
lat	18177
long	18177


In [4]:
# Printing the header row so the 0-based position of the 'description' column can be read off
print(col_headers)

['incident_number', 'offense_code', 'description', 'date', 'day_of_the_week', 'lat', 'long']


In [5]:
# The 'description' column sits at index 2 of the header row
descriptions = get_col_set("boston.csv", 2)

# Length of the longest description value (0 when there are none)
max_len = max(map(len, descriptions), default=0)
print(max_len)

58


In [6]:
# Showing the header row next to the first data row so that each column's
# value format can be inspected when choosing its datatype
print(col_headers)
print(first_row)

['incident_number', 'offense_code', 'description', 'date', 'day_of_the_week', 'lat', 'long']
['1', '619', 'LARCENY ALL OTHERS', '2018-09-02', 'Sunday', '42.35779134', '-71.13937053']


From above we can define appropriate Postgres datatypes for our table:     
     
incident_number INTEGER

offense_code INTEGER

description VARCHAR(100) (the longest description found above is 58 characters, so 100 leaves headroom)

date DATE

day_of_the_week WEEKDAY

lat DECIMAL

long DECIMAL

For the day of the week we chose to create an enum datatype, because the column only ever holds one of the seven predefined weekday names.

In [None]:
# Creating enum datatype for the weekdays
# NOTE(review): the type name is not schema-qualified, so it is presumably
# created in the session's current schema rather than in `crimes` — confirm
# that this is intended.
cur.execute("""
    CREATE TYPE weekday AS ENUM ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday');
""")

# Creating the table in the crimes schema; incident_number is the primary key
# and day_of_the_week uses the weekday enum created above
cur.execute("""
    CREATE TABLE crimes.boston_crimes (
        incident_number INTEGER PRIMARY KEY,
        offense_code INTEGER,
        description VARCHAR(100),
        date DATE,
        day_of_the_week WEEKDAY,
        lat DECIMAL,
        long DECIMAL
        );"""
    )

In [8]:
# Loading the data from the boston.csv file to the table boston_crimes

# Streaming the CSV (its first line is skipped as the header) into the table
with open("boston.csv") as f:
    cur.copy_expert("COPY crimes.boston_crimes FROM STDIN WITH CSV HEADER;", f)

# Printing the number of rows to ensure that they were loaded; the count is
# computed by the server so that the rows are not all fetched into memory
cur.execute("SELECT COUNT(*) FROM crimes.boston_crimes;")
print(cur.fetchone()[0])

298329


In [None]:
# Stripping all privileges on the public schema and on the crime_db database
# from public
for revoke_statement in (
    "REVOKE ALL ON SCHEMA public FROM public;",
    "REVOKE ALL ON DATABASE crime_db FROM public;",
):
    cur.execute(revoke_statement)

In [None]:
# Setting up the readonly group: no login, may connect to crime_db, use the
# crimes schema and SELECT from its tables. The statements run in this order.
for readonly_statement in (
    "CREATE GROUP readonly NOLOGIN;",
    "GRANT CONNECT ON DATABASE crime_db TO readonly;",
    "GRANT USAGE ON SCHEMA crimes TO readonly;",
    "GRANT SELECT ON ALL TABLES IN SCHEMA crimes TO readonly;",
):
    cur.execute(readonly_statement)

In [None]:
# Setting up the readwrite group: no login, may connect to crime_db, use the
# crimes schema and SELECT/INSERT/UPDATE/DELETE on its tables. The statements
# run in this order.
for readwrite_statement in (
    "CREATE GROUP readwrite NOLOGIN;",
    "GRANT CONNECT ON DATABASE crime_db TO readwrite;",
    "GRANT USAGE ON SCHEMA crimes TO readwrite;",
    "GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA crimes TO readwrite;",
):
    cur.execute(readwrite_statement)

In [None]:
# Creating each user and then adding it to its group: data_analyst joins
# readonly, data_scientist joins readwrite. The passwords are written in
# plain text inside the SQL statements.
for user_statement in (
    "CREATE USER data_analyst WITH PASSWORD 'secret1';",
    "GRANT readonly TO data_analyst;",
    "CREATE USER data_scientist WITH PASSWORD 'secret2';",
    "GRANT readwrite TO data_scientist;",
):
    cur.execute(user_statement)

In [9]:
# Committing before closing: the statements above ran on a connection that
# never enabled autocommit, and psycopg2 discards uncommitted work when the
# connection is closed.
conn.commit()
# Closing the old connection to test with a new connection
conn.close()

conn = psycopg2.connect(dbname="crime_db", user="dq")
try:
    cur = conn.cursor()

    # Checking users and groups: one row per role with its superuser,
    # create-role, create-db and can-login flags
    cur.execute("""
        SELECT rolname, rolsuper, rolcreaterole, rolcreatedb, rolcanlogin FROM pg_roles
        WHERE rolname IN ('readonly', 'readwrite', 'data_analyst', 'data_scientist');
    """)
    for role in cur:
        print(role)
    print()

    # Checking privileges: table privileges granted to the two groups
    cur.execute("""
        SELECT grantee, privilege_type
        FROM information_schema.table_privileges
        WHERE grantee IN ('readonly', 'readwrite');
    """)
    for privilege in cur:
        print(privilege)
finally:
    # Release the verification connection even if one of the queries fails
    conn.close()

('readonly', False, False, False, False)
('data_analyst', False, False, False, True)
('data_scientist', False, False, False, True)
('readwrite', False, False, False, False)

('readonly', 'SELECT')
('readwrite', 'INSERT')
('readwrite', 'SELECT')
('readwrite', 'UPDATE')
('readwrite', 'DELETE')
