In [1]:
collection_data = "Spark The Definitive Guide: Big Data Processing Made Simple"

In [2]:
supplementData = {
    "Spark":1000, "Definitive":200, "Big":300, "Simple":100
}

In [3]:
# 

In [4]:
suppBroadcast = spark.sparkContext.broadcast(supplementData)

In [5]:
suppBroadcast.value

{'Spark': 1000, 'Definitive': 200, 'Big': 300, 'Simple': 100}

In [6]:
words = spark.sparkContext.parallelize(collection_data.split(), 2) #RDD형태로 변경, 파티션 수는 2개

In [7]:
words.map(lambda word : (word, suppBroadcast.value.get(word, 0))).sortBy(lambda x:x[1], ascending=False).collect()

                                                                                

[('Spark', 1000),
 ('Big', 300),
 ('Definitive', 200),
 ('Simple', 100),
 ('The', 0),
 ('Guide:', 0),
 ('Data', 0),
 ('Processing', 0),
 ('Made', 0)]

In [8]:
flight = spark.read.parquet('/root/spark-3.5.1/flight_data/parquet/24.03.20_spark.gz.parquet')

                                                                                

In [9]:
flight.show(2) #분할된 표를 볼때는 show


                                                                                

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
+-----------------+-------------------+-----+
only showing top 2 rows



In [10]:
# 항공표 추적하기

In [11]:
# 한국 관련된 항공편
# sql 사용
flight.createOrReplaceTempView('flight') #임시뷰 생성
spark.sql("select * from flight where ORIGIN_COUNTRY_NAME like '%Korea%' or DEST_COUNTRY_NAME like '%Korea%'").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|        South Korea|  621|
|      South Korea|      United States|  683|
+-----------------+-------------------+-----+



In [12]:
# 함수 사용
from pyspark.sql.functions import col, lower
flight.filter(lower(col('ORIGIN_COUNTRY_NAME')).like('%kor%') | lower(col('DEST_COUNTRY_NAME')).like('%kor%')).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|        South Korea|  621|
|      South Korea|      United States|  683|
+-----------------+-------------------+-----+



In [13]:
#Accumulator 분산 환경에서 공유되는 변수 - 여러 작업에서 공통적으로 사용되는 값을 수집
#읽기 전용
#클러스터의 각 노드에 값을 추가, 로컬ㅇ[서느느 변경할 수 없움
# 디버깅을 정보수집이나 

In [14]:
# 세션 새로 만들기

In [15]:
from pyspark.sql import SparkSession

In [16]:
new_spartk = SparkSession.builder.appName("new spark session").master("local[*]")

In [17]:
new_spartk

<pyspark.sql.session.SparkSession.Builder at 0x7f56a5ab36a0>

In [27]:
spark.read.options(header = 'true', inferSchema = 'true').csv('/root/spark-3.5.1/online-dataset.csv').repartition(2)\
.selectExpr("instr(Description, 'GLASS') >= 1 as is_glass")\
.groupBy("is_glass").count().show()

[Stage 33:>                                                         (0 + 1) / 1]

+--------+------+
|is_glass| count|
+--------+------+
|    NULL|  1454|
|    true| 12861|
|   false|527594|
+--------+------+



                                                                                

In [28]:
# cashing --- 성능 튜닝

In [55]:
from datetime import datetime

In [70]:
origin_file_path = '/root/spark-3.5.1/bydata/by-day/*.csv'
df_1 = spark.read.options(header = 'true', inferSchema = 'true').csv(origin_file_path)

                                                                                

In [51]:
df_1.show(2)

+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|        Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084| RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077|DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows



In [52]:
# df2 Country로 묶어서 count해서 df2에 저장
df_1.createOrReplaceTempView('df1')
df2 = spark.sql("select Country, count(Country) from df1 group by Country")
df3 = spark.sql("select CustomerID, count(CustomerID) from df1 group by CustomerID")
df4 = spark.sql("select InvoiceNo, count(InvoiceNo) from df1 group by InvoiceNo")

In [71]:
# 함수 사용
# 캐싱전
def getCountGroupby(dataframe, colname):
    return dataframe.groupBy(colname).count().show(5)

