# Data Engineering Capstone Project


In [25]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [26]:
# Do all imports and installs here
import sys, os
import logging
import pandas as pd
from pandas_profiling import ProfileReport
from pathlib import Path
from typing import Iterable
from IPython import display as ICD

In [27]:
# Put the project's source directory on the import path so that the
# project-local packages imported in the next cell can be found.
src_path: str = "../src"
# Guarded so that re-running the cell does not append duplicate entries.
if src_path not in sys.path:
    sys.path.append(src_path)
# Let INFO-level records from the root logger show up in cell outputs.
logging.getLogger().setLevel(logging.INFO)

In [28]:
from utils.io import process_config
from utils.aws import create_s3_bucket
from utils.spark import create_spark_session
from data.tables import (
    ON_LOAD_TABLES_SCHEMA,
    ON_LOAD_TABLES_FILES,
    STAR_EXTRACT_TABLES_ARGS,
)

In [29]:
# Both configuration files are looked up one directory above the current
# working directory.
config_dir = Path.cwd().parent
user_config = process_config(config_dir / "_user.cfg")
dl_config = process_config(config_dir / "dl.cfg")

# Single Spark session built from both configurations and reused below.
spark = create_spark_session(user_config, dl_config)
# Value stored under S3 / BUCKET_NAME in the data-lake configuration.
s3_bucket_prefix = dl_config.get("S3", "BUCKET_NAME")

---

## 1. Preview raw data


In [30]:
# Preview every raw source table: read its CSV file(s) with the declared
# schema, display the first five rows and report the total record count.
for table_name, table_schema in ON_LOAD_TABLES_SCHEMA.items():
    table_paths = ON_LOAD_TABLES_FILES[table_name]
    # A single path must be handed to Spark as one string. `str` is itself an
    # Iterable, so it has to be ruled out explicitly; otherwise a plain string
    # path would be exploded into a list of single characters.
    if isinstance(table_paths, (str, os.PathLike)) or not isinstance(
        table_paths, Iterable
    ):
        csv_paths = str(table_paths)
    else:
        csv_paths = [str(p) for p in table_paths]

    table_df = spark.read.csv(csv_paths, schema=table_schema, header=True)

    # count() runs over the whole table, not only the previewed rows.
    n_elem = table_df.count()
    # Only five rows are collected and converted to pandas for display.
    table_df_preview = spark.createDataFrame(
        table_df.take(5),
        schema=table_schema,
    ).toPandas()

    print(f"First 5 rows of {table_name}:")
    print(f"Columns: {table_df.columns}.")
    ICD.display(table_df_preview)
    print(f"The full table contains a total of {n_elem} records\n\n")

                                                                                

First 5 rows of i94_immigration:
Columns: ['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline', 'admnum', 'fltno', 'visatype'].


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,1.0,2016.0,7.0,254.0,276.0,LOS,20636.0,1.0,CA,20640.0,...,,M,1978.0,9282016.0,M,,OZ,63092900000.0,202,WT
1,2.0,2016.0,7.0,140.0,140.0,NYC,20636.0,1.0,NY,20657.0,...,,M,1971.0,9282016.0,F,,DL,63092900000.0,9858,WT
2,3.0,2016.0,7.0,135.0,135.0,ORL,20636.0,1.0,FL,20657.0,...,,M,2006.0,9282016.0,M,,VS,63092900000.0,71,WT
3,4.0,2016.0,7.0,124.0,124.0,TAM,20636.0,1.0,FL,20645.0,...,,M,1999.0,9282016.0,M,,LH,63092900000.0,482,WT
4,5.0,2016.0,7.0,130.0,130.0,LOS,20636.0,1.0,CA,20662.0,...,,M,2015.0,9282016.0,M,,SU,63092900000.0,106,WT


The full table contains a total of 40790529 records


