### Joins en Spark 

In [None]:
# Cuando se trabaja en un clúster y queremos hacer un join,
# hay que poner la memoria de caché del clúster al 0% y la memoria de shuffle al 100%.
# Por eso, si además necesitamos operaciones que usan caché (como machine learning),
# es mejor ejecutarlas en trabajos separados.

In [5]:
def getCNIngreso(l):
    """Parse one comma-separated coupon line into a (TCN, revenue) pair.

    The first field is returned unchanged as the ticket number; the seventh
    field (index 6) is converted to float.

    Raises IndexError if the line has fewer than seven fields and ValueError
    if the seventh field is not a valid float.
    """
    fields = l.split(",")
    return (fields[0], float(fields[6]))

In [6]:
# Coupon RDD of (TCN, revenue) pairs, one per line of the coupon CSV.
cs = sc.textFile('../../Data/spark/coupon150720.csv').map(getCNIngreso)

In [7]:
cs.take(2)

[(u'79062005698500', 56.79), (u'79062005698500', 84.34)]

In [8]:
sc.textFile('../../Data/spark/transm150720.csv').first()

u'79062005698500,TKTT,30,150719,FR,0.0,EUR,T,T,141025,PARA127A8,0.0,EUR,   ,EX,,150719,0.0'

In [9]:
def getTCNCanal(l):
    """Parse one comma-separated ticket line into a (TCN, channel) pair.

    The ticket number is the first field and the channel code is the ninth
    field (index 8); both are returned as text.

    Raises IndexError if the line has fewer than nine fields.
    """
    fields = l.split(",")
    tcn, canal = fields[0], fields[8]
    return (tcn, canal)

In [10]:
# Ticket RDD of (TCN, channel) pairs, one per line of the transm CSV.
ts = sc.textFile('../../Data/spark/transm150720.csv').map(getTCNCanal)

In [11]:
ts.take(2)

[(u'79062005698500', u'T'), (u'79062005924069', u'T')]

In [12]:
cs.count()

1232662

In [13]:
ts.count()

631405

In [None]:
# Tenemos tickets ts(TCN, canal) y cupones cs(TCN, ingreso)

In [15]:
# Mark both RDDs for caching: each one is reused by several joins and counts below.
cs.cache()
ts.cache()

PythonRDD[25] at RDD at PythonRDD.scala:43

In [16]:
ts.leftOuterJoin(cs).take(3)

[(u'79062005994272', (u'A', 56.27)),
 (u'79062005994272', (u'A', 56.27)),
 (u'79065668614400', (u'T', 162.33))]

In [17]:
cs.filter(lambda x: x[0] == '79062005994272').take(5)

[(u'79062005994272', 56.27), (u'79062005994272', 56.27)]

In [18]:
cs.leftOuterJoin(ts).take(3)

[(u'79062005994272', (56.27, u'A')),
 (u'79062005994272', (56.27, u'A')),
 (u'79065668614400', (162.33, u'T'))]

In [19]:
ts.leftOuterJoin(cs).count()

1232662

In [20]:
cs.leftOuterJoin(ts).count()

1232662

In [None]:
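# En estos datos da igual el orden del left join: ambos devuelven una fila por cupón (1232662),
# porque las claves repetidas están en cs y, al parecer, cada TCN aparece en ambos conjuntos.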

In [21]:
# Left outer join keyed by TCN, giving (TCN, (channel, revenue)) rows with one row per
# matching coupon. Cached because the result is inspected and then aggregated.
j = ts.leftOuterJoin(cs).cache()

In [22]:
j.take(2)

[(u'79062005994272', (u'A', 56.27)), (u'79062005994272', (u'A', 56.27))]

In [24]:
# Total revenue per channel, largest first. leftOuterJoin yields None as the revenue of a
# ticket with no coupon, so None is counted as 0.0 to keep the sum from raising TypeError.
(j.map(lambda kv: (kv[1][0], kv[1][1] if kv[1][1] is not None else 0.0))
  .reduceByKey(lambda a, b: a + b)
  .sortBy(lambda kv: kv[1], ascending=False)
  .take(10))

[(u'A', 99011336.9600026),
 (u'T', 83637221.2200021),
 (u'E', 994305.880000001),
 (u'V', 843207.13),
 (u'', 345827.3100000002)]

In [None]:
# 'A' son cupones vendidos por aerolíneas; 'T', por agencias

In [None]:
# Vamos a agrupar los cupones por TCN (sumando sus ingresos) para no tener claves duplicadas

In [25]:
ts.take(2)

[(u'79062005698500', u'T'), (u'79062005924069', u'T')]

In [26]:
cs.take(2)

[(u'79062005698500', 56.79), (u'79062005698500', 84.34)]

In [27]:
# Group coupon revenues by TCN; each value is an iterable of that ticket's revenues.
valorTs = cs.groupByKey()

In [28]:
valorTs.take(2)

[(u'79062005558463',
  <pyspark.resultiterable.ResultIterable at 0x7fc2e0e63550>),
 (u'79065668432713',
  <pyspark.resultiterable.ResultIterable at 0x7fc2e0e632d0>)]

In [29]:
valorTs.mapValues(lambda vs: sum(vs)).take(2)

[(u'79062005558463', 0.0), (u'79065668432713', 286.98)]

In [30]:
# Total revenue per ticket (one row per TCN). reduceByKey combines values inside each
# partition before the shuffle, whereas groupByKey ships every value and sums afterwards.
valorTs = cs.reduceByKey(lambda a, b: a + b)

In [31]:
valorTs.count()

631405

In [None]:
ts.map(lambda x: x[0]).distinct().count() # If this equals ts.count(), ts has no duplicate TCNs

In [32]:
# Join tickets with the per-ticket revenue totals; valorTs has one row per TCN.
j = ts.leftOuterJoin(valorTs)

In [33]:
j.take(3)

[(u'79062005994272', (u'A', 112.54)),
 (u'79065668614400', (u'T', 229.51000000000002)),
 (u'79062005879562', (u'T', 99.50999999999999))]

In [34]:
# Total revenue per channel, largest first. leftOuterJoin yields None as the revenue of a
# ticket with no coupon, so None is counted as 0.0 to keep the sum from raising TypeError.
(j.map(lambda kv: (kv[1][0], kv[1][1] if kv[1][1] is not None else 0.0))
  .reduceByKey(lambda a, b: a + b)
  .sortBy(lambda kv: kv[1], ascending=False)
  .take(5))

[(u'A', 99011336.96000284),
 (u'T', 83637221.22000207),
 (u'E', 994305.8800000008),
 (u'V', 843207.1299999999),
 (u'', 345827.31)]

### Top 5 de aerolíneas con mayor cantidad de niños

In [44]:
def getTipoPasajero(l):
    """Parse one comma-separated ticket line into a (TCN, passenger type) pair.

    The ticket number is the first field and the passenger type is the
    fourteenth field (index 13), returned exactly as written in the line.

    Raises IndexError if the line has fewer than fourteen fields.
    """
    parts = l.split(",")
    return parts[0], parts[13]

In [63]:
# (TCN, passenger type) pairs, read from the same transm CSV that feeds ts.
paxtypes = sc.textFile('../../Data/spark/transm150720.csv').map(getTipoPasajero)

In [62]:
def getAerolineas(l):
    """Parse one comma-separated coupon line into a (TCN, airline) pair.

    The ticket number is the first field and the airline code is the sixth
    field (index 5); both are returned as text.

    Raises IndexError if the line has fewer than six fields.
    """
    columns = l.split(",")
    ticket, airline = columns[0], columns[5]
    return (ticket, airline)

In [64]:
# (TCN, airline) pairs, one per line of the coupon CSV.
als = sc.textFile('../../Data/spark/coupon150720.csv').map(getAerolineas)

In [65]:
als.take(2)

[(u'79062005698500', u'9W'), (u'79062005698500', u'9W')]

In [66]:
paxtypes.take(2)

[(u'79062005698500', u'   '), (u'79062005924069', u'ADT')]

In [68]:
# Keep only tickets whose passenger type is exactly 'CHD' (presumably child passengers).
fpax = paxtypes.filter(lambda x: x[1] == 'CHD')

In [69]:
fpax.leftOuterJoin(als).take(2)

[(u'79065668446486', (u'CHD', u'UL')), (u'79065668446486', (u'CHD', u'AI'))]

In [71]:
# (airline, passenger type) pairs for the CHD tickets. rightOuterJoin keeps every fpax row,
# so a CHD ticket with no coupon in als would appear here with airline None.
j = als.rightOuterJoin(fpax).map(lambda x: x[1])

In [72]:
j.take(2)

[(u'LH', u'CHD'), (u'OS', u'CHD')]

In [75]:
# Count rows per airline and show the five largest counts.
(j.mapValues(lambda _: 1)
  .reduceByKey(lambda a, b: a + b)
  .sortBy(lambda kv: kv[1], ascending=False)
  .take(5))

[(u'JJ', 5350), (u'SV', 5073), (u'BA', 3556), (u'AF', 3378), (u'AB', 3226)]

### Top 5 de cupones más caros de niños

In [91]:
paxtypes.take(4)

[(u'79062005698500', u'   '),
 (u'79062005924069', u'ADT'),
 (u'79065668570385', u'CHD'),
 (u'79065668737021', u'   ')]

In [96]:
# Rebuilds the coupon (TCN, revenue) RDD. Cached because each join below runs as a separate
# job and would otherwise re-read and re-parse the CSV every time.
cs = sc.textFile('../../Data/spark/coupon150720.csv').map(getCNIngreso).cache()

In [97]:
cs.take(2)

[(u'79062005698500', 56.79), (u'79062005698500', 84.34)]

In [94]:
fpax.take(2)

[(u'79065668570385', u'CHD'), (u'79062005305018', u'CHD')]

In [98]:
fpax.join(cs).take(2)

[(u'79065668446486', (u'CHD', 165.69)), (u'79065668446486', (u'CHD', 172.37))]

In [101]:
# Five most expensive CHD coupons. takeOrdered keeps only the top 5 of each partition
# instead of sorting the whole joined RDD; the negated key gives descending order.
fpax.join(cs).map(lambda x: x[1]).takeOrdered(5, key=lambda kv: -kv[1])

[(u'CHD', 14498.76),
 (u'CHD', 13547.31),
 (u'CHD', 9446.75),
 (u'CHD', 7709.74),
 (u'CHD', 6629.51)]