# Kafka Producer (API -> Python -> Kafka)

## 01. 데이터 수집 

In [1]:
%sh
# Reset streaming state so the queries below start from scratch: remove old checkpoints and
# previously written output on HDFS (-f: no error when the path does not exist yet).
hadoop fs -rm -r -f /user/fastcampus/checkpoint
hadoop fs -rm -r -f /user/fastcampus/sparkData

# Recreate the now-empty directories (-p: no error if they already exist).
hadoop fs -mkdir -p /user/fastcampus/checkpoint/structured_streaming
hadoop fs -mkdir -p /user/fastcampus/sparkData


hadoop fs -ls /user/fastcampus/sparkData
hadoop fs -ls /user/fastcampus/checkpoint

# Checkpoint directory used by the toTable("lol_table_text") stream.
hadoop fs -mkdir -p /user/fastcampus/checkpoint/structured_streaming/lol_table_text

# Remove the local lol_table_text directory under the Zeppelin spark-warehouse
# (presumably the data of the table written by toTable("lol_table_text") — confirm).
# NOTE(review): the default is an absolute, user-specific path; set SPARK_WAREHOUSE_DIR instead of editing it.
# -f keeps a fresh setup (directory not created yet) from reporting an error, matching the HDFS removals above.
SPARK_WAREHOUSE_DIR="${SPARK_WAREHOUSE_DIR:-/Users/yoonsung/fastcampus/zeppelin/zeppelin-0.10.1-bin-all/spark-warehouse}"
rm -rf "${SPARK_WAREHOUSE_DIR}/lol_table_text"




# Spark Kafka 연동 (Kafka -> Spark)


In [3]:
%pyspark
from pyspark.sql import SparkSession

# Get (or create) the Spark session and print the address of its web UI.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .getOrCreate()
)
ui_url = spark.sparkContext.uiWebUrl
print("Spark UI URL:", ui_url)

In [4]:
%pyspark

from pyspark.sql import SparkSession

# NOTE(review): a session was already obtained in the previous cell, so getOrCreate() presumably
# returns that existing session; settings that only apply when a session is created
# (warehouse dir, catalog implementation) may then be ignored — confirm in the Spark UI.
spark = (
    SparkSession.builder
    .appName("KafkaConsumer")
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
    .config("spark.sql.catalogImplementation", "hive")
    .enableHiveSupport()
    .getOrCreate()
)

spark.sparkContext.setLogLevel('ERROR')

# Streaming source: every record of the Kafka topic `ypgg`, starting from the earliest offset.
kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "ypgg",
    "startingOffsets": "earliest",
}

lol = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)

lol.printSchema()
print(lol.isStreaming)

In [5]:
%pyspark
# Cast the Kafka key/value columns to strings, then keep only the message `value`.
decoded_records = lol.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
lol_value = decoded_records.select("value")

lol_value.printSchema()

In [6]:
%pyspark

# Drop null and empty messages and expose the single remaining column as `history`.
df_stream_lol = (
    lol_value
    .where("value is not null")
    .where("value <> ''")
    .toDF("history")
)



In [7]:
%pyspark
from pyspark.sql.streaming import DataStreamWriter

# Write stream - HDFS: append the `history` strings as text files under
# /user/fastcampus/sparkData, one micro-batch every 5 seconds.
hdfs_text_writer = (
    df_stream_lol
    .selectExpr("CAST(history AS STRING)")
    .writeStream
    .queryName("query_df_stream_lol_hdfs_text")
    .format("text")
    .outputMode("append")
    .trigger(processingTime = '5 seconds')
    .option("path","/user/fastcampus/sparkData")
    .option("checkpointLocation", "/user/fastcampus/checkpoint/structured_streaming/lol_hdfs_text")
    .option("encoding", "utf-8")
)
query_df_stream_lol_hdfs_text = hdfs_text_writer.start()

In [8]:
%pyspark
# Print the current status of the HDFS text streaming query started above.
print(query_df_stream_lol_hdfs_text.status)

In [9]:
%pyspark
# Stop the HDFS text streaming query.
query_df_stream_lol_hdfs_text.stop()

In [10]:
%sh

# List the files written to the HDFS text sink's output path.
hadoop fs -ls /user/fastcampus/sparkData


In [11]:
%sh
# Print the text written by the HDFS sink. Part-file names embed a random ID that changes on
# every run, so match all part files with an HDFS glob instead of hard-coding one name
# (single-quoted so HDFS, not the local shell, expands the pattern).
hadoop fs -cat '/user/fastcampus/sparkData/part-*.txt'

In [12]:
%pyspark
from pyspark.sql.streaming import DataStreamWriter

