<a href="https://colab.research.google.com/github/raccoonback/2024-spark/blob/main/koseungbin/week3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install a headless OpenJDK 8 quietly (apt output discarded); JAVA_HOME below points at it.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Copy the Spark 3.5.1 distribution archive from Google Drive and unpack it into the
# current working directory.
# NOTE(review): assumes Google Drive is already mounted at /content/drive and that the
# working directory is /content (SPARK_HOME below expects the unpacked folder there) -- confirm.
!cp /content/drive/MyDrive/colab_notebooks/spark/spark-3.5.1-bin-hadoop3.tgz /content/
!tar xf spark-3.5.1-bin-hadoop3.tgz
# findspark is used in the next cell to make the unpacked pyspark importable.
!pip install -q findspark

# Copy the sample data folder (read later from ./sample_data/weather/*.xls).
!cp -r /content/drive/MyDrive/colab_notebooks/sample_data /content/

import os
# Point findspark / PySpark at the JDK installed above and the unpacked Spark folder.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [None]:
import findspark

# Locate SPARK_HOME so that pyspark can be imported; this must run before the import below.
findspark.init()

from pyspark.sql import SparkSession

# Local-mode session using all available cores (getOrCreate reuses an existing one).
session_builder = SparkSession.builder.master("local[*]")
spark = session_builder.getOrCreate()

# Eager evaluation renders a DataFrame as a formatted table when it is the last
# expression of a notebook cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

spark

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, BooleanType

# The weather files use an .xls extension but are read as tab-separated text
# with a header row, encoded as CP949.
weather_path = './sample_data/weather/*.xls'
read_options = {'header': True, 'sep': '\t', 'encoding': "cp949"}

# First pass: keep the original (Korean) header names and let Spark infer the types.
raw_weather_df = spark.read.csv(weather_path, inferSchema=True, **read_options)

raw_weather_df.show(5)

# Second pass: an explicit 12-field schema (one field per column of the file)
# with English column names and fixed types.
weather_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ('date', DateType()),
        ('location', StringType()),
        ('is_heatwave', StringType()),
        ('highest_feel_temperature', DoubleType()),
        ('highest_temperature', DoubleType()),
        ('average_temperature', DoubleType()),
        ('lowest_temperature', DoubleType()),
        ('average_relative_humidity', DoubleType()),
        ('is_report_heatwave', StringType()),
        ('heatwave_forecast', StringType()),
        ('is_tropical_night', StringType()),
        ('uv', StringType()),
    ]
])

weather_df = spark.read.csv(weather_path, schema=weather_schema, **read_options)


weather_df.show(5)


+----------+-----------+-------------+----------------+------------+------------+------------+---------------+-------------+------------------+-----------+----------------+
|      일시|       지점|폭염여부(O/X)|최고체감온도(°C)|최고기온(°C)|평균기온(°C)|최저기온(°C)|평균상대습도(%)|폭염특보(O/X)|폭염영향예보(단계)|열대야(O/X)|자외선지수(단계)|
+----------+-----------+-------------+----------------+------------+------------+------------+---------------+-------------+------------------+-----------+----------------+
|2023-08-01|북강릉(104)|            X|            31.8|        31.4|        27.0|        22.8|           78.6|            O|              경고|          O|            낮음|
|2023-08-01|  동해(106)|            X|            30.6|        30.0|        26.2|        22.8|           86.5|            O|              주의|          O|            낮음|
|2023-08-01|  서울(108)|            O|            34.6|        34.2|        29.9|        25.5|           71.6|            O|              경고|          O|        매우높음|
|2023-08-01|  인천(112)|            O| 

매월(month)마다 폭염특보가 가장 많이 발효된 지역(ex. 북강릉(104))을 찾아봅시다.

각 월마다 폭염특보 발효 횟수가 가장 많은 지역이 여러 곳이라면, 지역명을 오름차순으로 정렬했을 때 가장 첫 번째 지역을 선택합니다.
(ex. 8월에 폭염특보가 "강원", "대구", "부산"이 각각 6번 발효되었다면, "강원"으로 지정합니다.)

