In [1]:
import datetime
import tarfile
import json
import bz2
import re
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf, desc, min, max, to_timestamp, to_date, date_format, col, expr, hour, year, month, dayofweek, count
from pyspark.sql import functions as F #module that includes a variety of functions like to extract features

In [3]:
sc

In [4]:
#viewing dataset as pd df
columns = ["index", "id", "date", "flag", "user", "text"]
pdDf = pd.read_csv("ProjectTweets.csv", header=None, names=columns)

pdDf

Unnamed: 0,index,id,date,flag,user,text
0,0,1467810369,Mon Apr 06 22:19:45 PDT 2009,NO_QUERY,_TheSpecialOne_,"@switchfoot http://twitpic.com/2y1zl - Awww, t..."
1,1,1467810672,Mon Apr 06 22:19:49 PDT 2009,NO_QUERY,scotthamilton,is upset that he can't update his Facebook by ...
2,2,1467810917,Mon Apr 06 22:19:53 PDT 2009,NO_QUERY,mattycus,@Kenichan I dived many times for the ball. Man...
3,3,1467811184,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,ElleCTF,my whole body feels itchy and like its on fire
4,4,1467811193,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,Karoli,"@nationwideclass no, it's not behaving at all...."
...,...,...,...,...,...,...
1599995,1599995,2193601966,Tue Jun 16 08:40:49 PDT 2009,NO_QUERY,AmandaMarie1028,Just woke up. Having no school is the best fee...
1599996,1599996,2193601969,Tue Jun 16 08:40:49 PDT 2009,NO_QUERY,TheWDBoards,TheWDB.com - Very cool to hear old Walt interv...
1599997,1599997,2193601991,Tue Jun 16 08:40:49 PDT 2009,NO_QUERY,bpbabe,Are you ready for your MoJo Makeover? Ask me f...
1599998,1599998,2193602064,Tue Jun 16 08:40:49 PDT 2009,NO_QUERY,tinydiamondz,Happy 38th Birthday to my boo of alll time!!! ...


### Loading csv data file to spark

In [5]:
# Issue with pattern in the DateTimeFormatter when converting to timestamp due to spark version. Setting timeParserPolicy to LEGACY as the error suggested
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY") 

#defining schema
schema = StructType([
    StructField("index", StringType(), True),
    StructField("id", StringType(), True),
    StructField("date", StringType(), True),
    StructField("flag", StringType(), True),
    StructField("user", StringType(), True),
    StructField("text", StringType(), True)
])

tweetsDf= spark.read.csv('hdfs://localhost:9000/user1/ProjectTweets.csv', schema=schema, header=False)

# converting date column to timestamp, pattern taken from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
tweetsDf= tweetsDf.withColumn("date", to_timestamp("date", "EEE MMM dd HH:mm:ss zzz yyyy"))

tweetsDf.printSchema();tweetsDf.show()

root
 |-- index: string (nullable = true)
 |-- id: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+-----+----------+-------------------+--------+---------------+--------------------+
|index|        id|               date|    flag|           user|                text|
+-----+----------+-------------------+--------+---------------+--------------------+
|    0|1467810369|2009-04-07 06:19:45|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|    1|1467810672|2009-04-07 06:19:49|NO_QUERY|  scotthamilton|is upset that he ...|
|    2|1467810917|2009-04-07 06:19:53|NO_QUERY|       mattycus|@Kenichan I dived...|
|    3|1467811184|2009-04-07 06:19:57|NO_QUERY|        ElleCTF|my whole body fee...|
|    4|1467811193|2009-04-07 06:19:57|NO_QUERY|         Karoli|@nationwideclass ...|
|    5|1467811372|2009-04-07 06:20:00|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|    6|1467811592|2009-04-07 06:20:03|NO_QUERY|        mybirch|         Need a hug |
|    7|1467811594|2009-04-07 06:20:03|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|    8|1467811795|2009-04-07 06:20:05|NO_QUERY|2Hood4Hollywood|@T

                                                                                

checking flag column

In [6]:
#extracting "flag" column as RDD, showing unique values with distinct funcion and showing with collect()
tweetsDf.select("flag").rdd.distinct().collect()

                                                                                

