# 1부

In [1]:
# pyspark 설치
!pip install pyspark



In [2]:
import pyspark
import pandas as pd

In [3]:
titanic_df = pd.read_csv('/content/drive/MyDrive/멀티캠퍼스/DE/data/titanic_train.csv')

In [4]:
# spark Session을 시작하기 위해 몇 가지 필드 추가 생성
from pyspark.sql import SparkSession

In [5]:
# spark 변수 생성
# sparkSession 이름은 practice
spark = SparkSession.builder.appName('practice').getOrCreate() 

In [6]:
spark

In [7]:
df_pyspark = spark.read.csv('/content/drive/MyDrive/멀티캠퍼스/DE/data/test1.csv')

In [8]:
print(df_pyspark)   # 엑셀의 0행 에 있는 모든 열들을 나타내줌
df_pyspark.show()   # dataset보기

DataFrame[_c0: string, _c1: string, _c2: string]
+---------+---+----------+
|      _c0|_c1|       _c2|
+---------+---+----------+
|     Name|age|Experience|
|    Krish| 31|        10|
|Sudhanshu| 30|         8|
|    Sunny| 29|         4|
+---------+---+----------+



In [9]:
# Name과 Age를 메인컬럼으로 지정하려면 .option을 설정한다.  
# 첫번쨰 행을 header로 간주
print( spark.read.option('header','true').csv('/content/drive/MyDrive/멀티캠퍼스/DE/data/test1.csv') )
df_pyspark = spark.read.option('header','true').csv('/content/drive/MyDrive/멀티캠퍼스/DE/data/test1.csv')
df_pyspark.show()

DataFrame[Name: string, age: string, Experience: string]
+---------+---+----------+
|     Name|age|Experience|
+---------+---+----------+
|    Krish| 31|        10|
|Sudhanshu| 30|         8|
|    Sunny| 29|         4|
+---------+---+----------+



In [10]:
type(df_pyspark)  

pyspark.sql.dataframe.DataFrame

In [11]:
df_pyspark.head(3)   # pyspark에서 head는 각 행에 대한 정보가 기본적으로 표시됨

[Row(Name='Krish', age='31', Experience='10'),
 Row(Name='Sudhanshu', age='30', Experience='8'),
 Row(Name='Sunny', age='29', Experience='4')]

In [12]:
# printSchema 
df_pyspark.printSchema()   # 열 정보를 나타내줌

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)



## 데이터 불러오기
- 데이터 불러오기
- 행, 열 정보 확인
- dataframe 살펴보기


In [13]:
# pyspark로 작업하기 위해서는 SparkSession을 시작해야한다.
from pyspark.sql import SparkSession

In [14]:
spark = SparkSession.builder.appName('Dataframe').getOrCreate()

In [15]:
spark

In [16]:
# dataset 읽기
print( spark.read.option('header','true').csv('/content/drive/MyDrive/멀티캠퍼스/DE/data/test1.csv') )
spark.read.option('header','true').csv('/content/drive/MyDrive/멀티캠퍼스/DE/data/test1.csv').show()
df_pyspark.printSchema()  # PrintSchema → 열의 유형 확인

# 불러온 dataset을 'df_pyspark'라는 변수에 저장
df_pyspark = spark.read.option('header','true').csv('/content/drive/MyDrive/멀티캠퍼스/DE/data/test1.csv')

DataFrame[Name: string, age: string, Experience: string]
+---------+---+----------+
|     Name|age|Experience|
+---------+---+----------+
|    Krish| 31|        10|
|Sudhanshu| 30|         8|
|    Sunny| 29|         4|
+---------+---+----------+

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)



**excel에서는 숫자로 적었는데 왜 문자열로 인식할까?**  
  - csv에서 옵션을 지정하지 않는 한 문자열로 인식하는듯하다.

**해결 방법**
- spark로 csv파일을 읽을 때 csv파일에 `inferSchema=True`옵션을 추가해줘서 의도한대로 data형식이 잘 나오도록 한다.

In [17]:
df_pyspark = spark.read.option('header','true').csv('/content/drive/MyDrive/멀티캠퍼스/DE/data/test1.csv',inferSchema=True)
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [18]:
# header와 추론 스키마를 모두 포함하도록하는 방법 
df_pyspark = spark.read.csv('/content/drive/MyDrive/멀티캠퍼스/DE/data/test1.csv',
                            header=True,inferSchema=True)
print(df_pyspark.printSchema())
df_pyspark.show()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)

None
+---------+---+----------+
|     Name|age|Experience|
+---------+---+----------+
|    Krish| 31|        10|
|Sudhanshu| 30|         8|
|    Sunny| 29|         4|
+---------+---+----------+



In [19]:
type(df_pyspark)  # type은 DataFrame

pyspark.sql.dataframe.DataFrame

# 2부

## 열 선택 & 인덱싱

In [20]:
# 모든 열 이름 확인
df_pyspark.columns

