In [2]:
!pip install pyspark_dist_explore

Collecting pyspark_dist_explore
  Downloading pyspark_dist_explore-0.1.8-py3-none-any.whl (7.2 kB)
Installing collected packages: pyspark_dist_explore
Successfully installed pyspark_dist_explore-0.1.8
[0m

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col,countDistinct
from pyspark.ml.feature import StringIndexer,VectorAssembler,Normalizer,VectorIndexer
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier,GBTClassifier,NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.sql.functions import col, udf
from pyspark.ml.linalg import Vectors, VectorUDT

In [4]:
import warnings
warnings.simplefilter("ignore")

In [5]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

In [6]:
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName('final_project_EDA')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/27 21:02:54 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/04/27 21:02:54 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/04/27 21:02:54 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/04/27 21:02:54 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [7]:
# Source CSV: first row is the header. inferSchema makes Spark scan the data once
# more to guess column types.
file_location = 'gs://mm19b059_1/trainingdatanyc.csv'

# Cap on rows read, to keep iteration fast; set to None to use the whole file.
ROW_LIMIT = 1_000_000

dataset = spark.read.options(delimiter=",", header=True, inferSchema=True).csv(file_location)
if ROW_LIMIT is not None:
    # NOTE: limit() without an ordering does not guarantee *which* rows are kept.
    dataset = dataset.limit(ROW_LIMIT)

                                                                                

In [8]:
#dataset.select('*').limit(5).show()

In [9]:
#dataset.printSchema()

In [10]:
#dataset.count()

In [11]:
#len(dataset.columns)

In [12]:
# Remove exact duplicate rows (every column is compared).
dataset = dataset.distinct()

In [13]:
# Column names containing spaces are awkward to reference in Spark, so swap
# each space for an underscore.
renamed_columns = [name.replace(' ', '_') for name in dataset.columns]
dataframe = dataset.toDF(*renamed_columns)

In [14]:
# Optional EDA: histogram of the target column. It is disabled by default so that
# Run All does not need it; set the flag to True to render the plot.
PLOT_PRECINCT_HISTOGRAM = False

if PLOT_PRECINCT_HISTOGRAM:
    from pyspark_dist_explore import hist

    fig, ax = plt.subplots()
    hist(ax, dataframe.select('Violation_Precinct'), bins=200, color=['red'])
    ax.set(title='Violation_Precinct distribution', xlabel='Violation_Precinct', ylabel='count')
    plt.show()

"from pyspark_dist_explore import hist\n\nfig, ax = plt.subplots()\nhist(ax, dataframe.select('Violation_Precinct'), bins = 200, color=['red'])\nplt.show()"

In [15]:
# Split columns by Spark type: every non-string column goes to numeric_columns.
# The whole schema is read once via dtypes instead of one select() per column.
numeric_columns = list()
categorical_column = list()
for col_, dtype in dataframe.dtypes:
    if dtype != "string":
        numeric_columns.append(col_)
    else:
        categorical_column.append(col_)

# Exclude the target by name, so the result does not depend on it being the
# first numeric column.
numeric_columns = [c for c in numeric_columns if c != 'Violation_Precinct']
print("Numeric columns",numeric_columns)
print('-'*100)
print("categorical columns",categorical_column)


Numeric columns ['Feet_From_Curb', 'Issuer_Precinct', 'Street_Code2', 'Street_Code1', 'Issuer_Code', 'Street_Code3', 'Violation_Code', 'Vehicle_Year', 'Summons_Number', 'Law_Section', 'Violation_Location']
----------------------------------------------------------------------------------------------------
categorical columns ['Violation_Time', 'Violation_In_Front_Of_Or_Opposite', 'From_Hours_In_Effect', 'Issuing_Agency', 'Violation_County', 'Meter_Number', 'Plate_Type', 'Unregistered_Vehicle?', 'Issue_Date', 'Violation_Post_Code', 'Double_Parking_Violation', 'Plate_ID', 'Street_Name', 'Registration_State', 'Hydrant_Violation', 'Days_Parking_In_Effect', 'Vehicle_Expiration_Date', 'Issuer_Squad', 'Vehicle_Make', 'Sub_Division', 'Intersecting_Street', 'Vehicle_Color', 'Time_First_Observed', 'To_Hours_In_Effect', 'Issuer_Command', 'Date_First_Observed', 'House_Number', 'Violation_Legal_Code', 'No_Standing_or_Stopping_Violation', 'Violation_Description', 'Vehicle_Body_Type']


In [16]:
#dataframe.select(numeric_columns[:6]).describe().show()

In [17]:
#dataframe.select(numeric_columns[6:]).describe().show()

In [18]:
# Keep only rows whose target precinct is at most MAX_VIOLATION_PRECINCT.
# Rows with a null Violation_Precinct are dropped as well: `null <= 99`
# evaluates to null, which filter() treats as false.
# NOTE(review): confirm 99 is the intended upper bound for valid precinct codes.
MAX_VIOLATION_PRECINCT = 99
dataframe = dataframe.filter(F.col('Violation_Precinct') <= MAX_VIOLATION_PRECINCT)

In [19]:

def drop_null_columns(df, threshold=0.5):
    """
    Drop every column whose null count is at least ``threshold`` times the row count.

    Columns below the threshold are kept unchanged; their nulls are neither
    filled nor removed here.

    :param df: A PySpark DataFrame.
    :param threshold: Fraction in [0, 1]. A column is dropped when
        ``null_count >= threshold * row_count``. Defaults to 0.5 (half the rows).
    :return: A new DataFrame without the dropped columns. A DataFrame with zero
        rows is returned unchanged, because a null fraction is undefined there.
    :raises ValueError: If ``threshold`` is outside [0, 1].
    """
    if not 0 <= threshold <= 1:
        raise ValueError(f"threshold must be in [0, 1], got {threshold!r}")

    n = df.count()  # Spark action: runs a job over the data
    if n == 0:
        # With zero rows every null count is 0 >= threshold * 0, so the rule
        # would drop *all* columns; keep the schema instead.
        return df

    # A single aggregation counts the nulls of every column at once.
    null_counts = df.select(
        [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]
    ).first().asDict()
    to_drop = [name for name, nulls in null_counts.items() if nulls >= threshold * n]
    return df.drop(*to_drop)

# Drop the sparse columns, then display how many columns remain.
dataframe = drop_null_columns(df=dataframe)
n_columns_kept = len(dataframe.columns)
n_columns_kept

23/04/27 21:04:16 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

35

In [20]:
#y = dataframe.select('Violation_Precinct')
#dataframe = dataframe.drop('Violation_Precinct')

In [21]:
# Defensive check: make sure the target has no nulls before it is used as the label.
dataframe = dataframe.dropna(subset=['Violation_Precinct'])

In [22]:
# Replace every remaining null with -1. Spark casts the replacement to each
# column's type, so string columns receive "-1" rather than being skipped.
cols = list(dataframe.columns)
fill_dict = dict.fromkeys(cols, -1)
dataframe = dataframe.fillna(fill_dict)

In [23]:
# 70/30 train/test split with a fixed sampling seed.
# NOTE(review): train/test are not cached, so each is recomputed from the CSV
# whenever it is used; consider .cache() if the upstream rows could differ per read.
train, test = dataframe.randomSplit(weights=[0.7, 0.3], seed=0)

In [24]:
# Re-derive the column groups from the cleaned frame: every non-string column
# goes to numeric_columns. The whole schema is read once via dtypes instead of
# building a one-column select() per column.
# The target 'Violation_Precinct' is non-string, so it stays in numeric_columns.
numeric_columns = list()
categorical_columns = list()
for col_, dtype in dataframe.dtypes:
    if dtype != "string":
        numeric_columns.append(col_)
    else:
        categorical_columns.append(col_)

print("Numeric columns",numeric_columns)
print(len(numeric_columns))
print('-'*100)
print("categorical columns",categorical_columns)
print(len(categorical_columns))

Numeric columns ['Violation_Precinct', 'Feet_From_Curb', 'Issuer_Precinct', 'Street_Code2', 'Street_Code1', 'Issuer_Code', 'Street_Code3', 'Violation_Code', 'Vehicle_Year', 'Summons_Number', 'Law_Section', 'Violation_Location']
12
----------------------------------------------------------------------------------------------------
categorical columns ['Violation_Time', 'Violation_In_Front_Of_Or_Opposite', 'From_Hours_In_Effect', 'Issuing_Agency', 'Violation_County', 'Plate_Type', 'Issue_Date', 'Violation_Post_Code', 'Plate_ID', 'Street_Name', 'Registration_State', 'Days_Parking_In_Effect', 'Vehicle_Expiration_Date', 'Issuer_Squad', 'Vehicle_Make', 'Sub_Division', 'Vehicle_Color', 'To_Hours_In_Effect', 'Issuer_Command', 'Date_First_Observed', 'House_Number', 'Violation_Description', 'Vehicle_Body_Type']
23


In [25]:
# Output column name for the StringIndexer index of each categorical column.
indexed_cols = [f"{name}_indexed" for name in categorical_columns]

In [26]:
# A single StringIndexer stage indexes every categorical feature plus the target.
# The numeric target is cast to string, and its values become 'label' indices
# ordered by frequency (Spark's default stringOrderType), not by precinct number.
# handleInvalid='keep' maps values unseen during fit to one extra index; this
# applies to the label as well as to the features.
# NOTE(review): high-cardinality columns (e.g. Plate_ID, Issue_Date) become arbitrary
# ordinal codes here; consider dropping them or hashing/one-hot encoding instead.
stringIndexer = StringIndexer(
    inputCols=categorical_columns + ['Violation_Precinct'],
    outputCols=indexed_cols + ['label'],
    handleInvalid='keep',
)

In [27]:
#df = assembled.select('raw_features').toPandas()
#dense_udf = udf(lambda x:DenseVector(x.toArray()), VectorUDT())
#df = assembled.select('raw_features', dense_udf("raw_features").alias("raw_features_udf"))
#df.iloc[0]['raw_features']
#df.select('raw_features_udf').toPandas()

In [28]:
# Gather the numeric features and the indexed categoricals into one "features" vector.
# The target is excluded by name, so the feature list does not depend on column order.
feature_columns = [c for c in numeric_columns if c != 'Violation_Precinct'] + indexed_cols
# NOTE(review): Summons_Number looks like a per-ticket identifier, and Violation_Location
# may encode the target precinct. Confirm neither leaks the label or adds noise.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features", handleInvalid='keep')

In [29]:
#normalizer = Normalizer(inputCol='raw_features', outputCol="norm_features",p=1.0)


In [30]:
# Logistic regression on the default 'features' column, predicting the default
# 'label' column; with more than two classes Spark fits a multinomial model.
# elasticNetParam=0.8 mixes a mostly-L1 penalty with some L2, at strength regParam=0.3.
# NOTE(review): this is fairly strong regularization; tune regParam/elasticNetParam
# before trusting the reported accuracy.
lr = LogisticRegression(
    maxIter=25,
    regParam=0.3,
    elasticNetParam=0.8,
)

In [31]:
# Full pipeline: index categoricals and label -> assemble feature vector -> classifier.
pipeline = Pipeline(stages=[
    stringIndexer,
    assembler,
    lr,
])

In [32]:
# Fit every pipeline stage on the training split only; the test split stays unseen.
model = pipeline.fit(dataset=train)

23/04/27 21:13:04 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 18.1 MiB
23/04/27 21:13:20 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 18.1 MiB
23/04/27 21:13:45 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/04/27 21:13:45 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/04/27 21:13:45 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 18.1 MiB
23/04/27 21:13:55 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 18.1 MiB
23/04/27 21:13:55 WARN org.apache.spark.network.server.TransportChannelHandler: Exception in connection from /10.128.0.22:47350
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.

In [33]:
#model = rf.fit(assembled)

In [34]:
# Optional hyper-parameter search. It is disabled by default because it refits the
# whole pipeline roughly numFolds x len(paramGrid) times; set the flag to True to run it.
RUN_CROSS_VALIDATION = False

if RUN_CROSS_VALIDATION:
    # Tune the classifier that is actually in the pipeline (lr); the current
    # setting regParam=0.3 / elasticNetParam=0.8 is one point of the grid.
    paramGrid = (
        ParamGridBuilder()
        .addGrid(lr.regParam, [0.01, 0.1, 0.3])
        .addGrid(lr.elasticNetParam, [0.0, 0.5, 0.8])
        .build()
    )
    crossval = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=paramGrid,
        evaluator=MulticlassClassificationEvaluator(),
        numFolds=4,
        seed=0,
    )
    # Run cross-validation and keep the best set of parameters.
    cvModel = crossval.fit(train)
    prediction = cvModel.transform(test)
    print(MulticlassClassificationEvaluator(metricName="accuracy").evaluate(prediction))

