### empty RDD 생성

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

emptyRDD = spark.sparkContext.emptyRDD()
print(emptyRDD)



EmptyRDD[0] at emptyRDD at NativeMethodAccessorImpl.java:0


### parallelize로 RDD 생성

In [3]:
rdd2 = spark.sparkContext.parallelize([])
print(rdd2)

ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:287


### 스키마(Struct type)을 사용해서 empty DataFrame 생성

In [5]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField('firstname' , StringType(), True),
    StructField('middle', StringType(), True),
    StructField('lastname', StringType(), True)
])

In [6]:
df = spark.createDataFrame(emptyRDD, schema)
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middle: string (nullable = true)
 |-- lastname: string (nullable = true)



### empty rdd-> Dataframe 변환

In [7]:
df1 = emptyRDD.toDF(schema)
df1.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middle: string (nullable = true)
 |-- lastname: string (nullable = true)



### 스키마 없이 빈 DataFrame 생성(열없음)


In [8]:
df3 = spark.createDataFrame([], StructType([]))
df3.printSchema()

root



### RDD 생성

In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
dept = [("Finance",10),("Marketing",20),("Sales",30),("IT",40)]
rdd = spark.sparkContext.parallelize(dept)

In [12]:
rdd

ParallelCollectionRDD[15] at readRDDFromFile at PythonRDD.scala:287

### RDD -> df 변환 (toDF() 사용)

In [13]:
df = rdd.toDF()
df.printSchema()
df.show(truncate = False) #생략없이 다 보여줌

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

+---------+---+
|_1       |_2 |
+---------+---+
|Finance  |10 |
|Marketing|20 |
|Sales    |30 |
|IT       |40 |
+---------+---+



In [14]:
deptColumns = ["dept_name", "dept_id"]
df2 = rdd.toDF(deptColumns)
df2.printSchema()
df2.show(truncate = False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



### createDataFrame() 사용


In [19]:
deptDF = spark.createDataFrame(rdd, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



### StructType 스키마 createDataFrame()사용

In [20]:
from pyspark.sql.types import StructType, StructField, StringType

deptSchema = StructType([
    StructField('dept_name', StringType(), True),
    StructField('dept_id', StringType(), True)
])

deptDF1 = spark.createDataFrame(rdd, schema = deptSchema)
deptDF1.printSchema()
deptDF1.show(truncate = False)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: string (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



### DataFrame -> Pandas
PySpark에서 데이터를 처리한 후 Machine Learning 애플리케이션 또는 Python 애플리케이션을 사용한 추가 처리를 위해 데이터를 Pandas DataFrame으로 다시 변환해야 합니다.
Pandas는 단일 노드에서 작업을 실행하는 반면 PySpark는 여러 시스템에서 실행

In [21]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [("James","","Smith","36636","M",60000),
        ("Michael","Rose","","40288","M",70000),
        ("Robert","","Williams","42114","",400000),
        ("Maria","Anne","Jones","39192","F",500000),
        ("Jen","Mary","Brown","","F",0)]

columns = ["first_name", "middle_name", "last_name", "dob", "gender", "salary"]
pysparkDF = spark.createDataFrame(data = data, schema = columns)
pysparkDF.printSchema()
pysparkDF.show(truncate = False)

root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name|dob  |gender|salary|
+----------+-----------+---------+-----+------+------+
|James     |           |Smith    |36636|M     |60000 |
|Michael   |Rose       |         |40288|M     |70000 |
|Robert    |           |Williams |42114|      |400000|
|Maria     |Anne       |Jones    |39192|F     |500000|
|Jen       |Mary       |Brown    |     |F     |0     |
+----------+-----------+---------+-----+------+------+



pandas add a sequence number to the result as a row Index. You can rename pandas columns by using rename() function.



In [22]:
pandasDF = pysparkDF.toPandas()
print(pandasDF)

  first_name middle_name last_name    dob gender  salary
0      James                 Smith  36636      M   60000
1    Michael        Rose            40288      M   70000
2     Robert              Williams  42114         400000
3      Maria        Anne     Jones  39192      F  500000
4        Jen        Mary     Brown             F       0


### show() 
By default, it shows only 20 Rows, and the column values are truncated at 20 characters.

In [23]:
df.show()

df.show(truncate = False) # full column contents

df.show(2, truncate = False) # 2 row , full column contents

df.show(2, truncate = 25) # 2 row, column values 25 characters

df.show(n=3, truncate= 25, vertical= True) # DataFrame rows & columns vertically

+---------+---+
|       _1| _2|
+---------+---+
|  Finance| 10|
|Marketing| 20|
|    Sales| 30|
|       IT| 40|
+---------+---+

+---------+---+
|_1       |_2 |
+---------+---+
|Finance  |10 |
|Marketing|20 |
|Sales    |30 |
|IT       |40 |
+---------+---+

+---------+---+
|_1       |_2 |
+---------+---+
|Finance  |10 |
|Marketing|20 |
+---------+---+
only showing top 2 rows

+---------+---+
|       _1| _2|
+---------+---+
|  Finance| 10|
|Marketing| 20|
+---------+---+
only showing top 2 rows

-RECORD 0--------
 _1  | Finance   
 _2  | 10        
-RECORD 1--------
 _1  | Marketing 
 _2  | 20        
-RECORD 2--------
 _1  | Sales     
 _2  | 30        
only showing top 3 rows



In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
columns = ["Seqno", "Quote"]
data = [("1", "Be the change that you wish to see in the world"),
    ("2", "Everyone thinks of changing the world, but no one thinks of changing himself."),
    ("3", "The purpose of our lives is to be happy."),
    ("4", "Be cool.")]
df = spark.createDataFrame(data, columns)
df.show()


+-----+--------------------+
|Seqno|               Quote|
+-----+--------------------+
|    1|Be the change tha...|
|    2|Everyone thinks o...|
|    3|The purpose of ou...|
|    4|            Be cool.|
+-----+--------------------+



In [7]:
df.show(truncate=False)

+-----+-----------------------------------------------------------------------------+
|Seqno|Quote                                                                        |
+-----+-----------------------------------------------------------------------------+
|1    |Be the change that you wish to see in the world                              |
|2    |Everyone thinks of changing the world, but no one thinks of changing himself.|
|3    |The purpose of our lives is to be happy.                                     |
|4    |Be cool.                                                                     |
+-----+-----------------------------------------------------------------------------+



In [8]:
df.show(2, truncate=False)

+-----+-----------------------------------------------------------------------------+
|Seqno|Quote                                                                        |
+-----+-----------------------------------------------------------------------------+
|1    |Be the change that you wish to see in the world                              |
|2    |Everyone thinks of changing the world, but no one thinks of changing himself.|
+-----+-----------------------------------------------------------------------------+
only showing top 2 rows



In [9]:
df.show(2, truncate=25)

+-----+-------------------------+
|Seqno|                    Quote|
+-----+-------------------------+
|    1|Be the change that you...|
|    2|Everyone thinks of cha...|
+-----+-------------------------+
only showing top 2 rows



In [10]:
df.show(2, truncate=25, vertical = True)

-RECORD 0--------------------------
 Seqno | 1                         
 Quote | Be the change that you... 
-RECORD 1--------------------------
 Seqno | 2                         
 Quote | Everyone thinks of cha... 
only showing top 2 rows



### StructType, StructField 
programmatically specify the schema to the DataFrame and create complex columns like nested struct, array, and map columns
프로그래밍 방식으로 데이터프레임에 대한 스키마를 지정하고, 중첩된 구조체, 배열, map 컬럼 같은 복잡한 컬럼을 만든는데 사용됨

StructType 데이터프레임 구조 정의
pyspark.sql.types import StructType

Structfield 데이터프레임 컬럼의 메타데이터 정의
pyspark.sql.types import StructField

### Nested StructType object

In [34]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



In [18]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType


structureData = [(("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

In [12]:
df2 = spark.createDataFrame(data = structureData, schema = structureSchema)
df2.printSchema()
df2.show(truncate = False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|{James, , Smith}    |36636|M     |3100  |
|{Michael, Rose, }   |40288|M     |4300  |
|{Robert, , Williams}|42114|M     |1400  |
|{Maria, Anne, Jones}|39192|F     |5500  |
|{Jen, Mary, Brown}  |     |F     |-1    |
+--------------------+-----+------+------+



### DataFrame의 Struct 추가, 변경

In [21]:
from pyspark.sql.functions import col,struct,when

In [22]:
updatedDF = df2.withColumn("OtherInfo", struct(col("id").alias("identifier"),
                                        col("gender").alias("gender"),
                                               col("salary").alias("salary"),
                                               when(col("salary").cast(IntegerType()) < 2000, "Low")
                                               .when(col("salary").cast(IntegerType()) < 4000, "Medium")
                                               .otherwise("High").alias("Salary_Grade")
                                              )).drop("id", "gender", "salary")
updatedDF.printSchema()
updatedDF.show(truncate = False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- OtherInfo: struct (nullable = false)
 |    |-- identifier: string (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- salary: integer (nullable = true)
 |    |-- Salary_Grade: string (nullable = false)

+--------------------+------------------------+
|name                |OtherInfo               |
+--------------------+------------------------+
|{James, , Smith}    |{36636, M, 3100, Medium}|
|{Michael, Rose, }   |{40288, M, 4300, High}  |
|{Robert, , Williams}|{42114, M, 1400, Low}   |
|{Maria, Anne, Jones}|{39192, F, 5500, High}  |
|{Jen, Mary, Brown}  |{, F, -1, Low}          |
+--------------------+------------------------+



### SQL ArrayType & MapType

In [35]:
from pyspark.sql.types import *
arrayStructureSchema = StructType([
    StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
        StructField('lastname', StringType(), True)])),
    StructField('hobbies', ArrayType(StringType()), True),
    StructField('properties', MapType(StringType(), StringType()), True)])

In [42]:
arrayStructureSchema

StructType([StructField('name', StructType([StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True)]), True), StructField('hobbies', ArrayType(StringType(), True), True), StructField('properties', MapType(StringType(), StringType(), True), True)])

In [43]:
print(df2.schema.json())

{"fields":[{"metadata":{},"name":"name","nullable":true,"type":{"fields":[{"metadata":{},"name":"firstname","nullable":true,"type":"string"},{"metadata":{},"name":"middlename","nullable":true,"type":"string"},{"metadata":{},"name":"lastname","nullable":true,"type":"string"}],"type":"struct"}},{"metadata":{},"name":"id","nullable":true,"type":"string"},{"metadata":{},"name":"gender","nullable":true,"type":"string"},{"metadata":{},"name":"salary","nullable":true,"type":"integer"}],"type":"struct"}


In [None]:
import json
schemaFromJson = StructType.fromJson(json.loads(schema.json))
df3 = spark.createDataFrame(
    spark.sparkContext.parallelize(structureData), schemaFromJson)
df3.printSchema()

### column class object 생성
pyspark.sql.Column\
manipulate the Column values, evaluate the boolean expression to filter rows, retrieve a value or part of a value from a DataFrame column, and to work with list, map & struct columns.\
컬럼 값을 조작하거나, 부울 표현식으로 평가해서 로우를 필터링, 데이터프레임 컬럼에서 일부 또는 전체를 검색, 리스트, 맵, 구조체 컬럼을 사용함?..



In [48]:
# create column class object using lit()

from pyspark.sql.functions import lit
colObj = lit("sparkbyexample.com")

In [50]:
data = [("James",23), ("Ann", 40)]

In [71]:
df = spark.createDataFrame(data).toDF("name.fname", "gender")
df.printSchema()

root
 |-- name.fname: string (nullable = true)
 |-- gender: struct (nullable = true)
 |    |-- hair: string (nullable = true)
 |    |-- eye: string (nullable = true)



In [72]:
df.select(df.gender).show()
df.select(df["gender"]).show()
df.select(df["`name.fname`"]).show()

from pyspark.sql.functions import col

from pyspark.sql.functions import col
df.select(col("gender")).show()    
df.select(col("`name.fname`")).show()

+-------------+
|       gender|
+-------------+
|{black, blue}|
|{grey, black}|
+-------------+

+-------------+
|       gender|
+-------------+
|{black, blue}|
|{grey, black}|
+-------------+

+----------+
|name.fname|
+----------+
|     James|
|       Ann|
+----------+

+-------------+
|       gender|
+-------------+
|{black, blue}|
|{grey, black}|
+-------------+

+----------+
|name.fname|
+----------+
|     James|
|       Ann|
+----------+



In [58]:
from pyspark.sql import Row
data = [Row(name = "James", prop= Row(hair = "black", eye = "blue")),
Row(name = "Ann", prop = Row(hair = "grey", eye = "black"))]
df = spark.createDataFrame(data)
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- prop: struct (nullable = true)
 |    |-- hair: string (nullable = true)
 |    |-- eye: string (nullable = true)



In [59]:
df.select(df.prop.hair).show()
df.select(df["prop.hair"]).show()
df.select(col("prop.hair")).show()

df.select(col("prop.*")).show()

+---------+
|prop.hair|
+---------+
|    black|
|     grey|
+---------+

+-----+
| hair|
+-----+
|black|
| grey|
+-----+

+-----+
| hair|
+-----+
|black|
| grey|
+-----+

+-----+-----+
| hair|  eye|
+-----+-----+
|black| blue|
| grey|black|
+-----+-----+



### Select()

In [73]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [("James", "Smith", "USA", "CA"),
       ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")]
columns = ["firstname", "lastname", "country", "state"]
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate = False)

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+



In [74]:
df.select("firstname", "lastname").show()
df.select(df.firstname, df.lastname).show()
df.select(df["firstname"], df["lastname"]).show()



+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+



In [75]:
#col()
from pyspark.sql.functions import col
df.select(col("firstname"), col("lastname")).show()
#정규식
df.select(df.colRegex("`^.*name*`")).show()

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+

+---------+--------+
|firstname|lastname|
+---------+--------+
|    James|   Smith|
|  Michael|    Rose|
|   Robert|Williams|
|    Maria|   Jones|
+---------+--------+



In [76]:
df.select(*columns).show()

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+



In [77]:
# 모든 열 선택
df.select([col for col in df.columns]).show()
df.select("*").show()

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+



In [78]:
#인덱스 별로 선택
df.select(df.columns[:3]).show(3)
df.select(df.columns[2:4]).show(3)

+---------+--------+-------+
|firstname|lastname|country|
+---------+--------+-------+
|    James|   Smith|    USA|
|  Michael|    Rose|    USA|
|   Robert|Williams|    USA|
+---------+--------+-------+
only showing top 3 rows

+-------+-----+
|country|state|
+-------+-----+
|    USA|   CA|
|    USA|   NY|
|    USA|   CA|
+-------+-----+
only showing top 3 rows



In [79]:
data = [
        (("James",None,"Smith"),"OH","M"),
        (("Anna","Rose",""),"NY","F"),
        (("Julia","","Williams"),"OH","F"),
        (("Maria","Anne","Jones"),"NY","M"),
        (("Jen","Mary","Brown"),"NY","M"),
        (("Mike","Mary","Williams"),"OH","M")
        ]

In [80]:
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField('name', StructType([
         StructField('firstname', StringType(), True),
         StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
         ])),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)
     ])
df2 = spark.createDataFrame(data = data, schema = schema)
df2.printSchema()
df2.show(truncate = False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+-----+------+
|name                  |state|gender|
+----------------------+-----+------+
|{James, null, Smith}  |OH   |M     |
|{Anna, Rose, }        |NY   |F     |
|{Julia, , Williams}   |OH   |F     |
|{Maria, Anne, Jones}  |NY   |M     |
|{Jen, Mary, Brown}    |NY   |M     |
|{Mike, Mary, Williams}|OH   |M     |
+----------------------+-----+------+



In [81]:
df2.select("name").show(truncate=False)

+----------------------+
|name                  |
+----------------------+
|{James, null, Smith}  |
|{Anna, Rose, }        |
|{Julia, , Williams}   |
|{Maria, Anne, Jones}  |
|{Jen, Mary, Brown}    |
|{Mike, Mary, Williams}|
+----------------------+



In [82]:
df2.select("name.firstname", "name.lastname").show(truncate = False)

+---------+--------+
|firstname|lastname|
+---------+--------+
|James    |Smith   |
|Anna     |        |
|Julia    |Williams|
|Maria    |Jones   |
|Jen      |Brown   |
|Mike     |Williams|
+---------+--------+



In [83]:
df2.select("name.*").show(truncate = False)

+---------+----------+--------+
|firstname|middlename|lastname|
+---------+----------+--------+
|James    |null      |Smith   |
|Anna     |Rose      |        |
|Julia    |          |Williams|
|Maria    |Anne      |Jones   |
|Jen      |Mary      |Brown   |
|Mike     |Mary      |Williams|
+---------+----------+--------+



### collect()
데이터 세트의 모든 요소를 드라이버 노드로 검색하는 데 사용되는 작업

In [87]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
dept = [("Finance", 10),\
       ("Marketing", 20),\
       ("Sales", 30),\
       ("IT" , 40)\
       ]
deptColumns = ["dept_name", "dept_id"]
deptDF = spark.createDataFrame(data = dept, schema = deptColumns)
deptDF.show(truncate = False)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



collect() 로 데이터 검색


In [88]:
dataCollect = deptDF.collect()
print(dataCollect)
#데이터프레임 모든 요소를 드라이버 노드에 로우타입 배열로 반환함,
#action이므로 드라이버에 반환

[Row(dept_name='Finance', dept_id=10), Row(dept_name='Marketing', dept_id=20), Row(dept_name='Sales', dept_id=30), Row(dept_name='IT', dept_id=40)]


In [89]:
for row in dataCollect:
    print(row['dept_name'] + "," + str(row['dept_id']))

Finance,10
Marketing,20
Sales,30
IT,40


In [90]:
deptDF.collect()[0][0]

'Finance'

In [91]:
dataCollect[0][0]

'Finance'

In [92]:
#collect로 select() 
dataCollect = deptDF.select("dept_name").collect()

In [94]:
# select()
deptDF.select("dept_name").show()

+---------+
|dept_name|
+---------+
|  Finance|
|Marketing|
|    Sales|
|       IT|
+---------+



### collect() vs select()
select는 새로운 데이터프레임을 반환하고, 선택된 열을 보유하는 변환
collect는 배열의 전체 데이터 세트를 드라이버에 반환



In [95]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]

deptDF = spark.createDataFrame(data = dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate = False)

dataCollect = deptDF.collect()
print(dataCollect)

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+

[Row(dept_name='Finance', dept_id=10), Row(dept_name='Marketing', dept_id=20), Row(dept_name='Sales', dept_id=30), Row(dept_name='IT', dept_id=40)]


In [96]:
dataCollect2 = deptDF.select("dept_name").collect()
print(dataCollect2)

[Row(dept_name='Finance'), Row(dept_name='Marketing'), Row(dept_name='Sales'), Row(dept_name='IT')]


In [98]:
for row in dataCollect:
    print(row['dept_name'] + "," + str(row['dept_id']))

Finance,10
Marketing,20
Sales,30
IT,40


### withColumn() 
change the value, convert the datatype of an existing column, create a new column, and many more.\
값을 바꾸거나, 데이터타입을 변환하거나, 새로운 컬럼 만들거나 등등

In [99]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

In [100]:
columns = ["firstname","middlename","lastname","dob","gender","salary"]


In [101]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
df = spark.createDataFrame(data = data, schema = columns)

change datatype


In [103]:
df.withColumn("salary", col("salary").cast("Integer")).show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



### create column 
기존 컬럼에서 새로운 컬럼 생성

In [104]:
df.withColumn("CopiedColumn", col("salary")* -1).show()

+---------+----------+--------+----------+------+------+------------+
|firstname|middlename|lastname|       dob|gender|salary|CopiedColumn|
+---------+----------+--------+----------+------+------+------------+
|    James|          |   Smith|1991-04-01|     M|  3000|       -3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|       -4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|       -4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|       -4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|           1|
+---------+----------+--------+----------+------+------+------------+



### add a new column
lit()\
lit() 함수는 DataFrame 열에 상수 값을 추가하는 데 사용됩니다.

In [105]:
df.withColumn("Country", lit("USA")).show()
df.withColumn("Country", lit("USA"))\
.withColumn("anotherColumn", lit("anotherValue"))\
.show()

+---------+----------+--------+----------+------+------+-------+
|firstname|middlename|lastname|       dob|gender|salary|Country|
+---------+----------+--------+----------+------+------+-------+
|    James|          |   Smith|1991-04-01|     M|  3000|    USA|
|  Michael|      Rose|        |2000-05-19|     M|  4000|    USA|
|   Robert|          |Williams|1978-09-05|     M|  4000|    USA|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|    USA|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|    USA|
+---------+----------+--------+----------+------+------+-------+

+---------+----------+--------+----------+------+------+-------+-------------+
|firstname|middlename|lastname|       dob|gender|salary|Country|anotherColumn|
+---------+----------+--------+----------+------+------+-------+-------------+
|    James|          |   Smith|1991-04-01|     M|  3000|    USA| anotherValue|
|  Michael|      Rose|        |2000-05-19|     M|  4000|    USA| anotherValue|
|   Robert|        

### rename column


In [106]:
df.withColumnRenamed("gender", "sex")\
.show(truncate = False)

+---------+----------+--------+----------+---+------+
|firstname|middlename|lastname|dob       |sex|salary|
+---------+----------+--------+----------+---+------+
|James    |          |Smith   |1991-04-01|M  |3000  |
|Michael  |Rose      |        |2000-05-19|M  |4000  |
|Robert   |          |Williams|1978-09-05|M  |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F  |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F  |-1    |
+---------+----------+--------+----------+---+------+



### Drop Column

In [107]:
df.drop("salary").show()

+---------+----------+--------+----------+------+
|firstname|middlename|lastname|       dob|gender|
+---------+----------+--------+----------+------+
|    James|          |   Smith|1991-04-01|     M|
|  Michael|      Rose|        |2000-05-19|     M|
|   Robert|          |Williams|1978-09-05|     M|
|    Maria|      Anne|   Jones|1967-12-01|     F|
|      Jen|      Mary|   Brown|1980-02-17|     F|
+---------+----------+--------+----------+------+



### withColumnRenamed
withColumnRenamed(existingName, newName)

In [6]:
dataDF = [(('James','','Smith'),'1991-04-01','M',3000),
  (('Michael','Rose',''),'2000-05-19','M',4000),
  (('Robert','','Williams'),'1978-09-05','M',4000),
  (('Maria','Anne','Jones'),'1967-12-01','F',4000),
  (('Jen','Mary','Brown'),'1980-02-17','F',-1)
]

In [7]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [8]:
schema = StructType([
    StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
        StructField('lastname', StringType(), True)
    ])),
    StructField('dob', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('salary', IntegerType(), True)
])

