In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from google.colab import drive
# Mount Google Drive (the input CSVs are read from it below), then start a
# local Spark session with as many worker threads as logical cores.
drive.mount('/content/drive')
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("CrimeData")
    .getOrCreate()
)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Завантаження даних

In [None]:
# Raw incident exports for the two cities, stored on the mounted Drive.
detroit_file = "/content/drive/MyDrive/курс 4.1/Big data/Project/data/RMS_Crime_Incidents.csv"
la_file = "/content/drive/MyDrive/курс 4.1/Big data/Project/data/Crime_Data_from_2020_to_Present_20241130.csv"

# Both files have a header row; column types are inferred by Spark
# (inferSchema costs an extra pass over each file).
csv_reader = spark.read.options(header=True, inferSchema=True)
detroit_df = csv_reader.csv(detroit_file)
la_df = csv_reader.csv(la_file)

KeyboardInterrupt: 

Перевірка на проблеми

In [None]:
# Helper: count missing (null) values in every column of a DataFrame.
def check_missing_values(df, name):
    """Print, for every column of ``df``, how many rows hold a null.

    Args:
        df: Spark DataFrame to inspect.
        name: label shown in the printed header (e.g. the city name).
    """
    from pyspark.sql import functions as F

    print(f"Пропуски в {name}:")
    if not df.columns:
        # agg() requires at least one expression; nothing to count here.
        return
    # GroupedData.sum() only aggregates numeric columns, so summing the boolean
    # isNull() flags directly drops every column and shows an empty table.
    # count(when(isNull, 1)) counts exactly the null rows (and is 0, not null,
    # on an empty frame).
    null_counts = df.agg(*[
        F.count(F.when(df[c].isNull(), 1)).alias(c) for c in df.columns
    ])
    null_counts.show()

# Null counts per column for each raw city dataset.
for city_label, raw_df in (("Detroit", detroit_df), ("Los Angeles", la_df)):
    check_missing_values(raw_df, city_label)

# Inferred column types of each raw dataset.
for schema_title, raw_df in (("Типи даних Detroit:", detroit_df),
                             ("\nТипи даних Los Angeles:", la_df)):
    print(schema_title)
    raw_df.printSchema()


Пропуски в Detroit:
++
||
++
||
++

Пропуски в Los Angeles:
++
||
++
||
++

Типи даних Detroit:
root
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- incident_entry_id: string (nullable = true)
 |-- nearest_intersection: string (nullable = true)
 |-- offense_category: string (nullable = true)
 |-- offense_description: string (nullable = true)
 |-- state_offense_code: integer (nullable = true)
 |-- arrest_charge: string (nullable = true)
 |-- charge_description: string (nullable = true)
 |-- incident_occurred_at: string (nullable = true)
 |-- incident_time: timestamp (nullable = true)
 |-- incident_day_of_week: integer (nullable = true)
 |-- incident_hour_of_day: integer (nullable = true)
 |-- incident_year: integer (nullable = true)
 |-- case_id: integer (nullable = true)
 |-- case_status: string (nullable = true)
 |-- case_status_updated_at: string (nullable = true)
 |-- updated_in_ibr_at: string (nullable = true)
 |-- updated_at: string (nullable = true)
 |-- cr

Нормалізація

In [None]:
# Tag every row with its source city so the rows stay distinguishable after the union.
detroit_df, la_df = (
    detroit_df.withColumn("City", lit("Detroit")),
    la_df.withColumn("City", lit("Los Angeles")),
)

