In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l- \ done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l- \ | done
[?25h  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=e53252e5310cf90a0d903c57197900bc516fcc86c969a80b883937cc3ce838d7
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, desc, udf, lag, lead, window, round
from pyspark.sql.types import StringType, DoubleType
from pyspark.sql.window import Window

In [3]:
spark = SparkSession.builder \
    .appName("Twitch Streamers Analysis") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/25 11:55:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
data_path = "/kaggle/input/top-1000-twitch-streamers-data-may-2024/datasetV2.csv"
df = spark.read.option("header", "true").csv(data_path)

                                                                                

In [5]:
df

DataFrame[RANK: string, NAME: string, LANGUAGE: string, TYPE: string, MOST_STREAMED_GAME: string, 2ND_MOST_STREAMED_GAME: string, AVERAGE_STREAM_DURATION: string, FOLLOWERS_GAINED_PER_STREAM: string, AVG_VIEWERS_PER_STREAM: string, AVG_GAMES_PER_STREAM: string, TOTAL_TIME_STREAMED: string, TOTAL_FOLLOWERS: string, TOTAL_VIEWS: string, TOTAL_GAMES_STREAMED: string, ACTIVE_DAYS_PER_WEEK: string, MOST_ACTIVE_DAY: string, DAY_WITH_MOST_FOLLOWERS_GAINED: string]

In [6]:
df.columns

['RANK',
 'NAME',
 'LANGUAGE',
 'TYPE',
 'MOST_STREAMED_GAME',
 '2ND_MOST_STREAMED_GAME',
 'AVERAGE_STREAM_DURATION',
 'FOLLOWERS_GAINED_PER_STREAM',
 'AVG_VIEWERS_PER_STREAM',
 'AVG_GAMES_PER_STREAM',
 'TOTAL_TIME_STREAMED',
 'TOTAL_FOLLOWERS',
 'TOTAL_VIEWS',
 'TOTAL_GAMES_STREAMED',
 'ACTIVE_DAYS_PER_WEEK',
 'MOST_ACTIVE_DAY',
 'DAY_WITH_MOST_FOLLOWERS_GAINED']

In [7]:
df.count()

999

In [8]:
df.show(1)

+----+--------+--------+-----------+------------------+----------------------+-----------------------+---------------------------+----------------------+--------------------+-------------------+---------------+-----------+--------------------+--------------------+---------------+------------------------------+
|RANK|    NAME|LANGUAGE|       TYPE|MOST_STREAMED_GAME|2ND_MOST_STREAMED_GAME|AVERAGE_STREAM_DURATION|FOLLOWERS_GAINED_PER_STREAM|AVG_VIEWERS_PER_STREAM|AVG_GAMES_PER_STREAM|TOTAL_TIME_STREAMED|TOTAL_FOLLOWERS|TOTAL_VIEWS|TOTAL_GAMES_STREAMED|ACTIVE_DAYS_PER_WEEK|MOST_ACTIVE_DAY|DAY_WITH_MOST_FOLLOWERS_GAINED|
+----+--------+--------+-----------+------------------+----------------------+-----------------------+---------------------------+----------------------+--------------------+-------------------+---------------+-----------+--------------------+--------------------+---------------+------------------------------+
|   1|kaicenat| English|personality|     Just Chatting|     I'm 

In [9]:
# Data type conversion for numeric columns

numeric_columns = ['RANK', 'AVERAGE_STREAM_DURATION', 'FOLLOWERS_GAINED_PER_STREAM',
                   'AVG_VIEWERS_PER_STREAM', 'AVG_GAMES_PER_STREAM', 'TOTAL_TIME_STREAMED',
                   'TOTAL_FOLLOWERS', 'TOTAL_VIEWS', 'TOTAL_GAMES_STREAMED', 'ACTIVE_DAYS_PER_WEEK']
for column in numeric_columns:
    df = df.withColumn(column, col(column).cast("double"))

In [10]:
df

DataFrame[RANK: double, NAME: string, LANGUAGE: string, TYPE: string, MOST_STREAMED_GAME: string, 2ND_MOST_STREAMED_GAME: string, AVERAGE_STREAM_DURATION: double, FOLLOWERS_GAINED_PER_STREAM: double, AVG_VIEWERS_PER_STREAM: double, AVG_GAMES_PER_STREAM: double, TOTAL_TIME_STREAMED: double, TOTAL_FOLLOWERS: double, TOTAL_VIEWS: double, TOTAL_GAMES_STREAMED: double, ACTIVE_DAYS_PER_WEEK: double, MOST_ACTIVE_DAY: string, DAY_WITH_MOST_FOLLOWERS_GAINED: string]

In [11]:
# Data filtering for streamers whose data was collected in English

english_streamers_df = df.filter(col("LANGUAGE") == "English")
english_streamers_df.count()

401

In [12]:
# Calculate the average number of subscribers per streamer depending on type

avg_followers_by_type = df.groupBy("TYPE").agg(avg("TOTAL_FOLLOWERS").
                        alias("avg_followers")).orderBy(desc("avg_followers"))
avg_followers_by_type.show()

+-----------+------------------+
|       TYPE|     avg_followers|
+-----------+------------------+
|    esports|1067475.4210526317|
|personality| 910443.5668789808|
+-----------+------------------+



In [13]:
# Filter data for streamers with the largest subscribers

top_streamers_df = df.filter(col("TOTAL_FOLLOWERS") > 1000000)
top_streamers_df.select(top_streamers_df.columns[:5]).show(10)

+----+----------+--------+-----------+--------------------+
|RANK|      NAME|LANGUAGE|       TYPE|  MOST_STREAMED_GAME|
+----+----------+--------+-----------+--------------------+
| 1.0|  kaicenat| English|personality|       Just Chatting|
| 2.0|    jynxzi| English|personality|Tom Clancy's Rain...|
| 4.0|   caseoh_| English|personality|            NBA 2K23|
| 5.0|      ibai| Spanish|personality|       Just Chatting|
| 6.0| auronplay| Spanish|personality|           Minecraft|
| 7.0|   zerator|  French|personality|   World of Warcraft|
| 8.0|     tarik| English|personality|            VALORANT|
| 9.0| riotgames| English|    esports|   League of Legends|
|10.0|papaplatte|  German|personality|       Just Chatting|
|12.0|aminematue|  French|personality|  Grand Theft Auto V|
+----+----------+--------+-----------+--------------------+
only showing top 10 rows



In [14]:
# Calculate the average number of viewers per stream depending on the type of streamer

avg_viewers_by_type = df.groupBy("MOST_STREAMED_GAME").agg(avg("AVG_VIEWERS_PER_STREAM").
                        alias("avg_viewers")).orderBy(desc("avg_viewers"))
avg_viewers_by_type.show(10)

+------------------+------------------+
|MOST_STREAMED_GAME|       avg_viewers|
+------------------+------------------+
|          NBA 2K22|          211701.0|
|    Special Events|          113156.0|
|           FIFA 21| 73795.33333333333|
|             Chess|           71906.0|
|         Destiny 2|62148.857142857145|
|     Rocket League| 56851.57142857143|
|         Overwatch| 56038.42857142857|
|           F1 2019|           42554.0|
| World of Warcraft|           40071.0|
| League of Legends|37957.607142857145|
+------------------+------------------+
only showing top 10 rows



In [15]:
# Define a user-defined function (UDF) to categorize streamers by number of subscribers

def categorize_followers(total_followers):
    if total_followers > 500000:
        return "Popular"
    else:
        return "Unpopular"

In [16]:
# Registering UDFs with Spark
categorize_followers_udf = udf(categorize_followers, StringType())

In [17]:
# Apply UDF to create a new column with streamer category
df_with_category = df.withColumn("FOLLOWERS_CATEGORY", categorize_followers_udf(col("TOTAL_FOLLOWERS")))

In [18]:
df_with_category.select("NAME", "TOTAL_FOLLOWERS", "FOLLOWERS_CATEGORY").show(20)

[Stage 15:>                                                         (0 + 1) / 1]

+----------------+---------------+------------------+
|            NAME|TOTAL_FOLLOWERS|FOLLOWERS_CATEGORY|
+----------------+---------------+------------------+
|        kaicenat|         1.06E7|           Popular|
|          jynxzi|      5760000.0|           Popular|
|         caedrel|       797000.0|           Popular|
|         caseoh_|      4220000.0|           Popular|
|            ibai|         1.56E7|           Popular|
|       auronplay|         1.63E7|           Popular|
|         zerator|      1570000.0|           Popular|
|           tarik|      3110000.0|           Popular|
|       riotgames|      6860000.0|           Popular|
|      papaplatte|      2410000.0|           Popular|
|dota2_paragon_ru|       352000.0|         Unpopular|
|      aminematue|      2590000.0|           Popular|
|kato_junichi0817|       939000.0|           Popular|
|       fps_shaka|      1420000.0|           Popular|
|        illojuan|      4090000.0|           Popular|
|        hasanabi|      2570

                                                                                

In [19]:
# Using Window Functions for Time Series Analysis

In [20]:
# Window function to calculate the moving average number of viewers per stream

windowSpec = Window.partitionBy("RANK").orderBy("RANK").rowsBetween(-1, 1)
df_with_ma_viewers = df.withColumn("MOVING_AVG_VIEWERS", avg("AVG_VIEWERS_PER_STREAM").over(windowSpec))
df_with_ma_viewers = df_with_ma_viewers.withColumn("MOVING_AVG_VIEWERS", round(col("MOVING_AVG_VIEWERS"), 2))
df_with_ma_viewers.select("NAME", "RANK", "AVG_VIEWERS_PER_STREAM", "MOVING_AVG_VIEWERS").show(10)

+----------+----+----------------------+------------------+
|      NAME|RANK|AVG_VIEWERS_PER_STREAM|MOVING_AVG_VIEWERS|
+----------+----+----------------------+------------------+
|  kaicenat| 1.0|               15852.0|           15852.0|
|    jynxzi| 2.0|                1145.0|            1145.0|
|   caedrel| 3.0|               12331.0|           12331.0|
|   caseoh_| 4.0|                   0.0|               0.0|
|      ibai| 5.0|              190714.0|          190714.0|
| auronplay| 6.0|              213849.0|          213849.0|
|   zerator| 7.0|               70813.0|           70813.0|
|     tarik| 8.0|                6043.0|            6043.0|
| riotgames| 9.0|              346968.0|          346968.0|
|papaplatte|10.0|               48758.0|           48758.0|
+----------+----+----------------------+------------------+
only showing top 10 rows



In [21]:
spark.stop()