<h1> Low-level RDD API pattern vs. high-level DataFrame API pattern

<h3> Low-level: using the RDD API

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [2]:
# Example using the RDD API
spark = SparkSession.builder.appName("DataFrame").getOrCreate()
sc = spark.sparkContext
# Create an RDD of (name, age) tuples.
dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)])

# Use the map and reduceByKey transformations with lambda expressions to aggregate and average
ageRDD = (dataRDD
          .map(lambda x: (x[0], (x[1], 1)))
          .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
          .map(lambda x: (x[0], x[1][0] / x[1][1]))
          )
print(ageRDD.collect())
spark.stop()

[('Jules', 30.0), ('TD', 35.0), ('Brooke', 22.5), ('Denny', 31.0)]
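To see why the first `map` pairs each age with a count of 1, the same pipeline can be traced in plain Python. This is a sketch that mimics `map` and `reduceByKey` with a dictionary; it does not use Spark, and the helper `reduce_by_key` is a hypothetical stand-in, not a PySpark function.

```python
data = [("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)]

def reduce_by_key(pairs, func):
    """Hypothetical stand-in for RDD.reduceByKey: merge the values of each key with func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())

# Step 1: (name, age) -> (name, (age, 1)) so every record carries a count
pairs = [(name, (age, 1)) for name, age in data]
# Step 2: add up ages and counts per name (the combiner is associative, as reduceByKey requires)
sums = reduce_by_key(pairs, lambda x, y: (x[0] + y[0], x[1] + y[1]))
# Step 3: divide the total age by the count to get the average per name
averages = [(name, total / count) for name, (total, count) in sums]
print(averages)
```

Carrying the count alongside the sum is what makes the final division correct: partial results from different partitions can be merged in any order, and the average is computed only once at the end.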


<h3> High-level DSL: using the DataFrame API

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
spark = SparkSession.builder.appName("DataFrame").getOrCreate()

data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)], ['name', 'age'])
avg_df = data_df.groupBy('name').agg(avg('age'))
avg_df.show()
input("Press Enter to terminate...")  # blocks here so the session (and its Spark UI) stays alive until Enter is pressed
spark.stop()

+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Denny|    31.0|
| Jules|    30.0|
|    TD|    35.0|
+------+--------+



<h1> Ways to define a schema

<h3> Programmatic style

In [5]:
from pyspark.sql.types import *
schema = StructType([StructField('author', StringType(), False),
                     StructField('title',StringType(), False),
                     StructField('pages',IntegerType(),False)])

In [6]:
schema

StructType([StructField('author', StringType(), False), StructField('title', StringType(), False), StructField('pages', IntegerType(), False)])

<h3> Using DDL

In [7]:
schema = 'author STRING, title STRING, pages INT'

In [8]:
schema

'author STRING, title STRING, pages INT'

<h1> Rows


In [9]:
import findspark
findspark.init()  # must run before importing pyspark so the Spark installation is on sys.path

from pyspark.sql import Row
from pyspark.sql import SparkSession

blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015", ["twitter","LinkedIn"])
blog_row[1]

'Reynold'

<h3> Converting rows into a DataFrame for quick exploration

In [10]:
spark = SparkSession \
        .builder \
        .appName('DataFrame') \
        .getOrCreate()
rows = [Row("Matei Zaharia","CA"),Row("Reynold Xin", "CA")]
authors_df = spark.createDataFrame(rows,["Authors","State"])
authors_df.show()

+-------------+-----+
|      Authors|State|
+-------------+-----+
|Matei Zaharia|   CA|
|  Reynold Xin|   CA|
+-------------+-----+



<h1> San Francisco fire calls example

In [11]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .appName('San Francisco') \
        .getOrCreate()

# Define the schema programmatically
fire_schema = StructType([StructField('CallNumber', IntegerType(),True),
                           StructField('UnitID', StringType(), True),
                           StructField('IncidentNumber',IntegerType(),True),
                           StructField('CallType',StringType(),True),
                           StructField('CallDate',StringType(),True),
                           StructField('WatchDate',StringType(),True),
                           StructField('CallFinalDisposition',StringType(),True),
                           StructField('AvailableDtTm',StringType(),True),
                           StructField('Address',StringType(),True),
                           StructField('City',StringType(),True),
                           StructField('Zipcode',IntegerType(),True),
                           StructField('Battalion',StringType(),True),
                           StructField('StationArea',StringType(),True),
                           StructField('Box',StringType(),True),
                           StructField('OriginalPriority',StringType(),True),
                           StructField('Priority',StringType(),True),
                           StructField('FinalPriority',IntegerType(),True),
                           StructField('ALSUnit',BooleanType(),True),
                           StructField('CallTypeGroup',StringType(),True),
                           StructField('NumAlarms',IntegerType(),True),
                           StructField('UnitType',StringType(),True),
                           StructField('UnitSequenceInCallDispatch',IntegerType(),True),
                           StructField('FirePreventionDistrict',StringType(),True),
                           StructField('SupervisorDistrict',StringType(),True),
                           StructField('Neighborhood',StringType(),True),
                           StructField('Location',StringType(),True),
                           StructField('RowID',StringType(),True),
                           StructField('Delay',FloatType(),True)
                           ])

# Read the CSV file through the DataFrameReader interface.
sf_fire_file = "sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)
fire_df.show()

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
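+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
(data rows omitted in this export)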

<h3> Transformations and actions

<h5> Projections and filters

In [12]:
from pyspark.sql.functions import *

few_fire_df = (fire_df
               .select('IncidentNumber','AvailableDtTm','CallType')
               .where(col('CallType')!='Medical Incident'))
few_fire_df.show(5, truncate=False)  # truncate=False prints long strings in full instead of cutting them off

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows
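In Spark SQL, comparing NULL with any value yields NULL, and `where` keeps only rows whose predicate is true, so rows with a null `CallType` are also dropped by `col('CallType') != 'Medical Incident'`. A plain-Python model of that three-valued logic, with `None` standing in for NULL (the helpers `sql_not_equal` and `where` and the toy rows are hypothetical, not PySpark APIs or dataset values):

```python
def sql_not_equal(a, b):
    """Model of SQL '!=': the result is NULL (None) if either side is NULL."""
    if a is None or b is None:
        return None
    return a != b

def where(rows, predicate):
    """Model of DataFrame.where: keep a row only when the predicate is exactly True."""
    return [row for row in rows if predicate(row) is True]

calls = [
    {"IncidentNumber": 1, "CallType": "Structure Fire"},
    {"IncidentNumber": 2, "CallType": "Medical Incident"},
    {"IncidentNumber": 3, "CallType": None},  # toy row with a missing CallType
]
kept = where(calls, lambda r: sql_not_equal(r["CallType"], "Medical Incident"))
print([r["IncidentNumber"] for r in kept])
```

If rows with a missing call type should survive the filter, the condition needs an explicit null check, e.g. `(col('CallType') != 'Medical Incident') | col('CallType').isNull()`.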



<h5> Number of distinct CallType values in the fire calls

In [13]:
from pyspark.sql.functions import *
(fire_df
 .select('CallType')
 .where(col('CallType').isNotNull())
 .agg(countDistinct('CallType').alias('DistinctCallTypes'))
 .show())

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+
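`countDistinct` (SQL `COUNT(DISTINCT ...)`) already ignores nulls, so the `isNotNull()` filter in this cell does not change the result. A plain-Python model of that counting rule; `count_distinct` is a hypothetical helper and the values are made up:

```python
def count_distinct(values):
    """Model of COUNT(DISTINCT col): count distinct values, skipping NULL (None)."""
    return len({v for v in values if v is not None})

call_types = ["Alarms", "Alarms", None, "HazMat", "Vehicle Fire", None]

with_filter = count_distinct([v for v in call_types if v is not None])  # like .where(isNotNull())
without_filter = count_distinct(call_types)                             # no explicit filter
print(with_filter, without_filter)
```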



<h5> List of call types

In [14]:
from pyspark.sql.functions import *
(fire_df
 .select('CallType')
 .where(col('CallType').isNotNull())
 .distinct()
 .show(10))  # pass truncate=False to show() to print the full call-type names

+--------------------+
|            CallType|
+--------------------+
|Elevator / Escala...|
|  Aircraft Emergency|
|              Alarms|
|Odor (Strange / U...|
|Citizen Assist / ...|
|              HazMat|
|           Explosion|
|           Oil Spill|
|        Vehicle Fire|
|  Suspicious Package|
+--------------------+
only showing top 10 rows



<h5> Renaming, adding, and dropping columns

In [15]:
new_fire_df = fire_df.withColumnRenamed('Delay','ResponseDelayedinMins')
(new_fire_df
 .select('ResponseDelayedinMins')
 .where(col('ResponseDelayedinMins')>5)
 .show(5,False))

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



<h5> Converting strings to date/timestamp types

In [16]:
fire_ts_df = (new_fire_df
              .withColumn('IncidentDate', to_timestamp(col("CallDate"), "MM/dd/yyyy"))
              .drop('CallDate')
              .withColumn('OnWatchDate', to_timestamp(col('WatchDate'),'MM/dd/yyyy'))
              .drop('WatchDate')
              .withColumn('AvailableDtTS', to_timestamp(col('AvailableDtTm'), 'MM/dd/yyyy hh:mm:ss a'))
              .drop('AvailableDtTm'))


(fire_ts_df
 .select('IncidentDate','OnWatchDate','AvailableDtTs')
 .show(5,False))

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTs      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows
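Spark's datetime pattern letters differ from Python's `strptime` directives. As a sketch of what `to_timestamp(col('AvailableDtTm'), 'MM/dd/yyyy hh:mm:ss a')` does to a single value, here is the corresponding parse in plain Python, using the mapping `MM`→`%m`, `dd`→`%d`, `yyyy`→`%Y`, `hh`→`%I` (12-hour clock), `mm`→`%M`, `ss`→`%S`, `a`→`%p`. The helper name `parse_available_dt` is hypothetical.

```python
from datetime import datetime

# Spark pattern 'MM/dd/yyyy hh:mm:ss a' written as Python strptime directives
PY_PATTERN = "%m/%d/%Y %I:%M:%S %p"

def parse_available_dt(text):
    """Parse an AvailableDtTm-style string, mirroring the notebook's to_timestamp pattern."""
    return datetime.strptime(text, PY_PATTERN)

print(parse_available_dt("01/11/2002 01:51:44 AM"))  # first AvailableDtTm value shown earlier
print(parse_available_dt("01/11/2002 01:51:44 PM"))  # same clock time in the afternoon
```

Because `hh` is a 12-hour field, the AM/PM marker (`a` / `%p`) is what distinguishes 01:51 from 13:51; using `HH` (24-hour) with these strings would not interpret the marker.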



In [18]:
(fire_ts_df
 .select(year('IncidentDate'))
 .distinct()
 .orderBy(year('IncidentDate'))
 .show())

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



<h5> Aggregate functions

In [21]:
fire_ts_df.show(5)

+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|CallFinalDisposition|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|ResponseDelayedinMins|       IncidentDate|        OnWatchDate|      AvailableDtTS|
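+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+-------------------+-------------------+-------------------+
(data rows omitted in this export)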

In [None]:
(fire_ts_df
 .select('CallType')
 .where(col('CallType').isNotNull())
 .groupBy('CallType')
 .count()
 .orderBy(desc('count'))
 .show(10,False))

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



In [25]:
import pyspark.sql.functions as F
(fire_ts_df
 .select(F.sum('NumAlarms'),F.avg('ResponseDelayedinMins'),
         F.min('ResponseDelayedinMins'), F.max('ResponseDelayedinMins'))
 .show())

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



<h3> End-to-end DataFrame examples

Q1. What types of calls were made in 2018?

In [30]:
(fire_ts_df
 .select('CallType')
 .where(year(col('IncidentDate'))==2018)
 .show())  # lists every 2018 call; add .distinct() before .show() to list each type only once

+-----------------+
|         CallType|
+-----------------+
|   Structure Fire|
|           HazMat|
|           Alarms|
| Medical Incident|
| Medical Incident|
|Electrical Hazard|
| Medical Incident|
| Medical Incident|
| Medical Incident|
| Medical Incident|
| Medical Incident|
| Medical Incident|
|           Alarms|
|           Alarms|
|           Alarms|
| Medical Incident|
|   Structure Fire|
|Traffic Collision|
| Medical Incident|
| Medical Incident|
+-----------------+
only showing top 20 rows



Q2. In which month of 2018 were there the most calls?

In [40]:
(fire_ts_df
 .select('IncidentDate')
 .where(year(col('IncidentDate'))==2018)
 .groupBy(month(col('IncidentDate')).alias('Month'))
 .count()
 .orderBy(desc('count'))
 .show())

+-----+-----+
|Month|count|
+-----+-----+
|   10| 1068|
|    5| 1047|
|    3| 1029|
|    8| 1021|
|    1| 1007|
|    6|  974|
|    7|  974|
|    9|  951|
|    4|  947|
|    2|  919|
|   11|  199|
+-----+-----+



Q3. Which San Francisco area (station area) received the most calls in 2018?

In [66]:
(fire_ts_df
 .select('StationArea')
 .where((year(col('IncidentDate'))==2018) & (col('City')=='San Francisco'))
 .groupBy('StationArea')
 .count()
 .orderBy(desc('count'))
 .show())

+-----------+-----+
|StationArea|count|
+-----------+-----+
|         03| 1193|
|         01|  925|
|         36|  697|
|         07|  517|
|         13|  416|
|         06|  384|
|         17|  312|
|         05|  305|
|         28|  282|
|         08|  246|
|         35|  245|
|         41|  231|
|         11|  221|
|         21|  210|
|         02|  207|
|         43|  199|
|         10|  195|
|         38|  194|
|         31|  191|
|         32|  186|
+-----------+-----+
only showing top 20 rows



Q4. Which area had the slowest response times in 2018?

In [69]:
import pyspark.sql.functions as F

(fire_ts_df
 .select('StationArea','ResponseDelayedinMins')
 .where(year(col('IncidentDate'))==2018)
 .groupBy('StationArea')
 .agg(F.max('ResponseDelayedinMins').alias('MaxResponseDelay'))
 .orderBy(desc('MaxResponseDelay'))
 .show()
)

+-----------+----------------+
|StationArea|MaxResponseDelay|
+-----------+----------------+
|         28|       491.26666|
|         13|       406.63333|
|         03|       340.48334|
|         12|       175.86667|
|         42|           155.8|
|         38|       129.01666|
|         25|           109.8|
|         22|       106.13333|
|         01|        94.71667|
|         17|       92.816666|
|         31|       90.433334|
|         43|        83.76667|
|         06|        74.13333|
|         05|       67.916664|
|         09|           63.15|
|         10|       52.883335|
|         35|           48.55|
|         15|       43.383335|
|         36|       41.433334|
|         14|           38.05|
+-----------+----------------+
only showing top 20 rows
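Ranking by `max` answers "which station had the single slowest response", so one unusually long call is enough to put a station at the top. If the question is read as "which station was slow on average", `F.avg` is the alternative aggregate. The difference shows up with made-up delays in plain Python (the station IDs and values are illustrative, not taken from `sf-fire-calls.csv`):

```python
from statistics import mean

# Illustrative response delays in minutes per station (not from the dataset)
delays = {
    "A": [2.0, 3.0, 2.5, 30.0],    # usually fast, one unusually long response
    "B": [9.0, 10.0, 11.0, 12.0],  # consistently slow
}

# Rank stations by their worst delay vs. by their average delay
by_max = sorted(delays, key=lambda s: max(delays[s]), reverse=True)
by_mean = sorted(delays, key=lambda s: mean(delays[s]), reverse=True)
print(by_max, by_mean)
```

In the notebook, the average-based ranking would use `.agg(F.avg('ResponseDelayedinMins').alias('AvgResponseDelay'))` and order by that column instead.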



Q5. Which city had the most calls in 2018?

In [71]:
(fire_ts_df
 .select('City')
 .where(year(col('IncidentDate'))==2018)
 .groupBy('City')
 .count()
 .orderBy(desc('count'))
 .show()
 )

+-------------+-----+
|         City|count|
+-------------+-----+
|San Francisco| 9967|
|     Presidio|   63|
|Treasure Isla|   58|
|         NULL|   19|
|  Yerba Buena|   10|
|Hunters Point|    9|
|   Fort Mason|    8|
|    Daly City|    1|
|     Brisbane|    1|
+-------------+-----+



Q6. Is there a correlation between area, zip code, and the number of calls?

In [101]:
from pyspark.ml.feature import StringIndexer

# Count calls per city and zip code
grouped_df = (
    fire_ts_df
    .groupBy("City", "Zipcode")
    .count()  # number of calls
    .withColumnRenamed("count", "CallCount")
)
indexer = StringIndexer(inputCol="City", outputCol="CityIndex", handleInvalid="skip")
grouped_df = indexer.fit(grouped_df).transform(grouped_df)

grouped_df.show()



+----+-------+---------+---------+
|City|Zipcode|CallCount|CityIndex|
+----+-------+---------+---------+
|  SF|  94124|     6554|      1.0|
|  PR|  94123|       15|      3.0|
|  HP|  94124|       31|     19.0|
|  DC|   NULL|       30|      5.0|
|  SF|  94158|      445|      1.0|
|  PR|  94118|        9|      3.0|
|  SF|  94102|    14881|      1.0|
|  SF|  94112|     5955|      1.0|
|  SF|  94107|     4678|      1.0|
|  SF|  94118|     3617|      1.0|
|  SF|  94134|     3614|      1.0|
|  SF|  94131|     2347|      1.0|
|  SF|  94117|     3990|      1.0|
|  SF|  94115|     5622|      1.0|
|  SF|  94129|       10|      1.0|
|  BN|  94134|        3|     13.0|
|  SF|  94130|      227|      1.0|
|  SF|  94123|     2572|      1.0|
|  SF|  94121|     3204|      1.0|
|  SF|  94133|     4292|      1.0|
+----+-------+---------+---------+
only showing top 20 rows



In [103]:
grouped_df = grouped_df.withColumn("ZipcodeNum", col("Zipcode").cast("int"))  # Zipcode is already IntegerType in fire_schema, so this is a numeric copy under a new name
correlation = grouped_df.stat.corr("ZipcodeNum", "CallCount")
print("Zipcode vs CallCount correlation:", correlation)
print("CityIndex vs CallCount correlation:", grouped_df.stat.corr("CityIndex", "CallCount"))
print("CityIndex vs ZipcodeNum:", grouped_df.stat.corr("CityIndex", "ZipcodeNum"))

Zipcode vs CallCount correlation: 0.11704700729419185
CityIndex vs CallCount correlation: -0.28307236361285926
CityIndex vs ZipcodeNum: -0.20761787841766743
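`DataFrame.stat.corr` computes the Pearson correlation coefficient (currently the only method it supports). Pearson treats its inputs as meaningful quantities, but `CityIndex` is a label assigned by `StringIndexer` (by default ordered by descending frequency) and `ZipcodeNum` is a code, so these coefficients are best read with caution. The plain-Python sketch below implements Pearson's formula and shows that merely relabeling the same categories changes the result; the cities, counts, and encodings are made up.

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of the standard deviations."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sqrt(sum((x - mx) ** 2 for x in xs))
    sy = sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)

# Made-up call counts for three categorical "cities"
cities = ["X", "Y", "Z"]
counts = [100, 10, 50]

# Two equally arbitrary label encodings of the same categories
encoding_a = {"X": 0.0, "Y": 1.0, "Z": 2.0}
encoding_b = {"X": 0.0, "Y": 2.0, "Z": 1.0}

r_a = pearson([encoding_a[c] for c in cities], counts)
r_b = pearson([encoding_b[c] for c in cities], counts)
print(round(r_a, 3), round(r_b, 3))  # different values for the same underlying data
```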


In [104]:
grouped_df.show()

+----+-------+---------+---------+----------+
|City|Zipcode|CallCount|CityIndex|ZipcodeNum|
+----+-------+---------+---------+----------+
|  SF|  94124|     6554|      1.0|     94124|
|  PR|  94123|       15|      3.0|     94123|
|  HP|  94124|       31|     19.0|     94124|
|  DC|   NULL|       30|      5.0|      NULL|
|  SF|  94158|      445|      1.0|     94158|
|  PR|  94118|        9|      3.0|     94118|
|  SF|  94102|    14881|      1.0|     94102|
|  SF|  94112|     5955|      1.0|     94112|
|  SF|  94107|     4678|      1.0|     94107|
|  SF|  94118|     3617|      1.0|     94118|
|  SF|  94134|     3614|      1.0|     94134|
|  SF|  94131|     2347|      1.0|     94131|
|  SF|  94117|     3990|      1.0|     94117|
|  SF|  94115|     5622|      1.0|     94115|
|  SF|  94129|       10|      1.0|     94129|
|  BN|  94134|        3|     13.0|     94134|
|  SF|  94130|      227|      1.0|     94130|
|  SF|  94123|     2572|      1.0|     94123|
|  SF|  94121|     3204|      1.0|     94121|
(remaining output omitted in this export)

In [105]:
spark.stop()