In [None]:
pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=5874bc0c5e5488ce5a3fd65c55ab417b3e9fa305a04639b61481758ac6916961
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
import pyspark
from pyspark.sql import SparkSession
# spark 2.0 이후, sparkContext와 동일한 기능 (spark 기능의 진입점, main 함수)
# sparkContext도 여전히 지원하며, RDD를 이용할 경우, sparkContext를 사용함

### SparkSession 생성

In [None]:
spark = SparkSession.builder.appName('session').config('spark.driver.host','localhost').getOrCreate()
# SparkSession.builder를 사용하여 기본적인 설정의 SparkSession을 생성함
# config('spark.driver.host','localhost'): application의 host를 local로 설정(환경변수 설정 필요, 공지사항 참조)
# appName: application의 이름 설정
# config: application의 다양한 환경 설정 가능
# getOrCreate: 기존 SparkSession을 가져오거나, 새롭게 생성

### data read

In [None]:
students = spark.read.csv('students.csv')
# SparkSession의 read는 data를 read하는 역할 수행
# SparkSession의 read를 이용하여 data를 read하면 DataFrame type으로 read함
# ※ pandas의 DataFrame과 다르다는 것을 명심할 것~~!!
# 일반적으로 local에서 data를 가져올 경우, file:/// 뒤에 C:/이후의 full path를 입력
# 위의 경우, 'file:///Users/User/Downloads/students.csv'
# cluster 환경에서는 파일 경로가, url 또는 HDFS 등 일수 있다.

In [None]:
students.show(3)
# show(n): DataFrame의 n개의 행 보여줌

+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|   lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|standard|                   none|        72|           72|           74|
|female|       group C|               some college|standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|standard|                   none|        90|           95|           93|
+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
only showing top 3 rows



In [None]:
students.printSchema()
# DataFrame의 schema 확인

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: string (nullable = true)
 |-- reading score: string (nullable = true)
 |-- writing score: string (nullable = true)



In [None]:
students = spark.read.csv('students.csv',header=True,inferSchema=True)
# read의 option 설정
# 1. header: True로 설정할 경우, 첫 행을 column명으로 간주함 (default: None)
# 2. inferSchema: True로 설정할 경우, 각 column의 type과 nullable 확인
#                 default: None(모든 column을 string으로 간주함)
#                 직접 schema를 입력할 수도 있음
#                 대용량 데이터의 경우, read 속도가 매우 느릴 수 있으므로, 직접 schema를 입력하는 것이 좋음

In [None]:
students.show(5)

+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|       lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+------------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|    standard|                   none|        72|           72|           74|
|female|       group C|               some college|    standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|    standard|                   none|        90|           95|           93|
|  male|       group A|         associate's degree|free/reduced|                   none|        47|           57|           44|
|  male|       group C|               some college|    standard|                   none|        76|     

In [None]:
people = spark.read.json('people.json')
# json file의 경우, column 명이 자동으로 생성됨
print(type(people))
people.show(3)
people.printSchema()

<class 'pyspark.sql.dataframe.DataFrame'>
+---------------+---------------+-------------------+--------------------+-----------------+------------+--------------------+
|_corrupt_record|           city|         creditcard|               email|              mac|        name|           timestamp|
+---------------+---------------+-------------------+--------------------+-----------------+------------+--------------------+
|              [|           NULL|               NULL|                NULL|             NULL|        NULL|                NULL|
|           NULL|Lake Gladysberg|1228-1221-1221-1431|katlyn@jenkinsmag...|08:fd:0b:cd:77:f7|Keeley Bosco|2015-04-25 13:57:...|
|           NULL|           NULL|1228-1221-1221-1431|juvenal@johnston....|90:4d:fa:42:63:a2| Rubye Jerde|2015-04-25 09:02:...|
+---------------+---------------+-------------------+--------------------+-----------------+------------+--------------------+
only showing top 3 rows

root
 |-- _corrupt_record: string (nullable 

In [None]:
# DataFrame을 출력할 경우, value는 확입 불가능하며, column명과 data type만 확인할 수 있다.
print(people)
# show를 이용하여 일부 행만 확인할 수 있음

DataFrame[_corrupt_record: string, city: string, creditcard: string, email: string, mac: string, name: string, timestamp: string]


In [None]:
# DataFrame.columns는 column명만 확인
print(students.columns)

['gender', 'race/ethnicity', 'parental level of education', 'lunch', 'test preparation course', 'math score', 'reading score', 'writing score']


In [None]:
# DataFrame.describe(): column명과 data type 확인 (printSchema가 더 보기 좋음)
print(students.describe())

DataFrame[summary: string, gender: string, race/ethnicity: string, parental level of education: string, lunch: string, test preparation course: string, math score: string, reading score: string, writing score: string]


### Schema 지정하기

In [None]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DateType

In [None]:
# StructField는 각 column의 특성을 지정할 수 있음 (column명, data type, nullable)
# StructField('column명', datatype, nullable)
data_field = [StructField('name', StringType(), True),
              StructField('email', StringType(), True),
              StructField('city', StringType(), True),
              StructField('mac', StringType(), True),
              StructField('time', DateType(), True),
              StructField('creditcard', StringType(), True),]

In [None]:
# StructType은 StructField를 원소로하는  list를 입력 받아 read에 적용할 수 있는 schema type으로 변환
data_schema = StructType(fields = data_field)

In [None]:
people2 = spark.read.json('people.json', schema = data_schema)
people.printSchema()
people2.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- city: string (nullable = true)
 |-- creditcard: string (nullable = true)
 |-- email: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- name: string (nullable = true)
 |-- timestamp: string (nullable = true)

root
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- mac: string (nullable = true)
 |-- time: date (nullable = true)
 |-- creditcard: string (nullable = true)