In [9]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
df = spark.createDataFrame(data = dataDF, schema = schema)
df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [10]:
df.withColumnRenamed("dob", "DateOfBirth").printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



### rename multiple columns

In [11]:
df2 = df.withColumnRenamed("dob", "DateOfBirth").withColumnRenamed("salary", "salary_amount")
df2.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary_amount: integer (nullable = true)



### using Select - rename nested elements

In [12]:
from pyspark.sql.functions import *
df.select(col("name.firstname").alias("fname"),\
col("name.middlename").alias("mname"),\
col("name.lastname").alias("lname"),\
col("dob"),col("gender"),col("salary"))\
.printSchema()

root
 |-- fname: string (nullable = true)
 |-- mname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



### using pyspark DataFrame - rename nested columns
기존 컬럼 삭제해야함\
name.firstname => fname 생성, name 삭제\
신규 컬럼은 문자열로, 기존 컬럼은 반드시 컬럼형 (col('컬럼명'))을 이용

In [13]:
from pyspark.sql.functions import *
df4 = df.withColumn("fname", col("name.firstname"))\
.withColumn("mname", col("name.middlename"))\
.withColumn("lname", col("name.lastname"))\
.drop("name")
df4.printSchema()

root
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- mname: string (nullable = true)
 |-- lname: string (nullable = true)



### change all columns

In [14]:
newColumns = ["newCol1","newCol2","newCol3","newCol4"]
df.toDF(*newColumns).printSchema()

root
 |-- newCol1: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- newCol2: string (nullable = true)
 |-- newCol3: string (nullable = true)
 |-- newCol4: integer (nullable = true)



### filter()
where() 써도 됨\
조건 만족하는 로우의 새로운 DataFrame or RDD 리턴함\
filter(condition)


In [10]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, ArrayType

data = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M"),
    (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F"),
    (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
    (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
    (("Mike","Mary","Williams"),["Python","VB"],"OH","M")
 ]
    

In [11]:
schema = StructType([
    StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
     ])),
    StructField('languages', ArrayType(StringType()), True),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)])

