In [None]:
# References
# https://arrow.apache.org/docs/python/parquet.html

In [None]:
## Downloads


# https://catalog.data.gov/dataset/dof-parking-violation-codes/resource/05e24c51-7b04-4c1b-b70a-ea96996b6835
# https://data.cityofnewyork.us/City-Government/Parking-Violations-Issued-Fiscal-Year-2024/pvqr-7yc4/about_data

## Airflow, Docker, Python (Extract) - Initial Code

In [1]:
# !curl 'https://data.cityofnewyork.us/api/views/ncbg-6agr/rows.csv?accessType=DOWNLOAD' -o "DOF_Parking_Violation_Codes.csv"


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  3364    0  3364    0     0   3125      0 --:--:--  0:00:01 --:--:--  3126


In [2]:
# !curl 'https://data.cityofnewyork.us/api/views/pvqr-7yc4/rows.csv?accessType=DOWNLOAD&api_foundry=true' -o "Parking_Violations_Issued_-_Fiscal_Year_2024.csv"

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 2645M    0 2645M    0     0  4029k      0 --:--:--  0:11:12 --:--:-- 4313k888k  3917k      0 --:--:--  0:00:47 --:--:-- 4244k0     0  3949k      0 --:--:--  0:00:53 --:--:-- 4078k-- 3773k     0 --:--:--  0:01:00 --:--:-- 3773k:-- 4125k    0  3949k      0 --:--:--  0:01:18 --:--:-- 4033k  0 --:--:--  0:02:09 --:--:-- 4385k     0 --:--:--  0:02:15 --:--:-- 4320k0  4020k      0 --:--:--  0:03:04 --:--:-- 4329k --:--:-- 4331k     0 --:--:--  0:03:24 --:--:-- 3909k0:04:17 --:--:-- 3908k-  0:04:32 --:--:-- 4187k --:--:--  0:05:18 --:--:-- 4097k027k      0 --:--:--  0:05:35 --:--:-- 3932k0     0  4020k      0 --:--:--  0:05:58 --:--:-- 4057k   0     0  4011k      0 --:--:--  0:06:46 --:--:-- 4146k  0     0  4027k      0 --:--:--  0:07:07 --:--:-- 4228k0 --:--:--  0:07:33 --:--:-- 3472k  0 --:--:--  0:07:38 --:--:-- 4242k  0  4032k   

In [3]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import time

In [4]:
# Start timing the whole process
total_start_time = time.time()

# Define file paths
csv_path = "Parking_Violations_Issued_-_Fiscal_Year_2024.csv"
parquet_path = "Parking_Violations_Issued_-_Fiscal_Year_2024.parquet"

# Time for reading CSV file
read_start_time = time.time()
df = pd.read_csv(csv_path)
read_end_time = time.time()
read_time_taken = (read_end_time - read_start_time) / 60
print(f"CSV file read completed in {read_time_taken:.2f} minutes")

  df = pd.read_csv(csv_path)


CSV file read completed in 1.12 minutes


In [5]:
# Convert object columns to string
obj_cols = df.select_dtypes(include=['object']).columns
df[obj_cols] = df[obj_cols].astype(str)
print("Converted object columns to string")

# Convert DataFrame to PyArrow Table
table = pa.Table.from_pandas(df)
print("Converted DataFrame to PyArrow Table")

Converted object columns to string
Converted DataFrame to PyArrow Table


In [6]:
# Time for writing to Parquet file
write_start_time = time.time()
pq.write_table(table, parquet_path)
write_end_time = time.time()
write_time_taken = (write_end_time - write_start_time) / 60
print(f"Parquet file write completed in {write_time_taken:.2f} minutes")

# End timing the whole process
total_end_time = time.time()
total_time_taken = (total_end_time - total_start_time) / 60

print(f"Total time taken: {total_time_taken:.2f} minutes")

Parquet file write completed in 0.29 minutes
Total time taken: 5.01 minutes


