# Question 1

In [0]:
import sys
import scipy
import pandas as pd
import numpy as np

from functools import reduce
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, functions as F, Row
from pyspark.sql.functions import col, when, udf, count, desc, avg, concat, sum, regexp_replace, expr, first
from pyspark.sql.window import Window
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import IntegerType




### Loading the dataset

In [0]:
spark = SparkSession.builder.appName('FirstDataset').getOrCreate()
df = spark.read.csv("/FileStore/tables/Parking_Violations_Issued___Fiscal_Year_2024_20240407-2.csv", header=True, inferSchema=True)

In [0]:
df_pd = df.limit(10).toPandas()
df_pd

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,1159637337,KZH2758,NY,PAS,2023-06-09,67,VAN,HONDA,P,0,...,BLUE,0,2006,-,0,,,,,
1,1252960645,JPD8746,NY,PAS,2023-06-30,87,SUBN,LINCO,M,17870,...,GRAY,0,2020,-,0,,,,,
2,1252960669,JPD8746,NY,PAS,2023-06-30,31,SUBN,LINCO,M,17870,...,GRAY,0,2020,-,0,,,,,
3,1252994126,MBH9245,99,PAS,2023-07-06,20,SDN,KIA,M,12690,...,WHITE,0,0,-,0,,,,,
4,1252994175,MBH9245,PA,PAS,2023-07-08,40,SDN,KIA,M,12690,...,WHITE,0,0,-,0,,,,,
5,1307574919,LBZ7486,NY,PAS,2023-07-01,14,SUBN,HONDA,P,0,...,BK,0,2014,-,0,,,,,
6,1307574944,JRB4166,FL,PAS,2023-06-07,46,SUBN,HONDA,P,8690,...,,0,0,-,0,,,,,
7,1307575950,LCA7360J,NY,PAS,2023-06-21,46,SUBN,HONDA,P,0,...,GRY,0,2015,-,0,,,,,
8,1307575973,HNE3840,NY,PAS,2023-06-21,14,SDN,MAZDA,P,0,...,WHT,0,2020,-,0,,,,,
9,1307576886,LAG7093,NY,OMT,2023-06-03,46,SDN,HONDA,P,0,...,BLUE,0,2006,-,0,,,,,


