In [19]:
import datetime
import tarfile
import json
import bz2

## Filtering tweets from monthly .tar files

In [20]:
#keywords to filter the tweets
keywords=['gmo','gmos','gm food','gm foods','transgenic', 'transgenics','genetically-modified','genetically modified']

# initializing array which will hold the filtered tweets
filteredTweets=[]


# opening the tar file
with tarfile.open('data/TweetDS.tar', 'r') as tar:
    
    # iterating through all files and directories
    for member in tar.getmembers():
        
        #skipping directories
        if member.isdir():
            continue
        
        #looking for the target files json.bz2
        if member.name.endswith('.json.bz2'):
            
            # extracting each file
            file = tar.extractfile(member)
         
            #reading in and decompressing the file
            noBz2 = bz2.decompress(file.read()).decode('utf-8')
            
            # handling each tweet to find the match
            for line in noBz2.splitlines():
                
                #each line is one tweet in json format, therefore loading line by line
                tweet = json.loads(line)

                # making sure the tweet has the fields 'text' and 'created_at'. If not they'll be skipped
                if 'text' not in tweet or 'created_at' not in tweet or tweet['user']['lang'] != 'en':
                    continue

                # Skipping retweets
                if tweet['text'].startswith('RT') and tweet.get('retweeted_status') is not None:
                    continue
                
                # storing fields if present
                tweetTxt = tweet['text']
                created_at = tweet['created_at']

                # Searching matches in text, splitting by words, lower case like keywords
                if any(keyword in tweetTxt.lower().split() for keyword in keywords):                
                    # Add the matching tweet to the list
                    filteredTweets.append({
                        'text': tweetTxt,
                        'created_at': created_at
                    })
                    

                    
#writting the fields from matching tweets to a json output file
with open('myTweets.json', 'w') as output:
    json.dump(filteredTweets, output)


In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType

In [2]:
sc

### Loading data files to pyspark

In [21]:
#funtion to convert to time stamp with udf
def parseDate(date_str):
    return datetime.datetime.strptime(date_str, '%a %b %d %H:%M:%S %z %Y')

parse_date_udf= udf(parseDate, TimestampType())

#reading in the monthly json files
tweets= spark.read.json('hdfs://localhost:9000/user1/tweet*.json')

# applying udf to column
tweets= tweets.withColumn('created_at', parse_date_udf(tweets['created_at']))

#sorting by timestamp
tweets= tweets.orderBy('created_at')

tweets.printSchema();tweets.show()

root
 |-- created_at: timestamp (nullable = true)
 |-- text: string (nullable = true)

+-------------------+--------------------+
|         created_at|                text|
+-------------------+--------------------+
|2012-01-02 20:09:42|USDA has settled ...|
|2012-01-03 02:41:36|Sounds INSANE. GM...|
|2012-01-03 06:57:08|Petition: Tell th...|
|2012-01-03 12:30:19|Mandatory GMO foo...|
|2012-01-03 14:11:44|http://t.co/TI9oL...|
|2012-01-03 16:10:27|Want to learn mor...|
|2012-01-03 17:53:55|Safety assessment...|
|2012-01-03 18:30:38|GMO labeling effo...|
|2012-01-03 20:04:57|Leaked: US to Sta...|
|2012-01-04 00:22:12|Leaked: US to Sta...|
|2012-01-04 13:37:46|@IPIMCIMCIM  GMOs...|
|2012-01-05 02:42:31|@Kaybaebaeeee gmo...|
|2012-01-05 06:10:28|Apel ws. legislac...|
|2012-01-05 09:36:54|Woman files class...|
|2012-01-05 16:14:15|Leaked: US to Sta...|
|2012-01-05 17:42:20|@harjxo GMO twidd...|
|2012-01-05 19:42:18|@songbirdtiff Yup...|
|2012-01-05 21:07:46|Label GMO food ca...|
|2012-01-0

In [22]:
# number of rows
tweets.count()

3205

In [23]:
#sample
tweets.take(1)

[Row(created_at=datetime.datetime(2012, 1, 2, 20, 9, 42), text='USDA has settled upon a superb solution: let the GMO industry conduct its own environmental impact tests. http://t.co/Ed5BbDRb')]

### Dates without tweets

In [42]:
from pyspark.sql.functions import min, max, date_format, expr, hour
from pyspark.sql import functions as F
from pyspark.sql.types import DateType

In [50]:

# oldest and latest dates in df
oldestDate= tweets.select(min('created_at')).first()[0].date()
latestDate= tweets.select(max('created_at')).first()[0].date()

