In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark import StorageLevel
import pyspark.sql.functions as F
# Column layout of the crime CSV as (name, Spark type, nullable), in file order.
# CrimeDateTime is read as a plain string here.
crime_columns = [
    ('X', DoubleType(), True),
    ('Y', DoubleType(), True),
    ('RowID', IntegerType(), False),
    ('CrimeDateTime', StringType(), True),
    ('CrimeCode', StringType(), True),
    ('Location', StringType(), True),
    ('Description', StringType(), True),
    ('Inside_Outside', StringType(), True),
    ('Weapon', StringType(), True),
    ('Post', StringType(), True),
    ('District', StringType(), True),
    ('Neighborhood', StringType(), True),
    ('Latitude', DoubleType(), True),
    ('Longitude', DoubleType(), True),
    ('GeoLocation', StringType(), True),
    ('Premise', StringType(), True),
    ('VRIName', StringType(), True),
    ('Total_Incidents', IntegerType(), True),
    ('Shape', StringType(), True),
]
schema = StructType(
    [StructField(name, dtype, nullable) for name, dtype, nullable in crime_columns]
)

# Load the CSV with the explicit schema, treating the first line as a header.
df = spark.read.csv("Part1_crime_data.csv", schema=schema, header=True)
# Keep the frame cached (spilling to disk if needed) since every later cell reuses it.
df.persist(StorageLevel.MEMORY_AND_DISK)

# Sanity checks: row count, declared schema, and the first five rows.
print(df.count())
df.printSchema()
df.show(5)

In [0]:
#1 Distinct crime codes present in the data.
crime_codes = df.select("CrimeCode").distinct()
display(crime_codes)

CrimeCode
3P
3K
3BJ
1A
3M
5F
4B
3B
3NF
7A


In [0]:
#2 Number of records per crime code, largest count first.
crime_code_counts = (
    df.groupBy("CrimeCode")
      # count() only tallies non-null CrimeCode values.
      .agg(F.count("CrimeCode").alias("CrimeCount"))
      .orderBy(F.col("CrimeCount").desc())
)
display(crime_code_counts)

CrimeCode,CrimeCount
4E,91822
6D,68508
5A,43956
7A,40308
6J,27670
6G,26898
6E,24310
6C,23269
4C,22455
5D,14971


In [0]:
#3 Neighborhood with the largest number of (non-null) Neighborhood entries.
neighborhood_counts = df.groupBy("Neighborhood").agg(
    F.count("Neighborhood").alias("CrimeCount")
)
# Sort descending and take the first row, i.e. the highest count.
top_neighborhood = neighborhood_counts.orderBy(F.col("CrimeCount").desc()).first()
print(top_neighborhood["Neighborhood"])

In [0]:
#4 Number of records per day of the year.
# unix_timestamp() returns epoch seconds (a long), which dayofyear() cannot
# interpret as a date; to_timestamp() produces a real timestamp column instead.
# NOTE(review): the trailing "+00" in the pattern is intended to match the
# literal UTC-offset suffix in the data — confirm against the raw file.
crimes_per_day = (
    df.select(
        F.to_timestamp("CrimeDateTime", "yyyy/MM/dd HH:mm:ss+00").alias("timestamps")
    )
    .groupBy(F.dayofyear("timestamps").alias("DayOfYear"))
    # count() skips nulls, so rows whose timestamp is null fall into the
    # null DayOfYear group with a count of 0.
    .agg(F.count("timestamps").alias("CrimeCount"))
)
display(crimes_per_day)

In [0]:
#5 Distinct weapon values, leaving out the entries listed in not_weapon.
# not_weapon is also read by the #6 cell below.
not_weapon = ["NA","null"]
is_not_weapon = F.col("Weapon").isin(not_weapon)
weapons = df.where(~is_not_weapon).select("Weapon").distinct()
display(weapons)

Weapon
HANDS
KNIFE
OTHER
FIRE
FIREARM


In [0]:
#6 Weapon that appears most often, ignoring the entries in not_weapon
# (the list defined in the #5 cell).
weapon_counts = (
    df.where(~F.col("Weapon").isin(not_weapon))
      .groupBy("Weapon")
      .agg(F.count("Weapon").alias("WeaponCount"))
)
# Sort descending and take the first row, i.e. the highest count.
most_common = weapon_counts.orderBy(F.col("WeaponCount").desc()).first()
print(most_common["Weapon"])