In [None]:
# Detroit: map the raw RMS columns onto the shared schema used for every city.
detroit_normalized = detroit_df.select(
    col("City").alias("city"),
    # incident_occurred_at presumably starts with "yyyy/MM/dd HH:mm"; chars 1-10 are the date.
    date_format(to_date(col("incident_occurred_at").substr(1, 10), "yyyy/MM/dd"), "dd.MM.yyyy").alias("date_occured"),
    # Chars 12-16 are "HH:mm". Parse them with an explicit pattern instead of an
    # implicit string->timestamp cast (which fills in a date and relies on
    # Spark's lenient cast rules).
    date_format(to_timestamp(col("incident_occurred_at").substr(12, 5), "HH:mm"), "HH:mm").alias("time_occured"),
    lit('').alias("date_report"),
    lit('').alias("time_report"),
    col("offense_category").alias("crime_category"),
    col("offense_description").alias("crime_description"),
    col("latitude"),
    col("longitude"),
    col("nearest_intersection").alias("address"),
    # Lower-case names match la_normalized; unionByName would otherwise only
    # tolerate the mismatch while spark.sql.caseSensitive is false.
    lit('').alias("location_type"),
    # NOTE(review): a null case_status is classed as "finished" -- confirm intended.
    when(col("case_status").isin("INACTIVE", "ACTIVE"), "not finished").otherwise("finished").alias("investigation_status"),
    lit('').alias("victim_age_group"),
    lit('').alias("victim_sex"),
    lit('').alias("victim_race"),
    concat_ws(" ", col("City"), col("police_precinct")).alias("police_station")
)


In [None]:
# Preview the normalized Detroit rows, then the raw source rows.
for preview_df in (detroit_normalized, detroit_df):
    preview_df.show(5)

+-------+------------+------------+-----------+-----------+------------------+--------------------+----------------+-----------------+--------------------+-------------+--------------------+----------------+----------+-----------+--------------+
|   city|date_occured|time_occured|date_report|time_report|    crime_category|   crime_description|        latitude|        longitude|             address|Location_type|Investigation_status|victim_age_group|victim_sex|victim_race|police_station|
+-------+------------+------------+-----------+-----------+------------------+--------------------+----------------+-----------------+--------------------+-------------+--------------------+----------------+----------+-----------+--------------+
|Detroit|  25.10.2022|       20:00|           |           |    STOLEN VEHICLE| MOTOR VEHICLE THEFT|42.4382753656854|-83.0528961345903|Conley St & E Lan...|             |        not finished|                |          |           |    Detroit 11|
|Detroit|  05.01

In [None]:
# Drop anomalous victim ages: keep only 0..120 inclusive (rows with a null
# age fail the comparison and are dropped as well).
la_normalized = la_df.where(col("Vict Age").between(0, 120))


