<a href="https://colab.research.google.com/github/URBAN-IA/URBAN-IA/blob/main/Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark




In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Colab PySpark") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()


In [4]:
data = [("James", "Sales", 3000), ("Michael", "Sales", 4600),
        ("Robert", "Sales", 4100), ("Maria", "Finance", 3000)]

columns = ["Employee_Name", "Department", "Salary"]

df = spark.createDataFrame(data, schema=columns)
df.show()

df.groupBy("Department").sum("Salary").show()


+-------------+----------+------+
|Employee_Name|Department|Salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
+-------------+----------+------+

+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
|     Sales|      11700|
|   Finance|       3000|
+----------+-----------+



In [5]:

# SparkSession 생성
spark = SparkSession.builder \
    .appName("PySpark Tutorial") \
    .getOrCreate()

# 샘플 데이터 생성
data = [("Alice", 34, "HR"), ("Bob", 45, "Finance"), ("Cathy", 29, "IT"), ("David", 30, "Finance")]
columns = ["Name", "Age", "Department"]

df = spark.createDataFrame(data, schema=columns)


### 1. `show()`와 `printSchema()`

- **`show()`**: DataFrame의 내용을 표 형태로 보여줍니다. 기본적으로 20개의 행을 출력하며, 더 많은 행을 보려면 `df.show(n)`처럼 인수를 전달할 수 있습니다.
- **`printSchema()`**: DataFrame의 스키마(컬럼 이름과 데이터 타입)를 출력합니다.


In [6]:
df.show()
df.printSchema()


+-----+---+----------+
| Name|Age|Department|
+-----+---+----------+
|Alice| 34|        HR|
|  Bob| 45|   Finance|
|Cathy| 29|        IT|
|David| 30|   Finance|
+-----+---+----------+

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Department: string (nullable = true)




### 2. `where()`와 `filter()`

특정 조건을 만족하는 행만 선택할 때 사용합니다. `where()`와 `filter()`는 동일한 기능을 제공합니다.

In [7]:
# Age가 30 이상인 행 선택
df.where(df.Age >= 30).show()
df.filter(df.Age >= 30).show()


+-----+---+----------+
| Name|Age|Department|
+-----+---+----------+
|Alice| 34|        HR|
|  Bob| 45|   Finance|
|David| 30|   Finance|
+-----+---+----------+

+-----+---+----------+
| Name|Age|Department|
+-----+---+----------+
|Alice| 34|        HR|
|  Bob| 45|   Finance|
|David| 30|   Finance|
+-----+---+----------+




### 3. `select()`와 `drop()`

- **`select()`**: 특정 컬럼만 선택할 때 사용합니다.
- **`drop()`**: 특정 컬럼을 제거할 때 사용합니다.


In [8]:
# Name과 Age 컬럼만 선택
df.select("Name", "Age").show()

# Department 컬럼 제거
df.drop("Department").show()


+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|Cathy| 29|
|David| 30|
+-----+---+

+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|Cathy| 29|
|David| 30|
+-----+---+




### 4. `join()`

다른 DataFrame과 조인할 때 사용합니다. 기본적으로 `inner` 조인을 수행하며, `left`, `right`, `outer` 조인도 가능합니다.

In [9]:
# 예제 데이터 생성
data2 = [("HR", "New York"), ("Finance", "San Francisco"), ("IT", "Seattle")]
columns2 = ["Department", "Location"]
df2 = spark.createDataFrame(data2, schema=columns2)

# Department 컬럼을 기준으로 조인
df.join(df2, on="Department", how="inner").show()


+----------+-----+---+-------------+
|Department| Name|Age|     Location|
+----------+-----+---+-------------+
|   Finance|  Bob| 45|San Francisco|
|   Finance|David| 30|San Francisco|
|        HR|Alice| 34|     New York|
|        IT|Cathy| 29|      Seattle|
+----------+-----+---+-------------+




### 5. `persist()`와 `cache()`

데이터를 메모리에 저장하여 반복 작업 시 성능을 높입니다. `cache()`는 메모리에만 저장하는 반면, `persist()`는 저장소 레벨을 지정하여 메모리 외에도 디스크에 저장할 수 있습니다.

In [10]:
# 캐싱
df.cache()

# 또는 메모리에 저장
df.persist()


DataFrame[Name: string, Age: bigint, Department: string]

In [None]:

### 6. `groupBy()`, `distinct()`, `count()`

- **`groupBy()`**: 특정 컬럼을 기준으로 그룹화하여 집계 작업을 수행할 수 있습니다.
- **`distinct()`**: 중복을 제거하고 고유한 행만 남깁니다.
- **`count()`**: 행의 개수를 계산합니다.

In [11]:
# Department별 평균 나이 계산
df.groupBy("Department").mean("Age").show()

# 중복 제거
df.distinct().show()

# 행 개수 세기
df.count()


+----------+--------+
|Department|avg(Age)|
+----------+--------+
|        HR|    34.0|
|   Finance|    37.5|
|        IT|    29.0|
+----------+--------+

+-----+---+----------+
| Name|Age|Department|
+-----+---+----------+
|  Bob| 45|   Finance|
|Alice| 34|        HR|
|Cathy| 29|        IT|
|David| 30|   Finance|
+-----+---+----------+



4

In [None]:
spark.stop()