In [12]:
df = spark.createDataFrame(data = data, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



### DataFram filter() with Column Condition

In [18]:
df.filter(df.state == "OH").show(truncate=False)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



### DataFram filter() with SQL

In [19]:
df.filter("gender == 'M'").show()
df.filter("gender != 'M'").show()
df.filter("gender <> 'M'").show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+

+-------------------+------------------+-----+------+
|               name|         languages|state|gender|
+-------------------+------------------+-----+------+
|     {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Julia, , Williams}|      [CSharp, VB]|   OH|     F|
+-------------------+------------------+-----+------+

+-------------------+------------------+-----+------+
|               name|         languages|state|gender|
+-------------------+------------------+-----+------+
|     {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Julia, , William

In [20]:
df.filter((df.state == "OH") & (df.gender == "M") ).show(truncate = False)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



### Filter List Values
isin()

In [21]:
li = ["OH", "CA", "DE"]
df.filter(df.state.isin(li)).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [22]:
df.filter(~df.state.isin(li)).show()
df.filter(df.state.isin(li)==False).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+



### Starts With, Ends With, Contains

In [23]:
df.filter(df.state.startswith("N")).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+



In [24]:
df.filter(df.state.endswith("H")).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [25]:
df.filter(df.state.contains("H")).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



### Filter like and rlike
rlike(regex like) 대소문자 구분하지 않고 필터

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import col, array_contains

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

In [4]:
import pyspark
data2 = [(2,"Michael Rose"),(3,"Robert Williams"),\
     (4,"Rames Rose"),(5,"Rames rose")
  ]
df2 = spark.createDataFrame(data = data2, schema = ["id", "name"])

In [5]:
df2.filter(df2.name.like("%rose%")).show()                       

+---+----------+
| id|      name|
+---+----------+
|  5|Rames rose|
+---+----------+



In [6]:
df2.filter(df2.name.rlike("(?i)^*rose$")).show()

+---+------------+
| id|        name|
+---+------------+
|  2|Michael Rose|
|  4|  Rames Rose|
|  5|  Rames rose|
+---+------------+



### filter Array column
array_contains()\
값이 배열에 포함되어 있으면 true 반환, 아니면 false

In [13]:
from pyspark.sql.functions import array_contains
df.filter(array_contains(df.languages, "Java")).show(truncate = False)

+----------------+------------------+-----+------+
|name            |languages         |state|gender|
+----------------+------------------+-----+------+
|{James, , Smith}|[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }  |[Spark, Java, C++]|NY   |F     |
+----------------+------------------+-----+------+



### Filtering on Nested Struct columns

In [14]:
df.filter(df.name.lastname == "Williams").show(truncate = False)

+----------------------+------------+-----+------+
|name                  |languages   |state|gender|
+----------------------+------------+-----+------+
|{Julia, , Williams}   |[CSharp, VB]|OH   |F     |
|{Mike, Mary, Williams}|[Python, VB]|OH   |M     |
+----------------------+------------+-----+------+



multiple column filter \
and => &, or => |


In [None]:
df.filter( (df.state == "OH") & (df.gender == "M") ).show(truncate=False)

### Distinct Duplicate row

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

In [4]:
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
# Prepare Data
data = [("James", "Sales", 3000), \
    ("Michael", "Sales", 4600), \
    ("Robert", "Sales", 4100), \
    ("Maria", "Finance", 3000), \
    ("James", "Sales", 3000), \
    ("Scott", "Finance", 3300), \
    ("Jen", "Finance", 3900), \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000), \
    ("Saif", "Sales", 4100) \
  ]

In [3]:
columns = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate = False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [9]:
df.count()

10

In [4]:
#test 내가 해본 예제
df.createOrReplaceTempView("tempview")
spark.sql('select * from tempview').show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



### Distinct row
중복제거하고 남은 로우 출력\
모든행을 비교

In [5]:
distinctDF = df.distinct()

In [6]:
print("Distinct count:" + str(distinctDF.count()))
distinctDF.show(truncate = False)

Distinct count:9
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Michael      |Sales     |4600  |
|James        |Sales     |3000  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Jen          |Finance   |3900  |
|Scott        |Finance   |3300  |
|Kumar        |Marketing |2000  |
|Jeff         |Marketing |3000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [10]:
df2 = df.dropDuplicates()
print("Distinct count:" + str(df2.count()))
df2.show(truncate = False)
#모든 행을 비교해서 이름, 부서, 연봉이 다 겹쳤을 때 제거함

Distinct count:9
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Michael      |Sales     |4600  |
|James        |Sales     |3000  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|Jen          |Finance   |3900  |
|Scott        |Finance   |3300  |
|Kumar        |Marketing |2000  |
|Jeff         |Marketing |3000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



### Distinct of selected Multiple Columns
dropDuplicates() multiple columns to eliminate duplicates.\
dropDuplicates() on DataFrame returns a new DataFrame with duplicate rows removed.



In [12]:
dropDisDF = df.dropDuplicates(["department", "salary"])
print("Distinct count of department & salary : " + str(dropDisDF.count()))
dropDisDF.show(truncate = False)

# 선택한 두 컬럼만 비교해서 중복을 제거함, 이름은 같던 말던~

Distinct count of department & salary : 8
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|Maria        |Finance   |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Kumar        |Marketing |2000  |
|Jeff         |Marketing |3000  |
|James        |Sales     |3000  |
|Robert       |Sales     |4100  |
|Michael      |Sales     |4600  |
+-------------+----------+------+



### orderBy() sort()
sorting 해서 새로운 DataFrame 리턴함

In [4]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, asc


simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Raman","Finance","CA",99000,40,24000), \
    ("Scott","Finance","NY",83000,36,19000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns= ["employee_name","department","state","salary","age","bonus"]

df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate = False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



### sorting()

In [16]:
df.sort("salary" , "age", ascending = [True, False])
# column1 은 ascending 오름차순, column2 는 descending 내림차순

DataFrame[employee_name: string, department: string, state: string, salary: bigint, age: bigint, bonus: bigint]

In [19]:
df.sort("department", "state").show(truncate = False)
df.sort(col("department"), col("state")).show(truncate = False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|

### orderBy()
By default, it orders by ascending.

In [20]:
df.orderBy("department", "state").show(truncate = False)
df.orderBy(col("department"), col("state")).show(truncate = False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Raman        |Finance   |CA   |99000 |40 |24000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Raman        |Finance   |CA   |99000 |40 |24000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|

### sort by asc
명시적으로 asc 넣고싶으면 asc()

In [22]:
df.sort(df.department.desc(), df.state.desc()).show(truncate = False)
df.sort(col("department").asc(), col("state").asc()).show(truncate = False)
df.orderBy(col("department").asc(), col("state").asc()).show(truncate = False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|

### sort by desc
desc(), asc_nulls_first() and asc_nulls_last() and equivalent descending functions.

### Raw SQL

In [25]:
df.createOrReplaceTempView("EMP")
spark.sql("select employee_name, department, state, salary, age, bonus from EMP ORDER BY department asc").show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
+-------------+----------+-----+------+---+-----+



### groupBy() 
to collect the identical data into groups on DataFrame \
and perform count, sum, avg, min, max functions on the grouped data.

In [26]:
simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

schema = ["employee_name","department","state","salary","age","bonus"]

In [28]:
df = spark.createDataFrame(data = simpleData, schema = schema)
df.printSchema()
df.show(truncate = False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



### groupBy() 

In [29]:
df.groupBy("department").sum("salary").show(truncate = False)

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|Sales     |257000     |
|Finance   |351000     |
|Marketing |171000     |
+----------+-----------+



In [34]:
df.groupBy("department").count().show()

+----------+-----+
|department|count|
+----------+-----+
|     Sales|    3|
|   Finance|    4|
| Marketing|    2|
+----------+-----+



In [36]:
df.groupBy("department").min("salary").show(truncate = False)

+----------+-----------+
|department|min(salary)|
+----------+-----------+
|Sales     |81000      |
|Finance   |79000      |
|Marketing |80000      |
+----------+-----------+



In [38]:
df.groupBy("department").max("salary").show()

+----------+-----------+
|department|max(salary)|
+----------+-----------+
|     Sales|      90000|
|   Finance|      99000|
| Marketing|      91000|
+----------+-----------+



In [39]:
df.groupBy("department").avg("salary").show()

+----------+-----------------+
|department|      avg(salary)|
+----------+-----------------+
|     Sales|85666.66666666667|
|   Finance|          87750.0|
| Marketing|          85500.0|
+----------+-----------------+



In [41]:
#mean도 평균
df.groupBy("department").mean("salary").show()

+----------+-----------------+
|department|      avg(salary)|
+----------+-----------------+
|     Sales|85666.66666666667|
|   Finance|          87750.0|
| Marketing|          85500.0|
+----------+-----------------+



### groupBy Multiple columns 

In [54]:
df.groupBy("department", "state")\
.sum("salary", "bonus").sort(desc("sum(salary)"))\
.show(truncate = False)

+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|Finance   |CA   |189000     |47000     |
|Sales     |NY   |176000     |30000     |
|Finance   |NY   |162000     |34000     |
|Marketing |NY   |91000      |21000     |
|Sales     |CA   |81000      |23000     |
|Marketing |CA   |80000      |18000     |
+----------+-----+-----------+----------+



### aggregates at a time
agg()를 이용해서 집계쿼리 sum(), avg(), min(), max() mean() e.t.c. 한 문장으로 \
"from pyspark.sql.functions import sum,avg,max,min,mean,count"

In [57]:
from pyspark.sql.functions import sum, avg, max
df.groupBy("department")\
.agg(sum("salary").alias("sum_salary"),\
avg("salary").alias("avg_salary"),\
sum("bonus").alias("sum_bonus"),\
max("bonus").alias("max_bonus")\
    )\
.show(truncate = False)


+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
|Marketing |171000    |85500.0          |39000    |21000    |
+----------+----------+-----------------+---------+---------+



### filter on aggregate data
similar to HAVING\
use either where() or filter()

In [9]:
from pyspark.sql.functions import sum, avg, max

df.groupBy("department")\
.agg(sum("salary").alias("sum_salary"),avg("salary").alias("avg_salary"),\
sum("bonus").alias("sum_bonus"),\
max("bonus").alias("max_bonus"))\
.where(col("sum_bonus") >= 50000)\
.show(truncate = False)

+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
+----------+----------+-----------------+---------+---------+



### Join Two DataFrames
support all basic join type(INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS, SELF JOIN)\
involve data suffling across the network

join(self, other, on = None, how=None)

In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data = emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate = False)

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data = dept, schema = deptColumns)
deptDF.show(truncate = False)
                               

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+-----

join operation works by combining data from two or more Datasets based on a common column or key\

common key: 조인하기 위해서는 common key or column 필요함. 이 key 는 일치하는 row를 조인하기 위해 사용되어짐\
partitioning: pyspark 데이터셋은 여러 노드에 분산,분할 되어있음. 이상적으로 같은 조인 키 데이터는 같은 파티션에 위치되어 있어야함\
데이터셋이 조인키로 분할되지 않은 경우 셔플을 수행해서 데이터를 재분산하고 동일 키를 가진 로우를 동일 노드에 있도록 할 수 있음\
셔플링은 비싼 작업임, 특히 큰 데이터셋에서

inner join: 두 데이터프레임에서 같은 키를 가진 로우만 반환\
left join: 왼쪽 데이터프레임의 모든 로우와 오른쪽 데이터프레임과 매칭되는 로우를 반환함\
right join: 오른쪽 데이터프레임의 모든 로우와 왼쪽 데이터프레임과 매칭되는 로우 반환\
full outer join: 두 데이터프레임의 모든 로우 반환\
left semi join: 오른쪽 데이터프레임과 매치되는 왼쪽 데이터프레임 모든 로우 반환\
left anti join: 오른쪽 데이터프레임과 매치되지 않는 왼쪽 데이터프레임 모든 로우 반환



In [2]:
#inner join (defalut)
#inner join combines two DataFrames based on the key(common column) provided and results in rows 

empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "inner").show(truncate = False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [4]:
#full outer join
#모든 행 반환, 조인 식이 일치하지 않는 경우 null 반환

In [3]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "outer").show(truncate = False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [5]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "full").show(truncate = False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [6]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "fullouter").show(truncate = False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [7]:
#Left outer join
#
empDF.show()
deptDF.show()
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left").show(truncate = False)
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "leftouter").show(truncate = False)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     5|   Brown|              2|       2010|         40|      |    -1|
|     6|   Brown|              2|       2010|         50|      |    -1|
+------+--------+---------------+-----------+-----------+------+------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_em

In [8]:
# |6     |Brown   |2              |2010       |50         |      |-1    |null     |null 
# emp_dept_id에서 dept_id와 일치하지 않는 Brown에 대해 deptDF 컬럼에 null 할당

In [20]:
#right outer join
empDF.show()
deptDF.show()
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "right").show(truncate = False)
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "rightouter").show(truncate = False)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     5|   Brown|              2|       2010|         40|      |    -1|
|     6|   Brown|              2|       2010|         50|      |    -1|
+------+--------+---------------+-----------+-----------+------+------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_em

In [9]:
# dept_id 와 일치하는 값의 row 다 반환
# 일치하지 않는 empDF row 값에 대해 null 할당

In [22]:
#left semi join
#inner join이랑 비슷함. 왼쪽 데이터셋 모든 컬럼 리턴하고 오른쪽은 무시함,
#오른쪽 데이터프레임과 매치되는 레코드 & 왼쪽 데이터프레임 컬럼만 리턴
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "leftsemi").show(truncate = False)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+



In [23]:
#leftanti
#lefts semi 의 반대 
#매치되는 않는 왼쪽 데이터셋만 레코드만 리턴
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "leftanti").show(truncate = False)

+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+



### Self join
위에 있는 조인 유형 선택해서 셀프조인 사용할 수 있음


In [28]:
empDF.show()
print("==self join==")
empDF.alias("emp1").join(empDF.alias("emp2"),\
col("emp1.superior_emp_id") == col("emp2.emp_id"), "inner")\
.select(col("emp1.emp_id"),col("emp1.name"),\
col("emp2.emp_id").alias("superior_emp_id"),\
col("emp2.name").alias("superior_emp_name")).show(truncate = False)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|    name|superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|     1|   Smith|             -1|       2018|         10|     M|  3000|
|     2|    Rose|              1|       2010|         20|     M|  4000|
|     3|Williams|              1|       2010|         10|     M|  1000|
|     4|   Jones|              2|       2005|         10|     F|  2000|
|     5|   Brown|              2|       2010|         40|      |    -1|
|     6|   Brown|              2|       2010|         50|      |    -1|
+------+--------+---------------+-----------+-----------+------+------+