In [None]:
# LA: map the raw LAPD columns onto the shared schema used for every city.
la_normalized = la_normalized.select(
    col("City").alias("city"),
    # DATE OCC / Date Rptd are "MM/dd/yyyy hh:mm:ss a" strings; keep only the date part.
    date_format(to_timestamp(col("DATE OCC"), "MM/dd/yyyy hh:mm:ss a"), "dd.MM.yyyy").alias("date_occured"),
    # TIME OCC is presumably an HHMM number (e.g. 2130 -> "21:30"):
    # hundreds are hours, the remainder is minutes.
    concat_ws(":",
              lpad((col("TIME OCC") / 100).cast("int").cast("string"), 2, "0"),
              lpad((col("TIME OCC") % 100).cast("int").cast("string"), 2, "0")
    ).alias("time_occured"),
    date_format(to_timestamp(col("Date Rptd"), "MM/dd/yyyy hh:mm:ss a"), "dd.MM.yyyy").alias("date_report"),
    lit('').alias("time_report"),
    lit('').alias("crime_category"),
    col("Crm Cd Desc").alias("crime_description"),
    col("LAT").alias("latitude"),
    col("LON").alias("longitude"),
    # Strip a trailing run of whitespace followed by digits from the address.
    regexp_replace(col("LOCATION"), r"\s+\d+$", "").alias("address"),
    col("Premis Desc").alias("location_type"),
    # Only "Invest Cont" counts as open; any other status (including null) -> "finished".
    when(col("Status Desc") == "Invest Cont", "not finished").otherwise("finished").alias("investigation_status"),
    # Age 0 (kept by the 0..120 filter) matches no bucket and becomes "".
    when((col("Vict Age") >= 1) & (col("Vict Age") < 19), "<19")
    .when((col("Vict Age") >= 19) & (col("Vict Age") <= 24), "19-24")
    .when((col("Vict Age") >= 25) & (col("Vict Age") <= 44), "25-44")
    .when((col("Vict Age") >= 45) & (col("Vict Age") <= 64), "45-64")
    .when((col("Vict Age") >= 65) & (col("Vict Age") <= 120), "65+")
    .otherwise("").alias("victim_age_group"),
    # NOTE(review): any sex code other than M/F, including null, becomes the
    # literal string "None" rather than "" -- confirm this missing marker is intended.
    when(col("Vict Sex") == "M", "M")
    .when(col("Vict Sex") == "F", "F")
    .otherwise("None").alias("victim_sex"),
    # Unknown descent ("X") and missing descent (null) both become null.
    # Without the isNull() check a missing descent would fall through every
    # branch to otherwise() and be mislabelled as the "OTHER" category.
    when(col("Vict Descent").isNull() | (col("Vict Descent") == "X"), None)
    .when(col("Vict Descent") == "B", "BLACK")
    .when(col("Vict Descent") == "W", "WHITE")
    .when(col("Vict Descent") == "H", "WHITE HISPANIC")
    .when(col("Vict Descent").isin("A", "P", "C", "D", "J", "U", "V", "K", "Z"), "ASIAN / PACIFIC ISLANDER")
    .when(col("Vict Descent") == "I", "AMERICAN INDIAN/ALASKAN NATIVE")
    .otherwise("OTHER").alias("victim_race"),
    concat_ws(" ", col("City"), col("AREA")).alias("police_station")
)

In [None]:
# Preview the normalized LA rows, then the raw source rows.
for preview_df in (la_normalized, la_df):
    preview_df.show(5)

+-----------+------------+------------+-----------+-----------+--------------+--------------------+--------+---------+--------------------+--------------------+--------------------+----------------+----------+-----------+--------------+
|       city|date_occured|time_occured|date_report|time_report|crime_category|   crime_description|latitude|longitude|             address|       location_type|investigation_status|victim_age_group|victim_sex|victim_race|police_station|
+-----------+------------+------------+-----------+-----------+--------------+--------------------+--------+---------+--------------------+--------------------+--------------------+----------------+----------+-----------+--------------+
|Los Angeles|  01.03.2020|       21:30| 01.03.2020|           |              |    VEHICLE - STOLEN| 34.0375|-118.3506|1900 S  LONGWOOD ...|              STREET|            finished|                |         M|      OTHER| Los Angeles 7|
|Los Angeles|  08.02.2020|       18:00| 09.02.2020| 

Об'єднання та очищення даних

In [None]:
# Stack both cities into one frame (columns matched by name, not position).
final_df = detroit_normalized.unionByName(la_normalized)
# Keep a handle to the un-cleaned union so later cells can restart from it.
# Spark DataFrames are immutable, so a plain reference is enough; no copy needed.
reserv_df = final_df


In [None]:
# Restart from the un-cleaned union (DataFrames are immutable, so no copy is needed).
final_df = reserv_df

In [None]:
final_df.groupBy("city").count().orderBy("count", ascending=False).show(truncate=False)


def first_and_last_by_date(df):
    """Return the (earliest, latest) row of ``df`` by its ``date_occured`` column.

    ``date_occured`` holds "dd.MM.yyyy" strings. Sorting those directly is
    lexicographic, day first (e.g. "01.12.2020" < "02.01.2020"), not
    chronological, so the value is parsed back to a date just for ordering.
    Rows whose date is null or unparseable sort last in both directions, so
    they are never returned as the earliest or latest record.
    """
    occurred_on = to_date(col("date_occured"), "dd.MM.yyyy")
    earliest = df.orderBy(occurred_on.asc_nulls_last()).first()
    latest = df.orderBy(occurred_on.desc_nulls_last()).first()
    return earliest, latest


