# APAN 5400 Term Project:
## Analysis of Parking Violations in New York City

### Group 1: Xiangdong Wang

#### Required Packages

In [2]:
import requests
import pandas as pd
import time
from pymongo import MongoClient

#### 1. Data Extraction

In [None]:
url = "https://data.cityofnewyork.us/resource/nc67-uf89.json"
total = 4_000_000
limit = 50_000
sleep_time = 1 
headers = {} 

final_file = "parking_violations_4M.csv"


offset = 0

while offset < total:
    print(f" Extracting {offset} to {offset + limit} ...")
    params = {
        "$limit": limit,
        "$offset": offset
    }

    try:
        response = requests.get(url, params=params, headers=headers)
        response.raise_for_status()
        data = response.json()
    except Exception as e:
        print(f" failed offset={offset}: {e}")
        break

    if not data:
        print("API empty data, failed")
        break

    df = pd.DataFrame(data)

    # to final file
    df.to_csv(final_file, mode='a', index=False, header=(offset == 0))
    print(f"Write successfully（Total_Records ~ {offset + limit:,}）")

    offset += limit
    time.sleep(sleep_time)

print("Successful：parking_violations_4M.csv")

In [None]:
import pandas as pd
import requests
import time

url = "https://data.cityofnewyork.us/resource/nc67-uf89.json"

total = 4_000_000
limit = 50_000
sleep_time = 1

headers = {}
final_file = "parking_violations_latest.csv"

offset = 0

while offset < total:
    print(f"Extracting {offset} to {offset + limit} ...")
    
    params = {
        "$limit": limit,
        "$offset": offset,
        "$order": ":id DESC"
    }

    try:
        response = requests.get(url, params=params, headers=headers)
        response.raise_for_status()
        data = response.json()
    except Exception as e:
        print(f" Failed at offset={offset}: {e}")
        break

    if not data:
        print("API returned no data. Ending early.")
        break

    df = pd.DataFrame(data)


    df.to_csv(final_file, mode='a', index=False, header=(offset == 0))
    print(f"Written: {offset + limit:,} rows")

    offset += limit
    time.sleep(sleep_time)

print("All 4M rows extracted to parking_violations_latest.csv")


Extracting 0 to 50000 ...
✅ Written: 50,000 rows
All 4M rows extracted to parking_violations_latest.csv


In [None]:
df = pd.read_csv("parking_violations_latest.csv", usecols=["issue_date"])
df["issue_date"] = pd.to_datetime(df["issue_date"], errors="coerce")

print(df["issue_date"].describe())

count                          4050000
mean     2025-02-13 05:52:09.408000256
min                2000-01-02 00:00:00
25%                2025-01-22 00:00:00
50%                2025-02-16 00:00:00
75%                2025-03-10 00:00:00
max                2025-04-07 00:00:00
Name: issue_date, dtype: object


#### 2. Transform

In [75]:
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [76]:
os.environ['PYSPARK_PYTHON']

'/Users/wangxd/Desktop/.conda/bin/python'

In [77]:
os.environ['PYSPARK_DRIVER_PYTHON']

'/Users/wangxd/Desktop/.conda/bin/python'

In [78]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Intro to Apache Spark") \
    .config("spark.cores.max", "4") \
    .config('spark.executor.memory', '8G') \
    .config('spark.driver.maxResultSize', '8g') \
    .config('spark.kryoserializer.buffer.max', '512m') \
    .config("spark.driver.cores", "4") \
    .getOrCreate()

sc = spark.sparkContext

print("Using Apache Spark Version", spark.version)

Using Apache Spark Version 3.5.5


In [79]:
pv_sdf = spark.read.option("header", "true") \
                   .option("delimiter", ",") \
                   .option("inferSchema", "true") \
                   .csv('/Users/wangxd/Desktop/5400_Project/parking_violations_latest.csv')
pv_sdf.show()

                                                                                

