# Summmary


<p>
This project focus on Resilient Distributed Datasets or RDDs of the open-source engine Apache Spark developed specifically for handling large-scale data processing and analytics. RDDs are the unique native core data structure of Apache Spark, which allows to process data on numerous remote worker machines.
RDDs are partioned and distributed on the nodes in the Spark cluster.
The RDDs are resilient and are retrieved from the data existing in other nodes  even when processes are crashing or nodes are failing .
</p>
<p>
According to the 
<a href="https://spark.apache.org/docs/0.8.1/api/core/org/apache/spark/rdd/RDD.html" target="_blank">documentation</a> 
a Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as map, filter, and persist. In addition, PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as groupByKey and join
<p>
    
<p>
There are two types of data sets used here. Numeric data created from random data and the breast cancer data set, and string data randomly choosen from text of the internet. RDD operations are carried out on all data sets. For example a word count is made for the the text data to get the 10 most frequent words.
</p>

<p>Among many others the following tasks are carried out:</p> 
        
<ul>
  <li>Import packages</li>
  <li>Inspect SparkContext</li>
  <li>Lambda functions</li>    
  <li>Creating RDDs by using schemas and parallelize</li>
  <li>Selecting data from RDDs</li>
  <li>RDD operations</li>  
  <li>Creating a dictionary of word counts</li>
  <li>Creating a list of top ten words</li>
  <li>Using stopwords to filter the text</li>
</ul>     
    
    
<p>
<h2>Spark system architecture</h2>
</p> 



<img src="spark_achitecture.png" alt="Smiley face" align="left"  style="margin-left: 0px; margin-right: 0px; margin-top: 20px; margin-bottom: 20px; float: left; width: 800px; height: 300px"> 


# Import packages

In [258]:
import numpy as np
import pandas as pd
import math
import random
import pathlib

from pyspark.sql import DataFrame
from pyspark.rdd import RDD
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import lower, col
from pyspark.sql import SparkSession
from pyspark.rdd import RDD
from pyspark.sql.functions import mean, stddev , col, avg, round
from pyspark.sql.types import IntegerType, FloatType
import pyspark.sql.functions as F
from pyspark.sql.functions import isnan
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

from pyspark.mllib.stat import Statistics
# throws an annoying Py4JJavaError
# it seems there is a compatibility issues between Spark and Java
# upgrading Java solve the problem



### Inspect working directory

In [259]:
cwd_1=os.getcwd()
# print(cwd_1)

dirname = os.path.dirname(cwd_1) 
# print(dirname)

path_cwd = pathlib.Path.cwd()
# print(path_cwd)

path_home = pathlib.Path.home()
# print(path_home)


### Inspect Spark context

<p>
Spark context is a variable automatically created, when starting Pyspark
from the shell. It is an entry point into the whole arrangement
of Spark functionality. The SparkContext is the entry point into the cluster. Without creating a SparkContext nothing in the Spark session will work. Spark context can be accessed with "sd".
</p> 

<p>Furthermore:</p> 

<p>
"A SparkContext represents the connection to a Spark cluster, 
and can be used to create RDDs, accumulators and broadcast variables 
on that cluster." 
</p> 
<a href="https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/SparkContext.html" target="_blank">Class SparkContext</a> 

In [260]:
print("Pyspark version running in this session {}."\
      .format(sc.version))

print("Python version used in this session of Spark Context is {}."\
      .format(sc.pythonVer))


print("Python version used in this session of Spark Context is {}."\
      .format(sc.master))

Pyspark version running in this session 3.0.0-preview.
Python version used in this session of Spark Context is 3.7.
Python version used in this session of Spark Context is local[*].


In [261]:

# Path where Spark is installed on worker nodes
# print(str(sc.sparkHome))

# Retrieve name of the Spark User running
print(str(sc.sparkUser()))

# Return application name
print(sc.appName)

# Retrieve application ID
print(sc.applicationId)

# Return default level of parallelism 
print(sc.defaultParallelism)


gamarandor
PySparkShell
local-1578651229420
4


### Difference of SparkContext and SparkSession

From 
<a href="https://data-flair.training/forums/topic/what-is-sparksession-in-apache-spark/" target="_blank">data-flair</a>:

<p>
Starting from Apache Spark 2.0, Spark Session is the new entry point for Spark applications.

Prior to 2.0, SparkContext was the entry point for spark jobs. RDD was one of the main APIs then, and it was created and manipulated using Spark Context. For every other APIs, different contexts were required – For SQL, SQL Context was required; For Streaming, Streaming Context was required; For Hive, Hive Context was required.

But from 2.0, RDD along with DataSet and its subset DataFrame APIs are becoming the standard APIs and are a basic unit of data abstraction in Spark. All of the user defined code will be written and evaluated against the DataSet and DataFrame APIs as well as RDD.

So, there is a need for a new entry point build for handling these new APIs, which is why Spark Session has been introduced. Spark Session also includes all the APIs available in different contexts – Spark Context, SQL Context, Streaming Context, Hive Context.
</p> 

<p> 
SparkSession commands a variety of methods explained on 
<a href="https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/SparkSession.html" target="_blank">spark.apache.org.</a> 
</p> 

In [262]:
spark_session = SparkSession.builder.getOrCreate()
print(spark_session)
print(spark_session.catalog.currentDatabase())
print(spark_session.catalog.listTables())

<pyspark.sql.session.SparkSession object at 0x00000124E6D3D7C8>
default
[]


### Lambda functions (lambda arguments: expression)

as anonymous, inline functions are useful in Python in general but are extensively used in Pyspark and should be part of the tool kit.
Only one expression is possible but numerous arguments can be created.

#### map(function, iterable)

In [263]:
# generate random data
# numpy.random.randint(low, high=None, size=None, dtype='l')
# makes random numbers predictable
np.random.seed(10)
lamb_dt = np.random.randint(1,10000, size=1000).tolist()

print(min(lamb_dt))
print(max(lamb_dt))
print(lamb_dt[:5])

27
9986
[1290, 7294, 1345, 7292, 9373]


In [264]:
# map applies a function on a list and 
# returns the results as a map object
# transformed to list with list
# map(function, iteration object like list, dict ...)

# take square root of every value in the list
list_sqrt = list(map(lambda x: math.sqrt(x), lamb_dt))

print(lamb_dt[10:15])
print(list_sqrt[10:15])

# double every value
list_multi = list(map(lambda d: d * 2, lamb_dt))
print(list_multi[510:515])

# cube every value
list_cubed = list(map(lambda e: e**3, lamb_dt))
print(list_cubed[900:905])

