# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import datetime as dt
from datetime import datetime, timedelta
from pyspark.sql import types as T
from pyspark.sql import functions as F
import pandas as pd
import os
import glob
import psycopg2
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.functions import udf, mean, isnan, isnull, col, count, when



### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [2]:
# Read in the data here
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_spark =spark.read.load('./sas_data1')      ## Reading Immigration data
print("Completed, ran successfully")

Completed, ran successfully


In [3]:
TemperatureByCity = '../../data2/GlobalLandTemperaturesByCity.csv'  ## Reading Temperature data
df = pd.read_csv(TemperatureByCity)
print("Completed, ran successfully")

Completed, ran successfully


In [4]:
city_dem= 'https://public.opendatasoft.com/explore/dataset/us-cities-demographics/download'  ## Reading City Demographic data
city_dem_df = pd.read_csv(city_dem, sep=";")
print("Completed, ran successfully")

Completed, ran successfully


In [5]:
 df_spark.limit(5)

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string]

In [6]:
 df_spark.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1
2,5748519.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20582.0,...,,M,1987.0,10292016,M,,DL,94956410000.0,40,B1
3,5748520.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1987.0,10292016,F,,DL,94956450000.0,40,B1
4,5748521.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,WA,20588.0,...,,M,1988.0,10292016,M,,DL,94956390000.0,40,B1


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

#### Immigration Data Exloration, Cleaning and Variable features selection for the Data Model
- This part of data exploration involved understanding distribution of each column and examining the data quality 
- Most importantly we will be performing uniqueness, duplication and missing check for the column choosen as PRIMARY KEY
- Check duplicate and volume of missing records in each column to decide if a column shold be dropped or retained
- For ths purpose of this project we will not be handling missing records by filling with applicable central tendency (mean, median, mode) as in the case of ML. so that we will not end up with misleading information for analytics purpose
- Finally, we assume a colume with more than 95% missing records will not be proving meaningful infromation, thus they will be dropped

In [7]:
# Performing cleaning tasks here
# Data exploration
df_spark.limit(5)

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string]

In [5]:
df_spark=df_spark.dropna(subset='i94addr') ## This will be our foreign column so we don't want to include missing or null values

In [6]:
df_spark.count()  #Total records in the immigration dataset  
## Note Null i94addr records have been dropped

2943721

In [10]:
df_spark.select('cicid').count()   # Total number of 'cicid' which are candidates unique IDs counted and compared with total datasets records

2943721

In [11]:
df_spark.select('cicid').distinct().count()    ## Total ('cicid') = Total (distinct ('cicid')) this means there is No duplicate key, no missing

2943721

In [12]:
df_spark.select([count(when(col('cicid').isNull(),True))]).toPandas()  ## Checking for null value, no missing values or null values
# cicid has no null value

Unnamed: 0,count(CASE WHEN (cicid IS NULL) THEN true END)
0,0


In [13]:
df_spark.select('cicid').dropna().count()  ## Dropping na values and counting - CICID are unique, and non-missing column shall be used as PRIMARY KEY

2943721

In [14]:
df_spark.select('i94yr').distinct().count()  ## This confirms that all the 3,096,313 records are in the same year (2016), 
                                              # so this column may be exluded in our data model to reduce the column numbers

1

In [15]:
df_spark.select('i94mon').distinct().count()  ##  This confirms that all the 3,096,313 are also in the same month
                                            # we can also exlude this column in our data model

1

In [16]:
df_spark.select('i94cit').distinct().count()

242

In [17]:
df_spark.select([count(when(col('i94cit').isNull(),True))]).toPandas()

Unnamed: 0,count(CASE WHEN (i94cit IS NULL) THEN true END)
0,0


In [18]:
df_spark.select('i94res').distinct().count()

229

In [19]:
df_spark.select('i94port').count()   ## Port of entry is an important information and no record is missing. We shall include it in the data model

2943721

