In [1]:
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import humanfriendly
from IPython.display import display
from loguru import logger as LOG
import tqdm
import time

In [5]:
mpl.rc('figure', facecolor='white')
print(mpl.rcParams['figure.facecolor'])

white


In [39]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *


spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")


In [27]:
df_events = df = pd.read_msgpack('data/z6_ts_events.msgpack')
df_events.head(5)

Unnamed: 0,date,date2,event_type,user_id,merchant_id,coupon_id,discount_name,distance,click_count
1054678,2016-01-01,2016-01-01,offline_buy_with_coupon,3292058,6594,6124,90,8.0,
1054912,2016-01-01,2016-01-01,offline_buy_with_coupon,3091874,1169,2663,150:30,,
1058299,2016-01-01,2016-01-01,offline_buy_with_coupon,902667,1520,13092,90,,
1061290,2016-01-01,2016-01-01,offline_buy_with_coupon,5262672,6735,7496,95,0.0,
1066893,2016-01-01,2016-01-01,offline_buy_with_coupon,4046619,3786,7924,10:5,0.0,


In [58]:
df_discount = df = pd.read_msgpack('data/z6_ts_discount.msgpack')
df_discount.head(5)

Unnamed: 0_level_0,is_xianshi,is_dazhe,is_manjian,discount_man,discount_jian,discount_rate
discount_name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
100:1,False,False,True,100,1.0,0.99
100:10,False,False,True,100,10.0,0.9
100:20,False,False,True,100,20.0,0.8
100:30,False,False,True,100,30.0,0.7
100:5,False,False,True,100,5.0,0.95


In [59]:
df_discount.to_csv('data/z6_ts_discount.csv')

In [31]:
df_events.to_csv('data/z6_ts_events.csv', index=False)

In [60]:
# Explicit Spark schema for data/z6_ts_discount.csv (no type inference).
# The third StructField argument is the nullable flag: the key and the three
# boolean flags are declared non-nullable, the numeric columns nullable.
# discount_name is listed first; presumably because the CSV was written by
# pandas with the index (discount_name) as the leading column — verify if the
# export cell changes.
discount_schema = StructType([
    StructField('discount_name', StringType(), False),
    StructField('is_xianshi', BooleanType(), False),
    StructField('is_dazhe', BooleanType(), False),
    StructField('is_manjian', BooleanType(), False),
    StructField('discount_man', FloatType(), True),
    StructField('discount_jian', FloatType(), True),
    StructField('discount_rate', FloatType(), True),
])
# header=True: the first CSV line holds column names and is not read as data.
# NOTE(review): mode='FAILFAST' is expected to abort the read on malformed rows
# instead of nulling them — confirm against the Spark CSV reader docs.
df_discount = spark.read.csv('data/z6_ts_discount.csv', schema=discount_schema, header=True, mode='FAILFAST')

In [71]:
df_discount.show(5)

+-------------+----------+--------+----------+------------+-------------+-------------+
|discount_name|is_xianshi|is_dazhe|is_manjian|discount_man|discount_jian|discount_rate|
+-------------+----------+--------+----------+------------+-------------+-------------+
|        100:1|     false|   false|      true|       100.0|          1.0|         0.99|
|       100:10|     false|   false|      true|       100.0|         10.0|          0.9|
|       100:20|     false|   false|      true|       100.0|         20.0|          0.8|
|       100:30|     false|   false|      true|       100.0|         30.0|          0.7|
|        100:5|     false|   false|      true|       100.0|          5.0|         0.95|
+-------------+----------+--------+----------+------------+-------------+-------------+
only showing top 5 rows



In [62]:
df_discount.createOrReplaceTempView('discount')

