In [1]:
%pip install findspark pandas

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import argparse

In [4]:
# Получение путей до папки с исходными данными (input_folder) и папки с результатом (output_folder), введённых при запуске Spark-приложения через spark-submit
#parser = argparse.ArgumentParser()
#parser.add_argument("input_folder", help = "Path to the folder with input files offense_codes.csv, crime.csv")
#parser.add_argument("output_folder", help = "Path to the folder with output file crimes_in_boston_analysis.parquet")
#args = parser.parse_args()

In [5]:
# Создание SparkSession
spark = (SparkSession
    .builder
    .appName("Crimes in Boston - Analysis")
    .master("local[*]")
    .getOrCreate()
)

In [6]:
# Чтение данных из справочника кодов преступлений
#offense_codes_df = spark.read.csv(args.input_folder + "/offense_codes.csv", header = True, inferSchema = True)
offense_codes_df = spark.read.csv("path/to/input/offense_codes.csv", header = True, inferSchema = True)
offense_codes_df.show(truncate = False)

+----+------------------------------------------+
|CODE|NAME                                      |
+----+------------------------------------------+
|612 |LARCENY PURSE SNATCH - NO FORCE           |
|613 |LARCENY SHOPLIFTING                       |
|615 |LARCENY THEFT OF MV PARTS & ACCESSORIES   |
|1731|INCEST                                    |
|3111|LICENSE PREMISE VIOLATION                 |
|2646|LIQUOR - DRINKING IN PUBLIC               |
|2204|LIQUOR LAW VIOLATION                      |
|3810|M/V ACCIDENT - INVOLVING �BICYCLE - INJURY|
|3801|M/V ACCIDENT - OTHER                      |
|3807|M/V ACCIDENT - OTHER CITY VEHICLE         |
|3803|M/V ACCIDENT - PERSONAL INJURY            |
|3805|M/V ACCIDENT - POLICE VEHICLE             |
|3802|M/V ACCIDENT - PROPERTY �DAMAGE           |
|3205|M/V PLATES - LOST                         |
|123 |MANSLAUGHTER - NON-VEHICLE - NEGLIGENCE   |
|121 |MANSLAUGHTER - VEHICLE - NEGLIGENCE       |
|3501|MISSING PERSON                            |


In [7]:
# Информация о схеме
offense_codes_df.printSchema()

root
 |-- CODE: integer (nullable = true)
 |-- NAME: string (nullable = true)



In [8]:
# Чтение данных о преступлениях
#crime_df = spark.read.csv(args.input_folder + "/crime.csv", header = True, inferSchema = True)
crime_df = spark.read.csv("path/to/input/crime.csv", header = True, inferSchema = True)
crime_df.show(truncate = False)

+---------------+------------+--------------------------------+------------------------------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+-----------------+-----------+------------+---------------------------+
|INCIDENT_NUMBER|OFFENSE_CODE|OFFENSE_CODE_GROUP              |OFFENSE_DESCRIPTION                       |DISTRICT|REPORTING_AREA|SHOOTING|OCCURRED_ON_DATE   |YEAR|MONTH|DAY_OF_WEEK|HOUR|UCR_PART  |STREET           |Lat        |Long        |Location                   |
+---------------+------------+--------------------------------+------------------------------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+-----------------+-----------+------------+---------------------------+
|I182070945     |619         |Larceny                         |LARCENY ALL OTHERS                        |D14     |808           |null    |2018-09-02 13:00:00|2018|9    |Sunday     |13  |Par

In [9]:
# Информация о схеме
crime_df.printSchema()

root
 |-- INCIDENT_NUMBER: string (nullable = true)
 |-- OFFENSE_CODE: integer (nullable = true)
 |-- OFFENSE_CODE_GROUP: string (nullable = true)
 |-- OFFENSE_DESCRIPTION: string (nullable = true)
 |-- DISTRICT: string (nullable = true)
 |-- REPORTING_AREA: string (nullable = true)
 |-- SHOOTING: string (nullable = true)
 |-- OCCURRED_ON_DATE: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- HOUR: integer (nullable = true)
 |-- UCR_PART: string (nullable = true)
 |-- STREET: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Location: string (nullable = true)



In [10]:
# Регистрация DataFrame как временных представлений
offense_codes_df.createOrReplaceTempView("offense_codes")
crime_df.createOrReplaceTempView("crime")

In [11]:
# Проверка справочника кодов преступлений на наличие дубликатов
result = spark.sql("""
SELECT
    COUNT(DISTINCT code)    AS unique_codes_number,
    COUNT(code)             AS codes_number
FROM offense_codes
""")

