## 1. Pyspark 현재 사양

In [2]:
import findspark

findspark.init()

import pyspark
sc = pyspark.SparkContext(appName="SparkContext")

# SparkContext 버전
print("스파크 컨텍스트 버젼: ", sc.version)

# SparkContext 파이썬 버전
print("Spark Context 파이썬 버전:", sc.pythonVer)

# SparkContext 마스터
print("Spark Context 마스터:", sc.master)


스파크 컨텍스트 버젼:  3.3.4
Spark Context 파이썬 버전: 3.9
Spark Context 마스터: local[*]


## 2. pyspark 환경 설정 및 간단한 예제

In [9]:
import findspark

findspark.init()
# pyspark 환경 빌드 예제
from pyspark.sql import SparkSession

#sparksession 드라이버 프로세스 얻기
#클러스터모드의 경우 master에 local[*] 대신 yarn이 들어간다.
spark = SparkSession.builder.appName("sample").master("local[*]").getOrCreate()

#jupyter환경에서만 가능한 config, .show()메소드를 사용할 필요없이 dataframe만 실행해도,정렬된 프린팅을 해준다.
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)


In [3]:
#간단한 예제 1
df= spark.range(500).toDF("number")
df.select(df["number"]+10)

In [4]:
#간단한 예제 2
from pyspark.sql.functions import expr
expr("(((someCol + 5) * 200) - 6) < otherCol")

Column<'((((someCol + 5) * 200) - 6) < otherCol)'>

In [10]:
spark.stop()

## 3. json 파일 불러오기 실습

In [2]:
import findspark

findspark.init()

from pyspark.sql import SparkSession
sp = SparkSession.builder.appName("file_load").master("local[*]").getOrCreate()

In [107]:
json_file_path = r"C:\Users\psych\GRANDMASTER_01170808.json"
df = sp.read.json(json_file_path)
# df1.printSchema()
# df1.show()

In [108]:
df.printSchema()

