In [2]:
sc

<pyspark.context.SparkContext at 0x7f0b41c37310>

In [1]:
couponPath = "/home/dsc/Data/spark/data/coupon150720.csv"
transmPath = "/home/dsc/Data/spark/data/transm150720.csv"

In [2]:
ls = sc.textFile(couponPath)

In [5]:
ls.take(3)

[u'79062005698500,1,MAA,AUH,9W,9W,56.79,USD,1,H,H,0526,150904,OK,IAF0',
 u'79062005698500,2,AUH,CDG,9W,9W,84.34,USD,1,H,H,6120,150905,OK,IAF0',
 u'79062005924069,1,CJB,MAA,9W,9W,60.0,USD,1,H,H,2768,150721,OK,IAA0']

In [3]:
def getALIngreso(l):
    elems = l.split(",")
    al = elems[4]
    ingreso = float(elems[6])
    return (al, ingreso)

In [8]:
getALIngreso(ls.first())

(u'9W', 56.79)

In [10]:
#¿Cuál es el ingreso total por aerolínea?
#Primero una lista de tuplas:
als = ls.map(getALIngreso)
als.take(3)

[(u'9W', 56.79), (u'9W', 84.34), (u'9W', 60.0)]

In [13]:
#Agrupar por cada criterio es ineficiente porque  las particiones se generan por las claves de agrupación
#Agrupar es caro y lleva tiempo y recursos...
als.groupByKey().mapValues(lambda vs: sum(vs)).take(4)

[(u'', 478.31000000000006),
 (u'BE', 64073.329999999834),
 (u'WN', 922132.1599999971),
 (u'JV', 131.66)]

In [28]:
totales = als.groupByKey().mapValues(lambda vs: round(sum(vs)/1e3,5))

In [29]:
totales.ordenados = totales.sortBy(lambda x: x[1], False)

In [30]:
#Top ten airlines by income in thousands.
totales.ordenados.take(10)

[(u'GA', 20712.22657),
 (u'BA', 12443.98698),
 (u'AF', 10221.50791),
 (u'QF', 8358.5879),
 (u'LH', 7715.60834),
 (u'QR', 6935.58437),
 (u'SV', 6286.29934),
 (u'UA', 5151.81529),
 (u'JJ', 5066.30051),
 (u'AA', 4151.99646)]

In [33]:
alsReduced = als.reduceByKey (lambda e,acum: e+acum)
alsReducedSorted = alsReduced.sortBy(lambda x: x[1], False)

In [34]:
alsReducedSorted.take(5)

[(u'GA', 20712226.569999855),
 (u'BA', 12443986.980000002),
 (u'AF', 10221507.909999428),
 (u'QF', 8358587.9000001205),
 (u'LH', 7715608.339999718)]

In [4]:
def getALIngresoOne(l):
    elems = l.split(",")
    al = elems[4]
    ingreso = float(elems[6])
    return (al, (ingreso,float(1)))

In [51]:
als = ls.map(getALIngresoOne)
als.take(3)

[(u'9W', (56.79, 1.0)), (u'9W', (84.34, 1.0)), (u'9W', (60.0, 1.0))]

In [52]:
def reduceALS(e, acum):
    aggrCount = e[1]+acum[1]
    aggrSum = e[0] + acum[0]
    return (aggrSum,aggrCount)

In [7]:
als = ls.map(getALIngreso)

In [53]:
alsReduced = als.reduceByKey (reduceALS)

In [54]:
alsReduced.first()

(u'', (478.31000000000006, 18811.0))

In [8]:
als.groupByKey().mapValues(lambda vs: sum(vs)/len(vs)) \
.sortBy(lambda x: x[1],False).take(5)

[(u'S3', 5225.068852459017),
 (u'9V', 1488.673986486487),
 (u'GA', 991.2527671691715),
 (u'TN', 964.6472580645161),
 (u'7F', 668.035)]

In [9]:
#En reduce las operaciones que se apliquen en reduce tienen que ser aditivas (monoides, como la suma). Si no son monoides
#No se pueden paralelizar.

def calculaMediaReduce (e, acum):
    sumaTotal = acum [0]
    numElems = acum [1]
    return (e[0] + sumaTotal, e[1] + numElems)

In [10]:
#Agregamos al map de als un entero 1 para sumar el número de ocurrencias.
als.map(lambda x: (x[0],(x[1],1))).take(2)

[(u'9W', (56.79, 1)), (u'9W', (84.34, 1))]

In [11]:
#Agrupamos ahora el map y extraemos el resultado agrupado por aerolínea.
als.map(lambda x: (x[0],(x[1],1))).reduceByKey(calculaMediaReduce).take(2)

[(u'', (478.31000000000006, 18811)), (u'BE', (64073.32999999998, 788))]

In [14]:
#Lo mapeamos (el resultado del reduce) y obtenemos la media.
tr = als.map(lambda x: (x[0],(x[1],1))).reduceByKey(calculaMediaReduce)

In [16]:
tr.mapValues(lambda x: x[0]/x[1]).take(5)

[(u'', 0.025427143692520335),
 (u'BE', 81.31133248730961),
 (u'WN', 107.49966892049395),
 (u'JV', 131.66),
 (u'WK', 125.48430577223107)]