<a href="https://colab.research.google.com/github/anedun2/Analytics-in-Big-Data/blob/master/anedun2_final_text_count_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [0]:
!wget -q http://apache.mirrors.pair.com/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz

In [0]:
!ls

In [0]:
!tar -xvf spark-2.4.4-bin-hadoop2.7.tgz

In [0]:
!ls 

drive	     spark-2.4.4-bin-hadoop2.7	    spark-2.4.4-bin-hadoop2.7.tgz.1
sample_data  spark-2.4.4-bin-hadoop2.7.tgz  spark-2.4.4-bin-hadoop2.7.tgz.2


In [0]:
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
df = spark.createDataFrame([{"Google": "Colab","Spark": "Scala"} ,{"Google": "Dataproc","Spark":"Python"}])
df.show()

+--------+------+
|  Google| Spark|
+--------+------+
|   Colab| Scala|
|Dataproc|Python|
+--------+------+





In [0]:
import string # to remove punctuation
import re
import pandas as pd
import os
import itertools
from collections import Counter
from pyspark.sql.functions import *
from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import udf, col


# Step 1: Load data

In [0]:
#mounting google drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [0]:
#importing file from local - optional
from google.colab import files
uploaded = files.upload()

Saving Amazon_Responded_Oct05.csv to Amazon_Responded_Oct05.csv


In [0]:
!ls

drive	     spark-2.4.4-bin-hadoop2.7	    spark-2.4.4-bin-hadoop2.7.tgz.1
sample_data  spark-2.4.4-bin-hadoop2.7.tgz  spark-2.4.4-bin-hadoop2.7.tgz.2


In [0]:
#!rm 'Amazon_Responded_Oct05 (1).csv'

In [0]:
#storing csv as dataframes
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
dataset = spark.read.csv('/content/drive/My Drive/Amazon_Responded_Oct05.csv',inferSchema=True, header =True)
type(dataset)

pyspark.sql.dataframe.DataFrame

In [0]:
dataset.printSchema()

root
 |-- id_str: string (nullable = true)
 |-- tweet_created_at: string (nullable = true)
 |-- user_screen_name: string (nullable = true)
 |-- user_id_str: string (nullable = true)
 |-- user_statuses_count: string (nullable = true)
 |-- user_favourites_count: string (nullable = true)
 |-- user_protected: string (nullable = true)
 |-- user_listed_count: string (nullable = true)
 |-- user_following: string (nullable = true)
 |-- user_description: string (nullable = true)
 |-- user_location: string (nullable = true)
 |-- user_verified: string (nullable = true)
 |-- user_followers_count: string (nullable = true)
 |-- user_friends_count: string (nullable = true)
 |-- user_created_at: string (nullable = true)
 |-- tweet_language: string (nullable = true)
 |-- text_: string (nullable = true)
 |-- favorite_count: string (nullable = true)
 |-- favorited: string (nullable = true)
 |-- in_reply_to_screen_name: string (nullable = true)
 |-- in_reply_to_status_id_str: string (nullable = true)
 |--

# Step 2: Extract the three columns

In [0]:
#selecting columns
d1 = dataset.select("user_id_str", "user_followers_count", "text_")

#converting user_followers_count to integer type
d1 = d1.withColumn("user_followers_count", d1["user_followers_count"].cast(IntegerType()))

d1

DataFrame[user_id_str: string, user_followers_count: int, text_: string]

In [0]:
print((d1.count(), len(d1.columns)))

(413247, 3)


In [0]:
d2 = d1.na.fill({'user_id_str': '', 'user_followers_count': 0, 'text_': ''})

In [0]:
d2.filter(d2.user_id_str == 85741735).show()

+-----------+--------------------+--------------------+
|user_id_str|user_followers_count|               text_|
+-----------+--------------------+--------------------+
|   85741735|              149569|@SeanEPanjab I'm ...|
|   85741735|              149569|@SeanEPanjab Plea...|
|   85741735|              149569|@SeanEPanjab With...|
|   85741735|              149569|@SeanEPanjab I'm ...|
|   85741735|              149568|@aakashwangnoo Hi...|
|   85741735|              149568|@aakashwangnoo Hi...|
|   85741735|              149568|@aakashwangnoo Pl...|
|   85741735|              149568|@aakashwangnoo Hi...|
|   85741735|              149568|@aakashwangnoo He...|
|   85741735|              149568|@aakashwangnoo at...|
|   85741735|              149568|@aakashwangnoo So...|
|   85741735|              149571|@Murtazansp Hello...|
|   85741735|              149571|@Murtazansp Could...|
|   85741735|              149571|@Murtazansp Hi, s...|
|   85741735|              149571|@Murtazansp Pl

# Step 3: Remove the duplicated records

In [0]:
#arranging in descending order and removing duplicates
d3 = d2.orderBy('user_followers_count', ascending=False).dropDuplicates(subset=['user_id_str'])


