In [None]:
# 설치
!pip install pyspark

## RDD

In [None]:
from pyspark import SparkContext, SparkConf

In [None]:
# SparkConf: 사용자가 재정의해서 쓸 수 있는 설정 옵션들에 대한 키와 값을 갖고 있는 객체
conf = SparkConf().setMaster("local").setAppName("practice")
# SparkContext: Spark 클러스터와 연결시켜주는 객체. RDD 생성을 위해 필요
sc = SparkContext('local', 'practice')    # SparkContext(conf)


In [None]:
data=[("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)]

In [None]:
# parallelize: RDD를 생성하는 메서드
inputRDD = sc.parallelize(data)
print(inputRDD)
inputRDD.collect()
# collect: RDD에 포함되어 있는 모든 요소를 반환하는 리스트
py_rdd = inputRDD.collect()

for i in py_rdd:
   print(i[0] + "," + str(i[1]))

In [None]:
listRdd = sc.parallelize([1,2,3,4,5,3,2])
listRdd.collect()

# listRdd 개수 새기
print("Count : "+str(listRdd.count()))

# countByValue() 함수는 RDD에 포함된 요소의 개수 합계를 반환(딕셔너리 형태)
print("countByValue :  "+str(listRdd.countByValue()))

In [None]:
inputRDD.collect()
listRdd.collect()

# first() 함수는 RDD에 포함된 요소 중 첫 번째 반환
print("first :  "+str(listRdd.first()))
print("first :  "+str(inputRDD.first()))

In [None]:
inputRDD.collect()
listRdd.collect()

print("top : "+str(listRdd.top(2)))

print("top : "+str(inputRDD.top(2)))

print("min :  "+str(listRdd.min()))

print("min :  "+str(inputRDD.min()))

print("max :  "+str(listRdd.max()))

print("max :  "+str(inputRDD.max()))

In [None]:
# 리스트 생성
rdd = sc.parallelize(['a,b,c','d,e,f','g,h,i'])

# map() 함수는 자료구조 형태의 모든 요소에 적용 후 출력
map_rdd = rdd.map(lambda x: x.split(','))
print(map_rdd.collect())

# flatmap() 함수는 자료구조 형태의 모든 요소에 적용하고, 평면화(flat)하여 출력
flatmap_rdd = rdd.flatMap(lambda x: x.split(','))
print(flatmap_rdd.collect())



In [None]:
# 튜플 생성
data=[("Z", 1),("A", 20),("B", 30),("C", 40),("B", 30),("B", 60)]
inputRDD = sc.parallelize(data)

# reduceByKey() 함수는 Key, Value 형태로 데이터가 구성되어 있을 때 Key를 기준으로 합계를 연산
rdd2=inputRDD.reduceByKey(lambda a,b: a+b)

rdd2.collect()

for element in rdd2.collect():
    print(element)

## DataFrame & PySQL

In [None]:
from pyspark.sql import SparkSession

In [None]:
### SparkSession -> DataFrame 생성 위해 필요

# RDD를 생성하기 위해 SparkContext가 필요했던 것처럼 데이터프레임을 생성하기 위해서는 SparkSession을 이용해야 한다. 
# SparkSession은 인스턴스 생성을 위한 build() 메서드를 제공하고, 이 메서드를 이용하면 기존 인스턴스를 재사용하거나 새로운 인스턴스를 생성할 수 있다

In [None]:
# config 환경 설정 파일
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [None]:
import pyspark
# columns 변수에 리스트 형태로 데이터를 준비
columns = ["language","users_count"]
# data 변수에 리스트 내 튜플 형태의 데이터 준비
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]

rdd = sc.parallelize(data)

# 컬럼 지정 x (rdd -> df)
dfFromRDD = rdd.toDF()
dfFromRDD.printSchema()

# 스키마 구조 확인을 위해 printSchema()사용

# 컬럼 지정
dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()

# df 바로 생성하기
dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
dfFromRDD2.printSchema()

In [None]:
# StructType를 사용해 스키마를 먼저 생성하고 df 생성하기
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
]

schema = StructType([ 
    StructField("firstname",StringType(),True), 
    StructField("middlename",StringType(),True), 
    StructField("lastname",StringType(),True), 
    StructField("id", StringType(), True), 
    StructField("gender", StringType(), True), 
    StructField("salary", IntegerType(), True)
  ])
 
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
# truncate=False로 생략되는 부분 없이 모두 보여주기
df.show(truncate=False)

In [None]:
from google.colab import files
f = files.upload()