root
 |-- mat_info: struct (nullable = true)
 |    |-- info: struct (nullable = true)
 |    |    |-- gameCreation: long (nullable = true)
 |    |    |-- gameDuration: long (nullable = true)
 |    |    |-- gameEndTimestamp: long (nullable = true)
 |    |    |-- gameId: long (nullable = true)
 |    |    |-- gameMode: string (nullable = true)
 |    |    |-- gameName: string (nullable = true)
 |    |    |-- gameStartTimestamp: long (nullable = true)
 |    |    |-- gameType: string (nullable = true)
 |    |    |-- gameVersion: string (nullable = true)
 |    |    |-- mapId: long (nullable = true)
 |    |    |-- participants: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- allInPings: long (nullable = true)
 |    |    |    |    |-- assistMePings: long (nullable = true)
 |    |    |    |    |-- assists: long (nullable = true)
 |    |    |    |    |-- baronKills: long (nullable = true)
 |    |    |    |    |-- basicPings: long (nullable

### spark DF에서 데이터 추출하기

row 단위로만 가능함

In [None]:
from pyspark.sql.functions import col,explode, monotonically_increasing_id
# 추출을 원하는 컬럼을 selected_column 선언
selected_column = ["mat_info.info.participants.summonerName"] # ,"mat_info.info.participants.assists","mat_info.info.participants.teamPosition" 
all_columns_df = df.select(selected_column)

# 특정 열에 대해 구성요소를 explode를 통해 array to element 재배열
summon_df = all_columns_df.select(explode("summonerName").alias("summonerName"))
summon_df = summon_df.withColumn("row_id", monotonically_increasing_id()) # 차후 조인을 위해 row_id 정의
# new_df.printSchema()
# summon_df.show()

+----------------+------+
|    summonerName|row_id|
+----------------+------+
|아름다워지고싶요|     0|
|      warangurus|     1|
|        DK Saint|     2|
|        DK Rahel|     3|
|         용병456|     4|
|    General irel|     5|
|             Zac|     6|
|          쏘령관|     7|
|          바이탈|     8|
|                |     9|
|챔프처음해도잘함|    10|
|나는 멍청이 우우|    11|
|      쿠쿠공주님|    12|
|         ADC0315|    13|
|            neuo|    14|
|    No teammates|    15|
|    gnoeiwngonge|    16|
|       HLE Loki1|    17|
|                |    18|
|      비밀인간jk|    19|
+----------------+------+
only showing top 20 rows



### Reduce를 이용하여 반복작업

In [90]:
# 함수 정의

def _make_explode_df(temp_df, column_name):
    temp_df = temp_df.select(explode(f"{column_name}").alias(f"{column_name}"))
    temp_df = temp_df.withColumn("row_id", monotonically_increasing_id())
    dfs.append(temp_df)

In [113]:
from functools import reduce
from pyspark.sql.functions import col,explode, monotonically_increasing_id


# df들을 리스트에 넣는다
dfs = []


# 1. 중복 생성하여 개수를 맞춰줘야하는 컬럼 gameMode, matchId

# gameMode 저장 ------------------------
selected_column = ["mat_info.info.gameMode","mat_info.info.participants.win"] # gameMode - teamposition의 개수에 맞춰 중복 저장
temp1_df = df.select(*selected_column)
temp1_df = temp1_df.select(col("gameMode"), explode("win").alias("win"))
temp1_df = temp1_df.withColumn("row_id", monotonically_increasing_id())
dfs.append(temp1_df)

# matchId 저장 ------------------------
col2 = ["mat_info.metadata.matchId","mat_info.info.participants.teamposition"]
temp2_df = df.select(*col2)
temp2_df = temp2_df.select(col("matchId"), explode("teamposition").alias("teamposition"))
temp2_df = temp2_df.withColumn("row_id", monotonically_increasing_id())
dfs.append(temp2_df)

In [116]:
# mat_info.info.participants 컬럼들
participante_cols = ["championName","championId","assists","kills","deaths"]
for column_name in participante_cols:
    selected_column = [f"mat_info.info.participants.{column_name}"]
    temp_df = df.select(selected_column)
    _make_explode_df(temp_df, column_name)

# mat_info.info.participants.perk 컬럼들
perk_cols = ["defense", "flex","offense"]
for column_name in perk_cols:
    selected_column = [f"mat_info.info.participants.perks.statPerks.{column_name}"]
    temp_df = df.select(selected_column)
    _make_explode_df(temp_df, column_name)


15


In [117]:
# join 함수 정의
def join_dfs(df1, df2):
    return df1.join(df2, "row_id", "inner")

In [118]:
# reduce를 이용해 반복 잡업 실행
reduced_df = reduce(join_dfs, dfs)

# reduce 결과 출력
reduced_df.show(truncate=False)

+------+--------+-----+-------------+------------+------------+----------+-------+-----+------+------------+----------+-------+-----+------+-------+----+-------+
|row_id|gameMode|win  |matchId      |teamposition|championName|championId|assists|kills|deaths|championName|championId|assists|kills|deaths|defense|flex|offense|
+------+--------+-----+-------------+------------+------------+----------+-------+-----+------+------------+----------+-------+-----+------+-------+----+-------+
|0     |CLASSIC |false|KR_6905429491|TOP         |Akali       |84        |0      |3    |4     |Akali       |84        |0      |3    |4     |5002   |5008|5008   |
|1     |CLASSIC |false|KR_6905429491|JUNGLE      |XinZhao     |5         |7      |2    |6     |XinZhao     |5         |7      |2    |6     |5003   |5008|5005   |
|2     |CLASSIC |false|KR_6905429491|MIDDLE      |Leblanc     |7         |4      |5    |7     |Leblanc     |7         |4      |5    |7     |5003   |5008|5005   |
|3     |CLASSIC |false|KR_69

In [None]:
# 스파크 세션 중단
sp.stop()