<img width="200" style="float:left" 
     src="https://upload.wikimedia.org/wikipedia/commons/f/f3/Apache_Spark_logo.svg" />

# Sections
* [Description](#0)
* [1. Setup](#1)
  * [1.1 Start Hadoop](#1.1)  
  * [1.2 Search for Spark Installation](#1.2)
  * [1.3 Create SparkSession](#1.3)
* [2. Lab](#2)
  * [2.1 Check Twitter Files](#2.1)
  * [2.2 Create the DataFrame](#2.3)
  * [2.3 Perform Analytics](#2.3)
* [3. TearDown](#3)
  * [3.1 Stop Hadoop](#3.1)

<a id='0'></a>
## Description
<p>
<div>The goals for this lab are:</div>
<ul>    
    <li>Get familiar with Spark DataFrames API</li>
    <li>Apply some transformations using Spark DataFrames API</li>
</ul>    
</p>

<a id='1'></a>
## 1. Setup

Since we are going to process data stored from HDFS let's start the service

<a id='1.1'></a>
### 1.1 Start Hadoop

Start Hadoop

Open a terminal and execute
```sh
hadoop-start.sh
```

<a id='1.2'></a>
### 1.2 Search for Spark Installation 
This step is required just because we are working in the course environment.

In [1]:
import findspark
findspark.init()

I'm changing pandas max column width property to improve data displaying

In [2]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

<a id='1.3'></a>
### 1.3 Create SparkSession

By setting this environment variable we can include extra libraries in our Spark cluster

In [3]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /opt/hive3/lib/hive-hcatalog-core-3.1.2.jar pyspark-shell'

The first thing always is to create the SparkSession

In [4]:
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
    .appName("Twitter - Analytics - DataFrames")
    .config("spark.sql.warehouse.dir","hdfs://localhost:9000/warehouse")
    .config("spark.sql.legacy.timeParserPolicy","LEGACY")
    .enableHiveSupport()
    .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


<a id='2'></a>
## 2. Lab

<a id='2.1'></a>
### 2.1 Check Twitter Files

In order to complete this lab you need to previosly complete **'Twitter - RAW to STD - DataFrames'**.

Check you have the data ready in HDFS

http://localhost:50070/explorer.html#/datalake/std/twitter/bitcoin/

<a id='2.2'></a>
### 2.2 Create the DataFrame

The first step after creating the SparkSession is to create one or more DataFrames<br/>
The data in the std layer is often stored in advanced storage formats like **parquet** or **delta**.<br/>
These formats have the schema of the data embedded inside the file

In [5]:
tweets = (spark.read
               .parquet("hdfs://localhost:9000/datalake/std/twitter/bitcoin/"))

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

<a id='2.3'></a>
### 2.3 Perform Analytics

**Total number of tweets**<br/>


``` sql
select count(*)
from tweets
``` 

In [6]:
tweets.count()

10381

**Total number of distinct users**<br/>
``` sql
select count(distinct user.id)
from tweets
``` 

In [7]:
tweets.select("user.id").distinct().count()

                                                                                

6878

**Total number of users with geolocation enabled**<br/>
``` sql
select count(distinct user.id)
from tweets
where user.geo_enabled = true
``` 

In [8]:
tweets.where("user.geo_enabled=true").select("user.id").distinct().count()

                                                                                

971

**Total number of tweets per language**<br/>
``` sql
select lang,count(*) as total
from tweets
group by lang
``` 

In [9]:
from pyspark.sql.functions import *

df = (tweets
      .groupBy("lang")
      .agg(count("*").alias("total")))
      
df.toPandas()



Unnamed: 0,lang,total
0,en,10381


**Top 10 users with more tweets posted**<br/>
``` sql
select user.screen_name, max(user.statuses_count) tweets_posted 
from tweets
group by user.screen_name
order by tweets_posted desc
limit 10
```


In [10]:
df = (tweets
          .groupBy("user.screen_name")
          .agg(max("user.statuses_count").alias("tweets_posted"))
          .orderBy(desc("tweets_posted"))
          .limit(10))
df.toPandas()



Unnamed: 0,screen_name,tweets_posted
0,PulpNews,3958214
1,sectest9,2502247
2,AlertTrade,2465699
3,CryptoTraderPro,2359487
4,TraderMacoCosta,1686037
5,Gambiste1,1502833
6,bmurphypointman,1445443
7,xaelbot,1349762
8,codedailybot,1268103
9,bitcoinagile,1201491


**Top 10 users with more followers**<br/>
``` sql
select user.screen_name, max(user.followers_count) follower_count 
from tweets
group by user.screen_name
order by followers_count desc
limit 10
```


In [11]:
df = (tweets
          .groupBy("user.screen_name")
          .agg(max("user.followers_count").alias("followers_count"))
          .orderBy(desc("followers_count"))
          .limit(10))
df.toPandas()

Unnamed: 0,screen_name,followers_count
0,BitcoinMagazine,1663318
1,Cointelegraph,1413819
2,zerohedge,1116343
3,Scaramucci,1015644
4,TheMoonCarl,791445
5,davidgokhshtein,523384
6,Blowjobnl,412416
7,pascalguyon,391717
8,intocryptoverse,343527
9,WClementeIII,338628


**Top 10 users with more mentions**<br/>
``` sql
select lower(user_mention) as user_mention, count(*) as mentions
from tweets lateral view explode(entities.user_mentions.screen_name) u as user_mention
group by lower(user_mention)
order by mentions desc
limit 10
```

In [12]:
df = (tweets
          .select(explode("entities.user_mentions.screen_name").alias("user"))
          .groupBy(lower("user"))
          .agg(count("*").alias("mentions"))
          .orderBy(desc("mentions"))
          .limit(10))
df.toPandas()



Unnamed: 0,lower(user),mentions
0,airdropdet,449
1,binance,358
2,airdropinspect,350
3,airdropstario,325
4,nayibbukele,294
5,elonmusk,226
6,bitcoinmagazine,207
7,themooncarl,154
8,fegtoken,139
9,billym2k,137


**Top 10 more popular hashtags**<br/>
``` sql
select lower(hashtag) as hashtag, count(*) as total
from tweets lateral view explode(entities.hashtags.text) h as hashtag
group by lower(hashtag)
order by total desc
limit 10
```

In [13]:
df = (tweets
      .select(explode("entities.hashtags.text").alias("hashtag"))
      .groupBy("hashtag")
      .agg(count("*").alias("total"))
      .orderBy(desc("total"))
      .limit(10))
      
df.toPandas()

# to normalize (upper & lower case version of the same hashtag)
#.groupBy(lower("hashtag").alias("hashtag"))

Unnamed: 0,hashtag,total
0,Bitcoin,1529
1,BTC,561
2,Airdrop,453
3,bitcoin,428
4,btc,321
5,Crypto,251
6,ETH,242
7,Ethereum,223
8,doge,194
9,SHIB,190


**Top 10 more popular cashtags**<br/>
``` sql
select lower(hashtag) as hashtag, count(*) as total
from tweets lateral view explode(entities.symbols.text) h as hashtag
group by lower(hashtag)
order by total desc
limit 10
```

In [14]:
df = (tweets
    .select(explode("entities.symbols.text").alias("cashtag"))
    .groupBy(upper("cashtag").alias("cashtag"))
    .agg(count("*").alias("total"))\
    .orderBy(desc("total"))
    .limit(10))
    
df.toPandas()

Unnamed: 0,cashtag,total
0,BTC,708
1,SHIB,488
2,ETH,383
3,DOGE,207
4,DINGER,100
5,SOL,91
6,FLOKI,79
7,BNB,78
8,ADA,68
9,XRP,55


**Average number of words per tweet**<br/>
``` sql
select avg(size(split(text, ' '))) as avg_words
from tweets
```

In [15]:
tweets.select(avg(size(split("text", " "))).alias("avg_words")).toPandas()

Unnamed: 0,avg_words
0,18.10683


**Max and average number of hashtags**<br/>
``` sql
select max(size(entities.hashtags)) as max,
	   avg(size(entities.hashtags)) as average
from tweets
```

In [16]:
(tweets.select(
            max(size("entities.hashtags")).alias("max"),
            avg(size("entities.hashtags")).alias("average")
)).toPandas()

Unnamed: 0,max,average
0,15,1.110298


You have to install emojis library <br/>
Open a terminal and execute
```sh
pip3 install emojis
```

**Top 20 more popular emojis**<br/>

```sql
select emoji, count(*) as total
from tweets lateral view explode(get_emojis_udf(text)) e as emoji
group by emoji
order by total desc
limit 20
```

In [17]:
from pyspark.sql.functions import udf

import emojis

@udf("array<string>")
def get_emojis_udf(s):
    set = emojis.get(s)
    return [*set, ]

tweets.select(explode(get_emojis_udf("text")).alias("emoji"))\
      .groupBy("emoji").agg(count("*").alias("total")).orderBy(desc("total")).limit(20)\
      .toPandas()

                                                                                

Unnamed: 0,emoji,total
0,🚀,749
1,🔴,428
2,💲,424
3,🔍,423
4,🏆,346
5,⭐,342
6,🔥,333
7,➕,315
8,💧,299
9,🇸🇻,275


<a id='3'></a>
## 3. Tear Down

Once we complete the the lab we can stop all the services

<a id='3.1'></a>
### 3.1 Stop Hadoop

Stop Hadoop

Open a terminal and execute
```sh
hadoop-stop.sh
```