In [1]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc =SparkContext()
sqlContext = SQLContext(sc)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/15 10:06:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('train.csv')

                                                                                

In [3]:
#Remove the columns we do not need and have a look the first five rows:

drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']
data = data.select([column for column in data.columns if column not in drop_list])
data.show(5)

+--------------+--------------------+
|      Category|            Descript|
+--------------+--------------------+
|      WARRANTS|      WARRANT ARREST|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
|OTHER OFFENSES|TRAFFIC VIOLATION...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
| LARCENY/THEFT|GRAND THEFT FROM ...|
+--------------+--------------------+
only showing top 5 rows



In [4]:
# Top 20 crime categories:
from pyspark.sql.functions import col


In [5]:
data.groupBy("Category") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

[Stage 3:>                                                          (0 + 4) / 4]

+--------------------+------+
|            Category| count|
+--------------------+------+
|       LARCENY/THEFT|174900|
|      OTHER OFFENSES|126182|
|        NON-CRIMINAL| 92304|
|             ASSAULT| 76876|
|       DRUG/NARCOTIC| 53971|
|       VEHICLE THEFT| 53781|
|           VANDALISM| 44725|
|            WARRANTS| 42214|
|            BURGLARY| 36755|
|      SUSPICIOUS OCC| 31414|
|      MISSING PERSON| 25989|
|             ROBBERY| 23000|
|               FRAUD| 16679|
|FORGERY/COUNTERFE...| 10609|
|     SECONDARY CODES|  9985|
|         WEAPON LAWS|  8555|
|        PROSTITUTION|  7484|
|            TRESPASS|  7326|
|     STOLEN PROPERTY|  4540|
|SEX OFFENSES FORC...|  4388|
+--------------------+------+
only showing top 20 rows



                                                                                

In [6]:
#Top 20 crime descriptions:

data.groupBy("Descript") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()



+--------------------+-----+
|            Descript|count|
+--------------------+-----+
|GRAND THEFT FROM ...|60022|
|       LOST PROPERTY|31729|
|             BATTERY|27441|
|   STOLEN AUTOMOBILE|26897|
|DRIVERS LICENSE, ...|26839|
|      WARRANT ARREST|23754|
|SUSPICIOUS OCCURR...|21891|
|AIDED CASE, MENTA...|21497|
|PETTY THEFT FROM ...|19771|
|MALICIOUS MISCHIE...|17789|
|   TRAFFIC VIOLATION|16471|
|PETTY THEFT OF PR...|16196|
|MALICIOUS MISCHIE...|15957|
|THREATS AGAINST LIFE|14716|
|      FOUND PROPERTY|12146|
|ENROUTE TO OUTSID...|11470|
|GRAND THEFT OF PR...|11010|
|POSSESSION OF NAR...|10050|
|PETTY THEFT FROM ...|10029|
|PETTY THEFT SHOPL...| 9571|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [7]:
# Tokenization
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer

In [23]:
# regular expression tokenizer

regexTokenizer = RegexTokenizer(inputCol="Descript", outputCol="words", pattern="\\W")

In [24]:
# stop words

add_stopwords = ["http","https","amp","rt","t","c","the"] 

In [25]:
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

In [26]:
# encode the string column of labels to a column of label indices

from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_string_index = StringIndexer(inputCol = "Category", outputCol = "label")

In [32]:
# Model training and evaluation

# Logistic Regression using Word2Vec Features
# Transforms a word into a code for machine learning process.


from pyspark.ml.feature import Word2Vec

word2_vec = Word2Vec(inputCol="filtered", outputCol="rawFeatures", seed= 42)

pipeline = Pipeline(stages=[regexTokenizer, stopwords_remover, word2_vec, label_string_index])

In [33]:
# Fit the pipeline to training documents.

pipeline_fit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

                                                                                

