In [286]:
import os
import configparser
import boto3
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
# from signal import signal, SIGPIPE, SIG_DFL
from pyspark.sql.functions import col, monotonically_increasing_id, udf, to_date, count, lit, when
from pyspark.sql.types import (StructType,
                               StructField,
                               StringType,
                               IntegerType,
                               DoubleType,
                               DateType,
                               FloatType)

# create local spark session

In [210]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("spark_emr_udactity") \
    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .getOrCreate()

# Check spark session information

In [211]:
spark

In [318]:
# Stop spark session if I don't need it.
spark.stop()

In [5]:
# spark session setting configuration
spark.conf.set("spark.sql.shuffle.partitions", "5")
spark.sparkContext.getConf().getAll()

[('spark.files',
  'file:///Users/oneforall_nick/.ivy2/jars/saurfang_spark-sas7bdat-2.0.0-s_2.11.jar,file:///Users/oneforall_nick/.ivy2/jars/com.epam_parso-2.0.8.jar,file:///Users/oneforall_nick/.ivy2/jars/org.apache.logging.log4j_log4j-api-scala_2.11-2.7.jar,file:///Users/oneforall_nick/.ivy2/jars/org.slf4j_slf4j-api-1.7.5.jar,file:///Users/oneforall_nick/.ivy2/jars/org.scala-lang_scala-reflect-2.11.8.jar'),
 ('spark.app.name', 'spark_emr_udactity'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', '192.168.1.104'),
 ('spark.jars.packages', 'saurfang:spark-sas7bdat:2.0.0-s_2.11'),
 ('spark.submit.pyFiles',
  '/Users/oneforall_nick/.ivy2/jars/saurfang_spark-sas7bdat-2.0.0-s_2.11.jar,/Users/oneforall_nick/.ivy2/jars/com.epam_parso-2.0.8.jar,/Users/oneforall_nick/.ivy2/jars/org.apache.logging.log4j_log4j-api-scala_2.11-2.7.jar,/Users/oneforall_nick/.ivy2/jars/org.slf4j_slf4j-api-1.7.5.jar,/Users/oneforall_nick/.ivy2/jars/org.scala-lang_scala-reflect-2.11.8.jar'),
 ('spark.jars',


# Access AWS S3 to get my source data After I upload data from local to AWS S3

In [212]:
# ***** Access AWS Cloud configure ************
config = configparser.ConfigParser()
config.read_file(open('/Users/oneforall_nick/workspace/Udacity_capstone_project/cfg/dl.cfg'))
# config.read_file(open('dl.cfg'))

aws_access_key = config["ACCESS"]["AWS_ACCESS_KEY_ID"]
aws_secret_access_key = config["ACCESS"]["AWS_SECRET_ACCESS_KEY"]
aws_token = config["ACCESS"]["AWS_TOKEN"]
# Access data from AWS S3
# SOURCE_S3_BUCKET = config['S3']['SOURCE_S3_BUCKET']
SOURCE_S3_BUCKET = 's3://mydatapool'
# Write data to AWS S3
# DEST_S3_BUCKET = config['S3']['DEST_S3_BUCKET']
DEST_S3_BUCKET = 's3://destetlbucket'
# *********************************************

# ***** Local Testing configure ************
# SOURCE_S3_BUCKET = '/Users/oneforall_nick/workspace/Udacity_capstone_project/airflow/'
# DEST_S3_BUCKET = '/Users/oneforall_nick/workspace/Udacity_capstone_project/airflow/dest_data'

# ***** Local Testing configure *****************

session = boto3.Session(
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_access_key,
    aws_session_token=aws_token
)

s3_access = session.resource('s3')


# Dimension: Label Data
*Data format: <br>*
    TXT
*This step will seperate multiple tables:*
- imm_cit_res
- imm_port
- imm_mod
- imm_addr
- imm_visa

In [213]:
# ****** immigration_labels_descriptions ******

# Get AWS S3 data Object: immigration_labels_descriptions.SAS
s3_object = s3_access.Bucket('mydatapool').Object('data/immigration_data/immigration_labels_descriptions.SAS').get()
text = s3_object['Body'].read()
context = text.decode(encoding ='utf-8')
# for obj in s3_object.objects.all():
#     print(obj.key)

context = context.replace('\t', '')


def code_mapping(context, idx):
    content_mapping = context[context.index(idx):]
    content_line_split = content_mapping[:content_mapping.index(
        ';')].split('\n')
    content_line_list = [line.replace("'", "")
                         for line in content_line_split]
    content_two_dims = [i.strip().split('=') for i in content_line_list[1:]]
    content_three_dims = [[i[0].strip(), i[1].strip().split(', ')[:][0], e]
                          for i in content_two_dims if len(i) == 2 for e in i[1].strip().split(', ')[1:]]
    return content_two_dims, content_three_dims

imm_cit_res_two, imm_cit_res_three = code_mapping(context, "i94cntyl")
df_imm_city_res_label = spark.sparkContext.parallelize(imm_cit_res_three).toDF(["col_of_imm_cntyl", "value_of_imm_cntyl", "value_of_imm_cntyl_organizations"]) \
    .withColumn("col_of_imm_cntyl", col("col_of_imm_cntyl").cast("Integer")) \
    .withColumn("value_of_imm_cntyl", col("value_of_imm_cntyl").cast("String")) \
    .withColumn("value_of_imm_cntyl", col("value_of_imm_cntyl_organizations").cast("String")) \
    .select('col_of_imm_cntyl', 'value_of_imm_cntyl').distinct().dropDuplicates(['col_of_imm_cntyl'])

df_imm_city_res_label.show()

imm_port_two, imm_port_three = code_mapping(context, "i94prtl")
df_imm_destination_city = spark.sparkContext.parallelize(imm_port_three).toDF(["code_of_imm_destination_city", "value_of_imm_destination_city", "value_of_alias_imm_destination_city"]) \
                                                .withColumn("code_of_imm_destination_city", col("code_of_imm_destination_city").cast("String")) \
                                                .withColumn("value_of_imm_destination_city", col("value_of_imm_destination_city").cast("String")) \
                                                .withColumn("value_of_alias_imm_destination_city", col("value_of_alias_imm_destination_city").cast("String")).distinct().dropDuplicates(['code_of_imm_destination_city'])

df_imm_destination_city.show()


imm_mode_two, imm_mode_three = code_mapping(context, "i94model")
df_imm_travel_code = spark.sparkContext.parallelize(imm_mode_two).toDF(["code_of_imm_travel_code", "value_of_imm_travel_code"]) \
                                           .withColumn("code_of_imm_travel_code", col("code_of_imm_travel_code").cast("Float")) \
                                           .withColumn("value_of_imm_travel_code", col("value_of_imm_travel_code").cast("String")).distinct().dropDuplicates(['code_of_imm_travel_code'])
df_imm_travel_code.show()

imm_addr_two, imm_addr_three = code_mapping(context, "i94addrl")
df_imm_address = spark.sparkContext.parallelize(imm_addr_two).toDF(["code_of_imm_address", "value_of_imm_address"]) \
    .withColumn("code_of_imm_address", col("code_of_imm_address").cast("String")) \
    .withColumn("value_of_imm_address", col("value_of_imm_address").cast("String")).distinct().dropDuplicates(['code_of_imm_address'])

df_imm_address.show()

imm_visa = {'1': 'Business',
            '2': 'Pleasure',
            '3': 'Student'}

df_imm_visa = spark.sparkContext.parallelize(imm_visa.items()).toDF(["code_of_imm_visa", "value_of_imm_visa"]) \
                                    .withColumn("code_of_imm_visa", col("code_of_imm_visa").cast("Integer")) \
                                    .withColumn("value_of_imm_visa", col("value_of_imm_visa").cast("String")).distinct().dropDuplicates(['code_of_imm_visa'])
df_imm_visa.show()

                                                                                

+----------------+------------------+
|col_of_imm_cntyl|value_of_imm_cntyl|
+----------------+------------------+
|             471|          NORTHERN|
|             473|    FED. STATES OF|
|             245|               PRC|
|             582| no land arrivals)|
|             717|              SABA|
+----------------+------------------+

+----------------------------+-----------------------------+-----------------------------------+
|code_of_imm_destination_city|value_of_imm_destination_city|value_of_alias_imm_destination_city|
+----------------------------+-----------------------------+-----------------------------------+
|                         AGA|                        AGANA|                                 GU|
|                         ALC|                        ALCAN|                                 AK|
|                         ANZ|                    ANZALDUAS|                                 TX|
|                         BAL|                    BALTIMORE|               

In [216]:
# Source Data Count:
print(f"dim table: df_imm_city_res_label, Count: {df_imm_city_res_label.count():,}")

# TODO:OK: three columns
print(
    f"dim table: df_imm_destination_city, Count: {df_imm_destination_city.count():,}")

# TODO:OK: two columns
print(
    f"dim table: df_imm_travel_code, Count: {df_imm_travel_code.count():,}")

# TODO:OK: two columns
print(f"dim table: df_imm_address, Count: {df_imm_address.count():,}")

# TODO:OK: two columns
print(f"dim table: df_imm_visa, Count: {df_imm_visa.count():,}")


dim table: df_imm_city_res_label, Count: 5
dim table: df_imm_destination_city, Count: 582
dim table: df_imm_travel_code, Count: 4
dim table: df_imm_address, Count: 55
dim table: df_imm_visa, Count: 3


# Dimension: News
- Data format: <br>
    CSV
- explain: <br>
    display data persist in local memory

In [222]:
# file path: data >> news_article
"""Table: news_article schema
pk: cord_uid -> news_cord_uid
1. source_x -> news_source
    schema: StringType()
2. title -> news_title
    schema: StringType()
3. license -> news_licence
    schema: StringType()
4. abstract -> news_abstract
    schema: StringType()
5. publish_time -> news_publish_time (fk)
    schema: TimestampType()
6. authors -> news_authors
    schema: StringType()
7. url -> news_url
    schema: StringType()
"""
data_news = "/Users/oneforall_nick/workspace/Udacity_capstone_project/airflow/data/news_data/metadata.csv"

df_news = spark.read.options(header=True, delimiter=',').csv(path=data_news)

df_news = df_news.withColumn("news_cord_uid", col("cord_uid").cast("String")) \
    .withColumn("news_source", col("source_x").cast("String")) \
    .withColumn("news_title", col("title").cast("String")) \
    .withColumn("news_licence", col("license").cast("String")) \
    .withColumn("news_abstract", col("abstract").cast("String")) \
    .withColumn("news_publish_time", to_date(col("publish_time"), "yyyy-MM-dd")) \
    .withColumn("news_authors", col("authors").cast("String")) \
    .withColumn("news_url", col("url").cast("String")) \
    .select(col("news_cord_uid"),
            col("news_source"),
            col("news_title"),
            col("news_licence"),
            col("news_abstract"),
            col("news_publish_time"),
            col("news_authors"),
            col("news_url")).distinct().dropDuplicates(['news_cord_uid'])


df_news_tmp = df_news.createOrReplaceTempView("news_article_data")

df_news_tmp = spark.sql("SELECT * FROM news_article_data")

df_news_tmp.persist()

df_news_tmp.explain()

# df_news_tmp.unpersist()

== Physical Plan ==
InMemoryTableScan [news_cord_uid#10480, news_source#10499, news_title#10519, news_licence#10540, news_abstract#10562, news_publish_time#10585, news_authors#10609, news_url#10634]
   +- InMemoryRelation [news_cord_uid#10480, news_source#10499, news_title#10519, news_licence#10540, news_abstract#10562, news_publish_time#10585, news_authors#10609, news_url#10634], StorageLevel(disk, memory, 1 replicas)
         +- SortAggregate(key=[news_cord_uid#6406], functions=[first(news_source#6425, false), first(news_title#6445, false), first(news_licence#6466, false), first(news_abstract#6488, false), first(news_publish_time#6511, false), first(news_authors#6535, false), first(news_url#6560, false)])
            +- *(3) Sort [news_cord_uid#6406 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(news_cord_uid#6406, 5)
                  +- SortAggregate(key=[news_cord_uid#6406], functions=[partial_first(news_source#6425, false), partial_first(news_title#6445, f

22/06/27 13:45:07 WARN CacheManager: Asked to cache already cached data.


In [218]:
df_news.show(n=3, truncate=10)



+-------------+-----------+----------+------------+-------------+-----------------+------------+----------+
|news_cord_uid|news_source|news_title|news_licence|news_abstract|news_publish_time|news_authors|  news_url|
+-------------+-----------+----------+------------+-------------+-----------------+------------+----------+
|    A.; Ke...|       null|      null|  https:/...|         null|             null|        null|      null|
|     000tfenb|        PMC|Prevale...|       cc-by|   BACKGRO...|       2017-11-22|  Liu, Pe...|https:/...|
|     00bjha6b|        PMC|Cystiti...|         unk|   Female ...|       2009-11-27|  Munday,...|https:/...|
+-------------+-----------+----------+------------+-------------+-----------------+------------+----------+
only showing top 3 rows



                                                                                

In [317]:
# df_news.filter('news_title rlike "covid"').select(col('news_abstract')).dropna().show(truncate=False)

# df_news.filter((col('news_title').rlike("covid"))).show(truncate=False)

# df_news.withColumn("label_covid_for_ml", when(((df_news.news_title.rlike("covid")) | (df_news.news_abstract.isNull()), "Covid_News_Abstract_Null"))).show()

# TODO: How to use tables: Point 1
# 1. Filter news articles title and abstract
# 2. Label news title and abstract related articles that were matched covid word for user to split news_abstract words and language detect.
df_news.withColumn("label_covid_for_ml", when( (df_news.news_title.rlike("covid")) | (df_news.news_abstract.isNull()), "Covid_News_Abstract_Null")\
                   .when((df_news.news_title.rlike("covid")) | (df_news.news_abstract.isNotNull()), "Covid_News_Abstract")
                                        .otherwise("Other News")).filter(col('news_publish_time').isNotNull()).sort(col('news_publish_time').desc()).show()

# TODO: How to use tables: Point 2



[Stage 851:>                                                        (0 + 5) / 5]

+-------------+-----------+--------------------+------------+--------------------+-----------------+--------------------+--------------------+-------------------+
|news_cord_uid|news_source|          news_title|news_licence|       news_abstract|news_publish_time|        news_authors|            news_url| label_covid_for_ml|
+-------------+-----------+--------------------+------------+--------------------+-----------------+--------------------+--------------------+-------------------+
|     bpfarowa|   Elsevier|Chapter 2 Anatomi...|   els-covid|Abstract Mucosal ...|       2020-12-31|Silva-Sanchez, Aa...|https://doi.org/1...|Covid_News_Abstract|
|     ytk5s6m8|   Elsevier|Chapter 7 Coronav...|   els-covid|Abstract The coro...|       2020-12-31|Kasmi, Yassine; K...|https://doi.org/1...|Covid_News_Abstract|
|     cydvmm4r|   Elsevier|Chapter 20 Design...|   els-covid|Abstract Once the...|       2020-12-31|Slezak, Tom; Hart...|https://doi.org/1...|Covid_News_Abstract|
|     1mcgocvb|   Else

                                                                                

In [219]:
df_news.printSchema()

root
 |-- news_cord_uid: string (nullable = true)
 |-- news_source: string (nullable = true)
 |-- news_title: string (nullable = true)
 |-- news_licence: string (nullable = true)
 |-- news_abstract: string (nullable = true)
 |-- news_publish_time: date (nullable = true)
 |-- news_authors: string (nullable = true)
 |-- news_url: string (nullable = true)



In [223]:
f"df_news source count: {df_news.count():,}" # drop duplicates column: uid not yet

                                                                                

45805

In [224]:
spark.sql("select news_cord_uid, count(news_cord_uid) from news_article_data group by news_cord_uid having count(news_cord_uid) > 1").show(
    truncate=False)

+-------------+--------------------+
|news_cord_uid|count(news_cord_uid)|
+-------------+--------------------+
+-------------+--------------------+



In [225]:
# Check Duplicate data record
uuid = 'news_cord_uid'
df_news.groupBy(col(uuid)).agg(count(uuid).alias('check_duplicated_IDs')).filter(col('check_duplicated_IDs') > 1).show()


+-------------+--------------------+
|news_cord_uid|check_duplicated_IDs|
+-------------+--------------------+
+-------------+--------------------+



In [226]:

if df_news.groupBy(col(uuid)).agg(count(uuid).alias('check_duplicated_IDs')).filter(col('check_duplicated_IDs') > 1).count() == 0:
    print("The table: df_news has no duplicated IDs")
else:
    raise Exception(
        f"The table has duplicated IDs please check it: {df_news[uuid]}")


The table: df_news has no duplicated IDs


# Dimension Table: Us Cities Demographics data
- Data format: <br>
    CSV
- explain: <br>
    display data persist in local memory

In [227]:
# Create a us-cities data dimension table
"""Table: us_cities_demographics schema
pk: generated -> cidemo_id
    schema: IntegerType()
1. City -> cidemo_city
    schema: StringType()
2. State -> cidemo_state
    schema: StringType()
3. Median Age -> cidemo_median_age
    schema: FloatType()
4. Total Population -> cidemo_total_population
    schema: IntegerType()
5. State Code -> cidemo_state_code (fk)
    schema: StringType()
6. Count -> cidemo_count
    schema: IntegerType()
"""

data_us_cities_demographics = "/Users/oneforall_nick/workspace/Udacity_capstone_project/airflow/data/usCitiesDemographics_data/usCitiesDemo.csv"

# -> Must be defined a function that generated each table schema:
us_cities_demographics_data_schema = StructType([
    StructField(name="cidemo_city", dataType=StringType(), nullable=True),
    StructField(name="cidemo_state", dataType=StringType(), nullable=True),
    StructField(name="cidemo_median_age", dataType=FloatType(), nullable=True),
    StructField(name="cidemo_total_population", dataType=IntegerType(), nullable=True),
    StructField(name="cidemo_state_code", dataType=StringType(), nullable=True),
    StructField(name="cidemo_count", dataType=IntegerType(), nullable=True)
])

# Using pyspark to read csv file
df_us_cities_demographics = spark.read.options(header=True, delimiter=';').csv(data_us_cities_demographics)

df_us_cities_demographics = df_us_cities_demographics.withColumn("cidemo_id", monotonically_increasing_id()) \
                .withColumn("cidemo_city", col("City").cast("String")) \
                    .withColumn("cidemo_state", col("State").cast("String")) \
                        .withColumn("cidemo_median_age", col("Median Age").cast("Float")) \
                            .withColumn("cidemo_male_population", col("Male Population").cast("Integer")) \
                                .withColumn("cidemo_female_population", col("Female Population").cast("Integer")) \
                                    .withColumn("cidemo_total_population", col("Total Population").cast("Integer")) \
                                            .withColumn("cidemo_number_of_veterans", col("Number of Veterans").cast("Integer")) \
                                                .withColumn("cidemo_foreign_born", col("Foreign-born").cast("Integer")) \
                                                    .withColumn("cidemo_average_household_size", col("Average Household Size").cast("Float")) \
                                                        .withColumn("cidemo_state_code", col("State Code").cast("String")) \
                                                            .withColumn("cidemo_race", col("Race").cast("String")) \
    .withColumn("cidemo_count", col("Count").cast("Integer")) \
                    .select(col("cidemo_city"),
                            col("cidemo_state"),
                            col("cidemo_median_age"),
                            col("cidemo_total_population"),
                            col("cidemo_state_code"),
                            col("cidemo_count")).distinct().dropDuplicates()

# Auto-generated series of id
df_us_cities_demographics = df_us_cities_demographics.withColumn("cidemo_id", monotonically_increasing_id())

df_us_cities_demographics_temp = df_us_cities_demographics.createOrReplaceTempView("us_cities_demographics_data")

df_us_cities_demographics_temp = spark.sql("SELECT * FROM us_cities_demographics_data")

df_us_cities_demographics_temp.persist()

df_us_cities_demographics_temp.explain()

# df_us_cities_demographics_temp.unpersist()

== Physical Plan ==
InMemoryTableScan [cidemo_city#10982, cidemo_state#10997, cidemo_median_age#11013, cidemo_total_population#11067, cidemo_state_code#11153, cidemo_count#11202, cidemo_id#11234L]
   +- InMemoryRelation [cidemo_city#10982, cidemo_state#10997, cidemo_median_age#11013, cidemo_total_population#11067, cidemo_state_code#11153, cidemo_count#11202, cidemo_id#11234L], StorageLevel(disk, memory, 1 replicas)
         +- *(2) HashAggregate(keys=[cidemo_state_code#11153, cidemo_total_population#11067, cidemo_state#10997, cidemo_city#10982, cidemo_count#11202, cidemo_median_age#11013], functions=[])
            +- *(2) HashAggregate(keys=[cidemo_state_code#11153, cidemo_total_population#11067, cidemo_state#10997, cidemo_city#10982, cidemo_count#11202, cidemo_median_age#11013], functions=[])
               +- *(2) HashAggregate(keys=[cidemo_state_code#11153, cidemo_total_population#11067, cidemo_state#10997, cidemo_city#10982, cidemo_count#11202, cidemo_median_age#11013], functions=

In [228]:
df_us_cities_demographics_temp.show(n=5)

+-----------+--------------+-----------------+-----------------------+-----------------+------------+---------+
|cidemo_city|  cidemo_state|cidemo_median_age|cidemo_total_population|cidemo_state_code|cidemo_count|cidemo_id|
+-----------+--------------+-----------------+-----------------------+-----------------+------------+---------+
| High Point|North Carolina|             35.5|                 109828|               NC|       11060|        0|
|     Folsom|    California|             40.9|                  76368|               CA|         998|        1|
| Fort Myers|       Florida|             37.3|                  74015|               FL|       50169|        2|
|Santa Clara|    California|             35.2|                 126216|               CA|       55847|        3|
|Bolingbrook|      Illinois|             33.7|                  72096|               IL|         323|        4|
+-----------+--------------+-----------------+-----------------------+-----------------+------------+---

In [229]:
df_us_cities_demographics_temp.printSchema()

root
 |-- cidemo_city: string (nullable = true)
 |-- cidemo_state: string (nullable = true)
 |-- cidemo_median_age: float (nullable = true)
 |-- cidemo_total_population: integer (nullable = true)
 |-- cidemo_state_code: string (nullable = true)
 |-- cidemo_count: integer (nullable = true)
 |-- cidemo_id: long (nullable = false)



In [230]:
f"{df_us_cities_demographics_temp.count():,}"

'2,891'

In [44]:
spark.sql("select cidemo_id from us_cities_demographics_data group by cidemo_id having count(cidemo_id) > 1").show()

+---------+
|cidemo_id|
+---------+
+---------+



# Dimension: Immigration data
- Data format: <br>
    SAS
- explain: <br>
    display data persist in local memory

In [251]:
# ****** imm_data ******
imm_data = "/Users/oneforall_nick/workspace/Udacity_capstone_project/airflow/data/immigration_data/immigration_apr16_sub.sas7bdat"
df_imm_data = spark.read.format('com.github.saurfang.sas.spark').load(imm_data)

df_imm_data.select(col('i94mode')).distinct().show()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 



+-------+
|i94mode|
+-------+
|   null|
|    9.0|
|    1.0|
|    2.0|
|    3.0|
+-------+



                                                                                

In [232]:
f"{df_imm_data.count():,}"

                                                                                

'3,096,313'

In [234]:
df_imm_data.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

# Dimension: Immigration personal data
- Data format: <br>
    SAS
- explain: <br>
    display data persist in local memory

In [235]:
"""Table: immigration_personal schema -> According to this person data that I will make a core data table to display notifications information.
pk: cicid -> imm_per_cic_id
    schema: StringType()
1. biryear -> imm_person_birth_year
    schema: IntegerType()
2. gender -> imm_person_gender
    schema: StringType()
3. visatype -> imm_person_visa_type
    schema: StringType()
"""

# Dimension Table: Immigration personal data
df_immigration_personal = df_imm_data.withColumn("imm_per_cic_id", col("cicid").cast("String")) \
    .withColumn("imm_person_birth_year", col("biryear").cast("Integer")) \
    .withColumn("imm_person_gender", col("gender").cast("String")) \
    .withColumn("imm_visatype", col("visatype").cast("String")) \
    .select(col("imm_per_cic_id"),
            col("imm_person_birth_year"),
            col("imm_person_gender"),
            col("imm_visatype")).distinct().dropDuplicates(['imm_per_cic_id'])


In [236]:
f"{df_immigration_personal.count():,}"

                                                                                

'3,096,313'

# Dimension: Immigration main data
- Data format: <br>
    SAS
- explain: <br>
    display data persist in local memory

In [237]:
""""Table: immigration_main_information schema
pk: cicid -> imm_main_cic_id
1. i94yr: 4 digit year of the arrival  -> imm_year
2. i94mon: numeric month of the arrival -> imm_month
3. i94citi&i94res: 3 digit code of origin city -> imm_citi_res -> imm_cntyl
4. i94visa: reason for immigration -> imm_visa
    three categories:
        1 = Business
        2 = Pleasure
        3 = Student
5. i94port: 3 character code of destination city --> Foreign key (used to map to USDemographics and City Temperature data) -> imm_port
6. arrdate: arrival date of the departure -> imm_arrival_date:
7. deptdate: departure date
date_add
7. i94mode: 1 digit travel code -> imm_model:
    four categories:
        1 = 'Air'
	    2 = 'Sea'
	    3 = 'Land'
	    9 = 'Not reported'
8. i94addr -> imm_address
    ex: 'AL'='ALABAMA'
9. airline -> imm_airline
10 fltno -> imm_flight_no
    schema: StringType()
"""


def convert_to_datetime(days: DoubleType) -> datetime:
    """convert_to_datetime converts days to datetime format

    Args:
        days (DoubleType): from sas arrive or departure date

    Returns:
        datetime: added days to datetime format result.
    """
    if days is not None:
        date = datetime.strptime('1960-01-01', '%Y-%m-%d')

        return date + timedelta(days=days)

udf_convert_to_datetime = udf(lambda x: convert_to_datetime(x), DateType())

immigration_main_information = df_imm_data.withColumn("imm_main_cic_id", col("cicid").cast("Integer"))\
            .withColumn("imm_year", col("i94yr").cast("Integer"))\
                .withColumn("imm_month", col("i94mon").cast("Integer"))\
                    .withColumn("imm_cntyl", col("i94cit").cast("Integer"))\
                        .withColumn("imm_visa", col("i94visa").cast("Integer"))\
                            .withColumn("imm_port", col("i94port").cast("String"))\
                                .withColumn("imm_arrival_date", udf_convert_to_datetime(col("arrdate")))\
                                    .withColumn("imm_departure_date", udf_convert_to_datetime(col("depdate")))\
                                        .withColumn("imm_model", col("i94mode").cast("Integer"))\
                                            .withColumn("imm_address", col("i94addr").cast("String"))\
                                                .withColumn("imm_airline", col("airline").cast("String"))\
                                                    .withColumn("imm_flight_no", col("fltno").cast("String"))\
        .select(col('imm_main_cic_id'), \
                    col('imm_year'),\
                        col('imm_month'),\
                            col('imm_cntyl'),\
                                col('imm_visa'),\
                                    col('imm_port'),\
                                        col('imm_arrival_date'),\
                                            col('imm_departure_date'),\
                                                col('imm_model'),\
                                                    col('imm_address'),\
                                                        col('imm_airline'),\
                col('imm_flight_no')).distinct().dropDuplicates(['imm_main_cic_id'])

df_immigration_main_information = immigration_main_information.createOrReplaceTempView(
    "immigration_main_information_data")

df_immigration_main_information = spark.sql("SELECT * FROM immigration_main_information_data")

df_immigration_main_information.persist()

df_immigration_main_information.explain()

== Physical Plan ==
InMemoryTableScan [imm_main_cic_id#11808, imm_year#11838, imm_month#11869, imm_cntyl#11901, imm_visa#11934, imm_port#11968, imm_arrival_date#12004, imm_departure_date#12041, imm_model#12078, imm_address#12116, imm_airline#12155, imm_flight_no#12195]
   +- InMemoryRelation [imm_main_cic_id#11808, imm_year#11838, imm_month#11869, imm_cntyl#11901, imm_visa#11934, imm_port#11968, imm_arrival_date#12004, imm_departure_date#12041, imm_model#12078, imm_address#12116, imm_airline#12155, imm_flight_no#12195], StorageLevel(disk, memory, 1 replicas)
         +- SortAggregate(key=[imm_main_cic_id#11808], functions=[first(imm_year#11838, false), first(imm_month#11869, false), first(imm_cntyl#11901, false), first(imm_visa#11934, false), first(imm_port#11968, false), first(imm_arrival_date#12004, false), first(imm_departure_date#12041, false), first(imm_model#12078, false), first(imm_address#12116, false), first(imm_airline#12155, false), first(imm_flight_no#12195, false)])
      

In [238]:
df_immigration_main_information.printSchema()

root
 |-- imm_main_cic_id: integer (nullable = true)
 |-- imm_year: integer (nullable = true)
 |-- imm_month: integer (nullable = true)
 |-- imm_cntyl: integer (nullable = true)
 |-- imm_visa: integer (nullable = true)
 |-- imm_port: string (nullable = true)
 |-- imm_arrival_date: date (nullable = true)
 |-- imm_departure_date: date (nullable = true)
 |-- imm_model: integer (nullable = true)
 |-- imm_address: string (nullable = true)
 |-- imm_airline: string (nullable = true)
 |-- imm_flight_no: string (nullable = true)



In [239]:
f"{df_immigration_main_information.count():,}"

                                                                                

'3,096,313'

# Fact: Nofification

In [28]:
# Notification Table
"""
t2.imm_main_cic_id
t2.imm_per_cic_id
t2.news_cord_uid
src.cidemo_id
src.value_of_imm_destination_city
t2.news_title
t2.news_abstract
t2.news_publish_time
t2.news_authors
"""

#  ** t1: join imm two tables
#  ** t2: join news table with t1
#  ** t3: join us cities table with t2

df_notification = spark.sql(
        "WITH t1 AS \
            (SELECT * \
               FROM immigration_main_information_data imid \
             INNER JOIN imm_personal ip \
                    ON imid.imm_main_cic_id = ip.imm_per_cic_id \
                 WHERE imid.imm_year = 2016 \
            ), t2 AS \
                (SELECT * \
                   FROM t1 \
                 INNER JOIN news_article_data nad \
                        ON t1.imm_arrival_date = nad.news_publish_time \
            ) \
            SELECT  * \
              FROM t2 \
            LIMIT 5 \
        "
    )

df_notification.show(n=5, truncate=3)





+---------------+--------+---------+---------+--------+--------+----------------+------------------+---------+-----------+-----------+-------------+--------------+---------------------+-----------------+------------+-------------+-----------+----------+------------+-------------+-----------------+------------+--------+
|imm_main_cic_id|imm_year|imm_month|imm_cntyl|imm_visa|imm_port|imm_arrival_date|imm_departure_date|imm_model|imm_address|imm_airline|imm_flight_no|imm_per_cic_id|imm_person_birth_year|imm_person_gender|imm_visatype|news_cord_uid|news_source|news_title|news_licence|news_abstract|news_publish_time|news_authors|news_url|
+---------------+--------+---------+---------+--------+--------+----------------+------------------+---------+-----------+-----------+-------------+--------------+---------------------+-----------------+------------+-------------+-----------+----------+------------+-------------+-----------------+------------+--------+
|            982|     201|        4| 

                                                                                

22/06/23 23:02:40 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 2361589 ms exceeds timeout 120000 ms
22/06/23 23:02:40 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 2361589 ms
22/06/23 23:02:40 WARN BlockManagerMasterEndpoint: No more replicas available for broadcast_10_piece0 !
22/06/23 23:02:40 WARN BlockManagerMasterEndpoint: No more replicas available for broadcast_11_piece0 !
22/06/23 23:02:40 WARN SparkContext: Killing executors is not supported by current scheduler.
