In [2]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()

In [3]:
# If you would like to manually change data types, refer to this article: https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803
df = spark.read.csv('BostonCrime2.0.csv',header=True,inferSchema=True)

Data Exploration

In [10]:
df.show()

+---------------+------------+--------------------+--------+--------------+----+-----+-----------+----+----------+--------------+--------+
|INCIDENT_NUMBER|OFFENSE_CODE|  OFFENSE_CODE_GROUP|DISTRICT|REPORTING_AREA|YEAR|MONTH|DAY_OF_WEEK|HOUR|  UCR_PART|        STREET|SHOOTING|
+---------------+------------+--------------------+--------+--------------+----+-----+-----------+----+----------+--------------+--------+
|              1|        1102|               Fraud|      D4|           619|2015|   12|     Sunday|  14|  Part Two|  WESTLAND AVE|       N|
|              2|         619|             Larceny|      D4|           619|2015|   12|     Sunday|  14|  Part One|  WESTLAND AVE|       N|
|              3|        1107|               Fraud|     E18|           486|2015|    7|  Wednesday|  12|  Part Two|   OAKCREST RD|       N|
|              4|        1107|               Fraud|      C6|           226|2015|   11|   Thursday|   8|  Part Two|   E FOURTH ST|       N|
|              5|        26

In [11]:
df.printSchema()

root
 |-- INCIDENT_NUMBER: integer (nullable = true)
 |-- OFFENSE_CODE: integer (nullable = true)
 |-- OFFENSE_CODE_GROUP: string (nullable = true)
 |-- DISTRICT: string (nullable = true)
 |-- REPORTING_AREA: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- HOUR: integer (nullable = true)
 |-- UCR_PART: string (nullable = true)
 |-- STREET: string (nullable = true)
 |-- SHOOTING: string (nullable = true)



In [12]:
print(df.head(1))

[Row(INCIDENT_NUMBER=1, OFFENSE_CODE=1102, OFFENSE_CODE_GROUP='Fraud', DISTRICT='D4', REPORTING_AREA='619', YEAR=2015, MONTH=12, DAY_OF_WEEK='Sunday', HOUR=14, UCR_PART='Part Two', STREET='WESTLAND AVE', SHOOTING='N')]


In [14]:
df.describe().show()

+-------+------------------+------------------+------------------+--------+-----------------+------------------+-----------------+-----------+-----------------+--------+---------+--------+
|summary|   INCIDENT_NUMBER|      OFFENSE_CODE|OFFENSE_CODE_GROUP|DISTRICT|   REPORTING_AREA|              YEAR|            MONTH|DAY_OF_WEEK|             HOUR|UCR_PART|   STREET|SHOOTING|
+-------+------------------+------------------+------------------+--------+-----------------+------------------+-----------------+-----------+-----------------+--------+---------+--------+
|  count|               800|               800|               800|     772|              800|               800|              800|        800|              800|     800|      783|     800|
|   mean|             400.5|        1949.24875|              null|    null| 404.742782152231|        2016.50375|           8.2025|       null|            15.49|    null|     null|    null|
| stddev|231.08440016582685|1095.3304308812276|        

In [4]:
df.select('DISTRICT','MONTH','SHOOTING').describe().show()

+-------+--------+-----------------+--------+
|summary|DISTRICT|            MONTH|SHOOTING|
+-------+--------+-----------------+--------+
|  count|     772|              800|     800|
|   mean|    null|           8.2025|    null|
| stddev|    null|2.677770867909777|    null|
|    min|      A1|                1|       N|
|    max|      E5|               12|       Y|
+-------+--------+-----------------+--------+



In [5]:
df.select('MONTH','HOUR','YEAR').describe().show()

+-------+-----------------+-----------------+------------------+
|summary|            MONTH|             HOUR|              YEAR|
+-------+-----------------+-----------------+------------------+
|  count|              800|              800|               800|
|   mean|           8.2025|            15.49|        2016.50375|
| stddev|2.677770867909777|6.011661591762074|1.1209623692572204|
|    min|                1|                1|              2015|
|    max|               12|               24|              2018|
+-------+-----------------+-----------------+------------------+



