Operaciones RDD
===

In [0]:
## Make `pyspark` importable through findspark before the pyspark imports in the
## next cells. NOTE(review): assumes findspark can locate the Spark install on
## this machine (e.g. via SPARK_HOME) — verify in your environment.
import findspark
findspark.init()

In [0]:
from pyspark import SparkConf, SparkContext

## Application name; reused below when building the SparkSession.
APP_NAME = "My Spark Application"

## Run Spark locally ("local[*]": one worker thread per logical core).
## SparkConf setters return the same conf object, so they can be chained.
conf = SparkConf().setAppName(APP_NAME).setMaster("local[*]")

## getOrCreate() reuses an already-running SparkContext instead of raising
## "Cannot run multiple SparkContexts at once" when this cell is re-executed.
sc = SparkContext.getOrCreate(conf=conf)

In [0]:
from pyspark.sql import SparkSession

## Entry point for the DataFrame/SQL API, named like the SparkContext above.
## getOrCreate() is expected to reuse the already-running SparkContext rather
## than start a second one (see SparkSession.builder.getOrCreate docs).
session_builder = SparkSession.builder.appName(APP_NAME)
spark = session_builder.getOrCreate()

---

## Lectura de archivos

### Desde variables

In [0]:
## Distribute an in-memory Python list as an RDD. Displaying the RDD shows its
## description, not its elements (those require an action such as collect()).
numbers = [1, 2, 3, 4, 5]
rdd = sc.parallelize(numbers)
rdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:194

In [0]:
## Return a list containing all the
## elements of an RDD (the whole RDD ends up in one Python list,
## so this is intended for small RDDs like this one)
rdd.collect()

[1, 2, 3, 4, 5]

In [0]:
## parallelize() iterates over its argument, and iterating a dict yields only
## its keys: the RDD holds 'a', 'b', 'c', 'd' and the values are dropped.
## To keep the (key, value) pairs, pass letter_counts.items() instead.
letter_counts = {"a": 1, "b": 2, "c": 3, "d": 4}
rdd = sc.parallelize(letter_counts)
rdd.collect()

['a', 'b', 'c', 'd']

### textFile

In [0]:
## Build an RDD of text lines from the files in the directory. The cell
## displays the RDD's description (lineage), not the file contents.
wordcount_dir = "files/wordcount/"
rdd = sc.textFile(wordcount_dir)
rdd

files/wordcount/ MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [0]:
## Confirm that textFile returns a plain PySpark RDD (output: pyspark.rdd.RDD).
type(rdd)

pyspark.rdd.RDD

In [0]:
## Bring every element back as a Python list: one element per input line,
## without its line break (blank lines appear as '').
## NOTE(review): this dumps the whole input; for larger data prefer rdd.take(n).
rdd.collect()

['Analytics is the discovery, interpretation, and communication of meaningful patterns ',
 'in data. Especially valuable in areas rich with recorded information, analytics relies ',
 'on the simultaneous application of statistics, computer programming and operations research ',
 'to quantify performance.',
 '',
 'Organizations may apply analytics to business data to describe, predict, and improve business ',
 'performance. Specifically, areas within analytics include predictive analytics, prescriptive ',
 'analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big ',
 'Data Analytics, retail analytics, store assortment and stock-keeping unit optimization, ',
 'marketing optimization and marketing mix modeling, web analytics, call analytics, speech ',
 'analytics, sales force sizing and optimization, price and promotion modeling, predictive ',
 'science, credit risk analysis, and fraud analytics. Since analytics can require extensive ',
 'computation (see

In [0]:
## Unlike textFile (one element per line), wholeTextFiles yields one
## (file path, full file content) tuple per file, with line breaks kept.
## NOTE(review): the printed paths are absolute local paths; consider printing
## only the file name before sharing the notebook.
rdd = sc.wholeTextFiles("files/wordcount/")
for file_path, file_content in rdd.collect():
    print((file_path, file_content))
    print('----')

('file:/Volumes/Data/GitHub/Analitica-de-grandes-datos/AGD-05-Spark/files/wordcount/text0.txt', 'Analytics is the discovery, interpretation, and communication of meaningful patterns \nin data. Especially valuable in areas rich with recorded information, analytics relies \non the simultaneous application of statistics, computer programming and operations research \nto quantify performance.\n\nOrganizations may apply analytics to business data to describe, predict, and improve business \nperformance. Specifically, areas within analytics include predictive analytics, prescriptive \nanalytics, enterprise decision management, descriptive analytics, cognitive analytics, Big \nData Analytics, retail analytics, store assortment and stock-keeping unit optimization, \nmarketing optimization and marketing mix modeling, web analytics, call analytics, speech \nanalytics, sales force sizing and optimization, price and promotion modeling, predictive \nscience, credit risk analysis, and fraud analytic

## Operaciones RDD

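Ver la referencia de operaciones sobre RDD: https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations (para la API de DataFrame, ver http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame).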

En este ejemplo se calcula la cantidad total de caracteres leídos de un grupo de archivos (sin contar los saltos de línea, ya que `textFile` los elimina de cada línea).

In [0]:
## Load the files: each RDD element is one line of text.
lines = sc.textFile("files/wordcount/")

## Length of each line, using an explicit lambda
## (the next cell passes the builtin `len` directly).
line_lengths = lines.map(lambda line: len(line))
print(line_lengths.collect())

## Total number of characters: add up all line lengths with reduce.
## Line breaks are not counted, because textFile strips them from each line.
## Distinct names keep the RDD stages and the final int from sharing one variable.
total_chars = line_lengths.reduce(lambda a, b: a + b)
total_chars

[85, 87, 92, 24, 0, 94, 93, 91, 88, 89, 89, 90, 92, 65, 86, 88, 90, 84, 87, 85, 87, 83, 81, 11]


1861

In [0]:
## Same computation as the previous cell, but passing existing named functions
## instead of lambdas: the builtin `len` to map and `operator.add` to reduce.
from operator import add

lines = sc.textFile("files/wordcount/")
line_lengths = lines.map(len)
print(line_lengths.collect())

## Keep the final int under its own name instead of rebinding the RDD variable.
total_chars = line_lengths.reduce(add)
total_chars

[85, 87, 92, 24, 0, 94, 93, 91, 88, 89, 89, 90, 92, 65, 86, 88, 90, 84, 87, 85, 87, 83, 81, 11]


1861

### Pares (Key, Value) 

In [0]:
##
## Build (key, value) pairs: pair every letter with a count of 1,
## then reduceByKey adds up the counts of identical keys.
##
letters = ["a", "b", "a", "c", "d", "a", "b"]
rdd = sc.parallelize(letters).map(lambda letter: (letter, 1))
print(rdd.collect())
rdd = rdd.reduceByKey(lambda left, right: left + right)
print(rdd.collect())

[('a', 1), ('b', 1), ('a', 1), ('c', 1), ('d', 1), ('a', 1), ('b', 1)]
[('a', 3), ('b', 2), ('c', 1), ('d', 1)]


---