In [5]:
!pip install pandas pyspark



In [6]:
# Library imports

import os

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

In [7]:
# Build (or reuse) the Spark session used by the rest of the notebook.
# SparkSession comes from the import cell at the top of the notebook.
# The session runs with a local master; to use the standalone cluster in the
# container instead, swap the master URL for "spark://spark:7077".
spark = (
    SparkSession.builder
    .appName("labdados_transformation")
    .master("local")
    .getOrCreate()
)

# Print the Spark version as a quick check that the session is up
print(spark.version)


3.5.0


In [41]:
# Stopping the session at this point would break every later cell that uses
# `spark` when the notebook is executed top to bottom (Restart & Run All).
# The stop is therefore opt-in: set the flag to True and run this cell by hand
# once you are done with the session.
STOP_SPARK_NOW = False

if STOP_SPARK_NOW:
    spark.stop()

In [8]:
# List the contents of /home/jovyan/work, the notebooks directory
# (`os` comes from the import cell at the top of the notebook).
notebooks_dir = '/home/jovyan/work'
print("Conteúdo do diretório de notebooks:", os.listdir(notebooks_dir))


Conteúdo do diretório de notebooks: ['.ipynb_checkpoints', 'exploracao_spark.ipynb', 'output']


In [9]:
# List the directory where the extracted JSON file is expected to be
# (`os` comes from the import cell at the top of the notebook).
datalake_dir = '/datalake/twitter_datascience/extract_date=2024-11-02'
print("Conteúdo do diretório datalake:", os.listdir(datalake_dir))

Conteúdo do diretório datalake: ['datascience_20241102.json']


In [10]:
# Clear Spark's in-memory cache of tables/DataFrames for this session
spark.catalog.clearCache()


In [12]:
# Load the raw extraction

# Single source of truth for the extraction date: it names the partition
# folder and the file, and it is the value of the extract_date column below.
EXTRACT_DATE = "2024-11-02"

json_path = (
    f'/datalake/twitter_datascience/extract_date={EXTRACT_DATE}'
    f'/datascience_{EXTRACT_DATE.replace("-", "")}.json'
)
df = spark.read.json(json_path)

# Extra column so the data follows the layout defined in the course
df = df.withColumn("extract_date", sf.to_date(sf.lit(EXTRACT_DATE)))

In [13]:
# Preview the first 5 rows of the loaded DataFrame
df.show(5)