==self join==
+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4   

In [29]:
empDF.alias("emp1").join(empDF.alias("emp2"),col("emp1.superior_emp_id") == col("emp2.emp_id"), "inner")\
.select(col("emp1.emp_id"),col("emp1.name"),col("emp2.emp_id").alias("superior_emp_id"),col("emp2.name").alias("superior_emp_name")).show(truncate = False)

+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+



### SQL expression

<!-- empDF.createOrReplaceTempView("EMP")
detDF.createOrReplaceTempView("DEPT")

joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id").show(truncate = False)
joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp
 -->

In [10]:
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id").show(truncate = False)
joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id").show(truncate = False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+-

### Join on multiple DataFrames
join more than two tables, 

In [None]:
df1.join(df2, df1.id1 == df2.id2, "inner").join(df3, df1.id1 == df3.id3, "inner")

### union and union all

union(): merge two DataFrame's of the same structure/schema. schemas are not the same it returns an error \
unionAll(): deprecated since Spark 2.0.0, replace with union()

In SQL, union eliminates duplicates but unionall merge two datasets including duplicate records\
but, in pyspark recommend duplicate() function to remove duplicate rows.

In [14]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000) \
  ]
columns= ["employee_name","department","state","salary","age","bonus"]

df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate = False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
+-------------+----------+-----+------+---+-----+



In [15]:
simpleData2 = [("James","Sales","NY",90000,34,10000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns2= ["employee_name","department","state","salary","age","bonus"]

df2 = spark.createDataFrame(data = simpleData2, schema = columns2)

df2.printSchema()
df2.show(truncate = False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



### Merge two or more DataFrames using union
merges two DataFrames and returns the new DataFrame \
with all rows from two Dataframes regardless of duplicate data.

In [17]:
unionDF = df.union(df2)
unionDF.show(truncate = False)

#same output
unionAllDF = df.unionAll(df2)
unionAllDF.show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|

### Merge without Duplicates


In [18]:
disDF = df.union(df2).distinct()
disDF.show(truncate = False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
+-------------+----------+-----+------+---+-----+



### unionByName()
merge/union two DataFrames with column names\
열 이름으로 두 데이터프레임 병합하는데 사용\
열 이름이 다른 순서로 있거나 DataFrame에 누락된 열이 있는 경우 두 개의 DataFrame 통합하는데 사용


unionByName(df, allowMissingColumns = True)

### Difference between unionByName() vs union()
unionByName() is used to merge two DataFrames by column names instead of by position\
unionByName() also provides an argument allowMissingColumns to specify if you have a different column counts

In [19]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = [("James",34), ("Michael",56), \
        ("Robert",30), ("Maria",24) ]
df1 = spark.createDataFrame(data = data, schema = ["name", "id"])
df1.printSchema()

data2=[(34,"James"),(45,"Maria"), \
       (45,"Jen"),(34,"Jeff")]

df2 = spark.createDataFrame(data = data2, schema = ["id", "name"])
df2.printSchema()

root
 |-- name: string (nullable = true)
 |-- id: long (nullable = true)

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [22]:
df3 = df1.unionByName(df2)
df3.printSchema()
df3.show()

df3t = df1.union(df2)
df3t.show()

root
 |-- name: string (nullable = true)
 |-- id: long (nullable = true)

+-------+---+
|   name| id|
+-------+---+
|  James| 34|
|Michael| 56|
| Robert| 30|
|  Maria| 24|
|  James| 34|
|  Maria| 45|
|    Jen| 45|
|   Jeff| 34|
+-------+---+

+-------+-----+
|   name|   id|
+-------+-----+
|  James|   34|
|Michael|   56|
| Robert|   30|
|  Maria|   24|
|     34|James|
|     45|Maria|
|     45|  Jen|
|     34| Jeff|
+-------+-----+



### unionByName() with Different Number of Columns
different number of columns then use allowMissingColumns = True\
the result of the DataFrame contains null values for the columns that are missing on the DataFrame

#### param allowMissingColumns is available since Spark 3.1 version.

In [25]:
df1 = spark.createDataFrame([[5, 2, 6]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[6, 7, 3]], ["col1", "col2", "col3"])

df3 = df1.unionByName(df2, allowMissingColumns = True)
df3.printSchema()
df3.show()

root
 |-- col0: long (nullable = true)
 |-- col1: long (nullable = true)
 |-- col2: long (nullable = true)
 |-- col3: long (nullable = true)

+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
|   5|   2|   6|null|
|null|   6|   7|   3|
+----+----+----+----+



### UDF
UDF’s are the most expensive operations hence use them only you have no choice and when essential\
선택의 여지가 없고, 꼭 필요할때만 사용해라!

create a function in a Python syntax and wrap it with PySpark SQL udf() or register it as udf and use it on DataFrame and SQL respectively

기능 재사용 시 사용. ex)첫번째 글자를 대문자로 전환할때 udf 만들어서 재사용 할 수 있음\
udf 만들기 전에 비슷한 기능이 있는지 확인하는 게 좋음, 그리고 디자인을 매우 세심하게 해야함 그렇지 않으면 성능 및 최적화 문제 발생할 가능성 있음

### create UDF

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data = data, schema = columns)

df.show(truncate = False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



In [2]:
def convertCase(str):
    resStr = ""
    arr = str.split(" ")
    for x in arr:
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr


In [3]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# StringType() is by default hence not required 원래 스트링이 기본이니까 옵션 안줘도됨
convertUDF = udf(lambda z: convertCase(z), StringType())

### UDF with DataFrame select()

In [4]:
df.select(col("Seqno"),convertUDF(col("Name")).alias("Name")).show(truncate = False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



### UDF with DataFrame withColumn()


In [5]:
def upperCase(str):
    return str.upper()

In [6]:
upperCaseUDF = udf(lambda z:upperCase(z), StringType())

df.withColumn("Cureated Name", upperCaseUDF(col("Name"))).show(truncate = False)

+-----+------------+-------------+
|Seqno|Name        |Cureated Name|
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+



### UDF & use it on SQL

In [7]:
spark.udf.register("convertUDF", convertCase, StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE").show(truncate = False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



### UDF using annotation

In [8]:
@udf(returnType = StringType())
def upperCase(str):
    return str.upper()

df.withColumn("Cureated Name",  upperCase(col("Name"))).show(truncate = False)

+-----+------------+-------------+
|Seqno|Name        |Cureated Name|
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+



### Excution order
실행순서를 보장하지 않음\
pyspark 는 쿼리 최적화 및 Planning을 위해 실행순서를 재정렬 하므로 표현식을 사용할 때 부작용을 주의해야함\
UDF 사용 시에는 null handling을 특히 주의해야함

In [9]:

""" 
No guarantee Name is not null will execute first
If convertUDF(Name) like '%John%' execute first then 
you will get runtime error
"""
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " + "where Name is not null and convertUDF(Name) like '%John%'") \
     .show(truncate=False) 

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



### Handling null check

In [12]:
""" null check """

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ('4',None)]

df2 = spark.createDataFrame(data = data, schema = columns)
df2.show(truncate = False)
df2.createOrReplaceTempView("NAME_TABLE2")

spark.sql("select convertUDF(Name) from NAME_TABLE2").show(truncate = False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
|4    |null        |
+-----+------------+



PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_124/3922637301.py", line 3, in convertCase
AttributeError: 'NoneType' object has no attribute 'split'


In [13]:
spark.udf.register("_nullsafeUDF", lambda str: convertCase(str) if not str is None else "",StringType())
spark.sql("select _nullsafeUDF(Name) from NAME_TABLE2").show(truncate = False)
spark.sql("select Seqno, _nullsafeUDF(Name) as Name from NAME_TABLE2 " + \
          "where Name is not null and _nullsafeUDF(Name) like '%John%'").show(truncate = False)

+------------------+
|_nullsafeUDF(Name)|
+------------------+
|John Jones        |
|Tracey Smith      |
|Amy Sanders       |
|                  |
+------------------+

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



### transform()
pyspark provides two transform() functions one with DataFrame and another in pyspark.sql.function

pyspark.sql.DataFrame.transform() – Available since Spark 3.0\
pyspark.sql.functions.transform()

In [15]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
simpleData = (("Java",4000,5), \
    ("Python", 4600,10),  \
    ("Scala", 4100,15),   \
    ("Scala", 4500,15),   \
    ("PHP", 3000,20),  \
  )
columns= ["CourseName", "fee", "discount"]

df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate = False)

root
 |-- CourseName: string (nullable = true)
 |-- fee: long (nullable = true)
 |-- discount: long (nullable = true)

+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java      |4000|5       |
|Python    |4600|10      |
|Scala     |4100|15      |
|Scala     |4500|15      |
|PHP       |3000|20      |
+----------+----+--------+



### DataFrame.transform()
pyspark.sql.DataFrame.transform()
사용자 함수를 연결하고 새로운 DataFrame()을 리턴함

#### DataFrame.transform(func: Callable[[..], DataFrame], *args: Any,**kwargs: Any) -> pyspark.sql.dataframe.DataFrame
func - custom function to call\
*args - Arguments to pass to func\
*kwargs - Keyword arguments to pass to func


In [16]:
# Custom transformation 1
from pyspark.sql.functions import upper
def to_upper_str_columns(df):
    return df.withColumn("CourseName", upper(df.CourseName))

# Custom transformation 2
def reduce_price(df, reduceBy):
    return df.withColumn("new_fee", df.fee - reduceBy)

# Custom transformation 3
def apply_discount(df):
    return df.withColumn("discounted_fee",df.new_fee - (df.new_fee * df.discount) / 100)


### Apply DataFrame.transform()
사용자 정의 함수를 연결하고 transform() 함수를 실행

In [17]:
df2 = df.transform(to_upper_str_columns).transform(reduce_price, 1000).transform(apply_discount)

In [18]:
df2.show(truncate = False)

+----------+----+--------+-------+--------------+
|CourseName|fee |discount|new_fee|discounted_fee|
+----------+----+--------+-------+--------------+
|JAVA      |4000|5       |3000   |2850.0        |
|PYTHON    |4600|10      |3600   |3240.0        |
|SCALA     |4100|15      |3100   |2635.0        |
|SCALA     |4500|15      |3500   |2975.0        |
|PHP       |3000|20      |2000   |1600.0        |
+----------+----+--------+-------+--------------+



열을 선택하려는 경우 select()를 쓰거나, 또 다른 custom function을 쓸 수 있음

In [23]:
#custom function
def select_columns(df):
    return df.select("CourseName", "discounted_fee")
    
#chain transformations
df2 = df.transform(to_upper_str_columns).transform(reduce_price,1000).transform(apply_discount).transform(select_columns)

In [24]:
df2.show(truncate = False)

+----------+--------------+
|CourseName|discounted_fee|
+----------+--------------+
|JAVA      |2850.0        |
|PYTHON    |3240.0        |
|SCALA     |2635.0        |
|SCALA     |2975.0        |
|PHP       |1600.0        |
+----------+--------------+



### sql.functions.transform()
apply the transformation on a column of type Array. This function applies the specified transformation on every element of the array and returns an object of ArrayType.

배열 컬럼 변형에 적용됨. 이 함수는 모든 요소에 지정된 변환을 적용하고 배열타입 객체를 리턴함
#### pyspark.sql.functions.transform(col, f)

In [8]:
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"]),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"]),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"])
]
df = spark.createDataFrame(data = data, schema = ["Name", "Languages1", "Languages2"])
df.printSchema()
df.show()

from pyspark.sql.functions import upper
from pyspark.sql.functions import transform
df.select(transform("Languages1", lambda x: upper(x)).alias("languages1")).show()

root
 |-- Name: string (nullable = true)
 |-- Languages1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Languages2: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------+------------------+---------------+
|            Name|        Languages1|     Languages2|
+----------------+------------------+---------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|
+----------------+------------------+---------------+

+------------------+
|        languages1|
+------------------+
|[JAVA, SCALA, C++]|
|[SPARK, JAVA, C++]|
|      [CSHARP, VB]|
+------------------+



### apply Function to Column
withColumn(), sql(), select() 사용해서 기존 함수나 custom 함수 컬럼에 적용할 수 있음\
custom function에 적용하기 위해 function을 만들고 udf로 등록해야함.\
pyspark 최신 버전에서는 pandas API 사용을 제공하므로 pyspark.pandas.DataFrame.apply() 사용할 수 있음


In [10]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
columns = ["Seqno", "Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate = False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



### apply Function using withColumn()
withColumn() is a transformation function 

In [11]:
from pyspark.sql.functions import upper
df.withColumn("Upper_Name", upper(df.Name)).show()

+-----+------------+------------+
|Seqno|        Name|  Upper_Name|
+-----+------------+------------+
|    1|  john jones|  JOHN JONES|
|    2|tracey smith|TRACEY SMITH|
|    3| amy sanders| AMY SANDERS|
+-----+------------+------------+



### Apply Function using select()
select() is used to select the columns from the pyspark DataFrame

In [12]:
df.select("Seqno", "Name", upper(df.Name)).show()

+-----+------------+------------+
|Seqno|        Name| upper(Name)|
+-----+------------+------------+
|    1|  john jones|  JOHN JONES|
|    2|tracey smith|TRACEY SMITH|
|    3| amy sanders| AMY SANDERS|
+-----+------------+------------+



### apply function using SQL
SQL 쿼리 실행하기 위해서는 spark.sql() function , createOrReplaceTempView() 사용해야함\
이 테이블은 SparkSession 끄기 전까지 사용가능함

In [13]:
df.createOrReplaceTempView("TAB")
spark.sql("select Seqno, Name, UPPER(Name) from TAB").show()

+-----+------------+------------+
|Seqno|        Name| upper(Name)|
+-----+------------+------------+
|    1|  john jones|  JOHN JONES|
|    2|tracey smith|TRACEY SMITH|
|    3| amy sanders| AMY SANDERS|
+-----+------------+------------+



### Custom Function
upper()은 있는 함수지만 예시를 위해 만듦, udf는 비싼 작업이니까 선택의 여지가 없을 때 사용!

In [14]:
def upperCase(str):
    return str.upper()

In [15]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
upperCaseUDF = udf(lambda x: upperCase(x), StringType())

### Apply Custom UDF to Column


In [17]:
df.withColumn("Cureated Name", upperCaseUDF(col("Name"))).show(truncate = False)
df.select(col("Seqno"),upperCaseUDF(col("Name")).alias("Name")).show(truncate = False)
spark.udf.register("upperCaseUDF", upperCaseUDF)
df.createOrReplaceTempView("TAB")
spark.sql("select Seqno, Name, upperCaseUDF(Name) from TAB").show()

+-----+------------+-------------+
|Seqno|Name        |Cureated Name|
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |JOHN JONES  |
|2    |TRACEY SMITH|
|3    |AMY SANDERS |
+-----+------------+

+-----+------------+------------------+
|Seqno|        Name|upperCaseUDF(Name)|
+-----+------------+------------------+
|    1|  john jones|        JOHN JONES|
|    2|tracey smith|      TRACEY SMITH|
|    3| amy sanders|       AMY SANDERS|
+-----+------------+------------------+



### Pandas apply()

In [19]:
import pyspark.pandas as ps
import numpy as np

technologies = ({
    'Fee' :[20000,25000,30000,22000,np.NaN],
    'Discount':[1000,2500,1500,1200,3000]
               })

psdf = ps.DataFrame(technologies)
print(psdf)

def add(data):
    return data[0] + data[1]
    
addDF = psdf.apply(add, axis=1)
print(addDF)

       Fee  Discount
0  20000.0      1000
1  25000.0      2500
2  30000.0      1500
3  22000.0      1200
4      NaN      3000




0    21000.0
1    27500.0
2    31500.0
3    23200.0
4        NaN
dtype: float64


축(axis)는 n차원 배열을 구성하는 요소\
axis = n으로 불리는 축은 그냥 바깥 리스트에서 안쪽 리스트 순으로 0부터 이름 붙인것에 불과\
1차원 배열은 axis=0\
2차원 배열은 axis=0,axis=1 


### map() Transformation
RDD transformation. RDD/DataFrame 의 모든요소에 변환함수를 적용하고 새로운 RDD를 반환\
컬럼 추가, 컬럼 업데이트, data 변환 작업 등에 적용됨\
map 변환에 결과는 항상 입력 레코드 수와 동일함
#### Note1: DataFrame doesn’t have map() transformation to use with DataFrame hence you need to convert DataFrame to RDD first.\
데이터프레임은 map() 없음 -> DatafFrame 을 RDD 로 변환
#### Note2: If you have a heavy initialization use PySpark mapPartitions() transformation instead of map(), as with mapPartitions() heavy initialization executes only once for each partition instead of every record.
초기화가 많은 경우 mapPartitions()를 사용, 모든 레코드 대신 각 파티션에 대해 한 번만 실행됨

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()

data = ["Project","Gutenberg’s","Alice’s","Adventures",
"in","Wonderland","Project","Gutenberg’s","Adventures",
"in","Wonderland","Project","Gutenberg’s"]

rdd = spark.sparkContext.parallelize(data)

#### map(f, preservesPartitioning = False)

In [3]:
rdd2 = rdd.map(lambda x:(x,1))
for element in rdd2.collect():
    print(element)

('Project', 1)
('Gutenberg’s', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)


In [4]:

data = [('James','Smith','M',30),
  ('Anna','Rose','F',41),
  ('Robert','Williams','M',62), 
]

columns = ["firstname","lastname","gender","salary"]

df = spark.createDataFrame(data = data, schema = columns)
df.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
|     Anna|    Rose|     F|    41|
|   Robert|Williams|     M|    62|
+---------+--------+------+------+



In [5]:
#referring column by index
rdd2 = df.rdd.map(lambda x:(x[0]+","+x[1], x[2], x[3]*2))
df2 = rdd2.toDF(["name", "gender", "new_salary"])
df2.show()

+---------------+------+----------+
|           name|gender|new_salary|
+---------------+------+----------+
|    James,Smith|     M|        60|
|      Anna,Rose|     F|        82|
|Robert,Williams|     M|       124|
+---------------+------+----------+



In [6]:
#column Names
rdd2 = df.rdd.map(lambda x:(x["firstname"]+","+x["lastname"],x["gender"],x["salary"]*2))

In [7]:
rdd2 = df.rdd.map(lambda x:(x.firstname+","+x.lastname, x.gender, x.salary*2))

In [8]:
def func1(x):
    firstName = x.firstname
    lastName = x.lastname
    name = firstName + ","+ lastName
    gender=x.gender.lower()
    salary = x.salary*2
    return(name,gender,salary)
rdd2 = df.rdd.map(lambda x: func(x))

### flatMap() transformation
flattens the RDD/DataFrame(array/map/DataFrame columns) after applying the function on every element and return a new PySpark RDD/DataFrame



In [9]:

data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]

rdd=spark.sparkContext.parallelize(data)
for element in rdd.collect():
    print(element)

Project Gutenberg’s
Alice’s Adventures in Wonderland
Project Gutenberg’s
Adventures in Wonderland
Project Gutenberg’s


#### flatMap(f, preservesPartitioning = False)

In [10]:
rdd2 = rdd.flatMap(lambda x: x.split(" "))
for element in rdd2.collect():
    print(element)

Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
Project
Gutenberg’s
Adventures
in
Wonderland
Project
Gutenberg’s


In [12]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd = spark.sparkContext.parallelize(data)
for element in rdd.collect():
    print(element)

rdd2 = rdd.flatMap(lambda x: x.split(" "))
for element in rdd2.collect():
    print(element)

Project Gutenberg’s
Alice’s Adventures in Wonderland
Project Gutenberg’s
Adventures in Wonderland
Project Gutenberg’s
Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
Project
Gutenberg’s
Adventures
in
Wonderland
Project
Gutenberg’s


In [13]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pyspark-by-examples').getOrCreate()

arrayData = [
        ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{})]