In [20]:
df_spark.select('i94port').distinct().count() 

288

In [21]:
df_spark.select('arrdate').count()   ## Arrival Date is an important information, no record is missing

2943721

In [22]:
df_spark.select('arrdate').distinct().count() ## There are 30 category of arrival date, this confirms that all the records are within 1 month ( April of 2016)

30

In [23]:
df_spark.select('i94mode').count()  ## No records is missing, 

2943721

In [24]:
df_spark.select('i94mode').distinct().count()  ## There are 4 modes including invalid records

4

In [25]:
df_spark.select('depdate').count()    ##  this feature is also useful for demographic analysis

2943721

In [26]:
df_spark.select('depdate').distinct().count() ## This makes sense, as candidates do not depart at the same time based on different Visa status

219

In [27]:
df_spark.select([count(when(col('depdate').isNull(),True))]).toPandas()     ### number of null values

Unnamed: 0,count(CASE WHEN (depdate IS NULL) THEN true END)
0,125984


In [28]:
df_spark.select('i94addr').count()  ## Address shall be included and renamed as State_code

2943721

In [29]:
df_spark.select([count(when(col('i94addr').isNull(),True))]).toPandas() ## Null records dropped already

Unnamed: 0,count(CASE WHEN (i94addr IS NULL) THEN true END)
0,0


In [30]:
df_spark.count()

2943721

In [31]:
df_spark.select([count(when(col('i94addr').isNull(),True))]).toPandas()  ## Null values dropped

Unnamed: 0,count(CASE WHEN (i94addr IS NULL) THEN true END)
0,0


In [32]:
df_spark.select([count(when(col('i94bir').isNull(),True))]).toPandas() ## Only few records are null in Age... Age shall be included, named as i94_Age

Unnamed: 0,count(CASE WHEN (i94bir IS NULL) THEN true END)
0,389


In [33]:
df_spark.select([count(when(col('i94visa').isNull(),True))]).toPandas() ## No null records in VISA category (Business, Pleasure,Student) Visa

Unnamed: 0,count(CASE WHEN (i94visa IS NULL) THEN true END)
0,0


In [34]:
df_spark.select([count(when(col('dtadfile').isNull(),True))]).toPandas() ## Date Added to file will be used, 1 record is null

Unnamed: 0,count(CASE WHEN (dtadfile IS NULL) THEN true END)
0,0


In [35]:
df_spark.select([count(when(col('visapost').isNull(),True))]).toPandas() ## Home Country state where VISA was issued, lot of record missing.
                                                                           ## some came unlawfully 

Unnamed: 0,count(CASE WHEN (visapost IS NULL) THEN true END)
0,1781080


In [36]:
df_spark.select([count(when(col('occup').isNull(),True))]).toPandas()  ### more than 99% of occupation information is null.. we are exluding this column

Unnamed: 0,count(CASE WHEN (occup IS NULL) THEN true END)
0,2936101


In [37]:
df_spark.select([count(when(col('entdepa').isNull(),True))]).toPandas() ## Arrival Flag,-  Admitted or Paroled into US ( Arrive_flag)

Unnamed: 0,count(CASE WHEN (entdepa IS NULL) THEN true END)
0,0


In [38]:
df_spark.select([count(when(col('entdepd').isNull(),True))]).toPandas()  ### Departure Flag - Departed, lost I-94 or is deceased (Depart_flag)

Unnamed: 0,count(CASE WHEN (entdepd IS NULL) THEN true END)
0,124309


In [39]:
df_spark.select([count(when(col('entdepu').isNull(),True))]).toPandas()  ### More than 99% of update flag are missing or null values. will be exluded

Unnamed: 0,count(CASE WHEN (entdepu IS NULL) THEN true END)
0,2943349


In [40]:
df_spark.select([count(when(col('matflag').isNull(),True))]).toPandas()  ## Arrival and Departure matching flag will be Included (ADmatch_flag)

