In [2]:
import findspark
findspark.init('/opt/spark')

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .getOrCreate()

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/11 08:16:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [1]:
!hdfs dfs -put crime.csv /tmp/
!hdfs dfs -put offense_codes.csv /tmp/

2025-01-11 08:15:54,838 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
put: `/tmp/crime.csv': File exists
2025-01-11 08:15:55,828 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
put: `/tmp/offense_codes.csv': File exists


In [4]:
crimeFacts = (spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/tmp/crime.csv")
    .cache())

                                                                                

In [5]:
crimeFacts.show()

[Stage 2:>                                                          (0 + 1) / 1]

+---------------+------------+--------------------+--------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+-----------------+-----------+------------+--------------------+
|INCIDENT_NUMBER|OFFENSE_CODE|  OFFENSE_CODE_GROUP| OFFENSE_DESCRIPTION|DISTRICT|REPORTING_AREA|SHOOTING|   OCCURRED_ON_DATE|YEAR|MONTH|DAY_OF_WEEK|HOUR|  UCR_PART|           STREET|        Lat|        Long|            Location|
+---------------+------------+--------------------+--------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+-----------------+-----------+------------+--------------------+
|     I182070945|         619|             Larceny|  LARCENY ALL OTHERS|     D14|           808|    null|2018-09-02 13:00:00|2018|    9|     Sunday|  13|  Part One|       LINCOLN ST|42.35779134|-71.13937053|(42.35779134, -71...|
|     I182070943|        1402|           Vandalism|           VANDALISM|     C11|   

                                                                                

In [6]:
offenseCodes = spark.read.option("header", "true").option("inferSchema", "true").csv("/tmp/offense_codes.csv")

In [7]:
offenseCodes.show()

+----+--------------------+
|CODE|                NAME|
+----+--------------------+
| 612|LARCENY PURSE SNA...|
| 613| LARCENY SHOPLIFTING|
| 615|LARCENY THEFT OF ...|
|1731|              INCEST|
|3111|LICENSE PREMISE V...|
|2646|LIQUOR - DRINKING...|
|2204|LIQUOR LAW VIOLATION|
|3810|M/V ACCIDENT - IN...|
|3801|M/V ACCIDENT - OTHER|
|3807|M/V ACCIDENT - OT...|
|3803|M/V ACCIDENT - PE...|
|3805|M/V ACCIDENT - PO...|
|3802|M/V ACCIDENT - PR...|
|3205|   M/V PLATES - LOST|
| 123|MANSLAUGHTER - NO...|
| 121|MANSLAUGHTER - VE...|
|3501|      MISSING PERSON|
|3502|MISSING PERSON - ...|
|3503|MISSING PERSON - ...|
| 111|MURDER, NON-NEGLI...|
+----+--------------------+
only showing top 20 rows



In [8]:
from pyspark.sql.functions import col, count, mean, month, median, year

In [9]:
crimeFacts = crimeFacts.withColumn("OCCURRED_ON_DATE", crimeFacts["OCCURRED_ON_DATE"].cast("timestamp"))

In [10]:
crimeFacts = (
    crimeFacts.withColumn("year", year(col("OCCURRED_ON_DATE")))
    .withColumn("month", month(col("OCCURRED_ON_DATE")))
)

In [11]:
crimes_total = crimeFacts.groupBy("DISTRICT").count().withColumnRenamed("count", "crimes_total")
crimes_total.show()



+--------+------------+
|DISTRICT|crimes_total|
+--------+------------+
|      C6|       23460|
|    null|        1765|
|      B2|       49945|
|     C11|       42530|
|     E13|       17536|
|      B3|       35442|
|      E5|       13239|
|     A15|        6505|
|      A7|       13544|
|     D14|       20127|
|      D4|       41915|
|     E18|       17348|
|      A1|       35717|
+--------+------------+



                                                                                

In [12]:
monthly_crimes = (
    crimeFacts.withColumn("month", month(col("OCCURRED_ON_DATE")))
    .groupBy("DISTRICT", "month")
    .count()
    .groupBy("DISTRICT")
    .agg(median(col("count"))
    .alias("crimes_monthly"))
)
monthly_crimes.show()

+--------+--------------+
|DISTRICT|crimes_monthly|
+--------+--------------+
|      C6|        1870.0|
|    null|         109.5|
|      B2|        3986.0|
|     C11|        3284.0|
|     E13|        1368.5|
|      B3|        2764.0|
|      E5|        1043.5|
|     A15|         499.0|
|      A7|        1092.0|
|     D14|        1607.5|
|      D4|        3297.5|
|     E18|        1313.0|
|      A1|        2775.0|
+--------+--------------+



In [13]:
coordinates = crimeFacts.groupBy("DISTRICT").agg(mean(col("Lat")), mean(col("Long")))
coordinates.show()

+--------+------------------+------------------+
|DISTRICT|          avg(Lat)|         avg(Long)|
+--------+------------------+------------------+
|      C6| 42.21212258445509|-70.85561011772268|
|    null| 25.23950519369344|-43.44877438704257|
|      B2| 42.31600367732632| -71.0756993065438|
|     C11| 42.29263740899965|-71.05125995734306|
|     E13|42.309803655709324|-71.09800478878299|
|      B3| 42.28305944520085|-71.07894914185519|
|      E5|42.197969994470235|-71.00440862434776|
|     A15|42.179155250910874|-70.74472508958492|
|      A7| 42.36070260499406| -71.0039483303988|
|     D14|  42.3435072451093|-71.13125461726422|
|      D4|42.341242517909265|-71.07725024947149|
|     E18|  42.2626806112253| -71.1189199875768|
|      A1| 42.33123077259858|-71.01991881361955|
+--------+------------------+------------------+



In [14]:
district_metrics = (
    crimes_total.join(monthly_crimes, on="District", how="left_outer")
    .join(coordinates, on="District", how="left_outer")
    .select(
        "District",
        "crimes_total",
        "crimes_monthly",
        col("avg(Lat)").alias("lat"),
        col("avg(Long)").alias("lng"))
)

district_metrics.show(truncate=False)

+--------+------------+--------------+------------------+------------------+
|District|crimes_total|crimes_monthly|lat               |lng               |
+--------+------------+--------------+------------------+------------------+
|null    |1765        |null          |null              |null              |
|A1      |35717       |2775.0        |42.33123077259858 |-71.01991881361955|
|A15     |6505        |499.0         |42.179155250910874|-70.74472508958492|
|A7      |13544       |1092.0        |42.36070260499406 |-71.0039483303988 |
|B2      |49945       |3986.0        |42.31600367732632 |-71.0756993065438 |
|B3      |35442       |2764.0        |42.28305944520085 |-71.07894914185519|
|C11     |42530       |3284.0        |42.29263740899965 |-71.05125995734306|
|C6      |23460       |1870.0        |42.21212258445509 |-70.85561011772268|
|D14     |20127       |1607.5        |42.3435072451093  |-71.13125461726422|
|D4      |41915       |3297.5        |42.341242517909265|-71.07725024947149|

In [17]:
!pip install pyarrow
!pip install numpy
import numpy as np
from pyspark.sql.window import Window
import pyspark.sql.functions as F

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


In [21]:
@F.pandas_udf('int')
def cumulative_sum(s):
    return s.apply(lambda x: np.cumsum(x))

cumsum_all_periods = (
    crimeFacts.groupBy(F.month('OCCURRED_ON_DATE'))
      .agg(F.count('*').alias('count'))
      .withColumn('cumulative_count', cumulative_sum('count'))
)

cumsum_all_periods.show()

25/01/11 08:18:25 WARN TaskSetManager: Lost task 0.0 in stage 36.0 (TID 39) (worker1 executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/hadoop-jupyter/nm-local-dir/usercache/jupyter/appcache/application_1736583312854_0001/container_1736583312854_0001_01_000002/pyspark.zip/pyspark/worker.py", line 814, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/hadoop-jupyter/nm-local-dir/usercache/jupyter/appcache/application_1736583312854_0001/container_1736583312854_0001_01_000002/pyspark.zip/pyspark/worker.py", line 650, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/hadoop-jupyter/nm-local-dir/usercache/jupyter/appcache/application_1736583312

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/hadoop-jupyter/nm-local-dir/usercache/jupyter/appcache/application_1736583312854_0001/container_1736583312854_0001_01_000002/pyspark.zip/pyspark/worker.py", line 814, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/hadoop-jupyter/nm-local-dir/usercache/jupyter/appcache/application_1736583312854_0001/container_1736583312854_0001_01_000002/pyspark.zip/pyspark/worker.py", line 650, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/hadoop-jupyter/nm-local-dir/usercache/jupyter/appcache/application_1736583312854_0001/container_1736583312854_0001_01_000002/pyspark.zip/pyspark/worker.py", line 375, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/hadoop-jupyter/nm-local-dir/usercache/jupyter/appcache/application_1736583312854_0001/container_1736583312854_0001_01_000002/pyspark.zip/pyspark/worker.py", line 87, in read_command
    command = serializer._read_with_length(file)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/hadoop-jupyter/nm-local-dir/usercache/jupyter/appcache/application_1736583312854_0001/container_1736583312854_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 174, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/tmp/hadoop-jupyter/nm-local-dir/usercache/jupyter/appcache/application_1736583312854_0001/container_1736583312854_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 472, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/hadoop-jupyter/nm-local-dir/usercache/jupyter/appcache/application_1736583312854_0001/container_1736583312854_0001_01_000002/pyspark.zip/pyspark/cloudpickle/cloudpickle.py", line 649, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'numpy'


In [22]:
cumsum_by_year = (
    crimeFacts.groupBy(F.year('OCCURRED_ON_DATE'), F.month('OCCURRED_ON_DATE'))
      .agg(F.count('*').alias('count'))
      .groupby('year(OCCURRED_ON_DATE)')
      .applyInPandas(cumulative_sum, schema='month int, count int, cumulative_count int')
)

ValueError: Invalid function: pandas_udf with function type GROUPED_MAP or the function in groupby.applyInPandas must take either one argument (data) or two arguments (key, data).