In [None]:
from pyspark.sql.functions import col, avg, count, year, datediff, expr, min, max, to_date, month

# Number of heatwave advisories (is_report_heatwave == 'O') per (month, location),
# most advisories first.
heatwave_df = weather_df.withColumn('month', month('date')) \
  .filter(col('is_report_heatwave') == 'O') \
  .groupBy('month', 'location') \
  .agg(count('location').alias('heatwave_count')) \
  .orderBy(col('heatwave_count').desc())

heatwave_df.show()

# Highest advisory count reached by any location in each month (computed once, reused below).
max_heatwave_df = heatwave_df.groupBy('month') \
  .agg(max('heatwave_count').alias('max_heatwave_count'))

# Every location that reaches its month's maximum -- ties give several rows per month.
monthly_top_heatwave_df = heatwave_df.alias("hw") \
  .join(
      max_heatwave_df.alias("mhw"),
      (col('hw.month') == col('mhw.month'))
      & (col('hw.heatwave_count') == col('mhw.max_heatwave_count'))
  ) \
  .select(
      col('hw.month'),
      col('hw.location'),
      col('hw.heatwave_count')
  )

# Show all tied candidates in a stable order.
monthly_top_heatwave_df.orderBy('month', 'location').show()

# Resolve ties by keeping the location name that sorts first in ascending order (min),
# which leaves exactly one row per month; display in month order.
monthly_top_heatwave_df \
  .groupBy('month', 'heatwave_count') \
  .agg(min('location').alias('location')) \
  .select('month', 'location', 'heatwave_count') \
  .orderBy('month') \
  .show()

+-----+-----------+--------------+
|month|   location|heatwave_count|
+-----+-----------+--------------+
|    8|  거제(294)|            26|
|    8|  부산(159)|            26|
|    8|  창원(155)|            26|
|    8|  합천(285)|            26|
|    8|  완도(170)|            26|
|    8|김해시(253)|            26|
|    8|  경산(827)|            26|
|    8|  보성(732)|            25|
|    8|  하동(932)|            25|
|    8|  광주(156)|            25|
|    8|  장흥(260)|            25|
|    8|  대구(143)|            25|
|    8|  나주(710)|            25|
|    8|  화순(741)|            25|
|    8|영광군(252)|            24|
|    8|  곡성(768)|            24|
|    8|순천시(712)|            24|
|    8|  구례(709)|            24|
|    8|강진군(259)|            24|
|    8|  고흥(262)|            24|
+-----+-----------+--------------+
only showing top 20 rows

+-----+-----------+--------------+
|month|   location|heatwave_count|
+-----+-----------+--------------+
|    6| 북춘천(93)|             6|
|    6|  홍천(212)|             6|
|    9| 

[태용님 문제]


In [None]:
from pyspark.sql.functions import col, avg, count, year, datediff, expr, min, max, to_date, month, when, lit


raw_weather_df.show()
weather_df.show()

# Problem 1: per-date averages (across all stations) of the daily maximum temperature,
# the relative humidity, and a numeric UV score.
# Each UV category is mapped to one representative number. There is deliberately no
# .otherwise(): a missing or unrecognized category stays NULL and avg() skips NULLs,
# instead of being counted as a fake 0 that drags the daily average down.
uv_numericalization = (
    when(col('uv') == '낮음', 1.5)
    .when(col('uv') == '보통', 4.5)
    .when(col('uv') == '높음', 7)
    .when(col('uv') == '매우높음', 9.5)
    .when(col('uv') == '위험', 13)
)

avg_weather_df = (
    weather_df
    .withColumn('uv_numericalization', uv_numericalization)
    .groupBy('date')
    .agg(
        avg('highest_temperature').alias('avg_max_temperature'),
        avg('average_relative_humidity').alias('avg_relative_humidity'),
        avg('uv_numericalization').alias('avg_uv_numericalization'),
    )
    .orderBy('date')
)

avg_weather_df.show()

# Attach each day's averages to every station row of that day. Joining on the column
# name keeps a single `date` column (an expression join such as
# weather_df.date == avg_weather_df.date keeps two identically named `date` columns).
weather_with_avg_df = weather_df.join(avg_weather_df, on='date')

