# 2C 2017
# Enunciado
Se cuenta con un RDD con información sobre patentamientos de autos con la siguiente información:
(patente, marca, modelo,versión, tipo_vehiculo, provincia, fecha), 
donde **tipo_vehiculo** indica si la unidad patentada es auto, pickup, camión o moto.

Se pide generar un programa en pySpark que indique la marca y modelo del auto más patentado por tipo de vehículo en la provincia de Buenos Aires en el mes de Abril de 2017.

In [1]:
## -------------------------------------------------------------- ##
## Configuración para MAC                                         ##
## con múltiples versiones de python                              ##
## -------------------------------------------------------------- ##
import pyspark
import os
os.environ["PYSPARK_PYTHON"] = "python3.6"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3.6"
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("sample_app")
sc = SparkContext(conf=conf)

In [2]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

data_path = 'data';

df_patentamientos = sqlContext.read.csv(data_path + '/patentamientos.csv', header=True)

In [3]:
rdd_patentamientos = df_patentamientos.rdd
rdd_patentamientos

MapPartitionsRDD[12] at javaToPython at NativeMethodAccessorImpl.java:0

In [4]:
rdd_patentamientos.first()

Row(patente='AAA111', marca='VW', modelo='GOL', version='Confort-line', tipo_vehiculo='auto', provincia='BS AS', fecha='2017-04-01')

In [5]:
rdd_patentamientos.collect()

