# <center>Data Exploration</center>
---

## Reading Data

In [1]:
import os
import glob
from tqdm import tqdm

In [2]:
file_paths = 'telegram_2024/extracted/*/2024-*.tsv.gz'

In [3]:
from pyspark.sql.functions import col, from_unixtime, to_date, count, input_file_name, regexp_extract, collect_list, concat_ws
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,LongType,BooleanType

print("Oi")

schema = StructType([
    StructField("id", IntegerType(), False),    
    StructField("user_id", IntegerType(), True),
    StructField("text", StringType(), True),
    StructField("timestamp", LongType(), True),                  
    StructField("bot_flag", BooleanType(), True),                
    StructField("via_bot_id", BooleanType(), True),             
    StructField("via_business_bot_id", BooleanType(), True),    
    StructField("reply_to_msg_id", IntegerType(), True),         
    StructField("fwd_flag", BooleanType(), True),               
    StructField("fwd_from_id", IntegerType(), True),             
    StructField("media_type", StringType(), True),               
    StructField("views", IntegerType(), True),                   
    StructField("forwards", IntegerType(), True),                
    StructField("replies", IntegerType(), True),                 
    StructField("reactions", IntegerType(), True),       
    StructField("reaction_json", StringType(), True),   
    ])

print("Schema")

# Load all files into a single DataFrame, adding a column for the file path
df = (spark.read.option("header", True)  # Read with header
      .option("delimiter", "\t")         # Set tab delimiter
      .option("quote", '"')              # Set quote character
      .option("multiline", True)         # Allow multiline fields
      .option("escape", '"')             # Disable escape
      .schema(schema)
      .csv(file_paths)                  # Read CSV files
      .withColumn("file_path", input_file_name()))  # Add file path column

print("Load")

# Extract the folder from the path
df = df.withColumn("group_name", regexp_extract(col("file_path"), r"/extracted/([^/]+)/", 1))

Oi
Schema


                                                                                

25/08/22 10:20:27 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
Load


In [4]:
df = df.na.drop(subset=["id", "text", "timestamp"])

In [5]:
df.dtypes

[('id', 'int'),
 ('user_id', 'int'),
 ('text', 'string'),
 ('timestamp', 'bigint'),
 ('bot_flag', 'boolean'),
 ('via_bot_id', 'boolean'),
 ('via_business_bot_id', 'boolean'),
 ('reply_to_msg_id', 'int'),
 ('fwd_flag', 'boolean'),
 ('fwd_from_id', 'int'),
 ('media_type', 'string'),
 ('views', 'int'),
 ('forwards', 'int'),
 ('replies', 'int'),
 ('reactions', 'int'),
 ('reaction_json', 'string'),
 ('file_path', 'string'),
 ('group_name', 'string')]

## 1. Groups Analysis

In [11]:
df.select("user_id","id","text","group_name").createOrReplaceTempView('view_users')
mean_text_df = spark.sql("SELECT avg(LENGTH(text)) AS mean_text_length FROM view_users WHERE group_name LIKE '%chat%'")
mean_text = mean_text_df.first()["mean_text_length"]
print(f"Average text size in chats: {mean_text}")

[Stage 9:>                                                          (0 + 1) / 1]

Average text size in chats: 229.27393617021278


                                                                                

In [13]:
df.select("user_id","id","text","group_name").createOrReplaceTempView('view_users')
mean_text_df = spark.sql("SELECT avg(LENGTH(text)) AS mean_text_length FROM view_users WHERE group_name LIKE '%channel%'")
mean_text = mean_text_df.first()["mean_text_length"]
print(f"Average text size in channels: {mean_text}")

[Stage 15:>                                                         (0 + 1) / 1]

Average text size in channels: 364.0619007175237


                                                                                

In [14]:
df.select("user_id","id","text","group_name").createOrReplaceTempView('view_users')
mean_text_df = spark.sql("SELECT AVG(SIZE(SPLIT(text, ' '))) AS mean_text_length FROM view_users WHERE group_name LIKE '%chat%'")
mean_text = mean_text_df.first()["mean_text_length"]
print(f"average number of words in chats text: {mean_text}")

