In [1]:
import datetime

import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, ArrayType
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline

In [2]:
spark = SparkSession \
    .builder \
    .appName("example") \
    .getOrCreate()
sc = spark.sparkContext
sql_ctx = SQLContext(sc)

In [3]:
forward_days = 1
backward_days = 2
sdate = datetime.date(2017,10,18)
edate = datetime.date(2017,10,19)

In [4]:
log_df = sql_ctx.createDataFrame([
    (1082, datetime.date(2017,10,17), '1433', '1691', 9999),
    (1082, datetime.date(2017,10,17), '1433', '1391', 9999),
    (1082, datetime.date(2017,10,18), '1433', '2900', 9999),
    (1082, datetime.date(2017,10,18), '1433', '1406', 9999),
    (1082, datetime.date(2017,10,19), '1433', '3237', 9999),
    (1082, datetime.date(2017,10,20), '1433', '2231', 9999),
    (1082, datetime.date(2017,10,21), '1433', '1690', 9999),
    (1082, datetime.date(2017,10,22), '1433', '2921', 9999),
], ['game_id', 'data_desc', 'uid', 'b_uid', 'area_id'])

In [5]:
log_df.show()

+-------+----------+----+-----+-------+
|game_id| data_desc| uid|b_uid|area_id|
+-------+----------+----+-----+-------+
|   1082|2017-10-17|1433| 1691|   9999|
|   1082|2017-10-17|1433| 1391|   9999|
|   1082|2017-10-18|1433| 2900|   9999|
|   1082|2017-10-18|1433| 1406|   9999|
|   1082|2017-10-19|1433| 3237|   9999|
|   1082|2017-10-20|1433| 2231|   9999|
|   1082|2017-10-21|1433| 1690|   9999|
|   1082|2017-10-22|1433| 2921|   9999|
+-------+----------+----+-----+-------+



In [6]:
unique_key = ['game_id', 'area_id', 'uid', 'data_desc']

In [7]:
uid_df = log_df.select(unique_key) \
    .distinct() \
    .filter(col('data_desc').between(sdate, edate))
uid_df.show()

+-------+-------+----+----------+
|game_id|area_id| uid| data_desc|
+-------+-------+----+----------+
|   1082|   9999|1433|2017-10-19|
|   1082|   9999|1433|2017-10-18|
+-------+-------+----+----------+



In [8]:
cond = [
    col('t1.game_id') == col('t2.game_id'),
    col('t1.area_id') == col('t2.area_id'),
    col('t1.uid') == col('t2.uid'),
    F.datediff(col('t2.data_desc'), col('t1.data_desc')) >= forward_days,
    F.datediff(col('t2.data_desc'), col('t1.data_desc')) <= backward_days,
]
uid_df.alias('t1') \
    .join(log_df.alias('t2'), on=cond, how='inner') \
    .select(col('t1.game_id'), col('t1.area_id'), col('t1.uid'), col('t1.data_desc'),
            col('t2.data_desc').alias('b_date'), col('t2.b_uid'),
            F.datediff(col('t2.data_desc'), col('t1.data_desc'))) \
    .orderBy([col('t1.data_desc'), col('t2.data_desc')]) \
    .show(100)

+-------+-------+----+----------+----------+-----+------------------------------------+
|game_id|area_id| uid| data_desc|    b_date|b_uid|datediff(t2.data_desc, t1.data_desc)|
+-------+-------+----+----------+----------+-----+------------------------------------+
|   1082|   9999|1433|2017-10-18|2017-10-19| 3237|                                   1|
|   1082|   9999|1433|2017-10-18|2017-10-20| 2231|                                   2|
|   1082|   9999|1433|2017-10-19|2017-10-20| 2231|                                   1|
|   1082|   9999|1433|2017-10-19|2017-10-21| 1690|                                   2|
+-------+-------+----+----------+----------+-----+------------------------------------+



In [18]:
df = sql_ctx.read.csv('/Users/hahadsg/Downloads/tmp/s3/ml_feature_data/2017-11-06/', header=True)

In [19]:
df.select(['game_id', 'area_id', 'uid']).show()

+-------+-------+------------------+
|game_id|area_id|               uid|
+-------+-------+------------------+
|   1082|   9999|             14777|
|   1082|   9999|             14408|
|   1082|   9999|             15663|
|   1035|   9999|         126757579|
|   1082|   9999|             13221|
|   1082|   9999|             10709|
|      3|   9999|   100002477250320|
|   1082|   9999|             17428|
|   1035|   9999|          42960003|
|   1082|   9999|             12915|
|   1082|   9999|             14021|
|     19|   9999|277739171169177291|
|   1082|   9999|             16021|
|   1059|   9999| 10210132430614063|
|      3|   9999|   100007462772380|
|   1082|   9999|             15334|
|      2|   9999|   120428011647399|
|   1082|   9999|              6308|
|      1|   9999| 10211941047821120|
|     19|   9999|291914306650715892|
+-------+-------+------------------+
only showing top 20 rows



In [20]:
df.filter(col('uid') == '16050').show()

+-------+-------+----------+-----+------------------------------+-----------------------------+----------------------+---------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------------+---------------------------