In [None]:
df2 = spark.read.csv('animal_data_img.csv')
df2.show(truncate=False)


In [None]:
# 빈 RDD생성

emptyRDD = spark.sparkContext.emptyRDD()
print(emptyRDD)
rdd2= spark.sparkContext.parallelize([])
print(rdd2)

In [None]:
# 스키마 구조와 함께 빈 Dataframe 생성
# StructType() 함수를 이용하여 스키마 구조를 정의하고, 전 장에서 생성했던 emptyRDD를 createDataFrame() 함수내의 데이터로 입력하여 생성

from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
  StructField('firstname', StringType(), True),
  StructField('middlename', StringType(), True),
  StructField('lastname', StringType(), True)
])
df = spark.createDataFrame(emptyRDD,schema)
df.printSchema()

df1 = emptyRDD.toDF(schema)
df1.printSchema()

df2 = spark.createDataFrame([], schema)
df2.printSchema()

df3 = spark.createDataFrame([], StructType([]))
df3.printSchema()

In [None]:
# RDD to Dataframe
# parallelize() 를 통해 RDD 형태로 데이터를 생성했다면, RDD를 저장해둔 변수.toDF(“컬럼명 변수“)를 통해 Dataframe 생성

dept = [("Finance",10),("Marketing",20),("Sales",30),("IT",40)]
rdd = spark.sparkContext.parallelize(dept)
deptColumns = ["dept_name","dept_id"]
df2 = rdd.toDF(deptColumns)
df2.printSchema()
df2.show(truncate=False)

In [None]:
# RDD to Dataframe
# createDataFrame() 함수의 첫 번째 인자를 RDD 변수, 두 번째 인자를 컬럼명들을 저장한 변수로 Dataframe 생성