Data Manipulation

In [7]:
mon_col = df.select('month')
mon_col.show()

+-----+
|month|
+-----+
|   12|
|   12|
|    7|
|   11|
|   10|
|    9|
|    8|
|   10|
|    8|
|    8|
|    7|
|   11|
|   12|
|    7|
|    9|
|    9|
|   10|
|    7|
|    8|
|   10|
+-----+
only showing top 20 rows



In [8]:
df.withColumn('month_times_10',df['month']*10).show()
df.show()

+---------------+------------+--------------------+--------+--------------+----+-----+-----------+----+----------+--------------+--------+--------------+
|INCIDENT_NUMBER|OFFENSE_CODE|  OFFENSE_CODE_GROUP|DISTRICT|REPORTING_AREA|YEAR|MONTH|DAY_OF_WEEK|HOUR|  UCR_PART|        STREET|SHOOTING|month_times_10|
+---------------+------------+--------------------+--------+--------------+----+-----+-----------+----+----------+--------------+--------+--------------+
|              1|        1102|               Fraud|      D4|           619|2015|   12|     Sunday|  14|  Part Two|  WESTLAND AVE|       N|           120|
|              2|         619|             Larceny|      D4|           619|2015|   12|     Sunday|  14|  Part One|  WESTLAND AVE|       N|           120|
|              3|        1107|               Fraud|     E18|           486|2015|    7|  Wednesday|  12|  Part Two|   OAKCREST RD|       N|            70|
|              4|        1107|               Fraud|      C6|           226|2

In [9]:
df.filter("MONTH > 6").show()
df.filter("MONTH > 6").select('YEAR','MONTH').show()

+---------------+------------+--------------------+--------+--------------+----+-----+-----------+----+----------+--------------+--------+
|INCIDENT_NUMBER|OFFENSE_CODE|  OFFENSE_CODE_GROUP|DISTRICT|REPORTING_AREA|YEAR|MONTH|DAY_OF_WEEK|HOUR|  UCR_PART|        STREET|SHOOTING|
+---------------+------------+--------------------+--------+--------------+----+-----+-----------+----+----------+--------------+--------+
|              1|        1102|               Fraud|      D4|           619|2015|   12|     Sunday|  14|  Part Two|  WESTLAND AVE|       N|
|              2|         619|             Larceny|      D4|           619|2015|   12|     Sunday|  14|  Part One|  WESTLAND AVE|       N|
|              3|        1107|               Fraud|     E18|           486|2015|    7|  Wednesday|  12|  Part Two|   OAKCREST RD|       N|
|              4|        1107|               Fraud|      C6|           226|2015|   11|   Thursday|   8|  Part Two|   E FOURTH ST|       N|
|              5|        26

In [10]:
df.filter("MONTH > 6 AND HOUR < 12").select('INCIDENT_NUMBER','MONTH','HOUR').show()

+---------------+-----+----+
|INCIDENT_NUMBER|MONTH|HOUR|
+---------------+-----+----+
|              4|   11|   8|
|              6|    9|  11|
|              7|    8|   8|
|              9|    8|   6|
|             12|   11|  11|
|             13|   12|   4|
|             14|    7|   8|
|             16|    9|   9|
|             17|   10|   8|
|             19|    8|   9|
|             21|   12|   9|
|             25|    9|   9|
|             26|   10|   9|
|             27|    8|   9|
|             28|    7|   2|
|             29|   10|   9|
|             31|   11|   9|
|             43|    9|   7|
|             45|    8|   9|
|             55|    8|   9|
+---------------+-----+----+
only showing top 20 rows



Data Aggregation

In [11]:
df.groupBy('MONTH').mean().show()