[Row(flag='NO_QUERY')]

In [7]:
# counting the number of duplicated IDs, grouping by id and filterring out rows where id count is 1. Then counting those rows
tweetsDf.groupBy("id").count().filter(col("count") > 1).count()

                                                                                

1685

In [8]:
#counting the number of duplicated rows, same as above but addind the remaining features except index because this one would always be different
tweetsDf.groupBy(["id", "date", "flag", "user", "text"]).count().filter(col("count") > 1).count()

                                                                                

1685

In [9]:
#number of rows
tweetsDf.count()

                                                                                

1600000

In [10]:
# dropping rows with duplicate IDs (keeping the first occurrence)
tweetsDf= tweetsDf.dropDuplicates(["id"])

In [11]:
#number of rows
tweetsDf.count()

                                                                                

1598315

Checking number of unique users and users with greatest count of tweets

In [12]:
# number of unique users as RDD, same as done with attribute 'flag'
tweetsDf.select("user").rdd.distinct().count()

                                                                                

659775

In [13]:
#users with greatest count of tweets
# registerring the df as a temporary view
tweetsDf.createOrReplaceTempView("tweets")

# SQL query, selecting user column and adding another column called count showing the counts in descendent order
query = """SELECT user, COUNT(*) as count
    FROM tweets
    GROUP BY user
    ORDER BY count DESC"""

# running the SQL query and showing result
spark.sql(query).show()

                                                                                

+---------------+-----+
|           user|count|
+---------------+-----+
|       lost_dog|  549|
|        webwoke|  345|
|       tweetpet|  310|
|SallytheShizzle|  281|
|    VioletsCRUK|  279|
|    mcraddictal|  276|
|       tsarnick|  248|
|    what_bugs_u|  246|
|    Karen230683|  238|
|      DarkPiano|  236|
|   SongoftheOss|  227|
|      Jayme1988|  225|
|         keza34|  219|
| ramdomthoughts|  216|
|      shanajaca|  213|
|         wowlew|  212|
|     nuttychris|  211|
|   TraceyHewins|  211|
|   thisgoeshere|  207|
|     Spidersamm|  205|
+---------------+-----+
only showing top 20 rows



### DATES

In [14]:
#Ordering rows by date
tweetsDf= tweetsDf.orderBy("date")
tweetsDf.show()



+-----+----------+-------------------+--------+---------------+--------------------+
|index|        id|               date|    flag|           user|                text|
+-----+----------+-------------------+--------+---------------+--------------------+
|    0|1467810369|2009-04-07 06:19:45|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|    1|1467810672|2009-04-07 06:19:49|NO_QUERY|  scotthamilton|is upset that he ...|
|    2|1467810917|2009-04-07 06:19:53|NO_QUERY|       mattycus|@Kenichan I dived...|
|    4|1467811193|2009-04-07 06:19:57|NO_QUERY|         Karoli|@nationwideclass ...|
|    3|1467811184|2009-04-07 06:19:57|NO_QUERY|        ElleCTF|my whole body fee...|
|    5|1467811372|2009-04-07 06:20:00|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|    6|1467811592|2009-04-07 06:20:03|NO_QUERY|        mybirch|         Need a hug |
|    7|1467811594|2009-04-07 06:20:03|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|    8|1467811795|2009-04-07 06:20:05|NO_QUERY|2Hood4Hollywood|@T

                                                                                

In [19]:
# checking if the df is ordered by id
# aggregation on the df and calculation of the min and max values of parameter "id"
#checking if min_id < max_id, returning a df with a single Boolean value
# collect executes the transformations and select access row 0 column 0 from resulting df
tweetsDf.agg(F.min("id").alias("min_id"), F.max("id").alias("max_id")) \
                    .select(F.col("min_id") < F.col("max_id")) \
                    .collect()[0][0]

# True confirms the df is ordered by id

                                                                                

True

In [20]:
# is the df ordered by index? Same code as above
tweetsDf.agg(F.min("index").alias("min_index"), F.max("index").alias("max_index")) \
                    .select(F.col("min_index") < F.col("max_index")) \
                    .collect()[0][0]

                                                                                

