# Gather and explore the data

In [1]:
# imports and installs
from pyspark.sql import SparkSession


In [2]:
# initiate a spark session
spark  = SparkSession \
    .builder \
    .appName("Capstone Project") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/09 11:07:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.sparkContext.getConf().getAll()

[('spark.driver.port', '55640'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', '10.12.21.149'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'),
 ('spark.app.name', 'Capstone Project'),
 ('spa

Now load the pre-downloaded data 

In [4]:
airport_codes = spark.read.csv("data/airport-codes_csv.csv")
airport_codes.show()

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  _c0|          _c1|                 _c2|         _c3|      _c4|        _c5|       _c6|         _c7|     _c8|      _c9|      _c10|                _c11|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     

We can see that airport codes has a header and to get rid of the c0-c11 headings we will set header to true. So, let's load the dataset again with header set to true.

In [7]:
airport_codes = spark.read.option("header","true").csv("data/airport-codes_csv.csv")
airport_codes.show()

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [6]:
airport_codes.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [None]:
# count entries
airport_codes.count()

Now load the US cities demographics dataframe

In [8]:
city_demo = spark.read.csv("data/us-cities-demographics.csv")
city_demo.show()

+--------------------+
|                 _c0|
+--------------------+
|City;State;Median...|
|Denver;Colorado;3...|
|Provo;Utah;23.6;5...|
|Hampton;Virginia;...|
|Birmingham;Alabam...|
|Greeley;Colorado;...|
|Spring Valley;Nev...|
|Tustin;California...|
|Springfield;Massa...|
|Fort Wayne;Indian...|
|Santa Ana;Califor...|
|Dayton;Ohio;32.8;...|
|Redondo Beach;Cal...|
|Shawnee;Kansas;40...|
|Bethlehem;Pennsyl...|
|Pasadena;Californ...|
|Somerville;Massac...|
|Muncie;Indiana;27...|
|Sparks;Nevada;36....|
|Compton;Californi...|
+--------------------+
only showing top 20 rows



In [None]:
# we have to set the delimiter to ";"

In [None]:
city_demo = spark.read.option("delimiter",";").option("header","true").csv("data/us-cities-demographics.csv")
city_demo.show()

+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|         City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+-------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|       Denver|     Colorado|      34.1|         341137|           341408|          682545|             29363|      113222|                  2.33|        CO|Black or African-...|72288|
|        Provo|         Utah|      23.6|          56231|            59027|          115258|              2177|       10925|    3.2800000000000002|        UT|American Indian a...| 1916|
|      Hampton|     Virginia|      35.5|          66214|            70240| 

23/11/09 19:10:27 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 7195285 ms exceeds timeout 120000 ms
23/11/09 19:10:27 WARN SparkContext: Killing executors is not supported by current scheduler.
23/11/09 19:10:27 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.

In [9]:
# count entries
city_demo.count()

2891

Check how many different cities there are

In [13]:
city_demo.select('City', 'State').dropDuplicates().sort('City').show()

+------------+------------+
|        City|       State|
+------------+------------+
|     Abilene|       Texas|
|       Akron|        Ohio|
|     Alafaya|     Florida|
|     Alameda|  California|
|      Albany|    New York|
|      Albany|     Georgia|
| Albuquerque|  New Mexico|
|  Alexandria|    Virginia|
|    Alhambra|  California|
|       Allen|       Texas|
|       Allen|Pennsylvania|
|    Amarillo|       Texas|
|        Ames|        Iowa|
|     Anaheim|  California|
|   Anchorage|      Alaska|
|   Ann Arbor|    Michigan|
|     Antioch|  California|
|Apple Valley|  California|
|    Appleton|   Wisconsin|
|Arden-Arcade|  California|
+------------+------------+
only showing top 20 rows



Drop records where the city column contains NA values

In [14]:
city_demo.dropna(how="any", subset=['City'])
city_demo.count()

2891

23/07/05 03:57:15 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 238256 ms exceeds timeout 120000 ms
23/07/05 03:57:15 WARN SparkContext: Killing executors is not supported by current scheduler.
23/07/05 03:57:22 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.B

**Data dictionary**:
| Column Name | Description |
| :--- | :--- |
| City | City name |
| State | US state |
| Median Age | Median age of the population |
| Male Population | Male population size |
| Female Population | Female population size |
| Total Population | Total population size |
| Number of Veterans | Number of veterans in the population |
| Foreign-born | Number of residents that were born in another city |
| Average Household Size | Average size of the houses in the city |
| State Code | Two-letter state code |
| Race | Race class |
| Count | Number of members of a given race |


Now read the immigration data parquet files

In [4]:
sas_data = spark.read.parquet("data/sas_data")
sas_data.show()

23/06/25 11:30:59 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [5]:
sas_data.count()

3096313

drop records where i94addr (state code) is null, since this is a primary key

**Data dictionary**:
| Column Name | Description |
| :--- | :--- |
| CICID* | ID that uniquely identify one record in the dataset |
| I94YR | 4 digit year |
| I94MON | Numeric month |
| I94CIT | 3 digit code of city of origin |
| I94RES | 3 digit code of country of origin |
| I94PORT | Port addmitted through |
| ARRDATE | Arrival date in the USA |
| I94MODE | Mode of arrival (1 = Air; 2 = Sea; 3 = Land; 9 = Not reported) |
| I94ADDR | State of arrival |
| DEPDATE | Departure date from the USA |
| I94BIR | Age of Respondent in Years |
| I94VISA | Visa codes collapsed into three categories: (1 = Business; 2 = Pleasure; 3 = Student) |
| COUNT | Used for summary statistics |
| DTADFILE | Character Date Field |
| VISAPOST | Department of State where Visa was issued |
| OCCUP | Occupation that will be performed in U.S. |
| ENTDEPA | Arrival Flag. Whether admitted or paroled into the US |
| ENTDEPD | Departure Flag. Whether departed, lost visa, or deceased |
| ENTDEPU | Update Flag. Update of visa, either apprehended, overstayed, or updated to PR |
| MATFLAG | Match flag of arrival and departure records|
| BIRYEAR | 4 digit year of birth |
| DTADDTO | Character date field - Date to which admitted to US (allowed to stay until) |
| GENDER | Non-immigrant sex|
| INSNUM | INS number |
| AIRLINE | Airline used to arrive in U.S. |
| ADMNUM | Admission number |
| FLTNO | Flight number of Airline used to arrive in U.S. |
| VISATYPE | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |