In [62]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

In [56]:

spark = SparkSession.builder.appName("twitter_transform")\
    .master("local[*]")\
    .config("spark.driver.host", "127.0.0.1")\
    .getOrCreate()

In [57]:
path = "../datalake/twitter_spark"
df = spark.read.format("json").load(path)

                                                                                

In [60]:
df.show()

+--------------------+--------------------+--------------------+------------+
|                data|            includes|                meta|extract_date|
+--------------------+--------------------+--------------------+------------+
|[{159663454011354...|{[{2022-11-26T22:...|{1665871041879474...|  2023-06-05|
|[{163102162644700...|{[{2023-03-01T20:...|{1665870356694069...|  2023-06-05|
|[{328700883, 1665...|{[{2011-07-03T21:...|{1665869734179422...|  2023-06-05|
|[{161106447754107...|{[{2023-01-05T18:...|{1665869200181665...|  2023-06-05|
|[{165690748272235...|{[{2023-05-12T06:...|{1665868754691997...|  2023-06-05|
|[{141839209615164...|{[{2021-07-23T02:...|{1665868073113473...|  2023-06-05|
|[{163464544704946...|{[{2023-03-11T20:...|{1665867465568530...|  2023-06-05|
|[{268619848, 1665...|{[{2011-03-19T04:...|{1665866682353475...|  2023-06-05|
|[{144592995, 1665...|{[{2010-05-16T19:...|{1665866163232833...|  2023-06-05|
|[{134397719026262...|{[{2020-12-29T17:...|{1665865269892227...|

In [61]:
df.printSchema()

root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: string (nullable = true)
 |    |    |-- conversation_id: string (nullable = true)
 |    |    |-- created_at: string (nullable = true)
 |    |    |-- edit_history_tweet_ids: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- in_reply_to_user_id: string (nullable = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- public_metrics: struct (nullable = true)
 |    |    |    |-- bookmark_count: long (nullable = true)
 |    |    |    |-- impression_count: long (nullable = true)
 |    |    |    |-- like_count: long (nullable = true)
 |    |    |    |-- quote_count: long (nullable = true)
 |    |    |    |-- reply_count: long (nullable = true)
 |    |    |    |-- retweet_count: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- includes: struct (nullable =

In [74]:
df.select(f.explode("data").alias("data")).printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- author_id: string (nullable = true)
 |    |-- conversation_id: string (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- edit_history_tweet_ids: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- id: string (nullable = true)
 |    |-- in_reply_to_user_id: string (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- public_metrics: struct (nullable = true)
 |    |    |-- bookmark_count: long (nullable = true)
 |    |    |-- impression_count: long (nullable = true)
 |    |    |-- like_count: long (nullable = true)
 |    |    |-- quote_count: long (nullable = true)
 |    |    |-- reply_count: long (nullable = true)
 |    |    |-- retweet_count: long (nullable = true)
 |    |-- text: string (nullable = true)



In [76]:
df.select(f.explode("data").alias("data")).show()

+--------------------+
|                data|
+--------------------+
|{1596634540113543...|
|{1633823161535049...|
|{1596634540113543...|
|{1633823161535049...|
|{1596634540113543...|
|{1664611112883736...|
|{1596634540113543...|
|{1664611112883736...|
|{2556649380, 1665...|
|{1656912622166605...|
|{1631021626447007...|
|{1656912622166605...|
|{1631021626447007...|
|{1590399234221621...|
|{1656910448997044...|
|{1664617497839161...|
|{1656910448997044...|
|{1512501204353433...|
|{1605648387462004...|
|{1664617497839161...|
+--------------------+
only showing top 20 rows



In [85]:
df.select(
    f.explode("data").alias("tweets")
).select(
    "tweets.author_id", 
    "tweets.conversation_id", 
    "tweets.created_at",
    "tweets.edit_history_tweet_ids",
    "tweets.id",
    "tweets.in_reply_to_user_id",
    "tweets.lang",
    "tweets.public_metrics.*",
    "tweets.text"
).printSchema()

root
 |-- author_id: string (nullable = true)
 |-- conversation_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- edit_history_tweet_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- in_reply_to_user_id: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- bookmark_count: long (nullable = true)
 |-- impression_count: long (nullable = true)
 |-- like_count: long (nullable = true)
 |-- quote_count: long (nullable = true)
 |-- reply_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- text: string (nullable = true)



In [87]:
df_tweet = df.select(
    f.explode("data").alias("tweets")
).select(
    "tweets.author_id", 
    "tweets.conversation_id", 
    "tweets.created_at",
    "tweets.id",
    "tweets.in_reply_to_user_id",
    "tweets.lang",
    "tweets.public_metrics.*",
    "tweets.text"
)

In [89]:
df_tweet.show()

+-------------------+-------------------+--------------------+-------------------+-------------------+----+--------------+----------------+----------+-----------+-----------+-------------+--------------------+
|          author_id|    conversation_id|          created_at|                 id|in_reply_to_user_id|lang|bookmark_count|impression_count|like_count|quote_count|reply_count|retweet_count|                text|
+-------------------+-------------------+--------------------+-------------------+-------------------+----+--------------+----------------+----------+-----------+-----------+-------------+--------------------+
|1596634540113543169|1665871041879474176|2023-06-05T23:59:...|1665871041879474176|               null|  en|             0|               1|         0|          0|          0|            0|Contact me now fo...|
|1633823161535049729|1665871041216774144|2023-06-05T23:59:...|1665871041216774144|               null|  en|             0|               0|         0|          

In [100]:
df.select(f.explode("includes.users").alias("user")).printSchema()

df.select(
    f.explode("includes.users").alias("user")
).select(
    "user.created_at",
    "user.id",
    "user.name",
    "user.username",
).show()

root
 |-- user: struct (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- username: string (nullable = true)

+--------------------+-------------------+--------------------+---------------+
|          created_at|                 id|                name|       username|
+--------------------+-------------------+--------------------+---------------+
|2022-11-26T22:38:...|1596634540113543169|Kevin Poulsen Hacker|    halalking_1|
|2023-03-09T13:32:...|1633823161535049729|           harsh raj|  harshrajind60|
|2023-06-02T12:34:...|1664611112883736576|         Arun sharma| Arunsharma8326|
|2014-06-09T11:26:...|         2556649380|   Giles Dickenson-J|    GilesDJones|
|2023-05-12T06:42:...|1656912622166605824|      himanshu kumar|himanshkumar601|
|2023-03-01T20:01:...|1631021626447007744|               Bunty|amanshuklamyju1|
|2023-05-12T06:42:...|1656912622166605824|      himanshu kumar|him