## Installing PySpark

In [3]:
%pip install pyspark==3.5.6
%pip install wget

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [None]:
import os, wget
from pyspark.sql import SparkSession

**Spark Session**: `SparkSession` has been the entry point to Spark since version 2.0. It is used to create RDDs, DataFrames, and other objects. A `SparkSession` is created through `SparkSession.builder`, whose methods let you configure the session in detail.

- `local[*]`: Spark runs inside a single JVM, and as many worker threads as the machine has logical cores execute tasks in place of separate executors.
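To make the meaning of the local master URL concrete, here is a minimal sketch in plain Python. The helper `local_thread_count` is hypothetical (it is not part of PySpark) and only covers the simple forms `local`, `local[K]`, and `local[*]`. It assumes `os.cpu_count()` matches the core count the JVM reports, which may not hold inside containers.

```python
import os
import re


def local_thread_count(master: str) -> int:
    """Hypothetical helper: worker threads implied by a simple local master URL."""
    if master == "local":
        # Plain "local" means a single worker thread.
        return 1
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        raise ValueError(f"not a simple local master URL: {master!r}")
    token = match.group(1)
    if token == "*":
        # "*" stands for one thread per logical core (assumed equal to os.cpu_count()).
        return os.cpu_count() or 1
    return int(token)


print(local_thread_count("local"))     # single thread
print(local_thread_count("local[4]"))  # four threads
print(local_thread_count("local[*]"))  # one thread per logical core
```

On a real session, `spark.sparkContext.defaultParallelism` is one way to check the value Spark actually chose.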

In [7]:
import sys

# 1) Pick the local host IP
host_ip = "127.0.0.1"  # or socket.gethostbyname(socket.gethostname())

# 2) Set environment variables so the driver and workers use this interpreter
os.environ['SPARK_LOCAL_IP']           = host_ip
os.environ['PYSPARK_PYTHON']           = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON']    = sys.executable

# 3) Pass the matching settings when creating the SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('PySpark Tutorial')\
    .config("spark.driver.host", host_ip) \
    .config("spark.driver.bindAddress", host_ip) \
    .getOrCreate()

In [2]:
print("Driver Host:", spark.sparkContext.getConf().get("spark.driver.host"))
print("Bind Address:", spark.sparkContext.getConf().get("spark.driver.bindAddress"))
print("SPARK_LOCAL_IP   :", os.environ.get("SPARK_LOCAL_IP", None))

Driver Host: 127.0.0.1
Bind Address: 127.0.0.1
SPARK_LOCAL_IP   : 127.0.0.1


In [3]:
import socket

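# 1) Driver host recorded in the Spark configuration
host_conf = spark.sparkContext.getConf().get("spark.driver.host", None)

# 2) IP the hostname resolves to by default (looked up here with Python's socket
#    module as a stand-in for Java's InetAddress.getLocalHost())
local_ip = socket.gethostbyname(socket.gethostname())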

print("spark.driver.host conf :", host_conf)
print("InetAddress.getLocalHost():", local_ip)

spark.driver.host conf : 127.0.0.1
InetAddress.getLocalHost(): 192.168.45.190


In [4]:
spark

In [7]:
import platform
import psutil

print("Architecture:", platform.architecture()[0])
print("Processor:", platform.processor())
print("Cores (Physical):", psutil.cpu_count(logical=False))
print("Threads (Logical):", psutil.cpu_count(logical=True))

Architecture: 64bit
Processor: Intel64 Family 6 Model 189 Stepping 1, GenuineIntel
Cores (Physical): 8
Threads (Logical): 8


In [8]:
!powershell "Get-CimInstance -ClassName Win32_OperatingSystem | Select-Object TotalVisibleMemorySize"


TotalVisibleMemorySize
----------------------
              33070452




## Python <> RDD <> DataFrame

#### Converting Python objects to an RDD

##### 1> Create a Python list

In [9]:
name_list_json = [ '{"name": "chaeyeon"}',
                  '{"name": "yeonsu"}',
                  '{"name": "somsom"}']

In [10]:
for n in name_list_json:
    print(n)

{"name": "chaeyeon"}
{"name": "yeonsu"}
{"name": "somsom"}


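##### 2> Convert the Python list to an RDD
- When the list is converted to an RDD, its data is split into partitions that are distributed across the nodes of the Spark cluster. In `local[*]` mode the partitions stay on this machine and are processed by separate threads.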

In [11]:
rdd = spark.sparkContext.parallelize(name_list_json)

In [12]:
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

In [13]:
rdd.count()

3

In [14]:
import json

parsed_rdd = rdd.map(lambda el:json.loads(el))

In [15]:
parsed_rdd

PythonRDD[2] at RDD at PythonRDD.scala:53

In [16]:
parsed_rdd.collect()

[{'name': 'chaeyeon'}, {'name': 'yeonsu'}, {'name': 'somsom'}]

In [17]:
parsed_name_rdd = rdd.map(lambda el:json.loads(el)["name"])

In [18]:
parsed_name_rdd.collect()

['chaeyeon', 'yeonsu', 'somsom']

##### Converting a Python list to a DataFrame

In [19]:
from pyspark.sql.types import StringType

df = spark.createDataFrame(name_list_json, StringType())

In [20]:
df.count()

3

In [21]:
df.printSchema()

root
 |-- value: string (nullable = true)



In [22]:
df.select('*').collect()

[Row(value='{"name": "chaeyeon"}'),
 Row(value='{"name": "yeonsu"}'),
 Row(value='{"name": "somsom"}')]

Example of converting an RDD to a DataFrame: convert the `parsed_rdd` created earlier into a DataFrame.

In [23]:
df_parsed_rdd = parsed_rdd.toDF()

In [24]:
df_parsed_rdd.printSchema()

root
 |-- name: string (nullable = true)



In [25]:
df_parsed_rdd.select('name').collect()

[Row(name='chaeyeon'), Row(name='yeonsu'), Row(name='somsom')]

### Loading a CSV file into a Spark DataFrame

In [22]:
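save_path = 'data/name_gender.csv'
download_path = 'https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv'

# Create the target directory first; the download cannot be saved if it is missing
os.makedirs(os.path.dirname(save_path), exist_ok=True)
if not os.path.exists(save_path):
    wget.download(download_path, out=save_path)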

In [9]:
name_gender_csv = save_path
df = spark.read.csv(name_gender_csv)
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)



In [10]:
df = spark.read.option("header", True).csv(name_gender_csv)
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)



In [11]:
df.show()

+----------+------+
|      name|gender|
+----------+------+
|  Adaleigh|     F|
|     Amryn|Unisex|
|    Apurva|Unisex|
|    Aryion|     M|
|    Alixia|     F|
|Alyssarose|     F|
|    Arvell|     M|
|     Aibel|     M|
|   Atiyyah|     F|
|     Adlie|     F|
|    Anyely|     F|
|    Aamoni|     F|
|     Ahman|     M|
|    Arlane|     F|
|   Armoney|     F|
|   Atzhiry|     F|
| Antonette|     F|
|   Akeelah|     F|
| Abdikadir|     M|
|    Arinze|     M|
+----------+------+
only showing top 20 rows



In [12]:
df.head(5)

[Row(name='Adaleigh', gender='F'),
 Row(name='Amryn', gender='Unisex'),
 Row(name='Apurva', gender='Unisex'),
 Row(name='Aryion', gender='M'),
 Row(name='Alixia', gender='F')]

In [13]:
df.groupby(["gender"]).count().collect()

[Row(gender='F', count=65),
 Row(gender='M', count=28),
 Row(gender='Unisex', count=7)]

In [14]:
df.rdd.getNumPartitions()

1

Register the DataFrame as a temporary view and query it with Spark SQL

In [15]:
df.createOrReplaceTempView("namegender")

In [16]:
namegender_group_df = spark.sql("SELECT gender, count(1) FROM namegender GROUP BY 1")

In [17]:
namegender_group_df.collect()

[Row(gender='F', count(1)=65),
 Row(gender='M', count(1)=28),
 Row(gender='Unisex', count(1)=7)]

In [18]:
spark.catalog.listTables()

[Table(name='namegender', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

Checking the number of partitions

In [19]:
namegender_group_df.rdd.getNumPartitions()

1

In [20]:
two_namegender_group_df = namegender_group_df.repartition(2)

In [21]:
two_namegender_group_df.rdd.getNumPartitions()

2

In [None]:
spark.stop()