In [12]:
from nyc_data_pipeline.source_extract_async import *

In [14]:
import asyncio
from bs4 import BeautifulSoup
import logging
import subprocess
import aiohttp
import os
import random
# [Include the class definitions of NYCPublicDataFetcher, NYCEndpointFetcher, and NYCUrlFetcher here]

# Initialize logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')



In [15]:
async def fetch_311_data(start_url):
    logging.info("Starting the 311 data fetching process")
    
    public_data_fetcher = NYCPublicDataFetcher(start_url)
    endpoint_fetcher = NYCEndpointFetcher()
    url_fetcher = NYCUrlFetcher()

    logging.info("Fetching initial data")
    initial_data = await url_fetcher.run(start_url)
    if not initial_data:
        logging.error("Failed to fetch initial data")
        return

    logging.info("Fetching main content data")
    main_content_data = await public_data_fetcher.run()
    if not main_content_data:
        logging.error("Failed to fetch main content data")
        return

    logging.info("Fetching detailed endpoint data")
    detailed_data = await endpoint_fetcher.run(main_content_data)
    if not detailed_data:
        logging.error("Failed to fetch detailed endpoint data")
        return

    logging.info("Data fetching process completed")
    return detailed_data

In [16]:
# Define the starting URL for the 311 data
start_url = 'https://data.cityofnewyork.us/browse?category=311'

# Asynchronous function to run the fetchers
data = await fetch_311_data(start_url)


2024-01-20 12:33:06,160 - INFO - Starting the 311 data fetching process
2024-01-20 12:33:06,164 - INFO - Fetching initial data
2024-01-20 12:33:09,346 - ERROR - Failed to fetch initial data


In [24]:
class NYCPipeline:
    
    def __init__(self, urls, bucket_name):
        self.urls = urls
        self.bucket_name = bucket_name
        self.endpoints = {}
        # self.manager = OortLandingZoneManager(bucket_name=self.bucket_name)
        # self.pipeline_manager = UnifiedPipelineManager()

    async def fetch_endpoints_from_url(self, url):
        data_fetcher = NYCPublicDataFetcher(url)
        data_dict = await data_fetcher.run()
        data_dict = {url.split('/')[-2]: url.lower() for _, url in data_dict.items()}
        endpoint_fetcher = NYCEndpointFetcher()
        endpoints = await endpoint_fetcher.run(data_dict)
        return endpoints

    async def fetch_all_endpoints(self):
        tasks = [self.fetch_endpoints_from_url(url) for url in self.urls]
        results = await asyncio.gather(*tasks)
        for endpoints in results:
            self.endpoints.update(endpoints)
        
    async def save_data_to_file(self, data, filename):
        with open(filename, 'w') as file:
            file.write(data)
    
    async def run(self):
        logging.info("Starting the NYCPipeline process")
        
        # Fetch all endpoints from the provided URLs
        logging.info("Fetching all endpoints from URLs")
        await self.fetch_all_endpoints()

        # Dictionary to store data
        data_dict = {}

        # Save the fetched data in the dictionary
        for key, endpoint in self.endpoints.items():
            identifier = f"{key}"  # or any other naming scheme you prefer
            logging.info(f"Saving data for {key}")
            data_dict[identifier] = endpoint

        logging.info("NYCPipeline process completed")

        # Return the dictionary with data
        return data_dict


In [26]:
# Initialize the pipeline
pipeline = NYCPipeline(urls=["https://data.cityofnewyork.us/browse?q=311"], bucket_name="your_bucket_name")

# Run the pipeline and get the data dictionary
data_dict = await pipeline.run()

# Now, data_dict contains your data
# You can process it as you need


2024-01-20 13:11:35,122 - INFO - Starting the NYCPipeline process
2024-01-20 13:11:35,129 - INFO - Fetching all endpoints from URLs
2024-01-20 13:12:01,083 - INFO - Saving data for 311-Service-Requests-from-2010-to-Present
2024-01-20 13:12:01,083 - INFO - Saving data for 311-Service-Requests-for-2009
2024-01-20 13:12:01,084 - INFO - Saving data for 311-Call-Center-Inquiry
2024-01-20 13:12:01,084 - INFO - Saving data for 311-Service-Requests-for-2004
2024-01-20 13:12:01,084 - INFO - Saving data for 311-Service-Requests-for-2007
2024-01-20 13:12:01,084 - INFO - Saving data for 311-Service-Level-Agreements
2024-01-20 13:12:01,084 - INFO - Saving data for 311-Web-Content-Services
2024-01-20 13:12:01,085 - INFO - Saving data for 311-Service-Requests-for-2008
2024-01-20 13:12:01,085 - INFO - Saving data for 311-Service-Requests-for-2006
2024-01-20 13:12:01,085 - INFO - Saving data for 311-Service-Requests-for-2005
2024-01-20 13:12:01,085 - INFO - Saving data for 311-Interpreter-Wait-Time
202

