<img width="200" style="float:left" 
     src="https://upload.wikimedia.org/wikipedia/commons/f/f3/Apache_Spark_logo.svg" />

# Sections
* [Description](#0)
* [1. Setup](#1)
  * [1.1 Start Hadoop](#1.1)  
  * [1.2 Search for Spark Installation](#1.2)
  * [1.3 Create SparkSession](#1.3)
* [2. Lab](#2)
  * [2.1 Check Twitter Files](#2.1)
  * [2.2 Create the DataFrame](#2.2)
  * [2.3 Perform Analytics](#2.3)
* [3. Tear Down](#3)
  * [3.1 Stop Hadoop](#3.1)

<a id='0'></a>
## Description
<p>
<div>The goals for this lab are:</div>
<ul>    
    <li>Get familiar with the Spark DataFrames API</li>
    <li>Apply some transformations using the Spark DataFrames API</li>
</ul>    
</p>

<a id='1'></a>
## 1. Setup

Since we are going to process data stored in HDFS, let's start the service.

<a id='1.1'></a>
### 1.1 Start Hadoop

Start Hadoop

Open a terminal and execute
```sh
hadoop-start.sh
```

<a id='1.2'></a>
### 1.2 Search for Spark Installation 
This step is only required because we are working in the course environment: `findspark.init()` locates the Spark installation and adds PySpark to `sys.path`.

In [11]:
import findspark
findspark.init()

We change the pandas `display.max_colwidth` option to `None` so that long values, such as tweet text, are displayed in full instead of being truncated.

In [12]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

<a id='1.3'></a>
### 1.3 Create SparkSession

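Setting the `PYSPARK_SUBMIT_ARGS` environment variable lets us add extra libraries (JARs) to our Spark application; here we add the Hive HCatalog JAR. It has to be set before the SparkSession is created.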

In [13]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /opt/hive3/lib/hive-hcatalog-core-3.1.2.jar pyspark-shell'
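If more than one extra library is needed, `--jars` takes a single comma-separated list, and PySpark expects the value to end with `pyspark-shell`. A minimal sketch of building the variable from a Python list; the second JAR path is a hypothetical placeholder, not part of the lab:

```python
import os

# The HCatalog JAR is the one used in this lab; the second entry is a
# hypothetical placeholder showing how further JARs would be appended.
extra_jars = [
    "/opt/hive3/lib/hive-hcatalog-core-3.1.2.jar",
    "/path/to/hypothetical-extra.jar",
]

# --jars takes one comma-separated list; "pyspark-shell" goes last.
# This must run before the SparkSession (and its JVM) is started.
submit_args = "--jars " + ",".join(extra_jars) + " pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args
print(submit_args)
```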

The SparkSession is the entry point for the DataFrame API, so we always create it first.

In [18]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Passes a default SparkConf to a builder; the session itself is created in the next cell
SparkSession.builder.config(conf=SparkConf())


In [19]:
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
    .appName("Twitter - Analytics - DataFrames")
    .config("spark.sql.warehouse.dir","hdfs://localhost:9000/warehouse")
    .config("spark.sql.legacy.timeParserPolicy","LEGACY")
    .enableHiveSupport()
    .getOrCreate())


<a id='2'></a>
## 2. Lab

<a id='2.1'></a>
### 2.1 Check Twitter Files

To complete this lab, you must first complete the **'Twitter - RAW to STD - DataFrames'** lab.

Check that the data is ready in HDFS:

http://localhost:50070/explorer.html#/datalake/std/twitter/bitcoin/
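Besides the web explorer, the same directory can be listed programmatically through the WebHDFS REST API (`op=LISTSTATUS`). A minimal sketch, assuming WebHDFS is enabled on the NameNode web port used in the URL above (50070); the helper names are hypothetical, and only the URL-building and parsing steps work without a running cluster:

```python
import json
from urllib.request import urlopen


def liststatus_url(path, host="localhost", port=50070):
    # WebHDFS REST endpoint that lists the contents of a directory
    return f"http://{host}:{port}/webhdfs/v1{path}?op=LISTSTATUS"


def parse_liststatus(payload):
    # Extract (name, type) pairs from a LISTSTATUS JSON response
    statuses = json.loads(payload)["FileStatuses"]["FileStatus"]
    return [(s["pathSuffix"], s["type"]) for s in statuses]


def list_hdfs_dir(path, host="localhost", port=50070):
    # Requires HDFS to be running (see section 1.1)
    with urlopen(liststatus_url(path, host, port)) as resp:
        return parse_liststatus(resp.read())


# Usage, with the cluster running:
# for name, kind in list_hdfs_dir("/datalake/std/twitter/bitcoin/"):
#     print(kind, name)
```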

<a id='2.2'></a>
### 2.2 Create the DataFrame

The first step after creating the SparkSession is to create one or more DataFrames.<br/>
The data in the std layer is usually stored in columnar storage formats such as **parquet** or **delta**.<br/>
These formats keep the schema together with the data, so Spark can read it without us having to declare a schema.

In [15]:
tweets = (spark.read
               .parquet("hdfs://localhost:9000/datalake/std/twitter/xbox/"))


<a id='2.3'></a>
### 2.3 Perform Analytics

**Total number of tweets**<br/>


``` sql
select count(*)
from tweets
``` 

In [16]:
tweets.count()


**Total number of distinct users**<br/>
``` sql
select count(distinct user.id)
from tweets
``` 

In [None]:
tweets.select("user.id").distinct().count()

**Total number of users with geolocation enabled**<br/>
``` sql
select count(distinct user.id)
from tweets
where user.geo_enabled = true
``` 

In [None]:
tweets.where("user.geo_enabled=true").select("user.id").distinct().count()
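A plain-Python sketch (not Spark code) of what the two distinct-user counts compute, on hypothetical sample records containing only the fields used above. One subtlety worth knowing: `distinct()` keeps a single null row, so a null `user.id` would be counted once by the DataFrame code, whereas SQL `count(distinct ...)` and `countDistinct` ignore nulls.

```python
# Hypothetical sample records shaped like the nested tweet rows
# (only the fields used by the queries above).
sample_tweets = [
    {"user": {"id": 1, "geo_enabled": True}},
    {"user": {"id": 1, "geo_enabled": True}},   # same user, second tweet
    {"user": {"id": 2, "geo_enabled": False}},
    {"user": {"id": 3, "geo_enabled": True}},
]

# select("user.id").distinct().count()
distinct_users = len({t["user"]["id"] for t in sample_tweets})

# where("user.geo_enabled=true").select("user.id").distinct().count()
geo_users = len({t["user"]["id"] for t in sample_tweets if t["user"]["geo_enabled"]})

print(distinct_users, geo_users)
```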

**Total number of tweets per language**<br/>
``` sql
select lang,count(*) as total
from tweets
group by lang
``` 

In [None]:
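# Spark SQL functions used in this lab; note that this max shadows Python's built-in max
from pyspark.sql.functions import avg, count, desc, explode, lower, max, size, split, upper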

df = (tweets
      .groupBy("lang")
      .agg(count("*").alias("total")))
      
df.toPandas()

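**Top 10 users with the most tweets posted**<br/>
`user.statuses_count` is the author's total number of tweets (including retweets) as reported in each tweet, so we take its maximum per user instead of counting the rows in this dataset.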
``` sql
select user.screen_name, max(user.statuses_count) tweets_posted 
from tweets
group by user.screen_name
order by tweets_posted desc
limit 10
```


In [None]:
df = (tweets
          .groupBy("user.screen_name")
          .agg(max("user.statuses_count").alias("tweets_posted"))
          .orderBy(desc("tweets_posted"))
          .limit(10))
df.toPandas()
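Why `max` rather than `sum`: every tweet carries a snapshot of its author's profile, so the same user can appear several times with a growing `statuses_count`. A plain-Python sketch on hypothetical rows comparing the two aggregations:

```python
# Hypothetical rows: the same user appears twice with a growing counter.
sample_tweets = [
    {"user": {"screen_name": "alice", "statuses_count": 100}},
    {"user": {"screen_name": "alice", "statuses_count": 102}},
    {"user": {"screen_name": "bob", "statuses_count": 150}},
]

latest = {}   # groupBy(screen_name) + max(statuses_count)
summed = {}   # what sum(statuses_count) would give instead
for t in sample_tweets:
    name, n = t["user"]["screen_name"], t["user"]["statuses_count"]
    if n > latest.get(name, -1):
        latest[name] = n
    summed[name] = summed.get(name, 0) + n

# orderBy(desc("tweets_posted")).limit(10)
top = sorted(latest.items(), key=lambda kv: kv[1], reverse=True)[:10]
print(top)      # bob ranks above alice
print(summed)   # summing snapshots would wrongly put alice first
```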

**Top 10 users with the most followers**<br/>
``` sql
select user.screen_name, max(user.followers_count) followers_count 
from tweets
group by user.screen_name
order by followers_count desc
limit 10
```


In [None]:
df = (tweets
          .groupBy("user.screen_name")
          .agg(max("user.followers_count").alias("followers_count"))
          .orderBy(desc("followers_count"))
          .limit(10))
df.toPandas()

**Top 10 most mentioned users**<br/>
``` sql
select lower(user_mention) as user_mention, count(*) as mentions
from tweets lateral view explode(entities.user_mentions.screen_name) u as user_mention
group by lower(user_mention)
order by mentions desc
limit 10
```

In [None]:
df = (tweets
          .select(explode("entities.user_mentions.screen_name").alias("user_mention"))
          .groupBy(lower("user_mention").alias("user_mention"))
          .agg(count("*").alias("mentions"))
          .orderBy(desc("mentions"))
          .limit(10))
df.toPandas()
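`explode` turns each element of an array into its own row, and a tweet whose array is empty produces no rows at all (`explode_outer` would keep it as a null). A plain-Python sketch of the explode, lowercase and count steps on hypothetical mention arrays:

```python
from collections import Counter

# Hypothetical values of entities.user_mentions.screen_name, one array per tweet.
mention_arrays = [
    ["Bitcoin", "elonmusk"],
    ["bitcoin"],
    [],                      # no mentions: contributes no rows after explode
    ["ElonMusk", "bitcoin"],
]

# explode: one output row per array element
exploded = [name for names in mention_arrays for name in names]

# groupBy(lower(...)) + count("*"), then orderBy(desc(...)) + limit(10)
mentions = Counter(name.lower() for name in exploded)
print(mentions.most_common(10))
```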

**Top 10 most popular hashtags**<br/>
``` sql
select lower(hashtag) as hashtag, count(*) as total
from tweets lateral view explode(entities.hashtags.text) h as hashtag
group by lower(hashtag)
order by total desc
limit 10
```

In [None]:
df = (tweets
      .select(explode("entities.hashtags.text").alias("hashtag"))
      # lower() merges upper- and lower-case versions of the same hashtag, as in the SQL
      .groupBy(lower("hashtag").alias("hashtag"))
      .agg(count("*").alias("total"))
      .orderBy(desc("total"))
      .limit(10))

df.toPandas()

**Top 10 most popular cashtags**<br/>
Cashtags are ticker symbols such as `$BTC`; they are stored in `entities.symbols`.
``` sql
select upper(cashtag) as cashtag, count(*) as total
from tweets lateral view explode(entities.symbols.text) c as cashtag
group by upper(cashtag)
order by total desc
limit 10
```

In [None]:
df = (tweets
    .select(explode("entities.symbols.text").alias("cashtag"))
    .groupBy(upper("cashtag").alias("cashtag"))
    .agg(count("*").alias("total"))
    .orderBy(desc("total"))
    .limit(10))
    
df.toPandas()

**Average number of words per tweet**<br/>
``` sql
select avg(size(split(text, ' '))) as avg_words
from tweets
```

In [None]:
tweets.select(avg(size(split("text", " "))).alias("avg_words")).toPandas()
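`split(text, ' ')` treats every single space as a separator, so consecutive spaces produce empty "words" and newlines are not treated as separators at all. In Spark the second argument of `split` is a regular expression, so `split("text", r"\s+")` would collapse runs of whitespace (a leading space would still yield one empty token). A plain-Python sketch of the difference on hypothetical texts:

```python
from statistics import mean

# Hypothetical tweet texts; the second one contains two consecutive spaces.
texts = ["to the moon", "hodl  forever", "gm"]

# Mirrors split(text, " "): two consecutive spaces yield an empty token between them.
single_space = [len(t.split(" ")) for t in texts]

# str.split() with no argument splits on runs of any whitespace instead.
whitespace = [len(t.split()) for t in texts]

print(mean(single_space), mean(whitespace))
```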

**Max and average number of hashtags per tweet**<br/>
``` sql
select max(size(entities.hashtags)) as max,
	   avg(size(entities.hashtags)) as average
from tweets
```

In [None]:
(tweets.select(
            max(size("entities.hashtags")).alias("max"),
            avg(size("entities.hashtags")).alias("average")
)).toPandas()
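A plain-Python sketch of the same two aggregates on hypothetical hashtag arrays; tweets without hashtags still count toward the average with size 0. On the Spark side, per the Spark 3.x documentation, `size` returns -1 for a null array under default settings (null when `spark.sql.legacy.sizeOfNull` is false or ANSI mode is on), which would skew the average if `entities.hashtags` were ever null.

```python
import builtins
from statistics import mean

# Hypothetical per-tweet values of entities.hashtags (an array of structs).
hashtags_per_tweet = [
    [{"text": "bitcoin"}, {"text": "btc"}],
    [],                  # a tweet without hashtags counts as size 0
    [{"text": "crypto"}],
]

sizes = [len(h) for h in hashtags_per_tweet]   # size(entities.hashtags)

# builtins.max: in the notebook, a bare max refers to pyspark.sql.functions.max
max_hashtags = builtins.max(sizes)
avg_hashtags = mean(sizes)
print(max_hashtags, avg_hashtags)
```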

You need to install the `emojis` library<br/>
Open a terminal and execute
```sh
pip3 install emojis
```

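**Top 20 most popular emojis**<br/>
The SQL version assumes the Python UDF defined below has been registered for Spark SQL, for example with `spark.udf.register("get_emojis_udf", get_emojis_udf)`.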

```sql
select emoji, count(*) as total
from tweets lateral view explode(get_emojis_udf(text)) e as emoji
group by emoji
order by total desc
limit 20
```

In [None]:
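from pyspark.sql.functions import count, desc, explode, udf

import emojis

# Returns the distinct emojis found in a tweet's text.
# emojis.get() returns a set, so each emoji is counted at most once per tweet.
@udf("array<string>")
def get_emojis_udf(s):
    if s is None:  # tweets with null text contribute no emojis
        return []
    return list(emojis.get(s))

df = (tweets
      .select(explode(get_emojis_udf("text")).alias("emoji"))
      .groupBy("emoji")
      .agg(count("*").alias("total"))
      .orderBy(desc("total"))
      .limit(20))
df.toPandas()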
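Because `emojis.get()` returns a set, this query counts how many tweets contain each emoji, not how many times it appears overall. A plain-Python sketch of the difference, using a tiny hypothetical stand-in for `emojis.get()` that only knows two single-code-point emojis; character-by-character scanning like this would miss emojis made of several code points, which is one reason to rely on a library:

```python
from collections import Counter

# Hypothetical stand-in for emojis.get(): the *set* of known emojis in the text.
KNOWN = {"🚀", "🌕"}


def get_emojis(text):
    return {ch for ch in text if ch in KNOWN}


texts = ["🚀🚀🚀 to the 🌕", "🚀 again", "no emojis here"]

# What the lab's query counts: tweets containing each emoji
per_tweet = Counter(e for t in texts for e in get_emojis(t))

# Raw occurrences, for comparison
occurrences = Counter(ch for t in texts for ch in t if ch in KNOWN)

print(per_tweet, occurrences)
```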

In [None]:
# Distinct countries of tweets posted by users with geolocation enabled

df = (tweets
      .where("user.geo_enabled=true")
      .select("place.country")
      .distinct())
df.toPandas()



In [None]:
# Top 10 countries by number of tweets (tweets without a place are excluded)
df = (tweets
          .where("place.country is not null")
          .groupBy("place.country")
          .agg(count("*").alias("total"))
          .orderBy(desc("total"))
          .limit(10))
df.toPandas()

<a id='3'></a>
## 3. Tear Down

Once we have completed the lab, we can stop all the services.

<a id='3.1'></a>
### 3.1 Stop Hadoop

Stop Hadoop

Open a terminal and execute
```sh
hadoop-stop.sh
```