In [1]:
import pandas as pd
from sodapy import Socrata
import pyspark
import json

conf = pyspark.SparkConf()
sc = pyspark.SparkContext(conf=conf)
spark = pyspark.sql.SparkSession(sc)

import dask.dataframe as dd
from dask_glm.datasets import make_regression
from dask_ml.linear_model import LinearRegression

from pyspark.sql.functions import col, when

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/05 23:19:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/05 23:19:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/05/05 23:19:53 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
test_data_path = '/Users/maludee/repos/Big-Data-Project/data/Parking_Violations_Issued_-_Fiscal_Year_2023.csv'
open_data_path = '/Users/maludee/repos/Big-Data-Project/data/Open_Parking_and_Camera_Violations.csv'
precinct_data_path = "/Users/maludee/repos/Big-Data-Project/data/nypp.csv"

# Precinct Map for Tableau

In [3]:
# YEARLY PARKING DATA - FOR TABLEAU
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(test_data_path)

df.repartition(1).write.mode('overwrite').parquet('tmp/parking_2023')

df = spark.read.parquet('/Users/maludee/repos/Big-Data-Project/notebooks/tmp/parking_2023/*.parquet')

                                                                                

23/05/05 23:11:25 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

In [4]:
df.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Coun

In [5]:
precinct_keys = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(precinct_data_path)
precinct_keys.show(5)

+--------------------+--------+-------------+---------------+
|            the_geom|Precinct|   Shape_Leng|     Shape_Area|
+--------------------+--------+-------------+---------------+
|MULTIPOLYGON (((-...|       1|80283.5387782|4.72864229826E7|
|MULTIPOLYGON (((-...|       5|18807.1249114|1.80945274385E7|
|MULTIPOLYGON (((-...|       6|24875.9642171|2.20179465474E7|
|MULTIPOLYGON (((-...|       7|17287.5444926| 1.8366669928E7|
|MULTIPOLYGON (((-...|       9|19772.5107407|2.13953862669E7|
+--------------------+--------+-------------+---------------+
only showing top 5 rows



In [6]:
# use sql syntax to parse the nested json
df.createOrReplaceTempView("2023_violations")
query = """
select 
    `Violation Precinct` as violation_precinct,
    count(*) as count_violations
from 
    2023_violations
group by 1
order by 2 desc

"""
precincts = spark.sql(query)
precincts.toPandas().to_csv('violations_by_precinct.csv')

# I think precinct 0 is in error and should be ignored
precincts.show(6)

                                                                                

+------------------+----------------+
|violation_precinct|count_violations|
+------------------+----------------+
|                 0|         5938384|
|                19|          331642|
|                13|          291119|
|                 6|          259154|
|               114|          254978|
|                14|          220291|
+------------------+----------------+
only showing top 6 rows



# ML Section

In [7]:
# OPEN DATASET - FOR MACHINE LEARNING 
# df = spark.read \
#     .option("header", "true") \
#     .option("inferSchema", "true") \
#     .csv(open_data_path)

# df.repartition(1).write.mode('overwrite').parquet('tmp/open_parking_full')

# read in the open data violations file
df = spark.read.parquet('/Users/maludee/repos/Big-Data-Project/notebooks/tmp/open_parking_full/*.parquet')

In [8]:
# select a subset of columns for the model (TODO: add more)
training_df = df.na.drop().select([col("State"), col("License Type"), 
                                   col("Violation"), col("Penalty Amount"), 
                                   col("Violation Time")])

# make into a temp sql view for convenience 
df.createOrReplaceTempView("open_violations")

query = """
select 
    case when lower(State) = 'ny' then 1 else 0 end as in_state,
    case when lower(`License Type`) = 'pas' then 1 else 0 end as passenger_car,
    `Fine Amount` as fine_amount
from 
    open_violations

"""

# query for model training dataset
X_y = spark.sql(query)
X_y.show(3)

+--------+-------------+-----------+
|in_state|passenger_car|fine_amount|
+--------+-------------+-----------+
|       1|            0|       65.0|
|       0|            1|       50.0|
|       1|            1|       50.0|
+--------+-------------+-----------+
only showing top 3 rows



In [10]:
# convert to dask - write training set to parquet
X_y.repartition(1).write.mode('overwrite').parquet('tmp/regression_dataset')

In [28]:
# convert to dask - read from parquet to dask dataframe
X_y = dd.read_parquet('/Users/maludee/repos/Big-Data-Project/notebooks/tmp/regression_dataset/*.parquet')  

X = X_y[['in_state', 'passenger_car']].to_dask_array()
y = X_y['fine_amount'].to_dask_array()

In [29]:
# this is needed to get the arrays to compute? 
X.compute_chunk_sizes()
y.compute_chunk_sizes()

Unnamed: 0,Array,Chunk
Bytes,736.37 MiB,736.37 MiB
Shape,"(96517058,)","(96517058,)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 736.37 MiB 736.37 MiB Shape (96517058,) (96517058,) Dask graph 1 chunks in 3 graph layers Data type float64 numpy.ndarray",96517058  1,

Unnamed: 0,Array,Chunk
Bytes,736.37 MiB,736.37 MiB
Shape,"(96517058,)","(96517058,)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [32]:
# train the model
lr = LinearRegression()
lr.fit(X, y)

In [35]:
# get predictions
yhat = lr.predict(X)

# why is score returning nan?
lr.score(X, y)

nan

In [39]:
y[:30].compute()

array([ 65.,  50.,  50.,  50.,  50., 115.,  50.,  50.,  50.,  50.,  50.,
        50.,  50.,  50.,  65.,  50.,  45., 115.,  50.,  50.,  65.,  50.,
        50.,  50.,  65.,  65.,  50.,  50.,  50.,  50.])

In [38]:
yhat[:30].compute()


array([-0.3496723 , -0.21716544,  2.07784501,  2.07784501,  2.07784501,
        2.07784501,  2.07784501,  2.07784501,  2.07784501,  2.07784501,
       -0.3496723 ,  2.07784501,  2.07784501,  2.07784501,  2.07784501,
       -0.3496723 ,  2.07784501,  2.07784501,  2.07784501,  2.07784501,
        2.07784501,  2.07784501,  2.07784501, -0.21716544, -0.3496723 ,
       -0.3496723 ,  2.07784501,  2.07784501,  2.07784501, -0.3496723 ])