In [1]:
import asyncio
import contextvars
import functools
import logging
import os
from os.path import isfile, join
import pyarrow.parquet as pq
from time import perf_counter
from typing import List

import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager

In [17]:
TLC_URL = 'https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page'
DATA_DIR = '../../data/raw/'

In [3]:
def construct_url_list() -> set:
    """
    This method is used to scrap all the URLs of TLC Trip Data from the web and store them to a list

    :return: List of unique URLs that contain the parquet data files
    """
    url_list = set()

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")

    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=chrome_options)
    driver.get(TLC_URL)
    link = driver.find_element(By.LINK_TEXT, 'Expand All')
    link.click()

    soup = BeautifulSoup(driver.page_source, 'html.parser')
    for u in soup.select('[class="faq-v1"] div'):
        for a in u.find_all('a', href=True):
            href: str = a.get('href')
            if href.startswith('https://'):
                url_list.add(href)
    driver.close()
    driver.quit()
    return url_list

In [4]:
def write_to_file(file_name: str, data: List[str]) -> None:
    """
    Writes data to file

    :param file_name: file name
    :param data: data to write
    """
    with open(file_name, 'w') as f:
        f.write('\n'.join(data))


def clean_faulty_files() -> None:
    """
    Removes all faulty parquet files from the data directory
    """
    files = [f for f in os.listdir(DATA_DIR) if isfile(join(DATA_DIR, f))]
    for f in files:
        try:
            pq.ParquetFile('{}/{}'.format(DATA_DIR, f))
        except Exception as e:
            logging.info("Deleting corrupted file '{}': {}".format(f, e))
            os.remove('{}/{}'.format(DATA_DIR, f))

In [5]:
s3_urls = list(construct_url_list())

In [7]:
len(s3_urls)

460

In [8]:
s3_urls[:5]

['https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2015-09.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-06.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2017-06.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2019-02.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-11.parquet']

In [13]:
file_url = s3_urls[0]

In [14]:
file_name = file_url.split('/')[-1]
file_name

'fhv_tripdata_2015-09.parquet'

In [11]:
os.path.join(DATA_DIR, file_name)

'data/fhv_tripdata_2015-09.parquet'

In [15]:
def http_get_sync(url: str) -> bytes:
    """
    Synchronous call to get the content of a URL
    :param url: URL address
    :return: URL content
    """
    response = requests.get(url)
    return response.content

In [16]:
content = http_get_sync(file_url)

In [18]:
with open(os.path.join(DATA_DIR, file_name), 'wb') as f:
    f.write(content)

In [19]:
START_YEAR = 2021

In [22]:
filtered_urls = [url for url in s3_urls if url.endswith('.parquet') and int(url[-15:-11]) >= START_YEAR]
len(filtered_urls)

65