## TFIDF

In [1]:
import string

from pyspark import SparkContext
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, HashingTF, IDF, CountVectorizer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, size, udf
from pyspark.sql.types import IntegerType

# Build (or reuse) the SparkSession first so the application name is applied to
# the context it creates, then take the SparkContext from the session.
# Calling SparkContext() directly fails when a context already exists (for
# example when this cell is re-run), whereas getOrCreate() is safe to repeat.
spark = SparkSession.builder.appName('TFIDF').getOrCreate()
sc = spark.sparkContext

In [2]:
# Read the raw text dump as an RDD of lines.
lines = sc.textFile('data.txt')

# Every line is cut at each occurrence of the two characters b' ; empty pieces
# and pieces that contain a backslash are discarded, and each survivor becomes
# a one-column row.
# NOTE(review): the token listings further down contain a stray 'b' token,
# which suggests some records use a different prefix (perhaps b") and are not
# split here -- confirm against data.txt.
fragments = lines.flatMap(lambda raw: raw.split('b\''))
kept = fragments.filter(lambda piece: piece != '' and '\\' not in piece)
df = kept.map(lambda piece: (piece,)).toDF(['tweet'])

# Register the frame as a temporary view so it can be queried with SQL.
df.createOrReplaceTempView('tweets')
spark.sql('show tables from default').show()

tweets = spark.sql('select * from tweets')
tweets.show(5)

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |   tweets|       true|
+--------+---------+-----------+

+--------------------+
|               tweet|
+--------------------+
|Listening on port...|
|Received request ...|
|If Ashley Purdy l...|
|@jennyhalasz I pr...|
|@Starecrows the o...|
+--------------------+
only showing top 5 rows



In [3]:
# Tokenize each tweet using the regex '\\W' (any non-word character); the
# resulting array of tokens is stored in the 'words' column.
regexTokenizer = RegexTokenizer(inputCol='tweet', outputCol='words', pattern='\\W')

# Kept only so that existing references to `countTokens` keep working; the
# token count below uses the built-in `size` function, which avoids shipping
# every row through a Python UDF.
countTokens = udf(lambda words: len(words), IntegerType())

regexTokenized = regexTokenizer.transform(tweets)

# 'tokens' = number of elements in the 'words' array of each row.
token_count = regexTokenized.withColumn('tokens', size(col('words')))
token_count.show(5)