Unnamed: 0,count(CASE WHEN (matflag IS NULL) THEN true END)
0,124309


In [41]:
df_spark.select([count(when(col('biryear').isNull(),True))]).toPandas()  ## Birt year will be included

Unnamed: 0,count(CASE WHEN (biryear IS NULL) THEN true END)
0,389


In [42]:
df_spark.select([count(when(col('gender').isNull(),True))]).toPandas() # will be included

Unnamed: 0,count(CASE WHEN (gender IS NULL) THEN true END)
0,392319


In [43]:
df_spark.select([count(when(col('dtaddto').isNull(),True))]).toPandas()

Unnamed: 0,count(CASE WHEN (dtaddto IS NULL) THEN true END)
0,185


In [44]:
df_spark.select([count(when(col('insnum').isNull(),True))]).toPandas() ##  More than 96% of INSNUM are missing or null values

Unnamed: 0,count(CASE WHEN (insnum IS NULL) THEN true END)
0,2861037


In [45]:
df_spark.select([count(when(col('airline').isNull(),True))]).toPandas()  ## Will be included

Unnamed: 0,count(CASE WHEN (airline IS NULL) THEN true END)
0,59186


In [46]:
df_spark.select([count(when(col('admnum').isNull(),True))]).toPandas() ## No record is null or missing but there are duplicates.

Unnamed: 0,count(CASE WHEN (admnum IS NULL) THEN true END)
0,0


In [47]:
df_spark.select('admnum').distinct().count()  ## Admission number is not unique, can not be used as PRIMARY KEY

2931159

In [69]:
df_spark.select([count(when(col('fltno').isNull(),True))]).toPandas()  ## will be included

Unnamed: 0,count(CASE WHEN (fltno IS NULL) THEN true END)
0,12746


In [93]:
df_spark.select([count(when(col('visatype').isNull(),True))]).toPandas()  

Unnamed: 0,count(CASE WHEN (visatype IS NULL) THEN true END)
0,0


##### Immigration Data Extraction and Cleaning

In [48]:
def convert_dt(x):
    try:
        start = datetime(1960, 1, 1)
        return start + timedelta(days=int(x))
    except:
        return None
get_date = udf(lambda x: convert_dt(x), T.DateType())

In [49]:
df_spark = df_spark.withColumn('arrivedate', get_date(df_spark.arrdate))   ## Arrival date conversion

In [50]:
df_spark = df_spark.withColumn('departdate', get_date(df_spark.depdate))    ## Departure date onversion

In [51]:
df_spark = df_spark.withColumn("dateOnfile", F.to_date(F.col("dtadfile").cast("string"), 'yyyyMMdd')) ## Date added to file conversion

In [52]:
df_spark = df_spark.withColumn("dateAdmtd", F.to_date(F.col("dtaddto").cast("string"), 'MMddyyyy')) ## Dtae admitted to US conversion

In [53]:
df_spark.limit(1)

DataFrame[cicid: double, i94yr: double, i94mon: double, i94cit: double, i94res: double, i94port: string, arrdate: double, i94mode: double, i94addr: string, depdate: double, i94bir: double, i94visa: double, count: double, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: double, dtaddto: string, gender: string, insnum: string, airline: string, admnum: double, fltno: string, visatype: string, arrivedate: date, departdate: date, dateOnfile: date, dateAdmtd: date]

In [54]:
df_spark.select('arrivedate').limit(5).show()

+----------+
|arrivedate|
+----------+
|2016-04-30|
|2016-04-30|
|2016-04-30|
|2016-04-30|
|2016-04-30|
+----------+



In [55]:
df_spark.select('departdate').limit(5).show()

+----------+
|departdate|
+----------+
|2016-05-08|
|2016-05-17|
|2016-05-08|
|2016-05-14|
|2016-05-14|
+----------+



In [56]:
df_spark.select('dateOnfile').limit(5).show()

