In [1]:
# Подключим нужные библиотеки

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [2]:
import os

os.environ["SPARK_HOME"] = 'C:\Program Files\spark-3.5.2-bin-hadoop3'

In [3]:
#Создадим сессию

spark = SparkSession.builder.appName("Crime Analysis").getOrCreate()

df = spark.sql("select 'spark is running'")

df.show()

+----------------+
|spark is running|
+----------------+
|spark is running|
+----------------+



In [5]:
# Загружаем DataFrame из crime.csv и offense_codes.csv

df = spark.read.csv("data/crime.csv", header=True, sep=",", inferSchema=True)

offense_codes = spark.read.csv("data/offense_codes.csv", header=True, sep=",", inferSchema=True)

In [11]:
# Посмотрим схему полученных данных

df.printSchema()

offense_codes.printSchema()

root
 |-- INCIDENT_NUMBER: string (nullable = true)
 |-- OFFENSE_CODE: integer (nullable = true)
 |-- OFFENSE_CODE_GROUP: string (nullable = true)
 |-- OFFENSE_DESCRIPTION: string (nullable = true)
 |-- DISTRICT: string (nullable = true)
 |-- REPORTING_AREA: string (nullable = true)
 |-- SHOOTING: string (nullable = true)
 |-- OCCURRED_ON_DATE: timestamp (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- HOUR: integer (nullable = true)
 |-- UCR_PART: string (nullable = true)
 |-- STREET: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Location: string (nullable = true)

root
 |-- CODE: integer (nullable = true)
 |-- NAME: string (nullable = true)



In [79]:
# Выполним очистку данных

# Удалим дубли по INCIDENT_NUMBER
df_cleaned = df.dropDuplicates(["INCIDENT_NUMBER"])

# Удаление записей, где не указан район или код преступления
df_cleaned = df_cleaned.filter(df.OFFENSE_CODE.isNotNull() & df.DISTRICT.isNotNull())

# Оставим уникальные коды преступлений
unique_offense_codes = offense_codes.dropDuplicates(["CODE"])

In [27]:
# Выполним необходимые для витрины вычисления
# 1. Агрегируем данные о преступлениях по районам

crimes_total = df_cleaned.groupBy("DISTRICT").agg(F.count("*").alias("crimes_total"))

In [34]:
# 2. Медиана числа преступлений в месяц, группировка по DISTRICT, YEAR и MONTH

monthly_crimes_count = df_cleaned.groupBy("DISTRICT", "YEAR", "MONTH").agg(F.count("*").alias("monthly_crimes"))
crimes_monthly = monthly_crimes_count.groupBy("DISTRICT").agg(
    F.percentile_approx("monthly_crimes", 0.5).alias("crimes_monthly")
)

In [65]:
# 3. Три самых частых типа преступлений

#Покаждому району, проранжируем преступления
frequent_crime_types = (
    df_cleaned.groupBy("DISTRICT", "OFFENSE_CODE")
      .agg(F.count("*").alias("count"))
      .withColumn("rank", F.row_number().over(Window.partitionBy("DISTRICT").orderBy(F.desc("count"))))
      .filter(F.col("rank") <= 3)
)

#Вычислим первое наименование предступления, до -
unique_offense_codes_first = unique_offense_codes.withColumn("crime_type", F.split(F.col("NAME"), " -").getItem(0))

#Соеденим полученные выборки
frequent_crime_types = frequent_crime_types.join(unique_offense_codes_first,
                                                 frequent_crime_types.OFFENSE_CODE == unique_offense_codes_first.CODE,
                                                "inner")

# Объединяем типы преступлений в строку
frequent_crime_types_aggregated = frequent_crime_types.groupBy("DISTRICT").agg(
    F.concat_ws(", ", F.collect_list("crime_type")).alias("frequent_crime_types")
)

In [68]:
# 4. Средние координаты по районам

avg_coordinates = df_cleaned.groupBy("DISTRICT").agg(
    F.avg("Lat").alias("lat"),
    F.avg("Long").alias("lng")
)

In [69]:
# Объединяем данные по полю DISTRICT

final_df = (crimes_total
             .join(crimes_monthly, "DISTRICT")
             .join(frequent_crime_types_aggregated, "DISTRICT")
             .join(avg_coordinates, "DISTRICT"))

# Показать результат
final_df.show()

+--------+------------+--------------+--------------------+------------------+------------------+
|DISTRICT|crimes_total|crimes_monthly|frequent_crime_types|               lat|               lng|
+--------+------------+--------------+--------------------+------------------+------------------+
|      A1|       31020|           771|PROPERTY, ASSAULT...| 42.33064855453012|-71.01857752144772|
|     A15|        5978|           149|INVESTIGATE PERSO...|42.185067945255426|-70.75409373619097|
|      A7|       12306|           315|SICK/INJURED/MEDI...| 42.36280267789223|-71.00707817700778|
|      B2|       43403|          1130|VERBAL DISPUTE, M...| 42.31632986119105|-71.07629096717075|
|      B3|       31131|           800|VERBAL DISPUTE, I...|42.282453248040255|-71.07814828507321|
|     C11|       37298|           979|M/V, SICK/INJURED...|42.294000306038015|-71.05347358106056|
|      C6|       21196|           543|SICK/INJURED/MEDI...| 42.21490686409826|-70.85942538777124|
|     D14|       185

In [None]:
# Преобразуем в Pandas DataFrame и сохраняем в формате Parquet
output_path = "path/to/output_folder/crime_dashboard.parquet"
final_df.toPandas().to_parquet(output_path, index=False)

spark.stop()