[5649, 4453, 240, 2444, 2103]
[75.15982969645421, 66.73080248281148, 15.491933384829668, 49.4368283772331, 45.858477951192405]
[6188, 2752, 2056, 9440, 9414]
[127795585653, 27379766744, 633839779000, 17173512, 21300003648]


#### filter(function, iterable)

works fine as it returns the values based on the condition
made in the lambda function.

In [265]:
# filter a list based on a greater condition
list_greater_map = filter(lambda l: l > 100, lamb_dt)
print(list_greater_map)

list_greater_list = list(list_greater_map)

print(len(list_greater_list))
print(list_greater_list[100:120])

# filter on the condition that the values can be divided by 10
# based on modulo
list_mod = list(filter(lambda k: k%10 == 0, lamb_dt))
print(len(list_mod))
print(list_mod[:10])

# filter odd numbers
list_mod2 = list(filter(lambda g: g%2 != 0, lamb_dt))
print(len(list_mod2))
print(list_mod2[:10])

<filter object at 0x00000124E85FA1C8>
991
[2815, 535, 3681, 7236, 7264, 4452, 1107, 2111, 3831, 1614, 507, 9265, 9590, 6110, 5870, 460, 2034, 3494, 5014, 9958]
104
[1290, 4830, 9290, 240, 6900, 410, 7490, 6160, 1160, 6860]
506
[1345, 9373, 1521, 9225, 6401, 5649, 4453, 2103, 3417, 7291]


### Loading data into Spark 


<p>
with the "spark.read.csv", "parallelize" and the "textFile" method.
</p>

<p>
The "spark.read.csv" - method create data frames similar to
Pandas data frames.
</p>

<p>
Both other methods are creating Resilient Distributed Datasets (RDDs).
RDDs are the unique native core data structure of Apache Spark,
which allows to process data on numerous remote worker machines.
RDDs are partioned and distributed on the nodes in the Spark cluster.
The RDDs are resilient and are retrieved from the data existing in other nodes  even when processes are crashing or nodes are failing .
</p>

<p>
Other than SQL-tables and Pandas data frames RDDS are not tabular
data structures and relies on NoSQL data models like Key-value-stores.
This allows it to work with unstructured data like text very
efficient.
</p>

<a href="https://hackersandslackers.com/working-with-pyspark-rdds/?utm_campaign=News&utm_medium=Community&utm_source=DataCamp.com" target="_blank">Working with PySpark RDDs</a> 


#### Loading csv-data into a Pyspark data frame

<p>
given by the breast cancer dataset obtained from the University of Wisconsin. The dataset has 11 variables with 699 observations. The first variable is the anonyminous identifier. There are 9 features. 
The response variable is "class". The response has two values: 
“Malignant” or “Benign” cases.
</p> 

In [266]:
# there is not a header in the raw data
# therefore a schema is created to provided the names
# when reading the data in

bc_schema = \
StructType([
    StructField("Sample code number", IntegerType(), True),
    StructField("Clump Thickness", IntegerType(), True),
    StructField("Uniformity of Cell Size", IntegerType(), True),
    StructField("Uniformity of Cell Shape", IntegerType(), True),
    StructField("Single Epithelial Cell Size", IntegerType(), True),
    StructField("Bare Nuclei", IntegerType(), True),
    StructField("Bland Chromatin", IntegerType(), True),
    StructField("Normal Nucleoli", IntegerType(), True),
    StructField("Mitoses", IntegerType(), True),
    StructField("Class", IntegerType(), True)])


bc = \
spark.read.csv("breast-cancer-wisconsin.csv", inferSchema=True, 
               header=False, schema=bc_schema)

print(type(bc))


<class 'pyspark.sql.dataframe.DataFrame'>


Pyspark data frames offer a variety of methods to understand
the data structure.

In [267]:
print(bc.select("Bare Nuclei", "Bland Chromatin", 
                "Normal Nucleoli", 
                "Mitoses", "Class").show(5))

print("n observations: {}".format(bc.count()))
print("n cols: {}".format(len(bc.columns)))

# class is the predictor variable

+-----------+---------------+---------------+-------+-----+
|Bare Nuclei|Bland Chromatin|Normal Nucleoli|Mitoses|Class|
+-----------+---------------+---------------+-------+-----+
|          2|              1|              3|      1|    1|
|          7|             10|              3|      2|    1|
|          2|              2|              3|      1|    1|
|          3|              4|              3|      7|    1|
|          2|              1|              3|      1|    1|
+-----------+---------------+---------------+-------+-----+
only showing top 5 rows

None
n observations: 699
n cols: 10


In [268]:
bc.printSchema()

root
 |-- Sample code number: integer (nullable = true)
 |-- Clump Thickness: integer (nullable = true)
 |-- Uniformity of Cell Size: integer (nullable = true)
 |-- Uniformity of Cell Shape: integer (nullable = true)
 |-- Single Epithelial Cell Size: integer (nullable = true)
 |-- Bare Nuclei: integer (nullable = true)
 |-- Bland Chromatin: integer (nullable = true)
 |-- Normal Nucleoli: integer (nullable = true)
 |-- Mitoses: integer (nullable = true)
 |-- Class: integer (nullable = true)



In [269]:
bc.select("Bare Nuclei", "Mitoses", "Class").describe().show()

+-------+-----------------+------------------+------------------+
|summary|      Bare Nuclei|           Mitoses|             Class|
+-------+-----------------+------------------+------------------+
|  count|              699|               699|               699|
|   mean|3.216022889842632| 2.866952789699571|1.5894134477825466|
| stddev|2.214299886649047|3.0536338936127745| 1.715077942506795|
|    min|                1|                 1|                 1|
|    max|               10|                10|                10|
+-------+-----------------+------------------+------------------+



#### Different methods to select data from a RDD

In [270]:
bc_rdd = bc.rdd
print(bc_rdd)
print(type(bc_rdd))

# look into the RDD
for i in bc_rdd.take(1): 
    print(i)

MapPartitionsRDD[833] at javaToPython at <unknown>:0
<class 'pyspark.rdd.RDD'>
Row(Sample code number=1000025, Clump Thickness=5, Uniformity of Cell Size=1, Uniformity of Cell Shape=1, Single Epithelial Cell Size=1, Bare Nuclei=2, Bland Chromatin=1, Normal Nucleoli=3, Mitoses=1, Class=1)


In [271]:
# Selecting data
print(bc_rdd.take(1))

print("")

# Take top 5 RDD elements
print(bc_rdd.first())

print("")

# Return distinct RDD values
print(bc_rdd.distinct().take(1))

print("")


[Row(Sample code number=1000025, Clump Thickness=5, Uniformity of Cell Size=1, Uniformity of Cell Shape=1, Single Epithelial Cell Size=1, Bare Nuclei=2, Bland Chromatin=1, Normal Nucleoli=3, Mitoses=1, Class=1)]

