# Tests on final Structure
- Read final parquet files
- Execute SQL queries to prove that the concept works

In [5]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,col
from datetime import datetime, timedelta
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek, quarter, dayofyear
import inspect
import sql_queries
import time

In [11]:
def create_spark_session():
    '''
        - Builds a SparkSession with the hadoop-aws package listed under
          "spark.jars.packages"
        - Returns the session obtained from getOrCreate()
    '''
    configured_builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return configured_builder.getOrCreate()

In [16]:
# Session handle used by the read and query cells of this notebook.
spark = create_spark_session()

In [13]:
# Directory holding the parquet datasets read by this notebook; change it
# here once instead of editing every table path.
WAREHOUSE_DIR = "/home/workspace/spark-warehouse"

# Filesystem path for each table (one parquet dataset per table).
dim_arrival_date_path = f"{WAREHOUSE_DIR}/arrival_date.parquet"
dim_arrival_location_path = f"{WAREHOUSE_DIR}/arrival_location.parquet"
dim_demographics_path = f"{WAREHOUSE_DIR}/demographics.parquet"
dim_junk_visa_transport_path = f"{WAREHOUSE_DIR}/junk_visa_transport.parquet"
dim_origin_country_path = f"{WAREHOUSE_DIR}/origin_country.parquet"
fact_immigration_path = f"{WAREHOUSE_DIR}/immigration.parquet"

In [14]:
# Maps each temp-view (table) name to the parquet path it is loaded from.
schema_dict = dict(
    dim_arrival_date=dim_arrival_date_path,
    dim_arrival_location=dim_arrival_location_path,
    dim_demographics=dim_demographics_path,
    dim_junk_visa_transport=dim_junk_visa_transport_path,
    dim_origin_country=dim_origin_country_path,
    fact_immigration=fact_immigration_path,
)

In [17]:
# Load every parquet dataset and expose it to Spark SQL as a temp view
# named after its table.
for table_name, parquet_path in schema_dict.items():
    spark.read.parquet(parquet_path).createOrReplaceTempView(table_name)
    print(f"{table_name} is ready")

dim_arrival_date is ready
dim_arrival_location is ready
dim_demographics is ready
dim_junk_visa_transport is ready
dim_origin_country is ready
fact_immigration is ready


In [23]:
# Top 5 cities with the most visitors (all time).
top_cities_sql = """select dal.port_name as city, count(*) visitor_amount
    from fact_immigration fi 
    join dim_arrival_location dal 
    on fi.arrival_location_id=dal.id 
    group by dal.port_name
    order by count(*) desc limit 5"""
spark.sql(top_cities_sql).show()

+-------------+--------------+
|         City|visitor_amount|
+-------------+--------------+
|     NEW YORK|        432355|
|        MIAMI|        320748|
|  LOS ANGELES|        277364|
|      ORLANDO|        142679|
|SAN FRANCISCO|        129347|
+-------------+--------------+



In [40]:
# Which day of the week in April 2016 has the most arrivals in New York?
# The city and month/year filters live in the join conditions.
busiest_weekday_sql = """select dayofweek_name as arrival_day, dal.port_name as city, count(*) visitor_amount
    from fact_immigration fi 
    join dim_arrival_location dal 
    on fi.arrival_location_id=dal.id and dal.port_name='NEW YORK'
    join dim_arrival_date dad on fi.arrival_date_id=dad.arrdate and dad.month=4 and dad.year=2016
    group by dal.port_name,dayofweek_name
    order by count(*) desc limit 1"""
spark.sql(busiest_weekday_sql).show()

+-----------+--------+--------------+
|arrival_day|    city|visitor_amount|
+-----------+--------+--------------+
|        Sat|NEW YORK|         79359|
+-----------+--------+--------------+



In [39]:
# Top 5 citizenships by number of visits to the US.
top_citizenships_sql = """select doc.country_name, count(*) 
    from fact_immigration fi 
    join dim_origin_country doc
    on fi.country_id_citizenship = doc.country_id 
    group by doc.country_name
    order by count(*) desc
    limit 5
    """
spark.sql(top_citizenships_sql).show()