# Write stream - table: append the stream into the catalog table `lol_table_text`,
# one micro-batch every 5 seconds. toTable() starts the query and returns its handle.
table_writer = (
    df_stream_lol
    .writeStream
    .queryName("query_df_stream_lol_table_text")
    .outputMode("append")
    .trigger(processingTime = '5 seconds')
    .option("checkpointLocation", "/user/fastcampus/checkpoint/structured_streaming/lol_table_text")
)
query_df_stream_lol_table_text = table_writer.toTable("lol_table_text")

In [13]:
%pyspark
# Stop the streaming query that appends to lol_table_text.
query_df_stream_lol_table_text.stop()

In [14]:
%pyspark

lol_table_name = "lol_table_text"

# Show what the catalog holds and the schema of the streaming sink table.
print(spark.catalog.listTables())
spark.table(lol_table_name).printSchema()

# Refresh cached metadata/data for the table before counting its rows.
spark.catalog.refreshTable(lol_table_name)
print(spark.table(lol_table_name).count())

In [15]:
%pyspark
# Memory sink: rows are kept in an in-memory table named after the query, for quick inspection.
memory_writer = (
    df_stream_lol
    .writeStream
    .format("memory")
    .queryName("query_df_stream_lol_memory_text")
    .outputMode("append")
)
query_df_stream_lol_memory_text = memory_writer.start()

In [16]:
%pyspark
memory_table_name = "query_df_stream_lol_memory_text"

print(spark.catalog.listTables())

# The memory sink's rows are read back through the table registered under its query name.
spark.table(memory_table_name).printSchema()
print(spark.table(memory_table_name).count())

In [17]:
%pyspark
# Print the current status of the in-memory streaming query.
print(query_df_stream_lol_memory_text.status)

In [18]:
%pyspark

# Pick up whatever the table stream has appended since the last read.
spark.catalog.refreshTable("lol_table_text")

# Each `history` value is a JSON string; spark.read.json parses them and infers the schema.
history_json = spark.table("lol_table_text").select("history").rdd.map(lambda row: row[0])
df_all_lol_warehouse = spark.read.json(history_json)

z.show(df_all_lol_warehouse)


In [19]:
%pyspark
from pyspark.sql.functions import col

# Flatten the nested `championInfo` struct into top-level snake_case columns.
# Keys are the new column names, appended in this order; championInfo is dropped afterwards.
# NOTE(review): this reassigns df_all_lol_warehouse and drops championInfo, so re-running this
# cell alone fails (championInfo no longer exists) — re-run the JSON-loading cell first.
champion_info = col("championInfo")
champion_columns = {
    "champion_name": champion_info.getItem("championName"),
    "champion_level": champion_info.getItem("championLevel"),
    "kill_streak": champion_info.getItem("killStreak"),
    "kill": champion_info.getItem("kill"),
    "death": champion_info.getItem("death"),
    "assist": champion_info.getItem("assist"),
    "lane": champion_info.getItem("lane"),
    "vision_ward_count": champion_info.getItem("visionWardCount"),
    # Spelled "minionCOunt" before; that only resolved because struct-field lookup is
    # case-insensitive by default. Use the camelCase spelling of the other fields.
    "minion_count": champion_info.getItem("minionCount"),
    # NOTE(review): assumes `spells` is an array of arrays whose first element identifies the spell — confirm.
    "spell_1": champion_info.getItem("spells").getItem(0).getItem(0),
    "spell_2": champion_info.getItem("spells").getItem(1).getItem(0),
    # items_1 .. items_7 <- items[0] .. items[6]
    **{f"items_{slot + 1}": champion_info.getItem("items").getItem(slot) for slot in range(7)},
    # NOTE(review): primary rune reads index 1 while sub rune reads index 0 — confirm this is intended.
    "primary_rune": champion_info.getItem("runes").getItem("primaryRunes").getItem(1),
    "sub_rune": champion_info.getItem("runes").getItem("subRunes").getItem(0),
}

for column_name, expression in champion_columns.items():
    df_all_lol_warehouse = df_all_lol_warehouse.withColumn(column_name, expression)

df_all_lol_warehouse = df_all_lol_warehouse.drop("championInfo")

z.show(df_all_lol_warehouse)


In [20]:
%pyspark
# Keep only these columns, in this order, for the aggregations below.
final_columns = [
    "fewHoursGame", "matchId", "gameMode", "tier", "gameDuration", "champion_name",
    "lane", "champion_level", "kill", "death", "assist", "kill_streak",
    "spell_1", "spell_2", "primary_rune", "sub_rune",
    "items_1", "items_2", "items_3", "items_4", "items_5",
    "items_6", "items_7", "minion_count", "vision_ward_count", "outCome",
]

df_all_lol_warehouse = df_all_lol_warehouse.select(*final_columns)

z.show(df_all_lol_warehouse)

In [21]:
%pyspark

# Keep CLASSIC-mode rows only, drop exact duplicate rows, and expose the result to SQL as `lol_agg_table`.
classic_games = df_all_lol_warehouse.where(df_all_lol_warehouse["gameMode"] == "CLASSIC")
classic_games.dropDuplicates().createOrReplaceTempView("lol_agg_table")