Row(Sample code number=1000025, Clump Thickness=5, Uniformity of Cell Size=1, Uniformity of Cell Shape=1, Single Epithelial Cell Size=1, Bare Nuclei=2, Bland Chromatin=1, Normal Nucleoli=3, Mitoses=1, Class=1)

[Row(Sample code number=1000025, Clump Thickness=5, Uniformity of Cell Size=1, Uniformity of Cell Shape=1, Single Epithelial Cell Size=1, Bare Nuclei=2, Bland Chromatin=1, Normal Nucleoli=3, Mitoses=1, Class=1)]



In [272]:
sample = bc_rdd.sample(False, 0.15, 81).collect()

print(type(sample))
print(len(sample))
print(sample[:1])

<class 'list'>
104
[Row(Sample code number=1016277, Clump Thickness=6, Uniformity of Cell Size=8, Uniformity of Cell Shape=8, Single Epithelial Cell Size=1, Bare Nuclei=3, Bland Chromatin=4, Normal Nucleoli=3, Mitoses=7, Class=1)]


In [273]:
# selecting a column by index
# for selecting a column by name better use sqlContext

thick = bc_rdd.map(lambda x: x[1])

print(type(thick))
print(thick.first())
print(thick.take(5))


<class 'pyspark.rdd.PipelinedRDD'>
5
[5, 5, 3, 6, 4]


#### Loading numeric data with "parallelize"

In [274]:
# generate random data
# numpy.random.randint(low, high=None, size=None, dtype='l')
# makes random numbers predictable
np.random.seed(0)
data_1 = np.random.randint(1,1000, size=1000).tolist()
print(type(data_1))
print(np.random.choice(data_1, size=50, replace=True))

data_2 = range(1,1001)
print([i for i in data_2[:6]])


<class 'list'>
[438 198 671 754  48 206 697 348 291 910 354  24 644 577 449 849 813 708
 600 350 838 411  34 341 950 342 341  94 767 208 646  75 986 366 735 711
 948  33 887 287 508 604 883 646 618 455 393 854 701 137]
[1, 2, 3, 4, 5, 6]


Loading the data into Spark with parallelize 
creates Resilient Distributed Datasets.

In [275]:
# this creates a pointer object in memory with an address
spark_dt_1 = sc.parallelize(data_1)
print(spark_dt_1)
print(type(spark_dt_1))

# this creates a pointer object in memory with an address
spark_dt_2 = sc.parallelize(data_2)
print(spark_dt_2)
print(type(spark_dt_2))

ParallelCollectionRDD[845] at readRDDFromFile at PythonRDD.scala:247
<class 'pyspark.rdd.RDD'>
PythonRDD[847] at RDD at PythonRDD.scala:53
<class 'pyspark.rdd.PipelinedRDD'>


In [276]:
def check_type(object):
    
    if isinstance(object, DataFrame):
        print("object is DataFrame")
        
    if isinstance(object, RDD):
        print("object is RDD")
            
    else:
        print("other type")
        

print(check_type(spark_dt_1))
print(check_type(spark_dt_2))

object is RDD
None
object is RDD
None


Print the first values in the RDDs (similar to the head method in R or pandas).

In [277]:
# how to print out snippets of a RDD in the spark-shell / pyspark?
# simulate pandas head
# https://stackoverflow.com/questions/31115892/how-to-print-out-snippets-of-a-rdd-in-the-spark-shell-pyspark

for i in spark_dt_1.take(5): 
    print(i)

print("")

for d in spark_dt_2.take(5):
    print(d)

685
560
630
193
836

1
2
3
4
5


In [278]:
# Return sampled subset of rdd3
# sample(self, withReplacement, fraction, seed=None)

spark_dt_1.sample(withReplacement=False, fraction=0.01, 
                  seed=15).collect()

# collect retrieves all data in the RDD
# careful if the RDD is large
# https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html

[634, 640, 287, 803, 769, 210, 892, 30, 641, 711, 79, 533]

In [279]:
# this creates a pointer object
bc_csv_rdd = sc.parallelize("breast-cancer-wisconsin.csv")
print(bc_csv_rdd)

# the pointer object can be accessed with a for loop
for o in bc_csv_rdd.take(10):
    print(o)

ParallelCollectionRDD[851] at readRDDFromFile at PythonRDD.scala:247
b
r
e
a
s
t
-
c
a
n


#### Transform the RDD to a data frame.

In [280]:
bc_df_45 = sqlContext.createDataFrame(bc_rdd, bc_schema)
print(type(bc_df_45 ))

bc_df_45_pd = bc_df_45.toPandas()
print(type(bc_df_45_pd))

print("")

print(bc_df_45_pd.info())

print("")

print(bc_df_45_pd.head())

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 699 entries, 0 to 698
Data columns (total 10 columns):
Sample code number             683 non-null float64
Clump Thickness                683 non-null float64
Uniformity of Cell Size        683 non-null float64
Uniformity of Cell Shape       683 non-null float64
Single Epithelial Cell Size    683 non-null float64
Bare Nuclei                    683 non-null float64
Bland Chromatin                683 non-null float64
Normal Nucleoli                683 non-null float64
Mitoses                        683 non-null float64
Class                          683 non-null float64
dtypes: float64(10)
memory usage: 54.7 KB
None

   Sample code number  Clump Thickness  Uniformity of Cell Size  \
0           1000025.0              5.0                      1.0   
1           1002945.0              5.0                      4.0   
2           1015425.0              3.0       

#### Loading string data with "textFile"

In [281]:
# filepath is short as the file is in the same directory as the notebook
# it is book about roundtrips in the U.S.A. around 1912

highway = sc.textFile('OnSunsetHighways.txt')

print("object: {}.".format(highway))
print(type(highway))


object: OnSunsetHighways.txt MapPartitionsRDD[861] at textFile at <unknown>:0.
<class 'pyspark.rdd.RDD'>


In [282]:
# print number of rows or observations
print(highway.count())

10387


In [283]:
print(highway.first())

 ON SUNSET HIGHWAYS ***


In [284]:
# take a look into the text RDD
for line in highway.take(50):
    print(line)

 ON SUNSET HIGHWAYS ***




Produced by Melissa McDaniel and the Online Distributed
Proofreading Team at http://www.pgdp.net (This file was
produced from images generously made available by The
Internet Archive. The map and cover are courtesy of the
California History Room, California State Library,
Sacramento, California.)







Transcriber's Note:

  Inconsistent hyphenation in the original document has been
  preserved. Obvious typographical errors have been corrected.




     ON SUNSET
     HIGHWAYS




"SEE AMERICA FIRST" SERIES

