# Introduction to text analysis with PySpark using Twitter data

Objective:
_Using the Spark Python API (PySpark) in a Databricks notebook environment to perform some basic text mining._

***************************************************************************************

**PLAN**


1. Data overview
    + Tweet object
    + JSON format
    + Spark DataFrame
    + Databricks table
    
2. Data sourcing
    + (prior) Get Twitter data using the REST API
    + (prior) Aggregate tweet collection as a single JSON file
    + (prior) Upload the JSON source file to a Databricks table 
    + Create dataframe from Databricks table
    
3. Data exploration
   + Show dataframe, print schema
   + Basic SQL queries
   + User tweet frequency bar graph
   + Count tweets containing a given keyword

4. Text preprocessing
   + Tokenization (unigram, bigram, ...)
   + Stop word removal
   + Lemmatization (stemming, synonym expansion)

5. Text analysis
   + Clustering?

***************************************************************************************
**Doc & programming guides**
+ [PySpark doc](https://spark.apache.org/docs/latest/api/python/)
+ [Spark SQL programming guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)
+ [Databricks doc](https://docs.databricks.com/)
+ [Twitter API overview](https://dev.twitter.com/overview/api)

**Tutorials**
+ [Databricks workshop doc](http://training.databricks.com/workshop/sparkcamp.pdf)
+ [Scala crash course](https://lintool.github.io/SparkTutorial/slides/day1_Scala_crash_course.pdf)

***************************************************************************************

## 1. Overview of the relevant data objects and structures

#### [Tweets](https://dev.twitter.com/overview/api/tweets)

+ Fields used in this presentation: `tweet_id`, `user_id` and `text`
+ Delivered natively in `.json` format
+ See the tweet sample for reference

#### JSON format

+ Semi-structured data format
+ [Databricks-specific JSON format requirements](https://docs.databricks.com/spark/latest/data-sources/read-json.html)
+ A Python script merges a collection of `.json` tweet files into a single `.json` file
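
The merge step mentioned in the list can be sketched roughly as follows. This is a minimal illustration, not the script actually used for this presentation: the function name `merge_tweet_files`, the one-tweet-per-file layout, and the mapping from the Twitter fields `id`, `user.id` and `text` to `tweet_id`, `user_id` and `text` are all assumptions. It writes one JSON object per line, which is the layout Spark's JSON reader expects by default.

```python
import glob
import json
import os


def merge_tweet_files(input_dir, output_path):
    """Merge every .json file in input_dir into one line-delimited JSON file.

    Assumes each input file holds a single tweet object (hypothetical layout).
    Returns the number of tweets written.
    """
    # List the inputs first and skip the output file in case it lives in input_dir
    paths = [
        p for p in sorted(glob.glob(os.path.join(input_dir, "*.json")))
        if os.path.abspath(p) != os.path.abspath(output_path)
    ]
    written = 0
    with open(output_path, "w", encoding="utf-8") as out:
        for path in paths:
            with open(path, encoding="utf-8") as src:
                tweet = json.load(src)
            # Keep only the fields used in this presentation (assumed mapping)
            record = {
                "tweet_id": tweet["id"],
                "user_id": tweet["user"]["id"],
                "text": tweet["text"],
            }
            # json.dumps escapes newlines, so each record stays on a single line
            out.write(json.dumps(record) + "\n")
            written += 1
    return written


# Hypothetical usage:
# merge_tweet_files("tweets/", "tweets_merged.json")
```

Flattening the nested `user` object at this stage keeps the resulting table schema limited to the three columns queried later on.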


#### [Spark DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes)

+ A DataFrame is a distributed collection of data organized into named columns
+ Supports (domain-specific) DataFrame Operations and SQL queries


#### [Spark table](https://docs.databricks.com/user-guide/tables.html)

+ Tables are a simple way to make structured data available across an organization
+ Tables are essentially Spark DataFrames that are persisted in the Hive metastore (data warehouse)
+ A user can query tables with Spark SQL or any of Apache Spark’s language APIs
+ In Databricks, tables can be created from local files using the Data Import UI (_> table > Create Table > Local file_)
+ In Databricks, supported source file formats for creating tables include `.json`, `.csv`, `.avro` and `.parquet` 


***************************************************************************************

## 2. Data sourcing

### Priors

+ Get Twitter data using the REST API
+ Aggregate tweet collection as a single JSON file
+ Upload the JSON source file to a Databricks table 

### Create dataframe from Databricks table

In [2]:
# Set table name
table_name = "faam_dataset_v4"

# Option 1: Using a SQL query
tweet_df_sql = sql("SELECT * FROM {}".format(table_name))

# Option 2: Using sqlContext
tweet_df_sqlContext = sqlContext.table(table_name)

# Both options return the same data; keep one DataFrame for the rest of the notebook
tweet_df = tweet_df_sqlContext

# Optional: work on a 20% random sample (without replacement)
#tweet_df = tweet_df.sample(False, 0.2)

# Display schema and data sample
tweet_df.printSchema()
tweet_df.show(10)

### Quick exploration: count entities

In [4]:
# Quick df facts
df_size = tweet_df.count()
num_unique_tweets = tweet_df.select('tweet_id').distinct().count()
num_unique_users = tweet_df.select('user_id').distinct().count()

print("Number of records: {}".format(df_size))
print("Number of unique tweets: {}".format(num_unique_tweets))
print("Number of unique users: {}".format(num_unique_users))

### User tweet frequency distribution

In [6]:
# User tweet frequency distribution
from pyspark.sql.functions import col

user_tweet_count_df = tweet_df.groupBy("user_id").count()

# Retrieve max user tweet frequency
max_count = user_tweet_count_df.agg({"count": "max"}).collect()[0][0]

# Iterate through tweet count buckets (threshold, threshold + step] up to max_count
step = 200
for threshold in range(0, max_count, step):

    # Number of users whose tweet count falls in (threshold, threshold + step];
    # the inclusive upper bound keeps counts that land exactly on a bucket edge
    step_freq = (
        user_tweet_count_df
            .filter(col("count") > threshold)
            .filter(col("count") <= threshold + step)
            .count()
    )

    # Display a bar of dots for the given tweet count bucket
    print("{threshold}{indent}{freqdots}".format(
      threshold=threshold,
      indent=' ' * (5-len(str(threshold))),
      freqdots='.' * step_freq))
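
The loop above runs one Spark job per bucket. As a rough sketch of the same bucketing arithmetic done in a single pass, the plain-Python example below assumes the per-user counts are small enough to collect to the driver. The helper names and the sample counts are hypothetical, and the sketch uses buckets of the form `(lower, lower + step]`.

```python
from collections import Counter


def bucket_lower_bound(count, step):
    """Lower bound of the bucket (lower, lower + step] containing count (count >= 1)."""
    return (count - 1) // step * step


def bucket_histogram(counts, step):
    """Map each bucket's lower bound to the number of values that fall in it."""
    return Counter(bucket_lower_bound(c, step) for c in counts)


# Hypothetical per-user tweet counts, for illustration only
sample_counts = [1, 150, 200, 201, 399, 400, 950]
histogram = bucket_histogram(sample_counts, step=200)

# One row of dots per bucket, including empty buckets
for lower in range(0, max(sample_counts), 200):
    print("{:<5}{}".format(lower, "." * histogram[lower]))
```

With real data, `sample_counts` could be filled from the `count` column of `user_tweet_count_df` after collecting it to the driver.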

### How many tweets contain a given keyword?

#### Define regex for detecting the keyword

In [8]:
import re

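# Define regular expression: match the keyword only when it is not
# surrounded by other lowercase letters (input is lowercased before matching)
substring = 'lol'
regex = "(?<![a-z])({substring})(?![a-z])".format(substring=substring)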

# Test regular expression
test_values = [
  ("lollipop", False),
  ("lol! ", True),
  ("...lol", True),
  ("lmao lol haha", True),
  ("/LoL/", True)
  ]

test_results = []
for test_val in test_values:
    test_result = (re.search(regex, test_val[0].lower()) is not None) ==  test_val[1]
    test_results.append(test_result)

if all(test_results):
    print("SUCCESS: the regex passed ALL the tests")
else:
    failed_test = ', '.join(str(test_index + 1) for test_index, test_res in enumerate(test_results) if not test_res)
    print("FAILED test# {}".format(failed_test))

#### Option 1: Use an SQL query

In [10]:
# Register the DataFrame as a SQL temporary view
tweet_df.createOrReplaceTempView("tweets")

# Query temp view using SQL syntax
sql("SELECT * FROM tweets WHERE LOWER(text) RLIKE '{}'".format(regex)).count()

#### Option 2: Use RDD `filter()`

In [12]:
import re

(
tweet_df
    .select("text")
    .rdd
    .map(lambda row: row[0] or u"")  # extract the text field from each Row (null becomes an empty string)
    .filter(lambda text: re.search(regex, text.lower()) is not None)  # keep records matching the keyword regex
    .count()
)

### Data preparation

We apply the following transformations to the input text data:

+ Clean strings
+ Tokenize (`String -> Array<String>`)
+ Remove stop words
+ Stem words
+ Create bigrams
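
To make the order of these steps concrete, here is a dependency-free sketch that applies them to a single string in plain Python. It only approximates the Spark transformers used in the cells that follow. The stop word list is a tiny made-up sample, the function names are hypothetical, and stemming is left out because it relies on NLTK.

```python
import re

# Tiny illustrative stop word list (an assumption, not Spark's default list)
STOP_WORDS = {"a", "an", "the", "is", "to", "of", "and", "in", "this"}


def clean(text):
    """Lowercase the text, then drop the retweet marker, URLs and punctuation."""
    text = text.lower()
    text = re.sub(r"^rt ", "", text)          # drop a leading retweet marker
    text = re.sub(r"https?://\S+", "", text)  # drop URLs
    text = re.sub(r"[^a-z0-9\s]", "", text)   # drop punctuation and symbols
    return text


def tokenize(text):
    """String -> list of whitespace-separated tokens."""
    return text.split()


def remove_stop_words(tokens):
    return [t for t in tokens if t not in STOP_WORDS]


def bigrams(tokens):
    """Join each pair of consecutive tokens with a space."""
    return [" ".join(pair) for pair in zip(tokens, tokens[1:])]


def prepare(text):
    return bigrams(remove_stop_words(tokenize(clean(text))))


print(prepare("RT Check this out: https://t.co/abc Spark is GREAT!"))
# ['check out', 'out spark', 'spark great']
```

Cleaning runs first so that URLs and punctuation never become tokens, and bigrams are built last so that they only pair tokens that survived the earlier filters.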


#### 1. Clean text string

In [14]:
from pyspark.sql.functions import col, lower, regexp_replace, split

def clean_text(c):
  c = lower(c)
  c = regexp_replace(c, r"^rt ", "")  # drop a leading retweet marker
  c = regexp_replace(c, r"https?://\S+", "")  # drop URLs
  c = regexp_replace(c, r"[^a-zA-Z0-9\s]", "")  # drop punctuation and other symbols
  #c = split(c, r"\s+")  # tokenization is handled in the next step
  return c

clean_text_df = tweet_df.select(clean_text(col("text")).alias("text"))

clean_text_df.printSchema()
clean_text_df.show(10)

#### 2. Tokenize

In [16]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="vector")
vector_df = tokenizer.transform(clean_text_df).select("vector")

vector_df.printSchema()
vector_df.show(10)

#### 3. Remove stop words

In [18]:
from pyspark.ml.feature import StopWordsRemover

# Define a list of stop words or use default list
remover = StopWordsRemover()
stopwords = remover.getStopWords() 

# Display default list
stopwords[:10]

In [19]:
# Specify input/output columns
remover.setInputCol("vector")
remover.setOutputCol("vector_no_stopw")

# Transform existing dataframe with the StopWordsRemover
vector_no_stopw_df = remover.transform(vector_df).select("vector_no_stopw")

# Display
vector_no_stopw_df.printSchema()
vector_no_stopw_df.show()

#### 5. Stem tokens

In [21]:
# Import the Porter stemmer from NLTK
from nltk.stem.porter import PorterStemmer

# Instantiate stemmer object
stemmer = PorterStemmer()

# Quick test of the stemming function
tokens = ["thanks", "its", "proverbially", "unexpected", "running"]
for t in tokens:
  print(stemmer.stem(t))

In [22]:
# Create stemmer python function
def stem(in_vec):
    out_vec = []
    for t in in_vec:
        t_stem = stemmer.stem(t)
        # Keep only stems longer than 2 characters
        if len(t_stem) > 2:
            out_vec.append(t_stem)
    return out_vec

# Create user defined function for stemming with return type Array<String>
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
stemmer_udf = udf(stem, ArrayType(StringType()))

# Create new df with vectors containing the stemmed tokens 
vector_stemmed_df = (
    vector_no_stopw_df
        .withColumn("vector_stemmed", stemmer_udf("vector_no_stopw"))
        .select("vector_stemmed")
  )

# Display
vector_stemmed_df.printSchema()
vector_stemmed_df.show()

#### 6. Create bigrams

In [24]:
from pyspark.ml.feature import NGram

# Define NGram transformer
ngram = NGram(n=2, inputCol="vector_stemmed", outputCol="bigrams")

# Create bigrams_df by applying the NGram transformer to the stemmed tokens
bigrams_df = ngram.transform(vector_stemmed_df)

# Display
bigrams_df.printSchema()
bigrams_df.show()

#### 7. Filter out small/empty vectors

In [26]:
from pyspark.sql.functions import col, size

# Keep rows with at least 2 bigrams, i.e. at least 3 stemmed tokens
production_df = bigrams_df.where(size(col("bigrams")) >= 2)

# Display
production_df.printSchema()
production_df.show()