[Stage 18:>                                                         (0 + 1) / 1]

average number of words in  chats text: 26.54255319148936


                                                                                

In [15]:
df.select("user_id","id","text","group_name").createOrReplaceTempView('view_users')
mean_text_df = spark.sql("SELECT AVG(SIZE(SPLIT(text, ' '))) AS mean_text_length FROM view_users WHERE group_name LIKE '%channel%'")
mean_text = mean_text_df.first()["mean_text_length"]
print(f"average number of words in channels text: {mean_text}")

[Stage 21:>                                                         (0 + 1) / 1]

average number of words in channels text: 45.75567497303782


                                                                                

In [21]:
df_group_ac = (df.filter(df.group_name.contains("chat")).groupBy("group_name").count())

print("Most active chats:")
df_group_ac.orderBy("count",ascending=False).show()

Most active chats:


[Stage 27:>                                                         (0 + 1) / 1]

+---------------+-----+
|     group_name|count|
+---------------+-----+
|chat_4162612396|  137|
| chat_940424150|   72|
|chat_4089520470|   57|
|chat_4203053576|   53|
|chat_4245063824|   34|
|chat_4196654950|   11|
|chat_4135900175|    3|
|chat_4212459000|    3|
|chat_4203915870|    2|
|chat_4252052923|    2|
|chat_4225274879|    2|
+---------------+-----+



                                                                                

In [23]:
df_group_ac = (df.filter(df.group_name.contains("channel")).groupBy("group_name").count())

print("Most active channels:")
df_group_ac.orderBy("count",ascending=False).show()

Most active channels:




+------------------+-------+
|        group_name|  count|
+------------------+-------+
|channel_2000420557|2123738|
|channel_1980676178|1997841|
|channel_1797530467|1814098|
|channel_2036444332|1758924|
|channel_1371589778|1714636|
|channel_1567712182|1699462|
|channel_1746291710|1634378|
|channel_1952461446|1629629|
|channel_1529099498|1545463|
|channel_2126136797|1524753|
|channel_1629500046|1476594|
|channel_1831067702|1464576|
|channel_1391678616|1460102|
|channel_1790937453|1398498|
|channel_2124392949|1362132|
|channel_1815062639|1327522|
|channel_2046555407|1294497|
|channel_2068810045|1278377|
|channel_1652712042|1270078|
|channel_1240453727|1220734|
+------------------+-------+
only showing top 20 rows



                                                                                

In [16]:
df.count()

                                                                                

874034881

## 2. Users Analysis

In [6]:
total_users = df.select("user_id").distinct().count()
print("Total users: ",total_users)



Total users:  3382309


                                                                                

In [4]:
df_user_message = df.groupBy("user_id").count()

print("Most active users:")
df_user_message.orderBy("count",ascending=False).show()

Most active users:




+----------+---------+
|   user_id|    count|
+----------+---------+
|      null|722880472|
| 609517172|  8958426|
| 210944655|  1869373|
|1463217879|  1515729|
| 162726413|  1404807|
|1142750381|  1255247|
|1990154044|  1219569|
|1923112992|   980468|
|1094946493|   929955|
|1423105784|   899219|
| 908327188|   760073|
| 515681181|   669167|
|1194029880|   619060|
|1089912432|   613754|
| 605308260|   564479|
|1895870896|   532675|
| 890952089|   527978|
| 730507113|   499781|
| 889839684|   498836|
|1056409347|   468181|
+----------+---------+
only showing top 20 rows



                                                                                

In [10]:
pd_df = df_user_message.toPandas()
pd_df.to_csv("User_message_counts.csv")

                                                                                

In [12]:
df.select("group_name").filter(df.user_id.isin("609517172")).groupBy("group_name").count().show()

[Stage 21:>                                                         (0 + 1) / 1]