+--------------------+--------------------+------+
|               tweet|               words|tokens|
+--------------------+--------------------+------+
|Listening on port...|[listening, on, p...|     4|
|Received request ...|[received, reques...|     8|
|If Ashley Purdy l...|[if, ashley, purd...|    22|
|@jennyhalasz I pr...|[jennyhalasz, i, ...|    16|
|@Starecrows the o...|[starecrows, the,...|    14|
+--------------------+--------------------+------+
only showing top 5 rows



In [4]:
# Drop stop words from the 'words' arrays; only the 'words' column is carried
# forward from the tokenized frame, so the original tweet text is not kept.
remover = StopWordsRemover(inputCol='words', outputCol='filtered')
stop_words_removed = remover.transform(token_count.select(['words']))

# 'tokens' now holds the number of tokens left after stop-word removal.
# The built-in `size` replaces the Python UDF that used to be re-declared here.
filtered_df = stop_words_removed.withColumn('tokens', size(col('filtered')))
filtered_df.show(5)

+--------------------+--------------------+------+
|               words|            filtered|tokens|
+--------------------+--------------------+------+
|[listening, on, p...|[listening, port,...|     3|
|[received, reques...|[received, reques...|     7|
|[if, ashley, purd...|[ashley, purdy, l...|    12|
|[jennyhalasz, i, ...|[jennyhalasz, pro...|     8|
|[starecrows, the,...|[starecrows, open...|     6|
+--------------------+--------------------+------+
only showing top 5 rows



In [5]:
from pyspark.sql import functions as f

# Higher-order SQL `filter` over the 'filtered' array: an element is kept
# unless its length is below 3 characters.
keep_long_tokens = f.expr('filter(filtered, x -> not(length(x) < 3))')

cleaned_df = filtered_df.withColumn('cleaned', keep_long_tokens)
cleaned_df.show(5)

+--------------------+--------------------+------+--------------------+
|               words|            filtered|tokens|             cleaned|
+--------------------+--------------------+------+--------------------+
|[listening, on, p...|[listening, port,...|     3|[listening, port,...|
|[received, reques...|[received, reques...|     7|[received, reques...|
|[if, ashley, purd...|[ashley, purdy, l...|    12|[ashley, purdy, l...|
|[jennyhalasz, i, ...|[jennyhalasz, pro...|     8|[jennyhalasz, pro...|
|[starecrows, the,...|[starecrows, open...|     6|[starecrows, open...|
+--------------------+--------------------+------+--------------------+
only showing top 5 rows



In [6]:
# 'cleaned_tokens' = number of tokens remaining in the 'cleaned' array.
# Uses the built-in `size` function rather than declaring the same Python UDF
# a third time.
cleaned_df_with_tokens = cleaned_df.withColumn('cleaned_tokens', size(col('cleaned')))
cleaned_df_with_tokens.show(5)

+--------------------+--------------------+------+--------------------+--------------+
|               words|            filtered|tokens|             cleaned|cleaned_tokens|
+--------------------+--------------------+------+--------------------+--------------+
|[listening, on, p...|[listening, port,...|     3|[listening, port,...|             3|
|[received, reques...|[received, reques...|     7|[received, reques...|             4|
|[if, ashley, purd...|[ashley, purdy, l...|    12|[ashley, purdy, l...|            12|
|[jennyhalasz, i, ...|[jennyhalasz, pro...|     8|[jennyhalasz, pro...|             7|
|[starecrows, the,...|[starecrows, open...|     6|[starecrows, open...|             6|
+--------------------+--------------------+------+--------------------+--------------+
only showing top 5 rows



In [7]:
from pyspark.sql.functions import desc

# Largest rows first, ordered by 'tokens' (the count taken after stop-word
# removal), not by 'cleaned_tokens'.
by_token_count = cleaned_df_with_tokens.orderBy(desc('tokens'))
by_token_count.show(5)

+--------------------+--------------------+------+--------------------+--------------+
|               words|            filtered|tokens|             cleaned|cleaned_tokens|
+--------------------+--------------------+------+--------------------+--------------+
|[rt, rubyperry11,...|[rt, rubyperry11,...|    21|[rubyperry11, guy...|            19|
|[b, rt, espguitar...|[b, rt, espguitar...|    18|[espguitarsusa, l...|            13|
|[b, rt, espguitar...|[b, rt, espguitar...|    18|[espguitarsusa, l...|            13|
|[ad, skeleton, pl...|[ad, skeleton, pl...|    18|[skeleton, playin...|            15|
|[b, jtsom, guitar...|[b, jtsom, guitar...|    18|[jtsom, guitar, d...|            16|
+--------------------+--------------------+------+--------------------+--------------+
only showing top 5 rows



In [8]:
from pyspark.sql.functions import explode, count

# Turn every element of the 'cleaned' array into its own row, then count how
# many rows each distinct word produced. The aggregate column keeps its
# default name, 'count(1)'.
one_word_per_row = cleaned_df_with_tokens.withColumn('cleaned', explode(col('cleaned')))
sum_of_words = one_word_per_row.groupBy('cleaned').agg(count('*'))
sum_of_words.show(5)

+------------+--------+
|     cleaned|count(1)|
+------------+--------+
|       still|       5|
|trustinjonas|       2|
|    received|       1|
|        bone|       1|
|    keyboard|       1|
+------------+--------+
only showing top 5 rows



In [9]:
# Global sum over the numeric column(s) of `sum_of_words`, i.e. the per-word
# counts added together. The result is the total number of token occurrences
# across all rows, not the number of distinct words.
totals_row = sum_of_words.groupBy().sum().first()
num_features = totals_row[0]
print(num_features)

1357


In [10]:
from pyspark.sql.functions import monotonically_increasing_id

# Term frequencies: hash every token of 'cleaned' into a vector of
# `num_features` slots.
# NOTE(review): the Spark docs advise a power of two for numFeatures so that
# hashed terms spread evenly over the columns -- confirm whether the current
# value is intentional.
hashingTF = HashingTF(numFeatures=num_features, inputCol='cleaned', outputCol='rawFeatures')
featurizedData = hashingTF.transform(cleaned_df_with_tokens)

# Fit the inverse-document-frequency model on the term-frequency vectors and
# write the rescaled vectors to the 'idf' column.
idf = IDF(inputCol='rawFeatures', outputCol='idf')
idfModel = idf.fit(featurizedData)
weighted = idfModel.transform(featurizedData)

# Attach a row identifier. Per the Spark docs these ids are unique and
# increasing but not guaranteed to be consecutive.
TFIDFData = weighted.withColumn('id', monotonically_increasing_id())

preview_columns = ['cleaned', 'rawFeatures', 'cleaned_tokens', 'idf', 'id']
TFIDFData.select(*preview_columns).show(5)

+--------------------+--------------------+--------------+--------------------+---+
|             cleaned|         rawFeatures|cleaned_tokens|                 idf| id|
+--------------------+--------------------+--------------+--------------------+---+
|[listening, port,...|(1357,[216,695,12...|             3|(1357,[216,695,12...|  0|
|[received, reques...|(1357,[84,205,901...|             4|(1357,[84,205,901...|  1|
|[ashley, purdy, l...|(1357,[15,103,104...|            12|(1357,[15,103,104...|  2|
|[jennyhalasz, pro...|(1357,[256,453,57...|             7|(1357,[256,453,57...|  3|
|[starecrows, open...|(1357,[113,297,32...|             6|(1357,[113,297,32...|  4|
+--------------------+--------------------+--------------+--------------------+---+
only showing top 5 rows



In [11]:
# Pull the raw term-frequency vector of the row whose generated id equals 2
# back to the driver and show it as a plain dict.
matching = TFIDFData.where(TFIDFData['id'] == 2).select('rawFeatures')
row = matching.collect()
row_data = row[0].asDict()
row_data

{'rawFeatures': SparseVector(1357, {15: 1.0, 103: 1.0, 104: 1.0, 290: 1.0, 358: 1.0, 435: 1.0, 453: 1.0, 625: 1.0, 642: 1.0, 727: 1.0, 868: 1.0, 1174: 1.0})}