In [1]:
import findspark
findspark.init('/opt/spark')
import pyarrow
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession.builder\
    .master("local")\
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/09 12:20:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [34]:
df_crime = spark.read.option("header", True).csv("dz5/crime.csv")
df_crime.show(3)

+---------------+------------+------------------+-------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+-----------+-----------+------------+--------------------+
|INCIDENT_NUMBER|OFFENSE_CODE|OFFENSE_CODE_GROUP|OFFENSE_DESCRIPTION|DISTRICT|REPORTING_AREA|SHOOTING|   OCCURRED_ON_DATE|YEAR|MONTH|DAY_OF_WEEK|HOUR|  UCR_PART|     STREET|        Lat|        Long|            Location|
+---------------+------------+------------------+-------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+-----------+-----------+------------+--------------------+
|     I182070945|       00619|           Larceny| LARCENY ALL OTHERS|     D14|           808|    null|2018-09-02 13:00:00|2018|    9|     Sunday|  13|  Part One| LINCOLN ST|42.35779134|-71.13937053|(42.35779134, -71...|
|     I182070943|       01402|         Vandalism|          VANDALISM|     C11|           347|    null|2018-08-21 00:00:0

In [12]:
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql import functions
from pyspark.sql.functions import PandasUDFType, pandas_udf, col
import numpy as np
import pandas as pd
import pyspark.pandas as ps

In [49]:
#Кумулятивную сумму количества преступлений за весь период по месяцам - с первого месяца наблюдений и до последнего, 
#используя pandas_udf и np.cumsum.
@pandas_udf("long", PandasUDFType.SCALAR)
def cum_sum(ser: pd.Series) -> pd.Series:
    return np.cumsum(ser)




In [50]:
df_crime_1 = df_crime.groupBy("YEAR", "MONTH").agg(functions.count("INCIDENT_NUMBER").alias("crime_per_month"))
df_crime_1.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- crime_per_month: long (nullable = false)



In [57]:
df_crime_1 = df_crime_1.orderBy(col("YEAR").cast("Int"),col("MONTH").cast("Int")).withColumn("csum", cum_sum(col("crime_per_month")))
df_crime_1.show(6)

+----+-----+---------------+-----+
|YEAR|MONTH|crime_per_month| csum|
+----+-----+---------------+-----+
|2015|    6|           4191| 4191|
|2015|    7|           8324|12515|
|2015|    8|           8342|20857|
|2015|    9|           8414|29271|
|2015|   10|           8308|37579|
|2015|   11|           7818|45397|
+----+-----+---------------+-----+
only showing top 6 rows



In [87]:
#Кумулятивную сумму количества преступлений по месяцам в рамках каждого года, используя группировку по году, applyInPandas и np.cumsum.
def pd_cum_sum(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf["csum"] = np.cumsum(pdf["crime_per_month"])
    return pdf

In [88]:
df_crime_2 = df_crime.groupBy("YEAR", "MONTH").agg(functions.count("INCIDENT_NUMBER").alias("crime_per_month")).orderBy(col("YEAR").cast("Int"), col("MONTH").cast("Int"))
df_crime_2.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- crime_per_month: long (nullable = false)



In [93]:
df_crime_2 = df_crime_2.groupBy("YEAR").applyInPandas(pd_cum_sum , schema = "YEAR string, MONTH string, crime_per_month long, csum double")

In [94]:
df_crime_2.show()

                                                                                

+----+-----+---------------+-------+
|YEAR|MONTH|crime_per_month|   csum|
+----+-----+---------------+-------+
|2015|    6|           4191| 4191.0|
|2015|    7|           8324|12515.0|
|2015|    8|           8342|20857.0|
|2015|    9|           8414|29271.0|
|2015|   10|           8308|37579.0|
|2015|   11|           7818|45397.0|
|2015|   12|           7991|53388.0|
|2016|    1|           7835| 7835.0|
|2016|    2|           7308|15143.0|
|2016|    3|           8199|23342.0|
|2016|    4|           8101|31443.0|
|2016|    5|           8578|40021.0|
|2016|    6|           8558|48579.0|
|2016|    7|           8619|57198.0|
|2016|    8|           8938|66136.0|
|2016|    9|           8522|74658.0|
|2016|   10|           8583|83241.0|
|2016|   11|           7922|91163.0|
|2016|   12|           7951|99114.0|
|2017|    1|           7993| 7993.0|
+----+-----+---------------+-------+
only showing top 20 rows

