## Configuration

Uncomment the configuration you need:

In [180]:
APP_NAME = 'Twitter Stat'

# --- Standalone (single-machine) version ---
SPARK_ADDRESS = "local[*]"
TMP_DIR_PATH = '../data/'
# Input and output both live in the same data directory as the temporary files.
RAW_DATA_PATH = TMP_DIR_PATH + 'raw_data.csv'
OUTPUT_PATH = TMP_DIR_PATH + 'USER_SENT_TOPIC.parquet'

# --- Multi-node version: uncomment to override the settings above ---
# import os
# SPARK_ADDRESS = "spark://192.168.13.133:7077"
# HDFS_ADDRESS = "hdfs://192.168.13.133:8020/"
# os.environ["HADOOP_CONF_DIR"] = "/usr/local/hadoop/conf"

# Stage 0: From SQLite to Parquet

Timur, here's the place for your code: get the data from the DB and save it to CSV.

In [2]:
import pandas as pd
import numpy as np
import datetime, time 

Here we use pandas to simplify renaming the columns and handling their types.

In [181]:
# Map the raw export's column names onto the names used throughout this notebook.
upd_columns = {'id_str': 'id', 'screename': 'username', 'created_at': 'date_time',
               'pic': 'picture'}
raw_data = (
    pd.read_csv(RAW_DATA_PATH, encoding='utf-8', parse_dates=['created_at', 'creation'])
    # "Unnamed: 0" is presumably the index pandas wrote when the CSV was saved;
    # drop it if present rather than failing when the export has no such column.
    .drop(columns=["Unnamed: 0"], errors="ignore")
    # Chained rename (no inplace=True) keeps the cell idempotent on re-run.
    .rename(columns=upd_columns)
)

In [182]:
# Persist the cleaned pandas frame as Parquet so Spark can read it back with column types.
tmp_parquet_path = TMP_DIR_PATH + 'tmp_data.parquet'
raw_data.to_parquet(tmp_parquet_path)

# Stage 1: pySpark Initialisation

Required for using PySpark in Jupyter

In [5]:
import findspark
# Make the local Spark installation importable as `pyspark` in this kernel.
findspark.init()

### pySpark imports

In [65]:
import pyspark
from pyspark import SparkConf
# Data Aggregation:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
# Topic extraction
from pyspark.ml.linalg import Vectors, SparseVector, DenseVector
from pyspark.sql import Row
from pyspark.sql.column import _to_java_column, _to_seq, Column
# LDA
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel, IDF, HashingTF
from pyspark.ml.clustering import LDA, DistributedLDAModel
# Clustering
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [8]:
# Create (or reuse) the notebook's Spark session with Hive support enabled.
spark = (
    SparkSession.builder
    .enableHiveSupport()
    .master(SPARK_ADDRESS)
    .appName(APP_NAME)
    .getOrCreate()
)

# Stage 2: Work with Data in pySpark

In [9]:
# Load the Parquet snapshot into Spark and cache it for reuse by later stages.
# '__index_level_0__' is presumably the pandas index stored by to_parquet; it is dropped.
data = (
    spark.read
    .parquet(TMP_DIR_PATH + 'tmp_data.parquet')
    .drop('__index_level_0__')
    .cache()
)

## 2.1 Data modification, features extraction

Calculate aggregated per-user stats from the sentiment scores of each row

In [10]:
# Window spanning all rows of the same user; used for the per-user aggregates below.
wind = Window.partitionBy(col("username"))


def _split_ids(t):
    """Parse a ', '-separated string into a list of integer ids.

    Each entry keeps only the part before its first ' : ', converted to int;
    a 'None' part becomes -1. Missing values (None, the -2 fill sentinel in
    int or string form, or an empty string) yield an empty list.
    """
    # na.fill casts its replacement to the column's type, so on a string column
    # the -2 sentinel arrives here as "-2". The former `t != -2` check never
    # matched it and produced [-2]. Accept both forms.
    if t is None or t in (-2, '-2', ''):
        return []
    ids = []
    for entry in t.split(', '):
        head = entry.split(' : ')[0]
        ids.append(int(head) if head != 'None' else -1)
    return ids


