## Google Drive 연동

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## JAVA 설치
- JVM 실행 위해서는 JAVA 설치 필수

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

## Spark 설치
- 기존 : Web Link를 통해서 다운로드 받은 후, 압축파일 풀기
- 오늘 : 구글 드라이브에서 파일을 가져오기

In [3]:
%cd /content/drive/MyDrive/Colab Notebooks/spark

/content/drive/MyDrive/Colab Notebooks/spark


In [4]:
!pwd

/content/drive/MyDrive/Colab Notebooks/spark


In [5]:
!ls

 ch02   ch03   spark-3.1.1-bin-hadoop2.7.tgz   Untitled1.ipynb	'스파크 프로젝트.ipynb'


In [6]:
!cp -r spark-3.1.1-bin-hadoop2.7.tgz /content

In [7]:
%cd /content/

/content


In [8]:
!pwd

/content


In [9]:
!ls

drive  sample_data  spark-3.1.1-bin-hadoop2.7.tgz


In [10]:
!tar xf spark-3.1.1-bin-hadoop2.7.tgz > /dev/null

## 환경변수 설정
- 일반적으로 vi 편집기를 활용해서 작업
- 구글코랩 : os 라이브러리 사용해서 환경변수 지정

In [11]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

## PySpark 설치
- 무조건 설치 파일에 맞춰서 설치하기!

In [12]:
!pip install -q pyspark==3.1.1

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m212.3/212.3 MB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m22.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


## Ngrok 설정
- 회원가입해서 Token을 받는다.
- 싸이트 : https://ngrok.com/


In [13]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip &> /dev/null
!unzip ngrok-stable-linux-amd64.zip &> /dev/null

In [14]:
!pip install pyngrok