'paramGrid = ParamGridBuilder().addGrid(gbt.maxIter, [10,10,50]).build()\ncrossval = CrossValidator(estimator=pipeline,estimatorParamMaps=paramGrid,evaluator=MulticlassClassificationEvaluator(),numFolds=4)\n# Run cross-validation, and choose the best set of parameters.\ncvModel = crossval.fit(train)\nprediction = cvModel.transform(test)\nselected = prediction.select("label", "probability", "prediction")\nprint(MulticlassClassificationEvaluator(metricName ="accuracy").evaluate(prediction))'

In [35]:
# Obsolete manual preprocessing of the test split, kept for reference only. The
# fitted pipeline model applies the indexer and assembler itself in
# model.transform(test). This code must not run: StringIndexer is an Estimator
# (it has to be fitted first) and dense_udf is not defined.
#   test = stringIndexer.transform(test)
#   test = assembler.transform(test)
#   test = test.withColumn('raw_features_udf', dense_udf("raw_features"))


'test = stringIndexer.transform(test)\ntest = assembler.transform(test)\ntest = test.withColumn(\'raw_features_udf\', dense_udf("raw_features"))'

In [36]:
# Score the held-out split with the fitted pipeline and report accuracy, i.e. the
# fraction of rows whose predicted label index equals the true label index.
# NOTE(review): compare against a majority-class baseline before interpreting this number.
predictions = model.transform(test)
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy of model is ", accuracy)