+--------------------+--------+
|        country_name|count(1)|
+--------------------+--------+
|      UNITED KINGDOM|  295508|
|MEXICO Air Sea, a...|  165350|
|              FRANCE|  155365|
|          CHINA, PRC|  140794|
|              BRAZIL|  116886|
+--------------------+--------+



In [41]:
# Top 5 cities most visited by UNITED KINGDOM citizens.
uk_top_cities_sql = """select dal.port_name as city, count(*) 
    from fact_immigration fi 
    join dim_origin_country doc
    on fi.country_id_citizenship = doc.country_id and doc.country_name='UNITED KINGDOM'
    join dim_arrival_location dal
    on fi.arrival_location_id=dal.id
    group by dal.port_name
    order by count(*) desc
    limit 5
    """
spark.sql(uk_top_cities_sql).show()

+-----------+--------+
|       city|count(1)|
+-----------+--------+
|   NEW YORK|   63826|
|    ORLANDO|   54836|
|  LAS VEGAS|   25522|
|LOS ANGELES|   23463|
|      MIAMI|   21915|
+-----------+--------+



In [52]:
# Top 5 cities most visited by CHINA, PRC citizens, together with the share
# (in percent, rounded to 2 decimals) of Asian population in each city.
china_top_cities_sql = """select doc.country_name as citizenship, 
    dal.port_name as city, 
    count(*) amount_vistor, 
    round((dd.asian_population/dd.total_population)*100,2) as ratio_asian_population
    from fact_immigration fi 
    join dim_origin_country doc
    on fi.country_id_citizenship = doc.country_id and doc.country_name='CHINA, PRC'
    join dim_arrival_location dal
    on fi.arrival_location_id=dal.id
    join dim_demographics dd
    on fi.arrlocation_demographics_id = dd.id
    group by doc.country_name,dal.port_name,(dd.asian_population/dd.total_population)*100 
    order by count(*) desc
    limit 5
    """
spark.sql(china_top_cities_sql).show()

+-----------+-------------+-------------+----------------------+
|citizenship|         city|amount_vistor|ratio_asian_population|
+-----------+-------------+-------------+----------------------+
| CHINA, PRC|  LOS ANGELES|        31309|                 12.92|
| CHINA, PRC|     NEW YORK|        19926|                 15.26|
| CHINA, PRC|SAN FRANCISCO|        18905|                 37.47|
| CHINA, PRC|      CHICAGO|        15090|                  7.17|
| CHINA, PRC|      SEATTLE|         7498|                 17.75|
+-----------+-------------+-------------+----------------------+



In [62]:
# Top 5 arrival locations for visitors whose visa type is 'Student'.
student_top_locations_sql = """select visa_type,
    dal.port_name as city, 
    count(*) amount
    from fact_immigration fi 
    join dim_junk_visa_transport djvt 
    on fi.visa_transport_id = djvt.id and visa_type in ('Student')
    join dim_arrival_location dal
    on fi.arrival_location_id = dal.id
    group by visa_type,dal.port_name
    order by count(*) desc
    limit 5
"""
spark.sql(student_top_locations_sql).show()

+---------+-----------+------+
|visa_type|       city|amount|
+---------+-----------+------+
|  Student|   NEW YORK|  5213|
|  Student|LOS ANGELES|  4892|
|  Student|      MIAMI|  2530|
|  Student|    CHICAGO|  2483|
|  Student|     BOSTON|  2055|
+---------+-----------+------+



### some generic tests / IGNORE

In [44]:
# Inspect the demographics dimension.
demographics_preview = spark.sql("select * from dim_demographics")
demographics_preview.show()

+-------------+---------+---------------+--------------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+--------------------------+----------------+---------------------------+-----------------+----------------+
|           id|port_code|           city|         state|state_code|median_age|male_population|female_population|total_population|number_of_veterans|foreign_born|average_household_size|hispanic_latino_population|white_population|african_american_population|native_population|asian_population|
+-------------+---------+---------------+--------------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+--------------------------+----------------+---------------------------+-----------------+----------------+
| 523986010113|      CHL|     Charleston|South Carolina|        SC|      35.0|          63956|            71568|          13

In [20]:
# Inspect ten rows of the arrival-location dimension.
arrival_location_preview = spark.sql("select * from dim_arrival_location limit 10")
arrival_location_preview.show()