df = spark.createDataFrame(data = arrayData, schema = ['name', 'knownLanguages','properties'])

from pyspark.sql.functions import explode
df2 = df.select(df.name, explode(df.knownLanguages))
df2.printSchema()
df2.show()

root
 |-- name: string (nullable = true)
 |-- col: string (nullable = true)

+---------+------+
|     name|   col|
+---------+------+
|    James|  Java|
|    James| Scala|
|  Michael| Spark|
|  Michael|  Java|
|  Michael|  null|
|   Robert|CSharp|
|   Robert|      |
|Jefferson|     1|
|Jefferson|     2|
+---------+------+



### foreach()
available RDD, DataFrame to iterate/loop over each element in the DataFrame,\
similar to for with advanced concepts
#### DataFrame.foreach(f)

.This operation is mainly used if you wanted to manipulate accumulators, save the DataFrame results to RDBMS tables, Kafka topics, and other external sources.\
RDD의 모든 요소에 특정한 함수를 적용하는 메서드

In [14]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data = data, schema = columns)
df.show()

def f(df):
    print(df.Seqno)
df.foreach(f)

+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  john jones|
|    2|tracey smith|
|    3| amy sanders|
+-----+------------+



In [16]:
### foreach() with accumulator Example

accum=spark.sparkContext.accumulator(0)
df.foreach(lambda x:accum.add(int(x.Seqno)))
print(accum.value)

6


#### RDD.foreach(f: Callable[[T], None]) -> None

In [17]:

# foreach() with RDD example
accum=spark.sparkContext.accumulator(0)
rdd=spark.sparkContext.parallelize([1,2,3,4,5])
rdd.foreach(lambda x:accum.add(x))
print(accum.value) #Accessed by driver

15


### Random Sample
pyspark.sql.DataFrame.sample()\
pyspark.sqlDataFrame.sampleBy()\
RDD.sample()\
RDD.takeSample()

큰 사이즈의 데이터셋/파일로 처리하다보면 시간이 너무 많이 걸릴 수 있음, 분석할 동안은 큰 파일의 랜덤 부분집합 표본을 사용하는 것이 좋음

Pyspark sampling (pyspark.sql.DataFrame.sample()) 은 데이터셋에서 랜덤 샘플 레코드를 얻는 과정임\
큰 데이터셋을 가지고 있을 때 원본 파일의 10% 정도같이 하위 집합을 분석/ 테스트 시 유용함
### sample(withReplacement, fraction, seed = None)
fraction - 생성할 행의 비율, 범위는 [0.0,1.0]임. 레코드 비율의 정확한 수를 보장하진 않음\
seed - 샘플링을 위한 seed\
withReplacement - 교체여부에 따른 샘플(default False) 


### fraction 사용해서 random sample 얻기

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()
df=spark.range(100)
print(df.sample(0.06).collect())

# 100레코드 중 6% 샘플 레코드를 요청했으나(6줄) , 7줄이 나옴 => 샘플 function은 지정된 정확한 비율을 반환하지 않음

[Row(id=4), Row(id=23), Row(id=39), Row(id=99)]


### seed 사용해서 샘플 재사용



In [2]:
print(df.sample(0.1, 123).collect())
print(df.sample(0.1, 123).collect())
print(df.sample(0.1, 456).collect())

[Row(id=36), Row(id=37), Row(id=41), Row(id=43), Row(id=56), Row(id=66), Row(id=69), Row(id=75), Row(id=83)]
[Row(id=36), Row(id=37), Row(id=41), Row(id=43), Row(id=56), Row(id=66), Row(id=69), Row(id=75), Row(id=83)]
[Row(id=19), Row(id=21), Row(id=42), Row(id=48), Row(id=49), Row(id=50), Row(id=75), Row(id=80)]


### withReplacement
반복되는 값의 랜덤샘플 얻고싶은경우, True value 사용


In [3]:
print(df.sample(True, 0.3, 123).collect())
#14,14,52,52,65,65...반복되는 값 가짐

[Row(id=0), Row(id=5), Row(id=9), Row(id=11), Row(id=14), Row(id=14), Row(id=16), Row(id=17), Row(id=21), Row(id=29), Row(id=33), Row(id=41), Row(id=42), Row(id=52), Row(id=52), Row(id=54), Row(id=58), Row(id=65), Row(id=65), Row(id=71), Row(id=76), Row(id=79), Row(id=85), Row(id=96)]


In [4]:
print(df.sample(0.3, 123).collect())

[Row(id=0), Row(id=4), Row(id=17), Row(id=19), Row(id=24), Row(id=25), Row(id=26), Row(id=36), Row(id=37), Row(id=41), Row(id=43), Row(id=44), Row(id=53), Row(id=56), Row(id=66), Row(id=68), Row(id=69), Row(id=70), Row(id=71), Row(id=75), Row(id=76), Row(id=78), Row(id=83), Row(id=84), Row(id=88), Row(id=94), Row(id=96), Row(id=97), Row(id=98)]


### stratified sampling 
sampleBy()\
각 계층에 대한 샘플링 비율 리턴
#### sampleBy(col, fractions, seed = None)





In [6]:
df2 = df.select((df.id % 3).alias("key"))
print(df2.sampleBy("key", {0: 0.1, 1:0.2}, 0).collect())

[Row(key=0), Row(key=1), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=0), Row(key=1), Row(key=1), Row(key=1)]


### RDD Sample
RDD also provides sample(), takeSample() return an array[T]
#### sample(self, withReplacement, fraction, seed = None)

In [7]:
rdd = spark.sparkContext.range(0,100)
print(rdd.sample(False, 0.1,0).collect())
print(rdd.sample(True, 0.3, 123).collect())

[24, 29, 41, 64, 86]
[0, 11, 13, 14, 16, 18, 21, 23, 27, 31, 32, 32, 48, 49, 49, 53, 54, 72, 74, 77, 77, 83, 88, 91, 93, 98, 99]


