# 1. Gazetteer Data

## a. Create Unmanaged Tables

The first step of this assignment loads the gazetteer data from the CSV files, combines each 2017 file with the corresponding 2018 file, and saves each combined dataset to disk as a table.

In [3]:
import os
import re

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [4]:
# Directory where managed tables written by saveAsTable are stored.
# NOTE(review): spark.sql.warehouse.dir is a static setting; if a session already
# exists (e.g. on Databricks), getOrCreate() reuses it and this value may not take
# effect — confirm where tables actually land.
warehouse_dir = 'dsc_650/warehouse'
spark = (
    SparkSession.builder
    .appName("DSC650Assignment5")
    .config("spark.sql.warehouse.dir", warehouse_dir)
    .getOrCreate()
)

In [6]:
# This cell used to run `df1.unionAll(df2)` and save the result as a `places`
# table. `df1`/`df2` are only defined inside the loop further down, so on a fresh
# kernel it raised NameError. The loop below already unions every 2017/2018 file
# pair and saves each result as a table named after its file.

In [7]:
# Collect the 2017 and 2018 gazetteer CSV paths and pair each 2017 file with the
# 2018 file of the same name. wholeTextFiles does not guarantee an order, so the
# pairing must be by name rather than by position.
# NOTE: wholeTextFiles reads each file's contents just to obtain its path; only the
# keys (paths) are kept.
gaz_dir = 'FileStore/tables/650/data/gazetteer'
gaz_2017_files = spark.sparkContext.wholeTextFiles(gaz_dir + '/2017/*.csv').keys().collect()
gaz_2018_files = spark.sparkContext.wholeTextFiles(gaz_dir + '/2018/*.csv').keys().collect()


def _by_table_name(paths):
    """Map table name (the word characters before '.csv') -> full path."""
    return {re.search(r'(\w+)\.csv$', path).group(1): path for path in paths}


gaz_2017_by_name = _by_table_name(gaz_2017_files)
gaz_2018_by_name = _by_table_name(gaz_2018_files)

# Report files that exist for only one year instead of silently mis-pairing them.
unmatched = sorted(set(gaz_2017_by_name) ^ set(gaz_2018_by_name))
if unmatched:
    print('No matching file in the other year for:', unmatched)

# List of (2017 path, 2018 path) tuples, sorted by table name.
gaz_zips = [
    (gaz_2017_by_name[name], gaz_2018_by_name[name])
    for name in sorted(set(gaz_2017_by_name) & set(gaz_2018_by_name))
]

In [8]:
import re
# Preview the table name derived from the first file path. The raw string with an
# escaped, end-anchored '\.csv' only matches the actual file extension.
re.findall(r'(\w+)\.csv$', gaz_zips[0][0])

In [9]:
# Union each 2017 file with its 2018 counterpart and save the result as a table
# named after the file (e.g. .../2017/places.csv -> table `places`).
for first_table, second_table in gaz_zips:
    table_name = re.search(r'(\w+)\.csv$', first_table).group(1)
    df1 = spark.read.csv(first_table, sep=',', inferSchema=True, header=True)
    df2 = spark.read.csv(second_table, sep=',', inferSchema=True, header=True)
    # union() resolves columns by position (like UNION ALL), so both years'
    # files must share the same column order.
    df = df1.union(df2)
    # Overwrite so re-running the notebook doesn't fail with "table already exists".
    df.write.mode('overwrite').saveAsTable(table_name)

In [10]:
# Sanity check: print the name and row count of every table written above.
for first_path, _ in gaz_zips:
    table_name = re.search(r'(\w+)\.csv$', first_path).group(1)
    query = "SELECT COUNT(*) AS row_count FROM " + table_name
    print(table_name)
    spark.sql(query).show()

In [11]:
def create_external_table(table_name, source_name=None):
    """Register `table_name` as an external (unmanaged) table over a directory in `warehouse_dir`.

    Args:
        table_name: name of the table to create in the catalog.
        source_name: name of the directory under `warehouse_dir` that holds the data.
            Defaults to `table_name` with any trailing '_external' suffix removed, so
            'places_external' is created over the data saved for table 'places'.

    Returns:
        The DataFrame returned by `spark.catalog.createTable`.
    """
    if source_name is None:
        suffix = '_external'
        source_name = table_name[:-len(suffix)] if table_name.endswith(suffix) else table_name
    # NOTE(review): assumes saveAsTable wrote each managed table to
    # warehouse_dir/<name>; confirm the warehouse setting took effect.
    table_dir = os.path.join(warehouse_dir, source_name)
    # Passing `path` makes createTable register an external table; this replaces
    # the deprecated createExternalTable call.
    return spark.catalog.createTable(table_name, path=table_dir)


def create_external_tables(names=None):
    """Create an external table for every name in `names` (defaults to the global `table_names`)."""
    if names is None:
        names = table_names
    for table_name in names:
        create_external_table(table_name)


In [12]:
# External tables are named after their source table with an '_external' suffix.
table_names = [
    re.search(r'(\w+)\.csv$', first_path).group(1) + '_external'
    for first_path, _ in gaz_zips
]
# Create each external table exactly once. The previous version also created every
# table inside the loop, so this call then failed on the second creation.
create_external_tables()