Each in one volume, decorative cover, profusely illustrated


     CALIFORNIA, ROMANTIC AND BEAUTIFUL
               BY GEORGE WHARTON JAMES                 $6.00

     NEW MEXICO: The Land of the Delight Makers
               BY GEORGE WHARTON JAMES                 $6.00

     SEVEN WONDERLANDS OF THE AMERICAN WEST
               BY THOMAS D. MURPHY                     $6.00

     A WONDERLAND OF THE EAST: The Mountain and Lake Region of
     New Engla

In [285]:
# copy and paste from Forbes about the Samsung’s Galaxy S11
galaxy = sc.textFile("galaxy.txt")

print(type(galaxy))
galaxy.collect()

<class 'pyspark.rdd.RDD'>


['Samsung’s Galaxy S11 is close to release and its supersized battery, upgraded biometrics and class-leading new display look set to make up for a surprisingly strange name change. But the real star of 2020 could be the phone Samsung accidentally just confirmed. Following a major blunder, the manual for Samsung’s Galaxy S10 Lite has been leaked to SamMobile. It reveals a phone which delivers far more than its name suggests, for significantly less than you might expect. In fact, industry insiders Ice Universe and OnLeaks have stated it will deliver elements which blow both the Galaxy S10 and Note 10 away. ']

### Partitions
are logical divisions of a RDD and allows to distribute 
data subsets on nodes.

In [286]:
# Default minimum number of partitions for RDDs
print("default partition setting:", sc.defaultMinPartitions)

# n of highway
n_part_high = highway.getNumPartitions()
print("n partitions of highway: {}".format(n_part_high))

spark_1_part = spark_dt_1.getNumPartitions()
print("spark_1_part:", spark_1_part )

spark_2_part = spark_dt_2.getNumPartitions()
print("spark_dt_2:", spark_2_part )

default partition setting: 2
n partitions of highway: 2
spark_1_part: 4
spark_dt_2: 4


In [287]:
# partitions are set with minPartitions
highway_2 = sc.textFile('OnSunsetHighways.txt', minPartitions=5 )
n_part_high2 = highway_2.getNumPartitions()

print("n partitions of highway set with min: {}".format(n_part_high2))

n partitions of highway set with min: 5


# RDD operations

<p>
in Spark are dividied between transformation and actions. 
Transformations create new data frames and return pointers. Actions conduct computations with RDD data and return results.
Spark creates a graph of all transformation and execution is
only done, when an action is triggered. This is called
"lazy evaluation" and the reason for the robustness of Spark.
</p> 


#### RDD operations with tabular structured numeric data

In [288]:
# Transformation
thick_power = thick.map(lambda j: j**2)

print(type(thick_power))
print(type(thick))

print("")

# Actions
print("first five of thick: ", thick.take(5))
print("first five of thick power: ", thick_power.take(5))

print("n observations thick: ", thick.count())

print("")

for i in thick_power.take(5):
    print(i)

print("")

# transformation
thick_p = thick_power.filter(lambda k: k > 20)

# action
print("greater 10: ", thick_p.take(5))


<class 'pyspark.rdd.PipelinedRDD'>
<class 'pyspark.rdd.PipelinedRDD'>

first five of thick:  [5, 5, 3, 6, 4]
first five of thick power:  [25, 25, 9, 36, 16]
n observations thick:  699

25
25
9
36
16

greater 10:  [25, 25, 36, 64, 25]


In [289]:
# Selecting data
print(RDD_tuple.take(5))

print("")

# Take top 5 RDD elements
print(RDD_tuple.top(5))

print("")

# Return distinct RDD values
print(RDD_tuple.distinct().take(5))

print("")

# Return (key,value) RDD's keys
print(RDD_tuple.keys().take(5))

# print the first 5 keys
for i in RDD_tuple.take(5):
    print(i[0])

[(3, 2), (4, 102), (1, 193), (2, 141), (6, 24)]

[(6, 199), (6, 198), (6, 194), (6, 190), (6, 188)]

[(4, 102), (1, 193), (6, 24), (5, 173), (6, 140)]

[3, 4, 1, 2, 6]
3
4
1
2
6


###  RDD operations on text

### Text RDD: highway

In [290]:
# highway is an unstructured text file
print(type(highway))

<class 'pyspark.rdd.RDD'>


In [291]:
def correct_lines(lines):
    lines = lines.lower()
    lines = lines.split()
    return lines

In [292]:
# flatmap returns multiple values for each element in the original RDD
# and returns a transformed RDD

highway_mc = highway.flatMap(correct_lines)
print(type(highway_mc))

# action
print(highway_mc.take(50))

<class 'pyspark.rdd.PipelinedRDD'>
['on', 'sunset', 'highways', '***', 'produced', 'by', 'melissa', 'mcdaniel', 'and', 'the', 'online', 'distributed', 'proofreading', 'team', 'at', 'http://www.pgdp.net', '(this', 'file', 'was', 'produced', 'from', 'images', 'generously', 'made', 'available', 'by', 'the', 'internet', 'archive.', 'the', 'map', 'and', 'cover', 'are', 'courtesy', 'of', 'the', 'california', 'history', 'room,', 'california', 'state', 'library,', 'sacramento,', 'california.)', "transcriber's", 'note:', 'inconsistent', 'hyphenation', 'in']


In [293]:
climate = highway_mc.filter(lambda line: 'climate' in line)
print(climate.take(5))
print(climate.count())

['climate', 'climate', 'climate', 'climate,', 'climate']
24


#### Creating a dictionary of word counts

In [294]:

# making a function because word count will be done more than two times
def wordcount(rdd):
    
    rdd_wc = rdd.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda i, m: i + m)
    
# reduce by key combines values with the same key
# map creates here a key-value-pair of form (word, 1)

    return rdd_wc


count_highway = wordcount(highway)

print(type(count_highway))
print(count_highway.take(20))

# was appeared 854 in the text
# in text analysis 'was' is considered a stop word
# which does not contribute to the analysis

<class 'pyspark.rdd.PipelinedRDD'>
[('', 6601), ('***', 1), ('Distributed', 1), ('at', 493), ('was', 854), ('produced', 6), ('generously', 1), ('The', 682), ('Archive.', 1), ('are', 458), ('of', 4282), ('State', 12), ('Library,', 1), ('Note:', 1), ('in', 1792), ('preserved.', 2), ('errors', 1), ('have', 314), ('corrected.', 1), ('AMERICA', 1)]


### Determine the 10 most frequent words

In [295]:
# key-values-swap places values at first and then keys
# the values become keys
# keys are sorted with sortByKey
# more on sortByKey below

count_highway_swap = count_highway.map(lambda x: (x[1], x[0]))
count_highway_sort = count_highway_swap.sortByKey(ascending = False)

print("")