+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      Category|            Descript|               words|            filtered|            features|label|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|      WARRANTS|      WARRANT ARREST|   [warrant, arrest]|   [warrant, arrest]|(809,[17,32],[1.0...|  7.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(809,[11,17,35],[...|  1.0|
|OTHER OFFENSES|TRAFFIC VIOLATION...|[traffic, violati...|[traffic, violati...|(809,[11,17,35],[...|  1.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,6],...|  0.0|
| LARCENY/THEFT|GRAND THEFT FROM ...|[grand, theft, fr...|[grand, theft, fr...|(809,[0,2,3,4,6],...|  0.0|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [34]:
# split dataset into training and test sets

# set seed for reproducibility

(training_data, test_data) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Sample Size: " + str(training_data.count()))
print("Test Dataset Sample Size: " + str(test_data.count()))

                                                                                

Training Dataset Sample Size: 614891




Test Dataset Sample Size: 263158


                                                                                

In [35]:
# Our model will make predictions and score on the test set; 
# we then look at the top 10 predictions from the highest probability.

from pyspark.ml.classification import LogisticRegression

# fit the classification model

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
cls_model = lr.fit(training_data)

                                                                                

In [36]:
# infer the model

predictions = lrModel.transform(test_data)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)



+------------------------------+-------------+------------------------------+-----+----------+
|                      Descript|     Category|                   probability|label|prediction|
+------------------------------+-------------+------------------------------+-----+----------+
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8720678987144937,0.02043...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8720678987144937,0.02043...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8720678987144937,0.02043...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8720678987144937,0.02043...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8720678987144937,0.02043...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8720678987144937,0.02043...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8720678987144937,0.02043...|  0.0|       0.0|
|THEFT, BICYCLE, <$50, NO SE...|LARCENY/THEFT|[0.8

                                                                                

In [37]:
# evaluate the model

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

                                                                                

0.9725533704587821

In [None]:
# The accuracy is excellent!

In [38]:
# Try out other classifiers - RandomForestClassifier

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)

# Train model with Training Data
rf_model = rf.fit(training_data)
predictions = rf_model.transform(test_data)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

22/05/15 11:27:53 WARN MemoryStore: Not enough space to cache rdd_287_0 in memory! (computed 32.0 MiB so far)
22/05/15 11:27:53 WARN BlockManager: Persisting block rdd_287_0 to disk instead.
22/05/15 11:27:53 WARN MemoryStore: Not enough space to cache rdd_287_3 in memory! (computed 32.0 MiB so far)
22/05/15 11:27:53 WARN BlockManager: Persisting block rdd_287_3 to disk instead.
22/05/15 11:27:54 WARN MemoryStore: Not enough space to cache rdd_287_2 in memory! (computed 32.0 MiB so far)
22/05/15 11:27:54 WARN BlockManager: Persisting block rdd_287_2 to disk instead.
22/05/15 11:27:55 WARN MemoryStore: Not enough space to cache rdd_287_1 in memory! (computed 48.1 MiB so far)
22/05/15 11:27:55 WARN BlockManager: Persisting block rdd_287_1 to disk instead.
22/05/15 11:28:04 WARN MemoryStore: Not enough space to cache rdd_287_3 in memory! (computed 163.6 MiB so far)
22/05/15 11:28:04 WARN MemoryStore: Not enough space to cache rdd_287_0 in memory! (computed 72.2 MiB so far)
22/05/15 11:28:

+----------------------------+-------------+------------------------------+-----+----------+
|                    Descript|     Category|                   probability|label|prediction|
+----------------------------+-------------+------------------------------+-----+----------+
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.6111733413439591,0.06626...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.6111733413439591,0.06626...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.6111733413439591,0.06626...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.6111733413439591,0.06626...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.6111733413439591,0.06626...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.6111733413439591,0.06626...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.6111733413439591,0.06626...|  0.0|       0.0|
|PETTY THEFT FROM LOCKED AUTO|LARCENY/THEFT|[0.6111733413439591,0.0662

                                                                                

In [39]:
#eva
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

                                                                                

0.6686938240895559

In [41]:
%%time
# fine tuning the better model - LogisticRegression model

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)
cv_model = cv.fit(training_data)

predictions = cv_model.transform(test_data)

# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

22/05/15 11:40:59 WARN BlockManager: Asked to remove block broadcast_954, which does not exist
22/05/15 11:46:19 WARN BlockManager: Asked to remove block broadcast_1471_piece0, which does not exist
22/05/15 11:46:19 WARN BlockManager: Asked to remove block broadcast_1471, which does not exist
22/05/15 11:46:53 WARN BlockManager: Asked to remove block broadcast_1536_piece0, which does not exist
22/05/15 11:46:53 WARN BlockManager: Asked to remove block broadcast_1536, which does not exist
22/05/15 11:53:43 WARN BlockManager: Asked to remove block broadcast_2155, which does not exist

CPU times: user 6.28 s, sys: 1.91 s, total: 8.19 s
Wall time: 25min 50s


                                                                                

0.9919757517712569

In [42]:
# The model's accuracy improves!

In [None]:
from pyspark.ml.clustering import KMeans

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Evaluate clustering by computing Within Set Sum of Squared Errors.
wssse = model.computeCost(dataset)
print("Within Set Sum of Squared Errors = " + str(wssse))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)