In [127]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import *
from pyspark.sql import Window

os.environ["JAVA_HOME"] = "/usr/local/opt/jenv/versions/openjdk64-1.8.0.242"

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("twitter test") \
    .config("spark.some.config.option", "SparkSessionExample") \
    .getOrCreate()

sc = spark.sparkContext  # Note that spark is a SparkSession object

In [133]:
# Choice 
# choice = 1 # single file
#choice = 0 # all files
choice = -1 # test file

# Extract
test_file = "file:///Users/qiuchenzhang/Code/CMU/15619/Ying_Liu_Zhi_Zhu-S20/phase1/twitter/ETL/input/query2_ref.txt"
one_file = "file:///Users/qiuchenzhang/Code/CMU/15619/Ying_Liu_Zhi_Zhu-S20/phase1/twitter/ETL/input//part-r-00000.gz" 
all_files = "gs://cmuccpublicdatasets/twitter/s20/*.gz" 

filename = one_file if choice == 1 else all_files if choice == 0 else test_file
df = spark.read.json(filename)



In [134]:

# Transform
# 1. convert time to timetsamp
# 2. concatenate hashtags with ,
selectDf = df.select(col("id").alias("tid"),
                     col("id_str").alias("tid_str"),
                     unix_timestamp(col("created_at"),
                     "EEE MMM dd HH:mm:ss ZZZZ yyyy").alias("timestamp"), 
                     col("text").alias("content"), 
                     col("in_reply_to_user_id").alias("reply_to_uid"),
                     col("in_reply_to_user_id_str").alias("reply_to_uid_str"), 
                     col("user.id").alias("sender_uid"), 
                     col("user.id_str").alias("sender_uid_str"), 
                     col("user.screen_name").alias("sender_screen_name"),
                     col("user.description").alias("sender_description"),
                     col("retweeted_status.user.id").alias("retweet_to_uid"), 
                     col("retweeted_status.user.id_str").alias("retweet_to_uid_str"),
                     col("retweeted_status.user.screen_name").alias("retweet_to_uid_screen_name"),
                     col("retweeted_status.user.description").alias("retweet_to_uid_description"),
                     concat_ws(",", col("entities.hashtags.text")).alias("hashtags"), 
                     col("lang"))

# filter out malformed tweets and tweets not using specific languages
# 1. Cannot be parsed).alias(a JSON object
# 2. Both id and id_str of the tweet object are missing or null
# 3. Both id and id_str of the user object are missing or null
# 4. created_at is missing or null
# 5. text is missing or null or empty_string
# 6. hashtag array missing or null or of length zero/empty
filterDf = selectDf.filter(col("lang").isin(["ar", "en", "fr", "in", "pt", "es", "tr", "ja"]) &
                           (col("tid").isNotNull() | col("tid_str").isNotNull()) &
                           (col("sender_uid").isNotNull() | col("sender_uid_str").isNotNull()) &
                           col("timestamp").isNotNull() &
                           col("content").isNotNull() &
                           col("hashtags").isNotNull()).where(length(col("hashtags")) > 0)


# combine user_id and user_str 
cleanDf = filterDf.withColumn("tid", when(col("tid_str").isNotNull(), col("tid_str")).otherwise(col("tid"))). \
                   withColumn("sender_uid", when(col("sender_uid_str").isNotNull(), col("sender_uid_str")).otherwise(col("sender_uid"))). \
                   withColumn("reply_to_uid", when(col("reply_to_uid_str").isNotNull(), col("reply_to_uid_str")).otherwise(col("reply_to_uid"))). \
                   withColumn("retweet_to_uid", when(col("retweet_to_uid_str").isNotNull(), col("retweet_to_uid_str")).otherwise(col("retweet_to_uid"))). \
                   drop("tid_str", "sender_uid_str", "reply_to_uid_str", "retweet_to_uid_str")

# tweetDf is for tweet_table
tweetDf = cleanDf.drop("sender_screen_name", "sender_description", "retweet_to_uid_screen_name", "retweet_to_uid_description")
# userDf is for user_table
senderDf = cleanDf.select("sender_uid", "sender_screen_name", "sender_description", "timestamp")

userMidDf = cleanDf.select(col("retweet_to_uid").alias("uid"), 
                           col("retweet_to_uid_screen_name").alias("screen_name"), 
                           col("retweet_to_uid_description").alias("description"), 
                           col("timestamp")). \
                     filter(col("uid").isNotNull()). \
                     union(senderDf)

# find the latest information of each users
# may cost a lot of time
# only execute when running one file
userDf = sc.parallelize(["uid", "screen_name", "description", "lastestTime"]).map(lambda x: (x, )).toDF()
if choice != 0:
    w = Window.partitionBy(col("uid"))
    userDf = userMidDf.withColumn("lastestTime", max("timestamp").over(w)). \
                              filter(col("lastestTime") == col("timestamp")). \
                              drop("timestamp")



In [135]:
# validation
userUnique = userDf.groupBy(col("uid")).count().where(col("count") > 1)

# check if all timestamp are numeric
userMidDf.select(
  "timestamp",
  col("timestamp").cast("long").isNotNull().alias("Value")).where(~col("Value")).show()

print(filterDf.count())
print(userMidDf.count())
print(userDf.count())
print(userUnique.count())

userDf.registerTempTable("userDf")
userMidDf.registerTempTable("userMidDf")
cleanDf.registerTempTable("cleanDf")
spark.sql("select timestamp from userMidDf")
userMidDf.printSchema()

+---------+-----+
|timestamp|Value|
+---------+-----+
+---------+-----+

39092
57585
54060
89
root
 |-- uid: string (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- timestamp: long (nullable = true)



In [136]:
# Load
# Load to intermediate file
# save to intermediate file, which dilimiter should use ???

testOutput = "file:///Users/qiuchenzhang/Code/CMU/15619/Ying_Liu_Zhi_Zhu-S20/phase1/twitter/ETL/output/raw2csv/testoutput/"
oneOutput = "file:///Users/qiuchenzhang/Code/CMU/15619/Ying_Liu_Zhi_Zhu-S20/phase1/twitter/ETL/output/raw2csv/oneOutput/"
allOutput = "file:///Users/qiuchenzhang/Code/CMU/15619/Ying_Liu_Zhi_Zhu-S20/phase1/twitter/ETL/output/raw2csv/allOutput/"

output = oneOutput if choice == 1 else allOutput if choice == 0 else testOutput

tweetDf.write.mode("overwrite").csv(output + "tweetDf", sep="⊢", header=True ) # overwrite old result
selectDf.write.mode("overwrite").csv(output + "selectDf", sep="⊢", header=True ) # overwrite old result
filterDf.write.mode("overwrite").csv(output + "filterDf", sep="⊢", header=True ) # overwrite old result
userMidDf.write.mode("overwrite").csv(output + "userMidDf", sep="⊢", header=True ) # overwrite old result
userDf.write.mode("overwrite").csv(output + "userDf", sep="⊢", header=True ) # overwrite old result
