# завантаження даних і відкриття

In [1]:
import os

os.environ["PYTHONUNBUFFERED"] = "1"
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17"
os.environ["SPARK_HOME"] = "/opt/homebrew/Cellar/apache-spark/4.0.0/libexec"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["SPARK_HOME"] + "/bin:" + os.environ["PATH"]

In [2]:
os.environ["PATH"]

'/opt/homebrew/opt/openjdk@17/bin:/opt/homebrew/Cellar/apache-spark/4.0.0/libexec/bin:/Users/aleksejkitajskij/Library/Mobile Documents/com~apple~CloudDocs/Programing/GoIT_projects/data enginner/goit_spark/venv/bin:/Users/aleksejkitajskij/Desktop/goit_spark/venv/bin:/usr/bin:/bin:/usr/sbin:/sbin:/usr/local/bin'

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, min, max, avg, unix_timestamp, count_if, round, when
from pyspark.sql.types import TimestampType, IntegerType

In [4]:
# Створюємо сесію Spark
spark = SparkSession.builder.appName("MyGoitSparkSandbox").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/16 11:23:18 WARN Utils: Your hostname, Aleksejs-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.0.226 instead (on interface en0)
25/08/16 11:23:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/16 11:23:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/16 11:23:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/08/16 11:23:19 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [5]:
# Завантажуємо датасет
nuek_df = spark.read.csv('data/nuek-vuh3.csv', header=True)

In [6]:
nuek_df.show(10)

+-----------+-------+---------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+-------------+----------------------+--------------------+--------------------+----+-------------------+---------+------------+----+-----------------+--------+--------------+--------+---------------+----------------+---------+------------------------------+------------------------+-------------------+---------------------------------+-------------+--------------------+--------------------+--------------------+---------------------------+
|call_number|unit_id|incident_number|       call_type|           call_date|          watch_date|       received_dttm|          entry_dttm|       dispatch_dttm|       response_dttm|       on_scene_dttm|transport_dttm|hospital_dttm|call_final_disposition|      available_dttm|             address|city|zipcode_of_incident|battalion|station_area

25/08/16 11:23:23 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


## фільтрація

In [7]:
nuek_df.select('call_type').where(col("call_type").isNotNull()).distinct().count()

16

In [8]:
# Створюємо тимчасове представлення для виконання SQL-запитів
nuek_df.createTempView("nuek_view")
# Виконуємо SQL-маніпуляції
df = spark.sql("""SELECT COUNT(DISTINCT call_type) as count
                    FROM nuek_view 
                    WHERE call_type IS NOT NULL""")

In [9]:
df.show()

+-----+
|count|
+-----+
|   16|
+-----+



In [10]:
# Витягуємо дані колонки з датафрейму
print(df.collect(), type(df.collect()))
# Дотягуємось до самого значення за номером рядка та іменем колонки
print(df.collect()[0]['count'])
# або за номером рядка та номером колонки
print(df.collect()[0][0])
# first() повертає перший рядок датафрейму
print(df.first())

[Row(count=16)] <class 'list'>
16
16
Row(count=16)


## агрегація

In [11]:
nuek_df.groupBy('call_type').count().orderBy(col('count').desc()).limit(3).show()

+----------------+-----+
|       call_type|count|
+----------------+-----+
|          Alarms|  584|
|Medical Incident|  206|
|           Other|  128|
+----------------+-----+



In [12]:
spark.sql("""SELECT call_type, COUNT(call_type) as count 
                    FROM nuek_view 
                    GROUP BY call_type 
                    ORDER BY count DESC
                    LIMIT 3""").show()

+----------------+-----+
|       call_type|count|
+----------------+-----+
|          Alarms|  584|
|Medical Incident|  206|
|           Other|  128|
+----------------+-----+