## b. Load and Query Tables

Now that I have saved the data as tables, I will load them back into Spark and build a report on school districts in Nebraska (NE) and Iowa (IA). The report counts the unified, secondary, and elementary school districts for each state and year, using the unified_school_districts, secondary_schools, and elementary_schools tables. It is produced twice: once with the DataFrame API and once with Spark SQL.

In [14]:
def count_by_state_year(table, label):
    """Return (State, Year, <label>) rows counting the records of `table` per state and year."""
    return spark.sql(
        "SELECT state AS State, year AS Year, count(*) AS {label} "
        "FROM {table} GROUP BY state, year".format(label=label, table=table))


unified = count_by_state_year('unified_school_districts', 'Unified')
secondary = count_by_state_year('secondary_schools', 'Secondary')
elementary = count_by_state_year('elementary_schools', 'Elementary')

# Joining on column names keeps a single State/Year pair, so no drop() is needed.
# Missing secondary/elementary counts become 0. Sorting by Year as well makes the
# row order deterministic.
report = (
    unified
    .join(secondary, ['State', 'Year'], 'left')
    .join(elementary, ['State', 'Year'], 'left')
    .na.fill(0)
    .filter(col('State').isin('NE', 'IA'))
    .orderBy('State', 'Year')
)
report.show()

In [15]:
# Aggregate each table per (state, year) BEFORE joining. Joining the raw rows first
# multiplies the matches, so count() over the joined rows inflated every column.
# ORDER BY (unlike SORT BY, which only sorts within each partition) gives a
# globally sorted result.
spark.sql("""
    WITH unified AS (
        SELECT state, year, count(*) AS n
        FROM unified_school_districts
        WHERE state IN ('IA', 'NE')
        GROUP BY state, year
    ),
    secondary AS (
        SELECT state, year, count(*) AS n FROM secondary_schools GROUP BY state, year
    ),
    elementary AS (
        SELECT state, year, count(*) AS n FROM elementary_schools GROUP BY state, year
    )
    SELECT u.state AS State,
           u.year AS Year,
           u.n AS Unified,
           coalesce(s.n, 0) AS Secondary,
           coalesce(e.n, 0) AS Elementary
    FROM unified u
    LEFT JOIN secondary s ON u.state = s.state AND u.year = s.year
    LEFT JOIN elementary e ON u.state = e.state AND u.year = e.year
    ORDER BY State, Year
""").show()

# 2. Flight Data

In the previous exercise, flight data was joined with airport codes to create a report. Here, the domestic_flights and airport_codes data (originally domestic-flights/flights.parquet and airport-codes/airport-codes.csv) are made available as tables. From them, I will build a report of the top ten airports for 2008 using Spark SQL instead of DataFrames. (In this notebook, both datasets are read from CSV copies uploaded to /FileStore/tables.)

In [17]:
# Load the flight records and the airport reference data from the uploaded CSV
# files, using the first row as the header and letting Spark infer column types.
csv_reader = spark.read.options(header=True, inferSchema=True)
df_flights = csv_reader.csv('/FileStore/tables/flights.csv')
df_airport_codes = csv_reader.csv('/FileStore/tables/airport_codes-e62ed.csv')

In [18]:
# Make the DataFrames loaded above queryable from SQL as `flights` and `airport_codes`.
# NOTE(review): for this session, these temp views shadow any metastore tables of
# the same name.
df_flights.createOrReplaceTempView('flights')
df_airport_codes.createOrReplaceTempView('airport_codes')

# A TEMPORARY view may reference temporary views (a permanent CREATE VIEW may not),
# and OR REPLACE lets the cell be re-run. The exclusive upper bound keeps 2008-12-31
# rows even if flight_date carries a time component.
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW flights_08 AS
    SELECT flights.origin_airport_code, airport_codes.name, flights.passengers,
           flights.flight_date, flights.flights
    FROM flights
    LEFT JOIN airport_codes ON flights.origin_airport_code = airport_codes.iata_code
    WHERE flights.flight_date >= '2008-01-01' AND flights.flight_date < '2009-01-01'
""")

In [19]:
# Preview the first rows of the 2008 flights view.
flights_08_preview = spark.table('flights_08')
flights_08_preview.show()

In [20]:
# Top ten airports of 2008 by number of flights. flights_08 is grouped by
# origin_airport_code, so these totals count flights/passengers departing from each
# airport (the previous "Inbound" labels did not match the grouping).
# NOTE(review): "Average Daily Passengers" is AVG(passengers) per row of flights_08 —
# confirm each row represents one day.
# Ties are broken by IATA code so the top-ten cut is deterministic.
spark.sql("""
    SELECT name AS Name,
           origin_airport_code AS `IATA code`,
           SUM(flights) AS `Total Outbound Flights`,
           SUM(passengers) AS `Total Outbound Passengers`,
           AVG(passengers) AS `Average Daily Passengers`
    FROM flights_08
    GROUP BY origin_airport_code, name
    ORDER BY `Total Outbound Flights` DESC, `IATA code`
    LIMIT 10
""").show()