splitting = udf(_split_ids, ArrayType(IntegerType()))

In [11]:
def _sentiment_stats(score_col):
    """Bundle a row's score with per-user aggregates of the same score column.

    Returns a struct column with fields score, var, mean, max, min and cnt.
    The aggregates are computed over the per-user window ``wind``.
    """
    score = col(score_col)
    return struct(
        score.alias('score'),
        variance(score).over(wind).alias("var"),
        mean(score).over(wind).alias("mean"),
        # max/min here are pyspark.sql.functions (star import), not the builtins.
        max(score).over(wind).alias("max"),
        min(score).over(wind).alias("min"),
        count(score).over(wind).alias("cnt"),
    )


data = data.withColumn("id", col("id").cast(LongType()))
# Replace each raw score column with its struct of per-user statistics,
# in the same order as before: Positive, Neutral, Negative.
for _label, _score_col in [("Positive", "positive_score"),
                           ("Neutral", "neutral_score"),
                           ("Negative", "negative_score")]:
    data = data.withColumn(_label, _sentiment_stats(_score_col)).drop(_score_col)

# Fill missing id lists with the -2 sentinel, then parse them into int arrays.
data = (
    data
    .na.fill({'retweets_id': -2, 'likes_id': -2})
    .withColumn('retweeted_by', splitting('retweets_id'))
    .withColumn('liked_by', splitting('likes_id'))
    .drop('retweets_id', 'likes_id')
)

## 2.2 Topic Extraction

In [12]:
import re

import nltk
from nltk.corpus import stopwords

# nltk.corpus loaders are lazy: the import above succeeds even when the
# "stopwords" dataset is not installed. The missing data only surfaces as a
# LookupError on first access, so probe it here and download on demand.
# (A bare `except` around the import could never reach the download.)
try:
    stopwords.words("english")
except LookupError:
    print("Download NLTK data...")
    nltk.download("stopwords")

### Functions

In [13]:
import pymystem3

# Shared Mystem instance used by lemmatize().
m = pymystem3.Mystem()

# Punctuation characters that process_punct() replaces with spaces.
stop = list('.",!?;:-&—') + ["'"]
# English stop words removed after lemmatization in process_text().
stop_words = stopwords.words("english")
# Placeholder tokens emitted by process_text(); they skip lemmatization and cleaning.
service_tags = ["<tag>", "<reply>", "<num>", "<url>", "<smile>", "<sad>"]

def merge_text(raw):
    """Concatenate all of a user's tweets into one space-separated string."""
    separator = ' '
    return separator.join(raw)
# Spark UDF wrapper around merge_text, producing a string column.
merge_text_udf = udf(merge_text, StringType())

def separate(text):
    """Strip non-ASCII characters, then split on single spaces, dropping empty pieces."""
    ascii_only = re.sub(r'[^\x00-\x7f]', r'', text)
    return [piece for piece in ascii_only.split(' ') if piece]
# Spark UDF wrapper around separate, producing an array<string> column.
separate_udf = udf(separate, ArrayType(StringType()))

def lemmatize(word):
    """Return the first element of Mystem's lemmatize() output; service tags pass through."""
    if word not in service_tags:
        return m.lemmatize(word)[0]
    return word

def clean_word(word):
    """Keep purely alphabetic words and service tags; map anything else to ''."""
    keep = word.isalpha() or word in service_tags
    return word if keep else ''
    
def process_punct(text):
    """Replace every entry of the module-level `stop` list in text with a space."""
    result = text
    for punct in stop:
        result = result.replace(punct, ' ')
    return result

