## Resilient Distributed Datasets

* Resilient Distributed Datasets (RDDs) are distributed collections of *immutable* JVM objects that let you run computations in parallel across the nodes of a cluster
* As the name suggests, the dataset is distributed: it is split into chunks (*partitions*), for example by hashing a key or by ranges of the input, and the partitions are distributed to executor nodes
* Each RDD keeps a record (log) of the transformations used to build it from its parent data. This record is the *data lineage*: if a partition is lost, Spark can recompute it by replaying those transformations, and knowing the whole chain also lets Spark plan the computation efficiently.
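
To make "split into chunks based on a key" concrete, here is a plain-Python model of hash partitioning, the scheme Spark's hash partitioner applies to key-value RDDs: a record goes to partition `hash(key) mod numPartitions`. This is a sketch, not Spark's code; the helper name `hash_partition` is hypothetical, and PySpark's `partitionBy(...)` uses its own process-stable hash (`portable_hash`) by default rather than Python's built-in `hash`, which is randomized for strings between processes.

```python
from collections import defaultdict


def hash_partition(records, num_partitions):
    """Group (key, value) records into partitions by hash(key) % num_partitions.

    Hypothetical helper: a plain-Python model, not Spark's implementation.
    """
    partitions = defaultdict(list)
    for key, value in records:
        # Python's % with a positive divisor is never negative,
        # so the index always falls in range(num_partitions).
        partitions[hash(key) % num_partitions].append((key, value))
    # One (possibly empty) list per partition, in partition order.
    return [partitions[i] for i in range(num_partitions)]


records = [(10, "a"), (11, "b"), (12, "c"), (20, "d")]
print(hash_partition(records, 2))
```

Because the partition depends only on the key, all records with the same key end up in the same partition, which is what key-based operations such as `reduceByKey` rely on.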

**Internal workings of an RDD**

* RDDs operate in parallel: each transformation is executed on the partitions concurrently.
* Transformations are lazy: a transformation is only executed when an action is called on the dataset. This lets Spark see the whole chain of transformations and optimize its execution.
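
The interplay of laziness and lineage can be sketched in plain Python. The toy class below (`LazyDataset` is a hypothetical name, not part of Spark) only records each transformation and its parent; nothing runs until the `collect()` action walks the lineage back to the source data and replays it.

```python
class LazyDataset:
    """Toy model of lazy, lineage-based evaluation (not Spark's implementation).

    A dataset stores only its parent and the step that derives it from the
    parent; nothing is computed until the collect() action is called.
    """

    def __init__(self, source=None, parent=None, step=None):
        self.source = source  # raw data, set only on the root dataset
        self.parent = parent  # dataset this one was derived from (its lineage)
        self.step = step      # function list -> list applied to the parent's data

    def map(self, f):
        # Transformation: record the step and return immediately.
        return LazyDataset(parent=self, step=lambda xs: [f(x) for x in xs])

    def filter(self, pred):
        # Transformation: also only recorded, never run here.
        return LazyDataset(parent=self, step=lambda xs: [x for x in xs if pred(x)])

    def collect(self):
        # Action: replay the whole lineage starting from the root's source data.
        if self.parent is None:
            return list(self.source)
        return self.step(self.parent.collect())


calls = []  # records every element add_one actually sees


def add_one(x):
    calls.append(x)
    return x + 1


result = LazyDataset(source=[1, 2, 3, 4]).map(add_one).filter(lambda x: x % 2 == 0)
print(calls)             # [] because only transformations have been defined
print(result.collect())  # [2, 4]
print(calls)             # [1, 2, 3, 4]
```

Calling `collect()` a second time replays the lineage again from the source. That replay is what makes lost partitions recoverable in Spark, and it is also why Spark offers `.cache()` for results you reuse.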

In [1]:
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
sc = SparkContext(conf=SparkConf().setMaster("local").setAppName("MyApp"))

In [2]:
from lxml.html import fromstring
import os, re

In [3]:
# Creating RDDs
# Manually
data = sc.parallelize([("Amber", 10), ("Sophie", 8), ("Parkin", 90),("Parkin", 10)])

In [5]:
dg = data.map(lambda x:(x[0] + "PLO", x[1] + 8))

In [8]:
dg.take(2)

[('AmberPLO', 18), ('SophiePLO', 16)]

Try with other data

In [5]:
# Read from a file
VS14MORT = sc.textFile(r"./assets/files/VS14MORT.txt.gz", use_unicode=True, minPartitions= 4)
# A rule of thumb would be to break your dataset into two-four partitions for each in your cluster
# In this case 4
# Schema => [""]
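
The "two to four partitions per core" rule of thumb is simple arithmetic; the sketch below (the helper `suggested_partitions` is hypothetical) computes the range. With `setMaster("local")`, as in the first cell, Spark runs with a single worker thread, which is consistent with using 4 partitions here; `local[*]` would use all available cores instead.

```python
import os


def suggested_partitions(cores_per_node, num_nodes=1):
    """Return the (low, high) partition counts for the 2-4 per core rule of thumb."""
    total_cores = cores_per_node * num_nodes
    return 2 * total_cores, 4 * total_cores


# os.cpu_count() can return None, so fall back to a single core.
print(suggested_partitions(os.cpu_count() or 1))
print(suggested_partitions(cores_per_node=8, num_nodes=4))  # (64, 128)
```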

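### Transformations

* The `.map(...)` transformation: applies a function to each element of the RDD and returns a new RDD with exactly one output element per input element.

* The `.filter(...)` transformation: allows you to select elements from your dataset that fit specified criteria (a function that returns `True` for the elements to keep).

* The `.flatMap(...)` transformation: works similarly to `.map(...)`, but the function may return zero or more elements (for example, a list) for each input, and the results are flattened into a single RDD instead of an RDD of lists.

* The `.distinct(...)` transformation: returns a new RDD with duplicate elements removed. RDDs have no columns, so to get the distinct values of a field, `.map(...)` to that field first. This operation requires a shuffle.

* The `.sample(...)` transformation: returns a randomized sample from the dataset. Its arguments are `sample(withReplacement, fraction, seed)`:
  * `withReplacement`: `False` samples without replacement
  * `fraction`: the expected size of the sample as a fraction of the dataset (the actual size varies)
  * `seed`: the seed of the random number generator, which makes the sample reproducible

* The `.leftOuterJoin(...)` transformation: joins two RDDs of `(key, value)` pairs on their keys, keeping every element of the left RDD; keys with no match on the right get `None` as the right value. Related transformations:
  * `.join(...)`: inner join; only keys present in both RDDs are kept
  * `.intersection(...)`: returns the distinct elements present in both RDDs

* The `.repartition(...)` transformation: changes the number of partitions of an RDD, shuffling the data across the cluster.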
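
To pin down what the join-style transformations return, here is a plain-Python model of their semantics on small lists of `(key, value)` pairs. The helper names are hypothetical, the nested loops are quadratic (Spark shuffles by key instead), and Spark does not guarantee the output order.

```python
def inner_join(left, right):
    """Model of rdd.join: pair every left value with every right value sharing its key."""
    return [(k, (lv, rv)) for k, lv in left for rk, rv in right if k == rk]


def left_outer_join(left, right):
    """Model of rdd.leftOuterJoin: like join, but unmatched left keys get None."""
    result = []
    for k, lv in left:
        matches = [rv for rk, rv in right if rk == k]
        if matches:
            result.extend((k, (lv, rv)) for rv in matches)
        else:
            result.append((k, (lv, None)))
    return result


def intersection(a, b):
    """Model of rdd.intersection: distinct elements present in both datasets."""
    return sorted(set(a) & set(b))


left = [("a", 1), ("b", 4)]
right = [("a", 2), ("a", 3), ("c", 9)]
print(inner_join(left, right))       # [('a', (1, 2)), ('a', (1, 3))]
print(left_outer_join(left, right))  # [('a', (1, 2)), ('a', (1, 3)), ('b', (4, None))]
print(intersection(["x", "y", "y"], ["y", "z"]))  # ['y']
```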

In [13]:
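# data_users is assumed to be an RDD of text lines read in a cell that is not shown here.
# ast.literal_eval parses Python literals (dicts, lists, numbers, strings) without executing arbitrary code, unlike eval.
import ast
data_users_1 = data_users.map(ast.literal_eval)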

In [14]:
data_users_2 = data_users_1.flatMap(lambda x:x)
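
The difference between `.map(...)` and `.flatMap(...)` with the same identity function can be modeled in plain Python. The input below is hypothetical: it assumes each element of `data_users_1` is a list of customer dicts, which is what the `flatMap(lambda x:x)` call producing individual dicts suggests.

```python
from itertools import chain

# Hypothetical parsed input: each element is a list of customer dicts,
# the shape data_users_1 is assumed to have before the flatMap above.
data_users_1_model = [
    [{"CustomerId": 1001, "Age": 37}, {"CustomerId": 1002, "Age": 34}],
    [{"CustomerId": 1003, "Age": 42}],
]


def identity(x):
    # The same function the notebook passes to flatMap.
    return x


# map: one output per input, so the result is still a list of lists.
mapped = [identity(x) for x in data_users_1_model]

# flatMap: each returned list is unpacked into the output, leaving the dicts.
flat_mapped = list(chain.from_iterable(identity(x) for x in data_users_1_model))

print(len(mapped))       # 2
print(len(flat_mapped))  # 3
```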

In [18]:
data_users_2.take(3)

[{'CustomerId': 1001,
  'Address': 'Avenida Iquitos 876',
  'Age': 37,
  'CellPhone': 981914173},
 {'CustomerId': 1002,
  'Address': 'Avenida San Juan de Dios 126',
  'Age': 34,
  'CellPhone': 969461498},
 {'CustomerId': 1003,
  'Address': 'Avenida Santa Rosa (San Juan de Lurigancho) 1086',
  'Age': 42,
  'CellPhone': 998535877}]

In [9]:
data_users_3 = (
    data_users_2
    .filter(lambda x:x['Age']>45)
    .filter(lambda x:re.search(r".*28.*",x['Address'])))

In [10]:
data_users_3.take(2)

[{'CustomerId': 1029,
  'Address': 'Avenida Balta (Barranco) 284',
  'Age': 49,
  'CellPhone': 995413921},
 {'CustomerId': 1159,
  'Address': 'Avenida 28 de Julio (Lima) 1320',
  'Age': 46,
  'CellPhone': 981676929}]

In [11]:
data_users_4 = data_users_3.map(lambda x:x['Age'])

In [12]:
data_users_4.distinct().collect()

[48, 49, 46, 50, 47]

In [13]:
data_users_sample = data_users_2.sample(False, 0.1, 123)

In [14]:
data_users_sample.take(2)

[{'CustomerId': 1006,
  'Address': 'Avenida Circunvalación (Lima) 633',
  'Age': 45,
  'CellPhone': 925527751},
 {'CustomerId': 1008,
  'Address': 'Avenida de las Américas (Lima) 1381',
  'Age': 49,
  'CellPhone': 944413570}]
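
`sample(False, fraction, seed)` keeps each element independently with probability `fraction` (Bernoulli sampling), so the sample size is only approximately `fraction` times the dataset size. The plain-Python model below illustrates that behavior; the helper `bernoulli_sample` is hypothetical, and the exact elements Spark picks for a given seed will differ.

```python
import random


def bernoulli_sample(data, fraction, seed=None):
    """Keep each element independently with probability `fraction`.

    Plain-Python model of sampling without replacement, not Spark's code.
    """
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]


population = list(range(1000))
picked = bernoulli_sample(population, 0.1, seed=123)
print(len(picked))  # close to 100, but generally not exactly 100
```

If you need an exact number of elements, the `.takeSample(...)` action returns a fixed-size sample to the driver instead.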

In [15]:
data_sales = sc.textFile(r"C:\Users\LENOVO\Desktop\python_course\PySpark\Projects\DataSales.json", use_unicode=True, minPartitions= 4)

In [16]:
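import ast
# Each line is parsed into a list of sales dicts; literal_eval is a safer replacement for eval
data_sales_1 = data_sales.flatMap(ast.literal_eval)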

In [17]:
# Sample
data_sales_1_sample = data_sales_1.sample(False, 0.01, 123)

In [18]:
data_sales_1_sample.take(1)

[{'CustomerId': 1163,
  'Date': '2019-08-27T23:28:07',
  'PaymentMethod': {'Cash': 'Debit'},
  'Items': [{'ProductId': 10183, 'Amount': 2.0, 'Price': 6.56},
   {'ProductId': 10475, 'Amount': 4.0, 'Price': 5.76}]}]

In [19]:
print(data)
print(data_sales)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274
C:\Users\LENOVO\Desktop\python_course\PySpark\Projects\DataSales.json MapPartitionsRDD[13] at textFile at NativeMethodAccessorImpl.java:0


* The path cannot contain the special characters `[]`, because Spark treats them as part of a glob pattern. Note that this also applies to paths stored on Amazon S3 or Microsoft Azure storage.
* Multiple data formats are supported: text, Parquet, JSON, and Hive tables can be read, and data from relational databases can be read using a JDBC driver. Spark also reads compressed files such as gzipped text transparently; note that a gzipped file cannot be split, so each such file is read into a single partition.
    ``` python
    data_from_file = sc.textFile("VS14MORT.txt.gz", 4)
    ```

**Schema**
* RDDs are schema-less data structures.
* So, we can mix almost anything: a tuple, a dict, or a list, and Spark will not complain.
* The `.collect()` method returns all the elements of the RDD to the driver as a Python list, so use it only when the result fits in the driver's memory.

In [20]:
data_heterogeneous = sc.parallelize([("Ferrari", "fast"), {"Porsche":10000}, [10, 80, 30]]).collect()
data_heterogeneous[1]['Porsche']

10000

**Reading from files**

* When you read from a text file, each line of the file becomes one element of the RDD.
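
The "one line, one element" rule, combined with transparent gzip decompression, can be illustrated with Python's standard `gzip` module. The file name and contents below are hypothetical; the resulting list is what the elements of `sc.textFile(...)` would be for such a file, without their trailing newlines.

```python
import gzip
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp_dir:
    # Write a small gzipped text file with hypothetical contents.
    path = os.path.join(tmp_dir, "example.txt.gz")
    with gzip.open(path, "wt", encoding="utf-8") as f:
        f.write("first line\nsecond line\nthird line\n")

    # Decompress and split into lines: one list element per line of text.
    with gzip.open(path, "rt", encoding="utf-8") as f:
        elements = [line.rstrip("\n") for line in f]

print(elements)  # ['first line', 'second line', 'third line']
```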

### Actions

Actions trigger the execution of the scheduled transformations on the dataset and return a result to the driver (or write it out). Once you have finished defining your transformations, calling an action executes them.

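* `.take(n)`: returns the first `n` elements of the RDD to the driver as a list
* `.takeSample(withReplacement, num, seed)`: returns a random sample of `num` elements to the driver as a list (fewer if the RDD is smaller and sampling is without replacement)
* `.collect()`: returns all the elements of the RDD to the driver
* `.reduce(...)`: reduces the elements of an RDD using a specified function, which must be commutative and associative
* `.countByKey()`: counts the elements for each key of a `(key, value)` RDD and returns the counts to the driver as a dictionary, for example:
  
  `rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])`
  
  `sorted(rdd.countByKey().items())`
  
  `[('a', 2), ('b', 1)]`
* The `.saveAsTextFile(...)` action: saves the RDD as text files in a directory, one file per partition, writing the string representation of each element
* The `.foreach(...)` action: applies a function to each element on the executors and returns nothing; it is used for side effects such as writing to an external system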
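
Why must the function passed to `.reduce(...)` be commutative and associative? Spark reduces each partition separately and then combines the partial results. The plain-Python model below (the helper `reduce_partitions` is hypothetical) shows that addition gives the same answer for any partitioning, while subtraction does not.

```python
from functools import reduce


def reduce_partitions(partitions, f):
    """Model of rdd.reduce: reduce each partition locally, then combine the partials."""
    partials = [reduce(f, part) for part in partitions if part]  # skip empty partitions
    return reduce(f, partials)


print(reduce_partitions([[1, 2], [3, 4, 5], []], lambda x, y: x + y))  # 15

# Subtraction is not associative, so the result depends on the partitioning:
print(reduce_partitions([[10, 1], [5, 2]], lambda x, y: x - y))  # 6
print(reduce_partitions([[10], [1, 5, 2]], lambda x, y: x - y))  # 16
```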

In [21]:
data_sales_1.takeSample(False, 2, 123)
# False -> withReplacement: sample without replacement
# 2 -> num: number of elements to return
# 123 -> seed: makes the sample reproducible

[{'CustomerId': 1056,
  'Date': '2019-10-18T20:47:31',
  'PaymentMethod': {'Cash': 'Cash'},
  'Items': [{'ProductId': 10113, 'Amount': 2.0, 'Price': 7.48},
   {'ProductId': 10238, 'Amount': 2.0, 'Price': 7.45},
   {'ProductId': 10144, 'Amount': 2.0, 'Price': 13.41},
   {'ProductId': 10092, 'Amount': 2.0, 'Price': 7.99},
   {'ProductId': 10094, 'Amount': 5.0, 'Price': 4.43},
   {'ProductId': 10320, 'Amount': 1.0, 'Price': 12.73},
   {'ProductId': 10437, 'Amount': 4.0, 'Price': 3.13},
   {'ProductId': 10404, 'Amount': 4.0, 'Price': 2.32},
   {'ProductId': 10189, 'Amount': 3.0, 'Price': 8.22},
   {'ProductId': 10458, 'Amount': 2.0, 'Price': 4.26},
   {'ProductId': 10125, 'Amount': 1.0, 'Price': 0.94}]},
 {'CustomerId': 1148,
  'Date': '2020-10-11T03:21:50',
  'PaymentMethod': {'Cash': 'Debit'},
  'Items': [{'ProductId': 10237, 'Amount': 1.0, 'Price': 4.97},
   {'ProductId': 10115, 'Amount': 7.0, 'Price': 9.78},
   {'ProductId': 10089, 'Amount': 4.0, 'Price': 3.69},
   {'ProductId': 10015,

In [22]:
sample_now = data_sales_1.filter(lambda x:x['CustomerId'] == 1056).flatMap(lambda x:x['Items'])

In [41]:
sample_now.count()

13

In [12]:
import download_page  # local helper module (not shown) that provides get_page(url)

In [3]:
if not os.path.exists('webpage.txt'):
    # Download the page and keep only the text of the live-blog paragraphs
    html_page = download_page.get_page('https://www.nytimes.com/live/2022/05/27/business/economy-news-stocks-inflation?')
    tree = fromstring(html_page)
    html_list = tree.xpath('//p[@class="live-blog-post-content css-h61jh5 evys1bk0"]//text()')
    html_string = '\n'.join(html_list)

    # Cache the text locally so later runs skip the download
    with open('webpage.txt', 'w', encoding='utf-8') as writer:
        writer.write(html_string)

In [4]:
data = sc.textFile('webpage.txt')
print("> ", "Type of the data:", type(data))

>  Type of the data: <class 'pyspark.rdd.RDD'>


In [21]:
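# 'price[ s]' matches "price " or "prices"; it misses "price" before punctuation or at the end of a line
lines_prices = data.map(lambda x: len(re.findall(r'price[ s]', x, re.I)))
# Total matches divided by the number of lines gives the average number of matches per line
print('> ', "Average number of 'price'/'prices' matches per line:", lines_prices.sum() / data.count())

>  Average number of 'price'/'prices' matches per line: 0.14935064935064934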
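
Summing match counts measures matches per line, which differs from the fraction of lines that mention "price" at all when a line contains several matches. The plain-Python sketch below contrasts the two on hypothetical lines, using `\bprices?\b` (an assumed alternative pattern that also catches "price" before punctuation).

```python
import re

# Hypothetical sample lines standing in for webpage.txt.
lines = [
    "Prices rose again, and the price of gas hit a record.",
    "The Fed met on Wednesday.",
    "Food prices are up.",
    "No mention here.",
]

pattern = re.compile(r"\bprices?\b", re.IGNORECASE)

# Average number of matches per line (what summing findall counts measures).
matches_per_line = sum(len(pattern.findall(line)) for line in lines) / len(lines)

# Fraction of lines that mention "price" or "prices" at least once.
fraction_of_lines = sum(1 for line in lines if pattern.search(line)) / len(lines)

print(matches_per_line)   # 0.75
print(fraction_of_lines)  # 0.5
```

On the RDD, the fraction of lines could be computed along the lines of `data.filter(lambda x: re.search(r'\bprices?\b', x, re.I)).count() / data.count()`.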


In [25]:
print("> ", "A transformation returns:", type(lines_prices))
print("> ", "An action returns:", type(lines_prices.sum()))

>  A transformation returns: <class 'pyspark.rdd.PipelinedRDD'>
>  An action returns: <class 'int'>


In [39]:
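import re
# data_3 comes from cells that are not shown here; keep the words that start with "b"
data_4 = data_3.filter(lambda x: re.search(r'^b', x))

In [41]:
# Pair every word with a count of 1 (data_2 is also defined in a cell not shown here)
data_4 = data_2.map(lambda x: (x, 1))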

In [43]:
data_4.groupByKey().collect()

[('name', <pyspark.resultiterable.ResultIterable at 0x176b5e96af0>),
 (' age', <pyspark.resultiterable.ResultIterable at 0x176b5e96850>),
 ('dante', <pyspark.resultiterable.ResultIterable at 0x176b5e96790>),
 (' 10', <pyspark.resultiterable.ResultIterable at 0x176b5e96700>),
 ('bryan', <pyspark.resultiterable.ResultIterable at 0x176b5e96640>),
 (' 15', <pyspark.resultiterable.ResultIterable at 0x176b5e960d0>),
 ('gregory', <pyspark.resultiterable.ResultIterable at 0x176b3bf4a90>),
 ('20', <pyspark.resultiterable.ResultIterable at 0x176b3c17ee0>)]
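
`groupByKey()` hands back an iterable of values per key, which is why the output above shows `ResultIterable` objects; in PySpark, `data_4.groupByKey().mapValues(list).collect()` would display them as lists. The plain-Python model below, on hypothetical pairs, contrasts grouping with the per-key addition that `reduceByKey(lambda x, y: x + y)` performs.

```python
from collections import defaultdict

pairs = [("dante", 1), ("bryan", 1), ("dante", 1)]

# groupByKey: collect every value for each key.
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

# reduceByKey with addition: combine the values of each key into one count.
counts = defaultdict(int)
for key, value in pairs:
    counts[key] += value

print(dict(grouped))  # {'dante': [1, 1], 'bryan': [1]}
print(dict(counts))   # {'dante': 2, 'bryan': 1}
```

For counting, `reduceByKey` is usually preferred because Spark combines values within each partition before shuffling, whereas `groupByKey` ships every value across the network.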

In [36]:
data_5 = data_4.reduceByKey(lambda x, y: x + y).collect()  # sum the 1s to count each word

In [31]:
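# This cell ran (In [31]) before data_4 was redefined in In [39] and In [41], so its output comes from an earlier definition of data_4
data_4.take(3)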

['bryan']

In [6]:
sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()

[[0], [2], [3], [4], [6]]
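
`glom()` returns the contents of each partition as a list, which makes the partitioning of `parallelize(...)` visible. The sketch below models the contiguous slicing rule used for parallelized collections (slice `i` covers indices `i*n//k` up to `(i+1)*n//k`). It is an approximation under that assumption: the helper `slice_positions` is hypothetical, and PySpark may batch elements before slicing, so real boundaries can differ for larger inputs.

```python
def slice_positions(data, num_slices):
    """Split a list into num_slices contiguous chunks, slice i = data[i*n//k:(i+1)*n//k]."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


print(slice_positions([0, 2, 3, 4, 6], 5))  # [[0], [2], [3], [4], [6]]
print(slice_positions(list(range(10)), 3))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```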