True

In [21]:
min_date, max_date = tweetsDf.select(min("date"), max("date")).first()
print('minimum date: ',min_date,'\nmaximum date: ',max_date)

[Stage 51:>                                                         (0 + 1) / 1]

minimum date:  2009-04-07 06:19:45 
maximum date:  2009-06-25 18:28:31


                                                                                

## Text

#### Tweets in English

In [None]:
#!pip install langdetect

In [22]:
#importing library for language detection
from langdetect import detect

In [23]:
def detect_language(tweet):
    try:
        return detect(tweet)
    except:
        return 'unknown'  # Return 'unknown' in case of an error with langdetect

# Register the UDF
detect_language_udf = udf(detect_language)

# Add a new column 'language' to the DataFrame with the detected language
tweetsDf = tweetsDf.withColumn('language', detect_language_udf(col('text')))

In [24]:
nonEnglishTweetsDf = tweetsDf.filter(col('language') != 'en')

In [None]:
tweetsDf.select(col("text"), col("language")).show()

[Stage 57:=>  (1 + 1) / 3][Stage 58:>   (0 + 0) / 2][Stage 59:>   (0 + 0) / 2]

In [None]:
nonEnglishTweetsDf.count()

In [None]:
# Filter out non-English tweets
englishTweetsDf = tweetsDf.filter(col('language') == 'en')

In [None]:
# Show the number of tweets in English
englishTweetsDf.count()

In [None]:
tweetsDf.write.parquet('savedTweetsDf.parquet')

#### more EDA about dates

In [None]:
# from pyspark.sql.window import Window

# # Create a window specification to define the order by 'date'
# windowSpec = Window.orderBy("date")

# adding a column that represents the previous 'date' value
tweetsDf = tweetsDf.withColumn("prev_date", F.lag("date").over(tweetsDf))

# filtering out rows where there is a missing date
missing_dates_df = tweetsDf.filter((F.col("date") - F.col("prev_date") > F.expr("INTERVAL 1 DAY")) | F.col("prev_date").isNull())

missing_dates_df.show()

In [None]:
#spark = SparkSession.builder.appName("MissingDates").getOrCreate()

#generating df with all dates between min_date and max_date
all_dates_df = spark.range((max_date - min_date).days + 1).selectExpr(f"date_add('{min_date}', CAST(id as int)) as all_dates")

# left anti-join to get the missing dates
missing_dates_df = all_dates_df.join(tweetsDf, all_dates_df["all_dates"] == tweetsDf["date"], "leftanti")

# collecting and print the missing dates
missing_dates = missing_dates_df.select(date_format("all_dates", "EEE MMM dd HH:mm:ss zzz yyyy").alias("missing_dates")).collect()
for row in missing_dates:
    print(row["missing_dates"])

In [None]:
tweetsDf.agg(F.min("date")).collect()[0][0]

In [None]:
tweetsDf.agg(F.max("date")).collect()[0][0]

In [None]:
sql_query = """
SELECT *,
       UNIX_TIMESTAMP(date) - LAG(UNIX_TIMESTAMP(date), 1, 0) OVER (ORDER BY date) AS gap_seconds
FROM tweets
ORDER BY gap_seconds DESC
"""

spark.sql(sql_query).show()

In [None]:
gaps_df = spark.sql("""
    SELECT DATE_ADD(d1, 1) AS missing_date
    FROM (
        SELECT CAST(date AS DATE) AS d1, 
               LEAD(CAST(date AS DATE), 1) OVER (ORDER BY date) AS d2
        FROM tweets
    ) temp
    WHERE DATE_ADD(d1, 1) < d2
""")

# Count the number of gaps in days
gaps_df.count()

In [None]:
#SQL with a CTE to find the rows with missing timestamps in between
result_df = spark.sql("""
    WITH Temp AS (
        SELECT *,
               DATE_ADD(CAST(date AS DATE), 1) AS next_date,
               LEAD(CAST(date AS DATE), 1) OVER (ORDER BY date) AS lead_date
        FROM tweets
    )
    SELECT *
    FROM Temp
    WHERE next_date < lead_date
""")

result_df.show(truncate=False)