In [13]:
df_times = (nuek_df.select("received_dttm", "response_dttm")
    .withColumn("received_dttm", col("received_dttm").cast(TimestampType()))
    .withColumn("response_dttm", col("response_dttm").cast(TimestampType()))
    .withColumn("delay_s", unix_timestamp(col("response_dttm")) - unix_timestamp(col("received_dttm"))))

In [14]:
df_times.groupby().agg(
    count("*").alias("total_"),
    count_if(col("delay_s").isNotNull()).alias("delayed_not_null"),
    count_if(col("delay_s").isNull()).alias("delayed_null"),
    min(col("delay_s")).alias("min_delay"),
    max(col("delay_s")).alias("max_delay"),
    avg(
        col("delay_s")
    ).alias("avg_delay"),
    avg(
        when(col("delay_s").isNotNull(), col("delay_s")).otherwise(0)
    ).alias("avg_zeroed"),
    avg(
        when(col("delay_s").isNotNull(), col("delay_s")).otherwise(220.0615)
    ).alias("avg_replaced")
).show(10)

+------+----------------+------------+---------+---------+-----------------+----------+------------------+
|total_|delayed_not_null|delayed_null|min_delay|max_delay|        avg_delay|avg_zeroed|      avg_replaced|
+------+----------------+------------+---------+---------+-----------------+----------+------------------+
|  1000|             747|         253|        0|     1646|220.0615796519411|   164.386|220.06155950000144|
+------+----------------+------------+---------+---------+-----------------+----------+------------------+



## join

In [15]:
from pyspark.sql.functions import collect_list, col, array_union, array_distinct

In [16]:
zip_station = (nuek_df.select('zipcode_of_incident', 'station_area')
    .withColumnRenamed("station_area", "station_area_1"))

(nuek_df.join(zip_station, nuek_df.zipcode_of_incident == zip_station.zipcode_of_incident, 'inner')
      .drop(zip_station.zipcode_of_incident)
      .select('zipcode_of_incident', 'station_area', 'station_area_1')
      .dropDuplicates(['station_area', 'station_area_1'])
      .dropna()
      .where(col('station_area') != col('station_area_1'))
      .groupBy('zipcode_of_incident')
      .agg(
          collect_list("station_area").alias("station_area_list"),
          collect_list("station_area_1").alias("station_area_list_1")
          )
      .withColumn("station_area_united", array_union('station_area_list', 'station_area_list_1'))
      .withColumn("station_area_distinct", array_distinct('station_area_united'))
      .show())

25/08/16 11:23:25 WARN Column: Constructing trivially true equals predicate, 'zipcode_of_incident == zipcode_of_incident'. Perhaps you need to use aliases.


+-------------------+--------------------+--------------------+--------------------+---------------------+
|zipcode_of_incident|   station_area_list| station_area_list_1| station_area_united|station_area_distinct|
+-------------------+--------------------+--------------------+--------------------+---------------------+
|              94102|[01, 01, 01, 03, ...|[03, 05, 41, 01, ...|[01, 03, 05, 36, 41]| [01, 03, 05, 36, 41]|
|              94107|[01, 01, 01, 04, ...|[04, 25, 37, 01, ...|[01, 04, 08, 25, ...| [01, 04, 08, 25, ...|
|              94131|[11, 11, 12, 12, ...|[12, 20, 11, 20, ...|    [11, 12, 20, 26]|     [11, 12, 20, 26]|
|              94112|[15, 15, 15, 32, ...|[32, 33, 43, 15, ...|    [15, 32, 33, 43]|     [15, 32, 33, 43]|
|              94103|[01, 01, 01, 01, ...|[06, 07, 08, 29, ...|[01, 06, 07, 08, ...| [01, 06, 07, 08, ...|
|              94118|    [10, 21, 31, 31]|    [31, 31, 10, 21]|        [10, 21, 31]|         [10, 21, 31]|
|              94117|[05, 05, 05, 06,

In [17]:
# Закриваємо сесію Spark
spark.stop()