# RDD creation

#### [Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)

In this notebook we will introduce two different ways of getting data into the basic Spark data structure, the **Resilient Distributed Dataset** or **RDD**. An RDD is a distributed collection of elements. All work in Spark is expressed as either creating new RDDs, transforming existing RDDs, or calling actions on RDDs to compute a result. Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

#### References

The reference book for these and other Spark-related topics is *Learning Spark* by Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia.

The KDD Cup 1999 competition dataset is described in detail [here](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99).

## SparkContext

In [None]:
import pyspark

sc = spark.sparkContext  # spark is the SparkSession

In [None]:
sc

## Getting the data files

In this notebook we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half a million network interactions. The file is provided as a *Gzip* file that we will download locally.

In [None]:
from pyspark import SparkFiles

url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
# Download the file and make it available on every node
spark.sparkContext.addFile(url)

In [None]:
myRDD = sc.textFile("file://"+SparkFiles.get("kddcup.data_10_percent.gz"))

In [None]:
SparkFiles

Out[8]: pyspark.files.SparkFiles

In [None]:
display(dbutils.fs.ls("/databricks-datasets/COVID/USAFacts/"))

path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/USAFacts/USAFacts_readme.md,USAFacts_readme.md,2507,1615898752000
dbfs:/databricks-datasets/COVID/USAFacts/covid_confirmed_usafacts.csv,covid_confirmed_usafacts.csv,4939138,1615898753000
dbfs:/databricks-datasets/COVID/USAFacts/covid_deaths_usafacts.csv,covid_deaths_usafacts.csv,3397903,1615898753000


In [None]:
myRDD.take(5)

Out[10]: ['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

## Creating an RDD from a file

The most common way of creating an RDD is to load it from a file. Spark's `textFile` reads Gzip-compressed files directly, so no separate decompression step is needed.
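For intuition only, here is a minimal plain-Python sketch (no Spark involved) of reading a Gzip-compressed text file line by line, which is roughly the view of the file that `textFile` exposes. The sample records and the `sample.gz` file name are made up for illustration and are not part of the dataset.

```python
import gzip
import os
import tempfile

# Hypothetical sample records standing in for the real dataset.
sample_lines = [
    "0,tcp,http,SF,normal.",
    "0,udp,private,SF,normal.",
    "0,icmp,ecr_i,SF,smurf.",
]

with tempfile.TemporaryDirectory() as tmp_dir:
    gz_path = os.path.join(tmp_dir, "sample.gz")

    # Write the records as Gzip-compressed text.
    with gzip.open(gz_path, "wt", encoding="utf-8") as f:
        f.write("\n".join(sample_lines) + "\n")

    # Read them back; decompression happens transparently while iterating.
    with gzip.open(gz_path, "rt", encoding="utf-8") as f:
        read_lines = [line.rstrip("\n") for line in f]

line_count = len(read_lines)
print(line_count)
```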

In [None]:
type(myRDD)

Out[14]: pyspark.rdd.RDD

Now we have our data file loaded into the `myRDD` RDD.

Without getting into Spark *transformations* and *actions*, the most basic thing we can do to check that we got our RDD contents right is to `count()` the number of lines loaded from the file into the RDD.

In [None]:
myRDD.count()  # number of lines

Out[16]: 494021

We can also check the first few entries in our data.

In [None]:
myRDD.take(2)

Out[17]: ['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.']
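Each entry is a plain comma-separated string, so individual values have to be split out before they can be used. A minimal sketch in plain Python, using the first record shown above; the interpretation of the second field as the protocol and the last field as the label assumes the usual KDD Cup 1999 column layout, and the variable names are illustrative.

```python
# First record as returned by take() above.
line = ("0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,"
        "0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,"
        "0.00,0.00,0.00,0.00,normal.")

# Split the raw string into its individual fields.
fields = line.split(",")

# Assumed layout: second field is the protocol, last field is the label.
protocol = fields[1]
label = fields[-1]

print(len(fields), protocol, label)
```

The same `split(",")` call could later be applied to every element of the RDD once transformations have been introduced.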

In the following notebooks, we will use this raw data to learn about the different Spark transformations and actions.

## Creating an RDD using `parallelize`

Another way of creating an RDD is to parallelize an already existing collection, such as a list or a range.

In [None]:
a = range(100)
# Build an RDD from the range. When no partition count is given,
# the number of partitions depends on the available cores.
data = sc.parallelize(a)
# This is lazy: Spark has recorded the operation but not executed it yet.
type(data)

Out[18]: pyspark.rdd.PipelinedRDD

As we did before, we can `count()` the number of elements in the RDD.

In [None]:
data.count()

Out[19]: 100

As before, we can access the first few elements of our RDD.

In [None]:
data.take(5)

Out[20]: [0, 1, 2, 3, 4]

## Get data and partitions

In [None]:
# collect() pulls every partition of the RDD into the driver node,
# which can overwhelm the driver when the RDD is large.
rddCollect = data.collect()
print("Number of Partitions: " + str(data.getNumPartitions()))
print("Action: First element: " + str(data.first()))
print(rddCollect)

'''
data = sc.parallelize(a, p) # creates p partitions
'''

Number of Partitions: 8
Action: First element: 0
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
Out[21]: '\ndata = sc.parallelize(a, p) # creates p partitions\n'
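
`parallelize` also accepts a second argument requesting a specific number of partitions. As a rough illustration of what spreading 100 elements over 8 partitions can look like, here is a plain-Python sketch. The helper `split_into_partitions` is hypothetical and is not Spark's implementation; the exact layout Spark chooses may differ.

```python
def split_into_partitions(items, num_partitions):
    """Split items into num_partitions contiguous, near-equal chunks.

    Hypothetical helper for illustration only; not Spark's implementation.
    """
    items = list(items)
    n = len(items)
    # Chunk i covers indices [i * n // p, (i + 1) * n // p).
    bounds = [i * n // num_partitions for i in range(num_partitions + 1)]
    return [items[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]


partitions = split_into_partitions(range(100), 8)
sizes = [len(p) for p in partitions]
print(sizes)
```

To inspect the real layout in Spark, `data.glom().collect()` returns one list per partition.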