def process_text(text):
    """Normalize raw tweet text into a space-joined string of cleaned lemmas.

    Steps: lowercase and flatten newlines; replace bracketed tokens, URLs,
    hashtags, digit runs and runs of '(' / ')' with service tags; turn the
    `stop` punctuation into spaces; lemmatize; drop English stop words; blank
    out tokens that are neither alphabetic nor service tags.

    Blanked tokens leave repeated spaces in the result; separate() discards them.
    """
    text = text.lower().replace("\n", " ")
    text = re.sub(r"\[\S+\]", " <reply>", text)
    text = re.sub(r"https?:\/\/\S+\b|www\.(\w+\.)+\S*", "<url>", text)
    text = re.sub(r"#(\w+)", " <tag>", text)
    # Raw string: "\d" in a plain literal is an invalid escape sequence
    # (DeprecationWarning, SyntaxWarning on Python 3.12+).
    text = re.sub(r"\d+", " <num>", text)
    text = re.sub(r"\(+", " <sad>", text)
    text = re.sub(r"\)+", " <smile>", text)
    text = process_punct(text)
    tokens = [token for token in text.split(" ") if token != '']
    tokens = [lemmatize(x) for x in tokens]
    tokens = [token for token in tokens if token not in stop_words]
    tokens = [clean_word(token) for token in tokens]
    return " ".join(tokens).strip()

# Spark UDF applying process_text to a string column (no lambda wrapper needed).
process_text_udf = udf(process_text, StringType())

### LDA

#### Configuration of LDA

In [14]:
num_topics = 10       # number of LDA topics (k)
max_iterations = 30   # LDA maxIter
vocab_size = 5000     # passed as CountVectorizer vocabSize
min_tf = 5            # passed as CountVectorizer minTF
    
# Output locations for the LDA model, topic table and per-user topic mixtures.
lda_model_path = TMP_DIR_PATH + "topics/lda.model"
lda_topics_path = TMP_DIR_PATH + "topics/lda_topics"
lda_transformed_docs_path = TMP_DIR_PATH + "topics/lda_transformed_docs"

#### Custom functions

In [15]:
def train_lda(df, num_topics, max_iterations):
    """Fit an EM-optimized LDA model (seed 123) on df's "features" column."""
    estimator = LDA(
        k=num_topics,
        seed=123,
        optimizer="em",
        maxIter=max_iterations,
        featuresCol="features",
    )
    return estimator.fit(df)

def write_topics_df(lda_model, cv_model, num_topics, lda_topics_path):
    """Write each LDA topic, with term indices mapped to vocabulary words, as Parquet.

    Every vocabulary term is requested per topic. num_topics is accepted but
    not used. The output at lda_topics_path is overwritten.
    """
    print("Building vocabulary")
    vocabulary = cv_model.vocabulary

    def indices_to_terms(indices):
        return [vocabulary[int(i)] for i in indices]

    terms_udf = udf(indices_to_terms, ArrayType(StringType()))

    described = lda_model.describeTopics(len(vocabulary))
    topics = (
        described
        .withColumn("terms", terms_udf("termIndices"))
        .select("topic", "terms", "termIndices", "termWeights")
    )
    topics.write.parquet(lda_topics_path, mode='overwrite')

def perform_topic_modelling(posts, num_topics, max_iterations,
                            lda_model_path, lda_topics_path, lda_transformed_docs_path,
                            text_col="text", vectorizer_model=None):
    """Train LDA on posts and persist the model, topic table and per-post mixtures.

    Args:
        posts: DataFrame with a "features" vector column (tf-idf in this notebook).
        num_topics: number of LDA topics (k).
        max_iterations: LDA maxIter.
        lda_model_path: where the fitted LDA model is saved (overwritten).
        lda_topics_path: Parquet path for the topic/term table (overwritten).
        lda_transformed_docs_path: Parquet path for the transformed posts plus a
            "main_topic" column (overwritten).
        text_col: unused; kept for backward compatibility.
        vectorizer_model: fitted CountVectorizerModel whose vocabulary maps term
            indices back to words. Defaults to the notebook-global ``cv_model``,
            which earlier versions of this function read implicitly.
    """
    if vectorizer_model is None:
        vectorizer_model = cv_model

    print("Training LDA model")
    lda_model = train_lda(posts, num_topics, max_iterations)
    lda_model.write().overwrite().save(lda_model_path)

    print("Writing topics")
    write_topics_df(lda_model, vectorizer_model, num_topics, lda_topics_path)

    print("Inferring topics mixture for individual posts")
    # Index of the highest-weight entry in each post's topic distribution.
    main_topic_idx_udf = udf(lambda dist: int(np.argmax([float(x) for x in dist])),
                             IntegerType())

    posts_transformed = lda_model.transform(posts)\
        .withColumn("main_topic", main_topic_idx_udf("topicDistribution"))
    posts_transformed.write.parquet(lda_transformed_docs_path, mode='overwrite')