deptDF = spark.createDataFrame(rdd, deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

In [None]:
# RDD to Dataframe
# createDataFrame() 함수의 두 번째 인자를 StructType으로 구성한 구조를 정의해서 생성

from pyspark.sql.types import StructType,StructField, StringType
deptSchema = StructType([       
    StructField('dept_name', StringType(), True),
    StructField('dept_id', StringType(), True)
])
deptDF1 = spark.createDataFrame(rdd, schema = deptSchema)
deptDF1.printSchema()
deptDF1.show(truncate=False)

In [None]:
# Dataframe to Pandas

dept = [("Finance",10),("Marketing",20),("Sales",30),("IT",40)]
rdd = spark.sparkContext.parallelize(dept)
deptColumns = ["dept_name","dept_id"]
df2 = rdd.toDF(deptColumns)
df2.printSchema()
df2.show(truncate=False)

import pandas
pandasDF = df2.toPandas()
print(pandasDF)

In [None]:
# pandas -> df

PySparkDF = spark.createDataFrame(pandasDF)

PySparkDF.show(truncate=False)

In [None]:
# show 확인해보기

from pyspark.sql import SparkSession
columns = ["Seqno","Quote"]
data = [("1", "Be the change that you wish to see in the world"),
    ("2", "Everyone thinks of changing the world, but no one thinks of changing himself."),
    ("3", "The purpose of our lives is to be happy."),
    ("4", "Be cool.")]
df = spark.createDataFrame(data,columns)
df.show()

In [None]:
df.show(truncate=False)
# df.show(2,truncate=False) 
# df.show(2,truncate=25) 
# df.show(n=3,truncate=25,vertical=True)

In [None]:
# Column
# select
data=[("James",23),("Ann",40)]
df=spark.createDataFrame(data).toDF("name.fname","gender")
df.printSchema()
df.select(df.gender).show()
df.select(df["gender"]).show()
df.select(df["`name.fname`"]).show()

In [None]:
# Column
# col

from pyspark.sql.functions import col
df.select(col("gender")).show()
df.select(col("`name.fname`")).show()

In [None]:
# Column
# Row를 통해 행 생성

from pyspark.sql import Row
data=[Row(name="James",prop=Row(hair="black",eye="blue")),
      Row(name="Ann",prop=Row(hair="grey",eye="black"))]
df=spark.createDataFrame(data)
df.printSchema()
df.show()


In [None]:
# Column
#생성된 “prop” 컬럼 내부의 점(“.”)을 추가하고,“hair”을 입력하여 중첩 구조로 이루어진 데이터에 접근

df.select(df.prop.hair).show()
df.select(df["prop.hair"]).show()
df.select(col("prop.hair")).show()
df.select(col("prop.*")).show()

In [None]:
# Column
# 컬럼에 연산자(+, -, *, /, %, <, >, ==)를 이용하여 컬럼끼리의 연산 및 비교를 수행할 수 있음

data=[(100,2,1),(200,3,4),(300,4,4)]
df=spark.createDataFrame(data).toDF("col1","col2","col3")

df.select(df.col1 + df.col2).show()
df.select(df.col1 - df.col2).show() 
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()
df.select(df.col1 % df.col2).show()
df.select(df.col2 > df.col3).show()
df.select(df.col2 < df.col3).show()
df.select(df.col2 == df.col3).show()


In [None]:
# 새 df생성
data=[("James","Bond","100",None),
      ("Ann","Varsa","200",'F'),
      ("Tom Cruise","XXX","400",''),
      ("Tom Brand",None,"400",'M')] 
columns=["fname","lname","id","gender"]
df=spark.createDataFrame(data,columns)
df.show()

In [None]:
# Column
# alias() 함수를 사용하여 별칭을 이용하여 컬럼명을 변경하여 원하는 컬럼만 출력

df.select(df.fname.alias("first_name"), df.lname.alias("last_name")).show()

In [None]:
# Column
# expr() 함수를 이용하여 “fname”과 “lname” 컬럼의 데이터를 합쳐셔 표현

from pyspark.sql.functions import expr
df.select(expr(" fname ||','|| lname").alias("fullName")).show()


In [None]:
# Column
# asc()와 desc() 함수를 이용하여 컬럼의 데이터를 정렬할 수 있음

df.sort(df.fname.asc()).show()
df.sort(df.fname.desc()).show()


In [None]:
#Column
# cast() 함수를 이용하여 데이터의 형변환이 가능

df.printSchema()
df.select(df.fname,df.id.cast("int")).printSchema()


In [None]:
# Column
# between() 함수를 이용하여 지정된 값의 범위만큼 데이터 출력 가능
# contains() 함수를 통해 입력한 문자열이 포함된 행을 출력 가능

df.filter(df.id.between(100,300)).show()
df.filter(df.fname.contains("Cruise")).show()
df.filter(df.fname == "Cruise").show()
df.filter(df.fname == "Tom Cruise").show()


In [None]:
# Column
# startswith() 함수를 통해 특정 문자가 시작되는 행을 검색할 수도 있고, 반대로 endswith() 함수를 통해 특정 문자가 끝부터 시작하는 행을 검색할 수도 있음


df.filter(df.fname.startswith("T")).show()
df.filter(df.fname.endswith("Cruise")).show()


In [None]:
# Column
# isNull() 함수를 통해 null 값이 있는 행만 출력할 수 있고, isNotNull() 함수를 통해 null 값이 없는 행만 출력할 수 있음

df.filter(df.lname.isNull()).show()
df.filter(df.lname.isNotNull()).show()

In [None]:
# Column
# substr 함수를 통해 특정 컬럼에 원하는 문자열 만큼만 출력할 수 있음

df.select(df.fname.substr(1,2).alias("substr")).show()


In [None]:
# Column
# when() 함수를 통해 다양한 조건들을 순서적으로 추가하여 만족할때 까지의 구문을 생성 가능
# otherwise() 함수도 마지막에 작성하여 그외의 조건으로 명시할 수 있음

df.show()

from pyspark.sql.functions import when
df.select(df.fname,df.lname,when(df.gender=="M","Male")
.when(df.gender=="F","Female")
.alias("new_gender")).show()

In [None]:
# Column
# isin() 함수를 통해 특정 값이 존재하는 행만 출력할 수도 있으며, isin() 함수의 입력값은 하나의 정수 값뿐만 아니라 List 형태로 다중 값도 작성할 수 있음

li=["100","200"]
df.select(df.fname,df.lname,df.id).filter(df.id.isin(li)).show()


In [None]:
# Select
# select() 함수 내에 columns () 함수의 [:] 인덱스 방법을 통해 원하는 컬럼의 길이 만큼 출력 가능

df.show()
df.select(df.columns[:3]).show(3)
df.select(df.columns[2:4]).show(3)

In [None]:
# WithColumn
# withColumn() 함수를 통해 컬럼을 생성하거나 변경할 수 있음

from pyspark.sql.functions import col
df2 = df.withColumn("id",col("id").cast("Integer"))
df2.printSchema()
df.printSchema()

In [None]:
# WidhColumn
# col() 함수에 연산을 수행

df3 = df.withColumn("id",col("id")*100)
df3.show()
df.show() 

In [None]:
# WithColumnRenamed
# WidhColumnRenamed() 함수에 첫 번째 인자는 기존 컬럼명, 두 번째 인자는 변경할 컬럼명을 입력하여 컬럼명을 변경

df5 = df.withColumnRenamed("id","new_id")
df5.show()
df.show()

In [None]:
# WithColumnRenamed
# 다수의 컬럼명을 변경하고 싶다면, WithColumnRenamed () 함수에 점(“.”)을 이용하여 계속적으로 함수를 사용

df.show()
df.withColumnRenamed("id","new_id").withColumnRenamed("fname", "new_fname").show()

In [None]:
# 모든 컬럼명 변경
newColumns = ["newCol1","newCol2","newCol3","newCol4"]
new_df = df.toDF(*newColumns)
new_df.show()
df.show()

In [None]:
# drop
# drop() 함수를 이용하여 컬럼명을 인자값으로 지정하면, Dataframe에서 해당 컬럼을 삭제

df6 = df.drop("gender")
df6.show()
df.show()

In [None]:
# filter
# filter() 함수를 이용하여 원하는 데이터 출력

df.show()
df.filter(df.fname == "James").show()
df.filter(df.fname != "James").show() 
df.filter(~(df.fname == "James")).show()

In [None]:
df.show()
df.filter("gender == 'M'").show()
df.filter("gender != 'M'").show()
df.filter("gender <> 'M'").show()

In [None]:
# filter
# AND/OR/NOT 연산을 지원하기 때문에 다양한 조건 입력이 가능

df.show()
df.filter((df.gender == "F") & (df.fname == "Ann")).show(truncate=False)  

In [None]:
df.show()
df.filter(df.fname.like("Tom%")).show()

In [None]:
# 새 df 생성

data = [("James", "Sales", 3000),
  ("Robert", "Sales", 4100),
  ("Maria", "Finance", 3000),
  ("James", "Sales", 3000),
  ("Scott", "Finance", 3000),
  ("Jen", "Finance", 3900),
  ("Maria", "Finance", 3000),
]
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

In [None]:
# Distinct
# Dataframe에서 중복을 제거하기 위한 함수

distinctDF = df.distinct()
print("Distinct count: "+str(distinctDF.count()))
distinctDF.show(truncate=False)

df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))
df2.show(truncate=False)