+----------+
|dateOnfile|
+----------+
|2016-04-30|
|2016-04-30|
|2016-04-30|
|2016-04-30|
|2016-04-30|
+----------+



In [57]:
df_spark.select('dateAdmtd').limit(5).show()

+----------+
| dateAdmtd|
+----------+
|2016-10-29|
|2016-10-29|
|2016-10-29|
|2016-10-29|
|2016-10-29|
+----------+



In [58]:
df_spark.createOrReplaceTempView("immigration")

In [59]:
immig = spark.sql(''' SELECT DISTINCT int(cicid) as immig_id, i94cit,  i94res,  i94port , arrivedate, i94mode, 
                            i94addr as state_code, departdate, i94bir, i94visa, dateOnfile, visapost, entdepa as arrive_flag, 
                            entdepd as depart_flag, matflag as ADmatch_flag, biryear, dtaddto as dateAdmtd, gender, airline, 
                            admnum, fltno
                      FROM immigration
''')

In [60]:
immig.show(5)

+--------+------+------+-------+----------+-------+----------+----------+------+-------+----------+--------+-----------+-----------+------------+-------+---------+------+-------+---------------+-----+
|immig_id|i94cit|i94res|i94port|arrivedate|i94mode|state_code|departdate|i94bir|i94visa|dateOnfile|visapost|arrive_flag|depart_flag|ADmatch_flag|biryear|dateAdmtd|gender|airline|         admnum|fltno|
+--------+------+------+-------+----------+-------+----------+----------+------+-------+----------+--------+-----------+-----------+------------+-------+---------+------+-------+---------------+-----+
|     227| 103.0| 103.0|    NYC|2016-04-01|    1.0|        NY|2016-04-06|  40.0|    1.0|2016-04-01|    null|          G|          O|           M| 1976.0| 06292016|     M|     AB|5.5420116733E10|07450|
|     741| 103.0| 691.0|    FTL|2016-04-01|    1.0|        FL|2016-04-02|  17.0|    2.0|2016-04-01|    null|          O|          O|           M| 1999.0| 06292016|  null|     NK|5.5453003033E10|00

#### Temperature Data Exploration, Cleaning and Variable Selection
- In this section we will extract Average Daily Temperature for United States in the latest year (2013), Aggregate it by City 
- Merge the aggregated Average Daily Temperature to Demograhic data

In [7]:
df.head()  

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [8]:
df.shape  ## Here we see total records and columns

(8599212, 7)

In [9]:
df.columns

Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude'],
      dtype='object')

In [10]:
# df['Country'].unique()  ## Viewing the Countries

In [11]:
df1 = df[df['Country']== 'United States']   ## Extracting US daily temperature by City up to year 2013

In [12]:
df1.shape

(687289, 7)

In [13]:
 ## Viewing Cities in which Temperatures were captured in United States
# df1['City'].unique() 

In [14]:
df1['dt'].describe()  ## Exploring more information about United States Daily temperatures captured by City

count         687289
unique          3239
top       1902-04-01
freq             257
Name: dt, dtype: object

In [15]:
df1['date']= pd.to_datetime(df1['dt'])  #Date Conversion

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  """Entry point for launching an IPython kernel.


In [16]:
df1['year']= df1['date'].dt.year  # Year Extraction to filter US 2013 daily Temperature by city

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  """Entry point for launching an IPython kernel.


In [17]:
df1.head(2)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,date,year
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W,1820-01-01,1820
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W,1820-02-01,1820


In [18]:
df_T2013 = df1[df1['year']==2013]   #Extracting most recent (2013) US daily temperature by City

In [19]:
df_T2013.head(2)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,date,year
49871,2013-01-01,6.32,0.267,Abilene,United States,32.95N,100.53W,2013-01-01,2013
49872,2013-02-01,8.116,0.222,Abilene,United States,32.95N,100.53W,2013-02-01,2013


In [20]:
df_T2013['dt'].value_counts()  ## We see that Temperatures were captured daily and for 9 months in 2013 by City in United States