topten_1 = count_highway_sort.take(10)
print(type(topten_1))
print(topten_1)

print("")

for i in count_highway_sort.take(10):
    print("The word \"{}\" appears {} times.".format(i[1], i[0]))


<class 'list'>
[(7929, 'the'), (6601, ''), (4282, 'of'), (3355, 'and'), (2447, 'a'), (2157, 'to'), (1792, 'in'), (1155, 'is'), (1021, 'we'), (854, 'was')]

The word "the" appears 7929 times.
The word "" appears 6601 times.
The word "of" appears 4282 times.
The word "and" appears 3355 times.
The word "a" appears 2447 times.
The word "to" appears 2157 times.
The word "in" appears 1792 times.
The word "is" appears 1155 times.
The word "we" appears 1021 times.
The word "was" appears 854 times.


Make a function for the top-ten words.

In [296]:
# input is the wordcount rdd
# both functions could be integrated into one function
def topten(wordcount_rdd):
    
    wordcount_rdd_swap = wordcount_rdd.map(lambda x: (x[1], x[0]))
    wordcount_rdd_sort = wordcount_rdd_swap.sortByKey(ascending = False)


    for i in wordcount_rdd_sort.take(10):
        print("The word \"{}\" appears {} times.".format(i[1], i[0]))
    
    

In [297]:
topten(count_highway)

The word "the" appears 7929 times.
The word "" appears 6601 times.
The word "of" appears 4282 times.
The word "and" appears 3355 times.
The word "a" appears 2447 times.
The word "to" appears 2157 times.
The word "in" appears 1792 times.
The word "is" appears 1155 times.
The word "we" appears 1021 times.
The word "was" appears 854 times.


### Text RDD: galaxy

In [298]:

# splits the text into multiple words each a line
# flatMap flats a nested list into a one-dimensional list
galaxy_samsung = galaxy.flatMap(correct_lines)

for line in galaxy_samsung.take(4): 
  print(line)

# checks the if Samsung is in the text
samsung = galaxy_samsung.filter(lambda line: 'Samsung' in line)
print(samsung.collect())
print(samsung.count())
# samsung is 3 times in the text


samsung’s
galaxy
s11
is
[]
0


In [299]:
# creating a word count

galaxy_wc = wordcount(galaxy_samsung)

print("word count galaxy text: {}".format(galaxy_wc.count()))
print(galaxy_wc.take(15))


word count galaxy text: 79
[('is', 1), ('close', 1), ('upgraded', 1), ('biometrics', 1), ('class-leading', 1), ('new', 1), ('look', 1), ('set', 1), ('make', 1), ('strange', 1), ('name', 2), ('but', 1), ('of', 1), ('2020', 1), ('phone', 2)]


In [300]:
topten(galaxy_wc)

The word "and" appears 4 times.
The word "the" appears 4 times.
The word "galaxy" appears 3 times.
The word "to" appears 3 times.
The word "for" appears 3 times.
The word "a" appears 3 times.
The word "name" appears 2 times.
The word "phone" appears 2 times.
The word "than" appears 2 times.
The word "samsung’s" appears 2 times.


## Stopwords

<p>
There are a lot of stopwords in the text.
</p>

<p>
In computing, stop words are words which are filtered out before processing of natural language data (text).Stop words are generally the most common words in a language.
</p> 

<a href="https://www.w3schools.cohttps://en.wikipedia.org/wiki/Stop_wordsm" target="_blank">Wikipedia</a> 

<p>
Stopwords are now filtered out. This makes for example the word count cleare.
The english.txt ist taken from GitHub. Thanks to the contributors.
</p> 

<a href="https://github.com/Alir3z4/stop-words/blob/master/english.txt</p> 
" target="_blank">GitHubWikipedia</a> 


In [301]:
# Additionally a very small stopwords-list is created manually
stopwords_small = ['is', 'cause', 'am', 'these', 'are','the','for','a', 'will']

### Different methods to import the stopwords-txt-file

In [302]:
# Creating a file path with Pathlib
path_join = pathlib.Path.cwd().joinpath(r'english.txt')
path_join = path_join.resolve()

# print(path_join)
# print(type(path_join ))

# Alternative way to create a file path with Pathlib
# path = pathlib.Path.cwd() / 'english.txt'

# print(path)
# print(type(path))

In [303]:
# access the file with read

with open(path_join, encoding='utf-8') as file: # Use file to refer to the file object
    # access files
    data1 = file.read()
    # split strings into list
    stopwords1 = data1.split('\n')
       
print(stopwords1[:10])
print(type(stopwords1))

print("")

# access the file with read lines
full = os.path.join(path_join)
with open(full, encoding="utf-8") as file:
    stopwords2 = file.readlines()
    
print(stopwords2[:5])
print(type(stopwords2))

["'ll", "'tis", "'twas", "'ve", '10', '39', 'a', "a's", 'able', 'ableabout']
<class 'list'>

["'ll\n", "'tis\n", "'twas\n", "'ve\n", '10\n']
<class 'list'>


In [304]:
# alternatively using read table of Pandas
stopwords_pd = pd.read_table(r"english.txt")
print(type(stopwords_pd))
print(stopwords_pd.shape)

# to pd.Series
stop_pd_series = pd.Series(stopwords_pd.iloc[:,0])

stop_list = stop_pd_series.tolist()
print(type(stop_list))
print(len(stop_list))

<class 'pandas.core.frame.DataFrame'>
(1297, 1)
<class 'list'>
1297


In [305]:
# loading the stopwords text file into Pyspark
# returns a RDD
stopwords_RDD = sc.textFile('english.txt')

print(type(stopwords_RDD))
print(stopwords_RDD.take(5))


<class 'pyspark.rdd.RDD'>
["'ll", "'tis", "'twas", "'ve", '10']


### Filter with stopwords

Text: galaxy

In [306]:
galaxy_stop = galaxy_samsung.filter(lambda x: x not in stopwords_small)

print(type(galaxy_stop))
print(galaxy_stop.take(15))

# galaxy_stop is the flattened RDD filtered of the words contained in stopwords_small

<class 'pyspark.rdd.PipelinedRDD'>
['samsung’s', 'galaxy', 's11', 'close', 'to', 'release', 'and', 'its', 'supersized', 'battery,', 'upgraded', 'biometrics', 'and', 'class-leading', 'new']


In [307]:
galaxy_stop2 = galaxy_samsung.filter(lambda x: x not in stopwords1)

print(type(galaxy_stop2))
print(galaxy_stop2.take(15))
# galaxy_stop2 is the flattened RDD filtered of the words contained in stopwords1 taken from GitHub as txt file

