In [1]:
import pyspark
from pyspark import SparkContext, SQLContext

# Local Spark configuration. 'local[1]' runs a single worker thread;
# use e.g. 'local[*]' to use every available core.
SPARK_MASTER = 'local[1]'
APP_NAME = 'LocalSparkSqlTest'

conf = pyspark.SparkConf().setMaster(SPARK_MASTER).setAppName(APP_NAME)
# getOrCreate() reuses an already-running SparkContext instead of raising
# "Cannot run multiple SparkContexts at once" when this cell is re-run.
# Note: if a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)
# SQLContext is the entry point used below to read CSV files into DataFrames.
# NOTE(review): SQLContext is deprecated in Spark 3.x in favour of
# SparkSession.builder.getOrCreate() — consider migrating.
sqlContext = SQLContext(sc)

In [2]:
# Display the SparkContext as this cell's output to confirm it is up.
sc

Read the airports file into a Spark DataFrame.

Note the use of `sqlContext` for this.

Count the rows, then display the first 5 rows.

In [3]:
# Load the airports CSV from HDFS, taking column names from the header line.
# No schema inference is requested, so every column arrives as a string.
airportsDf = sqlContext.read.options(header=True).csv(
    "hdfs://hadoop-master:9000/user/ec2-user/airports.csv"
)
print(airportsDf.count())
airportsDf.take(5)

322


[Row(IATA_CODE='ABE', AIRPORT='Lehigh Valley International Airport', CITY='Allentown', STATE='PA', COUNTRY='USA', LATITUDE='40.65236', LONGITUDE='-75.44040'),
 Row(IATA_CODE='ABI', AIRPORT='Abilene Regional Airport', CITY='Abilene', STATE='TX', COUNTRY='USA', LATITUDE='32.41132', LONGITUDE='-99.68190'),
 Row(IATA_CODE='ABQ', AIRPORT='Albuquerque International Sunport', CITY='Albuquerque', STATE='NM', COUNTRY='USA', LATITUDE='35.04022', LONGITUDE='-106.60919'),
 Row(IATA_CODE='ABR', AIRPORT='Aberdeen Regional Airport', CITY='Aberdeen', STATE='SD', COUNTRY='USA', LATITUDE='45.44906', LONGITUDE='-98.42183'),
 Row(IATA_CODE='ABY', AIRPORT='Southwest Georgia Regional Airport', CITY='Albany', STATE='GA', COUNTRY='USA', LATITUDE='31.53552', LONGITUDE='-84.19447')]

The `LATITUDE` and `LONGITUDE` columns were read in as strings; convert them to doubles so they can be compared numerically.

In [4]:
from pyspark.sql.types import DoubleType
print("original dtypes: ", airportsDf.dtypes)
# Cast both coordinate columns from string to double so they can be compared
# numerically. airportsDf is rebound in place, so on a re-run the "original"
# line above will already show doubles.
# NOTE(review): unparseable strings become null (or raise, if ANSI mode is
# enabled) — check for nulls if the source data may be dirty.
for coordName in ("LATITUDE", "LONGITUDE"):
    airportsDf = airportsDf.withColumn(coordName, airportsDf[coordName].cast(DoubleType()))
print("updated dtypes: ", airportsDf.dtypes)

original dtypes:  [('IATA_CODE', 'string'), ('AIRPORT', 'string'), ('CITY', 'string'), ('STATE', 'string'), ('COUNTRY', 'string'), ('LATITUDE', 'string'), ('LONGITUDE', 'string')]
updated dtypes:  [('IATA_CODE', 'string'), ('AIRPORT', 'string'), ('CITY', 'string'), ('STATE', 'string'), ('COUNTRY', 'string'), ('LATITUDE', 'double'), ('LONGITUDE', 'double')]


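Now we can filter the Spark DataFrame much as we would in pandas, combining boolean column expressions with `&`. Here we keep the airports whose latitude lies strictly between -23° and 23°, i.e. roughly within the tropics.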

In [5]:
# Keep airports whose latitude lies strictly between -23 and 23 degrees.
# NOTE(review): ±23 presumably approximates the tropics (~±23.44°) — confirm the intended bound.
tropAirports = airportsDf.where((-23 < airportsDf.LATITUDE) & (airportsDf.LATITUDE < 23))
tropAirports.count()

12

In [6]:
# Bring every filtered row back to the driver as a list of Row objects.
# collect() is acceptable here only because the filter above leaves a small
# result (see the count in the previous cell); avoid it on large DataFrames.
tropAirports.collect()

[Row(IATA_CODE='BQN', AIRPORT='Rafael Hernández Airport', CITY='Aguadilla', STATE='PR', COUNTRY='USA', LATITUDE=18.49486, LONGITUDE=-67.12944),
 Row(IATA_CODE='GUM', AIRPORT='Guam International Airport', CITY='Agana', STATE='GU', COUNTRY='USA', LATITUDE=13.48345, LONGITUDE=-144.79598),
 Row(IATA_CODE='HNL', AIRPORT='Honolulu International Airport', CITY='Honolulu', STATE='HI', COUNTRY='USA', LATITUDE=21.31869, LONGITUDE=-157.92241),
 Row(IATA_CODE='ITO', AIRPORT='Hilo International Airport', CITY='Hilo', STATE='HI', COUNTRY='USA', LATITUDE=19.72026, LONGITUDE=-155.04847),
 Row(IATA_CODE='KOA', AIRPORT='Kona International Airport at Keahole', CITY='Kailua/Kona', STATE='HI', COUNTRY='USA', LATITUDE=19.73877, LONGITUDE=-156.04563),
 Row(IATA_CODE='LIH', AIRPORT='Lihue Airport', CITY='Lihue', STATE='HI', COUNTRY='USA', LATITUDE=21.97598, LONGITUDE=-159.33896),
 Row(IATA_CODE='OGG', AIRPORT='Kahului Airport', CITY='Kahului', STATE='HI', COUNTRY='USA', LATITUDE=20.89865, LONGITUDE=-156.43046

In [7]:
# Persist the tropical airports back to HDFS as CSV with a header row.
# Spark writes a *directory* at this path holding one part file per partition.
# mode("overwrite") replaces any previous output so the cell can be re-run;
# the default save mode raises an error when the path already exists.
tropAirports.write.mode("overwrite").csv(
    "hdfs://hadoop-master:9000/user/ec2-user/tropAirports.csv", header=True
)

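Once the data has been cut down to a small enough size, we can convert the Spark DataFrame to a pandas DataFrame (`toPandas()` loads every row into the driver's memory, so only do this with small results).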

In [8]:
# Collect the (small) Spark result into pandas, indexed by airport code.
# Chaining set_index avoids inplace mutation of the frame.
pdTropAirports = tropAirports.toPandas().set_index("IATA_CODE")
display(pdTropAirports)

Unnamed: 0_level_0,AIRPORT,CITY,STATE,COUNTRY,LATITUDE,LONGITUDE
IATA_CODE,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
BQN,Rafael Hernández Airport,Aguadilla,PR,USA,18.49486,-67.12944
GUM,Guam International Airport,Agana,GU,USA,13.48345,-144.79598
HNL,Honolulu International Airport,Honolulu,HI,USA,21.31869,-157.92241
ITO,Hilo International Airport,Hilo,HI,USA,19.72026,-155.04847
KOA,Kona International Airport at Keahole,Kailua/Kona,HI,USA,19.73877,-156.04563
LIH,Lihue Airport,Lihue,HI,USA,21.97598,-159.33896
OGG,Kahului Airport,Kahului,HI,USA,20.89865,-156.43046
PPG,Pago Pago International Airport (Tafuna Airport),Pago Pago,AS,USA,14.33102,-170.71053
PSE,Mercedita Airport,Ponce,PR,USA,18.0083,-66.56301
SJU,Luis Muñoz Marín International Airport,San Juan,PR,USA,18.43942,-66.00183