2013-06-01    257
2013-04-01    257
2013-03-01    257
2013-05-01    257
2013-01-01    257
2013-08-01    257
2013-09-01    257
2013-02-01    257
2013-07-01    257
Name: dt, dtype: int64

##### Aggregating Average Daily Temperture by City

In [21]:
TempByCity =df_T2013[['City', 'AverageTemperature']].groupby(['City']).mean().rename(columns={'AverageTemperature': 'AvgTem'} )

## Aggregating US Average Daily Temperature by City
## Our purpose is to get US Average Daily Temperature by City and merge it with the Dempographic data

In [22]:
TempByCity.head()    ### US Average Daily Temperature by City for 2013 report

Unnamed: 0_level_0,AvgTem
City,Unnamed: 1_level_1
Abilene,19.411111
Akron,11.870667
Albuquerque,13.630778
Alexandria,14.187667
Allentown,12.163889


- The United States average daily temperature by City for the year 2013 is now ready to be merged to City Demographic dataset

#### City Demographic Data Exploration, Cleaning and Merging with City Average Temperature
- Here we will extract usefull demographic information about States where Non-Immigrant arrived; such as 'male_population', 'female_population', 'total_population', 'foreign_born'
- Merge City Average Daily Temperature, and then aggregate the data by State.
- The final data will form part of data model with the Immigration data. Thus we can get more information about the State in which non-immigrants arrived, for census purposes 

In [23]:
city_dem_df.head(2)

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,race,count
0,Wichita,Kansas,34.6,192354.0,197601.0,389955,23978.0,40270.0,2.56,KS,American Indian and Alaska Native,8791
1,Allen,Pennsylvania,33.5,60626.0,59581.0,120207,5691.0,19652.0,2.67,PA,Black or African-American,22304


In [24]:
city_dem_df.shape

(2891, 12)

In [25]:
city_dem = city_dem_df.merge(TempByCity, left_on= 'city', right_index=True) 
## Merging Daily Average Temperature by City to City Demographic data

In [26]:
city_dem.head(2)  ## Now we have City Demographic data merged with Average Daily Temperature by City

Unnamed: 0,city,state,median_age,male_population,female_population,total_population,number_of_veterans,foreign_born,average_household_size,state_code,race,count,AvgTem
0,Wichita,Kansas,34.6,192354.0,197601.0,389955,23978.0,40270.0,2.56,KS,American Indian and Alaska Native,8791,15.334
240,Wichita,Kansas,34.6,192354.0,197601.0,389955,23978.0,40270.0,2.56,KS,Asian,25210,15.334


In [27]:
city_dem1 = city_dem[['state_code', 'male_population', 'female_population', 'total_population', 'foreign_born', 'AvgTem']]
## Extracting Important columns, we are interested in State_code and state population information that we can join to the 
## Immigration data in the data model

In [28]:
city_dem1.head(2)  ## Because the data was structured by 5-classes of Race we have duplication of State_Codes, and we shall aggregate by State

Unnamed: 0,state_code,male_population,female_population,total_population,foreign_born,AvgTem
0,KS,192354.0,197601.0,389955,40270.0,15.334
240,KS,192354.0,197601.0,389955,40270.0,15.334


In [29]:
state_avg = city_dem1.groupby('state_code').mean().rename(columns={'male_population':'AvgM_Popu', 'female_population':'AvgF_Popu', 
                                            'total_population':'AvgT_Popu', 'foreign_born' : 'AvgForeign_born', 'AvgTem': 'AvgTemp'})

In [30]:
state_avg.head()

Unnamed: 0_level_0,AvgM_Popu,AvgF_Popu,AvgT_Popu,AvgForeign_born,AvgTemp
state_code,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
AK,152945.0,145750.0,298695.0,33258.0,1.01875
AL,94935.75,104793.25,199729.0,9380.0,18.781361
AR,69478.0,70931.0,140409.0,11476.5,18.269444
AZ,215146.222222,218063.777778,433210.0,68431.777778,22.202926
CA,146524.322581,149489.0,296013.322581,93615.467742,17.352169


