<a href="https://colab.research.google.com/github/Indongspace/mulcamp34/blob/main/ch01_basic_240415.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Google Drive 연동

In [1]:
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


## Spark 설치

In [2]:
!apt-get install openjdk-8-jdk-headless
!wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar -zxf spark-3.5.1-bin-hadoop3.tgz

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 3 newly installed, 0 to remove and 45 not upgraded.
Need to get 39.7 MB of archives.
After this operation, 144 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 libxtst6 amd64 2:1.2.3-1build4 [13.4 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 openjdk-8-jre-headless amd64 8u402-ga-2ubuntu1~22.04 [30.8 MB]
Get:3 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 openjdk-8-jdk-headless amd64 8u402-ga-2ubuntu1~22.04 [8,873 kB]

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [4]:
!pip install findspark -q

In [5]:
import findspark
findspark.init()

In [6]:
import pyspark
spark_version = pyspark.__version__
print("Apache Spark 버전 확인: " + spark_version)

Apache Spark 버전 확인: 3.5.1


## Spark
- RDD : 다수의 서버에 분산 방식으로 저장함

In [7]:
from pyspark.sql import SparkSession

# Spark 세션 활성화
my_spark = SparkSession.builder.getOrCreate()
my_spark

- 데이터베이스 확인

In [8]:
my_spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/content/spark-warehouse')]

- Spark SQL 쿼리 실행 (Database 보여주기)

In [9]:
my_spark.sql('show databases').show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [10]:
my_spark.catalog.currentDatabase()

'default'

In [11]:
# 기존 Spark 세션 종료
my_spark.stop()

In [12]:
# 새로운 Spark 세션 시작
my_spark = SparkSession.builder.master("local[1]").appName("SampleTutorial").getOrCreate()
rdd_sample = my_spark.sparkContext.parallelize([1, 2, 3, 4, 5])
print(type(rdd_sample))

<class 'pyspark.rdd.RDD'>


In [13]:
rdd_sample.take(num=2)

[1, 2]

## CSV 파일 불러오기

In [16]:
DATA_PATH = '/content/drive/MyDrive/멀티캠퍼스34/Spark/data/flight_small.csv'
flights = my_spark.read.option('header', 'true').csv(DATA_PATH)
flights.show(2)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 2 rows



- flights 데이터프레임을 default 데이터베이스에 추가

In [17]:
flights.createOrReplaceTempView('flights')

- default 데이터베이스에 데이터가 추가가 되었는지 확인

In [18]:
my_spark.catalog.listTables('default')

[Table(name='flights', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

- 데이터 조회할 때, SQL을 통해서 조회가 가능하다.


In [19]:
my_spark.sql('SHOW TABLES FROM default').show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |  flights|       true|
+---------+---------+-----------+



In [20]:
query = 'SELECT * FROM flights LIMIT 10'
# my_spark.sql(query).show()

flights10 = my_spark.sql(query)
flights10.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

- origin, dest GroupBy 연산 데이터갯수 확인
- pandas 데이터프레임으로 변환 (메서드 찾아보기)
- (~10:25)

In [21]:
query = """
  SELECT origin, dest, COUNT(*) AS CNT
  FROM flights
  GROUP BY origin, dest
"""

result_spark = my_spark.sql(query)
# result.show()

# pandas 데이터프레임으로 변환
result_pd = result_spark.toPandas()
result_pd.head()

Unnamed: 0,origin,dest,CNT
0,SEA,RNO,8
1,SEA,DTW,98
2,SEA,CLE,2
3,SEA,LAX,450
4,PDX,SEA,144


In [22]:
# Pandas 데이터프레임에서 Spark로 변환
import pandas as pd
import numpy as np

# Generate a pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a pandas DataFrame using Arrow
df = my_spark.createDataFrame(pdf)
df.show()

+-------------------+--------------------+--------------------+
|                  0|                   1|                   2|
+-------------------+--------------------+--------------------+
| 0.7699466129227253| 0.45378664591237317|  0.1656461871495607|
| 0.9852720150727177|  0.8451599561850905|  0.6804704210263616|
| 0.7563296654754759|  0.5534977231737475|  0.8042263589370456|
| 0.3788634964314268|  0.3007850531034202|    0.78288470867717|
| 0.1496665028230174|  0.9162565970755469|  0.8066164686585211|
| 0.5592554088759834|  0.1064594144356098|  0.1555091616016232|
| 0.4853461686490371|  0.0508865174508083|0.036331584816260754|
|  0.817765046244986|  0.4408120652553813| 0.07707525703322105|
| 0.1952325947636524| 0.13178488074797146| 0.07498389867737532|
| 0.1069621822103708| 0.15002855671810578|  0.7914425353277262|
|0.05630828760277651|  0.7683865838047338|  0.5955243596803127|
|0.12056593853327824| 0.36305725301516767|  0.0908390951920115|
| 0.5654921835393071|  0.974243660325443

In [None]:
# my_spark.stop()

## 정리할 시간
- 첫번째 : Spark 세션 생성
- 두번째 : 임의의 pandas 데이터프레임 생성
- 세번째 : Spark 데이터프레임으로 변환
- 네번째 : 변환된 데이터프레임을 데이터베이스에 추가
- 마지막 : listTables() 확인

## Chapter 3장

In [23]:
# Databricks notebook source
# MAGIC
# MAGIC %md
# MAGIC # Example 3.7

# from pyspark.sql.types import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, expr, when, concat, lit
from pyspark.sql import SparkSession

# define schema for our data
schema = (StructType([
   StructField("Id", IntegerType(), False),
   StructField("First", StringType(), False),
   StructField("Last", StringType(), False),
   StructField("Url", StringType(), False),
   StructField("Published", StringType(), False),
   StructField("Hits", IntegerType(), False),
   StructField("Campaigns", ArrayType(StringType()), False)]))

ddl_schema = "`Id` INT,`First` STRING,`Last` STRING,`Url` STRING,`Published` STRING,`Hits` INT,`Campaigns` ARRAY<STRING>"

# create our data
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
       [2, "Brooke","Wenig","https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
       [3, "Denny", "Lee", "https://tinyurl.3","6/7/2019",7659, ["web", "twitter", "FB", "LinkedIn"]],
       [4, "Tathagata", "Das","https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
       [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
       [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
      ]

# create a DataFrame using the schema defined above
spark = SparkSession.builder.master("local[1]").appName("SampleTutorial").getOrCreate()

blogs_df = spark.createDataFrame(data, ddl_schema)
blogs_df.show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



In [24]:
blogs_df2 = spark.createDataFrame(data, schema)
blogs_df2.show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



In [25]:
blogs_df2.printSchema()

root
 |-- Id: integer (nullable = false)
 |-- First: string (nullable = false)
 |-- Last: string (nullable = false)
 |-- Url: string (nullable = false)
 |-- Published: string (nullable = false)
 |-- Hits: integer (nullable = false)
 |-- Campaigns: array (nullable = false)
 |    |-- element: string (containsNull = true)



In [26]:
# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))

# Create spark_temp from pd_temp
spark_temp = my_spark.createDataFrame(pd_temp)

# Examine the tables in the catalog
print(my_spark.catalog.listTables())

# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView("temp")

# Examine the tables in the catalog again
print(my_spark.catalog.listTables())

[Table(name='flights', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]
[Table(name='flights', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True), Table(name='temp', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]


In [27]:
# default DB에 테이블이 2개 존재
# 그 중에서 내가 원하는 테이블 취사 선택하는 예제
flights_2 = my_spark.table('flights')
flights_2.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

## Spark 문법 활용 데이터 가공

In [28]:
# 데이터 컬럼 추가
# result = flights_2.
# flights_2['새로운'] = flights_2['air_time'] / 60
flights_2 = flights_2.withColumn("duration_hrs", flights_2.air_time/60)
flights_2.show(1)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|         2.2|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
only showing top 1 row



- 데이터를 필터링하는 코드 작성

In [29]:
result = flights_2.filter("distance >= 1000")
result.show(1)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|         6.0|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
only showing top 1 row



In [30]:
result2 = flights_2.filter(flights_2.distance > 1000)
result2.show(1)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|         6.0|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
only showing top 1 row



In [31]:
# 변수 선택, tailnum, origin, dest
result3 = flights_2.select("tailnum", "origin", "dest")
result3.show(1)

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N846VA|   SEA| LAX|
+-------+------+----+
only showing top 1 row



In [32]:
result4 = flights_2.select(flights_2.tailnum, flights_2.origin, flights_2.dest)
result4.show(1)

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N846VA|   SEA| LAX|
+-------+------+----+
only showing top 1 row



In [33]:
# 다중 필터 조건 걸어보기
filterA = flights_2.origin == "SEA"
filterB = flights_2.dest == "PDX"

tempA = result4.origin == "SEA"

print(type(filterA))
print(type(tempA))

selected = result4.filter(filterA).filter(filterB)
selected.show()

<class 'pyspark.sql.column.Column'>
<class 'pyspark.sql.column.Column'>
+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N810SK|   SEA| PDX|
| N822SK|   SEA| PDX|
| N586SW|   SEA| PDX|
| N223SW|   SEA| PDX|
| N580SW|   SEA| PDX|
| N520AS|   SEA| PDX|
| N809SK|   SEA| PDX|
| N295SW|   SEA| PDX|
| N221SW|   SEA| PDX|
| N294SW|   SEA| PDX|
| N581SW|   SEA| PDX|
| N563SW|   SEA| PDX|
| N297SW|   SEA| PDX|
| N564SW|   SEA| PDX|
| N468AS|   SEA| PDX|
| N229SW|   SEA| PDX|
| N565SW|   SEA| PDX|
| N580SW|   SEA| PDX|
| N817SK|   SEA| PDX|
| N564SW|   SEA| PDX|
+-------+------+----+
only showing top 20 rows



In [34]:
# avg_speed
avg_speed = (flights_2.distance/(flights_2.air_time/60)).alias("avg_speed")
speed_df = flights.select("origin", "dest", "tailnum", avg_speed)
speed_df.show()

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
|   PDX| DEN| N646SW|491.40495867768595|
|   PDX| OAK| N422WN|             362.0|
|   SEA| SFO| N361VA| 415.7142857142857|
|   SEA| SAN| N309AS| 466.6666666666667|
|   SEA| ORD| N564AS| 521.5151515151515|
|   SEA| LAX| N323AS| 440.3076923076923|
|   SEA| PHX| N305AS|431.29870129870125|
|   SEA| LAS| N433AS| 409.6062992125984|
|   SEA| ANC| N765AS|474.75409836065575|
|   SEA| SFO| N713AS| 315.8139534883721|
|   PDX| SFO| N27205| 366.6666666666667|
|   SEA| SMF| N626AS|477.63157894736844|
|   SEA| MDW| N8634A|481.38888888888886|
|   SEA| BOS| N597AS| 516.4137931034483|
|   PDX| BUR| N215AG| 441.6216216216216|
+------+----+-------+------------------+
only showing top

In [35]:
speed_df2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) AS avg_speed")
speed_df2.show()

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
|   PDX| DEN| N646SW|491.40495867768595|
|   PDX| OAK| N422WN|             362.0|
|   SEA| SFO| N361VA| 415.7142857142857|
|   SEA| SAN| N309AS| 466.6666666666667|
|   SEA| ORD| N564AS| 521.5151515151515|
|   SEA| LAX| N323AS| 440.3076923076923|
|   SEA| PHX| N305AS|431.29870129870125|
|   SEA| LAS| N433AS| 409.6062992125984|
|   SEA| ANC| N765AS|474.75409836065575|
|   SEA| SFO| N713AS| 315.8139534883721|
|   PDX| SFO| N27205| 366.6666666666667|
|   SEA| SMF| N626AS|477.63157894736844|
|   SEA| MDW| N8634A|481.38888888888886|
|   SEA| BOS| N597AS| 516.4137931034483|
|   PDX| BUR| N215AG| 441.6216216216216|
+------+----+-------+------------------+
only showing top

## 집계함수
- groupby() + 집계함수

In [36]:
flights_2.printSchema()

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [37]:
# distance String --> int
flights_2 = flights_2.withColumn('distance', flights_2.distance.cast('int'))
flights_2.printSchema()

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- duration_hrs: double (nullable = true)



In [38]:
# origin, PDX 추출, groupby, distance 최소
flights_2.filter(flights_2.origin == "PDX").groupby("tailnum").sum("distance").show()

+-------+-------------+
|tailnum|sum(distance)|
+-------+-------------+
| N442AS|         8167|
| N36472|         5311|
| N567AA|         1616|
| N38451|         1739|
| N516UA|          991|
| N927DN|         1426|
| N954WN|         2103|
| N73283|          550|
| N102UW|         2282|
| N607AS|        11183|
| N622SW|         1199|
| N584AS|        13063|
| N914WN|          991|
| N445WN|          543|
| N3LDAA|         1739|
| N389HA|         5206|
| N578SW|          351|
| N430WN|         2518|
| N651SW|         1009|
| N611SW|         1772|
+-------+-------------+
only showing top 20 rows



In [39]:
# carrier
result = (flights_2
          .filter(flights_2.origin == "PDX")
          .filter(flights_2.carrier == "DL")
          .groupby("dest")
          .avg("distance"))
result.show()

+----+-------------+
|dest|avg(distance)|
+----+-------------+
| ATL|       2172.0|
| DTW|       1953.0|
| MSP|       1426.0|
| JFK|       2454.0|
| SLC|        630.0|
+----+-------------+



In [40]:
# carrier
result = (flights_2
          .filter(flights_2.origin == "PDX")
          .where(col("carrier") == "DL") # filter(flights_2.carrier == "DL")
          .groupby("dest")
          .avg("distance"))
result.show()

+----+-------------+
|dest|avg(distance)|
+----+-------------+
| ATL|       2172.0|
| DTW|       1953.0|
| MSP|       1426.0|
| JFK|       2454.0|
| SLC|        630.0|
+----+-------------+



In [41]:
import pyspark.sql.functions as F

flights_2= flights_2.withColumn('dep_delay', flights_2.distance.cast('int'))
by_month_dest = flights_2.groupBy("month", "dest")
by_month_dest.avg("dep_delay").show()

+-----+----+------------------+
|month|dest|    avg(dep_delay)|
+-----+----+------------------+
|   11| TUS|1183.6666666666667|
|   11| ANC|1453.5294117647059|
|    1| BUR|             877.0|
|    1| PDX|             129.0|
|    6| SBA|             877.0|
|    5| LAX| 919.7142857142857|
|   10| DTW|            1927.0|
|    6| SIT|             861.0|
|   10| DFW|1647.0588235294117|
|    3| FAI|            1533.0|
|   10| SEA|             129.0|
|    2| TUS|            1119.0|
|   12| OGG| 2625.818181818182|
|    9| DFW|1645.3333333333333|
|    5| EWR|2404.6666666666665|
|    3| RDM|             116.0|
|    8| DCA|            2335.3|
|    7| ATL|2178.2162162162163|
|    4| JFK|2431.1428571428573|
|   10| SNA| 962.1333333333333|
+-----+----+------------------+
only showing top 20 rows



In [42]:
by_month_dest.agg(F.stddev("dep_delay")).show()

+-----+----+------------------+
|month|dest| stddev(dep_delay)|
+-----+----+------------------+
|   11| TUS|  56.0029761113937|
|   11| ANC|22.450261919397587|
|    1| BUR| 61.55870112510924|
|    1| PDX|               0.0|
|    6| SBA| 62.00000000000001|
|    5| LAX| 54.56595650240686|
|   10| DTW|               0.0|
|    6| SIT|              NULL|
|   10| DFW|20.349880762766734|
|    3| FAI|               0.0|
|   10| SEA|               0.0|
|    2| TUS|               0.0|
|   12| OGG|31.552553563279734|
|    9| DFW|21.096385265356872|
|    5| EWR| 9.237604307033997|
|    3| RDM|               0.0|
|    8| DCA|10.143963722332614|
|    7| ATL| 4.916723926983699|
|    4| JFK|15.001831390031716|
|   10| SNA| 41.87202725766531|
+-----+----+------------------+
only showing top 20 rows



## 테이블 조인

In [43]:
my_spark.stop()

In [46]:
from pyspark.sql import SparkSession
my_spark = SparkSession.builder.master("local[1]").appName("SampleTutorial").getOrCreate()

DATA_PATH = '/content/drive/MyDrive/멀티캠퍼스34/Spark/data/'

airports = my_spark.read.csv(DATA_PATH + "airports.csv", header=True)
airports.show(1)

flights = my_spark.read.csv(DATA_PATH + "flight_small.csv", header=True)
flights.show(1)

planes = my_spark.read.csv(DATA_PATH + "planes.csv", header=True)
planes.show(1)

+---+-----------------+----------+-----------+----+---+---+
|faa|             name|       lat|        lon| alt| tz|dst|
+---+-----------------+----------+-----------+----+---+---+
|04G|Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
+---+-----------------+----------+-----------+----+---+---+
only showing top 1 row

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 1 row

+-------+----+--------------------+----------------+

In [47]:
airports.show(10)

+---+--------------------+----------+------------+----+---+---+
|faa|                name|       lat|         lon| alt| tz|dst|
+---+--------------------+----------+------------+----+---+---+
|04G|   Lansdowne Airport|41.1304722| -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722| -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408| -88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912| -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722| -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|36.3712222| -82.1734167|1593| -4|  A|
|0G6|Williams County A...|41.4673056| -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|42.8835647| -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|39.7948244| -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|48.0538086|-122.8106436| 108| -8|  A|
+---+--------------------+----------+------------+----+---+---+
only showing top 10 rows



In [48]:
flights.show(10)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [49]:
# 컬럼명 변경
airports = airports.withColumnRenamed("faa", "dest")
airports.show(1)

+----+-----------------+----------+-----------+----+---+---+
|dest|             name|       lat|        lon| alt| tz|dst|
+----+-----------------+----------+-----------+----+---+---+
| 04G|Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
+----+-----------------+----------+-----------+----+---+---+
only showing top 1 row



In [50]:
result = flights.join(airports, on = 'dest', how="leftouter")
result.show()

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+--------------------+---------+-----------+----+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|                name|      lat|        lon| alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+--------------------+---------+-----------+----+---+---+
| LAX|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA|     132|     954|   6|    58|    Los Angeles Intl|33.942536|-118.408075| 126| -8|  A|
| HNL|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA|     360|    2677|  10|    40|       Honolulu Intl|21.318681|-157.922428|  13|-10|  N|
| SFO|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA|     111|     679|  14|    43| 

In [51]:
my_spark.stop()

## zip 파일 데이터 불러오기

In [52]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("Sample").getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7956fffe2d70>


In [54]:
import pyspark.sql.functions as F

# CSV 파일 불러오기
DATA_PATH = '/content/drive/MyDrive/멀티캠퍼스34/Spark/data/'
aa_dfw_2014 = spark.read.format("csv").options(Header=True).load(DATA_PATH + "AA_DFW_2014_Departures_Short.csv.gz")
aa_dfw_2014.show(1)

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2014|         0005|                HNL|                          519|
+-----------------+-------------+-------------------+-----------------------------+
only showing top 1 row



In [55]:
aa_dfw_2014 = aa_dfw_2014.withColumn('airport', F.lower(aa_dfw_2014['Destination Airport']))
aa_dfw_2014.show(1)

+-----------------+-------------+-------------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-------------------+-----------------------------+-------+
|       01/01/2014|         0005|                HNL|                          519|    hnl|
+-----------------+-------------+-------------------+-----------------------------+-------+
only showing top 1 row



In [56]:
# 특정 컬럼 삭제
aa_dfw_2014 = aa_dfw_2014.drop(aa_dfw_2014['Destination Airport'])
aa_dfw_2014.show()

+-----------------+-------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-----------------------------+-------+
|       01/01/2014|         0005|                          519|    hnl|
|       01/01/2014|         0007|                          505|    ogg|
|       01/01/2014|         0035|                          174|    slc|
|       01/01/2014|         0043|                          153|    dtw|
|       01/01/2014|         0052|                          137|    pit|
|       01/01/2014|         0058|                          174|    san|
|       01/01/2014|         0060|                          155|    mia|
|       01/01/2014|         0064|                          185|    jfk|
|       01/01/2014|         0090|                          126|    ord|
|       01/01/2014|         0096|                           91|    stl|
|       01/01/2014|         0099|                          182| 

In [57]:
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
import pyarrow as pa

df = pd.DataFrame({'one': [-1, 100, 2.5],
                   'two': ['foo', 'bar', 'baz'],
                   'three': [True, False, True]}, index=list('abc'))

table = pa.Table.from_pandas(df)
pq.write_table(table, DATA_PATH + 'example.parquet')

- 데이터 불러오기

In [58]:
example_df = spark.read.parquet(DATA_PATH + 'example.parquet')
example_df.show(1)

+----+---+-----+-----------------+
| one|two|three|__index_level_0__|
+----+---+-----+-----------------+
|-1.0|foo| true|                a|
+----+---+-----+-----------------+
only showing top 1 row



In [59]:
voter_df = spark.read.format('csv').options(Header=True).load(DATA_PATH + 'DallasCouncilVoters.csv.gz')
voter_df.show()

+----------+-------------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|
+----------+-------------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember|       Scott Griggs|
|02/08/2017|Councilmember|   B. Adam  McGough|
|02/08/2017|Councilmember|       Lee Kleinman|
|02/08/2017|Councilmember|      Sandy Greyson|
|02/08/2017|Councilmember|  Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|
|02/08/2017|Councilmember| Rickey D. Callahan|
|01/11/2017|Councilmember|  Jennifer S. Gates|
|04/25/2018|C

In [60]:
# 중복값 제거
voter_df.select(voter_df['VOTER_NAME']).distinct().show(10)

+--------------------+
|          VOTER_NAME|
+--------------------+
|      Tennell Atkins|
|  the  final   20...|
|        Scott Griggs|
|       Scott  Griggs|
|       Sandy Greyson|
| Michael S. Rawlings|
| the final 2018 A...|
|        Kevin Felder|
|        Adam Medrano|
|       Casey  Thomas|
+--------------------+
only showing top 10 rows



### 문제
- 객체명 : result
- 메서드 filter() 사용
- length() 활용해서  0보다 크고, 20보다 작은 VOTER_NAME을 가져온다.
- 결과

In [61]:
voter_df = voter_df.filter('length(VOTER_NAME) > 0 and length(VOTER_NAME) < 20')
voter_df.select('VOTER_NAME').distinct().show(10, truncate=False)

+-------------------+
|VOTER_NAME         |
+-------------------+
|Tennell Atkins     |
|Scott Griggs       |
|Scott  Griggs      |
|Sandy Greyson      |
|Michael S. Rawlings|
|Kevin Felder       |
|Adam Medrano       |
|Casey  Thomas      |
|011018__42         |
|Mark  Clayton      |
+-------------------+
only showing top 10 rows



- 밑줄이 그어진 행 `011018__42`은 제거를 하자.

In [62]:
voter_df = voter_df.filter(~F.col('VOTER_NAME').contains('_'))
voter_df.select('VOTER_NAME').distinct().show(10, truncate=False)

+-------------------+
|VOTER_NAME         |
+-------------------+
|Tennell Atkins     |
|Scott Griggs       |
|Scott  Griggs      |
|Sandy Greyson      |
|Michael S. Rawlings|
|Kevin Felder       |
|Adam Medrano       |
|Casey  Thomas      |
|Mark  Clayton      |
|Casey Thomas       |
+-------------------+
only showing top 10 rows



## Spark 데이터프레임 수정
- 문자열 다루는 코드

In [64]:
# 정규표현식('\s+') : 공백을 기준으로 문자열을 분리 하겠다.
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, '\s+'))
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))
voter_df = voter_df.drop('splits')
voter_df.show(1)



+----------+-------------+-----------------+----------+---------+
|      DATE|        TITLE|       VOTER_NAME|first_name|last_name|
+----------+-------------+-----------------+----------+---------+
|02/08/2017|Councilmember|Jennifer S. Gates|  Jennifer|    Gates|
+----------+-------------+-----------------+----------+---------+
only showing top 1 row



## when() 조건문 활용법

In [69]:
import pyspark.sql.functions as F

# Councilmember 일 경우에는 무작위 숫자를 입력
voter_df = voter_df.withColumn('random_val', F.when(voter_df.TITLE == 'Councilmember', F.rand()))
voter_df.show(10)

+----------+-------------+-------------------+----------+---------+-------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|         random_val|
+----------+-------------+-------------------+----------+---------+-------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates| 0.6890153932838178|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston| 0.7660026674274555|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|               NULL|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano| 0.8187885459450652|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas| 0.6855990001349068|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold|    0.8179771995019|
|02/08/2017|Councilmember|       Scott Griggs|     Scott|   Griggs| 0.2332541053554581|
|02/08/2017|Councilmember|   B. Adam  McGough|        B.|  McGough| 0.6846945392367293|
|02/08/2017|Councilmember|      

### 다중조건문
- Councilmember 일 경우에는 무작위 숫자
- Mayor 일 경우에는 숫자 2를 입력
- 그 외 나머지는 0으로 (otherwise)

In [73]:
voter_df = voter_df.withColumn('random_val',
                               F.when(voter_df.TITLE == 'Councilmember', F.rand())
                               .when(voter_df.TITLE == 'Mayor', 2)
                               .otherwise(0))
voter_df.show(10)

+----------+-------------+-------------------+----------+---------+--------------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|          random_val|
+----------+-------------+-------------------+----------+---------+--------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|  0.9364138128782483|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston| 0.19888420476418855|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                 2.0|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|  0.5941073119978861|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|0.013628293134853697|
|02/08/2017|Councilmember|Carolyn King Arnold|   Carolyn|   Arnold| 0.24306385463714986|
|02/08/2017|Councilmember|       Scott Griggs|     Scott|   Griggs|  0.1306727753581427|
|02/08/2017|Councilmember|   B. Adam  McGough|        B.|  McGough| 0.06829657485199203|
|02/08/2017|Councilme

### 행 추출
- random_val 중에서 0인 경우만 추출
- filter()

In [76]:
voter_df.filter(voter_df.random_val == 0).show(10)

+----------+--------------------+-----------------+----------+---------+----------+
|      DATE|               TITLE|       VOTER_NAME|first_name|last_name|random_val|
+----------+--------------------+-----------------+----------+---------+----------+
|04/25/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|04/25/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|06/20/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|06/20/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|06/20/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|06/20/2018|       Mayor Pro Tem|Dwaine R. Caraway|    Dwaine|  Caraway|       0.0|
|08/15/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|08/15/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|       0.0|
|09/18/2018|Deputy Mayor Pro Tem|     Adam Medrano|      Adam|  Medrano|    

## 사용자 정의 함수 활용
- 기존에는 주어진 메서드 쓰는 것에 초점
- 사용자 정의 함수를 Spark에서 어떻게 활용하는지 확인해보자
  + Point : return 값의 데이터타입을 정의해줘야 한다.

- 미션
-- first, middle name 동시에 가져오기

In [78]:
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, '\s+'))
voter_df.show(1)

+----------+-------------+-----------------+----------+---------+------------------+--------------------+
|      DATE|        TITLE|       VOTER_NAME|first_name|last_name|        random_val|              splits|
+----------+-------------+-----------------+----------+---------+------------------+--------------------+
|02/08/2017|Councilmember|Jennifer S. Gates|  Jennifer|    Gates|0.9364138128782483|[Jennifer, S., Ga...|
+----------+-------------+-----------------+----------+---------+------------------+--------------------+
only showing top 1 row



In [83]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf

@udf(returnType = StringType())
def getFirstAndMiddle(names):
  return ' '.join(names[:-1])

# udfgetFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

voter_df = voter_df.withColumn('result', getFirstAndMiddle(voter_df.splits))
voter_df.show(10)

+----------+-------------+-------------------+----------+---------+--------------------+--------------------+------------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|          random_val|              splits|      result|
+----------+-------------+-------------------+----------+---------+--------------------+--------------------+------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|  0.9364138128782483|[Jennifer, S., Ga...| Jennifer S.|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston| 0.19888420476418855|[Philip, T., King...|   Philip T.|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                 2.0|[Michael, S., Raw...|  Michael S.|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|  0.5941073119978861|     [Adam, Medrano]|        Adam|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|0.013628293134853697|     [Casey, Thomas]|       Casey|
|02/08/2017|Coun

### ROW_ID 추가
- 기존 데이터프레임에서 ROW_ID

In [85]:
voter_df = voter_df.withColumn('ROW_ID', F.monotonically_increasing_id()+1)
voter_df.show()

+----------+-------------+-------------------+----------+---------+--------------------+--------------------+------------+------+
|      DATE|        TITLE|         VOTER_NAME|first_name|last_name|          random_val|              splits|      result|ROW_ID|
+----------+-------------+-------------------+----------+---------+--------------------+--------------------+------------+------+
|02/08/2017|Councilmember|  Jennifer S. Gates|  Jennifer|    Gates|  0.9364138128782483|[Jennifer, S., Ga...| Jennifer S.|     1|
|02/08/2017|Councilmember| Philip T. Kingston|    Philip| Kingston| 0.19888420476418855|[Philip, T., King...|   Philip T.|     2|
|02/08/2017|        Mayor|Michael S. Rawlings|   Michael| Rawlings|                 2.0|[Michael, S., Raw...|  Michael S.|     3|
|02/08/2017|Councilmember|       Adam Medrano|      Adam|  Medrano|  0.5941073119978861|     [Adam, Medrano]|        Adam|     4|
|02/08/2017|Councilmember|       Casey Thomas|     Casey|   Thomas|0.013628293134853697|  

In [87]:
result_df = voter_df.select(voter_df['VOTER_NAME']).distinct()
result_df.count()

27

In [96]:
result_df = result_df.withColumn('ROW_ID', F.monotonically_increasing_id()+1)
result_df.orderBy(result_df.ROW_ID.desc()).show(10)

+-------------------+------+
|         VOTER_NAME|ROW_ID|
+-------------------+------+
|       Lee Kleinman|    27|
|        Erik Wilson|    26|
|Carolyn King Arnold|    25|
|Rickey D.  Callahan|    24|
|   Monica R. Alonzo|    23|
|    Lee M. Kleinman|    22|
|  Jennifer S. Gates|    21|
|Philip T.  Kingston|    20|
|  Dwaine R. Caraway|    19|
| Rickey D. Callahan|    18|
+-------------------+------+
only showing top 10 rows



In [97]:
# ROW_ID의 값 중 가장 큰 값을 추출
result_df.select('ROW_ID').rdd.max()[0]

27

## 속도 측정

In [101]:
import time
import pyspark.sql.functions as F

departures_df = spark.read.format("csv").options(Header=True).load(DATA_PATH + "AA_DFW_2014_Departures_Short.csv.gz")
departures_df.show(1)

+-----------------+-------------+-------------------+-----------------------------+
|Date (MM/DD/YYYY)|Flight Number|Destination Airport|Actual elapsed time (Minutes)|
+-----------------+-------------+-------------------+-----------------------------+
|       01/01/2014|         0005|                HNL|                          519|
+-----------------+-------------+-------------------+-----------------------------+
only showing top 1 row



In [105]:
start_time = time.time()

# cache() 메서드를 활용해서 속도가 실제로 향상이 되는지 확인한다.
# 처음 메서드 실행할 때와, 재실행 시 속도가 좀 빨라지더라

departuers_df = departures_df.distinct().cache() # 1차 가공된 데이터를 임시 저장
print(departures_df.count(), time.time() - start_time) # 2차 가공 시작

157198 0.28300952911376953


In [106]:
start_time = time.time()
print(departures_df.count(), time.time() - start_time)

157198 0.23595118522644043


In [107]:
# cache에 저장이 되엄ㅆ는지 확인하자.
departuers_df.is_cached

True

In [108]:
# cache 메모리에 있는 것을 제거하자
departuers_df.unpersist()

DataFrame[Date (MM/DD/YYYY): string, Flight Number: string, Destination Airport: string, Actual elapsed time (Minutes): string]

In [109]:
departuers_df.is_cached

False

- 메모리 : 가정집
- 캐시메모리 : 사물보관함(역)

In [110]:
# Partitions
app_name = spark.conf.get('spark.app.name')
driver_tcp_port = spark.conf.get('spark.driver.port')
num_partitions = spark.conf.get('spark.sql.shuffle.partitions')
print("Number of partitions: %s" % num_partitions)

print("Name: %s" % app_name)
print("Driver TCP port: %s" % driver_tcp_port)
print("Number of partitions: %s" % num_partitions)

Number of partitions: 200
Name: Sample
Driver TCP port: 43467
Number of partitions: 200


- 쿼리플랜 : MySQL, Oracle에서 학습을 하는 것이 더 의미가 있음
  + 중요한 것 : Spark에서 이러한 쿼리 플랜이 존재함