# 

In [None]:
from pyspark.sql import SparkSession

# Создаём SparkSession
spark = SparkSession.builder \
    .appName('CrimeData') \
    .config("spark.hadoop.home.dir", "C:\\hadoop") \
    .config("spark.hadoop.io.nativeio.useLegacyAccess", "true") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Проверка версии Spark
print(f'Spark version: {spark.version}')

Spark version: 3.5.3


In [3]:
# from pathlib import Path

# file_path = Path(_file_).parent / 'data/crime_data.csv'
file_path = '.\data\crime.csv'

# Чтение файла
crime_data = spark.read.csv(file_path, header=True, inferSchema=True)

# Вывод первых строк данных
crime_data.show(5)

# прочитаем схему данных
crime_data.printSchema()

+---------------+------------+--------------------+--------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+-----------+-----------+------------+--------------------+
|INCIDENT_NUMBER|OFFENSE_CODE|  OFFENSE_CODE_GROUP| OFFENSE_DESCRIPTION|DISTRICT|REPORTING_AREA|SHOOTING|   OCCURRED_ON_DATE|YEAR|MONTH|DAY_OF_WEEK|HOUR|  UCR_PART|     STREET|        Lat|        Long|            Location|
+---------------+------------+--------------------+--------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+-----------+-----------+------------+--------------------+
|     I182070945|         619|             Larceny|  LARCENY ALL OTHERS|     D14|           808|    NULL|2018-09-02 13:00:00|2018|    9|     Sunday|  13|  Part One| LINCOLN ST|42.35779134|-71.13937053|(42.35779134, -71...|
|     I182070943|        1402|           Vandalism|           VANDALISM|     C11|           347|    NULL|201

In [4]:
# Удаление дубликатов
print(f"Количество строк ДО удаления дубликатов: {crime_data.count()}")
crime_data = crime_data.dropDuplicates()
print(f"Количество строк ПОСЛЕ удаления дубликатов: {crime_data.count()}")

Количество строк ДО удаления дубликатов: 319073
Количество строк ПОСЛЕ удаления дубликатов: 319050


In [5]:
# Проверить наличие пропущенных значений
from pyspark.sql.functions import col

missing_data = crime_data.select([col(c).isNull().alias(c) for c in crime_data.columns])
missing_data_summary = missing_data.describe().show()

# Удалить строки с пропущенными критически важными данными
crime_data = crime_data.dropna(subset=['DISTRICT', 'OCCURRED_ON_DATE', 'Lat', 'Long'])

+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    max|
+-------+



In [6]:
offense_codes = spark.read.csv('.\data\offense_codes.csv', header=True, inferSchema=True)

# Убираем дубликаты
offense_codes = offense_codes.dropDuplicates(['CODE'])

In [7]:
from pyspark.sql.functions import split

# Добавим колонку crime_type, содержащую первую часть строки из NAME (до первого разделителя ' - '):
offense_codes = offense_codes.withColumn('crime_type', split(offense_codes['NAME'], ' - ').getItem(0))
offense_codes.show(5)

+----+--------------------+--------------------+
|CODE|                NAME|          crime_type|
+----+--------------------+--------------------+
| 111|MURDER, NON-NEGLI...|MURDER, NON-NEGLI...|
| 112|KILLING OF FELON ...|KILLING OF FELON ...|
| 113|KILLING OF FELON ...|KILLING OF FELON ...|
| 114|KILLING OF POLICE...|KILLING OF POLICE...|
| 121|MANSLAUGHTER - VE...|        MANSLAUGHTER|
+----+--------------------+--------------------+
only showing top 5 rows



In [8]:
# Присоединяем offense_codes к crime_data
crime_data = crime_data.join(offense_codes, crime_data['OFFENSE_CODE'] == offense_codes['CODE'], how='left')

# Сохраняем только нужные колонки
crime_data = crime_data.drop('CODE', 'NAME')
crime_data.show(5)

+---------------+------------+------------------+--------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+--------------------+-----------+------------+--------------------+--------------+
|INCIDENT_NUMBER|OFFENSE_CODE|OFFENSE_CODE_GROUP| OFFENSE_DESCRIPTION|DISTRICT|REPORTING_AREA|SHOOTING|   OCCURRED_ON_DATE|YEAR|MONTH|DAY_OF_WEEK|HOUR|  UCR_PART|              STREET|        Lat|        Long|            Location|    crime_type|
+---------------+------------+------------------+--------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+--------------------+-----------+------------+--------------------+--------------+
|     I182055198|         413|Aggravated Assault|ASSAULT - AGGRAVA...|     E13|           304|       Y|2018-07-13 14:01:00|2018|    7|     Friday|  14|  Part One|       WASHINGTON ST|42.31667114|-71.09743705|(42.31667114, -71...|       ASSAULT|
|     I182031562|   

In [9]:
# Просмотр первых строк обновлённого DataFrame
crime_data.select('DISTRICT', 'OFFENSE_CODE', 'crime_type', 'Lat', 'Long').show(5)

+--------+------------+--------------+-----------+------------+
|DISTRICT|OFFENSE_CODE|    crime_type|        Lat|        Long|
+--------+------------+--------------+-----------+------------+
|     E13|         413|       ASSAULT|42.31667114|-71.09743705|
|      D4|        3130|SEARCH WARRANT|42.33695098|-71.08574813|
|      B2|         413|       ASSAULT|42.30711484|-71.08425151|
|      B3|         413|       ASSAULT|42.28956988|-71.08510501|
|      B2|        1510|        WEAPON|42.30660388|-71.08055038|
+--------+------------+--------------+-----------+------------+
only showing top 5 rows



### Общее количество преступлений (crimes_total)

In [10]:
from pyspark.sql.functions import count

crimes_total = crime_data.groupBy('DISTRICT').agg(count('*').alias('crimes_total'))

### Медиана числа преступлений в месяц (crimes_monthly):

In [11]:
from pyspark.sql.functions import year, month, percentile_approx

# Создаём колонку year_month для уникального идентификатора месяца
crime_data = crime_data.withColumn("year_month", year("OCCURRED_ON_DATE") * 100 + month("OCCURRED_ON_DATE"))

# Считаем количество преступлений в месяц для каждого района
monthly_crimes = crime_data.groupBy("DISTRICT", "year_month").agg(count("*").alias("monthly_crimes"))

# Считаем медиану числа преступлений в месяц для каждого района
crimes_monthly = monthly_crimes.groupBy("DISTRICT").agg(
    percentile_approx("monthly_crimes", 0.5).alias("crimes_monthly")
)

### Три самых частых типа преступлений (frequent_crime_types):

In [12]:
from pyspark.sql.functions import col, collect_list, concat_ws

# Считаем частоту каждого crime_type в районе
crime_types_count = crime_data.groupBy('DISTRICT', 'crime_type').count()

# Получаем топ-3 типов преступлений в районе
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window = Window.partitionBy('DISTRICT').orderBy(col('count').desc())

top_crime_types = crime_types_count.withColumn('rank', row_number().over(window)) \
    .filter(col('rank') <= 3) \
    .groupBy('DISTRICT').agg(
        concat_ws(', ', collect_list('crime_type')).alias('frequent_crime_types')
    )

### Средние координаты района (lat и lng)

In [13]:
from pyspark.sql.functions import mean

avg_coordinates = crime_data.groupBy("DISTRICT").agg(
    mean("Lat").alias("lat"),
    mean("Long").alias("lng")
)

## Построение витрины

### Подсчёт метрик

In [14]:
from pyspark.sql.functions import count, col, expr, mean, approx_count_distinct, collect_list, lit, percentile_approx

# crimes_total: общее количество преступлений в районе
crimes_total = crime_data.groupBy('DISTRICT').agg(count('*').alias('crimes_total'))

# crimes_monthly: медиана числа преступлений в месяц
crime_data = crime_data.withColumn('year_month', expr('YEAR * 100 + MONTH'))
crimes_monthly = crime_data.groupBy('DISTRICT', 'year_month').agg(count('*').alias('monthly_crimes'))
crimes_median = crimes_monthly.groupBy('DISTRICT').agg(
    percentile_approx('monthly_crimes', 0.5).alias('crimes_monthly')
)

# frequent_crime_types: три самых частых типа преступлений
frequent_crime_types = crime_data.groupBy("DISTRICT", "crime_type").count() \
    .groupBy("DISTRICT").agg(
        concat_ws(", ", collect_list("crime_type")).alias("frequent_crime_types")
    )

# lat, lng: средние координаты
avg_coordinates = crime_data.groupBy('DISTRICT').agg(
    mean('Lat').alias('lat'),
    mean('Long').alias('lng')
)

### Объединение всех метрик

In [15]:
from functools import reduce

metrics = [crimes_total, crimes_monthly, top_crime_types, avg_coordinates]

# Объединяем все метрики по колонке 'DISTRICT'
final_aggregated_data = reduce(
    lambda df1, df2: df1.join(df2, on='DISTRICT', how='left'),
    metrics
)

final_aggregated_data.show()

+--------+------------+----------+--------------+--------------------+------------------+------------------+
|DISTRICT|crimes_total|year_month|monthly_crimes|frequent_crime_types|               lat|               lng|
+--------+------------+----------+--------------+--------------------+------------------+------------------+
|      C6|       21752|    201702|           435|DRUGS, SICK/INJUR...|42.212105037157954|-70.85558197403466|
|      C6|       21752|    201803|           528|DRUGS, SICK/INJUR...|42.212105037157954|-70.85558197403466|
|      C6|       21752|    201711|           446|DRUGS, SICK/INJUR...|42.212105037157954|-70.85558197403466|
|      C6|       21752|    201603|           642|DRUGS, SICK/INJUR...|42.212105037157954|-70.85558197403466|
|      C6|       21752|    201808|           703|DRUGS, SICK/INJUR...|42.212105037157954|-70.85558197403466|
|      C6|       21752|    201607|           572|DRUGS, SICK/INJUR...|42.212105037157954|-70.85558197403466|
|      C6|       21

## Сохранение в формате .parquet

In [19]:
import os

output_path = os.path.abspath('./data/aggregated_data')
final_aggregated_data.write.mode('overwrite').parquet(output_path)