#converting dates to Unix timestamps -> int() cannot be datetime.date
oldestDate= int(min_date.strftime("%s"))
latestDate= int(max_date.strftime("%s"))


#generating the list of dates. 86400 s/day
timeRange= spark.range(start=oldestDate, end=latestDate, step=86400).selectExpr("to_date(from_unixtime(id)) as date")



#Listing missing dates with anti-join between the date range and the 'created_at' column. 
missingDates= timeRange.join(tweets, timeRange.date == tweets.created_at.cast(DateType()), 'leftanti') # leftanti selects the rows from the left df that do not have a matching key from the right df


print("Count of missing dates:",missingDates.count());missingDates.show()

Count of missing dates: 31
+----------+
|      date|
+----------+
|2012-02-05|
|2012-03-02|
|2012-03-03|
|2012-03-04|
|2012-03-05|
|2012-03-06|
|2012-03-07|
|2012-03-08|
|2012-03-09|
|2012-03-10|
|2012-03-11|
|2012-03-12|
|2012-03-13|
|2012-03-14|
|2012-03-15|
|2012-03-16|
|2012-03-17|
|2012-03-18|
|2012-03-19|
|2012-03-20|
+----------+
only showing top 20 rows



#### Checking full hours per day without tweets

In [168]:
from pyspark.sql.functions import collect_list

In [173]:
minDate= tweets.selectExpr("min(created_at)").first()[0]
maxDate= tweets.selectExpr("max(created_at)").first()[0]


minTimestamp= int(minDate.timestamp())
maxTimestamp= int(maxDate.timestamp())

allHours= spark.range(minTimestamp, maxTimestamp, 3600).select(expr("timestamp_seconds(id)").alias('hour'))




tweetsWithHour= tweets.withColumn('hour_of_day', hour('created_at'))


tweetHours= tweetsWithHour.groupBy('hour_of_day').agg(collect_list('hour_of_day').alias('hours'))

allHours.join(tweetHours, allHours.hour == tweetHours.hour_of_day, 'left')


AnalysisException: cannot resolve '(hour = hour_of_day)' due to data type mismatch: differing types in '(hour = hour_of_day)' (timestamp and int).;
'Join LeftOuter, (hour#1371 = hour_of_day#1373)
:- Project [timestamp_seconds(id#1369L) AS hour#1371]
:  +- Range (1325534982, 1356574202, step=3600, splits=Some(1))
+- Aggregate [hour_of_day#1373], [hour_of_day#1373, collect_list(hour_of_day#1373, 0, 0) AS hours#1385]
   +- Project [created_at#238, text#234, day_of_week#840, month#804, hour(created_at#238, Some(Europe/Dublin)) AS hour_of_day#1373]
      +- Project [created_at#238, text#234, dayofweek(cast(created_at#238 as date)) AS day_of_week#840, month#804, hour_of_day#821]
         +- Project [created_at#238, text#234, day_of_week#789, month#804, hour(created_at#238, Some(Europe/Dublin)) AS hour_of_day#821]
            +- Project [created_at#238, text#234, day_of_week#789, month(cast(created_at#238 as date)) AS month#804]
               +- Project [created_at#238, text#234, dayofweek(cast(created_at#238 as date)) AS day_of_week#789]
                  +- Sort [created_at#238 ASC NULLS FIRST], true
                     +- Project [parseDate(created_at#233) AS created_at#238, text#234]
                        +- Relation [created_at#233,text#234] json


In [192]:
import pandas as pd
import numpy as np
import seaborn as sns
import re
import string
from string import punctuation
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')


import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Dropout
from tensorflow.keras.callbacks import EarlyStopping

[nltk_data] Downloading package stopwords to /home/hduser/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
2023-05-22 23:15:39.313115: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-05-22 23:15:39.485692: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-05-22 23:15:39.491966: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [191]:
!pip install nltk

Defaulting to user installation because normal site-packages is not writeable
Collecting nltk
  Downloading nltk-3.8.1-py3-none-any.whl (1.5 MB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m282.0 kB/s[0m eta [36m0:00:00[0m[36m0:00:01[0m[36m0:00:01[0m:01[0m
Collecting regex>=2021.8.3
  Downloading regex-2023.5.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (769 kB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m769.7/769.7 KB[0m [31m74.0 kB/s[0m eta [36m0:00:00[0m31m73.3 kB/s[0m eta [36m0:00:01[0m
Installing collected packages: regex, nltk
Successfully installed nltk-3.8.1 regex-2023.5.5