#### Temporary DataFrame for LDA:

In [43]:
# One row per user: all tweets concatenated, normalized and tokenized; cached for reuse.
data4topic = (
    data.select('username', 'text')
    .groupBy('username')
    .agg(collect_list("text").alias('raw_text'))
    .withColumn('all_text', merge_text_udf('raw_text'))
    .withColumn("processed_text", process_text_udf('all_text'))
    .withColumn('user_tokens', separate_udf('processed_text'))
    .cache()
)

#### Model

In [35]:
# Learn the token vocabulary from every user's token list.
cv = CountVectorizer(
    inputCol="user_tokens",
    outputCol="raw_features",
    vocabSize=vocab_size,
    minTF=min_tf,
)
cv_model = cv.fit(data4topic)

### Transformation

In [37]:
# Count-vectorize each user's tokens into the "raw_features" column.
trasformdUserSlicesDf = cv_model.transform(data4topic)

In [38]:
# Empty reference vector, kept so existing references to `a` still resolve;
# filt() no longer compares against it.
a = SparseVector(vocab_size, [])


def filt(vector):
    """Return 1 if the count vector has at least one non-zero entry, else 0.

    CountVectorizerModel sizes its output vectors by the fitted vocabulary,
    which can be smaller than vocab_size. Vector equality also requires equal
    sizes, so comparing with the fixed-size `a` would miss empty documents.
    Counting non-zeros works for any size.
    """
    if vector is None or vector.numNonzeros() == 0:
        return 0
    return 1


www = udf(filt, IntegerType())

In [41]:
# Flag users whose count vector is non-empty (tmp == 1) and keep only those.
trasformdUserSlicesDf = (
    trasformdUserSlicesDf
    .withColumn('tmp', www('raw_features'))
    .filter(col('tmp') == 1)
)

In [42]:
print("Applying tf-idf")
# Re-weight raw term counts by inverse document frequency into "features".
idf = IDF(inputCol="raw_features", outputCol="features", minDocFreq=1)
idf_model = idf.fit(trasformdUserSlicesDf)
# drop('features') is a no-op when the column is absent; presumably it guards
# against a stale "features" column clashing with the IDF output.
without_features = trasformdUserSlicesDf.drop('features')
posts_aggregated = idf_model.transform(without_features)

Applying tf-idf


In [44]:
def as_vector(col):
    """Apply the JVM-side UDF com.example.spark.udfs.udfs.as_vector to a column.

    NOTE(review): this needs a custom JAR providing that class on the Spark
    classpath, and it is not called in the cells shown here; confirm whether
    it is still needed.
    """
    jvm_udf = spark._jvm.com.example.spark.udfs.udfs.as_vector()
    java_args = _to_seq(spark, [col], _to_java_column)
    return Column(jvm_udf.apply(java_args))

### Perform modelling for topics

In [None]:
# Train LDA on the tf-idf features and write the model, topics and per-user mixtures.
perform_topic_modelling(
    posts_aggregated,
    num_topics=num_topics,
    max_iterations=max_iterations,
    lda_model_path=lda_model_path,
    lda_topics_path=lda_topics_path,
    lda_transformed_docs_path=lda_transformed_docs_path,
)

### Join each username with its topic

User → topic DataFrame:

