<a href="https://colab.research.google.com/github/StevenSongSTS/ss24-capstone-team23-datallah-nkitts-steveso/blob/main/ss24_capstone_team23_datallah_nkitts_steveso/travel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=3233f15433b7e0ba403ad2722d3ee632fd84bb303c861489687c773a4928902e
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
# import dependencies
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from tqdm import tqdm
import os
import re

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
# get all travel csv's
# source: https://www.transtats.bts.gov/DL_SelectFields.aspx?gnoyr_VQ=FMF&QO_fu146_anzr=Nv4%20Pn44vr45
shared_drive = 'drive/MyDrive/MADS Capstone Team 23/Data'
raw_path = f'{shared_drive}/raw/travel_data'
trns_path = f'{shared_drive}/processed/travel_data'
file_lst = os.listdir(raw_path)
file_lst = [x for x in file_lst if re.search(r'[0-9]{4}', x)]
file_lst

['2000.csv',
 '2001.csv',
 '2002.csv',
 '2003.csv',
 '2004.csv',
 '2005.csv',
 '2007.csv',
 '2008.csv',
 '2006.csv',
 '2009.csv',
 '2010.csv',
 '2012.csv',
 '2011.csv',
 '2013.csv',
 '2015.csv',
 '2014.csv',
 '2016.csv',
 '2018.csv',
 '2019.csv',
 '2017.csv',
 '2021.csv',
 '2022.csv',
 '2023.csv',
 '2024.csv',
 '2020.csv']

In [6]:
# create spark artifacts
spark = SparkSession.builder \
  .appName("Parse Travel Data") \
  .getOrCreate()

query = f'''
  WITH dist AS (
    SELECT DISTINCT
      YEAR,
      MONTH,
      CASE
        WHEN ORIGIN IN ('JFK', 'LGA', 'EWR', 'ISP', 'SWF', 'HPN')
          THEN 'New York Metro'
        WHEN ORIGIN IN ('ORD', 'MDW')
          THEN 'Chicago Metro'
        WHEN ORIGIN IN ('LAX', 'SNA', 'BUR', 'ONT', 'LGB', 'SBD')
          THEN 'Los Angeles Metro'
        WHEN ORIGIN IN ('SEA', 'LKE')
          THEN 'Seattle Metro'
        WHEN ORIGIN IN ('DFW', 'DAL')
          THEN 'Dallas Metro'
        ELSE 'OTHER ORIGIN'
      END AS ORIGIN_METRO,
      CASE
        WHEN DEST IN ('JFK', 'LGA', 'EWR', 'ISP', 'SWF', 'HPN')
          THEN 'New York Metro'
        WHEN DEST IN ('ORD', 'MDW')
          THEN 'Chicago Metro'
        WHEN DEST IN ('LAX', 'SNA', 'BUR', 'ONT', 'LGB', 'SBD')
          THEN 'Los Angeles Metro'
        WHEN DEST IN ('SEA', 'LKE')
          THEN 'Seattle Metro'
        WHEN DEST IN ('DFW', 'DAL')
          THEN 'Dallas Metro'
        ELSE 'OTHER DEST'
      END AS DEST_METRO,
      ORIGIN_CITY_NAME,
      DEST_CITY_NAME,
      CASE
        WHEN LEFT(DATA_SOURCE, 1) = 'D' THEN 'Dom'
        WHEN LEFT(DATA_SOURCE, 1) = 'I' THEN 'Intl'
      END AS INTL_OR_DOM,
      CASE
        WHEN CLASS IN ('F', 'L') THEN 'Passenger'
        WHEN CLASS IN ('G', 'P') THEN 'Cargo'
      END AS PASSNGR_OR_CARGO
    FROM travel_data
    WHERE ORIGIN IN ('JFK', 'LGA', 'EWR', 'ISP', 'SWF', 'HPN',
                    'ORD', 'MDW',
                    'LAX', 'SNA', 'BUR', 'ONT', 'LGB', 'SBD',
                    'SEA', 'LKE',
                    'DFW', 'DAL')
        OR DEST IN ('JFK', 'LGA', 'EWR', 'ISP', 'SWF', 'HPN',
                    'ORD', 'MDW',
                    'LAX', 'SNA', 'BUR', 'ONT', 'LGB', 'SBD',
                    'SEA', 'LKE',
                    'DFW', 'DAL'))

  , orig AS (
    SELECT
      YEAR,
      MONTH,
      ORIGIN_METRO AS METRO,
      INTL_OR_DOM,
      PASSNGR_OR_CARGO,
      COUNT(DISTINCT DEST_CITY_NAME) AS UNIQ_OUTBOUND_CNT
    FROM dist
    WHERE ORIGIN_METRO != DEST_METRO
    GROUP BY 1,2,3,4,5)

  , dest AS (
    SELECT
      YEAR,
      MONTH,
      DEST_METRO AS METRO,
      INTL_OR_DOM,
      PASSNGR_OR_CARGO,
      COUNT(DISTINCT ORIGIN_CITY_NAME) AS UNIQ_INBOUND_CNT
    FROM dist
    WHERE ORIGIN_METRO != DEST_METRO
    GROUP BY 1,2,3,4,5)

  , join AS (
    SELECT
      COALESCE(o.METRO, d.METRO) AS METRO,
      COALESCE(o.YEAR, d.YEAR) AS YEAR,
      COALESCE(o.MONTH, d.MONTH) AS MONTH,
      COALESCE(o.INTL_OR_DOM, d.INTL_OR_DOM) AS INTL_OR_DOM,
      COALESCE(o.PASSNGR_OR_CARGO, d.PASSNGR_OR_CARGO) AS PASSNGR_OR_CARGO,
      UNIQ_OUTBOUND_CNT,
      UNIQ_INBOUND_CNT
    FROM orig o
    FULL OUTER JOIN dest d
    ON    o.METRO = d.METRO
      AND o.YEAR = d.YEAR
      AND o.MONTH = d.MONTH
      AND o.INTL_OR_DOM = d.INTL_OR_DOM
      AND o.PASSNGR_OR_CARGO = d.PASSNGR_OR_CARGO
    )

  SELECT *
  FROM join
  WHERE METRO NOT IN ('OTHER DEST', 'OTHER ORIGIN')
  '''

In [7]:
# create empty starting df
emp_RDD = spark.sparkContext.emptyRDD()
columns = StructType([StructField('YEAR', IntegerType(), True),
                      StructField('MONTH', StringType(), True),
                      StructField('METRO', StringType(), True),
                      StructField('INTL_OR_DOM', StringType(), True),
                      StructField('PASSNGR_OR_CARGO', StringType(), True),
                      StructField('UNIQ_OUTBOUND_CNT', IntegerType(), True),
                      StructField('UNIQ_INBOUND_CNT', IntegerType(), True)])
data = spark.createDataFrame(data = emp_RDD,
                             schema = columns)

In [8]:
# iterate over files and compile df
for f in tqdm(file_lst):
  # read file
  csv_file = f'{raw_path}/{f}'
  df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(csv_file)
  # create travel data view
  df.createOrReplaceTempView("travel_data")
  # use query
  result = spark.sql(query)
  # append to original empty df
  data = data.union(result)

100%|██████████| 25/25 [01:28<00:00,  3.54s/it]


In [9]:
# write to csv
data.toPandas().to_csv(f'{trns_path}/travel.csv', header = True, index = False)