First 5 rows of us_demographics:
Columns: ['City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'State Code', 'Race', 'Count'].


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.799999,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.599998,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


The full table contains a total of 2891 records


First 5 rows of airport_codes:
Columns: ['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country', 'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code', 'coordinates'].


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


The full table contains a total of 55075 records




                                                                                

First 5 rows of world_temperature:
Columns: ['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country', 'Latitude', 'Longitude'].


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1744-04-01,5.788,3.624,Århus,Denmark,57.05N,10.33E
2,1744-05-01,10.644,1.283,Århus,Denmark,57.05N,10.33E
3,1744-06-01,14.051,1.347,Århus,Denmark,57.05N,10.33E
4,1744-07-01,16.082001,1.396,Århus,Denmark,57.05N,10.33E


The full table contains a total of 8235082 records




---

## 2. Run ETL pipeline to extract STAR dimensional tables


Create S3 bucket to store all results


In [31]:
# Raise explicitly instead of using `assert`: assert statements are removed when
# Python runs with -O, which would silently skip the bucket creation as well.
if not create_s3_bucket(user_config, dl_config):
    raise RuntimeError("Error creating S3 bucket.")

INFO:root:Bucket cupm-de-capstone already exists.
INFO:root:Available buckets: [s3.Bucket(name='cupm-de-capstone')]


#### Run Airflow DAG (`capstone_etl`) now.


---

## 3. Run analytics queries on dimensional tables


In [32]:
# Directory that receives one HTML profiling report per dimensional table.
profiling_path = Path("../data").joinpath("profiling_reports")
# parents=True also creates ../data when it does not exist yet;
# exist_ok=True keeps the cell safe to re-run.
profiling_path.mkdir(parents=True, exist_ok=True)

In [33]:
def _load_star_table(table_args):
    """Read one table from the parquet path stored under op_kwargs/s3_save_path."""
    return spark.read.parquet(table_args["op_kwargs"]["s3_save_path"])


# Map every table name in STAR_EXTRACT_TABLES_ARGS to its Spark DataFrame.
star_tables = {
    name: _load_star_table(args) for name, args in STAR_EXTRACT_TABLES_ARGS.items()
}

                                                                                

### 3.1. Data profiling of dimensional tables

WARNING: Profiling loads each table into driver memory as a pandas DataFrame. Skip tables with tens of millions of rows unless enough memory is available.


In [34]:
# Tables excluded from profiling; every profiled table is converted to pandas.
skipped_tables = ("fact_immigration",)
for table_name, table_df in star_tables.items():
    if table_name not in skipped_tables:
        report_file = profiling_path.joinpath(f"{table_name}.html")
        star_table = table_df.toPandas()
        # One standalone HTML report per table, named after the table.
        ProfileReport(star_table).to_file(report_file)

                                                                                

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

  return func(*args, **kwargs)
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out


Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

                                                                                

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

INFO:py4j.clientserver:Error while sending or receiving.
Traceback (most recent call last):
  File "/home/uziel/miniconda3/envs/de_capstone/lib/python3.10/site-packages/py4j/clientserver.py", line 503, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [Errno 104] Connection reset by peer
INFO:py4j.clientserver:Closing down clientserver connection
INFO:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/uziel/miniconda3/envs/de_capstone/lib/python3.10/site-packages/py4j/clientserver.py", line 503, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/uziel/miniconda3/envs/de_capstone/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/uziel/mi

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

                                                                                

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

  return func(*args, **kwargs)


Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

                                                                                

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

  return func(*args, **kwargs)


Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

### 3.2. Example queries using combinations of dimensional tables


#### Do immigrants prefer destinations with higher or lower population?


In [35]:
# Immigration records per destination city, paired with that city's population.
immigrants_per_city = star_tables["fact_immigration"].groupBy("city_id").count()
df = (
    immigrants_per_city
    # Join on the shared column name so the key is resolved against the two
    # frames actually being joined, not against the pre-aggregation fact table.
    # NOTE(review): if fact_us_demogr holds several rows per city_id, each
    # city's count is repeated once per row; confirm the table grain.
    .join(star_tables["fact_us_demogr"], on="city_id")
    .dropna(subset=["total_population"])
    .select(["count", "total_population"])
)

In [36]:
# Correlation coefficient between per-city record count and population.
df.corr("count", "total_population")

                                                                                

0.7699218219771018

There is a high positive correlation (r ≈ 0.77) between the population size of a city and the number of immigrants it attracts.


#### Do immigrants prefer destinations with higher or lower temperature?


In [37]:
# Immigration records per destination city, paired with average temperature.
immigrants_per_city = star_tables["fact_immigration"].groupBy("city_id").count()
df = (
    immigrants_per_city
    # Join on the shared column name so the key is resolved against the two
    # frames actually being joined, not against the pre-aggregation fact table.
    # NOTE(review): if fact_temps holds several rows per city_id, each city's
    # count is repeated once per row; confirm the table grain.
    .join(star_tables["fact_temps"], on="city_id")
    .dropna(subset=["avg_temperature"])
    .select(["count", "avg_temperature"])
)

In [38]:
# Correlation coefficient between per-city record count and average temperature.
df.corr("count", "avg_temperature")

                                                                                

-0.044231400827696755

There is no meaningful linear correlation (r ≈ -0.04) between the number of immigrants and the average temperature of a city.


#### Do immigrants prefer destinations with more or fewer airports?


In [39]:
# Immigration records per destination city.
immigrants_per_city = star_tables["fact_immigration"].groupBy("city_id").count()
# Number of airports per city; renamed so it does not clash with the
# immigration "count" column after the join.
airports_per_city = (
    star_tables["dim_airports"]
    .groupBy("city_id")
    .count()
    .withColumnRenamed("count", "airports_count")
)
# Join on the shared column name: the previous condition referenced columns of
# the un-aggregated parent frames rather than the two aggregates being joined.
df = immigrants_per_city.join(airports_per_city, on="city_id").select(
    ["count", "airports_count"]
)

In [40]:
# Correlation coefficient between per-city record count and airport count.
df.corr("count", "airports_count")

                                                                                

0.37289342731132336

There is a low, positive correlation (r ≈ 0.37) between the number of immigrants and the number of airports in the receiving city.