### Explore staging/i94_cit_res_data.csv

Exploration summary:
- 289 records
- 2 columns: country_id and country
- Both columns come in as strings even though country_id is really numeric
- Neither column contains any missing values
- country_id has no duplicates but country does

Cleaning steps needed:
- Cast country_id as int
- Make country unique by appending '(<country_id>)' to the country name
- Do this only for rows where country equals INVALID: STATELESS or INVALID: UNITED STATES, the only duplicated values
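
The cleaning steps above can be sketched in plain Python to pin down the rule before applying it in Spark. This is only an illustration: the helper name `clean_country_row`, the single space before the parenthesis, and the sample ids on the INVALID rows are assumptions, not values taken from the file.

```python
# Names that the exploration found duplicated in the country column
DUPLICATED_NAMES = {"INVALID: STATELESS", "INVALID: UNITED STATES"}


def clean_country_row(country_id, country):
    """Return (country_id as int, country made unique where needed).

    Hypothetical helper; the ' (<country_id>)' suffix format, including
    the leading space, is an assumption.
    """
    cid = int(country_id)  # country_id arrives as a string
    if country in DUPLICATED_NAMES:
        # Only the duplicated names get the id suffix
        country = f"{country} ({cid})"
    return cid, country


# Sample rows; the ids on the INVALID rows are made up for illustration
rows = [
    ("236", "AFGHANISTAN"),
    ("1", "INVALID: STATELESS"),
    ("2", "INVALID: STATELESS"),
]
cleaned = [clean_country_row(cid, name) for cid, name in rows]
```

In Spark the same rule could plausibly be expressed with a `cast` on country_id and `F.when(...).otherwise(...)` combined with `F.concat`, so that every other country name passes through unchanged.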

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1590937788150_0003,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# Get SparkSession object
spark = SparkSession.builder.getOrCreate()

In [3]:
# Filename of input file
filename = 's3://data-eng-capstone-cf/staging/i94_cit_res_data.csv'

In [4]:
# Read into a spark dataframe
df = spark.read.csv(filename, header=True)

In [5]:
# Print schema
df.printSchema()

root
 |-- country_id: string (nullable = true)
 |-- country: string (nullable = true)

In [6]:
# What does the file look like?
df.show(truncate=False)

+----------+---------------------------------------------------------+
|country_id|country                                                  |
+----------+---------------------------------------------------------+
|582       |MEXICO Air Sea, and Not Reported (I-94, no land arrivals)|
|236       |AFGHANISTAN                                              |
|101       |ALBANIA                                                  |
|316       |ALGERIA                                                  |
|102       |ANDORRA                                                  |
|324       |ANGOLA                                                   |
|529       |ANGUILLA                                                 |
|518       |ANTIGUA-BARBUDA                                          |
|687       |ARGENTINA                                                |
|151       |ARMENIA                                                  |
|532       |ARUBA                                                    |
|438  

In [7]:
# How many records?
df.count()

289

In [8]:
# How many NaN/None valued rows are there per column?
# https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe
df.select([F.sum((F.isnan(c) | F.col(c).isNull()).cast(IntegerType())).alias(c) for c in df.columns]).show()

+----------+-------+
|country_id|country|
+----------+-------+
|         0|      0|
+----------+-------+

In [9]:
# Summary stats?
df.describe().show()

+-------+------------------+-----------+
|summary|        country_id|    country|
+-------+------------------+-----------+
|  count|               289|        289|
|   mean|389.24567474048445|       null|
| stddev|210.20353526976587|       null|
|    min|                 0|AFGHANISTAN|
|    max|               999|   ZIMBABWE|
+-------+------------------+-----------+

In [10]:
# Are the values in country_id unique?
df.select('country_id').count() == df.select('country_id').distinct().count()

True

In [11]:
# Are the values in country unique?
df.select('country').count() == df.select('country').distinct().count()

False

In [12]:
# What are the duplicate values in country?
df.groupBy(F.col('country')).count().filter(F.col('count')>1).show(truncate=False)

+----------------------+-----+
|country               |count|
+----------------------+-----+
|INVALID: STATELESS    |2    |
|INVALID: UNITED STATES|2    |
+----------------------+-----+