In [1]:
import os
from pyspark import SparkContext, SQLContext
import pandas as pd

from HDFS.HDFSUtil import HDFSUtil
import spacy

from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

java8_location = '/usr/lib/jvm/java-8-openjdk-amd64'  # Set your own
os.environ['JAVA_HOME'] = java8_location

# Load utils and create Spark Context

In [3]:
nlp = spacy.load("en_core_web_sm")
hdfsUtil = HDFSUtil()
df, df_schema = hdfsUtil.read_file_dataframe("tweets_20-02-2020.csv")

In [4]:
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame(df, df_schema)

# Select column by name

In [5]:
result = df.select(['text']).take(5)

for i in result:
    print(i.text)

@Indounik @muddletoes @MackayIM Checking out  @MackayIM's tweets, yes you’re right, they are to be preferred as a source of information on #coronavirus
#Coronavirus is having unexpected impacts,including #tradewar : hopefully, there won’t be many more. Taiwan bans Italian pig imports in quarrel over flight ban, East Asia News &amp; Top Stories - The Straits Times https://t.co/5TUPaXQlin
... hopa #coronavirus style! 

Today with #Caesar I went to #polyclinic at the entrance we were #green dot, #greenlight to no sick people area, when suddenly Caesar makes his fantastic double #sneeze and #cough!

Hey… https://t.co/iG2cHzEz3t
Omg such a harsh condition.. how to get such a 2 biggest virus at the same time. I cannot imagine how hard it is .. #coronavirus #singapore #wuhan #hubei #china https://t.co/BBAANLmipI
#Coronavirus : South Korean sect identified as hotbed
https://t.co/TWqypZwfXq #SouthKorea


# Split sentence into token using sql function split

In [6]:
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import split

result = df.select(['text']).withColumn("text", split("text", "\s+")).collect()

result