# Problem 2: station rows whose maximum temperature is above / below that day's average.
high_temperature_weather_than_avg_df = weather_with_avg_df \
    .filter(col('highest_temperature') > col('avg_max_temperature'))
low_temperature_weather_than_avg_df = weather_with_avg_df \
    .filter(col('highest_temperature') < col('avg_max_temperature'))

high_temperature_weather_than_avg_df.show()
low_temperature_weather_than_avg_df.show()

# Problem 3: TODO - not solved yet.

# Problem 4: station rows whose relative humidity is above / below that day's average.
high_humidity_weather_than_avg_df = weather_with_avg_df \
    .filter(col('average_relative_humidity') > col('avg_relative_humidity'))
low_humidity_weather_than_avg_df = weather_with_avg_df \
    .filter(col('average_relative_humidity') < col('avg_relative_humidity'))

high_humidity_weather_than_avg_df.show()
low_humidity_weather_than_avg_df.show()

# Problem 5: TODO - not solved yet.

+----------+-----------+-------------+----------------+------------+------------+------------+---------------+-------------+------------------+-----------+----------------+
|      일시|       지점|폭염여부(O/X)|최고체감온도(°C)|최고기온(°C)|평균기온(°C)|최저기온(°C)|평균상대습도(%)|폭염특보(O/X)|폭염영향예보(단계)|열대야(O/X)|자외선지수(단계)|
+----------+-----------+-------------+----------------+------------+------------+------------+---------------+-------------+------------------+-----------+----------------+
|2023-08-01|북강릉(104)|            X|            31.8|        31.4|        27.0|        22.8|           78.6|            O|              경고|          O|            낮음|
|2023-08-01|  동해(106)|            X|            30.6|        30.0|        26.2|        22.8|           86.5|            O|              주의|          O|            낮음|
|2023-08-01|  서울(108)|            O|            34.6|        34.2|        29.9|        25.5|           71.6|            O|              경고|          O|        매우높음|
|2023-08-01|  인천(112)|            O| 

[수지님 문제]

In [None]:
from pyspark.sql.functions import col, avg, count, year, datediff, expr, min, max, to_date, month, when, lit, count_if


raw_weather_df.show()
weather_df.show()

# Seoul station only: monthly averages of max / mean / min temperature and the number
# of rows with a heatwave advisory ('O'). Months with the most advisories come first;
# ties are broken by month so the display order is deterministic.
weather_df.filter(col('location') == '서울(108)') \
  .withColumn('month', month('date')) \
  .groupBy('month') \
  .agg(
      avg('highest_temperature').alias('avg_max_temperature'),
      avg('average_temperature').alias('avg_temperature'),
      # Alias fixed from the typo 'av_min_temperature' to match the other avg_* columns.
      avg('lowest_temperature').alias('avg_min_temperature'),
      count_if(col('is_report_heatwave') == 'O').alias('heatwave_report_count')
  ) \
  .orderBy(col('heatwave_report_count').desc(), col('month')) \
  .show()

+----------+-----------+-------------+----------------+------------+------------+------------+---------------+-------------+------------------+-----------+----------------+
|      일시|       지점|폭염여부(O/X)|최고체감온도(°C)|최고기온(°C)|평균기온(°C)|최저기온(°C)|평균상대습도(%)|폭염특보(O/X)|폭염영향예보(단계)|열대야(O/X)|자외선지수(단계)|
+----------+-----------+-------------+----------------+------------+------------+------------+---------------+-------------+------------------+-----------+----------------+
|2023-08-01|북강릉(104)|            X|            31.8|        31.4|        27.0|        22.8|           78.6|            O|              경고|          O|            낮음|
|2023-08-01|  동해(106)|            X|            30.6|        30.0|        26.2|        22.8|           86.5|            O|              주의|          O|            낮음|
|2023-08-01|  서울(108)|            O|            34.6|        34.2|        29.9|        25.5|           71.6|            O|              경고|          O|        매우높음|
|2023-08-01|  인천(112)|            O| 

[준영님 문제]