23/04/27 21:25:49 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 18.1 MiB
[Stage 117:>                                                        (0 + 1) / 1]

Accuracy of model is  0.2019286125503742


                                                                                

In [37]:
# Column names of the scored frame: original inputs, *_indexed columns, and the
# columns added by the assembler and the classifier.
predictions.schema.names

['Violation_Precinct',
 'Feet_From_Curb',
 'Violation_Time',
 'Violation_In_Front_Of_Or_Opposite',
 'Issuer_Precinct',
 'Street_Code2',
 'From_Hours_In_Effect',
 'Issuing_Agency',
 'Street_Code1',
 'Issuer_Code',
 'Violation_County',
 'Plate_Type',
 'Issue_Date',
 'Violation_Post_Code',
 'Street_Code3',
 'Plate_ID',
 'Violation_Code',
 'Street_Name',
 'Registration_State',
 'Days_Parking_In_Effect',
 'Vehicle_Expiration_Date',
 'Issuer_Squad',
 'Vehicle_Make',
 'Sub_Division',
 'Vehicle_Year',
 'Vehicle_Color',
 'Summons_Number',
 'Law_Section',
 'Violation_Location',
 'To_Hours_In_Effect',
 'Issuer_Command',
 'Date_First_Observed',
 'House_Number',
 'Violation_Description',
 'Vehicle_Body_Type',
 'Violation_Time_indexed',
 'Violation_In_Front_Of_Or_Opposite_indexed',
 'From_Hours_In_Effect_indexed',
 'Issuing_Agency_indexed',
 'Violation_County_indexed',
 'Plate_Type_indexed',
 'Issue_Date_indexed',
 'Violation_Post_Code_indexed',
 'Plate_ID_indexed',
 'Street_Name_indexed',
 'Registr

In [39]:
from google.cloud import storage  # NOTE(review): unused here; Spark writes the gs:// path itself.

# Persist the whole fitted pipeline model (indexer + assembler + classifier),
# replacing anything already saved at this path.
srcPath = 'gs://mm19b059_1/logistic_regression.model'
(
    model.write()
    .overwrite()
    .save(srcPath)
)

23/04/27 21:27:18 WARN org.apache.spark.scheduler.TaskSetManager: Stage 121 contains a task of very large size (7295 KiB). The maximum recommended task size is 1000 KiB.
                                                                                