In [50]:
# AWS-LINUX

import findspark
findspark.init("/opt/spark")

In [51]:
import pyspark
from pyspark.sql import SparkSession

In [52]:
spark = SparkSession.builder.getOrCreate()
spark

![spark1](images/spark1.png)

## 리스트객체로 RDD 객체 생성하기

### RDD(Resilient Distributed Dataset)
#### read-only 데이터셋으로서 다양한 머신에 데이터셋의 멀티셋(중복을 허용)을 분산해두고 특정한 머신에 문제가 생기더라도 문제없이 읽을수로 있도록 지원한다

- MapReduce 작업
- 분산하여 병렬적 처리
- 빠른 연산
- 불변(Immutable)
- Transformation 과 Action 으로 함수 종류가 나눠지며, Action 함수가 실행됐을 때 실제 연산
- Lineage 를 통해 Fault Tolerant(내고장성) 보장

In [53]:
dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
rdd=spark.sparkContext.parallelize(dataList)
print(rdd)
print(type(rdd))
print(rdd.collect())

ParallelCollectionRDD[228] at parallelize at PythonRDD.scala:195
<class 'pyspark.rdd.RDD'>
[('Java', 20000), ('Python', 100000), ('Scala', 3000)]


In [54]:
print(spark.version)

2.4.7


## 텍스트 파일 내용 읽어서 RDD 객체 생성하기

In [55]:
rdd = spark.read.text("data/korean_stopwords.txt")
print(type(rdd))
print(rdd.collect())

