<img width="200" style="float:left" 
     src="https://upload.wikimedia.org/wikipedia/commons/f/f3/Apache_Spark_logo.svg" />

# Sections
* [Description](#0)
* [1. Setup](#1)
  * [1.1 Start Hadoop](#1.1)  
  * [1.2 Search for Spark Installation](#1.2)
  * [1.3 Create SparkSession](#1.3)
* [2. Lab](#2)
  * [2.1 Check Twitter Files](#2.1)
  * [2.2 Create the DataFrame](#2.3)
  * [2.3 Perform Analytics](#2.3)
* [3. TearDown](#3)
  * [3.1 Stop Hadoop](#3.1)

<a id='0'></a>
## Description
<p>
<div>The goals for this lab are:</div>
<ul>    
    <li>Get familiar with Spark DataFrames API</li>
    <li>Apply some transformations using Spark DataFrames API</li>
</ul>    
</p>

<a id='1'></a>
## 1. Setup

Since we are going to process data stored from HDFS let's start the service

<a id='1.1'></a>
### 1.1 Start Hadoop

Start Hadoop

Open a terminal and execute
```sh
hadoop-start.sh
```

<a id='1.2'></a>
### 1.2 Search for Spark Installation 
This step is required just because we are working in the course environment.

In [207]:
import findspark
findspark.init()

I'm changing pandas max column width property to improve data displaying

In [208]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

<a id='1.3'></a>
### 1.3 Create SparkSession

By setting this environment variable we can include extra libraries in our Spark cluster

In [209]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /opt/hive3/lib/hive-hcatalog-core-3.1.2.jar pyspark-shell'

The first thing always is to create the SparkSession

In [210]:
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
    .appName("Twitter - Analytics - DataFrames-Copy2")
    .config("spark.sql.warehouse.dir","hdfs://localhost:9000/warehouse")
    .config("spark.sql.legacy.timeParserPolicy","LEGACY")
    .enableHiveSupport()
    .getOrCreate())

<a id='2'></a>
## 2. Lab

<a id='2.1'></a>
### 2.1 Check Twitter Files

In order to complete this lab you need to previosly complete **'Twitter - RAW to STD - DataFrames'**.

Check you have the data ready in HDFS

http://localhost:50070/explorer.html#/datalake/std/twitter/bitcoin/

<a id='2.2'></a>
### 2.2 Create the DataFrame

The first step after creating the SparkSession is to create one or more DataFrames<br/>
The data in the std layer is often stored in advanced storage formats like **parquet** or **delta**.<br/>
These formats have the schema of the data embedded inside the file

In [211]:
tweets = (spark.read
               .parquet("hdfs://localhost:9000/datalake/std/twitter/Covid19/year=2021"))

In [192]:
total = tweets.count()
total

33407

<a id='2.3'></a>
### 2.3 Perform Analytics

**Total number of tweets**<br/>


``` sql
select count(*)
from tweets
``` 

In [193]:
tweets.count()

33407

In [201]:
df = (tweets
          .select("entities.urls.expanded_url")
          .limit(10)
          )
df.toPandas()

Unnamed: 0,expanded_url
0,[]
1,[]
2,[https://twitter.com/i/web/status/1468898891693277193]
3,[https://twitter.com/i/web/status/1468920444887740427]
4,[https://twitter.com/i/web/status/1468916430246752257]
5,[https://twitter.com/i/web/status/1468931245522444288]
6,[]
7,[https://twitter.com/i/web/status/1468663594652995593]
8,[]
9,[https://twitter.com/i/web/status/1468931246977785858]


**Total number of distinct users**<br/>
``` sql
select count(distinct user.id)
from tweets
``` 

In [142]:
df = (tweets
          .select("*")
          .distinct()
          .orderBy(desc("retweeted_status.retweet_count"))
          .limit(40)
          )
df.toPandas()

                                                                                

Unnamed: 0,created_at,id,id_str,text,source,truncated,in_reply_to_status_id,in_reply_to_status_id_str,in_reply_to_user_id,in_reply_to_user_id_str,...,reply_count,retweet_count,favorite_count,entities,favorited,retweeted,possibly_sensitive,filter_level,lang,dt
0,2021-12-09 16:21:40,1468964067045896198,1468964067045896198,"RT @SamIAm2021MD: I am blown away by this COVID vaccine video, one of the coolest things I have seen in a long time https://t.co/yREIETqtWh","<a href=""http://twitter.com/download/iphone"" rel=""nofollow"">Twitter for iPhone</a>",False,,,,,...,0,0,0,"([(SamIAm2021MD,)], [], [(https://twitter.com/SamIAm2021MD/status/1459051952831189013/video/1,)], [], [])",False,False,False,low,en,2021-12-09
1,2021-12-09 14:27:47,1468935408293711878,1468935408293711878,"RT @SamIAm2021MD: I am blown away by this COVID vaccine video, one of the coolest things I have seen in a long time https://t.co/yREIETqtWh","<a href=""https://mobile.twitter.com"" rel=""nofollow"">Twitter Web App</a>",False,,,,,...,0,0,0,"([(SamIAm2021MD,)], [], [(https://twitter.com/SamIAm2021MD/status/1459051952831189013/video/1,)], [], [])",False,False,False,low,en,2021-12-09
2,2021-12-08 12:57:56,1468550408285179907,1468550408285179907,"RT @SamIAm2021MD: I am blown away by this COVID vaccine video, one of the coolest things I have seen in a long time https://t.co/yREIETqtWh","<a href=""http://twitter.com/download/iphone"" rel=""nofollow"">Twitter for iPhone</a>",False,,,,,...,0,0,0,"([(SamIAm2021MD,)], [], [(https://twitter.com/SamIAm2021MD/status/1459051952831189013/video/1,)], [], [])",False,False,False,low,en,2021-12-08
3,2021-12-09 17:26:34,1468980398965366787,1468980398965366787,RT @TheKaranMenon: The Omnicron variant is just another example of how COVID will be prolonged if rich nations and Big Pharma keep blocking…,"<a href=""http://twitter.com/download/android"" rel=""nofollow"">Twitter for Android</a>",False,,,,,...,0,0,0,"([(TheKaranMenon,)], [], None, [], [])",False,False,,low,en,2021-12-09
4,2021-12-09 14:27:46,1468935403730358279,1468935403730358279,RT @TheKaranMenon: The Omnicron variant is just another example of how COVID will be prolonged if rich nations and Big Pharma keep blocking…,"<a href=""https://mobile.twitter.com"" rel=""nofollow"">Twitter Web App</a>",False,,,,,...,0,0,0,"([(TheKaranMenon,)], [], None, [], [])",False,False,,low,en,2021-12-09
5,2021-12-09 14:21:01,1468933706001883144,1468933706001883144,RT @TheKaranMenon: The Omnicron variant is just another example of how COVID will be prolonged if rich nations and Big Pharma keep blocking…,"<a href=""http://twitter.com/download/iphone"" rel=""nofollow"">Twitter for iPhone</a>",False,,,,,...,0,0,0,"([(TheKaranMenon,)], [], None, [], [])",False,False,,low,en,2021-12-09
6,2021-12-09 12:12:07,1468901266482995201,1468901266482995201,RT @TheKaranMenon: The Omnicron variant is just another example of how COVID will be prolonged if rich nations and Big Pharma keep blocking…,"<a href=""https://mobile.twitter.com"" rel=""nofollow"">Twitter Web App</a>",False,,,,,...,0,0,0,"([(TheKaranMenon,)], [], None, [], [])",False,False,,low,en,2021-12-09
7,2021-12-08 12:56:45,1468550111596847108,1468550111596847108,RT @TheKaranMenon: The Omnicron variant is just another example of how COVID will be prolonged if rich nations and Big Pharma keep blocking…,"<a href=""http://twitter.com/download/android"" rel=""nofollow"">Twitter for Android</a>",False,,,,,...,0,0,0,"([(TheKaranMenon,)], [], None, [], [])",False,False,,low,en,2021-12-08
8,2021-12-09 17:26:29,1468980381223297040,1468980381223297040,"RT @aggonzalez03: can we just reflect on the fact that there are people buying fake vaccination cards so they can travel, but they’re the s…","<a href=""https://mobile.twitter.com"" rel=""nofollow"">Twitter Web App</a>",False,,,,,...,0,0,0,"([(aggonzalez03,)], [], None, [], [])",False,False,,low,en,2021-12-09
9,2021-12-09 17:25:34,1468980149655941120,1468980149655941120,"RT @aggonzalez03: can we just reflect on the fact that there are people buying fake vaccination cards so they can travel, but they’re the s…","<a href=""http://twitter.com/download/iphone"" rel=""nofollow"">Twitter for iPhone</a>",False,,,,,...,0,0,0,"([(aggonzalez03,)], [], None, [], [])",False,False,,low,en,2021-12-09


In [154]:
df = (tweets
          .groupBy("text")
          .agg(max("retweeted_status.favorite_count").alias("favorite_count"))
          .orderBy(desc("favorite_count"))
          .limit(10))
df.toPandas()

Unnamed: 0,text,favorite_count
0,"RT @SamIAm2021MD: I am blown away by this COVID vaccine video, one of the coolest things I have seen in a long time https://t.co/yREIETqtWh",504255
1,RT @ChelsIsRight: Not the Impossible Vaccine,171718
2,RT @TheKaranMenon: The Omnicron variant is just another example of how COVID will be prolonged if rich nations and Big Pharma keep blocking…,118112
3,"RT @aggonzalez03: can we just reflect on the fact that there are people buying fake vaccination cards so they can travel, but they’re the s…",98704
4,RT @abbygov: finance bros are so far removed from reality and like... ethics. i was venting about the US not lifting vaccine patents earlie…,79930
5,"RT @RealCandaceO: Dear @innoutburger and @ChickfilA, \nThank you for serving the unvaccinated AND the vaccinated. \nBusiness without discrim…",75470
6,"RT @NBSaphierMD: I’m not sure who needs to hear this but, opposing vaccine mandates is not equivalent to opposing vaccines.",61020
7,RT @POTUS: The best protection against Omicron is simple: Get fully vaccinated. Get a booster shot.,60990
8,"RT @narendramodi: Every Indian would be proud of today’s record vaccination numbers. \n\nI acknowledge our doctors, innovators, administrator…",55577
9,"RT @GNev2: He just needed to apologise.Instead he gave us more lies, threw his team under a bus,vaccine passports,introduced the potential…",55317


In [176]:
df = (tweets
          .groupBy("retweeted_status.user.url")
          .agg(max("retweeted_status.favorite_count").alias("favorite_count"))
          .orderBy(desc("favorite_count"))
          .limit(10))
df.toPandas()

Unnamed: 0,url,favorite_count
0,,


In [124]:
tweets.select("user.id").distinct().count()

27558

In [203]:
!pip install textblob
!pip install tweepy

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m
  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m


In [206]:
# Import Libraries
from textblob import TextBlob
import sys
import tweepy
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import os
import nltk
#import pycountry
import re
import string
import nltk
nltk.download('vader_lexicon')

#from wordcloud import WordCloud, STOPWORDS
from PIL import Image
from nltk.sentiment.vader import SentimentIntensityAnalyzer
#from langdetect import detect
from nltk.stem import SnowballStemmer
from nltk.sentiment.vader import SentimentIntensityAnalyzer
#from sklearn.feature_extraction.text import CountVectorizer

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/osbdet/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


In [213]:
#Sentiment Analysis
def percentage(part,whole):
     return 100 * float(part)/float(whole)
#keyword = input(Bitcoin)
#noOfTweet = int(input (10))

tweets1 = df.collect()
positive = 0
negative = 0
neutral = 0
polarity = 0
tweet_list = []
neutral_list = []
negative_list = []
positive_list = []

for tweet in tweets1:
 #print(tweet.text)
    tweet_list.append(tweet.text)
    analysis = TextBlob(tweet.text)
    score = SentimentIntensityAnalyzer().polarity_scores(tweet.text)
    neg = score["neg"]
    neu = score["neu"]
    pos = score["pos"]
    comp = score["compound"]
    polarity += analysis.sentiment.polarity
 
    if neg > pos:
        negative_list.append(tweet.text)
        negative += 1
    elif pos > neg:
        positive_list.append(tweet.text)
        positive += 1
    elif pos == neg:
        neutral_list.append(tweet.text)
        neutral += 1

positive = percentage(positive, 1736)
negative = percentage(negative, 1736)
neutral = percentage(neutral, 1736)
polarity = percentage(polarity, 1736)
positive = format(positive, ".1f")
negative = format(negative, '.1f')
neutral = format(neutral, '.1f')

tweet_list = pd.DataFrame(tweet_list)
neutral_list = pd.DataFrame(neutral_list)
negative_list = pd.DataFrame(negative_list)
positive_list = pd.DataFrame(positive_list)
print("total number: ",len(tweet_list))
print("positive number: ",len(positive_list))
print("negative number: ", len(negative_list))
print("neutral number: ",len(neutral_list))

total number:  33407
positive number:  9212
negative number:  9591
neutral number:  14604


**Total number of users with geolocation enabled**<br/>
``` sql
select count(distinct user.id)
from tweets
where user.geo_enabled = true
``` 

In [223]:
tweets.where("user.geo_enabled=true").select("user.id").distinct().count()

8394

In [214]:
! pip install igraph
! pip install networkx
! pip  install karateclub

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m
  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m
  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m


In [215]:
! pip install xgboost

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m


In [216]:
import igraph as ig
import networkx
import json
import csv
import ast
from operator import itemgetter
import re
from karateclub import Graph2Vec
import xgboost

In [234]:
import sys
import os
import re
import tweepy
from tweepy import OAuthHandler
from textblob import TextBlob
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from IPython.display import clear_output
import matplotlib.pyplot as plt 
%matplotlib inline

In [None]:
df = (tweets
          .select("*")
          )

In [241]:
tweet_lst = []
#geoc=":,:,1000mi"
for tweet1 in df.collect():
    tweetDate = tweet1.created_at.date()
    if(tweet1.coordinates != None):
        tweet_lst.append([tweetDate,tweet1.id,tweet1.coordinates['coordinates'][0],tweet1.coordinates['coordinates'][1],tweet1.user.screen_name,tweet1.user.name,tweet.text,tweet1.user.geo_enabled])

tweet_df = pd.DataFrame(tweet_lst, columns=['tweet_dt', 'id', 'lat','long','username', 'name', 'tweet','geo'])
tweet_df

Unnamed: 0,tweet_dt,id,lat,long,username,name,tweet,geo
0,2021-12-09,1468932972342648834,-86.271622,12.148413,jemar_wafo,reyjemar villaber,RT @cricketwyvern: Vaccine passports: the policy that more than any other has demonstrably failed to prevent infection surges in country af…,True
1,2021-12-09,1468933453970386951,-86.271622,12.148413,SamSimon8,SamSimon,RT @cricketwyvern: Vaccine passports: the policy that more than any other has demonstrably failed to prevent infection surges in country af…,True
2,2021-12-09,1468933960927518721,-86.271622,12.148413,SamSimon8,SamSimon,RT @cricketwyvern: Vaccine passports: the policy that more than any other has demonstrably failed to prevent infection surges in country af…,True
3,2021-12-09,1468934153597079553,-86.271622,12.148413,vanit483,VANIT,RT @cricketwyvern: Vaccine passports: the policy that more than any other has demonstrably failed to prevent infection surges in country af…,True
4,2021-12-09,1468934598100930565,-86.271622,12.148413,spyder246,SPYDER,RT @cricketwyvern: Vaccine passports: the policy that more than any other has demonstrably failed to prevent infection surges in country af…,True
5,2021-12-09,1468901801122353155,-92.28801,34.745972,HiPeople_21,HiPeople,RT @cricketwyvern: Vaccine passports: the policy that more than any other has demonstrably failed to prevent infection surges in country af…,True
6,2021-12-08,1468550568478068736,-74.000763,40.720757,JuniousSmithIII,JS3,RT @cricketwyvern: Vaccine passports: the policy that more than any other has demonstrably failed to prevent infection surges in country af…,True


**Total number of tweets per language**<br/>
``` sql
select lang,count(*) as total
from tweets
group by lang
``` 

In [13]:
from pyspark.sql.functions import *

df = (tweets
      .groupBy("lang")
      .agg(count("*").alias("total")))
      
df.toPandas()

Unnamed: 0,lang,total
0,en,26903
1,de,408
2,es,3088
3,zh,115
4,fr,2893


**Top 10 users with more tweets posted**<br/>
``` sql
select user.screen_name, max(user.statuses_count) tweets_posted 
from tweets
group by user.screen_name
order by tweets_posted desc
limit 10
```


In [18]:
df = (tweets
          .groupBy("user.screen_name")
          .agg(max("user.statuses_count").alias("tweets_posted"))
          .orderBy(desc("tweets_posted"))
          .limit(10))
df.toPandas()

Unnamed: 0,screen_name,tweets_posted
0,CaraotaDigital,6826888
1,la_patilla,6053684
2,TomthunkitsMind,3069786
3,sectest9,2525955
4,filafresh,2425011
5,eazeee2004,1867318
6,robinsnewswire,1852335
7,stephenoflyf,1850502
8,chidambara09,1763815
9,Afropages,1722637


**Top 10 users with more followers**<br/>
``` sql
select user.screen_name, max(user.followers_count) follower_count 
from tweets
group by user.screen_name
order by followers_count desc
limit 10
```


In [20]:
df = (tweets
          .groupBy("user.screen_name")
          .agg(max("user.followers_count").alias("followers_count"))
          .orderBy(desc("followers_count"))
          .limit(10))
df.toPandas()

Unnamed: 0,screen_name,followers_count
0,washingtonpost,18516190
1,ndtv,16423882
2,BBCNews,12955319
3,el_pais,8218374
4,business,7742074
5,la_patilla,7216411
6,hrw,4817772
7,FraseSimple,4467796
8,CNBC,4422445
9,TODAYshow,4202788


**Top 10 users with more mentions**<br/>
``` sql
select lower(user_mention) as user_mention, count(*) as mentions
from tweets lateral view explode(entities.user_mentions.screen_name) u as user_mention
group by lower(user_mention)
order by mentions desc
limit 10
```

In [21]:
df = (tweets
          .select(explode("entities.user_mentions.screen_name").alias("user"))
          .groupBy(lower("user"))
          .agg(count("*").alias("mentions"))
          .orderBy(desc("mentions"))
          .limit(10))
df.toPandas()

Unnamed: 0,lower(user),mentions
0,essexpr,469
1,josplaysenpai,381
2,rwmalonemd,279
3,spectatorindex,261
4,gnev2,243
5,disclosetv,213
6,dehennadavison,209
7,prisonplanet,194
8,chelsisright,191
9,simonjamesjupp,191


**Top 10 more popular hashtags**<br/>
``` sql
select lower(hashtag) as hashtag, count(*) as total
from tweets lateral view explode(entities.hashtags.text) h as hashtag
group by lower(hashtag)
order by total desc
limit 10
```

In [43]:
df = (tweets
      .select(explode("entities.hashtags.text").alias("hashtag"))
      .agg(count("*").alias("total"))
      .orderBy(desc("total"))
      .limit(10))
      
df.toPandas()

# to normalize (upper & lower case version of the same hashtag)
#.groupBy(lower("hashtag").alias("hashtag"))

Unnamed: 0,col
0,Munich
1,Covid19
2,plague
3,Government
4,AbogadoPorLaVerdad
...,...
4532,research
4533,commercialization
4534,DirectAN
4535,QAG


**Top 10 more popular cashtags**<br/>
``` sql
select lower(hashtag) as hashtag, count(*) as total
from tweets lateral view explode(entities.symbols.text) h as hashtag
group by lower(hashtag)
order by total desc
limit 10
```

In [24]:
df = (tweets
    .select(explode("entities.symbols.text").alias("cashtag"))
    .groupBy(upper("cashtag").alias("cashtag"))
    .agg(count("*").alias("total"))\
    .orderBy(desc("total"))
    .limit(10))
    
df.toPandas()

Unnamed: 0,cashtag,total
0,PFE,89
1,BNTX,24
2,NVAX,10
3,SPY,5
4,OCGN,4
5,ES_F,3
6,SPX,3
7,BTNX,2
8,QQQ,2
9,NRXP,2


**Average number of words per tweet**<br/>
``` sql
select avg(size(split(text, ' '))) as avg_words
from tweets
```

In [53]:
tweets.select(avg(size(split("text", " "))).alias("avg_words")).toPandas()

Unnamed: 0,avg_words
0,18.752597


**Max and average number of hashtags**<br/>
``` sql
select max(size(entities.hashtags)) as max,
	   avg(size(entities.hashtags)) as average
from tweets
```

In [55]:
(tweets.select(
            max(size("entities.hashtags")).alias("max"),
            avg(size("entities.hashtags")).alias("average")
)).toPandas()

Unnamed: 0,max,average
0,11,0.13581


You have to install emojis library <br/>
Open a terminal and execute
```sh
pip3 install emojis
```

**Top 20 more popular emojis**<br/>

```sql
select emoji, count(*) as total
from tweets lateral view explode(get_emojis_udf(text)) e as emoji
group by emoji
order by total desc
limit 20
```

In [54]:
from pyspark.sql.functions import udf

import emojis

@udf("array<string>")
def get_emojis_udf(s):
    set = emojis.get(s)
    return [*set, ]

tweets.select(explode(get_emojis_udf("text")).alias("emoji"))\
      .groupBy("emoji").agg(count("*").alias("total")).orderBy(desc("total")).limit(20)\
      .toPandas()

                                                                                

Unnamed: 0,emoji,total
0,🚨,216
1,💉,207
2,👇,146
3,😂,104
4,💪,99
5,👏,82
6,🤔,74
7,🔥,69
8,🤡,67
9,😡,64


<a id='3'></a>
## 3. Tear Down

Once we complete the the lab we can stop all the services

<a id='3.1'></a>
### 3.1 Stop Hadoop

Stop Hadoop

Open a terminal and execute
```sh
hadoop-stop.sh
```