In [1]:
from datetime import datetime, timedelta
import pandas as pd

In [2]:
from pyspark import SparkContext, SQLContext
# Get the active SparkContext (or create one); `sc` is used later by sc.parallelize.
sc = SparkContext.getOrCreate()
# Reminder: PYSPARK_PYTHON selects the Python interpreter PySpark runs with; it has to be
# set in the shell environment before Spark starts, e.g.:
#export PYSPARK_PYTHON=python3.6


In [3]:
import pyspark

# One SparkSession for the whole notebook (getOrCreate reuses an existing one).
spark = (
    pyspark.sql.SparkSession.builder
    .appName("auctions")
    .getOrCreate()
)
# Load the raw auctions (first CSV line = column names) and switch to the RDD API.
df_auctions = spark.read.csv('auctions.csv', header=True)
rdd_auctions = df_auctions.rdd

In [4]:
# Peek at the first 5 raw rows; as the output shows, every field comes back as a string.
rdd_auctions.take(5)

[Row(date='2019-04-23 18:58:00.842116', device_id='2564673204772915246', ref_type_id='1', source_id='0'),
 Row(date='2019-04-23 18:58:01.530771', device_id='4441121667607578179', ref_type_id='7', source_id='0'),
 Row(date='2019-04-23 18:58:01.767562', device_id='7721769811471055264', ref_type_id='1', source_id='0'),
 Row(date='2019-04-23 18:58:02.363468', device_id='6416039086842158968', ref_type_id='1', source_id='0'),
 Row(date='2019-04-23 18:58:02.397559', device_id='1258642015983312729', ref_type_id='1', source_id='0')]

In [5]:
# Re-key the auctions as (device_id, appearance datetime) pairs.
# Fields are read by name (row.device_id / row.date) rather than by position, so the
# mapping does not silently break if the CSV column order changes.


def parse_auction_date(text):
    """
    Parse an auction timestamp string from auctions.csv into a datetime.

    Accepts 'YYYY-MM-DD HH:MM:SS.ffffff' (the format seen in the data) and the same
    value without a fractional part. A strict '.%f' parse raises ValueError on the
    whole-second form, which would abort the whole Spark job.
    NOTE(review): the whole-second form is what Python's str(datetime) emits when
    microsecond == 0 — confirm how the CSV was produced.
    """
    fmt = "%Y-%m-%d %H:%M:%S.%f" if "." in text else "%Y-%m-%d %H:%M:%S"
    return datetime.strptime(text, fmt)


rdd_auctions = rdd_auctions.map(lambda row: (row.device_id, parse_auction_date(row.date)))

In [6]:
# Train on the first 3 days only, so the largest observable gap is 3 days.
# The data starts at 2019-04-18 00:00:00 (per the original note), so training covers
# 2019-04-18, 2019-04-19 and 2019-04-20; the following 3 days are used for testing.
DATA_START = datetime(2019, 4, 18)
WINDOW_LENGTH = timedelta(days=3)

# All bounds are half-open: each "limit"/"end" value is the first instant AFTER its window.
# Note: the 7th datetime argument is microseconds, so an inclusive end written as
# datetime(..., 23, 59, 59, 999) would stop at 23:59:59.000999 and drop the rest of that second.
limit_date_train = DATA_START + WINDOW_LENGTH  # 2019-04-21 00:00:00 -> keep date < limit_date_train
# First instant of the test window (2019-04-21 00:00:00).
limit_date_test_begin = limit_date_train
# First instant after the test window (2019-04-24 00:00:00) -> keep date < limit_date_test_end.
limit_date_test_end = limit_date_test_begin + WINDOW_LENGTH

In [7]:
# Split the (device_id, datetime) pairs by time: training keeps everything before
# limit_date_train; testing keeps the half-open window
# [limit_date_test_begin, limit_date_test_end). The test lower bound is inclusive so an
# auction stamped exactly at the window start is not dropped from both sets.
rdd_train = rdd_auctions.filter(lambda pair: pair[1] < limit_date_train)
rdd_test = rdd_auctions.filter(
    lambda pair: limit_date_test_begin <= pair[1] < limit_date_test_end
)

In [8]:
# Sanity check: one training record, now a (device_id, datetime) pair.
rdd_train.take(1)

[('1109595589636746168', datetime.datetime(2019, 4, 20, 23, 57, 27, 912838))]

In [9]:
def _sorted_appearances_by_device(pairs):
    """Group (device_id, datetime) pairs by device_id; each value becomes a chronologically sorted list."""
    # sorted() consumes the grouped iterable and already returns a list.
    return pairs.groupByKey().mapValues(sorted)


rdd_train = _sorted_appearances_by_device(rdd_train)
rdd_test = _sorted_appearances_by_device(rdd_test)

