## Setup

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 46.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=8021e8b06ac2b0758c58cc9a5ebab98dd391b4c30e86ead929db725d0c0ce184
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-m

## Setting Environment Variables

In [None]:
import os

# Point the toolchain at the JDK and the Spark distribution unpacked during setup.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

# Build PYTHONPATH from the actual SPARK_HOME value and the platform's path
# separator. The previous value used Windows-style %VAR% placeholders (which
# are never expanded here), backslashes, and a mix of ';' and ':' separators,
# so it did not reference the Spark Python directories at all.
_spark_python = os.path.join(os.environ["SPARK_HOME"], "python")
_spark_entries = [
    _spark_python,
    os.path.join(_spark_python, "lib", "py4j-0.10.9.3-src.zip"),
]
# Keep any pre-existing entries, skipping blanks and the Spark entries
# themselves so that re-running this cell does not grow the variable.
_existing_entries = [
    entry
    for entry in os.environ.get("PYTHONPATH", "").split(os.pathsep)
    if entry and entry not in _spark_entries
]
os.environ["PYTHONPATH"] = os.pathsep.join(_spark_entries + _existing_entries)

In [None]:
# List the working directory (including hidden entries) to check that the
# Spark tarball and its extracted folder are present.
!ls -a

.   .config	 spark-3.2.1-bin-hadoop3.2
..  sample_data  spark-3.2.1-bin-hadoop3.2.tgz


In [None]:
# Sanity check: confirm PySpark is importable and print the installed version.
import pyspark
print(pyspark.__version__)

3.2.1


In [None]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
import findspark
# NOTE(review): init() is expected to locate the Spark installation and make
# it importable from this kernel — confirm against the findspark docs.
findspark.init() 
# Display the Spark installation directory that findspark resolved.
findspark.find()

'/content/spark-3.2.1-bin-hadoop3.2'

# Word Count Program using an article

### Setting the SparkContext

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark import SparkFiles

# Run Spark locally under the application name "word-counts".
conf = SparkConf().setMaster("local").setAppName("word-counts")
# getOrCreate() reuses an already-running context, so re-executing this cell
# no longer fails with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
# Download the article through Spark; later cells resolve its local path
# with SparkFiles.get().
sc.addFile("https://raw.githubusercontent.com/sreekeshiyer/dmbi_aids_datasets/main/Machine_Learning_Wikipedia.txt")

### Setting up the data

In [None]:
# Read the downloaded article as an RDD with one element per line of text.
article = sc.textFile("file://"+SparkFiles.get("Machine_Learning_Wikipedia.txt"))
# Preview the first ten lines. take(10) fetches only what is displayed,
# whereas collect()[:10] would ship the entire RDD to the driver first.
article.take(10)

