In [1]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
#spark = SparkSession.builder.getOrCreate()
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.memory.fraction", 0.8) \
.config("spark.executor.memory", "14g") \
.config("spark.driver.memory", "12g")\
.config("spark.sql.shuffle.partitions" , "8000") \
.getOrCreate()

In [16]:
df = spark.read.format("csv").option("header", "true").load(r"C:\Users\User\Downloads\2014.csv")

In [17]:
df=df.select([c for c in df.columns if c in {'Registration State','Plate Type','Vehicle Body Type',
'Issuing Agency','Vehicle Make','Violation Location','Violation Precinct','Issuer Precinct',
'Issuer Command','Violation County','Violation In Front Of Or Opposite','House Number','Street Name','Date First Observed',
'Law Section','Vehicle Color','Sub Division','Issue Date','Street Code1','Street Code2',
'Street Code3','Vehicle Expiration Date','Violation Time','Vehicle Year','Feet From Curb',
'Violation Post Code'}])

In [18]:
df = df.select([F.col(col).alias(col.replace(' ', '_')) for col in df.columns])

In [19]:
df=df.withColumn('Vehicle_Expiration_Month', F.substring('Vehicle_Expiration_Date',5, 2))\
  .withColumn('Vehicle_Expiration_Day', F.substring('Vehicle_Expiration_Date',7, 2))\
.withColumn('Violation_Hour', F.substring('Violation_Time',1, 2))\
.withColumn('Violation_Minute', F.substring('Violation_Time',3, 2))\
.withColumn('Issue_Month', F.substring('Issue_Date',5, 2))\
  .withColumn('Issue_Day', F.substring('Issue_Date',4, 2))

In [20]:
numeric=['Street_Code1','Street_Code2','Street_Code3','Vehicle_Expiration_Date','Violation_Time',
         'Vehicle_Year','Feet_From_Curb','Violation_Post_Code','Vehicle_Expiration_Month',
        'Vehicle_Expiration_Day','Violation_Hour','Violation_Minute','Issue_Month',
        'Issue_Day']
for n in range(len(numeric)):
    df = df.withColumn(numeric[n], df[numeric[n]].cast(IntegerType()))

In [7]:
from pyspark.sql.functions import *
amount_missing_df = df.select([(count(when(isnan(c) | col(c).isNull(), c))/count(lit(1))).alias(c) for c in df.columns])
amount_missing_df.show()

+------------------+----------+----------+-------------------+--------------------+--------------+------------+------------+------------+-----------------------+-------------------+------------------+---------------+-------------------+------------------+-------------------+---------------------------------+-------------------+--------------------+-------------------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+------------------------+----------------------+--------------------+-------------------+-----------+---------+
|Registration_State|Plate_Type|Issue_Date|  Vehicle_Body_Type|        Vehicle_Make|Issuing_Agency|Street_Code1|Street_Code2|Street_Code3|Vehicle_Expiration_Date| Violation_Location|Violation_Precinct|Issuer_Precinct|     Issuer_Command|    Violation_Time|   Violation_County|Violation_In_Front_Of_Or_Opposite|       House_Number|         Street_Name|Date_First_Observed|Law_Section|        Sub_Division

In [21]:
df=df.select([c for c in df.columns if c not in {'Violation_Time','Feet_From_Curb',
                                            'Violation_Post_Code','Vehicle_Year',
                                             'Violation_Hour','Issue_Month','Vehicle_Expiration_Date',
                                             'Violation_Time','Issue_Date'}])

In [22]:
gg=['Registration_State','Plate_Type','Vehicle_Body_Type','Issuing_Agency','Vehicle_Make','Violation_Location',
   'Violation_Precinct','Issuer_Command','Violation_County','Violation_In_Front_Of_Or_Opposite',
   'House_Number','Street_Name','Date_First_Observed','Law_Section','Vehicle_Color','Sub_Division',
   'Issuer_Precinct']
key= list(set(df.columns) - set(gg))

In [23]:
for i in range(len(gg)):
    df = df.where(df[gg[i]].isNotNull())

In [24]:
df=df.drop(df.Violation_Minute)
key.remove('Violation_Minute')

In [11]:
df = df.sample(fraction=0.01, seed=3)
print((df.count(), len(df.columns)))

(80426, 23)


In [41]:
from pyspark.ml.feature import StringIndexer
categorical = gg
for i in range(len(categorical)):
    indexer = StringIndexer(inputCol=categorical[i], outputCol=categorical[i]+'_Index')
    df = indexer.fit(df).transform(df)

In [42]:
for i in range(len(gg)):
    df = df.drop(df[gg[i]])

In [43]:
for i in range(len(["Vehicle_Expiration_Month", "Vehicle_Expiration_Day"])):
    df = df.where(df[["Vehicle_Expiration_Month", "Vehicle_Expiration_Day"][i]].isNotNull())

In [15]:
# Do not run
from pyspark.ml.feature import *
categorical = [cc +'_Index' for cc in gg]
for i in range(len(categorical)):
    encoder = OneHotEncoder(inputCol=categorical[i], outputCol=categorical[i]+'_Encoded')
    df = encoder.fit(df).transform(df)

In [44]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
for i in range(len(df.drop('Registration_State_Index').columns)):
    df = df.withColumn(df.drop('Registration_State_Index').columns[i], 
                       list_to_vector_udf(df.drop('Registration_State_Index').columns[i]))

In [45]:
#https://stackoverflow.com/questions/65929680/what-data-type-does-vectorassembler-require-for-an-input
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(outputCol="features", inputCols=
                              [x for x in [cc +'_Index' for cc in gg]+key if x != 'Registration_State_Index'])
df1=vecAssembler.transform(df)

In [46]:
from pyspark.ml.feature import StringIndexer
label_Indexer = StringIndexer()\
                .setInputCol('Registration_State_Index')\
                .setOutputCol('label')
df1 = label_Indexer.fit(df1).transform(df1) 

In [47]:
from pyspark.ml.feature import StandardScaler
scaler=StandardScaler()\
       .setInputCol('features')\
       .setOutputCol('final_features')
df2=scaler.fit(df1).transform(df1)

In [48]:
train, test = df2.randomSplit([0.8, 0.2], seed=2021)

In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='final_features',labelCol='Registration_State_Index',maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")
lrModel = lr.fit(train)
perdiction=lrModel.transform(test)

In [None]:
perdiction.show(1)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator()
print('Test area under ROC', evaluator.evaluate(perdiction))

In [None]:
accuracy = perdiction.filter(perdiction.Vehicle_Color_Index == perdiction.prediction).count() / float(perdiction.count())

In [22]:
accuracy

0.16056889100566088

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid= (ParamGridBuilder()
           .addGrid(lr.regParam, [0.01, 0.5, 2.0])
           .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
           .addGrid(lr.maxIter, [1, 5, 10])
           .build())
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, 
                   evaluator=evaluator, numFolds=3)
cvModel = cv.fit(train)

In [None]:
predictions = cvModel.transform(test)
print('Best Model Test Area Under ROC', evaluator.evaluate(predictions))

In [None]:
weights = cvModel.bestModel.coefficients
weights = [(float(w), ) for w in weights]
weightsDF = sqlContext.createDataFrame(weights, ['Feature Weight'])
weightsDF.show(10)

In [None]:
best_model = cvModel.bestModel
print('Best Param (regParam):', best_model.java_obj.getRegParam())
print('Best Param (MaxIter):', best_model.java_obj.getMaxIter())
print('Best Param (elasticNetParam):', best_model.java_obj.getElasticNetParam())