In [25]:
from tqdm import tqdm
import os
import requests
DATA_FOLDER='tt'
urls = {
        "DOF_Parking_Violation_Codes.csv": 'https://data.cityofnewyork.us/api/views/ncbg-6agr/rows.csv?accessType=DOWNLOAD',
        "Parking_Violations_Issued_Fiscal_Year_2024.csv": 'https://data.cityofnewyork.us/api/views/pvqr-7yc4/rows.csv?accessType=DOWNLOAD&api_foundry=true'
}
for filename, url in urls.items():
    local_file_path = os.path.join(DATA_FOLDER, filename)

    # Start the request and get the total length of the content
    response = requests.get(url, stream=True)
    total_length = response.headers.get('content-length')

    if total_length is None:
        # No content length header
        with open(local_file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
        print(f"Downloaded {filename} locally at {local_file_path}")
    else:
        total_length = int(total_length)
        # Download the file with a progress bar
        with open(local_file_path, 'wb') as f:
            for chunk in tqdm(response.iter_content(chunk_size=1024), total=total_length // 1024, unit='KB'):
                if chunk:
                    f.write(chunk)
        print(f"Downloaded {filename} locally at {local_file_path}")
        
parking_violations_df=pd.read_csv("Parking_Violations_Issued_-_Fiscal_Year_2024.csv")
parking_violations_df.shape
df_percent = parking_violations_df.sample(frac=0.12)
df_percent.shape

## Read Files

In [1]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import time
# Time for reading Parquet file
read_start_time = time.time()
parking_codes_df=pd.read_csv("DOF_Parking_Violation_Codes.csv")
parking_violations_df=pd.read_parquet("Parking_Violations_Issued_-_Fiscal_Year_2024.parquet")
read_end_time = time.time()
read_time_taken = (read_end_time - read_start_time) / 60
print(f"Parquet file read completed in {read_time_taken:.2f} minutes")

Parquet file read completed in 0.22 minutes


In [2]:
parking_codes_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 97 entries, 0 to 96
Data columns (total 4 columns):
 #   Column                       Non-Null Count  Dtype 
---  ------                       --------------  ----- 
 0   CODE                         97 non-null     int64 
 1   DEFINITION                   97 non-null     object
 2   Manhattan  96th St. & below  97 non-null     int64 
 3   All Other Areas              97 non-null     int64 
dtypes: int64(3), object(1)
memory usage: 3.2+ KB


In [3]:
parking_violations_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14765377 entries, 0 to 14765376
Data columns (total 43 columns):
 #   Column                             Dtype  
---  ------                             -----  
 0   Summons Number                     int64  
 1   Plate ID                           object 
 2   Registration State                 object 
 3   Plate Type                         object 
 4   Issue Date                         object 
 5   Violation Code                     int64  
 6   Vehicle Body Type                  object 
 7   Vehicle Make                       object 
 8   Issuing Agency                     object 
 9   Street Code1                       int64  
 10  Street Code2                       int64  
 11  Street Code3                       int64  
 12  Vehicle Expiration Date            int64  
 13  Violation Location                 float64
 14  Violation Precinct                 int64  
 15  Issuer Precinct                    int64  
 16  Issuer Code     

In [29]:
#

In [None]:
# https://www.kaggle.com/code/ishaan45/csv-to-parquet

## Airflow Connections

In [None]:
# aws_default

# AWS_ACCESS_KEY_ID=
# AWS_SECRET_ACCESS_KEY=

In [None]:
# snowflake_default

# PARKING_SCHEMA

# badreeshshetty

# 

# PARKING_WAREHOUSE

# PARKING_DB

# ACCOUNTADMIN

In [11]:
def get_parking_violations_data(year):
    parking_violations_data = {
        "2014": {
            "file_name": "Parking_Violations_Issued_Fiscal_Year_2014",
            "url": 'https://data.cityofnewyork.us/api/views/jt7v-77mi/rows.csv?accessType=DOWNLOAD&api_foundry=true'
        },
        "2015": {
            "file_name": "Parking_Violations_Issued_Fiscal_Year_2015",
            "url": 'https://data.cityofnewyork.us/api/views/c284-tqph/rows.csv?accessType=DOWNLOAD&api_foundry=true'
        },
        "2016": {
            "file_name": "Parking_Violations_Issued_Fiscal_Year_2016",
            "url": 'https://data.cityofnewyork.us/api/views/kiv2-tbus/rows.csv?accessType=DOWNLOAD&api_foundry=true'
        },
        "2017": {
            "file_name": "Parking_Violations_Issued_Fiscal_Year_2017",
            "url": 'https://data.cityofnewyork.us/api/views/2bnn-yakx/rows.csv?accessType=DOWNLOAD&api_foundry=true'
        },
        "2018": {
            "file_name": "Parking_Violations_Issued_Fiscal_Year_2018",
            "url": 'https://data.cityofnewyork.us/api/views/a5td-mswe/rows.csv?accessType=DOWNLOAD&api_foundry=true'
        },
        "2019": {
            "file_name": "Parking_Violations_Issued_Fiscal_Year_2019",
            "url": 'https://data.cityofnewyork.us/api/views/faiq-9dfq/rows.csv?accessType=DOWNLOAD&api_foundry=true'
        },
        "2020": {
            "file_name": "Parking_Violations_Issued_Fiscal_Year_2020",
            "url": 'https://data.cityofnewyork.us/api/views/p7t3-5i9s/rows.csv?accessType=DOWNLOAD&api_foundry=true'
        },
        "2021": {
            "file_name": "Parking_Violations_Issued_Fiscal_Year_2021",
            "url": 'https://data.cityofnewyork.us/api/views/kvfd-bves/rows.csv?accessType=DOWNLOAD&api_foundry=true'
        },
        "2022": {
            "file_name": "Parking_Violations_Issued_Fiscal_Year_2022",
            "url": 'https://data.cityofnewyork.us/api/views/7mxj-7a6y/rows.csv?accessType=DOWNLOAD&api_foundry=true'
        },
        "2023": {
            "file_name": "Parking_Violations_Issued_Fiscal_Year_2023",
            "url": 'https://data.cityofnewyork.us/api/views/869v-vr48/rows.csv?accessType=DOWNLOAD&api_foundry=true'
        },
        "2024": {
            "file_name": "Parking_Violations_Issued_Fiscal_Year_2024",
            "url": 'https://data.cityofnewyork.us/api/views/pvqr-7yc4/rows.csv?accessType=DOWNLOAD&api_foundry=true'
        }
    }

    return parking_violations_data.get(str(year), "Invalid year")

In [14]:
data["file_name"]

'Parking_Violations_Issued_Fiscal_Year_2023'

In [None]:
Parking Violations

In [40]:
year=2019
data=get_parking_violations_data(year)

print("parking_violations_file_name: ",data["file_name"])
print("parking_violations_url: ",data["url"])

parking_violations_file_name:  Parking_Violations_Issued_Fiscal_Year_2019
parking_violations_url:  https://data.cityofnewyork.us/api/views/faiq-9dfq/rows.csv?accessType=DOWNLOAD&api_foundry=true


In [None]:
# import json

# search_str = 'o=[i("manifest","manifest.json"),i("catalog","catalog.json")]'

# with open('target/index.html', 'r') as f:
#     content_index = f.read()
    
# with open('target/manifest.json', 'r') as f:
#     json_manifest = json.loads(f.read())

# with open('target/catalog.json', 'r') as f:
#     json_catalog = json.loads(f.read())
    
# with open('target/dbt_docs_parking_violations.html', 'w') as f:
#     new_str = "o=[{label: 'manifest', data: "+json.dumps(json_manifest)+"},{label: 'catalog', data: "+json.dumps(json_catalog)+"}]"
#     new_content = content_index.replace(search_str, new_str)
#     f.write(new_content)
    
    
# python3 -m http.server

In [None]:
# gmail-parking-violations-ny-dae
# utzo enud uvno kqoy
# G App Password

In [None]:
# parking_violations_file_name

# parking_violations_url
 
# "Parking_Violations_Issued_Fiscal_Year_2014": 'https://data.cityofnewyork.us/api/views/jt7v-77mi/rows.csv?accessType=DOWNLOAD&api_foundry=true',

# "Parking_Violations_Issued_Fiscal_Year_2015": 'https://data.cityofnewyork.us/api/views/c284-tqph/rows.csv?accessType=DOWNLOAD&api_foundry=true',

# "Parking_Violations_Issued_Fiscal_Year_2016": 'https://data.cityofnewyork.us/api/views/kiv2-tbus/rows.csv?accessType=DOWNLOAD&api_foundry=true',

# "Parking_Violations_Issued_Fiscal_Year_2017": 'https://data.cityofnewyork.us/api/views/2bnn-yakx/rows.csv?accessType=DOWNLOAD&api_foundry=true',

# "Parking_Violations_Issued_Fiscal_Year_2018": 'https://data.cityofnewyork.us/api/views/a5td-mswe/rows.csv?accessType=DOWNLOAD&api_foundry=true',

# "Parking_Violations_Issued_Fiscal_Year_2019": 'https://data.cityofnewyork.us/api/views/faiq-9dfq/rows.csv?accessType=DOWNLOAD&api_foundry=true',

# "Parking_Violations_Issued_Fiscal_Year_2020": 'https://data.cityofnewyork.us/api/views/p7t3-5i9s/rows.csv?accessType=DOWNLOAD&api_foundry=true',

# "Parking_Violations_Issued_Fiscal_Year_2021": 'https://data.cityofnewyork.us/api/views/kvfd-bves/rows.csv?accessType=DOWNLOAD&api_foundry=true',

# "Parking_Violations_Issued_Fiscal_Year_2022": 'https://data.cityofnewyork.us/api/views/7mxj-7a6y/rows.csv?accessType=DOWNLOAD&api_foundry=true',

# "Parking_Violations_Issued_Fiscal_Year_2023": 'https://data.cityofnewyork.us/api/views/869v-vr48/rows.csv?accessType=DOWNLOAD&api_foundry=true',

# "Parking_Violations_Issued_Fiscal_Year_2024": 'https://data.cityofnewyork.us/api/views/pvqr-7yc4/rows.csv?accessType=DOWNLOAD&api_foundry=true'
    
    

## Extras

In [5]:
# Constants
# SNOWFLAKE_ACCOUNT = ''
# SNOWFLAKE_USER = ''
# SNOWFLAKE_PASSWORD = ''
SNOWFLAKE_WAREHOUSE = 'PARKING_WAREHOUSE'
SNOWFLAKE_DB = 'PARKING_DB'
SNOWFLAKE_SCHEMA = 'PARKING_SCHEMA'
SNOWFLAKE_TABLE_PARKING_VIOLATIONS = 'PARKING_VIOLATIONS'
SNOWFLAKE_TABLE_PARKING_VIOLATIONS_CODE = 'PARKING_VIOLATIONS_CODE'
PARQUET_FILE_PATH = 'Parking_Violations_Issued_-_Fiscal_Year_2024.parquet'
# CSV_FILE_PATH = '<your_csv_file_path>'

In [6]:
import snowflake.connector
import pandas as pd

# Establish a connection to Snowflake
conn = snowflake.connector.connect(
    user=SNOWFLAKE_USER,
    password=SNOWFLAKE_PASSWORD,
    account=SNOWFLAKE_ACCOUNT,
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DB,
    schema=SNOWFLAKE_SCHEMA
)


In [10]:
import snowflake.connector
import pandas as pd

# Establish a connection to Snowflake
conn = snowflake.connector.connect(
    user=SNOWFLAKE_USER,
    password=SNOWFLAKE_PASSWORD,
    account=SNOWFLAKE_ACCOUNT,
    warehouse=SNOWFLAKE_WAREHOUSE
)

conn.cursor().execute(f"CREATE WAREHOUSE IF NOT EXISTS {SNOWFLAKE_WAREHOUSE} WITH WAREHOUSE_SIZE='x-small'")
conn.cursor().execute(f"CREATE DATABASE IF NOT EXISTS {SNOWFLAKE_DB}")
conn.cursor().execute(f"CREATE SCHEMA IF NOT EXISTS {SNOWFLAKE_SCHEMA}")

# Use the specified database and schema
conn.cursor().execute(f"USE DATABASE {SNOWFLAKE_DB}")
conn.cursor().execute(f"USE SCHEMA {SNOWFLAKE_SCHEMA}")

# Read the Parquet file into a DataFrame
df = pd.read_parquet(PARQUET_FILE_PATH)

# Modify column names: capitalize and remove spaces
df.columns = [col.upper().replace(' ', '_') for col in df.columns]
df.columns = [col.upper().replace('?', '') for col in df.columns]

# Map DataFrame column types to Snowflake data types
type_mapping = {
    'object': 'STRING',
    'int64': 'NUMBER',
    'float64': 'FLOAT',
    'bool': 'BOOLEAN',
    'datetime64[ns]': 'TIMESTAMP'
}

# Get column names and types from the DataFrame
columns = ', '.join([f"{col} {type_mapping[str(dtype)]}" for col, dtype in df.dtypes.items()])

# Create the table
create_table_query = f"""
CREATE OR REPLACE TABLE {SNOWFLAKE_TABLE_PARKING_VIOLATIONS} (
    {columns}
)
"""
conn.cursor().execute(create_table_query)

# Create a temporary stage
temp_stage_name = 'TEMP_STAGE_PARQUET'
conn.cursor().execute(f"CREATE OR REPLACE TEMPORARY STAGE {temp_stage_name}")

# # Write the DataFrame to a Parquet file in the temporary stage
temp_parquet_file_path = 'Parking_Violations_Issued_-_Fiscal_Year_2024.parquet'
# df.to_parquet(temp_parquet_file_path)

# Upload the Parquet file to the temporary stage
conn.cursor().execute(f"PUT file://{temp_parquet_file_path} @{temp_stage_name}")

# Copy the data from the temporary stage to the Snowflake table
copy_query = f"""
COPY INTO {SNOWFLAKE_TABLE_PARKING_VIOLATIONS}
FROM @{temp_stage_name}/Parking_Violations_Issued_-_Fiscal_Year_2024.parquet
FILE_FORMAT = (TYPE = PARQUET)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
"""
conn.cursor().execute(copy_query)

# Clean up: remove the temporary stage
conn.cursor().execute(f"REMOVE @{temp_stage_name}")
conn.cursor().execute(f"DROP STAGE {temp_stage_name}")

# Close the connection
conn.close()

print("Data loaded successfully to Snowflake.")


Data loaded successfully to Snowflake.


In [2]:
import boto3
import gzip 
import io
import pandas as pd

S3_BUCKET = 'parking-violations-bucket'
CSV_FILE_PATH = 'Parking_Violations_Issued_Fiscal_Year_2024.csv.gz'

# Initialize the S3 client
s3_client = boto3.client(
    's3',
    aws_access_key_id="AKIA3FLDYXQMIJKM4EOJ",
    aws_secret_access_key="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",
)

# Read the gzipped CSV content from S3
response = s3_client.get_object(Bucket=S3_BUCKET, Key=CSV_FILE_PATH)
gzipped_csv_content = response['Body'].read()

# Decompress the gzipped content
with gzip.GzipFile(fileobj=io.BytesIO(gzipped_csv_content), mode='rb') as gz:
    csv_content = gz.read().decode('utf-8')

# Read the CSV content into a DataFrame
df = pd.read_csv(io.StringIO(csv_content))

# Print the DataFrame to verify
print(df)

  df = pd.read_csv(io.StringIO(csv_content))


          Summons Number  Plate ID Registration State Plate Type  Issue Date  \
0             1159637337   KZH2758                 NY        PAS  06/09/2023   
1             1252960645   JPD8746                 NY        PAS  06/30/2023   
2             1252960669   JPD8746                 NY        PAS  06/30/2023   
3             1252994126   MBH9245                 99        PAS  07/06/2023   
4             1252994175   MBH9245                 PA        PAS  07/08/2023   
...                  ...       ...                ...        ...         ...   
14765372      9133935178   LEK7929                 NY        PAS  05/28/2024   
14765373      9133935180   LJF1533                 NY        PAS  05/28/2024   
14765374      9133935191     MAL1K                 NY        OMT  05/28/2024   
14765375      9133935208  T720318C                 NY        OMT  05/28/2024   
14765376      9133935210   LBJ4302                 NY        PAS  05/28/2024   

          Violation Code Vehicle Body T

In [3]:
import gzip
import io
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

In [4]:
# Your existing code
chunk_size = 10000  # Adjust the chunk size based on available memory
parquet_buffer = io.BytesIO()

parquet_writer = None
schema = None

for i, chunk in enumerate(pd.read_csv(io.StringIO(csv_content), chunksize=chunk_size, dtype=str)):
    print(f"Processing chunk {i+1}")
    chunk = chunk.fillna("N/A")
    if i == 0:
        schema = pa.Schema.from_pandas(df=chunk)
        parquet_writer = pq.ParquetWriter(parquet_buffer, schema, compression='gzip')
    table = pa.Table.from_pandas(chunk, schema=schema)
    parquet_writer.write_table(table)

if parquet_writer:
    parquet_writer.close()

parquet_buffer.seek(0)

# Save the Parquet file
with open("output.parquet", "wb") as f:
    f.write(parquet_buffer.read())

Processing chunk 1
Processing chunk 2
Processing chunk 3
Processing chunk 4
Processing chunk 5
Processing chunk 6
Processing chunk 7
Processing chunk 8
Processing chunk 9
Processing chunk 10
Processing chunk 11
Processing chunk 12
Processing chunk 13
Processing chunk 14
Processing chunk 15
Processing chunk 16
Processing chunk 17
Processing chunk 18
Processing chunk 19
Processing chunk 20
Processing chunk 21
Processing chunk 22
Processing chunk 23
Processing chunk 24
Processing chunk 25
Processing chunk 26
Processing chunk 27
Processing chunk 28
Processing chunk 29
Processing chunk 30
Processing chunk 31
Processing chunk 32
Processing chunk 33
Processing chunk 34
Processing chunk 35
Processing chunk 36
Processing chunk 37
Processing chunk 38
Processing chunk 39
Processing chunk 40
Processing chunk 41
Processing chunk 42
Processing chunk 43
Processing chunk 44
Processing chunk 45
Processing chunk 46
Processing chunk 47
Processing chunk 48
Processing chunk 49
Processing chunk 50
Processin

Processing chunk 398
Processing chunk 399
Processing chunk 400
Processing chunk 401
Processing chunk 402
Processing chunk 403
Processing chunk 404
Processing chunk 405
Processing chunk 406
Processing chunk 407
Processing chunk 408
Processing chunk 409
Processing chunk 410
Processing chunk 411
Processing chunk 412
Processing chunk 413
Processing chunk 414
Processing chunk 415
Processing chunk 416
Processing chunk 417
Processing chunk 418
Processing chunk 419
Processing chunk 420
Processing chunk 421
Processing chunk 422
Processing chunk 423
Processing chunk 424
Processing chunk 425
Processing chunk 426
Processing chunk 427
Processing chunk 428
Processing chunk 429
Processing chunk 430
Processing chunk 431
Processing chunk 432
Processing chunk 433
Processing chunk 434
Processing chunk 435
Processing chunk 436
Processing chunk 437
Processing chunk 438
Processing chunk 439
Processing chunk 440
Processing chunk 441
Processing chunk 442
Processing chunk 443
Processing chunk 444
Processing ch

Processing chunk 790
Processing chunk 791
Processing chunk 792
Processing chunk 793
Processing chunk 794
Processing chunk 795
Processing chunk 796
Processing chunk 797
Processing chunk 798
Processing chunk 799
Processing chunk 800
Processing chunk 801
Processing chunk 802
Processing chunk 803
Processing chunk 804
Processing chunk 805
Processing chunk 806
Processing chunk 807
Processing chunk 808
Processing chunk 809
Processing chunk 810
Processing chunk 811
Processing chunk 812
Processing chunk 813
Processing chunk 814
Processing chunk 815
Processing chunk 816
Processing chunk 817
Processing chunk 818
Processing chunk 819
Processing chunk 820
Processing chunk 821
Processing chunk 822
Processing chunk 823
Processing chunk 824
Processing chunk 825
Processing chunk 826
Processing chunk 827
Processing chunk 828
Processing chunk 829
Processing chunk 830
Processing chunk 831
Processing chunk 832
Processing chunk 833
Processing chunk 834
Processing chunk 835
Processing chunk 836
Processing ch

KeyboardInterrupt: 

In [5]:
PARQUET_FILE_PATH="Parking_Violations_Issued_Fiscal_Year_2024.parquet"

In [7]:
df.head()

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,1159637337,KZH2758,NY,PAS,06/09/2023,67,VAN,HONDA,P,0,...,BLUE,0,2006,-,0,,,,,
1,1252960645,JPD8746,NY,PAS,06/30/2023,87,SUBN,LINCO,M,17870,...,GRAY,0,2020,-,0,,,,,
2,1252960669,JPD8746,NY,PAS,06/30/2023,31,SUBN,LINCO,M,17870,...,GRAY,0,2020,-,0,,,,,
3,1252994126,MBH9245,99,PAS,07/06/2023,20,SDN,KIA,M,12690,...,WHITE,0,0,-,0,,,,,
4,1252994175,MBH9245,PA,PAS,07/08/2023,40,SDN,KIA,M,12690,...,WHITE,0,0,-,0,,,,,


In [8]:
# obj_cols = df.select_dtypes(include=['object']).columns
# df[obj_cols] = df[obj_cols].astype(str)

# int_cols = df.select_dtypes(include=['int64']).columns
# df[int_cols] = df[int_cols].astype('int32')

# float_cols = df.select_dtypes(include=['float64']).columns
# df[float_cols] = df[float_cols].astype('float32')

In [34]:
import pandas as pd
import gzip

# Path to the gzipped CSV file
file_path = 'Parking_Violations_Issued_Fiscal_Year_2024.csv.gz'

# Read the gzipped CSV file
with gzip.open(file_path, 'rt') as f:
    df = pd.read_csv(f)

# Display the first few rows of the dataframe
print(df.head())

  df = pd.read_csv(f)


   Summons Number Plate ID Registration State Plate Type  Issue Date  \
0      1159637337  KZH2758                 NY        PAS  06/09/2023   
1      1252960645  JPD8746                 NY        PAS  06/30/2023   
2      1252960669  JPD8746                 NY        PAS  06/30/2023   
3      1252994126  MBH9245                 99        PAS  07/06/2023   
4      1252994175  MBH9245                 PA        PAS  07/08/2023   

   Violation Code Vehicle Body Type Vehicle Make Issuing Agency  Street Code1  \
0              67               VAN        HONDA              P             0   
1              87              SUBN        LINCO              M         17870   
2              31              SUBN        LINCO              M         17870   
3              20               SDN          KIA              M         12690   
4              40               SDN          KIA              M         12690   

   ...  Vehicle Color  Unregistered Vehicle?  Vehicle Year  Meter Number  \
0  .

In [None]:
    # Get the current working directory
    
    # print("beforecwd",os.getcwd())
    # print("files",os.listdir(os.getcwd()))


    # os.chdir('/opt/airflow/data')

    # cwd = os.getcwd()

    # files = os.listdir(cwd)

    # print("cwd",cwd)
    # print("files",files)

In [None]:
# import time
# import pandas as pd
# import polars as pl
# import pyarrow.parquet as pq
# import pyarrow.csv as pacsv

# # Polars
# def polars():
#     start_time = time.time()
#     polars_time = time.time() - start_time
#     df = pl.scan_csv(csv_file)
#     polars_time = time.time() - start_time
#     # Convert to Parquet
#     start_time = time.time()
#     df.sink_parquet(
#         parquet_file,
#         compression="snappy",
#         row_group_size=100_000
#     )

#     #df = None
#     polars_parquet_time = time.time() - start_time

#     print(f"Polars time: {polars_time:.2f} seconds")
#     print(f"Polars to Parquet time: {polars_parquet_time:.2f} seconds")

# # Pyarrow
# csv_file="Parking_Violations_Issued_-_Fiscal_Year_2024.csv"
# parquet_file="Parking_Violations_Issued_Fiscal_Year_2024_polars.parquet"
# start_time = time.time()
# parse_options = pacsv.ParseOptions(delimiter="\t", quote_char="^")
# df = pacsv.read_csv(csv_file, parse_options=parse_options)

# pyarrow_time = time.time() - start_time

# # Convert to Parquet
# start_time = time.time()
# pq.write_table(df, parquet_file)
# pyarrow_parquet_time = time.time() - start_time
# table_pyarrow = None
# print(f"Pyarrow time: {pyarrow_time:.2f} seconds")
# print(f"Pyarrow to Parquet time: {pyarrow_parquet_time:.2f} seconds")

## Not Using

In [22]:
# Get County_Precinct Data from here (https://www.nyc.gov/site/nypd/bureaus/patrol/precincts-landing.page)

In [20]:
# Correct data for the precincts with matching lengths for County and Precinct arrays
data = {
    "County": ["Manhattan"] * 22 + ["Bronx"] * 12 + ["Brooklyn"] * 23 + ["Queens"] * 16 + ["Staten Island"] * 4,
    "Precinct": [
        "1st Precinct", "5th Precinct", "6th Precinct", "7th Precinct", "9th Precinct", "10th Precinct", "13th Precinct", 
        "Midtown South Precinct", "17th Precinct", "Midtown North Precinct", "19th Precinct", "20th Precinct", 
        "Central Park Precinct", "23rd Precinct", "24th Precinct", "25th Precinct", "26th Precinct", "28th Precinct", 
        "30th Precinct", "32nd Precinct", "33rd Precinct", "34th Precinct", 
        "40th Precinct", "41st Precinct", "42nd Precinct", "43rd Precinct", "44th Precinct", "45th Precinct", 
        "46th Precinct", "47th Precinct", "48th Precinct", "49th Precinct", "50th Precinct", "52nd Precinct", 
        "60th Precinct", "61st Precinct", "62nd Precinct", "63rd Precinct", "66th Precinct", "67th Precinct", 
        "68th Precinct", "69th Precinct", "70th Precinct", "71st Precinct", "72nd Precinct", "73rd Precinct", 
        "75th Precinct", "76th Precinct", "77th Precinct", "78th Precinct", "79th Precinct", "81st Precinct", 
        "83rd Precinct", "84th Precinct", "88th Precinct", "90th Precinct", "94th Precinct", 
        "100th Precinct", "101st Precinct", "102nd Precinct", "103rd Precinct", "104th Precinct", "105th Precinct", 
        "106th Precinct", "107th Precinct", "108th Precinct", "109th Precinct", "110th Precinct", "111th Precinct", 
        "112th Precinct", "113th Precinct", "114th Precinct", "115th Precinct", 
        "120th Precinct", "121st Precinct", "122nd Precinct", "123rd Precinct"
    ]
}

# Creating the dataframe
df_precincts = pd.DataFrame(data)

# Extracting the precinct numbers
df_precincts["Precinct Number"] = df_precincts["Precinct"].str.extract(r'(\d+)', expand=False)

# Dropping rows where precinct number extraction failed
df_precincts = df_precincts.dropna(subset=["Precinct Number"])

In [23]:
df_precincts.to_csv("County_Precinct.csv",index=False)

## Chunking code (old)

In [8]:
# Task to convert CSV to Parquet directly in S3
def convert_csv_to_parquet():
    start_time = time.time()
    print("Starting the CSV to Parquet conversion process")

    s3_hook = S3Hook(aws_conn_id='aws_default')

    # Download gzipped CSV file content from S3
    print(f"Downloading gzipped CSV content from S3 bucket {S3_BUCKET}, key {CSV_FILE_PATH_VIOLATION}")
    gzipped_csv_content = s3_hook.get_key(key=CSV_FILE_PATH_VIOLATION, bucket_name=S3_BUCKET).get()["Body"].read()
    
    # Decompress gzipped CSV content
    print("Decompressing gzipped CSV content")
    with gzip.GzipFile(fileobj=io.BytesIO(gzipped_csv_content), mode='rb') as gz:
        csv_content = gz.read().decode('utf-8')

    # Read CSV content into DataFrame in chunks
    print("Reading CSV content into DataFrame in chunks")
    chunk_size = 5000
    parquet_buffer = io.BytesIO()
    
    parquet_writer = None
    schema = None

    for i, chunk in enumerate(pd.read_csv(io.StringIO(csv_content), chunksize=chunk_size, dtype=str)):
        print(f"Processing chunk {i+1}")
        
        # Handle null values and adjust column names
        chunk = chunk.fillna("N/A")
        chunk.columns = [col.upper().replace(' ', '_') for col in chunk.columns]
        chunk.columns = [col.upper().replace('?', '') for col in chunk.columns]

        if i == 0:
            schema = pa.Schema.from_pandas(df=chunk)
            parquet_writer = pq.ParquetWriter(parquet_buffer, schema, compression='gzip')
        table = pa.Table.from_pandas(chunk, schema=schema)
        parquet_writer.write_table(table)
    
    if parquet_writer:
        parquet_writer.close()
    
    parquet_buffer.seek(0)
    print("Parquet file created in memory")

    # Upload Parquet file content to S3
    print(f"Uploading Parquet file to S3 bucket {S3_BUCKET}, key {PARQUET_FILE_PATH_VIOLATION}")
    s3_hook.load_bytes(parquet_buffer.read(), key=PARQUET_FILE_PATH_VIOLATION, bucket_name=S3_BUCKET, replace=True)
    print(f"Uploaded Parquet file to S3 bucket {S3_BUCKET}")

    # Remove files from S3
    s3_hook.delete_objects(bucket=S3_BUCKET, keys=CSV_FILE_PATH_VIOLATION)
    print(f"Files {CSV_FILE_PATH_VIOLATION} removed from S3 bucket {S3_BUCKET}.")

    end_time = time.time()
    duration = (end_time - start_time) / 60
    print(f"CSV to Parquet conversion completed in {duration:.2f} minutes")

In [None]:
# Function to estimate bytes per line
def estimate_bpl(file_path, n_rows=10):
    """Return estimates of bytes per line using the first n lines"""
    s3_hook = S3Hook(aws_conn_id='aws_default')
    gzipped_csv_content = s3_hook.get_key(key=file_path, bucket_name=S3_BUCKET).get()["Body"].read()

    with gzip.GzipFile(fileobj=io.BytesIO(gzipped_csv_content), mode='rb') as gz:
        csv_content = gz.read().decode('utf-8')

    lines = csv_content.splitlines()
    total_length = sum(len(line.encode('utf-8')) for line in lines[:n_rows])
    return total_length / n_rows

In [None]:
# Estimate chunk size based on available memory
print("Estimating chunk size based on available memory")
fill_rate = 0.1
avail_mem = psutil.virtual_memory().available
bpl = estimate_bpl(CSV_FILE_PATH_VIOLATION)
chunk_size = int(avail_mem * fill_rate / bpl)
print(f"Estimated chunk size: {chunk_size}")