In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .appName("rdd-dataframe")
        .master("local")
        .getOrCreate()
)
sc = spark.sparkContext

24/07/15 21:55:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
spark.read.text("../data/movies.csv").show(3)

[Stage 5:>                                                          (0 + 1) / 1]

+--------------------+
|               value|
+--------------------+
|movieId,title,genres|
|1,Toy Story (1995...|
|2,Jumanji (1995),...|
+--------------------+
only showing top 3 rows





                                                                                

In [6]:
movie = spark.read.option("sep", ",").option("header", True).csv("../data/movies.csv")
movie.show(3)

[Stage 7:>                                                          (0 + 1) / 1]

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows




                                                                                

In [7]:
## 스키마 출력하기 1
movie.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [32]:
## 스키마 출력하기 2
print(movie.dtypes)

[('movieId', 'string'), ('title', 'string'), ('genres', 'string')]


In [9]:
## Action 1
movie.select("movieId", "title").show(5)

                                                                                

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|      Jumanji (1995)|
|      3|Grumpier Old Men ...|
|      4|Waiting to Exhale...|
|      5|Father of the Bri...|
+-------+--------------------+
only showing top 5 rows



In [33]:
## Action 2
movie.take(5)

                                                                                

[Row(movieId='1', title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId='2', title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId='3', title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId='4', title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId='5', title='Father of the Bride Part II (1995)', genres='Comedy')]

In [40]:
## Action 3
movie.limit(2)

DataFrame[movieId: string, title: string, genres: string]

In [41]:
## sql - 대문자로 변경
movie.selectExpr("upper(title) as upperName").show(3, False)

                                                                                

+-----------------------+
|upperName              |
+-----------------------+
|TOY STORY (1995)       |
|JUMANJI (1995)         |
|GRUMPIER OLD MEN (1995)|
+-----------------------+
only showing top 3 rows



In [11]:
## col 활용하기
from pyspark.sql.functions import col

c1 = col("title")
c2 = col("genres")

movie.select(c1, c2).show(5)

                                                                                

+--------------------+--------------------+
|               title|              genres|
+--------------------+--------------------+
|    Toy Story (1995)|Adventure|Animati...|
|      Jumanji (1995)|Adventure|Childre...|
|Grumpier Old Men ...|      Comedy|Romance|
|Waiting to Exhale...|Comedy|Drama|Romance|
|Father of the Bri...|              Comedy|
+--------------------+--------------------+
only showing top 5 rows



In [12]:
cols = [c1, c2]
movie.select(*cols).show(5)

[Stage 11:>                                                         (0 + 1) / 1]

+--------------------+--------------------+
|               title|              genres|
+--------------------+--------------------+
|    Toy Story (1995)|Adventure|Animati...|
|      Jumanji (1995)|Adventure|Childre...|
|Grumpier Old Men ...|      Comedy|Romance|
|Waiting to Exhale...|Comedy|Drama|Romance|
|Father of the Bri...|              Comedy|
+--------------------+--------------------+
only showing top 5 rows




                                                                                

In [16]:
movie.select("title")

DataFrame[title: string]

In [17]:
sc.parallelize(movie.select("title")).count()
# 데이터 분산시키려 할 때 title 열을 직렬화하려고 하면 문제 발생

TypeError: cannot pickle '_thread.RLock' object

In [19]:
titles_rdd = movie.select("title").rdd

## RDD에서 값을 추출하여 리스트로 변환
titles_list = titles_rdd.map(lambda row: row.title).collect()
# collect() 메서드로 RDD의 데이터를 로컬 Python 리스트로 수집

print(titles_list[:10])

[Stage 13:>                                                         (0 + 1) / 1]

['Toy Story (1995)', 'Jumanji (1995)', 'Grumpier Old Men (1995)', 'Waiting to Exhale (1995)', 'Father of the Bride Part II (1995)', 'Heat (1995)', 'Sabrina (1995)', 'Tom and Huck (1995)', 'Sudden Death (1995)', 'GoldenEye (1995)']



                                                                                

In [21]:
titles = sc.parallelize(titles_list)
titles.count()

9742

In [31]:
title_len30 = titles.filter(lambda x: len(x) >= 30).collect()
print(title_len30[:10])
print(len(title_len30))

['Father of the Bride Part II (1995)', 'American President, The (1995)', 'Dracula: Dead and Loving It (1995)', 'Ace Ventura: When Nature Calls (1995)', 'City of Lost Children, The (Cité des enfants perdus, La) (1995)', 'Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)', 'Twelve Monkeys (a.k.a. 12 Monkeys) (1995)', 'Cry, the Beloved Country (1995)', 'How to Make an American Quilt (1995)', 'Postman, The (Postino, Il) (1994)']
2580


In [47]:
from pyspark.sql.functions import split, when

# 새로운 컬럼 생성
movie_df = movie.withColumn("title_only", split("title", "(").getItem(0))
movie_df.show(3)

24/07/11 23:37:26 ERROR Executor: Exception in task 0.0 in stage 33.0 (TID 33)1]
java.util.regex.PatternSyntaxException: Unclosed group near index 1
(
	at java.base/java.util.regex.Pattern.error(Pattern.java:2027)
	at java.base/java.util.regex.Pattern.accept(Pattern.java:1877)
	at java.base/java.util.regex.Pattern.group0(Pattern.java:3060)
	at java.base/java.util.regex.Pattern.sequence(Pattern.java:2123)
	at java.base/java.util.regex.Pattern.expr(Pattern.java:2068)
	at java.base/java.util.regex.Pattern.compile(Pattern.java:1782)
	at java.base/java.util.regex.Pattern.<init>(Pattern.java:1428)
	at java.base/java.util.regex.Pattern.compile(Pattern.java:1068)
	at java.base/java.lang.String.split(String.java:2317)
	at org.apache.spark.unsafe.types.UTF8String.split(UTF8String.java:1034)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.ja

Py4JJavaError: An error occurred while calling o305.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 33.0 failed 1 times, most recent failure: Lost task 0.0 in stage 33.0 (TID 33) (2e2b731dce0d executor driver): java.util.regex.PatternSyntaxException: Unclosed group near index 1
(
	at java.base/java.util.regex.Pattern.error(Pattern.java:2027)
	at java.base/java.util.regex.Pattern.accept(Pattern.java:1877)
	at java.base/java.util.regex.Pattern.group0(Pattern.java:3060)
	at java.base/java.util.regex.Pattern.sequence(Pattern.java:2123)
	at java.base/java.util.regex.Pattern.expr(Pattern.java:2068)
	at java.base/java.util.regex.Pattern.compile(Pattern.java:1782)
	at java.base/java.util.regex.Pattern.<init>(Pattern.java:1428)
	at java.base/java.util.regex.Pattern.compile(Pattern.java:1068)
	at java.base/java.lang.String.split(String.java:2317)
	at org.apache.spark.unsafe.types.UTF8String.split(UTF8String.java:1034)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.regex.PatternSyntaxException: Unclosed group near index 1
(
	at java.base/java.util.regex.Pattern.error(Pattern.java:2027)
	at java.base/java.util.regex.Pattern.accept(Pattern.java:1877)
	at java.base/java.util.regex.Pattern.group0(Pattern.java:3060)
	at java.base/java.util.regex.Pattern.sequence(Pattern.java:2123)
	at java.base/java.util.regex.Pattern.expr(Pattern.java:2068)
	at java.base/java.util.regex.Pattern.compile(Pattern.java:1782)
	at java.base/java.util.regex.Pattern.<init>(Pattern.java:1428)
	at java.base/java.util.regex.Pattern.compile(Pattern.java:1068)
	at java.base/java.lang.String.split(String.java:2317)
	at org.apache.spark.unsafe.types.UTF8String.split(UTF8String.java:1034)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [56]:
from pyspark.sql.functions import split, col, when, regexp_extract

# 영어만 추출
def extract_english(text):
    return regexp_extract(text, r'[a-zA-Z]+',0)

movie_df = movie.withColumn("title_english", extract_english("title"))
movie_df.show(3)

                                                                                

+-------+--------------------+--------------------+-------------+
|movieId|               title|              genres|title_english|
+-------+--------------------+--------------------+-------------+
|      1|    Toy Story (1995)|Adventure|Animati...|          Toy|
|      2|      Jumanji (1995)|Adventure|Childre...|      Jumanji|
|      3|Grumpier Old Men ...|      Comedy|Romance|     Grumpier|
+-------+--------------------+--------------------+-------------+
only showing top 3 rows



In [66]:
from pyspark.sql.functions import regexp_replace


movie_df = movie.withColumn("title_only", regexp_replace("title", r"\s*\(\d{4}\)", ""))
movie_df.show(3)

                                                                                

+-------+--------------------+--------------------+----------------+
|movieId|               title|              genres|      title_only|
+-------+--------------------+--------------------+----------------+
|      1|    Toy Story (1995)|Adventure|Animati...|       Toy Story|
|      2|      Jumanji (1995)|Adventure|Childre...|         Jumanji|
|      3|Grumpier Old Men ...|      Comedy|Romance|Grumpier Old Men|
+-------+--------------------+--------------------+----------------+
only showing top 3 rows



In [71]:
movie_df.filter("title_only == Grumpier Old Men").show()

ParseException: 
extraneous input 'Men' expecting <EOF>(line 1, pos 27)

== SQL ==
title_only == Grumpier Old Men
---------------------------^^^


In [70]:
movie_df.filter("title_only == 'Grumpier Old Men'").show()
# value에 띄어쓰기가 있는 경우 ''로 묶어주기

                                                                                

+-------+--------------------+--------------+----------------+
|movieId|               title|        genres|      title_only|
+-------+--------------------+--------------+----------------+
|      3|Grumpier Old Men ...|Comedy|Romance|Grumpier Old Men|
+-------+--------------------+--------------+----------------+



In [74]:
movie_df.select('title_only').collect().first()
# .collect()를 사용하면 파이썬 list 형태로 전환됨
# first 앞에는 RDD나 DataFrame 형태가 와야한다

                                                                                

AttributeError: 'list' object has no attribute 'first'

In [82]:
# RDD 형태
print(sc.parallelize(movie_df.select('title_only').collect()).first())

# DataFrame 형태
print(movie_df.select('title_only').first())

                                                                                

Row(title_only='Toy Story')


                                                                                

Row(title_only='Toy Story')


In [95]:
title_only_list = sc.parallelize(movie.select('title').collect()).map(lambda v:v.split("(")[0])
title_only_list.take(5)
# split 메소드를 Row 객체에 직접 적용하면 에러 발생. Row 객체에는 split 메소드 없음

24/07/12 00:24:30 ERROR Executor: Exception in task 0.0 in stage 66.0 (TID 66)1]
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1571, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'split' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 1560, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in 

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 66.0 failed 1 times, most recent failure: Lost task 0.0 in stage 66.0 (TID 66) (2e2b731dce0d executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1571, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'split' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 1560, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_98/709973511.py", line 1, in <lambda>
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1576, in __getattr__
    raise AttributeError(item)
AttributeError: split

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1571, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'split' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 1560, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_98/709973511.py", line 1, in <lambda>
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1576, in __getattr__
    raise AttributeError(item)
AttributeError: split

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [96]:
# title_only_list = sc.parallelize(movie.select('title').collect()).map(lambda v:v.title.split("(")[0])
title_only_list = movie.select('title').rdd.map(lambda v:v.title.split("(")[0])
# parallelize & collect 대신에 rdd 로도 가능

title_only_list.take(5)
# title_only_list.show(5)
# RDD에서 show 사용 x

                                                                                

['Toy Story ',
 'Jumanji ',
 'Grumpier Old Men ',
 'Waiting to Exhale ',
 'Father of the Bride Part II ']

In [3]:
ratings = spark.read.option("sep", ",").option("header", True).csv("../data/ratings.csv")
ratings.show(3)

                                                                                

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
+------+-------+------+---------+
only showing top 3 rows



In [5]:
## 스키마 출력하기
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [6]:
## 특정 col 고유 value 개수
rating_count = ratings.select("rating").distinct().count()
print(f"rating 의 고유한 값의 개수: {rating_count}")

                                                                                

rating 의 고유한 값의 개수: 10


In [9]:
ratings.select("rating").distinct().take(10)

                                                                                

[Row(rating='1.0'),
 Row(rating='4.5'),
 Row(rating='2.5'),
 Row(rating='3.5'),
 Row(rating='5.0'),
 Row(rating='0.5'),
 Row(rating='4.0'),
 Row(rating='1.5'),
 Row(rating='2.0'),
 Row(rating='3.0')]

In [19]:
## rating별 개수 세기
unique_counts = ratings.groupBy("rating").agg({"rating": "count"}).withColumnRenamed("name", "unique_count")
unique_counts.show()

                                                                                

+------+-------------+
|rating|count(rating)|
+------+-------------+
|   1.0|         2811|
|   4.5|         8551|
|   2.5|         5550|
|   3.5|        13136|
|   5.0|        13211|
|   0.5|         1370|
|   4.0|        26818|
|   1.5|         1791|
|   2.0|         7551|
|   3.0|        20047|
+------+-------------+



In [None]:
unique_counts.orderBy(unique_counts["count(rating)"].desc()).show()

In [15]:
from pyspark.sql.functions import from_unixtime


## timestamp 읽을 수 있는 시간으로 변경
df_readable_time = ratings.withColumn("readable_time", from_unixtime(ratings["timestamp"]))
df_readable_time.show(5)

                                                                                

+------+-------+------+---------+-------------------+
|userId|movieId|rating|timestamp|      readable_time|
+------+-------+------+---------+-------------------+
|     1|      1|   4.0|964982703|2000-07-30 18:45:03|
|     1|      3|   4.0|964981247|2000-07-30 18:20:47|
|     1|      6|   4.0|964982224|2000-07-30 18:37:04|
|     1|     47|   5.0|964983815|2000-07-30 19:03:35|
|     1|     50|   5.0|964982931|2000-07-30 18:48:51|
+------+-------+------+---------+-------------------+
only showing top 5 rows



In [18]:
df_readable_time = df_with_readable_time.drop('timestamp')
df_with_readable_time.show(5)

                                                                                

+------+-------+------+-------------------+
|userId|movieId|rating|      readable_time|
+------+-------+------+-------------------+
|     1|      1|   4.0|2000-07-30 18:45:03|
|     1|      3|   4.0|2000-07-30 18:20:47|
|     1|      6|   4.0|2000-07-30 18:37:04|
|     1|     47|   5.0|2000-07-30 19:03:35|
|     1|     50|   5.0|2000-07-30 18:48:51|
+------+-------+------+-------------------+
only showing top 5 rows

