In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder \
         .master("local") \
         .appName("Project") \
         .config("spark.some.config.option", "some-value") \
         .getOrCreate()
        
sc = spark.sparkContext

In [2]:
from pyspark.sql.types import *

df = spark.read.csv('Crimes.csv', header=True, inferSchema=True)

In [3]:
from pyspark.sql.types import  (StructType, 
                                StructField, 
                                DateType, 
                                BooleanType,
                                DoubleType,
                                IntegerType,
                                StringType,
                               TimestampType)
crimes_schema = StructType([StructField("ID", StringType(), True),
                            StructField("CaseNumber", StringType(), True),
                            StructField("Date", StringType(), True ),
                            StructField("Block", StringType(), True),
                            StructField("IUCR", StringType(), True),
                            StructField("PrimaryType", StringType(), True  ),
                            StructField("Description", StringType(), True ),
                            StructField("LocationDescription", StringType(), True ),
                            StructField("Arrest", BooleanType(), True),
                            StructField("Domestic", BooleanType(), True),
                            StructField("Beat", StringType(), True),
                            StructField("District", StringType(), True),
                            StructField("Ward", StringType(), True),
                            StructField("CommunityArea", StringType(), True),
                            StructField("FBICode", StringType(), True ),
                            StructField("XCoordinate", DoubleType(), True),
                            StructField("YCoordinate", DoubleType(), True ),
                            StructField("Year", IntegerType(), True),
                            StructField("UpdatedOn", DateType(), True ),
                            StructField("Latitude", DoubleType(), True),
                            StructField("Longitude", DoubleType(), True),
                            StructField("Location", StringType(), True )
                            ])

In [4]:
crimes = spark.read.csv("Crimes.csv",
                       header = True, 
                        schema = crimes_schema)

In [None]:
crimes.show()

In [23]:
print(" The crimes dataframe has {} records".format(crimes.count()))

 The crimes dataframe has 6600889 records


In [24]:
crimes.printSchema()

root
 |-- ID: string (nullable = true)
 |-- CaseNumber: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- PrimaryType: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- LocationDescription: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- CommunityArea: string (nullable = true)
 |-- FBICode: string (nullable = true)
 |-- XCoordinate: double (nullable = true)
 |-- YCoordinate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- UpdatedOn: date (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



In [26]:
crimes.select("Block","Arrest").show(10, truncate = False)

+-----+------+
|Block|Arrest|
+-----+------+
|null |null  |
|null |null  |
|null |null  |
|null |null  |
|null |null  |
|null |null  |
|null |null  |
|null |null  |
|null |null  |
|null |null  |
+-----+------+
only showing top 10 rows



In [25]:
crimes.select("PrimaryType").distinct().count()

1

In [21]:
crimes.filter(crimes["Arrest"]==True).count()/crimes.count() * 100

27.918754579875525

In [8]:
crime_type = crimes.select('PrimaryType')

In [9]:
crime_type

DataFrame[PrimaryType: string]

In [13]:
rddcrime = crime_type.rdd
wordcounts= rddcrime.map(lambda w:(w,1)) \
        .reduceByKey(lambda a,b:a+b) \

In [17]:
for (word,count) in wordcounts.collect():
    print("%s: %i" % (word,count))

Row(PrimaryType=None): 6600889


In [14]:
wordcounts

PythonRDD[46] at RDD at PythonRDD.scala:48

In [16]:
schemaPeople = spark.createDataFrame(wordcounts)

ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

In [60]:
#renaming the column in dataframe
schemaPeople = schemaPeople.withColumnRenamed('_1','PrimaryType')
schemaPeople = schemaPeople.withColumnRenamed('_2','Frequency')
schemaPeople.show()
schemaPeople.toPandas()

+--------------------+---------+
|         PrimaryType|Frequency|
+--------------------+---------+
|      [INTIMIDATION]|     3832|
|      [NON-CRIMINAL]|      145|
|[PUBLIC PEACE VIO...|    47054|
|           [BATTERY]|  1204701|
|       [SEX OFFENSE]|    24392|
|[OFFENSE INVOLVIN...|    44061|
|[NON-CRIMINAL (SU...|        7|
|          [STALKING]|     3258|
|[CONCEALED CARRY ...|      187|
|[MOTOR VEHICLE TH...|   309026|
|          [BURGLARY]|   381486|
|        [KIDNAPPING]|     6579|
| [CRIMINAL TRESPASS]|   189822|
|             [ARSON]|    10933|
|     [OTHER OFFENSE]|   409517|
|  [PUBLIC INDECENCY]|      154|
|      [PROSTITUTION]|    67920|
|           [ROBBERY]|   250499|
|[CRIM SEXUAL ASSA...|    26100|
|[INTERFERENCE WIT...|    14433|
+--------------------+---------+
only showing top 20 rows



ImportError: Pandas >= 0.19.2 must be installed; however, your version was 0.17.1.

In [56]:
schemaPeople.createOrReplaceTempView('sqlResults')

In [61]:
import pandas as pd
pd.__version__

'0.17.1'

In [57]:
# PLOT PASSENGER NUMBER VS. TRIP COUNTS
import matplotlib.pyplot as plt
%matplotlib inline

x_labels = sqlResults['PrimaryType'].values
fig = sqlResults[['Frequency']].plot(kind='bar', facecolor='lightblue')
fig.set_xticklabels(x_labels)
fig.set_title('Counts of trips by passenger count')
fig.set_xlabel('Passenger count in trips')
fig.set_ylabel('Trip counts')
plt.show()

NameError: name 'sqlResults' is not defined