In [31]:
state_avg.shape ## State Average population information and State Daily Average Temperature ready to me merged

(42, 5)

In [32]:
state = city_dem1.drop_duplicates('state_code') ## Dropping duplicates from city demographic to extact distinct state_code for MERGE

In [33]:
state.shape

(42, 6)

In [34]:
state_dem = state.merge(state_avg, left_on='state_code', right_index=True)  ## Merging

In [35]:
state_dem = state_dem.drop(columns= ['male_population', 'female_population', 'total_population', 'foreign_born', 'AvgTem'])
## Dropping unwanted columns

In [36]:
state_dem = state_dem.round({'AvgM_Popu':0, 'AvgF_Popu':0, 'AvgT_Popu':0, 'AvgForeign_born':0, 'AvgTemp':3}) # Rounding and formatting

In [37]:
state_dem.head(5)     # Demographic and Temprature data ready for Data model and ETL

Unnamed: 0,state_code,AvgM_Popu,AvgF_Popu,AvgT_Popu,AvgForeign_born,AvgTemp
0,KS,97248.0,100612.0,197860.0,21162.0,14.303
3,TN,150854.0,163886.0,314739.0,27117.0,16.943
4,CT,58562.0,62790.0,121352.0,30852.0,13.267
6,UT,70182.0,68839.0,139021.0,21324.0,12.291
8,CA,146524.0,149489.0,296013.0,93615.0,17.352


In [55]:
code =[]
for i in state_dem['state_code']:
    code.append(str(i))
    
male =[]
for i in state_dem['AvgM_Popu']:
    male.append(int(i))
    
female =[]
for i in state_dem['AvgF_Popu']:
    female.append(int(i))
    
total =[]
for i in state_dem['AvgT_Popu']:
    total.append(int(i))
    
fborn =[]
for i in state_dem['AvgForeign_born']:
    fborn.append(int(i))
    
tempp =[]
for i in state_dem['AvgTemp']:
    tempp.append(i)

In [56]:
gs = pd.DataFrame()

In [57]:
gs['state_code']=code
gs['AvgM_Popu']=male
gs['AvgF_Popu']=female
gs['AvgT_Popu']=total
gs['AvgForeign_born']=fborn
gs['AvgTemp']=tempp

In [58]:
gt.head()

Unnamed: 0,state_code,AvgM_Popu,AvgF_Popu,AvgT_Popu,AvgForeign_born,AvgTemp
0,KS,97248,100612,197860,21162,14.303
1,TN,150854,163886,314739,27117,16.943
2,CT,58562,62790,121352,30852,13.267
3,UT,70182,68839,139021,21324,12.291
4,CA,146524,149489,296013,93615,17.352


In [60]:
gs.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 42 entries, 0 to 41
Data columns (total 6 columns):
state_code         42 non-null object
AvgM_Popu          42 non-null int64
AvgF_Popu          42 non-null int64
AvgT_Popu          42 non-null int64
AvgForeign_born    42 non-null int64
AvgTemp            42 non-null float64
dtypes: float64(1), int64(4), object(1)
memory usage: 2.0+ KB


- City Demographic data with Average Daily Temperature by State is ready for data model

In [61]:
state_demog =gs.drop_duplicates(subset='state_code')

In [62]:
state_demog.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 42 entries, 0 to 41
Data columns (total 6 columns):
state_code         42 non-null object
AvgM_Popu          42 non-null int64
AvgF_Popu          42 non-null int64
AvgT_Popu          42 non-null int64
AvgForeign_born    42 non-null int64
AvgTemp            42 non-null float64
dtypes: float64(1), int64(4), object(1)
memory usage: 2.3+ KB


