In [1]:
!pip install azure-storage-blob # Microsoft Azure
!pip install pyarrow
!pip install psycopg2 sqlalchemy

Collecting azure-storage-blob
  Downloading azure_storage_blob-12.20.0-py3-none-any.whl (392 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m392.2/392.2 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting azure-core>=1.28.0 (from azure-storage-blob)
  Downloading azure_core-1.30.1-py3-none-any.whl (193 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m193.4/193.4 kB[0m [31m11.8 MB/s[0m eta [36m0:00:00[0m
Collecting isodate>=0.6.1 (from azure-storage-blob)
  Downloading isodate-0.6.1-py2.py3-none-any.whl (41 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.7/41.7 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: isodate, azure-core, azure-storage-blob
Successfully installed azure-core-1.30.1 azure-storage-blob-12.20.0 isodate-0.6.1


In [2]:
import pandas as pd
import numpy as np
import json
import requests
from io import StringIO
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from math import ceil
import datetime
import calendar
from sqlalchemy import create_engine
import uuid

In [3]:
# Azure Functions
def azure_upload_blob(connect_str, container_name, blob_name, data):
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    blob_client.upload_blob(data, overwrite=True)
    print(f"Uploaded to Azure Blob: {blob_name}")

def azure_download_blob(connect_str, container_name, blob_name):
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    download_stream = blob_client.download_blob()
    return download_stream.readall()

# Google Cloud Functions
def google_upload_blob(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

def google_download_blob(bucket_name, source_blob_name, destination_file_name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)
    print(f"Blob {source_blob_name} downloaded to {destination_file_name}.")

# AWS Functions
def aws_upload_file(file_name, bucket, object_name=None):
    if object_name is None:
        object_name = os.path.basename(file_name)
    s3_client = boto3.client('s3')
    response = s3_client.upload_file(file_name, bucket, object_name)
    print(f"Uploaded {file_name} to S3 bucket {bucket}.")

def aws_download_file(bucket, object_name, file_name):
    s3_client = boto3.client('s3')
    s3_client.download_file(bucket, object_name, file_name)
    print(f"Downloaded {object_name} from S3 bucket {bucket}.")

In [4]:
def week_of_month(dt):
    first_day = dt.replace(day=1)
    dom = dt.day
    adjusted_dom = dom + first_day.weekday()
    return int(ceil(adjusted_dom/7.0))

def get_week_of_year(date_str):
    """
    Calculate the ISO week number of the year for a given date.

    Parameters:
    date_str (str): A date string in the format 'YYYY-MM-DD'.

    Returns:
    int: ISO week number of the year.
    """
    # Parse the input string to a datetime object
    date = datetime.strptime(date_str, '%Y-%m-%d')

    # Get the ISO calendar week number
    week_of_year = date.isocalendar()[1]

    return week_of_year

In [6]:
# Specify the path to your JSON configuration file
config_file_path = 'config.json'

# Load the JSON configuration file
with open(config_file_path, 'r') as config_file:
    config = json.load(config_file)

# Print the configuration

CONNECTION_STRING_AZURE_STORAGE = config["connectionString"]
CONTAINER_AZURE = 'nypdarrest'

# Initialize the BlobServiceClient
blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING_AZURE_STORAGE)

# Get the container client
container_client = blob_service_client.get_container_client(CONTAINER_AZURE)


nypd_arrest_df = pd.DataFrame()

# List all blobs in the specified container
blob_list = container_client.list_blobs()
for blob in blob_list:
    print(blob.name)
    blob_client = container_client.get_blob_client(blob=blob.name)
    blob_data = blob_client.download_blob()
    blob_content = blob_data.readall().decode('utf-8')
    df = pd.read_csv(StringIO(blob_content))
    # Display the head of the DataFrame
    print(df.shape)
    # since I have only one csv, I am doing to do the following instructions
    nypd_arrest_df = df.copy()



nypdarrest.csv
(63612, 19)


In [7]:
nypd_arrest_df.head(10)

Unnamed: 0,arrest_key,arrest_date,pd_cd,pd_desc,ky_cd,ofns_desc,law_code,law_cat_cd,arrest_boro,arrest_precinct,jurisdiction_code,age_group,perp_sex,perp_race,x_coord_cd,y_coord_cd,latitude,longitude,geocoded_column
0,280255493,2024-01-10,397.0,"ROBBERY,OPEN AREA UNCLASSIFIED",105.0,ROBBERY,PL 1601001,F,M,26,0,<18,M,BLACK,996342,236149,40.814845,-73.956312,POINT (-73.956312 40.814845)
1,279805419,2024-01-02,157.0,RAPE 1,104.0,RAPE,PL 1303501,F,K,77,0,25-44,M,WHITE HISPANIC,1003509,185018,40.674496,-73.930571,POINT (-73.9305713255961 40.6744956865259)
2,279895750,2024-01-03,101.0,ASSAULT 3,344.0,ASSAULT 3 & RELATED OFFENSES,PL 1200001,M,Q,106,0,65+,F,WHITE,1026836,180689,40.662526,-73.846499,POINT (-73.846499 40.662526)
3,280809090,2024-01-19,511.0,"CONTROLLED SUBSTANCE, POSSESSI",235.0,DANGEROUS DRUGS,PL 2200300,M,B,49,0,45-64,M,BLACK,1027430,251104,40.855793,-73.843908,POINT (-73.843908 40.855793)
4,280357135,2024-01-11,109.0,"ASSAULT 2,1,UNCLASSIFIED",106.0,FELONY ASSAULT,PL 1200502,F,K,81,2,25-44,F,BLACK,1002457,192292,40.694456,-73.934343,POINT (-73.934343 40.694456)
5,281488967,2024-02-01,157.0,RAPE 1,104.0,RAPE,PL 1303501,F,Q,113,0,25-44,M,BLACK,1046315,187088,40.679981,-73.776234,POINT (-73.7762339071953 40.6799807384666)
6,280769124,2024-01-19,464.0,JOSTLING,230.0,JOSTLING,PL 1652501,M,M,6,0,25-44,M,BLACK,984134,207984,40.737547,-74.000415,POINT (-74.000415 40.737547)
7,281338081,2024-01-30,101.0,ASSAULT 3,344.0,ASSAULT 3 & RELATED OFFENSES,PL 1200001,M,M,26,0,25-44,M,BLACK,996014,236126,40.81478,-73.957499,POINT (-73.957499 40.81478)
8,281426006,2024-01-31,155.0,RAPE 2,104.0,RAPE,PL 1303001,F,B,41,0,18-24,M,BLACK,1013037,236657,40.816206,-73.896001,POINT (-73.8960011932583 40.8162058439227)
9,281606761,2024-02-03,105.0,STRANGULATION 1ST,106.0,FELONY ASSAULT,PL 1211200,F,Q,102,0,45-64,M,ASIAN / PACIFIC ISLANDER,1030262,191716,40.692773,-73.834076,POINT (-73.834076 40.692773)


In [8]:
nypd_arrest_df.columns

Index(['arrest_key', 'arrest_date', 'pd_cd', 'pd_desc', 'ky_cd', 'ofns_desc',
       'law_code', 'law_cat_cd', 'arrest_boro', 'arrest_precinct',
       'jurisdiction_code', 'age_group', 'perp_sex', 'perp_race', 'x_coord_cd',
       'y_coord_cd', 'latitude', 'longitude', 'geocoded_column'],
      dtype='object')

In [9]:
#Drop unnecessary columns and rename DF column heads

columns_to_drop = ['pd_cd', 'ky_cd','law_code', 'jurisdiction_code', 'x_coord_cd', 'y_coord_cd']
nypd_arrest_df = nypd_arrest_df.drop(columns=columns_to_drop)

rename_mapping = {
    'arrest_key': 'arrestID',
    'arrest_date': 'arrestDate',
    'arrest_precinct': 'arrestPrecinct',
    'pd_desc': 'offenseDetail',
    'ofns_desc': 'offenseCategory',
    'law_cat_cd': 'offenseLevel',
    'arrest_boro': 'arrestBorough',
    'perp_race': 'perpRace',
    'perp_sex': 'perpSex',
    'age_group': 'perpAgeGroup'
}
nypd_arrest_df = nypd_arrest_df.rename(columns=rename_mapping)

In [10]:
print(nypd_arrest_df.dtypes)

arrestID             int64
arrestDate          object
offenseDetail       object
offenseCategory     object
offenseLevel        object
arrestBorough       object
arrestPrecinct       int64
perpAgeGroup        object
perpSex             object
perpRace            object
latitude           float64
longitude          float64
geocoded_column     object
dtype: object


In [11]:
#Change arrest date to datetime type
nypd_arrest_df['arrestDate'] = pd.to_datetime(nypd_arrest_df['arrestDate'])
print(nypd_arrest_df.dtypes)

arrestID                    int64
arrestDate         datetime64[ns]
offenseDetail              object
offenseCategory            object
offenseLevel               object
arrestBorough              object
arrestPrecinct              int64
perpAgeGroup               object
perpSex                    object
perpRace                   object
latitude                  float64
longitude                 float64
geocoded_column            object
dtype: object


In [12]:
unique_values = nypd_arrest_df['offenseLevel'].unique()
print(unique_values)

['F' 'M' 'V' '9' nan 'I']


In [13]:
#Filter for only 2024 (this is a YTD dataset so should only be this year)
#Drop bad rows where offenseLevel column is not M, V, or F

nypd_arrest_filtered_df = nypd_arrest_df[nypd_arrest_df['arrestDate'].dt.year == 2024]
nypd_arrest_filtered_df = nypd_arrest_df[nypd_arrest_df['offenseLevel'].isin(['M','V','F'])]

nypd_arrest_filtered_df.info()
unique_values = nypd_arrest_filtered_df['offenseLevel'].unique()
print(unique_values)

<class 'pandas.core.frame.DataFrame'>
Index: 63001 entries, 0 to 63611
Data columns (total 13 columns):
 #   Column           Non-Null Count  Dtype         
---  ------           --------------  -----         
 0   arrestID         63001 non-null  int64         
 1   arrestDate       63001 non-null  datetime64[ns]
 2   offenseDetail    63001 non-null  object        
 3   offenseCategory  63001 non-null  object        
 4   offenseLevel     63001 non-null  object        
 5   arrestBorough    63001 non-null  object        
 6   arrestPrecinct   63001 non-null  int64         
 7   perpAgeGroup     63001 non-null  object        
 8   perpSex          63001 non-null  object        
 9   perpRace         63001 non-null  object        
 10  latitude         63001 non-null  float64       
 11  longitude        63001 non-null  float64       
 12  geocoded_column  63001 non-null  object        
dtypes: datetime64[ns](1), float64(2), int64(2), object(8)
memory usage: 6.7+ MB
['F' 'M' 'V']


In [14]:
nypd_arrest_filtered_df.head()

Unnamed: 0,arrestID,arrestDate,offenseDetail,offenseCategory,offenseLevel,arrestBorough,arrestPrecinct,perpAgeGroup,perpSex,perpRace,latitude,longitude,geocoded_column
0,280255493,2024-01-10,"ROBBERY,OPEN AREA UNCLASSIFIED",ROBBERY,F,M,26,<18,M,BLACK,40.814845,-73.956312,POINT (-73.956312 40.814845)
1,279805419,2024-01-02,RAPE 1,RAPE,F,K,77,25-44,M,WHITE HISPANIC,40.674496,-73.930571,POINT (-73.9305713255961 40.6744956865259)
2,279895750,2024-01-03,ASSAULT 3,ASSAULT 3 & RELATED OFFENSES,M,Q,106,65+,F,WHITE,40.662526,-73.846499,POINT (-73.846499 40.662526)
3,280809090,2024-01-19,"CONTROLLED SUBSTANCE, POSSESSI",DANGEROUS DRUGS,M,B,49,45-64,M,BLACK,40.855793,-73.843908,POINT (-73.843908 40.855793)
4,280357135,2024-01-11,"ASSAULT 2,1,UNCLASSIFIED",FELONY ASSAULT,F,K,81,25-44,F,BLACK,40.694456,-73.934343,POINT (-73.934343 40.694456)


In [15]:
# Create Arrest Dimension

perpSex_mapping = {
    'M': 'Male',
    'F': 'Female'
}
offenseLevel_mapping = {
    'F': 'Felony',
    'M': 'Misdemeanor',
    'V': 'Violation'
}

arrestColumns = ['arrestID', 'offenseCategory', 'offenseDetail', 'offenseLevel', 'arrestPrecinct', 'perpAgeGroup', 'perpSex', 'perpRace']

# Converting the main DF into a subset DF for the arrest dimension
arrestDF = nypd_arrest_filtered_df[arrestColumns].copy()

# Applying the mapping to create a new column with descriptions
arrestDF['perpSex'] = arrestDF['perpSex'].map(perpSex_mapping)
arrestDF['offenseLevel'] = arrestDF['offenseLevel'].map(offenseLevel_mapping)

arrestDF

Unnamed: 0,arrestID,offenseCategory,offenseDetail,offenseLevel,arrestPrecinct,perpAgeGroup,perpSex,perpRace
0,280255493,ROBBERY,"ROBBERY,OPEN AREA UNCLASSIFIED",Felony,26,<18,Male,BLACK
1,279805419,RAPE,RAPE 1,Felony,77,25-44,Male,WHITE HISPANIC
2,279895750,ASSAULT 3 & RELATED OFFENSES,ASSAULT 3,Misdemeanor,106,65+,Female,WHITE
3,280809090,DANGEROUS DRUGS,"CONTROLLED SUBSTANCE, POSSESSI",Misdemeanor,49,45-64,Male,BLACK
4,280357135,FELONY ASSAULT,"ASSAULT 2,1,UNCLASSIFIED",Felony,81,25-44,Female,BLACK
...,...,...,...,...,...,...,...,...
63607,282877412,PETIT LARCENY,"LARCENY,PETIT FROM OPEN AREAS,",Misdemeanor,60,45-64,Male,WHITE HISPANIC
63608,283628088,DANGEROUS DRUGS,"CONTROLLED SUBSTANCE, POSSESSI",Misdemeanor,13,45-64,Male,BLACK
63609,283387006,ROBBERY,"ROBBERY,OPEN AREA UNCLASSIFIED",Felony,73,25-44,Male,BLACK
63610,283013910,VEHICLE AND TRAFFIC LAWS,"TRAFFIC,UNCLASSIFIED MISDEMEAN",Misdemeanor,83,25-44,Female,BLACK


In [18]:
# Create Location Dimension

borough_mapping = {
    'B': 'Bronx',
    'K': 'Brooklyn',
    'M': 'Manhattan',
    'S': 'Staten Island',
    'Q': 'Queens'
}

locationColumns = ['geocoded_column', 'arrestBorough', 'latitude', 'longitude']
uniquelocationDF = nypd_arrest_filtered_df.drop_duplicates(subset=['geocoded_column'])
locationDF = uniquelocationDF[locationColumns].copy()
locationDF['arrestBorough'] = locationDF['arrestBorough'].map(borough_mapping)
locationDF.rename(columns={'geocoded_column': 'locationID'}, inplace=True)
locationDF

Unnamed: 0,locationID,arrestBorough,latitude,longitude
0,POINT (-73.956312 40.814845),Manhattan,40.814845,-73.956312
1,POINT (-73.9305713255961 40.6744956865259),Brooklyn,40.674496,-73.930571
2,POINT (-73.846499 40.662526),Queens,40.662526,-73.846499
3,POINT (-73.843908 40.855793),Bronx,40.855793,-73.843908
4,POINT (-73.934343 40.694456),Brooklyn,40.694456,-73.934343
...,...,...,...,...
63593,POINT (-73.942728 40.66169),Brooklyn,40.661690,-73.942728
63606,POINT (-73.903353 40.661782),Brooklyn,40.661782,-73.903353
63607,POINT (-73.989545 40.5761),Brooklyn,40.576100,-73.989545
63610,POINT (-73.91752179908735 40.698893568968515),Brooklyn,40.698894,-73.917522


In [26]:
# Create Date Dimension

def week_of_month(dt):
    year = dt.year
    month = dt.month
    day = dt.day

    cal = calendar.monthcalendar(year, month)
    week_number = (day - 1) // 7 + 1
    return week_number

start_date = pd.to_datetime('2024-01-01')
end_date = pd.to_datetime('2024-03-31')
# Create a DataFrame for the date dimension
date_dimension = pd.DataFrame({'date': pd.date_range(start_date, end_date, freq='D')})

date_dimension.head(25)

# Extract attributes
date_dimension['year'] = date_dimension['date'].dt.year
date_dimension['quarter'] = date_dimension['date'].dt.quarter
date_dimension['monthNumber'] = date_dimension['date'].dt.month
date_dimension['monthName'] = date_dimension['date'].dt.strftime('%B')
date_dimension['dayNumber'] = date_dimension['date'].dt.day
date_dimension['dayName'] = date_dimension['date'].dt.strftime('%A')
date_dimension['dateisoformat'] = date_dimension['date'].apply(lambda x: x.isoformat())
date_dimension['dateID'] = date_dimension['date'].dt.strftime('%Y%m%d')

# Add week of the month and week of the year
date_dimension['weekOfMonth'] = date_dimension['date'].apply(week_of_month)
date_dimension['weekOfYear'] = date_dimension['date'].dt.strftime('%U')

new_order = ['dateID', 'dateisoformat','year','quarter','monthNumber','dayNumber','monthName','dayName','weekOfYear','weekOfMonth']
date_dimension = date_dimension[new_order]

date_dimension.head(25)

Unnamed: 0,dateID,dateisoformat,year,quarter,monthNumber,dayNumber,monthName,dayName,weekOfYear,weekOfMonth
0,20240101,2024-01-01T00:00:00,2024,1,1,1,January,Monday,0,1
1,20240102,2024-01-02T00:00:00,2024,1,1,2,January,Tuesday,0,1
2,20240103,2024-01-03T00:00:00,2024,1,1,3,January,Wednesday,0,1
3,20240104,2024-01-04T00:00:00,2024,1,1,4,January,Thursday,0,1
4,20240105,2024-01-05T00:00:00,2024,1,1,5,January,Friday,0,1
5,20240106,2024-01-06T00:00:00,2024,1,1,6,January,Saturday,0,1
6,20240107,2024-01-07T00:00:00,2024,1,1,7,January,Sunday,1,1
7,20240108,2024-01-08T00:00:00,2024,1,1,8,January,Monday,1,2
8,20240109,2024-01-09T00:00:00,2024,1,1,9,January,Tuesday,1,2
9,20240110,2024-01-10T00:00:00,2024,1,1,10,January,Wednesday,1,2


In [39]:
#Create Facts Dimension

factsDF = pd.DataFrame()

factsDF['arrestID'] = nypd_arrest_filtered_df['arrestID']
factsDF['dateID'] = nypd_arrest_filtered_df['arrestDate'].dt.strftime('%Y%m%d')
factsDF['locationID'] = nypd_arrest_filtered_df['geocoded_column']
factsDF['factID'] = range(1, len(factsDF) + 1)

new_order = ['factID', 'arrestID', 'dateID', 'locationID']
factsDF = factsDF[new_order]

factsDF

Unnamed: 0,factID,arrestID,dateID,locationID
0,1,280255493,20240110,POINT (-73.956312 40.814845)
1,2,279805419,20240102,POINT (-73.9305713255961 40.6744956865259)
2,3,279895750,20240103,POINT (-73.846499 40.662526)
3,4,280809090,20240119,POINT (-73.843908 40.855793)
4,5,280357135,20240111,POINT (-73.934343 40.694456)
...,...,...,...,...
63607,62997,282877412,20240227,POINT (-73.989545 40.5761)
63608,62998,283628088,20240312,POINT (-73.994805 40.740151)
63609,62999,283387006,20240307,POINT (-73.913562 40.671104)
63610,63000,283013910,20240301,POINT (-73.91752179908735 40.698893568968515)


In [45]:
# Database connection URL

#pwd = ''
database_url = f'postgresql://liamhas:{pwd}@cis9440-dw-nypdarrest.postgres.database.azure.com/postgres'

# Create a SQLAlchemy engine
engine = create_engine(database_url)

In [48]:
arrestDF.to_sql('dim_arrest', con=engine, if_exists='append', index=False)
factsDF.to_sql('facts_nypd_arrests', con=engine, if_exists='append', index=False)
locationDF.to_sql('dim_location', con=engine, if_exists='append', index=False)
date_dimension.to_sql('dim_date', con=engine, if_exists='append', index=False)

91

In [46]:
arrestDF.to_csv("arrestDF.csv",index=False)
factsDF.to_csv("factsDF.csv",index=False)
locationDF.to_csv("locationDF.csv",index=False)
date_dimension.to_csv("date_dimension.csv",index=False)


In [None]:
# Generic Functions
def create_string(length):
    if isinstance(length, int) and length > 0:
        result_string = "(" + "?," * (length - 1) + "?)"
        return result_string

def insert_data(table_name, df):
    conn = pyodbc.connect(connection_string)
    cursor = conn.cursor()
    result = create_string(len(df.columns))
    # Insert data into the table
    insert_query = f"INSERT INTO {table_name} VALUES {result}"
    print(insert_query)
    cursor.executemany(insert_query, df.values.tolist())
    conn.commit()
    conn.close()