In [3]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install -q findspark pyspark
!pip install pyspark==3.3.1



In [5]:
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .appName("api_transformation")\
    .getOrCreate()
spark

In [6]:
df = spark.read.json("/content/datalake/twitter_datascience")
df.show()

+--------------------+--------------------+------------------+------------+
|                data|            includes|              meta|extract_date|
+--------------------+--------------------+------------------+------------+
|[{11, 53, 2025-10...|{[{2025-10-03T00:...|{1234567890abcdef}|  2025-10-03|
|[{59, 83, 2025-10...|{[{2025-10-03T00:...|{1234567890abcdef}|  2025-10-03|
|[{7, 17, 2025-10-...|{[{2025-10-03T00:...|{1234567890abcdef}|  2025-10-03|
|[{37, 4, 2025-10-...|{[{2025-10-03T00:...|{1234567890abcdef}|  2025-10-03|
|[{53, 48, 2025-10...|{[{2025-10-03T00:...|              null|  2025-10-03|
|[{80, 46, 2025-10...|{[{2025-10-04T00:...|{1234567890abcdef}|  2025-10-04|
|[{65, 35, 2025-10...|{[{2025-10-04T00:...|{1234567890abcdef}|  2025-10-04|
|[{24, 69, 2025-10...|{[{2025-10-04T00:...|              null|  2025-10-04|
|[{38, 89, 2025-10...|{[{2025-10-05T00:...|{1234567890abcdef}|  2025-10-05|
|[{81, 77, 2025-10...|{[{2025-10-05T00:...|              null|  2025-10-05|
|[{11, 54, 2

In [7]:
df.printSchema()

root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: string (nullable = true)
 |    |    |-- conversation_id: string (nullable = true)
 |    |    |-- created_at: string (nullable = true)
 |    |    |-- edit_history_tweet_ids: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- in_reply_to_user_id: string (nullable = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- public_metrics: struct (nullable = true)
 |    |    |    |-- like_count: long (nullable = true)
 |    |    |    |-- quote_count: long (nullable = true)
 |    |    |    |-- reply_count: long (nullable = true)
 |    |    |    |-- retweet_count: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- includes: struct (nullable = true)
 |    |-- users: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- c

In [9]:
from pyspark.sql import functions as f
df.select(f.explode("data")).printSchema()
df.select(f.explode("data")).show()

root
 |-- col: struct (nullable = true)
 |    |-- author_id: string (nullable = true)
 |    |-- conversation_id: string (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- edit_history_tweet_ids: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- id: string (nullable = true)
 |    |-- in_reply_to_user_id: string (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- public_metrics: struct (nullable = true)
 |    |    |-- like_count: long (nullable = true)
 |    |    |-- quote_count: long (nullable = true)
 |    |    |-- reply_count: long (nullable = true)
 |    |    |-- retweet_count: long (nullable = true)
 |    |-- text: string (nullable = true)

+--------------------+
|                 col|
+--------------------+
|{11, 53, 2025-10-...|
|{55, 20, 2025-10-...|
|{44, 29, 2025-10-...|
|{36, 15, 2025-10-...|
|{21, 80, 2025-10-...|
|{39, 9, 2025-10-0...|
|{7, 5, 2025-10-03...|
|{51, 93, 2025-10-...|
|{15, 13, 2025-10-.

In [13]:
df.select(f.explode("data").alias("tweets"))\
.select("tweets.author_id", "tweets.conversation_id",
        "tweets.created_at", "tweets.id",
        "tweets.public_metrics.*", "tweets.text").printSchema()

root
 |-- author_id: string (nullable = true)
 |-- conversation_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- like_count: long (nullable = true)
 |-- quote_count: long (nullable = true)
 |-- reply_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- text: string (nullable = true)



In [14]:
tweet_df = df.select(f.explode("data").alias("tweets"))\
  .select("tweets.author_id", "tweets.conversation_id",
        "tweets.created_at", "tweets.id",
        "tweets.public_metrics.*", "tweets.text")

tweet_df.show(5)

+---------+---------------+--------------------+---+----------+-----------+-----------+-------------+--------------------+
|author_id|conversation_id|          created_at| id|like_count|quote_count|reply_count|retweet_count|                text|
+---------+---------------+--------------------+---+----------+-----------+-----------+-------------+--------------------+
|       11|             53|2025-10-03T00:00:...| 74|        46|         68|         69|           97|Tweet fictício cr...|
|       55|             20|2025-10-03T00:00:...| 64|        67|         41|         39|           26|Este é um tweet f...|
|       44|             29|2025-10-03T00:00:...| 31|        34|         68|         50|            9|Um terceiro tweet...|
|       36|             15|2025-10-03T00:00:...| 61|        33|         39|         93|           44|Um terceiro tweet...|
|       21|             80|2025-10-03T00:00:...| 68|        15|         81|        100|           86|Outro tweet fictí...|
+---------+-----

In [19]:
df.select(f.explode("includes.users").alias("users")).select("users.*").printSchema()

root
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- username: string (nullable = true)



In [20]:
user_df = df.select(f.explode("includes.users").alias("users")).select("users.*")
user_df.show(5)

+--------------------+---+------+--------+
|          created_at| id|  name|username|
+--------------------+---+------+--------+
|2025-10-03T00:00:...| 94|User 1|   user1|
|2025-10-03T00:00:...| 85|User 2|   user2|
|2025-10-03T00:00:...| 26|User 3|   user3|
|2025-10-03T00:00:...| 64|User 4|   user4|
|2025-10-03T00:00:...| 33|User 5|   user5|
+--------------------+---+------+--------+
only showing top 5 rows



In [28]:
tweet_df.coalesce(1).write.mode('overwrite').json('/content/output/tweet')
user_df.coalesce(1).write.mode('overwrite').json('/content/output/user')