In [None]:
# 다수 컬럼 지정

df.show()
dropDisDF = df.dropDuplicates(["department","salary"])
print("Distinct count of department & salary : "+str(dropDisDF.count()))
dropDisDF.show(truncate=False)


In [None]:
# Sort
# 컬럼의 데이터를 정렬

df.show()
df.sort("department","salary").show()
df.sort(col("department"),col("salary")).show()


In [None]:
# Sort
# orderBy() 함수를 사용하여 동일한 기능을 수행

df.orderBy("department","salary").show()
df.orderBy(col("department"),col("salary")).show()


In [None]:
# Sort
# 오름차순, 내림차순은 지정한 컬럼명 뒤에 .asc()와 .desc()를 사용하여 정렬을 수행

df.sort(df.department.asc(),df.salary.desc()).show()
df.sort(col("department").asc(),col("salary").desc()).show()
df.orderBy(col("department").asc(),col("salary").desc()).show()


In [None]:
# SQL처럼 사용하고 싶다면 ViewFrame 생성해서 가능

df.createOrReplaceTempView("ViewFrame")
spark.sql("select department, salary from ViewFrame ORDER BY department asc").show()

In [None]:
# groupBy
# groupBy() 함수를 통해 원하는 컬럼을 그룹화하여 다양한 집계를 수행할 수 있음

df.show()
df.groupBy("department").count().show()

In [None]:
# groupBy
# "department” 컬럼을 기준으로 그룹화하고, sum(), min(), max(), avg(), mean() 연산(집계) 수행

df.groupBy("department").sum("salary").show()
df.groupBy("department").min("salary").show()
df.groupBy("department").max("salary").show()
df.groupBy("department").avg( "salary").show()
df.groupBy("department").mean( "salary") .show()

In [None]:
#  groupBy
# groupBy() 함수 내의 다수의 컬럼명을 지정하여 다수의 컬럼명을 그룹화가 가능

df.groupBy("employee_name","department").sum("salary").show()


In [None]:
# Join
# Join을 위해 2개의 DF생성

emp = [(1,"Smith",-1,"2018","10","M",3000),
(2,"Rose",1,"2010","20","M",4000),
(3,"Williams",1,"2010","10","M",1000),
(4,"Jones",2,"2005","10","F",2000),
(5,"Brown",2,"2010","40","",-1),
(6,"Brown",2,"2010","50","",-1)]
empColumns = ["emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary"]
empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance",10),("Marketing",20),("Sales",30),("IT",40)]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