In [0]:
df.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: date (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation County

In [0]:
# Lets check for the number of missing values for each columm
from pyspark.sql.functions import isnull, when, count, col
null_counts = df.select([
    count(when(isnull(c), c)).alias(c) for c in df.columns
]).toPandas()
null_counts



Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,0,1,0,0,0,0,28486,10679,0,0,...,1015121,10490502,0,9381186,0,5519326,227812,10717482,10717482,10717482


In [0]:
df.dtypes

Out[6]: [('Summons Number', 'bigint'),
 ('Plate ID', 'string'),
 ('Registration State', 'string'),
 ('Plate Type', 'string'),
 ('Issue Date', 'date'),
 ('Violation Code', 'int'),
 ('Vehicle Body Type', 'string'),
 ('Vehicle Make', 'string'),
 ('Issuing Agency', 'string'),
 ('Street Code1', 'int'),
 ('Street Code2', 'int'),
 ('Street Code3', 'int'),
 ('Vehicle Expiration Date', 'int'),
 ('Violation Location', 'int'),
 ('Violation Precinct', 'int'),
 ('Issuer Precinct', 'int'),
 ('Issuer Code', 'int'),
 ('Issuer Command', 'string'),
 ('Issuer Squad', 'string'),
 ('Violation Time', 'string'),
 ('Time First Observed', 'string'),
 ('Violation County', 'string'),
 ('Violation In Front Of Or Opposite', 'string'),
 ('House Number', 'string'),
 ('Street Name', 'string'),
 ('Intersecting Street', 'string'),
 ('Date First Observed', 'int'),
 ('Law Section', 'int'),
 ('Sub Division', 'string'),
 ('Violation Legal Code', 'string'),
 ('Days Parking In Effect    ', 'string'),
 ('From Hours In Effect'

In [0]:
# Since we have both numerical and categorical missing values lets find the those colums and handel the fill the missing values seperately
num_cols = [col for col, datatype in df.dtypes if datatype in ('int')]
cat_cols = [col for col in df.columns if col not in num_cols]

### Handel the missing values

In [0]:
'''
we will be replacing missing values in numerical cols with 0 and categorical columns with "na"
Filling 0 for all the numerical cols, is not ideal but since we are not doing any deep analysis and to keep it simple I would be sticking with filling 0's
'''

for feature in num_cols:
  df = df.withColumn(feature, when(col(feature).isNull(), 0).otherwise(col(feature)))

for feature in cat_cols:
  df = df.withColumn(feature, when(col(feature).isNull(), 'na').otherwise(col(feature)))

In [0]:
null_counts = df.select([
    count(when(isnull(c), c)).alias(c) for c in df.columns
]).toPandas()
null_counts

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


### When are tickets most likely to be issued?

In [0]:
# We will be grouping the data based on the violation time and counting and displaying them in descending
ans_1 = df.groupBy('Violation Time').count().alias("Count").orderBy(desc("Count"))
ans_1.show()

+--------------+-----+
|Violation Time|count|
+--------------+-----+
|         0836A|20112|
|         0839A|19531|
|         0840A|19436|
|         0838A|19433|
|         0906A|19412|
|         1139A|19255|
|         1140A|19154|
|         1141A|19135|
|         0841A|18987|
|         1142A|18936|
|         1145A|18917|
|         0837A|18893|
|         0842A|18806|
|         1143A|18738|
|         0910A|18698|
|         1138A|18678|
|         0845A|18648|
|         0908A|18648|
|         0909A|18604|
|         1136A|18540|
+--------------+-----+
only showing top 20 rows



# Question 2: What are the most common years and types of cars to be ticketed?

In [0]:
df.count()

Out[11]: 10717482

In [0]:
# Remember that we have filled all missing values with zeroes, even the column had some missing values so, I will be filtering the rows without 0 in year columns
df_2 = df.filter(df['Vehicle Year'] != 0)
df_2.count()

Out[12]: 8939390

In [0]:
(10717482 - 8939390)*100/10717482

Out[13]: 16.59057603269126

There is about 16% of values being dropped due to year value being 0

In [0]:
# Now we will group by year and vehicle make and sort them in descending order
ans_2 = df_2.groupBy('Vehicle Year', 'Vehicle Body Type').count().\
        orderBy(desc("count"))
ans_2.show(10)

+------------+-----------------+------+
|Vehicle Year|Vehicle Body Type| count|
+------------+-----------------+------+
|        2021|             SUBN|468827|
|        2022|             SUBN|452375|
|        2023|             SUBN|447136|
|        2019|             SUBN|345019|
|        2020|             SUBN|343284|
|        2018|             SUBN|275701|
|        2017|             SUBN|226830|
|        2016|             SUBN|186232|
|        2015|             SUBN|180923|
|        2017|             4DSD|155318|
+------------+-----------------+------+
only showing top 10 rows



# Question 3: Where are tickets most commonly issued? 

In [0]:
# Assuming the question is about the violation location column, because question is vague, not sure about it. But the same code could be used violation county, street code_(1,2,3), and other location features

ans_3 = df.groupBy('Violation Location').count().orderBy(desc("count"))
ans_3.show()

+------------------+-------+
|Violation Location|  count|
+------------------+-------+
|                 0|4923863|
|                19| 276203|
|               114| 213205|
|                 6| 207636|
|                13| 189589|
|                14| 178348|
|               109| 153765|
|                 1| 148286|
|                18| 147809|
|                 9| 142074|
|               115| 135832|
|                61| 116439|
|                66| 115903|
|                20| 115747|
|               112| 109812|
|                70| 107721|
|                84| 104404|
|               103| 104246|
|                52| 103097|
|               108| 102733|
+------------------+-------+
only showing top 20 rows



# Question 4: Which color of the vehicle is most likely to get a ticket? 

In [0]:
# Similar to previous code we need to group by and count the number of cars per color and sort them in descending order
ans_4 = df.groupBy('Vehicle Color').count().orderBy(desc("count"))
ans_4.show(10)

+-------------+-------+
|Vehicle Color|  count|
+-------------+-------+
|           GY|2086345|
|           WH|1924600|
|           BK|1821705|
|           na|1015121|
|           BL| 688919|
|        WHITE| 610936|
|        BLACK| 401993|
|           RD| 393388|
|         GREY| 303176|
|         BLUE| 140721|
+-------------+-------+
only showing top 10 rows



# Question 5: Given a Black vehicle parking illegally at 34510, 10030, 34050 (street codes). What is the probability that it will get an ticket? (very rough prediction). 

In [0]:
# We will be grouping the data based on the violation time and counting and displaying them in descending
df_5 = df.select('Street Code1', 'Street Code2', 'Street Code3', 'Vehicle Color')

In [0]:
# Combine the three feature into one single vector
assembler = VectorAssembler(inputCols=['Street Code1', 'Street Code2', 'Street Code3'], outputCol='features')
feature_vector = assembler.transform(df_5)


In [0]:
kmeans = KMeans(k = 3, seed = 42)
input_features = feature_vector.select('features')
model_1 = kmeans.fit(input_features)

In [0]:
feature_data = model_1.transform(feature_vector)
feature_data.show()

+------------+------------+------------+-------------+--------------------+----------+
|Street Code1|Street Code2|Street Code3|Vehicle Color|            features|prediction|
+------------+------------+------------+-------------+--------------------+----------+
|           0|           0|           0|         BLUE|           (3,[],[])|         1|
|       17870|       25390|       32670|         GRAY|[17870.0,25390.0,...|         0|
|       17870|       25390|       32670|         GRAY|[17870.0,25390.0,...|         0|
|       12690|       41700|       61090|        WHITE|[12690.0,41700.0,...|         2|
|       12690|       41700|       61090|        WHITE|[12690.0,41700.0,...|         2|
|           0|           0|           0|           BK|           (3,[],[])|         1|
|        8690|       21690|       21740|           na|[8690.0,21690.0,2...|         0|
|           0|       61090|           0|          GRY|   [0.0,61090.0,0.0]|         0|
|           0|           0|           0|   

In [0]:

# Lets select the distinct colors which starts with letter B
colors_starting_with_B = df_5.select('Vehicle Color').distinct().filter(col('Vehicle Color').startswith('B')).collect()

for color in colors_starting_with_B:
    print(color['Vehicle Color'])

BLEU
BLLC
BKGL
BLTN
BUR
BLK
BRWN
BRRD
BIEGE
BKBR
BW
BRBL
BRN
BLACK
BL/GY
BK/GY
BK/BL
BURGU
BUS
BLAAC
BURGD
BKWH
BLGY0
BU
BKBK
BRONZ
BKTN
BLB
BL/
BRBK
BED
BEIGE
BLPR
BLUE,
BRZ
B
BKW
BN
BLAK
Beige
BLBL
BLUW
BROWM
BK/GR
BKJ
BIK
BL
BLAC
BKPR
BLR
BLE
BKGY
BLOR
BR
BURG
BERGE
BRGR
BLBK
B LK
BURGE
BK+
BROW
BK/
BKL
BKRD
BWN
BURY
BKL.[
BRO
BROWN
BRON
BKYW
BIG
BK
BLU
BKBL
BBLCK
BLCK
BK,
BKG
BKE
BRW
BIM
BLAE
BLWH
BLRD
Blue,
BLYW
Blue
BG
BGE
BCK
BLUE
BRTN
BKOR
BZ
BLG
Brown
BKGR
BLMR
BLGR
BKC
BLC
BLGL
BRUG
Black
BK.
BLKD
BWH
BURGA
BE
BIR
BLV
BLEVO
BLBR
BLGY
BK3
BROUW
BLA
BRGY
BK/TN
BKMR
BJK
BWC
BLK/U
BUY
BLW
BC
BLK/G
BKTAN
BLUEG
BKLKK
BLT
BLUGY
BUG
BL K
BEIG
BKH
BD
BEI
BY
BLLUE
BH
Bronz
BRY
BK1
BLI
BLIC
BLF
BAIGE
BLK/C
BL/WH
BLKL
BIL
BEA
BK333
BRG
BO
BL/BK
B LAC
BIGG
BLY
BRWH
BKT
BK000
BKI
BUGUR
BRT
BRGL
BLYE
BRK
BK]
BL/GR
BURGN
BWE
B L
BR/GY
BK/RD
BL2G4
BLP
BK```
BKNO
BLJK
BLUE/
BRU
BRU76
BKX
BKGRA
BK0-'
BB
BKD
BURUN
BLYL
BK/WH
BMW
BOW
BERG
BRIGE
BBLK
BLU/W
BUE
BLAKR
BAL
BLURD
BLN
BA
BLKRE
BUL
BREY

There are so many colors, and there is no common standard for color, so I am going to choose top 10, ways to write black from what I could pick up from my eyes, and filter them out.

In [0]:
black_codes = [
    "BLK",
    "BLACK",
    "BLAC",
    "BLCK",
    "B LK",
    "BLK.",
    "BK",
    "BK.",
    "BKG",
    "BKBL",
    "BKGY",
    "BKGR",
    "BKL",
    "BKRD",
    "BKWH",
    "BKBK",
    "BKLKK",
    "BK333", # Unsure whether it is black or not
    "BK000", # Unsure whether it is black or not
    "BK/TN", # Unsure whether it is black or not
    "BK/RD", # Unsure whether it is black or not
    "BK/GR", # Unsure whether it is black or not
    "BK/WH",
    "BKGL", # Unsure whether it is black or not
    "BKGRA", # Unsure whether it is black or not
    "BLK/U", # Unsure whether it is black or not
    "BLKOT", # Unsure whether it is black or not
    "BLC",
    "B LAC", # Unsure whether it is black or not
    "BLKG",
    "BLKRE",
    "BLKGR", # Unsure whether it is black or not
    "BLKR", # Unsure whether it is black or not
    "BLKA",
    "BLKD",
    "BBLACK", # Unsure whether it is black or not
    "BLKDK",
]
len(black_codes)

Out[22]: 37

In [0]:
black_codes = ['BLACK.', 'Black', 'BC', 'BLAC', 'BK/', 'BLK', 'BK.', 'BCK', 'BK', 'B LAC']


In [0]:
total_vehicle_count_df = feature_data.groupBy('prediction').agg(
    count(when(col('Vehicle Color').isin(black_codes), 1)).alias('color count'),
    count('Vehicle Color').alias('car count')
).orderBy('prediction')

total_vehicle_count_df.show()

+----------+-----------+---------+
|prediction|color count|car count|
+----------+-----------+---------+
|         0|     527609|  3553397|
|         1|    1150965|  5860974|
|         2|     234037|  1303111|
+----------+-----------+---------+



In [0]:
# now lets count for each cluster how many black vehicles vs total count of vehicle
black_cars_df = feature_data.groupBy('prediction').\
    agg(
        count(when(col('Vehicle Color').isin(black_codes), 1)).alias("Number of black cars in the cluster"),
        count('Vehicle Color').alias("Total number of cars for cluster")
    ).orderBy("prediction")
black_cars_df.show()

+----------+-----------------------------------+--------------------------------+
|prediction|Number of black cars in the cluster|Total number of cars for cluster|
+----------+-----------------------------------+--------------------------------+
|         0|                             527609|                         3553397|
|         1|                            1150965|                         5860974|
|         2|                             234037|                         1303111|
+----------+-----------------------------------+--------------------------------+



In [0]:
black_cars_probability_df = black_cars_df.select(
    'prediction',
    'Number of black cars in the cluster',
    'Total number of cars for cluster',
    (col('Number of black cars in the cluster') / col('Total number of cars for cluster')).alias('probability')
)
black_cars_probability_df.show()

+----------+-----------------------------------+--------------------------------+-------------------+
|prediction|Number of black cars in the cluster|Total number of cars for cluster|        probability|
+----------+-----------------------------------+--------------------------------+-------------------+
|         0|                             527609|                         3553397| 0.1484801726348055|
|         1|                            1150965|                         5860974|0.19637776929227121|
|         2|                             234037|                         1303111| 0.1795986681103912|
+----------+-----------------------------------+--------------------------------+-------------------+



In [0]:
# Find the closest centroid
models_centroid = np.array(model_1.clusterCenters()).astype(float)


In [0]:
models_centroid

Out[28]: array([[27659.42526624, 22371.66533852, 22545.54207256],
       [ 2399.26895822,   913.02541069,   526.0035948 ],
       [51708.44588333, 52750.83556485, 52813.87202021]])

In [0]:
target_codes = [34510.0, 10030.0, 34050.0]
start_index = -1
distance = np.inf

for index, clusters_center in enumerate(models_centroid):
    euclidian_distance = np.sum(np.square(np.subtract(target_codes, clusters_center)))
    if euclidian_distance < distance:
        distance = euclidian_distance
        ans = index


print(f"Closet Centroid for given data: {ans}")


Closet Centroid for given data: 0


# Question 6: For each pair of the players (A, B), we define the fear score of A when facing B is the hit rate, such that B is closet defender when A is shooting. Based on the fear score, for each player, please find out who is his ”most unwanted defender”. 

In [0]:
spark = SparkSession.builder.appName("NBA_data").getOrCreate()

nba_data = spark.read.csv("/FileStore/tables/shot_logs-1.csv", inferSchema = True, header = True)


In [0]:
nba_pd = nba_data.limit(10).toPandas()
nba_pd

Unnamed: 0,GAME_ID,MATCHUP,LOCATION,W,FINAL_MARGIN,SHOT_NUMBER,PERIOD,GAME_CLOCK,SHOT_CLOCK,DRIBBLES,...,SHOT_DIST,PTS_TYPE,SHOT_RESULT,CLOSEST_DEFENDER,CLOSEST_DEFENDER_PLAYER_ID,CLOSE_DEF_DIST,FGM,PTS,player_name,player_id
0,21400899,"MAR 04, 2015 - CHA @ BKN",A,W,24,1,1,2024-04-12 01:09:00,10.8,2,...,7.7,2,made,"Anderson, Alan",101187,1.3,1,2,brian roberts,203148
1,21400899,"MAR 04, 2015 - CHA @ BKN",A,W,24,2,1,2024-04-12 00:14:00,3.4,0,...,28.2,3,missed,"Bogdanovic, Bojan",202711,6.1,0,0,brian roberts,203148
2,21400899,"MAR 04, 2015 - CHA @ BKN",A,W,24,3,1,2024-04-12 00:00:00,,3,...,10.1,2,missed,"Bogdanovic, Bojan",202711,0.9,0,0,brian roberts,203148
3,21400899,"MAR 04, 2015 - CHA @ BKN",A,W,24,4,2,2024-04-12 11:47:00,10.3,2,...,17.2,2,missed,"Brown, Markel",203900,3.4,0,0,brian roberts,203148
4,21400899,"MAR 04, 2015 - CHA @ BKN",A,W,24,5,2,2024-04-12 10:34:00,10.9,2,...,3.7,2,missed,"Young, Thaddeus",201152,1.1,0,0,brian roberts,203148
5,21400899,"MAR 04, 2015 - CHA @ BKN",A,W,24,6,2,2024-04-12 08:15:00,9.1,2,...,18.4,2,missed,"Williams, Deron",101114,2.6,0,0,brian roberts,203148
6,21400899,"MAR 04, 2015 - CHA @ BKN",A,W,24,7,4,2024-04-12 10:15:00,14.5,11,...,20.7,2,missed,"Jack, Jarrett",101127,6.1,0,0,brian roberts,203148
7,21400899,"MAR 04, 2015 - CHA @ BKN",A,W,24,8,4,2024-04-12 08:00:00,3.4,3,...,3.5,2,made,"Plumlee, Mason",203486,2.1,1,2,brian roberts,203148
8,21400899,"MAR 04, 2015 - CHA @ BKN",A,W,24,9,4,2024-04-12 05:14:00,12.4,0,...,24.6,3,missed,"Morris, Darius",202721,7.3,0,0,brian roberts,203148
9,21400890,"MAR 03, 2015 - CHA vs. LAL",H,W,1,1,2,2024-04-12 11:32:00,17.4,0,...,22.4,3,missed,"Ellington, Wayne",201961,19.8,0,0,brian roberts,203148


In [0]:
nba_data.printSchema()

root
 |-- GAME_ID: integer (nullable = true)
 |-- MATCHUP: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- W: string (nullable = true)
 |-- FINAL_MARGIN: integer (nullable = true)
 |-- SHOT_NUMBER: integer (nullable = true)
 |-- PERIOD: integer (nullable = true)
 |-- GAME_CLOCK: timestamp (nullable = true)
 |-- SHOT_CLOCK: double (nullable = true)
 |-- DRIBBLES: integer (nullable = true)
 |-- TOUCH_TIME: double (nullable = true)
 |-- SHOT_DIST: double (nullable = true)
 |-- PTS_TYPE: integer (nullable = true)
 |-- SHOT_RESULT: string (nullable = true)
 |-- CLOSEST_DEFENDER: string (nullable = true)
 |-- CLOSEST_DEFENDER_PLAYER_ID: integer (nullable = true)
 |-- CLOSE_DEF_DIST: double (nullable = true)
 |-- FGM: integer (nullable = true)
 |-- PTS: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_id: integer (nullable = true)



In [0]:
null_counts = nba_data.select([
    count(when(isnull(c), c)).alias(c) for c in nba_data.columns
]).toPandas()

for column, value in null_counts.iloc[0].items():
    if value > 0:
        print(f"{column}: {value}")

SHOT_CLOCK: 5567


We need to handel the missing data only for numerical here

In [0]:
numerical_cols = [cols for cols, datatype in nba_data.dtypes if datatype in ('int', 'double')]
# Lets fill the null values with average of the column since we have less missing values to handel
# Calculate the average for all the numerical valeus
averages = {col: nba_data.select(avg(col)).first()[0] for col in numerical_cols}

# Now, fill the null values with the respective averages
for feature in numerical_cols:
    nba_data = nba_data.withColumn(feature, when(col(feature).isNull(), averages[feature]).otherwise(col(feature)))


In [0]:
null_counts = nba_data.select([
    count(when(isnull(c), c)).alias(c) for c in nba_data.columns
]).toPandas()

flag = False
for column, value in null_counts.iloc[0].items():
    if value > 0:
        print(f"{column}: {value}")
        flag = True

print(flag)


False


We have removed all the null values

In [0]:
nba_data.select('Player_name').distinct().collect()

Out[36]: [Row(Player_name='andre iguodala'),
 Row(Player_name='marreese speights'),
 Row(Player_name='marcin gortat'),
 Row(Player_name='wesley johnson'),
 Row(Player_name='trevor booker'),
 Row(Player_name='mike scott'),
 Row(Player_name='amir johnson'),
 Row(Player_name='gary neal'),
 Row(Player_name='jeremy lin'),
 Row(Player_name='dante exum'),
 Row(Player_name='trey burke'),
 Row(Player_name='jrue holiday'),
 Row(Player_name='luke babbitt'),
 Row(Player_name='michael kidd-gilchrist'),
 Row(Player_name='wayne ellington'),
 Row(Player_name='gordon hayward'),
 Row(Player_name='jordan hill'),
 Row(Player_name='nene hilario'),
 Row(Player_name='gerald henderson'),
 Row(Player_name='enes kanter'),
 Row(Player_name='rasual butler'),
 Row(Player_name='tyler hansbrough'),
 Row(Player_name='brian roberts'),
 Row(Player_name='kyle korver'),
 Row(Player_name='demarre carroll'),
 Row(Player_name='otto porter'),
 Row(Player_name='jason maxiell'),
 Row(Player_name='dante cunningham'),
 Row(Playe

In [0]:
nba_data.createTempView("nbatemp")

In [0]:
# Combine the players by their ids and closest defender and caculate the how many times the player has made or missed the sot when an particular defender is nearby

nba_grouped_ppid = spark.sql('''SELECT player_id AS PlayerID, CLOSEST_DEFENDER_PLAYER_ID AS DefenderID,
                        SUM(CASE WHEN SHOT_RESULT = 'made' THEN 1 ELSE 0 END) AS made_shot,
                        SUM(CASE WHEN SHOT_RESULT = 'missed' THEN 1 ELSE 0 END) AS missed_shot FROM nbatemp
                        GROUP BY player_id, CLOSEST_DEFENDER_PLAYER_ID''')
nba_grouped_ppid.show()


+--------+----------+---------+-----------+
|PlayerID|DefenderID|made_shot|missed_shot|
+--------+----------+---------+-----------+
|203148.0|  101111.0|        0|          1|
|202390.0|  201581.0|        0|          1|
|201945.0|  203488.0|        1|          0|
|201945.0|  203084.0|        1|          2|
|203077.0|    2403.0|        0|          1|
|202362.0|  201959.0|        1|          0|
|101107.0|    1717.0|        3|          2|
|101131.0|  202334.0|        1|          0|
|202330.0|  202325.0|        8|          4|
|202330.0|  203110.0|        3|          3|
|203957.0|  201145.0|        0|          1|
|203504.0|  101150.0|        0|          3|
|  2430.0|  203076.0|        4|          5|
|   977.0|  203076.0|        0|          3|
|   977.0|  201609.0|        1|          3|
|   977.0|    2744.0|        0|          1|
|101179.0|  203084.0|        0|          1|
|202325.0|    1938.0|        4|          1|
|201941.0|    2403.0|        2|          3|
|201941.0|    1495.0|        1| 

In [0]:
# Calculate the ratio for each defender and player pair and removing the duplicates, and remove the null values
nba_grouped_ppid = nba_grouped_ppid.withColumn("Ratio", expr("made_shot / (made_shot + missed_shot)"))
nba_grouped_ppid = nba_grouped_ppid.dropDuplicates(["PlayerID", "Ratio"]).filter("Ratio is not null")
nba_grouped_ppid.show()

+--------+----------+---------+-----------+-------------------+
|PlayerID|DefenderID|made_shot|missed_shot|              Ratio|
+--------+----------+---------+-----------+-------------------+
|202391.0|  202684.0|        2|          3|                0.4|
|  1718.0|  200782.0|        3|          5|              0.375|
|202338.0|  201599.0|        2|          1| 0.6666666666666666|
|101161.0|  202332.0|        2|          1| 0.6666666666666666|
|201600.0|  200794.0|        4|          1|                0.8|
|203148.0|  201571.0|        2|          3|                0.4|
|202691.0|  201961.0|        1|          4|                0.2|
|  2744.0|    2199.0|        5|         13| 0.2777777777777778|
|  1938.0|  201155.0|        2|          1| 0.6666666666666666|
|203521.0|  203114.0|        1|          2| 0.3333333333333333|
|203142.0|  101187.0|        1|          3|               0.25|
|201949.0|  201933.0|        1|          2| 0.3333333333333333|
|201589.0|    2207.0|        3|         

In [0]:
# We will now group each player and calculate min ratio for combo(pair of defender and player) and get the minimum
df_6 = nba_grouped_ppid.groupBy("PlayerID").agg({"Ratio": "min"}).withColumn("Ratio", col("min(Ratio)"))

combined_data = nba_grouped_ppid.join(df_6, ["PlayerID", "Ratio"]).drop("Ratio").select("PlayerID", "DefenderID")

combined_data = combined_data.join(nba_data, (combined_data["PlayerID"] == nba_data["player_id"]) & (combined_data["DefenderID"] == nba_data["CLOSEST_DEFENDER_PLAYER_ID"]))

In [0]:
# displaying the relevant answer
df_6 = combined_data.groupBy("PlayerID", "DefenderID") \
                    .agg(first("player_name").alias("Player Name"), first("CLOSEST_DEFENDER").alias("Useless Player/Kuppa Paiyan")) \
                    .orderBy("PlayerID") \
                    .limit(10)

df_6.show()

+--------+----------+--------------+---------------------------+
|PlayerID|DefenderID|   Player Name|Useless Player/Kuppa Paiyan|
+--------+----------+--------------+---------------------------+
|   708.0|  202344.0| kevin garnett|             Booker, Trevor|
|   977.0|  203076.0|   kobe bryant|             Davis, Anthony|
|  1495.0|  201144.0|    tim duncan|               Conley, Mike|
|  1713.0|  202330.0|  vince carter|            Hayward, Gordon|
|  1717.0|  202738.0|dirk nowtizski|             Thomas, Isaiah|
|  1718.0|  200765.0|   paul pierce|               Rondo, Rajon|
|  1889.0|  203473.0|  andre miller|            Dedmon, Dewayne|
|  1890.0|  201564.0|  shawn marion|                 Mayo, O.J.|
|  1891.0|  203100.0|   jason terry|               Wroten, Tony|
|  1938.0|  202390.0| manu ginobili|                 Neal, Gary|
+--------+----------+--------------+---------------------------+



# Question 7: For each player, we define the comfortable zone of shooting is a matrix of, {SHOT DIST, CLOSE DEF DIST, SHOT CLOCK} Please develop a Spark-based algorithm to classify each player’s records intocomfort- able zones. Considering the hit rate, which zone is the best for James Harden, Chris Paul, Stephen Curry, and Lebron James

In [0]:
# Fill the zone information based on the shot distance, close def distance, low clock
df_7 = nba_data.withColumn(
    'zone',
    concat(
        f.when(nba_data.SHOT_DIST <= 15, '_Close_range_shot').otherwise('_Long_shot'),
        f.when(nba_data.SHOT_CLOCK <= 12, '_Quick').otherwise('_Slow'),
        f.when(nba_data.CLOSE_DEF_DIST <= 5, '_close_by').otherwise('_distant')  
    )
)
df_7.show()



[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-3982461511713310>:5[0m
[1;32m      1[0m [38;5;66;03m# Fill the zone information based on the shot distance, close def distance, low clock[39;00m
[1;32m      2[0m df_7 [38;5;241m=[39m nba_data[38;5;241m.[39mwithColumn(
[1;32m      3[0m     [38;5;124m'[39m[38;5;124mzone[39m[38;5;124m'[39m,
[1;32m      4[0m     concat(
[0;32m----> 5[0m         f[38;5;241m.[39mwhen(nba_data[38;5;241m.[39mSHOT_DIST [38;5;241m<[39m[38;5;241m=[39m [38;5;241m15[39m, [38;5;124m'[39m[38;5;124m_Close_range_shot[39m[38;5;124m'[39m)[38;5;241m.[39motherwise([38;5;124m'[39m[38;5;124m_Long_shot[39m[38;5;124m'[39m),
[1;32m      6[0m         f[38;5;241m.[39mwhen(nba_data[38;5;241m.[39mSHOT_CLOCK [38;5;241m<[39m[38;5;241m=[39m [38;5;241m12[39m, [38;5;124m'[39m[38;5;124

In [0]:
# Calculate ratio for each player in different zones
ratio_for_player_for_zones = df_7.filter(
    df_7.player_name.isin(['james harden', 'chris paul', 'stephen curry', 'lebron james'])
).groupBy('player_name', 'zone').agg(
    f.sum('FGM'),
    f.count('SHOT_NUMBER').alias('Total count of shots'),
    (f.sum('FGM') / f.count('SHOT_NUMBER')).alias('hit_rate')
)
ratio_for_player_for_zones.show()





In [0]:
# Get the best zone for each player based on the ratio of above table
player_best_zones_df = ratio_for_player_for_zones.withColumn(
    'rank',
    f.rank().over(Window.partitionBy('player_name').orderBy(f.desc('hit_rate')))
).filter(f.col('rank') == 1).drop('rank')
player_best_zones_df.show()