earliest_record_la, latest_record_la = first_and_last_by_date(la_normalized)
print("First record in LA\n" + str(earliest_record_la))
print("Last record in LA\n" + str(latest_record_la))

earliest_record_detroit, latest_record_detroit = first_and_last_by_date(detroit_normalized)
print("First record in Detroit\n" + str(earliest_record_detroit))
print("Last record in Detroit\n" + str(latest_record_detroit))


+-----------+------+
|city       |count |
+-----------+------+
|Los Angeles|986746|
|Detroit    |670997|
+-----------+------+



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

Кількість рядків: \
Los Angeles - 986873 \
Detroit    - 670997

In [None]:
# Replace nulls with "" in string columns (a string fill value leaves
# non-string columns untouched).
final_df = final_df.na.fill('')

Збереження

In [None]:
# Persist the combined two-city dataset as CSV with a header row
# (Spark writes a directory of part files at this path, replacing any previous run).
output_path = "/content/drive/MyDrive/курс 4.1/Big data/Project/data/combined_crime_data.csv"
(
    final_df.write
    .mode("overwrite")
    .option("header", True)
    .csv(output_path)
)


NameError: name 'final_df' is not defined

Робота з об'єднаним датасетом:


In [None]:
# Load the combined multi-city dataset.
# NOTE(review): no cell above writes this parquet path (the combine step writes
# CSV to combined_crime_data.csv), and it also contains New York and San
# Francisco -- presumably produced elsewhere; confirm its provenance.
file_path = '/content/drive/MyDrive/курс 4.1/Big data/Project/data/combined_crime_incidents_data'
main_df = spark.read.parquet(file_path)
# DataFrames are immutable, so a plain reference is a sufficient "backup".
reserv_df = main_df

main_df.groupBy("city").count().orderBy("count", ascending=False).show(truncate=False)

+-------------+-------+
|city         |count  |
+-------------+-------+
|New York     |2368970|
|Los Angeles  |986746 |
|San Francisco|612061 |
|Detroit      |410807 |
+-------------+-------+



In [None]:
# Restart from the freshly loaded dataset (immutable, so no copy is needed).
main_df = reserv_df
main_df.groupBy("city").count().orderBy("count", ascending=False).show(truncate=False)

+-------------+-------+
|city         |count  |
+-------------+-------+
|New York     |2368970|
|Los Angeles  |986746 |
|San Francisco|612061 |
|Detroit      |410807 |
+-------------+-------+



In [None]:
# Upper-case the free-text crime fields so later prefix matching does not depend on letter case.
for text_column in ('crime_category', 'crime_description'):
    main_df = main_df.withColumn(text_column, upper(main_df[text_column]))