<class 'pyspark.sql.dataframe.DataFrame'>
[Row(value='아'), Row(value='휴'), Row(value='아이구'), Row(value='아이쿠'), Row(value='아이고'), Row(value='어'), Row(value='나'), Row(value='우리'), Row(value='저희'), Row(value='따라'), Row(value='의해'), Row(value='을'), Row(value='를'), Row(value='에'), Row(value='의'), Row(value='가'), Row(value='으로'), Row(value='로'), Row(value='에게'), Row(value='뿐이다'), Row(value='의거하여'), Row(value='근거하여'), Row(value='입각하여'), Row(value='기준으로'), Row(value='예하면'), Row(value='예를 들면'), Row(value='예를 들자면'), Row(value='저'), Row(value='소인'), Row(value='소생'), Row(value='저희'), Row(value='지말고'), Row(value='하지마'), Row(value='하지마라'), Row(value='다른'), Row(value='물론'), Row(value='또한'), Row(value='그리고'), Row(value='비길수 없다'), Row(value='해서는 안된다'), Row(value='뿐만 아니라'), Row(value='만이 아니다'), Row(value='만은 아니다'), Row(value='막론하고'), Row(value='관계없이'), Row(value='그치지 않다'), Row(value='그러나'), Row(value='그런데'), Row(value='하지만'), Row(value='든간에'), Row(value='논하지 않다'), Row(value='따지지 않다'), Row(value='설사'), R

In [56]:
rdd = spark.sparkContext.textFile("data/korean_stopwords.txt")
print(type(rdd))
print(rdd.collect())

<class 'pyspark.rdd.RDD'>
['아', '휴', '아이구', '아이쿠', '아이고', '어', '나', '우리', '저희', '따라', '의해', '을', '를', '에', '의', '가', '으로', '로', '에게', '뿐이다', '의거하여', '근거하여', '입각하여', '기준으로', '예하면', '예를 들면', '예를 들자면', '저', '소인', '소생', '저희', '지말고', '하지마', '하지마라', '다른', '물론', '또한', '그리고', '비길수 없다', '해서는 안된다', '뿐만 아니라', '만이 아니다', '만은 아니다', '막론하고', '관계없이', '그치지 않다', '그러나', '그런데', '하지만', '든간에', '논하지 않다', '따지지 않다', '설사', '비록', '더라도', '아니면', '만 못하다', '하는 편이 낫다', '불문하고', '향하여', '향해서', '향하다', '쪽으로', '틈타', '이용하여', '타다', '오르다', '제외하고', '이 외에', '이 밖에', '하여야', '비로소', '한다면 몰라도', '외에도', '이곳', '여기', '부터', '기점으로', '따라서', '할 생각이다', '하려고하다', '이리하여', '그리하여', '그렇게 함으로써', '하지만', '일때', '할때', '앞에서', '중에서', '보는데서', '으로써', '로써', '까지', '해야한다', '일것이다', '반드시', '할줄알다', '할수있다', '할수있어', '임에 틀림없다', '한다면', '등', '등등', '제', '겨우', '단지', '다만', '할뿐', '딩동', '댕그', '대해서', '대하여', '대하면', '훨씬', '얼마나', '얼마만큼', '얼마큼', '남짓', '여', '얼마간', '약간', '다소', '좀', '조금', '다수', '몇', '얼마', '지만', '하물며', '또한', '그러나', '그렇지만', '하지만', '이외에도', '대해 말하자면', '뿐이다', '다음에', '반

## 생성한 RDD 객체 Spark DataFrame 으로 변환하기

### Spark DataFrame
- DataFrame은 명명 된 열로 구성된 데이터 세트 
- 개념적으로는 관계형 데이터베이스의 테이블 또는 R / Python의 데이터 프레임과 동일하지만 내부적으로 더욱  최적화가 있음
- RDB Table처럼 Schema를 가지고 있고 RDB의 Table 연산이 가능
- 구조화 된 데이터 파일, Hive의 테이블, 외부 데이터베이스 또는 기존 RDD와 같은 다양한 소스 에서 구성 할 수 있늠 
- DataFrame API는 Scala, Java, Python 및 R 에서 사용할 수 있음
- SparkSQL을 통해 사용 가능

In [57]:
dept = [("Finance",10), 
        ("Marketing",20), 
        ("Sales",30), 
        ("IT",40) 
      ]
rdd = spark.sparkContext.parallelize(dept)
print(rdd.collect())

[('Finance', 10), ('Marketing', 20), ('Sales', 30), ('IT', 40)]


In [58]:
df = rdd.toDF()
df.printSchema()
df.show()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

+---------+---+
|       _1| _2|
+---------+---+
|  Finance| 10|
|Marketing| 20|
|    Sales| 30|
|       IT| 40|
+---------+---+



In [59]:
deptColumns = ["dept_name","dept_id"]
df2 = rdd.toDF(deptColumns)
df2.printSchema()
df2.show(truncate=False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



In [60]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
print(type(df))
print(df)
df.printSchema()
df.show()

<class 'pyspark.sql.dataframe.DataFrame'>
DataFrame[firstname: string, middlename: string, lastname: string, dob: string, gender: string, salary: bigint]
root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [61]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



## CSV 파일 내용 읽어서 DataFrame 객체 생성하기

In [62]:
df = spark.read.csv("data/emp.csv")
df.printSchema()
df.show()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)

+-----+------+---------+----+----------+----+----+------+
|  _c0|   _c1|      _c2| _c3|       _c4| _c5| _c6|   _c7|
+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|null|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-03|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|1981-03-02|2975|null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-10-22|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|null|    30|
| 7782| CLARK|  MANAGER|7839|1981-09-06|2450|null|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-08|3000|null|    20|
| 7839|  KING|PRES

In [63]:
emp = spark.read.csv("data/emp.csv", header=True)
emp.printSchema()
emp.show()

root
 |-- empno: string (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- mgr: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: string (nullable = true)
 |-- comm: string (nullable = true)
 |-- deptno: string (nullable = true)

+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|null|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-03|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|1981-03-02|2975|null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-10-22|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|null|    30|
| 7782| CLARK|  MANAGER|7839|1981-09-06|2450|null|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-08|3000|null|    20|
| 7839|  KING|PRESIDENT|null|1981-11-17|5000|null|    10|
| 784

In [64]:
emp = spark.read.csv("data/emp.csv", header=True, inferSchema=True)
emp.printSchema()
emp.show()

root
 |-- empno: integer (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- mgr: integer (nullable = true)
 |-- hiredate: timestamp (nullable = true)
 |-- sal: integer (nullable = true)
 |-- comm: integer (nullable = true)
 |-- deptno: integer (nullable = true)

+-----+------+---------+----+-------------------+----+----+------+
|empno| ename|      job| mgr|           hiredate| sal|comm|deptno|
+-----+------+---------+----+-------------------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980-12-17 00:00:00| 800|null|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20 00:00:00|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-03 00:00:00|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|1981-03-02 00:00:00|2975|null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-10-22 00:00:00|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01 00:00:00|2850|null|    30|
| 7782| CLARK|  MANAGER|7839|1981-09-06 00:00:00|2450|null|    10|
| 7788| SCOTT|  ANALYST

In [65]:
df = spark.read.csv("data/mpgdata.csv")
df.printSchema()
df.show()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)

+---+---------+------------+----------+------+------------+----------+
|_c0|      _c1|         _c2|       _c3|   _c4|         _c5|       _c6|
+---+---------+------------+----------+------+------------+----------+
|mpg|cylinders|displacement|horsepower|weight|acceleration|model-year|
| 18|        8|         307|       130|  3504|          12|        70|
| 15|        8|         350|       165|  3693|        11.5|        70|
| 18|        8|         318|       150|  3436|          11|        70|
| 16|        8|         304|       150|  3433|          12|        70|
| 17|        8|         302|       140|  3449|        10.5|        70|
| 15|        8|         429|       198|  4341|          10|        70|
| 14|        8|         454|       220|

In [66]:
df = spark.read.load("data/iris.csv",
                     format="csv", sep=",", inferSchema=True, header=True)
df.printSchema()
df.show()

root
 |-- sepal.length: double (nullable = true)
 |-- sepal.width: double (nullable = true)
 |-- petal.length: double (nullable = true)
 |-- petal.width: double (nullable = true)
 |-- variety: string (nullable = true)

+------------+-----------+------------+-----------+-------+
|sepal.length|sepal.width|petal.length|petal.width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
|         5.4|        3.9|         1.7|        0.4| Setosa|
|         4.6|        3.4|         1.4|        0.3| Setosa|
|         5.0|        3.4|         1.5|        0.2| Setosa|
|         4.4|        2.9|         1.4|        0.2| Setosa|
|         4.9|        3.1|         1.5|        0.1| Setosa|
|

## JSON 파일 내용 읽어서 DataFrame 객체 생성하기

In [67]:
df = spark.read.json("data/seoul_geo.json")
df.show()

+--------------------+--------------------+-----------------------+-------+
|     _corrupt_record|            geometry|             properties|   type|
+--------------------+--------------------+-----------------------+-------+
|                   {|                null|                   null|   null|
|"type": "FeatureC...|                null|                   null|   null|
|       "features": [|                null|                   null|   null|
|                null|[[[[127.115195849...|[2013, 11250, 강동구...|Feature|
|                null|[[[[127.069069813...|[2013, 11240, 송파구...|Feature|
|                null|[[[[127.058673592...|[2013, 11230, 강남구...|Feature|
|                null|[[[[127.013971196...|[2013, 11220, 서초구...|Feature|
|                null|[[[[126.961089890...|[2013, 11210, 관악구...|Feature|
|                null|[[[[126.982238079...|[2013, 11200, 동작구...|Feature|
|                null|[[[[126.891846638...|[2013, 11190, 영등포...|Feature|
|                null|[[[[126.901

In [68]:
df = spark.read.load("data/seoul_geo.json", format="json")
df.show()

+--------------------+--------------------+-----------------------+-------+
|     _corrupt_record|            geometry|             properties|   type|
+--------------------+--------------------+-----------------------+-------+
|                   {|                null|                   null|   null|
|"type": "FeatureC...|                null|                   null|   null|
|       "features": [|                null|                   null|   null|
|                null|[[[[127.115195849...|[2013, 11250, 강동구...|Feature|
|                null|[[[[127.069069813...|[2013, 11240, 송파구...|Feature|
|                null|[[[[127.058673592...|[2013, 11230, 강남구...|Feature|
|                null|[[[[127.013971196...|[2013, 11220, 서초구...|Feature|
|                null|[[[[126.961089890...|[2013, 11210, 관악구...|Feature|
|                null|[[[[126.982238079...|[2013, 11200, 동작구...|Feature|
|                null|[[[[126.891846638...|[2013, 11190, 영등포...|Feature|
|                null|[[[[126.901

## 파케이 파일 내용 읽어서 DataFrame 객체 생성하기

In [69]:
df = spark.read.load("data/userdata1.parquet")
df = df.select("first_name", "last_name", "email")
df.show()

+----------+---------+--------------------+
|first_name|last_name|               email|
+----------+---------+--------------------+
|    Amanda|   Jordan|    ajordan0@com.com|
|    Albert|  Freeman|     afreeman1@is.gd|
|    Evelyn|   Morgan|emorgan2@altervis...|
|    Denise|    Riley|    driley3@gmpg.org|
|    Carlos|    Burns|cburns4@miitbeian...|
|   Kathryn|    White|  kwhite5@google.com|
|    Samuel|   Holmes|sholmes6@foxnews.com|
|     Harry|   Howell| hhowell7@eepurl.com|
|      Jose|   Foster|   jfoster8@yelp.com|
|     Emily|  Stewart|estewart9@opensou...|
|     Susan|  Perkins| sperkinsa@patch.com|
|     Alice|    Berry|aberryb@wikipedia...|
|    Justin|    Berry|jberryc@usatoday.com|
|     Kathy| Reynolds|kreynoldsd@redcro...|
|   Dorothy|   Hudson|dhudsone@blogger.com|
|     Bruce|   Willis|bwillisf@bluehost...|
|     Emily|  Andrews|eandrewsg@cornell...|
|   Stephen|  Wallace|swallaceh@netvibe...|
|  Clarence|   Lawson|clawsoni@vkontakt...|
|   Rebecca|     Bell| rbellj@ba

## 직접 만든 DataFrame 객체 생성하여 정보 출력하기

In [70]:
data = [("James","","Smith","36636","M",60000),
        ("Michael","Rose","","40288","M",70000),
        ("Robert","","Williams","42114","",400000),
        ("Maria","Anne","Jones","39192","F",500000),
        ("Jen","Mary","Brown","","F",0)]

columns = ["first_name","middle_name","last_name","dob","gender","salary"]
pysparkDF = spark.createDataFrame(data = data, schema = columns)
pysparkDF.printSchema()
pysparkDF.show(truncate=False)

root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name|dob  |gender|salary|
+----------+-----------+---------+-----+------+------+
|James     |           |Smith    |36636|M     |60000 |
|Michael   |Rose       |         |40288|M     |70000 |
|Robert    |           |Williams |42114|      |400000|
|Maria     |Anne       |Jones    |39192|F     |500000|
|Jen       |Mary       |Brown    |     |F     |0     |
+----------+-----------+---------+-----+------+------+



## Spark의 DataFrame 객체를 Pandas의 DataFrame 객체로 변환하기

In [71]:
pandasDF = pysparkDF.toPandas()
print(type(pandasDF))
print(pandasDF)

<class 'pandas.core.frame.DataFrame'>
  first_name middle_name last_name    dob gender  salary
0      James                 Smith  36636      M   60000
1    Michael        Rose            40288      M   70000
2     Robert              Williams  42114         400000
3      Maria        Anne     Jones  39192      F  500000
4        Jen        Mary     Brown             F       0


## select()

In [72]:
emp1 = emp.select("empno", "ename", "hiredate", "sal")
print(type(emp1))
emp1.show()

<class 'pyspark.sql.dataframe.DataFrame'>
+-----+------+-------------------+----+
|empno| ename|           hiredate| sal|
+-----+------+-------------------+----+
| 7369| SMITH|1980-12-17 00:00:00| 800|
| 7499| ALLEN|1981-02-20 00:00:00|1600|
| 7521|  WARD|1981-02-03 00:00:00|1250|
| 7566| JONES|1981-03-02 00:00:00|2975|
| 7654|MARTIN|1981-10-22 00:00:00|1250|
| 7698| BLAKE|1981-05-01 00:00:00|2850|
| 7782| CLARK|1981-09-06 00:00:00|2450|
| 7788| SCOTT|1982-12-08 00:00:00|3000|
| 7839|  KING|1981-11-17 00:00:00|5000|
| 7844|TURNER|1984-10-08 00:00:00|1500|
| 7876| ADAMS|1983-01-12 00:00:00|1100|
| 7900| JAMES|1981-12-03 00:00:00| 950|
| 7902|  FORD|1981-12-13 00:00:00|3000|
| 7934|MILLER|1982-01-25 00:00:00|1300|
+-----+------+-------------------+----+



In [73]:
emp.select(emp.empno,emp.ename,emp.hiredate, emp.sal).show()

+-----+------+-------------------+----+
|empno| ename|           hiredate| sal|
+-----+------+-------------------+----+
| 7369| SMITH|1980-12-17 00:00:00| 800|
| 7499| ALLEN|1981-02-20 00:00:00|1600|
| 7521|  WARD|1981-02-03 00:00:00|1250|
| 7566| JONES|1981-03-02 00:00:00|2975|
| 7654|MARTIN|1981-10-22 00:00:00|1250|
| 7698| BLAKE|1981-05-01 00:00:00|2850|
| 7782| CLARK|1981-09-06 00:00:00|2450|
| 7788| SCOTT|1982-12-08 00:00:00|3000|
| 7839|  KING|1981-11-17 00:00:00|5000|
| 7844|TURNER|1984-10-08 00:00:00|1500|
| 7876| ADAMS|1983-01-12 00:00:00|1100|
| 7900| JAMES|1981-12-03 00:00:00| 950|
| 7902|  FORD|1981-12-13 00:00:00|3000|
| 7934|MILLER|1982-01-25 00:00:00|1300|
+-----+------+-------------------+----+



In [74]:
from pyspark.sql.functions import col
emp.select(col("empno"),col("ename"),col("hiredate"),col("sal")).show()

+-----+------+-------------------+----+
|empno| ename|           hiredate| sal|
+-----+------+-------------------+----+
| 7369| SMITH|1980-12-17 00:00:00| 800|
| 7499| ALLEN|1981-02-20 00:00:00|1600|
| 7521|  WARD|1981-02-03 00:00:00|1250|
| 7566| JONES|1981-03-02 00:00:00|2975|
| 7654|MARTIN|1981-10-22 00:00:00|1250|
| 7698| BLAKE|1981-05-01 00:00:00|2850|
| 7782| CLARK|1981-09-06 00:00:00|2450|
| 7788| SCOTT|1982-12-08 00:00:00|3000|
| 7839|  KING|1981-11-17 00:00:00|5000|
| 7844|TURNER|1984-10-08 00:00:00|1500|
| 7876| ADAMS|1983-01-12 00:00:00|1100|
| 7900| JAMES|1981-12-03 00:00:00| 950|
| 7902|  FORD|1981-12-13 00:00:00|3000|
| 7934|MILLER|1982-01-25 00:00:00|1300|
+-----+------+-------------------+----+



## collect()

In [75]:
dataCollect = emp.collect()
print(type(dataCollect))
print("----------------------------")
print(dataCollect)
print("----------------------------")
display(dataCollect)

<class 'list'>
----------------------------
[Row(empno=7369, ename='SMITH', job='CLERK', mgr=7902, hiredate=datetime.datetime(1980, 12, 17, 0, 0), sal=800, comm=None, deptno=20), Row(empno=7499, ename='ALLEN', job='SALESMAN', mgr=7698, hiredate=datetime.datetime(1981, 2, 20, 0, 0), sal=1600, comm=300, deptno=30), Row(empno=7521, ename='WARD', job='SALESMAN', mgr=7698, hiredate=datetime.datetime(1981, 2, 3, 0, 0), sal=1250, comm=500, deptno=30), Row(empno=7566, ename='JONES', job='MANAGER', mgr=7839, hiredate=datetime.datetime(1981, 3, 2, 0, 0), sal=2975, comm=None, deptno=20), Row(empno=7654, ename='MARTIN', job='SALESMAN', mgr=7698, hiredate=datetime.datetime(1981, 10, 22, 0, 0), sal=1250, comm=1400, deptno=30), Row(empno=7698, ename='BLAKE', job='MANAGER', mgr=7839, hiredate=datetime.datetime(1981, 5, 1, 0, 0), sal=2850, comm=None, deptno=30), Row(empno=7782, ename='CLARK', job='MANAGER', mgr=7839, hiredate=datetime.datetime(1981, 9, 6, 0, 0), sal=2450, comm=None, deptno=10), Row(emp

[Row(empno=7369, ename='SMITH', job='CLERK', mgr=7902, hiredate=datetime.datetime(1980, 12, 17, 0, 0), sal=800, comm=None, deptno=20),
 Row(empno=7499, ename='ALLEN', job='SALESMAN', mgr=7698, hiredate=datetime.datetime(1981, 2, 20, 0, 0), sal=1600, comm=300, deptno=30),
 Row(empno=7521, ename='WARD', job='SALESMAN', mgr=7698, hiredate=datetime.datetime(1981, 2, 3, 0, 0), sal=1250, comm=500, deptno=30),
 Row(empno=7566, ename='JONES', job='MANAGER', mgr=7839, hiredate=datetime.datetime(1981, 3, 2, 0, 0), sal=2975, comm=None, deptno=20),
 Row(empno=7654, ename='MARTIN', job='SALESMAN', mgr=7698, hiredate=datetime.datetime(1981, 10, 22, 0, 0), sal=1250, comm=1400, deptno=30),
 Row(empno=7698, ename='BLAKE', job='MANAGER', mgr=7839, hiredate=datetime.datetime(1981, 5, 1, 0, 0), sal=2850, comm=None, deptno=30),
 Row(empno=7782, ename='CLARK', job='MANAGER', mgr=7839, hiredate=datetime.datetime(1981, 9, 6, 0, 0), sal=2450, comm=None, deptno=10),
 Row(empno=7788, ename='SCOTT', job='ANALYST'

In [76]:
emp.printSchema()

root
 |-- empno: integer (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- mgr: integer (nullable = true)
 |-- hiredate: timestamp (nullable = true)
 |-- sal: integer (nullable = true)
 |-- comm: integer (nullable = true)
 |-- deptno: integer (nullable = true)



## withColumn()

In [77]:
newemp = emp.withColumn("deptno",col("deptno").cast("Integer"))
newemp.printSchema()

root
 |-- empno: integer (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- mgr: integer (nullable = true)
 |-- hiredate: timestamp (nullable = true)
 |-- sal: integer (nullable = true)
 |-- comm: integer (nullable = true)
 |-- deptno: integer (nullable = true)



In [78]:
newemp = newemp.withColumn("sal",col("sal")*100)
newemp.show()

+-----+------+---------+----+-------------------+------+----+------+
|empno| ename|      job| mgr|           hiredate|   sal|comm|deptno|
+-----+------+---------+----+-------------------+------+----+------+
| 7369| SMITH|    CLERK|7902|1980-12-17 00:00:00| 80000|null|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20 00:00:00|160000| 300|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-03 00:00:00|125000| 500|    30|
| 7566| JONES|  MANAGER|7839|1981-03-02 00:00:00|297500|null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-10-22 00:00:00|125000|1400|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01 00:00:00|285000|null|    30|
| 7782| CLARK|  MANAGER|7839|1981-09-06 00:00:00|245000|null|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-08 00:00:00|300000|null|    20|
| 7839|  KING|PRESIDENT|null|1981-11-17 00:00:00|500000|null|    10|
| 7844|TURNER| SALESMAN|7698|1984-10-08 00:00:00|150000|null|    30|
| 7876| ADAMS|    CLERK|7788|1983-01-12 00:00:00|110000|null|    20|
| 7900| JAMES|    CLERK|7698|1981-

## withColumnRenamed()

In [79]:
newemp = newemp.withColumnRenamed("sal","salary")
newemp.show()

+-----+------+---------+----+-------------------+------+----+------+
|empno| ename|      job| mgr|           hiredate|salary|comm|deptno|
+-----+------+---------+----+-------------------+------+----+------+
| 7369| SMITH|    CLERK|7902|1980-12-17 00:00:00| 80000|null|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20 00:00:00|160000| 300|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-03 00:00:00|125000| 500|    30|
| 7566| JONES|  MANAGER|7839|1981-03-02 00:00:00|297500|null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-10-22 00:00:00|125000|1400|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01 00:00:00|285000|null|    30|
| 7782| CLARK|  MANAGER|7839|1981-09-06 00:00:00|245000|null|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-08 00:00:00|300000|null|    20|
| 7839|  KING|PRESIDENT|null|1981-11-17 00:00:00|500000|null|    10|
| 7844|TURNER| SALESMAN|7698|1984-10-08 00:00:00|150000|null|    30|
| 7876| ADAMS|    CLERK|7788|1983-01-12 00:00:00|110000|null|    20|
| 7900| JAMES|    CLERK|7698|1981-

In [80]:
newemp = newemp.withColumnRenamed("mgr","manager") \
    .withColumnRenamed("ename","empname")
newemp.show()

+-----+-------+---------+-------+-------------------+------+----+------+
|empno|empname|      job|manager|           hiredate|salary|comm|deptno|
+-----+-------+---------+-------+-------------------+------+----+------+
| 7369|  SMITH|    CLERK|   7902|1980-12-17 00:00:00| 80000|null|    20|
| 7499|  ALLEN| SALESMAN|   7698|1981-02-20 00:00:00|160000| 300|    30|
| 7521|   WARD| SALESMAN|   7698|1981-02-03 00:00:00|125000| 500|    30|
| 7566|  JONES|  MANAGER|   7839|1981-03-02 00:00:00|297500|null|    20|
| 7654| MARTIN| SALESMAN|   7698|1981-10-22 00:00:00|125000|1400|    30|
| 7698|  BLAKE|  MANAGER|   7839|1981-05-01 00:00:00|285000|null|    30|
| 7782|  CLARK|  MANAGER|   7839|1981-09-06 00:00:00|245000|null|    10|
| 7788|  SCOTT|  ANALYST|   7566|1982-12-08 00:00:00|300000|null|    20|
| 7839|   KING|PRESIDENT|   null|1981-11-17 00:00:00|500000|null|    10|
| 7844| TURNER| SALESMAN|   7698|1984-10-08 00:00:00|150000|null|    30|
| 7876|  ADAMS|    CLERK|   7788|1983-01-12 00:00:0

## filter() - where() 와 동일

In [81]:
emp.filter(emp.ename == "KING").show(truncate=False)

+-----+-----+---------+----+-------------------+----+----+------+
|empno|ename|job      |mgr |hiredate           |sal |comm|deptno|
+-----+-----+---------+----+-------------------+----+----+------+
|7839 |KING |PRESIDENT|null|1981-11-17 00:00:00|5000|null|10    |
+-----+-----+---------+----+-------------------+----+----+------+



In [82]:
emp.filter('ename == "KING"').show(truncate=False)

+-----+-----+---------+----+-------------------+----+----+------+
|empno|ename|job      |mgr |hiredate           |sal |comm|deptno|
+-----+-----+---------+----+-------------------+----+----+------+
|7839 |KING |PRESIDENT|null|1981-11-17 00:00:00|5000|null|10    |
+-----+-----+---------+----+-------------------+----+----+------+



In [83]:
emp.filter((emp.deptno == 30) & (emp.sal >= 1500)).show(truncate=False)

+-----+------+--------+----+-------------------+----+----+------+
|empno|ename |job     |mgr |hiredate           |sal |comm|deptno|
+-----+------+--------+----+-------------------+----+----+------+
|7499 |ALLEN |SALESMAN|7698|1981-02-20 00:00:00|1600|300 |30    |
|7698 |BLAKE |MANAGER |7839|1981-05-01 00:00:00|2850|null|30    |
|7844 |TURNER|SALESMAN|7698|1984-10-08 00:00:00|1500|null|30    |
+-----+------+--------+----+-------------------+----+----+------+



In [84]:
emp.where((emp.deptno == 30) & (emp.sal >= 1500)).show(truncate=False)

+-----+------+--------+----+-------------------+----+----+------+
|empno|ename |job     |mgr |hiredate           |sal |comm|deptno|
+-----+------+--------+----+-------------------+----+----+------+
|7499 |ALLEN |SALESMAN|7698|1981-02-20 00:00:00|1600|300 |30    |
|7698 |BLAKE |MANAGER |7839|1981-05-01 00:00:00|2850|null|30    |
|7844 |TURNER|SALESMAN|7698|1984-10-08 00:00:00|1500|null|30    |
+-----+------+--------+----+-------------------+----+----+------+



## distinct(), drop (), dropDuplicates ()

In [85]:
empnew = emp.select("job", "deptno")
empnew.show()

+---------+------+
|      job|deptno|
+---------+------+
|    CLERK|    20|
| SALESMAN|    30|
| SALESMAN|    30|
|  MANAGER|    20|
| SALESMAN|    30|
|  MANAGER|    30|
|  MANAGER|    10|
|  ANALYST|    20|
|PRESIDENT|    10|
| SALESMAN|    30|
|    CLERK|    20|
|    CLERK|    30|
|  ANALYST|    20|
|    CLERK|    10|
+---------+------+



In [86]:
empnew.distinct().show()

+---------+------+
|      job|deptno|
+---------+------+
|  ANALYST|    20|
|  MANAGER|    10|
|  MANAGER|    30|
|PRESIDENT|    10|
|    CLERK|    20|
| SALESMAN|    30|
|    CLERK|    10|
|  MANAGER|    20|
|    CLERK|    30|
+---------+------+



In [87]:
empnew.dropDuplicates().show()

+---------+------+
|      job|deptno|
+---------+------+
|  ANALYST|    20|
|  MANAGER|    10|
|  MANAGER|    30|
|PRESIDENT|    10|
|    CLERK|    20|
| SALESMAN|    30|
|    CLERK|    10|
|  MANAGER|    20|
|    CLERK|    30|
+---------+------+



In [88]:
empnew.drop("deptno").show()

+---------+
|      job|
+---------+
|    CLERK|
| SALESMAN|
| SALESMAN|
|  MANAGER|
| SALESMAN|
|  MANAGER|
|  MANAGER|
|  ANALYST|
|PRESIDENT|
| SALESMAN|
|    CLERK|
|    CLERK|
|  ANALYST|
|    CLERK|
+---------+



## orderBy(), sort()

In [89]:
emp.sort("sal").show(truncate=False)

+-----+------+---------+----+-------------------+----+----+------+
|empno|ename |job      |mgr |hiredate           |sal |comm|deptno|
+-----+------+---------+----+-------------------+----+----+------+
|7369 |SMITH |CLERK    |7902|1980-12-17 00:00:00|800 |null|20    |
|7900 |JAMES |CLERK    |7698|1981-12-03 00:00:00|950 |null|30    |
|7876 |ADAMS |CLERK    |7788|1983-01-12 00:00:00|1100|null|20    |
|7654 |MARTIN|SALESMAN |7698|1981-10-22 00:00:00|1250|1400|30    |
|7521 |WARD  |SALESMAN |7698|1981-02-03 00:00:00|1250|500 |30    |
|7934 |MILLER|CLERK    |7782|1982-01-25 00:00:00|1300|null|10    |
|7844 |TURNER|SALESMAN |7698|1984-10-08 00:00:00|1500|null|30    |
|7499 |ALLEN |SALESMAN |7698|1981-02-20 00:00:00|1600|300 |30    |
|7782 |CLARK |MANAGER  |7839|1981-09-06 00:00:00|2450|null|10    |
|7698 |BLAKE |MANAGER  |7839|1981-05-01 00:00:00|2850|null|30    |
|7566 |JONES |MANAGER  |7839|1981-03-02 00:00:00|2975|null|20    |
|7788 |SCOTT |ANALYST  |7566|1982-12-08 00:00:00|3000|null|20 

In [90]:
emp.sort(emp.sal.desc()).show(truncate=False)

+-----+------+---------+----+-------------------+----+----+------+
|empno|ename |job      |mgr |hiredate           |sal |comm|deptno|
+-----+------+---------+----+-------------------+----+----+------+
|7839 |KING  |PRESIDENT|null|1981-11-17 00:00:00|5000|null|10    |
|7788 |SCOTT |ANALYST  |7566|1982-12-08 00:00:00|3000|null|20    |
|7902 |FORD  |ANALYST  |7566|1981-12-13 00:00:00|3000|null|20    |
|7566 |JONES |MANAGER  |7839|1981-03-02 00:00:00|2975|null|20    |
|7698 |BLAKE |MANAGER  |7839|1981-05-01 00:00:00|2850|null|30    |
|7782 |CLARK |MANAGER  |7839|1981-09-06 00:00:00|2450|null|10    |
|7499 |ALLEN |SALESMAN |7698|1981-02-20 00:00:00|1600|300 |30    |
|7844 |TURNER|SALESMAN |7698|1984-10-08 00:00:00|1500|null|30    |
|7934 |MILLER|CLERK    |7782|1982-01-25 00:00:00|1300|null|10    |
|7654 |MARTIN|SALESMAN |7698|1981-10-22 00:00:00|1250|1400|30    |
|7521 |WARD  |SALESMAN |7698|1981-02-03 00:00:00|1250|500 |30    |
|7876 |ADAMS |CLERK    |7788|1983-01-12 00:00:00|1100|null|20 

In [91]:
emp.sort("deptno", "sal").show(truncate=False)

+-----+------+---------+----+-------------------+----+----+------+
|empno|ename |job      |mgr |hiredate           |sal |comm|deptno|
+-----+------+---------+----+-------------------+----+----+------+
|7934 |MILLER|CLERK    |7782|1982-01-25 00:00:00|1300|null|10    |
|7782 |CLARK |MANAGER  |7839|1981-09-06 00:00:00|2450|null|10    |
|7839 |KING  |PRESIDENT|null|1981-11-17 00:00:00|5000|null|10    |
|7369 |SMITH |CLERK    |7902|1980-12-17 00:00:00|800 |null|20    |
|7876 |ADAMS |CLERK    |7788|1983-01-12 00:00:00|1100|null|20    |
|7566 |JONES |MANAGER  |7839|1981-03-02 00:00:00|2975|null|20    |
|7902 |FORD  |ANALYST  |7566|1981-12-13 00:00:00|3000|null|20    |
|7788 |SCOTT |ANALYST  |7566|1982-12-08 00:00:00|3000|null|20    |
|7900 |JAMES |CLERK    |7698|1981-12-03 00:00:00|950 |null|30    |
|7654 |MARTIN|SALESMAN |7698|1981-10-22 00:00:00|1250|1400|30    |
|7521 |WARD  |SALESMAN |7698|1981-02-03 00:00:00|1250|500 |30    |
|7844 |TURNER|SALESMAN |7698|1984-10-08 00:00:00|1500|null|30 

In [92]:
emp.sort(emp.deptno.desc(), emp.sal.desc()).show(truncate=False)

+-----+------+---------+----+-------------------+----+----+------+
|empno|ename |job      |mgr |hiredate           |sal |comm|deptno|
+-----+------+---------+----+-------------------+----+----+------+
|7698 |BLAKE |MANAGER  |7839|1981-05-01 00:00:00|2850|null|30    |
|7499 |ALLEN |SALESMAN |7698|1981-02-20 00:00:00|1600|300 |30    |
|7844 |TURNER|SALESMAN |7698|1984-10-08 00:00:00|1500|null|30    |
|7654 |MARTIN|SALESMAN |7698|1981-10-22 00:00:00|1250|1400|30    |
|7521 |WARD  |SALESMAN |7698|1981-02-03 00:00:00|1250|500 |30    |
|7900 |JAMES |CLERK    |7698|1981-12-03 00:00:00|950 |null|30    |
|7902 |FORD  |ANALYST  |7566|1981-12-13 00:00:00|3000|null|20    |
|7788 |SCOTT |ANALYST  |7566|1982-12-08 00:00:00|3000|null|20    |
|7566 |JONES |MANAGER  |7839|1981-03-02 00:00:00|2975|null|20    |
|7876 |ADAMS |CLERK    |7788|1983-01-12 00:00:00|1100|null|20    |
|7369 |SMITH |CLERK    |7902|1980-12-17 00:00:00|800 |null|20    |
|7839 |KING  |PRESIDENT|null|1981-11-17 00:00:00|5000|null|10 

In [93]:
emp.orderBy(emp.deptno.desc(), emp.sal.desc()).show(truncate=False)

+-----+------+---------+----+-------------------+----+----+------+
|empno|ename |job      |mgr |hiredate           |sal |comm|deptno|
+-----+------+---------+----+-------------------+----+----+------+
|7698 |BLAKE |MANAGER  |7839|1981-05-01 00:00:00|2850|null|30    |
|7499 |ALLEN |SALESMAN |7698|1981-02-20 00:00:00|1600|300 |30    |
|7844 |TURNER|SALESMAN |7698|1984-10-08 00:00:00|1500|null|30    |
|7654 |MARTIN|SALESMAN |7698|1981-10-22 00:00:00|1250|1400|30    |
|7521 |WARD  |SALESMAN |7698|1981-02-03 00:00:00|1250|500 |30    |
|7900 |JAMES |CLERK    |7698|1981-12-03 00:00:00|950 |null|30    |
|7902 |FORD  |ANALYST  |7566|1981-12-13 00:00:00|3000|null|20    |
|7788 |SCOTT |ANALYST  |7566|1982-12-08 00:00:00|3000|null|20    |
|7566 |JONES |MANAGER  |7839|1981-03-02 00:00:00|2975|null|20    |
|7876 |ADAMS |CLERK    |7788|1983-01-12 00:00:00|1100|null|20    |
|7369 |SMITH |CLERK    |7902|1980-12-17 00:00:00|800 |null|20    |
|7839 |KING  |PRESIDENT|null|1981-11-17 00:00:00|5000|null|10 

In [94]:
emp.sort(col("hiredate").asc(),col("sal").asc()).show(truncate=False)
emp.orderBy(col("hiredate").asc(),col("sal").asc()).show(truncate=False)

+-----+------+---------+----+-------------------+----+----+------+
|empno|ename |job      |mgr |hiredate           |sal |comm|deptno|
+-----+------+---------+----+-------------------+----+----+------+
|7369 |SMITH |CLERK    |7902|1980-12-17 00:00:00|800 |null|20    |
|7521 |WARD  |SALESMAN |7698|1981-02-03 00:00:00|1250|500 |30    |
|7499 |ALLEN |SALESMAN |7698|1981-02-20 00:00:00|1600|300 |30    |
|7566 |JONES |MANAGER  |7839|1981-03-02 00:00:00|2975|null|20    |
|7698 |BLAKE |MANAGER  |7839|1981-05-01 00:00:00|2850|null|30    |
|7782 |CLARK |MANAGER  |7839|1981-09-06 00:00:00|2450|null|10    |
|7654 |MARTIN|SALESMAN |7698|1981-10-22 00:00:00|1250|1400|30    |
|7839 |KING  |PRESIDENT|null|1981-11-17 00:00:00|5000|null|10    |
|7900 |JAMES |CLERK    |7698|1981-12-03 00:00:00|950 |null|30    |
|7902 |FORD  |ANALYST  |7566|1981-12-13 00:00:00|3000|null|20    |
|7934 |MILLER|CLERK    |7782|1982-01-25 00:00:00|1300|null|10    |
|7788 |SCOTT |ANALYST  |7566|1982-12-08 00:00:00|3000|null|20 

## groupBy()

In [95]:
emp.groupBy("deptno").sum("sal").show(truncate=False)

+------+--------+
|deptno|sum(sal)|
+------+--------+
|20    |10875   |
|10    |8750    |
|30    |9400    |
+------+--------+



In [96]:
emp.groupBy("deptno").min("sal").show(truncate=False)

+------+--------+
|deptno|min(sal)|
+------+--------+
|20    |800     |
|10    |1300    |
|30    |950     |
+------+--------+



In [97]:
emp.groupBy("deptno").max("sal").show(truncate=False)

+------+--------+
|deptno|max(sal)|
+------+--------+
|20    |3000    |
|10    |5000    |
|30    |2850    |
+------+--------+



In [98]:
emp.groupBy("deptno").avg("sal").show(truncate=False)

+------+------------------+
|deptno|avg(sal)          |
+------+------------------+
|20    |2175.0            |
|10    |2916.6666666666665|
|30    |1566.6666666666667|
+------+------------------+



In [99]:
emp.groupBy("deptno", "job").sum("sal").show(truncate=False)

+------+---------+--------+
|deptno|job      |sum(sal)|
+------+---------+--------+
|20    |ANALYST  |6000    |
|20    |MANAGER  |2975    |
|30    |MANAGER  |2850    |
|30    |SALESMAN |5600    |
|30    |CLERK    |950     |
|20    |CLERK    |1900    |
|10    |PRESIDENT|5000    |
|10    |CLERK    |1300    |
|10    |MANAGER  |2450    |
+------+---------+--------+



In [100]:
emp.groupBy("deptno").sum("sal", "comm").show(truncate=False)

+------+--------+---------+
|deptno|sum(sal)|sum(comm)|
+------+--------+---------+
|20    |10875   |null     |
|10    |8750    |null     |
|30    |9400    |2200     |
+------+--------+---------+



In [101]:
from pyspark.sql.functions import sum,avg,max,min,mean,count
emp.groupBy("deptno").agg(sum("sal"), avg("sal"), max("sal"), min("sal"), mean("sal")).show(truncate=False)

+------+--------+------------------+--------+--------+------------------+
|deptno|sum(sal)|avg(sal)          |max(sal)|min(sal)|avg(sal)          |
+------+--------+------------------+--------+--------+------------------+
|20    |10875   |2175.0            |3000    |800     |2175.0            |
|10    |8750    |2916.6666666666665|5000    |1300    |2916.6666666666665|
|30    |9400    |1566.6666666666667|2850    |950     |1566.6666666666667|
+------+--------+------------------+--------+--------+------------------+



In [102]:
emp.groupBy("deptno") \
    .agg(sum("sal").alias("sum_salary"), \
         avg("sal").alias("avg_salary"), \
         max("sal").alias("max_salary"), \
         min("sal").alias("min_salary"), \
         mean("sal").alias("mean_salary"), \
     ) \
    .show(truncate=False)

+------+----------+------------------+----------+----------+------------------+
|deptno|sum_salary|avg_salary        |max_salary|min_salary|mean_salary       |
+------+----------+------------------+----------+----------+------------------+
|20    |10875     |2175.0            |3000      |800       |2175.0            |
|10    |8750      |2916.6666666666665|5000      |1300      |2916.6666666666665|
|30    |9400      |1566.6666666666667|2850      |950       |1566.6666666666667|
+------+----------+------------------+----------+----------+------------------+



In [103]:
emp.groupBy("deptno") \
    .agg(sum("sal").alias("sum_salary"), \
         avg("sal").alias("avg_salary"), \
         max("sal").alias("max_salary"), \
         min("sal").alias("min_salary"), \
         mean("sal").alias("mean_salary"), \
     ) \
    .where(col("sum_salary") > 9000)\
    .show(truncate=False)

+------+----------+------------------+----------+----------+------------------+
|deptno|sum_salary|avg_salary        |max_salary|min_salary|mean_salary       |
+------+----------+------------------+----------+----------+------------------+
|20    |10875     |2175.0            |3000      |800       |2175.0            |
|30    |9400      |1566.6666666666667|2850      |950       |1566.6666666666667|
+------+----------+------------------+----------+----------+------------------+



In [104]:
deptdata = [(10, '영업부', '서울'), (20, '개발부', '대전'), (30, '기획부', '서울'), (40, '마케팅부', '서울')]
deptcolname = ['deptno', 'dname', 'loc']
dept = spark.createDataFrame(data=deptdata, schema=deptcolname)
dept.show(truncate=False)

+------+--------+----+
|deptno|dname   |loc |
+------+--------+----+
|10    |영업부  |서울|
|20    |개발부  |대전|
|30    |기획부  |서울|
|40    |마케팅부|서울|
+------+--------+----+



## join()

In [105]:
emp.join(dept,emp.deptno ==  dept.deptno,"inner") \
     .show(truncate=False)

+-----+------+---------+----+-------------------+----+----+------+------+------+----+
|empno|ename |job      |mgr |hiredate           |sal |comm|deptno|deptno|dname |loc |
+-----+------+---------+----+-------------------+----+----+------+------+------+----+
|7934 |MILLER|CLERK    |7782|1982-01-25 00:00:00|1300|null|10    |10    |영업부|서울|
|7839 |KING  |PRESIDENT|null|1981-11-17 00:00:00|5000|null|10    |10    |영업부|서울|
|7782 |CLARK |MANAGER  |7839|1981-09-06 00:00:00|2450|null|10    |10    |영업부|서울|
|7902 |FORD  |ANALYST  |7566|1981-12-13 00:00:00|3000|null|20    |20    |개발부|대전|
|7876 |ADAMS |CLERK    |7788|1983-01-12 00:00:00|1100|null|20    |20    |개발부|대전|
|7788 |SCOTT |ANALYST  |7566|1982-12-08 00:00:00|3000|null|20    |20    |개발부|대전|
|7566 |JONES |MANAGER  |7839|1981-03-02 00:00:00|2975|null|20    |20    |개발부|대전|
|7369 |SMITH |CLERK    |7902|1980-12-17 00:00:00|800 |null|20    |20    |개발부|대전|
|7900 |JAMES |CLERK    |7698|1981-12-03 00:00:00|950 |null|30    |30    |기획부|서울|
|7844 |TURNER

In [106]:
emp.join(dept,emp.deptno ==  dept.deptno,"right") \
     .show(truncate=False)

+-----+------+---------+----+-------------------+----+----+------+------+--------+----+
|empno|ename |job      |mgr |hiredate           |sal |comm|deptno|deptno|dname   |loc |
+-----+------+---------+----+-------------------+----+----+------+------+--------+----+
|7934 |MILLER|CLERK    |7782|1982-01-25 00:00:00|1300|null|10    |10    |영업부  |서울|
|7839 |KING  |PRESIDENT|null|1981-11-17 00:00:00|5000|null|10    |10    |영업부  |서울|
|7782 |CLARK |MANAGER  |7839|1981-09-06 00:00:00|2450|null|10    |10    |영업부  |서울|
|7902 |FORD  |ANALYST  |7566|1981-12-13 00:00:00|3000|null|20    |20    |개발부  |대전|
|7876 |ADAMS |CLERK    |7788|1983-01-12 00:00:00|1100|null|20    |20    |개발부  |대전|
|7788 |SCOTT |ANALYST  |7566|1982-12-08 00:00:00|3000|null|20    |20    |개발부  |대전|
|7566 |JONES |MANAGER  |7839|1981-03-02 00:00:00|2975|null|20    |20    |개발부  |대전|
|7369 |SMITH |CLERK    |7902|1980-12-17 00:00:00|800 |null|20    |20    |개발부  |대전|
|7900 |JAMES |CLERK    |7698|1981-12-03 00:00:00|950 |null|30    |30    

## union()

In [107]:
emp1 = emp.filter("job == 'MANAGER'").select("ename", "sal")
emp2 = emp.filter("deptno == 30").select("ename", "sal")
emp1.show()
emp2.show()
emp1.union(emp2).show()

+-----+----+
|ename| sal|
+-----+----+
|JONES|2975|
|BLAKE|2850|
|CLARK|2450|
+-----+----+

+------+----+
| ename| sal|
+------+----+
| ALLEN|1600|
|  WARD|1250|
|MARTIN|1250|
| BLAKE|2850|
|TURNER|1500|
| JAMES| 950|
+------+----+

+------+----+
| ename| sal|
+------+----+
| JONES|2975|
| BLAKE|2850|
| CLARK|2450|
| ALLEN|1600|
|  WARD|1250|
|MARTIN|1250|
| BLAKE|2850|
|TURNER|1500|
| JAMES| 950|
+------+----+



In [108]:
emp1 = emp.filter("job == 'MANAGER'").select("ename", "sal")
emp2 = emp.filter("deptno == 30").select("ename", "sal")
emp1.show()
emp2.show()
emp1.union(emp2).dropDuplicates().show()

+-----+----+
|ename| sal|
+-----+----+
|JONES|2975|
|BLAKE|2850|
|CLARK|2450|
+-----+----+

+------+----+
| ename| sal|
+------+----+
| ALLEN|1600|
|  WARD|1250|
|MARTIN|1250|
| BLAKE|2850|
|TURNER|1500|
| JAMES| 950|
+------+----+

+------+----+
| ename| sal|
+------+----+
| BLAKE|2850|
|MARTIN|1250|
|TURNER|1500|
| CLARK|2450|
| JAMES| 950|
| ALLEN|1600|
| JONES|2975|
|  WARD|1250|
+------+----+



## **map() 과 flatMap()**

### lines = [['w1',  'w2', 'w3'], ['w4', 'w5', 'w6']]
### lines를 map/flatmap을 이용하여 split하게 되면 아래와 같다.
### map: one2one mapping
###	Array(Array('w1', 'w2', 'w3'), Array('w4', 'w5', 'w6'))

### flatmap: one example → one result(flatten)
### Array('w1', 'w2', 'w3', 'w4', 'w5', 'w6')

![spark2](images/spark2.png)

In [109]:
data = ["둘리 또치 도우너 희동이 고길동 마이콜",
        "피카츄 꼬부기 잠만보",
        "듀크 턱시",
        "프로도 간달프 스미골",
        "코코"]
rdd=spark.sparkContext.parallelize(data)
for element in rdd.collect():
    print(element)

둘리 또치 도우너 희동이 고길동 마이콜
피카츄 꼬부기 잠만보
듀크 턱시
프로도 간달프 스미골
코코


In [110]:
rdd2=rdd.map(lambda x: x.split(" "))
rdd2.collect()

[['둘리', '또치', '도우너', '희동이', '고길동', '마이콜'],
 ['피카츄', '꼬부기', '잠만보'],
 ['듀크', '턱시'],
 ['프로도', '간달프', '스미골'],
 ['코코']]

In [111]:
rdd2=rdd.flatMap(lambda x: x.split(" "))
rdd2.collect()

['둘리',
 '또치',
 '도우너',
 '희동이',
 '고길동',
 '마이콜',
 '피카츄',
 '꼬부기',
 '잠만보',
 '듀크',
 '턱시',
 '프로도',
 '간달프',
 '스미골',
 '코코']

In [112]:
spark.sparkContext.parallelize([3,4,5]).map(lambda x: range(1,x)).collect() 

[range(1, 3), range(1, 4), range(1, 5)]

In [113]:
spark.sparkContext.parallelize([3,4,5]).flatMap(lambda x: range(1,x)).collect() 

[1, 2, 1, 2, 3, 1, 2, 3, 4]

In [114]:
spark.sparkContext.parallelize([3,4,5]).map(lambda x: [x,  x*x]).collect() 

[[3, 9], [4, 16], [5, 25]]

In [115]:
spark.sparkContext.parallelize([3,4,5]).flatMap(lambda x: [x,  x*x]).collect() 

[3, 9, 4, 16, 5, 25]

In [116]:
lines = spark.sparkContext.textFile("data/greeting.txt")
sorted(lines.flatMap(lambda line: line.split()).map(lambda w: (w,1)).reduceByKey(lambda v1, v2: v1+v2).collect())

[('Birthday', 1),
 ('Day', 1),
 ('Evening', 1),
 ('Good', 3),
 ('Happy', 2),
 ('Morning', 1),
 ('New', 1),
 ('Year', 1)]

In [117]:
rdd1 = spark.sparkContext.textFile("data/greeting.txt")
print(type(rdd1))
print(rdd1)
print(rdd1.collect())
print("------------------------------------------------------------------------------")
rdd2 = rdd1.flatMap(lambda line: line.split())
print(type(rdd2))
print(rdd2)
print(rdd2.collect())
print("------------------------------------------------------------------------------")
rdd3 = rdd2.map(lambda w: (w,1))
print(type(rdd3))
print(rdd3)      
print(rdd3.collect())
print("------------------------------------------------------------------------------")
rdd4 = rdd3.reduceByKey(lambda v1, v2: v1+v2)
print(type(rdd4))
print(rdd4)
print(rdd4.collect())
print("------------------------------------------------------------------------------")
result = rdd4.collect()
print(type(result))
print(result)

<class 'pyspark.rdd.RDD'>
data/greeting.txt MapPartitionsRDD[608] at textFile at NativeMethodAccessorImpl.java:0
['Good Morning', 'Good Evening', 'Good Day', 'Happy Birthday', 'Happy New Year']
------------------------------------------------------------------------------
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[609] at RDD at PythonRDD.scala:53
['Good', 'Morning', 'Good', 'Evening', 'Good', 'Day', 'Happy', 'Birthday', 'Happy', 'New', 'Year']
------------------------------------------------------------------------------
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[610] at RDD at PythonRDD.scala:53
[('Good', 1), ('Morning', 1), ('Good', 1), ('Evening', 1), ('Good', 1), ('Day', 1), ('Happy', 1), ('Birthday', 1), ('Happy', 1), ('New', 1), ('Year', 1)]
------------------------------------------------------------------------------
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[615] at RDD at PythonRDD.scala:53
[('Good', 3), ('Morning', 1), ('Evening', 1), ('Birthday', 1), ('New', 1), ('Ye

### [PySpark API 도큐먼트](https://spark.apache.org/docs/latest/api/python/index.html)

## <span style='color:red'>**RDD**</span>
### Resilient Distributed Dataset의 약자(탄력 분산 데이터셋)
### 분산되어 존재하는 데이터들의 모임, 즉 클러스터에 분배되어 있는 데이터들을 하나로 관리하는 개념
### 스파크의 모든 데이터 타입들은 RDD를 기반으로 만들어지고 데이터끼리의 연산들은 RDD의 연산으로 이루어져 있음

In [118]:
greetRDD = spark.sparkContext.textFile('data/greeting.txt')
print(greetRDD)
greetRDD.collect()

data/greeting.txt MapPartitionsRDD[617] at textFile at NativeMethodAccessorImpl.java:0


['Good Morning',
 'Good Evening',
 'Good Day',
 'Happy Birthday',
 'Happy New Year']

In [119]:
goodLines = greetRDD.filter(lambda x : "Good" in x)
goodLines.collect()

['Good Morning', 'Good Evening', 'Good Day']

In [120]:
goodLines.count()

3

In [121]:
numbers = spark.sparkContext.parallelize(list(range(5)))
squared = numbers.map(lambda x : x * x).collect()
squared

[0, 1, 4, 9, 16]

In [122]:
strings = spark.sparkContext.parallelize(["hello spark", "hi python"])
splitted = strings.flatMap(lambda x : x.split(" ")).collect()
splitted

['hello', 'spark', 'hi', 'python']

In [123]:
numbers = spark.sparkContext.parallelize(list(range(1, 30, 3)))
result = numbers.filter(lambda x : x % 2 == 0).collect()
result

[4, 10, 16, 22, 28]

In [124]:
linesRDD = spark.sparkContext.parallelize(["test", "this is a test rdd"])
linesRDD

ParallelCollectionRDD[626] at parallelize at PythonRDD.scala:195

## <span style='color:red'>**페어 RDD**</span>
### 페어 RDD란 key-value쌍으로 이루어진 RDD
### 파이썬에서는 Tuple로 이뤄진 RDD가 곧 페어 RDD가 됨

In [125]:
examplePairRDD = spark.sparkContext.parallelize([(1, 3), (1, 5), (2, 4), (3, 3), (4, 8), (4, 2), (3, 1)])
print(examplePairRDD)
examplePairRDD.collect()

ParallelCollectionRDD[627] at parallelize at PythonRDD.scala:195


[(1, 3), (1, 5), (2, 4), (3, 3), (4, 8), (4, 2), (3, 1)]

- reduceByKey(func) : 동일 키에 대한 값들을 reduce(예 : rdd.reduceByKey(lambda x, y: x + y))
- mapValues(func) : 각 키에 대해 연산을 적용(예 : rdd.mapValues(lambda x : x + 1))
- sortByKey() : 키로 정렬한 RDD 리턴(예 : rdd.sortByKey())
- keys() : 키값들을 리턴(예 : rdd.keys())
- values() : value값들을 리턴(예 : rdd.values())

In [126]:
examplePairRDD.reduceByKey(lambda x, y : x + y).collect()

[(1, 8), (2, 4), (3, 4), (4, 10)]

In [127]:
examplePairRDD.mapValues(lambda x: x**2).collect()

[(1, 9), (1, 25), (2, 16), (3, 9), (4, 64), (4, 4), (3, 1)]

In [128]:
customerLines = spark.sparkContext.textFile("data/name-customers.csv")
print(customerLines)
customerLines.first()

data/name-customers.csv MapPartitionsRDD[635] at textFile at NativeMethodAccessorImpl.java:0


'Alfreds Futterkiste,Germany'

In [129]:
customerPairs = customerLines.map(lambda x: (x.split(",")[1], x.split(",")[0]))
print(customerPairs)
customerPairs.collect()

PythonRDD[637] at RDD at PythonRDD.scala:53


[('Germany', 'Alfreds Futterkiste'),
 ('Mexico', 'Ana Trujillo Emparedados y helados'),
 ('Mexico', 'Antonio Moreno Taqueria'),
 ('UK', 'Around the Horn'),
 ('Sweden', 'Berglunds snabbkop'),
 ('Germany', 'Blauer See Delikatessen'),
 ('France', 'Blondel pere et fils'),
 ('Spain', 'Bolido Comidas preparadas'),
 ('France', "Bon app'"),
 ('Canada', 'Bottom-Dollar Marketse'),
 ('UK', "B's Beverages"),
 ('Argentina', 'Cactus Comidas para llevar'),
 ('Mexico', 'Centro comercial Moctezuma'),
 ('Switzerland', 'Chop-suey Chinese'),
 ('Brazil', 'Comercio Mineiro'),
 ('UK', 'Consolidated Holdings'),
 ('Germany', 'Drachenblut Delikatessend'),
 ('France', 'Du monde entier'),
 ('UK', 'Eastern Connection'),
 ('Austria', 'Ernst Handel'),
 ('Brazil', 'Familia Arquibaldo'),
 ('Spain', 'FISSA Fabrica Inter. Salchichas S.A.'),
 ('France', 'Folies gourmandes'),
 ('Sweden', 'Folk och fa HB'),
 ('Germany', 'Frankenversand'),
 ('France', 'France restauration'),
 ('Italy', 'Franchi S.p.A.'),
 ('Portugal', 'Furi

In [130]:
customerPairCollected = customerPairs.groupByKey().collect()
customerDict = {
    country : [c for c in customers]
    for country, customers in customerPairCollected
}
customerDict['UK']

['Around the Horn',
 "B's Beverages",
 'Consolidated Holdings',
 'Eastern Connection',
 'Island Trading',
 'North/South',
 'Seven Seas Imports']

In [131]:
[k for k in customerPairs.sortByKey().keys().collect()][:10]

['Argentina',
 'Argentina',
 'Argentina',
 'Austria',
 'Austria',
 'Belgium',
 'Belgium',
 'Brazil',
 'Brazil',
 'Brazil']

In [132]:
mapReduced = customerPairs.mapValues(lambda x : 1).reduceByKey(lambda x, y: x + y)
{
    i:j for i, j in mapReduced.collect()
}

{'Mexico': 5,
 'France': 11,
 'Argentina': 3,
 'Switzerland': 2,
 'Brazil': 9,
 'Austria': 2,
 'Portugal': 2,
 'USA': 13,
 'Venezuela': 4,
 'Ireland': 1,
 'Belgium': 2,
 'Norway': 1,
 'Denmark': 2,
 'Finland': 2,
 'Poland': 1,
 'Germany': 11,
 'UK': 7,
 'Sweden': 2,
 'Spain': 5,
 'Canada': 3,
 'Italy': 3}

## RDD를 가지고 워드카운팅하는 예제

In [133]:
lines = spark.sparkContext.textFile("data/greeting.txt")
sorted(lines.flatMap(lambda line: line.split()).map(lambda w: (w,1)).reduceByKey(lambda v1, v2: v1+v2).collect())

[('Birthday', 1),
 ('Day', 1),
 ('Evening', 1),
 ('Good', 3),
 ('Happy', 2),
 ('Morning', 1),
 ('New', 1),
 ('Year', 1)]

In [134]:
rdd1 = spark.sparkContext.textFile("data/greeting.txt")
print(type(rdd1))
print(rdd1)
print(rdd1.collect())
print("------------------------------------------------------------------------------")
rdd2 = rdd1.flatMap(lambda line: line.split())
print(type(rdd2))
print(rdd2)
print(rdd2.collect())
print("------------------------------------------------------------------------------")
rdd3 = rdd2.map(lambda w: (w,1))
print(type(rdd3))
print(rdd3)      
print(rdd3.collect())
print("------------------------------------------------------------------------------")
rdd4 = rdd3.reduceByKey(lambda v1, v2: v1+v2)
print(type(rdd4))
print(rdd4)
print(rdd4.collect())
print("------------------------------------------------------------------------------")
result = rdd4.collect()
print(type(result))
print(result)
print("------------------------------------------------------------------------------")
print(sorted(result))

<class 'pyspark.rdd.RDD'>
data/greeting.txt MapPartitionsRDD[663] at textFile at NativeMethodAccessorImpl.java:0
['Good Morning', 'Good Evening', 'Good Day', 'Happy Birthday', 'Happy New Year']
------------------------------------------------------------------------------
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[664] at RDD at PythonRDD.scala:53
['Good', 'Morning', 'Good', 'Evening', 'Good', 'Day', 'Happy', 'Birthday', 'Happy', 'New', 'Year']
------------------------------------------------------------------------------
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[665] at RDD at PythonRDD.scala:53
[('Good', 1), ('Morning', 1), ('Good', 1), ('Evening', 1), ('Good', 1), ('Day', 1), ('Happy', 1), ('Birthday', 1), ('Happy', 1), ('New', 1), ('Year', 1)]
------------------------------------------------------------------------------
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[670] at RDD at PythonRDD.scala:53
[('Good', 3), ('Morning', 1), ('Evening', 1), ('Birthday', 1), ('New', 1), ('Ye

## 파일 로딩(JSON, CSV)

In [135]:
import json
carsJson = spark.sparkContext.textFile("./data/cars.json")\
              .map(lambda x: json.loads(x))
carsJson

PythonRDD[673] at RDD at PythonRDD.scala:53

In [136]:
carsJson.first()

{'brand': 'Ford', 'models': {'name': 'Fiesta', 'price': '14260'}}

In [137]:
carsJson.collect()

[{'brand': 'Ford', 'models': {'name': 'Fiesta', 'price': '14260'}},
 {'brand': 'Ford', 'models': {'name': 'Focus', 'price': '18825'}},
 {'brand': 'Ford', 'models': {'name': 'Mustang', 'price': '26670'}},
 {'brand': 'BMW', 'models': {'name': '320', 'price': '40250'}},
 {'brand': 'BMW', 'models': {'name': 'X3', 'price': '41000'}},
 {'brand': 'BMW', 'models': {'name': 'X5', 'price': '60700'}},
 {'brand': 'Fiat', 'models': {'name': '500', 'price': '16495'}}]

## RDD를 가지고 Hive가상테이블 생성 ~> SQL을 사용해서 데이터 처리

In [138]:
emp = spark.read.csv("data/emp.csv", header=True, inferSchema=True)
emp

DataFrame[empno: int, ename: string, job: string, mgr: int, hiredate: timestamp, sal: int, comm: int, deptno: int]

In [139]:
from pyspark.sql import HiveContext
hiveCtx = HiveContext(spark.sparkContext)

In [140]:
emp.registerTempTable("hiveemp")
emp

DataFrame[empno: int, ename: string, job: string, mgr: int, hiredate: timestamp, sal: int, comm: int, deptno: int]

In [141]:
empResult = hiveCtx.sql("SELECT ename, sal FROM hiveemp")
empResult.collect()[:5]

[Row(ename='SMITH', sal=800),
 Row(ename='ALLEN', sal=1600),
 Row(ename='WARD', sal=1250),
 Row(ename='JONES', sal=2975),
 Row(ename='MARTIN', sal=1250)]

In [142]:
empResult = hiveCtx.sql("SELECT * FROM hiveemp order by sal")
empResult.collect()

[Row(empno=7369, ename='SMITH', job='CLERK', mgr=7902, hiredate=datetime.datetime(1980, 12, 17, 0, 0), sal=800, comm=None, deptno=20),
 Row(empno=7900, ename='JAMES', job='CLERK', mgr=7698, hiredate=datetime.datetime(1981, 12, 3, 0, 0), sal=950, comm=None, deptno=30),
 Row(empno=7876, ename='ADAMS', job='CLERK', mgr=7788, hiredate=datetime.datetime(1983, 1, 12, 0, 0), sal=1100, comm=None, deptno=20),
 Row(empno=7521, ename='WARD', job='SALESMAN', mgr=7698, hiredate=datetime.datetime(1981, 2, 3, 0, 0), sal=1250, comm=500, deptno=30),
 Row(empno=7654, ename='MARTIN', job='SALESMAN', mgr=7698, hiredate=datetime.datetime(1981, 10, 22, 0, 0), sal=1250, comm=1400, deptno=30),
 Row(empno=7934, ename='MILLER', job='CLERK', mgr=7782, hiredate=datetime.datetime(1982, 1, 25, 0, 0), sal=1300, comm=None, deptno=10),
 Row(empno=7844, ename='TURNER', job='SALESMAN', mgr=7698, hiredate=datetime.datetime(1984, 10, 8, 0, 0), sal=1500, comm=None, deptno=30),
 Row(empno=7499, ename='ALLEN', job='SALESMAN'

## RDD를 가지고 임시뷰 생성 ~> SQL을 사용해서 데이터 처리

In [143]:
emp.createOrReplaceTempView("empview")

In [144]:
sparkdf = spark.sql("select * from empview")
print(type(sparkdf))
sparkdf.show()

<class 'pyspark.sql.dataframe.DataFrame'>
+-----+------+---------+----+-------------------+----+----+------+
|empno| ename|      job| mgr|           hiredate| sal|comm|deptno|
+-----+------+---------+----+-------------------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980-12-17 00:00:00| 800|null|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20 00:00:00|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-03 00:00:00|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|1981-03-02 00:00:00|2975|null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-10-22 00:00:00|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01 00:00:00|2850|null|    30|
| 7782| CLARK|  MANAGER|7839|1981-09-06 00:00:00|2450|null|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-08 00:00:00|3000|null|    20|
| 7839|  KING|PRESIDENT|null|1981-11-17 00:00:00|5000|null|    10|
| 7844|TURNER| SALESMAN|7698|1984-10-08 00:00:00|1500|null|    30|
| 7876| ADAMS|    CLERK|7788|1983-01-12 00:00:00|1100|null|    20|
| 7900| JAMES|    CL

In [145]:
spark.sql("select * from empview where sal > 2000").show()

+-----+-----+---------+----+-------------------+----+----+------+
|empno|ename|      job| mgr|           hiredate| sal|comm|deptno|
+-----+-----+---------+----+-------------------+----+----+------+
| 7566|JONES|  MANAGER|7839|1981-03-02 00:00:00|2975|null|    20|
| 7698|BLAKE|  MANAGER|7839|1981-05-01 00:00:00|2850|null|    30|
| 7782|CLARK|  MANAGER|7839|1981-09-06 00:00:00|2450|null|    10|
| 7788|SCOTT|  ANALYST|7566|1982-12-08 00:00:00|3000|null|    20|
| 7839| KING|PRESIDENT|null|1981-11-17 00:00:00|5000|null|    10|
| 7902| FORD|  ANALYST|7566|1981-12-13 00:00:00|3000|null|    20|
+-----+-----+---------+----+-------------------+----+----+------+



In [146]:
spark.sql("select deptno, sum(sal), max(sal) from empview group by deptno").show()

+------+--------+--------+
|deptno|sum(sal)|max(sal)|
+------+--------+--------+
|    20|   10875|    3000|
|    10|    8750|    5000|
|    30|    9400|    2850|
+------+--------+--------+



In [147]:
spark.sql("select * from empview order by sal desc").show()

+-----+------+---------+----+-------------------+----+----+------+
|empno| ename|      job| mgr|           hiredate| sal|comm|deptno|
+-----+------+---------+----+-------------------+----+----+------+
| 7839|  KING|PRESIDENT|null|1981-11-17 00:00:00|5000|null|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-08 00:00:00|3000|null|    20|
| 7902|  FORD|  ANALYST|7566|1981-12-13 00:00:00|3000|null|    20|
| 7566| JONES|  MANAGER|7839|1981-03-02 00:00:00|2975|null|    20|
| 7698| BLAKE|  MANAGER|7839|1981-05-01 00:00:00|2850|null|    30|
| 7782| CLARK|  MANAGER|7839|1981-09-06 00:00:00|2450|null|    10|
| 7499| ALLEN| SALESMAN|7698|1981-02-20 00:00:00|1600| 300|    30|
| 7844|TURNER| SALESMAN|7698|1984-10-08 00:00:00|1500|null|    30|
| 7934|MILLER|    CLERK|7782|1982-01-25 00:00:00|1300|null|    10|
| 7654|MARTIN| SALESMAN|7698|1981-10-22 00:00:00|1250|1400|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-03 00:00:00|1250| 500|    30|
| 7876| ADAMS|    CLERK|7788|1983-01-12 00:00:00|1100|null|   

In [148]:
spark.sql("select * from empview order by sal desc").take(1)

[Row(empno=7839, ename='KING', job='PRESIDENT', mgr=None, hiredate=datetime.datetime(1981, 11, 17, 0, 0), sal=5000, comm=None, deptno=10)]

In [149]:
spark.sql("select * from empview order by sal desc").take(1)[0][1]

'KING'

In [150]:
spark.sql("select * from empview order by sal desc").take(1)[0][4]

datetime.datetime(1981, 11, 17, 0, 0)

![이미지](images/spark_df.png)

## Row 객체

In [151]:
from pyspark.sql import Row
row=Row("James",40)
print(row[0] +","+str(row[1]))

James,40


In [152]:
row=Row(name="Alice", age=11)
print(row.name)

Alice


In [153]:
Person = Row("name", "age")
p1=Person("James", 40)
p2=Person("Alice", 35)
print(p1.name +","+p2.name)

James,Alice


In [154]:
from pyspark.sql import Row

data = [Row(name="James,,Smith",lang=["Java","Scala","C++"],state="CA"), 
    Row(name="Michael,Rose,",lang=["Spark","Java","C++"],state="NJ"),
    Row(name="Robert,,Williams",lang=["CSharp","VB"],state="NV")]
rdd=spark.sparkContext.parallelize(data)
print(rdd.collect())

[Row(lang=['Java', 'Scala', 'C++'], name='James,,Smith', state='CA'), Row(lang=['Spark', 'Java', 'C++'], name='Michael,Rose,', state='NJ'), Row(lang=['CSharp', 'VB'], name='Robert,,Williams', state='NV')]


In [155]:
collData=rdd.collect()
for row in collData:
    print(row.name + "," +str(row.lang))

James,,Smith,['Java', 'Scala', 'C++']
Michael,Rose,,['Spark', 'Java', 'C++']
Robert,,Williams,['CSharp', 'VB']


## 날짜데이터를 처리하자

In [156]:
import pyspark.sql.functions as f

In [157]:
l1 = [('2019-05-22',342),('2020-06-02',334),('2019-09-30',269),('2020-10-10',342),('2020-12-25',342)]
dfl1 =  spark.createDataFrame(l1).toDF("dates","sum")
dfl1.show()

+----------+---+
|     dates|sum|
+----------+---+
|2019-05-22|342|
|2020-06-02|334|
|2019-09-30|269|
|2020-10-10|342|
|2020-12-25|342|
+----------+---+



In [158]:
from pyspark.sql.functions import col
dfl2 = dfl1.withColumn('years',f.year(f.to_timestamp('dates', 'yyyy-MM-dd')))
dfl2 = dfl2.withColumn("month",f.month(f.to_timestamp('dates', 'yyyy-MM-dd')))
dfl2 = dfl2.withColumn("dayofmonth",f.dayofmonth(f.to_timestamp('dates', 'yyyy-MM-dd')))
dfl2.show()

+----------+---+-----+-----+----------+
|     dates|sum|years|month|dayofmonth|
+----------+---+-----+-----+----------+
|2019-05-22|342| 2019|    5|        22|
|2020-06-02|334| 2020|    6|         2|
|2019-09-30|269| 2019|    9|        30|
|2020-10-10|342| 2020|   10|        10|
|2020-12-25|342| 2020|   12|        25|
+----------+---+-----+-----+----------+



In [159]:
dfl2 = dfl1.withColumn('years',f.year(f.to_timestamp('dates')))
dfl2 = dfl2.withColumn("month",f.month(f.to_timestamp('dates')))
dfl2 = dfl2.withColumn("dayofmonth",f.dayofmonth(f.to_timestamp('dates')))
dfl2.show()

+----------+---+-----+-----+----------+
|     dates|sum|years|month|dayofmonth|
+----------+---+-----+-----+----------+
|2019-05-22|342| 2019|    5|        22|
|2020-06-02|334| 2020|    6|         2|
|2019-09-30|269| 2019|    9|        30|
|2020-10-10|342| 2020|   10|        10|
|2020-12-25|342| 2020|   12|        25|
+----------+---+-----+-----+----------+



In [160]:
dfl2.groupBy('years').sum('sum').show()

+-----+--------+
|years|sum(sum)|
+-----+--------+
| 2019|     611|
| 2020|    1018|
+-----+--------+



## NoneType 필터링
### pyspark에서 drop method는 NULL을 가진 행을 제거하는데 가장 간단한 함수다. 

### [drop 메소드에 인수]
### any: 모든 행의 컬럼값 중 하나라도 NULL의 값을 가지면 해당 행을 제거
### all: 모든 컬럼 값이 NULL이거나 NaN인 경우에만 해당 행을 제거

In [161]:
import pyspark.sql.functions as f


In [162]:
df = spark.createDataFrame([
    (1,'A','X1'),(2,None,'X2'),(2,'B','X2'),(2,'','X1'),(None,'','X3'),(1,'C','X1'),(2,None,'X1'),(2,'D',None),(None,None,None)
], ["ID", "TYPE", "CODE"])
df.show()

+----+----+----+
|  ID|TYPE|CODE|
+----+----+----+
|   1|   A|  X1|
|   2|null|  X2|
|   2|   B|  X2|
|   2|    |  X1|
|null|    |  X3|
|   1|   C|  X1|
|   2|null|  X1|
|   2|   D|null|
|null|null|null|
+----+----+----+



In [163]:
df.na.drop('any').show()

+---+----+----+
| ID|TYPE|CODE|
+---+----+----+
|  1|   A|  X1|
|  2|   B|  X2|
|  2|    |  X1|
|  1|   C|  X1|
+---+----+----+



In [164]:
df.na.drop('all').show()

+----+----+----+
|  ID|TYPE|CODE|
+----+----+----+
|   1|   A|  X1|
|   2|null|  X2|
|   2|   B|  X2|
|   2|    |  X1|
|null|    |  X3|
|   1|   C|  X1|
|   2|null|  X1|
|   2|   D|null|
+----+----+----+



In [165]:
df.na.drop('all', subset=['TYPE', 'CODE']).show()

+----+----+----+
|  ID|TYPE|CODE|
+----+----+----+
|   1|   A|  X1|
|   2|null|  X2|
|   2|   B|  X2|
|   2|    |  X1|
|null|    |  X3|
|   1|   C|  X1|
|   2|null|  X1|
|   2|   D|null|
+----+----+----+



In [166]:
df.na.drop('any', subset=['TYPE', 'CODE']).show()

+----+----+----+
|  ID|TYPE|CODE|
+----+----+----+
|   1|   A|  X1|
|   2|   B|  X2|
|   2|    |  X1|
|null|    |  X3|
|   1|   C|  X1|
+----+----+----+



In [167]:
df.show()

+----+----+----+
|  ID|TYPE|CODE|
+----+----+----+
|   1|   A|  X1|
|   2|null|  X2|
|   2|   B|  X2|
|   2|    |  X1|
|null|    |  X3|
|   1|   C|  X1|
|   2|null|  X1|
|   2|   D|null|
|null|null|null|
+----+----+----+



In [168]:
from decimal import Decimal

data = [{"Category": 'Category A', "ID": 1, "Value": Decimal(12.40)},
        {"Category": 'Category B', "ID": 2, "Value": Decimal(30.10)},
        {"Category": 'Category C', "ID": 3, "Value": None},
        {"Category": 'Category D', "ID": 4, "Value": Decimal(1.0)},
        ]

# Create data frame
df = spark.createDataFrame(data)
df.show()

+----------+---+--------------------+
|  Category| ID|               Value|
+----------+---+--------------------+
|Category A|  1|12.40000000000000...|
|Category B|  2|30.10000000000000...|
|Category C|  3|                null|
|Category D|  4|1.000000000000000000|
+----------+---+--------------------+





In [169]:
from decimal import Decimal

data = [Row(Category='Category A', ID=1, Value= Decimal(12.40)),
        Row(Category='Category B', ID=2, Value= Decimal(30.10)),
        Row(Category='Category C', ID=3, Value= None),
        Row(Category='Category D', ID=4, Value= Decimal(1.0)),
        ]

# Create data frame
df = spark.createDataFrame(data)
df.show()

+----------+---+--------------------+
|  Category| ID|               Value|
+----------+---+--------------------+
|Category A|  1|12.40000000000000...|
|Category B|  2|30.10000000000000...|
|Category C|  3|                null|
|Category D|  4|1.000000000000000000|
+----------+---+--------------------+



In [170]:
df.filter("Value is not null").show()

+----------+---+--------------------+
|  Category| ID|               Value|
+----------+---+--------------------+
|Category A|  1|12.40000000000000...|
|Category B|  2|30.10000000000000...|
|Category D|  4|1.000000000000000000|
+----------+---+--------------------+



In [171]:
df.where("Value is null").show()

+----------+---+-----+
|  Category| ID|Value|
+----------+---+-----+
|Category C|  3| null|
+----------+---+-----+



In [172]:
df.filter(df['Value'].isNull()).show()

+----------+---+-----+
|  Category| ID|Value|
+----------+---+-----+
|Category C|  3| null|
+----------+---+-----+



In [173]:
df.where(df.Value.isNotNull()).show()

+----------+---+--------------------+
|  Category| ID|               Value|
+----------+---+--------------------+
|Category A|  1|12.40000000000000...|
|Category B|  2|30.10000000000000...|
|Category D|  4|1.000000000000000000|
+----------+---+--------------------+



## 날짜타입 데이터 처리

In [174]:
emp = spark.read.csv("data/emp.csv", header=True, inferSchema=True)

In [175]:
emp.columns

['empno', 'ename', 'job', 'mgr', 'hiredate', 'sal', 'comm', 'deptno']

In [176]:
emp.dtypes

[('empno', 'int'),
 ('ename', 'string'),
 ('job', 'string'),
 ('mgr', 'int'),
 ('hiredate', 'timestamp'),
 ('sal', 'int'),
 ('comm', 'int'),
 ('deptno', 'int')]

In [177]:
from pyspark.sql.functions import col
newemp = emp.withColumn("hiredate",col("hiredate").cast("Date"))
newemp.printSchema()

root
 |-- empno: integer (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- mgr: integer (nullable = true)
 |-- hiredate: date (nullable = true)
 |-- sal: integer (nullable = true)
 |-- comm: integer (nullable = true)
 |-- deptno: integer (nullable = true)



In [178]:
newemp.select(f.year(newemp["hiredate"])).show()

+--------------+
|year(hiredate)|
+--------------+
|          1980|
|          1981|
|          1981|
|          1981|
|          1981|
|          1981|
|          1981|
|          1982|
|          1981|
|          1984|
|          1983|
|          1981|
|          1981|
|          1982|
+--------------+



In [179]:
newemp.select(f.month(newemp["hiredate"])).show()

+---------------+
|month(hiredate)|
+---------------+
|             12|
|              2|
|              2|
|              3|
|             10|
|              5|
|              9|
|             12|
|             11|
|             10|
|              1|
|             12|
|             12|
|              1|
+---------------+



In [180]:
newemp.select(f.dayofmonth(newemp["hiredate"])).show()

+--------------------+
|dayofmonth(hiredate)|
+--------------------+
|                  17|
|                  20|
|                   3|
|                   2|
|                  22|
|                   1|
|                   6|
|                   8|
|                  17|
|                   8|
|                  12|
|                   3|
|                  13|
|                  25|
+--------------------+



## 날짜타입 데이터 처리

In [182]:
flightData2015 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("data/flight-data/csv/2015-summary.csv")

In [183]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [184]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")
sqlWay.show()

+--------------------+--------+
|   DEST_COUNTRY_NAME|count(1)|
+--------------------+--------+
|            Anguilla|       1|
|              Russia|       1|
|            Paraguay|       1|
|             Senegal|       1|
|              Sweden|       1|
|            Kiribati|       1|
|              Guyana|       1|
|         Philippines|       1|
|            Djibouti|       1|
|            Malaysia|       1|
|           Singapore|       1|
|                Fiji|       1|
|              Turkey|       1|
|                Iraq|       1|
|             Germany|       1|
|              Jordan|       1|
|               Palau|       1|
|Turks and Caicos ...|       1|
|              France|       1|
|              Greece|       1|
+--------------------+--------+
only showing top 20 rows



In [185]:
dataFrameWay = flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .count()
dataFrameWay.show()

+--------------------+-----+
|   DEST_COUNTRY_NAME|count|
+--------------------+-----+
|            Anguilla|    1|
|              Russia|    1|
|            Paraguay|    1|
|             Senegal|    1|
|              Sweden|    1|
|            Kiribati|    1|
|              Guyana|    1|
|         Philippines|    1|
|            Djibouti|    1|
|            Malaysia|    1|
|           Singapore|    1|
|                Fiji|    1|
|              Turkey|    1|
|                Iraq|    1|
|             Germany|    1|
|              Jordan|    1|
|               Palau|    1|
|Turks and Caicos ...|    1|
|              France|    1|
|              Greece|    1|
+--------------------+-----+
only showing top 20 rows



In [186]:
from pyspark.sql.functions import max

flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

In [187]:
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [188]:
from pyspark.sql.functions import desc

flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .sum("count")\
  .withColumnRenamed("sum(count)", "destination_total")\
  .sort(desc("destination_total"))\
  .limit(5)\
  .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



## 다중 파일도 한방에 읽을 수 있지요...

In [190]:
staticDataFrame = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [191]:
staticSchema

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,TimestampType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

In [192]:
staticDataFrame.count()

541909

In [193]:
spark.sql("select * from retail_data").show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA

In [194]:
spark.sql("select * from retail_data where InvoiceDate > ''").show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA

## 윈도우함수(랭킹함수) 활용

In [195]:
simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )
 
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [196]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("department").orderBy("salary")

df.withColumn("row_number",row_number().over(windowSpec)) \
    .show(truncate=False)

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|James        |Sales     |3000  |1         |
|Robert       |Sales     |4100  |2         |
|Saif         |Sales     |4100  |3         |
|Michael      |Sales     |4600  |4         |
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
+-------------+----------+------+----------+



In [197]:
from pyspark.sql.functions import rank
df.withColumn("rank",rank().over(windowSpec)) \
    .show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   2|
|         Saif|     Sales|  4100|   2|
|      Michael|     Sales|  4600|   4|
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
+-------------+----------+------+----+



In [198]:
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .show()

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|        James|     Sales|  3000|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         2|
|      Michael|     Sales|  4600|         3|
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
+-------------+----------+------+----------+



## 웹사이트에서 데이터 읽어오기

In [199]:
from pyspark import SparkFiles

spark.sparkContext.addFile("https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv")
df = spark.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema=True)

In [200]:
df.printSchema ()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [201]:
df.show(5, truncate = False)

+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|x  |age|workclass|fnlwgt|education   |educational-num|marital-status    |occupation       |relationship|race |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|1  |25 |Private  |226802|11th        |7              |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0           |0           |40            |United-States |<=50K |
|2  |38 |Private  |89814 |HS-grad     |9              |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0           |0           |50            |United-States |<=50K |
|3  |28 |Local-gov|336951|Assoc-acdm  |12             |Married-civ-spouse|Protective-serv 

In [202]:
df.select('age','fnlwgt').show(5)

+---+------+
|age|fnlwgt|
+---+------+
| 25|226802|
| 38| 89814|
| 28|336951|
| 44|160323|
| 18|103497|
+---+------+
only showing top 5 rows



In [203]:
df.groupBy("education").count().sort("count",ascending=True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [204]:
df.describe().show()

+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|summary|                 x|               age|  workclass|            fnlwgt|   education|   educational-num|marital-status|      occupation|relationship|              race|gender|      capital-gain|     capital-loss|    hours-per-week|native-country|income|
+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|  count|             48842|             48842|      48842|             48842|       48842|             48842|         48842|           48842|       48842|             48842| 48842|             48842|            48842|  

In [205]:
df.describe('capital-gain').show()

+-------+------------------+
|summary|      capital-gain|
+-------+------------------+
|  count|             48842|
|   mean|1079.0676262233324|
| stddev| 7452.019057655418|
|    min|                 0|
|    max|             99999|
+-------+------------------+



In [206]:
df.filter(df.age > 40).count()

20211

## 다양한 집계(aggregation) 함수들

In [207]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import approx_count_distinct,collect_list
from pyspark.sql.functions import collect_set,sum,avg,max,countDistinct,count
from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness 
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sumDistinct
from pyspark.sql.functions import variance,var_samp,  var_pop

simpleData = [
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  ]
schema = ["employee_name", "department", "salary"]
  
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)

print("approx_count_distinct: " + \
      str(df.select(approx_count_distinct("salary")).collect()[0][0]))

print("avg: " + str(df.select(avg("salary")).collect()[0][0]))

df.select(collect_list("salary")).show(truncate=False)

df.select(collect_set("salary")).show(truncate=False)

df2 = df.select(countDistinct("department", "salary"))
df2.show(truncate=False)
print("Distinct Count of Department & Salary: "+str(df2.collect()[0][0]))

print("count: "+str(df.select(count("salary")).collect()[0]))
df.select(first("salary")).show(truncate=False)
df.select(last("salary")).show(truncate=False)
df.select(kurtosis("salary")).show(truncate=False)
df.select(max("salary")).show(truncate=False)
df.select(min("salary")).show(truncate=False)
df.select(mean("salary")).show(truncate=False)
df.select(skewness("salary")).show(truncate=False)
df.select(stddev("salary"), stddev_samp("salary"), \
    stddev_pop("salary")).show(truncate=False)
df.select(sum("salary")).show(truncate=False)
df.select(sumDistinct("salary")).show(truncate=False)
df.select(variance("salary"),var_samp("salary"),var_pop("salary")) \
  .show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+

approx_count_distinct: 6
avg: 3444.4444444444443
+------------------------------------------------------+
|collect_list(salary)                                  |
+------------------------------------------------------+
|[4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]|
+------------------------------------------------------+

+------------------------------------+
|collect_set(salary)                 |
+--------------

## UDF(User Defined Function) 활용

In [208]:
emp

DataFrame[empno: int, ename: string, job: string, mgr: int, hiredate: timestamp, sal: int, comm: int, deptno: int]

In [209]:
def detGrade(sal):
    Q = 'E'
    if(sal > 4000):
        Q = 'A'
    elif(sal > 3000):
        Q = 'B'
    elif(sal > 2000):
        Q = 'C'
    elif(sal > 1000):
        Q = 'D'
    return Q

In [210]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

grade = udf(detGrade, StringType())

In [211]:
newemp = emp.withColumn("grade", grade('sal'))
newemp.show()

+-----+------+---------+----+-------------------+----+----+------+-----+
|empno| ename|      job| mgr|           hiredate| sal|comm|deptno|grade|
+-----+------+---------+----+-------------------+----+----+------+-----+
| 7369| SMITH|    CLERK|7902|1980-12-17 00:00:00| 800|null|    20|    E|
| 7499| ALLEN| SALESMAN|7698|1981-02-20 00:00:00|1600| 300|    30|    D|
| 7521|  WARD| SALESMAN|7698|1981-02-03 00:00:00|1250| 500|    30|    D|
| 7566| JONES|  MANAGER|7839|1981-03-02 00:00:00|2975|null|    20|    C|
| 7654|MARTIN| SALESMAN|7698|1981-10-22 00:00:00|1250|1400|    30|    D|
| 7698| BLAKE|  MANAGER|7839|1981-05-01 00:00:00|2850|null|    30|    C|
| 7782| CLARK|  MANAGER|7839|1981-09-06 00:00:00|2450|null|    10|    C|
| 7788| SCOTT|  ANALYST|7566|1982-12-08 00:00:00|3000|null|    20|    C|
| 7839|  KING|PRESIDENT|null|1981-11-17 00:00:00|5000|null|    10|    A|
| 7844|TURNER| SALESMAN|7698|1984-10-08 00:00:00|1500|null|    30|    D|
| 7876| ADAMS|    CLERK|7788|1983-01-12 00:00:00|11

In [212]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



In [213]:
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 

convertUDF = udf(lambda z: convertCase(z))

df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name") ) \
.show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



In [214]:
# AWS-LINUX

import findspark
findspark.init("/opt/spark")

In [215]:
import pyspark
from pyspark.sql import SparkSession

In [216]:
spark = SparkSession.builder.getOrCreate()
spark

### Spark을 사용하여 hotel.txt파일의 내용으로 텍스트 마이닝을 해보자~
#### hotel.txt 파일에서 가장 많이 등장한 명사들을 많이 등장한 순으로 30개를 출력해 본다. 

In [217]:
hoteldf = spark.read.text("data/hotel.txt") 
hoteldf.show(3)

+-----------------------------------+
|                              value|
+-----------------------------------+
|대중교통을 이용한다면 비추입니다...|
|  위치는 좋았는데, 가격대비 방이...|
|           만족하고 나왔습니다 ㅎㅎ|
+-----------------------------------+
only showing top 3 rows



In [218]:
print(hoteldf.take(3))

[Row(value='대중교통을 이용한다면 비추입니다. 역과 애매한 거리입니다. 시설은 너무 좋았어요 어메니티도 만족해요'), Row(value='위치는 좋았는데, 가격대비 방이 너무 작았던것 같아요 ㅜ 청결이라던지, 직원들 응대는 좋았지만 강남에서 이가격이면 그냥 쏘쏘한것 같긴 하네요'), Row(value='만족하고 나왔습니다 ㅎㅎ')]


In [219]:
imsi = hoteldf.collect()
print(imsi[:3])

[Row(value='대중교통을 이용한다면 비추입니다. 역과 애매한 거리입니다. 시설은 너무 좋았어요 어메니티도 만족해요'), Row(value='위치는 좋았는데, 가격대비 방이 너무 작았던것 같아요 ㅜ 청결이라던지, 직원들 응대는 좋았지만 강남에서 이가격이면 그냥 쏘쏘한것 같긴 하네요'), Row(value='만족하고 나왔습니다 ㅎㅎ')]


In [220]:
hotellist = [ x[0] for x in imsi ]
print(hotellist[:3])

['대중교통을 이용한다면 비추입니다. 역과 애매한 거리입니다. 시설은 너무 좋았어요 어메니티도 만족해요', '위치는 좋았는데, 가격대비 방이 너무 작았던것 같아요 ㅜ 청결이라던지, 직원들 응대는 좋았지만 강남에서 이가격이면 그냥 쏘쏘한것 같긴 하네요', '만족하고 나왔습니다 ㅎㅎ']


In [221]:
hotelstr = ' '.join(hotellist)
print(hotelstr[:50])

대중교통을 이용한다면 비추입니다. 역과 애매한 거리입니다. 시설은 너무 좋았어요 어메니티도


In [222]:
import re

hangul = re.compile('[^ ㄱ-ㅣ가-힣]+') 
hangul.sub('', hotelstr) # 한글과 띄어쓰기를 제외한 모든 부분을 제거

'대중교통을 이용한다면 비추입니다 역과 애매한 거리입니다 시설은 너무 좋았어요 어메니티도 만족해요 위치는 좋았는데 가격대비 방이 너무 작았던것 같아요 ㅜ 청결이라던지 직원들 응대는 좋았지만 강남에서 이가격이면 그냥 쏘쏘한것 같긴 하네요 만족하고 나왔습니다 ㅎㅎ 위치는 두개의 역 중간즈음이라 좋지 않음 짐이 없고 날씨가 좋으면 다닐만 함 화장실에서 하수구냄새 가 난다는 후기를 많이 봐서 예약시 냄새안나는 방으로 요청했고 체크인시 물어봤더니 더블체크했다고 함 막상 사용해보니 냄새는 괜찮았는데 화장실쪽에서 자꾸 이상한 소리가 남 아마도 다른방 환풍기 같은게 돌아가는 소리같은데 박하는동안 밤마다 소리가 나서 좀 신경쓰였음 층 후문쪽에 바로 세븐일레븐 편의점이 있어서 편했고 점심에 먹은 호텔 부페가 맛있었음 투숙객  정도 할인되었던듯 층에 수라선이라는 식당 아주 맛있음 수라선 옆에 빌리엔젤도 있으니 식사후 후식먹기 딱 좋음 방도 생각보다 넓고 창문이 일단 커서 너무 좋았어요 그리고 역시 소문대로 침구는 정말 편했습니다모든게 다 좋았지만 지하철역과 멀어서 교통 부분이 조금 아쉬웠어요 그리고 방에 들어가서 베개를 보는데 얼룩과 먼지가 조금 있어 청소는 조금 아쉬웠지만 그거 빼고는 다 너무 만족스러웠습니다 깔끔하고 조용해서 좋았어요 지하철 역이 약간 먼게 흠 주차타워가 아닌 지하 주차장 이라서 체크아웃 시간에 한참 기다릴 일도 없고 시설도 깔끔하고 관리가 잘 되고 있는 것 같습니다 직원분의 친절도는 너무 최상등급 이라서 기분까지 좋았습니다 근처에 있는 서초나 콧대높은 구로는 비교가 안됩니다 신라스테이 서울 원탑은 역삼서대문서초 외구로 인 것 같습니다 위치도 지하철이 가까이 있어서 너무 좋았고 숙소도 방은 작지만 아늑하고 깨끗했어요 다만 엘레베이터가 세대인데 층수가 높아 한참 기다려야하는게 살짝 불편했고 방음도 쪼끔 안되는것 같았어요 다행히 옆방에혹은 위아래층에 있는 아기가 일찍 잠드는 듯해서 저희도 잘 쉬다 왔어요 위치도 서비스도 청결도 너무 좋았어요 전철을 타는경우만 말고

In [223]:
from konlpy.tag import Okt                                  ## 다른 형태소를 클래스를 가져온다. 
okt = Okt()
hotelnoun = okt.nouns(hotelstr)
print(hotelnoun[:50])

['대중교통', '이용', '면', '역', '거리', '시설', '어메니티', '위치', '가격', '대비', '방이', '청결', '직원', '응대', '강남', '가격', '그냥', '것', '위치', '개', '역', '중간', '즈음', '짐', '날씨', '함', '화장실', '하수', '냄새', '후기', '약시', '냄새', '안나', '방', '요청', '체크', '인시', '더블체크', '함', '막상', '사용', '냄새', '화장실', '쪽', '자꾸', '소리', '남', '방', '환풍기', '소리']


In [225]:
hotelRDD = spark.sparkContext.parallelize(hotelnoun)

In [226]:
wc = hotelRDD.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(lambda x1, x2: x1 + x2) \
                  .map(lambda x: (x[1], x[0])) \
                  .sortByKey(ascending=False).collect()

for (count, word) in wc[:30]:
    print("{} : {}".format(word, count))
    

위치 : 149
호텔 : 138
직원 : 104
가격 : 98
이용 : 81
조식 : 74
신라 : 67
시설 : 66
방 : 62
좀 : 61
룸 : 61
것 : 59
대비 : 50
스테이 : 49
조금 : 46
생각 : 46
상태 : 45
다만 : 42
때 : 41
곳 : 41
객실 : 40
숙소 : 39
소리 : 37
출장 : 37
층 : 37
더 : 34
냄새 : 34
청결 : 33
청소 : 33
화장실 : 31


### Spark를 활용하여 product_click_new.log 파일로 날짜 데이터에 대한 전처리를 처리해보자~

In [227]:
click = spark.read.csv("data/product_click_new.log", sep=" ", inferSchema=True)

In [228]:
click.show(5)

+------------+----+
|         _c0| _c1|
+------------+----+
|201612120944|p001|
|201612120944|p003|
|201612120944|p003|
|201612120945|p008|
|201612121052|p008|
+------------+----+
only showing top 5 rows



In [229]:
click = click.withColumnRenamed("_c0", "clicktime")\
       .withColumnRenamed("_c1", "pid")
click.show(5)

+------------+----+
|   clicktime| pid|
+------------+----+
|201612120944|p001|
|201612120944|p003|
|201612120944|p003|
|201612120945|p008|
|201612121052|p008|
+------------+----+
only showing top 5 rows



In [230]:
click.printSchema()

root
 |-- clicktime: long (nullable = true)
 |-- pid: string (nullable = true)



In [231]:
from pyspark.sql.functions import col
from pyspark.sql.functions import StringType
click = click.withColumn("clicktime",col("clicktime").cast(StringType()))

In [232]:
click.printSchema()

root
 |-- clicktime: string (nullable = true)
 |-- pid: string (nullable = true)



In [233]:
import pyspark.sql.functions as f
click = click.withColumn('year',f.year(f.to_timestamp('clicktime', 'yyyyMMddhhmm')))
click = click.withColumn("month",f.month(f.to_timestamp('clicktime', 'yyyyMMddhhmm')))
click = click.withColumn("day",f.dayofmonth(f.to_timestamp('clicktime', 'yyyyMMddhhmm')))
click = click.withColumn("hour",f.hour(f.to_timestamp('clicktime', 'yyyyMMddhhmm')))
click = click.withColumn("minute",f.minute(f.to_timestamp('clicktime', 'yyyyMMddhhmm')))

In [234]:
click.show(5)

+------------+----+----+-----+---+----+------+
|   clicktime| pid|year|month|day|hour|minute|
+------------+----+----+-----+---+----+------+
|201612120944|p001|2016|   12| 12|   9|    44|
|201612120944|p003|2016|   12| 12|   9|    44|
|201612120944|p003|2016|   12| 12|   9|    44|
|201612120945|p008|2016|   12| 12|   9|    45|
|201612121052|p008|2016|   12| 12|  10|    52|
+------------+----+----+-----+---+----+------+
only showing top 5 rows



In [235]:
click.groupby("hour").count().show()

+----+-----+
|hour|count|
+----+-----+
|   9|  110|
|  10|   80|
|  11|  120|
+----+-----+



In [236]:
click.select(f.hour(f.to_timestamp(click.clicktime, 'yyyyMMddhhmm')).alias('dt')).groupby('dt').count().show()

+---+-----+
| dt|count|
+---+-----+
|  9|  110|
| 10|   80|
| 11|  120|
+---+-----+



## 타이타닉 데이터셋을 활용한 EDA,전처리 그리고 분석을 스파크로 해봅시다요

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [None]:
titanic_df = spark.read.csv("data/train.csv",header = 'True',inferSchema='True')

In [None]:
passengers_count = titanic_df.count()

In [None]:
print(passengers_count)

In [None]:
titanic_df.show(5)

In [None]:
titanic_df.describe().show()

In [None]:
titanic_df.printSchema()

In [None]:
titanic_df.select("Survived","Pclass","Embarked").show()

## 자~ EDA(Exploratory Data Analysis)를 해봅시다요.

In [None]:
titanic_df.groupBy("Survived").count().show()

In [None]:
titanic_df.groupBy("Sex","Survived").count().show()

In [None]:
titanic_df.groupBy("Pclass","Survived").count().show()

In [None]:
# This function use to print feature with null values and null count 
def null_value_count(df):
  null_columns_counts = []
  numRows = df.count()
  for k in df.columns:
    nullRows = df.where(col(k).isNull()).count()
    if(nullRows > 0):
      temp = k,nullRows
      null_columns_counts.append(temp)
  return(null_columns_counts)

In [None]:
null_columns_count_list = null_value_count(titanic_df)

In [None]:
null_columns_count_list

In [None]:
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value', 'Null_Values_Count']).show()

In [None]:
mean_age = titanic_df.select(mean('Age')).collect()[0][0]
print(mean_age)

In [None]:
titanic_df = titanic_df.withColumn("Initial",regexp_extract(col("Name"),"([A-Za-z]+)\.",1))

In [None]:
df = spark.createDataFrame([('100-200',)], ['str'])
df.select(regexp_extract('str', r'(\d+)-(\d+)', 0).alias('d')).collect()

In [None]:
titanic_df.show()

In [None]:
titanic_df.select("Initial").distinct().show()

In [None]:
titanic_df = titanic_df.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
               ['Miss','Miss','Miss','Mr','Mr',  'Mrs',  'Mrs',  'Other',  'Other','Other','Mr','Mr','Mr'])

In [None]:
titanic_df.show()

In [None]:
titanic_df.select("Initial").distinct().show()

In [None]:
titanic_df.groupby('Initial').avg('Age').collect()

In [None]:
titanic_df.filter(titanic_df.Age==46).select("Initial").show()

In [None]:
titanic_df.select("Age").show()

In [None]:
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Miss") & (titanic_df["Age"].isNull()), 22).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Other") & (titanic_df["Age"].isNull()), 46).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Master") & (titanic_df["Age"].isNull()), 5).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Mr") & (titanic_df["Age"].isNull()), 33).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Mrs") & (titanic_df["Age"].isNull()), 36).otherwise(titanic_df["Age"]))

In [None]:
titanic_df.groupBy("Embarked").count().show()

In [None]:
titanic_df = titanic_df.na.fill({"Embarked" : 'S'})

In [None]:
titanic_df = titanic_df.drop("Cabin")

In [None]:
titanic_df.printSchema()

In [None]:
titanic_df = titanic_df.withColumn("Family_Size",col('SibSp')+col('Parch'))

In [None]:
titanic_df.show()

In [None]:
titanic_df.groupBy("Family_Size").count().show()

In [None]:
titanic_df = titanic_df.withColumn('Alone',lit(0))

In [None]:
titanic_df = titanic_df.withColumn("Alone",when(titanic_df["Family_Size"] == 0, 1).otherwise(titanic_df["Alone"]))

In [None]:
titanic_df.columns

In [None]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(titanic_df) for column in ["Sex","Embarked","Initial"]]
pipeline = Pipeline(stages=indexers)
titanic_df = pipeline.fit(titanic_df).transform(titanic_df)

In [None]:
titanic_df.show()

In [None]:
titanic_df.printSchema()

In [None]:
titanic_df = titanic_df.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex","Initial")

In [None]:
titanic_df.show()

## pyspark.ml 모듈의 분석 함수들은 데이터셋에 features 가 있어야 학습을 진행함
### pyspark.ml.VectorAssember를 사용해서 feature로 사용하여 컬럼들에 대한 리스트를 지정하고 features 열을 추가한다.

In [None]:
from pyspark.ml.feature import VectorAssembler
feature = VectorAssembler(inputCols=titanic_df.columns[1:],outputCol="features")
feature_vector= feature.transform(titanic_df)

## 훈련데이터와 검증 데이터를 8:2로 나눈다.

In [None]:
(trainingData, testData) = feature_vector.randomSplit([0.8, 0.2],seed = 11)

In [None]:
trainingData.show()

## 모델링
### Spark ML에서 제공하는 분류 분석

### LogisticRegression

### DecisionTreeClassifier

### RandomForestClassifier

### Support Vector Machine



In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## LogisticRegression
### 로지스틱 회귀(Logistic Regression)는 회귀를 사용하여 데이터가 어떤 범주에 속할 확률에서 0 에서 1 사이의 값으로 예측하고 
### 그 확률에 따라 가능성이 더 높은 범주에 속하는 것으로 분류해주는 지도 학습 알고리즘이다

In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
#Training algo
lrModel = lr.fit(trainingData)
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "Survived", "features").show()
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

### 평가

In [None]:
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))

## DecisionTreeClassifier

### 스무고개를 연상케하는 알고리즘
### 트리구조를 사용하고 각 분기점(NODE)에는 분석 대상의 속성이 위치함
### 가장 높은 정보 획득량(information gain)을 제공하는 속성을 우선적으로 선택하여 분류를 시작함
### 다른 모델들과는 다르게 결과물이 시각적으로 읽히기 쉬운형태로 나타나는 것이 장점

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
dt_model = dt.fit(trainingData)
dt_prediction = dt_model.transform(testData)
dt_prediction.select("prediction", "Survived", "features").show()

### 평가

In [None]:
dt_accuracy = evaluator.evaluate(dt_prediction)
print("Accuracy of DecisionTreeClassifier is = %g"% (dt_accuracy))
print("Test Error of DecisionTreeClassifier = %g " % (1.0 - dt_accuracy))

### RandomForestClassifier

### 의사결정 트리의 오버피팅 한계를 극복하기 위한 전략
### 랜덤 포레스트는 훈련을 통해 구성해놓은 다수의 나무들로부터 분류 결과를 취합해서 결론을 얻는 방식

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
rf_model = rf.fit(trainingData)
rf_prediction = rf_model.transform(testData)
rf_prediction.select("prediction", "Survived", "features").show()

### 평가

In [None]:
rf_accuracy = evaluator.evaluate(rf_prediction)
print("Accuracy of RandomForestClassifier is = %g"% (rf_accuracy))
print("Test Error of RandomForestClassifier  = %g " % (1.0 - rf_accuracy))

## Support Vector Machine

### 주어진 데이터가 어느 카테고리에 속할지 판단하는 이진 선형 분류 모델
### 데이터군으로 부터 최대의 마진을 갖도록 결정 경계를 찾아서 분류함 

In [None]:
from pyspark.ml.classification import LinearSVC
svm = LinearSVC(labelCol="Survived", featuresCol="features")
svm_model = svm.fit(trainingData)
svm_prediction = svm_model.transform(testData)
svm_prediction.select("prediction", "Survived", "features").show()

### 평가

In [None]:
svm_accuracy = evaluator.evaluate(svm_prediction)
print("Accuracy of Support Vector Machine is = %g"% (svm_accuracy))
print("Test Error of Support Vector Machine = %g " % (1.0 - svm_accuracy))

https://www.kaggle.com/rmn420/titanicdata-pyspark