['Machine learning',
 'Machine learning (ML) is the study of computer algorithms that can improve automatically through experience and by the use of data.[1] It is seen as a part of artificial intelligence. Machine learning algorithms build a model based on sample data, known as training data, in order to make predictions or decisions without being explicitly programmed to do so.[2] Machine learning algorithms are used in a wide variety of applications, such as in medicine, email filtering, speech recognition, and computer vision, where it is difficult or unfeasible to develop conventional algorithms to perform the needed tasks.[3]',
 '',
 'A subset of machine learning is closely related to computational statistics, which focuses on making predictions using computers; but not all machine learning is statistical learning. The study of mathematical optimization delivers methods, theory and application domains to the field of machine learning. Data mining is a related field of study, focu

### Preprocessing
Remove punctuation and transform all words to lowercase.
To strip punctuation characters and convert every word to lowercase, we define the function below.

In [None]:
def lower_clean_str(x):
  """Return ``x`` lowercased, with every character listed in ``punc`` deleted."""
  punc='!"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~-'
  # A translation table that maps each punctuation character to "delete".
  strip_table = str.maketrans('', '', punc)
  return x.lower().translate(strip_table)

In [None]:
# Normalise every line of the article: lowercase it and strip punctuation.
article = article.map(lower_clean_str)

We use the `split` function to break every line into individual words.

In [None]:
# Break each line on single spaces and flatten the pieces into one RDD of tokens.
article = article.flatMap(lambda line: line.split(" "))

We then filter out the empty strings left behind by the split.

In [None]:
# Keep only non-empty tokens.
article = article.filter(lambda token: token != '')

### Getting word count

In [None]:
# Pair every word with a count of 1 so the counts can be summed per word.
article_count = article.map(lambda token: (token, 1))
article_count.take(4)

[('machine', 1), ('learning', 1), ('machine', 1), ('learning', 1)]

Apply `reduceByKey` to sum the counts for each word.

In [None]:
# Add up the ones for each word, then order the (word, count) pairs by word.
article_count_RBK = (
    article_count
    .reduceByKey(lambda left, right: (left + right))
    .sortByKey()
)

In [None]:
# Preview the first ten (word, count) pairs in key order.
article_count_RBK.take(10)

[('1', 2),
 ('10', 2),
 ('10000', 1),
 ('13', 1),
 ('1959', 1),
 ('1960s', 1),
 ('1970s', 1),
 ('197316', 1),
 ('1980', 1),
 ('1980s', 1)]

Sort the words by frequency in descending order.

In [None]:
# Re-key the pairs as (count, word) so they can be sorted by frequency.
# The RDD is rebuilt from `article_count` rather than from its own previous
# value: swapping `article_count_RBK` in place would flip the pairs back to
# (word, count) every time this cell is re-executed.
article_count_RBK = (
    article_count
    .reduceByKey(lambda x, y: x + y)
    .sortByKey()
    .map(lambda pair: (pair[1], pair[0]))
)

In [None]:
# Preview the first five re-keyed (count, word) pairs.
article_count_RBK.take(5)

[(2, '1'), (2, '10'), (1, '10000'), (1, '13'), (1, '1959')]

In [None]:
# Order by count, largest first, and show the ten most frequent words.
article_count_RBK.sortByKey(ascending=False).take(10)

[(363, 'the'),
 (241, 'of'),
 (230, 'a'),
 (217, 'learning'),
 (212, 'to'),
 (185, 'and'),
 (178, 'in'),
 (129, 'is'),
 (124, 'machine'),
 (101, 'data')]

# Word Count Program using Songs Dataset

In [None]:
import sys

from operator import add
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
import pyspark.sql.functions as f

# Build (or reuse) the SparkSession used for the DataFrame-based word count.
spark = (
    SparkSession.builder
    .appName("PythonWordCount")
    .getOrCreate()
)

# Download the Billboard lyrics CSV through Spark so SparkFiles can locate it.
spark.sparkContext.addFile("https://raw.githubusercontent.com/sreekeshiyer/dmbi_aids_datasets/main/billboard_lyrics_1964-2015.csv")

# Read the CSV with its header row, letting Spark infer the column types.
data = spark.read.csv(
    "file://" + SparkFiles.get("billboard_lyrics_1964-2015.csv"),
    header=True,
    inferSchema=True,
)

print('############ CSV extract:')
data.show()

############ CSV extract:
+----+--------------------+--------------------+----+--------------------+------+
|Rank|                Song|              Artist|Year|              Lyrics|Source|
+----+--------------------+--------------------+----+--------------------+------+
|   1|         wooly bully|sam the sham and ...|1965|sam the sham misc...|     3|
|   2|i cant help mysel...|           four tops|1965| sugar pie honey ...|     1|
|   3|i cant get no sat...|  the rolling stones|1965|                    |     1|
|   4| you were on my mind|             we five|1965| when i woke up t...|     1|
|   5|youve lost that l...|the righteous bro...|1965| you never close ...|     1|
|   6|            downtown|        petula clark|1965| when youre alone...|     1|
|   7|                help|         the beatles|1965|help i need someb...|     3|
|   8|cant you hear my ...|     hermans hermits|1965|carterlewis every...|     5|
|   9|crying in the chapel|       elvis presley|1965| you saw me cryin..

In [None]:
# Count word frequencies across the Lyrics column.
# The lyrics are split on runs of whitespace (the pattern is a regular
# expression) and empty tokens are dropped; splitting on a single space
# counted the empty string as one of the most frequent "words".
data.withColumn('word', f.explode(f.split(f.col('Lyrics'), r'\s+'))) \
  .where(f.col('word') != '') \
  .groupBy('word') \
  .count() \
  .sort('count', ascending=False) \
  .show()

+----+-----+
|word|count|
+----+-----+
| you|64606|
|   i|56466|
| the|53451|
|  to|35752|
| and|32555|
|  me|31170|
|   a|29282|
|  it|25688|
|  my|22821|
|  in|18553|
|that|16151|
|  on|15814|
|your|15459|
|love|15283|
|  im|14278|
|  be|13004|
|  of|12825|
|    |12266|
| all|11895|
|dont|11587|
+----+-----+
only showing top 20 rows



In [None]:
# To remove stop words (like "I", "The", ...), we need to provide arrays of words, not strings.
# Here we use the Apache Spark Tokenizer to build them.
# We create a new column to hold our arrays of words
tokenizer = Tokenizer(inputCol="Lyrics", outputCol="words_token")
tokenized = tokenizer.transform(data).select('Rank','words_token')

print('############ Tokenized data extract:')
tokenized.show()


# Once in arrays, we can use the Apache Spark function StopWordsRemover
# A new column "words_clean" is here as an output
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
data_clean = remover.transform(tokenized).select('Rank', 'words_clean')

print('############ Data Cleaning extract:')
data_clean.show()


# Final step: as at the beginning, group the words and sort them by frequency.
# Lyrics that start with a space yield an empty first token (visible in the
# extracts above), so empty strings are dropped before counting. The chain is
# wrapped in parentheses instead of ending on a dangling "\" continuation.
result = (
  data_clean.withColumn('word', f.explode(f.col('words_clean')))
  .where(f.col('word') != '')
  .groupBy('word')
  .count()
  .sort('count', ascending=False)
)

print('############ TOP20 Most used words in Billboard songs are:')
result.show()

# Stop Spark Process
spark.stop()

############ Tokenized data extract:
+----+--------------------+
|Rank|         words_token|
+----+--------------------+
|   1|[sam, the, sham, ...|
|   2|[, sugar, pie, ho...|
|   3|                  []|
|   4|[, when, i, woke,...|
|   5|[, you, never, cl...|
|   6|[, when, youre, a...|
|   7|[help, i, need, s...|
|   8|[carterlewis, eve...|
|   9|[, you, saw, me, ...|
|  10|[ive, got, sunshi...|
|  11|[well, since, she...|
|  12|[, trailer, for, ...|
|  13|[let, me, tell, y...|
|  14|[, hold, me, hold...|
|  15|[i, said, ̢shotgu...|
|  16|[they, say, were,...|
|  17|[who, wants, to, ...|
|  18|      [instrumental]|
|  19|[, mrs, brown, yo...|
|  20|[, stop, in, the,...|
+----+--------------------+
only showing top 20 rows

############ Data Cleaning extract:
+----+--------------------+
|Rank|         words_clean|
+----+--------------------+
|   1|[sam, sham, misce...|
|   2|[, sugar, pie, ho...|
|   3|                  []|
|   4|[, woke, morning,...|
|   5|[, never, close, ...|
|   6