result.show()

+-------------------+------------+
|unique_codes_number|codes_number|
+-------------------+------------+
|                425|         576|
+-------------------+------------+



In [12]:
# Очистка справочника кодов преступлений от дубликатов + расчёт поля crime_type
unique_offense_codes_df = spark.sql("""
SELECT
    code            AS code,
    FIRST(name)     AS name,
    CASE
        WHEN INSTR(FIRST(name), ' -') = 0
            THEN FIRST(name)
        ELSE
            SUBSTR(FIRST(name), 1, INSTR(FIRST(name), ' -') - 1)
    END             AS crime_type
FROM offense_codes
GROUP BY
    code
""")

unique_offense_codes_df.show(truncate = False)

+----+------------------------------------------+-------------------------------------+
|code|name                                      |crime_type                           |
+----+------------------------------------------+-------------------------------------+
|243 |RAPE - ATTEMPT - SEXUAL ASSAULT W/ OBJECT |RAPE                                 |
|540 |BURGLARY - COMMERICAL - FORCE             |BURGLARY                             |
|623 |LARCENY SHOPLIFTING $50 TO $199           |LARCENY SHOPLIFTING $50 TO $199      |
|1721|FAILURE TO REGISTER AS A SEX OFFENDER     |FAILURE TO REGISTER AS A SEX OFFENDER|
|1903|GAMBLING - EQUIP VIOLATIONS               |GAMBLING                             |
|3704|M/V ACCIDENT - POLICE VEHICLE             |M/V ACCIDENT                         |
|251 |RAPE - COMPLETE - FORCIBLE                |RAPE                                 |
|2622|KIDNAPPING/CUSTODIAL KIDNAPPING           |KIDNAPPING/CUSTODIAL KIDNAPPING      |
|804 |STALKING                  

In [13]:
# Регистрация DataFrame как временного представления
unique_offense_codes_df.createOrReplaceTempView("unique_offense_codes")

In [14]:
# Проверка, что у всех преступлений указан код из справочника кодов преступлений
result = spark.sql("""
SELECT
    COUNT(*)
FROM crime                      C
LEFT JOIN unique_offense_codes  UOC     ON UOC.code = C.offense_code
WHERE UOC.code IS NULL
""")

result.show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [15]:
# Расчёт метрик crimes_total, lat, lng
crimes_total_lat_lng_df = spark.sql("""
SELECT
    district                            AS district,
    COUNT(DISTINCT incident_number)     AS crimes_total,
    AVG(lat)                            AS lat,
    AVG(long)                           AS lng
FROM crime
GROUP BY
    district
""")

crimes_total_lat_lng_df.show()

+--------+------------+------------------+------------------+
|district|crimes_total|               lat|               lng|
+--------+------------+------------------+------------------+
|      C6|       21196|42.212122584455514|-70.85561011772235|
|    null|        1583|25.239505193693454|-43.44877438704253|
|      B2|       43403| 42.31600367732771|-71.07569930654313|
|     C11|       37298| 42.29263740900059|-71.05125995734359|
|     E13|       15652|42.309803655709885|-71.09800478878381|
|      B3|       31131| 42.28305944520104|-71.07894914185485|
|      E5|       11876| 42.19796999447013|-71.00440862434749|
|     A15|        5978| 42.17915525091087|-70.74472508958509|
|      A7|       12306| 42.36070260499387|-71.00394833039849|
|     D14|       18573| 42.34350724510931| -71.1312546172649|
|      D4|       36755|42.341242517908626|-71.07725024947001|
|     E18|       15746| 42.26268061122596|-71.11891998757693|
|      A1|       31020|42.331230772598374|-71.01991881362001|
+-------

In [16]:
# Расчёт метрики crimes_monthly
crimes_monthly_df = spark.sql("""
WITH cte AS
(
    SELECT
        district                            AS district,
        year                                AS year,
        month                               AS month,
        COUNT(DISTINCT incident_number)     AS incidents_monthly_count_by_district
    FROM crime
    GROUP BY
        district,
        year,
        month
)
SELECT
    district                                                        AS district,
    PERCENTILE_APPROX(incidents_monthly_count_by_district, 0.5)     AS crimes_monthly
FROM cte
GROUP BY
    district
""")

crimes_monthly_df.show()