#### takeSample(self, withReplacement, num, seed=None)

In [9]:
print(rdd.takeSample(False,10,0))
print(rdd.takeSample(True, 30, 123))

[58, 1, 96, 74, 29, 24, 32, 37, 94, 91]
[43, 65, 39, 18, 84, 86, 25, 13, 40, 21, 79, 63, 7, 32, 26, 71, 23, 61, 83, 60, 22, 35, 84, 22, 0, 88, 16, 40, 65, 84]


### fillna() , fill() - replace NULL/None Values
DataFrame.fillna() or DataFrameNaFunctions.fill()\
DataFrame 열의 NULL/None 값을 0, 빈 문자열, 공백, 상수로 바꾸는 역할

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()

filePath = "small_zipcode.csv"
df = spark.read.options(header = 'true', inferSchema='true').csv(filePath)
df.printSchema()
df.show(truncate = False)

root
 |-- id: integer (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- population: integer (nullable = true)

+---+-------+--------+-------------------+-----+----------+
|id |zipcode|type    |city               |state|population|
+---+-------+--------+-------------------+-----+----------+
|1  |704    |STANDARD|null               |PR   |30100     |
|2  |704    |null    |PASEO COSTA DEL SUR|PR   |null      |
|3  |709    |null    |BDA SAN LUIS       |PR   |3700      |
|4  |76166  |UNIQUE  |CINGULAR WIRELESS  |TX   |84000     |
|5  |76177  |STANDARD|null               |TX   |null      |
+---+-------+--------+-------------------+-----+----------+



#### fillna(value, subset=None) fill(value, subset=None)
DataFrame.fillna(), DataFrameNaFunctions.fill() replace NULL/None values\
value - int, long, float, string, dict...\
subset - optional, 

### replace NULL/None Values with Zero(0)
fill(value:Long)


In [3]:
#Replace 0 for null for all integer columns

df.na.fill(value=0).show()

#Replace 0 for null on only population column 

df.na.fill(value=0, subset=["population"]).show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|               null|   PR|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|   PR|         0|
|  3|    709|    null|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|               null|   TX|         0|
+---+-------+--------+-------------------+-----+----------+

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|               null|   PR|     30100|
|  2|    704|    null|PASEO COSTA DEL SUR|   PR|         0|
|  3|    709|    null|       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|               nu

### replace Null/None with empty String

In [6]:
df.na.fill("").show(truncate = False)

+---+-------+--------+-------------------+-----+----------+
|id |zipcode|type    |city               |state|population|
+---+-------+--------+-------------------+-----+----------+
|1  |704    |STANDARD|                   |PR   |30100     |
|2  |704    |        |PASEO COSTA DEL SUR|PR   |null      |
|3  |709    |        |BDA SAN LUIS       |PR   |3700      |
|4  |76166  |UNIQUE  |CINGULAR WIRELESS  |TX   |84000     |
|5  |76177  |STANDARD|                   |TX   |null      |
+---+-------+--------+-------------------+-----+----------+



In [7]:
df.na.fill("unknown", ["city"]).na.fill("", ["type"]).show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|            unknown|   PR|     30100|
|  2|    704|        |PASEO COSTA DEL SUR|   PR|      null|
|  3|    709|        |       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|            unknown|   TX|      null|
+---+-------+--------+-------------------+-----+----------+



In [8]:
df.na.fill({"city":"unknown", "type": ""}).show()

+---+-------+--------+-------------------+-----+----------+
| id|zipcode|    type|               city|state|population|
+---+-------+--------+-------------------+-----+----------+
|  1|    704|STANDARD|            unknown|   PR|     30100|
|  2|    704|        |PASEO COSTA DEL SUR|   PR|      null|
|  3|    709|        |       BDA SAN LUIS|   PR|      3700|
|  4|  76166|  UNIQUE|  CINGULAR WIRELESS|   TX|     84000|
|  5|  76177|STANDARD|            unknown|   TX|      null|
+---+-------+--------+-------------------+-----+----------+



### pivot() unpivot()
Pivot() 그룹화된 열 값 중 하나를 고유한 데이터가 있는 개별 열로 바꾸는 집계입니다.


In [65]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
#Create spark session
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate = False)

root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+



In [66]:
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)

root
 |-- Product: string (nullable = true)
 |-- Canada: long (nullable = true)
 |-- China: long (nullable = true)
 |-- Mexico: long (nullable = true)
 |-- USA: long (nullable = true)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+



버전 2.0 에서는 성능이 개선됐는데, 그 아래는 매우 비싼 작업임. 인수로 컬럼 데이터를 제공하는걸 권장

In [67]:
countries = ["USA", "China", "Canada", "Mexico"]
pivotDF = df.groupBy("Product").pivot("Country", countries).sum("Amount")
pivotDF.show(truncate = False)

+-------+----+-----+------+------+
|Product|USA |China|Canada|Mexico|
+-------+----+-----+------+------+
|Orange |4000|4000 |null  |null  |
|Beans  |1600|1500 |null  |2000  |
|Banana |1000|400  |2000  |null  |
|Carrots|1500|1200 |2000  |null  |
+-------+----+-----+------+------+



In [68]:
pivotDF = df.groupBy("Product", "Country").sum("Amount").groupBy("Product")\
.pivot("Country").sum("sum(Amount)")
pivotDF.show(truncate=False)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+



### partitionBy()
pyspark.sql.DataFrameWriter\
큰 데이터셋을 하나 이상의 컬럼을 기반으로 작은 파일로 분할하는데 사용\
데이터레이크에서 쿼리 성능을 개선하는 방법\
하나 이상의 파티션 키로 큰 데이터셋을 작은 데이터셋으로 나눔\
PySpark supports partition in two ways; partition in memory (DataFrame) and partition on the disk (File system).

partition in memory: repartition() or coalesec()\
partition on disk: DataFrame 을 디스크에 다시 쓰는 동안, pyspark.sql.DataFrameWriter의 partitionBy() 를 사용해서 컬럼기반으로 데이터를 파티션 하는 방법을 선택할 수 있음 

#### partitionBy(self, *cols)


In [5]:
df = spark.read.option("header", True).csv("simple-zipcodes.csv")
df.printSchema()

root
 |-- RecordNumber: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- State: string (nullable = true)



In [6]:
df.write.option("header", True).partitionBy("state").mode("overwrite").csv("zipcodes-state")

파티션으로 데이터를 쓰는 동안, 데이터파일에서 파티션 컬럼을 제거함\
저장공간이 절약됨

### partitionBy() multiple columns

In [17]:
df.write.option("header", True).partitionBy("state", "city").mode("overwrite").csv("zipcodes-state")
df.printSchema()
df.show()

root
 |-- RecordNumber: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- State: string (nullable = true)

+------------+-------+-------------------+-------+-----+
|RecordNumber|Country|               City|Zipcode|State|
+------------+-------+-------------------+-------+-----+
|           1|     US|        PARC PARQUE|    704|   PR|
|           2|     US|PASEO COSTA DEL SUR|    704|   PR|
|          10|     US|       BDA SAN LUIS|    709|   PR|
|       49347|     US|               HOLT|  32564|   FL|
|       49348|     US|          HOMOSASSA|  34487|   FL|
|       61391|     US|  CINGULAR WIRELESS|  76166|   TX|
|       61392|     US|         FORT WORTH|  76177|   TX|
|       61393|     US|           FT WORTH|  76177|   TX|
|       54356|     US|        SPRUCE PINE|  35585|   AL|
|       76511|     US|           ASH HILL|  27007|   NC|
|           4|     US|    URB EUGENE RICE|    704|   PR|
|

### using repartition() and partitionBy() together
repartition()은 지정된 수의 파티션을 메모리에 만듦\
partitionBy()는 각 메모리 파티션과 파티션 컬럼에 대해 파일을 디스크에 씀

In [11]:
df.repartition(2).write.option("header", True).partitionBy("state").mode("overwrite").csv("zipcodes-state-more")

### Data Skew
파티션 파일당 레코드 수 제어 \
maxRecordsPerFile \
데이터가 왜곡됨(일부 파티션에는 레코드 수가 매우 적고 다른 파티션에는 레코드 수가 많음)현상에서 특히 도움됨

In [12]:
df.write.option("header", True).option("maxRecordsPerFile",2).partitionBy("state")\
.mode("overwrite").csv("zipcodes-state")

### read a specific partition
전체 파일 스캔하는 대신 특정 폴더에서 데이터 읽음

In [15]:
dfSinglePart = spark.read.option("header", True).csv("zipcodes-state/state=AL/city=SPRINGVILLE")
dfSinglePart.printSchema()
dfSinglePart.show()

root
 |-- RecordNumber: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Zipcode: string (nullable = true)

+------------+-------+-------+
|RecordNumber|Country|Zipcode|
+------------+-------+-------+
|       54355|     US|  35146|
+------------+-------+-------+



특정 파티션 데이터를 읽었을 때, 파티션 컬럼을 포함하지 않는 걸 볼 수 있음


데이터 셋 전체 폴더로 읽으면 또 스키마랑 데이터에 포함돼서 읽혀짐

### SQL - Read Partition Data
파티션이 없는 쿼리보다 훨씬 빠름

In [19]:
parqDF = spark.read.option("header", True).csv("zipcodes-state")
parqDF.createOrReplaceTempView("ZIPCODE")
spark.sql("select * from ZIPCODE where state = 'AL' and city = 'SPRINGVILLE'").show()

+------------+-------+-------+-----+-----------+
|RecordNumber|Country|Zipcode|state|       city|
+------------+-------+-------+-----+-----------+
|       54355|     US|  35146|   AL|SPRINGVILLE|
+------------+-------+-------+-----+-----------+



### How to Choose a Partition Column When Writing to File System?
파티션 개수 정할 때 조심해야함, 너무 많은 파티션을 많은 서브디렉터리를 생성해서 오버헤드 줄 수 있음(하둡쓴다면)\
since it must keep all metadata for the file system in memory.\
. Ideally, you should partition on Year/Month but not on a date.

### MapType(Dict)
key-value 쌍으로 저장하는 Python Dictionary 타입 나타냄\
key type, value type, valueContainsNull(BooleanType) 세가지 필드로 구성\
pyspark.sql.types.MapType 사용하여 MapType() 객체 생성




In [1]:
from pyspark.sql.types import StringType, MapType
mapCol = MapType(StringType(), StringType(), False)

1 매개변수 KeyType 맵의 키 유형 지정 시 사용\
2 매개변수 ValueType 맵의 값 유형 지정 시 사용\
3 매개변수 ValueContainsNull 옵션 boolean type, 두번째 매개변수 값이 null/none 값을 허용하는지 여부를 지정하는 데 사용\
map의 key는 None/Null 값을 허용X


### MapType From StructType

In [2]:
from pyspark.sql.types import StructField, StructType, StringType, MapType
schema = StructType([
    StructField('name', StringType(), True),
    StructField('properties', MapType(StringType(), StringType()), True)])

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
dataDictionary = [
        ('James',{'hair':'black','eye':'brown'}),
        ('Michael',{'hair':'brown','eye':None}),
        ('Robert',{'hair':'red','eye':'black'}),
        ('Washington',{'hair':'grey','eye':'grey'}),
        ('Jefferson',{'hair':'brown','eye':''})
        ]
df = spark.createDataFrame(data = dataDictionary, schema = schema)
df.printSchema()
df.show(truncate = False)

root
 |-- name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-----------------------------+
|name      |properties                   |
+----------+-----------------------------+
|James     |{eye -> brown, hair -> black}|
|Michael   |{eye -> null, hair -> brown} |
|Robert    |{eye -> black, hair -> red}  |
|Washington|{eye -> grey, hair -> grey}  |
|Jefferson |{eye -> , hair -> brown}     |
+----------+-----------------------------+



### MapType 요소 접근
Dictionary 컬럼에서 key, value 추출


In [4]:
df3 = df.rdd.map(lambda x:(x.name, x.properties["hair"], x.properties["eye"])).toDF(["name","hair","eye"])
df3.printSchema()
df3.show()

root
 |-- name: string (nullable = true)
 |-- hair: string (nullable = true)
 |-- eye: string (nullable = true)

+----------+-----+-----+
|      name| hair|  eye|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| null|
|    Robert|  red|black|
|Washington| grey| grey|
| Jefferson|brown|     |
+----------+-----+-----+



In [12]:
df.withColumn("hair", df.properties.getItem("hair"))\
.withColumn("eye", df.properties.getItem("eye"))\
.drop("properties")\
.show(truncate = False)


df.withColumn("hair",df.properties["hair"]) \
  .withColumn("eye",df.properties["eye"]) \
  .drop("properties") \
  .show()

+----------+-----+-----+
|name      |hair |eye  |
+----------+-----+-----+
|James     |black|brown|
|Michael   |brown|null |
|Robert    |red  |black|
|Washington|grey |grey |
|Jefferson |brown|     |
+----------+-----+-----+

+----------+-----+-----+
|      name| hair|  eye|
+----------+-----+-----+
|     James|black|brown|
|   Michael|brown| null|
|    Robert|  red|black|
|Washington| grey| grey|
| Jefferson|brown|     |
+----------+-----+-----+



