# The following code was used to connect Databricks to an AWS S3 instance (which is being used to store data)

In [0]:
%fs ls /FileStore/tables/rootkey.csv

path,name,size,modificationTime
dbfs:/FileStore/tables/rootkey.csv,rootkey.csv,99,1681446184000


In [0]:
aws_keys_df = spark.read.format('csv')\
                        .option('header','true')\
                        .option('inferschema','true')\
                        .load('/FileStore/tables/rootkey.csv')
aws_keys_df.columns

Out[18]: ['Access key ID', 'Secret access key']

In [0]:
access_key = aws_keys_df.select('Access key ID').take(1)[0]['Access key ID']
secret_key = aws_keys_df.select('Secret access key').take(1)[0]['Secret access key']

In [0]:
import urllib

encoded_secret_key = urllib.parse.quote(string=secret_key,safe='')

In [0]:
aws_s3_bucket = 'iucloudcomputingdata'
mount_name = '/mnt/mount_s3'

source_url = 's3a://%s:%s@%s' % (access_key, encoded_secret_key, aws_s3_bucket)

In [0]:
dbutils.fs.mount(source_url, mount_name)

[0;31m---------------------------------------------------------------------------[0m
[0;31mExecutionError[0m                            Traceback (most recent call last)
[0;32m<command-4405516673294176>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mdbutils[0m[0;34m.[0m[0mfs[0m[0;34m.[0m[0mmount[0m[0;34m([0m[0msource_url[0m[0;34m,[0m [0mmount_name[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/python_shell/dbruntime/dbutils.py[0m in [0;36mf_with_exception_handling[0;34m(*args, **kwargs)[0m
[1;32m    360[0m                     [0mexc[0m[0;34m.[0m[0m__context__[0m [0;34m=[0m [0;32mNone[0m[0;34m[0m[0;34m[0m[0m
[1;32m    361[0m                     [0mexc[0m[0;34m.[0m[0m__cause__[0m [0;34m=[0m [0;32mNone[0m[0;34m[0m[0;34m[0m[0m
[0;32m--> 362[0;31m                     [0;32mraise[0m [0mexc[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m    363[0m [0;34m[0m[0m
[1;32m    364[0m             

In [0]:
%fs ls '/mnt/mount_s3'



# Problem 1: NY Parking Violation Data
### Import Data

In [0]:
ny_data = spark.read.format('csv')\
                    .option('header','true')\
                    .option('inferschema','true')\
                    .load('/mnt/mount_s3/Parking_Violations_Issued_-_Fiscal_Year_2023.csv')

### Data Shape

In [0]:
print(ny_data.count())
print(len(ny_data.columns))

11535314
43


### Print Available Columns

In [0]:
print(*ny_data.columns, sep='\n')

Summons Number
Plate ID
Registration State
Plate Type
Issue Date
Violation Code
Vehicle Body Type
Vehicle Make
Issuing Agency
Street Code1
Street Code2
Street Code3
Vehicle Expiration Date
Violation Location
Violation Precinct
Issuer Precinct
Issuer Code
Issuer Command
Issuer Squad
Violation Time
Time First Observed
Violation County
Violation In Front Of Or Opposite
House Number
Street Name
Intersecting Street
Date First Observed
Law Section
Sub Division
Violation Legal Code
Days Parking In Effect    
From Hours In Effect
To Hours In Effect
Vehicle Color
Unregistered Vehicle?
Vehicle Year
Meter Number
Feet From Curb
Violation Post Code
Violation Description
No Standing or Stopping Violation
Hydrant Violation
Double Parking Violation


In [0]:
ny_data.select('Violation Code').show()

+--------------+
|Violation Code|
+--------------+
|            67|
|            51|
|            63|
|            63|
|            63|
|            63|
|            63|
|            67|
|            98|
|            10|
|            10|
|            51|
|            63|
|            45|
|            14|
|            20|
|            68|
|            68|
|            27|
|            20|
+--------------+
only showing top 20 rows



### Import pyspark.sql.functions necessary for analysis

In [0]:
from pyspark.sql.functions import col, concat, concat_ws, date_format, desc, substring, when

### Select Columns Related to time and date

In [0]:
tmp_data = ny_data.select(date_format('Issue Date', 'MM/dd/yyy').alias('issue_date'), \
                          concat_ws(':', substring('Violation Time', 1, 2), substring('Violation Time', 3, 2)).alias('violation_time'), \
                          when(substring('Violation Time', 5, 1) == 'A', 'AM').otherwise('PM').alias('am_pm'))

## Answer: 1A
### Decided to answer the question using only the hour
#### Return a count of violations by hour (24 hour format)

In [0]:
violation_hour = tmp_data.select(when(tmp_data.am_pm == 'AM', substring('violation_time', 1, 2).cast('int') + 12).otherwise(substring('violation_time', 1, 2)).alias('hour'))
violation_hour.groupBy('hour').count().filter(col('hour') <= 24).sort(desc('count')).show(24)

+----+------+
|hour| count|
+----+------+
|  21|999544|
|  20|983672|
|  23|983667|
|  12|950445|
|  01|891400|
|  22|815667|
|  02|797194|
|  03|662558|
|  19|639411|
|  04|528759|
|  05|455674|
|  18|359228|
|  06|333295|
|  08|278031|
|  07|262995|
|  09|261790|
|  10|217488|
|  11|200222|
|  13|186625|
|  17|172151|
|  24|161468|
|  14|149034|
|  15|128231|
|  16|116514|
+----+------+
only showing top 24 rows



## Answer: 1B (Part 1)
### Count Violations by Vehicle Year

In [0]:
ny_data.groupBy('Vehicle Year').count().sort(desc('count')).show()

+------------+-------+
|Vehicle Year|  count|
+------------+-------+
|           0|1883469|
|        2021|1032410|
|        2019| 917912|
|        2020| 914768|
|        2022| 820878|
|        2018| 755337|
|        2017| 685892|
|        2016| 580607|
|        2015| 538982|
|        2014| 419498|
|        2013| 408223|
|        2012| 336735|
|        2011| 313014|
|        2008| 269492|
|        2010| 258771|
|        2007| 251944|
|        2009| 215989|
|        2006| 207614|
|        2005| 161609|
|        2004| 142200|
+------------+-------+
only showing top 20 rows



## Answer: 1B (Part 2)
### Count Violations by Vehicle Make

In [0]:
ny_data.groupBy('Vehicle Make').count().sort(desc('count')).show()

+------------+-------+
|Vehicle Make|  count|
+------------+-------+
|       HONDA|1394250|
|       TOYOT|1333368|
|        FORD|1045269|
|       NISSA| 954362|
|       CHEVR| 610002|
|       ME/BE| 594653|
|         BMW| 583146|
|        JEEP| 533310|
|       HYUND| 392388|
|       LEXUS| 293765|
|       ACURA| 260718|
|       FRUEH| 251595|
|       DODGE| 247030|
|       SUBAR| 246033|
|         KIA| 221053|
|       VOLKS| 217459|
|        AUDI| 215080|
|       MAZDA| 192889|
|       INFIN| 191820|
|         RAM| 168228|
+------------+-------+
only showing top 20 rows



## Answer: 1C
### Count Violations by Location

In [0]:
ny_data.groupBy('Violation Location').count().sort(desc('count')).show()

+------------------+-------+
|Violation Location|  count|
+------------------+-------+
|              null|5349526|
|                19| 282466|
|                13| 254057|
|                 6| 224686|
|               114| 221523|
|                14| 190012|
|                18| 176733|
|                 9| 162228|
|                 1| 152429|
|               109| 137833|
|               115| 127523|
|               108| 124916|
|                20| 120919|
|                70| 117886|
|                84| 109848|
|                10| 109700|
|                52| 104836|
|               112| 103131|
|                17| 100624|
|                66|  98944|
+------------------+-------+
only showing top 20 rows



## Answer: 1D
### Count Violations by Vehicle Color

In [0]:
ny_data.groupBy('Vehicle Color').count().sort(desc('count')).show()

+-------------+-------+
|Vehicle Color|  count|
+-------------+-------+
|           GY|2275457|
|           WH|2055818|
|           BK|1992788|
|         null|1032007|
|           BL| 760235|
|        WHITE| 671757|
|           RD| 435989|
|        BLACK| 424056|
|         GREY| 308993|
|        SILVE| 151063|
|         BLUE| 150435|
|           GR| 145075|
|        BROWN| 139575|
|          RED| 123270|
|          BLK|  86555|
|           TN|  84162|
|           BR|  77050|
|           YW|  64612|
|          GRY|  63857|
|          WHI|  54140|
+-------------+-------+
only showing top 20 rows



## KMEAN Cluster Attempt (COULD NOT COMPLETE)
#### Rename columns to remove space

In [0]:
ny_data = ny_data.withColumnRenamed('Vehicle Color', 'Vehicle_Color')
ny_data = ny_data.withColumnRenamed('Street Code1', 'Street_Code1')
ny_data = ny_data.withColumnRenamed('Street Code2', 'Street_Code2')
ny_data = ny_data.withColumnRenamed('Street Code3', 'Street_Code3')

### Select Vehicle Color and Street Codes (1, 2, and 3)

In [0]:
kmeans_data = ny_data.select('Vehicle_Color', 'Street_Code1', 'Street_Code2', 'Street_Code3')

In [0]:
kmeans_data.show()

+-------------+------------+------------+------------+
|Vehicle_Color|Street_Code1|Street_Code2|Street_Code3|
+-------------+------------+------------+------------+
|          BLK|       34330|         179|           0|
|         GRAY|       34310|       16400|       11010|
|         GRAY|       30640|       13015|       28540|
|         null|       30640|       13015|       28540|
|         BLUE|       30640|       13015|       28540|
|         null|       30640|       13015|       28540|
|         BLUE|       30640|       13015|       28540|
|        WHITE|       11585|       26390|       15010|
|          BLK|           0|           0|           0|
|          RED|       33190|       25190|       31990|
|           GY|       33190|       25190|       31990|
|         GRAY|       30640|       24050|           0|
|          WHT|       30640|       13015|       28540|
|          BLK|           0|           0|           0|
|        BLACK|       33340|           0|           0|
|         

In [0]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

indexer = StringIndexer(inputCol='Vehicle_Color', outputCol="Vehicle_Color_Index")
indexed = indexer.fit(ny_data).transform(ny_data)
indexed.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-609361833306670>[0m in [0;36m<cell line: 5>[0;34m()[0m
[1;32m      3[0m [0mindexer[0m [0;34m=[0m [0mStringIndexer[0m[0;34m([0m[0minputCol[0m[0;34m=[0m[0;34m'Vehicle_Color'[0m[0;34m,[0m [0moutputCol[0m[0;34m=[0m[0;34m"Vehicle_Color_Index"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m [0mindexed[0m [0;34m=[0m [0mindexer[0m[0;34m.[0m[0mfit[0m[0;34m([0m[0mny_data[0m[0;34m)[0m[0;34m.[0m[0mtransform[0m[0;34m([0m[0mny_data[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 5[0;31m [0mindexed[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34

In [0]:
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=['Vehicle_Color', 'Street_Code1', 'Street_Code2', 'Street_Code3'], outputCol="features")
new_df = vecAssembler.transform(kmeans_data)
new_df.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
[0;32m<command-609361833306668>[0m in [0;36m<cell line: 4>[0;34m()[0m
[1;32m      2[0m [0;34m[0m[0m
[1;32m      3[0m [0mvecAssembler[0m [0;34m=[0m [0mVectorAssembler[0m[0;34m([0m[0minputCols[0m[0;34m=[0m[0;34m[[0m[0;34m'Vehicle_Color'[0m[0;34m,[0m [0;34m'Street_Code1'[0m[0;34m,[0m [0;34m'Street_Code2'[0m[0;34m,[0m [0;34m'Street_Code3'[0m[0;34m][0m[0;34m,[0m [0moutputCol[0m[0;34m=[0m[0;34m"features"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 4[0;31m [0mnew_df[0m [0;34m=[0m [0mvecAssembler[0m[0;34m.[0m[0mtransform[0m[0;34m([0m[0mkmeans_data[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      5[0m [0mnew_df[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/ml/base.py

In [0]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=2, seed=1)  # 2 clusters here
model = kmeans.fit(new_df.select('features'))

In [0]:
transformed = model.transform(new_df)
transformed.show()   

+---+-----+-----+-----+-----------------+----------+
|BLK|34510|10030|34050|         features|prediction|
+---+-----+-----+-----+-----------------+----------+
|  1|    1|    0|    0|[1.0,1.0,0.0,0.0]|         1|
|  1|    1|    0|    0|[1.0,1.0,0.0,0.0]|         1|
|  1|    1|    0|    0|[1.0,1.0,0.0,0.0]|         1|
|  1|    1|    0|    0|[1.0,1.0,0.0,0.0]|         1|
|  1|    1|    0|    0|[1.0,1.0,0.0,0.0]|         1|
|  1|    1|    0|    0|[1.0,1.0,0.0,0.0]|         1|
|  1|    1|    0|    0|[1.0,1.0,0.0,0.0]|         1|
|  1|    1|    0|    0|[1.0,1.0,0.0,0.0]|         1|
|  1|    1|    0|    0|[1.0,1.0,0.0,0.0]|         1|
|  1|    0|    1|    0|[1.0,0.0,1.0,0.0]|         1|
|  1|    0|    1|    0|[1.0,0.0,1.0,0.0]|         1|
|  1|    0|    1|    0|[1.0,0.0,1.0,0.0]|         1|
|  1|    0|    1|    0|[1.0,0.0,1.0,0.0]|         1|
|  1|    0|    1|    0|[1.0,0.0,1.0,0.0]|         1|
|  1|    0|    1|    0|[1.0,0.0,1.0,0.0]|         1|
|  1|    0|    1|    0|[1.0,0.0,1.0,0.0]|     

# Problem 2: NBA Shot Data
### Import Data

In [0]:
nba_data = spark.read.format('csv')\
                      .option('header','true')\
                      .option('inferschema','true')\
                      .load('/mnt/mount_s3/shot_logs.csv')

### Print Available Columns

In [0]:
print(*nba_data.columns, sep='\n')

GAME_ID
MATCHUP
LOCATION
W
FINAL_MARGIN
SHOT_NUMBER
PERIOD
GAME_CLOCK
SHOT_CLOCK
DRIBBLES
TOUCH_TIME
SHOT_DIST
PTS_TYPE
SHOT_RESULT
CLOSEST_DEFENDER
CLOSEST_DEFENDER_PLAYER_ID
CLOSE_DEF_DIST
FGM
PTS
player_name
player_id


### Select Player, Closest Defender, and Shot Result (1 if Made, 0 otherwise)

In [0]:
nba_hit_mx = nba_data.select('player_name', \
                             'CLOSEST_DEFENDER', \
                             when(nba_data.SHOT_RESULT == 'made', 1).otherwise(0).alias('hit'))

### Take the average hit rate by Player and Closest Defender (Determines defensive play against each player)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

hit_rt_mx = nba_hit_mx.groupBy('player_name', 'CLOSEST_DEFENDER').avg('hit')
hit_rt_mx = hit_rt_mx.withColumnRenamed('avg(hit)', 'hit_rt')

windowPartition = Window.partitionBy('player_name').orderBy('hit_rt')

hit_rt_mx = hit_rt_mx.withColumn('row_number', \
                                 row_number().over(windowPartition))
final_hit_rt_mx = hit_rt_mx.filter(hit_rt_mx.row_number == 1)

## Answer: 2A
### Sort the avg hit rt in ascending order (lower numbers indicate better defensive play)

In [0]:
final_hit_rt_mx.show()

+----------------+-----------------+------+----------+
|     player_name| CLOSEST_DEFENDER|hit_rt|row_number|
+----------------+-----------------+------+----------+
|    aaron brooks|    Nurkic, Jusuf|   0.0|         1|
|    aaron gordon|   Rivers, Austin|   0.0|         1|
| al farouq aminu|   Johnson, James|   0.0|         1|
|      al horford|      Diaw, Boris|   0.0|         1|
|    al jefferson|Hardaway Jr., Tim|   0.0|         1|
|   alan anderson|       Leuer, Jon|   0.0|         1|
|     alan crabbe| Sefolosha, Thabo|   0.0|         1|
|        alex len|  Knight, Brandon|   0.0|         1|
|   alexis ajinca|     Meeks, Jodie|   0.0|         1|
|      alonzo gee|     Korver, Kyle|   0.0|         1|
|amare stoudemire|       Deng, Luol|   0.0|         1|
|    amir johnson|       Tucker, PJ|   0.0|         1|
|  andre drummond|    James, LeBron|   0.0|         1|
|  andre iguodala| Webster, Martell|   0.0|         1|
|    andre miller|     Turner, Evan|   0.0|         1|
|  andre r

## Answer 2B (COULD NOT COMPLETE)