+--------+--------------+
|district|crimes_monthly|
+--------+--------------+
|      C6|           543|
|    null|            37|
|      B2|          1130|
|     C11|           979|
|     E13|           397|
|      B3|           800|
|      E5|           304|
|     A15|           149|
|      A7|           315|
|     D14|           466|
|      D4|           970|
|     E18|           399|
|      A1|           771|
+--------+--------------+



In [17]:
# Расчёт метрики frequent_crime_types
frequent_crime_types_df = spark.sql("""
WITH cte AS
(
    SELECT
        district                                                                                    AS district,
        offense_code                                                                                AS offense_code,
        COUNT(DISTINCT incident_number)                                                             AS incidents_count_by_district_and_offense_code,
        ROW_NUMBER() OVER (PARTITION BY district ORDER BY COUNT(DISTINCT incident_number) DESC)     AS row_number
    FROM crime
    GROUP BY
        district,
        offense_code
    ORDER BY
        district,
        row_number
)
SELECT
    T.district                                      AS district,
    CONCAT_WS(', ', COLLECT_LIST(UOC.crime_type))   AS frequent_crime_types
FROM cte                            T
INNER JOIN unique_offense_codes     UOC     ON UOC.code = T.offense_code
WHERE T.row_number <= 3
GROUP BY
    T.district
""")

frequent_crime_types_df.show(truncate = False)

+--------+--------------------------------------------------------------------+
|district|frequent_crime_types                                                |
+--------+--------------------------------------------------------------------+
|C6      |SICK/INJURED/MEDICAL, INVESTIGATE PERSON, TOWED MOTOR VEHICLE       |
|null    |M/V, M/V ACCIDENT, INVESTIGATE PROPERTY                             |
|B2      |VERBAL DISPUTE, M/V, INVESTIGATE PERSON                             |
|C11     |M/V, INVESTIGATE PERSON, SICK/INJURED/MEDICAL                       |
|E13     |SICK/INJURED/MEDICAL, M/V, INVESTIGATE PERSON                       |
|B3      |VERBAL DISPUTE, INVESTIGATE PERSON, VANDALISM                       |
|E5      |SICK/INJURED/MEDICAL, INVESTIGATE PERSON, M/V                       |
|A15     |INVESTIGATE PERSON, M/V, VANDALISM                                  |
|A7      |SICK/INJURED/MEDICAL, INVESTIGATE PERSON, VANDALISM                 |
|D14     |TOWED MOTOR VEHICLE, M/V, SICK

In [18]:
# Регистрация DataFrame как временных представлений
crimes_total_lat_lng_df.createOrReplaceTempView("crimes_total_lat_lng")
crimes_monthly_df.createOrReplaceTempView("crimes_monthly")
frequent_crime_types_df.createOrReplaceTempView("frequent_crime_types")

In [19]:
# Сборка витрины
data_mart_df = spark.sql("""
SELECT
    T1.district,
    T1.crimes_total,
    T2.crimes_monthly,
    T3.frequent_crime_types,
    T1.lat,
    T1.lng
FROM crimes_total_lat_lng           T1
INNER JOIN crimes_monthly           T2  ON IFNULL(T2.district, '') = IFNULL(T1.district, '')
INNER JOIN frequent_crime_types     T3  ON IFNULL(T3.district, '') = IFNULL(T1.district, '')
ORDER BY
    T1.district
""")

data_mart_df.show(truncate = False)

+--------+------------+--------------+--------------------------------------------------------------------+------------------+------------------+
|district|crimes_total|crimes_monthly|frequent_crime_types                                                |lat               |lng               |
+--------+------------+--------------+--------------------------------------------------------------------+------------------+------------------+
|null    |1583        |37            |M/V, M/V ACCIDENT, INVESTIGATE PROPERTY                             |25.239505193693454|-43.44877438704253|
|A1      |31020       |771           |ASSAULT SIMPLE, PROPERTY, SICK/INJURED/MEDICAL                      |42.331230772598374|-71.01991881362001|
|A15     |5978        |149           |INVESTIGATE PERSON, M/V, VANDALISM                                  |42.17915525091087 |-70.74472508958509|
|A7      |12306       |315           |SICK/INJURED/MEDICAL, INVESTIGATE PERSON, VANDALISM                 |42.36070260499387

In [20]:
# Сохранение витрины в файл crimes_in_boston_analysis.parquet
#data_mart_df.write.mode("overwrite").parquet(args.output_folder + "/crimes_in_boston_analysis.parquet")
data_mart_df.write.mode("overwrite").parquet("path/to/output/crimes_in_boston_analysis.parquet")

In [21]:
# Остановка SparkSession
spark.stop()