[Row(patente='AAA111', marca='VW', modelo='GOL', version='Confort-line', tipo_vehiculo='auto', provincia='BS AS', fecha='2017-04-01'),
 Row(patente='AAA222', marca='VW', modelo='GOL', version='Confort-line', tipo_vehiculo='auto', provincia='BS AS', fecha='2017-04-01'),
 Row(patente='AAA333', marca='VW', modelo='GOL', version='Trend-line', tipo_vehiculo='auto', provincia='BS AS', fecha='2017-04-01'),
 Row(patente='AAA444', marca='VW', modelo='GOL', version='Power', tipo_vehiculo='auto', provincia='BS AS', fecha='2017-04-01'),
 Row(patente='AAA555', marca='VW', modelo='GOL', version='Confort-line', tipo_vehiculo='auto', provincia='BS AS', fecha='2017-04-01'),
 Row(patente='AAA666', marca='VW', modelo='AMAROK', version='Confort-line', tipo_vehiculo='pickup', provincia='BS AS', fecha='2017-04-01'),
 Row(patente='AAA777', marca='VW', modelo='AMAROK', version='Trend-line', tipo_vehiculo='pickup', provincia='BS AS', fecha='2017-04-01'),
 Row(patente='AAA888', marca='VW', modelo='AMAROK', vers

In [6]:
rdd_patentamientos.filter(lambda a: (a[5] == "BS AS") and (a[6] >= "2017-04-01" or a[6] <= "2017-04-30")).take(1)

[Row(patente='AAA111', marca='VW', modelo='GOL', version='Confort-line', tipo_vehiculo='auto', provincia='BS AS', fecha='2017-04-01')]

In [7]:
## -------------------------------------------------------------- ##
## Es posible agregar .cache()                                    ##
## para evitar el filtrado cada vez que usamos el rdd_filtered    ##
## -------------------------------------------------------------- ##
rdd_filtered = rdd_patentamientos.filter(lambda a: (a[5] == "BS AS") and (a[6] >= "2017-04-01" or a[6] <= "2017-04-30"))

In [8]:
rdd_grouped = rdd_filtered.map(lambda x: ((x[1], x[2], x[3], x[4]), 1))\
            .reduceByKey(lambda x, y: x + y)
            

In [9]:
rdd_grouped.collect()

[(('VW', 'GOL', 'Confort-line', 'auto'), 5),
 (('VW', 'GOL', 'Trend-line', 'auto'), 2),
 (('VW', 'GOL', 'Power', 'auto'), 2),
 (('VW', 'AMAROK', 'Confort-line', 'pickup'), 1),
 (('VW', 'AMAROK', 'Trend-line', 'pickup'), 1),
 (('VW', 'AMAROK', 'Power', 'pickup'), 1),
 (('RENAULT', 'CLIO', 'Confort-line', 'auto'), 2),
 (('RENAULT', 'CLIO', 'Trend-line', 'auto'), 2),
 (('RENAULT', 'CLIO', 'Power', 'auto'), 6),
 (('RENAULT', 'DUSTER', 'Confort-line', 'pickup'), 1),
 (('RENAULT', 'DUSTER', 'Trend-line', 'pickup'), 1),
 (('HONDA', '207', 'Full', 'moto'), 2),
 (('SCANIA', 'MOD-1', 'Full', 'camión'), 2)]

In [10]:
rdd_grouped.map(lambda x: ((x[0][0], x[0][1], x[0][3]), (x[0][2], x[1]))).collect()

[(('VW', 'GOL', 'auto'), ('Confort-line', 5)),
 (('VW', 'GOL', 'auto'), ('Trend-line', 2)),
 (('VW', 'GOL', 'auto'), ('Power', 2)),
 (('VW', 'AMAROK', 'pickup'), ('Confort-line', 1)),
 (('VW', 'AMAROK', 'pickup'), ('Trend-line', 1)),
 (('VW', 'AMAROK', 'pickup'), ('Power', 1)),
 (('RENAULT', 'CLIO', 'auto'), ('Confort-line', 2)),
 (('RENAULT', 'CLIO', 'auto'), ('Trend-line', 2)),
 (('RENAULT', 'CLIO', 'auto'), ('Power', 6)),
 (('RENAULT', 'DUSTER', 'pickup'), ('Confort-line', 1)),
 (('RENAULT', 'DUSTER', 'pickup'), ('Trend-line', 1)),
 (('HONDA', '207', 'moto'), ('Full', 2)),
 (('SCANIA', 'MOD-1', 'camión'), ('Full', 2))]

In [11]:
rdd_max = rdd_grouped.map(lambda x: ((x[0][0], x[0][1], x[0][3]), (x[0][2], x[1])))\
           .reduceByKey(lambda x, y: x if (x[1] >= y[1]) else y)

In [12]:
rdd_max.count()

6

In [13]:
rdd_max.collect()

[(('VW', 'GOL', 'auto'), ('Confort-line', 5)),
 (('VW', 'AMAROK', 'pickup'), ('Confort-line', 1)),
 (('RENAULT', 'CLIO', 'auto'), ('Power', 6)),
 (('RENAULT', 'DUSTER', 'pickup'), ('Confort-line', 1)),
 (('HONDA', '207', 'moto'), ('Full', 2)),
 (('SCANIA', 'MOD-1', 'camión'), ('Full', 2))]

---------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------
# 1C 2017
# Enunciado

Se tiene información estadística de la temporada regular de todos los jugadores de la NBA en un RDD de tuplas con el siguiente formato:

```
(id_jugador, nombre, promedio_puntos, promedio_asistencias, promedio_robos, promedio_bloqueos, promedio_rebotes, promedio_faltas) 
```

Un analista de la cadena ESPN está trabajando con un RDD que corresponde a la primera ronda de playoffs y que tiene el siguiente formato: 

```
(id_jugador, id_partido, timestamp, cantidad_puntos, cantidad_rebotes, cantidad_bloqueos, cantidad_robos, cantidad_asistencias, cantidad_faltas) 
```

En base a estos RDDs se quiere programar en PySpark un programa que genere un RDD con los nombres (sin duplicados) de los jugadores que lograron en algún partido de playoffs una cantidad de asistencias mayor a su promedio histórico.


In [14]:
temporada_regular = [
    ("1938", "Emanuel Ginobili", 8.9, 2.5, 2.2, 15.9, 2.2, 3.4),
    ("2544", "LeBron James", 27.5, 9.1, 1.9, 9.4, 9.1, 2.4),
    ("201142", "Kevin Durant", 26.4, 5.4, 1.9, 1.0, 8.0, 1.7),
    ("201566", "Russell Westbrook", 25.4, 10.3, 1.5, 1.0, 12.0, 3.5)
]

playoffs = [
    ("1938", "1", 1526245886, 10, 2, 2, 15, 2, 3),
    ("2544", "1", 1526245886, 27, 9, 1, 9, 9, 2),
    ("201142", "2", 1526245886, 25, 10, 1, 1, 9, 2),
    ("201566", "2", 1526245886, 27, 9, 1, 9, 9, 2)
]

In [15]:
temporada_regular_rdd = sc.parallelize(temporada_regular)
playoffs_rdd = sc.parallelize(playoffs)

In [16]:
rdd_asist_hist = temporada_regular_rdd.map(lambda x: (x[0], (x[1], x[3])))
rdd_asist_playoffs = playoffs_rdd.map(lambda x: (x[0], (x[1], x[7])))

In [17]:
rdd_asist_hist.collect()

[('1938', ('Emanuel Ginobili', 2.5)),
 ('2544', ('LeBron James', 9.1)),
 ('201142', ('Kevin Durant', 5.4)),
 ('201566', ('Russell Westbrook', 10.3))]

In [18]:
rdd_asist_playoffs.collect()

[('1938', ('1', 2)),
 ('2544', ('1', 9)),
 ('201142', ('2', 9)),
 ('201566', ('2', 9))]

In [19]:
rdd_joined = rdd_asist_playoffs.join(rdd_asist_hist)

In [20]:
rdd_joined.collect()

[('2544', (('1', 9), ('LeBron James', 9.1))),
 ('1938', (('1', 2), ('Emanuel Ginobili', 2.5))),
 ('201142', (('2', 9), ('Kevin Durant', 5.4))),
 ('201566', (('2', 9), ('Russell Westbrook', 10.3)))]

In [21]:
rdd_joined.filter(lambda x: x[1][0][1] >= x[1][1][1]).distinct().collect()

[('201142', (('2', 9), ('Kevin Durant', 5.4)))]

---------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------

# 2C 2016
# Enunciado

Contamos con un cluster que tiene 4 computadoras. Queremos aprovechar el paralelismo del cluster para calcular los números primos entre 2 y 20.000.000. Para esto usaremos el conocido algoritmo de la criba de Eratóstenes. Por ejemplo si empezamos con una lista de tipo (2,3,4,5,6,7,8...) en un primer paso eliminamos todos los que son mayores a 2 y divisibles por 2 y nos queda (2,3,5,7,9,11,13…) luego eliminamos todos los mayores a 3 divisibles por 3 y nos queda (2,3,5,7,11,13….etc) luego todos los divisibles por 5 y así sucesivamente. El resultado final es una lista de números que son primos. Programar este programa usando PySpark

In [32]:
maxVal = 20e6
print(maxVal)

20000000.0


In [38]:
maxVal = 200#e6
rdd = sc.parallelize(range(2,maxVal+1))
primes = []

while(rdd.count() > 0):
    minval = rdd.reduce(lambda x,y: x if (x < y) else y)
    def isNotDivisible(n, div=minval):
        return n%div != 0
    rdd = rdd.filter(isNotDivisible)
    primes.append(minval)
print(primes)

[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199]


---------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------

# 1C 2016
# Enunciado
UBER almacena en un cluster todos los datos sobre el movimiento y viajes de todos sus vehículos.

Existe un proceso que nos devuelve un RDD llamado **trip_summary** con los siguientes campos: (driver_id, car_id, trip_id, customer_id, date (YYYYMMDD), distance_traveled)

Programar usando Py Spark un programa que nos indique cual fue el conductor con mayor promedio de distancia recorrida por viaje para Abril de 2016.

In [22]:
trip_summary = [
    ("driver-1", "car-1", "trip-1", "customer Pepe", "20160401", 10),
    ("driver-2", "car-2", "trip-2", "customer Lalo", "20160401", 5),
    ("driver-3", "car-3", "trip-3", "customer Pablo", "20160401", 7),
    ("driver-1", "car-1", "trip-4", "customer Paco", "20160402", 3),
    ("driver-2", "car-2", "trip-5", "customer Fito", "20160402", 12),
    ("driver-4", "car-4", "trip-6", "customer Facu", "20160401", 14),
]

In [23]:
data = sc.parallelize(trip_summary)

In [24]:
data.collect()

[('driver-1', 'car-1', 'trip-1', 'customer Pepe', '20160401', 10),
 ('driver-2', 'car-2', 'trip-2', 'customer Lalo', '20160401', 5),
 ('driver-3', 'car-3', 'trip-3', 'customer Pablo', '20160401', 7),
 ('driver-1', 'car-1', 'trip-4', 'customer Paco', '20160402', 3),
 ('driver-2', 'car-2', 'trip-5', 'customer Fito', '20160402', 12),
 ('driver-4', 'car-4', 'trip-6', 'customer Facu', '20160401', 14)]

In [25]:
## Filtrar los viajes de Abril 2016
rdd_filtered = data.filter(lambda x: (x[4] >= "20160401") and (x[4] <= "20160430"))

In [26]:
rdd_filtered.collect()

[('driver-1', 'car-1', 'trip-1', 'customer Pepe', '20160401', 10),
 ('driver-2', 'car-2', 'trip-2', 'customer Lalo', '20160401', 5),
 ('driver-3', 'car-3', 'trip-3', 'customer Pablo', '20160401', 7),
 ('driver-1', 'car-1', 'trip-4', 'customer Paco', '20160402', 3),
 ('driver-2', 'car-2', 'trip-5', 'customer Fito', '20160402', 12),
 ('driver-4', 'car-4', 'trip-6', 'customer Facu', '20160401', 14)]

In [27]:
## Agrupar los drivers, sumando distancia y contando cantidad de viajes
rdd_drivers = rdd_filtered.map(lambda x: (x[0], (x[5], 1)))\
            .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

In [28]:
## Opcional: filtar por los drivers que tengan >= n viajes
n = 1
rdd_drivers_filtered = rdd_drivers.filter(lambda x: x[1][1] >= n)

In [29]:
rdd_drivers_filtered.collect()

[('driver-1', (13, 2)),
 ('driver-4', (14, 1)),
 ('driver-2', (17, 2)),
 ('driver-3', (7, 1))]

In [30]:
## Reducir hasta quedarme con el driver con mayor promedio de distancia recorrida
rdd_drivers_filtered.map(lambda x: (x[0], x[1][0]/x[1][1]))\
                    .reduce(lambda x, y: x if (x[1] >= y[1]) else y)

('driver-4', 14.0)