In [70]:
# Explicit Spark schema for data/z6_ts_events.csv (no type inference).
# The identifier columns (user_id, merchant_id, coupon_id) are declared as
# strings, not numbers; the third StructField argument is the nullable flag.
events_schema = StructType([
    StructField('date', DateType(), False),
    StructField('date2', DateType(), True),
    StructField('event_type', StringType(), False),
    StructField('user_id', StringType(), False),
    StructField('merchant_id', StringType(), False),
    StructField('coupon_id', StringType(), True),
    StructField('discount_name', StringType(), True),
    StructField('distance', FloatType(), True),
    StructField('click_count', FloatType(), True),
])
# header=True: the first CSV line holds column names and is not read as data.
# NOTE(review): mode='FAILFAST' is expected to abort the read on malformed rows
# instead of nulling them — confirm against the Spark CSV reader docs.
df_events = spark.read.csv('data/z6_ts_events.csv', schema=events_schema, header=True, mode='FAILFAST')
df_events.show(5)

+----------+----------+--------------------+-------+-----------+---------+-------------+--------+-----------+
|      date|     date2|          event_type|user_id|merchant_id|coupon_id|discount_name|distance|click_count|
+----------+----------+--------------------+-------+-----------+---------+-------------+--------+-----------+
|2016-01-01|2016-01-01|offline_buy_with_...|3292058|       6594|     6124|           90|     8.0|       null|
|2016-01-01|2016-01-01|offline_buy_with_...|3091874|       1169|     2663|       150:30|    null|       null|
|2016-01-01|2016-01-01|offline_buy_with_...| 902667|       1520|    13092|           90|    null|       null|
|2016-01-01|2016-01-01|offline_buy_with_...|5262672|       6735|     7496|           95|     0.0|       null|
|2016-01-01|2016-01-01|offline_buy_with_...|4046619|       3786|     7924|         10:5|     0.0|       null|
+----------+----------+--------------------+-------+-----------+---------+-------------+--------+-----------+
only showing top 5 rows

In [72]:
df_events.createOrReplaceTempView('events')

In [65]:
spark.sql("""
    select user_id, discount_rate 
    from events 
    left outer join discount 
    on events.discount_name=discount.discount_name
""").show()

+-------+-------------+
|user_id|discount_rate|
+-------+-------------+
|3292058|          0.9|
|3091874|          0.8|
| 902667|          0.9|
|5262672|         0.95|
|4046619|          0.5|
|4868006|         0.95|
|3021843|         0.95|
|2618907|          0.9|
|5903263|         0.95|
|2742690|         0.95|
| 245786|         0.95|
|1467367|          0.9|
|2564124|         0.95|
|2062081|         0.95|
|3564078|         0.95|
|1380154|          0.9|
|2400402|         0.95|
|5086042|          0.8|
|7340314|         0.95|
|4583746|         0.95|
+-------+-------------+
only showing top 20 rows



In [122]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import *

In [75]:
df = df_events.join(df_discount, on='discount_name')
df

DataFrame[discount_name: string, date: date, date2: date, event_type: string, user_id: string, merchant_id: string, coupon_id: string, distance: float, click_count: float, is_xianshi: boolean, is_dazhe: boolean, is_manjian: boolean, discount_man: float, discount_jian: float, discount_rate: float]

In [136]:
PandasUDFType?