<class 'pyspark.rdd.PipelinedRDD'>
['samsung’s', 'galaxy', 's11', 'close', 'release', 'supersized', 'battery,', 'upgraded', 'biometrics', 'class-leading', 'display', 'set', 'surprisingly', 'strange', 'change.']


Notice: galaxy_stop contains the word "and", galaxy_stop2 does not. "and" is contained in stopwords1 and the original
file from GitHub english.txt. It is not contained in the self created list stopwords_small. Therefore "and" is not
filtered out in galaxy_stop.

#### Wordcount

In [308]:
galaxy_stop_wc2 = wordcount(galaxy_stop2)

print(galaxy_stop_wc2)

print("")

print("word count galaxy stop text: {}".format(galaxy_stop_wc2.count()))

print("")

print(galaxy_stop_wc2.take(15))

print("")

print("maximum key-value-pair:" , galaxy_stop_wc2.max(lambda x:x[1]))
print("minimum key-value-pair:" , galaxy_stop_wc2.min(lambda x:x[1]))


PythonRDD[931] at RDD at PythonRDD.scala:53

word count galaxy stop text: 45

[('close', 1), ('upgraded', 1), ('biometrics', 1), ('class-leading', 1), ('set', 1), ('strange', 1), ('2020', 1), ('phone', 2), ('accidentally', 1), ('confirmed.', 1), ('major', 1), ('manual', 1), ('lite', 1), ('sammobile.', 1), ('reveals', 1)]

maximum key-value-pair: ('galaxy', 3)
minimum key-value-pair: ('close', 1)


In [309]:
topten(galaxy_stop_wc2)
# top ten word apperances cleaned from stop words

The word "galaxy" appears 3 times.
The word "phone" appears 2 times.
The word "samsung’s" appears 2 times.
The word "s10" appears 2 times.
The word "close" appears 1 times.
The word "upgraded" appears 1 times.
The word "biometrics" appears 1 times.
The word "class-leading" appears 1 times.
The word "set" appears 1 times.
The word "strange" appears 1 times.


Text: highway

In [310]:
highway_mc_stop = highway_mc.filter(lambda i: i not in stopwords1)
print(type(highway_mc_stop ))
print("number of words: ", highway_mc_stop.count())
print(highway_mc_stop.take(10))

print("")

highway_mc_stop_wc = wordcount(highway_mc_stop)

print(highway_mc_stop_wc)
print("word key-value-pairs: ", highway_mc_stop_wc.count())
print(highway_mc_stop_wc.take(10))
print("maximum key-value-pair:" , highway_mc_stop_wc.max(lambda x:x[1]))
print("minimum key-value-pair:" , highway_mc_stop_wc.min(lambda x:x[1]))

<class 'pyspark.rdd.PipelinedRDD'>
number of words:  44298
['sunset', 'highways', '***', 'produced', 'melissa', 'mcdaniel', 'online', 'distributed', 'proofreading', 'team']

PythonRDD[950] at RDD at PythonRDD.scala:53
word key-value-pairs:  14153
[('sunset', 28), ('***', 1), ('produced', 7), ('online', 1), ('team', 5), ('generously', 1), ('internet', 1), ('library,', 1), ('sacramento,', 4), ("transcriber's", 1)]
maximum key-value-pair: ('san', 375)
minimum key-value-pair: ('***', 1)


In [311]:
topten(highway_mc_stop_wc)

The word "san" appears 375 times.
The word "road" appears 338 times.
The word "miles" appears 263 times.
The word "mission" appears 192 times.
The word "santa" appears 178 times.
The word "california" appears 163 times.
The word "time" appears 136 times.
The word "town" appears 130 times.
The word "valley" appears 115 times.
The word "mountain" appears 112 times.


### Paired RDDs
<p>The word count data structure are RDDs. For example print(highway_mc_stop_wc) shows "PythonRDD[237] at RDD",
but those are special RDDs using key-value-pairs like for example Python dictionaries or JSON-files do.
The keys are the identifiers of the values.
</p> 

<p>Spark offers a number of functions optimized to operate on paired RDDs:</p> 


<ul>
  <li>reduceByKey(func): Combine values with the same key</li>
  <li>groupByKey(): Group values with the same </li>
  <li>sortByKey(): Return an RDD sorted by the key</li>
  <li>join(): Join two pair RDDs based on their key</li> 
</ul> 

<p>
Paired RDDs are often created from tuples, immutable structure of key-value pairs.
Pairs of random tuples are created below and then transformed into a paired RDD.
</p> 


In [312]:
# import random

random.seed()
tuple_list = \
[ ( random.randint(1, 6), random.randint(0, 200) ) for k in range(300) ]

print(tuple_list[:15])

[(6, 6), (3, 14), (4, 99), (6, 164), (2, 197), (3, 148), (2, 82), (2, 61), (5, 164), (4, 93), (1, 92), (1, 36), (5, 62), (4, 19), (1, 131)]


In [313]:
# paired RDDs are often created from tuples
# immutable structure of key-value pairs

RDD_tuple = sc.parallelize(tuple_list)
print(RDD_tuple)



ParallelCollectionRDD[962] at readRDDFromFile at PythonRDD.scala:247


In [314]:
summary788 = Statistics.colStats(RDD_tuple)

print(summary788.mean())  # a dense vector containing the mean value for each column
print(summary788.variance())  # column-wise variance
print(summary788.numNonzeros())  # number of nonzeros in each column

[ 3.52666667 95.69333333]
[2.58457079e+00 3.13404611e+03]
[300. 300.]


In [315]:
# An rdd is an row oject
# iteration is necessary to read out the row values
for i in RDD_tuple.take(10):
    print("The value of Key {} is {}".format(i[0], i[1]))

The value of Key 6 is 6
The value of Key 3 is 14
The value of Key 4 is 99
The value of Key 6 is 164
The value of Key 2 is 197
The value of Key 3 is 148
The value of Key 2 is 82
The value of Key 2 is 61
The value of Key 5 is 164
The value of Key 4 is 93


#### Selecting data

In [316]:
print(RDD_tuple.take(5))

print("")

# Take top 5 RDD elements
print(RDD_tuple.top(5))

print("")

# Return distinct RDD values
print(RDD_tuple.distinct().take(5))

print("")

# Return (key,value) RDD's keys
print(RDD_tuple.keys().take(5))

# print the first 5 keys
for i in RDD_tuple.take(5):
    print(i[0])

[(6, 6), (3, 14), (4, 99), (6, 164), (2, 197)]

[(6, 174), (6, 173), (6, 165), (6, 164), (6, 164)]

[(6, 164), (5, 137), (1, 9), (4, 66), (3, 95)]

[6, 3, 4, 6, 2]
6
3
4
6
2


Alternatively a paired RDD created is created from the breast cancer data frame above.
This maps a long list of observations to one key equal to the variable name.