In [None]:
# Join
# Inner Join

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner").show()

In [None]:
# Join
# full outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"outer").show()
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"full").show()
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter").show()

In [None]:
# Join
# left outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left").show()
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftouter").show()

In [None]:
# Join
#right outer join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right").show()
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"rightouter").show()

In [None]:
# Join
#left semi join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftsemi").show()

In [None]:
# Join
#left anti join
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftanti").show()

In [None]:
# Join
#self join
empDF.alias("emp1").join(empDF.alias("emp2"), 
col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner").select(col("emp1.emp_id"),col("emp1.name"),
col("emp2.emp_id").alias("superior_emp_id"), col("emp2.name").alias("superior_emp_name")).show()


In [None]:
# Union
# 병합을 위해 2개의 DF생성

simpleData = [("A","B","C"), ("A","B","D"), ("A","B","E"), ("A","B","F"), ("A","B","D")]
columns = ["col_1","col_2","col_3"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

simpleData2 = [("A","B","G"), ("A","B","H"), ("A","B","I"), ("A","B","F"), ("A","B","D")]
columns2 = ["col_1","col_2","col_3"]
df2 = spark.createDataFrame(data = simpleData2, schema = columns2)
df2.printSchema()
df2.show(truncate=False)

In [None]:
# Union
# Dataframe 변수를 명시하고, .union(“합병할 Dataframe명”)을 입력

unionDF = df.union(df2)
unionDF.show()

disDF = df.union(df2).distinct()
disDF.show()

In [None]:
# Union
# UDF(User Defined Function)
# 일반 Python 함수를 생성하고, PySpark 관련 명령어에서 불러서 편리하게 사용이 가능함
# converCast() 함수를 생성

def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr

columns = ["Seqno","Name"]
data = [("1", "john jones"),("2", "tracey smith"),("3", "amy sanders")]
df = spark.createDataFrame(data,columns)
df.show()


In [None]:
# (User Defined Function)
# 작성한 함수를 바로는 사용할 수 없고, PySpark의 udf 함수를 우선 적용하여야함

from pyspark.sql.functions import udf
convertUDF = udf(lambda z: convertCase(z))
df.select(col("Seqno"),convertUDF(col("Name")).alias("Name") ).show()


In [None]:
# map vs flatmap
# map()은 RDD 각 요소에 함수를 적용
# flatmap()은 map으로 반환된 결과의 각 요소를 반환

rdd = sc.parallelize(['A,B,C','D,E,F','G,H,I'])

map_rdd = rdd.map(lambda x: x.split(','))
print(map_rdd.collect())

In [None]:
# map vs flatmap
# Dataframe에 적용하면, map()을 통해 구성된 RDD 데이터를 출력하면, 하나의 행이 하나의 튜플로 구성되어 출력
# flatmap()을 통해 구성된 RDD 데이터를 출력하면, 하나의 행이 하나의 튜플이 아닌 모든 행이 한번에 처리되어 출력

data = [('James','Smith','M',30),('Anna','Rose','F',41),('Robert','Williams','M',62)]
columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()

rdd2=df.rdd.map(lambda x: (x[0] + "," + x[1], x[2], x[3]*2))
print(rdd2.collect())

rdd3=df.rdd.flatMap(lambda x: (x[0] + "," + x[1], x[2], x[3]*2))  
print(rdd3.collect())

In [None]:
# sample
# sample() 함수는 데이터에서 무작위로 추출할 수 있는 기능

df=spark.range(20)
sample_df = df.sample(0.2)
sample_df.show()

In [None]:
# 시드값 추가

df.sample(0.2,123).show()
df.sample(0.2,123).show()
df.sample(0.2,456).show()
df.sample(0.2,456).show()

# True를 추가하면 중복 허용
df.sample(True,0.2).show()
df.sample(0.2).show()

In [None]:
from google.colab import files
f = files.upload()

In [None]:
# fill
# fill() 함수는 Dataframe 데이터가 0, 빈 문자열, 공백, NULL 등일때 전처리를 할 수 있는 기능

df = spark.read.options(header='true', inferSchema='true').csv("animal_data_img.csv")
df.printSchema()
df.show(truncate=False)

In [None]:
# fill
# fill() 함수는 Dataframe 데이터가 0, 빈 문자열, 공백, NULL 등일때 전처리를 할 수 있는 기능

df.na.fill(value=0).show()
df.na.fill(value=0,subset=["Animal_Type"]).show()