In [13]:
from pyspark.sql.functions import explode
df.select(df.name, explode(df.properties)).show()

+----------+----+-----+
|      name| key|value|
+----------+----+-----+
|     James| eye|brown|
|     James|hair|black|
|   Michael| eye| null|
|   Michael|hair|brown|
|    Robert| eye|black|
|    Robert|hair|  red|
|Washington| eye| grey|
|Washington|hair| grey|
| Jefferson| eye|     |
| Jefferson|hair|brown|
+----------+----+-----+



In [14]:
from pyspark.sql.functions import map_keys
df.select(df.name, map_keys(df.properties)).show()

+----------+--------------------+
|      name|map_keys(properties)|
+----------+--------------------+
|     James|         [eye, hair]|
|   Michael|         [eye, hair]|
|    Robert|         [eye, hair]|
|Washington|         [eye, hair]|
| Jefferson|         [eye, hair]|
+----------+--------------------+



In [15]:
from pyspark.sql.functions import explode,map_keys
keysDF = df.select(explode(map_keys(df.properties))).distinct()
keyList = keysDF.rdd.map(lambda x:x[0]).collect()
print(keyList)

['eye', 'hair']


In [17]:
from pyspark.sql.functions import map_values
df.select(df.name, map_values(df.properties)).show()

+----------+----------------------+
|      name|map_values(properties)|
+----------+----------------------+
|     James|        [brown, black]|
|   Michael|         [null, brown]|
|    Robert|          [black, red]|
|Washington|          [grey, grey]|
| Jefferson|             [, brown]|
+----------+----------------------+



### Aggregate functions

In [26]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import approx_count_distinct,collect_list
from pyspark.sql.functions import collect_set,sum,avg,max,countDistinct,count
from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness 
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sumDistinct
from pyspark.sql.functions import variance,var_samp,  var_pop
simpleData = [("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  ]
schema = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [35]:
#approx_count_distinct()
# 그룹에 distinct item 수를 리턴
# salary에 고유한 값들 개수
print("approx_count_distint:" + str(df.select(approx_count_distinct("salary")).collect()[0][0]))


approx_count_distint:6


In [36]:
print("avg: " + str(df.select(avg("salary")).collect()[0][0]))

avg: 3400.0


In [37]:
# the collect_list function has grouped the data by name and created a list of all the course values for each name.
df.select(collect_list("salary")).show(truncate=False)

+------------------------------------------------------------+
|collect_list(salary)                                        |
+------------------------------------------------------------+
|[3000, 4600, 4100, 3000, 3000, 3300, 3900, 3000, 2000, 4100]|
+------------------------------------------------------------+



In [39]:
# collect_set => distinct value 리턴함
df.select(collect_set("salary")).show(truncate = False)

+------------------------------------+
|collect_set(salary)                 |
+------------------------------------+
|[4600, 3000, 3900, 4100, 3300, 2000]|
+------------------------------------+



In [31]:
df2 = df.select(countDistinct("department", "salary"))
df2.show(truncate = False)
print("Distinct Count of Department & Salary: " +str(df2.collect()[0][0]))

+----------------------------------+
|count(DISTINCT department, salary)|
+----------------------------------+
|8                                 |
+----------------------------------+

Distinct Count of Department & Salary: 8


In [40]:
print("count:"+ str(df.select(count("salary")).collect()[0]))

count:Row(count(salary)=10)


In [41]:
df.select(first("salary")).show(truncate = False)

+-------------+
|first(salary)|
+-------------+
|3000         |
+-------------+



In [42]:
df.select(last("salary")).show(truncate =False)

+------------+
|last(salary)|
+------------+
|4100        |
+------------+



In [43]:
df.select(kurtosis("salary")).show(truncate=False)

+-------------------+
|kurtosis(salary)   |
+-------------------+
|-0.6467803030303032|
+-------------------+



In [44]:
df.select(max("salary")).show(truncate = False)

+-----------+
|max(salary)|
+-----------+
|4600       |
+-----------+



In [45]:
df.select(min("salary")).show(truncate=False)

+-----------+
|min(salary)|
+-----------+
|2000       |
+-----------+



In [46]:
df.select(mean("salary")).show(truncate= False)

+-----------+
|avg(salary)|
+-----------+
|3400.0     |
+-----------+



In [47]:
df.select(skewness("salary")).show(truncate = False)

+--------------------+
|skewness(salary)    |
+--------------------+
|-0.12041791181069571|
+--------------------+



In [48]:
df.select(stddev("salary"), stddev_samp("salary"), stddev_pop("salary")).show(truncate = False)

+-------------------+-------------------+------------------+
|stddev_samp(salary)|stddev_samp(salary)|stddev_pop(salary)|
+-------------------+-------------------+------------------+
|765.9416862050705  |765.9416862050705  |726.636084983398  |
+-------------------+-------------------+------------------+



In [49]:
df.select(sum("salary")).show(truncate = False)

+-----------+
|sum(salary)|
+-----------+
|34000      |
+-----------+



In [50]:
df.select(sumDistinct("salary")).show(truncate = False)



+--------------------+
|sum(DISTINCT salary)|
+--------------------+
|20900               |
+--------------------+



In [51]:
df.select(variance("salary"), var_samp("salary"), var_pop("salary")).show(truncate = False)

+-----------------+-----------------+---------------+
|var_samp(salary) |var_samp(salary) |var_pop(salary)|
+-----------------+-----------------+---------------+
|586666.6666666666|586666.6666666666|528000.0       |
+-----------------+-----------------+---------------+



### Window Functions
입력 행 범위에 대한 순위, 행 번호 등과 같은 결과 계산하는데 쓰임\
로우 그룹에서 작동하며 모든 입력 로우에 대해 단일 값을 리턴함\
그룹에 대한 작업 수행하려면 Window.partitionBy()로 데이터를 먼저 파티션해야하며,\
행 번호, 순위 함수에 대해 추가로 파티션 데이터를 orderBy 로 정렬해야함


In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )

columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



WindowSpec창 ( 프레임 ) 에 어떤 행이 포함되는지를 정의하는 창 사양 입니다 . 즉, 일부 관계 에 의해 현재 행과 연관된 행 집합입니다 .

### row_number()
각 윈도우 파티션 행의 순서 번호 1부터 시작

In [59]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("department").orderBy("salary")

df.withColumn("row_numver", row_number().over(windowSpec)).show(truncate=False)

+-------------+----------+------+----------+
|employee_name|department|salary|row_numver|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
+-------------+----------+------+----------+



### rank()
윈도우 파티션 내 순위\
동점일 때 순위 차이를 남김

In [60]:
from pyspark.sql.functions import rank
df.withColumn("rank", rank().over(windowSpec)).show()

# 동점일 때 건너뛴 거 볼 수 있음

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
+-------------+----------+------+----+



### dense_rank()
차이 없이 순위 매기는 함수



In [62]:
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank", dense_rank().over(windowSpec)).show()

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         2|
|      Michael|     Sales|  4600|         3|
+-------------+----------+------+----------+



### percent_rank()


In [63]:
from pyspark.sql.functions import percent_rank
df.withColumn("percent_rank", percent_rank().over(windowSpec)).show()

+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Jeff| Marketing|  3000|         1.0|
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  3000|         0.0|
|       Robert|     Sales|  4100|         0.5|
|         Saif|     Sales|  4100|         0.5|
|      Michael|     Sales|  4600|         1.0|
+-------------+----------+------+------------+



### ntile()
상대적인 순위\
2 argument, 1&2 리턴

In [64]:
from pyspark.sql.functions import ntile
df.withColumn("ntile", ntile(2).over(windowSpec)).show()

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
+-------------+----------+------+-----+



### Window Analytic functions


### cume_dist()
값의 누적 cumulative 분포 가져옴

In [10]:
from pyspark.sql.functions import cume_dist
df.withColumn("cume_dist", cume_dist().over(windowSpec)).show()

+-------------+----------+------+------------------+
|employee_name|department|salary|         cume_dist|
+-------------+----------+------+------------------+
|        Maria|   Finance|  3000|0.3333333333333333|
|        Scott|   Finance|  3300|0.6666666666666666|
|          Jen|   Finance|  3900|               1.0|
|        Kumar| Marketing|  2000|               0.5|
|         Jeff| Marketing|  3000|               1.0|
|        James|     Sales|  3000|               0.4|
|        James|     Sales|  3000|               0.4|
|       Robert|     Sales|  4100|               0.8|
|         Saif|     Sales|  4100|               0.8|
|      Michael|     Sales|  4600|               1.0|
+-------------+----------+------+------------------+



### lag()


In [11]:
from pyspark.sql.functions import lag
df.withColumn("lag" , lag("salary",2).over(windowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|null|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|3000|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
|        James|     Sales|  3000|null|
|        James|     Sales|  3000|null|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sales|  4100|3000|
|      Michael|     Sales|  4600|4100|
+-------------+----------+------+----+



### lead()


In [12]:
from pyspark.sql.functions import lead
df.withColumn("lead", lead("salary",2).over(windowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary|lead|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|3900|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|null|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
|        James|     Sales|  3000|4100|
|        James|     Sales|  3000|4100|
|       Robert|     Sales|  4100|4600|
|         Saif|     Sales|  4100|null|
|      Michael|     Sales|  4600|null|
+-------------+----------+------+----+



### window aggregate functions


In [13]:
windowSpecAgg = Window.partitionBy("department")
from pyspark.sql.functions import col,avg,sum,min,max,row_number

df.withColumn("row", row_number().over(windowSpec)).\
withColumn("avg", avg(col("salary")).over(windowSpecAgg))\
.withColumn("sum", sum(col("salary")).over(windowSpecAgg))\
.withColumn("min", min(col("salary")).over(windowSpecAgg))\
.withColumn("max", max(col("salary")).over(windowSpecAgg))\
.where(col("row") == 1).select("department", "avg", "sum", "min", "max").show()

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
|     Sales|3760.0|18800|3000|4600|
+----------+------+-----+----+----+



### Date and Timestamp 
DateType default format yyyy-MM-dd\
TimestampType default yyyy-MM-dd HH:mm:ss.SSSS\
retrun null, 입력이 date/ Timestamp 로 cast 되지 않는  string 일 경우\
https://sparkbyexamples.com/pyspark/pyspark-sql-date-and-timestamp-functions/

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]]
df=spark.createDataFrame(data,["id","input"])
df.show()


+---+----------+
| id|     input|
+---+----------+
|  1|2020-02-01|
|  2|2019-03-01|
|  3|2021-03-01|
+---+----------+



https://sparkbyexamples.com/pyspark/pyspark-sql-date-and-timestamp-functions/
### current_date()


In [2]:
df.select(current_date().alias("current_date")).show(1)

#By default, the data will be returned in yyyy-dd-mm format.



+------------+
|current_date|
+------------+
|  2023-12-05|
+------------+
only showing top 1 row



### date_format()
parses the date and converts from yyyy-dd-mm

In [6]:
df.select(col("input"),date_format(col("input"), "MM-dd-yyyy").alias("date_format")).show()

+----------+-----------+
|     input|date_format|
+----------+-----------+
|2020-02-01| 02-01-2020|
|2019-03-01| 03-01-2019|
|2021-03-01| 03-01-2021|
+----------+-----------+



### to_date()
문자열을 날짜 형식으로 바꿈

In [7]:
df.select(col("input"), to_date(col("input"),"yyyy-MM-dd").alias("to_date")).show()

+----------+----------+
|     input|   to_date|
+----------+----------+
|2020-02-01|2020-02-01|
|2019-03-01|2019-03-01|
|2021-03-01|2021-03-01|
+----------+----------+



### datediff()


In [8]:
df.select(col("input"), datediff(current_date(), col("input")).alias("datediff")).show()

+----------+--------+
|     input|datediff|
+----------+--------+
|2020-02-01|    1403|
|2019-03-01|    1740|
|2021-03-01|    1009|
+----------+--------+



In [9]:
df.select(col("input"), datediff(lit('2020-02-02'), col("input")).alias("datediff")).show()
#lit 으로 열에 상수값 채운다음에 뺴기 -> 아마 오른쪽에 빼는 값이 열의 값이라서 수가 맞는 열 요소가 필요한가봄

+----------+--------+
|     input|datediff|
+----------+--------+
|2020-02-01|       1|
|2019-03-01|     338|
|2021-03-01|    -393|
+----------+--------+



### months_between()
return 두 날짜 사이의 개월 수 계산하기\
MONTHS_BETWEEN(날짜, 날짜)

In [28]:
df.select(col("input"), months_between(current_date(), col("input")).alias("months_between")).show()

+----------+--------------+
|     input|months_between|
+----------+--------------+
|2020-02-01|   46.09677419|
|2019-03-01|   57.09677419|
|2021-03-01|   33.09677419|
+----------+--------------+



### trunc()
truncates the date at a specified unit\
TRUNC(DT, 'YEAR')	-- 월, 일 초기화\
     , TRUNC(DT, 'MONTH')	-- 일 초기화\
     , TRUNC(DT, 'DAY')		-- 요일 초기화 (일요일)


In [10]:
df.select(col("input"), \
          trunc(col("input"),"Month").alias("Month_Trunc"),
          trunc(col("input"),"Year").alias("Month_Year"),
          trunc(col("input"),"Month").alias("Month_Trunc")).show()


+----------+-----------+----------+-----------+
|     input|Month_Trunc|Month_Year|Month_Trunc|
+----------+-----------+----------+-----------+
|2020-02-01| 2020-02-01|2020-01-01| 2020-02-01|
|2019-03-01| 2019-03-01|2019-01-01| 2019-03-01|
|2021-03-01| 2021-03-01|2021-01-01| 2021-03-01|
+----------+-----------+----------+-----------+



### add_months(), date_add(), date_sub()



In [11]:
df.select(col("input"), \
          add_months(col("input"),3).alias("add_months"),
          add_months(col("input"),-3).alias("sub_months"),
          date_add(col("input"),4).alias("date_add"),
          date_sub(col("input"),4).alias("date_sub")).show()


+----------+----------+----------+----------+----------+
|     input|add_months|sub_months|  date_add|  date_sub|
+----------+----------+----------+----------+----------+
|2020-02-01|2020-05-01|2019-11-01|2020-02-05|2020-01-28|
|2019-03-01|2019-06-01|2018-12-01|2019-03-05|2019-02-25|
|2021-03-01|2021-06-01|2020-12-01|2021-03-05|2021-02-25|
+----------+----------+----------+----------+----------+



### year(), month(), month(),next_day(), weekofyear()


In [46]:
df.select(col("input"), 
     year(col("input")).alias("year"), 
     month(col("input")).alias("month"), 
     next_day(col("input"),"Sunday").alias("next_day"), 
     weekofyear(col("input")).alias("weekofyear") 
  ).show()

+----------+----+-----+----------+----------+
|     input|year|month|  next_day|weekofyear|
+----------+----+-----+----------+----------+
|2020-02-01|2020|    2|2020-02-02|         5|
|2019-03-01|2019|    3|2019-03-03|         9|
|2021-03-01|2021|    3|2021-03-07|         9|
+----------+----+-----+----------+----------+



### dayofweek(), dayofmonth(), dayofyear()
#dayofweek 1 for sunday, 2 for monday, ...7 for saturday\
#dayofmonth 며칠인지 반환\
#dayofyear 년을 기준으로 며칠째인지 반환

In [49]:
df.select(col("input"),  
     dayofweek(col("input")).alias("dayofweek"), 
     dayofmonth(col("input")).alias("dayofmonth"), 
     dayofyear(col("input")).alias("dayofyear"), 
  ).show()



+----------+---------+----------+---------+
|     input|dayofweek|dayofmonth|dayofyear|
+----------+---------+----------+---------+
|2020-02-01|        7|         1|       32|
|2019-03-01|        6|         1|       60|
|2021-03-01|        2|         1|       60|
+----------+---------+----------+---------+



### current_timestamp()

In [14]:
data=[["1","02-01-2020 11 01 19 06"],["2","03-01-2019 12 01 19 406"],["3","03-01-2021 12 01 19 406"]]
df2=spark.createDataFrame(data,["id","input"])
df2.show(truncate=False)

+---+-----------------------+
|id |input                  |
+---+-----------------------+
|1  |02-01-2020 11 01 19 06 |
|2  |03-01-2019 12 01 19 406|
|3  |03-01-2021 12 01 19 406|
+---+-----------------------+



In [15]:
df2.select(current_timestamp().alias("current_timestamp")).show(1,truncate = False)

+--------------------------+
|current_timestamp         |
+--------------------------+
|2023-12-05 01:57:52.098615|
+--------------------------+
only showing top 1 row



### to_timestamp()

In [16]:
df2.select(col("input"), to_timestamp(col("input"), "MM-dd-yyyy HH mm ss SSS").alias("to_timestamp")).show(truncate = False)

+-----------------------+-----------------------+
|input                  |to_timestamp           |
+-----------------------+-----------------------+
|02-01-2020 11 01 19 06 |2020-02-01 11:01:19.06 |
|03-01-2019 12 01 19 406|2019-03-01 12:01:19.406|
|03-01-2021 12 01 19 406|2021-03-01 12:01:19.406|
+-----------------------+-----------------------+



### hour(), Minute() and second()


In [21]:
data=[["1","2020-02-01 11:01:19.06"],["2","2019-03-01 12:01:19.406"],["3","2021-03-01 12:01:19.406"]]
df3=spark.createDataFrame(data,["id","input"])

df3.select(col("input"), 
    hour(col("input")).alias("hour"), 
    minute(col("input")).alias("minute"),
    second(col("input")).alias("second") 
  ).show(truncate=False)


+-----------------------+----+------+------+
|input                  |hour|minute|second|
+-----------------------+----+------+------+
|2020-02-01 11:01:19.06 |11  |1     |19    |
|2019-03-01 12:01:19.406|12  |1     |19    |
|2021-03-01 12:01:19.406|12  |1     |19    |
+-----------------------+----+------+------+



### JSON functions
#### from_json() 
json string 을 struct type, map type 으로 바꿈
#### to_json() 
map type, struct type 을 json string 으로 바꿈
#### json_tuple()
json에서 데이터 추출해서 새로운 컬럼으로 만듦
#### get_json_object()
지정된 json 경로를 기반으로 JSON 문자열에서 JSON 요소를 추출
#### schema_of_json()
json string으로부터 스키마 string 생성


In [21]:
#DataFrame with JSON String

from pyspark.sql import SparkSession, Row
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

jsonString="""{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
df = spark.createDataFrame([(1, jsonString)], ["id", "value"])
df.show(truncate = False)

+---+--------------------------------------------------------------------------+
|id |value                                                                     |
+---+--------------------------------------------------------------------------+
|1  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+--------------------------------------------------------------------------+



### from_json()


In [5]:
from pyspark.sql.types import MapType, StringType
from pyspark.sql.functions import from_json
#Convert JSON string column to Map type
df2 = df.withColumn("value", from_json(df.value, MapType(StringType(),StringType())))
df2.printSchema()
df2.show(truncate = False)

root
 |-- id: long (nullable = true)
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+---+---------------------------------------------------------------------------+
|id |value                                                                      |
+---+---------------------------------------------------------------------------+
|1  |{Zipcode -> 704, ZipCodeType -> STANDARD, City -> PARC PARQUE, State -> PR}|
+---+---------------------------------------------------------------------------+



### to_json()
convert DataFrame columns MapType or Struct type to JSON string

In [6]:
from pyspark.sql.functions import to_json, col
df2.withColumn("value", to_json(col("value"))).show(truncate = False)

+---+----------------------------------------------------------------------------+
|id |value                                                                       |
+---+----------------------------------------------------------------------------+
|1  |{"Zipcode":"704","ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+----------------------------------------------------------------------------+



### json_tuple()
 query or extract the elements from JSON column and create the result as a new columns

In [7]:
from pyspark.sql.functions import json_tuple
df.select(col("id"), json_tuple(col("value"), "Zipcode", "ZipCodeType", "City")).toDF("id", "Zipcode", "ZipCodeType", "City").show(truncate = False)

+---+-------+-----------+-----------+
|id |Zipcode|ZipCodeType|City       |
+---+-------+-----------+-----------+
|1  |704    |STANDARD   |PARC PARQUE|
+---+-------+-----------+-----------+



### get_json_object()
json column path 기반으로 json string 추출


In [8]:
from pyspark.sql.functions import get_json_object
df.select(col("id"), get_json_object(col("value"), "$.ZipCodeType").alias("ZipCodeType")).show(truncate = False)

+---+-----------+
|id |ZipCodeType|
+---+-----------+
|1  |STANDARD   |
+---+-----------+



### schema_of_json()

In [3]:
from pyspark.sql.functions import schema_of_json, lit
schemaStr = spark.range(1)\
    .select(schema_of_json(lit("""{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""))) \
    .collect()[0][0]
print(schemaStr)

NameError: name 'spark' is not defined

### Read CSV File into DataFrame

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()
df = spark.read.csv("zipcodes.csv")
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)