In [None]:
from pyspark.sql.functions import col, avg, count, year, datediff, expr, min, max, to_date, month, when, lit, count_if, udf


# Preview the inferred-schema and the explicit-schema DataFrames.
for preview_df in (raw_weather_df, weather_df):
    preview_df.show()


import re

# Station codes listed per province / metropolitan city. Some codes appear under two
# regions (555, 556: 경기도 and 강원도; 601-604: 경기도 and 충청북도); the region listed
# first wins, so dict order matters.
# NOTE(review): a few codes may be filed under the wrong province (e.g. 294 거제 and
# 285 합천 are listed under 경상북도) -- verify against the official station list.
_REGION_STATION_CODES = {
    "서울특별시": [108],
    "인천광역시": [112],
    "부산광역시": [159],
    "대구광역시": [143],
    "대전광역시": [133],
    "울산광역시": [152],
    "세종특별자치시": [239],
    "경기도": [
        119, 202, 203, 551, 549, 434, 433, 437, 438, 441, 444, 445,
        505, 504, 516, 532, 540, 541, 545, 546, 548, 550, 555, 556,
        565, 569, 571, 572, 590, 598, 601, 602, 603, 604
    ],
    "강원도": [
        104, 106, 114, 115, 121, 211, 212, 216, 217, 526, 555, 556,
        606, 90, 876, 93
    ],
    "충청북도": [127, 131, 221, 226, 601, 602, 603, 604],
    "충청남도": [
        129, 235, 236, 238, 232, 612, 615, 616, 618, 619, 627, 628,
        634, 636
    ],
    "전라북도": [140, 146, 702, 245, 247, 248, 254, 734, 737],
    "전라남도": [
        165, 168, 169, 170, 172, 712, 713, 730, 731, 732, 706, 709,
        710, 741, 754, 768, 789, 259, 260, 261, 262
    ],
    "경상북도": [
        135, 136, 137, 138, 271, 272, 273, 276, 277, 278, 279, 281,
        283, 284, 285, 288, 289, 294, 295, 801, 810, 812, 813, 815,
        822, 823, 825, 827
    ],
    "경상남도": [155, 162, 192, 253, 257, 264, 263, 268, 920],
    "제주특별자치도": [184]
}


def _invert_station_codes(region_codes):
    """Return {station_code: region}, keeping the first region that lists each code."""
    index = {}
    for region, codes in region_codes.items():
        for code in codes:
            index.setdefault(code, region)
    return index


# Built once instead of on every call.
_STATION_CODE_TO_REGION = _invert_station_codes(_REGION_STATION_CODES)

# Station labels look like '북강릉(104)': a name followed by the numeric code in parentheses.
_STATION_CODE_PATTERN = re.compile(r'\((\d+)\)')


def parse_to_region(location):
    """Map a station label such as '서울(108)' to its province / metropolitan-city name.

    The numeric code is taken from the parentheses (or from the whole string when it is
    only digits) and looked up exactly. Exact matching matters: a substring test would
    let code 93 match '하동(932)'.

    Args:
        location: station label string, or None.

    Returns:
        The region name, or '그 외..' when the label is None, carries no code, or the
        code is not in the table.
    """
    if location is None:
        return '그 외..'

    match = _STATION_CODE_PATTERN.search(location) or re.fullmatch(r'\s*(\d+)\s*', location)
    if match is None:
        return '그 외..'

    return _STATION_CODE_TO_REGION.get(int(match.group(1)), '그 외..')


parse_to_region_udf = udf(parse_to_region, StringType())

# Average daily mean temperature per region, warmest region first.
region_avg_temperature_df = (
    weather_df
    .withColumn('region', parse_to_region_udf(col('location')))
    .groupBy('region')
    .agg(avg('average_temperature').alias('avg_temperature'))
    .orderBy(col('avg_temperature').desc())
)
region_avg_temperature_df.show()