[Row(text=['@Indounik', '@muddletoes', '@MackayIM', 'Checking', 'out', "@MackayIM's", 'tweets,', 'yes', 'you’re', 'right,', 'they', 'are', 'to', 'be', 'preferred', 'as', 'a', 'source', 'of', 'information', 'on', '#coronavirus']),
 Row(text=['#Coronavirus', 'is', 'having', 'unexpected', 'impacts,including', '#tradewar', ':', 'hopefully,', 'there', 'won’t', 'be', 'many', 'more.', 'Taiwan', 'bans', 'Italian', 'pig', 'imports', 'in', 'quarrel', 'over', 'flight', 'ban,', 'East', 'Asia', 'News', '&amp;', 'Top', 'Stories', '-', 'The', 'Straits', 'Times', 'https://t.co/5TUPaXQlin']),
 Row(text=['...', 'hopa', '#coronavirus', 'style!', 'Today', 'with', '#Caesar', 'I', 'went', 'to', '#polyclinic', 'at', 'the', 'entrance', 'we', 'were', '#green', 'dot,', '#greenlight', 'to', 'no', 'sick', 'people', 'area,', 'when', 'suddenly', 'Caesar', 'makes', 'his', 'fantastic', 'double', '#sneeze', 'and', '#cough!', 'Hey…', 'https://t.co/iG2cHzEz3t']),
 Row(text=['Omg', 'such', 'a', 'harsh', 'condition..', 'h

# Create user define function udf (dfu) in spark
User define function have to define return type.

In [26]:
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

def squared(x):
    return float(x * x)

squared_udf = F.udf(lambda y: squared(y), FloatType())

df.select('followers_count', squared_udf('followers_count').alias('followers_count_sqd')).show()

+---------------+-------------------+
|followers_count|followers_count_sqd|
+---------------+-------------------+
|           2720|          7398400.0|
|            645|           416025.0|
|            305|            93025.0|
|            175|            30625.0|
|           1558|          2427364.0|
|             34|             1156.0|
|            281|            78961.0|
|           1088|          1183744.0|
|           1054|          1110916.0|
|            877|           769129.0|
|            390|           152100.0|
|           1137|          1292769.0|
|            119|            14161.0|
|            324|           104976.0|
|             60|             3600.0|
|           7884|        6.2157456E7|
|           2793|          7800849.0|
|            961|           923521.0|
|            546|           298116.0|
|           1171|          1371241.0|
+---------------+-------------------+
only showing top 20 rows



# Extract sentence by sentence and extract tokens
This way of doing is slow.

In [33]:
from pyspark.sql.types import StringType
import pyspark.sql.functions as F
import spacy 

nlp = spacy.load("en_core_web_sm")

def splited(x):    
    doc = nlp(x)
    return [str(token.lemma_) for token in doc if not token.is_stop]

squared_udf = F.udf(lambda y: splited(y), StringType())

result_list = df.select('text', squared_udf('text').alias('token')).collect()
result_list

[Row(text="@Indounik @muddletoes @MackayIM Checking out  @MackayIM's tweets, yes you’re right, they are to be preferred as a source of information on #coronavirus", token='[@Indounik, @muddletoes, @MackayIM, check,  , @MackayIM, tweet, ,, yes, right, ,, prefer, source, information, #, coronavirus]'),
 Row(text='#Coronavirus is having unexpected impacts,including #tradewar : hopefully, there won’t be many more. Taiwan bans Italian pig imports in quarrel over flight ban, East Asia News &amp; Top Stories - The Straits Times https://t.co/5TUPaXQlin', token='[#, Coronavirus, have, unexpected, impact, ,, include, #, tradewar, :, hopefully, ,, will, ., Taiwan, ban, italian, pig, import, quarrel, flight, ban, ,, East, Asia, News, &, amp, ;, Stories, -, Straits, Times, https://t.co/5tupaxqlin]'),
 Row(text='... hopa #coronavirus style! \n\nToday with #Caesar I went to #polyclinic at the entrance we were #green dot, #greenlight to no sick people area, when suddenly Caesar makes his fantastic dou

# Combine all sentence into single doc and do extract once
This is much faster, notice we can adding more function and it is still faster

In [34]:
sample_text = " ".join([text.text for text in df.select(['text']).collect()])
doc = nlp(sample_text)
[str(token.lemma_) for token in doc if not token.is_stop 
 and not token.is_punct 
 and not token.is_space 
 and len(token) >= 3 
 and not token.like_url
 and token.is_alpha]

['check',
 'tweet',
 'yes',
 'right',
 'prefer',
 'source',
 'information',
 'coronavirus',
 'Coronavirus',
 'have',
 'unexpected',
 'impact',
 'include',
 'tradewar',
 'hopefully',
 'Taiwan',
 'ban',
 'italian',
 'pig',
 'import',
 'quarrel',
 'flight',
 'ban',
 'East',
 'Asia',
 'News',
 'amp',
 'Stories',
 'Straits',
 'Times',
 'hopa',
 'coronavirus',
 'style',
 'today',
 'Caesar',
 'go',
 'polyclinic',
 'entrance',
 'green',
 'dot',
 'greenlight',
 'sick',
 'people',
 'area',
 'suddenly',
 'Caesar',
 'make',
 'fantastic',
 'double',
 'sneeze',
 'cough',
 'hey',
 'Omg',
 'harsh',
 'condition',
 'big',
 'virus',
 'time',
 'imagine',
 'hard',
 'coronavirus',
 'singapore',
 'wuhan',
 'hubei',
 'china',
 'Coronavirus',
 'south',
 'korean',
 'sect',
 'identify',
 'hotbed',
 'SouthKorea',
 'prevent',
 'spread',
 'Wuhan',
 'coronavirus',
 'coronavirus',
 'China',
 'WuhanCoronavirus',
 'coronovirus',
 'WuhanPneumonia',
 'Chine',
 'coronarovirus',
 'CoronavirusOutbreak',
 'timeline',
 'chart

# Random sample from our dataframe

Random sample from our dataset will further reduce the time required for computation

In [74]:
max_row = 1000

# extract tokens
sample_text = " ".join(text.text for text in df.select("text").rdd.takeSample(False, max_row, seed=42))
doc = nlp(sample_text)
tokens = [str(token.lemma_) for token in doc if not token.is_stop 
 and not token.is_punct 
 and not token.is_space 
 and len(token) >= 3 
 and not token.like_url
 and token.is_alpha]

# Count tokens
token_dic = {}
token_set = set(tokens)
for t in token_set:
    token_dic[t] = 0

for t in tokens:
    token_dic[t] += 1

token_dic

{'cupboard': 1,
 'Caesar': 2,
 'guard': 1,
 'purpose': 1,
 'DigitalBytes': 1,
 'territory': 2,
 'amidst': 7,
 'greenlight': 1,
 'folk': 2,
 'perfectly': 1,
 'cousin': 1,
 'week': 18,
 'relation': 1,
 'jingle': 1,
 'strain': 2,
 'contagion': 3,
 'sign': 13,
 'Fingers': 1,
 'WallSt': 1,
 'porous': 1,
 'Pharma': 1,
 'Walsh': 1,
 'Recovered': 11,
 'Beijing': 6,
 'traveldeclaration': 1,
 'ABS': 1,
 'trust': 6,
 'visit': 16,
 'Dingtalk': 1,
 'fight': 19,
 'diary': 1,
 'coronavírus': 3,
 'Centre': 3,
 'Resorts': 2,
 'quickly': 3,
 'joaquinphoenix': 1,
 'hoax': 1,
 'Passenger': 1,
 'DuBois': 1,
 'DPMM': 1,
 'Holland': 1,
 'soon': 6,
 'wave': 1,
 'Liu': 1,
 'rmb': 1,
 'storm': 2,
 'Smart': 1,
 'NowThis': 1,
 'mapping': 1,
 'question': 2,
 'hygiene': 2,
 'FutureMap': 1,
 'Sign': 1,
 'chocolate': 1,
 'experiment': 2,
 'activity': 3,
 'Sporting': 1,
 'enemy': 1,
 'safe': 13,
 'Awwsome': 1,
 'disadvantage': 1,
 'dent': 2,
 'Forex': 3,
 'operation': 1,
 'earth': 2,
 'CORONAVIRUS': 2,
 'Film': 1,
 'T