In [5]:
df = spark.read.format("csv").load("zipcodes.csv")
#or
#df = spark.read.format("org.apache.spark.sql.csv").load("zipcodes.csv")
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)



### header

In [6]:
df2 = spark.read.option("header", True).csv("zipcodes.csv")
df2.printSchema()
#컬럼 이름으로 헤더를 갖고 있다면, header option에 True 지정해라-> option("header", True), 이거 안하면 그냥 데이터레코드로 취급함


root
 |-- RecordNumber: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Xaxis: string (nullable = true)
 |-- Yaxis: string (nullable = true)
 |-- Zaxis: string (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: string (nullable = true)
 |-- TaxReturnsFiled: string (nullable = true)
 |-- EstimatedPopulation: string (nullable = true)
 |-- TotalWages: string (nullable = true)
 |-- Notes: string (nullable = true)



### read multiple CSV 
df = spark.read.csv("path1, path2, path3")

### read all CSV Files in a Directory
df = spark.read.csv("Folder path")

### delimiter

In [33]:
df3 = spark.read.options(delimiter =',').csv("zipcodes.csv")

### inferSchema

In [38]:
#True 하면 자동으로 컬럼타입 추론함
df4 = spark.read.options(inferSchema = 'True', delimiter =',').csv("zipcodes.csv")
df4.printSchema()

#option 함수로 바꿔도됨
df4 = spark.read.option("inferSchema", True).option("delimiter", ",").csv("zipcodes.csv")

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)



### header

In [39]:
df3 = spark.read.options(header = 'True', inferSchema = 'True', delimiter = ',').csv("zipcodes.csv")
df3.printSchema()

root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Xaxis: double (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- TaxReturnsFiled: integer (nullable = true)
 |-- EstimatedPopulation: integer (nullable = true)
 |-- TotalWages: integer (nullable = true)
 |-- Notes: string (nullable = true)



### write dataframe to csv file

In [44]:
df.write.option("header", True).csv("zipcodes")

### options 
Other options available quote,escape,nullValue,dateFormat,quoteMode .



In [None]:
 df2.write.options(header = 'True', delimiter=',').csv("zipcodes2.csv")

### saving modes
overwrite\
append\
ignore\
error

In [7]:
df2.write.mode('overwrite').csv("zipcodes")
#==
df2.write.format("csv").mode('overwrite').save("zipcodes")

### Read and write Parquet

In [2]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("parquetFile").getOrCreate()
data =[("James ","","Smith","36636","M",3000),
              ("Michael ","Rose","","40288","M",4000),
              ("Robert ","","Williams","42114","M",4000),
              ("Maria ","Anne","Jones","39192","F",4000),
              ("Jen","Mary","Brown","","F",-1)]
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data, columns)

In [12]:
df.write.parquet("people.parquet")

In [3]:
###########Read parquet file into DataFrame
parDF = spark.read.parquet("people.parquet")

In [4]:
parDF.show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



### append or Overwrite an existing Parquet file

In [16]:
df.write.mode("append").parquet("people.parquet")
df.write.mode("overwrite").parquet("people.parquet")

In [7]:
parDF.createOrReplaceTempView("ParquetTable")
parkSQL = spark.sql("select * from ParquetTable where salary >= 4000").show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
| Michael |      Rose|        |40288|     M|  4000|
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
+---------+----------+--------+-----+------+------+



### creating a table on parquet File

In [14]:
spark.sql("create temporary view person1 using parquet options (path \"people.parquet\")")
spark.sql("select * from person1").show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



### create parquet partition file


In [18]:
df.write.partitionBy("gender", "salary").mode("overwrite").parquet("people2.parquet")

### retrieving from a partitioned parqeut file

In [19]:
parDF2 = spark.read.parquet("people2.parquet/gender=M")
parDF2.show(truncate = False)

+---------+----------+--------+-----+------+
|firstname|middlename|lastname|dob  |salary|
+---------+----------+--------+-----+------+
|Robert   |          |Williams|42114|4000  |
|Michael  |Rose      |        |40288|4000  |
|James    |          |Smith   |36636|3000  |
+---------+----------+--------+-----+------+



### creating a table on Partitioned Parquet file


In [21]:
spark.sql("create temporary view person2 using parquet options (path \"people2.parquet/gender=F\")")
spark.sql("select * from person2").show()

+---------+----------+--------+-----+------+
|firstname|middlename|lastname|  dob|salary|
+---------+----------+--------+-----+------+
|   Maria |      Anne|   Jones|39192|  4000|
|      Jen|      Mary|   Brown|     |    -1|
+---------+----------+--------+-----+------+



### lit()

In [22]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [("111",50000),("222",60000),("333",40000)]
columns= ["EmpId","Salary"]
df = spark.createDataFrame(data = data, schema = columns)

In [23]:
from pyspark.sql.functions import col, lit
df2 = df.select(col("EmpId"), col("Salary"), lit("1").alias("lit_value1"))
df2.show(truncate = False)

+-----+------+----------+
|EmpId|Salary|lit_value1|
+-----+------+----------+
|111  |50000 |1         |
|222  |60000 |1         |
|333  |40000 |1         |
+-----+------+----------+