In [27]:
print(data_dict)

{'311-Service-Requests-from-2010-to-Present': 'https://data.cityofnewyork.us/resource/erm2-nwe9.json', '311-Service-Requests-for-2009': 'https://data.cityofnewyork.us/resource/3rfa-3xsf.json', '311-Call-Center-Inquiry': 'https://data.cityofnewyork.us/resource/wewp-mm3p.json', '311-Service-Requests-for-2004': 'https://data.cityofnewyork.us/resource/sqcr-6mww.json', '311-Service-Requests-for-2007': 'https://data.cityofnewyork.us/resource/aiww-p3af.json', '311-Service-Level-Agreements': 'https://data.cityofnewyork.us/resource/cs9t-e3x8.json', '311-Web-Content-Services': 'https://data.cityofnewyork.us/resource/vwpc-kje2.json', '311-Service-Requests-for-2008': 'https://data.cityofnewyork.us/resource/uzcy-9puk.json', '311-Service-Requests-for-2006': 'https://data.cityofnewyork.us/resource/hy4q-igkk.json', '311-Service-Requests-for-2005': 'https://data.cityofnewyork.us/resource/sxmw-f24h.json', '311-Interpreter-Wait-Time': 'https://data.cityofnewyork.us/resource/dzvt-6g3v.json', 'DOHMH-Call-C

In [33]:
import os
import logging
import asyncio
import aiohttp

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

async def fetch_json(url):
    logging.info(f"Fetching JSON data from {url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status == 200:
                json_data = await response.json()
                logging.info(f"JSON data fetched successfully from {url}")
                return json_data
            else:
                logging.error(f"Failed to fetch JSON data from {url}")
                return None

async def fetch_all_json(data_dict):
    logging.info("Fetching JSON data from all URLs")
    json_data_dict = {}
    tasks = []
    
    for key, url in data_dict.items():
        task = asyncio.create_task(fetch_json(url))
        tasks.append((key, task))
    
    for key, task in tasks:
        json_data = await task
        if json_data:
            json_data_dict[key] = json_data
    
    logging.info("JSON data fetched successfully from all URLs")
    return json_data_dict

async def save_json_data(data_dict):
    logging.info("Saving JSON data to files")
    directory = "data"
    if not os.path.exists(directory):
        os.makedirs(directory)
    
    tasks = []
    
    for key, json_data in data_dict.items():
        filename = os.path.join(directory, f"{key}.json")
        task = asyncio.to_thread(write_json_file, filename, json_data)
        tasks.append(task)
    
    await asyncio.gather(*tasks)
    
    logging.info("JSON data saved successfully!")

import json
def write_json_file(filename, json_data):
    with open(filename, 'w') as file:
        json.dump(json_data, file)

import concurrent.futures

async def save_json_data(data_dict):
    logging.info("Saving JSON data to files")
    directory = "data"
    if not os.path.exists(directory):
        os.makedirs(directory)
    
    tasks = []
    loop = asyncio.get_event_loop()
    
    for key, json_data in data_dict.items():
        filename = os.path.join(directory, f"{key}.json")
        task = loop.run_in_executor(None, write_json_file, filename, json_data)
        tasks.append(task)


In [34]:
import nest_asyncio

# Apply the nest_asyncio patch
nest_asyncio.apply()

# Now you can run your code
loop = asyncio.get_event_loop()
json_data_dict = loop.run_until_complete(fetch_all_json(data_dict))
loop.run_until_complete(save_json_data(json_data_dict))

2024-01-20 13:32:39,863 - INFO - Fetching JSON data from all URLs
2024-01-20 13:32:39,866 - INFO - Fetching JSON data from https://data.cityofnewyork.us/resource/erm2-nwe9.json
2024-01-20 13:32:39,870 - INFO - Fetching JSON data from https://data.cityofnewyork.us/resource/3rfa-3xsf.json
2024-01-20 13:32:39,871 - INFO - Fetching JSON data from https://data.cityofnewyork.us/resource/wewp-mm3p.json
2024-01-20 13:32:39,871 - INFO - Fetching JSON data from https://data.cityofnewyork.us/resource/sqcr-6mww.json
2024-01-20 13:32:39,871 - INFO - Fetching JSON data from https://data.cityofnewyork.us/resource/aiww-p3af.json
2024-01-20 13:32:39,872 - INFO - Fetching JSON data from https://data.cityofnewyork.us/resource/cs9t-e3x8.json
2024-01-20 13:32:39,872 - INFO - Fetching JSON data from https://data.cityofnewyork.us/resource/vwpc-kje2.json
2024-01-20 13:32:39,873 - INFO - Fetching JSON data from https://data.cityofnewyork.us/resource/uzcy-9puk.json
2024-01-20 13:32:39,896 - INFO - Fetching JSON

In [36]:
import os
import pandas as pd

# Specify the directory path
directory = "data"

# Initialize lists to store file names and sizes
file_names = []
file_sizes_gb = []

# Iterate over the files in the directory
for file_name in os.listdir(directory):
    file_path = os.path.join(directory, file_name)
    if os.path.isfile(file_path):
        # Get the file size in bytes
        file_size_bytes = os.path.getsize(file_path)
        # Convert the file size to GB
        file_size_gb = file_size_bytes / (1024**3)
        # Append the file name and size to the lists
        file_names.append(file_name)
        file_sizes_gb.append(file_size_gb)

# Create the pandas dataframe
df = pd.DataFrame({'File Name': file_names, 'Size (GB)': file_sizes_gb})

# Print the dataframe
df


Unnamed: 0,File Name,Size (GB)
0,Local-Law-8-of-2020-Complaints-of-Illegal-Park...,0.001308
1,Inspections-of-Municipal-Solid-Waste-Disposal-...,0.00011
2,Curb-Metal-Data.json,0.000283
3,311-Call-Center-Inquiry.json,0.000354
4,Local-Law-8-of-2020-Report-Monthly-Complaints-...,5.5e-05
5,DOHMH-Call-Center-Summary.json,2.7e-05
6,311-Service-Requests-for-2006.json,0.000782
7,311-Service-Level-Agreements.json,0.000177
8,311-Web-Content-Services.json,0.001001
9,311-Service-Requests-for-2007.json,0.000827


In [37]:
import os
import pandas as pd

# Specify the directory path
directory = "data"

# Initialize lists to store file names, sizes, and row counts
file_names = []
file_sizes_gb = []
row_counts = []

# Iterate over the files in the directory
for file_name in os.listdir(directory):
    file_path = os.path.join(directory, file_name)
    if os.path.isfile(file_path):
        # Get the file size in bytes
        file_size_bytes = os.path.getsize(file_path)
        # Convert the file size to GB
        file_size_gb = file_size_bytes / (1024**3)
        # Append the file name and size to the lists
        file_names.append(file_name)
        file_sizes_gb.append(file_size_gb)
        
        # Read the dataset file and count the number of rows
        dataset = pd.read_csv(file_path)
        row_count = len(dataset)
        row_counts.append(row_count)

# Create the pandas dataframe
df = pd.DataFrame({'File Name': file_names, 'Size (GB)': file_sizes_gb, 'Row Count': row_counts})

# Print the dataframe
df


Unnamed: 0,File Name,Size (GB),Row Count
0,Local-Law-8-of-2020-Complaints-of-Illegal-Park...,0.001308,0
1,Inspections-of-Municipal-Solid-Waste-Disposal-...,0.00011,0
2,Curb-Metal-Data.json,0.000283,0
3,311-Call-Center-Inquiry.json,0.000354,0
4,Local-Law-8-of-2020-Report-Monthly-Complaints-...,5.5e-05,0
5,DOHMH-Call-Center-Summary.json,2.7e-05,0
6,311-Service-Requests-for-2006.json,0.000782,0
7,311-Service-Level-Agreements.json,0.000177,0
8,311-Web-Content-Services.json,0.001001,0
9,311-Service-Requests-for-2007.json,0.000827,0


In [39]:
import pandas as pd

# Read the JSON file as a dataframe
df = pd.read_json('data/311-Service-Requests-from-2010-to-Present.json')

# Print the dataframe
print(df)


     unique_key             created_date agency  \
0      60066340  2024-01-19T12:00:00.000   DSNY   
1      60060963  2024-01-19T12:00:00.000   DSNY   
2      60064570  2024-01-19T12:00:00.000   DSNY   
3      60073378  2024-01-19T12:00:00.000   DSNY   
4      60073384  2024-01-19T12:00:00.000   DSNY   
..          ...                      ...    ...   
995    60072457  2024-01-18T22:02:55.000    HPD   
996    60073328  2024-01-18T22:02:31.000   NYPD   
997    60071714  2024-01-18T22:02:27.000   DSNY   
998    60066451  2024-01-18T22:02:20.000   DSNY   
999    60063000  2024-01-18T22:02:00.000    DEP   

                                           agency_name       complaint_type  \
0                             Department of Sanitation    Derelict Vehicles   
1                             Department of Sanitation    Derelict Vehicles   
2                             Department of Sanitation    Derelict Vehicles   
3                             Department of Sanitation    Derelict Vehi