In [1]:
import requests
from bs4 import BeautifulSoup
import os

In [2]:
# URL of the NYC TLC Trip Record Data page
url = "https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

# Months whose yellow-taxi Parquet files should be downloaded
TARGET_MONTHS = ("2019-02", "2019-03")

# Fetch the page. The timeout keeps the cell from hanging on a stalled
# connection; raise_for_status stops us from parsing an HTTP error page.
response = requests.get(url, timeout=30)
response.raise_for_status()

# Parse the HTML content
soup = BeautifulSoup(response.content, "html.parser")

# The 2019 links are inside <div id="faq2019">. Fail with a clear message if the
# page layout changes, instead of an AttributeError on None further down.
faq2019_div = soup.find("div", id="faq2019")
if faq2019_div is None:
    raise RuntimeError(f'<div id="faq2019"> not found on {url}; the page layout may have changed.')

# Collect the yellow-taxi Parquet links for the target months
parquet_urls = []
for link in faq2019_div.find_all("a", href=True):
    href = link["href"]
    if "parquet" in href and "yellow_tripdata" in href and any(month in href for month in TARGET_MONTHS):
        parquet_urls.append(href)

# Function to download a file
def download_file(url, save_path, timeout=60, chunk_size=1024 * 1024):
    """Download ``url`` to ``save_path``, streaming the body to disk.

    The body is written in ``chunk_size``-byte pieces rather than held in
    memory. It goes to a temporary ``save_path + ".part"`` file, which is
    renamed onto ``save_path`` only after the whole body has been written.
    A failed or interrupted download therefore never leaves a truncated file
    at ``save_path``.

    Args:
        url: Absolute URL of the file to fetch.
        save_path: Destination path; replaced if it already exists.
        timeout: Seconds passed as ``timeout`` to ``requests.get``.
        chunk_size: Bytes per streamed chunk.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status
            (previously the error page was silently saved as the file).
        requests.RequestException: on connection errors or timeouts.
    """
    tmp_path = save_path + ".part"
    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(tmp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        os.replace(tmp_path, save_path)
    except BaseException:
        # Also covers KeyboardInterrupt: never leave a partial .part file behind.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

# Create a directory to save the files
os.makedirs("parquet_files", exist_ok=True)

# Download each Parquet file. The loop variable is `file_url`, not `url`, so
# the page URL defined earlier in this cell is not overwritten.
for file_url in parquet_urls:
    filename = os.path.basename(file_url)
    save_path = os.path.join("parquet_files", filename)
    download_file(file_url, save_path)
    print(f"Downloaded: {filename}")

# Only claim success when something was actually downloaded.
if parquet_urls:
    print("All files downloaded successfully.")
else:
    print("No matching Parquet links were found; nothing was downloaded.")


Downloaded: yellow_tripdata_2019-02.parquet
Downloaded: yellow_tripdata_2019-03.parquet
All files downloaded successfully.


In [2]:
import requests
from bs4 import BeautifulSoup
import os
from urllib.parse import urljoin

# URL of the NYC TLC Trip Record Data page
url = "https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

# Months whose Parquet files (every trip type) should be downloaded
TARGET_MONTHS = ("2019-02", "2019-03")

# Fetch the page. The timeout keeps the cell from hanging on a stalled
# connection; raise_for_status stops us from parsing an HTTP error page.
response = requests.get(url, timeout=30)
response.raise_for_status()

# Parse the HTML content
soup = BeautifulSoup(response.content, "html.parser")

# The 2019 links are inside <div id="faq2019">. Fail with a clear message if the
# page layout changes, instead of an AttributeError on None below.
faq2019_div = soup.find("div", id="faq2019")
if faq2019_div is None:
    raise RuntimeError(f'<div id="faq2019"> not found on {url}; the page layout may have changed.')

# Group Parquet URLs by a key taken from the file name.
# NOTE: despite the variable name, the key is NOT a month. It is the second
# "_"-separated token of the file name, e.g.
# "yellow_tripdata_2019-02.parquet" -> "tripdata". Every TLC file name shares
# that token, so all files are grouped under "tripdata" and saved to
# parquet_files/tripdata, the directory the MongoDB loading cell reads.
# Changing the key would change that directory.
parquet_urls_by_month = {}
for link in faq2019_div.find_all("a", href=True):
    # Resolve relative links against the page URL; absolute links are unchanged.
    href = urljoin(url, link["href"])
    if "parquet" in href and any(month in href for month in TARGET_MONTHS):
        name_parts = os.path.basename(href).split('_')
        # Guard against file names without "_" (would otherwise raise IndexError).
        group = name_parts[1] if len(name_parts) > 1 else "other"
        parquet_urls_by_month.setdefault(group, []).append(href)

# Function to download a file
def download_file(url, save_path, timeout=60, chunk_size=1024 * 1024):
    """Download ``url`` to ``save_path``, streaming the body to disk.

    The body is written in ``chunk_size``-byte pieces rather than held in
    memory. It goes to a temporary ``save_path + ".part"`` file, which is
    renamed onto ``save_path`` only after the whole body has been written.
    A failed or interrupted download therefore never leaves a truncated file
    at ``save_path``.

    Args:
        url: Absolute URL of the file to fetch.
        save_path: Destination path; replaced if it already exists.
        timeout: Seconds passed as ``timeout`` to ``requests.get``.
        chunk_size: Bytes per streamed chunk.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status
            (previously the error page was silently saved as the file).
        requests.RequestException: on connection errors or timeouts.
    """
    tmp_path = save_path + ".part"
    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(tmp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        os.replace(tmp_path, save_path)
    except BaseException:
        # Also covers KeyboardInterrupt: never leave a partial .part file behind.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

# Create a directory to save the files
os.makedirs("parquet_files", exist_ok=True)

# Download every file into parquet_files/<key>/, where <key> is the
# parquet_urls_by_month key the URL was grouped under. Loop variables avoid
# the name `url`, so the page URL defined earlier in this cell is not overwritten.
for group, group_urls in parquet_urls_by_month.items():
    group_directory = os.path.join("parquet_files", group)
    os.makedirs(group_directory, exist_ok=True)
    for file_url in group_urls:
        filename = os.path.basename(file_url)
        save_path = os.path.join(group_directory, filename)
        download_file(file_url, save_path)
        print(f"Downloaded: {filename} to {group_directory}")

# Only claim success when something was actually downloaded.
if parquet_urls_by_month:
    print("All files downloaded successfully.")
else:
    print("No matching Parquet links were found; nothing was downloaded.")


Downloaded: yellow_tripdata_2019-02.parquet to parquet_files\tripdata
Downloaded: green_tripdata_2019-02.parquet to parquet_files\tripdata
Downloaded: fhv_tripdata_2019-02.parquet to parquet_files\tripdata
Downloaded: fhvhv_tripdata_2019-02.parquet to parquet_files\tripdata
Downloaded: yellow_tripdata_2019-03.parquet to parquet_files\tripdata
Downloaded: green_tripdata_2019-03.parquet to parquet_files\tripdata
Downloaded: fhv_tripdata_2019-03.parquet to parquet_files\tripdata
Downloaded: fhvhv_tripdata_2019-03.parquet to parquet_files\tripdata
All files downloaded successfully.


In [10]:
pip install pandas pymongo pyarrow


Collecting pymongo
  Obtaining dependency information for pymongo from https://files.pythonhosted.org/packages/51/28/577224211f43e2079126bfec53080efba46e59218f47808098f125139558/pymongo-4.8.0-cp311-cp311-win_amd64.whl.metadata
  Downloading pymongo-4.8.0-cp311-cp311-win_amd64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Obtaining dependency information for dnspython<3.0.0,>=1.16.0 from https://files.pythonhosted.org/packages/87/a1/8c5287991ddb8d3e4662f71356d9656d91ab3a36618c3dd11b280df0d255/dnspython-2.6.1-py3-none-any.whl.metadata
  Downloading dnspython-2.6.1-py3-none-any.whl.metadata (5.8 kB)
Downloading pymongo-4.8.0-cp311-cp311-win_amd64.whl (630 kB)
   ---------------------------------------- 0.0/631.0 kB ? eta -:--:--
   -- ------------------------------------ 41.0/631.0 kB 960.0 kB/s eta 0:00:01
   ----------- ---------------------------- 174.1/631.0 kB 2.1 MB/s eta 0:00:01
   -------------------------- ------------- 419.8/631.0 kB 3.3 MB/s eta 0:0

In [None]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc
from pymongo import MongoClient, errors

# MongoDB connection settings
MONGO_URI = 'mongodb://localhost:27017/'
DB_NAME = 'apnabase'

# Directory where the download cell saved the Parquet files. That cell groups
# files by the second "_"-separated token of each file name, which is
# "tripdata" for every TLC file, so everything lands in parquet_files/tripdata.
# os.walk also descends into any subdirectories.
parquet_directory = "parquet_files/tripdata"

# Timezone in which the files' naive timestamps were recorded. These are NYC
# TLC trip records, so New York local time is assumed
# (NOTE(review): confirm against the TLC data dictionary). This replaces
# 'Asia/Kolkata', which was the analyst's own timezone rather than the data's
# and shifted every stored timestamp.
SOURCE_TIMEZONE = 'America/New_York'

# Documents per insert_many call. This also bounds how many rows are turned
# into Python dicts at once.
chunk_size = 1000  # Adjust as needed


def _read_trip_table(file_path):
    """Read one Parquet file into a pandas DataFrame.

    When the file has a ``dropOff_datetime`` column, rows whose value is later
    than ``pd.Timestamp.max`` (the largest timestamp pandas' nanosecond
    resolution can hold) are filtered out before conversion to pandas. Rows
    whose ``dropOff_datetime`` is null are kept.
    """
    table = pq.read_table(file_path)
    if "dropOff_datetime" not in table.schema.names:
        return table.to_pandas()
    within_range = pc.less_equal(table["dropOff_datetime"], pa.scalar(pd.Timestamp.max))
    # less_equal yields null for a null input, and Table.filter drops rows whose
    # mask is null by default. That would silently discard every trip with a
    # missing drop-off time, so treat null as "keep".
    keep = pc.fill_null(within_range, True)
    return table.filter(keep).to_pandas()


def _localize_to_utc(df, source_tz):
    """Localize every naive datetime column of ``df`` to ``source_tz``, then convert to UTC.

    Modifies ``df`` in place and returns it. A column that fails to convert is
    reported and left unchanged.
    """
    # 'datetime64' matches naive datetime columns of any resolution, not only
    # datetime64[ns]; tz-aware columns are not selected.
    for col in df.select_dtypes(include=['datetime64']).columns:
        try:
            df[col] = (
                df[col]
                .dt.tz_localize(source_tz, ambiguous='NaT', nonexistent='shift_forward')
                .dt.tz_convert('UTC')
            )
        except Exception as e:
            print(f"Error localizing timezone for column {col}: {e}")
    return df


def _to_mongo_records(frame):
    """Convert a DataFrame chunk into a list of dicts for ``insert_many``.

    pandas' NaT (e.g. missing on_scene_datetime values, or values set to NaT
    by tz_localize) is replaced with None, which is stored as BSON null. NaT is
    a ``datetime`` subclass that pymongo's datetime encoder cannot serialise.
    All other values pass through unchanged.
    """
    return [
        {key: (None if value is pd.NaT else value) for key, value in record.items()}
        for record in frame.to_dict(orient='records')
    ]


def _insert_in_chunks(collection, df, chunk_size):
    """Insert every row of ``df`` into ``collection`` in batches of ``chunk_size`` rows.

    Rows are converted to dicts one batch at a time instead of all at once.
    Returns the number of documents inserted. Exceptions from insert_many
    propagate; batches inserted before a failure remain in the collection.
    """
    inserted = 0
    for start in range(0, len(df), chunk_size):
        records = _to_mongo_records(df.iloc[start:start + chunk_size])
        collection.insert_many(records)
        inserted += len(records)
    return inserted


# os.walk silently yields nothing for a missing directory; say so explicitly.
if not os.path.isdir(parquet_directory):
    print(f"Parquet directory '{parquet_directory}' does not exist; nothing to load.")

# NOTE: insert_many always appends, so re-running this cell inserts duplicate
# documents. Drop the target collections first if a clean reload is wanted.
# The `with` block guarantees the client is closed even if the loop is interrupted.
with MongoClient(MONGO_URI) as client:
    db = client[DB_NAME]

    # Iterate through each file in the parquet directory, in a stable order
    for root, dirs, files in os.walk(parquet_directory):
        dirs.sort()
        for file in sorted(files):
            if not file.endswith(".parquet"):
                continue
            file_path = os.path.join(root, file)
            collection_name = os.path.splitext(file)[0]  # Use file name as collection name

            # Print Parquet file information
            print(f"Processing Parquet file: {file_path}")

            try:
                df = _read_trip_table(file_path)
            except Exception as e:
                print(f"Failed to read and process Parquet file {file_path}: {e}")
                continue

            # Display data (first few rows)
            print(f"Data preview for collection '{collection_name}':")
            print(df.head())
            print("\n")

            # Convert datetime columns to UTC before inserting into MongoDB
            df = _localize_to_utc(df, SOURCE_TIMEZONE)

            # Batch insertion into MongoDB collection
            try:
                inserted = _insert_in_chunks(db[collection_name], df, chunk_size)
                print(f"Inserted {inserted} documents into collection '{collection_name}' in MongoDB.")
            except errors.BulkWriteError as bwe:
                print(f"Bulk write error while inserting documents into collection '{collection_name}': {bwe.details}")
            except Exception as e:
                print(f"An error occurred while inserting documents into collection '{collection_name}': {e}")
            print("\n")



Processing Parquet file: parquet_files/tripdata\fhvhv_tripdata_2019-02.parquet
Data preview for collection 'fhvhv_tripdata_2019-02':
  hvfhs_license_num dispatching_base_num originating_base_num  \
0            HV0003               B02867               B02867   
1            HV0003               B02879               B02879   
2            HV0005               B02510                 None   
3            HV0005               B02510                 None   
4            HV0005               B02510                 None   

     request_datetime   on_scene_datetime     pickup_datetime  \
0 2019-02-01 00:01:26 2019-02-01 00:02:55 2019-02-01 00:05:18   
1 2019-02-01 00:26:08 2019-02-01 00:41:29 2019-02-01 00:41:29   
2 2019-02-01 00:48:58                 NaT 2019-02-01 00:51:34   
3 2019-02-01 00:02:15                 NaT 2019-02-01 00:03:51   
4 2019-02-01 00:06:17                 NaT 2019-02-01 00:09:44   

     dropoff_datetime  PULocationID  DOLocationID  trip_miles  ...  sales_tax  \
0 20