# Extract the Dataset from the Source - NYC Open Data

### Install & Import Libraries

In [1]:
!pip install azure-storage-blob # Microoft Azure
!pip install pyarrow
!pip install psycopg2 sqlalchemy

Collecting azure-storage-blob
  Downloading azure_storage_blob-12.19.1-py3-none-any.whl (394 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m394.5/394.5 kB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting azure-core<2.0.0,>=1.28.0 (from azure-storage-blob)
  Downloading azure_core-1.30.1-py3-none-any.whl (193 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m193.4/193.4 kB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
Collecting isodate>=0.6.1 (from azure-storage-blob)
  Downloading isodate-0.6.1-py2.py3-none-any.whl (41 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.7/41.7 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: isodate, azure-core, azure-storage-blob
Successfully installed azure-core-1.30.1 azure-storage-blob-12.19.1 isodate-0.6.1


In [2]:
import pandas as pd
import numpy as np
import json
import requests
from io import StringIO
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from math import ceil
import datetime
import calendar
from sqlalchemy import create_engine

In [3]:
# Azure Blob Storage helper functions
def azure_upload_blob(connect_str, container_name, blob_name, data):
    """Upload ``data`` to a blob in Azure Blob Storage and print a confirmation.

    Args:
        connect_str: Connection string handed to
            ``BlobServiceClient.from_connection_string``.
        container_name: Name of the container that holds the blob.
        blob_name: Name of the blob to write.
        data: Payload passed to ``upload_blob``.
    """
    service = BlobServiceClient.from_connection_string(connect_str)
    target = service.get_blob_client(container=container_name, blob=blob_name)
    # overwrite=True: an existing blob with the same name is replaced.
    target.upload_blob(data, overwrite=True)
    print(f"Uploaded to Azure Blob: {blob_name}")

def azure_download_blob(connect_str, container_name, blob_name):
    """Download a blob from Azure Blob Storage and return it as UTF-8 text.

    Args:
        connect_str: Connection string handed to
            ``BlobServiceClient.from_connection_string``.
        container_name: Name of the container that holds the blob.
        blob_name: Name of the blob to read.

    Returns:
        The full blob content decoded with UTF-8.
    """
    service = BlobServiceClient.from_connection_string(connect_str)
    source = service.get_blob_client(container=container_name, blob=blob_name)
    # Read the whole download stream before decoding it.
    payload = source.download_blob().readall()
    return payload.decode('utf-8')

### Information Architecture
Source 1 --> Gather Data --> Convert to DataFrame --> Clean Data (drop columns in which fewer than 60% of the rows have a value) --> Upload to Cloud (Azure Blob Storage)

Download (Extract) from Azure --> Reformat Data --> Transform Data (create Dimension tables and Fact table) --> Load to Data Warehouse

### Download Raw Data from NYC Open Data

In [4]:
# Paging parameters for the download loop below:
# `limit` is the number of rows requested per call, `offset` the starting row.
limit = 1000
offset = 0

In [5]:
# Download the dataset page by page and assemble it into one DataFrame.
# - Pages are collected in a list and concatenated once, instead of growing a
#   DataFrame inside the loop (which re-copies every earlier row on each page).
# - `$order=:id` asks the API for a stable row order so consecutive pages do
#   not overlap or skip rows.
# - Paging uses a local counter seeded from `offset`; the global `offset` is
#   not mutated, so re-running this cell fetches the same full dataset instead
#   of resuming from where the previous run stopped.
page_frames = []
page_offset = offset

while True:
    url = (
        'https://data.cityofnewyork.us/resource/66be-66yr.json'
        f'?$limit={limit}&$offset={page_offset}&$order=:id'
    )
    df_chunk = pd.read_json(url)
    if len(df_chunk) > 0:
        page_frames.append(df_chunk)

    # A short (or empty) page means the last page has been reached.
    if len(df_chunk) < limit:
        break

    page_offset += limit

# pd.concat raises on an empty list, so fall back to an empty frame.
all_raw_data = (
    pd.concat(page_frames, ignore_index=True) if page_frames else pd.DataFrame()
)

In [6]:
# Sanity check: (rows, columns) of the assembled raw download.
print(all_raw_data.shape)

(50315, 25)


In [7]:
# Preview the first rows of the raw download.
all_raw_data.head()

Unnamed: 0,development_name,borough,account_name,location,meter_amr,tds,edp,rc_code,funding_source,amp,...,days,meter_number,estimated,current_charges,rate_class,bill_analyzed,consumption_hcf,water_sewer_charges,other_charges,meter_scope
0,FHA REPOSSESSED HOUSES (GROUP V),FHA,FHA REPOSSESSED HOUSES (GROUP V),117-11 192ND STREET,NONE,260,325,Q026000,FEDERAL,NY005012090P,...,30.0,V52311732,N,38.07,Basic Water and Sewer,Yes,204,0.0,38.07,
1,VAN DYKE I,BROOKLYN,VAN DYKE I,BLD 24 - Community Center,AMR,61,325,K006100,FEDERAL,NY005000610P,...,29.0,V84003679,Y,111.37,Basic Water and Sewer,Yes,204,111.37,0.0,Community Center
2,FHA REPOSSESSED HOUSES (GROUP II),FHA,FHA REPOSSESSED HOUSES (GROUP II),171-28 111TH AVENUE,AMR,212,325,Q021200,FEDERAL,NY005012090P,...,30.0,K15842657,N,38.07,Basic Water and Sewer,Yes,204,38.07,0.0,
3,SOUTH JAMAICA II,QUEENS,SOUTH JAMAICA II,BLD 15,AMR,66,325,Q006600,FEDERAL,NY005010080P,...,30.0,E132272916,N,2962.44,Basic Water and Sewer,Yes,204,2962.44,0.0,BLD 15
4,FHA REPOSSESSED HOUSES (GROUP X),FHA,FHA REPOSSESSED HOUSES (GROUP X),,,284,325,Q028400,FEDERAL,,...,29.0,K96779329,Y,256.15,,Yes,204,256.15,0.0,


In [8]:
# Check the non-null count and dtype of every column to spot sparsely
# populated columns.
all_raw_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50315 entries, 0 to 50314
Data columns (total 25 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   development_name     50315 non-null  object 
 1   borough              50315 non-null  object 
 2   account_name         50315 non-null  object 
 3   location             49493 non-null  object 
 4   meter_amr            49804 non-null  object 
 5   tds                  50315 non-null  object 
 6   edp                  50315 non-null  int64  
 7   rc_code              50315 non-null  object 
 8   funding_source       50237 non-null  object 
 9   amp                  50193 non-null  object 
 10  vendor_name          50315 non-null  object 
 11  umis_bill_id         50315 non-null  int64  
 12  revenue_month        50315 non-null  object 
 13  service_start_date   50310 non-null  object 
 14  service_end_date     50310 non-null  object 
 15  days                 50310 non-null 

In [9]:
# Clean a copy so the raw download in `all_raw_data` stays untouched.
df_clean_data = all_raw_data.copy()

In [10]:
# Drop the `meter_scope` column (presumably the column that falls below the
# 60% threshold described in the pipeline overview -- confirm against the
# full `.info()` output).
# The cleaned frame is derived from `all_raw_data` rather than from itself so
# this cell is idempotent: re-running it no longer raises KeyError because a
# previous run already removed the column. `drop` returns a new DataFrame, so
# the raw frame is not modified.
df_clean_data = all_raw_data.drop(columns=['meter_scope'])
df_clean_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50315 entries, 0 to 50314
Data columns (total 24 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   development_name     50315 non-null  object 
 1   borough              50315 non-null  object 
 2   account_name         50315 non-null  object 
 3   location             49493 non-null  object 
 4   meter_amr            49804 non-null  object 
 5   tds                  50315 non-null  object 
 6   edp                  50315 non-null  int64  
 7   rc_code              50315 non-null  object 
 8   funding_source       50237 non-null  object 
 9   amp                  50193 non-null  object 
 10  vendor_name          50315 non-null  object 
 11  umis_bill_id         50315 non-null  int64  
 12  revenue_month        50315 non-null  object 
 13  service_start_date   50310 non-null  object 
 14  service_end_date     50310 non-null  object 
 15  days                 50310 non-null 

In [11]:
# Sanity check: (rows, columns) after dropping the column.
print(df_clean_data.shape)

(50315, 24)


### Upload Cleaned Dataframe to Azure

In [12]:
# Export DataFrame to CSV
# Serialize the cleaned DataFrame to CSV text in memory (index column omitted).
# The buffer is closed on leaving the `with` block; `data` keeps the CSV text.
with StringIO() as output:
    df_clean_data.to_csv(output, index=False)
    data = output.getvalue()

In [13]:
# Upload target and location of the local configuration file.
container_name = "waterconsumption"
blob_name = "WaterConsumption.csv"
config_file_path = 'config/config.json'

# The storage connection string is read from the JSON config file instead of
# being written into the notebook.
with open(config_file_path, 'r') as config_file:
    config = json.load(config_file)
connection_string = config["connection_string"]

azure_upload_blob(
    connect_str=connection_string,
    container_name=container_name,
    blob_name=blob_name,
    data=data,
)

Uploaded to Azure Blob: WaterConsumption.csv