+--------+-----+------------+--------------+----------+--------------+--------------------+-----------+--------------+---------------+----------------+--------------+----------+--------+------+--------------+--------------------+----------------+
|   plate|state|license_type|summons_number|issue_date|violation_time|           violation|fine_amount|penalty_amount|interest_amount|reduction_amount|payment_amount|amount_due|precinct|county|issuing_agency|       summons_image|violation_status|
+--------+-----+------------+--------------+----------+--------------+--------------------+-----------+--------------+---------------+----------------+--------------+----------+--------+------+--------------+--------------------+----------------+
| LJM9267|   NY|         PAS|    9197665241|04/02/2025|        11:56A|NO PARKING-DAY/TI...|         65|             0|            0.0|             0.0|          65.0|       0.0|       1|    NY|       TRAFFIC|{'url': 'http://n...|            NULL|
| CAU2518|  

##### 2.1 Check the Columns

In [80]:
pv_sdf.printSchema()

root
 |-- plate: string (nullable = true)
 |-- state: string (nullable = true)
 |-- license_type: string (nullable = true)
 |-- summons_number: long (nullable = true)
 |-- issue_date: string (nullable = true)
 |-- violation_time: string (nullable = true)
 |-- violation: string (nullable = true)
 |-- fine_amount: integer (nullable = true)
 |-- penalty_amount: integer (nullable = true)
 |-- interest_amount: double (nullable = true)
 |-- reduction_amount: double (nullable = true)
 |-- payment_amount: double (nullable = true)
 |-- amount_due: double (nullable = true)
 |-- precinct: integer (nullable = true)
 |-- county: string (nullable = true)
 |-- issuing_agency: string (nullable = true)
 |-- summons_image: string (nullable = true)
 |-- violation_status: string (nullable = true)



##### 2.2 Check the Null Values

In [81]:
for col_name in pv_sdf.columns:
    null_count = pv_sdf.filter(pv_sdf[col_name].isNull()).count()
    print(f"{col_name}: {null_count} null values")

                                                                                

plate: 0 null values


                                                                                

state: 0 null values


                                                                                

license_type: 0 null values


                                                                                

summons_number: 1 null values


                                                                                

issue_date: 0 null values


                                                                                

violation_time: 88 null values


                                                                                

violation: 163 null values


                                                                                

fine_amount: 41 null values


                                                                                

penalty_amount: 41 null values


                                                                                

interest_amount: 41 null values


                                                                                

reduction_amount: 41 null values


                                                                                

payment_amount: 41 null values


                                                                                

amount_due: 41 null values


                                                                                

precinct: 41 null values


                                                                                

county: 167842 null values


                                                                                

issuing_agency: 48400 null values


                                                                                

summons_image: 507758 null values




violation_status: 2968879 null values


                                                                                

##### 2.3 DATE

In [83]:
from pyspark.sql.functions import to_date, col, upper

pv_sdf = pv_sdf.withColumn("issue_date", to_date("issue_date", "MM/dd/yyyy"))


pv_sdf.select("issue_date").printSchema()

root
 |-- issue_date: date (nullable = true)



##### 2.4 Float

In [None]:
pv_sdf = pv_sdf \
    .withColumn("fine_amount", col("fine_amount").cast("float")) \
    .withColumn("penalty_amount", col("penalty_amount").cast("float")) \
    .withColumn("interest_amount", col("interest_amount").cast("float")) \
    .withColumn("reduction_amount", col("reduction_amount").cast("float")) \
    .withColumn("payment_amount", col("payment_amount").cast("float")) \
    .withColumn("amount_due", col("amount_due").cast("float"))

pv_sdf.select("fine_amount", "penalty_amount", "amount_due").show(5)
pv_sdf.printSchema()

+-----------+--------------+----------+
|fine_amount|penalty_amount|amount_due|
+-----------+--------------+----------+
|       65.0|           0.0|       0.0|
|      115.0|           0.0|       0.0|
|       65.0|           0.0|       0.0|
|      115.0|           0.0|       0.0|
|      115.0|           0.0|       0.0|
+-----------+--------------+----------+
only showing top 5 rows

root
 |-- plate: string (nullable = true)
 |-- state: string (nullable = true)
 |-- license_type: string (nullable = true)
 |-- summons_number: long (nullable = true)
 |-- issue_date: date (nullable = true)
 |-- violation_time: string (nullable = true)
 |-- violation: string (nullable = true)
 |-- fine_amount: float (nullable = true)
 |-- penalty_amount: float (nullable = true)
 |-- interest_amount: float (nullable = true)
 |-- reduction_amount: float (nullable = true)
 |-- payment_amount: float (nullable = true)
 |-- amount_due: float (nullable = true)
 |-- precinct: integer (nullable = true)
 |-- county: s

##### 2.5 Upper and String

In [None]:
from pyspark.sql.functions import upper, col

pv_sdf = pv_sdf \
    .withColumn("state", upper(col("state"))) \
    .withColumn("license_type", upper(col("license_type"))) \
    .withColumn("violation", upper(col("violation"))) \
    .withColumn("county", upper(col("county"))) \
    .withColumn("issuing_agency", upper(col("issuing_agency"))) \
    .withColumn("violation_status", upper(col("violation_status")))


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `violation_status` cannot be resolved. Did you mean one of the following? [`violation_time`, `violation_hour`, `violation`, `issue_date`, `state`].;
'Project [plate#7310, state#19179, license_type#19199, summons_number#8157, issue_date#7890, violation_time#7315, violation#19219, fine_amount#7910, penalty_amount#12119, interest_amount#12120, reduction_amount#12121, payment_amount#12122, amount_due#12123, precinct#12124, county#19239, issuing_agency#19259, day_of_week#8281, violation_hour#8347, am_pm#8325, upper('violation_status) AS violation_status#19279]
+- Project [plate#7310, state#19179, license_type#19199, summons_number#8157, issue_date#7890, violation_time#7315, violation#19219, fine_amount#7910, penalty_amount#12119, interest_amount#12120, reduction_amount#12121, payment_amount#12122, amount_due#12123, precinct#12124, county#19239, upper(issuing_agency#12164) AS issuing_agency#19259, day_of_week#8281, violation_hour#8347, am_pm#8325]
   +- Project [plate#7310, state#19179, license_type#19199, summons_number#8157, issue_date#7890, violation_time#7315, violation#19219, fine_amount#7910, penalty_amount#12119, interest_amount#12120, reduction_amount#12121, payment_amount#12122, amount_due#12123, precinct#12124, upper(county#12163) AS county#19239, issuing_agency#12164, day_of_week#8281, violation_hour#8347, am_pm#8325]
      +- Project [plate#7310, state#19179, license_type#19199, summons_number#8157, issue_date#7890, violation_time#7315, upper(violation#8081) AS violation#19219, fine_amount#7910, penalty_amount#12119, interest_amount#12120, reduction_amount#12121, payment_amount#12122, amount_due#12123, precinct#12124, county#12163, issuing_agency#12164, day_of_week#8281, violation_hour#8347, am_pm#8325]
         +- Project [plate#7310, state#19179, upper(license_type#8062) AS license_type#19199, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#12119, interest_amount#12120, reduction_amount#12121, payment_amount#12122, amount_due#12123, precinct#12124, county#12163, issuing_agency#12164, day_of_week#8281, violation_hour#8347, am_pm#8325]
            +- Project [plate#7310, upper(state#8043) AS state#19179, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#12119, interest_amount#12120, reduction_amount#12121, payment_amount#12122, amount_due#12123, precinct#12124, county#12163, issuing_agency#12164, day_of_week#8281, violation_hour#8347, am_pm#8325]
               +- Sort [issue_date#7890 DESC NULLS LAST], true
                  +- Filter (issue_date#7890 <= cast(2025-04-10 as date))
                     +- Sort [issue_date#7890 DESC NULLS LAST], true
                        +- Filter (issue_date#7890 <= cast(2025-04-10 as date))
                           +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#12119, interest_amount#12120, reduction_amount#12121, payment_amount#12122, amount_due#12123, precinct#12124, coalesce(county#9677, cast(UNKNOWN as string)) AS county#12163, coalesce(issuing_agency#9678, cast(UNKNOWN as string)) AS issuing_agency#12164, day_of_week#8281, violation_hour#8347, am_pm#8325]
                              +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, coalesce(nanvl(penalty_amount#9633, cast(null as float)), cast(0.0 as float)) AS penalty_amount#12119, coalesce(nanvl(interest_amount#9634, cast(null as float)), cast(0.0 as float)) AS interest_amount#12120, coalesce(nanvl(reduction_amount#9635, cast(null as float)), cast(0.0 as float)) AS reduction_amount#12121, coalesce(nanvl(payment_amount#9636, cast(null as float)), cast(0.0 as float)) AS payment_amount#12122, coalesce(nanvl(amount_due#9637, cast(null as float)), cast(0.0 as float)) AS amount_due#12123, coalesce(precinct#9638, cast(0 as int)) AS precinct#12124, county#9677, issuing_agency#9678, day_of_week#8281, violation_hour#8347, am_pm#8325]
                                 +- Filter atleastnnonnulls(4, summons_number#8157, violation_time#7315, violation#8081, violation_hour#8347)
                                    +- Deduplicate [amount_due#9637, plate#7310, penalty_amount#9633, state#8043, day_of_week#8281, issuing_agency#9678, summons_number#8157, issue_date#7890, county#9677, violation_hour#8347, precinct#9638, reduction_amount#9635, fine_amount#7910, violation#8081, payment_amount#9636, violation_time#7315, license_type#8062, am_pm#8325, interest_amount#9634]
                                       +- Deduplicate [summons_number#8157]
                                          +- Deduplicate [summons_number#8157]
                                             +- Sort [issue_date#7890 DESC NULLS LAST], true
                                                +- Filter (issue_date#7890 <= cast(2025-04-10 as date))
                                                   +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#9633, interest_amount#9634, reduction_amount#9635, payment_amount#9636, amount_due#9637, precinct#9638, coalesce(county#9049, cast(UNKNOWN as string)) AS county#9677, coalesce(issuing_agency#9050, cast(UNKNOWN as string)) AS issuing_agency#9678, day_of_week#8281, violation_hour#8347, am_pm#8325]
                                                      +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, coalesce(nanvl(penalty_amount#9001, cast(null as float)), cast(0.0 as float)) AS penalty_amount#9633, coalesce(nanvl(interest_amount#9002, cast(null as float)), cast(0.0 as float)) AS interest_amount#9634, coalesce(nanvl(reduction_amount#9003, cast(null as float)), cast(0.0 as float)) AS reduction_amount#9635, coalesce(nanvl(payment_amount#9004, cast(null as float)), cast(0.0 as float)) AS payment_amount#9636, coalesce(nanvl(amount_due#9005, cast(null as float)), cast(0.0 as float)) AS amount_due#9637, coalesce(precinct#9006, cast(0 as int)) AS precinct#9638, county#9049, issuing_agency#9050, day_of_week#8281, violation_hour#8347, am_pm#8325]
                                                         +- Filter atleastnnonnulls(1, violation_hour#8347)
                                                            +- Filter atleastnnonnulls(2, summons_number#8157, violation_time#7315)
                                                               +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#9001, interest_amount#9002, reduction_amount#9003, payment_amount#9004, amount_due#9005, precinct#9006, county#9049, issuing_agency#9050, day_of_week#8281, violation_hour#8347, am_pm#8325]
                                                                  +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#9001, interest_amount#9002, reduction_amount#9003, payment_amount#9004, amount_due#9005, precinct#9006, coalesce(county#8100, cast(UNKNOWN as string)) AS county#9049, coalesce(issuing_agency#8119, cast(UNKNOWN as string)) AS issuing_agency#9050, summons_image#7326, violation_status#8138, day_of_week#8281, violation_hour#8347, am_pm#8325]
                                                                     +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, coalesce(nanvl(penalty_amount#7929, cast(null as float)), cast(0.0 as float)) AS penalty_amount#9001, coalesce(nanvl(interest_amount#7948, cast(null as float)), cast(0.0 as float)) AS interest_amount#9002, coalesce(nanvl(reduction_amount#7967, cast(null as float)), cast(0.0 as float)) AS reduction_amount#9003, coalesce(nanvl(payment_amount#7986, cast(null as float)), cast(0.0 as float)) AS payment_amount#9004, coalesce(nanvl(amount_due#8005, cast(null as float)), cast(0.0 as float)) AS amount_due#9005, coalesce(precinct#8176, cast(0 as int)) AS precinct#9006, county#8100, issuing_agency#8119, summons_image#7326, violation_status#8138, day_of_week#8281, violation_hour#8347, am_pm#8325]
                                                                        +- Filter atleastnnonnulls(2, summons_number#8157, violation_time#7315)
                                                                           +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#8176, county#8100, issuing_agency#8119, summons_image#7326, violation_status#8138, day_of_week#8281, cast(violation_hour#8303 as int) AS violation_hour#8347, am_pm#8325]
                                                                              +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#8176, county#8100, issuing_agency#8119, summons_image#7326, violation_status#8138, day_of_week#8281, violation_hour#8303, substring(violation_time#7315, -1, 1) AS am_pm#8325]
                                                                                 +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#8176, county#8100, issuing_agency#8119, summons_image#7326, violation_status#8138, day_of_week#8281, substring(violation_time#7315, 1, 2) AS violation_hour#8303, am_pm#8236]
                                                                                    +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#8176, county#8100, issuing_agency#8119, summons_image#7326, violation_status#8138, date_format(cast(issue_date#7890 as timestamp), E, Some(America/Toronto)) AS day_of_week#8281, violation_hour#8215, am_pm#8236]
                                                                                       +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#8176, county#8100, issuing_agency#8119, summons_image#7326, violation_status#8138, day_of_week#8195, violation_hour#8215, substring(violation_time#7315, -1, 1) AS am_pm#8236]
                                                                                          +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#8176, county#8100, issuing_agency#8119, summons_image#7326, violation_status#8138, day_of_week#8195, substring(violation_time#7315, 1, 2) AS violation_hour#8215]
                                                                                             +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#8176, county#8100, issuing_agency#8119, summons_image#7326, violation_status#8138, date_format(cast(issue_date#7890 as timestamp), E, Some(America/Toronto)) AS day_of_week#8195]
                                                                                                +- Project [plate#7310, state#8043, license_type#8062, summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, cast(precinct#7323 as int) AS precinct#8176, county#8100, issuing_agency#8119, summons_image#7326, violation_status#8138]
                                                                                                   +- Project [plate#7310, state#8043, license_type#8062, cast(summons_number#7313L as int) AS summons_number#8157, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#7323, county#8100, issuing_agency#8119, summons_image#7326, violation_status#8138]
                                                                                                      +- Project [plate#7310, state#8043, license_type#8062, summons_number#7313L, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#7323, county#8100, issuing_agency#8119, summons_image#7326, upper(violation_status#7327) AS violation_status#8138]
                                                                                                         +- Project [plate#7310, state#8043, license_type#8062, summons_number#7313L, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#7323, county#8100, upper(issuing_agency#7325) AS issuing_agency#8119, summons_image#7326, violation_status#7327]
                                                                                                            +- Project [plate#7310, state#8043, license_type#8062, summons_number#7313L, issue_date#7890, violation_time#7315, violation#8081, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#7323, upper(county#7324) AS county#8100, issuing_agency#7325, summons_image#7326, violation_status#7327]
                                                                                                               +- Project [plate#7310, state#8043, license_type#8062, summons_number#7313L, issue_date#7890, violation_time#7315, upper(violation#7316) AS violation#8081, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#7323, county#7324, issuing_agency#7325, summons_image#7326, violation_status#7327]
                                                                                                                  +- Project [plate#7310, state#8043, upper(license_type#7312) AS license_type#8062, summons_number#7313L, issue_date#7890, violation_time#7315, violation#7316, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#7323, county#7324, issuing_agency#7325, summons_image#7326, violation_status#7327]
                                                                                                                     +- Project [plate#7310, upper(state#7311) AS state#8043, license_type#7312, summons_number#7313L, issue_date#7890, violation_time#7315, violation#7316, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, amount_due#8005, precinct#7323, county#7324, issuing_agency#7325, summons_image#7326, violation_status#7327]
                                                                                                                        +- Project [plate#7310, state#7311, license_type#7312, summons_number#7313L, issue_date#7890, violation_time#7315, violation#7316, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, payment_amount#7986, cast(amount_due#7322 as float) AS amount_due#8005, precinct#7323, county#7324, issuing_agency#7325, summons_image#7326, violation_status#7327]
                                                                                                                           +- Project [plate#7310, state#7311, license_type#7312, summons_number#7313L, issue_date#7890, violation_time#7315, violation#7316, fine_amount#7910, penalty_amount#7929, interest_amount#7948, reduction_amount#7967, cast(payment_amount#7321 as float) AS payment_amount#7986, amount_due#7322, precinct#7323, county#7324, issuing_agency#7325, summons_image#7326, violation_status#7327]
                                                                                                                              +- Project [plate#7310, state#7311, license_type#7312, summons_number#7313L, issue_date#7890, violation_time#7315, violation#7316, fine_amount#7910, penalty_amount#7929, interest_amount#7948, cast(reduction_amount#7320 as float) AS reduction_amount#7967, payment_amount#7321, amount_due#7322, precinct#7323, county#7324, issuing_agency#7325, summons_image#7326, violation_status#7327]
                                                                                                                                 +- Project [plate#7310, state#7311, license_type#7312, summons_number#7313L, issue_date#7890, violation_time#7315, violation#7316, fine_amount#7910, penalty_amount#7929, cast(interest_amount#7319 as float) AS interest_amount#7948, reduction_amount#7320, payment_amount#7321, amount_due#7322, precinct#7323, county#7324, issuing_agency#7325, summons_image#7326, violation_status#7327]
                                                                                                                                    +- Project [plate#7310, state#7311, license_type#7312, summons_number#7313L, issue_date#7890, violation_time#7315, violation#7316, fine_amount#7910, cast(penalty_amount#7318 as float) AS penalty_amount#7929, interest_amount#7319, reduction_amount#7320, payment_amount#7321, amount_due#7322, precinct#7323, county#7324, issuing_agency#7325, summons_image#7326, violation_status#7327]
                                                                                                                                       +- Project [plate#7310, state#7311, license_type#7312, summons_number#7313L, issue_date#7890, violation_time#7315, violation#7316, cast(fine_amount#7317 as float) AS fine_amount#7910, penalty_amount#7318, interest_amount#7319, reduction_amount#7320, payment_amount#7321, amount_due#7322, precinct#7323, county#7324, issuing_agency#7325, summons_image#7326, violation_status#7327]
                                                                                                                                          +- Project [plate#7310, state#7311, license_type#7312, summons_number#7313L, to_date(issue_date#7870, Some(MM/dd/yyyy), Some(America/Toronto), false) AS issue_date#7890, violation_time#7315, violation#7316, fine_amount#7317, penalty_amount#7318, interest_amount#7319, reduction_amount#7320, payment_amount#7321, amount_due#7322, precinct#7323, county#7324, issuing_agency#7325, summons_image#7326, violation_status#7327]
                                                                                                                                             +- Project [plate#7310, state#7311, license_type#7312, summons_number#7313L, to_date(issue_date#7314, Some(MM/dd/yyyy), Some(America/Toronto), false) AS issue_date#7870, violation_time#7315, violation#7316, fine_amount#7317, penalty_amount#7318, interest_amount#7319, reduction_amount#7320, payment_amount#7321, amount_due#7322, precinct#7323, county#7324, issuing_agency#7325, summons_image#7326, violation_status#7327]
                                                                                                                                                +- Relation [plate#7310,state#7311,license_type#7312,summons_number#7313L,issue_date#7314,violation_time#7315,violation#7316,fine_amount#7317,penalty_amount#7318,interest_amount#7319,reduction_amount#7320,payment_amount#7321,amount_due#7322,precinct#7323,county#7324,issuing_agency#7325,summons_image#7326,violation_status#7327] csv


In [138]:
pv_sdf = pv_sdf.withColumn("summons_number", col("summons_number").cast("string"))

##### 2.6 Integer

In [None]:
pv_sdf = pv_sdf.withColumn("precinct", col("precinct").cast("int"))

pv_sdf.printSchema()


root
 |-- plate: string (nullable = true)
 |-- state: string (nullable = true)
 |-- license_type: string (nullable = true)
 |-- summons_number: integer (nullable = true)
 |-- issue_date: date (nullable = true)
 |-- violation_time: string (nullable = true)
 |-- violation: string (nullable = true)
 |-- fine_amount: float (nullable = true)
 |-- penalty_amount: float (nullable = true)
 |-- interest_amount: float (nullable = true)
 |-- reduction_amount: float (nullable = true)
 |-- payment_amount: float (nullable = true)
 |-- amount_due: float (nullable = true)
 |-- precinct: integer (nullable = true)
 |-- county: string (nullable = true)
 |-- issuing_agency: string (nullable = true)
 |-- summons_image: string (nullable = true)
 |-- violation_status: string (nullable = true)



##### 2.7 New Features

##### We add some available new variables to facilitate our subsequent potential analysis

In [88]:
from pyspark.sql.functions import date_format, substring

pv_sdf = pv_sdf.withColumn("day_of_week", date_format("issue_date", "E"))

pv_sdf = pv_sdf.withColumn("violation_hour", substring("violation_time", 1, 2))

pv_sdf = pv_sdf.withColumn("am_pm", substring("violation_time", -1, 1))

pv_sdf = pv_sdf.withColumn("violation_hour", col("violation_hour").cast("int"))

pv_sdf.printSchema()
pv_sdf.select("violation_time", "violation_hour", "am_pm", "day_of_week").show(10)

root
 |-- plate: string (nullable = true)
 |-- state: string (nullable = true)
 |-- license_type: string (nullable = true)
 |-- summons_number: integer (nullable = true)
 |-- issue_date: date (nullable = true)
 |-- violation_time: string (nullable = true)
 |-- violation: string (nullable = true)
 |-- fine_amount: float (nullable = true)
 |-- penalty_amount: float (nullable = true)
 |-- interest_amount: float (nullable = true)
 |-- reduction_amount: float (nullable = true)
 |-- payment_amount: float (nullable = true)
 |-- amount_due: float (nullable = true)
 |-- precinct: integer (nullable = true)
 |-- county: string (nullable = true)
 |-- issuing_agency: string (nullable = true)
 |-- summons_image: string (nullable = true)
 |-- violation_status: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- violation_hour: integer (nullable = true)
 |-- am_pm: string (nullable = true)

+--------------+--------------+-----+-----------+
|violation_time|violation_hour|am_pm|day_

##### 2.8 Processing Missing value

In [106]:
from pyspark.sql.functions import col

# DROP
pv_sdf = pv_sdf.dropna(subset=["summons_number", "violation_time","violation","violation_hour"])

# IMPUTE
pv_sdf = pv_sdf.fillna({
    "penalty_amount": 0.0,
    "interest_amount": 0.0,
    "reduction_amount": 0.0,
    "payment_amount": 0.0,
    "amount_due": 0.0,
    "precinct": 0
})

# IMPUTE "UNKNOWN"
pv_sdf = pv_sdf.fillna({
    "county": "UNKNOWN",
    "issuing_agency": "UNKNOWN"
})

# DELETE
pv_sdf = pv_sdf.drop("summons_image", "violation_status")



In [105]:
for col_name in pv_sdf.columns:
    null_count = pv_sdf.filter(pv_sdf[col_name].isNull()).count()
    print(f"{col_name}: {null_count} null values")

                                                                                

plate: 0 null values


                                                                                

state: 0 null values


                                                                                

license_type: 0 null values


                                                                                

summons_number: 0 null values


25/04/10 17:49:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:49:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:49:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:49:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:49:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:49:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:49:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:49:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:49:35 WARN RowBasedKeyValueBatch: Calling spill() on

issue_date: 0 null values


                                                                                

violation_time: 0 null values


                                                                                

violation: 123 null values


25/04/10 17:51:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:51:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:51:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:51:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:51:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:51:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:51:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:51:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:51:33 WARN RowBasedKeyValueBatch: Calling spill() on

fine_amount: 0 null values
penalty_amount: 0 null values
interest_amount: 0 null values
reduction_amount: 0 null values
payment_amount: 0 null values
amount_due: 0 null values
precinct: 0 null values
county: 0 null values
issuing_agency: 0 null values


                                                                                

day_of_week: 0 null values


25/04/10 17:52:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:52:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:52:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:52:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:52:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:52:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:52:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:52:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/10 17:52:56 WARN RowBasedKeyValueBatch: Calling spill() on

violation_hour: 0 null values


                                                                                

am_pm: 0 null values


In [134]:
from pyspark.sql.functions import to_date, lit

date = lit("2025-04-10").cast("date")

pv_sdf = pv_sdf.filter(col("issue_date") <= date)


pv_sdf = pv_sdf.orderBy(col("issue_date").desc())
pv_sdf.select("summons_number", "issue_date").show(5)




+--------------+----------+
|summons_number|issue_date|
+--------------+----------+
|    2023650264|2025-04-07|
|    2023165696|2025-04-07|
|    2009012884|2025-04-07|
|    2023483074|2025-04-07|
|    2023432807|2025-04-07|
+--------------+----------+
only showing top 5 rows



                                                                                

##### 2.9 Drop the duplication

In [97]:
pv_sdf = pv_sdf.dropDuplicates(["summons_number"])
pv_sdf = pv_sdf.dropDuplicates()

In [139]:
pv_sdf.count()

                                                                                

3999788

In [147]:
pv_sdf.select("summons_number").show(5)



+--------------+
|summons_number|
+--------------+
|    2023650264|
|    2023432807|
|    2023283190|
|    2023165696|
|    2009012884|
+--------------+
only showing top 5 rows



                                                                                

#### 3. Loading

In [99]:
!pip install -U "psycopg[binary]"

python(12441) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.




In [100]:
import pandas as pd
import psycopg, os
from sqlalchemy import create_engine

In [101]:
print('Connecting to the PostgreSQL database...')
conn = psycopg.connect(
    host="localhost",
    port='5432',
    dbname="54_Project",
    user="postgres",
    password="123")

Connecting to the PostgreSQL database...


In [102]:
conn_url = 'postgresql://postgres:123@localhost:5432/54_Project'

engine = create_engine(conn_url)

connection = engine.connect()

In [103]:
cur = conn.cursor()

In [104]:
# execute a statement
print('PostgreSQL database version:')
cur.execute('SELECT version()')

# display the PostgreSQL database server version
db_version = cur.fetchone()
print(db_version)

PostgreSQL database version:
('PostgreSQL 17.2 (Debian 17.2-1.pgdg120+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit',)


##### 3.1 Create Tables

In [141]:
create_tables = """
CREATE TABLE IF NOT EXISTS summons (
    summons_number VARCHAR(20) PRIMARY KEY,
    plate TEXT,
    state TEXT,
    license_type TEXT,
    violation TEXT,
    issue_date DATE,
    violation_time TEXT,
    precinct INT
);


CREATE TABLE IF NOT EXISTS violation_finance (
    summons_number VARCHAR(20) PRIMARY KEY,
    fine_amount FLOAT,
    penalty_amount FLOAT DEFAULT 0,
    interest_amount FLOAT DEFAULT 0,
    reduction_amount FLOAT DEFAULT 0,
    payment_amount FLOAT DEFAULT 0,
    amount_due FLOAT DEFAULT 0,
    FOREIGN KEY (summons_number) REFERENCES summons(summons_number)
);

CREATE TABLE IF NOT EXISTS violation_features (
    summons_number VARCHAR(20) PRIMARY KEY,
    day_of_week TEXT,
    violation_hour INT,
    am_pm TEXT,
    FOREIGN KEY (summons_number) REFERENCES summons(summons_number)
);

CREATE TABLE IF NOT EXISTS location_info (
    summons_number VARCHAR(20) PRIMARY KEY,
    county TEXT,
    issuing_agency TEXT,
    FOREIGN KEY (summons_number) REFERENCES summons(summons_number)
);
"""

connection.execute(create_tables)

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x2bd588ad0>

In [142]:
# pv_df = pv_sdf.toPandas()


from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col


windowed_df = pv_sdf.withColumn("row_num", row_number().over(Window.orderBy("summons_number")))

batch_size = 500_000
total_rows = windowed_df.count()

pandas_batches = []

for start in range(0, total_rows, batch_size):
    end = start + batch_size
    batch_df = windowed_df.filter((col("row_num") > start) & (col("row_num") <= end)).drop("row_num")
    pandas_batches.append(batch_df.toPandas())
    print(f"Converted rows {start} to {end}")


25/04/10 21:19:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:19:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:19:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:20:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:20:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:20:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 2

Converted rows 0 to 500000


25/04/10 21:23:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:23:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:23:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:23:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:23:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:24:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 2

Converted rows 500000 to 1000000


25/04/10 21:25:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:25:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:25:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:26:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:26:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:26:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 2

Converted rows 1000000 to 1500000


25/04/10 21:27:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:27:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:27:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:28:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:28:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:28:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 2

Converted rows 1500000 to 2000000


25/04/10 21:29:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:29:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:29:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:30:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:30:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:30:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 2

Converted rows 2000000 to 2500000


25/04/10 21:31:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:31:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:31:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:32:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:32:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:32:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 2

Converted rows 2500000 to 3000000


25/04/10 21:33:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:33:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:33:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:34:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:34:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:34:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 2

Converted rows 3000000 to 3500000


25/04/10 21:35:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:35:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:35:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:36:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:36:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 21:36:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/10 2

Converted rows 3500000 to 4000000


In [None]:
pv_df = pd.concat(pandas_batches, ignore_index=True)

print(pv_df.shape)



(3999788, 19)


In [148]:
pandas_batches

[           plate state license_type summons_number  issue_date violation_time  \
 0       T729359C    NY          OMT      -18134598  2025-02-26         03:25P   
 1        KTV3567    NY          PAS      -18134610  2025-02-26         03:17P   
 2        LAV2430    NY          PAS      -18134622  2025-02-26         03:15P   
 3        KYD8452    NY          PAS      -18134623  2025-02-26         02:58P   
 4        GJB6382    NY          PAS      -18134635  2025-02-26         02:56P   
 ...          ...   ...          ...            ...         ...            ...   
 499995    J22NNX    NJ          PAS     1498776590  2025-01-27         04:37P   
 499996   MKJ8324    PA          PAS     1498776607  2025-01-27         06:42P   
 499997   MHM8466    PA          PAS     1498776619  2025-02-02         05:36P   
 499998   KYE2127    NY          PAS     1498776700  2025-01-27         04:55P   
 499999   BMP3208    NY          PAS     1498776711  2025-01-25         01:51A   
 
              

In [123]:
pv_df.dtypes

plate                object
state                object
license_type         object
summons_number        int64
issue_date           object
violation_time       object
violation            object
fine_amount         float32
penalty_amount      float32
interest_amount     float32
reduction_amount    float32
payment_amount      float32
amount_due          float32
precinct              int64
county               object
issuing_agency       object
day_of_week          object
violation_hour        int32
am_pm                object
dtype: object

In [155]:
pv_df["summons_number"] = pv_df["summons_number"].astype(str)
pv_df["precinct"] = pv_df["precinct"].astype("int64")
pv_df["violation_hour"] = pv_df["violation_hour"].astype("int64")

In [156]:

summons_df = pv_df[[
    "summons_number", "plate", "state", "license_type", "violation",
    "issue_date", "violation_time", "precinct"
]]

finance_df = pv_df[[
    "summons_number", "fine_amount", "penalty_amount", "interest_amount",
    "reduction_amount", "payment_amount", "amount_due"
]]

features_df = pv_df[[
    "summons_number", "day_of_week", "violation_hour", "am_pm"
]]

location_df = pv_df[[
    "summons_number", "county", "issuing_agency"
]]




In [157]:
summons_df.to_sql("summons", con=engine, index=False, if_exists="append", chunksize=10000)
finance_df.to_sql("violation_finance", con=engine, index=False, if_exists="append", chunksize=10000)
features_df.to_sql("violation_features", con=engine, index=False, if_exists="append", chunksize=10000)
location_df.to_sql("location_info", con=engine, index=False, if_exists="append", chunksize=10000)


399788

In [None]:
summons_df.head(5)
@

Unnamed: 0,summons_number,plate,state,license_type,violation,issue_date,violation_time,precinct
0,-888337545,LSG3567,NY,PAS,NO STANDING-DAY/TIME LIMITS,2025-01-02,08:50P,110
1,-888337491,63DPXA,FL,PAS,NO STANDING-DAY/TIME LIMITS,2025-02-01,04:30P,0
2,-423958799,BLANKPLATE,99,999,REG. STICKER-EXPIRED/MISSING,2025-01-29,08:47A,81
3,-423450696,KGII60,FL,PAS,NO PARKING-EXC. AUTH. VEHICLE,2024-11-08,03:38P,108
4,-423450569,LRD9713,NY,PAS,NO STANDING-EXC. AUTH. VEHICLE,2024-11-16,12:00A,108
