In [1]:
#Version 0.001
#Including libraries to boot up spark and some configs.

import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import col,from_json
from pyspark.sql.types import StructType,StructField, StringType
sc = SparkContext('local')
spark = SparkSession(sc)

#File load
df = spark.read.json("twitter_small.json")



In [2]:
#Getting all the word count in the tweet text

df.withColumn('word', f.explode(f.split(f.col("full_text"), ' '))) \
  .groupBy('word') \
  .count() \
  .sort('count', ascending=False) \
  .show()

+----+-----+
|word|count|
+----+-----+
|  de| 3114|
|  RT| 2236|
| het| 1845|
| een| 1841|
| van| 1803|
|  en| 1709|
|  in| 1613|
|  is| 1598|
| dat| 1319|
|  je| 1199|
|  op| 1020|
|  ik| 1015|
|niet|  995|
|  te|  993|
|voor|  924|
| met|  908|
|    |  850|
| die|  800|
|zijn|  631|
|  er|  623|
+----+-----+
only showing top 20 rows



In [3]:
#Tokenizer to tokenize full text in the tweet
#Dutch stop words defined in remover and transformed into clean dataframe.

tokenizer2 = Tokenizer(inputCol="full_text", outputCol="words_token")
tokenized = tokenizer2.transform(df).select('full_text','words_token')
remover = StopWordsRemover(stopWords=["aan","af","al","als","bij","dan","dat","die","dit","een","en","er","had","heb","hem","het","hij","hoe","hun","ik","in","is","je","kan","me","men","met","mij","nog","nu","of","ons","ook","te","tot","uit","van","was","wat","we","wel","wij","zal","ze","zei","zij","zo","zou","aangaande","aangezien","achter","achterna","afgelopen","aldaar","aldus","alhoewel","alias","alle","allebei","alleen","alsnog","altijd","altoos","ander","andere","anders","anderszins","behalve","behoudens","beide","beiden","ben","beneden","bent","bepaald","betreffende","binnen","binnenin","boven","bovenal","bovendien","bovengenoemd","bovenstaand","bovenvermeld","buiten","daar","daarheen","daarin","daarna","daarnet","daarom","daarop","daarvanlangs","de","dikwijls","door","doorgaand","dus","echter","eer","eerdat","eerder","eerlang","eerst","elk","elke","enig","enigszins","enkel","erdoor","even","eveneens","evenwel","gauw","gedurende","geen","gehad","gekund","geleden","gelijk","gemoeten","gemogen","geweest","gewoon","gewoonweg","haar","hadden","hare","hebben","hebt","heeft","hen","hierbeneden","hierboven","hoewel","hunne","ikzelf","inmiddels","inzake","jezelf","jij","jijzelf","jou","jouw","jouwe","juist","jullie","klaar","kon","konden","krachtens","kunnen","kunt","later","liever","maar","mag","meer","mezelf","mijn","mijnent","mijner","mijzelf","misschien","mocht","mochten","moest","moesten","moet","moeten","mogen","na","naar","nadat","net","niet","noch","nogal","ofschoon","om","omdat","omhoog","omlaag","omstreeks","omtrent","omver","onder","ondertussen","ongeveer","onszelf","onze","op","opnieuw","opzij","over","overeind","overigens","pas","precies","reeds","rond","rondom","sedert","sinds","sindsdien","slechts","sommige","spoedig","steeds","tamelijk","tenzij","terwijl","thans","tijdens","toch","toen","toenmaals","toenmalig","totdat","tussen","uitgezonderd","vaakwat","vandaan","vanuit","vanwege","veeleer","verder","vervolgens","vol","volgens","voor","vooraf","vooral","vooralsnog","voorbij","voordat","voordezen","voordien","voorheen","voorop","vooruit","vrij","vroeg","waar","waarom","wanneer","want","waren","weer","weg","wegens","weldra","welk","welke","wie","wiens","wier","wijzelf","zelfs","zichzelf","zijn","zijne","zodra","zonder","zouden","zowat","zulke","zullen","zult","rt","ık","deze","u","via","-","      "], inputCol='words_token', outputCol='words_clean')
data_clean = remover.transform(tokenized).select('words_token', 'words_clean')
print('############ Data Cleaning extract:')
data_clean.show()