In [None]:
# Normalize crime_category across cities: trim whitespace, then collapse the
# city-specific labels into shared categories by prefix (SQL LIKE 'prefix%').
# Rules are tried in order and the first match wins; values matching no rule
# are kept unchanged. Each prefix also covers its longer variants, e.g.
# "SEX OFFENSE" covers "SEX OFFENSES", "WEAPONS OFFENSE" covers "WEAPONS OFFENSES",
# "LIQUOR" covers "LIQUOR LAWS", "FORGERY" covers "FORGERY AND COUNTERFEITING",
# "MISCELLANEOUS" covers "MISCELLANEOUS INVESTIGATION", and "HUMAN TRAFFICKING"
# covers every "HUMAN TRAFFICKING ..." variant.
CRIME_CATEGORY_PREFIXES = [
    # (label prefix, normalized category)
    ("LARCENY", "LARCENY/THEFT"),
    ("MOTOR VEHICLE THEFT", "VEHICLE THEFT"),
    ("STOLEN VEHICLE", "VEHICLE THEFT"),
    ("AGGRAVATED ASSAULT", "ASSAULT"),
    ("SEX OFFENSE", "SEX OFFENSES"),
    ("SEXUAL ASSAULT", "SEX OFFENSES"),
    ("WEAPONS OFFENSE", "WEAPONS OFFENSE"),
    ("WEAPONS OFFENCE", "WEAPONS OFFENSE"),
    ("SUSPICIOUS OCC", "SUSPICIOUS"),
    ("TRAFFIC COLLISION", "TRAFFIC VIOLATION"),
    ("TRAFFIC VIOLATION ARREST", "TRAFFIC VIOLATION"),
    ("VEHICLE IMPOUNDED", "OTHER"),
    ("COURTESY REPORT", "OTHER"),
    ("CASE CLOSURE", "OTHER"),
    ("OBSTRUCTING THE POLICE", "OBSTRUCTION"),
    ("OBSTRUCTING JUDICIARY", "OBSTRUCTION"),
    ("VEHICLE MISPLACED", "OTHER"),
    ("NULL", ""),
    ("OTHER OFFENSES", "OTHER"),
    ("MISCELLANEOUS", "MISCELLANEOUS"),
    ("OTHER MISCELLANEOUS", "MISCELLANEOUS"),
    ("LIQUOR", "LIQUOR"),
    ("FORGERY", "FORGERY AND COUNTERFEITING"),
    ("DRUG OFFENSE", "DRUG OFFENSE"),
    ("DRUG VIOLATION", "DRUG OFFENSE"),
    ("DANGEROUS DRUGS", "DRUG OFFENSE"),
    ("NON-CRIMINAL", "NON-CRIMINAL"),
    ("CIVIL SIDEWALKS", "NON-CRIMINAL"),
    ("OUIL", "DRUNK DRIVING"),
    ("HUMAN TRAFFICKING", "HUMAN TRAFFICKING"),
    ("HOMICIDE", "HOMICIDE"),
    ("JUSTIFIABLE HOMICIDE", "HOMICIDE"),
]

main_df = main_df.withColumn('crime_category', trim(main_df['crime_category']))

# Build one CASE WHEN expression from the rule table, preserving rule order.
category_expr = None
for label_prefix, normalized_category in CRIME_CATEGORY_PREFIXES:
    label_matches = col('crime_category').like(label_prefix + "%")
    if category_expr is None:
        category_expr = when(label_matches, normalized_category)
    else:
        category_expr = category_expr.when(label_matches, normalized_category)
main_df = main_df.withColumn('crime_category', category_expr.otherwise(col('crime_category')))
# Null categories pass through the CASE unchanged; store them as "".
main_df = main_df.na.fill({"crime_category": ""})

In [None]:
#Перевірки
# main_df.groupBy("crime_category").count().orderBy("crime_category", ascending=False).show(400, truncate=False)
# main_df.groupBy("crime_category").count().orderBy("count", ascending=False).show(400, truncate=False)
# main_df.groupBy("crime_description").count().orderBy("crime_description", ascending=False).show(4000, truncate=False)
# main_df.groupBy("crime_description").count().orderBy("count", ascending=False).show(400, truncate=False)

In [None]:
# Row count per city, largest first.
city_counts = main_df.groupBy("city").count()
city_counts.sort("count", ascending=False).show(truncate=False)

+-------------+-------+
|city         |count  |
+-------------+-------+
|New York     |2368970|
|Los Angeles  |986746 |
|San Francisco|612061 |
|Detroit      |410807 |
+-------------+-------+



In [None]:
# Persist the cleaned four-city dataset as CSV with a header row
# (Spark writes a directory of part files at this path, replacing any previous run).
output_path = "/content/drive/MyDrive/курс 4.1/Big data/Project/data/4_usa_city_crime_data"
(
    main_df.write
    .mode("overwrite")
    .option("header", True)
    .csv(output_path)
)