[0;31mInit signature:[0m [0mPandasUDFType[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0m
[0;31mDocstring:[0m     
Pandas UDF Types. See :meth:`pyspark.sql.functions.pandas_udf`.
    
[0;31mFile:[0m           ~/anaconda3/lib/python3.6/site-packages/pyspark/sql/functions.py
[0;31mType:[0m           type


In [82]:
from pyspark.sql import functions

In [110]:
keys = df.select('merchant_id').distinct().rdd.flatMap(lambda x: x).collect()
keys[:5]

['35004', '29912', '16504', '25912', '37311']

In [147]:
@pandas_udf('merchant_id string, date date', PandasUDFType.GROUPED_MAP)
def func(key, df):
    """Grouped-map UDF: list the distinct coupon-receive dates of one group.

    Args:
        key: Grouping key tuple; its first element is used as ``merchant_id``.
        df: pandas DataFrame holding the rows of that group.

    Returns:
        A pandas DataFrame with columns ``merchant_id`` and ``date``, one row
        per distinct ``date`` among the ``offline_receive_coupon`` events.
    """
    is_receive = df['event_type'] == 'offline_receive_coupon'
    received = df[is_receive]
    distinct_dates = received['date'].unique()
    columns = {
        'merchant_id': key[0],
        'date': distinct_dates,
    }
    return pd.DataFrame(columns)

func

<function __main__.func(key, df)>

In [182]:
df = pd.DataFrame([[1,2,3,4,i] for i in range(10000)], columns=list('abcde'))
arr = df.values
df.head(5)

Unnamed: 0,a,b,c,d,e
0,1,2,3,4,0
1,1,2,3,4,1
2,1,2,3,4,2
3,1,2,3,4,3
4,1,2,3,4,4


In [183]:
%%timeit

df.iloc[1000:1001]

74.3 µs ± 1.75 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [184]:
%%timeit
arr[1000:1002]

254 ns ± 10.6 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


In [149]:
merchant_date = df.groupby('merchant_id').apply(func)
merchant_date.show(5)

+-----------+----------+
|merchant_id|      date|
+-----------+----------+
|       1090|2016-06-05|
|       1159|2016-04-22|
|       1159|2016-04-24|
|       1159|2016-04-29|
|       1159|2016-05-01|
+-----------+----------+
only showing top 5 rows



In [151]:
merchant_date.cache()

DataFrame[merchant_id: string, date: date]

In [156]:
def func(row):
    """Format a two-field row as the text of a ``(merchant_id, date)`` tuple.

    Unpacking enforces that ``row`` has exactly two fields.
    """
    merchant_id, date = row
    pair = (merchant_id, date)
    return str(pair)

merchant_date.rdd.map(func).take(5)

["('1090', datetime.date(2016, 6, 5))",
 "('1159', datetime.date(2016, 4, 22))",
 "('1159', datetime.date(2016, 4, 24))",
 "('1159', datetime.date(2016, 4, 29))",
 "('1159', datetime.date(2016, 5, 1))"]

In [25]:
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
result_pdf

Unnamed: 0,0,1,2
0,0.570693,0.069152,0.945669
1,0.575289,0.461587,0.708947
2,0.768387,0.493845,0.624628
3,0.708017,0.932988,0.483427
4,0.548956,0.670273,0.634007
5,0.514265,0.860078,0.218264
6,0.940506,0.060052,0.592625
7,0.647632,0.826532,0.904886
8,0.560699,0.123889,0.514715
9,0.477724,0.330610,0.371416


In [173]:
arrays = [
    pd.Series([i, 1])
    for i in range(100)
]

In [174]:
%%timeit
pd.concat(arrays)

1.49 ms ± 11.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [199]:
import time
from collections import OrderedDict
from contextlib import contextmanager

import numpy as np
import pandas as pd
import tqdm
from loguru import logger as LOG
from cached_property import cached_property


@contextmanager
def TLOG(name):
    """Context manager that logs how long the wrapped block took.

    Args:
        name: Label placed at the start of the log line.

    The duration is reported through ``LOG.info`` from a ``finally`` clause,
    so it is logged even when the wrapped block raises.
    """
    # perf_counter() is monotonic: unlike time.time() the measured interval
    # cannot be skewed by wall-clock adjustments made while the block runs.
    t0 = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - t0
        LOG.info('{} cost {:.3f}s', name, elapsed)

        
class IndexedEvents:
    """Positional (merchant, day-of-year) index over an events table.

    The events are sorted by ``merchant_id``, ``date`` and ``event_type`` and
    kept as a plain NumPy array (``self._data``), so that a merchant/day-range
    query is answered with a single array slice.

    ``self._index[merchant_id]`` is a ``(366, 2)`` integer array indexed by the
    zero-based day of year ``t``:

    * day with events:    ``[first_row, last_row]`` (inclusive row positions)
    * day without events: ``[position of the last row before that day, -1]``
    """

    # Number of zero-based day-of-year slots kept per merchant.
    _N_DAYS = 366

    def __init__(self, df_events):
        """Sort ``df_events`` and build the lookup index.

        Args:
            df_events: pandas DataFrame with at least the columns
                ``merchant_id``, ``date`` (datetime-like, ``.dt`` is used on
                it) and ``event_type``.
        """
        df_events = df_events.sort_values(['merchant_id', 'date', 'event_type'])
        self.df_events = df_events
        self._data = df_events.values
        self._index = {}
        with TLOG('_build_index'):
            self._build_index()

    def _build_index(self):
        """Populate ``self._index`` from the sorted events, showing progress."""
        df = self.df_events.copy()
        df['t'] = df['date'].dt.dayofyear - 1
        df = df[['merchant_id', 't']]
        self._index_rows(tqdm.tqdm(enumerate(df.values), total=len(df)))

    def _index_rows(self, indexed_rows):
        """Scan ``(position, (merchant_id, t))`` items, in sorted order, into ``self._index``."""
        prev_key = None
        prev_t = 0
        last_row = -1
        for i, (key, t) in indexed_rows:
            if key == prev_key:
                if t != prev_t:
                    # Same merchant, new day: close the previous day, let the
                    # empty days in between point at the last row seen so far,
                    # and open the new day.
                    idx = self._index[key]
                    idx[prev_t, 1] = i - 1
                    idx[prev_t+1:t, 0] = i - 1
                    idx[t, 0] = i
            else:
                # New merchant: days before its first event point at the row
                # just before the merchant's first row.
                idx = np.full((self._N_DAYS, 2), -1)
                idx[:t, 0] = i - 1
                idx[t, 0] = i
                self._index[key] = idx
                if prev_key is not None:
                    self._close_merchant(prev_key, prev_t, i - 1)
            prev_key = key
            prev_t = t
            last_row = i
        if prev_key is not None:
            # After the loop the last position seen IS the final merchant's
            # last row; using ``i - 1`` here would drop that row from lookups.
            self._close_merchant(prev_key, prev_t, last_row)

    def _close_merchant(self, key, last_t, last_row):
        """Finish ``key``'s index: end its last day and fill the trailing empty days."""
        idx = self._index[key]
        idx[last_t, 1] = last_row
        idx[last_t+1:, 0] = last_row

    @staticmethod
    def _to_day(t):
        """Return ``t`` as a zero-based day of year.

        Builtin and NumPy integers are taken as already being a day index;
        anything else is parsed with ``pd.Timestamp``. (Without the NumPy case
        a ``np.int64`` day would be read by ``pd.Timestamp`` as nanoseconds.)
        """
        if isinstance(t, (int, np.integer)):
            return int(t)
        return pd.Timestamp(t).dayofyear - 1

    def loc(self, key, t1, t2):
        """Return the event rows of merchant ``key`` between ``t1`` and ``t2``, both inclusive.

        Args:
            key: Merchant id as stored in the ``merchant_id`` column.
            t1: First day, as a zero-based day-of-year integer or anything
                ``pd.Timestamp`` accepts.
            t2: Last day, same forms as ``t1``.

        Returns:
            A slice of the sorted events array (possibly empty).

        Raises:
            KeyError: if ``key`` is not an indexed merchant.
        """
        t1 = self._to_day(t1)
        t2 = self._to_day(t2)
        idx = self._index[key]
        i1, j1 = idx[t1]
        i2, j2 = idx[t2]
        # An empty start day stores the row *before* it, so begin one row later.
        start = i1 + 1 if j1 == -1 else i1
        # An empty end day stores the last row before it, which is the stop row.
        stop = i2 if j2 == -1 else j2
        return self._data[start:stop+1]


In [188]:
df_events = pd.read_msgpack('data/z6_ts_events.msgpack')
df_events['t'] = df_events['date'].dt.dayofyear - 1
df_events['t2'] = df_events['date2'].dt.dayofyear - 1
df_events.head(5)

Unnamed: 0,date,date2,event_type,user_id,merchant_id,coupon_id,discount_name,distance,click_count,t,t2
1054678,2016-01-01,2016-01-01,offline_buy_with_coupon,3292058,6594,6124,90,8.0,,0,0.0
1054912,2016-01-01,2016-01-01,offline_buy_with_coupon,3091874,1169,2663,150:30,,,0,0.0
1058299,2016-01-01,2016-01-01,offline_buy_with_coupon,902667,1520,13092,90,,,0,0.0
1061290,2016-01-01,2016-01-01,offline_buy_with_coupon,5262672,6735,7496,95,0.0,,0,0.0
1066893,2016-01-01,2016-01-01,offline_buy_with_coupon,4046619,3786,7924,10:5,0.0,,0,0.0


In [190]:
df_merchant = df_events.set_index(['merchant_id', 't', 'event_type']).sort_index()
df_merchant.sample(5)

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,date,date2,user_id,coupon_id,discount_name,distance,click_count,t2
merchant_id,t,event_type,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
49605,57,online_click,2016-02-27,NaT,5009858,,,,1.0,
760,206,offline_receive_coupon,2016-07-25,NaT,3285552,13602.0,30:5,0.0,,
3293,89,offline_buy_without_coupon,2016-03-30,NaT,302200,,,0.0,,
6901,147,offline_receive_coupon,2016-05-27,NaT,5412918,2366.0,30:5,0.0,,
2099,31,offline_receive_coupon,2016-02-01,NaT,6464173,12034.0,100:10,3.0,,


In [194]:
idx = pd.IndexSlice

In [204]:
%%timeit
df_merchant.loc[idx[760, 206:256],]

35.9 ms ± 2.97 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [200]:
ievents = IndexedEvents(df_events)

100%|██████████| 3458396/3458396 [00:05<00:00, 578806.67it/s]
2019-01-14 19:55:01.908 | INFO     | __main__:TLOG:19 - _build_index cost 6.429s


In [205]:
%%timeit
ievents.loc(760, 206, 256)

3.6 µs ± 65.6 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


In [209]:
values = ievents.loc(760, 206, 256)

In [213]:
values



array([[Timestamp('2016-07-25 00:00:00'), NaT, 'offline_receive_coupon',
        ..., nan, 206, nan],
       [Timestamp('2016-07-25 00:00:00'), NaT, 'offline_receive_coupon',
        ..., nan, 206, nan],
       [Timestamp('2016-07-25 00:00:00'), NaT, 'offline_receive_coupon',
        ..., nan, 206, nan],
       ...,
       [Timestamp('2016-07-31 00:00:00'), NaT, 'offline_receive_coupon',
        ..., nan, 212, nan],
       [Timestamp('2016-07-31 00:00:00'), NaT, 'offline_receive_coupon',
        ..., nan, 212, nan],
       [Timestamp('2016-07-31 00:00:00'), NaT, 'offline_receive_coupon',
        ..., nan, 212, nan]], dtype=object)

In [220]:
len(df_merchant.loc[idx[760, 206:256],])

8407

In [219]:
values.shape

(8407, 11)

In [216]:
%%timeit
i = 0
for row in values:
    i += 1
i

1.65 ms ± 8.69 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [223]:
df_events.head(5)

Unnamed: 0,date,date2,event_type,user_id,merchant_id,coupon_id,discount_name,distance,click_count,t,t2
1054678,2016-01-01,2016-01-01,offline_buy_with_coupon,3292058,6594,6124,90,8.0,,0,0.0
1054912,2016-01-01,2016-01-01,offline_buy_with_coupon,3091874,1169,2663,150:30,,,0,0.0
1058299,2016-01-01,2016-01-01,offline_buy_with_coupon,902667,1520,13092,90,,,0,0.0
1061290,2016-01-01,2016-01-01,offline_buy_with_coupon,5262672,6735,7496,95,0.0,,0,0.0
1066893,2016-01-01,2016-01-01,offline_buy_with_coupon,4046619,3786,7924,10:5,0.0,,0,0.0


In [227]:
df_merchant.index.names

FrozenList(['merchant_id', 't', 'event_type'])

In [229]:
arr = pd.DataFrame([
    [True, False],
    [True, False],
]).values
arr

array([[ True, False],
       [ True, False]])

In [234]:
import numpy as np 

arraylist =[(1526869384.273246, 0, 'a0'),
(1526869385.273246, 1, 'a1'),
(1526869386.273246, 2, 'a2'),
(1526869387.273246, 3, 'a3'),
(1526869388.273246, 4, 'a4'),
(1526869389.273246, 5, 'a5'),
(1526869390.273246, 6, 'a6'),
(1526869391.273246, 7, 'a7'),
(1526869392.273246, 8, 'a8'),
(1526869393.273246, 9, 'a9'),
(1526869384.273246, 0, 'a0'),
(1526869385.273246, 1, 'a1'),
(1526869386.273246, 2, 'a2'),
(1526869387.273246, 3, 'a3'),
(1526869388.273246, 4, 'a4'),
(1526869389.273246, 5, 'a5'),
(1526869390.273246, 6, 'a6'),
(1526869391.273246, 7, 'a7'),
(1526869392.273246, 8, 'a8'),
(1526869393.273246, 9, 'a9')]

dtype = [('A', 'float'), ('B', 'int'), ('C', '<U32')]
array = np.array(arraylist, dtype=dtype)

In [244]:
df = df_events.head(100)
dtype = list(df.dtypes.items())
# np.array(df.values, dtype=dtype)

In [255]:
# Build a list of (column name, NumPy-compatible dtype) pairs for ``df``.
np_dtype = []
# Iterating a Series directly yields only its values; .items() is needed to
# get (column name, dtype) pairs.
for k, v in df.dtypes.items():
    # Use the builtins: the np.object / np.str aliases were removed from NumPy.
    if v == object:
        v = str
    else:
        try:
            v = np.dtype(v)
        except TypeError:
            # dtypes NumPy cannot interpret fall back to str
            v = str
    np_dtype.append((k, v))
np_dtype

[('date', dtype('<M8[ns]')),
 ('date2', dtype('<M8[ns]')),
 ('event_type', str),
 ('user_id', dtype('int64')),
 ('merchant_id', dtype('int64')),
 ('coupon_id', str),
 ('discount_name', str),
 ('distance', dtype('float64')),
 ('click_count', dtype('float64')),
 ('t', dtype('int64')),
 ('t2', dtype('float64'))]

In [266]:
np.array(df_events['event_type'].values).astype(str)

array(['offline_buy_with_coupon', 'offline_buy_with_coupon',
       'offline_buy_with_coupon', ..., 'offline_receive_coupon',
       'offline_receive_coupon', 'offline_receive_coupon'], dtype='<U26')

In [268]:
# Element-wise wrapper: calls ``.to_datetime64()`` on every element of its
# input (presumably pandas Timestamp objects — TODO confirm against callers).
to_datetime64 = np.vectorize(lambda x: x.to_datetime64())

In [280]:
np.min(np.array([]))

ValueError: zero-size array to reduction operation minimum which has no identity

In [275]:
np.array(list(zip([1,2,3], [4,5,6]))).tolist()

[[1, 4], [2, 5], [3, 6]]