+--------------------+--------------------+------------------+------------+
|                data|            includes|              meta|extract_date|
+--------------------+--------------------+------------------+------------+
|[{4, 63, 2024-11-...|{[{2024-11-02T23:...|{1234567890abcdef}|  2024-11-02|
|[{30, 16, 2024-11...|{[{2024-11-02T16:...|              NULL|  2024-11-02|
|[{85, 46, 2024-11...|{[{2024-11-02T04:...|              NULL|  2024-11-02|
|[{23, 25, 2024-11...|{[{2024-11-02T19:...|{1234567890abcdef}|  2024-11-02|
|[{35, 48, 2024-11...|{[{2024-11-02T22:...|              NULL|  2024-11-02|
+--------------------+--------------------+------------------+------------+
only showing top 5 rows



In [17]:
# Print the DataFrame schema as a tree to inspect the nested structure
df.printSchema()

root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: string (nullable = true)
 |    |    |-- conversation_id: string (nullable = true)
 |    |    |-- created_at: string (nullable = true)
 |    |    |-- edit_history_tweet_ids: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- in_reply_to_user_id: string (nullable = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- public_metrics: struct (nullable = true)
 |    |    |    |-- like_count: long (nullable = true)
 |    |    |    |-- quote_count: long (nullable = true)
 |    |    |    |-- reply_count: long (nullable = true)
 |    |    |    |-- retweet_count: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- includes: struct (nullable = true)
 |    |-- users: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- c

In [18]:
# Turn each element of the `data` array into its own row (one struct per tweet)
data_explode = df.select(
    sf.explode(df["data"])
)

# Show the first 10 exploded structs without truncating the cell contents
data_explode.show(n=10, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|col                                                                                                                                                              |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{4, 63, 2024-11-02T17:53:55.931233+0000, [71], 61, 18, en, {70, 19, 72, 70}, Tweet fictício criado usando inteligência artificial para falar sobre data engineer}|
|{15, 84, 2024-11-02T00:53:24.440670+0000, [49], 47, 66, en, {26, 46, 78, 88}, Este é um tweet fictício sobre data engineer}                                      |
|{27, 96, 2024-11-02T08:54:53.556819+0000, [58], 22, 7, en, {7, 65, 60, 33}, Este é um tweet fictício sobre data engineer}                                        |
|{6, 94, 2024-11

In [19]:
# One row per tweet: explode the `data` array, then lift the tweet fields
# (and every field of public_metrics) into top-level columns.
tweet = (
    df.select(sf.explode(df["data"]).alias('tweet'))
    .select(
        'tweet.author_id',
        'tweet.conversation_id',
        'tweet.created_at',
        'tweet.id',
        'tweet.public_metrics.*',
        'tweet.text',
    )
)

# Show the first 10 tweets without truncating the text column
tweet.show(n=10, truncate=False)

+---------+---------------+-------------------------------+---+----------+-----------+-----------+-------------+-----------------------------------------------------------------------------------+
|author_id|conversation_id|created_at                     |id |like_count|quote_count|reply_count|retweet_count|text                                                                               |
+---------+---------------+-------------------------------+---+----------+-----------+-----------+-------------+-----------------------------------------------------------------------------------+
|4        |63             |2024-11-02T17:53:55.931233+0000|61 |70        |19         |72         |70           |Tweet fictício criado usando inteligência artificial para falar sobre data engineer|
|15       |84             |2024-11-02T00:53:24.440670+0000|47 |26        |46         |78         |88           |Este é um tweet fictício sobre data engineer                                       |
|27       |96  

In [20]:
# Turn each element of `includes.users` into its own row (one struct per user)
data_explode = df.select(
    sf.explode(df["includes"]["users"]).alias('user')
)

# Show the first 10 exploded structs without truncating the cell contents
data_explode.show(n=10, truncate=False)

+------------------------------------------------------+
|user                                                  |
+------------------------------------------------------+
|{2024-11-02T23:44:24.405905+0000, 90, User 1, user1}  |
|{2024-11-02T19:20:32.612742+0000, 46, User 2, user2}  |
|{2024-11-02T17:19:27.159242+0000, 28, User 3, user3}  |
|{2024-11-02T15:12:08.011115+0000, 34, User 4, user4}  |
|{2024-11-02T04:46:41.478867+0000, 41, User 5, user5}  |
|{2024-11-02T22:58:40.410548+0000, 72, User 6, user6}  |
|{2024-11-02T03:25:31.390233+0000, 70, User 7, user7}  |
|{2024-11-02T17:16:57.134988+0000, 16, User 8, user8}  |
|{2024-11-02T19:44:02.445211+0000, 12, User 9, user9}  |
|{2024-11-02T21:04:05.250260+0000, 24, User 10, user10}|
+------------------------------------------------------+
only showing top 10 rows



In [21]:
# One row per user: explode `includes.users`, then lift the user fields
# into top-level columns.
user = (
    df.select(sf.explode(df["includes"]["users"]).alias('user'))
    .select(
        'user.created_at',
        'user.id',
        'user.name',
        'user.username',
    )
)

# Show the first 10 users without truncating any column
user.show(n=10, truncate=False)

+-------------------------------+---+-------+--------+
|created_at                     |id |name   |username|
+-------------------------------+---+-------+--------+
|2024-11-02T23:44:24.405905+0000|90 |User 1 |user1   |
|2024-11-02T19:20:32.612742+0000|46 |User 2 |user2   |
|2024-11-02T17:19:27.159242+0000|28 |User 3 |user3   |
|2024-11-02T15:12:08.011115+0000|34 |User 4 |user4   |
|2024-11-02T04:46:41.478867+0000|41 |User 5 |user5   |
|2024-11-02T22:58:40.410548+0000|72 |User 6 |user6   |
|2024-11-02T03:25:31.390233+0000|70 |User 7 |user7   |
|2024-11-02T17:16:57.134988+0000|16 |User 8 |user8   |
|2024-11-02T19:44:02.445211+0000|12 |User 9 |user9   |
|2024-11-02T21:04:05.250260+0000|24 |User 10|user10  |
+-------------------------------+---+-------+--------+
only showing top 10 rows



In [22]:
# Write each flattened DataFrame as JSON, replacing any previous output.
# coalesce(1) reduces each frame to a single partition before the write.
for frame, target in ((tweet, 'output/tweet'), (user, 'output/user')):
    frame.coalesce(1).write.mode("overwrite").json(target)