['Name', 'age', 'Experience']

In [21]:
# pandas에서 .head()는 dataframe형태로 확인이 가능했지만 
# pyspark에서 .head()는 각 행의 내용을 보여줌 
df_pyspark.head(3)  # 각 행 정보 확인 가능

[Row(Name='Krish', age=31, Experience=10),
 Row(Name='Sudhanshu', age=30, Experience=8),
 Row(Name='Sunny', age=29, Experience=4)]

In [22]:
df_pyspark.show()

+---------+---+----------+
|     Name|age|Experience|
+---------+---+----------+
|    Krish| 31|        10|
|Sudhanshu| 30|         8|
|    Sunny| 29|         4|
+---------+---+----------+



In [23]:
# 열 선택
print( df_pyspark.select('Name') )   # Dataframe 형태로 반환됨
print( type(df_pyspark.select('Name')) )

# 열을 선택한 뒤 .show()를 해주면 해당하는 열만 보여줌
df_pyspark.select('Name').show()     
df_pyspark.select('Name','Age').show()

DataFrame[Name: string]
<class 'pyspark.sql.dataframe.DataFrame'>
+---------+
|     Name|
+---------+
|    Krish|
|Sudhanshu|
|    Sunny|
+---------+

+---------+---+
|     Name|Age|
+---------+---+
|    Krish| 31|
|Sudhanshu| 30|
|    Sunny| 29|
+---------+---+



In [24]:
df_pyspark.select('Name','Age').show()
# df_pyspark.select(['Name','Age']).show()    # 동일한 결과

+---------+---+
|     Name|Age|
+---------+---+
|    Krish| 31|
|Sudhanshu| 30|
|    Sunny| 29|
+---------+---+



In [25]:
# pandas에서 열을 직접 선택할 경우
df_pyspark['Name']   # 열이 반환됨  → Columns<'Name'>

Column<'Name'>

In [26]:
# 직접 열을 선택하는 경우에는 .show()를 지원하지 않음
# df_pyspark['Name'].show()

In [27]:
# 데이터 유형 확인
df_pyspark.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int')]

In [28]:
# dataset 설명 
print( df_pyspark.describe() )  
df_pyspark.describe().show()

DataFrame[summary: string, Name: string, age: string, Experience: string]
+-------+-----+----+-----------------+
|summary| Name| age|       Experience|
+-------+-----+----+-----------------+
|  count|    3|   3|                3|
|   mean| null|30.0|7.333333333333333|
| stddev| null| 1.0|3.055050463303893|
|    min|Krish|  29|                4|
|    max|Sunny|  31|               10|
+-------+-----+----+-----------------+



## 열 추가

In [29]:
# 열 추가  
# .withColumn은 클래스를 추가하거나 동일한 이름을 가진 기존 열을 교체하여 새 DF를 반환
df_pyspark.withColumn('Esperience After 2 Year',df_pyspark['Experience']+2) 

DataFrame[Name: string, age: int, Experience: int, Esperience After 2 Year: int]

In [30]:
df_pyspark.withColumn('Esperience After 2 Year',df_pyspark['Experience']+2).show()

+---------+---+----------+-----------------------+
|     Name|age|Experience|Esperience After 2 Year|
+---------+---+----------+-----------------------+
|    Krish| 31|        10|                     12|
|Sudhanshu| 30|         8|                     10|
|    Sunny| 29|         4|                      6|
+---------+---+----------+-----------------------+



In [31]:
# 추가한 열이 기존 df에는 추가되지 않음
# 저장하기 위해서는 변수에 할당해야함

df_pyspark = df_pyspark.withColumn('EXperience After 2 Year',df_pyspark['Experience']+2)
df_pyspark.show()

+---------+---+----------+-----------------------+
|     Name|age|Experience|EXperience After 2 Year|
+---------+---+----------+-----------------------+
|    Krish| 31|        10|                     12|
|Sudhanshu| 30|         8|                     10|
|    Sunny| 29|         4|                      6|
+---------+---+----------+-----------------------+



## 열 삭제

In [32]:
# drop columns
df_pyspark.drop('Experience After 2 Year').show()

+---------+---+----------+
|     Name|age|Experience|
+---------+---+----------+
|    Krish| 31|        10|
|Sudhanshu| 30|         8|
|    Sunny| 29|         4|
+---------+---+----------+



In [33]:
# drop 또한 기존 DF에는 영향을 주지 않음
# 변수에 할당해야함
df_pyspark = df_pyspark.drop('Experience After 2 Year')

# column명 변경

In [35]:
# rename columns  - 기존 DF에 영향 X
df_pyspark.withColumnRenamed('Name','NEW Name').show()

+---------+---+----------+
| NEW Name|age|Experience|
+---------+---+----------+
|    Krish| 31|        10|
|Sudhanshu| 30|         8|
|    Sunny| 29|         4|
+---------+---+----------+



