
In [4]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz 
!tar xf spark-2.4.7-bin-hadoop2.7.tgz 
!pip install -q findspark

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [6]:
import findspark
findspark.init("spark-2.4.7-bin-hadoop2.7")  # path to the Spark installation (SPARK_HOME)
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [7]:
import pyspark
type(spark)

pyspark.sql.session.SparkSession

In [8]:
sc = spark.sparkContext

In [10]:
s = '''Every person had a star, every star had a friend, and for every person
carrying a star there was someone else who reflected it, and everyone
carried this reflection like a secret confidante in the heart'''
simple_rdd = sc.parallelize(s.split('\n'))

Here we assign the string above, which spans 3 lines, to the variable s. We then call the method **parallelize**, which accepts a Python iterable and builds a new RDD whose elements are those of the iterable. Since we split the string into lines, the RDD has 3 elements.

In [11]:
simple_rdd.collect()

['Every person had a star, every star had a friend, and for every person',
 'carrying a star there was someone else who reflected it, and everyone',
 'carried this reflection like a secret confidante in the heart']

**collect** is an *action*. Its result is a list of all the elements in the RDD: here we have 3 elements, each a short string.

We can now perform some operations. The **map** transformation takes an RDD and applies to each of its elements the function given as argument.
Calling map alone does nothing visible, because Spark follows a lazy approach: it does not execute anything until a result is explicitly requested, for example by an action such as collect.

In [12]:
(simple_rdd.map(lambda line: line.split(' '))
           .collect())

[['Every',
  'person',
  'had',
  'a',
  'star,',
  'every',
  'star',
  'had',
  'a',
  'friend,',
  'and',
  'for',
  'every',
  'person'],
 ['carrying',
  'a',
  'star',
  'there',
  'was',
  'someone',
  'else',
  'who',
  'reflected',
  'it,',
  'and',
  'everyone'],
 ['carried',
  'this',
  'reflection',
  'like',
  'a',
  'secret',
  'confidante',
  'in',
  'the',
  'heart']]
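
The lazy behaviour described above can be imitated in plain Python, without Spark. The sketch below is only an analogy: the built-in `map` stands in for a transformation and `list` for an action, and the names `split_line`, `calls`, and `lines` are made up for this illustration.

```python
# Plain-Python analogy for lazy evaluation (no Spark required).
calls = []

def split_line(line):
    calls.append(line)           # record that the function actually ran
    return line.split(' ')

lines = ['every star had a friend', 'and for every person']

lazy = map(split_line, lines)    # like a transformation: nothing runs yet
ran_before = len(calls)          # split_line has not been called so far

result = list(lazy)              # like an action: forces the computation
ran_after = len(calls)           # now there has been one call per line

print(ran_before, ran_after)
print(result)
```

The function only runs once the result is requested, which mirrors how map on an RDD waits for an action such as collect.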

**flatMap** does the same as map but flattens the results.\
**take** returns only a fixed number of elements.\
We do not see the double brackets in the result, because it is flat.

In [13]:
(simple_rdd.flatMap(lambda line: line.split(' '))
           .take(10))

['Every',
 'person',
 'had',
 'a',
 'star,',
 'every',
 'star',
 'had',
 'a',
 'friend,']
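
To make the difference between map and flatMap concrete, here is a minimal plain-Python sketch that does not need Spark. It assumes that a list comprehension is a fair stand-in for map and `itertools.chain.from_iterable` for the flattening step; the sample `lines` are shortened for the illustration.

```python
from itertools import chain

lines = ['Every person had a star,', 'carrying a star there']

# Like map: one output element per input line, so we get a list of lists.
mapped = [line.split(' ') for line in lines]

# Like flatMap: the per-line lists are concatenated into one flat list.
flat = list(chain.from_iterable(line.split(' ') for line in lines))

# Like take(3): keep only the first three elements.
first_three = flat[:3]

print(mapped)
print(flat)
print(first_three)
```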

**flatMap** is the same operation as before.\
**map**: we now have an RDD whose elements are strings, and we simply want to remove any comma they may contain (for example in friend,) and convert them to lowercase.

In [14]:
(simple_rdd.flatMap(lambda line: line.split(' '))
           .map(lambda word: word.replace(',', '').lower())
           .take(5))

['every', 'person', 'had', 'a', 'star']

We now apply a further **map** to the dataset obtained above, before running the **take** action. \
We map each element (word) using a function that takes it and returns a pair (a tuple of two elements) whose first element is the word and whose second is the constant 1.\
The goal is to compute the absolute frequencies of the words in a corpus.


In [15]:
(simple_rdd.flatMap(lambda line: line.split(' '))
           .map(lambda word: word.replace(',', '').lower())
           .map(lambda word: (word, 1))
           .take(5))

[('every', 1), ('person', 1), ('had', 1), ('a', 1), ('star', 1)]

Now we can add the *reduce* step. \
**reduceByKey** takes a function of two arguments that returns one value, and uses it to combine all the values that share the same key.\
We want to sum the values, so the function is a+b.

In [16]:
(simple_rdd.flatMap(lambda line: line.split(' '))
           .map(lambda word: word.replace(',', '').lower())
           .map(lambda word: (word, 1))
           .reduceByKey(lambda a,b: a+b)
           .collect())

[('person', 2),
 ('there', 1),
 ('was', 1),
 ('carried', 1),
 ('this', 1),
 ('like', 1),
 ('secret', 1),
 ('confidante', 1),
 ('in', 1),
 ('heart', 1),
 ('every', 3),
 ('had', 2),
 ('a', 4),
 ('star', 3),
 ('friend', 1),
 ('and', 2),
 ('for', 1),
 ('carrying', 1),
 ('someone', 1),
 ('else', 1),
 ('who', 1),
 ('reflected', 1),
 ('it', 1),
 ('everyone', 1),
 ('reflection', 1),
 ('the', 1)]
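
What reduceByKey computes can be sketched in plain Python. This is not how Spark implements it (Spark combines values partition by partition and then across partitions); `reduce_by_key` is a hypothetical helper written only for this illustration.

```python
def reduce_by_key(pairs, func):
    """Combine the values of equal keys with func, one pair at a time."""
    result = {}
    for key, value in pairs:
        if key in result:
            # Key already seen: merge the running value with the new one.
            result[key] = func(result[key], value)
        else:
            # First occurrence of this key: store its value as is.
            result[key] = value
    return list(result.items())

pairs = [('every', 1), ('person', 1), ('had', 1), ('a', 1),
         ('star', 1), ('every', 1), ('star', 1)]

counts = reduce_by_key(pairs, lambda a, b: a + b)
print(counts)
```

Because Spark does not fix the order in which values are combined, the function passed to reduceByKey should be associative and commutative, as addition is. For the same reason the order of the pairs in the collected result is not guaranteed.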

Below is a simple function that runs the pipeline built above.

In [17]:
def count_freq(rdd):
  return (rdd.flatMap(lambda line: line.split(' '))
            .map(lambda word: word.replace(',', '').lower())
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a,b: a+b)
            .collect())

In [18]:
!wget http://www.scifiscripts.com/scripts/swd1_5-74.txt

--2021-04-25 08:53:01--  http://www.scifiscripts.com/scripts/swd1_5-74.txt
Resolving www.scifiscripts.com (www.scifiscripts.com)... 207.32.177.145
Connecting to www.scifiscripts.com (www.scifiscripts.com)|207.32.177.145|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 203125 (198K) [text/plain]
Saving to: ‘swd1_5-74.txt’


2021-04-25 08:53:02 (632 KB/s) - ‘swd1_5-74.txt’ saved [203125/203125]



In [19]:
sw = sc.textFile('swd1_5-74.txt')

**textFile** creates an RDD from a file stored somewhere, with one element per line. If the argument is a string that does not specify a storage scheme, it is interpreted, in this local setup, as the path of a file on the disk of the machine running the program.

In [23]:
sw.take(10)

['The Star Wars',
 'by',
 'George Lucas',
 '',
 '',
 '',
 'Rough Draft [First of four major screenplay drafts]',
 'Lucasfilm Ltd.',
 '5/74',
 '']

In [22]:
count_freq(sw)[: 10]

[('george', 1),
 ('', 3543),
 ('draft', 1),
 ('of', 744),
 ('four', 25),
 ('major', 2),
 ('drafts]', 1),
 ('ltd.', 1),
 ('5/74', 1),
 ('1.', 1)]
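
The pair ('', 3543) in the output above most likely comes from how `str.split(' ')` treats blank lines and repeated spaces. The plain-Python sketch below shows that behaviour; `tokenize` is a hypothetical helper, not part of the notebook, suggesting one possible way to drop empty tokens.

```python
# Splitting on a single space keeps empty tokens.
blank = ''.split(' ')            # a blank line yields one empty token
double = 'a  b'.split(' ')       # two consecutive spaces yield an empty token

# Splitting with no argument splits on runs of whitespace and drops empties.
blank_no_arg = ''.split()
double_no_arg = 'a  b'.split()

def tokenize(line):
    """Hypothetical replacement for line.split(' ') that skips empty tokens."""
    return [word.replace(',', '').lower() for word in line.split()]

tokens = tokenize('Rough  Draft, [First of four]')

print(blank, double)
print(blank_no_arg, double_no_arg)
print(tokens)
```

Under this assumption, a function like `tokenize` could be passed to flatMap in place of the first two steps of count_freq, so that the empty string no longer appears as a word.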

In [None]:
import random

NUM_SAMPLES = 10**7

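def inside(_):
    # Draw a random point in the unit square and test whether it
    # falls inside the quarter circle of radius 1 (the argument is unused).
    x, y = random.random(), random.random()
    return x*x + y*y < 1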

count = (sc.parallelize(range(0, NUM_SAMPLES))
           .filter(inside).count())

print('Pi is roughly {}'.format(4.0 * count / NUM_SAMPLES))
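
The estimate works because a point drawn uniformly in the unit square falls inside the quarter circle of radius 1 with probability π/4, so 4 times the observed fraction approximates π. Below is a minimal single-machine sketch of the same idea without Spark; the function name `estimate_pi`, the fixed seed, and the smaller sample size are choices made for this illustration only.

```python
import random

def estimate_pi(num_samples, seed=0):
    """Monte Carlo estimate of pi from points in the unit square."""
    rng = random.Random(seed)    # seeded generator, so runs are repeatable
    hits = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x*x + y*y < 1:        # the point lies inside the quarter circle
            hits += 1
    return 4.0 * hits / num_samples

estimate = estimate_pi(100_000)
print('Pi is roughly {}'.format(estimate))
```

In the Spark version, the loop is replaced by filter and count over a parallelized range, so the samples can be drawn on several workers at once.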