In [10]:
def get_all_reappearances(dateList):
    """
    Return, for one device, how long it took to reappear after each appearance.

    Parameters
    ----------
    dateList : list
        Chronologically sorted appearance times of a single device_id. In this
        notebook these are ``datetime`` objects; elements that are not
        ``datetime`` instances (e.g. timestamp strings) are converted with
        ``pd.to_datetime``.

    Returns
    -------
    list of [date, seconds]
        One two-element list per appearance except the last: ``date`` is the
        original element of ``dateList`` and ``seconds`` (float) is the time
        until the next appearance (negative if the input is not sorted).
        Empty when ``dateList`` has fewer than two elements.
    """
    # Convert each element at most once (datetime objects, including pd.Timestamp,
    # are used as-is) instead of calling pd.to_datetime twice per consecutive pair;
    # this runs inside the Spark workers for every device.
    stamps = [d if isinstance(d, datetime) else pd.to_datetime(d) for d in dateList]
    return [
        [original, (following - current).total_seconds()]
        for original, current, following in zip(dateList, stamps, stamps[1:])
    ]

In [11]:
# Expand each device's sorted appearances into one record per gap:
# (device_id, [appearance datetime, seconds until the next appearance]).
rdd_train, rdd_test = (
    grouped.flatMapValues(get_all_reappearances) for grouped in (rdd_train, rdd_test)
)
rdd_train.take(10)

[('4172466725848941608',
  [datetime.datetime(2019, 4, 18, 6, 27, 50, 996158), 848.908411]),
 ('4172466725848941608',
  [datetime.datetime(2019, 4, 18, 6, 41, 59, 904569), 28513.038578]),
 ('4172466725848941608',
  [datetime.datetime(2019, 4, 18, 14, 37, 12, 943147), 199.094384]),
 ('4172466725848941608',
  [datetime.datetime(2019, 4, 18, 14, 40, 32, 37531), 185.775504]),
 ('4172466725848941608',
  [datetime.datetime(2019, 4, 18, 14, 43, 37, 813035), 255.313251]),
 ('4172466725848941608',
  [datetime.datetime(2019, 4, 18, 14, 47, 53, 126286), 0.993333]),
 ('4172466725848941608',
  [datetime.datetime(2019, 4, 18, 14, 47, 54, 119619), 1.044167]),
 ('4172466725848941608',
  [datetime.datetime(2019, 4, 18, 14, 47, 55, 163786), 15.432364]),
 ('4172466725848941608',
  [datetime.datetime(2019, 4, 18, 14, 48, 10, 596150), 62.328209]),
 ('4172466725848941608',
  [datetime.datetime(2019, 4, 18, 14, 49, 12, 924359), 67.688715])]

In [18]:
def _flatten_gap_record(record):
    """Turn (device_id, [date, gap_seconds]) into the flat 3-tuple (device_id, date, gap_seconds) for CSV output."""
    gap_pair = record[1]
    return record[0], gap_pair[0], gap_pair[1]


rdd_train = rdd_train.map(_flatten_gap_record)
rdd_test = rdd_test.map(_flatten_gap_record)

In [19]:
def toCSVLine(data):
    """Join the fields of one record into a comma-separated line (str() of each field, no quoting/escaping)."""
    return ",".join(map(str, data))


In [20]:
# Write the training rows as CSV text. repartition(1) leaves a single partition so Spark
# writes one part file; otherwise it writes one file per partition.
# NOTE(review): saveAsTextFile treats 'data_train.csv' as an output *directory* (the lines
# go into a part-* file inside it) and is expected to fail if that path already exists,
# so re-running this cell needs the previous output removed first — confirm.
lines_train = rdd_train.map(toCSVLine)
lines_train.repartition(1).saveAsTextFile('data_train.csv')

In [22]:
# Write the test rows as CSV text in a single partition (one part file).
# NOTE(review): 'data_test.csv' becomes an output *directory*, and saveAsTextFile is
# expected to fail if it already exists — remove old output before re-running.
lines_test = rdd_test.map(toCSVLine)
lines_test.repartition(1).saveAsTextFile('data_test.csv')

In [None]:
# Scratch cells below: throwaway sanity check of the groupByKey/mapValues pipeline on a tiny hand-made RDD.

In [77]:
# Tiny sample: two keys with out-of-order appearance datetimes.
sample_pairs = [
    ("martin", datetime(2019, 4, 20, 23, 57, 27, 912838)),
    ("martin", datetime(2019, 4, 24, 23, 57, 27, 912838)),
    ("martin", datetime(2019, 4, 19, 23, 57, 27, 912838)),
    ("amrtin", datetime(2019, 4, 20, 23, 57, 27, 912838)),
    ("amrtin", datetime(2019, 4, 29, 23, 57, 27, 912838)),
]
rdd = sc.parallelize(sample_pairs)

In [78]:
# Identity map: the sample elements are already (key, value) 2-tuples, so this rebuilds
# the same pairs and does not change the data.
rdd = rdd.map(lambda x: (x[0],x[1]))

In [80]:
# Expected: "martin" -> gaps of 1 day and 4 days, "amrtin" -> one gap of 9 days.
(
    rdd.groupByKey()
    .mapValues(sorted)
    .flatMapValues(get_all_reappearances)
    .collect()
)

[('amrtin', [datetime.datetime(2019, 4, 20, 23, 57, 27, 912838), 777600.0]),
 ('martin', [datetime.datetime(2019, 4, 19, 23, 57, 27, 912838), 86400.0]),
 ('martin', [datetime.datetime(2019, 4, 20, 23, 57, 27, 912838), 345600.0])]