In [37]:
df_pyspark = df_pyspark.withColumnRenamed('Name','NEW Name')
df_pyspark.show()

+---------+---+----------+
| NEW Name|age|Experience|
+---------+---+----------+
|    Krish| 31|        10|
|Sudhanshu| 30|         8|
|    Sunny| 29|         4|
+---------+---+----------+



# 3부

## Null 값 처리
- 행, 열 삭제 방법
- 
- 누락된 값을 처리할 떄의 다양한 매개변수

In [38]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('practice').getOrCreate()

In [40]:
print( spark.read.csv('/content/drive/MyDrive/멀티캠퍼스/DE/data/test2.csv',header=True, inferSchema=True) )
spark.read.csv('/content/drive/MyDrive/멀티캠퍼스/DE/data/test2.csv',header=True, inferSchema=True).show()

df_pyspark = spark.read.csv('/content/drive/MyDrive/멀티캠퍼스/DE/data/test2.csv',header=True, inferSchema=True)

DataFrame[Name: string, age: int, Experience: int, Salary: int]
+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|SudhanShu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null|  4000|
|     null|  34|        10| 38000|
|     null|  46|      null|  null|
+---------+----+----------+------+



In [43]:
# 열 삭제
df_pyspark.drop('Name').show()

+----+----------+------+
| age|Experience|Salary|
+----+----------+------+
|  31|        10| 30000|
|  30|         8| 25000|
|  29|         4| 20000|
|  24|         3| 20000|
|  21|         1| 15000|
|  23|         2| 18000|
|null|      null|  4000|
|  34|        10| 38000|
|  46|      null|  null|
+----+----------+------+



## 행 삭제

In [45]:
# 행 삭제
df_pyspark.na.drop().show()  # null값이 들어있는 행 삭제

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|SudhanShu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [48]:
# how == all
# 모든 컬럼이 null일 경우에만 삭제 
df_pyspark.na.drop(how='all').show()

# test2.csv와 같은 경우 적어도 하나의 값을 가지고 있기 때문에 삭제되지는 않았음

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|SudhanShu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null|  4000|
|     null|  34|        10| 38000|
|     null|  46|      null|  null|
+---------+----+----------+------+



In [49]:
# how == any
# 하나라도 null값이 있으면 제거
df_pyspark.na.drop(how='any').show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|SudhanShu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



## 임계값

In [50]:
# threshold
# thresh 옵션을 이용하여 임계값 설정
# null값이 2개가 초과인 행은 제거하도록 함 
df_pyspark.na.drop(how='any',thresh=2).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|SudhanShu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null|  4000|
|     null|  34|        10| 38000|
+---------+----+----------+------+



## subset

In [58]:
# subset 
# 특정 열에 대해서 null값이 있는 행 삭제
df_pyspark.na.drop(how='any', subset=['Experience']).show()  
df_pyspark.na.drop(how='any', subset=['Age']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|SudhanShu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|SudhanShu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
|     null| 46|      null|  null|
+---------+---+----------+------+



## Null 값 채우기

In [69]:
# 누락된 값 채우기
# null 값을 채울 때 채우려는 값의 data type과 컬럼의 data type이 맞아야 함
 
# null 값을 'Missing Values'로 채우기  - Name만 str이므로 Name의 null값만 채워짐
print('행 하나 선택')
df_pyspark.na.fill('Missing Values').show() 

# 여러 행을 선택하는 것도 가능
print('행 다중 선택')
df_pyspark.na.fill(0,['Age','Salary']).show()

# 지정도 가능
print('채우고자 하는 행 지정')
df_pyspark.na.fill( {'Name':'Missing Values', 'Salary' : '0'} ).show()

행 하나 선택
+--------------+----+----------+------+
|          Name| age|Experience|Salary|
+--------------+----+----------+------+
|         Krish|  31|        10| 30000|
|     SudhanShu|  30|         8| 25000|
|         Sunny|  29|         4| 20000|
|          Paul|  24|         3| 20000|
|        Harsha|  21|         1| 15000|
|       Shubham|  23|         2| 18000|
|        Mahesh|null|      null|  4000|
|Missing Values|  34|        10| 38000|
|Missing Values|  46|      null|  null|
+--------------+----+----------+------+

행 다중 선택
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|SudhanShu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh|  0|      null|  4000|
|     null| 34|        10| 38000|
|     null| 46|      null|     0|
+---------+---+----------+------+

채우고자 하는 행 지정
+------

In [74]:
# Null 값을 평균으로 대체
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['age','Experience','Salary'],
    outputCols = [f'{c}_imputed' for c in ['age','Experience','Salary']]
    ).setStrategy("mean")   # 평균값으로 채우겠다고 설정

 # 중앙값은 'median'   

In [75]:
# 'imputer'라는 행 추가..?
imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|SudhanShu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null|  4000|         29|                 5|          4000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|  46|      null|  null|         46|                 5|         21250|
+---------+----+----------+-

45분