# School Shootings US and Demographics data in a Data Warehouse
### Data Engineering Capstone Project

#### Project Summary
The main project goal is to build an ETL pipeline that uses 3 different datasets and fits the data into a star-schema model. The data can then be used by data analysts to answer business questions or historical questions.
The task is to join 3 datasets containing data about "US Cities: Demographics", "School Shootings US 1990-present" and "FBI NICS Firearm Background Check Data".

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 

The project merges 3 different CSV datasets into a single dimensional model in a data warehouse for data scientists. I chose Apache Spark to explore and clean the data, and AWS S3 as the storage service.
For the data warehouse I chose AWS Redshift. In Redshift the data is staged and then transformed into the data model.
The full data model can answer questions about school shootings in relation to US city demographics and the number of firearm background checks by month and state.

To make the pipeline a little more complex, one of the datasets (the shootings data) is stored in Parquet format after the Apache Spark step.

##### Describe and Gather Data 


##### US Cities: Demographics

us-cities-demographics.csv - https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/

This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000.

This data comes from the US Census Bureau's 2015 American Community Survey.

* City
* State
* Median Age
* Male Population
* Female Population
* Total Population
* Number of Veterans
* Foreign-born
* Average Household Size
* State Code
* Race
* Count

Ex:
Row(City='Los Angeles', State='California', Median Age='35.0', Male Population='1958998', Female Population='2012898', Total Population='3971896', Number of Veterans='85417', Foreign-born='1485425', Average Household Size='2.86', State Code='CA', Race='White', Count='2177650')

##### School Shootings US 1990-present

pah_wikp_combo.csv - https://www.kaggle.com/ecodan/school-shootings-us-1990present#pah_wikp_combo.csv

A list of all school shooting incidents from 1990 to present.

* Date: date of incident
* City: location of incident
* State: location of incident
* Area Type: urban or suburban (only in Pah dataset)
* School: C = college, HS = high school, MS = middle school, ES = elementary school, - = unknown
* Fatalities: # killed
* Wounded: # wounded (only in Wikipedia dataset)
* Dupe: whether this incident appears in both datasets. Note: only the "Pah" version of the incident is marked.
* Source: Pah or Wikp
* Desc: text description of incident (only in Wikipedia dataset)

Ex:
Row(Date='3/27/90', City='Brooklyn', State='New York', AreaType=None, School='C', Fatalities='0', Wounded='1', Dupe=None, Source='Wikp', Desc='A black youth was taunted with racial slurs by three white youths in the stairwell of a public school in the Bensonhurst area of Brooklyn. The 14-year-old was then shot and slightly wounded, because he had acted as peacemaker when the same boys had clashed with another black teen the month before.[240]')
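Because the Dupe flag is set only on the "Pah" copy of an incident that appears in both sources, counting rows directly would double-count shared incidents. A minimal plain-Python sketch of one way to keep a single copy, assuming that any non-empty Dupe value is a mark (the actual marker value is not documented here). Which copy to keep is a design choice: Pah rows carry AreaType, while Wikp rows carry Wounded and Desc.

```python
from typing import Iterable, List


def deduplicate_incidents(rows: Iterable[dict]) -> List[dict]:
    """Drop the marked "Pah" copy of incidents that appear in both sources."""
    kept = []
    for row in rows:
        # Assumption: any non-empty Dupe value means "also in the other dataset".
        is_marked = bool((row.get("Dupe") or "").strip())
        if row.get("Source") == "Pah" and is_marked:
            continue  # the matching "Wikp" row is kept instead
        kept.append(row)
    return kept


# Hypothetical rows in the shape Spark returns (empty cells come back as None).
sample = [
    {"Date": "1/8/93", "City": "City A", "Source": "Pah", "Dupe": "x"},
    {"Date": "1/8/93", "City": "City A", "Source": "Wikp", "Dupe": None},
    {"Date": "2/2/95", "City": "City B", "Source": "Pah", "Dupe": None},
]
print(len(deduplicate_incidents(sample)))  # 2
```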

##### FBI NICS Firearm Background Check Data

nics-firearm-background-checks.csv - https://github.com/BuzzFeedNews/nics-firearm-background-checks

The data in this repository comes from the FBI's National Instant Criminal Background Check System. The FBI provides data on the number of firearm checks by month, state, and type.

* month
* state
* permit
* permit_recheck
* handgun
* long_gun
* other
* multiple
* admin
* prepawn_handgun
* prepawn_long_gun
* prepawn_other
* redemption_handgun
* redemption_long_gun
* redemption_other
* returned_handgun
* returned_long_gun
* returned_other
* rentals_handgun
* rentals_long_gun
* private_sale_handgun
* private_sale_long_gun
* private_sale_other
* return_to_seller_handgun
* return_to_seller_long_gun
* return_to_seller_other
* totals

Ex:
Row(month='2020-03', state='Alabama', permit='31205', permit_recheck='606', handgun='34897', long_gun='17850', other='1583', multiple='1744', admin='0', prepawn_handgun='36', prepawn_long_gun='23', prepawn_other='0', redemption_handgun='3035', redemption_long_gun='1564', redemption_other='19', returned_handgun='13', returned_long_gun='0', returned_other='0', rentals_handgun='0', rentals_long_gun='0', private_sale_handgun='42', private_sale_long_gun='23', private_sale_other='8', return_to_seller_handgun='2', return_to_seller_long_gun='2', return_to_seller_other='0', totals='92652')
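In the example row above, `totals` (92652) equals the sum of all 24 per-type columns. A small sketch of a per-row consistency check built on that observation; whether the relationship holds for every row of the full dataset is an assumption to verify, and treating empty cells as 0 is also an assumption (Spark reads this CSV without a schema, so every value is a string).

```python
# Per-type check columns of the NICS data (every column except month, state, totals).
CHECK_COLUMNS = [
    "permit", "permit_recheck", "handgun", "long_gun", "other", "multiple", "admin",
    "prepawn_handgun", "prepawn_long_gun", "prepawn_other",
    "redemption_handgun", "redemption_long_gun", "redemption_other",
    "returned_handgun", "returned_long_gun", "returned_other",
    "rentals_handgun", "rentals_long_gun",
    "private_sale_handgun", "private_sale_long_gun", "private_sale_other",
    "return_to_seller_handgun", "return_to_seller_long_gun", "return_to_seller_other",
]


def to_int(value) -> int:
    """Convert a CSV string to int; empty cells are assumed to mean 0."""
    return int(value) if value not in (None, "") else 0


def totals_match(row: dict) -> bool:
    """True if the per-type columns add up to the row's totals column."""
    return sum(to_int(row.get(col)) for col in CHECK_COLUMNS) == to_int(row["totals"])


# The example row shown above.
example = {
    "month": "2020-03", "state": "Alabama", "permit": "31205", "permit_recheck": "606",
    "handgun": "34897", "long_gun": "17850", "other": "1583", "multiple": "1744", "admin": "0",
    "prepawn_handgun": "36", "prepawn_long_gun": "23", "prepawn_other": "0",
    "redemption_handgun": "3035", "redemption_long_gun": "1564", "redemption_other": "19",
    "returned_handgun": "13", "returned_long_gun": "0", "returned_other": "0",
    "rentals_handgun": "0", "rentals_long_gun": "0",
    "private_sale_handgun": "42", "private_sale_long_gun": "23", "private_sale_other": "8",
    "return_to_seller_handgun": "2", "return_to_seller_long_gun": "2",
    "return_to_seller_other": "0", "totals": "92652",
}
print(totals_match(example))  # True
```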

### Step 2: Explore and Assess the Data
#### Explore the Data & Cleaning Steps

During exploration I needed to check that the events in the datasets overlap each other (by state and month).
Data quality is good. Before loading into Redshift, the date column of the fact data must be converted to a consistent format.
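The shootings data stores dates like `3/27/90`, while the firearm data uses months like `2020-03`. A plain-Python sketch of that normalization, shown only to make the mapping concrete; the Spark cells in this notebook do the equivalent with `to_date` and `date_format`.

```python
from datetime import datetime


def shooting_month(raw_date: str) -> str:
    """Convert a shootings date such as '3/27/90' to the 'YYYY-MM' form of the NICS month column."""
    # %m and %d accept one- or two-digit values when parsing;
    # %y maps 69-99 to 1969-1999 and 00-68 to 2000-2068.
    return datetime.strptime(raw_date, "%m/%d/%y").strftime("%Y-%m")


print(shooting_month("3/27/90"))  # 1990-03
```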

Data about "US Cities: Demographics" must be aggregated from "Race" to "City", and then from "City" to "State", to fit the rest of the data.
This step is done in Redshift, when the data is moved from the staging table into the dimension table.
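The two-step aggregation matters because each city appears once per Race, with the city-level columns (such as Total Population) repeated on every row. A minimal plain-Python sketch of the idea, using hypothetical toy rows; in Redshift the same effect can be had by selecting distinct city-level values before grouping by state.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def state_population(rows: Iterable[dict]) -> Dict[str, int]:
    """Aggregate demographics rows Race -> City -> State."""
    city_totals: Dict[Tuple[str, str], int] = {}
    for row in rows:
        # Step 1 (Race -> City): city-level values repeat on every Race row,
        # so keep the first one per (State, City).
        city_totals.setdefault((row["State"], row["City"]), int(row["Total Population"]))
    state_totals: Dict[str, int] = defaultdict(int)
    for (state, _city), population in city_totals.items():
        # Step 2 (City -> State): sum the de-duplicated city values.
        state_totals[state] += population
    return dict(state_totals)


# Hypothetical toy rows: City A has two Race rows, City B has one.
toy_rows = [
    {"City": "City A", "State": "State X", "Total Population": "1000", "Race": "White", "Count": "600"},
    {"City": "City A", "State": "State X", "Total Population": "1000", "Race": "Asian", "Count": "300"},
    {"City": "City B", "State": "State X", "Total Population": "500", "Race": "White", "Count": "400"},
]
print(state_population(toy_rows))  # {'State X': 1500}
```

Summing Total Population over every row would give 2500 instead of 1500, double-counting City A.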

```python
import configparser
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# AWS credentials for Spark's S3 access are read from dwh.cfg
config = configparser.ConfigParser()
config.read('dwh.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['CONFIG_SPARK']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['CONFIG_SPARK']['AWS_SECRET_ACCESS_KEY']

# Input datasets in the S3 bucket
DEMOGRAPHICS_S3='s3a://udacity-capstone-input/us-cities-demographics.csv'
SHOOTINGS_S3='s3a://udacity-capstone-input/pah_wikp_combo.csv'
FIREARM_S3='s3a://udacity-capstone-input/nics-firearm-background-checks.csv'

# hadoop-aws provides the s3a:// filesystem used by the paths above
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

# Register each CSV as a temp view; the demographics file is semicolon-delimited
df = spark.read.option('header', 'true').option("delimiter", ";").csv(DEMOGRAPHICS_S3)
df.createOrReplaceTempView("demographics_view")

df = spark.read.option('header', 'true').csv(FIREARM_S3)
df.createOrReplaceTempView("firearm_view")

df = spark.read.option('header', 'true').csv(SHOOTINGS_S3)
df.createOrReplaceTempView("shootings_view")
```


```python
demographics_table = spark.sql("""
select
*
from demographics_view
""")
demographics_table.show(5)
```

```python
# Preview one month of firearm checks; year()/month() cast strings such as '2013-01' to dates
firearm_table = spark.sql("""
select
year(month) as year_partition,
month(month) as month_partition,
*
from firearm_view where month = '2013-01'
""")
firearm_table.show(5)
```

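```python
# Convert dates such as '3/27/90' to 'yyyy-MM' so they match the firearm 'month' column.
# 'yyyy' is the calendar year; 'YYYY' is the week-based year, which can differ near
# New Year (and is rejected by Spark 3.x).
# Parsing one-digit months and two-digit years relies on the legacy (Spark 2.x) parser;
# on Spark 3.x this may require spark.sql.legacy.timeParserPolicy=LEGACY.
shootings_table = spark.sql("""
select
date_format(to_date(Date,"MM/dd/yy"),"yyyy-MM") as new_date,
year(to_date(Date,"MM/dd/yy")) as year_partition,
month(to_date(Date,"MM/dd/yy")) as month_partition,
*
from shootings_view where date_format(to_date(Date,"MM/dd/yy"),"yyyy-MM") = "2013-01"
""")
shootings_table.show(5)
```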

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
![Conceptual data model](data_model.jpg)

#### 3.2 Mapping Out Data Pipelines
0. Fill in the credentials in dwh.cfg.
1. Load the CSV files from the S3 input bucket into Apache Spark (local mode).
2. Create temp views, rename columns, and cast dates.
3. Save the results to the S3 output bucket as CSV and Parquet.
4. Drop and recreate the staging tables in Redshift.
5. Load the data into the staging tables and analyze them.
6. Transform the data with SQL inserts into the fact and dimension tables.
7. Analyze the target tables.
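Step 5 loads the Spark output into Redshift staging tables. The real statements live in `sql_queries.py`, which is not shown here, so the following is only a sketch of what Redshift `COPY` statements for this layout could look like. The staging table names and the IAM role ARN are hypothetical placeholders; the bucket and prefixes come from the Spark job.

```python
# Hypothetical staging table names and IAM role; only the bucket/prefixes come from the Spark job.
OUTPUT_BUCKET = "s3://udacity-capstone-output2/"  # Spark writes via s3a://, Redshift COPY reads s3://
IAM_ROLE_ARN = "arn:aws:iam::123456789012:role/example-redshift-role"  # placeholder

STAGING_SOURCES = {
    "staging_demographics": ("demographics_table/", "FORMAT AS CSV"),
    "staging_firearm": ("firearm_table/", "FORMAT AS CSV"),
    "staging_shootings": ("shootings_table/", "FORMAT AS PARQUET"),
}


def build_copy_statements(bucket: str, iam_role: str) -> list:
    """Build one Redshift COPY statement per staging table."""
    statements = []
    for table, (prefix, file_format) in STAGING_SOURCES.items():
        statements.append(
            f"COPY {table} FROM '{bucket}{prefix}' IAM_ROLE '{iam_role}' {file_format};"
        )
    return statements


for statement in build_copy_statements(OUTPUT_BUCKET, IAM_ROLE_ARN):
    print(statement)
```

Spark's `.csv()` writer omits the header row by default, so the CSV copies need no `IGNOREHEADER`. A Parquet `COPY` maps columns by position, so the staging table's column order would have to match the Spark output (`new_date`, `year_partition`, `month_partition`, then the source columns without `Desc`).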

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

```python
import configparser
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

print('--- Config START ---')

# AWS credentials for Spark's S3 access are read from dwh.cfg
config = configparser.ConfigParser()
config.read('dwh.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['CONFIG_SPARK']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['CONFIG_SPARK']['AWS_SECRET_ACCESS_KEY']

DEMOGRAPHICS_S3='s3a://udacity-capstone-input/us-cities-demographics.csv'
SHOOTINGS_S3='s3a://udacity-capstone-input/pah_wikp_combo.csv'
FIREARM_S3='s3a://udacity-capstone-input/nics-firearm-background-checks.csv'

output_data='s3a://udacity-capstone-output2/'

print('--- Config DONE ---')

print('--- Spark session & file & view START ---')

spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

# Register each CSV as a temp view; the demographics file is semicolon-delimited
df = spark.read.option('header', 'true').option("delimiter", ";").csv(DEMOGRAPHICS_S3)
df.createOrReplaceTempView("demographics_view")

df = spark.read.option('header', 'true').csv(FIREARM_S3)
df.createOrReplaceTempView("firearm_view")

df = spark.read.option('header', 'true').csv(SHOOTINGS_S3)
df.createOrReplaceTempView("shootings_view")

print('--- Spark session & file & view DONE ---')

print('--- demographics_table START ---')

demographics_table = spark.sql("""
select
*
from demographics_view
""")
# Replace spaces and hyphens in column names with underscores
demographics_rename_table=demographics_table \
.withColumnRenamed("Median Age","Median_Age") \
.withColumnRenamed("Male Population","Male_Population") \
.withColumnRenamed("Female Population","Female_Population") \
.withColumnRenamed("Total Population","Total_Population") \
.withColumnRenamed("Number of Veterans","Number_of_Veterans") \
.withColumnRenamed("Foreign-born","Foreign_born") \
.withColumnRenamed("Average Household Size","Average_Household_Size") \
.withColumnRenamed("State Code","State_Code")

demographics_table=demographics_rename_table

print('--- demographics_table DONE ---')

print('--- firearm_table START ---')

# Add year/month columns derived from month strings such as '2020-03'
firearm_table = spark.sql("""
select
year(month) as year_partition,
month(month) as month_partition,
*
from firearm_view
""")

print('--- firearm_table DONE ---')

print('--- shootings_table START ---')

# Normalize '3/27/90'-style dates to 'yyyy-MM' ('YYYY' would be the week-based year).
# Parsing one-digit months and two-digit years relies on the legacy (Spark 2.x) parser;
# on Spark 3.x this may require spark.sql.legacy.timeParserPolicy=LEGACY.
shootings_table = spark.sql("""
select
date_format(to_date(Date,"MM/dd/yy"),"yyyy-MM") as new_date,
year(to_date(Date,"MM/dd/yy")) as year_partition,
month(to_date(Date,"MM/dd/yy")) as month_partition,
*
from shootings_view
""")

# Drop the free-text description before writing
shootings_table = shootings_table.drop("Desc")

print('--- shootings_table DONE ---')

# Demographics and firearm data are written as CSV (no header row, Spark's default);
# shootings data is written as Parquet
print('--- demographics_table write START ---')
demographics_table.write.mode('overwrite').csv(output_data + 'demographics_table/')
print('--- demographics_table write DONE ---')

print('--- firearm_table write START ---')
firearm_table.write.mode('overwrite').csv(output_data + 'firearm_table/')
print('--- firearm_table write DONE ---')

print('--- shootings_table write START ---')
shootings_table.write.mode('overwrite').parquet(output_data + 'shootings_table/')
print('--- shootings_table write DONE ---')

print(" --- ETL SPARK END ---")
```

```python
import configparser
import psycopg2
from sql_queries import drop_table_queries, create_table_queries, copy_table_queries, analyze_stage_queries, create_dim_facts_table_queries, insert_dim_facts_table_queries, analyze_dim_facts_queries

# Each helper runs its list of SQL statements in order, committing after each one.
def drop_staging_tables(cur, conn):
    for query in drop_table_queries:
        print(query)
        cur.execute(query)
        conn.commit()

def create_tables(cur, conn):
    for query in create_table_queries:
        print(query)
        cur.execute(query)
        conn.commit()

def copy_tables(cur, conn):
    for query in copy_table_queries:
        print(query)
        cur.execute(query)
        conn.commit()

def analyze_stage(cur, conn):
    for query in analyze_stage_queries:
        print(query)
        cur.execute(query)
        conn.commit()

def create_dim_facts_table(cur, conn):
    for query in create_dim_facts_table_queries:
        print(query)
        cur.execute(query)
        conn.commit()

def insert_dim_facts_table(cur, conn):
    for query in insert_dim_facts_table_queries:
        print(query)
        cur.execute(query)
        conn.commit()

def analyze_dim(cur, conn):
    for query in analyze_dim_facts_queries:
        print(query)
        cur.execute(query)
        conn.commit()


config = configparser.ConfigParser()
config.read('dwh.cfg')

# [CLUSTER] values in dwh.cfg must be ordered: host, dbname, user, password, port
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

drop_staging_tables(cur, conn)
create_tables(cur, conn)
copy_tables(cur, conn)
analyze_stage(cur, conn)
create_dim_facts_table(cur, conn)
insert_dim_facts_table(cur, conn)
analyze_dim(cur, conn)

print(" --- ETL REDSHIFT END ---")

conn.close()
```
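The seven helper functions above differ only in the query list they iterate over. A minimal sketch of a single generic helper with the same behaviour (print, execute, commit per query); it is demonstrated against an in-memory SQLite database standing in for Redshift, since both follow the Python DB-API.

```python
import sqlite3


def run_queries(cur, conn, queries):
    """Execute and commit each query in order, as each helper function above does."""
    for query in queries:
        print(query)
        cur.execute(query)
        conn.commit()


# Demo against an in-memory SQLite database standing in for Redshift.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
run_queries(cur, conn, ["CREATE TABLE demo (x INTEGER)", "INSERT INTO demo VALUES (1)"])
```

In the notebook, the seven calls could then become one loop: `for queries in (drop_table_queries, create_table_queries, copy_table_queries, analyze_stage_queries, create_dim_facts_table_queries, insert_dim_facts_table_queries, analyze_dim_facts_queries): run_queries(cur, conn, queries)`.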

#### 4.2 Data Quality Checks
The following data quality checks verify that the pipeline ran as expected:
 * Count check: the number of records in the model must be greater than 0, to ensure completeness.
 * Integrity check: every state used in the model must exist in the dim_state dimension; the query should return an empty DataFrame.

Run Quality Checks

```python
import configparser
import psycopg2
import pandas as pd
from sql_queries import data_count_quality_check, data_state_unknown_quality_check

config = configparser.ConfigParser()
config.read('dwh.cfg')

# [CLUSTER] values in dwh.cfg must be ordered: host, dbname, user, password, port
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

print('1. First quality check: the number of records in the model must be greater than 0.')
print('--- ---')
df = pd.read_sql_query(data_count_quality_check, conn)
print(df)
print('')
print('')
print('2. Second quality check: states that are not in the dim_state dimension. Should return an empty DataFrame.')
print('--- ---')
df = pd.read_sql_query(data_state_unknown_quality_check, conn)
print(df)

conn.commit()
conn.close()
```
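The checks above only print their results, so a failure has to be spotted by eye. A minimal sketch of the same two checks written to raise on failure, which would let a scheduler mark the run as failed. The table and column names (`fact_shootings.id_state`, `dim_state.id`) follow the example SQL in this notebook; the demo uses an in-memory SQLite database in place of Redshift, with hypothetical rows.

```python
import sqlite3


def check_not_empty(cur, tables):
    """Check 1: raise if any table has no rows."""
    for table in tables:
        # Table names come from code, not user input, so f-string formatting is acceptable here.
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        (count,) = cur.fetchone()
        if count == 0:
            raise ValueError(f"Data quality check failed: {table} has no rows")


def check_known_states(cur):
    """Check 2: raise if fact_shootings references a state missing from dim_state."""
    cur.execute(
        """
        SELECT COUNT(*)
        FROM fact_shootings f
        LEFT JOIN dim_state s ON f.id_state = s.id
        WHERE s.id IS NULL
        """
    )
    (orphans,) = cur.fetchone()
    if orphans:
        raise ValueError(f"Data quality check failed: {orphans} facts with an unknown state")


# Demo with hypothetical rows in SQLite.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE dim_state (id INTEGER, name TEXT)")
cur.execute("CREATE TABLE fact_shootings (id_state INTEGER, fatalities INTEGER)")
cur.execute("INSERT INTO dim_state VALUES (1, 'Alabama')")
cur.execute("INSERT INTO fact_shootings VALUES (1, 0)")
check_not_empty(cur, ["dim_state", "fact_shootings"])
check_known_states(cur)
print("all checks passed")
```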

#### 4.3 Data dictionary 

1. fact_shootings - table containing school shooting incidents from 1990 to present
2. dim_time - time dimension containing all dates from the shootings data, with extracted hour/day/week/month/year/weekday
3. dim_state - dimension with all states from the shootings, firearm, and demographics data, with new IDs
4. dim_firearm_statistic - dimension providing the number of firearm checks by month, state, and type
5. dim_demographics - dimension containing information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000
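To make the dim_time attributes concrete, here is a plain-Python sketch that derives them from one timestamp. The output key names are hypothetical, and the week and weekday conventions (ISO-8601 week, Monday = 1) are a choice made for the sketch; Redshift's `EXTRACT(DOW ...)`, for instance, numbers Sunday as 0.

```python
from datetime import datetime


def time_attributes(ts: datetime) -> dict:
    """Derive dim_time-style attributes from one timestamp (hypothetical key names)."""
    _, iso_week, _ = ts.isocalendar()
    return {
        "date": ts.date().isoformat(),
        "hour": ts.hour,              # shootings dates have no time of day, so this is 0
        "day": ts.day,
        "week": iso_week,             # ISO-8601 week number
        "month": ts.month,
        "year": ts.year,
        "weekday": ts.isoweekday(),   # Monday = 1 ... Sunday = 7
    }


print(time_attributes(datetime(1990, 3, 27)))
```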

#### Example SQL joining the fact table with all dimensions, e.g. for 2015:

```
select distinct dt.*,
                ss.*,
                dstate.*,
                dfs.*,
                dd.*
from fact_shootings ss
         left outer join dim_time dt
                         on ss.date = dt.date
         left outer join dim_state dstate
                         on ss.id_state = dstate.id
         left outer join dim_firearm_statistic dfs
                         on ss.id_state = dfs.ID_STATE and to_date(ss.date, 'yyyy-mm') = to_date(dfs.date, 'yyyy-mm')
         left outer join dim_demographics dd
                         on dstate.id = dd.ID_STATE
where to_char(ss.date, 'yyyy') = '2015'
;
```
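The key detail in that query is joining daily shooting facts to monthly firearm statistics by truncating the fact date to the month. A runnable sketch of the same pattern using Python's built-in SQLite as a stand-in for Redshift; the `fatalities` and `totals` columns and all rows are hypothetical, and SQLite's `strftime` plays the role of the Redshift date functions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript(
    """
    CREATE TABLE dim_state (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE fact_shootings (date TEXT, id_state INTEGER, fatalities INTEGER);
    -- Firearm statistics are monthly, so their date is stored as 'YYYY-MM'.
    CREATE TABLE dim_firearm_statistic (id_state INTEGER, date TEXT, totals INTEGER);

    INSERT INTO dim_state VALUES (1, 'State X'), (2, 'State Y');
    INSERT INTO fact_shootings VALUES ('2015-03-10', 1, 1), ('2015-03-22', 2, 0), ('2014-12-01', 1, 2);
    INSERT INTO dim_firearm_statistic VALUES (1, '2015-03', 900), (2, '2015-03', 400), (1, '2014-12', 700);
    """
)
rows = cur.execute(
    """
    SELECT ss.date, ds.name, ss.fatalities, dfs.totals
    FROM fact_shootings ss
    LEFT OUTER JOIN dim_state ds ON ss.id_state = ds.id
    -- Truncate the daily fact date to the month grain of the firearm dimension.
    LEFT OUTER JOIN dim_firearm_statistic dfs
        ON ss.id_state = dfs.id_state AND strftime('%Y-%m', ss.date) = dfs.date
    WHERE strftime('%Y', ss.date) = '2015'
    ORDER BY ss.date
    """
).fetchall()
for row in rows:
    print(row)
```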



### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

#### Clearly state the rationale for the choice of tools and technologies for the project.
I chose Apache Spark because it is easy to use, elastic, and scalable when needed: it is easy to spin up an EMR cluster on AWS and switch Spark from local to cluster mode. The second tool is Redshift. I am familiar with databases and SQL, so it was a natural choice, and it makes it easy to expose the data model to the BI world.

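#### Propose how often the data should be updated and why.

The model itself does not need to change: the fact and dimension tables can be populated with new CSV data as it arrives, and the star schema splits new records by date. A monthly refresh would match the granularity of the NICS firearm data, which is reported by month.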

#### The data was increased by 100x.

For the Apache Spark part, I would create an EMR cluster on AWS and switch Spark from local to cluster mode. Since the data is already stored on S3, the cluster can read it in parallel directly from the bucket. For the Redshift part, I would add worker nodes or move to a more powerful node type. The tables were created with DISTSTYLE and SORTKEY, so the data is redistributed across the new nodes when the cluster is resized.

#### The data populates a dashboard that must be updated on a daily basis by 7am every day.

I would set up Apache Airflow and schedule a daily DAG that runs the update early enough to finish before 7am.

#### The database needed to be accessed by 100+ people.
Redshift handles this well: adding nodes increases read performance, and Redshift's concurrency scaling can add capacity automatically when many users query at the same time.
