In [1]:
# ACARA posee informacion historica sobre la venta de autos 0 km en la Arg. Posee un RDD con informacion
# de cada modelo (marca, modelo, motor, transmision, origen) y otro con la informacion de ventas
# (marca, modelo, fecha, concesionaria). 
# Se desea conocer, para los modelos de origen nacional, cuales son los modelos que ya se discontinuaron
# (un modelo discontinuado es aquel que no tuvo ventas en los ultimos 12 meses), obteniendo un nuevo RDD
# con (marca, modelo, total_vendido, fecha_inicio_venta, fecha_discontinuacion), donde la fecha de
# discontinuacion es la fecha cuando se vendio el ultimo auto de ese modelo, ordenado ascendentemente por fecha.
# Aclaracion: se puede asumir que el primer RDD tiene un unico registro para cada marca y modelo.

In [2]:
import pandas
import pyspark
sc = pyspark.SparkContext('local[*]')

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=PySparkShell, master=local[*]) created by <module> at /home/daniela/anaconda3/lib/python3.7/site-packages/IPython/utils/py3compat.py:188 

In [3]:
# marca, modelo, motor, transmision, origen
infoModelos = [
    ('Fiat', 'Cronos', 'M1', 'T1', 'Argentina'),
    ('Mitsubishi', 'Lancer', 'M1', 'T1', 'Japon'),
    ('Toyota', 'Hilux', 'M3', 'T3', 'Argentina'),
    ('Chevrolet', 'Onix', 'M2', 'T2', 'Argentina'),
    ('Honda', 'Civic', 'M2', 'T2', 'China'),
    ('Ford', 'Ka', 'M1', 'T2', 'Argentina'),
    ('Ford', 'Uno', 'M1', 'T1', 'Argentina') #este no va a estar en RDD2
]

# marca, modelo, fecha, concesionaria
infoVentas = [
    ('Fiat', 'Cronos', '2019-05-15', 'Santa Fe'),
    ('Fiat', 'Cronos', '2019-03-04', 'Santa Fe'),
    ('Fiat', 'Cronos', '2019-03-04', 'Martinez'),
    ('Ford', 'Ka', '2019-05-29', 'Martinez'),
    ('Fiat', 'Cronos', '2019-01-01', 'Corrientes'),
    ('Fiat', 'Cronos', '2019-01-04', 'Santa Fe'),
    ('Mitsubishi', 'Lancer', '2020-04-04', 'Santa Fe'),
    ('Mitsubishi', 'Lancer', '2019-04-04', 'Corrientes'),
    ('Toyota', 'Hilux', '2020-04-04', 'Santa Fe'),
    ('Chevrolet', 'Onix', '2019-05-04', 'Corrientes'),
    ('Chevrolet', 'Onix', '2019-01-04', 'Santa Fe'),
    ('Chevrolet', 'Onix', '2019-02-04', 'Corrientes')
]

In [5]:
modelos = sc.parallelize(infoModelos)
ventas  = sc.parallelize(infoVentas)

In [6]:
modelos.collect()

[('Fiat', 'Cronos', 'M1', 'T1', 'Argentina'),
 ('Mitsubishi', 'Lancer', 'M1', 'T1', 'Japon'),
 ('Toyota', 'Hilux', 'M3', 'T3', 'Argentina'),
 ('Chevrolet', 'Onix', 'M2', 'T2', 'Argentina'),
 ('Honda', 'Civic', 'M2', 'T2', 'China'),
 ('Ford', 'Ka', 'M1', 'T2', 'Argentina'),
 ('Ford', 'Uno', 'M1', 'T1', 'Argentina')]

In [7]:
ventas.collect()

[('Fiat', 'Cronos', '2019-05-15', 'Santa Fe'),
 ('Fiat', 'Cronos', '2019-03-04', 'Santa Fe'),
 ('Fiat', 'Cronos', '2019-03-04', 'Martinez'),
 ('Ford', 'Ka', '2019-05-29', 'Martinez'),
 ('Fiat', 'Cronos', '2019-01-01', 'Corrientes'),
 ('Fiat', 'Cronos', '2019-01-04', 'Santa Fe'),
 ('Mitsubishi', 'Lancer', '2020-04-04', 'Santa Fe'),
 ('Mitsubishi', 'Lancer', '2019-04-04', 'Corrientes'),
 ('Toyota', 'Hilux', '2020-04-04', 'Santa Fe'),
 ('Chevrolet', 'Onix', '2019-05-04', 'Corrientes'),
 ('Chevrolet', 'Onix', '2019-01-04', 'Santa Fe'),
 ('Chevrolet', 'Onix', '2019-02-04', 'Corrientes')]

In [8]:
#filtro modelos de origen nacional
modelos = modelos.filter(lambda x: x[4] == 'Argentina')
modelos.collect()

[('Fiat', 'Cronos', 'M1', 'T1', 'Argentina'),
 ('Toyota', 'Hilux', 'M3', 'T3', 'Argentina'),
 ('Chevrolet', 'Onix', 'M2', 'T2', 'Argentina'),
 ('Ford', 'Ka', 'M1', 'T2', 'Argentina'),
 ('Ford', 'Uno', 'M1', 'T1', 'Argentina')]

In [9]:
# cuales son los modelos que ya se discontinuaron
# (un modelo discontinuado es aquel que no tuvo ventas en los ultimos 12 meses), obteniendo un nuevo RDD
# con (marca, modelo, total_vendido, fecha_inicio_venta, fecha_discontinuacion), donde la fecha de
# discontinuacion es la fecha cuando se vendio el ultimo auto de ese modelo

#primero hago join para quedarme con las ventas de autos de origen nacional
#antes del join mapeo poniendo marca y modelo como clave

modelos = modelos.map(lambda x: ((x[0],x[1]),1))
ventas  = ventas.map(lambda x: ((x[0],x[1]),x[2]))

modelosNacionales = modelos.join(ventas)
modelosNacionales.collect()

[(('Ford', 'Ka'), (1, '2019-05-29')),
 (('Toyota', 'Hilux'), (1, '2020-04-04')),
 (('Fiat', 'Cronos'), (1, '2019-05-15')),
 (('Fiat', 'Cronos'), (1, '2019-03-04')),
 (('Fiat', 'Cronos'), (1, '2019-03-04')),
 (('Fiat', 'Cronos'), (1, '2019-01-01')),
 (('Fiat', 'Cronos'), (1, '2019-01-04')),
 (('Chevrolet', 'Onix'), (1, '2019-05-04')),
 (('Chevrolet', 'Onix'), (1, '2019-01-04')),
 (('Chevrolet', 'Onix'), (1, '2019-02-04'))]

In [13]:
modelosNacionales.reduceByKey(lambda x,y: (x[0]+y[0], x[1], y[1]) if x[1]<y[1] else (x[0]+y[0], y[1], x[1]))\
.filter(lambda x: x[1][-1]<='2019-06-10')\
.map(lambda x: (x[0][0],x[0][1],x[1][0],x[1][1],x[1][2]) if len(x[1])==3 else (x[0][0],x[0][1],x[1][0],x[1][1],x[1][1]))\
.sortBy(lambda x:-x[2])\
.collect()

[('Fiat', 'Cronos', 5, '2019-01-01', '2019-01-04'),
 ('Chevrolet', 'Onix', 3, '2019-01-04', '2019-02-04'),
 ('Ford', 'Ka', 1, '2019-05-29', '2019-05-29')]