+-------------+---------+----------+--------------------+
|           id|port_code|state_code|           port_name|
+-------------+---------+----------+--------------------+
| 197568495616|      MWH|        WA|MOSES LAKE GRANT ...|
|1675037245440|      HSV|        AL|MADISON COUNTY - ...|
|  42949672960|      BWA|        WA|            BOUNDARY|
|  42949672961|      DAB|        FL|DAYTONA BEACH INT...|
|  42949672962|      DET|        MI|             DETROIT|
|  42949672963|      DNA|        TX|               DONNA|
|  42949672964|      GAL|        TX|           GALVESTON|
|  42949672965|      PTK|        MI|OAKLAND COUNTY - ...|
| 300647710720|      ICT|        KS|MID-CONTINENT - W...|
|1683627180032|      LCB|        TX|LAREDO COLUMBIA B...|
+-------------+---------+----------+--------------------+



In [31]:
# Inspect ten rows of the immigration fact table.
immigration_preview = spark.sql("select * from fact_immigration limit 10")
immigration_preview.show()

+-------------+-------------------+---------------+-----------------+----------------------+--------------------+---------------------------+-------------+-------------------+----------------+-------+
|           id|arrival_location_id|arrival_date_id|visa_transport_id|country_id_citizenship|country_id_residence|arrlocation_demographics_id|immigrant_age|immigrant_birthyear|immigrant_gender|airline|
+-------------+-------------------+---------------+-----------------+----------------------+--------------------+---------------------------+-------------+-------------------+----------------+-------+
|1700807049216|      1194000908288|          20547|             1021|                   299|                 299|              1125281431552|           61|               1955|               F|     PD|
|1700807049217|      1194000908288|          20547|             1011|                   299|                 299|              1125281431552|           53|               1963|               M|    

In [68]:
# Inspect the arrival-date dimension.
arrival_date_preview = spark.sql("select * from dim_arrival_date")
arrival_date_preview.show()

+-------+-------------------+----+-----+----------+---------+--------------+---------+----------+-------+
|arrdate|       arrdate_conv|year|month|dayofmonth|dayofweek|dayofweek_name|dayofyear|weekofyear|quarter|
+-------+-------------------+----+-----+----------+---------+--------------+---------+----------+-------+
|  20558|2016-04-14T00:00:00|2016|    4|        14|        5|           Thu|      105|        15|      2|
|  20570|2016-04-26T00:00:00|2016|    4|        26|        3|           Tue|      117|        17|      2|
|  20574|2016-04-30T00:00:00|2016|    4|        30|        7|           Sat|      121|        17|      2|
|  20554|2016-04-10T00:00:00|2016|    4|        10|        1|           Sun|      101|        14|      2|
|  20557|2016-04-13T00:00:00|2016|    4|        13|        4|           Wed|      104|        15|      2|
|  20556|2016-04-12T00:00:00|2016|    4|        12|        3|           Tue|      103|        15|      2|
|  20569|2016-04-25T00:00:00|2016|    4|      

In [53]:
# Inspect the visa/transport junk dimension.
visa_transport_preview = spark.sql("""select * from dim_junk_visa_transport""")
visa_transport_preview.show()

+----+---------+--------------+---------+--------------+
|  id|visa_code|transport_code|visa_type|transport_type|
+----+---------+--------------+---------+--------------+
|1019|        1|             9| Business|  Not reported|
|1029|        2|             9| Pleasure|  Not reported|
|1039|        3|             9|  Student|  Not reported|
|1013|        1|             3| Business|          Land|
|1023|        2|             3| Pleasure|          Land|
|1033|        3|             3|  Student|          Land|
|1011|        1|             1| Business|           Air|
|1021|        2|             1| Pleasure|           Air|
|1031|        3|             1|  Student|           Air|
|1012|        1|             2| Business|           Sea|
|1022|        2|             2| Pleasure|           Sea|
|1032|        3|             2|  Student|           Sea|
+----+---------+--------------+---------+--------------+



In [71]:
# List the distinct (port_code length, state_code length) combinations
# present in the arrival-location dimension.
code_lengths = spark.sql("select distinct length(port_code),length(state_code)  from dim_arrival_location dal")
code_lengths.show()

+-----------------+------------------+
|length(port_code)|length(state_code)|
+-----------------+------------------+
|                3|                 2|
+-----------------+------------------+