Collecting pyngrok
  Downloading pyngrok-7.0.0.tar.gz (718 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m718.7/718.7 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyngrok
  Building wheel for pyngrok (setup.py) ... [?25l[?25hdone
  Created wheel for pyngrok: filename=pyngrok-7.0.0-py3-none-any.whl size=21129 sha256=d3451748c25d900597e2bb09bc2c91118bd6235ef50020d8841f756610163604
  Stored in directory: /root/.cache/pip/wheels/60/29/7b/f64332aa7e5e88fbd56d4002185ae22dcdc83b35b3d1c2cbf5
Successfully built pyngrok
Installing collected packages: pyngrok
Successfully installed pyngrok-7.0.0


In [15]:
!./ngrok authtoken 2X9Pv8jBIGRtKHc2KgfsPbBlqiJ_7DGRtsgdVCvESQkoYvLG4

Authtoken saved to configuration file: /root/.ngrok2/ngrok.yml


## 테스트

In [16]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('mulCamp28').config('spark.ui.port', '4050').getOrCreate()
spark

In [17]:
strings = spark.read.text("/content/spark-3.1.1-bin-hadoop2.7/README.md")
print(strings)
print(type(strings))

DataFrame[value: string]
<class 'pyspark.sql.dataframe.DataFrame'>


In [18]:
filtered = strings.filter(strings.value.contains("Spark"))
print(filtered)
print(type(filtered))

DataFrame[value: string]
<class 'pyspark.sql.dataframe.DataFrame'>


In [19]:
!ls

drive  ngrok-stable-linux-amd64.zip  spark-3.1.1-bin-hadoop2.7
ngrok  sample_data		     spark-3.1.1-bin-hadoop2.7.tgz


In [20]:
# !cat spark-3.1.1-bin-hadoop2.7/README.md

In [21]:
filtered.count()

19

## CSV 파일 불러오기
- CSV 포맷으로 파일 읽어서 데이터프레임에 저장함
- 스키마 추론, 쉼표로 구분된 컬럼 이름이 제공되는 헤더가 있음 지정

### 교재

In [23]:
mnm_file = '/content/drive/MyDrive/Colab Notebooks/spark/ch02/data/mnm_dataset.csv'

mnm_df = spark.read.format('csv').option("header", "true").option("inferSchema", "true").load(mnm_file)
mnm_df.show(n=10, truncate=False) # show, pandas head()

+-----+------+-----+
|State|Color |Count|
+-----+------+-----+
|TX   |Red   |20   |
|NV   |Blue  |66   |
|CO   |Blue  |79   |
|OR   |Blue  |71   |
|WA   |Yellow|93   |
|WY   |Blue  |16   |
|CA   |Yellow|53   |
|WA   |Green |60   |
|OR   |Green |71   |
|TX   |Green |68   |
+-----+------+-----+
only showing top 10 rows



- GROUP BY State별, Color별 갯수를 요약해보자

In [24]:
count_mnm_df = mnm_df.select("State", "Color", "Count").groupBy("State", "Color").sum("Count").orderBy("sum(Count)", ascending=False)
count_mnm_df.show(n = 2, truncate=False)

+-----+------+----------+
|State|Color |sum(Count)|
+-----+------+----------+
|CA   |Yellow|100956    |
|WA   |Green |96486     |
+-----+------+----------+
only showing top 2 rows



In [25]:
count_mnm_df.count() # 행의 갯수

60

- CA만 확인하자

In [26]:
ca_count_mnm_df = mnm_df.select('*')\
  .where(mnm_df.State == 'CA')\
  .groupBy("State", "Color")\
  .sum("Count")\
  .orderBy("sum(Count)", ascending=False)

ca_count_mnm_df.show()

+-----+------+----------+
|State| Color|sum(Count)|
+-----+------+----------+
|   CA|Yellow|    100956|
|   CA| Brown|     95762|
|   CA| Green|     93505|
|   CA|   Red|     91527|
|   CA|Orange|     90311|
|   CA|  Blue|     89123|
+-----+------+----------+



In [27]:
ca_count_mnm_df.count()

6

In [51]:
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://2023-34-125-97-230.ngrok-free.app","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}},{"name":"command_line (http)","uri":"/api/tunnels/command_line%20%28http%29","public_url":"http://2023-34-125-97-230.ngrok-free.app","proto":"http","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}}],"uri":"/api/tunnels"}


In [52]:
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

https://2023-34-125-97-230.ngrok-free.app


## RDD 방식

In [30]:
sc = spark.sparkContext
dataRDD = sc.parallelize([('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)])
print(dataRDD)
print(type(dataRDD))

ParallelCollectionRDD[57] at readRDDFromFile at PythonRDD.scala:274
<class 'pyspark.rdd.RDD'>


- 집계와 평균 : 문법이 난해함
- RDD는 쓰면 안되겠다!

In [31]:
agesRDD = (dataRDD
           .map(lambda x:(x[0], (x[1], 1)))
           .reduceByKey(lambda x, y:(x[0] + y[0], x[1] + y[1]))
           .map(lambda x: (x[0], x[1][0]/x[1][1]))
           )
agesRDD

PythonRDD[62] at RDD at PythonRDD.scala:53

## DataFrame 방식

In [32]:
from pyspark.sql.functions import avg
from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName('mulCamp28').config('spark.ui.port', '4050').getOrCreate()

data_df = spark.createDataFrame([('Brooke', 20), ('Denny', 31), ('Jules', 30), ('TD', 35), ('Brooke', 25)], ['name', 'age'])
print(data_df)
print(type(data_df))

DataFrame[name: string, age: bigint]
<class 'pyspark.sql.dataframe.DataFrame'>


In [33]:
data_df.show()

+------+---+
|  name|age|
+------+---+
|Brooke| 20|
| Denny| 31|
| Jules| 30|
|    TD| 35|
|Brooke| 25|
+------+---+



In [34]:
# 이름으로 그룹화하여 평균 나이 계산
avg_df = data_df.groupby('name').agg(avg('age'))
avg_df.show()

+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Jules|    30.0|
|    TD|    35.0|
| Denny|    31.0|
+------+--------+



## 스키마 프로그래밍
- 데이터 생성할 때, SQL 비슷한 형식으로 데이터를 생성

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.types import * # * : type 클래스 내부의 메서드를 모두 호출한다
from pyspark.sql.functions import *
# spark = SparkSession.builder.appName('mulCamp28').config('spark.ui.port', '4050').getOrCreate()

schema = StructType([
   StructField("Id", IntegerType(), False), # SQL에서 테이블 생성
   StructField("First", StringType(), False),
   StructField("Last", StringType(), False),
   StructField("Url", StringType(), False),
   StructField("Published", StringType(), False),
   StructField("Hits", IntegerType(), False),
   StructField("Campaigns", ArrayType(StringType()), False)])

#create our data
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
       [2, "Brooke","Wenig","https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
       [3, "Denny", "Lee", "https://tinyurl.3","6/7/2019",7659, ["web", "twitter", "FB", "LinkedIn"]],
       [4, "Tathagata", "Das","https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
       [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
       [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
      ]

blogs_df = spark.createDataFrame(data, schema)
blogs_df.show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



In [36]:
blogs_df.printSchema()

root
 |-- Id: integer (nullable = false)
 |-- First: string (nullable = false)
 |-- Last: string (nullable = false)
 |-- Url: string (nullable = false)
 |-- Published: string (nullable = false)
 |-- Hits: integer (nullable = false)
 |-- Campaigns: array (nullable = false)
 |    |-- element: string (containsNull = true)



- 계산

In [38]:
blogs_df.select('Hits').show(2)

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows



In [39]:
blogs_df.select(expr('Hits') * 2).show(2)

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows



In [40]:
blogs_df.select(col('Hits') * 2).show(2)

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows



In [41]:
blogs_df.select(expr('Hits * 2')).show(2)

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows



- withColumn : 조건절 사용 가능

In [42]:
blogs_df.withColumn('Big Hitters', (expr('Hits > 10000'))).show()

+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|Big Hitters|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|      false|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|      false|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|      false|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|       true|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|       true|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|       true|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+



In [46]:
blogs_df.schema

StructType(List(StructField(Id,IntegerType,false),StructField(First,StringType,false),StructField(Last,StringType,false),StructField(Url,StringType,false),StructField(Published,StringType,false),StructField(Hits,IntegerType,false),StructField(Campaigns,ArrayType(StringType,true),false)))

## DDL 프로그래밍
- SQL과 유사한 형태

In [48]:
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaings` ARRAY<STRING>"

#create our data
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
       [2, "Brooke","Wenig","https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
       [3, "Denny", "Lee", "https://tinyurl.3","6/7/2019",7659, ["web", "twitter", "FB", "LinkedIn"]],
       [4, "Tathagata", "Das","https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
       [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
       [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
      ]

blogs_df = spark.createDataFrame(data, schema)
blogs_df.show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaings|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



- 데이터 생성 후 스키마 생성

In [49]:
blogs_df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (nullable = true)
 |-- Campaings: array (nullable = true)
 |    |-- element: string (containsNull = true)



## View 테이블 생성

In [50]:
blogs_df.createOrReplaceTempView("blogs")

## 두 개 컬럼 연산

In [53]:
blogs_df.show(2)

+---+------+-----+-----------------+---------+----+-------------------+
| Id| First| Last|              Url|Published|Hits|          Campaings|
+---+------+-----+-----------------+---------+----+-------------------+
|  1| Jules|Damji|https://tinyurl.1| 1/4/2016|4535|[twitter, LinkedIn]|
|  2|Brooke|Wenig|https://tinyurl.2| 5/5/2018|8908|[twitter, LinkedIn]|
+---+------+-----+-----------------+---------+----+-------------------+
only showing top 2 rows



In [55]:
blogs_df.select(col('Id'), col('Hits'), expr('Hits') + expr('Id')).show()

+---+-----+-----------+
| Id| Hits|(Hits + Id)|
+---+-----+-----------+
|  1| 4535|       4536|
|  2| 8908|       8910|
|  3| 7659|       7662|
|  4|10568|      10572|
|  5|40578|      40583|
|  6|25568|      25574|
+---+-----+-----------+



In [61]:
blogs_df\
  .withColumn('Authporsld', (concat(expr('First'), expr('Last'), expr('Id'))))\
  .select(col('Authporsld'))\
  .show()

+-------------+
|   Authporsld|
+-------------+
|  JulesDamji1|
| BrookeWenig2|
|    DennyLee3|
|TathagataDas4|
|MateiZaharia5|
|  ReynoldXin6|
+-------------+



## 정렬

In [66]:
blogs_df.sort(col('Id').desc()).show() # 내림차순

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaings|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



In [65]:
blogs_df.sort(col('Id').asc()).show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaings|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



## Row
- 하나의 행은 일반적으로 하나 이상의 컬럼을 갖고 있는 row 객체로 표현됨
- 각 컬럼은 동일한 컬럼 타입일 수도 있고, 다른 타입일 수도 있음

In [67]:
from pyspark.sql import Row
blogs_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015", ["twitter", "LinkedIn"])
blogs_row[1]

'Reynold'

- Row 객체를 확인해서 데이터 프레임으로 만들어서 사용

In [69]:
rows = [Row('콩','용이'), Row('두부', '둥이')]
authors_df = spark.createDataFrame(rows, ['이름', '별칭'])
authors_df.show()

+----+----+
|이름|별칭|
+----+----+
|  콩|용이|
|두부|둥이|
+----+----+



## 외부 데이터 가져오기

In [71]:
from pyspark.sql import SparkSession
from pyspark.sql.types import * # * : type 클래스 내부의 메서드를 모두 호출한다
from pyspark.sql.functions import *
# spark = SparkSession.builder.appName('mulCamp28').config('spark.ui.port', '4050').getOrCreate()

sf_fire_file = '/content/drive/MyDrive/Colab Notebooks/spark/ch03/data/sf-fire-calls.csv'
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),
                     StructField('CallDate', StringType(), True),
                     StructField('WatchDate', StringType(), True),
                     StructField('CallFinalDisposition', StringType(), True),
                     StructField('AvailableDtTm', StringType(), True),
                     StructField('Address', StringType(), True),
                     StructField('City', StringType(), True),
                     StructField('Zipcode', IntegerType(), True),
                     StructField('Battalion', StringType(), True),
                     StructField('StationArea', StringType(), True),
                     StructField('Box', StringType(), True),
                     StructField('OriginalPriority', StringType(), True),
                     StructField('Priority', StringType(), True),
                     StructField('FinalPriority', IntegerType(), True),
                     StructField('ALSUnit', BooleanType(), True),
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('Neighborhood', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True),
                     StructField('Delay', FloatType(), True)])

In [72]:
fireDF = spark.read.csv(sf_fire_file, header = True, schema=fire_schema)
fireDF.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

- 데이터 프레임을 캐시함

In [73]:
fireDF.cache()

DataFrame[CallNumber: int, UnitID: string, IncidentNumber: int, CallType: string, CallDate: string, WatchDate: string, CallFinalDisposition: string, AvailableDtTm: string, Address: string, City: string, Zipcode: int, Battalion: string, StationArea: string, Box: string, OriginalPriority: string, Priority: string, FinalPriority: int, ALSUnit: boolean, CallTypeGroup: string, NumAlarms: int, UnitType: string, UnitSequenceInCallDispatch: int, FirePreventionDistrict: string, SupervisorDistrict: string, Neighborhood: string, Location: string, RowID: string, Delay: float]

In [75]:
fireDF.count()

175296

- 조건에 의한 행 추출 진행

In [76]:
few_fireDF = fireDF\
  .select('IncidentNumber', 'AvailableDtTm', 'CallType')\
  .where(col('CallType') != 'Medical Incident')

few_fireDF.show(5, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



- 화재 신고로 기록된 CallType 종류가 궁금함

In [81]:
fireDF\
  .select("CallType")\
  .where(col("CallType").isNotNull())\
  .agg(countDistinct("CallType").alias("CNT_CallType"))\
  .show()

+------------+
|CNT_CallType|
+------------+
|          30|
+------------+



- NULL 이 아닌 신고 타입의 목록 확인

In [82]:
fireDF\
  .select("CallType")\
  .where(col("CallType").isNotNull())\
  .distinct()\
  .show(10, False)

+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
|Alarms                             |
|Odor (Strange / Unknown)           |
|Citizen Assist / Service Call      |
|HazMat                             |
|Watercraft in Distress             |
+-----------------------------------+
only showing top 10 rows



### 컬럼의 이름 변경 및 추가 삭제
- 컬럼 이름 지정
  + withColumnRenamed() 함수

## 컬럼의 이름 변경 및 추가 삭제
- 컬럼 이름 지정
  - withColumnRenamed() 함수

In [90]:
fireDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [97]:
new_fire_df = fireDF.withColumnRenamed("Delay", "ResponseDelayedinMins") # ResponseDelayedinMins : 새로운 컬럼명
new_fire_df.select("ResponseDelayedinMins").where(col("ResponseDelayedinMins") > 10).show(5, False)

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|11.916667            |
|95.28333             |
|13.55                |
|13.583333            |
|13.4                 |
+---------------------+
only showing top 5 rows



## 날짜 데이터 타입 변환
- 문자열에서 TimeStamp 타입으로 변환

In [98]:
new_fire_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [99]:
fire_ts_df = (new_fire_df
              .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")
              .withColumn("OnWatchDate",   to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate")
              .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm"))
fire_ts_df.show()

+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|CallFinalDisposition|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|ResponseDelayedinMins|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+-----------

In [100]:
# fire_ts_df.printSchema()
print(type(fire_ts_df.columns))

<class 'list'>


- 문제 SF zip codes 출력, 94102 or 94103

In [101]:
fire_ts_df\
  .select("Neighborhood", "Zipcode")\
  .where((col("Zipcode") == 94102) | (col("Zipcode") == 94103))\
  .distinct()\
  .show(10, truncate=False)

+------------------------------+-------+
|Neighborhood                  |Zipcode|
+------------------------------+-------+
|Potrero Hill                  |94103  |
|Western Addition              |94102  |
|Tenderloin                    |94102  |
|Nob Hill                      |94102  |
|Castro/Upper Market           |94103  |
|South of Market               |94102  |
|South of Market               |94103  |
|Hayes Valley                  |94103  |
|Financial District/South Beach|94102  |
|Mission Bay                   |94103  |
+------------------------------+-------+
only showing top 10 rows



- SF 2018년도 가장 응답시간이 길었던 것 확인
- filter()

In [96]:
fire_ts_df\
  .select("Neighborhood", "ResponseDelayedinMins")\
  .filter(year("IncidentDate") == 2018)\
  .orderBy("ResponseDelayedinMins", ascending=False)\
  .show()

+--------------------+---------------------+
|        Neighborhood|ResponseDelayedinMins|
+--------------------+---------------------+
|           Chinatown|            491.26666|
|Financial Distric...|            406.63333|
|          Tenderloin|            340.48334|
|      Haight Ashbury|            175.86667|
|Bayview Hunters P...|                155.8|
|Financial Distric...|            135.51666|
|     Pacific Heights|            129.01666|
|        Potrero Hill|                109.8|
|        Inner Sunset|            106.13333|
|     South of Market|             94.71667|
|Bayview Hunters P...|            92.816666|
|     South of Market|            91.666664|
|      Inner Richmond|            90.433334|
|           Excelsior|             83.76667|
|     South of Market|                 76.9|
|          Tenderloin|            76.566666|
| Castro/Upper Market|             74.13333|
|    Western Addition|            67.916664|
|            Nob Hill|                67.45|
|     Sout

## 데이터 저장 및 불러오기
- 시험

In [102]:
DATA_PATH = '/content/drive/MyDrive/Colab Notebooks/spark/ch03/data'

# 데이터 저장
fire_ts_df.write.format('parquet').mode('overwrite').save(DATA_PATH + 'parquet231024/')

- parquet 파일 불러오기

In [103]:
new_df = spark.read.format('parquet').load(DATA_PATH + 'parquet231024/')
new_df.show()

+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------------------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|CallFinalDisposition|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|ResponseDelayedinMins|       IncidentDate|        OnWatchDate|      AvailableDtTS|
+----------+------+--------------+----------------+--------------------+--------------------+----+-------+---------+-----------+----+-----------

## Spark 종료

In [37]:
# spark.stop()