In [317]:
# using the breast cancer data set from above
# and turning it into a dict
bc_df_dict = bc_df_45_pd.to_dict()
print(bc_df_dict.keys())

print(" ")

# and turning it into a tuple
bc_df_tuple = [(k,v) for k,v in  bc_df_dict.items() ]
# tuples are accessed like lists
tuple1_names = [[t[0] for t in bc_df_tuple]]
print(tuple1_names )

# creating a paired RDD from tuple
RDD_tuple2 = sc.parallelize(bc_df_tuple)

dict_keys(['Sample code number', 'Clump Thickness', 'Uniformity of Cell Size', 'Uniformity of Cell Shape', 'Single Epithelial Cell Size', 'Bare Nuclei', 'Bland Chromatin', 'Normal Nucleoli', 'Mitoses', 'Class'])
 
[['Sample code number', 'Clump Thickness', 'Uniformity of Cell Size', 'Uniformity of Cell Shape', 'Single Epithelial Cell Size', 'Bare Nuclei', 'Bland Chromatin', 'Normal Nucleoli', 'Mitoses', 'Class']]


In [318]:
# elements of the RDD are accessed by iterating over the RDD
# print out the variable name, the first and fifth element of every value list

for j in RDD_tuple2.collect():
    
# susetting a tuple is similar to subsetting a list
    print("Variable:{} / 1st-value: {}, 5th-value: {}".format(j[0], j[1][1], j[1][5]))

Variable:Sample code number / 1st-value: 1002945.0, 5th-value: 1017122.0
Variable:Clump Thickness / 1st-value: 5.0, 5th-value: 8.0
Variable:Uniformity of Cell Size / 1st-value: 4.0, 5th-value: 10.0
Variable:Uniformity of Cell Shape / 1st-value: 4.0, 5th-value: 10.0
Variable:Single Epithelial Cell Size / 1st-value: 5.0, 5th-value: 8.0
Variable:Bare Nuclei / 1st-value: 7.0, 5th-value: 7.0
Variable:Bland Chromatin / 1st-value: 10.0, 5th-value: 10.0
Variable:Normal Nucleoli / 1st-value: 3.0, 5th-value: 9.0
Variable:Mitoses / 1st-value: 2.0, 5th-value: 7.0
Variable:Class / 1st-value: 1.0, 5th-value: 1.0


#### Difference between count and countByKey

In [319]:
print("number of key-value-pairs:", RDD_tuple2.count())
print("number of keys:", len(bc_df_dict.keys()))

print(" ")

# count how how the keys appear in the tuple
print("CountByKey: ", RDD_tuple2.countByKey())
# one time every key appears


number of key-value-pairs: 10
number of keys: 10
 
CountByKey:  defaultdict(<class 'int'>, {'Sample code number': 1, 'Clump Thickness': 1, 'Uniformity of Cell Size': 1, 'Uniformity of Cell Shape': 1, 'Single Epithelial Cell Size': 1, 'Bare Nuclei': 1, 'Bland Chromatin': 1, 'Normal Nucleoli': 1, 'Mitoses': 1, 'Class': 1})


In [320]:
# random data
print("number of key-value-pairs:", RDD_tuple.count())

print(" ")

print("CountByKey: ", RDD_tuple.countByKey())
# the key 200 appears 2 times


number of key-value-pairs: 300
 
CountByKey:  defaultdict(<class 'int'>, {6: 43, 3: 55, 4: 64, 2: 49, 5: 48, 1: 41})


#### Operation: reduceByKey

In [321]:
# breast cancer data
rddred1 = RDD_tuple2.reduceByKey(lambda x, y: x+y)

# Iterate over the result and print the output
for num in rddred1.collect(): 
  print("The key or variable {} has {} Counts".format(num[0], num[0:][1][3]))

The key or variable Sample code number has 1016277.0 Counts
The key or variable Single Epithelial Cell Size has 1.0 Counts
The key or variable Bland Chromatin has 4.0 Counts
The key or variable Class has 1.0 Counts
The key or variable Clump Thickness has 6.0 Counts
The key or variable Uniformity of Cell Size has 8.0 Counts
The key or variable Bare Nuclei has 3.0 Counts
The key or variable Normal Nucleoli has 3.0 Counts
The key or variable Uniformity of Cell Shape has 8.0 Counts
The key or variable Mitoses has 7.0 Counts


In [322]:
# reduceByKey combines the keys and allows them to do operation like sum, subtraction ... on the values
# in this case the values for every same key are summed up
RDDred1 = RDD_tuple.reduceByKey(lambda h, j: h+j)

for i in RDDred1.collect():
    print("The sum of key-variable {} is {}".format(i[0], i[1]))

The sum of key-variable 4 is 5654
The sum of key-variable 5 is 4741
The sum of key-variable 1 is 3535
The sum of key-variable 6 is 3986
The sum of key-variable 2 is 4828
The sum of key-variable 3 is 5964


#### Operation: sortbyKey

does exactly what it says. It sorts the data by "key" (and not the values).

In [323]:
RDDsort1 = RDDred1.sortByKey(ascending=False)

for i in RDDsort1.collect():
    print("Key or variable {} has the value: {}".format(i[0], i[1]))
# Sort the variable by key in desceding order from 6 to 1

print("")

RDDsort2 = RDDred1.sortByKey(ascending=True)

for i in RDDsort2.collect():
    print("Key or variable {} has the value: {}".format(i[0], i[1]))
# Sort the variable by key in desceding order from 6 to 1

Key or variable 6 has the value: 3986
Key or variable 5 has the value: 4741
Key or variable 4 has the value: 5654
Key or variable 3 has the value: 5964
Key or variable 2 has the value: 4828
Key or variable 1 has the value: 3535

Key or variable 1 has the value: 3535
Key or variable 2 has the value: 4828
Key or variable 3 has the value: 5964
Key or variable 4 has the value: 5654
Key or variable 5 has the value: 4741
Key or variable 6 has the value: 3986


In [324]:
rddsort3 = rddred1.sortByKey()
for p in rddsort3.collect():
    print(p[0])
    
print(" ")
    
rddsort3 = rddred1.sortByKey(ascending=False)
for p in rddsort3.collect():
    print(p[0])
# works for text as well

Bare Nuclei
Bland Chromatin
Class
Clump Thickness
Mitoses
Normal Nucleoli
Sample code number
Single Epithelial Cell Size
Uniformity of Cell Shape
Uniformity of Cell Size
 
Uniformity of Cell Size
Uniformity of Cell Shape
Single Epithelial Cell Size
Sample code number
Normal Nucleoli
Mitoses
Clump Thickness
Class
Bland Chromatin
Bare Nuclei


#### countByKey