+----------+-----------+-------------+----------------+------------+------------+------------+---------------+-------------+------------------+-----------+----------------+
|      일시|       지점|폭염여부(O/X)|최고체감온도(°C)|최고기온(°C)|평균기온(°C)|최저기온(°C)|평균상대습도(%)|폭염특보(O/X)|폭염영향예보(단계)|열대야(O/X)|자외선지수(단계)|
+----------+-----------+-------------+----------------+------------+------------+------------+---------------+-------------+------------------+-----------+----------------+
|2023-08-01|북강릉(104)|            X|            31.8|        31.4|        27.0|        22.8|           78.6|            O|              경고|          O|            낮음|
|2023-08-01|  동해(106)|            X|            30.6|        30.0|        26.2|        22.8|           86.5|            O|              주의|          O|            낮음|
|2023-08-01|  서울(108)|            O|            34.6|        34.2|        29.9|        25.5|           71.6|            O|              경고|          O|        매우높음|
|2023-08-01|  인천(112)|            O| 

[성온님 문제]

In [None]:
from pyspark.sql.functions import col, avg, count, year, datediff, expr, min, max, to_date, month, when, lit, count_if, udf


# Look at both versions of the data before filtering.
for frame in (raw_weather_df, weather_df):
    frame.show()

def is_none(field):
    """Return True when *field* holds a value, False when it is None.

    Despite the name, this is a "has a value" predicate; it is used to keep
    non-null rows.
    """
    if field is None:
        return False
    return True

# Kept so other cells that reference it keep working; the filter below no longer uses it.
is_none_udf = udf(is_none, BooleanType())

# Keep only rows in which every listed column is non-null. The predicate is the same as
# the per-column is_none_udf checks, but Column.isNotNull() is evaluated natively by
# Spark instead of sending every row through a Python UDF 11 times.
# NOTE(review): 'heatwave_forecast' is not checked (it was not checked before either),
# so Problem 3 can show a NULL group -- confirm whether that is intended.
pure_weather_df = weather_df.filter(
    col('date').isNotNull()
    & col('location').isNotNull()
    & col('is_heatwave').isNotNull()
    & col('highest_feel_temperature').isNotNull()
    & col('highest_temperature').isNotNull()
    & col('average_temperature').isNotNull()
    & col('lowest_temperature').isNotNull()
    & col('average_relative_humidity').isNotNull()
    & col('is_report_heatwave').isNotNull()
    & col('is_tropical_night').isNotNull()
    & col('uv').isNotNull()
)


def _show_summary(group_column, value_column):
    """Show, per value of group_column in pure_weather_df, the non-null count of
    group_column and the min / avg / max of value_column."""
    pure_weather_df.groupBy(group_column) \
      .agg(
          count(group_column),
          min(value_column),
          avg(value_column),
          max(value_column)
      ) \
      .show()


# Problem 1: highest feels-like temperature by heatwave flag.
_show_summary('is_heatwave', 'highest_feel_temperature')

# Problem 2: highest feels-like temperature by heatwave-advisory flag.
_show_summary('is_report_heatwave', 'highest_feel_temperature')

# Problem 3: highest feels-like temperature by heatwave impact-forecast level.
_show_summary('heatwave_forecast', 'highest_feel_temperature')

# Problem 4: average temperature by UV category.
_show_summary('uv', 'average_temperature')

+----------+-----------+-------------+----------------+------------+------------+------------+---------------+-------------+------------------+-----------+----------------+
|      일시|       지점|폭염여부(O/X)|최고체감온도(°C)|최고기온(°C)|평균기온(°C)|최저기온(°C)|평균상대습도(%)|폭염특보(O/X)|폭염영향예보(단계)|열대야(O/X)|자외선지수(단계)|
+----------+-----------+-------------+----------------+------------+------------+------------+---------------+-------------+------------------+-----------+----------------+
|2023-08-01|북강릉(104)|            X|            31.8|        31.4|        27.0|        22.8|           78.6|            O|              경고|          O|            낮음|
|2023-08-01|  동해(106)|            X|            30.6|        30.0|        26.2|        22.8|           86.5|            O|              주의|          O|            낮음|
|2023-08-01|  서울(108)|            O|            34.6|        34.2|        29.9|        25.5|           71.6|            O|              경고|          O|        매우높음|
|2023-08-01|  인천(112)|            O| 