+-----+--------------------+------------------+------------------+----------+------------------+
|MONTH|avg(INCIDENT_NUMBER)| avg(OFFENSE_CODE)|         avg(YEAR)|avg(MONTH)|         avg(HOUR)|
+-----+--------------------+------------------+------------------+----------+------------------+
|   12|   298.0091743119266| 1735.697247706422|2015.9816513761468|      12.0|15.458715596330276|
|    1|  339.05882352941177|1959.2941176470588|2016.4117647058824|       1.0|17.529411764705884|
|    6|  387.76785714285717| 2149.214285714286|            2016.5|       6.0|              14.5|
|    3|               403.6|           1981.25|            2016.5|       3.0|             15.35|
|    5|   404.9130434782609|2087.1739130434785|2016.5652173913043|       5.0|13.695652173913043|
|    9|             275.175|         1879.8125|         2015.8375|       9.0|             16.45|
|    4|   361.1764705882353|1483.6470588235295|2016.3529411764705|       4.0|15.647058823529411|
|    8|  267.67857142857144| 2

In [13]:
group_month_df = df.groupBy('MONTH').mean()
print("Sorted by HOUR")
group_month_df.orderBy('avg(HOUR)').show()
print("Sorted by YEAR")
df.groupBy('MONTH').mean().orderBy('avg(YEAR)').show()

Sorted by HOUR
+-----+--------------------+------------------+------------------+----------+------------------+
|MONTH|avg(INCIDENT_NUMBER)| avg(OFFENSE_CODE)|         avg(YEAR)|avg(MONTH)|         avg(HOUR)|
+-----+--------------------+------------------+------------------+----------+------------------+
|    2|  358.93333333333334|            1880.6|2016.2666666666667|       2.0|13.466666666666667|
|    5|   404.9130434782609|2087.1739130434785|2016.5652173913043|       5.0|13.695652173913043|
|    6|  387.76785714285717| 2149.214285714286|            2016.5|       6.0|              14.5|
|   10|   307.7752808988764|1578.9550561797753|2016.0224719101125|      10.0|14.561797752808989|
|    3|               403.6|           1981.25|            2016.5|       3.0|             15.35|
|    7|   616.5818965517242| 2165.668103448276|2017.5905172413793|       7.0|15.387931034482758|
|   12|   298.0091743119266| 1735.697247706422|2015.9816513761468|      12.0|15.458715596330276|
|    4|   361.1

Data Clean

In [22]:
from pyspark.sql.functions import format_number, col
group_month_df = df.groupBy('MONTH').mean()
group_month_df.show()
group_month_df = group_month_df.select('MONTH',
                                   format_number('avg(HOUR)',2),
                                   format_number('avg(YEAR)',2))
group_month_df.show()

group_month_df = group_month_df.select(col('MONTH').alias('INCIDENT_NUMBER'),
                                   col('format_number(avg(HOUR), 2)').alias('Average HOUR'),
                                   col('format_number(avg(YEAR), 2)').alias('Average YEAR'))
group_month_df.show()
                            
group_month_df = group_month_df.orderBy('Average HOUR')
                                       
print('Average HOUR and YEAR by INCIDENT_NUMBER')
group_month_df.show()

+-----+--------------------+------------------+------------------+----------+------------------+
|MONTH|avg(INCIDENT_NUMBER)| avg(OFFENSE_CODE)|         avg(YEAR)|avg(MONTH)|         avg(HOUR)|
+-----+--------------------+------------------+------------------+----------+------------------+
|   12|   298.0091743119266| 1735.697247706422|2015.9816513761468|      12.0|15.458715596330276|
|    1|  339.05882352941177|1959.2941176470588|2016.4117647058824|       1.0|17.529411764705884|
|    6|  387.76785714285717| 2149.214285714286|            2016.5|       6.0|              14.5|
|    3|               403.6|           1981.25|            2016.5|       3.0|             15.35|
|    5|   404.9130434782609|2087.1739130434785|2016.5652173913043|       5.0|13.695652173913043|
|    9|             275.175|         1879.8125|         2015.8375|       9.0|             16.45|
|    4|   361.1764705882353|1483.6470588235295|2016.3529411764705|       4.0|15.647058823529411|
|    8|  267.67857142857144| 2