In [325]:
# creating a new data set of tuples just for excercise
tuples_34 =\
[(random.randint(1,10), random.randint(1, 10000)) for i in range(1000)]
# transform this into an RDD
RDD34 = sc.parallelize(tuples_34)

print("type", type(RDD34))
print("")
print(RDD34.top(5))
print("")
# Return distinct RDD values
print(RDD34.distinct().take(5))
print("")
# Return (key,value) RDD's keys
print(RDD34.keys().take(5))

type <class 'pyspark.rdd.RDD'>

[(10, 9997), (10, 9977), (10, 9960), (10, 9901), (10, 9599)]

[(10, 1080), (5, 3077), (9, 129), (3, 8627), (3, 6387)]

[3, 4, 10, 7, 5]


In [326]:
# works here?
summary557 = Statistics.colStats(RDD34)

print(summary557.mean())  
# a dense vector containing the mean value for each column
print(summary557.variance())
# column-wise variance
print(summary557.numNonzeros())  
# number of nonzeros in each column



[   5.456 5002.479]
[8.36843243e+00 8.50986612e+06]
[1000. 1000.]


In [327]:
RDD34cby = \
RDD34.countByKey()

print(RDD34cby)
print(type(RDD34cby))

defaultdict(<class 'int'>, {3: 99, 4: 108, 10: 91, 7: 99, 5: 96, 8: 86, 1: 116, 9: 118, 6: 101, 2: 86})
<class 'collections.defaultdict'>


In [328]:
# Iterate over the total and print the output
for k, v in RDD34cby.items(): 
  print("key", k, "has", v, "counts")
# This mean key 3 occurs 105 times in the RDD data set "RDD34"

key 3 has 99 counts
key 4 has 108 counts
key 10 has 91 counts
key 7 has 99 counts
key 5 has 96 counts
key 8 has 86 counts
key 1 has 116 counts
key 9 has 118 counts
key 6 has 101 counts
key 2 has 86 counts


####  groupBy & groupByKey

whereby the latter works on paired RDDs.

In [341]:
# groupBy the first 4 letters the words start with
# this is the RDD cleaned from stop words
galaxy_grb = galaxy_stop2.groupBy(lambda letters: letters[0:4])
print(type(galaxy_grb))
print(galaxy_grb.take)

for k,v in galaxy_grb.take(30):
    print(k,list(v))
# groupBy attaches every word starting with the first 4 letters to this

<class 'pyspark.rdd.PipelinedRDD'>
<bound method RDD.take of PythonRDD[1072] at RDD at PythonRDD.scala:53>
sams ['samsung’s', 'samsung', 'samsung’s']
rele ['release']
batt ['battery,']
biom ['biometrics']
disp ['display']
set ['set']
stra ['strange']
2020 ['2020']
phon ['phone', 'phone']
conf ['confirmed.']
majo ['major']
blun ['blunder,']
manu ['manual']
lite ['lite']
leak ['leaked']
samm ['sammobile.']
reve ['reveals']
expe ['expect.']
insi ['insiders']
ice ['ice']
univ ['universe']
onle ['onleaks']
stat ['stated']
away ['away.']
gala ['galaxy', 'galaxy', 'galaxy']
s11 ['s11']
clos ['close']
supe ['supersized']
upgr ['upgraded']
clas ['class-leading']


Counting the frequency of the grouped RDD.

In [381]:
# this transforms the RDD into a paired RDD consisting of key-value-pairs
galaxy_paired = galaxy_stop2.map(lambda word: (word, 1))
# now the paired RDD is grouped by the key word


galaxy_paired_group = galaxy_paired.groupByKey()
print([(i, list(m)) for i,m in galaxy_paired_group.take(20)])

print("")

# mapValues can aggregate, sum, count the values in every key
# map is the key value swap need for sorting
galaxy_mV =\
galaxy_paired_group.mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey(ascending=False)
print("Mapped values: ", galaxy_mV.take(10))

[('close', [1]), ('upgraded', [1]), ('biometrics', [1]), ('class-leading', [1]), ('set', [1]), ('strange', [1]), ('2020', [1]), ('phone', [1, 1]), ('accidentally', [1]), ('confirmed.', [1]), ('major', [1]), ('manual', [1]), ('lite', [1]), ('sammobile.', [1]), ('reveals', [1]), ('suggests,', [1]), ('expect.', [1]), ('insiders', [1]), ('ice', [1]), ('universe', [1])]

Mapped values:  [(3, 'galaxy'), (2, 'phone'), (2, 'samsung’s'), (2, 's10'), (1, 'close'), (1, 'upgraded'), (1, 'biometrics'), (1, 'class-leading'), (1, 'set'), (1, 'strange')]


In [384]:
# this transforms the RDD into a paired RDD consisting of key-value-pairs
highway_paired = highway_mc_stop.map(lambda word: (word, 1))
# now the paired RDD is grouped by the key word

highway_paired_group = highway_paired.groupByKey()
print([(i, list(m)) for i,m in highway_paired_group.take(10)])

print("")

# mapValues can aggregate, sum, count the values in every key
# map is the key value swap need for sorting
highway_mV =\
highway_paired_group.mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey(ascending=False)
print("Mapped values: ", highway_mV.take(10)) 

[('sunset', [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), ('***', [1]), ('produced', [1, 1, 1, 1, 1, 1, 1]), ('online', [1]), ('team', [1, 1, 1, 1, 1]), ('generously', [1]), ('internet', [1]), ('library,', [1]), ('sacramento,', [1, 1, 1, 1]), ("transcriber's", [1])]

Mapped values:  [(375, 'san'), (338, 'road'), (263, 'miles'), (192, 'mission'), (178, 'santa'), (163, 'california'), (136, 'time'), (130, 'town'), (115, 'valley'), (112, 'mountain')]


#### Join

two RDDs by keys.

In [391]:
# At first making to RDDs from one RDD by taking samples
high_sample_1 = highway_paired.sample(withReplacement=False, fraction=0.1, seed=45)
high_sample_2 = highway_paired.sample(withReplacement=False, fraction=0.12, seed=78)

print(type(high_sample_1), type(high_sample_2))
print(len(high_sample_1.collect()), len(high_sample_2.collect()))

<class 'pyspark.rdd.PipelinedRDD'> <class 'pyspark.rdd.PipelinedRDD'>
4441 5230


In [390]:
# join
join_1 = high_sample_1.join(high_sample_2)
print(join_1.take(3))

[('sunset', (1, 1)), ('sunset', (1, 1)), ('sunset', (1, 1))]


In [None]:
# Distinct

In [399]:
distinct1 = galaxy_stop.distinct()

[i for i in distinct1.take(5)]

['close', 'upgraded', 'biometrics', 'class-leading', 'new']