In [22]:
%pyspark

# Number of rows per lane, most frequent lane first.
lane_count_sql = "SELECT lane, count(lane) AS lane_count FROM lol_agg_table GROUP BY lane ORDER BY lane_count desc"
lane_count = spark.sql(lane_count_sql)

z.show(lane_count)

In [23]:
%pyspark
 
# Ten most-picked champions per lane. The champion column gets a lane-specific prefix,
# e.g. `top_champion_name`; the count column is `total_champions`.
lane_pick_query = (
    "SELECT champion_name AS {prefix}_champion_name, count(champion_name) AS total_champions "
    "FROM lol_agg_table WHERE lane = '{lane}' GROUP BY champion_name "
    "ORDER BY total_champions desc LIMIT 10"
)

top, jungle, middle, bottom, support = [
    spark.sql(lane_pick_query.format(prefix=prefix, lane=lane))
    for prefix, lane in (
        ("top", "TOP"),
        ("jungle", "JUNGLE"),
        ("middle", "MIDDLE"),
        ("bottom", "BOTTOM"),
        ("support", "UTILITY"),
    )
]

for lane_picks in (top, jungle, middle, bottom, support):
    z.show(lane_picks)


In [24]:
%pyspark

from pyspark.sql import functions as F
# NOTE(review): this import shadows Python's built-in round() for every cell run afterwards;
# nothing in this cell needs it.
from pyspark.sql.functions import round


def lane_victory_stats(lane, table="lol_agg_table", limit=10):
    """Most-played champions in one lane, with their victory counts and win rate.

    Args:
        lane: value of the `lane` column to keep (e.g. "TOP", "UTILITY").
        table: table or temp view to read.
        limit: number of champions to return, most-played first.

    Returns:
        DataFrame with columns champion_name, total_count, victory_count and
        victory_rate (victory_count / total_count * 100, rounded, cast to int).

    Victories are counted in the same aggregation as the totals, so a champion that
    never won stays in the ranking with victory_count 0 and victory_rate 0. Joining
    the totals to a victories-only subquery with an inner join would silently drop
    such champions, even from the most-played top `limit`.
    """
    is_victory = F.when(F.col("outCome") == "Victory", 1).otherwise(0)
    return (
        spark.table(table)
        # Rows without a champion name cannot be attributed to a champion; ignore them.
        .filter((F.col("lane") == lane) & F.col("champion_name").isNotNull())
        .groupBy("champion_name")
        .agg(
            F.count("champion_name").alias("total_count"),
            F.sum(is_victory).alias("victory_count"),
        )
        .withColumn(
            "victory_rate",
            F.round(F.col("victory_count") / F.col("total_count") * 100).cast("int"),
        )
        .orderBy(F.desc("total_count"))
        .limit(limit)
    )


top, jungle, middle, bottom, support = (
    lane_victory_stats(lane) for lane in ("TOP", "JUNGLE", "MIDDLE", "BOTTOM", "UTILITY")
)

for lane_stats in (top, jungle, middle, bottom, support):
    z.show(lane_stats)

In [25]:
%pyspark

from functools import reduce

from pyspark.sql import functions as F


def slot_item_counts(slot, lane="TOP", outcome="Victory", table="lol_agg_table"):
    """Per-champion counts of the items found in one item column.

    Args:
        slot: item column to count, e.g. "items_1".
        lane: value of the `lane` column to keep.
        outcome: value of the `outCome` column to keep.
        table: table or temp view to read.

    Returns:
        DataFrame with columns champion_name, item, count. `count` counts non-null
        values of `slot`, so a null-item group gets 0.
    """
    return (
        spark.table(table)
        .filter((F.col("lane") == lane) & (F.col("outCome") == outcome))
        .groupBy("champion_name", F.col(slot).alias("item"))
        .agg(F.count(slot).alias("count"))
    )


# Item counts per slot (items_1 .. items_6) for winning TOP-lane games.
# NOTE(review): items_7 is extracted earlier in the notebook but not counted here — confirm that is intended.
items_1, items_2, items_3, items_4, items_5, items_6 = (
    slot_item_counts(f"items_{slot}") for slot in range(1, 7)
)

# Stack the six per-slot tables and add up the counts per (champion, item).
# An inner join across the slots would keep only items present in *every* slot for a
# champion and discard all others; union + sum counts every occurrence in any slot.
joined_items = (
    reduce(lambda left, right: left.unionByName(right),
           [items_1, items_2, items_3, items_4, items_5, items_6])
    # Null champion/item rows are not meaningful item counts; drop them.
    .filter(F.col("champion_name").isNotNull() & F.col("item").isNotNull())
    .groupBy("champion_name", "item")
    .agg(F.sum("count").alias("total_count"))
    .orderBy(F.desc("champion_name"), F.desc("total_count"))
)

# Show the result.
z.show(joined_items)