# Step 4: Find popular users

In [0]:
#finding users with followers greater than 5000
d5 =  d3.filter(d3.user_followers_count > 5000)
d6 = d5.orderBy('user_followers_count', ascending=False)

In [0]:
d6.show()

+-----------+--------------------+-----+
|user_id_str|user_followers_count|text_|
+-----------+--------------------+-----+
|        667|          1969294734|     |
|        259|          1921566948|     |
|        192|          1904765227|     |
|         75|          1883058216|     |
|        104|          1878410551|     |
|        348|          1873286317|     |
|        308|          1864643978|     |
|        338|          1830107652|     |
|        882|          1666728474|     |
|       1811|          1656420602|     |
|         38|          1653560462|     |
|         12|          1653412544|     |
|        149|          1642580719|     |
|       2081|          1620541724|     |
|        253|          1618351628|     |
|        153|          1608877050|     |
|        358|          1540800860|     |
|         59|          1539664699|     |
|        417|          1531752193|     |
|        237|          1518827550|     |
+-----------+--------------------+-----+
only showing top

# Step 5: Find top words and save the results

In [0]:
#filtering records with blanks
d7 = d6.filter(d6["text_"] != '')

In [0]:
#changing all the text to lowercase
from pyspark.sql.functions import lower, col
d8 = d7.select(lower(col('text_')).alias('text_'))

In [0]:
#removing numbers and punctuations

from pyspark.sql.functions import regexp_replace

d9 = d8.select(regexp_replace(col('text_'), '[0-9]', '').alias("replaced"))
d9 = d8.select(regexp_replace(col('text_'), "[\@$#,.?':;]", '').alias("words"))

In [0]:
d9.show()

+--------------------+
|               words|
+--------------------+
|liberal_lisa your...|
|amazonhelp driver...|
|love writing lear...|
|amazonhelp could ...|
|cevalogistics i o...|
|cosanostrakosa it...|
|can anyone tell m...|
|amazonhelp contac...|
|amazonhelp hey am...|
|amazon this is wh...|
|amazonhelp when a...|
|amazon been prime...|
|amazon they ask f...|
|mhhh a plot to ge...|
|amazonhelp is the...|
|amazonhelp hey co...|
|amazonhelp hey a ...|
|wow amazon really...|
|amazonuk i cant b...|
|amazonhelp i have...|
+--------------------+
only showing top 20 rows



In [0]:
#counting the number of words
d10 = d9.withColumn('words', f.explode(f.split(f.col('words'), ' ')))\
      .groupBy('words').count()\
      .sort('count', ascending=False)

In [0]:
d11 = d10.filter(d10["words"] != '')

In [0]:
d11.show()

+----------+-----+
|     words|count|
+----------+-----+
|         i| 1077|
|        to| 1013|
|    amazon|  926|
|       the|  812|
|        my|  792|
|amazonhelp|  738|
|         a|  726|
|       and|  535|
|       for|  532|
|        is|  481|
|        it|  475|
|       you|  435|
|        on|  431|
|        in|  315|
|        of|  306|
|      this|  290|
|        me|  279|
|       but|  259|
|      have|  251|
|       not|  244|
+----------+-----+
only showing top 20 rows



In [0]:
d11.printSchema()

root
 |-- replaced: string (nullable = true)
 |-- count: long (nullable = false)



In [0]:
d12 = d11.withColumn("count", d11["count"].cast(StringType()))

In [0]:
d12.printSchema()

root
 |-- words: string (nullable = true)
 |-- count: string (nullable = false)



In [0]:
d14 = d12.limit(10)

In [0]:
d14.show()

+----------+-----+
|     words|count|
+----------+-----+
|         i| 1077|
|        to| 1013|
|    amazon|  926|
|       the|  812|
|        my|  792|
|amazonhelp|  738|
|         a|  726|
|       and|  535|
|       for|  532|
|        is|  481|
+----------+-----+



In [0]:
#concat words and count to new column words_count
d15 = d14.withColumn('words_count', 
                    f.concat(f.col('words'),f.lit(','), f.col('count')))

In [0]:
d15.show()

+----------+-----+--------------+
|     words|count|   words_count|
+----------+-----+--------------+
|         i| 1077|        i,1077|
|        to| 1013|       to,1013|
|    amazon|  926|    amazon,926|
|       the|  812|       the,812|
|        my|  792|        my,792|
|amazonhelp|  738|amazonhelp,738|
|         a|  726|         a,726|
|       and|  535|       and,535|
|       for|  532|       for,532|
|        is|  481|        is,481|
+----------+-----+--------------+



In [0]:
#selecting the column and saving it as a text file in Gdrive
d15.select('words_count').write.text("/content/drive/My Drive/anedun2_HW2_text.txt")