+------------------+------+
|        group_name| count|
+------------------+------+
|channel_1996582667|  5994|
|channel_1947932949|355534|
|channel_1770408140|  2304|
|channel_1543131980|  5028|
|channel_1441780077| 14365|
|channel_1381960982|  6250|
|channel_1678354592|   432|
|channel_2049018613| 18084|
|channel_2072741004|    46|
|channel_2072990255|  4744|
|channel_1554138363|   635|
|channel_1379968896|  2259|
|channel_1429136166|   662|
|channel_1662612309|   652|
|channel_2119657720|160876|
|channel_1658542133|  1150|
|channel_1654014630|    93|
|channel_2217231959|   534|
|channel_1892885391|   123|
|channel_2241591968|  2398|
+------------------+------+
only showing top 20 rows



                                                                                

In [23]:
df.select("user_id","id","text","group_name").createOrReplaceTempView('view_users')
print("How many groups each user was activy:")
user_group_counts_df = spark.sql("SELECT user_id,COUNT(DISTINCT group_name) as group_count FROM view_users GROUP BY user_id ORDER BY group_count DESC")

How many groups each user was activy:


In [25]:
pd_df = user_group_counts_df.toPandas()
pd_df.to_csv("User_group_counts.csv")

                                                                                

In [5]:
from pyspark.sql.functions import avg, sum, count, max

resumo_completo = df_user_message.agg(
    avg("count").alias("media_de_mensagens_por_usuario"),
    sum("count").alias("total_de_mensagens_no_dataset"),
    max("count").alias("maximo_de_mensagens_de_um_usuario"),
    count("user_id").alias("numero_de_usuarios_unicos")
)

resumo_completo.show()



+------------------------------+-----------------------------+---------------------------------+-------------------------+
|media_de_mensagens_por_usuario|total_de_mensagens_no_dataset|maximo_de_mensagens_de_um_usuario|numero_de_usuarios_unicos|
+------------------------------+-----------------------------+---------------------------------+-------------------------+
|             276.8139358443536|                    948626687|                        722880472|                  3426946|
+------------------------------+-----------------------------+---------------------------------+-------------------------+



                                                                                

## 3. Message Analysis

In [7]:
df.select("user_id","id","text","group_name").createOrReplaceTempView('view_users')
mean_text_df = spark.sql("SELECT avg(LENGTH(text)) AS mean_text_length FROM view_users")
mean_text = mean_text_df.first()["mean_text_length"]
print(f"Average text size: {mean_text}")

[Stage 3:>                                                          (0 + 1) / 1]

Average text size: 364.06184273325357


                                                                                

In [8]:
df.select("user_id","id","text","group_name").createOrReplaceTempView('view_users')
mean_text_df = spark.sql("SELECT AVG(SIZE(SPLIT(text, ' '))) AS mean_text_length FROM view_users")
mean_text = mean_text_df.first()["mean_text_length"]
print(f"average number of words in texts: {mean_text}")

[Stage 6:>                                                          (0 + 1) / 1]

average number of words in texts: 45.755666707768384


                                                                                

## 4. Timestamp Analysis

In [17]:
df.select("user_id","id","text","group_name","timestamp").createOrReplaceTempView('view_timestamp')

messages_per_month = spark.sql("""
    SELECT 
        DATE_FORMAT(FROM_UNIXTIME(timestamp), 'yyyy-MM') AS month,
        COUNT(*) AS count_messages
    FROM view_timestamp
    GROUP BY DATE_FORMAT(FROM_UNIXTIME(timestamp), 'yyyy-MM')
    ORDER BY count_messages DESC
""")

messages_per_month.show()



+-------+--------------+
|  month|count_messages|
+-------+--------------+
|2024-11|     129623554|
|2024-10|     112609494|
|2024-09|      86095652|
|2024-08|      85918163|
|2024-07|      74096066|
|2024-06|      72367023|
|2024-05|      65398887|
|2024-04|      60148410|
|2024-03|      56463309|
|2024-12|      44747806|
|2024-01|      43551871|
|2024-02|      43014646|
+-------+--------------+



                                                                                