In [1]:
from pyspark import SparkContext,SparkConf

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TextClassification").getOrCreate()

In [3]:
import pandas as pd

In [4]:
df = spark.read.csv('./sf_crime/train.csv',header = True,inferSchema = True)

In [12]:
df.printSchema()

root
 |-- Dates: timestamp (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)



In [15]:
pd.DataFrame(df.take(5),columns=df.columns)

Unnamed: 0,Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,X,Y
0,2015-05-13 23:53:00,WARRANTS,WARRANT ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425892,37.774599
1,2015-05-13 23:53:00,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425892,37.774599
2,2015-05-13 23:33:00,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",VANNESS AV / GREENWICH ST,-122.424363,37.800414
3,2015-05-13 23:30:00,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Wednesday,NORTHERN,NONE,1500 Block of LOMBARD ST,-122.426995,37.800873
4,2015-05-13 23:30:00,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Wednesday,PARK,NONE,100 Block of BRODERICK ST,-122.438738,37.771541


In [16]:
df.columns

['Dates',
 'Category',
 'Descript',
 'DayOfWeek',
 'PdDistrict',
 'Resolution',
 'Address',
 'X',
 'Y']

In [68]:
df = df.select(['Category','Descript'])

In [69]:
df.show(5)

+--------------+--------------------+
|      Category|            Descript|
+--------------+--------------------+
|      WARRANTS|      WARRANT ARREST|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
+--------------+--------------------+
only showing top 5 rows



In [70]:
df.groupBy('Category').count().sort('count',ascending=False).show()

+--------------------+------+
|            Category| count|
+--------------------+------+
|       LARCENY/THEFT|174900|
|      OTHER OFFENSES|126182|
|        NON-CRIMINAL| 92304|
|             ASSAULT| 76876|
|       DRUG/NARCOTIC| 53971|
|       VEHICLE THEFT| 53781|
|           VANDALISM| 44725|
|            WARRANTS| 42214|
|            BURGLARY| 36755|
|      SUSPICIOUS OCC| 31414|
|      MISSING PERSON| 25989|
|             ROBBERY| 23000|
|               FRAUD| 16679|
|FORGERY/COUNTERFE...| 10609|
|     SECONDARY CODES|  9985|
|         WEAPON LAWS|  8555|
|        PROSTITUTION|  7484|
|            TRESPASS|  7326|
|     STOLEN PROPERTY|  4540|
|SEX OFFENSES FORC...|  4388|
+--------------------+------+
only showing top 20 rows



In [27]:
from pyspark.ml.feature import RegexTokenizer,StopWordsRemover,CountVectorizer
from pyspark.ml.classification import LogisticRegression

In [86]:
regexTokenizer = RegexTokenizer(inputCol = 'Descript',outputCol = 'words',pattern = '\\W')
add_stopwords = ['http','https','amp','rt','t','c','the']

stopwordsRemover = StopWordsRemover(inputCol = 'words',outputCol = 'Processed words').setStopWords(add_stopwords)

countVectors = CountVectorizer(inputCol = 'Processed words',outputCol = 'features',vocabSize = 10000, minDF =5)

In [33]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder,StringIndexer,VectorAssembler

In [87]:
Indexer = StringIndexer(inputCol = "Category", outputCol = 'label')

In [89]:
pipeline = Pipeline(stages = [regexTokenizer,stopwordsRemover,countVectors,Indexer])

In [90]:
df_processed = pipeline.fit(df).transform(df)


In [91]:
pd.DataFrame(df_processed.take(5),columns= df_processed.columns)

Unnamed: 0,Category,Descript,words,Processed words,features,label
0,WARRANTS,WARRANT ARREST,"[warrant, arrest]","[warrant, arrest]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",7.0
1,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,"[traffic, violation, arrest]","[traffic, violation, arrest]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0
2,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,"[traffic, violation, arrest]","[traffic, violation, arrest]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0
3,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,"[grand, theft, from, locked, auto]","[grand, theft, from, locked, auto]","(1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, ...",0.0
4,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,"[grand, theft, from, locked, auto]","[grand, theft, from, locked, auto]","(1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, ...",0.0


In [92]:
df_training = df_processed.select(['features','label'])

In [93]:
(trainingData,testData) = df_training.randomSplit([0.7,0.3],seed = 0)
print('Training Data Count:%d' %trainingData.count())

Training Data Count:614169


In [84]:
print('Test Data Count:%d'%testData.count())

Test Data Count:263880


In [94]:
lr = LogisticRegression(maxIter = 10, regParam = 0.3,elasticNetParam = 0)
lrModel = lr.fit(trainingData)

In [95]:
prediction = lrModel.transform(testData)

In [98]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol = 'prediction')
evaluator.evaluate(prediction)

0.9715292317978003