In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Schema applied when reading the legacy crime export.
# Every column is nullable. All columns are read as strings except "Month",
# which is parsed as an integer.
crimes_old_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("COMPNOS", StringType()),
        ("NatureCode", StringType()),
        ("INCIDENT_TYPE_DESCRIPTION", StringType()),
        ("MAIN_CRIMECODE", StringType()),
        ("REPTDISTRICT", StringType()),
        ("REPORTINGAREA", StringType()),
        ("FROMDATE", StringType()),
        ("WEAPONTYPE", StringType()),
        ("Shooting", StringType()),
        ("DOMESTIC", StringType()),
        ("SHIFT", StringType()),
        ("Year", StringType()),
        ("Month", IntegerType()),
        ("DAY_WEEK", StringType()),
        ("UCRPART", StringType()),
        ("X", StringType()),
        ("Y", StringType()),
        ("STREETNAME", StringType()),
        ("XSTREETNAME", StringType()),
        ("Location", StringType()),
    ]
])

# Schema applied when reading the current crime export.
# Every column is nullable. OCCURRED_ON_DATE is parsed as a timestamp;
# YEAR, MONTH and HOUR as integers; everything else (including Lat/Long)
# is kept as a string.
crimes_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("INCIDENT_NUMBER", StringType()),
        ("OFFENSE_CODE", StringType()),
        ("OFFENSE_CODE_GROUP", StringType()),
        ("OFFENSE_DESCRIPTION", StringType()),
        ("DISTRICT", StringType()),
        ("REPORTING_AREA", StringType()),
        ("SHOOTING", StringType()),
        ("OCCURRED_ON_DATE", TimestampType()),
        ("YEAR", IntegerType()),
        ("MONTH", IntegerType()),
        ("DAY_OF_WEEK", StringType()),
        ("HOUR", IntegerType()),
        ("UCR_PART", StringType()),
        ("STREET", StringType()),
        ("Lat", StringType()),
        ("Long", StringType()),
        ("Location", StringType()),
    ]
])

In [2]:
# Build (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    # "spark.executor.memory" is the documented property name. The previous
    # key, "spark.execute.memory", is not a Spark setting and was silently
    # ignored.
    # NOTE(review): with a local[*] master the work runs inside the driver
    # JVM, so "spark.driver.memory" is probably the setting that matters here
    # -- confirm.
    .config("spark.executor.memory", "1gb")
    .appName("Crimes in boston")
    .getOrCreate()
)

# Load both CSV exports with explicit schemas (no schema inference); the first
# line of each file is treated as a header.
# NOTE(review): "BostenCrime.csv" looks like a misspelling of "Boston"; left
# unchanged because it has to match the file name on disk -- confirm.
crimes = spark.read.csv("./data/BostenCrime.csv", header=True, schema=crimes_schema)
crimes_old = spark.read.csv("./data/BostonCrimesOld.csv", header=True, schema=crimes_old_schema)

# Handles kept for later cells: the underlying SparkContext and a SQLContext
# built on top of it.
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [3]:
# Display the SparkContext as this cell's output.
sc

In [4]:
# Expose both DataFrames to Spark SQL. createOrReplaceTempView replaces the
# deprecated registerTempTable and keeps this cell safe to re-run.
crimes.createOrReplaceTempView("table")
crimes_old.createOrReplaceTempView("test")

# Combine the current and legacy exports into one frame with a common set of
# columns; the result takes its column names from the first SELECT.
#
# The view name `table` is back-quoted because it collides with a SQL keyword.
#
# NOTE(review): UNION (unlike UNION ALL) removes duplicate rows. Two separate
# incidents that share the same year, month, street, district, description and
# location collapse into one row, which lowers any counts computed from this
# frame. Confirm whether de-duplication is intended or UNION ALL should be
# used.
# NOTE(review): the legacy schema declares "Year" as a string while the current
# schema declares "YEAR" as an integer, so the type of the combined YEAR column
# depends on Spark's type coercion -- confirm it is what downstream cells
# expect.
all_boston_crimes = spark.sql("""
               SELECT
                   YEAR,
                   MONTH,
                   STREET,
                   DISTRICT,
                   OFFENSE_DESCRIPTION,
                   Location
                FROM `table`
                UNION
                SELECT
                    YEAR,
                    MONTH,
                    STREETNAME,
                    REPTDISTRICT,
                    INCIDENT_TYPE_DESCRIPTION,
                    Location
                FROM test""")

In [5]:
# Preview the combined frame: first 20 rows, long cell values truncated
# (these are the defaults of DataFrame.show, spelled out explicitly).
all_boston_crimes.show(n=20, truncate=True)

+----+-----+----------------+--------+--------------------+--------------------+
|YEAR|MONTH|          STREET|DISTRICT| OFFENSE_DESCRIPTION|            Location|
+----+-----+----------------+--------+--------------------+--------------------+
|2019|    9|      GOODALE RD|      B3|M/V ACCIDENT - PE...|(42.28362830, -71...|
|2019|    9|   HYDE PARK AVE|     E18|THREATS TO DO BOD...|(42.27926700, -71...|
|2019|    9|        RIVER ST|      B3|  INVESTIGATE PERSON|(42.27130225, -71...|
|2019|    9|  COURTHOUSE WAY|      C6|  INVESTIGATE PERSON|(42.35404844, -71...|
|2019|    9|       SCHOOL ST|      A1|SICK/INJURED/MEDI...|(0.00000000, 0.00...|
|2019|    9|        JETTE CT|     D14|FIRE REPORT - HOU...|(42.34682750, -71...|
|2019|    9|          OLD RD|      B3|      VERBAL DISPUTE|(42.30226502, -71...|
|2019|    9|       SELDEN ST|      B3|LARCENY THEFT OF ...|(0.00000000, 0.00...|
|2019|    9|   WASHINGTON ST|     C11|SICK/INJURED/MEDI...|(42.28962239, -71...|
|2019|    9|COMMONWEALTH AVE

In [12]:
# Number of rows in the combined frame for each YEAR value.
yearCount = (
    all_boston_crimes
    .groupBy("YEAR")
    .count()
)

In [14]:
# Show the per-year counts in ascending YEAR order.
yearCount.sort("YEAR").show()

+----+-----+
|YEAR|count|
+----+-----+
|2012|38388|
|2013|77318|
|2014|77325|
|2015|92201|
|2016|88294|
|2017|90685|
|2018|89317|
|2019|65694|
+----+-----+