print("시간 : ", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

getCountGroupby(df_1, 'Country')
print("시간 : ", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

getCountGroupby(df_1, 'CustomerID')
print("시간 : ", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

getCountGroupby(df_1, 'InvoiceNo')
print("시간 : ", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

시간 :  2024-03-26 12:39:56
+---------+-----+
|  Country|count|
+---------+-----+
|   Sweden|  462|
|Singapore|  229|
|  Germany| 9495|
|      RSA|   58|
|   France| 8557|
+---------+-----+
only showing top 5 rows

시간 :  2024-03-26 12:39:57
+----------+-----+
|CustomerID|count|
+----------+-----+
|   14452.0|   62|
|   16916.0|  143|
|   17633.0|   72|
|   14768.0|    6|
|   13094.0|   30|
+----------+-----+
only showing top 5 rows

시간 :  2024-03-26 12:39:57
+---------+-----+
|InvoiceNo|count|
+---------+-----+
|   574966|    8|
|   575091|   38|
|   578057|   28|
|   537252|    1|
|   578459|    8|
+---------+-----+
only showing top 5 rows

시간 :  2024-03-26 12:39:57


In [69]:
def getCountGroupby(dataframe, colname):
    result = dataframe.groupBy(colname).count().cache()  # 캐싱
    result.show(5)  # 결과 출력
    return result  # 캐싱된 결과 반환
    
print("시간 : ", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

getCountGroupby(df1, 'Country')
print("시간 : ", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

getCountGroupby(df1, 'CustomerID')
print("시간 : ", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

getCountGroupby(df1, 'InvoiceNo')
print("시간 : ", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

시간 :  2024-03-26 12:37:19


24/03/26 12:37:19 WARN CacheManager: Asked to cache already cached data.
24/03/26 12:37:21 WARN CacheManager: Asked to cache already cached data.        


+---------+-----+
|  Country|count|
+---------+-----+
|   Sweden|  462|
|Singapore|  229|
|  Germany| 9495|
|      RSA|   58|
|   France| 8557|
+---------+-----+
only showing top 5 rows

시간 :  2024-03-26 12:37:21
+----------+-----+
|CustomerID|count|
+----------+-----+
|   14452.0|   62|
|   16916.0|  143|
|   17633.0|   72|
|   14768.0|    6|
|   13094.0|   30|
+----------+-----+
only showing top 5 rows

시간 :  2024-03-26 12:37:21
+---------+-----+
|InvoiceNo|count|
+---------+-----+
|   574966|    8|
|   575091|   38|
|   578057|   28|
|   537252|    1|
|   578459|    8|
+---------+-----+
only showing top 5 rows

시간 :  2024-03-26 12:37:21


24/03/26 12:37:21 WARN CacheManager: Asked to cache already cached data.


In [54]:
df2.show(10)
df3.show(10)
df4.show(10)

                                                                                

+-----------+--------------+
|    Country|count(Country)|
+-----------+--------------+
|     Sweden|           462|
|    Germany|          9495|
|     France|          8557|
|     Greece|           146|
|    Belgium|          2069|
|    Finland|           695|
|      Malta|           127|
|Unspecified|           446|
|      Italy|           803|
|       EIRE|          8196|
+-----------+--------------+
only showing top 10 rows



                                                                                

+----------+-----------------+
|CustomerID|count(CustomerID)|
+----------+-----------------+
|   14452.0|               62|
|   16916.0|              143|
|   17633.0|               72|
|   14768.0|                6|
|   13094.0|               30|
|   17884.0|              117|
|   16596.0|               12|
|   15145.0|               67|
|   16858.0|               13|
|   13160.0|                4|
+----------+-----------------+
only showing top 10 rows





+---------+----------------+
|InvoiceNo|count(InvoiceNo)|
+---------+----------------+
|   574966|               8|
|   575091|              38|
|   578057|              28|
|   537252|               1|
|   578459|               8|
|  C578132|               1|
|   578292|              72|
|   576112|              20|
|   577022|              38|
|   574592|               8|
+---------+----------------+
only showing top 10 rows



                                                                                

## activity-data 이용

In [80]:
!unzip '/root/spark-3.5.1/a-data.zip'

Archive:  /root/spark-3.5.1/a-data.zip
  inflating: activity-data/_committed_730451297822678341  
 extracting: activity-data/_started_730451297822678341  
 extracting: activity-data/_SUCCESS  
  inflating: activity-data/part-00000-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json  
  inflating: activity-data/part-00001-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json  
  inflating: activity-data/part-00002-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json  
  inflating: activity-data/part-00003-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json  
  inflating: activity-data/part-00004-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json  
  inflating: activity-data/part-00005-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json  
  inflating: activity-data/part-00006-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json  
  inflating: activity-d

In [3]:
spark.conf.set('spark.sql.shuffle.partitions',5)

In [7]:
static = spark.read.json('/root/spark-3.5.1/activity-data')

                                                                                

In [8]:
streaming = spark.readStream.schema(static.schema).option('maxFilePerTrigger', 10).json('/root/spark-3.5.1/activity-data')

In [10]:
static.show(2)

                                                                                

+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|           x|           y|           z|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
|1424686735090|1424686733090638193|nexus4_1|   18|nexus4|   g|stand| 3.356934E-4|-5.645752E-4|-0.018814087|
|1424686735292|1424688581345918092|nexus4_2|   66|nexus4|   g|stand|-0.005722046| 0.029083252| 0.005569458|
+-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
only showing top 2 rows



In [19]:
eventTime = streaming.selectExpr("*", "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time")

In [13]:
sc.uiWebUrl.split(':')[-1]

'4041'

In [20]:
from pyspark.sql.functions import col, window
eventTime.groupBy(window(col("event_time"),"10 minutes")).count().writeStream.queryName("pyevent_per_window").format("memory")\
.outputMode("complete").start()

24/03/26 13:23:33 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e22c8fda-cd9f-450c-ae9e-57ac5a950a61. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/26 13:23:34 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7f010edc71f0>



In [21]:
eventTime.groupBy(window(col("event_time"),"10 minutes", "5 minutes")).count().writeStream.queryName("pyevent_per_window2").format("memory")\
.outputMode("complete").start()

24/03/26 13:24:16 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9b456a95-b82b-4f43-8d24-707bb409bce8. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/26 13:24:16 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7f010dd67220>

                                                                                

In [25]:
eventTime.withWatermark("event_time", "30 minutes")\
.groupBy(window(col("event_time"),"10 minutes", "5 minutes"))\
.count().writeStream.queryName("pyevent_per_window3").format("memory").outputMode("complete").start()

24/03/26 13:28:31 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-78657cde-b988-48eb-8f5e-fee46994fd8b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/03/26 13:28:31 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7f010edd6700>



In [30]:
# eventTime.withWatermark("event_time", "5 seconds")\
# .dropDuplicates(["User", "event_time"])\
# .groupBy("User")\
# .count().writeStream.queryName("pyevent_per_window4").format("memory").outputMode("complete").start()

eventTime.withWatermark("event_time", "5 seconds")\
.dropDuplicates(["User", "event_time"])\
.groupBy("User")\
.count().writeStream.queryName("pyevent_per_window4").format("memory").outputMode("complete").stop()

KeyboardInterrupt: 