In [63]:
print(state_demog)

   state_code  AvgM_Popu  AvgF_Popu  AvgT_Popu  AvgForeign_born  AvgTemp
0          KS      97248     100612     197860            21162   14.303
1          TN     150854     163886     314739            27117   16.943
2          CT      58562      62790     121352            30852   13.267
3          UT      70182      68839     139021            21324   12.291
4          CA     146524     149489     296013            93615   17.352
5          CO     118152     120372     238524            28487   10.754
6          NJ      90041      92101     182142            63103   12.534
7          NE     178666     181950     360616            35610   12.559
8          OH     173840     185091     358932            27253   13.106
9          PA     445480     490434     935914           116763   12.942
10         NC     141496     150948     292443            39348   17.449
11         FL     121279     129842     251121            62666   23.741
12         MO     114681     124158     238839     

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [65]:
# Write code here
## connect to the default database
conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=student password=student")
conn.set_session(autocommit=True)
cur = conn.cursor()

In [66]:
 # create sparkify database with UTF8 encoding
cur.execute("DROP DATABASE IF EXISTS capstonedb")
cur.execute("CREATE DATABASE capstonedb WITH ENCODING 'utf8' TEMPLATE template0")

In [67]:
# close connection to default database
conn.close() 
# connect to Capstone project database
conn = psycopg2.connect("host=127.0.0.1 dbname=capstonedb user=student password=student")
cur = conn.cursor()

In [68]:
drop1 = "DROP TABLE IF EXISTS immigration_table "
drop2 = "DROP TABLE IF EXISTS stateDem_table "
drop_table = [drop1,drop2]
for query in drop_table:
    cur.execute(query)
    conn.commit()

In [69]:
immig_table= (""" CREATE TABLE IF NOT EXISTS immigration_table (
                                                immig_id BIGINT PRIMARY KEY,
                                                i94cit int,
                                                i94res int,
                                                i94port varchar, 
                                                arrivedate date,
                                                i94mode int, 
                                                state_code varchar NOT NULL,
                                                departdate date, 
                                                i94bir int,
                                                i94visa int, 
                                                dateOnfile date,
                                                visapost varchar, 
                                                arrive_flag varchar,
                                                depart_flag varchar, 
                                                ADmatch_flag varchar,
                                                biryear BIGINT, 
                                                dateAdmtd date,
                                                gender varchar, 
                                                airline varchar,
                                                admnum BIGINT,
                                                fltno varchar )
""")


state_dem_table = (""" CREATE TABLE IF NOT EXISTS stateDem_table(
                                                state_code varchar PRIMARY KEY,
                                                AvgM_Popu BIGINT,
                                                AvgF_Popu BIGINT,
                                                AvgT_Popu BIGINT,
                                                AvgForeign_born BIGINT,
                                                AvgTemp float )
""")

create_table = [immig_table, state_dem_table]

In [70]:
for query in create_table:
    cur.execute(query)
    conn.commit()

In [71]:
immig_insert = (""" INSERT INTO immigration_table (
                                immig_id, i94cit,  i94res, i94port, arrivedate, i94mode, state_code, 
                                departdate, i94bir, i94visa, dateOndfile, visapost, arrive_flag, depart_flag, 
                                ADmatch_flag, biryear, dateAdmtd, gender, airline, admnum, fltno )                                          
                    VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT DO NOTHING
                                            """)

state_dem_insert = (""" INSERT INTO stateDem_table (
                                    state_code, AvgM_Popu, AvgF_Popu, AvgT_Popu, 
                                    AvgForeign_born, AvgTemp) 
                        VALUES(%s, %s, %s, %s, %s, %s)
                        ON CONFLICT DO NOTHING
                                            """)



In [72]:
cur.execute(state_dem_insert, state_demog)

KeyError: 0

In [159]:
cur.execute(immig_insert, immig)

SystemError: Objects/tupleobject.c:81: bad argument to internal function

In [64]:
conn.close()

NameError: name 'conn' is not defined

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.