############ Data Cleaning extract:
+--------------------+--------------------+
|         words_token|         words_clean|
+--------------------+--------------------+
|[rt, @desiree_lav...|[@desiree_laverne...|
|[@wovenwn, @gsbde...|[@wovenwn, @gsbde...|
|[barry, atsma, pe...|[barry, atsma, pe...|
|[rt, @favv_consum...|[@favv_consument:...|
|[@ananninga, @sil...|[@ananninga, @sil...|
|[rt, @ohboywhatas...|[@ohboywhatashot:...|
|[@mariekehoogwout...|[@mariekehoogwout...|
|[rt, @moorkopje2:...|[@moorkopje2:, bi...|
|[rt, @leolewin:, ...|[@leolewin:, cens...|
|[@st1ucia, ı, can...|[@st1ucia, ı, can...|
|[eyeshield, https...|[eyeshield, https...|
|[@sintsonja, @pol...|[@sintsonja, @pol...|
|[rt, @markdeholla...|[@markdehollander...|
|[absoluut, geen, ...|[absoluut, coulan...|
|[rt, @seven__:, n...|[@seven__:, neder...|
|[rt, @choi_bts2:,...|[@choi_bts2:, jin...|
|[rt, @narkobars:,...|[@narkobars:, sho...|
|[@12_klisman, wtf...|[@12_klisman, wtf...|
|[rt, @vrh_haaglan...|[@vrh_haaglanden:.

In [4]:
#Word usage without stop words.

result = data_clean.withColumn('word', f.explode(f.col('words_clean'))) \
  .groupBy('word') \
  .count().sort('count', ascending=False) \

print('############')
result.show(6)



############
+------+-----+
|  word|count|
+------+-----+
|      | 1982|
|  gaat|  241|
|mensen|  226|
|  goed|  209|
| wordt|  203|
|  echt|  192|
+------+-----+
only showing top 6 rows



In [5]:
result.printSchema()

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)



In [6]:
#Twitter file doesnt use timestamp format, instead I used string to date get timestamp

df.select(col("created_at"),to_date(col("created_at"),"MM-dd-yyyy").alias("date")) \
  .show()




+--------------------+----+
|          created_at|date|
+--------------------+----+
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
|Fri May 29 15:38:...|null|
+--------------------+----+
only showing top 20 rows



In [19]:
#VERSION 0.2
onlyDate = df_with_date.select("date")

In [20]:

#version 0.2
onlyDate.printSchema()
result.printSchema()
result.show()

root
 |-- date: date (nullable = true)

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)

+------+-----+
|  word|count|
+------+-----+
|      | 1982|
|  gaat|  241|
|mensen|  226|
|  goed|  209|
| wordt|  203|
|  echt|  192|
|  veel|  177|
|     1|  169|
|  eens|  163|
|  hier|  157|
|  gaan|  150|
|   wil|  143|
|    ın|  141|
|worden|  139|
|  heel|  138|
|nieuwe|  131|
|   omg|  131|
|    ja|  124|
| tegen|  122|
|  iets|  121|
+------+-----+
only showing top 20 rows



In [26]:
from pyspark.sql.functions import lit

onlyDate2 = onlyDate.withColumn('word', lit(None).cast(StringType()))

onlyDate2.show()
onlyDate2.printSchema()

+----------+----+
|      date|word|
+----------+----+
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
+----------+----+
only showing top 20 rows

root
 |-- date: date (nullable = true)
 |-- word: string (nullable = true)



In [32]:
from pyspark.sql.types import StringType,BooleanType,DateType
onlyDate4 = onlyDate2.withColumn("date",col("date").cast(StringType())) \
    .withColumn("word",col("word").cast(StringType()))
onlyDate4.printSchema()

root
 |-- date: string (nullable = true)
 |-- word: string (nullable = true)



In [34]:
onlyDate4.show()

+----------+----+
|      date|word|
+----------+----+
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
+----------+----+
only showing top 20 rows



In [66]:
result.show()

result = result.dropna()
result.show()
print("bundan sonra")
header=result.first()[0]
print(header)
print("bundan önce")
result.filter(~col("word").contains(header)).show(10,False)
#merged_df1=result.unionByName(onlyDate4.withColumn("date",col("date")), allowMissingColumns=False)
#merged_df1.show()

+------+-----+
|  word|count|
+------+-----+
|      | 1982|
|  gaat|  241|
|mensen|  226|
|  goed|  209|
| wordt|  203|
|  echt|  192|
|  veel|  177|
|     1|  169|
|  eens|  163|
|  hier|  157|
|  gaan|  150|
|   wil|  143|
|    ın|  141|
|worden|  139|
|  heel|  138|
|nieuwe|  131|
|   omg|  131|
|    ja|  124|
| tegen|  122|
|  iets|  121|
+------+-----+
only showing top 20 rows

+------+-----+
|  word|count|
+------+-----+
|      | 1982|
|  gaat|  241|
|mensen|  226|
|  goed|  209|
| wordt|  203|
|  echt|  192|
|  veel|  177|
|     1|  169|
|  eens|  163|
|  hier|  157|
|  gaan|  150|
|   wil|  143|
|    ın|  141|
|worden|  139|
|  heel|  138|
|nieuwe|  131|
|   omg|  131|
|    ja|  124|
| tegen|  122|
|  iets|  121|
+------+-----+
only showing top 20 rows

bundan sonra

bundan önce
+----+-----+
|word|count|
+----+-----+
+----+-----+



In [60]:
onlyDate4.show()
onlyDate4.printSchema()
combined= onlyDate4.join(result, onlyDate4.word ==  result.word,"rightouter") \
     .sort('count', ascending=False)\
     .show(10000,truncate=False)
combined.show(30)

+----------+----+
|      date|word|
+----------+----+
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
|2020-05-29|null|
+----------+----+
only showing top 20 rows

root
 |-- date: string (nullable = true)
 |-- word: string (nullable = true)

+----+----+----------------------------------------------------------------------+-----+
|date|word|word                                                                  |count|
+----+----+----------------------------------------------------------------------+-----+
|null|null|                                                                      |1982 |
|null|null|gaat                                                                  |241  |
|null|null|mensen  

AttributeError: 'NoneType' object has no attribute 'show'

+----+----+------+-----+
|date|word|  word|count|
+----+----+------+-----+
|null|null|      | 1982|
|null|null|  gaat|  241|
|null|null|mensen|  226|
|null|null|  goed|  209|
|null|null| wordt|  203|
|null|null|  echt|  192|
|null|null|  veel|  177|
|null|null|     1|  169|
|null|null|  eens|  163|
|null|null|  hier|  157|
|null|null|  gaan|  150|
|null|null|   wil|  143|
|null|null|    ın|  141|
|null|null|worden|  139|
|null|null|  heel|  138|
|null|null|nieuwe|  131|
|null|null|   omg|  131|
|null|null|    ja|  124|
|null|null| tegen|  122|
|null|null|  iets|  121|
+----+----+------+-----+
only showing top 20 rows



In [None]:
result.show()

In [7]:
from datetime import datetime
import pytz
from pyspark.sql.functions import udf, to_date, to_utc_timestamp

In [8]:
def getDate(x):
    if x is not None:
        return str(datetime.strptime(x,'%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo=pytz.UTC).strftime("%Y-%m-%d %H:%M:%S"))
    else:
        return None


In [9]:
date_fn = udf(getDate, StringType())

## Converting datatype in spark dataframe
df = df.withColumn("created_at", to_utc_timestamp(date_fn("created_at"),"UTC")) 

In [15]:
df.select(col("created_at"),to_date(col("created_at"),"MM-dd-yyyy").alias("date")) \
  .show()


+-------------------+----------+
|         created_at|      date|
+-------------------+----------+
|2020-05-29 15:38:52|2020-05-29|
|2020-05-29 15:38:52|2020-05-29|
|2020-05-29 15:38:52|2020-05-29|
|2020-05-29 15:38:52|2020-05-29|
|2020-05-29 15:38:52|2020-05-29|
|2020-05-29 15:38:52|2020-05-29|
|2020-05-29 15:38:52|2020-05-29|
|2020-05-29 15:38:52|2020-05-29|
|2020-05-29 15:38:51|2020-05-29|
|2020-05-29 15:38:51|2020-05-29|
|2020-05-29 15:38:51|2020-05-29|
|2020-05-29 15:38:51|2020-05-29|
|2020-05-29 15:38:51|2020-05-29|
|2020-05-29 15:38:51|2020-05-29|
|2020-05-29 15:38:51|2020-05-29|
|2020-05-29 15:38:51|2020-05-29|
|2020-05-29 15:38:51|2020-05-29|
|2020-05-29 15:38:51|2020-05-29|
|2020-05-29 15:38:51|2020-05-29|
|2020-05-29 15:38:50|2020-05-29|
+-------------------+----------+
only showing top 20 rows



In [None]:
df.withColumn("date", to_date(col("created_at"),"MM-dd-yyyy")).show()

In [11]:
#Created a new dataframe with right timeformat


df_with_date = df.withColumn("date", to_date(col("created_at"),"MM-dd-yyyy"))
df_with_date.show()


+------------+-----------+-------------------+------------------+--------------------+--------------------+--------------+---------+--------------------+----+-------------------+-------------------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---------------+----+------------+--------------------+------------------+--------------------+-------------------+--------------------+-------------+---------+--------------------+--------------------+---------+--------------------+----------+
|contributors|coordinates|         created_at|display_text_range|            entities|   extended_entities|favorite_count|favorited|           full_text| geo|                 id|             id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply_to_user_id_str|is_quote_status|lang|    metadata|               place|possibly_sensitive|       quoted_status|   quoted_status_id|quoted_statu

In [16]:
#total number of tweets in that time slot  10.000 twitter_small. 800.000+ for twitter_big.
df_with_date.printSchema()
df_with_date.withColumn('Date', f.explode(f.split(f.col("date"), ' '))) \
  .show(200)


root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |  

+------------+-----------+-------------------+------------------+--------------------+--------------------+--------------+---------+--------------------+----+-------------------+-------------------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---------------+----+------------+--------------------+------------------+--------------------+-------------------+--------------------+-------------+---------+--------------------+--------------------+---------+--------------------+----------+
|contributors|coordinates|         created_at|display_text_range|            entities|   extended_entities|favorite_count|favorited|           full_text| geo|                 id|             id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply_to_user_id_str|is_quote_status|lang|    metadata|               place|possibly_sensitive|       quoted_status|   quoted_status_id|quoted_statu

In [None]:
#merging two dataframe with date and word count

merged_df = result.unionByName(df_with_date, allowMissingColumns=True)

In [None]:
merged_df.show(truncate=False)



In [None]:
#Checking to see if values still present

merged_df.select(col("word"), col("count")) \
  .show()

In [None]:
#
merged_df.select('word','count')\
  .groupBy('word','count') \
  .count() \
  .sort('count', ascending=False) \
  .show(200000)

In [None]:
#cant get them on the same result
merged_df.withColumn('Date', f.explode(f.split(f.col("date"), ' '))) \
    .withColumn('word', col("word"))\
    .withColumn('count',col("count"))\
    .select(col("word"), col("count"),col("date")) \
    .show()


In [None]:
merged_df.schema