In [62]:
# The topic-model input is no longer needed once its results are on disk.
data4topic.unpersist()
# Attach each user's main topic and that topic's terms.
user_topic = (
    spark.read.parquet(lda_transformed_docs_path)
    # No-op unless the Parquet was written with the old "screename" column name.
    .withColumnRenamed('screename', 'username')
    .join(spark.read.parquet(lda_topics_path), col('main_topic') == col('topic'))
    .select('username', 'topic', 'terms')
    .cache()
)

## 2.3 Clustering

Specification of features for further clustering

In [89]:
# Sentiment struct columns, and the aggregate fields taken from each as clustering features.
sentiments = 'Positive Negative Neutral'.split()
feats = 'mean var'.split()

In [113]:
# Flat feature column names, e.g. 'Positive_mean', in sentiment-major order.
features = ['{}_{}'.format(s, f) for s in sentiments for f in feats]

Temporary DataFrame for clustering

In [149]:
vecAssembler = VectorAssembler(inputCols=features, outputCol="features")
# The sentiment aggregates are per-user window values, so one row per user
# carries them. Flatten the struct fields into the columns named in `features`,
# assemble them, then drop users with any null feature.
data4cluster = (
    vecAssembler
    .transform(
        data.dropDuplicates(['username'])
        .select(*[col(s)[f].alias(s + '_' + f) for s in sentiments for f in feats],
                'username')
    )
    .dropna(subset=features)
)

### K-Means here

In [131]:
# 2 clusters here
kmeans = KMeans(k=15, seed=1)\
    .setFeaturesCol('features')
model = kmeans\
    .fit(data4cluster\
         .dropna(subset=features))

DataFrame with user → cluster assignments

In [150]:
# Cluster id ("prediction") per user; the raw feature columns are dropped.
clusterable = data4cluster.dropna(subset=features)
user_cluster = (
    model.transform(clusterable)
    .drop(*features, 'features')
    .cache()
)

## 2.4 Joining everything together

In [162]:
# Inspect the two per-user tables before joining them on "username".
user_cluster.printSchema()
user_topic.printSchema()

root
 |-- username: string (nullable = true)
 |-- prediction: integer (nullable = false)

root
 |-- username: string (nullable = true)
 |-- topic: integer (nullable = true)
 |-- terms: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [171]:
# One row per user with cluster and topic attached. Outer joins keep users
# missing from either side; their new columns are null.
result = (
    data.dropDuplicates(['username'])
    .join(user_cluster, on='username', how='outer')
    .join(user_topic, on='username', how='outer')
    .cache()
)

In [173]:
# Number of users that have both a cluster and a topic.
result.na.drop(subset=['prediction', 'topic']).count()

594

### Export to parquet

In [179]:
# Write the joined per-user table, replacing any previous export.
result.write.parquet(OUTPUT_PATH, mode="overwrite")

## ////////\\\\\\\\\\\\\\

# Stage 3: Visualisation etc.

## Length of tweets

In [None]:
# Tweet lengths for tweets after 2017-11-07 (presumably after the 280-character limit; see the '280_limit' plot).
_sizes2 = (
    data
    .filter(col("len").isNotNull())
    .filter(col("date_time") > datetime.datetime(2017, 11, 7))
    .select("len")
    .collect()
)

In [None]:
# Tweet lengths for tweets before 2017-11-01 (presumably before the 280-character limit).
_sizes_old = (
    data
    .filter(col("len").isNotNull())
    .filter(col("date_time") < datetime.datetime(2017, 11, 1))
    .select("len")
    .collect()
)

## Histograms

In [None]:
# NOTE(review): plot_Hist is not defined in the cells shown here; presumably it is
# defined or imported elsewhere. TODO confirm before Restart & Run All.
plot_Hist(_sizes_old, color='#00aced', save=True)

In [None]:
# NOTE(review): plot_Hist is not defined in the cells shown here; presumably it is
# defined or imported elsewhere. TODO confirm before Restart & Run All.
plot_Hist(_sizes2, color='#00aced', save=True, title='280_limit')