In [1]:
from pyspark.sql import SparkSession
import configparser
import os

In [2]:
# Load the AWS credentials from the local config file and export them as
# environment variables; the Spark session setup reads them from os.environ.
config = configparser.ConfigParser()
# Open the file in a context manager so the handle is closed as soon as
# parsing finishes (the bare open() call leaked it for the kernel lifetime).
with open('aws_credentials.cfg') as credentials_file:
    config.read_file(credentials_file)

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# Build (or reuse) the Spark session with the hadoop-aws package so that
# s3a:// paths can be read.
# The credentials are passed through the documented S3A property names
# (fs.s3a.access.key / fs.s3a.secret.key); the "awsAccessKeyId" style names
# belong to the legacy s3/s3n connectors.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
    .getOrCreate()
)

In [10]:
def register_parquet_view(path, view_name):
    """Read the parquet files at `path` and expose them to Spark SQL.

    Args:
        path: Parquet location (glob patterns allowed), e.g. an s3a:// URI.
        view_name: Name of the temporary view to create or replace.

    Returns:
        The loaded DataFrame. createOrReplaceTempView() itself returns None,
        so the DataFrame has to be captured before the view is registered.
    """
    df = spark.read.parquet(path)
    df.createOrReplaceTempView(view_name)
    return df


# Each *_df name now holds the actual DataFrame (previously they were all None
# because the result of createOrReplaceTempView() was being assigned).
country_codes_df = register_parquet_view(
    's3a://udac-capstone-bucket/outputs/country_codes_table/*.parquet', 'country_codes')
travel_mode_df = register_parquet_view(
    's3a://udac-capstone-bucket/outputs/travel_mode_table/*.parquet', 'travel_mode')
us_port_df = register_parquet_view(
    's3a://udac-capstone-bucket/outputs/us_port_table/*.parquet', 'us_port')
us_state_codes_df = register_parquet_view(
    's3a://udac-capstone-bucket/outputs/us_state_codes_table/*.parquet', 'us_state_codes')
visa_category_df = register_parquet_view(
    's3a://udac-capstone-bucket/outputs/visa_category_table/*.parquet', 'visa_category')
# Only the i94yr=2016 / i94mon=4 partition of the immigrants table is loaded.
immigrants_df = register_parquet_view(
    's3a://udac-capstone-bucket/outputs/immigrants_table/i94yr=2016/i94mon=4/*.parquet', 'immigrants')
us_airport_codes_df = register_parquet_view(
    's3a://udac-capstone-bucket/outputs/us_airport_codes_table/*/*.parquet', 'us_airport_codes')

In [17]:
# Show the top 10 countries from which people immigrated to the US during Apr-2016.
# Uses an explicit INNER JOIN so the join condition is kept apart from filters,
# and a secondary sort on country so ties in the count yield a stable top 10.
spark.sql("""
    SELECT cc.country,
           count(*) AS immigrants_count
    FROM   immigrants i
           INNER JOIN country_codes cc
                   ON i.i94cit = cc.country_Code
    GROUP BY cc.country
    ORDER BY immigrants_count DESC, cc.country
    LIMIT 10
""").show(truncate=False)

+---------------------------------------------------------+----------------+
|country                                                  |immigrants_count|
+---------------------------------------------------------+----------------+
|UNITED KINGDOM                                           |360157          |
|JAPAN                                                    |206873          |
|CHINA, PRC                                               |191425          |
|FRANCE                                                   |188766          |
|MEXICO Air Sea, and Not Reported (I-94, no land arrivals)|175781          |
|BRAZIL                                                   |129833          |
|INDIA                                                    |110691          |
|AUSTRALIA                                                |109884          |
|ITALY                                                    |78535           |
|NETHERLANDS                                              |76920           |

In [19]:
# Show the number of people who came to the US for tourism during Apr-2016.
# Rows are matched to their visa category with an explicit INNER JOIN and then
# restricted to the 'Pleasure' category.
spark.sql("""
    SELECT vc.category,
           count(*) AS tourists_count
    FROM   immigrants i
           INNER JOIN visa_category vc
                   ON i.i94visa = vc.category_Code
    WHERE  vc.category = 'Pleasure'
    GROUP BY vc.category
""").show(truncate=False)

+--------+--------------+
|category|tourists_count|
+--------+--------------+
|Pleasure|2530868       |
+--------+--------------+

